Line data Source code
1 : //! The Page Service listens for client connections and serves their GetPage@LSN
2 : //! requests.
3 :
4 : use std::any::Any;
5 : use std::borrow::Cow;
6 : use std::num::NonZeroUsize;
7 : use std::os::fd::AsRawFd;
8 : use std::pin::Pin;
9 : use std::str::FromStr;
10 : use std::sync::Arc;
11 : use std::task::{Context, Poll};
12 : use std::time::{Duration, Instant, SystemTime};
13 : use std::{io, str};
14 :
15 : use anyhow::{Context as _, bail};
16 : use bytes::{Buf as _, BufMut as _, BytesMut};
17 : use chrono::Utc;
18 : use futures::future::BoxFuture;
19 : use futures::stream::FuturesUnordered;
20 : use futures::{FutureExt, Stream, StreamExt as _};
21 : use itertools::Itertools;
22 : use jsonwebtoken::TokenData;
23 : use once_cell::sync::OnceCell;
24 : use pageserver_api::config::{
25 : GetVectoredConcurrentIo, PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
26 : PageServiceProtocolPipelinedBatchingStrategy, PageServiceProtocolPipelinedExecutionStrategy,
27 : };
28 : use pageserver_api::key::rel_block_to_key;
29 : use pageserver_api::models::{PageTraceEvent, TenantState};
30 : use pageserver_api::pagestream_api::{
31 : self, PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
32 : PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
33 : PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetSlruSegmentRequest,
34 : PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest, PagestreamNblocksResponse,
35 : PagestreamProtocolVersion, PagestreamRequest,
36 : };
37 : use pageserver_api::reltag::SlruKind;
38 : use pageserver_api::shard::TenantShardId;
39 : use pageserver_page_api::proto;
40 : use pageserver_page_api::{self as page_api, GetPageSplitter};
41 : use postgres_backend::{
42 : AuthType, PostgresBackend, PostgresBackendReader, QueryError, is_expected_io_error,
43 : };
44 : use postgres_ffi::BLCKSZ;
45 : use postgres_ffi_types::constants::DEFAULTTABLESPACE_OID;
46 : use pq_proto::framed::ConnectionError;
47 : use pq_proto::{BeMessage, FeMessage, FeStartupPacket, RowDescriptor};
48 : use smallvec::{SmallVec, smallvec};
49 : use strum_macros::IntoStaticStr;
50 : use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _, BufWriter};
51 : use tokio::task::JoinHandle;
52 : use tokio_util::sync::CancellationToken;
53 : use tonic::service::Interceptor as _;
54 : use tonic::transport::server::TcpConnectInfo;
55 : use tracing::*;
56 : use utils::auth::{Claims, Scope, SwappableJwtAuth};
57 : use utils::id::{TenantId, TenantTimelineId, TimelineId};
58 : use utils::logging::log_slow;
59 : use utils::lsn::Lsn;
60 : use utils::shard::ShardIndex;
61 : use utils::simple_rcu::RcuReadGuard;
62 : use utils::sync::gate::{Gate, GateGuard};
63 : use utils::sync::spsc_fold;
64 : use utils::{failpoint_support, span_record};
65 :
66 : use crate::auth::check_permission;
67 : use crate::basebackup::{self, BasebackupError};
68 : use crate::config::PageServerConf;
69 : use crate::context::{
70 : DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
71 : };
72 : use crate::feature_resolver::FeatureResolver;
73 : use crate::metrics::{
74 : self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, GetPageBatchBreakReason, LIVE_CONNECTIONS,
75 : MISROUTED_PAGESTREAM_REQUESTS, PAGESTREAM_HANDLER_RESULTS_TOTAL, SmgrOpTimer, TimelineMetrics,
76 : };
77 : use crate::pgdatadir_mapping::{LsnRange, Version};
78 : use crate::span::{
79 : debug_assert_current_span_has_tenant_and_timeline_id,
80 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
81 : };
82 : use crate::task_mgr::{self, COMPUTE_REQUEST_RUNTIME, TaskKind};
83 : use crate::tenant::mgr::{
84 : GetActiveTenantError, GetTenantError, ShardResolveResult, ShardSelector, TenantManager,
85 : };
86 : use crate::tenant::storage_layer::IoConcurrency;
87 : use crate::tenant::timeline::handle::{Handle, HandleUpgradeError, WeakHandle};
88 : use crate::tenant::timeline::{self, WaitLsnError, WaitLsnTimeout, WaitLsnWaiter};
89 : use crate::tenant::{GetTimelineError, PageReconstructError, Timeline};
90 : use crate::{CancellableTask, PERF_TRACE_TARGET, timed_after_cancellation};
91 :
92 : /// How long we may wait for a [`crate::tenant::mgr::TenantSlot::InProgress`]` and/or a [`crate::tenant::TenantShard`] which
93 : /// is not yet in state [`TenantState::Active`].
94 : ///
95 : /// NB: this is a different value than [`crate::http::routes::ACTIVE_TENANT_TIMEOUT`].
96 : /// HADRON: reduced timeout and we will retry in Cache::get().
97 : const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(5000);
98 :
99 : /// Threshold at which to log slow GetPage requests.
100 : const LOG_SLOW_GETPAGE_THRESHOLD: Duration = Duration::from_secs(30);
101 :
102 : /// The idle time before sending TCP keepalive probes for gRPC connections. The
103 : /// interval and timeout between each probe is configured via sysctl. This
104 : /// allows detecting dead connections sooner.
105 : const GRPC_TCP_KEEPALIVE_TIME: Duration = Duration::from_secs(60);
106 :
107 : /// Whether to enable TCP nodelay for gRPC connections. This disables Nagle's
108 : /// algorithm, which can cause latency spikes for small messages.
109 : const GRPC_TCP_NODELAY: bool = true;
110 :
111 : /// The interval between HTTP2 keepalive pings. This allows shutting down server
112 : /// tasks when clients are unresponsive.
113 : const GRPC_HTTP2_KEEPALIVE_INTERVAL: Duration = Duration::from_secs(30);
114 :
115 : /// The timeout for HTTP2 keepalive pings. Should be <= GRPC_KEEPALIVE_INTERVAL.
116 : const GRPC_HTTP2_KEEPALIVE_TIMEOUT: Duration = Duration::from_secs(20);
117 :
118 : /// Number of concurrent gRPC streams per TCP connection. We expect something
119 : /// like 8 GetPage streams per connections, plus any unary requests.
120 : const GRPC_MAX_CONCURRENT_STREAMS: u32 = 256;
121 :
122 : ///////////////////////////////////////////////////////////////////////////////
123 :
124 : pub struct Listener {
125 : cancel: CancellationToken,
126 : /// Cancel the listener task through `listen_cancel` to shut down the listener
127 : /// and get a handle on the existing connections.
128 : task: JoinHandle<Connections>,
129 : }
130 :
131 : pub struct Connections {
132 : cancel: CancellationToken,
133 : tasks: tokio::task::JoinSet<ConnectionHandlerResult>,
134 : gate: Gate,
135 : }
136 :
137 0 : pub fn spawn(
138 0 : conf: &'static PageServerConf,
139 0 : tenant_manager: Arc<TenantManager>,
140 0 : pg_auth: Option<Arc<SwappableJwtAuth>>,
141 0 : perf_trace_dispatch: Option<Dispatch>,
142 0 : tcp_listener: tokio::net::TcpListener,
143 0 : tls_config: Option<Arc<rustls::ServerConfig>>,
144 0 : feature_resolver: FeatureResolver,
145 0 : ) -> Listener {
146 0 : let cancel = CancellationToken::new();
147 0 : let libpq_ctx = RequestContext::todo_child(
148 0 : TaskKind::LibpqEndpointListener,
149 : // listener task shouldn't need to download anything. (We will
150 : // create a separate sub-contexts for each connection, with their
151 : // own download behavior. This context is used only to listen and
152 : // accept connections.)
153 0 : DownloadBehavior::Error,
154 : );
155 0 : let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
156 0 : "libpq listener",
157 0 : libpq_listener_main(
158 0 : conf,
159 0 : tenant_manager,
160 0 : pg_auth,
161 0 : perf_trace_dispatch,
162 0 : tcp_listener,
163 0 : conf.pg_auth_type,
164 0 : tls_config,
165 0 : conf.page_service_pipelining.clone(),
166 0 : feature_resolver,
167 0 : libpq_ctx,
168 0 : cancel.clone(),
169 0 : )
170 0 : .map(anyhow::Ok),
171 0 : ));
172 :
173 0 : Listener { cancel, task }
174 0 : }
175 :
176 : impl Listener {
177 0 : pub async fn stop_accepting(self) -> Connections {
178 0 : self.cancel.cancel();
179 0 : self.task
180 0 : .await
181 0 : .expect("unreachable: we wrap the listener task in task_mgr::exit_on_panic_or_error")
182 0 : }
183 : }
184 : impl Connections {
185 0 : pub(crate) async fn shutdown(self) {
186 : let Self {
187 0 : cancel,
188 0 : mut tasks,
189 0 : gate,
190 0 : } = self;
191 0 : cancel.cancel();
192 0 : while let Some(res) = tasks.join_next().await {
193 0 : Self::handle_connection_completion(res);
194 0 : }
195 0 : gate.close().await;
196 0 : }
197 :
198 0 : fn handle_connection_completion(res: Result<anyhow::Result<()>, tokio::task::JoinError>) {
199 0 : match res {
200 0 : Ok(Ok(())) => {}
201 0 : Ok(Err(e)) => error!("error in page_service connection task: {:?}", e),
202 0 : Err(e) => error!("page_service connection task panicked: {:?}", e),
203 : }
204 0 : }
205 : }
206 :
207 : ///
208 : /// Main loop of the page service.
209 : ///
210 : /// Listens for connections, and launches a new handler task for each.
211 : ///
212 : /// Returns Ok(()) upon cancellation via `cancel`, returning the set of
213 : /// open connections.
214 : ///
215 : #[allow(clippy::too_many_arguments)]
216 0 : pub async fn libpq_listener_main(
217 0 : conf: &'static PageServerConf,
218 0 : tenant_manager: Arc<TenantManager>,
219 0 : auth: Option<Arc<SwappableJwtAuth>>,
220 0 : perf_trace_dispatch: Option<Dispatch>,
221 0 : listener: tokio::net::TcpListener,
222 0 : auth_type: AuthType,
223 0 : tls_config: Option<Arc<rustls::ServerConfig>>,
224 0 : pipelining_config: PageServicePipeliningConfig,
225 0 : feature_resolver: FeatureResolver,
226 0 : listener_ctx: RequestContext,
227 0 : listener_cancel: CancellationToken,
228 0 : ) -> Connections {
229 0 : let connections_cancel = CancellationToken::new();
230 0 : let connections_gate = Gate::default();
231 0 : let mut connection_handler_tasks = tokio::task::JoinSet::default();
232 :
233 : loop {
234 0 : let gate_guard = match connections_gate.enter() {
235 0 : Ok(guard) => guard,
236 0 : Err(_) => break,
237 : };
238 :
239 0 : let accepted = tokio::select! {
240 : biased;
241 0 : _ = listener_cancel.cancelled() => break,
242 0 : next = connection_handler_tasks.join_next(), if !connection_handler_tasks.is_empty() => {
243 0 : let res = next.expect("we dont poll while empty");
244 0 : Connections::handle_connection_completion(res);
245 0 : continue;
246 : }
247 0 : accepted = listener.accept() => accepted,
248 : };
249 :
250 0 : match accepted {
251 0 : Ok((socket, peer_addr)) => {
252 : // Connection established. Spawn a new task to handle it.
253 0 : debug!("accepted connection from {}", peer_addr);
254 0 : let local_auth = auth.clone();
255 0 : let connection_ctx = RequestContextBuilder::from(&listener_ctx)
256 0 : .task_kind(TaskKind::PageRequestHandler)
257 0 : .download_behavior(DownloadBehavior::Download)
258 0 : .perf_span_dispatch(perf_trace_dispatch.clone())
259 0 : .detached_child();
260 :
261 0 : connection_handler_tasks.spawn(page_service_conn_main(
262 0 : conf,
263 0 : tenant_manager.clone(),
264 0 : local_auth,
265 0 : socket,
266 0 : auth_type,
267 0 : tls_config.clone(),
268 0 : pipelining_config.clone(),
269 0 : feature_resolver.clone(),
270 0 : connection_ctx,
271 0 : connections_cancel.child_token(),
272 0 : gate_guard,
273 : ));
274 : }
275 0 : Err(err) => {
276 : // accept() failed. Log the error, and loop back to retry on next connection.
277 0 : error!("accept() failed: {:?}", err);
278 : }
279 : }
280 : }
281 :
282 0 : debug!("page_service listener loop terminated");
283 :
284 0 : Connections {
285 0 : cancel: connections_cancel,
286 0 : tasks: connection_handler_tasks,
287 0 : gate: connections_gate,
288 0 : }
289 0 : }
290 :
291 : type ConnectionHandlerResult = anyhow::Result<()>;
292 :
293 : /// Perf root spans start at the per-request level, after shard routing.
294 : /// This struct carries connection-level information to the root perf span definition.
295 : #[derive(Clone, Default)]
296 : struct ConnectionPerfSpanFields {
297 : peer_addr: String,
298 : application_name: Option<String>,
299 : compute_mode: Option<String>,
300 : }
301 :
302 : #[instrument(skip_all, fields(peer_addr, application_name, compute_mode))]
303 : #[allow(clippy::too_many_arguments)]
304 : async fn page_service_conn_main(
305 : conf: &'static PageServerConf,
306 : tenant_manager: Arc<TenantManager>,
307 : auth: Option<Arc<SwappableJwtAuth>>,
308 : socket: tokio::net::TcpStream,
309 : auth_type: AuthType,
310 : tls_config: Option<Arc<rustls::ServerConfig>>,
311 : pipelining_config: PageServicePipeliningConfig,
312 : feature_resolver: FeatureResolver,
313 : connection_ctx: RequestContext,
314 : cancel: CancellationToken,
315 : gate_guard: GateGuard,
316 : ) -> ConnectionHandlerResult {
317 : let _guard = LIVE_CONNECTIONS
318 : .with_label_values(&["page_service"])
319 : .guard();
320 :
321 : socket
322 : .set_nodelay(true)
323 : .context("could not set TCP_NODELAY")?;
324 :
325 : let socket_fd = socket.as_raw_fd();
326 :
327 : let peer_addr = socket.peer_addr().context("get peer address")?;
328 :
329 : let perf_span_fields = ConnectionPerfSpanFields {
330 : peer_addr: peer_addr.to_string(),
331 : application_name: None, // filled in later
332 : compute_mode: None, // filled in later
333 : };
334 : tracing::Span::current().record("peer_addr", field::display(peer_addr));
335 :
336 : // setup read timeout of 10 minutes. the timeout is rather arbitrary for requirements:
337 : // - long enough for most valid compute connections
338 : // - less than infinite to stop us from "leaking" connections to long-gone computes
339 : //
340 : // no write timeout is used, because the kernel is assumed to error writes after some time.
341 : let mut socket = tokio_io_timeout::TimeoutReader::new(socket);
342 :
343 : let default_timeout_ms = 10 * 60 * 1000; // 10 minutes by default
344 0 : let socket_timeout_ms = (|| {
345 0 : fail::fail_point!("simulated-bad-compute-connection", |avg_timeout_ms| {
346 : // Exponential distribution for simulating
347 : // poor network conditions, expect about avg_timeout_ms to be around 15
348 : // in tests
349 0 : if let Some(avg_timeout_ms) = avg_timeout_ms {
350 0 : let avg = avg_timeout_ms.parse::<i64>().unwrap() as f32;
351 0 : let u = rand::random::<f32>();
352 0 : ((1.0 - u).ln() / (-avg)) as u64
353 : } else {
354 0 : default_timeout_ms
355 : }
356 0 : });
357 0 : default_timeout_ms
358 : })();
359 :
360 : // A timeout here does not mean the client died, it can happen if it's just idle for
361 : // a while: we will tear down this PageServerHandler and instantiate a new one if/when
362 : // they reconnect.
363 : socket.set_timeout(Some(std::time::Duration::from_millis(socket_timeout_ms)));
364 : let socket = Box::pin(socket);
365 :
366 : fail::fail_point!("ps::connection-start::pre-login");
367 :
368 : // XXX: pgbackend.run() should take the connection_ctx,
369 : // and create a child per-query context when it invokes process_query.
370 : // But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
371 : // and create the per-query context in process_query ourselves.
372 : let mut conn_handler = PageServerHandler::new(
373 : tenant_manager,
374 : auth,
375 : pipelining_config,
376 : conf.get_vectored_concurrent_io,
377 : perf_span_fields,
378 : connection_ctx,
379 : cancel.clone(),
380 : feature_resolver.clone(),
381 : gate_guard,
382 : );
383 : let pgbackend =
384 : PostgresBackend::new_from_io(socket_fd, socket, peer_addr, auth_type, tls_config)?;
385 :
386 : match pgbackend.run(&mut conn_handler, &cancel).await {
387 : Ok(()) => {
388 : // we've been requested to shut down
389 : Ok(())
390 : }
391 : Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
392 : if is_expected_io_error(&io_error) {
393 : info!("Postgres client disconnected ({io_error})");
394 : Ok(())
395 : } else {
396 : let tenant_id = conn_handler.timeline_handles.as_ref().unwrap().tenant_id();
397 : Err(io_error).context(format!(
398 : "Postgres connection error for tenant_id={tenant_id:?} client at peer_addr={peer_addr}"
399 : ))
400 : }
401 : }
402 : other => {
403 : let tenant_id = conn_handler.timeline_handles.as_ref().unwrap().tenant_id();
404 : other.context(format!(
405 : "Postgres query error for tenant_id={tenant_id:?} client peer_addr={peer_addr}"
406 : ))
407 : }
408 : }
409 : }
410 :
411 : /// Page service connection handler.
412 : struct PageServerHandler {
413 : auth: Option<Arc<SwappableJwtAuth>>,
414 : claims: Option<Claims>,
415 :
416 : /// The context created for the lifetime of the connection
417 : /// services by this PageServerHandler.
418 : /// For each query received over the connection,
419 : /// `process_query` creates a child context from this one.
420 : connection_ctx: RequestContext,
421 :
422 : perf_span_fields: ConnectionPerfSpanFields,
423 :
424 : cancel: CancellationToken,
425 :
426 : /// None only while pagestream protocol is being processed.
427 : timeline_handles: Option<TimelineHandles>,
428 :
429 : pipelining_config: PageServicePipeliningConfig,
430 : get_vectored_concurrent_io: GetVectoredConcurrentIo,
431 :
432 : feature_resolver: FeatureResolver,
433 :
434 : gate_guard: GateGuard,
435 : }
436 :
437 : struct TimelineHandles {
438 : wrapper: TenantManagerWrapper,
439 : /// Note on size: the typical size of this map is 1. The largest size we expect
440 : /// to see is the number of shards divided by the number of pageservers (typically < 2),
441 : /// or the ratio used when splitting shards (i.e. how many children created from one)
442 : /// parent shard, where a "large" number might be ~8.
443 : handles: timeline::handle::Cache<TenantManagerTypes>,
444 : }
445 :
446 : impl TimelineHandles {
447 0 : fn new(tenant_manager: Arc<TenantManager>) -> Self {
448 0 : Self {
449 0 : wrapper: TenantManagerWrapper {
450 0 : tenant_manager,
451 0 : tenant_id: OnceCell::new(),
452 0 : },
453 0 : handles: Default::default(),
454 0 : }
455 0 : }
456 0 : async fn get(
457 0 : &mut self,
458 0 : tenant_id: TenantId,
459 0 : timeline_id: TimelineId,
460 0 : shard_selector: ShardSelector,
461 0 : ) -> Result<Handle<TenantManagerTypes>, GetActiveTimelineError> {
462 0 : if *self.wrapper.tenant_id.get_or_init(|| tenant_id) != tenant_id {
463 0 : return Err(GetActiveTimelineError::Tenant(
464 0 : GetActiveTenantError::SwitchedTenant,
465 0 : ));
466 0 : }
467 0 : self.handles
468 0 : .get(timeline_id, shard_selector, &self.wrapper)
469 0 : .await
470 0 : }
471 :
472 0 : fn tenant_id(&self) -> Option<TenantId> {
473 0 : self.wrapper.tenant_id.get().copied()
474 0 : }
475 : }
476 :
477 : pub(crate) struct TenantManagerWrapper {
478 : tenant_manager: Arc<TenantManager>,
479 : // We do not support switching tenant_id on a connection at this point.
480 : // We can can add support for this later if needed without changing
481 : // the protocol.
482 : tenant_id: once_cell::sync::OnceCell<TenantId>,
483 : }
484 :
485 : pub(crate) struct TenantManagerTypes;
486 :
487 : impl timeline::handle::Types for TenantManagerTypes {
488 : type TenantManager = TenantManagerWrapper;
489 : type Timeline = TenantManagerCacheItem;
490 : }
491 :
492 : pub(crate) struct TenantManagerCacheItem {
493 : pub(crate) timeline: Arc<Timeline>,
494 : // allow() for cheap propagation through RequestContext inside a task
495 : #[allow(clippy::redundant_allocation)]
496 : pub(crate) metrics: Arc<Arc<TimelineMetrics>>,
497 : #[allow(dead_code)] // we store it to keep the gate open
498 : pub(crate) gate_guard: GateGuard,
499 : }
500 :
501 : impl std::ops::Deref for TenantManagerCacheItem {
502 : type Target = Arc<Timeline>;
503 0 : fn deref(&self) -> &Self::Target {
504 0 : &self.timeline
505 0 : }
506 : }
507 :
508 : impl timeline::handle::Timeline<TenantManagerTypes> for TenantManagerCacheItem {
509 0 : fn shard_timeline_id(&self) -> timeline::handle::ShardTimelineId {
510 0 : Timeline::shard_timeline_id(&self.timeline)
511 0 : }
512 :
513 0 : fn per_timeline_state(&self) -> &timeline::handle::PerTimelineState<TenantManagerTypes> {
514 0 : &self.timeline.handles
515 0 : }
516 :
517 0 : fn get_shard_identity(&self) -> &pageserver_api::shard::ShardIdentity {
518 0 : Timeline::get_shard_identity(&self.timeline)
519 0 : }
520 : }
521 :
522 : impl timeline::handle::TenantManager<TenantManagerTypes> for TenantManagerWrapper {
523 0 : async fn resolve(
524 0 : &self,
525 0 : timeline_id: TimelineId,
526 0 : shard_selector: ShardSelector,
527 0 : ) -> Result<TenantManagerCacheItem, GetActiveTimelineError> {
528 0 : let tenant_id = self.tenant_id.get().expect("we set this in get()");
529 0 : let timeout = ACTIVE_TENANT_TIMEOUT;
530 0 : let wait_start = Instant::now();
531 0 : let deadline = wait_start + timeout;
532 0 : let tenant_shard = loop {
533 0 : let resolved = self
534 0 : .tenant_manager
535 0 : .resolve_attached_shard(tenant_id, shard_selector);
536 0 : match resolved {
537 0 : ShardResolveResult::Found(tenant_shard) => break tenant_shard,
538 : ShardResolveResult::NotFound => {
539 0 : MISROUTED_PAGESTREAM_REQUESTS.inc();
540 0 : return Err(GetActiveTimelineError::Tenant(
541 0 : GetActiveTenantError::NotFound(GetTenantError::NotFound(*tenant_id)),
542 0 : ));
543 : }
544 0 : ShardResolveResult::InProgress(barrier) => {
545 : // We can't authoritatively answer right now: wait for InProgress state
546 : // to end, then try again
547 0 : tokio::select! {
548 0 : _ = barrier.wait() => {
549 0 : // The barrier completed: proceed around the loop to try looking up again
550 0 : },
551 0 : _ = tokio::time::sleep(deadline.duration_since(Instant::now())) => {
552 0 : return Err(GetActiveTimelineError::Tenant(GetActiveTenantError::WaitForActiveTimeout {
553 0 : latest_state: None,
554 0 : wait_time: timeout,
555 0 : }));
556 : }
557 : }
558 : }
559 : };
560 : };
561 :
562 0 : tracing::debug!("Waiting for tenant to enter active state...");
563 0 : tenant_shard
564 0 : .wait_to_become_active(deadline.duration_since(Instant::now()))
565 0 : .await
566 0 : .map_err(GetActiveTimelineError::Tenant)?;
567 :
568 0 : let timeline = tenant_shard
569 0 : .get_timeline(timeline_id, true)
570 0 : .map_err(GetActiveTimelineError::Timeline)?;
571 :
572 0 : let gate_guard = match timeline.gate.enter() {
573 0 : Ok(guard) => guard,
574 : Err(_) => {
575 0 : return Err(GetActiveTimelineError::Timeline(
576 0 : GetTimelineError::ShuttingDown,
577 0 : ));
578 : }
579 : };
580 :
581 0 : let metrics = Arc::new(Arc::clone(&timeline.metrics));
582 :
583 0 : Ok(TenantManagerCacheItem {
584 0 : timeline,
585 0 : metrics,
586 0 : gate_guard,
587 0 : })
588 0 : }
589 : }
590 :
591 : /// Whether to hold the applied GC cutoff guard when processing GetPage requests.
592 : /// This is determined once at the start of pagestream subprotocol handling based on
593 : /// feature flags, configuration, and test conditions.
594 : #[derive(Debug, Clone, Copy)]
595 : enum HoldAppliedGcCutoffGuard {
596 : Yes,
597 : No,
598 : }
599 :
600 : #[derive(thiserror::Error, Debug)]
601 : enum PageStreamError {
602 : /// We encountered an error that should prompt the client to reconnect:
603 : /// in practice this means we drop the connection without sending a response.
604 : #[error("Reconnect required: {0}")]
605 : Reconnect(Cow<'static, str>),
606 :
607 : /// We were instructed to shutdown while processing the query
608 : #[error("Shutting down")]
609 : Shutdown,
610 :
611 : /// Something went wrong reading a page: this likely indicates a pageserver bug
612 : #[error("Read error")]
613 : Read(#[source] PageReconstructError),
614 :
615 : /// Ran out of time waiting for an LSN
616 : #[error("LSN timeout: {0}")]
617 : LsnTimeout(WaitLsnError),
618 :
619 : /// The entity required to serve the request (tenant or timeline) is not found,
620 : /// or is not found in a suitable state to serve a request.
621 : #[error("Not found: {0}")]
622 : NotFound(Cow<'static, str>),
623 :
624 : /// Request asked for something that doesn't make sense, like an invalid LSN
625 : #[error("Bad request: {0}")]
626 : BadRequest(Cow<'static, str>),
627 : }
628 :
629 : impl From<PageStreamError> for tonic::Status {
630 0 : fn from(err: PageStreamError) -> Self {
631 : use tonic::Code;
632 0 : let message = err.to_string();
633 0 : let code = match err {
634 0 : PageStreamError::Reconnect(_) => Code::Unavailable,
635 0 : PageStreamError::Shutdown => Code::Unavailable,
636 0 : PageStreamError::Read(err) => match err {
637 0 : PageReconstructError::Cancelled => Code::Unavailable,
638 0 : PageReconstructError::MissingKey(_) => Code::NotFound,
639 0 : PageReconstructError::AncestorLsnTimeout(err) => tonic::Status::from(err).code(),
640 0 : PageReconstructError::Other(_) => Code::Internal,
641 0 : PageReconstructError::WalRedo(_) => Code::Internal,
642 : },
643 0 : PageStreamError::LsnTimeout(err) => tonic::Status::from(err).code(),
644 0 : PageStreamError::NotFound(_) => Code::NotFound,
645 0 : PageStreamError::BadRequest(_) => Code::InvalidArgument,
646 : };
647 0 : tonic::Status::new(code, message)
648 0 : }
649 : }
650 :
651 : impl From<PageReconstructError> for PageStreamError {
652 0 : fn from(value: PageReconstructError) -> Self {
653 0 : match value {
654 0 : PageReconstructError::Cancelled => Self::Shutdown,
655 0 : e => Self::Read(e),
656 : }
657 0 : }
658 : }
659 :
660 : impl From<GetActiveTimelineError> for PageStreamError {
661 0 : fn from(value: GetActiveTimelineError) -> Self {
662 0 : match value {
663 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled)
664 : | GetActiveTimelineError::Tenant(GetActiveTenantError::WillNotBecomeActive(
665 : TenantState::Stopping { .. },
666 : ))
667 0 : | GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown) => Self::Shutdown,
668 0 : GetActiveTimelineError::Tenant(e) => Self::NotFound(format!("{e}").into()),
669 0 : GetActiveTimelineError::Timeline(e) => Self::NotFound(format!("{e}").into()),
670 : }
671 0 : }
672 : }
673 :
674 : impl From<WaitLsnError> for PageStreamError {
675 0 : fn from(value: WaitLsnError) -> Self {
676 0 : match value {
677 0 : e @ WaitLsnError::Timeout(_) => Self::LsnTimeout(e),
678 0 : WaitLsnError::Shutdown => Self::Shutdown,
679 0 : e @ WaitLsnError::BadState { .. } => Self::Reconnect(format!("{e}").into()),
680 : }
681 0 : }
682 : }
683 :
684 : impl From<WaitLsnError> for QueryError {
685 0 : fn from(value: WaitLsnError) -> Self {
686 0 : match value {
687 0 : e @ WaitLsnError::Timeout(_) => Self::Other(anyhow::Error::new(e)),
688 0 : WaitLsnError::Shutdown => Self::Shutdown,
689 0 : WaitLsnError::BadState { .. } => Self::Reconnect,
690 : }
691 0 : }
692 : }
693 :
694 : #[derive(thiserror::Error, Debug)]
695 : struct BatchedPageStreamError {
696 : req: PagestreamRequest,
697 : err: PageStreamError,
698 : }
699 :
700 : impl std::fmt::Display for BatchedPageStreamError {
701 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
702 0 : self.err.fmt(f)
703 0 : }
704 : }
705 :
706 : struct BatchedGetPageRequest {
707 : req: PagestreamGetPageRequest,
708 : timer: SmgrOpTimer,
709 : lsn_range: LsnRange,
710 : ctx: RequestContext,
711 : // If the request is perf enabled, this contains a context
712 : // with a perf span tracking the time spent waiting for the executor.
713 : batch_wait_ctx: Option<RequestContext>,
714 : }
715 :
716 : #[cfg(feature = "testing")]
717 : struct BatchedTestRequest {
718 : req: pagestream_api::PagestreamTestRequest,
719 : timer: SmgrOpTimer,
720 : }
721 :
722 : /// NB: we only hold [`timeline::handle::WeakHandle`] inside this enum,
723 : /// so that we don't keep the [`Timeline::gate`] open while the batch
724 : /// is being built up inside the [`spsc_fold`] (pagestream pipelining).
725 : #[derive(IntoStaticStr)]
726 : #[allow(clippy::large_enum_variant)]
727 : enum BatchedFeMessage {
728 : Exists {
729 : span: Span,
730 : timer: SmgrOpTimer,
731 : shard: WeakHandle<TenantManagerTypes>,
732 : req: PagestreamExistsRequest,
733 : },
734 : Nblocks {
735 : span: Span,
736 : timer: SmgrOpTimer,
737 : shard: WeakHandle<TenantManagerTypes>,
738 : req: PagestreamNblocksRequest,
739 : },
740 : GetPage {
741 : span: Span,
742 : shard: WeakHandle<TenantManagerTypes>,
743 : applied_gc_cutoff_guard: Option<RcuReadGuard<Lsn>>,
744 : pages: SmallVec<[BatchedGetPageRequest; 1]>,
745 : batch_break_reason: GetPageBatchBreakReason,
746 : },
747 : DbSize {
748 : span: Span,
749 : timer: SmgrOpTimer,
750 : shard: WeakHandle<TenantManagerTypes>,
751 : req: PagestreamDbSizeRequest,
752 : },
753 : GetSlruSegment {
754 : span: Span,
755 : timer: SmgrOpTimer,
756 : shard: WeakHandle<TenantManagerTypes>,
757 : req: PagestreamGetSlruSegmentRequest,
758 : },
759 : #[cfg(feature = "testing")]
760 : Test {
761 : span: Span,
762 : shard: WeakHandle<TenantManagerTypes>,
763 : requests: Vec<BatchedTestRequest>,
764 : },
765 : RespondError {
766 : span: Span,
767 : error: BatchedPageStreamError,
768 : },
769 : }
770 :
771 : impl BatchedFeMessage {
772 0 : fn as_static_str(&self) -> &'static str {
773 0 : self.into()
774 0 : }
775 :
776 0 : fn observe_execution_start(&mut self, at: Instant) {
777 0 : match self {
778 0 : BatchedFeMessage::Exists { timer, .. }
779 0 : | BatchedFeMessage::Nblocks { timer, .. }
780 0 : | BatchedFeMessage::DbSize { timer, .. }
781 0 : | BatchedFeMessage::GetSlruSegment { timer, .. } => {
782 0 : timer.observe_execution_start(at);
783 0 : }
784 0 : BatchedFeMessage::GetPage { pages, .. } => {
785 0 : for page in pages {
786 0 : page.timer.observe_execution_start(at);
787 0 : }
788 : }
789 : #[cfg(feature = "testing")]
790 0 : BatchedFeMessage::Test { requests, .. } => {
791 0 : for req in requests {
792 0 : req.timer.observe_execution_start(at);
793 0 : }
794 : }
795 0 : BatchedFeMessage::RespondError { .. } => {}
796 : }
797 0 : }
798 :
799 0 : fn should_break_batch(
800 0 : &self,
801 0 : other: &BatchedFeMessage,
802 0 : max_batch_size: NonZeroUsize,
803 0 : batching_strategy: PageServiceProtocolPipelinedBatchingStrategy,
804 0 : ) -> Option<GetPageBatchBreakReason> {
805 0 : match (self, other) {
806 : (
807 : BatchedFeMessage::GetPage {
808 0 : shard: accum_shard,
809 0 : pages: accum_pages,
810 : ..
811 : },
812 : BatchedFeMessage::GetPage {
813 0 : shard: this_shard,
814 0 : pages: this_pages,
815 : ..
816 : },
817 : ) => {
818 0 : assert_eq!(this_pages.len(), 1);
819 0 : if accum_pages.len() >= max_batch_size.get() {
820 0 : trace!(%max_batch_size, "stopping batching because of batch size");
821 0 : assert_eq!(accum_pages.len(), max_batch_size.get());
822 :
823 0 : return Some(GetPageBatchBreakReason::BatchFull);
824 0 : }
825 0 : if !accum_shard.is_same_handle_as(this_shard) {
826 0 : trace!("stopping batching because timeline object mismatch");
827 : // TODO: we _could_ batch & execute each shard seperately (and in parallel).
828 : // But the current logic for keeping responses in order does not support that.
829 :
830 0 : return Some(GetPageBatchBreakReason::NonUniformTimeline);
831 0 : }
832 :
833 0 : match batching_strategy {
834 : PageServiceProtocolPipelinedBatchingStrategy::UniformLsn => {
835 0 : if let Some(last_in_batch) = accum_pages.last() {
836 0 : if last_in_batch.lsn_range.effective_lsn
837 0 : != this_pages[0].lsn_range.effective_lsn
838 : {
839 0 : trace!(
840 : accum_lsn = %last_in_batch.lsn_range.effective_lsn,
841 0 : this_lsn = %this_pages[0].lsn_range.effective_lsn,
842 0 : "stopping batching because LSN changed"
843 : );
844 :
845 0 : return Some(GetPageBatchBreakReason::NonUniformLsn);
846 0 : }
847 0 : }
848 : }
849 : PageServiceProtocolPipelinedBatchingStrategy::ScatteredLsn => {
850 : // The read path doesn't curently support serving the same page at different LSNs.
851 : // While technically possible, it's uncertain if the complexity is worth it.
852 : // Break the batch if such a case is encountered.
853 0 : let same_page_different_lsn = accum_pages.iter().any(|batched| {
854 0 : batched.req.rel == this_pages[0].req.rel
855 0 : && batched.req.blkno == this_pages[0].req.blkno
856 0 : && batched.lsn_range.effective_lsn
857 0 : != this_pages[0].lsn_range.effective_lsn
858 0 : });
859 :
860 0 : if same_page_different_lsn {
861 0 : trace!(
862 0 : rel=%this_pages[0].req.rel,
863 0 : blkno=%this_pages[0].req.blkno,
864 0 : lsn=%this_pages[0].lsn_range.effective_lsn,
865 0 : "stopping batching because same page was requested at different LSNs"
866 : );
867 :
868 0 : return Some(GetPageBatchBreakReason::SamePageAtDifferentLsn);
869 0 : }
870 : }
871 : }
872 :
873 0 : None
874 : }
875 : #[cfg(feature = "testing")]
876 : (
877 : BatchedFeMessage::Test {
878 0 : shard: accum_shard,
879 0 : requests: accum_requests,
880 : ..
881 : },
882 : BatchedFeMessage::Test {
883 0 : shard: this_shard,
884 0 : requests: this_requests,
885 : ..
886 : },
887 : ) => {
888 0 : assert!(this_requests.len() == 1);
889 0 : if accum_requests.len() >= max_batch_size.get() {
890 0 : trace!(%max_batch_size, "stopping batching because of batch size");
891 0 : assert_eq!(accum_requests.len(), max_batch_size.get());
892 0 : return Some(GetPageBatchBreakReason::BatchFull);
893 0 : }
894 0 : if !accum_shard.is_same_handle_as(this_shard) {
895 0 : trace!("stopping batching because timeline object mismatch");
896 : // TODO: we _could_ batch & execute each shard seperately (and in parallel).
897 : // But the current logic for keeping responses in order does not support that.
898 0 : return Some(GetPageBatchBreakReason::NonUniformTimeline);
899 0 : }
900 0 : let this_batch_key = this_requests[0].req.batch_key;
901 0 : let accum_batch_key = accum_requests[0].req.batch_key;
902 0 : if this_requests[0].req.batch_key != accum_requests[0].req.batch_key {
903 0 : trace!(%accum_batch_key, %this_batch_key, "stopping batching because batch key changed");
904 0 : return Some(GetPageBatchBreakReason::NonUniformKey);
905 0 : }
906 0 : None
907 : }
908 0 : (_, _) => Some(GetPageBatchBreakReason::NonBatchableRequest),
909 : }
910 0 : }
911 : }
912 :
913 : impl PageServerHandler {
914 : #[allow(clippy::too_many_arguments)]
915 0 : pub fn new(
916 0 : tenant_manager: Arc<TenantManager>,
917 0 : auth: Option<Arc<SwappableJwtAuth>>,
918 0 : pipelining_config: PageServicePipeliningConfig,
919 0 : get_vectored_concurrent_io: GetVectoredConcurrentIo,
920 0 : perf_span_fields: ConnectionPerfSpanFields,
921 0 : connection_ctx: RequestContext,
922 0 : cancel: CancellationToken,
923 0 : feature_resolver: FeatureResolver,
924 0 : gate_guard: GateGuard,
925 0 : ) -> Self {
926 0 : PageServerHandler {
927 0 : auth,
928 0 : claims: None,
929 0 : connection_ctx,
930 0 : perf_span_fields,
931 0 : timeline_handles: Some(TimelineHandles::new(tenant_manager)),
932 0 : cancel,
933 0 : pipelining_config,
934 0 : get_vectored_concurrent_io,
935 0 : feature_resolver,
936 0 : gate_guard,
937 0 : }
938 0 : }
939 :
940 : /// This function always respects cancellation of any timeline in `[Self::shard_timelines]`. Pass in
941 : /// a cancellation token at the next scope up (such as a tenant cancellation token) to ensure we respect
942 : /// cancellation if there aren't any timelines in the cache.
943 : ///
944 : /// If calling from a function that doesn't use the `[Self::shard_timelines]` cache, then pass in the
945 : /// timeline cancellation token.
946 0 : async fn flush_cancellable<IO>(
947 0 : &self,
948 0 : pgb: &mut PostgresBackend<IO>,
949 0 : cancel: &CancellationToken,
950 0 : ) -> Result<(), QueryError>
951 0 : where
952 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
953 0 : {
954 0 : tokio::select!(
955 0 : flush_r = pgb.flush() => {
956 0 : Ok(flush_r?)
957 : },
958 0 : _ = cancel.cancelled() => {
959 0 : Err(QueryError::Shutdown)
960 : }
961 : )
962 0 : }
963 :
964 : #[allow(clippy::too_many_arguments)]
965 0 : async fn pagestream_read_message<IO>(
966 0 : pgb: &mut PostgresBackendReader<IO>,
967 0 : tenant_id: TenantId,
968 0 : timeline_id: TimelineId,
969 0 : timeline_handles: &mut TimelineHandles,
970 0 : conn_perf_span_fields: &ConnectionPerfSpanFields,
971 0 : cancel: &CancellationToken,
972 0 : ctx: &RequestContext,
973 0 : protocol_version: PagestreamProtocolVersion,
974 0 : parent_span: Span,
975 0 : hold_gc_cutoff_guard: HoldAppliedGcCutoffGuard,
976 0 : ) -> Result<Option<BatchedFeMessage>, QueryError>
977 0 : where
978 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
979 0 : {
980 0 : let msg = tokio::select! {
981 : biased;
982 0 : _ = cancel.cancelled() => {
983 0 : return Err(QueryError::Shutdown)
984 : }
985 0 : msg = pgb.read_message() => { msg }
986 : };
987 :
988 0 : let received_at = Instant::now();
989 :
990 0 : let copy_data_bytes = match msg? {
991 0 : Some(FeMessage::CopyData(bytes)) => bytes,
992 : Some(FeMessage::Terminate) => {
993 0 : return Ok(None);
994 : }
995 0 : Some(m) => {
996 0 : return Err(QueryError::Other(anyhow::anyhow!(
997 0 : "unexpected message: {m:?} during COPY"
998 0 : )));
999 : }
1000 : None => {
1001 0 : return Ok(None);
1002 : } // client disconnected
1003 : };
1004 0 : trace!("query: {copy_data_bytes:?}");
1005 :
1006 0 : fail::fail_point!("ps::handle-pagerequest-message");
1007 :
1008 : // parse request
1009 0 : let neon_fe_msg =
1010 0 : PagestreamFeMessage::parse(&mut copy_data_bytes.reader(), protocol_version)?;
1011 :
1012 0 : let batched_msg = match neon_fe_msg {
1013 0 : PagestreamFeMessage::Exists(req) => {
1014 0 : let shard = timeline_handles
1015 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1016 0 : .await?;
1017 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1018 0 : let span = tracing::info_span!(parent: &parent_span, "handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
1019 0 : let timer = Self::record_op_start_and_throttle(
1020 0 : &shard,
1021 0 : metrics::SmgrQueryType::GetRelExists,
1022 0 : received_at,
1023 0 : )
1024 0 : .await?;
1025 0 : BatchedFeMessage::Exists {
1026 0 : span,
1027 0 : timer,
1028 0 : shard: shard.downgrade(),
1029 0 : req,
1030 0 : }
1031 : }
1032 0 : PagestreamFeMessage::Nblocks(req) => {
1033 0 : let shard = timeline_handles
1034 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1035 0 : .await?;
1036 0 : let span = tracing::info_span!(parent: &parent_span, "handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
1037 0 : let timer = Self::record_op_start_and_throttle(
1038 0 : &shard,
1039 0 : metrics::SmgrQueryType::GetRelSize,
1040 0 : received_at,
1041 0 : )
1042 0 : .await?;
1043 0 : BatchedFeMessage::Nblocks {
1044 0 : span,
1045 0 : timer,
1046 0 : shard: shard.downgrade(),
1047 0 : req,
1048 0 : }
1049 : }
1050 0 : PagestreamFeMessage::DbSize(req) => {
1051 0 : let shard = timeline_handles
1052 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1053 0 : .await?;
1054 0 : let span = tracing::info_span!(parent: &parent_span, "handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
1055 0 : let timer = Self::record_op_start_and_throttle(
1056 0 : &shard,
1057 0 : metrics::SmgrQueryType::GetDbSize,
1058 0 : received_at,
1059 0 : )
1060 0 : .await?;
1061 0 : BatchedFeMessage::DbSize {
1062 0 : span,
1063 0 : timer,
1064 0 : shard: shard.downgrade(),
1065 0 : req,
1066 0 : }
1067 : }
1068 0 : PagestreamFeMessage::GetSlruSegment(req) => {
1069 0 : let shard = timeline_handles
1070 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1071 0 : .await?;
1072 0 : let span = tracing::info_span!(parent: &parent_span, "handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
1073 0 : let timer = Self::record_op_start_and_throttle(
1074 0 : &shard,
1075 0 : metrics::SmgrQueryType::GetSlruSegment,
1076 0 : received_at,
1077 0 : )
1078 0 : .await?;
1079 0 : BatchedFeMessage::GetSlruSegment {
1080 0 : span,
1081 0 : timer,
1082 0 : shard: shard.downgrade(),
1083 0 : req,
1084 0 : }
1085 : }
1086 0 : PagestreamFeMessage::GetPage(req) => {
1087 : // avoid a somewhat costly Span::record() by constructing the entire span in one go.
1088 : macro_rules! mkspan {
1089 : (before shard routing) => {{
1090 : tracing::info_span!(
1091 : parent: &parent_span,
1092 : "handle_get_page_request",
1093 : request_id = %req.hdr.reqid,
1094 : rel = %req.rel,
1095 : blkno = %req.blkno,
1096 : req_lsn = %req.hdr.request_lsn,
1097 : not_modified_since_lsn = %req.hdr.not_modified_since,
1098 : )
1099 : }};
1100 : ($shard_id:expr) => {{
1101 : tracing::info_span!(
1102 : parent: &parent_span,
1103 : "handle_get_page_request",
1104 : request_id = %req.hdr.reqid,
1105 : rel = %req.rel,
1106 : blkno = %req.blkno,
1107 : req_lsn = %req.hdr.request_lsn,
1108 : not_modified_since_lsn = %req.hdr.not_modified_since,
1109 : shard_id = %$shard_id,
1110 : )
1111 : }};
1112 : }
1113 :
1114 : macro_rules! respond_error {
1115 : ($span:expr, $error:expr) => {{
1116 : let error = BatchedFeMessage::RespondError {
1117 : span: $span,
1118 : error: BatchedPageStreamError {
1119 : req: req.hdr,
1120 : err: $error,
1121 : },
1122 : };
1123 : Ok(Some(error))
1124 : }};
1125 : }
1126 :
1127 0 : let key = rel_block_to_key(req.rel, req.blkno);
1128 :
1129 0 : let res = timeline_handles
1130 0 : .get(tenant_id, timeline_id, ShardSelector::Page(key))
1131 0 : .await;
1132 :
1133 0 : let shard = match res {
1134 0 : Ok(tl) => tl,
1135 0 : Err(e) => {
1136 0 : let span = mkspan!(before shard routing);
1137 0 : match e {
1138 : GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_)) => {
1139 : // We already know this tenant exists in general, because we resolved it at
1140 : // start of connection. Getting a NotFound here indicates that the shard containing
1141 : // the requested page is not present on this node: the client's knowledge of shard->pageserver
1142 : // mapping is out of date.
1143 : //
1144 : // Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
1145 : // client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
1146 : // and talk to a different pageserver.
1147 0 : MISROUTED_PAGESTREAM_REQUESTS.inc();
1148 0 : return respond_error!(
1149 0 : span,
1150 0 : PageStreamError::Reconnect(
1151 0 : "getpage@lsn request routed to wrong shard".into()
1152 0 : )
1153 : );
1154 : }
1155 0 : e => {
1156 0 : return respond_error!(span, e.into());
1157 : }
1158 : }
1159 : }
1160 : };
1161 :
1162 0 : let ctx = if shard.is_get_page_request_sampled() {
1163 0 : RequestContextBuilder::from(ctx)
1164 0 : .root_perf_span(|| {
1165 0 : info_span!(
1166 : target: PERF_TRACE_TARGET,
1167 : "GET_PAGE",
1168 : peer_addr = conn_perf_span_fields.peer_addr,
1169 : application_name = conn_perf_span_fields.application_name,
1170 : compute_mode = conn_perf_span_fields.compute_mode,
1171 : tenant_id = %tenant_id,
1172 0 : shard_id = %shard.get_shard_identity().shard_slug(),
1173 : timeline_id = %timeline_id,
1174 : lsn = %req.hdr.request_lsn,
1175 : not_modified_since_lsn = %req.hdr.not_modified_since,
1176 : request_id = %req.hdr.reqid,
1177 : key = %key,
1178 : )
1179 0 : })
1180 0 : .attached_child()
1181 : } else {
1182 0 : ctx.attached_child()
1183 : };
1184 :
1185 : // This ctx travels as part of the BatchedFeMessage through
1186 : // batching into the request handler.
1187 : // The request handler needs to do some per-request work
1188 : // (relsize check) before dispatching the batch as a single
1189 : // get_vectored call to the Timeline.
1190 : // This ctx will be used for the reslize check, whereas the
1191 : // get_vectored call will be a different ctx with separate
1192 : // perf span.
1193 0 : let ctx = ctx.with_scope_page_service_pagestream(&shard);
1194 :
1195 : // Similar game for this `span`: we funnel it through so that
1196 : // request handler log messages contain the request-specific fields.
1197 0 : let span = mkspan!(shard.tenant_shard_id.shard_slug());
1198 :
1199 0 : let timer = Self::record_op_start_and_throttle(
1200 0 : &shard,
1201 0 : metrics::SmgrQueryType::GetPageAtLsn,
1202 0 : received_at,
1203 : )
1204 0 : .maybe_perf_instrument(&ctx, |current_perf_span| {
1205 0 : info_span!(
1206 : target: PERF_TRACE_TARGET,
1207 0 : parent: current_perf_span,
1208 : "THROTTLE",
1209 : )
1210 0 : })
1211 0 : .await?;
1212 :
1213 0 : let applied_gc_cutoff_guard = shard.get_applied_gc_cutoff_lsn(); // hold guard
1214 : // We're holding the Handle
1215 0 : let effective_lsn = match Self::effective_request_lsn(
1216 0 : &shard,
1217 0 : shard.get_last_record_lsn(),
1218 0 : req.hdr.request_lsn,
1219 0 : req.hdr.not_modified_since,
1220 0 : &applied_gc_cutoff_guard,
1221 0 : ) {
1222 0 : Ok(lsn) => lsn,
1223 0 : Err(e) => {
1224 0 : return respond_error!(span, e);
1225 : }
1226 : };
1227 0 : let applied_gc_cutoff_guard = match hold_gc_cutoff_guard {
1228 0 : HoldAppliedGcCutoffGuard::Yes => Some(applied_gc_cutoff_guard),
1229 : HoldAppliedGcCutoffGuard::No => {
1230 0 : drop(applied_gc_cutoff_guard);
1231 0 : None
1232 : }
1233 : };
1234 :
1235 0 : let batch_wait_ctx = if ctx.has_perf_span() {
1236 : Some(
1237 0 : RequestContextBuilder::from(&ctx)
1238 0 : .perf_span(|crnt_perf_span| {
1239 0 : info_span!(
1240 : target: PERF_TRACE_TARGET,
1241 0 : parent: crnt_perf_span,
1242 : "WAIT_EXECUTOR",
1243 : )
1244 0 : })
1245 0 : .attached_child(),
1246 : )
1247 : } else {
1248 0 : None
1249 : };
1250 :
1251 : BatchedFeMessage::GetPage {
1252 0 : span,
1253 0 : shard: shard.downgrade(),
1254 0 : applied_gc_cutoff_guard,
1255 0 : pages: smallvec![BatchedGetPageRequest {
1256 0 : req,
1257 0 : timer,
1258 0 : lsn_range: LsnRange {
1259 0 : effective_lsn,
1260 0 : request_lsn: req.hdr.request_lsn
1261 0 : },
1262 0 : ctx,
1263 0 : batch_wait_ctx,
1264 0 : }],
1265 : // The executor grabs the batch when it becomes idle.
1266 : // Hence, [`GetPageBatchBreakReason::ExecutorSteal`] is the
1267 : // default reason for breaking the batch.
1268 0 : batch_break_reason: GetPageBatchBreakReason::ExecutorSteal,
1269 : }
1270 : }
1271 : #[cfg(feature = "testing")]
1272 0 : PagestreamFeMessage::Test(req) => {
1273 0 : let shard = timeline_handles
1274 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1275 0 : .await?;
1276 0 : let span = tracing::info_span!(parent: &parent_span, "handle_test_request", shard_id = %shard.tenant_shard_id.shard_slug());
1277 0 : let timer = Self::record_op_start_and_throttle(
1278 0 : &shard,
1279 0 : metrics::SmgrQueryType::Test,
1280 0 : received_at,
1281 0 : )
1282 0 : .await?;
1283 0 : BatchedFeMessage::Test {
1284 0 : span,
1285 0 : shard: shard.downgrade(),
1286 0 : requests: vec![BatchedTestRequest { req, timer }],
1287 0 : }
1288 : }
1289 : };
1290 0 : Ok(Some(batched_msg))
1291 0 : }
1292 :
1293 : /// Starts a SmgrOpTimer at received_at and throttles the request.
1294 0 : async fn record_op_start_and_throttle(
1295 0 : shard: &Handle<TenantManagerTypes>,
1296 0 : op: metrics::SmgrQueryType,
1297 0 : received_at: Instant,
1298 0 : ) -> Result<SmgrOpTimer, QueryError> {
1299 : // It's important to start the smgr op metric recorder as early as possible
1300 : // so that the _started counters are incremented before we do
1301 : // any serious waiting, e.g., for throttle, batching, or actual request handling.
1302 0 : let mut timer = shard.query_metrics.start_smgr_op(op, received_at);
1303 0 : let now = Instant::now();
1304 0 : timer.observe_throttle_start(now);
1305 0 : let throttled = tokio::select! {
1306 0 : res = shard.pagestream_throttle.throttle(1, now) => res,
1307 0 : _ = shard.cancel.cancelled() => return Err(QueryError::Shutdown),
1308 : };
1309 0 : timer.observe_throttle_done(throttled);
1310 0 : Ok(timer)
1311 0 : }
1312 :
1313 : /// Post-condition: `batch` is Some()
1314 : #[instrument(skip_all, level = tracing::Level::TRACE)]
1315 : #[allow(clippy::boxed_local)]
1316 : fn pagestream_do_batch(
1317 : batching_strategy: PageServiceProtocolPipelinedBatchingStrategy,
1318 : max_batch_size: NonZeroUsize,
1319 : batch: &mut Result<BatchedFeMessage, QueryError>,
1320 : this_msg: Result<BatchedFeMessage, QueryError>,
1321 : ) -> Result<(), Result<BatchedFeMessage, QueryError>> {
1322 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1323 :
1324 : let this_msg = match this_msg {
1325 : Ok(this_msg) => this_msg,
1326 : Err(e) => return Err(Err(e)),
1327 : };
1328 :
1329 : let eligible_batch = match batch {
1330 : Ok(b) => b,
1331 : Err(_) => {
1332 : return Err(Ok(this_msg));
1333 : }
1334 : };
1335 :
1336 : let batch_break =
1337 : eligible_batch.should_break_batch(&this_msg, max_batch_size, batching_strategy);
1338 :
1339 : match batch_break {
1340 : Some(reason) => {
1341 : if let BatchedFeMessage::GetPage {
1342 : batch_break_reason, ..
1343 : } = eligible_batch
1344 : {
1345 : *batch_break_reason = reason;
1346 : }
1347 :
1348 : Err(Ok(this_msg))
1349 : }
1350 : None => {
1351 : // ok to batch
1352 : match (eligible_batch, this_msg) {
1353 : (
1354 : BatchedFeMessage::GetPage {
1355 : pages: accum_pages,
1356 : applied_gc_cutoff_guard: accum_applied_gc_cutoff_guard,
1357 : ..
1358 : },
1359 : BatchedFeMessage::GetPage {
1360 : pages: this_pages,
1361 : applied_gc_cutoff_guard: this_applied_gc_cutoff_guard,
1362 : ..
1363 : },
1364 : ) => {
1365 : accum_pages.extend(this_pages);
1366 : // the minimum of the two guards will keep data for both alive
1367 : match (&accum_applied_gc_cutoff_guard, this_applied_gc_cutoff_guard) {
1368 : (None, None) => (),
1369 : (None, Some(this)) => *accum_applied_gc_cutoff_guard = Some(this),
1370 : (Some(_), None) => (),
1371 : (Some(accum), Some(this)) => {
1372 : if **accum > *this {
1373 : *accum_applied_gc_cutoff_guard = Some(this);
1374 : }
1375 : }
1376 : };
1377 : Ok(())
1378 : }
1379 : #[cfg(feature = "testing")]
1380 : (
1381 : BatchedFeMessage::Test {
1382 : requests: accum_requests,
1383 : ..
1384 : },
1385 : BatchedFeMessage::Test {
1386 : requests: this_requests,
1387 : ..
1388 : },
1389 : ) => {
1390 : accum_requests.extend(this_requests);
1391 : Ok(())
1392 : }
1393 : // Shape guaranteed by [`BatchedFeMessage::should_break_batch`]
1394 : _ => unreachable!(),
1395 : }
1396 : }
1397 : }
1398 : }
1399 :
1400 0 : #[instrument(level = tracing::Level::DEBUG, skip_all)]
1401 : async fn pagestream_handle_batched_message<IO>(
1402 : &mut self,
1403 : pgb_writer: &mut PostgresBackend<IO>,
1404 : batch: BatchedFeMessage,
1405 : io_concurrency: IoConcurrency,
1406 : cancel: &CancellationToken,
1407 : protocol_version: PagestreamProtocolVersion,
1408 : ctx: &RequestContext,
1409 : ) -> Result<(), QueryError>
1410 : where
1411 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1412 : {
1413 : let started_at = Instant::now();
1414 : let batch = {
1415 : let mut batch = batch;
1416 : batch.observe_execution_start(started_at);
1417 : batch
1418 : };
1419 :
1420 : // Dispatch the batch to the appropriate request handler.
1421 : let log_slow_name = batch.as_static_str();
1422 : let (mut handler_results, span) = {
1423 : // TODO: we unfortunately have to pin the future on the heap, since GetPage futures are huge and
1424 : // won't fit on the stack.
1425 : let mut boxpinned = Box::pin(Self::pagestream_dispatch_batched_message(
1426 : batch,
1427 : io_concurrency,
1428 : ctx,
1429 : ));
1430 : log_slow(
1431 : log_slow_name,
1432 : LOG_SLOW_GETPAGE_THRESHOLD,
1433 : boxpinned.as_mut(),
1434 : )
1435 : .await?
1436 : };
1437 :
1438 : // We purposefully don't count flush time into the smgr operation timer.
1439 : //
1440 : // The reason is that current compute client will not perform protocol processing
1441 : // if the postgres backend process is doing things other than `->smgr_read()`.
1442 : // This is especially the case for prefetch.
1443 : //
1444 : // If the compute doesn't read from the connection, eventually TCP will backpressure
1445 : // all the way into our flush call below.
1446 : //
1447 : // The timer's underlying metric is used for a storage-internal latency SLO and
1448 : // we don't want to include latency in it that we can't control.
1449 : // And as pointed out above, in this case, we don't control the time that flush will take.
1450 : //
1451 : // We put each response in the batch onto the wire in a separate pgb_writer.flush()
1452 : // call, which (all unmeasured) adds syscall overhead but reduces time to first byte
1453 : // and avoids building up a "giant" contiguous userspace buffer to hold the entire response.
1454 : // TODO: vectored socket IO would be great, but pgb_writer doesn't support that.
1455 : let flush_timers = {
1456 : let flushing_start_time = Instant::now();
1457 : let mut flush_timers = Vec::with_capacity(handler_results.len());
1458 : for handler_result in &mut handler_results {
1459 : let flush_timer = match handler_result {
1460 : Ok((_response, timer, _ctx)) => Some(
1461 : timer
1462 : .observe_execution_end(flushing_start_time)
1463 : .expect("we are the first caller"),
1464 : ),
1465 : Err(_) => {
1466 : // TODO: measure errors
1467 : None
1468 : }
1469 : };
1470 : flush_timers.push(flush_timer);
1471 : }
1472 : assert_eq!(flush_timers.len(), handler_results.len());
1473 : flush_timers
1474 : };
1475 :
1476 : // Map handler result to protocol behavior.
1477 : // Some handler errors cause exit from pagestream protocol.
1478 : // Other handler errors are sent back as an error message and we stay in pagestream protocol.
1479 : for (handler_result, flushing_timer) in handler_results.into_iter().zip(flush_timers) {
1480 : let (response_msg, ctx) = match handler_result {
1481 : Err(e) => match &e.err {
1482 : PageStreamError::Shutdown => {
1483 : // BEGIN HADRON
1484 : PAGESTREAM_HANDLER_RESULTS_TOTAL
1485 : .with_label_values(&[metrics::PAGESTREAM_HANDLER_OUTCOME_OTHER_ERROR])
1486 : .inc();
1487 : // END HADRON
1488 :
1489 : // If we fail to fulfil a request during shutdown, which may be _because_ of
1490 : // shutdown, then do not send the error to the client. Instead just drop the
1491 : // connection.
1492 0 : span.in_scope(|| info!("dropping connection due to shutdown"));
1493 : return Err(QueryError::Shutdown);
1494 : }
1495 : PageStreamError::Reconnect(_reason) => {
1496 0 : span.in_scope(|| {
1497 : // BEGIN HADRON
1498 : // We can get here because the compute node is pointing at the wrong PS. We
1499 : // already have a metric to keep track of this so suppressing this log to
1500 : // reduce log spam. The information in this log message is not going to be that
1501 : // helpful given the volume of logs that can be generated.
1502 : // info!("handler requested reconnect: {reason}")
1503 : // END HADRON
1504 0 : });
1505 : // BEGIN HADRON
1506 : PAGESTREAM_HANDLER_RESULTS_TOTAL
1507 : .with_label_values(&[
1508 : metrics::PAGESTREAM_HANDLER_OUTCOME_INTERNAL_ERROR,
1509 : ])
1510 : .inc();
1511 : // END HADRON
1512 : return Err(QueryError::Reconnect);
1513 : }
1514 : PageStreamError::Read(_)
1515 : | PageStreamError::LsnTimeout(_)
1516 : | PageStreamError::NotFound(_)
1517 : | PageStreamError::BadRequest(_) => {
1518 : // BEGIN HADRON
1519 : if let PageStreamError::Read(_) | PageStreamError::LsnTimeout(_) = &e.err {
1520 : PAGESTREAM_HANDLER_RESULTS_TOTAL
1521 : .with_label_values(&[
1522 : metrics::PAGESTREAM_HANDLER_OUTCOME_INTERNAL_ERROR,
1523 : ])
1524 : .inc();
1525 : } else {
1526 : PAGESTREAM_HANDLER_RESULTS_TOTAL
1527 : .with_label_values(&[
1528 : metrics::PAGESTREAM_HANDLER_OUTCOME_OTHER_ERROR,
1529 : ])
1530 : .inc();
1531 : }
1532 : // END HADRON
1533 :
1534 : // print the all details to the log with {:#}, but for the client the
1535 : // error message is enough. Do not log if shutting down, as the anyhow::Error
1536 : // here includes cancellation which is not an error.
1537 : let full = utils::error::report_compact_sources(&e.err);
1538 0 : span.in_scope(|| {
1539 0 : error!("error reading relation or page version: {full:#}")
1540 0 : });
1541 :
1542 : (
1543 : PagestreamBeMessage::Error(PagestreamErrorResponse {
1544 : req: e.req,
1545 : message: e.err.to_string(),
1546 : }),
1547 : None,
1548 : )
1549 : }
1550 : },
1551 : Ok((response_msg, _op_timer_already_observed, ctx)) => {
1552 : // BEGIN HADRON
1553 : PAGESTREAM_HANDLER_RESULTS_TOTAL
1554 : .with_label_values(&[metrics::PAGESTREAM_HANDLER_OUTCOME_SUCCESS])
1555 : .inc();
1556 : // END HADRON
1557 :
1558 : (response_msg, Some(ctx))
1559 : }
1560 : };
1561 :
1562 0 : let ctx = ctx.map(|req_ctx| {
1563 0 : RequestContextBuilder::from(&req_ctx)
1564 0 : .perf_span(|crnt_perf_span| {
1565 0 : info_span!(
1566 : target: PERF_TRACE_TARGET,
1567 0 : parent: crnt_perf_span,
1568 : "FLUSH_RESPONSE",
1569 : )
1570 0 : })
1571 0 : .attached_child()
1572 0 : });
1573 :
1574 : //
1575 : // marshal & transmit response message
1576 : //
1577 :
1578 : pgb_writer.write_message_noflush(&BeMessage::CopyData(
1579 : &response_msg.serialize(protocol_version),
1580 : ))?;
1581 :
1582 : failpoint_support::sleep_millis_async!("before-pagestream-msg-flush", cancel);
1583 :
1584 : // what we want to do
1585 : let socket_fd = pgb_writer.socket_fd;
1586 : let flush_fut = pgb_writer.flush();
1587 : // metric for how long flushing takes
1588 : let flush_fut = match flushing_timer {
1589 : Some(flushing_timer) => futures::future::Either::Left(flushing_timer.measure(
1590 : Instant::now(),
1591 : flush_fut,
1592 : socket_fd,
1593 : )),
1594 : None => futures::future::Either::Right(flush_fut),
1595 : };
1596 :
1597 : let flush_fut = if let Some(req_ctx) = ctx.as_ref() {
1598 : futures::future::Either::Left(
1599 0 : flush_fut.maybe_perf_instrument(req_ctx, |current_perf_span| {
1600 0 : current_perf_span.clone()
1601 0 : }),
1602 : )
1603 : } else {
1604 : futures::future::Either::Right(flush_fut)
1605 : };
1606 :
1607 : // do it while respecting cancellation
1608 0 : let _: () = async move {
1609 0 : tokio::select! {
1610 : biased;
1611 0 : _ = cancel.cancelled() => {
1612 : // We were requested to shut down.
1613 0 : info!("shutdown request received in page handler");
1614 0 : return Err(QueryError::Shutdown)
1615 : }
1616 0 : res = flush_fut => {
1617 0 : res?;
1618 : }
1619 : }
1620 0 : Ok(())
1621 0 : }
1622 : .await?;
1623 : }
1624 : Ok(())
1625 : }
1626 :
1627 : /// Helper which dispatches a batched message to the appropriate handler.
1628 : /// Returns a vec of results, along with the extracted trace span.
1629 0 : async fn pagestream_dispatch_batched_message(
1630 0 : batch: BatchedFeMessage,
1631 0 : io_concurrency: IoConcurrency,
1632 0 : ctx: &RequestContext,
1633 0 : ) -> Result<
1634 0 : (
1635 0 : Vec<Result<(PagestreamBeMessage, SmgrOpTimer, RequestContext), BatchedPageStreamError>>,
1636 0 : Span,
1637 0 : ),
1638 0 : QueryError,
1639 0 : > {
1640 : macro_rules! upgrade_handle_and_set_context {
1641 : ($shard:ident) => {{
1642 : let weak_handle = &$shard;
1643 : let handle = weak_handle.upgrade()?;
1644 : let ctx = ctx.with_scope_page_service_pagestream(&handle);
1645 : (handle, ctx)
1646 : }};
1647 : }
1648 0 : Ok(match batch {
1649 : BatchedFeMessage::Exists {
1650 0 : span,
1651 0 : timer,
1652 0 : shard,
1653 0 : req,
1654 : } => {
1655 0 : fail::fail_point!("ps::handle-pagerequest-message::exists");
1656 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1657 : (
1658 0 : vec![
1659 0 : Self::handle_get_rel_exists_request(&shard, &req, &ctx)
1660 0 : .instrument(span.clone())
1661 0 : .await
1662 0 : .map(|msg| (PagestreamBeMessage::Exists(msg), timer, ctx))
1663 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
1664 : ],
1665 0 : span,
1666 : )
1667 : }
1668 : BatchedFeMessage::Nblocks {
1669 0 : span,
1670 0 : timer,
1671 0 : shard,
1672 0 : req,
1673 : } => {
1674 0 : fail::fail_point!("ps::handle-pagerequest-message::nblocks");
1675 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1676 : (
1677 0 : vec![
1678 0 : Self::handle_get_nblocks_request(&shard, &req, false, &ctx)
1679 0 : .instrument(span.clone())
1680 0 : .await
1681 0 : .map(|msg| msg.expect("allow_missing=false"))
1682 0 : .map(|msg| (PagestreamBeMessage::Nblocks(msg), timer, ctx))
1683 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
1684 : ],
1685 0 : span,
1686 : )
1687 : }
1688 : BatchedFeMessage::GetPage {
1689 0 : span,
1690 0 : shard,
1691 0 : applied_gc_cutoff_guard,
1692 0 : pages,
1693 0 : batch_break_reason,
1694 : } => {
1695 0 : fail::fail_point!("ps::handle-pagerequest-message::getpage");
1696 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1697 : (
1698 : {
1699 0 : let npages = pages.len();
1700 0 : trace!(npages, "handling getpage request");
1701 0 : let res = Self::handle_get_page_at_lsn_request_batched(
1702 0 : &shard,
1703 0 : pages,
1704 0 : io_concurrency,
1705 0 : batch_break_reason,
1706 0 : &ctx,
1707 0 : )
1708 0 : .instrument(span.clone())
1709 0 : .await;
1710 0 : assert_eq!(res.len(), npages);
1711 0 : drop(applied_gc_cutoff_guard);
1712 0 : res
1713 : },
1714 0 : span,
1715 : )
1716 : }
1717 : BatchedFeMessage::DbSize {
1718 0 : span,
1719 0 : timer,
1720 0 : shard,
1721 0 : req,
1722 : } => {
1723 0 : fail::fail_point!("ps::handle-pagerequest-message::dbsize");
1724 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1725 : (
1726 0 : vec![
1727 0 : Self::handle_db_size_request(&shard, &req, &ctx)
1728 0 : .instrument(span.clone())
1729 0 : .await
1730 0 : .map(|msg| (PagestreamBeMessage::DbSize(msg), timer, ctx))
1731 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
1732 : ],
1733 0 : span,
1734 : )
1735 : }
1736 : BatchedFeMessage::GetSlruSegment {
1737 0 : span,
1738 0 : timer,
1739 0 : shard,
1740 0 : req,
1741 : } => {
1742 0 : fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
1743 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1744 : (
1745 0 : vec![
1746 0 : Self::handle_get_slru_segment_request(&shard, &req, &ctx)
1747 0 : .instrument(span.clone())
1748 0 : .await
1749 0 : .map(|msg| (PagestreamBeMessage::GetSlruSegment(msg), timer, ctx))
1750 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
1751 : ],
1752 0 : span,
1753 : )
1754 : }
1755 : #[cfg(feature = "testing")]
1756 : BatchedFeMessage::Test {
1757 0 : span,
1758 0 : shard,
1759 0 : requests,
1760 : } => {
1761 0 : fail::fail_point!("ps::handle-pagerequest-message::test");
1762 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1763 : (
1764 : {
1765 0 : let npages = requests.len();
1766 0 : trace!(npages, "handling getpage request");
1767 0 : let res = Self::handle_test_request_batch(&shard, requests, &ctx)
1768 0 : .instrument(span.clone())
1769 0 : .await;
1770 0 : assert_eq!(res.len(), npages);
1771 0 : res
1772 : },
1773 0 : span,
1774 : )
1775 : }
1776 0 : BatchedFeMessage::RespondError { span, error } => {
1777 : // We've already decided to respond with an error, so we don't need to
1778 : // call the handler.
1779 0 : (vec![Err(error)], span)
1780 : }
1781 : })
1782 0 : }
1783 :
1784 : /// Pagestream sub-protocol handler.
1785 : ///
1786 : /// It is a simple request-response protocol inside a COPYBOTH session.
1787 : ///
1788 : /// # Coding Discipline
1789 : ///
1790 : /// Coding discipline within this function: all interaction with the `pgb` connection
1791 : /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
1792 : /// This is so that we can shutdown page_service quickly.
1793 : #[instrument(skip_all, fields(hold_gc_cutoff_guard))]
1794 : async fn handle_pagerequests<IO>(
1795 : &mut self,
1796 : pgb: &mut PostgresBackend<IO>,
1797 : tenant_id: TenantId,
1798 : timeline_id: TimelineId,
1799 : protocol_version: PagestreamProtocolVersion,
1800 : ctx: RequestContext,
1801 : ) -> Result<(), QueryError>
1802 : where
1803 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
1804 : {
1805 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1806 :
1807 : // switch client to COPYBOTH
1808 : pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
1809 : tokio::select! {
1810 : biased;
1811 : _ = self.cancel.cancelled() => {
1812 : return Err(QueryError::Shutdown)
1813 : }
1814 : res = pgb.flush() => {
1815 : res?;
1816 : }
1817 : }
1818 :
1819 : let io_concurrency = IoConcurrency::spawn_from_conf(
1820 : self.get_vectored_concurrent_io,
1821 : match self.gate_guard.try_clone() {
1822 : Ok(guard) => guard,
1823 : Err(_) => {
1824 : info!("shutdown request received in page handler");
1825 : return Err(QueryError::Shutdown);
1826 : }
1827 : },
1828 : );
1829 :
1830 : let pgb_reader = pgb
1831 : .split()
1832 : .context("implementation error: split pgb into reader and writer")?;
1833 :
1834 : let timeline_handles = self
1835 : .timeline_handles
1836 : .take()
1837 : .expect("implementation error: timeline_handles should not be locked");
1838 :
1839 : // Evaluate the expensive feature resolver check once per pagestream subprotocol handling
1840 : // instead of once per GetPage request. This is shared between pipelined and serial paths.
1841 : let hold_gc_cutoff_guard = if cfg!(test) || cfg!(feature = "testing") {
1842 : HoldAppliedGcCutoffGuard::Yes
1843 : } else {
1844 : // Use the global feature resolver with the tenant ID directly, avoiding the need
1845 : // to get a timeline/shard which might not be available on this pageserver node.
1846 : let empty_properties = std::collections::HashMap::new();
1847 : match self.feature_resolver.evaluate_boolean(
1848 : "page-service-getpage-hold-applied-gc-cutoff-guard",
1849 : tenant_id,
1850 : &empty_properties,
1851 : ) {
1852 : Ok(()) => HoldAppliedGcCutoffGuard::Yes,
1853 : Err(_) => HoldAppliedGcCutoffGuard::No,
1854 : }
1855 : };
1856 : // record it in the span of handle_pagerequests so that both the request_span
1857 : // and the pipeline implementation spans contains the field.
1858 : Span::current().record(
1859 : "hold_gc_cutoff_guard",
1860 : tracing::field::debug(&hold_gc_cutoff_guard),
1861 : );
1862 :
1863 : let request_span = info_span!("request");
1864 : let ((pgb_reader, timeline_handles), result) = match self.pipelining_config.clone() {
1865 : PageServicePipeliningConfig::Pipelined(pipelining_config) => {
1866 : self.handle_pagerequests_pipelined(
1867 : pgb,
1868 : pgb_reader,
1869 : tenant_id,
1870 : timeline_id,
1871 : timeline_handles,
1872 : request_span,
1873 : pipelining_config,
1874 : protocol_version,
1875 : io_concurrency,
1876 : hold_gc_cutoff_guard,
1877 : &ctx,
1878 : )
1879 : .await
1880 : }
1881 : PageServicePipeliningConfig::Serial => {
1882 : self.handle_pagerequests_serial(
1883 : pgb,
1884 : pgb_reader,
1885 : tenant_id,
1886 : timeline_id,
1887 : timeline_handles,
1888 : request_span,
1889 : protocol_version,
1890 : io_concurrency,
1891 : hold_gc_cutoff_guard,
1892 : &ctx,
1893 : )
1894 : .await
1895 : }
1896 : };
1897 :
1898 : debug!("pagestream subprotocol shut down cleanly");
1899 :
1900 : pgb.unsplit(pgb_reader)
1901 : .context("implementation error: unsplit pgb")?;
1902 :
1903 : let replaced = self.timeline_handles.replace(timeline_handles);
1904 : assert!(replaced.is_none());
1905 :
1906 : result
1907 : }
1908 :
1909 : #[allow(clippy::too_many_arguments)]
1910 0 : async fn handle_pagerequests_serial<IO>(
1911 0 : &mut self,
1912 0 : pgb_writer: &mut PostgresBackend<IO>,
1913 0 : mut pgb_reader: PostgresBackendReader<IO>,
1914 0 : tenant_id: TenantId,
1915 0 : timeline_id: TimelineId,
1916 0 : mut timeline_handles: TimelineHandles,
1917 0 : request_span: Span,
1918 0 : protocol_version: PagestreamProtocolVersion,
1919 0 : io_concurrency: IoConcurrency,
1920 0 : hold_gc_cutoff_guard: HoldAppliedGcCutoffGuard,
1921 0 : ctx: &RequestContext,
1922 0 : ) -> (
1923 0 : (PostgresBackendReader<IO>, TimelineHandles),
1924 0 : Result<(), QueryError>,
1925 0 : )
1926 0 : where
1927 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
1928 0 : {
1929 0 : let cancel = self.cancel.clone();
1930 :
1931 0 : let err = loop {
1932 0 : let msg = Self::pagestream_read_message(
1933 0 : &mut pgb_reader,
1934 0 : tenant_id,
1935 0 : timeline_id,
1936 0 : &mut timeline_handles,
1937 0 : &self.perf_span_fields,
1938 0 : &cancel,
1939 0 : ctx,
1940 0 : protocol_version,
1941 0 : request_span.clone(),
1942 0 : hold_gc_cutoff_guard,
1943 0 : )
1944 0 : .await;
1945 0 : let msg = match msg {
1946 0 : Ok(msg) => msg,
1947 0 : Err(e) => break e,
1948 : };
1949 0 : let msg = match msg {
1950 0 : Some(msg) => msg,
1951 : None => {
1952 0 : debug!("pagestream subprotocol end observed");
1953 0 : return ((pgb_reader, timeline_handles), Ok(()));
1954 : }
1955 : };
1956 :
1957 0 : let result = self
1958 0 : .pagestream_handle_batched_message(
1959 0 : pgb_writer,
1960 0 : msg,
1961 0 : io_concurrency.clone(),
1962 0 : &cancel,
1963 0 : protocol_version,
1964 0 : ctx,
1965 0 : )
1966 0 : .await;
1967 0 : match result {
1968 0 : Ok(()) => {}
1969 0 : Err(e) => break e,
1970 : }
1971 : };
1972 0 : ((pgb_reader, timeline_handles), Err(err))
1973 0 : }
1974 :
1975 : /// # Cancel-Safety
1976 : ///
1977 : /// May leak tokio tasks if not polled to completion.
1978 : #[allow(clippy::too_many_arguments)]
1979 0 : async fn handle_pagerequests_pipelined<IO>(
1980 0 : &mut self,
1981 0 : pgb_writer: &mut PostgresBackend<IO>,
1982 0 : pgb_reader: PostgresBackendReader<IO>,
1983 0 : tenant_id: TenantId,
1984 0 : timeline_id: TimelineId,
1985 0 : mut timeline_handles: TimelineHandles,
1986 0 : request_span: Span,
1987 0 : pipelining_config: PageServicePipeliningConfigPipelined,
1988 0 : protocol_version: PagestreamProtocolVersion,
1989 0 : io_concurrency: IoConcurrency,
1990 0 : hold_gc_cutoff_guard: HoldAppliedGcCutoffGuard,
1991 0 : ctx: &RequestContext,
1992 0 : ) -> (
1993 0 : (PostgresBackendReader<IO>, TimelineHandles),
1994 0 : Result<(), QueryError>,
1995 0 : )
1996 0 : where
1997 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
1998 0 : {
1999 : //
2000 : // Pipelined pagestream handling consists of
2001 : // - a Batcher that reads requests off the wire and
2002 : // and batches them if possible,
2003 : // - an Executor that processes the batched requests.
2004 : //
2005 : // The batch is built up inside an `spsc_fold` channel,
2006 : // shared betwen Batcher (Sender) and Executor (Receiver).
2007 : //
2008 : // The Batcher continously folds client requests into the batch,
2009 : // while the Executor can at any time take out what's in the batch
2010 : // in order to process it.
2011 : // This means the next batch builds up while the Executor
2012 : // executes the last batch.
2013 : //
2014 : // CANCELLATION
2015 : //
2016 : // We run both Batcher and Executor futures to completion before
2017 : // returning from this function.
2018 : //
2019 : // If Executor exits first, it signals cancellation to the Batcher
2020 : // via a CancellationToken that is child of `self.cancel`.
2021 : // If Batcher exits first, it signals cancellation to the Executor
2022 : // by dropping the spsc_fold channel Sender.
2023 : //
2024 : // CLEAN SHUTDOWN
2025 : //
2026 : // Clean shutdown means that the client ends the COPYBOTH session.
2027 : // In response to such a client message, the Batcher exits.
2028 : // The Executor continues to run, draining the spsc_fold channel.
2029 : // Once drained, the spsc_fold recv will fail with a distinct error
2030 : // indicating that the sender disconnected.
2031 : // The Executor exits with Ok(()) in response to that error.
2032 : //
2033 : // Server initiated shutdown is not clean shutdown, but instead
2034 : // is an error Err(QueryError::Shutdown) that is propagated through
2035 : // error propagation.
2036 : //
2037 : // ERROR PROPAGATION
2038 : //
2039 : // When the Batcher encounter an error, it sends it as a value
2040 : // through the spsc_fold channel and exits afterwards.
2041 : // When the Executor observes such an error in the channel,
2042 : // it exits returning that error value.
2043 : //
2044 : // This design ensures that the Executor stage will still process
2045 : // the batch that was in flight when the Batcher encountered an error,
2046 : // thereby beahving identical to a serial implementation.
2047 :
2048 : let PageServicePipeliningConfigPipelined {
2049 0 : max_batch_size,
2050 0 : execution,
2051 0 : batching: batching_strategy,
2052 0 : } = pipelining_config;
2053 :
2054 : // Macro to _define_ a pipeline stage.
2055 : macro_rules! pipeline_stage {
2056 : ($name:literal, $cancel:expr, $make_fut:expr) => {{
2057 : let cancel: CancellationToken = $cancel;
2058 : let stage_fut = $make_fut(cancel.clone());
2059 0 : async move {
2060 0 : scopeguard::defer! {
2061 : debug!("exiting");
2062 : }
2063 0 : timed_after_cancellation(stage_fut, $name, Duration::from_millis(100), &cancel)
2064 0 : .await
2065 0 : }
2066 : .instrument(tracing::info_span!($name))
2067 : }};
2068 : }
2069 :
2070 : //
2071 : // Batcher
2072 : //
2073 :
2074 0 : let perf_span_fields = self.perf_span_fields.clone();
2075 :
2076 0 : let cancel_batcher = self.cancel.child_token();
2077 0 : let (mut batch_tx, mut batch_rx) = spsc_fold::channel();
2078 0 : let batcher = pipeline_stage!("batcher", cancel_batcher.clone(), move |cancel_batcher| {
2079 0 : let ctx = ctx.attached_child();
2080 0 : async move {
2081 0 : let mut pgb_reader = pgb_reader;
2082 0 : let mut exit = false;
2083 0 : while !exit {
2084 0 : let read_res = Self::pagestream_read_message(
2085 0 : &mut pgb_reader,
2086 0 : tenant_id,
2087 0 : timeline_id,
2088 0 : &mut timeline_handles,
2089 0 : &perf_span_fields,
2090 0 : &cancel_batcher,
2091 0 : &ctx,
2092 0 : protocol_version,
2093 0 : request_span.clone(),
2094 0 : hold_gc_cutoff_guard,
2095 0 : )
2096 0 : .await;
2097 0 : let Some(read_res) = read_res.transpose() else {
2098 0 : debug!("client-initiated shutdown");
2099 0 : break;
2100 : };
2101 0 : exit |= read_res.is_err();
2102 0 : let could_send = batch_tx
2103 0 : .send(read_res, |batch, res| {
2104 0 : Self::pagestream_do_batch(batching_strategy, max_batch_size, batch, res)
2105 0 : })
2106 0 : .await;
2107 0 : exit |= could_send.is_err();
2108 : }
2109 0 : (pgb_reader, timeline_handles)
2110 0 : }
2111 0 : });
2112 :
2113 : //
2114 : // Executor
2115 : //
2116 :
2117 0 : let executor = pipeline_stage!("executor", self.cancel.clone(), move |cancel| {
2118 0 : let ctx = ctx.attached_child();
2119 0 : async move {
2120 0 : let _cancel_batcher = cancel_batcher.drop_guard();
2121 : loop {
2122 0 : let maybe_batch = batch_rx.recv().await;
2123 0 : let batch = match maybe_batch {
2124 0 : Ok(batch) => batch,
2125 : Err(spsc_fold::RecvError::SenderGone) => {
2126 0 : debug!("upstream gone");
2127 0 : return Ok(());
2128 : }
2129 : };
2130 0 : let mut batch = match batch {
2131 0 : Ok(batch) => batch,
2132 0 : Err(e) => {
2133 0 : return Err(e);
2134 : }
2135 : };
2136 :
2137 : if let BatchedFeMessage::GetPage {
2138 0 : pages,
2139 : span: _,
2140 : shard: _,
2141 : applied_gc_cutoff_guard: _,
2142 : batch_break_reason: _,
2143 0 : } = &mut batch
2144 : {
2145 0 : for req in pages {
2146 0 : req.batch_wait_ctx.take();
2147 0 : }
2148 0 : }
2149 :
2150 0 : self.pagestream_handle_batched_message(
2151 0 : pgb_writer,
2152 0 : batch,
2153 0 : io_concurrency.clone(),
2154 0 : &cancel,
2155 0 : protocol_version,
2156 0 : &ctx,
2157 0 : )
2158 0 : .await?;
2159 : }
2160 0 : }
2161 0 : });
2162 :
2163 : //
2164 : // Execute the stages.
2165 : //
2166 :
2167 0 : match execution {
2168 : PageServiceProtocolPipelinedExecutionStrategy::ConcurrentFutures => {
2169 0 : tokio::join!(batcher, executor)
2170 : }
2171 : PageServiceProtocolPipelinedExecutionStrategy::Tasks => {
2172 : // These tasks are not tracked anywhere.
2173 0 : let read_messages_task = tokio::spawn(batcher);
2174 0 : let (read_messages_task_res, executor_res_) =
2175 0 : tokio::join!(read_messages_task, executor,);
2176 0 : (
2177 0 : read_messages_task_res.expect("propagated panic from read_messages"),
2178 0 : executor_res_,
2179 0 : )
2180 : }
2181 : }
2182 0 : }
2183 :
2184 : /// Helper function to handle the LSN from client request.
2185 : ///
2186 : /// Each GetPage (and Exists and Nblocks) request includes information about
2187 : /// which version of the page is being requested. The primary compute node
2188 : /// will always request the latest page version, by setting 'request_lsn' to
2189 : /// the last inserted or flushed WAL position, while a standby will request
2190 : /// a version at the LSN that it's currently caught up to.
2191 : ///
2192 : /// In either case, if the page server hasn't received the WAL up to the
2193 : /// requested LSN yet, we will wait for it to arrive. The return value is
2194 : /// the LSN that should be used to look up the page versions.
2195 : ///
2196 : /// In addition to the request LSN, each request carries another LSN,
2197 : /// 'not_modified_since', which is a hint to the pageserver that the client
2198 : /// knows that the page has not been modified between 'not_modified_since'
2199 : /// and the request LSN. This allows skipping the wait, as long as the WAL
2200 : /// up to 'not_modified_since' has arrived. If the client doesn't have any
2201 : /// information about when the page was modified, it will use
2202 : /// not_modified_since == lsn. If the client lies and sends a too low
2203 : /// not_modified_hint such that there are in fact later page versions, the
2204 : /// behavior is undefined: the pageserver may return any of the page versions
2205 : /// or an error.
2206 0 : async fn wait_or_get_last_lsn(
2207 0 : timeline: &Timeline,
2208 0 : request_lsn: Lsn,
2209 0 : not_modified_since: Lsn,
2210 0 : latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
2211 0 : ctx: &RequestContext,
2212 0 : ) -> Result<Lsn, PageStreamError> {
2213 0 : let last_record_lsn = timeline.get_last_record_lsn();
2214 0 : let effective_request_lsn = Self::effective_request_lsn(
2215 0 : timeline,
2216 0 : last_record_lsn,
2217 0 : request_lsn,
2218 0 : not_modified_since,
2219 0 : latest_gc_cutoff_lsn,
2220 0 : )?;
2221 :
2222 0 : if effective_request_lsn > last_record_lsn {
2223 0 : timeline
2224 0 : .wait_lsn(
2225 0 : not_modified_since,
2226 0 : crate::tenant::timeline::WaitLsnWaiter::PageService,
2227 0 : timeline::WaitLsnTimeout::Default,
2228 0 : ctx,
2229 0 : )
2230 0 : .await?;
2231 :
2232 : // Since we waited for 'effective_request_lsn' to arrive, that is now the last
2233 : // record LSN. (Or close enough for our purposes; the last-record LSN can
2234 : // advance immediately after we return anyway)
2235 0 : }
2236 :
2237 0 : Ok(effective_request_lsn)
2238 0 : }
2239 :
2240 0 : fn effective_request_lsn(
2241 0 : timeline: &Timeline,
2242 0 : last_record_lsn: Lsn,
2243 0 : request_lsn: Lsn,
2244 0 : not_modified_since: Lsn,
2245 0 : latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
2246 0 : ) -> Result<Lsn, PageStreamError> {
2247 : // Sanity check the request
2248 0 : if request_lsn < not_modified_since {
2249 0 : return Err(PageStreamError::BadRequest(
2250 0 : format!(
2251 0 : "invalid request with request LSN {request_lsn} and not_modified_since {not_modified_since}",
2252 0 : )
2253 0 : .into(),
2254 0 : ));
2255 0 : }
2256 :
2257 : // Check explicitly for INVALID just to get a less scary error message if the request is obviously bogus
2258 0 : if request_lsn == Lsn::INVALID {
2259 0 : return Err(PageStreamError::BadRequest(
2260 0 : "invalid LSN(0) in request".into(),
2261 0 : ));
2262 0 : }
2263 :
2264 : // Clients should only read from recent LSNs on their timeline, or from locations holding an LSN lease.
2265 : //
2266 : // We may have older data available, but we make a best effort to detect this case and return an error,
2267 : // to distinguish a misbehaving client (asking for old LSN) from a storage issue (data missing at a legitimate LSN).
2268 0 : if request_lsn < **latest_gc_cutoff_lsn && !timeline.is_gc_blocked_by_lsn_lease_deadline() {
2269 0 : let gc_info = &timeline.gc_info.read().unwrap();
2270 0 : if !gc_info.lsn_covered_by_lease(request_lsn) {
2271 0 : return Err(
2272 0 : PageStreamError::BadRequest(format!(
2273 0 : "tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
2274 0 : request_lsn, **latest_gc_cutoff_lsn
2275 0 : ).into())
2276 0 : );
2277 0 : }
2278 0 : }
2279 :
2280 0 : if not_modified_since > last_record_lsn {
2281 0 : Ok(not_modified_since)
2282 : } else {
2283 : // It might be better to use max(not_modified_since, latest_gc_cutoff_lsn)
2284 : // here instead. That would give the same result, since we know that there
2285 : // haven't been any modifications since 'not_modified_since'. Using an older
2286 : // LSN might be faster, because that could allow skipping recent layers when
2287 : // finding the page. However, we have historically used 'last_record_lsn', so
2288 : // stick to that for now.
2289 0 : Ok(std::cmp::min(last_record_lsn, request_lsn))
2290 : }
2291 0 : }
2292 :
2293 : /// Handles the lsn lease request.
2294 : /// If a lease cannot be obtained, the client will receive NULL.
2295 : #[instrument(skip_all, fields(shard_id, %lsn))]
2296 : async fn handle_make_lsn_lease<IO>(
2297 : &mut self,
2298 : pgb: &mut PostgresBackend<IO>,
2299 : tenant_shard_id: TenantShardId,
2300 : timeline_id: TimelineId,
2301 : lsn: Lsn,
2302 : ctx: &RequestContext,
2303 : ) -> Result<(), QueryError>
2304 : where
2305 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
2306 : {
2307 : let timeline = self
2308 : .timeline_handles
2309 : .as_mut()
2310 : .unwrap()
2311 : .get(
2312 : tenant_shard_id.tenant_id,
2313 : timeline_id,
2314 : ShardSelector::Known(tenant_shard_id.to_index()),
2315 : )
2316 : .await?;
2317 : set_tracing_field_shard_id(&timeline);
2318 :
2319 : let lease = timeline
2320 : .renew_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx)
2321 0 : .inspect_err(|e| {
2322 0 : warn!("{e}");
2323 0 : })
2324 : .ok();
2325 0 : let valid_until_str = lease.map(|l| {
2326 0 : l.valid_until
2327 0 : .duration_since(SystemTime::UNIX_EPOCH)
2328 0 : .expect("valid_until is earlier than UNIX_EPOCH")
2329 0 : .as_millis()
2330 0 : .to_string()
2331 0 : });
2332 :
2333 : info!(
2334 : "acquired lease for {} until {}",
2335 : lsn,
2336 : valid_until_str.as_deref().unwrap_or("<unknown>")
2337 : );
2338 :
2339 0 : let bytes = valid_until_str.as_ref().map(|x| x.as_bytes());
2340 :
2341 : pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col(
2342 : b"valid_until",
2343 : )]))?
2344 : .write_message_noflush(&BeMessage::DataRow(&[bytes]))?;
2345 :
2346 : Ok(())
2347 : }
2348 :
2349 : #[instrument(skip_all, fields(shard_id))]
2350 : async fn handle_get_rel_exists_request(
2351 : timeline: &Timeline,
2352 : req: &PagestreamExistsRequest,
2353 : ctx: &RequestContext,
2354 : ) -> Result<PagestreamExistsResponse, PageStreamError> {
2355 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
2356 : let lsn = Self::wait_or_get_last_lsn(
2357 : timeline,
2358 : req.hdr.request_lsn,
2359 : req.hdr.not_modified_since,
2360 : &latest_gc_cutoff_lsn,
2361 : ctx,
2362 : )
2363 : .await?;
2364 :
2365 : let exists = timeline
2366 : .get_rel_exists(
2367 : req.rel,
2368 : Version::LsnRange(LsnRange {
2369 : effective_lsn: lsn,
2370 : request_lsn: req.hdr.request_lsn,
2371 : }),
2372 : ctx,
2373 : )
2374 : .await?;
2375 :
2376 : Ok(PagestreamExistsResponse { req: *req, exists })
2377 : }
2378 :
2379 : /// If `allow_missing` is true, returns None instead of Err on missing relations. Otherwise,
2380 : /// never returns None. It is only supported by the gRPC protocol, so we pass it separately to
2381 : /// avoid changing the libpq protocol types.
2382 : #[instrument(skip_all, fields(shard_id))]
2383 : async fn handle_get_nblocks_request(
2384 : timeline: &Timeline,
2385 : req: &PagestreamNblocksRequest,
2386 : allow_missing: bool,
2387 : ctx: &RequestContext,
2388 : ) -> Result<Option<PagestreamNblocksResponse>, PageStreamError> {
2389 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
2390 : let lsn = Self::wait_or_get_last_lsn(
2391 : timeline,
2392 : req.hdr.request_lsn,
2393 : req.hdr.not_modified_since,
2394 : &latest_gc_cutoff_lsn,
2395 : ctx,
2396 : )
2397 : .await?;
2398 :
2399 : let n_blocks = timeline
2400 : .get_rel_size_in_reldir(
2401 : req.rel,
2402 : Version::LsnRange(LsnRange {
2403 : effective_lsn: lsn,
2404 : request_lsn: req.hdr.request_lsn,
2405 : }),
2406 : None,
2407 : allow_missing,
2408 : ctx,
2409 : )
2410 : .await?;
2411 : let Some(n_blocks) = n_blocks else {
2412 : return Ok(None);
2413 : };
2414 :
2415 : Ok(Some(PagestreamNblocksResponse {
2416 : req: *req,
2417 : n_blocks,
2418 : }))
2419 : }
2420 :
2421 : #[instrument(skip_all, fields(shard_id))]
2422 : async fn handle_db_size_request(
2423 : timeline: &Timeline,
2424 : req: &PagestreamDbSizeRequest,
2425 : ctx: &RequestContext,
2426 : ) -> Result<PagestreamDbSizeResponse, PageStreamError> {
2427 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
2428 : let lsn = Self::wait_or_get_last_lsn(
2429 : timeline,
2430 : req.hdr.request_lsn,
2431 : req.hdr.not_modified_since,
2432 : &latest_gc_cutoff_lsn,
2433 : ctx,
2434 : )
2435 : .await?;
2436 :
2437 : let total_blocks = timeline
2438 : .get_db_size(
2439 : DEFAULTTABLESPACE_OID,
2440 : req.dbnode,
2441 : Version::LsnRange(LsnRange {
2442 : effective_lsn: lsn,
2443 : request_lsn: req.hdr.request_lsn,
2444 : }),
2445 : ctx,
2446 : )
2447 : .await?;
2448 : let db_size = total_blocks as i64 * BLCKSZ as i64;
2449 :
2450 : Ok(PagestreamDbSizeResponse { req: *req, db_size })
2451 : }
2452 :
2453 : #[instrument(skip_all)]
2454 : async fn handle_get_page_at_lsn_request_batched(
2455 : timeline: &Timeline,
2456 : requests: SmallVec<[BatchedGetPageRequest; 1]>,
2457 : io_concurrency: IoConcurrency,
2458 : batch_break_reason: GetPageBatchBreakReason,
2459 : ctx: &RequestContext,
2460 : ) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer, RequestContext), BatchedPageStreamError>>
2461 : {
2462 : debug_assert_current_span_has_tenant_and_timeline_id();
2463 :
2464 : timeline
2465 : .query_metrics
2466 : .observe_getpage_batch_start(requests.len(), batch_break_reason);
2467 :
2468 : // If a page trace is running, submit an event for this request.
2469 : if let Some(page_trace) = timeline.page_trace.load().as_ref() {
2470 : let time = SystemTime::now();
2471 : for batch in &requests {
2472 : let key = rel_block_to_key(batch.req.rel, batch.req.blkno).to_compact();
2473 : // Ignore error (trace buffer may be full or tracer may have disconnected).
2474 : _ = page_trace.try_send(PageTraceEvent {
2475 : key,
2476 : effective_lsn: batch.lsn_range.effective_lsn,
2477 : time,
2478 : });
2479 : }
2480 : }
2481 :
2482 : // If any request in the batch needs to wait for LSN, then do so now.
2483 : let mut perf_instrument = false;
2484 : let max_effective_lsn = requests
2485 : .iter()
2486 0 : .map(|req| {
2487 0 : if req.ctx.has_perf_span() {
2488 0 : perf_instrument = true;
2489 0 : }
2490 :
2491 0 : req.lsn_range.effective_lsn
2492 0 : })
2493 : .max()
2494 : .expect("batch is never empty");
2495 :
2496 : let ctx = match perf_instrument {
2497 : true => RequestContextBuilder::from(ctx)
2498 0 : .root_perf_span(|| {
2499 0 : info_span!(
2500 : target: PERF_TRACE_TARGET,
2501 : "GET_VECTORED",
2502 : tenant_id = %timeline.tenant_shard_id.tenant_id,
2503 : timeline_id = %timeline.timeline_id,
2504 0 : shard = %timeline.tenant_shard_id.shard_slug(),
2505 : %max_effective_lsn
2506 : )
2507 0 : })
2508 : .attached_child(),
2509 : false => ctx.attached_child(),
2510 : };
2511 :
2512 : let last_record_lsn = timeline.get_last_record_lsn();
2513 : if max_effective_lsn > last_record_lsn {
2514 : if let Err(e) = timeline
2515 : .wait_lsn(
2516 : max_effective_lsn,
2517 : crate::tenant::timeline::WaitLsnWaiter::PageService,
2518 : timeline::WaitLsnTimeout::Default,
2519 : &ctx,
2520 : )
2521 0 : .maybe_perf_instrument(&ctx, |current_perf_span| {
2522 0 : info_span!(
2523 : target: PERF_TRACE_TARGET,
2524 0 : parent: current_perf_span,
2525 : "WAIT_LSN",
2526 : )
2527 0 : })
2528 : .await
2529 : {
2530 0 : return Vec::from_iter(requests.into_iter().map(|req| {
2531 0 : Err(BatchedPageStreamError {
2532 0 : err: PageStreamError::from(e.clone()),
2533 0 : req: req.req.hdr,
2534 0 : })
2535 0 : }));
2536 : }
2537 : }
2538 :
2539 : let results = timeline
2540 : .get_rel_page_at_lsn_batched(
2541 0 : requests.iter().map(|p| {
2542 0 : (
2543 0 : &p.req.rel,
2544 0 : &p.req.blkno,
2545 0 : p.lsn_range,
2546 0 : p.ctx.attached_child(),
2547 0 : )
2548 0 : }),
2549 : io_concurrency,
2550 : &ctx,
2551 : )
2552 : .await;
2553 : assert_eq!(results.len(), requests.len());
2554 :
2555 : // TODO: avoid creating the new Vec here
2556 : Vec::from_iter(
2557 : requests
2558 : .into_iter()
2559 : .zip(results.into_iter())
2560 0 : .map(|(req, res)| {
2561 0 : res.map(|page| {
2562 0 : (
2563 0 : PagestreamBeMessage::GetPage(
2564 0 : pagestream_api::PagestreamGetPageResponse { req: req.req, page },
2565 0 : ),
2566 0 : req.timer,
2567 0 : req.ctx,
2568 0 : )
2569 0 : })
2570 0 : .map_err(|e| BatchedPageStreamError {
2571 0 : err: PageStreamError::from(e),
2572 0 : req: req.req.hdr,
2573 0 : })
2574 0 : }),
2575 : )
2576 : }
2577 :
2578 : #[instrument(skip_all, fields(shard_id))]
2579 : async fn handle_get_slru_segment_request(
2580 : timeline: &Timeline,
2581 : req: &PagestreamGetSlruSegmentRequest,
2582 : ctx: &RequestContext,
2583 : ) -> Result<PagestreamGetSlruSegmentResponse, PageStreamError> {
2584 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
2585 : let lsn = Self::wait_or_get_last_lsn(
2586 : timeline,
2587 : req.hdr.request_lsn,
2588 : req.hdr.not_modified_since,
2589 : &latest_gc_cutoff_lsn,
2590 : ctx,
2591 : )
2592 : .await?;
2593 :
2594 : let kind = SlruKind::from_repr(req.kind)
2595 : .ok_or(PageStreamError::BadRequest("invalid SLRU kind".into()))?;
2596 : let segment = timeline.get_slru_segment(kind, req.segno, lsn, ctx).await?;
2597 :
2598 : Ok(PagestreamGetSlruSegmentResponse { req: *req, segment })
2599 : }
2600 :
2601 : // NB: this impl mimics what we do for batched getpage requests.
2602 : #[cfg(feature = "testing")]
2603 : #[instrument(skip_all, fields(shard_id))]
2604 : async fn handle_test_request_batch(
2605 : timeline: &Timeline,
2606 : requests: Vec<BatchedTestRequest>,
2607 : _ctx: &RequestContext,
2608 : ) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer, RequestContext), BatchedPageStreamError>>
2609 : {
2610 : // real requests would do something with the timeline
2611 : let mut results = Vec::with_capacity(requests.len());
2612 : for _req in requests.iter() {
2613 : tokio::task::yield_now().await;
2614 :
2615 : results.push({
2616 : if timeline.cancel.is_cancelled() {
2617 : Err(PageReconstructError::Cancelled)
2618 : } else {
2619 : Ok(())
2620 : }
2621 : });
2622 : }
2623 :
2624 : // TODO: avoid creating the new Vec here
2625 : Vec::from_iter(
2626 : requests
2627 : .into_iter()
2628 : .zip(results.into_iter())
2629 0 : .map(|(req, res)| {
2630 0 : res.map(|()| {
2631 0 : (
2632 0 : PagestreamBeMessage::Test(pagestream_api::PagestreamTestResponse {
2633 0 : req: req.req.clone(),
2634 0 : }),
2635 0 : req.timer,
2636 0 : RequestContext::new(
2637 0 : TaskKind::PageRequestHandler,
2638 0 : DownloadBehavior::Warn,
2639 0 : ),
2640 0 : )
2641 0 : })
2642 0 : .map_err(|e| BatchedPageStreamError {
2643 0 : err: PageStreamError::from(e),
2644 0 : req: req.req.hdr,
2645 0 : })
2646 0 : }),
2647 : )
2648 : }
2649 :
2650 : /// Note on "fullbackup":
2651 : /// Full basebackups should only be used for debugging purposes.
2652 : /// Originally, it was introduced to enable breaking storage format changes,
2653 : /// but that is not applicable anymore.
2654 : ///
2655 : /// # Coding Discipline
2656 : ///
2657 : /// Coding discipline within this function: all interaction with the `pgb` connection
2658 : /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
2659 : /// This is so that we can shutdown page_service quickly.
2660 : ///
2661 : /// TODO: wrap the pgb that we pass to the basebackup handler so that it's sensitive
2662 : /// to connection cancellation.
2663 : #[allow(clippy::too_many_arguments)]
2664 : #[instrument(skip_all, fields(shard_id, ?lsn, ?prev_lsn, %full_backup))]
2665 : async fn handle_basebackup_request<IO>(
2666 : &mut self,
2667 : pgb: &mut PostgresBackend<IO>,
2668 : tenant_id: TenantId,
2669 : timeline_id: TimelineId,
2670 : lsn: Option<Lsn>,
2671 : prev_lsn: Option<Lsn>,
2672 : full_backup: bool,
2673 : gzip: bool,
2674 : replica: bool,
2675 : ctx: &RequestContext,
2676 : ) -> Result<(), QueryError>
2677 : where
2678 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
2679 : {
2680 : let started = std::time::Instant::now();
2681 :
2682 : let timeline = self
2683 : .timeline_handles
2684 : .as_mut()
2685 : .unwrap()
2686 : .get(tenant_id, timeline_id, ShardSelector::Zero)
2687 : .await?;
2688 : set_tracing_field_shard_id(&timeline);
2689 : let ctx = ctx.with_scope_timeline(&timeline);
2690 :
2691 : if timeline.is_archived() == Some(true) {
2692 : tracing::info!(
2693 : "timeline {tenant_id}/{timeline_id} is archived, but got basebackup request for it."
2694 : );
2695 : return Err(QueryError::NotFound("timeline is archived".into()));
2696 : }
2697 :
2698 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
2699 : if let Some(lsn) = lsn {
2700 : // Backup was requested at a particular LSN. Wait for it to arrive.
2701 : info!("waiting for {}", lsn);
2702 : timeline
2703 : .wait_lsn(
2704 : lsn,
2705 : crate::tenant::timeline::WaitLsnWaiter::PageService,
2706 : crate::tenant::timeline::WaitLsnTimeout::Default,
2707 : &ctx,
2708 : )
2709 : .await?;
2710 : timeline
2711 : .check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
2712 : .context("invalid basebackup lsn")?;
2713 : }
2714 :
2715 : let lsn_awaited_after = started.elapsed();
2716 :
2717 : // switch client to COPYOUT
2718 : pgb.write_message_noflush(&BeMessage::CopyOutResponse)
2719 : .map_err(QueryError::Disconnected)?;
2720 : self.flush_cancellable(pgb, &self.cancel).await?;
2721 :
2722 : let mut from_cache = false;
2723 :
2724 : // Send a tarball of the latest layer on the timeline. Compress if not
2725 : // fullbackup. TODO Compress in that case too (tests need to be updated)
2726 : if full_backup {
2727 : let mut writer = pgb.copyout_writer();
2728 : basebackup::send_basebackup_tarball(
2729 : &mut writer,
2730 : &timeline,
2731 : lsn,
2732 : prev_lsn,
2733 : full_backup,
2734 : replica,
2735 : None,
2736 : &ctx,
2737 : )
2738 : .await?;
2739 : } else {
2740 : let mut writer = BufWriter::new(pgb.copyout_writer());
2741 :
2742 : let cached = timeline
2743 : .get_cached_basebackup_if_enabled(lsn, prev_lsn, full_backup, replica, gzip)
2744 : .await;
2745 :
2746 : if let Some(mut cached) = cached {
2747 : from_cache = true;
2748 : tokio::io::copy(&mut cached, &mut writer)
2749 : .await
2750 0 : .map_err(|err| {
2751 0 : BasebackupError::Client(err, "handle_basebackup_request,cached,copy")
2752 0 : })?;
2753 : } else {
2754 : basebackup::send_basebackup_tarball(
2755 : &mut writer,
2756 : &timeline,
2757 : lsn,
2758 : prev_lsn,
2759 : full_backup,
2760 : replica,
2761 : // NB: using fast compression because it's on the critical path for compute
2762 : // startup. For an empty database, we get <100KB with this method. The
2763 : // Level::Best compression method gives us <20KB, but maybe we should add
2764 : // basebackup caching on compute shutdown first.
2765 : gzip.then_some(async_compression::Level::Fastest),
2766 : &ctx,
2767 : )
2768 : .await?;
2769 : }
2770 : writer
2771 : .flush()
2772 : .await
2773 0 : .map_err(|err| BasebackupError::Client(err, "handle_basebackup_request,flush"))?;
2774 : }
2775 :
2776 : pgb.write_message_noflush(&BeMessage::CopyDone)
2777 : .map_err(QueryError::Disconnected)?;
2778 : self.flush_cancellable(pgb, &timeline.cancel).await?;
2779 :
2780 : let basebackup_after = started
2781 : .elapsed()
2782 : .checked_sub(lsn_awaited_after)
2783 : .unwrap_or(Duration::ZERO);
2784 :
2785 : info!(
2786 : lsn_await_millis = lsn_awaited_after.as_millis(),
2787 : basebackup_millis = basebackup_after.as_millis(),
2788 : %from_cache,
2789 : "basebackup complete"
2790 : );
2791 :
2792 : Ok(())
2793 : }
2794 :
2795 : // when accessing management api supply None as an argument
2796 : // when using to authorize tenant pass corresponding tenant id
2797 0 : fn check_permission(&self, tenant_id: Option<TenantId>) -> Result<(), QueryError> {
2798 0 : if self.auth.is_none() {
2799 : // auth is set to Trust, nothing to check so just return ok
2800 0 : return Ok(());
2801 0 : }
2802 : // auth is some, just checked above, when auth is some
2803 : // then claims are always present because of checks during connection init
2804 : // so this expect won't trigger
2805 0 : let claims = self
2806 0 : .claims
2807 0 : .as_ref()
2808 0 : .expect("claims presence already checked");
2809 0 : check_permission(claims, tenant_id).map_err(|e| QueryError::Unauthorized(e.0))
2810 0 : }
2811 : }
2812 :
2813 : /// `basebackup tenant timeline [lsn] [--gzip] [--replica]`
2814 : #[derive(Debug, Clone, Eq, PartialEq)]
2815 : struct BaseBackupCmd {
2816 : tenant_id: TenantId,
2817 : timeline_id: TimelineId,
2818 : lsn: Option<Lsn>,
2819 : gzip: bool,
2820 : replica: bool,
2821 : }
2822 :
2823 : /// `fullbackup tenant timeline [lsn] [prev_lsn]`
2824 : #[derive(Debug, Clone, Eq, PartialEq)]
2825 : struct FullBackupCmd {
2826 : tenant_id: TenantId,
2827 : timeline_id: TimelineId,
2828 : lsn: Option<Lsn>,
2829 : prev_lsn: Option<Lsn>,
2830 : }
2831 :
2832 : /// `pagestream_v2 tenant timeline`
2833 : #[derive(Debug, Clone, Eq, PartialEq)]
2834 : struct PageStreamCmd {
2835 : tenant_id: TenantId,
2836 : timeline_id: TimelineId,
2837 : protocol_version: PagestreamProtocolVersion,
2838 : }
2839 :
2840 : /// `lease lsn tenant timeline lsn`
2841 : #[derive(Debug, Clone, Eq, PartialEq)]
2842 : struct LeaseLsnCmd {
2843 : tenant_shard_id: TenantShardId,
2844 : timeline_id: TimelineId,
2845 : lsn: Lsn,
2846 : }
2847 :
2848 : #[derive(Debug, Clone, Eq, PartialEq)]
2849 : enum PageServiceCmd {
2850 : Set,
2851 : PageStream(PageStreamCmd),
2852 : BaseBackup(BaseBackupCmd),
2853 : FullBackup(FullBackupCmd),
2854 : LeaseLsn(LeaseLsnCmd),
2855 : }
2856 :
2857 : impl PageStreamCmd {
2858 3 : fn parse(query: &str, protocol_version: PagestreamProtocolVersion) -> anyhow::Result<Self> {
2859 3 : let parameters = query.split_whitespace().collect_vec();
2860 3 : if parameters.len() != 2 {
2861 1 : bail!(
2862 1 : "invalid number of parameters for pagestream command: {}",
2863 : query
2864 : );
2865 2 : }
2866 2 : let tenant_id = TenantId::from_str(parameters[0])
2867 2 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2868 1 : let timeline_id = TimelineId::from_str(parameters[1])
2869 1 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2870 1 : Ok(Self {
2871 1 : tenant_id,
2872 1 : timeline_id,
2873 1 : protocol_version,
2874 1 : })
2875 3 : }
2876 : }
2877 :
2878 : impl FullBackupCmd {
2879 2 : fn parse(query: &str) -> anyhow::Result<Self> {
2880 2 : let parameters = query.split_whitespace().collect_vec();
2881 2 : if parameters.len() < 2 || parameters.len() > 4 {
2882 0 : bail!(
2883 0 : "invalid number of parameters for basebackup command: {}",
2884 : query
2885 : );
2886 2 : }
2887 2 : let tenant_id = TenantId::from_str(parameters[0])
2888 2 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2889 2 : let timeline_id = TimelineId::from_str(parameters[1])
2890 2 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2891 : // The caller is responsible for providing correct lsn and prev_lsn.
2892 2 : let lsn = if let Some(lsn_str) = parameters.get(2) {
2893 : Some(
2894 1 : Lsn::from_str(lsn_str)
2895 1 : .with_context(|| format!("Failed to parse Lsn from {lsn_str}"))?,
2896 : )
2897 : } else {
2898 1 : None
2899 : };
2900 2 : let prev_lsn = if let Some(prev_lsn_str) = parameters.get(3) {
2901 : Some(
2902 1 : Lsn::from_str(prev_lsn_str)
2903 1 : .with_context(|| format!("Failed to parse Lsn from {prev_lsn_str}"))?,
2904 : )
2905 : } else {
2906 1 : None
2907 : };
2908 2 : Ok(Self {
2909 2 : tenant_id,
2910 2 : timeline_id,
2911 2 : lsn,
2912 2 : prev_lsn,
2913 2 : })
2914 2 : }
2915 : }
2916 :
2917 : impl BaseBackupCmd {
2918 9 : fn parse(query: &str) -> anyhow::Result<Self> {
2919 9 : let parameters = query.split_whitespace().collect_vec();
2920 9 : if parameters.len() < 2 {
2921 0 : bail!(
2922 0 : "invalid number of parameters for basebackup command: {}",
2923 : query
2924 : );
2925 9 : }
2926 9 : let tenant_id = TenantId::from_str(parameters[0])
2927 9 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2928 9 : let timeline_id = TimelineId::from_str(parameters[1])
2929 9 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2930 : let lsn;
2931 : let flags_parse_from;
2932 9 : if let Some(maybe_lsn) = parameters.get(2) {
2933 8 : if *maybe_lsn == "latest" {
2934 1 : lsn = None;
2935 1 : flags_parse_from = 3;
2936 7 : } else if maybe_lsn.starts_with("--") {
2937 5 : lsn = None;
2938 5 : flags_parse_from = 2;
2939 5 : } else {
2940 : lsn = Some(
2941 2 : Lsn::from_str(maybe_lsn)
2942 2 : .with_context(|| format!("Failed to parse lsn from {maybe_lsn}"))?,
2943 : );
2944 2 : flags_parse_from = 3;
2945 : }
2946 1 : } else {
2947 1 : lsn = None;
2948 1 : flags_parse_from = 2;
2949 1 : }
2950 :
2951 9 : let mut gzip = false;
2952 9 : let mut replica = false;
2953 :
2954 11 : for ¶m in ¶meters[flags_parse_from..] {
2955 11 : match param {
2956 11 : "--gzip" => {
2957 7 : if gzip {
2958 1 : bail!("duplicate parameter for basebackup command: {param}")
2959 6 : }
2960 6 : gzip = true
2961 : }
2962 4 : "--replica" => {
2963 2 : if replica {
2964 0 : bail!("duplicate parameter for basebackup command: {param}")
2965 2 : }
2966 2 : replica = true
2967 : }
2968 2 : _ => bail!("invalid parameter for basebackup command: {param}"),
2969 : }
2970 : }
2971 6 : Ok(Self {
2972 6 : tenant_id,
2973 6 : timeline_id,
2974 6 : lsn,
2975 6 : gzip,
2976 6 : replica,
2977 6 : })
2978 9 : }
2979 : }
2980 :
2981 : impl LeaseLsnCmd {
2982 2 : fn parse(query: &str) -> anyhow::Result<Self> {
2983 2 : let parameters = query.split_whitespace().collect_vec();
2984 2 : if parameters.len() != 3 {
2985 0 : bail!(
2986 0 : "invalid number of parameters for lease lsn command: {}",
2987 : query
2988 : );
2989 2 : }
2990 2 : let tenant_shard_id = TenantShardId::from_str(parameters[0])
2991 2 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2992 2 : let timeline_id = TimelineId::from_str(parameters[1])
2993 2 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2994 2 : let lsn = Lsn::from_str(parameters[2])
2995 2 : .with_context(|| format!("Failed to parse lsn from {}", parameters[2]))?;
2996 2 : Ok(Self {
2997 2 : tenant_shard_id,
2998 2 : timeline_id,
2999 2 : lsn,
3000 2 : })
3001 2 : }
3002 : }
3003 :
3004 : impl PageServiceCmd {
3005 21 : fn parse(query: &str) -> anyhow::Result<Self> {
3006 21 : let query = query.trim();
3007 21 : let Some((cmd, other)) = query.split_once(' ') else {
3008 2 : bail!("cannot parse query: {query}")
3009 : };
3010 19 : match cmd.to_ascii_lowercase().as_str() {
3011 19 : "pagestream_v2" => Ok(Self::PageStream(PageStreamCmd::parse(
3012 3 : other,
3013 3 : PagestreamProtocolVersion::V2,
3014 2 : )?)),
3015 16 : "pagestream_v3" => Ok(Self::PageStream(PageStreamCmd::parse(
3016 0 : other,
3017 0 : PagestreamProtocolVersion::V3,
3018 0 : )?)),
3019 16 : "basebackup" => Ok(Self::BaseBackup(BaseBackupCmd::parse(other)?)),
3020 7 : "fullbackup" => Ok(Self::FullBackup(FullBackupCmd::parse(other)?)),
3021 5 : "lease" => {
3022 3 : let Some((cmd2, other)) = other.split_once(' ') else {
3023 0 : bail!("invalid lease command: {cmd}");
3024 : };
3025 3 : let cmd2 = cmd2.to_ascii_lowercase();
3026 3 : if cmd2 == "lsn" {
3027 2 : Ok(Self::LeaseLsn(LeaseLsnCmd::parse(other)?))
3028 : } else {
3029 1 : bail!("invalid lease command: {cmd}");
3030 : }
3031 : }
3032 2 : "set" => Ok(Self::Set),
3033 0 : _ => Err(anyhow::anyhow!("unsupported command {cmd} in {query}")),
3034 : }
3035 21 : }
3036 : }
3037 :
3038 : /// Parse the startup options from the postgres wire protocol startup packet.
3039 : ///
3040 : /// It takes a sequence of `-c option=X` or `-coption=X`. It parses the options string
3041 : /// by best effort and returns all the options parsed (key-value pairs) and a bool indicating
3042 : /// whether all options are successfully parsed. There could be duplicates in the options
3043 : /// if the caller passed such parameters.
3044 7 : fn parse_options(options: &str) -> (Vec<(String, String)>, bool) {
3045 7 : let mut parsing_config = false;
3046 7 : let mut has_error = false;
3047 7 : let mut config = Vec::new();
3048 16 : for item in options.split_whitespace() {
3049 16 : if item == "-c" {
3050 9 : if !parsing_config {
3051 8 : parsing_config = true;
3052 8 : } else {
3053 : // "-c" followed with another "-c"
3054 1 : tracing::warn!("failed to parse the startup options: {options}");
3055 1 : has_error = true;
3056 1 : break;
3057 : }
3058 7 : } else if item.starts_with("-c") || parsing_config {
3059 7 : let Some((mut key, value)) = item.split_once('=') else {
3060 : // "-c" followed with an invalid option
3061 1 : tracing::warn!("failed to parse the startup options: {options}");
3062 1 : has_error = true;
3063 1 : break;
3064 : };
3065 6 : if !parsing_config {
3066 : // Parse "-coptions=X"
3067 1 : let Some(stripped_key) = key.strip_prefix("-c") else {
3068 0 : tracing::warn!("failed to parse the startup options: {options}");
3069 0 : has_error = true;
3070 0 : break;
3071 : };
3072 1 : key = stripped_key;
3073 5 : }
3074 6 : config.push((key.to_string(), value.to_string()));
3075 6 : parsing_config = false;
3076 : } else {
3077 0 : tracing::warn!("failed to parse the startup options: {options}");
3078 0 : has_error = true;
3079 0 : break;
3080 : }
3081 : }
3082 7 : if parsing_config {
3083 : // "-c" without the option
3084 3 : tracing::warn!("failed to parse the startup options: {options}");
3085 3 : has_error = true;
3086 4 : }
3087 7 : (config, has_error)
3088 7 : }
3089 :
3090 : impl<IO> postgres_backend::Handler<IO> for PageServerHandler
3091 : where
3092 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
3093 : {
3094 0 : fn check_auth_jwt(
3095 0 : &mut self,
3096 0 : _pgb: &mut PostgresBackend<IO>,
3097 0 : jwt_response: &[u8],
3098 0 : ) -> Result<(), QueryError> {
3099 : // this unwrap is never triggered, because check_auth_jwt only called when auth_type is NeonJWT
3100 : // which requires auth to be present
3101 0 : let data: TokenData<Claims> = self
3102 0 : .auth
3103 0 : .as_ref()
3104 0 : .unwrap()
3105 0 : .decode(str::from_utf8(jwt_response).context("jwt response is not UTF-8")?)
3106 0 : .map_err(|e| QueryError::Unauthorized(e.0))?;
3107 :
3108 0 : if matches!(data.claims.scope, Scope::Tenant) && data.claims.tenant_id.is_none() {
3109 0 : return Err(QueryError::Unauthorized(
3110 0 : "jwt token scope is Tenant, but tenant id is missing".into(),
3111 0 : ));
3112 0 : }
3113 :
3114 0 : debug!(
3115 0 : "jwt scope check succeeded for scope: {:#?} by tenant id: {:?}",
3116 : data.claims.scope, data.claims.tenant_id,
3117 : );
3118 :
3119 0 : self.claims = Some(data.claims);
3120 0 : Ok(())
3121 0 : }
3122 :
3123 0 : fn startup(
3124 0 : &mut self,
3125 0 : _pgb: &mut PostgresBackend<IO>,
3126 0 : sm: &FeStartupPacket,
3127 0 : ) -> Result<(), QueryError> {
3128 0 : fail::fail_point!("ps::connection-start::startup-packet");
3129 :
3130 0 : if let FeStartupPacket::StartupMessage { params, .. } = sm {
3131 0 : if let Some(app_name) = params.get("application_name") {
3132 0 : self.perf_span_fields.application_name = Some(app_name.to_string());
3133 0 : Span::current().record("application_name", field::display(app_name));
3134 0 : }
3135 0 : if let Some(options) = params.get("options") {
3136 0 : let (config, _) = parse_options(options);
3137 0 : for (key, value) in config {
3138 0 : if key == "neon.compute_mode" {
3139 0 : self.perf_span_fields.compute_mode = Some(value.clone());
3140 0 : Span::current().record("compute_mode", field::display(value));
3141 0 : }
3142 : }
3143 0 : }
3144 0 : };
3145 :
3146 0 : Ok(())
3147 0 : }
3148 :
3149 : #[instrument(skip_all, fields(tenant_id, timeline_id))]
3150 : async fn process_query(
3151 : &mut self,
3152 : pgb: &mut PostgresBackend<IO>,
3153 : query_string: &str,
3154 : ) -> Result<(), QueryError> {
3155 0 : fail::fail_point!("simulated-bad-compute-connection", |_| {
3156 0 : info!("Hit failpoint for bad connection");
3157 0 : Err(QueryError::SimulatedConnectionError)
3158 0 : });
3159 :
3160 : fail::fail_point!("ps::connection-start::process-query");
3161 :
3162 : let ctx = self.connection_ctx.attached_child();
3163 : debug!("process query {query_string}");
3164 : let query = PageServiceCmd::parse(query_string)?;
3165 : match query {
3166 : PageServiceCmd::PageStream(PageStreamCmd {
3167 : tenant_id,
3168 : timeline_id,
3169 : protocol_version,
3170 : }) => {
3171 : tracing::Span::current()
3172 : .record("tenant_id", field::display(tenant_id))
3173 : .record("timeline_id", field::display(timeline_id));
3174 :
3175 : self.check_permission(Some(tenant_id))?;
3176 : let command_kind = match protocol_version {
3177 : PagestreamProtocolVersion::V2 => ComputeCommandKind::PageStreamV2,
3178 : PagestreamProtocolVersion::V3 => ComputeCommandKind::PageStreamV3,
3179 : };
3180 : COMPUTE_COMMANDS_COUNTERS.for_command(command_kind).inc();
3181 :
3182 : self.handle_pagerequests(pgb, tenant_id, timeline_id, protocol_version, ctx)
3183 : .await?;
3184 : }
3185 : PageServiceCmd::BaseBackup(BaseBackupCmd {
3186 : tenant_id,
3187 : timeline_id,
3188 : lsn,
3189 : gzip,
3190 : replica,
3191 : }) => {
3192 : tracing::Span::current()
3193 : .record("tenant_id", field::display(tenant_id))
3194 : .record("timeline_id", field::display(timeline_id));
3195 :
3196 : self.check_permission(Some(tenant_id))?;
3197 :
3198 : COMPUTE_COMMANDS_COUNTERS
3199 : .for_command(ComputeCommandKind::Basebackup)
3200 : .inc();
3201 : let metric_recording = metrics::BASEBACKUP_QUERY_TIME.start_recording();
3202 0 : let res = async {
3203 0 : self.handle_basebackup_request(
3204 0 : pgb,
3205 0 : tenant_id,
3206 0 : timeline_id,
3207 0 : lsn,
3208 0 : None,
3209 0 : false,
3210 0 : gzip,
3211 0 : replica,
3212 0 : &ctx,
3213 0 : )
3214 0 : .await?;
3215 0 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
3216 0 : Result::<(), QueryError>::Ok(())
3217 0 : }
3218 : .await;
3219 : metric_recording.observe(&res);
3220 : res?;
3221 : }
3222 : // same as basebackup, but result includes relational data as well
3223 : PageServiceCmd::FullBackup(FullBackupCmd {
3224 : tenant_id,
3225 : timeline_id,
3226 : lsn,
3227 : prev_lsn,
3228 : }) => {
3229 : tracing::Span::current()
3230 : .record("tenant_id", field::display(tenant_id))
3231 : .record("timeline_id", field::display(timeline_id));
3232 :
3233 : self.check_permission(Some(tenant_id))?;
3234 :
3235 : COMPUTE_COMMANDS_COUNTERS
3236 : .for_command(ComputeCommandKind::Fullbackup)
3237 : .inc();
3238 :
3239 : // Check that the timeline exists
3240 : self.handle_basebackup_request(
3241 : pgb,
3242 : tenant_id,
3243 : timeline_id,
3244 : lsn,
3245 : prev_lsn,
3246 : true,
3247 : false,
3248 : false,
3249 : &ctx,
3250 : )
3251 : .await?;
3252 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
3253 : }
3254 : PageServiceCmd::Set => {
3255 : // important because psycopg2 executes "SET datestyle TO 'ISO'"
3256 : // on connect
3257 : // TODO: allow setting options, i.e., application_name/compute_mode via SET commands
3258 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
3259 : }
3260 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
3261 : tenant_shard_id,
3262 : timeline_id,
3263 : lsn,
3264 : }) => {
3265 : tracing::Span::current()
3266 : .record("tenant_id", field::display(tenant_shard_id))
3267 : .record("timeline_id", field::display(timeline_id));
3268 :
3269 : self.check_permission(Some(tenant_shard_id.tenant_id))?;
3270 :
3271 : COMPUTE_COMMANDS_COUNTERS
3272 : .for_command(ComputeCommandKind::LeaseLsn)
3273 : .inc();
3274 :
3275 : match self
3276 : .handle_make_lsn_lease(pgb, tenant_shard_id, timeline_id, lsn, &ctx)
3277 : .await
3278 : {
3279 : Ok(()) => {
3280 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?
3281 : }
3282 : Err(e) => {
3283 : error!("error obtaining lsn lease for {lsn}: {e:?}");
3284 : pgb.write_message_noflush(&BeMessage::ErrorResponse(
3285 : &e.to_string(),
3286 : Some(e.pg_error_code()),
3287 : ))?
3288 : }
3289 : };
3290 : }
3291 : }
3292 :
3293 : Ok(())
3294 : }
3295 : }
3296 :
3297 : /// Serves the page service over gRPC. Dispatches to PageServerHandler for request processing.
3298 : ///
3299 : /// TODO: rename to PageServiceHandler when libpq impl is removed.
3300 : pub struct GrpcPageServiceHandler {
3301 : tenant_manager: Arc<TenantManager>,
3302 : ctx: RequestContext,
3303 :
3304 : /// Cancelled to shut down the server. Tonic will shut down in response to this, but wait for
3305 : /// in-flight requests to complete. Any tasks we spawn ourselves must respect this token.
3306 : cancel: CancellationToken,
3307 :
3308 : /// Any tasks we spawn ourselves should clone this gate guard, so that we can wait for them to
3309 : /// complete during shutdown. Request handlers implicitly hold this guard already.
3310 : gate_guard: GateGuard,
3311 :
3312 : /// `get_vectored` concurrency setting.
3313 : get_vectored_concurrent_io: GetVectoredConcurrentIo,
3314 : }
3315 :
3316 : impl GrpcPageServiceHandler {
3317 : /// Spawns a gRPC server for the page service.
3318 : ///
3319 : /// Returns a `CancellableTask` handle that can be used to shut down the server. It waits for
3320 : /// any in-flight requests and tasks to complete first.
3321 : ///
3322 : /// TODO: this doesn't support TLS. We need TLS reloading via ReloadingCertificateResolver, so we
3323 : /// need to reimplement the TCP+TLS accept loop ourselves.
3324 0 : pub fn spawn(
3325 0 : tenant_manager: Arc<TenantManager>,
3326 0 : auth: Option<Arc<SwappableJwtAuth>>,
3327 0 : perf_trace_dispatch: Option<Dispatch>,
3328 0 : get_vectored_concurrent_io: GetVectoredConcurrentIo,
3329 0 : listener: std::net::TcpListener,
3330 0 : ) -> anyhow::Result<CancellableTask> {
3331 : // Set up a cancellation token for shutting down the server, and a gate to wait for all
3332 : // requests and spawned tasks to complete.
3333 0 : let cancel = CancellationToken::new();
3334 0 : let gate = Gate::default();
3335 :
3336 0 : let ctx = RequestContextBuilder::new(TaskKind::PageRequestHandler)
3337 0 : .download_behavior(DownloadBehavior::Download)
3338 0 : .perf_span_dispatch(perf_trace_dispatch)
3339 0 : .detached_child();
3340 :
3341 : // Set up the TCP socket. We take a preconfigured TcpListener to bind the
3342 : // port early during startup.
3343 0 : let incoming = {
3344 0 : let _runtime = COMPUTE_REQUEST_RUNTIME.enter(); // required by TcpListener::from_std
3345 0 : listener.set_nonblocking(true)?;
3346 0 : tonic::transport::server::TcpIncoming::from(tokio::net::TcpListener::from_std(
3347 0 : listener,
3348 0 : )?)
3349 0 : .with_nodelay(Some(GRPC_TCP_NODELAY))
3350 0 : .with_keepalive(Some(GRPC_TCP_KEEPALIVE_TIME))
3351 : };
3352 :
3353 : // Set up the gRPC server.
3354 : //
3355 : // TODO: consider tuning window sizes.
3356 0 : let mut server = tonic::transport::Server::builder()
3357 0 : .http2_keepalive_interval(Some(GRPC_HTTP2_KEEPALIVE_INTERVAL))
3358 0 : .http2_keepalive_timeout(Some(GRPC_HTTP2_KEEPALIVE_TIMEOUT))
3359 0 : .max_concurrent_streams(Some(GRPC_MAX_CONCURRENT_STREAMS));
3360 :
3361 : // Main page service stack. Uses a mix of Tonic interceptors and Tower layers:
3362 : //
3363 : // * Interceptors: can inspect and modify the gRPC request. Sync code only, runs before service.
3364 : //
3365 : // * Layers: allow async code, can run code after the service response. However, only has access
3366 : // to the raw HTTP request/response, not the gRPC types.
3367 0 : let page_service_handler = GrpcPageServiceHandler {
3368 0 : tenant_manager,
3369 0 : ctx,
3370 0 : cancel: cancel.clone(),
3371 0 : gate_guard: gate.enter().expect("gate was just created"),
3372 0 : get_vectored_concurrent_io,
3373 0 : };
3374 :
3375 0 : let observability_layer = ObservabilityLayer;
3376 0 : let mut tenant_interceptor = TenantMetadataInterceptor;
3377 0 : let mut auth_interceptor = TenantAuthInterceptor::new(auth);
3378 :
3379 0 : let page_service = tower::ServiceBuilder::new()
3380 : // Create tracing span and record request start time.
3381 0 : .layer(observability_layer)
3382 : // Intercept gRPC requests.
3383 0 : .layer(tonic::service::InterceptorLayer::new(move |mut req| {
3384 : // Extract tenant metadata.
3385 0 : req = tenant_interceptor.call(req)?;
3386 : // Authenticate tenant JWT token.
3387 0 : req = auth_interceptor.call(req)?;
3388 0 : Ok(req)
3389 0 : }))
3390 : // Run the page service.
3391 0 : .service(
3392 0 : proto::PageServiceServer::new(page_service_handler)
3393 : // Support both gzip and zstd compression. The client decides what to use.
3394 0 : .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
3395 0 : .accept_compressed(tonic::codec::CompressionEncoding::Zstd)
3396 0 : .send_compressed(tonic::codec::CompressionEncoding::Gzip)
3397 0 : .send_compressed(tonic::codec::CompressionEncoding::Zstd),
3398 : );
3399 0 : let server = server.add_service(page_service);
3400 :
3401 : // Reflection service for use with e.g. grpcurl.
3402 0 : let reflection_service = tonic_reflection::server::Builder::configure()
3403 0 : .register_encoded_file_descriptor_set(proto::FILE_DESCRIPTOR_SET)
3404 0 : .build_v1()?;
3405 0 : let server = server.add_service(reflection_service);
3406 :
3407 : // Spawn server task. It runs until the cancellation token fires and in-flight requests and
3408 : // tasks complete. The `CancellableTask` will wait for the task's join handle, which
3409 : // implicitly waits for the gate to close.
3410 0 : let task_cancel = cancel.clone();
3411 0 : let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
3412 : "grpc pageservice listener",
3413 0 : async move {
3414 0 : server
3415 0 : .serve_with_incoming_shutdown(incoming, task_cancel.cancelled())
3416 0 : .await?;
3417 : // Server exited cleanly. All requests should have completed by now. Wait for any
3418 : // spawned tasks to complete as well (e.g. IoConcurrency sidecars) via the gate.
3419 0 : gate.close().await;
3420 0 : anyhow::Ok(())
3421 0 : },
3422 : ));
3423 :
3424 0 : Ok(CancellableTask { task, cancel })
3425 0 : }
3426 :
3427 : /// Generates a PagestreamRequest header from a ReadLsn and request ID.
3428 0 : fn make_hdr(
3429 0 : read_lsn: page_api::ReadLsn,
3430 0 : req_id: Option<page_api::RequestID>,
3431 0 : ) -> PagestreamRequest {
3432 : PagestreamRequest {
3433 0 : reqid: req_id.map(|r| r.id).unwrap_or_default(),
3434 0 : request_lsn: read_lsn.request_lsn,
3435 0 : not_modified_since: read_lsn
3436 0 : .not_modified_since_lsn
3437 0 : .unwrap_or(read_lsn.request_lsn),
3438 : }
3439 0 : }
3440 :
3441 : /// Acquires a timeline handle for the given request. The shard index must match a local shard.
3442 : ///
3443 : /// NB: this will fail during shard splits, see comment on [`Self::maybe_split_get_page`].
3444 0 : async fn get_request_timeline(
3445 0 : &self,
3446 0 : req: &tonic::Request<impl Any>,
3447 0 : ) -> Result<Handle<TenantManagerTypes>, GetActiveTimelineError> {
3448 : let TenantTimelineId {
3449 0 : tenant_id,
3450 0 : timeline_id,
3451 0 : } = *extract::<TenantTimelineId>(req);
3452 0 : let shard_index = *extract::<ShardIndex>(req);
3453 :
3454 : // TODO: untangle acquisition from TenantManagerWrapper::resolve() and Cache::get(), to
3455 : // avoid the unnecessary overhead.
3456 0 : TimelineHandles::new(self.tenant_manager.clone())
3457 0 : .get(tenant_id, timeline_id, ShardSelector::Known(shard_index))
3458 0 : .await
3459 0 : }
3460 :
3461 : /// Acquires a timeline handle for the given request, which must be for shard zero. Most
3462 : /// metadata requests are only valid on shard zero.
3463 : ///
3464 : /// NB: during an ongoing shard split, the compute will keep talking to the parent shard until
3465 : /// the split is committed, but the parent shard may have been removed in the meanwhile. In that
3466 : /// case, we reroute the request to the new child shard. See [`Self::maybe_split_get_page`].
3467 : ///
3468 : /// TODO: revamp the split protocol to avoid this child routing.
3469 0 : async fn get_request_timeline_shard_zero(
3470 0 : &self,
3471 0 : req: &tonic::Request<impl Any>,
3472 0 : ) -> Result<Handle<TenantManagerTypes>, tonic::Status> {
3473 : let TenantTimelineId {
3474 0 : tenant_id,
3475 0 : timeline_id,
3476 0 : } = *extract::<TenantTimelineId>(req);
3477 0 : let shard_index = *extract::<ShardIndex>(req);
3478 :
3479 0 : if shard_index.shard_number.0 != 0 {
3480 0 : return Err(tonic::Status::invalid_argument(format!(
3481 0 : "request only valid on shard zero (requested shard {shard_index})",
3482 0 : )));
3483 0 : }
3484 :
3485 : // TODO: untangle acquisition from TenantManagerWrapper::resolve() and Cache::get(), to
3486 : // avoid the unnecessary overhead.
3487 0 : let mut handles = TimelineHandles::new(self.tenant_manager.clone());
3488 0 : match handles
3489 0 : .get(tenant_id, timeline_id, ShardSelector::Known(shard_index))
3490 0 : .await
3491 : {
3492 0 : Ok(timeline) => Ok(timeline),
3493 0 : Err(err) => {
3494 : // We may be in the middle of a shard split. Try to find a child shard 0.
3495 0 : if let Ok(timeline) = handles
3496 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
3497 0 : .await
3498 0 : && timeline.get_shard_index().shard_count > shard_index.shard_count
3499 : {
3500 0 : return Ok(timeline);
3501 0 : }
3502 0 : Err(err.into())
3503 : }
3504 : }
3505 0 : }
3506 :
3507 : /// Starts a SmgrOpTimer at received_at, throttles the request, and records execution start.
3508 : /// Only errors if the timeline is shutting down.
3509 : ///
3510 : /// TODO: move timer construction to ObservabilityLayer (see TODO there).
3511 : /// TODO: decouple rate limiting (middleware?), and return SlowDown errors instead.
3512 0 : async fn record_op_start_and_throttle(
3513 0 : timeline: &Handle<TenantManagerTypes>,
3514 0 : op: metrics::SmgrQueryType,
3515 0 : received_at: Instant,
3516 0 : ) -> Result<SmgrOpTimer, tonic::Status> {
3517 0 : let mut timer = PageServerHandler::record_op_start_and_throttle(timeline, op, received_at)
3518 0 : .await
3519 0 : .map_err(|err| match err {
3520 : // record_op_start_and_throttle() only returns Shutdown.
3521 0 : QueryError::Shutdown => tonic::Status::unavailable(format!("{err}")),
3522 0 : err => tonic::Status::internal(format!("unexpected error: {err}")),
3523 0 : })?;
3524 0 : timer.observe_execution_start(Instant::now());
3525 0 : Ok(timer)
3526 0 : }
3527 :
3528 : /// Processes a GetPage batch request, via the GetPages bidirectional streaming RPC.
3529 : ///
3530 : /// NB: errors returned from here are intercepted in get_pages(), and may be converted to a
3531 : /// GetPageResponse with an appropriate status code to avoid terminating the stream.
3532 : ///
3533 : /// TODO: get_vectored() currently enforces a batch limit of 32. Postgres will typically send
3534 : /// batches up to effective_io_concurrency = 100. Either we have to accept large batches, or
3535 : /// split them up in the client or server.
3536 : #[instrument(skip_all, fields(
3537 : req_id = %req.request_id,
3538 : rel = %req.rel,
3539 0 : blkno = %req.block_numbers[0],
3540 : blks = %req.block_numbers.len(),
3541 : lsn = %req.read_lsn,
3542 : ))]
3543 : async fn get_page(
3544 : ctx: &RequestContext,
3545 : timeline: Handle<TenantManagerTypes>,
3546 : req: page_api::GetPageRequest,
3547 : io_concurrency: IoConcurrency,
3548 : received_at: Instant,
3549 : ) -> Result<page_api::GetPageResponse, tonic::Status> {
3550 : let ctx = ctx.with_scope_page_service_pagestream(&timeline);
3551 :
3552 : for &blkno in &req.block_numbers {
3553 : let shard = timeline.get_shard_identity();
3554 : let key = rel_block_to_key(req.rel, blkno);
3555 : if !shard.is_key_local(&key) {
3556 : return Err(tonic::Status::invalid_argument(format!(
3557 : "block {blkno} of relation {} requested on wrong shard {} (is on {})",
3558 : req.rel,
3559 : timeline.get_shard_index(),
3560 : ShardIndex::new(shard.get_shard_number(&key), shard.count),
3561 : )));
3562 : }
3563 : }
3564 :
3565 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn(); // hold guard
3566 : let effective_lsn = PageServerHandler::effective_request_lsn(
3567 : &timeline,
3568 : timeline.get_last_record_lsn(),
3569 : req.read_lsn.request_lsn,
3570 : req.read_lsn
3571 : .not_modified_since_lsn
3572 : .unwrap_or(req.read_lsn.request_lsn),
3573 : &latest_gc_cutoff_lsn,
3574 : )?;
3575 :
3576 : let mut batch = SmallVec::with_capacity(req.block_numbers.len());
3577 : for blkno in req.block_numbers {
3578 : // TODO: this creates one timer per page and throttles it. We should have a timer for
3579 : // the entire batch, and throttle only the batch, but this is equivalent to what
3580 : // PageServerHandler does already so we keep it for now.
3581 : let timer = Self::record_op_start_and_throttle(
3582 : &timeline,
3583 : metrics::SmgrQueryType::GetPageAtLsn,
3584 : received_at,
3585 : )
3586 : .await?;
3587 :
3588 : batch.push(BatchedGetPageRequest {
3589 : req: PagestreamGetPageRequest {
3590 : hdr: Self::make_hdr(req.read_lsn, Some(req.request_id)),
3591 : rel: req.rel,
3592 : blkno,
3593 : },
3594 : lsn_range: LsnRange {
3595 : effective_lsn,
3596 : request_lsn: req.read_lsn.request_lsn,
3597 : },
3598 : timer,
3599 : ctx: ctx.attached_child(),
3600 : batch_wait_ctx: None, // TODO: add tracing
3601 : });
3602 : }
3603 :
3604 : // TODO: this does a relation size query for every page in the batch. Since this batch is
3605 : // all for one relation, we could do this only once. However, this is not the case for the
3606 : // libpq implementation.
3607 : let results = PageServerHandler::handle_get_page_at_lsn_request_batched(
3608 : &timeline,
3609 : batch,
3610 : io_concurrency,
3611 : GetPageBatchBreakReason::BatchFull, // TODO: not relevant for gRPC batches
3612 : &ctx,
3613 : )
3614 : .await;
3615 :
3616 : let mut resp = page_api::GetPageResponse {
3617 : request_id: req.request_id,
3618 : status_code: page_api::GetPageStatusCode::Ok,
3619 : reason: None,
3620 : rel: req.rel,
3621 : pages: Vec::with_capacity(results.len()),
3622 : };
3623 :
3624 : for result in results {
3625 : match result {
3626 : Ok((PagestreamBeMessage::GetPage(r), _, _)) => resp.pages.push(page_api::Page {
3627 : block_number: r.req.blkno,
3628 : image: r.page,
3629 : }),
3630 : Ok((resp, _, _)) => {
3631 : return Err(tonic::Status::internal(format!(
3632 : "unexpected response: {resp:?}"
3633 : )));
3634 : }
3635 : Err(err) => return Err(err.err.into()),
3636 : };
3637 : }
3638 :
3639 : Ok(resp)
3640 : }
3641 :
3642 : /// Processes a GetPage request when there is a potential shard split in progress. We have to
3643 : /// reroute the request to any local child shards, and split batch requests that straddle
3644 : /// multiple child shards.
3645 : ///
3646 : /// Parent shards are split and removed incrementally (there may be many parent shards when
3647 : /// splitting an already-sharded tenant), but the compute is only notified once the overall
3648 : /// split commits, which can take several minutes. In the meanwhile, the compute will be sending
3649 : /// requests to the parent shards.
3650 : ///
3651 : /// TODO: add test infrastructure to provoke this situation frequently and for long periods of
3652 : /// time, to properly exercise it.
3653 : ///
3654 : /// TODO: revamp the split protocol to avoid this, e.g.:
3655 : /// * Keep the parent shard until the split commits and the compute is notified.
3656 : /// * Notify the compute about each subsplit.
3657 : /// * Return an error that updates the compute's shard map.
3658 : #[instrument(skip_all)]
3659 : #[allow(clippy::too_many_arguments)]
3660 : async fn maybe_split_get_page(
3661 : ctx: &RequestContext,
3662 : handles: &mut TimelineHandles,
3663 : tenant_id: TenantId,
3664 : timeline_id: TimelineId,
3665 : parent: ShardIndex,
3666 : req: page_api::GetPageRequest,
3667 : io_concurrency: IoConcurrency,
3668 : received_at: Instant,
3669 : ) -> Result<page_api::GetPageResponse, tonic::Status> {
3670 : // Check the first page to see if we have any child shards at all. Otherwise, the compute is
3671 : // just talking to the wrong Pageserver. If the parent has been split, the shard now owning
3672 : // the page must have a higher shard count.
3673 : let timeline = handles
3674 : .get(
3675 : tenant_id,
3676 : timeline_id,
3677 : ShardSelector::Page(rel_block_to_key(req.rel, req.block_numbers[0])),
3678 : )
3679 : .await?;
3680 :
3681 : let shard_id = timeline.get_shard_identity();
3682 : if shard_id.count <= parent.shard_count {
3683 : return Err(HandleUpgradeError::ShutDown.into()); // emulate original error
3684 : }
3685 :
3686 : // Fast path: the request fits in a single shard.
3687 : if let Some(shard_index) =
3688 : GetPageSplitter::for_single_shard(&req, shard_id.count, Some(shard_id.stripe_size))?
3689 : {
3690 : // We got the shard ID from the first page, so these must be equal.
3691 : assert_eq!(shard_index.shard_number, shard_id.number);
3692 : assert_eq!(shard_index.shard_count, shard_id.count);
3693 : return Self::get_page(ctx, timeline, req, io_concurrency, received_at).await;
3694 : }
3695 :
3696 : // The request spans multiple shards; split it and dispatch parallel requests. All pages
3697 : // were originally in the parent shard, and during a split all children are local, so we
3698 : // expect to find local shards for all pages.
3699 : let mut splitter = GetPageSplitter::split(req, shard_id.count, Some(shard_id.stripe_size))?;
3700 :
3701 : let mut shard_requests = FuturesUnordered::new();
3702 : for (shard_index, shard_req) in splitter.drain_requests() {
3703 : let timeline = handles
3704 : .get(tenant_id, timeline_id, ShardSelector::Known(shard_index))
3705 : .await?;
3706 : let future = Self::get_page(
3707 : ctx,
3708 : timeline,
3709 : shard_req,
3710 : io_concurrency.clone(),
3711 : received_at,
3712 : )
3713 0 : .map(move |result| result.map(|resp| (shard_index, resp)));
3714 : shard_requests.push(future);
3715 : }
3716 :
3717 : while let Some((shard_index, shard_response)) = shard_requests.next().await.transpose()? {
3718 : splitter.add_response(shard_index, shard_response)?;
3719 : }
3720 :
3721 : Ok(splitter.collect_response()?)
3722 : }
3723 : }
3724 :
/// Implements the gRPC page service.
///
/// On client disconnect (e.g. timeout or client shutdown), Tonic will drop the request handler
/// futures, so the read path must be cancellation-safe. On server shutdown, Tonic will wait for
/// in-flight requests to complete.
///
/// Unary methods that read relation/SLRU/database data (`get_db_size`, `get_rel_size`,
/// `get_slru_segment`) and `get_base_backup` are served from shard zero; `get_pages` and
/// `lease_lsn` target the shard named in the request metadata.
///
/// TODO: when the libpq impl is removed, remove the Pagestream types and inline the handler code.
#[tonic::async_trait]
impl proto::PageService for GrpcPageServiceHandler {
    /// Response stream for `get_base_backup`: a boxed stream of basebackup tarball chunks.
    type GetBaseBackupStream = Pin<
        Box<dyn Stream<Item = Result<proto::GetBaseBackupResponseChunk, tonic::Status>> + Send>,
    >;

