Line data Source code
1 : //! The Page Service listens for client connections and serves their GetPage@LSN
2 : //! requests.
3 :
4 : use std::any::Any;
5 : use std::borrow::Cow;
6 : use std::num::NonZeroUsize;
7 : use std::os::fd::AsRawFd;
8 : use std::pin::Pin;
9 : use std::str::FromStr;
10 : use std::sync::Arc;
11 : use std::task::{Context, Poll};
12 : use std::time::{Duration, Instant, SystemTime};
13 : use std::{io, str};
14 :
15 : use anyhow::{Context as _, anyhow, bail};
16 : use async_compression::tokio::write::GzipEncoder;
17 : use bytes::{Buf, BytesMut};
18 : use futures::future::BoxFuture;
19 : use futures::{FutureExt, Stream};
20 : use itertools::Itertools;
21 : use jsonwebtoken::TokenData;
22 : use once_cell::sync::OnceCell;
23 : use pageserver_api::config::{
24 : GetVectoredConcurrentIo, PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
25 : PageServiceProtocolPipelinedBatchingStrategy, PageServiceProtocolPipelinedExecutionStrategy,
26 : };
27 : use pageserver_api::key::rel_block_to_key;
28 : use pageserver_api::models::{
29 : self, PageTraceEvent, PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
30 : PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
31 : PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetSlruSegmentRequest,
32 : PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest, PagestreamNblocksResponse,
33 : PagestreamProtocolVersion, PagestreamRequest, TenantState,
34 : };
35 : use pageserver_api::reltag::SlruKind;
36 : use pageserver_api::shard::TenantShardId;
37 : use pageserver_page_api as page_api;
38 : use pageserver_page_api::proto;
39 : use postgres_backend::{
40 : AuthType, PostgresBackend, PostgresBackendReader, QueryError, is_expected_io_error,
41 : };
42 : use postgres_ffi::BLCKSZ;
43 : use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
44 : use pq_proto::framed::ConnectionError;
45 : use pq_proto::{BeMessage, FeMessage, FeStartupPacket, RowDescriptor};
46 : use smallvec::{SmallVec, smallvec};
47 : use strum_macros::IntoStaticStr;
48 : use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _, BufWriter};
49 : use tokio::task::JoinHandle;
50 : use tokio_util::sync::CancellationToken;
51 : use tonic::service::Interceptor as _;
52 : use tracing::*;
53 : use utils::auth::{Claims, Scope, SwappableJwtAuth};
54 : use utils::id::{TenantId, TenantTimelineId, TimelineId};
55 : use utils::logging::log_slow;
56 : use utils::lsn::Lsn;
57 : use utils::shard::ShardIndex;
58 : use utils::simple_rcu::RcuReadGuard;
59 : use utils::sync::gate::{Gate, GateGuard};
60 : use utils::sync::spsc_fold;
61 : use utils::{failpoint_support, span_record};
62 :
63 : use crate::auth::check_permission;
64 : use crate::basebackup::{self, BasebackupError};
65 : use crate::basebackup_cache::BasebackupCache;
66 : use crate::config::PageServerConf;
67 : use crate::context::{
68 : DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
69 : };
70 : use crate::metrics::{
71 : self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, GetPageBatchBreakReason, LIVE_CONNECTIONS,
72 : SmgrOpTimer, TimelineMetrics,
73 : };
74 : use crate::pgdatadir_mapping::{LsnRange, Version};
75 : use crate::span::{
76 : debug_assert_current_span_has_tenant_and_timeline_id,
77 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
78 : };
79 : use crate::task_mgr::{self, COMPUTE_REQUEST_RUNTIME, TaskKind};
80 : use crate::tenant::mgr::{
81 : GetActiveTenantError, GetTenantError, ShardResolveResult, ShardSelector, TenantManager,
82 : };
83 : use crate::tenant::storage_layer::IoConcurrency;
84 : use crate::tenant::timeline::handle::{Handle, HandleUpgradeError, WeakHandle};
85 : use crate::tenant::timeline::{self, WaitLsnError, WaitLsnTimeout, WaitLsnWaiter};
86 : use crate::tenant::{GetTimelineError, PageReconstructError, Timeline};
87 : use crate::{CancellableTask, PERF_TRACE_TARGET, timed_after_cancellation};
88 :
89 : /// How long we may wait for a [`crate::tenant::mgr::TenantSlot::InProgress`] and/or a [`crate::tenant::TenantShard`] which
90 : /// is not yet in state [`TenantState::Active`].
91 : ///
92 : /// NB: this is a different value than [`crate::http::routes::ACTIVE_TENANT_TIMEOUT`].
93 : const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
94 :
95 : /// Threshold at which to log slow GetPage requests.
96 : const LOG_SLOW_GETPAGE_THRESHOLD: Duration = Duration::from_secs(30);
97 :
98 : /// The idle time before sending TCP keepalive probes for gRPC connections. The
99 : /// interval and timeout between each probe are configured via sysctl. This
100 : /// allows detecting dead connections sooner.
101 : const GRPC_TCP_KEEPALIVE_TIME: Duration = Duration::from_secs(60);
102 :
103 : /// Whether to enable TCP nodelay for gRPC connections. This disables Nagle's
104 : /// algorithm, which can cause latency spikes for small messages.
105 : const GRPC_TCP_NODELAY: bool = true;
106 :
107 : /// The interval between HTTP2 keepalive pings. This allows shutting down server
108 : /// tasks when clients are unresponsive.
109 : const GRPC_HTTP2_KEEPALIVE_INTERVAL: Duration = Duration::from_secs(30);
110 :
111 : /// The timeout for HTTP2 keepalive pings. Should be <= GRPC_HTTP2_KEEPALIVE_INTERVAL.
112 : const GRPC_HTTP2_KEEPALIVE_TIMEOUT: Duration = Duration::from_secs(20);
113 :
114 : /// Number of concurrent gRPC streams per TCP connection. We expect something
115 : /// like 8 GetPage streams per connection, plus any unary requests.
116 : const GRPC_MAX_CONCURRENT_STREAMS: u32 = 256;
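// A minimal sketch (an assumption for illustration, not the actual gRPC server
// setup in this file) of how the constants above could be applied to a tonic
// server builder; the method names are from tonic's `transport::Server` API:
//
//     fn grpc_server_builder_sketch() -> tonic::transport::Server {
//         tonic::transport::Server::builder()
//             // Probe idle TCP connections so dead peers are detected sooner.
//             .tcp_keepalive(Some(GRPC_TCP_KEEPALIVE_TIME))
//             // Disable Nagle's algorithm to avoid latency spikes on small messages.
//             .tcp_nodelay(GRPC_TCP_NODELAY)
//             // HTTP/2 pings let server tasks shut down when clients are unresponsive.
//             .http2_keepalive_interval(Some(GRPC_HTTP2_KEEPALIVE_INTERVAL))
//             .http2_keepalive_timeout(Some(GRPC_HTTP2_KEEPALIVE_TIMEOUT))
//             // Bound the number of concurrent streams per TCP connection.
//             .max_concurrent_streams(GRPC_MAX_CONCURRENT_STREAMS)
//     }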
117 :
118 : ///////////////////////////////////////////////////////////////////////////////
119 :
120 : pub struct Listener {
121 : cancel: CancellationToken,
122 :     /// Cancel the listener task through `cancel` to shut down the listener
123 : /// and get a handle on the existing connections.
124 : task: JoinHandle<Connections>,
125 : }
126 :
127 : pub struct Connections {
128 : cancel: CancellationToken,
129 : tasks: tokio::task::JoinSet<ConnectionHandlerResult>,
130 : gate: Gate,
131 : }
132 :
133 0 : pub fn spawn(
134 0 : conf: &'static PageServerConf,
135 0 : tenant_manager: Arc<TenantManager>,
136 0 : pg_auth: Option<Arc<SwappableJwtAuth>>,
137 0 : perf_trace_dispatch: Option<Dispatch>,
138 0 : tcp_listener: tokio::net::TcpListener,
139 0 : tls_config: Option<Arc<rustls::ServerConfig>>,
140 0 : basebackup_cache: Arc<BasebackupCache>,
141 0 : ) -> Listener {
142 0 : let cancel = CancellationToken::new();
143 0 : let libpq_ctx = RequestContext::todo_child(
144 0 : TaskKind::LibpqEndpointListener,
145 0 : // listener task shouldn't need to download anything. (We will
146 0 :         // create separate sub-contexts for each connection, with their
147 0 : // own download behavior. This context is used only to listen and
148 0 : // accept connections.)
149 0 : DownloadBehavior::Error,
150 0 : );
151 0 : let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
152 0 : "libpq listener",
153 0 : libpq_listener_main(
154 0 : conf,
155 0 : tenant_manager,
156 0 : pg_auth,
157 0 : perf_trace_dispatch,
158 0 : tcp_listener,
159 0 : conf.pg_auth_type,
160 0 : tls_config,
161 0 : conf.page_service_pipelining.clone(),
162 0 : basebackup_cache,
163 0 : libpq_ctx,
164 0 : cancel.clone(),
165 0 : )
166 0 : .map(anyhow::Ok),
167 0 : ));
168 0 :
169 0 : Listener { cancel, task }
170 0 : }
171 :
172 : impl Listener {
173 0 : pub async fn stop_accepting(self) -> Connections {
174 0 : self.cancel.cancel();
175 0 : self.task
176 0 : .await
177 0 : .expect("unreachable: we wrap the listener task in task_mgr::exit_on_panic_or_error")
178 0 : }
179 : }
180 : impl Connections {
181 0 : pub(crate) async fn shutdown(self) {
182 0 : let Self {
183 0 : cancel,
184 0 : mut tasks,
185 0 : gate,
186 0 : } = self;
187 0 : cancel.cancel();
188 0 : while let Some(res) = tasks.join_next().await {
189 0 : Self::handle_connection_completion(res);
190 0 : }
191 0 : gate.close().await;
192 0 : }
193 :
194 0 : fn handle_connection_completion(res: Result<anyhow::Result<()>, tokio::task::JoinError>) {
195 0 : match res {
196 0 : Ok(Ok(())) => {}
197 0 : Ok(Err(e)) => error!("error in page_service connection task: {:?}", e),
198 0 : Err(e) => error!("page_service connection task panicked: {:?}", e),
199 : }
200 0 : }
201 : }
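// Intended lifecycle of the two types above (a hedged sketch; construction of the
// arguments to `spawn` is elided and the variable names are illustrative):
//
//     async fn page_service_shutdown_sketch(listener: Listener) {
//         // Stop accepting new connections; the listener task hands back the
//         // set of connections that are still open.
//         let connections: Connections = listener.stop_accepting().await;
//         // Cancel the remaining connection handler tasks, drain them, and
//         // close the gate.
//         connections.shutdown().await;
//     }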
202 :
203 : ///
204 : /// Main loop of the page service.
205 : ///
206 : /// Listens for connections, and launches a new handler task for each.
207 : ///
208 : /// Returns the set of open connections upon cancellation via
209 : /// `listener_cancel`.
210 : ///
211 : #[allow(clippy::too_many_arguments)]
212 0 : pub async fn libpq_listener_main(
213 0 : conf: &'static PageServerConf,
214 0 : tenant_manager: Arc<TenantManager>,
215 0 : auth: Option<Arc<SwappableJwtAuth>>,
216 0 : perf_trace_dispatch: Option<Dispatch>,
217 0 : listener: tokio::net::TcpListener,
218 0 : auth_type: AuthType,
219 0 : tls_config: Option<Arc<rustls::ServerConfig>>,
220 0 : pipelining_config: PageServicePipeliningConfig,
221 0 : basebackup_cache: Arc<BasebackupCache>,
222 0 : listener_ctx: RequestContext,
223 0 : listener_cancel: CancellationToken,
224 0 : ) -> Connections {
225 0 : let connections_cancel = CancellationToken::new();
226 0 : let connections_gate = Gate::default();
227 0 : let mut connection_handler_tasks = tokio::task::JoinSet::default();
228 :
229 : loop {
230 0 : let gate_guard = match connections_gate.enter() {
231 0 : Ok(guard) => guard,
232 0 : Err(_) => break,
233 : };
234 :
235 0 : let accepted = tokio::select! {
236 : biased;
237 0 : _ = listener_cancel.cancelled() => break,
238 0 : next = connection_handler_tasks.join_next(), if !connection_handler_tasks.is_empty() => {
239 0 : let res = next.expect("we dont poll while empty");
240 0 : Connections::handle_connection_completion(res);
241 0 : continue;
242 : }
243 0 : accepted = listener.accept() => accepted,
244 0 : };
245 0 :
246 0 : match accepted {
247 0 : Ok((socket, peer_addr)) => {
248 0 : // Connection established. Spawn a new task to handle it.
249 0 : debug!("accepted connection from {}", peer_addr);
250 0 : let local_auth = auth.clone();
251 0 : let connection_ctx = RequestContextBuilder::from(&listener_ctx)
252 0 : .task_kind(TaskKind::PageRequestHandler)
253 0 : .download_behavior(DownloadBehavior::Download)
254 0 : .perf_span_dispatch(perf_trace_dispatch.clone())
255 0 : .detached_child();
256 0 :
257 0 : connection_handler_tasks.spawn(page_service_conn_main(
258 0 : conf,
259 0 : tenant_manager.clone(),
260 0 : local_auth,
261 0 : socket,
262 0 : auth_type,
263 0 : tls_config.clone(),
264 0 : pipelining_config.clone(),
265 0 : Arc::clone(&basebackup_cache),
266 0 : connection_ctx,
267 0 : connections_cancel.child_token(),
268 0 : gate_guard,
269 0 : ));
270 : }
271 0 : Err(err) => {
272 0 : // accept() failed. Log the error, and loop back to retry on next connection.
273 0 : error!("accept() failed: {:?}", err);
274 : }
275 : }
276 : }
277 :
278 0 : debug!("page_service listener loop terminated");
279 :
280 0 : Connections {
281 0 : cancel: connections_cancel,
282 0 : tasks: connection_handler_tasks,
283 0 : gate: connections_gate,
284 0 : }
285 0 : }
286 :
287 : type ConnectionHandlerResult = anyhow::Result<()>;
288 :
289 : /// Perf root spans start at the per-request level, after shard routing.
290 : /// This struct carries connection-level information to the root perf span definition.
291 : #[derive(Clone, Default)]
292 : struct ConnectionPerfSpanFields {
293 : peer_addr: String,
294 : application_name: Option<String>,
295 : compute_mode: Option<String>,
296 : }
297 :
298 : #[instrument(skip_all, fields(peer_addr, application_name, compute_mode))]
299 : #[allow(clippy::too_many_arguments)]
300 : async fn page_service_conn_main(
301 : conf: &'static PageServerConf,
302 : tenant_manager: Arc<TenantManager>,
303 : auth: Option<Arc<SwappableJwtAuth>>,
304 : socket: tokio::net::TcpStream,
305 : auth_type: AuthType,
306 : tls_config: Option<Arc<rustls::ServerConfig>>,
307 : pipelining_config: PageServicePipeliningConfig,
308 : basebackup_cache: Arc<BasebackupCache>,
309 : connection_ctx: RequestContext,
310 : cancel: CancellationToken,
311 : gate_guard: GateGuard,
312 : ) -> ConnectionHandlerResult {
313 : let _guard = LIVE_CONNECTIONS
314 : .with_label_values(&["page_service"])
315 : .guard();
316 :
317 : socket
318 : .set_nodelay(true)
319 : .context("could not set TCP_NODELAY")?;
320 :
321 : let socket_fd = socket.as_raw_fd();
322 :
323 : let peer_addr = socket.peer_addr().context("get peer address")?;
324 :
325 : let perf_span_fields = ConnectionPerfSpanFields {
326 : peer_addr: peer_addr.to_string(),
327 : application_name: None, // filled in later
328 : compute_mode: None, // filled in later
329 : };
330 : tracing::Span::current().record("peer_addr", field::display(peer_addr));
331 :
332 : // setup read timeout of 10 minutes. the timeout is rather arbitrary for requirements:
333 : // - long enough for most valid compute connections
334 : // - less than infinite to stop us from "leaking" connections to long-gone computes
335 : //
336 : // no write timeout is used, because the kernel is assumed to error writes after some time.
337 : let mut socket = tokio_io_timeout::TimeoutReader::new(socket);
338 :
339 : let default_timeout_ms = 10 * 60 * 1000; // 10 minutes by default
340 0 : let socket_timeout_ms = (|| {
341 0 : fail::fail_point!("simulated-bad-compute-connection", |avg_timeout_ms| {
342 : // Exponential distribution for simulating
343 :         // poor network conditions; avg_timeout_ms is expected to be around 15
344 :         // in tests.
345 0 : if let Some(avg_timeout_ms) = avg_timeout_ms {
346 0 : let avg = avg_timeout_ms.parse::<i64>().unwrap() as f32;
347 0 : let u = rand::random::<f32>();
348 0 : ((1.0 - u).ln() / (-avg)) as u64
349 : } else {
350 0 : default_timeout_ms
351 : }
352 0 : });
353 0 : default_timeout_ms
354 : })();
355 :
356 : // A timeout here does not mean the client died, it can happen if it's just idle for
357 : // a while: we will tear down this PageServerHandler and instantiate a new one if/when
358 : // they reconnect.
359 : socket.set_timeout(Some(std::time::Duration::from_millis(socket_timeout_ms)));
360 : let socket = Box::pin(socket);
361 :
362 : fail::fail_point!("ps::connection-start::pre-login");
363 :
364 : // XXX: pgbackend.run() should take the connection_ctx,
365 : // and create a child per-query context when it invokes process_query.
366 :     // But it's in a shared crate, so we store connection_ctx inside PageServerHandler
367 : // and create the per-query context in process_query ourselves.
368 : let mut conn_handler = PageServerHandler::new(
369 : tenant_manager,
370 : auth,
371 : pipelining_config,
372 : conf.get_vectored_concurrent_io,
373 : perf_span_fields,
374 : basebackup_cache,
375 : connection_ctx,
376 : cancel.clone(),
377 : gate_guard,
378 : );
379 : let pgbackend =
380 : PostgresBackend::new_from_io(socket_fd, socket, peer_addr, auth_type, tls_config)?;
381 :
382 : match pgbackend.run(&mut conn_handler, &cancel).await {
383 : Ok(()) => {
384 : // we've been requested to shut down
385 : Ok(())
386 : }
387 : Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
388 : if is_expected_io_error(&io_error) {
389 : info!("Postgres client disconnected ({io_error})");
390 : Ok(())
391 : } else {
392 : let tenant_id = conn_handler.timeline_handles.as_ref().unwrap().tenant_id();
393 : Err(io_error).context(format!(
394 : "Postgres connection error for tenant_id={:?} client at peer_addr={}",
395 : tenant_id, peer_addr
396 : ))
397 : }
398 : }
399 : other => {
400 : let tenant_id = conn_handler.timeline_handles.as_ref().unwrap().tenant_id();
401 : other.context(format!(
402 : "Postgres query error for tenant_id={:?} client peer_addr={}",
403 : tenant_id, peer_addr
404 : ))
405 : }
406 : }
407 : }
408 :
409 : /// Page service connection handler.
410 : struct PageServerHandler {
411 : auth: Option<Arc<SwappableJwtAuth>>,
412 : claims: Option<Claims>,
413 :
414 : /// The context created for the lifetime of the connection
415 :     /// serviced by this PageServerHandler.
416 : /// For each query received over the connection,
417 : /// `process_query` creates a child context from this one.
418 : connection_ctx: RequestContext,
419 :
420 : perf_span_fields: ConnectionPerfSpanFields,
421 :
422 : cancel: CancellationToken,
423 :
424 : /// None only while pagestream protocol is being processed.
425 : timeline_handles: Option<TimelineHandles>,
426 :
427 : pipelining_config: PageServicePipeliningConfig,
428 : get_vectored_concurrent_io: GetVectoredConcurrentIo,
429 :
430 : basebackup_cache: Arc<BasebackupCache>,
431 :
432 : gate_guard: GateGuard,
433 : }
434 :
435 : struct TimelineHandles {
436 : wrapper: TenantManagerWrapper,
437 : /// Note on size: the typical size of this map is 1. The largest size we expect
438 : /// to see is the number of shards divided by the number of pageservers (typically < 2),
439 :     /// or the ratio used when splitting shards (i.e. how many children are created from one
440 :     /// parent shard), where a "large" number might be ~8.
441 : handles: timeline::handle::Cache<TenantManagerTypes>,
442 : }
443 :
444 : impl TimelineHandles {
445 0 : fn new(tenant_manager: Arc<TenantManager>) -> Self {
446 0 : Self {
447 0 : wrapper: TenantManagerWrapper {
448 0 : tenant_manager,
449 0 : tenant_id: OnceCell::new(),
450 0 : },
451 0 : handles: Default::default(),
452 0 : }
453 0 : }
454 0 : async fn get(
455 0 : &mut self,
456 0 : tenant_id: TenantId,
457 0 : timeline_id: TimelineId,
458 0 : shard_selector: ShardSelector,
459 0 : ) -> Result<Handle<TenantManagerTypes>, GetActiveTimelineError> {
460 0 : if *self.wrapper.tenant_id.get_or_init(|| tenant_id) != tenant_id {
461 0 : return Err(GetActiveTimelineError::Tenant(
462 0 : GetActiveTenantError::SwitchedTenant,
463 0 : ));
464 0 : }
465 0 : self.handles
466 0 : .get(timeline_id, shard_selector, &self.wrapper)
467 0 : .await
468 0 : .map_err(|e| match e {
469 0 : timeline::handle::GetError::TenantManager(e) => e,
470 : timeline::handle::GetError::PerTimelineStateShutDown => {
471 0 : trace!("per-timeline state shut down");
472 0 : GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown)
473 : }
474 0 : })
475 0 : }
476 :
477 0 : fn tenant_id(&self) -> Option<TenantId> {
478 0 : self.wrapper.tenant_id.get().copied()
479 0 : }
480 : }
481 :
482 : pub(crate) struct TenantManagerWrapper {
483 : tenant_manager: Arc<TenantManager>,
484 : // We do not support switching tenant_id on a connection at this point.
485 :     // We can add support for this later if needed without changing
486 : // the protocol.
487 : tenant_id: once_cell::sync::OnceCell<TenantId>,
488 : }
489 :
490 : #[derive(Debug)]
491 : pub(crate) struct TenantManagerTypes;
492 :
493 : impl timeline::handle::Types for TenantManagerTypes {
494 : type TenantManagerError = GetActiveTimelineError;
495 : type TenantManager = TenantManagerWrapper;
496 : type Timeline = TenantManagerCacheItem;
497 : }
498 :
499 : pub(crate) struct TenantManagerCacheItem {
500 : pub(crate) timeline: Arc<Timeline>,
501 : // allow() for cheap propagation through RequestContext inside a task
502 : #[allow(clippy::redundant_allocation)]
503 : pub(crate) metrics: Arc<Arc<TimelineMetrics>>,
504 : #[allow(dead_code)] // we store it to keep the gate open
505 : pub(crate) gate_guard: GateGuard,
506 : }
507 :
508 : impl std::ops::Deref for TenantManagerCacheItem {
509 : type Target = Arc<Timeline>;
510 0 : fn deref(&self) -> &Self::Target {
511 0 : &self.timeline
512 0 : }
513 : }
514 :
515 : impl timeline::handle::Timeline<TenantManagerTypes> for TenantManagerCacheItem {
516 0 : fn shard_timeline_id(&self) -> timeline::handle::ShardTimelineId {
517 0 : Timeline::shard_timeline_id(&self.timeline)
518 0 : }
519 :
520 0 : fn per_timeline_state(&self) -> &timeline::handle::PerTimelineState<TenantManagerTypes> {
521 0 : &self.timeline.handles
522 0 : }
523 :
524 0 : fn get_shard_identity(&self) -> &pageserver_api::shard::ShardIdentity {
525 0 : Timeline::get_shard_identity(&self.timeline)
526 0 : }
527 : }
528 :
529 : impl timeline::handle::TenantManager<TenantManagerTypes> for TenantManagerWrapper {
530 0 : async fn resolve(
531 0 : &self,
532 0 : timeline_id: TimelineId,
533 0 : shard_selector: ShardSelector,
534 0 : ) -> Result<TenantManagerCacheItem, GetActiveTimelineError> {
535 0 : let tenant_id = self.tenant_id.get().expect("we set this in get()");
536 0 : let timeout = ACTIVE_TENANT_TIMEOUT;
537 0 : let wait_start = Instant::now();
538 0 : let deadline = wait_start + timeout;
539 0 : let tenant_shard = loop {
540 0 : let resolved = self
541 0 : .tenant_manager
542 0 : .resolve_attached_shard(tenant_id, shard_selector);
543 0 : match resolved {
544 0 : ShardResolveResult::Found(tenant_shard) => break tenant_shard,
545 : ShardResolveResult::NotFound => {
546 0 : return Err(GetActiveTimelineError::Tenant(
547 0 : GetActiveTenantError::NotFound(GetTenantError::NotFound(*tenant_id)),
548 0 : ));
549 : }
550 0 : ShardResolveResult::InProgress(barrier) => {
551 0 : // We can't authoritatively answer right now: wait for InProgress state
552 0 : // to end, then try again
553 0 : tokio::select! {
554 0 : _ = barrier.wait() => {
555 0 : // The barrier completed: proceed around the loop to try looking up again
556 0 : },
557 0 : _ = tokio::time::sleep(deadline.duration_since(Instant::now())) => {
558 0 : return Err(GetActiveTimelineError::Tenant(GetActiveTenantError::WaitForActiveTimeout {
559 0 : latest_state: None,
560 0 : wait_time: timeout,
561 0 : }));
562 : }
563 : }
564 : }
565 : };
566 : };
567 :
568 0 : tracing::debug!("Waiting for tenant to enter active state...");
569 0 : tenant_shard
570 0 : .wait_to_become_active(deadline.duration_since(Instant::now()))
571 0 : .await
572 0 : .map_err(GetActiveTimelineError::Tenant)?;
573 :
574 0 : let timeline = tenant_shard
575 0 : .get_timeline(timeline_id, true)
576 0 : .map_err(GetActiveTimelineError::Timeline)?;
577 :
578 0 : let gate_guard = match timeline.gate.enter() {
579 0 : Ok(guard) => guard,
580 : Err(_) => {
581 0 : return Err(GetActiveTimelineError::Timeline(
582 0 : GetTimelineError::ShuttingDown,
583 0 : ));
584 : }
585 : };
586 :
587 0 : let metrics = Arc::new(Arc::clone(&timeline.metrics));
588 0 :
589 0 : Ok(TenantManagerCacheItem {
590 0 : timeline,
591 0 : metrics,
592 0 : gate_guard,
593 0 : })
594 0 : }
595 : }
596 :
597 : #[derive(thiserror::Error, Debug)]
598 : enum PageStreamError {
599 : /// We encountered an error that should prompt the client to reconnect:
600 : /// in practice this means we drop the connection without sending a response.
601 : #[error("Reconnect required: {0}")]
602 : Reconnect(Cow<'static, str>),
603 :
604 : /// We were instructed to shutdown while processing the query
605 : #[error("Shutting down")]
606 : Shutdown,
607 :
608 : /// Something went wrong reading a page: this likely indicates a pageserver bug
609 : #[error("Read error")]
610 : Read(#[source] PageReconstructError),
611 :
612 : /// Ran out of time waiting for an LSN
613 : #[error("LSN timeout: {0}")]
614 : LsnTimeout(WaitLsnError),
615 :
616 : /// The entity required to serve the request (tenant or timeline) is not found,
617 : /// or is not found in a suitable state to serve a request.
618 : #[error("Not found: {0}")]
619 : NotFound(Cow<'static, str>),
620 :
621 : /// Request asked for something that doesn't make sense, like an invalid LSN
622 : #[error("Bad request: {0}")]
623 : BadRequest(Cow<'static, str>),
624 : }
625 :
626 : impl PageStreamError {
627 : /// Converts a PageStreamError into a proto::GetPageResponse with the appropriate status
628 : /// code, or a gRPC status if it should terminate the stream (e.g. shutdown). This is a
629 : /// convenience method for use from a get_pages gRPC stream.
630 : #[allow(clippy::result_large_err)]
631 0 : fn into_get_page_response(
632 0 : self,
633 0 : request_id: page_api::RequestID,
634 0 : ) -> Result<proto::GetPageResponse, tonic::Status> {
635 : use page_api::GetPageStatusCode;
636 : use tonic::Code;
637 :
638 : // We dispatch to Into<tonic::Status> first, and then map it to a GetPageResponse.
639 0 : let status: tonic::Status = self.into();
640 0 : let status_code = match status.code() {
641 : // We shouldn't see an OK status here, because we're emitting an error.
642 : Code::Ok => {
643 0 : debug_assert_ne!(status.code(), Code::Ok);
644 0 : return Err(tonic::Status::internal(format!(
645 0 : "unexpected OK status: {status:?}",
646 0 : )));
647 : }
648 :
649 : // These are per-request errors, returned as GetPageResponses.
650 0 : Code::AlreadyExists => GetPageStatusCode::InvalidRequest,
651 0 : Code::DataLoss => GetPageStatusCode::InternalError,
652 0 : Code::FailedPrecondition => GetPageStatusCode::InvalidRequest,
653 0 : Code::InvalidArgument => GetPageStatusCode::InvalidRequest,
654 0 : Code::Internal => GetPageStatusCode::InternalError,
655 0 : Code::NotFound => GetPageStatusCode::NotFound,
656 0 : Code::OutOfRange => GetPageStatusCode::InvalidRequest,
657 0 : Code::ResourceExhausted => GetPageStatusCode::SlowDown,
658 :
659 : // These should terminate the stream.
660 0 : Code::Aborted => return Err(status),
661 0 : Code::Cancelled => return Err(status),
662 0 : Code::DeadlineExceeded => return Err(status),
663 0 : Code::PermissionDenied => return Err(status),
664 0 : Code::Unauthenticated => return Err(status),
665 0 : Code::Unavailable => return Err(status),
666 0 : Code::Unimplemented => return Err(status),
667 0 : Code::Unknown => return Err(status),
668 : };
669 :
670 0 : Ok(page_api::GetPageResponse {
671 0 : request_id,
672 0 : status_code,
673 0 : reason: Some(status.message().to_string()),
674 0 : page_images: Vec::new(),
675 0 : }
676 0 : .into())
677 0 : }
678 : }
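// Usage sketch for the conversion above (a hedged illustration, not the actual
// get_pages stream handler): per-request failures become a GetPageResponse that
// carries a status code, while stream-fatal errors propagate as a `tonic::Status`:
//
//     fn map_get_page_result_sketch(
//         request_id: page_api::RequestID,
//         res: Result<proto::GetPageResponse, PageStreamError>,
//     ) -> Result<proto::GetPageResponse, tonic::Status> {
//         match res {
//             Ok(resp) => Ok(resp),
//             Err(err) => err.into_get_page_response(request_id),
//         }
//     }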
679 :
680 : impl From<PageStreamError> for tonic::Status {
681 0 : fn from(err: PageStreamError) -> Self {
682 : use tonic::Code;
683 0 : let message = err.to_string();
684 0 : let code = match err {
685 0 : PageStreamError::Reconnect(_) => Code::Unavailable,
686 0 : PageStreamError::Shutdown => Code::Unavailable,
687 0 : PageStreamError::Read(err) => match err {
688 0 : PageReconstructError::Cancelled => Code::Unavailable,
689 0 : PageReconstructError::MissingKey(_) => Code::NotFound,
690 0 : PageReconstructError::AncestorLsnTimeout(err) => tonic::Status::from(err).code(),
691 0 : PageReconstructError::Other(_) => Code::Internal,
692 0 : PageReconstructError::WalRedo(_) => Code::Internal,
693 : },
694 0 : PageStreamError::LsnTimeout(err) => tonic::Status::from(err).code(),
695 0 : PageStreamError::NotFound(_) => Code::NotFound,
696 0 : PageStreamError::BadRequest(_) => Code::InvalidArgument,
697 : };
698 0 : tonic::Status::new(code, message)
699 0 : }
700 : }
701 :
702 : impl From<PageReconstructError> for PageStreamError {
703 0 : fn from(value: PageReconstructError) -> Self {
704 0 : match value {
705 0 : PageReconstructError::Cancelled => Self::Shutdown,
706 0 : e => Self::Read(e),
707 : }
708 0 : }
709 : }
710 :
711 : impl From<GetActiveTimelineError> for PageStreamError {
712 0 : fn from(value: GetActiveTimelineError) -> Self {
713 0 : match value {
714 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled)
715 : | GetActiveTimelineError::Tenant(GetActiveTenantError::WillNotBecomeActive(
716 : TenantState::Stopping { .. },
717 : ))
718 0 : | GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown) => Self::Shutdown,
719 0 : GetActiveTimelineError::Tenant(e) => Self::NotFound(format!("{e}").into()),
720 0 : GetActiveTimelineError::Timeline(e) => Self::NotFound(format!("{e}").into()),
721 : }
722 0 : }
723 : }
724 :
725 : impl From<WaitLsnError> for PageStreamError {
726 0 : fn from(value: WaitLsnError) -> Self {
727 0 : match value {
728 0 : e @ WaitLsnError::Timeout(_) => Self::LsnTimeout(e),
729 0 : WaitLsnError::Shutdown => Self::Shutdown,
730 0 : e @ WaitLsnError::BadState { .. } => Self::Reconnect(format!("{e}").into()),
731 : }
732 0 : }
733 : }
734 :
735 : impl From<WaitLsnError> for QueryError {
736 0 : fn from(value: WaitLsnError) -> Self {
737 0 : match value {
738 0 : e @ WaitLsnError::Timeout(_) => Self::Other(anyhow::Error::new(e)),
739 0 : WaitLsnError::Shutdown => Self::Shutdown,
740 0 : WaitLsnError::BadState { .. } => Self::Reconnect,
741 : }
742 0 : }
743 : }
744 :
745 : #[derive(thiserror::Error, Debug)]
746 : struct BatchedPageStreamError {
747 : req: PagestreamRequest,
748 : err: PageStreamError,
749 : }
750 :
751 : impl std::fmt::Display for BatchedPageStreamError {
752 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
753 0 : self.err.fmt(f)
754 0 : }
755 : }
756 :
757 : struct BatchedGetPageRequest {
758 : req: PagestreamGetPageRequest,
759 : timer: SmgrOpTimer,
760 : lsn_range: LsnRange,
761 : ctx: RequestContext,
762 : // If the request is perf enabled, this contains a context
763 : // with a perf span tracking the time spent waiting for the executor.
764 : batch_wait_ctx: Option<RequestContext>,
765 : }
766 :
767 : #[cfg(feature = "testing")]
768 : struct BatchedTestRequest {
769 : req: models::PagestreamTestRequest,
770 : timer: SmgrOpTimer,
771 : }
772 :
773 : /// NB: we only hold [`timeline::handle::WeakHandle`] inside this enum,
774 : /// so that we don't keep the [`Timeline::gate`] open while the batch
775 : /// is being built up inside the [`spsc_fold`] (pagestream pipelining).
776 : #[derive(IntoStaticStr)]
777 : #[allow(clippy::large_enum_variant)]
778 : enum BatchedFeMessage {
779 : Exists {
780 : span: Span,
781 : timer: SmgrOpTimer,
782 : shard: WeakHandle<TenantManagerTypes>,
783 : req: models::PagestreamExistsRequest,
784 : },
785 : Nblocks {
786 : span: Span,
787 : timer: SmgrOpTimer,
788 : shard: WeakHandle<TenantManagerTypes>,
789 : req: models::PagestreamNblocksRequest,
790 : },
791 : GetPage {
792 : span: Span,
793 : shard: WeakHandle<TenantManagerTypes>,
794 : pages: SmallVec<[BatchedGetPageRequest; 1]>,
795 : batch_break_reason: GetPageBatchBreakReason,
796 : },
797 : DbSize {
798 : span: Span,
799 : timer: SmgrOpTimer,
800 : shard: WeakHandle<TenantManagerTypes>,
801 : req: models::PagestreamDbSizeRequest,
802 : },
803 : GetSlruSegment {
804 : span: Span,
805 : timer: SmgrOpTimer,
806 : shard: WeakHandle<TenantManagerTypes>,
807 : req: models::PagestreamGetSlruSegmentRequest,
808 : },
809 : #[cfg(feature = "testing")]
810 : Test {
811 : span: Span,
812 : shard: WeakHandle<TenantManagerTypes>,
813 : requests: Vec<BatchedTestRequest>,
814 : },
815 : RespondError {
816 : span: Span,
817 : error: BatchedPageStreamError,
818 : },
819 : }
820 :
821 : impl BatchedFeMessage {
822 0 : fn as_static_str(&self) -> &'static str {
823 0 : self.into()
824 0 : }
825 :
826 0 : fn observe_execution_start(&mut self, at: Instant) {
827 0 : match self {
828 0 : BatchedFeMessage::Exists { timer, .. }
829 0 : | BatchedFeMessage::Nblocks { timer, .. }
830 0 : | BatchedFeMessage::DbSize { timer, .. }
831 0 : | BatchedFeMessage::GetSlruSegment { timer, .. } => {
832 0 : timer.observe_execution_start(at);
833 0 : }
834 0 : BatchedFeMessage::GetPage { pages, .. } => {
835 0 : for page in pages {
836 0 : page.timer.observe_execution_start(at);
837 0 : }
838 : }
839 : #[cfg(feature = "testing")]
840 0 : BatchedFeMessage::Test { requests, .. } => {
841 0 : for req in requests {
842 0 : req.timer.observe_execution_start(at);
843 0 : }
844 : }
845 0 : BatchedFeMessage::RespondError { .. } => {}
846 : }
847 0 : }
848 :
849 0 : fn should_break_batch(
850 0 : &self,
851 0 : other: &BatchedFeMessage,
852 0 : max_batch_size: NonZeroUsize,
853 0 : batching_strategy: PageServiceProtocolPipelinedBatchingStrategy,
854 0 : ) -> Option<GetPageBatchBreakReason> {
855 0 : match (self, other) {
856 : (
857 : BatchedFeMessage::GetPage {
858 0 : shard: accum_shard,
859 0 : pages: accum_pages,
860 0 : ..
861 0 : },
862 0 : BatchedFeMessage::GetPage {
863 0 : shard: this_shard,
864 0 : pages: this_pages,
865 0 : ..
866 0 : },
867 0 : ) => {
868 0 : assert_eq!(this_pages.len(), 1);
869 0 : if accum_pages.len() >= max_batch_size.get() {
870 0 : trace!(%max_batch_size, "stopping batching because of batch size");
871 0 : assert_eq!(accum_pages.len(), max_batch_size.get());
872 :
873 0 : return Some(GetPageBatchBreakReason::BatchFull);
874 0 : }
875 0 : if !accum_shard.is_same_handle_as(this_shard) {
876 0 : trace!("stopping batching because timeline object mismatch");
877 : // TODO: we _could_ batch & execute each shard seperately (and in parallel).
878 : // But the current logic for keeping responses in order does not support that.
879 :
880 0 : return Some(GetPageBatchBreakReason::NonUniformTimeline);
881 0 : }
882 0 :
883 0 : match batching_strategy {
884 : PageServiceProtocolPipelinedBatchingStrategy::UniformLsn => {
885 0 : if let Some(last_in_batch) = accum_pages.last() {
886 0 : if last_in_batch.lsn_range.effective_lsn
887 0 : != this_pages[0].lsn_range.effective_lsn
888 : {
889 0 : trace!(
890 : accum_lsn = %last_in_batch.lsn_range.effective_lsn,
891 0 : this_lsn = %this_pages[0].lsn_range.effective_lsn,
892 0 : "stopping batching because LSN changed"
893 : );
894 :
895 0 : return Some(GetPageBatchBreakReason::NonUniformLsn);
896 0 : }
897 0 : }
898 : }
899 : PageServiceProtocolPipelinedBatchingStrategy::ScatteredLsn => {
900 :                     // The read path doesn't currently support serving the same page at different LSNs.
901 : // While technically possible, it's uncertain if the complexity is worth it.
902 : // Break the batch if such a case is encountered.
903 0 : let same_page_different_lsn = accum_pages.iter().any(|batched| {
904 0 : batched.req.rel == this_pages[0].req.rel
905 0 : && batched.req.blkno == this_pages[0].req.blkno
906 0 : && batched.lsn_range.effective_lsn
907 0 : != this_pages[0].lsn_range.effective_lsn
908 0 : });
909 0 :
910 0 : if same_page_different_lsn {
911 0 : trace!(
912 0 : rel=%this_pages[0].req.rel,
913 0 : blkno=%this_pages[0].req.blkno,
914 0 : lsn=%this_pages[0].lsn_range.effective_lsn,
915 0 : "stopping batching because same page was requested at different LSNs"
916 : );
917 :
918 0 : return Some(GetPageBatchBreakReason::SamePageAtDifferentLsn);
919 0 : }
920 : }
921 : }
922 :
923 0 : None
924 : }
925 : #[cfg(feature = "testing")]
926 : (
927 : BatchedFeMessage::Test {
928 0 : shard: accum_shard,
929 0 : requests: accum_requests,
930 0 : ..
931 0 : },
932 0 : BatchedFeMessage::Test {
933 0 : shard: this_shard,
934 0 : requests: this_requests,
935 0 : ..
936 0 : },
937 0 : ) => {
938 0 : assert!(this_requests.len() == 1);
939 0 : if accum_requests.len() >= max_batch_size.get() {
940 0 : trace!(%max_batch_size, "stopping batching because of batch size");
941 0 : assert_eq!(accum_requests.len(), max_batch_size.get());
942 0 : return Some(GetPageBatchBreakReason::BatchFull);
943 0 : }
944 0 : if !accum_shard.is_same_handle_as(this_shard) {
945 0 : trace!("stopping batching because timeline object mismatch");
946 :                     // TODO: we _could_ batch & execute each shard separately (and in parallel).
947 : // But the current logic for keeping responses in order does not support that.
948 0 : return Some(GetPageBatchBreakReason::NonUniformTimeline);
949 0 : }
950 0 : let this_batch_key = this_requests[0].req.batch_key;
951 0 : let accum_batch_key = accum_requests[0].req.batch_key;
952 0 : if this_requests[0].req.batch_key != accum_requests[0].req.batch_key {
953 0 : trace!(%accum_batch_key, %this_batch_key, "stopping batching because batch key changed");
954 0 : return Some(GetPageBatchBreakReason::NonUniformKey);
955 0 : }
956 0 : None
957 : }
958 0 : (_, _) => Some(GetPageBatchBreakReason::NonBatchableRequest),
959 : }
960 0 : }
961 : }
962 :
963 : impl PageServerHandler {
964 : #[allow(clippy::too_many_arguments)]
965 0 : pub fn new(
966 0 : tenant_manager: Arc<TenantManager>,
967 0 : auth: Option<Arc<SwappableJwtAuth>>,
968 0 : pipelining_config: PageServicePipeliningConfig,
969 0 : get_vectored_concurrent_io: GetVectoredConcurrentIo,
970 0 : perf_span_fields: ConnectionPerfSpanFields,
971 0 : basebackup_cache: Arc<BasebackupCache>,
972 0 : connection_ctx: RequestContext,
973 0 : cancel: CancellationToken,
974 0 : gate_guard: GateGuard,
975 0 : ) -> Self {
976 0 : PageServerHandler {
977 0 : auth,
978 0 : claims: None,
979 0 : connection_ctx,
980 0 : perf_span_fields,
981 0 : timeline_handles: Some(TimelineHandles::new(tenant_manager)),
982 0 : cancel,
983 0 : pipelining_config,
984 0 : get_vectored_concurrent_io,
985 0 : basebackup_cache,
986 0 : gate_guard,
987 0 : }
988 0 : }
989 :
990 : /// This function always respects cancellation of any timeline in `[Self::shard_timelines]`. Pass in
991 : /// a cancellation token at the next scope up (such as a tenant cancellation token) to ensure we respect
992 : /// cancellation if there aren't any timelines in the cache.
993 : ///
994 : /// If calling from a function that doesn't use the `[Self::shard_timelines]` cache, then pass in the
995 : /// timeline cancellation token.
996 0 : async fn flush_cancellable<IO>(
997 0 : &self,
998 0 : pgb: &mut PostgresBackend<IO>,
999 0 : cancel: &CancellationToken,
1000 0 : ) -> Result<(), QueryError>
1001 0 : where
1002 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1003 0 : {
1004 0 : tokio::select!(
1005 0 : flush_r = pgb.flush() => {
1006 0 : Ok(flush_r?)
1007 : },
1008 0 : _ = cancel.cancelled() => {
1009 0 : Err(QueryError::Shutdown)
1010 : }
1011 : )
1012 0 : }
1013 :
1014 : #[allow(clippy::too_many_arguments)]
1015 0 : async fn pagestream_read_message<IO>(
1016 0 : pgb: &mut PostgresBackendReader<IO>,
1017 0 : tenant_id: TenantId,
1018 0 : timeline_id: TimelineId,
1019 0 : timeline_handles: &mut TimelineHandles,
1020 0 : conn_perf_span_fields: &ConnectionPerfSpanFields,
1021 0 : cancel: &CancellationToken,
1022 0 : ctx: &RequestContext,
1023 0 : protocol_version: PagestreamProtocolVersion,
1024 0 : parent_span: Span,
1025 0 : ) -> Result<Option<BatchedFeMessage>, QueryError>
1026 0 : where
1027 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
1028 0 : {
1029 0 : let msg = tokio::select! {
1030 : biased;
1031 0 : _ = cancel.cancelled() => {
1032 0 : return Err(QueryError::Shutdown)
1033 : }
1034 0 : msg = pgb.read_message() => { msg }
1035 0 : };
1036 0 :
1037 0 : let received_at = Instant::now();
1038 :
1039 0 : let copy_data_bytes = match msg? {
1040 0 : Some(FeMessage::CopyData(bytes)) => bytes,
1041 : Some(FeMessage::Terminate) => {
1042 0 : return Ok(None);
1043 : }
1044 0 : Some(m) => {
1045 0 : return Err(QueryError::Other(anyhow::anyhow!(
1046 0 : "unexpected message: {m:?} during COPY"
1047 0 : )));
1048 : }
1049 : None => {
1050 0 : return Ok(None);
1051 : } // client disconnected
1052 : };
1053 0 : trace!("query: {copy_data_bytes:?}");
1054 :
1055 0 : fail::fail_point!("ps::handle-pagerequest-message");
1056 :
1057 : // parse request
1058 0 : let neon_fe_msg =
1059 0 : PagestreamFeMessage::parse(&mut copy_data_bytes.reader(), protocol_version)?;
1060 :
1061 0 : let batched_msg = match neon_fe_msg {
1062 0 : PagestreamFeMessage::Exists(req) => {
1063 0 : let shard = timeline_handles
1064 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1065 0 : .await?;
1066 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1067 0 : let span = tracing::info_span!(parent: &parent_span, "handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
1068 0 : let timer = Self::record_op_start_and_throttle(
1069 0 : &shard,
1070 0 : metrics::SmgrQueryType::GetRelExists,
1071 0 : received_at,
1072 0 : )
1073 0 : .await?;
1074 0 : BatchedFeMessage::Exists {
1075 0 : span,
1076 0 : timer,
1077 0 : shard: shard.downgrade(),
1078 0 : req,
1079 0 : }
1080 : }
1081 0 : PagestreamFeMessage::Nblocks(req) => {
1082 0 : let shard = timeline_handles
1083 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1084 0 : .await?;
1085 0 : let span = tracing::info_span!(parent: &parent_span, "handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
1086 0 : let timer = Self::record_op_start_and_throttle(
1087 0 : &shard,
1088 0 : metrics::SmgrQueryType::GetRelSize,
1089 0 : received_at,
1090 0 : )
1091 0 : .await?;
1092 0 : BatchedFeMessage::Nblocks {
1093 0 : span,
1094 0 : timer,
1095 0 : shard: shard.downgrade(),
1096 0 : req,
1097 0 : }
1098 : }
1099 0 : PagestreamFeMessage::DbSize(req) => {
1100 0 : let shard = timeline_handles
1101 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1102 0 : .await?;
1103 0 : let span = tracing::info_span!(parent: &parent_span, "handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
1104 0 : let timer = Self::record_op_start_and_throttle(
1105 0 : &shard,
1106 0 : metrics::SmgrQueryType::GetDbSize,
1107 0 : received_at,
1108 0 : )
1109 0 : .await?;
1110 0 : BatchedFeMessage::DbSize {
1111 0 : span,
1112 0 : timer,
1113 0 : shard: shard.downgrade(),
1114 0 : req,
1115 0 : }
1116 : }
1117 0 : PagestreamFeMessage::GetSlruSegment(req) => {
1118 0 : let shard = timeline_handles
1119 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1120 0 : .await?;
1121 0 : let span = tracing::info_span!(parent: &parent_span, "handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
1122 0 : let timer = Self::record_op_start_and_throttle(
1123 0 : &shard,
1124 0 : metrics::SmgrQueryType::GetSlruSegment,
1125 0 : received_at,
1126 0 : )
1127 0 : .await?;
1128 0 : BatchedFeMessage::GetSlruSegment {
1129 0 : span,
1130 0 : timer,
1131 0 : shard: shard.downgrade(),
1132 0 : req,
1133 0 : }
1134 : }
1135 0 : PagestreamFeMessage::GetPage(req) => {
1136 : // avoid a somewhat costly Span::record() by constructing the entire span in one go.
1137 : macro_rules! mkspan {
1138 : (before shard routing) => {{
1139 : tracing::info_span!(
1140 : parent: &parent_span,
1141 : "handle_get_page_request",
1142 : request_id = %req.hdr.reqid,
1143 : rel = %req.rel,
1144 : blkno = %req.blkno,
1145 : req_lsn = %req.hdr.request_lsn,
1146 : not_modified_since_lsn = %req.hdr.not_modified_since,
1147 : )
1148 : }};
1149 : ($shard_id:expr) => {{
1150 : tracing::info_span!(
1151 : parent: &parent_span,
1152 : "handle_get_page_request",
1153 : request_id = %req.hdr.reqid,
1154 : rel = %req.rel,
1155 : blkno = %req.blkno,
1156 : req_lsn = %req.hdr.request_lsn,
1157 : not_modified_since_lsn = %req.hdr.not_modified_since,
1158 : shard_id = %$shard_id,
1159 : )
1160 : }};
1161 : }
1162 :
1163 : macro_rules! respond_error {
1164 : ($span:expr, $error:expr) => {{
1165 : let error = BatchedFeMessage::RespondError {
1166 : span: $span,
1167 : error: BatchedPageStreamError {
1168 : req: req.hdr,
1169 : err: $error,
1170 : },
1171 : };
1172 : Ok(Some(error))
1173 : }};
1174 : }
1175 :
1176 0 : let key = rel_block_to_key(req.rel, req.blkno);
1177 :
1178 0 : let res = timeline_handles
1179 0 : .get(tenant_id, timeline_id, ShardSelector::Page(key))
1180 0 : .await;
1181 :
1182 0 : let shard = match res {
1183 0 : Ok(tl) => tl,
1184 0 : Err(e) => {
1185 0 : let span = mkspan!(before shard routing);
1186 0 : match e {
1187 : GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_)) => {
1188 : // We already know this tenant exists in general, because we resolved it at
1189 : // start of connection. Getting a NotFound here indicates that the shard containing
1190 : // the requested page is not present on this node: the client's knowledge of shard->pageserver
1191 : // mapping is out of date.
1192 : //
1193 :                             // Closing the connection by returning `PageStreamError::Reconnect` has the side effect of rate-limiting the above message,
1194 :                             // via the client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
1195 : // and talk to a different pageserver.
1196 0 : return respond_error!(
1197 0 : span,
1198 0 : PageStreamError::Reconnect(
1199 0 : "getpage@lsn request routed to wrong shard".into()
1200 0 : )
1201 0 : );
1202 : }
1203 0 : e => {
1204 0 : return respond_error!(span, e.into());
1205 : }
1206 : }
1207 : }
1208 : };
1209 :
1210 0 : let ctx = if shard.is_get_page_request_sampled() {
1211 0 : RequestContextBuilder::from(ctx)
1212 0 : .root_perf_span(|| {
1213 0 : info_span!(
1214 : target: PERF_TRACE_TARGET,
1215 : "GET_PAGE",
1216 : peer_addr = conn_perf_span_fields.peer_addr,
1217 : application_name = conn_perf_span_fields.application_name,
1218 : compute_mode = conn_perf_span_fields.compute_mode,
1219 : tenant_id = %tenant_id,
1220 0 : shard_id = %shard.get_shard_identity().shard_slug(),
1221 : timeline_id = %timeline_id,
1222 : lsn = %req.hdr.request_lsn,
1223 : not_modified_since_lsn = %req.hdr.not_modified_since,
1224 : request_id = %req.hdr.reqid,
1225 : key = %key,
1226 : )
1227 0 : })
1228 0 : .attached_child()
1229 : } else {
1230 0 : ctx.attached_child()
1231 : };
1232 :
1233 : // This ctx travels as part of the BatchedFeMessage through
1234 : // batching into the request handler.
1235 : // The request handler needs to do some per-request work
1236 : // (relsize check) before dispatching the batch as a single
1237 : // get_vectored call to the Timeline.
1238 :                 // This ctx will be used for the relsize check, whereas the
1239 : // get_vectored call will be a different ctx with separate
1240 : // perf span.
1241 0 : let ctx = ctx.with_scope_page_service_pagestream(&shard);
1242 :
1243 : // Similar game for this `span`: we funnel it through so that
1244 : // request handler log messages contain the request-specific fields.
1245 0 : let span = mkspan!(shard.tenant_shard_id.shard_slug());
1246 :
1247 0 : let timer = Self::record_op_start_and_throttle(
1248 0 : &shard,
1249 0 : metrics::SmgrQueryType::GetPageAtLsn,
1250 0 : received_at,
1251 0 : )
1252 0 : .maybe_perf_instrument(&ctx, |current_perf_span| {
1253 0 : info_span!(
1254 : target: PERF_TRACE_TARGET,
1255 0 : parent: current_perf_span,
1256 : "THROTTLE",
1257 : )
1258 0 : })
1259 0 : .await?;
1260 :
1261 : // We're holding the Handle
1262 0 : let effective_lsn = match Self::effective_request_lsn(
1263 0 : &shard,
1264 0 : shard.get_last_record_lsn(),
1265 0 : req.hdr.request_lsn,
1266 0 : req.hdr.not_modified_since,
1267 0 : &shard.get_applied_gc_cutoff_lsn(),
1268 0 : ) {
1269 0 : Ok(lsn) => lsn,
1270 0 : Err(e) => {
1271 0 : return respond_error!(span, e);
1272 : }
1273 : };
1274 :
1275 0 : let batch_wait_ctx = if ctx.has_perf_span() {
1276 0 : Some(
1277 0 : RequestContextBuilder::from(&ctx)
1278 0 : .perf_span(|crnt_perf_span| {
1279 0 : info_span!(
1280 : target: PERF_TRACE_TARGET,
1281 0 : parent: crnt_perf_span,
1282 : "WAIT_EXECUTOR",
1283 : )
1284 0 : })
1285 0 : .attached_child(),
1286 0 : )
1287 : } else {
1288 0 : None
1289 : };
1290 :
1291 : BatchedFeMessage::GetPage {
1292 0 : span,
1293 0 : shard: shard.downgrade(),
1294 0 : pages: smallvec![BatchedGetPageRequest {
1295 0 : req,
1296 0 : timer,
1297 0 : lsn_range: LsnRange {
1298 0 : effective_lsn,
1299 0 : request_lsn: req.hdr.request_lsn
1300 0 : },
1301 0 : ctx,
1302 0 : batch_wait_ctx,
1303 0 : }],
1304 : // The executor grabs the batch when it becomes idle.
1305 : // Hence, [`GetPageBatchBreakReason::ExecutorSteal`] is the
1306 : // default reason for breaking the batch.
1307 0 : batch_break_reason: GetPageBatchBreakReason::ExecutorSteal,
1308 : }
1309 : }
1310 : #[cfg(feature = "testing")]
1311 0 : PagestreamFeMessage::Test(req) => {
1312 0 : let shard = timeline_handles
1313 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1314 0 : .await?;
1315 0 : let span = tracing::info_span!(parent: &parent_span, "handle_test_request", shard_id = %shard.tenant_shard_id.shard_slug());
1316 0 : let timer = Self::record_op_start_and_throttle(
1317 0 : &shard,
1318 0 : metrics::SmgrQueryType::Test,
1319 0 : received_at,
1320 0 : )
1321 0 : .await?;
1322 0 : BatchedFeMessage::Test {
1323 0 : span,
1324 0 : shard: shard.downgrade(),
1325 0 : requests: vec![BatchedTestRequest { req, timer }],
1326 0 : }
1327 : }
1328 : };
1329 0 : Ok(Some(batched_msg))
1330 0 : }
1331 :
1332 : /// Starts a SmgrOpTimer at received_at and throttles the request.
1333 0 : async fn record_op_start_and_throttle(
1334 0 : shard: &Handle<TenantManagerTypes>,
1335 0 : op: metrics::SmgrQueryType,
1336 0 : received_at: Instant,
1337 0 : ) -> Result<SmgrOpTimer, QueryError> {
1338 0 : // It's important to start the smgr op metric recorder as early as possible
1339 0 : // so that the _started counters are incremented before we do
1340 0 : // any serious waiting, e.g., for throttle, batching, or actual request handling.
1341 0 : let mut timer = shard.query_metrics.start_smgr_op(op, received_at);
1342 0 : let now = Instant::now();
1343 0 : timer.observe_throttle_start(now);
1344 0 : let throttled = tokio::select! {
1345 0 : res = shard.pagestream_throttle.throttle(1, now) => res,
1346 0 : _ = shard.cancel.cancelled() => return Err(QueryError::Shutdown),
1347 : };
1348 0 : timer.observe_throttle_done(throttled);
1349 0 : Ok(timer)
1350 0 : }
1351 :
1352 : /// Post-condition: `batch` is Some()
1353 : #[instrument(skip_all, level = tracing::Level::TRACE)]
1354 : #[allow(clippy::boxed_local)]
1355 : fn pagestream_do_batch(
1356 : batching_strategy: PageServiceProtocolPipelinedBatchingStrategy,
1357 : max_batch_size: NonZeroUsize,
1358 : batch: &mut Result<BatchedFeMessage, QueryError>,
1359 : this_msg: Result<BatchedFeMessage, QueryError>,
1360 : ) -> Result<(), Result<BatchedFeMessage, QueryError>> {
1361 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1362 :
1363 : let this_msg = match this_msg {
1364 : Ok(this_msg) => this_msg,
1365 : Err(e) => return Err(Err(e)),
1366 : };
1367 :
1368 : let eligible_batch = match batch {
1369 : Ok(b) => b,
1370 : Err(_) => {
1371 : return Err(Ok(this_msg));
1372 : }
1373 : };
1374 :
1375 : let batch_break =
1376 : eligible_batch.should_break_batch(&this_msg, max_batch_size, batching_strategy);
1377 :
1378 : match batch_break {
1379 : Some(reason) => {
1380 : if let BatchedFeMessage::GetPage {
1381 : batch_break_reason, ..
1382 : } = eligible_batch
1383 : {
1384 : *batch_break_reason = reason;
1385 : }
1386 :
1387 : Err(Ok(this_msg))
1388 : }
1389 : None => {
1390 : // ok to batch
1391 : match (eligible_batch, this_msg) {
1392 : (
1393 : BatchedFeMessage::GetPage {
1394 : pages: accum_pages, ..
1395 : },
1396 : BatchedFeMessage::GetPage {
1397 : pages: this_pages, ..
1398 : },
1399 : ) => {
1400 : accum_pages.extend(this_pages);
1401 : Ok(())
1402 : }
1403 : #[cfg(feature = "testing")]
1404 : (
1405 : BatchedFeMessage::Test {
1406 : requests: accum_requests,
1407 : ..
1408 : },
1409 : BatchedFeMessage::Test {
1410 : requests: this_requests,
1411 : ..
1412 : },
1413 : ) => {
1414 : accum_requests.extend(this_requests);
1415 : Ok(())
1416 : }
1417 : // Shape guaranteed by [`BatchedFeMessage::should_break_batch`]
1418 : _ => unreachable!(),
1419 : }
1420 : }
1421 : }
1422 : }
1423 :
1424 0 : #[instrument(level = tracing::Level::DEBUG, skip_all)]
1425 : async fn pagestream_handle_batched_message<IO>(
1426 : &mut self,
1427 : pgb_writer: &mut PostgresBackend<IO>,
1428 : batch: BatchedFeMessage,
1429 : io_concurrency: IoConcurrency,
1430 : cancel: &CancellationToken,
1431 : protocol_version: PagestreamProtocolVersion,
1432 : ctx: &RequestContext,
1433 : ) -> Result<(), QueryError>
1434 : where
1435 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1436 : {
1437 : let started_at = Instant::now();
1438 : let batch = {
1439 : let mut batch = batch;
1440 : batch.observe_execution_start(started_at);
1441 : batch
1442 : };
1443 :
1444 : // Dispatch the batch to the appropriate request handler.
1445 : let log_slow_name = batch.as_static_str();
1446 : let (mut handler_results, span) = {
1447 : // TODO: we unfortunately have to pin the future on the heap, since GetPage futures are huge and
1448 : // won't fit on the stack.
1449 : let mut boxpinned = Box::pin(Self::pagestream_dispatch_batched_message(
1450 : batch,
1451 : io_concurrency,
1452 : ctx,
1453 : ));
1454 : log_slow(
1455 : log_slow_name,
1456 : LOG_SLOW_GETPAGE_THRESHOLD,
1457 : boxpinned.as_mut(),
1458 : )
1459 : .await?
1460 : };
1461 :
1462 : // We purposefully don't count flush time into the smgr operation timer.
1463 : //
1464 :         // The reason is that the current compute client will not perform protocol processing
1465 : // if the postgres backend process is doing things other than `->smgr_read()`.
1466 : // This is especially the case for prefetch.
1467 : //
1468 : // If the compute doesn't read from the connection, eventually TCP will backpressure
1469 : // all the way into our flush call below.
1470 : //
1471 : // The timer's underlying metric is used for a storage-internal latency SLO and
1472 : // we don't want to include latency in it that we can't control.
1473 : // And as pointed out above, in this case, we don't control the time that flush will take.
1474 : //
1475 : // We put each response in the batch onto the wire in a separate pgb_writer.flush()
1476 : // call, which (all unmeasured) adds syscall overhead but reduces time to first byte
1477 : // and avoids building up a "giant" contiguous userspace buffer to hold the entire response.
1478 : // TODO: vectored socket IO would be great, but pgb_writer doesn't support that.
1479 : let flush_timers = {
1480 : let flushing_start_time = Instant::now();
1481 : let mut flush_timers = Vec::with_capacity(handler_results.len());
1482 : for handler_result in &mut handler_results {
1483 : let flush_timer = match handler_result {
1484 : Ok((_response, timer, _ctx)) => Some(
1485 : timer
1486 : .observe_execution_end(flushing_start_time)
1487 : .expect("we are the first caller"),
1488 : ),
1489 : Err(_) => {
1490 : // TODO: measure errors
1491 : None
1492 : }
1493 : };
1494 : flush_timers.push(flush_timer);
1495 : }
1496 : assert_eq!(flush_timers.len(), handler_results.len());
1497 : flush_timers
1498 : };
1499 :
1500 : // Map handler result to protocol behavior.
1501 : // Some handler errors cause exit from pagestream protocol.
1502 : // Other handler errors are sent back as an error message and we stay in pagestream protocol.
1503 : for (handler_result, flushing_timer) in handler_results.into_iter().zip(flush_timers) {
1504 : let (response_msg, ctx) = match handler_result {
1505 : Err(e) => match &e.err {
1506 : PageStreamError::Shutdown => {
1507 : // If we fail to fulfil a request during shutdown, which may be _because_ of
1508 : // shutdown, then do not send the error to the client. Instead just drop the
1509 : // connection.
1510 0 : span.in_scope(|| info!("dropping connection due to shutdown"));
1511 : return Err(QueryError::Shutdown);
1512 : }
1513 : PageStreamError::Reconnect(reason) => {
1514 0 : span.in_scope(|| info!("handler requested reconnect: {reason}"));
1515 : return Err(QueryError::Reconnect);
1516 : }
1517 : PageStreamError::Read(_)
1518 : | PageStreamError::LsnTimeout(_)
1519 : | PageStreamError::NotFound(_)
1520 : | PageStreamError::BadRequest(_) => {
1521 :                         // print all the details to the log with {:#}, but for the client the
1522 : // error message is enough. Do not log if shutting down, as the anyhow::Error
1523 : // here includes cancellation which is not an error.
1524 : let full = utils::error::report_compact_sources(&e.err);
1525 0 : span.in_scope(|| {
1526 0 : error!("error reading relation or page version: {full:#}")
1527 0 : });
1528 :
1529 : (
1530 : PagestreamBeMessage::Error(PagestreamErrorResponse {
1531 : req: e.req,
1532 : message: e.err.to_string(),
1533 : }),
1534 : None,
1535 : )
1536 : }
1537 : },
1538 : Ok((response_msg, _op_timer_already_observed, ctx)) => (response_msg, Some(ctx)),
1539 : };
1540 :
1541 0 : let ctx = ctx.map(|req_ctx| {
1542 0 : RequestContextBuilder::from(&req_ctx)
1543 0 : .perf_span(|crnt_perf_span| {
1544 0 : info_span!(
1545 : target: PERF_TRACE_TARGET,
1546 0 : parent: crnt_perf_span,
1547 : "FLUSH_RESPONSE",
1548 : )
1549 0 : })
1550 0 : .attached_child()
1551 0 : });
1552 :
1553 : //
1554 : // marshal & transmit response message
1555 : //
1556 :
1557 : pgb_writer.write_message_noflush(&BeMessage::CopyData(
1558 : &response_msg.serialize(protocol_version),
1559 : ))?;
1560 :
1561 : failpoint_support::sleep_millis_async!("before-pagestream-msg-flush", cancel);
1562 :
1563 : // what we want to do
1564 : let socket_fd = pgb_writer.socket_fd;
1565 : let flush_fut = pgb_writer.flush();
1566 : // metric for how long flushing takes
1567 : let flush_fut = match flushing_timer {
1568 : Some(flushing_timer) => futures::future::Either::Left(flushing_timer.measure(
1569 : Instant::now(),
1570 : flush_fut,
1571 : socket_fd,
1572 : )),
1573 : None => futures::future::Either::Right(flush_fut),
1574 : };
1575 :
1576 : let flush_fut = if let Some(req_ctx) = ctx.as_ref() {
1577 : futures::future::Either::Left(
1578 0 : flush_fut.maybe_perf_instrument(req_ctx, |current_perf_span| {
1579 0 : current_perf_span.clone()
1580 0 : }),
1581 : )
1582 : } else {
1583 : futures::future::Either::Right(flush_fut)
1584 : };
1585 :
1586 : // do it while respecting cancellation
1587 0 : let _: () = async move {
1588 0 : tokio::select! {
1589 : biased;
1590 0 : _ = cancel.cancelled() => {
1591 : // We were requested to shut down.
1592 0 : info!("shutdown request received in page handler");
1593 0 : return Err(QueryError::Shutdown)
1594 : }
1595 0 : res = flush_fut => {
1596 0 : res?;
1597 : }
1598 : }
1599 0 : Ok(())
1600 0 : }
1601 : .await?;
1602 : }
1603 : Ok(())
1604 : }
1605 :
1606 : /// Helper which dispatches a batched message to the appropriate handler.
1607 : /// Returns a vec of results, along with the extracted trace span.
1608 0 : async fn pagestream_dispatch_batched_message(
1609 0 : batch: BatchedFeMessage,
1610 0 : io_concurrency: IoConcurrency,
1611 0 : ctx: &RequestContext,
1612 0 : ) -> Result<
1613 0 : (
1614 0 : Vec<Result<(PagestreamBeMessage, SmgrOpTimer, RequestContext), BatchedPageStreamError>>,
1615 0 : Span,
1616 0 : ),
1617 0 : QueryError,
1618 0 : > {
1619 : macro_rules! upgrade_handle_and_set_context {
1620 : ($shard:ident) => {{
1621 : let weak_handle = &$shard;
1622 : let handle = weak_handle.upgrade()?;
1623 : let ctx = ctx.with_scope_page_service_pagestream(&handle);
1624 : (handle, ctx)
1625 : }};
1626 : }
1627 0 : Ok(match batch {
1628 : BatchedFeMessage::Exists {
1629 0 : span,
1630 0 : timer,
1631 0 : shard,
1632 0 : req,
1633 0 : } => {
1634 0 : fail::fail_point!("ps::handle-pagerequest-message::exists");
1635 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1636 : (
1637 0 : vec![
1638 0 : Self::handle_get_rel_exists_request(&shard, &req, &ctx)
1639 0 : .instrument(span.clone())
1640 0 : .await
1641 0 : .map(|msg| (PagestreamBeMessage::Exists(msg), timer, ctx))
1642 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
1643 0 : ],
1644 0 : span,
1645 : )
1646 : }
1647 : BatchedFeMessage::Nblocks {
1648 0 : span,
1649 0 : timer,
1650 0 : shard,
1651 0 : req,
1652 0 : } => {
1653 0 : fail::fail_point!("ps::handle-pagerequest-message::nblocks");
1654 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1655 : (
1656 0 : vec![
1657 0 : Self::handle_get_nblocks_request(&shard, &req, &ctx)
1658 0 : .instrument(span.clone())
1659 0 : .await
1660 0 : .map(|msg| (PagestreamBeMessage::Nblocks(msg), timer, ctx))
1661 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
1662 0 : ],
1663 0 : span,
1664 : )
1665 : }
1666 : BatchedFeMessage::GetPage {
1667 0 : span,
1668 0 : shard,
1669 0 : pages,
1670 0 : batch_break_reason,
1671 0 : } => {
1672 0 : fail::fail_point!("ps::handle-pagerequest-message::getpage");
1673 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1674 : (
1675 : {
1676 0 : let npages = pages.len();
1677 0 : trace!(npages, "handling getpage request");
1678 0 : let res = Self::handle_get_page_at_lsn_request_batched(
1679 0 : &shard,
1680 0 : pages,
1681 0 : io_concurrency,
1682 0 : batch_break_reason,
1683 0 : &ctx,
1684 0 : )
1685 0 : .instrument(span.clone())
1686 0 : .await;
1687 0 : assert_eq!(res.len(), npages);
1688 0 : res
1689 0 : },
1690 0 : span,
1691 : )
1692 : }
1693 : BatchedFeMessage::DbSize {
1694 0 : span,
1695 0 : timer,
1696 0 : shard,
1697 0 : req,
1698 0 : } => {
1699 0 : fail::fail_point!("ps::handle-pagerequest-message::dbsize");
1700 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1701 : (
1702 0 : vec![
1703 0 : Self::handle_db_size_request(&shard, &req, &ctx)
1704 0 : .instrument(span.clone())
1705 0 : .await
1706 0 : .map(|msg| (PagestreamBeMessage::DbSize(msg), timer, ctx))
1707 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
1708 0 : ],
1709 0 : span,
1710 : )
1711 : }
1712 : BatchedFeMessage::GetSlruSegment {
1713 0 : span,
1714 0 : timer,
1715 0 : shard,
1716 0 : req,
1717 0 : } => {
1718 0 : fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
1719 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1720 : (
1721 0 : vec![
1722 0 : Self::handle_get_slru_segment_request(&shard, &req, &ctx)
1723 0 : .instrument(span.clone())
1724 0 : .await
1725 0 : .map(|msg| (PagestreamBeMessage::GetSlruSegment(msg), timer, ctx))
1726 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
1727 0 : ],
1728 0 : span,
1729 : )
1730 : }
1731 : #[cfg(feature = "testing")]
1732 : BatchedFeMessage::Test {
1733 0 : span,
1734 0 : shard,
1735 0 : requests,
1736 0 : } => {
1737 0 : fail::fail_point!("ps::handle-pagerequest-message::test");
1738 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1739 : (
1740 : {
1741 0 : let npages = requests.len();
1742 0 : trace!(npages, "handling test request batch");
1743 0 : let res = Self::handle_test_request_batch(&shard, requests, &ctx)
1744 0 : .instrument(span.clone())
1745 0 : .await;
1746 0 : assert_eq!(res.len(), npages);
1747 0 : res
1748 0 : },
1749 0 : span,
1750 : )
1751 : }
1752 0 : BatchedFeMessage::RespondError { span, error } => {
1753 0 : // We've already decided to respond with an error, so we don't need to
1754 0 : // call the handler.
1755 0 : (vec![Err(error)], span)
1756 : }
1757 : })
1758 0 : }
1759 :
1760 : /// Pagestream sub-protocol handler.
1761 : ///
1762 : /// It is a simple request-response protocol inside a COPYBOTH session.
1763 : ///
1764 : /// # Coding Discipline
1765 : ///
1766 : /// Coding discipline within this function: all interaction with the `pgb` connection
1767 : /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
1768 : /// This is so that we can shutdown page_service quickly.
1769 : #[instrument(skip_all)]
1770 : async fn handle_pagerequests<IO>(
1771 : &mut self,
1772 : pgb: &mut PostgresBackend<IO>,
1773 : tenant_id: TenantId,
1774 : timeline_id: TimelineId,
1775 : protocol_version: PagestreamProtocolVersion,
1776 : ctx: RequestContext,
1777 : ) -> Result<(), QueryError>
1778 : where
1779 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
1780 : {
1781 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1782 :
1783 : // switch client to COPYBOTH
1784 : pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
1785 : tokio::select! {
1786 : biased;
1787 : _ = self.cancel.cancelled() => {
1788 : return Err(QueryError::Shutdown)
1789 : }
1790 : res = pgb.flush() => {
1791 : res?;
1792 : }
1793 : }
1794 :
1795 : let io_concurrency = IoConcurrency::spawn_from_conf(
1796 : self.get_vectored_concurrent_io,
1797 : match self.gate_guard.try_clone() {
1798 : Ok(guard) => guard,
1799 : Err(_) => {
1800 : info!("shutdown request received in page handler");
1801 : return Err(QueryError::Shutdown);
1802 : }
1803 : },
1804 : );
1805 :
1806 : let pgb_reader = pgb
1807 : .split()
1808 : .context("implementation error: split pgb into reader and writer")?;
1809 :
1810 : let timeline_handles = self
1811 : .timeline_handles
1812 : .take()
1813 : .expect("implementation error: timeline_handles should not be locked");
1814 :
1815 : let request_span = info_span!("request");
1816 : let ((pgb_reader, timeline_handles), result) = match self.pipelining_config.clone() {
1817 : PageServicePipeliningConfig::Pipelined(pipelining_config) => {
1818 : self.handle_pagerequests_pipelined(
1819 : pgb,
1820 : pgb_reader,
1821 : tenant_id,
1822 : timeline_id,
1823 : timeline_handles,
1824 : request_span,
1825 : pipelining_config,
1826 : protocol_version,
1827 : io_concurrency,
1828 : &ctx,
1829 : )
1830 : .await
1831 : }
1832 : PageServicePipeliningConfig::Serial => {
1833 : self.handle_pagerequests_serial(
1834 : pgb,
1835 : pgb_reader,
1836 : tenant_id,
1837 : timeline_id,
1838 : timeline_handles,
1839 : request_span,
1840 : protocol_version,
1841 : io_concurrency,
1842 : &ctx,
1843 : )
1844 : .await
1845 : }
1846 : };
1847 :
1848 : debug!("pagestream subprotocol shut down cleanly");
1849 :
1850 : pgb.unsplit(pgb_reader)
1851 : .context("implementation error: unsplit pgb")?;
1852 :
1853 : let replaced = self.timeline_handles.replace(timeline_handles);
1854 : assert!(replaced.is_none());
1855 :
1856 : result
1857 : }
1858 :
1859 : #[allow(clippy::too_many_arguments)]
1860 0 : async fn handle_pagerequests_serial<IO>(
1861 0 : &mut self,
1862 0 : pgb_writer: &mut PostgresBackend<IO>,
1863 0 : mut pgb_reader: PostgresBackendReader<IO>,
1864 0 : tenant_id: TenantId,
1865 0 : timeline_id: TimelineId,
1866 0 : mut timeline_handles: TimelineHandles,
1867 0 : request_span: Span,
1868 0 : protocol_version: PagestreamProtocolVersion,
1869 0 : io_concurrency: IoConcurrency,
1870 0 : ctx: &RequestContext,
1871 0 : ) -> (
1872 0 : (PostgresBackendReader<IO>, TimelineHandles),
1873 0 : Result<(), QueryError>,
1874 0 : )
1875 0 : where
1876 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
1877 0 : {
1878 0 : let cancel = self.cancel.clone();
1879 :
1880 0 : let err = loop {
1881 0 : let msg = Self::pagestream_read_message(
1882 0 : &mut pgb_reader,
1883 0 : tenant_id,
1884 0 : timeline_id,
1885 0 : &mut timeline_handles,
1886 0 : &self.perf_span_fields,
1887 0 : &cancel,
1888 0 : ctx,
1889 0 : protocol_version,
1890 0 : request_span.clone(),
1891 0 : )
1892 0 : .await;
1893 0 : let msg = match msg {
1894 0 : Ok(msg) => msg,
1895 0 : Err(e) => break e,
1896 : };
1897 0 : let msg = match msg {
1898 0 : Some(msg) => msg,
1899 : None => {
1900 0 : debug!("pagestream subprotocol end observed");
1901 0 : return ((pgb_reader, timeline_handles), Ok(()));
1902 : }
1903 : };
1904 :
1905 0 : let result = self
1906 0 : .pagestream_handle_batched_message(
1907 0 : pgb_writer,
1908 0 : msg,
1909 0 : io_concurrency.clone(),
1910 0 : &cancel,
1911 0 : protocol_version,
1912 0 : ctx,
1913 0 : )
1914 0 : .await;
1915 0 : match result {
1916 0 : Ok(()) => {}
1917 0 : Err(e) => break e,
1918 : }
1919 : };
1920 0 : ((pgb_reader, timeline_handles), Err(err))
1921 0 : }
1922 :
1923 : /// # Cancel-Safety
1924 : ///
1925 : /// May leak tokio tasks if not polled to completion.
1926 : #[allow(clippy::too_many_arguments)]
1927 0 : async fn handle_pagerequests_pipelined<IO>(
1928 0 : &mut self,
1929 0 : pgb_writer: &mut PostgresBackend<IO>,
1930 0 : pgb_reader: PostgresBackendReader<IO>,
1931 0 : tenant_id: TenantId,
1932 0 : timeline_id: TimelineId,
1933 0 : mut timeline_handles: TimelineHandles,
1934 0 : request_span: Span,
1935 0 : pipelining_config: PageServicePipeliningConfigPipelined,
1936 0 : protocol_version: PagestreamProtocolVersion,
1937 0 : io_concurrency: IoConcurrency,
1938 0 : ctx: &RequestContext,
1939 0 : ) -> (
1940 0 : (PostgresBackendReader<IO>, TimelineHandles),
1941 0 : Result<(), QueryError>,
1942 0 : )
1943 0 : where
1944 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
1945 0 : {
1946 0 : //
1947 0 : // Pipelined pagestream handling consists of
1948 0 : // - a Batcher that reads requests off the wire
1949 0 : // and batches them if possible,
1950 0 : // - an Executor that processes the batched requests.
1951 0 : //
1952 0 : // The batch is built up inside an `spsc_fold` channel,
1953 0 : // shared between Batcher (Sender) and Executor (Receiver).
1954 0 : //
1955 0 : // The Batcher continuously folds client requests into the batch,
1956 0 : // while the Executor can at any time take out what's in the batch
1957 0 : // in order to process it.
1958 0 : // This means the next batch builds up while the Executor
1959 0 : // executes the last batch.
1960 0 : //
1961 0 : // CANCELLATION
1962 0 : //
1963 0 : // We run both Batcher and Executor futures to completion before
1964 0 : // returning from this function.
1965 0 : //
1966 0 : // If Executor exits first, it signals cancellation to the Batcher
1967 0 : // via a CancellationToken that is child of `self.cancel`.
1968 0 : // If Batcher exits first, it signals cancellation to the Executor
1969 0 : // by dropping the spsc_fold channel Sender.
1970 0 : //
1971 0 : // CLEAN SHUTDOWN
1972 0 : //
1973 0 : // Clean shutdown means that the client ends the COPYBOTH session.
1974 0 : // In response to such a client message, the Batcher exits.
1975 0 : // The Executor continues to run, draining the spsc_fold channel.
1976 0 : // Once drained, the spsc_fold recv will fail with a distinct error
1977 0 : // indicating that the sender disconnected.
1978 0 : // The Executor exits with Ok(()) in response to that error.
1979 0 : //
1980 0 : // Server-initiated shutdown is not a clean shutdown; instead it
1981 0 : // surfaces as an error, Err(QueryError::Shutdown), which is propagated
1982 0 : // through the usual error paths.
1983 0 : //
1984 0 : // ERROR PROPAGATION
1985 0 : //
1986 0 : // When the Batcher encounters an error, it sends it as a value
1987 0 : // through the spsc_fold channel and exits afterwards.
1988 0 : // When the Executor observes such an error in the channel,
1989 0 : // it exits, returning that error value.
1990 0 : //
1991 0 : // This design ensures that the Executor stage will still process
1992 0 : // the batch that was in flight when the Batcher encountered an error,
1993 0 : // thereby behaving identically to a serial implementation.
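        //
        // A minimal sketch of the spsc_fold send/recv pattern used below (names
        // and the fold closure are illustrative only, not the exact signatures):
        //
        //   let (mut tx, mut rx) = spsc_fold::channel();
        //   // Batcher side: fold the new message into the pending batch, or
        //   // start a new batch if it cannot be merged.
        //   let _ = tx.send(msg, |pending, msg| try_merge(pending, msg)).await;
        //   // Executor side: take whatever has accumulated since the last recv.
        //   let batch = rx.recv().await;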
1994 0 :
1995 0 : let PageServicePipeliningConfigPipelined {
1996 0 : max_batch_size,
1997 0 : execution,
1998 0 : batching: batching_strategy,
1999 0 : } = pipelining_config;
2000 :
2001 : // Macro to _define_ a pipeline stage.
2002 : macro_rules! pipeline_stage {
2003 : ($name:literal, $cancel:expr, $make_fut:expr) => {{
2004 : let cancel: CancellationToken = $cancel;
2005 : let stage_fut = $make_fut(cancel.clone());
2006 0 : async move {
2007 0 : scopeguard::defer! {
2008 0 : debug!("exiting");
2009 0 : }
2010 0 : timed_after_cancellation(stage_fut, $name, Duration::from_millis(100), &cancel)
2011 0 : .await
2012 0 : }
2013 : .instrument(tracing::info_span!($name))
2014 : }};
2015 : }
2016 :
2017 : //
2018 : // Batcher
2019 : //
2020 :
2021 0 : let perf_span_fields = self.perf_span_fields.clone();
2022 0 :
2023 0 : let cancel_batcher = self.cancel.child_token();
2024 0 : let (mut batch_tx, mut batch_rx) = spsc_fold::channel();
2025 0 : let batcher = pipeline_stage!("batcher", cancel_batcher.clone(), move |cancel_batcher| {
2026 0 : let ctx = ctx.attached_child();
2027 0 : async move {
2028 0 : let mut pgb_reader = pgb_reader;
2029 0 : let mut exit = false;
2030 0 : while !exit {
2031 0 : let read_res = Self::pagestream_read_message(
2032 0 : &mut pgb_reader,
2033 0 : tenant_id,
2034 0 : timeline_id,
2035 0 : &mut timeline_handles,
2036 0 : &perf_span_fields,
2037 0 : &cancel_batcher,
2038 0 : &ctx,
2039 0 : protocol_version,
2040 0 : request_span.clone(),
2041 0 : )
2042 0 : .await;
2043 0 : let Some(read_res) = read_res.transpose() else {
2044 0 : debug!("client-initiated shutdown");
2045 0 : break;
2046 : };
2047 0 : exit |= read_res.is_err();
2048 0 : let could_send = batch_tx
2049 0 : .send(read_res, |batch, res| {
2050 0 : Self::pagestream_do_batch(batching_strategy, max_batch_size, batch, res)
2051 0 : })
2052 0 : .await;
2053 0 : exit |= could_send.is_err();
2054 : }
2055 0 : (pgb_reader, timeline_handles)
2056 0 : }
2057 0 : });
2058 :
2059 : //
2060 : // Executor
2061 : //
2062 :
2063 0 : let executor = pipeline_stage!("executor", self.cancel.clone(), move |cancel| {
2064 0 : let ctx = ctx.attached_child();
2065 0 : async move {
2066 0 : let _cancel_batcher = cancel_batcher.drop_guard();
2067 : loop {
2068 0 : let maybe_batch = batch_rx.recv().await;
2069 0 : let batch = match maybe_batch {
2070 0 : Ok(batch) => batch,
2071 : Err(spsc_fold::RecvError::SenderGone) => {
2072 0 : debug!("upstream gone");
2073 0 : return Ok(());
2074 : }
2075 : };
2076 0 : let mut batch = match batch {
2077 0 : Ok(batch) => batch,
2078 0 : Err(e) => {
2079 0 : return Err(e);
2080 : }
2081 : };
2082 :
2083 : if let BatchedFeMessage::GetPage {
2084 0 : pages,
2085 : span: _,
2086 : shard: _,
2087 : batch_break_reason: _,
2088 0 : } = &mut batch
2089 : {
2090 0 : for req in pages {
2091 0 : req.batch_wait_ctx.take();
2092 0 : }
2093 0 : }
2094 :
2095 0 : self.pagestream_handle_batched_message(
2096 0 : pgb_writer,
2097 0 : batch,
2098 0 : io_concurrency.clone(),
2099 0 : &cancel,
2100 0 : protocol_version,
2101 0 : &ctx,
2102 0 : )
2103 0 : .await?;
2104 : }
2105 0 : }
2106 0 : });
2107 :
2108 : //
2109 : // Execute the stages.
2110 : //
2111 :
2112 0 : match execution {
2113 : PageServiceProtocolPipelinedExecutionStrategy::ConcurrentFutures => {
2114 0 : tokio::join!(batcher, executor)
2115 : }
2116 : PageServiceProtocolPipelinedExecutionStrategy::Tasks => {
2117 : // These tasks are not tracked anywhere.
2118 0 : let read_messages_task = tokio::spawn(batcher);
2119 0 : let (read_messages_task_res, executor_res_) =
2120 0 : tokio::join!(read_messages_task, executor,);
2121 0 : (
2122 0 : read_messages_task_res.expect("propagated panic from read_messages"),
2123 0 : executor_res_,
2124 0 : )
2125 : }
2126 : }
2127 0 : }
2128 :
2129 : /// Helper function to handle the LSN from client request.
2130 : ///
2131 : /// Each GetPage (and Exists and Nblocks) request includes information about
2132 : /// which version of the page is being requested. The primary compute node
2133 : /// will always request the latest page version, by setting 'request_lsn' to
2134 : /// the last inserted or flushed WAL position, while a standby will request
2135 : /// a version at the LSN that it's currently caught up to.
2136 : ///
2137 : /// In either case, if the page server hasn't received the WAL up to the
2138 : /// requested LSN yet, we will wait for it to arrive. The return value is
2139 : /// the LSN that should be used to look up the page versions.
2140 : ///
2141 : /// In addition to the request LSN, each request carries another LSN,
2142 : /// 'not_modified_since', which is a hint to the pageserver that the client
2143 : /// knows that the page has not been modified between 'not_modified_since'
2144 : /// and the request LSN. This allows skipping the wait, as long as the WAL
2145 : /// up to 'not_modified_since' has arrived. If the client doesn't have any
2146 : /// information about when the page was modified, it will use
2147 : /// not_modified_since == request_lsn. If the client lies and sends a too-low
2148 : /// not_modified_since such that there are in fact later page versions, the
2149 : /// behavior is undefined: the pageserver may return any of the page versions
2150 : /// or an error.
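    ///
    /// As an illustrative example (hypothetical LSNs, not taken from a real
    /// system): suppose the pageserver has ingested WAL up to 0/1800, and a
    /// primary compute sends request_lsn = 0/2000 (its last flushed position)
    /// with not_modified_since = 0/1500 (the last time it modified the page).
    /// Because not_modified_since is at or below the last-record LSN, the
    /// pageserver can serve the read immediately at
    /// min(last_record_lsn, request_lsn) = 0/1800 instead of waiting for WAL up
    /// to 0/2000 to arrive.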
2151 0 : async fn wait_or_get_last_lsn(
2152 0 : timeline: &Timeline,
2153 0 : request_lsn: Lsn,
2154 0 : not_modified_since: Lsn,
2155 0 : latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
2156 0 : ctx: &RequestContext,
2157 0 : ) -> Result<Lsn, PageStreamError> {
2158 0 : let last_record_lsn = timeline.get_last_record_lsn();
2159 0 : let effective_request_lsn = Self::effective_request_lsn(
2160 0 : timeline,
2161 0 : last_record_lsn,
2162 0 : request_lsn,
2163 0 : not_modified_since,
2164 0 : latest_gc_cutoff_lsn,
2165 0 : )?;
2166 :
2167 0 : if effective_request_lsn > last_record_lsn {
2168 0 : timeline
2169 0 : .wait_lsn(
2170 0 : not_modified_since,
2171 0 : crate::tenant::timeline::WaitLsnWaiter::PageService,
2172 0 : timeline::WaitLsnTimeout::Default,
2173 0 : ctx,
2174 0 : )
2175 0 : .await?;
2176 :
2177 : // Since we waited for 'effective_request_lsn' to arrive, that is now the last
2178 : // record LSN. (Or close enough for our purposes; the last-record LSN can
2179 : // advance immediately after we return anyway)
2180 0 : }
2181 :
2182 0 : Ok(effective_request_lsn)
2183 0 : }
2184 :
2185 0 : fn effective_request_lsn(
2186 0 : timeline: &Timeline,
2187 0 : last_record_lsn: Lsn,
2188 0 : request_lsn: Lsn,
2189 0 : not_modified_since: Lsn,
2190 0 : latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
2191 0 : ) -> Result<Lsn, PageStreamError> {
2192 0 : // Sanity check the request
2193 0 : if request_lsn < not_modified_since {
2194 0 : return Err(PageStreamError::BadRequest(
2195 0 : format!(
2196 0 : "invalid request with request LSN {} and not_modified_since {}",
2197 0 : request_lsn, not_modified_since,
2198 0 : )
2199 0 : .into(),
2200 0 : ));
2201 0 : }
2202 0 :
2203 0 : // Check explicitly for INVALID just to get a less scary error message if the request is obviously bogus
2204 0 : if request_lsn == Lsn::INVALID {
2205 0 : return Err(PageStreamError::BadRequest(
2206 0 : "invalid LSN(0) in request".into(),
2207 0 : ));
2208 0 : }
2209 0 :
2210 0 : // Clients should only read from recent LSNs on their timeline, or from locations holding an LSN lease.
2211 0 : //
2212 0 : // We may have older data available, but we make a best effort to detect this case and return an error,
2213 0 : // to distinguish a misbehaving client (asking for old LSN) from a storage issue (data missing at a legitimate LSN).
2214 0 : if request_lsn < **latest_gc_cutoff_lsn && !timeline.is_gc_blocked_by_lsn_lease_deadline() {
2215 0 : let gc_info = &timeline.gc_info.read().unwrap();
2216 0 : if !gc_info.lsn_covered_by_lease(request_lsn) {
2217 0 : return Err(
2218 0 : PageStreamError::BadRequest(format!(
2219 0 : "tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
2220 0 : request_lsn, **latest_gc_cutoff_lsn
2221 0 : ).into())
2222 0 : );
2223 0 : }
2224 0 : }
2225 :
2226 0 : if not_modified_since > last_record_lsn {
2227 0 : Ok(not_modified_since)
2228 : } else {
2229 : // It might be better to use max(not_modified_since, latest_gc_cutoff_lsn)
2230 : // here instead. That would give the same result, since we know that there
2231 : // haven't been any modifications since 'not_modified_since'. Using an older
2232 : // LSN might be faster, because that could allow skipping recent layers when
2233 : // finding the page. However, we have historically used 'last_record_lsn', so
2234 : // stick to that for now.
2235 0 : Ok(std::cmp::min(last_record_lsn, request_lsn))
2236 : }
2237 0 : }
2238 :
2239 : /// Handles the lsn lease request.
2240 : /// If a lease cannot be obtained, the client will receive NULL.
2241 : #[instrument(skip_all, fields(shard_id, %lsn))]
2242 : async fn handle_make_lsn_lease<IO>(
2243 : &mut self,
2244 : pgb: &mut PostgresBackend<IO>,
2245 : tenant_shard_id: TenantShardId,
2246 : timeline_id: TimelineId,
2247 : lsn: Lsn,
2248 : ctx: &RequestContext,
2249 : ) -> Result<(), QueryError>
2250 : where
2251 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
2252 : {
2253 : let timeline = self
2254 : .timeline_handles
2255 : .as_mut()
2256 : .unwrap()
2257 : .get(
2258 : tenant_shard_id.tenant_id,
2259 : timeline_id,
2260 : ShardSelector::Known(tenant_shard_id.to_index()),
2261 : )
2262 : .await?;
2263 : set_tracing_field_shard_id(&timeline);
2264 :
2265 : let lease = timeline
2266 : .renew_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx)
2267 0 : .inspect_err(|e| {
2268 0 : warn!("{e}");
2269 0 : })
2270 : .ok();
2271 0 : let valid_until_str = lease.map(|l| {
2272 0 : l.valid_until
2273 0 : .duration_since(SystemTime::UNIX_EPOCH)
2274 0 : .expect("valid_until is earlier than UNIX_EPOCH")
2275 0 : .as_millis()
2276 0 : .to_string()
2277 0 : });
2278 :
2279 : info!(
2280 : "acquired lease for {} until {}",
2281 : lsn,
2282 : valid_until_str.as_deref().unwrap_or("<unknown>")
2283 : );
2284 :
2285 0 : let bytes = valid_until_str.as_ref().map(|x| x.as_bytes());
2286 :
2287 : pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col(
2288 : b"valid_until",
2289 : )]))?
2290 : .write_message_noflush(&BeMessage::DataRow(&[bytes]))?;
2291 :
2292 : Ok(())
2293 : }
2294 :
2295 : #[instrument(skip_all, fields(shard_id))]
2296 : async fn handle_get_rel_exists_request(
2297 : timeline: &Timeline,
2298 : req: &PagestreamExistsRequest,
2299 : ctx: &RequestContext,
2300 : ) -> Result<PagestreamExistsResponse, PageStreamError> {
2301 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
2302 : let lsn = Self::wait_or_get_last_lsn(
2303 : timeline,
2304 : req.hdr.request_lsn,
2305 : req.hdr.not_modified_since,
2306 : &latest_gc_cutoff_lsn,
2307 : ctx,
2308 : )
2309 : .await?;
2310 :
2311 : let exists = timeline
2312 : .get_rel_exists(
2313 : req.rel,
2314 : Version::LsnRange(LsnRange {
2315 : effective_lsn: lsn,
2316 : request_lsn: req.hdr.request_lsn,
2317 : }),
2318 : ctx,
2319 : )
2320 : .await?;
2321 :
2322 : Ok(PagestreamExistsResponse { req: *req, exists })
2323 : }
2324 :
2325 : #[instrument(skip_all, fields(shard_id))]
2326 : async fn handle_get_nblocks_request(
2327 : timeline: &Timeline,
2328 : req: &PagestreamNblocksRequest,
2329 : ctx: &RequestContext,
2330 : ) -> Result<PagestreamNblocksResponse, PageStreamError> {
2331 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
2332 : let lsn = Self::wait_or_get_last_lsn(
2333 : timeline,
2334 : req.hdr.request_lsn,
2335 : req.hdr.not_modified_since,
2336 : &latest_gc_cutoff_lsn,
2337 : ctx,
2338 : )
2339 : .await?;
2340 :
2341 : let n_blocks = timeline
2342 : .get_rel_size(
2343 : req.rel,
2344 : Version::LsnRange(LsnRange {
2345 : effective_lsn: lsn,
2346 : request_lsn: req.hdr.request_lsn,
2347 : }),
2348 : ctx,
2349 : )
2350 : .await?;
2351 :
2352 : Ok(PagestreamNblocksResponse {
2353 : req: *req,
2354 : n_blocks,
2355 : })
2356 : }
2357 :
2358 : #[instrument(skip_all, fields(shard_id))]
2359 : async fn handle_db_size_request(
2360 : timeline: &Timeline,
2361 : req: &PagestreamDbSizeRequest,
2362 : ctx: &RequestContext,
2363 : ) -> Result<PagestreamDbSizeResponse, PageStreamError> {
2364 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
2365 : let lsn = Self::wait_or_get_last_lsn(
2366 : timeline,
2367 : req.hdr.request_lsn,
2368 : req.hdr.not_modified_since,
2369 : &latest_gc_cutoff_lsn,
2370 : ctx,
2371 : )
2372 : .await?;
2373 :
2374 : let total_blocks = timeline
2375 : .get_db_size(
2376 : DEFAULTTABLESPACE_OID,
2377 : req.dbnode,
2378 : Version::LsnRange(LsnRange {
2379 : effective_lsn: lsn,
2380 : request_lsn: req.hdr.request_lsn,
2381 : }),
2382 : ctx,
2383 : )
2384 : .await?;
2385 : let db_size = total_blocks as i64 * BLCKSZ as i64;
2386 :
2387 : Ok(PagestreamDbSizeResponse { req: *req, db_size })
2388 : }
2389 :
2390 : #[instrument(skip_all)]
2391 : async fn handle_get_page_at_lsn_request_batched(
2392 : timeline: &Timeline,
2393 : requests: SmallVec<[BatchedGetPageRequest; 1]>,
2394 : io_concurrency: IoConcurrency,
2395 : batch_break_reason: GetPageBatchBreakReason,
2396 : ctx: &RequestContext,
2397 : ) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer, RequestContext), BatchedPageStreamError>>
2398 : {
2399 : debug_assert_current_span_has_tenant_and_timeline_id();
2400 :
2401 : timeline
2402 : .query_metrics
2403 : .observe_getpage_batch_start(requests.len(), batch_break_reason);
2404 :
2405 : // If a page trace is running, submit an event for this request.
2406 : if let Some(page_trace) = timeline.page_trace.load().as_ref() {
2407 : let time = SystemTime::now();
2408 : for batch in &requests {
2409 : let key = rel_block_to_key(batch.req.rel, batch.req.blkno).to_compact();
2410 : // Ignore error (trace buffer may be full or tracer may have disconnected).
2411 : _ = page_trace.try_send(PageTraceEvent {
2412 : key,
2413 : effective_lsn: batch.lsn_range.effective_lsn,
2414 : time,
2415 : });
2416 : }
2417 : }
2418 :
2419 : // If any request in the batch needs to wait for LSN, then do so now.
2420 : let mut perf_instrument = false;
2421 : let max_effective_lsn = requests
2422 : .iter()
2423 0 : .map(|req| {
2424 0 : if req.ctx.has_perf_span() {
2425 0 : perf_instrument = true;
2426 0 : }
2427 :
2428 0 : req.lsn_range.effective_lsn
2429 0 : })
2430 : .max()
2431 : .expect("batch is never empty");
2432 :
2433 : let ctx = match perf_instrument {
2434 : true => RequestContextBuilder::from(ctx)
2435 0 : .root_perf_span(|| {
2436 0 : info_span!(
2437 : target: PERF_TRACE_TARGET,
2438 : "GET_VECTORED",
2439 : tenant_id = %timeline.tenant_shard_id.tenant_id,
2440 : timeline_id = %timeline.timeline_id,
2441 0 : shard = %timeline.tenant_shard_id.shard_slug(),
2442 : %max_effective_lsn
2443 : )
2444 0 : })
2445 : .attached_child(),
2446 : false => ctx.attached_child(),
2447 : };
2448 :
2449 : let last_record_lsn = timeline.get_last_record_lsn();
2450 : if max_effective_lsn > last_record_lsn {
2451 : if let Err(e) = timeline
2452 : .wait_lsn(
2453 : max_effective_lsn,
2454 : crate::tenant::timeline::WaitLsnWaiter::PageService,
2455 : timeline::WaitLsnTimeout::Default,
2456 : &ctx,
2457 : )
2458 0 : .maybe_perf_instrument(&ctx, |current_perf_span| {
2459 0 : info_span!(
2460 : target: PERF_TRACE_TARGET,
2461 0 : parent: current_perf_span,
2462 : "WAIT_LSN",
2463 : )
2464 0 : })
2465 : .await
2466 : {
2467 0 : return Vec::from_iter(requests.into_iter().map(|req| {
2468 0 : Err(BatchedPageStreamError {
2469 0 : err: PageStreamError::from(e.clone()),
2470 0 : req: req.req.hdr,
2471 0 : })
2472 0 : }));
2473 : }
2474 : }
2475 :
2476 : let results = timeline
2477 : .get_rel_page_at_lsn_batched(
2478 0 : requests.iter().map(|p| {
2479 0 : (
2480 0 : &p.req.rel,
2481 0 : &p.req.blkno,
2482 0 : p.lsn_range,
2483 0 : p.ctx.attached_child(),
2484 0 : )
2485 0 : }),
2486 : io_concurrency,
2487 : &ctx,
2488 : )
2489 : .await;
2490 : assert_eq!(results.len(), requests.len());
2491 :
2492 : // TODO: avoid creating the new Vec here
2493 : Vec::from_iter(
2494 : requests
2495 : .into_iter()
2496 : .zip(results.into_iter())
2497 0 : .map(|(req, res)| {
2498 0 : res.map(|page| {
2499 0 : (
2500 0 : PagestreamBeMessage::GetPage(models::PagestreamGetPageResponse {
2501 0 : req: req.req,
2502 0 : page,
2503 0 : }),
2504 0 : req.timer,
2505 0 : req.ctx,
2506 0 : )
2507 0 : })
2508 0 : .map_err(|e| BatchedPageStreamError {
2509 0 : err: PageStreamError::from(e),
2510 0 : req: req.req.hdr,
2511 0 : })
2512 0 : }),
2513 : )
2514 : }
2515 :
2516 : #[instrument(skip_all, fields(shard_id))]
2517 : async fn handle_get_slru_segment_request(
2518 : timeline: &Timeline,
2519 : req: &PagestreamGetSlruSegmentRequest,
2520 : ctx: &RequestContext,
2521 : ) -> Result<PagestreamGetSlruSegmentResponse, PageStreamError> {
2522 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
2523 : let lsn = Self::wait_or_get_last_lsn(
2524 : timeline,
2525 : req.hdr.request_lsn,
2526 : req.hdr.not_modified_since,
2527 : &latest_gc_cutoff_lsn,
2528 : ctx,
2529 : )
2530 : .await?;
2531 :
2532 : let kind = SlruKind::from_repr(req.kind)
2533 : .ok_or(PageStreamError::BadRequest("invalid SLRU kind".into()))?;
2534 : let segment = timeline.get_slru_segment(kind, req.segno, lsn, ctx).await?;
2535 :
2536 : Ok(PagestreamGetSlruSegmentResponse { req: *req, segment })
2537 : }
2538 :
2539 : // NB: this impl mimics what we do for batched getpage requests.
2540 : #[cfg(feature = "testing")]
2541 : #[instrument(skip_all, fields(shard_id))]
2542 : async fn handle_test_request_batch(
2543 : timeline: &Timeline,
2544 : requests: Vec<BatchedTestRequest>,
2545 : _ctx: &RequestContext,
2546 : ) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer, RequestContext), BatchedPageStreamError>>
2547 : {
2548 : // real requests would do something with the timeline
2549 : let mut results = Vec::with_capacity(requests.len());
2550 : for _req in requests.iter() {
2551 : tokio::task::yield_now().await;
2552 :
2553 : results.push({
2554 : if timeline.cancel.is_cancelled() {
2555 : Err(PageReconstructError::Cancelled)
2556 : } else {
2557 : Ok(())
2558 : }
2559 : });
2560 : }
2561 :
2562 : // TODO: avoid creating the new Vec here
2563 : Vec::from_iter(
2564 : requests
2565 : .into_iter()
2566 : .zip(results.into_iter())
2567 0 : .map(|(req, res)| {
2568 0 : res.map(|()| {
2569 0 : (
2570 0 : PagestreamBeMessage::Test(models::PagestreamTestResponse {
2571 0 : req: req.req.clone(),
2572 0 : }),
2573 0 : req.timer,
2574 0 : RequestContext::new(
2575 0 : TaskKind::PageRequestHandler,
2576 0 : DownloadBehavior::Warn,
2577 0 : ),
2578 0 : )
2579 0 : })
2580 0 : .map_err(|e| BatchedPageStreamError {
2581 0 : err: PageStreamError::from(e),
2582 0 : req: req.req.hdr,
2583 0 : })
2584 0 : }),
2585 : )
2586 : }
2587 :
2588 : /// Note on "fullbackup":
2589 : /// Full basebackups should only be used for debugging purposes.
2590 : /// Originally, it was introduced to enable breaking storage format changes,
2591 : /// but that is not applicable anymore.
2592 : ///
2593 : /// # Coding Discipline
2594 : ///
2595 : /// Coding discipline within this function: all interaction with the `pgb` connection
2596 : /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
2597 : /// This is so that we can shutdown page_service quickly.
2598 : ///
2599 : /// TODO: wrap the pgb that we pass to the basebackup handler so that it's sensitive
2600 : /// to connection cancellation.
2601 : #[allow(clippy::too_many_arguments)]
2602 : #[instrument(skip_all, fields(shard_id, ?lsn, ?prev_lsn, %full_backup))]
2603 : async fn handle_basebackup_request<IO>(
2604 : &mut self,
2605 : pgb: &mut PostgresBackend<IO>,
2606 : tenant_id: TenantId,
2607 : timeline_id: TimelineId,
2608 : lsn: Option<Lsn>,
2609 : prev_lsn: Option<Lsn>,
2610 : full_backup: bool,
2611 : gzip: bool,
2612 : replica: bool,
2613 : ctx: &RequestContext,
2614 : ) -> Result<(), QueryError>
2615 : where
2616 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
2617 : {
2618 : let started = std::time::Instant::now();
2619 :
2620 : let timeline = self
2621 : .timeline_handles
2622 : .as_mut()
2623 : .unwrap()
2624 : .get(tenant_id, timeline_id, ShardSelector::Zero)
2625 : .await?;
2626 : set_tracing_field_shard_id(&timeline);
2627 : let ctx = ctx.with_scope_timeline(&timeline);
2628 :
2629 : if timeline.is_archived() == Some(true) {
2630 : tracing::info!(
2631 : "timeline {tenant_id}/{timeline_id} is archived, but got basebackup request for it."
2632 : );
2633 : return Err(QueryError::NotFound("timeline is archived".into()));
2634 : }
2635 :
2636 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
2637 : if let Some(lsn) = lsn {
2638 : // Backup was requested at a particular LSN. Wait for it to arrive.
2639 : info!("waiting for {}", lsn);
2640 : timeline
2641 : .wait_lsn(
2642 : lsn,
2643 : crate::tenant::timeline::WaitLsnWaiter::PageService,
2644 : crate::tenant::timeline::WaitLsnTimeout::Default,
2645 : &ctx,
2646 : )
2647 : .await?;
2648 : timeline
2649 : .check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
2650 : .context("invalid basebackup lsn")?;
2651 : }
2652 :
2653 : let lsn_awaited_after = started.elapsed();
2654 :
2655 : // switch client to COPYOUT
2656 : pgb.write_message_noflush(&BeMessage::CopyOutResponse)
2657 : .map_err(QueryError::Disconnected)?;
2658 : self.flush_cancellable(pgb, &self.cancel).await?;
2659 :
2660 : let mut from_cache = false;
2661 :
2662 : // Send a tarball of the latest layer on the timeline. Compress if not
2663 : // fullbackup. TODO Compress in that case too (tests need to be updated)
2664 : if full_backup {
2665 : let mut writer = pgb.copyout_writer();
2666 : basebackup::send_basebackup_tarball(
2667 : &mut writer,
2668 : &timeline,
2669 : lsn,
2670 : prev_lsn,
2671 : full_backup,
2672 : replica,
2673 : &ctx,
2674 : )
2675 : .await?;
2676 : } else {
2677 : let mut writer = BufWriter::new(pgb.copyout_writer());
2678 :
2679 : let cached = {
2680 : // Basebackup is cached only for this combination of parameters.
2681 : if timeline.is_basebackup_cache_enabled()
2682 : && gzip
2683 : && lsn.is_some()
2684 : && prev_lsn.is_none()
2685 : {
2686 : self.basebackup_cache
2687 : .get(tenant_id, timeline_id, lsn.unwrap())
2688 : .await
2689 : } else {
2690 : None
2691 : }
2692 : };
2693 :
2694 : if let Some(mut cached) = cached {
2695 : from_cache = true;
2696 : tokio::io::copy(&mut cached, &mut writer)
2697 : .await
2698 0 : .map_err(|err| {
2699 0 : BasebackupError::Client(err, "handle_basebackup_request,cached,copy")
2700 0 : })?;
2701 : } else if gzip {
2702 : let mut encoder = GzipEncoder::with_quality(
2703 : &mut writer,
2704 : // NOTE using fast compression because it's on the critical path
2705 : // for compute startup. For an empty database, we get
2706 : // <100KB with this method. The Level::Best compression method
2707 : // gives us <20KB, but maybe we should add basebackup caching
2708 : // on compute shutdown first.
2709 : async_compression::Level::Fastest,
2710 : );
2711 : basebackup::send_basebackup_tarball(
2712 : &mut encoder,
2713 : &timeline,
2714 : lsn,
2715 : prev_lsn,
2716 : full_backup,
2717 : replica,
2718 : &ctx,
2719 : )
2720 : .await?;
2721 : // shutdown the encoder to ensure the gzip footer is written
2722 : encoder
2723 : .shutdown()
2724 : .await
2725 0 : .map_err(|e| QueryError::Disconnected(ConnectionError::Io(e)))?;
2726 : } else {
2727 : basebackup::send_basebackup_tarball(
2728 : &mut writer,
2729 : &timeline,
2730 : lsn,
2731 : prev_lsn,
2732 : full_backup,
2733 : replica,
2734 : &ctx,
2735 : )
2736 : .await?;
2737 : }
2738 : writer
2739 : .flush()
2740 : .await
2741 0 : .map_err(|err| BasebackupError::Client(err, "handle_basebackup_request,flush"))?;
2742 : }
2743 :
2744 : pgb.write_message_noflush(&BeMessage::CopyDone)
2745 : .map_err(QueryError::Disconnected)?;
2746 : self.flush_cancellable(pgb, &timeline.cancel).await?;
2747 :
2748 : let basebackup_after = started
2749 : .elapsed()
2750 : .checked_sub(lsn_awaited_after)
2751 : .unwrap_or(Duration::ZERO);
2752 :
2753 : info!(
2754 : lsn_await_millis = lsn_awaited_after.as_millis(),
2755 : basebackup_millis = basebackup_after.as_millis(),
2756 : %from_cache,
2757 : "basebackup complete"
2758 : );
2759 :
2760 : Ok(())
2761 : }
2762 :
2763 : // When accessing the management API, supply None as an argument.
2764 : // When authorizing a tenant, pass the corresponding tenant id.
2765 0 : fn check_permission(&self, tenant_id: Option<TenantId>) -> Result<(), QueryError> {
2766 0 : if self.auth.is_none() {
2767 : // auth is set to Trust, nothing to check so just return ok
2768 0 : return Ok(());
2769 0 : }
2770 0 : // auth is Some (just checked above). When auth is Some, claims are always
2771 0 : // present because of the checks during connection init,
2772 0 : // so this expect won't trigger
2773 0 : let claims = self
2774 0 : .claims
2775 0 : .as_ref()
2776 0 : .expect("claims presence already checked");
2777 0 : check_permission(claims, tenant_id).map_err(|e| QueryError::Unauthorized(e.0))
2778 0 : }
2779 : }
2780 :
2781 : /// `basebackup tenant timeline [lsn] [--gzip] [--replica]`
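/// For example (illustrative placeholders, not real IDs):
/// `basebackup <tenant_id> <timeline_id> 0/16ABCDE --gzip` parses to
/// `lsn = Some(0/16ABCDE)`, `gzip = true`, `replica = false`, while
/// `basebackup <tenant_id> <timeline_id> latest --gzip --replica` leaves `lsn = None`.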
2782 : #[derive(Debug, Clone, Eq, PartialEq)]
2783 : struct BaseBackupCmd {
2784 : tenant_id: TenantId,
2785 : timeline_id: TimelineId,
2786 : lsn: Option<Lsn>,
2787 : gzip: bool,
2788 : replica: bool,
2789 : }
2790 :
2791 : /// `fullbackup tenant timeline [lsn] [prev_lsn]`
2792 : #[derive(Debug, Clone, Eq, PartialEq)]
2793 : struct FullBackupCmd {
2794 : tenant_id: TenantId,
2795 : timeline_id: TimelineId,
2796 : lsn: Option<Lsn>,
2797 : prev_lsn: Option<Lsn>,
2798 : }
2799 :
2800 : /// `pagestream_v2 tenant timeline`
2801 : #[derive(Debug, Clone, Eq, PartialEq)]
2802 : struct PageStreamCmd {
2803 : tenant_id: TenantId,
2804 : timeline_id: TimelineId,
2805 : protocol_version: PagestreamProtocolVersion,
2806 : }
2807 :
2808 : /// `lease lsn tenant timeline lsn`
2809 : #[derive(Debug, Clone, Eq, PartialEq)]
2810 : struct LeaseLsnCmd {
2811 : tenant_shard_id: TenantShardId,
2812 : timeline_id: TimelineId,
2813 : lsn: Lsn,
2814 : }
2815 :
2816 : #[derive(Debug, Clone, Eq, PartialEq)]
2817 : enum PageServiceCmd {
2818 : Set,
2819 : PageStream(PageStreamCmd),
2820 : BaseBackup(BaseBackupCmd),
2821 : FullBackup(FullBackupCmd),
2822 : LeaseLsn(LeaseLsnCmd),
2823 : }
2824 :
2825 : impl PageStreamCmd {
2826 3 : fn parse(query: &str, protocol_version: PagestreamProtocolVersion) -> anyhow::Result<Self> {
2827 3 : let parameters = query.split_whitespace().collect_vec();
2828 3 : if parameters.len() != 2 {
2829 1 : bail!(
2830 1 : "invalid number of parameters for pagestream command: {}",
2831 1 : query
2832 1 : );
2833 2 : }
2834 2 : let tenant_id = TenantId::from_str(parameters[0])
2835 2 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2836 1 : let timeline_id = TimelineId::from_str(parameters[1])
2837 1 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2838 1 : Ok(Self {
2839 1 : tenant_id,
2840 1 : timeline_id,
2841 1 : protocol_version,
2842 1 : })
2843 3 : }
2844 : }
2845 :
2846 : impl FullBackupCmd {
2847 2 : fn parse(query: &str) -> anyhow::Result<Self> {
2848 2 : let parameters = query.split_whitespace().collect_vec();
2849 2 : if parameters.len() < 2 || parameters.len() > 4 {
2850 0 : bail!(
2851 0 : "invalid number of parameters for basebackup command: {}",
2852 0 : query
2853 0 : );
2854 2 : }
2855 2 : let tenant_id = TenantId::from_str(parameters[0])
2856 2 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2857 2 : let timeline_id = TimelineId::from_str(parameters[1])
2858 2 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2859 : // The caller is responsible for providing correct lsn and prev_lsn.
2860 2 : let lsn = if let Some(lsn_str) = parameters.get(2) {
2861 : Some(
2862 1 : Lsn::from_str(lsn_str)
2863 1 : .with_context(|| format!("Failed to parse Lsn from {lsn_str}"))?,
2864 : )
2865 : } else {
2866 1 : None
2867 : };
2868 2 : let prev_lsn = if let Some(prev_lsn_str) = parameters.get(3) {
2869 : Some(
2870 1 : Lsn::from_str(prev_lsn_str)
2871 1 : .with_context(|| format!("Failed to parse Lsn from {prev_lsn_str}"))?,
2872 : )
2873 : } else {
2874 1 : None
2875 : };
2876 2 : Ok(Self {
2877 2 : tenant_id,
2878 2 : timeline_id,
2879 2 : lsn,
2880 2 : prev_lsn,
2881 2 : })
2882 2 : }
2883 : }
2884 :
2885 : impl BaseBackupCmd {
2886 9 : fn parse(query: &str) -> anyhow::Result<Self> {
2887 9 : let parameters = query.split_whitespace().collect_vec();
2888 9 : if parameters.len() < 2 {
2889 0 : bail!(
2890 0 : "invalid number of parameters for basebackup command: {}",
2891 0 : query
2892 0 : );
2893 9 : }
2894 9 : let tenant_id = TenantId::from_str(parameters[0])
2895 9 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2896 9 : let timeline_id = TimelineId::from_str(parameters[1])
2897 9 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2898 : let lsn;
2899 : let flags_parse_from;
2900 9 : if let Some(maybe_lsn) = parameters.get(2) {
2901 8 : if *maybe_lsn == "latest" {
2902 1 : lsn = None;
2903 1 : flags_parse_from = 3;
2904 7 : } else if maybe_lsn.starts_with("--") {
2905 5 : lsn = None;
2906 5 : flags_parse_from = 2;
2907 5 : } else {
2908 : lsn = Some(
2909 2 : Lsn::from_str(maybe_lsn)
2910 2 : .with_context(|| format!("Failed to parse lsn from {maybe_lsn}"))?,
2911 : );
2912 2 : flags_parse_from = 3;
2913 : }
2914 1 : } else {
2915 1 : lsn = None;
2916 1 : flags_parse_from = 2;
2917 1 : }
2918 :
2919 9 : let mut gzip = false;
2920 9 : let mut replica = false;
2921 11 : for &param in &parameters[flags_parse_from..] {
2922 11 : for ¶m in ¶meters[flags_parse_from..] {
2923 11 : match param {
2924 11 : "--gzip" => {
2925 7 : if gzip {
2926 1 : bail!("duplicate parameter for basebackup command: {param}")
2927 6 : }
2928 6 : gzip = true
2929 : }
2930 4 : "--replica" => {
2931 2 : if replica {
2932 0 : bail!("duplicate parameter for basebackup command: {param}")
2933 2 : }
2934 2 : replica = true
2935 : }
2936 2 : _ => bail!("invalid parameter for basebackup command: {param}"),
2937 : }
2938 : }
2939 6 : Ok(Self {
2940 6 : tenant_id,
2941 6 : timeline_id,
2942 6 : lsn,
2943 6 : gzip,
2944 6 : replica,
2945 6 : })
2946 9 : }
2947 : }
2948 :
2949 : impl LeaseLsnCmd {
2950 2 : fn parse(query: &str) -> anyhow::Result<Self> {
2951 2 : let parameters = query.split_whitespace().collect_vec();
2952 2 : if parameters.len() != 3 {
2953 0 : bail!(
2954 0 : "invalid number of parameters for lease lsn command: {}",
2955 0 : query
2956 0 : );
2957 2 : }
2958 2 : let tenant_shard_id = TenantShardId::from_str(parameters[0])
2959 2 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2960 2 : let timeline_id = TimelineId::from_str(parameters[1])
2961 2 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2962 2 : let lsn = Lsn::from_str(parameters[2])
2963 2 : .with_context(|| format!("Failed to parse lsn from {}", parameters[2]))?;
2964 2 : Ok(Self {
2965 2 : tenant_shard_id,
2966 2 : timeline_id,
2967 2 : lsn,
2968 2 : })
2969 2 : }
2970 : }
2971 :
2972 : impl PageServiceCmd {
2973 21 : fn parse(query: &str) -> anyhow::Result<Self> {
2974 21 : let query = query.trim();
2975 21 : let Some((cmd, other)) = query.split_once(' ') else {
2976 2 : bail!("cannot parse query: {query}")
2977 : };
2978 19 : match cmd.to_ascii_lowercase().as_str() {
2979 19 : "pagestream_v2" => Ok(Self::PageStream(PageStreamCmd::parse(
2980 3 : other,
2981 3 : PagestreamProtocolVersion::V2,
2982 3 : )?)),
2983 16 : "pagestream_v3" => Ok(Self::PageStream(PageStreamCmd::parse(
2984 0 : other,
2985 0 : PagestreamProtocolVersion::V3,
2986 0 : )?)),
2987 16 : "basebackup" => Ok(Self::BaseBackup(BaseBackupCmd::parse(other)?)),
2988 7 : "fullbackup" => Ok(Self::FullBackup(FullBackupCmd::parse(other)?)),
2989 5 : "lease" => {
2990 3 : let Some((cmd2, other)) = other.split_once(' ') else {
2991 0 : bail!("invalid lease command: {cmd}");
2992 : };
2993 3 : let cmd2 = cmd2.to_ascii_lowercase();
2994 3 : if cmd2 == "lsn" {
2995 2 : Ok(Self::LeaseLsn(LeaseLsnCmd::parse(other)?))
2996 : } else {
2997 1 : bail!("invalid lease command: {cmd}");
2998 : }
2999 : }
3000 2 : "set" => Ok(Self::Set),
3001 0 : _ => Err(anyhow::anyhow!("unsupported command {cmd} in {query}")),
3002 : }
3003 21 : }
3004 : }
3005 :
3006 : /// Parse the startup options from the postgres wire protocol startup packet.
3007 : ///
3008 : /// It takes a sequence of `-c option=X` or `-coption=X`. It parses the options string
3009 : /// on a best-effort basis and returns all the options parsed (key-value pairs) and a bool
3010 : /// indicating whether an error was encountered during parsing. There could be duplicates in the options
3011 : /// if the caller passed such parameters.
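///
/// For example (illustrative values), `parse_options("-c neon.compute_mode=primary -ctimezone=UTC")`
/// returns `([("neon.compute_mode", "primary"), ("timezone", "UTC")], false)`, while a trailing
/// `-c` with no option after it returns the options parsed so far together with `true` to signal
/// a parse error.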
3012 7 : fn parse_options(options: &str) -> (Vec<(String, String)>, bool) {
3013 7 : let mut parsing_config = false;
3014 7 : let mut has_error = false;
3015 7 : let mut config = Vec::new();
3016 16 : for item in options.split_whitespace() {
3017 16 : if item == "-c" {
3018 9 : if !parsing_config {
3019 8 : parsing_config = true;
3020 8 : } else {
3021 1 : // "-c" followed by another "-c"
3022 1 : tracing::warn!("failed to parse the startup options: {options}");
3023 1 : has_error = true;
3024 1 : break;
3025 : }
3026 7 : } else if item.starts_with("-c") || parsing_config {
3027 7 : let Some((mut key, value)) = item.split_once('=') else {
3028 : // "-c" followed by an invalid option
3029 1 : tracing::warn!("failed to parse the startup options: {options}");
3030 1 : has_error = true;
3031 1 : break;
3032 : };
3033 6 : if !parsing_config {
3034 : // Parse "-coptions=X"
3035 1 : let Some(stripped_key) = key.strip_prefix("-c") else {
3036 0 : tracing::warn!("failed to parse the startup options: {options}");
3037 0 : has_error = true;
3038 0 : break;
3039 : };
3040 1 : key = stripped_key;
3041 5 : }
3042 6 : config.push((key.to_string(), value.to_string()));
3043 6 : parsing_config = false;
3044 : } else {
3045 0 : tracing::warn!("failed to parse the startup options: {options}");
3046 0 : has_error = true;
3047 0 : break;
3048 : }
3049 : }
3050 7 : if parsing_config {
3051 : // "-c" without the option
3052 3 : tracing::warn!("failed to parse the startup options: {options}");
3053 3 : has_error = true;
3054 4 : }
3055 7 : (config, has_error)
3056 7 : }
3057 :
3058 : impl<IO> postgres_backend::Handler<IO> for PageServerHandler
3059 : where
3060 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
3061 : {
3062 0 : fn check_auth_jwt(
3063 0 : &mut self,
3064 0 : _pgb: &mut PostgresBackend<IO>,
3065 0 : jwt_response: &[u8],
3066 0 : ) -> Result<(), QueryError> {
3067 : // this unwrap is never triggered, because check_auth_jwt is only called when auth_type is NeonJWT,
3068 : // which requires auth to be present
3069 0 : let data: TokenData<Claims> = self
3070 0 : .auth
3071 0 : .as_ref()
3072 0 : .unwrap()
3073 0 : .decode(str::from_utf8(jwt_response).context("jwt response is not UTF-8")?)
3074 0 : .map_err(|e| QueryError::Unauthorized(e.0))?;
3075 :
3076 0 : if matches!(data.claims.scope, Scope::Tenant) && data.claims.tenant_id.is_none() {
3077 0 : return Err(QueryError::Unauthorized(
3078 0 : "jwt token scope is Tenant, but tenant id is missing".into(),
3079 0 : ));
3080 0 : }
3081 0 :
3082 0 : debug!(
3083 0 : "jwt scope check succeeded for scope: {:#?} by tenant id: {:?}",
3084 : data.claims.scope, data.claims.tenant_id,
3085 : );
3086 :
3087 0 : self.claims = Some(data.claims);
3088 0 : Ok(())
3089 0 : }
3090 :
3091 0 : fn startup(
3092 0 : &mut self,
3093 0 : _pgb: &mut PostgresBackend<IO>,
3094 0 : sm: &FeStartupPacket,
3095 0 : ) -> Result<(), QueryError> {
3096 0 : fail::fail_point!("ps::connection-start::startup-packet");
3097 :
3098 0 : if let FeStartupPacket::StartupMessage { params, .. } = sm {
3099 0 : if let Some(app_name) = params.get("application_name") {
3100 0 : self.perf_span_fields.application_name = Some(app_name.to_string());
3101 0 : Span::current().record("application_name", field::display(app_name));
3102 0 : }
3103 0 : if let Some(options) = params.get("options") {
3104 0 : let (config, _) = parse_options(options);
3105 0 : for (key, value) in config {
3106 0 : if key == "neon.compute_mode" {
3107 0 : self.perf_span_fields.compute_mode = Some(value.clone());
3108 0 : Span::current().record("compute_mode", field::display(value));
3109 0 : }
3110 : }
3111 0 : }
3112 0 : };
3113 :
3114 0 : Ok(())
3115 0 : }
3116 :
3117 : #[instrument(skip_all, fields(tenant_id, timeline_id))]
3118 : async fn process_query(
3119 : &mut self,
3120 : pgb: &mut PostgresBackend<IO>,
3121 : query_string: &str,
3122 : ) -> Result<(), QueryError> {
3123 0 : fail::fail_point!("simulated-bad-compute-connection", |_| {
3124 0 : info!("Hit failpoint for bad connection");
3125 0 : Err(QueryError::SimulatedConnectionError)
3126 0 : });
3127 :
3128 : fail::fail_point!("ps::connection-start::process-query");
3129 :
3130 : let ctx = self.connection_ctx.attached_child();
3131 : debug!("process query {query_string}");
3132 : let query = PageServiceCmd::parse(query_string)?;
3133 : match query {
3134 : PageServiceCmd::PageStream(PageStreamCmd {
3135 : tenant_id,
3136 : timeline_id,
3137 : protocol_version,
3138 : }) => {
3139 : tracing::Span::current()
3140 : .record("tenant_id", field::display(tenant_id))
3141 : .record("timeline_id", field::display(timeline_id));
3142 :
3143 : self.check_permission(Some(tenant_id))?;
3144 : let command_kind = match protocol_version {
3145 : PagestreamProtocolVersion::V2 => ComputeCommandKind::PageStreamV2,
3146 : PagestreamProtocolVersion::V3 => ComputeCommandKind::PageStreamV3,
3147 : };
3148 : COMPUTE_COMMANDS_COUNTERS.for_command(command_kind).inc();
3149 :
3150 : self.handle_pagerequests(pgb, tenant_id, timeline_id, protocol_version, ctx)
3151 : .await?;
3152 : }
3153 : PageServiceCmd::BaseBackup(BaseBackupCmd {
3154 : tenant_id,
3155 : timeline_id,
3156 : lsn,
3157 : gzip,
3158 : replica,
3159 : }) => {
3160 : tracing::Span::current()
3161 : .record("tenant_id", field::display(tenant_id))
3162 : .record("timeline_id", field::display(timeline_id));
3163 :
3164 : self.check_permission(Some(tenant_id))?;
3165 :
3166 : COMPUTE_COMMANDS_COUNTERS
3167 : .for_command(ComputeCommandKind::Basebackup)
3168 : .inc();
3169 : let metric_recording = metrics::BASEBACKUP_QUERY_TIME.start_recording();
3170 0 : let res = async {
3171 0 : self.handle_basebackup_request(
3172 0 : pgb,
3173 0 : tenant_id,
3174 0 : timeline_id,
3175 0 : lsn,
3176 0 : None,
3177 0 : false,
3178 0 : gzip,
3179 0 : replica,
3180 0 : &ctx,
3181 0 : )
3182 0 : .await?;
3183 0 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
3184 0 : Result::<(), QueryError>::Ok(())
3185 0 : }
3186 : .await;
3187 : metric_recording.observe(&res);
3188 : res?;
3189 : }
3190 : // same as basebackup, but result includes relational data as well
3191 : PageServiceCmd::FullBackup(FullBackupCmd {
3192 : tenant_id,
3193 : timeline_id,
3194 : lsn,
3195 : prev_lsn,
3196 : }) => {
3197 : tracing::Span::current()
3198 : .record("tenant_id", field::display(tenant_id))
3199 : .record("timeline_id", field::display(timeline_id));
3200 :
3201 : self.check_permission(Some(tenant_id))?;
3202 :
3203 : COMPUTE_COMMANDS_COUNTERS
3204 : .for_command(ComputeCommandKind::Fullbackup)
3205 : .inc();
3206 :
3207 : // Check that the timeline exists
3208 : self.handle_basebackup_request(
3209 : pgb,
3210 : tenant_id,
3211 : timeline_id,
3212 : lsn,
3213 : prev_lsn,
3214 : true,
3215 : false,
3216 : false,
3217 : &ctx,
3218 : )
3219 : .await?;
3220 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
3221 : }
3222 : PageServiceCmd::Set => {
3223 : // important because psycopg2 executes "SET datestyle TO 'ISO'"
3224 : // on connect
3225 : // TODO: allow setting options, i.e., application_name/compute_mode via SET commands
3226 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
3227 : }
3228 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
3229 : tenant_shard_id,
3230 : timeline_id,
3231 : lsn,
3232 : }) => {
3233 : tracing::Span::current()
3234 : .record("tenant_id", field::display(tenant_shard_id))
3235 : .record("timeline_id", field::display(timeline_id));
3236 :
3237 : self.check_permission(Some(tenant_shard_id.tenant_id))?;
3238 :
3239 : COMPUTE_COMMANDS_COUNTERS
3240 : .for_command(ComputeCommandKind::LeaseLsn)
3241 : .inc();
3242 :
3243 : match self
3244 : .handle_make_lsn_lease(pgb, tenant_shard_id, timeline_id, lsn, &ctx)
3245 : .await
3246 : {
3247 : Ok(()) => {
3248 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?
3249 : }
3250 : Err(e) => {
3251 : error!("error obtaining lsn lease for {lsn}: {e:?}");
3252 : pgb.write_message_noflush(&BeMessage::ErrorResponse(
3253 : &e.to_string(),
3254 : Some(e.pg_error_code()),
3255 : ))?
3256 : }
3257 : };
3258 : }
3259 : }
3260 :
3261 : Ok(())
3262 : }
3263 : }
3264 :
3265 : /// Serves the page service over gRPC. Dispatches to PageServerHandler for request processing.
3266 : ///
3267 : /// TODO: rename to PageServiceHandler when libpq impl is removed.
3268 : pub struct GrpcPageServiceHandler {
3269 : tenant_manager: Arc<TenantManager>,
3270 : ctx: RequestContext,
3271 : gate_guard: GateGuard,
3272 : get_vectored_concurrent_io: GetVectoredConcurrentIo,
3273 : }
3274 :
3275 : impl GrpcPageServiceHandler {
3276 : /// Spawns a gRPC server for the page service.
3277 : ///
3278 : /// TODO: this doesn't support TLS. We need TLS reloading via ReloadingCertificateResolver, so we
3279 : /// need to reimplement the TCP+TLS accept loop ourselves.
3280 0 : pub fn spawn(
3281 0 : tenant_manager: Arc<TenantManager>,
3282 0 : auth: Option<Arc<SwappableJwtAuth>>,
3283 0 : perf_trace_dispatch: Option<Dispatch>,
3284 0 : get_vectored_concurrent_io: GetVectoredConcurrentIo,
3285 0 : listener: std::net::TcpListener,
3286 0 : ) -> anyhow::Result<CancellableTask> {
3287 0 : let cancel = CancellationToken::new();
3288 0 : let ctx = RequestContextBuilder::new(TaskKind::PageRequestHandler)
3289 0 : .download_behavior(DownloadBehavior::Download)
3290 0 : .perf_span_dispatch(perf_trace_dispatch)
3291 0 : .detached_child();
3292 0 : let gate = Gate::default();
3293 :
3294 : // Set up the TCP socket. We take a preconfigured TcpListener to bind the
3295 : // port early during startup.
3296 0 : let incoming = {
3297 0 : let _runtime = COMPUTE_REQUEST_RUNTIME.enter(); // required by TcpListener::from_std
3298 0 : listener.set_nonblocking(true)?;
3299 0 : tonic::transport::server::TcpIncoming::from(tokio::net::TcpListener::from_std(
3300 0 : listener,
3301 0 : )?)
3302 0 : .with_nodelay(Some(GRPC_TCP_NODELAY))
3303 0 : .with_keepalive(Some(GRPC_TCP_KEEPALIVE_TIME))
3304 0 : };
3305 0 :
3306 0 : // Set up the gRPC server.
3307 0 : //
3308 0 : // TODO: consider tuning window sizes.
3309 0 : let mut server = tonic::transport::Server::builder()
3310 0 : .http2_keepalive_interval(Some(GRPC_HTTP2_KEEPALIVE_INTERVAL))
3311 0 : .http2_keepalive_timeout(Some(GRPC_HTTP2_KEEPALIVE_TIMEOUT))
3312 0 : .max_concurrent_streams(Some(GRPC_MAX_CONCURRENT_STREAMS));
3313 0 :
3314 0 : // Main page service stack. Uses a mix of Tonic interceptors and Tower layers:
3315 0 : //
3316 0 : // * Interceptors: can inspect and modify the gRPC request. Sync code only, runs before service.
3317 0 : //
3318 0 : // * Layers: allow async code and can run code after the service response. However, they only
3319 0 : // have access to the raw HTTP request/response, not the gRPC types.
3320 0 : let page_service_handler = GrpcPageServiceHandler {
3321 0 : tenant_manager,
3322 0 : ctx,
3323 0 : gate_guard: gate.enter().expect("gate was just created"),
3324 0 : get_vectored_concurrent_io,
3325 0 : };
3326 0 :
3327 0 : let observability_layer = ObservabilityLayer;
3328 0 : let mut tenant_interceptor = TenantMetadataInterceptor;
3329 0 : let mut auth_interceptor = TenantAuthInterceptor::new(auth);
3330 0 :
3331 0 : let page_service = tower::ServiceBuilder::new()
3332 0 : // Create tracing span and record request start time.
3333 0 : .layer(observability_layer)
3334 0 : // Intercept gRPC requests.
3335 0 : .layer(tonic::service::InterceptorLayer::new(move |mut req| {
3336 : // Extract tenant metadata.
3337 0 : req = tenant_interceptor.call(req)?;
3338 : // Authenticate tenant JWT token.
3339 0 : req = auth_interceptor.call(req)?;
3340 0 : Ok(req)
3341 0 : }))
3342 0 : // Run the page service.
3343 0 : .service(proto::PageServiceServer::new(page_service_handler));
3344 0 : let server = server.add_service(page_service);
3345 :
3346 : // Reflection service for use with e.g. grpcurl.
3347 0 : let reflection_service = tonic_reflection::server::Builder::configure()
3348 0 : .register_encoded_file_descriptor_set(proto::FILE_DESCRIPTOR_SET)
3349 0 : .build_v1()?;
3350 0 : let server = server.add_service(reflection_service);
3351 0 :
3352 0 : // Spawn server task.
3353 0 : let task_cancel = cancel.clone();
3354 0 : let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
3355 0 : "grpc listener",
3356 0 : async move {
3357 0 : let result = server
3358 0 : .serve_with_incoming_shutdown(incoming, task_cancel.cancelled())
3359 0 : .await;
3360 0 : if result.is_ok() {
3361 : // TODO: revisit shutdown logic once page service is implemented.
3362 0 : gate.close().await;
3363 0 : }
3364 0 : result
3365 0 : },
3366 0 : ));
3367 0 :
3368 0 : Ok(CancellableTask { task, cancel })
3369 0 : }
3370 :
3371 : /// Errors if the request is executed on a non-zero shard. Only shard 0 has a complete view of
3372 : /// relations and their sizes, as well as SLRU segments and similar data.
3373 : #[allow(clippy::result_large_err)]
3374 0 : fn ensure_shard_zero(timeline: &Handle<TenantManagerTypes>) -> Result<(), tonic::Status> {
3375 0 : match timeline.get_shard_index().shard_number.0 {
3376 0 : 0 => Ok(()),
3377 0 : shard => Err(tonic::Status::invalid_argument(format!(
3378 0 : "request must execute on shard zero (is shard {shard})",
3379 0 : ))),
3380 : }
3381 0 : }
3382 :
3383 : /// Generates a PagestreamRequest header from a ReadLsn and request ID.
3384 0 : fn make_hdr(read_lsn: page_api::ReadLsn, req_id: u64) -> PagestreamRequest {
3385 0 : PagestreamRequest {
3386 0 : reqid: req_id,
3387 0 : request_lsn: read_lsn.request_lsn,
3388 0 : not_modified_since: read_lsn
3389 0 : .not_modified_since_lsn
3390 0 : .unwrap_or(read_lsn.request_lsn),
3391 0 : }
3392 0 : }
3393 :
3394 : /// Acquires a timeline handle for the given request.
3395 : ///
3396 : /// TODO: during shard splits, the compute may still be sending requests to the parent shard
3397 : /// until the entire split is committed and the compute is notified. Consider installing a
3398 : /// temporary shard router from the parent to the children while the split is in progress.
3399 : ///
3400 : /// TODO: consider moving this to a middleware layer; all requests need it. Needs to manage
3401 : /// the TimelineHandles lifecycle.
3402 : ///
3403 : /// TODO: untangle acquisition from TenantManagerWrapper::resolve() and Cache::get(), to avoid
3404 : /// the unnecessary overhead.
3405 0 : async fn get_request_timeline(
3406 0 : &self,
3407 0 : req: &tonic::Request<impl Any>,
3408 0 : ) -> Result<Handle<TenantManagerTypes>, GetActiveTimelineError> {
3409 0 : let ttid = *extract::<TenantTimelineId>(req);
3410 0 : let shard_index = *extract::<ShardIndex>(req);
3411 0 : let shard_selector = ShardSelector::Known(shard_index);
3412 0 :
3413 0 : TimelineHandles::new(self.tenant_manager.clone())
3414 0 : .get(ttid.tenant_id, ttid.timeline_id, shard_selector)
3415 0 : .await
3416 0 : }
3417 :
3418 : /// Starts a SmgrOpTimer at received_at, throttles the request, and records execution start.
3419 : /// Only errors if the timeline is shutting down.
3420 : ///
3421 : /// TODO: move timer construction to ObservabilityLayer (see TODO there).
3422 : /// TODO: decouple rate limiting (middleware?), and return SlowDown errors instead.
3423 0 : async fn record_op_start_and_throttle(
3424 0 : timeline: &Handle<TenantManagerTypes>,
3425 0 : op: metrics::SmgrQueryType,
3426 0 : received_at: Instant,
3427 0 : ) -> Result<SmgrOpTimer, tonic::Status> {
3428 0 : let mut timer = PageServerHandler::record_op_start_and_throttle(timeline, op, received_at)
3429 0 : .await
3430 0 : .map_err(|err| match err {
3431 : // record_op_start_and_throttle() only returns Shutdown.
3432 0 : QueryError::Shutdown => tonic::Status::unavailable(format!("{err}")),
3433 0 : err => tonic::Status::internal(format!("unexpected error: {err}")),
3434 0 : })?;
3435 0 : timer.observe_execution_start(Instant::now());
3436 0 : Ok(timer)
3437 0 : }
3438 :
3439 : /// Processes a GetPage batch request, via the GetPages bidirectional streaming RPC.
3440 : ///
3441 : /// NB: errors will terminate the stream. Per-request errors should return a GetPageResponse
3442 : /// with an appropriate status code instead.
3443 : ///
3444 : /// TODO: get_vectored() currently enforces a batch limit of 32. Postgres will typically send
3445 : /// batches up to effective_io_concurrency = 100. Either we have to accept large batches, or
3446 : /// split them up in the client or server.
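 : ///
 : /// A minimal sketch of the error convention from a client's perspective, assuming `resp` is a
 : /// decoded `page_api::GetPageResponse` (illustrative only):
 : ///
 : /// ```ignore
 : /// match resp.status_code {
 : ///     page_api::GetPageStatusCode::Ok => {
 : ///         // resp.page_images holds one image per requested block, in request order.
 : ///     }
 : ///     code => {
 : ///         // Per-request failure (e.g. an invalid LSN); the GetPages stream itself stays open.
 : ///     }
 : /// }
 : /// ```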
3447 : #[instrument(skip_all, fields(req_id, rel, blkno, blks, lsn))]
3448 : async fn get_page(
3449 : ctx: &RequestContext,
3450 : timeline: &WeakHandle<TenantManagerTypes>,
3451 : req: proto::GetPageRequest,
3452 : io_concurrency: IoConcurrency,
3453 : ) -> Result<proto::GetPageResponse, tonic::Status> {
3454 : let received_at = Instant::now();
3455 : let timeline = timeline.upgrade()?;
3456 : let ctx = ctx.with_scope_page_service_pagestream(&timeline);
3457 :
3458 : // Validate the request, decorate the span, and convert it to a Pagestream request.
3459 : let req: page_api::GetPageRequest = req.try_into()?;
3460 :
3461 : span_record!(
3462 : req_id = %req.request_id,
3463 : rel = %req.rel,
3464 : blkno = %req.block_numbers[0],
3465 : blks = %req.block_numbers.len(),
3466 : lsn = %req.read_lsn,
3467 : );
3468 :
3469 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn(); // hold guard
3470 : let effective_lsn = match PageServerHandler::effective_request_lsn(
3471 : &timeline,
3472 : timeline.get_last_record_lsn(),
3473 : req.read_lsn.request_lsn,
3474 : req.read_lsn
3475 : .not_modified_since_lsn
3476 : .unwrap_or(req.read_lsn.request_lsn),
3477 : &latest_gc_cutoff_lsn,
3478 : ) {
3479 : Ok(lsn) => lsn,
3480 : Err(err) => return err.into_get_page_response(req.request_id),
3481 : };
3482 :
3483 : let mut batch = SmallVec::with_capacity(req.block_numbers.len());
3484 : for blkno in req.block_numbers {
3485 : // TODO: this creates one timer per page and throttles it. We should have a timer for
3486 : // the entire batch, and throttle only the batch, but this is equivalent to what
3487 : // PageServerHandler does already so we keep it for now.
3488 : let timer = Self::record_op_start_and_throttle(
3489 : &timeline,
3490 : metrics::SmgrQueryType::GetPageAtLsn,
3491 : received_at,
3492 : )
3493 : .await?;
3494 :
3495 : batch.push(BatchedGetPageRequest {
3496 : req: PagestreamGetPageRequest {
3497 : hdr: Self::make_hdr(req.read_lsn, req.request_id),
3498 : rel: req.rel,
3499 : blkno,
3500 : },
3501 : lsn_range: LsnRange {
3502 : effective_lsn,
3503 : request_lsn: req.read_lsn.request_lsn,
3504 : },
3505 : timer,
3506 : ctx: ctx.attached_child(),
3507 : batch_wait_ctx: None, // TODO: add tracing
3508 : });
3509 : }
3510 :
3511 : // TODO: this does a relation size query for every page in the batch. Since this batch is
3512 : // all for one relation, we could do this only once. However, this is not the case for the
3513 : // libpq implementation.
3514 : let results = PageServerHandler::handle_get_page_at_lsn_request_batched(
3515 : &timeline,
3516 : batch,
3517 : io_concurrency,
3518 : GetPageBatchBreakReason::BatchFull, // TODO: not relevant for gRPC batches
3519 : &ctx,
3520 : )
3521 : .await;
3522 :
3523 : let mut resp = page_api::GetPageResponse {
3524 : request_id: req.request_id,
3525 : status_code: page_api::GetPageStatusCode::Ok,
3526 : reason: None,
3527 : page_images: Vec::with_capacity(results.len()),
3528 : };
3529 :
3530 : for result in results {
3531 : match result {
3532 : Ok((PagestreamBeMessage::GetPage(r), _, _)) => resp.page_images.push(r.page),
3533 : Ok((resp, _, _)) => {
3534 : return Err(tonic::Status::internal(format!(
3535 : "unexpected response: {resp:?}"
3536 : )));
3537 : }
3538 : Err(err) => return err.err.into_get_page_response(req.request_id),
3539 : };
3540 : }
3541 :
3542 : Ok(resp.into())
3543 : }
3544 : }
3545 :
3546 : /// Implements the gRPC page service.
3547 : ///
3548 : /// TODO: cancellation.
3549 : /// TODO: when the libpq impl is removed, remove the Pagestream types and inline the handler code.
3550 : #[tonic::async_trait]
3551 : impl proto::PageService for GrpcPageServiceHandler {
3552 : type GetBaseBackupStream = Pin<
3553 : Box<dyn Stream<Item = Result<proto::GetBaseBackupResponseChunk, tonic::Status>> + Send>,
3554 : >;
3555 :
3556 : type GetPagesStream =
3557 : Pin<Box<dyn Stream<Item = Result<proto::GetPageResponse, tonic::Status>> + Send>>;
3558 :
3559 : #[instrument(skip_all, fields(rel, lsn))]
3560 : async fn check_rel_exists(
3561 : &self,
3562 : req: tonic::Request<proto::CheckRelExistsRequest>,
3563 0 : ) -> Result<tonic::Response<proto::CheckRelExistsResponse>, tonic::Status> {
3564 0 : let received_at = extract::<ReceivedAt>(&req).0;
3565 0 : let timeline = self.get_request_timeline(&req).await?;
3566 0 : let ctx = self.ctx.with_scope_page_service_pagestream(&timeline);
3567 0 :
3568 0 : // Validate the request, decorate the span, and convert it to a Pagestream request.
3569 0 : Self::ensure_shard_zero(&timeline)?;
3570 0 : let req: page_api::CheckRelExistsRequest = req.into_inner().try_into()?;
3571 :
3572 0 : span_record!(rel=%req.rel, lsn=%req.read_lsn);
3573 :
3574 0 : let req = PagestreamExistsRequest {
3575 0 : hdr: Self::make_hdr(req.read_lsn, 0),
3576 0 : rel: req.rel,
3577 0 : };
3578 :
3579 : // Execute the request and convert the response.
3580 0 : let _timer = Self::record_op_start_and_throttle(
3581 0 : &timeline,
3582 0 : metrics::SmgrQueryType::GetRelExists,
3583 0 : received_at,
3584 0 : )
3585 0 : .await?;
3586 :
3587 0 : let resp = PageServerHandler::handle_get_rel_exists_request(&timeline, &req, &ctx).await?;
3588 0 : let resp: page_api::CheckRelExistsResponse = resp.exists;
3589 0 : Ok(tonic::Response::new(resp.into()))
3590 0 : }
3591 :
3592 : // TODO: ensure clients use gzip compression for the stream.
3593 : #[instrument(skip_all, fields(lsn))]
3594 : async fn get_base_backup(
3595 : &self,
3596 : req: tonic::Request<proto::GetBaseBackupRequest>,
3597 0 : ) -> Result<tonic::Response<Self::GetBaseBackupStream>, tonic::Status> {
3598 : // Send 64 KB chunks to avoid large memory allocations.
3599 : const CHUNK_SIZE: usize = 64 * 1024;
3600 :
3601 0 : let timeline = self.get_request_timeline(&req).await?;
3602 0 : let ctx = self.ctx.with_scope_timeline(&timeline);
3603 0 :
3604 0 : // Validate the request, decorate the span, and wait for the LSN to arrive.
3605 0 : //
3606 0 : // TODO: this requires a read LSN; is that OK?
3607 0 : Self::ensure_shard_zero(&timeline)?;
3608 0 : if timeline.is_archived() == Some(true) {
3609 0 : return Err(tonic::Status::failed_precondition("timeline is archived"));
3610 0 : }
3611 0 : let req: page_api::GetBaseBackupRequest = req.into_inner().try_into()?;
3612 :
3613 0 : span_record!(lsn=%req.read_lsn);
3614 :
3615 0 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
3616 0 : timeline
3617 0 : .wait_lsn(
3618 0 : req.read_lsn.request_lsn,
3619 0 : WaitLsnWaiter::PageService,
3620 0 : WaitLsnTimeout::Default,
3621 0 : &ctx,
3622 0 : )
3623 0 : .await?;
3624 0 : timeline
3625 0 : .check_lsn_is_in_scope(req.read_lsn.request_lsn, &latest_gc_cutoff_lsn)
3626 0 : .map_err(|err| {
3627 0 : tonic::Status::invalid_argument(format!("invalid basebackup LSN: {err}"))
3628 0 : })?;
3629 :
3630 : // Spawn a task to run the basebackup.
3631 : //
3632 : // TODO: do we need to support full base backups, for debugging?
3633 0 : let span = Span::current();
3634 0 : let (mut simplex_read, mut simplex_write) = tokio::io::simplex(CHUNK_SIZE);
3635 0 : let jh = tokio::spawn(async move {
3636 0 : let result = basebackup::send_basebackup_tarball(
3637 0 : &mut simplex_write,
3638 0 : &timeline,
3639 0 : Some(req.read_lsn.request_lsn),
3640 0 : None,
3641 0 : false,
3642 0 : req.replica,
3643 0 : &ctx,
3644 0 : )
3645 0 : .instrument(span) // propagate request span
3646 0 : .await;
3647 0 : simplex_write.shutdown().await.map_err(|err| {
3648 0 : BasebackupError::Server(anyhow!("simplex shutdown failed: {err}"))
3649 0 : })?;
3650 0 : result
3651 0 : });
3652 0 :
3653 0 : // Emit chunks of size CHUNK_SIZE.
3654 0 : let chunks = async_stream::try_stream! {
3655 0 : let mut chunk = BytesMut::with_capacity(CHUNK_SIZE);
3656 0 : loop {
3657 0 : let n = simplex_read.read_buf(&mut chunk).await.map_err(|err| {
3658 0 : tonic::Status::internal(format!("failed to read basebackup chunk: {err}"))
3659 0 : })?;
3660 0 :
3661 0 : // If we read 0 bytes, either the chunk is full or the stream is closed.
3662 0 : if n == 0 {
3663 0 : if chunk.is_empty() {
3664 0 : break;
3665 0 : }
3666 0 : yield proto::GetBaseBackupResponseChunk::from(chunk.clone().freeze());
3667 0 : chunk.clear();
3668 0 : }
3669 0 : }
3670 0 : // Wait for the basebackup task to exit and check for errors.
3671 0 : jh.await.map_err(|err| {
3672 0 : tonic::Status::internal(format!("basebackup failed: {err}"))
3673 0 : })??;
3674 0 : };
3675 0 :
3676 0 : Ok(tonic::Response::new(Box::pin(chunks)))
3677 0 : }
3678 :
3679 : #[instrument(skip_all, fields(db_oid, lsn))]
3680 : async fn get_db_size(
3681 : &self,
3682 : req: tonic::Request<proto::GetDbSizeRequest>,
3683 0 : ) -> Result<tonic::Response<proto::GetDbSizeResponse>, tonic::Status> {
3684 0 : let received_at = extract::<ReceivedAt>(&req).0;
3685 0 : let timeline = self.get_request_timeline(&req).await?;
3686 0 : let ctx = self.ctx.with_scope_page_service_pagestream(&timeline);
3687 0 :
3688 0 : // Validate the request, decorate the span, and convert it to a Pagestream request.
3689 0 : Self::ensure_shard_zero(&timeline)?;
3690 0 : let req: page_api::GetDbSizeRequest = req.into_inner().try_into()?;
3691 :
3692 0 : span_record!(db_oid=%req.db_oid, lsn=%req.read_lsn);
3693 :
3694 0 : let req = PagestreamDbSizeRequest {
3695 0 : hdr: Self::make_hdr(req.read_lsn, 0),
3696 0 : dbnode: req.db_oid,
3697 0 : };
3698 :
3699 : // Execute the request and convert the response.
3700 0 : let _timer = Self::record_op_start_and_throttle(
3701 0 : &timeline,
3702 0 : metrics::SmgrQueryType::GetDbSize,
3703 0 : received_at,
3704 0 : )
3705 0 : .await?;
3706 :
3707 0 : let resp = PageServerHandler::handle_db_size_request(&timeline, &req, &ctx).await?;
3708 0 : let resp = resp.db_size as page_api::GetDbSizeResponse;
3709 0 : Ok(tonic::Response::new(resp.into()))
3710 0 : }
3711 :
3712 : // NB: don't instrument this, instrument each streamed request.
3713 0 : async fn get_pages(
3714 0 : &self,
3715 0 : req: tonic::Request<tonic::Streaming<proto::GetPageRequest>>,
3716 0 : ) -> Result<tonic::Response<Self::GetPagesStream>, tonic::Status> {
3717 : // Extract the timeline from the request and check that it exists.
3718 0 : let ttid = *extract::<TenantTimelineId>(&req);
3719 0 : let shard_index = *extract::<ShardIndex>(&req);
3720 0 : let shard_selector = ShardSelector::Known(shard_index);
3721 0 :
3722 0 : let mut handles = TimelineHandles::new(self.tenant_manager.clone());
3723 0 : handles
3724 0 : .get(ttid.tenant_id, ttid.timeline_id, shard_selector)
3725 0 : .await?;
3726 :
3727 : // Spawn an IoConcurrency sidecar, if enabled.
3728 0 : let Ok(gate_guard) = self.gate_guard.try_clone() else {
3729 0 : return Err(tonic::Status::unavailable("shutting down"));
3730 : };
3731 0 : let io_concurrency =
3732 0 : IoConcurrency::spawn_from_conf(self.get_vectored_concurrent_io, gate_guard);
3733 0 :
3734 0 : // Spawn a task to handle the GetPageRequest stream.
3735 0 : let span = Span::current();
3736 0 : let ctx = self.ctx.attached_child();
3737 0 : let mut reqs = req.into_inner();
3738 0 :
3739 0 : let resps = async_stream::try_stream! {
3740 0 : let timeline = handles
3741 0 : .get(ttid.tenant_id, ttid.timeline_id, shard_selector)
3742 0 : .await?
3743 0 : .downgrade();
3744 0 : while let Some(req) = reqs.message().await? {
3745 0 : yield Self::get_page(&ctx, &timeline, req, io_concurrency.clone())
3746 0 : .instrument(span.clone()) // propagate request span
3747 0 : .await?
3748 0 : }
3749 0 : };
3750 0 :
3751 0 : Ok(tonic::Response::new(Box::pin(resps)))
3752 0 : }
3753 :
3754 : #[instrument(skip_all, fields(rel, lsn))]
3755 : async fn get_rel_size(
3756 : &self,
3757 : req: tonic::Request<proto::GetRelSizeRequest>,
3758 0 : ) -> Result<tonic::Response<proto::GetRelSizeResponse>, tonic::Status> {
3759 0 : let received_at = extract::<ReceivedAt>(&req).0;
3760 0 : let timeline = self.get_request_timeline(&req).await?;
3761 0 : let ctx = self.ctx.with_scope_page_service_pagestream(&timeline);
3762 0 :
3763 0 : // Validate the request, decorate the span, and convert it to a Pagestream request.
3764 0 : Self::ensure_shard_zero(&timeline)?;
3765 0 : let req: page_api::GetRelSizeRequest = req.into_inner().try_into()?;
3766 :
3767 0 : span_record!(rel=%req.rel, lsn=%req.read_lsn);
3768 :
3769 0 : let req = PagestreamNblocksRequest {
3770 0 : hdr: Self::make_hdr(req.read_lsn, 0),
3771 0 : rel: req.rel,
3772 0 : };
3773 :
3774 : // Execute the request and convert the response.
3775 0 : let _timer = Self::record_op_start_and_throttle(
3776 0 : &timeline,
3777 0 : metrics::SmgrQueryType::GetRelSize,
3778 0 : received_at,
3779 0 : )
3780 0 : .await?;
3781 :
3782 0 : let resp = PageServerHandler::handle_get_nblocks_request(&timeline, &req, &ctx).await?;
3783 0 : let resp: page_api::GetRelSizeResponse = resp.n_blocks;
3784 0 : Ok(tonic::Response::new(resp.into()))
3785 0 : }
3786 :
3787 : #[instrument(skip_all, fields(kind, segno, lsn))]
3788 : async fn get_slru_segment(
3789 : &self,
3790 : req: tonic::Request<proto::GetSlruSegmentRequest>,
3791 0 : ) -> Result<tonic::Response<proto::GetSlruSegmentResponse>, tonic::Status> {
3792 0 : let received_at = extract::<ReceivedAt>(&req).0;
3793 0 : let timeline = self.get_request_timeline(&req).await?;
3794 0 : let ctx = self.ctx.with_scope_page_service_pagestream(&timeline);
3795 0 :
3796 0 : // Validate the request, decorate the span, and convert it to a Pagestream request.
3797 0 : Self::ensure_shard_zero(&timeline)?;
3798 0 : let req: page_api::GetSlruSegmentRequest = req.into_inner().try_into()?;
3799 :
3800 0 : span_record!(kind=%req.kind, segno=%req.segno, lsn=%req.read_lsn);
3801 :
3802 0 : let req = PagestreamGetSlruSegmentRequest {
3803 0 : hdr: Self::make_hdr(req.read_lsn, 0),
3804 0 : kind: req.kind as u8,
3805 0 : segno: req.segno,
3806 0 : };
3807 :
3808 : // Execute the request and convert the response.
3809 0 : let _timer = Self::record_op_start_and_throttle(
3810 0 : &timeline,
3811 0 : metrics::SmgrQueryType::GetSlruSegment,
3812 0 : received_at,
3813 0 : )
3814 0 : .await?;
3815 :
3816 0 : let resp =
3817 0 : PageServerHandler::handle_get_slru_segment_request(&timeline, &req, &ctx).await?;
3818 0 : let resp: page_api::GetSlruSegmentResponse = resp.segment;
3819 0 : Ok(tonic::Response::new(resp.into()))
3820 0 : }
3821 : }
3822 :
3823 : /// gRPC middleware layer that handles observability concerns:
3824 : ///
3825 : /// * Creates and enters a tracing span.
3826 : /// * Records the request start time as a ReceivedAt request extension.
3827 : ///
3828 : /// TODO: add perf tracing.
3829 : /// TODO: add timing and metrics.
3830 : /// TODO: add logging.
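 : ///
 : /// A minimal composition sketch, mirroring the stack built in [`GrpcPageServiceHandler::spawn`]
 : /// (the handler value is assumed to already exist):
 : ///
 : /// ```ignore
 : /// let page_service = tower::ServiceBuilder::new()
 : ///     .layer(ObservabilityLayer)
 : ///     .service(proto::PageServiceServer::new(page_service_handler));
 : /// ```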
3831 : #[derive(Clone)]
3832 : struct ObservabilityLayer;
3833 :
3834 : impl<S: tonic::server::NamedService> tower::Layer<S> for ObservabilityLayer {
3835 : type Service = ObservabilityLayerService<S>;
3836 :
3837 0 : fn layer(&self, inner: S) -> Self::Service {
3838 0 : Self::Service { inner }
3839 0 : }
3840 : }
3841 :
3842 : #[derive(Clone)]
3843 : struct ObservabilityLayerService<S> {
3844 : inner: S,
3845 : }
3846 :
3847 : #[derive(Clone, Copy)]
3848 : struct ReceivedAt(Instant);
3849 :
3850 : impl<S: tonic::server::NamedService> tonic::server::NamedService for ObservabilityLayerService<S> {
3851 : const NAME: &'static str = S::NAME; // propagate inner service name
3852 : }
3853 :
3854 : impl<S, B> tower::Service<http::Request<B>> for ObservabilityLayerService<S>
3855 : where
3856 : S: tower::Service<http::Request<B>>,
3857 : S::Future: Send + 'static,
3858 : {
3859 : type Response = S::Response;
3860 : type Error = S::Error;
3861 : type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
3862 :
3863 0 : fn call(&mut self, mut req: http::Request<B>) -> Self::Future {
3864 0 : // Record the request start time as a request extension.
3865 0 : //
3866 0 : // TODO: we should start a timer here instead, but it currently requires a timeline handle
3867 0 : // and SmgrQueryType, which we don't have yet. Refactor it to provide it later.
3868 0 : req.extensions_mut().insert(ReceivedAt(Instant::now()));
3869 :
3870 : // Create a basic tracing span. Enter the span for the current thread (to use it for inner
3871 : // sync code like interceptors), and instrument the future (to use it for inner async code
3872 : // like the page service itself).
3873 : //
3874 : // The instrument() call below is not sufficient. It only affects the returned future, and
3875 : // only takes effect when the caller polls it. Any sync code executed when we call
3876 : // self.inner.call() below (such as interceptors) runs outside of the returned future, and
3877 : // is not affected by it. We therefore have to enter the span on the current thread too.
3878 0 : let span = info_span!(
3879 0 : "grpc:pageservice",
3880 0 : // Set by TenantMetadataInterceptor.
3881 0 : tenant_id = field::Empty,
3882 0 : timeline_id = field::Empty,
3883 0 : shard_id = field::Empty,
3884 0 : );
3885 0 : let _guard = span.enter();
3886 0 :
3887 0 : Box::pin(self.inner.call(req).instrument(span.clone()))
3888 0 : }
3889 :
3890 0 : fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
3891 0 : self.inner.poll_ready(cx)
3892 0 : }
3893 : }
3894 :
3895 : /// gRPC interceptor that decodes tenant metadata and stores it as request extensions of type
3896 : /// TenantTimelineId and ShardIndex.
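 : ///
 : /// A minimal sketch of the metadata a client is expected to attach; the IDs below are
 : /// hypothetical placeholders, and values must be valid ASCII gRPC metadata:
 : ///
 : /// ```ignore
 : /// let mut req = tonic::Request::new(proto::CheckRelExistsRequest::default());
 : /// let md = req.metadata_mut();
 : /// md.insert("neon-tenant-id", "4f9c4d3a6c8b4e2a9d1f0b7c5e3a2d10".parse().unwrap());
 : /// md.insert("neon-timeline-id", "7b1e2c3d4f5a69788c9dabecfd012345".parse().unwrap());
 : /// md.insert("neon-shard-id", "0000".parse().unwrap()); // unsharded (hypothetical)
 : /// ```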
3897 : #[derive(Clone)]
3898 : struct TenantMetadataInterceptor;
3899 :
3900 : impl tonic::service::Interceptor for TenantMetadataInterceptor {
3901 0 : fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
3902 : // Decode the tenant ID.
3903 0 : let tenant_id = req
3904 0 : .metadata()
3905 0 : .get("neon-tenant-id")
3906 0 : .ok_or_else(|| tonic::Status::invalid_argument("missing neon-tenant-id"))?
3907 0 : .to_str()
3908 0 : .map_err(|_| tonic::Status::invalid_argument("invalid neon-tenant-id"))?;
3909 0 : let tenant_id = TenantId::from_str(tenant_id)
3910 0 : .map_err(|_| tonic::Status::invalid_argument("invalid neon-tenant-id"))?;
3911 :
3912 : // Decode the timeline ID.
3913 0 : let timeline_id = req
3914 0 : .metadata()
3915 0 : .get("neon-timeline-id")
3916 0 : .ok_or_else(|| tonic::Status::invalid_argument("missing neon-timeline-id"))?
3917 0 : .to_str()
3918 0 : .map_err(|_| tonic::Status::invalid_argument("invalid neon-timeline-id"))?;
3919 0 : let timeline_id = TimelineId::from_str(timeline_id)
3920 0 : .map_err(|_| tonic::Status::invalid_argument("invalid neon-timeline-id"))?;
3921 :
3922 : // Decode the shard ID.
3923 0 : let shard_id = req
3924 0 : .metadata()
3925 0 : .get("neon-shard-id")
3926 0 : .ok_or_else(|| tonic::Status::invalid_argument("missing neon-shard-id"))?
3927 0 : .to_str()
3928 0 : .map_err(|_| tonic::Status::invalid_argument("invalid neon-shard-id"))?;
3929 0 : let shard_id = ShardIndex::from_str(shard_id)
3930 0 : .map_err(|_| tonic::Status::invalid_argument("invalid neon-shard-id"))?;
3931 :
3932 : // Stash them in the request.
3933 0 : let extensions = req.extensions_mut();
3934 0 : extensions.insert(TenantTimelineId::new(tenant_id, timeline_id));
3935 0 : extensions.insert(shard_id);
3936 0 :
3937 0 : // Decorate the tracing span.
3938 0 : span_record!(%tenant_id, %timeline_id, %shard_id);
3939 :
3940 0 : Ok(req)
3941 0 : }
3942 : }
3943 :
3944 : /// Authenticates gRPC page service requests.
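 : ///
 : /// The tenant ID is taken from the request extensions (set by [`TenantMetadataInterceptor`]),
 : /// and the JWT is read from the `authorization` metadata key. A minimal client-side sketch,
 : /// assuming `jwt` holds a token issued for the tenant:
 : ///
 : /// ```ignore
 : /// req.metadata_mut()
 : ///     .insert("authorization", format!("Bearer {jwt}").parse().unwrap());
 : /// ```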
3945 : #[derive(Clone)]
3946 : struct TenantAuthInterceptor {
3947 : auth: Option<Arc<SwappableJwtAuth>>,
3948 : }
3949 :
3950 : impl TenantAuthInterceptor {
3951 0 : fn new(auth: Option<Arc<SwappableJwtAuth>>) -> Self {
3952 0 : Self { auth }
3953 0 : }
3954 : }
3955 :
3956 : impl tonic::service::Interceptor for TenantAuthInterceptor {
3957 0 : fn call(&mut self, req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
3958 : // Do nothing if auth is disabled.
3959 0 : let Some(auth) = self.auth.as_ref() else {
3960 0 : return Ok(req);
3961 : };
3962 :
3963 : // Fetch the tenant ID from the request extensions (set by TenantMetadataInterceptor).
3964 0 : let TenantTimelineId { tenant_id, .. } = *extract::<TenantTimelineId>(&req);
3965 :
3966 : // Fetch and decode the JWT token.
3967 0 : let jwt = req
3968 0 : .metadata()
3969 0 : .get("authorization")
3970 0 : .ok_or_else(|| tonic::Status::unauthenticated("no authorization header"))?
3971 0 : .to_str()
3972 0 : .map_err(|_| tonic::Status::invalid_argument("invalid authorization header"))?
3973 0 : .strip_prefix("Bearer ")
3974 0 : .ok_or_else(|| tonic::Status::invalid_argument("invalid authorization header"))?
3975 0 : .trim();
3976 0 : let jwtdata: TokenData<Claims> = auth
3977 0 : .decode(jwt)
3978 0 : .map_err(|err| tonic::Status::invalid_argument(format!("invalid JWT token: {err}")))?;
3979 0 : let claims = jwtdata.claims;
3980 0 :
3981 0 : // Check if the token is valid for this tenant.
3982 0 : check_permission(&claims, Some(tenant_id))
3983 0 : .map_err(|err| tonic::Status::permission_denied(err.to_string()))?;
3984 :
3985 : // TODO: consider stashing the claims in the request extensions, if needed.
3986 :
3987 0 : Ok(req)
3988 0 : }
3989 : }
3990 :
3991 : /// Extracts the given type from the request extensions, or panics if it is missing.
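 : ///
 : /// A minimal usage sketch, mirroring the handlers in this file (the extensions are stored by
 : /// [`TenantMetadataInterceptor`] and [`ObservabilityLayer`]):
 : ///
 : /// ```ignore
 : /// let ttid = *extract::<TenantTimelineId>(&req);
 : /// let shard_index = *extract::<ShardIndex>(&req);
 : /// let received_at = extract::<ReceivedAt>(&req).0;
 : /// ```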
3992 0 : fn extract<T: Send + Sync + 'static>(req: &tonic::Request<impl Any>) -> &T {
3993 0 : extract_from(req.extensions())
3994 0 : }
3995 :
3996 : /// Extracts the given type from the given extensions, or panics if it is missing. This variant
3997 : /// works with both tonic::Request and http::Request, since both expose http::Extensions.
3998 0 : fn extract_from<T: Send + Sync + 'static>(ext: &http::Extensions) -> &T {
3999 0 : let Some(value) = ext.get::<T>() else {
4000 0 : let name = std::any::type_name::<T>();
4001 0 : panic!("extension {name} should be set by middleware");
4002 : };
4003 0 : value
4004 0 : }
4005 :
4006 : #[derive(Debug, thiserror::Error)]
4007 : pub(crate) enum GetActiveTimelineError {
4008 : #[error(transparent)]
4009 : Tenant(GetActiveTenantError),
4010 : #[error(transparent)]
4011 : Timeline(#[from] GetTimelineError),
4012 : }
4013 :
4014 : impl From<GetActiveTimelineError> for QueryError {
4015 0 : fn from(e: GetActiveTimelineError) -> Self {
4016 0 : match e {
4017 0 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => QueryError::Shutdown,
4018 0 : GetActiveTimelineError::Tenant(e) => e.into(),
4019 0 : GetActiveTimelineError::Timeline(e) => QueryError::NotFound(format!("{e}").into()),
4020 : }
4021 0 : }
4022 : }
4023 :
4024 : impl From<GetActiveTimelineError> for tonic::Status {
4025 0 : fn from(err: GetActiveTimelineError) -> Self {
4026 0 : let message = err.to_string();
4027 0 : let code = match err {
4028 0 : GetActiveTimelineError::Tenant(err) => tonic::Status::from(err).code(),
4029 0 : GetActiveTimelineError::Timeline(err) => tonic::Status::from(err).code(),
4030 : };
4031 0 : tonic::Status::new(code, message)
4032 0 : }
4033 : }
4034 :
4035 : impl From<GetTimelineError> for tonic::Status {
4036 0 : fn from(err: GetTimelineError) -> Self {
4037 : use tonic::Code;
4038 0 : let code = match &err {
4039 0 : GetTimelineError::NotFound { .. } => Code::NotFound,
4040 0 : GetTimelineError::NotActive { .. } => Code::Unavailable,
4041 0 : GetTimelineError::ShuttingDown => Code::Unavailable,
4042 : };
4043 0 : tonic::Status::new(code, err.to_string())
4044 0 : }
4045 : }
4046 :
4047 : impl From<GetActiveTenantError> for QueryError {
4048 0 : fn from(e: GetActiveTenantError) -> Self {
4049 0 : match e {
4050 0 : GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected(
4051 0 : ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
4052 0 : ),
4053 : GetActiveTenantError::Cancelled
4054 : | GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => {
4055 0 : QueryError::Shutdown
4056 : }
4057 0 : e @ GetActiveTenantError::NotFound(_) => QueryError::NotFound(format!("{e}").into()),
4058 0 : e => QueryError::Other(anyhow::anyhow!(e)),
4059 : }
4060 0 : }
4061 : }
4062 :
4063 : impl From<GetActiveTenantError> for tonic::Status {
4064 0 : fn from(err: GetActiveTenantError) -> Self {
4065 : use tonic::Code;
4066 0 : let code = match &err {
4067 0 : GetActiveTenantError::Broken(_) => Code::Internal,
4068 0 : GetActiveTenantError::Cancelled => Code::Unavailable,
4069 0 : GetActiveTenantError::NotFound(_) => Code::NotFound,
4070 0 : GetActiveTenantError::SwitchedTenant => Code::Unavailable,
4071 0 : GetActiveTenantError::WaitForActiveTimeout { .. } => Code::Unavailable,
4072 0 : GetActiveTenantError::WillNotBecomeActive(_) => Code::Unavailable,
4073 : };
4074 0 : tonic::Status::new(code, err.to_string())
4075 0 : }
4076 : }
4077 :
4078 : impl From<HandleUpgradeError> for QueryError {
4079 0 : fn from(e: HandleUpgradeError) -> Self {
4080 0 : match e {
4081 0 : HandleUpgradeError::ShutDown => QueryError::Shutdown,
4082 0 : }
4083 0 : }
4084 : }
4085 :
4086 : impl From<HandleUpgradeError> for tonic::Status {
4087 0 : fn from(err: HandleUpgradeError) -> Self {
4088 0 : match err {
4089 0 : HandleUpgradeError::ShutDown => tonic::Status::unavailable("timeline is shutting down"),
4090 0 : }
4091 0 : }
4092 : }
4093 :
4094 0 : fn set_tracing_field_shard_id(timeline: &Timeline) {
4095 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
4096 0 : tracing::Span::current().record(
4097 0 : "shard_id",
4098 0 : tracing::field::display(timeline.tenant_shard_id.shard_slug()),
4099 0 : );
4100 0 : debug_assert_current_span_has_tenant_and_timeline_id();
4101 0 : }
4102 :
4103 : struct WaitedForLsn(Lsn);
4104 : impl From<WaitedForLsn> for Lsn {
4105 0 : fn from(WaitedForLsn(lsn): WaitedForLsn) -> Self {
4106 0 : lsn
4107 0 : }
4108 : }
4109 :
4110 : #[cfg(test)]
4111 : mod tests {
4112 : use utils::shard::ShardCount;
4113 :
4114 : use super::*;
4115 :
4116 : #[test]
4117 1 : fn pageservice_cmd_parse() {
4118 1 : let tenant_id = TenantId::generate();
4119 1 : let timeline_id = TimelineId::generate();
4120 1 : let cmd =
4121 1 : PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id} {timeline_id}")).unwrap();
4122 1 : assert_eq!(
4123 1 : cmd,
4124 1 : PageServiceCmd::PageStream(PageStreamCmd {
4125 1 : tenant_id,
4126 1 : timeline_id,
4127 1 : protocol_version: PagestreamProtocolVersion::V2,
4128 1 : })
4129 1 : );
4130 1 : let cmd = PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id}")).unwrap();
4131 1 : assert_eq!(
4132 1 : cmd,
4133 1 : PageServiceCmd::BaseBackup(BaseBackupCmd {
4134 1 : tenant_id,
4135 1 : timeline_id,
4136 1 : lsn: None,
4137 1 : gzip: false,
4138 1 : replica: false
4139 1 : })
4140 1 : );
4141 1 : let cmd =
4142 1 : PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} --gzip")).unwrap();
4143 1 : assert_eq!(
4144 1 : cmd,
4145 1 : PageServiceCmd::BaseBackup(BaseBackupCmd {
4146 1 : tenant_id,
4147 1 : timeline_id,
4148 1 : lsn: None,
4149 1 : gzip: true,
4150 1 : replica: false
4151 1 : })
4152 1 : );
4153 1 : let cmd =
4154 1 : PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} latest")).unwrap();
4155 1 : assert_eq!(
4156 1 : cmd,
4157 1 : PageServiceCmd::BaseBackup(BaseBackupCmd {
4158 1 : tenant_id,
4159 1 : timeline_id,
4160 1 : lsn: None,
4161 1 : gzip: false,
4162 1 : replica: false
4163 1 : })
4164 1 : );
4165 1 : let cmd = PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} 0/16ABCDE"))
4166 1 : .unwrap();
4167 1 : assert_eq!(
4168 1 : cmd,
4169 1 : PageServiceCmd::BaseBackup(BaseBackupCmd {
4170 1 : tenant_id,
4171 1 : timeline_id,
4172 1 : lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
4173 1 : gzip: false,
4174 1 : replica: false
4175 1 : })
4176 1 : );
4177 1 : let cmd = PageServiceCmd::parse(&format!(
4178 1 : "basebackup {tenant_id} {timeline_id} --replica --gzip"
4179 1 : ))
4180 1 : .unwrap();
4181 1 : assert_eq!(
4182 1 : cmd,
4183 1 : PageServiceCmd::BaseBackup(BaseBackupCmd {
4184 1 : tenant_id,
4185 1 : timeline_id,
4186 1 : lsn: None,
4187 1 : gzip: true,
4188 1 : replica: true
4189 1 : })
4190 1 : );
4191 1 : let cmd = PageServiceCmd::parse(&format!(
4192 1 : "basebackup {tenant_id} {timeline_id} 0/16ABCDE --replica --gzip"
4193 1 : ))
4194 1 : .unwrap();
4195 1 : assert_eq!(
4196 1 : cmd,
4197 1 : PageServiceCmd::BaseBackup(BaseBackupCmd {
4198 1 : tenant_id,
4199 1 : timeline_id,
4200 1 : lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
4201 1 : gzip: true,
4202 1 : replica: true
4203 1 : })
4204 1 : );
4205 1 : let cmd = PageServiceCmd::parse(&format!("fullbackup {tenant_id} {timeline_id}")).unwrap();
4206 1 : assert_eq!(
4207 1 : cmd,
4208 1 : PageServiceCmd::FullBackup(FullBackupCmd {
4209 1 : tenant_id,
4210 1 : timeline_id,
4211 1 : lsn: None,
4212 1 : prev_lsn: None
4213 1 : })
4214 1 : );
4215 1 : let cmd = PageServiceCmd::parse(&format!(
4216 1 : "fullbackup {tenant_id} {timeline_id} 0/16ABCDE 0/16ABCDF"
4217 1 : ))
4218 1 : .unwrap();
4219 1 : assert_eq!(
4220 1 : cmd,
4221 1 : PageServiceCmd::FullBackup(FullBackupCmd {
4222 1 : tenant_id,
4223 1 : timeline_id,
4224 1 : lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
4225 1 : prev_lsn: Some(Lsn::from_str("0/16ABCDF").unwrap()),
4226 1 : })
4227 1 : );
4228 1 : let tenant_shard_id = TenantShardId::unsharded(tenant_id);
4229 1 : let cmd = PageServiceCmd::parse(&format!(
4230 1 : "lease lsn {tenant_shard_id} {timeline_id} 0/16ABCDE"
4231 1 : ))
4232 1 : .unwrap();
4233 1 : assert_eq!(
4234 1 : cmd,
4235 1 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
4236 1 : tenant_shard_id,
4237 1 : timeline_id,
4238 1 : lsn: Lsn::from_str("0/16ABCDE").unwrap(),
4239 1 : })
4240 1 : );
4241 1 : let tenant_shard_id = TenantShardId::split(&tenant_shard_id, ShardCount(8))[1];
4242 1 : let cmd = PageServiceCmd::parse(&format!(
4243 1 : "lease lsn {tenant_shard_id} {timeline_id} 0/16ABCDE"
4244 1 : ))
4245 1 : .unwrap();
4246 1 : assert_eq!(
4247 1 : cmd,
4248 1 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
4249 1 : tenant_shard_id,
4250 1 : timeline_id,
4251 1 : lsn: Lsn::from_str("0/16ABCDE").unwrap(),
4252 1 : })
4253 1 : );
4254 1 : let cmd = PageServiceCmd::parse("set a = b").unwrap();
4255 1 : assert_eq!(cmd, PageServiceCmd::Set);
4256 1 : let cmd = PageServiceCmd::parse("SET foo").unwrap();
4257 1 : assert_eq!(cmd, PageServiceCmd::Set);
4258 1 : }
4259 :
4260 : #[test]
4261 1 : fn pageservice_cmd_err_handling() {
4262 1 : let tenant_id = TenantId::generate();
4263 1 : let timeline_id = TimelineId::generate();
4264 1 : let cmd = PageServiceCmd::parse("unknown_command");
4265 1 : assert!(cmd.is_err());
4266 1 : let cmd = PageServiceCmd::parse("pagestream_v2");
4267 1 : assert!(cmd.is_err());
4268 1 : let cmd = PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id}xxx"));
4269 1 : assert!(cmd.is_err());
4270 1 : let cmd = PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id}xxx {timeline_id}xxx"));
4271 1 : assert!(cmd.is_err());
4272 1 : let cmd = PageServiceCmd::parse(&format!(
4273 1 : "basebackup {tenant_id} {timeline_id} --gzip --gzip"
4274 1 : ));
4275 1 : assert!(cmd.is_err());
4276 1 : let cmd = PageServiceCmd::parse(&format!(
4277 1 : "basebackup {tenant_id} {timeline_id} --gzip --unknown"
4278 1 : ));
4279 1 : assert!(cmd.is_err());
4280 1 : let cmd = PageServiceCmd::parse(&format!(
4281 1 : "basebackup {tenant_id} {timeline_id} --gzip 0/16ABCDE"
4282 1 : ));
4283 1 : assert!(cmd.is_err());
4284 1 : let cmd = PageServiceCmd::parse(&format!("lease {tenant_id} {timeline_id} gzip 0/16ABCDE"));
4285 1 : assert!(cmd.is_err());
4286 1 : }
4287 :
4288 : #[test]
4289 1 : fn test_parse_options() {
4290 1 : let (config, has_error) = parse_options(" -c neon.compute_mode=primary ");
4291 1 : assert!(!has_error);
4292 1 : assert_eq!(
4293 1 : config,
4294 1 : vec![("neon.compute_mode".to_string(), "primary".to_string())]
4295 1 : );
4296 :
4297 1 : let (config, has_error) = parse_options(" -c neon.compute_mode=primary -c foo=bar ");
4298 1 : assert!(!has_error);
4299 1 : assert_eq!(
4300 1 : config,
4301 1 : vec![
4302 1 : ("neon.compute_mode".to_string(), "primary".to_string()),
4303 1 : ("foo".to_string(), "bar".to_string()),
4304 1 : ]
4305 1 : );
4306 :
4307 1 : let (config, has_error) = parse_options(" -c neon.compute_mode=primary -cfoo=bar");
4308 1 : assert!(!has_error);
4309 1 : assert_eq!(
4310 1 : config,
4311 1 : vec![
4312 1 : ("neon.compute_mode".to_string(), "primary".to_string()),
4313 1 : ("foo".to_string(), "bar".to_string()),
4314 1 : ]
4315 1 : );
4316 :
4317 1 : let (_, has_error) = parse_options("-c");
4318 1 : assert!(has_error);
4319 :
4320 1 : let (_, has_error) = parse_options("-c foo=bar -c -c");
4321 1 : assert!(has_error);
4322 :
4323 1 : let (_, has_error) = parse_options(" ");
4324 1 : assert!(!has_error);
4325 :
4326 1 : let (_, has_error) = parse_options(" -c neon.compute_mode");
4327 1 : assert!(has_error);
4328 1 : }
4329 : }
|