Line data Source code
1 : //! The Page Service listens for client connections and serves their GetPage@LSN
2 : //! requests.
3 :
4 : use std::borrow::Cow;
5 : use std::num::NonZeroUsize;
6 : use std::os::fd::AsRawFd;
7 : use std::str::FromStr;
8 : use std::sync::Arc;
9 : use std::time::{Duration, Instant, SystemTime};
10 : use std::{io, str};
11 :
12 : use crate::PERF_TRACE_TARGET;
13 : use anyhow::{Context, bail};
14 : use async_compression::tokio::write::GzipEncoder;
15 : use bytes::Buf;
16 : use futures::FutureExt;
17 : use itertools::Itertools;
18 : use jsonwebtoken::TokenData;
19 : use once_cell::sync::OnceCell;
20 : use pageserver_api::config::{
21 : PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
22 : PageServiceProtocolPipelinedBatchingStrategy, PageServiceProtocolPipelinedExecutionStrategy,
23 : };
24 : use pageserver_api::key::rel_block_to_key;
25 : use pageserver_api::models::{
26 : self, PageTraceEvent, PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
27 : PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
28 : PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetSlruSegmentRequest,
29 : PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest, PagestreamNblocksResponse,
30 : PagestreamProtocolVersion, PagestreamRequest, TenantState,
31 : };
32 : use pageserver_api::reltag::SlruKind;
33 : use pageserver_api::shard::TenantShardId;
34 : use postgres_backend::{
35 : AuthType, PostgresBackend, PostgresBackendReader, QueryError, is_expected_io_error,
36 : };
37 : use postgres_ffi::BLCKSZ;
38 : use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
39 : use pq_proto::framed::ConnectionError;
40 : use pq_proto::{BeMessage, FeMessage, FeStartupPacket, RowDescriptor};
41 : use strum_macros::IntoStaticStr;
42 : use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufWriter};
43 : use tokio::task::JoinHandle;
44 : use tokio_util::sync::CancellationToken;
45 : use tracing::*;
46 : use utils::auth::{Claims, Scope, SwappableJwtAuth};
47 : use utils::failpoint_support;
48 : use utils::id::{TenantId, TimelineId};
49 : use utils::logging::log_slow;
50 : use utils::lsn::Lsn;
51 : use utils::simple_rcu::RcuReadGuard;
52 : use utils::sync::gate::{Gate, GateGuard};
53 : use utils::sync::spsc_fold;
54 :
55 : use crate::auth::check_permission;
56 : use crate::basebackup::BasebackupError;
57 : use crate::config::PageServerConf;
58 : use crate::context::{
59 : DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
60 : };
61 : use crate::metrics::{
62 : self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, GetPageBatchBreakReason, LIVE_CONNECTIONS,
63 : SmgrOpTimer, TimelineMetrics,
64 : };
65 : use crate::pgdatadir_mapping::Version;
66 : use crate::span::{
67 : debug_assert_current_span_has_tenant_and_timeline_id,
68 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
69 : };
70 : use crate::task_mgr::{self, COMPUTE_REQUEST_RUNTIME, TaskKind};
71 : use crate::tenant::mgr::{
72 : GetActiveTenantError, GetTenantError, ShardResolveResult, ShardSelector, TenantManager,
73 : };
74 : use crate::tenant::storage_layer::IoConcurrency;
75 : use crate::tenant::timeline::{self, WaitLsnError};
76 : use crate::tenant::{GetTimelineError, PageReconstructError, Timeline};
77 : use crate::{basebackup, timed_after_cancellation};
78 :
79 : /// How long we may wait for a [`crate::tenant::mgr::TenantSlot::InProgress`] and/or a [`crate::tenant::TenantShard`] which
80 : /// is not yet in state [`TenantState::Active`].
81 : ///
82 : /// NB: this is a different value than [`crate::http::routes::ACTIVE_TENANT_TIMEOUT`].
83 : const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
84 :
85 : /// Threshold at which to log slow GetPage requests.
86 : const LOG_SLOW_GETPAGE_THRESHOLD: Duration = Duration::from_secs(30);
87 :
88 : ///////////////////////////////////////////////////////////////////////////////
89 :
90 : pub struct Listener {
91 : cancel: CancellationToken,
92 :     /// Cancel the listener task through `cancel` to shut down the listener
93 : /// and get a handle on the existing connections.
94 : task: JoinHandle<Connections>,
95 : }
96 :
97 : pub struct Connections {
98 : cancel: CancellationToken,
99 : tasks: tokio::task::JoinSet<ConnectionHandlerResult>,
100 : gate: Gate,
101 : }
102 :
103 0 : pub fn spawn(
104 0 : conf: &'static PageServerConf,
105 0 : tenant_manager: Arc<TenantManager>,
106 0 : pg_auth: Option<Arc<SwappableJwtAuth>>,
107 0 : perf_trace_dispatch: Option<Dispatch>,
108 0 : tcp_listener: tokio::net::TcpListener,
109 0 : tls_config: Option<Arc<rustls::ServerConfig>>,
110 0 : ) -> Listener {
111 0 : let cancel = CancellationToken::new();
112 0 : let libpq_ctx = RequestContext::todo_child(
113 0 : TaskKind::LibpqEndpointListener,
114 0 : // listener task shouldn't need to download anything. (We will
115 0 :         // create a separate sub-context for each connection, with its
116 0 : // own download behavior. This context is used only to listen and
117 0 : // accept connections.)
118 0 : DownloadBehavior::Error,
119 0 : );
120 0 : let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
121 0 : "libpq listener",
122 0 : libpq_listener_main(
123 0 : conf,
124 0 : tenant_manager,
125 0 : pg_auth,
126 0 : perf_trace_dispatch,
127 0 : tcp_listener,
128 0 : conf.pg_auth_type,
129 0 : tls_config,
130 0 : conf.page_service_pipelining.clone(),
131 0 : libpq_ctx,
132 0 : cancel.clone(),
133 0 : )
134 0 : .map(anyhow::Ok),
135 0 : ));
136 0 :
137 0 : Listener { cancel, task }
138 0 : }
139 :
140 : impl Listener {
141 0 : pub async fn stop_accepting(self) -> Connections {
142 0 : self.cancel.cancel();
143 0 : self.task
144 0 : .await
145 0 : .expect("unreachable: we wrap the listener task in task_mgr::exit_on_panic_or_error")
146 0 : }
147 : }
148 : impl Connections {
149 0 : pub(crate) async fn shutdown(self) {
150 0 : let Self {
151 0 : cancel,
152 0 : mut tasks,
153 0 : gate,
154 0 : } = self;
155 0 : cancel.cancel();
156 0 : while let Some(res) = tasks.join_next().await {
157 0 : Self::handle_connection_completion(res);
158 0 : }
159 0 : gate.close().await;
160 0 : }
161 :
162 0 : fn handle_connection_completion(res: Result<anyhow::Result<()>, tokio::task::JoinError>) {
163 0 : match res {
164 0 : Ok(Ok(())) => {}
165 0 : Ok(Err(e)) => error!("error in page_service connection task: {:?}", e),
166 0 : Err(e) => error!("page_service connection task panicked: {:?}", e),
167 : }
168 0 : }
169 : }
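// Illustrative lifecycle sketch (hypothetical caller, simplified): spawn the listener,
// later stop accepting new connections, then drain the ones that remain.
//
//   let listener = page_service::spawn(conf, tenant_manager, pg_auth, None, tcp_listener, tls_config);
//   // ... serve traffic ...
//   let connections = listener.stop_accepting().await; // listener task exits, returns open connections
//   connections.shutdown().await;                      // cancel and join the per-connection tasks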
170 :
171 : ///
172 : /// Main loop of the page service.
173 : ///
174 : /// Listens for connections, and launches a new handler task for each.
175 : ///
176 : /// Returns the set of open connections upon cancellation via
177 : /// `listener_cancel`.
178 : ///
179 : #[allow(clippy::too_many_arguments)]
180 0 : pub async fn libpq_listener_main(
181 0 : conf: &'static PageServerConf,
182 0 : tenant_manager: Arc<TenantManager>,
183 0 : auth: Option<Arc<SwappableJwtAuth>>,
184 0 : perf_trace_dispatch: Option<Dispatch>,
185 0 : listener: tokio::net::TcpListener,
186 0 : auth_type: AuthType,
187 0 : tls_config: Option<Arc<rustls::ServerConfig>>,
188 0 : pipelining_config: PageServicePipeliningConfig,
189 0 : listener_ctx: RequestContext,
190 0 : listener_cancel: CancellationToken,
191 0 : ) -> Connections {
192 0 : let connections_cancel = CancellationToken::new();
193 0 : let connections_gate = Gate::default();
194 0 : let mut connection_handler_tasks = tokio::task::JoinSet::default();
195 :
196 : loop {
197 0 : let gate_guard = match connections_gate.enter() {
198 0 : Ok(guard) => guard,
199 0 : Err(_) => break,
200 : };
201 :
202 0 : let accepted = tokio::select! {
203 : biased;
204 0 : _ = listener_cancel.cancelled() => break,
205 0 : next = connection_handler_tasks.join_next(), if !connection_handler_tasks.is_empty() => {
206 0 :             let res = next.expect("we don't poll while empty");
207 0 : Connections::handle_connection_completion(res);
208 0 : continue;
209 : }
210 0 : accepted = listener.accept() => accepted,
211 0 : };
212 0 :
213 0 : match accepted {
214 0 : Ok((socket, peer_addr)) => {
215 0 : // Connection established. Spawn a new task to handle it.
216 0 : debug!("accepted connection from {}", peer_addr);
217 0 : let local_auth = auth.clone();
218 0 : let connection_ctx = RequestContextBuilder::from(&listener_ctx)
219 0 : .task_kind(TaskKind::PageRequestHandler)
220 0 : .download_behavior(DownloadBehavior::Download)
221 0 : .perf_span_dispatch(perf_trace_dispatch.clone())
222 0 : .detached_child();
223 0 :
224 0 : connection_handler_tasks.spawn(page_service_conn_main(
225 0 : conf,
226 0 : tenant_manager.clone(),
227 0 : local_auth,
228 0 : socket,
229 0 : auth_type,
230 0 : tls_config.clone(),
231 0 : pipelining_config.clone(),
232 0 : connection_ctx,
233 0 : connections_cancel.child_token(),
234 0 : gate_guard,
235 0 : ));
236 : }
237 0 : Err(err) => {
238 0 : // accept() failed. Log the error, and loop back to retry on next connection.
239 0 : error!("accept() failed: {:?}", err);
240 : }
241 : }
242 : }
243 :
244 0 : debug!("page_service listener loop terminated");
245 :
246 0 : Connections {
247 0 : cancel: connections_cancel,
248 0 : tasks: connection_handler_tasks,
249 0 : gate: connections_gate,
250 0 : }
251 0 : }
252 :
253 : type ConnectionHandlerResult = anyhow::Result<()>;
254 :
255 : /// Perf root spans start at the per-request level, after shard routing.
256 : /// This struct carries connection-level information to the root perf span definition.
257 : #[derive(Clone)]
258 : struct ConnectionPerfSpanFields {
259 : peer_addr: String,
260 : application_name: Option<String>,
261 : compute_mode: Option<String>,
262 : }
263 :
264 : #[instrument(skip_all, fields(peer_addr, application_name, compute_mode))]
265 : #[allow(clippy::too_many_arguments)]
266 : async fn page_service_conn_main(
267 : conf: &'static PageServerConf,
268 : tenant_manager: Arc<TenantManager>,
269 : auth: Option<Arc<SwappableJwtAuth>>,
270 : socket: tokio::net::TcpStream,
271 : auth_type: AuthType,
272 : tls_config: Option<Arc<rustls::ServerConfig>>,
273 : pipelining_config: PageServicePipeliningConfig,
274 : connection_ctx: RequestContext,
275 : cancel: CancellationToken,
276 : gate_guard: GateGuard,
277 : ) -> ConnectionHandlerResult {
278 : let _guard = LIVE_CONNECTIONS
279 : .with_label_values(&["page_service"])
280 : .guard();
281 :
282 : socket
283 : .set_nodelay(true)
284 : .context("could not set TCP_NODELAY")?;
285 :
286 : let socket_fd = socket.as_raw_fd();
287 :
288 : let peer_addr = socket.peer_addr().context("get peer address")?;
289 :
290 : let perf_span_fields = ConnectionPerfSpanFields {
291 : peer_addr: peer_addr.to_string(),
292 : application_name: None, // filled in later
293 : compute_mode: None, // filled in later
294 : };
295 : tracing::Span::current().record("peer_addr", field::display(peer_addr));
296 :
297 :     // Set up a read timeout of 10 minutes. The timeout is rather arbitrary; the requirements are:
298 : // - long enough for most valid compute connections
299 : // - less than infinite to stop us from "leaking" connections to long-gone computes
300 : //
301 : // no write timeout is used, because the kernel is assumed to error writes after some time.
302 : let mut socket = tokio_io_timeout::TimeoutReader::new(socket);
303 :
304 : let default_timeout_ms = 10 * 60 * 1000; // 10 minutes by default
305 0 : let socket_timeout_ms = (|| {
306 0 : fail::fail_point!("simulated-bad-compute-connection", |avg_timeout_ms| {
307 : // Exponential distribution for simulating
308 :             // poor network conditions; expect avg_timeout_ms to be around 15
309 : // in tests
310 0 : if let Some(avg_timeout_ms) = avg_timeout_ms {
311 0 : let avg = avg_timeout_ms.parse::<i64>().unwrap() as f32;
312 0 : let u = rand::random::<f32>();
313 0 : ((1.0 - u).ln() / (-avg)) as u64
314 : } else {
315 0 : default_timeout_ms
316 : }
317 0 : });
318 0 : default_timeout_ms
319 : })();
320 :
321 :     // A timeout here does not mean the client died; it can happen if it's just idle for
322 : // a while: we will tear down this PageServerHandler and instantiate a new one if/when
323 : // they reconnect.
324 : socket.set_timeout(Some(std::time::Duration::from_millis(socket_timeout_ms)));
325 : let socket = Box::pin(socket);
326 :
327 : fail::fail_point!("ps::connection-start::pre-login");
328 :
329 : // XXX: pgbackend.run() should take the connection_ctx,
330 : // and create a child per-query context when it invokes process_query.
331 :     // But it's in a shared crate, so we store connection_ctx inside PageServerHandler
332 : // and create the per-query context in process_query ourselves.
333 : let mut conn_handler = PageServerHandler::new(
334 : conf,
335 : tenant_manager,
336 : auth,
337 : pipelining_config,
338 : perf_span_fields,
339 : connection_ctx,
340 : cancel.clone(),
341 : gate_guard,
342 : );
343 : let pgbackend =
344 : PostgresBackend::new_from_io(socket_fd, socket, peer_addr, auth_type, tls_config)?;
345 :
346 : match pgbackend.run(&mut conn_handler, &cancel).await {
347 : Ok(()) => {
348 : // we've been requested to shut down
349 : Ok(())
350 : }
351 : Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
352 : if is_expected_io_error(&io_error) {
353 : info!("Postgres client disconnected ({io_error})");
354 : Ok(())
355 : } else {
356 : let tenant_id = conn_handler.timeline_handles.as_ref().unwrap().tenant_id();
357 : Err(io_error).context(format!(
358 : "Postgres connection error for tenant_id={:?} client at peer_addr={}",
359 : tenant_id, peer_addr
360 : ))
361 : }
362 : }
363 : other => {
364 : let tenant_id = conn_handler.timeline_handles.as_ref().unwrap().tenant_id();
365 : other.context(format!(
366 : "Postgres query error for tenant_id={:?} client peer_addr={}",
367 : tenant_id, peer_addr
368 : ))
369 : }
370 : }
371 : }
372 :
373 : struct PageServerHandler {
374 : conf: &'static PageServerConf,
375 : auth: Option<Arc<SwappableJwtAuth>>,
376 : claims: Option<Claims>,
377 :
378 : /// The context created for the lifetime of the connection
379 :     /// serviced by this PageServerHandler.
380 : /// For each query received over the connection,
381 : /// `process_query` creates a child context from this one.
382 : connection_ctx: RequestContext,
383 :
384 : perf_span_fields: ConnectionPerfSpanFields,
385 :
386 : cancel: CancellationToken,
387 :
388 : /// None only while pagestream protocol is being processed.
389 : timeline_handles: Option<TimelineHandles>,
390 :
391 : pipelining_config: PageServicePipeliningConfig,
392 :
393 : gate_guard: GateGuard,
394 : }
395 :
396 : struct TimelineHandles {
397 : wrapper: TenantManagerWrapper,
398 : /// Note on size: the typical size of this map is 1. The largest size we expect
399 : /// to see is the number of shards divided by the number of pageservers (typically < 2),
400 :     /// or the ratio used when splitting shards (i.e. how many children are created from one
401 :     /// parent shard), where a "large" number might be ~8.
402 : handles: timeline::handle::Cache<TenantManagerTypes>,
403 : }
404 :
405 : impl TimelineHandles {
406 0 : fn new(tenant_manager: Arc<TenantManager>) -> Self {
407 0 : Self {
408 0 : wrapper: TenantManagerWrapper {
409 0 : tenant_manager,
410 0 : tenant_id: OnceCell::new(),
411 0 : },
412 0 : handles: Default::default(),
413 0 : }
414 0 : }
415 0 : async fn get(
416 0 : &mut self,
417 0 : tenant_id: TenantId,
418 0 : timeline_id: TimelineId,
419 0 : shard_selector: ShardSelector,
420 0 : ) -> Result<timeline::handle::Handle<TenantManagerTypes>, GetActiveTimelineError> {
421 0 : if *self.wrapper.tenant_id.get_or_init(|| tenant_id) != tenant_id {
422 0 : return Err(GetActiveTimelineError::Tenant(
423 0 : GetActiveTenantError::SwitchedTenant,
424 0 : ));
425 0 : }
426 0 : self.handles
427 0 : .get(timeline_id, shard_selector, &self.wrapper)
428 0 : .await
429 0 : .map_err(|e| match e {
430 0 : timeline::handle::GetError::TenantManager(e) => e,
431 : timeline::handle::GetError::PerTimelineStateShutDown => {
432 0 : trace!("per-timeline state shut down");
433 0 : GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown)
434 : }
435 0 : })
436 0 : }
437 :
438 0 : fn tenant_id(&self) -> Option<TenantId> {
439 0 : self.wrapper.tenant_id.get().copied()
440 0 : }
441 : }
442 :
443 : pub(crate) struct TenantManagerWrapper {
444 : tenant_manager: Arc<TenantManager>,
445 : // We do not support switching tenant_id on a connection at this point.
446 :     // We can add support for this later if needed without changing
447 : // the protocol.
448 : tenant_id: once_cell::sync::OnceCell<TenantId>,
449 : }
450 :
451 : #[derive(Debug)]
452 : pub(crate) struct TenantManagerTypes;
453 :
454 : impl timeline::handle::Types for TenantManagerTypes {
455 : type TenantManagerError = GetActiveTimelineError;
456 : type TenantManager = TenantManagerWrapper;
457 : type Timeline = TenantManagerCacheItem;
458 : }
459 :
460 : pub(crate) struct TenantManagerCacheItem {
461 : pub(crate) timeline: Arc<Timeline>,
462 : // allow() for cheap propagation through RequestContext inside a task
463 : #[allow(clippy::redundant_allocation)]
464 : pub(crate) metrics: Arc<Arc<TimelineMetrics>>,
465 : #[allow(dead_code)] // we store it to keep the gate open
466 : pub(crate) gate_guard: GateGuard,
467 : }
468 :
469 : impl std::ops::Deref for TenantManagerCacheItem {
470 : type Target = Arc<Timeline>;
471 0 : fn deref(&self) -> &Self::Target {
472 0 : &self.timeline
473 0 : }
474 : }
475 :
476 : impl timeline::handle::Timeline<TenantManagerTypes> for TenantManagerCacheItem {
477 0 : fn shard_timeline_id(&self) -> timeline::handle::ShardTimelineId {
478 0 : Timeline::shard_timeline_id(&self.timeline)
479 0 : }
480 :
481 0 : fn per_timeline_state(&self) -> &timeline::handle::PerTimelineState<TenantManagerTypes> {
482 0 : &self.timeline.handles
483 0 : }
484 :
485 0 : fn get_shard_identity(&self) -> &pageserver_api::shard::ShardIdentity {
486 0 : Timeline::get_shard_identity(&self.timeline)
487 0 : }
488 : }
489 :
490 : impl timeline::handle::TenantManager<TenantManagerTypes> for TenantManagerWrapper {
491 0 : async fn resolve(
492 0 : &self,
493 0 : timeline_id: TimelineId,
494 0 : shard_selector: ShardSelector,
495 0 : ) -> Result<TenantManagerCacheItem, GetActiveTimelineError> {
496 0 : let tenant_id = self.tenant_id.get().expect("we set this in get()");
497 0 : let timeout = ACTIVE_TENANT_TIMEOUT;
498 0 : let wait_start = Instant::now();
499 0 : let deadline = wait_start + timeout;
500 0 : let tenant_shard = loop {
501 0 : let resolved = self
502 0 : .tenant_manager
503 0 : .resolve_attached_shard(tenant_id, shard_selector);
504 0 : match resolved {
505 0 : ShardResolveResult::Found(tenant_shard) => break tenant_shard,
506 : ShardResolveResult::NotFound => {
507 0 : return Err(GetActiveTimelineError::Tenant(
508 0 : GetActiveTenantError::NotFound(GetTenantError::NotFound(*tenant_id)),
509 0 : ));
510 : }
511 0 : ShardResolveResult::InProgress(barrier) => {
512 0 : // We can't authoritatively answer right now: wait for InProgress state
513 0 : // to end, then try again
514 0 : tokio::select! {
515 0 : _ = barrier.wait() => {
516 0 : // The barrier completed: proceed around the loop to try looking up again
517 0 : },
518 0 : _ = tokio::time::sleep(deadline.duration_since(Instant::now())) => {
519 0 : return Err(GetActiveTimelineError::Tenant(GetActiveTenantError::WaitForActiveTimeout {
520 0 : latest_state: None,
521 0 : wait_time: timeout,
522 0 : }));
523 : }
524 : }
525 : }
526 : };
527 : };
528 :
529 0 : tracing::debug!("Waiting for tenant to enter active state...");
530 0 : tenant_shard
531 0 : .wait_to_become_active(deadline.duration_since(Instant::now()))
532 0 : .await
533 0 : .map_err(GetActiveTimelineError::Tenant)?;
534 :
535 0 : let timeline = tenant_shard
536 0 : .get_timeline(timeline_id, true)
537 0 : .map_err(GetActiveTimelineError::Timeline)?;
538 :
539 0 : let gate_guard = match timeline.gate.enter() {
540 0 : Ok(guard) => guard,
541 : Err(_) => {
542 0 : return Err(GetActiveTimelineError::Timeline(
543 0 : GetTimelineError::ShuttingDown,
544 0 : ));
545 : }
546 : };
547 :
548 0 : let metrics = Arc::new(Arc::clone(&timeline.metrics));
549 0 :
550 0 : Ok(TenantManagerCacheItem {
551 0 : timeline,
552 0 : metrics,
553 0 : gate_guard,
554 0 : })
555 0 : }
556 : }
557 :
558 : #[derive(thiserror::Error, Debug)]
559 : enum PageStreamError {
560 : /// We encountered an error that should prompt the client to reconnect:
561 : /// in practice this means we drop the connection without sending a response.
562 : #[error("Reconnect required: {0}")]
563 : Reconnect(Cow<'static, str>),
564 :
565 : /// We were instructed to shutdown while processing the query
566 : #[error("Shutting down")]
567 : Shutdown,
568 :
569 : /// Something went wrong reading a page: this likely indicates a pageserver bug
570 : #[error("Read error")]
571 : Read(#[source] PageReconstructError),
572 :
573 : /// Ran out of time waiting for an LSN
574 : #[error("LSN timeout: {0}")]
575 : LsnTimeout(WaitLsnError),
576 :
577 : /// The entity required to serve the request (tenant or timeline) is not found,
578 : /// or is not found in a suitable state to serve a request.
579 : #[error("Not found: {0}")]
580 : NotFound(Cow<'static, str>),
581 :
582 : /// Request asked for something that doesn't make sense, like an invalid LSN
583 : #[error("Bad request: {0}")]
584 : BadRequest(Cow<'static, str>),
585 : }
586 :
587 : impl From<PageReconstructError> for PageStreamError {
588 0 : fn from(value: PageReconstructError) -> Self {
589 0 : match value {
590 0 : PageReconstructError::Cancelled => Self::Shutdown,
591 0 : e => Self::Read(e),
592 : }
593 0 : }
594 : }
595 :
596 : impl From<GetActiveTimelineError> for PageStreamError {
597 0 : fn from(value: GetActiveTimelineError) -> Self {
598 0 : match value {
599 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled)
600 : | GetActiveTimelineError::Tenant(GetActiveTenantError::WillNotBecomeActive(
601 : TenantState::Stopping { .. },
602 : ))
603 0 : | GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown) => Self::Shutdown,
604 0 : GetActiveTimelineError::Tenant(e) => Self::NotFound(format!("{e}").into()),
605 0 : GetActiveTimelineError::Timeline(e) => Self::NotFound(format!("{e}").into()),
606 : }
607 0 : }
608 : }
609 :
610 : impl From<WaitLsnError> for PageStreamError {
611 0 : fn from(value: WaitLsnError) -> Self {
612 0 : match value {
613 0 : e @ WaitLsnError::Timeout(_) => Self::LsnTimeout(e),
614 0 : WaitLsnError::Shutdown => Self::Shutdown,
615 0 : e @ WaitLsnError::BadState { .. } => Self::Reconnect(format!("{e}").into()),
616 : }
617 0 : }
618 : }
619 :
620 : impl From<WaitLsnError> for QueryError {
621 0 : fn from(value: WaitLsnError) -> Self {
622 0 : match value {
623 0 : e @ WaitLsnError::Timeout(_) => Self::Other(anyhow::Error::new(e)),
624 0 : WaitLsnError::Shutdown => Self::Shutdown,
625 0 : WaitLsnError::BadState { .. } => Self::Reconnect,
626 : }
627 0 : }
628 : }
629 :
630 : #[derive(thiserror::Error, Debug)]
631 : struct BatchedPageStreamError {
632 : req: PagestreamRequest,
633 : err: PageStreamError,
634 : }
635 :
636 : impl std::fmt::Display for BatchedPageStreamError {
637 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
638 0 : self.err.fmt(f)
639 0 : }
640 : }
641 :
642 : struct BatchedGetPageRequest {
643 : req: PagestreamGetPageRequest,
644 : timer: SmgrOpTimer,
645 : effective_request_lsn: Lsn,
646 : ctx: RequestContext,
647 : }
648 :
649 : #[cfg(feature = "testing")]
650 : struct BatchedTestRequest {
651 : req: models::PagestreamTestRequest,
652 : timer: SmgrOpTimer,
653 : }
654 :
655 : /// NB: we only hold [`timeline::handle::WeakHandle`] inside this enum,
656 : /// so that we don't keep the [`Timeline::gate`] open while the batch
657 : /// is being built up inside the [`spsc_fold`] (pagestream pipelining).
658 : #[derive(IntoStaticStr)]
659 : enum BatchedFeMessage {
660 : Exists {
661 : span: Span,
662 : timer: SmgrOpTimer,
663 : shard: timeline::handle::WeakHandle<TenantManagerTypes>,
664 : req: models::PagestreamExistsRequest,
665 : },
666 : Nblocks {
667 : span: Span,
668 : timer: SmgrOpTimer,
669 : shard: timeline::handle::WeakHandle<TenantManagerTypes>,
670 : req: models::PagestreamNblocksRequest,
671 : },
672 : GetPage {
673 : span: Span,
674 : shard: timeline::handle::WeakHandle<TenantManagerTypes>,
675 : pages: smallvec::SmallVec<[BatchedGetPageRequest; 1]>,
676 : batch_break_reason: GetPageBatchBreakReason,
677 : },
678 : DbSize {
679 : span: Span,
680 : timer: SmgrOpTimer,
681 : shard: timeline::handle::WeakHandle<TenantManagerTypes>,
682 : req: models::PagestreamDbSizeRequest,
683 : },
684 : GetSlruSegment {
685 : span: Span,
686 : timer: SmgrOpTimer,
687 : shard: timeline::handle::WeakHandle<TenantManagerTypes>,
688 : req: models::PagestreamGetSlruSegmentRequest,
689 : },
690 : #[cfg(feature = "testing")]
691 : Test {
692 : span: Span,
693 : shard: timeline::handle::WeakHandle<TenantManagerTypes>,
694 : requests: Vec<BatchedTestRequest>,
695 : },
696 : RespondError {
697 : span: Span,
698 : error: BatchedPageStreamError,
699 : },
700 : }
701 :
702 : impl BatchedFeMessage {
703 0 : fn as_static_str(&self) -> &'static str {
704 0 : self.into()
705 0 : }
706 :
707 0 : fn observe_execution_start(&mut self, at: Instant) {
708 0 : match self {
709 0 : BatchedFeMessage::Exists { timer, .. }
710 0 : | BatchedFeMessage::Nblocks { timer, .. }
711 0 : | BatchedFeMessage::DbSize { timer, .. }
712 0 : | BatchedFeMessage::GetSlruSegment { timer, .. } => {
713 0 : timer.observe_execution_start(at);
714 0 : }
715 0 : BatchedFeMessage::GetPage { pages, .. } => {
716 0 : for page in pages {
717 0 : page.timer.observe_execution_start(at);
718 0 : }
719 : }
720 : #[cfg(feature = "testing")]
721 0 : BatchedFeMessage::Test { requests, .. } => {
722 0 : for req in requests {
723 0 : req.timer.observe_execution_start(at);
724 0 : }
725 : }
726 0 : BatchedFeMessage::RespondError { .. } => {}
727 : }
728 0 : }
729 :
730 0 : fn should_break_batch(
731 0 : &self,
732 0 : other: &BatchedFeMessage,
733 0 : max_batch_size: NonZeroUsize,
734 0 : batching_strategy: PageServiceProtocolPipelinedBatchingStrategy,
735 0 : ) -> Option<GetPageBatchBreakReason> {
736 0 : match (self, other) {
737 : (
738 : BatchedFeMessage::GetPage {
739 0 : shard: accum_shard,
740 0 : pages: accum_pages,
741 0 : ..
742 0 : },
743 0 : BatchedFeMessage::GetPage {
744 0 : shard: this_shard,
745 0 : pages: this_pages,
746 0 : ..
747 0 : },
748 0 : ) => {
749 0 : assert_eq!(this_pages.len(), 1);
750 0 : if accum_pages.len() >= max_batch_size.get() {
751 0 : trace!(%max_batch_size, "stopping batching because of batch size");
752 0 : assert_eq!(accum_pages.len(), max_batch_size.get());
753 :
754 0 : return Some(GetPageBatchBreakReason::BatchFull);
755 0 : }
756 0 : if !accum_shard.is_same_handle_as(this_shard) {
757 0 : trace!("stopping batching because timeline object mismatch");
758 :                     // TODO: we _could_ batch & execute each shard separately (and in parallel).
759 : // But the current logic for keeping responses in order does not support that.
760 :
761 0 : return Some(GetPageBatchBreakReason::NonUniformTimeline);
762 0 : }
763 0 :
764 0 : match batching_strategy {
765 : PageServiceProtocolPipelinedBatchingStrategy::UniformLsn => {
766 0 : if let Some(last_in_batch) = accum_pages.last() {
767 0 : if last_in_batch.effective_request_lsn
768 0 : != this_pages[0].effective_request_lsn
769 : {
770 0 : trace!(
771 : accum_lsn = %last_in_batch.effective_request_lsn,
772 0 : this_lsn = %this_pages[0].effective_request_lsn,
773 0 : "stopping batching because LSN changed"
774 : );
775 :
776 0 : return Some(GetPageBatchBreakReason::NonUniformLsn);
777 0 : }
778 0 : }
779 : }
780 : PageServiceProtocolPipelinedBatchingStrategy::ScatteredLsn => {
781 :                         // The read path doesn't currently support serving the same page at different LSNs.
782 : // While technically possible, it's uncertain if the complexity is worth it.
783 : // Break the batch if such a case is encountered.
784 0 : let same_page_different_lsn = accum_pages.iter().any(|batched| {
785 0 : batched.req.rel == this_pages[0].req.rel
786 0 : && batched.req.blkno == this_pages[0].req.blkno
787 0 : && batched.effective_request_lsn
788 0 : != this_pages[0].effective_request_lsn
789 0 : });
790 0 :
791 0 : if same_page_different_lsn {
792 0 : trace!(
793 0 : rel=%this_pages[0].req.rel,
794 0 : blkno=%this_pages[0].req.blkno,
795 0 : lsn=%this_pages[0].effective_request_lsn,
796 0 : "stopping batching because same page was requested at different LSNs"
797 : );
798 :
799 0 : return Some(GetPageBatchBreakReason::SamePageAtDifferentLsn);
800 0 : }
801 : }
802 : }
803 :
804 0 : None
805 : }
806 : #[cfg(feature = "testing")]
807 : (
808 : BatchedFeMessage::Test {
809 0 : shard: accum_shard,
810 0 : requests: accum_requests,
811 0 : ..
812 0 : },
813 0 : BatchedFeMessage::Test {
814 0 : shard: this_shard,
815 0 : requests: this_requests,
816 0 : ..
817 0 : },
818 0 : ) => {
819 0 : assert!(this_requests.len() == 1);
820 0 : if accum_requests.len() >= max_batch_size.get() {
821 0 : trace!(%max_batch_size, "stopping batching because of batch size");
822 0 : assert_eq!(accum_requests.len(), max_batch_size.get());
823 0 : return Some(GetPageBatchBreakReason::BatchFull);
824 0 : }
825 0 : if !accum_shard.is_same_handle_as(this_shard) {
826 0 : trace!("stopping batching because timeline object mismatch");
827 :                     // TODO: we _could_ batch & execute each shard separately (and in parallel).
828 : // But the current logic for keeping responses in order does not support that.
829 0 : return Some(GetPageBatchBreakReason::NonUniformTimeline);
830 0 : }
831 0 : let this_batch_key = this_requests[0].req.batch_key;
832 0 : let accum_batch_key = accum_requests[0].req.batch_key;
833 0 : if this_requests[0].req.batch_key != accum_requests[0].req.batch_key {
834 0 : trace!(%accum_batch_key, %this_batch_key, "stopping batching because batch key changed");
835 0 : return Some(GetPageBatchBreakReason::NonUniformKey);
836 0 : }
837 0 : None
838 : }
839 0 : (_, _) => Some(GetPageBatchBreakReason::NonBatchableRequest),
840 : }
841 0 : }
842 : }
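// Worked example of the batching strategies handled above (illustrative requests,
// assuming all target the same shard and max_batch_size > 2):
//
//   UniformLsn:
//     GetPage(rel A, blk 1, effective_lsn 0/100)  -> starts the batch
//     GetPage(rel A, blk 2, effective_lsn 0/100)  -> batched (same LSN)
//     GetPage(rel B, blk 7, effective_lsn 0/101)  -> breaks the batch (NonUniformLsn)
//
//   ScatteredLsn:
//     GetPage(rel A, blk 1, effective_lsn 0/100)  -> starts the batch
//     GetPage(rel B, blk 7, effective_lsn 0/101)  -> batched (different LSNs allowed)
//     GetPage(rel A, blk 1, effective_lsn 0/102)  -> breaks the batch (SamePageAtDifferentLsn)
//
// Independently of the strategy, a full batch (BatchFull) or a different timeline
// handle (NonUniformTimeline) always breaks the batch.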
843 :
844 : impl PageServerHandler {
845 : #[allow(clippy::too_many_arguments)]
846 0 : pub fn new(
847 0 : conf: &'static PageServerConf,
848 0 : tenant_manager: Arc<TenantManager>,
849 0 : auth: Option<Arc<SwappableJwtAuth>>,
850 0 : pipelining_config: PageServicePipeliningConfig,
851 0 : perf_span_fields: ConnectionPerfSpanFields,
852 0 : connection_ctx: RequestContext,
853 0 : cancel: CancellationToken,
854 0 : gate_guard: GateGuard,
855 0 : ) -> Self {
856 0 : PageServerHandler {
857 0 : conf,
858 0 : auth,
859 0 : claims: None,
860 0 : connection_ctx,
861 0 : perf_span_fields,
862 0 : timeline_handles: Some(TimelineHandles::new(tenant_manager)),
863 0 : cancel,
864 0 : pipelining_config,
865 0 : gate_guard,
866 0 : }
867 0 : }
868 :
869 : /// This function always respects cancellation of any timeline in `[Self::shard_timelines]`. Pass in
870 : /// a cancellation token at the next scope up (such as a tenant cancellation token) to ensure we respect
871 : /// cancellation if there aren't any timelines in the cache.
872 : ///
873 : /// If calling from a function that doesn't use the `[Self::shard_timelines]` cache, then pass in the
874 : /// timeline cancellation token.
875 0 : async fn flush_cancellable<IO>(
876 0 : &self,
877 0 : pgb: &mut PostgresBackend<IO>,
878 0 : cancel: &CancellationToken,
879 0 : ) -> Result<(), QueryError>
880 0 : where
881 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
882 0 : {
883 0 : tokio::select!(
884 0 : flush_r = pgb.flush() => {
885 0 : Ok(flush_r?)
886 : },
887 0 : _ = cancel.cancelled() => {
888 0 : Err(QueryError::Shutdown)
889 : }
890 : )
891 0 : }
892 :
893 : #[allow(clippy::too_many_arguments)]
894 0 : async fn pagestream_read_message<IO>(
895 0 : pgb: &mut PostgresBackendReader<IO>,
896 0 : tenant_id: TenantId,
897 0 : timeline_id: TimelineId,
898 0 : timeline_handles: &mut TimelineHandles,
899 0 : conn_perf_span_fields: &ConnectionPerfSpanFields,
900 0 : cancel: &CancellationToken,
901 0 : ctx: &RequestContext,
902 0 : protocol_version: PagestreamProtocolVersion,
903 0 : parent_span: Span,
904 0 : ) -> Result<Option<BatchedFeMessage>, QueryError>
905 0 : where
906 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
907 0 : {
908 0 : let msg = tokio::select! {
909 : biased;
910 0 : _ = cancel.cancelled() => {
911 0 : return Err(QueryError::Shutdown)
912 : }
913 0 : msg = pgb.read_message() => { msg }
914 0 : };
915 0 :
916 0 : let received_at = Instant::now();
917 :
918 0 : let copy_data_bytes = match msg? {
919 0 : Some(FeMessage::CopyData(bytes)) => bytes,
920 : Some(FeMessage::Terminate) => {
921 0 : return Ok(None);
922 : }
923 0 : Some(m) => {
924 0 : return Err(QueryError::Other(anyhow::anyhow!(
925 0 : "unexpected message: {m:?} during COPY"
926 0 : )));
927 : }
928 : None => {
929 0 : return Ok(None);
930 : } // client disconnected
931 : };
932 0 : trace!("query: {copy_data_bytes:?}");
933 :
934 0 : fail::fail_point!("ps::handle-pagerequest-message");
935 :
936 : // parse request
937 0 : let neon_fe_msg =
938 0 : PagestreamFeMessage::parse(&mut copy_data_bytes.reader(), protocol_version)?;
939 :
940 :         // TODO: turn into an async closure once available to avoid repeating received_at
941 0 : async fn record_op_start_and_throttle(
942 0 : shard: &timeline::handle::Handle<TenantManagerTypes>,
943 0 : op: metrics::SmgrQueryType,
944 0 : received_at: Instant,
945 0 : ) -> Result<SmgrOpTimer, QueryError> {
946 0 : // It's important to start the smgr op metric recorder as early as possible
947 0 : // so that the _started counters are incremented before we do
948 0 : // any serious waiting, e.g., for throttle, batching, or actual request handling.
949 0 : let mut timer = shard.query_metrics.start_smgr_op(op, received_at);
950 0 : let now = Instant::now();
951 0 : timer.observe_throttle_start(now);
952 0 : let throttled = tokio::select! {
953 0 : res = shard.pagestream_throttle.throttle(1, now) => res,
954 0 : _ = shard.cancel.cancelled() => return Err(QueryError::Shutdown),
955 : };
956 0 : timer.observe_throttle_done(throttled);
957 0 : Ok(timer)
958 0 : }
959 :
960 0 : let batched_msg = match neon_fe_msg {
961 0 : PagestreamFeMessage::Exists(req) => {
962 0 : let shard = timeline_handles
963 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
964 0 : .await?;
965 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
966 0 : let span = tracing::info_span!(parent: &parent_span, "handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
967 0 : let timer = record_op_start_and_throttle(
968 0 : &shard,
969 0 : metrics::SmgrQueryType::GetRelExists,
970 0 : received_at,
971 0 : )
972 0 : .await?;
973 0 : BatchedFeMessage::Exists {
974 0 : span,
975 0 : timer,
976 0 : shard: shard.downgrade(),
977 0 : req,
978 0 : }
979 : }
980 0 : PagestreamFeMessage::Nblocks(req) => {
981 0 : let shard = timeline_handles
982 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
983 0 : .await?;
984 0 : let span = tracing::info_span!(parent: &parent_span, "handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
985 0 : let timer = record_op_start_and_throttle(
986 0 : &shard,
987 0 : metrics::SmgrQueryType::GetRelSize,
988 0 : received_at,
989 0 : )
990 0 : .await?;
991 0 : BatchedFeMessage::Nblocks {
992 0 : span,
993 0 : timer,
994 0 : shard: shard.downgrade(),
995 0 : req,
996 0 : }
997 : }
998 0 : PagestreamFeMessage::DbSize(req) => {
999 0 : let shard = timeline_handles
1000 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1001 0 : .await?;
1002 0 : let span = tracing::info_span!(parent: &parent_span, "handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
1003 0 : let timer = record_op_start_and_throttle(
1004 0 : &shard,
1005 0 : metrics::SmgrQueryType::GetDbSize,
1006 0 : received_at,
1007 0 : )
1008 0 : .await?;
1009 0 : BatchedFeMessage::DbSize {
1010 0 : span,
1011 0 : timer,
1012 0 : shard: shard.downgrade(),
1013 0 : req,
1014 0 : }
1015 : }
1016 0 : PagestreamFeMessage::GetSlruSegment(req) => {
1017 0 : let shard = timeline_handles
1018 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1019 0 : .await?;
1020 0 : let span = tracing::info_span!(parent: &parent_span, "handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
1021 0 : let timer = record_op_start_and_throttle(
1022 0 : &shard,
1023 0 : metrics::SmgrQueryType::GetSlruSegment,
1024 0 : received_at,
1025 0 : )
1026 0 : .await?;
1027 0 : BatchedFeMessage::GetSlruSegment {
1028 0 : span,
1029 0 : timer,
1030 0 : shard: shard.downgrade(),
1031 0 : req,
1032 0 : }
1033 : }
1034 0 : PagestreamFeMessage::GetPage(req) => {
1035 : // avoid a somewhat costly Span::record() by constructing the entire span in one go.
1036 : macro_rules! mkspan {
1037 : (before shard routing) => {{
1038 : tracing::info_span!(
1039 : parent: &parent_span,
1040 : "handle_get_page_request",
1041 : rel = %req.rel,
1042 : blkno = %req.blkno,
1043 : req_lsn = %req.hdr.request_lsn,
1044 : not_modified_since_lsn = %req.hdr.not_modified_since
1045 : )
1046 : }};
1047 : ($shard_id:expr) => {{
1048 : tracing::info_span!(
1049 : parent: &parent_span,
1050 : "handle_get_page_request",
1051 : rel = %req.rel,
1052 : blkno = %req.blkno,
1053 : req_lsn = %req.hdr.request_lsn,
1054 : not_modified_since_lsn = %req.hdr.not_modified_since,
1055 : shard_id = %$shard_id
1056 : )
1057 : }};
1058 : }
1059 :
1060 : macro_rules! respond_error {
1061 : ($span:expr, $error:expr) => {{
1062 : let error = BatchedFeMessage::RespondError {
1063 : span: $span,
1064 : error: BatchedPageStreamError {
1065 : req: req.hdr,
1066 : err: $error,
1067 : },
1068 : };
1069 : Ok(Some(error))
1070 : }};
1071 : }
1072 :
1073 0 : let key = rel_block_to_key(req.rel, req.blkno);
1074 :
1075 0 : let res = timeline_handles
1076 0 : .get(tenant_id, timeline_id, ShardSelector::Page(key))
1077 0 : .await;
1078 :
1079 0 : let shard = match res {
1080 0 : Ok(tl) => tl,
1081 0 : Err(e) => {
1082 0 : let span = mkspan!(before shard routing);
1083 0 : match e {
1084 : GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_)) => {
1085 : // We already know this tenant exists in general, because we resolved it at
1086 : // start of connection. Getting a NotFound here indicates that the shard containing
1087 : // the requested page is not present on this node: the client's knowledge of shard->pageserver
1088 : // mapping is out of date.
1089 : //
1090 :                             // Closing the connection by returning `::Reconnect` has the side effect of rate-limiting the above message, via
1091 : // client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
1092 : // and talk to a different pageserver.
1093 0 : return respond_error!(
1094 0 : span,
1095 0 : PageStreamError::Reconnect(
1096 0 : "getpage@lsn request routed to wrong shard".into()
1097 0 : )
1098 0 : );
1099 : }
1100 0 : e => {
1101 0 : return respond_error!(span, e.into());
1102 : }
1103 : }
1104 : }
1105 : };
1106 :
1107 0 : let ctx = if shard.is_get_page_request_sampled() {
1108 0 : RequestContextBuilder::from(ctx)
1109 0 : .root_perf_span(|| {
1110 0 : info_span!(
1111 : target: PERF_TRACE_TARGET,
1112 : "GET_PAGE",
1113 : peer_addr = conn_perf_span_fields.peer_addr,
1114 : application_name = conn_perf_span_fields.application_name,
1115 : compute_mode = conn_perf_span_fields.compute_mode,
1116 : tenant_id = %tenant_id,
1117 0 : shard_id = %shard.get_shard_identity().shard_slug(),
1118 : timeline_id = %timeline_id,
1119 : lsn = %req.hdr.request_lsn,
1120 : not_modified_since_lsn = %req.hdr.not_modified_since,
1121 : request_id = %req.hdr.reqid,
1122 : key = %key,
1123 : )
1124 0 : })
1125 0 : .attached_child()
1126 : } else {
1127 0 : ctx.attached_child()
1128 : };
1129 :
1130 : // This ctx travels as part of the BatchedFeMessage through
1131 : // batching into the request handler.
1132 : // The request handler needs to do some per-request work
1133 : // (relsize check) before dispatching the batch as a single
1134 : // get_vectored call to the Timeline.
1135 :                 // This ctx will be used for the relsize check, whereas the
1136 : // get_vectored call will be a different ctx with separate
1137 : // perf span.
1138 0 : let ctx = ctx.with_scope_page_service_pagestream(&shard);
1139 :
1140 : // Similar game for this `span`: we funnel it through so that
1141 : // request handler log messages contain the request-specific fields.
1142 0 : let span = mkspan!(shard.tenant_shard_id.shard_slug());
1143 :
1144 0 : let timer = record_op_start_and_throttle(
1145 0 : &shard,
1146 0 : metrics::SmgrQueryType::GetPageAtLsn,
1147 0 : received_at,
1148 0 : )
1149 0 : .maybe_perf_instrument(&ctx, |current_perf_span| {
1150 0 : info_span!(
1151 : target: PERF_TRACE_TARGET,
1152 0 : parent: current_perf_span,
1153 : "THROTTLE",
1154 : )
1155 0 : })
1156 0 : .await?;
1157 :
1158 : // We're holding the Handle
1159 0 : let effective_request_lsn = match Self::effective_request_lsn(
1160 0 : &shard,
1161 0 : shard.get_last_record_lsn(),
1162 0 : req.hdr.request_lsn,
1163 0 : req.hdr.not_modified_since,
1164 0 : &shard.get_applied_gc_cutoff_lsn(),
1165 0 : ) {
1166 0 : Ok(lsn) => lsn,
1167 0 : Err(e) => {
1168 0 : return respond_error!(span, e);
1169 : }
1170 : };
1171 :
1172 : BatchedFeMessage::GetPage {
1173 0 : span,
1174 0 : shard: shard.downgrade(),
1175 0 : pages: smallvec::smallvec![BatchedGetPageRequest {
1176 0 : req,
1177 0 : timer,
1178 0 : effective_request_lsn,
1179 0 : ctx,
1180 0 : }],
1181 : // The executor grabs the batch when it becomes idle.
1182 : // Hence, [`GetPageBatchBreakReason::ExecutorSteal`] is the
1183 : // default reason for breaking the batch.
1184 0 : batch_break_reason: GetPageBatchBreakReason::ExecutorSteal,
1185 : }
1186 : }
1187 : #[cfg(feature = "testing")]
1188 0 : PagestreamFeMessage::Test(req) => {
1189 0 : let shard = timeline_handles
1190 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1191 0 : .await?;
1192 0 : let span = tracing::info_span!(parent: &parent_span, "handle_test_request", shard_id = %shard.tenant_shard_id.shard_slug());
1193 0 : let timer =
1194 0 : record_op_start_and_throttle(&shard, metrics::SmgrQueryType::Test, received_at)
1195 0 : .await?;
1196 0 : BatchedFeMessage::Test {
1197 0 : span,
1198 0 : shard: shard.downgrade(),
1199 0 : requests: vec![BatchedTestRequest { req, timer }],
1200 0 : }
1201 : }
1202 : };
1203 0 : Ok(Some(batched_msg))
1204 0 : }
1205 :
1206 : /// Post-condition: `batch` is Some()
1207 : #[instrument(skip_all, level = tracing::Level::TRACE)]
1208 : #[allow(clippy::boxed_local)]
1209 : fn pagestream_do_batch(
1210 : batching_strategy: PageServiceProtocolPipelinedBatchingStrategy,
1211 : max_batch_size: NonZeroUsize,
1212 : batch: &mut Result<BatchedFeMessage, QueryError>,
1213 : this_msg: Result<BatchedFeMessage, QueryError>,
1214 : ) -> Result<(), Result<BatchedFeMessage, QueryError>> {
1215 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1216 :
1217 : let this_msg = match this_msg {
1218 : Ok(this_msg) => this_msg,
1219 : Err(e) => return Err(Err(e)),
1220 : };
1221 :
1222 : let eligible_batch = match batch {
1223 : Ok(b) => b,
1224 : Err(_) => {
1225 : return Err(Ok(this_msg));
1226 : }
1227 : };
1228 :
1229 : let batch_break =
1230 : eligible_batch.should_break_batch(&this_msg, max_batch_size, batching_strategy);
1231 :
1232 : match batch_break {
1233 : Some(reason) => {
1234 : if let BatchedFeMessage::GetPage {
1235 : batch_break_reason, ..
1236 : } = eligible_batch
1237 : {
1238 : *batch_break_reason = reason;
1239 : }
1240 :
1241 : Err(Ok(this_msg))
1242 : }
1243 : None => {
1244 : // ok to batch
1245 : match (eligible_batch, this_msg) {
1246 : (
1247 : BatchedFeMessage::GetPage {
1248 : pages: accum_pages, ..
1249 : },
1250 : BatchedFeMessage::GetPage {
1251 : pages: this_pages, ..
1252 : },
1253 : ) => {
1254 : accum_pages.extend(this_pages);
1255 : Ok(())
1256 : }
1257 : #[cfg(feature = "testing")]
1258 : (
1259 : BatchedFeMessage::Test {
1260 : requests: accum_requests,
1261 : ..
1262 : },
1263 : BatchedFeMessage::Test {
1264 : requests: this_requests,
1265 : ..
1266 : },
1267 : ) => {
1268 : accum_requests.extend(this_requests);
1269 : Ok(())
1270 : }
1271 : // Shape guaranteed by [`BatchedFeMessage::should_break_batch`]
1272 : _ => unreachable!(),
1273 : }
1274 : }
1275 : }
1276 : }
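// Illustrative usage sketch of the fold-style return value above. `read_next()` and
// `send_downstream()` are hypothetical helpers; the real pipelined path drives this
// through `spsc_fold` instead.
//
//   let mut batch: Result<BatchedFeMessage, QueryError> = Ok(first_msg);
//   loop {
//       let next = read_next().await;
//       match Self::pagestream_do_batch(strategy, max_batch_size, &mut batch, next) {
//           // `next` was merged into the current batch; keep accumulating.
//           Ok(()) => continue,
//           // The batch could not absorb `next`: ship the accumulated batch downstream
//           // and start a new one from the leftover message (or error).
//           Err(leftover) => {
//               let full = std::mem::replace(&mut batch, leftover);
//               send_downstream(full).await;
//           }
//       }
//   }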
1277 :
1278 0 : #[instrument(level = tracing::Level::DEBUG, skip_all)]
1279 : async fn pagesteam_handle_batched_message<IO>(
1280 : &mut self,
1281 : pgb_writer: &mut PostgresBackend<IO>,
1282 : batch: BatchedFeMessage,
1283 : io_concurrency: IoConcurrency,
1284 : cancel: &CancellationToken,
1285 : protocol_version: PagestreamProtocolVersion,
1286 : ctx: &RequestContext,
1287 : ) -> Result<(), QueryError>
1288 : where
1289 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1290 : {
1291 : let started_at = Instant::now();
1292 : let batch = {
1293 : let mut batch = batch;
1294 : batch.observe_execution_start(started_at);
1295 : batch
1296 : };
1297 :
1298 : // Dispatch the batch to the appropriate request handler.
1299 : let log_slow_name = batch.as_static_str();
1300 : let (mut handler_results, span) = {
1301 : // TODO: we unfortunately have to pin the future on the heap, since GetPage futures are huge and
1302 : // won't fit on the stack.
1303 : let mut boxpinned =
1304 : Box::pin(self.pagestream_dispatch_batched_message(batch, io_concurrency, ctx));
1305 : log_slow(
1306 : log_slow_name,
1307 : LOG_SLOW_GETPAGE_THRESHOLD,
1308 : boxpinned.as_mut(),
1309 : )
1310 : .await?
1311 : };
1312 :
1313 : // We purposefully don't count flush time into the smgr operation timer.
1314 : //
1315 :         // The reason is that the current compute client will not perform protocol processing
1316 : // if the postgres backend process is doing things other than `->smgr_read()`.
1317 : // This is especially the case for prefetch.
1318 : //
1319 : // If the compute doesn't read from the connection, eventually TCP will backpressure
1320 : // all the way into our flush call below.
1321 : //
1322 : // The timer's underlying metric is used for a storage-internal latency SLO and
1323 : // we don't want to include latency in it that we can't control.
1324 : // And as pointed out above, in this case, we don't control the time that flush will take.
1325 : //
1326 : // We put each response in the batch onto the wire in a separate pgb_writer.flush()
1327 : // call, which (all unmeasured) adds syscall overhead but reduces time to first byte
1328 : // and avoids building up a "giant" contiguous userspace buffer to hold the entire response.
1329 : // TODO: vectored socket IO would be great, but pgb_writer doesn't support that.
1330 : let flush_timers = {
1331 : let flushing_start_time = Instant::now();
1332 : let mut flush_timers = Vec::with_capacity(handler_results.len());
1333 : for handler_result in &mut handler_results {
1334 : let flush_timer = match handler_result {
1335 : Ok((_, timer)) => Some(
1336 : timer
1337 : .observe_execution_end(flushing_start_time)
1338 : .expect("we are the first caller"),
1339 : ),
1340 : Err(_) => {
1341 : // TODO: measure errors
1342 : None
1343 : }
1344 : };
1345 : flush_timers.push(flush_timer);
1346 : }
1347 : assert_eq!(flush_timers.len(), handler_results.len());
1348 : flush_timers
1349 : };
1350 :
1351 : // Map handler result to protocol behavior.
1352 : // Some handler errors cause exit from pagestream protocol.
1353 : // Other handler errors are sent back as an error message and we stay in pagestream protocol.
1354 : for (handler_result, flushing_timer) in handler_results.into_iter().zip(flush_timers) {
1355 : let response_msg = match handler_result {
1356 : Err(e) => match &e.err {
1357 : PageStreamError::Shutdown => {
1358 : // If we fail to fulfil a request during shutdown, which may be _because_ of
1359 : // shutdown, then do not send the error to the client. Instead just drop the
1360 : // connection.
1361 0 : span.in_scope(|| info!("dropping connection due to shutdown"));
1362 : return Err(QueryError::Shutdown);
1363 : }
1364 : PageStreamError::Reconnect(reason) => {
1365 0 : span.in_scope(|| info!("handler requested reconnect: {reason}"));
1366 : return Err(QueryError::Reconnect);
1367 : }
1368 : PageStreamError::Read(_)
1369 : | PageStreamError::LsnTimeout(_)
1370 : | PageStreamError::NotFound(_)
1371 : | PageStreamError::BadRequest(_) => {
1372 :                         // print all the details to the log with {:#}, but for the client the
1373 : // error message is enough. Do not log if shutting down, as the anyhow::Error
1374 : // here includes cancellation which is not an error.
1375 : let full = utils::error::report_compact_sources(&e.err);
1376 0 : span.in_scope(|| {
1377 0 : error!("error reading relation or page version: {full:#}")
1378 0 : });
1379 :
1380 : PagestreamBeMessage::Error(PagestreamErrorResponse {
1381 : req: e.req,
1382 : message: e.err.to_string(),
1383 : })
1384 : }
1385 : },
1386 : Ok((response_msg, _op_timer_already_observed)) => response_msg,
1387 : };
1388 :
1389 : //
1390 : // marshal & transmit response message
1391 : //
1392 :
1393 : pgb_writer.write_message_noflush(&BeMessage::CopyData(
1394 : &response_msg.serialize(protocol_version),
1395 : ))?;
1396 :
1397 : failpoint_support::sleep_millis_async!("before-pagestream-msg-flush", cancel);
1398 :
1399 : // what we want to do
1400 : let socket_fd = pgb_writer.socket_fd;
1401 : let flush_fut = pgb_writer.flush();
1402 : // metric for how long flushing takes
1403 : let flush_fut = match flushing_timer {
1404 : Some(flushing_timer) => futures::future::Either::Left(flushing_timer.measure(
1405 : Instant::now(),
1406 : flush_fut,
1407 : socket_fd,
1408 : )),
1409 : None => futures::future::Either::Right(flush_fut),
1410 : };
1411 : // do it while respecting cancellation
1412 0 : let _: () = async move {
1413 0 : tokio::select! {
1414 : biased;
1415 0 : _ = cancel.cancelled() => {
1416 : // We were requested to shut down.
1417 0 : info!("shutdown request received in page handler");
1418 0 : return Err(QueryError::Shutdown)
1419 : }
1420 0 : res = flush_fut => {
1421 0 : res?;
1422 : }
1423 : }
1424 0 : Ok(())
1425 0 : }
1426 : .await?;
1427 : }
1428 : Ok(())
1429 : }
1430 :
1431 : /// Helper which dispatches a batched message to the appropriate handler.
1432 : /// Returns a vec of results, along with the extracted trace span.
1433 0 : async fn pagestream_dispatch_batched_message(
1434 0 : &mut self,
1435 0 : batch: BatchedFeMessage,
1436 0 : io_concurrency: IoConcurrency,
1437 0 : ctx: &RequestContext,
1438 0 : ) -> Result<
1439 0 : (
1440 0 : Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>>,
1441 0 : Span,
1442 0 : ),
1443 0 : QueryError,
1444 0 : > {
1445 : macro_rules! upgrade_handle_and_set_context {
1446 : ($shard:ident) => {{
1447 : let weak_handle = &$shard;
1448 : let handle = weak_handle.upgrade()?;
1449 : let ctx = ctx.with_scope_page_service_pagestream(&handle);
1450 : (handle, ctx)
1451 : }};
1452 : }
1453 0 : Ok(match batch {
1454 : BatchedFeMessage::Exists {
1455 0 : span,
1456 0 : timer,
1457 0 : shard,
1458 0 : req,
1459 0 : } => {
1460 0 : fail::fail_point!("ps::handle-pagerequest-message::exists");
1461 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1462 : (
1463 0 : vec![
1464 0 : self.handle_get_rel_exists_request(&shard, &req, &ctx)
1465 0 : .instrument(span.clone())
1466 0 : .await
1467 0 : .map(|msg| (msg, timer))
1468 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
1469 0 : ],
1470 0 : span,
1471 : )
1472 : }
1473 : BatchedFeMessage::Nblocks {
1474 0 : span,
1475 0 : timer,
1476 0 : shard,
1477 0 : req,
1478 0 : } => {
1479 0 : fail::fail_point!("ps::handle-pagerequest-message::nblocks");
1480 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1481 : (
1482 0 : vec![
1483 0 : self.handle_get_nblocks_request(&shard, &req, &ctx)
1484 0 : .instrument(span.clone())
1485 0 : .await
1486 0 : .map(|msg| (msg, timer))
1487 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
1488 0 : ],
1489 0 : span,
1490 : )
1491 : }
1492 : BatchedFeMessage::GetPage {
1493 0 : span,
1494 0 : shard,
1495 0 : pages,
1496 0 : batch_break_reason,
1497 0 : } => {
1498 0 : fail::fail_point!("ps::handle-pagerequest-message::getpage");
1499 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1500 : (
1501 : {
1502 0 : let npages = pages.len();
1503 0 : trace!(npages, "handling getpage request");
1504 0 : let res = self
1505 0 : .handle_get_page_at_lsn_request_batched(
1506 0 : &shard,
1507 0 : pages,
1508 0 : io_concurrency,
1509 0 : batch_break_reason,
1510 0 : &ctx,
1511 0 : )
1512 0 : .instrument(span.clone())
1513 0 : .await;
1514 0 : assert_eq!(res.len(), npages);
1515 0 : res
1516 0 : },
1517 0 : span,
1518 : )
1519 : }
1520 : BatchedFeMessage::DbSize {
1521 0 : span,
1522 0 : timer,
1523 0 : shard,
1524 0 : req,
1525 0 : } => {
1526 0 : fail::fail_point!("ps::handle-pagerequest-message::dbsize");
1527 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1528 : (
1529 0 : vec![
1530 0 : self.handle_db_size_request(&shard, &req, &ctx)
1531 0 : .instrument(span.clone())
1532 0 : .await
1533 0 : .map(|msg| (msg, timer))
1534 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
1535 0 : ],
1536 0 : span,
1537 : )
1538 : }
1539 : BatchedFeMessage::GetSlruSegment {
1540 0 : span,
1541 0 : timer,
1542 0 : shard,
1543 0 : req,
1544 0 : } => {
1545 0 : fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
1546 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1547 : (
1548 0 : vec![
1549 0 : self.handle_get_slru_segment_request(&shard, &req, &ctx)
1550 0 : .instrument(span.clone())
1551 0 : .await
1552 0 : .map(|msg| (msg, timer))
1553 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
1554 0 : ],
1555 0 : span,
1556 : )
1557 : }
1558 : #[cfg(feature = "testing")]
1559 : BatchedFeMessage::Test {
1560 0 : span,
1561 0 : shard,
1562 0 : requests,
1563 0 : } => {
1564 0 : fail::fail_point!("ps::handle-pagerequest-message::test");
1565 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1566 : (
1567 : {
1568 0 : let npages = requests.len();
1569 0 : trace!(npages, "handling getpage request");
1570 0 : let res = self
1571 0 : .handle_test_request_batch(&shard, requests, &ctx)
1572 0 : .instrument(span.clone())
1573 0 : .await;
1574 0 : assert_eq!(res.len(), npages);
1575 0 : res
1576 0 : },
1577 0 : span,
1578 : )
1579 : }
1580 0 : BatchedFeMessage::RespondError { span, error } => {
1581 0 : // We've already decided to respond with an error, so we don't need to
1582 0 : // call the handler.
1583 0 : (vec![Err(error)], span)
1584 : }
1585 : })
1586 0 : }
1587 :
1588 : /// Pagestream sub-protocol handler.
1589 : ///
1590 : /// It is a simple request-response protocol inside a COPYBOTH session.
1591 : ///
1592 : /// # Coding Discipline
1593 : ///
1594 : /// Coding discipline within this function: all interaction with the `pgb` connection
1595 : /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
1596 :     /// This is so that we can shut down page_service quickly.
1597 : #[instrument(skip_all)]
1598 : async fn handle_pagerequests<IO>(
1599 : &mut self,
1600 : pgb: &mut PostgresBackend<IO>,
1601 : tenant_id: TenantId,
1602 : timeline_id: TimelineId,
1603 : protocol_version: PagestreamProtocolVersion,
1604 : ctx: RequestContext,
1605 : ) -> Result<(), QueryError>
1606 : where
1607 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
1608 : {
1609 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1610 :
1611 : // switch client to COPYBOTH
1612 : pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
1613 : tokio::select! {
1614 : biased;
1615 : _ = self.cancel.cancelled() => {
1616 : return Err(QueryError::Shutdown)
1617 : }
1618 : res = pgb.flush() => {
1619 : res?;
1620 : }
1621 : }
1622 :
1623 : let io_concurrency = IoConcurrency::spawn_from_conf(
1624 : self.conf,
1625 : match self.gate_guard.try_clone() {
1626 : Ok(guard) => guard,
1627 : Err(_) => {
1628 : info!("shutdown request received in page handler");
1629 : return Err(QueryError::Shutdown);
1630 : }
1631 : },
1632 : );
1633 :
1634 : let pgb_reader = pgb
1635 : .split()
1636 : .context("implementation error: split pgb into reader and writer")?;
1637 :
1638 : let timeline_handles = self
1639 : .timeline_handles
1640 : .take()
1641 : .expect("implementation error: timeline_handles should not be locked");
1642 :
1643 : let request_span = info_span!("request");
1644 : let ((pgb_reader, timeline_handles), result) = match self.pipelining_config.clone() {
1645 : PageServicePipeliningConfig::Pipelined(pipelining_config) => {
1646 : self.handle_pagerequests_pipelined(
1647 : pgb,
1648 : pgb_reader,
1649 : tenant_id,
1650 : timeline_id,
1651 : timeline_handles,
1652 : request_span,
1653 : pipelining_config,
1654 : protocol_version,
1655 : io_concurrency,
1656 : &ctx,
1657 : )
1658 : .await
1659 : }
1660 : PageServicePipeliningConfig::Serial => {
1661 : self.handle_pagerequests_serial(
1662 : pgb,
1663 : pgb_reader,
1664 : tenant_id,
1665 : timeline_id,
1666 : timeline_handles,
1667 : request_span,
1668 : protocol_version,
1669 : io_concurrency,
1670 : &ctx,
1671 : )
1672 : .await
1673 : }
1674 : };
1675 :
1676 : debug!("pagestream subprotocol shut down cleanly");
1677 :
1678 : pgb.unsplit(pgb_reader)
1679 : .context("implementation error: unsplit pgb")?;
1680 :
1681 : let replaced = self.timeline_handles.replace(timeline_handles);
1682 : assert!(replaced.is_none());
1683 :
1684 : result
1685 : }
1686 :
1687 : #[allow(clippy::too_many_arguments)]
1688 0 : async fn handle_pagerequests_serial<IO>(
1689 0 : &mut self,
1690 0 : pgb_writer: &mut PostgresBackend<IO>,
1691 0 : mut pgb_reader: PostgresBackendReader<IO>,
1692 0 : tenant_id: TenantId,
1693 0 : timeline_id: TimelineId,
1694 0 : mut timeline_handles: TimelineHandles,
1695 0 : request_span: Span,
1696 0 : protocol_version: PagestreamProtocolVersion,
1697 0 : io_concurrency: IoConcurrency,
1698 0 : ctx: &RequestContext,
1699 0 : ) -> (
1700 0 : (PostgresBackendReader<IO>, TimelineHandles),
1701 0 : Result<(), QueryError>,
1702 0 : )
1703 0 : where
1704 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
1705 0 : {
1706 0 : let cancel = self.cancel.clone();
1707 :
1708 0 : let err = loop {
1709 0 : let msg = Self::pagestream_read_message(
1710 0 : &mut pgb_reader,
1711 0 : tenant_id,
1712 0 : timeline_id,
1713 0 : &mut timeline_handles,
1714 0 : &self.perf_span_fields,
1715 0 : &cancel,
1716 0 : ctx,
1717 0 : protocol_version,
1718 0 : request_span.clone(),
1719 0 : )
1720 0 : .await;
1721 0 : let msg = match msg {
1722 0 : Ok(msg) => msg,
1723 0 : Err(e) => break e,
1724 : };
1725 0 : let msg = match msg {
1726 0 : Some(msg) => msg,
1727 : None => {
1728 0 : debug!("pagestream subprotocol end observed");
1729 0 : return ((pgb_reader, timeline_handles), Ok(()));
1730 : }
1731 : };
1732 :
1733 0 : let result = self
1734 0 : .pagesteam_handle_batched_message(
1735 0 : pgb_writer,
1736 0 : msg,
1737 0 : io_concurrency.clone(),
1738 0 : &cancel,
1739 0 : protocol_version,
1740 0 : ctx,
1741 0 : )
1742 0 : .await;
1743 0 : match result {
1744 0 : Ok(()) => {}
1745 0 : Err(e) => break e,
1746 : }
1747 : };
1748 0 : ((pgb_reader, timeline_handles), Err(err))
1749 0 : }
1750 :
1751 : /// # Cancel-Safety
1752 : ///
1753 : /// May leak tokio tasks if not polled to completion.
1754 : #[allow(clippy::too_many_arguments)]
1755 0 : async fn handle_pagerequests_pipelined<IO>(
1756 0 : &mut self,
1757 0 : pgb_writer: &mut PostgresBackend<IO>,
1758 0 : pgb_reader: PostgresBackendReader<IO>,
1759 0 : tenant_id: TenantId,
1760 0 : timeline_id: TimelineId,
1761 0 : mut timeline_handles: TimelineHandles,
1762 0 : request_span: Span,
1763 0 : pipelining_config: PageServicePipeliningConfigPipelined,
1764 0 : protocol_version: PagestreamProtocolVersion,
1765 0 : io_concurrency: IoConcurrency,
1766 0 : ctx: &RequestContext,
1767 0 : ) -> (
1768 0 : (PostgresBackendReader<IO>, TimelineHandles),
1769 0 : Result<(), QueryError>,
1770 0 : )
1771 0 : where
1772 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
1773 0 : {
1774 0 : //
1775 0 : // Pipelined pagestream handling consists of
1776 0 : // - a Batcher that reads requests off the wire and
1777 0 : //   batches them if possible,
1778 0 : // - an Executor that processes the batched requests.
1779 0 : //
1780 0 : // The batch is built up inside an `spsc_fold` channel,
1781 0 : // shared between Batcher (Sender) and Executor (Receiver).
1782 0 : //
1783 0 : // The Batcher continuously folds client requests into the batch,
1784 0 : // while the Executor can at any time take out what's in the batch
1785 0 : // in order to process it.
1786 0 : // This means the next batch builds up while the Executor
1787 0 : // executes the last batch.
1788 0 : //
1789 0 : // CANCELLATION
1790 0 : //
1791 0 : // We run both Batcher and Executor futures to completion before
1792 0 : // returning from this function.
1793 0 : //
1794 0 : // If Executor exits first, it signals cancellation to the Batcher
1795 0 : // via a CancellationToken that is a child of `self.cancel`.
1796 0 : // If Batcher exits first, it signals cancellation to the Executor
1797 0 : // by dropping the spsc_fold channel Sender.
1798 0 : //
1799 0 : // CLEAN SHUTDOWN
1800 0 : //
1801 0 : // Clean shutdown means that the client ends the COPYBOTH session.
1802 0 : // In response to such a client message, the Batcher exits.
1803 0 : // The Executor continues to run, draining the spsc_fold channel.
1804 0 : // Once drained, the spsc_fold recv will fail with a distinct error
1805 0 : // indicating that the sender disconnected.
1806 0 : // The Executor exits with Ok(()) in response to that error.
1807 0 : //
1808 0 : // Server-initiated shutdown is not a clean shutdown; instead it
1809 0 : // surfaces as an error, Err(QueryError::Shutdown), which is propagated
1810 0 : // through the normal error paths.
1811 0 : //
1812 0 : // ERROR PROPAGATION
1813 0 : //
1814 0 : // When the Batcher encounters an error, it sends it as a value
1815 0 : // through the spsc_fold channel and exits afterwards.
1816 0 : // When the Executor observes such an error in the channel,
1817 0 : // it exits returning that error value.
1818 0 : //
1819 0 : // This design ensures that the Executor stage will still process
1820 0 : // the batch that was in flight when the Batcher encountered an error,
1821 0 : // thereby behaving identically to a serial implementation.
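        //
        // Dataflow sketch (conceptual):
        //
        //   client --reads--> Batcher --spsc_fold::send(fold)--> Executor --writes--> client
        //
        // The fold step is pagestream_do_batch(), which decides whether a newly
        // read request may join the pending batch; the Executor's recv() then
        // takes whatever has accumulated so far.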
1822 0 :
1823 0 : let PageServicePipeliningConfigPipelined {
1824 0 : max_batch_size,
1825 0 : execution,
1826 0 : batching: batching_strategy,
1827 0 : } = pipelining_config;
1828 :
1829 : // Macro to _define_ a pipeline stage.
1830 : macro_rules! pipeline_stage {
1831 : ($name:literal, $cancel:expr, $make_fut:expr) => {{
1832 : let cancel: CancellationToken = $cancel;
1833 : let stage_fut = $make_fut(cancel.clone());
1834 0 : async move {
1835 0 : scopeguard::defer! {
1836 0 : debug!("exiting");
1837 0 : }
1838 0 : timed_after_cancellation(stage_fut, $name, Duration::from_millis(100), &cancel)
1839 0 : .await
1840 0 : }
1841 : .instrument(tracing::info_span!($name))
1842 : }};
1843 : }
1844 :
1845 : //
1846 : // Batcher
1847 : //
1848 :
1849 0 : let perf_span_fields = self.perf_span_fields.clone();
1850 0 :
1851 0 : let cancel_batcher = self.cancel.child_token();
1852 0 : let (mut batch_tx, mut batch_rx) = spsc_fold::channel();
1853 0 : let batcher = pipeline_stage!("batcher", cancel_batcher.clone(), move |cancel_batcher| {
1854 0 : let ctx = ctx.attached_child();
1855 0 : async move {
1856 0 : let mut pgb_reader = pgb_reader;
1857 0 : let mut exit = false;
1858 0 : while !exit {
1859 0 : let read_res = Self::pagestream_read_message(
1860 0 : &mut pgb_reader,
1861 0 : tenant_id,
1862 0 : timeline_id,
1863 0 : &mut timeline_handles,
1864 0 : &perf_span_fields,
1865 0 : &cancel_batcher,
1866 0 : &ctx,
1867 0 : protocol_version,
1868 0 : request_span.clone(),
1869 0 : )
1870 0 : .await;
1871 0 : let Some(read_res) = read_res.transpose() else {
1872 0 : debug!("client-initiated shutdown");
1873 0 : break;
1874 : };
1875 0 : exit |= read_res.is_err();
1876 0 : let could_send = batch_tx
1877 0 : .send(read_res, |batch, res| {
1878 0 : Self::pagestream_do_batch(batching_strategy, max_batch_size, batch, res)
1879 0 : })
1880 0 : .await;
1881 0 : exit |= could_send.is_err();
1882 : }
1883 0 : (pgb_reader, timeline_handles)
1884 0 : }
1885 0 : });
1886 :
1887 : //
1888 : // Executor
1889 : //
1890 :
1891 0 : let executor = pipeline_stage!("executor", self.cancel.clone(), move |cancel| {
1892 0 : let ctx = ctx.attached_child();
1893 0 : async move {
1894 0 : let _cancel_batcher = cancel_batcher.drop_guard();
1895 : loop {
1896 0 : let maybe_batch = batch_rx.recv().await;
1897 0 : let batch = match maybe_batch {
1898 0 : Ok(batch) => batch,
1899 : Err(spsc_fold::RecvError::SenderGone) => {
1900 0 : debug!("upstream gone");
1901 0 : return Ok(());
1902 : }
1903 : };
1904 0 : let batch = match batch {
1905 0 : Ok(batch) => batch,
1906 0 : Err(e) => {
1907 0 : return Err(e);
1908 : }
1909 : };
1910 0 : self.pagesteam_handle_batched_message(
1911 0 : pgb_writer,
1912 0 : batch,
1913 0 : io_concurrency.clone(),
1914 0 : &cancel,
1915 0 : protocol_version,
1916 0 : &ctx,
1917 0 : )
1918 0 : .await?;
1919 : }
1920 0 : }
1921 0 : });
1922 :
1923 : //
1924 : // Execute the stages.
1925 : //
1926 :
1927 0 : match execution {
1928 : PageServiceProtocolPipelinedExecutionStrategy::ConcurrentFutures => {
1929 0 : tokio::join!(batcher, executor)
1930 : }
1931 : PageServiceProtocolPipelinedExecutionStrategy::Tasks => {
1932 : // These tasks are not tracked anywhere.
1933 0 : let read_messages_task = tokio::spawn(batcher);
1934 0 : let (read_messages_task_res, executor_res_) =
1935 0 : tokio::join!(read_messages_task, executor,);
1936 0 : (
1937 0 : read_messages_task_res.expect("propagated panic from read_messages"),
1938 0 : executor_res_,
1939 0 : )
1940 : }
1941 : }
1942 0 : }
1943 :
1944 : /// Helper function to handle the LSN from client request.
1945 : ///
1946 : /// Each GetPage (and Exists and Nblocks) request includes information about
1947 : /// which version of the page is being requested. The primary compute node
1948 : /// will always request the latest page version, by setting 'request_lsn' to
1949 : /// the last inserted or flushed WAL position, while a standby will request
1950 : /// a version at the LSN that it's currently caught up to.
1951 : ///
1952 : /// In either case, if the page server hasn't received the WAL up to the
1953 : /// requested LSN yet, we will wait for it to arrive. The return value is
1954 : /// the LSN that should be used to look up the page versions.
1955 : ///
1956 : /// In addition to the request LSN, each request carries another LSN,
1957 : /// 'not_modified_since', which is a hint to the pageserver that the client
1958 : /// knows that the page has not been modified between 'not_modified_since'
1959 : /// and the request LSN. This allows skipping the wait, as long as the WAL
1960 : /// up to 'not_modified_since' has arrived. If the client doesn't have any
1961 : /// information about when the page was modified, it will use
1962 : /// not_modified_since == request_lsn. If the client lies and sends too low a
1963 : /// not_modified_since such that there are in fact later page versions, the
1964 : /// behavior is undefined: the pageserver may return any of the page versions
1965 : /// or an error.
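    ///
    /// For illustration (LSN values are made up): if the last record LSN is 0/200,
    /// a request with request_lsn = 0/300 and not_modified_since = 0/100 can be
    /// answered immediately at LSN 0/200, because nothing changed after 0/100.
    /// With not_modified_since = 0/250 instead, the handler first waits for the
    /// WAL up to 0/250 to arrive and then answers at LSN 0/250.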
1966 0 : async fn wait_or_get_last_lsn(
1967 0 : timeline: &Timeline,
1968 0 : request_lsn: Lsn,
1969 0 : not_modified_since: Lsn,
1970 0 : latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
1971 0 : ctx: &RequestContext,
1972 0 : ) -> Result<Lsn, PageStreamError> {
1973 0 : let last_record_lsn = timeline.get_last_record_lsn();
1974 0 : let effective_request_lsn = Self::effective_request_lsn(
1975 0 : timeline,
1976 0 : last_record_lsn,
1977 0 : request_lsn,
1978 0 : not_modified_since,
1979 0 : latest_gc_cutoff_lsn,
1980 0 : )?;
1981 :
1982 0 : if effective_request_lsn > last_record_lsn {
1983 0 : timeline
1984 0 : .wait_lsn(
1985 0 : not_modified_since,
1986 0 : crate::tenant::timeline::WaitLsnWaiter::PageService,
1987 0 : timeline::WaitLsnTimeout::Default,
1988 0 : ctx,
1989 0 : )
1990 0 : .await?;
1991 :
1992 : // Since we waited for 'effective_request_lsn' to arrive, that is now the last
1993 : // record LSN. (Or close enough for our purposes; the last-record LSN can
1994 : // advance immediately after we return anyway)
1995 0 : }
1996 :
1997 0 : Ok(effective_request_lsn)
1998 0 : }
1999 :
2000 0 : fn effective_request_lsn(
2001 0 : timeline: &Timeline,
2002 0 : last_record_lsn: Lsn,
2003 0 : request_lsn: Lsn,
2004 0 : not_modified_since: Lsn,
2005 0 : latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
2006 0 : ) -> Result<Lsn, PageStreamError> {
2007 0 : // Sanity check the request
2008 0 : if request_lsn < not_modified_since {
2009 0 : return Err(PageStreamError::BadRequest(
2010 0 : format!(
2011 0 : "invalid request with request LSN {} and not_modified_since {}",
2012 0 : request_lsn, not_modified_since,
2013 0 : )
2014 0 : .into(),
2015 0 : ));
2016 0 : }
2017 0 :
2018 0 : // Check explicitly for INVALID just to get a less scary error message if the request is obviously bogus
2019 0 : if request_lsn == Lsn::INVALID {
2020 0 : return Err(PageStreamError::BadRequest(
2021 0 : "invalid LSN(0) in request".into(),
2022 0 : ));
2023 0 : }
2024 0 :
2025 0 : // Clients should only read from recent LSNs on their timeline, or from locations holding an LSN lease.
2026 0 : //
2027 0 : // We may have older data available, but we make a best effort to detect this case and return an error,
2028 0 : // to distinguish a misbehaving client (asking for old LSN) from a storage issue (data missing at a legitimate LSN).
2029 0 : if request_lsn < **latest_gc_cutoff_lsn && !timeline.is_gc_blocked_by_lsn_lease_deadline() {
2030 0 : let gc_info = &timeline.gc_info.read().unwrap();
2031 0 : if !gc_info.lsn_covered_by_lease(request_lsn) {
2032 0 : return Err(
2033 0 : PageStreamError::BadRequest(format!(
2034 0 : "tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
2035 0 : request_lsn, **latest_gc_cutoff_lsn
2036 0 : ).into())
2037 0 : );
2038 0 : }
2039 0 : }
2040 :
2041 0 : if not_modified_since > last_record_lsn {
2042 0 : Ok(not_modified_since)
2043 : } else {
2044 : // It might be better to use max(not_modified_since, latest_gc_cutoff_lsn)
2045 : // here instead. That would give the same result, since we know that there
2046 : // haven't been any modifications since 'not_modified_since'. Using an older
2047 : // LSN might be faster, because that could allow skipping recent layers when
2048 : // finding the page. However, we have historically used 'last_record_lsn', so
2049 : // stick to that for now.
2050 0 : Ok(std::cmp::min(last_record_lsn, request_lsn))
2051 : }
2052 0 : }
2053 :
2054 : /// Handles the lsn lease request.
2055 : /// If a lease cannot be obtained, the client will receive NULL.
2056 : #[instrument(skip_all, fields(shard_id, %lsn))]
2057 : async fn handle_make_lsn_lease<IO>(
2058 : &mut self,
2059 : pgb: &mut PostgresBackend<IO>,
2060 : tenant_shard_id: TenantShardId,
2061 : timeline_id: TimelineId,
2062 : lsn: Lsn,
2063 : ctx: &RequestContext,
2064 : ) -> Result<(), QueryError>
2065 : where
2066 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
2067 : {
2068 : let timeline = self
2069 : .timeline_handles
2070 : .as_mut()
2071 : .unwrap()
2072 : .get(
2073 : tenant_shard_id.tenant_id,
2074 : timeline_id,
2075 : ShardSelector::Known(tenant_shard_id.to_index()),
2076 : )
2077 : .await?;
2078 : set_tracing_field_shard_id(&timeline);
2079 :
2080 : let lease = timeline
2081 : .renew_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx)
2082 0 : .inspect_err(|e| {
2083 0 : warn!("{e}");
2084 0 : })
2085 : .ok();
2086 0 : let valid_until_str = lease.map(|l| {
2087 0 : l.valid_until
2088 0 : .duration_since(SystemTime::UNIX_EPOCH)
2089 0 : .expect("valid_until is earlier than UNIX_EPOCH")
2090 0 : .as_millis()
2091 0 : .to_string()
2092 0 : });
2093 :
2094 : info!(
2095 : "acquired lease for {} until {}",
2096 : lsn,
2097 : valid_until_str.as_deref().unwrap_or("<unknown>")
2098 : );
2099 :
2100 0 : let bytes = valid_until_str.as_ref().map(|x| x.as_bytes());
2101 :
2102 : pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col(
2103 : b"valid_until",
2104 : )]))?
2105 : .write_message_noflush(&BeMessage::DataRow(&[bytes]))?;
2106 :
2107 : Ok(())
2108 : }
2109 :
2110 : #[instrument(skip_all, fields(shard_id))]
2111 : async fn handle_get_rel_exists_request(
2112 : &mut self,
2113 : timeline: &Timeline,
2114 : req: &PagestreamExistsRequest,
2115 : ctx: &RequestContext,
2116 : ) -> Result<PagestreamBeMessage, PageStreamError> {
2117 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
2118 : let lsn = Self::wait_or_get_last_lsn(
2119 : timeline,
2120 : req.hdr.request_lsn,
2121 : req.hdr.not_modified_since,
2122 : &latest_gc_cutoff_lsn,
2123 : ctx,
2124 : )
2125 : .await?;
2126 :
2127 : let exists = timeline
2128 : .get_rel_exists(req.rel, Version::Lsn(lsn), ctx)
2129 : .await?;
2130 :
2131 : Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
2132 : req: *req,
2133 : exists,
2134 : }))
2135 : }
2136 :
2137 : #[instrument(skip_all, fields(shard_id))]
2138 : async fn handle_get_nblocks_request(
2139 : &mut self,
2140 : timeline: &Timeline,
2141 : req: &PagestreamNblocksRequest,
2142 : ctx: &RequestContext,
2143 : ) -> Result<PagestreamBeMessage, PageStreamError> {
2144 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
2145 : let lsn = Self::wait_or_get_last_lsn(
2146 : timeline,
2147 : req.hdr.request_lsn,
2148 : req.hdr.not_modified_since,
2149 : &latest_gc_cutoff_lsn,
2150 : ctx,
2151 : )
2152 : .await?;
2153 :
2154 : let n_blocks = timeline
2155 : .get_rel_size(req.rel, Version::Lsn(lsn), ctx)
2156 : .await?;
2157 :
2158 : Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
2159 : req: *req,
2160 : n_blocks,
2161 : }))
2162 : }
2163 :
2164 : #[instrument(skip_all, fields(shard_id))]
2165 : async fn handle_db_size_request(
2166 : &mut self,
2167 : timeline: &Timeline,
2168 : req: &PagestreamDbSizeRequest,
2169 : ctx: &RequestContext,
2170 : ) -> Result<PagestreamBeMessage, PageStreamError> {
2171 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
2172 : let lsn = Self::wait_or_get_last_lsn(
2173 : timeline,
2174 : req.hdr.request_lsn,
2175 : req.hdr.not_modified_since,
2176 : &latest_gc_cutoff_lsn,
2177 : ctx,
2178 : )
2179 : .await?;
2180 :
2181 : let total_blocks = timeline
2182 : .get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, Version::Lsn(lsn), ctx)
2183 : .await?;
2184 : let db_size = total_blocks as i64 * BLCKSZ as i64;
2185 :
2186 : Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse {
2187 : req: *req,
2188 : db_size,
2189 : }))
2190 : }
2191 :
2192 : #[instrument(skip_all)]
2193 : async fn handle_get_page_at_lsn_request_batched(
2194 : &mut self,
2195 : timeline: &Timeline,
2196 : requests: smallvec::SmallVec<[BatchedGetPageRequest; 1]>,
2197 : io_concurrency: IoConcurrency,
2198 : batch_break_reason: GetPageBatchBreakReason,
2199 : ctx: &RequestContext,
2200 : ) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>> {
2201 : debug_assert_current_span_has_tenant_and_timeline_id();
2202 :
2203 : timeline
2204 : .query_metrics
2205 : .observe_getpage_batch_start(requests.len(), batch_break_reason);
2206 :
2207 : // If a page trace is running, submit an event for this request.
2208 : if let Some(page_trace) = timeline.page_trace.load().as_ref() {
2209 : let time = SystemTime::now();
2210 : for batch in &requests {
2211 : let key = rel_block_to_key(batch.req.rel, batch.req.blkno).to_compact();
2212 : // Ignore error (trace buffer may be full or tracer may have disconnected).
2213 : _ = page_trace.try_send(PageTraceEvent {
2214 : key,
2215 : effective_lsn: batch.effective_request_lsn,
2216 : time,
2217 : });
2218 : }
2219 : }
2220 :
2221 : // If any request in the batch needs to wait for LSN, then do so now.
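        // Waiting for the maximum effective LSN across the batch is sufficient:
        // once the WAL up to that LSN has arrived, every smaller effective LSN
        // in the batch is readable as well.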
2222 : let mut perf_instrument = false;
2223 : let max_effective_lsn = requests
2224 : .iter()
2225 0 : .map(|req| {
2226 0 : if req.ctx.has_perf_span() {
2227 0 : perf_instrument = true;
2228 0 : }
2229 :
2230 0 : req.effective_request_lsn
2231 0 : })
2232 : .max()
2233 : .expect("batch is never empty");
2234 :
2235 : let ctx = match perf_instrument {
2236 : true => RequestContextBuilder::from(ctx)
2237 0 : .root_perf_span(|| {
2238 0 : info_span!(
2239 : target: PERF_TRACE_TARGET,
2240 : "GET_VECTORED",
2241 : tenant_id = %timeline.tenant_shard_id.tenant_id,
2242 : timeline_id = %timeline.timeline_id,
2243 0 : shard = %timeline.tenant_shard_id.shard_slug(),
2244 : %max_effective_lsn
2245 : )
2246 0 : })
2247 : .attached_child(),
2248 : false => ctx.attached_child(),
2249 : };
2250 :
2251 : let last_record_lsn = timeline.get_last_record_lsn();
2252 : if max_effective_lsn > last_record_lsn {
2253 : if let Err(e) = timeline
2254 : .wait_lsn(
2255 : max_effective_lsn,
2256 : crate::tenant::timeline::WaitLsnWaiter::PageService,
2257 : timeline::WaitLsnTimeout::Default,
2258 : &ctx,
2259 : )
2260 0 : .maybe_perf_instrument(&ctx, |current_perf_span| {
2261 0 : info_span!(
2262 : target: PERF_TRACE_TARGET,
2263 0 : parent: current_perf_span,
2264 : "WAIT_LSN",
2265 : )
2266 0 : })
2267 : .await
2268 : {
2269 0 : return Vec::from_iter(requests.into_iter().map(|req| {
2270 0 : Err(BatchedPageStreamError {
2271 0 : err: PageStreamError::from(e.clone()),
2272 0 : req: req.req.hdr,
2273 0 : })
2274 0 : }));
2275 : }
2276 : }
2277 :
2278 : let results = timeline
2279 : .get_rel_page_at_lsn_batched(
2280 0 : requests.iter().map(|p| {
2281 0 : (
2282 0 : &p.req.rel,
2283 0 : &p.req.blkno,
2284 0 : p.effective_request_lsn,
2285 0 : p.ctx.attached_child(),
2286 0 : )
2287 0 : }),
2288 : io_concurrency,
2289 : &ctx,
2290 : )
2291 : .await;
2292 : assert_eq!(results.len(), requests.len());
2293 :
2294 : // TODO: avoid creating the new Vec here
2295 : Vec::from_iter(
2296 : requests
2297 : .into_iter()
2298 : .zip(results.into_iter())
2299 0 : .map(|(req, res)| {
2300 0 : res.map(|page| {
2301 0 : (
2302 0 : PagestreamBeMessage::GetPage(models::PagestreamGetPageResponse {
2303 0 : req: req.req,
2304 0 : page,
2305 0 : }),
2306 0 : req.timer,
2307 0 : )
2308 0 : })
2309 0 : .map_err(|e| BatchedPageStreamError {
2310 0 : err: PageStreamError::from(e),
2311 0 : req: req.req.hdr,
2312 0 : })
2313 0 : }),
2314 : )
2315 : }
2316 :
2317 : #[instrument(skip_all, fields(shard_id))]
2318 : async fn handle_get_slru_segment_request(
2319 : &mut self,
2320 : timeline: &Timeline,
2321 : req: &PagestreamGetSlruSegmentRequest,
2322 : ctx: &RequestContext,
2323 : ) -> Result<PagestreamBeMessage, PageStreamError> {
2324 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
2325 : let lsn = Self::wait_or_get_last_lsn(
2326 : timeline,
2327 : req.hdr.request_lsn,
2328 : req.hdr.not_modified_since,
2329 : &latest_gc_cutoff_lsn,
2330 : ctx,
2331 : )
2332 : .await?;
2333 :
2334 : let kind = SlruKind::from_repr(req.kind)
2335 : .ok_or(PageStreamError::BadRequest("invalid SLRU kind".into()))?;
2336 : let segment = timeline.get_slru_segment(kind, req.segno, lsn, ctx).await?;
2337 :
2338 : Ok(PagestreamBeMessage::GetSlruSegment(
2339 : PagestreamGetSlruSegmentResponse { req: *req, segment },
2340 : ))
2341 : }
2342 :
2343 : // NB: this impl mimics what we do for batched getpage requests.
2344 : #[cfg(feature = "testing")]
2345 : #[instrument(skip_all, fields(shard_id))]
2346 : async fn handle_test_request_batch(
2347 : &mut self,
2348 : timeline: &Timeline,
2349 : requests: Vec<BatchedTestRequest>,
2350 : _ctx: &RequestContext,
2351 : ) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>> {
2352 : // real requests would do something with the timeline
2353 : let mut results = Vec::with_capacity(requests.len());
2354 : for _req in requests.iter() {
2355 : tokio::task::yield_now().await;
2356 :
2357 : results.push({
2358 : if timeline.cancel.is_cancelled() {
2359 : Err(PageReconstructError::Cancelled)
2360 : } else {
2361 : Ok(())
2362 : }
2363 : });
2364 : }
2365 :
2366 : // TODO: avoid creating the new Vec here
2367 : Vec::from_iter(
2368 : requests
2369 : .into_iter()
2370 : .zip(results.into_iter())
2371 0 : .map(|(req, res)| {
2372 0 : res.map(|()| {
2373 0 : (
2374 0 : PagestreamBeMessage::Test(models::PagestreamTestResponse {
2375 0 : req: req.req.clone(),
2376 0 : }),
2377 0 : req.timer,
2378 0 : )
2379 0 : })
2380 0 : .map_err(|e| BatchedPageStreamError {
2381 0 : err: PageStreamError::from(e),
2382 0 : req: req.req.hdr,
2383 0 : })
2384 0 : }),
2385 : )
2386 : }
2387 :
2388 : /// Note on "fullbackup":
2389 : /// Full basebackups should only be used for debugging purposes.
2390 : /// Originally, it was introduced to enable breaking storage format changes,
2391 : /// but that is not applicable anymore.
2392 : ///
2393 : /// # Coding Discipline
2394 : ///
2395 : /// Coding discipline within this function: all interaction with the `pgb` connection
2396 : /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
2397 : /// This is so that we can shutdown page_service quickly.
2398 : ///
2399 : /// TODO: wrap the pgb that we pass to the basebackup handler so that it's sensitive
2400 : /// to connection cancellation.
2401 : #[allow(clippy::too_many_arguments)]
2402 : #[instrument(skip_all, fields(shard_id, ?lsn, ?prev_lsn, %full_backup))]
2403 : async fn handle_basebackup_request<IO>(
2404 : &mut self,
2405 : pgb: &mut PostgresBackend<IO>,
2406 : tenant_id: TenantId,
2407 : timeline_id: TimelineId,
2408 : lsn: Option<Lsn>,
2409 : prev_lsn: Option<Lsn>,
2410 : full_backup: bool,
2411 : gzip: bool,
2412 : replica: bool,
2413 : ctx: &RequestContext,
2414 : ) -> Result<(), QueryError>
2415 : where
2416 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
2417 : {
2418 0 : fn map_basebackup_error(err: BasebackupError) -> QueryError {
2419 0 : match err {
2420 : // TODO: passthrough the error site to the final error message?
2421 0 : BasebackupError::Client(e, _) => QueryError::Disconnected(ConnectionError::Io(e)),
2422 0 : BasebackupError::Server(e) => QueryError::Other(e),
2423 0 : BasebackupError::Shutdown => QueryError::Shutdown,
2424 : }
2425 0 : }
2426 :
2427 : let started = std::time::Instant::now();
2428 :
2429 : let timeline = self
2430 : .timeline_handles
2431 : .as_mut()
2432 : .unwrap()
2433 : .get(tenant_id, timeline_id, ShardSelector::Zero)
2434 : .await?;
2435 : set_tracing_field_shard_id(&timeline);
2436 : let ctx = ctx.with_scope_timeline(&timeline);
2437 :
2438 : if timeline.is_archived() == Some(true) {
2439 : tracing::info!(
2440 : "timeline {tenant_id}/{timeline_id} is archived, but got basebackup request for it."
2441 : );
2442 : return Err(QueryError::NotFound("timeline is archived".into()));
2443 : }
2444 :
2445 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
2446 : if let Some(lsn) = lsn {
2447 : // Backup was requested at a particular LSN. Wait for it to arrive.
2448 : info!("waiting for {}", lsn);
2449 : timeline
2450 : .wait_lsn(
2451 : lsn,
2452 : crate::tenant::timeline::WaitLsnWaiter::PageService,
2453 : crate::tenant::timeline::WaitLsnTimeout::Default,
2454 : &ctx,
2455 : )
2456 : .await?;
2457 : timeline
2458 : .check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
2459 : .context("invalid basebackup lsn")?;
2460 : }
2461 :
2462 : let lsn_awaited_after = started.elapsed();
2463 :
2464 : // switch client to COPYOUT
2465 : pgb.write_message_noflush(&BeMessage::CopyOutResponse)
2466 : .map_err(QueryError::Disconnected)?;
2467 : self.flush_cancellable(pgb, &self.cancel).await?;
2468 :
2469 : // Send a tarball of the latest layer on the timeline. Compress if not
2470 : // fullbackup. TODO Compress in that case too (tests need to be updated)
2471 : if full_backup {
2472 : let mut writer = pgb.copyout_writer();
2473 : basebackup::send_basebackup_tarball(
2474 : &mut writer,
2475 : &timeline,
2476 : lsn,
2477 : prev_lsn,
2478 : full_backup,
2479 : replica,
2480 : &ctx,
2481 : )
2482 : .await
2483 : .map_err(map_basebackup_error)?;
2484 : } else {
2485 : let mut writer = BufWriter::new(pgb.copyout_writer());
2486 : if gzip {
2487 : let mut encoder = GzipEncoder::with_quality(
2488 : &mut writer,
2489 : // NOTE using fast compression because it's on the critical path
2490 : // for compute startup. For an empty database, we get
2491 : // <100KB with this method. The Level::Best compression method
2492 : // gives us <20KB, but maybe we should add basebackup caching
2493 : // on compute shutdown first.
2494 : async_compression::Level::Fastest,
2495 : );
2496 : basebackup::send_basebackup_tarball(
2497 : &mut encoder,
2498 : &timeline,
2499 : lsn,
2500 : prev_lsn,
2501 : full_backup,
2502 : replica,
2503 : &ctx,
2504 : )
2505 : .await
2506 : .map_err(map_basebackup_error)?;
2507 : // shutdown the encoder to ensure the gzip footer is written
2508 : encoder
2509 : .shutdown()
2510 : .await
2511 0 : .map_err(|e| QueryError::Disconnected(ConnectionError::Io(e)))?;
2512 : } else {
2513 : basebackup::send_basebackup_tarball(
2514 : &mut writer,
2515 : &timeline,
2516 : lsn,
2517 : prev_lsn,
2518 : full_backup,
2519 : replica,
2520 : &ctx,
2521 : )
2522 : .await
2523 : .map_err(map_basebackup_error)?;
2524 : }
2525 0 : writer.flush().await.map_err(|e| {
2526 0 : map_basebackup_error(BasebackupError::Client(
2527 0 : e,
2528 0 : "handle_basebackup_request,flush",
2529 0 : ))
2530 0 : })?;
2531 : }
2532 :
2533 : pgb.write_message_noflush(&BeMessage::CopyDone)
2534 : .map_err(QueryError::Disconnected)?;
2535 : self.flush_cancellable(pgb, &timeline.cancel).await?;
2536 :
2537 : let basebackup_after = started
2538 : .elapsed()
2539 : .checked_sub(lsn_awaited_after)
2540 : .unwrap_or(Duration::ZERO);
2541 :
2542 : info!(
2543 : lsn_await_millis = lsn_awaited_after.as_millis(),
2544 : basebackup_millis = basebackup_after.as_millis(),
2545 : "basebackup complete"
2546 : );
2547 :
2548 : Ok(())
2549 : }
2550 :
2551 : // When accessing the management API, supply None as an argument.
2552 : // When authorizing access to a specific tenant, pass the corresponding tenant id.
2553 0 : fn check_permission(&self, tenant_id: Option<TenantId>) -> Result<(), QueryError> {
2554 0 : if self.auth.is_none() {
2555 : // auth is set to Trust, nothing to check so just return ok
2556 0 : return Ok(());
2557 0 : }
2558 0 : // auth is Some, as just checked above; when auth is Some,
2559 0 : // claims are always present because of checks during connection init,
2560 0 : // so this expect won't trigger.
2561 0 : let claims = self
2562 0 : .claims
2563 0 : .as_ref()
2564 0 : .expect("claims presence already checked");
2565 0 : check_permission(claims, tenant_id).map_err(|e| QueryError::Unauthorized(e.0))
2566 0 : }
2567 : }
2568 :
2569 : /// `basebackup tenant timeline [lsn] [--gzip] [--replica]`
2570 : #[derive(Debug, Clone, Eq, PartialEq)]
2571 : struct BaseBackupCmd {
2572 : tenant_id: TenantId,
2573 : timeline_id: TimelineId,
2574 : lsn: Option<Lsn>,
2575 : gzip: bool,
2576 : replica: bool,
2577 : }
2578 :
2579 : /// `fullbackup tenant timeline [lsn] [prev_lsn]`
2580 : #[derive(Debug, Clone, Eq, PartialEq)]
2581 : struct FullBackupCmd {
2582 : tenant_id: TenantId,
2583 : timeline_id: TimelineId,
2584 : lsn: Option<Lsn>,
2585 : prev_lsn: Option<Lsn>,
2586 : }
2587 :
2588 : /// `pagestream_v2 tenant timeline`
2589 : #[derive(Debug, Clone, Eq, PartialEq)]
2590 : struct PageStreamCmd {
2591 : tenant_id: TenantId,
2592 : timeline_id: TimelineId,
2593 : protocol_version: PagestreamProtocolVersion,
2594 : }
2595 :
2596 : /// `lease lsn tenant timeline lsn`
2597 : #[derive(Debug, Clone, Eq, PartialEq)]
2598 : struct LeaseLsnCmd {
2599 : tenant_shard_id: TenantShardId,
2600 : timeline_id: TimelineId,
2601 : lsn: Lsn,
2602 : }
2603 :
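/// A parsed page_service command. Example query strings (ids and LSNs are
/// illustrative):
///
/// - `pagestream_v2 <tenant_id> <timeline_id>` (likewise `pagestream_v3`)
/// - `basebackup <tenant_id> <timeline_id> [lsn | latest] [--gzip] [--replica]`
/// - `fullbackup <tenant_id> <timeline_id> [lsn] [prev_lsn]`
/// - `lease lsn <tenant_shard_id> <timeline_id> <lsn>`
/// - `set ...` (accepted and treated as a no-op)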
2604 : #[derive(Debug, Clone, Eq, PartialEq)]
2605 : enum PageServiceCmd {
2606 : Set,
2607 : PageStream(PageStreamCmd),
2608 : BaseBackup(BaseBackupCmd),
2609 : FullBackup(FullBackupCmd),
2610 : LeaseLsn(LeaseLsnCmd),
2611 : }
2612 :
2613 : impl PageStreamCmd {
2614 36 : fn parse(query: &str, protocol_version: PagestreamProtocolVersion) -> anyhow::Result<Self> {
2615 36 : let parameters = query.split_whitespace().collect_vec();
2616 36 : if parameters.len() != 2 {
2617 12 : bail!(
2618 12 : "invalid number of parameters for pagestream command: {}",
2619 12 : query
2620 12 : );
2621 24 : }
2622 24 : let tenant_id = TenantId::from_str(parameters[0])
2623 24 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2624 12 : let timeline_id = TimelineId::from_str(parameters[1])
2625 12 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2626 12 : Ok(Self {
2627 12 : tenant_id,
2628 12 : timeline_id,
2629 12 : protocol_version,
2630 12 : })
2631 36 : }
2632 : }
2633 :
2634 : impl FullBackupCmd {
2635 24 : fn parse(query: &str) -> anyhow::Result<Self> {
2636 24 : let parameters = query.split_whitespace().collect_vec();
2637 24 : if parameters.len() < 2 || parameters.len() > 4 {
2638 0 : bail!(
2639 0 : "invalid number of parameters for basebackup command: {}",
2640 0 : query
2641 0 : );
2642 24 : }
2643 24 : let tenant_id = TenantId::from_str(parameters[0])
2644 24 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2645 24 : let timeline_id = TimelineId::from_str(parameters[1])
2646 24 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2647 : // The caller is responsible for providing correct lsn and prev_lsn.
2648 24 : let lsn = if let Some(lsn_str) = parameters.get(2) {
2649 : Some(
2650 12 : Lsn::from_str(lsn_str)
2651 12 : .with_context(|| format!("Failed to parse Lsn from {lsn_str}"))?,
2652 : )
2653 : } else {
2654 12 : None
2655 : };
2656 24 : let prev_lsn = if let Some(prev_lsn_str) = parameters.get(3) {
2657 : Some(
2658 12 : Lsn::from_str(prev_lsn_str)
2659 12 : .with_context(|| format!("Failed to parse Lsn from {prev_lsn_str}"))?,
2660 : )
2661 : } else {
2662 12 : None
2663 : };
2664 24 : Ok(Self {
2665 24 : tenant_id,
2666 24 : timeline_id,
2667 24 : lsn,
2668 24 : prev_lsn,
2669 24 : })
2670 24 : }
2671 : }
2672 :
2673 : impl BaseBackupCmd {
2674 108 : fn parse(query: &str) -> anyhow::Result<Self> {
2675 108 : let parameters = query.split_whitespace().collect_vec();
2676 108 : if parameters.len() < 2 {
2677 0 : bail!(
2678 0 : "invalid number of parameters for basebackup command: {}",
2679 0 : query
2680 0 : );
2681 108 : }
2682 108 : let tenant_id = TenantId::from_str(parameters[0])
2683 108 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2684 108 : let timeline_id = TimelineId::from_str(parameters[1])
2685 108 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2686 : let lsn;
2687 : let flags_parse_from;
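        // The optional third parameter is either the literal "latest", an explicit
        // LSN, or already a "--" flag; flags start at index 3 in the first two
        // cases and at index 2 otherwise.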
2688 108 : if let Some(maybe_lsn) = parameters.get(2) {
2689 96 : if *maybe_lsn == "latest" {
2690 12 : lsn = None;
2691 12 : flags_parse_from = 3;
2692 84 : } else if maybe_lsn.starts_with("--") {
2693 60 : lsn = None;
2694 60 : flags_parse_from = 2;
2695 60 : } else {
2696 : lsn = Some(
2697 24 : Lsn::from_str(maybe_lsn)
2698 24 : .with_context(|| format!("Failed to parse lsn from {maybe_lsn}"))?,
2699 : );
2700 24 : flags_parse_from = 3;
2701 : }
2702 12 : } else {
2703 12 : lsn = None;
2704 12 : flags_parse_from = 2;
2705 12 : }
2706 :
2707 108 : let mut gzip = false;
2708 108 : let mut replica = false;
2709 :
2710 132 : for &param in &parameters[flags_parse_from..] {
2711 132 : match param {
2712 132 : "--gzip" => {
2713 84 : if gzip {
2714 12 : bail!("duplicate parameter for basebackup command: {param}")
2715 72 : }
2716 72 : gzip = true
2717 : }
2718 48 : "--replica" => {
2719 24 : if replica {
2720 0 : bail!("duplicate parameter for basebackup command: {param}")
2721 24 : }
2722 24 : replica = true
2723 : }
2724 24 : _ => bail!("invalid parameter for basebackup command: {param}"),
2725 : }
2726 : }
2727 72 : Ok(Self {
2728 72 : tenant_id,
2729 72 : timeline_id,
2730 72 : lsn,
2731 72 : gzip,
2732 72 : replica,
2733 72 : })
2734 108 : }
2735 : }
2736 :
2737 : impl LeaseLsnCmd {
2738 24 : fn parse(query: &str) -> anyhow::Result<Self> {
2739 24 : let parameters = query.split_whitespace().collect_vec();
2740 24 : if parameters.len() != 3 {
2741 0 : bail!(
2742 0 : "invalid number of parameters for lease lsn command: {}",
2743 0 : query
2744 0 : );
2745 24 : }
2746 24 : let tenant_shard_id = TenantShardId::from_str(parameters[0])
2747 24 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2748 24 : let timeline_id = TimelineId::from_str(parameters[1])
2749 24 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2750 24 : let lsn = Lsn::from_str(parameters[2])
2751 24 : .with_context(|| format!("Failed to parse lsn from {}", parameters[2]))?;
2752 24 : Ok(Self {
2753 24 : tenant_shard_id,
2754 24 : timeline_id,
2755 24 : lsn,
2756 24 : })
2757 24 : }
2758 : }
2759 :
2760 : impl PageServiceCmd {
2761 252 : fn parse(query: &str) -> anyhow::Result<Self> {
2762 252 : let query = query.trim();
2763 252 : let Some((cmd, other)) = query.split_once(' ') else {
2764 24 : bail!("cannot parse query: {query}")
2765 : };
2766 228 : match cmd.to_ascii_lowercase().as_str() {
2767 228 : "pagestream_v2" => Ok(Self::PageStream(PageStreamCmd::parse(
2768 36 : other,
2769 36 : PagestreamProtocolVersion::V2,
2770 36 : )?)),
2771 192 : "pagestream_v3" => Ok(Self::PageStream(PageStreamCmd::parse(
2772 0 : other,
2773 0 : PagestreamProtocolVersion::V3,
2774 0 : )?)),
2775 192 : "basebackup" => Ok(Self::BaseBackup(BaseBackupCmd::parse(other)?)),
2776 84 : "fullbackup" => Ok(Self::FullBackup(FullBackupCmd::parse(other)?)),
2777 60 : "lease" => {
2778 36 : let Some((cmd2, other)) = other.split_once(' ') else {
2779 0 : bail!("invalid lease command: {cmd}");
2780 : };
2781 36 : let cmd2 = cmd2.to_ascii_lowercase();
2782 36 : if cmd2 == "lsn" {
2783 24 : Ok(Self::LeaseLsn(LeaseLsnCmd::parse(other)?))
2784 : } else {
2785 12 : bail!("invalid lease command: {cmd}");
2786 : }
2787 : }
2788 24 : "set" => Ok(Self::Set),
2789 0 : _ => Err(anyhow::anyhow!("unsupported command {cmd} in {query}")),
2790 : }
2791 252 : }
2792 : }
2793 :
2794 : /// Parse the startup options from the postgres wire protocol startup packet.
2795 : ///
2796 : /// It takes a sequence of `-c option=X` or `-coption=X`. It parses the options string
2797 : /// by best effort and returns all the options parsed (key-value pairs) and a bool indicating
2798 : /// whether all options are successfully parsed. There could be duplicates in the options
2799 : /// if the caller passed such parameters.
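///
/// For example, `" -c neon.compute_mode=primary -cfoo=bar"` parses into
/// `[("neon.compute_mode", "primary"), ("foo", "bar")]` with no error.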
2800 84 : fn parse_options(options: &str) -> (Vec<(String, String)>, bool) {
2801 84 : let mut parsing_config = false;
2802 84 : let mut has_error = false;
2803 84 : let mut config = Vec::new();
2804 192 : for item in options.split_whitespace() {
2805 192 : if item == "-c" {
2806 108 : if !parsing_config {
2807 96 : parsing_config = true;
2808 96 : } else {
2809 : // "-c" followed with another "-c"
2810 12 : tracing::warn!("failed to parse the startup options: {options}");
2811 12 : has_error = true;
2812 12 : break;
2813 : }
2814 84 : } else if item.starts_with("-c") || parsing_config {
2815 84 : let Some((mut key, value)) = item.split_once('=') else {
2816 : // "-c" followed with an invalid option
2817 12 : tracing::warn!("failed to parse the startup options: {options}");
2818 12 : has_error = true;
2819 12 : break;
2820 : };
2821 72 : if !parsing_config {
2822 : // Parse "-coptions=X"
2823 12 : let Some(stripped_key) = key.strip_prefix("-c") else {
2824 0 : tracing::warn!("failed to parse the startup options: {options}");
2825 0 : has_error = true;
2826 0 : break;
2827 : };
2828 12 : key = stripped_key;
2829 60 : }
2830 72 : config.push((key.to_string(), value.to_string()));
2831 72 : parsing_config = false;
2832 : } else {
2833 0 : tracing::warn!("failed to parse the startup options: {options}");
2834 0 : has_error = true;
2835 0 : break;
2836 : }
2837 : }
2838 84 : if parsing_config {
2839 : // "-c" without the option
2840 36 : tracing::warn!("failed to parse the startup options: {options}");
2841 36 : has_error = true;
2842 48 : }
2843 84 : (config, has_error)
2844 84 : }
2845 :
2846 : impl<IO> postgres_backend::Handler<IO> for PageServerHandler
2847 : where
2848 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
2849 : {
2850 0 : fn check_auth_jwt(
2851 0 : &mut self,
2852 0 : _pgb: &mut PostgresBackend<IO>,
2853 0 : jwt_response: &[u8],
2854 0 : ) -> Result<(), QueryError> {
2855 : // this unwrap is never triggered, because check_auth_jwt is only called when auth_type is NeonJWT
2856 : // which requires auth to be present
2857 0 : let data: TokenData<Claims> = self
2858 0 : .auth
2859 0 : .as_ref()
2860 0 : .unwrap()
2861 0 : .decode(str::from_utf8(jwt_response).context("jwt response is not UTF-8")?)
2862 0 : .map_err(|e| QueryError::Unauthorized(e.0))?;
2863 :
2864 0 : if matches!(data.claims.scope, Scope::Tenant) && data.claims.tenant_id.is_none() {
2865 0 : return Err(QueryError::Unauthorized(
2866 0 : "jwt token scope is Tenant, but tenant id is missing".into(),
2867 0 : ));
2868 0 : }
2869 0 :
2870 0 : debug!(
2871 0 : "jwt scope check succeeded for scope: {:#?} by tenant id: {:?}",
2872 : data.claims.scope, data.claims.tenant_id,
2873 : );
2874 :
2875 0 : self.claims = Some(data.claims);
2876 0 : Ok(())
2877 0 : }
2878 :
2879 0 : fn startup(
2880 0 : &mut self,
2881 0 : _pgb: &mut PostgresBackend<IO>,
2882 0 : sm: &FeStartupPacket,
2883 0 : ) -> Result<(), QueryError> {
2884 0 : fail::fail_point!("ps::connection-start::startup-packet");
2885 :
2886 0 : if let FeStartupPacket::StartupMessage { params, .. } = sm {
2887 0 : if let Some(app_name) = params.get("application_name") {
2888 0 : self.perf_span_fields.application_name = Some(app_name.to_string());
2889 0 : Span::current().record("application_name", field::display(app_name));
2890 0 : }
2891 0 : if let Some(options) = params.get("options") {
2892 0 : let (config, _) = parse_options(options);
2893 0 : for (key, value) in config {
2894 0 : if key == "neon.compute_mode" {
2895 0 : self.perf_span_fields.compute_mode = Some(value.clone());
2896 0 : Span::current().record("compute_mode", field::display(value));
2897 0 : }
2898 : }
2899 0 : }
2900 0 : };
2901 :
2902 0 : Ok(())
2903 0 : }
2904 :
2905 : #[instrument(skip_all, fields(tenant_id, timeline_id))]
2906 : async fn process_query(
2907 : &mut self,
2908 : pgb: &mut PostgresBackend<IO>,
2909 : query_string: &str,
2910 : ) -> Result<(), QueryError> {
2911 0 : fail::fail_point!("simulated-bad-compute-connection", |_| {
2912 0 : info!("Hit failpoint for bad connection");
2913 0 : Err(QueryError::SimulatedConnectionError)
2914 0 : });
2915 :
2916 : fail::fail_point!("ps::connection-start::process-query");
2917 :
2918 : let ctx = self.connection_ctx.attached_child();
2919 : debug!("process query {query_string}");
2920 : let query = PageServiceCmd::parse(query_string)?;
2921 : match query {
2922 : PageServiceCmd::PageStream(PageStreamCmd {
2923 : tenant_id,
2924 : timeline_id,
2925 : protocol_version,
2926 : }) => {
2927 : tracing::Span::current()
2928 : .record("tenant_id", field::display(tenant_id))
2929 : .record("timeline_id", field::display(timeline_id));
2930 :
2931 : self.check_permission(Some(tenant_id))?;
2932 : let command_kind = match protocol_version {
2933 : PagestreamProtocolVersion::V2 => ComputeCommandKind::PageStreamV2,
2934 : PagestreamProtocolVersion::V3 => ComputeCommandKind::PageStreamV3,
2935 : };
2936 : COMPUTE_COMMANDS_COUNTERS.for_command(command_kind).inc();
2937 :
2938 : self.handle_pagerequests(pgb, tenant_id, timeline_id, protocol_version, ctx)
2939 : .await?;
2940 : }
2941 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2942 : tenant_id,
2943 : timeline_id,
2944 : lsn,
2945 : gzip,
2946 : replica,
2947 : }) => {
2948 : tracing::Span::current()
2949 : .record("tenant_id", field::display(tenant_id))
2950 : .record("timeline_id", field::display(timeline_id));
2951 :
2952 : self.check_permission(Some(tenant_id))?;
2953 :
2954 : COMPUTE_COMMANDS_COUNTERS
2955 : .for_command(ComputeCommandKind::Basebackup)
2956 : .inc();
2957 : let metric_recording = metrics::BASEBACKUP_QUERY_TIME.start_recording();
2958 0 : let res = async {
2959 0 : self.handle_basebackup_request(
2960 0 : pgb,
2961 0 : tenant_id,
2962 0 : timeline_id,
2963 0 : lsn,
2964 0 : None,
2965 0 : false,
2966 0 : gzip,
2967 0 : replica,
2968 0 : &ctx,
2969 0 : )
2970 0 : .await?;
2971 0 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
2972 0 : Result::<(), QueryError>::Ok(())
2973 0 : }
2974 : .await;
2975 : metric_recording.observe(&res);
2976 : res?;
2977 : }
2978 : // same as basebackup, but result includes relational data as well
2979 : PageServiceCmd::FullBackup(FullBackupCmd {
2980 : tenant_id,
2981 : timeline_id,
2982 : lsn,
2983 : prev_lsn,
2984 : }) => {
2985 : tracing::Span::current()
2986 : .record("tenant_id", field::display(tenant_id))
2987 : .record("timeline_id", field::display(timeline_id));
2988 :
2989 : self.check_permission(Some(tenant_id))?;
2990 :
2991 : COMPUTE_COMMANDS_COUNTERS
2992 : .for_command(ComputeCommandKind::Fullbackup)
2993 : .inc();
2994 :
2995 : // Check that the timeline exists
2996 : self.handle_basebackup_request(
2997 : pgb,
2998 : tenant_id,
2999 : timeline_id,
3000 : lsn,
3001 : prev_lsn,
3002 : true,
3003 : false,
3004 : false,
3005 : &ctx,
3006 : )
3007 : .await?;
3008 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
3009 : }
3010 : PageServiceCmd::Set => {
3011 : // important because psycopg2 executes "SET datestyle TO 'ISO'"
3012 : // on connect
3013 : // TODO: allow setting options, i.e., application_name/compute_mode via SET commands
3014 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
3015 : }
3016 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
3017 : tenant_shard_id,
3018 : timeline_id,
3019 : lsn,
3020 : }) => {
3021 : tracing::Span::current()
3022 : .record("tenant_id", field::display(tenant_shard_id))
3023 : .record("timeline_id", field::display(timeline_id));
3024 :
3025 : self.check_permission(Some(tenant_shard_id.tenant_id))?;
3026 :
3027 : COMPUTE_COMMANDS_COUNTERS
3028 : .for_command(ComputeCommandKind::LeaseLsn)
3029 : .inc();
3030 :
3031 : match self
3032 : .handle_make_lsn_lease(pgb, tenant_shard_id, timeline_id, lsn, &ctx)
3033 : .await
3034 : {
3035 : Ok(()) => {
3036 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?
3037 : }
3038 : Err(e) => {
3039 : error!("error obtaining lsn lease for {lsn}: {e:?}");
3040 : pgb.write_message_noflush(&BeMessage::ErrorResponse(
3041 : &e.to_string(),
3042 : Some(e.pg_error_code()),
3043 : ))?
3044 : }
3045 : };
3046 : }
3047 : }
3048 :
3049 : Ok(())
3050 : }
3051 : }
3052 :
3053 : impl From<GetActiveTenantError> for QueryError {
3054 0 : fn from(e: GetActiveTenantError) -> Self {
3055 0 : match e {
3056 0 : GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected(
3057 0 : ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
3058 0 : ),
3059 : GetActiveTenantError::Cancelled
3060 : | GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => {
3061 0 : QueryError::Shutdown
3062 : }
3063 0 : e @ GetActiveTenantError::NotFound(_) => QueryError::NotFound(format!("{e}").into()),
3064 0 : e => QueryError::Other(anyhow::anyhow!(e)),
3065 : }
3066 0 : }
3067 : }
3068 :
3069 : #[derive(Debug, thiserror::Error)]
3070 : pub(crate) enum GetActiveTimelineError {
3071 : #[error(transparent)]
3072 : Tenant(GetActiveTenantError),
3073 : #[error(transparent)]
3074 : Timeline(#[from] GetTimelineError),
3075 : }
3076 :
3077 : impl From<GetActiveTimelineError> for QueryError {
3078 0 : fn from(e: GetActiveTimelineError) -> Self {
3079 0 : match e {
3080 0 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => QueryError::Shutdown,
3081 0 : GetActiveTimelineError::Tenant(e) => e.into(),
3082 0 : GetActiveTimelineError::Timeline(e) => QueryError::NotFound(format!("{e}").into()),
3083 : }
3084 0 : }
3085 : }
3086 :
3087 : impl From<crate::tenant::timeline::handle::HandleUpgradeError> for QueryError {
3088 0 : fn from(e: crate::tenant::timeline::handle::HandleUpgradeError) -> Self {
3089 0 : match e {
3090 0 : crate::tenant::timeline::handle::HandleUpgradeError::ShutDown => QueryError::Shutdown,
3091 0 : }
3092 0 : }
3093 : }
3094 :
3095 0 : fn set_tracing_field_shard_id(timeline: &Timeline) {
3096 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
3097 0 : tracing::Span::current().record(
3098 0 : "shard_id",
3099 0 : tracing::field::display(timeline.tenant_shard_id.shard_slug()),
3100 0 : );
3101 0 : debug_assert_current_span_has_tenant_and_timeline_id();
3102 0 : }
3103 :
3104 : struct WaitedForLsn(Lsn);
3105 : impl From<WaitedForLsn> for Lsn {
3106 0 : fn from(WaitedForLsn(lsn): WaitedForLsn) -> Self {
3107 0 : lsn
3108 0 : }
3109 : }
3110 :
3111 : #[cfg(test)]
3112 : mod tests {
3113 : use utils::shard::ShardCount;
3114 :
3115 : use super::*;
3116 :
3117 : #[test]
3118 12 : fn pageservice_cmd_parse() {
3119 12 : let tenant_id = TenantId::generate();
3120 12 : let timeline_id = TimelineId::generate();
3121 12 : let cmd =
3122 12 : PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id} {timeline_id}")).unwrap();
3123 12 : assert_eq!(
3124 12 : cmd,
3125 12 : PageServiceCmd::PageStream(PageStreamCmd {
3126 12 : tenant_id,
3127 12 : timeline_id,
3128 12 : protocol_version: PagestreamProtocolVersion::V2,
3129 12 : })
3130 12 : );
3131 12 : let cmd = PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id}")).unwrap();
3132 12 : assert_eq!(
3133 12 : cmd,
3134 12 : PageServiceCmd::BaseBackup(BaseBackupCmd {
3135 12 : tenant_id,
3136 12 : timeline_id,
3137 12 : lsn: None,
3138 12 : gzip: false,
3139 12 : replica: false
3140 12 : })
3141 12 : );
3142 12 : let cmd =
3143 12 : PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} --gzip")).unwrap();
3144 12 : assert_eq!(
3145 12 : cmd,
3146 12 : PageServiceCmd::BaseBackup(BaseBackupCmd {
3147 12 : tenant_id,
3148 12 : timeline_id,
3149 12 : lsn: None,
3150 12 : gzip: true,
3151 12 : replica: false
3152 12 : })
3153 12 : );
3154 12 : let cmd =
3155 12 : PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} latest")).unwrap();
3156 12 : assert_eq!(
3157 12 : cmd,
3158 12 : PageServiceCmd::BaseBackup(BaseBackupCmd {
3159 12 : tenant_id,
3160 12 : timeline_id,
3161 12 : lsn: None,
3162 12 : gzip: false,
3163 12 : replica: false
3164 12 : })
3165 12 : );
3166 12 : let cmd = PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} 0/16ABCDE"))
3167 12 : .unwrap();
3168 12 : assert_eq!(
3169 12 : cmd,
3170 12 : PageServiceCmd::BaseBackup(BaseBackupCmd {
3171 12 : tenant_id,
3172 12 : timeline_id,
3173 12 : lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
3174 12 : gzip: false,
3175 12 : replica: false
3176 12 : })
3177 12 : );
3178 12 : let cmd = PageServiceCmd::parse(&format!(
3179 12 : "basebackup {tenant_id} {timeline_id} --replica --gzip"
3180 12 : ))
3181 12 : .unwrap();
3182 12 : assert_eq!(
3183 12 : cmd,
3184 12 : PageServiceCmd::BaseBackup(BaseBackupCmd {
3185 12 : tenant_id,
3186 12 : timeline_id,
3187 12 : lsn: None,
3188 12 : gzip: true,
3189 12 : replica: true
3190 12 : })
3191 12 : );
3192 12 : let cmd = PageServiceCmd::parse(&format!(
3193 12 : "basebackup {tenant_id} {timeline_id} 0/16ABCDE --replica --gzip"
3194 12 : ))
3195 12 : .unwrap();
3196 12 : assert_eq!(
3197 12 : cmd,
3198 12 : PageServiceCmd::BaseBackup(BaseBackupCmd {
3199 12 : tenant_id,
3200 12 : timeline_id,
3201 12 : lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
3202 12 : gzip: true,
3203 12 : replica: true
3204 12 : })
3205 12 : );
3206 12 : let cmd = PageServiceCmd::parse(&format!("fullbackup {tenant_id} {timeline_id}")).unwrap();
3207 12 : assert_eq!(
3208 12 : cmd,
3209 12 : PageServiceCmd::FullBackup(FullBackupCmd {
3210 12 : tenant_id,
3211 12 : timeline_id,
3212 12 : lsn: None,
3213 12 : prev_lsn: None
3214 12 : })
3215 12 : );
3216 12 : let cmd = PageServiceCmd::parse(&format!(
3217 12 : "fullbackup {tenant_id} {timeline_id} 0/16ABCDE 0/16ABCDF"
3218 12 : ))
3219 12 : .unwrap();
3220 12 : assert_eq!(
3221 12 : cmd,
3222 12 : PageServiceCmd::FullBackup(FullBackupCmd {
3223 12 : tenant_id,
3224 12 : timeline_id,
3225 12 : lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
3226 12 : prev_lsn: Some(Lsn::from_str("0/16ABCDF").unwrap()),
3227 12 : })
3228 12 : );
3229 12 : let tenant_shard_id = TenantShardId::unsharded(tenant_id);
3230 12 : let cmd = PageServiceCmd::parse(&format!(
3231 12 : "lease lsn {tenant_shard_id} {timeline_id} 0/16ABCDE"
3232 12 : ))
3233 12 : .unwrap();
3234 12 : assert_eq!(
3235 12 : cmd,
3236 12 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
3237 12 : tenant_shard_id,
3238 12 : timeline_id,
3239 12 : lsn: Lsn::from_str("0/16ABCDE").unwrap(),
3240 12 : })
3241 12 : );
3242 12 : let tenant_shard_id = TenantShardId::split(&tenant_shard_id, ShardCount(8))[1];
3243 12 : let cmd = PageServiceCmd::parse(&format!(
3244 12 : "lease lsn {tenant_shard_id} {timeline_id} 0/16ABCDE"
3245 12 : ))
3246 12 : .unwrap();
3247 12 : assert_eq!(
3248 12 : cmd,
3249 12 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
3250 12 : tenant_shard_id,
3251 12 : timeline_id,
3252 12 : lsn: Lsn::from_str("0/16ABCDE").unwrap(),
3253 12 : })
3254 12 : );
3255 12 : let cmd = PageServiceCmd::parse("set a = b").unwrap();
3256 12 : assert_eq!(cmd, PageServiceCmd::Set);
3257 12 : let cmd = PageServiceCmd::parse("SET foo").unwrap();
3258 12 : assert_eq!(cmd, PageServiceCmd::Set);
3259 12 : }
3260 :
3261 : #[test]
3262 12 : fn pageservice_cmd_err_handling() {
3263 12 : let tenant_id = TenantId::generate();
3264 12 : let timeline_id = TimelineId::generate();
3265 12 : let cmd = PageServiceCmd::parse("unknown_command");
3266 12 : assert!(cmd.is_err());
3267 12 : let cmd = PageServiceCmd::parse("pagestream_v2");
3268 12 : assert!(cmd.is_err());
3269 12 : let cmd = PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id}xxx"));
3270 12 : assert!(cmd.is_err());
3271 12 : let cmd = PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id}xxx {timeline_id}xxx"));
3272 12 : assert!(cmd.is_err());
3273 12 : let cmd = PageServiceCmd::parse(&format!(
3274 12 : "basebackup {tenant_id} {timeline_id} --gzip --gzip"
3275 12 : ));
3276 12 : assert!(cmd.is_err());
3277 12 : let cmd = PageServiceCmd::parse(&format!(
3278 12 : "basebackup {tenant_id} {timeline_id} --gzip --unknown"
3279 12 : ));
3280 12 : assert!(cmd.is_err());
3281 12 : let cmd = PageServiceCmd::parse(&format!(
3282 12 : "basebackup {tenant_id} {timeline_id} --gzip 0/16ABCDE"
3283 12 : ));
3284 12 : assert!(cmd.is_err());
3285 12 : let cmd = PageServiceCmd::parse(&format!("lease {tenant_id} {timeline_id} gzip 0/16ABCDE"));
3286 12 : assert!(cmd.is_err());
3287 12 : }
3288 :
3289 : #[test]
3290 12 : fn test_parse_options() {
3291 12 : let (config, has_error) = parse_options(" -c neon.compute_mode=primary ");
3292 12 : assert!(!has_error);
3293 12 : assert_eq!(
3294 12 : config,
3295 12 : vec![("neon.compute_mode".to_string(), "primary".to_string())]
3296 12 : );
3297 :
3298 12 : let (config, has_error) = parse_options(" -c neon.compute_mode=primary -c foo=bar ");
3299 12 : assert!(!has_error);
3300 12 : assert_eq!(
3301 12 : config,
3302 12 : vec![
3303 12 : ("neon.compute_mode".to_string(), "primary".to_string()),
3304 12 : ("foo".to_string(), "bar".to_string()),
3305 12 : ]
3306 12 : );
3307 :
3308 12 : let (config, has_error) = parse_options(" -c neon.compute_mode=primary -cfoo=bar");
3309 12 : assert!(!has_error);
3310 12 : assert_eq!(
3311 12 : config,
3312 12 : vec![
3313 12 : ("neon.compute_mode".to_string(), "primary".to_string()),
3314 12 : ("foo".to_string(), "bar".to_string()),
3315 12 : ]
3316 12 : );
3317 :
3318 12 : let (_, has_error) = parse_options("-c");
3319 12 : assert!(has_error);
3320 :
3321 12 : let (_, has_error) = parse_options("-c foo=bar -c -c");
3322 12 : assert!(has_error);
3323 :
3324 12 : let (_, has_error) = parse_options(" ");
3325 12 : assert!(!has_error);
3326 :
3327 12 : let (_, has_error) = parse_options(" -c neon.compute_mode");
3328 12 : assert!(has_error);
3329 12 : }
3330 : }
|