    /// Response stream for `get_pages`: one item per request read from the client stream.
    type GetPagesStream =
        Pin<Box<dyn Stream<Item = Result<proto::GetPageResponse, tonic::Status>> + Send>>;

    /// Streams a basebackup tarball for the timeline (resolved via shard zero) in chunks of at
    /// most `CHUNK_SIZE` bytes.
    ///
    /// Archived timelines are rejected with `FailedPrecondition`. If the request carries an LSN,
    /// waits for it to arrive and then rejects it with `InvalidArgument` if it is outside the
    /// GC-cutoff scope. The tarball is produced by a spawned task (serving a cached basebackup
    /// when one is returned) and piped to the response stream through an in-memory simplex pipe;
    /// errors from that task are only surfaced after the last chunk has been emitted.
    #[instrument(skip_all, fields(lsn))]
    async fn get_base_backup(
        &self,
        req: tonic::Request<proto::GetBaseBackupRequest>,
    ) -> Result<tonic::Response<Self::GetBaseBackupStream>, tonic::Status> {
        // Send chunks of 256 KB to avoid large memory allocations. pagebench basebackup shows this
        // to be the sweet spot where throughput is saturated.
        const CHUNK_SIZE: usize = 256 * 1024;

        let timeline = self.get_request_timeline_shard_zero(&req).await?;
        let ctx = self.ctx.with_scope_timeline(&timeline);

        // Validate the request and decorate the span.
        if timeline.is_archived() == Some(true) {
            return Err(tonic::Status::failed_precondition("timeline is archived"));
        }
        let req: page_api::GetBaseBackupRequest = req.into_inner().try_into()?;

        span_record!(lsn=?req.lsn);

        // Wait for the LSN to arrive, if given.
        if let Some(lsn) = req.lsn {
            // The GC cutoff is read before waiting, and the LSN is checked against that value
            // after the wait completes.
            let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
            timeline
                .wait_lsn(
                    lsn,
                    WaitLsnWaiter::PageService,
                    WaitLsnTimeout::Default,
                    &ctx,
                )
                .await?;
            timeline
                .check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
                .map_err(|err| {
                    tonic::Status::invalid_argument(format!("invalid basebackup LSN: {err}"))
                })?;
        }

        // Spawn a task to run the basebackup.
        let span = Span::current();
        let gate_guard = self
            .gate_guard
            .try_clone()
            .map_err(|_| tonic::Status::unavailable("shutting down"))?;
        // The spawned task writes the tarball into `simplex_write`; the response stream below
        // reads it back from `simplex_read`. The pipe's buffer size is set to CHUNK_SIZE.
        let (mut simplex_read, mut simplex_write) = tokio::io::simplex(CHUNK_SIZE);
        let jh = tokio::spawn(async move {
            let _gate_guard = gate_guard; // keep gate open until task completes

            let gzip_level = match req.compression {
                page_api::BaseBackupCompression::None => None,
                // NB: using fast compression because it's on the critical path for compute
                // startup. For an empty database, we get <100KB with this method. The
                // Level::Best compression method gives us <20KB, but maybe we should add
                // basebackup caching on compute shutdown first.
                page_api::BaseBackupCompression::Gzip => Some(async_compression::Level::Fastest),
            };

            // Check for a cached basebackup.
            let cached = timeline
                .get_cached_basebackup_if_enabled(
                    req.lsn,
                    None,
                    req.full,
                    req.replica,
                    gzip_level.is_some(),
                )
                .await;

            // NOTE(review): only the freshly-generated path below is instrumented with the
            // request span; the cached copy runs outside it.
            let result = if let Some(mut cached) = cached {
                // If we have a cached basebackup, send it.
                tokio::io::copy(&mut cached, &mut simplex_write)
                    .await
                    .map(|_| ())
                    .map_err(|err| BasebackupError::Client(err, "cached,copy"))
            } else {
                basebackup::send_basebackup_tarball(
                    &mut simplex_write,
                    &timeline,
                    req.lsn,
                    None,
                    req.full,
                    req.replica,
                    gzip_level,
                    &ctx,
                )
                .instrument(span) // propagate request span
                .await
            };
            // Shut down the write half so the reader observes end-of-stream. If the shutdown
            // itself fails, its error is returned instead of `result`.
            simplex_write
                .shutdown()
                .await
                .map_err(|err| BasebackupError::Client(err, "simplex_write"))?;
            result
        });

        // Emit chunks of size CHUNK_SIZE.
        let chunks = async_stream::try_stream! {
            loop {
                // Fill one chunk, stopping when it holds CHUNK_SIZE bytes or the pipe is closed.
                let mut chunk = BytesMut::with_capacity(CHUNK_SIZE).limit(CHUNK_SIZE);
                loop {
                    let n = simplex_read.read_buf(&mut chunk).await.map_err(|err| {
                        tonic::Status::internal(format!("failed to read basebackup chunk: {err}"))
                    })?;
                    if n == 0 {
                        break; // full chunk or closed stream
                    }
                }
                // An empty chunk means the writer closed the pipe with nothing left to send.
                let chunk = chunk.into_inner().freeze();
                if chunk.is_empty() {
                    break;
                }
                yield proto::GetBaseBackupResponseChunk::from(chunk);
            }
            // Wait for the basebackup task to exit and check for errors.
            // The first `?` handles the task join error, the second the task's own result.
            jh.await.map_err(|err| {
                tonic::Status::internal(format!("basebackup failed: {err}"))
            })??;
        };

        Ok(tonic::Response::new(Box::pin(chunks)))
    }

