Line data Source code
1 : //! The Page Service listens for client connections and serves their GetPage@LSN
2 : //! requests.
3 :
4 : use std::borrow::Cow;
5 : use std::num::NonZeroUsize;
6 : use std::os::fd::AsRawFd;
7 : use std::str::FromStr;
8 : use std::sync::Arc;
9 : use std::time::{Duration, Instant, SystemTime};
10 : use std::{io, str};
11 :
12 : use crate::PERF_TRACE_TARGET;
13 : use anyhow::{Context, bail};
14 : use async_compression::tokio::write::GzipEncoder;
15 : use bytes::Buf;
16 : use futures::FutureExt;
17 : use itertools::Itertools;
18 : use once_cell::sync::OnceCell;
19 : use pageserver_api::config::{
20 : PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
21 : PageServiceProtocolPipelinedExecutionStrategy,
22 : };
23 : use pageserver_api::key::rel_block_to_key;
24 : use pageserver_api::models::{
25 : self, PageTraceEvent, PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
26 : PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
27 : PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetSlruSegmentRequest,
28 : PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest, PagestreamNblocksResponse,
29 : PagestreamProtocolVersion, PagestreamRequest, TenantState,
30 : };
31 : use pageserver_api::reltag::SlruKind;
32 : use pageserver_api::shard::TenantShardId;
33 : use postgres_backend::{
34 : AuthType, PostgresBackend, PostgresBackendReader, QueryError, is_expected_io_error,
35 : };
36 : use postgres_ffi::BLCKSZ;
37 : use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
38 : use pq_proto::framed::ConnectionError;
39 : use pq_proto::{BeMessage, FeMessage, FeStartupPacket, RowDescriptor};
40 : use strum_macros::IntoStaticStr;
41 : use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufWriter};
42 : use tokio::task::JoinHandle;
43 : use tokio_util::sync::CancellationToken;
44 : use tracing::*;
45 : use utils::auth::{Claims, Scope, SwappableJwtAuth};
46 : use utils::failpoint_support;
47 : use utils::id::{TenantId, TimelineId};
48 : use utils::logging::log_slow;
49 : use utils::lsn::Lsn;
50 : use utils::simple_rcu::RcuReadGuard;
51 : use utils::sync::gate::{Gate, GateGuard};
52 : use utils::sync::spsc_fold;
53 :
54 : use crate::auth::check_permission;
55 : use crate::basebackup::BasebackupError;
56 : use crate::config::PageServerConf;
57 : use crate::context::{
58 : DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
59 : };
60 : use crate::metrics::{
61 : self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, LIVE_CONNECTIONS, SmgrOpTimer,
62 : TimelineMetrics,
63 : };
64 : use crate::pgdatadir_mapping::Version;
65 : use crate::span::{
66 : debug_assert_current_span_has_tenant_and_timeline_id,
67 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
68 : };
69 : use crate::task_mgr::{self, COMPUTE_REQUEST_RUNTIME, TaskKind};
70 : use crate::tenant::mgr::{
71 : GetActiveTenantError, GetTenantError, ShardResolveResult, ShardSelector, TenantManager,
72 : };
73 : use crate::tenant::storage_layer::IoConcurrency;
74 : use crate::tenant::timeline::{self, WaitLsnError};
75 : use crate::tenant::{GetTimelineError, PageReconstructError, Timeline};
76 : use crate::{basebackup, timed_after_cancellation};
77 :
78 : /// How long we may wait for a [`crate::tenant::mgr::TenantSlot::InProgress`]` and/or a [`crate::tenant::Tenant`] which
79 : /// is not yet in state [`TenantState::Active`].
80 : ///
81 : /// NB: this is a different value than [`crate::http::routes::ACTIVE_TENANT_TIMEOUT`].
82 : const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
83 :
84 : /// Threshold at which to log slow GetPage requests.
85 : const LOG_SLOW_GETPAGE_THRESHOLD: Duration = Duration::from_secs(30);
86 :
87 : ///////////////////////////////////////////////////////////////////////////////
88 :
89 : pub struct Listener {
90 : cancel: CancellationToken,
91 : /// Cancel the listener task through `cancel` to shut down the listener
92 : /// and get a handle on the existing connections.
93 : task: JoinHandle<Connections>,
94 : }
95 :
96 : pub struct Connections {
97 : cancel: CancellationToken,
98 : tasks: tokio::task::JoinSet<ConnectionHandlerResult>,
99 : gate: Gate,
100 : }
101 :
102 0 : pub fn spawn(
103 0 : conf: &'static PageServerConf,
104 0 : tenant_manager: Arc<TenantManager>,
105 0 : pg_auth: Option<Arc<SwappableJwtAuth>>,
106 0 : perf_trace_dispatch: Option<Dispatch>,
107 0 : tcp_listener: tokio::net::TcpListener,
108 0 : tls_config: Option<Arc<rustls::ServerConfig>>,
109 0 : ) -> Listener {
110 0 : let cancel = CancellationToken::new();
111 0 : let libpq_ctx = RequestContext::todo_child(
112 0 : TaskKind::LibpqEndpointListener,
113 0 : // listener task shouldn't need to download anything. (We will
114 0 : // create separate sub-contexts for each connection, with their
115 0 : // own download behavior. This context is used only to listen and
116 0 : // accept connections.)
117 0 : DownloadBehavior::Error,
118 0 : );
119 0 : let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
120 0 : "libpq listener",
121 0 : libpq_listener_main(
122 0 : conf,
123 0 : tenant_manager,
124 0 : pg_auth,
125 0 : perf_trace_dispatch,
126 0 : tcp_listener,
127 0 : conf.pg_auth_type,
128 0 : tls_config,
129 0 : conf.page_service_pipelining.clone(),
130 0 : libpq_ctx,
131 0 : cancel.clone(),
132 0 : )
133 0 : .map(anyhow::Ok),
134 0 : ));
135 0 :
136 0 : Listener { cancel, task }
137 0 : }
138 :
139 : impl Listener {
140 0 : pub async fn stop_accepting(self) -> Connections {
141 0 : self.cancel.cancel();
142 0 : self.task
143 0 : .await
144 0 : .expect("unreachable: we wrap the listener task in task_mgr::exit_on_panic_or_error")
145 0 : }
146 : }
147 : impl Connections {
148 0 : pub(crate) async fn shutdown(self) {
149 0 : let Self {
150 0 : cancel,
151 0 : mut tasks,
152 0 : gate,
153 0 : } = self;
154 0 : cancel.cancel();
155 0 : while let Some(res) = tasks.join_next().await {
156 0 : Self::handle_connection_completion(res);
157 0 : }
158 0 : gate.close().await;
159 0 : }
160 :
161 0 : fn handle_connection_completion(res: Result<anyhow::Result<()>, tokio::task::JoinError>) {
162 0 : match res {
163 0 : Ok(Ok(())) => {}
164 0 : Ok(Err(e)) => error!("error in page_service connection task: {:?}", e),
165 0 : Err(e) => error!("page_service connection task panicked: {:?}", e),
166 : }
167 0 : }
168 : }
169 :
170 : ///
171 : /// Main loop of the page service.
172 : ///
173 : /// Listens for connections, and launches a new handler task for each.
174 : ///
175 : /// Upon cancellation via `listener_cancel`, returns the set of
176 : /// open connections.
177 : ///
178 : #[allow(clippy::too_many_arguments)]
179 0 : pub async fn libpq_listener_main(
180 0 : conf: &'static PageServerConf,
181 0 : tenant_manager: Arc<TenantManager>,
182 0 : auth: Option<Arc<SwappableJwtAuth>>,
183 0 : perf_trace_dispatch: Option<Dispatch>,
184 0 : listener: tokio::net::TcpListener,
185 0 : auth_type: AuthType,
186 0 : tls_config: Option<Arc<rustls::ServerConfig>>,
187 0 : pipelining_config: PageServicePipeliningConfig,
188 0 : listener_ctx: RequestContext,
189 0 : listener_cancel: CancellationToken,
190 0 : ) -> Connections {
191 0 : let connections_cancel = CancellationToken::new();
192 0 : let connections_gate = Gate::default();
193 0 : let mut connection_handler_tasks = tokio::task::JoinSet::default();
194 :
195 : loop {
196 0 : let gate_guard = match connections_gate.enter() {
197 0 : Ok(guard) => guard,
198 0 : Err(_) => break,
199 : };
200 :
201 0 : let accepted = tokio::select! {
202 : biased;
203 0 : _ = listener_cancel.cancelled() => break,
204 0 : next = connection_handler_tasks.join_next(), if !connection_handler_tasks.is_empty() => {
205 0 : let res = next.expect("we dont poll while empty");
206 0 : Connections::handle_connection_completion(res);
207 0 : continue;
208 : }
209 0 : accepted = listener.accept() => accepted,
210 0 : };
211 0 :
212 0 : match accepted {
213 0 : Ok((socket, peer_addr)) => {
214 0 : // Connection established. Spawn a new task to handle it.
215 0 : debug!("accepted connection from {}", peer_addr);
216 0 : let local_auth = auth.clone();
217 0 : let connection_ctx = RequestContextBuilder::from(&listener_ctx)
218 0 : .task_kind(TaskKind::PageRequestHandler)
219 0 : .download_behavior(DownloadBehavior::Download)
220 0 : .perf_span_dispatch(perf_trace_dispatch.clone())
221 0 : .detached_child();
222 0 :
223 0 : connection_handler_tasks.spawn(page_service_conn_main(
224 0 : conf,
225 0 : tenant_manager.clone(),
226 0 : local_auth,
227 0 : socket,
228 0 : auth_type,
229 0 : tls_config.clone(),
230 0 : pipelining_config.clone(),
231 0 : connection_ctx,
232 0 : connections_cancel.child_token(),
233 0 : gate_guard,
234 0 : ));
235 : }
236 0 : Err(err) => {
237 0 : // accept() failed. Log the error, and loop back to retry on next connection.
238 0 : error!("accept() failed: {:?}", err);
239 : }
240 : }
241 : }
242 :
243 0 : debug!("page_service listener loop terminated");
244 :
245 0 : Connections {
246 0 : cancel: connections_cancel,
247 0 : tasks: connection_handler_tasks,
248 0 : gate: connections_gate,
249 0 : }
250 0 : }
251 :
252 : type ConnectionHandlerResult = anyhow::Result<()>;
253 :
254 : /// Perf root spans start at the per-request level, after shard routing.
255 : /// This struct carries connection-level information to the root perf span definition.
256 : #[derive(Clone)]
257 : struct ConnectionPerfSpanFields {
258 : peer_addr: String,
259 : application_name: Option<String>,
260 : compute_mode: Option<String>,
261 : }
262 :
263 : #[instrument(skip_all, fields(peer_addr, application_name, compute_mode))]
264 : #[allow(clippy::too_many_arguments)]
265 : async fn page_service_conn_main(
266 : conf: &'static PageServerConf,
267 : tenant_manager: Arc<TenantManager>,
268 : auth: Option<Arc<SwappableJwtAuth>>,
269 : socket: tokio::net::TcpStream,
270 : auth_type: AuthType,
271 : tls_config: Option<Arc<rustls::ServerConfig>>,
272 : pipelining_config: PageServicePipeliningConfig,
273 : connection_ctx: RequestContext,
274 : cancel: CancellationToken,
275 : gate_guard: GateGuard,
276 : ) -> ConnectionHandlerResult {
277 : let _guard = LIVE_CONNECTIONS
278 : .with_label_values(&["page_service"])
279 : .guard();
280 :
281 : socket
282 : .set_nodelay(true)
283 : .context("could not set TCP_NODELAY")?;
284 :
285 : let socket_fd = socket.as_raw_fd();
286 :
287 : let peer_addr = socket.peer_addr().context("get peer address")?;
288 :
289 : let perf_span_fields = ConnectionPerfSpanFields {
290 : peer_addr: peer_addr.to_string(),
291 : application_name: None, // filled in later
292 : compute_mode: None, // filled in later
293 : };
294 : tracing::Span::current().record("peer_addr", field::display(peer_addr));
295 :
296 : // set up a read timeout of 10 minutes. the value is somewhat arbitrary; requirements:
297 : // - long enough for most valid compute connections
298 : // - less than infinite to stop us from "leaking" connections to long-gone computes
299 : //
300 : // no write timeout is used, because the kernel is assumed to error writes after some time.
301 : let mut socket = tokio_io_timeout::TimeoutReader::new(socket);
302 :
303 : let default_timeout_ms = 10 * 60 * 1000; // 10 minutes by default
304 0 : let socket_timeout_ms = (|| {
305 0 : fail::fail_point!("simulated-bad-compute-connection", |avg_timeout_ms| {
306 : // Sample from an exponential distribution to simulate
307 : // poor network conditions; in tests, avg_timeout_ms is
308 : // expected to be around 15
309 0 : if let Some(avg_timeout_ms) = avg_timeout_ms {
310 0 : let avg = avg_timeout_ms.parse::<i64>().unwrap() as f32;
311 0 : let u = rand::random::<f32>();
312 0 : ((1.0 - u).ln() / (-avg)) as u64
313 : } else {
314 0 : default_timeout_ms
315 : }
316 0 : });
317 0 : default_timeout_ms
318 : })();
319 :
320 : // A timeout here does not mean the client died; it can happen if it's just idle for
321 : // a while: we will tear down this PageServerHandler and instantiate a new one if/when
322 : // they reconnect.
323 : socket.set_timeout(Some(std::time::Duration::from_millis(socket_timeout_ms)));
324 : let socket = Box::pin(socket);
325 :
326 : fail::fail_point!("ps::connection-start::pre-login");
327 :
328 : // XXX: pgbackend.run() should take the connection_ctx,
329 : // and create a child per-query context when it invokes process_query.
330 : // But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
331 : // and create the per-query context in process_query ourselves.
332 : let mut conn_handler = PageServerHandler::new(
333 : conf,
334 : tenant_manager,
335 : auth,
336 : pipelining_config,
337 : perf_span_fields,
338 : connection_ctx,
339 : cancel.clone(),
340 : gate_guard,
341 : );
342 : let pgbackend =
343 : PostgresBackend::new_from_io(socket_fd, socket, peer_addr, auth_type, tls_config)?;
344 :
345 : match pgbackend.run(&mut conn_handler, &cancel).await {
346 : Ok(()) => {
347 : // we've been requested to shut down
348 : Ok(())
349 : }
350 : Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
351 : if is_expected_io_error(&io_error) {
352 : info!("Postgres client disconnected ({io_error})");
353 : Ok(())
354 : } else {
355 : let tenant_id = conn_handler.timeline_handles.as_ref().unwrap().tenant_id();
356 : Err(io_error).context(format!(
357 : "Postgres connection error for tenant_id={:?} client at peer_addr={}",
358 : tenant_id, peer_addr
359 : ))
360 : }
361 : }
362 : other => {
363 : let tenant_id = conn_handler.timeline_handles.as_ref().unwrap().tenant_id();
364 : other.context(format!(
365 : "Postgres query error for tenant_id={:?} client peer_addr={}",
366 : tenant_id, peer_addr
367 : ))
368 : }
369 : }
370 : }
371 :
372 : struct PageServerHandler {
373 : conf: &'static PageServerConf,
374 : auth: Option<Arc<SwappableJwtAuth>>,
375 : claims: Option<Claims>,
376 :
377 : /// The context created for the lifetime of the connection
378 : /// services by this PageServerHandler.
379 : /// For each query received over the connection,
380 : /// `process_query` creates a child context from this one.
381 : connection_ctx: RequestContext,
382 :
383 : perf_span_fields: ConnectionPerfSpanFields,
384 :
385 : cancel: CancellationToken,
386 :
387 : /// None only while pagestream protocol is being processed.
388 : timeline_handles: Option<TimelineHandles>,
389 :
390 : pipelining_config: PageServicePipeliningConfig,
391 :
392 : gate_guard: GateGuard,
393 : }
394 :
395 : struct TimelineHandles {
396 : wrapper: TenantManagerWrapper,
397 : /// Note on size: the typical size of this map is 1. The largest size we expect
398 : /// to see is the number of shards divided by the number of pageservers (typically < 2),
399 : /// or the ratio used when splitting shards (i.e. how many children are created from one
400 : /// parent shard), where a "large" number might be ~8.
401 : handles: timeline::handle::Cache<TenantManagerTypes>,
402 : }
403 :
404 : impl TimelineHandles {
405 0 : fn new(tenant_manager: Arc<TenantManager>) -> Self {
406 0 : Self {
407 0 : wrapper: TenantManagerWrapper {
408 0 : tenant_manager,
409 0 : tenant_id: OnceCell::new(),
410 0 : },
411 0 : handles: Default::default(),
412 0 : }
413 0 : }
414 0 : async fn get(
415 0 : &mut self,
416 0 : tenant_id: TenantId,
417 0 : timeline_id: TimelineId,
418 0 : shard_selector: ShardSelector,
419 0 : ) -> Result<timeline::handle::Handle<TenantManagerTypes>, GetActiveTimelineError> {
420 0 : if *self.wrapper.tenant_id.get_or_init(|| tenant_id) != tenant_id {
421 0 : return Err(GetActiveTimelineError::Tenant(
422 0 : GetActiveTenantError::SwitchedTenant,
423 0 : ));
424 0 : }
425 0 : self.handles
426 0 : .get(timeline_id, shard_selector, &self.wrapper)
427 0 : .await
428 0 : .map_err(|e| match e {
429 0 : timeline::handle::GetError::TenantManager(e) => e,
430 : timeline::handle::GetError::PerTimelineStateShutDown => {
431 0 : trace!("per-timeline state shut down");
432 0 : GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown)
433 : }
434 0 : })
435 0 : }
436 :
437 0 : fn tenant_id(&self) -> Option<TenantId> {
438 0 : self.wrapper.tenant_id.get().copied()
439 0 : }
440 : }
441 :
442 : pub(crate) struct TenantManagerWrapper {
443 : tenant_manager: Arc<TenantManager>,
444 : // We do not support switching tenant_id on a connection at this point.
445 : // We can add support for this later if needed without changing
446 : // the protocol.
447 : tenant_id: once_cell::sync::OnceCell<TenantId>,
448 : }
449 :
450 : #[derive(Debug)]
451 : pub(crate) struct TenantManagerTypes;
452 :
453 : impl timeline::handle::Types for TenantManagerTypes {
454 : type TenantManagerError = GetActiveTimelineError;
455 : type TenantManager = TenantManagerWrapper;
456 : type Timeline = TenantManagerCacheItem;
457 : }
458 :
459 : pub(crate) struct TenantManagerCacheItem {
460 : pub(crate) timeline: Arc<Timeline>,
461 : // allow() for cheap propagation through RequestContext inside a task
462 : #[allow(clippy::redundant_allocation)]
463 : pub(crate) metrics: Arc<Arc<TimelineMetrics>>,
464 : #[allow(dead_code)] // we store it to keep the gate open
465 : pub(crate) gate_guard: GateGuard,
466 : }
467 :
468 : impl std::ops::Deref for TenantManagerCacheItem {
469 : type Target = Arc<Timeline>;
470 0 : fn deref(&self) -> &Self::Target {
471 0 : &self.timeline
472 0 : }
473 : }
474 :
475 : impl timeline::handle::Timeline<TenantManagerTypes> for TenantManagerCacheItem {
476 0 : fn shard_timeline_id(&self) -> timeline::handle::ShardTimelineId {
477 0 : Timeline::shard_timeline_id(&self.timeline)
478 0 : }
479 :
480 0 : fn per_timeline_state(&self) -> &timeline::handle::PerTimelineState<TenantManagerTypes> {
481 0 : &self.timeline.handles
482 0 : }
483 :
484 0 : fn get_shard_identity(&self) -> &pageserver_api::shard::ShardIdentity {
485 0 : Timeline::get_shard_identity(&self.timeline)
486 0 : }
487 : }
488 :
489 : impl timeline::handle::TenantManager<TenantManagerTypes> for TenantManagerWrapper {
490 0 : async fn resolve(
491 0 : &self,
492 0 : timeline_id: TimelineId,
493 0 : shard_selector: ShardSelector,
494 0 : ) -> Result<TenantManagerCacheItem, GetActiveTimelineError> {
495 0 : let tenant_id = self.tenant_id.get().expect("we set this in get()");
496 0 : let timeout = ACTIVE_TENANT_TIMEOUT;
497 0 : let wait_start = Instant::now();
498 0 : let deadline = wait_start + timeout;
499 0 : let tenant_shard = loop {
500 0 : let resolved = self
501 0 : .tenant_manager
502 0 : .resolve_attached_shard(tenant_id, shard_selector);
503 0 : match resolved {
504 0 : ShardResolveResult::Found(tenant_shard) => break tenant_shard,
505 : ShardResolveResult::NotFound => {
506 0 : return Err(GetActiveTimelineError::Tenant(
507 0 : GetActiveTenantError::NotFound(GetTenantError::NotFound(*tenant_id)),
508 0 : ));
509 : }
510 0 : ShardResolveResult::InProgress(barrier) => {
511 0 : // We can't authoritatively answer right now: wait for InProgress state
512 0 : // to end, then try again
513 0 : tokio::select! {
514 0 : _ = barrier.wait() => {
515 0 : // The barrier completed: proceed around the loop to try looking up again
516 0 : },
517 0 : _ = tokio::time::sleep(deadline.duration_since(Instant::now())) => {
518 0 : return Err(GetActiveTimelineError::Tenant(GetActiveTenantError::WaitForActiveTimeout {
519 0 : latest_state: None,
520 0 : wait_time: timeout,
521 0 : }));
522 : }
523 : }
524 : }
525 : };
526 : };
527 :
528 0 : tracing::debug!("Waiting for tenant to enter active state...");
529 0 : tenant_shard
530 0 : .wait_to_become_active(deadline.duration_since(Instant::now()))
531 0 : .await
532 0 : .map_err(GetActiveTimelineError::Tenant)?;
533 :
534 0 : let timeline = tenant_shard
535 0 : .get_timeline(timeline_id, true)
536 0 : .map_err(GetActiveTimelineError::Timeline)?;
537 :
538 0 : let gate_guard = match timeline.gate.enter() {
539 0 : Ok(guard) => guard,
540 : Err(_) => {
541 0 : return Err(GetActiveTimelineError::Timeline(
542 0 : GetTimelineError::ShuttingDown,
543 0 : ));
544 : }
545 : };
546 :
547 0 : let metrics = Arc::new(Arc::clone(&timeline.metrics));
548 0 :
549 0 : Ok(TenantManagerCacheItem {
550 0 : timeline,
551 0 : metrics,
552 0 : gate_guard,
553 0 : })
554 0 : }
555 : }
556 :
557 : #[derive(thiserror::Error, Debug)]
558 : enum PageStreamError {
559 : /// We encountered an error that should prompt the client to reconnect:
560 : /// in practice this means we drop the connection without sending a response.
561 : #[error("Reconnect required: {0}")]
562 : Reconnect(Cow<'static, str>),
563 :
564 : /// We were instructed to shutdown while processing the query
565 : #[error("Shutting down")]
566 : Shutdown,
567 :
568 : /// Something went wrong reading a page: this likely indicates a pageserver bug
569 : #[error("Read error")]
570 : Read(#[source] PageReconstructError),
571 :
572 : /// Ran out of time waiting for an LSN
573 : #[error("LSN timeout: {0}")]
574 : LsnTimeout(WaitLsnError),
575 :
576 : /// The entity required to serve the request (tenant or timeline) is not found,
577 : /// or is not found in a suitable state to serve a request.
578 : #[error("Not found: {0}")]
579 : NotFound(Cow<'static, str>),
580 :
581 : /// Request asked for something that doesn't make sense, like an invalid LSN
582 : #[error("Bad request: {0}")]
583 : BadRequest(Cow<'static, str>),
584 : }
585 :
586 : impl From<PageReconstructError> for PageStreamError {
587 0 : fn from(value: PageReconstructError) -> Self {
588 0 : match value {
589 0 : PageReconstructError::Cancelled => Self::Shutdown,
590 0 : e => Self::Read(e),
591 : }
592 0 : }
593 : }
594 :
595 : impl From<GetActiveTimelineError> for PageStreamError {
596 0 : fn from(value: GetActiveTimelineError) -> Self {
597 0 : match value {
598 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled)
599 : | GetActiveTimelineError::Tenant(GetActiveTenantError::WillNotBecomeActive(
600 : TenantState::Stopping { .. },
601 : ))
602 0 : | GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown) => Self::Shutdown,
603 0 : GetActiveTimelineError::Tenant(e) => Self::NotFound(format!("{e}").into()),
604 0 : GetActiveTimelineError::Timeline(e) => Self::NotFound(format!("{e}").into()),
605 : }
606 0 : }
607 : }
608 :
609 : impl From<WaitLsnError> for PageStreamError {
610 0 : fn from(value: WaitLsnError) -> Self {
611 0 : match value {
612 0 : e @ WaitLsnError::Timeout(_) => Self::LsnTimeout(e),
613 0 : WaitLsnError::Shutdown => Self::Shutdown,
614 0 : e @ WaitLsnError::BadState { .. } => Self::Reconnect(format!("{e}").into()),
615 : }
616 0 : }
617 : }
618 :
619 : impl From<WaitLsnError> for QueryError {
620 0 : fn from(value: WaitLsnError) -> Self {
621 0 : match value {
622 0 : e @ WaitLsnError::Timeout(_) => Self::Other(anyhow::Error::new(e)),
623 0 : WaitLsnError::Shutdown => Self::Shutdown,
624 0 : WaitLsnError::BadState { .. } => Self::Reconnect,
625 : }
626 0 : }
627 : }
628 :
629 : #[derive(thiserror::Error, Debug)]
630 : struct BatchedPageStreamError {
631 : req: PagestreamRequest,
632 : err: PageStreamError,
633 : }
634 :
635 : impl std::fmt::Display for BatchedPageStreamError {
636 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
637 0 : self.err.fmt(f)
638 0 : }
639 : }
640 :
641 : struct BatchedGetPageRequest {
642 : req: PagestreamGetPageRequest,
643 : timer: SmgrOpTimer,
644 : ctx: RequestContext,
645 : }
646 :
647 : #[cfg(feature = "testing")]
648 : struct BatchedTestRequest {
649 : req: models::PagestreamTestRequest,
650 : timer: SmgrOpTimer,
651 : }
652 :
653 : /// NB: we only hold [`timeline::handle::WeakHandle`] inside this enum,
654 : /// so that we don't keep the [`Timeline::gate`] open while the batch
655 : /// is being built up inside the [`spsc_fold`] (pagestream pipelining).
656 : #[derive(IntoStaticStr)]
657 : enum BatchedFeMessage {
658 : Exists {
659 : span: Span,
660 : timer: SmgrOpTimer,
661 : shard: timeline::handle::WeakHandle<TenantManagerTypes>,
662 : req: models::PagestreamExistsRequest,
663 : },
664 : Nblocks {
665 : span: Span,
666 : timer: SmgrOpTimer,
667 : shard: timeline::handle::WeakHandle<TenantManagerTypes>,
668 : req: models::PagestreamNblocksRequest,
669 : },
670 : GetPage {
671 : span: Span,
672 : shard: timeline::handle::WeakHandle<TenantManagerTypes>,
673 : effective_request_lsn: Lsn,
674 : pages: smallvec::SmallVec<[BatchedGetPageRequest; 1]>,
675 : },
676 : DbSize {
677 : span: Span,
678 : timer: SmgrOpTimer,
679 : shard: timeline::handle::WeakHandle<TenantManagerTypes>,
680 : req: models::PagestreamDbSizeRequest,
681 : },
682 : GetSlruSegment {
683 : span: Span,
684 : timer: SmgrOpTimer,
685 : shard: timeline::handle::WeakHandle<TenantManagerTypes>,
686 : req: models::PagestreamGetSlruSegmentRequest,
687 : },
688 : #[cfg(feature = "testing")]
689 : Test {
690 : span: Span,
691 : shard: timeline::handle::WeakHandle<TenantManagerTypes>,
692 : requests: Vec<BatchedTestRequest>,
693 : },
694 : RespondError {
695 : span: Span,
696 : error: BatchedPageStreamError,
697 : },
698 : }
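//
// A minimal sketch of the execution-side pattern this enables (call shapes are
// taken from `pagestream_dispatch_batched_message` below; the surrounding
// function and variable names are illustrative only): the batch stores a
// `WeakHandle`, and the executor re-acquires the Timeline gate only when the
// batch actually runs, so a slowly accumulating batch never pins the gate open.
//
//     async fn execute_exists(
//         handler: &mut PageServerHandler,
//         msg: BatchedFeMessage,
//         ctx: &RequestContext,
//     ) -> Result<(), QueryError> {
//         if let BatchedFeMessage::Exists { shard, req, .. } = msg {
//             // upgrade() fails fast if the timeline shut down while the batch waited
//             let handle = shard.upgrade()?;
//             let _resp = handler
//                 .handle_get_rel_exists_request(&handle, &req, ctx)
//                 .await;
//         }
//         Ok(())
//     }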
699 :
700 : impl BatchedFeMessage {
701 0 : fn as_static_str(&self) -> &'static str {
702 0 : self.into()
703 0 : }
704 :
705 0 : fn observe_execution_start(&mut self, at: Instant) {
706 0 : match self {
707 0 : BatchedFeMessage::Exists { timer, .. }
708 0 : | BatchedFeMessage::Nblocks { timer, .. }
709 0 : | BatchedFeMessage::DbSize { timer, .. }
710 0 : | BatchedFeMessage::GetSlruSegment { timer, .. } => {
711 0 : timer.observe_execution_start(at);
712 0 : }
713 0 : BatchedFeMessage::GetPage { pages, .. } => {
714 0 : for page in pages {
715 0 : page.timer.observe_execution_start(at);
716 0 : }
717 : }
718 : #[cfg(feature = "testing")]
719 0 : BatchedFeMessage::Test { requests, .. } => {
720 0 : for req in requests {
721 0 : req.timer.observe_execution_start(at);
722 0 : }
723 : }
724 0 : BatchedFeMessage::RespondError { .. } => {}
725 : }
726 0 : }
727 : }
728 :
729 : impl PageServerHandler {
730 : #[allow(clippy::too_many_arguments)]
731 0 : pub fn new(
732 0 : conf: &'static PageServerConf,
733 0 : tenant_manager: Arc<TenantManager>,
734 0 : auth: Option<Arc<SwappableJwtAuth>>,
735 0 : pipelining_config: PageServicePipeliningConfig,
736 0 : perf_span_fields: ConnectionPerfSpanFields,
737 0 : connection_ctx: RequestContext,
738 0 : cancel: CancellationToken,
739 0 : gate_guard: GateGuard,
740 0 : ) -> Self {
741 0 : PageServerHandler {
742 0 : conf,
743 0 : auth,
744 0 : claims: None,
745 0 : connection_ctx,
746 0 : perf_span_fields,
747 0 : timeline_handles: Some(TimelineHandles::new(tenant_manager)),
748 0 : cancel,
749 0 : pipelining_config,
750 0 : gate_guard,
751 0 : }
752 0 : }
753 :
754 : /// This function always respects cancellation of any timeline in [`Self::shard_timelines`]. Pass in
755 : /// a cancellation token at the next scope up (such as a tenant cancellation token) to ensure we respect
756 : /// cancellation if there aren't any timelines in the cache.
757 : ///
758 : /// If calling from a function that doesn't use the [`Self::shard_timelines`] cache, then pass in the
759 : /// timeline cancellation token.
760 0 : async fn flush_cancellable<IO>(
761 0 : &self,
762 0 : pgb: &mut PostgresBackend<IO>,
763 0 : cancel: &CancellationToken,
764 0 : ) -> Result<(), QueryError>
765 0 : where
766 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
767 0 : {
768 0 : tokio::select!(
769 0 : flush_r = pgb.flush() => {
770 0 : Ok(flush_r?)
771 : },
772 0 : _ = cancel.cancelled() => {
773 0 : Err(QueryError::Shutdown)
774 : }
775 : )
776 0 : }
777 :
778 : #[allow(clippy::too_many_arguments)]
779 0 : async fn pagestream_read_message<IO>(
780 0 : pgb: &mut PostgresBackendReader<IO>,
781 0 : tenant_id: TenantId,
782 0 : timeline_id: TimelineId,
783 0 : timeline_handles: &mut TimelineHandles,
784 0 : conn_perf_span_fields: &ConnectionPerfSpanFields,
785 0 : cancel: &CancellationToken,
786 0 : ctx: &RequestContext,
787 0 : protocol_version: PagestreamProtocolVersion,
788 0 : parent_span: Span,
789 0 : ) -> Result<Option<BatchedFeMessage>, QueryError>
790 0 : where
791 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
792 0 : {
793 0 : let msg = tokio::select! {
794 : biased;
795 0 : _ = cancel.cancelled() => {
796 0 : return Err(QueryError::Shutdown)
797 : }
798 0 : msg = pgb.read_message() => { msg }
799 0 : };
800 0 :
801 0 : let received_at = Instant::now();
802 :
803 0 : let copy_data_bytes = match msg? {
804 0 : Some(FeMessage::CopyData(bytes)) => bytes,
805 : Some(FeMessage::Terminate) => {
806 0 : return Ok(None);
807 : }
808 0 : Some(m) => {
809 0 : return Err(QueryError::Other(anyhow::anyhow!(
810 0 : "unexpected message: {m:?} during COPY"
811 0 : )));
812 : }
813 : None => {
814 0 : return Ok(None);
815 : } // client disconnected
816 : };
817 0 : trace!("query: {copy_data_bytes:?}");
818 :
819 0 : fail::fail_point!("ps::handle-pagerequest-message");
820 :
821 : // parse request
822 0 : let neon_fe_msg =
823 0 : PagestreamFeMessage::parse(&mut copy_data_bytes.reader(), protocol_version)?;
824 :
825 : // TODO: turn in to async closure once available to avoid repeating received_at
826 0 : async fn record_op_start_and_throttle(
827 0 : shard: &timeline::handle::Handle<TenantManagerTypes>,
828 0 : op: metrics::SmgrQueryType,
829 0 : received_at: Instant,
830 0 : ) -> Result<SmgrOpTimer, QueryError> {
831 0 : // It's important to start the smgr op metric recorder as early as possible
832 0 : // so that the _started counters are incremented before we do
833 0 : // any serious waiting, e.g., for throttle, batching, or actual request handling.
834 0 : let mut timer = shard.query_metrics.start_smgr_op(op, received_at);
835 0 : let now = Instant::now();
836 0 : timer.observe_throttle_start(now);
837 0 : let throttled = tokio::select! {
838 0 : res = shard.pagestream_throttle.throttle(1, now) => res,
839 0 : _ = shard.cancel.cancelled() => return Err(QueryError::Shutdown),
840 : };
841 0 : timer.observe_throttle_done(throttled);
842 0 : Ok(timer)
843 0 : }
844 :
845 0 : let batched_msg = match neon_fe_msg {
846 0 : PagestreamFeMessage::Exists(req) => {
847 0 : let shard = timeline_handles
848 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
849 0 : .await?;
850 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
851 0 : let span = tracing::info_span!(parent: &parent_span, "handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
852 0 : let timer = record_op_start_and_throttle(
853 0 : &shard,
854 0 : metrics::SmgrQueryType::GetRelExists,
855 0 : received_at,
856 0 : )
857 0 : .await?;
858 0 : BatchedFeMessage::Exists {
859 0 : span,
860 0 : timer,
861 0 : shard: shard.downgrade(),
862 0 : req,
863 0 : }
864 : }
865 0 : PagestreamFeMessage::Nblocks(req) => {
866 0 : let shard = timeline_handles
867 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
868 0 : .await?;
869 0 : let span = tracing::info_span!(parent: &parent_span, "handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
870 0 : let timer = record_op_start_and_throttle(
871 0 : &shard,
872 0 : metrics::SmgrQueryType::GetRelSize,
873 0 : received_at,
874 0 : )
875 0 : .await?;
876 0 : BatchedFeMessage::Nblocks {
877 0 : span,
878 0 : timer,
879 0 : shard: shard.downgrade(),
880 0 : req,
881 0 : }
882 : }
883 0 : PagestreamFeMessage::DbSize(req) => {
884 0 : let shard = timeline_handles
885 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
886 0 : .await?;
887 0 : let span = tracing::info_span!(parent: &parent_span, "handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
888 0 : let timer = record_op_start_and_throttle(
889 0 : &shard,
890 0 : metrics::SmgrQueryType::GetDbSize,
891 0 : received_at,
892 0 : )
893 0 : .await?;
894 0 : BatchedFeMessage::DbSize {
895 0 : span,
896 0 : timer,
897 0 : shard: shard.downgrade(),
898 0 : req,
899 0 : }
900 : }
901 0 : PagestreamFeMessage::GetSlruSegment(req) => {
902 0 : let shard = timeline_handles
903 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
904 0 : .await?;
905 0 : let span = tracing::info_span!(parent: &parent_span, "handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
906 0 : let timer = record_op_start_and_throttle(
907 0 : &shard,
908 0 : metrics::SmgrQueryType::GetSlruSegment,
909 0 : received_at,
910 0 : )
911 0 : .await?;
912 0 : BatchedFeMessage::GetSlruSegment {
913 0 : span,
914 0 : timer,
915 0 : shard: shard.downgrade(),
916 0 : req,
917 0 : }
918 : }
919 0 : PagestreamFeMessage::GetPage(req) => {
920 : // avoid a somewhat costly Span::record() by constructing the entire span in one go.
921 : macro_rules! mkspan {
922 : (before shard routing) => {{
923 : tracing::info_span!(parent: &parent_span, "handle_get_page_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.hdr.request_lsn)
924 : }};
925 : ($shard_id:expr) => {{
926 : tracing::info_span!(parent: &parent_span, "handle_get_page_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.hdr.request_lsn, shard_id = %$shard_id)
927 : }};
928 : }
929 :
930 : macro_rules! respond_error {
931 : ($span:expr, $error:expr) => {{
932 : let error = BatchedFeMessage::RespondError {
933 : span: $span,
934 : error: BatchedPageStreamError {
935 : req: req.hdr,
936 : err: $error,
937 : },
938 : };
939 : Ok(Some(error))
940 : }};
941 : }
942 :
943 0 : let key = rel_block_to_key(req.rel, req.blkno);
944 :
945 0 : let res = timeline_handles
946 0 : .get(tenant_id, timeline_id, ShardSelector::Page(key))
947 0 : .await;
948 :
949 0 : let shard = match res {
950 0 : Ok(tl) => tl,
951 0 : Err(e) => {
952 0 : let span = mkspan!(before shard routing);
953 0 : match e {
954 : GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_)) => {
955 : // We already know this tenant exists in general, because we resolved it at
956 : // start of connection. Getting a NotFound here indicates that the shard containing
957 : // the requested page is not present on this node: the client's knowledge of shard->pageserver
958 : // mapping is out of date.
959 : //
960 : // Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
961 : // client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
962 : // and talk to a different pageserver.
963 0 : return respond_error!(
964 0 : span,
965 0 : PageStreamError::Reconnect(
966 0 : "getpage@lsn request routed to wrong shard".into()
967 0 : )
968 0 : );
969 : }
970 0 : e => {
971 0 : return respond_error!(span, e.into());
972 : }
973 : }
974 : }
975 : };
976 :
977 0 : let ctx = if shard.is_get_page_request_sampled() {
978 0 : RequestContextBuilder::from(ctx)
979 0 : .root_perf_span(|| {
980 0 : info_span!(
981 : target: PERF_TRACE_TARGET,
982 : "GET_PAGE",
983 : peer_addr = conn_perf_span_fields.peer_addr,
984 : application_name = conn_perf_span_fields.application_name,
985 : compute_mode = conn_perf_span_fields.compute_mode,
986 : tenant_id = %tenant_id,
987 0 : shard_id = %shard.get_shard_identity().shard_slug(),
988 : timeline_id = %timeline_id,
989 : lsn = %req.hdr.request_lsn,
990 : request_id = %req.hdr.reqid,
991 : key = %key,
992 : )
993 0 : })
994 0 : .attached_child()
995 : } else {
996 0 : ctx.attached_child()
997 : };
998 :
999 : // This ctx travels as part of the BatchedFeMessage through
1000 : // batching into the request handler.
1001 : // The request handler needs to do some per-request work
1002 : // (relsize check) before dispatching the batch as a single
1003 : // get_vectored call to the Timeline.
1004 : // This ctx will be used for the relsize check, whereas the
1005 : // get_vectored call will be a different ctx with separate
1006 : // perf span.
1007 0 : let ctx = ctx.with_scope_page_service_pagestream(&shard);
1008 :
1009 : // Similar game for this `span`: we funnel it through so that
1010 : // request handler log messages contain the request-specific fields.
1011 0 : let span = mkspan!(shard.tenant_shard_id.shard_slug());
1012 :
1013 0 : let timer = record_op_start_and_throttle(
1014 0 : &shard,
1015 0 : metrics::SmgrQueryType::GetPageAtLsn,
1016 0 : received_at,
1017 0 : )
1018 0 : .maybe_perf_instrument(&ctx, |current_perf_span| {
1019 0 : info_span!(
1020 : target: PERF_TRACE_TARGET,
1021 0 : parent: current_perf_span,
1022 : "THROTTLE",
1023 : )
1024 0 : })
1025 0 : .await?;
1026 :
1027 : // We're holding the Handle
1028 : // TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait
1029 0 : let res = Self::wait_or_get_last_lsn(
1030 0 : &shard,
1031 0 : req.hdr.request_lsn,
1032 0 : req.hdr.not_modified_since,
1033 0 : &shard.get_applied_gc_cutoff_lsn(),
1034 0 : &ctx,
1035 0 : )
1036 0 : .maybe_perf_instrument(&ctx, |current_perf_span| {
1037 0 : info_span!(
1038 : target: PERF_TRACE_TARGET,
1039 0 : parent: current_perf_span,
1040 : "WAIT_LSN",
1041 : )
1042 0 : })
1043 0 : .await;
1044 :
1045 0 : let effective_request_lsn = match res {
1046 0 : Ok(lsn) => lsn,
1047 0 : Err(e) => {
1048 0 : return respond_error!(span, e);
1049 : }
1050 : };
1051 : BatchedFeMessage::GetPage {
1052 0 : span,
1053 0 : shard: shard.downgrade(),
1054 0 : effective_request_lsn,
1055 0 : pages: smallvec::smallvec![BatchedGetPageRequest { req, timer, ctx }],
1056 : }
1057 : }
1058 : #[cfg(feature = "testing")]
1059 0 : PagestreamFeMessage::Test(req) => {
1060 0 : let shard = timeline_handles
1061 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1062 0 : .await?;
1063 0 : let span = tracing::info_span!(parent: &parent_span, "handle_test_request", shard_id = %shard.tenant_shard_id.shard_slug());
1064 0 : let timer =
1065 0 : record_op_start_and_throttle(&shard, metrics::SmgrQueryType::Test, received_at)
1066 0 : .await?;
1067 0 : BatchedFeMessage::Test {
1068 0 : span,
1069 0 : shard: shard.downgrade(),
1070 0 : requests: vec![BatchedTestRequest { req, timer }],
1071 0 : }
1072 : }
1073 : };
1074 0 : Ok(Some(batched_msg))
1075 0 : }
1076 :
1077 : /// Post-condition: `batch` is Some()
1078 : #[instrument(skip_all, level = tracing::Level::TRACE)]
1079 : #[allow(clippy::boxed_local)]
1080 : fn pagestream_do_batch(
1081 : max_batch_size: NonZeroUsize,
1082 : batch: &mut Result<BatchedFeMessage, QueryError>,
1083 : this_msg: Result<BatchedFeMessage, QueryError>,
1084 : ) -> Result<(), Result<BatchedFeMessage, QueryError>> {
1085 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1086 :
1087 : let this_msg = match this_msg {
1088 : Ok(this_msg) => this_msg,
1089 : Err(e) => return Err(Err(e)),
1090 : };
1091 :
1092 : match (&mut *batch, this_msg) {
1093 : // something batched already, let's see if we can add this message to the batch
1094 : (
1095 : Ok(BatchedFeMessage::GetPage {
1096 : span: _,
1097 : shard: accum_shard,
1098 : pages: accum_pages,
1099 : effective_request_lsn: accum_lsn,
1100 : }),
1101 : BatchedFeMessage::GetPage {
1102 : span: _,
1103 : shard: this_shard,
1104 : pages: this_pages,
1105 : effective_request_lsn: this_lsn,
1106 : },
1107 0 : ) if (|| {
1108 0 : assert_eq!(this_pages.len(), 1);
1109 0 : if accum_pages.len() >= max_batch_size.get() {
1110 0 : trace!(%accum_lsn, %this_lsn, %max_batch_size, "stopping batching because of batch size");
1111 0 : assert_eq!(accum_pages.len(), max_batch_size.get());
1112 0 : return false;
1113 0 : }
1114 0 : if !accum_shard.is_same_handle_as(&this_shard) {
1115 0 : trace!(%accum_lsn, %this_lsn, "stopping batching because timeline object mismatch");
1116 : // TODO: we _could_ batch & execute each shard separately (and in parallel).
1117 : // But the current logic for keeping responses in order does not support that.
1118 0 : return false;
1119 0 : }
1120 0 : // the vectored get currently only supports a single LSN, so, bounce as soon
1121 0 : // as the effective request_lsn changes
1122 0 : if *accum_lsn != this_lsn {
1123 0 : trace!(%accum_lsn, %this_lsn, "stopping batching because LSN changed");
1124 0 : return false;
1125 0 : }
1126 0 : true
1127 : })() =>
1128 : {
1129 : // ok to batch
1130 : accum_pages.extend(this_pages);
1131 : Ok(())
1132 : }
1133 : #[cfg(feature = "testing")]
1134 : (
1135 : Ok(BatchedFeMessage::Test {
1136 : shard: accum_shard,
1137 : requests: accum_requests,
1138 : ..
1139 : }),
1140 : BatchedFeMessage::Test {
1141 : shard: this_shard,
1142 : requests: this_requests,
1143 : ..
1144 : },
1145 0 : ) if (|| {
1146 0 : assert!(this_requests.len() == 1);
1147 0 : if accum_requests.len() >= max_batch_size.get() {
1148 0 : trace!(%max_batch_size, "stopping batching because of batch size");
1149 0 : assert_eq!(accum_requests.len(), max_batch_size.get());
1150 0 : return false;
1151 0 : }
1152 0 : if !accum_shard.is_same_handle_as(&this_shard) {
1153 0 : trace!("stopping batching because timeline object mismatch");
1154 : // TODO: we _could_ batch & execute each shard separately (and in parallel).
1155 : // But the current logic for keeping responses in order does not support that.
1156 0 : return false;
1157 0 : }
1158 0 : let this_batch_key = this_requests[0].req.batch_key;
1159 0 : let accum_batch_key = accum_requests[0].req.batch_key;
1160 0 : if this_requests[0].req.batch_key != accum_requests[0].req.batch_key {
1161 0 : trace!(%accum_batch_key, %this_batch_key, "stopping batching because batch key changed");
1162 0 : return false;
1163 0 : }
1164 0 : true
1165 : })() =>
1166 : {
1167 : // ok to batch
1168 : accum_requests.extend(this_requests);
1169 : Ok(())
1170 : }
1171 : // something batched already but this message is unbatchable
1172 : (_, this_msg) => {
1173 : // by default, don't continue batching
1174 : Err(Ok(this_msg))
1175 : }
1176 : }
1177 : }
1178 :
1179 0 : #[instrument(level = tracing::Level::DEBUG, skip_all)]
1180 : async fn pagesteam_handle_batched_message<IO>(
1181 : &mut self,
1182 : pgb_writer: &mut PostgresBackend<IO>,
1183 : batch: BatchedFeMessage,
1184 : io_concurrency: IoConcurrency,
1185 : cancel: &CancellationToken,
1186 : protocol_version: PagestreamProtocolVersion,
1187 : ctx: &RequestContext,
1188 : ) -> Result<(), QueryError>
1189 : where
1190 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1191 : {
1192 : let started_at = Instant::now();
1193 : let batch = {
1194 : let mut batch = batch;
1195 : batch.observe_execution_start(started_at);
1196 : batch
1197 : };
1198 :
1199 : // Dispatch the batch to the appropriate request handler.
1200 : let log_slow_name = batch.as_static_str();
1201 : let (mut handler_results, span) = {
1202 : // TODO: we unfortunately have to pin the future on the heap, since GetPage futures are huge and
1203 : // won't fit on the stack.
1204 : let mut boxpinned =
1205 : Box::pin(self.pagestream_dispatch_batched_message(batch, io_concurrency, ctx));
1206 : log_slow(
1207 : log_slow_name,
1208 : LOG_SLOW_GETPAGE_THRESHOLD,
1209 : boxpinned.as_mut(),
1210 : )
1211 : .await?
1212 : };
1213 :
1214 : // We purposefully don't count flush time into the smgr operation timer.
1215 : //
1216 : // The reason is that the current compute client will not perform protocol processing
1217 : // if the postgres backend process is doing things other than `->smgr_read()`.
1218 : // This is especially the case for prefetch.
1219 : //
1220 : // If the compute doesn't read from the connection, eventually TCP will backpressure
1221 : // all the way into our flush call below.
1222 : //
1223 : // The timer's underlying metric is used for a storage-internal latency SLO and
1224 : // we don't want to include latency in it that we can't control.
1225 : // And as pointed out above, in this case, we don't control the time that flush will take.
1226 : //
1227 : // We put each response in the batch onto the wire in a separate pgb_writer.flush()
1228 : // call, which (all unmeasured) adds syscall overhead but reduces time to first byte
1229 : // and avoids building up a "giant" contiguous userspace buffer to hold the entire response.
1230 : // TODO: vectored socket IO would be great, but pgb_writer doesn't support that.
1231 : let flush_timers = {
1232 : let flushing_start_time = Instant::now();
1233 : let mut flush_timers = Vec::with_capacity(handler_results.len());
1234 : for handler_result in &mut handler_results {
1235 : let flush_timer = match handler_result {
1236 : Ok((_, timer)) => Some(
1237 : timer
1238 : .observe_execution_end(flushing_start_time)
1239 : .expect("we are the first caller"),
1240 : ),
1241 : Err(_) => {
1242 : // TODO: measure errors
1243 : None
1244 : }
1245 : };
1246 : flush_timers.push(flush_timer);
1247 : }
1248 : assert_eq!(flush_timers.len(), handler_results.len());
1249 : flush_timers
1250 : };
1251 :
1252 : // Map handler result to protocol behavior.
1253 : // Some handler errors cause exit from pagestream protocol.
1254 : // Other handler errors are sent back as an error message and we stay in pagestream protocol.
1255 : for (handler_result, flushing_timer) in handler_results.into_iter().zip(flush_timers) {
1256 : let response_msg = match handler_result {
1257 : Err(e) => match &e.err {
1258 : PageStreamError::Shutdown => {
1259 : // If we fail to fulfil a request during shutdown, which may be _because_ of
1260 : // shutdown, then do not send the error to the client. Instead just drop the
1261 : // connection.
1262 0 : span.in_scope(|| info!("dropping connection due to shutdown"));
1263 : return Err(QueryError::Shutdown);
1264 : }
1265 : PageStreamError::Reconnect(reason) => {
1266 0 : span.in_scope(|| info!("handler requested reconnect: {reason}"));
1267 : return Err(QueryError::Reconnect);
1268 : }
1269 : PageStreamError::Read(_)
1270 : | PageStreamError::LsnTimeout(_)
1271 : | PageStreamError::NotFound(_)
1272 : | PageStreamError::BadRequest(_) => {
1273 : // print the all details to the log with {:#}, but for the client the
1274 : // error message is enough. Do not log if shutting down, as the anyhow::Error
1275 : // here includes cancellation which is not an error.
1276 : let full = utils::error::report_compact_sources(&e.err);
1277 0 : span.in_scope(|| {
1278 0 : error!("error reading relation or page version: {full:#}")
1279 0 : });
1280 :
1281 : PagestreamBeMessage::Error(PagestreamErrorResponse {
1282 : req: e.req,
1283 : message: e.err.to_string(),
1284 : })
1285 : }
1286 : },
1287 : Ok((response_msg, _op_timer_already_observed)) => response_msg,
1288 : };
1289 :
1290 : //
1291 : // marshal & transmit response message
1292 : //
1293 :
1294 : pgb_writer.write_message_noflush(&BeMessage::CopyData(
1295 : &response_msg.serialize(protocol_version),
1296 : ))?;
1297 :
1298 : failpoint_support::sleep_millis_async!("before-pagestream-msg-flush", cancel);
1299 :
1300 : // what we want to do
1301 : let socket_fd = pgb_writer.socket_fd;
1302 : let flush_fut = pgb_writer.flush();
1303 : // metric for how long flushing takes
1304 : let flush_fut = match flushing_timer {
1305 : Some(flushing_timer) => futures::future::Either::Left(flushing_timer.measure(
1306 : Instant::now(),
1307 : flush_fut,
1308 : socket_fd,
1309 : )),
1310 : None => futures::future::Either::Right(flush_fut),
1311 : };
1312 : // do it while respecting cancellation
1313 0 : let _: () = async move {
1314 0 : tokio::select! {
1315 : biased;
1316 0 : _ = cancel.cancelled() => {
1317 : // We were requested to shut down.
1318 0 : info!("shutdown request received in page handler");
1319 0 : return Err(QueryError::Shutdown)
1320 : }
1321 0 : res = flush_fut => {
1322 0 : res?;
1323 : }
1324 : }
1325 0 : Ok(())
1326 0 : }
1327 : .await?;
1328 : }
1329 : Ok(())
1330 : }
1331 :
1332 : /// Helper which dispatches a batched message to the appropriate handler.
1333 : /// Returns a vec of results, along with the extracted trace span.
1334 0 : async fn pagestream_dispatch_batched_message(
1335 0 : &mut self,
1336 0 : batch: BatchedFeMessage,
1337 0 : io_concurrency: IoConcurrency,
1338 0 : ctx: &RequestContext,
1339 0 : ) -> Result<
1340 0 : (
1341 0 : Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>>,
1342 0 : Span,
1343 0 : ),
1344 0 : QueryError,
1345 0 : > {
1346 : macro_rules! upgrade_handle_and_set_context {
1347 : ($shard:ident) => {{
1348 : let weak_handle = &$shard;
1349 : let handle = weak_handle.upgrade()?;
1350 : let ctx = ctx.with_scope_page_service_pagestream(&handle);
1351 : (handle, ctx)
1352 : }};
1353 : }
1354 0 : Ok(match batch {
1355 : BatchedFeMessage::Exists {
1356 0 : span,
1357 0 : timer,
1358 0 : shard,
1359 0 : req,
1360 0 : } => {
1361 0 : fail::fail_point!("ps::handle-pagerequest-message::exists");
1362 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1363 : (
1364 0 : vec![
1365 0 : self.handle_get_rel_exists_request(&shard, &req, &ctx)
1366 0 : .instrument(span.clone())
1367 0 : .await
1368 0 : .map(|msg| (msg, timer))
1369 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
1370 0 : ],
1371 0 : span,
1372 : )
1373 : }
1374 : BatchedFeMessage::Nblocks {
1375 0 : span,
1376 0 : timer,
1377 0 : shard,
1378 0 : req,
1379 0 : } => {
1380 0 : fail::fail_point!("ps::handle-pagerequest-message::nblocks");
1381 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1382 : (
1383 0 : vec![
1384 0 : self.handle_get_nblocks_request(&shard, &req, &ctx)
1385 0 : .instrument(span.clone())
1386 0 : .await
1387 0 : .map(|msg| (msg, timer))
1388 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
1389 0 : ],
1390 0 : span,
1391 : )
1392 : }
1393 : BatchedFeMessage::GetPage {
1394 0 : span,
1395 0 : shard,
1396 0 : effective_request_lsn,
1397 0 : pages,
1398 0 : } => {
1399 0 : fail::fail_point!("ps::handle-pagerequest-message::getpage");
1400 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1401 : (
1402 : {
1403 0 : let npages = pages.len();
1404 0 : trace!(npages, "handling getpage request");
1405 0 : let res = self
1406 0 : .handle_get_page_at_lsn_request_batched(
1407 0 : &shard,
1408 0 : effective_request_lsn,
1409 0 : pages,
1410 0 : io_concurrency,
1411 0 : &ctx,
1412 0 : )
1413 0 : .instrument(span.clone())
1414 0 : .await;
1415 0 : assert_eq!(res.len(), npages);
1416 0 : res
1417 0 : },
1418 0 : span,
1419 : )
1420 : }
1421 : BatchedFeMessage::DbSize {
1422 0 : span,
1423 0 : timer,
1424 0 : shard,
1425 0 : req,
1426 0 : } => {
1427 0 : fail::fail_point!("ps::handle-pagerequest-message::dbsize");
1428 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1429 : (
1430 0 : vec![
1431 0 : self.handle_db_size_request(&shard, &req, &ctx)
1432 0 : .instrument(span.clone())
1433 0 : .await
1434 0 : .map(|msg| (msg, timer))
1435 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
1436 0 : ],
1437 0 : span,
1438 : )
1439 : }
1440 : BatchedFeMessage::GetSlruSegment {
1441 0 : span,
1442 0 : timer,
1443 0 : shard,
1444 0 : req,
1445 0 : } => {
1446 0 : fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
1447 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1448 : (
1449 0 : vec![
1450 0 : self.handle_get_slru_segment_request(&shard, &req, &ctx)
1451 0 : .instrument(span.clone())
1452 0 : .await
1453 0 : .map(|msg| (msg, timer))
1454 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
1455 0 : ],
1456 0 : span,
1457 : )
1458 : }
1459 : #[cfg(feature = "testing")]
1460 : BatchedFeMessage::Test {
1461 0 : span,
1462 0 : shard,
1463 0 : requests,
1464 0 : } => {
1465 0 : fail::fail_point!("ps::handle-pagerequest-message::test");
1466 0 : let (shard, ctx) = upgrade_handle_and_set_context!(shard);
1467 : (
1468 : {
1469 0 : let npages = requests.len();
1470 0 : trace!(npages, "handling getpage request");
1471 0 : let res = self
1472 0 : .handle_test_request_batch(&shard, requests, &ctx)
1473 0 : .instrument(span.clone())
1474 0 : .await;
1475 0 : assert_eq!(res.len(), npages);
1476 0 : res
1477 0 : },
1478 0 : span,
1479 : )
1480 : }
1481 0 : BatchedFeMessage::RespondError { span, error } => {
1482 0 : // We've already decided to respond with an error, so we don't need to
1483 0 : // call the handler.
1484 0 : (vec![Err(error)], span)
1485 : }
1486 : })
1487 0 : }
1488 :
1489 : /// Pagestream sub-protocol handler.
1490 : ///
1491 : /// It is a simple request-response protocol inside a COPYBOTH session.
1492 : ///
1493 : /// # Coding Discipline
1494 : ///
1495 : /// Coding discipline within this function: all interaction with the `pgb` connection
1496 : /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
1497 : /// This is so that we can shutdown page_service quickly.
1498 : #[instrument(skip_all)]
1499 : async fn handle_pagerequests<IO>(
1500 : &mut self,
1501 : pgb: &mut PostgresBackend<IO>,
1502 : tenant_id: TenantId,
1503 : timeline_id: TimelineId,
1504 : protocol_version: PagestreamProtocolVersion,
1505 : ctx: RequestContext,
1506 : ) -> Result<(), QueryError>
1507 : where
1508 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
1509 : {
1510 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1511 :
1512 : // switch client to COPYBOTH
1513 : pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
1514 : tokio::select! {
1515 : biased;
1516 : _ = self.cancel.cancelled() => {
1517 : return Err(QueryError::Shutdown)
1518 : }
1519 : res = pgb.flush() => {
1520 : res?;
1521 : }
1522 : }
1523 :
1524 : let io_concurrency = IoConcurrency::spawn_from_conf(
1525 : self.conf,
1526 : match self.gate_guard.try_clone() {
1527 : Ok(guard) => guard,
1528 : Err(_) => {
1529 : info!("shutdown request received in page handler");
1530 : return Err(QueryError::Shutdown);
1531 : }
1532 : },
1533 : );
1534 :
1535 : let pgb_reader = pgb
1536 : .split()
1537 : .context("implementation error: split pgb into reader and writer")?;
1538 :
1539 : let timeline_handles = self
1540 : .timeline_handles
1541 : .take()
1542 : .expect("implementation error: timeline_handles should not be locked");
1543 :
1544 : let request_span = info_span!("request");
1545 : let ((pgb_reader, timeline_handles), result) = match self.pipelining_config.clone() {
1546 : PageServicePipeliningConfig::Pipelined(pipelining_config) => {
1547 : self.handle_pagerequests_pipelined(
1548 : pgb,
1549 : pgb_reader,
1550 : tenant_id,
1551 : timeline_id,
1552 : timeline_handles,
1553 : request_span,
1554 : pipelining_config,
1555 : protocol_version,
1556 : io_concurrency,
1557 : &ctx,
1558 : )
1559 : .await
1560 : }
1561 : PageServicePipeliningConfig::Serial => {
1562 : self.handle_pagerequests_serial(
1563 : pgb,
1564 : pgb_reader,
1565 : tenant_id,
1566 : timeline_id,
1567 : timeline_handles,
1568 : request_span,
1569 : protocol_version,
1570 : io_concurrency,
1571 : &ctx,
1572 : )
1573 : .await
1574 : }
1575 : };
1576 :
1577 : debug!("pagestream subprotocol shut down cleanly");
1578 :
1579 : pgb.unsplit(pgb_reader)
1580 : .context("implementation error: unsplit pgb")?;
1581 :
1582 : let replaced = self.timeline_handles.replace(timeline_handles);
1583 : assert!(replaced.is_none());
1584 :
1585 : result
1586 : }
1587 :
1588 : #[allow(clippy::too_many_arguments)]
1589 0 : async fn handle_pagerequests_serial<IO>(
1590 0 : &mut self,
1591 0 : pgb_writer: &mut PostgresBackend<IO>,
1592 0 : mut pgb_reader: PostgresBackendReader<IO>,
1593 0 : tenant_id: TenantId,
1594 0 : timeline_id: TimelineId,
1595 0 : mut timeline_handles: TimelineHandles,
1596 0 : request_span: Span,
1597 0 : protocol_version: PagestreamProtocolVersion,
1598 0 : io_concurrency: IoConcurrency,
1599 0 : ctx: &RequestContext,
1600 0 : ) -> (
1601 0 : (PostgresBackendReader<IO>, TimelineHandles),
1602 0 : Result<(), QueryError>,
1603 0 : )
1604 0 : where
1605 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
1606 0 : {
1607 0 : let cancel = self.cancel.clone();
1608 :
1609 0 : let err = loop {
1610 0 : let msg = Self::pagestream_read_message(
1611 0 : &mut pgb_reader,
1612 0 : tenant_id,
1613 0 : timeline_id,
1614 0 : &mut timeline_handles,
1615 0 : &self.perf_span_fields,
1616 0 : &cancel,
1617 0 : ctx,
1618 0 : protocol_version,
1619 0 : request_span.clone(),
1620 0 : )
1621 0 : .await;
1622 0 : let msg = match msg {
1623 0 : Ok(msg) => msg,
1624 0 : Err(e) => break e,
1625 : };
1626 0 : let msg = match msg {
1627 0 : Some(msg) => msg,
1628 : None => {
1629 0 : debug!("pagestream subprotocol end observed");
1630 0 : return ((pgb_reader, timeline_handles), Ok(()));
1631 : }
1632 : };
1633 :
1634 0 : let result = self
1635 0 : .pagesteam_handle_batched_message(
1636 0 : pgb_writer,
1637 0 : msg,
1638 0 : io_concurrency.clone(),
1639 0 : &cancel,
1640 0 : protocol_version,
1641 0 : ctx,
1642 0 : )
1643 0 : .await;
1644 0 : match result {
1645 0 : Ok(()) => {}
1646 0 : Err(e) => break e,
1647 : }
1648 : };
1649 0 : ((pgb_reader, timeline_handles), Err(err))
1650 0 : }
1651 :
1652 : /// # Cancel-Safety
1653 : ///
1654 : /// May leak tokio tasks if not polled to completion.
1655 : #[allow(clippy::too_many_arguments)]
1656 0 : async fn handle_pagerequests_pipelined<IO>(
1657 0 : &mut self,
1658 0 : pgb_writer: &mut PostgresBackend<IO>,
1659 0 : pgb_reader: PostgresBackendReader<IO>,
1660 0 : tenant_id: TenantId,
1661 0 : timeline_id: TimelineId,
1662 0 : mut timeline_handles: TimelineHandles,
1663 0 : request_span: Span,
1664 0 : pipelining_config: PageServicePipeliningConfigPipelined,
1665 0 : protocol_version: PagestreamProtocolVersion,
1666 0 : io_concurrency: IoConcurrency,
1667 0 : ctx: &RequestContext,
1668 0 : ) -> (
1669 0 : (PostgresBackendReader<IO>, TimelineHandles),
1670 0 : Result<(), QueryError>,
1671 0 : )
1672 0 : where
1673 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
1674 0 : {
1675 0 : //
1676 0 : // Pipelined pagestream handling consists of
1677 0 : // - a Batcher that reads requests off the wire and
1678 0 : // batches them if possible,
1679 0 : // - an Executor that processes the batched requests.
1680 0 : //
1681 0 : // The batch is built up inside an `spsc_fold` channel,
1682 0 : // shared betwen Batcher (Sender) and Executor (Receiver).
1683 0 : //
1684 0 : // The Batcher continously folds client requests into the batch,
1685 0 : // while the Executor can at any time take out what's in the batch
1686 0 : // in order to process it.
1687 0 : // This means the next batch builds up while the Executor
1688 0 : // executes the last batch.
1689 0 : //
1690 0 : // CANCELLATION
1691 0 : //
1692 0 : // We run both Batcher and Executor futures to completion before
1693 0 : // returning from this function.
1694 0 : //
1695 0 : // If Executor exits first, it signals cancellation to the Batcher
1696 0 : // via a CancellationToken that is child of `self.cancel`.
1697 0 : // If Batcher exits first, it signals cancellation to the Executor
1698 0 : // by dropping the spsc_fold channel Sender.
1699 0 : //
1700 0 : // CLEAN SHUTDOWN
1701 0 : //
1702 0 : // Clean shutdown means that the client ends the COPYBOTH session.
1703 0 : // In response to such a client message, the Batcher exits.
1704 0 : // The Executor continues to run, draining the spsc_fold channel.
1705 0 : // Once drained, the spsc_fold recv will fail with a distinct error
1706 0 : // indicating that the sender disconnected.
1707 0 : // The Executor exits with Ok(()) in response to that error.
1708 0 : //
1709 0 : // Server-initiated shutdown is not a clean shutdown; instead it
1710 0 : // surfaces as an error Err(QueryError::Shutdown) that is propagated
1711 0 : // through the normal error path.
1712 0 : //
1713 0 : // ERROR PROPAGATION
1714 0 : //
1715 0 : // When the Batcher encounters an error, it sends it as a value
1716 0 : // through the spsc_fold channel and exits afterwards.
1717 0 : // When the Executor observes such an error in the channel,
1718 0 : // it exits returning that error value.
1719 0 : //
1720 0 : // This design ensures that the Executor stage will still process
1721 0 : // the batch that was in flight when the Batcher encountered an error,
1722 0 : // thereby behaving identically to a serial implementation.
1723 0 :
1724 0 : let PageServicePipeliningConfigPipelined {
1725 0 : max_batch_size,
1726 0 : execution,
1727 0 : } = pipelining_config;
1728 :
1729 : // Macro to _define_ a pipeline stage.
1730 : macro_rules! pipeline_stage {
1731 : ($name:literal, $cancel:expr, $make_fut:expr) => {{
1732 : let cancel: CancellationToken = $cancel;
1733 : let stage_fut = $make_fut(cancel.clone());
1734 0 : async move {
1735 0 : scopeguard::defer! {
1736 0 : debug!("exiting");
1737 0 : }
1738 0 : timed_after_cancellation(stage_fut, $name, Duration::from_millis(100), &cancel)
1739 0 : .await
1740 0 : }
1741 : .instrument(tracing::info_span!($name))
1742 : }};
1743 : }
1744 :
1745 : //
1746 : // Batcher
1747 : //
1748 :
1749 0 : let perf_span_fields = self.perf_span_fields.clone();
1750 0 :
1751 0 : let cancel_batcher = self.cancel.child_token();
1752 0 : let (mut batch_tx, mut batch_rx) = spsc_fold::channel();
1753 0 : let batcher = pipeline_stage!("batcher", cancel_batcher.clone(), move |cancel_batcher| {
1754 0 : let ctx = ctx.attached_child();
1755 0 : async move {
1756 0 : let mut pgb_reader = pgb_reader;
1757 0 : let mut exit = false;
1758 0 : while !exit {
1759 0 : let read_res = Self::pagestream_read_message(
1760 0 : &mut pgb_reader,
1761 0 : tenant_id,
1762 0 : timeline_id,
1763 0 : &mut timeline_handles,
1764 0 : &perf_span_fields,
1765 0 : &cancel_batcher,
1766 0 : &ctx,
1767 0 : protocol_version,
1768 0 : request_span.clone(),
1769 0 : )
1770 0 : .await;
1771 0 : let Some(read_res) = read_res.transpose() else {
1772 0 : debug!("client-initiated shutdown");
1773 0 : break;
1774 : };
1775 0 : exit |= read_res.is_err();
1776 0 : let could_send = batch_tx
1777 0 : .send(read_res, |batch, res| {
1778 0 : Self::pagestream_do_batch(max_batch_size, batch, res)
1779 0 : })
1780 0 : .await;
1781 0 : exit |= could_send.is_err();
1782 : }
1783 0 : (pgb_reader, timeline_handles)
1784 0 : }
1785 0 : });
1786 :
1787 : //
1788 : // Executor
1789 : //
1790 :
1791 0 : let executor = pipeline_stage!("executor", self.cancel.clone(), move |cancel| {
1792 0 : let ctx = ctx.attached_child();
1793 0 : async move {
1794 0 : let _cancel_batcher = cancel_batcher.drop_guard();
1795 : loop {
1796 0 : let maybe_batch = batch_rx.recv().await;
1797 0 : let batch = match maybe_batch {
1798 0 : Ok(batch) => batch,
1799 : Err(spsc_fold::RecvError::SenderGone) => {
1800 0 : debug!("upstream gone");
1801 0 : return Ok(());
1802 : }
1803 : };
1804 0 : let batch = match batch {
1805 0 : Ok(batch) => batch,
1806 0 : Err(e) => {
1807 0 : return Err(e);
1808 : }
1809 : };
1810 0 : self.pagesteam_handle_batched_message(
1811 0 : pgb_writer,
1812 0 : batch,
1813 0 : io_concurrency.clone(),
1814 0 : &cancel,
1815 0 : protocol_version,
1816 0 : &ctx,
1817 0 : )
1818 0 : .await?;
1819 : }
1820 0 : }
1821 0 : });
1822 :
1823 : //
1824 : // Execute the stages.
1825 : //
1826 :
1827 0 : match execution {
1828 : PageServiceProtocolPipelinedExecutionStrategy::ConcurrentFutures => {
1829 0 : tokio::join!(batcher, executor)
1830 : }
1831 : PageServiceProtocolPipelinedExecutionStrategy::Tasks => {
1832 : // These tasks are not tracked anywhere.
1833 0 : let read_messages_task = tokio::spawn(batcher);
1834 0 : let (read_messages_task_res, executor_res_) =
1835 0 : tokio::join!(read_messages_task, executor,);
1836 0 : (
1837 0 : read_messages_task_res.expect("propagated panic from read_messages"),
1838 0 : executor_res_,
1839 0 : )
1840 : }
1841 : }
1842 0 : }
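
The block comment above describes the pipelined batcher/executor design in the abstract. Below is a minimal, self-contained sketch of the same pattern, assuming a plain tokio mpsc channel and Vec batches in place of the crate-internal spsc_fold channel and real pagestream requests (illustration only, not the pageserver's implementation):

    use tokio::sync::mpsc;

    #[tokio::main]
    async fn main() {
        let max_batch_size = 4;
        // Bounded channel: at most one completed batch waits for the executor
        // (unlike spsc_fold, which keeps folding new requests into the pending value).
        let (tx, mut rx) = mpsc::channel::<Vec<u32>>(1);

        // Batcher: folds incoming requests into a batch and hands it over when full.
        let batcher = tokio::spawn(async move {
            let mut batch = Vec::new();
            for req in 0..10u32 {
                batch.push(req);
                if batch.len() == max_batch_size
                    && tx.send(std::mem::take(&mut batch)).await.is_err()
                {
                    return; // executor gone: stop reading requests
                }
            }
            if !batch.is_empty() {
                let _ = tx.send(batch).await;
            }
            // Dropping `tx` here plays the role of the clean-shutdown signal.
        });

        // Executor: drains batches until the sender disconnects, then exits cleanly.
        let executor = tokio::spawn(async move {
            while let Some(batch) = rx.recv().await {
                println!("executing batch of {} requests: {:?}", batch.len(), batch);
            }
            println!("upstream gone, executor exits");
        });

        let _ = tokio::join!(batcher, executor);
    }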
1843 :
1844 : /// Helper function to handle the LSN from client request.
1845 : ///
1846 : /// Each GetPage (and Exists and Nblocks) request includes information about
1847 : /// which version of the page is being requested. The primary compute node
1848 : /// will always request the latest page version, by setting 'request_lsn' to
1849 : /// the last inserted or flushed WAL position, while a standby will request
1850 : /// a version at the LSN that it's currently caught up to.
1851 : ///
1852 : /// In either case, if the page server hasn't received the WAL up to the
1853 : /// requested LSN yet, we will wait for it to arrive. The return value is
1854 : /// the LSN that should be used to look up the page versions.
1855 : ///
1856 : /// In addition to the request LSN, each request carries another LSN,
1857 : /// 'not_modified_since', which is a hint to the pageserver that the client
1858 : /// knows that the page has not been modified between 'not_modified_since'
1859 : /// and the request LSN. This allows skipping the wait, as long as the WAL
1860 : /// up to 'not_modified_since' has arrived. If the client doesn't have any
1861 : /// information about when the page was modified, it will use
1862 : /// not_modified_since == lsn. If the client lies and sends a too low
1863 : /// not_modified_hint such that there are in fact later page versions, the
1864 : /// behavior is undefined: the pageserver may return any of the page versions
1865 : /// or an error.
1866 0 : async fn wait_or_get_last_lsn(
1867 0 : timeline: &Timeline,
1868 0 : request_lsn: Lsn,
1869 0 : not_modified_since: Lsn,
1870 0 : latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
1871 0 : ctx: &RequestContext,
1872 0 : ) -> Result<Lsn, PageStreamError> {
1873 0 : let last_record_lsn = timeline.get_last_record_lsn();
1874 0 :
1875 0 : // Sanity check the request
1876 0 : if request_lsn < not_modified_since {
1877 0 : return Err(PageStreamError::BadRequest(
1878 0 : format!(
1879 0 : "invalid request with request LSN {} and not_modified_since {}",
1880 0 : request_lsn, not_modified_since,
1881 0 : )
1882 0 : .into(),
1883 0 : ));
1884 0 : }
1885 0 :
1886 0 : // Check explicitly for INVALID just to get a less scary error message if the request is obviously bogus
1887 0 : if request_lsn == Lsn::INVALID {
1888 0 : return Err(PageStreamError::BadRequest(
1889 0 : "invalid LSN(0) in request".into(),
1890 0 : ));
1891 0 : }
1892 0 :
1893 0 : // Clients should only read from recent LSNs on their timeline, or from locations holding an LSN lease.
1894 0 : //
1895 0 : // We may have older data available, but we make a best effort to detect this case and return an error,
1896 0 : // to distinguish a misbehaving client (asking for old LSN) from a storage issue (data missing at a legitimate LSN).
1897 0 : if request_lsn < **latest_gc_cutoff_lsn && !timeline.is_gc_blocked_by_lsn_lease_deadline() {
1898 0 : let gc_info = &timeline.gc_info.read().unwrap();
1899 0 : if !gc_info.lsn_covered_by_lease(request_lsn) {
1900 0 : return Err(
1901 0 : PageStreamError::BadRequest(format!(
1902 0 : "tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
1903 0 : request_lsn, **latest_gc_cutoff_lsn
1904 0 : ).into())
1905 0 : );
1906 0 : }
1907 0 : }
1908 :
1909 : // Wait for WAL up to 'not_modified_since' to arrive, if necessary
1910 0 : if not_modified_since > last_record_lsn {
1911 0 : timeline
1912 0 : .wait_lsn(
1913 0 : not_modified_since,
1914 0 : crate::tenant::timeline::WaitLsnWaiter::PageService,
1915 0 : timeline::WaitLsnTimeout::Default,
1916 0 : ctx,
1917 0 : )
1918 0 : .await?;
1919 : // Since we waited for 'not_modified_since' to arrive, that is now the last
1920 : // record LSN. (Or close enough for our purposes; the last-record LSN can
1921 : // advance immediately after we return anyway)
1922 0 : Ok(not_modified_since)
1923 : } else {
1924 : // It might be better to use max(not_modified_since, latest_gc_cutoff_lsn)
1925 : // here instead. That would give the same result, since we know that there
1926 : // haven't been any modifications since 'not_modified_since'. Using an older
1927 : // LSN might be faster, because that could allow skipping recent layers when
1928 : // finding the page. However, we have historically used 'last_record_lsn', so
1929 : // stick to that for now.
1930 0 : Ok(std::cmp::min(last_record_lsn, request_lsn))
1931 : }
1932 0 : }
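
A hypothetical, simplified restatement of the rule implemented above, using plain u64 values in place of Lsn and omitting the WAL wait and GC-cutoff checks (illustration only):

    fn effective_lsn(
        request_lsn: u64,
        not_modified_since: u64,
        last_record_lsn: u64,
    ) -> Result<u64, String> {
        if request_lsn < not_modified_since {
            return Err("request_lsn must be >= not_modified_since".to_string());
        }
        if not_modified_since > last_record_lsn {
            // The real code first waits for WAL up to not_modified_since to arrive.
            Ok(not_modified_since)
        } else {
            Ok(std::cmp::min(last_record_lsn, request_lsn))
        }
    }

    fn main() {
        // Primary compute requesting at an LSN the pageserver has not received yet,
        // but whose not_modified_since has already arrived: read at last_record_lsn.
        assert_eq!(effective_lsn(0x200, 0x100, 0x180), Ok(0x180));
        // Standby caught up only to an older LSN that the pageserver already has.
        assert_eq!(effective_lsn(0x150, 0x150, 0x180), Ok(0x150));
        // Bogus request: not_modified_since newer than request_lsn is rejected.
        assert!(effective_lsn(0x100, 0x200, 0x180).is_err());
    }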
1933 :
1934 : /// Handles the lsn lease request.
1935 : /// If a lease cannot be obtained, the client will receive NULL.
1936 : #[instrument(skip_all, fields(shard_id, %lsn))]
1937 : async fn handle_make_lsn_lease<IO>(
1938 : &mut self,
1939 : pgb: &mut PostgresBackend<IO>,
1940 : tenant_shard_id: TenantShardId,
1941 : timeline_id: TimelineId,
1942 : lsn: Lsn,
1943 : ctx: &RequestContext,
1944 : ) -> Result<(), QueryError>
1945 : where
1946 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1947 : {
1948 : let timeline = self
1949 : .timeline_handles
1950 : .as_mut()
1951 : .unwrap()
1952 : .get(
1953 : tenant_shard_id.tenant_id,
1954 : timeline_id,
1955 : ShardSelector::Known(tenant_shard_id.to_index()),
1956 : )
1957 : .await?;
1958 : set_tracing_field_shard_id(&timeline);
1959 :
1960 : let lease = timeline
1961 : .renew_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx)
1962 0 : .inspect_err(|e| {
1963 0 : warn!("{e}");
1964 0 : })
1965 : .ok();
1966 0 : let valid_until_str = lease.map(|l| {
1967 0 : l.valid_until
1968 0 : .duration_since(SystemTime::UNIX_EPOCH)
1969 0 : .expect("valid_until is earlier than UNIX_EPOCH")
1970 0 : .as_millis()
1971 0 : .to_string()
1972 0 : });
1973 :
1974 : info!(
1975 : "acquired lease for {} until {}",
1976 : lsn,
1977 : valid_until_str.as_deref().unwrap_or("<unknown>")
1978 : );
1979 :
1980 0 : let bytes = valid_until_str.as_ref().map(|x| x.as_bytes());
1981 :
1982 : pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col(
1983 : b"valid_until",
1984 : )]))?
1985 : .write_message_noflush(&BeMessage::DataRow(&[bytes]))?;
1986 :
1987 : Ok(())
1988 : }
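
From the client's perspective, the lease is requested with the `lease lsn` command (see LeaseLsnCmd below) and answered with a single row containing one text column, valid_until, holding epoch milliseconds, or NULL if no lease could be granted. A hedged sketch of how a client might issue it with tokio_postgres; the connection string, tenant shard id, timeline id, and LSN are all placeholders:

    use tokio_postgres::{NoTls, SimpleQueryMessage};

    #[tokio::main]
    async fn main() -> Result<(), tokio_postgres::Error> {
        // Placeholder connection string; a real client targets the pageserver's libpq port.
        let (client, connection) =
            tokio_postgres::connect("host=localhost port=6400 user=anyuser", NoTls).await?;
        tokio::spawn(async move {
            if let Err(e) = connection.await {
                eprintln!("connection error: {e}");
            }
        });

        // Placeholder ids: <tenant_shard_id> <timeline_id> <lsn>.
        let messages = client
            .simple_query(
                "lease lsn 00000000000000000000000000000001 00000000000000000000000000000002 0/16ABCDE",
            )
            .await?;
        for msg in messages {
            if let SimpleQueryMessage::Row(row) = msg {
                // NULL (None) means the lease could not be granted.
                println!("valid_until (epoch millis): {:?}", row.get("valid_until"));
            }
        }
        Ok(())
    }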
1989 :
1990 : #[instrument(skip_all, fields(shard_id))]
1991 : async fn handle_get_rel_exists_request(
1992 : &mut self,
1993 : timeline: &Timeline,
1994 : req: &PagestreamExistsRequest,
1995 : ctx: &RequestContext,
1996 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1997 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
1998 : let lsn = Self::wait_or_get_last_lsn(
1999 : timeline,
2000 : req.hdr.request_lsn,
2001 : req.hdr.not_modified_since,
2002 : &latest_gc_cutoff_lsn,
2003 : ctx,
2004 : )
2005 : .await?;
2006 :
2007 : let exists = timeline
2008 : .get_rel_exists(req.rel, Version::Lsn(lsn), ctx)
2009 : .await?;
2010 :
2011 : Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
2012 : req: *req,
2013 : exists,
2014 : }))
2015 : }
2016 :
2017 : #[instrument(skip_all, fields(shard_id))]
2018 : async fn handle_get_nblocks_request(
2019 : &mut self,
2020 : timeline: &Timeline,
2021 : req: &PagestreamNblocksRequest,
2022 : ctx: &RequestContext,
2023 : ) -> Result<PagestreamBeMessage, PageStreamError> {
2024 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
2025 : let lsn = Self::wait_or_get_last_lsn(
2026 : timeline,
2027 : req.hdr.request_lsn,
2028 : req.hdr.not_modified_since,
2029 : &latest_gc_cutoff_lsn,
2030 : ctx,
2031 : )
2032 : .await?;
2033 :
2034 : let n_blocks = timeline
2035 : .get_rel_size(req.rel, Version::Lsn(lsn), ctx)
2036 : .await?;
2037 :
2038 : Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
2039 : req: *req,
2040 : n_blocks,
2041 : }))
2042 : }
2043 :
2044 : #[instrument(skip_all, fields(shard_id))]
2045 : async fn handle_db_size_request(
2046 : &mut self,
2047 : timeline: &Timeline,
2048 : req: &PagestreamDbSizeRequest,
2049 : ctx: &RequestContext,
2050 : ) -> Result<PagestreamBeMessage, PageStreamError> {
2051 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
2052 : let lsn = Self::wait_or_get_last_lsn(
2053 : timeline,
2054 : req.hdr.request_lsn,
2055 : req.hdr.not_modified_since,
2056 : &latest_gc_cutoff_lsn,
2057 : ctx,
2058 : )
2059 : .await?;
2060 :
2061 : let total_blocks = timeline
2062 : .get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, Version::Lsn(lsn), ctx)
2063 : .await?;
2064 : let db_size = total_blocks as i64 * BLCKSZ as i64;
2065 :
2066 : Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse {
2067 : req: *req,
2068 : db_size,
2069 : }))
2070 : }
2071 :
2072 : #[instrument(skip_all)]
2073 : async fn handle_get_page_at_lsn_request_batched(
2074 : &mut self,
2075 : timeline: &Timeline,
2076 : effective_lsn: Lsn,
2077 : requests: smallvec::SmallVec<[BatchedGetPageRequest; 1]>,
2078 : io_concurrency: IoConcurrency,
2079 : ctx: &RequestContext,
2080 : ) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>> {
2081 : debug_assert_current_span_has_tenant_and_timeline_id();
2082 :
2083 : timeline
2084 : .query_metrics
2085 : .observe_getpage_batch_start(requests.len());
2086 :
2087 : // If a page trace is running, submit an event for this request.
2088 : if let Some(page_trace) = timeline.page_trace.load().as_ref() {
2089 : let time = SystemTime::now();
2090 : for batch in &requests {
2091 : let key = rel_block_to_key(batch.req.rel, batch.req.blkno).to_compact();
2092 : // Ignore error (trace buffer may be full or tracer may have disconnected).
2093 : _ = page_trace.try_send(PageTraceEvent {
2094 : key,
2095 : effective_lsn,
2096 : time,
2097 : });
2098 : }
2099 : }
2100 :
2101 : let results = timeline
2102 : .get_rel_page_at_lsn_batched(
2103 : requests
2104 : .iter()
2105 0 : .map(|p| (&p.req.rel, &p.req.blkno, p.ctx.attached_child())),
2106 : effective_lsn,
2107 : io_concurrency,
2108 : ctx,
2109 : )
2110 : .await;
2111 : assert_eq!(results.len(), requests.len());
2112 :
2113 : // TODO: avoid creating the new Vec here
2114 : Vec::from_iter(
2115 : requests
2116 : .into_iter()
2117 : .zip(results.into_iter())
2118 0 : .map(|(req, res)| {
2119 0 : res.map(|page| {
2120 0 : (
2121 0 : PagestreamBeMessage::GetPage(models::PagestreamGetPageResponse {
2122 0 : req: req.req,
2123 0 : page,
2124 0 : }),
2125 0 : req.timer,
2126 0 : )
2127 0 : })
2128 0 : .map_err(|e| BatchedPageStreamError {
2129 0 : err: PageStreamError::from(e),
2130 0 : req: req.req.hdr,
2131 0 : })
2132 0 : }),
2133 : )
2134 : }
2135 :
2136 : #[instrument(skip_all, fields(shard_id))]
2137 : async fn handle_get_slru_segment_request(
2138 : &mut self,
2139 : timeline: &Timeline,
2140 : req: &PagestreamGetSlruSegmentRequest,
2141 : ctx: &RequestContext,
2142 : ) -> Result<PagestreamBeMessage, PageStreamError> {
2143 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
2144 : let lsn = Self::wait_or_get_last_lsn(
2145 : timeline,
2146 : req.hdr.request_lsn,
2147 : req.hdr.not_modified_since,
2148 : &latest_gc_cutoff_lsn,
2149 : ctx,
2150 : )
2151 : .await?;
2152 :
2153 : let kind = SlruKind::from_repr(req.kind)
2154 : .ok_or(PageStreamError::BadRequest("invalid SLRU kind".into()))?;
2155 : let segment = timeline.get_slru_segment(kind, req.segno, lsn, ctx).await?;
2156 :
2157 : Ok(PagestreamBeMessage::GetSlruSegment(
2158 : PagestreamGetSlruSegmentResponse { req: *req, segment },
2159 : ))
2160 : }
2161 :
2162 : // NB: this impl mimics what we do for batched getpage requests.
2163 : #[cfg(feature = "testing")]
2164 : #[instrument(skip_all, fields(shard_id))]
2165 : async fn handle_test_request_batch(
2166 : &mut self,
2167 : timeline: &Timeline,
2168 : requests: Vec<BatchedTestRequest>,
2169 : _ctx: &RequestContext,
2170 : ) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>> {
2171 : // real requests would do something with the timeline
2172 : let mut results = Vec::with_capacity(requests.len());
2173 : for _req in requests.iter() {
2174 : tokio::task::yield_now().await;
2175 :
2176 : results.push({
2177 : if timeline.cancel.is_cancelled() {
2178 : Err(PageReconstructError::Cancelled)
2179 : } else {
2180 : Ok(())
2181 : }
2182 : });
2183 : }
2184 :
2185 : // TODO: avoid creating the new Vec here
2186 : Vec::from_iter(
2187 : requests
2188 : .into_iter()
2189 : .zip(results.into_iter())
2190 0 : .map(|(req, res)| {
2191 0 : res.map(|()| {
2192 0 : (
2193 0 : PagestreamBeMessage::Test(models::PagestreamTestResponse {
2194 0 : req: req.req.clone(),
2195 0 : }),
2196 0 : req.timer,
2197 0 : )
2198 0 : })
2199 0 : .map_err(|e| BatchedPageStreamError {
2200 0 : err: PageStreamError::from(e),
2201 0 : req: req.req.hdr,
2202 0 : })
2203 0 : }),
2204 : )
2205 : }
2206 :
2207 : /// Note on "fullbackup":
2208 : /// Full basebackups should only be used for debugging purposes.
2209 : /// Originally, the full-backup mode was introduced to enable breaking storage format changes,
2210 : /// but that is not applicable anymore.
2211 : ///
2212 : /// # Coding Discipline
2213 : ///
2214 : /// Coding discipline within this function: all interaction with the `pgb` connection
2215 : /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
2216 : /// This is so that we can shutdown page_service quickly.
2217 : ///
2218 : /// TODO: wrap the pgb that we pass to the basebackup handler so that it's sensitive
2219 : /// to connection cancellation.
2220 : #[allow(clippy::too_many_arguments)]
2221 : #[instrument(skip_all, fields(shard_id, ?lsn, ?prev_lsn, %full_backup))]
2222 : async fn handle_basebackup_request<IO>(
2223 : &mut self,
2224 : pgb: &mut PostgresBackend<IO>,
2225 : tenant_id: TenantId,
2226 : timeline_id: TimelineId,
2227 : lsn: Option<Lsn>,
2228 : prev_lsn: Option<Lsn>,
2229 : full_backup: bool,
2230 : gzip: bool,
2231 : replica: bool,
2232 : ctx: &RequestContext,
2233 : ) -> Result<(), QueryError>
2234 : where
2235 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
2236 : {
2237 0 : fn map_basebackup_error(err: BasebackupError) -> QueryError {
2238 0 : match err {
2239 : // TODO: passthrough the error site to the final error message?
2240 0 : BasebackupError::Client(e, _) => QueryError::Disconnected(ConnectionError::Io(e)),
2241 0 : BasebackupError::Server(e) => QueryError::Other(e),
2242 0 : BasebackupError::Shutdown => QueryError::Shutdown,
2243 : }
2244 0 : }
2245 :
2246 : let started = std::time::Instant::now();
2247 :
2248 : let timeline = self
2249 : .timeline_handles
2250 : .as_mut()
2251 : .unwrap()
2252 : .get(tenant_id, timeline_id, ShardSelector::Zero)
2253 : .await?;
2254 : set_tracing_field_shard_id(&timeline);
2255 : let ctx = ctx.with_scope_timeline(&timeline);
2256 :
2257 : if timeline.is_archived() == Some(true) {
2258 : tracing::info!(
2259 : "timeline {tenant_id}/{timeline_id} is archived, but got basebackup request for it."
2260 : );
2261 : return Err(QueryError::NotFound("timeline is archived".into()));
2262 : }
2263 :
2264 : let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
2265 : if let Some(lsn) = lsn {
2266 : // Backup was requested at a particular LSN. Wait for it to arrive.
2267 : info!("waiting for {}", lsn);
2268 : timeline
2269 : .wait_lsn(
2270 : lsn,
2271 : crate::tenant::timeline::WaitLsnWaiter::PageService,
2272 : crate::tenant::timeline::WaitLsnTimeout::Default,
2273 : &ctx,
2274 : )
2275 : .await?;
2276 : timeline
2277 : .check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
2278 : .context("invalid basebackup lsn")?;
2279 : }
2280 :
2281 : let lsn_awaited_after = started.elapsed();
2282 :
2283 : // switch client to COPYOUT
2284 : pgb.write_message_noflush(&BeMessage::CopyOutResponse)
2285 : .map_err(QueryError::Disconnected)?;
2286 : self.flush_cancellable(pgb, &self.cancel).await?;
2287 :
2288 : // Send a tarball of the latest layer on the timeline. Compress if not
2289 : // fullbackup. TODO Compress in that case too (tests need to be updated)
2290 : if full_backup {
2291 : let mut writer = pgb.copyout_writer();
2292 : basebackup::send_basebackup_tarball(
2293 : &mut writer,
2294 : &timeline,
2295 : lsn,
2296 : prev_lsn,
2297 : full_backup,
2298 : replica,
2299 : &ctx,
2300 : )
2301 : .await
2302 : .map_err(map_basebackup_error)?;
2303 : } else {
2304 : let mut writer = BufWriter::new(pgb.copyout_writer());
2305 : if gzip {
2306 : let mut encoder = GzipEncoder::with_quality(
2307 : &mut writer,
2308 : // NOTE using fast compression because it's on the critical path
2309 : // for compute startup. For an empty database, we get
2310 : // <100KB with this method. The Level::Best compression method
2311 : // gives us <20KB, but maybe we should add basebackup caching
2312 : // on compute shutdown first.
2313 : async_compression::Level::Fastest,
2314 : );
2315 : basebackup::send_basebackup_tarball(
2316 : &mut encoder,
2317 : &timeline,
2318 : lsn,
2319 : prev_lsn,
2320 : full_backup,
2321 : replica,
2322 : &ctx,
2323 : )
2324 : .await
2325 : .map_err(map_basebackup_error)?;
2326 : // shutdown the encoder to ensure the gzip footer is written
2327 : encoder
2328 : .shutdown()
2329 : .await
2330 0 : .map_err(|e| QueryError::Disconnected(ConnectionError::Io(e)))?;
2331 : } else {
2332 : basebackup::send_basebackup_tarball(
2333 : &mut writer,
2334 : &timeline,
2335 : lsn,
2336 : prev_lsn,
2337 : full_backup,
2338 : replica,
2339 : &ctx,
2340 : )
2341 : .await
2342 : .map_err(map_basebackup_error)?;
2343 : }
2344 0 : writer.flush().await.map_err(|e| {
2345 0 : map_basebackup_error(BasebackupError::Client(
2346 0 : e,
2347 0 : "handle_basebackup_request,flush",
2348 0 : ))
2349 0 : })?;
2350 : }
2351 :
2352 : pgb.write_message_noflush(&BeMessage::CopyDone)
2353 : .map_err(QueryError::Disconnected)?;
2354 : self.flush_cancellable(pgb, &timeline.cancel).await?;
2355 :
2356 : let basebackup_after = started
2357 : .elapsed()
2358 : .checked_sub(lsn_awaited_after)
2359 : .unwrap_or(Duration::ZERO);
2360 :
2361 : info!(
2362 : lsn_await_millis = lsn_awaited_after.as_millis(),
2363 : basebackup_millis = basebackup_after.as_millis(),
2364 : "basebackup complete"
2365 : );
2366 :
2367 : Ok(())
2368 : }
2369 :
2370 : // When accessing the management API, supply None as the argument.
2371 : // When authorizing access to a tenant, pass the corresponding tenant id.
2372 0 : fn check_permission(&self, tenant_id: Option<TenantId>) -> Result<(), QueryError> {
2373 0 : if self.auth.is_none() {
2374 : // auth is set to Trust, nothing to check so just return ok
2375 0 : return Ok(());
2376 0 : }
2377 0 : // auth is Some, as just checked above; when auth is Some,
2378 0 : // claims are always present because of checks during connection init,
2379 0 : // so this expect won't trigger
2380 0 : let claims = self
2381 0 : .claims
2382 0 : .as_ref()
2383 0 : .expect("claims presence already checked");
2384 0 : check_permission(claims, tenant_id).map_err(|e| QueryError::Unauthorized(e.0))
2385 0 : }
2386 : }
2387 :
2388 : /// `basebackup tenant timeline [lsn] [--gzip] [--replica]`
2389 : #[derive(Debug, Clone, Eq, PartialEq)]
2390 : struct BaseBackupCmd {
2391 : tenant_id: TenantId,
2392 : timeline_id: TimelineId,
2393 : lsn: Option<Lsn>,
2394 : gzip: bool,
2395 : replica: bool,
2396 : }
2397 :
2398 : /// `fullbackup tenant timeline [lsn] [prev_lsn]`
2399 : #[derive(Debug, Clone, Eq, PartialEq)]
2400 : struct FullBackupCmd {
2401 : tenant_id: TenantId,
2402 : timeline_id: TimelineId,
2403 : lsn: Option<Lsn>,
2404 : prev_lsn: Option<Lsn>,
2405 : }
2406 :
2407 : /// `pagestream_v2 tenant timeline`
2408 : #[derive(Debug, Clone, Eq, PartialEq)]
2409 : struct PageStreamCmd {
2410 : tenant_id: TenantId,
2411 : timeline_id: TimelineId,
2412 : protocol_version: PagestreamProtocolVersion,
2413 : }
2414 :
2415 : /// `lease lsn tenant timeline lsn`
2416 : #[derive(Debug, Clone, Eq, PartialEq)]
2417 : struct LeaseLsnCmd {
2418 : tenant_shard_id: TenantShardId,
2419 : timeline_id: TimelineId,
2420 : lsn: Lsn,
2421 : }
2422 :
2423 : #[derive(Debug, Clone, Eq, PartialEq)]
2424 : enum PageServiceCmd {
2425 : Set,
2426 : PageStream(PageStreamCmd),
2427 : BaseBackup(BaseBackupCmd),
2428 : FullBackup(FullBackupCmd),
2429 : LeaseLsn(LeaseLsnCmd),
2430 : }
2431 :
2432 : impl PageStreamCmd {
2433 12 : fn parse(query: &str, protocol_version: PagestreamProtocolVersion) -> anyhow::Result<Self> {
2434 12 : let parameters = query.split_whitespace().collect_vec();
2435 12 : if parameters.len() != 2 {
2436 4 : bail!(
2437 4 : "invalid number of parameters for pagestream command: {}",
2438 4 : query
2439 4 : );
2440 8 : }
2441 8 : let tenant_id = TenantId::from_str(parameters[0])
2442 8 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2443 4 : let timeline_id = TimelineId::from_str(parameters[1])
2444 4 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2445 4 : Ok(Self {
2446 4 : tenant_id,
2447 4 : timeline_id,
2448 4 : protocol_version,
2449 4 : })
2450 12 : }
2451 : }
2452 :
2453 : impl FullBackupCmd {
2454 8 : fn parse(query: &str) -> anyhow::Result<Self> {
2455 8 : let parameters = query.split_whitespace().collect_vec();
2456 8 : if parameters.len() < 2 || parameters.len() > 4 {
2457 0 : bail!(
2458 0 : "invalid number of parameters for basebackup command: {}",
2459 0 : query
2460 0 : );
2461 8 : }
2462 8 : let tenant_id = TenantId::from_str(parameters[0])
2463 8 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2464 8 : let timeline_id = TimelineId::from_str(parameters[1])
2465 8 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2466 : // The caller is responsible for providing correct lsn and prev_lsn.
2467 8 : let lsn = if let Some(lsn_str) = parameters.get(2) {
2468 : Some(
2469 4 : Lsn::from_str(lsn_str)
2470 4 : .with_context(|| format!("Failed to parse Lsn from {lsn_str}"))?,
2471 : )
2472 : } else {
2473 4 : None
2474 : };
2475 8 : let prev_lsn = if let Some(prev_lsn_str) = parameters.get(3) {
2476 : Some(
2477 4 : Lsn::from_str(prev_lsn_str)
2478 4 : .with_context(|| format!("Failed to parse Lsn from {prev_lsn_str}"))?,
2479 : )
2480 : } else {
2481 4 : None
2482 : };
2483 8 : Ok(Self {
2484 8 : tenant_id,
2485 8 : timeline_id,
2486 8 : lsn,
2487 8 : prev_lsn,
2488 8 : })
2489 8 : }
2490 : }
2491 :
2492 : impl BaseBackupCmd {
2493 36 : fn parse(query: &str) -> anyhow::Result<Self> {
2494 36 : let parameters = query.split_whitespace().collect_vec();
2495 36 : if parameters.len() < 2 {
2496 0 : bail!(
2497 0 : "invalid number of parameters for basebackup command: {}",
2498 0 : query
2499 0 : );
2500 36 : }
2501 36 : let tenant_id = TenantId::from_str(parameters[0])
2502 36 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2503 36 : let timeline_id = TimelineId::from_str(parameters[1])
2504 36 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2505 : let lsn;
2506 : let flags_parse_from;
2507 36 : if let Some(maybe_lsn) = parameters.get(2) {
2508 32 : if *maybe_lsn == "latest" {
2509 4 : lsn = None;
2510 4 : flags_parse_from = 3;
2511 28 : } else if maybe_lsn.starts_with("--") {
2512 20 : lsn = None;
2513 20 : flags_parse_from = 2;
2514 20 : } else {
2515 : lsn = Some(
2516 8 : Lsn::from_str(maybe_lsn)
2517 8 : .with_context(|| format!("Failed to parse lsn from {maybe_lsn}"))?,
2518 : );
2519 8 : flags_parse_from = 3;
2520 : }
2521 4 : } else {
2522 4 : lsn = None;
2523 4 : flags_parse_from = 2;
2524 4 : }
2525 :
2526 36 : let mut gzip = false;
2527 36 : let mut replica = false;
2528 :
2529 44 : for ¶m in ¶meters[flags_parse_from..] {
2530 44 : match param {
2531 44 : "--gzip" => {
2532 28 : if gzip {
2533 4 : bail!("duplicate parameter for basebackup command: {param}")
2534 24 : }
2535 24 : gzip = true
2536 : }
2537 16 : "--replica" => {
2538 8 : if replica {
2539 0 : bail!("duplicate parameter for basebackup command: {param}")
2540 8 : }
2541 8 : replica = true
2542 : }
2543 8 : _ => bail!("invalid parameter for basebackup command: {param}"),
2544 : }
2545 : }
2546 24 : Ok(Self {
2547 24 : tenant_id,
2548 24 : timeline_id,
2549 24 : lsn,
2550 24 : gzip,
2551 24 : replica,
2552 24 : })
2553 36 : }
2554 : }
2555 :
2556 : impl LeaseLsnCmd {
2557 8 : fn parse(query: &str) -> anyhow::Result<Self> {
2558 8 : let parameters = query.split_whitespace().collect_vec();
2559 8 : if parameters.len() != 3 {
2560 0 : bail!(
2561 0 : "invalid number of parameters for lease lsn command: {}",
2562 0 : query
2563 0 : );
2564 8 : }
2565 8 : let tenant_shard_id = TenantShardId::from_str(parameters[0])
2566 8 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2567 8 : let timeline_id = TimelineId::from_str(parameters[1])
2568 8 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2569 8 : let lsn = Lsn::from_str(parameters[2])
2570 8 : .with_context(|| format!("Failed to parse lsn from {}", parameters[2]))?;
2571 8 : Ok(Self {
2572 8 : tenant_shard_id,
2573 8 : timeline_id,
2574 8 : lsn,
2575 8 : })
2576 8 : }
2577 : }
2578 :
2579 : impl PageServiceCmd {
2580 84 : fn parse(query: &str) -> anyhow::Result<Self> {
2581 84 : let query = query.trim();
2582 84 : let Some((cmd, other)) = query.split_once(' ') else {
2583 8 : bail!("cannot parse query: {query}")
2584 : };
2585 76 : match cmd.to_ascii_lowercase().as_str() {
2586 76 : "pagestream_v2" => Ok(Self::PageStream(PageStreamCmd::parse(
2587 12 : other,
2588 12 : PagestreamProtocolVersion::V2,
2589 12 : )?)),
2590 64 : "pagestream_v3" => Ok(Self::PageStream(PageStreamCmd::parse(
2591 0 : other,
2592 0 : PagestreamProtocolVersion::V3,
2593 0 : )?)),
2594 64 : "basebackup" => Ok(Self::BaseBackup(BaseBackupCmd::parse(other)?)),
2595 28 : "fullbackup" => Ok(Self::FullBackup(FullBackupCmd::parse(other)?)),
2596 20 : "lease" => {
2597 12 : let Some((cmd2, other)) = other.split_once(' ') else {
2598 0 : bail!("invalid lease command: {cmd}");
2599 : };
2600 12 : let cmd2 = cmd2.to_ascii_lowercase();
2601 12 : if cmd2 == "lsn" {
2602 8 : Ok(Self::LeaseLsn(LeaseLsnCmd::parse(other)?))
2603 : } else {
2604 4 : bail!("invalid lease command: {cmd}");
2605 : }
2606 : }
2607 8 : "set" => Ok(Self::Set),
2608 0 : _ => Err(anyhow::anyhow!("unsupported command {cmd} in {query}")),
2609 : }
2610 84 : }
2611 : }
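
Taken together, the grammar accepted by PageServiceCmd::parse above is (command keywords are case-insensitive):

    pagestream_v2 <tenant_id> <timeline_id>
    pagestream_v3 <tenant_id> <timeline_id>
    basebackup <tenant_id> <timeline_id> [latest | <lsn>] [--gzip] [--replica]
    fullbackup <tenant_id> <timeline_id> [<lsn> [<prev_lsn>]]
    lease lsn <tenant_shard_id> <timeline_id> <lsn>
    set <anything>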
2612 :
2613 : /// Parse the startup options from the postgres wire protocol startup packet.
2614 : ///
2615 : /// It takes a sequence of `-c option=X` or `-coption=X`. It parses the options string
2616 : /// on a best-effort basis and returns all the options parsed so far (key-value pairs) and a bool
2617 : /// indicating whether a parse error was encountered. There could be duplicates in the options
2618 : /// if the caller passed such parameters.
2619 28 : fn parse_options(options: &str) -> (Vec<(String, String)>, bool) {
2620 28 : let mut parsing_config = false;
2621 28 : let mut has_error = false;
2622 28 : let mut config = Vec::new();
2623 64 : for item in options.split_whitespace() {
2624 64 : if item == "-c" {
2625 36 : if !parsing_config {
2626 32 : parsing_config = true;
2627 32 : } else {
2628 : // "-c" followed with another "-c"
2629 4 : tracing::warn!("failed to parse the startup options: {options}");
2630 4 : has_error = true;
2631 4 : break;
2632 : }
2633 28 : } else if item.starts_with("-c") || parsing_config {
2634 28 : let Some((mut key, value)) = item.split_once('=') else {
2635 : // "-c" followed with an invalid option
2636 4 : tracing::warn!("failed to parse the startup options: {options}");
2637 4 : has_error = true;
2638 4 : break;
2639 : };
2640 24 : if !parsing_config {
2641 : // Parse "-coptions=X"
2642 4 : let Some(stripped_key) = key.strip_prefix("-c") else {
2643 0 : tracing::warn!("failed to parse the startup options: {options}");
2644 0 : has_error = true;
2645 0 : break;
2646 : };
2647 4 : key = stripped_key;
2648 20 : }
2649 24 : config.push((key.to_string(), value.to_string()));
2650 24 : parsing_config = false;
2651 : } else {
2652 0 : tracing::warn!("failed to parse the startup options: {options}");
2653 0 : has_error = true;
2654 0 : break;
2655 : }
2656 : }
2657 28 : if parsing_config {
2658 : // "-c" without the option
2659 12 : tracing::warn!("failed to parse the startup options: {options}");
2660 12 : has_error = true;
2661 16 : }
2662 28 : (config, has_error)
2663 28 : }
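
For example (mirroring the unit tests at the bottom of this file), the separated "-c key=value" form and the joined "-ckey=value" form parse to the same key-value pairs:

    let (config, has_error) = parse_options("-c neon.compute_mode=primary -cfoo=bar");
    assert!(!has_error);
    assert_eq!(
        config,
        vec![
            ("neon.compute_mode".to_string(), "primary".to_string()),
            ("foo".to_string(), "bar".to_string()),
        ]
    );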
2664 :
2665 : impl<IO> postgres_backend::Handler<IO> for PageServerHandler
2666 : where
2667 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
2668 : {
2669 0 : fn check_auth_jwt(
2670 0 : &mut self,
2671 0 : _pgb: &mut PostgresBackend<IO>,
2672 0 : jwt_response: &[u8],
2673 0 : ) -> Result<(), QueryError> {
2674 : // this unwrap is never triggered, because check_auth_jwt only called when auth_type is NeonJWT
2675 : // which requires auth to be present
2676 0 : let data = self
2677 0 : .auth
2678 0 : .as_ref()
2679 0 : .unwrap()
2680 0 : .decode(str::from_utf8(jwt_response).context("jwt response is not UTF-8")?)
2681 0 : .map_err(|e| QueryError::Unauthorized(e.0))?;
2682 :
2683 0 : if matches!(data.claims.scope, Scope::Tenant) && data.claims.tenant_id.is_none() {
2684 0 : return Err(QueryError::Unauthorized(
2685 0 : "jwt token scope is Tenant, but tenant id is missing".into(),
2686 0 : ));
2687 0 : }
2688 0 :
2689 0 : debug!(
2690 0 : "jwt scope check succeeded for scope: {:#?} by tenant id: {:?}",
2691 : data.claims.scope, data.claims.tenant_id,
2692 : );
2693 :
2694 0 : self.claims = Some(data.claims);
2695 0 : Ok(())
2696 0 : }
2697 :
2698 0 : fn startup(
2699 0 : &mut self,
2700 0 : _pgb: &mut PostgresBackend<IO>,
2701 0 : sm: &FeStartupPacket,
2702 0 : ) -> Result<(), QueryError> {
2703 0 : fail::fail_point!("ps::connection-start::startup-packet");
2704 :
2705 0 : if let FeStartupPacket::StartupMessage { params, .. } = sm {
2706 0 : if let Some(app_name) = params.get("application_name") {
2707 0 : self.perf_span_fields.application_name = Some(app_name.to_string());
2708 0 : Span::current().record("application_name", field::display(app_name));
2709 0 : }
2710 0 : if let Some(options) = params.get("options") {
2711 0 : let (config, _) = parse_options(options);
2712 0 : for (key, value) in config {
2713 0 : if key == "neon.compute_mode" {
2714 0 : self.perf_span_fields.compute_mode = Some(value.clone());
2715 0 : Span::current().record("compute_mode", field::display(value));
2716 0 : }
2717 : }
2718 0 : }
2719 0 : };
2720 :
2721 0 : Ok(())
2722 0 : }
2723 :
2724 : #[instrument(skip_all, fields(tenant_id, timeline_id))]
2725 : async fn process_query(
2726 : &mut self,
2727 : pgb: &mut PostgresBackend<IO>,
2728 : query_string: &str,
2729 : ) -> Result<(), QueryError> {
2730 0 : fail::fail_point!("simulated-bad-compute-connection", |_| {
2731 0 : info!("Hit failpoint for bad connection");
2732 0 : Err(QueryError::SimulatedConnectionError)
2733 0 : });
2734 :
2735 : fail::fail_point!("ps::connection-start::process-query");
2736 :
2737 : let ctx = self.connection_ctx.attached_child();
2738 : debug!("process query {query_string}");
2739 : let query = PageServiceCmd::parse(query_string)?;
2740 : match query {
2741 : PageServiceCmd::PageStream(PageStreamCmd {
2742 : tenant_id,
2743 : timeline_id,
2744 : protocol_version,
2745 : }) => {
2746 : tracing::Span::current()
2747 : .record("tenant_id", field::display(tenant_id))
2748 : .record("timeline_id", field::display(timeline_id));
2749 :
2750 : self.check_permission(Some(tenant_id))?;
2751 : let command_kind = match protocol_version {
2752 : PagestreamProtocolVersion::V2 => ComputeCommandKind::PageStreamV2,
2753 : PagestreamProtocolVersion::V3 => ComputeCommandKind::PageStreamV3,
2754 : };
2755 : COMPUTE_COMMANDS_COUNTERS.for_command(command_kind).inc();
2756 :
2757 : self.handle_pagerequests(pgb, tenant_id, timeline_id, protocol_version, ctx)
2758 : .await?;
2759 : }
2760 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2761 : tenant_id,
2762 : timeline_id,
2763 : lsn,
2764 : gzip,
2765 : replica,
2766 : }) => {
2767 : tracing::Span::current()
2768 : .record("tenant_id", field::display(tenant_id))
2769 : .record("timeline_id", field::display(timeline_id));
2770 :
2771 : self.check_permission(Some(tenant_id))?;
2772 :
2773 : COMPUTE_COMMANDS_COUNTERS
2774 : .for_command(ComputeCommandKind::Basebackup)
2775 : .inc();
2776 : let metric_recording = metrics::BASEBACKUP_QUERY_TIME.start_recording();
2777 0 : let res = async {
2778 0 : self.handle_basebackup_request(
2779 0 : pgb,
2780 0 : tenant_id,
2781 0 : timeline_id,
2782 0 : lsn,
2783 0 : None,
2784 0 : false,
2785 0 : gzip,
2786 0 : replica,
2787 0 : &ctx,
2788 0 : )
2789 0 : .await?;
2790 0 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
2791 0 : Result::<(), QueryError>::Ok(())
2792 0 : }
2793 : .await;
2794 : metric_recording.observe(&res);
2795 : res?;
2796 : }
2797 : // same as basebackup, but result includes relational data as well
2798 : PageServiceCmd::FullBackup(FullBackupCmd {
2799 : tenant_id,
2800 : timeline_id,
2801 : lsn,
2802 : prev_lsn,
2803 : }) => {
2804 : tracing::Span::current()
2805 : .record("tenant_id", field::display(tenant_id))
2806 : .record("timeline_id", field::display(timeline_id));
2807 :
2808 : self.check_permission(Some(tenant_id))?;
2809 :
2810 : COMPUTE_COMMANDS_COUNTERS
2811 : .for_command(ComputeCommandKind::Fullbackup)
2812 : .inc();
2813 :
2814 : // Check that the timeline exists
2815 : self.handle_basebackup_request(
2816 : pgb,
2817 : tenant_id,
2818 : timeline_id,
2819 : lsn,
2820 : prev_lsn,
2821 : true,
2822 : false,
2823 : false,
2824 : &ctx,
2825 : )
2826 : .await?;
2827 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
2828 : }
2829 : PageServiceCmd::Set => {
2830 : // important because psycopg2 executes "SET datestyle TO 'ISO'"
2831 : // on connect
2832 : // TODO: allow setting options, e.g., application_name/compute_mode via SET commands
2833 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
2834 : }
2835 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
2836 : tenant_shard_id,
2837 : timeline_id,
2838 : lsn,
2839 : }) => {
2840 : tracing::Span::current()
2841 : .record("tenant_id", field::display(tenant_shard_id))
2842 : .record("timeline_id", field::display(timeline_id));
2843 :
2844 : self.check_permission(Some(tenant_shard_id.tenant_id))?;
2845 :
2846 : COMPUTE_COMMANDS_COUNTERS
2847 : .for_command(ComputeCommandKind::LeaseLsn)
2848 : .inc();
2849 :
2850 : match self
2851 : .handle_make_lsn_lease(pgb, tenant_shard_id, timeline_id, lsn, &ctx)
2852 : .await
2853 : {
2854 : Ok(()) => {
2855 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?
2856 : }
2857 : Err(e) => {
2858 : error!("error obtaining lsn lease for {lsn}: {e:?}");
2859 : pgb.write_message_noflush(&BeMessage::ErrorResponse(
2860 : &e.to_string(),
2861 : Some(e.pg_error_code()),
2862 : ))?
2863 : }
2864 : };
2865 : }
2866 : }
2867 :
2868 : Ok(())
2869 : }
2870 : }
2871 :
2872 : impl From<GetActiveTenantError> for QueryError {
2873 0 : fn from(e: GetActiveTenantError) -> Self {
2874 0 : match e {
2875 0 : GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected(
2876 0 : ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
2877 0 : ),
2878 : GetActiveTenantError::Cancelled
2879 : | GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => {
2880 0 : QueryError::Shutdown
2881 : }
2882 0 : e @ GetActiveTenantError::NotFound(_) => QueryError::NotFound(format!("{e}").into()),
2883 0 : e => QueryError::Other(anyhow::anyhow!(e)),
2884 : }
2885 0 : }
2886 : }
2887 :
2888 : #[derive(Debug, thiserror::Error)]
2889 : pub(crate) enum GetActiveTimelineError {
2890 : #[error(transparent)]
2891 : Tenant(GetActiveTenantError),
2892 : #[error(transparent)]
2893 : Timeline(#[from] GetTimelineError),
2894 : }
2895 :
2896 : impl From<GetActiveTimelineError> for QueryError {
2897 0 : fn from(e: GetActiveTimelineError) -> Self {
2898 0 : match e {
2899 0 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => QueryError::Shutdown,
2900 0 : GetActiveTimelineError::Tenant(e) => e.into(),
2901 0 : GetActiveTimelineError::Timeline(e) => QueryError::NotFound(format!("{e}").into()),
2902 : }
2903 0 : }
2904 : }
2905 :
2906 : impl From<crate::tenant::timeline::handle::HandleUpgradeError> for QueryError {
2907 0 : fn from(e: crate::tenant::timeline::handle::HandleUpgradeError) -> Self {
2908 0 : match e {
2909 0 : crate::tenant::timeline::handle::HandleUpgradeError::ShutDown => QueryError::Shutdown,
2910 0 : }
2911 0 : }
2912 : }
2913 :
2914 0 : fn set_tracing_field_shard_id(timeline: &Timeline) {
2915 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
2916 0 : tracing::Span::current().record(
2917 0 : "shard_id",
2918 0 : tracing::field::display(timeline.tenant_shard_id.shard_slug()),
2919 0 : );
2920 0 : debug_assert_current_span_has_tenant_and_timeline_id();
2921 0 : }
2922 :
2923 : struct WaitedForLsn(Lsn);
2924 : impl From<WaitedForLsn> for Lsn {
2925 0 : fn from(WaitedForLsn(lsn): WaitedForLsn) -> Self {
2926 0 : lsn
2927 0 : }
2928 : }
2929 :
2930 : #[cfg(test)]
2931 : mod tests {
2932 : use utils::shard::ShardCount;
2933 :
2934 : use super::*;
2935 :
2936 : #[test]
2937 4 : fn pageservice_cmd_parse() {
2938 4 : let tenant_id = TenantId::generate();
2939 4 : let timeline_id = TimelineId::generate();
2940 4 : let cmd =
2941 4 : PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id} {timeline_id}")).unwrap();
2942 4 : assert_eq!(
2943 4 : cmd,
2944 4 : PageServiceCmd::PageStream(PageStreamCmd {
2945 4 : tenant_id,
2946 4 : timeline_id,
2947 4 : protocol_version: PagestreamProtocolVersion::V2,
2948 4 : })
2949 4 : );
2950 4 : let cmd = PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id}")).unwrap();
2951 4 : assert_eq!(
2952 4 : cmd,
2953 4 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2954 4 : tenant_id,
2955 4 : timeline_id,
2956 4 : lsn: None,
2957 4 : gzip: false,
2958 4 : replica: false
2959 4 : })
2960 4 : );
2961 4 : let cmd =
2962 4 : PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} --gzip")).unwrap();
2963 4 : assert_eq!(
2964 4 : cmd,
2965 4 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2966 4 : tenant_id,
2967 4 : timeline_id,
2968 4 : lsn: None,
2969 4 : gzip: true,
2970 4 : replica: false
2971 4 : })
2972 4 : );
2973 4 : let cmd =
2974 4 : PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} latest")).unwrap();
2975 4 : assert_eq!(
2976 4 : cmd,
2977 4 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2978 4 : tenant_id,
2979 4 : timeline_id,
2980 4 : lsn: None,
2981 4 : gzip: false,
2982 4 : replica: false
2983 4 : })
2984 4 : );
2985 4 : let cmd = PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} 0/16ABCDE"))
2986 4 : .unwrap();
2987 4 : assert_eq!(
2988 4 : cmd,
2989 4 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2990 4 : tenant_id,
2991 4 : timeline_id,
2992 4 : lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
2993 4 : gzip: false,
2994 4 : replica: false
2995 4 : })
2996 4 : );
2997 4 : let cmd = PageServiceCmd::parse(&format!(
2998 4 : "basebackup {tenant_id} {timeline_id} --replica --gzip"
2999 4 : ))
3000 4 : .unwrap();
3001 4 : assert_eq!(
3002 4 : cmd,
3003 4 : PageServiceCmd::BaseBackup(BaseBackupCmd {
3004 4 : tenant_id,
3005 4 : timeline_id,
3006 4 : lsn: None,
3007 4 : gzip: true,
3008 4 : replica: true
3009 4 : })
3010 4 : );
3011 4 : let cmd = PageServiceCmd::parse(&format!(
3012 4 : "basebackup {tenant_id} {timeline_id} 0/16ABCDE --replica --gzip"
3013 4 : ))
3014 4 : .unwrap();
3015 4 : assert_eq!(
3016 4 : cmd,
3017 4 : PageServiceCmd::BaseBackup(BaseBackupCmd {
3018 4 : tenant_id,
3019 4 : timeline_id,
3020 4 : lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
3021 4 : gzip: true,
3022 4 : replica: true
3023 4 : })
3024 4 : );
3025 4 : let cmd = PageServiceCmd::parse(&format!("fullbackup {tenant_id} {timeline_id}")).unwrap();
3026 4 : assert_eq!(
3027 4 : cmd,
3028 4 : PageServiceCmd::FullBackup(FullBackupCmd {
3029 4 : tenant_id,
3030 4 : timeline_id,
3031 4 : lsn: None,
3032 4 : prev_lsn: None
3033 4 : })
3034 4 : );
3035 4 : let cmd = PageServiceCmd::parse(&format!(
3036 4 : "fullbackup {tenant_id} {timeline_id} 0/16ABCDE 0/16ABCDF"
3037 4 : ))
3038 4 : .unwrap();
3039 4 : assert_eq!(
3040 4 : cmd,
3041 4 : PageServiceCmd::FullBackup(FullBackupCmd {
3042 4 : tenant_id,
3043 4 : timeline_id,
3044 4 : lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
3045 4 : prev_lsn: Some(Lsn::from_str("0/16ABCDF").unwrap()),
3046 4 : })
3047 4 : );
3048 4 : let tenant_shard_id = TenantShardId::unsharded(tenant_id);
3049 4 : let cmd = PageServiceCmd::parse(&format!(
3050 4 : "lease lsn {tenant_shard_id} {timeline_id} 0/16ABCDE"
3051 4 : ))
3052 4 : .unwrap();
3053 4 : assert_eq!(
3054 4 : cmd,
3055 4 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
3056 4 : tenant_shard_id,
3057 4 : timeline_id,
3058 4 : lsn: Lsn::from_str("0/16ABCDE").unwrap(),
3059 4 : })
3060 4 : );
3061 4 : let tenant_shard_id = TenantShardId::split(&tenant_shard_id, ShardCount(8))[1];
3062 4 : let cmd = PageServiceCmd::parse(&format!(
3063 4 : "lease lsn {tenant_shard_id} {timeline_id} 0/16ABCDE"
3064 4 : ))
3065 4 : .unwrap();
3066 4 : assert_eq!(
3067 4 : cmd,
3068 4 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
3069 4 : tenant_shard_id,
3070 4 : timeline_id,
3071 4 : lsn: Lsn::from_str("0/16ABCDE").unwrap(),
3072 4 : })
3073 4 : );
3074 4 : let cmd = PageServiceCmd::parse("set a = b").unwrap();
3075 4 : assert_eq!(cmd, PageServiceCmd::Set);
3076 4 : let cmd = PageServiceCmd::parse("SET foo").unwrap();
3077 4 : assert_eq!(cmd, PageServiceCmd::Set);
3078 4 : }
3079 :
3080 : #[test]
3081 4 : fn pageservice_cmd_err_handling() {
3082 4 : let tenant_id = TenantId::generate();
3083 4 : let timeline_id = TimelineId::generate();
3084 4 : let cmd = PageServiceCmd::parse("unknown_command");
3085 4 : assert!(cmd.is_err());
3086 4 : let cmd = PageServiceCmd::parse("pagestream_v2");
3087 4 : assert!(cmd.is_err());
3088 4 : let cmd = PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id}xxx"));
3089 4 : assert!(cmd.is_err());
3090 4 : let cmd = PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id}xxx {timeline_id}xxx"));
3091 4 : assert!(cmd.is_err());
3092 4 : let cmd = PageServiceCmd::parse(&format!(
3093 4 : "basebackup {tenant_id} {timeline_id} --gzip --gzip"
3094 4 : ));
3095 4 : assert!(cmd.is_err());
3096 4 : let cmd = PageServiceCmd::parse(&format!(
3097 4 : "basebackup {tenant_id} {timeline_id} --gzip --unknown"
3098 4 : ));
3099 4 : assert!(cmd.is_err());
3100 4 : let cmd = PageServiceCmd::parse(&format!(
3101 4 : "basebackup {tenant_id} {timeline_id} --gzip 0/16ABCDE"
3102 4 : ));
3103 4 : assert!(cmd.is_err());
3104 4 : let cmd = PageServiceCmd::parse(&format!("lease {tenant_id} {timeline_id} gzip 0/16ABCDE"));
3105 4 : assert!(cmd.is_err());
3106 4 : }
3107 :
3108 : #[test]
3109 4 : fn test_parse_options() {
3110 4 : let (config, has_error) = parse_options(" -c neon.compute_mode=primary ");
3111 4 : assert!(!has_error);
3112 4 : assert_eq!(
3113 4 : config,
3114 4 : vec![("neon.compute_mode".to_string(), "primary".to_string())]
3115 4 : );
3116 :
3117 4 : let (config, has_error) = parse_options(" -c neon.compute_mode=primary -c foo=bar ");
3118 4 : assert!(!has_error);
3119 4 : assert_eq!(
3120 4 : config,
3121 4 : vec![
3122 4 : ("neon.compute_mode".to_string(), "primary".to_string()),
3123 4 : ("foo".to_string(), "bar".to_string()),
3124 4 : ]
3125 4 : );
3126 :
3127 4 : let (config, has_error) = parse_options(" -c neon.compute_mode=primary -cfoo=bar");
3128 4 : assert!(!has_error);
3129 4 : assert_eq!(
3130 4 : config,
3131 4 : vec![
3132 4 : ("neon.compute_mode".to_string(), "primary".to_string()),
3133 4 : ("foo".to_string(), "bar".to_string()),
3134 4 : ]
3135 4 : );
3136 :
3137 4 : let (_, has_error) = parse_options("-c");
3138 4 : assert!(has_error);
3139 :
3140 4 : let (_, has_error) = parse_options("-c foo=bar -c -c");
3141 4 : assert!(has_error);
3142 :
3143 4 : let (_, has_error) = parse_options(" ");
3144 4 : assert!(!has_error);
3145 :
3146 4 : let (_, has_error) = parse_options(" -c neon.compute_mode");
3147 4 : assert!(has_error);
3148 4 : }
3149 : }