Line data Source code
1 : //! The Page Service listens for client connections and serves their GetPage@LSN
2 : //! requests.
3 :
4 : use std::borrow::Cow;
5 : use std::num::NonZeroUsize;
6 : use std::os::fd::AsRawFd;
7 : use std::pin::Pin;
8 : use std::str::FromStr;
9 : use std::sync::Arc;
10 : use std::time::{Duration, Instant, SystemTime};
11 : use std::{io, str};
12 :
13 : use anyhow::{Context, bail};
14 : use async_compression::tokio::write::GzipEncoder;
15 : use bytes::Buf;
16 : use futures::{FutureExt, Stream};
17 : use itertools::Itertools;
18 : use jsonwebtoken::TokenData;
19 : use once_cell::sync::OnceCell;
20 : use pageserver_api::config::{
21 : GetVectoredConcurrentIo, PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
22 : PageServiceProtocolPipelinedBatchingStrategy, PageServiceProtocolPipelinedExecutionStrategy,
23 : };
24 : use pageserver_api::key::rel_block_to_key;
25 : use pageserver_api::models::{
26 : self, PageTraceEvent, PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
27 : PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
28 : PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetSlruSegmentRequest,
29 : PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest, PagestreamNblocksResponse,
30 : PagestreamProtocolVersion, PagestreamRequest, TenantState,
31 : };
32 : use pageserver_api::reltag::SlruKind;
33 : use pageserver_api::shard::TenantShardId;
34 : use pageserver_page_api::proto;
35 : use postgres_backend::{
36 : AuthType, PostgresBackend, PostgresBackendReader, QueryError, is_expected_io_error,
37 : };
38 : use postgres_ffi::BLCKSZ;
39 : use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
40 : use pq_proto::framed::ConnectionError;
41 : use pq_proto::{BeMessage, FeMessage, FeStartupPacket, RowDescriptor};
42 : use strum_macros::IntoStaticStr;
43 : use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufWriter};
44 : use tokio::task::JoinHandle;
45 : use tokio_util::sync::CancellationToken;
46 : use tonic::service::Interceptor as _;
47 : use tracing::*;
48 : use utils::auth::{Claims, Scope, SwappableJwtAuth};
49 : use utils::failpoint_support;
50 : use utils::id::{TenantId, TenantTimelineId, TimelineId};
51 : use utils::logging::log_slow;
52 : use utils::lsn::Lsn;
53 : use utils::shard::ShardIndex;
54 : use utils::simple_rcu::RcuReadGuard;
55 : use utils::sync::gate::{Gate, GateGuard};
56 : use utils::sync::spsc_fold;
57 :
58 : use crate::auth::check_permission;
59 : use crate::basebackup::{self, BasebackupError};
60 : use crate::basebackup_cache::BasebackupCache;
61 : use crate::config::PageServerConf;
62 : use crate::context::{
63 : DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
64 : };
65 : use crate::metrics::{
66 : self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, GetPageBatchBreakReason, LIVE_CONNECTIONS,
67 : SmgrOpTimer, TimelineMetrics,
68 : };
69 : use crate::pgdatadir_mapping::{LsnRange, Version};
70 : use crate::span::{
71 : debug_assert_current_span_has_tenant_and_timeline_id,
72 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
73 : };
74 : use crate::task_mgr::{self, COMPUTE_REQUEST_RUNTIME, TaskKind};
75 : use crate::tenant::mgr::{
76 : GetActiveTenantError, GetTenantError, ShardResolveResult, ShardSelector, TenantManager,
77 : };
78 : use crate::tenant::storage_layer::IoConcurrency;
79 : use crate::tenant::timeline::{self, WaitLsnError};
80 : use crate::tenant::{GetTimelineError, PageReconstructError, Timeline};
81 : use crate::{CancellableTask, PERF_TRACE_TARGET, timed_after_cancellation};
82 :
83 : /// How long we may wait for a [`crate::tenant::mgr::TenantSlot::InProgress`] and/or a [`crate::tenant::TenantShard`] which
84 : /// is not yet in state [`TenantState::Active`].
85 : ///
86 : /// NB: this is a different value than [`crate::http::routes::ACTIVE_TENANT_TIMEOUT`].
87 : const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
88 :
89 : /// Threshold at which to log slow GetPage requests.
90 : const LOG_SLOW_GETPAGE_THRESHOLD: Duration = Duration::from_secs(30);
91 :
92 : /// The idle time before sending TCP keepalive probes for gRPC connections. The
93 : /// interval and timeout between each probe are configured via sysctl. This
94 : /// allows detecting dead connections sooner.
95 : const GRPC_TCP_KEEPALIVE_TIME: Duration = Duration::from_secs(60);
96 :
97 : /// Whether to enable TCP nodelay for gRPC connections. This disables Nagle's
98 : /// algorithm, which can cause latency spikes for small messages.
99 : const GRPC_TCP_NODELAY: bool = true;
100 :
101 : /// The interval between HTTP2 keepalive pings. This allows shutting down server
102 : /// tasks when clients are unresponsive.
103 : const GRPC_HTTP2_KEEPALIVE_INTERVAL: Duration = Duration::from_secs(30);
104 :
105 : /// The timeout for HTTP2 keepalive pings. Should be <= GRPC_HTTP2_KEEPALIVE_INTERVAL.
106 : const GRPC_HTTP2_KEEPALIVE_TIMEOUT: Duration = Duration::from_secs(20);
107 :
108 : /// Number of concurrent gRPC streams per TCP connection. We expect something
109 : /// like 8 GetPage streams per connection, plus any unary requests.
110 : const GRPC_MAX_CONCURRENT_STREAMS: u32 = 256;
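// Back-of-the-envelope example of how the values above interact (illustrative, not a
// guarantee): with a 30s HTTP/2 keepalive interval and a 20s ping timeout, an
// unresponsive client is detected at the HTTP/2 layer roughly 50s after its last
// acknowledged ping. The kernel TCP keepalive path only starts probing after 60s of
// idleness and then follows the sysctl-configured probe interval and count, so it
// serves as a slower, lower-level backstop.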
111 :
112 : ///////////////////////////////////////////////////////////////////////////////
113 :
114 : pub struct Listener {
115 : cancel: CancellationToken,
116 : /// Cancel the listener task through `cancel` to shut down the listener
117 : /// and get a handle on the existing connections.
118 : task: JoinHandle<Connections>,
119 : }
120 :
121 : pub struct Connections {
122 : cancel: CancellationToken,
123 : tasks: tokio::task::JoinSet<ConnectionHandlerResult>,
124 : gate: Gate,
125 : }
126 :
127 0 : pub fn spawn(
128 0 : conf: &'static PageServerConf,
129 0 : tenant_manager: Arc<TenantManager>,
130 0 : pg_auth: Option<Arc<SwappableJwtAuth>>,
131 0 : perf_trace_dispatch: Option<Dispatch>,
132 0 : tcp_listener: tokio::net::TcpListener,
133 0 : tls_config: Option<Arc<rustls::ServerConfig>>,
134 0 : basebackup_cache: Arc<BasebackupCache>,
135 0 : ) -> Listener {
136 0 : let cancel = CancellationToken::new();
137 0 : let libpq_ctx = RequestContext::todo_child(
138 0 : TaskKind::LibpqEndpointListener,
139 0 : // listener task shouldn't need to download anything. (We will
140 0 : // create separate sub-contexts for each connection, with their
141 0 : // own download behavior. This context is used only to listen and
142 0 : // accept connections.)
143 0 : DownloadBehavior::Error,
144 0 : );
145 0 : let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
146 0 : "libpq listener",
147 0 : libpq_listener_main(
148 0 : conf,
149 0 : tenant_manager,
150 0 : pg_auth,
151 0 : perf_trace_dispatch,
152 0 : tcp_listener,
153 0 : conf.pg_auth_type,
154 0 : tls_config,
155 0 : conf.page_service_pipelining.clone(),
156 0 : basebackup_cache,
157 0 : libpq_ctx,
158 0 : cancel.clone(),
159 0 : )
160 0 : .map(anyhow::Ok),
161 0 : ));
162 0 :
163 0 : Listener { cancel, task }
164 0 : }
165 :
166 : /// Spawns a gRPC server for the page service.
167 : ///
168 : /// TODO: this doesn't support TLS. We need TLS reloading via ReloadingCertificateResolver, so we
169 : /// need to reimplement the TCP+TLS accept loop ourselves.
170 0 : pub fn spawn_grpc(
171 0 : conf: &'static PageServerConf,
172 0 : tenant_manager: Arc<TenantManager>,
173 0 : auth: Option<Arc<SwappableJwtAuth>>,
174 0 : perf_trace_dispatch: Option<Dispatch>,
175 0 : listener: std::net::TcpListener,
176 0 : basebackup_cache: Arc<BasebackupCache>,
177 0 : ) -> anyhow::Result<CancellableTask> {
178 0 : let cancel = CancellationToken::new();
179 0 : let ctx = RequestContextBuilder::new(TaskKind::PageRequestHandler)
180 0 : .download_behavior(DownloadBehavior::Download)
181 0 : .perf_span_dispatch(perf_trace_dispatch)
182 0 : .detached_child();
183 0 : let gate = Gate::default();
184 :
185 : // Set up the TCP socket. We take a preconfigured TcpListener to bind the
186 : // port early during startup.
187 0 : let incoming = {
188 0 : let _runtime = COMPUTE_REQUEST_RUNTIME.enter(); // required by TcpListener::from_std
189 0 : listener.set_nonblocking(true)?;
190 0 : tonic::transport::server::TcpIncoming::from(tokio::net::TcpListener::from_std(listener)?)
191 0 : .with_nodelay(Some(GRPC_TCP_NODELAY))
192 0 : .with_keepalive(Some(GRPC_TCP_KEEPALIVE_TIME))
193 0 : };
194 0 :
195 0 : // Set up the gRPC server.
196 0 : //
197 0 : // TODO: consider tuning window sizes.
198 0 : // TODO: wire up tracing.
199 0 : let mut server = tonic::transport::Server::builder()
200 0 : .http2_keepalive_interval(Some(GRPC_HTTP2_KEEPALIVE_INTERVAL))
201 0 : .http2_keepalive_timeout(Some(GRPC_HTTP2_KEEPALIVE_TIMEOUT))
202 0 : .max_concurrent_streams(Some(GRPC_MAX_CONCURRENT_STREAMS));
203 0 :
204 0 : // Main page service.
205 0 : let page_service_handler = PageServerHandler::new(
206 0 : tenant_manager,
207 0 : auth.clone(),
208 0 : PageServicePipeliningConfig::Serial, // TODO: unused with gRPC
209 0 : conf.get_vectored_concurrent_io,
210 0 : ConnectionPerfSpanFields::default(),
211 0 : basebackup_cache,
212 0 : ctx,
213 0 : cancel.clone(),
214 0 : gate.enter().expect("just created"),
215 0 : );
216 0 :
217 0 : let mut tenant_interceptor = TenantMetadataInterceptor;
218 0 : let mut auth_interceptor = TenantAuthInterceptor::new(auth);
219 0 : let interceptors = move |mut req: tonic::Request<()>| {
220 0 : req = tenant_interceptor.call(req)?;
221 0 : req = auth_interceptor.call(req)?;
222 0 : Ok(req)
223 0 : };
224 :
225 0 : let page_service =
226 0 : proto::PageServiceServer::with_interceptor(page_service_handler, interceptors);
227 0 : let server = server.add_service(page_service);
228 :
229 : // Reflection service for use with e.g. grpcurl.
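// (For example, `grpcurl -plaintext <pageserver-addr> list` should enumerate the
// registered services; hypothetical invocation, assuming an unencrypted listener.)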
230 0 : let reflection_service = tonic_reflection::server::Builder::configure()
231 0 : .register_encoded_file_descriptor_set(proto::FILE_DESCRIPTOR_SET)
232 0 : .build_v1()?;
233 0 : let server = server.add_service(reflection_service);
234 0 :
235 0 : // Spawn server task.
236 0 : let task_cancel = cancel.clone();
237 0 : let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
238 0 : "grpc listener",
239 0 : async move {
240 0 : let result = server
241 0 : .serve_with_incoming_shutdown(incoming, task_cancel.cancelled())
242 0 : .await;
243 0 : if result.is_ok() {
244 : // TODO: revisit shutdown logic once page service is implemented.
245 0 : gate.close().await;
246 0 : }
247 0 : result
248 0 : },
249 0 : ));
250 0 :
251 0 : Ok(CancellableTask { task, cancel })
252 0 : }
253 :
254 : impl Listener {
255 0 : pub async fn stop_accepting(self) -> Connections {
256 0 : self.cancel.cancel();
257 0 : self.task
258 0 : .await
259 0 : .expect("unreachable: we wrap the listener task in task_mgr::exit_on_panic_or_error")
260 0 : }
261 : }
262 : impl Connections {
263 0 : pub(crate) async fn shutdown(self) {
264 0 : let Self {
265 0 : cancel,
266 0 : mut tasks,
267 0 : gate,
268 0 : } = self;
269 0 : cancel.cancel();
270 0 : while let Some(res) = tasks.join_next().await {
271 0 : Self::handle_connection_completion(res);
272 0 : }
273 0 : gate.close().await;
274 0 : }
275 :
276 0 : fn handle_connection_completion(res: Result<anyhow::Result<()>, tokio::task::JoinError>) {
277 0 : match res {
278 0 : Ok(Ok(())) => {}
279 0 : Ok(Err(e)) => error!("error in page_service connection task: {:?}", e),
280 0 : Err(e) => error!("page_service connection task panicked: {:?}", e),
281 : }
282 0 : }
283 : }
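// A minimal sketch of how `Listener` and `Connections` are intended to be driven at
// shutdown (illustrative only, not a real call site):
//
//     let listener = page_service::spawn(conf, tenant_manager, pg_auth, /* ... */);
//     // ... later, during pageserver shutdown ...
//     let connections = listener.stop_accepting().await; // stop accepting new clients
//     connections.shutdown().await;                       // cancel and drain existing handlers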
284 :
285 : ///
286 : /// Main loop of the page service.
287 : ///
288 : /// Listens for connections, and launches a new handler task for each.
289 : ///
290 : /// Returns upon cancellation via `listener_cancel`, yielding the set of
291 : /// open connections.
292 : ///
293 : #[allow(clippy::too_many_arguments)]
294 0 : pub async fn libpq_listener_main(
295 0 : conf: &'static PageServerConf,
296 0 : tenant_manager: Arc<TenantManager>,
297 0 : auth: Option<Arc<SwappableJwtAuth>>,
298 0 : perf_trace_dispatch: Option<Dispatch>,
299 0 : listener: tokio::net::TcpListener,
300 0 : auth_type: AuthType,
301 0 : tls_config: Option<Arc<rustls::ServerConfig>>,
302 0 : pipelining_config: PageServicePipeliningConfig,
303 0 : basebackup_cache: Arc<BasebackupCache>,
304 0 : listener_ctx: RequestContext,
305 0 : listener_cancel: CancellationToken,
306 0 : ) -> Connections {
307 0 : let connections_cancel = CancellationToken::new();
308 0 : let connections_gate = Gate::default();
309 0 : let mut connection_handler_tasks = tokio::task::JoinSet::default();
310 :
311 : loop {
312 0 : let gate_guard = match connections_gate.enter() {
313 0 : Ok(guard) => guard,
314 0 : Err(_) => break,
315 : };
316 :
317 0 : let accepted = tokio::select! {
318 : biased;
319 0 : _ = listener_cancel.cancelled() => break,
320 0 : next = connection_handler_tasks.join_next(), if !connection_handler_tasks.is_empty() => {
321 0 : let res = next.expect("we dont poll while empty");
322 0 : Connections::handle_connection_completion(res);
323 0 : continue;
324 : }
325 0 : accepted = listener.accept() => accepted,
326 0 : };
327 0 :
328 0 : match accepted {
329 0 : Ok((socket, peer_addr)) => {
330 0 : // Connection established. Spawn a new task to handle it.
331 0 : debug!("accepted connection from {}", peer_addr);
332 0 : let local_auth = auth.clone();
333 0 : let connection_ctx = RequestContextBuilder::from(&listener_ctx)
334 0 : .task_kind(TaskKind::PageRequestHandler)
335 0 : .download_behavior(DownloadBehavior::Download)
336 0 : .perf_span_dispatch(perf_trace_dispatch.clone())
337 0 : .detached_child();
338 0 :
339 0 : connection_handler_tasks.spawn(page_service_conn_main(
340 0 : conf,
341 0 : tenant_manager.clone(),
342 0 : local_auth,
343 0 : socket,
344 0 : auth_type,
345 0 : tls_config.clone(),
346 0 : pipelining_config.clone(),
347 0 : Arc::clone(&basebackup_cache),
348 0 : connection_ctx,
349 0 : connections_cancel.child_token(),
350 0 : gate_guard,
351 0 : ));
352 : }
353 0 : Err(err) => {
354 0 : // accept() failed. Log the error, and loop back to retry on next connection.
355 0 : error!("accept() failed: {:?}", err);
356 : }
357 : }
358 : }
359 :
360 0 : debug!("page_service listener loop terminated");
361 :
362 0 : Connections {
363 0 : cancel: connections_cancel,
364 0 : tasks: connection_handler_tasks,
365 0 : gate: connections_gate,
366 0 : }
367 0 : }
368 :
369 : type ConnectionHandlerResult = anyhow::Result<()>;
370 :
371 : /// Perf root spans start at the per-request level, after shard routing.
372 : /// This struct carries connection-level information to the root perf span definition.
373 : #[derive(Clone, Default)]
374 : struct ConnectionPerfSpanFields {
375 : peer_addr: String,
376 : application_name: Option<String>,
377 : compute_mode: Option<String>,
378 : }
379 :
380 : #[instrument(skip_all, fields(peer_addr, application_name, compute_mode))]
381 : #[allow(clippy::too_many_arguments)]
382 : async fn page_service_conn_main(
383 : conf: &'static PageServerConf,
384 : tenant_manager: Arc<TenantManager>,
385 : auth: Option<Arc<SwappableJwtAuth>>,
386 : socket: tokio::net::TcpStream,
387 : auth_type: AuthType,
388 : tls_config: Option<Arc<rustls::ServerConfig>>,
389 : pipelining_config: PageServicePipeliningConfig,
390 : basebackup_cache: Arc<BasebackupCache>,
391 : connection_ctx: RequestContext,
392 : cancel: CancellationToken,
393 : gate_guard: GateGuard,
394 : ) -> ConnectionHandlerResult {
395 : let _guard = LIVE_CONNECTIONS
396 : .with_label_values(&["page_service"])
397 : .guard();
398 :
399 : socket
400 : .set_nodelay(true)
401 : .context("could not set TCP_NODELAY")?;
402 :
403 : let socket_fd = socket.as_raw_fd();
404 :
405 : let peer_addr = socket.peer_addr().context("get peer address")?;
406 :
407 : let perf_span_fields = ConnectionPerfSpanFields {
408 : peer_addr: peer_addr.to_string(),
409 : application_name: None, // filled in later
410 : compute_mode: None, // filled in later
411 : };
412 : tracing::Span::current().record("peer_addr", field::display(peer_addr));
413 :
414 : // setup read timeout of 10 minutes. the timeout is rather arbitrary for requirements:
415 : // - long enough for most valid compute connections
416 : // - less than infinite to stop us from "leaking" connections to long-gone computes
417 : //
418 : // no write timeout is used, because the kernel is assumed to error writes after some time.
419 : let mut socket = tokio_io_timeout::TimeoutReader::new(socket);
420 :
421 : let default_timeout_ms = 10 * 60 * 1000; // 10 minutes by default
422 0 : let socket_timeout_ms = (|| {
423 0 : fail::fail_point!("simulated-bad-compute-connection", |avg_timeout_ms| {
424 : // Exponential distribution for simulating
425 : // poor network conditions; expect avg_timeout_ms to be around 15
426 : // in tests
427 0 : if let Some(avg_timeout_ms) = avg_timeout_ms {
428 0 : let avg = avg_timeout_ms.parse::<i64>().unwrap() as f32;
429 0 : let u = rand::random::<f32>();
430 0 : ((1.0 - u).ln() / (-avg)) as u64
431 : } else {
432 0 : default_timeout_ms
433 : }
434 0 : });
435 0 : default_timeout_ms
436 : })();
437 :
438 : // A timeout here does not mean the client died; it can happen if it's just idle for
439 : // a while: we will tear down this PageServerHandler and instantiate a new one if/when
440 : // they reconnect.
441 : socket.set_timeout(Some(std::time::Duration::from_millis(socket_timeout_ms)));
442 : let socket = Box::pin(socket);
443 :
444 : fail::fail_point!("ps::connection-start::pre-login");
445 :
446 : // XXX: pgbackend.run() should take the connection_ctx,
447 : // and create a child per-query context when it invokes process_query.
448 : // But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
449 : // and create the per-query context in process_query ourselves.
450 : let mut conn_handler = PageServerHandler::new(
451 : tenant_manager,
452 : auth,
453 : pipelining_config,
454 : conf.get_vectored_concurrent_io,
455 : perf_span_fields,
456 : basebackup_cache,
457 : connection_ctx,
458 : cancel.clone(),
459 : gate_guard,
460 : );
461 : let pgbackend =
462 : PostgresBackend::new_from_io(socket_fd, socket, peer_addr, auth_type, tls_config)?;
463 :
464 : match pgbackend.run(&mut conn_handler, &cancel).await {
465 : Ok(()) => {
466 : // we've been requested to shut down
467 : Ok(())
468 : }
469 : Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
470 : if is_expected_io_error(&io_error) {
471 : info!("Postgres client disconnected ({io_error})");
472 : Ok(())
473 : } else {
474 : let tenant_id = conn_handler.timeline_handles.as_ref().unwrap().tenant_id();
475 : Err(io_error).context(format!(
476 : "Postgres connection error for tenant_id={:?} client at peer_addr={}",
477 : tenant_id, peer_addr
478 : ))
479 : }
480 : }
481 : other => {
482 : let tenant_id = conn_handler.timeline_handles.as_ref().unwrap().tenant_id();
483 : other.context(format!(
484 : "Postgres query error for tenant_id={:?} client peer_addr={}",
485 : tenant_id, peer_addr
486 : ))
487 : }
488 : }
489 : }
490 :
491 : /// Page service connection handler.
492 : ///
493 : /// TODO: for gRPC, this will be shared by all requests from all connections.
494 : /// Decompose it into global state and per-connection/request state, and make
495 : /// libpq-specific options (e.g. pipelining) separate.
496 : struct PageServerHandler {
497 : auth: Option<Arc<SwappableJwtAuth>>,
498 : claims: Option<Claims>,
499 :
500 : /// The context created for the lifetime of the connection
501 : /// serviced by this PageServerHandler.
502 : /// For each query received over the connection,
503 : /// `process_query` creates a child context from this one.
504 : connection_ctx: RequestContext,
505 :
506 : perf_span_fields: ConnectionPerfSpanFields,
507 :
508 : cancel: CancellationToken,
509 :
510 : /// None only while pagestream protocol is being processed.
511 : timeline_handles: Option<TimelineHandles>,
512 :
513 : pipelining_config: PageServicePipeliningConfig,
514 : get_vectored_concurrent_io: GetVectoredConcurrentIo,
515 :
516 : basebackup_cache: Arc<BasebackupCache>,
517 :
518 : gate_guard: GateGuard,
519 : }
520 :
521 : struct TimelineHandles {
522 : wrapper: TenantManagerWrapper,
523 : /// Note on size: the typical size of this map is 1. The largest size we expect
524 : /// to see is the number of shards divided by the number of pageservers (typically < 2),
525 : /// or the ratio used when splitting shards (i.e. how many children are created from one
526 : /// parent shard), where a "large" number might be ~8.
527 : handles: timeline::handle::Cache<TenantManagerTypes>,
528 : }
529 :
530 : impl TimelineHandles {
531 0 : fn new(tenant_manager: Arc<TenantManager>) -> Self {
532 0 : Self {
533 0 : wrapper: TenantManagerWrapper {
534 0 : tenant_manager,
535 0 : tenant_id: OnceCell::new(),
536 0 : },
537 0 : handles: Default::default(),
538 0 : }
539 0 : }
540 0 : async fn get(
541 0 : &mut self,
542 0 : tenant_id: TenantId,
543 0 : timeline_id: TimelineId,
544 0 : shard_selector: ShardSelector,
545 0 : ) -> Result<timeline::handle::Handle<TenantManagerTypes>, GetActiveTimelineError> {
546 0 : if *self.wrapper.tenant_id.get_or_init(|| tenant_id) != tenant_id {
547 0 : return Err(GetActiveTimelineError::Tenant(
548 0 : GetActiveTenantError::SwitchedTenant,
549 0 : ));
550 0 : }
551 0 : self.handles
552 0 : .get(timeline_id, shard_selector, &self.wrapper)
553 0 : .await
554 0 : .map_err(|e| match e {
555 0 : timeline::handle::GetError::TenantManager(e) => e,
556 : timeline::handle::GetError::PerTimelineStateShutDown => {
557 0 : trace!("per-timeline state shut down");
558 0 : GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown)
559 : }
560 0 : })
561 0 : }
562 :
563 0 : fn tenant_id(&self) -> Option<TenantId> {
564 0 : self.wrapper.tenant_id.get().copied()
565 0 : }
566 : }
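// Illustrative usage of the cache above (a sketch; see `pagestream_read_message` below
// for the real call sites), assuming a request that carries `tenant_id`, `timeline_id`,
// and a page `key`:
//
//     let shard = timeline_handles
//         .get(tenant_id, timeline_id, ShardSelector::Page(key))
//         .await?;                      // typically a cheap cache hit after the first resolve
//     let weak = shard.downgrade();     // only the weak handle is kept while a batch is built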
567 :
568 : pub(crate) struct TenantManagerWrapper {
569 : tenant_manager: Arc<TenantManager>,
570 : // We do not support switching tenant_id on a connection at this point.
571 : // We can add support for this later if needed without changing
572 : // the protocol.
573 : tenant_id: once_cell::sync::OnceCell<TenantId>,
574 : }
575 :
576 : #[derive(Debug)]
577 : pub(crate) struct TenantManagerTypes;
578 :
579 : impl timeline::handle::Types for TenantManagerTypes {
580 : type TenantManagerError = GetActiveTimelineError;
581 : type TenantManager = TenantManagerWrapper;
582 : type Timeline = TenantManagerCacheItem;
583 : }
584 :
585 : pub(crate) struct TenantManagerCacheItem {
586 : pub(crate) timeline: Arc<Timeline>,
587 : // allow() for cheap propagation through RequestContext inside a task
588 : #[allow(clippy::redundant_allocation)]
589 : pub(crate) metrics: Arc<Arc<TimelineMetrics>>,
590 : #[allow(dead_code)] // we store it to keep the gate open
591 : pub(crate) gate_guard: GateGuard,
592 : }
593 :
594 : impl std::ops::Deref for TenantManagerCacheItem {
595 : type Target = Arc<Timeline>;
596 0 : fn deref(&self) -> &Self::Target {
597 0 : &self.timeline
598 0 : }
599 : }
600 :
601 : impl timeline::handle::Timeline<TenantManagerTypes> for TenantManagerCacheItem {
602 0 : fn shard_timeline_id(&self) -> timeline::handle::ShardTimelineId {
603 0 : Timeline::shard_timeline_id(&self.timeline)
604 0 : }
605 :
606 0 : fn per_timeline_state(&self) -> &timeline::handle::PerTimelineState<TenantManagerTypes> {
607 0 : &self.timeline.handles
608 0 : }
609 :
610 0 : fn get_shard_identity(&self) -> &pageserver_api::shard::ShardIdentity {
611 0 : Timeline::get_shard_identity(&self.timeline)
612 0 : }
613 : }
614 :
615 : impl timeline::handle::TenantManager<TenantManagerTypes> for TenantManagerWrapper {
616 0 : async fn resolve(
617 0 : &self,
618 0 : timeline_id: TimelineId,
619 0 : shard_selector: ShardSelector,
620 0 : ) -> Result<TenantManagerCacheItem, GetActiveTimelineError> {
621 0 : let tenant_id = self.tenant_id.get().expect("we set this in get()");
622 0 : let timeout = ACTIVE_TENANT_TIMEOUT;
623 0 : let wait_start = Instant::now();
624 0 : let deadline = wait_start + timeout;
625 0 : let tenant_shard = loop {
626 0 : let resolved = self
627 0 : .tenant_manager
628 0 : .resolve_attached_shard(tenant_id, shard_selector);
629 0 : match resolved {
630 0 : ShardResolveResult::Found(tenant_shard) => break tenant_shard,
631 : ShardResolveResult::NotFound => {
632 0 : return Err(GetActiveTimelineError::Tenant(
633 0 : GetActiveTenantError::NotFound(GetTenantError::NotFound(*tenant_id)),
634 0 : ));
635 : }
636 0 : ShardResolveResult::InProgress(barrier) => {
637 0 : // We can't authoritatively answer right now: wait for InProgress state
638 0 : // to end, then try again
639 0 : tokio::select! {
640 0 : _ = barrier.wait() => {
641 0 : // The barrier completed: proceed around the loop to try looking up again
642 0 : },
643 0 : _ = tokio::time::sleep(deadline.duration_since(Instant::now())) => {
644 0 : return Err(GetActiveTimelineError::Tenant(GetActiveTenantError::WaitForActiveTimeout {
645 0 : latest_state: None,
646 0 : wait_time: timeout,
647 0 : }));
648 : }
649 : }
650 : }
651 : };
652 : };
653 :
654 0 : tracing::debug!("Waiting for tenant to enter active state...");
655 0 : tenant_shard
656 0 : .wait_to_become_active(deadline.duration_since(Instant::now()))
657 0 : .await
658 0 : .map_err(GetActiveTimelineError::Tenant)?;
659 :
660 0 : let timeline = tenant_shard
661 0 : .get_timeline(timeline_id, true)
662 0 : .map_err(GetActiveTimelineError::Timeline)?;
663 :
664 0 : let gate_guard = match timeline.gate.enter() {
665 0 : Ok(guard) => guard,
666 : Err(_) => {
667 0 : return Err(GetActiveTimelineError::Timeline(
668 0 : GetTimelineError::ShuttingDown,
669 0 : ));
670 : }
671 : };
672 :
673 0 : let metrics = Arc::new(Arc::clone(&timeline.metrics));
674 0 :
675 0 : Ok(TenantManagerCacheItem {
676 0 : timeline,
677 0 : metrics,
678 0 : gate_guard,
679 0 : })
680 0 : }
681 : }
682 :
683 : #[derive(thiserror::Error, Debug)]
684 : enum PageStreamError {
685 : /// We encountered an error that should prompt the client to reconnect:
686 : /// in practice this means we drop the connection without sending a response.
687 : #[error("Reconnect required: {0}")]
688 : Reconnect(Cow<'static, str>),
689 :
690 : /// We were instructed to shutdown while processing the query
691 : #[error("Shutting down")]
692 : Shutdown,
693 :
694 : /// Something went wrong reading a page: this likely indicates a pageserver bug
695 : #[error("Read error")]
696 : Read(#[source] PageReconstructError),
697 :
698 : /// Ran out of time waiting for an LSN
699 : #[error("LSN timeout: {0}")]
700 : LsnTimeout(WaitLsnError),
701 :
702 : /// The entity required to serve the request (tenant or timeline) is not found,
703 : /// or is not found in a suitable state to serve a request.
704 : #[error("Not found: {0}")]
705 : NotFound(Cow<'static, str>),
706 :
707 : /// Request asked for something that doesn't make sense, like an invalid LSN
708 : #[error("Bad request: {0}")]
709 : BadRequest(Cow<'static, str>),
710 : }
711 :
712 : impl From<PageReconstructError> for PageStreamError {
713 0 : fn from(value: PageReconstructError) -> Self {
714 0 : match value {
715 0 : PageReconstructError::Cancelled => Self::Shutdown,
716 0 : e => Self::Read(e),
717 : }
718 0 : }
719 : }
720 :
721 : impl From<GetActiveTimelineError> for PageStreamError {
722 0 : fn from(value: GetActiveTimelineError) -> Self {
723 0 : match value {
724 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled)
725 : | GetActiveTimelineError::Tenant(GetActiveTenantError::WillNotBecomeActive(
726 : TenantState::Stopping { .. },
727 : ))
728 0 : | GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown) => Self::Shutdown,
729 0 : GetActiveTimelineError::Tenant(e) => Self::NotFound(format!("{e}").into()),
730 0 : GetActiveTimelineError::Timeline(e) => Self::NotFound(format!("{e}").into()),
731 : }
732 0 : }
733 : }
734 :
735 : impl From<WaitLsnError> for PageStreamError {
736 0 : fn from(value: WaitLsnError) -> Self {
737 0 : match value {
738 0 : e @ WaitLsnError::Timeout(_) => Self::LsnTimeout(e),
739 0 : WaitLsnError::Shutdown => Self::Shutdown,
740 0 : e @ WaitLsnError::BadState { .. } => Self::Reconnect(format!("{e}").into()),
741 : }
742 0 : }
743 : }
744 :
745 : impl From<WaitLsnError> for QueryError {
746 0 : fn from(value: WaitLsnError) -> Self {
747 0 : match value {
748 0 : e @ WaitLsnError::Timeout(_) => Self::Other(anyhow::Error::new(e)),
749 0 : WaitLsnError::Shutdown => Self::Shutdown,
750 0 : WaitLsnError::BadState { .. } => Self::Reconnect,
751 : }
752 0 : }
753 : }
754 :
755 : #[derive(thiserror::Error, Debug)]
756 : struct BatchedPageStreamError {
757 : req: PagestreamRequest,
758 : err: PageStreamError,
759 : }
760 :
761 : impl std::fmt::Display for BatchedPageStreamError {
762 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
763 0 : self.err.fmt(f)
764 0 : }
765 : }
766 :
767 : struct BatchedGetPageRequest {
768 : req: PagestreamGetPageRequest,
769 : timer: SmgrOpTimer,
770 : lsn_range: LsnRange,
771 : ctx: RequestContext,
772 : }
773 :
774 : #[cfg(feature = "testing")]
775 : struct BatchedTestRequest {
776 : req: models::PagestreamTestRequest,
777 : timer: SmgrOpTimer,
778 : }
779 :
780 : /// NB: we only hold [`timeline::handle::WeakHandle`] inside this enum,
781 : /// so that we don't keep the [`Timeline::gate`] open while the batch
782 : /// is being built up inside the [`spsc_fold`] (pagestream pipelining).
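///
/// A minimal sketch of the resulting weak-handle discipline (illustrative, not a real
/// call site): batches store only the weak handle, and the executor upgrades it when
/// the batch is actually served.
///
/// ```ignore
/// let weak = shard.downgrade();  // held by the batch while it sits in the spsc_fold
/// // ... later, on the executor side ...
/// let shard = weak.upgrade()?;   // re-enters the timeline gate; fails if shut down
/// ```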
783 : #[derive(IntoStaticStr)]
784 : enum BatchedFeMessage {
785 : Exists {
786 : span: Span,
787 : timer: SmgrOpTimer,
788 : shard: timeline::handle::WeakHandle<TenantManagerTypes>,
789 : req: models::PagestreamExistsRequest,
790 : },
791 : Nblocks {
792 : span: Span,
793 : timer: SmgrOpTimer,
794 : shard: timeline::handle::WeakHandle<TenantManagerTypes>,
795 : req: models::PagestreamNblocksRequest,
796 : },
797 : GetPage {
798 : span: Span,
799 : shard: timeline::handle::WeakHandle<TenantManagerTypes>,
800 : pages: smallvec::SmallVec<[BatchedGetPageRequest; 1]>,
801 : batch_break_reason: GetPageBatchBreakReason,
802 : },
803 : DbSize {
804 : span: Span,
805 : timer: SmgrOpTimer,
806 : shard: timeline::handle::WeakHandle<TenantManagerTypes>,
807 : req: models::PagestreamDbSizeRequest,
808 : },
809 : GetSlruSegment {
810 : span: Span,
811 : timer: SmgrOpTimer,
812 : shard: timeline::handle::WeakHandle<TenantManagerTypes>,
813 : req: models::PagestreamGetSlruSegmentRequest,
814 : },
815 : #[cfg(feature = "testing")]
816 : Test {
817 : span: Span,
818 : shard: timeline::handle::WeakHandle<TenantManagerTypes>,
819 : requests: Vec<BatchedTestRequest>,
820 : },
821 : RespondError {
822 : span: Span,
823 : error: BatchedPageStreamError,
824 : },
825 : }
826 :
827 : impl BatchedFeMessage {
828 0 : fn as_static_str(&self) -> &'static str {
829 0 : self.into()
830 0 : }
831 :
832 0 : fn observe_execution_start(&mut self, at: Instant) {
833 0 : match self {
834 0 : BatchedFeMessage::Exists { timer, .. }
835 0 : | BatchedFeMessage::Nblocks { timer, .. }
836 0 : | BatchedFeMessage::DbSize { timer, .. }
837 0 : | BatchedFeMessage::GetSlruSegment { timer, .. } => {
838 0 : timer.observe_execution_start(at);
839 0 : }
840 0 : BatchedFeMessage::GetPage { pages, .. } => {
841 0 : for page in pages {
842 0 : page.timer.observe_execution_start(at);
843 0 : }
844 : }
845 : #[cfg(feature = "testing")]
846 0 : BatchedFeMessage::Test { requests, .. } => {
847 0 : for req in requests {
848 0 : req.timer.observe_execution_start(at);
849 0 : }
850 : }
851 0 : BatchedFeMessage::RespondError { .. } => {}
852 : }
853 0 : }
854 :
855 0 : fn should_break_batch(
856 0 : &self,
857 0 : other: &BatchedFeMessage,
858 0 : max_batch_size: NonZeroUsize,
859 0 : batching_strategy: PageServiceProtocolPipelinedBatchingStrategy,
860 0 : ) -> Option<GetPageBatchBreakReason> {
861 0 : match (self, other) {
862 : (
863 : BatchedFeMessage::GetPage {
864 0 : shard: accum_shard,
865 0 : pages: accum_pages,
866 0 : ..
867 0 : },
868 0 : BatchedFeMessage::GetPage {
869 0 : shard: this_shard,
870 0 : pages: this_pages,
871 0 : ..
872 0 : },
873 0 : ) => {
874 0 : assert_eq!(this_pages.len(), 1);
875 0 : if accum_pages.len() >= max_batch_size.get() {
876 0 : trace!(%max_batch_size, "stopping batching because of batch size");
877 0 : assert_eq!(accum_pages.len(), max_batch_size.get());
878 :
879 0 : return Some(GetPageBatchBreakReason::BatchFull);
880 0 : }
881 0 : if !accum_shard.is_same_handle_as(this_shard) {
882 0 : trace!("stopping batching because timeline object mismatch");
883 : // TODO: we _could_ batch & execute each shard separately (and in parallel).
884 : // But the current logic for keeping responses in order does not support that.
885 :
886 0 : return Some(GetPageBatchBreakReason::NonUniformTimeline);
887 0 : }
888 0 :
889 0 : match batching_strategy {
890 : PageServiceProtocolPipelinedBatchingStrategy::UniformLsn => {
891 0 : if let Some(last_in_batch) = accum_pages.last() {
892 0 : if last_in_batch.lsn_range.effective_lsn
893 0 : != this_pages[0].lsn_range.effective_lsn
894 : {
895 0 : trace!(
896 : accum_lsn = %last_in_batch.lsn_range.effective_lsn,
897 0 : this_lsn = %this_pages[0].lsn_range.effective_lsn,
898 0 : "stopping batching because LSN changed"
899 : );
900 :
901 0 : return Some(GetPageBatchBreakReason::NonUniformLsn);
902 0 : }
903 0 : }
904 : }
905 : PageServiceProtocolPipelinedBatchingStrategy::ScatteredLsn => {
906 : // The read path doesn't currently support serving the same page at different LSNs.
907 : // While technically possible, it's uncertain if the complexity is worth it.
908 : // Break the batch if such a case is encountered.
909 0 : let same_page_different_lsn = accum_pages.iter().any(|batched| {
910 0 : batched.req.rel == this_pages[0].req.rel
911 0 : && batched.req.blkno == this_pages[0].req.blkno
912 0 : && batched.lsn_range.effective_lsn
913 0 : != this_pages[0].lsn_range.effective_lsn
914 0 : });
915 0 :
916 0 : if same_page_different_lsn {
917 0 : trace!(
918 0 : rel=%this_pages[0].req.rel,
919 0 : blkno=%this_pages[0].req.blkno,
920 0 : lsn=%this_pages[0].lsn_range.effective_lsn,
921 0 : "stopping batching because same page was requested at different LSNs"
922 : );
923 :
924 0 : return Some(GetPageBatchBreakReason::SamePageAtDifferentLsn);
925 0 : }
926 : }
927 : }
928 :
929 0 : None
930 : }
931 : #[cfg(feature = "testing")]
932 : (
933 : BatchedFeMessage::Test {
934 0 : shard: accum_shard,
935 0 : requests: accum_requests,
936 0 : ..
937 0 : },
938 0 : BatchedFeMessage::Test {
939 0 : shard: this_shard,
940 0 : requests: this_requests,
941 0 : ..
942 0 : },
943 0 : ) => {
944 0 : assert!(this_requests.len() == 1);
945 0 : if accum_requests.len() >= max_batch_size.get() {
946 0 : trace!(%max_batch_size, "stopping batching because of batch size");
947 0 : assert_eq!(accum_requests.len(), max_batch_size.get());
948 0 : return Some(GetPageBatchBreakReason::BatchFull);
949 0 : }
950 0 : if !accum_shard.is_same_handle_as(this_shard) {
951 0 : trace!("stopping batching because timeline object mismatch");
952 : // TODO: we _could_ batch & execute each shard separately (and in parallel).
953 : // But the current logic for keeping responses in order does not support that.
954 0 : return Some(GetPageBatchBreakReason::NonUniformTimeline);
955 0 : }
956 0 : let this_batch_key = this_requests[0].req.batch_key;
957 0 : let accum_batch_key = accum_requests[0].req.batch_key;
958 0 : if this_requests[0].req.batch_key != accum_requests[0].req.batch_key {
959 0 : trace!(%accum_batch_key, %this_batch_key, "stopping batching because batch key changed");
960 0 : return Some(GetPageBatchBreakReason::NonUniformKey);
961 0 : }
962 0 : None
963 : }
964 0 : (_, _) => Some(GetPageBatchBreakReason::NonBatchableRequest),
965 : }
966 0 : }
967 : }
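// Worked example of the batch-break rules above (illustrative values): with
// max_batch_size = 4 and GetPage requests arriving for the same shard as
//     (rel A, blkno 1, lsn 10), (rel A, blkno 2, lsn 10), (rel B, blkno 7, lsn 11)
// the UniformLsn strategy breaks the batch before the third request because its
// effective LSN differs (NonUniformLsn), whereas ScatteredLsn keeps all three together
// and only breaks when the same (rel, blkno) is requested again at a different LSN
// (SamePageAtDifferentLsn) or the batch fills up (BatchFull).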
968 :
969 : impl PageServerHandler {
970 : #[allow(clippy::too_many_arguments)]
971 0 : pub fn new(
972 0 : tenant_manager: Arc<TenantManager>,
973 0 : auth: Option<Arc<SwappableJwtAuth>>,
974 0 : pipelining_config: PageServicePipeliningConfig,
975 0 : get_vectored_concurrent_io: GetVectoredConcurrentIo,
976 0 : perf_span_fields: ConnectionPerfSpanFields,
977 0 : basebackup_cache: Arc<BasebackupCache>,
978 0 : connection_ctx: RequestContext,
979 0 : cancel: CancellationToken,
980 0 : gate_guard: GateGuard,
981 0 : ) -> Self {
982 0 : PageServerHandler {
983 0 : auth,
984 0 : claims: None,
985 0 : connection_ctx,
986 0 : perf_span_fields,
987 0 : timeline_handles: Some(TimelineHandles::new(tenant_manager)),
988 0 : cancel,
989 0 : pipelining_config,
990 0 : get_vectored_concurrent_io,
991 0 : basebackup_cache,
992 0 : gate_guard,
993 0 : }
994 0 : }
995 :
996 : /// This function always respects cancellation of any timeline in `[Self::shard_timelines]`. Pass in
997 : /// a cancellation token at the next scope up (such as a tenant cancellation token) to ensure we respect
998 : /// cancellation if there aren't any timelines in the cache.
999 : ///
1000 : /// If calling from a function that doesn't use the `[Self::shard_timelines]` cache, then pass in the
1001 : /// timeline cancellation token.
1002 0 : async fn flush_cancellable<IO>(
1003 0 : &self,
1004 0 : pgb: &mut PostgresBackend<IO>,
1005 0 : cancel: &CancellationToken,
1006 0 : ) -> Result<(), QueryError>
1007 0 : where
1008 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1009 0 : {
1010 0 : tokio::select!(
1011 0 : flush_r = pgb.flush() => {
1012 0 : Ok(flush_r?)
1013 : },
1014 0 : _ = cancel.cancelled() => {
1015 0 : Err(QueryError::Shutdown)
1016 : }
1017 : )
1018 0 : }
1019 :
1020 : #[allow(clippy::too_many_arguments)]
1021 0 : async fn pagestream_read_message<IO>(
1022 0 : pgb: &mut PostgresBackendReader<IO>,
1023 0 : tenant_id: TenantId,
1024 0 : timeline_id: TimelineId,
1025 0 : timeline_handles: &mut TimelineHandles,
1026 0 : conn_perf_span_fields: &ConnectionPerfSpanFields,
1027 0 : cancel: &CancellationToken,
1028 0 : ctx: &RequestContext,
1029 0 : protocol_version: PagestreamProtocolVersion,
1030 0 : parent_span: Span,
1031 0 : ) -> Result<Option<BatchedFeMessage>, QueryError>
1032 0 : where
1033 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
1034 0 : {
1035 0 : let msg = tokio::select! {
1036 : biased;
1037 0 : _ = cancel.cancelled() => {
1038 0 : return Err(QueryError::Shutdown)
1039 : }
1040 0 : msg = pgb.read_message() => { msg }
1041 0 : };
1042 0 :
1043 0 : let received_at = Instant::now();
1044 :
1045 0 : let copy_data_bytes = match msg? {
1046 0 : Some(FeMessage::CopyData(bytes)) => bytes,
1047 : Some(FeMessage::Terminate) => {
1048 0 : return Ok(None);
1049 : }
1050 0 : Some(m) => {
1051 0 : return Err(QueryError::Other(anyhow::anyhow!(
1052 0 : "unexpected message: {m:?} during COPY"
1053 0 : )));
1054 : }
1055 : None => {
1056 0 : return Ok(None);
1057 : } // client disconnected
1058 : };
1059 0 : trace!("query: {copy_data_bytes:?}");
1060 :
1061 0 : fail::fail_point!("ps::handle-pagerequest-message");
1062 :
1063 : // parse request
1064 0 : let neon_fe_msg =
1065 0 : PagestreamFeMessage::parse(&mut copy_data_bytes.reader(), protocol_version)?;
1066 :
1067 : // TODO: turn into an async closure once available to avoid repeating received_at
1068 0 : async fn record_op_start_and_throttle(
1069 0 : shard: &timeline::handle::Handle<TenantManagerTypes>,
1070 0 : op: metrics::SmgrQueryType,
1071 0 : received_at: Instant,
1072 0 : ) -> Result<SmgrOpTimer, QueryError> {
1073 0 : // It's important to start the smgr op metric recorder as early as possible
1074 0 : // so that the _started counters are incremented before we do
1075 0 : // any serious waiting, e.g., for throttle, batching, or actual request handling.
1076 0 : let mut timer = shard.query_metrics.start_smgr_op(op, received_at);
1077 0 : let now = Instant::now();
1078 0 : timer.observe_throttle_start(now);
1079 0 : let throttled = tokio::select! {
1080 0 : res = shard.pagestream_throttle.throttle(1, now) => res,
1081 0 : _ = shard.cancel.cancelled() => return Err(QueryError::Shutdown),
1082 : };
1083 0 : timer.observe_throttle_done(throttled);
1084 0 : Ok(timer)
1085 0 : }
1086 :
1087 0 : let batched_msg = match neon_fe_msg {
1088 0 : PagestreamFeMessage::Exists(req) => {
1089 0 : let shard = timeline_handles
1090 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1091 0 : .await?;
1092 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1093 0 : let span = tracing::info_span!(parent: &parent_span, "handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
1094 0 : let timer = record_op_start_and_throttle(
1095 0 : &shard,
1096 0 : metrics::SmgrQueryType::GetRelExists,
1097 0 : received_at,
1098 0 : )
1099 0 : .await?;
1100 0 : BatchedFeMessage::Exists {
1101 0 : span,
1102 0 : timer,
1103 0 : shard: shard.downgrade(),
1104 0 : req,
1105 0 : }
1106 : }
1107 0 : PagestreamFeMessage::Nblocks(req) => {
1108 0 : let shard = timeline_handles
1109 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1110 0 : .await?;
1111 0 : let span = tracing::info_span!(parent: &parent_span, "handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
1112 0 : let timer = record_op_start_and_throttle(
1113 0 : &shard,
1114 0 : metrics::SmgrQueryType::GetRelSize,
1115 0 : received_at,
1116 0 : )
1117 0 : .await?;
1118 0 : BatchedFeMessage::Nblocks {
1119 0 : span,
1120 0 : timer,
1121 0 : shard: shard.downgrade(),
1122 0 : req,
1123 0 : }
1124 : }
1125 0 : PagestreamFeMessage::DbSize(req) => {
1126 0 : let shard = timeline_handles
1127 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1128 0 : .await?;
1129 0 : let span = tracing::info_span!(parent: &parent_span, "handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
1130 0 : let timer = record_op_start_and_throttle(
1131 0 : &shard,
1132 0 : metrics::SmgrQueryType::GetDbSize,
1133 0 : received_at,
1134 0 : )
1135 0 : .await?;
1136 0 : BatchedFeMessage::DbSize {
1137 0 : span,
1138 0 : timer,
1139 0 : shard: shard.downgrade(),
1140 0 : req,
1141 0 : }
1142 : }
1143 0 : PagestreamFeMessage::GetSlruSegment(req) => {
1144 0 : let shard = timeline_handles
1145 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1146 0 : .await?;
1147 0 : let span = tracing::info_span!(parent: &parent_span, "handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
1148 0 : let timer = record_op_start_and_throttle(
1149 0 : &shard,
1150 0 : metrics::SmgrQueryType::GetSlruSegment,
1151 0 : received_at,
1152 0 : )
1153 0 : .await?;
1154 0 : BatchedFeMessage::GetSlruSegment {
1155 0 : span,
1156 0 : timer,
1157 0 : shard: shard.downgrade(),
1158 0 : req,
1159 0 : }
1160 : }
1161 0 : PagestreamFeMessage::GetPage(req) => {
1162 : // avoid a somewhat costly Span::record() by constructing the entire span in one go.
1163 : macro_rules! mkspan {
1164 : (before shard routing) => {{
1165 : tracing::info_span!(
1166 : parent: &parent_span,
1167 : "handle_get_page_request",
1168 : request_id = %req.hdr.reqid,
1169 : rel = %req.rel,
1170 : blkno = %req.blkno,
1171 : req_lsn = %req.hdr.request_lsn,
1172 : not_modified_since_lsn = %req.hdr.not_modified_since,
1173 : )
1174 : }};
1175 : ($shard_id:expr) => {{
1176 : tracing::info_span!(
1177 : parent: &parent_span,
1178 : "handle_get_page_request",
1179 : request_id = %req.hdr.reqid,
1180 : rel = %req.rel,
1181 : blkno = %req.blkno,
1182 : req_lsn = %req.hdr.request_lsn,
1183 : not_modified_since_lsn = %req.hdr.not_modified_since,
1184 : shard_id = %$shard_id,
1185 : )
1186 : }};
1187 : }
1188 :
1189 : macro_rules! respond_error {
1190 : ($span:expr, $error:expr) => {{
1191 : let error = BatchedFeMessage::RespondError {
1192 : span: $span,
1193 : error: BatchedPageStreamError {
1194 : req: req.hdr,
1195 : err: $error,
1196 : },
1197 : };
1198 : Ok(Some(error))
1199 : }};
1200 : }
1201 :
1202 0 : let key = rel_block_to_key(req.rel, req.blkno);
1203 :
1204 0 : let res = timeline_handles
1205 0 : .get(tenant_id, timeline_id, ShardSelector::Page(key))
1206 0 : .await;
1207 :
1208 0 : let shard = match res {
1209 0 : Ok(tl) => tl,
1210 0 : Err(e) => {
1211 0 : let span = mkspan!(before shard routing);
1212 0 : match e {
1213 : GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_)) => {
1214 : // We already know this tenant exists in general, because we resolved it at
1215 : // start of connection. Getting a NotFound here indicates that the shard containing
1216 : // the requested page is not present on this node: the client's knowledge of shard->pageserver
1217 : // mapping is out of date.
1218 : //
1219 : // Closing the connection by returning `::Reconnect` has the side effect of rate-limiting the above message, via
1220 : // client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
1221 : // and talk to a different pageserver.
1222 0 : return respond_error!(
1223 0 : span,
1224 0 : PageStreamError::Reconnect(
1225 0 : "getpage@lsn request routed to wrong shard".into()
1226 0 : )
1227 0 : );
1228 : }
1229 0 : e => {
1230 0 : return respond_error!(span, e.into());
1231 : }
1232 : }
1233 : }
1234 : };
1235 :
1236 0 : let ctx = if shard.is_get_page_request_sampled() {
1237 0 : RequestContextBuilder::from(ctx)
1238 0 : .root_perf_span(|| {
1239 0 : info_span!(
1240 : target: PERF_TRACE_TARGET,
1241 : "GET_PAGE",
1242 : peer_addr = conn_perf_span_fields.peer_addr,
1243 : application_name = conn_perf_span_fields.application_name,
1244 : compute_mode = conn_perf_span_fields.compute_mode,
1245 : tenant_id = %tenant_id,
1246 0 : shard_id = %shard.get_shard_identity().shard_slug(),
1247 : timeline_id = %timeline_id,
1248 : lsn = %req.hdr.request_lsn,
1249 : not_modified_since_lsn = %req.hdr.not_modified_since,
1250 : request_id = %req.hdr.reqid,
1251 : key = %key,
1252 : )
1253 0 : })
1254 0 : .attached_child()
1255 : } else {
1256 0 : ctx.attached_child()
1257 : };
1258 :
1259 : // This ctx travels as part of the BatchedFeMessage through
1260 : // batching into the request handler.
1261 : // The request handler needs to do some per-request work
1262 : // (relsize check) before dispatching the batch as a single
1263 : // get_vectored call to the Timeline.
1264 : // This ctx will be used for the relsize check, whereas the
1265 : // get_vectored call will be a different ctx with separate
1266 : // perf span.
1267 0 : let ctx = ctx.with_scope_page_service_pagestream(&shard);
1268 :
1269 : // Similar game for this `span`: we funnel it through so that
1270 : // request handler log messages contain the request-specific fields.
1271 0 : let span = mkspan!(shard.tenant_shard_id.shard_slug());
1272 :
1273 0 : let timer = record_op_start_and_throttle(
1274 0 : &shard,
1275 0 : metrics::SmgrQueryType::GetPageAtLsn,
1276 0 : received_at,
1277 0 : )
1278 0 : .maybe_perf_instrument(&ctx, |current_perf_span| {
1279 0 : info_span!(
1280 : target: PERF_TRACE_TARGET,
1281 0 : parent: current_perf_span,
1282 : "THROTTLE",
1283 : )
1284 0 : })
1285 0 : .await?;
1286 :
1287 : // We're holding the Handle
1288 0 : let effective_lsn = match Self::effective_request_lsn(
1289 0 : &shard,
1290 0 : shard.get_last_record_lsn(),
1291 0 : req.hdr.request_lsn,
1292 0 : req.hdr.not_modified_since,
1293 0 : &shard.get_applied_gc_cutoff_lsn(),
1294 0 : ) {
1295 0 : Ok(lsn) => lsn,
1296 0 : Err(e) => {
1297 0 : return respond_error!(span, e);
1298 : }
1299 : };
1300 :
1301 : BatchedFeMessage::GetPage {
1302 0 : span,
1303 0 : shard: shard.downgrade(),
1304 0 : pages: smallvec::smallvec![BatchedGetPageRequest {
1305 0 : req,
1306 0 : timer,
1307 0 : lsn_range: LsnRange {
1308 0 : effective_lsn,
1309 0 : request_lsn: req.hdr.request_lsn
1310 0 : },
1311 0 : ctx,
1312 0 : }],
1313 : // The executor grabs the batch when it becomes idle.
1314 : // Hence, [`GetPageBatchBreakReason::ExecutorSteal`] is the
1315 : // default reason for breaking the batch.
1316 0 : batch_break_reason: GetPageBatchBreakReason::ExecutorSteal,
1317 : }
1318 : }
1319 : #[cfg(feature = "testing")]
1320 0 : PagestreamFeMessage::Test(req) => {
1321 0 : let shard = timeline_handles
1322 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1323 0 : .await?;
1324 0 : let span = tracing::info_span!(parent: &parent_span, "handle_test_request", shard_id = %shard.tenant_shard_id.shard_slug());
1325 0 : let timer =
1326 0 : record_op_start_and_throttle(&shard, metrics::SmgrQueryType::Test, received_at)
1327 0 : .await?;
1328 0 : BatchedFeMessage::Test {
1329 0 : span,
1330 0 : shard: shard.downgrade(),
1331 0 : requests: vec![BatchedTestRequest { req, timer }],
1332 0 : }
1333 : }
1334 : };
1335 0 : Ok(Some(batched_msg))
1336 0 : }
1337 :
1338 : /// Post-condition: `batch` is Some()
1339 : #[instrument(skip_all, level = tracing::Level::TRACE)]
1340 : #[allow(clippy::boxed_local)]
1341 : fn pagestream_do_batch(
1342 : batching_strategy: PageServiceProtocolPipelinedBatchingStrategy,
1343 : max_batch_size: NonZeroUsize,
1344 : batch: &mut Result<BatchedFeMessage, QueryError>,
1345 : this_msg: Result<BatchedFeMessage, QueryError>,
1346 : ) -> Result<(), Result<BatchedFeMessage, QueryError>> {
1347 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1348 :
1349 : let this_msg = match this_msg {
1350 : Ok(this_msg) => this_msg,
1351 : Err(e) => return Err(Err(e)),
1352 : };
1353 :
1354 : let eligible_batch = match batch {
1355 : Ok(b) => b,
1356 : Err(_) => {
1357 : return Err(Ok(this_msg));
1358 : }
1359 : };
1360 :
1361 : let batch_break =
1362 : eligible_batch.should_break_batch(&this_msg, max_batch_size, batching_strategy);
1363 :
1364 : match batch_break {
1365 : Some(reason) => {
1366 : if let BatchedFeMessage::GetPage {
1367 : batch_break_reason, ..
1368 : } = eligible_batch
1369 : {
1370 : *batch_break_reason = reason;
1371 : }
1372 :
1373 : Err(Ok(this_msg))
1374 : }
1375 : None => {
1376 : // ok to batch
1377 : match (eligible_batch, this_msg) {
1378 : (
1379 : BatchedFeMessage::GetPage {
1380 : pages: accum_pages, ..
1381 : },
1382 : BatchedFeMessage::GetPage {
1383 : pages: this_pages, ..
1384 : },
1385 : ) => {
1386 : accum_pages.extend(this_pages);
1387 : Ok(())
1388 : }
1389 : #[cfg(feature = "testing")]
1390 : (
1391 : BatchedFeMessage::Test {
1392 : requests: accum_requests,
1393 : ..
1394 : },
1395 : BatchedFeMessage::Test {
1396 : requests: this_requests,
1397 : ..
1398 : },
1399 : ) => {
1400 : accum_requests.extend(this_requests);
1401 : Ok(())
1402 : }
1403 : // Shape guaranteed by [`BatchedFeMessage::should_break_batch`]
1404 : _ => unreachable!(),
1405 : }
1406 : }
1407 : }
1408 : }
1409 :
1410 0 : #[instrument(level = tracing::Level::DEBUG, skip_all)]
1411 : async fn pagestream_handle_batched_message<IO>(
1412 : &mut self,
1413 : pgb_writer: &mut PostgresBackend<IO>,
1414 : batch: BatchedFeMessage,
1415 : io_concurrency: IoConcurrency,
1416 : cancel: &CancellationToken,
1417 : protocol_version: PagestreamProtocolVersion,
1418 : ctx: &RequestContext,
1419 : ) -> Result<(), QueryError>
1420 : where
1421 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1422 : {
1423 : let started_at = Instant::now();
1424 : let batch = {
1425 : let mut batch = batch;
1426 : batch.observe_execution_start(started_at);
1427 : batch
1428 : };
1429 :
1430 : // Dispatch the batch to the appropriate request handler.
1431 : let log_slow_name = batch.as_static_str();
1432 : let (mut handler_results, span) = {
1433 : // TODO: we unfortunately have to pin the future on the heap, since GetPage futures are huge and
1434 : // won't fit on the stack.
1435 : let mut boxpinned =
1436 : Box::pin(self.pagestream_dispatch_batched_message(batch, io_concurrency, ctx));
1437 : log_slow(
1438 : log_slow_name,
1439 : LOG_SLOW_GETPAGE_THRESHOLD,
1440 : boxpinned.as_mut(),
1441 : )
1442 : .await?
1443 : };
1444 :
1445 : // We purposefully don't count flush time into the smgr operation timer.
1446 : //
1447 : // The reason is that the current compute client will not perform protocol processing
1448 : // if the postgres backend process is doing things other than `->smgr_read()`.
1449 : // This is especially the case for prefetch.
1450 : //
1451 : // If the compute doesn't read from the connection, eventually TCP will backpressure
1452 : // all the way into our flush call below.
1453 : //
1454 : // The timer's underlying metric is used for a storage-internal latency SLO and
1455 : // we don't want to include latency in it that we can't control.
1456 : // And as pointed out above, in this case, we don't control the time that flush will take.
1457 : //
1458 : // We put each response in the batch onto the wire in a separate pgb_writer.flush()
1459 : // call, which (all unmeasured) adds syscall overhead but reduces time to first byte
1460 : // and avoids building up a "giant" contiguous userspace buffer to hold the entire response.
1461 : // TODO: vectored socket IO would be great, but pgb_writer doesn't support that.
1462 : let flush_timers = {
1463 : let flushing_start_time = Instant::now();
1464 : let mut flush_timers = Vec::with_capacity(handler_results.len());
1465 : for handler_result in &mut handler_results {
1466 : let flush_timer = match handler_result {
1467 : Ok((_, timer)) => Some(
1468 : timer
1469 : .observe_execution_end(flushing_start_time)
1470 : .expect("we are the first caller"),
1471 : ),
1472 : Err(_) => {
1473 : // TODO: measure errors
1474 : None
1475 : }
1476 : };
1477 : flush_timers.push(flush_timer);
1478 : }
1479 : assert_eq!(flush_timers.len(), handler_results.len());
1480 : flush_timers
1481 : };
1482 :
1483 : // Map handler result to protocol behavior.
1484 : // Some handler errors cause exit from pagestream protocol.
1485 : // Other handler errors are sent back as an error message and we stay in pagestream protocol.
1486 : for (handler_result, flushing_timer) in handler_results.into_iter().zip(flush_timers) {
1487 : let response_msg = match handler_result {
1488 : Err(e) => match &e.err {
1489 : PageStreamError::Shutdown => {
1490 : // If we fail to fulfil a request during shutdown, which may be _because_ of
1491 : // shutdown, then do not send the error to the client. Instead just drop the
1492 : // connection.
1493 0 : span.in_scope(|| info!("dropping connection due to shutdown"));
1494 : return Err(QueryError::Shutdown);
1495 : }
1496 : PageStreamError::Reconnect(reason) => {
1497 0 : span.in_scope(|| info!("handler requested reconnect: {reason}"));
1498 : return Err(QueryError::Reconnect);
1499 : }
1500 : PageStreamError::Read(_)
1501 : | PageStreamError::LsnTimeout(_)
1502 : | PageStreamError::NotFound(_)
1503 : | PageStreamError::BadRequest(_) => {
1504 : // print all the details to the log with {:#}, but for the client the
1505 : // error message is enough. Do not log if shutting down, as the anyhow::Error
1506 : // here includes cancellation which is not an error.
1507 : let full = utils::error::report_compact_sources(&e.err);
1508 0 : span.in_scope(|| {
1509 0 : error!("error reading relation or page version: {full:#}")
1510 0 : });
1511 :
1512 : PagestreamBeMessage::Error(PagestreamErrorResponse {
1513 : req: e.req,
1514 : message: e.err.to_string(),
1515 : })
1516 : }
1517 : },
1518 : Ok((response_msg, _op_timer_already_observed)) => response_msg,
1519 : };
1520 :
1521 : //
1522 : // marshal & transmit response message
1523 : //
1524 :
1525 : pgb_writer.write_message_noflush(&BeMessage::CopyData(
1526 : &response_msg.serialize(protocol_version),
1527 : ))?;
1528 :
1529 : failpoint_support::sleep_millis_async!("before-pagestream-msg-flush", cancel);
1530 :
1531 : // what we want to do
1532 : let socket_fd = pgb_writer.socket_fd;
1533 : let flush_fut = pgb_writer.flush();
1534 : // metric for how long flushing takes
1535 : let flush_fut = match flushing_timer {
1536 : Some(flushing_timer) => futures::future::Either::Left(flushing_timer.measure(
1537 : Instant::now(),
1538 : flush_fut,
1539 : socket_fd,
1540 : )),
1541 : None => futures::future::Either::Right(flush_fut),
1542 : };
1543 : // do it while respecting cancellation
1544 0 : let _: () = async move {
1545 0 : tokio::select! {
1546 : biased;
1547 0 : _ = cancel.cancelled() => {
1548 : // We were requested to shut down.
1549 0 : info!("shutdown request received in page handler");
1550 0 : return Err(QueryError::Shutdown)
1551 : }
1552 0 : res = flush_fut => {
1553 0 : res?;
1554 : }
1555 : }
1556 0 : Ok(())
1557 0 : }
1558 : .await?;
1559 : }
1560 : Ok(())
1561 : }
1562 :
1563 : /// Helper which dispatches a batched message to the appropriate handler.
1564 : /// Returns a vec of results, along with the extracted trace span.
1565 0 : async fn pagestream_dispatch_batched_message(
1566 0 : &mut self,
1567 0 : batch: BatchedFeMessage,
1568 0 : io_concurrency: IoConcurrency,
1569 0 : ctx: &RequestContext,
1570 0 : ) -> Result<
1571 0 : (
1572 0 : Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>>,
1573 0 : Span,
1574 0 : ),
1575 0 : QueryError,
1576 0 : > {
1577 : macro_rules! upgrade_handle_and_set_context {
1578 : ($shard:ident) => {{
1579 : let weak_handle = &$shard;
1580 : let handle = weak_handle.upgrade()?;
1581 : let ctx = ctx.with_scope_page_service_pagestream(&handle);
1582 : (handle, ctx)
1583 : }};
1584 : }
1585 0 : Ok(match batch {
1586 : BatchedFeMessage::Exists {
1587 0 : span,
1588 0 : timer,
1589 0 : shard,
1590 0 : req,
1591 0 : } => {
1592 0 : fail::fail_point!("ps::handle-pagerequest-message::exists");
1593 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1594 : (
1595 0 : vec![
1596 0 : self.handle_get_rel_exists_request(&shard, &req, &ctx)
1597 0 : .instrument(span.clone())
1598 0 : .await
1599 0 : .map(|msg| (msg, timer))
1600 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
1601 0 : ],
1602 0 : span,
1603 : )
1604 : }
1605 : BatchedFeMessage::Nblocks {
1606 0 : span,
1607 0 : timer,
1608 0 : shard,
1609 0 : req,
1610 0 : } => {
1611 0 : fail::fail_point!("ps::handle-pagerequest-message::nblocks");
1612 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1613 : (
1614 0 : vec![
1615 0 : self.handle_get_nblocks_request(&shard, &req, &ctx)
1616 0 : .instrument(span.clone())
1617 0 : .await
1618 0 : .map(|msg| (msg, timer))
1619 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
1620 0 : ],
1621 0 : span,
1622 : )
1623 : }
1624 : BatchedFeMessage::GetPage {
1625 0 : span,
1626 0 : shard,
1627 0 : pages,
1628 0 : batch_break_reason,
1629 0 : } => {
1630 0 : fail::fail_point!("ps::handle-pagerequest-message::getpage");
1631 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1632 : (
1633 : {
1634 0 : let npages = pages.len();
1635 0 : trace!(npages, "handling getpage request");
1636 0 : let res = self
1637 0 : .handle_get_page_at_lsn_request_batched(
1638 0 : &shard,
1639 0 : pages,
1640 0 : io_concurrency,
1641 0 : batch_break_reason,
1642 0 : &ctx,
1643 0 : )
1644 0 : .instrument(span.clone())
1645 0 : .await;
1646 0 : assert_eq!(res.len(), npages);
1647 0 : res
1648 0 : },
1649 0 : span,
1650 : )
1651 : }
1652 : BatchedFeMessage::DbSize {
1653 0 : span,
1654 0 : timer,
1655 0 : shard,
1656 0 : req,
1657 0 : } => {
1658 0 : fail::fail_point!("ps::handle-pagerequest-message::dbsize");
1659 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1660 : (
1661 0 : vec![
1662 0 : self.handle_db_size_request(&shard, &req, &ctx)
1663 0 : .instrument(span.clone())
1664 0 : .await
1665 0 : .map(|msg| (msg, timer))
1666 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
1667 0 : ],
1668 0 : span,
1669 : )
1670 : }
1671 : BatchedFeMessage::GetSlruSegment {
1672 0 : span,
1673 0 : timer,
1674 0 : shard,
1675 0 : req,
1676 0 : } => {
1677 0 : fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
1678 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1679 : (
1680 0 : vec![
1681 0 : self.handle_get_slru_segment_request(&shard, &req, &ctx)
1682 0 : .instrument(span.clone())
1683 0 : .await
1684 0 : .map(|msg| (msg, timer))
1685 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
1686 0 : ],
1687 0 : span,
1688 : )
1689 : }
1690 : #[cfg(feature = "testing")]
1691 : BatchedFeMessage::Test {
1692 0 : span,
1693 0 : shard,
1694 0 : requests,
1695 0 : } => {
1696 0 : fail::fail_point!("ps::handle-pagerequest-message::test");
1697 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1698 : (
1699 : {
1700 0 : let npages = requests.len();
1701 0 : trace!(npages, "handling getpage request");
1702 0 : let res = self
1703 0 : .handle_test_request_batch(&shard, requests, &ctx)
1704 0 : .instrument(span.clone())
1705 0 : .await;
1706 0 : assert_eq!(res.len(), npages);
1707 0 : res
1708 0 : },
1709 0 : span,
1710 : )
1711 : }
1712 0 : BatchedFeMessage::RespondError { span, error } => {
1713 0 : // We've already decided to respond with an error, so we don't need to
1714 0 : // call the handler.
1715 0 : (vec![Err(error)], span)
1716 : }
1717 : })
1718 0 : }
1719 :
1720 : /// Pagestream sub-protocol handler.
1721 : ///
1722 : /// It is a simple request-response protocol inside a COPYBOTH session.
1723 : ///
1724 : /// # Coding Discipline
1725 : ///
1726 : /// Coding discipline within this function: all interaction with the `pgb` connection
1727 : /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
1728 : /// This is so that we can shutdown page_service quickly.
1729 : #[instrument(skip_all)]
1730 : async fn handle_pagerequests<IO>(
1731 : &mut self,
1732 : pgb: &mut PostgresBackend<IO>,
1733 : tenant_id: TenantId,
1734 : timeline_id: TimelineId,
1735 : protocol_version: PagestreamProtocolVersion,
1736 : ctx: RequestContext,
1737 : ) -> Result<(), QueryError>
1738 : where
1739 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
1740 : {
1741 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1742 :
1743 : // switch client to COPYBOTH
1744 : pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
1745 : tokio::select! {
1746 : biased;
1747 : _ = self.cancel.cancelled() => {
1748 : return Err(QueryError::Shutdown)
1749 : }
1750 : res = pgb.flush() => {
1751 : res?;
1752 : }
1753 : }
1754 :
1755 : let io_concurrency = IoConcurrency::spawn_from_conf(
1756 : self.get_vectored_concurrent_io,
1757 : match self.gate_guard.try_clone() {
1758 : Ok(guard) => guard,
1759 : Err(_) => {
1760 : info!("shutdown request received in page handler");
1761 : return Err(QueryError::Shutdown);
1762 : }
1763 : },
1764 : );
1765 :
1766 : let pgb_reader = pgb
1767 : .split()
1768 : .context("implementation error: split pgb into reader and writer")?;
1769 :
1770 : let timeline_handles = self
1771 : .timeline_handles
1772 : .take()
1773 : .expect("implementation error: timeline_handles should not be locked");
1774 :
1775 : let request_span = info_span!("request");
1776 : let ((pgb_reader, timeline_handles), result) = match self.pipelining_config.clone() {
1777 : PageServicePipeliningConfig::Pipelined(pipelining_config) => {
1778 : self.handle_pagerequests_pipelined(
1779 : pgb,
1780 : pgb_reader,
1781 : tenant_id,
1782 : timeline_id,
1783 : timeline_handles,
1784 : request_span,
1785 : pipelining_config,
1786 : protocol_version,
1787 : io_concurrency,
1788 : &ctx,
1789 : )
1790 : .await
1791 : }
1792 : PageServicePipeliningConfig::Serial => {
1793 : self.handle_pagerequests_serial(
1794 : pgb,
1795 : pgb_reader,
1796 : tenant_id,
1797 : timeline_id,
1798 : timeline_handles,
1799 : request_span,
1800 : protocol_version,
1801 : io_concurrency,
1802 : &ctx,
1803 : )
1804 : .await
1805 : }
1806 : };
1807 :
1808 : debug!("pagestream subprotocol shut down cleanly");
1809 :
1810 : pgb.unsplit(pgb_reader)
1811 : .context("implementation error: unsplit pgb")?;
1812 :
1813 : let replaced = self.timeline_handles.replace(timeline_handles);
1814 : assert!(replaced.is_none());
1815 :
1816 : result
1817 : }
1818 :
1819 : #[allow(clippy::too_many_arguments)]
1820 0 : async fn handle_pagerequests_serial<IO>(
1821 0 : &mut self,
1822 0 : pgb_writer: &mut PostgresBackend<IO>,
1823 0 : mut pgb_reader: PostgresBackendReader<IO>,
1824 0 : tenant_id: TenantId,
1825 0 : timeline_id: TimelineId,
1826 0 : mut timeline_handles: TimelineHandles,
1827 0 : request_span: Span,
1828 0 : protocol_version: PagestreamProtocolVersion,
1829 0 : io_concurrency: IoConcurrency,
1830 0 : ctx: &RequestContext,
1831 0 : ) -> (
1832 0 : (PostgresBackendReader<IO>, TimelineHandles),
1833 0 : Result<(), QueryError>,
1834 0 : )
1835 0 : where
1836 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
1837 0 : {
1838 0 : let cancel = self.cancel.clone();
1839 :
1840 0 : let err = loop {
1841 0 : let msg = Self::pagestream_read_message(
1842 0 : &mut pgb_reader,
1843 0 : tenant_id,
1844 0 : timeline_id,
1845 0 : &mut timeline_handles,
1846 0 : &self.perf_span_fields,
1847 0 : &cancel,
1848 0 : ctx,
1849 0 : protocol_version,
1850 0 : request_span.clone(),
1851 0 : )
1852 0 : .await;
1853 0 : let msg = match msg {
1854 0 : Ok(msg) => msg,
1855 0 : Err(e) => break e,
1856 : };
1857 0 : let msg = match msg {
1858 0 : Some(msg) => msg,
1859 : None => {
1860 0 : debug!("pagestream subprotocol end observed");
1861 0 : return ((pgb_reader, timeline_handles), Ok(()));
1862 : }
1863 : };
1864 :
1865 0 : let result = self
1866 0 : .pagestream_handle_batched_message(
1867 0 : pgb_writer,
1868 0 : msg,
1869 0 : io_concurrency.clone(),
1870 0 : &cancel,
1871 0 : protocol_version,
1872 0 : ctx,
1873 0 : )
1874 0 : .await;
1875 0 : match result {
1876 0 : Ok(()) => {}
1877 0 : Err(e) => break e,
1878 : }
1879 : };
1880 0 : ((pgb_reader, timeline_handles), Err(err))
1881 0 : }
1882 :
1883 : /// # Cancel-Safety
1884 : ///
1885 : /// May leak tokio tasks if not polled to completion.
1886 : #[allow(clippy::too_many_arguments)]
1887 0 : async fn handle_pagerequests_pipelined<IO>(
1888 0 : &mut self,
1889 0 : pgb_writer: &mut PostgresBackend<IO>,
1890 0 : pgb_reader: PostgresBackendReader<IO>,
1891 0 : tenant_id: TenantId,
1892 0 : timeline_id: TimelineId,
1893 0 : mut timeline_handles: TimelineHandles,
1894 0 : request_span: Span,
1895 0 : pipelining_config: PageServicePipeliningConfigPipelined,
1896 0 : protocol_version: PagestreamProtocolVersion,
1897 0 : io_concurrency: IoConcurrency,
1898 0 : ctx: &RequestContext,
1899 0 : ) -> (
1900 0 : (PostgresBackendReader<IO>, TimelineHandles),
1901 0 : Result<(), QueryError>,
1902 0 : )
1903 0 : where
1904 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
1905 0 : {
1906 0 : //
1907 0 : // Pipelined pagestream handling consists of
1908 0 : // - a Batcher that reads requests off the wire
1909 0 : // and batches them if possible,
1910 0 : // - an Executor that processes the batched requests.
1911 0 : //
1912 0 : // The batch is built up inside an `spsc_fold` channel,
1913 0 : // shared between Batcher (Sender) and Executor (Receiver).
1914 0 : //
1915 0 : // The Batcher continuously folds client requests into the batch,
1916 0 : // while the Executor can at any time take out what's in the batch
1917 0 : // in order to process it.
1918 0 : // This means the next batch builds up while the Executor
1919 0 : // executes the last batch.
1920 0 : //
1921 0 : // CANCELLATION
1922 0 : //
1923 0 : // We run both Batcher and Executor futures to completion before
1924 0 : // returning from this function.
1925 0 : //
1926 0 : // If Executor exits first, it signals cancellation to the Batcher
1927 0 : // via a CancellationToken that is child of `self.cancel`.
1928 0 : // If Batcher exits first, it signals cancellation to the Executor
1929 0 : // by dropping the spsc_fold channel Sender.
1930 0 : //
1931 0 : // CLEAN SHUTDOWN
1932 0 : //
1933 0 : // Clean shutdown means that the client ends the COPYBOTH session.
1934 0 : // In response to such a client message, the Batcher exits.
1935 0 : // The Executor continues to run, draining the spsc_fold channel.
1936 0 : // Once drained, the spsc_fold recv will fail with a distinct error
1937 0 : // indicating that the sender disconnected.
1938 0 : // The Executor exits with Ok(()) in response to that error.
1939 0 : //
1940 0 : // Server-initiated shutdown is not a clean shutdown; instead it
1941 0 : // surfaces as an error, Err(QueryError::Shutdown), which is
1942 0 : // propagated through the normal error path.
1943 0 : //
1944 0 : // ERROR PROPAGATION
1945 0 : //
1946 0 : // When the Batcher encounters an error, it sends it as a value
1947 0 : // through the spsc_fold channel and exits afterwards.
1948 0 : // When the Executor observes such an error in the channel,
1949 0 : // it exits returning that error value.
1950 0 : //
1951 0 : // This design ensures that the Executor stage will still process
1952 0 : // the batch that was in flight when the Batcher encountered an error,
1953 0 : // thereby behaving identically to a serial implementation.
1954 0 :
1955 0 : let PageServicePipeliningConfigPipelined {
1956 0 : max_batch_size,
1957 0 : execution,
1958 0 : batching: batching_strategy,
1959 0 : } = pipelining_config;
1960 :
1961 : // Macro to _define_ a pipeline stage.
1962 : macro_rules! pipeline_stage {
1963 : ($name:literal, $cancel:expr, $make_fut:expr) => {{
1964 : let cancel: CancellationToken = $cancel;
1965 : let stage_fut = $make_fut(cancel.clone());
1966 0 : async move {
1967 0 : scopeguard::defer! {
1968 0 : debug!("exiting");
1969 0 : }
1970 0 : timed_after_cancellation(stage_fut, $name, Duration::from_millis(100), &cancel)
1971 0 : .await
1972 0 : }
1973 : .instrument(tracing::info_span!($name))
1974 : }};
1975 : }
1976 :
1977 : //
1978 : // Batcher
1979 : //
1980 :
1981 0 : let perf_span_fields = self.perf_span_fields.clone();
1982 0 :
1983 0 : let cancel_batcher = self.cancel.child_token();
1984 0 : let (mut batch_tx, mut batch_rx) = spsc_fold::channel();
1985 0 : let batcher = pipeline_stage!("batcher", cancel_batcher.clone(), move |cancel_batcher| {
1986 0 : let ctx = ctx.attached_child();
1987 0 : async move {
1988 0 : let mut pgb_reader = pgb_reader;
1989 0 : let mut exit = false;
1990 0 : while !exit {
1991 0 : let read_res = Self::pagestream_read_message(
1992 0 : &mut pgb_reader,
1993 0 : tenant_id,
1994 0 : timeline_id,
1995 0 : &mut timeline_handles,
1996 0 : &perf_span_fields,
1997 0 : &cancel_batcher,
1998 0 : &ctx,
1999 0 : protocol_version,
2000 0 : request_span.clone(),
2001 0 : )
2002 0 : .await;
2003 0 : let Some(read_res) = read_res.transpose() else {
2004 0 : debug!("client-initiated shutdown");
2005 0 : break;
2006 : };
2007 0 : exit |= read_res.is_err();
2008 0 : let could_send = batch_tx
2009 0 : .send(read_res, |batch, res| {
2010 0 : Self::pagestream_do_batch(batching_strategy, max_batch_size, batch, res)
2011 0 : })
2012 0 : .await;
2013 0 : exit |= could_send.is_err();
2014 : }
2015 0 : (pgb_reader, timeline_handles)
2016 0 : }
2017 0 : });
2018 :
2019 : //
2020 : // Executor
2021 : //
2022 :
2023 0 : let executor = pipeline_stage!("executor", self.cancel.clone(), move |cancel| {
2024 0 : let ctx = ctx.attached_child();
2025 0 : async move {
2026 0 : let _cancel_batcher = cancel_batcher.drop_guard();
2027 : loop {
2028 0 : let maybe_batch = batch_rx.recv().await;
2029 0 : let batch = match maybe_batch {
2030 0 : Ok(batch) => batch,
2031 : Err(spsc_fold::RecvError::SenderGone) => {
2032 0 : debug!("upstream gone");
2033 0 : return Ok(());
2034 : }
2035 : };
2036 0 : let batch = match batch {
2037 0 : Ok(batch) => batch,
2038 0 : Err(e) => {
2039 0 : return Err(e);
2040 : }
2041 : };
2042 0 : self.pagestream_handle_batched_message(
2043 0 : pgb_writer,
2044 0 : batch,
2045 0 : io_concurrency.clone(),
2046 0 : &cancel,
2047 0 : protocol_version,
2048 0 : &ctx,
2049 0 : )
2050 0 : .await?;
2051 : }
2052 0 : }
2053 0 : });
2054 :
2055 : //
2056 : // Execute the stages.
2057 : //
2058 :
2059 0 : match execution {
2060 : PageServiceProtocolPipelinedExecutionStrategy::ConcurrentFutures => {
2061 0 : tokio::join!(batcher, executor)
2062 : }
2063 : PageServiceProtocolPipelinedExecutionStrategy::Tasks => {
2064 : // These tasks are not tracked anywhere.
2065 0 : let read_messages_task = tokio::spawn(batcher);
2066 0 : let (read_messages_task_res, executor_res_) =
2067 0 : tokio::join!(read_messages_task, executor,);
2068 0 : (
2069 0 : read_messages_task_res.expect("propagated panic from read_messages"),
2070 0 : executor_res_,
2071 0 : )
2072 : }
2073 : }
2074 0 : }
2075 :
2076 : /// Helper function to handle the LSN from client request.
2077 : ///
2078 : /// Each GetPage (and Exists and Nblocks) request includes information about
2079 : /// which version of the page is being requested. The primary compute node
2080 : /// will always request the latest page version, by setting 'request_lsn' to
2081 : /// the last inserted or flushed WAL position, while a standby will request
2082 : /// a version at the LSN that it's currently caught up to.
2083 : ///
2084 : /// In either case, if the page server hasn't received the WAL up to the
2085 : /// requested LSN yet, we will wait for it to arrive. The return value is
2086 : /// the LSN that should be used to look up the page versions.
2087 : ///
2088 : /// In addition to the request LSN, each request carries another LSN,
2089 : /// 'not_modified_since', which is a hint to the pageserver that the client
2090 : /// knows that the page has not been modified between 'not_modified_since'
2091 : /// and the request LSN. This allows skipping the wait, as long as the WAL
2092 : /// up to 'not_modified_since' has arrived. If the client doesn't have any
2093 : /// information about when the page was modified, it will use
2094 : /// not_modified_since == request_lsn. If the client lies and sends a too-low
2095 : /// not_modified_since such that there are in fact later page versions, the
2096 : /// behavior is undefined: the pageserver may return any of the page versions
2097 : /// or an error.
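     : ///
     : /// Illustrative example (LSN values are hypothetical): with request_lsn = 0/5000 and
     : /// not_modified_since = 0/3000, if the last received WAL record is at 0/4000, no wait
     : /// is needed and the effective LSN is 0/4000. If WAL has only arrived up to 0/2000,
     : /// the pageserver first waits for WAL up to 0/3000 and then uses 0/3000.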
2098 0 : async fn wait_or_get_last_lsn(
2099 0 : timeline: &Timeline,
2100 0 : request_lsn: Lsn,
2101 0 : not_modified_since: Lsn,
2102 0 : latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
2103 0 : ctx: &RequestContext,
2104 0 : ) -> Result<Lsn, PageStreamError> {
2105 0 : let last_record_lsn = timeline.get_last_record_lsn();
2106 0 : let effective_request_lsn = Self::effective_request_lsn(
2107 0 : timeline,
2108 0 : last_record_lsn,
2109 0 : request_lsn,
2110 0 : not_modified_since,
2111 0 : latest_gc_cutoff_lsn,
2112 0 : )?;
2113 :
2114 0 : if effective_request_lsn > last_record_lsn {
2115 0 : timeline
2116 0 : .wait_lsn(
2117 0 : not_modified_since,
2118 0 : crate::tenant::timeline::WaitLsnWaiter::PageService,
2119 0 : timeline::WaitLsnTimeout::Default,
2120 0 : ctx,
2121 0 : )
2122 0 : .await?;
2123 :
2124 : // Since we waited for 'effective_request_lsn' to arrive, that is now the last
2125 : // record LSN. (Or close enough for our purposes; the last-record LSN can
2126 : // advance immediately after we return anyway)
2127 0 : }
2128 :
2129 0 : Ok(effective_request_lsn)
2130 0 : }
2131 :
2132 0 : fn effective_request_lsn(
2133 0 : timeline: &Timeline,
2134 0 : last_record_lsn: Lsn,
2135 0 : request_lsn: Lsn,
2136 0 : not_modified_since: Lsn,
2137 0 : latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
2138 0 : ) -> Result<Lsn, PageStreamError> {
2139 0 : // Sanity check the request
2140 0 : if request_lsn < not_modified_since {
2141 0 : return Err(PageStreamError::BadRequest(
2142 0 : format!(
2143 0 : "invalid request with request LSN {} and not_modified_since {}",
2144 0 : request_lsn, not_modified_since,
2145 0 : )
2146 0 : .into(),
2147 0 : ));
2148 0 : }
2149 0 :
2150 0 : // Check explicitly for INVALID just to get a less scary error message if the request is obviously bogus
2151 0 : if request_lsn == Lsn::INVALID {
2152 0 : return Err(PageStreamError::BadRequest(
2153 0 : "invalid LSN(0) in request".into(),
2154 0 : ));
2155 0 : }
2156 0 :
2157 0 : // Clients should only read from recent LSNs on their timeline, or from locations holding an LSN lease.
2158 0 : //
2159 0 : // We may have older data available, but we make a best effort to detect this case and return an error,
2160 0 : // to distinguish a misbehaving client (asking for old LSN) from a storage issue (data missing at a legitimate LSN).
2161 0 : if request_lsn < **latest_gc_cutoff_lsn && !timeline.is_gc_blocked_by_lsn_lease_deadline() {
2162 0 : let gc_info = &timeline.gc_info.read().unwrap();
2163 0 : if !gc_info.lsn_covered_by_lease(request_lsn) {
2164 0 : return Err(
2165 0 : PageStreamError::BadRequest(format!(
2166 0 : "tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
2167 0 : request_lsn, **latest_gc_cutoff_lsn
2168 0 : ).into())
2169 0 : );
2170 0 : }
2171 0 : }
2172 :
2173 0 : if not_modified_since > last_record_lsn {
2174 0 : Ok(not_modified_since)
2175 : } else {
2176 : // It might be better to use max(not_modified_since, latest_gc_cutoff_lsn)
2177 : // here instead. That would give the same result, since we know that there
2178 : // haven't been any modifications since 'not_modified_since'. Using an older
2179 : // LSN might be faster, because that could allow skipping recent layers when
2180 : // finding the page. However, we have historically used 'last_record_lsn', so
2181 : // stick to that for now.
2182 0 : Ok(std::cmp::min(last_record_lsn, request_lsn))
2183 : }
2184 0 : }
2185 :
2186 : /// Handles the lsn lease request.
2187 : /// If a lease cannot be obtained, the client will receive NULL.
2188 : #[instrument(skip_all, fields(shard_id, %lsn))]
2189 : async fn handle_make_lsn_lease<IO>(
2190 : &mut self,
2191 : pgb: &mut PostgresBackend<IO>,
2192 : tenant_shard_id: TenantShardId,
2193 : timeline_id: TimelineId,
2194 : lsn: Lsn,
2195 : ctx: &RequestContext,
2196 : ) -> Result<(), QueryError>
2197 : where
2198 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
2199 : {
2200 : let timeline = self
2201 : .timeline_handles
2202 : .as_mut()
2203 : .unwrap()
2204 : .get(
2205 : tenant_shard_id.tenant_id,
2206 : timeline_id,
2207 : ShardSelector::Known(tenant_shard_id.to_index()),
2208 : )
2209 : .await?;
2210 : set_tracing_field_shard_id(&timeline);
2211 :
2212 : let lease = timeline
2213 : .renew_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx)
2214 0 : .inspect_err(|e| {
2215 0 : warn!("{e}");
2216 0 : })
2217 : .ok();
2218 0 : let valid_until_str = lease.map(|l| {
2219 0 : l.valid_until
2220 0 : .duration_since(SystemTime::UNIX_EPOCH)
2221 0 : .expect("valid_until is earlier than UNIX_EPOCH")
2222 0 : .as_millis()
2223 0 : .to_string()
2224 0 : });
2225 :
2226 : info!(
2227 : "acquired lease for {} until {}",
2228 : lsn,
2229 : valid_until_str.as_deref().unwrap_or("<unknown>")
2230 : );
2231 :
2232 0 : let bytes = valid_until_str.as_ref().map(|x| x.as_bytes());
2233 :
2234 : pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col(
2235 : b"valid_until",
2236 : )]))?
2237 : .write_message_noflush(&BeMessage::DataRow(&[bytes]))?;
2238 :
2239 : Ok(())
2240 : }
2241 :
2242 : #[instrument(skip_all, fields(shard_id))]
2243 : async fn handle_get_rel_exists_request(
2244 : &mut self,
2245 : timeline: &Timeline,
2246 : req: &PagestreamExistsRequest,
2247 : ctx: &RequestContext,
2248 : ) -> Result<PagestreamBeMessage, PageStreamError> {
2249 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
2250 : let lsn = Self::wait_or_get_last_lsn(
2251 : timeline,
2252 : req.hdr.request_lsn,
2253 : req.hdr.not_modified_since,
2254 : &latest_gc_cutoff_lsn,
2255 : ctx,
2256 : )
2257 : .await?;
2258 :
2259 : let exists = timeline
2260 : .get_rel_exists(
2261 : req.rel,
2262 : Version::LsnRange(LsnRange {
2263 : effective_lsn: lsn,
2264 : request_lsn: req.hdr.request_lsn,
2265 : }),
2266 : ctx,
2267 : )
2268 : .await?;
2269 :
2270 : Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
2271 : req: *req,
2272 : exists,
2273 : }))
2274 : }
2275 :
2276 : #[instrument(skip_all, fields(shard_id))]
2277 : async fn handle_get_nblocks_request(
2278 : &mut self,
2279 : timeline: &Timeline,
2280 : req: &PagestreamNblocksRequest,
2281 : ctx: &RequestContext,
2282 : ) -> Result<PagestreamBeMessage, PageStreamError> {
2283 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
2284 : let lsn = Self::wait_or_get_last_lsn(
2285 : timeline,
2286 : req.hdr.request_lsn,
2287 : req.hdr.not_modified_since,
2288 : &latest_gc_cutoff_lsn,
2289 : ctx,
2290 : )
2291 : .await?;
2292 :
2293 : let n_blocks = timeline
2294 : .get_rel_size(
2295 : req.rel,
2296 : Version::LsnRange(LsnRange {
2297 : effective_lsn: lsn,
2298 : request_lsn: req.hdr.request_lsn,
2299 : }),
2300 : ctx,
2301 : )
2302 : .await?;
2303 :
2304 : Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
2305 : req: *req,
2306 : n_blocks,
2307 : }))
2308 : }
2309 :
2310 : #[instrument(skip_all, fields(shard_id))]
2311 : async fn handle_db_size_request(
2312 : &mut self,
2313 : timeline: &Timeline,
2314 : req: &PagestreamDbSizeRequest,
2315 : ctx: &RequestContext,
2316 : ) -> Result<PagestreamBeMessage, PageStreamError> {
2317 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
2318 : let lsn = Self::wait_or_get_last_lsn(
2319 : timeline,
2320 : req.hdr.request_lsn,
2321 : req.hdr.not_modified_since,
2322 : &latest_gc_cutoff_lsn,
2323 : ctx,
2324 : )
2325 : .await?;
2326 :
2327 : let total_blocks = timeline
2328 : .get_db_size(
2329 : DEFAULTTABLESPACE_OID,
2330 : req.dbnode,
2331 : Version::LsnRange(LsnRange {
2332 : effective_lsn: lsn,
2333 : request_lsn: req.hdr.request_lsn,
2334 : }),
2335 : ctx,
2336 : )
2337 : .await?;
2338 : let db_size = total_blocks as i64 * BLCKSZ as i64;
2339 :
2340 : Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse {
2341 : req: *req,
2342 : db_size,
2343 : }))
2344 : }
2345 :
2346 : #[instrument(skip_all)]
2347 : async fn handle_get_page_at_lsn_request_batched(
2348 : &mut self,
2349 : timeline: &Timeline,
2350 : requests: smallvec::SmallVec<[BatchedGetPageRequest; 1]>,
2351 : io_concurrency: IoConcurrency,
2352 : batch_break_reason: GetPageBatchBreakReason,
2353 : ctx: &RequestContext,
2354 : ) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>> {
2355 : debug_assert_current_span_has_tenant_and_timeline_id();
2356 :
2357 : timeline
2358 : .query_metrics
2359 : .observe_getpage_batch_start(requests.len(), batch_break_reason);
2360 :
2361 : // If a page trace is running, submit an event for this request.
2362 : if let Some(page_trace) = timeline.page_trace.load().as_ref() {
2363 : let time = SystemTime::now();
2364 : for batch in &requests {
2365 : let key = rel_block_to_key(batch.req.rel, batch.req.blkno).to_compact();
2366 : // Ignore error (trace buffer may be full or tracer may have disconnected).
2367 : _ = page_trace.try_send(PageTraceEvent {
2368 : key,
2369 : effective_lsn: batch.lsn_range.effective_lsn,
2370 : time,
2371 : });
2372 : }
2373 : }
2374 :
2375 : // If any request in the batch needs to wait for LSN, then do so now.
2376 : let mut perf_instrument = false;
2377 : let max_effective_lsn = requests
2378 : .iter()
2379 0 : .map(|req| {
2380 0 : if req.ctx.has_perf_span() {
2381 0 : perf_instrument = true;
2382 0 : }
2383 :
2384 0 : req.lsn_range.effective_lsn
2385 0 : })
2386 : .max()
2387 : .expect("batch is never empty");
2388 :
2389 : let ctx = match perf_instrument {
2390 : true => RequestContextBuilder::from(ctx)
2391 0 : .root_perf_span(|| {
2392 0 : info_span!(
2393 : target: PERF_TRACE_TARGET,
2394 : "GET_VECTORED",
2395 : tenant_id = %timeline.tenant_shard_id.tenant_id,
2396 : timeline_id = %timeline.timeline_id,
2397 0 : shard = %timeline.tenant_shard_id.shard_slug(),
2398 : %max_effective_lsn
2399 : )
2400 0 : })
2401 : .attached_child(),
2402 : false => ctx.attached_child(),
2403 : };
2404 :
2405 : let last_record_lsn = timeline.get_last_record_lsn();
2406 : if max_effective_lsn > last_record_lsn {
2407 : if let Err(e) = timeline
2408 : .wait_lsn(
2409 : max_effective_lsn,
2410 : crate::tenant::timeline::WaitLsnWaiter::PageService,
2411 : timeline::WaitLsnTimeout::Default,
2412 : &ctx,
2413 : )
2414 0 : .maybe_perf_instrument(&ctx, |current_perf_span| {
2415 0 : info_span!(
2416 : target: PERF_TRACE_TARGET,
2417 0 : parent: current_perf_span,
2418 : "WAIT_LSN",
2419 : )
2420 0 : })
2421 : .await
2422 : {
2423 0 : return Vec::from_iter(requests.into_iter().map(|req| {
2424 0 : Err(BatchedPageStreamError {
2425 0 : err: PageStreamError::from(e.clone()),
2426 0 : req: req.req.hdr,
2427 0 : })
2428 0 : }));
2429 : }
2430 : }
2431 :
2432 : let results = timeline
2433 : .get_rel_page_at_lsn_batched(
2434 0 : requests.iter().map(|p| {
2435 0 : (
2436 0 : &p.req.rel,
2437 0 : &p.req.blkno,
2438 0 : p.lsn_range,
2439 0 : p.ctx.attached_child(),
2440 0 : )
2441 0 : }),
2442 : io_concurrency,
2443 : &ctx,
2444 : )
2445 : .await;
2446 : assert_eq!(results.len(), requests.len());
2447 :
2448 : // TODO: avoid creating the new Vec here
2449 : Vec::from_iter(
2450 : requests
2451 : .into_iter()
2452 : .zip(results.into_iter())
2453 0 : .map(|(req, res)| {
2454 0 : res.map(|page| {
2455 0 : (
2456 0 : PagestreamBeMessage::GetPage(models::PagestreamGetPageResponse {
2457 0 : req: req.req,
2458 0 : page,
2459 0 : }),
2460 0 : req.timer,
2461 0 : )
2462 0 : })
2463 0 : .map_err(|e| BatchedPageStreamError {
2464 0 : err: PageStreamError::from(e),
2465 0 : req: req.req.hdr,
2466 0 : })
2467 0 : }),
2468 : )
2469 : }
2470 :
2471 : #[instrument(skip_all, fields(shard_id))]
2472 : async fn handle_get_slru_segment_request(
2473 : &mut self,
2474 : timeline: &Timeline,
2475 : req: &PagestreamGetSlruSegmentRequest,
2476 : ctx: &RequestContext,
2477 : ) -> Result<PagestreamBeMessage, PageStreamError> {
2478 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
2479 : let lsn = Self::wait_or_get_last_lsn(
2480 : timeline,
2481 : req.hdr.request_lsn,
2482 : req.hdr.not_modified_since,
2483 : &latest_gc_cutoff_lsn,
2484 : ctx,
2485 : )
2486 : .await?;
2487 :
2488 : let kind = SlruKind::from_repr(req.kind)
2489 : .ok_or(PageStreamError::BadRequest("invalid SLRU kind".into()))?;
2490 : let segment = timeline.get_slru_segment(kind, req.segno, lsn, ctx).await?;
2491 :
2492 : Ok(PagestreamBeMessage::GetSlruSegment(
2493 : PagestreamGetSlruSegmentResponse { req: *req, segment },
2494 : ))
2495 : }
2496 :
2497 : // NB: this impl mimics what we do for batched getpage requests.
2498 : #[cfg(feature = "testing")]
2499 : #[instrument(skip_all, fields(shard_id))]
2500 : async fn handle_test_request_batch(
2501 : &mut self,
2502 : timeline: &Timeline,
2503 : requests: Vec<BatchedTestRequest>,
2504 : _ctx: &RequestContext,
2505 : ) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>> {
2506 : // real requests would do something with the timeline
2507 : let mut results = Vec::with_capacity(requests.len());
2508 : for _req in requests.iter() {
2509 : tokio::task::yield_now().await;
2510 :
2511 : results.push({
2512 : if timeline.cancel.is_cancelled() {
2513 : Err(PageReconstructError::Cancelled)
2514 : } else {
2515 : Ok(())
2516 : }
2517 : });
2518 : }
2519 :
2520 : // TODO: avoid creating the new Vec here
2521 : Vec::from_iter(
2522 : requests
2523 : .into_iter()
2524 : .zip(results.into_iter())
2525 0 : .map(|(req, res)| {
2526 0 : res.map(|()| {
2527 0 : (
2528 0 : PagestreamBeMessage::Test(models::PagestreamTestResponse {
2529 0 : req: req.req.clone(),
2530 0 : }),
2531 0 : req.timer,
2532 0 : )
2533 0 : })
2534 0 : .map_err(|e| BatchedPageStreamError {
2535 0 : err: PageStreamError::from(e),
2536 0 : req: req.req.hdr,
2537 0 : })
2538 0 : }),
2539 : )
2540 : }
2541 :
2542 : /// Note on "fullbackup":
2543 : /// Full basebackups should only be used for debugging purposes.
2544 : /// Originally, it was introduced to enable breaking storage format changes,
2545 : /// but that is not applicable anymore.
2546 : ///
2547 : /// # Coding Discipline
2548 : ///
2549 : /// Coding discipline within this function: all interaction with the `pgb` connection
2550 : /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
2551 : /// This is so that we can shutdown page_service quickly.
2552 : ///
2553 : /// TODO: wrap the pgb that we pass to the basebackup handler so that it's sensitive
2554 : /// to connection cancellation.
2555 : #[allow(clippy::too_many_arguments)]
2556 : #[instrument(skip_all, fields(shard_id, ?lsn, ?prev_lsn, %full_backup))]
2557 : async fn handle_basebackup_request<IO>(
2558 : &mut self,
2559 : pgb: &mut PostgresBackend<IO>,
2560 : tenant_id: TenantId,
2561 : timeline_id: TimelineId,
2562 : lsn: Option<Lsn>,
2563 : prev_lsn: Option<Lsn>,
2564 : full_backup: bool,
2565 : gzip: bool,
2566 : replica: bool,
2567 : ctx: &RequestContext,
2568 : ) -> Result<(), QueryError>
2569 : where
2570 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
2571 : {
2572 0 : fn map_basebackup_error(err: BasebackupError) -> QueryError {
2573 0 : match err {
2574 : // TODO: passthrough the error site to the final error message?
2575 0 : BasebackupError::Client(e, _) => QueryError::Disconnected(ConnectionError::Io(e)),
2576 0 : BasebackupError::Server(e) => QueryError::Other(e),
2577 0 : BasebackupError::Shutdown => QueryError::Shutdown,
2578 : }
2579 0 : }
2580 :
2581 : let started = std::time::Instant::now();
2582 :
2583 : let timeline = self
2584 : .timeline_handles
2585 : .as_mut()
2586 : .unwrap()
2587 : .get(tenant_id, timeline_id, ShardSelector::Zero)
2588 : .await?;
2589 : set_tracing_field_shard_id(&timeline);
2590 : let ctx = ctx.with_scope_timeline(&timeline);
2591 :
2592 : if timeline.is_archived() == Some(true) {
2593 : tracing::info!(
2594 : "timeline {tenant_id}/{timeline_id} is archived, but got basebackup request for it."
2595 : );
2596 : return Err(QueryError::NotFound("timeline is archived".into()));
2597 : }
2598 :
2599 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
2600 : if let Some(lsn) = lsn {
2601 : // Backup was requested at a particular LSN. Wait for it to arrive.
2602 : info!("waiting for {}", lsn);
2603 : timeline
2604 : .wait_lsn(
2605 : lsn,
2606 : crate::tenant::timeline::WaitLsnWaiter::PageService,
2607 : crate::tenant::timeline::WaitLsnTimeout::Default,
2608 : &ctx,
2609 : )
2610 : .await?;
2611 : timeline
2612 : .check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
2613 : .context("invalid basebackup lsn")?;
2614 : }
2615 :
2616 : let lsn_awaited_after = started.elapsed();
2617 :
2618 : // switch client to COPYOUT
2619 : pgb.write_message_noflush(&BeMessage::CopyOutResponse)
2620 : .map_err(QueryError::Disconnected)?;
2621 : self.flush_cancellable(pgb, &self.cancel).await?;
2622 :
2623 : let mut from_cache = false;
2624 :
2625 : // Send a tarball of the latest layer on the timeline. Compress if not
2626 : // fullbackup. TODO Compress in that case too (tests need to be updated)
2627 : if full_backup {
2628 : let mut writer = pgb.copyout_writer();
2629 : basebackup::send_basebackup_tarball(
2630 : &mut writer,
2631 : &timeline,
2632 : lsn,
2633 : prev_lsn,
2634 : full_backup,
2635 : replica,
2636 : &ctx,
2637 : )
2638 : .await
2639 : .map_err(map_basebackup_error)?;
2640 : } else {
2641 : let mut writer = BufWriter::new(pgb.copyout_writer());
2642 :
2643 : let cached = {
2644 : // Basebackup is cached only for this combination of parameters.
2645 : if timeline.is_basebackup_cache_enabled()
2646 : && gzip
2647 : && lsn.is_some()
2648 : && prev_lsn.is_none()
2649 : {
2650 : self.basebackup_cache
2651 : .get(tenant_id, timeline_id, lsn.unwrap())
2652 : .await
2653 : } else {
2654 : None
2655 : }
2656 : };
2657 :
2658 : if let Some(mut cached) = cached {
2659 : from_cache = true;
2660 : tokio::io::copy(&mut cached, &mut writer)
2661 : .await
2662 0 : .map_err(|e| {
2663 0 : map_basebackup_error(BasebackupError::Client(
2664 0 : e,
2665 0 : "handle_basebackup_request,cached,copy",
2666 0 : ))
2667 0 : })?;
2668 : } else if gzip {
2669 : let mut encoder = GzipEncoder::with_quality(
2670 : &mut writer,
2671 : // NOTE using fast compression because it's on the critical path
2672 : // for compute startup. For an empty database, we get
2673 : // <100KB with this method. The Level::Best compression method
2674 : // gives us <20KB, but maybe we should add basebackup caching
2675 : // on compute shutdown first.
2676 : async_compression::Level::Fastest,
2677 : );
2678 : basebackup::send_basebackup_tarball(
2679 : &mut encoder,
2680 : &timeline,
2681 : lsn,
2682 : prev_lsn,
2683 : full_backup,
2684 : replica,
2685 : &ctx,
2686 : )
2687 : .await
2688 : .map_err(map_basebackup_error)?;
2689 : // shutdown the encoder to ensure the gzip footer is written
2690 : encoder
2691 : .shutdown()
2692 : .await
2693 0 : .map_err(|e| QueryError::Disconnected(ConnectionError::Io(e)))?;
2694 : } else {
2695 : basebackup::send_basebackup_tarball(
2696 : &mut writer,
2697 : &timeline,
2698 : lsn,
2699 : prev_lsn,
2700 : full_backup,
2701 : replica,
2702 : &ctx,
2703 : )
2704 : .await
2705 : .map_err(map_basebackup_error)?;
2706 : }
2707 0 : writer.flush().await.map_err(|e| {
2708 0 : map_basebackup_error(BasebackupError::Client(
2709 0 : e,
2710 0 : "handle_basebackup_request,flush",
2711 0 : ))
2712 0 : })?;
2713 : }
2714 :
2715 : pgb.write_message_noflush(&BeMessage::CopyDone)
2716 : .map_err(QueryError::Disconnected)?;
2717 : self.flush_cancellable(pgb, &timeline.cancel).await?;
2718 :
2719 : let basebackup_after = started
2720 : .elapsed()
2721 : .checked_sub(lsn_awaited_after)
2722 : .unwrap_or(Duration::ZERO);
2723 :
2724 : info!(
2725 : lsn_await_millis = lsn_awaited_after.as_millis(),
2726 : basebackup_millis = basebackup_after.as_millis(),
2727 : %from_cache,
2728 : "basebackup complete"
2729 : );
2730 :
2731 : Ok(())
2732 : }
2733 :
2734 : // When accessing the management API, supply None as the argument.
2735 : // When authorizing access to a tenant, pass the corresponding tenant id.
2736 0 : fn check_permission(&self, tenant_id: Option<TenantId>) -> Result<(), QueryError> {
2737 0 : if self.auth.is_none() {
2738 : // auth is set to Trust, nothing to check so just return ok
2739 0 : return Ok(());
2740 0 : }
2741 0 : // auth is Some, as just checked above. When auth is Some,
2742 0 : // claims are always present because of checks during connection init,
2743 0 : // so this expect won't trigger
2744 0 : let claims = self
2745 0 : .claims
2746 0 : .as_ref()
2747 0 : .expect("claims presence already checked");
2748 0 : check_permission(claims, tenant_id).map_err(|e| QueryError::Unauthorized(e.0))
2749 0 : }
2750 : }
2751 :
2752 : /// `basebackup tenant timeline [lsn] [--gzip] [--replica]`
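     : ///
     : /// For example (IDs elided): `basebackup <tenant_id> <timeline_id> latest --gzip`
     : /// requests a gzipped basebackup at the latest LSN.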
2753 : #[derive(Debug, Clone, Eq, PartialEq)]
2754 : struct BaseBackupCmd {
2755 : tenant_id: TenantId,
2756 : timeline_id: TimelineId,
2757 : lsn: Option<Lsn>,
2758 : gzip: bool,
2759 : replica: bool,
2760 : }
2761 :
2762 : /// `fullbackup tenant timeline [lsn] [prev_lsn]`
2763 : #[derive(Debug, Clone, Eq, PartialEq)]
2764 : struct FullBackupCmd {
2765 : tenant_id: TenantId,
2766 : timeline_id: TimelineId,
2767 : lsn: Option<Lsn>,
2768 : prev_lsn: Option<Lsn>,
2769 : }
2770 :
2771 : /// `pagestream_v2 tenant timeline` or `pagestream_v3 tenant timeline`
2772 : #[derive(Debug, Clone, Eq, PartialEq)]
2773 : struct PageStreamCmd {
2774 : tenant_id: TenantId,
2775 : timeline_id: TimelineId,
2776 : protocol_version: PagestreamProtocolVersion,
2777 : }
2778 :
2779 : /// `lease lsn tenant timeline lsn`
2780 : #[derive(Debug, Clone, Eq, PartialEq)]
2781 : struct LeaseLsnCmd {
2782 : tenant_shard_id: TenantShardId,
2783 : timeline_id: TimelineId,
2784 : lsn: Lsn,
2785 : }
2786 :
2787 : #[derive(Debug, Clone, Eq, PartialEq)]
2788 : enum PageServiceCmd {
2789 : Set,
2790 : PageStream(PageStreamCmd),
2791 : BaseBackup(BaseBackupCmd),
2792 : FullBackup(FullBackupCmd),
2793 : LeaseLsn(LeaseLsnCmd),
2794 : }
2795 :
2796 : impl PageStreamCmd {
2797 3 : fn parse(query: &str, protocol_version: PagestreamProtocolVersion) -> anyhow::Result<Self> {
2798 3 : let parameters = query.split_whitespace().collect_vec();
2799 3 : if parameters.len() != 2 {
2800 1 : bail!(
2801 1 : "invalid number of parameters for pagestream command: {}",
2802 1 : query
2803 1 : );
2804 2 : }
2805 2 : let tenant_id = TenantId::from_str(parameters[0])
2806 2 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2807 1 : let timeline_id = TimelineId::from_str(parameters[1])
2808 1 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2809 1 : Ok(Self {
2810 1 : tenant_id,
2811 1 : timeline_id,
2812 1 : protocol_version,
2813 1 : })
2814 3 : }
2815 : }
2816 :
2817 : impl FullBackupCmd {
2818 2 : fn parse(query: &str) -> anyhow::Result<Self> {
2819 2 : let parameters = query.split_whitespace().collect_vec();
2820 2 : if parameters.len() < 2 || parameters.len() > 4 {
2821 0 : bail!(
2822 0 : "invalid number of parameters for basebackup command: {}",
2823 0 : query
2824 0 : );
2825 2 : }
2826 2 : let tenant_id = TenantId::from_str(parameters[0])
2827 2 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2828 2 : let timeline_id = TimelineId::from_str(parameters[1])
2829 2 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2830 : // The caller is responsible for providing correct lsn and prev_lsn.
2831 2 : let lsn = if let Some(lsn_str) = parameters.get(2) {
2832 : Some(
2833 1 : Lsn::from_str(lsn_str)
2834 1 : .with_context(|| format!("Failed to parse Lsn from {lsn_str}"))?,
2835 : )
2836 : } else {
2837 1 : None
2838 : };
2839 2 : let prev_lsn = if let Some(prev_lsn_str) = parameters.get(3) {
2840 : Some(
2841 1 : Lsn::from_str(prev_lsn_str)
2842 1 : .with_context(|| format!("Failed to parse Lsn from {prev_lsn_str}"))?,
2843 : )
2844 : } else {
2845 1 : None
2846 : };
2847 2 : Ok(Self {
2848 2 : tenant_id,
2849 2 : timeline_id,
2850 2 : lsn,
2851 2 : prev_lsn,
2852 2 : })
2853 2 : }
2854 : }
2855 :
2856 : impl BaseBackupCmd {
2857 9 : fn parse(query: &str) -> anyhow::Result<Self> {
2858 9 : let parameters = query.split_whitespace().collect_vec();
2859 9 : if parameters.len() < 2 {
2860 0 : bail!(
2861 0 : "invalid number of parameters for basebackup command: {}",
2862 0 : query
2863 0 : );
2864 9 : }
2865 9 : let tenant_id = TenantId::from_str(parameters[0])
2866 9 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2867 9 : let timeline_id = TimelineId::from_str(parameters[1])
2868 9 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2869 : let lsn;
2870 : let flags_parse_from;
2871 9 : if let Some(maybe_lsn) = parameters.get(2) {
2872 8 : if *maybe_lsn == "latest" {
2873 1 : lsn = None;
2874 1 : flags_parse_from = 3;
2875 7 : } else if maybe_lsn.starts_with("--") {
2876 5 : lsn = None;
2877 5 : flags_parse_from = 2;
2878 5 : } else {
2879 : lsn = Some(
2880 2 : Lsn::from_str(maybe_lsn)
2881 2 : .with_context(|| format!("Failed to parse lsn from {maybe_lsn}"))?,
2882 : );
2883 2 : flags_parse_from = 3;
2884 : }
2885 1 : } else {
2886 1 : lsn = None;
2887 1 : flags_parse_from = 2;
2888 1 : }
2889 :
2890 9 : let mut gzip = false;
2891 9 : let mut replica = false;
2892 :
2893 11 : for &param in &parameters[flags_parse_from..] {
2894 11 : match param {
2895 11 : "--gzip" => {
2896 7 : if gzip {
2897 1 : bail!("duplicate parameter for basebackup command: {param}")
2898 6 : }
2899 6 : gzip = true
2900 : }
2901 4 : "--replica" => {
2902 2 : if replica {
2903 0 : bail!("duplicate parameter for basebackup command: {param}")
2904 2 : }
2905 2 : replica = true
2906 : }
2907 2 : _ => bail!("invalid parameter for basebackup command: {param}"),
2908 : }
2909 : }
2910 6 : Ok(Self {
2911 6 : tenant_id,
2912 6 : timeline_id,
2913 6 : lsn,
2914 6 : gzip,
2915 6 : replica,
2916 6 : })
2917 9 : }
2918 : }
2919 :
2920 : impl LeaseLsnCmd {
2921 2 : fn parse(query: &str) -> anyhow::Result<Self> {
2922 2 : let parameters = query.split_whitespace().collect_vec();
2923 2 : if parameters.len() != 3 {
2924 0 : bail!(
2925 0 : "invalid number of parameters for lease lsn command: {}",
2926 0 : query
2927 0 : );
2928 2 : }
2929 2 : let tenant_shard_id = TenantShardId::from_str(parameters[0])
2930 2 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2931 2 : let timeline_id = TimelineId::from_str(parameters[1])
2932 2 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2933 2 : let lsn = Lsn::from_str(parameters[2])
2934 2 : .with_context(|| format!("Failed to parse lsn from {}", parameters[2]))?;
2935 2 : Ok(Self {
2936 2 : tenant_shard_id,
2937 2 : timeline_id,
2938 2 : lsn,
2939 2 : })
2940 2 : }
2941 : }
2942 :
2943 : impl PageServiceCmd {
2944 21 : fn parse(query: &str) -> anyhow::Result<Self> {
2945 21 : let query = query.trim();
2946 21 : let Some((cmd, other)) = query.split_once(' ') else {
2947 2 : bail!("cannot parse query: {query}")
2948 : };
2949 19 : match cmd.to_ascii_lowercase().as_str() {
2950 19 : "pagestream_v2" => Ok(Self::PageStream(PageStreamCmd::parse(
2951 3 : other,
2952 3 : PagestreamProtocolVersion::V2,
2953 3 : )?)),
2954 16 : "pagestream_v3" => Ok(Self::PageStream(PageStreamCmd::parse(
2955 0 : other,
2956 0 : PagestreamProtocolVersion::V3,
2957 0 : )?)),
2958 16 : "basebackup" => Ok(Self::BaseBackup(BaseBackupCmd::parse(other)?)),
2959 7 : "fullbackup" => Ok(Self::FullBackup(FullBackupCmd::parse(other)?)),
2960 5 : "lease" => {
2961 3 : let Some((cmd2, other)) = other.split_once(' ') else {
2962 0 : bail!("invalid lease command: {cmd}");
2963 : };
2964 3 : let cmd2 = cmd2.to_ascii_lowercase();
2965 3 : if cmd2 == "lsn" {
2966 2 : Ok(Self::LeaseLsn(LeaseLsnCmd::parse(other)?))
2967 : } else {
2968 1 : bail!("invalid lease command: {cmd}");
2969 : }
2970 : }
2971 2 : "set" => Ok(Self::Set),
2972 0 : _ => Err(anyhow::anyhow!("unsupported command {cmd} in {query}")),
2973 : }
2974 21 : }
2975 : }
2976 :
2977 : /// Parse the startup options from the postgres wire protocol startup packet.
2978 : ///
2979 : /// It takes a sequence of `-c option=X` or `-coption=X`. It parses the options string
2980 : /// on a best-effort basis and returns the options parsed so far (key-value pairs) and a bool
2981 : /// indicating whether a parse error was encountered. There could be duplicates in the options
2982 : /// if the caller passed such parameters.
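     : ///
     : /// For example (illustrative values), `-c neon.compute_mode=primary -ctimezone=UTC`
     : /// parses to `[("neon.compute_mode", "primary"), ("timezone", "UTC")]` with the error
     : /// flag unset.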
2983 7 : fn parse_options(options: &str) -> (Vec<(String, String)>, bool) {
2984 7 : let mut parsing_config = false;
2985 7 : let mut has_error = false;
2986 7 : let mut config = Vec::new();
2987 16 : for item in options.split_whitespace() {
2988 16 : if item == "-c" {
2989 9 : if !parsing_config {
2990 8 : parsing_config = true;
2991 8 : } else {
2992 : // "-c" followed with another "-c"
2993 1 : tracing::warn!("failed to parse the startup options: {options}");
2994 1 : has_error = true;
2995 1 : break;
2996 : }
2997 7 : } else if item.starts_with("-c") || parsing_config {
2998 7 : let Some((mut key, value)) = item.split_once('=') else {
2999 : // "-c" followed with an invalid option
3000 1 : tracing::warn!("failed to parse the startup options: {options}");
3001 1 : has_error = true;
3002 1 : break;
3003 : };
3004 6 : if !parsing_config {
3005 : // Parse "-coptions=X"
3006 1 : let Some(stripped_key) = key.strip_prefix("-c") else {
3007 0 : tracing::warn!("failed to parse the startup options: {options}");
3008 0 : has_error = true;
3009 0 : break;
3010 : };
3011 1 : key = stripped_key;
3012 5 : }
3013 6 : config.push((key.to_string(), value.to_string()));
3014 6 : parsing_config = false;
3015 : } else {
3016 0 : tracing::warn!("failed to parse the startup options: {options}");
3017 0 : has_error = true;
3018 0 : break;
3019 : }
3020 : }
3021 7 : if parsing_config {
3022 : // "-c" without the option
3023 3 : tracing::warn!("failed to parse the startup options: {options}");
3024 3 : has_error = true;
3025 4 : }
3026 7 : (config, has_error)
3027 7 : }
3028 :
3029 : impl<IO> postgres_backend::Handler<IO> for PageServerHandler
3030 : where
3031 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
3032 : {
3033 0 : fn check_auth_jwt(
3034 0 : &mut self,
3035 0 : _pgb: &mut PostgresBackend<IO>,
3036 0 : jwt_response: &[u8],
3037 0 : ) -> Result<(), QueryError> {
3038 : // this unwrap is never triggered, because check_auth_jwt is only called when auth_type is NeonJWT
3039 : // which requires auth to be present
3040 0 : let data: TokenData<Claims> = self
3041 0 : .auth
3042 0 : .as_ref()
3043 0 : .unwrap()
3044 0 : .decode(str::from_utf8(jwt_response).context("jwt response is not UTF-8")?)
3045 0 : .map_err(|e| QueryError::Unauthorized(e.0))?;
3046 :
3047 0 : if matches!(data.claims.scope, Scope::Tenant) && data.claims.tenant_id.is_none() {
3048 0 : return Err(QueryError::Unauthorized(
3049 0 : "jwt token scope is Tenant, but tenant id is missing".into(),
3050 0 : ));
3051 0 : }
3052 0 :
3053 0 : debug!(
3054 0 : "jwt scope check succeeded for scope: {:#?} by tenant id: {:?}",
3055 : data.claims.scope, data.claims.tenant_id,
3056 : );
3057 :
3058 0 : self.claims = Some(data.claims);
3059 0 : Ok(())
3060 0 : }
3061 :
3062 0 : fn startup(
3063 0 : &mut self,
3064 0 : _pgb: &mut PostgresBackend<IO>,
3065 0 : sm: &FeStartupPacket,
3066 0 : ) -> Result<(), QueryError> {
3067 0 : fail::fail_point!("ps::connection-start::startup-packet");
3068 :
3069 0 : if let FeStartupPacket::StartupMessage { params, .. } = sm {
3070 0 : if let Some(app_name) = params.get("application_name") {
3071 0 : self.perf_span_fields.application_name = Some(app_name.to_string());
3072 0 : Span::current().record("application_name", field::display(app_name));
3073 0 : }
3074 0 : if let Some(options) = params.get("options") {
3075 0 : let (config, _) = parse_options(options);
3076 0 : for (key, value) in config {
3077 0 : if key == "neon.compute_mode" {
3078 0 : self.perf_span_fields.compute_mode = Some(value.clone());
3079 0 : Span::current().record("compute_mode", field::display(value));
3080 0 : }
3081 : }
3082 0 : }
3083 0 : };
3084 :
3085 0 : Ok(())
3086 0 : }
3087 :
3088 : #[instrument(skip_all, fields(tenant_id, timeline_id))]
3089 : async fn process_query(
3090 : &mut self,
3091 : pgb: &mut PostgresBackend<IO>,
3092 : query_string: &str,
3093 : ) -> Result<(), QueryError> {
3094 0 : fail::fail_point!("simulated-bad-compute-connection", |_| {
3095 0 : info!("Hit failpoint for bad connection");
3096 0 : Err(QueryError::SimulatedConnectionError)
3097 0 : });
3098 :
3099 : fail::fail_point!("ps::connection-start::process-query");
3100 :
3101 : let ctx = self.connection_ctx.attached_child();
3102 : debug!("process query {query_string}");
3103 : let query = PageServiceCmd::parse(query_string)?;
3104 : match query {
3105 : PageServiceCmd::PageStream(PageStreamCmd {
3106 : tenant_id,
3107 : timeline_id,
3108 : protocol_version,
3109 : }) => {
3110 : tracing::Span::current()
3111 : .record("tenant_id", field::display(tenant_id))
3112 : .record("timeline_id", field::display(timeline_id));
3113 :
3114 : self.check_permission(Some(tenant_id))?;
3115 : let command_kind = match protocol_version {
3116 : PagestreamProtocolVersion::V2 => ComputeCommandKind::PageStreamV2,
3117 : PagestreamProtocolVersion::V3 => ComputeCommandKind::PageStreamV3,
3118 : };
3119 : COMPUTE_COMMANDS_COUNTERS.for_command(command_kind).inc();
3120 :
3121 : self.handle_pagerequests(pgb, tenant_id, timeline_id, protocol_version, ctx)
3122 : .await?;
3123 : }
3124 : PageServiceCmd::BaseBackup(BaseBackupCmd {
3125 : tenant_id,
3126 : timeline_id,
3127 : lsn,
3128 : gzip,
3129 : replica,
3130 : }) => {
3131 : tracing::Span::current()
3132 : .record("tenant_id", field::display(tenant_id))
3133 : .record("timeline_id", field::display(timeline_id));
3134 :
3135 : self.check_permission(Some(tenant_id))?;
3136 :
3137 : COMPUTE_COMMANDS_COUNTERS
3138 : .for_command(ComputeCommandKind::Basebackup)
3139 : .inc();
3140 : let metric_recording = metrics::BASEBACKUP_QUERY_TIME.start_recording();
3141 0 : let res = async {
3142 0 : self.handle_basebackup_request(
3143 0 : pgb,
3144 0 : tenant_id,
3145 0 : timeline_id,
3146 0 : lsn,
3147 0 : None,
3148 0 : false,
3149 0 : gzip,
3150 0 : replica,
3151 0 : &ctx,
3152 0 : )
3153 0 : .await?;
3154 0 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
3155 0 : Result::<(), QueryError>::Ok(())
3156 0 : }
3157 : .await;
3158 : metric_recording.observe(&res);
3159 : res?;
3160 : }
3161 : // same as basebackup, but result includes relational data as well
3162 : PageServiceCmd::FullBackup(FullBackupCmd {
3163 : tenant_id,
3164 : timeline_id,
3165 : lsn,
3166 : prev_lsn,
3167 : }) => {
3168 : tracing::Span::current()
3169 : .record("tenant_id", field::display(tenant_id))
3170 : .record("timeline_id", field::display(timeline_id));
3171 :
3172 : self.check_permission(Some(tenant_id))?;
3173 :
3174 : COMPUTE_COMMANDS_COUNTERS
3175 : .for_command(ComputeCommandKind::Fullbackup)
3176 : .inc();
3177 :
3178 : // Check that the timeline exists
3179 : self.handle_basebackup_request(
3180 : pgb,
3181 : tenant_id,
3182 : timeline_id,
3183 : lsn,
3184 : prev_lsn,
3185 : true,
3186 : false,
3187 : false,
3188 : &ctx,
3189 : )
3190 : .await?;
3191 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
3192 : }
3193 : PageServiceCmd::Set => {
3194 : // important because psycopg2 executes "SET datestyle TO 'ISO'"
3195 : // on connect
3196 : // TODO: allow setting options, i.e., application_name/compute_mode via SET commands
3197 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
3198 : }
3199 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
3200 : tenant_shard_id,
3201 : timeline_id,
3202 : lsn,
3203 : }) => {
3204 : tracing::Span::current()
3205 : .record("tenant_id", field::display(tenant_shard_id))
3206 : .record("timeline_id", field::display(timeline_id));
3207 :
3208 : self.check_permission(Some(tenant_shard_id.tenant_id))?;
3209 :
3210 : COMPUTE_COMMANDS_COUNTERS
3211 : .for_command(ComputeCommandKind::LeaseLsn)
3212 : .inc();
3213 :
3214 : match self
3215 : .handle_make_lsn_lease(pgb, tenant_shard_id, timeline_id, lsn, &ctx)
3216 : .await
3217 : {
3218 : Ok(()) => {
3219 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?
3220 : }
3221 : Err(e) => {
3222 : error!("error obtaining lsn lease for {lsn}: {e:?}");
3223 : pgb.write_message_noflush(&BeMessage::ErrorResponse(
3224 : &e.to_string(),
3225 : Some(e.pg_error_code()),
3226 : ))?
3227 : }
3228 : };
3229 : }
3230 : }
3231 :
3232 : Ok(())
3233 : }
3234 : }
3235 :
3236 : /// Implements the page service over gRPC.
3237 : ///
3238 : /// TODO: not yet implemented, all methods return unimplemented.
3239 : #[tonic::async_trait]
3240 : impl proto::PageService for PageServerHandler {
3241 : type GetBaseBackupStream = Pin<
3242 : Box<dyn Stream<Item = Result<proto::GetBaseBackupResponseChunk, tonic::Status>> + Send>,
3243 : >;
3244 : type GetPagesStream =
3245 : Pin<Box<dyn Stream<Item = Result<proto::GetPageResponse, tonic::Status>> + Send>>;
3246 :
3247 0 : async fn check_rel_exists(
3248 0 : &self,
3249 0 : _: tonic::Request<proto::CheckRelExistsRequest>,
3250 0 : ) -> Result<tonic::Response<proto::CheckRelExistsResponse>, tonic::Status> {
3251 0 : Err(tonic::Status::unimplemented("not implemented"))
3252 0 : }
3253 :
3254 0 : async fn get_base_backup(
3255 0 : &self,
3256 0 : _: tonic::Request<proto::GetBaseBackupRequest>,
3257 0 : ) -> Result<tonic::Response<Self::GetBaseBackupStream>, tonic::Status> {
3258 0 : Err(tonic::Status::unimplemented("not implemented"))
3259 0 : }
3260 :
3261 0 : async fn get_db_size(
3262 0 : &self,
3263 0 : _: tonic::Request<proto::GetDbSizeRequest>,
3264 0 : ) -> Result<tonic::Response<proto::GetDbSizeResponse>, tonic::Status> {
3265 0 : Err(tonic::Status::unimplemented("not implemented"))
3266 0 : }
3267 :
3268 0 : async fn get_pages(
3269 0 : &self,
3270 0 : _: tonic::Request<tonic::Streaming<proto::GetPageRequest>>,
3271 0 : ) -> Result<tonic::Response<Self::GetPagesStream>, tonic::Status> {
3272 0 : Err(tonic::Status::unimplemented("not implemented"))
3273 0 : }
3274 :
3275 0 : async fn get_rel_size(
3276 0 : &self,
3277 0 : _: tonic::Request<proto::GetRelSizeRequest>,
3278 0 : ) -> Result<tonic::Response<proto::GetRelSizeResponse>, tonic::Status> {
3279 0 : Err(tonic::Status::unimplemented("not implemented"))
3280 0 : }
3281 :
3282 0 : async fn get_slru_segment(
3283 0 : &self,
3284 0 : _: tonic::Request<proto::GetSlruSegmentRequest>,
3285 0 : ) -> Result<tonic::Response<proto::GetSlruSegmentResponse>, tonic::Status> {
3286 0 : Err(tonic::Status::unimplemented("not implemented"))
3287 0 : }
3288 : }
3289 :
3290 : impl From<GetActiveTenantError> for QueryError {
3291 0 : fn from(e: GetActiveTenantError) -> Self {
3292 0 : match e {
3293 0 : GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected(
3294 0 : ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
3295 0 : ),
3296 : GetActiveTenantError::Cancelled
3297 : | GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => {
3298 0 : QueryError::Shutdown
3299 : }
3300 0 : e @ GetActiveTenantError::NotFound(_) => QueryError::NotFound(format!("{e}").into()),
3301 0 : e => QueryError::Other(anyhow::anyhow!(e)),
3302 : }
3303 0 : }
3304 : }
3305 :
3306 : /// gRPC interceptor that decodes tenant metadata and stores it as request extensions of type
3307 : /// TenantTimelineId and ShardIndex.
3308 : ///
3309 : /// TODO: consider looking up the timeline handle here and storing it.
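     : ///
     : /// Expects `neon-tenant-id`, `neon-timeline-id`, and `neon-shard-id` entries in the
     : /// request metadata; requests missing any of them are rejected with InvalidArgument.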
3310 : #[derive(Clone)]
3311 : struct TenantMetadataInterceptor;
3312 :
3313 : impl tonic::service::Interceptor for TenantMetadataInterceptor {
3314 0 : fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
3315 : // Decode the tenant ID.
3316 0 : let tenant_id = req
3317 0 : .metadata()
3318 0 : .get("neon-tenant-id")
3319 0 : .ok_or_else(|| tonic::Status::invalid_argument("missing neon-tenant-id"))?
3320 0 : .to_str()
3321 0 : .map_err(|_| tonic::Status::invalid_argument("invalid neon-tenant-id"))?;
3322 0 : let tenant_id = TenantId::from_str(tenant_id)
3323 0 : .map_err(|_| tonic::Status::invalid_argument("invalid neon-tenant-id"))?;
3324 :
3325 : // Decode the timeline ID.
3326 0 : let timeline_id = req
3327 0 : .metadata()
3328 0 : .get("neon-timeline-id")
3329 0 : .ok_or_else(|| tonic::Status::invalid_argument("missing neon-timeline-id"))?
3330 0 : .to_str()
3331 0 : .map_err(|_| tonic::Status::invalid_argument("invalid neon-timeline-id"))?;
3332 0 : let timeline_id = TimelineId::from_str(timeline_id)
3333 0 : .map_err(|_| tonic::Status::invalid_argument("invalid neon-timeline-id"))?;
3334 :
3335 : // Decode the shard ID.
3336 0 : let shard_index = req
3337 0 : .metadata()
3338 0 : .get("neon-shard-id")
3339 0 : .ok_or_else(|| tonic::Status::invalid_argument("missing neon-shard-id"))?
3340 0 : .to_str()
3341 0 : .map_err(|_| tonic::Status::invalid_argument("invalid neon-shard-id"))?;
3342 0 : let shard_index = ShardIndex::from_str(shard_index)
3343 0 : .map_err(|_| tonic::Status::invalid_argument("invalid neon-shard-id"))?;
3344 :
3345 : // Stash them in the request.
3346 0 : let extensions = req.extensions_mut();
3347 0 : extensions.insert(TenantTimelineId::new(tenant_id, timeline_id));
3348 0 : extensions.insert(shard_index);
3349 0 :
3350 0 : Ok(req)
3351 0 : }
3352 : }
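// Client-side sketch (hedged, not part of this file): the interceptor above expects the tenant,
// timeline, and shard to arrive as request metadata. A caller using a plain tonic client might
// attach them as shown; the request type and the identifier variables are placeholders.
//
//     // tenant_id / timeline_id / shard_index: placeholder identifiers.
//     let mut req = tonic::Request::new(proto::GetRelSizeRequest::default());
//     req.metadata_mut()
//         .insert("neon-tenant-id", tenant_id.to_string().parse().unwrap());
//     req.metadata_mut()
//         .insert("neon-timeline-id", timeline_id.to_string().parse().unwrap());
//     req.metadata_mut()
//         .insert("neon-shard-id", shard_index.to_string().parse().unwrap());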
3353 :
3354 : /// Authenticates gRPC page service requests. Must run after TenantMetadataInterceptor.
3355 : #[derive(Clone)]
3356 : struct TenantAuthInterceptor {
3357 : auth: Option<Arc<SwappableJwtAuth>>,
3358 : }
3359 :
3360 : impl TenantAuthInterceptor {
3361 0 : fn new(auth: Option<Arc<SwappableJwtAuth>>) -> Self {
3362 0 : Self { auth }
3363 0 : }
3364 : }
3365 :
3366 : impl tonic::service::Interceptor for TenantAuthInterceptor {
3367 0 : fn call(&mut self, req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
3368 : // Do nothing if auth is disabled.
3369 0 : let Some(auth) = self.auth.as_ref() else {
3370 0 : return Ok(req);
3371 : };
3372 :
3373 : // Fetch the tenant ID that's been set by TenantMetadataInterceptor.
3374 0 : let ttid = req
3375 0 : .extensions()
3376 0 : .get::<TenantTimelineId>()
3377 0 : .expect("TenantMetadataInterceptor must run before TenantAuthInterceptor");
3378 :
3379 : // Fetch and decode the JWT token.
3380 0 : let jwt = req
3381 0 : .metadata()
3382 0 : .get("authorization")
3383 0 : .ok_or_else(|| tonic::Status::unauthenticated("no authorization header"))?
3384 0 : .to_str()
3385 0 : .map_err(|_| tonic::Status::invalid_argument("invalid authorization header"))?
3386 0 : .strip_prefix("Bearer ")
3387 0 : .ok_or_else(|| tonic::Status::invalid_argument("invalid authorization header"))?
3388 0 : .trim();
3389 0 : let jwtdata: TokenData<Claims> = auth
3390 0 : .decode(jwt)
3391 0 : .map_err(|err| tonic::Status::invalid_argument(format!("invalid JWT token: {err}")))?;
3392 0 : let claims = jwtdata.claims;
3393 0 :
3394 0 : // Check if the token is valid for this tenant.
3395 0 : check_permission(&claims, Some(ttid.tenant_id))
3396 0 : .map_err(|err| tonic::Status::permission_denied(err.to_string()))?;
3397 :
3398 : // TODO: consider stashing the claims in the request extensions, if needed.
3399 :
3400 0 : Ok(req)
3401 0 : }
3402 : }
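// Client-side sketch (hedged, not part of this file): when auth is enabled, the same request
// additionally needs a bearer token whose claims pass `check_permission` for the tenant.
//
//     // jwt: placeholder for a token issued with an appropriate scope.
//     req.metadata_mut()
//         .insert("authorization", format!("Bearer {jwt}").parse().unwrap());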
3403 :
3404 : #[derive(Debug, thiserror::Error)]
3405 : pub(crate) enum GetActiveTimelineError {
3406 : #[error(transparent)]
3407 : Tenant(GetActiveTenantError),
3408 : #[error(transparent)]
3409 : Timeline(#[from] GetTimelineError),
3410 : }
3411 :
3412 : impl From<GetActiveTimelineError> for QueryError {
3413 0 : fn from(e: GetActiveTimelineError) -> Self {
3414 0 : match e {
3415 0 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => QueryError::Shutdown,
3416 0 : GetActiveTimelineError::Tenant(e) => e.into(),
3417 0 : GetActiveTimelineError::Timeline(e) => QueryError::NotFound(format!("{e}").into()),
3418 : }
3419 0 : }
3420 : }
3421 :
3422 : impl From<crate::tenant::timeline::handle::HandleUpgradeError> for QueryError {
3423 0 : fn from(e: crate::tenant::timeline::handle::HandleUpgradeError) -> Self {
3424 0 : match e {
3425 0 : crate::tenant::timeline::handle::HandleUpgradeError::ShutDown => QueryError::Shutdown,
3426 0 : }
3427 0 : }
3428 : }
3429 :
3430 0 : fn set_tracing_field_shard_id(timeline: &Timeline) {
3431 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
3432 0 : tracing::Span::current().record(
3433 0 : "shard_id",
3434 0 : tracing::field::display(timeline.tenant_shard_id.shard_slug()),
3435 0 : );
3436 0 : debug_assert_current_span_has_tenant_and_timeline_id();
3437 0 : }
3438 :
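/// Marker newtype for an [`Lsn`] that has already been waited for, so it cannot be confused
/// with a raw, possibly not-yet-arrived LSN. Conversion back to [`Lsn`] is explicit via the
/// `From` impl below.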
3439 : struct WaitedForLsn(Lsn);
3440 : impl From<WaitedForLsn> for Lsn {
3441 0 : fn from(WaitedForLsn(lsn): WaitedForLsn) -> Self {
3442 0 : lsn
3443 0 : }
3444 : }
3445 :
3446 : #[cfg(test)]
3447 : mod tests {
3448 : use utils::shard::ShardCount;
3449 :
3450 : use super::*;
3451 :
3452 : #[test]
3453 1 : fn pageservice_cmd_parse() {
3454 1 : let tenant_id = TenantId::generate();
3455 1 : let timeline_id = TimelineId::generate();
3456 1 : let cmd =
3457 1 : PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id} {timeline_id}")).unwrap();
3458 1 : assert_eq!(
3459 1 : cmd,
3460 1 : PageServiceCmd::PageStream(PageStreamCmd {
3461 1 : tenant_id,
3462 1 : timeline_id,
3463 1 : protocol_version: PagestreamProtocolVersion::V2,
3464 1 : })
3465 1 : );
3466 1 : let cmd = PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id}")).unwrap();
3467 1 : assert_eq!(
3468 1 : cmd,
3469 1 : PageServiceCmd::BaseBackup(BaseBackupCmd {
3470 1 : tenant_id,
3471 1 : timeline_id,
3472 1 : lsn: None,
3473 1 : gzip: false,
3474 1 : replica: false
3475 1 : })
3476 1 : );
3477 1 : let cmd =
3478 1 : PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} --gzip")).unwrap();
3479 1 : assert_eq!(
3480 1 : cmd,
3481 1 : PageServiceCmd::BaseBackup(BaseBackupCmd {
3482 1 : tenant_id,
3483 1 : timeline_id,
3484 1 : lsn: None,
3485 1 : gzip: true,
3486 1 : replica: false
3487 1 : })
3488 1 : );
3489 1 : let cmd =
3490 1 : PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} latest")).unwrap();
3491 1 : assert_eq!(
3492 1 : cmd,
3493 1 : PageServiceCmd::BaseBackup(BaseBackupCmd {
3494 1 : tenant_id,
3495 1 : timeline_id,
3496 1 : lsn: None,
3497 1 : gzip: false,
3498 1 : replica: false
3499 1 : })
3500 1 : );
3501 1 : let cmd = PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} 0/16ABCDE"))
3502 1 : .unwrap();
3503 1 : assert_eq!(
3504 1 : cmd,
3505 1 : PageServiceCmd::BaseBackup(BaseBackupCmd {
3506 1 : tenant_id,
3507 1 : timeline_id,
3508 1 : lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
3509 1 : gzip: false,
3510 1 : replica: false
3511 1 : })
3512 1 : );
3513 1 : let cmd = PageServiceCmd::parse(&format!(
3514 1 : "basebackup {tenant_id} {timeline_id} --replica --gzip"
3515 1 : ))
3516 1 : .unwrap();
3517 1 : assert_eq!(
3518 1 : cmd,
3519 1 : PageServiceCmd::BaseBackup(BaseBackupCmd {
3520 1 : tenant_id,
3521 1 : timeline_id,
3522 1 : lsn: None,
3523 1 : gzip: true,
3524 1 : replica: true
3525 1 : })
3526 1 : );
3527 1 : let cmd = PageServiceCmd::parse(&format!(
3528 1 : "basebackup {tenant_id} {timeline_id} 0/16ABCDE --replica --gzip"
3529 1 : ))
3530 1 : .unwrap();
3531 1 : assert_eq!(
3532 1 : cmd,
3533 1 : PageServiceCmd::BaseBackup(BaseBackupCmd {
3534 1 : tenant_id,
3535 1 : timeline_id,
3536 1 : lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
3537 1 : gzip: true,
3538 1 : replica: true
3539 1 : })
3540 1 : );
3541 1 : let cmd = PageServiceCmd::parse(&format!("fullbackup {tenant_id} {timeline_id}")).unwrap();
3542 1 : assert_eq!(
3543 1 : cmd,
3544 1 : PageServiceCmd::FullBackup(FullBackupCmd {
3545 1 : tenant_id,
3546 1 : timeline_id,
3547 1 : lsn: None,
3548 1 : prev_lsn: None
3549 1 : })
3550 1 : );
3551 1 : let cmd = PageServiceCmd::parse(&format!(
3552 1 : "fullbackup {tenant_id} {timeline_id} 0/16ABCDE 0/16ABCDF"
3553 1 : ))
3554 1 : .unwrap();
3555 1 : assert_eq!(
3556 1 : cmd,
3557 1 : PageServiceCmd::FullBackup(FullBackupCmd {
3558 1 : tenant_id,
3559 1 : timeline_id,
3560 1 : lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
3561 1 : prev_lsn: Some(Lsn::from_str("0/16ABCDF").unwrap()),
3562 1 : })
3563 1 : );
3564 1 : let tenant_shard_id = TenantShardId::unsharded(tenant_id);
3565 1 : let cmd = PageServiceCmd::parse(&format!(
3566 1 : "lease lsn {tenant_shard_id} {timeline_id} 0/16ABCDE"
3567 1 : ))
3568 1 : .unwrap();
3569 1 : assert_eq!(
3570 1 : cmd,
3571 1 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
3572 1 : tenant_shard_id,
3573 1 : timeline_id,
3574 1 : lsn: Lsn::from_str("0/16ABCDE").unwrap(),
3575 1 : })
3576 1 : );
3577 1 : let tenant_shard_id = TenantShardId::split(&tenant_shard_id, ShardCount(8))[1];
3578 1 : let cmd = PageServiceCmd::parse(&format!(
3579 1 : "lease lsn {tenant_shard_id} {timeline_id} 0/16ABCDE"
3580 1 : ))
3581 1 : .unwrap();
3582 1 : assert_eq!(
3583 1 : cmd,
3584 1 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
3585 1 : tenant_shard_id,
3586 1 : timeline_id,
3587 1 : lsn: Lsn::from_str("0/16ABCDE").unwrap(),
3588 1 : })
3589 1 : );
3590 1 : let cmd = PageServiceCmd::parse("set a = b").unwrap();
3591 1 : assert_eq!(cmd, PageServiceCmd::Set);
3592 1 : let cmd = PageServiceCmd::parse("SET foo").unwrap();
3593 1 : assert_eq!(cmd, PageServiceCmd::Set);
3594 1 : }
3595 :
3596 : #[test]
3597 1 : fn pageservice_cmd_err_handling() {
3598 1 : let tenant_id = TenantId::generate();
3599 1 : let timeline_id = TimelineId::generate();
3600 1 : let cmd = PageServiceCmd::parse("unknown_command");
3601 1 : assert!(cmd.is_err());
3602 1 : let cmd = PageServiceCmd::parse("pagestream_v2");
3603 1 : assert!(cmd.is_err());
3604 1 : let cmd = PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id}xxx"));
3605 1 : assert!(cmd.is_err());
3606 1 : let cmd = PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id}xxx {timeline_id}xxx"));
3607 1 : assert!(cmd.is_err());
3608 1 : let cmd = PageServiceCmd::parse(&format!(
3609 1 : "basebackup {tenant_id} {timeline_id} --gzip --gzip"
3610 1 : ));
3611 1 : assert!(cmd.is_err());
3612 1 : let cmd = PageServiceCmd::parse(&format!(
3613 1 : "basebackup {tenant_id} {timeline_id} --gzip --unknown"
3614 1 : ));
3615 1 : assert!(cmd.is_err());
3616 1 : let cmd = PageServiceCmd::parse(&format!(
3617 1 : "basebackup {tenant_id} {timeline_id} --gzip 0/16ABCDE"
3618 1 : ));
3619 1 : assert!(cmd.is_err());
3620 1 : let cmd = PageServiceCmd::parse(&format!("lease {tenant_id} {timeline_id} gzip 0/16ABCDE"));
3621 1 : assert!(cmd.is_err());
3622 1 : }
3623 :
3624 : #[test]
3625 1 : fn test_parse_options() {
3626 1 : let (config, has_error) = parse_options(" -c neon.compute_mode=primary ");
3627 1 : assert!(!has_error);
3628 1 : assert_eq!(
3629 1 : config,
3630 1 : vec![("neon.compute_mode".to_string(), "primary".to_string())]
3631 1 : );
3632 :
3633 1 : let (config, has_error) = parse_options(" -c neon.compute_mode=primary -c foo=bar ");
3634 1 : assert!(!has_error);
3635 1 : assert_eq!(
3636 1 : config,
3637 1 : vec![
3638 1 : ("neon.compute_mode".to_string(), "primary".to_string()),
3639 1 : ("foo".to_string(), "bar".to_string()),
3640 1 : ]
3641 1 : );
3642 :
3643 1 : let (config, has_error) = parse_options(" -c neon.compute_mode=primary -cfoo=bar");
3644 1 : assert!(!has_error);
3645 1 : assert_eq!(
3646 1 : config,
3647 1 : vec![
3648 1 : ("neon.compute_mode".to_string(), "primary".to_string()),
3649 1 : ("foo".to_string(), "bar".to_string()),
3650 1 : ]
3651 1 : );
3652 :
3653 1 : let (_, has_error) = parse_options("-c");
3654 1 : assert!(has_error);
3655 :
3656 1 : let (_, has_error) = parse_options("-c foo=bar -c -c");
3657 1 : assert!(has_error);
3658 :
3659 1 : let (_, has_error) = parse_options(" ");
3660 1 : assert!(!has_error);
3661 :
3662 1 : let (_, has_error) = parse_options(" -c neon.compute_mode");
3663 1 : assert!(has_error);
3664 1 : }
3665 : }
|