    /// Returns the size of the database `db_oid` at the request's read LSN, served by shard zero.
    #[instrument(skip_all, fields(db_oid, lsn))]
    async fn get_db_size(
        &self,
        req: tonic::Request<proto::GetDbSizeRequest>,
    ) -> Result<tonic::Response<proto::GetDbSizeResponse>, tonic::Status> {
        // Arrival time stamped by ObservabilityLayer; used to start the op timer below.
        let received_at = extract::<ReceivedAt>(&req).0;
        let timeline = self.get_request_timeline_shard_zero(&req).await?;
        let ctx = self.ctx.with_scope_page_service_pagestream(&timeline);

        // Validate the request, decorate the span, and convert it to a Pagestream request.
        let req: page_api::GetDbSizeRequest = req.into_inner().try_into()?;

        span_record!(db_oid=%req.db_oid, lsn=%req.read_lsn);

        let req = PagestreamDbSizeRequest {
            hdr: Self::make_hdr(req.read_lsn, None),
            dbnode: req.db_oid,
        };

        // Execute the request and convert the response.
        // `_timer` is a named binding, so it lives until the end of the function.
        let _timer = Self::record_op_start_and_throttle(
            &timeline,
            metrics::SmgrQueryType::GetDbSize,
            received_at,
        )
        .await?;

        let resp = PageServerHandler::handle_db_size_request(&timeline, &req, &ctx).await?;
        // NOTE(review): `as` cast between the Pagestream and gRPC size types — confirm they
        // agree in width/signedness.
        let resp = resp.db_size as page_api::GetDbSizeResponse;
        Ok(tonic::Response::new(resp.into()))
    }

