Line data Source code
1 : //! The Page Service listens for client connections and serves their GetPage@LSN
2 : //! requests.
3 :
4 : use std::borrow::Cow;
5 : use std::num::NonZeroUsize;
6 : use std::os::fd::AsRawFd;
7 : use std::str::FromStr;
8 : use std::sync::Arc;
9 : use std::time::{Duration, Instant, SystemTime};
10 : use std::{io, str};
11 :
12 : use anyhow::{Context, bail};
13 : use async_compression::tokio::write::GzipEncoder;
14 : use bytes::Buf;
15 : use futures::FutureExt;
16 : use itertools::Itertools;
17 : use jsonwebtoken::TokenData;
18 : use once_cell::sync::OnceCell;
19 : use pageserver_api::config::{
20 : GetVectoredConcurrentIo, PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
21 : PageServiceProtocolPipelinedBatchingStrategy, PageServiceProtocolPipelinedExecutionStrategy,
22 : };
23 : use pageserver_api::key::rel_block_to_key;
24 : use pageserver_api::models::{
25 : self, PageTraceEvent, PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
26 : PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
27 : PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetSlruSegmentRequest,
28 : PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest, PagestreamNblocksResponse,
29 : PagestreamProtocolVersion, PagestreamRequest, TenantState,
30 : };
31 : use pageserver_api::reltag::SlruKind;
32 : use pageserver_api::shard::TenantShardId;
33 : use postgres_backend::{
34 : AuthType, PostgresBackend, PostgresBackendReader, QueryError, is_expected_io_error,
35 : };
36 : use postgres_ffi::BLCKSZ;
37 : use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
38 : use pq_proto::framed::ConnectionError;
39 : use pq_proto::{BeMessage, FeMessage, FeStartupPacket, RowDescriptor};
40 : use strum_macros::IntoStaticStr;
41 : use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufWriter};
42 : use tokio::task::JoinHandle;
43 : use tokio_util::sync::CancellationToken;
44 : use tracing::*;
45 : use utils::auth::{Claims, Scope, SwappableJwtAuth};
46 : use utils::failpoint_support;
47 : use utils::id::{TenantId, TimelineId};
48 : use utils::logging::log_slow;
49 : use utils::lsn::Lsn;
50 : use utils::simple_rcu::RcuReadGuard;
51 : use utils::sync::gate::{Gate, GateGuard};
52 : use utils::sync::spsc_fold;
53 :
54 : use crate::PERF_TRACE_TARGET;
55 : use crate::auth::check_permission;
56 : use crate::basebackup::BasebackupError;
57 : use crate::basebackup_cache::BasebackupCache;
58 : use crate::config::PageServerConf;
59 : use crate::context::{
60 : DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
61 : };
62 : use crate::metrics::{
63 : self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, GetPageBatchBreakReason, LIVE_CONNECTIONS,
64 : SmgrOpTimer, TimelineMetrics,
65 : };
66 : use crate::pgdatadir_mapping::{LsnRange, Version};
67 : use crate::span::{
68 : debug_assert_current_span_has_tenant_and_timeline_id,
69 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
70 : };
71 : use crate::task_mgr::{self, COMPUTE_REQUEST_RUNTIME, TaskKind};
72 : use crate::tenant::mgr::{
73 : GetActiveTenantError, GetTenantError, ShardResolveResult, ShardSelector, TenantManager,
74 : };
75 : use crate::tenant::storage_layer::IoConcurrency;
76 : use crate::tenant::timeline::{self, WaitLsnError};
77 : use crate::tenant::{GetTimelineError, PageReconstructError, Timeline};
78 : use crate::{basebackup, timed_after_cancellation};
79 :
80 : /// How long we may wait for a [`crate::tenant::mgr::TenantSlot::InProgress`] and/or a [`crate::tenant::TenantShard`] which
81 : /// is not yet in state [`TenantState::Active`].
82 : ///
83 : /// NB: this is a different value than [`crate::http::routes::ACTIVE_TENANT_TIMEOUT`].
84 : const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
85 :
86 : /// Threshold at which to log slow GetPage requests.
87 : const LOG_SLOW_GETPAGE_THRESHOLD: Duration = Duration::from_secs(30);
88 :
89 : ///////////////////////////////////////////////////////////////////////////////
90 :
91 : pub struct Listener {
92 : cancel: CancellationToken,
93 : /// Cancel the listener task through `cancel` to shut down the listener
94 : /// and get a handle on the existing connections.
95 : task: JoinHandle<Connections>,
96 : }
97 :
98 : pub struct Connections {
99 : cancel: CancellationToken,
100 : tasks: tokio::task::JoinSet<ConnectionHandlerResult>,
101 : gate: Gate,
102 : }
103 :
104 0 : pub fn spawn(
105 0 : conf: &'static PageServerConf,
106 0 : tenant_manager: Arc<TenantManager>,
107 0 : pg_auth: Option<Arc<SwappableJwtAuth>>,
108 0 : perf_trace_dispatch: Option<Dispatch>,
109 0 : tcp_listener: tokio::net::TcpListener,
110 0 : tls_config: Option<Arc<rustls::ServerConfig>>,
111 0 : basebackup_cache: Arc<BasebackupCache>,
112 0 : ) -> Listener {
113 0 : let cancel = CancellationToken::new();
114 0 : let libpq_ctx = RequestContext::todo_child(
115 0 : TaskKind::LibpqEndpointListener,
116 0 : // listener task shouldn't need to download anything. (We will
117 0 : // create a separate sub-context for each connection, with its
118 0 : // own download behavior. This context is used only to listen and
119 0 : // accept connections.)
120 0 : DownloadBehavior::Error,
121 0 : );
122 0 : let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
123 0 : "libpq listener",
124 0 : libpq_listener_main(
125 0 : conf,
126 0 : tenant_manager,
127 0 : pg_auth,
128 0 : perf_trace_dispatch,
129 0 : tcp_listener,
130 0 : conf.pg_auth_type,
131 0 : tls_config,
132 0 : conf.page_service_pipelining.clone(),
133 0 : basebackup_cache,
134 0 : libpq_ctx,
135 0 : cancel.clone(),
136 0 : )
137 0 : .map(anyhow::Ok),
138 0 : ));
139 0 :
140 0 : Listener { cancel, task }
141 0 : }
142 :
143 : impl Listener {
144 0 : pub async fn stop_accepting(self) -> Connections {
145 0 : self.cancel.cancel();
146 0 : self.task
147 0 : .await
148 0 : .expect("unreachable: we wrap the listener task in task_mgr::exit_on_panic_or_error")
149 0 : }
150 : }
151 : impl Connections {
152 0 : pub(crate) async fn shutdown(self) {
153 0 : let Self {
154 0 : cancel,
155 0 : mut tasks,
156 0 : gate,
157 0 : } = self;
158 0 : cancel.cancel();
159 0 : while let Some(res) = tasks.join_next().await {
160 0 : Self::handle_connection_completion(res);
161 0 : }
162 0 : gate.close().await;
163 0 : }
164 :
165 0 : fn handle_connection_completion(res: Result<anyhow::Result<()>, tokio::task::JoinError>) {
166 0 : match res {
167 0 : Ok(Ok(())) => {}
168 0 : Ok(Err(e)) => error!("error in page_service connection task: {:?}", e),
169 0 : Err(e) => error!("page_service connection task panicked: {:?}", e),
170 : }
171 0 : }
172 : }
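// Hedged usage sketch (illustrative; the actual call sites live outside this file):
// shutdown is a two-step sequence using the handles above.
//
//     let connections = listener.stop_accepting().await; // stop accepting, keep live connections
//     connections.shutdown().await;                       // cancel handlers, drain them, close the gate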
173 :
174 : ///
175 : /// Main loop of the page service.
176 : ///
177 : /// Listens for connections, and launches a new handler task for each.
178 : ///
179 : /// Upon cancellation via `listener_cancel`, stops accepting and returns the set of
180 : /// still-open connections.
181 : ///
182 : #[allow(clippy::too_many_arguments)]
183 0 : pub async fn libpq_listener_main(
184 0 : conf: &'static PageServerConf,
185 0 : tenant_manager: Arc<TenantManager>,
186 0 : auth: Option<Arc<SwappableJwtAuth>>,
187 0 : perf_trace_dispatch: Option<Dispatch>,
188 0 : listener: tokio::net::TcpListener,
189 0 : auth_type: AuthType,
190 0 : tls_config: Option<Arc<rustls::ServerConfig>>,
191 0 : pipelining_config: PageServicePipeliningConfig,
192 0 : basebackup_cache: Arc<BasebackupCache>,
193 0 : listener_ctx: RequestContext,
194 0 : listener_cancel: CancellationToken,
195 0 : ) -> Connections {
196 0 : let connections_cancel = CancellationToken::new();
197 0 : let connections_gate = Gate::default();
198 0 : let mut connection_handler_tasks = tokio::task::JoinSet::default();
199 :
200 : loop {
201 0 : let gate_guard = match connections_gate.enter() {
202 0 : Ok(guard) => guard,
203 0 : Err(_) => break,
204 : };
205 :
206 0 : let accepted = tokio::select! {
207 : biased;
208 0 : _ = listener_cancel.cancelled() => break,
209 0 : next = connection_handler_tasks.join_next(), if !connection_handler_tasks.is_empty() => {
210 0 : let res = next.expect("we don't poll while empty");
211 0 : Connections::handle_connection_completion(res);
212 0 : continue;
213 : }
214 0 : accepted = listener.accept() => accepted,
215 0 : };
216 0 :
217 0 : match accepted {
218 0 : Ok((socket, peer_addr)) => {
219 0 : // Connection established. Spawn a new task to handle it.
220 0 : debug!("accepted connection from {}", peer_addr);
221 0 : let local_auth = auth.clone();
222 0 : let connection_ctx = RequestContextBuilder::from(&listener_ctx)
223 0 : .task_kind(TaskKind::PageRequestHandler)
224 0 : .download_behavior(DownloadBehavior::Download)
225 0 : .perf_span_dispatch(perf_trace_dispatch.clone())
226 0 : .detached_child();
227 0 :
228 0 : connection_handler_tasks.spawn(page_service_conn_main(
229 0 : conf,
230 0 : tenant_manager.clone(),
231 0 : local_auth,
232 0 : socket,
233 0 : auth_type,
234 0 : tls_config.clone(),
235 0 : pipelining_config.clone(),
236 0 : Arc::clone(&basebackup_cache),
237 0 : connection_ctx,
238 0 : connections_cancel.child_token(),
239 0 : gate_guard,
240 0 : ));
241 : }
242 0 : Err(err) => {
243 0 : // accept() failed. Log the error, and loop back to retry on next connection.
244 0 : error!("accept() failed: {:?}", err);
245 : }
246 : }
247 : }
248 :
249 0 : debug!("page_service listener loop terminated");
250 :
251 0 : Connections {
252 0 : cancel: connections_cancel,
253 0 : tasks: connection_handler_tasks,
254 0 : gate: connections_gate,
255 0 : }
256 0 : }
257 :
258 : type ConnectionHandlerResult = anyhow::Result<()>;
259 :
260 : /// Perf root spans start at the per-request level, after shard routing.
261 : /// This struct carries connection-level information to the root perf span definition.
262 : #[derive(Clone)]
263 : struct ConnectionPerfSpanFields {
264 : peer_addr: String,
265 : application_name: Option<String>,
266 : compute_mode: Option<String>,
267 : }
268 :
269 : #[instrument(skip_all, fields(peer_addr, application_name, compute_mode))]
270 : #[allow(clippy::too_many_arguments)]
271 : async fn page_service_conn_main(
272 : conf: &'static PageServerConf,
273 : tenant_manager: Arc<TenantManager>,
274 : auth: Option<Arc<SwappableJwtAuth>>,
275 : socket: tokio::net::TcpStream,
276 : auth_type: AuthType,
277 : tls_config: Option<Arc<rustls::ServerConfig>>,
278 : pipelining_config: PageServicePipeliningConfig,
279 : basebackup_cache: Arc<BasebackupCache>,
280 : connection_ctx: RequestContext,
281 : cancel: CancellationToken,
282 : gate_guard: GateGuard,
283 : ) -> ConnectionHandlerResult {
284 : let _guard = LIVE_CONNECTIONS
285 : .with_label_values(&["page_service"])
286 : .guard();
287 :
288 : socket
289 : .set_nodelay(true)
290 : .context("could not set TCP_NODELAY")?;
291 :
292 : let socket_fd = socket.as_raw_fd();
293 :
294 : let peer_addr = socket.peer_addr().context("get peer address")?;
295 :
296 : let perf_span_fields = ConnectionPerfSpanFields {
297 : peer_addr: peer_addr.to_string(),
298 : application_name: None, // filled in later
299 : compute_mode: None, // filled in later
300 : };
301 : tracing::Span::current().record("peer_addr", field::display(peer_addr));
302 :
303 : // Set up a read timeout of 10 minutes. The exact value is somewhat arbitrary; the requirements are:
304 : // - long enough for most valid compute connections
305 : // - less than infinite to stop us from "leaking" connections to long-gone computes
306 : //
307 : // no write timeout is used, because the kernel is assumed to error writes after some time.
308 : let mut socket = tokio_io_timeout::TimeoutReader::new(socket);
309 :
310 : let default_timeout_ms = 10 * 60 * 1000; // 10 minutes by default
311 0 : let socket_timeout_ms = (|| {
312 0 : fail::fail_point!("simulated-bad-compute-connection", |avg_timeout_ms| {
313 : // Exponential distribution for simulating
314 : // poor network conditions; in tests, avg_timeout_ms is
315 : // expected to be around 15.
316 0 : if let Some(avg_timeout_ms) = avg_timeout_ms {
317 0 : let avg = avg_timeout_ms.parse::<i64>().unwrap() as f32;
318 0 : let u = rand::random::<f32>();
319 0 : ((1.0 - u).ln() / (-avg)) as u64
320 : } else {
321 0 : default_timeout_ms
322 : }
323 0 : });
324 0 : default_timeout_ms
325 : })();
326 :
327 : // A timeout here does not mean the client died; it can happen if it's just idle for
328 : // a while: we will tear down this PageServerHandler and instantiate a new one if/when
329 : // they reconnect.
330 : socket.set_timeout(Some(std::time::Duration::from_millis(socket_timeout_ms)));
331 : let socket = Box::pin(socket);
332 :
333 : fail::fail_point!("ps::connection-start::pre-login");
334 :
335 : // XXX: pgbackend.run() should take the connection_ctx,
336 : // and create a child per-query context when it invokes process_query.
337 : // But it's in a shared crate, so we store connection_ctx inside PageServerHandler
338 : // and create the per-query context in process_query ourselves.
339 : let mut conn_handler = PageServerHandler::new(
340 : tenant_manager,
341 : auth,
342 : pipelining_config,
343 : conf.get_vectored_concurrent_io,
344 : perf_span_fields,
345 : basebackup_cache,
346 : connection_ctx,
347 : cancel.clone(),
348 : gate_guard,
349 : );
350 : let pgbackend =
351 : PostgresBackend::new_from_io(socket_fd, socket, peer_addr, auth_type, tls_config)?;
352 :
353 : match pgbackend.run(&mut conn_handler, &cancel).await {
354 : Ok(()) => {
355 : // we've been requested to shut down
356 : Ok(())
357 : }
358 : Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
359 : if is_expected_io_error(&io_error) {
360 : info!("Postgres client disconnected ({io_error})");
361 : Ok(())
362 : } else {
363 : let tenant_id = conn_handler.timeline_handles.as_ref().unwrap().tenant_id();
364 : Err(io_error).context(format!(
365 : "Postgres connection error for tenant_id={:?} client at peer_addr={}",
366 : tenant_id, peer_addr
367 : ))
368 : }
369 : }
370 : other => {
371 : let tenant_id = conn_handler.timeline_handles.as_ref().unwrap().tenant_id();
372 : other.context(format!(
373 : "Postgres query error for tenant_id={:?} client peer_addr={}",
374 : tenant_id, peer_addr
375 : ))
376 : }
377 : }
378 : }
379 :
380 : struct PageServerHandler {
381 : auth: Option<Arc<SwappableJwtAuth>>,
382 : claims: Option<Claims>,
383 :
384 : /// The context created for the lifetime of the connection
385 : /// serviced by this PageServerHandler.
386 : /// For each query received over the connection,
387 : /// `process_query` creates a child context from this one.
388 : connection_ctx: RequestContext,
389 :
390 : perf_span_fields: ConnectionPerfSpanFields,
391 :
392 : cancel: CancellationToken,
393 :
394 : /// None only while pagestream protocol is being processed.
395 : timeline_handles: Option<TimelineHandles>,
396 :
397 : pipelining_config: PageServicePipeliningConfig,
398 : get_vectored_concurrent_io: GetVectoredConcurrentIo,
399 :
400 : basebackup_cache: Arc<BasebackupCache>,
401 :
402 : gate_guard: GateGuard,
403 : }
404 :
405 : struct TimelineHandles {
406 : wrapper: TenantManagerWrapper,
407 : /// Note on size: the typical size of this map is 1. The largest size we expect
408 : /// to see is the number of shards divided by the number of pageservers (typically < 2),
409 : /// or the ratio used when splitting shards (i.e. how many children are created from
410 : /// one parent shard), where a "large" number might be ~8.
411 : handles: timeline::handle::Cache<TenantManagerTypes>,
412 : }
413 :
414 : impl TimelineHandles {
415 0 : fn new(tenant_manager: Arc<TenantManager>) -> Self {
416 0 : Self {
417 0 : wrapper: TenantManagerWrapper {
418 0 : tenant_manager,
419 0 : tenant_id: OnceCell::new(),
420 0 : },
421 0 : handles: Default::default(),
422 0 : }
423 0 : }
424 0 : async fn get(
425 0 : &mut self,
426 0 : tenant_id: TenantId,
427 0 : timeline_id: TimelineId,
428 0 : shard_selector: ShardSelector,
429 0 : ) -> Result<timeline::handle::Handle<TenantManagerTypes>, GetActiveTimelineError> {
430 0 : if *self.wrapper.tenant_id.get_or_init(|| tenant_id) != tenant_id {
431 0 : return Err(GetActiveTimelineError::Tenant(
432 0 : GetActiveTenantError::SwitchedTenant,
433 0 : ));
434 0 : }
435 0 : self.handles
436 0 : .get(timeline_id, shard_selector, &self.wrapper)
437 0 : .await
438 0 : .map_err(|e| match e {
439 0 : timeline::handle::GetError::TenantManager(e) => e,
440 : timeline::handle::GetError::PerTimelineStateShutDown => {
441 0 : trace!("per-timeline state shut down");
442 0 : GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown)
443 : }
444 0 : })
445 0 : }
446 :
447 0 : fn tenant_id(&self) -> Option<TenantId> {
448 0 : self.wrapper.tenant_id.get().copied()
449 0 : }
450 : }
451 :
452 : pub(crate) struct TenantManagerWrapper {
453 : tenant_manager: Arc<TenantManager>,
454 : // We do not support switching tenant_id on a connection at this point.
455 : // We can add support for this later if needed without changing
456 : // the protocol.
457 : tenant_id: once_cell::sync::OnceCell<TenantId>,
458 : }
459 :
460 : #[derive(Debug)]
461 : pub(crate) struct TenantManagerTypes;
462 :
463 : impl timeline::handle::Types for TenantManagerTypes {
464 : type TenantManagerError = GetActiveTimelineError;
465 : type TenantManager = TenantManagerWrapper;
466 : type Timeline = TenantManagerCacheItem;
467 : }
468 :
469 : pub(crate) struct TenantManagerCacheItem {
470 : pub(crate) timeline: Arc<Timeline>,
471 : // allow() for cheap propagation through RequestContext inside a task
472 : #[allow(clippy::redundant_allocation)]
473 : pub(crate) metrics: Arc<Arc<TimelineMetrics>>,
474 : #[allow(dead_code)] // we store it to keep the gate open
475 : pub(crate) gate_guard: GateGuard,
476 : }
477 :
478 : impl std::ops::Deref for TenantManagerCacheItem {
479 : type Target = Arc<Timeline>;
480 0 : fn deref(&self) -> &Self::Target {
481 0 : &self.timeline
482 0 : }
483 : }
484 :
485 : impl timeline::handle::Timeline<TenantManagerTypes> for TenantManagerCacheItem {
486 0 : fn shard_timeline_id(&self) -> timeline::handle::ShardTimelineId {
487 0 : Timeline::shard_timeline_id(&self.timeline)
488 0 : }
489 :
490 0 : fn per_timeline_state(&self) -> &timeline::handle::PerTimelineState<TenantManagerTypes> {
491 0 : &self.timeline.handles
492 0 : }
493 :
494 0 : fn get_shard_identity(&self) -> &pageserver_api::shard::ShardIdentity {
495 0 : Timeline::get_shard_identity(&self.timeline)
496 0 : }
497 : }
498 :
499 : impl timeline::handle::TenantManager<TenantManagerTypes> for TenantManagerWrapper {
500 0 : async fn resolve(
501 0 : &self,
502 0 : timeline_id: TimelineId,
503 0 : shard_selector: ShardSelector,
504 0 : ) -> Result<TenantManagerCacheItem, GetActiveTimelineError> {
505 0 : let tenant_id = self.tenant_id.get().expect("we set this in get()");
506 0 : let timeout = ACTIVE_TENANT_TIMEOUT;
507 0 : let wait_start = Instant::now();
508 0 : let deadline = wait_start + timeout;
509 0 : let tenant_shard = loop {
510 0 : let resolved = self
511 0 : .tenant_manager
512 0 : .resolve_attached_shard(tenant_id, shard_selector);
513 0 : match resolved {
514 0 : ShardResolveResult::Found(tenant_shard) => break tenant_shard,
515 : ShardResolveResult::NotFound => {
516 0 : return Err(GetActiveTimelineError::Tenant(
517 0 : GetActiveTenantError::NotFound(GetTenantError::NotFound(*tenant_id)),
518 0 : ));
519 : }
520 0 : ShardResolveResult::InProgress(barrier) => {
521 0 : // We can't authoritatively answer right now: wait for InProgress state
522 0 : // to end, then try again
523 0 : tokio::select! {
524 0 : _ = barrier.wait() => {
525 0 : // The barrier completed: proceed around the loop to try looking up again
526 0 : },
527 0 : _ = tokio::time::sleep(deadline.duration_since(Instant::now())) => {
528 0 : return Err(GetActiveTimelineError::Tenant(GetActiveTenantError::WaitForActiveTimeout {
529 0 : latest_state: None,
530 0 : wait_time: timeout,
531 0 : }));
532 : }
533 : }
534 : }
535 : };
536 : };
537 :
538 0 : tracing::debug!("Waiting for tenant to enter active state...");
539 0 : tenant_shard
540 0 : .wait_to_become_active(deadline.duration_since(Instant::now()))
541 0 : .await
542 0 : .map_err(GetActiveTimelineError::Tenant)?;
543 :
544 0 : let timeline = tenant_shard
545 0 : .get_timeline(timeline_id, true)
546 0 : .map_err(GetActiveTimelineError::Timeline)?;
547 :
548 0 : let gate_guard = match timeline.gate.enter() {
549 0 : Ok(guard) => guard,
550 : Err(_) => {
551 0 : return Err(GetActiveTimelineError::Timeline(
552 0 : GetTimelineError::ShuttingDown,
553 0 : ));
554 : }
555 : };
556 :
557 0 : let metrics = Arc::new(Arc::clone(&timeline.metrics));
558 0 :
559 0 : Ok(TenantManagerCacheItem {
560 0 : timeline,
561 0 : metrics,
562 0 : gate_guard,
563 0 : })
564 0 : }
565 : }
566 :
567 : #[derive(thiserror::Error, Debug)]
568 : enum PageStreamError {
569 : /// We encountered an error that should prompt the client to reconnect:
570 : /// in practice this means we drop the connection without sending a response.
571 : #[error("Reconnect required: {0}")]
572 : Reconnect(Cow<'static, str>),
573 :
574 : /// We were instructed to shutdown while processing the query
575 : #[error("Shutting down")]
576 : Shutdown,
577 :
578 : /// Something went wrong reading a page: this likely indicates a pageserver bug
579 : #[error("Read error")]
580 : Read(#[source] PageReconstructError),
581 :
582 : /// Ran out of time waiting for an LSN
583 : #[error("LSN timeout: {0}")]
584 : LsnTimeout(WaitLsnError),
585 :
586 : /// The entity required to serve the request (tenant or timeline) is not found,
587 : /// or is not found in a suitable state to serve a request.
588 : #[error("Not found: {0}")]
589 : NotFound(Cow<'static, str>),
590 :
591 : /// Request asked for something that doesn't make sense, like an invalid LSN
592 : #[error("Bad request: {0}")]
593 : BadRequest(Cow<'static, str>),
594 : }
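// How these variants map to protocol behavior (see
// `pagestream_handle_batched_message` below): `Shutdown` and `Reconnect` end the
// pagestream sub-protocol (the connection is dropped or the client is asked to
// reconnect), while `Read`, `LsnTimeout`, `NotFound`, and `BadRequest` are sent back
// to the client as a `PagestreamErrorResponse` and the protocol continues.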
595 :
596 : impl From<PageReconstructError> for PageStreamError {
597 0 : fn from(value: PageReconstructError) -> Self {
598 0 : match value {
599 0 : PageReconstructError::Cancelled => Self::Shutdown,
600 0 : e => Self::Read(e),
601 : }
602 0 : }
603 : }
604 :
605 : impl From<GetActiveTimelineError> for PageStreamError {
606 0 : fn from(value: GetActiveTimelineError) -> Self {
607 0 : match value {
608 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled)
609 : | GetActiveTimelineError::Tenant(GetActiveTenantError::WillNotBecomeActive(
610 : TenantState::Stopping { .. },
611 : ))
612 0 : | GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown) => Self::Shutdown,
613 0 : GetActiveTimelineError::Tenant(e) => Self::NotFound(format!("{e}").into()),
614 0 : GetActiveTimelineError::Timeline(e) => Self::NotFound(format!("{e}").into()),
615 : }
616 0 : }
617 : }
618 :
619 : impl From<WaitLsnError> for PageStreamError {
620 0 : fn from(value: WaitLsnError) -> Self {
621 0 : match value {
622 0 : e @ WaitLsnError::Timeout(_) => Self::LsnTimeout(e),
623 0 : WaitLsnError::Shutdown => Self::Shutdown,
624 0 : e @ WaitLsnError::BadState { .. } => Self::Reconnect(format!("{e}").into()),
625 : }
626 0 : }
627 : }
628 :
629 : impl From<WaitLsnError> for QueryError {
630 0 : fn from(value: WaitLsnError) -> Self {
631 0 : match value {
632 0 : e @ WaitLsnError::Timeout(_) => Self::Other(anyhow::Error::new(e)),
633 0 : WaitLsnError::Shutdown => Self::Shutdown,
634 0 : WaitLsnError::BadState { .. } => Self::Reconnect,
635 : }
636 0 : }
637 : }
638 :
639 : #[derive(thiserror::Error, Debug)]
640 : struct BatchedPageStreamError {
641 : req: PagestreamRequest,
642 : err: PageStreamError,
643 : }
644 :
645 : impl std::fmt::Display for BatchedPageStreamError {
646 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
647 0 : self.err.fmt(f)
648 0 : }
649 : }
650 :
651 : struct BatchedGetPageRequest {
652 : req: PagestreamGetPageRequest,
653 : timer: SmgrOpTimer,
654 : lsn_range: LsnRange,
655 : ctx: RequestContext,
656 : }
657 :
658 : #[cfg(feature = "testing")]
659 : struct BatchedTestRequest {
660 : req: models::PagestreamTestRequest,
661 : timer: SmgrOpTimer,
662 : }
663 :
664 : /// NB: we only hold [`timeline::handle::WeakHandle`] inside this enum,
665 : /// so that we don't keep the [`Timeline::gate`] open while the batch
666 : /// is being built up inside the [`spsc_fold`] (pagestream pipelining).
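/// The weak handle is upgraded again only when the batch is executed; see
/// `upgrade_handle_and_set_context!` in
/// [`PageServerHandler::pagestream_dispatch_batched_message`].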
667 : #[derive(IntoStaticStr)]
668 : enum BatchedFeMessage {
669 : Exists {
670 : span: Span,
671 : timer: SmgrOpTimer,
672 : shard: timeline::handle::WeakHandle<TenantManagerTypes>,
673 : req: models::PagestreamExistsRequest,
674 : },
675 : Nblocks {
676 : span: Span,
677 : timer: SmgrOpTimer,
678 : shard: timeline::handle::WeakHandle<TenantManagerTypes>,
679 : req: models::PagestreamNblocksRequest,
680 : },
681 : GetPage {
682 : span: Span,
683 : shard: timeline::handle::WeakHandle<TenantManagerTypes>,
684 : pages: smallvec::SmallVec<[BatchedGetPageRequest; 1]>,
685 : batch_break_reason: GetPageBatchBreakReason,
686 : },
687 : DbSize {
688 : span: Span,
689 : timer: SmgrOpTimer,
690 : shard: timeline::handle::WeakHandle<TenantManagerTypes>,
691 : req: models::PagestreamDbSizeRequest,
692 : },
693 : GetSlruSegment {
694 : span: Span,
695 : timer: SmgrOpTimer,
696 : shard: timeline::handle::WeakHandle<TenantManagerTypes>,
697 : req: models::PagestreamGetSlruSegmentRequest,
698 : },
699 : #[cfg(feature = "testing")]
700 : Test {
701 : span: Span,
702 : shard: timeline::handle::WeakHandle<TenantManagerTypes>,
703 : requests: Vec<BatchedTestRequest>,
704 : },
705 : RespondError {
706 : span: Span,
707 : error: BatchedPageStreamError,
708 : },
709 : }
710 :
711 : impl BatchedFeMessage {
712 0 : fn as_static_str(&self) -> &'static str {
713 0 : self.into()
714 0 : }
715 :
716 0 : fn observe_execution_start(&mut self, at: Instant) {
717 0 : match self {
718 0 : BatchedFeMessage::Exists { timer, .. }
719 0 : | BatchedFeMessage::Nblocks { timer, .. }
720 0 : | BatchedFeMessage::DbSize { timer, .. }
721 0 : | BatchedFeMessage::GetSlruSegment { timer, .. } => {
722 0 : timer.observe_execution_start(at);
723 0 : }
724 0 : BatchedFeMessage::GetPage { pages, .. } => {
725 0 : for page in pages {
726 0 : page.timer.observe_execution_start(at);
727 0 : }
728 : }
729 : #[cfg(feature = "testing")]
730 0 : BatchedFeMessage::Test { requests, .. } => {
731 0 : for req in requests {
732 0 : req.timer.observe_execution_start(at);
733 0 : }
734 : }
735 0 : BatchedFeMessage::RespondError { .. } => {}
736 : }
737 0 : }
738 :
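/// Decides whether `other` can be appended to the batch accumulated in `self`,
/// returning the [`GetPageBatchBreakReason`] if not. A hedged illustration of the
/// LSN rules below (LSN values made up):
///
/// * `UniformLsn`: a GetPage at effective LSN 0/10 followed by one at 0/20 on the
///   same shard breaks the batch (`NonUniformLsn`).
/// * `ScatteredLsn`: those two requests may batch together; only requesting the
///   *same* rel/blkno at two different LSNs breaks the batch
///   (`SamePageAtDifferentLsn`).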
739 0 : fn should_break_batch(
740 0 : &self,
741 0 : other: &BatchedFeMessage,
742 0 : max_batch_size: NonZeroUsize,
743 0 : batching_strategy: PageServiceProtocolPipelinedBatchingStrategy,
744 0 : ) -> Option<GetPageBatchBreakReason> {
745 0 : match (self, other) {
746 : (
747 : BatchedFeMessage::GetPage {
748 0 : shard: accum_shard,
749 0 : pages: accum_pages,
750 0 : ..
751 0 : },
752 0 : BatchedFeMessage::GetPage {
753 0 : shard: this_shard,
754 0 : pages: this_pages,
755 0 : ..
756 0 : },
757 0 : ) => {
758 0 : assert_eq!(this_pages.len(), 1);
759 0 : if accum_pages.len() >= max_batch_size.get() {
760 0 : trace!(%max_batch_size, "stopping batching because of batch size");
761 0 : assert_eq!(accum_pages.len(), max_batch_size.get());
762 :
763 0 : return Some(GetPageBatchBreakReason::BatchFull);
764 0 : }
765 0 : if !accum_shard.is_same_handle_as(this_shard) {
766 0 : trace!("stopping batching because timeline object mismatch");
767 : // TODO: we _could_ batch & execute each shard separately (and in parallel).
768 : // But the current logic for keeping responses in order does not support that.
769 :
770 0 : return Some(GetPageBatchBreakReason::NonUniformTimeline);
771 0 : }
772 0 :
773 0 : match batching_strategy {
774 : PageServiceProtocolPipelinedBatchingStrategy::UniformLsn => {
775 0 : if let Some(last_in_batch) = accum_pages.last() {
776 0 : if last_in_batch.lsn_range.effective_lsn
777 0 : != this_pages[0].lsn_range.effective_lsn
778 : {
779 0 : trace!(
780 : accum_lsn = %last_in_batch.lsn_range.effective_lsn,
781 0 : this_lsn = %this_pages[0].lsn_range.effective_lsn,
782 0 : "stopping batching because LSN changed"
783 : );
784 :
785 0 : return Some(GetPageBatchBreakReason::NonUniformLsn);
786 0 : }
787 0 : }
788 : }
789 : PageServiceProtocolPipelinedBatchingStrategy::ScatteredLsn => {
790 : // The read path doesn't currently support serving the same page at different LSNs.
791 : // While technically possible, it's uncertain if the complexity is worth it.
792 : // Break the batch if such a case is encountered.
793 0 : let same_page_different_lsn = accum_pages.iter().any(|batched| {
794 0 : batched.req.rel == this_pages[0].req.rel
795 0 : && batched.req.blkno == this_pages[0].req.blkno
796 0 : && batched.lsn_range.effective_lsn
797 0 : != this_pages[0].lsn_range.effective_lsn
798 0 : });
799 0 :
800 0 : if same_page_different_lsn {
801 0 : trace!(
802 0 : rel=%this_pages[0].req.rel,
803 0 : blkno=%this_pages[0].req.blkno,
804 0 : lsn=%this_pages[0].lsn_range.effective_lsn,
805 0 : "stopping batching because same page was requested at different LSNs"
806 : );
807 :
808 0 : return Some(GetPageBatchBreakReason::SamePageAtDifferentLsn);
809 0 : }
810 : }
811 : }
812 :
813 0 : None
814 : }
815 : #[cfg(feature = "testing")]
816 : (
817 : BatchedFeMessage::Test {
818 0 : shard: accum_shard,
819 0 : requests: accum_requests,
820 0 : ..
821 0 : },
822 0 : BatchedFeMessage::Test {
823 0 : shard: this_shard,
824 0 : requests: this_requests,
825 0 : ..
826 0 : },
827 0 : ) => {
828 0 : assert!(this_requests.len() == 1);
829 0 : if accum_requests.len() >= max_batch_size.get() {
830 0 : trace!(%max_batch_size, "stopping batching because of batch size");
831 0 : assert_eq!(accum_requests.len(), max_batch_size.get());
832 0 : return Some(GetPageBatchBreakReason::BatchFull);
833 0 : }
834 0 : if !accum_shard.is_same_handle_as(this_shard) {
835 0 : trace!("stopping batching because timeline object mismatch");
836 : // TODO: we _could_ batch & execute each shard separately (and in parallel).
837 : // But the current logic for keeping responses in order does not support that.
838 0 : return Some(GetPageBatchBreakReason::NonUniformTimeline);
839 0 : }
840 0 : let this_batch_key = this_requests[0].req.batch_key;
841 0 : let accum_batch_key = accum_requests[0].req.batch_key;
842 0 : if this_requests[0].req.batch_key != accum_requests[0].req.batch_key {
843 0 : trace!(%accum_batch_key, %this_batch_key, "stopping batching because batch key changed");
844 0 : return Some(GetPageBatchBreakReason::NonUniformKey);
845 0 : }
846 0 : None
847 : }
848 0 : (_, _) => Some(GetPageBatchBreakReason::NonBatchableRequest),
849 : }
850 0 : }
851 : }
852 :
853 : impl PageServerHandler {
854 : #[allow(clippy::too_many_arguments)]
855 0 : pub fn new(
856 0 : tenant_manager: Arc<TenantManager>,
857 0 : auth: Option<Arc<SwappableJwtAuth>>,
858 0 : pipelining_config: PageServicePipeliningConfig,
859 0 : get_vectored_concurrent_io: GetVectoredConcurrentIo,
860 0 : perf_span_fields: ConnectionPerfSpanFields,
861 0 : basebackup_cache: Arc<BasebackupCache>,
862 0 : connection_ctx: RequestContext,
863 0 : cancel: CancellationToken,
864 0 : gate_guard: GateGuard,
865 0 : ) -> Self {
866 0 : PageServerHandler {
867 0 : auth,
868 0 : claims: None,
869 0 : connection_ctx,
870 0 : perf_span_fields,
871 0 : timeline_handles: Some(TimelineHandles::new(tenant_manager)),
872 0 : cancel,
873 0 : pipelining_config,
874 0 : get_vectored_concurrent_io,
875 0 : basebackup_cache,
876 0 : gate_guard,
877 0 : }
878 0 : }
879 :
880 : /// This function always respects cancellation of any timeline in `[Self::shard_timelines]`. Pass in
881 : /// a cancellation token at the next scope up (such as a tenant cancellation token) to ensure we respect
882 : /// cancellation if there aren't any timelines in the cache.
883 : ///
884 : /// If calling from a function that doesn't use the `[Self::shard_timelines]` cache, then pass in the
885 : /// timeline cancellation token.
886 0 : async fn flush_cancellable<IO>(
887 0 : &self,
888 0 : pgb: &mut PostgresBackend<IO>,
889 0 : cancel: &CancellationToken,
890 0 : ) -> Result<(), QueryError>
891 0 : where
892 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
893 0 : {
894 0 : tokio::select!(
895 0 : flush_r = pgb.flush() => {
896 0 : Ok(flush_r?)
897 : },
898 0 : _ = cancel.cancelled() => {
899 0 : Err(QueryError::Shutdown)
900 : }
901 : )
902 0 : }
903 :
904 : #[allow(clippy::too_many_arguments)]
905 0 : async fn pagestream_read_message<IO>(
906 0 : pgb: &mut PostgresBackendReader<IO>,
907 0 : tenant_id: TenantId,
908 0 : timeline_id: TimelineId,
909 0 : timeline_handles: &mut TimelineHandles,
910 0 : conn_perf_span_fields: &ConnectionPerfSpanFields,
911 0 : cancel: &CancellationToken,
912 0 : ctx: &RequestContext,
913 0 : protocol_version: PagestreamProtocolVersion,
914 0 : parent_span: Span,
915 0 : ) -> Result<Option<BatchedFeMessage>, QueryError>
916 0 : where
917 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
918 0 : {
919 0 : let msg = tokio::select! {
920 : biased;
921 0 : _ = cancel.cancelled() => {
922 0 : return Err(QueryError::Shutdown)
923 : }
924 0 : msg = pgb.read_message() => { msg }
925 0 : };
926 0 :
927 0 : let received_at = Instant::now();
928 :
929 0 : let copy_data_bytes = match msg? {
930 0 : Some(FeMessage::CopyData(bytes)) => bytes,
931 : Some(FeMessage::Terminate) => {
932 0 : return Ok(None);
933 : }
934 0 : Some(m) => {
935 0 : return Err(QueryError::Other(anyhow::anyhow!(
936 0 : "unexpected message: {m:?} during COPY"
937 0 : )));
938 : }
939 : None => {
940 0 : return Ok(None);
941 : } // client disconnected
942 : };
943 0 : trace!("query: {copy_data_bytes:?}");
944 :
945 0 : fail::fail_point!("ps::handle-pagerequest-message");
946 :
947 : // parse request
948 0 : let neon_fe_msg =
949 0 : PagestreamFeMessage::parse(&mut copy_data_bytes.reader(), protocol_version)?;
950 :
951 : // TODO: turn into async closure once available to avoid repeating received_at
952 0 : async fn record_op_start_and_throttle(
953 0 : shard: &timeline::handle::Handle<TenantManagerTypes>,
954 0 : op: metrics::SmgrQueryType,
955 0 : received_at: Instant,
956 0 : ) -> Result<SmgrOpTimer, QueryError> {
957 0 : // It's important to start the smgr op metric recorder as early as possible
958 0 : // so that the _started counters are incremented before we do
959 0 : // any serious waiting, e.g., for throttle, batching, or actual request handling.
960 0 : let mut timer = shard.query_metrics.start_smgr_op(op, received_at);
961 0 : let now = Instant::now();
962 0 : timer.observe_throttle_start(now);
963 0 : let throttled = tokio::select! {
964 0 : res = shard.pagestream_throttle.throttle(1, now) => res,
965 0 : _ = shard.cancel.cancelled() => return Err(QueryError::Shutdown),
966 : };
967 0 : timer.observe_throttle_done(throttled);
968 0 : Ok(timer)
969 0 : }
970 :
971 0 : let batched_msg = match neon_fe_msg {
972 0 : PagestreamFeMessage::Exists(req) => {
973 0 : let shard = timeline_handles
974 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
975 0 : .await?;
976 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
977 0 : let span = tracing::info_span!(parent: &parent_span, "handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
978 0 : let timer = record_op_start_and_throttle(
979 0 : &shard,
980 0 : metrics::SmgrQueryType::GetRelExists,
981 0 : received_at,
982 0 : )
983 0 : .await?;
984 0 : BatchedFeMessage::Exists {
985 0 : span,
986 0 : timer,
987 0 : shard: shard.downgrade(),
988 0 : req,
989 0 : }
990 : }
991 0 : PagestreamFeMessage::Nblocks(req) => {
992 0 : let shard = timeline_handles
993 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
994 0 : .await?;
995 0 : let span = tracing::info_span!(parent: &parent_span, "handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
996 0 : let timer = record_op_start_and_throttle(
997 0 : &shard,
998 0 : metrics::SmgrQueryType::GetRelSize,
999 0 : received_at,
1000 0 : )
1001 0 : .await?;
1002 0 : BatchedFeMessage::Nblocks {
1003 0 : span,
1004 0 : timer,
1005 0 : shard: shard.downgrade(),
1006 0 : req,
1007 0 : }
1008 : }
1009 0 : PagestreamFeMessage::DbSize(req) => {
1010 0 : let shard = timeline_handles
1011 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1012 0 : .await?;
1013 0 : let span = tracing::info_span!(parent: &parent_span, "handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
1014 0 : let timer = record_op_start_and_throttle(
1015 0 : &shard,
1016 0 : metrics::SmgrQueryType::GetDbSize,
1017 0 : received_at,
1018 0 : )
1019 0 : .await?;
1020 0 : BatchedFeMessage::DbSize {
1021 0 : span,
1022 0 : timer,
1023 0 : shard: shard.downgrade(),
1024 0 : req,
1025 0 : }
1026 : }
1027 0 : PagestreamFeMessage::GetSlruSegment(req) => {
1028 0 : let shard = timeline_handles
1029 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1030 0 : .await?;
1031 0 : let span = tracing::info_span!(parent: &parent_span, "handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
1032 0 : let timer = record_op_start_and_throttle(
1033 0 : &shard,
1034 0 : metrics::SmgrQueryType::GetSlruSegment,
1035 0 : received_at,
1036 0 : )
1037 0 : .await?;
1038 0 : BatchedFeMessage::GetSlruSegment {
1039 0 : span,
1040 0 : timer,
1041 0 : shard: shard.downgrade(),
1042 0 : req,
1043 0 : }
1044 : }
1045 0 : PagestreamFeMessage::GetPage(req) => {
1046 : // avoid a somewhat costly Span::record() by constructing the entire span in one go.
1047 : macro_rules! mkspan {
1048 : (before shard routing) => {{
1049 : tracing::info_span!(
1050 : parent: &parent_span,
1051 : "handle_get_page_request",
1052 : request_id = %req.hdr.reqid,
1053 : rel = %req.rel,
1054 : blkno = %req.blkno,
1055 : req_lsn = %req.hdr.request_lsn,
1056 : not_modified_since_lsn = %req.hdr.not_modified_since,
1057 : )
1058 : }};
1059 : ($shard_id:expr) => {{
1060 : tracing::info_span!(
1061 : parent: &parent_span,
1062 : "handle_get_page_request",
1063 : request_id = %req.hdr.reqid,
1064 : rel = %req.rel,
1065 : blkno = %req.blkno,
1066 : req_lsn = %req.hdr.request_lsn,
1067 : not_modified_since_lsn = %req.hdr.not_modified_since,
1068 : shard_id = %$shard_id,
1069 : )
1070 : }};
1071 : }
1072 :
1073 : macro_rules! respond_error {
1074 : ($span:expr, $error:expr) => {{
1075 : let error = BatchedFeMessage::RespondError {
1076 : span: $span,
1077 : error: BatchedPageStreamError {
1078 : req: req.hdr,
1079 : err: $error,
1080 : },
1081 : };
1082 : Ok(Some(error))
1083 : }};
1084 : }
1085 :
1086 0 : let key = rel_block_to_key(req.rel, req.blkno);
1087 :
1088 0 : let res = timeline_handles
1089 0 : .get(tenant_id, timeline_id, ShardSelector::Page(key))
1090 0 : .await;
1091 :
1092 0 : let shard = match res {
1093 0 : Ok(tl) => tl,
1094 0 : Err(e) => {
1095 0 : let span = mkspan!(before shard routing);
1096 0 : match e {
1097 : GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_)) => {
1098 : // We already know this tenant exists in general, because we resolved it at
1099 : // start of connection. Getting a NotFound here indicates that the shard containing
1100 : // the requested page is not present on this node: the client's knowledge of shard->pageserver
1101 : // mapping is out of date.
1102 : //
1103 : // Closing the connection by returning `PageStreamError::Reconnect` has the side effect of rate-limiting the above message, via
1104 : // the client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
1105 : // and talk to a different pageserver.
1106 0 : return respond_error!(
1107 0 : span,
1108 0 : PageStreamError::Reconnect(
1109 0 : "getpage@lsn request routed to wrong shard".into()
1110 0 : )
1111 0 : );
1112 : }
1113 0 : e => {
1114 0 : return respond_error!(span, e.into());
1115 : }
1116 : }
1117 : }
1118 : };
1119 :
1120 0 : let ctx = if shard.is_get_page_request_sampled() {
1121 0 : RequestContextBuilder::from(ctx)
1122 0 : .root_perf_span(|| {
1123 0 : info_span!(
1124 : target: PERF_TRACE_TARGET,
1125 : "GET_PAGE",
1126 : peer_addr = conn_perf_span_fields.peer_addr,
1127 : application_name = conn_perf_span_fields.application_name,
1128 : compute_mode = conn_perf_span_fields.compute_mode,
1129 : tenant_id = %tenant_id,
1130 0 : shard_id = %shard.get_shard_identity().shard_slug(),
1131 : timeline_id = %timeline_id,
1132 : lsn = %req.hdr.request_lsn,
1133 : not_modified_since_lsn = %req.hdr.not_modified_since,
1134 : request_id = %req.hdr.reqid,
1135 : key = %key,
1136 : )
1137 0 : })
1138 0 : .attached_child()
1139 : } else {
1140 0 : ctx.attached_child()
1141 : };
1142 :
1143 : // This ctx travels as part of the BatchedFeMessage through
1144 : // batching into the request handler.
1145 : // The request handler needs to do some per-request work
1146 : // (relsize check) before dispatching the batch as a single
1147 : // get_vectored call to the Timeline.
1148 : // This ctx will be used for the reslize check, whereas the
1149 : // get_vectored call will be a different ctx with separate
1150 : // perf span.
1151 0 : let ctx = ctx.with_scope_page_service_pagestream(&shard);
1152 :
1153 : // Similar game for this `span`: we funnel it through so that
1154 : // request handler log messages contain the request-specific fields.
1155 0 : let span = mkspan!(shard.tenant_shard_id.shard_slug());
1156 :
1157 0 : let timer = record_op_start_and_throttle(
1158 0 : &shard,
1159 0 : metrics::SmgrQueryType::GetPageAtLsn,
1160 0 : received_at,
1161 0 : )
1162 0 : .maybe_perf_instrument(&ctx, |current_perf_span| {
1163 0 : info_span!(
1164 : target: PERF_TRACE_TARGET,
1165 0 : parent: current_perf_span,
1166 : "THROTTLE",
1167 : )
1168 0 : })
1169 0 : .await?;
1170 :
1171 : // We're holding the Handle
1172 0 : let effective_lsn = match Self::effective_request_lsn(
1173 0 : &shard,
1174 0 : shard.get_last_record_lsn(),
1175 0 : req.hdr.request_lsn,
1176 0 : req.hdr.not_modified_since,
1177 0 : &shard.get_applied_gc_cutoff_lsn(),
1178 0 : ) {
1179 0 : Ok(lsn) => lsn,
1180 0 : Err(e) => {
1181 0 : return respond_error!(span, e);
1182 : }
1183 : };
1184 :
1185 : BatchedFeMessage::GetPage {
1186 0 : span,
1187 0 : shard: shard.downgrade(),
1188 0 : pages: smallvec::smallvec![BatchedGetPageRequest {
1189 0 : req,
1190 0 : timer,
1191 0 : lsn_range: LsnRange {
1192 0 : effective_lsn,
1193 0 : request_lsn: req.hdr.request_lsn
1194 0 : },
1195 0 : ctx,
1196 0 : }],
1197 : // The executor grabs the batch when it becomes idle.
1198 : // Hence, [`GetPageBatchBreakReason::ExecutorSteal`] is the
1199 : // default reason for breaking the batch.
1200 0 : batch_break_reason: GetPageBatchBreakReason::ExecutorSteal,
1201 : }
1202 : }
1203 : #[cfg(feature = "testing")]
1204 0 : PagestreamFeMessage::Test(req) => {
1205 0 : let shard = timeline_handles
1206 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1207 0 : .await?;
1208 0 : let span = tracing::info_span!(parent: &parent_span, "handle_test_request", shard_id = %shard.tenant_shard_id.shard_slug());
1209 0 : let timer =
1210 0 : record_op_start_and_throttle(&shard, metrics::SmgrQueryType::Test, received_at)
1211 0 : .await?;
1212 0 : BatchedFeMessage::Test {
1213 0 : span,
1214 0 : shard: shard.downgrade(),
1215 0 : requests: vec![BatchedTestRequest { req, timer }],
1216 0 : }
1217 : }
1218 : };
1219 0 : Ok(Some(batched_msg))
1220 0 : }
1221 :
1222 : /// Post-condition: `batch` is Some()
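/// Return convention (hedged summary of the body below): `Ok(())` means `this_msg`
/// was folded into `batch`; `Err(next)` means the current `batch` must be flushed
/// first, and `next` is the message (or error) that should seed the next batch.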
1223 : #[instrument(skip_all, level = tracing::Level::TRACE)]
1224 : #[allow(clippy::boxed_local)]
1225 : fn pagestream_do_batch(
1226 : batching_strategy: PageServiceProtocolPipelinedBatchingStrategy,
1227 : max_batch_size: NonZeroUsize,
1228 : batch: &mut Result<BatchedFeMessage, QueryError>,
1229 : this_msg: Result<BatchedFeMessage, QueryError>,
1230 : ) -> Result<(), Result<BatchedFeMessage, QueryError>> {
1231 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1232 :
1233 : let this_msg = match this_msg {
1234 : Ok(this_msg) => this_msg,
1235 : Err(e) => return Err(Err(e)),
1236 : };
1237 :
1238 : let eligible_batch = match batch {
1239 : Ok(b) => b,
1240 : Err(_) => {
1241 : return Err(Ok(this_msg));
1242 : }
1243 : };
1244 :
1245 : let batch_break =
1246 : eligible_batch.should_break_batch(&this_msg, max_batch_size, batching_strategy);
1247 :
1248 : match batch_break {
1249 : Some(reason) => {
1250 : if let BatchedFeMessage::GetPage {
1251 : batch_break_reason, ..
1252 : } = eligible_batch
1253 : {
1254 : *batch_break_reason = reason;
1255 : }
1256 :
1257 : Err(Ok(this_msg))
1258 : }
1259 : None => {
1260 : // ok to batch
1261 : match (eligible_batch, this_msg) {
1262 : (
1263 : BatchedFeMessage::GetPage {
1264 : pages: accum_pages, ..
1265 : },
1266 : BatchedFeMessage::GetPage {
1267 : pages: this_pages, ..
1268 : },
1269 : ) => {
1270 : accum_pages.extend(this_pages);
1271 : Ok(())
1272 : }
1273 : #[cfg(feature = "testing")]
1274 : (
1275 : BatchedFeMessage::Test {
1276 : requests: accum_requests,
1277 : ..
1278 : },
1279 : BatchedFeMessage::Test {
1280 : requests: this_requests,
1281 : ..
1282 : },
1283 : ) => {
1284 : accum_requests.extend(this_requests);
1285 : Ok(())
1286 : }
1287 : // Shape guaranteed by [`BatchedFeMessage::should_break_batch`]
1288 : _ => unreachable!(),
1289 : }
1290 : }
1291 : }
1292 : }
1293 :
1294 0 : #[instrument(level = tracing::Level::DEBUG, skip_all)]
1295 : async fn pagestream_handle_batched_message<IO>(
1296 : &mut self,
1297 : pgb_writer: &mut PostgresBackend<IO>,
1298 : batch: BatchedFeMessage,
1299 : io_concurrency: IoConcurrency,
1300 : cancel: &CancellationToken,
1301 : protocol_version: PagestreamProtocolVersion,
1302 : ctx: &RequestContext,
1303 : ) -> Result<(), QueryError>
1304 : where
1305 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1306 : {
1307 : let started_at = Instant::now();
1308 : let batch = {
1309 : let mut batch = batch;
1310 : batch.observe_execution_start(started_at);
1311 : batch
1312 : };
1313 :
1314 : // Dispatch the batch to the appropriate request handler.
1315 : let log_slow_name = batch.as_static_str();
1316 : let (mut handler_results, span) = {
1317 : // TODO: we unfortunately have to pin the future on the heap, since GetPage futures are huge and
1318 : // won't fit on the stack.
1319 : let mut boxpinned =
1320 : Box::pin(self.pagestream_dispatch_batched_message(batch, io_concurrency, ctx));
1321 : log_slow(
1322 : log_slow_name,
1323 : LOG_SLOW_GETPAGE_THRESHOLD,
1324 : boxpinned.as_mut(),
1325 : )
1326 : .await?
1327 : };
1328 :
1329 : // We purposefully don't count flush time into the smgr operation timer.
1330 : //
1331 : // The reason is that the current compute client will not perform protocol processing
1332 : // if the postgres backend process is doing things other than `->smgr_read()`.
1333 : // This is especially the case for prefetch.
1334 : //
1335 : // If the compute doesn't read from the connection, eventually TCP will backpressure
1336 : // all the way into our flush call below.
1337 : //
1338 : // The timer's underlying metric is used for a storage-internal latency SLO and
1339 : // we don't want to include latency in it that we can't control.
1340 : // And as pointed out above, in this case, we don't control the time that flush will take.
1341 : //
1342 : // We put each response in the batch onto the wire in a separate pgb_writer.flush()
1343 : // call, which (all unmeasured) adds syscall overhead but reduces time to first byte
1344 : // and avoids building up a "giant" contiguous userspace buffer to hold the entire response.
1345 : // TODO: vectored socket IO would be great, but pgb_writer doesn't support that.
1346 : let flush_timers = {
1347 : let flushing_start_time = Instant::now();
1348 : let mut flush_timers = Vec::with_capacity(handler_results.len());
1349 : for handler_result in &mut handler_results {
1350 : let flush_timer = match handler_result {
1351 : Ok((_, timer)) => Some(
1352 : timer
1353 : .observe_execution_end(flushing_start_time)
1354 : .expect("we are the first caller"),
1355 : ),
1356 : Err(_) => {
1357 : // TODO: measure errors
1358 : None
1359 : }
1360 : };
1361 : flush_timers.push(flush_timer);
1362 : }
1363 : assert_eq!(flush_timers.len(), handler_results.len());
1364 : flush_timers
1365 : };
1366 :
1367 : // Map handler result to protocol behavior.
1368 : // Some handler errors cause exit from pagestream protocol.
1369 : // Other handler errors are sent back as an error message and we stay in pagestream protocol.
1370 : for (handler_result, flushing_timer) in handler_results.into_iter().zip(flush_timers) {
1371 : let response_msg = match handler_result {
1372 : Err(e) => match &e.err {
1373 : PageStreamError::Shutdown => {
1374 : // If we fail to fulfil a request during shutdown, which may be _because_ of
1375 : // shutdown, then do not send the error to the client. Instead just drop the
1376 : // connection.
1377 0 : span.in_scope(|| info!("dropping connection due to shutdown"));
1378 : return Err(QueryError::Shutdown);
1379 : }
1380 : PageStreamError::Reconnect(reason) => {
1381 0 : span.in_scope(|| info!("handler requested reconnect: {reason}"));
1382 : return Err(QueryError::Reconnect);
1383 : }
1384 : PageStreamError::Read(_)
1385 : | PageStreamError::LsnTimeout(_)
1386 : | PageStreamError::NotFound(_)
1387 : | PageStreamError::BadRequest(_) => {
1388 : // print all the details to the log with {:#}, but for the client the
1389 : // error message is enough. Do not log if shutting down, as the anyhow::Error
1390 : // here includes cancellation which is not an error.
1391 : let full = utils::error::report_compact_sources(&e.err);
1392 0 : span.in_scope(|| {
1393 0 : error!("error reading relation or page version: {full:#}")
1394 0 : });
1395 :
1396 : PagestreamBeMessage::Error(PagestreamErrorResponse {
1397 : req: e.req,
1398 : message: e.err.to_string(),
1399 : })
1400 : }
1401 : },
1402 : Ok((response_msg, _op_timer_already_observed)) => response_msg,
1403 : };
1404 :
1405 : //
1406 : // marshal & transmit response message
1407 : //
1408 :
1409 : pgb_writer.write_message_noflush(&BeMessage::CopyData(
1410 : &response_msg.serialize(protocol_version),
1411 : ))?;
1412 :
1413 : failpoint_support::sleep_millis_async!("before-pagestream-msg-flush", cancel);
1414 :
1415 : // what we want to do
1416 : let socket_fd = pgb_writer.socket_fd;
1417 : let flush_fut = pgb_writer.flush();
1418 : // metric for how long flushing takes
1419 : let flush_fut = match flushing_timer {
1420 : Some(flushing_timer) => futures::future::Either::Left(flushing_timer.measure(
1421 : Instant::now(),
1422 : flush_fut,
1423 : socket_fd,
1424 : )),
1425 : None => futures::future::Either::Right(flush_fut),
1426 : };
1427 : // do it while respecting cancellation
1428 0 : let _: () = async move {
1429 0 : tokio::select! {
1430 : biased;
1431 0 : _ = cancel.cancelled() => {
1432 : // We were requested to shut down.
1433 0 : info!("shutdown request received in page handler");
1434 0 : return Err(QueryError::Shutdown)
1435 : }
1436 0 : res = flush_fut => {
1437 0 : res?;
1438 : }
1439 : }
1440 0 : Ok(())
1441 0 : }
1442 : .await?;
1443 : }
1444 : Ok(())
1445 : }
1446 :
1447 : /// Helper which dispatches a batched message to the appropriate handler.
1448 : /// Returns a vec of results, along with the extracted trace span.
1449 0 : async fn pagestream_dispatch_batched_message(
1450 0 : &mut self,
1451 0 : batch: BatchedFeMessage,
1452 0 : io_concurrency: IoConcurrency,
1453 0 : ctx: &RequestContext,
1454 0 : ) -> Result<
1455 0 : (
1456 0 : Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>>,
1457 0 : Span,
1458 0 : ),
1459 0 : QueryError,
1460 0 : > {
1461 : macro_rules! upgrade_handle_and_set_context {
1462 : ($shard:ident) => {{
1463 : let weak_handle = &$shard;
1464 : let handle = weak_handle.upgrade()?;
1465 : let ctx = ctx.with_scope_page_service_pagestream(&handle);
1466 : (handle, ctx)
1467 : }};
1468 : }
1469 0 : Ok(match batch {
1470 : BatchedFeMessage::Exists {
1471 0 : span,
1472 0 : timer,
1473 0 : shard,
1474 0 : req,
1475 0 : } => {
1476 0 : fail::fail_point!("ps::handle-pagerequest-message::exists");
1477 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1478 : (
1479 0 : vec![
1480 0 : self.handle_get_rel_exists_request(&shard, &req, &ctx)
1481 0 : .instrument(span.clone())
1482 0 : .await
1483 0 : .map(|msg| (msg, timer))
1484 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
1485 0 : ],
1486 0 : span,
1487 : )
1488 : }
1489 : BatchedFeMessage::Nblocks {
1490 0 : span,
1491 0 : timer,
1492 0 : shard,
1493 0 : req,
1494 0 : } => {
1495 0 : fail::fail_point!("ps::handle-pagerequest-message::nblocks");
1496 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1497 : (
1498 0 : vec![
1499 0 : self.handle_get_nblocks_request(&shard, &req, &ctx)
1500 0 : .instrument(span.clone())
1501 0 : .await
1502 0 : .map(|msg| (msg, timer))
1503 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
1504 0 : ],
1505 0 : span,
1506 : )
1507 : }
1508 : BatchedFeMessage::GetPage {
1509 0 : span,
1510 0 : shard,
1511 0 : pages,
1512 0 : batch_break_reason,
1513 0 : } => {
1514 0 : fail::fail_point!("ps::handle-pagerequest-message::getpage");
1515 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1516 : (
1517 : {
1518 0 : let npages = pages.len();
1519 0 : trace!(npages, "handling getpage request");
1520 0 : let res = self
1521 0 : .handle_get_page_at_lsn_request_batched(
1522 0 : &shard,
1523 0 : pages,
1524 0 : io_concurrency,
1525 0 : batch_break_reason,
1526 0 : &ctx,
1527 0 : )
1528 0 : .instrument(span.clone())
1529 0 : .await;
1530 0 : assert_eq!(res.len(), npages);
1531 0 : res
1532 0 : },
1533 0 : span,
1534 : )
1535 : }
1536 : BatchedFeMessage::DbSize {
1537 0 : span,
1538 0 : timer,
1539 0 : shard,
1540 0 : req,
1541 0 : } => {
1542 0 : fail::fail_point!("ps::handle-pagerequest-message::dbsize");
1543 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1544 : (
1545 0 : vec![
1546 0 : self.handle_db_size_request(&shard, &req, &ctx)
1547 0 : .instrument(span.clone())
1548 0 : .await
1549 0 : .map(|msg| (msg, timer))
1550 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
1551 0 : ],
1552 0 : span,
1553 : )
1554 : }
1555 : BatchedFeMessage::GetSlruSegment {
1556 0 : span,
1557 0 : timer,
1558 0 : shard,
1559 0 : req,
1560 0 : } => {
1561 0 : fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
1562 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1563 : (
1564 0 : vec![
1565 0 : self.handle_get_slru_segment_request(&shard, &req, &ctx)
1566 0 : .instrument(span.clone())
1567 0 : .await
1568 0 : .map(|msg| (msg, timer))
1569 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
1570 0 : ],
1571 0 : span,
1572 : )
1573 : }
1574 : #[cfg(feature = "testing")]
1575 : BatchedFeMessage::Test {
1576 0 : span,
1577 0 : shard,
1578 0 : requests,
1579 0 : } => {
1580 0 : fail::fail_point!("ps::handle-pagerequest-message::test");
1581 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1582 : (
1583 : {
1584 0 : let npages = requests.len();
1585 0 : trace!(npages, "handling getpage request");
1586 0 : let res = self
1587 0 : .handle_test_request_batch(&shard, requests, &ctx)
1588 0 : .instrument(span.clone())
1589 0 : .await;
1590 0 : assert_eq!(res.len(), npages);
1591 0 : res
1592 0 : },
1593 0 : span,
1594 : )
1595 : }
1596 0 : BatchedFeMessage::RespondError { span, error } => {
1597 0 : // We've already decided to respond with an error, so we don't need to
1598 0 : // call the handler.
1599 0 : (vec![Err(error)], span)
1600 : }
1601 : })
1602 0 : }
1603 :
1604 : /// Pagestream sub-protocol handler.
1605 : ///
1606 : /// It is a simple request-response protocol inside a COPYBOTH session.
1607 : ///
1608 : /// # Coding Discipline
1609 : ///
1610 : /// Coding discipline within this function: all interaction with the `pgb` connection
1611 : /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
1612 : /// This is so that we can shutdown page_service quickly.
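/// # Message flow
///
/// As implemented below and in [`Self::pagestream_read_message`]: the connection is
/// switched into COPYBOTH mode, each request then arrives as a `CopyData` message
/// containing a [`PagestreamFeMessage`], and each response is written back as a
/// `CopyData` message containing a serialized [`PagestreamBeMessage`]. A `Terminate`
/// message or EOF from the client ends the sub-protocol.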
1613 : #[instrument(skip_all)]
1614 : async fn handle_pagerequests<IO>(
1615 : &mut self,
1616 : pgb: &mut PostgresBackend<IO>,
1617 : tenant_id: TenantId,
1618 : timeline_id: TimelineId,
1619 : protocol_version: PagestreamProtocolVersion,
1620 : ctx: RequestContext,
1621 : ) -> Result<(), QueryError>
1622 : where
1623 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
1624 : {
1625 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1626 :
1627 : // switch client to COPYBOTH
1628 : pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
1629 : tokio::select! {
1630 : biased;
1631 : _ = self.cancel.cancelled() => {
1632 : return Err(QueryError::Shutdown)
1633 : }
1634 : res = pgb.flush() => {
1635 : res?;
1636 : }
1637 : }
1638 :
1639 : let io_concurrency = IoConcurrency::spawn_from_conf(
1640 : self.get_vectored_concurrent_io,
1641 : match self.gate_guard.try_clone() {
1642 : Ok(guard) => guard,
1643 : Err(_) => {
1644 : info!("shutdown request received in page handler");
1645 : return Err(QueryError::Shutdown);
1646 : }
1647 : },
1648 : );
1649 :
1650 : let pgb_reader = pgb
1651 : .split()
1652 : .context("implementation error: split pgb into reader and writer")?;
1653 :
1654 : let timeline_handles = self
1655 : .timeline_handles
1656 : .take()
1657 : .expect("implementation error: timeline_handles should not be locked");
1658 :
1659 : let request_span = info_span!("request");
1660 : let ((pgb_reader, timeline_handles), result) = match self.pipelining_config.clone() {
1661 : PageServicePipeliningConfig::Pipelined(pipelining_config) => {
1662 : self.handle_pagerequests_pipelined(
1663 : pgb,
1664 : pgb_reader,
1665 : tenant_id,
1666 : timeline_id,
1667 : timeline_handles,
1668 : request_span,
1669 : pipelining_config,
1670 : protocol_version,
1671 : io_concurrency,
1672 : &ctx,
1673 : )
1674 : .await
1675 : }
1676 : PageServicePipeliningConfig::Serial => {
1677 : self.handle_pagerequests_serial(
1678 : pgb,
1679 : pgb_reader,
1680 : tenant_id,
1681 : timeline_id,
1682 : timeline_handles,
1683 : request_span,
1684 : protocol_version,
1685 : io_concurrency,
1686 : &ctx,
1687 : )
1688 : .await
1689 : }
1690 : };
1691 :
1692 : debug!("pagestream subprotocol shut down cleanly");
1693 :
1694 : pgb.unsplit(pgb_reader)
1695 : .context("implementation error: unsplit pgb")?;
1696 :
1697 : let replaced = self.timeline_handles.replace(timeline_handles);
1698 : assert!(replaced.is_none());
1699 :
1700 : result
1701 : }
1702 :
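     : /// Serial (non-pipelined) request handling: read one message off the wire,
     : /// handle it to completion, then read the next.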
1703 : #[allow(clippy::too_many_arguments)]
1704 0 : async fn handle_pagerequests_serial<IO>(
1705 0 : &mut self,
1706 0 : pgb_writer: &mut PostgresBackend<IO>,
1707 0 : mut pgb_reader: PostgresBackendReader<IO>,
1708 0 : tenant_id: TenantId,
1709 0 : timeline_id: TimelineId,
1710 0 : mut timeline_handles: TimelineHandles,
1711 0 : request_span: Span,
1712 0 : protocol_version: PagestreamProtocolVersion,
1713 0 : io_concurrency: IoConcurrency,
1714 0 : ctx: &RequestContext,
1715 0 : ) -> (
1716 0 : (PostgresBackendReader<IO>, TimelineHandles),
1717 0 : Result<(), QueryError>,
1718 0 : )
1719 0 : where
1720 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
1721 0 : {
1722 0 : let cancel = self.cancel.clone();
1723 :
1724 0 : let err = loop {
1725 0 : let msg = Self::pagestream_read_message(
1726 0 : &mut pgb_reader,
1727 0 : tenant_id,
1728 0 : timeline_id,
1729 0 : &mut timeline_handles,
1730 0 : &self.perf_span_fields,
1731 0 : &cancel,
1732 0 : ctx,
1733 0 : protocol_version,
1734 0 : request_span.clone(),
1735 0 : )
1736 0 : .await;
1737 0 : let msg = match msg {
1738 0 : Ok(msg) => msg,
1739 0 : Err(e) => break e,
1740 : };
1741 0 : let msg = match msg {
1742 0 : Some(msg) => msg,
1743 : None => {
1744 0 : debug!("pagestream subprotocol end observed");
1745 0 : return ((pgb_reader, timeline_handles), Ok(()));
1746 : }
1747 : };
1748 :
1749 0 : let result = self
1750 0 : .pagestream_handle_batched_message(
1751 0 : pgb_writer,
1752 0 : msg,
1753 0 : io_concurrency.clone(),
1754 0 : &cancel,
1755 0 : protocol_version,
1756 0 : ctx,
1757 0 : )
1758 0 : .await;
1759 0 : match result {
1760 0 : Ok(()) => {}
1761 0 : Err(e) => break e,
1762 : }
1763 : };
1764 0 : ((pgb_reader, timeline_handles), Err(err))
1765 0 : }
1766 :
1767 : /// # Cancel-Safety
1768 : ///
1769 : /// May leak tokio tasks if not polled to completion.
1770 : #[allow(clippy::too_many_arguments)]
1771 0 : async fn handle_pagerequests_pipelined<IO>(
1772 0 : &mut self,
1773 0 : pgb_writer: &mut PostgresBackend<IO>,
1774 0 : pgb_reader: PostgresBackendReader<IO>,
1775 0 : tenant_id: TenantId,
1776 0 : timeline_id: TimelineId,
1777 0 : mut timeline_handles: TimelineHandles,
1778 0 : request_span: Span,
1779 0 : pipelining_config: PageServicePipeliningConfigPipelined,
1780 0 : protocol_version: PagestreamProtocolVersion,
1781 0 : io_concurrency: IoConcurrency,
1782 0 : ctx: &RequestContext,
1783 0 : ) -> (
1784 0 : (PostgresBackendReader<IO>, TimelineHandles),
1785 0 : Result<(), QueryError>,
1786 0 : )
1787 0 : where
1788 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
1789 0 : {
1790 0 : //
1791 0 : // Pipelined pagestream handling consists of
1792 0 : // - a Batcher that reads requests off the wire
1793 0 : // and batches them if possible,
1794 0 : // - an Executor that processes the batched requests.
1795 0 : //
1796 0 : // The batch is built up inside an `spsc_fold` channel,
1797 0 : // shared between Batcher (Sender) and Executor (Receiver).
1798 0 : //
1799 0 : // The Batcher continuously folds client requests into the batch,
1800 0 : // while the Executor can at any time take out what's in the batch
1801 0 : // in order to process it.
1802 0 : // This means the next batch builds up while the Executor
1803 0 : // executes the last batch.
1804 0 : //
1805 0 : // CANCELLATION
1806 0 : //
1807 0 : // We run both Batcher and Executor futures to completion before
1808 0 : // returning from this function.
1809 0 : //
1810 0 : // If Executor exits first, it signals cancellation to the Batcher
1811 0 : // via a CancellationToken that is child of `self.cancel`.
1812 0 : // If Batcher exits first, it signals cancellation to the Executor
1813 0 : // by dropping the spsc_fold channel Sender.
1814 0 : //
1815 0 : // CLEAN SHUTDOWN
1816 0 : //
1817 0 : // Clean shutdown means that the client ends the COPYBOTH session.
1818 0 : // In response to such a client message, the Batcher exits.
1819 0 : // The Executor continues to run, draining the spsc_fold channel.
1820 0 : // Once drained, the spsc_fold recv will fail with a distinct error
1821 0 : // indicating that the sender disconnected.
1822 0 : // The Executor exits with Ok(()) in response to that error.
1823 0 : //
1824 0 : // Server-initiated shutdown is not a clean shutdown; instead it surfaces
1825 0 : // as Err(QueryError::Shutdown), which propagates through the regular
1826 0 : // error propagation path described below.
1827 0 : //
1828 0 : // ERROR PROPAGATION
1829 0 : //
1830 0 : // When the Batcher encounters an error, it sends the error as a value
1831 0 : // through the spsc_fold channel and exits afterwards.
1832 0 : // When the Executor observes such an error in the channel,
1833 0 : // it exits returning that error value.
1834 0 : //
1835 0 : // This design ensures that the Executor stage will still process
1836 0 : // the batch that was in flight when the Batcher encountered an error,
1837 0 : // thereby behaving identically to a serial implementation.
1838 0 :
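     : // Data flow, roughly (illustration only):
     : //
     : //   client --wire--> Batcher --spsc_fold--> Executor --> pgb_writer
     : //   (the Batcher folds incoming requests into the pending batch; the Executor
     : //   takes the batch out and executes it)
     : //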
1839 0 : let PageServicePipeliningConfigPipelined {
1840 0 : max_batch_size,
1841 0 : execution,
1842 0 : batching: batching_strategy,
1843 0 : } = pipelining_config;
1844 :
1845 : // Macro to _define_ a pipeline stage.
1846 : macro_rules! pipeline_stage {
1847 : ($name:literal, $cancel:expr, $make_fut:expr) => {{
1848 : let cancel: CancellationToken = $cancel;
1849 : let stage_fut = $make_fut(cancel.clone());
1850 0 : async move {
1851 0 : scopeguard::defer! {
1852 0 : debug!("exiting");
1853 0 : }
1854 0 : timed_after_cancellation(stage_fut, $name, Duration::from_millis(100), &cancel)
1855 0 : .await
1856 0 : }
1857 : .instrument(tracing::info_span!($name))
1858 : }};
1859 : }
1860 :
1861 : //
1862 : // Batcher
1863 : //
1864 :
1865 0 : let perf_span_fields = self.perf_span_fields.clone();
1866 0 :
1867 0 : let cancel_batcher = self.cancel.child_token();
1868 0 : let (mut batch_tx, mut batch_rx) = spsc_fold::channel();
1869 0 : let batcher = pipeline_stage!("batcher", cancel_batcher.clone(), move |cancel_batcher| {
1870 0 : let ctx = ctx.attached_child();
1871 0 : async move {
1872 0 : let mut pgb_reader = pgb_reader;
1873 0 : let mut exit = false;
1874 0 : while !exit {
1875 0 : let read_res = Self::pagestream_read_message(
1876 0 : &mut pgb_reader,
1877 0 : tenant_id,
1878 0 : timeline_id,
1879 0 : &mut timeline_handles,
1880 0 : &perf_span_fields,
1881 0 : &cancel_batcher,
1882 0 : &ctx,
1883 0 : protocol_version,
1884 0 : request_span.clone(),
1885 0 : )
1886 0 : .await;
1887 0 : let Some(read_res) = read_res.transpose() else {
1888 0 : debug!("client-initiated shutdown");
1889 0 : break;
1890 : };
1891 0 : exit |= read_res.is_err();
1892 0 : let could_send = batch_tx
1893 0 : .send(read_res, |batch, res| {
1894 0 : Self::pagestream_do_batch(batching_strategy, max_batch_size, batch, res)
1895 0 : })
1896 0 : .await;
1897 0 : exit |= could_send.is_err();
1898 : }
1899 0 : (pgb_reader, timeline_handles)
1900 0 : }
1901 0 : });
1902 :
1903 : //
1904 : // Executor
1905 : //
1906 :
1907 0 : let executor = pipeline_stage!("executor", self.cancel.clone(), move |cancel| {
1908 0 : let ctx = ctx.attached_child();
1909 0 : async move {
1910 0 : let _cancel_batcher = cancel_batcher.drop_guard();
1911 : loop {
1912 0 : let maybe_batch = batch_rx.recv().await;
1913 0 : let batch = match maybe_batch {
1914 0 : Ok(batch) => batch,
1915 : Err(spsc_fold::RecvError::SenderGone) => {
1916 0 : debug!("upstream gone");
1917 0 : return Ok(());
1918 : }
1919 : };
1920 0 : let batch = match batch {
1921 0 : Ok(batch) => batch,
1922 0 : Err(e) => {
1923 0 : return Err(e);
1924 : }
1925 : };
1926 0 : self.pagestream_handle_batched_message(
1927 0 : pgb_writer,
1928 0 : batch,
1929 0 : io_concurrency.clone(),
1930 0 : &cancel,
1931 0 : protocol_version,
1932 0 : &ctx,
1933 0 : )
1934 0 : .await?;
1935 : }
1936 0 : }
1937 0 : });
1938 :
1939 : //
1940 : // Execute the stages.
1941 : //
1942 :
1943 0 : match execution {
1944 : PageServiceProtocolPipelinedExecutionStrategy::ConcurrentFutures => {
1945 0 : tokio::join!(batcher, executor)
1946 : }
1947 : PageServiceProtocolPipelinedExecutionStrategy::Tasks => {
1948 : // These tasks are not tracked anywhere.
1949 0 : let read_messages_task = tokio::spawn(batcher);
1950 0 : let (read_messages_task_res, executor_res_) =
1951 0 : tokio::join!(read_messages_task, executor,);
1952 0 : (
1953 0 : read_messages_task_res.expect("propagated panic from read_messages"),
1954 0 : executor_res_,
1955 0 : )
1956 : }
1957 : }
1958 0 : }
1959 :
1960 : /// Helper function to handle the LSN from client request.
1961 : ///
1962 : /// Each GetPage (and Exists and Nblocks) request includes information about
1963 : /// which version of the page is being requested. The primary compute node
1964 : /// will always request the latest page version, by setting 'request_lsn' to
1965 : /// the last inserted or flushed WAL position, while a standby will request
1966 : /// a version at the LSN that it's currently caught up to.
1967 : ///
1968 : /// In either case, if the page server hasn't received the WAL up to the
1969 : /// requested LSN yet, we will wait for it to arrive. The return value is
1970 : /// the LSN that should be used to look up the page versions.
1971 : ///
1972 : /// In addition to the request LSN, each request carries another LSN,
1973 : /// 'not_modified_since', which is a hint to the pageserver that the client
1974 : /// knows that the page has not been modified between 'not_modified_since'
1975 : /// and the request LSN. This allows skipping the wait, as long as the WAL
1976 : /// up to 'not_modified_since' has arrived. If the client doesn't have any
1977 : /// information about when the page was modified, it will use
1978 : /// not_modified_since == request_lsn. If the client lies and sends a
1979 : /// not_modified_since that is too low, such that there are in fact later page versions, the
1980 : /// behavior is undefined: the pageserver may return any of the page versions
1981 : /// or an error.
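     : ///
     : /// For illustration (hypothetical LSN values), assume last_record_lsn = 0/180:
     : /// - request_lsn = 0/200, not_modified_since = 0/100: nothing to wait for; the
     : ///   effective LSN is min(last_record_lsn, request_lsn) = 0/180.
     : /// - request_lsn = 0/200, not_modified_since = 0/190: not_modified_since is ahead
     : ///   of last_record_lsn, so we wait for WAL up to 0/190 and use it as the
     : ///   effective LSN.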
1982 0 : async fn wait_or_get_last_lsn(
1983 0 : timeline: &Timeline,
1984 0 : request_lsn: Lsn,
1985 0 : not_modified_since: Lsn,
1986 0 : latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
1987 0 : ctx: &RequestContext,
1988 0 : ) -> Result<Lsn, PageStreamError> {
1989 0 : let last_record_lsn = timeline.get_last_record_lsn();
1990 0 : let effective_request_lsn = Self::effective_request_lsn(
1991 0 : timeline,
1992 0 : last_record_lsn,
1993 0 : request_lsn,
1994 0 : not_modified_since,
1995 0 : latest_gc_cutoff_lsn,
1996 0 : )?;
1997 :
1998 0 : if effective_request_lsn > last_record_lsn {
1999 0 : timeline
2000 0 : .wait_lsn(
2001 0 : not_modified_since,
2002 0 : crate::tenant::timeline::WaitLsnWaiter::PageService,
2003 0 : timeline::WaitLsnTimeout::Default,
2004 0 : ctx,
2005 0 : )
2006 0 : .await?;
2007 :
2008 : // Since we waited for 'effective_request_lsn' to arrive, that is now the last
2009 : // record LSN. (Or close enough for our purposes; the last-record LSN can
2010 : // advance immediately after we return anyway)
2011 0 : }
2012 :
2013 0 : Ok(effective_request_lsn)
2014 0 : }
2015 :
2016 0 : fn effective_request_lsn(
2017 0 : timeline: &Timeline,
2018 0 : last_record_lsn: Lsn,
2019 0 : request_lsn: Lsn,
2020 0 : not_modified_since: Lsn,
2021 0 : latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
2022 0 : ) -> Result<Lsn, PageStreamError> {
2023 0 : // Sanity check the request
2024 0 : if request_lsn < not_modified_since {
2025 0 : return Err(PageStreamError::BadRequest(
2026 0 : format!(
2027 0 : "invalid request with request LSN {} and not_modified_since {}",
2028 0 : request_lsn, not_modified_since,
2029 0 : )
2030 0 : .into(),
2031 0 : ));
2032 0 : }
2033 0 :
2034 0 : // Check explicitly for INVALID just to get a less scary error message if the request is obviously bogus
2035 0 : if request_lsn == Lsn::INVALID {
2036 0 : return Err(PageStreamError::BadRequest(
2037 0 : "invalid LSN(0) in request".into(),
2038 0 : ));
2039 0 : }
2040 0 :
2041 0 : // Clients should only read from recent LSNs on their timeline, or from locations holding an LSN lease.
2042 0 : //
2043 0 : // We may have older data available, but we make a best effort to detect this case and return an error,
2044 0 : // to distinguish a misbehaving client (asking for old LSN) from a storage issue (data missing at a legitimate LSN).
2045 0 : if request_lsn < **latest_gc_cutoff_lsn && !timeline.is_gc_blocked_by_lsn_lease_deadline() {
2046 0 : let gc_info = &timeline.gc_info.read().unwrap();
2047 0 : if !gc_info.lsn_covered_by_lease(request_lsn) {
2048 0 : return Err(
2049 0 : PageStreamError::BadRequest(format!(
2050 0 : "tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
2051 0 : request_lsn, **latest_gc_cutoff_lsn
2052 0 : ).into())
2053 0 : );
2054 0 : }
2055 0 : }
2056 :
2057 0 : if not_modified_since > last_record_lsn {
2058 0 : Ok(not_modified_since)
2059 : } else {
2060 : // It might be better to use max(not_modified_since, latest_gc_cutoff_lsn)
2061 : // here instead. That would give the same result, since we know that there
2062 : // haven't been any modifications since 'not_modified_since'. Using an older
2063 : // LSN might be faster, because that could allow skipping recent layers when
2064 : // finding the page. However, we have historically used 'last_record_lsn', so
2065 : // stick to that for now.
2066 0 : Ok(std::cmp::min(last_record_lsn, request_lsn))
2067 : }
2068 0 : }
2069 :
2070 : /// Handles the lsn lease request.
2071 : /// If a lease cannot be obtained, the client will receive NULL.
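     : /// On success, the reply (written below) is a single-row result set whose
     : /// `valid_until` column holds the lease expiry as milliseconds since the UNIX epoch.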
2072 : #[instrument(skip_all, fields(shard_id, %lsn))]
2073 : async fn handle_make_lsn_lease<IO>(
2074 : &mut self,
2075 : pgb: &mut PostgresBackend<IO>,
2076 : tenant_shard_id: TenantShardId,
2077 : timeline_id: TimelineId,
2078 : lsn: Lsn,
2079 : ctx: &RequestContext,
2080 : ) -> Result<(), QueryError>
2081 : where
2082 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
2083 : {
2084 : let timeline = self
2085 : .timeline_handles
2086 : .as_mut()
2087 : .unwrap()
2088 : .get(
2089 : tenant_shard_id.tenant_id,
2090 : timeline_id,
2091 : ShardSelector::Known(tenant_shard_id.to_index()),
2092 : )
2093 : .await?;
2094 : set_tracing_field_shard_id(&timeline);
2095 :
2096 : let lease = timeline
2097 : .renew_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx)
2098 0 : .inspect_err(|e| {
2099 0 : warn!("{e}");
2100 0 : })
2101 : .ok();
2102 0 : let valid_until_str = lease.map(|l| {
2103 0 : l.valid_until
2104 0 : .duration_since(SystemTime::UNIX_EPOCH)
2105 0 : .expect("valid_until is earlier than UNIX_EPOCH")
2106 0 : .as_millis()
2107 0 : .to_string()
2108 0 : });
2109 :
2110 : info!(
2111 : "acquired lease for {} until {}",
2112 : lsn,
2113 : valid_until_str.as_deref().unwrap_or("<unknown>")
2114 : );
2115 :
2116 0 : let bytes = valid_until_str.as_ref().map(|x| x.as_bytes());
2117 :
2118 : pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col(
2119 : b"valid_until",
2120 : )]))?
2121 : .write_message_noflush(&BeMessage::DataRow(&[bytes]))?;
2122 :
2123 : Ok(())
2124 : }
2125 :
2126 : #[instrument(skip_all, fields(shard_id))]
2127 : async fn handle_get_rel_exists_request(
2128 : &mut self,
2129 : timeline: &Timeline,
2130 : req: &PagestreamExistsRequest,
2131 : ctx: &RequestContext,
2132 : ) -> Result<PagestreamBeMessage, PageStreamError> {
2133 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
2134 : let lsn = Self::wait_or_get_last_lsn(
2135 : timeline,
2136 : req.hdr.request_lsn,
2137 : req.hdr.not_modified_since,
2138 : &latest_gc_cutoff_lsn,
2139 : ctx,
2140 : )
2141 : .await?;
2142 :
2143 : let exists = timeline
2144 : .get_rel_exists(
2145 : req.rel,
2146 : Version::LsnRange(LsnRange {
2147 : effective_lsn: lsn,
2148 : request_lsn: req.hdr.request_lsn,
2149 : }),
2150 : ctx,
2151 : )
2152 : .await?;
2153 :
2154 : Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
2155 : req: *req,
2156 : exists,
2157 : }))
2158 : }
2159 :
2160 : #[instrument(skip_all, fields(shard_id))]
2161 : async fn handle_get_nblocks_request(
2162 : &mut self,
2163 : timeline: &Timeline,
2164 : req: &PagestreamNblocksRequest,
2165 : ctx: &RequestContext,
2166 : ) -> Result<PagestreamBeMessage, PageStreamError> {
2167 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
2168 : let lsn = Self::wait_or_get_last_lsn(
2169 : timeline,
2170 : req.hdr.request_lsn,
2171 : req.hdr.not_modified_since,
2172 : &latest_gc_cutoff_lsn,
2173 : ctx,
2174 : )
2175 : .await?;
2176 :
2177 : let n_blocks = timeline
2178 : .get_rel_size(
2179 : req.rel,
2180 : Version::LsnRange(LsnRange {
2181 : effective_lsn: lsn,
2182 : request_lsn: req.hdr.request_lsn,
2183 : }),
2184 : ctx,
2185 : )
2186 : .await?;
2187 :
2188 : Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
2189 : req: *req,
2190 : n_blocks,
2191 : }))
2192 : }
2193 :
2194 : #[instrument(skip_all, fields(shard_id))]
2195 : async fn handle_db_size_request(
2196 : &mut self,
2197 : timeline: &Timeline,
2198 : req: &PagestreamDbSizeRequest,
2199 : ctx: &RequestContext,
2200 : ) -> Result<PagestreamBeMessage, PageStreamError> {
2201 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
2202 : let lsn = Self::wait_or_get_last_lsn(
2203 : timeline,
2204 : req.hdr.request_lsn,
2205 : req.hdr.not_modified_since,
2206 : &latest_gc_cutoff_lsn,
2207 : ctx,
2208 : )
2209 : .await?;
2210 :
2211 : let total_blocks = timeline
2212 : .get_db_size(
2213 : DEFAULTTABLESPACE_OID,
2214 : req.dbnode,
2215 : Version::LsnRange(LsnRange {
2216 : effective_lsn: lsn,
2217 : request_lsn: req.hdr.request_lsn,
2218 : }),
2219 : ctx,
2220 : )
2221 : .await?;
2222 : let db_size = total_blocks as i64 * BLCKSZ as i64;
2223 :
2224 : Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse {
2225 : req: *req,
2226 : db_size,
2227 : }))
2228 : }
2229 :
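     : /// Handles a whole batch of GetPage requests that the batcher folded together.
     : /// Returns exactly one result per request, in request order (the caller asserts this).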
2230 : #[instrument(skip_all)]
2231 : async fn handle_get_page_at_lsn_request_batched(
2232 : &mut self,
2233 : timeline: &Timeline,
2234 : requests: smallvec::SmallVec<[BatchedGetPageRequest; 1]>,
2235 : io_concurrency: IoConcurrency,
2236 : batch_break_reason: GetPageBatchBreakReason,
2237 : ctx: &RequestContext,
2238 : ) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>> {
2239 : debug_assert_current_span_has_tenant_and_timeline_id();
2240 :
2241 : timeline
2242 : .query_metrics
2243 : .observe_getpage_batch_start(requests.len(), batch_break_reason);
2244 :
2245 : // If a page trace is running, submit an event for this request.
2246 : if let Some(page_trace) = timeline.page_trace.load().as_ref() {
2247 : let time = SystemTime::now();
2248 : for batch in &requests {
2249 : let key = rel_block_to_key(batch.req.rel, batch.req.blkno).to_compact();
2250 : // Ignore error (trace buffer may be full or tracer may have disconnected).
2251 : _ = page_trace.try_send(PageTraceEvent {
2252 : key,
2253 : effective_lsn: batch.lsn_range.effective_lsn,
2254 : time,
2255 : });
2256 : }
2257 : }
2258 :
2259 : // If any request in the batch needs to wait for LSN, then do so now.
2260 : let mut perf_instrument = false;
2261 : let max_effective_lsn = requests
2262 : .iter()
2263 0 : .map(|req| {
2264 0 : if req.ctx.has_perf_span() {
2265 0 : perf_instrument = true;
2266 0 : }
2267 :
2268 0 : req.lsn_range.effective_lsn
2269 0 : })
2270 : .max()
2271 : .expect("batch is never empty");
2272 :
2273 : let ctx = match perf_instrument {
2274 : true => RequestContextBuilder::from(ctx)
2275 0 : .root_perf_span(|| {
2276 0 : info_span!(
2277 : target: PERF_TRACE_TARGET,
2278 : "GET_VECTORED",
2279 : tenant_id = %timeline.tenant_shard_id.tenant_id,
2280 : timeline_id = %timeline.timeline_id,
2281 0 : shard = %timeline.tenant_shard_id.shard_slug(),
2282 : %max_effective_lsn
2283 : )
2284 0 : })
2285 : .attached_child(),
2286 : false => ctx.attached_child(),
2287 : };
2288 :
2289 : let last_record_lsn = timeline.get_last_record_lsn();
2290 : if max_effective_lsn > last_record_lsn {
2291 : if let Err(e) = timeline
2292 : .wait_lsn(
2293 : max_effective_lsn,
2294 : crate::tenant::timeline::WaitLsnWaiter::PageService,
2295 : timeline::WaitLsnTimeout::Default,
2296 : &ctx,
2297 : )
2298 0 : .maybe_perf_instrument(&ctx, |current_perf_span| {
2299 0 : info_span!(
2300 : target: PERF_TRACE_TARGET,
2301 0 : parent: current_perf_span,
2302 : "WAIT_LSN",
2303 : )
2304 0 : })
2305 : .await
2306 : {
2307 0 : return Vec::from_iter(requests.into_iter().map(|req| {
2308 0 : Err(BatchedPageStreamError {
2309 0 : err: PageStreamError::from(e.clone()),
2310 0 : req: req.req.hdr,
2311 0 : })
2312 0 : }));
2313 : }
2314 : }
2315 :
2316 : let results = timeline
2317 : .get_rel_page_at_lsn_batched(
2318 0 : requests.iter().map(|p| {
2319 0 : (
2320 0 : &p.req.rel,
2321 0 : &p.req.blkno,
2322 0 : p.lsn_range,
2323 0 : p.ctx.attached_child(),
2324 0 : )
2325 0 : }),
2326 : io_concurrency,
2327 : &ctx,
2328 : )
2329 : .await;
2330 : assert_eq!(results.len(), requests.len());
2331 :
2332 : // TODO: avoid creating the new Vec here
2333 : Vec::from_iter(
2334 : requests
2335 : .into_iter()
2336 : .zip(results.into_iter())
2337 0 : .map(|(req, res)| {
2338 0 : res.map(|page| {
2339 0 : (
2340 0 : PagestreamBeMessage::GetPage(models::PagestreamGetPageResponse {
2341 0 : req: req.req,
2342 0 : page,
2343 0 : }),
2344 0 : req.timer,
2345 0 : )
2346 0 : })
2347 0 : .map_err(|e| BatchedPageStreamError {
2348 0 : err: PageStreamError::from(e),
2349 0 : req: req.req.hdr,
2350 0 : })
2351 0 : }),
2352 : )
2353 : }
2354 :
2355 : #[instrument(skip_all, fields(shard_id))]
2356 : async fn handle_get_slru_segment_request(
2357 : &mut self,
2358 : timeline: &Timeline,
2359 : req: &PagestreamGetSlruSegmentRequest,
2360 : ctx: &RequestContext,
2361 : ) -> Result<PagestreamBeMessage, PageStreamError> {
2362 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
2363 : let lsn = Self::wait_or_get_last_lsn(
2364 : timeline,
2365 : req.hdr.request_lsn,
2366 : req.hdr.not_modified_since,
2367 : &latest_gc_cutoff_lsn,
2368 : ctx,
2369 : )
2370 : .await?;
2371 :
2372 : let kind = SlruKind::from_repr(req.kind)
2373 : .ok_or(PageStreamError::BadRequest("invalid SLRU kind".into()))?;
2374 : let segment = timeline.get_slru_segment(kind, req.segno, lsn, ctx).await?;
2375 :
2376 : Ok(PagestreamBeMessage::GetSlruSegment(
2377 : PagestreamGetSlruSegmentResponse { req: *req, segment },
2378 : ))
2379 : }
2380 :
2381 : // NB: this impl mimics what we do for batched getpage requests.
2382 : #[cfg(feature = "testing")]
2383 : #[instrument(skip_all, fields(shard_id))]
2384 : async fn handle_test_request_batch(
2385 : &mut self,
2386 : timeline: &Timeline,
2387 : requests: Vec<BatchedTestRequest>,
2388 : _ctx: &RequestContext,
2389 : ) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>> {
2390 : // real requests would do something with the timeline
2391 : let mut results = Vec::with_capacity(requests.len());
2392 : for _req in requests.iter() {
2393 : tokio::task::yield_now().await;
2394 :
2395 : results.push({
2396 : if timeline.cancel.is_cancelled() {
2397 : Err(PageReconstructError::Cancelled)
2398 : } else {
2399 : Ok(())
2400 : }
2401 : });
2402 : }
2403 :
2404 : // TODO: avoid creating the new Vec here
2405 : Vec::from_iter(
2406 : requests
2407 : .into_iter()
2408 : .zip(results.into_iter())
2409 0 : .map(|(req, res)| {
2410 0 : res.map(|()| {
2411 0 : (
2412 0 : PagestreamBeMessage::Test(models::PagestreamTestResponse {
2413 0 : req: req.req.clone(),
2414 0 : }),
2415 0 : req.timer,
2416 0 : )
2417 0 : })
2418 0 : .map_err(|e| BatchedPageStreamError {
2419 0 : err: PageStreamError::from(e),
2420 0 : req: req.req.hdr,
2421 0 : })
2422 0 : }),
2423 : )
2424 : }
2425 :
2426 : /// Note on "fullbackup":
2427 : /// Full basebackups should only be used for debugging purposes.
2428 : /// Originally, it was introduced to enable breaking storage format changes,
2429 : /// but that is not applicable anymore.
2430 : ///
2431 : /// # Coding Discipline
2432 : ///
2433 : /// Coding discipline within this function: all interaction with the `pgb` connection
2434 : /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
2435 : /// This is so that we can shutdown page_service quickly.
2436 : ///
2437 : /// TODO: wrap the pgb that we pass to the basebackup handler so that it's sensitive
2438 : /// to connection cancellation.
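     : ///
     : /// The backup is streamed to the client as a COPY-out tar stream, optionally
     : /// gzip-compressed or served from the basebackup cache (see the branches below).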
2439 : #[allow(clippy::too_many_arguments)]
2440 : #[instrument(skip_all, fields(shard_id, ?lsn, ?prev_lsn, %full_backup))]
2441 : async fn handle_basebackup_request<IO>(
2442 : &mut self,
2443 : pgb: &mut PostgresBackend<IO>,
2444 : tenant_id: TenantId,
2445 : timeline_id: TimelineId,
2446 : lsn: Option<Lsn>,
2447 : prev_lsn: Option<Lsn>,
2448 : full_backup: bool,
2449 : gzip: bool,
2450 : replica: bool,
2451 : ctx: &RequestContext,
2452 : ) -> Result<(), QueryError>
2453 : where
2454 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
2455 : {
2456 0 : fn map_basebackup_error(err: BasebackupError) -> QueryError {
2457 0 : match err {
2458 : // TODO: passthrough the error site to the final error message?
2459 0 : BasebackupError::Client(e, _) => QueryError::Disconnected(ConnectionError::Io(e)),
2460 0 : BasebackupError::Server(e) => QueryError::Other(e),
2461 0 : BasebackupError::Shutdown => QueryError::Shutdown,
2462 : }
2463 0 : }
2464 :
2465 : let started = std::time::Instant::now();
2466 :
2467 : let timeline = self
2468 : .timeline_handles
2469 : .as_mut()
2470 : .unwrap()
2471 : .get(tenant_id, timeline_id, ShardSelector::Zero)
2472 : .await?;
2473 : set_tracing_field_shard_id(&timeline);
2474 : let ctx = ctx.with_scope_timeline(&timeline);
2475 :
2476 : if timeline.is_archived() == Some(true) {
2477 : tracing::info!(
2478 : "timeline {tenant_id}/{timeline_id} is archived, but got basebackup request for it."
2479 : );
2480 : return Err(QueryError::NotFound("timeline is archived".into()));
2481 : }
2482 :
2483 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
2484 : if let Some(lsn) = lsn {
2485 : // Backup was requested at a particular LSN. Wait for it to arrive.
2486 : info!("waiting for {}", lsn);
2487 : timeline
2488 : .wait_lsn(
2489 : lsn,
2490 : crate::tenant::timeline::WaitLsnWaiter::PageService,
2491 : crate::tenant::timeline::WaitLsnTimeout::Default,
2492 : &ctx,
2493 : )
2494 : .await?;
2495 : timeline
2496 : .check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
2497 : .context("invalid basebackup lsn")?;
2498 : }
2499 :
2500 : let lsn_awaited_after = started.elapsed();
2501 :
2502 : // switch client to COPYOUT
2503 : pgb.write_message_noflush(&BeMessage::CopyOutResponse)
2504 : .map_err(QueryError::Disconnected)?;
2505 : self.flush_cancellable(pgb, &self.cancel).await?;
2506 :
2507 : let mut from_cache = false;
2508 :
2509 : // Send a tarball of the latest layer on the timeline. Compress if not
2510 : // fullbackup. TODO Compress in that case too (tests need to be updated)
2511 : if full_backup {
2512 : let mut writer = pgb.copyout_writer();
2513 : basebackup::send_basebackup_tarball(
2514 : &mut writer,
2515 : &timeline,
2516 : lsn,
2517 : prev_lsn,
2518 : full_backup,
2519 : replica,
2520 : &ctx,
2521 : )
2522 : .await
2523 : .map_err(map_basebackup_error)?;
2524 : } else {
2525 : let mut writer = BufWriter::new(pgb.copyout_writer());
2526 :
2527 : let cached = {
2528 : // Basebackup is cached only for this combination of parameters.
2529 : if timeline.is_basebackup_cache_enabled()
2530 : && gzip
2531 : && lsn.is_some()
2532 : && prev_lsn.is_none()
2533 : {
2534 : self.basebackup_cache
2535 : .get(tenant_id, timeline_id, lsn.unwrap())
2536 : .await
2537 : } else {
2538 : None
2539 : }
2540 : };
2541 :
2542 : if let Some(mut cached) = cached {
2543 : from_cache = true;
2544 : tokio::io::copy(&mut cached, &mut writer)
2545 : .await
2546 0 : .map_err(|e| {
2547 0 : map_basebackup_error(BasebackupError::Client(
2548 0 : e,
2549 0 : "handle_basebackup_request,cached,copy",
2550 0 : ))
2551 0 : })?;
2552 : } else if gzip {
2553 : let mut encoder = GzipEncoder::with_quality(
2554 : &mut writer,
2555 : // NOTE using fast compression because it's on the critical path
2556 : // for compute startup. For an empty database, we get
2557 : // <100KB with this method. The Level::Best compression method
2558 : // gives us <20KB, but maybe we should add basebackup caching
2559 : // on compute shutdown first.
2560 : async_compression::Level::Fastest,
2561 : );
2562 : basebackup::send_basebackup_tarball(
2563 : &mut encoder,
2564 : &timeline,
2565 : lsn,
2566 : prev_lsn,
2567 : full_backup,
2568 : replica,
2569 : &ctx,
2570 : )
2571 : .await
2572 : .map_err(map_basebackup_error)?;
2573 : // shutdown the encoder to ensure the gzip footer is written
2574 : encoder
2575 : .shutdown()
2576 : .await
2577 0 : .map_err(|e| QueryError::Disconnected(ConnectionError::Io(e)))?;
2578 : } else {
2579 : basebackup::send_basebackup_tarball(
2580 : &mut writer,
2581 : &timeline,
2582 : lsn,
2583 : prev_lsn,
2584 : full_backup,
2585 : replica,
2586 : &ctx,
2587 : )
2588 : .await
2589 : .map_err(map_basebackup_error)?;
2590 : }
2591 0 : writer.flush().await.map_err(|e| {
2592 0 : map_basebackup_error(BasebackupError::Client(
2593 0 : e,
2594 0 : "handle_basebackup_request,flush",
2595 0 : ))
2596 0 : })?;
2597 : }
2598 :
2599 : pgb.write_message_noflush(&BeMessage::CopyDone)
2600 : .map_err(QueryError::Disconnected)?;
2601 : self.flush_cancellable(pgb, &timeline.cancel).await?;
2602 :
2603 : let basebackup_after = started
2604 : .elapsed()
2605 : .checked_sub(lsn_awaited_after)
2606 : .unwrap_or(Duration::ZERO);
2607 :
2608 : info!(
2609 : lsn_await_millis = lsn_awaited_after.as_millis(),
2610 : basebackup_millis = basebackup_after.as_millis(),
2611 : %from_cache,
2612 : "basebackup complete"
2613 : );
2614 :
2615 : Ok(())
2616 : }
2617 :
2618 : // When accessing the management API, supply None as an argument.
2619 : // When authorizing access to a tenant, pass the corresponding tenant id.
2620 0 : fn check_permission(&self, tenant_id: Option<TenantId>) -> Result<(), QueryError> {
2621 0 : if self.auth.is_none() {
2622 : // auth is set to Trust, nothing to check so just return ok
2623 0 : return Ok(());
2624 0 : }
2625 0 : // auth is Some, as just checked above; when auth is Some,
2626 0 : // claims are always present because of checks during connection init,
2627 0 : // so this expect won't trigger
2628 0 : let claims = self
2629 0 : .claims
2630 0 : .as_ref()
2631 0 : .expect("claims presence already checked");
2632 0 : check_permission(claims, tenant_id).map_err(|e| QueryError::Unauthorized(e.0))
2633 0 : }
2634 : }
2635 :
2636 : /// `basebackup tenant timeline [lsn] [--gzip] [--replica]`
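     : ///
     : /// Illustrative invocations (placeholder ids):
     : /// - `basebackup <tenant_id> <timeline_id> --gzip`
     : /// - `basebackup <tenant_id> <timeline_id> 0/16ABCDE --gzip --replica`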
2637 : #[derive(Debug, Clone, Eq, PartialEq)]
2638 : struct BaseBackupCmd {
2639 : tenant_id: TenantId,
2640 : timeline_id: TimelineId,
2641 : lsn: Option<Lsn>,
2642 : gzip: bool,
2643 : replica: bool,
2644 : }
2645 :
2646 : /// `fullbackup tenant timeline [lsn] [prev_lsn]`
2647 : #[derive(Debug, Clone, Eq, PartialEq)]
2648 : struct FullBackupCmd {
2649 : tenant_id: TenantId,
2650 : timeline_id: TimelineId,
2651 : lsn: Option<Lsn>,
2652 : prev_lsn: Option<Lsn>,
2653 : }
2654 :
2655 : /// `pagestream_v2 tenant timeline`
2656 : #[derive(Debug, Clone, Eq, PartialEq)]
2657 : struct PageStreamCmd {
2658 : tenant_id: TenantId,
2659 : timeline_id: TimelineId,
2660 : protocol_version: PagestreamProtocolVersion,
2661 : }
2662 :
2663 : /// `lease lsn tenant timeline lsn`
2664 : #[derive(Debug, Clone, Eq, PartialEq)]
2665 : struct LeaseLsnCmd {
2666 : tenant_shard_id: TenantShardId,
2667 : timeline_id: TimelineId,
2668 : lsn: Lsn,
2669 : }
2670 :
2671 : #[derive(Debug, Clone, Eq, PartialEq)]
2672 : enum PageServiceCmd {
2673 : Set,
2674 : PageStream(PageStreamCmd),
2675 : BaseBackup(BaseBackupCmd),
2676 : FullBackup(FullBackupCmd),
2677 : LeaseLsn(LeaseLsnCmd),
2678 : }
2679 :
2680 : impl PageStreamCmd {
2681 3 : fn parse(query: &str, protocol_version: PagestreamProtocolVersion) -> anyhow::Result<Self> {
2682 3 : let parameters = query.split_whitespace().collect_vec();
2683 3 : if parameters.len() != 2 {
2684 1 : bail!(
2685 1 : "invalid number of parameters for pagestream command: {}",
2686 1 : query
2687 1 : );
2688 2 : }
2689 2 : let tenant_id = TenantId::from_str(parameters[0])
2690 2 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2691 1 : let timeline_id = TimelineId::from_str(parameters[1])
2692 1 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2693 1 : Ok(Self {
2694 1 : tenant_id,
2695 1 : timeline_id,
2696 1 : protocol_version,
2697 1 : })
2698 3 : }
2699 : }
2700 :
2701 : impl FullBackupCmd {
2702 2 : fn parse(query: &str) -> anyhow::Result<Self> {
2703 2 : let parameters = query.split_whitespace().collect_vec();
2704 2 : if parameters.len() < 2 || parameters.len() > 4 {
2705 0 : bail!(
2706 0 : "invalid number of parameters for basebackup command: {}",
2707 0 : query
2708 0 : );
2709 2 : }
2710 2 : let tenant_id = TenantId::from_str(parameters[0])
2711 2 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2712 2 : let timeline_id = TimelineId::from_str(parameters[1])
2713 2 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2714 : // The caller is responsible for providing correct lsn and prev_lsn.
2715 2 : let lsn = if let Some(lsn_str) = parameters.get(2) {
2716 : Some(
2717 1 : Lsn::from_str(lsn_str)
2718 1 : .with_context(|| format!("Failed to parse Lsn from {lsn_str}"))?,
2719 : )
2720 : } else {
2721 1 : None
2722 : };
2723 2 : let prev_lsn = if let Some(prev_lsn_str) = parameters.get(3) {
2724 : Some(
2725 1 : Lsn::from_str(prev_lsn_str)
2726 1 : .with_context(|| format!("Failed to parse Lsn from {prev_lsn_str}"))?,
2727 : )
2728 : } else {
2729 1 : None
2730 : };
2731 2 : Ok(Self {
2732 2 : tenant_id,
2733 2 : timeline_id,
2734 2 : lsn,
2735 2 : prev_lsn,
2736 2 : })
2737 2 : }
2738 : }
2739 :
2740 : impl BaseBackupCmd {
2741 9 : fn parse(query: &str) -> anyhow::Result<Self> {
2742 9 : let parameters = query.split_whitespace().collect_vec();
2743 9 : if parameters.len() < 2 {
2744 0 : bail!(
2745 0 : "invalid number of parameters for basebackup command: {}",
2746 0 : query
2747 0 : );
2748 9 : }
2749 9 : let tenant_id = TenantId::from_str(parameters[0])
2750 9 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2751 9 : let timeline_id = TimelineId::from_str(parameters[1])
2752 9 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2753 : let lsn;
2754 : let flags_parse_from;
2755 9 : if let Some(maybe_lsn) = parameters.get(2) {
2756 8 : if *maybe_lsn == "latest" {
2757 1 : lsn = None;
2758 1 : flags_parse_from = 3;
2759 7 : } else if maybe_lsn.starts_with("--") {
2760 5 : lsn = None;
2761 5 : flags_parse_from = 2;
2762 5 : } else {
2763 : lsn = Some(
2764 2 : Lsn::from_str(maybe_lsn)
2765 2 : .with_context(|| format!("Failed to parse lsn from {maybe_lsn}"))?,
2766 : );
2767 2 : flags_parse_from = 3;
2768 : }
2769 1 : } else {
2770 1 : lsn = None;
2771 1 : flags_parse_from = 2;
2772 1 : }
2773 :
2774 9 : let mut gzip = false;
2775 9 : let mut replica = false;
2776 :
2777 11 : for ¶m in ¶meters[flags_parse_from..] {
2778 11 : match param {
2779 11 : "--gzip" => {
2780 7 : if gzip {
2781 1 : bail!("duplicate parameter for basebackup command: {param}")
2782 6 : }
2783 6 : gzip = true
2784 : }
2785 4 : "--replica" => {
2786 2 : if replica {
2787 0 : bail!("duplicate parameter for basebackup command: {param}")
2788 2 : }
2789 2 : replica = true
2790 : }
2791 2 : _ => bail!("invalid parameter for basebackup command: {param}"),
2792 : }
2793 : }
2794 6 : Ok(Self {
2795 6 : tenant_id,
2796 6 : timeline_id,
2797 6 : lsn,
2798 6 : gzip,
2799 6 : replica,
2800 6 : })
2801 9 : }
2802 : }
2803 :
2804 : impl LeaseLsnCmd {
2805 2 : fn parse(query: &str) -> anyhow::Result<Self> {
2806 2 : let parameters = query.split_whitespace().collect_vec();
2807 2 : if parameters.len() != 3 {
2808 0 : bail!(
2809 0 : "invalid number of parameters for lease lsn command: {}",
2810 0 : query
2811 0 : );
2812 2 : }
2813 2 : let tenant_shard_id = TenantShardId::from_str(parameters[0])
2814 2 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2815 2 : let timeline_id = TimelineId::from_str(parameters[1])
2816 2 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2817 2 : let lsn = Lsn::from_str(parameters[2])
2818 2 : .with_context(|| format!("Failed to parse lsn from {}", parameters[2]))?;
2819 2 : Ok(Self {
2820 2 : tenant_shard_id,
2821 2 : timeline_id,
2822 2 : lsn,
2823 2 : })
2824 2 : }
2825 : }
2826 :
2827 : impl PageServiceCmd {
2828 21 : fn parse(query: &str) -> anyhow::Result<Self> {
2829 21 : let query = query.trim();
2830 21 : let Some((cmd, other)) = query.split_once(' ') else {
2831 2 : bail!("cannot parse query: {query}")
2832 : };
2833 19 : match cmd.to_ascii_lowercase().as_str() {
2834 19 : "pagestream_v2" => Ok(Self::PageStream(PageStreamCmd::parse(
2835 3 : other,
2836 3 : PagestreamProtocolVersion::V2,
2837 3 : )?)),
2838 16 : "pagestream_v3" => Ok(Self::PageStream(PageStreamCmd::parse(
2839 0 : other,
2840 0 : PagestreamProtocolVersion::V3,
2841 0 : )?)),
2842 16 : "basebackup" => Ok(Self::BaseBackup(BaseBackupCmd::parse(other)?)),
2843 7 : "fullbackup" => Ok(Self::FullBackup(FullBackupCmd::parse(other)?)),
2844 5 : "lease" => {
2845 3 : let Some((cmd2, other)) = other.split_once(' ') else {
2846 0 : bail!("invalid lease command: {cmd}");
2847 : };
2848 3 : let cmd2 = cmd2.to_ascii_lowercase();
2849 3 : if cmd2 == "lsn" {
2850 2 : Ok(Self::LeaseLsn(LeaseLsnCmd::parse(other)?))
2851 : } else {
2852 1 : bail!("invalid lease command: {cmd}");
2853 : }
2854 : }
2855 2 : "set" => Ok(Self::Set),
2856 0 : _ => Err(anyhow::anyhow!("unsupported command {cmd} in {query}")),
2857 : }
2858 21 : }
2859 : }
2860 :
2861 : /// Parse the startup options from the postgres wire protocol startup packet.
2862 : ///
2863 : /// It takes a sequence of `-c option=X` or `-coption=X`. It parses the options string
2864 : /// by best effort and returns all the options parsed (key-value pairs) and a bool indicating
2865 : /// whether a parsing error was encountered. There could be duplicates in the options
2866 : /// if the caller passed such parameters.
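     : ///
     : /// For example (mirroring the unit tests below),
     : /// `parse_options(" -c neon.compute_mode=primary -cfoo=bar")` returns
     : /// `([("neon.compute_mode", "primary"), ("foo", "bar")], false)`, i.e. no error.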
2867 7 : fn parse_options(options: &str) -> (Vec<(String, String)>, bool) {
2868 7 : let mut parsing_config = false;
2869 7 : let mut has_error = false;
2870 7 : let mut config = Vec::new();
2871 16 : for item in options.split_whitespace() {
2872 16 : if item == "-c" {
2873 9 : if !parsing_config {
2874 8 : parsing_config = true;
2875 8 : } else {
2876 : // "-c" followed with another "-c"
2877 1 : tracing::warn!("failed to parse the startup options: {options}");
2878 1 : has_error = true;
2879 1 : break;
2880 : }
2881 7 : } else if item.starts_with("-c") || parsing_config {
2882 7 : let Some((mut key, value)) = item.split_once('=') else {
2883 : // "-c" followed with an invalid option
2884 1 : tracing::warn!("failed to parse the startup options: {options}");
2885 1 : has_error = true;
2886 1 : break;
2887 : };
2888 6 : if !parsing_config {
2889 : // Parse "-coption=X"
2890 1 : let Some(stripped_key) = key.strip_prefix("-c") else {
2891 0 : tracing::warn!("failed to parse the startup options: {options}");
2892 0 : has_error = true;
2893 0 : break;
2894 : };
2895 1 : key = stripped_key;
2896 5 : }
2897 6 : config.push((key.to_string(), value.to_string()));
2898 6 : parsing_config = false;
2899 : } else {
2900 0 : tracing::warn!("failed to parse the startup options: {options}");
2901 0 : has_error = true;
2902 0 : break;
2903 : }
2904 : }
2905 7 : if parsing_config {
2906 : // "-c" without the option
2907 3 : tracing::warn!("failed to parse the startup options: {options}");
2908 3 : has_error = true;
2909 4 : }
2910 7 : (config, has_error)
2911 7 : }
2912 :
2913 : impl<IO> postgres_backend::Handler<IO> for PageServerHandler
2914 : where
2915 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
2916 : {
2917 0 : fn check_auth_jwt(
2918 0 : &mut self,
2919 0 : _pgb: &mut PostgresBackend<IO>,
2920 0 : jwt_response: &[u8],
2921 0 : ) -> Result<(), QueryError> {
2922 : // This unwrap never triggers, because check_auth_jwt is only called when auth_type is NeonJWT,
2923 : // which requires auth to be present
2924 0 : let data: TokenData<Claims> = self
2925 0 : .auth
2926 0 : .as_ref()
2927 0 : .unwrap()
2928 0 : .decode(str::from_utf8(jwt_response).context("jwt response is not UTF-8")?)
2929 0 : .map_err(|e| QueryError::Unauthorized(e.0))?;
2930 :
2931 0 : if matches!(data.claims.scope, Scope::Tenant) && data.claims.tenant_id.is_none() {
2932 0 : return Err(QueryError::Unauthorized(
2933 0 : "jwt token scope is Tenant, but tenant id is missing".into(),
2934 0 : ));
2935 0 : }
2936 0 :
2937 0 : debug!(
2938 0 : "jwt scope check succeeded for scope: {:#?} by tenant id: {:?}",
2939 : data.claims.scope, data.claims.tenant_id,
2940 : );
2941 :
2942 0 : self.claims = Some(data.claims);
2943 0 : Ok(())
2944 0 : }
2945 :
2946 0 : fn startup(
2947 0 : &mut self,
2948 0 : _pgb: &mut PostgresBackend<IO>,
2949 0 : sm: &FeStartupPacket,
2950 0 : ) -> Result<(), QueryError> {
2951 0 : fail::fail_point!("ps::connection-start::startup-packet");
2952 :
2953 0 : if let FeStartupPacket::StartupMessage { params, .. } = sm {
2954 0 : if let Some(app_name) = params.get("application_name") {
2955 0 : self.perf_span_fields.application_name = Some(app_name.to_string());
2956 0 : Span::current().record("application_name", field::display(app_name));
2957 0 : }
2958 0 : if let Some(options) = params.get("options") {
2959 0 : let (config, _) = parse_options(options);
2960 0 : for (key, value) in config {
2961 0 : if key == "neon.compute_mode" {
2962 0 : self.perf_span_fields.compute_mode = Some(value.clone());
2963 0 : Span::current().record("compute_mode", field::display(value));
2964 0 : }
2965 : }
2966 0 : }
2967 0 : };
2968 :
2969 0 : Ok(())
2970 0 : }
2971 :
2972 : #[instrument(skip_all, fields(tenant_id, timeline_id))]
2973 : async fn process_query(
2974 : &mut self,
2975 : pgb: &mut PostgresBackend<IO>,
2976 : query_string: &str,
2977 : ) -> Result<(), QueryError> {
2978 0 : fail::fail_point!("simulated-bad-compute-connection", |_| {
2979 0 : info!("Hit failpoint for bad connection");
2980 0 : Err(QueryError::SimulatedConnectionError)
2981 0 : });
2982 :
2983 : fail::fail_point!("ps::connection-start::process-query");
2984 :
2985 : let ctx = self.connection_ctx.attached_child();
2986 : debug!("process query {query_string}");
2987 : let query = PageServiceCmd::parse(query_string)?;
2988 : match query {
2989 : PageServiceCmd::PageStream(PageStreamCmd {
2990 : tenant_id,
2991 : timeline_id,
2992 : protocol_version,
2993 : }) => {
2994 : tracing::Span::current()
2995 : .record("tenant_id", field::display(tenant_id))
2996 : .record("timeline_id", field::display(timeline_id));
2997 :
2998 : self.check_permission(Some(tenant_id))?;
2999 : let command_kind = match protocol_version {
3000 : PagestreamProtocolVersion::V2 => ComputeCommandKind::PageStreamV2,
3001 : PagestreamProtocolVersion::V3 => ComputeCommandKind::PageStreamV3,
3002 : };
3003 : COMPUTE_COMMANDS_COUNTERS.for_command(command_kind).inc();
3004 :
3005 : self.handle_pagerequests(pgb, tenant_id, timeline_id, protocol_version, ctx)
3006 : .await?;
3007 : }
3008 : PageServiceCmd::BaseBackup(BaseBackupCmd {
3009 : tenant_id,
3010 : timeline_id,
3011 : lsn,
3012 : gzip,
3013 : replica,
3014 : }) => {
3015 : tracing::Span::current()
3016 : .record("tenant_id", field::display(tenant_id))
3017 : .record("timeline_id", field::display(timeline_id));
3018 :
3019 : self.check_permission(Some(tenant_id))?;
3020 :
3021 : COMPUTE_COMMANDS_COUNTERS
3022 : .for_command(ComputeCommandKind::Basebackup)
3023 : .inc();
3024 : let metric_recording = metrics::BASEBACKUP_QUERY_TIME.start_recording();
3025 0 : let res = async {
3026 0 : self.handle_basebackup_request(
3027 0 : pgb,
3028 0 : tenant_id,
3029 0 : timeline_id,
3030 0 : lsn,
3031 0 : None,
3032 0 : false,
3033 0 : gzip,
3034 0 : replica,
3035 0 : &ctx,
3036 0 : )
3037 0 : .await?;
3038 0 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
3039 0 : Result::<(), QueryError>::Ok(())
3040 0 : }
3041 : .await;
3042 : metric_recording.observe(&res);
3043 : res?;
3044 : }
3045 : // same as basebackup, but result includes relational data as well
3046 : PageServiceCmd::FullBackup(FullBackupCmd {
3047 : tenant_id,
3048 : timeline_id,
3049 : lsn,
3050 : prev_lsn,
3051 : }) => {
3052 : tracing::Span::current()
3053 : .record("tenant_id", field::display(tenant_id))
3054 : .record("timeline_id", field::display(timeline_id));
3055 :
3056 : self.check_permission(Some(tenant_id))?;
3057 :
3058 : COMPUTE_COMMANDS_COUNTERS
3059 : .for_command(ComputeCommandKind::Fullbackup)
3060 : .inc();
3061 :
3062 : // Check that the timeline exists
3063 : self.handle_basebackup_request(
3064 : pgb,
3065 : tenant_id,
3066 : timeline_id,
3067 : lsn,
3068 : prev_lsn,
3069 : true,
3070 : false,
3071 : false,
3072 : &ctx,
3073 : )
3074 : .await?;
3075 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
3076 : }
3077 : PageServiceCmd::Set => {
3078 : // important because psycopg2 executes "SET datestyle TO 'ISO'"
3079 : // on connect
3080 : // TODO: allow setting options, i.e., application_name/compute_mode via SET commands
3081 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
3082 : }
3083 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
3084 : tenant_shard_id,
3085 : timeline_id,
3086 : lsn,
3087 : }) => {
3088 : tracing::Span::current()
3089 : .record("tenant_id", field::display(tenant_shard_id))
3090 : .record("timeline_id", field::display(timeline_id));
3091 :
3092 : self.check_permission(Some(tenant_shard_id.tenant_id))?;
3093 :
3094 : COMPUTE_COMMANDS_COUNTERS
3095 : .for_command(ComputeCommandKind::LeaseLsn)
3096 : .inc();
3097 :
3098 : match self
3099 : .handle_make_lsn_lease(pgb, tenant_shard_id, timeline_id, lsn, &ctx)
3100 : .await
3101 : {
3102 : Ok(()) => {
3103 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?
3104 : }
3105 : Err(e) => {
3106 : error!("error obtaining lsn lease for {lsn}: {e:?}");
3107 : pgb.write_message_noflush(&BeMessage::ErrorResponse(
3108 : &e.to_string(),
3109 : Some(e.pg_error_code()),
3110 : ))?
3111 : }
3112 : };
3113 : }
3114 : }
3115 :
3116 : Ok(())
3117 : }
3118 : }
3119 :
3120 : impl From<GetActiveTenantError> for QueryError {
3121 0 : fn from(e: GetActiveTenantError) -> Self {
3122 0 : match e {
3123 0 : GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected(
3124 0 : ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
3125 0 : ),
3126 : GetActiveTenantError::Cancelled
3127 : | GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => {
3128 0 : QueryError::Shutdown
3129 : }
3130 0 : e @ GetActiveTenantError::NotFound(_) => QueryError::NotFound(format!("{e}").into()),
3131 0 : e => QueryError::Other(anyhow::anyhow!(e)),
3132 : }
3133 0 : }
3134 : }
3135 :
3136 : #[derive(Debug, thiserror::Error)]
3137 : pub(crate) enum GetActiveTimelineError {
3138 : #[error(transparent)]
3139 : Tenant(GetActiveTenantError),
3140 : #[error(transparent)]
3141 : Timeline(#[from] GetTimelineError),
3142 : }
3143 :
3144 : impl From<GetActiveTimelineError> for QueryError {
3145 0 : fn from(e: GetActiveTimelineError) -> Self {
3146 0 : match e {
3147 0 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => QueryError::Shutdown,
3148 0 : GetActiveTimelineError::Tenant(e) => e.into(),
3149 0 : GetActiveTimelineError::Timeline(e) => QueryError::NotFound(format!("{e}").into()),
3150 : }
3151 0 : }
3152 : }
3153 :
3154 : impl From<crate::tenant::timeline::handle::HandleUpgradeError> for QueryError {
3155 0 : fn from(e: crate::tenant::timeline::handle::HandleUpgradeError) -> Self {
3156 0 : match e {
3157 0 : crate::tenant::timeline::handle::HandleUpgradeError::ShutDown => QueryError::Shutdown,
3158 0 : }
3159 0 : }
3160 : }
3161 :
3162 0 : fn set_tracing_field_shard_id(timeline: &Timeline) {
3163 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
3164 0 : tracing::Span::current().record(
3165 0 : "shard_id",
3166 0 : tracing::field::display(timeline.tenant_shard_id.shard_slug()),
3167 0 : );
3168 0 : debug_assert_current_span_has_tenant_and_timeline_id();
3169 0 : }
3170 :
3171 : struct WaitedForLsn(Lsn);
3172 : impl From<WaitedForLsn> for Lsn {
3173 0 : fn from(WaitedForLsn(lsn): WaitedForLsn) -> Self {
3174 0 : lsn
3175 0 : }
3176 : }
3177 :
3178 : #[cfg(test)]
3179 : mod tests {
3180 : use utils::shard::ShardCount;
3181 :
3182 : use super::*;
3183 :
3184 : #[test]
3185 1 : fn pageservice_cmd_parse() {
3186 1 : let tenant_id = TenantId::generate();
3187 1 : let timeline_id = TimelineId::generate();
3188 1 : let cmd =
3189 1 : PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id} {timeline_id}")).unwrap();
3190 1 : assert_eq!(
3191 1 : cmd,
3192 1 : PageServiceCmd::PageStream(PageStreamCmd {
3193 1 : tenant_id,
3194 1 : timeline_id,
3195 1 : protocol_version: PagestreamProtocolVersion::V2,
3196 1 : })
3197 1 : );
3198 1 : let cmd = PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id}")).unwrap();
3199 1 : assert_eq!(
3200 1 : cmd,
3201 1 : PageServiceCmd::BaseBackup(BaseBackupCmd {
3202 1 : tenant_id,
3203 1 : timeline_id,
3204 1 : lsn: None,
3205 1 : gzip: false,
3206 1 : replica: false
3207 1 : })
3208 1 : );
3209 1 : let cmd =
3210 1 : PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} --gzip")).unwrap();
3211 1 : assert_eq!(
3212 1 : cmd,
3213 1 : PageServiceCmd::BaseBackup(BaseBackupCmd {
3214 1 : tenant_id,
3215 1 : timeline_id,
3216 1 : lsn: None,
3217 1 : gzip: true,
3218 1 : replica: false
3219 1 : })
3220 1 : );
3221 1 : let cmd =
3222 1 : PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} latest")).unwrap();
3223 1 : assert_eq!(
3224 1 : cmd,
3225 1 : PageServiceCmd::BaseBackup(BaseBackupCmd {
3226 1 : tenant_id,
3227 1 : timeline_id,
3228 1 : lsn: None,
3229 1 : gzip: false,
3230 1 : replica: false
3231 1 : })
3232 1 : );
3233 1 : let cmd = PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} 0/16ABCDE"))
3234 1 : .unwrap();
3235 1 : assert_eq!(
3236 1 : cmd,
3237 1 : PageServiceCmd::BaseBackup(BaseBackupCmd {
3238 1 : tenant_id,
3239 1 : timeline_id,
3240 1 : lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
3241 1 : gzip: false,
3242 1 : replica: false
3243 1 : })
3244 1 : );
3245 1 : let cmd = PageServiceCmd::parse(&format!(
3246 1 : "basebackup {tenant_id} {timeline_id} --replica --gzip"
3247 1 : ))
3248 1 : .unwrap();
3249 1 : assert_eq!(
3250 1 : cmd,
3251 1 : PageServiceCmd::BaseBackup(BaseBackupCmd {
3252 1 : tenant_id,
3253 1 : timeline_id,
3254 1 : lsn: None,
3255 1 : gzip: true,
3256 1 : replica: true
3257 1 : })
3258 1 : );
3259 1 : let cmd = PageServiceCmd::parse(&format!(
3260 1 : "basebackup {tenant_id} {timeline_id} 0/16ABCDE --replica --gzip"
3261 1 : ))
3262 1 : .unwrap();
3263 1 : assert_eq!(
3264 1 : cmd,
3265 1 : PageServiceCmd::BaseBackup(BaseBackupCmd {
3266 1 : tenant_id,
3267 1 : timeline_id,
3268 1 : lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
3269 1 : gzip: true,
3270 1 : replica: true
3271 1 : })
3272 1 : );
3273 1 : let cmd = PageServiceCmd::parse(&format!("fullbackup {tenant_id} {timeline_id}")).unwrap();
3274 1 : assert_eq!(
3275 1 : cmd,
3276 1 : PageServiceCmd::FullBackup(FullBackupCmd {
3277 1 : tenant_id,
3278 1 : timeline_id,
3279 1 : lsn: None,
3280 1 : prev_lsn: None
3281 1 : })
3282 1 : );
3283 1 : let cmd = PageServiceCmd::parse(&format!(
3284 1 : "fullbackup {tenant_id} {timeline_id} 0/16ABCDE 0/16ABCDF"
3285 1 : ))
3286 1 : .unwrap();
3287 1 : assert_eq!(
3288 1 : cmd,
3289 1 : PageServiceCmd::FullBackup(FullBackupCmd {
3290 1 : tenant_id,
3291 1 : timeline_id,
3292 1 : lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
3293 1 : prev_lsn: Some(Lsn::from_str("0/16ABCDF").unwrap()),
3294 1 : })
3295 1 : );
3296 1 : let tenant_shard_id = TenantShardId::unsharded(tenant_id);
3297 1 : let cmd = PageServiceCmd::parse(&format!(
3298 1 : "lease lsn {tenant_shard_id} {timeline_id} 0/16ABCDE"
3299 1 : ))
3300 1 : .unwrap();
3301 1 : assert_eq!(
3302 1 : cmd,
3303 1 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
3304 1 : tenant_shard_id,
3305 1 : timeline_id,
3306 1 : lsn: Lsn::from_str("0/16ABCDE").unwrap(),
3307 1 : })
3308 1 : );
3309 1 : let tenant_shard_id = TenantShardId::split(&tenant_shard_id, ShardCount(8))[1];
3310 1 : let cmd = PageServiceCmd::parse(&format!(
3311 1 : "lease lsn {tenant_shard_id} {timeline_id} 0/16ABCDE"
3312 1 : ))
3313 1 : .unwrap();
3314 1 : assert_eq!(
3315 1 : cmd,
3316 1 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
3317 1 : tenant_shard_id,
3318 1 : timeline_id,
3319 1 : lsn: Lsn::from_str("0/16ABCDE").unwrap(),
3320 1 : })
3321 1 : );
3322 1 : let cmd = PageServiceCmd::parse("set a = b").unwrap();
3323 1 : assert_eq!(cmd, PageServiceCmd::Set);
3324 1 : let cmd = PageServiceCmd::parse("SET foo").unwrap();
3325 1 : assert_eq!(cmd, PageServiceCmd::Set);
3326 1 : }
3327 :
3328 : #[test]
3329 1 : fn pageservice_cmd_err_handling() {
3330 1 : let tenant_id = TenantId::generate();
3331 1 : let timeline_id = TimelineId::generate();
3332 1 : let cmd = PageServiceCmd::parse("unknown_command");
3333 1 : assert!(cmd.is_err());
3334 1 : let cmd = PageServiceCmd::parse("pagestream_v2");
3335 1 : assert!(cmd.is_err());
3336 1 : let cmd = PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id}xxx"));
3337 1 : assert!(cmd.is_err());
3338 1 : let cmd = PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id}xxx {timeline_id}xxx"));
3339 1 : assert!(cmd.is_err());
3340 1 : let cmd = PageServiceCmd::parse(&format!(
3341 1 : "basebackup {tenant_id} {timeline_id} --gzip --gzip"
3342 1 : ));
3343 1 : assert!(cmd.is_err());
3344 1 : let cmd = PageServiceCmd::parse(&format!(
3345 1 : "basebackup {tenant_id} {timeline_id} --gzip --unknown"
3346 1 : ));
3347 1 : assert!(cmd.is_err());
3348 1 : let cmd = PageServiceCmd::parse(&format!(
3349 1 : "basebackup {tenant_id} {timeline_id} --gzip 0/16ABCDE"
3350 1 : ));
3351 1 : assert!(cmd.is_err());
3352 1 : let cmd = PageServiceCmd::parse(&format!("lease {tenant_id} {timeline_id} gzip 0/16ABCDE"));
3353 1 : assert!(cmd.is_err());
3354 1 : }
3355 :
3356 : #[test]
3357 1 : fn test_parse_options() {
3358 1 : let (config, has_error) = parse_options(" -c neon.compute_mode=primary ");
3359 1 : assert!(!has_error);
3360 1 : assert_eq!(
3361 1 : config,
3362 1 : vec![("neon.compute_mode".to_string(), "primary".to_string())]
3363 1 : );
3364 :
3365 1 : let (config, has_error) = parse_options(" -c neon.compute_mode=primary -c foo=bar ");
3366 1 : assert!(!has_error);
3367 1 : assert_eq!(
3368 1 : config,
3369 1 : vec![
3370 1 : ("neon.compute_mode".to_string(), "primary".to_string()),
3371 1 : ("foo".to_string(), "bar".to_string()),
3372 1 : ]
3373 1 : );
3374 :
3375 1 : let (config, has_error) = parse_options(" -c neon.compute_mode=primary -cfoo=bar");
3376 1 : assert!(!has_error);
3377 1 : assert_eq!(
3378 1 : config,
3379 1 : vec![
3380 1 : ("neon.compute_mode".to_string(), "primary".to_string()),
3381 1 : ("foo".to_string(), "bar".to_string()),
3382 1 : ]
3383 1 : );
3384 :
3385 1 : let (_, has_error) = parse_options("-c");
3386 1 : assert!(has_error);
3387 :
3388 1 : let (_, has_error) = parse_options("-c foo=bar -c -c");
3389 1 : assert!(has_error);
3390 :
3391 1 : let (_, has_error) = parse_options(" ");
3392 1 : assert!(!has_error);
3393 :
3394 1 : let (_, has_error) = parse_options(" -c neon.compute_mode");
3395 1 : assert!(has_error);
3396 1 : }
3397 : }
|