Line data Source code
1 : //! The Page Service listens for client connections and serves their GetPage@LSN
2 : //! requests.
3 :
4 : use anyhow::{bail, Context};
5 : use async_compression::tokio::write::GzipEncoder;
6 : use bytes::Buf;
7 : use futures::FutureExt;
8 : use itertools::Itertools;
9 : use once_cell::sync::OnceCell;
10 : use pageserver_api::config::{
11 : PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
12 : PageServiceProtocolPipelinedExecutionStrategy,
13 : };
14 : use pageserver_api::models::{self, TenantState};
15 : use pageserver_api::models::{
16 : PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
17 : PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
18 : PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetSlruSegmentRequest,
19 : PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest, PagestreamNblocksResponse,
20 : PagestreamProtocolVersion, PagestreamRequest,
21 : };
22 : use pageserver_api::shard::TenantShardId;
23 : use postgres_backend::{
24 : is_expected_io_error, AuthType, PostgresBackend, PostgresBackendReader, QueryError,
25 : };
26 : use pq_proto::framed::ConnectionError;
27 : use pq_proto::FeStartupPacket;
28 : use pq_proto::{BeMessage, FeMessage, RowDescriptor};
29 : use std::borrow::Cow;
30 : use std::io;
31 : use std::num::NonZeroUsize;
32 : use std::str;
33 : use std::str::FromStr;
34 : use std::sync::Arc;
35 : use std::time::SystemTime;
36 : use std::time::{Duration, Instant};
37 : use tokio::io::{AsyncRead, AsyncWrite};
38 : use tokio::io::{AsyncWriteExt, BufWriter};
39 : use tokio::task::JoinHandle;
40 : use tokio_util::sync::CancellationToken;
41 : use tracing::*;
42 : use utils::sync::gate::{Gate, GateGuard};
43 : use utils::sync::spsc_fold;
44 : use utils::{
45 : auth::{Claims, Scope, SwappableJwtAuth},
46 : id::{TenantId, TimelineId},
47 : lsn::Lsn,
48 : simple_rcu::RcuReadGuard,
49 : };
50 :
51 : use crate::auth::check_permission;
52 : use crate::basebackup::BasebackupError;
53 : use crate::config::PageServerConf;
54 : use crate::context::{DownloadBehavior, RequestContext};
55 : use crate::metrics::{self, SmgrOpTimer};
56 : use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS};
57 : use crate::pgdatadir_mapping::Version;
58 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
59 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
60 : use crate::task_mgr::TaskKind;
61 : use crate::task_mgr::{self, COMPUTE_REQUEST_RUNTIME};
62 : use crate::tenant::mgr::ShardSelector;
63 : use crate::tenant::mgr::TenantManager;
64 : use crate::tenant::mgr::{GetActiveTenantError, GetTenantError, ShardResolveResult};
65 : use crate::tenant::storage_layer::IoConcurrency;
66 : use crate::tenant::timeline::{self, WaitLsnError};
67 : use crate::tenant::GetTimelineError;
68 : use crate::tenant::PageReconstructError;
69 : use crate::tenant::Timeline;
70 : use crate::{basebackup, timed_after_cancellation};
71 : use pageserver_api::key::rel_block_to_key;
72 : use pageserver_api::models::PageTraceEvent;
73 : use pageserver_api::reltag::SlruKind;
74 : use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
75 : use postgres_ffi::BLCKSZ;
76 :
/// Upper bound on how long we may wait for a [`crate::tenant::mgr::TenantSlot::InProgress`]
/// slot and/or a [`crate::tenant::Tenant`] which is not yet in state [`TenantState::Active`].
///
/// NB: this is a different value than [`crate::http::routes::ACTIVE_TENANT_TIMEOUT`].
const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_secs(30);
82 :
83 : ///////////////////////////////////////////////////////////////////////////////
84 :
/// Handle to the page service listener task, as returned by [`spawn`].
pub struct Listener {
    /// Cancellation token for the listener task; a clone of it is handed to the task
    /// when it is spawned.
    cancel: CancellationToken,
    /// Cancel the listener task through `cancel` to shut down the listener
    /// and get a handle on the existing connections.
    task: JoinHandle<Connections>,
}
91 :
/// The connection handler tasks that the listener loop spawned, handed back once the
/// listener loop has terminated.
pub struct Connections {
    /// Parent token of the per-connection child tokens.
    cancel: CancellationToken,
    /// One task per accepted connection.
    tasks: tokio::task::JoinSet<ConnectionHandlerResult>,
    /// Every connection handler task is given a guard of this gate.
    gate: Gate,
}
97 :
98 0 : pub fn spawn(
99 0 : conf: &'static PageServerConf,
100 0 : tenant_manager: Arc<TenantManager>,
101 0 : pg_auth: Option<Arc<SwappableJwtAuth>>,
102 0 : tcp_listener: tokio::net::TcpListener,
103 0 : ) -> Listener {
104 0 : let cancel = CancellationToken::new();
105 0 : let libpq_ctx = RequestContext::todo_child(
106 0 : TaskKind::LibpqEndpointListener,
107 0 : // listener task shouldn't need to download anything. (We will
108 0 : // create a separate sub-contexts for each connection, with their
109 0 : // own download behavior. This context is used only to listen and
110 0 : // accept connections.)
111 0 : DownloadBehavior::Error,
112 0 : );
113 0 : let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
114 0 : "libpq listener",
115 0 : libpq_listener_main(
116 0 : conf,
117 0 : tenant_manager,
118 0 : pg_auth,
119 0 : tcp_listener,
120 0 : conf.pg_auth_type,
121 0 : conf.page_service_pipelining.clone(),
122 0 : libpq_ctx,
123 0 : cancel.clone(),
124 0 : )
125 0 : .map(anyhow::Ok),
126 0 : ));
127 0 :
128 0 : Listener { cancel, task }
129 0 : }
130 :
131 : impl Listener {
132 0 : pub async fn stop_accepting(self) -> Connections {
133 0 : self.cancel.cancel();
134 0 : self.task
135 0 : .await
136 0 : .expect("unreachable: we wrap the listener task in task_mgr::exit_on_panic_or_error")
137 0 : }
138 : }
139 : impl Connections {
140 0 : pub(crate) async fn shutdown(self) {
141 0 : let Self {
142 0 : cancel,
143 0 : mut tasks,
144 0 : gate,
145 0 : } = self;
146 0 : cancel.cancel();
147 0 : while let Some(res) = tasks.join_next().await {
148 0 : Self::handle_connection_completion(res);
149 0 : }
150 0 : gate.close().await;
151 0 : }
152 :
153 0 : fn handle_connection_completion(res: Result<anyhow::Result<()>, tokio::task::JoinError>) {
154 0 : match res {
155 0 : Ok(Ok(())) => {}
156 0 : Ok(Err(e)) => error!("error in page_service connection task: {:?}", e),
157 0 : Err(e) => error!("page_service connection task panicked: {:?}", e),
158 : }
159 0 : }
160 : }
161 :
162 : ///
163 : /// Main loop of the page service.
164 : ///
165 : /// Listens for connections, and launches a new handler task for each.
166 : ///
167 : /// Returns Ok(()) upon cancellation via `cancel`, returning the set of
168 : /// open connections.
169 : ///
170 : #[allow(clippy::too_many_arguments)]
171 0 : pub async fn libpq_listener_main(
172 0 : conf: &'static PageServerConf,
173 0 : tenant_manager: Arc<TenantManager>,
174 0 : auth: Option<Arc<SwappableJwtAuth>>,
175 0 : listener: tokio::net::TcpListener,
176 0 : auth_type: AuthType,
177 0 : pipelining_config: PageServicePipeliningConfig,
178 0 : listener_ctx: RequestContext,
179 0 : listener_cancel: CancellationToken,
180 0 : ) -> Connections {
181 0 : let connections_cancel = CancellationToken::new();
182 0 : let connections_gate = Gate::default();
183 0 : let mut connection_handler_tasks = tokio::task::JoinSet::default();
184 :
185 : loop {
186 0 : let gate_guard = match connections_gate.enter() {
187 0 : Ok(guard) => guard,
188 0 : Err(_) => break,
189 : };
190 :
191 0 : let accepted = tokio::select! {
192 : biased;
193 0 : _ = listener_cancel.cancelled() => break,
194 0 : next = connection_handler_tasks.join_next(), if !connection_handler_tasks.is_empty() => {
195 0 : let res = next.expect("we dont poll while empty");
196 0 : Connections::handle_connection_completion(res);
197 0 : continue;
198 : }
199 0 : accepted = listener.accept() => accepted,
200 0 : };
201 0 :
202 0 : match accepted {
203 0 : Ok((socket, peer_addr)) => {
204 0 : // Connection established. Spawn a new task to handle it.
205 0 : debug!("accepted connection from {}", peer_addr);
206 0 : let local_auth = auth.clone();
207 0 : let connection_ctx = listener_ctx
208 0 : .detached_child(TaskKind::PageRequestHandler, DownloadBehavior::Download);
209 0 : connection_handler_tasks.spawn(page_service_conn_main(
210 0 : conf,
211 0 : tenant_manager.clone(),
212 0 : local_auth,
213 0 : socket,
214 0 : auth_type,
215 0 : pipelining_config.clone(),
216 0 : connection_ctx,
217 0 : connections_cancel.child_token(),
218 0 : gate_guard,
219 0 : ));
220 : }
221 0 : Err(err) => {
222 0 : // accept() failed. Log the error, and loop back to retry on next connection.
223 0 : error!("accept() failed: {:?}", err);
224 : }
225 : }
226 : }
227 :
228 0 : debug!("page_service listener loop terminated");
229 :
230 0 : Connections {
231 0 : cancel: connections_cancel,
232 0 : tasks: connection_handler_tasks,
233 0 : gate: connections_gate,
234 0 : }
235 0 : }
236 :
/// Result of a single connection handler task (see [`page_service_conn_main`]).
type ConnectionHandlerResult = anyhow::Result<()>;
238 :
239 : #[instrument(skip_all, fields(peer_addr))]
240 : #[allow(clippy::too_many_arguments)]
241 : async fn page_service_conn_main(
242 : conf: &'static PageServerConf,
243 : tenant_manager: Arc<TenantManager>,
244 : auth: Option<Arc<SwappableJwtAuth>>,
245 : socket: tokio::net::TcpStream,
246 : auth_type: AuthType,
247 : pipelining_config: PageServicePipeliningConfig,
248 : connection_ctx: RequestContext,
249 : cancel: CancellationToken,
250 : gate_guard: GateGuard,
251 : ) -> ConnectionHandlerResult {
252 : let _guard = LIVE_CONNECTIONS
253 : .with_label_values(&["page_service"])
254 : .guard();
255 :
256 : socket
257 : .set_nodelay(true)
258 : .context("could not set TCP_NODELAY")?;
259 :
260 : let peer_addr = socket.peer_addr().context("get peer address")?;
261 : tracing::Span::current().record("peer_addr", field::display(peer_addr));
262 :
263 : // setup read timeout of 10 minutes. the timeout is rather arbitrary for requirements:
264 : // - long enough for most valid compute connections
265 : // - less than infinite to stop us from "leaking" connections to long-gone computes
266 : //
267 : // no write timeout is used, because the kernel is assumed to error writes after some time.
268 : let mut socket = tokio_io_timeout::TimeoutReader::new(socket);
269 :
270 : let default_timeout_ms = 10 * 60 * 1000; // 10 minutes by default
271 0 : let socket_timeout_ms = (|| {
272 0 : fail::fail_point!("simulated-bad-compute-connection", |avg_timeout_ms| {
273 : // Exponential distribution for simulating
274 : // poor network conditions, expect about avg_timeout_ms to be around 15
275 : // in tests
276 0 : if let Some(avg_timeout_ms) = avg_timeout_ms {
277 0 : let avg = avg_timeout_ms.parse::<i64>().unwrap() as f32;
278 0 : let u = rand::random::<f32>();
279 0 : ((1.0 - u).ln() / (-avg)) as u64
280 : } else {
281 0 : default_timeout_ms
282 : }
283 0 : });
284 0 : default_timeout_ms
285 : })();
286 :
287 : // A timeout here does not mean the client died, it can happen if it's just idle for
288 : // a while: we will tear down this PageServerHandler and instantiate a new one if/when
289 : // they reconnect.
290 : socket.set_timeout(Some(std::time::Duration::from_millis(socket_timeout_ms)));
291 : let socket = Box::pin(socket);
292 :
293 : fail::fail_point!("ps::connection-start::pre-login");
294 :
295 : // XXX: pgbackend.run() should take the connection_ctx,
296 : // and create a child per-query context when it invokes process_query.
297 : // But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
298 : // and create the per-query context in process_query ourselves.
299 : let mut conn_handler = PageServerHandler::new(
300 : conf,
301 : tenant_manager,
302 : auth,
303 : pipelining_config,
304 : connection_ctx,
305 : cancel.clone(),
306 : gate_guard,
307 : );
308 : let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
309 :
310 : match pgbackend.run(&mut conn_handler, &cancel).await {
311 : Ok(()) => {
312 : // we've been requested to shut down
313 : Ok(())
314 : }
315 : Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
316 : if is_expected_io_error(&io_error) {
317 : info!("Postgres client disconnected ({io_error})");
318 : Ok(())
319 : } else {
320 : let tenant_id = conn_handler.timeline_handles.as_ref().unwrap().tenant_id();
321 : Err(io_error).context(format!(
322 : "Postgres connection error for tenant_id={:?} client at peer_addr={}",
323 : tenant_id, peer_addr
324 : ))
325 : }
326 : }
327 : other => {
328 : let tenant_id = conn_handler.timeline_handles.as_ref().unwrap().tenant_id();
329 : other.context(format!(
330 : "Postgres query error for tenant_id={:?} client peer_addr={}",
331 : tenant_id, peer_addr
332 : ))
333 : }
334 : }
335 : }
336 :
/// Per-connection state of the page service; one instance is created for each
/// accepted connection.
struct PageServerHandler {
    conf: &'static PageServerConf,
    auth: Option<Arc<SwappableJwtAuth>>,
    /// `None` when the handler is constructed.
    claims: Option<Claims>,