    // NB: don't instrument this, instrument each streamed request.
    /// Serves a stream of GetPage requests, yielding one response per request until the client
    /// closes its stream or the server is cancelled. Requests are processed one at a time, in
    /// order.
    ///
    /// The requested timeline shard is resolved once up front and kept as a weak handle. If that
    /// shard is not present but a child shard is (a shard split in progress), each request is
    /// instead rerouted via `maybe_split_get_page`. Per-request failures are logged and converted
    /// into error responses when possible; otherwise they terminate the stream.
    async fn get_pages(
        &self,
        req: tonic::Request<tonic::Streaming<proto::GetPageRequest>>,
    ) -> Result<tonic::Response<Self::GetPagesStream>, tonic::Status> {
        // Extract the timeline from the request and check that it exists.
        //
        // NB: during shard splits, the compute may still send requests to the parent shard. We'll
        // reroute requests to the child shards below, but we also detect the common cases here
        // where either the shard exists or no shards exist at all. If we have a child shard, we
        // can't acquire a weak handle because we don't know which child shard to use yet.
        let TenantTimelineId {
            tenant_id,
            timeline_id,
        } = *extract::<TenantTimelineId>(&req);
        let shard_index = *extract::<ShardIndex>(&req);

        let mut handles = TimelineHandles::new(self.tenant_manager.clone());
        let timeline = match handles
            .get(tenant_id, timeline_id, ShardSelector::Known(shard_index))
            .await
        {
            // The timeline shard exists. Keep a weak handle to reuse for each request.
            Ok(timeline) => Some(timeline.downgrade()),
            // The shard doesn't exist, but a child shard does. We'll reroute requests later.
            Err(_) if self.tenant_manager.has_child_shard(tenant_id, shard_index) => None,
            // Failed to fetch the timeline, and no child shard exists. Error out.
            Err(err) => return Err(err.into()),
        };

        // Spawn an IoConcurrency sidecar, if enabled.
        let gate_guard = self
            .gate_guard
            .try_clone()
            .map_err(|_| tonic::Status::unavailable("shutting down"))?;
        let io_concurrency =
            IoConcurrency::spawn_from_conf(self.get_vectored_concurrent_io, gate_guard);

        // Construct the GetPageRequest stream handler.
        let span = Span::current();
        let ctx = self.ctx.attached_child();
        let cancel = self.cancel.clone();
        let mut reqs = req.into_inner();

        let resps = async_stream::try_stream! {
            loop {
                // Wait for the next client request.
                //
                // NB: Tonic considers the entire stream to be an in-flight request and will wait
                // for it to complete before shutting down. React to cancellation between requests.
                // `biased` polls the branches in order, so cancellation is checked first.
                let req = tokio::select! {
                    biased;
                    _ = cancel.cancelled() => Err(tonic::Status::unavailable("shutting down")),

                    result = reqs.message() => match result {
                        Ok(Some(req)) => Ok(req),
                        Ok(None) => break, // client closed the stream
                        Err(err) => Err(err),
                    },
                }?;

                let received_at = Instant::now();
                // Captured before `req` is consumed, so error responses can echo the request ID.
                let req_id = req.request_id.map(page_api::RequestID::from).unwrap_or_default();

                // Process the request, using a closure to capture errors.
                let process_request = async || {
                    let req = page_api::GetPageRequest::try_from(req)?;

                    // Fast path: use the pre-acquired timeline handle.
                    if let Some(Ok(timeline)) = timeline.as_ref().map(|t| t.upgrade()) {
                        return Self::get_page(&ctx, timeline, req, io_concurrency.clone(), received_at)
                            .instrument(span.clone()) // propagate request span
                            .await
                    }

                    // The timeline handle is stale. During shard splits, the compute may still be
                    // sending requests to the parent shard. Try to re-route requests to the child
                    // shards, and split any batch requests that straddle multiple child shards.
                    Self::maybe_split_get_page(
                        &ctx,
                        &mut handles,
                        tenant_id,
                        timeline_id,
                        shard_index,
                        req,
                        io_concurrency.clone(),
                        received_at,
                    )
                    .instrument(span.clone()) // propagate request span
                    .await
                };

                // Return the response. Convert per-request errors to GetPageResponses if
                // appropriate, or terminate the stream with a tonic::Status.
                yield match process_request().await {
                    Ok(resp) => resp.into(),
                    Err(status) => {
                        // Log the error, since ObservabilityLayer won't see stream errors.
                        // TODO: it would be nice if we could propagate the get_page() fields here.
                        span.in_scope(|| {
                            warn!("request failed with {:?}: {}", status.code(), status.message());
                        });
                        page_api::GetPageResponse::try_from_status(status, req_id)?.into()
                    }
                }
            }
        };

        Ok(tonic::Response::new(Box::pin(resps)))
    }

