Line data Source code
1 : //! The Page Service listens for client connections and serves their GetPage@LSN
2 : //! requests.
3 :
4 : use anyhow::{bail, Context};
5 : use async_compression::tokio::write::GzipEncoder;
6 : use bytes::Buf;
7 : use futures::FutureExt;
8 : use itertools::Itertools;
9 : use once_cell::sync::OnceCell;
10 : use pageserver_api::config::{
11 : PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
12 : PageServiceProtocolPipelinedExecutionStrategy,
13 : };
14 : use pageserver_api::models::{self, TenantState};
15 : use pageserver_api::models::{
16 : PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
17 : PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
18 : PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetSlruSegmentRequest,
19 : PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest, PagestreamNblocksResponse,
20 : PagestreamProtocolVersion, PagestreamRequest,
21 : };
22 : use pageserver_api::shard::TenantShardId;
23 : use postgres_backend::{
24 : is_expected_io_error, AuthType, PostgresBackend, PostgresBackendReader, QueryError,
25 : };
26 : use pq_proto::framed::ConnectionError;
27 : use pq_proto::FeStartupPacket;
28 : use pq_proto::{BeMessage, FeMessage, RowDescriptor};
29 : use std::borrow::Cow;
30 : use std::io;
31 : use std::num::NonZeroUsize;
32 : use std::str;
33 : use std::str::FromStr;
34 : use std::sync::Arc;
35 : use std::time::SystemTime;
36 : use std::time::{Duration, Instant};
37 : use strum_macros::IntoStaticStr;
38 : use tokio::io::{AsyncRead, AsyncWrite};
39 : use tokio::io::{AsyncWriteExt, BufWriter};
40 : use tokio::task::JoinHandle;
41 : use tokio_util::sync::CancellationToken;
42 : use tracing::*;
43 : use utils::logging::warn_slow;
44 : use utils::sync::gate::{Gate, GateGuard};
45 : use utils::sync::spsc_fold;
46 : use utils::{
47 : auth::{Claims, Scope, SwappableJwtAuth},
48 : id::{TenantId, TimelineId},
49 : lsn::Lsn,
50 : simple_rcu::RcuReadGuard,
51 : };
52 :
53 : use crate::auth::check_permission;
54 : use crate::basebackup::BasebackupError;
55 : use crate::config::PageServerConf;
56 : use crate::context::{DownloadBehavior, RequestContext};
57 : use crate::metrics::{self, SmgrOpTimer};
58 : use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS};
59 : use crate::pgdatadir_mapping::Version;
60 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
61 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
62 : use crate::task_mgr::TaskKind;
63 : use crate::task_mgr::{self, COMPUTE_REQUEST_RUNTIME};
64 : use crate::tenant::mgr::ShardSelector;
65 : use crate::tenant::mgr::TenantManager;
66 : use crate::tenant::mgr::{GetActiveTenantError, GetTenantError, ShardResolveResult};
67 : use crate::tenant::storage_layer::IoConcurrency;
68 : use crate::tenant::timeline::{self, WaitLsnError};
69 : use crate::tenant::GetTimelineError;
70 : use crate::tenant::PageReconstructError;
71 : use crate::tenant::Timeline;
72 : use crate::{basebackup, timed_after_cancellation};
73 : use pageserver_api::key::rel_block_to_key;
74 : use pageserver_api::models::PageTraceEvent;
75 : use pageserver_api::reltag::SlruKind;
76 : use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
77 : use postgres_ffi::BLCKSZ;
78 : use std::os::fd::AsRawFd;
79 :
/// How long we may wait for a [`crate::tenant::mgr::TenantSlot::InProgress`] and/or a [`crate::tenant::Tenant`] which
/// is not yet in state [`TenantState::Active`].
82 : ///
83 : /// NB: this is a different value than [`crate::http::routes::ACTIVE_TENANT_TIMEOUT`].
84 : const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
85 :
86 : /// Threshold at which to log a warning about slow GetPage requests.
87 : const WARN_SLOW_GETPAGE_THRESHOLD: Duration = Duration::from_secs(30);
88 :
89 : ///////////////////////////////////////////////////////////////////////////////
90 :
/// Handle to the libpq listener task started by [`spawn`].
pub struct Listener {
    /// Cancelled by [`Listener::stop_accepting`] to make the listener task exit.
    cancel: CancellationToken,
    /// The listener task. Cancel it through `cancel` to shut down the listener
    /// and get a handle on the existing connections.
    task: JoinHandle<Connections>,
}
97 :
/// The connection handler tasks tracked by the listener loop, handed back
/// once the listener stops accepting.
pub struct Connections {
    /// Parent token; each connection handler receives a child token of it.
    cancel: CancellationToken,
    /// One task per accepted connection.
    tasks: tokio::task::JoinSet<ConnectionHandlerResult>,
    /// Gate whose guards are handed to the connection handlers; it is closed
    /// in [`Connections::shutdown`] after all tasks have been joined.
    gate: Gate,
}
103 :
104 0 : pub fn spawn(
105 0 : conf: &'static PageServerConf,
106 0 : tenant_manager: Arc<TenantManager>,
107 0 : pg_auth: Option<Arc<SwappableJwtAuth>>,
108 0 : tcp_listener: tokio::net::TcpListener,
109 0 : ) -> Listener {
110 0 : let cancel = CancellationToken::new();
111 0 : let libpq_ctx = RequestContext::todo_child(
112 0 : TaskKind::LibpqEndpointListener,
113 0 : // listener task shouldn't need to download anything. (We will
114 0 : // create a separate sub-contexts for each connection, with their
115 0 : // own download behavior. This context is used only to listen and
116 0 : // accept connections.)
117 0 : DownloadBehavior::Error,
118 0 : );
119 0 : let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
120 0 : "libpq listener",
121 0 : libpq_listener_main(
122 0 : conf,
123 0 : tenant_manager,
124 0 : pg_auth,
125 0 : tcp_listener,
126 0 : conf.pg_auth_type,
127 0 : conf.page_service_pipelining.clone(),
128 0 : libpq_ctx,
129 0 : cancel.clone(),
130 0 : )
131 0 : .map(anyhow::Ok),
132 0 : ));
133 0 :
134 0 : Listener { cancel, task }
135 0 : }
136 :
137 : impl Listener {
138 0 : pub async fn stop_accepting(self) -> Connections {
139 0 : self.cancel.cancel();
140 0 : self.task
141 0 : .await
142 0 : .expect("unreachable: we wrap the listener task in task_mgr::exit_on_panic_or_error")
143 0 : }
144 : }
145 : impl Connections {
146 0 : pub(crate) async fn shutdown(self) {
147 0 : let Self {
148 0 : cancel,
149 0 : mut tasks,
150 0 : gate,
151 0 : } = self;
152 0 : cancel.cancel();
153 0 : while let Some(res) = tasks.join_next().await {
154 0 : Self::handle_connection_completion(res);
155 0 : }
156 0 : gate.close().await;
157 0 : }
158 :
159 0 : fn handle_connection_completion(res: Result<anyhow::Result<()>, tokio::task::JoinError>) {
160 0 : match res {
161 0 : Ok(Ok(())) => {}
162 0 : Ok(Err(e)) => error!("error in page_service connection task: {:?}", e),
163 0 : Err(e) => error!("page_service connection task panicked: {:?}", e),
164 : }
165 0 : }
166 : }
167 :
168 : ///
169 : /// Main loop of the page service.
170 : ///
171 : /// Listens for connections, and launches a new handler task for each.
172 : ///
173 : /// Returns Ok(()) upon cancellation via `cancel`, returning the set of
174 : /// open connections.
175 : ///
176 : #[allow(clippy::too_many_arguments)]
177 0 : pub async fn libpq_listener_main(
178 0 : conf: &'static PageServerConf,
179 0 : tenant_manager: Arc<TenantManager>,
180 0 : auth: Option<Arc<SwappableJwtAuth>>,
181 0 : listener: tokio::net::TcpListener,
182 0 : auth_type: AuthType,
183 0 : pipelining_config: PageServicePipeliningConfig,
184 0 : listener_ctx: RequestContext,
185 0 : listener_cancel: CancellationToken,
186 0 : ) -> Connections {
187 0 : let connections_cancel = CancellationToken::new();
188 0 : let connections_gate = Gate::default();
189 0 : let mut connection_handler_tasks = tokio::task::JoinSet::default();
190 :
191 : loop {
192 0 : let gate_guard = match connections_gate.enter() {
193 0 : Ok(guard) => guard,
194 0 : Err(_) => break,
195 : };
196 :
197 0 : let accepted = tokio::select! {
198 : biased;
199 0 : _ = listener_cancel.cancelled() => break,
200 0 : next = connection_handler_tasks.join_next(), if !connection_handler_tasks.is_empty() => {
201 0 : let res = next.expect("we dont poll while empty");
202 0 : Connections::handle_connection_completion(res);
203 0 : continue;
204 : }
205 0 : accepted = listener.accept() => accepted,
206 0 : };
207 0 :
208 0 : match accepted {
209 0 : Ok((socket, peer_addr)) => {
210 0 : // Connection established. Spawn a new task to handle it.
211 0 : debug!("accepted connection from {}", peer_addr);
212 0 : let local_auth = auth.clone();
213 0 : let connection_ctx = listener_ctx
214 0 : .detached_child(TaskKind::PageRequestHandler, DownloadBehavior::Download);
215 0 : connection_handler_tasks.spawn(page_service_conn_main(
216 0 : conf,
217 0 : tenant_manager.clone(),
218 0 : local_auth,
219 0 : socket,
220 0 : auth_type,
221 0 : pipelining_config.clone(),
222 0 : connection_ctx,
223 0 : connections_cancel.child_token(),
224 0 : gate_guard,
225 0 : ));
226 : }
227 0 : Err(err) => {
228 0 : // accept() failed. Log the error, and loop back to retry on next connection.
229 0 : error!("accept() failed: {:?}", err);
230 : }
231 : }
232 : }
233 :
234 0 : debug!("page_service listener loop terminated");
235 :
236 0 : Connections {
237 0 : cancel: connections_cancel,
238 0 : tasks: connection_handler_tasks,
239 0 : gate: connections_gate,
240 0 : }
241 0 : }
242 :
/// Result produced by a single connection handler task (`page_service_conn_main`).
type ConnectionHandlerResult = anyhow::Result<()>;
244 :
245 : #[instrument(skip_all, fields(peer_addr, application_name))]
246 : #[allow(clippy::too_many_arguments)]
247 : async fn page_service_conn_main(
248 : conf: &'static PageServerConf,
249 : tenant_manager: Arc<TenantManager>,
250 : auth: Option<Arc<SwappableJwtAuth>>,
251 : socket: tokio::net::TcpStream,
252 : auth_type: AuthType,
253 : pipelining_config: PageServicePipeliningConfig,
254 : connection_ctx: RequestContext,
255 : cancel: CancellationToken,
256 : gate_guard: GateGuard,
257 : ) -> ConnectionHandlerResult {
258 : let _guard = LIVE_CONNECTIONS
259 : .with_label_values(&["page_service"])
260 : .guard();
261 :
262 : socket
263 : .set_nodelay(true)
264 : .context("could not set TCP_NODELAY")?;
265 :
266 : let socket_fd = socket.as_raw_fd();
267 :
268 : let peer_addr = socket.peer_addr().context("get peer address")?;
269 : tracing::Span::current().record("peer_addr", field::display(peer_addr));
270 :
271 : // setup read timeout of 10 minutes. the timeout is rather arbitrary for requirements:
272 : // - long enough for most valid compute connections
273 : // - less than infinite to stop us from "leaking" connections to long-gone computes
274 : //
275 : // no write timeout is used, because the kernel is assumed to error writes after some time.
276 : let mut socket = tokio_io_timeout::TimeoutReader::new(socket);
277 :
278 : let default_timeout_ms = 10 * 60 * 1000; // 10 minutes by default
279 0 : let socket_timeout_ms = (|| {
280 0 : fail::fail_point!("simulated-bad-compute-connection", |avg_timeout_ms| {
281 : // Exponential distribution for simulating
282 : // poor network conditions, expect about avg_timeout_ms to be around 15
283 : // in tests
284 0 : if let Some(avg_timeout_ms) = avg_timeout_ms {
285 0 : let avg = avg_timeout_ms.parse::<i64>().unwrap() as f32;
286 0 : let u = rand::random::<f32>();
287 0 : ((1.0 - u).ln() / (-avg)) as u64
288 : } else {
289 0 : default_timeout_ms
290 : }
291 0 : });
292 0 : default_timeout_ms
293 : })();
294 :
295 : // A timeout here does not mean the client died, it can happen if it's just idle for
296 : // a while: we will tear down this PageServerHandler and instantiate a new one if/when
297 : // they reconnect.
298 : socket.set_timeout(Some(std::time::Duration::from_millis(socket_timeout_ms)));
299 : let socket = Box::pin(socket);
300 :
301 : fail::fail_point!("ps::connection-start::pre-login");
302 :
303 : // XXX: pgbackend.run() should take the connection_ctx,
304 : // and create a child per-query context when it invokes process_query.
305 : // But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
306 : // and create the per-query context in process_query ourselves.
307 : let mut conn_handler = PageServerHandler::new(
308 : conf,
309 : tenant_manager,
310 : auth,
311 : pipelining_config,
312 : connection_ctx,
313 : cancel.clone(),
314 : gate_guard,
315 : );
316 : let pgbackend = PostgresBackend::new_from_io(socket_fd, socket, peer_addr, auth_type, None)?;
317 :
318 : match pgbackend.run(&mut conn_handler, &cancel).await {
319 : Ok(()) => {
320 : // we've been requested to shut down
321 : Ok(())
322 : }
323 : Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
324 : if is_expected_io_error(&io_error) {
325 : info!("Postgres client disconnected ({io_error})");
326 : Ok(())
327 : } else {
328 : let tenant_id = conn_handler.timeline_handles.as_ref().unwrap().tenant_id();
329 : Err(io_error).context(format!(
330 : "Postgres connection error for tenant_id={:?} client at peer_addr={}",
331 : tenant_id, peer_addr
332 : ))
333 : }
334 : }
335 : other => {
336 : let tenant_id = conn_handler.timeline_handles.as_ref().unwrap().tenant_id();
337 : other.context(format!(
338 : "Postgres query error for tenant_id={:?} client peer_addr={}",
339 : tenant_id, peer_addr
340 : ))
341 : }
342 : }
343 : }
344 :
/// Per-connection state of the page service, created once for every accepted
/// connection and passed to the postgres backend as its handler.
struct PageServerHandler {
    conf: &'static PageServerConf,
    auth: Option<Arc<SwappableJwtAuth>>,
    /// Starts out as `None` (see `PageServerHandler::new`).
    claims: Option<Claims>,

