Line data Source code
1 : //! The Page Service listens for client connections and serves their GetPage@LSN
2 : //! requests.
3 :
4 : use std::any::Any;
5 : use std::borrow::Cow;
6 : use std::num::NonZeroUsize;
7 : use std::os::fd::AsRawFd;
8 : use std::pin::Pin;
9 : use std::str::FromStr;
10 : use std::sync::Arc;
11 : use std::task::{Context, Poll};
12 : use std::time::{Duration, Instant, SystemTime};
13 : use std::{io, str};
14 :
15 : use anyhow::{Context as _, bail};
16 : use bytes::{Buf as _, BufMut as _, BytesMut};
17 : use chrono::Utc;
18 : use futures::future::BoxFuture;
19 : use futures::{FutureExt, Stream};
20 : use itertools::Itertools;
21 : use jsonwebtoken::TokenData;
22 : use once_cell::sync::OnceCell;
23 : use pageserver_api::config::{
24 : GetVectoredConcurrentIo, PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
25 : PageServiceProtocolPipelinedBatchingStrategy, PageServiceProtocolPipelinedExecutionStrategy,
26 : };
27 : use pageserver_api::key::rel_block_to_key;
28 : use pageserver_api::models::{PageTraceEvent, TenantState};
29 : use pageserver_api::pagestream_api::{
30 : self, PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
31 : PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
32 : PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetSlruSegmentRequest,
33 : PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest, PagestreamNblocksResponse,
34 : PagestreamProtocolVersion, PagestreamRequest,
35 : };
36 : use pageserver_api::reltag::SlruKind;
37 : use pageserver_api::shard::TenantShardId;
38 : use pageserver_page_api as page_api;
39 : use pageserver_page_api::proto;
40 : use postgres_backend::{
41 : AuthType, PostgresBackend, PostgresBackendReader, QueryError, is_expected_io_error,
42 : };
43 : use postgres_ffi::BLCKSZ;
44 : use postgres_ffi_types::constants::DEFAULTTABLESPACE_OID;
45 : use pq_proto::framed::ConnectionError;
46 : use pq_proto::{BeMessage, FeMessage, FeStartupPacket, RowDescriptor};
47 : use smallvec::{SmallVec, smallvec};
48 : use strum_macros::IntoStaticStr;
49 : use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _, BufWriter};
50 : use tokio::task::JoinHandle;
51 : use tokio_util::sync::CancellationToken;
52 : use tonic::service::Interceptor as _;
53 : use tonic::transport::server::TcpConnectInfo;
54 : use tracing::*;
55 : use utils::auth::{Claims, Scope, SwappableJwtAuth};
56 : use utils::id::{TenantId, TenantTimelineId, TimelineId};
57 : use utils::logging::log_slow;
58 : use utils::lsn::Lsn;
59 : use utils::shard::ShardIndex;
60 : use utils::simple_rcu::RcuReadGuard;
61 : use utils::sync::gate::{Gate, GateGuard};
62 : use utils::sync::spsc_fold;
63 : use utils::{failpoint_support, span_record};
64 :
65 : use crate::auth::check_permission;
66 : use crate::basebackup::{self, BasebackupError};
67 : use crate::config::PageServerConf;
68 : use crate::context::{
69 : DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
70 : };
71 : use crate::feature_resolver::FeatureResolver;
72 : use crate::metrics::{
73 : self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, GetPageBatchBreakReason, LIVE_CONNECTIONS,
74 : MISROUTED_PAGESTREAM_REQUESTS, PAGESTREAM_HANDLER_RESULTS_TOTAL, SmgrOpTimer, TimelineMetrics,
75 : };
76 : use crate::pgdatadir_mapping::{LsnRange, Version};
77 : use crate::span::{
78 : debug_assert_current_span_has_tenant_and_timeline_id,
79 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
80 : };
81 : use crate::task_mgr::{self, COMPUTE_REQUEST_RUNTIME, TaskKind};
82 : use crate::tenant::mgr::{
83 : GetActiveTenantError, GetTenantError, ShardResolveResult, ShardSelector, TenantManager,
84 : };
85 : use crate::tenant::storage_layer::IoConcurrency;
86 : use crate::tenant::timeline::handle::{Handle, HandleUpgradeError, WeakHandle};
87 : use crate::tenant::timeline::{self, WaitLsnError, WaitLsnTimeout, WaitLsnWaiter};
88 : use crate::tenant::{GetTimelineError, PageReconstructError, Timeline};
89 : use crate::{CancellableTask, PERF_TRACE_TARGET, timed_after_cancellation};
90 :
91 : /// How long we may wait for a [`crate::tenant::mgr::TenantSlot::InProgress`] and/or a [`crate::tenant::TenantShard`] which
92 : /// is not yet in state [`TenantState::Active`].
93 : ///
94 : /// NB: this is a different value than [`crate::http::routes::ACTIVE_TENANT_TIMEOUT`].
95 : /// HADRON: reduced timeout and we will retry in Cache::get().
96 : const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(5000);
97 :
98 : /// Threshold at which to log slow GetPage requests.
99 : const LOG_SLOW_GETPAGE_THRESHOLD: Duration = Duration::from_secs(30);
100 :
101 : /// The idle time before sending TCP keepalive probes for gRPC connections. The
102 : /// interval and timeout between each probe are configured via sysctl. This
103 : /// allows detecting dead connections sooner.
104 : const GRPC_TCP_KEEPALIVE_TIME: Duration = Duration::from_secs(60);
105 :
106 : /// Whether to enable TCP nodelay for gRPC connections. This disables Nagle's
107 : /// algorithm, which can cause latency spikes for small messages.
108 : const GRPC_TCP_NODELAY: bool = true;
109 :
110 : /// The interval between HTTP2 keepalive pings. This allows shutting down server
111 : /// tasks when clients are unresponsive.
112 : const GRPC_HTTP2_KEEPALIVE_INTERVAL: Duration = Duration::from_secs(30);
113 :
114 : /// The timeout for HTTP2 keepalive pings. Should be <= GRPC_HTTP2_KEEPALIVE_INTERVAL.
115 : const GRPC_HTTP2_KEEPALIVE_TIMEOUT: Duration = Duration::from_secs(20);
116 :
117 : /// Maximum number of concurrent gRPC streams per TCP connection. We expect something
118 : /// like 8 GetPage streams per connection, plus any unary requests.
119 : const GRPC_MAX_CONCURRENT_STREAMS: u32 = 256;
120 :
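The constants above are consumed when the gRPC page service endpoint is constructed. As a rough sketch of how they might map onto `tonic::transport::Server` builder options (the actual server setup lives outside this section, and `configured_grpc_server` is an illustrative name, not part of this file):

```rust
use std::time::Duration;

use tonic::transport::Server;

/// Illustrative only: applies the keepalive/stream constants above to a tonic
/// server builder. The real wiring is done elsewhere in the pageserver.
fn configured_grpc_server() -> Server {
    Server::builder()
        // Probe idle TCP connections after 60s (GRPC_TCP_KEEPALIVE_TIME).
        .tcp_keepalive(Some(Duration::from_secs(60)))
        // Disable Nagle's algorithm to avoid latency spikes for small messages.
        .tcp_nodelay(true)
        // HTTP/2 keepalive pings let server tasks detect unresponsive clients.
        .http2_keepalive_interval(Some(Duration::from_secs(30)))
        .http2_keepalive_timeout(Some(Duration::from_secs(20)))
        // Bound concurrent streams per connection (GRPC_MAX_CONCURRENT_STREAMS).
        .max_concurrent_streams(Some(256))
}
```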
121 : ///////////////////////////////////////////////////////////////////////////////
122 :
123 : pub struct Listener {
124 : cancel: CancellationToken,
125 : /// Cancel the listener task through `cancel` (see [`Listener::stop_accepting`]) to shut down the listener
126 : /// and get a handle on the existing connections.
127 : task: JoinHandle<Connections>,
128 : }
129 :
130 : pub struct Connections {
131 : cancel: CancellationToken,
132 : tasks: tokio::task::JoinSet<ConnectionHandlerResult>,
133 : gate: Gate,
134 : }
135 :
136 0 : pub fn spawn(
137 0 : conf: &'static PageServerConf,
138 0 : tenant_manager: Arc<TenantManager>,
139 0 : pg_auth: Option<Arc<SwappableJwtAuth>>,
140 0 : perf_trace_dispatch: Option<Dispatch>,
141 0 : tcp_listener: tokio::net::TcpListener,
142 0 : tls_config: Option<Arc<rustls::ServerConfig>>,
143 0 : feature_resolver: FeatureResolver,
144 0 : ) -> Listener {
145 0 : let cancel = CancellationToken::new();
146 0 : let libpq_ctx = RequestContext::todo_child(
147 0 : TaskKind::LibpqEndpointListener,
148 : // listener task shouldn't need to download anything. (We will
149 : // create a separate sub-context for each connection, with its
150 : // own download behavior. This context is used only to listen and
151 : // accept connections.)
152 0 : DownloadBehavior::Error,
153 : );
154 0 : let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
155 0 : "libpq listener",
156 0 : libpq_listener_main(
157 0 : conf,
158 0 : tenant_manager,
159 0 : pg_auth,
160 0 : perf_trace_dispatch,
161 0 : tcp_listener,
162 0 : conf.pg_auth_type,
163 0 : tls_config,
164 0 : conf.page_service_pipelining.clone(),
165 0 : feature_resolver,
166 0 : libpq_ctx,
167 0 : cancel.clone(),
168 0 : )
169 0 : .map(anyhow::Ok),
170 0 : ));
171 :
172 0 : Listener { cancel, task }
173 0 : }
174 :
175 : impl Listener {
176 0 : pub async fn stop_accepting(self) -> Connections {
177 0 : self.cancel.cancel();
178 0 : self.task
179 0 : .await
180 0 : .expect("unreachable: we wrap the listener task in task_mgr::exit_on_panic_or_error")
181 0 : }
182 : }
183 : impl Connections {
184 0 : pub(crate) async fn shutdown(self) {
185 : let Self {
186 0 : cancel,
187 0 : mut tasks,
188 0 : gate,
189 0 : } = self;
190 0 : cancel.cancel();
191 0 : while let Some(res) = tasks.join_next().await {
192 0 : Self::handle_connection_completion(res);
193 0 : }
194 0 : gate.close().await;
195 0 : }
196 :
197 0 : fn handle_connection_completion(res: Result<anyhow::Result<()>, tokio::task::JoinError>) {
198 0 : match res {
199 0 : Ok(Ok(())) => {}
200 0 : Ok(Err(e)) => error!("error in page_service connection task: {:?}", e),
201 0 : Err(e) => error!("page_service connection task panicked: {:?}", e),
202 : }
203 0 : }
204 : }
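`Listener` and `Connections` split shutdown into two phases: stop accepting, then drain. A minimal sketch of the intended call sequence, assuming `listener` is the `Listener` returned by `spawn` above (the orchestration function name is hypothetical):

```rust
/// Sketch of the two-phase shutdown implied by the API above.
async fn shutdown_page_service(listener: Listener) {
    // Phase 1: cancel the accept loop and take ownership of the live connections.
    let connections = listener.stop_accepting().await;
    // Phase 2: cancel per-connection tasks, join them, and close the gate.
    connections.shutdown().await;
}
```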
205 :
206 : ///
207 : /// Main loop of the page service.
208 : ///
209 : /// Listens for connections, and launches a new handler task for each.
210 : ///
211 : /// Returns the set of open connections upon cancellation via
212 : /// `listener_cancel`.
213 : ///
214 : #[allow(clippy::too_many_arguments)]
215 0 : pub async fn libpq_listener_main(
216 0 : conf: &'static PageServerConf,
217 0 : tenant_manager: Arc<TenantManager>,
218 0 : auth: Option<Arc<SwappableJwtAuth>>,
219 0 : perf_trace_dispatch: Option<Dispatch>,
220 0 : listener: tokio::net::TcpListener,
221 0 : auth_type: AuthType,
222 0 : tls_config: Option<Arc<rustls::ServerConfig>>,
223 0 : pipelining_config: PageServicePipeliningConfig,
224 0 : feature_resolver: FeatureResolver,
225 0 : listener_ctx: RequestContext,
226 0 : listener_cancel: CancellationToken,
227 0 : ) -> Connections {
228 0 : let connections_cancel = CancellationToken::new();
229 0 : let connections_gate = Gate::default();
230 0 : let mut connection_handler_tasks = tokio::task::JoinSet::default();
231 :
232 : loop {
233 0 : let gate_guard = match connections_gate.enter() {
234 0 : Ok(guard) => guard,
235 0 : Err(_) => break,
236 : };
237 :
238 0 : let accepted = tokio::select! {
239 : biased;
240 0 : _ = listener_cancel.cancelled() => break,
241 0 : next = connection_handler_tasks.join_next(), if !connection_handler_tasks.is_empty() => {
242 0 : let res = next.expect("we don't poll while empty");
243 0 : Connections::handle_connection_completion(res);
244 0 : continue;
245 : }
246 0 : accepted = listener.accept() => accepted,
247 : };
248 :
249 0 : match accepted {
250 0 : Ok((socket, peer_addr)) => {
251 : // Connection established. Spawn a new task to handle it.
252 0 : debug!("accepted connection from {}", peer_addr);
253 0 : let local_auth = auth.clone();
254 0 : let connection_ctx = RequestContextBuilder::from(&listener_ctx)
255 0 : .task_kind(TaskKind::PageRequestHandler)
256 0 : .download_behavior(DownloadBehavior::Download)
257 0 : .perf_span_dispatch(perf_trace_dispatch.clone())
258 0 : .detached_child();
259 :
260 0 : connection_handler_tasks.spawn(page_service_conn_main(
261 0 : conf,
262 0 : tenant_manager.clone(),
263 0 : local_auth,
264 0 : socket,
265 0 : auth_type,
266 0 : tls_config.clone(),
267 0 : pipelining_config.clone(),
268 0 : feature_resolver.clone(),
269 0 : connection_ctx,
270 0 : connections_cancel.child_token(),
271 0 : gate_guard,
272 : ));
273 : }
274 0 : Err(err) => {
275 : // accept() failed. Log the error, and loop back to retry on next connection.
276 0 : error!("accept() failed: {:?}", err);
277 : }
278 : }
279 : }
280 :
281 0 : debug!("page_service listener loop terminated");
282 :
283 0 : Connections {
284 0 : cancel: connections_cancel,
285 0 : tasks: connection_handler_tasks,
286 0 : gate: connections_gate,
287 0 : }
288 0 : }
289 :
290 : type ConnectionHandlerResult = anyhow::Result<()>;
291 :
292 : /// Perf root spans start at the per-request level, after shard routing.
293 : /// This struct carries connection-level information to the root perf span definition.
294 : #[derive(Clone, Default)]
295 : struct ConnectionPerfSpanFields {
296 : peer_addr: String,
297 : application_name: Option<String>,
298 : compute_mode: Option<String>,
299 : }
300 :
301 : #[instrument(skip_all, fields(peer_addr, application_name, compute_mode))]
302 : #[allow(clippy::too_many_arguments)]
303 : async fn page_service_conn_main(
304 : conf: &'static PageServerConf,
305 : tenant_manager: Arc<TenantManager>,
306 : auth: Option<Arc<SwappableJwtAuth>>,
307 : socket: tokio::net::TcpStream,
308 : auth_type: AuthType,
309 : tls_config: Option<Arc<rustls::ServerConfig>>,
310 : pipelining_config: PageServicePipeliningConfig,
311 : feature_resolver: FeatureResolver,
312 : connection_ctx: RequestContext,
313 : cancel: CancellationToken,
314 : gate_guard: GateGuard,
315 : ) -> ConnectionHandlerResult {
316 : let _guard = LIVE_CONNECTIONS
317 : .with_label_values(&["page_service"])
318 : .guard();
319 :
320 : socket
321 : .set_nodelay(true)
322 : .context("could not set TCP_NODELAY")?;
323 :
324 : let socket_fd = socket.as_raw_fd();
325 :
326 : let peer_addr = socket.peer_addr().context("get peer address")?;
327 :
328 : let perf_span_fields = ConnectionPerfSpanFields {
329 : peer_addr: peer_addr.to_string(),
330 : application_name: None, // filled in later
331 : compute_mode: None, // filled in later
332 : };
333 : tracing::Span::current().record("peer_addr", field::display(peer_addr));
334 :
335 : // Set up a read timeout of 10 minutes. The timeout is rather arbitrary; the requirements are:
336 : // - long enough for most valid compute connections
337 : // - less than infinite to stop us from "leaking" connections to long-gone computes
338 : //
339 : // no write timeout is used, because the kernel is assumed to error writes after some time.
340 : let mut socket = tokio_io_timeout::TimeoutReader::new(socket);
341 :
342 : let default_timeout_ms = 10 * 60 * 1000; // 10 minutes by default
343 0 : let socket_timeout_ms = (|| {
344 0 : fail::fail_point!("simulated-bad-compute-connection", |avg_timeout_ms| {
345 : // Exponential distribution for simulating
346 : // poor network conditions, expect about avg_timeout_ms to be around 15
347 : // in tests
348 0 : if let Some(avg_timeout_ms) = avg_timeout_ms {
349 0 : let avg = avg_timeout_ms.parse::<i64>().unwrap() as f32;
350 0 : let u = rand::random::<f32>();
351 0 : ((1.0 - u).ln() / (-avg)) as u64
352 : } else {
353 0 : default_timeout_ms
354 : }
355 0 : });
356 0 : default_timeout_ms
357 : })();
358 :
359 : // A timeout here does not mean the client died; it can happen if it's just idle for
360 : // a while: we will tear down this PageServerHandler and instantiate a new one if/when
361 : // they reconnect.
362 : socket.set_timeout(Some(std::time::Duration::from_millis(socket_timeout_ms)));
363 : let socket = Box::pin(socket);
364 :
365 : fail::fail_point!("ps::connection-start::pre-login");
366 :
367 : // XXX: pgbackend.run() should take the connection_ctx,
368 : // and create a child per-query context when it invokes process_query.
369 : // But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
370 : // and create the per-query context in process_query ourselves.
371 : let mut conn_handler = PageServerHandler::new(
372 : tenant_manager,
373 : auth,
374 : pipelining_config,
375 : conf.get_vectored_concurrent_io,
376 : perf_span_fields,
377 : connection_ctx,
378 : cancel.clone(),
379 : feature_resolver.clone(),
380 : gate_guard,
381 : );
382 : let pgbackend =
383 : PostgresBackend::new_from_io(socket_fd, socket, peer_addr, auth_type, tls_config)?;
384 :
385 : match pgbackend.run(&mut conn_handler, &cancel).await {
386 : Ok(()) => {
387 : // we've been requested to shut down
388 : Ok(())
389 : }
390 : Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
391 : if is_expected_io_error(&io_error) {
392 : info!("Postgres client disconnected ({io_error})");
393 : Ok(())
394 : } else {
395 : let tenant_id = conn_handler.timeline_handles.as_ref().unwrap().tenant_id();
396 : Err(io_error).context(format!(
397 : "Postgres connection error for tenant_id={tenant_id:?} client at peer_addr={peer_addr}"
398 : ))
399 : }
400 : }
401 : other => {
402 : let tenant_id = conn_handler.timeline_handles.as_ref().unwrap().tenant_id();
403 : other.context(format!(
404 : "Postgres query error for tenant_id={tenant_id:?} client peer_addr={peer_addr}"
405 : ))
406 : }
407 : }
408 : }
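The `simulated-bad-compute-connection` failpoint in `page_service_conn_main` above draws the simulated read timeout via inverse-transform sampling: for `U ~ Uniform(0,1)`, `-ln(1-U)/λ` is exponentially distributed with rate `λ`. A standalone sketch of that draw (the helper name is illustrative, not part of the handler):

```rust
/// Inverse-transform sample from an exponential distribution with rate `lambda`,
/// mirroring the failpoint arm above (illustrative helper only).
fn sample_exponential_ms(lambda: f32) -> u64 {
    let u = rand::random::<f32>();
    ((1.0 - u).ln() / (-lambda)) as u64
}
```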
409 :
410 : /// Page service connection handler.
411 : struct PageServerHandler {
412 : auth: Option<Arc<SwappableJwtAuth>>,
413 : claims: Option<Claims>,
414 :
415 : /// The context created for the lifetime of the connection
416 : /// serviced by this PageServerHandler.
417 : /// For each query received over the connection,
418 : /// `process_query` creates a child context from this one.
419 : connection_ctx: RequestContext,
420 :
421 : perf_span_fields: ConnectionPerfSpanFields,
422 :
423 : cancel: CancellationToken,
424 :
425 : /// None only while pagestream protocol is being processed.
426 : timeline_handles: Option<TimelineHandles>,
427 :
428 : pipelining_config: PageServicePipeliningConfig,
429 : get_vectored_concurrent_io: GetVectoredConcurrentIo,
430 :
431 : feature_resolver: FeatureResolver,
432 :
433 : gate_guard: GateGuard,
434 : }
435 :
436 : struct TimelineHandles {
437 : wrapper: TenantManagerWrapper,
438 : /// Note on size: the typical size of this map is 1. The largest size we expect
439 : /// to see is the number of shards divided by the number of pageservers (typically < 2),
440 : /// or the ratio used when splitting shards (i.e. how many children are created from one
441 : /// parent shard), where a "large" number might be ~8.
442 : handles: timeline::handle::Cache<TenantManagerTypes>,
443 : }
444 :
445 : impl TimelineHandles {
446 0 : fn new(tenant_manager: Arc<TenantManager>) -> Self {
447 0 : Self {
448 0 : wrapper: TenantManagerWrapper {
449 0 : tenant_manager,
450 0 : tenant_id: OnceCell::new(),
451 0 : },
452 0 : handles: Default::default(),
453 0 : }
454 0 : }
455 0 : async fn get(
456 0 : &mut self,
457 0 : tenant_id: TenantId,
458 0 : timeline_id: TimelineId,
459 0 : shard_selector: ShardSelector,
460 0 : ) -> Result<Handle<TenantManagerTypes>, GetActiveTimelineError> {
461 0 : if *self.wrapper.tenant_id.get_or_init(|| tenant_id) != tenant_id {
462 0 : return Err(GetActiveTimelineError::Tenant(
463 0 : GetActiveTenantError::SwitchedTenant,
464 0 : ));
465 0 : }
466 0 : self.handles
467 0 : .get(timeline_id, shard_selector, &self.wrapper)
468 0 : .await
469 0 : .map_err(|e| match e {
470 0 : timeline::handle::GetError::TenantManager(e) => e,
471 : timeline::handle::GetError::PerTimelineStateShutDown => {
472 0 : trace!("per-timeline state shut down");
473 0 : GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown)
474 : }
475 0 : })
476 0 : }
477 :
478 0 : fn tenant_id(&self) -> Option<TenantId> {
479 0 : self.wrapper.tenant_id.get().copied()
480 0 : }
481 : }
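The handle cache above is looked up once per request with a `ShardSelector`. A small sketch of the routing rule used by `pagestream_read_message` further down (the helper and its signature are illustrative; the real code calls `handles.get` inline per message type):

```rust
/// Illustrative routing helper: metadata-style requests (exists, nblocks,
/// dbsize, SLRU) go to shard zero, while GetPage requests go to the shard
/// owning the page's key (via rel_block_to_key).
async fn route_to_shard(
    handles: &mut TimelineHandles,
    tenant_id: TenantId,
    timeline_id: TimelineId,
    page_key: Option<pageserver_api::key::Key>,
) -> Result<Handle<TenantManagerTypes>, GetActiveTimelineError> {
    let selector = match page_key {
        Some(key) => ShardSelector::Page(key),
        None => ShardSelector::Zero,
    };
    handles.get(tenant_id, timeline_id, selector).await
}
```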
482 :
483 : pub(crate) struct TenantManagerWrapper {
484 : tenant_manager: Arc<TenantManager>,
485 : // We do not support switching tenant_id on a connection at this point.
486 : // We can add support for this later if needed without changing
487 : // the protocol.
488 : tenant_id: once_cell::sync::OnceCell<TenantId>,
489 : }
490 :
491 : #[derive(Debug)]
492 : pub(crate) struct TenantManagerTypes;
493 :
494 : impl timeline::handle::Types for TenantManagerTypes {
495 : type TenantManagerError = GetActiveTimelineError;
496 : type TenantManager = TenantManagerWrapper;
497 : type Timeline = TenantManagerCacheItem;
498 : }
499 :
500 : pub(crate) struct TenantManagerCacheItem {
501 : pub(crate) timeline: Arc<Timeline>,
502 : // allow() for cheap propagation through RequestContext inside a task
503 : #[allow(clippy::redundant_allocation)]
504 : pub(crate) metrics: Arc<Arc<TimelineMetrics>>,
505 : #[allow(dead_code)] // we store it to keep the gate open
506 : pub(crate) gate_guard: GateGuard,
507 : }
508 :
509 : impl std::ops::Deref for TenantManagerCacheItem {
510 : type Target = Arc<Timeline>;
511 0 : fn deref(&self) -> &Self::Target {
512 0 : &self.timeline
513 0 : }
514 : }
515 :
516 : impl timeline::handle::Timeline<TenantManagerTypes> for TenantManagerCacheItem {
517 0 : fn shard_timeline_id(&self) -> timeline::handle::ShardTimelineId {
518 0 : Timeline::shard_timeline_id(&self.timeline)
519 0 : }
520 :
521 0 : fn per_timeline_state(&self) -> &timeline::handle::PerTimelineState<TenantManagerTypes> {
522 0 : &self.timeline.handles
523 0 : }
524 :
525 0 : fn get_shard_identity(&self) -> &pageserver_api::shard::ShardIdentity {
526 0 : Timeline::get_shard_identity(&self.timeline)
527 0 : }
528 : }
529 :
530 : impl timeline::handle::TenantManager<TenantManagerTypes> for TenantManagerWrapper {
531 0 : async fn resolve(
532 0 : &self,
533 0 : timeline_id: TimelineId,
534 0 : shard_selector: ShardSelector,
535 0 : ) -> Result<TenantManagerCacheItem, GetActiveTimelineError> {
536 0 : let tenant_id = self.tenant_id.get().expect("we set this in get()");
537 0 : let timeout = ACTIVE_TENANT_TIMEOUT;
538 0 : let wait_start = Instant::now();
539 0 : let deadline = wait_start + timeout;
540 0 : let tenant_shard = loop {
541 0 : let resolved = self
542 0 : .tenant_manager
543 0 : .resolve_attached_shard(tenant_id, shard_selector);
544 0 : match resolved {
545 0 : ShardResolveResult::Found(tenant_shard) => break tenant_shard,
546 : ShardResolveResult::NotFound => {
547 0 : MISROUTED_PAGESTREAM_REQUESTS.inc();
548 0 : return Err(GetActiveTimelineError::Tenant(
549 0 : GetActiveTenantError::NotFound(GetTenantError::NotFound(*tenant_id)),
550 0 : ));
551 : }
552 0 : ShardResolveResult::InProgress(barrier) => {
553 : // We can't authoritatively answer right now: wait for InProgress state
554 : // to end, then try again
555 0 : tokio::select! {
556 0 : _ = barrier.wait() => {
557 0 : // The barrier completed: proceed around the loop to try looking up again
558 0 : },
559 0 : _ = tokio::time::sleep(deadline.duration_since(Instant::now())) => {
560 0 : return Err(GetActiveTimelineError::Tenant(GetActiveTenantError::WaitForActiveTimeout {
561 0 : latest_state: None,
562 0 : wait_time: timeout,
563 0 : }));
564 : }
565 : }
566 : }
567 : };
568 : };
569 :
570 0 : tracing::debug!("Waiting for tenant to enter active state...");
571 0 : tenant_shard
572 0 : .wait_to_become_active(deadline.duration_since(Instant::now()))
573 0 : .await
574 0 : .map_err(GetActiveTimelineError::Tenant)?;
575 :
576 0 : let timeline = tenant_shard
577 0 : .get_timeline(timeline_id, true)
578 0 : .map_err(GetActiveTimelineError::Timeline)?;
579 :
580 0 : let gate_guard = match timeline.gate.enter() {
581 0 : Ok(guard) => guard,
582 : Err(_) => {
583 0 : return Err(GetActiveTimelineError::Timeline(
584 0 : GetTimelineError::ShuttingDown,
585 0 : ));
586 : }
587 : };
588 :
589 0 : let metrics = Arc::new(Arc::clone(&timeline.metrics));
590 :
591 0 : Ok(TenantManagerCacheItem {
592 0 : timeline,
593 0 : metrics,
594 0 : gate_guard,
595 0 : })
596 0 : }
597 : }
598 :
599 : /// Whether to hold the applied GC cutoff guard when processing GetPage requests.
600 : /// This is determined once at the start of pagestream subprotocol handling based on
601 : /// feature flags, configuration, and test conditions.
602 : #[derive(Debug, Clone, Copy)]
603 : enum HoldAppliedGcCutoffGuard {
604 : Yes,
605 : No,
606 : }
607 :
608 : #[derive(thiserror::Error, Debug)]
609 : enum PageStreamError {
610 : /// We encountered an error that should prompt the client to reconnect:
611 : /// in practice this means we drop the connection without sending a response.
612 : #[error("Reconnect required: {0}")]
613 : Reconnect(Cow<'static, str>),
614 :
615 : /// We were instructed to shutdown while processing the query
616 : #[error("Shutting down")]
617 : Shutdown,
618 :
619 : /// Something went wrong reading a page: this likely indicates a pageserver bug
620 : #[error("Read error")]
621 : Read(#[source] PageReconstructError),
622 :
623 : /// Ran out of time waiting for an LSN
624 : #[error("LSN timeout: {0}")]
625 : LsnTimeout(WaitLsnError),
626 :
627 : /// The entity required to serve the request (tenant or timeline) is not found,
628 : /// or is not found in a suitable state to serve a request.
629 : #[error("Not found: {0}")]
630 : NotFound(Cow<'static, str>),
631 :
632 : /// Request asked for something that doesn't make sense, like an invalid LSN
633 : #[error("Bad request: {0}")]
634 : BadRequest(Cow<'static, str>),
635 : }
636 :
637 : impl From<PageStreamError> for tonic::Status {
638 0 : fn from(err: PageStreamError) -> Self {
639 : use tonic::Code;
640 0 : let message = err.to_string();
641 0 : let code = match err {
642 0 : PageStreamError::Reconnect(_) => Code::Unavailable,
643 0 : PageStreamError::Shutdown => Code::Unavailable,
644 0 : PageStreamError::Read(err) => match err {
645 0 : PageReconstructError::Cancelled => Code::Unavailable,
646 0 : PageReconstructError::MissingKey(_) => Code::NotFound,
647 0 : PageReconstructError::AncestorLsnTimeout(err) => tonic::Status::from(err).code(),
648 0 : PageReconstructError::Other(_) => Code::Internal,
649 0 : PageReconstructError::WalRedo(_) => Code::Internal,
650 : },
651 0 : PageStreamError::LsnTimeout(err) => tonic::Status::from(err).code(),
652 0 : PageStreamError::NotFound(_) => Code::NotFound,
653 0 : PageStreamError::BadRequest(_) => Code::InvalidArgument,
654 : };
655 0 : tonic::Status::new(code, message)
656 0 : }
657 : }
658 :
659 : impl From<PageReconstructError> for PageStreamError {
660 0 : fn from(value: PageReconstructError) -> Self {
661 0 : match value {
662 0 : PageReconstructError::Cancelled => Self::Shutdown,
663 0 : e => Self::Read(e),
664 : }
665 0 : }
666 : }
667 :
668 : impl From<GetActiveTimelineError> for PageStreamError {
669 0 : fn from(value: GetActiveTimelineError) -> Self {
670 0 : match value {
671 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled)
672 : | GetActiveTimelineError::Tenant(GetActiveTenantError::WillNotBecomeActive(
673 : TenantState::Stopping { .. },
674 : ))
675 0 : | GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown) => Self::Shutdown,
676 0 : GetActiveTimelineError::Tenant(e) => Self::NotFound(format!("{e}").into()),
677 0 : GetActiveTimelineError::Timeline(e) => Self::NotFound(format!("{e}").into()),
678 : }
679 0 : }
680 : }
681 :
682 : impl From<WaitLsnError> for PageStreamError {
683 0 : fn from(value: WaitLsnError) -> Self {
684 0 : match value {
685 0 : e @ WaitLsnError::Timeout(_) => Self::LsnTimeout(e),
686 0 : WaitLsnError::Shutdown => Self::Shutdown,
687 0 : e @ WaitLsnError::BadState { .. } => Self::Reconnect(format!("{e}").into()),
688 : }
689 0 : }
690 : }
691 :
692 : impl From<WaitLsnError> for QueryError {
693 0 : fn from(value: WaitLsnError) -> Self {
694 0 : match value {
695 0 : e @ WaitLsnError::Timeout(_) => Self::Other(anyhow::Error::new(e)),
696 0 : WaitLsnError::Shutdown => Self::Shutdown,
697 0 : WaitLsnError::BadState { .. } => Self::Reconnect,
698 : }
699 0 : }
700 : }
701 :
702 : #[derive(thiserror::Error, Debug)]
703 : struct BatchedPageStreamError {
704 : req: PagestreamRequest,
705 : err: PageStreamError,
706 : }
707 :
708 : impl std::fmt::Display for BatchedPageStreamError {
709 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
710 0 : self.err.fmt(f)
711 0 : }
712 : }
713 :
714 : struct BatchedGetPageRequest {
715 : req: PagestreamGetPageRequest,
716 : timer: SmgrOpTimer,
717 : lsn_range: LsnRange,
718 : ctx: RequestContext,
719 : // If the request is perf enabled, this contains a context
720 : // with a perf span tracking the time spent waiting for the executor.
721 : batch_wait_ctx: Option<RequestContext>,
722 : }
723 :
724 : #[cfg(feature = "testing")]
725 : struct BatchedTestRequest {
726 : req: pagestream_api::PagestreamTestRequest,
727 : timer: SmgrOpTimer,
728 : }
729 :
730 : /// NB: we only hold [`timeline::handle::WeakHandle`] inside this enum,
731 : /// so that we don't keep the [`Timeline::gate`] open while the batch
732 : /// is being built up inside the [`spsc_fold`] (pagestream pipelining).
733 : #[derive(IntoStaticStr)]
734 : #[allow(clippy::large_enum_variant)]
735 : enum BatchedFeMessage {
736 : Exists {
737 : span: Span,
738 : timer: SmgrOpTimer,
739 : shard: WeakHandle<TenantManagerTypes>,
740 : req: PagestreamExistsRequest,
741 : },
742 : Nblocks {
743 : span: Span,
744 : timer: SmgrOpTimer,
745 : shard: WeakHandle<TenantManagerTypes>,
746 : req: PagestreamNblocksRequest,
747 : },
748 : GetPage {
749 : span: Span,
750 : shard: WeakHandle<TenantManagerTypes>,
751 : applied_gc_cutoff_guard: Option<RcuReadGuard<Lsn>>,
752 : pages: SmallVec<[BatchedGetPageRequest; 1]>,
753 : batch_break_reason: GetPageBatchBreakReason,
754 : },
755 : DbSize {
756 : span: Span,
757 : timer: SmgrOpTimer,
758 : shard: WeakHandle<TenantManagerTypes>,
759 : req: PagestreamDbSizeRequest,
760 : },
761 : GetSlruSegment {
762 : span: Span,
763 : timer: SmgrOpTimer,
764 : shard: WeakHandle<TenantManagerTypes>,
765 : req: PagestreamGetSlruSegmentRequest,
766 : },
767 : #[cfg(feature = "testing")]
768 : Test {
769 : span: Span,
770 : shard: WeakHandle<TenantManagerTypes>,
771 : requests: Vec<BatchedTestRequest>,
772 : },
773 : RespondError {
774 : span: Span,
775 : error: BatchedPageStreamError,
776 : },
777 : }
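The note above explains why the enum stores `WeakHandle`s: a batch queued in the `spsc_fold` must not hold the `Timeline::gate` open. A simplified, self-contained analogy using `std::sync::Weak` (the real `WeakHandle`/gate machinery carries more state than this):

```rust
use std::sync::{Arc, Weak};

struct Timeline;

/// While a request sits in the batch queue, keep only a weak reference so
/// timeline shutdown is not blocked by queued-but-unexecuted work.
struct QueuedRequest {
    timeline: Weak<Timeline>,
}

impl QueuedRequest {
    /// Upgrade only at execution time; bail out if the timeline is gone.
    fn execute(&self) -> Result<(), &'static str> {
        let _timeline: Arc<Timeline> = self.timeline.upgrade().ok_or("timeline shut down")?;
        // ... serve the request against `_timeline` ...
        Ok(())
    }
}
```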
778 :
779 : impl BatchedFeMessage {
780 0 : fn as_static_str(&self) -> &'static str {
781 0 : self.into()
782 0 : }
783 :
784 0 : fn observe_execution_start(&mut self, at: Instant) {
785 0 : match self {
786 0 : BatchedFeMessage::Exists { timer, .. }
787 0 : | BatchedFeMessage::Nblocks { timer, .. }
788 0 : | BatchedFeMessage::DbSize { timer, .. }
789 0 : | BatchedFeMessage::GetSlruSegment { timer, .. } => {
790 0 : timer.observe_execution_start(at);
791 0 : }
792 0 : BatchedFeMessage::GetPage { pages, .. } => {
793 0 : for page in pages {
794 0 : page.timer.observe_execution_start(at);
795 0 : }
796 : }
797 : #[cfg(feature = "testing")]
798 0 : BatchedFeMessage::Test { requests, .. } => {
799 0 : for req in requests {
800 0 : req.timer.observe_execution_start(at);
801 0 : }
802 : }
803 0 : BatchedFeMessage::RespondError { .. } => {}
804 : }
805 0 : }
806 :
807 0 : fn should_break_batch(
808 0 : &self,
809 0 : other: &BatchedFeMessage,
810 0 : max_batch_size: NonZeroUsize,
811 0 : batching_strategy: PageServiceProtocolPipelinedBatchingStrategy,
812 0 : ) -> Option<GetPageBatchBreakReason> {
813 0 : match (self, other) {
814 : (
815 : BatchedFeMessage::GetPage {
816 0 : shard: accum_shard,
817 0 : pages: accum_pages,
818 : ..
819 : },
820 : BatchedFeMessage::GetPage {
821 0 : shard: this_shard,
822 0 : pages: this_pages,
823 : ..
824 : },
825 : ) => {
826 0 : assert_eq!(this_pages.len(), 1);
827 0 : if accum_pages.len() >= max_batch_size.get() {
828 0 : trace!(%max_batch_size, "stopping batching because of batch size");
829 0 : assert_eq!(accum_pages.len(), max_batch_size.get());
830 :
831 0 : return Some(GetPageBatchBreakReason::BatchFull);
832 0 : }
833 0 : if !accum_shard.is_same_handle_as(this_shard) {
834 0 : trace!("stopping batching because timeline object mismatch");
835 : // TODO: we _could_ batch & execute each shard separately (and in parallel).
836 : // But the current logic for keeping responses in order does not support that.
837 :
838 0 : return Some(GetPageBatchBreakReason::NonUniformTimeline);
839 0 : }
840 :
841 0 : match batching_strategy {
842 : PageServiceProtocolPipelinedBatchingStrategy::UniformLsn => {
843 0 : if let Some(last_in_batch) = accum_pages.last() {
844 0 : if last_in_batch.lsn_range.effective_lsn
845 0 : != this_pages[0].lsn_range.effective_lsn
846 : {
847 0 : trace!(
848 : accum_lsn = %last_in_batch.lsn_range.effective_lsn,
849 0 : this_lsn = %this_pages[0].lsn_range.effective_lsn,
850 0 : "stopping batching because LSN changed"
851 : );
852 :
853 0 : return Some(GetPageBatchBreakReason::NonUniformLsn);
854 0 : }
855 0 : }
856 : }
857 : PageServiceProtocolPipelinedBatchingStrategy::ScatteredLsn => {
858 : // The read path doesn't currently support serving the same page at different LSNs.
859 : // While technically possible, it's uncertain if the complexity is worth it.
860 : // Break the batch if such a case is encountered.
861 0 : let same_page_different_lsn = accum_pages.iter().any(|batched| {
862 0 : batched.req.rel == this_pages[0].req.rel
863 0 : && batched.req.blkno == this_pages[0].req.blkno
864 0 : && batched.lsn_range.effective_lsn
865 0 : != this_pages[0].lsn_range.effective_lsn
866 0 : });
867 :
868 0 : if same_page_different_lsn {
869 0 : trace!(
870 0 : rel=%this_pages[0].req.rel,
871 0 : blkno=%this_pages[0].req.blkno,
872 0 : lsn=%this_pages[0].lsn_range.effective_lsn,
873 0 : "stopping batching because same page was requested at different LSNs"
874 : );
875 :
876 0 : return Some(GetPageBatchBreakReason::SamePageAtDifferentLsn);
877 0 : }
878 : }
879 : }
880 :
881 0 : None
882 : }
883 : #[cfg(feature = "testing")]
884 : (
885 : BatchedFeMessage::Test {
886 0 : shard: accum_shard,
887 0 : requests: accum_requests,
888 : ..
889 : },
890 : BatchedFeMessage::Test {
891 0 : shard: this_shard,
892 0 : requests: this_requests,
893 : ..
894 : },
895 : ) => {
896 0 : assert!(this_requests.len() == 1);
897 0 : if accum_requests.len() >= max_batch_size.get() {
898 0 : trace!(%max_batch_size, "stopping batching because of batch size");
899 0 : assert_eq!(accum_requests.len(), max_batch_size.get());
900 0 : return Some(GetPageBatchBreakReason::BatchFull);
901 0 : }
902 0 : if !accum_shard.is_same_handle_as(this_shard) {
903 0 : trace!("stopping batching because timeline object mismatch");
904 : // TODO: we _could_ batch & execute each shard separately (and in parallel).
905 : // But the current logic for keeping responses in order does not support that.
906 0 : return Some(GetPageBatchBreakReason::NonUniformTimeline);
907 0 : }
908 0 : let this_batch_key = this_requests[0].req.batch_key;
909 0 : let accum_batch_key = accum_requests[0].req.batch_key;
910 0 : if this_requests[0].req.batch_key != accum_requests[0].req.batch_key {
911 0 : trace!(%accum_batch_key, %this_batch_key, "stopping batching because batch key changed");
912 0 : return Some(GetPageBatchBreakReason::NonUniformKey);
913 0 : }
914 0 : None
915 : }
916 0 : (_, _) => Some(GetPageBatchBreakReason::NonBatchableRequest),
917 : }
918 0 : }
919 : }
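`should_break_batch` encodes the pipelined batching rules. A simplified, self-contained restatement of the GetPage arm (types reduced to plain integers; the same-timeline check and metrics are omitted):

```rust
#[derive(Clone, Copy, PartialEq, Eq)]
struct PageRef {
    rel: u32,
    blkno: u32,
}

enum BatchingStrategy {
    UniformLsn,
    ScatteredLsn,
}

/// Returns true if `next` must start a new batch instead of joining `accum`.
fn breaks_batch(
    accum: &[(PageRef, u64 /* effective LSN */)],
    next: (PageRef, u64),
    max_batch_size: usize,
    strategy: BatchingStrategy,
) -> bool {
    if accum.len() >= max_batch_size {
        return true; // BatchFull
    }
    match strategy {
        // Every request in the batch must read at the same effective LSN.
        BatchingStrategy::UniformLsn => {
            accum.last().is_some_and(|&(_, lsn)| lsn != next.1) // NonUniformLsn
        }
        // LSNs may differ, but the same page must not appear at two LSNs.
        BatchingStrategy::ScatteredLsn => accum
            .iter()
            .any(|&(page, lsn)| page == next.0 && lsn != next.1), // SamePageAtDifferentLsn
    }
}
```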
920 :
921 : impl PageServerHandler {
922 : #[allow(clippy::too_many_arguments)]
923 0 : pub fn new(
924 0 : tenant_manager: Arc<TenantManager>,
925 0 : auth: Option<Arc<SwappableJwtAuth>>,
926 0 : pipelining_config: PageServicePipeliningConfig,
927 0 : get_vectored_concurrent_io: GetVectoredConcurrentIo,
928 0 : perf_span_fields: ConnectionPerfSpanFields,
929 0 : connection_ctx: RequestContext,
930 0 : cancel: CancellationToken,
931 0 : feature_resolver: FeatureResolver,
932 0 : gate_guard: GateGuard,
933 0 : ) -> Self {
934 0 : PageServerHandler {
935 0 : auth,
936 0 : claims: None,
937 0 : connection_ctx,
938 0 : perf_span_fields,
939 0 : timeline_handles: Some(TimelineHandles::new(tenant_manager)),
940 0 : cancel,
941 0 : pipelining_config,
942 0 : get_vectored_concurrent_io,
943 0 : feature_resolver,
944 0 : gate_guard,
945 0 : }
946 0 : }
947 :
948 : /// This function always respects cancellation of any timeline in [`Self::timeline_handles`]. Pass in
949 : /// a cancellation token at the next scope up (such as a tenant cancellation token) to ensure we respect
950 : /// cancellation if there aren't any timelines in the cache.
951 : ///
952 : /// If calling from a function that doesn't use the [`Self::timeline_handles`] cache, then pass in the
953 : /// timeline cancellation token.
954 0 : async fn flush_cancellable<IO>(
955 0 : &self,
956 0 : pgb: &mut PostgresBackend<IO>,
957 0 : cancel: &CancellationToken,
958 0 : ) -> Result<(), QueryError>
959 0 : where
960 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
961 0 : {
962 0 : tokio::select!(
963 0 : flush_r = pgb.flush() => {
964 0 : Ok(flush_r?)
965 : },
966 0 : _ = cancel.cancelled() => {
967 0 : Err(QueryError::Shutdown)
968 : }
969 : )
970 0 : }
971 :
972 : #[allow(clippy::too_many_arguments)]
973 0 : async fn pagestream_read_message<IO>(
974 0 : pgb: &mut PostgresBackendReader<IO>,
975 0 : tenant_id: TenantId,
976 0 : timeline_id: TimelineId,
977 0 : timeline_handles: &mut TimelineHandles,
978 0 : conn_perf_span_fields: &ConnectionPerfSpanFields,
979 0 : cancel: &CancellationToken,
980 0 : ctx: &RequestContext,
981 0 : protocol_version: PagestreamProtocolVersion,
982 0 : parent_span: Span,
983 0 : hold_gc_cutoff_guard: HoldAppliedGcCutoffGuard,
984 0 : ) -> Result<Option<BatchedFeMessage>, QueryError>
985 0 : where
986 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
987 0 : {
988 0 : let msg = tokio::select! {
989 : biased;
990 0 : _ = cancel.cancelled() => {
991 0 : return Err(QueryError::Shutdown)
992 : }
993 0 : msg = pgb.read_message() => { msg }
994 : };
995 :
996 0 : let received_at = Instant::now();
997 :
998 0 : let copy_data_bytes = match msg? {
999 0 : Some(FeMessage::CopyData(bytes)) => bytes,
1000 : Some(FeMessage::Terminate) => {
1001 0 : return Ok(None);
1002 : }
1003 0 : Some(m) => {
1004 0 : return Err(QueryError::Other(anyhow::anyhow!(
1005 0 : "unexpected message: {m:?} during COPY"
1006 0 : )));
1007 : }
1008 : None => {
1009 0 : return Ok(None);
1010 : } // client disconnected
1011 : };
1012 0 : trace!("query: {copy_data_bytes:?}");
1013 :
1014 0 : fail::fail_point!("ps::handle-pagerequest-message");
1015 :
1016 : // parse request
1017 0 : let neon_fe_msg =
1018 0 : PagestreamFeMessage::parse(&mut copy_data_bytes.reader(), protocol_version)?;
1019 :
1020 0 : let batched_msg = match neon_fe_msg {
1021 0 : PagestreamFeMessage::Exists(req) => {
1022 0 : let shard = timeline_handles
1023 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1024 0 : .await?;
1025 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1026 0 : let span = tracing::info_span!(parent: &parent_span, "handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
1027 0 : let timer = Self::record_op_start_and_throttle(
1028 0 : &shard,
1029 0 : metrics::SmgrQueryType::GetRelExists,
1030 0 : received_at,
1031 0 : )
1032 0 : .await?;
1033 0 : BatchedFeMessage::Exists {
1034 0 : span,
1035 0 : timer,
1036 0 : shard: shard.downgrade(),
1037 0 : req,
1038 0 : }
1039 : }
1040 0 : PagestreamFeMessage::Nblocks(req) => {
1041 0 : let shard = timeline_handles
1042 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1043 0 : .await?;
1044 0 : let span = tracing::info_span!(parent: &parent_span, "handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
1045 0 : let timer = Self::record_op_start_and_throttle(
1046 0 : &shard,
1047 0 : metrics::SmgrQueryType::GetRelSize,
1048 0 : received_at,
1049 0 : )
1050 0 : .await?;
1051 0 : BatchedFeMessage::Nblocks {
1052 0 : span,
1053 0 : timer,
1054 0 : shard: shard.downgrade(),
1055 0 : req,
1056 0 : }
1057 : }
1058 0 : PagestreamFeMessage::DbSize(req) => {
1059 0 : let shard = timeline_handles
1060 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1061 0 : .await?;
1062 0 : let span = tracing::info_span!(parent: &parent_span, "handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
1063 0 : let timer = Self::record_op_start_and_throttle(
1064 0 : &shard,
1065 0 : metrics::SmgrQueryType::GetDbSize,
1066 0 : received_at,
1067 0 : )
1068 0 : .await?;
1069 0 : BatchedFeMessage::DbSize {
1070 0 : span,
1071 0 : timer,
1072 0 : shard: shard.downgrade(),
1073 0 : req,
1074 0 : }
1075 : }
1076 0 : PagestreamFeMessage::GetSlruSegment(req) => {
1077 0 : let shard = timeline_handles
1078 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1079 0 : .await?;
1080 0 : let span = tracing::info_span!(parent: &parent_span, "handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
1081 0 : let timer = Self::record_op_start_and_throttle(
1082 0 : &shard,
1083 0 : metrics::SmgrQueryType::GetSlruSegment,
1084 0 : received_at,
1085 0 : )
1086 0 : .await?;
1087 0 : BatchedFeMessage::GetSlruSegment {
1088 0 : span,
1089 0 : timer,
1090 0 : shard: shard.downgrade(),
1091 0 : req,
1092 0 : }
1093 : }
1094 0 : PagestreamFeMessage::GetPage(req) => {
1095 : // avoid a somewhat costly Span::record() by constructing the entire span in one go.
1096 : macro_rules! mkspan {
1097 : (before shard routing) => {{
1098 : tracing::info_span!(
1099 : parent: &parent_span,
1100 : "handle_get_page_request",
1101 : request_id = %req.hdr.reqid,
1102 : rel = %req.rel,
1103 : blkno = %req.blkno,
1104 : req_lsn = %req.hdr.request_lsn,
1105 : not_modified_since_lsn = %req.hdr.not_modified_since,
1106 : )
1107 : }};
1108 : ($shard_id:expr) => {{
1109 : tracing::info_span!(
1110 : parent: &parent_span,
1111 : "handle_get_page_request",
1112 : request_id = %req.hdr.reqid,
1113 : rel = %req.rel,
1114 : blkno = %req.blkno,
1115 : req_lsn = %req.hdr.request_lsn,
1116 : not_modified_since_lsn = %req.hdr.not_modified_since,
1117 : shard_id = %$shard_id,
1118 : )
1119 : }};
1120 : }
1121 :
1122 : macro_rules! respond_error {
1123 : ($span:expr, $error:expr) => {{
1124 : let error = BatchedFeMessage::RespondError {
1125 : span: $span,
1126 : error: BatchedPageStreamError {
1127 : req: req.hdr,
1128 : err: $error,
1129 : },
1130 : };
1131 : Ok(Some(error))
1132 : }};
1133 : }
1134 :
1135 0 : let key = rel_block_to_key(req.rel, req.blkno);
1136 :
1137 0 : let res = timeline_handles
1138 0 : .get(tenant_id, timeline_id, ShardSelector::Page(key))
1139 0 : .await;
1140 :
1141 0 : let shard = match res {
1142 0 : Ok(tl) => tl,
1143 0 : Err(e) => {
1144 0 : let span = mkspan!(before shard routing);
1145 0 : match e {
1146 : GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_)) => {
1147 : // We already know this tenant exists in general, because we resolved it at
1148 : // start of connection. Getting a NotFound here indicates that the shard containing
1149 : // the requested page is not present on this node: the client's knowledge of shard->pageserver
1150 : // mapping is out of date.
1151 : //
1152 : // Closing the connection by returning `PageStreamError::Reconnect` has the side effect of rate-limiting the above message, via the
1153 : // client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
1154 : // and talk to a different pageserver.
1155 0 : MISROUTED_PAGESTREAM_REQUESTS.inc();
1156 0 : return respond_error!(
1157 0 : span,
1158 0 : PageStreamError::Reconnect(
1159 0 : "getpage@lsn request routed to wrong shard".into()
1160 0 : )
1161 : );
1162 : }
1163 0 : e => {
1164 0 : return respond_error!(span, e.into());
1165 : }
1166 : }
1167 : }
1168 : };
1169 :
1170 0 : let ctx = if shard.is_get_page_request_sampled() {
1171 0 : RequestContextBuilder::from(ctx)
1172 0 : .root_perf_span(|| {
1173 0 : info_span!(
1174 : target: PERF_TRACE_TARGET,
1175 : "GET_PAGE",
1176 : peer_addr = conn_perf_span_fields.peer_addr,
1177 : application_name = conn_perf_span_fields.application_name,
1178 : compute_mode = conn_perf_span_fields.compute_mode,
1179 : tenant_id = %tenant_id,
1180 0 : shard_id = %shard.get_shard_identity().shard_slug(),
1181 : timeline_id = %timeline_id,
1182 : lsn = %req.hdr.request_lsn,
1183 : not_modified_since_lsn = %req.hdr.not_modified_since,
1184 : request_id = %req.hdr.reqid,
1185 : key = %key,
1186 : )
1187 0 : })
1188 0 : .attached_child()
1189 : } else {
1190 0 : ctx.attached_child()
1191 : };
1192 :
1193 : // This ctx travels as part of the BatchedFeMessage through
1194 : // batching into the request handler.
1195 : // The request handler needs to do some per-request work
1196 : // (relsize check) before dispatching the batch as a single
1197 : // get_vectored call to the Timeline.
1198 : // This ctx will be used for the relsize check, whereas the
1199 : // get_vectored call will be a different ctx with separate
1200 : // perf span.
1201 0 : let ctx = ctx.with_scope_page_service_pagestream(&shard);
1202 :
1203 : // Similar game for this `span`: we funnel it through so that
1204 : // request handler log messages contain the request-specific fields.
1205 0 : let span = mkspan!(shard.tenant_shard_id.shard_slug());
1206 :
1207 0 : let timer = Self::record_op_start_and_throttle(
1208 0 : &shard,
1209 0 : metrics::SmgrQueryType::GetPageAtLsn,
1210 0 : received_at,
1211 : )
1212 0 : .maybe_perf_instrument(&ctx, |current_perf_span| {
1213 0 : info_span!(
1214 : target: PERF_TRACE_TARGET,
1215 0 : parent: current_perf_span,
1216 : "THROTTLE",
1217 : )
1218 0 : })
1219 0 : .await?;
1220 :
1221 0 : let applied_gc_cutoff_guard = shard.get_applied_gc_cutoff_lsn(); // hold guard
1222 : // We're holding the Handle
1223 0 : let effective_lsn = match Self::effective_request_lsn(
1224 0 : &shard,
1225 0 : shard.get_last_record_lsn(),
1226 0 : req.hdr.request_lsn,
1227 0 : req.hdr.not_modified_since,
1228 0 : &applied_gc_cutoff_guard,
1229 0 : ) {
1230 0 : Ok(lsn) => lsn,
1231 0 : Err(e) => {
1232 0 : return respond_error!(span, e);
1233 : }
1234 : };
1235 0 : let applied_gc_cutoff_guard = match hold_gc_cutoff_guard {
1236 0 : HoldAppliedGcCutoffGuard::Yes => Some(applied_gc_cutoff_guard),
1237 : HoldAppliedGcCutoffGuard::No => {
1238 0 : drop(applied_gc_cutoff_guard);
1239 0 : None
1240 : }
1241 : };
1242 :
1243 0 : let batch_wait_ctx = if ctx.has_perf_span() {
1244 : Some(
1245 0 : RequestContextBuilder::from(&ctx)
1246 0 : .perf_span(|crnt_perf_span| {
1247 0 : info_span!(
1248 : target: PERF_TRACE_TARGET,
1249 0 : parent: crnt_perf_span,
1250 : "WAIT_EXECUTOR",
1251 : )
1252 0 : })
1253 0 : .attached_child(),
1254 : )
1255 : } else {
1256 0 : None
1257 : };
1258 :
1259 : BatchedFeMessage::GetPage {
1260 0 : span,
1261 0 : shard: shard.downgrade(),
1262 0 : applied_gc_cutoff_guard,
1263 0 : pages: smallvec![BatchedGetPageRequest {
1264 0 : req,
1265 0 : timer,
1266 0 : lsn_range: LsnRange {
1267 0 : effective_lsn,
1268 0 : request_lsn: req.hdr.request_lsn
1269 0 : },
1270 0 : ctx,
1271 0 : batch_wait_ctx,
1272 0 : }],
1273 : // The executor grabs the batch when it becomes idle.
1274 : // Hence, [`GetPageBatchBreakReason::ExecutorSteal`] is the
1275 : // default reason for breaking the batch.
1276 0 : batch_break_reason: GetPageBatchBreakReason::ExecutorSteal,
1277 : }
1278 : }
1279 : #[cfg(feature = "testing")]
1280 0 : PagestreamFeMessage::Test(req) => {
1281 0 : let shard = timeline_handles
1282 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1283 0 : .await?;
1284 0 : let span = tracing::info_span!(parent: &parent_span, "handle_test_request", shard_id = %shard.tenant_shard_id.shard_slug());
1285 0 : let timer = Self::record_op_start_and_throttle(
1286 0 : &shard,
1287 0 : metrics::SmgrQueryType::Test,
1288 0 : received_at,
1289 0 : )
1290 0 : .await?;
1291 0 : BatchedFeMessage::Test {
1292 0 : span,
1293 0 : shard: shard.downgrade(),
1294 0 : requests: vec![BatchedTestRequest { req, timer }],
1295 0 : }
1296 : }
1297 : };
1298 0 : Ok(Some(batched_msg))
1299 0 : }
1300 :
1301 : /// Starts a SmgrOpTimer at received_at and throttles the request.
1302 0 : async fn record_op_start_and_throttle(
1303 0 : shard: &Handle<TenantManagerTypes>,
1304 0 : op: metrics::SmgrQueryType,
1305 0 : received_at: Instant,
1306 0 : ) -> Result<SmgrOpTimer, QueryError> {
1307 : // It's important to start the smgr op metric recorder as early as possible
1308 : // so that the _started counters are incremented before we do
1309 : // any serious waiting, e.g., for throttle, batching, or actual request handling.
1310 0 : let mut timer = shard.query_metrics.start_smgr_op(op, received_at);
1311 0 : let now = Instant::now();
1312 0 : timer.observe_throttle_start(now);
1313 0 : let throttled = tokio::select! {
1314 0 : res = shard.pagestream_throttle.throttle(1, now) => res,
1315 0 : _ = shard.cancel.cancelled() => return Err(QueryError::Shutdown),
1316 : };
1317 0 : timer.observe_throttle_done(throttled);
1318 0 : Ok(timer)
1319 0 : }
1320 :
1321 : /// Post-condition: `batch` is Some()
1322 : #[instrument(skip_all, level = tracing::Level::TRACE)]
1323 : #[allow(clippy::boxed_local)]
1324 : fn pagestream_do_batch(
1325 : batching_strategy: PageServiceProtocolPipelinedBatchingStrategy,
1326 : max_batch_size: NonZeroUsize,
1327 : batch: &mut Result<BatchedFeMessage, QueryError>,
1328 : this_msg: Result<BatchedFeMessage, QueryError>,
1329 : ) -> Result<(), Result<BatchedFeMessage, QueryError>> {
1330 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1331 :
1332 : let this_msg = match this_msg {
1333 : Ok(this_msg) => this_msg,
1334 : Err(e) => return Err(Err(e)),
1335 : };
1336 :
1337 : let eligible_batch = match batch {
1338 : Ok(b) => b,
1339 : Err(_) => {
1340 : return Err(Ok(this_msg));
1341 : }
1342 : };
1343 :
1344 : let batch_break =
1345 : eligible_batch.should_break_batch(&this_msg, max_batch_size, batching_strategy);
1346 :
1347 : match batch_break {
1348 : Some(reason) => {
1349 : if let BatchedFeMessage::GetPage {
1350 : batch_break_reason, ..
1351 : } = eligible_batch
1352 : {
1353 : *batch_break_reason = reason;
1354 : }
1355 :
1356 : Err(Ok(this_msg))
1357 : }
1358 : None => {
1359 : // ok to batch
1360 : match (eligible_batch, this_msg) {
1361 : (
1362 : BatchedFeMessage::GetPage {
1363 : pages: accum_pages,
1364 : applied_gc_cutoff_guard: accum_applied_gc_cutoff_guard,
1365 : ..
1366 : },
1367 : BatchedFeMessage::GetPage {
1368 : pages: this_pages,
1369 : applied_gc_cutoff_guard: this_applied_gc_cutoff_guard,
1370 : ..
1371 : },
1372 : ) => {
1373 : accum_pages.extend(this_pages);
1374 : // the minimum of the two guards will keep data for both alive
1375 : match (&accum_applied_gc_cutoff_guard, this_applied_gc_cutoff_guard) {
1376 : (None, None) => (),
1377 : (None, Some(this)) => *accum_applied_gc_cutoff_guard = Some(this),
1378 : (Some(_), None) => (),
1379 : (Some(accum), Some(this)) => {
1380 : if **accum > *this {
1381 : *accum_applied_gc_cutoff_guard = Some(this);
1382 : }
1383 : }
1384 : };
1385 : Ok(())
1386 : }
1387 : #[cfg(feature = "testing")]
1388 : (
1389 : BatchedFeMessage::Test {
1390 : requests: accum_requests,
1391 : ..
1392 : },
1393 : BatchedFeMessage::Test {
1394 : requests: this_requests,
1395 : ..
1396 : },
1397 : ) => {
1398 : accum_requests.extend(this_requests);
1399 : Ok(())
1400 : }
1401 : // Shape guaranteed by [`BatchedFeMessage::should_break_batch`]
1402 : _ => unreachable!(),
1403 : }
1404 : }
1405 : }
1406 : }
1407 :
1408 0 : #[instrument(level = tracing::Level::DEBUG, skip_all)]
1409 : async fn pagestream_handle_batched_message<IO>(
1410 : &mut self,
1411 : pgb_writer: &mut PostgresBackend<IO>,
1412 : batch: BatchedFeMessage,
1413 : io_concurrency: IoConcurrency,
1414 : cancel: &CancellationToken,
1415 : protocol_version: PagestreamProtocolVersion,
1416 : ctx: &RequestContext,
1417 : ) -> Result<(), QueryError>
1418 : where
1419 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1420 : {
1421 : let started_at = Instant::now();
1422 : let batch = {
1423 : let mut batch = batch;
1424 : batch.observe_execution_start(started_at);
1425 : batch
1426 : };
1427 :
1428 : // Dispatch the batch to the appropriate request handler.
1429 : let log_slow_name = batch.as_static_str();
1430 : let (mut handler_results, span) = {
1431 : // TODO: we unfortunately have to pin the future on the heap, since GetPage futures are huge and
1432 : // won't fit on the stack.
1433 : let mut boxpinned = Box::pin(Self::pagestream_dispatch_batched_message(
1434 : batch,
1435 : io_concurrency,
1436 : ctx,
1437 : ));
1438 : log_slow(
1439 : log_slow_name,
1440 : LOG_SLOW_GETPAGE_THRESHOLD,
1441 : boxpinned.as_mut(),
1442 : )
1443 : .await?
1444 : };
1445 :
1446 : // We purposefully don't count flush time into the smgr operation timer.
1447 : //
1448 : // The reason is that the current compute client will not perform protocol processing
1449 : // if the postgres backend process is doing things other than `->smgr_read()`.
1450 : // This is especially the case for prefetch.
1451 : //
1452 : // If the compute doesn't read from the connection, eventually TCP will backpressure
1453 : // all the way into our flush call below.
1454 : //
1455 : // The timer's underlying metric is used for a storage-internal latency SLO and
1456 : // we don't want to include latency in it that we can't control.
1457 : // And as pointed out above, in this case, we don't control the time that flush will take.
1458 : //
1459 : // We put each response in the batch onto the wire in a separate pgb_writer.flush()
1460 : // call, which (all unmeasured) adds syscall overhead but reduces time to first byte
1461 : // and avoids building up a "giant" contiguous userspace buffer to hold the entire response.
1462 : // TODO: vectored socket IO would be great, but pgb_writer doesn't support that.
1463 : let flush_timers = {
1464 : let flushing_start_time = Instant::now();
1465 : let mut flush_timers = Vec::with_capacity(handler_results.len());
1466 : for handler_result in &mut handler_results {
1467 : let flush_timer = match handler_result {
1468 : Ok((_response, timer, _ctx)) => Some(
1469 : timer
1470 : .observe_execution_end(flushing_start_time)
1471 : .expect("we are the first caller"),
1472 : ),
1473 : Err(_) => {
1474 : // TODO: measure errors
1475 : None
1476 : }
1477 : };
1478 : flush_timers.push(flush_timer);
1479 : }
1480 : assert_eq!(flush_timers.len(), handler_results.len());
1481 : flush_timers
1482 : };
1483 :
1484 : // Map handler result to protocol behavior.
1485 : // Some handler errors cause exit from pagestream protocol.
1486 : // Other handler errors are sent back as an error message and we stay in pagestream protocol.
1487 : for (handler_result, flushing_timer) in handler_results.into_iter().zip(flush_timers) {
1488 : let (response_msg, ctx) = match handler_result {
1489 : Err(e) => match &e.err {
1490 : PageStreamError::Shutdown => {
1491 : // BEGIN HADRON
1492 : PAGESTREAM_HANDLER_RESULTS_TOTAL
1493 : .with_label_values(&[metrics::PAGESTREAM_HANDLER_OUTCOME_OTHER_ERROR])
1494 : .inc();
1495 : // END HADRON
1496 :
1497 : // If we fail to fulfil a request during shutdown, which may be _because_ of
1498 : // shutdown, then do not send the error to the client. Instead just drop the
1499 : // connection.
1500 0 : span.in_scope(|| info!("dropping connection due to shutdown"));
1501 : return Err(QueryError::Shutdown);
1502 : }
1503 : PageStreamError::Reconnect(_reason) => {
1504 0 : span.in_scope(|| {
1505 : // BEGIN HADRON
1506 : // We can get here because the compute node is pointing at the wrong PS. We
1507 : // already have a metric to keep track of this so suppressing this log to
1508 : // reduce log spam. The information in this log message is not going to be that
1509 : // helpful given the volume of logs that can be generated.
1510 : // info!("handler requested reconnect: {reason}")
1511 : // END HADRON
1512 0 : });
1513 : // BEGIN HADRON
1514 : PAGESTREAM_HANDLER_RESULTS_TOTAL
1515 : .with_label_values(&[
1516 : metrics::PAGESTREAM_HANDLER_OUTCOME_INTERNAL_ERROR,
1517 : ])
1518 : .inc();
1519 : // END HADRON
1520 : return Err(QueryError::Reconnect);
1521 : }
1522 : PageStreamError::Read(_)
1523 : | PageStreamError::LsnTimeout(_)
1524 : | PageStreamError::NotFound(_)
1525 : | PageStreamError::BadRequest(_) => {
1526 : // BEGIN HADRON
1527 : if let PageStreamError::Read(_) | PageStreamError::LsnTimeout(_) = &e.err {
1528 : PAGESTREAM_HANDLER_RESULTS_TOTAL
1529 : .with_label_values(&[
1530 : metrics::PAGESTREAM_HANDLER_OUTCOME_INTERNAL_ERROR,
1531 : ])
1532 : .inc();
1533 : } else {
1534 : PAGESTREAM_HANDLER_RESULTS_TOTAL
1535 : .with_label_values(&[
1536 : metrics::PAGESTREAM_HANDLER_OUTCOME_OTHER_ERROR,
1537 : ])
1538 : .inc();
1539 : }
1540 : // END HADRON
1541 :
1542 : // print all the details to the log with {:#}, but for the client the
1543 : // error message is enough. Do not log if shutting down, as the anyhow::Error
1544 : // here includes cancellation which is not an error.
1545 : let full = utils::error::report_compact_sources(&e.err);
1546 0 : span.in_scope(|| {
1547 0 : error!("error reading relation or page version: {full:#}")
1548 0 : });
1549 :
1550 : (
1551 : PagestreamBeMessage::Error(PagestreamErrorResponse {
1552 : req: e.req,
1553 : message: e.err.to_string(),
1554 : }),
1555 : None,
1556 : )
1557 : }
1558 : },
1559 : Ok((response_msg, _op_timer_already_observed, ctx)) => {
1560 : // BEGIN HADRON
1561 : PAGESTREAM_HANDLER_RESULTS_TOTAL
1562 : .with_label_values(&[metrics::PAGESTREAM_HANDLER_OUTCOME_SUCCESS])
1563 : .inc();
1564 : // END HADRON
1565 :
1566 : (response_msg, Some(ctx))
1567 : }
1568 : };
1569 :
1570 0 : let ctx = ctx.map(|req_ctx| {
1571 0 : RequestContextBuilder::from(&req_ctx)
1572 0 : .perf_span(|crnt_perf_span| {
1573 0 : info_span!(
1574 : target: PERF_TRACE_TARGET,
1575 0 : parent: crnt_perf_span,
1576 : "FLUSH_RESPONSE",
1577 : )
1578 0 : })
1579 0 : .attached_child()
1580 0 : });
1581 :
1582 : //
1583 : // marshal & transmit response message
1584 : //
1585 :
1586 : pgb_writer.write_message_noflush(&BeMessage::CopyData(
1587 : &response_msg.serialize(protocol_version),
1588 : ))?;
1589 :
1590 : failpoint_support::sleep_millis_async!("before-pagestream-msg-flush", cancel);
1591 :
1592 : // what we want to do: flush the response to the client socket
1593 : let socket_fd = pgb_writer.socket_fd;
1594 : let flush_fut = pgb_writer.flush();
1595 : // metric for how long flushing takes
1596 : let flush_fut = match flushing_timer {
1597 : Some(flushing_timer) => futures::future::Either::Left(flushing_timer.measure(
1598 : Instant::now(),
1599 : flush_fut,
1600 : socket_fd,
1601 : )),
1602 : None => futures::future::Either::Right(flush_fut),
1603 : };
1604 :
1605 : let flush_fut = if let Some(req_ctx) = ctx.as_ref() {
1606 : futures::future::Either::Left(
1607 0 : flush_fut.maybe_perf_instrument(req_ctx, |current_perf_span| {
1608 0 : current_perf_span.clone()
1609 0 : }),
1610 : )
1611 : } else {
1612 : futures::future::Either::Right(flush_fut)
1613 : };
1614 :
1615 : // do it while respecting cancellation
1616 0 : let _: () = async move {
1617 0 : tokio::select! {
1618 : biased;
1619 0 : _ = cancel.cancelled() => {
1620 : // We were requested to shut down.
1621 0 : info!("shutdown request received in page handler");
1622 0 : return Err(QueryError::Shutdown)
1623 : }
1624 0 : res = flush_fut => {
1625 0 : res?;
1626 : }
1627 : }
1628 0 : Ok(())
1629 0 : }
1630 : .await?;
1631 : }
1632 : Ok(())
1633 : }
1634 :
1635 : /// Helper which dispatches a batched message to the appropriate handler.
1636 : /// Returns a vec of results, along with the extracted trace span.
1637 0 : async fn pagestream_dispatch_batched_message(
1638 0 : batch: BatchedFeMessage,
1639 0 : io_concurrency: IoConcurrency,
1640 0 : ctx: &RequestContext,
1641 0 : ) -> Result<
1642 0 : (
1643 0 : Vec<Result<(PagestreamBeMessage, SmgrOpTimer, RequestContext), BatchedPageStreamError>>,
1644 0 : Span,
1645 0 : ),
1646 0 : QueryError,
1647 0 : > {
1648 : macro_rules! upgrade_handle_and_set_context {
1649 : ($shard:ident) => {{
1650 : let weak_handle = &$shard;
1651 : let handle = weak_handle.upgrade()?;
1652 : let ctx = ctx.with_scope_page_service_pagestream(&handle);
1653 : (handle, ctx)
1654 : }};
1655 : }
1656 0 : Ok(match batch {
1657 : BatchedFeMessage::Exists {
1658 0 : span,
1659 0 : timer,
1660 0 : shard,
1661 0 : req,
1662 : } => {
1663 0 : fail::fail_point!("ps::handle-pagerequest-message::exists");
1664 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1665 : (
1666 0 : vec![
1667 0 : Self::handle_get_rel_exists_request(&shard, &req, &ctx)
1668 0 : .instrument(span.clone())
1669 0 : .await
1670 0 : .map(|msg| (PagestreamBeMessage::Exists(msg), timer, ctx))
1671 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
1672 : ],
1673 0 : span,
1674 : )
1675 : }
1676 : BatchedFeMessage::Nblocks {
1677 0 : span,
1678 0 : timer,
1679 0 : shard,
1680 0 : req,
1681 : } => {
1682 0 : fail::fail_point!("ps::handle-pagerequest-message::nblocks");
1683 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1684 : (
1685 0 : vec![
1686 0 : Self::handle_get_nblocks_request(&shard, &req, false, &ctx)
1687 0 : .instrument(span.clone())
1688 0 : .await
1689 0 : .map(|msg| msg.expect("allow_missing=false"))
1690 0 : .map(|msg| (PagestreamBeMessage::Nblocks(msg), timer, ctx))
1691 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
1692 : ],
1693 0 : span,
1694 : )
1695 : }
1696 : BatchedFeMessage::GetPage {
1697 0 : span,
1698 0 : shard,
1699 0 : applied_gc_cutoff_guard,
1700 0 : pages,
1701 0 : batch_break_reason,
1702 : } => {
1703 0 : fail::fail_point!("ps::handle-pagerequest-message::getpage");
1704 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1705 : (
1706 : {
1707 0 : let npages = pages.len();
1708 0 : trace!(npages, "handling getpage request");
1709 0 : let res = Self::handle_get_page_at_lsn_request_batched(
1710 0 : &shard,
1711 0 : pages,
1712 0 : io_concurrency,
1713 0 : batch_break_reason,
1714 0 : &ctx,
1715 0 : )
1716 0 : .instrument(span.clone())
1717 0 : .await;
1718 0 : assert_eq!(res.len(), npages);
1719 0 : drop(applied_gc_cutoff_guard);
1720 0 : res
1721 : },
1722 0 : span,
1723 : )
1724 : }
1725 : BatchedFeMessage::DbSize {
1726 0 : span,
1727 0 : timer,
1728 0 : shard,
1729 0 : req,
1730 : } => {
1731 0 : fail::fail_point!("ps::handle-pagerequest-message::dbsize");
1732 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1733 : (
1734 0 : vec![
1735 0 : Self::handle_db_size_request(&shard, &req, &ctx)
1736 0 : .instrument(span.clone())
1737 0 : .await
1738 0 : .map(|msg| (PagestreamBeMessage::DbSize(msg), timer, ctx))
1739 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
1740 : ],
1741 0 : span,
1742 : )
1743 : }
1744 : BatchedFeMessage::GetSlruSegment {
1745 0 : span,
1746 0 : timer,
1747 0 : shard,
1748 0 : req,
1749 : } => {
1750 0 : fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
1751 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1752 : (
1753 0 : vec![
1754 0 : Self::handle_get_slru_segment_request(&shard, &req, &ctx)
1755 0 : .instrument(span.clone())
1756 0 : .await
1757 0 : .map(|msg| (PagestreamBeMessage::GetSlruSegment(msg), timer, ctx))
1758 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
1759 : ],
1760 0 : span,
1761 : )
1762 : }
1763 : #[cfg(feature = "testing")]
1764 : BatchedFeMessage::Test {
1765 0 : span,
1766 0 : shard,
1767 0 : requests,
1768 : } => {
1769 0 : fail::fail_point!("ps::handle-pagerequest-message::test");
1770 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1771 : (
1772 : {
1773 0 : let npages = requests.len();
1774 0 : trace!(npages, "handling getpage request");
1775 0 : let res = Self::handle_test_request_batch(&shard, requests, &ctx)
1776 0 : .instrument(span.clone())
1777 0 : .await;
1778 0 : assert_eq!(res.len(), npages);
1779 0 : res
1780 : },
1781 0 : span,
1782 : )
1783 : }
1784 0 : BatchedFeMessage::RespondError { span, error } => {
1785 : // We've already decided to respond with an error, so we don't need to
1786 : // call the handler.
1787 0 : (vec![Err(error)], span)
1788 : }
1789 : })
1790 0 : }
1791 :
1792 : /// Pagestream sub-protocol handler.
1793 : ///
1794 : /// It is a simple request-response protocol inside a COPYBOTH session.
1795 : ///
1796 : /// # Coding Discipline
1797 : ///
1798 : /// Coding discipline within this function: all interaction with the `pgb` connection
1799 : /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
1800 : /// This is so that we can shutdown page_service quickly.
1801 : #[instrument(skip_all, fields(hold_gc_cutoff_guard))]
1802 : async fn handle_pagerequests<IO>(
1803 : &mut self,
1804 : pgb: &mut PostgresBackend<IO>,
1805 : tenant_id: TenantId,
1806 : timeline_id: TimelineId,
1807 : protocol_version: PagestreamProtocolVersion,
1808 : ctx: RequestContext,
1809 : ) -> Result<(), QueryError>
1810 : where
1811 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
1812 : {
1813 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1814 :
1815 : // switch client to COPYBOTH
1816 : pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
1817 : tokio::select! {
1818 : biased;
1819 : _ = self.cancel.cancelled() => {
1820 : return Err(QueryError::Shutdown)
1821 : }
1822 : res = pgb.flush() => {
1823 : res?;
1824 : }
1825 : }
1826 :
1827 : let io_concurrency = IoConcurrency::spawn_from_conf(
1828 : self.get_vectored_concurrent_io,
1829 : match self.gate_guard.try_clone() {
1830 : Ok(guard) => guard,
1831 : Err(_) => {
1832 : info!("shutdown request received in page handler");
1833 : return Err(QueryError::Shutdown);
1834 : }
1835 : },
1836 : );
1837 :
1838 : let pgb_reader = pgb
1839 : .split()
1840 : .context("implementation error: split pgb into reader and writer")?;
1841 :
1842 : let timeline_handles = self
1843 : .timeline_handles
1844 : .take()
1845 : .expect("implementation error: timeline_handles should not be locked");
1846 :
1847 : // Evaluate the expensive feature resolver check once per pagestream subprotocol handling
1848 : // Evaluate the expensive feature resolver check once per pagestream subprotocol session
1849 : let hold_gc_cutoff_guard = if cfg!(test) || cfg!(feature = "testing") {
1850 : HoldAppliedGcCutoffGuard::Yes
1851 : } else {
1852 : // Use the global feature resolver with the tenant ID directly, avoiding the need
1853 : // to get a timeline/shard which might not be available on this pageserver node.
1854 : let empty_properties = std::collections::HashMap::new();
1855 : match self.feature_resolver.evaluate_boolean(
1856 : "page-service-getpage-hold-applied-gc-cutoff-guard",
1857 : tenant_id,
1858 : &empty_properties,
1859 : ) {
1860 : Ok(()) => HoldAppliedGcCutoffGuard::Yes,
1861 : Err(_) => HoldAppliedGcCutoffGuard::No,
1862 : }
1863 : };
1864 : // record it in the span of handle_pagerequests so that both the request_span
1865 : // and the pipeline implementation spans contain the field.
1866 : Span::current().record(
1867 : "hold_gc_cutoff_guard",
1868 : tracing::field::debug(&hold_gc_cutoff_guard),
1869 : );
1870 :
1871 : let request_span = info_span!("request");
1872 : let ((pgb_reader, timeline_handles), result) = match self.pipelining_config.clone() {
1873 : PageServicePipeliningConfig::Pipelined(pipelining_config) => {
1874 : self.handle_pagerequests_pipelined(
1875 : pgb,
1876 : pgb_reader,
1877 : tenant_id,
1878 : timeline_id,
1879 : timeline_handles,
1880 : request_span,
1881 : pipelining_config,
1882 : protocol_version,
1883 : io_concurrency,
1884 : hold_gc_cutoff_guard,
1885 : &ctx,
1886 : )
1887 : .await
1888 : }
1889 : PageServicePipeliningConfig::Serial => {
1890 : self.handle_pagerequests_serial(
1891 : pgb,
1892 : pgb_reader,
1893 : tenant_id,
1894 : timeline_id,
1895 : timeline_handles,
1896 : request_span,
1897 : protocol_version,
1898 : io_concurrency,
1899 : hold_gc_cutoff_guard,
1900 : &ctx,
1901 : )
1902 : .await
1903 : }
1904 : };
1905 :
1906 : debug!("pagestream subprotocol shut down cleanly");
1907 :
1908 : pgb.unsplit(pgb_reader)
1909 : .context("implementation error: unsplit pgb")?;
1910 :
1911 : let replaced = self.timeline_handles.replace(timeline_handles);
1912 : assert!(replaced.is_none());
1913 :
1914 : result
1915 : }
1916 :
1917 : #[allow(clippy::too_many_arguments)]
1918 0 : async fn handle_pagerequests_serial<IO>(
1919 0 : &mut self,
1920 0 : pgb_writer: &mut PostgresBackend<IO>,
1921 0 : mut pgb_reader: PostgresBackendReader<IO>,
1922 0 : tenant_id: TenantId,
1923 0 : timeline_id: TimelineId,
1924 0 : mut timeline_handles: TimelineHandles,
1925 0 : request_span: Span,
1926 0 : protocol_version: PagestreamProtocolVersion,
1927 0 : io_concurrency: IoConcurrency,
1928 0 : hold_gc_cutoff_guard: HoldAppliedGcCutoffGuard,
1929 0 : ctx: &RequestContext,
1930 0 : ) -> (
1931 0 : (PostgresBackendReader<IO>, TimelineHandles),
1932 0 : Result<(), QueryError>,
1933 0 : )
1934 0 : where
1935 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
1936 0 : {
1937 0 : let cancel = self.cancel.clone();
1938 :
1939 0 : let err = loop {
1940 0 : let msg = Self::pagestream_read_message(
1941 0 : &mut pgb_reader,
1942 0 : tenant_id,
1943 0 : timeline_id,
1944 0 : &mut timeline_handles,
1945 0 : &self.perf_span_fields,
1946 0 : &cancel,
1947 0 : ctx,
1948 0 : protocol_version,
1949 0 : request_span.clone(),
1950 0 : hold_gc_cutoff_guard,
1951 0 : )
1952 0 : .await;
1953 0 : let msg = match msg {
1954 0 : Ok(msg) => msg,
1955 0 : Err(e) => break e,
1956 : };
1957 0 : let msg = match msg {
1958 0 : Some(msg) => msg,
1959 : None => {
1960 0 : debug!("pagestream subprotocol end observed");
1961 0 : return ((pgb_reader, timeline_handles), Ok(()));
1962 : }
1963 : };
1964 :
1965 0 : let result = self
1966 0 : .pagestream_handle_batched_message(
1967 0 : pgb_writer,
1968 0 : msg,
1969 0 : io_concurrency.clone(),
1970 0 : &cancel,
1971 0 : protocol_version,
1972 0 : ctx,
1973 0 : )
1974 0 : .await;
1975 0 : match result {
1976 0 : Ok(()) => {}
1977 0 : Err(e) => break e,
1978 : }
1979 : };
1980 0 : ((pgb_reader, timeline_handles), Err(err))
1981 0 : }
1982 :
1983 : /// # Cancel-Safety
1984 : ///
1985 : /// May leak tokio tasks if not polled to completion.
1986 : #[allow(clippy::too_many_arguments)]
1987 0 : async fn handle_pagerequests_pipelined<IO>(
1988 0 : &mut self,
1989 0 : pgb_writer: &mut PostgresBackend<IO>,
1990 0 : pgb_reader: PostgresBackendReader<IO>,
1991 0 : tenant_id: TenantId,
1992 0 : timeline_id: TimelineId,
1993 0 : mut timeline_handles: TimelineHandles,
1994 0 : request_span: Span,
1995 0 : pipelining_config: PageServicePipeliningConfigPipelined,
1996 0 : protocol_version: PagestreamProtocolVersion,
1997 0 : io_concurrency: IoConcurrency,
1998 0 : hold_gc_cutoff_guard: HoldAppliedGcCutoffGuard,
1999 0 : ctx: &RequestContext,
2000 0 : ) -> (
2001 0 : (PostgresBackendReader<IO>, TimelineHandles),
2002 0 : Result<(), QueryError>,
2003 0 : )
2004 0 : where
2005 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
2006 0 : {
2007 : //
2008 : // Pipelined pagestream handling consists of
2009 : // - a Batcher that reads requests off the wire and
2010 : // batches them if possible,
2011 : // - an Executor that processes the batched requests.
2012 : //
2013 : // The batch is built up inside an `spsc_fold` channel,
2014 : // shared between Batcher (Sender) and Executor (Receiver).
2015 : //
2016 : // The Batcher continuously folds client requests into the batch,
2017 : // while the Executor can at any time take out what's in the batch
2018 : // in order to process it.
2019 : // This means the next batch builds up while the Executor
2020 : // executes the last batch.
2021 : //
2022 : // CANCELLATION
2023 : //
2024 : // We run both Batcher and Executor futures to completion before
2025 : // returning from this function.
2026 : //
2027 : // If Executor exits first, it signals cancellation to the Batcher
2028 : // via a CancellationToken that is child of `self.cancel`.
2029 : // If Batcher exits first, it signals cancellation to the Executor
2030 : // by dropping the spsc_fold channel Sender.
2031 : //
2032 : // CLEAN SHUTDOWN
2033 : //
2034 : // Clean shutdown means that the client ends the COPYBOTH session.
2035 : // In response to such a client message, the Batcher exits.
2036 : // The Executor continues to run, draining the spsc_fold channel.
2037 : // Once drained, the spsc_fold recv will fail with a distinct error
2038 : // indicating that the sender disconnected.
2039 : // The Executor exits with Ok(()) in response to that error.
2040 : //
2041 : // Server-initiated shutdown is not a clean shutdown; instead it surfaces
2042 : // as an Err(QueryError::Shutdown) that flows through the regular error
2043 : // propagation described below.
2044 : //
2045 : // ERROR PROPAGATION
2046 : //
2047 : // When the Batcher encounters an error, it sends it as a value
2048 : // through the spsc_fold channel and exits afterwards.
2049 : // When the Executor observes such an error in the channel,
2050 : // it exits returning that error value.
2051 : //
2052 : // This design ensures that the Executor stage will still process
2053 : // the batch that was in flight when the Batcher encountered an error,
2054 : // thereby behaving identically to a serial implementation.
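// For orientation, a minimal sketch of the same two-stage shape using a plain
// tokio mpsc channel. This is an illustration only and not the implementation
// below: the real code uses the `spsc_fold` channel, which additionally folds
// incoming requests into the pending batch while the Executor is busy.
//
//     async fn pipeline_sketch() {
//         let (tx, mut rx) = tokio::sync::mpsc::channel::<u32>(8);
//         let batcher = async move {
//             for req in 0..4u32 {
//                 if tx.send(req).await.is_err() {
//                     break; // Executor gone, stop reading requests.
//                 }
//             }
//             // Dropping `tx` here is the "clean shutdown" signal.
//         };
//         let executor = async move {
//             while let Some(req) = rx.recv().await {
//                 let _ = req; // Process the request / batch.
//             }
//             // Channel drained and sender gone: exit successfully.
//         };
//         tokio::join!(batcher, executor);
//     }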
2055 :
2056 : let PageServicePipeliningConfigPipelined {
2057 0 : max_batch_size,
2058 0 : execution,
2059 0 : batching: batching_strategy,
2060 0 : } = pipelining_config;
2061 :
2062 : // Macro to _define_ a pipeline stage.
2063 : macro_rules! pipeline_stage {
2064 : ($name:literal, $cancel:expr, $make_fut:expr) => {{
2065 : let cancel: CancellationToken = $cancel;
2066 : let stage_fut = $make_fut(cancel.clone());
2067 0 : async move {
2068 0 : scopeguard::defer! {
2069 : debug!("exiting");
2070 : }
2071 0 : timed_after_cancellation(stage_fut, $name, Duration::from_millis(100), &cancel)
2072 0 : .await
2073 0 : }
2074 : .instrument(tracing::info_span!($name))
2075 : }};
2076 : }
2077 :
2078 : //
2079 : // Batcher
2080 : //
2081 :
2082 0 : let perf_span_fields = self.perf_span_fields.clone();
2083 :
2084 0 : let cancel_batcher = self.cancel.child_token();
2085 0 : let (mut batch_tx, mut batch_rx) = spsc_fold::channel();
2086 0 : let batcher = pipeline_stage!("batcher", cancel_batcher.clone(), move |cancel_batcher| {
2087 0 : let ctx = ctx.attached_child();
2088 0 : async move {
2089 0 : let mut pgb_reader = pgb_reader;
2090 0 : let mut exit = false;
2091 0 : while !exit {
2092 0 : let read_res = Self::pagestream_read_message(
2093 0 : &mut pgb_reader,
2094 0 : tenant_id,
2095 0 : timeline_id,
2096 0 : &mut timeline_handles,
2097 0 : &perf_span_fields,
2098 0 : &cancel_batcher,
2099 0 : &ctx,
2100 0 : protocol_version,
2101 0 : request_span.clone(),
2102 0 : hold_gc_cutoff_guard,
2103 0 : )
2104 0 : .await;
2105 0 : let Some(read_res) = read_res.transpose() else {
2106 0 : debug!("client-initiated shutdown");
2107 0 : break;
2108 : };
2109 0 : exit |= read_res.is_err();
2110 0 : let could_send = batch_tx
2111 0 : .send(read_res, |batch, res| {
2112 0 : Self::pagestream_do_batch(batching_strategy, max_batch_size, batch, res)
2113 0 : })
2114 0 : .await;
2115 0 : exit |= could_send.is_err();
2116 : }
2117 0 : (pgb_reader, timeline_handles)
2118 0 : }
2119 0 : });
2120 :
2121 : //
2122 : // Executor
2123 : //
2124 :
2125 0 : let executor = pipeline_stage!("executor", self.cancel.clone(), move |cancel| {
2126 0 : let ctx = ctx.attached_child();
2127 0 : async move {
2128 0 : let _cancel_batcher = cancel_batcher.drop_guard();
2129 : loop {
2130 0 : let maybe_batch = batch_rx.recv().await;
2131 0 : let batch = match maybe_batch {
2132 0 : Ok(batch) => batch,
2133 : Err(spsc_fold::RecvError::SenderGone) => {
2134 0 : debug!("upstream gone");
2135 0 : return Ok(());
2136 : }
2137 : };
2138 0 : let mut batch = match batch {
2139 0 : Ok(batch) => batch,
2140 0 : Err(e) => {
2141 0 : return Err(e);
2142 : }
2143 : };
2144 :
2145 : if let BatchedFeMessage::GetPage {
2146 0 : pages,
2147 : span: _,
2148 : shard: _,
2149 : applied_gc_cutoff_guard: _,
2150 : batch_break_reason: _,
2151 0 : } = &mut batch
2152 : {
2153 0 : for req in pages {
2154 0 : req.batch_wait_ctx.take();
2155 0 : }
2156 0 : }
2157 :
2158 0 : self.pagestream_handle_batched_message(
2159 0 : pgb_writer,
2160 0 : batch,
2161 0 : io_concurrency.clone(),
2162 0 : &cancel,
2163 0 : protocol_version,
2164 0 : &ctx,
2165 0 : )
2166 0 : .await?;
2167 : }
2168 0 : }
2169 0 : });
2170 :
2171 : //
2172 : // Execute the stages.
2173 : //
2174 :
2175 0 : match execution {
2176 : PageServiceProtocolPipelinedExecutionStrategy::ConcurrentFutures => {
2177 0 : tokio::join!(batcher, executor)
2178 : }
2179 : PageServiceProtocolPipelinedExecutionStrategy::Tasks => {
2180 : // These tasks are not tracked anywhere.
2181 0 : let read_messages_task = tokio::spawn(batcher);
2182 0 : let (read_messages_task_res, executor_res_) =
2183 0 : tokio::join!(read_messages_task, executor,);
2184 0 : (
2185 0 : read_messages_task_res.expect("propagated panic from read_messages"),
2186 0 : executor_res_,
2187 0 : )
2188 : }
2189 : }
2190 0 : }
2191 :
2192 : /// Helper function to handle the LSN from client request.
2193 : ///
2194 : /// Each GetPage (and Exists and Nblocks) request includes information about
2195 : /// which version of the page is being requested. The primary compute node
2196 : /// will always request the latest page version, by setting 'request_lsn' to
2197 : /// the last inserted or flushed WAL position, while a standby will request
2198 : /// a version at the LSN that it's currently caught up to.
2199 : ///
2200 : /// In either case, if the page server hasn't received the WAL up to the
2201 : /// requested LSN yet, we will wait for it to arrive. The return value is
2202 : /// the LSN that should be used to look up the page versions.
2203 : ///
2204 : /// In addition to the request LSN, each request carries another LSN,
2205 : /// 'not_modified_since', which is a hint to the pageserver that the client
2206 : /// knows that the page has not been modified between 'not_modified_since'
2207 : /// and the request LSN. This allows skipping the wait, as long as the WAL
2208 : /// up to 'not_modified_since' has arrived. If the client doesn't have any
2209 : /// information about when the page was modified, it will use
2210 : /// not_modified_since == request_lsn. If the client lies and sends a too-low
2211 : /// not_modified_since such that there are in fact later page versions, the
2212 : /// behavior is undefined: the pageserver may return any of the page versions
2213 : /// or an error.
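///
/// # Example (illustrative sketch)
///
/// The final selection mirrors [`Self::effective_request_lsn`]; the sketch below
/// uses plain `u64`s in place of `Lsn` and omits the validation, GC-cutoff and
/// lease checks:
///
/// ```ignore
/// fn effective_lsn(last_record: u64, request: u64, not_modified_since: u64) -> u64 {
///     if not_modified_since > last_record {
///         // Must wait for WAL up to `not_modified_since`, then read at that LSN.
///         not_modified_since
///     } else {
///         // Page unchanged since `not_modified_since`: read at the newest LSN
///         // we already have, capped by the request LSN.
///         last_record.min(request)
///     }
/// }
/// ```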
2214 0 : async fn wait_or_get_last_lsn(
2215 0 : timeline: &Timeline,
2216 0 : request_lsn: Lsn,
2217 0 : not_modified_since: Lsn,
2218 0 : latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
2219 0 : ctx: &RequestContext,
2220 0 : ) -> Result<Lsn, PageStreamError> {
2221 0 : let last_record_lsn = timeline.get_last_record_lsn();
2222 0 : let effective_request_lsn = Self::effective_request_lsn(
2223 0 : timeline,
2224 0 : last_record_lsn,
2225 0 : request_lsn,
2226 0 : not_modified_since,
2227 0 : latest_gc_cutoff_lsn,
2228 0 : )?;
2229 :
2230 0 : if effective_request_lsn > last_record_lsn {
2231 0 : timeline
2232 0 : .wait_lsn(
2233 0 : not_modified_since,
2234 0 : crate::tenant::timeline::WaitLsnWaiter::PageService,
2235 0 : timeline::WaitLsnTimeout::Default,
2236 0 : ctx,
2237 0 : )
2238 0 : .await?;
2239 :
2240 : // Since we waited for 'effective_request_lsn' to arrive, that is now the last
2241 : // record LSN. (Or close enough for our purposes; the last-record LSN can
2242 : // advance immediately after we return anyway)
2243 0 : }
2244 :
2245 0 : Ok(effective_request_lsn)
2246 0 : }
2247 :
2248 0 : fn effective_request_lsn(
2249 0 : timeline: &Timeline,
2250 0 : last_record_lsn: Lsn,
2251 0 : request_lsn: Lsn,
2252 0 : not_modified_since: Lsn,
2253 0 : latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
2254 0 : ) -> Result<Lsn, PageStreamError> {
2255 : // Sanity check the request
2256 0 : if request_lsn < not_modified_since {
2257 0 : return Err(PageStreamError::BadRequest(
2258 0 : format!(
2259 0 : "invalid request with request LSN {request_lsn} and not_modified_since {not_modified_since}",
2260 0 : )
2261 0 : .into(),
2262 0 : ));
2263 0 : }
2264 :
2265 : // Check explicitly for INVALID just to get a less scary error message if the request is obviously bogus
2266 0 : if request_lsn == Lsn::INVALID {
2267 0 : return Err(PageStreamError::BadRequest(
2268 0 : "invalid LSN(0) in request".into(),
2269 0 : ));
2270 0 : }
2271 :
2272 : // Clients should only read from recent LSNs on their timeline, or from locations holding an LSN lease.
2273 : //
2274 : // We may have older data available, but we make a best effort to detect this case and return an error,
2275 : // to distinguish a misbehaving client (asking for old LSN) from a storage issue (data missing at a legitimate LSN).
2276 0 : if request_lsn < **latest_gc_cutoff_lsn && !timeline.is_gc_blocked_by_lsn_lease_deadline() {
2277 0 : let gc_info = &timeline.gc_info.read().unwrap();
2278 0 : if !gc_info.lsn_covered_by_lease(request_lsn) {
2279 0 : return Err(
2280 0 : PageStreamError::BadRequest(format!(
2281 0 : "tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
2282 0 : request_lsn, **latest_gc_cutoff_lsn
2283 0 : ).into())
2284 0 : );
2285 0 : }
2286 0 : }
2287 :
2288 0 : if not_modified_since > last_record_lsn {
2289 0 : Ok(not_modified_since)
2290 : } else {
2291 : // It might be better to use max(not_modified_since, latest_gc_cutoff_lsn)
2292 : // here instead. That would give the same result, since we know that there
2293 : // haven't been any modifications since 'not_modified_since'. Using an older
2294 : // LSN might be faster, because that could allow skipping recent layers when
2295 : // finding the page. However, we have historically used 'last_record_lsn', so
2296 : // stick to that for now.
2297 0 : Ok(std::cmp::min(last_record_lsn, request_lsn))
2298 : }
2299 0 : }
2300 :
2301 : /// Handles the lsn lease request.
2302 : /// If a lease cannot be obtained, the client will receive NULL.
2303 : #[instrument(skip_all, fields(shard_id, %lsn))]
2304 : async fn handle_make_lsn_lease<IO>(
2305 : &mut self,
2306 : pgb: &mut PostgresBackend<IO>,
2307 : tenant_shard_id: TenantShardId,
2308 : timeline_id: TimelineId,
2309 : lsn: Lsn,
2310 : ctx: &RequestContext,
2311 : ) -> Result<(), QueryError>
2312 : where
2313 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
2314 : {
2315 : let timeline = self
2316 : .timeline_handles
2317 : .as_mut()
2318 : .unwrap()
2319 : .get(
2320 : tenant_shard_id.tenant_id,
2321 : timeline_id,
2322 : ShardSelector::Known(tenant_shard_id.to_index()),
2323 : )
2324 : .await?;
2325 : set_tracing_field_shard_id(&timeline);
2326 :
2327 : let lease = timeline
2328 : .renew_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx)
2329 0 : .inspect_err(|e| {
2330 0 : warn!("{e}");
2331 0 : })
2332 : .ok();
2333 0 : let valid_until_str = lease.map(|l| {
2334 0 : l.valid_until
2335 0 : .duration_since(SystemTime::UNIX_EPOCH)
2336 0 : .expect("valid_until is earlier than UNIX_EPOCH")
2337 0 : .as_millis()
2338 0 : .to_string()
2339 0 : });
2340 :
2341 : info!(
2342 : "acquired lease for {} until {}",
2343 : lsn,
2344 : valid_until_str.as_deref().unwrap_or("<unknown>")
2345 : );
2346 :
2347 0 : let bytes = valid_until_str.as_ref().map(|x| x.as_bytes());
2348 :
2349 : pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col(
2350 : b"valid_until",
2351 : )]))?
2352 : .write_message_noflush(&BeMessage::DataRow(&[bytes]))?;
2353 :
2354 : Ok(())
2355 : }
2356 :
2357 : #[instrument(skip_all, fields(shard_id))]
2358 : async fn handle_get_rel_exists_request(
2359 : timeline: &Timeline,
2360 : req: &PagestreamExistsRequest,
2361 : ctx: &RequestContext,
2362 : ) -> Result<PagestreamExistsResponse, PageStreamError> {
2363 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
2364 : let lsn = Self::wait_or_get_last_lsn(
2365 : timeline,
2366 : req.hdr.request_lsn,
2367 : req.hdr.not_modified_since,
2368 : &latest_gc_cutoff_lsn,
2369 : ctx,
2370 : )
2371 : .await?;
2372 :
2373 : let exists = timeline
2374 : .get_rel_exists(
2375 : req.rel,
2376 : Version::LsnRange(LsnRange {
2377 : effective_lsn: lsn,
2378 : request_lsn: req.hdr.request_lsn,
2379 : }),
2380 : ctx,
2381 : )
2382 : .await?;
2383 :
2384 : Ok(PagestreamExistsResponse { req: *req, exists })
2385 : }
2386 :
2387 : /// If `allow_missing` is true, returns None instead of Err on missing relations. Otherwise,
2388 : /// never returns None. It is only supported by the gRPC protocol, so we pass it separately to
2389 : /// avoid changing the libpq protocol types.
2390 : #[instrument(skip_all, fields(shard_id))]
2391 : async fn handle_get_nblocks_request(
2392 : timeline: &Timeline,
2393 : req: &PagestreamNblocksRequest,
2394 : allow_missing: bool,
2395 : ctx: &RequestContext,
2396 : ) -> Result<Option<PagestreamNblocksResponse>, PageStreamError> {
2397 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
2398 : let lsn = Self::wait_or_get_last_lsn(
2399 : timeline,
2400 : req.hdr.request_lsn,
2401 : req.hdr.not_modified_since,
2402 : &latest_gc_cutoff_lsn,
2403 : ctx,
2404 : )
2405 : .await?;
2406 :
2407 : let n_blocks = timeline
2408 : .get_rel_size_in_reldir(
2409 : req.rel,
2410 : Version::LsnRange(LsnRange {
2411 : effective_lsn: lsn,
2412 : request_lsn: req.hdr.request_lsn,
2413 : }),
2414 : None,
2415 : allow_missing,
2416 : ctx,
2417 : )
2418 : .await?;
2419 : let Some(n_blocks) = n_blocks else {
2420 : return Ok(None);
2421 : };
2422 :
2423 : Ok(Some(PagestreamNblocksResponse {
2424 : req: *req,
2425 : n_blocks,
2426 : }))
2427 : }
2428 :
2429 : #[instrument(skip_all, fields(shard_id))]
2430 : async fn handle_db_size_request(
2431 : timeline: &Timeline,
2432 : req: &PagestreamDbSizeRequest,
2433 : ctx: &RequestContext,
2434 : ) -> Result<PagestreamDbSizeResponse, PageStreamError> {
2435 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
2436 : let lsn = Self::wait_or_get_last_lsn(
2437 : timeline,
2438 : req.hdr.request_lsn,
2439 : req.hdr.not_modified_since,
2440 : &latest_gc_cutoff_lsn,
2441 : ctx,
2442 : )
2443 : .await?;
2444 :
2445 : let total_blocks = timeline
2446 : .get_db_size(
2447 : DEFAULTTABLESPACE_OID,
2448 : req.dbnode,
2449 : Version::LsnRange(LsnRange {
2450 : effective_lsn: lsn,
2451 : request_lsn: req.hdr.request_lsn,
2452 : }),
2453 : ctx,
2454 : )
2455 : .await?;
2456 : let db_size = total_blocks as i64 * BLCKSZ as i64;
2457 :
2458 : Ok(PagestreamDbSizeResponse { req: *req, db_size })
2459 : }
2460 :
2461 : #[instrument(skip_all)]
2462 : async fn handle_get_page_at_lsn_request_batched(
2463 : timeline: &Timeline,
2464 : requests: SmallVec<[BatchedGetPageRequest; 1]>,
2465 : io_concurrency: IoConcurrency,
2466 : batch_break_reason: GetPageBatchBreakReason,
2467 : ctx: &RequestContext,
2468 : ) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer, RequestContext), BatchedPageStreamError>>
2469 : {
2470 : debug_assert_current_span_has_tenant_and_timeline_id();
2471 :
2472 : timeline
2473 : .query_metrics
2474 : .observe_getpage_batch_start(requests.len(), batch_break_reason);
2475 :
2476 : // If a page trace is running, submit an event for this request.
2477 : if let Some(page_trace) = timeline.page_trace.load().as_ref() {
2478 : let time = SystemTime::now();
2479 : for batch in &requests {
2480 : let key = rel_block_to_key(batch.req.rel, batch.req.blkno).to_compact();
2481 : // Ignore error (trace buffer may be full or tracer may have disconnected).
2482 : _ = page_trace.try_send(PageTraceEvent {
2483 : key,
2484 : effective_lsn: batch.lsn_range.effective_lsn,
2485 : time,
2486 : });
2487 : }
2488 : }
2489 :
2490 : // If any request in the batch needs to wait for LSN, then do so now.
2491 : let mut perf_instrument = false;
2492 : let max_effective_lsn = requests
2493 : .iter()
2494 0 : .map(|req| {
2495 0 : if req.ctx.has_perf_span() {
2496 0 : perf_instrument = true;
2497 0 : }
2498 :
2499 0 : req.lsn_range.effective_lsn
2500 0 : })
2501 : .max()
2502 : .expect("batch is never empty");
2503 :
2504 : let ctx = match perf_instrument {
2505 : true => RequestContextBuilder::from(ctx)
2506 0 : .root_perf_span(|| {
2507 0 : info_span!(
2508 : target: PERF_TRACE_TARGET,
2509 : "GET_VECTORED",
2510 : tenant_id = %timeline.tenant_shard_id.tenant_id,
2511 : timeline_id = %timeline.timeline_id,
2512 0 : shard = %timeline.tenant_shard_id.shard_slug(),
2513 : %max_effective_lsn
2514 : )
2515 0 : })
2516 : .attached_child(),
2517 : false => ctx.attached_child(),
2518 : };
2519 :
2520 : let last_record_lsn = timeline.get_last_record_lsn();
2521 : if max_effective_lsn > last_record_lsn {
2522 : if let Err(e) = timeline
2523 : .wait_lsn(
2524 : max_effective_lsn,
2525 : crate::tenant::timeline::WaitLsnWaiter::PageService,
2526 : timeline::WaitLsnTimeout::Default,
2527 : &ctx,
2528 : )
2529 0 : .maybe_perf_instrument(&ctx, |current_perf_span| {
2530 0 : info_span!(
2531 : target: PERF_TRACE_TARGET,
2532 0 : parent: current_perf_span,
2533 : "WAIT_LSN",
2534 : )
2535 0 : })
2536 : .await
2537 : {
2538 0 : return Vec::from_iter(requests.into_iter().map(|req| {
2539 0 : Err(BatchedPageStreamError {
2540 0 : err: PageStreamError::from(e.clone()),
2541 0 : req: req.req.hdr,
2542 0 : })
2543 0 : }));
2544 : }
2545 : }
2546 :
2547 : let results = timeline
2548 : .get_rel_page_at_lsn_batched(
2549 0 : requests.iter().map(|p| {
2550 0 : (
2551 0 : &p.req.rel,
2552 0 : &p.req.blkno,
2553 0 : p.lsn_range,
2554 0 : p.ctx.attached_child(),
2555 0 : )
2556 0 : }),
2557 : io_concurrency,
2558 : &ctx,
2559 : )
2560 : .await;
2561 : assert_eq!(results.len(), requests.len());
2562 :
2563 : // TODO: avoid creating the new Vec here
2564 : Vec::from_iter(
2565 : requests
2566 : .into_iter()
2567 : .zip(results.into_iter())
2568 0 : .map(|(req, res)| {
2569 0 : res.map(|page| {
2570 0 : (
2571 0 : PagestreamBeMessage::GetPage(
2572 0 : pagestream_api::PagestreamGetPageResponse { req: req.req, page },
2573 0 : ),
2574 0 : req.timer,
2575 0 : req.ctx,
2576 0 : )
2577 0 : })
2578 0 : .map_err(|e| BatchedPageStreamError {
2579 0 : err: PageStreamError::from(e),
2580 0 : req: req.req.hdr,
2581 0 : })
2582 0 : }),
2583 : )
2584 : }
2585 :
2586 : #[instrument(skip_all, fields(shard_id))]
2587 : async fn handle_get_slru_segment_request(
2588 : timeline: &Timeline,
2589 : req: &PagestreamGetSlruSegmentRequest,
2590 : ctx: &RequestContext,
2591 : ) -> Result<PagestreamGetSlruSegmentResponse, PageStreamError> {
2592 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
2593 : let lsn = Self::wait_or_get_last_lsn(
2594 : timeline,
2595 : req.hdr.request_lsn,
2596 : req.hdr.not_modified_since,
2597 : &latest_gc_cutoff_lsn,
2598 : ctx,
2599 : )
2600 : .await?;
2601 :
2602 : let kind = SlruKind::from_repr(req.kind)
2603 : .ok_or(PageStreamError::BadRequest("invalid SLRU kind".into()))?;
2604 : let segment = timeline.get_slru_segment(kind, req.segno, lsn, ctx).await?;
2605 :
2606 : Ok(PagestreamGetSlruSegmentResponse { req: *req, segment })
2607 : }
2608 :
2609 : // NB: this impl mimics what we do for batched getpage requests.
2610 : #[cfg(feature = "testing")]
2611 : #[instrument(skip_all, fields(shard_id))]
2612 : async fn handle_test_request_batch(
2613 : timeline: &Timeline,
2614 : requests: Vec<BatchedTestRequest>,
2615 : _ctx: &RequestContext,
2616 : ) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer, RequestContext), BatchedPageStreamError>>
2617 : {
2618 : // real requests would do something with the timeline
2619 : let mut results = Vec::with_capacity(requests.len());
2620 : for _req in requests.iter() {
2621 : tokio::task::yield_now().await;
2622 :
2623 : results.push({
2624 : if timeline.cancel.is_cancelled() {
2625 : Err(PageReconstructError::Cancelled)
2626 : } else {
2627 : Ok(())
2628 : }
2629 : });
2630 : }
2631 :
2632 : // TODO: avoid creating the new Vec here
2633 : Vec::from_iter(
2634 : requests
2635 : .into_iter()
2636 : .zip(results.into_iter())
2637 0 : .map(|(req, res)| {
2638 0 : res.map(|()| {
2639 0 : (
2640 0 : PagestreamBeMessage::Test(pagestream_api::PagestreamTestResponse {
2641 0 : req: req.req.clone(),
2642 0 : }),
2643 0 : req.timer,
2644 0 : RequestContext::new(
2645 0 : TaskKind::PageRequestHandler,
2646 0 : DownloadBehavior::Warn,
2647 0 : ),
2648 0 : )
2649 0 : })
2650 0 : .map_err(|e| BatchedPageStreamError {
2651 0 : err: PageStreamError::from(e),
2652 0 : req: req.req.hdr,
2653 0 : })
2654 0 : }),
2655 : )
2656 : }
2657 :
2658 : /// Note on "fullbackup":
2659 : /// Full basebackups should only be used for debugging purposes.
2660 : /// Originally, it was introduced to enable breaking storage format changes,
2661 : /// but that is not applicable anymore.
2662 : ///
2663 : /// # Coding Discipline
2664 : ///
2665 : /// Coding discipline within this function: all interaction with the `pgb` connection
2666 : /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
2667 : /// This is so that we can shutdown page_service quickly.
2668 : ///
2669 : /// TODO: wrap the pgb that we pass to the basebackup handler so that it's sensitive
2670 : /// to connection cancellation.
2671 : #[allow(clippy::too_many_arguments)]
2672 : #[instrument(skip_all, fields(shard_id, ?lsn, ?prev_lsn, %full_backup))]
2673 : async fn handle_basebackup_request<IO>(
2674 : &mut self,
2675 : pgb: &mut PostgresBackend<IO>,
2676 : tenant_id: TenantId,
2677 : timeline_id: TimelineId,
2678 : lsn: Option<Lsn>,
2679 : prev_lsn: Option<Lsn>,
2680 : full_backup: bool,
2681 : gzip: bool,
2682 : replica: bool,
2683 : ctx: &RequestContext,
2684 : ) -> Result<(), QueryError>
2685 : where
2686 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
2687 : {
2688 : let started = std::time::Instant::now();
2689 :
2690 : let timeline = self
2691 : .timeline_handles
2692 : .as_mut()
2693 : .unwrap()
2694 : .get(tenant_id, timeline_id, ShardSelector::Zero)
2695 : .await?;
2696 : set_tracing_field_shard_id(&timeline);
2697 : let ctx = ctx.with_scope_timeline(&timeline);
2698 :
2699 : if timeline.is_archived() == Some(true) {
2700 : tracing::info!(
2701 : "timeline {tenant_id}/{timeline_id} is archived, but got basebackup request for it."
2702 : );
2703 : return Err(QueryError::NotFound("timeline is archived".into()));
2704 : }
2705 :
2706 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
2707 : if let Some(lsn) = lsn {
2708 : // Backup was requested at a particular LSN. Wait for it to arrive.
2709 : info!("waiting for {}", lsn);
2710 : timeline
2711 : .wait_lsn(
2712 : lsn,
2713 : crate::tenant::timeline::WaitLsnWaiter::PageService,
2714 : crate::tenant::timeline::WaitLsnTimeout::Default,
2715 : &ctx,
2716 : )
2717 : .await?;
2718 : timeline
2719 : .check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
2720 : .context("invalid basebackup lsn")?;
2721 : }
2722 :
2723 : let lsn_awaited_after = started.elapsed();
2724 :
2725 : // switch client to COPYOUT
2726 : pgb.write_message_noflush(&BeMessage::CopyOutResponse)
2727 : .map_err(QueryError::Disconnected)?;
2728 : self.flush_cancellable(pgb, &self.cancel).await?;
2729 :
2730 : let mut from_cache = false;
2731 :
2732 : // Send a basebackup tarball of the timeline. Compress unless this is a
2733 : // fullbackup. TODO: compress in that case too (tests need to be updated).
2734 : if full_backup {
2735 : let mut writer = pgb.copyout_writer();
2736 : basebackup::send_basebackup_tarball(
2737 : &mut writer,
2738 : &timeline,
2739 : lsn,
2740 : prev_lsn,
2741 : full_backup,
2742 : replica,
2743 : None,
2744 : &ctx,
2745 : )
2746 : .await?;
2747 : } else {
2748 : let mut writer = BufWriter::new(pgb.copyout_writer());
2749 :
2750 : let cached = timeline
2751 : .get_cached_basebackup_if_enabled(lsn, prev_lsn, full_backup, replica, gzip)
2752 : .await;
2753 :
2754 : if let Some(mut cached) = cached {
2755 : from_cache = true;
2756 : tokio::io::copy(&mut cached, &mut writer)
2757 : .await
2758 0 : .map_err(|err| {
2759 0 : BasebackupError::Client(err, "handle_basebackup_request,cached,copy")
2760 0 : })?;
2761 : } else {
2762 : basebackup::send_basebackup_tarball(
2763 : &mut writer,
2764 : &timeline,
2765 : lsn,
2766 : prev_lsn,
2767 : full_backup,
2768 : replica,
2769 : // NB: using fast compression because it's on the critical path for compute
2770 : // startup. For an empty database, we get <100KB with this method. The
2771 : // Level::Best compression method gives us <20KB, but maybe we should add
2772 : // basebackup caching on compute shutdown first.
2773 : gzip.then_some(async_compression::Level::Fastest),
2774 : &ctx,
2775 : )
2776 : .await?;
2777 : }
2778 : writer
2779 : .flush()
2780 : .await
2781 0 : .map_err(|err| BasebackupError::Client(err, "handle_basebackup_request,flush"))?;
2782 : }
2783 :
2784 : pgb.write_message_noflush(&BeMessage::CopyDone)
2785 : .map_err(QueryError::Disconnected)?;
2786 : self.flush_cancellable(pgb, &timeline.cancel).await?;
2787 :
2788 : let basebackup_after = started
2789 : .elapsed()
2790 : .checked_sub(lsn_awaited_after)
2791 : .unwrap_or(Duration::ZERO);
2792 :
2793 : info!(
2794 : lsn_await_millis = lsn_awaited_after.as_millis(),
2795 : basebackup_millis = basebackup_after.as_millis(),
2796 : %from_cache,
2797 : "basebackup complete"
2798 : );
2799 :
2800 : Ok(())
2801 : }
2802 :
2803 : // When accessing the management API, supply None as the argument.
2804 : // When authorizing a tenant, pass the corresponding tenant id.
2805 0 : fn check_permission(&self, tenant_id: Option<TenantId>) -> Result<(), QueryError> {
2806 0 : if self.auth.is_none() {
2807 : // auth is set to Trust, nothing to check so just return ok
2808 0 : return Ok(());
2809 0 : }
2810 : // auth is some, just checked above, when auth is some
2811 : // auth is Some, as just checked above. When auth is present, claims are
2812 : // always present too because of checks during connection init, so this
2813 : // expect won't trigger.
2814 0 : .claims
2815 0 : .as_ref()
2816 0 : .expect("claims presence already checked");
2817 0 : check_permission(claims, tenant_id).map_err(|e| QueryError::Unauthorized(e.0))
2818 0 : }
2819 : }
2820 :
2821 : /// `basebackup tenant timeline [lsn] [--gzip] [--replica]`
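///
/// # Example (illustrative; the IDs below are arbitrary 32-character hex values)
///
/// ```ignore
/// let cmd = BaseBackupCmd::parse(
///     "0000000000000000000000000000002a 000000000000000000000000000000f1 latest --gzip",
/// )?;
/// assert!(cmd.gzip && !cmd.replica && cmd.lsn.is_none());
/// ```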
2822 : #[derive(Debug, Clone, Eq, PartialEq)]
2823 : struct BaseBackupCmd {
2824 : tenant_id: TenantId,
2825 : timeline_id: TimelineId,
2826 : lsn: Option<Lsn>,
2827 : gzip: bool,
2828 : replica: bool,
2829 : }
2830 :
2831 : /// `fullbackup tenant timeline [lsn] [prev_lsn]`
2832 : #[derive(Debug, Clone, Eq, PartialEq)]
2833 : struct FullBackupCmd {
2834 : tenant_id: TenantId,
2835 : timeline_id: TimelineId,
2836 : lsn: Option<Lsn>,
2837 : prev_lsn: Option<Lsn>,
2838 : }
2839 :
2840 : /// `pagestream_v2 tenant timeline`
2841 : #[derive(Debug, Clone, Eq, PartialEq)]
2842 : struct PageStreamCmd {
2843 : tenant_id: TenantId,
2844 : timeline_id: TimelineId,
2845 : protocol_version: PagestreamProtocolVersion,
2846 : }
2847 :
2848 : /// `lease lsn tenant timeline lsn`
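///
/// # Example (illustrative; IDs and LSN are arbitrary values)
///
/// ```ignore
/// let cmd = LeaseLsnCmd::parse(
///     "0000000000000000000000000000002a 000000000000000000000000000000f1 0/15D3DD8",
/// )?;
/// assert_eq!(cmd.lsn, Lsn::from_str("0/15D3DD8")?);
/// ```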
2849 : #[derive(Debug, Clone, Eq, PartialEq)]
2850 : struct LeaseLsnCmd {
2851 : tenant_shard_id: TenantShardId,
2852 : timeline_id: TimelineId,
2853 : lsn: Lsn,
2854 : }
2855 :
2856 : #[derive(Debug, Clone, Eq, PartialEq)]
2857 : enum PageServiceCmd {
2858 : Set,
2859 : PageStream(PageStreamCmd),
2860 : BaseBackup(BaseBackupCmd),
2861 : FullBackup(FullBackupCmd),
2862 : LeaseLsn(LeaseLsnCmd),
2863 : }
2864 :
2865 : impl PageStreamCmd {
2866 3 : fn parse(query: &str, protocol_version: PagestreamProtocolVersion) -> anyhow::Result<Self> {
2867 3 : let parameters = query.split_whitespace().collect_vec();
2868 3 : if parameters.len() != 2 {
2869 1 : bail!(
2870 1 : "invalid number of parameters for pagestream command: {}",
2871 : query
2872 : );
2873 2 : }
2874 2 : let tenant_id = TenantId::from_str(parameters[0])
2875 2 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2876 1 : let timeline_id = TimelineId::from_str(parameters[1])
2877 1 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2878 1 : Ok(Self {
2879 1 : tenant_id,
2880 1 : timeline_id,
2881 1 : protocol_version,
2882 1 : })
2883 3 : }
2884 : }
2885 :
2886 : impl FullBackupCmd {
2887 2 : fn parse(query: &str) -> anyhow::Result<Self> {
2888 2 : let parameters = query.split_whitespace().collect_vec();
2889 2 : if parameters.len() < 2 || parameters.len() > 4 {
2890 0 : bail!(
2891 0 : "invalid number of parameters for basebackup command: {}",
2892 : query
2893 : );
2894 2 : }
2895 2 : let tenant_id = TenantId::from_str(parameters[0])
2896 2 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2897 2 : let timeline_id = TimelineId::from_str(parameters[1])
2898 2 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2899 : // The caller is responsible for providing correct lsn and prev_lsn.
2900 2 : let lsn = if let Some(lsn_str) = parameters.get(2) {
2901 : Some(
2902 1 : Lsn::from_str(lsn_str)
2903 1 : .with_context(|| format!("Failed to parse Lsn from {lsn_str}"))?,
2904 : )
2905 : } else {
2906 1 : None
2907 : };
2908 2 : let prev_lsn = if let Some(prev_lsn_str) = parameters.get(3) {
2909 : Some(
2910 1 : Lsn::from_str(prev_lsn_str)
2911 1 : .with_context(|| format!("Failed to parse Lsn from {prev_lsn_str}"))?,
2912 : )
2913 : } else {
2914 1 : None
2915 : };
2916 2 : Ok(Self {
2917 2 : tenant_id,
2918 2 : timeline_id,
2919 2 : lsn,
2920 2 : prev_lsn,
2921 2 : })
2922 2 : }
2923 : }
2924 :
2925 : impl BaseBackupCmd {
2926 9 : fn parse(query: &str) -> anyhow::Result<Self> {
2927 9 : let parameters = query.split_whitespace().collect_vec();
2928 9 : if parameters.len() < 2 {
2929 0 : bail!(
2930 0 : "invalid number of parameters for basebackup command: {}",
2931 : query
2932 : );
2933 9 : }
2934 9 : let tenant_id = TenantId::from_str(parameters[0])
2935 9 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2936 9 : let timeline_id = TimelineId::from_str(parameters[1])
2937 9 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2938 : let lsn;
2939 : let flags_parse_from;
2940 9 : if let Some(maybe_lsn) = parameters.get(2) {
2941 8 : if *maybe_lsn == "latest" {
2942 1 : lsn = None;
2943 1 : flags_parse_from = 3;
2944 7 : } else if maybe_lsn.starts_with("--") {
2945 5 : lsn = None;
2946 5 : flags_parse_from = 2;
2947 5 : } else {
2948 : lsn = Some(
2949 2 : Lsn::from_str(maybe_lsn)
2950 2 : .with_context(|| format!("Failed to parse lsn from {maybe_lsn}"))?,
2951 : );
2952 2 : flags_parse_from = 3;
2953 : }
2954 1 : } else {
2955 1 : lsn = None;
2956 1 : flags_parse_from = 2;
2957 1 : }
2958 :
2959 9 : let mut gzip = false;
2960 9 : let mut replica = false;
2961 :
2962 11 : for &param in &parameters[flags_parse_from..] {
2963 11 : match param {
2964 11 : "--gzip" => {
2965 7 : if gzip {
2966 1 : bail!("duplicate parameter for basebackup command: {param}")
2967 6 : }
2968 6 : gzip = true
2969 : }
2970 4 : "--replica" => {
2971 2 : if replica {
2972 0 : bail!("duplicate parameter for basebackup command: {param}")
2973 2 : }
2974 2 : replica = true
2975 : }
2976 2 : _ => bail!("invalid parameter for basebackup command: {param}"),
2977 : }
2978 : }
2979 6 : Ok(Self {
2980 6 : tenant_id,
2981 6 : timeline_id,
2982 6 : lsn,
2983 6 : gzip,
2984 6 : replica,
2985 6 : })
2986 9 : }
2987 : }
2988 :
2989 : impl LeaseLsnCmd {
2990 2 : fn parse(query: &str) -> anyhow::Result<Self> {
2991 2 : let parameters = query.split_whitespace().collect_vec();
2992 2 : if parameters.len() != 3 {
2993 0 : bail!(
2994 0 : "invalid number of parameters for lease lsn command: {}",
2995 : query
2996 : );
2997 2 : }
2998 2 : let tenant_shard_id = TenantShardId::from_str(parameters[0])
2999 2 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
3000 2 : let timeline_id = TimelineId::from_str(parameters[1])
3001 2 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
3002 2 : let lsn = Lsn::from_str(parameters[2])
3003 2 : .with_context(|| format!("Failed to parse lsn from {}", parameters[2]))?;
3004 2 : Ok(Self {
3005 2 : tenant_shard_id,
3006 2 : timeline_id,
3007 2 : lsn,
3008 2 : })
3009 2 : }
3010 : }
3011 :
3012 : impl PageServiceCmd {
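/// Parses a page service query string into a command. A small illustrative
/// example (the query below is one psycopg2 sends on connect, handled as `Set`):
///
/// ```ignore
/// let cmd = PageServiceCmd::parse("set datestyle TO 'ISO'")?;
/// assert_eq!(cmd, PageServiceCmd::Set);
/// ```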
3013 21 : fn parse(query: &str) -> anyhow::Result<Self> {
3014 21 : let query = query.trim();
3015 21 : let Some((cmd, other)) = query.split_once(' ') else {
3016 2 : bail!("cannot parse query: {query}")
3017 : };
3018 19 : match cmd.to_ascii_lowercase().as_str() {
3019 19 : "pagestream_v2" => Ok(Self::PageStream(PageStreamCmd::parse(
3020 3 : other,
3021 3 : PagestreamProtocolVersion::V2,
3022 2 : )?)),
3023 16 : "pagestream_v3" => Ok(Self::PageStream(PageStreamCmd::parse(
3024 0 : other,
3025 0 : PagestreamProtocolVersion::V3,
3026 0 : )?)),
3027 16 : "basebackup" => Ok(Self::BaseBackup(BaseBackupCmd::parse(other)?)),
3028 7 : "fullbackup" => Ok(Self::FullBackup(FullBackupCmd::parse(other)?)),
3029 5 : "lease" => {
3030 3 : let Some((cmd2, other)) = other.split_once(' ') else {
3031 0 : bail!("invalid lease command: {cmd}");
3032 : };
3033 3 : let cmd2 = cmd2.to_ascii_lowercase();
3034 3 : if cmd2 == "lsn" {
3035 2 : Ok(Self::LeaseLsn(LeaseLsnCmd::parse(other)?))
3036 : } else {
3037 1 : bail!("invalid lease command: {cmd}");
3038 : }
3039 : }
3040 2 : "set" => Ok(Self::Set),
3041 0 : _ => Err(anyhow::anyhow!("unsupported command {cmd} in {query}")),
3042 : }
3043 21 : }
3044 : }
3045 :
3046 : /// Parse the startup options from the postgres wire protocol startup packet.
3047 : ///
3048 : /// It takes a sequence of `-c option=X` or `-coption=X`. It parses the options string
3049 : /// on a best-effort basis and returns all the options parsed (key-value pairs) plus a bool
3050 : /// indicating whether a parse error was encountered. There could be duplicates in the options
3051 : /// if the caller passed such parameters.
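///
/// # Example (illustrative; the option values are hypothetical)
///
/// ```ignore
/// let (opts, has_error) =
///     parse_options("-c neon.compute_mode=primary -capplication_name=pageserver");
/// assert_eq!(opts[0], ("neon.compute_mode".to_string(), "primary".to_string()));
/// assert_eq!(opts[1], ("application_name".to_string(), "pageserver".to_string()));
/// assert!(!has_error);
/// ```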
3052 7 : fn parse_options(options: &str) -> (Vec<(String, String)>, bool) {
3053 7 : let mut parsing_config = false;
3054 7 : let mut has_error = false;
3055 7 : let mut config = Vec::new();
3056 16 : for item in options.split_whitespace() {
3057 16 : if item == "-c" {
3058 9 : if !parsing_config {
3059 8 : parsing_config = true;
3060 8 : } else {
3061 : // "-c" followed with another "-c"
3062 1 : tracing::warn!("failed to parse the startup options: {options}");
3063 1 : has_error = true;
3064 1 : break;
3065 : }
3066 7 : } else if item.starts_with("-c") || parsing_config {
3067 7 : let Some((mut key, value)) = item.split_once('=') else {
3068 : // "-c" followed with an invalid option
3069 1 : tracing::warn!("failed to parse the startup options: {options}");
3070 1 : has_error = true;
3071 1 : break;
3072 : };
3073 6 : if !parsing_config {
3074 : // Parse "-coptions=X"
3075 1 : let Some(stripped_key) = key.strip_prefix("-c") else {
3076 0 : tracing::warn!("failed to parse the startup options: {options}");
3077 0 : has_error = true;
3078 0 : break;
3079 : };
3080 1 : key = stripped_key;
3081 5 : }
3082 6 : config.push((key.to_string(), value.to_string()));
3083 6 : parsing_config = false;
3084 : } else {
3085 0 : tracing::warn!("failed to parse the startup options: {options}");
3086 0 : has_error = true;
3087 0 : break;
3088 : }
3089 : }
3090 7 : if parsing_config {
3091 : // "-c" without the option
3092 3 : tracing::warn!("failed to parse the startup options: {options}");
3093 3 : has_error = true;
3094 4 : }
3095 7 : (config, has_error)
3096 7 : }
3097 :
3098 : impl<IO> postgres_backend::Handler<IO> for PageServerHandler
3099 : where
3100 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
3101 : {
3102 0 : fn check_auth_jwt(
3103 0 : &mut self,
3104 0 : _pgb: &mut PostgresBackend<IO>,
3105 0 : jwt_response: &[u8],
3106 0 : ) -> Result<(), QueryError> {
3107 : // this unwrap is never triggered, because check_auth_jwt is only called when auth_type is NeonJWT
3108 : // which requires auth to be present
3109 0 : let data: TokenData<Claims> = self
3110 0 : .auth
3111 0 : .as_ref()
3112 0 : .unwrap()
3113 0 : .decode(str::from_utf8(jwt_response).context("jwt response is not UTF-8")?)
3114 0 : .map_err(|e| QueryError::Unauthorized(e.0))?;
3115 :
3116 0 : if matches!(data.claims.scope, Scope::Tenant) && data.claims.tenant_id.is_none() {
3117 0 : return Err(QueryError::Unauthorized(
3118 0 : "jwt token scope is Tenant, but tenant id is missing".into(),
3119 0 : ));
3120 0 : }
3121 :
3122 0 : debug!(
3123 0 : "jwt scope check succeeded for scope: {:#?} by tenant id: {:?}",
3124 : data.claims.scope, data.claims.tenant_id,
3125 : );
3126 :
3127 0 : self.claims = Some(data.claims);
3128 0 : Ok(())
3129 0 : }
3130 :
3131 0 : fn startup(
3132 0 : &mut self,
3133 0 : _pgb: &mut PostgresBackend<IO>,
3134 0 : sm: &FeStartupPacket,
3135 0 : ) -> Result<(), QueryError> {
3136 0 : fail::fail_point!("ps::connection-start::startup-packet");
3137 :
3138 0 : if let FeStartupPacket::StartupMessage { params, .. } = sm {
3139 0 : if let Some(app_name) = params.get("application_name") {
3140 0 : self.perf_span_fields.application_name = Some(app_name.to_string());
3141 0 : Span::current().record("application_name", field::display(app_name));
3142 0 : }
3143 0 : if let Some(options) = params.get("options") {
3144 0 : let (config, _) = parse_options(options);
3145 0 : for (key, value) in config {
3146 0 : if key == "neon.compute_mode" {
3147 0 : self.perf_span_fields.compute_mode = Some(value.clone());
3148 0 : Span::current().record("compute_mode", field::display(value));
3149 0 : }
3150 : }
3151 0 : }
3152 0 : };
3153 :
3154 0 : Ok(())
3155 0 : }
3156 :
3157 : #[instrument(skip_all, fields(tenant_id, timeline_id))]
3158 : async fn process_query(
3159 : &mut self,
3160 : pgb: &mut PostgresBackend<IO>,
3161 : query_string: &str,
3162 : ) -> Result<(), QueryError> {
3163 0 : fail::fail_point!("simulated-bad-compute-connection", |_| {
3164 0 : info!("Hit failpoint for bad connection");
3165 0 : Err(QueryError::SimulatedConnectionError)
3166 0 : });
3167 :
3168 : fail::fail_point!("ps::connection-start::process-query");
3169 :
3170 : let ctx = self.connection_ctx.attached_child();
3171 : debug!("process query {query_string}");
3172 : let query = PageServiceCmd::parse(query_string)?;
3173 : match query {
3174 : PageServiceCmd::PageStream(PageStreamCmd {
3175 : tenant_id,
3176 : timeline_id,
3177 : protocol_version,
3178 : }) => {
3179 : tracing::Span::current()
3180 : .record("tenant_id", field::display(tenant_id))
3181 : .record("timeline_id", field::display(timeline_id));
3182 :
3183 : self.check_permission(Some(tenant_id))?;
3184 : let command_kind = match protocol_version {
3185 : PagestreamProtocolVersion::V2 => ComputeCommandKind::PageStreamV2,
3186 : PagestreamProtocolVersion::V3 => ComputeCommandKind::PageStreamV3,
3187 : };
3188 : COMPUTE_COMMANDS_COUNTERS.for_command(command_kind).inc();
3189 :
3190 : self.handle_pagerequests(pgb, tenant_id, timeline_id, protocol_version, ctx)
3191 : .await?;
3192 : }
3193 : PageServiceCmd::BaseBackup(BaseBackupCmd {
3194 : tenant_id,
3195 : timeline_id,
3196 : lsn,
3197 : gzip,
3198 : replica,
3199 : }) => {
3200 : tracing::Span::current()
3201 : .record("tenant_id", field::display(tenant_id))
3202 : .record("timeline_id", field::display(timeline_id));
3203 :
3204 : self.check_permission(Some(tenant_id))?;
3205 :
3206 : COMPUTE_COMMANDS_COUNTERS
3207 : .for_command(ComputeCommandKind::Basebackup)
3208 : .inc();
3209 : let metric_recording = metrics::BASEBACKUP_QUERY_TIME.start_recording();
3210 0 : let res = async {
3211 0 : self.handle_basebackup_request(
3212 0 : pgb,
3213 0 : tenant_id,
3214 0 : timeline_id,
3215 0 : lsn,
3216 0 : None,
3217 0 : false,
3218 0 : gzip,
3219 0 : replica,
3220 0 : &ctx,
3221 0 : )
3222 0 : .await?;
3223 0 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
3224 0 : Result::<(), QueryError>::Ok(())
3225 0 : }
3226 : .await;
3227 : metric_recording.observe(&res);
3228 : res?;
3229 : }
3230 : // same as basebackup, but result includes relational data as well
3231 : PageServiceCmd::FullBackup(FullBackupCmd {
3232 : tenant_id,
3233 : timeline_id,
3234 : lsn,
3235 : prev_lsn,
3236 : }) => {
3237 : tracing::Span::current()
3238 : .record("tenant_id", field::display(tenant_id))
3239 : .record("timeline_id", field::display(timeline_id));
3240 :
3241 : self.check_permission(Some(tenant_id))?;
3242 :
3243 : COMPUTE_COMMANDS_COUNTERS
3244 : .for_command(ComputeCommandKind::Fullbackup)
3245 : .inc();
3246 :
3247 : // Check that the timeline exists
3248 : self.handle_basebackup_request(
3249 : pgb,
3250 : tenant_id,
3251 : timeline_id,
3252 : lsn,
3253 : prev_lsn,
3254 : true,
3255 : false,
3256 : false,
3257 : &ctx,
3258 : )
3259 : .await?;
3260 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
3261 : }
3262 : PageServiceCmd::Set => {
3263 : // important because psycopg2 executes "SET datestyle TO 'ISO'"
3264 : // on connect
3265 : // TODO: allow setting options, i.e., application_name/compute_mode via SET commands
3266 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
3267 : }
3268 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
3269 : tenant_shard_id,
3270 : timeline_id,
3271 : lsn,
3272 : }) => {
3273 : tracing::Span::current()
3274 : .record("tenant_id", field::display(tenant_shard_id))
3275 : .record("timeline_id", field::display(timeline_id));
3276 :
3277 : self.check_permission(Some(tenant_shard_id.tenant_id))?;
3278 :
3279 : COMPUTE_COMMANDS_COUNTERS
3280 : .for_command(ComputeCommandKind::LeaseLsn)
3281 : .inc();
3282 :
3283 : match self
3284 : .handle_make_lsn_lease(pgb, tenant_shard_id, timeline_id, lsn, &ctx)
3285 : .await
3286 : {
3287 : Ok(()) => {
3288 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?
3289 : }
3290 : Err(e) => {
3291 : error!("error obtaining lsn lease for {lsn}: {e:?}");
3292 : pgb.write_message_noflush(&BeMessage::ErrorResponse(
3293 : &e.to_string(),
3294 : Some(e.pg_error_code()),
3295 : ))?
3296 : }
3297 : };
3298 : }
3299 : }
3300 :
3301 : Ok(())
3302 : }
3303 : }
3304 :
3305 : /// Serves the page service over gRPC. Dispatches to PageServerHandler for request processing.
3306 : ///
3307 : /// TODO: rename to PageServiceHandler when libpq impl is removed.
3308 : pub struct GrpcPageServiceHandler {
3309 : tenant_manager: Arc<TenantManager>,
3310 : ctx: RequestContext,
3311 :
3312 : /// Cancelled to shut down the server. Tonic will shut down in response to this, but wait for
3313 : /// in-flight requests to complete. Any tasks we spawn ourselves must respect this token.
3314 : cancel: CancellationToken,
3315 :
3316 : /// Any tasks we spawn ourselves should clone this gate guard, so that we can wait for them to
3317 : /// complete during shutdown. Request handlers implicitly hold this guard already.
3318 : gate_guard: GateGuard,
3319 :
3320 : /// `get_vectored` concurrency setting.
3321 : get_vectored_concurrent_io: GetVectoredConcurrentIo,
3322 : }
3323 :
3324 : impl GrpcPageServiceHandler {
3325 : /// Spawns a gRPC server for the page service.
3326 : ///
3327 : /// Returns a `CancellableTask` handle that can be used to shut down the server. It waits for
3328 : /// any in-flight requests and tasks to complete first.
3329 : ///
3330 : /// TODO: this doesn't support TLS. We need TLS reloading via ReloadingCertificateResolver, so we
3331 : /// need to reimplement the TCP+TLS accept loop ourselves.
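/// # Example
///
/// A minimal usage sketch (not compiled here). It assumes the caller already has a
/// `tenant_manager`, optional `auth`, a bound `listener`, and a configured
/// `get_vectored_concurrent_io` value, and that the returned `CancellableTask`'s
/// `cancel` token and `task` join handle are accessible as constructed below:
///
/// ```ignore
/// let grpc = GrpcPageServiceHandler::spawn(
///     tenant_manager,
///     auth,
///     None, // no perf trace dispatch
///     get_vectored_concurrent_io,
///     listener,
/// )?;
/// // ... later, during shutdown:
/// grpc.cancel.cancel();    // stop accepting new requests, drain in-flight ones
/// let _ = grpc.task.await; // wait for the server task (and thus the gate) to finish
/// ```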
3332 0 : pub fn spawn(
3333 0 : tenant_manager: Arc<TenantManager>,
3334 0 : auth: Option<Arc<SwappableJwtAuth>>,
3335 0 : perf_trace_dispatch: Option<Dispatch>,
3336 0 : get_vectored_concurrent_io: GetVectoredConcurrentIo,
3337 0 : listener: std::net::TcpListener,
3338 0 : ) -> anyhow::Result<CancellableTask> {
3339 : // Set up a cancellation token for shutting down the server, and a gate to wait for all
3340 : // requests and spawned tasks to complete.
3341 0 : let cancel = CancellationToken::new();
3342 0 : let gate = Gate::default();
3343 :
3344 0 : let ctx = RequestContextBuilder::new(TaskKind::PageRequestHandler)
3345 0 : .download_behavior(DownloadBehavior::Download)
3346 0 : .perf_span_dispatch(perf_trace_dispatch)
3347 0 : .detached_child();
3348 :
3349 : // Set up the TCP socket. We take a preconfigured TcpListener to bind the
3350 : // port early during startup.
3351 0 : let incoming = {
3352 0 : let _runtime = COMPUTE_REQUEST_RUNTIME.enter(); // required by TcpListener::from_std
3353 0 : listener.set_nonblocking(true)?;
3354 0 : tonic::transport::server::TcpIncoming::from(tokio::net::TcpListener::from_std(
3355 0 : listener,
3356 0 : )?)
3357 0 : .with_nodelay(Some(GRPC_TCP_NODELAY))
3358 0 : .with_keepalive(Some(GRPC_TCP_KEEPALIVE_TIME))
3359 : };
3360 :
3361 : // Set up the gRPC server.
3362 : //
3363 : // TODO: consider tuning window sizes.
3364 0 : let mut server = tonic::transport::Server::builder()
3365 0 : .http2_keepalive_interval(Some(GRPC_HTTP2_KEEPALIVE_INTERVAL))
3366 0 : .http2_keepalive_timeout(Some(GRPC_HTTP2_KEEPALIVE_TIMEOUT))
3367 0 : .max_concurrent_streams(Some(GRPC_MAX_CONCURRENT_STREAMS));
3368 :
3369 : // Main page service stack. Uses a mix of Tonic interceptors and Tower layers:
3370 : //
3371 : // * Interceptors: can inspect and modify the gRPC request. Sync code only; they run before the service.
3372 : //
3373 : // * Layers: allow async code and can run code after the service responds. However, they only have
3374 : //   access to the raw HTTP request/response, not the gRPC types.
3375 0 : let page_service_handler = GrpcPageServiceHandler {
3376 0 : tenant_manager,
3377 0 : ctx,
3378 0 : cancel: cancel.clone(),
3379 0 : gate_guard: gate.enter().expect("gate was just created"),
3380 0 : get_vectored_concurrent_io,
3381 0 : };
3382 :
3383 0 : let observability_layer = ObservabilityLayer;
3384 0 : let mut tenant_interceptor = TenantMetadataInterceptor;
3385 0 : let mut auth_interceptor = TenantAuthInterceptor::new(auth);
3386 :
3387 0 : let page_service = tower::ServiceBuilder::new()
3388 : // Create tracing span and record request start time.
3389 0 : .layer(observability_layer)
3390 : // Intercept gRPC requests.
3391 0 : .layer(tonic::service::InterceptorLayer::new(move |mut req| {
3392 : // Extract tenant metadata.
3393 0 : req = tenant_interceptor.call(req)?;
3394 : // Authenticate tenant JWT token.
3395 0 : req = auth_interceptor.call(req)?;
3396 0 : Ok(req)
3397 0 : }))
3398 : // Run the page service.
3399 0 : .service(
3400 0 : proto::PageServiceServer::new(page_service_handler)
3401 : // Support both gzip and zstd compression. The client decides what to use.
3402 0 : .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
3403 0 : .accept_compressed(tonic::codec::CompressionEncoding::Zstd)
3404 0 : .send_compressed(tonic::codec::CompressionEncoding::Gzip)
3405 0 : .send_compressed(tonic::codec::CompressionEncoding::Zstd),
3406 : );
3407 0 : let server = server.add_service(page_service);
3408 :
3409 : // Reflection service for use with e.g. grpcurl.
3410 0 : let reflection_service = tonic_reflection::server::Builder::configure()
3411 0 : .register_encoded_file_descriptor_set(proto::FILE_DESCRIPTOR_SET)
3412 0 : .build_v1()?;
3413 0 : let server = server.add_service(reflection_service);
3414 :
3415 : // Spawn server task. It runs until the cancellation token fires and in-flight requests and
3416 : // tasks complete. The `CancellableTask` will wait for the task's join handle, which
3417 : // implicitly waits for the gate to close.
3418 0 : let task_cancel = cancel.clone();
3419 0 : let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
3420 : "grpc pageservice listener",
3421 0 : async move {
3422 0 : server
3423 0 : .serve_with_incoming_shutdown(incoming, task_cancel.cancelled())
3424 0 : .await?;
3425 : // Server exited cleanly. All requests should have completed by now. Wait for any
3426 : // spawned tasks to complete as well (e.g. IoConcurrency sidecars) via the gate.
3427 0 : gate.close().await;
3428 0 : anyhow::Ok(())
3429 0 : },
3430 : ));
3431 :
3432 0 : Ok(CancellableTask { task, cancel })
3433 0 : }
3434 :
3435 : /// Errors if the request is executed on a non-zero shard. Only shard 0 has a complete view of
3436 : /// relations and their sizes, as well as SLRU segments and similar data.
3437 : #[allow(clippy::result_large_err)]
3438 0 : fn ensure_shard_zero(timeline: &Handle<TenantManagerTypes>) -> Result<(), tonic::Status> {
3439 0 : match timeline.get_shard_index().shard_number.0 {
3440 0 : 0 => Ok(()),
3441 0 : shard => Err(tonic::Status::invalid_argument(format!(
3442 0 : "request must execute on shard zero (is shard {shard})",
3443 0 : ))),
3444 : }
3445 0 : }
3446 :
3447 : /// Generates a PagestreamRequest header from a ReadLsn and request ID.
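/// When no request ID is given, the reqid falls back to its default value; when
/// `not_modified_since_lsn` is absent, it defaults to the request LSN.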
3448 0 : fn make_hdr(
3449 0 : read_lsn: page_api::ReadLsn,
3450 0 : req_id: Option<page_api::RequestID>,
3451 0 : ) -> PagestreamRequest {
3452 : PagestreamRequest {
3453 0 : reqid: req_id.map(|r| r.id).unwrap_or_default(),
3454 0 : request_lsn: read_lsn.request_lsn,
3455 0 : not_modified_since: read_lsn
3456 0 : .not_modified_since_lsn
3457 0 : .unwrap_or(read_lsn.request_lsn),
3458 : }
3459 0 : }
3460 :
3461 : /// Acquires a timeline handle for the given request.
3462 : ///
3463 : /// TODO: during shard splits, the compute may still be sending requests to the parent shard
3464 : /// until the entire split is committed and the compute is notified. Consider installing a
3465 : /// temporary shard router from the parent to the children while the split is in progress.
3466 : ///
3467 : /// TODO: consider moving this to a middleware layer; all requests need it. Needs to manage
3468 : /// the TimelineHandles lifecycle.
3469 : ///
3470 : /// TODO: untangle acquisition from TenantManagerWrapper::resolve() and Cache::get(), to avoid
3471 : /// the unnecessary overhead.
3472 0 : async fn get_request_timeline(
3473 0 : &self,
3474 0 : req: &tonic::Request<impl Any>,
3475 0 : ) -> Result<Handle<TenantManagerTypes>, GetActiveTimelineError> {
3476 0 : let ttid = *extract::<TenantTimelineId>(req);
3477 0 : let shard_index = *extract::<ShardIndex>(req);
3478 0 : let shard_selector = ShardSelector::Known(shard_index);
3479 :
3480 0 : TimelineHandles::new(self.tenant_manager.clone())
3481 0 : .get(ttid.tenant_id, ttid.timeline_id, shard_selector)
3482 0 : .await
3483 0 : }
3484 :
3485 : /// Starts a SmgrOpTimer at received_at, throttles the request, and records execution start.
3486 : /// Only errors if the timeline is shutting down.
3487 : ///
3488 : /// TODO: move timer construction to ObservabilityLayer (see TODO there).
3489 : /// TODO: decouple rate limiting (middleware?), and return SlowDown errors instead.
3490 0 : async fn record_op_start_and_throttle(
3491 0 : timeline: &Handle<TenantManagerTypes>,
3492 0 : op: metrics::SmgrQueryType,
3493 0 : received_at: Instant,
3494 0 : ) -> Result<SmgrOpTimer, tonic::Status> {
3495 0 : let mut timer = PageServerHandler::record_op_start_and_throttle(timeline, op, received_at)
3496 0 : .await
3497 0 : .map_err(|err| match err {
3498 : // record_op_start_and_throttle() only returns Shutdown.
3499 0 : QueryError::Shutdown => tonic::Status::unavailable(format!("{err}")),
3500 0 : err => tonic::Status::internal(format!("unexpected error: {err}")),
3501 0 : })?;
3502 0 : timer.observe_execution_start(Instant::now());
3503 0 : Ok(timer)
3504 0 : }
3505 :
3506 : /// Processes a GetPage batch request, via the GetPages bidirectional streaming RPC.
3507 : ///
3508 : /// NB: errors returned from here are intercepted in get_pages(), and may be converted to a
3509 : /// GetPageResponse with an appropriate status code to avoid terminating the stream.
3510 : ///
3511 : /// TODO: get_vectored() currently enforces a batch limit of 32. Postgres will typically send
3512 : /// batches up to effective_io_concurrency = 100. Either we have to accept large batches, or
3513 : /// split them up in the client or server.
3514 : #[instrument(skip_all, fields(req_id, rel, blkno, blks, req_lsn, mod_lsn))]
3515 : async fn get_page(
3516 : ctx: &RequestContext,
3517 : timeline: &WeakHandle<TenantManagerTypes>,
3518 : req: proto::GetPageRequest,
3519 : io_concurrency: IoConcurrency,
3520 : ) -> Result<proto::GetPageResponse, tonic::Status> {
3521 : let received_at = Instant::now();
3522 : let timeline = timeline.upgrade()?;
3523 : let ctx = ctx.with_scope_page_service_pagestream(&timeline);
3524 :
3525 : // Validate the request, decorate the span, and convert it to a Pagestream request.
3526 : let req = page_api::GetPageRequest::try_from(req)?;
3527 :
3528 : span_record!(
3529 : req_id = %req.request_id,
3530 : rel = %req.rel,
3531 : blkno = %req.block_numbers[0],
3532 : blks = %req.block_numbers.len(),
3533 : lsn = %req.read_lsn,
3534 : );
3535 :
3536 : for &blkno in &req.block_numbers {
3537 : let shard = timeline.get_shard_identity();
3538 : let key = rel_block_to_key(req.rel, blkno);
3539 : if !shard.is_key_local(&key) {
3540 : return Err(tonic::Status::invalid_argument(format!(
3541 : "block {blkno} of relation {} requested on wrong shard {} (is on {})",
3542 : req.rel,
3543 : timeline.get_shard_index(),
3544 : ShardIndex::new(shard.get_shard_number(&key), shard.count),
3545 : )));
3546 : }
3547 : }
3548 :
3549 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn(); // hold guard
3550 : let effective_lsn = PageServerHandler::effective_request_lsn(
3551 : &timeline,
3552 : timeline.get_last_record_lsn(),
3553 : req.read_lsn.request_lsn,
3554 : req.read_lsn
3555 : .not_modified_since_lsn
3556 : .unwrap_or(req.read_lsn.request_lsn),
3557 : &latest_gc_cutoff_lsn,
3558 : )?;
3559 :
3560 : let mut batch = SmallVec::with_capacity(req.block_numbers.len());
3561 : for blkno in req.block_numbers {
3562 : // TODO: this creates one timer per page and throttles it. We should have a timer for
3563 : // the entire batch, and throttle only the batch, but this is equivalent to what
3564 : // PageServerHandler does already so we keep it for now.
3565 : let timer = Self::record_op_start_and_throttle(
3566 : &timeline,
3567 : metrics::SmgrQueryType::GetPageAtLsn,
3568 : received_at,
3569 : )
3570 : .await?;
3571 :
3572 : batch.push(BatchedGetPageRequest {
3573 : req: PagestreamGetPageRequest {
3574 : hdr: Self::make_hdr(req.read_lsn, Some(req.request_id)),
3575 : rel: req.rel,
3576 : blkno,
3577 : },
3578 : lsn_range: LsnRange {
3579 : effective_lsn,
3580 : request_lsn: req.read_lsn.request_lsn,
3581 : },
3582 : timer,
3583 : ctx: ctx.attached_child(),
3584 : batch_wait_ctx: None, // TODO: add tracing
3585 : });
3586 : }
3587 :
3588 : // TODO: this does a relation size query for every page in the batch. Since this batch is
3589 : // all for one relation, we could do it only once. That doesn't hold for the libpq
3590 : // implementation, though, whose batches may span multiple relations.
3591 : let results = PageServerHandler::handle_get_page_at_lsn_request_batched(
3592 : &timeline,
3593 : batch,
3594 : io_concurrency,
3595 : GetPageBatchBreakReason::BatchFull, // TODO: not relevant for gRPC batches
3596 : &ctx,
3597 : )
3598 : .await;
3599 :
3600 : let mut resp = page_api::GetPageResponse {
3601 : request_id: req.request_id,
3602 : status_code: page_api::GetPageStatusCode::Ok,
3603 : reason: None,
3604 : rel: req.rel,
3605 : pages: Vec::with_capacity(results.len()),
3606 : };
3607 :
3608 : for result in results {
3609 : match result {
3610 : Ok((PagestreamBeMessage::GetPage(r), _, _)) => resp.pages.push(page_api::Page {
3611 : block_number: r.req.blkno,
3612 : image: r.page,
3613 : }),
3614 : Ok((resp, _, _)) => {
3615 : return Err(tonic::Status::internal(format!(
3616 : "unexpected response: {resp:?}"
3617 : )));
3618 : }
3619 : Err(err) => return Err(err.err.into()),
3620 : };
3621 : }
3622 :
3623 : Ok(resp.into())
3624 : }
3625 : }
3626 :
3627 : /// Implements the gRPC page service.
3628 : ///
3629 : /// On client disconnect (e.g. timeout or client shutdown), Tonic will drop the request handler
3630 : /// futures, so the read path must be cancellation-safe. On server shutdown, Tonic will wait for
3631 : /// in-flight requests to complete.
3632 : ///
3633 : /// TODO: when the libpq impl is removed, remove the Pagestream types and inline the handler code.
3634 : #[tonic::async_trait]
3635 : impl proto::PageService for GrpcPageServiceHandler {
3636 : type GetBaseBackupStream = Pin<
3637 : Box<dyn Stream<Item = Result<proto::GetBaseBackupResponseChunk, tonic::Status>> + Send>,
3638 : >;
3639 :
3640 : type GetPagesStream =
3641 : Pin<Box<dyn Stream<Item = Result<proto::GetPageResponse, tonic::Status>> + Send>>;
3642 :
3643 : #[instrument(skip_all, fields(lsn))]
3644 : async fn get_base_backup(
3645 : &self,
3646 : req: tonic::Request<proto::GetBaseBackupRequest>,
3647 0 : ) -> Result<tonic::Response<Self::GetBaseBackupStream>, tonic::Status> {
3648 : // Send chunks of 256 KB to avoid large memory allocations. pagebench basebackup shows this
3649 : // to be the sweet spot where throughput is saturated.
3650 : const CHUNK_SIZE: usize = 256 * 1024;
3651 :
3652 0 : let timeline = self.get_request_timeline(&req).await?;
3653 0 : let ctx = self.ctx.with_scope_timeline(&timeline);
3654 :
3655 : // Validate the request and decorate the span.
3656 0 : Self::ensure_shard_zero(&timeline)?;
3657 0 : if timeline.is_archived() == Some(true) {
3658 0 : return Err(tonic::Status::failed_precondition("timeline is archived"));
3659 0 : }
3660 0 : let req: page_api::GetBaseBackupRequest = req.into_inner().try_into()?;
3661 :
3662 0 : span_record!(lsn=?req.lsn);
3663 :
3664 : // Wait for the LSN to arrive, if given.
3665 0 : if let Some(lsn) = req.lsn {
3666 0 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
3667 0 : timeline
3668 0 : .wait_lsn(
3669 0 : lsn,
3670 0 : WaitLsnWaiter::PageService,
3671 0 : WaitLsnTimeout::Default,
3672 0 : &ctx,
3673 0 : )
3674 0 : .await?;
3675 0 : timeline
3676 0 : .check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
3677 0 : .map_err(|err| {
3678 0 : tonic::Status::invalid_argument(format!("invalid basebackup LSN: {err}"))
3679 0 : })?;
3680 0 : }
3681 :
3682 : // Spawn a task to run the basebackup.
3683 0 : let span = Span::current();
3684 0 : let gate_guard = self
3685 0 : .gate_guard
3686 0 : .try_clone()
3687 0 : .map_err(|_| tonic::Status::unavailable("shutting down"))?;
3688 0 : let (mut simplex_read, mut simplex_write) = tokio::io::simplex(CHUNK_SIZE);
3689 0 : let jh = tokio::spawn(async move {
3690 0 : let _gate_guard = gate_guard; // keep gate open until task completes
3691 :
3692 0 : let gzip_level = match req.compression {
3693 0 : page_api::BaseBackupCompression::None => None,
3694 : // NB: using fast compression because it's on the critical path for compute
3695 : // startup. For an empty database, we get <100KB with this method. The
3696 : // Level::Best compression method gives us <20KB, but maybe we should add
3697 : // basebackup caching on compute shutdown first.
3698 0 : page_api::BaseBackupCompression::Gzip => Some(async_compression::Level::Fastest),
3699 : };
3700 :
3701 : // Check for a cached basebackup.
3702 0 : let cached = timeline
3703 0 : .get_cached_basebackup_if_enabled(
3704 0 : req.lsn,
3705 0 : None,
3706 0 : req.full,
3707 0 : req.replica,
3708 0 : gzip_level.is_some(),
3709 0 : )
3710 0 : .await;
3711 :
3712 0 : let result = if let Some(mut cached) = cached {
3713 : // If we have a cached basebackup, send it.
3714 0 : tokio::io::copy(&mut cached, &mut simplex_write)
3715 0 : .await
3716 0 : .map(|_| ())
3717 0 : .map_err(|err| BasebackupError::Client(err, "cached,copy"))
3718 : } else {
3719 0 : basebackup::send_basebackup_tarball(
3720 0 : &mut simplex_write,
3721 0 : &timeline,
3722 0 : req.lsn,
3723 0 : None,
3724 0 : req.full,
3725 0 : req.replica,
3726 0 : gzip_level,
3727 0 : &ctx,
3728 0 : )
3729 0 : .instrument(span) // propagate request span
3730 0 : .await
3731 : };
3732 0 : simplex_write
3733 0 : .shutdown()
3734 0 : .await
3735 0 : .map_err(|err| BasebackupError::Client(err, "simplex_write"))?;
3736 0 : result
3737 0 : });
3738 :
3739 : // Emit chunks of size CHUNK_SIZE.
3740 0 : let chunks = async_stream::try_stream! {
3741 : loop {
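// Cap the buffer at CHUNK_SIZE: once the chunk is full (or the stream is closed),
// read_buf() returns 0 and the inner read loop below exits.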
3742 : let mut chunk = BytesMut::with_capacity(CHUNK_SIZE).limit(CHUNK_SIZE);
3743 : loop {
3744 0 : let n = simplex_read.read_buf(&mut chunk).await.map_err(|err| {
3745 0 : tonic::Status::internal(format!("failed to read basebackup chunk: {err}"))
3746 0 : })?;
3747 : if n == 0 {
3748 : break; // full chunk or closed stream
3749 : }
3750 : }
3751 : let chunk = chunk.into_inner().freeze();
3752 : if chunk.is_empty() {
3753 : break;
3754 : }
3755 : yield proto::GetBaseBackupResponseChunk::from(chunk);
3756 : }
3757 : // Wait for the basebackup task to exit and check for errors.
3758 0 : jh.await.map_err(|err| {
3759 0 : tonic::Status::internal(format!("basebackup failed: {err}"))
3760 0 : })??;
3761 : };
3762 :
3763 0 : Ok(tonic::Response::new(Box::pin(chunks)))
3764 0 : }
3765 :
3766 : #[instrument(skip_all, fields(db_oid, lsn))]
3767 : async fn get_db_size(
3768 : &self,
3769 : req: tonic::Request<proto::GetDbSizeRequest>,
3770 0 : ) -> Result<tonic::Response<proto::GetDbSizeResponse>, tonic::Status> {
3771 0 : let received_at = extract::<ReceivedAt>(&req).0;
3772 0 : let timeline = self.get_request_timeline(&req).await?;
3773 0 : let ctx = self.ctx.with_scope_page_service_pagestream(&timeline);
3774 :
3775 : // Validate the request, decorate the span, and convert it to a Pagestream request.
3776 0 : Self::ensure_shard_zero(&timeline)?;
3777 0 : let req: page_api::GetDbSizeRequest = req.into_inner().try_into()?;
3778 :
3779 0 : span_record!(db_oid=%req.db_oid, lsn=%req.read_lsn);
3780 :
3781 0 : let req = PagestreamDbSizeRequest {
3782 0 : hdr: Self::make_hdr(req.read_lsn, None),
3783 0 : dbnode: req.db_oid,
3784 0 : };
3785 :
3786 : // Execute the request and convert the response.
3787 0 : let _timer = Self::record_op_start_and_throttle(
3788 0 : &timeline,
3789 0 : metrics::SmgrQueryType::GetDbSize,
3790 0 : received_at,
3791 0 : )
3792 0 : .await?;
3793 :
3794 0 : let resp = PageServerHandler::handle_db_size_request(&timeline, &req, &ctx).await?;
3795 0 : let resp = resp.db_size as page_api::GetDbSizeResponse;
3796 0 : Ok(tonic::Response::new(resp.into()))
3797 0 : }
3798 :
3799 : // NB: don't instrument this; instead, instrument each streamed request.
3800 0 : async fn get_pages(
3801 : &self,
3802 : req: tonic::Request<tonic::Streaming<proto::GetPageRequest>>,
3803 0 : ) -> Result<tonic::Response<Self::GetPagesStream>, tonic::Status> {
3804 : // Extract the timeline from the request and check that it exists.
3805 0 : let ttid = *extract::<TenantTimelineId>(&req);
3806 0 : let shard_index = *extract::<ShardIndex>(&req);
3807 0 : let shard_selector = ShardSelector::Known(shard_index);
3808 :
3809 0 : let mut handles = TimelineHandles::new(self.tenant_manager.clone());
3810 0 : handles
3811 0 : .get(ttid.tenant_id, ttid.timeline_id, shard_selector)
3812 0 : .await?;
3813 :
3814 : // Spawn an IoConcurrency sidecar, if enabled.
3815 0 : let gate_guard = self
3816 0 : .gate_guard
3817 0 : .try_clone()
3818 0 : .map_err(|_| tonic::Status::unavailable("shutting down"))?;
3819 0 : let io_concurrency =
3820 0 : IoConcurrency::spawn_from_conf(self.get_vectored_concurrent_io, gate_guard);
3821 :
3822 : // Construct the GetPageRequest stream handler.
3823 0 : let span = Span::current();
3824 0 : let ctx = self.ctx.attached_child();
3825 0 : let cancel = self.cancel.clone();
3826 0 : let mut reqs = req.into_inner();
3827 :
3828 0 : let resps = async_stream::try_stream! {
3829 : let timeline = handles
3830 : .get(ttid.tenant_id, ttid.timeline_id, shard_selector)
3831 : .await?
3832 : .downgrade();
3833 : loop {
3834 : // NB: Tonic considers the entire stream to be an in-flight request and will wait
3835 : // for it to complete before shutting down. React to cancellation between requests.
3836 : let req = tokio::select! {
3837 : biased;
3838 : _ = cancel.cancelled() => Err(tonic::Status::unavailable("shutting down")),
3839 :
3840 : result = reqs.message() => match result {
3841 : Ok(Some(req)) => Ok(req),
3842 : Ok(None) => break, // client closed the stream
3843 : Err(err) => Err(err),
3844 : },
3845 : }?;
3846 : let req_id = req.request_id.map(page_api::RequestID::from).unwrap_or_default();
3847 : let result = Self::get_page(&ctx, &timeline, req, io_concurrency.clone())
3848 : .instrument(span.clone()) // propagate request span
3849 : .await;
3850 : yield match result {
3851 : Ok(resp) => resp,
3852 : // Convert per-request errors to GetPageResponses as appropriate, or terminate
3853 : // the stream with a tonic::Status. Log the error regardless, since
3854 : // ObservabilityLayer can't automatically log stream errors.
3855 : Err(status) => {
3856 : // TODO: it would be nice if we could propagate the get_page() fields here.
3857 0 : span.in_scope(|| {
3858 0 : warn!("request failed with {:?}: {}", status.code(), status.message());
3859 0 : });
3860 : page_api::GetPageResponse::try_from_status(status, req_id)?.into()
3861 : }
3862 : }
3863 : }
3864 : };
3865 :
3866 0 : Ok(tonic::Response::new(Box::pin(resps)))
3867 0 : }
3868 :
3869 : #[instrument(skip_all, fields(rel, lsn, allow_missing))]
3870 : async fn get_rel_size(
3871 : &self,
3872 : req: tonic::Request<proto::GetRelSizeRequest>,
3873 0 : ) -> Result<tonic::Response<proto::GetRelSizeResponse>, tonic::Status> {
3874 0 : let received_at = extract::<ReceivedAt>(&req).0;
3875 0 : let timeline = self.get_request_timeline(&req).await?;
3876 0 : let ctx = self.ctx.with_scope_page_service_pagestream(&timeline);
3877 :
3878 : // Validate the request, decorate the span, and convert it to a Pagestream request.
3879 0 : Self::ensure_shard_zero(&timeline)?;
3880 0 : let req: page_api::GetRelSizeRequest = req.into_inner().try_into()?;
3881 0 : let allow_missing = req.allow_missing;
3882 :
3883 0 : span_record!(rel=%req.rel, lsn=%req.read_lsn, allow_missing=%req.allow_missing);
3884 :
3885 0 : let req = PagestreamNblocksRequest {
3886 0 : hdr: Self::make_hdr(req.read_lsn, None),
3887 0 : rel: req.rel,
3888 0 : };
3889 :
3890 : // Execute the request and convert the response.
3891 0 : let _timer = Self::record_op_start_and_throttle(
3892 0 : &timeline,
3893 0 : metrics::SmgrQueryType::GetRelSize,
3894 0 : received_at,
3895 0 : )
3896 0 : .await?;
3897 :
3898 0 : let resp =
3899 0 : PageServerHandler::handle_get_nblocks_request(&timeline, &req, allow_missing, &ctx)
3900 0 : .await?;
3901 0 : let resp: page_api::GetRelSizeResponse = resp.map(|resp| resp.n_blocks);
3902 :
3903 0 : Ok(tonic::Response::new(resp.into()))
3904 0 : }
3905 :
3906 : #[instrument(skip_all, fields(kind, segno, lsn))]
3907 : async fn get_slru_segment(
3908 : &self,
3909 : req: tonic::Request<proto::GetSlruSegmentRequest>,
3910 0 : ) -> Result<tonic::Response<proto::GetSlruSegmentResponse>, tonic::Status> {
3911 0 : let received_at = extract::<ReceivedAt>(&req).0;
3912 0 : let timeline = self.get_request_timeline(&req).await?;
3913 0 : let ctx = self.ctx.with_scope_page_service_pagestream(&timeline);
3914 :
3915 : // Validate the request, decorate the span, and convert it to a Pagestream request.
3916 0 : Self::ensure_shard_zero(&timeline)?;
3917 0 : let req: page_api::GetSlruSegmentRequest = req.into_inner().try_into()?;
3918 :
3919 0 : span_record!(kind=%req.kind, segno=%req.segno, lsn=%req.read_lsn);
3920 :
3921 0 : let req = PagestreamGetSlruSegmentRequest {
3922 0 : hdr: Self::make_hdr(req.read_lsn, None),
3923 0 : kind: req.kind as u8,
3924 0 : segno: req.segno,
3925 0 : };
3926 :
3927 : // Execute the request and convert the response.
3928 0 : let _timer = Self::record_op_start_and_throttle(
3929 0 : &timeline,
3930 0 : metrics::SmgrQueryType::GetSlruSegment,
3931 0 : received_at,
3932 0 : )
3933 0 : .await?;
3934 :
3935 0 : let resp =
3936 0 : PageServerHandler::handle_get_slru_segment_request(&timeline, &req, &ctx).await?;
3937 0 : let resp: page_api::GetSlruSegmentResponse = resp.segment;
3938 0 : Ok(tonic::Response::new(resp.into()))
3939 0 : }
3940 :
3941 : #[instrument(skip_all, fields(lsn))]
3942 : async fn lease_lsn(
3943 : &self,
3944 : req: tonic::Request<proto::LeaseLsnRequest>,
3945 0 : ) -> Result<tonic::Response<proto::LeaseLsnResponse>, tonic::Status> {
3946 0 : let timeline = self.get_request_timeline(&req).await?;
3947 0 : let ctx = self.ctx.with_scope_timeline(&timeline);
3948 :
3949 : // Validate and convert the request, and decorate the span.
3950 0 : let req: page_api::LeaseLsnRequest = req.into_inner().try_into()?;
3951 :
3952 0 : span_record!(lsn=%req.lsn);
3953 :
3954 : // Attempt to acquire a lease. Return FailedPrecondition if the lease could not be granted.
3955 0 : let lease_length = timeline.get_lsn_lease_length();
3956 0 : let expires = match timeline.renew_lsn_lease(req.lsn, lease_length, &ctx) {
3957 0 : Ok(lease) => lease.valid_until,
3958 0 : Err(err) => return Err(tonic::Status::failed_precondition(format!("{err}"))),
3959 : };
3960 :
3961 : // TODO: is this spammy? Move it compute-side?
3962 0 : info!(
3963 0 : "acquired lease for {} until {}",
3964 : req.lsn,
3965 0 : chrono::DateTime::<Utc>::from(expires).to_rfc3339()
3966 : );
3967 :
3968 0 : Ok(tonic::Response::new(expires.into()))
3969 0 : }
3970 : }
3971 :
3972 : /// gRPC middleware layer that handles observability concerns:
3973 : ///
3974 : /// * Creates and enters a tracing span.
3975 : /// * Records the request start time as a ReceivedAt request extension.
3976 : ///
3977 : /// TODO: add perf tracing.
3978 : /// TODO: add timing and metrics.
3979 : /// TODO: add logging.
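///
/// Handlers read the recorded start time back out of the request extensions. A sketch
/// of the pattern used by the unary RPCs in this file:
///
/// ```ignore
/// let received_at: Instant = extract::<ReceivedAt>(&req).0;
/// ```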
3980 : #[derive(Clone)]
3981 : struct ObservabilityLayer;
3982 :
3983 : impl<S: tonic::server::NamedService> tower::Layer<S> for ObservabilityLayer {
3984 : type Service = ObservabilityLayerService<S>;
3985 :
3986 0 : fn layer(&self, inner: S) -> Self::Service {
3987 0 : Self::Service { inner }
3988 0 : }
3989 : }
3990 :
3991 : #[derive(Clone)]
3992 : struct ObservabilityLayerService<S> {
3993 : inner: S,
3994 : }
3995 :
3996 : #[derive(Clone, Copy)]
3997 : struct ReceivedAt(Instant);
3998 :
3999 : impl<S: tonic::server::NamedService> tonic::server::NamedService for ObservabilityLayerService<S> {
4000 : const NAME: &'static str = S::NAME; // propagate inner service name
4001 : }
4002 :
4003 : impl<S, Req, Resp> tower::Service<http::Request<Req>> for ObservabilityLayerService<S>
4004 : where
4005 : S: tower::Service<http::Request<Req>, Response = http::Response<Resp>> + Send,
4006 : S::Future: Send + 'static,
4007 : {
4008 : type Response = S::Response;
4009 : type Error = S::Error;
4010 : type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
4011 :
4012 0 : fn call(&mut self, mut req: http::Request<Req>) -> Self::Future {
4013 : // Record the request start time as a request extension.
4014 : //
4015 : // TODO: we should start a timer here instead, but constructing one currently requires a
4016 : // timeline handle and SmgrQueryType, which we don't have yet. Refactor so they can be provided later.
4017 0 : req.extensions_mut().insert(ReceivedAt(Instant::now()));
4018 :
4019 : // Extract the peer address and gRPC method.
4020 0 : let peer = req
4021 0 : .extensions()
4022 0 : .get::<TcpConnectInfo>()
4023 0 : .and_then(|info| info.remote_addr())
4024 0 : .map(|addr| addr.to_string())
4025 0 : .unwrap_or_default();
4026 :
4027 0 : let method = req
4028 0 : .uri()
4029 0 : .path()
4030 0 : .split('/')
4031 0 : .nth(2)
4032 0 : .unwrap_or(req.uri().path())
4033 0 : .to_string();
4034 :
4035 : // Create a basic tracing span.
4036 : //
4037 : // Enter the span on the current thread and instrument the future. Instrumenting the future
4038 : // alone is not sufficient, since the instrumentation only takes effect once the future is
4039 : // returned and polled, not when the inner service is called below (e.g. during interceptor execution).
4040 0 : let span = info_span!(
4041 : "grpc:pageservice",
4042 : // These will be populated by TenantMetadataInterceptor.
4043 : tenant_id = field::Empty,
4044 : timeline_id = field::Empty,
4045 : shard_id = field::Empty,
4046 : // NB: empty fields must be listed first above. Otherwise, the field names will be
4047 : // clobbered when the empty fields are populated. They will be output last regardless.
4048 : %peer,
4049 : %method,
4050 : );
4051 0 : let _guard = span.enter();
4052 :
4053 : // Construct a future for calling the inner service, but don't await it. This avoids having
4054 : // to clone the inner service into the future below.
4055 0 : let call = self.inner.call(req);
4056 :
4057 0 : async move {
4058 : // Await the inner service call.
4059 0 : let result = call.await;
4060 :
4061 : // Log gRPC error statuses. This won't include request info from handler spans, but it
4062 : // will catch all errors (even those emitted before handler spans are constructed). Only
4063 : // unary request errors are logged here, not streaming response errors.
4064 0 : if let Ok(ref resp) = result
4065 0 : && let Some(status) = tonic::Status::from_header_map(resp.headers())
4066 0 : && status.code() != tonic::Code::Ok
4067 : {
4068 : // TODO: it would be nice if we could propagate the handler span's request fields
4069 : // here. This could e.g. be done by attaching the request fields to
4070 : // tonic::Status::metadata via a proc macro.
4071 0 : warn!(
4072 0 : "request failed with {:?}: {}",
4073 0 : status.code(),
4074 0 : status.message()
4075 : );
4076 0 : }
4077 :
4078 0 : result
4079 0 : }
4080 0 : .instrument(span.clone())
4081 0 : .boxed()
4082 0 : }
4083 :
4084 0 : fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
4085 0 : self.inner.poll_ready(cx)
4086 0 : }
4087 : }
4088 :
4089 : /// gRPC interceptor that decodes tenant metadata and stores it as request extensions of type
4090 : /// TenantTimelineId and ShardIndex.
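///
/// Clients are expected to pass the tenant, timeline, and shard as gRPC metadata. A
/// client-side sketch (not compiled here; the request type is only illustrative):
///
/// ```ignore
/// let mut req = tonic::Request::new(proto::GetRelSizeRequest::default());
/// let md = req.metadata_mut();
/// md.insert("neon-tenant-id", tenant_id.to_string().parse()?);
/// md.insert("neon-timeline-id", timeline_id.to_string().parse()?);
/// md.insert("neon-shard-id", shard_index.to_string().parse()?);
/// ```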
4091 : #[derive(Clone)]
4092 : struct TenantMetadataInterceptor;
4093 :
4094 : impl tonic::service::Interceptor for TenantMetadataInterceptor {
4095 0 : fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
4096 : // Decode the tenant ID.
4097 0 : let tenant_id = req
4098 0 : .metadata()
4099 0 : .get("neon-tenant-id")
4100 0 : .ok_or_else(|| tonic::Status::invalid_argument("missing neon-tenant-id"))?
4101 0 : .to_str()
4102 0 : .map_err(|_| tonic::Status::invalid_argument("invalid neon-tenant-id"))?;
4103 0 : let tenant_id = TenantId::from_str(tenant_id)
4104 0 : .map_err(|_| tonic::Status::invalid_argument("invalid neon-tenant-id"))?;
4105 :
4106 : // Decode the timeline ID.
4107 0 : let timeline_id = req
4108 0 : .metadata()
4109 0 : .get("neon-timeline-id")
4110 0 : .ok_or_else(|| tonic::Status::invalid_argument("missing neon-timeline-id"))?
4111 0 : .to_str()
4112 0 : .map_err(|_| tonic::Status::invalid_argument("invalid neon-timeline-id"))?;
4113 0 : let timeline_id = TimelineId::from_str(timeline_id)
4114 0 : .map_err(|_| tonic::Status::invalid_argument("invalid neon-timeline-id"))?;
4115 :
4116 : // Decode the shard ID.
4117 0 : let shard_id = req
4118 0 : .metadata()
4119 0 : .get("neon-shard-id")
4120 0 : .ok_or_else(|| tonic::Status::invalid_argument("missing neon-shard-id"))?
4121 0 : .to_str()
4122 0 : .map_err(|_| tonic::Status::invalid_argument("invalid neon-shard-id"))?;
4123 0 : let shard_id = ShardIndex::from_str(shard_id)
4124 0 : .map_err(|_| tonic::Status::invalid_argument("invalid neon-shard-id"))?;
4125 :
4126 : // Stash them in the request.
4127 0 : let extensions = req.extensions_mut();
4128 0 : extensions.insert(TenantTimelineId::new(tenant_id, timeline_id));
4129 0 : extensions.insert(shard_id);
4130 :
4131 : // Decorate the tracing span.
4132 0 : span_record!(%tenant_id, %timeline_id, %shard_id);
4133 :
4134 0 : Ok(req)
4135 0 : }
4136 : }
4137 :
4138 : /// Authenticates gRPC page service requests.
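///
/// Expects a tenant-scoped JWT in the standard `authorization` metadata entry, in the
/// form `Bearer <token>`. A client-side sketch (not compiled here):
///
/// ```ignore
/// req.metadata_mut().insert("authorization", format!("Bearer {jwt}").parse()?);
/// ```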
4139 : #[derive(Clone)]
4140 : struct TenantAuthInterceptor {
4141 : auth: Option<Arc<SwappableJwtAuth>>,
4142 : }
4143 :
4144 : impl TenantAuthInterceptor {
4145 0 : fn new(auth: Option<Arc<SwappableJwtAuth>>) -> Self {
4146 0 : Self { auth }
4147 0 : }
4148 : }
4149 :
4150 : impl tonic::service::Interceptor for TenantAuthInterceptor {
4151 0 : fn call(&mut self, req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
4152 : // Do nothing if auth is disabled.
4153 0 : let Some(auth) = self.auth.as_ref() else {
4154 0 : return Ok(req);
4155 : };
4156 :
4157 : // Fetch the tenant ID from the request extensions (set by TenantMetadataInterceptor).
4158 0 : let TenantTimelineId { tenant_id, .. } = *extract::<TenantTimelineId>(&req);
4159 :
4160 : // Fetch and decode the JWT token.
4161 0 : let jwt = req
4162 0 : .metadata()
4163 0 : .get("authorization")
4164 0 : .ok_or_else(|| tonic::Status::unauthenticated("no authorization header"))?
4165 0 : .to_str()
4166 0 : .map_err(|_| tonic::Status::invalid_argument("invalid authorization header"))?
4167 0 : .strip_prefix("Bearer ")
4168 0 : .ok_or_else(|| tonic::Status::invalid_argument("invalid authorization header"))?
4169 0 : .trim();
4170 0 : let jwtdata: TokenData<Claims> = auth
4171 0 : .decode(jwt)
4172 0 : .map_err(|err| tonic::Status::invalid_argument(format!("invalid JWT token: {err}")))?;
4173 0 : let claims = jwtdata.claims;
4174 :
4175 : // Check if the token is valid for this tenant.
4176 0 : check_permission(&claims, Some(tenant_id))
4177 0 : .map_err(|err| tonic::Status::permission_denied(err.to_string()))?;
4178 :
4179 : // TODO: consider stashing the claims in the request extensions, if needed.
4180 :
4181 0 : Ok(req)
4182 0 : }
4183 : }
4184 :
4185 : /// Extracts the given type from the request extensions, or panics if it is missing.
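///
/// For example, as used by the handlers above:
///
/// ```ignore
/// let ttid = *extract::<TenantTimelineId>(&req);
/// let received_at = extract::<ReceivedAt>(&req).0;
/// ```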
4186 0 : fn extract<T: Send + Sync + 'static>(req: &tonic::Request<impl Any>) -> &T {
4187 0 : extract_from(req.extensions())
4188 0 : }
4189 :
4190 : /// Extracts the given type from the given extensions, or panics if it is missing. This variant
4191 : /// works with extensions taken from either a tonic::Request or an http::Request.
4192 0 : fn extract_from<T: Send + Sync + 'static>(ext: &http::Extensions) -> &T {
4193 0 : let Some(value) = ext.get::<T>() else {
4194 0 : let name = std::any::type_name::<T>();
4195 0 : panic!("extension {name} should be set by middleware");
4196 : };
4197 0 : value
4198 0 : }
4199 :
4200 : #[derive(Debug, thiserror::Error)]
4201 : pub(crate) enum GetActiveTimelineError {
4202 : #[error(transparent)]
4203 : Tenant(GetActiveTenantError),
4204 : #[error(transparent)]
4205 : Timeline(#[from] GetTimelineError),
4206 : }
4207 :
4208 : impl From<GetActiveTimelineError> for QueryError {
4209 0 : fn from(e: GetActiveTimelineError) -> Self {
4210 0 : match e {
4211 0 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => QueryError::Shutdown,
4212 0 : GetActiveTimelineError::Tenant(e) => e.into(),
4213 0 : GetActiveTimelineError::Timeline(e) => QueryError::NotFound(format!("{e}").into()),
4214 : }
4215 0 : }
4216 : }
4217 :
4218 : impl From<GetActiveTimelineError> for tonic::Status {
4219 0 : fn from(err: GetActiveTimelineError) -> Self {
4220 0 : let message = err.to_string();
4221 0 : let code = match err {
4222 0 : GetActiveTimelineError::Tenant(err) => tonic::Status::from(err).code(),
4223 0 : GetActiveTimelineError::Timeline(err) => tonic::Status::from(err).code(),
4224 : };
4225 0 : tonic::Status::new(code, message)
4226 0 : }
4227 : }
4228 :
4229 : impl From<GetTimelineError> for tonic::Status {
4230 0 : fn from(err: GetTimelineError) -> Self {
4231 : use tonic::Code;
4232 0 : let code = match &err {
4233 0 : GetTimelineError::NotFound { .. } => Code::NotFound,
4234 0 : GetTimelineError::NotActive { .. } => Code::Unavailable,
4235 0 : GetTimelineError::ShuttingDown => Code::Unavailable,
4236 : };
4237 0 : tonic::Status::new(code, err.to_string())
4238 0 : }
4239 : }
4240 :
4241 : impl From<GetActiveTenantError> for QueryError {
4242 0 : fn from(e: GetActiveTenantError) -> Self {
4243 0 : match e {
4244 0 : GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected(
4245 0 : ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
4246 0 : ),
4247 : GetActiveTenantError::Cancelled
4248 : | GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => {
4249 0 : QueryError::Shutdown
4250 : }
4251 0 : e @ GetActiveTenantError::NotFound(_) => QueryError::NotFound(format!("{e}").into()),
4252 0 : e => QueryError::Other(anyhow::anyhow!(e)),
4253 : }
4254 0 : }
4255 : }
4256 :
4257 : impl From<GetActiveTenantError> for tonic::Status {
4258 0 : fn from(err: GetActiveTenantError) -> Self {
4259 : use tonic::Code;
4260 0 : let code = match &err {
4261 0 : GetActiveTenantError::Broken(_) => Code::Internal,
4262 0 : GetActiveTenantError::Cancelled => Code::Unavailable,
4263 0 : GetActiveTenantError::NotFound(_) => Code::NotFound,
4264 0 : GetActiveTenantError::SwitchedTenant => Code::Unavailable,
4265 0 : GetActiveTenantError::WaitForActiveTimeout { .. } => Code::Unavailable,
4266 0 : GetActiveTenantError::WillNotBecomeActive(_) => Code::Unavailable,
4267 : };
4268 0 : tonic::Status::new(code, err.to_string())
4269 0 : }
4270 : }
4271 :
4272 : impl From<HandleUpgradeError> for QueryError {
4273 0 : fn from(e: HandleUpgradeError) -> Self {
4274 0 : match e {
4275 0 : HandleUpgradeError::ShutDown => QueryError::Shutdown,
4276 : }
4277 0 : }
4278 : }
4279 :
4280 : impl From<HandleUpgradeError> for tonic::Status {
4281 0 : fn from(err: HandleUpgradeError) -> Self {
4282 0 : match err {
4283 0 : HandleUpgradeError::ShutDown => tonic::Status::unavailable("timeline is shutting down"),
4284 : }
4285 0 : }
4286 : }
4287 :
4288 0 : fn set_tracing_field_shard_id(timeline: &Timeline) {
4289 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
4290 0 : tracing::Span::current().record(
4291 0 : "shard_id",
4292 0 : tracing::field::display(timeline.tenant_shard_id.shard_slug()),
4293 : );
4294 0 : debug_assert_current_span_has_tenant_and_timeline_id();
4295 0 : }
4296 :
4297 : struct WaitedForLsn(Lsn);
4298 : impl From<WaitedForLsn> for Lsn {
4299 0 : fn from(WaitedForLsn(lsn): WaitedForLsn) -> Self {
4300 0 : lsn
4301 0 : }
4302 : }
4303 :
4304 : #[cfg(test)]
4305 : mod tests {
4306 : use utils::shard::ShardCount;
4307 :
4308 : use super::*;
4309 :
4310 : #[test]
4311 1 : fn pageservice_cmd_parse() {
4312 1 : let tenant_id = TenantId::generate();
4313 1 : let timeline_id = TimelineId::generate();
4314 1 : let cmd =
4315 1 : PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id} {timeline_id}")).unwrap();
4316 1 : assert_eq!(
4317 : cmd,
4318 1 : PageServiceCmd::PageStream(PageStreamCmd {
4319 1 : tenant_id,
4320 1 : timeline_id,
4321 1 : protocol_version: PagestreamProtocolVersion::V2,
4322 1 : })
4323 : );
4324 1 : let cmd = PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id}")).unwrap();
4325 1 : assert_eq!(
4326 : cmd,
4327 1 : PageServiceCmd::BaseBackup(BaseBackupCmd {
4328 1 : tenant_id,
4329 1 : timeline_id,
4330 1 : lsn: None,
4331 1 : gzip: false,
4332 1 : replica: false
4333 1 : })
4334 : );
4335 1 : let cmd =
4336 1 : PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} --gzip")).unwrap();
4337 1 : assert_eq!(
4338 : cmd,
4339 1 : PageServiceCmd::BaseBackup(BaseBackupCmd {
4340 1 : tenant_id,
4341 1 : timeline_id,
4342 1 : lsn: None,
4343 1 : gzip: true,
4344 1 : replica: false
4345 1 : })
4346 : );
4347 1 : let cmd =
4348 1 : PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} latest")).unwrap();
4349 1 : assert_eq!(
4350 : cmd,
4351 1 : PageServiceCmd::BaseBackup(BaseBackupCmd {
4352 1 : tenant_id,
4353 1 : timeline_id,
4354 1 : lsn: None,
4355 1 : gzip: false,
4356 1 : replica: false
4357 1 : })
4358 : );
4359 1 : let cmd = PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} 0/16ABCDE"))
4360 1 : .unwrap();
4361 1 : assert_eq!(
4362 : cmd,
4363 1 : PageServiceCmd::BaseBackup(BaseBackupCmd {
4364 1 : tenant_id,
4365 1 : timeline_id,
4366 1 : lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
4367 1 : gzip: false,
4368 1 : replica: false
4369 1 : })
4370 : );
4371 1 : let cmd = PageServiceCmd::parse(&format!(
4372 1 : "basebackup {tenant_id} {timeline_id} --replica --gzip"
4373 1 : ))
4374 1 : .unwrap();
4375 1 : assert_eq!(
4376 : cmd,
4377 1 : PageServiceCmd::BaseBackup(BaseBackupCmd {
4378 1 : tenant_id,
4379 1 : timeline_id,
4380 1 : lsn: None,
4381 1 : gzip: true,
4382 1 : replica: true
4383 1 : })
4384 : );
4385 1 : let cmd = PageServiceCmd::parse(&format!(
4386 1 : "basebackup {tenant_id} {timeline_id} 0/16ABCDE --replica --gzip"
4387 1 : ))
4388 1 : .unwrap();
4389 1 : assert_eq!(
4390 : cmd,
4391 1 : PageServiceCmd::BaseBackup(BaseBackupCmd {
4392 1 : tenant_id,
4393 1 : timeline_id,
4394 1 : lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
4395 1 : gzip: true,
4396 1 : replica: true
4397 1 : })
4398 : );
4399 1 : let cmd = PageServiceCmd::parse(&format!("fullbackup {tenant_id} {timeline_id}")).unwrap();
4400 1 : assert_eq!(
4401 : cmd,
4402 1 : PageServiceCmd::FullBackup(FullBackupCmd {
4403 1 : tenant_id,
4404 1 : timeline_id,
4405 1 : lsn: None,
4406 1 : prev_lsn: None
4407 1 : })
4408 : );
4409 1 : let cmd = PageServiceCmd::parse(&format!(
4410 1 : "fullbackup {tenant_id} {timeline_id} 0/16ABCDE 0/16ABCDF"
4411 1 : ))
4412 1 : .unwrap();
4413 1 : assert_eq!(
4414 : cmd,
4415 1 : PageServiceCmd::FullBackup(FullBackupCmd {
4416 1 : tenant_id,
4417 1 : timeline_id,
4418 1 : lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
4419 1 : prev_lsn: Some(Lsn::from_str("0/16ABCDF").unwrap()),
4420 1 : })
4421 : );
4422 1 : let tenant_shard_id = TenantShardId::unsharded(tenant_id);
4423 1 : let cmd = PageServiceCmd::parse(&format!(
4424 1 : "lease lsn {tenant_shard_id} {timeline_id} 0/16ABCDE"
4425 1 : ))
4426 1 : .unwrap();
4427 1 : assert_eq!(
4428 : cmd,
4429 1 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
4430 1 : tenant_shard_id,
4431 1 : timeline_id,
4432 1 : lsn: Lsn::from_str("0/16ABCDE").unwrap(),
4433 1 : })
4434 : );
4435 1 : let tenant_shard_id = TenantShardId::split(&tenant_shard_id, ShardCount(8))[1];
4436 1 : let cmd = PageServiceCmd::parse(&format!(
4437 1 : "lease lsn {tenant_shard_id} {timeline_id} 0/16ABCDE"
4438 1 : ))
4439 1 : .unwrap();
4440 1 : assert_eq!(
4441 : cmd,
4442 1 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
4443 1 : tenant_shard_id,
4444 1 : timeline_id,
4445 1 : lsn: Lsn::from_str("0/16ABCDE").unwrap(),
4446 1 : })
4447 : );
4448 1 : let cmd = PageServiceCmd::parse("set a = b").unwrap();
4449 1 : assert_eq!(cmd, PageServiceCmd::Set);
4450 1 : let cmd = PageServiceCmd::parse("SET foo").unwrap();
4451 1 : assert_eq!(cmd, PageServiceCmd::Set);
4452 1 : }
4453 :
4454 : #[test]
4455 1 : fn pageservice_cmd_err_handling() {
4456 1 : let tenant_id = TenantId::generate();
4457 1 : let timeline_id = TimelineId::generate();
4458 1 : let cmd = PageServiceCmd::parse("unknown_command");
4459 1 : assert!(cmd.is_err());
4460 1 : let cmd = PageServiceCmd::parse("pagestream_v2");
4461 1 : assert!(cmd.is_err());
4462 1 : let cmd = PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id}xxx"));
4463 1 : assert!(cmd.is_err());
4464 1 : let cmd = PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id}xxx {timeline_id}xxx"));
4465 1 : assert!(cmd.is_err());
4466 1 : let cmd = PageServiceCmd::parse(&format!(
4467 1 : "basebackup {tenant_id} {timeline_id} --gzip --gzip"
4468 1 : ));
4469 1 : assert!(cmd.is_err());
4470 1 : let cmd = PageServiceCmd::parse(&format!(
4471 1 : "basebackup {tenant_id} {timeline_id} --gzip --unknown"
4472 1 : ));
4473 1 : assert!(cmd.is_err());
4474 1 : let cmd = PageServiceCmd::parse(&format!(
4475 1 : "basebackup {tenant_id} {timeline_id} --gzip 0/16ABCDE"
4476 1 : ));
4477 1 : assert!(cmd.is_err());
4478 1 : let cmd = PageServiceCmd::parse(&format!("lease {tenant_id} {timeline_id} gzip 0/16ABCDE"));
4479 1 : assert!(cmd.is_err());
4480 1 : }
4481 :
4482 : #[test]
4483 1 : fn test_parse_options() {
4484 1 : let (config, has_error) = parse_options(" -c neon.compute_mode=primary ");
4485 1 : assert!(!has_error);
4486 1 : assert_eq!(
4487 : config,
4488 1 : vec![("neon.compute_mode".to_string(), "primary".to_string())]
4489 : );
4490 :
4491 1 : let (config, has_error) = parse_options(" -c neon.compute_mode=primary -c foo=bar ");
4492 1 : assert!(!has_error);
4493 1 : assert_eq!(
4494 : config,
4495 1 : vec![
4496 1 : ("neon.compute_mode".to_string(), "primary".to_string()),
4497 1 : ("foo".to_string(), "bar".to_string()),
4498 : ]
4499 : );
4500 :
4501 1 : let (config, has_error) = parse_options(" -c neon.compute_mode=primary -cfoo=bar");
4502 1 : assert!(!has_error);
4503 1 : assert_eq!(
4504 : config,
4505 1 : vec![
4506 1 : ("neon.compute_mode".to_string(), "primary".to_string()),
4507 1 : ("foo".to_string(), "bar".to_string()),
4508 : ]
4509 : );
4510 :
4511 1 : let (_, has_error) = parse_options("-c");
4512 1 : assert!(has_error);
4513 :
4514 1 : let (_, has_error) = parse_options("-c foo=bar -c -c");
4515 1 : assert!(has_error);
4516 :
4517 1 : let (_, has_error) = parse_options(" ");
4518 1 : assert!(!has_error);
4519 :
4520 1 : let (_, has_error) = parse_options(" -c neon.compute_mode");
4521 1 : assert!(has_error);
4522 1 : }
4523 : }