    /// Returns the number of blocks in relation `rel` at the request's read LSN, served by shard
    /// zero. `allow_missing` is forwarded to the nblocks handler, whose result is optional
    /// (presumably `None` for a missing relation when allowed — confirm in
    /// `handle_get_nblocks_request`).
    #[instrument(skip_all, fields(rel, lsn, allow_missing))]
    async fn get_rel_size(
        &self,
        req: tonic::Request<proto::GetRelSizeRequest>,
    ) -> Result<tonic::Response<proto::GetRelSizeResponse>, tonic::Status> {
        let received_at = extract::<ReceivedAt>(&req).0;
        let timeline = self.get_request_timeline_shard_zero(&req).await?;
        let ctx = self.ctx.with_scope_page_service_pagestream(&timeline);

        // Validate the request, decorate the span, and convert it to a Pagestream request.
        let req: page_api::GetRelSizeRequest = req.into_inner().try_into()?;
        // Saved separately because the Pagestream request built below does not carry it.
        let allow_missing = req.allow_missing;

        span_record!(rel=%req.rel, lsn=%req.read_lsn, allow_missing=%req.allow_missing);

        let req = PagestreamNblocksRequest {
            hdr: Self::make_hdr(req.read_lsn, None),
            rel: req.rel,
        };

        // Execute the request and convert the response.
        let _timer = Self::record_op_start_and_throttle(
            &timeline,
            metrics::SmgrQueryType::GetRelSize,
            received_at,
        )
        .await?;

        let resp =
            PageServerHandler::handle_get_nblocks_request(&timeline, &req, allow_missing, &ctx)
                .await?;
        let resp: page_api::GetRelSizeResponse = resp.map(|resp| resp.n_blocks);

        Ok(tonic::Response::new(resp.into()))
    }