    /// The context created for the lifetime of the connection
    /// serviced by this PageServerHandler.
    /// For each query received over the connection,
    /// `process_query` creates a child context from this one.
    connection_ctx: RequestContext,

    /// Cancellation token of this connection.
    cancel: CancellationToken,

    /// None only while pagestream protocol is being processed.
    timeline_handles: Option<TimelineHandles>,

    pipelining_config: PageServicePipeliningConfig,

    /// Guard of the listener's connections gate, held for as long as this
    /// handler exists.
    gate_guard: GateGuard,
}
365 :
/// Cache of timeline handles for one connection, together with the tenant
/// manager wrapper used to resolve handles that are not cached yet.
struct TimelineHandles {
    wrapper: TenantManagerWrapper,
    /// Note on size: the typical size of this map is 1. The largest size we expect
    /// to see is the number of shards divided by the number of pageservers (typically < 2),
    /// or the ratio used when splitting shards (i.e. how many children are created from one
    /// parent shard), where a "large" number might be ~8.
    handles: timeline::handle::Cache<TenantManagerTypes>,
}
374 :
375 : impl TimelineHandles {
376 0 : fn new(tenant_manager: Arc<TenantManager>) -> Self {
377 0 : Self {
378 0 : wrapper: TenantManagerWrapper {
379 0 : tenant_manager,
380 0 : tenant_id: OnceCell::new(),
381 0 : },
382 0 : handles: Default::default(),
383 0 : }
384 0 : }
385 0 : async fn get(
386 0 : &mut self,
387 0 : tenant_id: TenantId,
388 0 : timeline_id: TimelineId,
389 0 : shard_selector: ShardSelector,
390 0 : ) -> Result<timeline::handle::Handle<TenantManagerTypes>, GetActiveTimelineError> {
391 0 : if *self.wrapper.tenant_id.get_or_init(|| tenant_id) != tenant_id {
392 0 : return Err(GetActiveTimelineError::Tenant(
393 0 : GetActiveTenantError::SwitchedTenant,
394 0 : ));
395 0 : }
396 0 : self.handles
397 0 : .get(timeline_id, shard_selector, &self.wrapper)
398 0 : .await
399 0 : .map_err(|e| match e {
400 0 : timeline::handle::GetError::TenantManager(e) => e,
401 : timeline::handle::GetError::TimelineGateClosed => {
402 0 : trace!("timeline gate closed");
403 0 : GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown)
404 : }
405 : timeline::handle::GetError::PerTimelineStateShutDown => {
406 0 : trace!("per-timeline state shut down");
407 0 : GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown)
408 : }
409 0 : })
410 0 : }
411 :
412 0 : fn tenant_id(&self) -> Option<TenantId> {
413 0 : self.wrapper.tenant_id.get().copied()
414 0 : }
415 : }
416 :
/// Pairs the tenant manager with the single tenant id a connection is bound to.
pub(crate) struct TenantManagerWrapper {
    tenant_manager: Arc<TenantManager>,
    // We do not support switching tenant_id on a connection at this point.
    // We can add support for this later if needed without changing
    // the protocol.
    tenant_id: once_cell::sync::OnceCell<TenantId>,
}
424 :
/// Marker type that selects the concrete types used with the
/// `timeline::handle` machinery in this file.
#[derive(Debug)]
pub(crate) struct TenantManagerTypes;

impl timeline::handle::Types for TenantManagerTypes {
    // Error type produced when resolving a timeline through the tenant manager.
    type TenantManagerError = GetActiveTimelineError;
    // The resolver used on a handle cache miss.
    type TenantManager = TenantManagerWrapper;
    // The timeline type that handles refer to.
    type Timeline = Arc<Timeline>;
}
433 :
/// Exposes the parts of a [`Timeline`] that the handle cache needs.
impl timeline::handle::ArcTimeline<TenantManagerTypes> for Arc<Timeline> {
    /// The timeline's gate.
    fn gate(&self) -> &utils::sync::gate::Gate {
        &self.gate
    }

    fn shard_timeline_id(&self) -> timeline::handle::ShardTimelineId {
        // Path-qualified call: this invokes the method defined for `Timeline`
        // rather than this trait method of the same name.
        Timeline::shard_timeline_id(self)
    }

    /// The timeline's per-timeline handle state (its `handles` field).
    fn per_timeline_state(&self) -> &timeline::handle::PerTimelineState<TenantManagerTypes> {
        &self.handles
    }

