Line data Source code
1 : //! The Page Service listens for client connections and serves their GetPage@LSN
2 : //! requests.
3 :
4 : use anyhow::Context;
5 : use async_compression::tokio::write::GzipEncoder;
6 : use bytes::Buf;
7 : use futures::FutureExt;
8 : use once_cell::sync::OnceCell;
9 : use pageserver_api::models::TenantState;
10 : use pageserver_api::models::{
11 : PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
12 : PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
13 : PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
14 : PagestreamGetSlruSegmentRequest, PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest,
15 : PagestreamNblocksResponse, PagestreamProtocolVersion,
16 : };
17 : use pageserver_api::shard::TenantShardId;
18 : use postgres_backend::{is_expected_io_error, AuthType, PostgresBackend, QueryError};
19 : use pq_proto::framed::ConnectionError;
20 : use pq_proto::FeStartupPacket;
21 : use pq_proto::{BeMessage, FeMessage, RowDescriptor};
22 : use std::borrow::Cow;
23 : use std::io;
24 : use std::str;
25 : use std::str::FromStr;
26 : use std::sync::Arc;
27 : use std::time::SystemTime;
28 : use std::time::{Duration, Instant};
29 : use tokio::io::AsyncWriteExt;
30 : use tokio::io::{AsyncRead, AsyncWrite};
31 : use tokio::task::JoinHandle;
32 : use tokio_util::sync::CancellationToken;
33 : use tracing::*;
34 : use utils::{
35 : auth::{Claims, Scope, SwappableJwtAuth},
36 : id::{TenantId, TimelineId},
37 : lsn::Lsn,
38 : simple_rcu::RcuReadGuard,
39 : };
40 :
41 : use crate::auth::check_permission;
42 : use crate::basebackup;
43 : use crate::basebackup::BasebackupError;
44 : use crate::config::PageServerConf;
45 : use crate::context::{DownloadBehavior, RequestContext};
46 : use crate::metrics;
47 : use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS};
48 : use crate::pgdatadir_mapping::Version;
49 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
50 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
51 : use crate::task_mgr::TaskKind;
52 : use crate::task_mgr::{self, COMPUTE_REQUEST_RUNTIME};
53 : use crate::tenant::mgr::ShardSelector;
54 : use crate::tenant::mgr::TenantManager;
55 : use crate::tenant::mgr::{GetActiveTenantError, GetTenantError, ShardResolveResult};
56 : use crate::tenant::timeline::{self, WaitLsnError};
57 : use crate::tenant::GetTimelineError;
58 : use crate::tenant::PageReconstructError;
59 : use crate::tenant::Timeline;
60 : use pageserver_api::key::rel_block_to_key;
61 : use pageserver_api::reltag::SlruKind;
62 : use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
63 : use postgres_ffi::BLCKSZ;
64 :
65 : /// How long we may wait for a [`crate::tenant::mgr::TenantSlot::InProgress`] and/or a [`crate::tenant::Tenant`] which
66 : /// is not yet in state [`TenantState::Active`].
67 : ///
68 : /// NB: this is a different value than [`crate::http::routes::ACTIVE_TENANT_TIMEOUT`].
69 : const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
70 :
71 : ///////////////////////////////////////////////////////////////////////////////
72 :
73 : pub struct Listener {
74 : cancel: CancellationToken,
75 : /// Cancel the listener task through `cancel` to shut down the listener,
76 : /// then await this handle to obtain the existing connections.
77 : task: JoinHandle<Connections>,
78 : }
79 :
80 : pub struct Connections {
81 : cancel: CancellationToken,
82 : tasks: tokio::task::JoinSet<ConnectionHandlerResult>,
83 : }
84 :
85 0 : pub fn spawn(
86 0 : conf: &'static PageServerConf,
87 0 : tenant_manager: Arc<TenantManager>,
88 0 : pg_auth: Option<Arc<SwappableJwtAuth>>,
89 0 : tcp_listener: tokio::net::TcpListener,
90 0 : ) -> Listener {
91 0 : let cancel = CancellationToken::new();
92 0 : let libpq_ctx = RequestContext::todo_child(
93 0 : TaskKind::LibpqEndpointListener,
94 0 : // listener task shouldn't need to download anything. (We will
95 0 : // create a separate sub-context for each connection, with its
96 0 : // own download behavior. This context is used only to listen and
97 0 : // accept connections.)
98 0 : DownloadBehavior::Error,
99 0 : );
100 0 : let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
101 0 : "libpq listener",
102 0 : libpq_listener_main(
103 0 : tenant_manager,
104 0 : pg_auth,
105 0 : tcp_listener,
106 0 : conf.pg_auth_type,
107 0 : libpq_ctx,
108 0 : cancel.clone(),
109 0 : )
110 0 : .map(anyhow::Ok),
111 0 : ));
112 0 :
113 0 : Listener { cancel, task }
114 0 : }
115 :
116 : impl Listener {
117 0 : pub async fn stop_accepting(self) -> Connections {
118 0 : self.cancel.cancel();
119 0 : self.task
120 0 : .await
121 0 : .expect("unreachable: we wrap the listener task in task_mgr::exit_on_panic_or_error")
122 0 : }
123 : }
124 : impl Connections {
125 0 : pub(crate) async fn shutdown(self) {
126 0 : let Self { cancel, mut tasks } = self;
127 0 : cancel.cancel();
128 0 : while let Some(res) = tasks.join_next().await {
129 0 : Self::handle_connection_completion(res);
130 0 : }
131 0 : }
132 :
133 0 : fn handle_connection_completion(res: Result<anyhow::Result<()>, tokio::task::JoinError>) {
134 0 : match res {
135 0 : Ok(Ok(())) => {}
136 0 : Ok(Err(e)) => error!("error in page_service connection task: {:?}", e),
137 0 : Err(e) => error!("page_service connection task panicked: {:?}", e),
138 : }
139 0 : }
140 : }
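// Illustrative only (not part of the original module): a sketch of how a caller
// might drive the listener lifecycle during shutdown, assuming it holds the
// `Listener` returned by `spawn`:
//
//     async fn shutdown_page_service(listener: Listener) {
//         // Stop accepting new connections; existing ones keep running.
//         let connections = listener.stop_accepting().await;
//         // Cancel the per-connection tasks and wait for them to finish.
//         connections.shutdown().await;
//     }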
141 :
142 : ///
143 : /// Main loop of the page service.
144 : ///
145 : /// Listens for connections, and launches a new handler task for each.
146 : ///
147 : /// Returns the set of open connections upon cancellation via
148 : /// `listener_cancel`.
149 : ///
150 0 : pub async fn libpq_listener_main(
151 0 : tenant_manager: Arc<TenantManager>,
152 0 : auth: Option<Arc<SwappableJwtAuth>>,
153 0 : listener: tokio::net::TcpListener,
154 0 : auth_type: AuthType,
155 0 : listener_ctx: RequestContext,
156 0 : listener_cancel: CancellationToken,
157 0 : ) -> Connections {
158 0 : let connections_cancel = CancellationToken::new();
159 0 : let mut connection_handler_tasks = tokio::task::JoinSet::default();
160 :
161 : loop {
162 0 : let accepted = tokio::select! {
163 : biased;
164 : _ = listener_cancel.cancelled() => break,
165 : next = connection_handler_tasks.join_next(), if !connection_handler_tasks.is_empty() => {
166 : let res = next.expect("we don't poll while empty");
167 : Connections::handle_connection_completion(res);
168 : continue;
169 : }
170 : accepted = listener.accept() => accepted,
171 : };
172 :
173 0 : match accepted {
174 0 : Ok((socket, peer_addr)) => {
175 0 : // Connection established. Spawn a new task to handle it.
176 0 : debug!("accepted connection from {}", peer_addr);
177 0 : let local_auth = auth.clone();
178 0 : let connection_ctx = listener_ctx
179 0 : .detached_child(TaskKind::PageRequestHandler, DownloadBehavior::Download);
180 0 : connection_handler_tasks.spawn(page_service_conn_main(
181 0 : tenant_manager.clone(),
182 0 : local_auth,
183 0 : socket,
184 0 : auth_type,
185 0 : connection_ctx,
186 0 : connections_cancel.child_token(),
187 0 : ));
188 : }
189 0 : Err(err) => {
190 0 : // accept() failed. Log the error, and loop back to retry on next connection.
191 0 : error!("accept() failed: {:?}", err);
192 : }
193 : }
194 : }
195 :
196 0 : debug!("page_service listener loop terminated");
197 :
198 0 : Connections {
199 0 : cancel: connections_cancel,
200 0 : tasks: connection_handler_tasks,
201 0 : }
202 0 : }
203 :
204 : type ConnectionHandlerResult = anyhow::Result<()>;
205 :
206 0 : #[instrument(skip_all, fields(peer_addr))]
207 : async fn page_service_conn_main(
208 : tenant_manager: Arc<TenantManager>,
209 : auth: Option<Arc<SwappableJwtAuth>>,
210 : socket: tokio::net::TcpStream,
211 : auth_type: AuthType,
212 : connection_ctx: RequestContext,
213 : cancel: CancellationToken,
214 : ) -> ConnectionHandlerResult {
215 : let _guard = LIVE_CONNECTIONS
216 : .with_label_values(&["page_service"])
217 : .guard();
218 :
219 : socket
220 : .set_nodelay(true)
221 : .context("could not set TCP_NODELAY")?;
222 :
223 : let peer_addr = socket.peer_addr().context("get peer address")?;
224 : tracing::Span::current().record("peer_addr", field::display(peer_addr));
225 :
226 : // Set up a read timeout of 10 minutes. The timeout is rather arbitrary; the requirements are:
227 : // - long enough for most valid compute connections
228 : // - less than infinite to stop us from "leaking" connections to long-gone computes
229 : //
230 : // No write timeout is used, because the kernel is assumed to fail writes after some time.
231 : let mut socket = tokio_io_timeout::TimeoutReader::new(socket);
232 :
233 : let default_timeout_ms = 10 * 60 * 1000; // 10 minutes by default
234 0 : let socket_timeout_ms = (|| {
235 0 : fail::fail_point!("simulated-bad-compute-connection", |avg_timeout_ms| {
236 : // Exponential distribution for simulating poor network
237 : // conditions; in tests, avg_timeout_ms is expected to be
238 : // around 15.
239 0 : if let Some(avg_timeout_ms) = avg_timeout_ms {
240 0 : let avg = avg_timeout_ms.parse::<i64>().unwrap() as f32;
241 0 : let u = rand::random::<f32>();
242 0 : ((1.0 - u).ln() / (-avg)) as u64
243 : } else {
244 0 : default_timeout_ms
245 : }
246 0 : });
247 0 : default_timeout_ms
248 : })();
249 :
250 : // A timeout here does not mean the client died; it can also happen if the client is just
251 : // idle for a while. We will tear down this PageServerHandler and instantiate a new one
252 : // if/when they reconnect.
253 : socket.set_timeout(Some(std::time::Duration::from_millis(socket_timeout_ms)));
254 : let socket = std::pin::pin!(socket);
255 :
256 : fail::fail_point!("ps::connection-start::pre-login");
257 :
258 : // XXX: pgbackend.run() should take the connection_ctx,
259 : // and create a child per-query context when it invokes process_query.
260 : // But it's in a shared crate, so we store connection_ctx inside PageServerHandler
261 : // and create the per-query context in process_query ourselves.
262 : let mut conn_handler =
263 : PageServerHandler::new(tenant_manager, auth, connection_ctx, cancel.clone());
264 : let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
265 :
266 : match pgbackend.run(&mut conn_handler, &cancel).await {
267 : Ok(()) => {
268 : // we've been requested to shut down
269 : Ok(())
270 : }
271 : Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
272 : if is_expected_io_error(&io_error) {
273 : info!("Postgres client disconnected ({io_error})");
274 : Ok(())
275 : } else {
276 : Err(io_error).context("Postgres connection error")
277 : }
278 : }
279 : other => other.context("Postgres query error"),
280 : }
281 : }
282 :
283 : struct PageServerHandler {
284 : auth: Option<Arc<SwappableJwtAuth>>,
285 : claims: Option<Claims>,
286 :
287 : /// The context created for the lifetime of the connection
288 : /// serviced by this PageServerHandler.
289 : /// For each query received over the connection,
290 : /// `process_query` creates a child context from this one.
291 : connection_ctx: RequestContext,
292 :
293 : cancel: CancellationToken,
294 :
295 : timeline_handles: TimelineHandles,
296 : }
297 :
298 : struct TimelineHandles {
299 : wrapper: TenantManagerWrapper,
300 : /// Note on size: the typical size of this map is 1. The largest size we expect
301 : /// to see is the number of shards divided by the number of pageservers (typically < 2),
302 : /// or the ratio used when splitting shards (i.e. how many children are created from one
303 : /// parent shard), where a "large" number might be ~8; see the sizing sketch after this struct.
304 : handles: timeline::handle::Cache<TenantManagerTypes>,
305 : }
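// A rough sizing sketch for the `handles` cache above (hypothetical numbers, not
// taken from any real deployment): with 8 shards spread across 4 pageservers, a
// connection typically resolves about 8 / 4 = 2 handles; while a single parent
// shard is being split into 8 children that are still co-located, the map may
// briefly hold ~8 entries.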
306 :
307 : impl TimelineHandles {
308 0 : fn new(tenant_manager: Arc<TenantManager>) -> Self {
309 0 : Self {
310 0 : wrapper: TenantManagerWrapper {
311 0 : tenant_manager,
312 0 : tenant_id: OnceCell::new(),
313 0 : },
314 0 : handles: Default::default(),
315 0 : }
316 0 : }
317 0 : async fn get(
318 0 : &mut self,
319 0 : tenant_id: TenantId,
320 0 : timeline_id: TimelineId,
321 0 : shard_selector: ShardSelector,
322 0 : ) -> Result<timeline::handle::Handle<TenantManagerTypes>, GetActiveTimelineError> {
323 0 : if *self.wrapper.tenant_id.get_or_init(|| tenant_id) != tenant_id {
324 0 : return Err(GetActiveTimelineError::Tenant(
325 0 : GetActiveTenantError::SwitchedTenant,
326 0 : ));
327 0 : }
328 0 : self.handles
329 0 : .get(timeline_id, shard_selector, &self.wrapper)
330 0 : .await
331 0 : .map_err(|e| match e {
332 0 : timeline::handle::GetError::TenantManager(e) => e,
333 : timeline::handle::GetError::TimelineGateClosed => {
334 0 : trace!("timeline gate closed");
335 0 : GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown)
336 : }
337 : timeline::handle::GetError::PerTimelineStateShutDown => {
338 0 : trace!("per-timeline state shut down");
339 0 : GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown)
340 : }
341 0 : })
342 0 : }
343 : }
344 :
345 : pub(crate) struct TenantManagerWrapper {
346 : tenant_manager: Arc<TenantManager>,
347 : // We do not support switching tenant_id on a connection at this point.
348 : // We can add support for this later if needed without changing
349 : // the protocol.
350 : tenant_id: once_cell::sync::OnceCell<TenantId>,
351 : }
352 :
353 : #[derive(Debug)]
354 : pub(crate) struct TenantManagerTypes;
355 :
356 : impl timeline::handle::Types for TenantManagerTypes {
357 : type TenantManagerError = GetActiveTimelineError;
358 : type TenantManager = TenantManagerWrapper;
359 : type Timeline = Arc<Timeline>;
360 : }
361 :
362 : impl timeline::handle::ArcTimeline<TenantManagerTypes> for Arc<Timeline> {
363 0 : fn gate(&self) -> &utils::sync::gate::Gate {
364 0 : &self.gate
365 0 : }
366 :
367 0 : fn shard_timeline_id(&self) -> timeline::handle::ShardTimelineId {
368 0 : Timeline::shard_timeline_id(self)
369 0 : }
370 :
371 0 : fn per_timeline_state(&self) -> &timeline::handle::PerTimelineState<TenantManagerTypes> {
372 0 : &self.handles
373 0 : }
374 :
375 0 : fn get_shard_identity(&self) -> &pageserver_api::shard::ShardIdentity {
376 0 : Timeline::get_shard_identity(self)
377 0 : }
378 : }
379 :
380 : impl timeline::handle::TenantManager<TenantManagerTypes> for TenantManagerWrapper {
381 0 : async fn resolve(
382 0 : &self,
383 0 : timeline_id: TimelineId,
384 0 : shard_selector: ShardSelector,
385 0 : ) -> Result<Arc<Timeline>, GetActiveTimelineError> {
386 0 : let tenant_id = self.tenant_id.get().expect("we set this in get()");
387 0 : let timeout = ACTIVE_TENANT_TIMEOUT;
388 0 : let wait_start = Instant::now();
389 0 : let deadline = wait_start + timeout;
390 0 : let tenant_shard = loop {
391 0 : let resolved = self
392 0 : .tenant_manager
393 0 : .resolve_attached_shard(tenant_id, shard_selector);
394 0 : match resolved {
395 0 : ShardResolveResult::Found(tenant_shard) => break tenant_shard,
396 : ShardResolveResult::NotFound => {
397 0 : return Err(GetActiveTimelineError::Tenant(
398 0 : GetActiveTenantError::NotFound(GetTenantError::NotFound(*tenant_id)),
399 0 : ));
400 : }
401 0 : ShardResolveResult::InProgress(barrier) => {
402 : // We can't authoritatively answer right now: wait for InProgress state
403 : // to end, then try again
404 : tokio::select! {
405 : _ = barrier.wait() => {
406 : // The barrier completed: proceed around the loop to try looking up again
407 : },
408 : _ = tokio::time::sleep(deadline.duration_since(Instant::now())) => {
409 : return Err(GetActiveTimelineError::Tenant(GetActiveTenantError::WaitForActiveTimeout {
410 : latest_state: None,
411 : wait_time: timeout,
412 : }));
413 : }
414 0 : }
415 0 : }
416 0 : };
417 0 : };
418 :
419 0 : tracing::debug!("Waiting for tenant to enter active state...");
420 0 : tenant_shard
421 0 : .wait_to_become_active(deadline.duration_since(Instant::now()))
422 0 : .await
423 0 : .map_err(GetActiveTimelineError::Tenant)?;
424 :
425 0 : let timeline = tenant_shard
426 0 : .get_timeline(timeline_id, true)
427 0 : .map_err(GetActiveTimelineError::Timeline)?;
428 0 : set_tracing_field_shard_id(&timeline);
429 0 : Ok(timeline)
430 0 : }
431 : }
432 :
433 0 : #[derive(thiserror::Error, Debug)]
434 : enum PageStreamError {
435 : /// We encountered an error that should prompt the client to reconnect:
436 : /// in practice this means we drop the connection without sending a response.
437 : #[error("Reconnect required: {0}")]
438 : Reconnect(Cow<'static, str>),
439 :
440 : /// We were instructed to shutdown while processing the query
441 : #[error("Shutting down")]
442 : Shutdown,
443 :
444 : /// Something went wrong reading a page: this likely indicates a pageserver bug
445 : #[error("Read error")]
446 : Read(#[source] PageReconstructError),
447 :
448 : /// Ran out of time waiting for an LSN
449 : #[error("LSN timeout: {0}")]
450 : LsnTimeout(WaitLsnError),
451 :
452 : /// The entity required to serve the request (tenant or timeline) is not found,
453 : /// or is not found in a suitable state to serve a request.
454 : #[error("Not found: {0}")]
455 : NotFound(Cow<'static, str>),
456 :
457 : /// Request asked for something that doesn't make sense, like an invalid LSN
458 : #[error("Bad request: {0}")]
459 : BadRequest(Cow<'static, str>),
460 : }
461 :
462 : impl From<PageReconstructError> for PageStreamError {
463 0 : fn from(value: PageReconstructError) -> Self {
464 0 : match value {
465 0 : PageReconstructError::Cancelled => Self::Shutdown,
466 0 : e => Self::Read(e),
467 : }
468 0 : }
469 : }
470 :
471 : impl From<GetActiveTimelineError> for PageStreamError {
472 0 : fn from(value: GetActiveTimelineError) -> Self {
473 0 : match value {
474 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled)
475 : | GetActiveTimelineError::Tenant(GetActiveTenantError::WillNotBecomeActive(
476 : TenantState::Stopping { .. },
477 : ))
478 0 : | GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown) => Self::Shutdown,
479 0 : GetActiveTimelineError::Tenant(e) => Self::NotFound(format!("{e}").into()),
480 0 : GetActiveTimelineError::Timeline(e) => Self::NotFound(format!("{e}").into()),
481 : }
482 0 : }
483 : }
484 :
485 : impl From<WaitLsnError> for PageStreamError {
486 0 : fn from(value: WaitLsnError) -> Self {
487 0 : match value {
488 0 : e @ WaitLsnError::Timeout(_) => Self::LsnTimeout(e),
489 0 : WaitLsnError::Shutdown => Self::Shutdown,
490 0 : e @ WaitLsnError::BadState { .. } => Self::Reconnect(format!("{e}").into()),
491 : }
492 0 : }
493 : }
494 :
495 : impl From<WaitLsnError> for QueryError {
496 0 : fn from(value: WaitLsnError) -> Self {
497 0 : match value {
498 0 : e @ WaitLsnError::Timeout(_) => Self::Other(anyhow::Error::new(e)),
499 0 : WaitLsnError::Shutdown => Self::Shutdown,
500 0 : WaitLsnError::BadState { .. } => Self::Reconnect,
501 : }
502 0 : }
503 : }
504 :
505 : impl PageServerHandler {
506 0 : pub fn new(
507 0 : tenant_manager: Arc<TenantManager>,
508 0 : auth: Option<Arc<SwappableJwtAuth>>,
509 0 : connection_ctx: RequestContext,
510 0 : cancel: CancellationToken,
511 0 : ) -> Self {
512 0 : PageServerHandler {
513 0 : auth,
514 0 : claims: None,
515 0 : connection_ctx,
516 0 : timeline_handles: TimelineHandles::new(tenant_manager),
517 0 : cancel,
518 0 : }
519 0 : }
520 :
521 : /// This function always respects cancellation of any timeline in [`Self::timeline_handles`]. Pass in
522 : /// a cancellation token at the next scope up (such as a tenant cancellation token) to ensure we respect
523 : /// cancellation if there aren't any timelines in the cache.
524 : ///
525 : /// If calling from a function that doesn't use the [`Self::timeline_handles`] cache, then pass in the
526 : /// timeline cancellation token.
527 0 : async fn flush_cancellable<IO>(
528 0 : &self,
529 0 : pgb: &mut PostgresBackend<IO>,
530 0 : cancel: &CancellationToken,
531 0 : ) -> Result<(), QueryError>
532 0 : where
533 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
534 0 : {
535 : tokio::select!(
536 : flush_r = pgb.flush() => {
537 : Ok(flush_r?)
538 : },
539 : _ = cancel.cancelled() => {
540 : Err(QueryError::Shutdown)
541 : }
542 : )
543 0 : }
544 :
545 : /// Pagestream sub-protocol handler.
546 : ///
547 : /// It is a simple request-response protocol inside a COPYBOTH session.
548 : ///
549 : /// # Coding Discipline
550 : ///
551 : /// Coding discipline within this function: all interaction with the `pgb` connection
552 : /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
553 : /// This is so that we can shutdown page_service quickly.
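///
/// # Message flow (sketch)
///
/// For orientation only: the client issues a
/// `pagestream_v2 <tenant_id> <timeline_id>` query, the server replies with
/// CopyBothResponse, and from then on each CopyData message from the client
/// carries exactly one `PagestreamFeMessage` while each CopyData message from
/// the server carries one `PagestreamBeMessage`, until the client sends
/// Terminate or disconnects.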
554 0 : #[instrument(skip_all)]
555 : async fn handle_pagerequests<IO>(
556 : &mut self,
557 : pgb: &mut PostgresBackend<IO>,
558 : tenant_id: TenantId,
559 : timeline_id: TimelineId,
560 : protocol_version: PagestreamProtocolVersion,
561 : ctx: RequestContext,
562 : ) -> Result<(), QueryError>
563 : where
564 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
565 : {
566 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
567 :
568 : // switch client to COPYBOTH
569 : pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
570 : tokio::select! {
571 : biased;
572 : _ = self.cancel.cancelled() => {
573 : return Err(QueryError::Shutdown)
574 : }
575 : res = pgb.flush() => {
576 : res?;
577 : }
578 : }
579 :
580 : loop {
581 : // read request bytes (it's exactly 1 PagestreamFeMessage per CopyData)
582 : let msg = tokio::select! {
583 : biased;
584 : _ = self.cancel.cancelled() => {
585 : return Err(QueryError::Shutdown)
586 : }
587 : msg = pgb.read_message() => { msg }
588 : };
589 : let copy_data_bytes = match msg? {
590 : Some(FeMessage::CopyData(bytes)) => bytes,
591 : Some(FeMessage::Terminate) => break,
592 : Some(m) => {
593 : return Err(QueryError::Other(anyhow::anyhow!(
594 : "unexpected message: {m:?} during COPY"
595 : )));
596 : }
597 : None => break, // client disconnected
598 : };
599 :
600 : trace!("query: {copy_data_bytes:?}");
601 : fail::fail_point!("ps::handle-pagerequest-message");
602 :
603 : // parse request
604 : let neon_fe_msg =
605 : PagestreamFeMessage::parse(&mut copy_data_bytes.reader(), protocol_version)?;
606 :
607 : // invoke handler function
608 : let (handler_result, span) = match neon_fe_msg {
609 : PagestreamFeMessage::Exists(req) => {
610 : fail::fail_point!("ps::handle-pagerequest-message::exists");
611 : let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn);
612 : (
613 : self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx)
614 : .instrument(span.clone())
615 : .await,
616 : span,
617 : )
618 : }
619 : PagestreamFeMessage::Nblocks(req) => {
620 : fail::fail_point!("ps::handle-pagerequest-message::nblocks");
621 : let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn);
622 : (
623 : self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx)
624 : .instrument(span.clone())
625 : .await,
626 : span,
627 : )
628 : }
629 : PagestreamFeMessage::GetPage(req) => {
630 : fail::fail_point!("ps::handle-pagerequest-message::getpage");
631 : // shard_id is filled in by the handler
632 : let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.request_lsn);
633 : (
634 : self.handle_get_page_at_lsn_request(tenant_id, timeline_id, &req, &ctx)
635 : .instrument(span.clone())
636 : .await,
637 : span,
638 : )
639 : }
640 : PagestreamFeMessage::DbSize(req) => {
641 : fail::fail_point!("ps::handle-pagerequest-message::dbsize");
642 : let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn);
643 : (
644 : self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx)
645 : .instrument(span.clone())
646 : .await,
647 : span,
648 : )
649 : }
650 : PagestreamFeMessage::GetSlruSegment(req) => {
651 : fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
652 : let span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn);
653 : (
654 : self.handle_get_slru_segment_request(tenant_id, timeline_id, &req, &ctx)
655 : .instrument(span.clone())
656 : .await,
657 : span,
658 : )
659 : }
660 : };
661 :
662 : // Map handler result to protocol behavior.
663 : // Some handler errors cause exit from pagestream protocol.
664 : // Other handler errors are sent back as an error message and we stay in pagestream protocol.
665 : let response_msg = match handler_result {
666 : Err(e) => match &e {
667 : PageStreamError::Shutdown => {
668 : // If we fail to fulfil a request during shutdown, which may be _because_ of
669 : // shutdown, then do not send the error to the client. Instead just drop the
670 : // connection.
671 0 : span.in_scope(|| info!("dropping connection due to shutdown"));
672 : return Err(QueryError::Shutdown);
673 : }
674 : PageStreamError::Reconnect(reason) => {
675 0 : span.in_scope(|| info!("handler requested reconnect: {reason}"));
676 : return Err(QueryError::Reconnect);
677 : }
678 : PageStreamError::Read(_)
679 : | PageStreamError::LsnTimeout(_)
680 : | PageStreamError::NotFound(_)
681 : | PageStreamError::BadRequest(_) => {
682 : // Print all the details to the log with {:#}, but for the client the
683 : // error message is enough. Do not log if shutting down, as the anyhow::Error
684 : // here includes cancellation which is not an error.
685 : let full = utils::error::report_compact_sources(&e);
686 0 : span.in_scope(|| {
687 0 : error!("error reading relation or page version: {full:#}")
688 0 : });
689 : PagestreamBeMessage::Error(PagestreamErrorResponse {
690 : message: e.to_string(),
691 : })
692 : }
693 : },
694 : Ok(response_msg) => response_msg,
695 : };
696 :
697 : // marshal & transmit response message
698 : pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
699 : tokio::select! {
700 : biased;
701 : _ = self.cancel.cancelled() => {
702 : // We were requested to shut down.
703 : info!("shutdown request received in page handler");
704 : return Err(QueryError::Shutdown)
705 : }
706 : res = pgb.flush() => {
707 : res?;
708 : }
709 : }
710 : }
711 : Ok(())
712 : }
713 :
714 : /// Helper function to handle the LSN from client request.
715 : ///
716 : /// Each GetPage (and Exists and Nblocks) request includes information about
717 : /// which version of the page is being requested. The primary compute node
718 : /// will always request the latest page version, by setting 'request_lsn' to
719 : /// the last inserted or flushed WAL position, while a standby will request
720 : /// a version at the LSN that it's currently caught up to.
721 : ///
722 : /// In either case, if the page server hasn't received the WAL up to the
723 : /// requested LSN yet, we will wait for it to arrive. The return value is
724 : /// the LSN that should be used to look up the page versions.
725 : ///
726 : /// In addition to the request LSN, each request carries another LSN,
727 : /// 'not_modified_since', which is a hint to the pageserver that the client
728 : /// knows that the page has not been modified between 'not_modified_since'
729 : /// and the request LSN. This allows skipping the wait, as long as the WAL
730 : /// up to 'not_modified_since' has arrived. If the client doesn't have any
731 : /// information about when the page was modified, it will use
732 : /// not_modified_since == request_lsn. If the client lies and sends a
733 : /// not_modified_since that is too low, such that there are in fact later page versions, the
734 : /// behavior is undefined: the pageserver may return any of the page versions
735 : /// or an error.
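///
/// Worked example (illustrative values only): suppose the last received WAL
/// record is at LSN 0/200. A request with request_lsn = 0/300 and
/// not_modified_since = 0/100 needs no waiting and is served at
/// min(last_record_lsn, request_lsn) = 0/200. A request with
/// not_modified_since = 0/250 instead waits for WAL up to 0/250 and is then
/// served at that LSN.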
736 0 : async fn wait_or_get_last_lsn(
737 0 : timeline: &Timeline,
738 0 : request_lsn: Lsn,
739 0 : not_modified_since: Lsn,
740 0 : latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
741 0 : ctx: &RequestContext,
742 0 : ) -> Result<Lsn, PageStreamError> {
743 0 : let last_record_lsn = timeline.get_last_record_lsn();
744 0 :
745 0 : // Sanity check the request
746 0 : if request_lsn < not_modified_since {
747 0 : return Err(PageStreamError::BadRequest(
748 0 : format!(
749 0 : "invalid request with request LSN {} and not_modified_since {}",
750 0 : request_lsn, not_modified_since,
751 0 : )
752 0 : .into(),
753 0 : ));
754 0 : }
755 0 :
756 0 : if request_lsn < **latest_gc_cutoff_lsn {
757 : // Check explicitly for INVALID just to get a less scary error message if the
758 : // request is obviously bogus
759 0 : return Err(if request_lsn == Lsn::INVALID {
760 0 : PageStreamError::BadRequest("invalid LSN(0) in request".into())
761 : } else {
762 0 : PageStreamError::BadRequest(format!(
763 0 : "tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
764 0 : request_lsn, **latest_gc_cutoff_lsn
765 0 : ).into())
766 : });
767 0 : }
768 0 :
769 0 : // Wait for WAL up to 'not_modified_since' to arrive, if necessary
770 0 : if not_modified_since > last_record_lsn {
771 0 : timeline
772 0 : .wait_lsn(
773 0 : not_modified_since,
774 0 : crate::tenant::timeline::WaitLsnWaiter::PageService,
775 0 : ctx,
776 0 : )
777 0 : .await?;
778 : // Since we waited for 'not_modified_since' to arrive, that is now the last
779 : // record LSN. (Or close enough for our purposes; the last-record LSN can
780 : // advance immediately after we return anyway)
781 0 : Ok(not_modified_since)
782 : } else {
783 : // It might be better to use max(not_modified_since, latest_gc_cutoff_lsn)
784 : // here instead. That would give the same result, since we know that there
785 : // haven't been any modifications since 'not_modified_since'. Using an older
786 : // LSN might be faster, because that could allow skipping recent layers when
787 : // finding the page. However, we have historically used 'last_record_lsn', so
788 : // stick to that for now.
789 0 : Ok(std::cmp::min(last_record_lsn, request_lsn))
790 : }
791 0 : }
792 :
793 0 : #[instrument(skip_all, fields(shard_id, %lsn))]
794 : async fn handle_make_lsn_lease<IO>(
795 : &mut self,
796 : pgb: &mut PostgresBackend<IO>,
797 : tenant_shard_id: TenantShardId,
798 : timeline_id: TimelineId,
799 : lsn: Lsn,
800 : ctx: &RequestContext,
801 : ) -> Result<(), QueryError>
802 : where
803 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
804 : {
805 : let timeline = self
806 : .timeline_handles
807 : .get(
808 : tenant_shard_id.tenant_id,
809 : timeline_id,
810 : ShardSelector::Known(tenant_shard_id.to_index()),
811 : )
812 : .await?;
813 : set_tracing_field_shard_id(&timeline);
814 :
815 : let lease = timeline.make_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx)?;
816 : let valid_until = lease
817 : .valid_until
818 : .duration_since(SystemTime::UNIX_EPOCH)
819 0 : .map_err(|e| QueryError::Other(e.into()))?;
820 :
821 : pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col(
822 : b"valid_until",
823 : )]))?
824 : .write_message_noflush(&BeMessage::DataRow(&[Some(
825 : &valid_until.as_millis().to_be_bytes(),
826 : )]))?
827 : .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
828 :
829 : Ok(())
830 : }
831 :
832 0 : #[instrument(skip_all, fields(shard_id))]
833 : async fn handle_get_rel_exists_request(
834 : &mut self,
835 : tenant_id: TenantId,
836 : timeline_id: TimelineId,
837 : req: &PagestreamExistsRequest,
838 : ctx: &RequestContext,
839 : ) -> Result<PagestreamBeMessage, PageStreamError> {
840 : let timeline = self
841 : .timeline_handles
842 : .get(tenant_id, timeline_id, ShardSelector::Zero)
843 : .await?;
844 : let _timer = timeline
845 : .query_metrics
846 : .start_timer(metrics::SmgrQueryType::GetRelExists, ctx);
847 :
848 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
849 : let lsn = Self::wait_or_get_last_lsn(
850 : &timeline,
851 : req.request_lsn,
852 : req.not_modified_since,
853 : &latest_gc_cutoff_lsn,
854 : ctx,
855 : )
856 : .await?;
857 :
858 : let exists = timeline
859 : .get_rel_exists(req.rel, Version::Lsn(lsn), ctx)
860 : .await?;
861 :
862 : Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
863 : exists,
864 : }))
865 : }
866 :
867 0 : #[instrument(skip_all, fields(shard_id))]
868 : async fn handle_get_nblocks_request(
869 : &mut self,
870 : tenant_id: TenantId,
871 : timeline_id: TimelineId,
872 : req: &PagestreamNblocksRequest,
873 : ctx: &RequestContext,
874 : ) -> Result<PagestreamBeMessage, PageStreamError> {
875 : let timeline = self
876 : .timeline_handles
877 : .get(tenant_id, timeline_id, ShardSelector::Zero)
878 : .await?;
879 :
880 : let _timer = timeline
881 : .query_metrics
882 : .start_timer(metrics::SmgrQueryType::GetRelSize, ctx);
883 :
884 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
885 : let lsn = Self::wait_or_get_last_lsn(
886 : &timeline,
887 : req.request_lsn,
888 : req.not_modified_since,
889 : &latest_gc_cutoff_lsn,
890 : ctx,
891 : )
892 : .await?;
893 :
894 : let n_blocks = timeline
895 : .get_rel_size(req.rel, Version::Lsn(lsn), ctx)
896 : .await?;
897 :
898 : Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
899 : n_blocks,
900 : }))
901 : }
902 :
903 0 : #[instrument(skip_all, fields(shard_id))]
904 : async fn handle_db_size_request(
905 : &mut self,
906 : tenant_id: TenantId,
907 : timeline_id: TimelineId,
908 : req: &PagestreamDbSizeRequest,
909 : ctx: &RequestContext,
910 : ) -> Result<PagestreamBeMessage, PageStreamError> {
911 : let timeline = self
912 : .timeline_handles
913 : .get(tenant_id, timeline_id, ShardSelector::Zero)
914 : .await?;
915 :
916 : let _timer = timeline
917 : .query_metrics
918 : .start_timer(metrics::SmgrQueryType::GetDbSize, ctx);
919 :
920 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
921 : let lsn = Self::wait_or_get_last_lsn(
922 : &timeline,
923 : req.request_lsn,
924 : req.not_modified_since,
925 : &latest_gc_cutoff_lsn,
926 : ctx,
927 : )
928 : .await?;
929 :
930 : let total_blocks = timeline
931 : .get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, Version::Lsn(lsn), ctx)
932 : .await?;
933 : let db_size = total_blocks as i64 * BLCKSZ as i64;
934 :
935 : Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse {
936 : db_size,
937 : }))
938 : }
939 :
940 0 : #[instrument(skip_all, fields(shard_id))]
941 : async fn handle_get_page_at_lsn_request(
942 : &mut self,
943 : tenant_id: TenantId,
944 : timeline_id: TimelineId,
945 : req: &PagestreamGetPageRequest,
946 : ctx: &RequestContext,
947 : ) -> Result<PagestreamBeMessage, PageStreamError> {
948 : let timeline = match self
949 : .timeline_handles
950 : .get(
951 : tenant_id,
952 : timeline_id,
953 : ShardSelector::Page(rel_block_to_key(req.rel, req.blkno)),
954 : )
955 : .await
956 : {
957 : Ok(tl) => tl,
958 : Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
959 : // We already know this tenant exists in general, because we resolved it at
960 : // start of connection. Getting a NotFound here indicates that the shard containing
961 : // the requested page is not present on this node: the client's knowledge of shard->pageserver
962 : // mapping is out of date.
963 : //
964 : // Closing the connection by returning `::Reconnect` has the side effect of rate-limiting the above message, via
965 : // the client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
966 : // and talk to a different pageserver.
967 : return Err(PageStreamError::Reconnect(
968 : "getpage@lsn request routed to wrong shard".into(),
969 : ));
970 : }
971 : Err(e) => return Err(e.into()),
972 : };
973 :
974 : let _timer = timeline
975 : .query_metrics
976 : .start_timer(metrics::SmgrQueryType::GetPageAtLsn, ctx);
977 :
978 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
979 : let lsn = Self::wait_or_get_last_lsn(
980 : &timeline,
981 : req.request_lsn,
982 : req.not_modified_since,
983 : &latest_gc_cutoff_lsn,
984 : ctx,
985 : )
986 : .await?;
987 :
988 : let page = timeline
989 : .get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), ctx)
990 : .await?;
991 :
992 : Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
993 : page,
994 : }))
995 : }
996 :
997 0 : #[instrument(skip_all, fields(shard_id))]
998 : async fn handle_get_slru_segment_request(
999 : &mut self,
1000 : tenant_id: TenantId,
1001 : timeline_id: TimelineId,
1002 : req: &PagestreamGetSlruSegmentRequest,
1003 : ctx: &RequestContext,
1004 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1005 : let timeline = self
1006 : .timeline_handles
1007 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1008 : .await?;
1009 :
1010 : let _timer = timeline
1011 : .query_metrics
1012 : .start_timer(metrics::SmgrQueryType::GetSlruSegment, ctx);
1013 :
1014 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1015 : let lsn = Self::wait_or_get_last_lsn(
1016 : &timeline,
1017 : req.request_lsn,
1018 : req.not_modified_since,
1019 : &latest_gc_cutoff_lsn,
1020 : ctx,
1021 : )
1022 : .await?;
1023 :
1024 : let kind = SlruKind::from_repr(req.kind)
1025 : .ok_or(PageStreamError::BadRequest("invalid SLRU kind".into()))?;
1026 : let segment = timeline.get_slru_segment(kind, req.segno, lsn, ctx).await?;
1027 :
1028 : Ok(PagestreamBeMessage::GetSlruSegment(
1029 : PagestreamGetSlruSegmentResponse { segment },
1030 : ))
1031 : }
1032 :
1033 : /// Note on "fullbackup":
1034 : /// Full basebackups should only be used for debugging purposes.
1035 : /// Originally, they were introduced to enable breaking storage format changes,
1036 : /// but that is no longer applicable.
1037 : ///
1038 : /// # Coding Discipline
1039 : ///
1040 : /// Coding discipline within this function: all interaction with the `pgb` connection
1041 : /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
1042 : /// This is so that we can shutdown page_service quickly.
1043 : ///
1044 : /// TODO: wrap the pgb that we pass to the basebackup handler so that it's sensitive
1045 : /// to connection cancellation.
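///
/// Wire flow, roughly (for orientation only): the client issues
/// `basebackup <tenant_id> <timeline_id> [lsn] [--gzip]` (or `fullbackup ...`),
/// the server waits for the requested LSN if one was given, switches the
/// connection to COPYOUT, streams a tar archive of the timeline contents
/// (gzip-compressed when requested), and finishes with CopyDone.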
1046 : #[allow(clippy::too_many_arguments)]
1047 0 : #[instrument(skip_all, fields(shard_id, ?lsn, ?prev_lsn, %full_backup))]
1048 : async fn handle_basebackup_request<IO>(
1049 : &mut self,
1050 : pgb: &mut PostgresBackend<IO>,
1051 : tenant_id: TenantId,
1052 : timeline_id: TimelineId,
1053 : lsn: Option<Lsn>,
1054 : prev_lsn: Option<Lsn>,
1055 : full_backup: bool,
1056 : gzip: bool,
1057 : ctx: &RequestContext,
1058 : ) -> Result<(), QueryError>
1059 : where
1060 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1061 : {
1062 0 : fn map_basebackup_error(err: BasebackupError) -> QueryError {
1063 0 : match err {
1064 0 : BasebackupError::Client(e) => QueryError::Disconnected(ConnectionError::Io(e)),
1065 0 : BasebackupError::Server(e) => QueryError::Other(e),
1066 : }
1067 0 : }
1068 :
1069 : let started = std::time::Instant::now();
1070 :
1071 : let timeline = self
1072 : .timeline_handles
1073 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1074 : .await?;
1075 :
1076 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1077 : if let Some(lsn) = lsn {
1078 : // Backup was requested at a particular LSN. Wait for it to arrive.
1079 : info!("waiting for {}", lsn);
1080 : timeline
1081 : .wait_lsn(
1082 : lsn,
1083 : crate::tenant::timeline::WaitLsnWaiter::PageService,
1084 : ctx,
1085 : )
1086 : .await?;
1087 : timeline
1088 : .check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
1089 : .context("invalid basebackup lsn")?;
1090 : }
1091 :
1092 : let lsn_awaited_after = started.elapsed();
1093 :
1094 : // switch client to COPYOUT
1095 : pgb.write_message_noflush(&BeMessage::CopyOutResponse)
1096 : .map_err(QueryError::Disconnected)?;
1097 : self.flush_cancellable(pgb, &self.cancel).await?;
1098 :
1099 : // Send a tarball of the latest layer on the timeline. Compress if not
1100 : // fullbackup. TODO Compress in that case too (tests need to be updated)
1101 : if full_backup {
1102 : let mut writer = pgb.copyout_writer();
1103 : basebackup::send_basebackup_tarball(
1104 : &mut writer,
1105 : &timeline,
1106 : lsn,
1107 : prev_lsn,
1108 : full_backup,
1109 : ctx,
1110 : )
1111 : .await
1112 : .map_err(map_basebackup_error)?;
1113 : } else {
1114 : let mut writer = pgb.copyout_writer();
1115 : if gzip {
1116 : let mut encoder = GzipEncoder::with_quality(
1117 : writer,
1118 : // NOTE: using fast compression because it's on the critical path
1119 : // for compute startup. For an empty database, we get
1120 : // <100KB with this method. The Level::Best compression method
1121 : // gives us <20KB, but maybe we should add basebackup caching
1122 : // on compute shutdown first.
1123 : async_compression::Level::Fastest,
1124 : );
1125 : basebackup::send_basebackup_tarball(
1126 : &mut encoder,
1127 : &timeline,
1128 : lsn,
1129 : prev_lsn,
1130 : full_backup,
1131 : ctx,
1132 : )
1133 : .await
1134 : .map_err(map_basebackup_error)?;
1135 : // shutdown the encoder to ensure the gzip footer is written
1136 : encoder
1137 : .shutdown()
1138 : .await
1139 0 : .map_err(|e| QueryError::Disconnected(ConnectionError::Io(e)))?;
1140 : } else {
1141 : basebackup::send_basebackup_tarball(
1142 : &mut writer,
1143 : &timeline,
1144 : lsn,
1145 : prev_lsn,
1146 : full_backup,
1147 : ctx,
1148 : )
1149 : .await
1150 : .map_err(map_basebackup_error)?;
1151 : }
1152 : }
1153 :
1154 : pgb.write_message_noflush(&BeMessage::CopyDone)
1155 : .map_err(QueryError::Disconnected)?;
1156 : self.flush_cancellable(pgb, &timeline.cancel).await?;
1157 :
1158 : let basebackup_after = started
1159 : .elapsed()
1160 : .checked_sub(lsn_awaited_after)
1161 : .unwrap_or(Duration::ZERO);
1162 :
1163 : info!(
1164 : lsn_await_millis = lsn_awaited_after.as_millis(),
1165 : basebackup_millis = basebackup_after.as_millis(),
1166 : "basebackup complete"
1167 : );
1168 :
1169 : Ok(())
1170 : }
1171 :
1172 : // When accessing the management API, supply None as an argument.
1173 : // When authorizing a specific tenant, pass the corresponding tenant id.
1174 0 : fn check_permission(&self, tenant_id: Option<TenantId>) -> Result<(), QueryError> {
1175 0 : if self.auth.is_none() {
1176 : // auth is set to Trust; there is nothing to check, so just return Ok
1177 0 : return Ok(());
1178 0 : }
1179 0 : // auth is Some (just checked above). When auth is Some, claims are always
1180 0 : // present because of the checks during connection init,
1181 0 : // so this expect won't trigger
1182 0 : let claims = self
1183 0 : .claims
1184 0 : .as_ref()
1185 0 : .expect("claims presence already checked");
1186 0 : check_permission(claims, tenant_id).map_err(|e| QueryError::Unauthorized(e.0))
1187 0 : }
1188 : }
1189 :
1190 : #[async_trait::async_trait]
1191 : impl<IO> postgres_backend::Handler<IO> for PageServerHandler
1192 : where
1193 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1194 : {
1195 0 : fn check_auth_jwt(
1196 0 : &mut self,
1197 0 : _pgb: &mut PostgresBackend<IO>,
1198 0 : jwt_response: &[u8],
1199 0 : ) -> Result<(), QueryError> {
1200 : // this unwrap is never triggered, because check_auth_jwt is only called when auth_type is NeonJWT,
1201 : // which requires auth to be present
1202 0 : let data = self
1203 0 : .auth
1204 0 : .as_ref()
1205 0 : .unwrap()
1206 0 : .decode(str::from_utf8(jwt_response).context("jwt response is not UTF-8")?)
1207 0 : .map_err(|e| QueryError::Unauthorized(e.0))?;
1208 :
1209 0 : if matches!(data.claims.scope, Scope::Tenant) && data.claims.tenant_id.is_none() {
1210 0 : return Err(QueryError::Unauthorized(
1211 0 : "jwt token scope is Tenant, but tenant id is missing".into(),
1212 0 : ));
1213 0 : }
1214 0 :
1215 0 : debug!(
1216 0 : "jwt scope check succeeded for scope: {:#?} by tenant id: {:?}",
1217 : data.claims.scope, data.claims.tenant_id,
1218 : );
1219 :
1220 0 : self.claims = Some(data.claims);
1221 0 : Ok(())
1222 0 : }
1223 :
1224 0 : fn startup(
1225 0 : &mut self,
1226 0 : _pgb: &mut PostgresBackend<IO>,
1227 0 : _sm: &FeStartupPacket,
1228 0 : ) -> Result<(), QueryError> {
1229 : fail::fail_point!("ps::connection-start::startup-packet");
1230 0 : Ok(())
1231 0 : }
1232 :
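// Summary of the commands dispatched below, derived from the match arms in
// process_query (listed here for orientation):
//
//   pagestream_v2 <tenant_id> <timeline_id>              -- pagestream protocol V2
//   pagestream <tenant_id> <timeline_id>                 -- pagestream protocol V1
//   basebackup <tenant_id> <timeline_id> [lsn] [--gzip]
//   fullbackup <tenant_id> <timeline_id> [lsn] [prev_lsn]
//   set <...>                                            -- accepted and ignored
//   lease lsn <tenant_shard_id> <timeline_id> <lsn>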
1233 0 : #[instrument(skip_all, fields(tenant_id, timeline_id))]
1234 : async fn process_query(
1235 : &mut self,
1236 : pgb: &mut PostgresBackend<IO>,
1237 : query_string: &str,
1238 0 : ) -> Result<(), QueryError> {
1239 0 : fail::fail_point!("simulated-bad-compute-connection", |_| {
1240 0 : info!("Hit failpoint for bad connection");
1241 0 : Err(QueryError::SimulatedConnectionError)
1242 0 : });
1243 :
1244 : fail::fail_point!("ps::connection-start::process-query");
1245 :
1246 0 : let ctx = self.connection_ctx.attached_child();
1247 0 : debug!("process query {query_string:?}");
1248 0 : let parts = query_string.split_whitespace().collect::<Vec<_>>();
1249 0 : if let Some(params) = parts.strip_prefix(&["pagestream_v2"]) {
1250 0 : if params.len() != 2 {
1251 0 : return Err(QueryError::Other(anyhow::anyhow!(
1252 0 : "invalid param number for pagestream command"
1253 0 : )));
1254 0 : }
1255 0 : let tenant_id = TenantId::from_str(params[0])
1256 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1257 0 : let timeline_id = TimelineId::from_str(params[1])
1258 0 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1259 :
1260 0 : tracing::Span::current()
1261 0 : .record("tenant_id", field::display(tenant_id))
1262 0 : .record("timeline_id", field::display(timeline_id));
1263 0 :
1264 0 : self.check_permission(Some(tenant_id))?;
1265 :
1266 0 : COMPUTE_COMMANDS_COUNTERS
1267 0 : .for_command(ComputeCommandKind::PageStreamV2)
1268 0 : .inc();
1269 0 :
1270 0 : self.handle_pagerequests(
1271 0 : pgb,
1272 0 : tenant_id,
1273 0 : timeline_id,
1274 0 : PagestreamProtocolVersion::V2,
1275 0 : ctx,
1276 0 : )
1277 0 : .await?;
1278 0 : } else if let Some(params) = parts.strip_prefix(&["pagestream"]) {
1279 0 : if params.len() != 2 {
1280 0 : return Err(QueryError::Other(anyhow::anyhow!(
1281 0 : "invalid param number for pagestream command"
1282 0 : )));
1283 0 : }
1284 0 : let tenant_id = TenantId::from_str(params[0])
1285 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1286 0 : let timeline_id = TimelineId::from_str(params[1])
1287 0 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1288 :
1289 0 : tracing::Span::current()
1290 0 : .record("tenant_id", field::display(tenant_id))
1291 0 : .record("timeline_id", field::display(timeline_id));
1292 0 :
1293 0 : self.check_permission(Some(tenant_id))?;
1294 :
1295 0 : COMPUTE_COMMANDS_COUNTERS
1296 0 : .for_command(ComputeCommandKind::PageStream)
1297 0 : .inc();
1298 0 :
1299 0 : self.handle_pagerequests(
1300 0 : pgb,
1301 0 : tenant_id,
1302 0 : timeline_id,
1303 0 : PagestreamProtocolVersion::V1,
1304 0 : ctx,
1305 0 : )
1306 0 : .await?;
1307 0 : } else if let Some(params) = parts.strip_prefix(&["basebackup"]) {
1308 0 : if params.len() < 2 {
1309 0 : return Err(QueryError::Other(anyhow::anyhow!(
1310 0 : "invalid param number for basebackup command"
1311 0 : )));
1312 0 : }
1313 :
1314 0 : let tenant_id = TenantId::from_str(params[0])
1315 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1316 0 : let timeline_id = TimelineId::from_str(params[1])
1317 0 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1318 :
1319 0 : tracing::Span::current()
1320 0 : .record("tenant_id", field::display(tenant_id))
1321 0 : .record("timeline_id", field::display(timeline_id));
1322 0 :
1323 0 : self.check_permission(Some(tenant_id))?;
1324 :
1325 0 : COMPUTE_COMMANDS_COUNTERS
1326 0 : .for_command(ComputeCommandKind::Basebackup)
1327 0 : .inc();
1328 :
1329 0 : let lsn = if let Some(lsn_str) = params.get(2) {
1330 : Some(
1331 0 : Lsn::from_str(lsn_str)
1332 0 : .with_context(|| format!("Failed to parse Lsn from {lsn_str}"))?,
1333 : )
1334 : } else {
1335 0 : None
1336 : };
1337 :
1338 0 : let gzip = match params.get(3) {
1339 0 : Some(&"--gzip") => true,
1340 0 : None => false,
1341 0 : Some(third_param) => {
1342 0 : return Err(QueryError::Other(anyhow::anyhow!(
1343 0 : "Parameter in position 3 unknown {third_param}",
1344 0 : )))
1345 : }
1346 : };
1347 :
1348 0 : let metric_recording = metrics::BASEBACKUP_QUERY_TIME.start_recording(&ctx);
1349 0 : let res = async {
1350 0 : self.handle_basebackup_request(
1351 0 : pgb,
1352 0 : tenant_id,
1353 0 : timeline_id,
1354 0 : lsn,
1355 0 : None,
1356 0 : false,
1357 0 : gzip,
1358 0 : &ctx,
1359 0 : )
1360 0 : .await?;
1361 0 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1362 0 : Result::<(), QueryError>::Ok(())
1363 0 : }
1364 0 : .await;
1365 0 : metric_recording.observe(&res);
1366 0 : res?;
1367 : }
1368 : // same as basebackup, but result includes relational data as well
1369 0 : else if let Some(params) = parts.strip_prefix(&["fullbackup"]) {
1370 0 : if params.len() < 2 {
1371 0 : return Err(QueryError::Other(anyhow::anyhow!(
1372 0 : "invalid param number for fullbackup command"
1373 0 : )));
1374 0 : }
1375 :
1376 0 : let tenant_id = TenantId::from_str(params[0])
1377 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1378 0 : let timeline_id = TimelineId::from_str(params[1])
1379 0 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1380 :
1381 0 : tracing::Span::current()
1382 0 : .record("tenant_id", field::display(tenant_id))
1383 0 : .record("timeline_id", field::display(timeline_id));
1384 :
1385 : // The caller is responsible for providing correct lsn and prev_lsn.
1386 0 : let lsn = if let Some(lsn_str) = params.get(2) {
1387 : Some(
1388 0 : Lsn::from_str(lsn_str)
1389 0 : .with_context(|| format!("Failed to parse Lsn from {lsn_str}"))?,
1390 : )
1391 : } else {
1392 0 : None
1393 : };
1394 0 : let prev_lsn = if let Some(prev_lsn_str) = params.get(3) {
1395 : Some(
1396 0 : Lsn::from_str(prev_lsn_str)
1397 0 : .with_context(|| format!("Failed to parse Lsn from {prev_lsn_str}"))?,
1398 : )
1399 : } else {
1400 0 : None
1401 : };
1402 :
1403 0 : self.check_permission(Some(tenant_id))?;
1404 :
1405 0 : COMPUTE_COMMANDS_COUNTERS
1406 0 : .for_command(ComputeCommandKind::Fullbackup)
1407 0 : .inc();
1408 0 :
1409 0 : // Check that the timeline exists
1410 0 : self.handle_basebackup_request(
1411 0 : pgb,
1412 0 : tenant_id,
1413 0 : timeline_id,
1414 0 : lsn,
1415 0 : prev_lsn,
1416 0 : true,
1417 0 : false,
1418 0 : &ctx,
1419 0 : )
1420 0 : .await?;
1421 0 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1422 0 : } else if query_string.to_ascii_lowercase().starts_with("set ") {
1423 : // important because psycopg2 executes "SET datestyle TO 'ISO'"
1424 : // on connect
1425 0 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1426 0 : } else if query_string.starts_with("lease lsn ") {
1427 0 : let params = &parts[2..];
1428 0 : if params.len() != 3 {
1429 0 : return Err(QueryError::Other(anyhow::anyhow!(
1430 0 : "invalid param number {} for lease lsn command",
1431 0 : params.len()
1432 0 : )));
1433 0 : }
1434 :
1435 0 : let tenant_shard_id = TenantShardId::from_str(params[0])
1436 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1437 0 : let timeline_id = TimelineId::from_str(params[1])
1438 0 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1439 :
1440 0 : tracing::Span::current()
1441 0 : .record("tenant_id", field::display(tenant_shard_id))
1442 0 : .record("timeline_id", field::display(timeline_id));
1443 0 :
1444 0 : self.check_permission(Some(tenant_shard_id.tenant_id))?;
1445 :
1446 0 : COMPUTE_COMMANDS_COUNTERS
1447 0 : .for_command(ComputeCommandKind::LeaseLsn)
1448 0 : .inc();
1449 :
1450 : // The caller is responsible for providing correct lsn.
1451 0 : let lsn = Lsn::from_str(params[2])
1452 0 : .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?;
1453 :
1454 0 : match self
1455 0 : .handle_make_lsn_lease(pgb, tenant_shard_id, timeline_id, lsn, &ctx)
1456 0 : .await
1457 : {
1458 0 : Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
1459 0 : Err(e) => {
1460 0 : error!("error obtaining lsn lease for {lsn}: {e:?}");
1461 0 : pgb.write_message_noflush(&BeMessage::ErrorResponse(
1462 0 : &e.to_string(),
1463 0 : Some(e.pg_error_code()),
1464 0 : ))?
1465 : }
1466 : };
1467 : } else {
1468 0 : return Err(QueryError::Other(anyhow::anyhow!(
1469 0 : "unknown command {query_string}"
1470 0 : )));
1471 : }
1472 :
1473 0 : Ok(())
1474 0 : }
1475 : }
1476 :
1477 : impl From<GetActiveTenantError> for QueryError {
1478 0 : fn from(e: GetActiveTenantError) -> Self {
1479 0 : match e {
1480 0 : GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected(
1481 0 : ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
1482 0 : ),
1483 : GetActiveTenantError::Cancelled
1484 : | GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => {
1485 0 : QueryError::Shutdown
1486 : }
1487 0 : e @ GetActiveTenantError::NotFound(_) => QueryError::NotFound(format!("{e}").into()),
1488 0 : e => QueryError::Other(anyhow::anyhow!(e)),
1489 : }
1490 0 : }
1491 : }
1492 :
1493 0 : #[derive(Debug, thiserror::Error)]
1494 : pub(crate) enum GetActiveTimelineError {
1495 : #[error(transparent)]
1496 : Tenant(GetActiveTenantError),
1497 : #[error(transparent)]
1498 : Timeline(#[from] GetTimelineError),
1499 : }
1500 :
1501 : impl From<GetActiveTimelineError> for QueryError {
1502 0 : fn from(e: GetActiveTimelineError) -> Self {
1503 0 : match e {
1504 0 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => QueryError::Shutdown,
1505 0 : GetActiveTimelineError::Tenant(e) => e.into(),
1506 0 : GetActiveTimelineError::Timeline(e) => QueryError::NotFound(format!("{e}").into()),
1507 : }
1508 0 : }
1509 : }
1510 :
1511 0 : fn set_tracing_field_shard_id(timeline: &Timeline) {
1512 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1513 0 : tracing::Span::current().record(
1514 0 : "shard_id",
1515 0 : tracing::field::display(timeline.tenant_shard_id.shard_slug()),
1516 0 : );
1517 0 : debug_assert_current_span_has_tenant_and_timeline_id();
1518 0 : }
|