    /// Returns the contents of SLRU segment `segno` of the given kind at the request's read LSN,
    /// served by shard zero.
    #[instrument(skip_all, fields(kind, segno, lsn))]
    async fn get_slru_segment(
        &self,
        req: tonic::Request<proto::GetSlruSegmentRequest>,
    ) -> Result<tonic::Response<proto::GetSlruSegmentResponse>, tonic::Status> {
        let received_at = extract::<ReceivedAt>(&req).0;
        let timeline = self.get_request_timeline_shard_zero(&req).await?;
        let ctx = self.ctx.with_scope_page_service_pagestream(&timeline);

        // Validate the request, decorate the span, and convert it to a Pagestream request.
        let req: page_api::GetSlruSegmentRequest = req.into_inner().try_into()?;

        span_record!(kind=%req.kind, segno=%req.segno, lsn=%req.read_lsn);

        let req = PagestreamGetSlruSegmentRequest {
            hdr: Self::make_hdr(req.read_lsn, None),
            // The Pagestream request encodes the SLRU kind as a raw u8.
            kind: req.kind as u8,
            segno: req.segno,
        };

        // Execute the request and convert the response.
        let _timer = Self::record_op_start_and_throttle(
            &timeline,
            metrics::SmgrQueryType::GetSlruSegment,
            received_at,
        )
        .await?;

        let resp =
            PageServerHandler::handle_get_slru_segment_request(&timeline, &req, &ctx).await?;
        let resp: page_api::GetSlruSegmentResponse = resp.segment;
        Ok(tonic::Response::new(resp.into()))
    }