    /// The context created for the lifetime of the connection
    /// serviced by this PageServerHandler.
    /// For each query received over the connection,
    /// `process_query` creates a child context from this one.
    connection_ctx: RequestContext,

    /// Cancellation token of this connection.
    cancel: CancellationToken,

    /// None only while pagestream protocol is being processed.
    timeline_handles: Option<TimelineHandles>,

    pipelining_config: PageServicePipeliningConfig,

    /// Guard of the listener's connections gate, held for as long as this handler lives.
    gate_guard: GateGuard,
}
357 :
/// Per-connection cache of timeline handles, together with the tenant manager
/// wrapper that is used to resolve handles which are not cached yet.
struct TimelineHandles {
    wrapper: TenantManagerWrapper,
    /// Note on size: the typical size of this map is 1. The largest size we expect
    /// to see is the number of shards divided by the number of pageservers (typically < 2),
    /// or the ratio used when splitting shards (i.e. how many children are created from one
    /// parent shard), where a "large" number might be ~8.
    handles: timeline::handle::Cache<TenantManagerTypes>,
}
366 :
367 : impl TimelineHandles {
368 0 : fn new(tenant_manager: Arc<TenantManager>) -> Self {
369 0 : Self {
370 0 : wrapper: TenantManagerWrapper {
371 0 : tenant_manager,
372 0 : tenant_id: OnceCell::new(),
373 0 : },
374 0 : handles: Default::default(),
375 0 : }
376 0 : }
377 0 : async fn get(
378 0 : &mut self,
379 0 : tenant_id: TenantId,
380 0 : timeline_id: TimelineId,
381 0 : shard_selector: ShardSelector,
382 0 : ) -> Result<timeline::handle::Handle<TenantManagerTypes>, GetActiveTimelineError> {
383 0 : if *self.wrapper.tenant_id.get_or_init(|| tenant_id) != tenant_id {
384 0 : return Err(GetActiveTimelineError::Tenant(
385 0 : GetActiveTenantError::SwitchedTenant,
386 0 : ));
387 0 : }
388 0 : self.handles
389 0 : .get(timeline_id, shard_selector, &self.wrapper)
390 0 : .await
391 0 : .map_err(|e| match e {
392 0 : timeline::handle::GetError::TenantManager(e) => e,
393 : timeline::handle::GetError::TimelineGateClosed => {
394 0 : trace!("timeline gate closed");
395 0 : GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown)
396 : }
397 : timeline::handle::GetError::PerTimelineStateShutDown => {
398 0 : trace!("per-timeline state shut down");
399 0 : GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown)
400 : }
401 0 : })
402 0 : }
403 :
404 0 : fn tenant_id(&self) -> Option<TenantId> {
405 0 : self.wrapper.tenant_id.get().copied()
406 0 : }
407 : }
408 :
/// Pairs the tenant manager with the tenant id that a connection is pinned to.
pub(crate) struct TenantManagerWrapper {
    tenant_manager: Arc<TenantManager>,
    // We do not support switching tenant_id on a connection at this point.
    // We can add support for this later if needed without changing
    // the protocol.
    tenant_id: once_cell::sync::OnceCell<TenantId>,
}
416 :
/// Marker type that selects the concrete types used with the
/// [`timeline::handle`] machinery in this module.
#[derive(Debug)]
pub(crate) struct TenantManagerTypes;

impl timeline::handle::Types for TenantManagerTypes {
    // Error type produced when resolving a timeline through the tenant manager.
    type TenantManagerError = GetActiveTimelineError;
    type TenantManager = TenantManagerWrapper;
    type Timeline = Arc<Timeline>;
}
425 :
/// Lets `Arc<Timeline>` be used as the timeline type of the handle cache.
/// Every method is a plain accessor or delegates to the inherent `Timeline` method.
impl timeline::handle::ArcTimeline<TenantManagerTypes> for Arc<Timeline> {
    /// The timeline's own gate.
    fn gate(&self) -> &utils::sync::gate::Gate {
        &self.gate
    }

    fn shard_timeline_id(&self) -> timeline::handle::ShardTimelineId {
        Timeline::shard_timeline_id(self)
    }

    /// The per-timeline handle state stored in the timeline's `handles` field.
    fn per_timeline_state(&self) -> &timeline::handle::PerTimelineState<TenantManagerTypes> {
        &self.handles
    }

