Line data Source code
1 : //! The Page Service listens for client connections and serves their GetPage@LSN
2 : //! requests.
3 :
4 : use anyhow::{bail, Context};
5 : use async_compression::tokio::write::GzipEncoder;
6 : use bytes::Buf;
7 : use futures::FutureExt;
8 : use itertools::Itertools;
9 : use once_cell::sync::OnceCell;
10 : use pageserver_api::config::{
11 : PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
12 : PageServiceProtocolPipelinedExecutionStrategy,
13 : };
14 : use pageserver_api::models::{self, TenantState};
15 : use pageserver_api::models::{
16 : PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
17 : PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
18 : PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetSlruSegmentRequest,
19 : PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest, PagestreamNblocksResponse,
20 : PagestreamProtocolVersion, PagestreamRequest,
21 : };
22 : use pageserver_api::shard::TenantShardId;
23 : use postgres_backend::{
24 : is_expected_io_error, AuthType, PostgresBackend, PostgresBackendReader, QueryError,
25 : };
26 : use pq_proto::framed::ConnectionError;
27 : use pq_proto::FeStartupPacket;
28 : use pq_proto::{BeMessage, FeMessage, RowDescriptor};
29 : use std::borrow::Cow;
30 : use std::io;
31 : use std::num::NonZeroUsize;
32 : use std::str;
33 : use std::str::FromStr;
34 : use std::sync::Arc;
35 : use std::time::SystemTime;
36 : use std::time::{Duration, Instant};
37 : use tokio::io::{AsyncRead, AsyncWrite};
38 : use tokio::io::{AsyncWriteExt, BufWriter};
39 : use tokio::task::JoinHandle;
40 : use tokio_util::sync::CancellationToken;
41 : use tracing::*;
42 : use utils::sync::gate::{Gate, GateGuard};
43 : use utils::sync::spsc_fold;
44 : use utils::{
45 : auth::{Claims, Scope, SwappableJwtAuth},
46 : id::{TenantId, TimelineId},
47 : lsn::Lsn,
48 : simple_rcu::RcuReadGuard,
49 : };
50 :
51 : use crate::auth::check_permission;
52 : use crate::basebackup::BasebackupError;
53 : use crate::config::PageServerConf;
54 : use crate::context::{DownloadBehavior, RequestContext};
55 : use crate::metrics::{self, SmgrOpTimer};
56 : use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS};
57 : use crate::pgdatadir_mapping::Version;
58 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
59 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
60 : use crate::task_mgr::TaskKind;
61 : use crate::task_mgr::{self, COMPUTE_REQUEST_RUNTIME};
62 : use crate::tenant::mgr::ShardSelector;
63 : use crate::tenant::mgr::TenantManager;
64 : use crate::tenant::mgr::{GetActiveTenantError, GetTenantError, ShardResolveResult};
65 : use crate::tenant::storage_layer::IoConcurrency;
66 : use crate::tenant::timeline::{self, WaitLsnError};
67 : use crate::tenant::GetTimelineError;
68 : use crate::tenant::PageReconstructError;
69 : use crate::tenant::Timeline;
70 : use crate::{basebackup, timed_after_cancellation};
71 : use pageserver_api::key::rel_block_to_key;
72 : use pageserver_api::models::PageTraceEvent;
73 : use pageserver_api::reltag::SlruKind;
74 : use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
75 : use postgres_ffi::BLCKSZ;
76 :
77 : /// How long we may wait for a [`crate::tenant::mgr::TenantSlot::InProgress`] and/or a [`crate::tenant::Tenant`] which
78 : /// is not yet in state [`TenantState::Active`].
79 : ///
80 : /// NB: this is a different value than [`crate::http::routes::ACTIVE_TENANT_TIMEOUT`].
81 : const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
82 :
83 : ///////////////////////////////////////////////////////////////////////////////
84 :
85 : pub struct Listener {
86 : cancel: CancellationToken,
87 : /// Cancel the listener task through `cancel` to shut down the listener
88 : /// and get a handle on the existing connections.
89 : task: JoinHandle<Connections>,
90 : }
91 :
92 : pub struct Connections {
93 : cancel: CancellationToken,
94 : tasks: tokio::task::JoinSet<ConnectionHandlerResult>,
95 : gate: Gate,
96 : }
97 :
98 0 : pub fn spawn(
99 0 : conf: &'static PageServerConf,
100 0 : tenant_manager: Arc<TenantManager>,
101 0 : pg_auth: Option<Arc<SwappableJwtAuth>>,
102 0 : tcp_listener: tokio::net::TcpListener,
103 0 : ) -> Listener {
104 0 : let cancel = CancellationToken::new();
105 0 : let libpq_ctx = RequestContext::todo_child(
106 0 : TaskKind::LibpqEndpointListener,
107 0 : // listener task shouldn't need to download anything. (We will
108 0 : // create a separate sub-context for each connection, with its
109 0 : // own download behavior. This context is used only to listen and
110 0 : // accept connections.)
111 0 : DownloadBehavior::Error,
112 0 : );
113 0 : let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
114 0 : "libpq listener",
115 0 : libpq_listener_main(
116 0 : conf,
117 0 : tenant_manager,
118 0 : pg_auth,
119 0 : tcp_listener,
120 0 : conf.pg_auth_type,
121 0 : conf.page_service_pipelining.clone(),
122 0 : libpq_ctx,
123 0 : cancel.clone(),
124 0 : )
125 0 : .map(anyhow::Ok),
126 0 : ));
127 0 :
128 0 : Listener { cancel, task }
129 0 : }
130 :
131 : impl Listener {
132 0 : pub async fn stop_accepting(self) -> Connections {
133 0 : self.cancel.cancel();
134 0 : self.task
135 0 : .await
136 0 : .expect("unreachable: we wrap the listener task in task_mgr::exit_on_panic_or_error")
137 0 : }
138 : }
139 : impl Connections {
140 0 : pub(crate) async fn shutdown(self) {
141 0 : let Self {
142 0 : cancel,
143 0 : mut tasks,
144 0 : gate,
145 0 : } = self;
146 0 : cancel.cancel();
147 0 : while let Some(res) = tasks.join_next().await {
148 0 : Self::handle_connection_completion(res);
149 0 : }
150 0 : gate.close().await;
151 0 : }
152 :
153 0 : fn handle_connection_completion(res: Result<anyhow::Result<()>, tokio::task::JoinError>) {
154 0 : match res {
155 0 : Ok(Ok(())) => {}
156 0 : Ok(Err(e)) => error!("error in page_service connection task: {:?}", e),
157 0 : Err(e) => error!("page_service connection task panicked: {:?}", e),
158 : }
159 0 : }
160 : }
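
// A minimal sketch (not part of the production code) of the intended two-phase
// teardown for a caller that owns the `Listener` returned by `spawn`. The
// function name is hypothetical.
#[allow(dead_code)]
async fn example_page_service_shutdown(listener: Listener) {
    // Phase 1: stop accepting new connections; already-established
    // connections keep being served.
    let connections: Connections = listener.stop_accepting().await;
    // Phase 2: cancel the remaining connection handlers and wait for all of
    // them to finish (the gate closes once every handler has exited).
    connections.shutdown().await;
}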
161 :
162 : ///
163 : /// Main loop of the page service.
164 : ///
165 : /// Listens for connections, and launches a new handler task for each.
166 : ///
167 : /// Returns Ok(()) upon cancellation via `cancel`, returning the set of
168 : /// open connections.
169 : ///
170 : #[allow(clippy::too_many_arguments)]
171 0 : pub async fn libpq_listener_main(
172 0 : conf: &'static PageServerConf,
173 0 : tenant_manager: Arc<TenantManager>,
174 0 : auth: Option<Arc<SwappableJwtAuth>>,
175 0 : listener: tokio::net::TcpListener,
176 0 : auth_type: AuthType,
177 0 : pipelining_config: PageServicePipeliningConfig,
178 0 : listener_ctx: RequestContext,
179 0 : listener_cancel: CancellationToken,
180 0 : ) -> Connections {
181 0 : let connections_cancel = CancellationToken::new();
182 0 : let connections_gate = Gate::default();
183 0 : let mut connection_handler_tasks = tokio::task::JoinSet::default();
184 :
185 : loop {
186 0 : let gate_guard = match connections_gate.enter() {
187 0 : Ok(guard) => guard,
188 0 : Err(_) => break,
189 : };
190 :
191 0 : let accepted = tokio::select! {
192 : biased;
193 0 : _ = listener_cancel.cancelled() => break,
194 0 : next = connection_handler_tasks.join_next(), if !connection_handler_tasks.is_empty() => {
195 0 : let res = next.expect("we don't poll while empty");
196 0 : Connections::handle_connection_completion(res);
197 0 : continue;
198 : }
199 0 : accepted = listener.accept() => accepted,
200 0 : };
201 0 :
202 0 : match accepted {
203 0 : Ok((socket, peer_addr)) => {
204 0 : // Connection established. Spawn a new task to handle it.
205 0 : debug!("accepted connection from {}", peer_addr);
206 0 : let local_auth = auth.clone();
207 0 : let connection_ctx = listener_ctx
208 0 : .detached_child(TaskKind::PageRequestHandler, DownloadBehavior::Download);
209 0 : connection_handler_tasks.spawn(page_service_conn_main(
210 0 : conf,
211 0 : tenant_manager.clone(),
212 0 : local_auth,
213 0 : socket,
214 0 : auth_type,
215 0 : pipelining_config.clone(),
216 0 : connection_ctx,
217 0 : connections_cancel.child_token(),
218 0 : gate_guard,
219 0 : ));
220 : }
221 0 : Err(err) => {
222 0 : // accept() failed. Log the error, and loop back to retry on next connection.
223 0 : error!("accept() failed: {:?}", err);
224 : }
225 : }
226 : }
227 :
228 0 : debug!("page_service listener loop terminated");
229 :
230 0 : Connections {
231 0 : cancel: connections_cancel,
232 0 : tasks: connection_handler_tasks,
233 0 : gate: connections_gate,
234 0 : }
235 0 : }
236 :
237 : type ConnectionHandlerResult = anyhow::Result<()>;
238 :
239 : #[instrument(skip_all, fields(peer_addr))]
240 : #[allow(clippy::too_many_arguments)]
241 : async fn page_service_conn_main(
242 : conf: &'static PageServerConf,
243 : tenant_manager: Arc<TenantManager>,
244 : auth: Option<Arc<SwappableJwtAuth>>,
245 : socket: tokio::net::TcpStream,
246 : auth_type: AuthType,
247 : pipelining_config: PageServicePipeliningConfig,
248 : connection_ctx: RequestContext,
249 : cancel: CancellationToken,
250 : gate_guard: GateGuard,
251 : ) -> ConnectionHandlerResult {
252 : let _guard = LIVE_CONNECTIONS
253 : .with_label_values(&["page_service"])
254 : .guard();
255 :
256 : socket
257 : .set_nodelay(true)
258 : .context("could not set TCP_NODELAY")?;
259 :
260 : let peer_addr = socket.peer_addr().context("get peer address")?;
261 : tracing::Span::current().record("peer_addr", field::display(peer_addr));
262 :
263 : // Set up a read timeout of 10 minutes. The exact value is somewhat arbitrary; the requirements are:
264 : // - long enough for most valid compute connections
265 : // - less than infinite to stop us from "leaking" connections to long-gone computes
266 : //
267 : // no write timeout is used, because the kernel is assumed to error writes after some time.
268 : let mut socket = tokio_io_timeout::TimeoutReader::new(socket);
269 :
270 : let default_timeout_ms = 10 * 60 * 1000; // 10 minutes by default
271 0 : let socket_timeout_ms = (|| {
272 0 : fail::fail_point!("simulated-bad-compute-connection", |avg_timeout_ms| {
273 : // Sample the timeout from an exponential distribution to simulate
274 : // poor network conditions; in tests, avg_timeout_ms is expected to be
275 : // around 15.
276 0 : if let Some(avg_timeout_ms) = avg_timeout_ms {
277 0 : let avg = avg_timeout_ms.parse::<i64>().unwrap() as f32;
278 0 : let u = rand::random::<f32>();
279 0 : ((1.0 - u).ln() / (-avg)) as u64
280 : } else {
281 0 : default_timeout_ms
282 : }
283 0 : });
284 0 : default_timeout_ms
285 : })();
286 :
287 : // A timeout here does not mean the client died; it can happen if it's just idle for
288 : // a while: we will tear down this PageServerHandler and instantiate a new one if/when
289 : // they reconnect.
290 : socket.set_timeout(Some(std::time::Duration::from_millis(socket_timeout_ms)));
291 : let socket = Box::pin(socket);
292 :
293 : fail::fail_point!("ps::connection-start::pre-login");
294 :
295 : // XXX: pgbackend.run() should take the connection_ctx,
296 : // and create a child per-query context when it invokes process_query.
297 : // But it's in a shared crate, so we store connection_ctx inside PageServerHandler
298 : // and create the per-query context in process_query ourselves.
299 : let mut conn_handler = PageServerHandler::new(
300 : conf,
301 : tenant_manager,
302 : auth,
303 : pipelining_config,
304 : connection_ctx,
305 : cancel.clone(),
306 : gate_guard,
307 : );
308 : let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
309 :
310 : match pgbackend.run(&mut conn_handler, &cancel).await {
311 : Ok(()) => {
312 : // we've been requested to shut down
313 : Ok(())
314 : }
315 : Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
316 : if is_expected_io_error(&io_error) {
317 : info!("Postgres client disconnected ({io_error})");
318 : Ok(())
319 : } else {
320 : let tenant_id = conn_handler.timeline_handles.as_ref().unwrap().tenant_id();
321 : Err(io_error).context(format!(
322 : "Postgres connection error for tenant_id={:?} client at peer_addr={}",
323 : tenant_id, peer_addr
324 : ))
325 : }
326 : }
327 : other => {
328 : let tenant_id = conn_handler.timeline_handles.as_ref().unwrap().tenant_id();
329 : other.context(format!(
330 : "Postgres query error for tenant_id={:?} client peer_addr={}",
331 : tenant_id, peer_addr
332 : ))
333 : }
334 : }
335 : }
336 :
337 : struct PageServerHandler {
338 : conf: &'static PageServerConf,
339 : auth: Option<Arc<SwappableJwtAuth>>,
340 : claims: Option<Claims>,
341 :
342 : /// The context created for the lifetime of the connection
343 : /// serviced by this PageServerHandler.
344 : /// For each query received over the connection,
345 : /// `process_query` creates a child context from this one.
346 : connection_ctx: RequestContext,
347 :
348 : cancel: CancellationToken,
349 :
350 : /// None only while pagestream protocol is being processed.
351 : timeline_handles: Option<TimelineHandles>,
352 :
353 : pipelining_config: PageServicePipeliningConfig,
354 :
355 : gate_guard: GateGuard,
356 : }
357 :
358 : struct TimelineHandles {
359 : wrapper: TenantManagerWrapper,
360 : /// Note on size: the typical size of this map is 1. The largest size we expect
361 : /// to see is the number of shards divided by the number of pageservers (typically < 2),
362 : /// or the ratio used when splitting shards (i.e. how many children are created from one
363 : /// parent shard), where a "large" number might be ~8.
364 : handles: timeline::handle::Cache<TenantManagerTypes>,
365 : }
366 :
367 : impl TimelineHandles {
368 0 : fn new(tenant_manager: Arc<TenantManager>) -> Self {
369 0 : Self {
370 0 : wrapper: TenantManagerWrapper {
371 0 : tenant_manager,
372 0 : tenant_id: OnceCell::new(),
373 0 : },
374 0 : handles: Default::default(),
375 0 : }
376 0 : }
377 0 : async fn get(
378 0 : &mut self,
379 0 : tenant_id: TenantId,
380 0 : timeline_id: TimelineId,
381 0 : shard_selector: ShardSelector,
382 0 : ) -> Result<timeline::handle::Handle<TenantManagerTypes>, GetActiveTimelineError> {
383 0 : if *self.wrapper.tenant_id.get_or_init(|| tenant_id) != tenant_id {
384 0 : return Err(GetActiveTimelineError::Tenant(
385 0 : GetActiveTenantError::SwitchedTenant,
386 0 : ));
387 0 : }
388 0 : self.handles
389 0 : .get(timeline_id, shard_selector, &self.wrapper)
390 0 : .await
391 0 : .map_err(|e| match e {
392 0 : timeline::handle::GetError::TenantManager(e) => e,
393 : timeline::handle::GetError::TimelineGateClosed => {
394 0 : trace!("timeline gate closed");
395 0 : GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown)
396 : }
397 : timeline::handle::GetError::PerTimelineStateShutDown => {
398 0 : trace!("per-timeline state shut down");
399 0 : GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown)
400 : }
401 0 : })
402 0 : }
403 :
404 0 : fn tenant_id(&self) -> Option<TenantId> {
405 0 : self.wrapper.tenant_id.get().copied()
406 0 : }
407 : }
408 :
409 : pub(crate) struct TenantManagerWrapper {
410 : tenant_manager: Arc<TenantManager>,
411 : // We do not support switching tenant_id on a connection at this point.
412 : // We can add support for this later if needed without changing
413 : // the protocol.
414 : tenant_id: once_cell::sync::OnceCell<TenantId>,
415 : }
416 :
417 : #[derive(Debug)]
418 : pub(crate) struct TenantManagerTypes;
419 :
420 : impl timeline::handle::Types for TenantManagerTypes {
421 : type TenantManagerError = GetActiveTimelineError;
422 : type TenantManager = TenantManagerWrapper;
423 : type Timeline = Arc<Timeline>;
424 : }
425 :
426 : impl timeline::handle::ArcTimeline<TenantManagerTypes> for Arc<Timeline> {
427 0 : fn gate(&self) -> &utils::sync::gate::Gate {
428 0 : &self.gate
429 0 : }
430 :
431 0 : fn shard_timeline_id(&self) -> timeline::handle::ShardTimelineId {
432 0 : Timeline::shard_timeline_id(self)
433 0 : }
434 :
435 0 : fn per_timeline_state(&self) -> &timeline::handle::PerTimelineState<TenantManagerTypes> {
436 0 : &self.handles
437 0 : }
438 :
439 0 : fn get_shard_identity(&self) -> &pageserver_api::shard::ShardIdentity {
440 0 : Timeline::get_shard_identity(self)
441 0 : }
442 : }
443 :
444 : impl timeline::handle::TenantManager<TenantManagerTypes> for TenantManagerWrapper {
445 0 : async fn resolve(
446 0 : &self,
447 0 : timeline_id: TimelineId,
448 0 : shard_selector: ShardSelector,
449 0 : ) -> Result<Arc<Timeline>, GetActiveTimelineError> {
450 0 : let tenant_id = self.tenant_id.get().expect("we set this in get()");
451 0 : let timeout = ACTIVE_TENANT_TIMEOUT;
452 0 : let wait_start = Instant::now();
453 0 : let deadline = wait_start + timeout;
454 0 : let tenant_shard = loop {
455 0 : let resolved = self
456 0 : .tenant_manager
457 0 : .resolve_attached_shard(tenant_id, shard_selector);
458 0 : match resolved {
459 0 : ShardResolveResult::Found(tenant_shard) => break tenant_shard,
460 : ShardResolveResult::NotFound => {
461 0 : return Err(GetActiveTimelineError::Tenant(
462 0 : GetActiveTenantError::NotFound(GetTenantError::NotFound(*tenant_id)),
463 0 : ));
464 : }
465 0 : ShardResolveResult::InProgress(barrier) => {
466 0 : // We can't authoritatively answer right now: wait for InProgress state
467 0 : // to end, then try again
468 0 : tokio::select! {
469 0 : _ = barrier.wait() => {
470 0 : // The barrier completed: proceed around the loop to try looking up again
471 0 : },
472 0 : _ = tokio::time::sleep(deadline.duration_since(Instant::now())) => {
473 0 : return Err(GetActiveTimelineError::Tenant(GetActiveTenantError::WaitForActiveTimeout {
474 0 : latest_state: None,
475 0 : wait_time: timeout,
476 0 : }));
477 : }
478 : }
479 : }
480 : };
481 : };
482 :
483 0 : tracing::debug!("Waiting for tenant to enter active state...");
484 0 : tenant_shard
485 0 : .wait_to_become_active(deadline.duration_since(Instant::now()))
486 0 : .await
487 0 : .map_err(GetActiveTimelineError::Tenant)?;
488 :
489 0 : let timeline = tenant_shard
490 0 : .get_timeline(timeline_id, true)
491 0 : .map_err(GetActiveTimelineError::Timeline)?;
492 0 : set_tracing_field_shard_id(&timeline);
493 0 : Ok(timeline)
494 0 : }
495 : }
496 :
497 : #[derive(thiserror::Error, Debug)]
498 : enum PageStreamError {
499 : /// We encountered an error that should prompt the client to reconnect:
500 : /// in practice this means we drop the connection without sending a response.
501 : #[error("Reconnect required: {0}")]
502 : Reconnect(Cow<'static, str>),
503 :
504 : /// We were instructed to shutdown while processing the query
505 : #[error("Shutting down")]
506 : Shutdown,
507 :
508 : /// Something went wrong reading a page: this likely indicates a pageserver bug
509 : #[error("Read error")]
510 : Read(#[source] PageReconstructError),
511 :
512 : /// Ran out of time waiting for an LSN
513 : #[error("LSN timeout: {0}")]
514 : LsnTimeout(WaitLsnError),
515 :
516 : /// The entity required to serve the request (tenant or timeline) is not found,
517 : /// or is not found in a suitable state to serve a request.
518 : #[error("Not found: {0}")]
519 : NotFound(Cow<'static, str>),
520 :
521 : /// Request asked for something that doesn't make sense, like an invalid LSN
522 : #[error("Bad request: {0}")]
523 : BadRequest(Cow<'static, str>),
524 : }
525 :
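
// How these variants map onto protocol behavior (see
// `pagesteam_handle_batched_message` below): `Shutdown` and `Reconnect` end the
// pagestream session by dropping the connection, while `Read`, `LsnTimeout`,
// `NotFound` and `BadRequest` are logged and sent back to the client as a
// PagestreamErrorResponse, keeping the session open.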
526 : impl From<PageReconstructError> for PageStreamError {
527 0 : fn from(value: PageReconstructError) -> Self {
528 0 : match value {
529 0 : PageReconstructError::Cancelled => Self::Shutdown,
530 0 : e => Self::Read(e),
531 : }
532 0 : }
533 : }
534 :
535 : impl From<GetActiveTimelineError> for PageStreamError {
536 0 : fn from(value: GetActiveTimelineError) -> Self {
537 0 : match value {
538 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled)
539 : | GetActiveTimelineError::Tenant(GetActiveTenantError::WillNotBecomeActive(
540 : TenantState::Stopping { .. },
541 : ))
542 0 : | GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown) => Self::Shutdown,
543 0 : GetActiveTimelineError::Tenant(e) => Self::NotFound(format!("{e}").into()),
544 0 : GetActiveTimelineError::Timeline(e) => Self::NotFound(format!("{e}").into()),
545 : }
546 0 : }
547 : }
548 :
549 : impl From<WaitLsnError> for PageStreamError {
550 0 : fn from(value: WaitLsnError) -> Self {
551 0 : match value {
552 0 : e @ WaitLsnError::Timeout(_) => Self::LsnTimeout(e),
553 0 : WaitLsnError::Shutdown => Self::Shutdown,
554 0 : e @ WaitLsnError::BadState { .. } => Self::Reconnect(format!("{e}").into()),
555 : }
556 0 : }
557 : }
558 :
559 : impl From<WaitLsnError> for QueryError {
560 0 : fn from(value: WaitLsnError) -> Self {
561 0 : match value {
562 0 : e @ WaitLsnError::Timeout(_) => Self::Other(anyhow::Error::new(e)),
563 0 : WaitLsnError::Shutdown => Self::Shutdown,
564 0 : WaitLsnError::BadState { .. } => Self::Reconnect,
565 : }
566 0 : }
567 : }
568 :
569 : #[derive(thiserror::Error, Debug)]
570 : struct BatchedPageStreamError {
571 : req: PagestreamRequest,
572 : err: PageStreamError,
573 : }
574 :
575 : impl std::fmt::Display for BatchedPageStreamError {
576 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
577 0 : self.err.fmt(f)
578 0 : }
579 : }
580 :
581 : struct BatchedGetPageRequest {
582 : req: PagestreamGetPageRequest,
583 : timer: SmgrOpTimer,
584 : }
585 :
586 : #[cfg(feature = "testing")]
587 : struct BatchedTestRequest {
588 : req: models::PagestreamTestRequest,
589 : timer: SmgrOpTimer,
590 : }
591 :
592 : /// NB: we only hold [`timeline::handle::WeakHandle`] inside this enum,
593 : /// so that we don't keep the [`Timeline::gate`] open while the batch
594 : /// is being built up inside the [`spsc_fold`] (pagestream pipelining).
595 : enum BatchedFeMessage {
596 : Exists {
597 : span: Span,
598 : timer: SmgrOpTimer,
599 : shard: timeline::handle::WeakHandle<TenantManagerTypes>,
600 : req: models::PagestreamExistsRequest,
601 : },
602 : Nblocks {
603 : span: Span,
604 : timer: SmgrOpTimer,
605 : shard: timeline::handle::WeakHandle<TenantManagerTypes>,
606 : req: models::PagestreamNblocksRequest,
607 : },
608 : GetPage {
609 : span: Span,
610 : shard: timeline::handle::WeakHandle<TenantManagerTypes>,
611 : effective_request_lsn: Lsn,
612 : pages: smallvec::SmallVec<[BatchedGetPageRequest; 1]>,
613 : },
614 : DbSize {
615 : span: Span,
616 : timer: SmgrOpTimer,
617 : shard: timeline::handle::WeakHandle<TenantManagerTypes>,
618 : req: models::PagestreamDbSizeRequest,
619 : },
620 : GetSlruSegment {
621 : span: Span,
622 : timer: SmgrOpTimer,
623 : shard: timeline::handle::WeakHandle<TenantManagerTypes>,
624 : req: models::PagestreamGetSlruSegmentRequest,
625 : },
626 : #[cfg(feature = "testing")]
627 : Test {
628 : span: Span,
629 : shard: timeline::handle::WeakHandle<TenantManagerTypes>,
630 : requests: Vec<BatchedTestRequest>,
631 : },
632 : RespondError {
633 : span: Span,
634 : error: BatchedPageStreamError,
635 : },
636 : }
637 :
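
// Sketch of the weak-handle lifecycle referred to in the NB above (illustrative
// only): the batching side downgrades the strong handle before the request is
// parked in the batch, and the execution side upgrades it again just before
// serving the batch, bailing out if the timeline has shut down in the meantime.
//
//     let weak = shard.downgrade();   // batching: stop holding the gate open
//     /* ... request sits in the spsc_fold while the batch builds up ... */
//     let shard = weak.upgrade()?;    // execution: reacquire, or fail if gone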
638 : impl BatchedFeMessage {
639 0 : fn observe_execution_start(&mut self, at: Instant) {
640 0 : match self {
641 0 : BatchedFeMessage::Exists { timer, .. }
642 0 : | BatchedFeMessage::Nblocks { timer, .. }
643 0 : | BatchedFeMessage::DbSize { timer, .. }
644 0 : | BatchedFeMessage::GetSlruSegment { timer, .. } => {
645 0 : timer.observe_execution_start(at);
646 0 : }
647 0 : BatchedFeMessage::GetPage { pages, .. } => {
648 0 : for page in pages {
649 0 : page.timer.observe_execution_start(at);
650 0 : }
651 : }
652 : #[cfg(feature = "testing")]
653 0 : BatchedFeMessage::Test { requests, .. } => {
654 0 : for req in requests {
655 0 : req.timer.observe_execution_start(at);
656 0 : }
657 : }
658 0 : BatchedFeMessage::RespondError { .. } => {}
659 : }
660 0 : }
661 : }
662 :
663 : impl PageServerHandler {
664 0 : pub fn new(
665 0 : conf: &'static PageServerConf,
666 0 : tenant_manager: Arc<TenantManager>,
667 0 : auth: Option<Arc<SwappableJwtAuth>>,
668 0 : pipelining_config: PageServicePipeliningConfig,
669 0 : connection_ctx: RequestContext,
670 0 : cancel: CancellationToken,
671 0 : gate_guard: GateGuard,
672 0 : ) -> Self {
673 0 : PageServerHandler {
674 0 : conf,
675 0 : auth,
676 0 : claims: None,
677 0 : connection_ctx,
678 0 : timeline_handles: Some(TimelineHandles::new(tenant_manager)),
679 0 : cancel,
680 0 : pipelining_config,
681 0 : gate_guard,
682 0 : }
683 0 : }
684 :
685 : /// This function always respects cancellation of any timeline in `[Self::timeline_handles]`. Pass in
686 : /// a cancellation token at the next scope up (such as a tenant cancellation token) to ensure we respect
687 : /// cancellation if there aren't any timelines in the cache.
688 : ///
689 : /// If calling from a function that doesn't use the `[Self::timeline_handles]` cache, then pass in the
690 : /// timeline cancellation token.
691 0 : async fn flush_cancellable<IO>(
692 0 : &self,
693 0 : pgb: &mut PostgresBackend<IO>,
694 0 : cancel: &CancellationToken,
695 0 : ) -> Result<(), QueryError>
696 0 : where
697 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
698 0 : {
699 0 : tokio::select!(
700 0 : flush_r = pgb.flush() => {
701 0 : Ok(flush_r?)
702 : },
703 0 : _ = cancel.cancelled() => {
704 0 : Err(QueryError::Shutdown)
705 : }
706 : )
707 0 : }
708 :
709 : #[allow(clippy::too_many_arguments)]
710 0 : async fn pagestream_read_message<IO>(
711 0 : pgb: &mut PostgresBackendReader<IO>,
712 0 : tenant_id: TenantId,
713 0 : timeline_id: TimelineId,
714 0 : timeline_handles: &mut TimelineHandles,
715 0 : cancel: &CancellationToken,
716 0 : ctx: &RequestContext,
717 0 : protocol_version: PagestreamProtocolVersion,
718 0 : parent_span: Span,
719 0 : ) -> Result<Option<BatchedFeMessage>, QueryError>
720 0 : where
721 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
722 0 : {
723 0 : let msg = tokio::select! {
724 : biased;
725 0 : _ = cancel.cancelled() => {
726 0 : return Err(QueryError::Shutdown)
727 : }
728 0 : msg = pgb.read_message() => { msg }
729 0 : };
730 0 :
731 0 : let received_at = Instant::now();
732 :
733 0 : let copy_data_bytes = match msg? {
734 0 : Some(FeMessage::CopyData(bytes)) => bytes,
735 : Some(FeMessage::Terminate) => {
736 0 : return Ok(None);
737 : }
738 0 : Some(m) => {
739 0 : return Err(QueryError::Other(anyhow::anyhow!(
740 0 : "unexpected message: {m:?} during COPY"
741 0 : )));
742 : }
743 : None => {
744 0 : return Ok(None);
745 : } // client disconnected
746 : };
747 0 : trace!("query: {copy_data_bytes:?}");
748 :
749 0 : fail::fail_point!("ps::handle-pagerequest-message");
750 :
751 : // parse request
752 0 : let neon_fe_msg =
753 0 : PagestreamFeMessage::parse(&mut copy_data_bytes.reader(), protocol_version)?;
754 :
755 : // TODO: turn into an async closure once available to avoid repeating received_at
756 0 : async fn record_op_start_and_throttle(
757 0 : shard: &timeline::handle::Handle<TenantManagerTypes>,
758 0 : op: metrics::SmgrQueryType,
759 0 : received_at: Instant,
760 0 : ) -> Result<SmgrOpTimer, QueryError> {
761 0 : // It's important to start the smgr op metric recorder as early as possible
762 0 : // so that the _started counters are incremented before we do
763 0 : // any serious waiting, e.g., for throttle, batching, or actual request handling.
764 0 : let mut timer = shard.query_metrics.start_smgr_op(op, received_at);
765 0 : let now = Instant::now();
766 0 : timer.observe_throttle_start(now);
767 0 : let throttled = tokio::select! {
768 0 : res = shard.pagestream_throttle.throttle(1, now) => res,
769 0 : _ = shard.cancel.cancelled() => return Err(QueryError::Shutdown),
770 : };
771 0 : timer.observe_throttle_done(throttled);
772 0 : Ok(timer)
773 0 : }
774 :
775 0 : let batched_msg = match neon_fe_msg {
776 0 : PagestreamFeMessage::Exists(req) => {
777 0 : let span = tracing::info_span!(parent: parent_span, "handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn);
778 0 : let shard = timeline_handles
779 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
780 0 : .instrument(span.clone()) // sets `shard_id` field
781 0 : .await?;
782 0 : let timer = record_op_start_and_throttle(
783 0 : &shard,
784 0 : metrics::SmgrQueryType::GetRelExists,
785 0 : received_at,
786 0 : )
787 0 : .await?;
788 0 : BatchedFeMessage::Exists {
789 0 : span,
790 0 : timer,
791 0 : shard: shard.downgrade(),
792 0 : req,
793 0 : }
794 : }
795 0 : PagestreamFeMessage::Nblocks(req) => {
796 0 : let span = tracing::info_span!(parent: parent_span, "handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn);
797 0 : let shard = timeline_handles
798 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
799 0 : .instrument(span.clone()) // sets `shard_id` field
800 0 : .await?;
801 0 : let timer = record_op_start_and_throttle(
802 0 : &shard,
803 0 : metrics::SmgrQueryType::GetRelSize,
804 0 : received_at,
805 0 : )
806 0 : .await?;
807 0 : BatchedFeMessage::Nblocks {
808 0 : span,
809 0 : timer,
810 0 : shard: shard.downgrade(),
811 0 : req,
812 0 : }
813 : }
814 0 : PagestreamFeMessage::DbSize(req) => {
815 0 : let span = tracing::info_span!(parent: parent_span, "handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.hdr.request_lsn);
816 0 : let shard = timeline_handles
817 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
818 0 : .instrument(span.clone()) // sets `shard_id` field
819 0 : .await?;
820 0 : let timer = record_op_start_and_throttle(
821 0 : &shard,
822 0 : metrics::SmgrQueryType::GetDbSize,
823 0 : received_at,
824 0 : )
825 0 : .await?;
826 0 : BatchedFeMessage::DbSize {
827 0 : span,
828 0 : timer,
829 0 : shard: shard.downgrade(),
830 0 : req,
831 0 : }
832 : }
833 0 : PagestreamFeMessage::GetSlruSegment(req) => {
834 0 : let span = tracing::info_span!(parent: parent_span, "handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.hdr.request_lsn);
835 0 : let shard = timeline_handles
836 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
837 0 : .instrument(span.clone()) // sets `shard_id` field
838 0 : .await?;
839 0 : let timer = record_op_start_and_throttle(
840 0 : &shard,
841 0 : metrics::SmgrQueryType::GetSlruSegment,
842 0 : received_at,
843 0 : )
844 0 : .await?;
845 0 : BatchedFeMessage::GetSlruSegment {
846 0 : span,
847 0 : timer,
848 0 : shard: shard.downgrade(),
849 0 : req,
850 0 : }
851 : }
852 0 : PagestreamFeMessage::GetPage(req) => {
853 0 : let span = tracing::info_span!(parent: parent_span, "handle_get_page_at_lsn_request_batched", req_lsn = %req.hdr.request_lsn);
854 :
855 : macro_rules! respond_error {
856 : ($error:expr) => {{
857 : let error = BatchedFeMessage::RespondError {
858 : span,
859 : error: BatchedPageStreamError {
860 : req: req.hdr,
861 : err: $error,
862 : },
863 : };
864 : Ok(Some(error))
865 : }};
866 : }
867 :
868 0 : let key = rel_block_to_key(req.rel, req.blkno);
869 0 : let shard = match timeline_handles
870 0 : .get(tenant_id, timeline_id, ShardSelector::Page(key))
871 0 : .instrument(span.clone()) // sets `shard_id` field
872 0 : .await
873 : {
874 0 : Ok(tl) => tl,
875 : Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
876 : // We already know this tenant exists in general, because we resolved it at
877 : // start of connection. Getting a NotFound here indicates that the shard containing
878 : // the requested page is not present on this node: the client's knowledge of shard->pageserver
879 : // mapping is out of date.
880 : //
881 : // Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
882 : // Closing the connection by returning `PageStreamError::Reconnect` has the side effect of rate-limiting the above message, via the
883 : // and talk to a different pageserver.
884 0 : return respond_error!(PageStreamError::Reconnect(
885 0 : "getpage@lsn request routed to wrong shard".into()
886 0 : ));
887 : }
888 0 : Err(e) => {
889 0 : return respond_error!(e.into());
890 : }
891 : };
892 :
893 0 : let timer = record_op_start_and_throttle(
894 0 : &shard,
895 0 : metrics::SmgrQueryType::GetPageAtLsn,
896 0 : received_at,
897 0 : )
898 0 : .await?;
899 :
900 : // We're holding the Handle
901 0 : let effective_request_lsn = match Self::wait_or_get_last_lsn(
902 0 : &shard,
903 0 : req.hdr.request_lsn,
904 0 : req.hdr.not_modified_since,
905 0 : &shard.get_latest_gc_cutoff_lsn(),
906 0 : ctx,
907 0 : )
908 0 : // TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait
909 0 : .await
910 : {
911 0 : Ok(lsn) => lsn,
912 0 : Err(e) => {
913 0 : return respond_error!(e);
914 : }
915 : };
916 : BatchedFeMessage::GetPage {
917 0 : span,
918 0 : shard: shard.downgrade(),
919 0 : effective_request_lsn,
920 0 : pages: smallvec::smallvec![BatchedGetPageRequest { req, timer }],
921 : }
922 : }
923 : #[cfg(feature = "testing")]
924 0 : PagestreamFeMessage::Test(req) => {
925 0 : let span = tracing::info_span!(parent: parent_span, "handle_test_request");
926 0 : let shard = timeline_handles
927 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
928 0 : .instrument(span.clone()) // sets `shard_id` field
929 0 : .await?;
930 0 : let timer =
931 0 : record_op_start_and_throttle(&shard, metrics::SmgrQueryType::Test, received_at)
932 0 : .await?;
933 0 : BatchedFeMessage::Test {
934 0 : span,
935 0 : shard: shard.downgrade(),
936 0 : requests: vec![BatchedTestRequest { req, timer }],
937 0 : }
938 : }
939 : };
940 0 : Ok(Some(batched_msg))
941 0 : }
942 :
943 : /// Post-condition: `batch` is Some()
944 : #[instrument(skip_all, level = tracing::Level::TRACE)]
945 : #[allow(clippy::boxed_local)]
946 : fn pagestream_do_batch(
947 : max_batch_size: NonZeroUsize,
948 : batch: &mut Result<BatchedFeMessage, QueryError>,
949 : this_msg: Result<BatchedFeMessage, QueryError>,
950 : ) -> Result<(), Result<BatchedFeMessage, QueryError>> {
951 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
952 :
953 : let this_msg = match this_msg {
954 : Ok(this_msg) => this_msg,
955 : Err(e) => return Err(Err(e)),
956 : };
957 :
958 : match (&mut *batch, this_msg) {
959 : // something batched already, let's see if we can add this message to the batch
960 : (
961 : Ok(BatchedFeMessage::GetPage {
962 : span: _,
963 : shard: accum_shard,
964 : pages: ref mut accum_pages,
965 : effective_request_lsn: accum_lsn,
966 : }),
967 : BatchedFeMessage::GetPage {
968 : span: _,
969 : shard: this_shard,
970 : pages: this_pages,
971 : effective_request_lsn: this_lsn,
972 : },
973 0 : ) if (|| {
974 0 : assert_eq!(this_pages.len(), 1);
975 0 : if accum_pages.len() >= max_batch_size.get() {
976 0 : trace!(%accum_lsn, %this_lsn, %max_batch_size, "stopping batching because of batch size");
977 0 : assert_eq!(accum_pages.len(), max_batch_size.get());
978 0 : return false;
979 0 : }
980 0 : if !accum_shard.is_same_handle_as(&this_shard) {
981 0 : trace!(%accum_lsn, %this_lsn, "stopping batching because timeline object mismatch");
982 : // TODO: we _could_ batch & execute each shard separately (and in parallel).
983 : // But the current logic for keeping responses in order does not support that.
984 0 : return false;
985 0 : }
986 0 : // the vectored get currently only supports a single LSN, so, bounce as soon
987 0 : // as the effective request_lsn changes
988 0 : if *accum_lsn != this_lsn {
989 0 : trace!(%accum_lsn, %this_lsn, "stopping batching because LSN changed");
990 0 : return false;
991 0 : }
992 0 : true
993 : })() =>
994 : {
995 : // ok to batch
996 : accum_pages.extend(this_pages);
997 : Ok(())
998 : }
999 : #[cfg(feature = "testing")]
1000 : (
1001 : Ok(BatchedFeMessage::Test {
1002 : shard: accum_shard,
1003 : requests: accum_requests,
1004 : ..
1005 : }),
1006 : BatchedFeMessage::Test {
1007 : shard: this_shard,
1008 : requests: this_requests,
1009 : ..
1010 : },
1011 0 : ) if (|| {
1012 0 : assert!(this_requests.len() == 1);
1013 0 : if accum_requests.len() >= max_batch_size.get() {
1014 0 : trace!(%max_batch_size, "stopping batching because of batch size");
1015 0 : assert_eq!(accum_requests.len(), max_batch_size.get());
1016 0 : return false;
1017 0 : }
1018 0 : if !accum_shard.is_same_handle_as(&this_shard) {
1019 0 : trace!("stopping batching because timeline object mismatch");
1020 : // TODO: we _could_ batch & execute each shard separately (and in parallel).
1021 : // But the current logic for keeping responses in order does not support that.
1022 0 : return false;
1023 0 : }
1024 0 : let this_batch_key = this_requests[0].req.batch_key;
1025 0 : let accum_batch_key = accum_requests[0].req.batch_key;
1026 0 : if this_requests[0].req.batch_key != accum_requests[0].req.batch_key {
1027 0 : trace!(%accum_batch_key, %this_batch_key, "stopping batching because batch key changed");
1028 0 : return false;
1029 0 : }
1030 0 : true
1031 : })() =>
1032 : {
1033 : // ok to batch
1034 : accum_requests.extend(this_requests);
1035 : Ok(())
1036 : }
1037 : // something batched already but this message is unbatchable
1038 : (_, this_msg) => {
1039 : // by default, don't continue batching
1040 : Err(Ok(this_msg))
1041 : }
1042 : }
1043 : }
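
// Descriptive note on the contract above: Ok(()) means `this_msg` was merged
// into `batch`; Err(Ok(msg)) means it could not be merged (different shard,
// different effective LSN, or the batch already holds max_batch_size entries)
// and is handed back to the caller; Err(Err(e)) hands back an error read off
// the wire. The spsc_fold sender uses this to decide whether to keep folding
// into the in-flight batch or to publish it and, presumably, seed the next
// batch with the returned message.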
1044 :
1045 0 : #[instrument(level = tracing::Level::DEBUG, skip_all)]
1046 : async fn pagesteam_handle_batched_message<IO>(
1047 : &mut self,
1048 : pgb_writer: &mut PostgresBackend<IO>,
1049 : batch: BatchedFeMessage,
1050 : io_concurrency: IoConcurrency,
1051 : cancel: &CancellationToken,
1052 : protocol_version: PagestreamProtocolVersion,
1053 : ctx: &RequestContext,
1054 : ) -> Result<(), QueryError>
1055 : where
1056 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1057 : {
1058 : let started_at = Instant::now();
1059 : let batch = {
1060 : let mut batch = batch;
1061 : batch.observe_execution_start(started_at);
1062 : batch
1063 : };
1064 :
1065 : // invoke handler function
1066 : let (handler_results, span): (
1067 : Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>>,
1068 : _,
1069 : ) = match batch {
1070 : BatchedFeMessage::Exists {
1071 : span,
1072 : timer,
1073 : shard,
1074 : req,
1075 : } => {
1076 : fail::fail_point!("ps::handle-pagerequest-message::exists");
1077 : (
1078 : vec![self
1079 : .handle_get_rel_exists_request(&*shard.upgrade()?, &req, ctx)
1080 : .instrument(span.clone())
1081 : .await
1082 0 : .map(|msg| (msg, timer))
1083 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr })],
1084 : span,
1085 : )
1086 : }
1087 : BatchedFeMessage::Nblocks {
1088 : span,
1089 : timer,
1090 : shard,
1091 : req,
1092 : } => {
1093 : fail::fail_point!("ps::handle-pagerequest-message::nblocks");
1094 : (
1095 : vec![self
1096 : .handle_get_nblocks_request(&*shard.upgrade()?, &req, ctx)
1097 : .instrument(span.clone())
1098 : .await
1099 0 : .map(|msg| (msg, timer))
1100 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr })],
1101 : span,
1102 : )
1103 : }
1104 : BatchedFeMessage::GetPage {
1105 : span,
1106 : shard,
1107 : effective_request_lsn,
1108 : pages,
1109 : } => {
1110 : fail::fail_point!("ps::handle-pagerequest-message::getpage");
1111 : (
1112 : {
1113 : let npages = pages.len();
1114 : trace!(npages, "handling getpage request");
1115 : let res = self
1116 : .handle_get_page_at_lsn_request_batched(
1117 : &*shard.upgrade()?,
1118 : effective_request_lsn,
1119 : pages,
1120 : io_concurrency,
1121 : ctx,
1122 : )
1123 : .instrument(span.clone())
1124 : .await;
1125 : assert_eq!(res.len(), npages);
1126 : res
1127 : },
1128 : span,
1129 : )
1130 : }
1131 : BatchedFeMessage::DbSize {
1132 : span,
1133 : timer,
1134 : shard,
1135 : req,
1136 : } => {
1137 : fail::fail_point!("ps::handle-pagerequest-message::dbsize");
1138 : (
1139 : vec![self
1140 : .handle_db_size_request(&*shard.upgrade()?, &req, ctx)
1141 : .instrument(span.clone())
1142 : .await
1143 0 : .map(|msg| (msg, timer))
1144 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr })],
1145 : span,
1146 : )
1147 : }
1148 : BatchedFeMessage::GetSlruSegment {
1149 : span,
1150 : timer,
1151 : shard,
1152 : req,
1153 : } => {
1154 : fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
1155 : (
1156 : vec![self
1157 : .handle_get_slru_segment_request(&*shard.upgrade()?, &req, ctx)
1158 : .instrument(span.clone())
1159 : .await
1160 0 : .map(|msg| (msg, timer))
1161 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr })],
1162 : span,
1163 : )
1164 : }
1165 : #[cfg(feature = "testing")]
1166 : BatchedFeMessage::Test {
1167 : span,
1168 : shard,
1169 : requests,
1170 : } => {
1171 : fail::fail_point!("ps::handle-pagerequest-message::test");
1172 : (
1173 : {
1174 : let npages = requests.len();
1175 : trace!(npages, "handling test request batch");
1176 : let res = self
1177 : .handle_test_request_batch(&*shard.upgrade()?, requests, ctx)
1178 : .instrument(span.clone())
1179 : .await;
1180 : assert_eq!(res.len(), npages);
1181 : res
1182 : },
1183 : span,
1184 : )
1185 : }
1186 : BatchedFeMessage::RespondError { span, error } => {
1187 : // We've already decided to respond with an error, so we don't need to
1188 : // call the handler.
1189 : (vec![Err(error)], span)
1190 : }
1191 : };
1192 :
1193 : // Map handler result to protocol behavior.
1194 : // Some handler errors cause exit from pagestream protocol.
1195 : // Other handler errors are sent back as an error message and we stay in pagestream protocol.
1196 : for handler_result in handler_results {
1197 : let (response_msg, timer) = match handler_result {
1198 : Err(e) => match &e.err {
1199 : PageStreamError::Shutdown => {
1200 : // If we fail to fulfil a request during shutdown, which may be _because_ of
1201 : // shutdown, then do not send the error to the client. Instead just drop the
1202 : // connection.
1203 0 : span.in_scope(|| info!("dropping connection due to shutdown"));
1204 : return Err(QueryError::Shutdown);
1205 : }
1206 : PageStreamError::Reconnect(reason) => {
1207 0 : span.in_scope(|| info!("handler requested reconnect: {reason}"));
1208 : return Err(QueryError::Reconnect);
1209 : }
1210 : PageStreamError::Read(_)
1211 : | PageStreamError::LsnTimeout(_)
1212 : | PageStreamError::NotFound(_)
1213 : | PageStreamError::BadRequest(_) => {
1214 : // print all the details to the log with {:#}, but for the client the
1215 : // error message is enough. Do not log if shutting down, as the anyhow::Error
1216 : // here includes cancellation which is not an error.
1217 : let full = utils::error::report_compact_sources(&e.err);
1218 0 : span.in_scope(|| {
1219 0 : error!("error reading relation or page version: {full:#}")
1220 0 : });
1221 : (
1222 : PagestreamBeMessage::Error(PagestreamErrorResponse {
1223 : req: e.req,
1224 : message: e.err.to_string(),
1225 : }),
1226 : None, // TODO: measure errors
1227 : )
1228 : }
1229 : },
1230 : Ok((response_msg, timer)) => (response_msg, Some(timer)),
1231 : };
1232 :
1233 : //
1234 : // marshal & transmit response message
1235 : //
1236 :
1237 : pgb_writer.write_message_noflush(&BeMessage::CopyData(
1238 : &response_msg.serialize(protocol_version),
1239 : ))?;
1240 :
1241 : // We purposefully don't count flush time into the timer.
1242 : //
1243 : // The reason is that the current compute client will not perform protocol processing
1244 : // if the postgres backend process is doing things other than `->smgr_read()`.
1245 : // This is especially the case for prefetch.
1246 : //
1247 : // If the compute doesn't read from the connection, eventually TCP will backpressure
1248 : // all the way into our flush call below.
1249 : //
1250 : // The timer's underlying metric is used for a storage-internal latency SLO and
1251 : // we don't want to include latency in it that we can't control.
1252 : // And as pointed out above, in this case, we don't control the time that flush will take.
1253 0 : let flushing_timer = timer.map(|mut timer| {
1254 0 : timer
1255 0 : .observe_execution_end_flush_start(Instant::now())
1256 0 : .expect("we are the first caller")
1257 0 : });
1258 :
1259 : // what we want to do
1260 : let flush_fut = pgb_writer.flush();
1261 : // metric for how long flushing takes
1262 : let flush_fut = match flushing_timer {
1263 : Some(flushing_timer) => {
1264 : futures::future::Either::Left(flushing_timer.measure(flush_fut))
1265 : }
1266 : None => futures::future::Either::Right(flush_fut),
1267 : };
1268 : // do it while respecting cancellation
1269 0 : let _: () = async move {
1270 0 : tokio::select! {
1271 : biased;
1272 0 : _ = cancel.cancelled() => {
1273 : // We were requested to shut down.
1274 0 : info!("shutdown request received in page handler");
1275 0 : return Err(QueryError::Shutdown)
1276 : }
1277 0 : res = flush_fut => {
1278 0 : res?;
1279 : }
1280 : }
1281 0 : Ok(())
1282 0 : }
1283 : // and log the info! line inside the request span
1284 : .instrument(span.clone())
1285 : .await?;
1286 : }
1287 : Ok(())
1288 : }
1289 :
1290 : /// Pagestream sub-protocol handler.
1291 : ///
1292 : /// It is a simple request-response protocol inside a COPYBOTH session.
1293 : ///
1294 : /// # Coding Discipline
1295 : ///
1296 : /// Coding discipline within this function: all interaction with the `pgb` connection
1297 : /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
1298 : /// This is so that we can shutdown page_service quickly.
1299 : #[instrument(skip_all)]
1300 : async fn handle_pagerequests<IO>(
1301 : &mut self,
1302 : pgb: &mut PostgresBackend<IO>,
1303 : tenant_id: TenantId,
1304 : timeline_id: TimelineId,
1305 : protocol_version: PagestreamProtocolVersion,
1306 : ctx: RequestContext,
1307 : ) -> Result<(), QueryError>
1308 : where
1309 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
1310 : {
1311 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1312 :
1313 : // switch client to COPYBOTH
1314 : pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
1315 : tokio::select! {
1316 : biased;
1317 : _ = self.cancel.cancelled() => {
1318 : return Err(QueryError::Shutdown)
1319 : }
1320 : res = pgb.flush() => {
1321 : res?;
1322 : }
1323 : }
1324 :
1325 : let io_concurrency = IoConcurrency::spawn_from_conf(
1326 : self.conf,
1327 : match self.gate_guard.try_clone() {
1328 : Ok(guard) => guard,
1329 : Err(_) => {
1330 : info!("shutdown request received in page handler");
1331 : return Err(QueryError::Shutdown);
1332 : }
1333 : },
1334 : );
1335 :
1336 : let pgb_reader = pgb
1337 : .split()
1338 : .context("implementation error: split pgb into reader and writer")?;
1339 :
1340 : let timeline_handles = self
1341 : .timeline_handles
1342 : .take()
1343 : .expect("implementation error: timeline_handles should not be locked");
1344 :
1345 : let request_span = info_span!("request", shard_id = tracing::field::Empty);
1346 : let ((pgb_reader, timeline_handles), result) = match self.pipelining_config.clone() {
1347 : PageServicePipeliningConfig::Pipelined(pipelining_config) => {
1348 : self.handle_pagerequests_pipelined(
1349 : pgb,
1350 : pgb_reader,
1351 : tenant_id,
1352 : timeline_id,
1353 : timeline_handles,
1354 : request_span,
1355 : pipelining_config,
1356 : protocol_version,
1357 : io_concurrency,
1358 : &ctx,
1359 : )
1360 : .await
1361 : }
1362 : PageServicePipeliningConfig::Serial => {
1363 : self.handle_pagerequests_serial(
1364 : pgb,
1365 : pgb_reader,
1366 : tenant_id,
1367 : timeline_id,
1368 : timeline_handles,
1369 : request_span,
1370 : protocol_version,
1371 : io_concurrency,
1372 : &ctx,
1373 : )
1374 : .await
1375 : }
1376 : };
1377 :
1378 : debug!("pagestream subprotocol shut down cleanly");
1379 :
1380 : pgb.unsplit(pgb_reader)
1381 : .context("implementation error: unsplit pgb")?;
1382 :
1383 : let replaced = self.timeline_handles.replace(timeline_handles);
1384 : assert!(replaced.is_none());
1385 :
1386 : result
1387 : }
1388 :
1389 : #[allow(clippy::too_many_arguments)]
1390 0 : async fn handle_pagerequests_serial<IO>(
1391 0 : &mut self,
1392 0 : pgb_writer: &mut PostgresBackend<IO>,
1393 0 : mut pgb_reader: PostgresBackendReader<IO>,
1394 0 : tenant_id: TenantId,
1395 0 : timeline_id: TimelineId,
1396 0 : mut timeline_handles: TimelineHandles,
1397 0 : request_span: Span,
1398 0 : protocol_version: PagestreamProtocolVersion,
1399 0 : io_concurrency: IoConcurrency,
1400 0 : ctx: &RequestContext,
1401 0 : ) -> (
1402 0 : (PostgresBackendReader<IO>, TimelineHandles),
1403 0 : Result<(), QueryError>,
1404 0 : )
1405 0 : where
1406 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
1407 0 : {
1408 0 : let cancel = self.cancel.clone();
1409 0 : let err = loop {
1410 0 : let msg = Self::pagestream_read_message(
1411 0 : &mut pgb_reader,
1412 0 : tenant_id,
1413 0 : timeline_id,
1414 0 : &mut timeline_handles,
1415 0 : &cancel,
1416 0 : ctx,
1417 0 : protocol_version,
1418 0 : request_span.clone(),
1419 0 : )
1420 0 : .await;
1421 0 : let msg = match msg {
1422 0 : Ok(msg) => msg,
1423 0 : Err(e) => break e,
1424 : };
1425 0 : let msg = match msg {
1426 0 : Some(msg) => msg,
1427 : None => {
1428 0 : debug!("pagestream subprotocol end observed");
1429 0 : return ((pgb_reader, timeline_handles), Ok(()));
1430 : }
1431 : };
1432 :
1433 0 : let err = self
1434 0 : .pagesteam_handle_batched_message(
1435 0 : pgb_writer,
1436 0 : msg,
1437 0 : io_concurrency.clone(),
1438 0 : &cancel,
1439 0 : protocol_version,
1440 0 : ctx,
1441 0 : )
1442 0 : .await;
1443 0 : match err {
1444 0 : Ok(()) => {}
1445 0 : Err(e) => break e,
1446 : }
1447 : };
1448 0 : ((pgb_reader, timeline_handles), Err(err))
1449 0 : }
1450 :
1451 : /// # Cancel-Safety
1452 : ///
1453 : /// May leak tokio tasks if not polled to completion.
1454 : #[allow(clippy::too_many_arguments)]
1455 0 : async fn handle_pagerequests_pipelined<IO>(
1456 0 : &mut self,
1457 0 : pgb_writer: &mut PostgresBackend<IO>,
1458 0 : pgb_reader: PostgresBackendReader<IO>,
1459 0 : tenant_id: TenantId,
1460 0 : timeline_id: TimelineId,
1461 0 : mut timeline_handles: TimelineHandles,
1462 0 : request_span: Span,
1463 0 : pipelining_config: PageServicePipeliningConfigPipelined,
1464 0 : protocol_version: PagestreamProtocolVersion,
1465 0 : io_concurrency: IoConcurrency,
1466 0 : ctx: &RequestContext,
1467 0 : ) -> (
1468 0 : (PostgresBackendReader<IO>, TimelineHandles),
1469 0 : Result<(), QueryError>,
1470 0 : )
1471 0 : where
1472 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
1473 0 : {
1474 0 : //
1475 0 : // Pipelined pagestream handling consists of
1476 0 : // - a Batcher that reads requests off the wire and batches them
1477 0 : // if possible,
1478 0 : // - an Executor that processes the batched requests.
1479 0 : //
1480 0 : // The batch is built up inside an `spsc_fold` channel,
1481 0 : // shared between Batcher (Sender) and Executor (Receiver).
1482 0 : //
1483 0 : // The Batcher continuously folds client requests into the batch,
1484 0 : // while the Executor can at any time take out what's in the batch
1485 0 : // in order to process it.
1486 0 : // This means the next batch builds up while the Executor
1487 0 : // executes the last batch.
1488 0 : //
1489 0 : // CANCELLATION
1490 0 : //
1491 0 : // We run both Batcher and Executor futures to completion before
1492 0 : // returning from this function.
1493 0 : //
1494 0 : // If Executor exits first, it signals cancellation to the Batcher
1495 0 : // via a CancellationToken that is child of `self.cancel`.
1496 0 : // If Batcher exits first, it signals cancellation to the Executor
1497 0 : // by dropping the spsc_fold channel Sender.
1498 0 : //
1499 0 : // CLEAN SHUTDOWN
1500 0 : //
1501 0 : // Clean shutdown means that the client ends the COPYBOTH session.
1502 0 : // In response to such a client message, the Batcher exits.
1503 0 : // The Executor continues to run, draining the spsc_fold channel.
1504 0 : // Once drained, the spsc_fold recv will fail with a distinct error
1505 0 : // indicating that the sender disconnected.
1506 0 : // The Executor exits with Ok(()) in response to that error.
1507 0 : //
1508 0 : // Server initiated shutdown is not clean shutdown, but instead
1509 0 : // Server-initiated shutdown is not a clean shutdown; instead it
1510 0 : // surfaces as an error, Err(QueryError::Shutdown), which is propagated
1511 0 : // through the usual error path.
1512 0 : // ERROR PROPAGATION
1513 0 : //
1514 0 : // When the Batcher encounters an error, it sends it as a value
1515 0 : // through the spsc_fold channel and exits afterwards.
1516 0 : // When the Executor observes such an error in the channel,
1517 0 : // it exits returning that error value.
1518 0 : //
1519 0 : // This design ensures that the Executor stage will still process
1520 0 : // the batch that was in flight when the Batcher encountered an error,
1521 0 : // thereby behaving identically to a serial implementation.
1522 0 :
1523 0 : let PageServicePipeliningConfigPipelined {
1524 0 : max_batch_size,
1525 0 : execution,
1526 0 : } = pipelining_config;
1527 :
1528 : // Macro to _define_ a pipeline stage.
1529 : macro_rules! pipeline_stage {
1530 : ($name:literal, $cancel:expr, $make_fut:expr) => {{
1531 : let cancel: CancellationToken = $cancel;
1532 : let stage_fut = $make_fut(cancel.clone());
1533 0 : async move {
1534 0 : scopeguard::defer! {
1535 0 : debug!("exiting");
1536 0 : }
1537 0 : timed_after_cancellation(stage_fut, $name, Duration::from_millis(100), &cancel)
1538 0 : .await
1539 0 : }
1540 : .instrument(tracing::info_span!($name))
1541 : }};
1542 : }
1543 :
1544 : //
1545 : // Batcher
1546 : //
1547 :
1548 0 : let cancel_batcher = self.cancel.child_token();
1549 0 : let (mut batch_tx, mut batch_rx) = spsc_fold::channel();
1550 0 : let batcher = pipeline_stage!("batcher", cancel_batcher.clone(), move |cancel_batcher| {
1551 0 : let ctx = ctx.attached_child();
1552 0 : async move {
1553 0 : let mut pgb_reader = pgb_reader;
1554 0 : let mut exit = false;
1555 0 : while !exit {
1556 0 : let read_res = Self::pagestream_read_message(
1557 0 : &mut pgb_reader,
1558 0 : tenant_id,
1559 0 : timeline_id,
1560 0 : &mut timeline_handles,
1561 0 : &cancel_batcher,
1562 0 : &ctx,
1563 0 : protocol_version,
1564 0 : request_span.clone(),
1565 0 : )
1566 0 : .await;
1567 0 : let Some(read_res) = read_res.transpose() else {
1568 0 : debug!("client-initiated shutdown");
1569 0 : break;
1570 : };
1571 0 : exit |= read_res.is_err();
1572 0 : let could_send = batch_tx
1573 0 : .send(read_res, |batch, res| {
1574 0 : Self::pagestream_do_batch(max_batch_size, batch, res)
1575 0 : })
1576 0 : .await;
1577 0 : exit |= could_send.is_err();
1578 : }
1579 0 : (pgb_reader, timeline_handles)
1580 0 : }
1581 0 : });
1582 :
1583 : //
1584 : // Executor
1585 : //
1586 :
1587 0 : let executor = pipeline_stage!("executor", self.cancel.clone(), move |cancel| {
1588 0 : let ctx = ctx.attached_child();
1589 0 : async move {
1590 0 : let _cancel_batcher = cancel_batcher.drop_guard();
1591 : loop {
1592 0 : let maybe_batch = batch_rx.recv().await;
1593 0 : let batch = match maybe_batch {
1594 0 : Ok(batch) => batch,
1595 : Err(spsc_fold::RecvError::SenderGone) => {
1596 0 : debug!("upstream gone");
1597 0 : return Ok(());
1598 : }
1599 : };
1600 0 : let batch = match batch {
1601 0 : Ok(batch) => batch,
1602 0 : Err(e) => {
1603 0 : return Err(e);
1604 : }
1605 : };
1606 0 : self.pagesteam_handle_batched_message(
1607 0 : pgb_writer,
1608 0 : batch,
1609 0 : io_concurrency.clone(),
1610 0 : &cancel,
1611 0 : protocol_version,
1612 0 : &ctx,
1613 0 : )
1614 0 : .await?;
1615 : }
1616 0 : }
1617 0 : });
1618 :
1619 : //
1620 : // Execute the stages.
1621 : //
1622 :
1623 0 : match execution {
1624 : PageServiceProtocolPipelinedExecutionStrategy::ConcurrentFutures => {
1625 0 : tokio::join!(batcher, executor)
1626 : }
1627 : PageServiceProtocolPipelinedExecutionStrategy::Tasks => {
1628 : // These tasks are not tracked anywhere.
1629 0 : let read_messages_task = tokio::spawn(batcher);
1630 0 : let (read_messages_task_res, executor_res_) =
1631 0 : tokio::join!(read_messages_task, executor,);
1632 0 : (
1633 0 : read_messages_task_res.expect("propagated panic from read_messages"),
1634 0 : executor_res_,
1635 0 : )
1636 : }
1637 : }
1638 0 : }
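
// Dataflow of the pipelined path above (illustrative summary):
//
//   pgb_reader --pagestream_read_message--> Batcher
//     --spsc_fold (folded via pagestream_do_batch, up to max_batch_size)-->
//   Executor --pagesteam_handle_batched_message--> pgb_writer
//
// With `ConcurrentFutures`, both stages run as futures joined on this task;
// with `Tasks`, the Batcher runs in a separately spawned tokio task.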
1639 :
1640 : /// Helper function to handle the LSN from client request.
1641 : ///
1642 : /// Each GetPage (and Exists and Nblocks) request includes information about
1643 : /// which version of the page is being requested. The primary compute node
1644 : /// will always request the latest page version, by setting 'request_lsn' to
1645 : /// the last inserted or flushed WAL position, while a standby will request
1646 : /// a version at the LSN that it's currently caught up to.
1647 : ///
1648 : /// In either case, if the page server hasn't received the WAL up to the
1649 : /// requested LSN yet, we will wait for it to arrive. The return value is
1650 : /// the LSN that should be used to look up the page versions.
1651 : ///
1652 : /// In addition to the request LSN, each request carries another LSN,
1653 : /// 'not_modified_since', which is a hint to the pageserver that the client
1654 : /// knows that the page has not been modified between 'not_modified_since'
1655 : /// and the request LSN. This allows skipping the wait, as long as the WAL
1656 : /// up to 'not_modified_since' has arrived. If the client doesn't have any
1657 : /// information about when the page was modified, it will use
1658 : /// not_modified_since == request_lsn. If the client lies and sends a too-low
1659 : /// not_modified_since such that there are in fact later page versions, the
1660 : /// behavior is undefined: the pageserver may return any of the page versions
1661 : /// or an error.
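 : ///
 : /// For illustration only (hypothetical LSN values), assume this pageserver's
 : /// last_record_lsn is 0/2000:
 : ///
 : /// ```text
 : /// request_lsn = 0/3000, not_modified_since = 0/1000
 : ///   -> WAL up to 0/1000 has already arrived: no wait, read at min(0/2000, 0/3000) = 0/2000
 : /// request_lsn = 0/3000, not_modified_since = 0/2500
 : ///   -> wait until WAL up to 0/2500 has arrived, then read at 0/2500
 : /// ```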
1662 0 : async fn wait_or_get_last_lsn(
1663 0 : timeline: &Timeline,
1664 0 : request_lsn: Lsn,
1665 0 : not_modified_since: Lsn,
1666 0 : latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
1667 0 : ctx: &RequestContext,
1668 0 : ) -> Result<Lsn, PageStreamError> {
1669 0 : let last_record_lsn = timeline.get_last_record_lsn();
1670 0 :
1671 0 : // Sanity check the request
1672 0 : if request_lsn < not_modified_since {
1673 0 : return Err(PageStreamError::BadRequest(
1674 0 : format!(
1675 0 : "invalid request with request LSN {} and not_modified_since {}",
1676 0 : request_lsn, not_modified_since,
1677 0 : )
1678 0 : .into(),
1679 0 : ));
1680 0 : }
1681 0 :
1682 0 : // Check explicitly for INVALID just to get a less scary error message if the request is obviously bogus
1683 0 : if request_lsn == Lsn::INVALID {
1684 0 : return Err(PageStreamError::BadRequest(
1685 0 : "invalid LSN(0) in request".into(),
1686 0 : ));
1687 0 : }
1688 0 :
1689 0 : // Clients should only read from recent LSNs on their timeline, or from locations holding an LSN lease.
1690 0 : //
1691 0 : // We may have older data available, but we make a best effort to detect this case and return an error,
1692 0 : // to distinguish a misbehaving client (asking for old LSN) from a storage issue (data missing at a legitimate LSN).
1693 0 : if request_lsn < **latest_gc_cutoff_lsn && !timeline.is_gc_blocked_by_lsn_lease_deadline() {
1694 0 : let gc_info = &timeline.gc_info.read().unwrap();
1695 0 : if !gc_info.leases.contains_key(&request_lsn) {
1696 0 : return Err(
1697 0 : PageStreamError::BadRequest(format!(
1698 0 : "tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
1699 0 : request_lsn, **latest_gc_cutoff_lsn
1700 0 : ).into())
1701 0 : );
1702 0 : }
1703 0 : }
1704 :
1705 : // Wait for WAL up to 'not_modified_since' to arrive, if necessary
1706 0 : if not_modified_since > last_record_lsn {
1707 0 : timeline
1708 0 : .wait_lsn(
1709 0 : not_modified_since,
1710 0 : crate::tenant::timeline::WaitLsnWaiter::PageService,
1711 0 : timeline::WaitLsnTimeout::Default,
1712 0 : ctx,
1713 0 : )
1714 0 : .await?;
1715 : // Since we waited for 'not_modified_since' to arrive, that is now the last
1716 : // record LSN. (Or close enough for our purposes; the last-record LSN can
1717 : // advance immediately after we return anyway)
1718 0 : Ok(not_modified_since)
1719 : } else {
1720 : // It might be better to use max(not_modified_since, latest_gc_cutoff_lsn)
1721 : // here instead. That would give the same result, since we know that there
1722 : // haven't been any modifications since 'not_modified_since'. Using an older
1723 : // LSN might be faster, because that could allow skipping recent layers when
1724 : // finding the page. However, we have historically used 'last_record_lsn', so
1725 : // stick to that for now.
1726 0 : Ok(std::cmp::min(last_record_lsn, request_lsn))
1727 : }
1728 0 : }
1729 :
1730 : /// Handles the lsn lease request.
1731 : /// If a lease cannot be obtained, the client will receive NULL.
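 : ///
 : /// For illustration (hypothetical value): on success the client receives a single
 : /// row with one text column, the lease expiry in milliseconds since UNIX_EPOCH:
 : ///
 : /// ```text
 : ///  valid_until
 : /// -------------
 : ///  1735689600000
 : /// ```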
1732 : #[instrument(skip_all, fields(shard_id, %lsn))]
1733 : async fn handle_make_lsn_lease<IO>(
1734 : &mut self,
1735 : pgb: &mut PostgresBackend<IO>,
1736 : tenant_shard_id: TenantShardId,
1737 : timeline_id: TimelineId,
1738 : lsn: Lsn,
1739 : ctx: &RequestContext,
1740 : ) -> Result<(), QueryError>
1741 : where
1742 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1743 : {
1744 : let timeline = self
1745 : .timeline_handles
1746 : .as_mut()
1747 : .unwrap()
1748 : .get(
1749 : tenant_shard_id.tenant_id,
1750 : timeline_id,
1751 : ShardSelector::Known(tenant_shard_id.to_index()),
1752 : )
1753 : .await?;
1754 : set_tracing_field_shard_id(&timeline);
1755 :
1756 : let lease = timeline
1757 : .renew_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx)
1758 0 : .inspect_err(|e| {
1759 0 : warn!("{e}");
1760 0 : })
1761 : .ok();
1762 0 : let valid_until_str = lease.map(|l| {
1763 0 : l.valid_until
1764 0 : .duration_since(SystemTime::UNIX_EPOCH)
1765 0 : .expect("valid_until is earlier than UNIX_EPOCH")
1766 0 : .as_millis()
1767 0 : .to_string()
1768 0 : });
1769 0 : let bytes = valid_until_str.as_ref().map(|x| x.as_bytes());
1770 :
1771 : pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col(
1772 : b"valid_until",
1773 : )]))?
1774 : .write_message_noflush(&BeMessage::DataRow(&[bytes]))?;
1775 :
1776 : Ok(())
1777 : }
1778 :
1779 : #[instrument(skip_all, fields(shard_id))]
1780 : async fn handle_get_rel_exists_request(
1781 : &mut self,
1782 : timeline: &Timeline,
1783 : req: &PagestreamExistsRequest,
1784 : ctx: &RequestContext,
1785 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1786 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1787 : let lsn = Self::wait_or_get_last_lsn(
1788 : timeline,
1789 : req.hdr.request_lsn,
1790 : req.hdr.not_modified_since,
1791 : &latest_gc_cutoff_lsn,
1792 : ctx,
1793 : )
1794 : .await?;
1795 :
1796 : let exists = timeline
1797 : .get_rel_exists(req.rel, Version::Lsn(lsn), ctx)
1798 : .await?;
1799 :
1800 : Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
1801 : req: *req,
1802 : exists,
1803 : }))
1804 : }
1805 :
1806 : #[instrument(skip_all, fields(shard_id))]
1807 : async fn handle_get_nblocks_request(
1808 : &mut self,
1809 : timeline: &Timeline,
1810 : req: &PagestreamNblocksRequest,
1811 : ctx: &RequestContext,
1812 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1813 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1814 : let lsn = Self::wait_or_get_last_lsn(
1815 : timeline,
1816 : req.hdr.request_lsn,
1817 : req.hdr.not_modified_since,
1818 : &latest_gc_cutoff_lsn,
1819 : ctx,
1820 : )
1821 : .await?;
1822 :
1823 : let n_blocks = timeline
1824 : .get_rel_size(req.rel, Version::Lsn(lsn), ctx)
1825 : .await?;
1826 :
1827 : Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
1828 : req: *req,
1829 : n_blocks,
1830 : }))
1831 : }
1832 :
1833 : #[instrument(skip_all, fields(shard_id))]
1834 : async fn handle_db_size_request(
1835 : &mut self,
1836 : timeline: &Timeline,
1837 : req: &PagestreamDbSizeRequest,
1838 : ctx: &RequestContext,
1839 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1840 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1841 : let lsn = Self::wait_or_get_last_lsn(
1842 : timeline,
1843 : req.hdr.request_lsn,
1844 : req.hdr.not_modified_since,
1845 : &latest_gc_cutoff_lsn,
1846 : ctx,
1847 : )
1848 : .await?;
1849 :
1850 : let total_blocks = timeline
1851 : .get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, Version::Lsn(lsn), ctx)
1852 : .await?;
1853 : let db_size = total_blocks as i64 * BLCKSZ as i64;
1854 :
1855 : Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse {
1856 : req: *req,
1857 : db_size,
1858 : }))
1859 : }
1860 :
1861 : #[instrument(skip_all)]
1862 : async fn handle_get_page_at_lsn_request_batched(
1863 : &mut self,
1864 : timeline: &Timeline,
1865 : effective_lsn: Lsn,
1866 : requests: smallvec::SmallVec<[BatchedGetPageRequest; 1]>,
1867 : io_concurrency: IoConcurrency,
1868 : ctx: &RequestContext,
1869 : ) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>> {
1870 : debug_assert_current_span_has_tenant_and_timeline_id();
1871 :
1872 : timeline
1873 : .query_metrics
1874 : .observe_getpage_batch_start(requests.len());
1875 :
1876 : // If a page trace is running, submit an event for this request.
1877 : if let Some(page_trace) = timeline.page_trace.load().as_ref() {
1878 : let time = SystemTime::now();
1879 : for batch in &requests {
1880 : let key = rel_block_to_key(batch.req.rel, batch.req.blkno).to_compact();
1881 : // Ignore error (trace buffer may be full or tracer may have disconnected).
1882 : _ = page_trace.try_send(PageTraceEvent {
1883 : key,
1884 : effective_lsn,
1885 : time,
1886 : });
1887 : }
1888 : }
1889 :
1890 : let results = timeline
1891 : .get_rel_page_at_lsn_batched(
1892 0 : requests.iter().map(|p| (&p.req.rel, &p.req.blkno)),
1893 : effective_lsn,
1894 : io_concurrency,
1895 : ctx,
1896 : )
1897 : .await;
1898 : assert_eq!(results.len(), requests.len());
1899 :
1900 : // TODO: avoid creating the new Vec here
1901 : Vec::from_iter(
1902 : requests
1903 : .into_iter()
1904 : .zip(results.into_iter())
1905 0 : .map(|(req, res)| {
1906 0 : res.map(|page| {
1907 0 : (
1908 0 : PagestreamBeMessage::GetPage(models::PagestreamGetPageResponse {
1909 0 : req: req.req,
1910 0 : page,
1911 0 : }),
1912 0 : req.timer,
1913 0 : )
1914 0 : })
1915 0 : .map_err(|e| BatchedPageStreamError {
1916 0 : err: PageStreamError::from(e),
1917 0 : req: req.req.hdr,
1918 0 : })
1919 0 : }),
1920 : )
1921 : }
1922 :
1923 : #[instrument(skip_all, fields(shard_id))]
1924 : async fn handle_get_slru_segment_request(
1925 : &mut self,
1926 : timeline: &Timeline,
1927 : req: &PagestreamGetSlruSegmentRequest,
1928 : ctx: &RequestContext,
1929 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1930 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1931 : let lsn = Self::wait_or_get_last_lsn(
1932 : timeline,
1933 : req.hdr.request_lsn,
1934 : req.hdr.not_modified_since,
1935 : &latest_gc_cutoff_lsn,
1936 : ctx,
1937 : )
1938 : .await?;
1939 :
1940 : let kind = SlruKind::from_repr(req.kind)
1941 : .ok_or(PageStreamError::BadRequest("invalid SLRU kind".into()))?;
1942 : let segment = timeline.get_slru_segment(kind, req.segno, lsn, ctx).await?;
1943 :
1944 : Ok(PagestreamBeMessage::GetSlruSegment(
1945 : PagestreamGetSlruSegmentResponse { req: *req, segment },
1946 : ))
1947 : }
1948 :
1949 : // NB: this impl mimics what we do for batched getpage requests.
1950 : #[cfg(feature = "testing")]
1951 : #[instrument(skip_all, fields(shard_id))]
1952 : async fn handle_test_request_batch(
1953 : &mut self,
1954 : timeline: &Timeline,
1955 : requests: Vec<BatchedTestRequest>,
1956 : _ctx: &RequestContext,
1957 : ) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>> {
1958 : // real requests would do something with the timeline
1959 : let mut results = Vec::with_capacity(requests.len());
1960 : for _req in requests.iter() {
1961 : tokio::task::yield_now().await;
1962 :
1963 : results.push({
1964 : if timeline.cancel.is_cancelled() {
1965 : Err(PageReconstructError::Cancelled)
1966 : } else {
1967 : Ok(())
1968 : }
1969 : });
1970 : }
1971 :
1972 : // TODO: avoid creating the new Vec here
1973 : Vec::from_iter(
1974 : requests
1975 : .into_iter()
1976 : .zip(results.into_iter())
1977 0 : .map(|(req, res)| {
1978 0 : res.map(|()| {
1979 0 : (
1980 0 : PagestreamBeMessage::Test(models::PagestreamTestResponse {
1981 0 : req: req.req.clone(),
1982 0 : }),
1983 0 : req.timer,
1984 0 : )
1985 0 : })
1986 0 : .map_err(|e| BatchedPageStreamError {
1987 0 : err: PageStreamError::from(e),
1988 0 : req: req.req.hdr,
1989 0 : })
1990 0 : }),
1991 : )
1992 : }
1993 :
1994 : /// Note on "fullbackup":
1995 : /// Full basebackups should only be used for debugging purposes.
1996 : /// Originally, they were introduced to enable breaking storage format changes,
1997 : /// but that is no longer applicable.
1998 : ///
1999 : /// # Coding Discipline
2000 : ///
2001 : /// Coding discipline within this function: all interaction with the `pgb` connection
2002 : /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
2003 : /// This is so that we can shutdown page_service quickly.
2004 : ///
2005 : /// TODO: wrap the pgb that we pass to the basebackup handler so that it's sensitive
2006 : /// to connection cancellation.
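 : ///
 : /// A minimal sketch (not the actual helper used in this file) of what a
 : /// cancellation-sensitive interaction with `pgb` looks like; error mapping is
 : /// elided and `flush` stands in for whatever await is being guarded:
 : ///
 : /// ```ignore
 : /// tokio::select! {
 : ///     biased;
 : ///     _ = self.cancel.cancelled() => return Err(QueryError::Shutdown),
 : ///     res = pgb.flush() => res?,
 : /// }
 : /// ```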
2007 : #[allow(clippy::too_many_arguments)]
2008 : #[instrument(skip_all, fields(shard_id, ?lsn, ?prev_lsn, %full_backup))]
2009 : async fn handle_basebackup_request<IO>(
2010 : &mut self,
2011 : pgb: &mut PostgresBackend<IO>,
2012 : tenant_id: TenantId,
2013 : timeline_id: TimelineId,
2014 : lsn: Option<Lsn>,
2015 : prev_lsn: Option<Lsn>,
2016 : full_backup: bool,
2017 : gzip: bool,
2018 : replica: bool,
2019 : ctx: &RequestContext,
2020 : ) -> Result<(), QueryError>
2021 : where
2022 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
2023 : {
2024 0 : fn map_basebackup_error(err: BasebackupError) -> QueryError {
2025 0 : match err {
2026 0 : BasebackupError::Client(e) => QueryError::Disconnected(ConnectionError::Io(e)),
2027 0 : BasebackupError::Server(e) => QueryError::Other(e),
2028 : }
2029 0 : }
2030 :
2031 : let started = std::time::Instant::now();
2032 :
2033 : let timeline = self
2034 : .timeline_handles
2035 : .as_mut()
2036 : .unwrap()
2037 : .get(tenant_id, timeline_id, ShardSelector::Zero)
2038 : .await?;
2039 :
2040 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
2041 : if let Some(lsn) = lsn {
2042 : // Backup was requested at a particular LSN. Wait for it to arrive.
2043 : info!("waiting for {}", lsn);
2044 : timeline
2045 : .wait_lsn(
2046 : lsn,
2047 : crate::tenant::timeline::WaitLsnWaiter::PageService,
2048 : crate::tenant::timeline::WaitLsnTimeout::Default,
2049 : ctx,
2050 : )
2051 : .await?;
2052 : timeline
2053 : .check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
2054 : .context("invalid basebackup lsn")?;
2055 : }
2056 :
2057 : let lsn_awaited_after = started.elapsed();
2058 :
2059 : // switch client to COPYOUT
2060 : pgb.write_message_noflush(&BeMessage::CopyOutResponse)
2061 : .map_err(QueryError::Disconnected)?;
2062 : self.flush_cancellable(pgb, &self.cancel).await?;
2063 :
2064 : // Send a tarball of the latest layer on the timeline. Compress if not
2065 : // fullbackup. TODO Compress in that case too (tests need to be updated)
2066 : if full_backup {
2067 : let mut writer = pgb.copyout_writer();
2068 : basebackup::send_basebackup_tarball(
2069 : &mut writer,
2070 : &timeline,
2071 : lsn,
2072 : prev_lsn,
2073 : full_backup,
2074 : replica,
2075 : ctx,
2076 : )
2077 : .await
2078 : .map_err(map_basebackup_error)?;
2079 : } else {
2080 : let mut writer = BufWriter::new(pgb.copyout_writer());
2081 : if gzip {
2082 : let mut encoder = GzipEncoder::with_quality(
2083 : &mut writer,
2084 : // NOTE using fast compression because it's on the critical path
2085 : // for compute startup. For an empty database, we get
2086 : // <100KB with this method. The Level::Best compression method
2087 : // gives us <20KB, but maybe we should add basebackup caching
2088 : // on compute shutdown first.
2089 : async_compression::Level::Fastest,
2090 : );
2091 : basebackup::send_basebackup_tarball(
2092 : &mut encoder,
2093 : &timeline,
2094 : lsn,
2095 : prev_lsn,
2096 : full_backup,
2097 : replica,
2098 : ctx,
2099 : )
2100 : .await
2101 : .map_err(map_basebackup_error)?;
2102 : // shutdown the encoder to ensure the gzip footer is written
2103 : encoder
2104 : .shutdown()
2105 : .await
2106 0 : .map_err(|e| QueryError::Disconnected(ConnectionError::Io(e)))?;
2107 : } else {
2108 : basebackup::send_basebackup_tarball(
2109 : &mut writer,
2110 : &timeline,
2111 : lsn,
2112 : prev_lsn,
2113 : full_backup,
2114 : replica,
2115 : ctx,
2116 : )
2117 : .await
2118 : .map_err(map_basebackup_error)?;
2119 : }
2120 : writer
2121 : .flush()
2122 : .await
2123 0 : .map_err(|e| map_basebackup_error(BasebackupError::Client(e)))?;
2124 : }
2125 :
2126 : pgb.write_message_noflush(&BeMessage::CopyDone)
2127 : .map_err(QueryError::Disconnected)?;
2128 : self.flush_cancellable(pgb, &timeline.cancel).await?;
2129 :
2130 : let basebackup_after = started
2131 : .elapsed()
2132 : .checked_sub(lsn_awaited_after)
2133 : .unwrap_or(Duration::ZERO);
2134 :
2135 : info!(
2136 : lsn_await_millis = lsn_awaited_after.as_millis(),
2137 : basebackup_millis = basebackup_after.as_millis(),
2138 : "basebackup complete"
2139 : );
2140 :
2141 : Ok(())
2142 : }
2143 :
2144 : // When accessing the management API, supply None as the tenant argument.
2145 : // When authorizing access to a tenant, pass the corresponding tenant id.
2146 0 : fn check_permission(&self, tenant_id: Option<TenantId>) -> Result<(), QueryError> {
2147 0 : if self.auth.is_none() {
2148 : // auth is set to Trust, nothing to check so just return ok
2149 0 : return Ok(());
2150 0 : }
2151 0 : // Auth is Some (just checked above). When auth is Some, claims are always
2152 0 : // present because of the checks during connection init, so this expect
2153 0 : // won't trigger.
2154 0 : let claims = self
2155 0 : .claims
2156 0 : .as_ref()
2157 0 : .expect("claims presence already checked");
2158 0 : check_permission(claims, tenant_id).map_err(|e| QueryError::Unauthorized(e.0))
2159 0 : }
2160 : }
2161 :
2162 : /// `basebackup tenant timeline [lsn] [--gzip] [--replica]`
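 : ///
 : /// For illustration (IDs are placeholders), the forms accepted by [`BaseBackupCmd::parse`]
 : /// include:
 : ///
 : /// ```text
 : /// basebackup <tenant_id> <timeline_id> --gzip
 : /// basebackup <tenant_id> <timeline_id> 0/16ABCDE --replica --gzip
 : /// basebackup <tenant_id> <timeline_id> latest
 : /// ```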
2163 : #[derive(Debug, Clone, Eq, PartialEq)]
2164 : struct BaseBackupCmd {
2165 : tenant_id: TenantId,
2166 : timeline_id: TimelineId,
2167 : lsn: Option<Lsn>,
2168 : gzip: bool,
2169 : replica: bool,
2170 : }
2171 :
2172 : /// `fullbackup tenant timeline [lsn] [prev_lsn]`
2173 : #[derive(Debug, Clone, Eq, PartialEq)]
2174 : struct FullBackupCmd {
2175 : tenant_id: TenantId,
2176 : timeline_id: TimelineId,
2177 : lsn: Option<Lsn>,
2178 : prev_lsn: Option<Lsn>,
2179 : }
2180 :
2181 : /// `pagestream_v2 tenant timeline` (and likewise `pagestream_v3 tenant timeline`)
2182 : #[derive(Debug, Clone, Eq, PartialEq)]
2183 : struct PageStreamCmd {
2184 : tenant_id: TenantId,
2185 : timeline_id: TimelineId,
2186 : protocol_version: PagestreamProtocolVersion,
2187 : }
2188 :
2189 : /// `lease lsn tenant timeline lsn`
2190 : #[derive(Debug, Clone, Eq, PartialEq)]
2191 : struct LeaseLsnCmd {
2192 : tenant_shard_id: TenantShardId,
2193 : timeline_id: TimelineId,
2194 : lsn: Lsn,
2195 : }
2196 :
2197 : #[derive(Debug, Clone, Eq, PartialEq)]
2198 : enum PageServiceCmd {
2199 : Set,
2200 : PageStream(PageStreamCmd),
2201 : BaseBackup(BaseBackupCmd),
2202 : FullBackup(FullBackupCmd),
2203 : LeaseLsn(LeaseLsnCmd),
2204 : }
2205 :
2206 : impl PageStreamCmd {
2207 12 : fn parse(query: &str, protocol_version: PagestreamProtocolVersion) -> anyhow::Result<Self> {
2208 12 : let parameters = query.split_whitespace().collect_vec();
2209 12 : if parameters.len() != 2 {
2210 4 : bail!(
2211 4 : "invalid number of parameters for pagestream command: {}",
2212 4 : query
2213 4 : );
2214 8 : }
2215 8 : let tenant_id = TenantId::from_str(parameters[0])
2216 8 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2217 4 : let timeline_id = TimelineId::from_str(parameters[1])
2218 4 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2219 4 : Ok(Self {
2220 4 : tenant_id,
2221 4 : timeline_id,
2222 4 : protocol_version,
2223 4 : })
2224 12 : }
2225 : }
2226 :
2227 : impl FullBackupCmd {
2228 8 : fn parse(query: &str) -> anyhow::Result<Self> {
2229 8 : let parameters = query.split_whitespace().collect_vec();
2230 8 : if parameters.len() < 2 || parameters.len() > 4 {
2231 0 : bail!(
2232 0 : "invalid number of parameters for basebackup command: {}",
2233 0 : query
2234 0 : );
2235 8 : }
2236 8 : let tenant_id = TenantId::from_str(parameters[0])
2237 8 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2238 8 : let timeline_id = TimelineId::from_str(parameters[1])
2239 8 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2240 : // The caller is responsible for providing correct lsn and prev_lsn.
2241 8 : let lsn = if let Some(lsn_str) = parameters.get(2) {
2242 : Some(
2243 4 : Lsn::from_str(lsn_str)
2244 4 : .with_context(|| format!("Failed to parse Lsn from {lsn_str}"))?,
2245 : )
2246 : } else {
2247 4 : None
2248 : };
2249 8 : let prev_lsn = if let Some(prev_lsn_str) = parameters.get(3) {
2250 : Some(
2251 4 : Lsn::from_str(prev_lsn_str)
2252 4 : .with_context(|| format!("Failed to parse Lsn from {prev_lsn_str}"))?,
2253 : )
2254 : } else {
2255 4 : None
2256 : };
2257 8 : Ok(Self {
2258 8 : tenant_id,
2259 8 : timeline_id,
2260 8 : lsn,
2261 8 : prev_lsn,
2262 8 : })
2263 8 : }
2264 : }
2265 :
2266 : impl BaseBackupCmd {
2267 36 : fn parse(query: &str) -> anyhow::Result<Self> {
2268 36 : let parameters = query.split_whitespace().collect_vec();
2269 36 : if parameters.len() < 2 {
2270 0 : bail!(
2271 0 : "invalid number of parameters for basebackup command: {}",
2272 0 : query
2273 0 : );
2274 36 : }
2275 36 : let tenant_id = TenantId::from_str(parameters[0])
2276 36 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2277 36 : let timeline_id = TimelineId::from_str(parameters[1])
2278 36 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2279 : let lsn;
2280 : let flags_parse_from;
2281 36 : if let Some(maybe_lsn) = parameters.get(2) {
2282 32 : if *maybe_lsn == "latest" {
2283 4 : lsn = None;
2284 4 : flags_parse_from = 3;
2285 28 : } else if maybe_lsn.starts_with("--") {
2286 20 : lsn = None;
2287 20 : flags_parse_from = 2;
2288 20 : } else {
2289 : lsn = Some(
2290 8 : Lsn::from_str(maybe_lsn)
2291 8 : .with_context(|| format!("Failed to parse lsn from {maybe_lsn}"))?,
2292 : );
2293 8 : flags_parse_from = 3;
2294 : }
2295 4 : } else {
2296 4 : lsn = None;
2297 4 : flags_parse_from = 2;
2298 4 : }
2299 :
2300 36 : let mut gzip = false;
2301 36 : let mut replica = false;
2302 :
2303 44 : for &param in &parameters[flags_parse_from..] {
2304 44 : match param {
2305 44 : "--gzip" => {
2306 28 : if gzip {
2307 4 : bail!("duplicate parameter for basebackup command: {param}")
2308 24 : }
2309 24 : gzip = true
2310 : }
2311 16 : "--replica" => {
2312 8 : if replica {
2313 0 : bail!("duplicate parameter for basebackup command: {param}")
2314 8 : }
2315 8 : replica = true
2316 : }
2317 8 : _ => bail!("invalid parameter for basebackup command: {param}"),
2318 : }
2319 : }
2320 24 : Ok(Self {
2321 24 : tenant_id,
2322 24 : timeline_id,
2323 24 : lsn,
2324 24 : gzip,
2325 24 : replica,
2326 24 : })
2327 36 : }
2328 : }
2329 :
2330 : impl LeaseLsnCmd {
2331 8 : fn parse(query: &str) -> anyhow::Result<Self> {
2332 8 : let parameters = query.split_whitespace().collect_vec();
2333 8 : if parameters.len() != 3 {
2334 0 : bail!(
2335 0 : "invalid number of parameters for lease lsn command: {}",
2336 0 : query
2337 0 : );
2338 8 : }
2339 8 : let tenant_shard_id = TenantShardId::from_str(parameters[0])
2340 8 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2341 8 : let timeline_id = TimelineId::from_str(parameters[1])
2342 8 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2343 8 : let lsn = Lsn::from_str(parameters[2])
2344 8 : .with_context(|| format!("Failed to parse lsn from {}", parameters[2]))?;
2345 8 : Ok(Self {
2346 8 : tenant_shard_id,
2347 8 : timeline_id,
2348 8 : lsn,
2349 8 : })
2350 8 : }
2351 : }
2352 :
2353 : impl PageServiceCmd {
2354 84 : fn parse(query: &str) -> anyhow::Result<Self> {
2355 84 : let query = query.trim();
2356 84 : let Some((cmd, other)) = query.split_once(' ') else {
2357 8 : bail!("cannot parse query: {query}")
2358 : };
2359 76 : match cmd.to_ascii_lowercase().as_str() {
2360 76 : "pagestream_v2" => Ok(Self::PageStream(PageStreamCmd::parse(
2361 12 : other,
2362 12 : PagestreamProtocolVersion::V2,
2363 12 : )?)),
2364 64 : "pagestream_v3" => Ok(Self::PageStream(PageStreamCmd::parse(
2365 0 : other,
2366 0 : PagestreamProtocolVersion::V3,
2367 0 : )?)),
2368 64 : "basebackup" => Ok(Self::BaseBackup(BaseBackupCmd::parse(other)?)),
2369 28 : "fullbackup" => Ok(Self::FullBackup(FullBackupCmd::parse(other)?)),
2370 20 : "lease" => {
2371 12 : let Some((cmd2, other)) = other.split_once(' ') else {
2372 0 : bail!("invalid lease command: {cmd}");
2373 : };
2374 12 : let cmd2 = cmd2.to_ascii_lowercase();
2375 12 : if cmd2 == "lsn" {
2376 8 : Ok(Self::LeaseLsn(LeaseLsnCmd::parse(other)?))
2377 : } else {
2378 4 : bail!("invalid lease command: {cmd}");
2379 : }
2380 : }
2381 8 : "set" => Ok(Self::Set),
2382 0 : _ => Err(anyhow::anyhow!("unsupported command {cmd} in {query}")),
2383 : }
2384 84 : }
2385 : }
2386 :
2387 : impl<IO> postgres_backend::Handler<IO> for PageServerHandler
2388 : where
2389 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
2390 : {
2391 0 : fn check_auth_jwt(
2392 0 : &mut self,
2393 0 : _pgb: &mut PostgresBackend<IO>,
2394 0 : jwt_response: &[u8],
2395 0 : ) -> Result<(), QueryError> {
2396 : // This unwrap never triggers, because check_auth_jwt is only called when auth_type
2397 : // is NeonJWT, which requires auth to be present.
2398 0 : let data = self
2399 0 : .auth
2400 0 : .as_ref()
2401 0 : .unwrap()
2402 0 : .decode(str::from_utf8(jwt_response).context("jwt response is not UTF-8")?)
2403 0 : .map_err(|e| QueryError::Unauthorized(e.0))?;
2404 :
2405 0 : if matches!(data.claims.scope, Scope::Tenant) && data.claims.tenant_id.is_none() {
2406 0 : return Err(QueryError::Unauthorized(
2407 0 : "jwt token scope is Tenant, but tenant id is missing".into(),
2408 0 : ));
2409 0 : }
2410 0 :
2411 0 : debug!(
2412 0 : "jwt scope check succeeded for scope: {:#?} by tenant id: {:?}",
2413 : data.claims.scope, data.claims.tenant_id,
2414 : );
2415 :
2416 0 : self.claims = Some(data.claims);
2417 0 : Ok(())
2418 0 : }
2419 :
2420 0 : fn startup(
2421 0 : &mut self,
2422 0 : _pgb: &mut PostgresBackend<IO>,
2423 0 : _sm: &FeStartupPacket,
2424 0 : ) -> Result<(), QueryError> {
2425 0 : fail::fail_point!("ps::connection-start::startup-packet");
2426 0 : Ok(())
2427 0 : }
2428 :
2429 : #[instrument(skip_all, fields(tenant_id, timeline_id))]
2430 : async fn process_query(
2431 : &mut self,
2432 : pgb: &mut PostgresBackend<IO>,
2433 : query_string: &str,
2434 : ) -> Result<(), QueryError> {
2435 0 : fail::fail_point!("simulated-bad-compute-connection", |_| {
2436 0 : info!("Hit failpoint for bad connection");
2437 0 : Err(QueryError::SimulatedConnectionError)
2438 0 : });
2439 :
2440 : fail::fail_point!("ps::connection-start::process-query");
2441 :
2442 : let ctx = self.connection_ctx.attached_child();
2443 : debug!("process query {query_string}");
2444 : let query = PageServiceCmd::parse(query_string)?;
2445 : match query {
2446 : PageServiceCmd::PageStream(PageStreamCmd {
2447 : tenant_id,
2448 : timeline_id,
2449 : protocol_version,
2450 : }) => {
2451 : tracing::Span::current()
2452 : .record("tenant_id", field::display(tenant_id))
2453 : .record("timeline_id", field::display(timeline_id));
2454 :
2455 : self.check_permission(Some(tenant_id))?;
2456 : let command_kind = match protocol_version {
2457 : PagestreamProtocolVersion::V2 => ComputeCommandKind::PageStreamV2,
2458 : PagestreamProtocolVersion::V3 => ComputeCommandKind::PageStreamV3,
2459 : };
2460 : COMPUTE_COMMANDS_COUNTERS.for_command(command_kind).inc();
2461 :
2462 : self.handle_pagerequests(pgb, tenant_id, timeline_id, protocol_version, ctx)
2463 : .await?;
2464 : }
2465 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2466 : tenant_id,
2467 : timeline_id,
2468 : lsn,
2469 : gzip,
2470 : replica,
2471 : }) => {
2472 : tracing::Span::current()
2473 : .record("tenant_id", field::display(tenant_id))
2474 : .record("timeline_id", field::display(timeline_id));
2475 :
2476 : self.check_permission(Some(tenant_id))?;
2477 :
2478 : COMPUTE_COMMANDS_COUNTERS
2479 : .for_command(ComputeCommandKind::Basebackup)
2480 : .inc();
2481 : let metric_recording = metrics::BASEBACKUP_QUERY_TIME.start_recording();
2482 0 : let res = async {
2483 0 : self.handle_basebackup_request(
2484 0 : pgb,
2485 0 : tenant_id,
2486 0 : timeline_id,
2487 0 : lsn,
2488 0 : None,
2489 0 : false,
2490 0 : gzip,
2491 0 : replica,
2492 0 : &ctx,
2493 0 : )
2494 0 : .await?;
2495 0 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
2496 0 : Result::<(), QueryError>::Ok(())
2497 0 : }
2498 : .await;
2499 : metric_recording.observe(&res);
2500 : res?;
2501 : }
2502 : // same as basebackup, but result includes relational data as well
2503 : PageServiceCmd::FullBackup(FullBackupCmd {
2504 : tenant_id,
2505 : timeline_id,
2506 : lsn,
2507 : prev_lsn,
2508 : }) => {
2509 : tracing::Span::current()
2510 : .record("tenant_id", field::display(tenant_id))
2511 : .record("timeline_id", field::display(timeline_id));
2512 :
2513 : self.check_permission(Some(tenant_id))?;
2514 :
2515 : COMPUTE_COMMANDS_COUNTERS
2516 : .for_command(ComputeCommandKind::Fullbackup)
2517 : .inc();
2518 :
2519 : // Check that the timeline exists
2520 : self.handle_basebackup_request(
2521 : pgb,
2522 : tenant_id,
2523 : timeline_id,
2524 : lsn,
2525 : prev_lsn,
2526 : true,
2527 : false,
2528 : false,
2529 : &ctx,
2530 : )
2531 : .await?;
2532 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
2533 : }
2534 : PageServiceCmd::Set => {
2535 : // important because psycopg2 executes "SET datestyle TO 'ISO'"
2536 : // on connect
2537 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
2538 : }
2539 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
2540 : tenant_shard_id,
2541 : timeline_id,
2542 : lsn,
2543 : }) => {
2544 : tracing::Span::current()
2545 : .record("tenant_id", field::display(tenant_shard_id))
2546 : .record("timeline_id", field::display(timeline_id));
2547 :
2548 : self.check_permission(Some(tenant_shard_id.tenant_id))?;
2549 :
2550 : COMPUTE_COMMANDS_COUNTERS
2551 : .for_command(ComputeCommandKind::LeaseLsn)
2552 : .inc();
2553 :
2554 : match self
2555 : .handle_make_lsn_lease(pgb, tenant_shard_id, timeline_id, lsn, &ctx)
2556 : .await
2557 : {
2558 : Ok(()) => {
2559 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?
2560 : }
2561 : Err(e) => {
2562 : error!("error obtaining lsn lease for {lsn}: {e:?}");
2563 : pgb.write_message_noflush(&BeMessage::ErrorResponse(
2564 : &e.to_string(),
2565 : Some(e.pg_error_code()),
2566 : ))?
2567 : }
2568 : };
2569 : }
2570 : }
2571 :
2572 : Ok(())
2573 : }
2574 : }
2575 :
2576 : impl From<GetActiveTenantError> for QueryError {
2577 0 : fn from(e: GetActiveTenantError) -> Self {
2578 0 : match e {
2579 0 : GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected(
2580 0 : ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
2581 0 : ),
2582 : GetActiveTenantError::Cancelled
2583 : | GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => {
2584 0 : QueryError::Shutdown
2585 : }
2586 0 : e @ GetActiveTenantError::NotFound(_) => QueryError::NotFound(format!("{e}").into()),
2587 0 : e => QueryError::Other(anyhow::anyhow!(e)),
2588 : }
2589 0 : }
2590 : }
2591 :
2592 : #[derive(Debug, thiserror::Error)]
2593 : pub(crate) enum GetActiveTimelineError {
2594 : #[error(transparent)]
2595 : Tenant(GetActiveTenantError),
2596 : #[error(transparent)]
2597 : Timeline(#[from] GetTimelineError),
2598 : }
2599 :
2600 : impl From<GetActiveTimelineError> for QueryError {
2601 0 : fn from(e: GetActiveTimelineError) -> Self {
2602 0 : match e {
2603 0 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => QueryError::Shutdown,
2604 0 : GetActiveTimelineError::Tenant(e) => e.into(),
2605 0 : GetActiveTimelineError::Timeline(e) => QueryError::NotFound(format!("{e}").into()),
2606 : }
2607 0 : }
2608 : }
2609 :
2610 : impl From<crate::tenant::timeline::handle::HandleUpgradeError> for QueryError {
2611 0 : fn from(e: crate::tenant::timeline::handle::HandleUpgradeError) -> Self {
2612 0 : match e {
2613 0 : crate::tenant::timeline::handle::HandleUpgradeError::ShutDown => QueryError::Shutdown,
2614 0 : }
2615 0 : }
2616 : }
2617 :
2618 0 : fn set_tracing_field_shard_id(timeline: &Timeline) {
2619 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
2620 0 : tracing::Span::current().record(
2621 0 : "shard_id",
2622 0 : tracing::field::display(timeline.tenant_shard_id.shard_slug()),
2623 0 : );
2624 0 : debug_assert_current_span_has_tenant_and_timeline_id();
2625 0 : }
2626 :
2627 : struct WaitedForLsn(Lsn);
2628 : impl From<WaitedForLsn> for Lsn {
2629 0 : fn from(WaitedForLsn(lsn): WaitedForLsn) -> Self {
2630 0 : lsn
2631 0 : }
2632 : }
2633 :
2634 : #[cfg(test)]
2635 : mod tests {
2636 : use utils::shard::ShardCount;
2637 :
2638 : use super::*;
2639 :
2640 : #[test]
2641 4 : fn pageservice_cmd_parse() {
2642 4 : let tenant_id = TenantId::generate();
2643 4 : let timeline_id = TimelineId::generate();
2644 4 : let cmd =
2645 4 : PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id} {timeline_id}")).unwrap();
2646 4 : assert_eq!(
2647 4 : cmd,
2648 4 : PageServiceCmd::PageStream(PageStreamCmd {
2649 4 : tenant_id,
2650 4 : timeline_id,
2651 4 : protocol_version: PagestreamProtocolVersion::V2,
2652 4 : })
2653 4 : );
2654 4 : let cmd = PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id}")).unwrap();
2655 4 : assert_eq!(
2656 4 : cmd,
2657 4 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2658 4 : tenant_id,
2659 4 : timeline_id,
2660 4 : lsn: None,
2661 4 : gzip: false,
2662 4 : replica: false
2663 4 : })
2664 4 : );
2665 4 : let cmd =
2666 4 : PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} --gzip")).unwrap();
2667 4 : assert_eq!(
2668 4 : cmd,
2669 4 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2670 4 : tenant_id,
2671 4 : timeline_id,
2672 4 : lsn: None,
2673 4 : gzip: true,
2674 4 : replica: false
2675 4 : })
2676 4 : );
2677 4 : let cmd =
2678 4 : PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} latest")).unwrap();
2679 4 : assert_eq!(
2680 4 : cmd,
2681 4 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2682 4 : tenant_id,
2683 4 : timeline_id,
2684 4 : lsn: None,
2685 4 : gzip: false,
2686 4 : replica: false
2687 4 : })
2688 4 : );
2689 4 : let cmd = PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} 0/16ABCDE"))
2690 4 : .unwrap();
2691 4 : assert_eq!(
2692 4 : cmd,
2693 4 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2694 4 : tenant_id,
2695 4 : timeline_id,
2696 4 : lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
2697 4 : gzip: false,
2698 4 : replica: false
2699 4 : })
2700 4 : );
2701 4 : let cmd = PageServiceCmd::parse(&format!(
2702 4 : "basebackup {tenant_id} {timeline_id} --replica --gzip"
2703 4 : ))
2704 4 : .unwrap();
2705 4 : assert_eq!(
2706 4 : cmd,
2707 4 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2708 4 : tenant_id,
2709 4 : timeline_id,
2710 4 : lsn: None,
2711 4 : gzip: true,
2712 4 : replica: true
2713 4 : })
2714 4 : );
2715 4 : let cmd = PageServiceCmd::parse(&format!(
2716 4 : "basebackup {tenant_id} {timeline_id} 0/16ABCDE --replica --gzip"
2717 4 : ))
2718 4 : .unwrap();
2719 4 : assert_eq!(
2720 4 : cmd,
2721 4 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2722 4 : tenant_id,
2723 4 : timeline_id,
2724 4 : lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
2725 4 : gzip: true,
2726 4 : replica: true
2727 4 : })
2728 4 : );
2729 4 : let cmd = PageServiceCmd::parse(&format!("fullbackup {tenant_id} {timeline_id}")).unwrap();
2730 4 : assert_eq!(
2731 4 : cmd,
2732 4 : PageServiceCmd::FullBackup(FullBackupCmd {
2733 4 : tenant_id,
2734 4 : timeline_id,
2735 4 : lsn: None,
2736 4 : prev_lsn: None
2737 4 : })
2738 4 : );
2739 4 : let cmd = PageServiceCmd::parse(&format!(
2740 4 : "fullbackup {tenant_id} {timeline_id} 0/16ABCDE 0/16ABCDF"
2741 4 : ))
2742 4 : .unwrap();
2743 4 : assert_eq!(
2744 4 : cmd,
2745 4 : PageServiceCmd::FullBackup(FullBackupCmd {
2746 4 : tenant_id,
2747 4 : timeline_id,
2748 4 : lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
2749 4 : prev_lsn: Some(Lsn::from_str("0/16ABCDF").unwrap()),
2750 4 : })
2751 4 : );
2752 4 : let tenant_shard_id = TenantShardId::unsharded(tenant_id);
2753 4 : let cmd = PageServiceCmd::parse(&format!(
2754 4 : "lease lsn {tenant_shard_id} {timeline_id} 0/16ABCDE"
2755 4 : ))
2756 4 : .unwrap();
2757 4 : assert_eq!(
2758 4 : cmd,
2759 4 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
2760 4 : tenant_shard_id,
2761 4 : timeline_id,
2762 4 : lsn: Lsn::from_str("0/16ABCDE").unwrap(),
2763 4 : })
2764 4 : );
2765 4 : let tenant_shard_id = TenantShardId::split(&tenant_shard_id, ShardCount(8))[1];
2766 4 : let cmd = PageServiceCmd::parse(&format!(
2767 4 : "lease lsn {tenant_shard_id} {timeline_id} 0/16ABCDE"
2768 4 : ))
2769 4 : .unwrap();
2770 4 : assert_eq!(
2771 4 : cmd,
2772 4 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
2773 4 : tenant_shard_id,
2774 4 : timeline_id,
2775 4 : lsn: Lsn::from_str("0/16ABCDE").unwrap(),
2776 4 : })
2777 4 : );
2778 4 : let cmd = PageServiceCmd::parse("set a = b").unwrap();
2779 4 : assert_eq!(cmd, PageServiceCmd::Set);
2780 4 : let cmd = PageServiceCmd::parse("SET foo").unwrap();
2781 4 : assert_eq!(cmd, PageServiceCmd::Set);
2782 4 : }
2783 :
2784 : #[test]
2785 4 : fn pageservice_cmd_err_handling() {
2786 4 : let tenant_id = TenantId::generate();
2787 4 : let timeline_id = TimelineId::generate();
2788 4 : let cmd = PageServiceCmd::parse("unknown_command");
2789 4 : assert!(cmd.is_err());
2790 4 : let cmd = PageServiceCmd::parse("pagestream_v2");
2791 4 : assert!(cmd.is_err());
2792 4 : let cmd = PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id}xxx"));
2793 4 : assert!(cmd.is_err());
2794 4 : let cmd = PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id}xxx {timeline_id}xxx"));
2795 4 : assert!(cmd.is_err());
2796 4 : let cmd = PageServiceCmd::parse(&format!(
2797 4 : "basebackup {tenant_id} {timeline_id} --gzip --gzip"
2798 4 : ));
2799 4 : assert!(cmd.is_err());
2800 4 : let cmd = PageServiceCmd::parse(&format!(
2801 4 : "basebackup {tenant_id} {timeline_id} --gzip --unknown"
2802 4 : ));
2803 4 : assert!(cmd.is_err());
2804 4 : let cmd = PageServiceCmd::parse(&format!(
2805 4 : "basebackup {tenant_id} {timeline_id} --gzip 0/16ABCDE"
2806 4 : ));
2807 4 : assert!(cmd.is_err());
2808 4 : let cmd = PageServiceCmd::parse(&format!("lease {tenant_id} {timeline_id} gzip 0/16ABCDE"));
2809 4 : assert!(cmd.is_err());
2810 4 : }
2811 : }