    /// Acquires (or renews) an LSN lease on the requested timeline shard and returns its expiry
    /// time. Fails with `FailedPrecondition` if the lease could not be granted.
    #[instrument(skip_all, fields(lsn))]
    async fn lease_lsn(
        &self,
        req: tonic::Request<proto::LeaseLsnRequest>,
    ) -> Result<tonic::Response<proto::LeaseLsnResponse>, tonic::Status> {
        // TODO: this won't work during shard splits, as the request is directed at a specific shard
        // but the parent shard is removed before the split commits and the compute is notified
        // (which can take several minutes for large tenants). That's also the case for the libpq
        // implementation, so we keep the behavior for now.
        let timeline = self.get_request_timeline(&req).await?;
        let ctx = self.ctx.with_scope_timeline(&timeline);

        // Validate and convert the request, and decorate the span.
        let req: page_api::LeaseLsnRequest = req.into_inner().try_into()?;

        span_record!(lsn=%req.lsn);

        // Attempt to acquire a lease. Return FailedPrecondition if the lease could not be granted.
        let lease_length = timeline.get_lsn_lease_length();
        let expires = match timeline.renew_lsn_lease(req.lsn, lease_length, &ctx) {
            Ok(lease) => lease.valid_until,
            Err(err) => return Err(tonic::Status::failed_precondition(format!("{err}"))),
        };

        // TODO: is this spammy? Move it compute-side?
        info!(
            "acquired lease for {} until {}",
            req.lsn,
            chrono::DateTime::<Utc>::from(expires).to_rfc3339()
        );

        Ok(tonic::Response::new(expires.into()))
    }
}
4110 :
4111 : /// gRPC middleware layer that handles observability concerns:
4112 : ///
4113 : /// * Creates and enters a tracing span.
4114 : /// * Records the request start time as a ReceivedAt request extension.
4115 : ///
4116 : /// TODO: add perf tracing.
4117 : /// TODO: add timing and metrics.
4118 : /// TODO: add logging.
4119 : #[derive(Clone)]
4120 : struct ObservabilityLayer;
4121 :
4122 : impl<S: tonic::server::NamedService> tower::Layer<S> for ObservabilityLayer {
4123 : type Service = ObservabilityLayerService<S>;
4124 :
4125 0 : fn layer(&self, inner: S) -> Self::Service {
4126 0 : Self::Service { inner }
4127 0 : }
4128 : }
4129 :
4130 : #[derive(Clone)]
4131 : struct ObservabilityLayerService<S> {
4132 : inner: S,
4133 : }
4134 :
4135 : #[derive(Clone, Copy)]
4136 : struct ReceivedAt(Instant);
4137 :
4138 : impl<S: tonic::server::NamedService> tonic::server::NamedService for ObservabilityLayerService<S> {
4139 : const NAME: &'static str = S::NAME; // propagate inner service name
4140 : }
4141 :
/// Per-request observability wrapper around the inner gRPC service: stamps the arrival time,
/// runs the inner call inside a `grpc:pageservice` span, and logs non-OK unary statuses.
impl<S, Req, Resp> tower::Service<http::Request<Req>> for ObservabilityLayerService<S>
where
    S: tower::Service<http::Request<Req>, Response = http::Response<Resp>> + Send,
    S::Future: Send + 'static,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

    /// Handles one request: inserts a `ReceivedAt` extension, creates and enters a tracing span
    /// (peer address, gRPC method, and empty tenant/timeline/shard fields), calls the inner
    /// service within that span, and warns if the response headers carry a non-OK gRPC status.
    fn call(&mut self, mut req: http::Request<Req>) -> Self::Future {
        // Record the request start time as a request extension.
        //
        // TODO: we should start a timer here instead, but it currently requires a timeline handle
        // and SmgrQueryType, which we don't have yet. Refactor it to provide it later.
        req.extensions_mut().insert(ReceivedAt(Instant::now()));

        // Extract the peer address and gRPC method.
        // The peer comes from the `TcpConnectInfo` extension; it is an empty string if absent.
        let peer = req
            .extensions()
            .get::<TcpConnectInfo>()
            .and_then(|info| info.remote_addr())
            .map(|addr| addr.to_string())
            .unwrap_or_default();

        // For a `/{service}/{method}` path this takes the `{method}` segment; otherwise it falls
        // back to the whole path.
        let method = req
            .uri()
            .path()
            .split('/')
            .nth(2)
            .unwrap_or(req.uri().path())
            .to_string();

        // Create a basic tracing span.
        //
        // Enter the span for the current thread and instrument the future. It is not sufficient to
        // only instrument the future, since it only takes effect after the future is returned and
        // polled, not when the inner service is called below (e.g. during interceptor execution).
        let span = info_span!(
            "grpc:pageservice",
            // These will be populated by TenantMetadataInterceptor.
            tenant_id = field::Empty,
            timeline_id = field::Empty,
            shard_id = field::Empty,
            // NB: empty fields must be listed first above. Otherwise, the field names will be
            // clobbered when the empty fields are populated. They will be output last regardless.
            %peer,
            %method,
        );
        // Held until `call` returns, so `self.inner.call(req)` below runs inside the span.
        let _guard = span.enter();

        // Construct a future for calling the inner service, but don't await it. This avoids having
        // to clone the inner service into the future below.
        let call = self.inner.call(req);

        async move {
            // Await the inner service call.
            let result = call.await;

            // Log gRPC error statuses. This won't include request info from handler spans, but it
            // will catch all errors (even those emitted before handler spans are constructed). Only
            // unary request errors are logged here, not streaming response errors.
            if let Ok(ref resp) = result
                && let Some(status) = tonic::Status::from_header_map(resp.headers())
                && status.code() != tonic::Code::Ok
            {
                // TODO: it would be nice if we could propagate the handler span's request fields
                // here. This could e.g. be done by attaching the request fields to
                // tonic::Status::metadata via a proc macro.
                warn!(
                    "request failed with {:?}: {}",
                    status.code(),
                    status.message()
                );
            }

            result
        }
        // Cloned because `_guard` still borrows `span` at this point.
        .instrument(span.clone())
        .boxed()
    }

    /// Readiness is delegated unchanged to the inner service.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }
}
4227 :
4228 : /// gRPC interceptor that decodes tenant metadata and stores it as request extensions of type
4229 : /// TenantTimelineId and ShardIndex.
4230 : #[derive(Clone)]
4231 : struct TenantMetadataInterceptor;
4232 :
4233 : impl tonic::service::Interceptor for TenantMetadataInterceptor {
4234 0 : fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
4235 : // Decode the tenant ID.
4236 0 : let tenant_id = req
4237 0 : .metadata()
4238 0 : .get("neon-tenant-id")
4239 0 : .ok_or_else(|| tonic::Status::invalid_argument("missing neon-tenant-id"))?
4240 0 : .to_str()
4241 0 : .map_err(|_| tonic::Status::invalid_argument("invalid neon-tenant-id"))?;
4242 0 : let tenant_id = TenantId::from_str(tenant_id)
4243 0 : .map_err(|_| tonic::Status::invalid_argument("invalid neon-tenant-id"))?;
4244 :
4245 : // Decode the timeline ID.
4246 0 : let timeline_id = req
4247 0 : .metadata()
4248 0 : .get("neon-timeline-id")
4249 0 : .ok_or_else(|| tonic::Status::invalid_argument("missing neon-timeline-id"))?
4250 0 : .to_str()
4251 0 : .map_err(|_| tonic::Status::invalid_argument("invalid neon-timeline-id"))?;
4252 0 : let timeline_id = TimelineId::from_str(timeline_id)
4253 0 : .map_err(|_| tonic::Status::invalid_argument("invalid neon-timeline-id"))?;
4254 :
4255 : // Decode the shard ID.
4256 0 : let shard_id = req
4257 0 : .metadata()
4258 0 : .get("neon-shard-id")
4259 0 : .ok_or_else(|| tonic::Status::invalid_argument("missing neon-shard-id"))?
4260 0 : .to_str()
4261 0 : .map_err(|_| tonic::Status::invalid_argument("invalid neon-shard-id"))?;
4262 0 : let shard_id = ShardIndex::from_str(shard_id)
4263 0 : .map_err(|_| tonic::Status::invalid_argument("invalid neon-shard-id"))?;
4264 :
4265 : // Stash them in the request.
4266 0 : let extensions = req.extensions_mut();
4267 0 : extensions.insert(TenantTimelineId::new(tenant_id, timeline_id));
4268 0 : extensions.insert(shard_id);
4269 :
4270 : // Decorate the tracing span.
4271 0 : span_record!(%tenant_id, %timeline_id, %shard_id);
4272 :
4273 0 : Ok(req)
4274 0 : }
4275 : }
4276 :
4277 : /// Authenticates gRPC page service requests.
4278 : #[derive(Clone)]
4279 : struct TenantAuthInterceptor {
4280 : auth: Option<Arc<SwappableJwtAuth>>,
4281 : }
4282 :
4283 : impl TenantAuthInterceptor {
4284 0 : fn new(auth: Option<Arc<SwappableJwtAuth>>) -> Self {
4285 0 : Self { auth }
4286 0 : }
4287 : }
4288 :
4289 : impl tonic::service::Interceptor for TenantAuthInterceptor {
4290 0 : fn call(&mut self, req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
4291 : // Do nothing if auth is disabled.
4292 0 : let Some(auth) = self.auth.as_ref() else {
4293 0 : return Ok(req);
4294 : };
4295 :
4296 : // Fetch the tenant ID from the request extensions (set by TenantMetadataInterceptor).
4297 0 : let TenantTimelineId { tenant_id, .. } = *extract::<TenantTimelineId>(&req);
4298 :
4299 : // Fetch and decode the JWT token.
4300 0 : let jwt = req
4301 0 : .metadata()
4302 0 : .get("authorization")
4303 0 : .ok_or_else(|| tonic::Status::unauthenticated("no authorization header"))?
4304 0 : .to_str()
4305 0 : .map_err(|_| tonic::Status::invalid_argument("invalid authorization header"))?
4306 0 : .strip_prefix("Bearer ")
4307 0 : .ok_or_else(|| tonic::Status::invalid_argument("invalid authorization header"))?
4308 0 : .trim();
4309 0 : let jwtdata: TokenData<Claims> = auth
4310 0 : .decode(jwt)
4311 0 : .map_err(|err| tonic::Status::invalid_argument(format!("invalid JWT token: {err}")))?;
4312 0 : let claims = jwtdata.claims;
4313 :
4314 : // Check if the token is valid for this tenant.
4315 0 : check_permission(&claims, Some(tenant_id))
4316 0 : .map_err(|err| tonic::Status::permission_denied(err.to_string()))?;
4317 :
4318 : // TODO: consider stashing the claims in the request extensions, if needed.
4319 :
4320 0 : Ok(req)
4321 0 : }
4322 : }
4323 :
4324 : /// Extracts the given type from the request extensions, or panics if it is missing.
4325 0 : fn extract<T: Send + Sync + 'static>(req: &tonic::Request<impl Any>) -> &T {
4326 0 : extract_from(req.extensions())
4327 0 : }
4328 :
4329 : /// Extract the given type from the request extensions, or panics if it is missing. This variant
4330 : /// can extract both from a tonic::Request and http::Request.
4331 0 : fn extract_from<T: Send + Sync + 'static>(ext: &http::Extensions) -> &T {
4332 0 : let Some(value) = ext.get::<T>() else {
4333 0 : let name = std::any::type_name::<T>();
4334 0 : panic!("extension {name} should be set by middleware");
4335 : };
4336 0 : value
4337 0 : }
4338 :
/// Failure to resolve an active timeline: either the tenant could not be obtained as active, or
/// the timeline lookup failed. Both variants display as their inner error (`transparent`).
#[derive(Debug, thiserror::Error)]
pub(crate) enum GetActiveTimelineError {
    /// The tenant could not be obtained as active.
    #[error(transparent)]
    Tenant(GetActiveTenantError),
    /// The timeline lookup failed; convertible from `GetTimelineError` via `?`.
    #[error(transparent)]
    Timeline(#[from] GetTimelineError),
}
4346 :
4347 : impl From<GetActiveTimelineError> for QueryError {
4348 0 : fn from(e: GetActiveTimelineError) -> Self {
4349 0 : match e {
4350 0 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => QueryError::Shutdown,
4351 0 : GetActiveTimelineError::Tenant(e) => e.into(),
4352 0 : GetActiveTimelineError::Timeline(e) => QueryError::NotFound(format!("{e}").into()),
4353 : }
4354 0 : }
4355 : }
4356 :
4357 : impl From<GetActiveTimelineError> for tonic::Status {
4358 0 : fn from(err: GetActiveTimelineError) -> Self {
4359 0 : let message = err.to_string();
4360 0 : let code = match err {
4361 0 : GetActiveTimelineError::Tenant(err) => tonic::Status::from(err).code(),
4362 0 : GetActiveTimelineError::Timeline(err) => tonic::Status::from(err).code(),
4363 : };
4364 0 : tonic::Status::new(code, message)
4365 0 : }
4366 : }
4367 :
4368 : impl From<GetTimelineError> for tonic::Status {
4369 0 : fn from(err: GetTimelineError) -> Self {
4370 : use tonic::Code;
4371 0 : let code = match &err {
4372 0 : GetTimelineError::NotFound { .. } => Code::NotFound,
4373 0 : GetTimelineError::NotActive { .. } => Code::Unavailable,
4374 0 : GetTimelineError::ShuttingDown => Code::Unavailable,
4375 : };
4376 0 : tonic::Status::new(code, err.to_string())
4377 0 : }
4378 : }
4379 :
4380 : impl From<GetActiveTenantError> for QueryError {
4381 0 : fn from(e: GetActiveTenantError) -> Self {
4382 0 : match e {
4383 0 : GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected(
4384 0 : ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
4385 0 : ),
4386 : GetActiveTenantError::Cancelled
4387 : | GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => {
4388 0 : QueryError::Shutdown
4389 : }
4390 0 : e @ GetActiveTenantError::NotFound(_) => QueryError::NotFound(format!("{e}").into()),
4391 0 : e => QueryError::Other(anyhow::anyhow!(e)),
4392 : }
4393 0 : }
4394 : }
4395 :
4396 : impl From<GetActiveTenantError> for tonic::Status {
4397 0 : fn from(err: GetActiveTenantError) -> Self {
4398 : use tonic::Code;
4399 0 : let code = match &err {
4400 0 : GetActiveTenantError::Broken(_) => Code::Internal,
4401 0 : GetActiveTenantError::Cancelled => Code::Unavailable,
4402 0 : GetActiveTenantError::NotFound(_) => Code::NotFound,
4403 0 : GetActiveTenantError::SwitchedTenant => Code::Unavailable,
4404 0 : GetActiveTenantError::WaitForActiveTimeout { .. } => Code::Unavailable,
4405 0 : GetActiveTenantError::WillNotBecomeActive(_) => Code::Unavailable,
4406 : };
4407 0 : tonic::Status::new(code, err.to_string())
4408 0 : }
4409 : }
4410 :
4411 : impl From<HandleUpgradeError> for QueryError {
4412 0 : fn from(e: HandleUpgradeError) -> Self {
4413 0 : match e {
4414 0 : HandleUpgradeError::ShutDown => QueryError::Shutdown,
4415 : }
4416 0 : }
4417 : }
4418 :
4419 : impl From<HandleUpgradeError> for tonic::Status {
4420 0 : fn from(err: HandleUpgradeError) -> Self {
4421 0 : match err {
4422 0 : HandleUpgradeError::ShutDown => tonic::Status::unavailable("timeline is shutting down"),
4423 : }
4424 0 : }
4425 : }
4426 :
4427 0 : fn set_tracing_field_shard_id(timeline: &Timeline) {
4428 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
4429 0 : tracing::Span::current().record(
4430 0 : "shard_id",
4431 0 : tracing::field::display(timeline.tenant_shard_id.shard_slug()),
4432 : );
4433 0 : debug_assert_current_span_has_tenant_and_timeline_id();
4434 0 : }
4435 :
4436 : struct WaitedForLsn(Lsn);
4437 : impl From<WaitedForLsn> for Lsn {
4438 0 : fn from(WaitedForLsn(lsn): WaitedForLsn) -> Self {
4439 0 : lsn
4440 0 : }
4441 : }
4442 :
#[cfg(test)]
mod tests {
    // Unit tests for `PageServiceCmd::parse` and `parse_options`.