    fn get_shard_identity(&self) -> &pageserver_api::shard::ShardIdentity {
        Timeline::get_shard_identity(self)
    }
}
443 :
444 : impl timeline::handle::TenantManager<TenantManagerTypes> for TenantManagerWrapper {
445 0 : async fn resolve(
446 0 : &self,
447 0 : timeline_id: TimelineId,
448 0 : shard_selector: ShardSelector,
449 0 : ) -> Result<Arc<Timeline>, GetActiveTimelineError> {
450 0 : let tenant_id = self.tenant_id.get().expect("we set this in get()");
451 0 : let timeout = ACTIVE_TENANT_TIMEOUT;
452 0 : let wait_start = Instant::now();
453 0 : let deadline = wait_start + timeout;
454 0 : let tenant_shard = loop {
455 0 : let resolved = self
456 0 : .tenant_manager
457 0 : .resolve_attached_shard(tenant_id, shard_selector);
458 0 : match resolved {
459 0 : ShardResolveResult::Found(tenant_shard) => break tenant_shard,
460 : ShardResolveResult::NotFound => {
461 0 : return Err(GetActiveTimelineError::Tenant(
462 0 : GetActiveTenantError::NotFound(GetTenantError::NotFound(*tenant_id)),
463 0 : ));
464 : }
465 0 : ShardResolveResult::InProgress(barrier) => {
466 0 : // We can't authoritatively answer right now: wait for InProgress state
467 0 : // to end, then try again
468 0 : tokio::select! {
469 0 : _ = barrier.wait() => {
470 0 : // The barrier completed: proceed around the loop to try looking up again
471 0 : },
472 0 : _ = tokio::time::sleep(deadline.duration_since(Instant::now())) => {
473 0 : return Err(GetActiveTimelineError::Tenant(GetActiveTenantError::WaitForActiveTimeout {
474 0 : latest_state: None,
475 0 : wait_time: timeout,
476 0 : }));
477 : }
478 : }
479 : }
480 : };
481 : };
482 :
483 0 : tracing::debug!("Waiting for tenant to enter active state...");
484 0 : tenant_shard
485 0 : .wait_to_become_active(deadline.duration_since(Instant::now()))
486 0 : .await
487 0 : .map_err(GetActiveTimelineError::Tenant)?;
488 :
489 0 : let timeline = tenant_shard
490 0 : .get_timeline(timeline_id, true)
491 0 : .map_err(GetActiveTimelineError::Timeline)?;
492 0 : Ok(timeline)
493 0 : }
494 : }
495 :
/// Errors that can occur while serving a single pagestream request.
#[derive(thiserror::Error, Debug)]
enum PageStreamError {
    /// We encountered an error that should prompt the client to reconnect:
    /// in practice this means we drop the connection without sending a response.
    #[error("Reconnect required: {0}")]
    Reconnect(Cow<'static, str>),

    /// We were instructed to shutdown while processing the query
    #[error("Shutting down")]
    Shutdown,

    /// Something went wrong reading a page: this likely indicates a pageserver bug
    #[error("Read error")]
    Read(#[source] PageReconstructError),

    /// Ran out of time waiting for an LSN
    #[error("LSN timeout: {0}")]
    LsnTimeout(WaitLsnError),

    /// The entity required to serve the request (tenant or timeline) is not found,
    /// or is not found in a suitable state to serve a request.
    #[error("Not found: {0}")]
    NotFound(Cow<'static, str>),

    /// Request asked for something that doesn't make sense, like an invalid LSN
    #[error("Bad request: {0}")]
    BadRequest(Cow<'static, str>),
}
524 :
525 : impl From<PageReconstructError> for PageStreamError {
526 0 : fn from(value: PageReconstructError) -> Self {
527 0 : match value {
528 0 : PageReconstructError::Cancelled => Self::Shutdown,
529 0 : e => Self::Read(e),
530 : }
531 0 : }
532 : }
533 :
534 : impl From<GetActiveTimelineError> for PageStreamError {
535 0 : fn from(value: GetActiveTimelineError) -> Self {
536 0 : match value {
537 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled)
538 : | GetActiveTimelineError::Tenant(GetActiveTenantError::WillNotBecomeActive(
539 : TenantState::Stopping { .. },
540 : ))
541 0 : | GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown) => Self::Shutdown,
542 0 : GetActiveTimelineError::Tenant(e) => Self::NotFound(format!("{e}").into()),
543 0 : GetActiveTimelineError::Timeline(e) => Self::NotFound(format!("{e}").into()),
544 : }
545 0 : }
546 : }
547 :
548 : impl From<WaitLsnError> for PageStreamError {
549 0 : fn from(value: WaitLsnError) -> Self {
550 0 : match value {
551 0 : e @ WaitLsnError::Timeout(_) => Self::LsnTimeout(e),
552 0 : WaitLsnError::Shutdown => Self::Shutdown,
553 0 : e @ WaitLsnError::BadState { .. } => Self::Reconnect(format!("{e}").into()),
554 : }
555 0 : }
556 : }
557 :
558 : impl From<WaitLsnError> for QueryError {
559 0 : fn from(value: WaitLsnError) -> Self {
560 0 : match value {
561 0 : e @ WaitLsnError::Timeout(_) => Self::Other(anyhow::Error::new(e)),
562 0 : WaitLsnError::Shutdown => Self::Shutdown,
563 0 : WaitLsnError::BadState { .. } => Self::Reconnect,
564 : }
565 0 : }
566 : }
567 :
/// A [`PageStreamError`] together with the header of the request that caused it.
#[derive(thiserror::Error, Debug)]
struct BatchedPageStreamError {
    req: PagestreamRequest,
    err: PageStreamError,
}

impl std::fmt::Display for BatchedPageStreamError {
    /// Formats only the wrapped error; the request header is not included.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.err.fmt(f)
    }
}
579 :
/// One getpage request inside a [`BatchedFeMessage::GetPage`] batch, with its own
/// smgr op timer.
struct BatchedGetPageRequest {
    req: PagestreamGetPageRequest,
    timer: SmgrOpTimer,
}
584 :
/// One test request inside a [`BatchedFeMessage::Test`] batch, with its own
/// smgr op timer. Only built with the `testing` feature.
#[cfg(feature = "testing")]
struct BatchedTestRequest {
    req: models::PagestreamTestRequest,
    timer: SmgrOpTimer,
}
590 :
/// A pagestream request (or a batch of them) that has been read and parsed
/// but not executed yet, or an error that is to be sent as the response.
///
/// NB: we only hold [`timeline::handle::WeakHandle`] inside this enum,
/// so that we don't keep the [`Timeline::gate`] open while the batch
/// is being built up inside the [`spsc_fold`] (pagestream pipelining).
enum BatchedFeMessage {
    Exists {
        span: Span,
        timer: SmgrOpTimer,
        shard: timeline::handle::WeakHandle<TenantManagerTypes>,
        req: models::PagestreamExistsRequest,
    },
    Nblocks {
        span: Span,
        timer: SmgrOpTimer,
        shard: timeline::handle::WeakHandle<TenantManagerTypes>,
        req: models::PagestreamNblocksRequest,
    },
    /// One or more getpage requests; each page carries its own timer.
    GetPage {
        span: Span,
        shard: timeline::handle::WeakHandle<TenantManagerTypes>,
        effective_request_lsn: Lsn,
        pages: smallvec::SmallVec<[BatchedGetPageRequest; 1]>,
    },
    DbSize {
        span: Span,
        timer: SmgrOpTimer,
        shard: timeline::handle::WeakHandle<TenantManagerTypes>,
        req: models::PagestreamDbSizeRequest,
    },
    GetSlruSegment {
        span: Span,
        timer: SmgrOpTimer,
        shard: timeline::handle::WeakHandle<TenantManagerTypes>,
        req: models::PagestreamGetSlruSegmentRequest,
    },
    #[cfg(feature = "testing")]
    Test {
        span: Span,
        shard: timeline::handle::WeakHandle<TenantManagerTypes>,
        requests: Vec<BatchedTestRequest>,
    },
    /// No request to execute: carries the error to respond with.
    RespondError {
        span: Span,
        error: BatchedPageStreamError,
    },
}
636 :
637 : impl BatchedFeMessage {
638 0 : fn observe_execution_start(&mut self, at: Instant) {
639 0 : match self {
640 0 : BatchedFeMessage::Exists { timer, .. }
641 0 : | BatchedFeMessage::Nblocks { timer, .. }
642 0 : | BatchedFeMessage::DbSize { timer, .. }
643 0 : | BatchedFeMessage::GetSlruSegment { timer, .. } => {
644 0 : timer.observe_execution_start(at);
645 0 : }
646 0 : BatchedFeMessage::GetPage { pages, .. } => {
647 0 : for page in pages {
648 0 : page.timer.observe_execution_start(at);
649 0 : }
650 : }
651 : #[cfg(feature = "testing")]
652 0 : BatchedFeMessage::Test { requests, .. } => {
653 0 : for req in requests {
654 0 : req.timer.observe_execution_start(at);
655 0 : }
656 : }
657 0 : BatchedFeMessage::RespondError { .. } => {}
658 : }
659 0 : }
660 : }
661 :
662 : impl PageServerHandler {
663 0 : pub fn new(
664 0 : conf: &'static PageServerConf,
665 0 : tenant_manager: Arc<TenantManager>,
666 0 : auth: Option<Arc<SwappableJwtAuth>>,
667 0 : pipelining_config: PageServicePipeliningConfig,
668 0 : connection_ctx: RequestContext,
669 0 : cancel: CancellationToken,
670 0 : gate_guard: GateGuard,
671 0 : ) -> Self {
672 0 : PageServerHandler {
673 0 : conf,
674 0 : auth,
675 0 : claims: None,
676 0 : connection_ctx,
677 0 : timeline_handles: Some(TimelineHandles::new(tenant_manager)),
678 0 : cancel,
679 0 : pipelining_config,
680 0 : gate_guard,
681 0 : }
682 0 : }
683 :
684 : /// This function always respects cancellation of any timeline in `[Self::shard_timelines]`. Pass in
685 : /// a cancellation token at the next scope up (such as a tenant cancellation token) to ensure we respect
686 : /// cancellation if there aren't any timelines in the cache.
687 : ///
688 : /// If calling from a function that doesn't use the `[Self::shard_timelines]` cache, then pass in the
689 : /// timeline cancellation token.
690 0 : async fn flush_cancellable<IO>(
691 0 : &self,
692 0 : pgb: &mut PostgresBackend<IO>,
693 0 : cancel: &CancellationToken,
694 0 : ) -> Result<(), QueryError>
695 0 : where
696 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
697 0 : {
698 0 : tokio::select!(
699 0 : flush_r = pgb.flush() => {
700 0 : Ok(flush_r?)
701 : },
702 0 : _ = cancel.cancelled() => {
703 0 : Err(QueryError::Shutdown)
704 : }
705 : )
706 0 : }
707 :
/// Read one pagestream message from the client and turn it into a [`BatchedFeMessage`].
///
/// For every request type this resolves the timeline handle for the target shard,
/// creates the request's tracing span (as a child of `parent_span`), and starts the
/// smgr op timer and throttling. Getpage requests additionally determine their
/// effective request LSN here.
///
/// Returns `Ok(None)` if the client sent `Terminate` or no message could be read
/// (client disconnected). Returns [`QueryError::Shutdown`] if `cancel` (or the shard's
/// cancellation token, while throttling) fires. For getpage requests, failures to
/// resolve the shard or to determine the LSN are not returned as errors but as
/// `Ok(Some(BatchedFeMessage::RespondError { .. }))`.
#[allow(clippy::too_many_arguments)]
async fn pagestream_read_message<IO>(
    pgb: &mut PostgresBackendReader<IO>,
    tenant_id: TenantId,
    timeline_id: TimelineId,
    timeline_handles: &mut TimelineHandles,
    cancel: &CancellationToken,
    ctx: &RequestContext,
    protocol_version: PagestreamProtocolVersion,
    parent_span: Span,
) -> Result<Option<BatchedFeMessage>, QueryError>
where
    IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
{
    // `biased`: cancellation is checked before attempting to read.
    let msg = tokio::select! {
        biased;
        _ = cancel.cancelled() => {
            return Err(QueryError::Shutdown)
        }
        msg = pgb.read_message() => { msg }
    };

    // Taken right after the read completes; the smgr op timers are started from it.
    let received_at = Instant::now();

    let copy_data_bytes = match msg? {
        Some(FeMessage::CopyData(bytes)) => bytes,
        Some(FeMessage::Terminate) => {
            return Ok(None);
        }
        Some(m) => {
            return Err(QueryError::Other(anyhow::anyhow!(
                "unexpected message: {m:?} during COPY"
            )));
        }
        None => {
            return Ok(None);
        } // client disconnected
    };
    trace!("query: {copy_data_bytes:?}");

    fail::fail_point!("ps::handle-pagerequest-message");

    // parse request
    let neon_fe_msg =
        PagestreamFeMessage::parse(&mut copy_data_bytes.reader(), protocol_version)?;

    // TODO: turn in to async closure once available to avoid repeating received_at
    /// Start the smgr op timer for `op` and wait for the shard's pagestream throttle.
    /// Returns [`QueryError::Shutdown`] if the shard is cancelled while waiting.
    async fn record_op_start_and_throttle(
        shard: &timeline::handle::Handle<TenantManagerTypes>,
        op: metrics::SmgrQueryType,
        received_at: Instant,
    ) -> Result<SmgrOpTimer, QueryError> {
        // It's important to start the smgr op metric recorder as early as possible
        // so that the _started counters are incremented before we do
        // any serious waiting, e.g., for throttle, batching, or actual request handling.
        let mut timer = shard.query_metrics.start_smgr_op(op, received_at);
        let now = Instant::now();
        timer.observe_throttle_start(now);
        let throttled = tokio::select! {
            res = shard.pagestream_throttle.throttle(1, now) => res,
            _ = shard.cancel.cancelled() => return Err(QueryError::Shutdown),
        };
        timer.observe_throttle_done(throttled);
        Ok(timer)
    }

    let batched_msg = match neon_fe_msg {
        PagestreamFeMessage::Exists(req) => {
            let shard = timeline_handles
                .get(tenant_id, timeline_id, ShardSelector::Zero)
                .await?;
            debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
            let span = tracing::info_span!(parent: &parent_span, "handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
            let timer = record_op_start_and_throttle(
                &shard,
                metrics::SmgrQueryType::GetRelExists,
                received_at,
            )
            .await?;
            BatchedFeMessage::Exists {
                span,
                timer,
                shard: shard.downgrade(),
                req,
            }
        }
        PagestreamFeMessage::Nblocks(req) => {
            let shard = timeline_handles
                .get(tenant_id, timeline_id, ShardSelector::Zero)
                .await?;
            let span = tracing::info_span!(parent: &parent_span, "handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
            let timer = record_op_start_and_throttle(
                &shard,
                metrics::SmgrQueryType::GetRelSize,
                received_at,
            )
            .await?;
            BatchedFeMessage::Nblocks {
                span,
                timer,
                shard: shard.downgrade(),
                req,
            }
        }
        PagestreamFeMessage::DbSize(req) => {
            let shard = timeline_handles
                .get(tenant_id, timeline_id, ShardSelector::Zero)
                .await?;
            let span = tracing::info_span!(parent: &parent_span, "handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
            let timer = record_op_start_and_throttle(
                &shard,
                metrics::SmgrQueryType::GetDbSize,
                received_at,
            )
            .await?;
            BatchedFeMessage::DbSize {
                span,
                timer,
                shard: shard.downgrade(),
                req,
            }
        }
        PagestreamFeMessage::GetSlruSegment(req) => {
            let shard = timeline_handles
                .get(tenant_id, timeline_id, ShardSelector::Zero)
                .await?;
            let span = tracing::info_span!(parent: &parent_span, "handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
            let timer = record_op_start_and_throttle(
                &shard,
                metrics::SmgrQueryType::GetSlruSegment,
                received_at,
            )
            .await?;
            BatchedFeMessage::GetSlruSegment {
                span,
                timer,
                shard: shard.downgrade(),
                req,
            }
        }
        PagestreamFeMessage::GetPage(req) => {
            // avoid a somewhat costly Span::record() by constructing the entire span in one go.
            macro_rules! mkspan {
                (before shard routing) => {{
                    tracing::info_span!(parent: &parent_span, "handle_get_page_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.hdr.request_lsn)
                }};
                ($shard_id:expr) => {{
                    tracing::info_span!(parent: &parent_span, "handle_get_page_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.hdr.request_lsn, shard_id = %$shard_id)
                }};
            }

            // Builds the `Ok(Some(RespondError { .. }))` return value for this request.
            macro_rules! respond_error {
                ($span:expr, $error:expr) => {{
                    let error = BatchedFeMessage::RespondError {
                        span: $span,
                        error: BatchedPageStreamError {
                            req: req.hdr,
                            err: $error,
                        },
                    };
                    Ok(Some(error))
                }};
            }

            // Unlike the other request types, getpage is routed to the shard that holds the page.
            let key = rel_block_to_key(req.rel, req.blkno);
            let shard = match timeline_handles
                .get(tenant_id, timeline_id, ShardSelector::Page(key))
                .await
            {
                Ok(tl) => tl,
                Err(e) => {
                    let span = mkspan!(before shard routing);
                    match e {
                        GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_)) => {
                            // We already know this tenant exists in general, because we resolved it at
                            // start of connection. Getting a NotFound here indicates that the shard containing
                            // the requested page is not present on this node: the client's knowledge of shard->pageserver
                            // mapping is out of date.
                            //
                            // Closing the connection by returning `::Reconnect` has the side effect of rate-limiting above message, via
                            // client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
                            // and talk to a different pageserver.
                            return respond_error!(
                                span,
                                PageStreamError::Reconnect(
                                    "getpage@lsn request routed to wrong shard".into()
                                )
                            );
                        }
                        e => {
                            return respond_error!(span, e.into());
                        }
                    }
                }
            };
            let span = mkspan!(shard.tenant_shard_id.shard_slug());

            let timer = record_op_start_and_throttle(
                &shard,
                metrics::SmgrQueryType::GetPageAtLsn,
                received_at,
            )
            .await?;

            // We're holding the Handle
            let effective_request_lsn = match Self::wait_or_get_last_lsn(
                &shard,
                req.hdr.request_lsn,
                req.hdr.not_modified_since,
                &shard.get_latest_gc_cutoff_lsn(),
                ctx,
            )
            // TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait
            .await
            {
                Ok(lsn) => lsn,
                Err(e) => {
                    return respond_error!(span, e);
                }
            };
            BatchedFeMessage::GetPage {
                span,
                shard: shard.downgrade(),
                effective_request_lsn,
                pages: smallvec::smallvec![BatchedGetPageRequest { req, timer }],
            }
        }
        #[cfg(feature = "testing")]
        PagestreamFeMessage::Test(req) => {
            let shard = timeline_handles
                .get(tenant_id, timeline_id, ShardSelector::Zero)
                .await?;
            let span = tracing::info_span!(parent: &parent_span, "handle_test_request", shard_id = %shard.tenant_shard_id.shard_slug());
            let timer =
                record_op_start_and_throttle(&shard, metrics::SmgrQueryType::Test, received_at)
                    .await?;
            BatchedFeMessage::Test {
                span,
                shard: shard.downgrade(),
                requests: vec![BatchedTestRequest { req, timer }],
            }
        }
    };
    Ok(Some(batched_msg))
}
953 :
/// Tries to fold `this_msg` into the batch that has been accumulated in `batch` so far.
///
/// Return value:
/// - `Ok(())`: `this_msg` was appended to `batch`.
/// - `Err(Err(e))`: `this_msg` was itself an error; it is handed back unchanged.
/// - `Err(Ok(this_msg))`: `this_msg` could not be merged into `batch` and is handed back.
///
/// Merging only happens when both sides are `GetPage` messages (or, with the `testing`
/// feature, `Test` messages), the accumulated batch is below `max_batch_size`, both sides
/// use the same shard handle, and the effective request LSN (respectively the batch key)
/// is unchanged.
///
/// NOTE(review): `batch` is a `&mut Result<..>` in this signature, so the `Some()` wording
/// below looks stale -- confirm against the `spsc_fold` caller.
954 : /// Post-condition: `batch` is Some()
955 : #[instrument(skip_all, level = tracing::Level::TRACE)]
956 : #[allow(clippy::boxed_local)]
957 : fn pagestream_do_batch(
958 : max_batch_size: NonZeroUsize,
959 : batch: &mut Result<BatchedFeMessage, QueryError>,
960 : this_msg: Result<BatchedFeMessage, QueryError>,
961 : ) -> Result<(), Result<BatchedFeMessage, QueryError>> {
962 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
963 :
// An incoming error is never merged; it is returned to the caller as-is.
964 : let this_msg = match this_msg {
965 : Ok(this_msg) => this_msg,
966 : Err(e) => return Err(Err(e)),
967 : };
968 :
969 : match (&mut *batch, this_msg) {
970 : // something batched already, let's see if we can add this message to the batch
971 : (
972 : Ok(BatchedFeMessage::GetPage {
973 : span: _,
974 : shard: accum_shard,
975 : pages: ref mut accum_pages,
976 : effective_request_lsn: accum_lsn,
977 : }),
978 : BatchedFeMessage::GetPage {
979 : span: _,
980 : shard: this_shard,
981 : pages: this_pages,
982 : effective_request_lsn: this_lsn,
983 : },
984 0 : ) if (|| {
// The incoming message is expected to carry exactly one page request.
985 0 : assert_eq!(this_pages.len(), 1);
986 0 : if accum_pages.len() >= max_batch_size.get() {
987 0 : trace!(%accum_lsn, %this_lsn, %max_batch_size, "stopping batching because of batch size");
988 0 : assert_eq!(accum_pages.len(), max_batch_size.get());
989 0 : return false;
990 0 : }
991 0 : if !accum_shard.is_same_handle_as(&this_shard) {
992 0 : trace!(%accum_lsn, %this_lsn, "stopping batching because timeline object mismatch");
993 : // TODO: we _could_ batch & execute each shard separately (and in parallel).
994 : // But the current logic for keeping responses in order does not support that.
995 0 : return false;
996 0 : }
997 0 : // the vectored get currently only supports a single LSN, so, bounce as soon
998 0 : // as the effective request_lsn changes
999 0 : if *accum_lsn != this_lsn {
1000 0 : trace!(%accum_lsn, %this_lsn, "stopping batching because LSN changed");
1001 0 : return false;
1002 0 : }
1003 0 : true
1004 : })() =>
1005 : {
1006 : // ok to batch
1007 : accum_pages.extend(this_pages);
1008 : Ok(())
1009 : }
1010 : #[cfg(feature = "testing")]
1011 : (
1012 : Ok(BatchedFeMessage::Test {
1013 : shard: accum_shard,
1014 : requests: accum_requests,
1015 : ..
1016 : }),
1017 : BatchedFeMessage::Test {
1018 : shard: this_shard,
1019 : requests: this_requests,
1020 : ..
1021 : },
1022 0 : ) if (|| {
// Same checks as for GetPage, except that the batch key takes the place of the LSN.
1023 0 : assert!(this_requests.len() == 1);
1024 0 : if accum_requests.len() >= max_batch_size.get() {
1025 0 : trace!(%max_batch_size, "stopping batching because of batch size");
1026 0 : assert_eq!(accum_requests.len(), max_batch_size.get());
1027 0 : return false;
1028 0 : }
1029 0 : if !accum_shard.is_same_handle_as(&this_shard) {
1030 0 : trace!("stopping batching because timeline object mismatch");
1031 : // TODO: we _could_ batch & execute each shard separately (and in parallel).
1032 : // But the current logic for keeping responses in order does not support that.
1033 0 : return false;
1034 0 : }
1035 0 : let this_batch_key = this_requests[0].req.batch_key;
1036 0 : let accum_batch_key = accum_requests[0].req.batch_key;
1037 0 : if this_requests[0].req.batch_key != accum_requests[0].req.batch_key {
1038 0 : trace!(%accum_batch_key, %this_batch_key, "stopping batching because batch key changed");
1039 0 : return false;
1040 0 : }
1041 0 : true
1042 : })() =>
1043 : {
1044 : // ok to batch
1045 : accum_requests.extend(this_requests);
1046 : Ok(())
1047 : }
1048 : // something batched already but this message is unbatchable
1049 : (_, this_msg) => {
1050 : // by default, don't continue batching
1051 : Err(Ok(this_msg))
1052 : }
1053 : }
1054 : }
1055 :
1056 0 : #[instrument(level = tracing::Level::DEBUG, skip_all)]
1057 : async fn pagesteam_handle_batched_message<IO>(
1058 : &mut self,
1059 : pgb_writer: &mut PostgresBackend<IO>,
1060 : batch: BatchedFeMessage,
1061 : io_concurrency: IoConcurrency,
1062 : cancel: &CancellationToken,
1063 : protocol_version: PagestreamProtocolVersion,
1064 : ctx: &RequestContext,
1065 : ) -> Result<(), QueryError>
1066 : where
1067 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1068 : {
1069 : let started_at = Instant::now();
1070 : let batch = {
1071 : let mut batch = batch;
1072 : batch.observe_execution_start(started_at);
1073 : batch
1074 : };
1075 :
1076 : // invoke handler function
1077 : let (mut handler_results, span): (
1078 : Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>>,
1079 : _,
1080 : ) = match batch {
1081 : BatchedFeMessage::Exists {
1082 : span,
1083 : timer,
1084 : shard,
1085 : req,
1086 : } => {
1087 : fail::fail_point!("ps::handle-pagerequest-message::exists");
1088 : (
1089 : vec![self
1090 : .handle_get_rel_exists_request(&*shard.upgrade()?, &req, ctx)
1091 : .instrument(span.clone())
1092 : .await
1093 0 : .map(|msg| (msg, timer))
1094 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr })],
1095 : span,
1096 : )
1097 : }
1098 : BatchedFeMessage::Nblocks {
1099 : span,
1100 : timer,
1101 : shard,
1102 : req,
1103 : } => {
1104 : fail::fail_point!("ps::handle-pagerequest-message::nblocks");
1105 : (
1106 : vec![self
1107 : .handle_get_nblocks_request(&*shard.upgrade()?, &req, ctx)
1108 : .instrument(span.clone())
1109 : .await
1110 0 : .map(|msg| (msg, timer))
1111 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr })],
1112 : span,
1113 : )
1114 : }
1115 : BatchedFeMessage::GetPage {
1116 : span,
1117 : shard,
1118 : effective_request_lsn,
1119 : pages,
1120 : } => {
1121 : fail::fail_point!("ps::handle-pagerequest-message::getpage");
1122 : (
1123 : {
1124 : let npages = pages.len();
1125 : trace!(npages, "handling getpage request");
1126 : let res = self
1127 : .handle_get_page_at_lsn_request_batched(
1128 : &*shard.upgrade()?,
1129 : effective_request_lsn,
1130 : pages,
1131 : io_concurrency,
1132 : ctx,
1133 : )
1134 : .instrument(span.clone())
1135 : .await;
1136 : assert_eq!(res.len(), npages);
1137 : res
1138 : },
1139 : span,
1140 : )
1141 : }
1142 : BatchedFeMessage::DbSize {
1143 : span,
1144 : timer,
1145 : shard,
1146 : req,
1147 : } => {
1148 : fail::fail_point!("ps::handle-pagerequest-message::dbsize");
1149 : (
1150 : vec![self
1151 : .handle_db_size_request(&*shard.upgrade()?, &req, ctx)
1152 : .instrument(span.clone())
1153 : .await
1154 0 : .map(|msg| (msg, timer))
1155 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr })],
1156 : span,
1157 : )
1158 : }
1159 : BatchedFeMessage::GetSlruSegment {
1160 : span,
1161 : timer,
1162 : shard,
1163 : req,
1164 : } => {
1165 : fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
1166 : (
1167 : vec![self
1168 : .handle_get_slru_segment_request(&*shard.upgrade()?, &req, ctx)
1169 : .instrument(span.clone())
1170 : .await
1171 0 : .map(|msg| (msg, timer))
1172 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr })],
1173 : span,
1174 : )
1175 : }
1176 : #[cfg(feature = "testing")]
1177 : BatchedFeMessage::Test {
1178 : span,
1179 : shard,
1180 : requests,
1181 : } => {
1182 : fail::fail_point!("ps::handle-pagerequest-message::test");
1183 : (
1184 : {
1185 : let npages = requests.len();
1186 : trace!(npages, "handling getpage request");
1187 : let res = self
1188 : .handle_test_request_batch(&*shard.upgrade()?, requests, ctx)
1189 : .instrument(span.clone())
1190 : .await;
1191 : assert_eq!(res.len(), npages);
1192 : res
1193 : },
1194 : span,
1195 : )
1196 : }
1197 : BatchedFeMessage::RespondError { span, error } => {
1198 : // We've already decided to respond with an error, so we don't need to
1199 : // call the handler.
1200 : (vec![Err(error)], span)
1201 : }
1202 : };
1203 :
1204 : // We purposefully don't count flush time into the smgr operation timer.
1205 : //
1206 : // The reason is that current compute client will not perform protocol processing
1207 : // if the postgres backend process is doing things other than `->smgr_read()`.
1208 : // This is especially the case for prefetch.
1209 : //
1210 : // If the compute doesn't read from the connection, eventually TCP will backpressure
1211 : // all the way into our flush call below.
1212 : //
1213 : // The timer's underlying metric is used for a storage-internal latency SLO and
1214 : // we don't want to include latency in it that we can't control.
1215 : // And as pointed out above, in this case, we don't control the time that flush will take.
1216 : //
1217 : // We put each response in the batch onto the wire in a separate pgb_writer.flush()
1218 : // call, which (all unmeasured) adds syscall overhead but reduces time to first byte
1219 : // and avoids building up a "giant" contiguous userspace buffer to hold the entire response.
1220 : // TODO: vectored socket IO would be great, but pgb_writer doesn't support that.
1221 : let flush_timers = {
1222 : let flushing_start_time = Instant::now();
1223 : let mut flush_timers = Vec::with_capacity(handler_results.len());
1224 : for handler_result in &mut handler_results {
1225 : let flush_timer = match handler_result {
1226 : Ok((_, timer)) => Some(
1227 : timer
1228 : .observe_execution_end(flushing_start_time)
1229 : .expect("we are the first caller"),
1230 : ),
1231 : Err(_) => {
1232 : // TODO: measure errors
1233 : None
1234 : }
1235 : };
1236 : flush_timers.push(flush_timer);
1237 : }
1238 : assert_eq!(flush_timers.len(), handler_results.len());
1239 : flush_timers
1240 : };
1241 :
1242 : // Map handler result to protocol behavior.
1243 : // Some handler errors cause exit from pagestream protocol.
1244 : // Other handler errors are sent back as an error message and we stay in pagestream protocol.
1245 : for (handler_result, flushing_timer) in handler_results.into_iter().zip(flush_timers) {
1246 : let response_msg = match handler_result {
1247 : Err(e) => match &e.err {
1248 : PageStreamError::Shutdown => {
1249 : // If we fail to fulfil a request during shutdown, which may be _because_ of
1250 : // shutdown, then do not send the error to the client. Instead just drop the
1251 : // connection.
1252 0 : span.in_scope(|| info!("dropping connection due to shutdown"));
1253 : return Err(QueryError::Shutdown);
1254 : }
1255 : PageStreamError::Reconnect(reason) => {
1256 0 : span.in_scope(|| info!("handler requested reconnect: {reason}"));
1257 : return Err(QueryError::Reconnect);
1258 : }
1259 : PageStreamError::Read(_)
1260 : | PageStreamError::LsnTimeout(_)
1261 : | PageStreamError::NotFound(_)
1262 : | PageStreamError::BadRequest(_) => {
1263 : // print the all details to the log with {:#}, but for the client the
1264 : // error message is enough. Do not log if shutting down, as the anyhow::Error
1265 : // here includes cancellation which is not an error.
1266 : let full = utils::error::report_compact_sources(&e.err);
1267 0 : span.in_scope(|| {
1268 0 : error!("error reading relation or page version: {full:#}")
1269 0 : });
1270 :
1271 : PagestreamBeMessage::Error(PagestreamErrorResponse {
1272 : req: e.req,
1273 : message: e.err.to_string(),
1274 : })
1275 : }
1276 : },
1277 : Ok((response_msg, _op_timer_already_observed)) => response_msg,
1278 : };
1279 :
1280 : //
1281 : // marshal & transmit response message
1282 : //
1283 :
1284 : pgb_writer.write_message_noflush(&BeMessage::CopyData(
1285 : &response_msg.serialize(protocol_version),
1286 : ))?;
1287 :
1288 : // what we want to do
1289 : let flush_fut = pgb_writer.flush();
1290 : // metric for how long flushing takes
1291 : let flush_fut = match flushing_timer {
1292 : Some(flushing_timer) => {
1293 : futures::future::Either::Left(flushing_timer.measure(Instant::now(), flush_fut))
1294 : }
1295 : None => futures::future::Either::Right(flush_fut),
1296 : };
1297 : // do it while respecting cancellation
1298 0 : let _: () = async move {
1299 0 : tokio::select! {
1300 : biased;
1301 0 : _ = cancel.cancelled() => {
1302 : // We were requested to shut down.
1303 0 : info!("shutdown request received in page handler");
1304 0 : return Err(QueryError::Shutdown)
1305 : }
1306 0 : res = flush_fut => {
1307 0 : res?;
1308 : }
1309 : }
1310 0 : Ok(())
1311 0 : }
1312 : .await?;
1313 : }
1314 : Ok(())
1315 : }
1316 :
1317 : /// Pagestream sub-protocol handler.
1318 : ///
1319 : /// It is a simple request-response protocol inside a COPYBOTH session.
1320 : ///
1321 : /// # Coding Discipline
1322 : ///
1323 : /// Coding discipline within this function: all interaction with the `pgb` connection
1324 : /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
1325 : /// This is so that we can shutdown page_service quickly.
1326 : #[instrument(skip_all)]
1327 : async fn handle_pagerequests<IO>(
1328 : &mut self,
1329 : pgb: &mut PostgresBackend<IO>,
1330 : tenant_id: TenantId,
1331 : timeline_id: TimelineId,
1332 : protocol_version: PagestreamProtocolVersion,
1333 : ctx: RequestContext,
1334 : ) -> Result<(), QueryError>
1335 : where
1336 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
1337 : {
1338 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1339 :
1340 : // switch client to COPYBOTH
1341 : pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
1342 : tokio::select! {
1343 : biased;
1344 : _ = self.cancel.cancelled() => {
1345 : return Err(QueryError::Shutdown)
1346 : }
1347 : res = pgb.flush() => {
1348 : res?;
1349 : }
1350 : }
1351 :
1352 : let io_concurrency = IoConcurrency::spawn_from_conf(
1353 : self.conf,
1354 : match self.gate_guard.try_clone() {
1355 : Ok(guard) => guard,
1356 : Err(_) => {
1357 : info!("shutdown request received in page handler");
1358 : return Err(QueryError::Shutdown);
1359 : }
1360 : },
1361 : );
1362 :
1363 : let pgb_reader = pgb
1364 : .split()
1365 : .context("implementation error: split pgb into reader and writer")?;
1366 :
1367 : let timeline_handles = self
1368 : .timeline_handles
1369 : .take()
1370 : .expect("implementation error: timeline_handles should not be locked");
1371 :
1372 : let request_span = info_span!("request");
1373 : let ((pgb_reader, timeline_handles), result) = match self.pipelining_config.clone() {
1374 : PageServicePipeliningConfig::Pipelined(pipelining_config) => {
1375 : self.handle_pagerequests_pipelined(
1376 : pgb,
1377 : pgb_reader,
1378 : tenant_id,
1379 : timeline_id,
1380 : timeline_handles,
1381 : request_span,
1382 : pipelining_config,
1383 : protocol_version,
1384 : io_concurrency,
1385 : &ctx,
1386 : )
1387 : .await
1388 : }
1389 : PageServicePipeliningConfig::Serial => {
1390 : self.handle_pagerequests_serial(
1391 : pgb,
1392 : pgb_reader,
1393 : tenant_id,
1394 : timeline_id,
1395 : timeline_handles,
1396 : request_span,
1397 : protocol_version,
1398 : io_concurrency,
1399 : &ctx,
1400 : )
1401 : .await
1402 : }
1403 : };
1404 :
1405 : debug!("pagestream subprotocol shut down cleanly");
1406 :
1407 : pgb.unsplit(pgb_reader)
1408 : .context("implementation error: unsplit pgb")?;
1409 :
1410 : let replaced = self.timeline_handles.replace(timeline_handles);
1411 : assert!(replaced.is_none());
1412 :
1413 : result
1414 : }
1415 :
1416 : #[allow(clippy::too_many_arguments)]
1417 0 : async fn handle_pagerequests_serial<IO>(
1418 0 : &mut self,
1419 0 : pgb_writer: &mut PostgresBackend<IO>,
1420 0 : mut pgb_reader: PostgresBackendReader<IO>,
1421 0 : tenant_id: TenantId,
1422 0 : timeline_id: TimelineId,
1423 0 : mut timeline_handles: TimelineHandles,
1424 0 : request_span: Span,
1425 0 : protocol_version: PagestreamProtocolVersion,
1426 0 : io_concurrency: IoConcurrency,
1427 0 : ctx: &RequestContext,
1428 0 : ) -> (
1429 0 : (PostgresBackendReader<IO>, TimelineHandles),
1430 0 : Result<(), QueryError>,
1431 0 : )
1432 0 : where
1433 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
1434 0 : {
1435 0 : let cancel = self.cancel.clone();
1436 0 : let err = loop {
1437 0 : let msg = Self::pagestream_read_message(
1438 0 : &mut pgb_reader,
1439 0 : tenant_id,
1440 0 : timeline_id,
1441 0 : &mut timeline_handles,
1442 0 : &cancel,
1443 0 : ctx,
1444 0 : protocol_version,
1445 0 : request_span.clone(),
1446 0 : )
1447 0 : .await;
1448 0 : let msg = match msg {
1449 0 : Ok(msg) => msg,
1450 0 : Err(e) => break e,
1451 : };
1452 0 : let msg = match msg {
1453 0 : Some(msg) => msg,
1454 : None => {
1455 0 : debug!("pagestream subprotocol end observed");
1456 0 : return ((pgb_reader, timeline_handles), Ok(()));
1457 : }
1458 : };
1459 :
1460 0 : let err = self
1461 0 : .pagesteam_handle_batched_message(
1462 0 : pgb_writer,
1463 0 : msg,
1464 0 : io_concurrency.clone(),
1465 0 : &cancel,
1466 0 : protocol_version,
1467 0 : ctx,
1468 0 : )
1469 0 : .await;
1470 0 : match err {
1471 0 : Ok(()) => {}
1472 0 : Err(e) => break e,
1473 : }
1474 : };
1475 0 : ((pgb_reader, timeline_handles), Err(err))
1476 0 : }
1477 :
/// Pipelined pagestream loop: a Batcher stage reads and batches requests while an
/// Executor stage processes the previously built batch.
///
/// Returns the reader half and the timeline handles (produced by the Batcher stage)
/// together with the result of the Executor stage.
///
1478 : /// # Cancel-Safety
1479 : ///
1480 : /// May leak tokio tasks if not polled to completion.
1481 : #[allow(clippy::too_many_arguments)]
1482 0 : async fn handle_pagerequests_pipelined<IO>(
1483 0 : &mut self,
1484 0 : pgb_writer: &mut PostgresBackend<IO>,
1485 0 : pgb_reader: PostgresBackendReader<IO>,
1486 0 : tenant_id: TenantId,
1487 0 : timeline_id: TimelineId,
1488 0 : mut timeline_handles: TimelineHandles,
1489 0 : request_span: Span,
1490 0 : pipelining_config: PageServicePipeliningConfigPipelined,
1491 0 : protocol_version: PagestreamProtocolVersion,
1492 0 : io_concurrency: IoConcurrency,
1493 0 : ctx: &RequestContext,
1494 0 : ) -> (
1495 0 : (PostgresBackendReader<IO>, TimelineHandles),
1496 0 : Result<(), QueryError>,
1497 0 : )
1498 0 : where
1499 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
1500 0 : {
1501 0 : //
1502 0 : // Pipelined pagestream handling consists of
1503 0 : // - a Batcher that reads requests off the wire and
1504 0 : // batches them if possible,
1505 0 : // - an Executor that processes the batched requests.
1506 0 : //
1507 0 : // The batch is built up inside an `spsc_fold` channel,
1508 0 : // shared between Batcher (Sender) and Executor (Receiver).
1509 0 : //
1510 0 : // The Batcher continuously folds client requests into the batch,
1511 0 : // while the Executor can at any time take out what's in the batch
1512 0 : // in order to process it.
1513 0 : // This means the next batch builds up while the Executor
1514 0 : // executes the last batch.
1515 0 : //
1516 0 : // CANCELLATION
1517 0 : //
1518 0 : // We run both Batcher and Executor futures to completion before
1519 0 : // returning from this function.
1520 0 : //
1521 0 : // If Executor exits first, it signals cancellation to the Batcher
1522 0 : // via a CancellationToken that is child of `self.cancel`.
1523 0 : // If Batcher exits first, it signals cancellation to the Executor
1524 0 : // by dropping the spsc_fold channel Sender.
1525 0 : //
1526 0 : // CLEAN SHUTDOWN
1527 0 : //
1528 0 : // Clean shutdown means that the client ends the COPYBOTH session.
1529 0 : // In response to such a client message, the Batcher exits.
1530 0 : // The Executor continues to run, draining the spsc_fold channel.
1531 0 : // Once drained, the spsc_fold recv will fail with a distinct error
1532 0 : // indicating that the sender disconnected.
1533 0 : // The Executor exits with Ok(()) in response to that error.
1534 0 : //
1535 0 : // Server initiated shutdown is not clean shutdown, but instead
1536 0 : // is an error Err(QueryError::Shutdown) that is propagated through
1537 0 : // error propagation.
1538 0 : //
1539 0 : // ERROR PROPAGATION
1540 0 : //
1541 0 : // When the Batcher encounters an error, it sends it as a value
1542 0 : // through the spsc_fold channel and exits afterwards.
1543 0 : // When the Executor observes such an error in the channel,
1544 0 : // it exits returning that error value.
1545 0 : //
1546 0 : // This design ensures that the Executor stage will still process
1547 0 : // the batch that was in flight when the Batcher encountered an error,
1548 0 : // thereby behaving identically to a serial implementation.
1549 0 :
1550 0 : let PageServicePipeliningConfigPipelined {
1551 0 : max_batch_size,
1552 0 : execution,
1553 0 : } = pipelining_config;
1554 :
1555 : // Macro to _define_ a pipeline stage.
// It builds the stage future from `$make_fut`, wraps it in `timed_after_cancellation`,
// emits a debug log when the stage exits, and instruments it with a span named `$name`.
1556 : macro_rules! pipeline_stage {
1557 : ($name:literal, $cancel:expr, $make_fut:expr) => {{
1558 : let cancel: CancellationToken = $cancel;
1559 : let stage_fut = $make_fut(cancel.clone());
1560 0 : async move {
1561 0 : scopeguard::defer! {
1562 0 : debug!("exiting");
1563 0 : }
1564 0 : timed_after_cancellation(stage_fut, $name, Duration::from_millis(100), &cancel)
1565 0 : .await
1566 0 : }
1567 : .instrument(tracing::info_span!($name))
1568 : }};
1569 : }
1570 :
1571 : //
1572 : // Batcher
1573 : //
1574 :
1575 0 : let cancel_batcher = self.cancel.child_token();
1576 0 : let (mut batch_tx, mut batch_rx) = spsc_fold::channel();
1577 0 : let batcher = pipeline_stage!("batcher", cancel_batcher.clone(), move |cancel_batcher| {
1578 0 : let ctx = ctx.attached_child();
1579 0 : async move {
1580 0 : let mut pgb_reader = pgb_reader;
1581 0 : let mut exit = false;
1582 0 : while !exit {
1583 0 : let read_res = Self::pagestream_read_message(
1584 0 : &mut pgb_reader,
1585 0 : tenant_id,
1586 0 : timeline_id,
1587 0 : &mut timeline_handles,
1588 0 : &cancel_batcher,
1589 0 : &ctx,
1590 0 : protocol_version,
1591 0 : request_span.clone(),
1592 0 : )
1593 0 : .await;
// `Ok(None)` from the read transposes to `None`; it is logged as a client-initiated shutdown.
1594 0 : let Some(read_res) = read_res.transpose() else {
1595 0 : debug!("client-initiated shutdown");
1596 0 : break;
1597 : };
// A read error is still sent through the channel below; the loop exits afterwards.
1598 0 : exit |= read_res.is_err();
1599 0 : let could_send = batch_tx
1600 0 : .send(read_res, |batch, res| {
1601 0 : Self::pagestream_do_batch(max_batch_size, batch, res)
1602 0 : })
1603 0 : .await;
1604 0 : exit |= could_send.is_err();
1605 : }
1606 0 : (pgb_reader, timeline_handles)
1607 0 : }
1608 0 : });
1609 :
1610 : //
1611 : // Executor
1612 : //
1613 :
1614 0 : let executor = pipeline_stage!("executor", self.cancel.clone(), move |cancel| {
1615 0 : let ctx = ctx.attached_child();
1616 0 : async move {
// Held for the lifetime of the executor future; per the CANCELLATION notes above,
// this is how an exiting Executor signals the Batcher.
1617 0 : let _cancel_batcher = cancel_batcher.drop_guard();
1618 : loop {
1619 0 : let maybe_batch = batch_rx.recv().await;
1620 0 : let batch = match maybe_batch {
1621 0 : Ok(batch) => batch,
1622 : Err(spsc_fold::RecvError::SenderGone) => {
1623 0 : debug!("upstream gone");
1624 0 : return Ok(());
1625 : }
1626 : };
// An error value forwarded by the Batcher ends the Executor with that error.
1627 0 : let batch = match batch {
1628 0 : Ok(batch) => batch,
1629 0 : Err(e) => {
1630 0 : return Err(e);
1631 : }
1632 : };
1633 0 : self.pagesteam_handle_batched_message(
1634 0 : pgb_writer,
1635 0 : batch,
1636 0 : io_concurrency.clone(),
1637 0 : &cancel,
1638 0 : protocol_version,
1639 0 : &ctx,
1640 0 : )
1641 0 : .await?;
1642 : }
1643 0 : }
1644 0 : });
1645 :
1646 : //
1647 : // Execute the stages.
1648 : //
1649 :
1650 0 : match execution {
1651 : PageServiceProtocolPipelinedExecutionStrategy::ConcurrentFutures => {
1652 0 : tokio::join!(batcher, executor)
1653 : }
1654 : PageServiceProtocolPipelinedExecutionStrategy::Tasks => {
1655 : // These tasks are not tracked anywhere.
1656 0 : let read_messages_task = tokio::spawn(batcher);
1657 0 : let (read_messages_task_res, executor_res_) =
1658 0 : tokio::join!(read_messages_task, executor,);
1659 0 : (
1660 0 : read_messages_task_res.expect("propagated panic from read_messages"),
1661 0 : executor_res_,
1662 0 : )
1663 : }
1664 : }
1665 0 : }
1666 :
1667 : /// Helper function to handle the LSN from client request.
1668 : ///
1669 : /// Each GetPage (and Exists and Nblocks) request includes information about
1670 : /// which version of the page is being requested. The primary compute node
1671 : /// will always request the latest page version, by setting 'request_lsn' to
1672 : /// the last inserted or flushed WAL position, while a standby will request
1673 : /// a version at the LSN that it's currently caught up to.
1674 : ///
1675 : /// In either case, if the page server hasn't received the WAL up to the
1676 : /// requested LSN yet, we will wait for it to arrive. The return value is
1677 : /// the LSN that should be used to look up the page versions.
1678 : ///
1679 : /// In addition to the request LSN, each request carries another LSN,
1680 : /// 'not_modified_since', which is a hint to the pageserver that the client
1681 : /// knows that the page has not been modified between 'not_modified_since'
1682 : /// and the request LSN. This allows skipping the wait, as long as the WAL
1683 : /// up to 'not_modified_since' has arrived. If the client doesn't have any
1684 : /// information about when the page was modified, it will use
1685 : /// not_modified_since == lsn. If the client lies and sends a too low
1686 : /// not_modified_hint such that there are in fact later page versions, the
1687 : /// behavior is undefined: the pageserver may return any of the page versions
1688 : /// or an error.
1689 0 : async fn wait_or_get_last_lsn(
1690 0 : timeline: &Timeline,
1691 0 : request_lsn: Lsn,
1692 0 : not_modified_since: Lsn,
1693 0 : latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
1694 0 : ctx: &RequestContext,
1695 0 : ) -> Result<Lsn, PageStreamError> {
1696 0 : let last_record_lsn = timeline.get_last_record_lsn();
1697 0 :
1698 0 : // Sanity check the request
1699 0 : if request_lsn < not_modified_since {
1700 0 : return Err(PageStreamError::BadRequest(
1701 0 : format!(
1702 0 : "invalid request with request LSN {} and not_modified_since {}",
1703 0 : request_lsn, not_modified_since,
1704 0 : )
1705 0 : .into(),
1706 0 : ));
1707 0 : }
1708 0 :
1709 0 : // Check explicitly for INVALID just to get a less scary error message if the request is obviously bogus
1710 0 : if request_lsn == Lsn::INVALID {
1711 0 : return Err(PageStreamError::BadRequest(
1712 0 : "invalid LSN(0) in request".into(),
1713 0 : ));
1714 0 : }
1715 0 :
1716 0 : // Clients should only read from recent LSNs on their timeline, or from locations holding an LSN lease.
1717 0 : //
1718 0 : // We may have older data available, but we make a best effort to detect this case and return an error,
1719 0 : // to distinguish a misbehaving client (asking for old LSN) from a storage issue (data missing at a legitimate LSN).
1720 0 : if request_lsn < **latest_gc_cutoff_lsn && !timeline.is_gc_blocked_by_lsn_lease_deadline() {
1721 0 : let gc_info = &timeline.gc_info.read().unwrap();
1722 0 : if !gc_info.lsn_covered_by_lease(request_lsn) {
1723 0 : return Err(
1724 0 : PageStreamError::BadRequest(format!(
1725 0 : "tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
1726 0 : request_lsn, **latest_gc_cutoff_lsn
1727 0 : ).into())
1728 0 : );
1729 0 : }
1730 0 : }
1731 :
1732 : // Wait for WAL up to 'not_modified_since' to arrive, if necessary
1733 0 : if not_modified_since > last_record_lsn {
1734 0 : timeline
1735 0 : .wait_lsn(
1736 0 : not_modified_since,
1737 0 : crate::tenant::timeline::WaitLsnWaiter::PageService,
1738 0 : timeline::WaitLsnTimeout::Default,
1739 0 : ctx,
1740 0 : )
1741 0 : .await?;
1742 : // Since we waited for 'not_modified_since' to arrive, that is now the last
1743 : // record LSN. (Or close enough for our purposes; the last-record LSN can
1744 : // advance immediately after we return anyway)
1745 0 : Ok(not_modified_since)
1746 : } else {
1747 : // It might be better to use max(not_modified_since, latest_gc_cutoff_lsn)
1748 : // here instead. That would give the same result, since we know that there
1749 : // haven't been any modifications since 'not_modified_since'. Using an older
1750 : // LSN might be faster, because that could allow skipping recent layers when
1751 : // finding the page. However, we have historically used 'last_record_lsn', so
1752 : // stick to that for now.
1753 0 : Ok(std::cmp::min(last_record_lsn, request_lsn))
1754 : }
1755 0 : }
1756 :
1757 : /// Handles the lsn lease request.
1758 : /// If a lease cannot be obtained, the client will receive NULL.
1759 : #[instrument(skip_all, fields(shard_id, %lsn))]
1760 : async fn handle_make_lsn_lease<IO>(
1761 : &mut self,
1762 : pgb: &mut PostgresBackend<IO>,
1763 : tenant_shard_id: TenantShardId,
1764 : timeline_id: TimelineId,
1765 : lsn: Lsn,
1766 : ctx: &RequestContext,
1767 : ) -> Result<(), QueryError>
1768 : where
1769 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1770 : {
1771 : let timeline = self
1772 : .timeline_handles
1773 : .as_mut()
1774 : .unwrap()
1775 : .get(
1776 : tenant_shard_id.tenant_id,
1777 : timeline_id,
1778 : ShardSelector::Known(tenant_shard_id.to_index()),
1779 : )
1780 : .await?;
1781 : set_tracing_field_shard_id(&timeline);
1782 :
1783 : let lease = timeline
1784 : .renew_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx)
1785 0 : .inspect_err(|e| {
1786 0 : warn!("{e}");
1787 0 : })
1788 : .ok();
1789 0 : let valid_until_str = lease.map(|l| {
1790 0 : l.valid_until
1791 0 : .duration_since(SystemTime::UNIX_EPOCH)
1792 0 : .expect("valid_until is earlier than UNIX_EPOCH")
1793 0 : .as_millis()
1794 0 : .to_string()
1795 0 : });
1796 0 : let bytes = valid_until_str.as_ref().map(|x| x.as_bytes());
1797 :
1798 : pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col(
1799 : b"valid_until",
1800 : )]))?
1801 : .write_message_noflush(&BeMessage::DataRow(&[bytes]))?;
1802 :
1803 : Ok(())
1804 : }
1805 :
1806 : #[instrument(skip_all, fields(shard_id))]
1807 : async fn handle_get_rel_exists_request(
1808 : &mut self,
1809 : timeline: &Timeline,
1810 : req: &PagestreamExistsRequest,
1811 : ctx: &RequestContext,
1812 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1813 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1814 : let lsn = Self::wait_or_get_last_lsn(
1815 : timeline,
1816 : req.hdr.request_lsn,
1817 : req.hdr.not_modified_since,
1818 : &latest_gc_cutoff_lsn,
1819 : ctx,
1820 : )
1821 : .await?;
1822 :
1823 : let exists = timeline
1824 : .get_rel_exists(req.rel, Version::Lsn(lsn), ctx)
1825 : .await?;
1826 :
1827 : Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
1828 : req: *req,
1829 : exists,
1830 : }))
1831 : }
1832 :
1833 : #[instrument(skip_all, fields(shard_id))]
1834 : async fn handle_get_nblocks_request(
1835 : &mut self,
1836 : timeline: &Timeline,
1837 : req: &PagestreamNblocksRequest,
1838 : ctx: &RequestContext,
1839 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1840 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1841 : let lsn = Self::wait_or_get_last_lsn(
1842 : timeline,
1843 : req.hdr.request_lsn,
1844 : req.hdr.not_modified_since,
1845 : &latest_gc_cutoff_lsn,
1846 : ctx,
1847 : )
1848 : .await?;
1849 :
1850 : let n_blocks = timeline
1851 : .get_rel_size(req.rel, Version::Lsn(lsn), ctx)
1852 : .await?;
1853 :
1854 : Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
1855 : req: *req,
1856 : n_blocks,
1857 : }))
1858 : }
1859 :
1860 : #[instrument(skip_all, fields(shard_id))]
1861 : async fn handle_db_size_request(
1862 : &mut self,
1863 : timeline: &Timeline,
1864 : req: &PagestreamDbSizeRequest,
1865 : ctx: &RequestContext,
1866 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1867 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1868 : let lsn = Self::wait_or_get_last_lsn(
1869 : timeline,
1870 : req.hdr.request_lsn,
1871 : req.hdr.not_modified_since,
1872 : &latest_gc_cutoff_lsn,
1873 : ctx,
1874 : )
1875 : .await?;
1876 :
1877 : let total_blocks = timeline
1878 : .get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, Version::Lsn(lsn), ctx)
1879 : .await?;
1880 : let db_size = total_blocks as i64 * BLCKSZ as i64;
1881 :
1882 : Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse {
1883 : req: *req,
1884 : db_size,
1885 : }))
1886 : }
1887 :
1888 : #[instrument(skip_all)]
1889 : async fn handle_get_page_at_lsn_request_batched(
1890 : &mut self,
1891 : timeline: &Timeline,
1892 : effective_lsn: Lsn,
1893 : requests: smallvec::SmallVec<[BatchedGetPageRequest; 1]>,
1894 : io_concurrency: IoConcurrency,
1895 : ctx: &RequestContext,
1896 : ) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>> {
1897 : debug_assert_current_span_has_tenant_and_timeline_id();
1898 :
1899 : timeline
1900 : .query_metrics
1901 : .observe_getpage_batch_start(requests.len());
1902 :
1903 : // If a page trace is running, submit an event for this request.
1904 : if let Some(page_trace) = timeline.page_trace.load().as_ref() {
1905 : let time = SystemTime::now();
1906 : for batch in &requests {
1907 : let key = rel_block_to_key(batch.req.rel, batch.req.blkno).to_compact();
1908 : // Ignore error (trace buffer may be full or tracer may have disconnected).
1909 : _ = page_trace.try_send(PageTraceEvent {
1910 : key,
1911 : effective_lsn,
1912 : time,
1913 : });
1914 : }
1915 : }
1916 :
1917 : let results = timeline
1918 : .get_rel_page_at_lsn_batched(
1919 0 : requests.iter().map(|p| (&p.req.rel, &p.req.blkno)),
1920 : effective_lsn,
1921 : io_concurrency,
1922 : ctx,
1923 : )
1924 : .await;
1925 : assert_eq!(results.len(), requests.len());
1926 :
1927 : // TODO: avoid creating the new Vec here
1928 : Vec::from_iter(
1929 : requests
1930 : .into_iter()
1931 : .zip(results.into_iter())
1932 0 : .map(|(req, res)| {
1933 0 : res.map(|page| {
1934 0 : (
1935 0 : PagestreamBeMessage::GetPage(models::PagestreamGetPageResponse {
1936 0 : req: req.req,
1937 0 : page,
1938 0 : }),
1939 0 : req.timer,
1940 0 : )
1941 0 : })
1942 0 : .map_err(|e| BatchedPageStreamError {
1943 0 : err: PageStreamError::from(e),
1944 0 : req: req.req.hdr,
1945 0 : })
1946 0 : }),
1947 : )
1948 : }
1949 :
1950 : #[instrument(skip_all, fields(shard_id))]
1951 : async fn handle_get_slru_segment_request(
1952 : &mut self,
1953 : timeline: &Timeline,
1954 : req: &PagestreamGetSlruSegmentRequest,
1955 : ctx: &RequestContext,
1956 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1957 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1958 : let lsn = Self::wait_or_get_last_lsn(
1959 : timeline,
1960 : req.hdr.request_lsn,
1961 : req.hdr.not_modified_since,
1962 : &latest_gc_cutoff_lsn,
1963 : ctx,
1964 : )
1965 : .await?;
1966 :
1967 : let kind = SlruKind::from_repr(req.kind)
1968 : .ok_or(PageStreamError::BadRequest("invalid SLRU kind".into()))?;
1969 : let segment = timeline.get_slru_segment(kind, req.segno, lsn, ctx).await?;
1970 :
1971 : Ok(PagestreamBeMessage::GetSlruSegment(
1972 : PagestreamGetSlruSegmentResponse { req: *req, segment },
1973 : ))
1974 : }
1975 :
/// Test-only batch handler (compiled with the `testing` feature).
///
/// For every request it yields to the scheduler once and then records either
/// success or `PageReconstructError::Cancelled`, depending on whether the
/// timeline's cancellation token has fired. The outcomes are returned in
/// request order, paired with each request's timer on success or its header on
/// failure.
// NB: this impl mimics what we do for batched getpage requests.
#[cfg(feature = "testing")]
#[instrument(skip_all, fields(shard_id))]
async fn handle_test_request_batch(
    &mut self,
    timeline: &Timeline,
    requests: Vec<BatchedTestRequest>,
    _ctx: &RequestContext,
) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>> {
    // real requests would do something with the timeline
    let mut outcomes = Vec::with_capacity(requests.len());
    for _ in 0..requests.len() {
        tokio::task::yield_now().await;

        let outcome = if timeline.cancel.is_cancelled() {
            Err(PageReconstructError::Cancelled)
        } else {
            Ok(())
        };
        outcomes.push(outcome);
    }

    // TODO: avoid creating the new Vec here
    requests
        .into_iter()
        .zip(outcomes)
        .map(|(batched, outcome)| match outcome {
            Ok(()) => {
                let response = models::PagestreamTestResponse {
                    req: batched.req.clone(),
                };
                Ok((PagestreamBeMessage::Test(response), batched.timer))
            }
            Err(e) => Err(BatchedPageStreamError {
                err: PageStreamError::from(e),
                req: batched.req.hdr,
            }),
        })
        .collect()
}
2020 :
2021 : /// Note on "fullbackup":
2022 : /// Full basebackups should only be used for debugging purposes.
2023 : /// Originally, it was introduced to enable breaking storage format changes,
2024 : /// but that is not applicable anymore.
2025 : ///
2026 : /// # Coding Discipline
2027 : ///
2028 : /// Coding discipline within this function: all interaction with the `pgb` connection
2029 : /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
2030 : /// This is so that we can shutdown page_service quickly.
2031 : ///
2032 : /// TODO: wrap the pgb that we pass to the basebackup handler so that it's sensitive
2033 : /// to connection cancellation.
2034 : #[allow(clippy::too_many_arguments)]
2035 : #[instrument(skip_all, fields(shard_id, ?lsn, ?prev_lsn, %full_backup))]
2036 : async fn handle_basebackup_request<IO>(
2037 : &mut self,
2038 : pgb: &mut PostgresBackend<IO>,
2039 : tenant_id: TenantId,
2040 : timeline_id: TimelineId,
2041 : lsn: Option<Lsn>,
2042 : prev_lsn: Option<Lsn>,
2043 : full_backup: bool,
2044 : gzip: bool,
2045 : replica: bool,
2046 : ctx: &RequestContext,
2047 : ) -> Result<(), QueryError>
2048 : where
2049 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
2050 : {
2051 0 : fn map_basebackup_error(err: BasebackupError) -> QueryError {
2052 0 : match err {
2053 0 : BasebackupError::Client(e) => QueryError::Disconnected(ConnectionError::Io(e)),
2054 0 : BasebackupError::Server(e) => QueryError::Other(e),
2055 : }
2056 0 : }
2057 :
2058 : let started = std::time::Instant::now();
2059 :
2060 : let timeline = self
2061 : .timeline_handles
2062 : .as_mut()
2063 : .unwrap()
2064 : .get(tenant_id, timeline_id, ShardSelector::Zero)
2065 : .await?;
2066 : set_tracing_field_shard_id(&timeline);
2067 :
2068 : if timeline.is_archived() == Some(true) {
2069 : // TODO after a grace period, turn this log line into a hard error
2070 : tracing::warn!("timeline {tenant_id}/{timeline_id} is archived, but got basebackup request for it.");
2071 : //return Err(QueryError::NotFound("timeline is archived".into()))
2072 : }
2073 :
2074 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
2075 : if let Some(lsn) = lsn {
2076 : // Backup was requested at a particular LSN. Wait for it to arrive.
2077 : info!("waiting for {}", lsn);
2078 : timeline
2079 : .wait_lsn(
2080 : lsn,
2081 : crate::tenant::timeline::WaitLsnWaiter::PageService,
2082 : crate::tenant::timeline::WaitLsnTimeout::Default,
2083 : ctx,
2084 : )
2085 : .await?;
2086 : timeline
2087 : .check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
2088 : .context("invalid basebackup lsn")?;
2089 : }
2090 :
2091 : let lsn_awaited_after = started.elapsed();
2092 :
2093 : // switch client to COPYOUT
2094 : pgb.write_message_noflush(&BeMessage::CopyOutResponse)
2095 : .map_err(QueryError::Disconnected)?;
2096 : self.flush_cancellable(pgb, &self.cancel).await?;
2097 :
2098 : // Send a tarball of the latest layer on the timeline. Compress if not
2099 : // fullbackup. TODO Compress in that case too (tests need to be updated)
2100 : if full_backup {
2101 : let mut writer = pgb.copyout_writer();
2102 : basebackup::send_basebackup_tarball(
2103 : &mut writer,
2104 : &timeline,
2105 : lsn,
2106 : prev_lsn,
2107 : full_backup,
2108 : replica,
2109 : ctx,
2110 : )
2111 : .await
2112 : .map_err(map_basebackup_error)?;
2113 : } else {
2114 : let mut writer = BufWriter::new(pgb.copyout_writer());
2115 : if gzip {
2116 : let mut encoder = GzipEncoder::with_quality(
2117 : &mut writer,
2118 : // NOTE using fast compression because it's on the critical path
2119 : // for compute startup. For an empty database, we get
2120 : // <100KB with this method. The Level::Best compression method
2121 : // gives us <20KB, but maybe we should add basebackup caching
2122 : // on compute shutdown first.
2123 : async_compression::Level::Fastest,
2124 : );
2125 : basebackup::send_basebackup_tarball(
2126 : &mut encoder,
2127 : &timeline,
2128 : lsn,
2129 : prev_lsn,
2130 : full_backup,
2131 : replica,
2132 : ctx,
2133 : )
2134 : .await
2135 : .map_err(map_basebackup_error)?;
2136 : // shutdown the encoder to ensure the gzip footer is written
2137 : encoder
2138 : .shutdown()
2139 : .await
2140 0 : .map_err(|e| QueryError::Disconnected(ConnectionError::Io(e)))?;
2141 : } else {
2142 : basebackup::send_basebackup_tarball(
2143 : &mut writer,
2144 : &timeline,
2145 : lsn,
2146 : prev_lsn,
2147 : full_backup,
2148 : replica,
2149 : ctx,
2150 : )
2151 : .await
2152 : .map_err(map_basebackup_error)?;
2153 : }
2154 : writer
2155 : .flush()
2156 : .await
2157 0 : .map_err(|e| map_basebackup_error(BasebackupError::Client(e)))?;
2158 : }
2159 :
2160 : pgb.write_message_noflush(&BeMessage::CopyDone)
2161 : .map_err(QueryError::Disconnected)?;
2162 : self.flush_cancellable(pgb, &timeline.cancel).await?;
2163 :
2164 : let basebackup_after = started
2165 : .elapsed()
2166 : .checked_sub(lsn_awaited_after)
2167 : .unwrap_or(Duration::ZERO);
2168 :
2169 : info!(
2170 : lsn_await_millis = lsn_awaited_after.as_millis(),
2171 : basebackup_millis = basebackup_after.as_millis(),
2172 : "basebackup complete"
2173 : );
2174 :
2175 : Ok(())
2176 : }
2177 :
2178 : // when accessing management api supply None as an argument
2179 : // when using to authorize tenant pass corresponding tenant id
2180 0 : fn check_permission(&self, tenant_id: Option<TenantId>) -> Result<(), QueryError> {
2181 0 : if self.auth.is_none() {
2182 : // auth is set to Trust, nothing to check so just return ok
2183 0 : return Ok(());
2184 0 : }
2185 0 : // auth is some, just checked above, when auth is some
2186 0 : // then claims are always present because of checks during connection init
2187 0 : // so this expect won't trigger
2188 0 : let claims = self
2189 0 : .claims
2190 0 : .as_ref()
2191 0 : .expect("claims presence already checked");
2192 0 : check_permission(claims, tenant_id).map_err(|e| QueryError::Unauthorized(e.0))
2193 0 : }
2194 : }
2195 :
2196 : /// `basebackup tenant timeline [lsn] [--gzip] [--replica]`
2197 : #[derive(Debug, Clone, Eq, PartialEq)]
2198 : struct BaseBackupCmd {
2199 : tenant_id: TenantId,
2200 : timeline_id: TimelineId,
2201 : lsn: Option<Lsn>,
2202 : gzip: bool,
2203 : replica: bool,
2204 : }
2205 :
2206 : /// `fullbackup tenant timeline [lsn] [prev_lsn]`
2207 : #[derive(Debug, Clone, Eq, PartialEq)]
2208 : struct FullBackupCmd {
2209 : tenant_id: TenantId,
2210 : timeline_id: TimelineId,
2211 : lsn: Option<Lsn>,
2212 : prev_lsn: Option<Lsn>,
2213 : }
2214 :
2215 : /// `pagestream_v2 tenant timeline`
2216 : #[derive(Debug, Clone, Eq, PartialEq)]
2217 : struct PageStreamCmd {
2218 : tenant_id: TenantId,
2219 : timeline_id: TimelineId,
2220 : protocol_version: PagestreamProtocolVersion,
2221 : }
2222 :
2223 : /// `lease lsn tenant timeline lsn`
2224 : #[derive(Debug, Clone, Eq, PartialEq)]
2225 : struct LeaseLsnCmd {
2226 : tenant_shard_id: TenantShardId,
2227 : timeline_id: TimelineId,
2228 : lsn: Lsn,
2229 : }
2230 :
2231 : #[derive(Debug, Clone, Eq, PartialEq)]
2232 : enum PageServiceCmd {
2233 : Set,
2234 : PageStream(PageStreamCmd),
2235 : BaseBackup(BaseBackupCmd),
2236 : FullBackup(FullBackupCmd),
2237 : LeaseLsn(LeaseLsnCmd),
2238 : }
2239 :
2240 : impl PageStreamCmd {
2241 12 : fn parse(query: &str, protocol_version: PagestreamProtocolVersion) -> anyhow::Result<Self> {
2242 12 : let parameters = query.split_whitespace().collect_vec();
2243 12 : if parameters.len() != 2 {
2244 4 : bail!(
2245 4 : "invalid number of parameters for pagestream command: {}",
2246 4 : query
2247 4 : );
2248 8 : }
2249 8 : let tenant_id = TenantId::from_str(parameters[0])
2250 8 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2251 4 : let timeline_id = TimelineId::from_str(parameters[1])
2252 4 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2253 4 : Ok(Self {
2254 4 : tenant_id,
2255 4 : timeline_id,
2256 4 : protocol_version,
2257 4 : })
2258 12 : }
2259 : }
2260 :
2261 : impl FullBackupCmd {
2262 8 : fn parse(query: &str) -> anyhow::Result<Self> {
2263 8 : let parameters = query.split_whitespace().collect_vec();
2264 8 : if parameters.len() < 2 || parameters.len() > 4 {
2265 0 : bail!(
2266 0 : "invalid number of parameters for basebackup command: {}",
2267 0 : query
2268 0 : );
2269 8 : }
2270 8 : let tenant_id = TenantId::from_str(parameters[0])
2271 8 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2272 8 : let timeline_id = TimelineId::from_str(parameters[1])
2273 8 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2274 : // The caller is responsible for providing correct lsn and prev_lsn.
2275 8 : let lsn = if let Some(lsn_str) = parameters.get(2) {
2276 : Some(
2277 4 : Lsn::from_str(lsn_str)
2278 4 : .with_context(|| format!("Failed to parse Lsn from {lsn_str}"))?,
2279 : )
2280 : } else {
2281 4 : None
2282 : };
2283 8 : let prev_lsn = if let Some(prev_lsn_str) = parameters.get(3) {
2284 : Some(
2285 4 : Lsn::from_str(prev_lsn_str)
2286 4 : .with_context(|| format!("Failed to parse Lsn from {prev_lsn_str}"))?,
2287 : )
2288 : } else {
2289 4 : None
2290 : };
2291 8 : Ok(Self {
2292 8 : tenant_id,
2293 8 : timeline_id,
2294 8 : lsn,
2295 8 : prev_lsn,
2296 8 : })
2297 8 : }
2298 : }
2299 :
2300 : impl BaseBackupCmd {
2301 36 : fn parse(query: &str) -> anyhow::Result<Self> {
2302 36 : let parameters = query.split_whitespace().collect_vec();
2303 36 : if parameters.len() < 2 {
2304 0 : bail!(
2305 0 : "invalid number of parameters for basebackup command: {}",
2306 0 : query
2307 0 : );
2308 36 : }
2309 36 : let tenant_id = TenantId::from_str(parameters[0])
2310 36 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2311 36 : let timeline_id = TimelineId::from_str(parameters[1])
2312 36 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2313 : let lsn;
2314 : let flags_parse_from;
2315 36 : if let Some(maybe_lsn) = parameters.get(2) {
2316 32 : if *maybe_lsn == "latest" {
2317 4 : lsn = None;
2318 4 : flags_parse_from = 3;
2319 28 : } else if maybe_lsn.starts_with("--") {
2320 20 : lsn = None;
2321 20 : flags_parse_from = 2;
2322 20 : } else {
2323 : lsn = Some(
2324 8 : Lsn::from_str(maybe_lsn)
2325 8 : .with_context(|| format!("Failed to parse lsn from {maybe_lsn}"))?,
2326 : );
2327 8 : flags_parse_from = 3;
2328 : }
2329 4 : } else {
2330 4 : lsn = None;
2331 4 : flags_parse_from = 2;
2332 4 : }
2333 :
2334 36 : let mut gzip = false;
2335 36 : let mut replica = false;
2336 :
2337 44 : for ¶m in ¶meters[flags_parse_from..] {
2338 44 : match param {
2339 44 : "--gzip" => {
2340 28 : if gzip {
2341 4 : bail!("duplicate parameter for basebackup command: {param}")
2342 24 : }
2343 24 : gzip = true
2344 : }
2345 16 : "--replica" => {
2346 8 : if replica {
2347 0 : bail!("duplicate parameter for basebackup command: {param}")
2348 8 : }
2349 8 : replica = true
2350 : }
2351 8 : _ => bail!("invalid parameter for basebackup command: {param}"),
2352 : }
2353 : }
2354 24 : Ok(Self {
2355 24 : tenant_id,
2356 24 : timeline_id,
2357 24 : lsn,
2358 24 : gzip,
2359 24 : replica,
2360 24 : })
2361 36 : }
2362 : }
2363 :
2364 : impl LeaseLsnCmd {
2365 8 : fn parse(query: &str) -> anyhow::Result<Self> {
2366 8 : let parameters = query.split_whitespace().collect_vec();
2367 8 : if parameters.len() != 3 {
2368 0 : bail!(
2369 0 : "invalid number of parameters for lease lsn command: {}",
2370 0 : query
2371 0 : );
2372 8 : }
2373 8 : let tenant_shard_id = TenantShardId::from_str(parameters[0])
2374 8 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2375 8 : let timeline_id = TimelineId::from_str(parameters[1])
2376 8 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2377 8 : let lsn = Lsn::from_str(parameters[2])
2378 8 : .with_context(|| format!("Failed to parse lsn from {}", parameters[2]))?;
2379 8 : Ok(Self {
2380 8 : tenant_shard_id,
2381 8 : timeline_id,
2382 8 : lsn,
2383 8 : })
2384 8 : }
2385 : }
2386 :
2387 : impl PageServiceCmd {
2388 84 : fn parse(query: &str) -> anyhow::Result<Self> {
2389 84 : let query = query.trim();
2390 84 : let Some((cmd, other)) = query.split_once(' ') else {
2391 8 : bail!("cannot parse query: {query}")
2392 : };
2393 76 : match cmd.to_ascii_lowercase().as_str() {
2394 76 : "pagestream_v2" => Ok(Self::PageStream(PageStreamCmd::parse(
2395 12 : other,
2396 12 : PagestreamProtocolVersion::V2,
2397 12 : )?)),
2398 64 : "pagestream_v3" => Ok(Self::PageStream(PageStreamCmd::parse(
2399 0 : other,
2400 0 : PagestreamProtocolVersion::V3,
2401 0 : )?)),
2402 64 : "basebackup" => Ok(Self::BaseBackup(BaseBackupCmd::parse(other)?)),
2403 28 : "fullbackup" => Ok(Self::FullBackup(FullBackupCmd::parse(other)?)),
2404 20 : "lease" => {
2405 12 : let Some((cmd2, other)) = other.split_once(' ') else {
2406 0 : bail!("invalid lease command: {cmd}");
2407 : };
2408 12 : let cmd2 = cmd2.to_ascii_lowercase();
2409 12 : if cmd2 == "lsn" {
2410 8 : Ok(Self::LeaseLsn(LeaseLsnCmd::parse(other)?))
2411 : } else {
2412 4 : bail!("invalid lease command: {cmd}");
2413 : }
2414 : }
2415 8 : "set" => Ok(Self::Set),
2416 0 : _ => Err(anyhow::anyhow!("unsupported command {cmd} in {query}")),
2417 : }
2418 84 : }
2419 : }
2420 :
2421 : impl<IO> postgres_backend::Handler<IO> for PageServerHandler
2422 : where
2423 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
2424 : {
2425 0 : fn check_auth_jwt(
2426 0 : &mut self,
2427 0 : _pgb: &mut PostgresBackend<IO>,
2428 0 : jwt_response: &[u8],
2429 0 : ) -> Result<(), QueryError> {
2430 : // this unwrap is never triggered, because check_auth_jwt only called when auth_type is NeonJWT
2431 : // which requires auth to be present
2432 0 : let data = self
2433 0 : .auth
2434 0 : .as_ref()
2435 0 : .unwrap()
2436 0 : .decode(str::from_utf8(jwt_response).context("jwt response is not UTF-8")?)
2437 0 : .map_err(|e| QueryError::Unauthorized(e.0))?;
2438 :
2439 0 : if matches!(data.claims.scope, Scope::Tenant) && data.claims.tenant_id.is_none() {
2440 0 : return Err(QueryError::Unauthorized(
2441 0 : "jwt token scope is Tenant, but tenant id is missing".into(),
2442 0 : ));
2443 0 : }
2444 0 :
2445 0 : debug!(
2446 0 : "jwt scope check succeeded for scope: {:#?} by tenant id: {:?}",
2447 : data.claims.scope, data.claims.tenant_id,
2448 : );
2449 :
2450 0 : self.claims = Some(data.claims);
2451 0 : Ok(())
2452 0 : }
2453 :
2454 0 : fn startup(
2455 0 : &mut self,
2456 0 : _pgb: &mut PostgresBackend<IO>,
2457 0 : _sm: &FeStartupPacket,
2458 0 : ) -> Result<(), QueryError> {
2459 0 : fail::fail_point!("ps::connection-start::startup-packet");
2460 0 : Ok(())
2461 0 : }
2462 :
2463 : #[instrument(skip_all, fields(tenant_id, timeline_id))]
2464 : async fn process_query(
2465 : &mut self,
2466 : pgb: &mut PostgresBackend<IO>,
2467 : query_string: &str,
2468 : ) -> Result<(), QueryError> {
2469 0 : fail::fail_point!("simulated-bad-compute-connection", |_| {
2470 0 : info!("Hit failpoint for bad connection");
2471 0 : Err(QueryError::SimulatedConnectionError)
2472 0 : });
2473 :
2474 : fail::fail_point!("ps::connection-start::process-query");
2475 :
2476 : let ctx = self.connection_ctx.attached_child();
2477 : debug!("process query {query_string}");
2478 : let query = PageServiceCmd::parse(query_string)?;
2479 : match query {
2480 : PageServiceCmd::PageStream(PageStreamCmd {
2481 : tenant_id,
2482 : timeline_id,
2483 : protocol_version,
2484 : }) => {
2485 : tracing::Span::current()
2486 : .record("tenant_id", field::display(tenant_id))
2487 : .record("timeline_id", field::display(timeline_id));
2488 :
2489 : self.check_permission(Some(tenant_id))?;
2490 : let command_kind = match protocol_version {
2491 : PagestreamProtocolVersion::V2 => ComputeCommandKind::PageStreamV2,
2492 : PagestreamProtocolVersion::V3 => ComputeCommandKind::PageStreamV3,
2493 : };
2494 : COMPUTE_COMMANDS_COUNTERS.for_command(command_kind).inc();
2495 :
2496 : self.handle_pagerequests(pgb, tenant_id, timeline_id, protocol_version, ctx)
2497 : .await?;
2498 : }
2499 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2500 : tenant_id,
2501 : timeline_id,
2502 : lsn,
2503 : gzip,
2504 : replica,
2505 : }) => {
2506 : tracing::Span::current()
2507 : .record("tenant_id", field::display(tenant_id))
2508 : .record("timeline_id", field::display(timeline_id));
2509 :
2510 : self.check_permission(Some(tenant_id))?;
2511 :
2512 : COMPUTE_COMMANDS_COUNTERS
2513 : .for_command(ComputeCommandKind::Basebackup)
2514 : .inc();
2515 : let metric_recording = metrics::BASEBACKUP_QUERY_TIME.start_recording();
2516 0 : let res = async {
2517 0 : self.handle_basebackup_request(
2518 0 : pgb,
2519 0 : tenant_id,
2520 0 : timeline_id,
2521 0 : lsn,
2522 0 : None,
2523 0 : false,
2524 0 : gzip,
2525 0 : replica,
2526 0 : &ctx,
2527 0 : )
2528 0 : .await?;
2529 0 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
2530 0 : Result::<(), QueryError>::Ok(())
2531 0 : }
2532 : .await;
2533 : metric_recording.observe(&res);
2534 : res?;
2535 : }
2536 : // same as basebackup, but result includes relational data as well
2537 : PageServiceCmd::FullBackup(FullBackupCmd {
2538 : tenant_id,
2539 : timeline_id,
2540 : lsn,
2541 : prev_lsn,
2542 : }) => {
2543 : tracing::Span::current()
2544 : .record("tenant_id", field::display(tenant_id))
2545 : .record("timeline_id", field::display(timeline_id));
2546 :
2547 : self.check_permission(Some(tenant_id))?;
2548 :
2549 : COMPUTE_COMMANDS_COUNTERS
2550 : .for_command(ComputeCommandKind::Fullbackup)
2551 : .inc();
2552 :
2553 : // Check that the timeline exists
2554 : self.handle_basebackup_request(
2555 : pgb,
2556 : tenant_id,
2557 : timeline_id,
2558 : lsn,
2559 : prev_lsn,
2560 : true,
2561 : false,
2562 : false,
2563 : &ctx,
2564 : )
2565 : .await?;
2566 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
2567 : }
2568 : PageServiceCmd::Set => {
2569 : // important because psycopg2 executes "SET datestyle TO 'ISO'"
2570 : // on connect
2571 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
2572 : }
2573 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
2574 : tenant_shard_id,
2575 : timeline_id,
2576 : lsn,
2577 : }) => {
2578 : tracing::Span::current()
2579 : .record("tenant_id", field::display(tenant_shard_id))
2580 : .record("timeline_id", field::display(timeline_id));
2581 :
2582 : self.check_permission(Some(tenant_shard_id.tenant_id))?;
2583 :
2584 : COMPUTE_COMMANDS_COUNTERS
2585 : .for_command(ComputeCommandKind::LeaseLsn)
2586 : .inc();
2587 :
2588 : match self
2589 : .handle_make_lsn_lease(pgb, tenant_shard_id, timeline_id, lsn, &ctx)
2590 : .await
2591 : {
2592 : Ok(()) => {
2593 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?
2594 : }
2595 : Err(e) => {
2596 : error!("error obtaining lsn lease for {lsn}: {e:?}");
2597 : pgb.write_message_noflush(&BeMessage::ErrorResponse(
2598 : &e.to_string(),
2599 : Some(e.pg_error_code()),
2600 : ))?
2601 : }
2602 : };
2603 : }
2604 : }
2605 :
2606 : Ok(())
2607 : }
2608 : }
2609 :
2610 : impl From<GetActiveTenantError> for QueryError {
2611 0 : fn from(e: GetActiveTenantError) -> Self {
2612 0 : match e {
2613 0 : GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected(
2614 0 : ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
2615 0 : ),
2616 : GetActiveTenantError::Cancelled
2617 : | GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => {
2618 0 : QueryError::Shutdown
2619 : }
2620 0 : e @ GetActiveTenantError::NotFound(_) => QueryError::NotFound(format!("{e}").into()),
2621 0 : e => QueryError::Other(anyhow::anyhow!(e)),
2622 : }
2623 0 : }
2624 : }
2625 :
2626 : #[derive(Debug, thiserror::Error)]
2627 : pub(crate) enum GetActiveTimelineError {
2628 : #[error(transparent)]
2629 : Tenant(GetActiveTenantError),
2630 : #[error(transparent)]
2631 : Timeline(#[from] GetTimelineError),
2632 : }
2633 :
2634 : impl From<GetActiveTimelineError> for QueryError {
2635 0 : fn from(e: GetActiveTimelineError) -> Self {
2636 0 : match e {
2637 0 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => QueryError::Shutdown,
2638 0 : GetActiveTimelineError::Tenant(e) => e.into(),
2639 0 : GetActiveTimelineError::Timeline(e) => QueryError::NotFound(format!("{e}").into()),
2640 : }
2641 0 : }
2642 : }
2643 :
2644 : impl From<crate::tenant::timeline::handle::HandleUpgradeError> for QueryError {
2645 0 : fn from(e: crate::tenant::timeline::handle::HandleUpgradeError) -> Self {
2646 0 : match e {
2647 0 : crate::tenant::timeline::handle::HandleUpgradeError::ShutDown => QueryError::Shutdown,
2648 0 : }
2649 0 : }
2650 : }
2651 :
2652 0 : fn set_tracing_field_shard_id(timeline: &Timeline) {
2653 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
2654 0 : tracing::Span::current().record(
2655 0 : "shard_id",
2656 0 : tracing::field::display(timeline.tenant_shard_id.shard_slug()),
2657 0 : );
2658 0 : debug_assert_current_span_has_tenant_and_timeline_id();
2659 0 : }
2660 :
2661 : struct WaitedForLsn(Lsn);
2662 : impl From<WaitedForLsn> for Lsn {
2663 0 : fn from(WaitedForLsn(lsn): WaitedForLsn) -> Self {
2664 0 : lsn
2665 0 : }
2666 : }
2667 :
#[cfg(test)]
mod tests {
    use utils::shard::ShardCount;

    use super::*;

    /// Well-formed queries parse into the expected command values.
    #[test]
    fn pageservice_cmd_parse() {
        let tenant_id = TenantId::generate();
        let timeline_id = TimelineId::generate();
        let lsn = Lsn::from_str("0/16ABCDE").unwrap();
        let next_lsn = Lsn::from_str("0/16ABCDF").unwrap();

        let parse = |query: &str| PageServiceCmd::parse(query).unwrap();
        let basebackup = |lsn: Option<Lsn>, gzip: bool, replica: bool| {
            PageServiceCmd::BaseBackup(BaseBackupCmd {
                tenant_id,
                timeline_id,
                lsn,
                gzip,
                replica,
            })
        };
        let fullbackup = |lsn: Option<Lsn>, prev_lsn: Option<Lsn>| {
            PageServiceCmd::FullBackup(FullBackupCmd {
                tenant_id,
                timeline_id,
                lsn,
                prev_lsn,
            })
        };
        let lease = |tenant_shard_id: TenantShardId| {
            PageServiceCmd::LeaseLsn(LeaseLsnCmd {
                tenant_shard_id,
                timeline_id,
                lsn,
            })
        };

        // pagestream
        assert_eq!(
            parse(&format!("pagestream_v2 {tenant_id} {timeline_id}")),
            PageServiceCmd::PageStream(PageStreamCmd {
                tenant_id,
                timeline_id,
                protocol_version: PagestreamProtocolVersion::V2,
            })
        );

        // basebackup, with and without LSN and flags
        assert_eq!(
            parse(&format!("basebackup {tenant_id} {timeline_id}")),
            basebackup(None, false, false)
        );
        assert_eq!(
            parse(&format!("basebackup {tenant_id} {timeline_id} --gzip")),
            basebackup(None, true, false)
        );
        assert_eq!(
            parse(&format!("basebackup {tenant_id} {timeline_id} latest")),
            basebackup(None, false, false)
        );
        assert_eq!(
            parse(&format!("basebackup {tenant_id} {timeline_id} 0/16ABCDE")),
            basebackup(Some(lsn), false, false)
        );
        assert_eq!(
            parse(&format!(
                "basebackup {tenant_id} {timeline_id} --replica --gzip"
            )),
            basebackup(None, true, true)
        );
        assert_eq!(
            parse(&format!(
                "basebackup {tenant_id} {timeline_id} 0/16ABCDE --replica --gzip"
            )),
            basebackup(Some(lsn), true, true)
        );

        // fullbackup
        assert_eq!(
            parse(&format!("fullbackup {tenant_id} {timeline_id}")),
            fullbackup(None, None)
        );
        assert_eq!(
            parse(&format!(
                "fullbackup {tenant_id} {timeline_id} 0/16ABCDE 0/16ABCDF"
            )),
            fullbackup(Some(lsn), Some(next_lsn))
        );

        // lease lsn, unsharded and sharded
        let unsharded = TenantShardId::unsharded(tenant_id);
        assert_eq!(
            parse(&format!("lease lsn {unsharded} {timeline_id} 0/16ABCDE")),
            lease(unsharded)
        );
        let sharded = TenantShardId::split(&unsharded, ShardCount(8))[1];
        assert_eq!(
            parse(&format!("lease lsn {sharded} {timeline_id} 0/16ABCDE")),
            lease(sharded)
        );

        // set: arguments are ignored, command name is case-insensitive
        assert_eq!(parse("set a = b"), PageServiceCmd::Set);
        assert_eq!(parse("SET foo"), PageServiceCmd::Set);
    }

    /// Malformed queries are rejected.
    #[test]
    fn pageservice_cmd_err_handling() {
        let tenant_id = TenantId::generate();
        let timeline_id = TimelineId::generate();

        let bad_queries = [
            "unknown_command".to_string(),
            "pagestream_v2".to_string(),
            format!("pagestream_v2 {tenant_id}xxx"),
            format!("pagestream_v2 {tenant_id}xxx {timeline_id}xxx"),
            format!("basebackup {tenant_id} {timeline_id} --gzip --gzip"),
            format!("basebackup {tenant_id} {timeline_id} --gzip --unknown"),
            format!("basebackup {tenant_id} {timeline_id} --gzip 0/16ABCDE"),
            format!("lease {tenant_id} {timeline_id} gzip 0/16ABCDE"),
        ];

        for query in &bad_queries {
            assert!(
                PageServiceCmd::parse(query).is_err(),
                "expected a parse error for {query:?}"
            );
        }
    }
}
|