    fn get_shard_identity(&self) -> &pageserver_api::shard::ShardIdentity {
        // Path-qualified for the same reason as in `shard_timeline_id`.
        Timeline::get_shard_identity(self)
    }
}
451 :
452 : impl timeline::handle::TenantManager<TenantManagerTypes> for TenantManagerWrapper {
453 0 : async fn resolve(
454 0 : &self,
455 0 : timeline_id: TimelineId,
456 0 : shard_selector: ShardSelector,
457 0 : ) -> Result<Arc<Timeline>, GetActiveTimelineError> {
458 0 : let tenant_id = self.tenant_id.get().expect("we set this in get()");
459 0 : let timeout = ACTIVE_TENANT_TIMEOUT;
460 0 : let wait_start = Instant::now();
461 0 : let deadline = wait_start + timeout;
462 0 : let tenant_shard = loop {
463 0 : let resolved = self
464 0 : .tenant_manager
465 0 : .resolve_attached_shard(tenant_id, shard_selector);
466 0 : match resolved {
467 0 : ShardResolveResult::Found(tenant_shard) => break tenant_shard,
468 : ShardResolveResult::NotFound => {
469 0 : return Err(GetActiveTimelineError::Tenant(
470 0 : GetActiveTenantError::NotFound(GetTenantError::NotFound(*tenant_id)),
471 0 : ));
472 : }
473 0 : ShardResolveResult::InProgress(barrier) => {
474 0 : // We can't authoritatively answer right now: wait for InProgress state
475 0 : // to end, then try again
476 0 : tokio::select! {
477 0 : _ = barrier.wait() => {
478 0 : // The barrier completed: proceed around the loop to try looking up again
479 0 : },
480 0 : _ = tokio::time::sleep(deadline.duration_since(Instant::now())) => {
481 0 : return Err(GetActiveTimelineError::Tenant(GetActiveTenantError::WaitForActiveTimeout {
482 0 : latest_state: None,
483 0 : wait_time: timeout,
484 0 : }));
485 : }
486 : }
487 : }
488 : };
489 : };
490 :
491 0 : tracing::debug!("Waiting for tenant to enter active state...");
492 0 : tenant_shard
493 0 : .wait_to_become_active(deadline.duration_since(Instant::now()))
494 0 : .await
495 0 : .map_err(GetActiveTimelineError::Tenant)?;
496 :
497 0 : let timeline = tenant_shard
498 0 : .get_timeline(timeline_id, true)
499 0 : .map_err(GetActiveTimelineError::Timeline)?;
500 0 : Ok(timeline)
501 0 : }
502 : }
503 :
/// Errors that can occur while serving a single pagestream request.
#[derive(thiserror::Error, Debug)]
enum PageStreamError {
    /// We encountered an error that should prompt the client to reconnect:
    /// in practice this means we drop the connection without sending a response.
    #[error("Reconnect required: {0}")]
    Reconnect(Cow<'static, str>),

    /// We were instructed to shutdown while processing the query
    #[error("Shutting down")]
    Shutdown,

    /// Something went wrong reading a page: this likely indicates a pageserver bug
    #[error("Read error")]
    Read(#[source] PageReconstructError),

    /// Ran out of time waiting for an LSN
    #[error("LSN timeout: {0}")]
    LsnTimeout(WaitLsnError),

    /// The entity required to serve the request (tenant or timeline) is not found,
    /// or is not found in a suitable state to serve a request.
    #[error("Not found: {0}")]
    NotFound(Cow<'static, str>),

    /// Request asked for something that doesn't make sense, like an invalid LSN
    #[error("Bad request: {0}")]
    BadRequest(Cow<'static, str>),
}
532 :
533 : impl From<PageReconstructError> for PageStreamError {
534 0 : fn from(value: PageReconstructError) -> Self {
535 0 : match value {
536 0 : PageReconstructError::Cancelled => Self::Shutdown,
537 0 : e => Self::Read(e),
538 : }
539 0 : }
540 : }
541 :
542 : impl From<GetActiveTimelineError> for PageStreamError {
543 0 : fn from(value: GetActiveTimelineError) -> Self {
544 0 : match value {
545 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled)
546 : | GetActiveTimelineError::Tenant(GetActiveTenantError::WillNotBecomeActive(
547 : TenantState::Stopping { .. },
548 : ))
549 0 : | GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown) => Self::Shutdown,
550 0 : GetActiveTimelineError::Tenant(e) => Self::NotFound(format!("{e}").into()),
551 0 : GetActiveTimelineError::Timeline(e) => Self::NotFound(format!("{e}").into()),
552 : }
553 0 : }
554 : }
555 :
556 : impl From<WaitLsnError> for PageStreamError {
557 0 : fn from(value: WaitLsnError) -> Self {
558 0 : match value {
559 0 : e @ WaitLsnError::Timeout(_) => Self::LsnTimeout(e),
560 0 : WaitLsnError::Shutdown => Self::Shutdown,
561 0 : e @ WaitLsnError::BadState { .. } => Self::Reconnect(format!("{e}").into()),
562 : }
563 0 : }
564 : }
565 :
566 : impl From<WaitLsnError> for QueryError {
567 0 : fn from(value: WaitLsnError) -> Self {
568 0 : match value {
569 0 : e @ WaitLsnError::Timeout(_) => Self::Other(anyhow::Error::new(e)),
570 0 : WaitLsnError::Shutdown => Self::Shutdown,
571 0 : WaitLsnError::BadState { .. } => Self::Reconnect,
572 : }
573 0 : }
574 : }
575 :
/// A [`PageStreamError`] together with the header of the request it belongs to.
#[derive(thiserror::Error, Debug)]
struct BatchedPageStreamError {
    /// Header of the request that failed.
    req: PagestreamRequest,
    /// The error itself.
    err: PageStreamError,
}

impl std::fmt::Display for BatchedPageStreamError {
    /// Formats only the wrapped error; the request header is not included.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.err.fmt(f)
    }
}
587 :
/// One GetPage request inside a [`BatchedFeMessage::GetPage`] batch.
struct BatchedGetPageRequest {
    /// The request as received from the client.
    req: PagestreamGetPageRequest,
    /// Metrics timer of this individual request.
    timer: SmgrOpTimer,
}
592 :
/// One test request inside a [`BatchedFeMessage::Test`] batch
/// (only with the `testing` feature).
#[cfg(feature = "testing")]
struct BatchedTestRequest {
    /// The request as received from the client.
    req: models::PagestreamTestRequest,
    /// Metrics timer of this individual request.
    timer: SmgrOpTimer,
}
598 :
/// A pagestream request (or a batch of them) that has been read from the
/// connection and is ready for execution, or an error to respond with.
///
/// NB: we only hold [`timeline::handle::WeakHandle`] inside this enum,
/// so that we don't keep the [`Timeline::gate`] open while the batch
/// is being built up inside the [`spsc_fold`] (pagestream pipelining).
#[derive(IntoStaticStr)]
enum BatchedFeMessage {
    Exists {
        span: Span,
        timer: SmgrOpTimer,
        shard: timeline::handle::WeakHandle<TenantManagerTypes>,
        req: models::PagestreamExistsRequest,
    },
    Nblocks {
        span: Span,
        timer: SmgrOpTimer,
        shard: timeline::handle::WeakHandle<TenantManagerTypes>,
        req: models::PagestreamNblocksRequest,
    },
    /// One or more GetPage requests; each page carries its own timer.
    GetPage {
        span: Span,
        shard: timeline::handle::WeakHandle<TenantManagerTypes>,
        effective_request_lsn: Lsn,
        pages: smallvec::SmallVec<[BatchedGetPageRequest; 1]>,
    },
    DbSize {
        span: Span,
        timer: SmgrOpTimer,
        shard: timeline::handle::WeakHandle<TenantManagerTypes>,
        req: models::PagestreamDbSizeRequest,
    },
    GetSlruSegment {
        span: Span,
        timer: SmgrOpTimer,
        shard: timeline::handle::WeakHandle<TenantManagerTypes>,
        req: models::PagestreamGetSlruSegmentRequest,
    },
    /// Test requests; each request carries its own timer.
    #[cfg(feature = "testing")]
    Test {
        span: Span,
        shard: timeline::handle::WeakHandle<TenantManagerTypes>,
        requests: Vec<BatchedTestRequest>,
    },
    /// An error to respond with, along with the header of the request it
    /// belongs to. This variant has no timer.
    RespondError {
        span: Span,
        error: BatchedPageStreamError,
    },
}
645 :
646 : impl BatchedFeMessage {
647 0 : fn as_static_str(&self) -> &'static str {
648 0 : self.into()
649 0 : }
650 :
651 0 : fn observe_execution_start(&mut self, at: Instant) {
652 0 : match self {
653 0 : BatchedFeMessage::Exists { timer, .. }
654 0 : | BatchedFeMessage::Nblocks { timer, .. }
655 0 : | BatchedFeMessage::DbSize { timer, .. }
656 0 : | BatchedFeMessage::GetSlruSegment { timer, .. } => {
657 0 : timer.observe_execution_start(at);
658 0 : }
659 0 : BatchedFeMessage::GetPage { pages, .. } => {
660 0 : for page in pages {
661 0 : page.timer.observe_execution_start(at);
662 0 : }
663 : }
664 : #[cfg(feature = "testing")]
665 0 : BatchedFeMessage::Test { requests, .. } => {
666 0 : for req in requests {
667 0 : req.timer.observe_execution_start(at);
668 0 : }
669 : }
670 0 : BatchedFeMessage::RespondError { .. } => {}
671 : }
672 0 : }
673 : }
674 :
675 : impl PageServerHandler {
676 0 : pub fn new(
677 0 : conf: &'static PageServerConf,
678 0 : tenant_manager: Arc<TenantManager>,
679 0 : auth: Option<Arc<SwappableJwtAuth>>,
680 0 : pipelining_config: PageServicePipeliningConfig,
681 0 : connection_ctx: RequestContext,
682 0 : cancel: CancellationToken,
683 0 : gate_guard: GateGuard,
684 0 : ) -> Self {
685 0 : PageServerHandler {
686 0 : conf,
687 0 : auth,
688 0 : claims: None,
689 0 : connection_ctx,
690 0 : timeline_handles: Some(TimelineHandles::new(tenant_manager)),
691 0 : cancel,
692 0 : pipelining_config,
693 0 : gate_guard,
694 0 : }
695 0 : }
696 :
697 : /// This function always respects cancellation of any timeline in `[Self::shard_timelines]`. Pass in
698 : /// a cancellation token at the next scope up (such as a tenant cancellation token) to ensure we respect
699 : /// cancellation if there aren't any timelines in the cache.
700 : ///
701 : /// If calling from a function that doesn't use the `[Self::shard_timelines]` cache, then pass in the
702 : /// timeline cancellation token.
703 0 : async fn flush_cancellable<IO>(
704 0 : &self,
705 0 : pgb: &mut PostgresBackend<IO>,
706 0 : cancel: &CancellationToken,
707 0 : ) -> Result<(), QueryError>
708 0 : where
709 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
710 0 : {
711 0 : tokio::select!(
712 0 : flush_r = pgb.flush() => {
713 0 : Ok(flush_r?)
714 : },
715 0 : _ = cancel.cancelled() => {
716 0 : Err(QueryError::Shutdown)
717 : }
718 : )
719 0 : }
720 :
721 : #[allow(clippy::too_many_arguments)]
722 0 : async fn pagestream_read_message<IO>(
723 0 : pgb: &mut PostgresBackendReader<IO>,
724 0 : tenant_id: TenantId,
725 0 : timeline_id: TimelineId,
726 0 : timeline_handles: &mut TimelineHandles,
727 0 : cancel: &CancellationToken,
728 0 : ctx: &RequestContext,
729 0 : protocol_version: PagestreamProtocolVersion,
730 0 : parent_span: Span,
731 0 : ) -> Result<Option<BatchedFeMessage>, QueryError>
732 0 : where
733 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
734 0 : {
735 0 : let msg = tokio::select! {
736 : biased;
737 0 : _ = cancel.cancelled() => {
738 0 : return Err(QueryError::Shutdown)
739 : }
740 0 : msg = pgb.read_message() => { msg }
741 0 : };
742 0 :
743 0 : let received_at = Instant::now();
744 :
745 0 : let copy_data_bytes = match msg? {
746 0 : Some(FeMessage::CopyData(bytes)) => bytes,
747 : Some(FeMessage::Terminate) => {
748 0 : return Ok(None);
749 : }
750 0 : Some(m) => {
751 0 : return Err(QueryError::Other(anyhow::anyhow!(
752 0 : "unexpected message: {m:?} during COPY"
753 0 : )));
754 : }
755 : None => {
756 0 : return Ok(None);
757 : } // client disconnected
758 : };
759 0 : trace!("query: {copy_data_bytes:?}");
760 :
761 0 : fail::fail_point!("ps::handle-pagerequest-message");
762 :
763 : // parse request
764 0 : let neon_fe_msg =
765 0 : PagestreamFeMessage::parse(&mut copy_data_bytes.reader(), protocol_version)?;
766 :
767 : // TODO: turn in to async closure once available to avoid repeating received_at
768 0 : async fn record_op_start_and_throttle(
769 0 : shard: &timeline::handle::Handle<TenantManagerTypes>,
770 0 : op: metrics::SmgrQueryType,
771 0 : received_at: Instant,
772 0 : ) -> Result<SmgrOpTimer, QueryError> {
773 0 : // It's important to start the smgr op metric recorder as early as possible
774 0 : // so that the _started counters are incremented before we do
775 0 : // any serious waiting, e.g., for throttle, batching, or actual request handling.
776 0 : let mut timer = shard.query_metrics.start_smgr_op(op, received_at);
777 0 : let now = Instant::now();
778 0 : timer.observe_throttle_start(now);
779 0 : let throttled = tokio::select! {
780 0 : res = shard.pagestream_throttle.throttle(1, now) => res,
781 0 : _ = shard.cancel.cancelled() => return Err(QueryError::Shutdown),
782 : };
783 0 : timer.observe_throttle_done(throttled);
784 0 : Ok(timer)
785 0 : }
786 :
787 0 : let batched_msg = match neon_fe_msg {
788 0 : PagestreamFeMessage::Exists(req) => {
789 0 : let shard = timeline_handles
790 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
791 0 : .await?;
792 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
793 0 : let span = tracing::info_span!(parent: &parent_span, "handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
794 0 : let timer = record_op_start_and_throttle(
795 0 : &shard,
796 0 : metrics::SmgrQueryType::GetRelExists,
797 0 : received_at,
798 0 : )
799 0 : .await?;
800 0 : BatchedFeMessage::Exists {
801 0 : span,
802 0 : timer,
803 0 : shard: shard.downgrade(),
804 0 : req,
805 0 : }
806 : }
807 0 : PagestreamFeMessage::Nblocks(req) => {
808 0 : let shard = timeline_handles
809 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
810 0 : .await?;
811 0 : let span = tracing::info_span!(parent: &parent_span, "handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
812 0 : let timer = record_op_start_and_throttle(
813 0 : &shard,
814 0 : metrics::SmgrQueryType::GetRelSize,
815 0 : received_at,
816 0 : )
817 0 : .await?;
818 0 : BatchedFeMessage::Nblocks {
819 0 : span,
820 0 : timer,
821 0 : shard: shard.downgrade(),
822 0 : req,
823 0 : }
824 : }
825 0 : PagestreamFeMessage::DbSize(req) => {
826 0 : let shard = timeline_handles
827 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
828 0 : .await?;
829 0 : let span = tracing::info_span!(parent: &parent_span, "handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
830 0 : let timer = record_op_start_and_throttle(
831 0 : &shard,
832 0 : metrics::SmgrQueryType::GetDbSize,
833 0 : received_at,
834 0 : )
835 0 : .await?;
836 0 : BatchedFeMessage::DbSize {
837 0 : span,
838 0 : timer,
839 0 : shard: shard.downgrade(),
840 0 : req,
841 0 : }
842 : }
843 0 : PagestreamFeMessage::GetSlruSegment(req) => {
844 0 : let shard = timeline_handles
845 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
846 0 : .await?;
847 0 : let span = tracing::info_span!(parent: &parent_span, "handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
848 0 : let timer = record_op_start_and_throttle(
849 0 : &shard,
850 0 : metrics::SmgrQueryType::GetSlruSegment,
851 0 : received_at,
852 0 : )
853 0 : .await?;
854 0 : BatchedFeMessage::GetSlruSegment {
855 0 : span,
856 0 : timer,
857 0 : shard: shard.downgrade(),
858 0 : req,
859 0 : }
860 : }
861 0 : PagestreamFeMessage::GetPage(req) => {
862 : // avoid a somewhat costly Span::record() by constructing the entire span in one go.
863 : macro_rules! mkspan {
864 : (before shard routing) => {{
865 : tracing::info_span!(parent: &parent_span, "handle_get_page_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.hdr.request_lsn)
866 : }};
867 : ($shard_id:expr) => {{
868 : tracing::info_span!(parent: &parent_span, "handle_get_page_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.hdr.request_lsn, shard_id = %$shard_id)
869 : }};
870 : }
871 :
872 : macro_rules! respond_error {
873 : ($span:expr, $error:expr) => {{
874 : let error = BatchedFeMessage::RespondError {
875 : span: $span,
876 : error: BatchedPageStreamError {
877 : req: req.hdr,
878 : err: $error,
879 : },
880 : };
881 : Ok(Some(error))
882 : }};
883 : }
884 :
885 0 : let key = rel_block_to_key(req.rel, req.blkno);
886 0 : let shard = match timeline_handles
887 0 : .get(tenant_id, timeline_id, ShardSelector::Page(key))
888 0 : .await
889 : {
890 0 : Ok(tl) => tl,
891 0 : Err(e) => {
892 0 : let span = mkspan!(before shard routing);
893 0 : match e {
894 : GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_)) => {
895 : // We already know this tenant exists in general, because we resolved it at
896 : // start of connection. Getting a NotFound here indicates that the shard containing
897 : // the requested page is not present on this node: the client's knowledge of shard->pageserver
898 : // mapping is out of date.
899 : //
900 : // Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
901 : // client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
902 : // and talk to a different pageserver.
903 0 : return respond_error!(
904 0 : span,
905 0 : PageStreamError::Reconnect(
906 0 : "getpage@lsn request routed to wrong shard".into()
907 0 : )
908 0 : );
909 : }
910 0 : e => {
911 0 : return respond_error!(span, e.into());
912 : }
913 : }
914 : }
915 : };
916 0 : let span = mkspan!(shard.tenant_shard_id.shard_slug());
917 :
918 0 : let timer = record_op_start_and_throttle(
919 0 : &shard,
920 0 : metrics::SmgrQueryType::GetPageAtLsn,
921 0 : received_at,
922 0 : )
923 0 : .await?;
924 :
925 : // We're holding the Handle
926 0 : let effective_request_lsn = match Self::wait_or_get_last_lsn(
927 0 : &shard,
928 0 : req.hdr.request_lsn,
929 0 : req.hdr.not_modified_since,
930 0 : &shard.get_applied_gc_cutoff_lsn(),
931 0 : ctx,
932 0 : )
933 0 : // TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait
934 0 : .await
935 : {
936 0 : Ok(lsn) => lsn,
937 0 : Err(e) => {
938 0 : return respond_error!(span, e);
939 : }
940 : };
941 : BatchedFeMessage::GetPage {
942 0 : span,
943 0 : shard: shard.downgrade(),
944 0 : effective_request_lsn,
945 0 : pages: smallvec::smallvec![BatchedGetPageRequest { req, timer }],
946 : }
947 : }
948 : #[cfg(feature = "testing")]
949 0 : PagestreamFeMessage::Test(req) => {
950 0 : let shard = timeline_handles
951 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
952 0 : .await?;
953 0 : let span = tracing::info_span!(parent: &parent_span, "handle_test_request", shard_id = %shard.tenant_shard_id.shard_slug());
954 0 : let timer =
955 0 : record_op_start_and_throttle(&shard, metrics::SmgrQueryType::Test, received_at)
956 0 : .await?;
957 0 : BatchedFeMessage::Test {
958 0 : span,
959 0 : shard: shard.downgrade(),
960 0 : requests: vec![BatchedTestRequest { req, timer }],
961 0 : }
962 : }
963 : };
964 0 : Ok(Some(batched_msg))
965 0 : }
966 :
967 : /// Post-condition: `batch` is Some()
968 : #[instrument(skip_all, level = tracing::Level::TRACE)]
969 : #[allow(clippy::boxed_local)]
970 : fn pagestream_do_batch(
971 : max_batch_size: NonZeroUsize,
972 : batch: &mut Result<BatchedFeMessage, QueryError>,
973 : this_msg: Result<BatchedFeMessage, QueryError>,
974 : ) -> Result<(), Result<BatchedFeMessage, QueryError>> {
975 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
976 :
977 : let this_msg = match this_msg {
978 : Ok(this_msg) => this_msg,
979 : Err(e) => return Err(Err(e)),
980 : };
981 :
982 : match (&mut *batch, this_msg) {
983 : // something batched already, let's see if we can add this message to the batch
984 : (
985 : Ok(BatchedFeMessage::GetPage {
986 : span: _,
987 : shard: accum_shard,
988 : pages: ref mut accum_pages,
989 : effective_request_lsn: accum_lsn,
990 : }),
991 : BatchedFeMessage::GetPage {
992 : span: _,
993 : shard: this_shard,
994 : pages: this_pages,
995 : effective_request_lsn: this_lsn,
996 : },
997 0 : ) if (|| {
998 0 : assert_eq!(this_pages.len(), 1);
999 0 : if accum_pages.len() >= max_batch_size.get() {
1000 0 : trace!(%accum_lsn, %this_lsn, %max_batch_size, "stopping batching because of batch size");
1001 0 : assert_eq!(accum_pages.len(), max_batch_size.get());
1002 0 : return false;
1003 0 : }
1004 0 : if !accum_shard.is_same_handle_as(&this_shard) {
1005 0 : trace!(%accum_lsn, %this_lsn, "stopping batching because timeline object mismatch");
1006 : // TODO: we _could_ batch & execute each shard seperately (and in parallel).
1007 : // But the current logic for keeping responses in order does not support that.
1008 0 : return false;
1009 0 : }
1010 0 : // the vectored get currently only supports a single LSN, so, bounce as soon
1011 0 : // as the effective request_lsn changes
1012 0 : if *accum_lsn != this_lsn {
1013 0 : trace!(%accum_lsn, %this_lsn, "stopping batching because LSN changed");
1014 0 : return false;
1015 0 : }
1016 0 : true
1017 : })() =>
1018 : {
1019 : // ok to batch
1020 : accum_pages.extend(this_pages);
1021 : Ok(())
1022 : }
1023 : #[cfg(feature = "testing")]
1024 : (
1025 : Ok(BatchedFeMessage::Test {
1026 : shard: accum_shard,
1027 : requests: accum_requests,
1028 : ..
1029 : }),
1030 : BatchedFeMessage::Test {
1031 : shard: this_shard,
1032 : requests: this_requests,
1033 : ..
1034 : },
1035 0 : ) if (|| {
1036 0 : assert!(this_requests.len() == 1);
1037 0 : if accum_requests.len() >= max_batch_size.get() {
1038 0 : trace!(%max_batch_size, "stopping batching because of batch size");
1039 0 : assert_eq!(accum_requests.len(), max_batch_size.get());
1040 0 : return false;
1041 0 : }
1042 0 : if !accum_shard.is_same_handle_as(&this_shard) {
1043 0 : trace!("stopping batching because timeline object mismatch");
1044 : // TODO: we _could_ batch & execute each shard seperately (and in parallel).
1045 : // But the current logic for keeping responses in order does not support that.
1046 0 : return false;
1047 0 : }
1048 0 : let this_batch_key = this_requests[0].req.batch_key;
1049 0 : let accum_batch_key = accum_requests[0].req.batch_key;
1050 0 : if this_requests[0].req.batch_key != accum_requests[0].req.batch_key {
1051 0 : trace!(%accum_batch_key, %this_batch_key, "stopping batching because batch key changed");
1052 0 : return false;
1053 0 : }
1054 0 : true
1055 : })() =>
1056 : {
1057 : // ok to batch
1058 : accum_requests.extend(this_requests);
1059 : Ok(())
1060 : }
1061 : // something batched already but this message is unbatchable
1062 : (_, this_msg) => {
1063 : // by default, don't continue batching
1064 : Err(Ok(this_msg))
1065 : }
1066 : }
1067 : }
1068 :
1069 0 : #[instrument(level = tracing::Level::DEBUG, skip_all)]
1070 : async fn pagesteam_handle_batched_message<IO>(
1071 : &mut self,
1072 : pgb_writer: &mut PostgresBackend<IO>,
1073 : batch: BatchedFeMessage,
1074 : io_concurrency: IoConcurrency,
1075 : cancel: &CancellationToken,
1076 : protocol_version: PagestreamProtocolVersion,
1077 : ctx: &RequestContext,
1078 : ) -> Result<(), QueryError>
1079 : where
1080 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1081 : {
1082 : let started_at = Instant::now();
1083 : let batch = {
1084 : let mut batch = batch;
1085 : batch.observe_execution_start(started_at);
1086 : batch
1087 : };
1088 :
1089 : // invoke handler function
1090 : let (mut handler_results, span): (
1091 : Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>>,
1092 : _,
1093 : ) = match batch {
1094 : BatchedFeMessage::Exists {
1095 : span,
1096 : timer,
1097 : shard,
1098 : req,
1099 : } => {
1100 : fail::fail_point!("ps::handle-pagerequest-message::exists");
1101 : (
1102 : vec![self
1103 : .handle_get_rel_exists_request(&*shard.upgrade()?, &req, ctx)
1104 : .instrument(span.clone())
1105 : .await
1106 0 : .map(|msg| (msg, timer))
1107 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr })],
1108 : span,
1109 : )
1110 : }
1111 : BatchedFeMessage::Nblocks {
1112 : span,
1113 : timer,
1114 : shard,
1115 : req,
1116 : } => {
1117 : fail::fail_point!("ps::handle-pagerequest-message::nblocks");
1118 : (
1119 : vec![self
1120 : .handle_get_nblocks_request(&*shard.upgrade()?, &req, ctx)
1121 : .instrument(span.clone())
1122 : .await
1123 0 : .map(|msg| (msg, timer))
1124 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr })],
1125 : span,
1126 : )
1127 : }
1128 : BatchedFeMessage::GetPage {
1129 : span,
1130 : shard,
1131 : effective_request_lsn,
1132 : pages,
1133 : } => {
1134 : fail::fail_point!("ps::handle-pagerequest-message::getpage");
1135 : (
1136 : {
1137 : let npages = pages.len();
1138 : trace!(npages, "handling getpage request");
1139 : let res = self
1140 : .handle_get_page_at_lsn_request_batched(
1141 : &*shard.upgrade()?,
1142 : effective_request_lsn,
1143 : pages,
1144 : io_concurrency,
1145 : ctx,
1146 : )
1147 : .instrument(span.clone())
1148 : .await;
1149 : assert_eq!(res.len(), npages);
1150 : res
1151 : },
1152 : span,
1153 : )
1154 : }
1155 : BatchedFeMessage::DbSize {
1156 : span,
1157 : timer,
1158 : shard,
1159 : req,
1160 : } => {
1161 : fail::fail_point!("ps::handle-pagerequest-message::dbsize");
1162 : (
1163 : vec![self
1164 : .handle_db_size_request(&*shard.upgrade()?, &req, ctx)
1165 : .instrument(span.clone())
1166 : .await
1167 0 : .map(|msg| (msg, timer))
1168 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr })],
1169 : span,
1170 : )
1171 : }
1172 : BatchedFeMessage::GetSlruSegment {
1173 : span,
1174 : timer,
1175 : shard,
1176 : req,
1177 : } => {
1178 : fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
1179 : (
1180 : vec![self
1181 : .handle_get_slru_segment_request(&*shard.upgrade()?, &req, ctx)
1182 : .instrument(span.clone())
1183 : .await
1184 0 : .map(|msg| (msg, timer))
1185 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr })],
1186 : span,
1187 : )
1188 : }
1189 : #[cfg(feature = "testing")]
1190 : BatchedFeMessage::Test {
1191 : span,
1192 : shard,
1193 : requests,
1194 : } => {
1195 : fail::fail_point!("ps::handle-pagerequest-message::test");
1196 : (
1197 : {
1198 : let npages = requests.len();
1199 : trace!(npages, "handling getpage request");
1200 : let res = self
1201 : .handle_test_request_batch(&*shard.upgrade()?, requests, ctx)
1202 : .instrument(span.clone())
1203 : .await;
1204 : assert_eq!(res.len(), npages);
1205 : res
1206 : },
1207 : span,
1208 : )
1209 : }
1210 : BatchedFeMessage::RespondError { span, error } => {
1211 : // We've already decided to respond with an error, so we don't need to
1212 : // call the handler.
1213 : (vec![Err(error)], span)
1214 : }
1215 : };
1216 :
1217 : // We purposefully don't count flush time into the smgr operation timer.
1218 : //
1219 : // The reason is that current compute client will not perform protocol processing
1220 : // if the postgres backend process is doing things other than `->smgr_read()`.
1221 : // This is especially the case for prefetch.
1222 : //
1223 : // If the compute doesn't read from the connection, eventually TCP will backpressure
1224 : // all the way into our flush call below.
1225 : //
1226 : // The timer's underlying metric is used for a storage-internal latency SLO and
1227 : // we don't want to include latency in it that we can't control.
1228 : // And as pointed out above, in this case, we don't control the time that flush will take.
1229 : //
1230 : // We put each response in the batch onto the wire in a separate pgb_writer.flush()
1231 : // call, which (all unmeasured) adds syscall overhead but reduces time to first byte
1232 : // and avoids building up a "giant" contiguous userspace buffer to hold the entire response.
1233 : // TODO: vectored socket IO would be great, but pgb_writer doesn't support that.
1234 : let flush_timers = {
1235 : let flushing_start_time = Instant::now();
1236 : let mut flush_timers = Vec::with_capacity(handler_results.len());
1237 : for handler_result in &mut handler_results {
1238 : let flush_timer = match handler_result {
1239 : Ok((_, timer)) => Some(
1240 : timer
1241 : .observe_execution_end(flushing_start_time)
1242 : .expect("we are the first caller"),
1243 : ),
1244 : Err(_) => {
1245 : // TODO: measure errors
1246 : None
1247 : }
1248 : };
1249 : flush_timers.push(flush_timer);
1250 : }
1251 : assert_eq!(flush_timers.len(), handler_results.len());
1252 : flush_timers
1253 : };
1254 :
1255 : // Map handler result to protocol behavior.
1256 : // Some handler errors cause exit from pagestream protocol.
1257 : // Other handler errors are sent back as an error message and we stay in pagestream protocol.
1258 : for (handler_result, flushing_timer) in handler_results.into_iter().zip(flush_timers) {
1259 : let response_msg = match handler_result {
1260 : Err(e) => match &e.err {
1261 : PageStreamError::Shutdown => {
1262 : // If we fail to fulfil a request during shutdown, which may be _because_ of
1263 : // shutdown, then do not send the error to the client. Instead just drop the
1264 : // connection.
1265 0 : span.in_scope(|| info!("dropping connection due to shutdown"));
1266 : return Err(QueryError::Shutdown);
1267 : }
1268 : PageStreamError::Reconnect(reason) => {
1269 0 : span.in_scope(|| info!("handler requested reconnect: {reason}"));
1270 : return Err(QueryError::Reconnect);
1271 : }
1272 : PageStreamError::Read(_)
1273 : | PageStreamError::LsnTimeout(_)
1274 : | PageStreamError::NotFound(_)
1275 : | PageStreamError::BadRequest(_) => {
1276 : // print the all details to the log with {:#}, but for the client the
1277 : // error message is enough. Do not log if shutting down, as the anyhow::Error
1278 : // here includes cancellation which is not an error.
1279 : let full = utils::error::report_compact_sources(&e.err);
1280 0 : span.in_scope(|| {
1281 0 : error!("error reading relation or page version: {full:#}")
1282 0 : });
1283 :
1284 : PagestreamBeMessage::Error(PagestreamErrorResponse {
1285 : req: e.req,
1286 : message: e.err.to_string(),
1287 : })
1288 : }
1289 : },
1290 : Ok((response_msg, _op_timer_already_observed)) => response_msg,
1291 : };
1292 :
1293 : //
1294 : // marshal & transmit response message
1295 : //
1296 :
1297 : pgb_writer.write_message_noflush(&BeMessage::CopyData(
1298 : &response_msg.serialize(protocol_version),
1299 : ))?;
1300 :
1301 : // what we want to do
1302 : let socket_fd = pgb_writer.socket_fd;
1303 : let flush_fut = pgb_writer.flush();
1304 : // metric for how long flushing takes
1305 : let flush_fut = match flushing_timer {
1306 : Some(flushing_timer) => futures::future::Either::Left(flushing_timer.measure(
1307 : Instant::now(),
1308 : flush_fut,
1309 : socket_fd,
1310 : )),
1311 : None => futures::future::Either::Right(flush_fut),
1312 : };
1313 : // do it while respecting cancellation
1314 0 : let _: () = async move {
1315 0 : tokio::select! {
1316 : biased;
1317 0 : _ = cancel.cancelled() => {
1318 : // We were requested to shut down.
1319 0 : info!("shutdown request received in page handler");
1320 0 : return Err(QueryError::Shutdown)
1321 : }
1322 0 : res = flush_fut => {
1323 0 : res?;
1324 : }
1325 : }
1326 0 : Ok(())
1327 0 : }
1328 : .await?;
1329 : }
1330 : Ok(())
1331 : }
1332 :
1333 : /// Pagestream sub-protocol handler.
1334 : ///
1335 : /// It is a simple request-response protocol inside a COPYBOTH session.
1336 : ///
1337 : /// # Coding Discipline
1338 : ///
1339 : /// Coding discipline within this function: all interaction with the `pgb` connection
1340 : /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
1341 : /// This is so that we can shutdown page_service quickly.
1342 : #[instrument(skip_all)]
1343 : async fn handle_pagerequests<IO>(
1344 : &mut self,
1345 : pgb: &mut PostgresBackend<IO>,
1346 : tenant_id: TenantId,
1347 : timeline_id: TimelineId,
1348 : protocol_version: PagestreamProtocolVersion,
1349 : ctx: RequestContext,
1350 : ) -> Result<(), QueryError>
1351 : where
1352 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
1353 : {
1354 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1355 :
1356 : // switch client to COPYBOTH
1357 : pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
1358 : tokio::select! {
1359 : biased;
1360 : _ = self.cancel.cancelled() => {
1361 : return Err(QueryError::Shutdown)
1362 : }
1363 : res = pgb.flush() => {
1364 : res?;
1365 : }
1366 : }
1367 :
1368 : let io_concurrency = IoConcurrency::spawn_from_conf(
1369 : self.conf,
1370 : match self.gate_guard.try_clone() {
1371 : Ok(guard) => guard,
1372 : Err(_) => {
1373 : info!("shutdown request received in page handler");
1374 : return Err(QueryError::Shutdown);
1375 : }
1376 : },
1377 : );
1378 :
1379 : let pgb_reader = pgb
1380 : .split()
1381 : .context("implementation error: split pgb into reader and writer")?;
1382 :
1383 : let timeline_handles = self
1384 : .timeline_handles
1385 : .take()
1386 : .expect("implementation error: timeline_handles should not be locked");
1387 :
1388 : let request_span = info_span!("request");
1389 : let ((pgb_reader, timeline_handles), result) = match self.pipelining_config.clone() {
1390 : PageServicePipeliningConfig::Pipelined(pipelining_config) => {
1391 : self.handle_pagerequests_pipelined(
1392 : pgb,
1393 : pgb_reader,
1394 : tenant_id,
1395 : timeline_id,
1396 : timeline_handles,
1397 : request_span,
1398 : pipelining_config,
1399 : protocol_version,
1400 : io_concurrency,
1401 : &ctx,
1402 : )
1403 : .await
1404 : }
1405 : PageServicePipeliningConfig::Serial => {
1406 : self.handle_pagerequests_serial(
1407 : pgb,
1408 : pgb_reader,
1409 : tenant_id,
1410 : timeline_id,
1411 : timeline_handles,
1412 : request_span,
1413 : protocol_version,
1414 : io_concurrency,
1415 : &ctx,
1416 : )
1417 : .await
1418 : }
1419 : };
1420 :
1421 : debug!("pagestream subprotocol shut down cleanly");
1422 :
1423 : pgb.unsplit(pgb_reader)
1424 : .context("implementation error: unsplit pgb")?;
1425 :
1426 : let replaced = self.timeline_handles.replace(timeline_handles);
1427 : assert!(replaced.is_none());
1428 :
1429 : result
1430 : }
1431 :
1432 : #[allow(clippy::too_many_arguments)]
1433 0 : async fn handle_pagerequests_serial<IO>(
1434 0 : &mut self,
1435 0 : pgb_writer: &mut PostgresBackend<IO>,
1436 0 : mut pgb_reader: PostgresBackendReader<IO>,
1437 0 : tenant_id: TenantId,
1438 0 : timeline_id: TimelineId,
1439 0 : mut timeline_handles: TimelineHandles,
1440 0 : request_span: Span,
1441 0 : protocol_version: PagestreamProtocolVersion,
1442 0 : io_concurrency: IoConcurrency,
1443 0 : ctx: &RequestContext,
1444 0 : ) -> (
1445 0 : (PostgresBackendReader<IO>, TimelineHandles),
1446 0 : Result<(), QueryError>,
1447 0 : )
1448 0 : where
1449 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
1450 0 : {
1451 0 : let cancel = self.cancel.clone();
1452 0 : let err = loop {
1453 0 : let msg = Self::pagestream_read_message(
1454 0 : &mut pgb_reader,
1455 0 : tenant_id,
1456 0 : timeline_id,
1457 0 : &mut timeline_handles,
1458 0 : &cancel,
1459 0 : ctx,
1460 0 : protocol_version,
1461 0 : request_span.clone(),
1462 0 : )
1463 0 : .await;
1464 0 : let msg = match msg {
1465 0 : Ok(msg) => msg,
1466 0 : Err(e) => break e,
1467 : };
1468 0 : let msg = match msg {
1469 0 : Some(msg) => msg,
1470 : None => {
1471 0 : debug!("pagestream subprotocol end observed");
1472 0 : return ((pgb_reader, timeline_handles), Ok(()));
1473 : }
1474 : };
1475 :
1476 0 : let result = warn_slow(
1477 0 : msg.as_static_str(),
1478 0 : WARN_SLOW_GETPAGE_THRESHOLD,
1479 0 : self.pagesteam_handle_batched_message(
1480 0 : pgb_writer,
1481 0 : msg,
1482 0 : io_concurrency.clone(),
1483 0 : &cancel,
1484 0 : protocol_version,
1485 0 : ctx,
1486 0 : ),
1487 0 : )
1488 0 : .await;
1489 0 : match result {
1490 0 : Ok(()) => {}
1491 0 : Err(e) => break e,
1492 : }
1493 : };
1494 0 : ((pgb_reader, timeline_handles), Err(err))
1495 0 : }
1496 :
1497 : /// # Cancel-Safety
1498 : ///
1499 : /// May leak tokio tasks if not polled to completion.
1500 : #[allow(clippy::too_many_arguments)]
1501 0 : async fn handle_pagerequests_pipelined<IO>(
1502 0 : &mut self,
1503 0 : pgb_writer: &mut PostgresBackend<IO>,
1504 0 : pgb_reader: PostgresBackendReader<IO>,
1505 0 : tenant_id: TenantId,
1506 0 : timeline_id: TimelineId,
1507 0 : mut timeline_handles: TimelineHandles,
1508 0 : request_span: Span,
1509 0 : pipelining_config: PageServicePipeliningConfigPipelined,
1510 0 : protocol_version: PagestreamProtocolVersion,
1511 0 : io_concurrency: IoConcurrency,
1512 0 : ctx: &RequestContext,
1513 0 : ) -> (
1514 0 : (PostgresBackendReader<IO>, TimelineHandles),
1515 0 : Result<(), QueryError>,
1516 0 : )
1517 0 : where
1518 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
1519 0 : {
1520 0 : //
1521 0 : // Pipelined pagestream handling consists of
1522 0 : // - a Batcher that reads requests off the wire and
1523 0 : // and batches them if possible,
1524 0 : // - an Executor that processes the batched requests.
1525 0 : //
1526 0 : // The batch is built up inside an `spsc_fold` channel,
1527 0 : // shared betwen Batcher (Sender) and Executor (Receiver).
1528 0 : //
1529 0 : // The Batcher continously folds client requests into the batch,
1530 0 : // while the Executor can at any time take out what's in the batch
1531 0 : // in order to process it.
1532 0 : // This means the next batch builds up while the Executor
1533 0 : // executes the last batch.
1534 0 : //
1535 0 : // CANCELLATION
1536 0 : //
1537 0 : // We run both Batcher and Executor futures to completion before
1538 0 : // returning from this function.
1539 0 : //
1540 0 : // If Executor exits first, it signals cancellation to the Batcher
1541 0 : // via a CancellationToken that is child of `self.cancel`.
1542 0 : // If Batcher exits first, it signals cancellation to the Executor
1543 0 : // by dropping the spsc_fold channel Sender.
1544 0 : //
1545 0 : // CLEAN SHUTDOWN
1546 0 : //
1547 0 : // Clean shutdown means that the client ends the COPYBOTH session.
1548 0 : // In response to such a client message, the Batcher exits.
1549 0 : // The Executor continues to run, draining the spsc_fold channel.
1550 0 : // Once drained, the spsc_fold recv will fail with a distinct error
1551 0 : // indicating that the sender disconnected.
1552 0 : // The Executor exits with Ok(()) in response to that error.
1553 0 : //
1554 0 : // Server initiated shutdown is not clean shutdown, but instead
1555 0 : // is an error Err(QueryError::Shutdown) that is propagated through
1556 0 : // error propagation.
1557 0 : //
1558 0 : // ERROR PROPAGATION
1559 0 : //
1560 0 : // When the Batcher encounter an error, it sends it as a value
1561 0 : // through the spsc_fold channel and exits afterwards.
1562 0 : // When the Executor observes such an error in the channel,
1563 0 : // it exits returning that error value.
1564 0 : //
1565 0 : // This design ensures that the Executor stage will still process
1566 0 : // the batch that was in flight when the Batcher encountered an error,
1567 0 : // thereby beahving identical to a serial implementation.
1568 0 :
1569 0 : let PageServicePipeliningConfigPipelined {
1570 0 : max_batch_size,
1571 0 : execution,
1572 0 : } = pipelining_config;
1573 :
1574 : // Macro to _define_ a pipeline stage.
1575 : macro_rules! pipeline_stage {
1576 : ($name:literal, $cancel:expr, $make_fut:expr) => {{
1577 : let cancel: CancellationToken = $cancel;
1578 : let stage_fut = $make_fut(cancel.clone());
1579 0 : async move {
1580 0 : scopeguard::defer! {
1581 0 : debug!("exiting");
1582 0 : }
1583 0 : timed_after_cancellation(stage_fut, $name, Duration::from_millis(100), &cancel)
1584 0 : .await
1585 0 : }
1586 : .instrument(tracing::info_span!($name))
1587 : }};
1588 : }
1589 :
1590 : //
1591 : // Batcher
1592 : //
1593 :
1594 0 : let cancel_batcher = self.cancel.child_token();
1595 0 : let (mut batch_tx, mut batch_rx) = spsc_fold::channel();
1596 0 : let batcher = pipeline_stage!("batcher", cancel_batcher.clone(), move |cancel_batcher| {
1597 0 : let ctx = ctx.attached_child();
1598 0 : async move {
1599 0 : let mut pgb_reader = pgb_reader;
1600 0 : let mut exit = false;
1601 0 : while !exit {
1602 0 : let read_res = Self::pagestream_read_message(
1603 0 : &mut pgb_reader,
1604 0 : tenant_id,
1605 0 : timeline_id,
1606 0 : &mut timeline_handles,
1607 0 : &cancel_batcher,
1608 0 : &ctx,
1609 0 : protocol_version,
1610 0 : request_span.clone(),
1611 0 : )
1612 0 : .await;
1613 0 : let Some(read_res) = read_res.transpose() else {
1614 0 : debug!("client-initiated shutdown");
1615 0 : break;
1616 : };
1617 0 : exit |= read_res.is_err();
1618 0 : let could_send = batch_tx
1619 0 : .send(read_res, |batch, res| {
1620 0 : Self::pagestream_do_batch(max_batch_size, batch, res)
1621 0 : })
1622 0 : .await;
1623 0 : exit |= could_send.is_err();
1624 : }
1625 0 : (pgb_reader, timeline_handles)
1626 0 : }
1627 0 : });
1628 :
1629 : //
1630 : // Executor
1631 : //
1632 :
1633 0 : let executor = pipeline_stage!("executor", self.cancel.clone(), move |cancel| {
1634 0 : let ctx = ctx.attached_child();
1635 0 : async move {
1636 0 : let _cancel_batcher = cancel_batcher.drop_guard();
1637 : loop {
1638 0 : let maybe_batch = batch_rx.recv().await;
1639 0 : let batch = match maybe_batch {
1640 0 : Ok(batch) => batch,
1641 : Err(spsc_fold::RecvError::SenderGone) => {
1642 0 : debug!("upstream gone");
1643 0 : return Ok(());
1644 : }
1645 : };
1646 0 : let batch = match batch {
1647 0 : Ok(batch) => batch,
1648 0 : Err(e) => {
1649 0 : return Err(e);
1650 : }
1651 : };
1652 0 : warn_slow(
1653 0 : batch.as_static_str(),
1654 0 : WARN_SLOW_GETPAGE_THRESHOLD,
1655 0 : self.pagesteam_handle_batched_message(
1656 0 : pgb_writer,
1657 0 : batch,
1658 0 : io_concurrency.clone(),
1659 0 : &cancel,
1660 0 : protocol_version,
1661 0 : &ctx,
1662 0 : ),
1663 0 : )
1664 0 : .await?;
1665 : }
1666 0 : }
1667 0 : });
1668 :
1669 : //
1670 : // Execute the stages.
1671 : //
1672 :
1673 0 : match execution {
1674 : PageServiceProtocolPipelinedExecutionStrategy::ConcurrentFutures => {
1675 0 : tokio::join!(batcher, executor)
1676 : }
1677 : PageServiceProtocolPipelinedExecutionStrategy::Tasks => {
1678 : // These tasks are not tracked anywhere.
1679 0 : let read_messages_task = tokio::spawn(batcher);
1680 0 : let (read_messages_task_res, executor_res_) =
1681 0 : tokio::join!(read_messages_task, executor,);
1682 0 : (
1683 0 : read_messages_task_res.expect("propagated panic from read_messages"),
1684 0 : executor_res_,
1685 0 : )
1686 : }
1687 : }
1688 0 : }
1689 :
1690 : /// Helper function to handle the LSN from client request.
1691 : ///
1692 : /// Each GetPage (and Exists and Nblocks) request includes information about
1693 : /// which version of the page is being requested. The primary compute node
1694 : /// will always request the latest page version, by setting 'request_lsn' to
1695 : /// the last inserted or flushed WAL position, while a standby will request
1696 : /// a version at the LSN that it's currently caught up to.
1697 : ///
1698 : /// In either case, if the page server hasn't received the WAL up to the
1699 : /// requested LSN yet, we will wait for it to arrive. The return value is
1700 : /// the LSN that should be used to look up the page versions.
1701 : ///
1702 : /// In addition to the request LSN, each request carries another LSN,
1703 : /// 'not_modified_since', which is a hint to the pageserver that the client
1704 : /// knows that the page has not been modified between 'not_modified_since'
1705 : /// and the request LSN. This allows skipping the wait, as long as the WAL
1706 : /// up to 'not_modified_since' has arrived. If the client doesn't have any
1707 : /// information about when the page was modified, it will use
1708 : /// not_modified_since == lsn. If the client lies and sends a too low
1709 : /// not_modified_hint such that there are in fact later page versions, the
1710 : /// behavior is undefined: the pageserver may return any of the page versions
1711 : /// or an error.
1712 0 : async fn wait_or_get_last_lsn(
1713 0 : timeline: &Timeline,
1714 0 : request_lsn: Lsn,
1715 0 : not_modified_since: Lsn,
1716 0 : latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
1717 0 : ctx: &RequestContext,
1718 0 : ) -> Result<Lsn, PageStreamError> {
1719 0 : let last_record_lsn = timeline.get_last_record_lsn();
1720 0 :
1721 0 : // Sanity check the request
1722 0 : if request_lsn < not_modified_since {
1723 0 : return Err(PageStreamError::BadRequest(
1724 0 : format!(
1725 0 : "invalid request with request LSN {} and not_modified_since {}",
1726 0 : request_lsn, not_modified_since,
1727 0 : )
1728 0 : .into(),
1729 0 : ));
1730 0 : }
1731 0 :
1732 0 : // Check explicitly for INVALID just to get a less scary error message if the request is obviously bogus
1733 0 : if request_lsn == Lsn::INVALID {
1734 0 : return Err(PageStreamError::BadRequest(
1735 0 : "invalid LSN(0) in request".into(),
1736 0 : ));
1737 0 : }
1738 0 :
1739 0 : // Clients should only read from recent LSNs on their timeline, or from locations holding an LSN lease.
1740 0 : //
1741 0 : // We may have older data available, but we make a best effort to detect this case and return an error,
1742 0 : // to distinguish a misbehaving client (asking for old LSN) from a storage issue (data missing at a legitimate LSN).
1743 0 : if request_lsn < **latest_gc_cutoff_lsn && !timeline.is_gc_blocked_by_lsn_lease_deadline() {
1744 0 : let gc_info = &timeline.gc_info.read().unwrap();
1745 0 : if !gc_info.lsn_covered_by_lease(request_lsn) {
1746 0 : return Err(
1747 0 : PageStreamError::BadRequest(format!(
1748 0 : "tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
1749 0 : request_lsn, **latest_gc_cutoff_lsn
1750 0 : ).into())
1751 0 : );
1752 0 : }
1753 0 : }
1754 :
1755 : // Wait for WAL up to 'not_modified_since' to arrive, if necessary
1756 0 : if not_modified_since > last_record_lsn {
1757 0 : timeline
1758 0 : .wait_lsn(
1759 0 : not_modified_since,
1760 0 : crate::tenant::timeline::WaitLsnWaiter::PageService,
1761 0 : timeline::WaitLsnTimeout::Default,
1762 0 : ctx,
1763 0 : )
1764 0 : .await?;
1765 : // Since we waited for 'not_modified_since' to arrive, that is now the last
1766 : // record LSN. (Or close enough for our purposes; the last-record LSN can
1767 : // advance immediately after we return anyway)
1768 0 : Ok(not_modified_since)
1769 : } else {
1770 : // It might be better to use max(not_modified_since, latest_gc_cutoff_lsn)
1771 : // here instead. That would give the same result, since we know that there
1772 : // haven't been any modifications since 'not_modified_since'. Using an older
1773 : // LSN might be faster, because that could allow skipping recent layers when
1774 : // finding the page. However, we have historically used 'last_record_lsn', so
1775 : // stick to that for now.
1776 0 : Ok(std::cmp::min(last_record_lsn, request_lsn))
1777 : }
1778 0 : }
1779 :
1780 : /// Handles the lsn lease request.
1781 : /// If a lease cannot be obtained, the client will receive NULL.
1782 : #[instrument(skip_all, fields(shard_id, %lsn))]
1783 : async fn handle_make_lsn_lease<IO>(
1784 : &mut self,
1785 : pgb: &mut PostgresBackend<IO>,
1786 : tenant_shard_id: TenantShardId,
1787 : timeline_id: TimelineId,
1788 : lsn: Lsn,
1789 : ctx: &RequestContext,
1790 : ) -> Result<(), QueryError>
1791 : where
1792 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1793 : {
1794 : let timeline = self
1795 : .timeline_handles
1796 : .as_mut()
1797 : .unwrap()
1798 : .get(
1799 : tenant_shard_id.tenant_id,
1800 : timeline_id,
1801 : ShardSelector::Known(tenant_shard_id.to_index()),
1802 : )
1803 : .await?;
1804 : set_tracing_field_shard_id(&timeline);
1805 :
1806 : let lease = timeline
1807 : .renew_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx)
1808 0 : .inspect_err(|e| {
1809 0 : warn!("{e}");
1810 0 : })
1811 : .ok();
1812 0 : let valid_until_str = lease.map(|l| {
1813 0 : l.valid_until
1814 0 : .duration_since(SystemTime::UNIX_EPOCH)
1815 0 : .expect("valid_until is earlier than UNIX_EPOCH")
1816 0 : .as_millis()
1817 0 : .to_string()
1818 0 : });
1819 :
1820 : info!(
1821 : "acquired lease for {} until {}",
1822 : lsn,
1823 : valid_until_str.as_deref().unwrap_or("<unknown>")
1824 : );
1825 :
1826 0 : let bytes = valid_until_str.as_ref().map(|x| x.as_bytes());
1827 :
1828 : pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col(
1829 : b"valid_until",
1830 : )]))?
1831 : .write_message_noflush(&BeMessage::DataRow(&[bytes]))?;
1832 :
1833 : Ok(())
1834 : }
1835 :
1836 : #[instrument(skip_all, fields(shard_id))]
1837 : async fn handle_get_rel_exists_request(
1838 : &mut self,
1839 : timeline: &Timeline,
1840 : req: &PagestreamExistsRequest,
1841 : ctx: &RequestContext,
1842 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1843 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
1844 : let lsn = Self::wait_or_get_last_lsn(
1845 : timeline,
1846 : req.hdr.request_lsn,
1847 : req.hdr.not_modified_since,
1848 : &latest_gc_cutoff_lsn,
1849 : ctx,
1850 : )
1851 : .await?;
1852 :
1853 : let exists = timeline
1854 : .get_rel_exists(req.rel, Version::Lsn(lsn), ctx)
1855 : .await?;
1856 :
1857 : Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
1858 : req: *req,
1859 : exists,
1860 : }))
1861 : }
1862 :
1863 : #[instrument(skip_all, fields(shard_id))]
1864 : async fn handle_get_nblocks_request(
1865 : &mut self,
1866 : timeline: &Timeline,
1867 : req: &PagestreamNblocksRequest,
1868 : ctx: &RequestContext,
1869 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1870 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
1871 : let lsn = Self::wait_or_get_last_lsn(
1872 : timeline,
1873 : req.hdr.request_lsn,
1874 : req.hdr.not_modified_since,
1875 : &latest_gc_cutoff_lsn,
1876 : ctx,
1877 : )
1878 : .await?;
1879 :
1880 : let n_blocks = timeline
1881 : .get_rel_size(req.rel, Version::Lsn(lsn), ctx)
1882 : .await?;
1883 :
1884 : Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
1885 : req: *req,
1886 : n_blocks,
1887 : }))
1888 : }
1889 :
1890 : #[instrument(skip_all, fields(shard_id))]
1891 : async fn handle_db_size_request(
1892 : &mut self,
1893 : timeline: &Timeline,
1894 : req: &PagestreamDbSizeRequest,
1895 : ctx: &RequestContext,
1896 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1897 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
1898 : let lsn = Self::wait_or_get_last_lsn(
1899 : timeline,
1900 : req.hdr.request_lsn,
1901 : req.hdr.not_modified_since,
1902 : &latest_gc_cutoff_lsn,
1903 : ctx,
1904 : )
1905 : .await?;
1906 :
1907 : let total_blocks = timeline
1908 : .get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, Version::Lsn(lsn), ctx)
1909 : .await?;
1910 : let db_size = total_blocks as i64 * BLCKSZ as i64;
1911 :
1912 : Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse {
1913 : req: *req,
1914 : db_size,
1915 : }))
1916 : }
1917 :
1918 : #[instrument(skip_all)]
1919 : async fn handle_get_page_at_lsn_request_batched(
1920 : &mut self,
1921 : timeline: &Timeline,
1922 : effective_lsn: Lsn,
1923 : requests: smallvec::SmallVec<[BatchedGetPageRequest; 1]>,
1924 : io_concurrency: IoConcurrency,
1925 : ctx: &RequestContext,
1926 : ) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>> {
1927 : debug_assert_current_span_has_tenant_and_timeline_id();
1928 :
1929 : timeline
1930 : .query_metrics
1931 : .observe_getpage_batch_start(requests.len());
1932 :
1933 : // If a page trace is running, submit an event for this request.
1934 : if let Some(page_trace) = timeline.page_trace.load().as_ref() {
1935 : let time = SystemTime::now();
1936 : for batch in &requests {
1937 : let key = rel_block_to_key(batch.req.rel, batch.req.blkno).to_compact();
1938 : // Ignore error (trace buffer may be full or tracer may have disconnected).
1939 : _ = page_trace.try_send(PageTraceEvent {
1940 : key,
1941 : effective_lsn,
1942 : time,
1943 : });
1944 : }
1945 : }
1946 :
1947 : let results = timeline
1948 : .get_rel_page_at_lsn_batched(
1949 0 : requests.iter().map(|p| (&p.req.rel, &p.req.blkno)),
1950 : effective_lsn,
1951 : io_concurrency,
1952 : ctx,
1953 : )
1954 : .await;
1955 : assert_eq!(results.len(), requests.len());
1956 :
1957 : // TODO: avoid creating the new Vec here
1958 : Vec::from_iter(
1959 : requests
1960 : .into_iter()
1961 : .zip(results.into_iter())
1962 0 : .map(|(req, res)| {
1963 0 : res.map(|page| {
1964 0 : (
1965 0 : PagestreamBeMessage::GetPage(models::PagestreamGetPageResponse {
1966 0 : req: req.req,
1967 0 : page,
1968 0 : }),
1969 0 : req.timer,
1970 0 : )
1971 0 : })
1972 0 : .map_err(|e| BatchedPageStreamError {
1973 0 : err: PageStreamError::from(e),
1974 0 : req: req.req.hdr,
1975 0 : })
1976 0 : }),
1977 : )
1978 : }
1979 :
1980 : #[instrument(skip_all, fields(shard_id))]
1981 : async fn handle_get_slru_segment_request(
1982 : &mut self,
1983 : timeline: &Timeline,
1984 : req: &PagestreamGetSlruSegmentRequest,
1985 : ctx: &RequestContext,
1986 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1987 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
1988 : let lsn = Self::wait_or_get_last_lsn(
1989 : timeline,
1990 : req.hdr.request_lsn,
1991 : req.hdr.not_modified_since,
1992 : &latest_gc_cutoff_lsn,
1993 : ctx,
1994 : )
1995 : .await?;
1996 :
1997 : let kind = SlruKind::from_repr(req.kind)
1998 : .ok_or(PageStreamError::BadRequest("invalid SLRU kind".into()))?;
1999 : let segment = timeline.get_slru_segment(kind, req.segno, lsn, ctx).await?;
2000 :
2001 : Ok(PagestreamBeMessage::GetSlruSegment(
2002 : PagestreamGetSlruSegmentResponse { req: *req, segment },
2003 : ))
2004 : }
2005 :
/// Produces responses for a batch of test requests (only built with the
/// `testing` feature).
///
/// For each request the task yields once and then records whether the timeline
/// has been cancelled at that point: a cancelled timeline yields
/// `PageReconstructError::Cancelled` for that request, otherwise the request is
/// echoed back in a `PagestreamTestResponse`.
// NB: this impl mimics what we do for batched getpage requests.
#[cfg(feature = "testing")]
#[instrument(skip_all, fields(shard_id))]
async fn handle_test_request_batch(
    &mut self,
    timeline: &Timeline,
    requests: Vec<BatchedTestRequest>,
    _ctx: &RequestContext,
) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>> {
    // real requests would do something with the timeline
    let mut outcomes = Vec::with_capacity(requests.len());
    for _ in 0..requests.len() {
        tokio::task::yield_now().await;

        let outcome = if timeline.cancel.is_cancelled() {
            Err(PageReconstructError::Cancelled)
        } else {
            Ok(())
        };
        outcomes.push(outcome);
    }

    // TODO: avoid creating the new Vec here
    requests
        .into_iter()
        .zip(outcomes)
        .map(|(request, outcome)| match outcome {
            Ok(()) => Ok((
                PagestreamBeMessage::Test(models::PagestreamTestResponse {
                    req: request.req.clone(),
                }),
                request.timer,
            )),
            Err(e) => Err(BatchedPageStreamError {
                err: PageStreamError::from(e),
                req: request.req.hdr,
            }),
        })
        .collect()
}
2050 :
/// Streams a basebackup tarball for the given timeline to the client in COPY OUT mode.
///
/// When `lsn` is `Some`, the function first waits for that LSN to arrive and checks it
/// against the applied GC cutoff. With `full_backup` the tarball is written straight to
/// the copy-out writer; otherwise it goes through a `BufWriter` and is gzip-compressed
/// when `gzip` is set. On success, the time spent waiting for the LSN and the time spent
/// on the rest of the request are logged separately.
///
2051 : /// Note on "fullbackup":
2052 : /// Full basebackups should only be used for debugging purposes.
2053 : /// Originally, it was introduced to enable breaking storage format changes,
2054 : /// but that is not applicable anymore.
2055 : ///
2056 : /// # Coding Discipline
2057 : ///
2058 : /// Coding discipline within this function: all interaction with the `pgb` connection
2059 : /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
2060 : /// This is so that we can shut down page_service quickly.
2061 : ///
2062 : /// TODO: wrap the pgb that we pass to the basebackup handler so that it's sensitive
2063 : /// to connection cancellation.
2064 : #[allow(clippy::too_many_arguments)]
2065 : #[instrument(skip_all, fields(shard_id, ?lsn, ?prev_lsn, %full_backup))]
2066 : async fn handle_basebackup_request<IO>(
2067 : &mut self,
2068 : pgb: &mut PostgresBackend<IO>,
2069 : tenant_id: TenantId,
2070 : timeline_id: TimelineId,
2071 : lsn: Option<Lsn>,
2072 : prev_lsn: Option<Lsn>,
2073 : full_backup: bool,
2074 : gzip: bool,
2075 : replica: bool,
2076 : ctx: &RequestContext,
2077 : ) -> Result<(), QueryError>
2078 : where
2079 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
2080 : {
// Client-side failures are reported as a disconnect; server-side failures as `Other`.
2081 0 : fn map_basebackup_error(err: BasebackupError) -> QueryError {
2082 0 : match err {
2083 : // TODO: passthrough the error site to the final error message?
2084 0 : BasebackupError::Client(e, _) => QueryError::Disconnected(ConnectionError::Io(e)),
2085 0 : BasebackupError::Server(e) => QueryError::Other(e),
2086 : }
2087 0 : }
2088 :
// Reference point for both durations logged at the end of the function.
2089 : let started = std::time::Instant::now();
2090 :
// Basebackups are always served from the timeline handle selected by `ShardSelector::Zero`.
2091 : let timeline = self
2092 : .timeline_handles
2093 : .as_mut()
2094 : .unwrap()
2095 : .get(tenant_id, timeline_id, ShardSelector::Zero)
2096 : .await?;
2097 : set_tracing_field_shard_id(&timeline);
2098 :
2099 : if timeline.is_archived() == Some(true) {
2100 : // TODO after a grace period, turn this log line into a hard error
2101 : tracing::warn!("timeline {tenant_id}/{timeline_id} is archived, but got basebackup request for it.");
2102 : //return Err(QueryError::NotFound("timeline is archived".into()))
2103 : }
2104 :
// The GC cutoff is read before waiting for the LSN and used for the scope check after the wait.
2105 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
2106 : if let Some(lsn) = lsn {
2107 : // Backup was requested at a particular LSN. Wait for it to arrive.
2108 : info!("waiting for {}", lsn);
2109 : timeline
2110 : .wait_lsn(
2111 : lsn,
2112 : crate::tenant::timeline::WaitLsnWaiter::PageService,
2113 : crate::tenant::timeline::WaitLsnTimeout::Default,
2114 : ctx,
2115 : )
2116 : .await?;
2117 : timeline
2118 : .check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
2119 : .context("invalid basebackup lsn")?;
2120 : }
2121 :
// Everything up to here (handle lookup plus any LSN wait) is reported as `lsn_await_millis`.
2122 : let lsn_awaited_after = started.elapsed();
2123 :
2124 : // switch client to COPYOUT
2125 : pgb.write_message_noflush(&BeMessage::CopyOutResponse)
2126 : .map_err(QueryError::Disconnected)?;
// NOTE(review): this flush is cancelled via `self.cancel`, whereas the flush after
// CopyDone below uses `timeline.cancel` — confirm the difference is intentional.
2127 : self.flush_cancellable(pgb, &self.cancel).await?;
2128 :
2129 : // Send a tarball of the latest layer on the timeline. Compress if not
2130 : // fullbackup. TODO Compress in that case too (tests need to be updated)
2131 : if full_backup {
2132 : let mut writer = pgb.copyout_writer();
2133 : basebackup::send_basebackup_tarball(
2134 : &mut writer,
2135 : &timeline,
2136 : lsn,
2137 : prev_lsn,
2138 : full_backup,
2139 : replica,
2140 : ctx,
2141 : )
2142 : .await
2143 : .map_err(map_basebackup_error)?;
2144 : } else {
2145 : let mut writer = BufWriter::new(pgb.copyout_writer());
2146 : if gzip {
2147 : let mut encoder = GzipEncoder::with_quality(
2148 : &mut writer,
2149 : // NOTE using fast compression because it's on the critical path
2150 : // for compute startup. For an empty database, we get
2151 : // <100KB with this method. The Level::Best compression method
2152 : // gives us <20KB, but maybe we should add basebackup caching
2153 : // on compute shutdown first.
2154 : async_compression::Level::Fastest,
2155 : );
2156 : basebackup::send_basebackup_tarball(
2157 : &mut encoder,
2158 : &timeline,
2159 : lsn,
2160 : prev_lsn,
2161 : full_backup,
2162 : replica,
2163 : ctx,
2164 : )
2165 : .await
2166 : .map_err(map_basebackup_error)?;
2167 : // shutdown the encoder to ensure the gzip footer is written
2168 : encoder
2169 : .shutdown()
2170 : .await
2171 0 : .map_err(|e| QueryError::Disconnected(ConnectionError::Io(e)))?;
2172 : } else {
2173 : basebackup::send_basebackup_tarball(
2174 : &mut writer,
2175 : &timeline,
2176 : lsn,
2177 : prev_lsn,
2178 : full_backup,
2179 : replica,
2180 : ctx,
2181 : )
2182 : .await
2183 : .map_err(map_basebackup_error)?;
2184 : }
// Flush the BufWriter explicitly so a write failure is reported (as a client error)
// before CopyDone is queued.
2185 0 : writer.flush().await.map_err(|e| {
2186 0 : map_basebackup_error(BasebackupError::Client(
2187 0 : e,
2188 0 : "handle_basebackup_request,flush",
2189 0 : ))
2190 0 : })?;
2191 : }
2192 :
2193 : pgb.write_message_noflush(&BeMessage::CopyDone)
2194 : .map_err(QueryError::Disconnected)?;
2195 : self.flush_cancellable(pgb, &timeline.cancel).await?;
2196 :
// Time spent after the LSN wait; falls back to zero if the subtraction would underflow.
2197 : let basebackup_after = started
2198 : .elapsed()
2199 : .checked_sub(lsn_awaited_after)
2200 : .unwrap_or(Duration::ZERO);
2201 :
2202 : info!(
2203 : lsn_await_millis = lsn_awaited_after.as_millis(),
2204 : basebackup_millis = basebackup_after.as_millis(),
2205 : "basebackup complete"
2206 : );
2207 :
2208 : Ok(())
2209 : }
2210 :
2211 : // when accessing management api supply None as an argument
2212 : // when using to authorize tenant pass corresponding tenant id
2213 0 : fn check_permission(&self, tenant_id: Option<TenantId>) -> Result<(), QueryError> {
2214 0 : if self.auth.is_none() {
2215 : // auth is set to Trust, nothing to check so just return ok
2216 0 : return Ok(());
2217 0 : }
2218 0 : // auth is some, just checked above, when auth is some
2219 0 : // then claims are always present because of checks during connection init
2220 0 : // so this expect won't trigger
2221 0 : let claims = self
2222 0 : .claims
2223 0 : .as_ref()
2224 0 : .expect("claims presence already checked");
2225 0 : check_permission(claims, tenant_id).map_err(|e| QueryError::Unauthorized(e.0))
2226 0 : }
2227 : }
2228 :
2229 : /// `basebackup tenant timeline [lsn] [--gzip] [--replica]`
2230 : #[derive(Debug, Clone, Eq, PartialEq)]
2231 : struct BaseBackupCmd {
2232 : tenant_id: TenantId,
2233 : timeline_id: TimelineId,
2234 : lsn: Option<Lsn>,
2235 : gzip: bool,
2236 : replica: bool,
2237 : }
2238 :
2239 : /// `fullbackup tenant timeline [lsn] [prev_lsn]`
2240 : #[derive(Debug, Clone, Eq, PartialEq)]
2241 : struct FullBackupCmd {
2242 : tenant_id: TenantId,
2243 : timeline_id: TimelineId,
2244 : lsn: Option<Lsn>,
2245 : prev_lsn: Option<Lsn>,
2246 : }
2247 :
2248 : /// `pagestream_v2 tenant timeline`
2249 : #[derive(Debug, Clone, Eq, PartialEq)]
2250 : struct PageStreamCmd {
2251 : tenant_id: TenantId,
2252 : timeline_id: TimelineId,
2253 : protocol_version: PagestreamProtocolVersion,
2254 : }
2255 :
2256 : /// `lease lsn tenant timeline lsn`
2257 : #[derive(Debug, Clone, Eq, PartialEq)]
2258 : struct LeaseLsnCmd {
2259 : tenant_shard_id: TenantShardId,
2260 : timeline_id: TimelineId,
2261 : lsn: Lsn,
2262 : }
2263 :
2264 : #[derive(Debug, Clone, Eq, PartialEq)]
2265 : enum PageServiceCmd {
2266 : Set,
2267 : PageStream(PageStreamCmd),
2268 : BaseBackup(BaseBackupCmd),
2269 : FullBackup(FullBackupCmd),
2270 : LeaseLsn(LeaseLsnCmd),
2271 : }
2272 :
2273 : impl PageStreamCmd {
2274 12 : fn parse(query: &str, protocol_version: PagestreamProtocolVersion) -> anyhow::Result<Self> {
2275 12 : let parameters = query.split_whitespace().collect_vec();
2276 12 : if parameters.len() != 2 {
2277 4 : bail!(
2278 4 : "invalid number of parameters for pagestream command: {}",
2279 4 : query
2280 4 : );
2281 8 : }
2282 8 : let tenant_id = TenantId::from_str(parameters[0])
2283 8 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2284 4 : let timeline_id = TimelineId::from_str(parameters[1])
2285 4 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2286 4 : Ok(Self {
2287 4 : tenant_id,
2288 4 : timeline_id,
2289 4 : protocol_version,
2290 4 : })
2291 12 : }
2292 : }
2293 :
2294 : impl FullBackupCmd {
2295 8 : fn parse(query: &str) -> anyhow::Result<Self> {
2296 8 : let parameters = query.split_whitespace().collect_vec();
2297 8 : if parameters.len() < 2 || parameters.len() > 4 {
2298 0 : bail!(
2299 0 : "invalid number of parameters for basebackup command: {}",
2300 0 : query
2301 0 : );
2302 8 : }
2303 8 : let tenant_id = TenantId::from_str(parameters[0])
2304 8 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2305 8 : let timeline_id = TimelineId::from_str(parameters[1])
2306 8 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2307 : // The caller is responsible for providing correct lsn and prev_lsn.
2308 8 : let lsn = if let Some(lsn_str) = parameters.get(2) {
2309 : Some(
2310 4 : Lsn::from_str(lsn_str)
2311 4 : .with_context(|| format!("Failed to parse Lsn from {lsn_str}"))?,
2312 : )
2313 : } else {
2314 4 : None
2315 : };
2316 8 : let prev_lsn = if let Some(prev_lsn_str) = parameters.get(3) {
2317 : Some(
2318 4 : Lsn::from_str(prev_lsn_str)
2319 4 : .with_context(|| format!("Failed to parse Lsn from {prev_lsn_str}"))?,
2320 : )
2321 : } else {
2322 4 : None
2323 : };
2324 8 : Ok(Self {
2325 8 : tenant_id,
2326 8 : timeline_id,
2327 8 : lsn,
2328 8 : prev_lsn,
2329 8 : })
2330 8 : }
2331 : }
2332 :
2333 : impl BaseBackupCmd {
2334 36 : fn parse(query: &str) -> anyhow::Result<Self> {
2335 36 : let parameters = query.split_whitespace().collect_vec();
2336 36 : if parameters.len() < 2 {
2337 0 : bail!(
2338 0 : "invalid number of parameters for basebackup command: {}",
2339 0 : query
2340 0 : );
2341 36 : }
2342 36 : let tenant_id = TenantId::from_str(parameters[0])
2343 36 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2344 36 : let timeline_id = TimelineId::from_str(parameters[1])
2345 36 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2346 : let lsn;
2347 : let flags_parse_from;
2348 36 : if let Some(maybe_lsn) = parameters.get(2) {
2349 32 : if *maybe_lsn == "latest" {
2350 4 : lsn = None;
2351 4 : flags_parse_from = 3;
2352 28 : } else if maybe_lsn.starts_with("--") {
2353 20 : lsn = None;
2354 20 : flags_parse_from = 2;
2355 20 : } else {
2356 : lsn = Some(
2357 8 : Lsn::from_str(maybe_lsn)
2358 8 : .with_context(|| format!("Failed to parse lsn from {maybe_lsn}"))?,
2359 : );
2360 8 : flags_parse_from = 3;
2361 : }
2362 4 : } else {
2363 4 : lsn = None;
2364 4 : flags_parse_from = 2;
2365 4 : }
2366 :
2367 36 : let mut gzip = false;
2368 36 : let mut replica = false;
2369 :
2370 44 : for ¶m in ¶meters[flags_parse_from..] {
2371 44 : match param {
2372 44 : "--gzip" => {
2373 28 : if gzip {
2374 4 : bail!("duplicate parameter for basebackup command: {param}")
2375 24 : }
2376 24 : gzip = true
2377 : }
2378 16 : "--replica" => {
2379 8 : if replica {
2380 0 : bail!("duplicate parameter for basebackup command: {param}")
2381 8 : }
2382 8 : replica = true
2383 : }
2384 8 : _ => bail!("invalid parameter for basebackup command: {param}"),
2385 : }
2386 : }
2387 24 : Ok(Self {
2388 24 : tenant_id,
2389 24 : timeline_id,
2390 24 : lsn,
2391 24 : gzip,
2392 24 : replica,
2393 24 : })
2394 36 : }
2395 : }
2396 :
2397 : impl LeaseLsnCmd {
2398 8 : fn parse(query: &str) -> anyhow::Result<Self> {
2399 8 : let parameters = query.split_whitespace().collect_vec();
2400 8 : if parameters.len() != 3 {
2401 0 : bail!(
2402 0 : "invalid number of parameters for lease lsn command: {}",
2403 0 : query
2404 0 : );
2405 8 : }
2406 8 : let tenant_shard_id = TenantShardId::from_str(parameters[0])
2407 8 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2408 8 : let timeline_id = TimelineId::from_str(parameters[1])
2409 8 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2410 8 : let lsn = Lsn::from_str(parameters[2])
2411 8 : .with_context(|| format!("Failed to parse lsn from {}", parameters[2]))?;
2412 8 : Ok(Self {
2413 8 : tenant_shard_id,
2414 8 : timeline_id,
2415 8 : lsn,
2416 8 : })
2417 8 : }
2418 : }
2419 :
2420 : impl PageServiceCmd {
2421 84 : fn parse(query: &str) -> anyhow::Result<Self> {
2422 84 : let query = query.trim();
2423 84 : let Some((cmd, other)) = query.split_once(' ') else {
2424 8 : bail!("cannot parse query: {query}")
2425 : };
2426 76 : match cmd.to_ascii_lowercase().as_str() {
2427 76 : "pagestream_v2" => Ok(Self::PageStream(PageStreamCmd::parse(
2428 12 : other,
2429 12 : PagestreamProtocolVersion::V2,
2430 12 : )?)),
2431 64 : "pagestream_v3" => Ok(Self::PageStream(PageStreamCmd::parse(
2432 0 : other,
2433 0 : PagestreamProtocolVersion::V3,
2434 0 : )?)),
2435 64 : "basebackup" => Ok(Self::BaseBackup(BaseBackupCmd::parse(other)?)),
2436 28 : "fullbackup" => Ok(Self::FullBackup(FullBackupCmd::parse(other)?)),
2437 20 : "lease" => {
2438 12 : let Some((cmd2, other)) = other.split_once(' ') else {
2439 0 : bail!("invalid lease command: {cmd}");
2440 : };
2441 12 : let cmd2 = cmd2.to_ascii_lowercase();
2442 12 : if cmd2 == "lsn" {
2443 8 : Ok(Self::LeaseLsn(LeaseLsnCmd::parse(other)?))
2444 : } else {
2445 4 : bail!("invalid lease command: {cmd}");
2446 : }
2447 : }
2448 8 : "set" => Ok(Self::Set),
2449 0 : _ => Err(anyhow::anyhow!("unsupported command {cmd} in {query}")),
2450 : }
2451 84 : }
2452 : }
2453 :
2454 : impl<IO> postgres_backend::Handler<IO> for PageServerHandler
2455 : where
2456 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
2457 : {
2458 0 : fn check_auth_jwt(
2459 0 : &mut self,
2460 0 : _pgb: &mut PostgresBackend<IO>,
2461 0 : jwt_response: &[u8],
2462 0 : ) -> Result<(), QueryError> {
2463 : // this unwrap is never triggered, because check_auth_jwt only called when auth_type is NeonJWT
2464 : // which requires auth to be present
2465 0 : let data = self
2466 0 : .auth
2467 0 : .as_ref()
2468 0 : .unwrap()
2469 0 : .decode(str::from_utf8(jwt_response).context("jwt response is not UTF-8")?)
2470 0 : .map_err(|e| QueryError::Unauthorized(e.0))?;
2471 :
2472 0 : if matches!(data.claims.scope, Scope::Tenant) && data.claims.tenant_id.is_none() {
2473 0 : return Err(QueryError::Unauthorized(
2474 0 : "jwt token scope is Tenant, but tenant id is missing".into(),
2475 0 : ));
2476 0 : }
2477 0 :
2478 0 : debug!(
2479 0 : "jwt scope check succeeded for scope: {:#?} by tenant id: {:?}",
2480 : data.claims.scope, data.claims.tenant_id,
2481 : );
2482 :
2483 0 : self.claims = Some(data.claims);
2484 0 : Ok(())
2485 0 : }
2486 :
2487 0 : fn startup(
2488 0 : &mut self,
2489 0 : _pgb: &mut PostgresBackend<IO>,
2490 0 : sm: &FeStartupPacket,
2491 0 : ) -> Result<(), QueryError> {
2492 0 : fail::fail_point!("ps::connection-start::startup-packet");
2493 :
2494 0 : if let FeStartupPacket::StartupMessage { params, .. } = sm {
2495 0 : if let Some(app_name) = params.get("application_name") {
2496 0 : Span::current().record("application_name", field::display(app_name));
2497 0 : }
2498 0 : };
2499 :
2500 0 : Ok(())
2501 0 : }
2502 :
/// Parses `query_string` into a [`PageServiceCmd`] and dispatches it to the matching handler.
///
/// Every command that names a tenant records `tenant_id` / `timeline_id` on the current
/// span, checks permissions for that tenant, and increments the per-command counter before
/// the handler runs. `set` is acknowledged without doing anything else.
///
/// # Errors
///
/// Returns an error when the query cannot be parsed, when the permission check fails, or
/// when a handler fails. A failing `lease lsn` handler is the exception: its error is sent
/// to the client as an `ErrorResponse` and this function still returns `Ok(())`.
2503 : #[instrument(skip_all, fields(tenant_id, timeline_id))]
2504 : async fn process_query(
2505 : &mut self,
2506 : pgb: &mut PostgresBackend<IO>,
2507 : query_string: &str,
2508 : ) -> Result<(), QueryError> {
// When this failpoint is configured, the query fails with a simulated connection error.
2509 0 : fail::fail_point!("simulated-bad-compute-connection", |_| {
2510 0 : info!("Hit failpoint for bad connection");
2511 0 : Err(QueryError::SimulatedConnectionError)
2512 0 : });
2513 :
2514 : fail::fail_point!("ps::connection-start::process-query");
2515 :
2516 : let ctx = self.connection_ctx.attached_child();
2517 : debug!("process query {query_string}");
2518 : let query = PageServiceCmd::parse(query_string)?;
2519 : match query {
2520 : PageServiceCmd::PageStream(PageStreamCmd {
2521 : tenant_id,
2522 : timeline_id,
2523 : protocol_version,
2524 : }) => {
2525 : tracing::Span::current()
2526 : .record("tenant_id", field::display(tenant_id))
2527 : .record("timeline_id", field::display(timeline_id));
2528 :
2529 : self.check_permission(Some(tenant_id))?;
// Each protocol version is counted under its own command kind.
2530 : let command_kind = match protocol_version {
2531 : PagestreamProtocolVersion::V2 => ComputeCommandKind::PageStreamV2,
2532 : PagestreamProtocolVersion::V3 => ComputeCommandKind::PageStreamV3,
2533 : };
2534 : COMPUTE_COMMANDS_COUNTERS.for_command(command_kind).inc();
2535 :
2536 : self.handle_pagerequests(pgb, tenant_id, timeline_id, protocol_version, ctx)
2537 : .await?;
2538 : }
2539 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2540 : tenant_id,
2541 : timeline_id,
2542 : lsn,
2543 : gzip,
2544 : replica,
2545 : }) => {
2546 : tracing::Span::current()
2547 : .record("tenant_id", field::display(tenant_id))
2548 : .record("timeline_id", field::display(timeline_id));
2549 :
2550 : self.check_permission(Some(tenant_id))?;
2551 :
2552 : COMPUTE_COMMANDS_COUNTERS
2553 : .for_command(ComputeCommandKind::Basebackup)
2554 : .inc();
2555 : let metric_recording = metrics::BASEBACKUP_QUERY_TIME.start_recording();
// Run the request in an inner async block so that its outcome, success or error,
// can be handed to `metric_recording` before `?` propagates a failure.
2556 0 : let res = async {
2557 0 : self.handle_basebackup_request(
2558 0 : pgb,
2559 0 : tenant_id,
2560 0 : timeline_id,
2561 0 : lsn,
2562 0 : None,
2563 0 : false,
2564 0 : gzip,
2565 0 : replica,
2566 0 : &ctx,
2567 0 : )
2568 0 : .await?;
2569 0 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
2570 0 : Result::<(), QueryError>::Ok(())
2571 0 : }
2572 : .await;
2573 : metric_recording.observe(&res);
2574 : res?;
2575 : }
2576 : // same as basebackup, but result includes relational data as well
2577 : PageServiceCmd::FullBackup(FullBackupCmd {
2578 : tenant_id,
2579 : timeline_id,
2580 : lsn,
2581 : prev_lsn,
2582 : }) => {
2583 : tracing::Span::current()
2584 : .record("tenant_id", field::display(tenant_id))
2585 : .record("timeline_id", field::display(timeline_id));
2586 :
2587 : self.check_permission(Some(tenant_id))?;
2588 :
2589 : COMPUTE_COMMANDS_COUNTERS
2590 : .for_command(ComputeCommandKind::Fullbackup)
2591 : .inc();
2592 :
2593 : // Serve the full backup: prev_lsn is passed through, full_backup = true, gzip and replica are off.
2594 : self.handle_basebackup_request(
2595 : pgb,
2596 : tenant_id,
2597 : timeline_id,
2598 : lsn,
2599 : prev_lsn,
2600 : true,
2601 : false,
2602 : false,
2603 : &ctx,
2604 : )
2605 : .await?;
2606 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
2607 : }
2608 : PageServiceCmd::Set => {
2609 : // important because psycopg2 executes "SET datestyle TO 'ISO'"
2610 : // on connect
2611 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
2612 : }
2613 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
2614 : tenant_shard_id,
2615 : timeline_id,
2616 : lsn,
2617 : }) => {
// NOTE: unlike the other arms, the `tenant_id` span field is recorded from the full
// TenantShardId here, while the permission check below uses only its `tenant_id`.
2618 : tracing::Span::current()
2619 : .record("tenant_id", field::display(tenant_shard_id))
2620 : .record("timeline_id", field::display(timeline_id));
2621 :
2622 : self.check_permission(Some(tenant_shard_id.tenant_id))?;
2623 :
2624 : COMPUTE_COMMANDS_COUNTERS
2625 : .for_command(ComputeCommandKind::LeaseLsn)
2626 : .inc();
2627 :
2628 : match self
2629 : .handle_make_lsn_lease(pgb, tenant_shard_id, timeline_id, lsn, &ctx)
2630 : .await
2631 : {
2632 : Ok(()) => {
2633 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?
2634 : }
// A handler failure is logged and sent to the client as an ErrorResponse instead of
// being propagated, so the function still reaches the final `Ok(())`.
2635 : Err(e) => {
2636 : error!("error obtaining lsn lease for {lsn}: {e:?}");
2637 : pgb.write_message_noflush(&BeMessage::ErrorResponse(
2638 : &e.to_string(),
2639 : Some(e.pg_error_code()),
2640 : ))?
2641 : }
2642 : };
2643 : }
2644 : }
2645 :
2646 : Ok(())
2647 : }
2648 : }
2649 :
2650 : impl From<GetActiveTenantError> for QueryError {
2651 0 : fn from(e: GetActiveTenantError) -> Self {
2652 0 : match e {
2653 0 : GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected(
2654 0 : ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
2655 0 : ),
2656 : GetActiveTenantError::Cancelled
2657 : | GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => {
2658 0 : QueryError::Shutdown
2659 : }
2660 0 : e @ GetActiveTenantError::NotFound(_) => QueryError::NotFound(format!("{e}").into()),
2661 0 : e => QueryError::Other(anyhow::anyhow!(e)),
2662 : }
2663 0 : }
2664 : }
2665 :
2666 : #[derive(Debug, thiserror::Error)]
2667 : pub(crate) enum GetActiveTimelineError {
2668 : #[error(transparent)]
2669 : Tenant(GetActiveTenantError),
2670 : #[error(transparent)]
2671 : Timeline(#[from] GetTimelineError),
2672 : }
2673 :
2674 : impl From<GetActiveTimelineError> for QueryError {
2675 0 : fn from(e: GetActiveTimelineError) -> Self {
2676 0 : match e {
2677 0 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => QueryError::Shutdown,
2678 0 : GetActiveTimelineError::Tenant(e) => e.into(),
2679 0 : GetActiveTimelineError::Timeline(e) => QueryError::NotFound(format!("{e}").into()),
2680 : }
2681 0 : }
2682 : }
2683 :
2684 : impl From<crate::tenant::timeline::handle::HandleUpgradeError> for QueryError {
2685 0 : fn from(e: crate::tenant::timeline::handle::HandleUpgradeError) -> Self {
2686 0 : match e {
2687 0 : crate::tenant::timeline::handle::HandleUpgradeError::ShutDown => QueryError::Shutdown,
2688 0 : }
2689 0 : }
2690 : }
2691 :
2692 0 : fn set_tracing_field_shard_id(timeline: &Timeline) {
2693 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
2694 0 : tracing::Span::current().record(
2695 0 : "shard_id",
2696 0 : tracing::field::display(timeline.tenant_shard_id.shard_slug()),
2697 0 : );
2698 0 : debug_assert_current_span_has_tenant_and_timeline_id();
2699 0 : }
2700 :
2701 : struct WaitedForLsn(Lsn);
2702 : impl From<WaitedForLsn> for Lsn {
2703 0 : fn from(WaitedForLsn(lsn): WaitedForLsn) -> Self {
2704 0 : lsn
2705 0 : }
2706 : }
2707 :
2708 : #[cfg(test)]
2709 : mod tests {
2710 : use utils::shard::ShardCount;
2711 :
2712 : use super::*;
2713 :
2714 : #[test]
2715 4 : fn pageservice_cmd_parse() {
2716 4 : let tenant_id = TenantId::generate();
2717 4 : let timeline_id = TimelineId::generate();
2718 4 : let cmd =
2719 4 : PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id} {timeline_id}")).unwrap();
2720 4 : assert_eq!(
2721 4 : cmd,
2722 4 : PageServiceCmd::PageStream(PageStreamCmd {
2723 4 : tenant_id,
2724 4 : timeline_id,
2725 4 : protocol_version: PagestreamProtocolVersion::V2,
2726 4 : })
2727 4 : );
2728 4 : let cmd = PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id}")).unwrap();
2729 4 : assert_eq!(
2730 4 : cmd,
2731 4 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2732 4 : tenant_id,
2733 4 : timeline_id,
2734 4 : lsn: None,
2735 4 : gzip: false,
2736 4 : replica: false
2737 4 : })
2738 4 : );
2739 4 : let cmd =
2740 4 : PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} --gzip")).unwrap();
2741 4 : assert_eq!(
2742 4 : cmd,
2743 4 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2744 4 : tenant_id,
2745 4 : timeline_id,
2746 4 : lsn: None,
2747 4 : gzip: true,
2748 4 : replica: false
2749 4 : })
2750 4 : );
2751 4 : let cmd =
2752 4 : PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} latest")).unwrap();
2753 4 : assert_eq!(
2754 4 : cmd,
2755 4 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2756 4 : tenant_id,
2757 4 : timeline_id,
2758 4 : lsn: None,
2759 4 : gzip: false,
2760 4 : replica: false
2761 4 : })
2762 4 : );
2763 4 : let cmd = PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} 0/16ABCDE"))
2764 4 : .unwrap();
2765 4 : assert_eq!(
2766 4 : cmd,
2767 4 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2768 4 : tenant_id,
2769 4 : timeline_id,
2770 4 : lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
2771 4 : gzip: false,
2772 4 : replica: false
2773 4 : })
2774 4 : );
2775 4 : let cmd = PageServiceCmd::parse(&format!(
2776 4 : "basebackup {tenant_id} {timeline_id} --replica --gzip"
2777 4 : ))
2778 4 : .unwrap();
2779 4 : assert_eq!(
2780 4 : cmd,
2781 4 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2782 4 : tenant_id,
2783 4 : timeline_id,
2784 4 : lsn: None,
2785 4 : gzip: true,
2786 4 : replica: true
2787 4 : })
2788 4 : );
2789 4 : let cmd = PageServiceCmd::parse(&format!(
2790 4 : "basebackup {tenant_id} {timeline_id} 0/16ABCDE --replica --gzip"
2791 4 : ))
2792 4 : .unwrap();
2793 4 : assert_eq!(
2794 4 : cmd,
2795 4 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2796 4 : tenant_id,
2797 4 : timeline_id,
2798 4 : lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
2799 4 : gzip: true,
2800 4 : replica: true
2801 4 : })
2802 4 : );
2803 4 : let cmd = PageServiceCmd::parse(&format!("fullbackup {tenant_id} {timeline_id}")).unwrap();
2804 4 : assert_eq!(
2805 4 : cmd,
2806 4 : PageServiceCmd::FullBackup(FullBackupCmd {
2807 4 : tenant_id,
2808 4 : timeline_id,
2809 4 : lsn: None,
2810 4 : prev_lsn: None
2811 4 : })
2812 4 : );
2813 4 : let cmd = PageServiceCmd::parse(&format!(
2814 4 : "fullbackup {tenant_id} {timeline_id} 0/16ABCDE 0/16ABCDF"
2815 4 : ))
2816 4 : .unwrap();
2817 4 : assert_eq!(
2818 4 : cmd,
2819 4 : PageServiceCmd::FullBackup(FullBackupCmd {
2820 4 : tenant_id,
2821 4 : timeline_id,
2822 4 : lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
2823 4 : prev_lsn: Some(Lsn::from_str("0/16ABCDF").unwrap()),
2824 4 : })
2825 4 : );
2826 4 : let tenant_shard_id = TenantShardId::unsharded(tenant_id);
2827 4 : let cmd = PageServiceCmd::parse(&format!(
2828 4 : "lease lsn {tenant_shard_id} {timeline_id} 0/16ABCDE"
2829 4 : ))
2830 4 : .unwrap();
2831 4 : assert_eq!(
2832 4 : cmd,
2833 4 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
2834 4 : tenant_shard_id,
2835 4 : timeline_id,
2836 4 : lsn: Lsn::from_str("0/16ABCDE").unwrap(),
2837 4 : })
2838 4 : );
2839 4 : let tenant_shard_id = TenantShardId::split(&tenant_shard_id, ShardCount(8))[1];
2840 4 : let cmd = PageServiceCmd::parse(&format!(
2841 4 : "lease lsn {tenant_shard_id} {timeline_id} 0/16ABCDE"
2842 4 : ))
2843 4 : .unwrap();
2844 4 : assert_eq!(
2845 4 : cmd,
2846 4 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
2847 4 : tenant_shard_id,
2848 4 : timeline_id,
2849 4 : lsn: Lsn::from_str("0/16ABCDE").unwrap(),
2850 4 : })
2851 4 : );
2852 4 : let cmd = PageServiceCmd::parse("set a = b").unwrap();
2853 4 : assert_eq!(cmd, PageServiceCmd::Set);
2854 4 : let cmd = PageServiceCmd::parse("SET foo").unwrap();
2855 4 : assert_eq!(cmd, PageServiceCmd::Set);
2856 4 : }
2857 :
/// Checks that `PageServiceCmd::parse` returns an error for each of a set of
/// malformed command strings.
#[test]
fn pageservice_cmd_err_handling() {
    let tenant_id = TenantId::generate();
    let timeline_id = TimelineId::generate();

    // Every entry below is expected to be rejected by the parser.
    let rejected_queries = [
        // A bare word that is not one of the commands exercised elsewhere.
        String::from("unknown_command"),
        // `pagestream_v2` with no ids at all.
        String::from("pagestream_v2"),
        // Ids with trailing `xxx` appended.
        format!("pagestream_v2 {tenant_id}xxx"),
        format!("pagestream_v2 {tenant_id}xxx {timeline_id}xxx"),
        // `--gzip` given twice.
        format!("basebackup {tenant_id} {timeline_id} --gzip --gzip"),
        // A flag named `--unknown`.
        format!("basebackup {tenant_id} {timeline_id} --gzip --unknown"),
        // The LSN placed after a flag.
        format!("basebackup {tenant_id} {timeline_id} --gzip 0/16ABCDE"),
        // `lease` followed directly by the ids, then `gzip` and an LSN.
        format!("lease {tenant_id} {timeline_id} gzip 0/16ABCDE"),
    ];

    for query in &rejected_queries {
        assert!(
            PageServiceCmd::parse(query).is_err(),
            "expected a parse error for {query:?}"
        );
    }
}
2885 : }
|