    use utils::shard::ShardCount;

    use super::*;

    /// Parses `query` (panicking on a parse error) and asserts the result equals `expected`.
    #[track_caller]
    fn assert_parses_to(query: &str, expected: PageServiceCmd) {
        let parsed = PageServiceCmd::parse(query).unwrap();
        assert_eq!(parsed, expected);
    }

    #[test]
    fn pageservice_cmd_parse() {
        let tenant_id = TenantId::generate();
        let timeline_id = TimelineId::generate();
        let lsn_16abcde = Lsn::from_str("0/16ABCDE").unwrap();
        let lsn_16abcdf = Lsn::from_str("0/16ABCDF").unwrap();
        let unsharded = TenantShardId::unsharded(tenant_id);
        let sharded = TenantShardId::split(&unsharded, ShardCount(8))[1];

        // Builds the expected `basebackup` command for this tenant/timeline.
        let basebackup = |lsn: Option<Lsn>, gzip: bool, replica: bool| {
            PageServiceCmd::BaseBackup(BaseBackupCmd {
                tenant_id,
                timeline_id,
                lsn,
                gzip,
                replica,
            })
        };

        let cases = [
            (
                format!("pagestream_v2 {tenant_id} {timeline_id}"),
                PageServiceCmd::PageStream(PageStreamCmd {
                    tenant_id,
                    timeline_id,
                    protocol_version: PagestreamProtocolVersion::V2,
                }),
            ),
            (
                format!("basebackup {tenant_id} {timeline_id}"),
                basebackup(None, false, false),
            ),
            (
                format!("basebackup {tenant_id} {timeline_id} --gzip"),
                basebackup(None, true, false),
            ),
            (
                format!("basebackup {tenant_id} {timeline_id} latest"),
                basebackup(None, false, false),
            ),
            (
                format!("basebackup {tenant_id} {timeline_id} 0/16ABCDE"),
                basebackup(Some(lsn_16abcde), false, false),
            ),
            (
                format!("basebackup {tenant_id} {timeline_id} --replica --gzip"),
                basebackup(None, true, true),
            ),
            (
                format!("basebackup {tenant_id} {timeline_id} 0/16ABCDE --replica --gzip"),
                basebackup(Some(lsn_16abcde), true, true),
            ),
            (
                format!("fullbackup {tenant_id} {timeline_id}"),
                PageServiceCmd::FullBackup(FullBackupCmd {
                    tenant_id,
                    timeline_id,
                    lsn: None,
                    prev_lsn: None,
                }),
            ),
            (
                format!("fullbackup {tenant_id} {timeline_id} 0/16ABCDE 0/16ABCDF"),
                PageServiceCmd::FullBackup(FullBackupCmd {
                    tenant_id,
                    timeline_id,
                    lsn: Some(lsn_16abcde),
                    prev_lsn: Some(lsn_16abcdf),
                }),
            ),
            (
                format!("lease lsn {unsharded} {timeline_id} 0/16ABCDE"),
                PageServiceCmd::LeaseLsn(LeaseLsnCmd {
                    tenant_shard_id: unsharded,
                    timeline_id,
                    lsn: lsn_16abcde,
                }),
            ),
            (
                format!("lease lsn {sharded} {timeline_id} 0/16ABCDE"),
                PageServiceCmd::LeaseLsn(LeaseLsnCmd {
                    tenant_shard_id: sharded,
                    timeline_id,
                    lsn: lsn_16abcde,
                }),
            ),
            ("set a = b".to_string(), PageServiceCmd::Set),
            ("SET foo".to_string(), PageServiceCmd::Set),
        ];

        for (query, expected) in cases {
            assert_parses_to(&query, expected);
        }
    }

    #[test]
    fn pageservice_cmd_err_handling() {
        let tenant_id = TenantId::generate();
        let timeline_id = TimelineId::generate();

        let bad_queries = [
            "unknown_command".to_string(),
            "pagestream_v2".to_string(),
            format!("pagestream_v2 {tenant_id}xxx"),
            format!("pagestream_v2 {tenant_id}xxx {timeline_id}xxx"),
            format!("basebackup {tenant_id} {timeline_id} --gzip --gzip"),
            format!("basebackup {tenant_id} {timeline_id} --gzip --unknown"),
            format!("basebackup {tenant_id} {timeline_id} --gzip 0/16ABCDE"),
            format!("lease {tenant_id} {timeline_id} gzip 0/16ABCDE"),
        ];

        for query in &bad_queries {
            assert!(
                PageServiceCmd::parse(query).is_err(),
                "expected a parse error for {query:?}"
            );
        }
    }

    #[test]
    fn test_parse_options() {
        let kv = |key: &str, value: &str| (key.to_string(), value.to_string());

        // Inputs that must parse cleanly into the listed key/value pairs.
        let well_formed = [
            (
                " -c neon.compute_mode=primary ",
                vec![kv("neon.compute_mode", "primary")],
            ),
            (
                " -c neon.compute_mode=primary -c foo=bar ",
                vec![kv("neon.compute_mode", "primary"), kv("foo", "bar")],
            ),
            (
                " -c neon.compute_mode=primary -cfoo=bar",
                vec![kv("neon.compute_mode", "primary"), kv("foo", "bar")],
            ),
        ];
        for (input, expected) in well_formed {
            let (config, has_error) = parse_options(input);
            assert!(!has_error, "unexpected error for {input:?}");
            assert_eq!(config, expected, "config mismatch for {input:?}");
        }

        // Inputs checked only for whether an error is flagged.
        let error_expectations = [
            ("-c", true),
            ("-c foo=bar -c -c", true),
            (" ", false),
            (" -c neon.compute_mode", true),
        ];
        for (input, expect_error) in error_expectations {
            let (_, has_error) = parse_options(input);
            assert_eq!(has_error, expect_error, "error flag mismatch for {input:?}");
        }
    }
}
|