Line data Source code
1 : //! The Page Service listens for client connections and serves their GetPage@LSN
2 : //! requests.
3 :
4 : use anyhow::Context;
5 : use async_compression::tokio::write::GzipEncoder;
6 : use bytes::Buf;
7 : use futures::FutureExt;
8 : use once_cell::sync::OnceCell;
9 : use pageserver_api::models::TenantState;
10 : use pageserver_api::models::{
11 : PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
12 : PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
13 : PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
14 : PagestreamGetSlruSegmentRequest, PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest,
15 : PagestreamNblocksResponse, PagestreamProtocolVersion,
16 : };
17 : use pageserver_api::shard::TenantShardId;
18 : use postgres_backend::{is_expected_io_error, AuthType, PostgresBackend, QueryError};
19 : use pq_proto::framed::ConnectionError;
20 : use pq_proto::FeStartupPacket;
21 : use pq_proto::{BeMessage, FeMessage, RowDescriptor};
22 : use std::borrow::Cow;
23 : use std::io;
24 : use std::str;
25 : use std::str::FromStr;
26 : use std::sync::Arc;
27 : use std::time::SystemTime;
28 : use std::time::{Duration, Instant};
29 : use tokio::io::{AsyncRead, AsyncWrite};
30 : use tokio::io::{AsyncWriteExt, BufWriter};
31 : use tokio::task::JoinHandle;
32 : use tokio_util::sync::CancellationToken;
33 : use tracing::*;
34 : use utils::{
35 : auth::{Claims, Scope, SwappableJwtAuth},
36 : id::{TenantId, TimelineId},
37 : lsn::Lsn,
38 : simple_rcu::RcuReadGuard,
39 : };
40 :
41 : use crate::auth::check_permission;
42 : use crate::basebackup;
43 : use crate::basebackup::BasebackupError;
44 : use crate::config::PageServerConf;
45 : use crate::context::{DownloadBehavior, RequestContext};
46 : use crate::metrics;
47 : use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS};
48 : use crate::pgdatadir_mapping::Version;
49 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
50 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
51 : use crate::task_mgr::TaskKind;
52 : use crate::task_mgr::{self, COMPUTE_REQUEST_RUNTIME};
53 : use crate::tenant::mgr::ShardSelector;
54 : use crate::tenant::mgr::TenantManager;
55 : use crate::tenant::mgr::{GetActiveTenantError, GetTenantError, ShardResolveResult};
56 : use crate::tenant::timeline::{self, WaitLsnError};
57 : use crate::tenant::GetTimelineError;
58 : use crate::tenant::PageReconstructError;
59 : use crate::tenant::Timeline;
60 : use pageserver_api::key::rel_block_to_key;
61 : use pageserver_api::reltag::SlruKind;
62 : use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
63 : use postgres_ffi::BLCKSZ;
64 :
65 : /// How long we may wait for a [`crate::tenant::mgr::TenantSlot::InProgress`] and/or a [`crate::tenant::Tenant`] which
66 : /// is not yet in state [`TenantState::Active`].
67 : ///
68 : /// NB: this is a different value than [`crate::http::routes::ACTIVE_TENANT_TIMEOUT`].
69 : const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
70 :
71 : ///////////////////////////////////////////////////////////////////////////////
72 :
73 : pub struct Listener {
74 : cancel: CancellationToken,
75 : /// Cancel the listener task through `cancel` to shut down the listener
76 : /// and get a handle on the existing connections.
77 : task: JoinHandle<Connections>,
78 : }
79 :
80 : pub struct Connections {
81 : cancel: CancellationToken,
82 : tasks: tokio::task::JoinSet<ConnectionHandlerResult>,
83 : }
84 :
85 0 : pub fn spawn(
86 0 : conf: &'static PageServerConf,
87 0 : tenant_manager: Arc<TenantManager>,
88 0 : pg_auth: Option<Arc<SwappableJwtAuth>>,
89 0 : tcp_listener: tokio::net::TcpListener,
90 0 : ) -> Listener {
91 0 : let cancel = CancellationToken::new();
92 0 : let libpq_ctx = RequestContext::todo_child(
93 0 : TaskKind::LibpqEndpointListener,
94 0 : // listener task shouldn't need to download anything. (We will
95 0 : // create a separate sub-context for each connection, with its
96 0 : // own download behavior. This context is used only to listen and
97 0 : // accept connections.)
98 0 : DownloadBehavior::Error,
99 0 : );
100 0 : let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
101 0 : "libpq listener",
102 0 : libpq_listener_main(
103 0 : tenant_manager,
104 0 : pg_auth,
105 0 : tcp_listener,
106 0 : conf.pg_auth_type,
107 0 : libpq_ctx,
108 0 : cancel.clone(),
109 0 : )
110 0 : .map(anyhow::Ok),
111 0 : ));
112 0 :
113 0 : Listener { cancel, task }
114 0 : }
115 :
116 : impl Listener {
117 0 : pub async fn stop_accepting(self) -> Connections {
118 0 : self.cancel.cancel();
119 0 : self.task
120 0 : .await
121 0 : .expect("unreachable: we wrap the listener task in task_mgr::exit_on_panic_or_error")
122 0 : }
123 : }
124 : impl Connections {
125 0 : pub(crate) async fn shutdown(self) {
126 0 : let Self { cancel, mut tasks } = self;
127 0 : cancel.cancel();
128 0 : while let Some(res) = tasks.join_next().await {
129 0 : Self::handle_connection_completion(res);
130 0 : }
131 0 : }
132 :
133 0 : fn handle_connection_completion(res: Result<anyhow::Result<()>, tokio::task::JoinError>) {
134 0 : match res {
135 0 : Ok(Ok(())) => {}
136 0 : Ok(Err(e)) => error!("error in page_service connection task: {:?}", e),
137 0 : Err(e) => error!("page_service connection task panicked: {:?}", e),
138 : }
139 0 : }
140 : }
141 :
142 : ///
143 : /// Main loop of the page service.
144 : ///
145 : /// Listens for connections, and launches a new handler task for each.
146 : ///
147 : /// Returns upon cancellation via `listener_cancel`, yielding the set of
148 : /// still-open connections.
149 : ///
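/// # Example (illustrative)
///
/// A sketch of the intended lifecycle, based on the types above; the
/// surrounding wiring (config, tenant manager, auth, TCP listener) is assumed
/// to exist already:
///
/// ```ignore
/// // Start accepting page_service connections.
/// let listener = page_service::spawn(conf, tenant_manager, pg_auth, tcp_listener);
/// // ... serve traffic ...
/// // Stop accepting new connections; existing ones keep running.
/// let connections = listener.stop_accepting().await;
/// // Cancel and drain the remaining connection handler tasks.
/// connections.shutdown().await;
/// ```
///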
150 0 : pub async fn libpq_listener_main(
151 0 : tenant_manager: Arc<TenantManager>,
152 0 : auth: Option<Arc<SwappableJwtAuth>>,
153 0 : listener: tokio::net::TcpListener,
154 0 : auth_type: AuthType,
155 0 : listener_ctx: RequestContext,
156 0 : listener_cancel: CancellationToken,
157 0 : ) -> Connections {
158 0 : let connections_cancel = CancellationToken::new();
159 0 : let mut connection_handler_tasks = tokio::task::JoinSet::default();
160 :
161 : loop {
162 0 : let accepted = tokio::select! {
163 : biased;
164 0 : _ = listener_cancel.cancelled() => break,
165 0 : next = connection_handler_tasks.join_next(), if !connection_handler_tasks.is_empty() => {
166 0 : let res = next.expect("we dont poll while empty");
167 0 : Connections::handle_connection_completion(res);
168 0 : continue;
169 : }
170 0 : accepted = listener.accept() => accepted,
171 0 : };
172 0 :
173 0 : match accepted {
174 0 : Ok((socket, peer_addr)) => {
175 0 : // Connection established. Spawn a new task to handle it.
176 0 : debug!("accepted connection from {}", peer_addr);
177 0 : let local_auth = auth.clone();
178 0 : let connection_ctx = listener_ctx
179 0 : .detached_child(TaskKind::PageRequestHandler, DownloadBehavior::Download);
180 0 : connection_handler_tasks.spawn(page_service_conn_main(
181 0 : tenant_manager.clone(),
182 0 : local_auth,
183 0 : socket,
184 0 : auth_type,
185 0 : connection_ctx,
186 0 : connections_cancel.child_token(),
187 0 : ));
188 : }
189 0 : Err(err) => {
190 0 : // accept() failed. Log the error, and loop back to retry on next connection.
191 0 : error!("accept() failed: {:?}", err);
192 : }
193 : }
194 : }
195 :
196 0 : debug!("page_service listener loop terminated");
197 :
198 0 : Connections {
199 0 : cancel: connections_cancel,
200 0 : tasks: connection_handler_tasks,
201 0 : }
202 0 : }
203 :
204 : type ConnectionHandlerResult = anyhow::Result<()>;
205 :
206 0 : #[instrument(skip_all, fields(peer_addr))]
207 : async fn page_service_conn_main(
208 : tenant_manager: Arc<TenantManager>,
209 : auth: Option<Arc<SwappableJwtAuth>>,
210 : socket: tokio::net::TcpStream,
211 : auth_type: AuthType,
212 : connection_ctx: RequestContext,
213 : cancel: CancellationToken,
214 : ) -> ConnectionHandlerResult {
215 : let _guard = LIVE_CONNECTIONS
216 : .with_label_values(&["page_service"])
217 : .guard();
218 :
219 : socket
220 : .set_nodelay(true)
221 : .context("could not set TCP_NODELAY")?;
222 :
223 : let peer_addr = socket.peer_addr().context("get peer address")?;
224 : tracing::Span::current().record("peer_addr", field::display(peer_addr));
225 :
226 : // Set up a read timeout of 10 minutes. The exact value is somewhat arbitrary; the requirements are:
227 : // - long enough for most valid compute connections
228 : // - less than infinite to stop us from "leaking" connections to long-gone computes
229 : //
230 : // no write timeout is used, because the kernel is assumed to error writes after some time.
231 : let mut socket = tokio_io_timeout::TimeoutReader::new(socket);
232 :
233 : let default_timeout_ms = 10 * 60 * 1000; // 10 minutes by default
234 0 : let socket_timeout_ms = (|| {
235 0 : fail::fail_point!("simulated-bad-compute-connection", |avg_timeout_ms| {
236 : // Sample the timeout from an exponential distribution to simulate
237 : // poor network conditions; avg_timeout_ms is expected to be around 15
238 : // in tests.
239 0 : if let Some(avg_timeout_ms) = avg_timeout_ms {
240 0 : let avg = avg_timeout_ms.parse::<i64>().unwrap() as f32;
241 0 : let u = rand::random::<f32>();
242 0 : ((1.0 - u).ln() / (-avg)) as u64
243 : } else {
244 0 : default_timeout_ms
245 : }
246 0 : });
247 0 : default_timeout_ms
248 : })();
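// For reference: inverse-transform sampling of an exponential distribution
// with mean `m` draws U uniform on [0, 1) and returns X = -m * ln(1 - U);
// the failpoint above applies a variant of that formula to produce the
// simulated read timeouts.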
249 :
250 : // A timeout here does not mean the client died, it can happen if it's just idle for
251 : // a while: we will tear down this PageServerHandler and instantiate a new one if/when
252 : // they reconnect.
253 : socket.set_timeout(Some(std::time::Duration::from_millis(socket_timeout_ms)));
254 : let socket = std::pin::pin!(socket);
255 :
256 : fail::fail_point!("ps::connection-start::pre-login");
257 :
258 : // XXX: pgbackend.run() should take the connection_ctx,
259 : // and create a child per-query context when it invokes process_query.
260 : // But it's in a shared crate, so we store connection_ctx inside PageServerHandler
261 : // and create the per-query context in process_query ourselves.
262 : let mut conn_handler =
263 : PageServerHandler::new(tenant_manager, auth, connection_ctx, cancel.clone());
264 : let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
265 :
266 : match pgbackend.run(&mut conn_handler, &cancel).await {
267 : Ok(()) => {
268 : // we've been requested to shut down
269 : Ok(())
270 : }
271 : Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
272 : if is_expected_io_error(&io_error) {
273 : info!("Postgres client disconnected ({io_error})");
274 : Ok(())
275 : } else {
276 : let tenant_id = conn_handler.timeline_handles.tenant_id();
277 : Err(io_error).context(format!(
278 : "Postgres connection error for tenant_id={:?} client at peer_addr={}",
279 : tenant_id, peer_addr
280 : ))
281 : }
282 : }
283 : other => {
284 : let tenant_id = conn_handler.timeline_handles.tenant_id();
285 : other.context(format!(
286 : "Postgres query error for tenant_id={:?} client peer_addr={}",
287 : tenant_id, peer_addr
288 : ))
289 : }
290 : }
291 : }
292 :
293 : struct PageServerHandler {
294 : auth: Option<Arc<SwappableJwtAuth>>,
295 : claims: Option<Claims>,
296 :
297 : /// The context created for the lifetime of the connection
298 : /// serviced by this PageServerHandler.
299 : /// For each query received over the connection,
300 : /// `process_query` creates a child context from this one.
301 : connection_ctx: RequestContext,
302 :
303 : cancel: CancellationToken,
304 :
305 : timeline_handles: TimelineHandles,
306 : }
307 :
308 : struct TimelineHandles {
309 : wrapper: TenantManagerWrapper,
310 : /// Note on size: the typical size of this map is 1. The largest size we expect
311 : /// to see is the number of shards divided by the number of pageservers (typically < 2),
312 : /// or the ratio used when splitting shards (i.e. how many children are created from one
313 : /// parent shard), where a "large" number might be ~8.
314 : handles: timeline::handle::Cache<TenantManagerTypes>,
315 : }
316 :
317 : impl TimelineHandles {
318 0 : fn new(tenant_manager: Arc<TenantManager>) -> Self {
319 0 : Self {
320 0 : wrapper: TenantManagerWrapper {
321 0 : tenant_manager,
322 0 : tenant_id: OnceCell::new(),
323 0 : },
324 0 : handles: Default::default(),
325 0 : }
326 0 : }
327 0 : async fn get(
328 0 : &mut self,
329 0 : tenant_id: TenantId,
330 0 : timeline_id: TimelineId,
331 0 : shard_selector: ShardSelector,
332 0 : ) -> Result<timeline::handle::Handle<TenantManagerTypes>, GetActiveTimelineError> {
333 0 : if *self.wrapper.tenant_id.get_or_init(|| tenant_id) != tenant_id {
334 0 : return Err(GetActiveTimelineError::Tenant(
335 0 : GetActiveTenantError::SwitchedTenant,
336 0 : ));
337 0 : }
338 0 : self.handles
339 0 : .get(timeline_id, shard_selector, &self.wrapper)
340 0 : .await
341 0 : .map_err(|e| match e {
342 0 : timeline::handle::GetError::TenantManager(e) => e,
343 : timeline::handle::GetError::TimelineGateClosed => {
344 0 : trace!("timeline gate closed");
345 0 : GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown)
346 : }
347 : timeline::handle::GetError::PerTimelineStateShutDown => {
348 0 : trace!("per-timeline state shut down");
349 0 : GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown)
350 : }
351 0 : })
352 0 : }
353 :
354 0 : fn tenant_id(&self) -> Option<TenantId> {
355 0 : self.wrapper.tenant_id.get().copied()
356 0 : }
357 : }
358 :
359 : pub(crate) struct TenantManagerWrapper {
360 : tenant_manager: Arc<TenantManager>,
361 : // We do not support switching tenant_id on a connection at this point.
362 : // We can add support for this later if needed without changing
363 : // the protocol.
364 : tenant_id: once_cell::sync::OnceCell<TenantId>,
365 : }
366 :
367 : #[derive(Debug)]
368 : pub(crate) struct TenantManagerTypes;
369 :
370 : impl timeline::handle::Types for TenantManagerTypes {
371 : type TenantManagerError = GetActiveTimelineError;
372 : type TenantManager = TenantManagerWrapper;
373 : type Timeline = Arc<Timeline>;
374 : }
375 :
376 : impl timeline::handle::ArcTimeline<TenantManagerTypes> for Arc<Timeline> {
377 0 : fn gate(&self) -> &utils::sync::gate::Gate {
378 0 : &self.gate
379 0 : }
380 :
381 0 : fn shard_timeline_id(&self) -> timeline::handle::ShardTimelineId {
382 0 : Timeline::shard_timeline_id(self)
383 0 : }
384 :
385 0 : fn per_timeline_state(&self) -> &timeline::handle::PerTimelineState<TenantManagerTypes> {
386 0 : &self.handles
387 0 : }
388 :
389 0 : fn get_shard_identity(&self) -> &pageserver_api::shard::ShardIdentity {
390 0 : Timeline::get_shard_identity(self)
391 0 : }
392 : }
393 :
394 : impl timeline::handle::TenantManager<TenantManagerTypes> for TenantManagerWrapper {
395 0 : async fn resolve(
396 0 : &self,
397 0 : timeline_id: TimelineId,
398 0 : shard_selector: ShardSelector,
399 0 : ) -> Result<Arc<Timeline>, GetActiveTimelineError> {
400 0 : let tenant_id = self.tenant_id.get().expect("we set this in get()");
401 0 : let timeout = ACTIVE_TENANT_TIMEOUT;
402 0 : let wait_start = Instant::now();
403 0 : let deadline = wait_start + timeout;
404 0 : let tenant_shard = loop {
405 0 : let resolved = self
406 0 : .tenant_manager
407 0 : .resolve_attached_shard(tenant_id, shard_selector);
408 0 : match resolved {
409 0 : ShardResolveResult::Found(tenant_shard) => break tenant_shard,
410 : ShardResolveResult::NotFound => {
411 0 : return Err(GetActiveTimelineError::Tenant(
412 0 : GetActiveTenantError::NotFound(GetTenantError::NotFound(*tenant_id)),
413 0 : ));
414 : }
415 0 : ShardResolveResult::InProgress(barrier) => {
416 0 : // We can't authoritatively answer right now: wait for InProgress state
417 0 : // to end, then try again
418 0 : tokio::select! {
419 0 : _ = barrier.wait() => {
420 0 : // The barrier completed: proceed around the loop to try looking up again
421 0 : },
422 0 : _ = tokio::time::sleep(deadline.duration_since(Instant::now())) => {
423 0 : return Err(GetActiveTimelineError::Tenant(GetActiveTenantError::WaitForActiveTimeout {
424 0 : latest_state: None,
425 0 : wait_time: timeout,
426 0 : }));
427 : }
428 : }
429 : }
430 : };
431 : };
432 :
433 0 : tracing::debug!("Waiting for tenant to enter active state...");
434 0 : tenant_shard
435 0 : .wait_to_become_active(deadline.duration_since(Instant::now()))
436 0 : .await
437 0 : .map_err(GetActiveTimelineError::Tenant)?;
438 :
439 0 : let timeline = tenant_shard
440 0 : .get_timeline(timeline_id, true)
441 0 : .map_err(GetActiveTimelineError::Timeline)?;
442 0 : set_tracing_field_shard_id(&timeline);
443 0 : Ok(timeline)
444 0 : }
445 : }
446 :
447 0 : #[derive(thiserror::Error, Debug)]
448 : enum PageStreamError {
449 : /// We encountered an error that should prompt the client to reconnect:
450 : /// in practice this means we drop the connection without sending a response.
451 : #[error("Reconnect required: {0}")]
452 : Reconnect(Cow<'static, str>),
453 :
454 : /// We were instructed to shutdown while processing the query
455 : #[error("Shutting down")]
456 : Shutdown,
457 :
458 : /// Something went wrong reading a page: this likely indicates a pageserver bug
459 : #[error("Read error")]
460 : Read(#[source] PageReconstructError),
461 :
462 : /// Ran out of time waiting for an LSN
463 : #[error("LSN timeout: {0}")]
464 : LsnTimeout(WaitLsnError),
465 :
466 : /// The entity required to serve the request (tenant or timeline) is not found,
467 : /// or is not found in a suitable state to serve a request.
468 : #[error("Not found: {0}")]
469 : NotFound(Cow<'static, str>),
470 :
471 : /// Request asked for something that doesn't make sense, like an invalid LSN
472 : #[error("Bad request: {0}")]
473 : BadRequest(Cow<'static, str>),
474 : }
475 :
476 : impl From<PageReconstructError> for PageStreamError {
477 0 : fn from(value: PageReconstructError) -> Self {
478 0 : match value {
479 0 : PageReconstructError::Cancelled => Self::Shutdown,
480 0 : e => Self::Read(e),
481 : }
482 0 : }
483 : }
484 :
485 : impl From<GetActiveTimelineError> for PageStreamError {
486 0 : fn from(value: GetActiveTimelineError) -> Self {
487 0 : match value {
488 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled)
489 : | GetActiveTimelineError::Tenant(GetActiveTenantError::WillNotBecomeActive(
490 : TenantState::Stopping { .. },
491 : ))
492 0 : | GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown) => Self::Shutdown,
493 0 : GetActiveTimelineError::Tenant(e) => Self::NotFound(format!("{e}").into()),
494 0 : GetActiveTimelineError::Timeline(e) => Self::NotFound(format!("{e}").into()),
495 : }
496 0 : }
497 : }
498 :
499 : impl From<WaitLsnError> for PageStreamError {
500 0 : fn from(value: WaitLsnError) -> Self {
501 0 : match value {
502 0 : e @ WaitLsnError::Timeout(_) => Self::LsnTimeout(e),
503 0 : WaitLsnError::Shutdown => Self::Shutdown,
504 0 : e @ WaitLsnError::BadState { .. } => Self::Reconnect(format!("{e}").into()),
505 : }
506 0 : }
507 : }
508 :
509 : impl From<WaitLsnError> for QueryError {
510 0 : fn from(value: WaitLsnError) -> Self {
511 0 : match value {
512 0 : e @ WaitLsnError::Timeout(_) => Self::Other(anyhow::Error::new(e)),
513 0 : WaitLsnError::Shutdown => Self::Shutdown,
514 0 : WaitLsnError::BadState { .. } => Self::Reconnect,
515 : }
516 0 : }
517 : }
518 :
519 : impl PageServerHandler {
520 0 : pub fn new(
521 0 : tenant_manager: Arc<TenantManager>,
522 0 : auth: Option<Arc<SwappableJwtAuth>>,
523 0 : connection_ctx: RequestContext,
524 0 : cancel: CancellationToken,
525 0 : ) -> Self {
526 0 : PageServerHandler {
527 0 : auth,
528 0 : claims: None,
529 0 : connection_ctx,
530 0 : timeline_handles: TimelineHandles::new(tenant_manager),
531 0 : cancel,
532 0 : }
533 0 : }
534 :
535 : /// This function always respects cancellation of any timeline in [`Self::shard_timelines`]. Pass in
536 : /// a cancellation token at the next scope up (such as a tenant cancellation token) to ensure we respect
537 : /// cancellation if there aren't any timelines in the cache.
538 : ///
539 : /// If calling from a function that doesn't use the [`Self::shard_timelines`] cache, then pass in the
540 : /// timeline cancellation token.
541 0 : async fn flush_cancellable<IO>(
542 0 : &self,
543 0 : pgb: &mut PostgresBackend<IO>,
544 0 : cancel: &CancellationToken,
545 0 : ) -> Result<(), QueryError>
546 0 : where
547 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
548 0 : {
549 0 : tokio::select!(
550 0 : flush_r = pgb.flush() => {
551 0 : Ok(flush_r?)
552 : },
553 0 : _ = cancel.cancelled() => {
554 0 : Err(QueryError::Shutdown)
555 : }
556 : )
557 0 : }
558 :
559 : /// Pagestream sub-protocol handler.
560 : ///
561 : /// It is a simple request-response protocol inside a COPYBOTH session.
562 : ///
563 : /// # Coding Discipline
564 : ///
565 : /// Coding discipline within this function: all interaction with the `pgb` connection
566 : /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
567 : /// This is so that we can shutdown page_service quickly.
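/// # Example (illustrative)
///
/// A sketch of one request/response exchange as seen from a client, assuming
/// hypothetical client-side helpers (`send_copy_data`, `recv_copy_data`) and
/// serialization method names; the message types are the `pageserver_api::models`
/// types handled below, and only the fields this handler reads are shown:
///
/// ```ignore
/// // The connection is already in COPYBOTH mode after the
/// // "pagestream_v2 <tenant_id> <timeline_id>" query.
/// let req = PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
///     request_lsn,
///     not_modified_since,
///     rel,
///     blkno,
/// });
/// send_copy_data(req.serialize())?;   // exactly one request per CopyData frame
/// let resp = recv_copy_data()?;       // one PagestreamBeMessage (GetPage or Error) per frame
/// ```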
568 0 : #[instrument(skip_all)]
569 : async fn handle_pagerequests<IO>(
570 : &mut self,
571 : pgb: &mut PostgresBackend<IO>,
572 : tenant_id: TenantId,
573 : timeline_id: TimelineId,
574 : _protocol_version: PagestreamProtocolVersion,
575 : ctx: RequestContext,
576 : ) -> Result<(), QueryError>
577 : where
578 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
579 : {
580 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
581 :
582 : // switch client to COPYBOTH
583 : pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
584 : tokio::select! {
585 : biased;
586 : _ = self.cancel.cancelled() => {
587 : return Err(QueryError::Shutdown)
588 : }
589 : res = pgb.flush() => {
590 : res?;
591 : }
592 : }
593 :
594 : loop {
595 : // read request bytes (it's exactly 1 PagestreamFeMessage per CopyData)
596 : let msg = tokio::select! {
597 : biased;
598 : _ = self.cancel.cancelled() => {
599 : return Err(QueryError::Shutdown)
600 : }
601 : msg = pgb.read_message() => { msg }
602 : };
603 : let copy_data_bytes = match msg? {
604 : Some(FeMessage::CopyData(bytes)) => bytes,
605 : Some(FeMessage::Terminate) => break,
606 : Some(m) => {
607 : return Err(QueryError::Other(anyhow::anyhow!(
608 : "unexpected message: {m:?} during COPY"
609 : )));
610 : }
611 : None => break, // client disconnected
612 : };
613 :
614 : trace!("query: {copy_data_bytes:?}");
615 : fail::fail_point!("ps::handle-pagerequest-message");
616 :
617 : // parse request
618 : let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
619 :
620 : // invoke handler function
621 : let (handler_result, span) = match neon_fe_msg {
622 : PagestreamFeMessage::Exists(req) => {
623 : fail::fail_point!("ps::handle-pagerequest-message::exists");
624 : let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn);
625 : (
626 : self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx)
627 : .instrument(span.clone())
628 : .await,
629 : span,
630 : )
631 : }
632 : PagestreamFeMessage::Nblocks(req) => {
633 : fail::fail_point!("ps::handle-pagerequest-message::nblocks");
634 : let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn);
635 : (
636 : self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx)
637 : .instrument(span.clone())
638 : .await,
639 : span,
640 : )
641 : }
642 : PagestreamFeMessage::GetPage(req) => {
643 : fail::fail_point!("ps::handle-pagerequest-message::getpage");
644 : // shard_id is filled in by the handler
645 : let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.request_lsn);
646 : (
647 : self.handle_get_page_at_lsn_request(tenant_id, timeline_id, &req, &ctx)
648 : .instrument(span.clone())
649 : .await,
650 : span,
651 : )
652 : }
653 : PagestreamFeMessage::DbSize(req) => {
654 : fail::fail_point!("ps::handle-pagerequest-message::dbsize");
655 : let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn);
656 : (
657 : self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx)
658 : .instrument(span.clone())
659 : .await,
660 : span,
661 : )
662 : }
663 : PagestreamFeMessage::GetSlruSegment(req) => {
664 : fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
665 : let span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn);
666 : (
667 : self.handle_get_slru_segment_request(tenant_id, timeline_id, &req, &ctx)
668 : .instrument(span.clone())
669 : .await,
670 : span,
671 : )
672 : }
673 : };
674 :
675 : // Map handler result to protocol behavior.
676 : // Some handler errors cause exit from pagestream protocol.
677 : // Other handler errors are sent back as an error message and we stay in pagestream protocol.
678 : let response_msg = match handler_result {
679 : Err(e) => match &e {
680 : PageStreamError::Shutdown => {
681 : // If we fail to fulfil a request during shutdown, which may be _because_ of
682 : // shutdown, then do not send the error to the client. Instead just drop the
683 : // connection.
684 0 : span.in_scope(|| info!("dropping connection due to shutdown"));
685 : return Err(QueryError::Shutdown);
686 : }
687 : PageStreamError::Reconnect(reason) => {
688 0 : span.in_scope(|| info!("handler requested reconnect: {reason}"));
689 : return Err(QueryError::Reconnect);
690 : }
691 : PageStreamError::Read(_)
692 : | PageStreamError::LsnTimeout(_)
693 : | PageStreamError::NotFound(_)
694 : | PageStreamError::BadRequest(_) => {
695 : // Print all the details to the log with {:#}, but for the client the
696 : // error message is enough. Do not log if shutting down, as the anyhow::Error
697 : // here includes cancellation which is not an error.
698 : let full = utils::error::report_compact_sources(&e);
699 0 : span.in_scope(|| {
700 0 : error!("error reading relation or page version: {full:#}")
701 0 : });
702 : PagestreamBeMessage::Error(PagestreamErrorResponse {
703 : message: e.to_string(),
704 : })
705 : }
706 : },
707 : Ok(response_msg) => response_msg,
708 : };
709 :
710 : // marshal & transmit response message
711 : pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
712 : tokio::select! {
713 : biased;
714 : _ = self.cancel.cancelled() => {
715 : // We were requested to shut down.
716 : info!("shutdown request received in page handler");
717 : return Err(QueryError::Shutdown)
718 : }
719 : res = pgb.flush() => {
720 : res?;
721 : }
722 : }
723 : }
724 : Ok(())
725 : }
726 :
727 : /// Helper function to handle the LSN from client request.
728 : ///
729 : /// Each GetPage (and Exists and Nblocks) request includes information about
730 : /// which version of the page is being requested. The primary compute node
731 : /// will always request the latest page version, by setting 'request_lsn' to
732 : /// the last inserted or flushed WAL position, while a standby will request
733 : /// a version at the LSN that it's currently caught up to.
734 : ///
735 : /// In either case, if the page server hasn't received the WAL up to the
736 : /// requested LSN yet, we will wait for it to arrive. The return value is
737 : /// the LSN that should be used to look up the page versions.
738 : ///
739 : /// In addition to the request LSN, each request carries another LSN,
740 : /// 'not_modified_since', which is a hint to the pageserver that the client
741 : /// knows that the page has not been modified between 'not_modified_since'
742 : /// and the request LSN. This allows skipping the wait, as long as the WAL
743 : /// up to 'not_modified_since' has arrived. If the client doesn't have any
744 : /// information about when the page was modified, it will use
745 : /// not_modified_since == lsn. If the client lies and sends a too low
746 : /// not_modified_hint such that there are in fact later page versions, the
747 : /// behavior is undefined: the pageserver may return any of the page versions
748 : /// or an error.
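/// # Example (illustrative)
///
/// With hypothetical values: if the last record LSN on this pageserver is
/// 0/5000, a request with request_lsn = 0/7000 and not_modified_since = 0/4000
/// is served immediately at LSN 0/5000, whereas not_modified_since = 0/6000
/// first waits for WAL up to 0/6000 and is then served at 0/6000. A condensed
/// sketch of the decision implemented below:
///
/// ```ignore
/// if not_modified_since > last_record_lsn {
///     timeline.wait_lsn(not_modified_since, /* waiter, ctx */ ...).await?;
///     Ok(not_modified_since)
/// } else {
///     Ok(std::cmp::min(last_record_lsn, request_lsn))
/// }
/// ```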
749 0 : async fn wait_or_get_last_lsn(
750 0 : timeline: &Timeline,
751 0 : request_lsn: Lsn,
752 0 : not_modified_since: Lsn,
753 0 : latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
754 0 : ctx: &RequestContext,
755 0 : ) -> Result<Lsn, PageStreamError> {
756 0 : let last_record_lsn = timeline.get_last_record_lsn();
757 0 :
758 0 : // Sanity check the request
759 0 : if request_lsn < not_modified_since {
760 0 : return Err(PageStreamError::BadRequest(
761 0 : format!(
762 0 : "invalid request with request LSN {} and not_modified_since {}",
763 0 : request_lsn, not_modified_since,
764 0 : )
765 0 : .into(),
766 0 : ));
767 0 : }
768 0 :
769 0 : if request_lsn < **latest_gc_cutoff_lsn {
770 0 : let gc_info = &timeline.gc_info.read().unwrap();
771 0 : if !gc_info.leases.contains_key(&request_lsn) {
772 : // The requested LSN is below gc cutoff and is not guarded by a lease.
773 :
774 : // Check explicitly for INVALID just to get a less scary error message if the
775 : // request is obviously bogus
776 0 : return Err(if request_lsn == Lsn::INVALID {
777 0 : PageStreamError::BadRequest("invalid LSN(0) in request".into())
778 : } else {
779 0 : PageStreamError::BadRequest(format!(
780 0 : "tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
781 0 : request_lsn, **latest_gc_cutoff_lsn
782 0 : ).into())
783 : });
784 0 : }
785 0 : }
786 :
787 : // Wait for WAL up to 'not_modified_since' to arrive, if necessary
788 0 : if not_modified_since > last_record_lsn {
789 0 : timeline
790 0 : .wait_lsn(
791 0 : not_modified_since,
792 0 : crate::tenant::timeline::WaitLsnWaiter::PageService,
793 0 : ctx,
794 0 : )
795 0 : .await?;
796 : // Since we waited for 'not_modified_since' to arrive, that is now the last
797 : // record LSN. (Or close enough for our purposes; the last-record LSN can
798 : // advance immediately after we return anyway)
799 0 : Ok(not_modified_since)
800 : } else {
801 : // It might be better to use max(not_modified_since, latest_gc_cutoff_lsn)
802 : // here instead. That would give the same result, since we know that there
803 : // haven't been any modifications since 'not_modified_since'. Using an older
804 : // LSN might be faster, because that could allow skipping recent layers when
805 : // finding the page. However, we have historically used 'last_record_lsn', so
806 : // stick to that for now.
807 0 : Ok(std::cmp::min(last_record_lsn, request_lsn))
808 : }
809 0 : }
810 :
811 : /// Handles the lsn lease request.
812 : /// If a lease cannot be obtained, the client will receive NULL.
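/// # Example (illustrative)
///
/// A sketch of the command and its reply, with made-up values; the command
/// grammar matches the `lease lsn` branch of `process_query` below:
///
/// ```text
/// => lease lsn <tenant_shard_id> <timeline_id> 0/1A2B3C4D
/// <= valid_until
///    "1719500000000"   -- lease expiry in milliseconds since the UNIX epoch, or NULL if no lease was granted
/// ```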
813 0 : #[instrument(skip_all, fields(shard_id, %lsn))]
814 : async fn handle_make_lsn_lease<IO>(
815 : &mut self,
816 : pgb: &mut PostgresBackend<IO>,
817 : tenant_shard_id: TenantShardId,
818 : timeline_id: TimelineId,
819 : lsn: Lsn,
820 : ctx: &RequestContext,
821 : ) -> Result<(), QueryError>
822 : where
823 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
824 : {
825 : let timeline = self
826 : .timeline_handles
827 : .get(
828 : tenant_shard_id.tenant_id,
829 : timeline_id,
830 : ShardSelector::Known(tenant_shard_id.to_index()),
831 : )
832 : .await?;
833 : set_tracing_field_shard_id(&timeline);
834 :
835 : let lease = timeline
836 : .renew_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx)
837 0 : .inspect_err(|e| {
838 0 : warn!("{e}");
839 0 : })
840 : .ok();
841 0 : let valid_until_str = lease.map(|l| {
842 0 : l.valid_until
843 0 : .duration_since(SystemTime::UNIX_EPOCH)
844 0 : .expect("valid_until is earlier than UNIX_EPOCH")
845 0 : .as_millis()
846 0 : .to_string()
847 0 : });
848 0 : let bytes = valid_until_str.as_ref().map(|x| x.as_bytes());
849 :
850 : pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col(
851 : b"valid_until",
852 : )]))?
853 : .write_message_noflush(&BeMessage::DataRow(&[bytes]))?;
854 :
855 : Ok(())
856 : }
857 :
858 0 : #[instrument(skip_all, fields(shard_id))]
859 : async fn handle_get_rel_exists_request(
860 : &mut self,
861 : tenant_id: TenantId,
862 : timeline_id: TimelineId,
863 : req: &PagestreamExistsRequest,
864 : ctx: &RequestContext,
865 : ) -> Result<PagestreamBeMessage, PageStreamError> {
866 : let timeline = self
867 : .timeline_handles
868 : .get(tenant_id, timeline_id, ShardSelector::Zero)
869 : .await?;
870 : let _timer = timeline
871 : .query_metrics
872 : .start_timer(metrics::SmgrQueryType::GetRelExists, ctx);
873 :
874 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
875 : let lsn = Self::wait_or_get_last_lsn(
876 : &timeline,
877 : req.request_lsn,
878 : req.not_modified_since,
879 : &latest_gc_cutoff_lsn,
880 : ctx,
881 : )
882 : .await?;
883 :
884 : let exists = timeline
885 : .get_rel_exists(req.rel, Version::Lsn(lsn), ctx)
886 : .await?;
887 :
888 : Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
889 : exists,
890 : }))
891 : }
892 :
893 0 : #[instrument(skip_all, fields(shard_id))]
894 : async fn handle_get_nblocks_request(
895 : &mut self,
896 : tenant_id: TenantId,
897 : timeline_id: TimelineId,
898 : req: &PagestreamNblocksRequest,
899 : ctx: &RequestContext,
900 : ) -> Result<PagestreamBeMessage, PageStreamError> {
901 : let timeline = self
902 : .timeline_handles
903 : .get(tenant_id, timeline_id, ShardSelector::Zero)
904 : .await?;
905 :
906 : let _timer = timeline
907 : .query_metrics
908 : .start_timer(metrics::SmgrQueryType::GetRelSize, ctx);
909 :
910 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
911 : let lsn = Self::wait_or_get_last_lsn(
912 : &timeline,
913 : req.request_lsn,
914 : req.not_modified_since,
915 : &latest_gc_cutoff_lsn,
916 : ctx,
917 : )
918 : .await?;
919 :
920 : let n_blocks = timeline
921 : .get_rel_size(req.rel, Version::Lsn(lsn), ctx)
922 : .await?;
923 :
924 : Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
925 : n_blocks,
926 : }))
927 : }
928 :
929 0 : #[instrument(skip_all, fields(shard_id))]
930 : async fn handle_db_size_request(
931 : &mut self,
932 : tenant_id: TenantId,
933 : timeline_id: TimelineId,
934 : req: &PagestreamDbSizeRequest,
935 : ctx: &RequestContext,
936 : ) -> Result<PagestreamBeMessage, PageStreamError> {
937 : let timeline = self
938 : .timeline_handles
939 : .get(tenant_id, timeline_id, ShardSelector::Zero)
940 : .await?;
941 :
942 : let _timer = timeline
943 : .query_metrics
944 : .start_timer(metrics::SmgrQueryType::GetDbSize, ctx);
945 :
946 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
947 : let lsn = Self::wait_or_get_last_lsn(
948 : &timeline,
949 : req.request_lsn,
950 : req.not_modified_since,
951 : &latest_gc_cutoff_lsn,
952 : ctx,
953 : )
954 : .await?;
955 :
956 : let total_blocks = timeline
957 : .get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, Version::Lsn(lsn), ctx)
958 : .await?;
959 : let db_size = total_blocks as i64 * BLCKSZ as i64;
960 :
961 : Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse {
962 : db_size,
963 : }))
964 : }
965 :
966 0 : #[instrument(skip_all, fields(shard_id))]
967 : async fn handle_get_page_at_lsn_request(
968 : &mut self,
969 : tenant_id: TenantId,
970 : timeline_id: TimelineId,
971 : req: &PagestreamGetPageRequest,
972 : ctx: &RequestContext,
973 : ) -> Result<PagestreamBeMessage, PageStreamError> {
974 : let timeline = match self
975 : .timeline_handles
976 : .get(
977 : tenant_id,
978 : timeline_id,
979 : ShardSelector::Page(rel_block_to_key(req.rel, req.blkno)),
980 : )
981 : .await
982 : {
983 : Ok(tl) => tl,
984 : Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
985 : // We already know this tenant exists in general, because we resolved it at
986 : // start of connection. Getting a NotFound here indicates that the shard containing
987 : // the requested page is not present on this node: the client's knowledge of shard->pageserver
988 : // mapping is out of date.
989 : //
990 : // Closing the connection by returning `PageStreamError::Reconnect` has the side effect of rate-limiting the above message, via
991 : // client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
992 : // and talk to a different pageserver.
993 : return Err(PageStreamError::Reconnect(
994 : "getpage@lsn request routed to wrong shard".into(),
995 : ));
996 : }
997 : Err(e) => return Err(e.into()),
998 : };
999 :
1000 : let _timer = timeline
1001 : .query_metrics
1002 : .start_timer(metrics::SmgrQueryType::GetPageAtLsn, ctx);
1003 :
1004 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1005 : let lsn = Self::wait_or_get_last_lsn(
1006 : &timeline,
1007 : req.request_lsn,
1008 : req.not_modified_since,
1009 : &latest_gc_cutoff_lsn,
1010 : ctx,
1011 : )
1012 : .await?;
1013 :
1014 : let page = timeline
1015 : .get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), ctx)
1016 : .await?;
1017 :
1018 : Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
1019 : page,
1020 : }))
1021 : }
1022 :
1023 0 : #[instrument(skip_all, fields(shard_id))]
1024 : async fn handle_get_slru_segment_request(
1025 : &mut self,
1026 : tenant_id: TenantId,
1027 : timeline_id: TimelineId,
1028 : req: &PagestreamGetSlruSegmentRequest,
1029 : ctx: &RequestContext,
1030 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1031 : let timeline = self
1032 : .timeline_handles
1033 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1034 : .await?;
1035 :
1036 : let _timer = timeline
1037 : .query_metrics
1038 : .start_timer(metrics::SmgrQueryType::GetSlruSegment, ctx);
1039 :
1040 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1041 : let lsn = Self::wait_or_get_last_lsn(
1042 : &timeline,
1043 : req.request_lsn,
1044 : req.not_modified_since,
1045 : &latest_gc_cutoff_lsn,
1046 : ctx,
1047 : )
1048 : .await?;
1049 :
1050 : let kind = SlruKind::from_repr(req.kind)
1051 : .ok_or(PageStreamError::BadRequest("invalid SLRU kind".into()))?;
1052 : let segment = timeline.get_slru_segment(kind, req.segno, lsn, ctx).await?;
1053 :
1054 : Ok(PagestreamBeMessage::GetSlruSegment(
1055 : PagestreamGetSlruSegmentResponse { segment },
1056 : ))
1057 : }
1058 :
1059 : /// Note on "fullbackup":
1060 : /// Full basebackups should only be used for debugging purposes.
1061 : /// Originally, it was introduced to enable breaking storage format changes,
1062 : /// but that is not applicable anymore.
1063 : ///
1064 : /// # Coding Discipline
1065 : ///
1066 : /// Coding discipline within this function: all interaction with the `pgb` connection
1067 : /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
1068 : /// This is so that we can shutdown page_service quickly.
1069 : ///
1070 : /// TODO: wrap the pgb that we pass to the basebackup handler so that it's sensitive
1071 : /// to connection cancellation.
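/// # Command forms (for reference)
///
/// The query strings that reach this handler are parsed in `process_query`
/// below; the accepted shapes are:
///
/// ```text
/// basebackup <tenant_id> <timeline_id> [<lsn>] [--gzip]
/// fullbackup <tenant_id> <timeline_id> [<lsn>] [<prev_lsn>]
/// ```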
1072 : #[allow(clippy::too_many_arguments)]
1073 0 : #[instrument(skip_all, fields(shard_id, ?lsn, ?prev_lsn, %full_backup))]
1074 : async fn handle_basebackup_request<IO>(
1075 : &mut self,
1076 : pgb: &mut PostgresBackend<IO>,
1077 : tenant_id: TenantId,
1078 : timeline_id: TimelineId,
1079 : lsn: Option<Lsn>,
1080 : prev_lsn: Option<Lsn>,
1081 : full_backup: bool,
1082 : gzip: bool,
1083 : ctx: &RequestContext,
1084 : ) -> Result<(), QueryError>
1085 : where
1086 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1087 : {
1088 0 : fn map_basebackup_error(err: BasebackupError) -> QueryError {
1089 0 : match err {
1090 0 : BasebackupError::Client(e) => QueryError::Disconnected(ConnectionError::Io(e)),
1091 0 : BasebackupError::Server(e) => QueryError::Other(e),
1092 : }
1093 0 : }
1094 :
1095 : let started = std::time::Instant::now();
1096 :
1097 : let timeline = self
1098 : .timeline_handles
1099 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1100 : .await?;
1101 :
1102 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1103 : if let Some(lsn) = lsn {
1104 : // Backup was requested at a particular LSN. Wait for it to arrive.
1105 : info!("waiting for {}", lsn);
1106 : timeline
1107 : .wait_lsn(
1108 : lsn,
1109 : crate::tenant::timeline::WaitLsnWaiter::PageService,
1110 : ctx,
1111 : )
1112 : .await?;
1113 : timeline
1114 : .check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
1115 : .context("invalid basebackup lsn")?;
1116 : }
1117 :
1118 : let lsn_awaited_after = started.elapsed();
1119 :
1120 : // switch client to COPYOUT
1121 : pgb.write_message_noflush(&BeMessage::CopyOutResponse)
1122 : .map_err(QueryError::Disconnected)?;
1123 : self.flush_cancellable(pgb, &self.cancel).await?;
1124 :
1125 : // Send a basebackup tarball of the timeline. Compress unless this is a
1126 : // fullbackup. TODO: compress in that case too (tests need to be updated).
1127 : if full_backup {
1128 : let mut writer = pgb.copyout_writer();
1129 : basebackup::send_basebackup_tarball(
1130 : &mut writer,
1131 : &timeline,
1132 : lsn,
1133 : prev_lsn,
1134 : full_backup,
1135 : ctx,
1136 : )
1137 : .await
1138 : .map_err(map_basebackup_error)?;
1139 : } else {
1140 : let mut writer = BufWriter::new(pgb.copyout_writer());
1141 : if gzip {
1142 : let mut encoder = GzipEncoder::with_quality(
1143 : &mut writer,
1144 : // NOTE using fast compression because it's on the critical path
1145 : // for compute startup. For an empty database, we get
1146 : // <100KB with this method. The Level::Best compression method
1147 : // gives us <20KB, but maybe we should add basebackup caching
1148 : // on compute shutdown first.
1149 : async_compression::Level::Fastest,
1150 : );
1151 : basebackup::send_basebackup_tarball(
1152 : &mut encoder,
1153 : &timeline,
1154 : lsn,
1155 : prev_lsn,
1156 : full_backup,
1157 : ctx,
1158 : )
1159 : .await
1160 : .map_err(map_basebackup_error)?;
1161 : // shutdown the encoder to ensure the gzip footer is written
1162 : encoder
1163 : .shutdown()
1164 : .await
1165 0 : .map_err(|e| QueryError::Disconnected(ConnectionError::Io(e)))?;
1166 : } else {
1167 : basebackup::send_basebackup_tarball(
1168 : &mut writer,
1169 : &timeline,
1170 : lsn,
1171 : prev_lsn,
1172 : full_backup,
1173 : ctx,
1174 : )
1175 : .await
1176 : .map_err(map_basebackup_error)?;
1177 : }
1178 : writer
1179 : .flush()
1180 : .await
1181 0 : .map_err(|e| map_basebackup_error(BasebackupError::Client(e)))?;
1182 : }
1183 :
1184 : pgb.write_message_noflush(&BeMessage::CopyDone)
1185 : .map_err(QueryError::Disconnected)?;
1186 : self.flush_cancellable(pgb, &timeline.cancel).await?;
1187 :
1188 : let basebackup_after = started
1189 : .elapsed()
1190 : .checked_sub(lsn_awaited_after)
1191 : .unwrap_or(Duration::ZERO);
1192 :
1193 : info!(
1194 : lsn_await_millis = lsn_awaited_after.as_millis(),
1195 : basebackup_millis = basebackup_after.as_millis(),
1196 : "basebackup complete"
1197 : );
1198 :
1199 : Ok(())
1200 : }
1201 :
1202 : // When accessing the management API, supply None as the argument.
1203 : // When authorizing a tenant request, pass the corresponding tenant id.
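// For illustration (mirroring the calls in `process_query` below):
//   self.check_permission(Some(tenant_id))?;  // tenant-scoped command
//   self.check_permission(None)?;             // management-level access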
1204 0 : fn check_permission(&self, tenant_id: Option<TenantId>) -> Result<(), QueryError> {
1205 0 : if self.auth.is_none() {
1206 : // auth is set to Trust, nothing to check so just return ok
1207 0 : return Ok(());
1208 0 : }
1209 0 : // Auth is Some; we just checked that above. When auth is enabled,
1210 0 : // claims are always present because of checks during connection init,
1211 0 : // so this expect won't trigger.
1212 0 : let claims = self
1213 0 : .claims
1214 0 : .as_ref()
1215 0 : .expect("claims presence already checked");
1216 0 : check_permission(claims, tenant_id).map_err(|e| QueryError::Unauthorized(e.0))
1217 0 : }
1218 : }
1219 :
1220 : impl<IO> postgres_backend::Handler<IO> for PageServerHandler
1221 : where
1222 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1223 : {
1224 0 : fn check_auth_jwt(
1225 0 : &mut self,
1226 0 : _pgb: &mut PostgresBackend<IO>,
1227 0 : jwt_response: &[u8],
1228 0 : ) -> Result<(), QueryError> {
1229 : // This unwrap is never triggered, because check_auth_jwt is only called when auth_type is NeonJWT,
1230 : // which requires auth to be present.
1231 0 : let data = self
1232 0 : .auth
1233 0 : .as_ref()
1234 0 : .unwrap()
1235 0 : .decode(str::from_utf8(jwt_response).context("jwt response is not UTF-8")?)
1236 0 : .map_err(|e| QueryError::Unauthorized(e.0))?;
1237 :
1238 0 : if matches!(data.claims.scope, Scope::Tenant) && data.claims.tenant_id.is_none() {
1239 0 : return Err(QueryError::Unauthorized(
1240 0 : "jwt token scope is Tenant, but tenant id is missing".into(),
1241 0 : ));
1242 0 : }
1243 0 :
1244 0 : debug!(
1245 0 : "jwt scope check succeeded for scope: {:#?} by tenant id: {:?}",
1246 : data.claims.scope, data.claims.tenant_id,
1247 : );
1248 :
1249 0 : self.claims = Some(data.claims);
1250 0 : Ok(())
1251 0 : }
1252 :
1253 0 : fn startup(
1254 0 : &mut self,
1255 0 : _pgb: &mut PostgresBackend<IO>,
1256 0 : _sm: &FeStartupPacket,
1257 0 : ) -> Result<(), QueryError> {
1258 0 : fail::fail_point!("ps::connection-start::startup-packet");
1259 0 : Ok(())
1260 0 : }
1261 :
1262 0 : #[instrument(skip_all, fields(tenant_id, timeline_id))]
1263 : async fn process_query(
1264 : &mut self,
1265 : pgb: &mut PostgresBackend<IO>,
1266 : query_string: &str,
1267 : ) -> Result<(), QueryError> {
1268 0 : fail::fail_point!("simulated-bad-compute-connection", |_| {
1269 0 : info!("Hit failpoint for bad connection");
1270 0 : Err(QueryError::SimulatedConnectionError)
1271 0 : });
1272 :
1273 : fail::fail_point!("ps::connection-start::process-query");
1274 :
1275 : let ctx = self.connection_ctx.attached_child();
1276 : debug!("process query {query_string:?}");
1277 : let parts = query_string.split_whitespace().collect::<Vec<_>>();
1278 : if let Some(params) = parts.strip_prefix(&["pagestream_v2"]) {
1279 : if params.len() != 2 {
1280 : return Err(QueryError::Other(anyhow::anyhow!(
1281 : "invalid param number for pagestream command"
1282 : )));
1283 : }
1284 : let tenant_id = TenantId::from_str(params[0])
1285 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1286 : let timeline_id = TimelineId::from_str(params[1])
1287 0 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1288 :
1289 : tracing::Span::current()
1290 : .record("tenant_id", field::display(tenant_id))
1291 : .record("timeline_id", field::display(timeline_id));
1292 :
1293 : self.check_permission(Some(tenant_id))?;
1294 :
1295 : COMPUTE_COMMANDS_COUNTERS
1296 : .for_command(ComputeCommandKind::PageStreamV2)
1297 : .inc();
1298 :
1299 : self.handle_pagerequests(
1300 : pgb,
1301 : tenant_id,
1302 : timeline_id,
1303 : PagestreamProtocolVersion::V2,
1304 : ctx,
1305 : )
1306 : .await?;
1307 : } else if let Some(params) = parts.strip_prefix(&["basebackup"]) {
1308 : if params.len() < 2 {
1309 : return Err(QueryError::Other(anyhow::anyhow!(
1310 : "invalid param number for basebackup command"
1311 : )));
1312 : }
1313 :
1314 : let tenant_id = TenantId::from_str(params[0])
1315 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1316 : let timeline_id = TimelineId::from_str(params[1])
1317 0 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1318 :
1319 : tracing::Span::current()
1320 : .record("tenant_id", field::display(tenant_id))
1321 : .record("timeline_id", field::display(timeline_id));
1322 :
1323 : self.check_permission(Some(tenant_id))?;
1324 :
1325 : COMPUTE_COMMANDS_COUNTERS
1326 : .for_command(ComputeCommandKind::Basebackup)
1327 : .inc();
1328 :
1329 : let (lsn, gzip) = match (params.get(2), params.get(3)) {
1330 : (None, _) => (None, false),
1331 : (Some(&"--gzip"), _) => (None, true),
1332 : (Some(lsn_str), gzip_str_opt) => {
1333 : let lsn = Lsn::from_str(lsn_str)
1334 0 : .with_context(|| format!("Failed to parse Lsn from {lsn_str}"))?;
1335 : let gzip = match gzip_str_opt {
1336 : Some(&"--gzip") => true,
1337 : None => false,
1338 : Some(third_param) => {
1339 : return Err(QueryError::Other(anyhow::anyhow!(
1340 : "Parameter in position 3 unknown {third_param}",
1341 : )))
1342 : }
1343 : };
1344 : (Some(lsn), gzip)
1345 : }
1346 : };
1347 :
1348 : let metric_recording = metrics::BASEBACKUP_QUERY_TIME.start_recording(&ctx);
1349 0 : let res = async {
1350 0 : self.handle_basebackup_request(
1351 0 : pgb,
1352 0 : tenant_id,
1353 0 : timeline_id,
1354 0 : lsn,
1355 0 : None,
1356 0 : false,
1357 0 : gzip,
1358 0 : &ctx,
1359 0 : )
1360 0 : .await?;
1361 0 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1362 0 : Result::<(), QueryError>::Ok(())
1363 0 : }
1364 : .await;
1365 : metric_recording.observe(&res);
1366 : res?;
1367 : }
1368 : // same as basebackup, but result includes relational data as well
1369 : else if let Some(params) = parts.strip_prefix(&["fullbackup"]) {
1370 : if params.len() < 2 {
1371 : return Err(QueryError::Other(anyhow::anyhow!(
1372 : "invalid param number for fullbackup command"
1373 : )));
1374 : }
1375 :
1376 : let tenant_id = TenantId::from_str(params[0])
1377 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1378 : let timeline_id = TimelineId::from_str(params[1])
1379 0 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1380 :
1381 : tracing::Span::current()
1382 : .record("tenant_id", field::display(tenant_id))
1383 : .record("timeline_id", field::display(timeline_id));
1384 :
1385 : // The caller is responsible for providing correct lsn and prev_lsn.
1386 : let lsn = if let Some(lsn_str) = params.get(2) {
1387 : Some(
1388 : Lsn::from_str(lsn_str)
1389 0 : .with_context(|| format!("Failed to parse Lsn from {lsn_str}"))?,
1390 : )
1391 : } else {
1392 : None
1393 : };
1394 : let prev_lsn = if let Some(prev_lsn_str) = params.get(3) {
1395 : Some(
1396 : Lsn::from_str(prev_lsn_str)
1397 0 : .with_context(|| format!("Failed to parse Lsn from {prev_lsn_str}"))?,
1398 : )
1399 : } else {
1400 : None
1401 : };
1402 :
1403 : self.check_permission(Some(tenant_id))?;
1404 :
1405 : COMPUTE_COMMANDS_COUNTERS
1406 : .for_command(ComputeCommandKind::Fullbackup)
1407 : .inc();
1408 :
1409 : // Check that the timeline exists
1410 : self.handle_basebackup_request(
1411 : pgb,
1412 : tenant_id,
1413 : timeline_id,
1414 : lsn,
1415 : prev_lsn,
1416 : true,
1417 : false,
1418 : &ctx,
1419 : )
1420 : .await?;
1421 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1422 : } else if query_string.to_ascii_lowercase().starts_with("set ") {
1423 : // important because psycopg2 executes "SET datestyle TO 'ISO'"
1424 : // on connect
1425 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1426 : } else if query_string.starts_with("lease lsn ") {
1427 : let params = &parts[2..];
1428 : if params.len() != 3 {
1429 : return Err(QueryError::Other(anyhow::anyhow!(
1430 : "invalid param number {} for lease lsn command",
1431 : params.len()
1432 : )));
1433 : }
1434 :
1435 : let tenant_shard_id = TenantShardId::from_str(params[0])
1436 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1437 : let timeline_id = TimelineId::from_str(params[1])
1438 0 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1439 :
1440 : tracing::Span::current()
1441 : .record("tenant_id", field::display(tenant_shard_id))
1442 : .record("timeline_id", field::display(timeline_id));
1443 :
1444 : self.check_permission(Some(tenant_shard_id.tenant_id))?;
1445 :
1446 : COMPUTE_COMMANDS_COUNTERS
1447 : .for_command(ComputeCommandKind::LeaseLsn)
1448 : .inc();
1449 :
1450 : // The caller is responsible for providing correct lsn.
1451 : let lsn = Lsn::from_str(params[2])
1452 0 : .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?;
1453 :
1454 : match self
1455 : .handle_make_lsn_lease(pgb, tenant_shard_id, timeline_id, lsn, &ctx)
1456 : .await
1457 : {
1458 : Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
1459 : Err(e) => {
1460 : error!("error obtaining lsn lease for {lsn}: {e:?}");
1461 : pgb.write_message_noflush(&BeMessage::ErrorResponse(
1462 : &e.to_string(),
1463 : Some(e.pg_error_code()),
1464 : ))?
1465 : }
1466 : };
1467 : } else {
1468 : return Err(QueryError::Other(anyhow::anyhow!(
1469 : "unknown command {query_string}"
1470 : )));
1471 : }
1472 :
1473 : Ok(())
1474 : }
1475 : }
1476 :
1477 : impl From<GetActiveTenantError> for QueryError {
1478 0 : fn from(e: GetActiveTenantError) -> Self {
1479 0 : match e {
1480 0 : GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected(
1481 0 : ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
1482 0 : ),
1483 : GetActiveTenantError::Cancelled
1484 : | GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => {
1485 0 : QueryError::Shutdown
1486 : }
1487 0 : e @ GetActiveTenantError::NotFound(_) => QueryError::NotFound(format!("{e}").into()),
1488 0 : e => QueryError::Other(anyhow::anyhow!(e)),
1489 : }
1490 0 : }
1491 : }
1492 :
1493 0 : #[derive(Debug, thiserror::Error)]
1494 : pub(crate) enum GetActiveTimelineError {
1495 : #[error(transparent)]
1496 : Tenant(GetActiveTenantError),
1497 : #[error(transparent)]
1498 : Timeline(#[from] GetTimelineError),
1499 : }
1500 :
1501 : impl From<GetActiveTimelineError> for QueryError {
1502 0 : fn from(e: GetActiveTimelineError) -> Self {
1503 0 : match e {
1504 0 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => QueryError::Shutdown,
1505 0 : GetActiveTimelineError::Tenant(e) => e.into(),
1506 0 : GetActiveTimelineError::Timeline(e) => QueryError::NotFound(format!("{e}").into()),
1507 : }
1508 0 : }
1509 : }
1510 :
1511 0 : fn set_tracing_field_shard_id(timeline: &Timeline) {
1512 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1513 0 : tracing::Span::current().record(
1514 0 : "shard_id",
1515 0 : tracing::field::display(timeline.tenant_shard_id.shard_slug()),
1516 0 : );
1517 0 : debug_assert_current_span_has_tenant_and_timeline_id();
1518 0 : }