Line data Source code
1 : //! The Page Service listens for client connections and serves their GetPage@LSN
2 : //! requests.
3 :
4 : use std::any::Any;
5 : use std::borrow::Cow;
6 : use std::num::NonZeroUsize;
7 : use std::os::fd::AsRawFd;
8 : use std::pin::Pin;
9 : use std::str::FromStr;
10 : use std::sync::Arc;
11 : use std::task::{Context, Poll};
12 : use std::time::{Duration, Instant, SystemTime};
13 : use std::{io, str};
14 :
15 : use anyhow::{Context as _, bail};
16 : use bytes::{Buf as _, BufMut as _, BytesMut};
17 : use chrono::Utc;
18 : use futures::future::BoxFuture;
19 : use futures::{FutureExt, Stream};
20 : use itertools::Itertools;
21 : use jsonwebtoken::TokenData;
22 : use once_cell::sync::OnceCell;
23 : use pageserver_api::config::{
24 : GetVectoredConcurrentIo, PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
25 : PageServiceProtocolPipelinedBatchingStrategy, PageServiceProtocolPipelinedExecutionStrategy,
26 : };
27 : use pageserver_api::key::rel_block_to_key;
28 : use pageserver_api::models::{PageTraceEvent, TenantState};
29 : use pageserver_api::pagestream_api::{
30 : self, PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
31 : PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
32 : PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetSlruSegmentRequest,
33 : PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest, PagestreamNblocksResponse,
34 : PagestreamProtocolVersion, PagestreamRequest,
35 : };
36 : use pageserver_api::reltag::SlruKind;
37 : use pageserver_api::shard::TenantShardId;
38 : use pageserver_page_api as page_api;
39 : use pageserver_page_api::proto;
40 : use postgres_backend::{
41 : AuthType, PostgresBackend, PostgresBackendReader, QueryError, is_expected_io_error,
42 : };
43 : use postgres_ffi::BLCKSZ;
44 : use postgres_ffi_types::constants::DEFAULTTABLESPACE_OID;
45 : use pq_proto::framed::ConnectionError;
46 : use pq_proto::{BeMessage, FeMessage, FeStartupPacket, RowDescriptor};
47 : use smallvec::{SmallVec, smallvec};
48 : use strum_macros::IntoStaticStr;
49 : use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _, BufWriter};
50 : use tokio::task::JoinHandle;
51 : use tokio_util::sync::CancellationToken;
52 : use tonic::service::Interceptor as _;
53 : use tonic::transport::server::TcpConnectInfo;
54 : use tracing::*;
55 : use utils::auth::{Claims, Scope, SwappableJwtAuth};
56 : use utils::id::{TenantId, TenantTimelineId, TimelineId};
57 : use utils::logging::log_slow;
58 : use utils::lsn::Lsn;
59 : use utils::shard::ShardIndex;
60 : use utils::simple_rcu::RcuReadGuard;
61 : use utils::sync::gate::{Gate, GateGuard};
62 : use utils::sync::spsc_fold;
63 : use utils::{failpoint_support, span_record};
64 :
65 : use crate::auth::check_permission;
66 : use crate::basebackup::{self, BasebackupError};
67 : use crate::config::PageServerConf;
68 : use crate::context::{
69 : DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
70 : };
71 : use crate::metrics::{
72 : self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, GetPageBatchBreakReason, LIVE_CONNECTIONS,
73 : MISROUTED_PAGESTREAM_REQUESTS, PAGESTREAM_HANDLER_RESULTS_TOTAL, SmgrOpTimer, TimelineMetrics,
74 : };
75 : use crate::pgdatadir_mapping::{LsnRange, Version};
76 : use crate::span::{
77 : debug_assert_current_span_has_tenant_and_timeline_id,
78 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
79 : };
80 : use crate::task_mgr::{self, COMPUTE_REQUEST_RUNTIME, TaskKind};
81 : use crate::tenant::mgr::{
82 : GetActiveTenantError, GetTenantError, ShardResolveResult, ShardSelector, TenantManager,
83 : };
84 : use crate::tenant::storage_layer::IoConcurrency;
85 : use crate::tenant::timeline::handle::{Handle, HandleUpgradeError, WeakHandle};
86 : use crate::tenant::timeline::{self, WaitLsnError, WaitLsnTimeout, WaitLsnWaiter};
87 : use crate::tenant::{GetTimelineError, PageReconstructError, Timeline};
88 : use crate::{CancellableTask, PERF_TRACE_TARGET, timed_after_cancellation};
89 :
90 : /// How long we may wait for a [`crate::tenant::mgr::TenantSlot::InProgress`] and/or a [`crate::tenant::TenantShard`] which
91 : /// is not yet in state [`TenantState::Active`].
92 : ///
93 : /// NB: this is a different value than [`crate::http::routes::ACTIVE_TENANT_TIMEOUT`].
94 : /// HADRON: reduced timeout; we will retry in Cache::get().
95 : const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(5000);
96 :
97 : /// Threshold at which to log slow GetPage requests.
98 : const LOG_SLOW_GETPAGE_THRESHOLD: Duration = Duration::from_secs(30);
99 :
100 : /// The idle time before sending TCP keepalive probes for gRPC connections. The
101 : /// interval and timeout between each probe are configured via sysctl. This
102 : /// allows detecting dead connections sooner.
103 : const GRPC_TCP_KEEPALIVE_TIME: Duration = Duration::from_secs(60);
104 :
105 : /// Whether to enable TCP nodelay for gRPC connections. This disables Nagle's
106 : /// algorithm, which can cause latency spikes for small messages.
107 : const GRPC_TCP_NODELAY: bool = true;
108 :
109 : /// The interval between HTTP2 keepalive pings. This allows shutting down server
110 : /// tasks when clients are unresponsive.
111 : const GRPC_HTTP2_KEEPALIVE_INTERVAL: Duration = Duration::from_secs(30);
112 :
113 : /// The timeout for HTTP2 keepalive pings. Should be <= GRPC_HTTP2_KEEPALIVE_INTERVAL.
114 : const GRPC_HTTP2_KEEPALIVE_TIMEOUT: Duration = Duration::from_secs(20);
115 :
116 : /// Maximum number of concurrent gRPC streams per TCP connection. We expect something
117 : /// like 8 GetPage streams per connection, plus any unary requests.
118 : const GRPC_MAX_CONCURRENT_STREAMS: u32 = 256;
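// The constants above are meant to be applied when configuring the gRPC page
// service server. Below is a minimal sketch of that wiring, assuming tonic's
// `transport::Server` builder and its keepalive/concurrency setters; the
// function name `sketch_grpc_server_builder` is illustrative and not part of
// this module's real setup.
#[allow(dead_code)]
fn sketch_grpc_server_builder() -> tonic::transport::Server {
    tonic::transport::Server::builder()
        // Probe idle TCP connections after GRPC_TCP_KEEPALIVE_TIME; the probe
        // interval/timeout come from the kernel's sysctl settings.
        .tcp_keepalive(Some(GRPC_TCP_KEEPALIVE_TIME))
        // Disable Nagle's algorithm to avoid latency spikes for small messages.
        .tcp_nodelay(GRPC_TCP_NODELAY)
        // HTTP/2-level pings let us tear down server tasks for unresponsive clients.
        .http2_keepalive_interval(Some(GRPC_HTTP2_KEEPALIVE_INTERVAL))
        .http2_keepalive_timeout(Some(GRPC_HTTP2_KEEPALIVE_TIMEOUT))
        // Cap concurrent streams per TCP connection (GetPage streams plus unary requests).
        .max_concurrent_streams(Some(GRPC_MAX_CONCURRENT_STREAMS))
}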
119 :
120 : ///////////////////////////////////////////////////////////////////////////////
121 :
122 : pub struct Listener {
123 : cancel: CancellationToken,
124 : /// Cancel the listener task through `cancel` to shut down the listener
125 : /// and get a handle on the existing connections.
126 : task: JoinHandle<Connections>,
127 : }
128 :
129 : pub struct Connections {
130 : cancel: CancellationToken,
131 : tasks: tokio::task::JoinSet<ConnectionHandlerResult>,
132 : gate: Gate,
133 : }
134 :
135 0 : pub fn spawn(
136 0 : conf: &'static PageServerConf,
137 0 : tenant_manager: Arc<TenantManager>,
138 0 : pg_auth: Option<Arc<SwappableJwtAuth>>,
139 0 : perf_trace_dispatch: Option<Dispatch>,
140 0 : tcp_listener: tokio::net::TcpListener,
141 0 : tls_config: Option<Arc<rustls::ServerConfig>>,
142 0 : ) -> Listener {
143 0 : let cancel = CancellationToken::new();
144 0 : let libpq_ctx = RequestContext::todo_child(
145 0 : TaskKind::LibpqEndpointListener,
146 : // listener task shouldn't need to download anything. (We will
147 : // create separate sub-contexts for each connection, with their
148 : // own download behavior. This context is used only to listen and
149 : // accept connections.)
150 0 : DownloadBehavior::Error,
151 : );
152 0 : let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
153 0 : "libpq listener",
154 0 : libpq_listener_main(
155 0 : conf,
156 0 : tenant_manager,
157 0 : pg_auth,
158 0 : perf_trace_dispatch,
159 0 : tcp_listener,
160 0 : conf.pg_auth_type,
161 0 : tls_config,
162 0 : conf.page_service_pipelining.clone(),
163 0 : libpq_ctx,
164 0 : cancel.clone(),
165 0 : )
166 0 : .map(anyhow::Ok),
167 0 : ));
168 :
169 0 : Listener { cancel, task }
170 0 : }
171 :
172 : impl Listener {
173 0 : pub async fn stop_accepting(self) -> Connections {
174 0 : self.cancel.cancel();
175 0 : self.task
176 0 : .await
177 0 : .expect("unreachable: we wrap the listener task in task_mgr::exit_on_panic_or_error")
178 0 : }
179 : }
180 : impl Connections {
181 0 : pub(crate) async fn shutdown(self) {
182 : let Self {
183 0 : cancel,
184 0 : mut tasks,
185 0 : gate,
186 0 : } = self;
187 0 : cancel.cancel();
188 0 : while let Some(res) = tasks.join_next().await {
189 0 : Self::handle_connection_completion(res);
190 0 : }
191 0 : gate.close().await;
192 0 : }
193 :
194 0 : fn handle_connection_completion(res: Result<anyhow::Result<()>, tokio::task::JoinError>) {
195 0 : match res {
196 0 : Ok(Ok(())) => {}
197 0 : Ok(Err(e)) => error!("error in page_service connection task: {:?}", e),
198 0 : Err(e) => error!("page_service connection task panicked: {:?}", e),
199 : }
200 0 : }
201 : }
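// A usage sketch of the two-phase shutdown above (the caller and function name
// are hypothetical, for illustration only): first stop accepting new
// connections, then cancel and join the ones that are still running.
#[allow(dead_code)]
async fn sketch_shutdown_page_service(listener: Listener) {
    // Stop the accept loop and take ownership of the live connections.
    let connections: Connections = listener.stop_accepting().await;
    // Cancel the per-connection tasks, wait for them to finish, and close the gate.
    connections.shutdown().await;
}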
202 :
203 : ///
204 : /// Main loop of the page service.
205 : ///
206 : /// Listens for connections, and launches a new handler task for each.
207 : ///
208 : /// Returns the set of open connections upon cancellation via
209 : /// `listener_cancel`.
210 : ///
211 : #[allow(clippy::too_many_arguments)]
212 0 : pub async fn libpq_listener_main(
213 0 : conf: &'static PageServerConf,
214 0 : tenant_manager: Arc<TenantManager>,
215 0 : auth: Option<Arc<SwappableJwtAuth>>,
216 0 : perf_trace_dispatch: Option<Dispatch>,
217 0 : listener: tokio::net::TcpListener,
218 0 : auth_type: AuthType,
219 0 : tls_config: Option<Arc<rustls::ServerConfig>>,
220 0 : pipelining_config: PageServicePipeliningConfig,
221 0 : listener_ctx: RequestContext,
222 0 : listener_cancel: CancellationToken,
223 0 : ) -> Connections {
224 0 : let connections_cancel = CancellationToken::new();
225 0 : let connections_gate = Gate::default();
226 0 : let mut connection_handler_tasks = tokio::task::JoinSet::default();
227 :
228 : loop {
229 0 : let gate_guard = match connections_gate.enter() {
230 0 : Ok(guard) => guard,
231 0 : Err(_) => break,
232 : };
233 :
234 0 : let accepted = tokio::select! {
235 : biased;
236 0 : _ = listener_cancel.cancelled() => break,
237 0 : next = connection_handler_tasks.join_next(), if !connection_handler_tasks.is_empty() => {
238 0 : let res = next.expect("we dont poll while empty");
239 0 : Connections::handle_connection_completion(res);
240 0 : continue;
241 : }
242 0 : accepted = listener.accept() => accepted,
243 : };
244 :
245 0 : match accepted {
246 0 : Ok((socket, peer_addr)) => {
247 : // Connection established. Spawn a new task to handle it.
248 0 : debug!("accepted connection from {}", peer_addr);
249 0 : let local_auth = auth.clone();
250 0 : let connection_ctx = RequestContextBuilder::from(&listener_ctx)
251 0 : .task_kind(TaskKind::PageRequestHandler)
252 0 : .download_behavior(DownloadBehavior::Download)
253 0 : .perf_span_dispatch(perf_trace_dispatch.clone())
254 0 : .detached_child();
255 :
256 0 : connection_handler_tasks.spawn(page_service_conn_main(
257 0 : conf,
258 0 : tenant_manager.clone(),
259 0 : local_auth,
260 0 : socket,
261 0 : auth_type,
262 0 : tls_config.clone(),
263 0 : pipelining_config.clone(),
264 0 : connection_ctx,
265 0 : connections_cancel.child_token(),
266 0 : gate_guard,
267 : ));
268 : }
269 0 : Err(err) => {
270 : // accept() failed. Log the error, and loop back to retry on next connection.
271 0 : error!("accept() failed: {:?}", err);
272 : }
273 : }
274 : }
275 :
276 0 : debug!("page_service listener loop terminated");
277 :
278 0 : Connections {
279 0 : cancel: connections_cancel,
280 0 : tasks: connection_handler_tasks,
281 0 : gate: connections_gate,
282 0 : }
283 0 : }
284 :
285 : type ConnectionHandlerResult = anyhow::Result<()>;
286 :
287 : /// Perf root spans start at the per-request level, after shard routing.
288 : /// This struct carries connection-level information to the root perf span definition.
289 : #[derive(Clone, Default)]
290 : struct ConnectionPerfSpanFields {
291 : peer_addr: String,
292 : application_name: Option<String>,
293 : compute_mode: Option<String>,
294 : }
295 :
296 : #[instrument(skip_all, fields(peer_addr, application_name, compute_mode))]
297 : #[allow(clippy::too_many_arguments)]
298 : async fn page_service_conn_main(
299 : conf: &'static PageServerConf,
300 : tenant_manager: Arc<TenantManager>,
301 : auth: Option<Arc<SwappableJwtAuth>>,
302 : socket: tokio::net::TcpStream,
303 : auth_type: AuthType,
304 : tls_config: Option<Arc<rustls::ServerConfig>>,
305 : pipelining_config: PageServicePipeliningConfig,
306 : connection_ctx: RequestContext,
307 : cancel: CancellationToken,
308 : gate_guard: GateGuard,
309 : ) -> ConnectionHandlerResult {
310 : let _guard = LIVE_CONNECTIONS
311 : .with_label_values(&["page_service"])
312 : .guard();
313 :
314 : socket
315 : .set_nodelay(true)
316 : .context("could not set TCP_NODELAY")?;
317 :
318 : let socket_fd = socket.as_raw_fd();
319 :
320 : let peer_addr = socket.peer_addr().context("get peer address")?;
321 :
322 : let perf_span_fields = ConnectionPerfSpanFields {
323 : peer_addr: peer_addr.to_string(),
324 : application_name: None, // filled in later
325 : compute_mode: None, // filled in later
326 : };
327 : tracing::Span::current().record("peer_addr", field::display(peer_addr));
328 :
329 : // Set up a read timeout of 10 minutes. The value is rather arbitrary; the requirements are:
330 : // - long enough for most valid compute connections
331 : // - less than infinite to stop us from "leaking" connections to long-gone computes
332 : //
333 : // no write timeout is used, because the kernel is assumed to error writes after some time.
334 : let mut socket = tokio_io_timeout::TimeoutReader::new(socket);
335 :
336 : let default_timeout_ms = 10 * 60 * 1000; // 10 minutes by default
337 0 : let socket_timeout_ms = (|| {
338 0 : fail::fail_point!("simulated-bad-compute-connection", |avg_timeout_ms| {
339 : // Exponential distribution for simulating poor network
340 : // conditions; in tests, avg_timeout_ms is expected to be
341 : // around 15.
342 0 : if let Some(avg_timeout_ms) = avg_timeout_ms {
343 0 : let avg = avg_timeout_ms.parse::<i64>().unwrap() as f32;
344 0 : let u = rand::random::<f32>();
345 0 : ((1.0 - u).ln() / (-avg)) as u64
346 : } else {
347 0 : default_timeout_ms
348 : }
349 0 : });
350 0 : default_timeout_ms
351 : })();
352 :
353 : // A timeout here does not mean the client died; it can happen if it's just idle for
354 : // a while: we will tear down this PageServerHandler and instantiate a new one if/when
355 : // they reconnect.
356 : socket.set_timeout(Some(std::time::Duration::from_millis(socket_timeout_ms)));
357 : let socket = Box::pin(socket);
358 :
359 : fail::fail_point!("ps::connection-start::pre-login");
360 :
361 : // XXX: pgbackend.run() should take the connection_ctx,
362 : // and create a child per-query context when it invokes process_query.
363 : // But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
364 : // and create the per-query context in process_query ourselves.
365 : let mut conn_handler = PageServerHandler::new(
366 : tenant_manager,
367 : auth,
368 : pipelining_config,
369 : conf.get_vectored_concurrent_io,
370 : perf_span_fields,
371 : connection_ctx,
372 : cancel.clone(),
373 : gate_guard,
374 : );
375 : let pgbackend =
376 : PostgresBackend::new_from_io(socket_fd, socket, peer_addr, auth_type, tls_config)?;
377 :
378 : match pgbackend.run(&mut conn_handler, &cancel).await {
379 : Ok(()) => {
380 : // we've been requested to shut down
381 : Ok(())
382 : }
383 : Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
384 : if is_expected_io_error(&io_error) {
385 : info!("Postgres client disconnected ({io_error})");
386 : Ok(())
387 : } else {
388 : let tenant_id = conn_handler.timeline_handles.as_ref().unwrap().tenant_id();
389 : Err(io_error).context(format!(
390 : "Postgres connection error for tenant_id={tenant_id:?} client at peer_addr={peer_addr}"
391 : ))
392 : }
393 : }
394 : other => {
395 : let tenant_id = conn_handler.timeline_handles.as_ref().unwrap().tenant_id();
396 : other.context(format!(
397 : "Postgres query error for tenant_id={tenant_id:?} client peer_addr={peer_addr}"
398 : ))
399 : }
400 : }
401 : }
402 :
403 : /// Page service connection handler.
404 : struct PageServerHandler {
405 : auth: Option<Arc<SwappableJwtAuth>>,
406 : claims: Option<Claims>,
407 :
408 : /// The context created for the lifetime of the connection
409 : /// serviced by this PageServerHandler.
410 : /// For each query received over the connection,
411 : /// `process_query` creates a child context from this one.
412 : connection_ctx: RequestContext,
413 :
414 : perf_span_fields: ConnectionPerfSpanFields,
415 :
416 : cancel: CancellationToken,
417 :
418 : /// None only while pagestream protocol is being processed.
419 : timeline_handles: Option<TimelineHandles>,
420 :
421 : pipelining_config: PageServicePipeliningConfig,
422 : get_vectored_concurrent_io: GetVectoredConcurrentIo,
423 :
424 : gate_guard: GateGuard,
425 : }
426 :
427 : struct TimelineHandles {
428 : wrapper: TenantManagerWrapper,
429 : /// Note on size: the typical size of this map is 1. The largest size we expect
430 : /// to see is the number of shards divided by the number of pageservers (typically < 2),
431 : /// or the ratio used when splitting shards (i.e. how many children are created from one
432 : /// parent shard), where a "large" number might be ~8.
433 : handles: timeline::handle::Cache<TenantManagerTypes>,
434 : }
435 :
436 : impl TimelineHandles {
437 0 : fn new(tenant_manager: Arc<TenantManager>) -> Self {
438 0 : Self {
439 0 : wrapper: TenantManagerWrapper {
440 0 : tenant_manager,
441 0 : tenant_id: OnceCell::new(),
442 0 : },
443 0 : handles: Default::default(),
444 0 : }
445 0 : }
446 0 : async fn get(
447 0 : &mut self,
448 0 : tenant_id: TenantId,
449 0 : timeline_id: TimelineId,
450 0 : shard_selector: ShardSelector,
451 0 : ) -> Result<Handle<TenantManagerTypes>, GetActiveTimelineError> {
452 0 : if *self.wrapper.tenant_id.get_or_init(|| tenant_id) != tenant_id {
453 0 : return Err(GetActiveTimelineError::Tenant(
454 0 : GetActiveTenantError::SwitchedTenant,
455 0 : ));
456 0 : }
457 0 : self.handles
458 0 : .get(timeline_id, shard_selector, &self.wrapper)
459 0 : .await
460 0 : .map_err(|e| match e {
461 0 : timeline::handle::GetError::TenantManager(e) => e,
462 : timeline::handle::GetError::PerTimelineStateShutDown => {
463 0 : trace!("per-timeline state shut down");
464 0 : GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown)
465 : }
466 0 : })
467 0 : }
468 :
469 0 : fn tenant_id(&self) -> Option<TenantId> {
470 0 : self.wrapper.tenant_id.get().copied()
471 0 : }
472 : }
473 :
474 : pub(crate) struct TenantManagerWrapper {
475 : tenant_manager: Arc<TenantManager>,
476 : // We do not support switching tenant_id on a connection at this point.
477 : // We can add support for this later if needed without changing
478 : // the protocol.
479 : tenant_id: once_cell::sync::OnceCell<TenantId>,
480 : }
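// A minimal, self-contained sketch of the tenant-pinning pattern used in
// `TimelineHandles::get` above (the `sketch_check_tenant` helper and the `u64`
// tenant type are illustrative stand-ins, not part of this module): the first
// request initializes the cell, and any later request for a different tenant
// is rejected rather than silently switching tenants on the connection.
#[allow(dead_code)]
fn sketch_check_tenant(pinned: &OnceCell<u64>, tenant_id: u64) -> Result<(), &'static str> {
    // `get_or_init` stores the first tenant_id seen; later calls just read it back.
    if *pinned.get_or_init(|| tenant_id) != tenant_id {
        return Err("tenant switched on an existing connection");
    }
    Ok(())
}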
481 :
482 : #[derive(Debug)]
483 : pub(crate) struct TenantManagerTypes;
484 :
485 : impl timeline::handle::Types for TenantManagerTypes {
486 : type TenantManagerError = GetActiveTimelineError;
487 : type TenantManager = TenantManagerWrapper;
488 : type Timeline = TenantManagerCacheItem;
489 : }
490 :
491 : pub(crate) struct TenantManagerCacheItem {
492 : pub(crate) timeline: Arc<Timeline>,
493 : // allow() for cheap propagation through RequestContext inside a task
494 : #[allow(clippy::redundant_allocation)]
495 : pub(crate) metrics: Arc<Arc<TimelineMetrics>>,
496 : #[allow(dead_code)] // we store it to keep the gate open
497 : pub(crate) gate_guard: GateGuard,
498 : }
499 :
500 : impl std::ops::Deref for TenantManagerCacheItem {
501 : type Target = Arc<Timeline>;
502 0 : fn deref(&self) -> &Self::Target {
503 0 : &self.timeline
504 0 : }
505 : }
506 :
507 : impl timeline::handle::Timeline<TenantManagerTypes> for TenantManagerCacheItem {
508 0 : fn shard_timeline_id(&self) -> timeline::handle::ShardTimelineId {
509 0 : Timeline::shard_timeline_id(&self.timeline)
510 0 : }
511 :
512 0 : fn per_timeline_state(&self) -> &timeline::handle::PerTimelineState<TenantManagerTypes> {
513 0 : &self.timeline.handles
514 0 : }
515 :
516 0 : fn get_shard_identity(&self) -> &pageserver_api::shard::ShardIdentity {
517 0 : Timeline::get_shard_identity(&self.timeline)
518 0 : }
519 : }
520 :
521 : impl timeline::handle::TenantManager<TenantManagerTypes> for TenantManagerWrapper {
522 0 : async fn resolve(
523 0 : &self,
524 0 : timeline_id: TimelineId,
525 0 : shard_selector: ShardSelector,
526 0 : ) -> Result<TenantManagerCacheItem, GetActiveTimelineError> {
527 0 : let tenant_id = self.tenant_id.get().expect("we set this in get()");
528 0 : let timeout = ACTIVE_TENANT_TIMEOUT;
529 0 : let wait_start = Instant::now();
530 0 : let deadline = wait_start + timeout;
531 0 : let tenant_shard = loop {
532 0 : let resolved = self
533 0 : .tenant_manager
534 0 : .resolve_attached_shard(tenant_id, shard_selector);
535 0 : match resolved {
536 0 : ShardResolveResult::Found(tenant_shard) => break tenant_shard,
537 : ShardResolveResult::NotFound => {
538 0 : return Err(GetActiveTimelineError::Tenant(
539 0 : GetActiveTenantError::NotFound(GetTenantError::NotFound(*tenant_id)),
540 0 : ));
541 : }
542 0 : ShardResolveResult::InProgress(barrier) => {
543 : // We can't authoritatively answer right now: wait for InProgress state
544 : // to end, then try again
545 0 : tokio::select! {
546 0 : _ = barrier.wait() => {
547 0 : // The barrier completed: proceed around the loop to try looking up again
548 0 : },
549 0 : _ = tokio::time::sleep(deadline.duration_since(Instant::now())) => {
550 0 : return Err(GetActiveTimelineError::Tenant(GetActiveTenantError::WaitForActiveTimeout {
551 0 : latest_state: None,
552 0 : wait_time: timeout,
553 0 : }));
554 : }
555 : }
556 : }
557 : };
558 : };
559 :
560 0 : tracing::debug!("Waiting for tenant to enter active state...");
561 0 : tenant_shard
562 0 : .wait_to_become_active(deadline.duration_since(Instant::now()))
563 0 : .await
564 0 : .map_err(GetActiveTimelineError::Tenant)?;
565 :
566 0 : let timeline = tenant_shard
567 0 : .get_timeline(timeline_id, true)
568 0 : .map_err(GetActiveTimelineError::Timeline)?;
569 :
570 0 : let gate_guard = match timeline.gate.enter() {
571 0 : Ok(guard) => guard,
572 : Err(_) => {
573 0 : return Err(GetActiveTimelineError::Timeline(
574 0 : GetTimelineError::ShuttingDown,
575 0 : ));
576 : }
577 : };
578 :
579 0 : let metrics = Arc::new(Arc::clone(&timeline.metrics));
580 :
581 0 : Ok(TenantManagerCacheItem {
582 0 : timeline,
583 0 : metrics,
584 0 : gate_guard,
585 0 : })
586 0 : }
587 : }
588 :
589 : #[derive(thiserror::Error, Debug)]
590 : enum PageStreamError {
591 : /// We encountered an error that should prompt the client to reconnect:
592 : /// in practice this means we drop the connection without sending a response.
593 : #[error("Reconnect required: {0}")]
594 : Reconnect(Cow<'static, str>),
595 :
596 : /// We were instructed to shutdown while processing the query
597 : #[error("Shutting down")]
598 : Shutdown,
599 :
600 : /// Something went wrong reading a page: this likely indicates a pageserver bug
601 : #[error("Read error")]
602 : Read(#[source] PageReconstructError),
603 :
604 : /// Ran out of time waiting for an LSN
605 : #[error("LSN timeout: {0}")]
606 : LsnTimeout(WaitLsnError),
607 :
608 : /// The entity required to serve the request (tenant or timeline) is not found,
609 : /// or is not found in a suitable state to serve a request.
610 : #[error("Not found: {0}")]
611 : NotFound(Cow<'static, str>),
612 :
613 : /// Request asked for something that doesn't make sense, like an invalid LSN
614 : #[error("Bad request: {0}")]
615 : BadRequest(Cow<'static, str>),
616 : }
617 :
618 : impl From<PageStreamError> for tonic::Status {
619 0 : fn from(err: PageStreamError) -> Self {
620 : use tonic::Code;
621 0 : let message = err.to_string();
622 0 : let code = match err {
623 0 : PageStreamError::Reconnect(_) => Code::Unavailable,
624 0 : PageStreamError::Shutdown => Code::Unavailable,
625 0 : PageStreamError::Read(err) => match err {
626 0 : PageReconstructError::Cancelled => Code::Unavailable,
627 0 : PageReconstructError::MissingKey(_) => Code::NotFound,
628 0 : PageReconstructError::AncestorLsnTimeout(err) => tonic::Status::from(err).code(),
629 0 : PageReconstructError::Other(_) => Code::Internal,
630 0 : PageReconstructError::WalRedo(_) => Code::Internal,
631 : },
632 0 : PageStreamError::LsnTimeout(err) => tonic::Status::from(err).code(),
633 0 : PageStreamError::NotFound(_) => Code::NotFound,
634 0 : PageStreamError::BadRequest(_) => Code::InvalidArgument,
635 : };
636 0 : tonic::Status::new(code, message)
637 0 : }
638 : }
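// Usage sketch of the conversion above (hypothetical call site, illustration
// only): a gRPC handler can turn any PageStreamError into the wire-level
// status via `From`.
#[allow(dead_code)]
fn sketch_error_to_status() -> tonic::Status {
    // Maps to Code::Unavailable per the match above.
    tonic::Status::from(PageStreamError::Shutdown)
}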
639 :
640 : impl From<PageReconstructError> for PageStreamError {
641 0 : fn from(value: PageReconstructError) -> Self {
642 0 : match value {
643 0 : PageReconstructError::Cancelled => Self::Shutdown,
644 0 : e => Self::Read(e),
645 : }
646 0 : }
647 : }
648 :
649 : impl From<GetActiveTimelineError> for PageStreamError {
650 0 : fn from(value: GetActiveTimelineError) -> Self {
651 0 : match value {
652 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled)
653 : | GetActiveTimelineError::Tenant(GetActiveTenantError::WillNotBecomeActive(
654 : TenantState::Stopping { .. },
655 : ))
656 0 : | GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown) => Self::Shutdown,
657 0 : GetActiveTimelineError::Tenant(e) => Self::NotFound(format!("{e}").into()),
658 0 : GetActiveTimelineError::Timeline(e) => Self::NotFound(format!("{e}").into()),
659 : }
660 0 : }
661 : }
662 :
663 : impl From<WaitLsnError> for PageStreamError {
664 0 : fn from(value: WaitLsnError) -> Self {
665 0 : match value {
666 0 : e @ WaitLsnError::Timeout(_) => Self::LsnTimeout(e),
667 0 : WaitLsnError::Shutdown => Self::Shutdown,
668 0 : e @ WaitLsnError::BadState { .. } => Self::Reconnect(format!("{e}").into()),
669 : }
670 0 : }
671 : }
672 :
673 : impl From<WaitLsnError> for QueryError {
674 0 : fn from(value: WaitLsnError) -> Self {
675 0 : match value {
676 0 : e @ WaitLsnError::Timeout(_) => Self::Other(anyhow::Error::new(e)),
677 0 : WaitLsnError::Shutdown => Self::Shutdown,
678 0 : WaitLsnError::BadState { .. } => Self::Reconnect,
679 : }
680 0 : }
681 : }
682 :
683 : #[derive(thiserror::Error, Debug)]
684 : struct BatchedPageStreamError {
685 : req: PagestreamRequest,
686 : err: PageStreamError,
687 : }
688 :
689 : impl std::fmt::Display for BatchedPageStreamError {
690 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
691 0 : self.err.fmt(f)
692 0 : }
693 : }
694 :
695 : struct BatchedGetPageRequest {
696 : req: PagestreamGetPageRequest,
697 : timer: SmgrOpTimer,
698 : lsn_range: LsnRange,
699 : ctx: RequestContext,
700 : // If the request is perf enabled, this contains a context
701 : // with a perf span tracking the time spent waiting for the executor.
702 : batch_wait_ctx: Option<RequestContext>,
703 : }
704 :
705 : #[cfg(feature = "testing")]
706 : struct BatchedTestRequest {
707 : req: pagestream_api::PagestreamTestRequest,
708 : timer: SmgrOpTimer,
709 : }
710 :
711 : /// NB: we only hold [`timeline::handle::WeakHandle`] inside this enum,
712 : /// so that we don't keep the [`Timeline::gate`] open while the batch
713 : /// is being built up inside the [`spsc_fold`] (pagestream pipelining).
714 : #[derive(IntoStaticStr)]
715 : #[allow(clippy::large_enum_variant)]
716 : enum BatchedFeMessage {
717 : Exists {
718 : span: Span,
719 : timer: SmgrOpTimer,
720 : shard: WeakHandle<TenantManagerTypes>,
721 : req: PagestreamExistsRequest,
722 : },
723 : Nblocks {
724 : span: Span,
725 : timer: SmgrOpTimer,
726 : shard: WeakHandle<TenantManagerTypes>,
727 : req: PagestreamNblocksRequest,
728 : },
729 : GetPage {
730 : span: Span,
731 : shard: WeakHandle<TenantManagerTypes>,
732 : pages: SmallVec<[BatchedGetPageRequest; 1]>,
733 : batch_break_reason: GetPageBatchBreakReason,
734 : },
735 : DbSize {
736 : span: Span,
737 : timer: SmgrOpTimer,
738 : shard: WeakHandle<TenantManagerTypes>,
739 : req: PagestreamDbSizeRequest,
740 : },
741 : GetSlruSegment {
742 : span: Span,
743 : timer: SmgrOpTimer,
744 : shard: WeakHandle<TenantManagerTypes>,
745 : req: PagestreamGetSlruSegmentRequest,
746 : },
747 : #[cfg(feature = "testing")]
748 : Test {
749 : span: Span,
750 : shard: WeakHandle<TenantManagerTypes>,
751 : requests: Vec<BatchedTestRequest>,
752 : },
753 : RespondError {
754 : span: Span,
755 : error: BatchedPageStreamError,
756 : },
757 : }
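// A minimal analogy for the weak-handle pattern above, using std `Arc`/`Weak`
// as stand-ins for `Handle`/`WeakHandle` (the `SketchShard`/`sketch_serve`
// names are illustrative only): while a request sits in the batch it holds
// only a weak reference, and the executor upgrades it right before serving,
// so a strong reference (and the gate it keeps open) exists only for the
// duration of execution. The real `WeakHandle::upgrade` fails with
// `HandleUpgradeError` instead of returning `None`.
#[allow(dead_code)]
struct SketchShard {
    name: String,
}

#[allow(dead_code)]
fn sketch_serve(shard: std::sync::Weak<SketchShard>) -> Result<(), &'static str> {
    // Upgrade fails if the timeline shut down while the request was queued.
    let shard: std::sync::Arc<SketchShard> = shard.upgrade().ok_or("timeline shut down")?;
    tracing::trace!("serving batched request against {}", shard.name);
    Ok(())
}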
758 :
759 : impl BatchedFeMessage {
760 0 : fn as_static_str(&self) -> &'static str {
761 0 : self.into()
762 0 : }
763 :
764 0 : fn observe_execution_start(&mut self, at: Instant) {
765 0 : match self {
766 0 : BatchedFeMessage::Exists { timer, .. }
767 0 : | BatchedFeMessage::Nblocks { timer, .. }
768 0 : | BatchedFeMessage::DbSize { timer, .. }
769 0 : | BatchedFeMessage::GetSlruSegment { timer, .. } => {
770 0 : timer.observe_execution_start(at);
771 0 : }
772 0 : BatchedFeMessage::GetPage { pages, .. } => {
773 0 : for page in pages {
774 0 : page.timer.observe_execution_start(at);
775 0 : }
776 : }
777 : #[cfg(feature = "testing")]
778 0 : BatchedFeMessage::Test { requests, .. } => {
779 0 : for req in requests {
780 0 : req.timer.observe_execution_start(at);
781 0 : }
782 : }
783 0 : BatchedFeMessage::RespondError { .. } => {}
784 : }
785 0 : }
786 :
787 0 : fn should_break_batch(
788 0 : &self,
789 0 : other: &BatchedFeMessage,
790 0 : max_batch_size: NonZeroUsize,
791 0 : batching_strategy: PageServiceProtocolPipelinedBatchingStrategy,
792 0 : ) -> Option<GetPageBatchBreakReason> {
793 0 : match (self, other) {
794 : (
795 : BatchedFeMessage::GetPage {
796 0 : shard: accum_shard,
797 0 : pages: accum_pages,
798 : ..
799 : },
800 : BatchedFeMessage::GetPage {
801 0 : shard: this_shard,
802 0 : pages: this_pages,
803 : ..
804 : },
805 : ) => {
806 0 : assert_eq!(this_pages.len(), 1);
807 0 : if accum_pages.len() >= max_batch_size.get() {
808 0 : trace!(%max_batch_size, "stopping batching because of batch size");
809 0 : assert_eq!(accum_pages.len(), max_batch_size.get());
810 :
811 0 : return Some(GetPageBatchBreakReason::BatchFull);
812 0 : }
813 0 : if !accum_shard.is_same_handle_as(this_shard) {
814 0 : trace!("stopping batching because timeline object mismatch");
815 : // TODO: we _could_ batch & execute each shard separately (and in parallel).
816 : // But the current logic for keeping responses in order does not support that.
817 :
818 0 : return Some(GetPageBatchBreakReason::NonUniformTimeline);
819 0 : }
820 :
821 0 : match batching_strategy {
822 : PageServiceProtocolPipelinedBatchingStrategy::UniformLsn => {
823 0 : if let Some(last_in_batch) = accum_pages.last() {
824 0 : if last_in_batch.lsn_range.effective_lsn
825 0 : != this_pages[0].lsn_range.effective_lsn
826 : {
827 0 : trace!(
828 : accum_lsn = %last_in_batch.lsn_range.effective_lsn,
829 0 : this_lsn = %this_pages[0].lsn_range.effective_lsn,
830 0 : "stopping batching because LSN changed"
831 : );
832 :
833 0 : return Some(GetPageBatchBreakReason::NonUniformLsn);
834 0 : }
835 0 : }
836 : }
837 : PageServiceProtocolPipelinedBatchingStrategy::ScatteredLsn => {
838 : // The read path doesn't currently support serving the same page at different LSNs.
839 : // While technically possible, it's uncertain if the complexity is worth it.
840 : // Break the batch if such a case is encountered.
841 0 : let same_page_different_lsn = accum_pages.iter().any(|batched| {
842 0 : batched.req.rel == this_pages[0].req.rel
843 0 : && batched.req.blkno == this_pages[0].req.blkno
844 0 : && batched.lsn_range.effective_lsn
845 0 : != this_pages[0].lsn_range.effective_lsn
846 0 : });
847 :
848 0 : if same_page_different_lsn {
849 0 : trace!(
850 0 : rel=%this_pages[0].req.rel,
851 0 : blkno=%this_pages[0].req.blkno,
852 0 : lsn=%this_pages[0].lsn_range.effective_lsn,
853 0 : "stopping batching because same page was requested at different LSNs"
854 : );
855 :
856 0 : return Some(GetPageBatchBreakReason::SamePageAtDifferentLsn);
857 0 : }
858 : }
859 : }
860 :
861 0 : None
862 : }
863 : #[cfg(feature = "testing")]
864 : (
865 : BatchedFeMessage::Test {
866 0 : shard: accum_shard,
867 0 : requests: accum_requests,
868 : ..
869 : },
870 : BatchedFeMessage::Test {
871 0 : shard: this_shard,
872 0 : requests: this_requests,
873 : ..
874 : },
875 : ) => {
876 0 : assert!(this_requests.len() == 1);
877 0 : if accum_requests.len() >= max_batch_size.get() {
878 0 : trace!(%max_batch_size, "stopping batching because of batch size");
879 0 : assert_eq!(accum_requests.len(), max_batch_size.get());
880 0 : return Some(GetPageBatchBreakReason::BatchFull);
881 0 : }
882 0 : if !accum_shard.is_same_handle_as(this_shard) {
883 0 : trace!("stopping batching because timeline object mismatch");
884 : // TODO: we _could_ batch & execute each shard separately (and in parallel).
885 : // But the current logic for keeping responses in order does not support that.
886 0 : return Some(GetPageBatchBreakReason::NonUniformTimeline);
887 0 : }
888 0 : let this_batch_key = this_requests[0].req.batch_key;
889 0 : let accum_batch_key = accum_requests[0].req.batch_key;
890 0 : if this_batch_key != accum_batch_key {
891 0 : trace!(%accum_batch_key, %this_batch_key, "stopping batching because batch key changed");
892 0 : return Some(GetPageBatchBreakReason::NonUniformKey);
893 0 : }
894 0 : None
895 : }
896 0 : (_, _) => Some(GetPageBatchBreakReason::NonBatchableRequest),
897 : }
898 0 : }
899 : }
900 :
901 : impl PageServerHandler {
902 : #[allow(clippy::too_many_arguments)]
903 0 : pub fn new(
904 0 : tenant_manager: Arc<TenantManager>,
905 0 : auth: Option<Arc<SwappableJwtAuth>>,
906 0 : pipelining_config: PageServicePipeliningConfig,
907 0 : get_vectored_concurrent_io: GetVectoredConcurrentIo,
908 0 : perf_span_fields: ConnectionPerfSpanFields,
909 0 : connection_ctx: RequestContext,
910 0 : cancel: CancellationToken,
911 0 : gate_guard: GateGuard,
912 0 : ) -> Self {
913 0 : PageServerHandler {
914 0 : auth,
915 0 : claims: None,
916 0 : connection_ctx,
917 0 : perf_span_fields,
918 0 : timeline_handles: Some(TimelineHandles::new(tenant_manager)),
919 0 : cancel,
920 0 : pipelining_config,
921 0 : get_vectored_concurrent_io,
922 0 : gate_guard,
923 0 : }
924 0 : }
925 :
926 : /// This function always respects cancellation of any timeline in [`Self::timeline_handles`]. Pass in
927 : /// a cancellation token at the next scope up (such as a tenant cancellation token) to ensure we respect
928 : /// cancellation if there aren't any timelines in the cache.
929 : ///
930 : /// If calling from a function that doesn't use the [`Self::timeline_handles`] cache, then pass in the
931 : /// timeline cancellation token.
932 0 : async fn flush_cancellable<IO>(
933 0 : &self,
934 0 : pgb: &mut PostgresBackend<IO>,
935 0 : cancel: &CancellationToken,
936 0 : ) -> Result<(), QueryError>
937 0 : where
938 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
939 0 : {
940 0 : tokio::select!(
941 0 : flush_r = pgb.flush() => {
942 0 : Ok(flush_r?)
943 : },
944 0 : _ = cancel.cancelled() => {
945 0 : Err(QueryError::Shutdown)
946 : }
947 : )
948 0 : }
949 :
950 : #[allow(clippy::too_many_arguments)]
951 0 : async fn pagestream_read_message<IO>(
952 0 : pgb: &mut PostgresBackendReader<IO>,
953 0 : tenant_id: TenantId,
954 0 : timeline_id: TimelineId,
955 0 : timeline_handles: &mut TimelineHandles,
956 0 : conn_perf_span_fields: &ConnectionPerfSpanFields,
957 0 : cancel: &CancellationToken,
958 0 : ctx: &RequestContext,
959 0 : protocol_version: PagestreamProtocolVersion,
960 0 : parent_span: Span,
961 0 : ) -> Result<Option<BatchedFeMessage>, QueryError>
962 0 : where
963 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
964 0 : {
965 0 : let msg = tokio::select! {
966 : biased;
967 0 : _ = cancel.cancelled() => {
968 0 : return Err(QueryError::Shutdown)
969 : }
970 0 : msg = pgb.read_message() => { msg }
971 : };
972 :
973 0 : let received_at = Instant::now();
974 :
975 0 : let copy_data_bytes = match msg? {
976 0 : Some(FeMessage::CopyData(bytes)) => bytes,
977 : Some(FeMessage::Terminate) => {
978 0 : return Ok(None);
979 : }
980 0 : Some(m) => {
981 0 : return Err(QueryError::Other(anyhow::anyhow!(
982 0 : "unexpected message: {m:?} during COPY"
983 0 : )));
984 : }
985 : None => {
986 0 : return Ok(None);
987 : } // client disconnected
988 : };
989 0 : trace!("query: {copy_data_bytes:?}");
990 :
991 0 : fail::fail_point!("ps::handle-pagerequest-message");
992 :
993 : // parse request
994 0 : let neon_fe_msg =
995 0 : PagestreamFeMessage::parse(&mut copy_data_bytes.reader(), protocol_version)?;
996 :
997 0 : let batched_msg = match neon_fe_msg {
998 0 : PagestreamFeMessage::Exists(req) => {
999 0 : let shard = timeline_handles
1000 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1001 0 : .await?;
1002 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1003 0 : let span = tracing::info_span!(parent: &parent_span, "handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
1004 0 : let timer = Self::record_op_start_and_throttle(
1005 0 : &shard,
1006 0 : metrics::SmgrQueryType::GetRelExists,
1007 0 : received_at,
1008 0 : )
1009 0 : .await?;
1010 0 : BatchedFeMessage::Exists {
1011 0 : span,
1012 0 : timer,
1013 0 : shard: shard.downgrade(),
1014 0 : req,
1015 0 : }
1016 : }
1017 0 : PagestreamFeMessage::Nblocks(req) => {
1018 0 : let shard = timeline_handles
1019 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1020 0 : .await?;
1021 0 : let span = tracing::info_span!(parent: &parent_span, "handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
1022 0 : let timer = Self::record_op_start_and_throttle(
1023 0 : &shard,
1024 0 : metrics::SmgrQueryType::GetRelSize,
1025 0 : received_at,
1026 0 : )
1027 0 : .await?;
1028 0 : BatchedFeMessage::Nblocks {
1029 0 : span,
1030 0 : timer,
1031 0 : shard: shard.downgrade(),
1032 0 : req,
1033 0 : }
1034 : }
1035 0 : PagestreamFeMessage::DbSize(req) => {
1036 0 : let shard = timeline_handles
1037 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1038 0 : .await?;
1039 0 : let span = tracing::info_span!(parent: &parent_span, "handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
1040 0 : let timer = Self::record_op_start_and_throttle(
1041 0 : &shard,
1042 0 : metrics::SmgrQueryType::GetDbSize,
1043 0 : received_at,
1044 0 : )
1045 0 : .await?;
1046 0 : BatchedFeMessage::DbSize {
1047 0 : span,
1048 0 : timer,
1049 0 : shard: shard.downgrade(),
1050 0 : req,
1051 0 : }
1052 : }
1053 0 : PagestreamFeMessage::GetSlruSegment(req) => {
1054 0 : let shard = timeline_handles
1055 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1056 0 : .await?;
1057 0 : let span = tracing::info_span!(parent: &parent_span, "handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
1058 0 : let timer = Self::record_op_start_and_throttle(
1059 0 : &shard,
1060 0 : metrics::SmgrQueryType::GetSlruSegment,
1061 0 : received_at,
1062 0 : )
1063 0 : .await?;
1064 0 : BatchedFeMessage::GetSlruSegment {
1065 0 : span,
1066 0 : timer,
1067 0 : shard: shard.downgrade(),
1068 0 : req,
1069 0 : }
1070 : }
1071 0 : PagestreamFeMessage::GetPage(req) => {
1072 : // avoid a somewhat costly Span::record() by constructing the entire span in one go.
1073 : macro_rules! mkspan {
1074 : (before shard routing) => {{
1075 : tracing::info_span!(
1076 : parent: &parent_span,
1077 : "handle_get_page_request",
1078 : request_id = %req.hdr.reqid,
1079 : rel = %req.rel,
1080 : blkno = %req.blkno,
1081 : req_lsn = %req.hdr.request_lsn,
1082 : not_modified_since_lsn = %req.hdr.not_modified_since,
1083 : )
1084 : }};
1085 : ($shard_id:expr) => {{
1086 : tracing::info_span!(
1087 : parent: &parent_span,
1088 : "handle_get_page_request",
1089 : request_id = %req.hdr.reqid,
1090 : rel = %req.rel,
1091 : blkno = %req.blkno,
1092 : req_lsn = %req.hdr.request_lsn,
1093 : not_modified_since_lsn = %req.hdr.not_modified_since,
1094 : shard_id = %$shard_id,
1095 : )
1096 : }};
1097 : }
1098 :
1099 : macro_rules! respond_error {
1100 : ($span:expr, $error:expr) => {{
1101 : let error = BatchedFeMessage::RespondError {
1102 : span: $span,
1103 : error: BatchedPageStreamError {
1104 : req: req.hdr,
1105 : err: $error,
1106 : },
1107 : };
1108 : Ok(Some(error))
1109 : }};
1110 : }
1111 :
1112 0 : let key = rel_block_to_key(req.rel, req.blkno);
1113 :
1114 0 : let res = timeline_handles
1115 0 : .get(tenant_id, timeline_id, ShardSelector::Page(key))
1116 0 : .await;
1117 :
1118 0 : let shard = match res {
1119 0 : Ok(tl) => tl,
1120 0 : Err(e) => {
1121 0 : let span = mkspan!(before shard routing);
1122 0 : match e {
1123 : GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_)) => {
1124 : // We already know this tenant exists in general, because we resolved it at
1125 : // start of connection. Getting a NotFound here indicates that the shard containing
1126 : // the requested page is not present on this node: the client's knowledge of shard->pageserver
1127 : // mapping is out of date.
1128 : //
1129 : // Closing the connection by returning `PageStreamError::Reconnect` has the side effect of rate-limiting these
1130 : // misrouted requests via the client's reconnect backoff, as well as hopefully prompting the client to load its
1131 : // updated configuration and talk to a different pageserver.
1132 0 : MISROUTED_PAGESTREAM_REQUESTS.inc();
1133 0 : return respond_error!(
1134 0 : span,
1135 0 : PageStreamError::Reconnect(
1136 0 : "getpage@lsn request routed to wrong shard".into()
1137 0 : )
1138 : );
1139 : }
1140 0 : e => {
1141 0 : return respond_error!(span, e.into());
1142 : }
1143 : }
1144 : }
1145 : };
1146 :
1147 0 : let ctx = if shard.is_get_page_request_sampled() {
1148 0 : RequestContextBuilder::from(ctx)
1149 0 : .root_perf_span(|| {
1150 0 : info_span!(
1151 : target: PERF_TRACE_TARGET,
1152 : "GET_PAGE",
1153 : peer_addr = conn_perf_span_fields.peer_addr,
1154 : application_name = conn_perf_span_fields.application_name,
1155 : compute_mode = conn_perf_span_fields.compute_mode,
1156 : tenant_id = %tenant_id,
1157 0 : shard_id = %shard.get_shard_identity().shard_slug(),
1158 : timeline_id = %timeline_id,
1159 : lsn = %req.hdr.request_lsn,
1160 : not_modified_since_lsn = %req.hdr.not_modified_since,
1161 : request_id = %req.hdr.reqid,
1162 : key = %key,
1163 : )
1164 0 : })
1165 0 : .attached_child()
1166 : } else {
1167 0 : ctx.attached_child()
1168 : };
1169 :
1170 : // This ctx travels as part of the BatchedFeMessage through
1171 : // batching into the request handler.
1172 : // The request handler needs to do some per-request work
1173 : // (relsize check) before dispatching the batch as a single
1174 : // get_vectored call to the Timeline.
1175 : // This ctx will be used for the relsize check, whereas the
1176 : // get_vectored call will use a different ctx with a separate
1177 : // perf span.
1178 0 : let ctx = ctx.with_scope_page_service_pagestream(&shard);
1179 :
1180 : // Similar game for this `span`: we funnel it through so that
1181 : // request handler log messages contain the request-specific fields.
1182 0 : let span = mkspan!(shard.tenant_shard_id.shard_slug());
1183 :
1184 0 : let timer = Self::record_op_start_and_throttle(
1185 0 : &shard,
1186 0 : metrics::SmgrQueryType::GetPageAtLsn,
1187 0 : received_at,
1188 : )
1189 0 : .maybe_perf_instrument(&ctx, |current_perf_span| {
1190 0 : info_span!(
1191 : target: PERF_TRACE_TARGET,
1192 0 : parent: current_perf_span,
1193 : "THROTTLE",
1194 : )
1195 0 : })
1196 0 : .await?;
1197 :
1198 : // We're holding the Handle
1199 0 : let effective_lsn = match Self::effective_request_lsn(
1200 0 : &shard,
1201 0 : shard.get_last_record_lsn(),
1202 0 : req.hdr.request_lsn,
1203 0 : req.hdr.not_modified_since,
1204 0 : &shard.get_applied_gc_cutoff_lsn(),
1205 0 : ) {
1206 0 : Ok(lsn) => lsn,
1207 0 : Err(e) => {
1208 0 : return respond_error!(span, e);
1209 : }
1210 : };
1211 :
1212 0 : let batch_wait_ctx = if ctx.has_perf_span() {
1213 : Some(
1214 0 : RequestContextBuilder::from(&ctx)
1215 0 : .perf_span(|crnt_perf_span| {
1216 0 : info_span!(
1217 : target: PERF_TRACE_TARGET,
1218 0 : parent: crnt_perf_span,
1219 : "WAIT_EXECUTOR",
1220 : )
1221 0 : })
1222 0 : .attached_child(),
1223 : )
1224 : } else {
1225 0 : None
1226 : };
1227 :
1228 : BatchedFeMessage::GetPage {
1229 0 : span,
1230 0 : shard: shard.downgrade(),
1231 0 : pages: smallvec![BatchedGetPageRequest {
1232 0 : req,
1233 0 : timer,
1234 0 : lsn_range: LsnRange {
1235 0 : effective_lsn,
1236 0 : request_lsn: req.hdr.request_lsn
1237 0 : },
1238 0 : ctx,
1239 0 : batch_wait_ctx,
1240 0 : }],
1241 : // The executor grabs the batch when it becomes idle.
1242 : // Hence, [`GetPageBatchBreakReason::ExecutorSteal`] is the
1243 : // default reason for breaking the batch.
1244 0 : batch_break_reason: GetPageBatchBreakReason::ExecutorSteal,
1245 : }
1246 : }
1247 : #[cfg(feature = "testing")]
1248 0 : PagestreamFeMessage::Test(req) => {
1249 0 : let shard = timeline_handles
1250 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1251 0 : .await?;
1252 0 : let span = tracing::info_span!(parent: &parent_span, "handle_test_request", shard_id = %shard.tenant_shard_id.shard_slug());
1253 0 : let timer = Self::record_op_start_and_throttle(
1254 0 : &shard,
1255 0 : metrics::SmgrQueryType::Test,
1256 0 : received_at,
1257 0 : )
1258 0 : .await?;
1259 0 : BatchedFeMessage::Test {
1260 0 : span,
1261 0 : shard: shard.downgrade(),
1262 0 : requests: vec![BatchedTestRequest { req, timer }],
1263 0 : }
1264 : }
1265 : };
1266 0 : Ok(Some(batched_msg))
1267 0 : }
1268 :
1269 : /// Starts a SmgrOpTimer at received_at and throttles the request.
1270 0 : async fn record_op_start_and_throttle(
1271 0 : shard: &Handle<TenantManagerTypes>,
1272 0 : op: metrics::SmgrQueryType,
1273 0 : received_at: Instant,
1274 0 : ) -> Result<SmgrOpTimer, QueryError> {
1275 : // It's important to start the smgr op metric recorder as early as possible
1276 : // so that the _started counters are incremented before we do
1277 : // any serious waiting, e.g., for throttle, batching, or actual request handling.
1278 0 : let mut timer = shard.query_metrics.start_smgr_op(op, received_at);
1279 0 : let now = Instant::now();
1280 0 : timer.observe_throttle_start(now);
1281 0 : let throttled = tokio::select! {
1282 0 : res = shard.pagestream_throttle.throttle(1, now) => res,
1283 0 : _ = shard.cancel.cancelled() => return Err(QueryError::Shutdown),
1284 : };
1285 0 : timer.observe_throttle_done(throttled);
1286 0 : Ok(timer)
1287 0 : }
1288 :
1289 : /// Post-condition: `batch` is Some()
1290 : #[instrument(skip_all, level = tracing::Level::TRACE)]
1291 : #[allow(clippy::boxed_local)]
1292 : fn pagestream_do_batch(
1293 : batching_strategy: PageServiceProtocolPipelinedBatchingStrategy,
1294 : max_batch_size: NonZeroUsize,
1295 : batch: &mut Result<BatchedFeMessage, QueryError>,
1296 : this_msg: Result<BatchedFeMessage, QueryError>,
1297 : ) -> Result<(), Result<BatchedFeMessage, QueryError>> {
1298 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1299 :
1300 : let this_msg = match this_msg {
1301 : Ok(this_msg) => this_msg,
1302 : Err(e) => return Err(Err(e)),
1303 : };
1304 :
1305 : let eligible_batch = match batch {
1306 : Ok(b) => b,
1307 : Err(_) => {
1308 : return Err(Ok(this_msg));
1309 : }
1310 : };
1311 :
1312 : let batch_break =
1313 : eligible_batch.should_break_batch(&this_msg, max_batch_size, batching_strategy);
1314 :
1315 : match batch_break {
1316 : Some(reason) => {
1317 : if let BatchedFeMessage::GetPage {
1318 : batch_break_reason, ..
1319 : } = eligible_batch
1320 : {
1321 : *batch_break_reason = reason;
1322 : }
1323 :
1324 : Err(Ok(this_msg))
1325 : }
1326 : None => {
1327 : // ok to batch
1328 : match (eligible_batch, this_msg) {
1329 : (
1330 : BatchedFeMessage::GetPage {
1331 : pages: accum_pages, ..
1332 : },
1333 : BatchedFeMessage::GetPage {
1334 : pages: this_pages, ..
1335 : },
1336 : ) => {
1337 : accum_pages.extend(this_pages);
1338 : Ok(())
1339 : }
1340 : #[cfg(feature = "testing")]
1341 : (
1342 : BatchedFeMessage::Test {
1343 : requests: accum_requests,
1344 : ..
1345 : },
1346 : BatchedFeMessage::Test {
1347 : requests: this_requests,
1348 : ..
1349 : },
1350 : ) => {
1351 : accum_requests.extend(this_requests);
1352 : Ok(())
1353 : }
1354 : // Shape guaranteed by [`BatchedFeMessage::should_break_batch`]
1355 : _ => unreachable!(),
1356 : }
1357 : }
1358 : }
1359 : }
1360 :
1361 0 : #[instrument(level = tracing::Level::DEBUG, skip_all)]
1362 : async fn pagestream_handle_batched_message<IO>(
1363 : &mut self,
1364 : pgb_writer: &mut PostgresBackend<IO>,
1365 : batch: BatchedFeMessage,
1366 : io_concurrency: IoConcurrency,
1367 : cancel: &CancellationToken,
1368 : protocol_version: PagestreamProtocolVersion,
1369 : ctx: &RequestContext,
1370 : ) -> Result<(), QueryError>
1371 : where
1372 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1373 : {
1374 : let started_at = Instant::now();
1375 : let batch = {
1376 : let mut batch = batch;
1377 : batch.observe_execution_start(started_at);
1378 : batch
1379 : };
1380 :
1381 : // Dispatch the batch to the appropriate request handler.
1382 : let log_slow_name = batch.as_static_str();
1383 : let (mut handler_results, span) = {
1384 : // TODO: we unfortunately have to pin the future on the heap, since GetPage futures are huge and
1385 : // won't fit on the stack.
1386 : let mut boxpinned = Box::pin(Self::pagestream_dispatch_batched_message(
1387 : batch,
1388 : io_concurrency,
1389 : ctx,
1390 : ));
1391 : log_slow(
1392 : log_slow_name,
1393 : LOG_SLOW_GETPAGE_THRESHOLD,
1394 : boxpinned.as_mut(),
1395 : )
1396 : .await?
1397 : };
1398 :
1399 : // We purposefully don't count flush time into the smgr operation timer.
1400 : //
1401 : // The reason is that the current compute client will not perform protocol processing
1402 : // if the postgres backend process is doing things other than `->smgr_read()`.
1403 : // This is especially the case for prefetch.
1404 : //
1405 : // If the compute doesn't read from the connection, eventually TCP will backpressure
1406 : // all the way into our flush call below.
1407 : //
1408 : // The timer's underlying metric is used for a storage-internal latency SLO and
1409 : // we don't want to include latency in it that we can't control.
1410 : // And as pointed out above, in this case, we don't control the time that flush will take.
1411 : //
1412 : // We put each response in the batch onto the wire in a separate pgb_writer.flush()
1413 : // call, which (all unmeasured) adds syscall overhead but reduces time to first byte
1414 : // and avoids building up a "giant" contiguous userspace buffer to hold the entire response.
1415 : // TODO: vectored socket IO would be great, but pgb_writer doesn't support that.
1416 : let flush_timers = {
1417 : let flushing_start_time = Instant::now();
1418 : let mut flush_timers = Vec::with_capacity(handler_results.len());
1419 : for handler_result in &mut handler_results {
1420 : let flush_timer = match handler_result {
1421 : Ok((_response, timer, _ctx)) => Some(
1422 : timer
1423 : .observe_execution_end(flushing_start_time)
1424 : .expect("we are the first caller"),
1425 : ),
1426 : Err(_) => {
1427 : // TODO: measure errors
1428 : None
1429 : }
1430 : };
1431 : flush_timers.push(flush_timer);
1432 : }
1433 : assert_eq!(flush_timers.len(), handler_results.len());
1434 : flush_timers
1435 : };
1436 :
1437 : // Map handler result to protocol behavior.
1438 : // Some handler errors cause exit from pagestream protocol.
1439 : // Other handler errors are sent back as an error message and we stay in pagestream protocol.
1440 : for (handler_result, flushing_timer) in handler_results.into_iter().zip(flush_timers) {
1441 : let (response_msg, ctx) = match handler_result {
1442 : Err(e) => match &e.err {
1443 : PageStreamError::Shutdown => {
1444 : // BEGIN HADRON
1445 : PAGESTREAM_HANDLER_RESULTS_TOTAL
1446 : .with_label_values(&[metrics::PAGESTREAM_HANDLER_OUTCOME_OTHER_ERROR])
1447 : .inc();
1448 : // END HADRON
1449 :
1450 : // If we fail to fulfil a request during shutdown, which may be _because_ of
1451 : // shutdown, then do not send the error to the client. Instead just drop the
1452 : // connection.
1453 0 : span.in_scope(|| info!("dropping connection due to shutdown"));
1454 : return Err(QueryError::Shutdown);
1455 : }
1456 : PageStreamError::Reconnect(_reason) => {
1457 0 : span.in_scope(|| {
1458 : // BEGIN HADRON
1459 : // We can get here because the compute node is pointing at the wrong PS. We
1460 : // already have a metric to keep track of this so suppressing this log to
1461 : // reduce log spam. The information in this log message is not going to be that
1462 : // helpful given the volume of logs that can be generated.
1463 : // info!("handler requested reconnect: {reason}")
1464 : // END HADRON
1465 0 : });
1466 : // BEGIN HADRON
1467 : PAGESTREAM_HANDLER_RESULTS_TOTAL
1468 : .with_label_values(&[
1469 : metrics::PAGESTREAM_HANDLER_OUTCOME_INTERNAL_ERROR,
1470 : ])
1471 : .inc();
1472 : // END HADRON
1473 : return Err(QueryError::Reconnect);
1474 : }
1475 : PageStreamError::Read(_)
1476 : | PageStreamError::LsnTimeout(_)
1477 : | PageStreamError::NotFound(_)
1478 : | PageStreamError::BadRequest(_) => {
1479 : // BEGIN HADRON
1480 : if let PageStreamError::Read(_) | PageStreamError::LsnTimeout(_) = &e.err {
1481 : PAGESTREAM_HANDLER_RESULTS_TOTAL
1482 : .with_label_values(&[
1483 : metrics::PAGESTREAM_HANDLER_OUTCOME_INTERNAL_ERROR,
1484 : ])
1485 : .inc();
1486 : } else {
1487 : PAGESTREAM_HANDLER_RESULTS_TOTAL
1488 : .with_label_values(&[
1489 : metrics::PAGESTREAM_HANDLER_OUTCOME_OTHER_ERROR,
1490 : ])
1491 : .inc();
1492 : }
1493 : // END HADRON
1494 :
1495 : // print all the details to the log with {:#}, but for the client the
1496 : // error message is enough. Do not log if shutting down, as the anyhow::Error
1497 : // here includes cancellation which is not an error.
1498 : let full = utils::error::report_compact_sources(&e.err);
1499 0 : span.in_scope(|| {
1500 0 : error!("error reading relation or page version: {full:#}")
1501 0 : });
1502 :
1503 : (
1504 : PagestreamBeMessage::Error(PagestreamErrorResponse {
1505 : req: e.req,
1506 : message: e.err.to_string(),
1507 : }),
1508 : None,
1509 : )
1510 : }
1511 : },
1512 : Ok((response_msg, _op_timer_already_observed, ctx)) => {
1513 : // BEGIN HADRON
1514 : PAGESTREAM_HANDLER_RESULTS_TOTAL
1515 : .with_label_values(&[metrics::PAGESTREAM_HANDLER_OUTCOME_SUCCESS])
1516 : .inc();
1517 : // END HADRON
1518 :
1519 : (response_msg, Some(ctx))
1520 : }
1521 : };
1522 :
1523 0 : let ctx = ctx.map(|req_ctx| {
1524 0 : RequestContextBuilder::from(&req_ctx)
1525 0 : .perf_span(|crnt_perf_span| {
1526 0 : info_span!(
1527 : target: PERF_TRACE_TARGET,
1528 0 : parent: crnt_perf_span,
1529 : "FLUSH_RESPONSE",
1530 : )
1531 0 : })
1532 0 : .attached_child()
1533 0 : });
1534 :
1535 : //
1536 : // marshal & transmit response message
1537 : //
1538 :
1539 : pgb_writer.write_message_noflush(&BeMessage::CopyData(
1540 : &response_msg.serialize(protocol_version),
1541 : ))?;
1542 :
1543 : failpoint_support::sleep_millis_async!("before-pagestream-msg-flush", cancel);
1544 :
1545 : // what we want to do
1546 : let socket_fd = pgb_writer.socket_fd;
1547 : let flush_fut = pgb_writer.flush();
1548 : // metric for how long flushing takes
1549 : let flush_fut = match flushing_timer {
1550 : Some(flushing_timer) => futures::future::Either::Left(flushing_timer.measure(
1551 : Instant::now(),
1552 : flush_fut,
1553 : socket_fd,
1554 : )),
1555 : None => futures::future::Either::Right(flush_fut),
1556 : };
1557 :
1558 : let flush_fut = if let Some(req_ctx) = ctx.as_ref() {
1559 : futures::future::Either::Left(
1560 0 : flush_fut.maybe_perf_instrument(req_ctx, |current_perf_span| {
1561 0 : current_perf_span.clone()
1562 0 : }),
1563 : )
1564 : } else {
1565 : futures::future::Either::Right(flush_fut)
1566 : };
1567 :
1568 : // do it while respecting cancellation
1569 0 : let _: () = async move {
1570 0 : tokio::select! {
1571 : biased;
1572 0 : _ = cancel.cancelled() => {
1573 : // We were requested to shut down.
1574 0 : info!("shutdown request received in page handler");
1575 0 : return Err(QueryError::Shutdown)
1576 : }
1577 0 : res = flush_fut => {
1578 0 : res?;
1579 : }
1580 : }
1581 0 : Ok(())
1582 0 : }
1583 : .await?;
1584 : }
1585 : Ok(())
1586 : }
1587 :
1588 : /// Helper which dispatches a batched message to the appropriate handler.
1589 : /// Returns a vec of results, along with the extracted trace span.
1590 0 : async fn pagestream_dispatch_batched_message(
1591 0 : batch: BatchedFeMessage,
1592 0 : io_concurrency: IoConcurrency,
1593 0 : ctx: &RequestContext,
1594 0 : ) -> Result<
1595 0 : (
1596 0 : Vec<Result<(PagestreamBeMessage, SmgrOpTimer, RequestContext), BatchedPageStreamError>>,
1597 0 : Span,
1598 0 : ),
1599 0 : QueryError,
1600 0 : > {
1601 : macro_rules! upgrade_handle_and_set_context {
1602 : ($shard:ident) => {{
1603 : let weak_handle = &$shard;
1604 : let handle = weak_handle.upgrade()?;
1605 : let ctx = ctx.with_scope_page_service_pagestream(&handle);
1606 : (handle, ctx)
1607 : }};
1608 : }
1609 0 : Ok(match batch {
1610 : BatchedFeMessage::Exists {
1611 0 : span,
1612 0 : timer,
1613 0 : shard,
1614 0 : req,
1615 : } => {
1616 0 : fail::fail_point!("ps::handle-pagerequest-message::exists");
1617 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1618 : (
1619 0 : vec![
1620 0 : Self::handle_get_rel_exists_request(&shard, &req, &ctx)
1621 0 : .instrument(span.clone())
1622 0 : .await
1623 0 : .map(|msg| (PagestreamBeMessage::Exists(msg), timer, ctx))
1624 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
1625 : ],
1626 0 : span,
1627 : )
1628 : }
1629 : BatchedFeMessage::Nblocks {
1630 0 : span,
1631 0 : timer,
1632 0 : shard,
1633 0 : req,
1634 : } => {
1635 0 : fail::fail_point!("ps::handle-pagerequest-message::nblocks");
1636 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1637 : (
1638 0 : vec![
1639 0 : Self::handle_get_nblocks_request(&shard, &req, &ctx)
1640 0 : .instrument(span.clone())
1641 0 : .await
1642 0 : .map(|msg| (PagestreamBeMessage::Nblocks(msg), timer, ctx))
1643 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
1644 : ],
1645 0 : span,
1646 : )
1647 : }
1648 : BatchedFeMessage::GetPage {
1649 0 : span,
1650 0 : shard,
1651 0 : pages,
1652 0 : batch_break_reason,
1653 : } => {
1654 0 : fail::fail_point!("ps::handle-pagerequest-message::getpage");
1655 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1656 : (
1657 : {
1658 0 : let npages = pages.len();
1659 0 : trace!(npages, "handling getpage request");
1660 0 : let res = Self::handle_get_page_at_lsn_request_batched(
1661 0 : &shard,
1662 0 : pages,
1663 0 : io_concurrency,
1664 0 : batch_break_reason,
1665 0 : &ctx,
1666 0 : )
1667 0 : .instrument(span.clone())
1668 0 : .await;
1669 0 : assert_eq!(res.len(), npages);
1670 0 : res
1671 : },
1672 0 : span,
1673 : )
1674 : }
1675 : BatchedFeMessage::DbSize {
1676 0 : span,
1677 0 : timer,
1678 0 : shard,
1679 0 : req,
1680 : } => {
1681 0 : fail::fail_point!("ps::handle-pagerequest-message::dbsize");
1682 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1683 : (
1684 0 : vec![
1685 0 : Self::handle_db_size_request(&shard, &req, &ctx)
1686 0 : .instrument(span.clone())
1687 0 : .await
1688 0 : .map(|msg| (PagestreamBeMessage::DbSize(msg), timer, ctx))
1689 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
1690 : ],
1691 0 : span,
1692 : )
1693 : }
1694 : BatchedFeMessage::GetSlruSegment {
1695 0 : span,
1696 0 : timer,
1697 0 : shard,
1698 0 : req,
1699 : } => {
1700 0 : fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
1701 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1702 : (
1703 0 : vec![
1704 0 : Self::handle_get_slru_segment_request(&shard, &req, &ctx)
1705 0 : .instrument(span.clone())
1706 0 : .await
1707 0 : .map(|msg| (PagestreamBeMessage::GetSlruSegment(msg), timer, ctx))
1708 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
1709 : ],
1710 0 : span,
1711 : )
1712 : }
1713 : #[cfg(feature = "testing")]
1714 : BatchedFeMessage::Test {
1715 0 : span,
1716 0 : shard,
1717 0 : requests,
1718 : } => {
1719 0 : fail::fail_point!("ps::handle-pagerequest-message::test");
1720 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1721 : (
1722 : {
1723 0 : let npages = requests.len();
1724 0 : trace!(npages, "handling getpage request");
1725 0 : let res = Self::handle_test_request_batch(&shard, requests, &ctx)
1726 0 : .instrument(span.clone())
1727 0 : .await;
1728 0 : assert_eq!(res.len(), npages);
1729 0 : res
1730 : },
1731 0 : span,
1732 : )
1733 : }
1734 0 : BatchedFeMessage::RespondError { span, error } => {
1735 : // We've already decided to respond with an error, so we don't need to
1736 : // call the handler.
1737 0 : (vec![Err(error)], span)
1738 : }
1739 : })
1740 0 : }
1741 :
1742 : /// Pagestream sub-protocol handler.
1743 : ///
1744 : /// It is a simple request-response protocol inside a COPYBOTH session.
1745 : ///
1746 : /// # Coding Discipline
1747 : ///
1748 : /// Coding discipline within this function: all interaction with the `pgb` connection
1749 : /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
1750 : /// This is so that we can shut down page_service quickly.
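    /// A typical cancellation-sensitive await in this function looks like the
    /// following (mirroring the flush right below; shown here only for
    /// illustration):
    ///
    /// ```ignore
    /// tokio::select! {
    ///     biased;
    ///     _ = self.cancel.cancelled() => {
    ///         return Err(QueryError::Shutdown)
    ///     }
    ///     res = pgb.flush() => {
    ///         res?;
    ///     }
    /// }
    /// ```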
1751 : #[instrument(skip_all)]
1752 : async fn handle_pagerequests<IO>(
1753 : &mut self,
1754 : pgb: &mut PostgresBackend<IO>,
1755 : tenant_id: TenantId,
1756 : timeline_id: TimelineId,
1757 : protocol_version: PagestreamProtocolVersion,
1758 : ctx: RequestContext,
1759 : ) -> Result<(), QueryError>
1760 : where
1761 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
1762 : {
1763 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1764 :
1765 : // switch client to COPYBOTH
1766 : pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
1767 : tokio::select! {
1768 : biased;
1769 : _ = self.cancel.cancelled() => {
1770 : return Err(QueryError::Shutdown)
1771 : }
1772 : res = pgb.flush() => {
1773 : res?;
1774 : }
1775 : }
1776 :
1777 : let io_concurrency = IoConcurrency::spawn_from_conf(
1778 : self.get_vectored_concurrent_io,
1779 : match self.gate_guard.try_clone() {
1780 : Ok(guard) => guard,
1781 : Err(_) => {
1782 : info!("shutdown request received in page handler");
1783 : return Err(QueryError::Shutdown);
1784 : }
1785 : },
1786 : );
1787 :
1788 : let pgb_reader = pgb
1789 : .split()
1790 : .context("implementation error: split pgb into reader and writer")?;
1791 :
1792 : let timeline_handles = self
1793 : .timeline_handles
1794 : .take()
1795 : .expect("implementation error: timeline_handles should not be locked");
1796 :
1797 : let request_span = info_span!("request");
1798 : let ((pgb_reader, timeline_handles), result) = match self.pipelining_config.clone() {
1799 : PageServicePipeliningConfig::Pipelined(pipelining_config) => {
1800 : self.handle_pagerequests_pipelined(
1801 : pgb,
1802 : pgb_reader,
1803 : tenant_id,
1804 : timeline_id,
1805 : timeline_handles,
1806 : request_span,
1807 : pipelining_config,
1808 : protocol_version,
1809 : io_concurrency,
1810 : &ctx,
1811 : )
1812 : .await
1813 : }
1814 : PageServicePipeliningConfig::Serial => {
1815 : self.handle_pagerequests_serial(
1816 : pgb,
1817 : pgb_reader,
1818 : tenant_id,
1819 : timeline_id,
1820 : timeline_handles,
1821 : request_span,
1822 : protocol_version,
1823 : io_concurrency,
1824 : &ctx,
1825 : )
1826 : .await
1827 : }
1828 : };
1829 :
1830 : debug!("pagestream subprotocol shut down cleanly");
1831 :
1832 : pgb.unsplit(pgb_reader)
1833 : .context("implementation error: unsplit pgb")?;
1834 :
1835 : let replaced = self.timeline_handles.replace(timeline_handles);
1836 : assert!(replaced.is_none());
1837 :
1838 : result
1839 : }
1840 :
1841 : #[allow(clippy::too_many_arguments)]
1842 0 : async fn handle_pagerequests_serial<IO>(
1843 0 : &mut self,
1844 0 : pgb_writer: &mut PostgresBackend<IO>,
1845 0 : mut pgb_reader: PostgresBackendReader<IO>,
1846 0 : tenant_id: TenantId,
1847 0 : timeline_id: TimelineId,
1848 0 : mut timeline_handles: TimelineHandles,
1849 0 : request_span: Span,
1850 0 : protocol_version: PagestreamProtocolVersion,
1851 0 : io_concurrency: IoConcurrency,
1852 0 : ctx: &RequestContext,
1853 0 : ) -> (
1854 0 : (PostgresBackendReader<IO>, TimelineHandles),
1855 0 : Result<(), QueryError>,
1856 0 : )
1857 0 : where
1858 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
1859 0 : {
1860 0 : let cancel = self.cancel.clone();
1861 :
1862 0 : let err = loop {
1863 0 : let msg = Self::pagestream_read_message(
1864 0 : &mut pgb_reader,
1865 0 : tenant_id,
1866 0 : timeline_id,
1867 0 : &mut timeline_handles,
1868 0 : &self.perf_span_fields,
1869 0 : &cancel,
1870 0 : ctx,
1871 0 : protocol_version,
1872 0 : request_span.clone(),
1873 0 : )
1874 0 : .await;
1875 0 : let msg = match msg {
1876 0 : Ok(msg) => msg,
1877 0 : Err(e) => break e,
1878 : };
1879 0 : let msg = match msg {
1880 0 : Some(msg) => msg,
1881 : None => {
1882 0 : debug!("pagestream subprotocol end observed");
1883 0 : return ((pgb_reader, timeline_handles), Ok(()));
1884 : }
1885 : };
1886 :
1887 0 : let result = self
1888 0 : .pagestream_handle_batched_message(
1889 0 : pgb_writer,
1890 0 : msg,
1891 0 : io_concurrency.clone(),
1892 0 : &cancel,
1893 0 : protocol_version,
1894 0 : ctx,
1895 0 : )
1896 0 : .await;
1897 0 : match result {
1898 0 : Ok(()) => {}
1899 0 : Err(e) => break e,
1900 : }
1901 : };
1902 0 : ((pgb_reader, timeline_handles), Err(err))
1903 0 : }
1904 :
1905 : /// # Cancel-Safety
1906 : ///
1907 : /// May leak tokio tasks if not polled to completion.
1908 : #[allow(clippy::too_many_arguments)]
1909 0 : async fn handle_pagerequests_pipelined<IO>(
1910 0 : &mut self,
1911 0 : pgb_writer: &mut PostgresBackend<IO>,
1912 0 : pgb_reader: PostgresBackendReader<IO>,
1913 0 : tenant_id: TenantId,
1914 0 : timeline_id: TimelineId,
1915 0 : mut timeline_handles: TimelineHandles,
1916 0 : request_span: Span,
1917 0 : pipelining_config: PageServicePipeliningConfigPipelined,
1918 0 : protocol_version: PagestreamProtocolVersion,
1919 0 : io_concurrency: IoConcurrency,
1920 0 : ctx: &RequestContext,
1921 0 : ) -> (
1922 0 : (PostgresBackendReader<IO>, TimelineHandles),
1923 0 : Result<(), QueryError>,
1924 0 : )
1925 0 : where
1926 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
1927 0 : {
1928 : //
1929 : // Pipelined pagestream handling consists of
1930 : // - a Batcher that reads requests off the wire and
1931 : // batches them if possible,
1932 : // - an Executor that processes the batched requests.
1933 : //
1934 : // The batch is built up inside an `spsc_fold` channel,
1935 : // shared betwen Batcher (Sender) and Executor (Receiver).
1936 : //
1937 : // The Batcher continously folds client requests into the batch,
1938 : // while the Executor can at any time take out what's in the batch
1939 : // in order to process it.
1940 : // This means the next batch builds up while the Executor
1941 : // executes the last batch.
1942 : //
1943 : // CANCELLATION
1944 : //
1945 : // We run both Batcher and Executor futures to completion before
1946 : // returning from this function.
1947 : //
1948 : // If Executor exits first, it signals cancellation to the Batcher
1949 : // via a CancellationToken that is child of `self.cancel`.
1950 : // If Batcher exits first, it signals cancellation to the Executor
1951 : // by dropping the spsc_fold channel Sender.
1952 : //
1953 : // CLEAN SHUTDOWN
1954 : //
1955 : // Clean shutdown means that the client ends the COPYBOTH session.
1956 : // In response to such a client message, the Batcher exits.
1957 : // The Executor continues to run, draining the spsc_fold channel.
1958 : // Once drained, the spsc_fold recv will fail with a distinct error
1959 : // indicating that the sender disconnected.
1960 : // The Executor exits with Ok(()) in response to that error.
1961 : //
1962 : // Server initiated shutdown is not clean shutdown, but instead
1963 : // is an error Err(QueryError::Shutdown) that is propagated through
1964 : // error propagation.
1965 : //
1966 : // ERROR PROPAGATION
1967 : //
1968 : // When the Batcher encounter an error, it sends it as a value
1969 : // through the spsc_fold channel and exits afterwards.
1970 : // When the Executor observes such an error in the channel,
1971 : // it exits returning that error value.
1972 : //
1973 : // This design ensures that the Executor stage will still process
1974 : // the batch that was in flight when the Batcher encountered an error,
1975 : // thereby beahving identical to a serial implementation.
1976 :
1977 : let PageServicePipeliningConfigPipelined {
1978 0 : max_batch_size,
1979 0 : execution,
1980 0 : batching: batching_strategy,
1981 0 : } = pipelining_config;
1982 :
1983 : // Macro to _define_ a pipeline stage.
1984 : macro_rules! pipeline_stage {
1985 : ($name:literal, $cancel:expr, $make_fut:expr) => {{
1986 : let cancel: CancellationToken = $cancel;
1987 : let stage_fut = $make_fut(cancel.clone());
1988 0 : async move {
1989 0 : scopeguard::defer! {
1990 : debug!("exiting");
1991 : }
1992 0 : timed_after_cancellation(stage_fut, $name, Duration::from_millis(100), &cancel)
1993 0 : .await
1994 0 : }
1995 : .instrument(tracing::info_span!($name))
1996 : }};
1997 : }
1998 :
1999 : //
2000 : // Batcher
2001 : //
2002 :
2003 0 : let perf_span_fields = self.perf_span_fields.clone();
2004 :
2005 0 : let cancel_batcher = self.cancel.child_token();
2006 0 : let (mut batch_tx, mut batch_rx) = spsc_fold::channel();
2007 0 : let batcher = pipeline_stage!("batcher", cancel_batcher.clone(), move |cancel_batcher| {
2008 0 : let ctx = ctx.attached_child();
2009 0 : async move {
2010 0 : let mut pgb_reader = pgb_reader;
2011 0 : let mut exit = false;
2012 0 : while !exit {
2013 0 : let read_res = Self::pagestream_read_message(
2014 0 : &mut pgb_reader,
2015 0 : tenant_id,
2016 0 : timeline_id,
2017 0 : &mut timeline_handles,
2018 0 : &perf_span_fields,
2019 0 : &cancel_batcher,
2020 0 : &ctx,
2021 0 : protocol_version,
2022 0 : request_span.clone(),
2023 0 : )
2024 0 : .await;
2025 0 : let Some(read_res) = read_res.transpose() else {
2026 0 : debug!("client-initiated shutdown");
2027 0 : break;
2028 : };
2029 0 : exit |= read_res.is_err();
2030 0 : let could_send = batch_tx
2031 0 : .send(read_res, |batch, res| {
2032 0 : Self::pagestream_do_batch(batching_strategy, max_batch_size, batch, res)
2033 0 : })
2034 0 : .await;
2035 0 : exit |= could_send.is_err();
2036 : }
2037 0 : (pgb_reader, timeline_handles)
2038 0 : }
2039 0 : });
2040 :
2041 : //
2042 : // Executor
2043 : //
2044 :
2045 0 : let executor = pipeline_stage!("executor", self.cancel.clone(), move |cancel| {
2046 0 : let ctx = ctx.attached_child();
2047 0 : async move {
2048 0 : let _cancel_batcher = cancel_batcher.drop_guard();
2049 : loop {
2050 0 : let maybe_batch = batch_rx.recv().await;
2051 0 : let batch = match maybe_batch {
2052 0 : Ok(batch) => batch,
2053 : Err(spsc_fold::RecvError::SenderGone) => {
2054 0 : debug!("upstream gone");
2055 0 : return Ok(());
2056 : }
2057 : };
2058 0 : let mut batch = match batch {
2059 0 : Ok(batch) => batch,
2060 0 : Err(e) => {
2061 0 : return Err(e);
2062 : }
2063 : };
2064 :
2065 : if let BatchedFeMessage::GetPage {
2066 0 : pages,
2067 : span: _,
2068 : shard: _,
2069 : batch_break_reason: _,
2070 0 : } = &mut batch
2071 : {
2072 0 : for req in pages {
2073 0 : req.batch_wait_ctx.take();
2074 0 : }
2075 0 : }
2076 :
2077 0 : self.pagestream_handle_batched_message(
2078 0 : pgb_writer,
2079 0 : batch,
2080 0 : io_concurrency.clone(),
2081 0 : &cancel,
2082 0 : protocol_version,
2083 0 : &ctx,
2084 0 : )
2085 0 : .await?;
2086 : }
2087 0 : }
2088 0 : });
2089 :
2090 : //
2091 : // Execute the stages.
2092 : //
2093 :
2094 0 : match execution {
2095 : PageServiceProtocolPipelinedExecutionStrategy::ConcurrentFutures => {
2096 0 : tokio::join!(batcher, executor)
2097 : }
2098 : PageServiceProtocolPipelinedExecutionStrategy::Tasks => {
2099 : // These tasks are not tracked anywhere.
2100 0 : let read_messages_task = tokio::spawn(batcher);
2101 0 : let (read_messages_task_res, executor_res_) =
2102 0 : tokio::join!(read_messages_task, executor,);
2103 0 : (
2104 0 : read_messages_task_res.expect("propagated panic from read_messages"),
2105 0 : executor_res_,
2106 0 : )
2107 : }
2108 : }
2109 0 : }
2110 :
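    /// A minimal, self-contained sketch (not used by the pageserver) of the
    /// batcher/executor pipeline described in `handle_pagerequests_pipelined`
    /// above. It substitutes a plain tokio mpsc channel for `spsc_fold`: the
    /// executor drains whatever accumulated since its last iteration and
    /// treats it as one batch. All names here are illustrative.
    #[allow(dead_code)]
    async fn pipeline_sketch() {
        use tokio::sync::mpsc;

        let (tx, mut rx) = mpsc::unbounded_channel::<u32>();

        // Batcher: reads "requests" and forwards them; signals clean shutdown
        // by dropping `tx`.
        let batcher = tokio::spawn(async move {
            for req in 0..10u32 {
                if tx.send(req).is_err() {
                    break; // executor gone
                }
            }
        });

        // Executor: waits for at least one request, then drains the backlog
        // into a batch, mimicking the fold-while-busy behavior of `spsc_fold`.
        let executor = tokio::spawn(async move {
            while let Some(first) = rx.recv().await {
                let mut batch = vec![first];
                while let Ok(next) = rx.try_recv() {
                    batch.push(next);
                }
                tracing::debug!("processing batch of {} requests", batch.len());
            }
            // `None` means the batcher dropped the sender: clean shutdown.
        });

        let _ = tokio::join!(batcher, executor);
    }
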
2111 : /// Helper function to handle the LSN from client request.
2112 : ///
2113 : /// Each GetPage (and Exists and Nblocks) request includes information about
2114 : /// which version of the page is being requested. The primary compute node
2115 : /// will always request the latest page version, by setting 'request_lsn' to
2116 : /// the last inserted or flushed WAL position, while a standby will request
2117 : /// a version at the LSN that it's currently caught up to.
2118 : ///
2119 : /// In either case, if the page server hasn't received the WAL up to the
2120 : /// requested LSN yet, we will wait for it to arrive. The return value is
2121 : /// the LSN that should be used to look up the page versions.
2122 : ///
2123 : /// In addition to the request LSN, each request carries another LSN,
2124 : /// 'not_modified_since', which is a hint to the pageserver that the client
2125 : /// knows that the page has not been modified between 'not_modified_since'
2126 : /// and the request LSN. This allows skipping the wait, as long as the WAL
2127 : /// up to 'not_modified_since' has arrived. If the client doesn't have any
2128 : /// information about when the page was modified, it will use
2129 : /// not_modified_since == request_lsn. If the client lies and sends too low a
2130 : /// not_modified_since such that there are in fact later page versions, the
2131 : /// behavior is undefined: the pageserver may return any of the page versions
2132 : /// or an error.
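    /// For illustration (arbitrary numbers): with last_record_lsn = 0/500, a
    /// request with request_lsn = 0/800 and not_modified_since = 0/300 is
    /// served immediately at min(0/500, 0/800) = 0/500, whereas
    /// not_modified_since = 0/700 first waits for WAL up to 0/700 and then
    /// reads at 0/700.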
2133 0 : async fn wait_or_get_last_lsn(
2134 0 : timeline: &Timeline,
2135 0 : request_lsn: Lsn,
2136 0 : not_modified_since: Lsn,
2137 0 : latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
2138 0 : ctx: &RequestContext,
2139 0 : ) -> Result<Lsn, PageStreamError> {
2140 0 : let last_record_lsn = timeline.get_last_record_lsn();
2141 0 : let effective_request_lsn = Self::effective_request_lsn(
2142 0 : timeline,
2143 0 : last_record_lsn,
2144 0 : request_lsn,
2145 0 : not_modified_since,
2146 0 : latest_gc_cutoff_lsn,
2147 0 : )?;
2148 :
2149 0 : if effective_request_lsn > last_record_lsn {
2150 0 : timeline
2151 0 : .wait_lsn(
2152 0 : not_modified_since,
2153 0 : crate::tenant::timeline::WaitLsnWaiter::PageService,
2154 0 : timeline::WaitLsnTimeout::Default,
2155 0 : ctx,
2156 0 : )
2157 0 : .await?;
2158 :
2159 : // Since we waited for 'effective_request_lsn' to arrive, that is now the last
2160 : // record LSN. (Or close enough for our purposes; the last-record LSN can
2161 : // advance immediately after we return anyway)
2162 0 : }
2163 :
2164 0 : Ok(effective_request_lsn)
2165 0 : }
2166 :
2167 0 : fn effective_request_lsn(
2168 0 : timeline: &Timeline,
2169 0 : last_record_lsn: Lsn,
2170 0 : request_lsn: Lsn,
2171 0 : not_modified_since: Lsn,
2172 0 : latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
2173 0 : ) -> Result<Lsn, PageStreamError> {
2174 : // Sanity check the request
2175 0 : if request_lsn < not_modified_since {
2176 0 : return Err(PageStreamError::BadRequest(
2177 0 : format!(
2178 0 : "invalid request with request LSN {request_lsn} and not_modified_since {not_modified_since}",
2179 0 : )
2180 0 : .into(),
2181 0 : ));
2182 0 : }
2183 :
2184 : // Check explicitly for INVALID just to get a less scary error message if the request is obviously bogus
2185 0 : if request_lsn == Lsn::INVALID {
2186 0 : return Err(PageStreamError::BadRequest(
2187 0 : "invalid LSN(0) in request".into(),
2188 0 : ));
2189 0 : }
2190 :
2191 : // Clients should only read from recent LSNs on their timeline, or from locations holding an LSN lease.
2192 : //
2193 : // We may have older data available, but we make a best effort to detect this case and return an error,
2194 : // to distinguish a misbehaving client (asking for old LSN) from a storage issue (data missing at a legitimate LSN).
2195 0 : if request_lsn < **latest_gc_cutoff_lsn && !timeline.is_gc_blocked_by_lsn_lease_deadline() {
2196 0 : let gc_info = &timeline.gc_info.read().unwrap();
2197 0 : if !gc_info.lsn_covered_by_lease(request_lsn) {
2198 0 : return Err(
2199 0 : PageStreamError::BadRequest(format!(
2200 0 : "tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
2201 0 : request_lsn, **latest_gc_cutoff_lsn
2202 0 : ).into())
2203 0 : );
2204 0 : }
2205 0 : }
2206 :
2207 0 : if not_modified_since > last_record_lsn {
2208 0 : Ok(not_modified_since)
2209 : } else {
2210 : // It might be better to use max(not_modified_since, latest_gc_cutoff_lsn)
2211 : // here instead. That would give the same result, since we know that there
2212 : // haven't been any modifications since 'not_modified_since'. Using an older
2213 : // LSN might be faster, because that could allow skipping recent layers when
2214 : // finding the page. However, we have historically used 'last_record_lsn', so
2215 : // stick to that for now.
2216 0 : Ok(std::cmp::min(last_record_lsn, request_lsn))
2217 : }
2218 0 : }
2219 :
2220 : /// Handles the lsn lease request.
2221 : /// If a lease cannot be obtained, the client will receive NULL.
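    /// On success, the response is a single row with a `valid_until` text
    /// column holding the lease expiry as milliseconds since the Unix epoch.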
2222 : #[instrument(skip_all, fields(shard_id, %lsn))]
2223 : async fn handle_make_lsn_lease<IO>(
2224 : &mut self,
2225 : pgb: &mut PostgresBackend<IO>,
2226 : tenant_shard_id: TenantShardId,
2227 : timeline_id: TimelineId,
2228 : lsn: Lsn,
2229 : ctx: &RequestContext,
2230 : ) -> Result<(), QueryError>
2231 : where
2232 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
2233 : {
2234 : let timeline = self
2235 : .timeline_handles
2236 : .as_mut()
2237 : .unwrap()
2238 : .get(
2239 : tenant_shard_id.tenant_id,
2240 : timeline_id,
2241 : ShardSelector::Known(tenant_shard_id.to_index()),
2242 : )
2243 : .await?;
2244 : set_tracing_field_shard_id(&timeline);
2245 :
2246 : let lease = timeline
2247 : .renew_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx)
2248 0 : .inspect_err(|e| {
2249 0 : warn!("{e}");
2250 0 : })
2251 : .ok();
2252 0 : let valid_until_str = lease.map(|l| {
2253 0 : l.valid_until
2254 0 : .duration_since(SystemTime::UNIX_EPOCH)
2255 0 : .expect("valid_until is earlier than UNIX_EPOCH")
2256 0 : .as_millis()
2257 0 : .to_string()
2258 0 : });
2259 :
2260 : info!(
2261 : "acquired lease for {} until {}",
2262 : lsn,
2263 : valid_until_str.as_deref().unwrap_or("<unknown>")
2264 : );
2265 :
2266 0 : let bytes = valid_until_str.as_ref().map(|x| x.as_bytes());
2267 :
2268 : pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col(
2269 : b"valid_until",
2270 : )]))?
2271 : .write_message_noflush(&BeMessage::DataRow(&[bytes]))?;
2272 :
2273 : Ok(())
2274 : }
2275 :
2276 : #[instrument(skip_all, fields(shard_id))]
2277 : async fn handle_get_rel_exists_request(
2278 : timeline: &Timeline,
2279 : req: &PagestreamExistsRequest,
2280 : ctx: &RequestContext,
2281 : ) -> Result<PagestreamExistsResponse, PageStreamError> {
2282 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
2283 : let lsn = Self::wait_or_get_last_lsn(
2284 : timeline,
2285 : req.hdr.request_lsn,
2286 : req.hdr.not_modified_since,
2287 : &latest_gc_cutoff_lsn,
2288 : ctx,
2289 : )
2290 : .await?;
2291 :
2292 : let exists = timeline
2293 : .get_rel_exists(
2294 : req.rel,
2295 : Version::LsnRange(LsnRange {
2296 : effective_lsn: lsn,
2297 : request_lsn: req.hdr.request_lsn,
2298 : }),
2299 : ctx,
2300 : )
2301 : .await?;
2302 :
2303 : Ok(PagestreamExistsResponse { req: *req, exists })
2304 : }
2305 :
2306 : #[instrument(skip_all, fields(shard_id))]
2307 : async fn handle_get_nblocks_request(
2308 : timeline: &Timeline,
2309 : req: &PagestreamNblocksRequest,
2310 : ctx: &RequestContext,
2311 : ) -> Result<PagestreamNblocksResponse, PageStreamError> {
2312 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
2313 : let lsn = Self::wait_or_get_last_lsn(
2314 : timeline,
2315 : req.hdr.request_lsn,
2316 : req.hdr.not_modified_since,
2317 : &latest_gc_cutoff_lsn,
2318 : ctx,
2319 : )
2320 : .await?;
2321 :
2322 : let n_blocks = timeline
2323 : .get_rel_size(
2324 : req.rel,
2325 : Version::LsnRange(LsnRange {
2326 : effective_lsn: lsn,
2327 : request_lsn: req.hdr.request_lsn,
2328 : }),
2329 : ctx,
2330 : )
2331 : .await?;
2332 :
2333 : Ok(PagestreamNblocksResponse {
2334 : req: *req,
2335 : n_blocks,
2336 : })
2337 : }
2338 :
2339 : #[instrument(skip_all, fields(shard_id))]
2340 : async fn handle_db_size_request(
2341 : timeline: &Timeline,
2342 : req: &PagestreamDbSizeRequest,
2343 : ctx: &RequestContext,
2344 : ) -> Result<PagestreamDbSizeResponse, PageStreamError> {
2345 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
2346 : let lsn = Self::wait_or_get_last_lsn(
2347 : timeline,
2348 : req.hdr.request_lsn,
2349 : req.hdr.not_modified_since,
2350 : &latest_gc_cutoff_lsn,
2351 : ctx,
2352 : )
2353 : .await?;
2354 :
2355 : let total_blocks = timeline
2356 : .get_db_size(
2357 : DEFAULTTABLESPACE_OID,
2358 : req.dbnode,
2359 : Version::LsnRange(LsnRange {
2360 : effective_lsn: lsn,
2361 : request_lsn: req.hdr.request_lsn,
2362 : }),
2363 : ctx,
2364 : )
2365 : .await?;
2366 : let db_size = total_blocks as i64 * BLCKSZ as i64;
2367 :
2368 : Ok(PagestreamDbSizeResponse { req: *req, db_size })
2369 : }
2370 :
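    /// Handles a batch of GetPage requests. All requests in the batch share a
    /// single `wait_lsn` call at the maximum effective LSN, after which the
    /// pages are fetched with one vectored `get_rel_page_at_lsn_batched` read.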
2371 : #[instrument(skip_all)]
2372 : async fn handle_get_page_at_lsn_request_batched(
2373 : timeline: &Timeline,
2374 : requests: SmallVec<[BatchedGetPageRequest; 1]>,
2375 : io_concurrency: IoConcurrency,
2376 : batch_break_reason: GetPageBatchBreakReason,
2377 : ctx: &RequestContext,
2378 : ) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer, RequestContext), BatchedPageStreamError>>
2379 : {
2380 : debug_assert_current_span_has_tenant_and_timeline_id();
2381 :
2382 : timeline
2383 : .query_metrics
2384 : .observe_getpage_batch_start(requests.len(), batch_break_reason);
2385 :
2386 : // If a page trace is running, submit an event for this request.
2387 : if let Some(page_trace) = timeline.page_trace.load().as_ref() {
2388 : let time = SystemTime::now();
2389 : for batch in &requests {
2390 : let key = rel_block_to_key(batch.req.rel, batch.req.blkno).to_compact();
2391 : // Ignore error (trace buffer may be full or tracer may have disconnected).
2392 : _ = page_trace.try_send(PageTraceEvent {
2393 : key,
2394 : effective_lsn: batch.lsn_range.effective_lsn,
2395 : time,
2396 : });
2397 : }
2398 : }
2399 :
2400 : // If any request in the batch needs to wait for LSN, then do so now.
2401 : let mut perf_instrument = false;
2402 : let max_effective_lsn = requests
2403 : .iter()
2404 0 : .map(|req| {
2405 0 : if req.ctx.has_perf_span() {
2406 0 : perf_instrument = true;
2407 0 : }
2408 :
2409 0 : req.lsn_range.effective_lsn
2410 0 : })
2411 : .max()
2412 : .expect("batch is never empty");
2413 :
2414 : let ctx = match perf_instrument {
2415 : true => RequestContextBuilder::from(ctx)
2416 0 : .root_perf_span(|| {
2417 0 : info_span!(
2418 : target: PERF_TRACE_TARGET,
2419 : "GET_VECTORED",
2420 : tenant_id = %timeline.tenant_shard_id.tenant_id,
2421 : timeline_id = %timeline.timeline_id,
2422 0 : shard = %timeline.tenant_shard_id.shard_slug(),
2423 : %max_effective_lsn
2424 : )
2425 0 : })
2426 : .attached_child(),
2427 : false => ctx.attached_child(),
2428 : };
2429 :
2430 : let last_record_lsn = timeline.get_last_record_lsn();
2431 : if max_effective_lsn > last_record_lsn {
2432 : if let Err(e) = timeline
2433 : .wait_lsn(
2434 : max_effective_lsn,
2435 : crate::tenant::timeline::WaitLsnWaiter::PageService,
2436 : timeline::WaitLsnTimeout::Default,
2437 : &ctx,
2438 : )
2439 0 : .maybe_perf_instrument(&ctx, |current_perf_span| {
2440 0 : info_span!(
2441 : target: PERF_TRACE_TARGET,
2442 0 : parent: current_perf_span,
2443 : "WAIT_LSN",
2444 : )
2445 0 : })
2446 : .await
2447 : {
2448 0 : return Vec::from_iter(requests.into_iter().map(|req| {
2449 0 : Err(BatchedPageStreamError {
2450 0 : err: PageStreamError::from(e.clone()),
2451 0 : req: req.req.hdr,
2452 0 : })
2453 0 : }));
2454 : }
2455 : }
2456 :
2457 : let results = timeline
2458 : .get_rel_page_at_lsn_batched(
2459 0 : requests.iter().map(|p| {
2460 0 : (
2461 0 : &p.req.rel,
2462 0 : &p.req.blkno,
2463 0 : p.lsn_range,
2464 0 : p.ctx.attached_child(),
2465 0 : )
2466 0 : }),
2467 : io_concurrency,
2468 : &ctx,
2469 : )
2470 : .await;
2471 : assert_eq!(results.len(), requests.len());
2472 :
2473 : // TODO: avoid creating the new Vec here
2474 : Vec::from_iter(
2475 : requests
2476 : .into_iter()
2477 : .zip(results.into_iter())
2478 0 : .map(|(req, res)| {
2479 0 : res.map(|page| {
2480 0 : (
2481 0 : PagestreamBeMessage::GetPage(
2482 0 : pagestream_api::PagestreamGetPageResponse { req: req.req, page },
2483 0 : ),
2484 0 : req.timer,
2485 0 : req.ctx,
2486 0 : )
2487 0 : })
2488 0 : .map_err(|e| BatchedPageStreamError {
2489 0 : err: PageStreamError::from(e),
2490 0 : req: req.req.hdr,
2491 0 : })
2492 0 : }),
2493 : )
2494 : }
2495 :
2496 : #[instrument(skip_all, fields(shard_id))]
2497 : async fn handle_get_slru_segment_request(
2498 : timeline: &Timeline,
2499 : req: &PagestreamGetSlruSegmentRequest,
2500 : ctx: &RequestContext,
2501 : ) -> Result<PagestreamGetSlruSegmentResponse, PageStreamError> {
2502 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
2503 : let lsn = Self::wait_or_get_last_lsn(
2504 : timeline,
2505 : req.hdr.request_lsn,
2506 : req.hdr.not_modified_since,
2507 : &latest_gc_cutoff_lsn,
2508 : ctx,
2509 : )
2510 : .await?;
2511 :
2512 : let kind = SlruKind::from_repr(req.kind)
2513 : .ok_or(PageStreamError::BadRequest("invalid SLRU kind".into()))?;
2514 : let segment = timeline.get_slru_segment(kind, req.segno, lsn, ctx).await?;
2515 :
2516 : Ok(PagestreamGetSlruSegmentResponse { req: *req, segment })
2517 : }
2518 :
2519 : // NB: this impl mimics what we do for batched getpage requests.
2520 : #[cfg(feature = "testing")]
2521 : #[instrument(skip_all, fields(shard_id))]
2522 : async fn handle_test_request_batch(
2523 : timeline: &Timeline,
2524 : requests: Vec<BatchedTestRequest>,
2525 : _ctx: &RequestContext,
2526 : ) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer, RequestContext), BatchedPageStreamError>>
2527 : {
2528 : // real requests would do something with the timeline
2529 : let mut results = Vec::with_capacity(requests.len());
2530 : for _req in requests.iter() {
2531 : tokio::task::yield_now().await;
2532 :
2533 : results.push({
2534 : if timeline.cancel.is_cancelled() {
2535 : Err(PageReconstructError::Cancelled)
2536 : } else {
2537 : Ok(())
2538 : }
2539 : });
2540 : }
2541 :
2542 : // TODO: avoid creating the new Vec here
2543 : Vec::from_iter(
2544 : requests
2545 : .into_iter()
2546 : .zip(results.into_iter())
2547 0 : .map(|(req, res)| {
2548 0 : res.map(|()| {
2549 0 : (
2550 0 : PagestreamBeMessage::Test(pagestream_api::PagestreamTestResponse {
2551 0 : req: req.req.clone(),
2552 0 : }),
2553 0 : req.timer,
2554 0 : RequestContext::new(
2555 0 : TaskKind::PageRequestHandler,
2556 0 : DownloadBehavior::Warn,
2557 0 : ),
2558 0 : )
2559 0 : })
2560 0 : .map_err(|e| BatchedPageStreamError {
2561 0 : err: PageStreamError::from(e),
2562 0 : req: req.req.hdr,
2563 0 : })
2564 0 : }),
2565 : )
2566 : }
2567 :
2568 : /// Note on "fullbackup":
2569 : /// Full basebackups should only be used for debugging purposes.
2570 : /// It was originally introduced to enable breaking storage format changes,
2571 : /// but that is no longer applicable.
2572 : ///
2573 : /// # Coding Discipline
2574 : ///
2575 : /// Coding discipline within this function: all interaction with the `pgb` connection
2576 : /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
2577 : /// This is so that we can shut down page_service quickly.
2578 : ///
2579 : /// TODO: wrap the pgb that we pass to the basebackup handler so that it's sensitive
2580 : /// to connection cancellation.
2581 : #[allow(clippy::too_many_arguments)]
2582 : #[instrument(skip_all, fields(shard_id, ?lsn, ?prev_lsn, %full_backup))]
2583 : async fn handle_basebackup_request<IO>(
2584 : &mut self,
2585 : pgb: &mut PostgresBackend<IO>,
2586 : tenant_id: TenantId,
2587 : timeline_id: TimelineId,
2588 : lsn: Option<Lsn>,
2589 : prev_lsn: Option<Lsn>,
2590 : full_backup: bool,
2591 : gzip: bool,
2592 : replica: bool,
2593 : ctx: &RequestContext,
2594 : ) -> Result<(), QueryError>
2595 : where
2596 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
2597 : {
2598 : let started = std::time::Instant::now();
2599 :
2600 : let timeline = self
2601 : .timeline_handles
2602 : .as_mut()
2603 : .unwrap()
2604 : .get(tenant_id, timeline_id, ShardSelector::Zero)
2605 : .await?;
2606 : set_tracing_field_shard_id(&timeline);
2607 : let ctx = ctx.with_scope_timeline(&timeline);
2608 :
2609 : if timeline.is_archived() == Some(true) {
2610 : tracing::info!(
2611 : "timeline {tenant_id}/{timeline_id} is archived, but got basebackup request for it."
2612 : );
2613 : return Err(QueryError::NotFound("timeline is archived".into()));
2614 : }
2615 :
2616 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
2617 : if let Some(lsn) = lsn {
2618 : // Backup was requested at a particular LSN. Wait for it to arrive.
2619 : info!("waiting for {}", lsn);
2620 : timeline
2621 : .wait_lsn(
2622 : lsn,
2623 : crate::tenant::timeline::WaitLsnWaiter::PageService,
2624 : crate::tenant::timeline::WaitLsnTimeout::Default,
2625 : &ctx,
2626 : )
2627 : .await?;
2628 : timeline
2629 : .check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
2630 : .context("invalid basebackup lsn")?;
2631 : }
2632 :
2633 : let lsn_awaited_after = started.elapsed();
2634 :
2635 : // switch client to COPYOUT
2636 : pgb.write_message_noflush(&BeMessage::CopyOutResponse)
2637 : .map_err(QueryError::Disconnected)?;
2638 : self.flush_cancellable(pgb, &self.cancel).await?;
2639 :
2640 : let mut from_cache = false;
2641 :
2642 : // Send a basebackup tarball at the requested LSN. Compress unless this is a
2643 : // fullbackup. TODO: compress in that case too (tests need to be updated)
2644 : if full_backup {
2645 : let mut writer = pgb.copyout_writer();
2646 : basebackup::send_basebackup_tarball(
2647 : &mut writer,
2648 : &timeline,
2649 : lsn,
2650 : prev_lsn,
2651 : full_backup,
2652 : replica,
2653 : None,
2654 : &ctx,
2655 : )
2656 : .await?;
2657 : } else {
2658 : let mut writer = BufWriter::new(pgb.copyout_writer());
2659 :
2660 : let cached = timeline
2661 : .get_cached_basebackup_if_enabled(lsn, prev_lsn, full_backup, replica, gzip)
2662 : .await;
2663 :
2664 : if let Some(mut cached) = cached {
2665 : from_cache = true;
2666 : tokio::io::copy(&mut cached, &mut writer)
2667 : .await
2668 0 : .map_err(|err| {
2669 0 : BasebackupError::Client(err, "handle_basebackup_request,cached,copy")
2670 0 : })?;
2671 : } else {
2672 : basebackup::send_basebackup_tarball(
2673 : &mut writer,
2674 : &timeline,
2675 : lsn,
2676 : prev_lsn,
2677 : full_backup,
2678 : replica,
2679 : // NB: using fast compression because it's on the critical path for compute
2680 : // startup. For an empty database, we get <100KB with this method. The
2681 : // Level::Best compression method gives us <20KB, but maybe we should add
2682 : // basebackup caching on compute shutdown first.
2683 : gzip.then_some(async_compression::Level::Fastest),
2684 : &ctx,
2685 : )
2686 : .await?;
2687 : }
2688 : writer
2689 : .flush()
2690 : .await
2691 0 : .map_err(|err| BasebackupError::Client(err, "handle_basebackup_request,flush"))?;
2692 : }
2693 :
2694 : pgb.write_message_noflush(&BeMessage::CopyDone)
2695 : .map_err(QueryError::Disconnected)?;
2696 : self.flush_cancellable(pgb, &timeline.cancel).await?;
2697 :
2698 : let basebackup_after = started
2699 : .elapsed()
2700 : .checked_sub(lsn_awaited_after)
2701 : .unwrap_or(Duration::ZERO);
2702 :
2703 : info!(
2704 : lsn_await_millis = lsn_awaited_after.as_millis(),
2705 : basebackup_millis = basebackup_after.as_millis(),
2706 : %from_cache,
2707 : "basebackup complete"
2708 : );
2709 :
2710 : Ok(())
2711 : }
2712 :
2713 : // When accessing the management API, supply None as an argument.
2714 : // When authorizing a tenant, pass the corresponding tenant id.
2715 0 : fn check_permission(&self, tenant_id: Option<TenantId>) -> Result<(), QueryError> {
2716 0 : if self.auth.is_none() {
2717 : // auth is set to Trust, nothing to check so just return ok
2718 0 : return Ok(());
2719 0 : }
2720 : // auth is some, just checked above, when auth is some
2721 : // then claims are always present because of checks during connection init
2722 : // so this expect won't trigger
2723 0 : let claims = self
2724 0 : .claims
2725 0 : .as_ref()
2726 0 : .expect("claims presence already checked");
2727 0 : check_permission(claims, tenant_id).map_err(|e| QueryError::Unauthorized(e.0))
2728 0 : }
2729 : }
2730 :
2731 : /// `basebackup tenant timeline [lsn] [--gzip] [--replica]`
2732 : #[derive(Debug, Clone, Eq, PartialEq)]
2733 : struct BaseBackupCmd {
2734 : tenant_id: TenantId,
2735 : timeline_id: TimelineId,
2736 : lsn: Option<Lsn>,
2737 : gzip: bool,
2738 : replica: bool,
2739 : }
2740 :
2741 : /// `fullbackup tenant timeline [lsn] [prev_lsn]`
2742 : #[derive(Debug, Clone, Eq, PartialEq)]
2743 : struct FullBackupCmd {
2744 : tenant_id: TenantId,
2745 : timeline_id: TimelineId,
2746 : lsn: Option<Lsn>,
2747 : prev_lsn: Option<Lsn>,
2748 : }
2749 :
2750 : /// `pagestream_v2 tenant timeline`
2751 : #[derive(Debug, Clone, Eq, PartialEq)]
2752 : struct PageStreamCmd {
2753 : tenant_id: TenantId,
2754 : timeline_id: TimelineId,
2755 : protocol_version: PagestreamProtocolVersion,
2756 : }
2757 :
2758 : /// `lease lsn tenant timeline lsn`
2759 : #[derive(Debug, Clone, Eq, PartialEq)]
2760 : struct LeaseLsnCmd {
2761 : tenant_shard_id: TenantShardId,
2762 : timeline_id: TimelineId,
2763 : lsn: Lsn,
2764 : }
2765 :
2766 : #[derive(Debug, Clone, Eq, PartialEq)]
2767 : enum PageServiceCmd {
2768 : Set,
2769 : PageStream(PageStreamCmd),
2770 : BaseBackup(BaseBackupCmd),
2771 : FullBackup(FullBackupCmd),
2772 : LeaseLsn(LeaseLsnCmd),
2773 : }
2774 :
2775 : impl PageStreamCmd {
2776 3 : fn parse(query: &str, protocol_version: PagestreamProtocolVersion) -> anyhow::Result<Self> {
2777 3 : let parameters = query.split_whitespace().collect_vec();
2778 3 : if parameters.len() != 2 {
2779 1 : bail!(
2780 1 : "invalid number of parameters for pagestream command: {}",
2781 : query
2782 : );
2783 2 : }
2784 2 : let tenant_id = TenantId::from_str(parameters[0])
2785 2 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2786 1 : let timeline_id = TimelineId::from_str(parameters[1])
2787 1 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2788 1 : Ok(Self {
2789 1 : tenant_id,
2790 1 : timeline_id,
2791 1 : protocol_version,
2792 1 : })
2793 3 : }
2794 : }
2795 :
2796 : impl FullBackupCmd {
2797 2 : fn parse(query: &str) -> anyhow::Result<Self> {
2798 2 : let parameters = query.split_whitespace().collect_vec();
2799 2 : if parameters.len() < 2 || parameters.len() > 4 {
2800 0 : bail!(
2801 0 : "invalid number of parameters for basebackup command: {}",
2802 : query
2803 : );
2804 2 : }
2805 2 : let tenant_id = TenantId::from_str(parameters[0])
2806 2 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2807 2 : let timeline_id = TimelineId::from_str(parameters[1])
2808 2 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2809 : // The caller is responsible for providing correct lsn and prev_lsn.
2810 2 : let lsn = if let Some(lsn_str) = parameters.get(2) {
2811 : Some(
2812 1 : Lsn::from_str(lsn_str)
2813 1 : .with_context(|| format!("Failed to parse Lsn from {lsn_str}"))?,
2814 : )
2815 : } else {
2816 1 : None
2817 : };
2818 2 : let prev_lsn = if let Some(prev_lsn_str) = parameters.get(3) {
2819 : Some(
2820 1 : Lsn::from_str(prev_lsn_str)
2821 1 : .with_context(|| format!("Failed to parse Lsn from {prev_lsn_str}"))?,
2822 : )
2823 : } else {
2824 1 : None
2825 : };
2826 2 : Ok(Self {
2827 2 : tenant_id,
2828 2 : timeline_id,
2829 2 : lsn,
2830 2 : prev_lsn,
2831 2 : })
2832 2 : }
2833 : }
2834 :
2835 : impl BaseBackupCmd {
2836 9 : fn parse(query: &str) -> anyhow::Result<Self> {
2837 9 : let parameters = query.split_whitespace().collect_vec();
2838 9 : if parameters.len() < 2 {
2839 0 : bail!(
2840 0 : "invalid number of parameters for basebackup command: {}",
2841 : query
2842 : );
2843 9 : }
2844 9 : let tenant_id = TenantId::from_str(parameters[0])
2845 9 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2846 9 : let timeline_id = TimelineId::from_str(parameters[1])
2847 9 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2848 : let lsn;
2849 : let flags_parse_from;
2850 9 : if let Some(maybe_lsn) = parameters.get(2) {
2851 8 : if *maybe_lsn == "latest" {
2852 1 : lsn = None;
2853 1 : flags_parse_from = 3;
2854 7 : } else if maybe_lsn.starts_with("--") {
2855 5 : lsn = None;
2856 5 : flags_parse_from = 2;
2857 5 : } else {
2858 : lsn = Some(
2859 2 : Lsn::from_str(maybe_lsn)
2860 2 : .with_context(|| format!("Failed to parse lsn from {maybe_lsn}"))?,
2861 : );
2862 2 : flags_parse_from = 3;
2863 : }
2864 1 : } else {
2865 1 : lsn = None;
2866 1 : flags_parse_from = 2;
2867 1 : }
2868 :
2869 9 : let mut gzip = false;
2870 9 : let mut replica = false;
2871 :
2872 11 : for &param in &parameters[flags_parse_from..] {
2873 11 : match param {
2874 11 : "--gzip" => {
2875 7 : if gzip {
2876 1 : bail!("duplicate parameter for basebackup command: {param}")
2877 6 : }
2878 6 : gzip = true
2879 : }
2880 4 : "--replica" => {
2881 2 : if replica {
2882 0 : bail!("duplicate parameter for basebackup command: {param}")
2883 2 : }
2884 2 : replica = true
2885 : }
2886 2 : _ => bail!("invalid parameter for basebackup command: {param}"),
2887 : }
2888 : }
2889 6 : Ok(Self {
2890 6 : tenant_id,
2891 6 : timeline_id,
2892 6 : lsn,
2893 6 : gzip,
2894 6 : replica,
2895 6 : })
2896 9 : }
2897 : }
2898 :
2899 : impl LeaseLsnCmd {
2900 2 : fn parse(query: &str) -> anyhow::Result<Self> {
2901 2 : let parameters = query.split_whitespace().collect_vec();
2902 2 : if parameters.len() != 3 {
2903 0 : bail!(
2904 0 : "invalid number of parameters for lease lsn command: {}",
2905 : query
2906 : );
2907 2 : }
2908 2 : let tenant_shard_id = TenantShardId::from_str(parameters[0])
2909 2 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2910 2 : let timeline_id = TimelineId::from_str(parameters[1])
2911 2 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2912 2 : let lsn = Lsn::from_str(parameters[2])
2913 2 : .with_context(|| format!("Failed to parse lsn from {}", parameters[2]))?;
2914 2 : Ok(Self {
2915 2 : tenant_shard_id,
2916 2 : timeline_id,
2917 2 : lsn,
2918 2 : })
2919 2 : }
2920 : }
2921 :
2922 : impl PageServiceCmd {
2923 21 : fn parse(query: &str) -> anyhow::Result<Self> {
2924 21 : let query = query.trim();
2925 21 : let Some((cmd, other)) = query.split_once(' ') else {
2926 2 : bail!("cannot parse query: {query}")
2927 : };
2928 19 : match cmd.to_ascii_lowercase().as_str() {
2929 19 : "pagestream_v2" => Ok(Self::PageStream(PageStreamCmd::parse(
2930 3 : other,
2931 3 : PagestreamProtocolVersion::V2,
2932 2 : )?)),
2933 16 : "pagestream_v3" => Ok(Self::PageStream(PageStreamCmd::parse(
2934 0 : other,
2935 0 : PagestreamProtocolVersion::V3,
2936 0 : )?)),
2937 16 : "basebackup" => Ok(Self::BaseBackup(BaseBackupCmd::parse(other)?)),
2938 7 : "fullbackup" => Ok(Self::FullBackup(FullBackupCmd::parse(other)?)),
2939 5 : "lease" => {
2940 3 : let Some((cmd2, other)) = other.split_once(' ') else {
2941 0 : bail!("invalid lease command: {cmd}");
2942 : };
2943 3 : let cmd2 = cmd2.to_ascii_lowercase();
2944 3 : if cmd2 == "lsn" {
2945 2 : Ok(Self::LeaseLsn(LeaseLsnCmd::parse(other)?))
2946 : } else {
2947 1 : bail!("invalid lease command: {cmd}");
2948 : }
2949 : }
2950 2 : "set" => Ok(Self::Set),
2951 0 : _ => Err(anyhow::anyhow!("unsupported command {cmd} in {query}")),
2952 : }
2953 21 : }
2954 : }
2955 :
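#[cfg(test)]
mod page_service_cmd_example {
    //! Illustrative example only: shows how a full query string maps onto
    //! `PageServiceCmd`. The IDs below are arbitrary 32-character hex strings.

    use super::*;

    #[test]
    fn parses_basebackup_with_flags() {
        let cmd = PageServiceCmd::parse(
            "basebackup 11111111111111111111111111111111 22222222222222222222222222222222 --gzip --replica",
        )
        .unwrap();
        match cmd {
            PageServiceCmd::BaseBackup(BaseBackupCmd {
                lsn,
                gzip,
                replica,
                ..
            }) => {
                assert_eq!(lsn, None);
                assert!(gzip);
                assert!(replica);
            }
            other => panic!("unexpected command: {other:?}"),
        }
    }
}
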
2956 : /// Parse the startup options from the postgres wire protocol startup packet.
2957 : ///
2958 : /// It takes a sequence of `-c option=X` or `-coption=X` arguments. It parses the options
2959 : /// string on a best-effort basis and returns all the parsed options (key-value pairs) plus a
2960 : /// bool indicating whether all options were parsed successfully. The result may contain
2961 : /// duplicate keys if the caller passed duplicate parameters.
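/// For example (illustrative): parsing
/// `-c neon.compute_mode=primary -capplication_name=psql`
/// yields the pairs `("neon.compute_mode", "primary")` and
/// `("application_name", "psql")` with the error flag set to `false`, while a
/// trailing bare `-c` sets the error flag to `true`.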
2962 7 : fn parse_options(options: &str) -> (Vec<(String, String)>, bool) {
2963 7 : let mut parsing_config = false;
2964 7 : let mut has_error = false;
2965 7 : let mut config = Vec::new();
2966 16 : for item in options.split_whitespace() {
2967 16 : if item == "-c" {
2968 9 : if !parsing_config {
2969 8 : parsing_config = true;
2970 8 : } else {
2971 : // "-c" followed with another "-c"
2972 1 : tracing::warn!("failed to parse the startup options: {options}");
2973 1 : has_error = true;
2974 1 : break;
2975 : }
2976 7 : } else if item.starts_with("-c") || parsing_config {
2977 7 : let Some((mut key, value)) = item.split_once('=') else {
2978 : // "-c" followed with an invalid option
2979 1 : tracing::warn!("failed to parse the startup options: {options}");
2980 1 : has_error = true;
2981 1 : break;
2982 : };
2983 6 : if !parsing_config {
2984 : // Parse "-coptions=X"
2985 1 : let Some(stripped_key) = key.strip_prefix("-c") else {
2986 0 : tracing::warn!("failed to parse the startup options: {options}");
2987 0 : has_error = true;
2988 0 : break;
2989 : };
2990 1 : key = stripped_key;
2991 5 : }
2992 6 : config.push((key.to_string(), value.to_string()));
2993 6 : parsing_config = false;
2994 : } else {
2995 0 : tracing::warn!("failed to parse the startup options: {options}");
2996 0 : has_error = true;
2997 0 : break;
2998 : }
2999 : }
3000 7 : if parsing_config {
3001 : // "-c" without the option
3002 3 : tracing::warn!("failed to parse the startup options: {options}");
3003 3 : has_error = true;
3004 4 : }
3005 7 : (config, has_error)
3006 7 : }
3007 :
3008 : impl<IO> postgres_backend::Handler<IO> for PageServerHandler
3009 : where
3010 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
3011 : {
3012 0 : fn check_auth_jwt(
3013 0 : &mut self,
3014 0 : _pgb: &mut PostgresBackend<IO>,
3015 0 : jwt_response: &[u8],
3016 0 : ) -> Result<(), QueryError> {
3017 : // this unwrap is never triggered, because check_auth_jwt is only called when auth_type is NeonJWT,
3018 : // which requires auth to be present
3019 0 : let data: TokenData<Claims> = self
3020 0 : .auth
3021 0 : .as_ref()
3022 0 : .unwrap()
3023 0 : .decode(str::from_utf8(jwt_response).context("jwt response is not UTF-8")?)
3024 0 : .map_err(|e| QueryError::Unauthorized(e.0))?;
3025 :
3026 0 : if matches!(data.claims.scope, Scope::Tenant) && data.claims.tenant_id.is_none() {
3027 0 : return Err(QueryError::Unauthorized(
3028 0 : "jwt token scope is Tenant, but tenant id is missing".into(),
3029 0 : ));
3030 0 : }
3031 :
3032 0 : debug!(
3033 0 : "jwt scope check succeeded for scope: {:#?} by tenant id: {:?}",
3034 : data.claims.scope, data.claims.tenant_id,
3035 : );
3036 :
3037 0 : self.claims = Some(data.claims);
3038 0 : Ok(())
3039 0 : }
3040 :
3041 0 : fn startup(
3042 0 : &mut self,
3043 0 : _pgb: &mut PostgresBackend<IO>,
3044 0 : sm: &FeStartupPacket,
3045 0 : ) -> Result<(), QueryError> {
3046 0 : fail::fail_point!("ps::connection-start::startup-packet");
3047 :
3048 0 : if let FeStartupPacket::StartupMessage { params, .. } = sm {
3049 0 : if let Some(app_name) = params.get("application_name") {
3050 0 : self.perf_span_fields.application_name = Some(app_name.to_string());
3051 0 : Span::current().record("application_name", field::display(app_name));
3052 0 : }
3053 0 : if let Some(options) = params.get("options") {
3054 0 : let (config, _) = parse_options(options);
3055 0 : for (key, value) in config {
3056 0 : if key == "neon.compute_mode" {
3057 0 : self.perf_span_fields.compute_mode = Some(value.clone());
3058 0 : Span::current().record("compute_mode", field::display(value));
3059 0 : }
3060 : }
3061 0 : }
3062 0 : };
3063 :
3064 0 : Ok(())
3065 0 : }
3066 :
3067 : #[instrument(skip_all, fields(tenant_id, timeline_id))]
3068 : async fn process_query(
3069 : &mut self,
3070 : pgb: &mut PostgresBackend<IO>,
3071 : query_string: &str,
3072 : ) -> Result<(), QueryError> {
3073 0 : fail::fail_point!("simulated-bad-compute-connection", |_| {
3074 0 : info!("Hit failpoint for bad connection");
3075 0 : Err(QueryError::SimulatedConnectionError)
3076 0 : });
3077 :
3078 : fail::fail_point!("ps::connection-start::process-query");
3079 :
3080 : let ctx = self.connection_ctx.attached_child();
3081 : debug!("process query {query_string}");
3082 : let query = PageServiceCmd::parse(query_string)?;
3083 : match query {
3084 : PageServiceCmd::PageStream(PageStreamCmd {
3085 : tenant_id,
3086 : timeline_id,
3087 : protocol_version,
3088 : }) => {
3089 : tracing::Span::current()
3090 : .record("tenant_id", field::display(tenant_id))
3091 : .record("timeline_id", field::display(timeline_id));
3092 :
3093 : self.check_permission(Some(tenant_id))?;
3094 : let command_kind = match protocol_version {
3095 : PagestreamProtocolVersion::V2 => ComputeCommandKind::PageStreamV2,
3096 : PagestreamProtocolVersion::V3 => ComputeCommandKind::PageStreamV3,
3097 : };
3098 : COMPUTE_COMMANDS_COUNTERS.for_command(command_kind).inc();
3099 :
3100 : self.handle_pagerequests(pgb, tenant_id, timeline_id, protocol_version, ctx)
3101 : .await?;
3102 : }
3103 : PageServiceCmd::BaseBackup(BaseBackupCmd {
3104 : tenant_id,
3105 : timeline_id,
3106 : lsn,
3107 : gzip,
3108 : replica,
3109 : }) => {
3110 : tracing::Span::current()
3111 : .record("tenant_id", field::display(tenant_id))
3112 : .record("timeline_id", field::display(timeline_id));
3113 :
3114 : self.check_permission(Some(tenant_id))?;
3115 :
3116 : COMPUTE_COMMANDS_COUNTERS
3117 : .for_command(ComputeCommandKind::Basebackup)
3118 : .inc();
3119 : let metric_recording = metrics::BASEBACKUP_QUERY_TIME.start_recording();
3120 0 : let res = async {
3121 0 : self.handle_basebackup_request(
3122 0 : pgb,
3123 0 : tenant_id,
3124 0 : timeline_id,
3125 0 : lsn,
3126 0 : None,
3127 0 : false,
3128 0 : gzip,
3129 0 : replica,
3130 0 : &ctx,
3131 0 : )
3132 0 : .await?;
3133 0 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
3134 0 : Result::<(), QueryError>::Ok(())
3135 0 : }
3136 : .await;
3137 : metric_recording.observe(&res);
3138 : res?;
3139 : }
3140 : // same as basebackup, but result includes relational data as well
3141 : PageServiceCmd::FullBackup(FullBackupCmd {
3142 : tenant_id,
3143 : timeline_id,
3144 : lsn,
3145 : prev_lsn,
3146 : }) => {
3147 : tracing::Span::current()
3148 : .record("tenant_id", field::display(tenant_id))
3149 : .record("timeline_id", field::display(timeline_id));
3150 :
3151 : self.check_permission(Some(tenant_id))?;
3152 :
3153 : COMPUTE_COMMANDS_COUNTERS
3154 : .for_command(ComputeCommandKind::Fullbackup)
3155 : .inc();
3156 :
3157 : // Check that the timeline exists
3158 : self.handle_basebackup_request(
3159 : pgb,
3160 : tenant_id,
3161 : timeline_id,
3162 : lsn,
3163 : prev_lsn,
3164 : true,
3165 : false,
3166 : false,
3167 : &ctx,
3168 : )
3169 : .await?;
3170 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
3171 : }
3172 : PageServiceCmd::Set => {
3173 : // important because psycopg2 executes "SET datestyle TO 'ISO'"
3174 : // on connect
3175 : // TODO: allow setting options, i.e., application_name/compute_mode via SET commands
3176 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
3177 : }
3178 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
3179 : tenant_shard_id,
3180 : timeline_id,
3181 : lsn,
3182 : }) => {
3183 : tracing::Span::current()
3184 : .record("tenant_id", field::display(tenant_shard_id))
3185 : .record("timeline_id", field::display(timeline_id));
3186 :
3187 : self.check_permission(Some(tenant_shard_id.tenant_id))?;
3188 :
3189 : COMPUTE_COMMANDS_COUNTERS
3190 : .for_command(ComputeCommandKind::LeaseLsn)
3191 : .inc();
3192 :
3193 : match self
3194 : .handle_make_lsn_lease(pgb, tenant_shard_id, timeline_id, lsn, &ctx)
3195 : .await
3196 : {
3197 : Ok(()) => {
3198 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?
3199 : }
3200 : Err(e) => {
3201 : error!("error obtaining lsn lease for {lsn}: {e:?}");
3202 : pgb.write_message_noflush(&BeMessage::ErrorResponse(
3203 : &e.to_string(),
3204 : Some(e.pg_error_code()),
3205 : ))?
3206 : }
3207 : };
3208 : }
3209 : }
3210 :
3211 : Ok(())
3212 : }
3213 : }
3214 :
3215 : /// Serves the page service over gRPC. Dispatches to PageServerHandler for request processing.
3216 : ///
3217 : /// TODO: rename to PageServiceHandler when libpq impl is removed.
3218 : pub struct GrpcPageServiceHandler {
3219 : tenant_manager: Arc<TenantManager>,
3220 : ctx: RequestContext,
3221 : gate_guard: GateGuard,
3222 : get_vectored_concurrent_io: GetVectoredConcurrentIo,
3223 : }
3224 :
3225 : impl GrpcPageServiceHandler {
3226 : /// Spawns a gRPC server for the page service.
3227 : ///
3228 : /// TODO: this doesn't support TLS. TLS reloading via ReloadingCertificateResolver would require
3229 : /// us to reimplement the TCP+TLS accept loop ourselves.
3230 0 : pub fn spawn(
3231 0 : tenant_manager: Arc<TenantManager>,
3232 0 : auth: Option<Arc<SwappableJwtAuth>>,
3233 0 : perf_trace_dispatch: Option<Dispatch>,
3234 0 : get_vectored_concurrent_io: GetVectoredConcurrentIo,
3235 0 : listener: std::net::TcpListener,
3236 0 : ) -> anyhow::Result<CancellableTask> {
3237 0 : let cancel = CancellationToken::new();
3238 0 : let ctx = RequestContextBuilder::new(TaskKind::PageRequestHandler)
3239 0 : .download_behavior(DownloadBehavior::Download)
3240 0 : .perf_span_dispatch(perf_trace_dispatch)
3241 0 : .detached_child();
3242 0 : let gate = Gate::default();
3243 :
3244 : // Set up the TCP socket. We take a preconfigured TcpListener to bind the
3245 : // port early during startup.
3246 0 : let incoming = {
3247 0 : let _runtime = COMPUTE_REQUEST_RUNTIME.enter(); // required by TcpListener::from_std
3248 0 : listener.set_nonblocking(true)?;
3249 0 : tonic::transport::server::TcpIncoming::from(tokio::net::TcpListener::from_std(
3250 0 : listener,
3251 0 : )?)
3252 0 : .with_nodelay(Some(GRPC_TCP_NODELAY))
3253 0 : .with_keepalive(Some(GRPC_TCP_KEEPALIVE_TIME))
3254 : };
3255 :
3256 : // Set up the gRPC server.
3257 : //
3258 : // TODO: consider tuning window sizes.
3259 0 : let mut server = tonic::transport::Server::builder()
3260 0 : .http2_keepalive_interval(Some(GRPC_HTTP2_KEEPALIVE_INTERVAL))
3261 0 : .http2_keepalive_timeout(Some(GRPC_HTTP2_KEEPALIVE_TIMEOUT))
3262 0 : .max_concurrent_streams(Some(GRPC_MAX_CONCURRENT_STREAMS));
3263 :
3264 : // Main page service stack. Uses a mix of Tonic interceptors and Tower layers:
3265 : //
3266 : // * Interceptors: can inspect and modify the gRPC request. Sync code only, runs before service.
3267 : //
3268 : // * Layers: allow async code and can run after the service responds. However, they only have
3269 : // access to the raw HTTP request/response, not the gRPC types.
3270 0 : let page_service_handler = GrpcPageServiceHandler {
3271 0 : tenant_manager,
3272 0 : ctx,
3273 0 : gate_guard: gate.enter().expect("gate was just created"),
3274 0 : get_vectored_concurrent_io,
3275 0 : };
3276 :
3277 0 : let observability_layer = ObservabilityLayer;
3278 0 : let mut tenant_interceptor = TenantMetadataInterceptor;
3279 0 : let mut auth_interceptor = TenantAuthInterceptor::new(auth);
3280 :
3281 0 : let page_service = tower::ServiceBuilder::new()
3282 : // Create tracing span and record request start time.
3283 0 : .layer(observability_layer)
3284 : // Intercept gRPC requests.
3285 0 : .layer(tonic::service::InterceptorLayer::new(move |mut req| {
3286 : // Extract tenant metadata.
3287 0 : req = tenant_interceptor.call(req)?;
3288 : // Authenticate tenant JWT token.
3289 0 : req = auth_interceptor.call(req)?;
3290 0 : Ok(req)
3291 0 : }))
3292 : // Run the page service.
3293 0 : .service(
3294 0 : proto::PageServiceServer::new(page_service_handler)
3295 : // Support both gzip and zstd compression. The client decides what to use.
3296 0 : .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
3297 0 : .accept_compressed(tonic::codec::CompressionEncoding::Zstd)
3298 0 : .send_compressed(tonic::codec::CompressionEncoding::Gzip)
3299 0 : .send_compressed(tonic::codec::CompressionEncoding::Zstd),
3300 : );
3301 0 : let server = server.add_service(page_service);
3302 :
3303 : // Reflection service for use with e.g. grpcurl.
3304 0 : let reflection_service = tonic_reflection::server::Builder::configure()
3305 0 : .register_encoded_file_descriptor_set(proto::FILE_DESCRIPTOR_SET)
3306 0 : .build_v1()?;
3307 0 : let server = server.add_service(reflection_service);
3308 :
3309 : // Spawn server task.
3310 0 : let task_cancel = cancel.clone();
3311 0 : let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
3312 : "grpc listener",
3313 0 : async move {
3314 0 : let result = server
3315 0 : .serve_with_incoming_shutdown(incoming, task_cancel.cancelled())
3316 0 : .await;
3317 0 : if result.is_ok() {
3318 : // TODO: revisit shutdown logic once the gRPC page service is fully implemented.
3319 0 : gate.close().await;
3320 0 : }
3321 0 : result
3322 0 : },
3323 : ));
3324 :
3325 0 : Ok(CancellableTask { task, cancel })
3326 0 : }
3327 :
3328 : /// Errors if the request is executed on a non-zero shard. Only shard 0 has a complete view of
3329 : /// relations and their sizes, as well as SLRU segments and similar data.
3330 : #[allow(clippy::result_large_err)]
3331 0 : fn ensure_shard_zero(timeline: &Handle<TenantManagerTypes>) -> Result<(), tonic::Status> {
3332 0 : match timeline.get_shard_index().shard_number.0 {
3333 0 : 0 => Ok(()),
3334 0 : shard => Err(tonic::Status::invalid_argument(format!(
3335 0 : "request must execute on shard zero (is shard {shard})",
3336 0 : ))),
3337 : }
3338 0 : }
3339 :
3340 : /// Generates a PagestreamRequest header from a ReadLsn and request ID.
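     : ///
     : /// A missing `not_modified_since_lsn` defaults to the request LSN, and a missing request ID
     : /// maps to 0. A minimal sketch, assuming `ReadLsn` is built from just these two fields and
     : /// using an illustrative LSN value:
     : ///
     : /// ```ignore
     : /// let read_lsn = page_api::ReadLsn {
     : ///     request_lsn: Lsn(0x16ABCDE),
     : ///     not_modified_since_lsn: None,
     : /// };
     : /// let hdr = Self::make_hdr(read_lsn, None);
     : /// assert_eq!(hdr.request_lsn, Lsn(0x16ABCDE));
     : /// assert_eq!(hdr.not_modified_since, Lsn(0x16ABCDE)); // defaulted to request_lsn
     : /// assert_eq!(hdr.reqid, 0);
     : /// ```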
3341 0 : fn make_hdr(
3342 0 : read_lsn: page_api::ReadLsn,
3343 0 : req_id: Option<page_api::RequestID>,
3344 0 : ) -> PagestreamRequest {
3345 : PagestreamRequest {
3346 0 : reqid: req_id.map(|r| r.id).unwrap_or_default(),
3347 0 : request_lsn: read_lsn.request_lsn,
3348 0 : not_modified_since: read_lsn
3349 0 : .not_modified_since_lsn
3350 0 : .unwrap_or(read_lsn.request_lsn),
3351 : }
3352 0 : }
3353 :
3354 : /// Acquires a timeline handle for the given request.
3355 : ///
3356 : /// TODO: during shard splits, the compute may still be sending requests to the parent shard
3357 : /// until the entire split is committed and the compute is notified. Consider installing a
3358 : /// temporary shard router from the parent to the children while the split is in progress.
3359 : ///
3360 : /// TODO: consider moving this to a middleware layer; all requests need it. Needs to manage
3361 : /// the TimelineHandles lifecycle.
3362 : ///
3363 : /// TODO: untangle acquisition from TenantManagerWrapper::resolve() and Cache::get(), to avoid
3364 : /// the unnecessary overhead.
3365 0 : async fn get_request_timeline(
3366 0 : &self,
3367 0 : req: &tonic::Request<impl Any>,
3368 0 : ) -> Result<Handle<TenantManagerTypes>, GetActiveTimelineError> {
3369 0 : let ttid = *extract::<TenantTimelineId>(req);
3370 0 : let shard_index = *extract::<ShardIndex>(req);
3371 0 : let shard_selector = ShardSelector::Known(shard_index);
3372 :
3373 0 : TimelineHandles::new(self.tenant_manager.clone())
3374 0 : .get(ttid.tenant_id, ttid.timeline_id, shard_selector)
3375 0 : .await
3376 0 : }
3377 :
3378 : /// Starts a SmgrOpTimer at received_at, throttles the request, and records execution start.
3379 : /// Only errors if the timeline is shutting down.
3380 : ///
3381 : /// TODO: move timer construction to ObservabilityLayer (see TODO there).
3382 : /// TODO: decouple rate limiting (middleware?), and return SlowDown errors instead.
3383 0 : async fn record_op_start_and_throttle(
3384 0 : timeline: &Handle<TenantManagerTypes>,
3385 0 : op: metrics::SmgrQueryType,
3386 0 : received_at: Instant,
3387 0 : ) -> Result<SmgrOpTimer, tonic::Status> {
3388 0 : let mut timer = PageServerHandler::record_op_start_and_throttle(timeline, op, received_at)
3389 0 : .await
3390 0 : .map_err(|err| match err {
3391 : // record_op_start_and_throttle() only returns Shutdown.
3392 0 : QueryError::Shutdown => tonic::Status::unavailable(format!("{err}")),
3393 0 : err => tonic::Status::internal(format!("unexpected error: {err}")),
3394 0 : })?;
3395 0 : timer.observe_execution_start(Instant::now());
3396 0 : Ok(timer)
3397 0 : }
3398 :
3399 : /// Processes a GetPage batch request via the GetPages bidirectional streaming RPC.
3400 : ///
3401 : /// NB: errors returned from here are intercepted in get_pages(), and may be converted to a
3402 : /// GetPageResponse with an appropriate status code to avoid terminating the stream.
3403 : ///
3404 : /// TODO: verify that the requested pages belong to this shard.
3405 : ///
3406 : /// TODO: get_vectored() currently enforces a batch limit of 32. Postgres will typically send
3407 : /// batches up to effective_io_concurrency = 100. Either we have to accept large batches, or
3408 : /// split them up in the client or server.
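     : ///
     : /// For the client-side option, a minimal sketch of splitting an oversized request into
     : /// batches of at most 32 pages (assumes `GetPageRequest: Clone`; `send` is a hypothetical
     : /// sender for the GetPages request stream):
     : ///
     : /// ```ignore
     : /// for chunk in req.block_numbers.chunks(32) {
     : ///     let mut batch = req.clone();
     : ///     batch.block_numbers = chunk.to_vec();
     : ///     send(batch).await?; // hypothetical: push onto the GetPages request stream
     : /// }
     : /// ```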
3409 : #[instrument(skip_all, fields(req_id, rel, blkno, blks, lsn))]
3410 : async fn get_page(
3411 : ctx: &RequestContext,
3412 : timeline: &WeakHandle<TenantManagerTypes>,
3413 : req: proto::GetPageRequest,
3414 : io_concurrency: IoConcurrency,
3415 : ) -> Result<proto::GetPageResponse, tonic::Status> {
3416 : let received_at = Instant::now();
3417 : let timeline = timeline.upgrade()?;
3418 : let ctx = ctx.with_scope_page_service_pagestream(&timeline);
3419 :
3420 : // Validate the request, decorate the span, and convert it to a Pagestream request.
3421 : let req = page_api::GetPageRequest::try_from(req)?;
3422 :
3423 : span_record!(
3424 : req_id = %req.request_id,
3425 : rel = %req.rel,
3426 : blkno = %req.block_numbers[0],
3427 : blks = %req.block_numbers.len(),
3428 : lsn = %req.read_lsn,
3429 : );
3430 :
3431 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn(); // hold guard
3432 : let effective_lsn = PageServerHandler::effective_request_lsn(
3433 : &timeline,
3434 : timeline.get_last_record_lsn(),
3435 : req.read_lsn.request_lsn,
3436 : req.read_lsn
3437 : .not_modified_since_lsn
3438 : .unwrap_or(req.read_lsn.request_lsn),
3439 : &latest_gc_cutoff_lsn,
3440 : )?;
3441 :
3442 : let mut batch = SmallVec::with_capacity(req.block_numbers.len());
3443 : for blkno in req.block_numbers {
3444 : // TODO: this creates one timer per page and throttles it. We should have a timer for
3445 : // the entire batch, and throttle only the batch, but this is equivalent to what
3446 : // PageServerHandler does already so we keep it for now.
3447 : let timer = Self::record_op_start_and_throttle(
3448 : &timeline,
3449 : metrics::SmgrQueryType::GetPageAtLsn,
3450 : received_at,
3451 : )
3452 : .await?;
3453 :
3454 : batch.push(BatchedGetPageRequest {
3455 : req: PagestreamGetPageRequest {
3456 : hdr: Self::make_hdr(req.read_lsn, Some(req.request_id)),
3457 : rel: req.rel,
3458 : blkno,
3459 : },
3460 : lsn_range: LsnRange {
3461 : effective_lsn,
3462 : request_lsn: req.read_lsn.request_lsn,
3463 : },
3464 : timer,
3465 : ctx: ctx.attached_child(),
3466 : batch_wait_ctx: None, // TODO: add tracing
3467 : });
3468 : }
3469 :
3470 : // TODO: this does a relation size query for every page in the batch. Since this batch is
3471 : // all for one relation, we could do it only once. The libpq implementation can't make that
3472 : // assumption, however, since its batches aren't limited to a single relation.
3473 : let results = PageServerHandler::handle_get_page_at_lsn_request_batched(
3474 : &timeline,
3475 : batch,
3476 : io_concurrency,
3477 : GetPageBatchBreakReason::BatchFull, // TODO: not relevant for gRPC batches
3478 : &ctx,
3479 : )
3480 : .await;
3481 :
3482 : let mut resp = page_api::GetPageResponse {
3483 : request_id: req.request_id,
3484 : status_code: page_api::GetPageStatusCode::Ok,
3485 : reason: None,
3486 : rel: req.rel,
3487 : pages: Vec::with_capacity(results.len()),
3488 : };
3489 :
3490 : for result in results {
3491 : match result {
3492 : Ok((PagestreamBeMessage::GetPage(r), _, _)) => resp.pages.push(page_api::Page {
3493 : block_number: r.req.blkno,
3494 : image: r.page,
3495 : }),
3496 : Ok((resp, _, _)) => {
3497 : return Err(tonic::Status::internal(format!(
3498 : "unexpected response: {resp:?}"
3499 : )));
3500 : }
3501 : Err(err) => return Err(err.err.into()),
3502 : };
3503 : }
3504 :
3505 : Ok(resp.into())
3506 : }
3507 : }
3508 :
3509 : /// Implements the gRPC page service.
3510 : ///
3511 : /// TODO: cancellation.
3512 : /// TODO: when the libpq impl is removed, remove the Pagestream types and inline the handler code.
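     : ///
     : /// A minimal client-side sketch, assuming the tonic-generated client is exposed as
     : /// `proto::page_service_client::PageServiceClient`; the endpoint, IDs, and JWT are
     : /// placeholders:
     : ///
     : /// ```ignore
     : /// let mut client =
     : ///     proto::page_service_client::PageServiceClient::connect("http://pageserver:51051").await?;
     : ///
     : /// // Placeholder request; fill in rel and read_lsn as needed.
     : /// let mut req = tonic::Request::new(proto::CheckRelExistsRequest::default());
     : /// let md = req.metadata_mut();
     : /// md.insert("neon-tenant-id", tenant_id.to_string().parse()?);
     : /// md.insert("neon-timeline-id", timeline_id.to_string().parse()?);
     : /// md.insert("neon-shard-id", shard_index.to_string().parse()?);
     : /// md.insert("authorization", format!("Bearer {jwt}").parse()?);
     : ///
     : /// let exists = client.check_rel_exists(req).await?.into_inner();
     : /// ```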
3513 : #[tonic::async_trait]
3514 : impl proto::PageService for GrpcPageServiceHandler {
3515 : type GetBaseBackupStream = Pin<
3516 : Box<dyn Stream<Item = Result<proto::GetBaseBackupResponseChunk, tonic::Status>> + Send>,
3517 : >;
3518 :
3519 : type GetPagesStream =
3520 : Pin<Box<dyn Stream<Item = Result<proto::GetPageResponse, tonic::Status>> + Send>>;
3521 :
3522 : #[instrument(skip_all, fields(rel, lsn))]
3523 : async fn check_rel_exists(
3524 : &self,
3525 : req: tonic::Request<proto::CheckRelExistsRequest>,
3526 0 : ) -> Result<tonic::Response<proto::CheckRelExistsResponse>, tonic::Status> {
3527 0 : let received_at = extract::<ReceivedAt>(&req).0;
3528 0 : let timeline = self.get_request_timeline(&req).await?;
3529 0 : let ctx = self.ctx.with_scope_page_service_pagestream(&timeline);
3530 :
3531 : // Validate the request, decorate the span, and convert it to a Pagestream request.
3532 0 : Self::ensure_shard_zero(&timeline)?;
3533 0 : let req: page_api::CheckRelExistsRequest = req.into_inner().try_into()?;
3534 :
3535 0 : span_record!(rel=%req.rel, lsn=%req.read_lsn);
3536 :
3537 0 : let req = PagestreamExistsRequest {
3538 0 : hdr: Self::make_hdr(req.read_lsn, None),
3539 0 : rel: req.rel,
3540 0 : };
3541 :
3542 : // Execute the request and convert the response.
3543 0 : let _timer = Self::record_op_start_and_throttle(
3544 0 : &timeline,
3545 0 : metrics::SmgrQueryType::GetRelExists,
3546 0 : received_at,
3547 0 : )
3548 0 : .await?;
3549 :
3550 0 : let resp = PageServerHandler::handle_get_rel_exists_request(&timeline, &req, &ctx).await?;
3551 0 : let resp: page_api::CheckRelExistsResponse = resp.exists;
3552 0 : Ok(tonic::Response::new(resp.into()))
3553 0 : }
3554 :
3555 : #[instrument(skip_all, fields(lsn))]
3556 : async fn get_base_backup(
3557 : &self,
3558 : req: tonic::Request<proto::GetBaseBackupRequest>,
3559 0 : ) -> Result<tonic::Response<Self::GetBaseBackupStream>, tonic::Status> {
3560 : // Send chunks of 256 KB to avoid large memory allocations. pagebench basebackup shows this
3561 : // to be the sweet spot where throughput is saturated.
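     : // For example, a 16 MiB basebackup is streamed as 64 such chunks.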
3562 : const CHUNK_SIZE: usize = 256 * 1024;
3563 :
3564 0 : let timeline = self.get_request_timeline(&req).await?;
3565 0 : let ctx = self.ctx.with_scope_timeline(&timeline);
3566 :
3567 : // Validate the request and decorate the span.
3568 0 : Self::ensure_shard_zero(&timeline)?;
3569 0 : if timeline.is_archived() == Some(true) {
3570 0 : return Err(tonic::Status::failed_precondition("timeline is archived"));
3571 0 : }
3572 0 : let req: page_api::GetBaseBackupRequest = req.into_inner().try_into()?;
3573 :
3574 0 : span_record!(lsn=?req.lsn);
3575 :
3576 : // Wait for the LSN to arrive, if given.
3577 0 : if let Some(lsn) = req.lsn {
3578 0 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
3579 0 : timeline
3580 0 : .wait_lsn(
3581 0 : lsn,
3582 0 : WaitLsnWaiter::PageService,
3583 0 : WaitLsnTimeout::Default,
3584 0 : &ctx,
3585 0 : )
3586 0 : .await?;
3587 0 : timeline
3588 0 : .check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
3589 0 : .map_err(|err| {
3590 0 : tonic::Status::invalid_argument(format!("invalid basebackup LSN: {err}"))
3591 0 : })?;
3592 0 : }
3593 :
3594 : // Spawn a task to run the basebackup.
3595 0 : let span = Span::current();
3596 0 : let (mut simplex_read, mut simplex_write) = tokio::io::simplex(CHUNK_SIZE);
3597 0 : let jh = tokio::spawn(async move {
3598 0 : let gzip_level = match req.compression {
3599 0 : page_api::BaseBackupCompression::None => None,
3600 : // NB: using fast compression because it's on the critical path for compute
3601 : // startup. For an empty database, we get <100KB with this method. The
3602 : // Level::Best compression method gives us <20KB, but maybe we should add
3603 : // basebackup caching on compute shutdown first.
3604 0 : page_api::BaseBackupCompression::Gzip => Some(async_compression::Level::Fastest),
3605 : };
3606 :
3607 : // Check for a cached basebackup.
3608 0 : let cached = timeline
3609 0 : .get_cached_basebackup_if_enabled(
3610 0 : req.lsn,
3611 0 : None,
3612 0 : req.full,
3613 0 : req.replica,
3614 0 : gzip_level.is_some(),
3615 0 : )
3616 0 : .await;
3617 :
3618 0 : let result = if let Some(mut cached) = cached {
3619 : // If we have a cached basebackup, send it.
3620 0 : tokio::io::copy(&mut cached, &mut simplex_write)
3621 0 : .await
3622 0 : .map(|_| ())
3623 0 : .map_err(|err| BasebackupError::Client(err, "cached,copy"))
3624 : } else {
3625 0 : basebackup::send_basebackup_tarball(
3626 0 : &mut simplex_write,
3627 0 : &timeline,
3628 0 : req.lsn,
3629 0 : None,
3630 0 : req.full,
3631 0 : req.replica,
3632 0 : gzip_level,
3633 0 : &ctx,
3634 0 : )
3635 0 : .instrument(span) // propagate request span
3636 0 : .await
3637 : };
3638 0 : simplex_write
3639 0 : .shutdown()
3640 0 : .await
3641 0 : .map_err(|err| BasebackupError::Client(err, "simplex_write"))?;
3642 0 : result
3643 0 : });
3644 :
3645 : // Emit chunks of size CHUNK_SIZE.
3646 0 : let chunks = async_stream::try_stream! {
3647 : loop {
3648 : let mut chunk = BytesMut::with_capacity(CHUNK_SIZE).limit(CHUNK_SIZE);
3649 : loop {
3650 0 : let n = simplex_read.read_buf(&mut chunk).await.map_err(|err| {
3651 0 : tonic::Status::internal(format!("failed to read basebackup chunk: {err}"))
3652 0 : })?;
3653 : if n == 0 {
3654 : break; // full chunk or closed stream
3655 : }
3656 : }
3657 : let chunk = chunk.into_inner().freeze();
3658 : if chunk.is_empty() {
3659 : break;
3660 : }
3661 : yield proto::GetBaseBackupResponseChunk::from(chunk);
3662 : }
3663 : // Wait for the basebackup task to exit and check for errors.
3664 0 : jh.await.map_err(|err| {
3665 0 : tonic::Status::internal(format!("basebackup failed: {err}"))
3666 0 : })??;
3667 : };
3668 :
3669 0 : Ok(tonic::Response::new(Box::pin(chunks)))
3670 0 : }
3671 :
3672 : #[instrument(skip_all, fields(db_oid, lsn))]
3673 : async fn get_db_size(
3674 : &self,
3675 : req: tonic::Request<proto::GetDbSizeRequest>,
3676 0 : ) -> Result<tonic::Response<proto::GetDbSizeResponse>, tonic::Status> {
3677 0 : let received_at = extract::<ReceivedAt>(&req).0;
3678 0 : let timeline = self.get_request_timeline(&req).await?;
3679 0 : let ctx = self.ctx.with_scope_page_service_pagestream(&timeline);
3680 :
3681 : // Validate the request, decorate the span, and convert it to a Pagestream request.
3682 0 : Self::ensure_shard_zero(&timeline)?;
3683 0 : let req: page_api::GetDbSizeRequest = req.into_inner().try_into()?;
3684 :
3685 0 : span_record!(db_oid=%req.db_oid, lsn=%req.read_lsn);
3686 :
3687 0 : let req = PagestreamDbSizeRequest {
3688 0 : hdr: Self::make_hdr(req.read_lsn, None),
3689 0 : dbnode: req.db_oid,
3690 0 : };
3691 :
3692 : // Execute the request and convert the response.
3693 0 : let _timer = Self::record_op_start_and_throttle(
3694 0 : &timeline,
3695 0 : metrics::SmgrQueryType::GetDbSize,
3696 0 : received_at,
3697 0 : )
3698 0 : .await?;
3699 :
3700 0 : let resp = PageServerHandler::handle_db_size_request(&timeline, &req, &ctx).await?;
3701 0 : let resp = resp.db_size as page_api::GetDbSizeResponse;
3702 0 : Ok(tonic::Response::new(resp.into()))
3703 0 : }
3704 :
3705 : // NB: don't instrument this; instrument each streamed request instead.
3706 0 : async fn get_pages(
3707 : &self,
3708 : req: tonic::Request<tonic::Streaming<proto::GetPageRequest>>,
3709 0 : ) -> Result<tonic::Response<Self::GetPagesStream>, tonic::Status> {
3710 : // Extract the timeline from the request and check that it exists.
3711 0 : let ttid = *extract::<TenantTimelineId>(&req);
3712 0 : let shard_index = *extract::<ShardIndex>(&req);
3713 0 : let shard_selector = ShardSelector::Known(shard_index);
3714 :
3715 0 : let mut handles = TimelineHandles::new(self.tenant_manager.clone());
3716 0 : handles
3717 0 : .get(ttid.tenant_id, ttid.timeline_id, shard_selector)
3718 0 : .await?;
3719 :
3720 : // Spawn an IoConcurrency sidecar, if enabled.
3721 0 : let Ok(gate_guard) = self.gate_guard.try_clone() else {
3722 0 : return Err(tonic::Status::unavailable("shutting down"));
3723 : };
3724 0 : let io_concurrency =
3725 0 : IoConcurrency::spawn_from_conf(self.get_vectored_concurrent_io, gate_guard);
3726 :
3727 : // Spawn a task to handle the GetPageRequest stream.
3728 0 : let span = Span::current();
3729 0 : let ctx = self.ctx.attached_child();
3730 0 : let mut reqs = req.into_inner();
3731 :
3732 0 : let resps = async_stream::try_stream! {
3733 : let timeline = handles
3734 : .get(ttid.tenant_id, ttid.timeline_id, shard_selector)
3735 : .await?
3736 : .downgrade();
3737 : while let Some(req) = reqs.message().await? {
3738 : let req_id = req.request_id.map(page_api::RequestID::from).unwrap_or_default();
3739 : let result = Self::get_page(&ctx, &timeline, req, io_concurrency.clone())
3740 : .instrument(span.clone()) // propagate request span
3741 : .await;
3742 : yield match result {
3743 : Ok(resp) => resp,
3744 : // Convert per-request errors to GetPageResponses as appropriate, or terminate
3745 : // the stream with a tonic::Status. Log the error regardless, since
3746 : // ObservabilityLayer can't automatically log stream errors.
3747 : Err(status) => {
3748 : // TODO: it would be nice if we could propagate the get_page() fields here.
3749 0 : span.in_scope(|| {
3750 0 : warn!("request failed with {:?}: {}", status.code(), status.message());
3751 0 : });
3752 : page_api::GetPageResponse::try_from_status(status, req_id)?.into()
3753 : }
3754 : }
3755 : }
3756 : };
3757 :
3758 0 : Ok(tonic::Response::new(Box::pin(resps)))
3759 0 : }
3760 :
3761 : #[instrument(skip_all, fields(rel, lsn))]
3762 : async fn get_rel_size(
3763 : &self,
3764 : req: tonic::Request<proto::GetRelSizeRequest>,
3765 0 : ) -> Result<tonic::Response<proto::GetRelSizeResponse>, tonic::Status> {
3766 0 : let received_at = extract::<ReceivedAt>(&req).0;
3767 0 : let timeline = self.get_request_timeline(&req).await?;
3768 0 : let ctx = self.ctx.with_scope_page_service_pagestream(&timeline);
3769 :
3770 : // Validate the request, decorate the span, and convert it to a Pagestream request.
3771 0 : Self::ensure_shard_zero(&timeline)?;
3772 0 : let req: page_api::GetRelSizeRequest = req.into_inner().try_into()?;
3773 :
3774 0 : span_record!(rel=%req.rel, lsn=%req.read_lsn);
3775 :
3776 0 : let req = PagestreamNblocksRequest {
3777 0 : hdr: Self::make_hdr(req.read_lsn, None),
3778 0 : rel: req.rel,
3779 0 : };
3780 :
3781 : // Execute the request and convert the response.
3782 0 : let _timer = Self::record_op_start_and_throttle(
3783 0 : &timeline,
3784 0 : metrics::SmgrQueryType::GetRelSize,
3785 0 : received_at,
3786 0 : )
3787 0 : .await?;
3788 :
3789 0 : let resp = PageServerHandler::handle_get_nblocks_request(&timeline, &req, &ctx).await?;
3790 0 : let resp: page_api::GetRelSizeResponse = resp.n_blocks;
3791 0 : Ok(tonic::Response::new(resp.into()))
3792 0 : }
3793 :
3794 : #[instrument(skip_all, fields(kind, segno, lsn))]
3795 : async fn get_slru_segment(
3796 : &self,
3797 : req: tonic::Request<proto::GetSlruSegmentRequest>,
3798 0 : ) -> Result<tonic::Response<proto::GetSlruSegmentResponse>, tonic::Status> {
3799 0 : let received_at = extract::<ReceivedAt>(&req).0;
3800 0 : let timeline = self.get_request_timeline(&req).await?;
3801 0 : let ctx = self.ctx.with_scope_page_service_pagestream(&timeline);
3802 :
3803 : // Validate the request, decorate the span, and convert it to a Pagestream request.
3804 0 : Self::ensure_shard_zero(&timeline)?;
3805 0 : let req: page_api::GetSlruSegmentRequest = req.into_inner().try_into()?;
3806 :
3807 0 : span_record!(kind=%req.kind, segno=%req.segno, lsn=%req.read_lsn);
3808 :
3809 0 : let req = PagestreamGetSlruSegmentRequest {
3810 0 : hdr: Self::make_hdr(req.read_lsn, None),
3811 0 : kind: req.kind as u8,
3812 0 : segno: req.segno,
3813 0 : };
3814 :
3815 : // Execute the request and convert the response.
3816 0 : let _timer = Self::record_op_start_and_throttle(
3817 0 : &timeline,
3818 0 : metrics::SmgrQueryType::GetSlruSegment,
3819 0 : received_at,
3820 0 : )
3821 0 : .await?;
3822 :
3823 0 : let resp =
3824 0 : PageServerHandler::handle_get_slru_segment_request(&timeline, &req, &ctx).await?;
3825 0 : let resp: page_api::GetSlruSegmentResponse = resp.segment;
3826 0 : Ok(tonic::Response::new(resp.into()))
3827 0 : }
3828 :
3829 : #[instrument(skip_all, fields(lsn))]
3830 : async fn lease_lsn(
3831 : &self,
3832 : req: tonic::Request<proto::LeaseLsnRequest>,
3833 0 : ) -> Result<tonic::Response<proto::LeaseLsnResponse>, tonic::Status> {
3834 0 : let timeline = self.get_request_timeline(&req).await?;
3835 0 : let ctx = self.ctx.with_scope_timeline(&timeline);
3836 :
3837 : // Validate and convert the request, and decorate the span.
3838 0 : let req: page_api::LeaseLsnRequest = req.into_inner().try_into()?;
3839 :
3840 0 : span_record!(lsn=%req.lsn);
3841 :
3842 : // Attempt to acquire a lease. Return FailedPrecondition if the lease could not be granted.
3843 0 : let lease_length = timeline.get_lsn_lease_length();
3844 0 : let expires = match timeline.renew_lsn_lease(req.lsn, lease_length, &ctx) {
3845 0 : Ok(lease) => lease.valid_until,
3846 0 : Err(err) => return Err(tonic::Status::failed_precondition(format!("{err}"))),
3847 : };
3848 :
3849 : // TODO: is this spammy? Move it compute-side?
3850 0 : info!(
3851 0 : "acquired lease for {} until {}",
3852 : req.lsn,
3853 0 : chrono::DateTime::<Utc>::from(expires).to_rfc3339()
3854 : );
3855 :
3856 0 : Ok(tonic::Response::new(expires.into()))
3857 0 : }
3858 : }
3859 :
3860 : /// gRPC middleware layer that handles observability concerns:
3861 : ///
3862 : /// * Creates and enters a tracing span.
3863 : /// * Records the request start time as a ReceivedAt request extension.
3864 : ///
3865 : /// TODO: add perf tracing.
3866 : /// TODO: add timing and metrics.
3867 : /// TODO: add logging.
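     : ///
     : /// Handlers read the recorded start time back out of the request extensions, e.g.:
     : ///
     : /// ```ignore
     : /// let received_at = extract::<ReceivedAt>(&req).0;
     : /// ```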
3868 : #[derive(Clone)]
3869 : struct ObservabilityLayer;
3870 :
3871 : impl<S: tonic::server::NamedService> tower::Layer<S> for ObservabilityLayer {
3872 : type Service = ObservabilityLayerService<S>;
3873 :
3874 0 : fn layer(&self, inner: S) -> Self::Service {
3875 0 : Self::Service { inner }
3876 0 : }
3877 : }
3878 :
3879 : #[derive(Clone)]
3880 : struct ObservabilityLayerService<S> {
3881 : inner: S,
3882 : }
3883 :
3884 : #[derive(Clone, Copy)]
3885 : struct ReceivedAt(Instant);
3886 :
3887 : impl<S: tonic::server::NamedService> tonic::server::NamedService for ObservabilityLayerService<S> {
3888 : const NAME: &'static str = S::NAME; // propagate inner service name
3889 : }
3890 :
3891 : impl<S, Req, Resp> tower::Service<http::Request<Req>> for ObservabilityLayerService<S>
3892 : where
3893 : S: tower::Service<http::Request<Req>, Response = http::Response<Resp>> + Send,
3894 : S::Future: Send + 'static,
3895 : {
3896 : type Response = S::Response;
3897 : type Error = S::Error;
3898 : type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
3899 :
3900 0 : fn call(&mut self, mut req: http::Request<Req>) -> Self::Future {
3901 : // Record the request start time as a request extension.
3902 : //
3903 : // TODO: we should start a timer here instead, but it currently requires a timeline handle
3904 : // and SmgrQueryType, which we don't have yet. Refactor so these can be provided later.
3905 0 : req.extensions_mut().insert(ReceivedAt(Instant::now()));
3906 :
3907 : // Extract the peer address and gRPC method.
3908 0 : let peer = req
3909 0 : .extensions()
3910 0 : .get::<TcpConnectInfo>()
3911 0 : .and_then(|info| info.remote_addr())
3912 0 : .map(|addr| addr.to_string())
3913 0 : .unwrap_or_default();
3914 :
3915 0 : let method = req
3916 0 : .uri()
3917 0 : .path()
3918 0 : .split('/')
3919 0 : .nth(2)
3920 0 : .unwrap_or(req.uri().path())
3921 0 : .to_string();
3922 :
3923 : // Create a basic tracing span.
3924 : //
3925 : // Enter the span for the current thread and instrument the future. It is not sufficient to
3926 : // only instrument the future, since it only takes effect after the future is returned and
3927 : // polled, not when the inner service is called below (e.g. during interceptor execution).
3928 0 : let span = info_span!(
3929 : "grpc:pageservice",
3930 : // These will be populated by TenantMetadataInterceptor.
3931 : tenant_id = field::Empty,
3932 : timeline_id = field::Empty,
3933 : shard_id = field::Empty,
3934 : // NB: empty fields must be listed first above. Otherwise, the field names will be
3935 : // clobbered when the empty fields are populated. They will be output last regardless.
3936 : %peer,
3937 : %method,
3938 : );
3939 0 : let _guard = span.enter();
3940 :
3941 : // Construct a future for calling the inner service, but don't await it. This avoids having
3942 : // to clone the inner service into the future below.
3943 0 : let call = self.inner.call(req);
3944 :
3945 0 : async move {
3946 : // Await the inner service call.
3947 0 : let result = call.await;
3948 :
3949 : // Log gRPC error statuses. This won't include request info from handler spans, but it
3950 : // will catch all errors (even those emitted before handler spans are constructed). Only
3951 : // unary request errors are logged here, not streaming response errors.
3952 0 : if let Ok(ref resp) = result
3953 0 : && let Some(status) = tonic::Status::from_header_map(resp.headers())
3954 0 : && status.code() != tonic::Code::Ok
3955 : {
3956 : // TODO: it would be nice if we could propagate the handler span's request fields
3957 : // here. This could e.g. be done by attaching the request fields to
3958 : // tonic::Status::metadata via a proc macro.
3959 0 : warn!(
3960 0 : "request failed with {:?}: {}",
3961 0 : status.code(),
3962 0 : status.message()
3963 : );
3964 0 : }
3965 :
3966 0 : result
3967 0 : }
3968 0 : .instrument(span.clone())
3969 0 : .boxed()
3970 0 : }
3971 :
3972 0 : fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
3973 0 : self.inner.poll_ready(cx)
3974 0 : }
3975 : }
3976 :
3977 : /// gRPC interceptor that decodes tenant metadata and stores it as request extensions of type
3978 : /// TenantTimelineId and ShardIndex.
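     : ///
     : /// Downstream handlers read the decoded values back via the request extensions, e.g.:
     : ///
     : /// ```ignore
     : /// let ttid = *extract::<TenantTimelineId>(&req);
     : /// let shard_index = *extract::<ShardIndex>(&req);
     : /// ```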
3979 : #[derive(Clone)]
3980 : struct TenantMetadataInterceptor;
3981 :
3982 : impl tonic::service::Interceptor for TenantMetadataInterceptor {
3983 0 : fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
3984 : // Decode the tenant ID.
3985 0 : let tenant_id = req
3986 0 : .metadata()
3987 0 : .get("neon-tenant-id")
3988 0 : .ok_or_else(|| tonic::Status::invalid_argument("missing neon-tenant-id"))?
3989 0 : .to_str()
3990 0 : .map_err(|_| tonic::Status::invalid_argument("invalid neon-tenant-id"))?;
3991 0 : let tenant_id = TenantId::from_str(tenant_id)
3992 0 : .map_err(|_| tonic::Status::invalid_argument("invalid neon-tenant-id"))?;
3993 :
3994 : // Decode the timeline ID.
3995 0 : let timeline_id = req
3996 0 : .metadata()
3997 0 : .get("neon-timeline-id")
3998 0 : .ok_or_else(|| tonic::Status::invalid_argument("missing neon-timeline-id"))?
3999 0 : .to_str()
4000 0 : .map_err(|_| tonic::Status::invalid_argument("invalid neon-timeline-id"))?;
4001 0 : let timeline_id = TimelineId::from_str(timeline_id)
4002 0 : .map_err(|_| tonic::Status::invalid_argument("invalid neon-timeline-id"))?;
4003 :
4004 : // Decode the shard ID.
4005 0 : let shard_id = req
4006 0 : .metadata()
4007 0 : .get("neon-shard-id")
4008 0 : .ok_or_else(|| tonic::Status::invalid_argument("missing neon-shard-id"))?
4009 0 : .to_str()
4010 0 : .map_err(|_| tonic::Status::invalid_argument("invalid neon-shard-id"))?;
4011 0 : let shard_id = ShardIndex::from_str(shard_id)
4012 0 : .map_err(|_| tonic::Status::invalid_argument("invalid neon-shard-id"))?;
4013 :
4014 : // Stash them in the request.
4015 0 : let extensions = req.extensions_mut();
4016 0 : extensions.insert(TenantTimelineId::new(tenant_id, timeline_id));
4017 0 : extensions.insert(shard_id);
4018 :
4019 : // Decorate the tracing span.
4020 0 : span_record!(%tenant_id, %timeline_id, %shard_id);
4021 :
4022 0 : Ok(req)
4023 0 : }
4024 : }
4025 :
4026 : /// Authenticates gRPC page service requests.
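     : ///
     : /// Clients attach a tenant-scoped JWT in the `authorization` metadata entry, e.g. (with a
     : /// placeholder `jwt` value):
     : ///
     : /// ```ignore
     : /// req.metadata_mut().insert("authorization", format!("Bearer {jwt}").parse()?);
     : /// ```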
4027 : #[derive(Clone)]
4028 : struct TenantAuthInterceptor {
4029 : auth: Option<Arc<SwappableJwtAuth>>,
4030 : }
4031 :
4032 : impl TenantAuthInterceptor {
4033 0 : fn new(auth: Option<Arc<SwappableJwtAuth>>) -> Self {
4034 0 : Self { auth }
4035 0 : }
4036 : }
4037 :
4038 : impl tonic::service::Interceptor for TenantAuthInterceptor {
4039 0 : fn call(&mut self, req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
4040 : // Do nothing if auth is disabled.
4041 0 : let Some(auth) = self.auth.as_ref() else {
4042 0 : return Ok(req);
4043 : };
4044 :
4045 : // Fetch the tenant ID from the request extensions (set by TenantMetadataInterceptor).
4046 0 : let TenantTimelineId { tenant_id, .. } = *extract::<TenantTimelineId>(&req);
4047 :
4048 : // Fetch and decode the JWT token.
4049 0 : let jwt = req
4050 0 : .metadata()
4051 0 : .get("authorization")
4052 0 : .ok_or_else(|| tonic::Status::unauthenticated("no authorization header"))?
4053 0 : .to_str()
4054 0 : .map_err(|_| tonic::Status::invalid_argument("invalid authorization header"))?
4055 0 : .strip_prefix("Bearer ")
4056 0 : .ok_or_else(|| tonic::Status::invalid_argument("invalid authorization header"))?
4057 0 : .trim();
4058 0 : let jwtdata: TokenData<Claims> = auth
4059 0 : .decode(jwt)
4060 0 : .map_err(|err| tonic::Status::invalid_argument(format!("invalid JWT token: {err}")))?;
4061 0 : let claims = jwtdata.claims;
4062 :
4063 : // Check if the token is valid for this tenant.
4064 0 : check_permission(&claims, Some(tenant_id))
4065 0 : .map_err(|err| tonic::Status::permission_denied(err.to_string()))?;
4066 :
4067 : // TODO: consider stashing the claims in the request extensions, if needed.
4068 :
4069 0 : Ok(req)
4070 0 : }
4071 : }
4072 :
4073 : /// Extracts the given type from the request extensions, or panics if it is missing.
4074 0 : fn extract<T: Send + Sync + 'static>(req: &tonic::Request<impl Any>) -> &T {
4075 0 : extract_from(req.extensions())
4076 0 : }
4077 :
4078 : /// Extracts the given type from the given extensions, or panics if it is missing. This variant
4079 : /// works with both tonic::Request and http::Request, since both expose http::Extensions.
4080 0 : fn extract_from<T: Send + Sync + 'static>(ext: &http::Extensions) -> &T {
4081 0 : let Some(value) = ext.get::<T>() else {
4082 0 : let name = std::any::type_name::<T>();
4083 0 : panic!("extension {name} should be set by middleware");
4084 : };
4085 0 : value
4086 0 : }
4087 :
4088 : #[derive(Debug, thiserror::Error)]
4089 : pub(crate) enum GetActiveTimelineError {
4090 : #[error(transparent)]
4091 : Tenant(GetActiveTenantError),
4092 : #[error(transparent)]
4093 : Timeline(#[from] GetTimelineError),
4094 : }
4095 :
4096 : impl From<GetActiveTimelineError> for QueryError {
4097 0 : fn from(e: GetActiveTimelineError) -> Self {
4098 0 : match e {
4099 0 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => QueryError::Shutdown,
4100 0 : GetActiveTimelineError::Tenant(e) => e.into(),
4101 0 : GetActiveTimelineError::Timeline(e) => QueryError::NotFound(format!("{e}").into()),
4102 : }
4103 0 : }
4104 : }
4105 :
4106 : impl From<GetActiveTimelineError> for tonic::Status {
4107 0 : fn from(err: GetActiveTimelineError) -> Self {
4108 0 : let message = err.to_string();
4109 0 : let code = match err {
4110 0 : GetActiveTimelineError::Tenant(err) => tonic::Status::from(err).code(),
4111 0 : GetActiveTimelineError::Timeline(err) => tonic::Status::from(err).code(),
4112 : };
4113 0 : tonic::Status::new(code, message)
4114 0 : }
4115 : }
4116 :
4117 : impl From<GetTimelineError> for tonic::Status {
4118 0 : fn from(err: GetTimelineError) -> Self {
4119 : use tonic::Code;
4120 0 : let code = match &err {
4121 0 : GetTimelineError::NotFound { .. } => Code::NotFound,
4122 0 : GetTimelineError::NotActive { .. } => Code::Unavailable,
4123 0 : GetTimelineError::ShuttingDown => Code::Unavailable,
4124 : };
4125 0 : tonic::Status::new(code, err.to_string())
4126 0 : }
4127 : }
4128 :
4129 : impl From<GetActiveTenantError> for QueryError {
4130 0 : fn from(e: GetActiveTenantError) -> Self {
4131 0 : match e {
4132 0 : GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected(
4133 0 : ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
4134 0 : ),
4135 : GetActiveTenantError::Cancelled
4136 : | GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => {
4137 0 : QueryError::Shutdown
4138 : }
4139 0 : e @ GetActiveTenantError::NotFound(_) => QueryError::NotFound(format!("{e}").into()),
4140 0 : e => QueryError::Other(anyhow::anyhow!(e)),
4141 : }
4142 0 : }
4143 : }
4144 :
4145 : impl From<GetActiveTenantError> for tonic::Status {
4146 0 : fn from(err: GetActiveTenantError) -> Self {
4147 : use tonic::Code;
4148 0 : let code = match &err {
4149 0 : GetActiveTenantError::Broken(_) => Code::Internal,
4150 0 : GetActiveTenantError::Cancelled => Code::Unavailable,
4151 0 : GetActiveTenantError::NotFound(_) => Code::NotFound,
4152 0 : GetActiveTenantError::SwitchedTenant => Code::Unavailable,
4153 0 : GetActiveTenantError::WaitForActiveTimeout { .. } => Code::Unavailable,
4154 0 : GetActiveTenantError::WillNotBecomeActive(_) => Code::Unavailable,
4155 : };
4156 0 : tonic::Status::new(code, err.to_string())
4157 0 : }
4158 : }
4159 :
4160 : impl From<HandleUpgradeError> for QueryError {
4161 0 : fn from(e: HandleUpgradeError) -> Self {
4162 0 : match e {
4163 0 : HandleUpgradeError::ShutDown => QueryError::Shutdown,
4164 : }
4165 0 : }
4166 : }
4167 :
4168 : impl From<HandleUpgradeError> for tonic::Status {
4169 0 : fn from(err: HandleUpgradeError) -> Self {
4170 0 : match err {
4171 0 : HandleUpgradeError::ShutDown => tonic::Status::unavailable("timeline is shutting down"),
4172 : }
4173 0 : }
4174 : }
4175 :
4176 0 : fn set_tracing_field_shard_id(timeline: &Timeline) {
4177 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
4178 0 : tracing::Span::current().record(
4179 0 : "shard_id",
4180 0 : tracing::field::display(timeline.tenant_shard_id.shard_slug()),
4181 : );
4182 0 : debug_assert_current_span_has_tenant_and_timeline_id();
4183 0 : }
4184 :
4185 : struct WaitedForLsn(Lsn);
4186 : impl From<WaitedForLsn> for Lsn {
4187 0 : fn from(WaitedForLsn(lsn): WaitedForLsn) -> Self {
4188 0 : lsn
4189 0 : }
4190 : }
4191 :
4192 : #[cfg(test)]
4193 : mod tests {
4194 : use utils::shard::ShardCount;
4195 :
4196 : use super::*;
4197 :
4198 : #[test]
4199 1 : fn pageservice_cmd_parse() {
4200 1 : let tenant_id = TenantId::generate();
4201 1 : let timeline_id = TimelineId::generate();
4202 1 : let cmd =
4203 1 : PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id} {timeline_id}")).unwrap();
4204 1 : assert_eq!(
4205 : cmd,
4206 1 : PageServiceCmd::PageStream(PageStreamCmd {
4207 1 : tenant_id,
4208 1 : timeline_id,
4209 1 : protocol_version: PagestreamProtocolVersion::V2,
4210 1 : })
4211 : );
4212 1 : let cmd = PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id}")).unwrap();
4213 1 : assert_eq!(
4214 : cmd,
4215 1 : PageServiceCmd::BaseBackup(BaseBackupCmd {
4216 1 : tenant_id,
4217 1 : timeline_id,
4218 1 : lsn: None,
4219 1 : gzip: false,
4220 1 : replica: false
4221 1 : })
4222 : );
4223 1 : let cmd =
4224 1 : PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} --gzip")).unwrap();
4225 1 : assert_eq!(
4226 : cmd,
4227 1 : PageServiceCmd::BaseBackup(BaseBackupCmd {
4228 1 : tenant_id,
4229 1 : timeline_id,
4230 1 : lsn: None,
4231 1 : gzip: true,
4232 1 : replica: false
4233 1 : })
4234 : );
4235 1 : let cmd =
4236 1 : PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} latest")).unwrap();
4237 1 : assert_eq!(
4238 : cmd,
4239 1 : PageServiceCmd::BaseBackup(BaseBackupCmd {
4240 1 : tenant_id,
4241 1 : timeline_id,
4242 1 : lsn: None,
4243 1 : gzip: false,
4244 1 : replica: false
4245 1 : })
4246 : );
4247 1 : let cmd = PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} 0/16ABCDE"))
4248 1 : .unwrap();
4249 1 : assert_eq!(
4250 : cmd,
4251 1 : PageServiceCmd::BaseBackup(BaseBackupCmd {
4252 1 : tenant_id,
4253 1 : timeline_id,
4254 1 : lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
4255 1 : gzip: false,
4256 1 : replica: false
4257 1 : })
4258 : );
4259 1 : let cmd = PageServiceCmd::parse(&format!(
4260 1 : "basebackup {tenant_id} {timeline_id} --replica --gzip"
4261 1 : ))
4262 1 : .unwrap();
4263 1 : assert_eq!(
4264 : cmd,
4265 1 : PageServiceCmd::BaseBackup(BaseBackupCmd {
4266 1 : tenant_id,
4267 1 : timeline_id,
4268 1 : lsn: None,
4269 1 : gzip: true,
4270 1 : replica: true
4271 1 : })
4272 : );
4273 1 : let cmd = PageServiceCmd::parse(&format!(
4274 1 : "basebackup {tenant_id} {timeline_id} 0/16ABCDE --replica --gzip"
4275 1 : ))
4276 1 : .unwrap();
4277 1 : assert_eq!(
4278 : cmd,
4279 1 : PageServiceCmd::BaseBackup(BaseBackupCmd {
4280 1 : tenant_id,
4281 1 : timeline_id,
4282 1 : lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
4283 1 : gzip: true,
4284 1 : replica: true
4285 1 : })
4286 : );
4287 1 : let cmd = PageServiceCmd::parse(&format!("fullbackup {tenant_id} {timeline_id}")).unwrap();
4288 1 : assert_eq!(
4289 : cmd,
4290 1 : PageServiceCmd::FullBackup(FullBackupCmd {
4291 1 : tenant_id,
4292 1 : timeline_id,
4293 1 : lsn: None,
4294 1 : prev_lsn: None
4295 1 : })
4296 : );
4297 1 : let cmd = PageServiceCmd::parse(&format!(
4298 1 : "fullbackup {tenant_id} {timeline_id} 0/16ABCDE 0/16ABCDF"
4299 1 : ))
4300 1 : .unwrap();
4301 1 : assert_eq!(
4302 : cmd,
4303 1 : PageServiceCmd::FullBackup(FullBackupCmd {
4304 1 : tenant_id,
4305 1 : timeline_id,
4306 1 : lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
4307 1 : prev_lsn: Some(Lsn::from_str("0/16ABCDF").unwrap()),
4308 1 : })
4309 : );
4310 1 : let tenant_shard_id = TenantShardId::unsharded(tenant_id);
4311 1 : let cmd = PageServiceCmd::parse(&format!(
4312 1 : "lease lsn {tenant_shard_id} {timeline_id} 0/16ABCDE"
4313 1 : ))
4314 1 : .unwrap();
4315 1 : assert_eq!(
4316 : cmd,
4317 1 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
4318 1 : tenant_shard_id,
4319 1 : timeline_id,
4320 1 : lsn: Lsn::from_str("0/16ABCDE").unwrap(),
4321 1 : })
4322 : );
4323 1 : let tenant_shard_id = TenantShardId::split(&tenant_shard_id, ShardCount(8))[1];
4324 1 : let cmd = PageServiceCmd::parse(&format!(
4325 1 : "lease lsn {tenant_shard_id} {timeline_id} 0/16ABCDE"
4326 1 : ))
4327 1 : .unwrap();
4328 1 : assert_eq!(
4329 : cmd,
4330 1 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
4331 1 : tenant_shard_id,
4332 1 : timeline_id,
4333 1 : lsn: Lsn::from_str("0/16ABCDE").unwrap(),
4334 1 : })
4335 : );
4336 1 : let cmd = PageServiceCmd::parse("set a = b").unwrap();
4337 1 : assert_eq!(cmd, PageServiceCmd::Set);
4338 1 : let cmd = PageServiceCmd::parse("SET foo").unwrap();
4339 1 : assert_eq!(cmd, PageServiceCmd::Set);
4340 1 : }
4341 :
4342 : #[test]
4343 1 : fn pageservice_cmd_err_handling() {
4344 1 : let tenant_id = TenantId::generate();
4345 1 : let timeline_id = TimelineId::generate();
4346 1 : let cmd = PageServiceCmd::parse("unknown_command");
4347 1 : assert!(cmd.is_err());
4348 1 : let cmd = PageServiceCmd::parse("pagestream_v2");
4349 1 : assert!(cmd.is_err());
4350 1 : let cmd = PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id}xxx"));
4351 1 : assert!(cmd.is_err());
4352 1 : let cmd = PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id}xxx {timeline_id}xxx"));
4353 1 : assert!(cmd.is_err());
4354 1 : let cmd = PageServiceCmd::parse(&format!(
4355 1 : "basebackup {tenant_id} {timeline_id} --gzip --gzip"
4356 1 : ));
4357 1 : assert!(cmd.is_err());
4358 1 : let cmd = PageServiceCmd::parse(&format!(
4359 1 : "basebackup {tenant_id} {timeline_id} --gzip --unknown"
4360 1 : ));
4361 1 : assert!(cmd.is_err());
4362 1 : let cmd = PageServiceCmd::parse(&format!(
4363 1 : "basebackup {tenant_id} {timeline_id} --gzip 0/16ABCDE"
4364 1 : ));
4365 1 : assert!(cmd.is_err());
4366 1 : let cmd = PageServiceCmd::parse(&format!("lease {tenant_id} {timeline_id} gzip 0/16ABCDE"));
4367 1 : assert!(cmd.is_err());
4368 1 : }
4369 :
4370 : #[test]
4371 1 : fn test_parse_options() {
4372 1 : let (config, has_error) = parse_options(" -c neon.compute_mode=primary ");
4373 1 : assert!(!has_error);
4374 1 : assert_eq!(
4375 : config,
4376 1 : vec![("neon.compute_mode".to_string(), "primary".to_string())]
4377 : );
4378 :
4379 1 : let (config, has_error) = parse_options(" -c neon.compute_mode=primary -c foo=bar ");
4380 1 : assert!(!has_error);
4381 1 : assert_eq!(
4382 : config,
4383 1 : vec![
4384 1 : ("neon.compute_mode".to_string(), "primary".to_string()),
4385 1 : ("foo".to_string(), "bar".to_string()),
4386 : ]
4387 : );
4388 :
4389 1 : let (config, has_error) = parse_options(" -c neon.compute_mode=primary -cfoo=bar");
4390 1 : assert!(!has_error);
4391 1 : assert_eq!(
4392 : config,
4393 1 : vec![
4394 1 : ("neon.compute_mode".to_string(), "primary".to_string()),
4395 1 : ("foo".to_string(), "bar".to_string()),
4396 : ]
4397 : );
4398 :
4399 1 : let (_, has_error) = parse_options("-c");
4400 1 : assert!(has_error);
4401 :
4402 1 : let (_, has_error) = parse_options("-c foo=bar -c -c");
4403 1 : assert!(has_error);
4404 :
4405 1 : let (_, has_error) = parse_options(" ");
4406 1 : assert!(!has_error);
4407 :
4408 1 : let (_, has_error) = parse_options(" -c neon.compute_mode");
4409 1 : assert!(has_error);
4410 1 : }
4411 : }
|