Line data Source code
1 : //! The Page Service listens for client connections and serves their GetPage@LSN
2 : //! requests.
3 :
4 : use std::borrow::Cow;
5 : use std::num::NonZeroUsize;
6 : use std::os::fd::AsRawFd;
7 : use std::str::FromStr;
8 : use std::sync::Arc;
9 : use std::time::{Duration, Instant, SystemTime};
10 : use std::{io, str};
11 :
12 : use anyhow::{Context, bail};
13 : use async_compression::tokio::write::GzipEncoder;
14 : use bytes::Buf;
15 : use futures::FutureExt;
16 : use itertools::Itertools;
17 : use once_cell::sync::OnceCell;
18 : use pageserver_api::config::{
19 : PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
20 : PageServiceProtocolPipelinedExecutionStrategy,
21 : };
22 : use pageserver_api::key::rel_block_to_key;
23 : use pageserver_api::models::{
24 : self, PageTraceEvent, PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
25 : PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
26 : PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetSlruSegmentRequest,
27 : PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest, PagestreamNblocksResponse,
28 : PagestreamProtocolVersion, PagestreamRequest, TenantState,
29 : };
30 : use pageserver_api::reltag::SlruKind;
31 : use pageserver_api::shard::TenantShardId;
32 : use postgres_backend::{
33 : AuthType, PostgresBackend, PostgresBackendReader, QueryError, is_expected_io_error,
34 : };
35 : use postgres_ffi::BLCKSZ;
36 : use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
37 : use pq_proto::framed::ConnectionError;
38 : use pq_proto::{BeMessage, FeMessage, FeStartupPacket, RowDescriptor};
39 : use strum_macros::IntoStaticStr;
40 : use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufWriter};
41 : use tokio::task::JoinHandle;
42 : use tokio_util::sync::CancellationToken;
43 : use tracing::*;
44 : use utils::auth::{Claims, Scope, SwappableJwtAuth};
45 : use utils::failpoint_support;
46 : use utils::id::{TenantId, TimelineId};
47 : use utils::logging::log_slow;
48 : use utils::lsn::Lsn;
49 : use utils::simple_rcu::RcuReadGuard;
50 : use utils::sync::gate::{Gate, GateGuard};
51 : use utils::sync::spsc_fold;
52 :
53 : use crate::auth::check_permission;
54 : use crate::basebackup::BasebackupError;
55 : use crate::config::PageServerConf;
56 : use crate::context::{DownloadBehavior, RequestContext};
57 : use crate::metrics::{
58 : self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, LIVE_CONNECTIONS, SmgrOpTimer,
59 : };
60 : use crate::pgdatadir_mapping::Version;
61 : use crate::span::{
62 : debug_assert_current_span_has_tenant_and_timeline_id,
63 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
64 : };
65 : use crate::task_mgr::{self, COMPUTE_REQUEST_RUNTIME, TaskKind};
66 : use crate::tenant::mgr::{
67 : GetActiveTenantError, GetTenantError, ShardResolveResult, ShardSelector, TenantManager,
68 : };
69 : use crate::tenant::storage_layer::IoConcurrency;
70 : use crate::tenant::timeline::{self, WaitLsnError};
71 : use crate::tenant::{GetTimelineError, PageReconstructError, Timeline};
72 : use crate::{basebackup, timed_after_cancellation};
73 :
74 : /// How long we may wait for a [`crate::tenant::mgr::TenantSlot::InProgress`]` and/or a [`crate::tenant::Tenant`] which
75 : /// is not yet in state [`TenantState::Active`].
76 : ///
77 : /// NB: this is a different value than [`crate::http::routes::ACTIVE_TENANT_TIMEOUT`].
78 : const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
79 :
80 : /// Threshold at which to log slow GetPage requests.
81 : const LOG_SLOW_GETPAGE_THRESHOLD: Duration = Duration::from_secs(30);
82 :
83 : ///////////////////////////////////////////////////////////////////////////////
84 :
85 : pub struct Listener {
86 : cancel: CancellationToken,
87 : /// Cancel the listener task through `listen_cancel` to shut down the listener
88 : /// and get a handle on the existing connections.
89 : task: JoinHandle<Connections>,
90 : }
91 :
92 : pub struct Connections {
93 : cancel: CancellationToken,
94 : tasks: tokio::task::JoinSet<ConnectionHandlerResult>,
95 : gate: Gate,
96 : }
97 :
98 0 : pub fn spawn(
99 0 : conf: &'static PageServerConf,
100 0 : tenant_manager: Arc<TenantManager>,
101 0 : pg_auth: Option<Arc<SwappableJwtAuth>>,
102 0 : tcp_listener: tokio::net::TcpListener,
103 0 : ) -> Listener {
104 0 : let cancel = CancellationToken::new();
105 0 : let libpq_ctx = RequestContext::todo_child(
106 0 : TaskKind::LibpqEndpointListener,
107 0 : // listener task shouldn't need to download anything. (We will
108 0 : // create a separate sub-contexts for each connection, with their
109 0 : // own download behavior. This context is used only to listen and
110 0 : // accept connections.)
111 0 : DownloadBehavior::Error,
112 0 : );
113 0 : let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
114 0 : "libpq listener",
115 0 : libpq_listener_main(
116 0 : conf,
117 0 : tenant_manager,
118 0 : pg_auth,
119 0 : tcp_listener,
120 0 : conf.pg_auth_type,
121 0 : conf.page_service_pipelining.clone(),
122 0 : libpq_ctx,
123 0 : cancel.clone(),
124 0 : )
125 0 : .map(anyhow::Ok),
126 0 : ));
127 0 :
128 0 : Listener { cancel, task }
129 0 : }
130 :
131 : impl Listener {
132 0 : pub async fn stop_accepting(self) -> Connections {
133 0 : self.cancel.cancel();
134 0 : self.task
135 0 : .await
136 0 : .expect("unreachable: we wrap the listener task in task_mgr::exit_on_panic_or_error")
137 0 : }
138 : }
139 : impl Connections {
140 0 : pub(crate) async fn shutdown(self) {
141 0 : let Self {
142 0 : cancel,
143 0 : mut tasks,
144 0 : gate,
145 0 : } = self;
146 0 : cancel.cancel();
147 0 : while let Some(res) = tasks.join_next().await {
148 0 : Self::handle_connection_completion(res);
149 0 : }
150 0 : gate.close().await;
151 0 : }
152 :
153 0 : fn handle_connection_completion(res: Result<anyhow::Result<()>, tokio::task::JoinError>) {
154 0 : match res {
155 0 : Ok(Ok(())) => {}
156 0 : Ok(Err(e)) => error!("error in page_service connection task: {:?}", e),
157 0 : Err(e) => error!("page_service connection task panicked: {:?}", e),
158 : }
159 0 : }
160 : }
161 :
162 : ///
163 : /// Main loop of the page service.
164 : ///
165 : /// Listens for connections, and launches a new handler task for each.
166 : ///
167 : /// Returns Ok(()) upon cancellation via `cancel`, returning the set of
168 : /// open connections.
169 : ///
170 : #[allow(clippy::too_many_arguments)]
171 0 : pub async fn libpq_listener_main(
172 0 : conf: &'static PageServerConf,
173 0 : tenant_manager: Arc<TenantManager>,
174 0 : auth: Option<Arc<SwappableJwtAuth>>,
175 0 : listener: tokio::net::TcpListener,
176 0 : auth_type: AuthType,
177 0 : pipelining_config: PageServicePipeliningConfig,
178 0 : listener_ctx: RequestContext,
179 0 : listener_cancel: CancellationToken,
180 0 : ) -> Connections {
181 0 : let connections_cancel = CancellationToken::new();
182 0 : let connections_gate = Gate::default();
183 0 : let mut connection_handler_tasks = tokio::task::JoinSet::default();
184 :
185 : loop {
186 0 : let gate_guard = match connections_gate.enter() {
187 0 : Ok(guard) => guard,
188 0 : Err(_) => break,
189 : };
190 :
191 0 : let accepted = tokio::select! {
192 : biased;
193 0 : _ = listener_cancel.cancelled() => break,
194 0 : next = connection_handler_tasks.join_next(), if !connection_handler_tasks.is_empty() => {
195 0 : let res = next.expect("we dont poll while empty");
196 0 : Connections::handle_connection_completion(res);
197 0 : continue;
198 : }
199 0 : accepted = listener.accept() => accepted,
200 0 : };
201 0 :
202 0 : match accepted {
203 0 : Ok((socket, peer_addr)) => {
204 0 : // Connection established. Spawn a new task to handle it.
205 0 : debug!("accepted connection from {}", peer_addr);
206 0 : let local_auth = auth.clone();
207 0 : let connection_ctx = listener_ctx
208 0 : .detached_child(TaskKind::PageRequestHandler, DownloadBehavior::Download);
209 0 : connection_handler_tasks.spawn(page_service_conn_main(
210 0 : conf,
211 0 : tenant_manager.clone(),
212 0 : local_auth,
213 0 : socket,
214 0 : auth_type,
215 0 : pipelining_config.clone(),
216 0 : connection_ctx,
217 0 : connections_cancel.child_token(),
218 0 : gate_guard,
219 0 : ));
220 : }
221 0 : Err(err) => {
222 0 : // accept() failed. Log the error, and loop back to retry on next connection.
223 0 : error!("accept() failed: {:?}", err);
224 : }
225 : }
226 : }
227 :
228 0 : debug!("page_service listener loop terminated");
229 :
230 0 : Connections {
231 0 : cancel: connections_cancel,
232 0 : tasks: connection_handler_tasks,
233 0 : gate: connections_gate,
234 0 : }
235 0 : }
236 :
237 : type ConnectionHandlerResult = anyhow::Result<()>;
238 :
239 : #[instrument(skip_all, fields(peer_addr, application_name))]
240 : #[allow(clippy::too_many_arguments)]
241 : async fn page_service_conn_main(
242 : conf: &'static PageServerConf,
243 : tenant_manager: Arc<TenantManager>,
244 : auth: Option<Arc<SwappableJwtAuth>>,
245 : socket: tokio::net::TcpStream,
246 : auth_type: AuthType,
247 : pipelining_config: PageServicePipeliningConfig,
248 : connection_ctx: RequestContext,
249 : cancel: CancellationToken,
250 : gate_guard: GateGuard,
251 : ) -> ConnectionHandlerResult {
252 : let _guard = LIVE_CONNECTIONS
253 : .with_label_values(&["page_service"])
254 : .guard();
255 :
256 : socket
257 : .set_nodelay(true)
258 : .context("could not set TCP_NODELAY")?;
259 :
260 : let socket_fd = socket.as_raw_fd();
261 :
262 : let peer_addr = socket.peer_addr().context("get peer address")?;
263 : tracing::Span::current().record("peer_addr", field::display(peer_addr));
264 :
265 : // setup read timeout of 10 minutes. the timeout is rather arbitrary for requirements:
266 : // - long enough for most valid compute connections
267 : // - less than infinite to stop us from "leaking" connections to long-gone computes
268 : //
269 : // no write timeout is used, because the kernel is assumed to error writes after some time.
270 : let mut socket = tokio_io_timeout::TimeoutReader::new(socket);
271 :
272 : let default_timeout_ms = 10 * 60 * 1000; // 10 minutes by default
273 0 : let socket_timeout_ms = (|| {
274 0 : fail::fail_point!("simulated-bad-compute-connection", |avg_timeout_ms| {
275 : // Exponential distribution for simulating
276 : // poor network conditions, expect about avg_timeout_ms to be around 15
277 : // in tests
278 0 : if let Some(avg_timeout_ms) = avg_timeout_ms {
279 0 : let avg = avg_timeout_ms.parse::<i64>().unwrap() as f32;
280 0 : let u = rand::random::<f32>();
281 0 : ((1.0 - u).ln() / (-avg)) as u64
282 : } else {
283 0 : default_timeout_ms
284 : }
285 0 : });
286 0 : default_timeout_ms
287 : })();
288 :
289 : // A timeout here does not mean the client died, it can happen if it's just idle for
290 : // a while: we will tear down this PageServerHandler and instantiate a new one if/when
291 : // they reconnect.
292 : socket.set_timeout(Some(std::time::Duration::from_millis(socket_timeout_ms)));
293 : let socket = Box::pin(socket);
294 :
295 : fail::fail_point!("ps::connection-start::pre-login");
296 :
297 : // XXX: pgbackend.run() should take the connection_ctx,
298 : // and create a child per-query context when it invokes process_query.
299 : // But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
300 : // and create the per-query context in process_query ourselves.
301 : let mut conn_handler = PageServerHandler::new(
302 : conf,
303 : tenant_manager,
304 : auth,
305 : pipelining_config,
306 : connection_ctx,
307 : cancel.clone(),
308 : gate_guard,
309 : );
310 : let pgbackend = PostgresBackend::new_from_io(socket_fd, socket, peer_addr, auth_type, None)?;
311 :
312 : match pgbackend.run(&mut conn_handler, &cancel).await {
313 : Ok(()) => {
314 : // we've been requested to shut down
315 : Ok(())
316 : }
317 : Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
318 : if is_expected_io_error(&io_error) {
319 : info!("Postgres client disconnected ({io_error})");
320 : Ok(())
321 : } else {
322 : let tenant_id = conn_handler.timeline_handles.as_ref().unwrap().tenant_id();
323 : Err(io_error).context(format!(
324 : "Postgres connection error for tenant_id={:?} client at peer_addr={}",
325 : tenant_id, peer_addr
326 : ))
327 : }
328 : }
329 : other => {
330 : let tenant_id = conn_handler.timeline_handles.as_ref().unwrap().tenant_id();
331 : other.context(format!(
332 : "Postgres query error for tenant_id={:?} client peer_addr={}",
333 : tenant_id, peer_addr
334 : ))
335 : }
336 : }
337 : }
338 :
339 : struct PageServerHandler {
340 : conf: &'static PageServerConf,
341 : auth: Option<Arc<SwappableJwtAuth>>,
342 : claims: Option<Claims>,
343 :
344 : /// The context created for the lifetime of the connection
345 : /// services by this PageServerHandler.
346 : /// For each query received over the connection,
347 : /// `process_query` creates a child context from this one.
348 : connection_ctx: RequestContext,
349 :
350 : cancel: CancellationToken,
351 :
352 : /// None only while pagestream protocol is being processed.
353 : timeline_handles: Option<TimelineHandles>,
354 :
355 : pipelining_config: PageServicePipeliningConfig,
356 :
357 : gate_guard: GateGuard,
358 : }
359 :
360 : struct TimelineHandles {
361 : wrapper: TenantManagerWrapper,
362 : /// Note on size: the typical size of this map is 1. The largest size we expect
363 : /// to see is the number of shards divided by the number of pageservers (typically < 2),
364 : /// or the ratio used when splitting shards (i.e. how many children created from one)
365 : /// parent shard, where a "large" number might be ~8.
366 : handles: timeline::handle::Cache<TenantManagerTypes>,
367 : }
368 :
369 : impl TimelineHandles {
370 0 : fn new(tenant_manager: Arc<TenantManager>) -> Self {
371 0 : Self {
372 0 : wrapper: TenantManagerWrapper {
373 0 : tenant_manager,
374 0 : tenant_id: OnceCell::new(),
375 0 : },
376 0 : handles: Default::default(),
377 0 : }
378 0 : }
379 0 : async fn get(
380 0 : &mut self,
381 0 : tenant_id: TenantId,
382 0 : timeline_id: TimelineId,
383 0 : shard_selector: ShardSelector,
384 0 : ) -> Result<timeline::handle::Handle<TenantManagerTypes>, GetActiveTimelineError> {
385 0 : if *self.wrapper.tenant_id.get_or_init(|| tenant_id) != tenant_id {
386 0 : return Err(GetActiveTimelineError::Tenant(
387 0 : GetActiveTenantError::SwitchedTenant,
388 0 : ));
389 0 : }
390 0 : self.handles
391 0 : .get(timeline_id, shard_selector, &self.wrapper)
392 0 : .await
393 0 : .map_err(|e| match e {
394 0 : timeline::handle::GetError::TenantManager(e) => e,
395 : timeline::handle::GetError::PerTimelineStateShutDown => {
396 0 : trace!("per-timeline state shut down");
397 0 : GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown)
398 : }
399 0 : })
400 0 : }
401 :
402 0 : fn tenant_id(&self) -> Option<TenantId> {
403 0 : self.wrapper.tenant_id.get().copied()
404 0 : }
405 : }
406 :
407 : pub(crate) struct TenantManagerWrapper {
408 : tenant_manager: Arc<TenantManager>,
409 : // We do not support switching tenant_id on a connection at this point.
410 : // We can can add support for this later if needed without changing
411 : // the protocol.
412 : tenant_id: once_cell::sync::OnceCell<TenantId>,
413 : }
414 :
415 : #[derive(Debug)]
416 : pub(crate) struct TenantManagerTypes;
417 :
418 : impl timeline::handle::Types for TenantManagerTypes {
419 : type TenantManagerError = GetActiveTimelineError;
420 : type TenantManager = TenantManagerWrapper;
421 : type Timeline = TenantManagerCacheItem;
422 : }
423 :
424 : pub(crate) struct TenantManagerCacheItem {
425 : pub(crate) timeline: Arc<Timeline>,
426 : #[allow(dead_code)] // we store it to keep the gate open
427 : pub(crate) gate_guard: GateGuard,
428 : }
429 :
430 : impl std::ops::Deref for TenantManagerCacheItem {
431 : type Target = Arc<Timeline>;
432 0 : fn deref(&self) -> &Self::Target {
433 0 : &self.timeline
434 0 : }
435 : }
436 :
437 : impl timeline::handle::Timeline<TenantManagerTypes> for TenantManagerCacheItem {
438 0 : fn shard_timeline_id(&self) -> timeline::handle::ShardTimelineId {
439 0 : Timeline::shard_timeline_id(&self.timeline)
440 0 : }
441 :
442 0 : fn per_timeline_state(&self) -> &timeline::handle::PerTimelineState<TenantManagerTypes> {
443 0 : &self.timeline.handles
444 0 : }
445 :
446 0 : fn get_shard_identity(&self) -> &pageserver_api::shard::ShardIdentity {
447 0 : Timeline::get_shard_identity(&self.timeline)
448 0 : }
449 : }
450 :
451 : impl timeline::handle::TenantManager<TenantManagerTypes> for TenantManagerWrapper {
452 0 : async fn resolve(
453 0 : &self,
454 0 : timeline_id: TimelineId,
455 0 : shard_selector: ShardSelector,
456 0 : ) -> Result<TenantManagerCacheItem, GetActiveTimelineError> {
457 0 : let tenant_id = self.tenant_id.get().expect("we set this in get()");
458 0 : let timeout = ACTIVE_TENANT_TIMEOUT;
459 0 : let wait_start = Instant::now();
460 0 : let deadline = wait_start + timeout;
461 0 : let tenant_shard = loop {
462 0 : let resolved = self
463 0 : .tenant_manager
464 0 : .resolve_attached_shard(tenant_id, shard_selector);
465 0 : match resolved {
466 0 : ShardResolveResult::Found(tenant_shard) => break tenant_shard,
467 : ShardResolveResult::NotFound => {
468 0 : return Err(GetActiveTimelineError::Tenant(
469 0 : GetActiveTenantError::NotFound(GetTenantError::NotFound(*tenant_id)),
470 0 : ));
471 : }
472 0 : ShardResolveResult::InProgress(barrier) => {
473 0 : // We can't authoritatively answer right now: wait for InProgress state
474 0 : // to end, then try again
475 0 : tokio::select! {
476 0 : _ = barrier.wait() => {
477 0 : // The barrier completed: proceed around the loop to try looking up again
478 0 : },
479 0 : _ = tokio::time::sleep(deadline.duration_since(Instant::now())) => {
480 0 : return Err(GetActiveTimelineError::Tenant(GetActiveTenantError::WaitForActiveTimeout {
481 0 : latest_state: None,
482 0 : wait_time: timeout,
483 0 : }));
484 : }
485 : }
486 : }
487 : };
488 : };
489 :
490 0 : tracing::debug!("Waiting for tenant to enter active state...");
491 0 : tenant_shard
492 0 : .wait_to_become_active(deadline.duration_since(Instant::now()))
493 0 : .await
494 0 : .map_err(GetActiveTimelineError::Tenant)?;
495 :
496 0 : let timeline = tenant_shard
497 0 : .get_timeline(timeline_id, true)
498 0 : .map_err(GetActiveTimelineError::Timeline)?;
499 :
500 0 : let gate_guard = match timeline.gate.enter() {
501 0 : Ok(guard) => guard,
502 : Err(_) => {
503 0 : return Err(GetActiveTimelineError::Timeline(
504 0 : GetTimelineError::ShuttingDown,
505 0 : ));
506 : }
507 : };
508 :
509 0 : Ok(TenantManagerCacheItem {
510 0 : timeline,
511 0 : gate_guard,
512 0 : })
513 0 : }
514 : }
515 :
516 : #[derive(thiserror::Error, Debug)]
517 : enum PageStreamError {
518 : /// We encountered an error that should prompt the client to reconnect:
519 : /// in practice this means we drop the connection without sending a response.
520 : #[error("Reconnect required: {0}")]
521 : Reconnect(Cow<'static, str>),
522 :
523 : /// We were instructed to shutdown while processing the query
524 : #[error("Shutting down")]
525 : Shutdown,
526 :
527 : /// Something went wrong reading a page: this likely indicates a pageserver bug
528 : #[error("Read error")]
529 : Read(#[source] PageReconstructError),
530 :
531 : /// Ran out of time waiting for an LSN
532 : #[error("LSN timeout: {0}")]
533 : LsnTimeout(WaitLsnError),
534 :
535 : /// The entity required to serve the request (tenant or timeline) is not found,
536 : /// or is not found in a suitable state to serve a request.
537 : #[error("Not found: {0}")]
538 : NotFound(Cow<'static, str>),
539 :
540 : /// Request asked for something that doesn't make sense, like an invalid LSN
541 : #[error("Bad request: {0}")]
542 : BadRequest(Cow<'static, str>),
543 : }
544 :
545 : impl From<PageReconstructError> for PageStreamError {
546 0 : fn from(value: PageReconstructError) -> Self {
547 0 : match value {
548 0 : PageReconstructError::Cancelled => Self::Shutdown,
549 0 : e => Self::Read(e),
550 : }
551 0 : }
552 : }
553 :
554 : impl From<GetActiveTimelineError> for PageStreamError {
555 0 : fn from(value: GetActiveTimelineError) -> Self {
556 0 : match value {
557 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled)
558 : | GetActiveTimelineError::Tenant(GetActiveTenantError::WillNotBecomeActive(
559 : TenantState::Stopping { .. },
560 : ))
561 0 : | GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown) => Self::Shutdown,
562 0 : GetActiveTimelineError::Tenant(e) => Self::NotFound(format!("{e}").into()),
563 0 : GetActiveTimelineError::Timeline(e) => Self::NotFound(format!("{e}").into()),
564 : }
565 0 : }
566 : }
567 :
568 : impl From<WaitLsnError> for PageStreamError {
569 0 : fn from(value: WaitLsnError) -> Self {
570 0 : match value {
571 0 : e @ WaitLsnError::Timeout(_) => Self::LsnTimeout(e),
572 0 : WaitLsnError::Shutdown => Self::Shutdown,
573 0 : e @ WaitLsnError::BadState { .. } => Self::Reconnect(format!("{e}").into()),
574 : }
575 0 : }
576 : }
577 :
578 : impl From<WaitLsnError> for QueryError {
579 0 : fn from(value: WaitLsnError) -> Self {
580 0 : match value {
581 0 : e @ WaitLsnError::Timeout(_) => Self::Other(anyhow::Error::new(e)),
582 0 : WaitLsnError::Shutdown => Self::Shutdown,
583 0 : WaitLsnError::BadState { .. } => Self::Reconnect,
584 : }
585 0 : }
586 : }
587 :
588 : #[derive(thiserror::Error, Debug)]
589 : struct BatchedPageStreamError {
590 : req: PagestreamRequest,
591 : err: PageStreamError,
592 : }
593 :
594 : impl std::fmt::Display for BatchedPageStreamError {
595 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
596 0 : self.err.fmt(f)
597 0 : }
598 : }
599 :
600 : struct BatchedGetPageRequest {
601 : req: PagestreamGetPageRequest,
602 : timer: SmgrOpTimer,
603 : }
604 :
605 : #[cfg(feature = "testing")]
606 : struct BatchedTestRequest {
607 : req: models::PagestreamTestRequest,
608 : timer: SmgrOpTimer,
609 : }
610 :
611 : /// NB: we only hold [`timeline::handle::WeakHandle`] inside this enum,
612 : /// so that we don't keep the [`Timeline::gate`] open while the batch
613 : /// is being built up inside the [`spsc_fold`] (pagestream pipelining).
614 : #[derive(IntoStaticStr)]
615 : enum BatchedFeMessage {
616 : Exists {
617 : span: Span,
618 : timer: SmgrOpTimer,
619 : shard: timeline::handle::WeakHandle<TenantManagerTypes>,
620 : req: models::PagestreamExistsRequest,
621 : },
622 : Nblocks {
623 : span: Span,
624 : timer: SmgrOpTimer,
625 : shard: timeline::handle::WeakHandle<TenantManagerTypes>,
626 : req: models::PagestreamNblocksRequest,
627 : },
628 : GetPage {
629 : span: Span,
630 : shard: timeline::handle::WeakHandle<TenantManagerTypes>,
631 : effective_request_lsn: Lsn,
632 : pages: smallvec::SmallVec<[BatchedGetPageRequest; 1]>,
633 : },
634 : DbSize {
635 : span: Span,
636 : timer: SmgrOpTimer,
637 : shard: timeline::handle::WeakHandle<TenantManagerTypes>,
638 : req: models::PagestreamDbSizeRequest,
639 : },
640 : GetSlruSegment {
641 : span: Span,
642 : timer: SmgrOpTimer,
643 : shard: timeline::handle::WeakHandle<TenantManagerTypes>,
644 : req: models::PagestreamGetSlruSegmentRequest,
645 : },
646 : #[cfg(feature = "testing")]
647 : Test {
648 : span: Span,
649 : shard: timeline::handle::WeakHandle<TenantManagerTypes>,
650 : requests: Vec<BatchedTestRequest>,
651 : },
652 : RespondError {
653 : span: Span,
654 : error: BatchedPageStreamError,
655 : },
656 : }
657 :
658 : impl BatchedFeMessage {
659 0 : fn as_static_str(&self) -> &'static str {
660 0 : self.into()
661 0 : }
662 :
663 0 : fn observe_execution_start(&mut self, at: Instant) {
664 0 : match self {
665 0 : BatchedFeMessage::Exists { timer, .. }
666 0 : | BatchedFeMessage::Nblocks { timer, .. }
667 0 : | BatchedFeMessage::DbSize { timer, .. }
668 0 : | BatchedFeMessage::GetSlruSegment { timer, .. } => {
669 0 : timer.observe_execution_start(at);
670 0 : }
671 0 : BatchedFeMessage::GetPage { pages, .. } => {
672 0 : for page in pages {
673 0 : page.timer.observe_execution_start(at);
674 0 : }
675 : }
676 : #[cfg(feature = "testing")]
677 0 : BatchedFeMessage::Test { requests, .. } => {
678 0 : for req in requests {
679 0 : req.timer.observe_execution_start(at);
680 0 : }
681 : }
682 0 : BatchedFeMessage::RespondError { .. } => {}
683 : }
684 0 : }
685 : }
686 :
687 : impl PageServerHandler {
688 0 : pub fn new(
689 0 : conf: &'static PageServerConf,
690 0 : tenant_manager: Arc<TenantManager>,
691 0 : auth: Option<Arc<SwappableJwtAuth>>,
692 0 : pipelining_config: PageServicePipeliningConfig,
693 0 : connection_ctx: RequestContext,
694 0 : cancel: CancellationToken,
695 0 : gate_guard: GateGuard,
696 0 : ) -> Self {
697 0 : PageServerHandler {
698 0 : conf,
699 0 : auth,
700 0 : claims: None,
701 0 : connection_ctx,
702 0 : timeline_handles: Some(TimelineHandles::new(tenant_manager)),
703 0 : cancel,
704 0 : pipelining_config,
705 0 : gate_guard,
706 0 : }
707 0 : }
708 :
709 : /// This function always respects cancellation of any timeline in `[Self::shard_timelines]`. Pass in
710 : /// a cancellation token at the next scope up (such as a tenant cancellation token) to ensure we respect
711 : /// cancellation if there aren't any timelines in the cache.
712 : ///
713 : /// If calling from a function that doesn't use the `[Self::shard_timelines]` cache, then pass in the
714 : /// timeline cancellation token.
715 0 : async fn flush_cancellable<IO>(
716 0 : &self,
717 0 : pgb: &mut PostgresBackend<IO>,
718 0 : cancel: &CancellationToken,
719 0 : ) -> Result<(), QueryError>
720 0 : where
721 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
722 0 : {
723 0 : tokio::select!(
724 0 : flush_r = pgb.flush() => {
725 0 : Ok(flush_r?)
726 : },
727 0 : _ = cancel.cancelled() => {
728 0 : Err(QueryError::Shutdown)
729 : }
730 : )
731 0 : }
732 :
733 : #[allow(clippy::too_many_arguments)]
734 0 : async fn pagestream_read_message<IO>(
735 0 : pgb: &mut PostgresBackendReader<IO>,
736 0 : tenant_id: TenantId,
737 0 : timeline_id: TimelineId,
738 0 : timeline_handles: &mut TimelineHandles,
739 0 : cancel: &CancellationToken,
740 0 : ctx: &RequestContext,
741 0 : protocol_version: PagestreamProtocolVersion,
742 0 : parent_span: Span,
743 0 : ) -> Result<Option<BatchedFeMessage>, QueryError>
744 0 : where
745 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
746 0 : {
747 0 : let msg = tokio::select! {
748 : biased;
749 0 : _ = cancel.cancelled() => {
750 0 : return Err(QueryError::Shutdown)
751 : }
752 0 : msg = pgb.read_message() => { msg }
753 0 : };
754 0 :
755 0 : let received_at = Instant::now();
756 :
757 0 : let copy_data_bytes = match msg? {
758 0 : Some(FeMessage::CopyData(bytes)) => bytes,
759 : Some(FeMessage::Terminate) => {
760 0 : return Ok(None);
761 : }
762 0 : Some(m) => {
763 0 : return Err(QueryError::Other(anyhow::anyhow!(
764 0 : "unexpected message: {m:?} during COPY"
765 0 : )));
766 : }
767 : None => {
768 0 : return Ok(None);
769 : } // client disconnected
770 : };
771 0 : trace!("query: {copy_data_bytes:?}");
772 :
773 0 : fail::fail_point!("ps::handle-pagerequest-message");
774 :
775 : // parse request
776 0 : let neon_fe_msg =
777 0 : PagestreamFeMessage::parse(&mut copy_data_bytes.reader(), protocol_version)?;
778 :
779 : // TODO: turn in to async closure once available to avoid repeating received_at
780 0 : async fn record_op_start_and_throttle(
781 0 : shard: &timeline::handle::Handle<TenantManagerTypes>,
782 0 : op: metrics::SmgrQueryType,
783 0 : received_at: Instant,
784 0 : ) -> Result<SmgrOpTimer, QueryError> {
785 0 : // It's important to start the smgr op metric recorder as early as possible
786 0 : // so that the _started counters are incremented before we do
787 0 : // any serious waiting, e.g., for throttle, batching, or actual request handling.
788 0 : let mut timer = shard.query_metrics.start_smgr_op(op, received_at);
789 0 : let now = Instant::now();
790 0 : timer.observe_throttle_start(now);
791 0 : let throttled = tokio::select! {
792 0 : res = shard.pagestream_throttle.throttle(1, now) => res,
793 0 : _ = shard.cancel.cancelled() => return Err(QueryError::Shutdown),
794 : };
795 0 : timer.observe_throttle_done(throttled);
796 0 : Ok(timer)
797 0 : }
798 :
799 0 : let batched_msg = match neon_fe_msg {
800 0 : PagestreamFeMessage::Exists(req) => {
801 0 : let shard = timeline_handles
802 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
803 0 : .await?;
804 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
805 0 : let span = tracing::info_span!(parent: &parent_span, "handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
806 0 : let timer = record_op_start_and_throttle(
807 0 : &shard,
808 0 : metrics::SmgrQueryType::GetRelExists,
809 0 : received_at,
810 0 : )
811 0 : .await?;
812 0 : BatchedFeMessage::Exists {
813 0 : span,
814 0 : timer,
815 0 : shard: shard.downgrade(),
816 0 : req,
817 0 : }
818 : }
819 0 : PagestreamFeMessage::Nblocks(req) => {
820 0 : let shard = timeline_handles
821 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
822 0 : .await?;
823 0 : let span = tracing::info_span!(parent: &parent_span, "handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
824 0 : let timer = record_op_start_and_throttle(
825 0 : &shard,
826 0 : metrics::SmgrQueryType::GetRelSize,
827 0 : received_at,
828 0 : )
829 0 : .await?;
830 0 : BatchedFeMessage::Nblocks {
831 0 : span,
832 0 : timer,
833 0 : shard: shard.downgrade(),
834 0 : req,
835 0 : }
836 : }
837 0 : PagestreamFeMessage::DbSize(req) => {
838 0 : let shard = timeline_handles
839 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
840 0 : .await?;
841 0 : let span = tracing::info_span!(parent: &parent_span, "handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
842 0 : let timer = record_op_start_and_throttle(
843 0 : &shard,
844 0 : metrics::SmgrQueryType::GetDbSize,
845 0 : received_at,
846 0 : )
847 0 : .await?;
848 0 : BatchedFeMessage::DbSize {
849 0 : span,
850 0 : timer,
851 0 : shard: shard.downgrade(),
852 0 : req,
853 0 : }
854 : }
855 0 : PagestreamFeMessage::GetSlruSegment(req) => {
856 0 : let shard = timeline_handles
857 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
858 0 : .await?;
859 0 : let span = tracing::info_span!(parent: &parent_span, "handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
860 0 : let timer = record_op_start_and_throttle(
861 0 : &shard,
862 0 : metrics::SmgrQueryType::GetSlruSegment,
863 0 : received_at,
864 0 : )
865 0 : .await?;
866 0 : BatchedFeMessage::GetSlruSegment {
867 0 : span,
868 0 : timer,
869 0 : shard: shard.downgrade(),
870 0 : req,
871 0 : }
872 : }
873 0 : PagestreamFeMessage::GetPage(req) => {
874 : // avoid a somewhat costly Span::record() by constructing the entire span in one go.
875 : macro_rules! mkspan {
876 : (before shard routing) => {{
877 : tracing::info_span!(parent: &parent_span, "handle_get_page_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.hdr.request_lsn)
878 : }};
879 : ($shard_id:expr) => {{
880 : tracing::info_span!(parent: &parent_span, "handle_get_page_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.hdr.request_lsn, shard_id = %$shard_id)
881 : }};
882 : }
883 :
884 : macro_rules! respond_error {
885 : ($span:expr, $error:expr) => {{
886 : let error = BatchedFeMessage::RespondError {
887 : span: $span,
888 : error: BatchedPageStreamError {
889 : req: req.hdr,
890 : err: $error,
891 : },
892 : };
893 : Ok(Some(error))
894 : }};
895 : }
896 :
897 0 : let key = rel_block_to_key(req.rel, req.blkno);
898 0 : let shard = match timeline_handles
899 0 : .get(tenant_id, timeline_id, ShardSelector::Page(key))
900 0 : .await
901 : {
902 0 : Ok(tl) => tl,
903 0 : Err(e) => {
904 0 : let span = mkspan!(before shard routing);
905 0 : match e {
906 : GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_)) => {
907 : // We already know this tenant exists in general, because we resolved it at
908 : // start of connection. Getting a NotFound here indicates that the shard containing
909 : // the requested page is not present on this node: the client's knowledge of shard->pageserver
910 : // mapping is out of date.
911 : //
912 : // Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
913 : // client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
914 : // and talk to a different pageserver.
915 0 : return respond_error!(
916 0 : span,
917 0 : PageStreamError::Reconnect(
918 0 : "getpage@lsn request routed to wrong shard".into()
919 0 : )
920 0 : );
921 : }
922 0 : e => {
923 0 : return respond_error!(span, e.into());
924 : }
925 : }
926 : }
927 : };
928 0 : let span = mkspan!(shard.tenant_shard_id.shard_slug());
929 :
930 0 : let timer = record_op_start_and_throttle(
931 0 : &shard,
932 0 : metrics::SmgrQueryType::GetPageAtLsn,
933 0 : received_at,
934 0 : )
935 0 : .await?;
936 :
937 : // We're holding the Handle
938 0 : let effective_request_lsn = match Self::wait_or_get_last_lsn(
939 0 : &shard,
940 0 : req.hdr.request_lsn,
941 0 : req.hdr.not_modified_since,
942 0 : &shard.get_applied_gc_cutoff_lsn(),
943 0 : ctx,
944 0 : )
945 0 : // TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait
946 0 : .await
947 : {
948 0 : Ok(lsn) => lsn,
949 0 : Err(e) => {
950 0 : return respond_error!(span, e);
951 : }
952 : };
953 : BatchedFeMessage::GetPage {
954 0 : span,
955 0 : shard: shard.downgrade(),
956 0 : effective_request_lsn,
957 0 : pages: smallvec::smallvec![BatchedGetPageRequest { req, timer }],
958 : }
959 : }
960 : #[cfg(feature = "testing")]
961 0 : PagestreamFeMessage::Test(req) => {
962 0 : let shard = timeline_handles
963 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
964 0 : .await?;
965 0 : let span = tracing::info_span!(parent: &parent_span, "handle_test_request", shard_id = %shard.tenant_shard_id.shard_slug());
966 0 : let timer =
967 0 : record_op_start_and_throttle(&shard, metrics::SmgrQueryType::Test, received_at)
968 0 : .await?;
969 0 : BatchedFeMessage::Test {
970 0 : span,
971 0 : shard: shard.downgrade(),
972 0 : requests: vec![BatchedTestRequest { req, timer }],
973 0 : }
974 : }
975 : };
976 0 : Ok(Some(batched_msg))
977 0 : }
978 :
979 : /// Post-condition: `batch` is Some()
980 : #[instrument(skip_all, level = tracing::Level::TRACE)]
981 : #[allow(clippy::boxed_local)]
982 : fn pagestream_do_batch(
983 : max_batch_size: NonZeroUsize,
984 : batch: &mut Result<BatchedFeMessage, QueryError>,
985 : this_msg: Result<BatchedFeMessage, QueryError>,
986 : ) -> Result<(), Result<BatchedFeMessage, QueryError>> {
987 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
988 :
989 : let this_msg = match this_msg {
990 : Ok(this_msg) => this_msg,
991 : Err(e) => return Err(Err(e)),
992 : };
993 :
994 : match (&mut *batch, this_msg) {
995 : // something batched already, let's see if we can add this message to the batch
996 : (
997 : Ok(BatchedFeMessage::GetPage {
998 : span: _,
999 : shard: accum_shard,
1000 : pages: accum_pages,
1001 : effective_request_lsn: accum_lsn,
1002 : }),
1003 : BatchedFeMessage::GetPage {
1004 : span: _,
1005 : shard: this_shard,
1006 : pages: this_pages,
1007 : effective_request_lsn: this_lsn,
1008 : },
1009 0 : ) if (|| {
1010 0 : assert_eq!(this_pages.len(), 1);
1011 0 : if accum_pages.len() >= max_batch_size.get() {
1012 0 : trace!(%accum_lsn, %this_lsn, %max_batch_size, "stopping batching because of batch size");
1013 0 : assert_eq!(accum_pages.len(), max_batch_size.get());
1014 0 : return false;
1015 0 : }
1016 0 : if !accum_shard.is_same_handle_as(&this_shard) {
1017 0 : trace!(%accum_lsn, %this_lsn, "stopping batching because timeline object mismatch");
1018 : // TODO: we _could_ batch & execute each shard seperately (and in parallel).
1019 : // But the current logic for keeping responses in order does not support that.
1020 0 : return false;
1021 0 : }
1022 0 : // the vectored get currently only supports a single LSN, so, bounce as soon
1023 0 : // as the effective request_lsn changes
1024 0 : if *accum_lsn != this_lsn {
1025 0 : trace!(%accum_lsn, %this_lsn, "stopping batching because LSN changed");
1026 0 : return false;
1027 0 : }
1028 0 : true
1029 : })() =>
1030 : {
1031 : // ok to batch
1032 : accum_pages.extend(this_pages);
1033 : Ok(())
1034 : }
1035 : #[cfg(feature = "testing")]
1036 : (
1037 : Ok(BatchedFeMessage::Test {
1038 : shard: accum_shard,
1039 : requests: accum_requests,
1040 : ..
1041 : }),
1042 : BatchedFeMessage::Test {
1043 : shard: this_shard,
1044 : requests: this_requests,
1045 : ..
1046 : },
1047 0 : ) if (|| {
1048 0 : assert!(this_requests.len() == 1);
1049 0 : if accum_requests.len() >= max_batch_size.get() {
1050 0 : trace!(%max_batch_size, "stopping batching because of batch size");
1051 0 : assert_eq!(accum_requests.len(), max_batch_size.get());
1052 0 : return false;
1053 0 : }
1054 0 : if !accum_shard.is_same_handle_as(&this_shard) {
1055 0 : trace!("stopping batching because timeline object mismatch");
1056 : // TODO: we _could_ batch & execute each shard seperately (and in parallel).
1057 : // But the current logic for keeping responses in order does not support that.
1058 0 : return false;
1059 0 : }
1060 0 : let this_batch_key = this_requests[0].req.batch_key;
1061 0 : let accum_batch_key = accum_requests[0].req.batch_key;
1062 0 : if this_requests[0].req.batch_key != accum_requests[0].req.batch_key {
1063 0 : trace!(%accum_batch_key, %this_batch_key, "stopping batching because batch key changed");
1064 0 : return false;
1065 0 : }
1066 0 : true
1067 : })() =>
1068 : {
1069 : // ok to batch
1070 : accum_requests.extend(this_requests);
1071 : Ok(())
1072 : }
1073 : // something batched already but this message is unbatchable
1074 : (_, this_msg) => {
1075 : // by default, don't continue batching
1076 : Err(Ok(this_msg))
1077 : }
1078 : }
1079 : }
1080 :
1081 0 : #[instrument(level = tracing::Level::DEBUG, skip_all)]
1082 : async fn pagesteam_handle_batched_message<IO>(
1083 : &mut self,
1084 : pgb_writer: &mut PostgresBackend<IO>,
1085 : batch: BatchedFeMessage,
1086 : io_concurrency: IoConcurrency,
1087 : cancel: &CancellationToken,
1088 : protocol_version: PagestreamProtocolVersion,
1089 : ctx: &RequestContext,
1090 : ) -> Result<(), QueryError>
1091 : where
1092 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1093 : {
1094 : let started_at = Instant::now();
1095 : let batch = {
1096 : let mut batch = batch;
1097 : batch.observe_execution_start(started_at);
1098 : batch
1099 : };
1100 :
1101 : // Dispatch the batch to the appropriate request handler.
1102 : let (mut handler_results, span) = log_slow(
1103 : batch.as_static_str(),
1104 : LOG_SLOW_GETPAGE_THRESHOLD,
1105 : self.pagestream_dispatch_batched_message(batch, io_concurrency, ctx),
1106 : )
1107 : .await?;
1108 :
1109 : // We purposefully don't count flush time into the smgr operation timer.
1110 : //
1111 : // The reason is that current compute client will not perform protocol processing
1112 : // if the postgres backend process is doing things other than `->smgr_read()`.
1113 : // This is especially the case for prefetch.
1114 : //
1115 : // If the compute doesn't read from the connection, eventually TCP will backpressure
1116 : // all the way into our flush call below.
1117 : //
1118 : // The timer's underlying metric is used for a storage-internal latency SLO and
1119 : // we don't want to include latency in it that we can't control.
1120 : // And as pointed out above, in this case, we don't control the time that flush will take.
1121 : //
1122 : // We put each response in the batch onto the wire in a separate pgb_writer.flush()
1123 : // call, which (all unmeasured) adds syscall overhead but reduces time to first byte
1124 : // and avoids building up a "giant" contiguous userspace buffer to hold the entire response.
1125 : // TODO: vectored socket IO would be great, but pgb_writer doesn't support that.
1126 : let flush_timers = {
1127 : let flushing_start_time = Instant::now();
1128 : let mut flush_timers = Vec::with_capacity(handler_results.len());
1129 : for handler_result in &mut handler_results {
1130 : let flush_timer = match handler_result {
1131 : Ok((_, timer)) => Some(
1132 : timer
1133 : .observe_execution_end(flushing_start_time)
1134 : .expect("we are the first caller"),
1135 : ),
1136 : Err(_) => {
1137 : // TODO: measure errors
1138 : None
1139 : }
1140 : };
1141 : flush_timers.push(flush_timer);
1142 : }
1143 : assert_eq!(flush_timers.len(), handler_results.len());
1144 : flush_timers
1145 : };
1146 :
1147 : // Map handler result to protocol behavior.
1148 : // Some handler errors cause exit from pagestream protocol.
1149 : // Other handler errors are sent back as an error message and we stay in pagestream protocol.
1150 : for (handler_result, flushing_timer) in handler_results.into_iter().zip(flush_timers) {
1151 : let response_msg = match handler_result {
1152 : Err(e) => match &e.err {
1153 : PageStreamError::Shutdown => {
1154 : // If we fail to fulfil a request during shutdown, which may be _because_ of
1155 : // shutdown, then do not send the error to the client. Instead just drop the
1156 : // connection.
1157 0 : span.in_scope(|| info!("dropping connection due to shutdown"));
1158 : return Err(QueryError::Shutdown);
1159 : }
1160 : PageStreamError::Reconnect(reason) => {
1161 0 : span.in_scope(|| info!("handler requested reconnect: {reason}"));
1162 : return Err(QueryError::Reconnect);
1163 : }
1164 : PageStreamError::Read(_)
1165 : | PageStreamError::LsnTimeout(_)
1166 : | PageStreamError::NotFound(_)
1167 : | PageStreamError::BadRequest(_) => {
1168 : // print the all details to the log with {:#}, but for the client the
1169 : // error message is enough. Do not log if shutting down, as the anyhow::Error
1170 : // here includes cancellation which is not an error.
1171 : let full = utils::error::report_compact_sources(&e.err);
1172 0 : span.in_scope(|| {
1173 0 : error!("error reading relation or page version: {full:#}")
1174 0 : });
1175 :
1176 : PagestreamBeMessage::Error(PagestreamErrorResponse {
1177 : req: e.req,
1178 : message: e.err.to_string(),
1179 : })
1180 : }
1181 : },
1182 : Ok((response_msg, _op_timer_already_observed)) => response_msg,
1183 : };
1184 :
1185 : //
1186 : // marshal & transmit response message
1187 : //
1188 :
1189 : pgb_writer.write_message_noflush(&BeMessage::CopyData(
1190 : &response_msg.serialize(protocol_version),
1191 : ))?;
1192 :
1193 : failpoint_support::sleep_millis_async!("before-pagestream-msg-flush", cancel);
1194 :
1195 : // what we want to do
1196 : let socket_fd = pgb_writer.socket_fd;
1197 : let flush_fut = pgb_writer.flush();
1198 : // metric for how long flushing takes
1199 : let flush_fut = match flushing_timer {
1200 : Some(flushing_timer) => futures::future::Either::Left(flushing_timer.measure(
1201 : Instant::now(),
1202 : flush_fut,
1203 : socket_fd,
1204 : )),
1205 : None => futures::future::Either::Right(flush_fut),
1206 : };
1207 : // do it while respecting cancellation
1208 0 : let _: () = async move {
1209 0 : tokio::select! {
1210 : biased;
1211 0 : _ = cancel.cancelled() => {
1212 : // We were requested to shut down.
1213 0 : info!("shutdown request received in page handler");
1214 0 : return Err(QueryError::Shutdown)
1215 : }
1216 0 : res = flush_fut => {
1217 0 : res?;
1218 : }
1219 : }
1220 0 : Ok(())
1221 0 : }
1222 : .await?;
1223 : }
1224 : Ok(())
1225 : }
1226 :
1227 : /// Helper which dispatches a batched message to the appropriate handler.
1228 : /// Returns a vec of results, along with the extracted trace span.
1229 0 : async fn pagestream_dispatch_batched_message(
1230 0 : &mut self,
1231 0 : batch: BatchedFeMessage,
1232 0 : io_concurrency: IoConcurrency,
1233 0 : ctx: &RequestContext,
1234 0 : ) -> Result<
1235 0 : (
1236 0 : Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>>,
1237 0 : Span,
1238 0 : ),
1239 0 : QueryError,
1240 0 : > {
1241 0 : Ok(match batch {
1242 : BatchedFeMessage::Exists {
1243 0 : span,
1244 0 : timer,
1245 0 : shard,
1246 0 : req,
1247 0 : } => {
1248 0 : fail::fail_point!("ps::handle-pagerequest-message::exists");
1249 0 : (
1250 0 : vec![
1251 0 : self.handle_get_rel_exists_request(&*shard.upgrade()?, &req, ctx)
1252 0 : .instrument(span.clone())
1253 0 : .await
1254 0 : .map(|msg| (msg, timer))
1255 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
1256 0 : ],
1257 0 : span,
1258 : )
1259 : }
1260 : BatchedFeMessage::Nblocks {
1261 0 : span,
1262 0 : timer,
1263 0 : shard,
1264 0 : req,
1265 0 : } => {
1266 0 : fail::fail_point!("ps::handle-pagerequest-message::nblocks");
1267 0 : (
1268 0 : vec![
1269 0 : self.handle_get_nblocks_request(&*shard.upgrade()?, &req, ctx)
1270 0 : .instrument(span.clone())
1271 0 : .await
1272 0 : .map(|msg| (msg, timer))
1273 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
1274 0 : ],
1275 0 : span,
1276 : )
1277 : }
1278 : BatchedFeMessage::GetPage {
1279 0 : span,
1280 0 : shard,
1281 0 : effective_request_lsn,
1282 0 : pages,
1283 0 : } => {
1284 0 : fail::fail_point!("ps::handle-pagerequest-message::getpage");
1285 0 : (
1286 0 : {
1287 0 : let npages = pages.len();
1288 0 : trace!(npages, "handling getpage request");
1289 0 : let res = self
1290 0 : .handle_get_page_at_lsn_request_batched(
1291 0 : &*shard.upgrade()?,
1292 0 : effective_request_lsn,
1293 0 : pages,
1294 0 : io_concurrency,
1295 0 : ctx,
1296 0 : )
1297 0 : .instrument(span.clone())
1298 0 : .await;
1299 0 : assert_eq!(res.len(), npages);
1300 0 : res
1301 0 : },
1302 0 : span,
1303 : )
1304 : }
1305 : BatchedFeMessage::DbSize {
1306 0 : span,
1307 0 : timer,
1308 0 : shard,
1309 0 : req,
1310 0 : } => {
1311 0 : fail::fail_point!("ps::handle-pagerequest-message::dbsize");
1312 0 : (
1313 0 : vec![
1314 0 : self.handle_db_size_request(&*shard.upgrade()?, &req, ctx)
1315 0 : .instrument(span.clone())
1316 0 : .await
1317 0 : .map(|msg| (msg, timer))
1318 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
1319 0 : ],
1320 0 : span,
1321 : )
1322 : }
1323 : BatchedFeMessage::GetSlruSegment {
1324 0 : span,
1325 0 : timer,
1326 0 : shard,
1327 0 : req,
1328 0 : } => {
1329 0 : fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
1330 0 : (
1331 0 : vec![
1332 0 : self.handle_get_slru_segment_request(&*shard.upgrade()?, &req, ctx)
1333 0 : .instrument(span.clone())
1334 0 : .await
1335 0 : .map(|msg| (msg, timer))
1336 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
1337 0 : ],
1338 0 : span,
1339 : )
1340 : }
1341 : #[cfg(feature = "testing")]
1342 : BatchedFeMessage::Test {
1343 0 : span,
1344 0 : shard,
1345 0 : requests,
1346 0 : } => {
1347 0 : fail::fail_point!("ps::handle-pagerequest-message::test");
1348 0 : (
1349 0 : {
1350 0 : let npages = requests.len();
1351 0 : trace!(npages, "handling getpage request");
1352 0 : let res = self
1353 0 : .handle_test_request_batch(&*shard.upgrade()?, requests, ctx)
1354 0 : .instrument(span.clone())
1355 0 : .await;
1356 0 : assert_eq!(res.len(), npages);
1357 0 : res
1358 0 : },
1359 0 : span,
1360 : )
1361 : }
1362 0 : BatchedFeMessage::RespondError { span, error } => {
1363 0 : // We've already decided to respond with an error, so we don't need to
1364 0 : // call the handler.
1365 0 : (vec![Err(error)], span)
1366 : }
1367 : })
1368 0 : }
1369 :
1370 : /// Pagestream sub-protocol handler.
1371 : ///
1372 : /// It is a simple request-response protocol inside a COPYBOTH session.
1373 : ///
1374 : /// # Coding Discipline
1375 : ///
1376 : /// Coding discipline within this function: all interaction with the `pgb` connection
1377 : /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
1378 : /// This is so that we can shutdown page_service quickly.
1379 : #[instrument(skip_all)]
1380 : async fn handle_pagerequests<IO>(
1381 : &mut self,
1382 : pgb: &mut PostgresBackend<IO>,
1383 : tenant_id: TenantId,
1384 : timeline_id: TimelineId,
1385 : protocol_version: PagestreamProtocolVersion,
1386 : ctx: RequestContext,
1387 : ) -> Result<(), QueryError>
1388 : where
1389 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
1390 : {
1391 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1392 :
1393 : // switch client to COPYBOTH
1394 : pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
1395 : tokio::select! {
1396 : biased;
1397 : _ = self.cancel.cancelled() => {
1398 : return Err(QueryError::Shutdown)
1399 : }
1400 : res = pgb.flush() => {
1401 : res?;
1402 : }
1403 : }
1404 :
1405 : let io_concurrency = IoConcurrency::spawn_from_conf(
1406 : self.conf,
1407 : match self.gate_guard.try_clone() {
1408 : Ok(guard) => guard,
1409 : Err(_) => {
1410 : info!("shutdown request received in page handler");
1411 : return Err(QueryError::Shutdown);
1412 : }
1413 : },
1414 : );
1415 :
1416 : let pgb_reader = pgb
1417 : .split()
1418 : .context("implementation error: split pgb into reader and writer")?;
1419 :
1420 : let timeline_handles = self
1421 : .timeline_handles
1422 : .take()
1423 : .expect("implementation error: timeline_handles should not be locked");
1424 :
1425 : let request_span = info_span!("request");
1426 : let ((pgb_reader, timeline_handles), result) = match self.pipelining_config.clone() {
1427 : PageServicePipeliningConfig::Pipelined(pipelining_config) => {
1428 : self.handle_pagerequests_pipelined(
1429 : pgb,
1430 : pgb_reader,
1431 : tenant_id,
1432 : timeline_id,
1433 : timeline_handles,
1434 : request_span,
1435 : pipelining_config,
1436 : protocol_version,
1437 : io_concurrency,
1438 : &ctx,
1439 : )
1440 : .await
1441 : }
1442 : PageServicePipeliningConfig::Serial => {
1443 : self.handle_pagerequests_serial(
1444 : pgb,
1445 : pgb_reader,
1446 : tenant_id,
1447 : timeline_id,
1448 : timeline_handles,
1449 : request_span,
1450 : protocol_version,
1451 : io_concurrency,
1452 : &ctx,
1453 : )
1454 : .await
1455 : }
1456 : };
1457 :
1458 : debug!("pagestream subprotocol shut down cleanly");
1459 :
1460 : pgb.unsplit(pgb_reader)
1461 : .context("implementation error: unsplit pgb")?;
1462 :
1463 : let replaced = self.timeline_handles.replace(timeline_handles);
1464 : assert!(replaced.is_none());
1465 :
1466 : result
1467 : }
1468 :
1469 : #[allow(clippy::too_many_arguments)]
1470 0 : async fn handle_pagerequests_serial<IO>(
1471 0 : &mut self,
1472 0 : pgb_writer: &mut PostgresBackend<IO>,
1473 0 : mut pgb_reader: PostgresBackendReader<IO>,
1474 0 : tenant_id: TenantId,
1475 0 : timeline_id: TimelineId,
1476 0 : mut timeline_handles: TimelineHandles,
1477 0 : request_span: Span,
1478 0 : protocol_version: PagestreamProtocolVersion,
1479 0 : io_concurrency: IoConcurrency,
1480 0 : ctx: &RequestContext,
1481 0 : ) -> (
1482 0 : (PostgresBackendReader<IO>, TimelineHandles),
1483 0 : Result<(), QueryError>,
1484 0 : )
1485 0 : where
1486 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
1487 0 : {
1488 0 : let cancel = self.cancel.clone();
1489 0 : let err = loop {
1490 0 : let msg = Self::pagestream_read_message(
1491 0 : &mut pgb_reader,
1492 0 : tenant_id,
1493 0 : timeline_id,
1494 0 : &mut timeline_handles,
1495 0 : &cancel,
1496 0 : ctx,
1497 0 : protocol_version,
1498 0 : request_span.clone(),
1499 0 : )
1500 0 : .await;
1501 0 : let msg = match msg {
1502 0 : Ok(msg) => msg,
1503 0 : Err(e) => break e,
1504 : };
1505 0 : let msg = match msg {
1506 0 : Some(msg) => msg,
1507 : None => {
1508 0 : debug!("pagestream subprotocol end observed");
1509 0 : return ((pgb_reader, timeline_handles), Ok(()));
1510 : }
1511 : };
1512 :
1513 0 : let result = self
1514 0 : .pagesteam_handle_batched_message(
1515 0 : pgb_writer,
1516 0 : msg,
1517 0 : io_concurrency.clone(),
1518 0 : &cancel,
1519 0 : protocol_version,
1520 0 : ctx,
1521 0 : )
1522 0 : .await;
1523 0 : match result {
1524 0 : Ok(()) => {}
1525 0 : Err(e) => break e,
1526 : }
1527 : };
1528 0 : ((pgb_reader, timeline_handles), Err(err))
1529 0 : }
1530 :
1531 : /// # Cancel-Safety
1532 : ///
1533 : /// May leak tokio tasks if not polled to completion.
1534 : #[allow(clippy::too_many_arguments)]
1535 0 : async fn handle_pagerequests_pipelined<IO>(
1536 0 : &mut self,
1537 0 : pgb_writer: &mut PostgresBackend<IO>,
1538 0 : pgb_reader: PostgresBackendReader<IO>,
1539 0 : tenant_id: TenantId,
1540 0 : timeline_id: TimelineId,
1541 0 : mut timeline_handles: TimelineHandles,
1542 0 : request_span: Span,
1543 0 : pipelining_config: PageServicePipeliningConfigPipelined,
1544 0 : protocol_version: PagestreamProtocolVersion,
1545 0 : io_concurrency: IoConcurrency,
1546 0 : ctx: &RequestContext,
1547 0 : ) -> (
1548 0 : (PostgresBackendReader<IO>, TimelineHandles),
1549 0 : Result<(), QueryError>,
1550 0 : )
1551 0 : where
1552 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
1553 0 : {
1554 0 : //
1555 0 : // Pipelined pagestream handling consists of
1556 0 : // - a Batcher that reads requests off the wire and
1557 0 : // and batches them if possible,
1558 0 : // - an Executor that processes the batched requests.
1559 0 : //
1560 0 : // The batch is built up inside an `spsc_fold` channel,
1561 0 : // shared betwen Batcher (Sender) and Executor (Receiver).
1562 0 : //
1563 0 : // The Batcher continously folds client requests into the batch,
1564 0 : // while the Executor can at any time take out what's in the batch
1565 0 : // in order to process it.
1566 0 : // This means the next batch builds up while the Executor
1567 0 : // executes the last batch.
1568 0 : //
1569 0 : // CANCELLATION
1570 0 : //
1571 0 : // We run both Batcher and Executor futures to completion before
1572 0 : // returning from this function.
1573 0 : //
1574 0 : // If Executor exits first, it signals cancellation to the Batcher
1575 0 : // via a CancellationToken that is child of `self.cancel`.
1576 0 : // If Batcher exits first, it signals cancellation to the Executor
1577 0 : // by dropping the spsc_fold channel Sender.
1578 0 : //
1579 0 : // CLEAN SHUTDOWN
1580 0 : //
1581 0 : // Clean shutdown means that the client ends the COPYBOTH session.
1582 0 : // In response to such a client message, the Batcher exits.
1583 0 : // The Executor continues to run, draining the spsc_fold channel.
1584 0 : // Once drained, the spsc_fold recv will fail with a distinct error
1585 0 : // indicating that the sender disconnected.
1586 0 : // The Executor exits with Ok(()) in response to that error.
1587 0 : //
1588 0 : // Server initiated shutdown is not clean shutdown, but instead
1589 0 : // is an error Err(QueryError::Shutdown) that is propagated through
1590 0 : // error propagation.
1591 0 : //
1592 0 : // ERROR PROPAGATION
1593 0 : //
1594 0 : // When the Batcher encounter an error, it sends it as a value
1595 0 : // through the spsc_fold channel and exits afterwards.
1596 0 : // When the Executor observes such an error in the channel,
1597 0 : // it exits returning that error value.
1598 0 : //
1599 0 : // This design ensures that the Executor stage will still process
1600 0 : // the batch that was in flight when the Batcher encountered an error,
1601 0 : // thereby beahving identical to a serial implementation.
1602 0 :
1603 0 : let PageServicePipeliningConfigPipelined {
1604 0 : max_batch_size,
1605 0 : execution,
1606 0 : } = pipelining_config;
1607 :
1608 : // Macro to _define_ a pipeline stage.
1609 : macro_rules! pipeline_stage {
1610 : ($name:literal, $cancel:expr, $make_fut:expr) => {{
1611 : let cancel: CancellationToken = $cancel;
1612 : let stage_fut = $make_fut(cancel.clone());
1613 0 : async move {
1614 0 : scopeguard::defer! {
1615 0 : debug!("exiting");
1616 0 : }
1617 0 : timed_after_cancellation(stage_fut, $name, Duration::from_millis(100), &cancel)
1618 0 : .await
1619 0 : }
1620 : .instrument(tracing::info_span!($name))
1621 : }};
1622 : }
1623 :
1624 : //
1625 : // Batcher
1626 : //
1627 :
1628 0 : let cancel_batcher = self.cancel.child_token();
1629 0 : let (mut batch_tx, mut batch_rx) = spsc_fold::channel();
1630 0 : let batcher = pipeline_stage!("batcher", cancel_batcher.clone(), move |cancel_batcher| {
1631 0 : let ctx = ctx.attached_child();
1632 0 : async move {
1633 0 : let mut pgb_reader = pgb_reader;
1634 0 : let mut exit = false;
1635 0 : while !exit {
1636 0 : let read_res = Self::pagestream_read_message(
1637 0 : &mut pgb_reader,
1638 0 : tenant_id,
1639 0 : timeline_id,
1640 0 : &mut timeline_handles,
1641 0 : &cancel_batcher,
1642 0 : &ctx,
1643 0 : protocol_version,
1644 0 : request_span.clone(),
1645 0 : )
1646 0 : .await;
1647 0 : let Some(read_res) = read_res.transpose() else {
1648 0 : debug!("client-initiated shutdown");
1649 0 : break;
1650 : };
1651 0 : exit |= read_res.is_err();
1652 0 : let could_send = batch_tx
1653 0 : .send(read_res, |batch, res| {
1654 0 : Self::pagestream_do_batch(max_batch_size, batch, res)
1655 0 : })
1656 0 : .await;
1657 0 : exit |= could_send.is_err();
1658 : }
1659 0 : (pgb_reader, timeline_handles)
1660 0 : }
1661 0 : });
1662 :
1663 : //
1664 : // Executor
1665 : //
1666 :
1667 0 : let executor = pipeline_stage!("executor", self.cancel.clone(), move |cancel| {
1668 0 : let ctx = ctx.attached_child();
1669 0 : async move {
1670 0 : let _cancel_batcher = cancel_batcher.drop_guard();
1671 : loop {
1672 0 : let maybe_batch = batch_rx.recv().await;
1673 0 : let batch = match maybe_batch {
1674 0 : Ok(batch) => batch,
1675 : Err(spsc_fold::RecvError::SenderGone) => {
1676 0 : debug!("upstream gone");
1677 0 : return Ok(());
1678 : }
1679 : };
1680 0 : let batch = match batch {
1681 0 : Ok(batch) => batch,
1682 0 : Err(e) => {
1683 0 : return Err(e);
1684 : }
1685 : };
1686 0 : self.pagesteam_handle_batched_message(
1687 0 : pgb_writer,
1688 0 : batch,
1689 0 : io_concurrency.clone(),
1690 0 : &cancel,
1691 0 : protocol_version,
1692 0 : &ctx,
1693 0 : )
1694 0 : .await?;
1695 : }
1696 0 : }
1697 0 : });
1698 :
1699 : //
1700 : // Execute the stages.
1701 : //
1702 :
1703 0 : match execution {
1704 : PageServiceProtocolPipelinedExecutionStrategy::ConcurrentFutures => {
1705 0 : tokio::join!(batcher, executor)
1706 : }
1707 : PageServiceProtocolPipelinedExecutionStrategy::Tasks => {
1708 : // These tasks are not tracked anywhere.
1709 0 : let read_messages_task = tokio::spawn(batcher);
1710 0 : let (read_messages_task_res, executor_res_) =
1711 0 : tokio::join!(read_messages_task, executor,);
1712 0 : (
1713 0 : read_messages_task_res.expect("propagated panic from read_messages"),
1714 0 : executor_res_,
1715 0 : )
1716 : }
1717 : }
1718 0 : }
1719 :
1720 : /// Helper function to handle the LSN from client request.
1721 : ///
1722 : /// Each GetPage (and Exists and Nblocks) request includes information about
1723 : /// which version of the page is being requested. The primary compute node
1724 : /// will always request the latest page version, by setting 'request_lsn' to
1725 : /// the last inserted or flushed WAL position, while a standby will request
1726 : /// a version at the LSN that it's currently caught up to.
1727 : ///
1728 : /// In either case, if the page server hasn't received the WAL up to the
1729 : /// requested LSN yet, we will wait for it to arrive. The return value is
1730 : /// the LSN that should be used to look up the page versions.
1731 : ///
1732 : /// In addition to the request LSN, each request carries another LSN,
1733 : /// 'not_modified_since', which is a hint to the pageserver that the client
1734 : /// knows that the page has not been modified between 'not_modified_since'
1735 : /// and the request LSN. This allows skipping the wait, as long as the WAL
1736 : /// up to 'not_modified_since' has arrived. If the client doesn't have any
1737 : /// information about when the page was modified, it will use
1738 : /// not_modified_since == lsn. If the client lies and sends a too low
1739 : /// not_modified_hint such that there are in fact later page versions, the
1740 : /// behavior is undefined: the pageserver may return any of the page versions
1741 : /// or an error.
1742 0 : async fn wait_or_get_last_lsn(
1743 0 : timeline: &Timeline,
1744 0 : request_lsn: Lsn,
1745 0 : not_modified_since: Lsn,
1746 0 : latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
1747 0 : ctx: &RequestContext,
1748 0 : ) -> Result<Lsn, PageStreamError> {
1749 0 : let last_record_lsn = timeline.get_last_record_lsn();
1750 0 :
1751 0 : // Sanity check the request
1752 0 : if request_lsn < not_modified_since {
1753 0 : return Err(PageStreamError::BadRequest(
1754 0 : format!(
1755 0 : "invalid request with request LSN {} and not_modified_since {}",
1756 0 : request_lsn, not_modified_since,
1757 0 : )
1758 0 : .into(),
1759 0 : ));
1760 0 : }
1761 0 :
1762 0 : // Check explicitly for INVALID just to get a less scary error message if the request is obviously bogus
1763 0 : if request_lsn == Lsn::INVALID {
1764 0 : return Err(PageStreamError::BadRequest(
1765 0 : "invalid LSN(0) in request".into(),
1766 0 : ));
1767 0 : }
1768 0 :
1769 0 : // Clients should only read from recent LSNs on their timeline, or from locations holding an LSN lease.
1770 0 : //
1771 0 : // We may have older data available, but we make a best effort to detect this case and return an error,
1772 0 : // to distinguish a misbehaving client (asking for old LSN) from a storage issue (data missing at a legitimate LSN).
1773 0 : if request_lsn < **latest_gc_cutoff_lsn && !timeline.is_gc_blocked_by_lsn_lease_deadline() {
1774 0 : let gc_info = &timeline.gc_info.read().unwrap();
1775 0 : if !gc_info.lsn_covered_by_lease(request_lsn) {
1776 0 : return Err(
1777 0 : PageStreamError::BadRequest(format!(
1778 0 : "tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
1779 0 : request_lsn, **latest_gc_cutoff_lsn
1780 0 : ).into())
1781 0 : );
1782 0 : }
1783 0 : }
1784 :
1785 : // Wait for WAL up to 'not_modified_since' to arrive, if necessary
1786 0 : if not_modified_since > last_record_lsn {
1787 0 : timeline
1788 0 : .wait_lsn(
1789 0 : not_modified_since,
1790 0 : crate::tenant::timeline::WaitLsnWaiter::PageService,
1791 0 : timeline::WaitLsnTimeout::Default,
1792 0 : ctx,
1793 0 : )
1794 0 : .await?;
1795 : // Since we waited for 'not_modified_since' to arrive, that is now the last
1796 : // record LSN. (Or close enough for our purposes; the last-record LSN can
1797 : // advance immediately after we return anyway)
1798 0 : Ok(not_modified_since)
1799 : } else {
1800 : // It might be better to use max(not_modified_since, latest_gc_cutoff_lsn)
1801 : // here instead. That would give the same result, since we know that there
1802 : // haven't been any modifications since 'not_modified_since'. Using an older
1803 : // LSN might be faster, because that could allow skipping recent layers when
1804 : // finding the page. However, we have historically used 'last_record_lsn', so
1805 : // stick to that for now.
1806 0 : Ok(std::cmp::min(last_record_lsn, request_lsn))
1807 : }
1808 0 : }
1809 :
1810 : /// Handles the lsn lease request.
1811 : /// If a lease cannot be obtained, the client will receive NULL.
1812 : #[instrument(skip_all, fields(shard_id, %lsn))]
1813 : async fn handle_make_lsn_lease<IO>(
1814 : &mut self,
1815 : pgb: &mut PostgresBackend<IO>,
1816 : tenant_shard_id: TenantShardId,
1817 : timeline_id: TimelineId,
1818 : lsn: Lsn,
1819 : ctx: &RequestContext,
1820 : ) -> Result<(), QueryError>
1821 : where
1822 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1823 : {
1824 : let timeline = self
1825 : .timeline_handles
1826 : .as_mut()
1827 : .unwrap()
1828 : .get(
1829 : tenant_shard_id.tenant_id,
1830 : timeline_id,
1831 : ShardSelector::Known(tenant_shard_id.to_index()),
1832 : )
1833 : .await?;
1834 : set_tracing_field_shard_id(&timeline);
1835 :
1836 : let lease = timeline
1837 : .renew_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx)
1838 0 : .inspect_err(|e| {
1839 0 : warn!("{e}");
1840 0 : })
1841 : .ok();
1842 0 : let valid_until_str = lease.map(|l| {
1843 0 : l.valid_until
1844 0 : .duration_since(SystemTime::UNIX_EPOCH)
1845 0 : .expect("valid_until is earlier than UNIX_EPOCH")
1846 0 : .as_millis()
1847 0 : .to_string()
1848 0 : });
1849 :
1850 : info!(
1851 : "acquired lease for {} until {}",
1852 : lsn,
1853 : valid_until_str.as_deref().unwrap_or("<unknown>")
1854 : );
1855 :
1856 0 : let bytes = valid_until_str.as_ref().map(|x| x.as_bytes());
1857 :
1858 : pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col(
1859 : b"valid_until",
1860 : )]))?
1861 : .write_message_noflush(&BeMessage::DataRow(&[bytes]))?;
1862 :
1863 : Ok(())
1864 : }
1865 :
1866 : #[instrument(skip_all, fields(shard_id))]
1867 : async fn handle_get_rel_exists_request(
1868 : &mut self,
1869 : timeline: &Timeline,
1870 : req: &PagestreamExistsRequest,
1871 : ctx: &RequestContext,
1872 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1873 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
1874 : let lsn = Self::wait_or_get_last_lsn(
1875 : timeline,
1876 : req.hdr.request_lsn,
1877 : req.hdr.not_modified_since,
1878 : &latest_gc_cutoff_lsn,
1879 : ctx,
1880 : )
1881 : .await?;
1882 :
1883 : let exists = timeline
1884 : .get_rel_exists(req.rel, Version::Lsn(lsn), ctx)
1885 : .await?;
1886 :
1887 : Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
1888 : req: *req,
1889 : exists,
1890 : }))
1891 : }
1892 :
1893 : #[instrument(skip_all, fields(shard_id))]
1894 : async fn handle_get_nblocks_request(
1895 : &mut self,
1896 : timeline: &Timeline,
1897 : req: &PagestreamNblocksRequest,
1898 : ctx: &RequestContext,
1899 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1900 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
1901 : let lsn = Self::wait_or_get_last_lsn(
1902 : timeline,
1903 : req.hdr.request_lsn,
1904 : req.hdr.not_modified_since,
1905 : &latest_gc_cutoff_lsn,
1906 : ctx,
1907 : )
1908 : .await?;
1909 :
1910 : let n_blocks = timeline
1911 : .get_rel_size(req.rel, Version::Lsn(lsn), ctx)
1912 : .await?;
1913 :
1914 : Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
1915 : req: *req,
1916 : n_blocks,
1917 : }))
1918 : }
1919 :
1920 : #[instrument(skip_all, fields(shard_id))]
1921 : async fn handle_db_size_request(
1922 : &mut self,
1923 : timeline: &Timeline,
1924 : req: &PagestreamDbSizeRequest,
1925 : ctx: &RequestContext,
1926 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1927 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
1928 : let lsn = Self::wait_or_get_last_lsn(
1929 : timeline,
1930 : req.hdr.request_lsn,
1931 : req.hdr.not_modified_since,
1932 : &latest_gc_cutoff_lsn,
1933 : ctx,
1934 : )
1935 : .await?;
1936 :
1937 : let total_blocks = timeline
1938 : .get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, Version::Lsn(lsn), ctx)
1939 : .await?;
1940 : let db_size = total_blocks as i64 * BLCKSZ as i64;
1941 :
1942 : Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse {
1943 : req: *req,
1944 : db_size,
1945 : }))
1946 : }
1947 :
1948 : #[instrument(skip_all)]
1949 : async fn handle_get_page_at_lsn_request_batched(
1950 : &mut self,
1951 : timeline: &Timeline,
1952 : effective_lsn: Lsn,
1953 : requests: smallvec::SmallVec<[BatchedGetPageRequest; 1]>,
1954 : io_concurrency: IoConcurrency,
1955 : ctx: &RequestContext,
1956 : ) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>> {
1957 : debug_assert_current_span_has_tenant_and_timeline_id();
1958 :
1959 : timeline
1960 : .query_metrics
1961 : .observe_getpage_batch_start(requests.len());
1962 :
1963 : // If a page trace is running, submit an event for this request.
1964 : if let Some(page_trace) = timeline.page_trace.load().as_ref() {
1965 : let time = SystemTime::now();
1966 : for batch in &requests {
1967 : let key = rel_block_to_key(batch.req.rel, batch.req.blkno).to_compact();
1968 : // Ignore error (trace buffer may be full or tracer may have disconnected).
1969 : _ = page_trace.try_send(PageTraceEvent {
1970 : key,
1971 : effective_lsn,
1972 : time,
1973 : });
1974 : }
1975 : }
1976 :
1977 : let results = timeline
1978 : .get_rel_page_at_lsn_batched(
1979 0 : requests.iter().map(|p| (&p.req.rel, &p.req.blkno)),
1980 : effective_lsn,
1981 : io_concurrency,
1982 : ctx,
1983 : )
1984 : .await;
1985 : assert_eq!(results.len(), requests.len());
1986 :
1987 : // TODO: avoid creating the new Vec here
1988 : Vec::from_iter(
1989 : requests
1990 : .into_iter()
1991 : .zip(results.into_iter())
1992 0 : .map(|(req, res)| {
1993 0 : res.map(|page| {
1994 0 : (
1995 0 : PagestreamBeMessage::GetPage(models::PagestreamGetPageResponse {
1996 0 : req: req.req,
1997 0 : page,
1998 0 : }),
1999 0 : req.timer,
2000 0 : )
2001 0 : })
2002 0 : .map_err(|e| BatchedPageStreamError {
2003 0 : err: PageStreamError::from(e),
2004 0 : req: req.req.hdr,
2005 0 : })
2006 0 : }),
2007 : )
2008 : }
2009 :
2010 : #[instrument(skip_all, fields(shard_id))]
2011 : async fn handle_get_slru_segment_request(
2012 : &mut self,
2013 : timeline: &Timeline,
2014 : req: &PagestreamGetSlruSegmentRequest,
2015 : ctx: &RequestContext,
2016 : ) -> Result<PagestreamBeMessage, PageStreamError> {
2017 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
2018 : let lsn = Self::wait_or_get_last_lsn(
2019 : timeline,
2020 : req.hdr.request_lsn,
2021 : req.hdr.not_modified_since,
2022 : &latest_gc_cutoff_lsn,
2023 : ctx,
2024 : )
2025 : .await?;
2026 :
2027 : let kind = SlruKind::from_repr(req.kind)
2028 : .ok_or(PageStreamError::BadRequest("invalid SLRU kind".into()))?;
2029 : let segment = timeline.get_slru_segment(kind, req.segno, lsn, ctx).await?;
2030 :
2031 : Ok(PagestreamBeMessage::GetSlruSegment(
2032 : PagestreamGetSlruSegmentResponse { req: *req, segment },
2033 : ))
2034 : }
2035 :
2036 : // NB: this impl mimics what we do for batched getpage requests.
2037 : #[cfg(feature = "testing")]
2038 : #[instrument(skip_all, fields(shard_id))]
2039 : async fn handle_test_request_batch(
2040 : &mut self,
2041 : timeline: &Timeline,
2042 : requests: Vec<BatchedTestRequest>,
2043 : _ctx: &RequestContext,
2044 : ) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>> {
2045 : // real requests would do something with the timeline
2046 : let mut results = Vec::with_capacity(requests.len());
2047 : for _req in requests.iter() {
2048 : tokio::task::yield_now().await;
2049 :
2050 : results.push({
2051 : if timeline.cancel.is_cancelled() {
2052 : Err(PageReconstructError::Cancelled)
2053 : } else {
2054 : Ok(())
2055 : }
2056 : });
2057 : }
2058 :
2059 : // TODO: avoid creating the new Vec here
2060 : Vec::from_iter(
2061 : requests
2062 : .into_iter()
2063 : .zip(results.into_iter())
2064 0 : .map(|(req, res)| {
2065 0 : res.map(|()| {
2066 0 : (
2067 0 : PagestreamBeMessage::Test(models::PagestreamTestResponse {
2068 0 : req: req.req.clone(),
2069 0 : }),
2070 0 : req.timer,
2071 0 : )
2072 0 : })
2073 0 : .map_err(|e| BatchedPageStreamError {
2074 0 : err: PageStreamError::from(e),
2075 0 : req: req.req.hdr,
2076 0 : })
2077 0 : }),
2078 : )
2079 : }
2080 :
2081 : /// Note on "fullbackup":
2082 : /// Full basebackups should only be used for debugging purposes.
2083 : /// Originally, it was introduced to enable breaking storage format changes,
2084 : /// but that is not applicable anymore.
2085 : ///
2086 : /// # Coding Discipline
2087 : ///
2088 : /// Coding discipline within this function: all interaction with the `pgb` connection
2089 : /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
2090 : /// This is so that we can shutdown page_service quickly.
2091 : ///
2092 : /// TODO: wrap the pgb that we pass to the basebackup handler so that it's sensitive
2093 : /// to connection cancellation.
2094 : #[allow(clippy::too_many_arguments)]
2095 : #[instrument(skip_all, fields(shard_id, ?lsn, ?prev_lsn, %full_backup))]
2096 : async fn handle_basebackup_request<IO>(
2097 : &mut self,
2098 : pgb: &mut PostgresBackend<IO>,
2099 : tenant_id: TenantId,
2100 : timeline_id: TimelineId,
2101 : lsn: Option<Lsn>,
2102 : prev_lsn: Option<Lsn>,
2103 : full_backup: bool,
2104 : gzip: bool,
2105 : replica: bool,
2106 : ctx: &RequestContext,
2107 : ) -> Result<(), QueryError>
2108 : where
2109 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
2110 : {
2111 0 : fn map_basebackup_error(err: BasebackupError) -> QueryError {
2112 0 : match err {
2113 : // TODO: passthrough the error site to the final error message?
2114 0 : BasebackupError::Client(e, _) => QueryError::Disconnected(ConnectionError::Io(e)),
2115 0 : BasebackupError::Server(e) => QueryError::Other(e),
2116 0 : BasebackupError::Shutdown => QueryError::Shutdown,
2117 : }
2118 0 : }
2119 :
2120 : let started = std::time::Instant::now();
2121 :
2122 : let timeline = self
2123 : .timeline_handles
2124 : .as_mut()
2125 : .unwrap()
2126 : .get(tenant_id, timeline_id, ShardSelector::Zero)
2127 : .await?;
2128 : set_tracing_field_shard_id(&timeline);
2129 :
2130 : if timeline.is_archived() == Some(true) {
2131 : tracing::info!(
2132 : "timeline {tenant_id}/{timeline_id} is archived, but got basebackup request for it."
2133 : );
2134 : return Err(QueryError::NotFound("timeline is archived".into()));
2135 : }
2136 :
2137 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
2138 : if let Some(lsn) = lsn {
2139 : // Backup was requested at a particular LSN. Wait for it to arrive.
2140 : info!("waiting for {}", lsn);
2141 : timeline
2142 : .wait_lsn(
2143 : lsn,
2144 : crate::tenant::timeline::WaitLsnWaiter::PageService,
2145 : crate::tenant::timeline::WaitLsnTimeout::Default,
2146 : ctx,
2147 : )
2148 : .await?;
2149 : timeline
2150 : .check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
2151 : .context("invalid basebackup lsn")?;
2152 : }
2153 :
2154 : let lsn_awaited_after = started.elapsed();
2155 :
2156 : // switch client to COPYOUT
2157 : pgb.write_message_noflush(&BeMessage::CopyOutResponse)
2158 : .map_err(QueryError::Disconnected)?;
2159 : self.flush_cancellable(pgb, &self.cancel).await?;
2160 :
2161 : // Send a tarball of the latest layer on the timeline. Compress if not
2162 : // fullbackup. TODO Compress in that case too (tests need to be updated)
2163 : if full_backup {
2164 : let mut writer = pgb.copyout_writer();
2165 : basebackup::send_basebackup_tarball(
2166 : &mut writer,
2167 : &timeline,
2168 : lsn,
2169 : prev_lsn,
2170 : full_backup,
2171 : replica,
2172 : ctx,
2173 : )
2174 : .await
2175 : .map_err(map_basebackup_error)?;
2176 : } else {
2177 : let mut writer = BufWriter::new(pgb.copyout_writer());
2178 : if gzip {
2179 : let mut encoder = GzipEncoder::with_quality(
2180 : &mut writer,
2181 : // NOTE using fast compression because it's on the critical path
2182 : // for compute startup. For an empty database, we get
2183 : // <100KB with this method. The Level::Best compression method
2184 : // gives us <20KB, but maybe we should add basebackup caching
2185 : // on compute shutdown first.
2186 : async_compression::Level::Fastest,
2187 : );
2188 : basebackup::send_basebackup_tarball(
2189 : &mut encoder,
2190 : &timeline,
2191 : lsn,
2192 : prev_lsn,
2193 : full_backup,
2194 : replica,
2195 : ctx,
2196 : )
2197 : .await
2198 : .map_err(map_basebackup_error)?;
2199 : // shutdown the encoder to ensure the gzip footer is written
2200 : encoder
2201 : .shutdown()
2202 : .await
2203 0 : .map_err(|e| QueryError::Disconnected(ConnectionError::Io(e)))?;
2204 : } else {
2205 : basebackup::send_basebackup_tarball(
2206 : &mut writer,
2207 : &timeline,
2208 : lsn,
2209 : prev_lsn,
2210 : full_backup,
2211 : replica,
2212 : ctx,
2213 : )
2214 : .await
2215 : .map_err(map_basebackup_error)?;
2216 : }
2217 0 : writer.flush().await.map_err(|e| {
2218 0 : map_basebackup_error(BasebackupError::Client(
2219 0 : e,
2220 0 : "handle_basebackup_request,flush",
2221 0 : ))
2222 0 : })?;
2223 : }
2224 :
2225 : pgb.write_message_noflush(&BeMessage::CopyDone)
2226 : .map_err(QueryError::Disconnected)?;
2227 : self.flush_cancellable(pgb, &timeline.cancel).await?;
2228 :
2229 : let basebackup_after = started
2230 : .elapsed()
2231 : .checked_sub(lsn_awaited_after)
2232 : .unwrap_or(Duration::ZERO);
2233 :
2234 : info!(
2235 : lsn_await_millis = lsn_awaited_after.as_millis(),
2236 : basebackup_millis = basebackup_after.as_millis(),
2237 : "basebackup complete"
2238 : );
2239 :
2240 : Ok(())
2241 : }
2242 :
2243 : // when accessing management api supply None as an argument
2244 : // when using to authorize tenant pass corresponding tenant id
2245 0 : fn check_permission(&self, tenant_id: Option<TenantId>) -> Result<(), QueryError> {
2246 0 : if self.auth.is_none() {
2247 : // auth is set to Trust, nothing to check so just return ok
2248 0 : return Ok(());
2249 0 : }
2250 0 : // auth is some, just checked above, when auth is some
2251 0 : // then claims are always present because of checks during connection init
2252 0 : // so this expect won't trigger
2253 0 : let claims = self
2254 0 : .claims
2255 0 : .as_ref()
2256 0 : .expect("claims presence already checked");
2257 0 : check_permission(claims, tenant_id).map_err(|e| QueryError::Unauthorized(e.0))
2258 0 : }
2259 : }
2260 :
2261 : /// `basebackup tenant timeline [lsn] [--gzip] [--replica]`
2262 : #[derive(Debug, Clone, Eq, PartialEq)]
2263 : struct BaseBackupCmd {
2264 : tenant_id: TenantId,
2265 : timeline_id: TimelineId,
2266 : lsn: Option<Lsn>,
2267 : gzip: bool,
2268 : replica: bool,
2269 : }
2270 :
2271 : /// `fullbackup tenant timeline [lsn] [prev_lsn]`
2272 : #[derive(Debug, Clone, Eq, PartialEq)]
2273 : struct FullBackupCmd {
2274 : tenant_id: TenantId,
2275 : timeline_id: TimelineId,
2276 : lsn: Option<Lsn>,
2277 : prev_lsn: Option<Lsn>,
2278 : }
2279 :
2280 : /// `pagestream_v2 tenant timeline`
2281 : #[derive(Debug, Clone, Eq, PartialEq)]
2282 : struct PageStreamCmd {
2283 : tenant_id: TenantId,
2284 : timeline_id: TimelineId,
2285 : protocol_version: PagestreamProtocolVersion,
2286 : }
2287 :
2288 : /// `lease lsn tenant timeline lsn`
2289 : #[derive(Debug, Clone, Eq, PartialEq)]
2290 : struct LeaseLsnCmd {
2291 : tenant_shard_id: TenantShardId,
2292 : timeline_id: TimelineId,
2293 : lsn: Lsn,
2294 : }
2295 :
2296 : #[derive(Debug, Clone, Eq, PartialEq)]
2297 : enum PageServiceCmd {
2298 : Set,
2299 : PageStream(PageStreamCmd),
2300 : BaseBackup(BaseBackupCmd),
2301 : FullBackup(FullBackupCmd),
2302 : LeaseLsn(LeaseLsnCmd),
2303 : }
2304 :
2305 : impl PageStreamCmd {
2306 12 : fn parse(query: &str, protocol_version: PagestreamProtocolVersion) -> anyhow::Result<Self> {
2307 12 : let parameters = query.split_whitespace().collect_vec();
2308 12 : if parameters.len() != 2 {
2309 4 : bail!(
2310 4 : "invalid number of parameters for pagestream command: {}",
2311 4 : query
2312 4 : );
2313 8 : }
2314 8 : let tenant_id = TenantId::from_str(parameters[0])
2315 8 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2316 4 : let timeline_id = TimelineId::from_str(parameters[1])
2317 4 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2318 4 : Ok(Self {
2319 4 : tenant_id,
2320 4 : timeline_id,
2321 4 : protocol_version,
2322 4 : })
2323 12 : }
2324 : }
2325 :
2326 : impl FullBackupCmd {
2327 8 : fn parse(query: &str) -> anyhow::Result<Self> {
2328 8 : let parameters = query.split_whitespace().collect_vec();
2329 8 : if parameters.len() < 2 || parameters.len() > 4 {
2330 0 : bail!(
2331 0 : "invalid number of parameters for basebackup command: {}",
2332 0 : query
2333 0 : );
2334 8 : }
2335 8 : let tenant_id = TenantId::from_str(parameters[0])
2336 8 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2337 8 : let timeline_id = TimelineId::from_str(parameters[1])
2338 8 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2339 : // The caller is responsible for providing correct lsn and prev_lsn.
2340 8 : let lsn = if let Some(lsn_str) = parameters.get(2) {
2341 : Some(
2342 4 : Lsn::from_str(lsn_str)
2343 4 : .with_context(|| format!("Failed to parse Lsn from {lsn_str}"))?,
2344 : )
2345 : } else {
2346 4 : None
2347 : };
2348 8 : let prev_lsn = if let Some(prev_lsn_str) = parameters.get(3) {
2349 : Some(
2350 4 : Lsn::from_str(prev_lsn_str)
2351 4 : .with_context(|| format!("Failed to parse Lsn from {prev_lsn_str}"))?,
2352 : )
2353 : } else {
2354 4 : None
2355 : };
2356 8 : Ok(Self {
2357 8 : tenant_id,
2358 8 : timeline_id,
2359 8 : lsn,
2360 8 : prev_lsn,
2361 8 : })
2362 8 : }
2363 : }
2364 :
2365 : impl BaseBackupCmd {
2366 36 : fn parse(query: &str) -> anyhow::Result<Self> {
2367 36 : let parameters = query.split_whitespace().collect_vec();
2368 36 : if parameters.len() < 2 {
2369 0 : bail!(
2370 0 : "invalid number of parameters for basebackup command: {}",
2371 0 : query
2372 0 : );
2373 36 : }
2374 36 : let tenant_id = TenantId::from_str(parameters[0])
2375 36 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2376 36 : let timeline_id = TimelineId::from_str(parameters[1])
2377 36 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2378 : let lsn;
2379 : let flags_parse_from;
2380 36 : if let Some(maybe_lsn) = parameters.get(2) {
2381 32 : if *maybe_lsn == "latest" {
2382 4 : lsn = None;
2383 4 : flags_parse_from = 3;
2384 28 : } else if maybe_lsn.starts_with("--") {
2385 20 : lsn = None;
2386 20 : flags_parse_from = 2;
2387 20 : } else {
2388 : lsn = Some(
2389 8 : Lsn::from_str(maybe_lsn)
2390 8 : .with_context(|| format!("Failed to parse lsn from {maybe_lsn}"))?,
2391 : );
2392 8 : flags_parse_from = 3;
2393 : }
2394 4 : } else {
2395 4 : lsn = None;
2396 4 : flags_parse_from = 2;
2397 4 : }
2398 :
2399 36 : let mut gzip = false;
2400 36 : let mut replica = false;
2401 :
2402 44 : for ¶m in ¶meters[flags_parse_from..] {
2403 44 : match param {
2404 44 : "--gzip" => {
2405 28 : if gzip {
2406 4 : bail!("duplicate parameter for basebackup command: {param}")
2407 24 : }
2408 24 : gzip = true
2409 : }
2410 16 : "--replica" => {
2411 8 : if replica {
2412 0 : bail!("duplicate parameter for basebackup command: {param}")
2413 8 : }
2414 8 : replica = true
2415 : }
2416 8 : _ => bail!("invalid parameter for basebackup command: {param}"),
2417 : }
2418 : }
2419 24 : Ok(Self {
2420 24 : tenant_id,
2421 24 : timeline_id,
2422 24 : lsn,
2423 24 : gzip,
2424 24 : replica,
2425 24 : })
2426 36 : }
2427 : }
2428 :
2429 : impl LeaseLsnCmd {
2430 8 : fn parse(query: &str) -> anyhow::Result<Self> {
2431 8 : let parameters = query.split_whitespace().collect_vec();
2432 8 : if parameters.len() != 3 {
2433 0 : bail!(
2434 0 : "invalid number of parameters for lease lsn command: {}",
2435 0 : query
2436 0 : );
2437 8 : }
2438 8 : let tenant_shard_id = TenantShardId::from_str(parameters[0])
2439 8 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2440 8 : let timeline_id = TimelineId::from_str(parameters[1])
2441 8 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2442 8 : let lsn = Lsn::from_str(parameters[2])
2443 8 : .with_context(|| format!("Failed to parse lsn from {}", parameters[2]))?;
2444 8 : Ok(Self {
2445 8 : tenant_shard_id,
2446 8 : timeline_id,
2447 8 : lsn,
2448 8 : })
2449 8 : }
2450 : }
2451 :
2452 : impl PageServiceCmd {
2453 84 : fn parse(query: &str) -> anyhow::Result<Self> {
2454 84 : let query = query.trim();
2455 84 : let Some((cmd, other)) = query.split_once(' ') else {
2456 8 : bail!("cannot parse query: {query}")
2457 : };
2458 76 : match cmd.to_ascii_lowercase().as_str() {
2459 76 : "pagestream_v2" => Ok(Self::PageStream(PageStreamCmd::parse(
2460 12 : other,
2461 12 : PagestreamProtocolVersion::V2,
2462 12 : )?)),
2463 64 : "pagestream_v3" => Ok(Self::PageStream(PageStreamCmd::parse(
2464 0 : other,
2465 0 : PagestreamProtocolVersion::V3,
2466 0 : )?)),
2467 64 : "basebackup" => Ok(Self::BaseBackup(BaseBackupCmd::parse(other)?)),
2468 28 : "fullbackup" => Ok(Self::FullBackup(FullBackupCmd::parse(other)?)),
2469 20 : "lease" => {
2470 12 : let Some((cmd2, other)) = other.split_once(' ') else {
2471 0 : bail!("invalid lease command: {cmd}");
2472 : };
2473 12 : let cmd2 = cmd2.to_ascii_lowercase();
2474 12 : if cmd2 == "lsn" {
2475 8 : Ok(Self::LeaseLsn(LeaseLsnCmd::parse(other)?))
2476 : } else {
2477 4 : bail!("invalid lease command: {cmd}");
2478 : }
2479 : }
2480 8 : "set" => Ok(Self::Set),
2481 0 : _ => Err(anyhow::anyhow!("unsupported command {cmd} in {query}")),
2482 : }
2483 84 : }
2484 : }
2485 :
2486 : impl<IO> postgres_backend::Handler<IO> for PageServerHandler
2487 : where
2488 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
2489 : {
2490 0 : fn check_auth_jwt(
2491 0 : &mut self,
2492 0 : _pgb: &mut PostgresBackend<IO>,
2493 0 : jwt_response: &[u8],
2494 0 : ) -> Result<(), QueryError> {
2495 : // this unwrap is never triggered, because check_auth_jwt only called when auth_type is NeonJWT
2496 : // which requires auth to be present
2497 0 : let data = self
2498 0 : .auth
2499 0 : .as_ref()
2500 0 : .unwrap()
2501 0 : .decode(str::from_utf8(jwt_response).context("jwt response is not UTF-8")?)
2502 0 : .map_err(|e| QueryError::Unauthorized(e.0))?;
2503 :
2504 0 : if matches!(data.claims.scope, Scope::Tenant) && data.claims.tenant_id.is_none() {
2505 0 : return Err(QueryError::Unauthorized(
2506 0 : "jwt token scope is Tenant, but tenant id is missing".into(),
2507 0 : ));
2508 0 : }
2509 0 :
2510 0 : debug!(
2511 0 : "jwt scope check succeeded for scope: {:#?} by tenant id: {:?}",
2512 : data.claims.scope, data.claims.tenant_id,
2513 : );
2514 :
2515 0 : self.claims = Some(data.claims);
2516 0 : Ok(())
2517 0 : }
2518 :
2519 0 : fn startup(
2520 0 : &mut self,
2521 0 : _pgb: &mut PostgresBackend<IO>,
2522 0 : sm: &FeStartupPacket,
2523 0 : ) -> Result<(), QueryError> {
2524 0 : fail::fail_point!("ps::connection-start::startup-packet");
2525 :
2526 0 : if let FeStartupPacket::StartupMessage { params, .. } = sm {
2527 0 : if let Some(app_name) = params.get("application_name") {
2528 0 : Span::current().record("application_name", field::display(app_name));
2529 0 : }
2530 0 : };
2531 :
2532 0 : Ok(())
2533 0 : }
2534 :
2535 : #[instrument(skip_all, fields(tenant_id, timeline_id))]
2536 : async fn process_query(
2537 : &mut self,
2538 : pgb: &mut PostgresBackend<IO>,
2539 : query_string: &str,
2540 : ) -> Result<(), QueryError> {
2541 0 : fail::fail_point!("simulated-bad-compute-connection", |_| {
2542 0 : info!("Hit failpoint for bad connection");
2543 0 : Err(QueryError::SimulatedConnectionError)
2544 0 : });
2545 :
2546 : fail::fail_point!("ps::connection-start::process-query");
2547 :
2548 : let ctx = self.connection_ctx.attached_child();
2549 : debug!("process query {query_string}");
2550 : let query = PageServiceCmd::parse(query_string)?;
2551 : match query {
2552 : PageServiceCmd::PageStream(PageStreamCmd {
2553 : tenant_id,
2554 : timeline_id,
2555 : protocol_version,
2556 : }) => {
2557 : tracing::Span::current()
2558 : .record("tenant_id", field::display(tenant_id))
2559 : .record("timeline_id", field::display(timeline_id));
2560 :
2561 : self.check_permission(Some(tenant_id))?;
2562 : let command_kind = match protocol_version {
2563 : PagestreamProtocolVersion::V2 => ComputeCommandKind::PageStreamV2,
2564 : PagestreamProtocolVersion::V3 => ComputeCommandKind::PageStreamV3,
2565 : };
2566 : COMPUTE_COMMANDS_COUNTERS.for_command(command_kind).inc();
2567 :
2568 : self.handle_pagerequests(pgb, tenant_id, timeline_id, protocol_version, ctx)
2569 : .await?;
2570 : }
2571 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2572 : tenant_id,
2573 : timeline_id,
2574 : lsn,
2575 : gzip,
2576 : replica,
2577 : }) => {
2578 : tracing::Span::current()
2579 : .record("tenant_id", field::display(tenant_id))
2580 : .record("timeline_id", field::display(timeline_id));
2581 :
2582 : self.check_permission(Some(tenant_id))?;
2583 :
2584 : COMPUTE_COMMANDS_COUNTERS
2585 : .for_command(ComputeCommandKind::Basebackup)
2586 : .inc();
2587 : let metric_recording = metrics::BASEBACKUP_QUERY_TIME.start_recording();
2588 0 : let res = async {
2589 0 : self.handle_basebackup_request(
2590 0 : pgb,
2591 0 : tenant_id,
2592 0 : timeline_id,
2593 0 : lsn,
2594 0 : None,
2595 0 : false,
2596 0 : gzip,
2597 0 : replica,
2598 0 : &ctx,
2599 0 : )
2600 0 : .await?;
2601 0 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
2602 0 : Result::<(), QueryError>::Ok(())
2603 0 : }
2604 : .await;
2605 : metric_recording.observe(&res);
2606 : res?;
2607 : }
2608 : // same as basebackup, but result includes relational data as well
2609 : PageServiceCmd::FullBackup(FullBackupCmd {
2610 : tenant_id,
2611 : timeline_id,
2612 : lsn,
2613 : prev_lsn,
2614 : }) => {
2615 : tracing::Span::current()
2616 : .record("tenant_id", field::display(tenant_id))
2617 : .record("timeline_id", field::display(timeline_id));
2618 :
2619 : self.check_permission(Some(tenant_id))?;
2620 :
2621 : COMPUTE_COMMANDS_COUNTERS
2622 : .for_command(ComputeCommandKind::Fullbackup)
2623 : .inc();
2624 :
2625 : // Check that the timeline exists
2626 : self.handle_basebackup_request(
2627 : pgb,
2628 : tenant_id,
2629 : timeline_id,
2630 : lsn,
2631 : prev_lsn,
2632 : true,
2633 : false,
2634 : false,
2635 : &ctx,
2636 : )
2637 : .await?;
2638 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
2639 : }
2640 : PageServiceCmd::Set => {
2641 : // important because psycopg2 executes "SET datestyle TO 'ISO'"
2642 : // on connect
2643 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
2644 : }
2645 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
2646 : tenant_shard_id,
2647 : timeline_id,
2648 : lsn,
2649 : }) => {
2650 : tracing::Span::current()
2651 : .record("tenant_id", field::display(tenant_shard_id))
2652 : .record("timeline_id", field::display(timeline_id));
2653 :
2654 : self.check_permission(Some(tenant_shard_id.tenant_id))?;
2655 :
2656 : COMPUTE_COMMANDS_COUNTERS
2657 : .for_command(ComputeCommandKind::LeaseLsn)
2658 : .inc();
2659 :
2660 : match self
2661 : .handle_make_lsn_lease(pgb, tenant_shard_id, timeline_id, lsn, &ctx)
2662 : .await
2663 : {
2664 : Ok(()) => {
2665 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?
2666 : }
2667 : Err(e) => {
2668 : error!("error obtaining lsn lease for {lsn}: {e:?}");
2669 : pgb.write_message_noflush(&BeMessage::ErrorResponse(
2670 : &e.to_string(),
2671 : Some(e.pg_error_code()),
2672 : ))?
2673 : }
2674 : };
2675 : }
2676 : }
2677 :
2678 : Ok(())
2679 : }
2680 : }
2681 :
2682 : impl From<GetActiveTenantError> for QueryError {
2683 0 : fn from(e: GetActiveTenantError) -> Self {
2684 0 : match e {
2685 0 : GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected(
2686 0 : ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
2687 0 : ),
2688 : GetActiveTenantError::Cancelled
2689 : | GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => {
2690 0 : QueryError::Shutdown
2691 : }
2692 0 : e @ GetActiveTenantError::NotFound(_) => QueryError::NotFound(format!("{e}").into()),
2693 0 : e => QueryError::Other(anyhow::anyhow!(e)),
2694 : }
2695 0 : }
2696 : }
2697 :
2698 : #[derive(Debug, thiserror::Error)]
2699 : pub(crate) enum GetActiveTimelineError {
2700 : #[error(transparent)]
2701 : Tenant(GetActiveTenantError),
2702 : #[error(transparent)]
2703 : Timeline(#[from] GetTimelineError),
2704 : }
2705 :
2706 : impl From<GetActiveTimelineError> for QueryError {
2707 0 : fn from(e: GetActiveTimelineError) -> Self {
2708 0 : match e {
2709 0 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => QueryError::Shutdown,
2710 0 : GetActiveTimelineError::Tenant(e) => e.into(),
2711 0 : GetActiveTimelineError::Timeline(e) => QueryError::NotFound(format!("{e}").into()),
2712 : }
2713 0 : }
2714 : }
2715 :
2716 : impl From<crate::tenant::timeline::handle::HandleUpgradeError> for QueryError {
2717 0 : fn from(e: crate::tenant::timeline::handle::HandleUpgradeError) -> Self {
2718 0 : match e {
2719 0 : crate::tenant::timeline::handle::HandleUpgradeError::ShutDown => QueryError::Shutdown,
2720 0 : }
2721 0 : }
2722 : }
2723 :
2724 0 : fn set_tracing_field_shard_id(timeline: &Timeline) {
2725 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
2726 0 : tracing::Span::current().record(
2727 0 : "shard_id",
2728 0 : tracing::field::display(timeline.tenant_shard_id.shard_slug()),
2729 0 : );
2730 0 : debug_assert_current_span_has_tenant_and_timeline_id();
2731 0 : }
2732 :
2733 : struct WaitedForLsn(Lsn);
2734 : impl From<WaitedForLsn> for Lsn {
2735 0 : fn from(WaitedForLsn(lsn): WaitedForLsn) -> Self {
2736 0 : lsn
2737 0 : }
2738 : }
2739 :
2740 : #[cfg(test)]
2741 : mod tests {
2742 : use utils::shard::ShardCount;
2743 :
2744 : use super::*;
2745 :
2746 : #[test]
2747 4 : fn pageservice_cmd_parse() {
2748 4 : let tenant_id = TenantId::generate();
2749 4 : let timeline_id = TimelineId::generate();
2750 4 : let cmd =
2751 4 : PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id} {timeline_id}")).unwrap();
2752 4 : assert_eq!(
2753 4 : cmd,
2754 4 : PageServiceCmd::PageStream(PageStreamCmd {
2755 4 : tenant_id,
2756 4 : timeline_id,
2757 4 : protocol_version: PagestreamProtocolVersion::V2,
2758 4 : })
2759 4 : );
2760 4 : let cmd = PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id}")).unwrap();
2761 4 : assert_eq!(
2762 4 : cmd,
2763 4 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2764 4 : tenant_id,
2765 4 : timeline_id,
2766 4 : lsn: None,
2767 4 : gzip: false,
2768 4 : replica: false
2769 4 : })
2770 4 : );
2771 4 : let cmd =
2772 4 : PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} --gzip")).unwrap();
2773 4 : assert_eq!(
2774 4 : cmd,
2775 4 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2776 4 : tenant_id,
2777 4 : timeline_id,
2778 4 : lsn: None,
2779 4 : gzip: true,
2780 4 : replica: false
2781 4 : })
2782 4 : );
2783 4 : let cmd =
2784 4 : PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} latest")).unwrap();
2785 4 : assert_eq!(
2786 4 : cmd,
2787 4 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2788 4 : tenant_id,
2789 4 : timeline_id,
2790 4 : lsn: None,
2791 4 : gzip: false,
2792 4 : replica: false
2793 4 : })
2794 4 : );
2795 4 : let cmd = PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} 0/16ABCDE"))
2796 4 : .unwrap();
2797 4 : assert_eq!(
2798 4 : cmd,
2799 4 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2800 4 : tenant_id,
2801 4 : timeline_id,
2802 4 : lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
2803 4 : gzip: false,
2804 4 : replica: false
2805 4 : })
2806 4 : );
2807 4 : let cmd = PageServiceCmd::parse(&format!(
2808 4 : "basebackup {tenant_id} {timeline_id} --replica --gzip"
2809 4 : ))
2810 4 : .unwrap();
2811 4 : assert_eq!(
2812 4 : cmd,
2813 4 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2814 4 : tenant_id,
2815 4 : timeline_id,
2816 4 : lsn: None,
2817 4 : gzip: true,
2818 4 : replica: true
2819 4 : })
2820 4 : );
2821 4 : let cmd = PageServiceCmd::parse(&format!(
2822 4 : "basebackup {tenant_id} {timeline_id} 0/16ABCDE --replica --gzip"
2823 4 : ))
2824 4 : .unwrap();
2825 4 : assert_eq!(
2826 4 : cmd,
2827 4 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2828 4 : tenant_id,
2829 4 : timeline_id,
2830 4 : lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
2831 4 : gzip: true,
2832 4 : replica: true
2833 4 : })
2834 4 : );
2835 4 : let cmd = PageServiceCmd::parse(&format!("fullbackup {tenant_id} {timeline_id}")).unwrap();
2836 4 : assert_eq!(
2837 4 : cmd,
2838 4 : PageServiceCmd::FullBackup(FullBackupCmd {
2839 4 : tenant_id,
2840 4 : timeline_id,
2841 4 : lsn: None,
2842 4 : prev_lsn: None
2843 4 : })
2844 4 : );
2845 4 : let cmd = PageServiceCmd::parse(&format!(
2846 4 : "fullbackup {tenant_id} {timeline_id} 0/16ABCDE 0/16ABCDF"
2847 4 : ))
2848 4 : .unwrap();
2849 4 : assert_eq!(
2850 4 : cmd,
2851 4 : PageServiceCmd::FullBackup(FullBackupCmd {
2852 4 : tenant_id,
2853 4 : timeline_id,
2854 4 : lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
2855 4 : prev_lsn: Some(Lsn::from_str("0/16ABCDF").unwrap()),
2856 4 : })
2857 4 : );
2858 4 : let tenant_shard_id = TenantShardId::unsharded(tenant_id);
2859 4 : let cmd = PageServiceCmd::parse(&format!(
2860 4 : "lease lsn {tenant_shard_id} {timeline_id} 0/16ABCDE"
2861 4 : ))
2862 4 : .unwrap();
2863 4 : assert_eq!(
2864 4 : cmd,
2865 4 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
2866 4 : tenant_shard_id,
2867 4 : timeline_id,
2868 4 : lsn: Lsn::from_str("0/16ABCDE").unwrap(),
2869 4 : })
2870 4 : );
2871 4 : let tenant_shard_id = TenantShardId::split(&tenant_shard_id, ShardCount(8))[1];
2872 4 : let cmd = PageServiceCmd::parse(&format!(
2873 4 : "lease lsn {tenant_shard_id} {timeline_id} 0/16ABCDE"
2874 4 : ))
2875 4 : .unwrap();
2876 4 : assert_eq!(
2877 4 : cmd,
2878 4 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
2879 4 : tenant_shard_id,
2880 4 : timeline_id,
2881 4 : lsn: Lsn::from_str("0/16ABCDE").unwrap(),
2882 4 : })
2883 4 : );
2884 4 : let cmd = PageServiceCmd::parse("set a = b").unwrap();
2885 4 : assert_eq!(cmd, PageServiceCmd::Set);
2886 4 : let cmd = PageServiceCmd::parse("SET foo").unwrap();
2887 4 : assert_eq!(cmd, PageServiceCmd::Set);
2888 4 : }
2889 :
2890 : #[test]
2891 4 : fn pageservice_cmd_err_handling() {
2892 4 : let tenant_id = TenantId::generate();
2893 4 : let timeline_id = TimelineId::generate();
2894 4 : let cmd = PageServiceCmd::parse("unknown_command");
2895 4 : assert!(cmd.is_err());
2896 4 : let cmd = PageServiceCmd::parse("pagestream_v2");
2897 4 : assert!(cmd.is_err());
2898 4 : let cmd = PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id}xxx"));
2899 4 : assert!(cmd.is_err());
2900 4 : let cmd = PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id}xxx {timeline_id}xxx"));
2901 4 : assert!(cmd.is_err());
2902 4 : let cmd = PageServiceCmd::parse(&format!(
2903 4 : "basebackup {tenant_id} {timeline_id} --gzip --gzip"
2904 4 : ));
2905 4 : assert!(cmd.is_err());
2906 4 : let cmd = PageServiceCmd::parse(&format!(
2907 4 : "basebackup {tenant_id} {timeline_id} --gzip --unknown"
2908 4 : ));
2909 4 : assert!(cmd.is_err());
2910 4 : let cmd = PageServiceCmd::parse(&format!(
2911 4 : "basebackup {tenant_id} {timeline_id} --gzip 0/16ABCDE"
2912 4 : ));
2913 4 : assert!(cmd.is_err());
2914 4 : let cmd = PageServiceCmd::parse(&format!("lease {tenant_id} {timeline_id} gzip 0/16ABCDE"));
2915 4 : assert!(cmd.is_err());
2916 4 : }
2917 : }
|