LCOV - code coverage report
Current view: top level - pageserver/src - page_service.rs (source / functions)
Test:      09e7485004805bd42b53a0c369170b3228136512.info
Test Date: 2024-11-21 18:36:18

              Coverage    Total    Hit
Lines:        38.5 %      737      284
Functions:    8.9 %       90       8

            Line data    Source code
       1              : //! The Page Service listens for client connections and serves their GetPage@LSN
       2              : //! requests.
       3              : 
       4              : use anyhow::{bail, Context};
       5              : use async_compression::tokio::write::GzipEncoder;
       6              : use async_timer::Timer;
       7              : use bytes::Buf;
       8              : use futures::FutureExt;
       9              : use itertools::Itertools;
      10              : use once_cell::sync::OnceCell;
      11              : use pageserver_api::models::{self, TenantState};
      12              : use pageserver_api::models::{
      13              :     PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
      14              :     PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
      15              :     PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetSlruSegmentRequest,
      16              :     PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest, PagestreamNblocksResponse,
      17              :     PagestreamProtocolVersion,
      18              : };
      19              : use pageserver_api::shard::TenantShardId;
      20              : use postgres_backend::{is_expected_io_error, AuthType, PostgresBackend, QueryError};
      21              : use pq_proto::framed::ConnectionError;
      22              : use pq_proto::FeStartupPacket;
      23              : use pq_proto::{BeMessage, FeMessage, RowDescriptor};
      24              : use std::borrow::Cow;
      25              : use std::io;
      26              : use std::pin::Pin;
      27              : use std::str;
      28              : use std::str::FromStr;
      29              : use std::sync::Arc;
      30              : use std::time::SystemTime;
      31              : use std::time::{Duration, Instant};
      32              : use tokio::io::{AsyncRead, AsyncWrite};
      33              : use tokio::io::{AsyncWriteExt, BufWriter};
      34              : use tokio::task::JoinHandle;
      35              : use tokio_util::sync::CancellationToken;
      36              : use tracing::*;
      37              : use utils::{
      38              :     auth::{Claims, Scope, SwappableJwtAuth},
      39              :     id::{TenantId, TimelineId},
      40              :     lsn::Lsn,
      41              :     simple_rcu::RcuReadGuard,
      42              : };
      43              : 
      44              : use crate::auth::check_permission;
      45              : use crate::basebackup;
      46              : use crate::basebackup::BasebackupError;
      47              : use crate::config::PageServerConf;
      48              : use crate::context::{DownloadBehavior, RequestContext};
      49              : use crate::metrics::{self};
      50              : use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS};
      51              : use crate::pgdatadir_mapping::Version;
      52              : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
      53              : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
      54              : use crate::task_mgr::TaskKind;
      55              : use crate::task_mgr::{self, COMPUTE_REQUEST_RUNTIME};
      56              : use crate::tenant::mgr::ShardSelector;
      57              : use crate::tenant::mgr::TenantManager;
      58              : use crate::tenant::mgr::{GetActiveTenantError, GetTenantError, ShardResolveResult};
      59              : use crate::tenant::timeline::{self, WaitLsnError};
      60              : use crate::tenant::GetTimelineError;
      61              : use crate::tenant::PageReconstructError;
      62              : use crate::tenant::Timeline;
      63              : use pageserver_api::key::rel_block_to_key;
      64              : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
      65              : use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
      66              : use postgres_ffi::BLCKSZ;
      67              : 
       68              : /// How long we may wait for a [`crate::tenant::mgr::TenantSlot::InProgress`] and/or a [`crate::tenant::Tenant`] which
      69              : /// is not yet in state [`TenantState::Active`].
      70              : ///
      71              : /// NB: this is a different value than [`crate::http::routes::ACTIVE_TENANT_TIMEOUT`].
      72              : const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
      73              : 
      74              : ///////////////////////////////////////////////////////////////////////////////
      75              : 
      76              : pub struct Listener {
      77              :     cancel: CancellationToken,
       78              :     /// Cancel the listener task through `cancel` to shut down the listener
       79              :     /// and get a handle on the existing connections.
      80              :     task: JoinHandle<Connections>,
      81              : }
      82              : 
      83              : pub struct Connections {
      84              :     cancel: CancellationToken,
      85              :     tasks: tokio::task::JoinSet<ConnectionHandlerResult>,
      86              : }
      87              : 
      88            0 : pub fn spawn(
      89            0 :     conf: &'static PageServerConf,
      90            0 :     tenant_manager: Arc<TenantManager>,
      91            0 :     pg_auth: Option<Arc<SwappableJwtAuth>>,
      92            0 :     tcp_listener: tokio::net::TcpListener,
      93            0 : ) -> Listener {
      94            0 :     let cancel = CancellationToken::new();
      95            0 :     let libpq_ctx = RequestContext::todo_child(
      96            0 :         TaskKind::LibpqEndpointListener,
       97            0 :         // The listener task shouldn't need to download anything. (We will
       98            0 :         // create a separate sub-context for each connection, with its
       99            0 :         // own download behavior. This context is used only to listen for and
      100            0 :         // accept connections.)
     101            0 :         DownloadBehavior::Error,
     102            0 :     );
     103            0 :     let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
     104            0 :         "libpq listener",
     105            0 :         libpq_listener_main(
     106            0 :             tenant_manager,
     107            0 :             pg_auth,
     108            0 :             tcp_listener,
     109            0 :             conf.pg_auth_type,
     110            0 :             conf.server_side_batch_timeout,
     111            0 :             libpq_ctx,
     112            0 :             cancel.clone(),
     113            0 :         )
     114            0 :         .map(anyhow::Ok),
     115            0 :     ));
     116            0 : 
     117            0 :     Listener { cancel, task }
     118            0 : }
     119              : 
     120              : impl Listener {
     121            0 :     pub async fn stop_accepting(self) -> Connections {
     122            0 :         self.cancel.cancel();
     123            0 :         self.task
     124            0 :             .await
     125            0 :             .expect("unreachable: we wrap the listener task in task_mgr::exit_on_panic_or_error")
     126            0 :     }
     127              : }
     128              : impl Connections {
     129            0 :     pub(crate) async fn shutdown(self) {
     130            0 :         let Self { cancel, mut tasks } = self;
     131            0 :         cancel.cancel();
     132            0 :         while let Some(res) = tasks.join_next().await {
     133            0 :             Self::handle_connection_completion(res);
     134            0 :         }
     135            0 :     }
     136              : 
     137            0 :     fn handle_connection_completion(res: Result<anyhow::Result<()>, tokio::task::JoinError>) {
     138            0 :         match res {
     139            0 :             Ok(Ok(())) => {}
     140            0 :             Ok(Err(e)) => error!("error in page_service connection task: {:?}", e),
     141            0 :             Err(e) => error!("page_service connection task panicked: {:?}", e),
     142              :         }
     143            0 :     }
     144              : }
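                       : 
                       : // A hedged sketch of the intended lifecycle of the types above
                       : // (illustrative only; `conf`, `tenant_manager`, `pg_auth`, and
                       : // `tcp_listener` are assumed to be constructed by the caller):
                       : //
                       : //     let listener = spawn(conf, tenant_manager, pg_auth, tcp_listener);
                       : //     // ... serve page traffic ...
                       : //     let connections = listener.stop_accepting().await; // stop accepting new clients
                       : //     connections.shutdown().await; // cancel and drain the remaining connections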
     145              : 
     146              : ///
     147              : /// Main loop of the page service.
     148              : ///
     149              : /// Listens for connections, and launches a new handler task for each.
     150              : ///
      151              : /// Returns upon cancellation via `listener_cancel`, yielding the set of
      152              : /// still-open connections.
     153              : ///
     154            0 : pub async fn libpq_listener_main(
     155            0 :     tenant_manager: Arc<TenantManager>,
     156            0 :     auth: Option<Arc<SwappableJwtAuth>>,
     157            0 :     listener: tokio::net::TcpListener,
     158            0 :     auth_type: AuthType,
     159            0 :     server_side_batch_timeout: Option<Duration>,
     160            0 :     listener_ctx: RequestContext,
     161            0 :     listener_cancel: CancellationToken,
     162            0 : ) -> Connections {
     163            0 :     let connections_cancel = CancellationToken::new();
     164            0 :     let mut connection_handler_tasks = tokio::task::JoinSet::default();
     165              : 
     166              :     loop {
     167            0 :         let accepted = tokio::select! {
     168              :             biased;
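                       :             // `biased` polls the arms in declaration order: cancellation
                       :             // first, then reaping finished connection tasks, then accepting
                       :             // new connections.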
     169            0 :             _ = listener_cancel.cancelled() => break,
     170            0 :             next = connection_handler_tasks.join_next(), if !connection_handler_tasks.is_empty() => {
      171            0 :                 let res = next.expect("we don't poll while empty");
     172            0 :                 Connections::handle_connection_completion(res);
     173            0 :                 continue;
     174              :             }
     175            0 :             accepted = listener.accept() => accepted,
     176            0 :         };
     177            0 : 
     178            0 :         match accepted {
     179            0 :             Ok((socket, peer_addr)) => {
     180            0 :                 // Connection established. Spawn a new task to handle it.
     181            0 :                 debug!("accepted connection from {}", peer_addr);
     182            0 :                 let local_auth = auth.clone();
     183            0 :                 let connection_ctx = listener_ctx
     184            0 :                     .detached_child(TaskKind::PageRequestHandler, DownloadBehavior::Download);
     185            0 :                 connection_handler_tasks.spawn(page_service_conn_main(
     186            0 :                     tenant_manager.clone(),
     187            0 :                     local_auth,
     188            0 :                     socket,
     189            0 :                     auth_type,
     190            0 :                     server_side_batch_timeout,
     191            0 :                     connection_ctx,
     192            0 :                     connections_cancel.child_token(),
     193            0 :                 ));
     194              :             }
     195            0 :             Err(err) => {
     196            0 :                 // accept() failed. Log the error, and loop back to retry on next connection.
     197            0 :                 error!("accept() failed: {:?}", err);
     198              :             }
     199              :         }
     200              :     }
     201              : 
     202            0 :     debug!("page_service listener loop terminated");
     203              : 
     204            0 :     Connections {
     205            0 :         cancel: connections_cancel,
     206            0 :         tasks: connection_handler_tasks,
     207            0 :     }
     208            0 : }
     209              : 
     210              : type ConnectionHandlerResult = anyhow::Result<()>;
     211              : 
     212            0 : #[instrument(skip_all, fields(peer_addr))]
     213              : async fn page_service_conn_main(
     214              :     tenant_manager: Arc<TenantManager>,
     215              :     auth: Option<Arc<SwappableJwtAuth>>,
     216              :     socket: tokio::net::TcpStream,
     217              :     auth_type: AuthType,
     218              :     server_side_batch_timeout: Option<Duration>,
     219              :     connection_ctx: RequestContext,
     220              :     cancel: CancellationToken,
     221              : ) -> ConnectionHandlerResult {
     222              :     let _guard = LIVE_CONNECTIONS
     223              :         .with_label_values(&["page_service"])
     224              :         .guard();
     225              : 
     226              :     socket
     227              :         .set_nodelay(true)
     228              :         .context("could not set TCP_NODELAY")?;
     229              : 
     230              :     let peer_addr = socket.peer_addr().context("get peer address")?;
     231              :     tracing::Span::current().record("peer_addr", field::display(peer_addr));
     232              : 
      233              :     // set up a read timeout of 10 minutes. the exact value is somewhat arbitrary; requirements:
     234              :     // - long enough for most valid compute connections
     235              :     // - less than infinite to stop us from "leaking" connections to long-gone computes
     236              :     //
     237              :     // no write timeout is used, because the kernel is assumed to error writes after some time.
     238              :     let mut socket = tokio_io_timeout::TimeoutReader::new(socket);
     239              : 
     240              :     let default_timeout_ms = 10 * 60 * 1000; // 10 minutes by default
     241            0 :     let socket_timeout_ms = (|| {
     242            0 :         fail::fail_point!("simulated-bad-compute-connection", |avg_timeout_ms| {
      243              :             // Sample from an exponential distribution to simulate
      244              :             // poor network conditions; in tests, avg_timeout_ms is
      245              :             // expected to be around 15
     246            0 :             if let Some(avg_timeout_ms) = avg_timeout_ms {
     247            0 :                 let avg = avg_timeout_ms.parse::<i64>().unwrap() as f32;
     248            0 :                 let u = rand::random::<f32>();
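                       :                 // Inverse-transform sampling: for U ~ Uniform(0,1),
                       :                 // -avg * ln(1 - U) is exponentially distributed with mean
                       :                 // `avg`, i.e. timeouts averaging avg_timeout_ms milliseconds
                       :                 // (assumed intent of this failpoint).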
      249            0 :                 ((1.0 - u).ln() * (-avg)) as u64
     250              :             } else {
     251            0 :                 default_timeout_ms
     252              :             }
     253            0 :         });
     254            0 :         default_timeout_ms
     255              :     })();
     256              : 
      257              :     // A timeout here does not mean the client died; it can happen if it's just idle for
     258              :     // a while: we will tear down this PageServerHandler and instantiate a new one if/when
     259              :     // they reconnect.
     260              :     socket.set_timeout(Some(std::time::Duration::from_millis(socket_timeout_ms)));
     261              :     let socket = std::pin::pin!(socket);
     262              : 
     263              :     fail::fail_point!("ps::connection-start::pre-login");
     264              : 
     265              :     // XXX: pgbackend.run() should take the connection_ctx,
     266              :     // and create a child per-query context when it invokes process_query.
      267              :     // But it's in a shared crate, so we store connection_ctx inside PageServerHandler
     268              :     // and create the per-query context in process_query ourselves.
     269              :     let mut conn_handler = PageServerHandler::new(
     270              :         tenant_manager,
     271              :         auth,
     272              :         server_side_batch_timeout,
     273              :         connection_ctx,
     274              :         cancel.clone(),
     275              :     );
     276              :     let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
     277              : 
     278              :     match pgbackend.run(&mut conn_handler, &cancel).await {
     279              :         Ok(()) => {
     280              :             // we've been requested to shut down
     281              :             Ok(())
     282              :         }
     283              :         Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
     284              :             if is_expected_io_error(&io_error) {
     285              :                 info!("Postgres client disconnected ({io_error})");
     286              :                 Ok(())
     287              :             } else {
     288              :                 let tenant_id = conn_handler.timeline_handles.tenant_id();
     289              :                 Err(io_error).context(format!(
     290              :                     "Postgres connection error for tenant_id={:?} client at peer_addr={}",
     291              :                     tenant_id, peer_addr
     292              :                 ))
     293              :             }
     294              :         }
     295              :         other => {
     296              :             let tenant_id = conn_handler.timeline_handles.tenant_id();
     297              :             other.context(format!(
     298              :                 "Postgres query error for tenant_id={:?} client peer_addr={}",
     299              :                 tenant_id, peer_addr
     300              :             ))
     301              :         }
     302              :     }
     303              : }
     304              : 
     305              : struct PageServerHandler {
     306              :     auth: Option<Arc<SwappableJwtAuth>>,
     307              :     claims: Option<Claims>,
     308              : 
     309              :     /// The context created for the lifetime of the connection
      310              :     /// serviced by this PageServerHandler.
     311              :     /// For each query received over the connection,
     312              :     /// `process_query` creates a child context from this one.
     313              :     connection_ctx: RequestContext,
     314              : 
     315              :     cancel: CancellationToken,
     316              : 
     317              :     timeline_handles: TimelineHandles,
     318              : 
     319              :     /// See [`PageServerConf::server_side_batch_timeout`]
     320              :     server_side_batch_timeout: Option<Duration>,
     321              : 
     322              :     server_side_batch_timer: Pin<Box<async_timer::timer::Platform>>,
     323              : }
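                       : 
                       : // Request context hierarchy, as wired up in `spawn` and
                       : // `page_service_conn_main` above (a sketch; the per-query child is
                       : // created in `process_query`):
                       : //
                       : //     listener ctx  (TaskKind::LibpqEndpointListener, DownloadBehavior::Error)
                       : //       -> connection ctx  (TaskKind::PageRequestHandler, DownloadBehavior::Download)
                       : //            -> per-query ctx, one per query received on the connection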
     324              : 
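                       : /// A message read from the connection but not yet returned as part of a
                       : /// batch. It is carried over to the next `read_batch_from_connection`
                       : /// call; `started_at` accounts the time already spent waiting against
                       : /// the server-side batch timeout.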
     325              : struct Carry {
     326              :     msg: BatchedFeMessage,
     327              :     started_at: Instant,
     328              : }
     329              : 
     330              : struct TimelineHandles {
     331              :     wrapper: TenantManagerWrapper,
     332              :     /// Note on size: the typical size of this map is 1.  The largest size we expect
     333              :     /// to see is the number of shards divided by the number of pageservers (typically < 2),
      334              :     /// or the ratio used when splitting shards (i.e. how many children are
      335              :     /// created from one parent shard), where a "large" number might be ~8.
     336              :     handles: timeline::handle::Cache<TenantManagerTypes>,
     337              : }
     338              : 
     339              : impl TimelineHandles {
     340            0 :     fn new(tenant_manager: Arc<TenantManager>) -> Self {
     341            0 :         Self {
     342            0 :             wrapper: TenantManagerWrapper {
     343            0 :                 tenant_manager,
     344            0 :                 tenant_id: OnceCell::new(),
     345            0 :             },
     346            0 :             handles: Default::default(),
     347            0 :         }
     348            0 :     }
     349            0 :     async fn get(
     350            0 :         &mut self,
     351            0 :         tenant_id: TenantId,
     352            0 :         timeline_id: TimelineId,
     353            0 :         shard_selector: ShardSelector,
     354            0 :     ) -> Result<timeline::handle::Handle<TenantManagerTypes>, GetActiveTimelineError> {
     355            0 :         if *self.wrapper.tenant_id.get_or_init(|| tenant_id) != tenant_id {
     356            0 :             return Err(GetActiveTimelineError::Tenant(
     357            0 :                 GetActiveTenantError::SwitchedTenant,
     358            0 :             ));
     359            0 :         }
     360            0 :         self.handles
     361            0 :             .get(timeline_id, shard_selector, &self.wrapper)
     362            0 :             .await
     363            0 :             .map_err(|e| match e {
     364            0 :                 timeline::handle::GetError::TenantManager(e) => e,
     365              :                 timeline::handle::GetError::TimelineGateClosed => {
     366            0 :                     trace!("timeline gate closed");
     367            0 :                     GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown)
     368              :                 }
     369              :                 timeline::handle::GetError::PerTimelineStateShutDown => {
     370            0 :                     trace!("per-timeline state shut down");
     371            0 :                     GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown)
     372              :                 }
     373            0 :             })
     374            0 :     }
     375              : 
     376            0 :     fn tenant_id(&self) -> Option<TenantId> {
     377            0 :         self.wrapper.tenant_id.get().copied()
     378            0 :     }
     379              : }
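                       : 
                       : // Illustrative behavior of the tenant pinning in `get` above
                       : // (hypothetical ids and selector):
                       : //
                       : //     handles.get(tenant_a, timeline, selector).await?; // pins tenant_a
                       : //     handles.get(tenant_b, timeline, selector).await;
                       : //     // -> Err(GetActiveTimelineError::Tenant(GetActiveTenantError::SwitchedTenant))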
     380              : 
     381              : pub(crate) struct TenantManagerWrapper {
     382              :     tenant_manager: Arc<TenantManager>,
     383              :     // We do not support switching tenant_id on a connection at this point.
      384              :     // We can add support for this later if needed without changing
     385              :     // the protocol.
     386              :     tenant_id: once_cell::sync::OnceCell<TenantId>,
     387              : }
     388              : 
     389              : #[derive(Debug)]
     390              : pub(crate) struct TenantManagerTypes;
     391              : 
     392              : impl timeline::handle::Types for TenantManagerTypes {
     393              :     type TenantManagerError = GetActiveTimelineError;
     394              :     type TenantManager = TenantManagerWrapper;
     395              :     type Timeline = Arc<Timeline>;
     396              : }
     397              : 
     398              : impl timeline::handle::ArcTimeline<TenantManagerTypes> for Arc<Timeline> {
     399            0 :     fn gate(&self) -> &utils::sync::gate::Gate {
     400            0 :         &self.gate
     401            0 :     }
     402              : 
     403            0 :     fn shard_timeline_id(&self) -> timeline::handle::ShardTimelineId {
     404            0 :         Timeline::shard_timeline_id(self)
     405            0 :     }
     406              : 
     407            0 :     fn per_timeline_state(&self) -> &timeline::handle::PerTimelineState<TenantManagerTypes> {
     408            0 :         &self.handles
     409            0 :     }
     410              : 
     411            0 :     fn get_shard_identity(&self) -> &pageserver_api::shard::ShardIdentity {
     412            0 :         Timeline::get_shard_identity(self)
     413            0 :     }
     414              : }
     415              : 
     416              : impl timeline::handle::TenantManager<TenantManagerTypes> for TenantManagerWrapper {
     417            0 :     async fn resolve(
     418            0 :         &self,
     419            0 :         timeline_id: TimelineId,
     420            0 :         shard_selector: ShardSelector,
     421            0 :     ) -> Result<Arc<Timeline>, GetActiveTimelineError> {
     422            0 :         let tenant_id = self.tenant_id.get().expect("we set this in get()");
     423            0 :         let timeout = ACTIVE_TENANT_TIMEOUT;
     424            0 :         let wait_start = Instant::now();
     425            0 :         let deadline = wait_start + timeout;
     426            0 :         let tenant_shard = loop {
     427            0 :             let resolved = self
     428            0 :                 .tenant_manager
     429            0 :                 .resolve_attached_shard(tenant_id, shard_selector);
     430            0 :             match resolved {
     431            0 :                 ShardResolveResult::Found(tenant_shard) => break tenant_shard,
     432              :                 ShardResolveResult::NotFound => {
     433            0 :                     return Err(GetActiveTimelineError::Tenant(
     434            0 :                         GetActiveTenantError::NotFound(GetTenantError::NotFound(*tenant_id)),
     435            0 :                     ));
     436              :                 }
     437            0 :                 ShardResolveResult::InProgress(barrier) => {
     438            0 :                     // We can't authoritatively answer right now: wait for InProgress state
     439            0 :                     // to end, then try again
     440            0 :                     tokio::select! {
     441            0 :                         _  = barrier.wait() => {
     442            0 :                             // The barrier completed: proceed around the loop to try looking up again
     443            0 :                         },
     444            0 :                         _ = tokio::time::sleep(deadline.duration_since(Instant::now())) => {
     445            0 :                             return Err(GetActiveTimelineError::Tenant(GetActiveTenantError::WaitForActiveTimeout {
     446            0 :                                 latest_state: None,
     447            0 :                                 wait_time: timeout,
     448            0 :                             }));
     449              :                         }
     450              :                     }
     451              :                 }
     452              :             };
     453              :         };
     454              : 
     455            0 :         tracing::debug!("Waiting for tenant to enter active state...");
     456            0 :         tenant_shard
     457            0 :             .wait_to_become_active(deadline.duration_since(Instant::now()))
     458            0 :             .await
     459            0 :             .map_err(GetActiveTimelineError::Tenant)?;
     460              : 
     461            0 :         let timeline = tenant_shard
     462            0 :             .get_timeline(timeline_id, true)
     463            0 :             .map_err(GetActiveTimelineError::Timeline)?;
     464            0 :         set_tracing_field_shard_id(&timeline);
     465            0 :         Ok(timeline)
     466            0 :     }
     467              : }
     468              : 
     469            0 : #[derive(thiserror::Error, Debug)]
     470              : enum PageStreamError {
     471              :     /// We encountered an error that should prompt the client to reconnect:
     472              :     /// in practice this means we drop the connection without sending a response.
     473              :     #[error("Reconnect required: {0}")]
     474              :     Reconnect(Cow<'static, str>),
     475              : 
     476              :     /// We were instructed to shutdown while processing the query
     477              :     #[error("Shutting down")]
     478              :     Shutdown,
     479              : 
     480              :     /// Something went wrong reading a page: this likely indicates a pageserver bug
     481              :     #[error("Read error")]
     482              :     Read(#[source] PageReconstructError),
     483              : 
     484              :     /// Ran out of time waiting for an LSN
     485              :     #[error("LSN timeout: {0}")]
     486              :     LsnTimeout(WaitLsnError),
     487              : 
     488              :     /// The entity required to serve the request (tenant or timeline) is not found,
     489              :     /// or is not found in a suitable state to serve a request.
     490              :     #[error("Not found: {0}")]
     491              :     NotFound(Cow<'static, str>),
     492              : 
     493              :     /// Request asked for something that doesn't make sense, like an invalid LSN
     494              :     #[error("Bad request: {0}")]
     495              :     BadRequest(Cow<'static, str>),
     496              : }
     497              : 
     498              : impl From<PageReconstructError> for PageStreamError {
     499            0 :     fn from(value: PageReconstructError) -> Self {
     500            0 :         match value {
     501            0 :             PageReconstructError::Cancelled => Self::Shutdown,
     502            0 :             e => Self::Read(e),
     503              :         }
     504            0 :     }
     505              : }
     506              : 
     507              : impl From<GetActiveTimelineError> for PageStreamError {
     508            0 :     fn from(value: GetActiveTimelineError) -> Self {
     509            0 :         match value {
     510              :             GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled)
     511              :             | GetActiveTimelineError::Tenant(GetActiveTenantError::WillNotBecomeActive(
     512              :                 TenantState::Stopping { .. },
     513              :             ))
     514            0 :             | GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown) => Self::Shutdown,
     515            0 :             GetActiveTimelineError::Tenant(e) => Self::NotFound(format!("{e}").into()),
     516            0 :             GetActiveTimelineError::Timeline(e) => Self::NotFound(format!("{e}").into()),
     517              :         }
     518            0 :     }
     519              : }
     520              : 
     521              : impl From<WaitLsnError> for PageStreamError {
     522            0 :     fn from(value: WaitLsnError) -> Self {
     523            0 :         match value {
     524            0 :             e @ WaitLsnError::Timeout(_) => Self::LsnTimeout(e),
     525            0 :             WaitLsnError::Shutdown => Self::Shutdown,
     526            0 :             e @ WaitLsnError::BadState { .. } => Self::Reconnect(format!("{e}").into()),
     527              :         }
     528            0 :     }
     529              : }
     530              : 
     531              : impl From<WaitLsnError> for QueryError {
     532            0 :     fn from(value: WaitLsnError) -> Self {
     533            0 :         match value {
     534            0 :             e @ WaitLsnError::Timeout(_) => Self::Other(anyhow::Error::new(e)),
     535            0 :             WaitLsnError::Shutdown => Self::Shutdown,
     536            0 :             WaitLsnError::BadState { .. } => Self::Reconnect,
     537              :         }
     538            0 :     }
     539              : }
     540              : 
     541              : enum BatchedFeMessage {
     542              :     Exists {
     543              :         span: Span,
     544              :         req: models::PagestreamExistsRequest,
     545              :     },
     546              :     Nblocks {
     547              :         span: Span,
     548              :         req: models::PagestreamNblocksRequest,
     549              :     },
     550              :     GetPage {
     551              :         span: Span,
     552              :         shard: timeline::handle::Handle<TenantManagerTypes>,
     553              :         effective_request_lsn: Lsn,
     554              :         pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
     555              :     },
     556              :     DbSize {
     557              :         span: Span,
     558              :         req: models::PagestreamDbSizeRequest,
     559              :     },
     560              :     GetSlruSegment {
     561              :         span: Span,
     562              :         req: models::PagestreamGetSlruSegmentRequest,
     563              :     },
     564              :     RespondError {
     565              :         span: Span,
     566              :         error: PageStreamError,
     567              :     },
     568              : }
     569              : 
     570              : enum BatchOrEof {
     571              :     /// In the common case, this has one entry.
     572              :     /// At most, it has two entries: the first is the leftover batch, the second is an error.
     573              :     Batch(smallvec::SmallVec<[BatchedFeMessage; 1]>),
     574              :     Eof,
     575              : }
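                       : 
                       : // Illustrative return shapes (field contents elided):
                       : //
                       : //     BatchOrEof::Batch([GetPage { .. }])                      // the common case
                       : //     BatchOrEof::Batch([GetPage { .. }, RespondError { .. }]) // leftover batch, then an error
                       : //     BatchOrEof::Eof                                           // client disconnected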
     576              : 
     577              : impl PageServerHandler {
     578            0 :     pub fn new(
     579            0 :         tenant_manager: Arc<TenantManager>,
     580            0 :         auth: Option<Arc<SwappableJwtAuth>>,
     581            0 :         server_side_batch_timeout: Option<Duration>,
     582            0 :         connection_ctx: RequestContext,
     583            0 :         cancel: CancellationToken,
     584            0 :     ) -> Self {
     585            0 :         PageServerHandler {
     586            0 :             auth,
     587            0 :             claims: None,
     588            0 :             connection_ctx,
     589            0 :             timeline_handles: TimelineHandles::new(tenant_manager),
     590            0 :             cancel,
     591            0 :             server_side_batch_timeout,
     592            0 :             server_side_batch_timer: Box::pin(async_timer::new_timer(Duration::from_secs(999))), // reset each iteration
     593            0 :         }
     594            0 :     }
     595              : 
      596              :     /// This function always respects cancellation of any timeline in [`Self::timeline_handles`].  Pass in
      597              :     /// a cancellation token at the next scope up (such as a tenant cancellation token) to ensure we respect
      598              :     /// cancellation if there aren't any timelines in the cache.
      599              :     ///
      600              :     /// If calling from a function that doesn't use the [`Self::timeline_handles`] cache, then pass in the
      601              :     /// timeline cancellation token.
     602            0 :     async fn flush_cancellable<IO>(
     603            0 :         &self,
     604            0 :         pgb: &mut PostgresBackend<IO>,
     605            0 :         cancel: &CancellationToken,
     606            0 :     ) -> Result<(), QueryError>
     607            0 :     where
     608            0 :         IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
     609            0 :     {
     610            0 :         tokio::select!(
     611            0 :             flush_r = pgb.flush() => {
     612            0 :                 Ok(flush_r?)
     613              :             },
     614            0 :             _ = cancel.cancelled() => {
     615            0 :                 Err(QueryError::Shutdown)
     616              :             }
     617              :         )
     618            0 :     }
     619              : 
     620            0 :     #[instrument(skip_all, level = tracing::Level::TRACE)]
     621              :     async fn read_batch_from_connection<IO>(
     622              :         &mut self,
     623              :         pgb: &mut PostgresBackend<IO>,
     624              :         tenant_id: &TenantId,
     625              :         timeline_id: &TimelineId,
     626              :         maybe_carry: &mut Option<Carry>,
     627              :         ctx: &RequestContext,
     628              :     ) -> Result<BatchOrEof, QueryError>
     629              :     where
     630              :         IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
     631              :     {
     632              :         debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
     633              : 
     634              :         let mut batching_deadline_storage = None; // TODO: can this just be an unsync once_cell?
     635              : 
     636              :         loop {
     637              :             // Create a future that will become ready when we need to stop batching.
     638              :             use futures::future::Either;
     639              :             let batching_deadline = match (
     640              :                 &*maybe_carry as &Option<Carry>,
     641              :                 &mut batching_deadline_storage,
     642              :             ) {
     643              :                 (None, None) => Either::Left(futures::future::pending()), // there's no deadline before we have something batched
     644              :                 (None, Some(_)) => unreachable!(),
     645              :                 (Some(_), Some(fut)) => Either::Right(fut), // below arm already ran
     646              :                 (Some(carry), None) => {
     647              :                     match self.server_side_batch_timeout {
     648              :                         None => {
     649              :                             return Ok(BatchOrEof::Batch(smallvec::smallvec![
     650              :                                 maybe_carry
     651              :                                     .take()
     652              :                                     .expect("we already checked it's Some")
     653              :                                     .msg
     654              :                             ]))
     655              :                         }
     656              :                         Some(batch_timeout) => {
     657              :                             // Take into consideration the time the carry spent waiting.
     658              :                             let batch_timeout =
     659              :                                 batch_timeout.saturating_sub(carry.started_at.elapsed());
     660              :                             if batch_timeout.is_zero() {
     661              :                                 // the timer doesn't support restarting with zero duration
     662              :                                 return Ok(BatchOrEof::Batch(smallvec::smallvec![
     663              :                                     maybe_carry
     664              :                                         .take()
     665              :                                         .expect("we already checked it's Some")
     666              :                                         .msg
     667              :                                 ]));
     668              :                             } else {
     669              :                                 self.server_side_batch_timer.restart(batch_timeout);
     670              :                                 batching_deadline_storage = Some(&mut self.server_side_batch_timer);
     671              :                                 Either::Right(
     672              :                                     batching_deadline_storage.as_mut().expect("we just set it"),
     673              :                                 )
     674              :                             }
     675              :                         }
     676              :                     }
     677              :                 }
     678              :             };
     679              :             let msg = tokio::select! {
     680              :                 biased;
     681              :                 _ = self.cancel.cancelled() => {
     682              :                     return Err(QueryError::Shutdown)
     683              :                 }
     684              :                 _ = batching_deadline => {
     685              :                     return Ok(BatchOrEof::Batch(smallvec::smallvec![maybe_carry.take().expect("per construction of batching_deadline").msg]));
     686              :                 }
     687              :                 msg = pgb.read_message() => { msg }
     688              :             };
     689              : 
     690              :             let msg_start = Instant::now();
     691              : 
      692              :             // The rest of this loop body tries to batch `msg` into `batch`.
      693              :             // If we can add msg to the current batch, we continue into the next loop iteration.
      694              :             // If we can't, we carry `msg` over to the next call.
     695              : 
     696              :             let copy_data_bytes = match msg? {
     697              :                 Some(FeMessage::CopyData(bytes)) => bytes,
     698              :                 Some(FeMessage::Terminate) => {
     699              :                     return Ok(BatchOrEof::Eof);
     700              :                 }
     701              :                 Some(m) => {
     702              :                     return Err(QueryError::Other(anyhow::anyhow!(
     703              :                         "unexpected message: {m:?} during COPY"
     704              :                     )));
     705              :                 }
     706              :                 None => {
     707              :                     return Ok(BatchOrEof::Eof);
     708              :                 } // client disconnected
     709              :             };
     710              :             trace!("query: {copy_data_bytes:?}");
     711              : 
     712              :             fail::fail_point!("ps::handle-pagerequest-message");
     713              : 
     714              :             // parse request
     715              :             let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
     716              : 
     717              :             let this_msg = match neon_fe_msg {
     718              :                 PagestreamFeMessage::Exists(req) => BatchedFeMessage::Exists {
     719              :                     span: tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn),
     720              :                     req,
     721              :                 },
     722              :                 PagestreamFeMessage::Nblocks(req) => BatchedFeMessage::Nblocks {
     723              :                     span: tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn),
     724              :                     req,
     725              :                 },
     726              :                 PagestreamFeMessage::DbSize(req) => BatchedFeMessage::DbSize {
     727              :                     span: tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn),
     728              :                     req,
     729              :                 },
     730              :                 PagestreamFeMessage::GetSlruSegment(req) => BatchedFeMessage::GetSlruSegment {
     731              :                     span: tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn),
     732              :                     req,
     733              :                 },
     734              :                 PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
     735              :                     request_lsn,
     736              :                     not_modified_since,
     737              :                     rel,
     738              :                     blkno,
     739              :                 }) => {
     740              :                     // shard_id is filled in by the handler
     741              :                     let span = tracing::info_span!(
     742              :                         "handle_get_page_at_lsn_request_batched",
     743              :                         %tenant_id, %timeline_id, shard_id = tracing::field::Empty, req_lsn = %request_lsn,
     744              :                         batch_size = tracing::field::Empty, batch_id = tracing::field::Empty
     745              :                     );
     746              : 
     747              :                     macro_rules! current_batch_and_error {
     748              :                         ($error:expr) => {{
     749              :                             let error = BatchedFeMessage::RespondError {
     750              :                                 span,
     751              :                                 error: $error,
     752              :                             };
     753              :                             let batch_and_error = match maybe_carry.take() {
     754              :                                 Some(carry) => smallvec::smallvec![carry.msg, error],
     755              :                                 None => smallvec::smallvec![error],
     756              :                             };
     757              :                             Ok(BatchOrEof::Batch(batch_and_error))
     758              :                         }};
     759              :                     }
     760              : 
     761              :                     let key = rel_block_to_key(rel, blkno);
     762              :                     let shard = match self
     763              :                         .timeline_handles
     764              :                         .get(*tenant_id, *timeline_id, ShardSelector::Page(key))
     765              :                         .instrument(span.clone())
     766              :                         .await
     767              :                     {
     768              :                         Ok(tl) => tl,
     769              :                         Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
     770              :                             // We already know this tenant exists in general, because we resolved it at
     771              :                             // start of connection.  Getting a NotFound here indicates that the shard containing
     772              :                             // the requested page is not present on this node: the client's knowledge of shard->pageserver
     773              :                             // mapping is out of date.
     774              :                             //
      775              :                             // Closing the connection by returning `PageStreamError::Reconnect` has the side effect of rate-limiting the above message, via the
      776              :                             // client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
     777              :                             // and talk to a different pageserver.
     778              :                             return current_batch_and_error!(PageStreamError::Reconnect(
     779              :                                 "getpage@lsn request routed to wrong shard".into()
     780              :                             ));
     781              :                         }
     782              :                         Err(e) => {
     783              :                             return current_batch_and_error!(e.into());
     784              :                         }
     785              :                     };
     786              :                     let effective_request_lsn = match Self::wait_or_get_last_lsn(
     787              :                         &shard,
     788              :                         request_lsn,
     789              :                         not_modified_since,
     790              :                         &shard.get_latest_gc_cutoff_lsn(),
     791              :                         ctx,
     792              :                     )
     793              :                     // TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait
     794              :                     .await
     795              :                     {
     796              :                         Ok(lsn) => lsn,
     797              :                         Err(e) => {
     798              :                             return current_batch_and_error!(e);
     799              :                         }
     800              :                     };
     801              :                     BatchedFeMessage::GetPage {
     802              :                         span,
     803              :                         shard,
     804              :                         effective_request_lsn,
     805              :                         pages: smallvec::smallvec![(rel, blkno)],
     806              :                     }
     807              :                 }
     808              :             };
     809              : 
     810              :             //
     811              :             // batch
     812              :             //
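                       :             // Merge rules, as implemented below: a GetPage request joins
                       :             // the carried batch only if the batch is still under
                       :             // Timeline::MAX_GET_VECTORED_KEYS pages, targets the same
                       :             // tenant shard and timeline, and has the same effective
                       :             // request LSN; anything else flushes the carry.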
     813              :             match (maybe_carry.as_mut(), this_msg) {
     814              :                 (None, this_msg) => {
     815              :                     *maybe_carry = Some(Carry { msg: this_msg, started_at: msg_start });
     816              :                 }
     817              :                 (
     818              :                     Some(Carry { msg: BatchedFeMessage::GetPage {
     819              :                         span: _,
     820              :                         shard: accum_shard,
     821              :                         pages: ref mut accum_pages,
     822              :                         effective_request_lsn: accum_lsn,
     823              :                     }, started_at: _}),
     824              :                     BatchedFeMessage::GetPage {
     825              :                         span: _,
     826              :                         shard: this_shard,
     827              :                         pages: this_pages,
     828              :                         effective_request_lsn: this_lsn,
     829              :                     },
     830            0 :                 ) if async {
     831            0 :                     assert_eq!(this_pages.len(), 1);
     832            0 :                     if accum_pages.len() >= Timeline::MAX_GET_VECTORED_KEYS as usize {
     833            0 :                         trace!(%accum_lsn, %this_lsn, "stopping batching because of batch size");
     834            0 :                         assert_eq!(accum_pages.len(), Timeline::MAX_GET_VECTORED_KEYS as usize);
     835            0 :                         return false;
     836            0 :                     }
     837            0 :                     if (accum_shard.tenant_shard_id, accum_shard.timeline_id)
     838            0 :                         != (this_shard.tenant_shard_id, this_shard.timeline_id)
     839              :                     {
     840            0 :                         trace!(%accum_lsn, %this_lsn, "stopping batching because timeline object mismatch");
      841              :                         // TODO: we _could_ batch & execute each shard separately (and in parallel).
     842              :                         // But the current logic for keeping responses in order does not support that.
     843            0 :                         return false;
     844            0 :                     }
      845            0 :                     // the vectored get currently only supports a single LSN, so we stop
      846            0 :                     // batching as soon as the effective request_lsn changes
     847            0 :                     if *accum_lsn != this_lsn {
     848            0 :                         trace!(%accum_lsn, %this_lsn, "stopping batching because LSN changed");
     849            0 :                         return false;
     850            0 :                     }
     851            0 :                     true
     852            0 :                 }
     853              :                 .await =>
     854              :                 {
     855              :                     // ok to batch
     856              :                     accum_pages.extend(this_pages);
     857              :                 }
     858              :                 (Some(carry), this_msg) => {
     859              :                     // by default, don't continue batching
     860              :                     let carry = std::mem::replace(carry,
     861              :                         Carry {
     862              :                             msg: this_msg,
     863              :                             started_at: msg_start,
     864              :                         });
     865              :                     return Ok(BatchOrEof::Batch(smallvec::smallvec![carry.msg]));
     866              :                 }
     867              :             }
     868              :         }
     869              :     }
     870              : 
     871              :     /// Pagestream sub-protocol handler.
     872              :     ///
     873              :     /// It is a simple request-response protocol inside a COPYBOTH session.
     874              :     ///
     875              :     /// # Coding Discipline
     876              :     ///
     877              :     /// Coding discipline within this function: all interaction with the `pgb` connection
     878              :     /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
      879              :     /// This is so that we can shut down page_service quickly.
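                       :     ///
                       :     /// A hedged sketch of one exchange (message types from
                       :     /// `pageserver_api::models`; wire framing elided):
                       :     ///
                       :     ///   client -> CopyData(PagestreamFeMessage::GetPage { rel, blkno, .. })
                       :     ///   server -> CopyData(PagestreamBeMessage, carrying the page image or a
                       :     ///             PagestreamErrorResponse on failure)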
     880            0 :     #[instrument(skip_all)]
     881              :     async fn handle_pagerequests<IO>(
     882              :         &mut self,
     883              :         pgb: &mut PostgresBackend<IO>,
     884              :         tenant_id: TenantId,
     885              :         timeline_id: TimelineId,
     886              :         _protocol_version: PagestreamProtocolVersion,
     887              :         ctx: RequestContext,
     888              :     ) -> Result<(), QueryError>
     889              :     where
     890              :         IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
     891              :     {
     892              :         debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
     893              : 
     894              :         // switch client to COPYBOTH
     895              :         pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
     896              :         tokio::select! {
     897              :             biased;
     898              :             _ = self.cancel.cancelled() => {
     899              :                 return Err(QueryError::Shutdown)
     900              :             }
     901              :             res = pgb.flush() => {
     902              :                 res?;
     903              :             }
     904              :         }
     905              : 
     906              :         let mut carry: Option<Carry> = None;
     907              : 
     908              :         loop {
     909              :             let maybe_batched = self
     910              :                 .read_batch_from_connection(pgb, &tenant_id, &timeline_id, &mut carry, &ctx)
     911              :                 .await?;
     912              :             let batched = match maybe_batched {
     913              :                 BatchOrEof::Batch(b) => b,
     914              :                 BatchOrEof::Eof => {
     915              :                     break;
     916              :                 }
     917              :             };
     918              : 
     919              :             for batch in batched {
     920              :                 // invoke handler function
     921              :                 let (handler_results, span): (
     922              :                     Vec<Result<PagestreamBeMessage, PageStreamError>>,
     923              :                     _,
     924              :                 ) = match batch {
     925              :                     BatchedFeMessage::Exists { span, req } => {
     926              :                         fail::fail_point!("ps::handle-pagerequest-message::exists");
     927              :                         (
     928              :                             vec![
     929              :                                 self.handle_get_rel_exists_request(
     930              :                                     tenant_id,
     931              :                                     timeline_id,
     932              :                                     &req,
     933              :                                     &ctx,
     934              :                                 )
     935              :                                 .instrument(span.clone())
     936              :                                 .await,
     937              :                             ],
     938              :                             span,
     939              :                         )
     940              :                     }
     941              :                     BatchedFeMessage::Nblocks { span, req } => {
     942              :                         fail::fail_point!("ps::handle-pagerequest-message::nblocks");
     943              :                         (
     944              :                             vec![
     945              :                                 self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx)
     946              :                                     .instrument(span.clone())
     947              :                                     .await,
     948              :                             ],
     949              :                             span,
     950              :                         )
     951              :                     }
     952              :                     BatchedFeMessage::GetPage {
     953              :                         span,
     954              :                         shard,
     955              :                         effective_request_lsn,
     956              :                         pages,
     957              :                     } => {
     958              :                         fail::fail_point!("ps::handle-pagerequest-message::getpage");
     959              :                         (
     960              :                             {
     961              :                                 let npages = pages.len();
     962              :                                 trace!(npages, "handling getpage request");
     963              :                                 let res = self
     964              :                                     .handle_get_page_at_lsn_request_batched(
     965              :                                         &shard,
     966              :                                         effective_request_lsn,
     967              :                                         pages,
     968              :                                         &ctx,
     969              :                                     )
     970              :                                     .instrument(span.clone())
     971              :                                     .await;
     972              :                                 assert_eq!(res.len(), npages);
     973              :                                 res
     974              :                             },
     975              :                             span,
     976              :                         )
     977              :                     }
     978              :                     BatchedFeMessage::DbSize { span, req } => {
     979              :                         fail::fail_point!("ps::handle-pagerequest-message::dbsize");
     980              :                         (
     981              :                             vec![
     982              :                                 self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx)
     983              :                                     .instrument(span.clone())
     984              :                                     .await,
     985              :                             ],
     986              :                             span,
     987              :                         )
     988              :                     }
     989              :                     BatchedFeMessage::GetSlruSegment { span, req } => {
     990              :                         fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
     991              :                         (
     992              :                             vec![
     993              :                                 self.handle_get_slru_segment_request(
     994              :                                     tenant_id,
     995              :                                     timeline_id,
     996              :                                     &req,
     997              :                                     &ctx,
     998              :                                 )
     999              :                                 .instrument(span.clone())
    1000              :                                 .await,
    1001              :                             ],
    1002              :                             span,
    1003              :                         )
    1004              :                     }
    1005              :                     BatchedFeMessage::RespondError { span, error } => {
    1006              :                         // We've already decided to respond with an error, so we don't need to
    1007              :                         // call the handler.
    1008              :                         (vec![Err(error)], span)
    1009              :                     }
    1010              :                 };
    1011              : 
    1012              :                 // Map handler result to protocol behavior.
    1013              :                 // Some handler errors cause exit from pagestream protocol.
    1014              :                 // Other handler errors are sent back as an error message and we stay in pagestream protocol.
    1015              :                 for handler_result in handler_results {
    1016              :                     let response_msg = match handler_result {
    1017              :                         Err(e) => match &e {
    1018              :                             PageStreamError::Shutdown => {
    1019              :                                 // If we fail to fulfil a request during shutdown, which may be _because_ of
    1020              :                                 // shutdown, then do not send the error to the client.  Instead just drop the
    1021              :                                 // connection.
    1022            0 :                                 span.in_scope(|| info!("dropping connection due to shutdown"));
    1023              :                                 return Err(QueryError::Shutdown);
    1024              :                             }
    1025              :                             PageStreamError::Reconnect(reason) => {
    1026            0 :                                 span.in_scope(|| info!("handler requested reconnect: {reason}"));
    1027              :                                 return Err(QueryError::Reconnect);
    1028              :                             }
    1029              :                             PageStreamError::Read(_)
    1030              :                             | PageStreamError::LsnTimeout(_)
    1031              :                             | PageStreamError::NotFound(_)
    1032              :                             | PageStreamError::BadRequest(_) => {
     1033              :                             // print all the details to the log with {:#}, but for the client the
    1034              :                                 // error message is enough.  Do not log if shutting down, as the anyhow::Error
    1035              :                                 // here includes cancellation which is not an error.
    1036              :                                 let full = utils::error::report_compact_sources(&e);
    1037            0 :                                 span.in_scope(|| {
    1038            0 :                                     error!("error reading relation or page version: {full:#}")
    1039            0 :                                 });
    1040              :                                 PagestreamBeMessage::Error(PagestreamErrorResponse {
    1041              :                                     message: e.to_string(),
    1042              :                                 })
    1043              :                             }
    1044              :                         },
    1045              :                         Ok(response_msg) => response_msg,
    1046              :                     };
    1047              : 
    1048              :                     // marshal & transmit response message
    1049              :                     pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
    1050              :                 }
    1051              :                 tokio::select! {
    1052              :                     biased;
    1053              :                     _ = self.cancel.cancelled() => {
    1054              :                         // We were requested to shut down.
    1055              :                         info!("shutdown request received in page handler");
    1056              :                         return Err(QueryError::Shutdown)
    1057              :                     }
    1058              :                     res = pgb.flush() => {
    1059              :                         res?;
    1060              :                     }
    1061              :                 }
    1062              :             }
    1063              :         }
    1064              :         Ok(())
    1065              :     }
    1066              : 
    1067              :     /// Helper function to handle the LSN from client request.
    1068              :     ///
    1069              :     /// Each GetPage (and Exists and Nblocks) request includes information about
    1070              :     /// which version of the page is being requested. The primary compute node
    1071              :     /// will always request the latest page version, by setting 'request_lsn' to
    1072              :     /// the last inserted or flushed WAL position, while a standby will request
    1073              :     /// a version at the LSN that it's currently caught up to.
    1074              :     ///
    1075              :     /// In either case, if the page server hasn't received the WAL up to the
    1076              :     /// requested LSN yet, we will wait for it to arrive. The return value is
    1077              :     /// the LSN that should be used to look up the page versions.
    1078              :     ///
    1079              :     /// In addition to the request LSN, each request carries another LSN,
    1080              :     /// 'not_modified_since', which is a hint to the pageserver that the client
    1081              :     /// knows that the page has not been modified between 'not_modified_since'
    1082              :     /// and the request LSN. This allows skipping the wait, as long as the WAL
    1083              :     /// up to 'not_modified_since' has arrived. If the client doesn't have any
    1084              :     /// information about when the page was modified, it will use
     1085              :     /// not_modified_since == request_lsn. If the client lies and sends a too-low
     1086              :     /// not_modified_since such that there are in fact later page versions, the
    1087              :     /// behavior is undefined: the pageserver may return any of the page versions
    1088              :     /// or an error.
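                       :     ///
                       :     /// For illustration (hypothetical values): a primary whose last flushed WAL
                       :     /// position is 0/2000, and which knows the page was last modified at 0/1000,
                       :     /// would send request_lsn = 0/2000 and not_modified_since = 0/1000. The
                       :     /// pageserver can then answer as soon as WAL up to 0/1000 has arrived,
                       :     /// instead of waiting for 0/2000.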
    1089            0 :     async fn wait_or_get_last_lsn(
    1090            0 :         timeline: &Timeline,
    1091            0 :         request_lsn: Lsn,
    1092            0 :         not_modified_since: Lsn,
    1093            0 :         latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
    1094            0 :         ctx: &RequestContext,
    1095            0 :     ) -> Result<Lsn, PageStreamError> {
    1096            0 :         let last_record_lsn = timeline.get_last_record_lsn();
    1097            0 : 
    1098            0 :         // Sanity check the request
    1099            0 :         if request_lsn < not_modified_since {
    1100            0 :             return Err(PageStreamError::BadRequest(
    1101            0 :                 format!(
    1102            0 :                     "invalid request with request LSN {} and not_modified_since {}",
    1103            0 :                     request_lsn, not_modified_since,
    1104            0 :                 )
    1105            0 :                 .into(),
    1106            0 :             ));
    1107            0 :         }
    1108            0 : 
    1109            0 :         if request_lsn < **latest_gc_cutoff_lsn {
    1110            0 :             let gc_info = &timeline.gc_info.read().unwrap();
    1111            0 :             if !gc_info.leases.contains_key(&request_lsn) {
    1112              :                 // The requested LSN is below gc cutoff and is not guarded by a lease.
    1113              : 
    1114              :                 // Check explicitly for INVALID just to get a less scary error message if the
    1115              :                 // request is obviously bogus
    1116            0 :                 return Err(if request_lsn == Lsn::INVALID {
    1117            0 :                     PageStreamError::BadRequest("invalid LSN(0) in request".into())
    1118              :                 } else {
    1119            0 :                     PageStreamError::BadRequest(format!(
    1120            0 :                         "tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
    1121            0 :                         request_lsn, **latest_gc_cutoff_lsn
    1122            0 :                     ).into())
    1123              :                 });
    1124            0 :             }
    1125            0 :         }
    1126              : 
    1127              :         // Wait for WAL up to 'not_modified_since' to arrive, if necessary
    1128            0 :         if not_modified_since > last_record_lsn {
    1129            0 :             timeline
    1130            0 :                 .wait_lsn(
    1131            0 :                     not_modified_since,
    1132            0 :                     crate::tenant::timeline::WaitLsnWaiter::PageService,
    1133            0 :                     ctx,
    1134            0 :                 )
    1135            0 :                 .await?;
    1136              :             // Since we waited for 'not_modified_since' to arrive, that is now the last
    1137              :             // record LSN. (Or close enough for our purposes; the last-record LSN can
    1138              :             // advance immediately after we return anyway)
    1139            0 :             Ok(not_modified_since)
    1140              :         } else {
    1141              :             // It might be better to use max(not_modified_since, latest_gc_cutoff_lsn)
    1142              :             // here instead. That would give the same result, since we know that there
    1143              :             // haven't been any modifications since 'not_modified_since'. Using an older
    1144              :             // LSN might be faster, because that could allow skipping recent layers when
    1145              :             // finding the page. However, we have historically used 'last_record_lsn', so
    1146              :             // stick to that for now.
    1147            0 :             Ok(std::cmp::min(last_record_lsn, request_lsn))
    1148              :         }
    1149            0 :     }
    1150              : 
    1151              :     /// Handles the lsn lease request.
    1152              :     /// If a lease cannot be obtained, the client will receive NULL.
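                       :     ///
                       :     /// Reached via the `lease lsn <tenant_shard_id> <timeline_id> <lsn>` command
                       :     /// (see [`LeaseLsnCmd`]); on success, the single `valid_until` column carries
                       :     /// the lease expiry as milliseconds since the UNIX epoch.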
    1153            0 :     #[instrument(skip_all, fields(shard_id, %lsn))]
    1154              :     async fn handle_make_lsn_lease<IO>(
    1155              :         &mut self,
    1156              :         pgb: &mut PostgresBackend<IO>,
    1157              :         tenant_shard_id: TenantShardId,
    1158              :         timeline_id: TimelineId,
    1159              :         lsn: Lsn,
    1160              :         ctx: &RequestContext,
    1161              :     ) -> Result<(), QueryError>
    1162              :     where
    1163              :         IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
    1164              :     {
    1165              :         let timeline = self
    1166              :             .timeline_handles
    1167              :             .get(
    1168              :                 tenant_shard_id.tenant_id,
    1169              :                 timeline_id,
    1170              :                 ShardSelector::Known(tenant_shard_id.to_index()),
    1171              :             )
    1172              :             .await?;
    1173              :         set_tracing_field_shard_id(&timeline);
    1174              : 
    1175              :         let lease = timeline
    1176              :             .renew_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx)
    1177            0 :             .inspect_err(|e| {
    1178            0 :                 warn!("{e}");
    1179            0 :             })
    1180              :             .ok();
    1181            0 :         let valid_until_str = lease.map(|l| {
    1182            0 :             l.valid_until
    1183            0 :                 .duration_since(SystemTime::UNIX_EPOCH)
    1184            0 :                 .expect("valid_until is earlier than UNIX_EPOCH")
    1185            0 :                 .as_millis()
    1186            0 :                 .to_string()
    1187            0 :         });
    1188            0 :         let bytes = valid_until_str.as_ref().map(|x| x.as_bytes());
    1189              : 
    1190              :         pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col(
    1191              :             b"valid_until",
    1192              :         )]))?
    1193              :         .write_message_noflush(&BeMessage::DataRow(&[bytes]))?;
    1194              : 
    1195              :         Ok(())
    1196              :     }
    1197              : 
    1198            0 :     #[instrument(skip_all, fields(shard_id))]
    1199              :     async fn handle_get_rel_exists_request(
    1200              :         &mut self,
    1201              :         tenant_id: TenantId,
    1202              :         timeline_id: TimelineId,
    1203              :         req: &PagestreamExistsRequest,
    1204              :         ctx: &RequestContext,
    1205              :     ) -> Result<PagestreamBeMessage, PageStreamError> {
    1206              :         let timeline = self
    1207              :             .timeline_handles
    1208              :             .get(tenant_id, timeline_id, ShardSelector::Zero)
    1209              :             .await?;
    1210              :         let _timer = timeline
    1211              :             .query_metrics
    1212              :             .start_timer(metrics::SmgrQueryType::GetRelExists, ctx);
    1213              : 
    1214              :         let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
    1215              :         let lsn = Self::wait_or_get_last_lsn(
    1216              :             &timeline,
    1217              :             req.request_lsn,
    1218              :             req.not_modified_since,
    1219              :             &latest_gc_cutoff_lsn,
    1220              :             ctx,
    1221              :         )
    1222              :         .await?;
    1223              : 
    1224              :         let exists = timeline
    1225              :             .get_rel_exists(req.rel, Version::Lsn(lsn), ctx)
    1226              :             .await?;
    1227              : 
    1228              :         Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
    1229              :             exists,
    1230              :         }))
    1231              :     }
    1232              : 
    1233            0 :     #[instrument(skip_all, fields(shard_id))]
    1234              :     async fn handle_get_nblocks_request(
    1235              :         &mut self,
    1236              :         tenant_id: TenantId,
    1237              :         timeline_id: TimelineId,
    1238              :         req: &PagestreamNblocksRequest,
    1239              :         ctx: &RequestContext,
    1240              :     ) -> Result<PagestreamBeMessage, PageStreamError> {
    1241              :         let timeline = self
    1242              :             .timeline_handles
    1243              :             .get(tenant_id, timeline_id, ShardSelector::Zero)
    1244              :             .await?;
    1245              : 
    1246              :         let _timer = timeline
    1247              :             .query_metrics
    1248              :             .start_timer(metrics::SmgrQueryType::GetRelSize, ctx);
    1249              : 
    1250              :         let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
    1251              :         let lsn = Self::wait_or_get_last_lsn(
    1252              :             &timeline,
    1253              :             req.request_lsn,
    1254              :             req.not_modified_since,
    1255              :             &latest_gc_cutoff_lsn,
    1256              :             ctx,
    1257              :         )
    1258              :         .await?;
    1259              : 
    1260              :         let n_blocks = timeline
    1261              :             .get_rel_size(req.rel, Version::Lsn(lsn), ctx)
    1262              :             .await?;
    1263              : 
    1264              :         Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
    1265              :             n_blocks,
    1266              :         }))
    1267              :     }
    1268              : 
    1269            0 :     #[instrument(skip_all, fields(shard_id))]
    1270              :     async fn handle_db_size_request(
    1271              :         &mut self,
    1272              :         tenant_id: TenantId,
    1273              :         timeline_id: TimelineId,
    1274              :         req: &PagestreamDbSizeRequest,
    1275              :         ctx: &RequestContext,
    1276              :     ) -> Result<PagestreamBeMessage, PageStreamError> {
    1277              :         let timeline = self
    1278              :             .timeline_handles
    1279              :             .get(tenant_id, timeline_id, ShardSelector::Zero)
    1280              :             .await?;
    1281              : 
    1282              :         let _timer = timeline
    1283              :             .query_metrics
    1284              :             .start_timer(metrics::SmgrQueryType::GetDbSize, ctx);
    1285              : 
    1286              :         let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
    1287              :         let lsn = Self::wait_or_get_last_lsn(
    1288              :             &timeline,
    1289              :             req.request_lsn,
    1290              :             req.not_modified_since,
    1291              :             &latest_gc_cutoff_lsn,
    1292              :             ctx,
    1293              :         )
    1294              :         .await?;
    1295              : 
    1296              :         let total_blocks = timeline
    1297              :             .get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, Version::Lsn(lsn), ctx)
    1298              :             .await?;
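                       :         // Convert the block count to bytes using the Postgres block size
                       :         // (BLCKSZ, 8 KiB by default).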
    1299              :         let db_size = total_blocks as i64 * BLCKSZ as i64;
    1300              : 
    1301              :         Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse {
    1302              :             db_size,
    1303              :         }))
    1304              :     }
    1305              : 
    1306            0 :     #[instrument(skip_all)]
    1307              :     async fn handle_get_page_at_lsn_request_batched(
    1308              :         &mut self,
    1309              :         timeline: &Timeline,
    1310              :         effective_lsn: Lsn,
    1311              :         pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
    1312              :         ctx: &RequestContext,
    1313              :     ) -> Vec<Result<PagestreamBeMessage, PageStreamError>> {
    1314              :         debug_assert_current_span_has_tenant_and_timeline_id();
    1315              :         let _timer = timeline.query_metrics.start_timer_many(
    1316              :             metrics::SmgrQueryType::GetPageAtLsn,
    1317              :             pages.len(),
    1318              :             ctx,
    1319              :         );
    1320              : 
    1321              :         let pages = timeline
    1322              :             .get_rel_page_at_lsn_batched(pages, effective_lsn, ctx)
    1323              :             .await;
    1324              : 
    1325            0 :         Vec::from_iter(pages.into_iter().map(|page| {
    1326            0 :             page.map(|page| {
    1327            0 :                 PagestreamBeMessage::GetPage(models::PagestreamGetPageResponse { page })
    1328            0 :             })
    1329            0 :             .map_err(PageStreamError::from)
    1330            0 :         }))
    1331              :     }
    1332              : 
    1333            0 :     #[instrument(skip_all, fields(shard_id))]
    1334              :     async fn handle_get_slru_segment_request(
    1335              :         &mut self,
    1336              :         tenant_id: TenantId,
    1337              :         timeline_id: TimelineId,
    1338              :         req: &PagestreamGetSlruSegmentRequest,
    1339              :         ctx: &RequestContext,
    1340              :     ) -> Result<PagestreamBeMessage, PageStreamError> {
    1341              :         let timeline = self
    1342              :             .timeline_handles
    1343              :             .get(tenant_id, timeline_id, ShardSelector::Zero)
    1344              :             .await?;
    1345              : 
    1346              :         let _timer = timeline
    1347              :             .query_metrics
    1348              :             .start_timer(metrics::SmgrQueryType::GetSlruSegment, ctx);
    1349              : 
    1350              :         let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
    1351              :         let lsn = Self::wait_or_get_last_lsn(
    1352              :             &timeline,
    1353              :             req.request_lsn,
    1354              :             req.not_modified_since,
    1355              :             &latest_gc_cutoff_lsn,
    1356              :             ctx,
    1357              :         )
    1358              :         .await?;
    1359              : 
    1360              :         let kind = SlruKind::from_repr(req.kind)
    1361              :             .ok_or(PageStreamError::BadRequest("invalid SLRU kind".into()))?;
    1362              :         let segment = timeline.get_slru_segment(kind, req.segno, lsn, ctx).await?;
    1363              : 
    1364              :         Ok(PagestreamBeMessage::GetSlruSegment(
    1365              :             PagestreamGetSlruSegmentResponse { segment },
    1366              :         ))
    1367              :     }
    1368              : 
    1369              :     /// Note on "fullbackup":
    1370              :     /// Full basebackups should only be used for debugging purposes.
    1371              :     /// Originally, it was introduced to enable breaking storage format changes,
    1372              :     /// but that is not applicable anymore.
    1373              :     ///
    1374              :     /// # Coding Discipline
    1375              :     ///
    1376              :     /// Coding discipline within this function: all interaction with the `pgb` connection
    1377              :     /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
     1378              :     /// This is so that we can shut down page_service quickly.
    1379              :     ///
    1380              :     /// TODO: wrap the pgb that we pass to the basebackup handler so that it's sensitive
    1381              :     /// to connection cancellation.
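                       :     ///
                       :     /// Wire shape, as implemented below: a CopyOutResponse, followed by the
                       :     /// (optionally gzip-compressed) tarball as COPY data, terminated by CopyDone.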
    1382              :     #[allow(clippy::too_many_arguments)]
    1383            0 :     #[instrument(skip_all, fields(shard_id, ?lsn, ?prev_lsn, %full_backup))]
    1384              :     async fn handle_basebackup_request<IO>(
    1385              :         &mut self,
    1386              :         pgb: &mut PostgresBackend<IO>,
    1387              :         tenant_id: TenantId,
    1388              :         timeline_id: TimelineId,
    1389              :         lsn: Option<Lsn>,
    1390              :         prev_lsn: Option<Lsn>,
    1391              :         full_backup: bool,
    1392              :         gzip: bool,
    1393              :         replica: bool,
    1394              :         ctx: &RequestContext,
    1395              :     ) -> Result<(), QueryError>
    1396              :     where
    1397              :         IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
    1398              :     {
    1399            0 :         fn map_basebackup_error(err: BasebackupError) -> QueryError {
    1400            0 :             match err {
    1401            0 :                 BasebackupError::Client(e) => QueryError::Disconnected(ConnectionError::Io(e)),
    1402            0 :                 BasebackupError::Server(e) => QueryError::Other(e),
    1403              :             }
    1404            0 :         }
    1405              : 
    1406              :         let started = std::time::Instant::now();
    1407              : 
    1408              :         let timeline = self
    1409              :             .timeline_handles
    1410              :             .get(tenant_id, timeline_id, ShardSelector::Zero)
    1411              :             .await?;
    1412              : 
    1413              :         let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
    1414              :         if let Some(lsn) = lsn {
    1415              :             // Backup was requested at a particular LSN. Wait for it to arrive.
    1416              :             info!("waiting for {}", lsn);
    1417              :             timeline
    1418              :                 .wait_lsn(
    1419              :                     lsn,
    1420              :                     crate::tenant::timeline::WaitLsnWaiter::PageService,
    1421              :                     ctx,
    1422              :                 )
    1423              :                 .await?;
    1424              :             timeline
    1425              :                 .check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
    1426              :                 .context("invalid basebackup lsn")?;
    1427              :         }
    1428              : 
    1429              :         let lsn_awaited_after = started.elapsed();
    1430              : 
    1431              :         // switch client to COPYOUT
    1432              :         pgb.write_message_noflush(&BeMessage::CopyOutResponse)
    1433              :             .map_err(QueryError::Disconnected)?;
    1434              :         self.flush_cancellable(pgb, &self.cancel).await?;
    1435              : 
    1436              :         // Send a tarball of the latest layer on the timeline. Compress if not
     1437              :         // fullbackup. TODO: compress in that case too (tests need to be updated).
    1438              :         if full_backup {
    1439              :             let mut writer = pgb.copyout_writer();
    1440              :             basebackup::send_basebackup_tarball(
    1441              :                 &mut writer,
    1442              :                 &timeline,
    1443              :                 lsn,
    1444              :                 prev_lsn,
    1445              :                 full_backup,
    1446              :                 replica,
    1447              :                 ctx,
    1448              :             )
    1449              :             .await
    1450              :             .map_err(map_basebackup_error)?;
    1451              :         } else {
    1452              :             let mut writer = BufWriter::new(pgb.copyout_writer());
    1453              :             if gzip {
    1454              :                 let mut encoder = GzipEncoder::with_quality(
    1455              :                     &mut writer,
    1456              :                     // NOTE using fast compression because it's on the critical path
    1457              :                     //      for compute startup. For an empty database, we get
    1458              :                     //      <100KB with this method. The Level::Best compression method
    1459              :                     //      gives us <20KB, but maybe we should add basebackup caching
    1460              :                     //      on compute shutdown first.
    1461              :                     async_compression::Level::Fastest,
    1462              :                 );
    1463              :                 basebackup::send_basebackup_tarball(
    1464              :                     &mut encoder,
    1465              :                     &timeline,
    1466              :                     lsn,
    1467              :                     prev_lsn,
    1468              :                     full_backup,
    1469              :                     replica,
    1470              :                     ctx,
    1471              :                 )
    1472              :                 .await
    1473              :                 .map_err(map_basebackup_error)?;
     1474              :                 // shut down the encoder to ensure the gzip footer is written
    1475              :                 encoder
    1476              :                     .shutdown()
    1477              :                     .await
    1478            0 :                     .map_err(|e| QueryError::Disconnected(ConnectionError::Io(e)))?;
    1479              :             } else {
    1480              :                 basebackup::send_basebackup_tarball(
    1481              :                     &mut writer,
    1482              :                     &timeline,
    1483              :                     lsn,
    1484              :                     prev_lsn,
    1485              :                     full_backup,
    1486              :                     replica,
    1487              :                     ctx,
    1488              :                 )
    1489              :                 .await
    1490              :                 .map_err(map_basebackup_error)?;
    1491              :             }
    1492              :             writer
    1493              :                 .flush()
    1494              :                 .await
    1495            0 :                 .map_err(|e| map_basebackup_error(BasebackupError::Client(e)))?;
    1496              :         }
    1497              : 
    1498              :         pgb.write_message_noflush(&BeMessage::CopyDone)
    1499              :             .map_err(QueryError::Disconnected)?;
    1500              :         self.flush_cancellable(pgb, &timeline.cancel).await?;
    1501              : 
    1502              :         let basebackup_after = started
    1503              :             .elapsed()
    1504              :             .checked_sub(lsn_awaited_after)
    1505              :             .unwrap_or(Duration::ZERO);
    1506              : 
    1507              :         info!(
    1508              :             lsn_await_millis = lsn_awaited_after.as_millis(),
    1509              :             basebackup_millis = basebackup_after.as_millis(),
    1510              :             "basebackup complete"
    1511              :         );
    1512              : 
    1513              :         Ok(())
    1514              :     }
    1515              : 
     1516              :     // When accessing the management API, supply None as an argument.
     1517              :     // When authorizing access to a tenant, pass the corresponding tenant id.
    1518            0 :     fn check_permission(&self, tenant_id: Option<TenantId>) -> Result<(), QueryError> {
    1519            0 :         if self.auth.is_none() {
    1520              :             // auth is set to Trust, nothing to check so just return ok
    1521            0 :             return Ok(());
    1522            0 :         }
     1523            0 :         // auth is Some, as just checked above; when auth is Some,
     1524            0 :         // claims are always present because of checks during connection init,
     1525            0 :         // so this expect won't trigger
    1526            0 :         let claims = self
    1527            0 :             .claims
    1528            0 :             .as_ref()
    1529            0 :             .expect("claims presence already checked");
    1530            0 :         check_permission(claims, tenant_id).map_err(|e| QueryError::Unauthorized(e.0))
    1531            0 :     }
    1532              : }
    1533              : 
    1534              : /// `basebackup tenant timeline [lsn] [--gzip] [--replica]`
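                       : ///
                       : /// The lsn position also accepts the literal `latest`. Illustrative invocations
                       : /// (ids elided): `basebackup <tenant> <timeline> --gzip` and
                       : /// `basebackup <tenant> <timeline> latest --gzip --replica`.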
    1535              : #[derive(Debug, Clone, Eq, PartialEq)]
    1536              : struct BaseBackupCmd {
    1537              :     tenant_id: TenantId,
    1538              :     timeline_id: TimelineId,
    1539              :     lsn: Option<Lsn>,
    1540              :     gzip: bool,
    1541              :     replica: bool,
    1542              : }
    1543              : 
    1544              : /// `fullbackup tenant timeline [lsn] [prev_lsn]`
    1545              : #[derive(Debug, Clone, Eq, PartialEq)]
    1546              : struct FullBackupCmd {
    1547              :     tenant_id: TenantId,
    1548              :     timeline_id: TimelineId,
    1549              :     lsn: Option<Lsn>,
    1550              :     prev_lsn: Option<Lsn>,
    1551              : }
    1552              : 
    1553              : /// `pagestream_v2 tenant timeline`
    1554              : #[derive(Debug, Clone, Eq, PartialEq)]
    1555              : struct PageStreamCmd {
    1556              :     tenant_id: TenantId,
    1557              :     timeline_id: TimelineId,
    1558              : }
    1559              : 
    1560              : /// `lease lsn tenant timeline lsn`
    1561              : #[derive(Debug, Clone, Eq, PartialEq)]
    1562              : struct LeaseLsnCmd {
    1563              :     tenant_shard_id: TenantShardId,
    1564              :     timeline_id: TimelineId,
    1565              :     lsn: Lsn,
    1566              : }
    1567              : 
    1568              : #[derive(Debug, Clone, Eq, PartialEq)]
    1569              : enum PageServiceCmd {
    1570              :     Set,
    1571              :     PageStream(PageStreamCmd),
    1572              :     BaseBackup(BaseBackupCmd),
    1573              :     FullBackup(FullBackupCmd),
    1574              :     LeaseLsn(LeaseLsnCmd),
    1575              : }
    1576              : 
    1577              : impl PageStreamCmd {
    1578            6 :     fn parse(query: &str) -> anyhow::Result<Self> {
    1579            6 :         let parameters = query.split_whitespace().collect_vec();
    1580            6 :         if parameters.len() != 2 {
    1581            2 :             bail!(
    1582            2 :                 "invalid number of parameters for pagestream command: {}",
    1583            2 :                 query
    1584            2 :             );
    1585            4 :         }
    1586            4 :         let tenant_id = TenantId::from_str(parameters[0])
    1587            4 :             .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
    1588            2 :         let timeline_id = TimelineId::from_str(parameters[1])
    1589            2 :             .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
    1590            2 :         Ok(Self {
    1591            2 :             tenant_id,
    1592            2 :             timeline_id,
    1593            2 :         })
    1594            6 :     }
    1595              : }
    1596              : 
    1597              : impl FullBackupCmd {
    1598            4 :     fn parse(query: &str) -> anyhow::Result<Self> {
    1599            4 :         let parameters = query.split_whitespace().collect_vec();
    1600            4 :         if parameters.len() < 2 || parameters.len() > 4 {
    1601            0 :             bail!(
    1602            0 :                 "invalid number of parameters for basebackup command: {}",
    1603            0 :                 query
    1604            0 :             );
    1605            4 :         }
    1606            4 :         let tenant_id = TenantId::from_str(parameters[0])
    1607            4 :             .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
    1608            4 :         let timeline_id = TimelineId::from_str(parameters[1])
    1609            4 :             .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
    1610              :         // The caller is responsible for providing correct lsn and prev_lsn.
    1611            4 :         let lsn = if let Some(lsn_str) = parameters.get(2) {
    1612              :             Some(
    1613            2 :                 Lsn::from_str(lsn_str)
    1614            2 :                     .with_context(|| format!("Failed to parse Lsn from {lsn_str}"))?,
    1615              :             )
    1616              :         } else {
    1617            2 :             None
    1618              :         };
    1619            4 :         let prev_lsn = if let Some(prev_lsn_str) = parameters.get(3) {
    1620              :             Some(
    1621            2 :                 Lsn::from_str(prev_lsn_str)
    1622            2 :                     .with_context(|| format!("Failed to parse Lsn from {prev_lsn_str}"))?,
    1623              :             )
    1624              :         } else {
    1625            2 :             None
    1626              :         };
    1627            4 :         Ok(Self {
    1628            4 :             tenant_id,
    1629            4 :             timeline_id,
    1630            4 :             lsn,
    1631            4 :             prev_lsn,
    1632            4 :         })
    1633            4 :     }
    1634              : }
    1635              : 
    1636              : impl BaseBackupCmd {
    1637           18 :     fn parse(query: &str) -> anyhow::Result<Self> {
    1638           18 :         let parameters = query.split_whitespace().collect_vec();
    1639           18 :         if parameters.len() < 2 {
    1640            0 :             bail!(
    1641            0 :                 "invalid number of parameters for basebackup command: {}",
    1642            0 :                 query
    1643            0 :             );
    1644           18 :         }
    1645           18 :         let tenant_id = TenantId::from_str(parameters[0])
    1646           18 :             .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
    1647           18 :         let timeline_id = TimelineId::from_str(parameters[1])
    1648           18 :             .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
    1649              :         let lsn;
    1650              :         let flags_parse_from;
    1651           18 :         if let Some(maybe_lsn) = parameters.get(2) {
    1652           16 :             if *maybe_lsn == "latest" {
    1653            2 :                 lsn = None;
    1654            2 :                 flags_parse_from = 3;
    1655           14 :             } else if maybe_lsn.starts_with("--") {
    1656           10 :                 lsn = None;
    1657           10 :                 flags_parse_from = 2;
    1658           10 :             } else {
    1659              :                 lsn = Some(
    1660            4 :                     Lsn::from_str(maybe_lsn)
    1661            4 :                         .with_context(|| format!("Failed to parse lsn from {maybe_lsn}"))?,
    1662              :                 );
    1663            4 :                 flags_parse_from = 3;
    1664              :             }
    1665            2 :         } else {
    1666            2 :             lsn = None;
    1667            2 :             flags_parse_from = 2;
    1668            2 :         }
    1669              : 
    1670           18 :         let mut gzip = false;
    1671           18 :         let mut replica = false;
    1672              : 
    1673           22 :         for &param in &parameters[flags_parse_from..] {
    1674           22 :             match param {
    1675           22 :                 "--gzip" => {
    1676           14 :                     if gzip {
    1677            2 :                         bail!("duplicate parameter for basebackup command: {param}")
    1678           12 :                     }
    1679           12 :                     gzip = true
    1680              :                 }
    1681            8 :                 "--replica" => {
    1682            4 :                     if replica {
    1683            0 :                         bail!("duplicate parameter for basebackup command: {param}")
    1684            4 :                     }
    1685            4 :                     replica = true
    1686              :                 }
    1687            4 :                 _ => bail!("invalid parameter for basebackup command: {param}"),
    1688              :             }
    1689              :         }
    1690           12 :         Ok(Self {
    1691           12 :             tenant_id,
    1692           12 :             timeline_id,
    1693           12 :             lsn,
    1694           12 :             gzip,
    1695           12 :             replica,
    1696           12 :         })
    1697           18 :     }
    1698              : }
    1699              : 
    1700              : impl LeaseLsnCmd {
    1701            4 :     fn parse(query: &str) -> anyhow::Result<Self> {
    1702            4 :         let parameters = query.split_whitespace().collect_vec();
    1703            4 :         if parameters.len() != 3 {
    1704            0 :             bail!(
    1705            0 :                 "invalid number of parameters for lease lsn command: {}",
    1706            0 :                 query
    1707            0 :             );
    1708            4 :         }
    1709            4 :         let tenant_shard_id = TenantShardId::from_str(parameters[0])
    1710            4 :             .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
    1711            4 :         let timeline_id = TimelineId::from_str(parameters[1])
    1712            4 :             .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
    1713            4 :         let lsn = Lsn::from_str(parameters[2])
    1714            4 :             .with_context(|| format!("Failed to parse lsn from {}", parameters[2]))?;
    1715            4 :         Ok(Self {
    1716            4 :             tenant_shard_id,
    1717            4 :             timeline_id,
    1718            4 :             lsn,
    1719            4 :         })
    1720            4 :     }
    1721              : }
    1722              : 
    1723              : impl PageServiceCmd {
    1724           42 :     fn parse(query: &str) -> anyhow::Result<Self> {
    1725           42 :         let query = query.trim();
    1726           42 :         let Some((cmd, other)) = query.split_once(' ') else {
    1727            4 :             bail!("cannot parse query: {query}")
    1728              :         };
    1729           38 :         match cmd.to_ascii_lowercase().as_str() {
    1730           38 :             "pagestream_v2" => Ok(Self::PageStream(PageStreamCmd::parse(other)?)),
    1731           32 :             "basebackup" => Ok(Self::BaseBackup(BaseBackupCmd::parse(other)?)),
    1732           14 :             "fullbackup" => Ok(Self::FullBackup(FullBackupCmd::parse(other)?)),
    1733           10 :             "lease" => {
    1734            6 :                 let Some((cmd2, other)) = other.split_once(' ') else {
    1735            0 :                     bail!("invalid lease command: {cmd}");
    1736              :                 };
    1737            6 :                 let cmd2 = cmd2.to_ascii_lowercase();
    1738            6 :                 if cmd2 == "lsn" {
    1739            4 :                     Ok(Self::LeaseLsn(LeaseLsnCmd::parse(other)?))
    1740              :                 } else {
     1741            2 :                     bail!("invalid lease command: {cmd} {cmd2}");
    1742              :                 }
    1743              :             }
    1744            4 :             "set" => Ok(Self::Set),
    1745            0 :             _ => Err(anyhow::anyhow!("unsupported command {cmd} in {query}")),
    1746              :         }
    1747           42 :     }
    1748              : }
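                       : 
                       : // A minimal sketch of the grammar accepted by `PageServiceCmd::parse` above;
                       : // the ids are hypothetical 32-hex-digit values, only the shapes matter here.
                       : #[cfg(test)]
                       : mod page_service_cmd_examples {
                       :     use super::*;
                       : 
                       :     #[test]
                       :     fn parse_examples() {
                       :         let tenant = "ad6c1a56f5680419d3a16ff55d97ec3c";
                       :         let timeline = "be053e3ae76a449497496cb8427fcf0e";
                       :         // pagestream_v2 takes exactly tenant and timeline.
                       :         assert!(matches!(
                       :             PageServiceCmd::parse(&format!("pagestream_v2 {tenant} {timeline}")),
                       :             Ok(PageServiceCmd::PageStream(_))
                       :         ));
                       :         // basebackup accepts an optional lsn (or `latest`) plus --gzip/--replica flags.
                       :         assert!(matches!(
                       :             PageServiceCmd::parse(&format!("basebackup {tenant} {timeline} --gzip")),
                       :             Ok(PageServiceCmd::BaseBackup(BaseBackupCmd { gzip: true, .. }))
                       :         ));
                       :         // Unknown lease subcommands are rejected.
                       :         assert!(PageServiceCmd::parse("lease unknown x y z").is_err());
                       :     }
                       : }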
    1749              : 
    1750              : impl<IO> postgres_backend::Handler<IO> for PageServerHandler
    1751              : where
    1752              :     IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
    1753              : {
    1754            0 :     fn check_auth_jwt(
    1755            0 :         &mut self,
    1756            0 :         _pgb: &mut PostgresBackend<IO>,
    1757            0 :         jwt_response: &[u8],
    1758            0 :     ) -> Result<(), QueryError> {
    1759              :         // this unwrap is never triggered, because check_auth_jwt only called when auth_type is NeonJWT
    1760              :         // which requires auth to be present
    1761            0 :         let data = self
    1762            0 :             .auth
    1763            0 :             .as_ref()
    1764            0 :             .unwrap()
    1765            0 :             .decode(str::from_utf8(jwt_response).context("jwt response is not UTF-8")?)
    1766            0 :             .map_err(|e| QueryError::Unauthorized(e.0))?;
    1767              : 
    1768            0 :         if matches!(data.claims.scope, Scope::Tenant) && data.claims.tenant_id.is_none() {
    1769            0 :             return Err(QueryError::Unauthorized(
    1770            0 :                 "jwt token scope is Tenant, but tenant id is missing".into(),
    1771            0 :             ));
    1772            0 :         }
    1773            0 : 
    1774            0 :         debug!(
    1775            0 :             "jwt scope check succeeded for scope: {:#?} by tenant id: {:?}",
    1776              :             data.claims.scope, data.claims.tenant_id,
    1777              :         );
    1778              : 
    1779            0 :         self.claims = Some(data.claims);
    1780            0 :         Ok(())
    1781            0 :     }
    1782              : 
    1783            0 :     fn startup(
    1784            0 :         &mut self,
    1785            0 :         _pgb: &mut PostgresBackend<IO>,
    1786            0 :         _sm: &FeStartupPacket,
    1787            0 :     ) -> Result<(), QueryError> {
    1788            0 :         fail::fail_point!("ps::connection-start::startup-packet");
    1789            0 :         Ok(())
    1790            0 :     }
    1791              : 
    1792            0 :     #[instrument(skip_all, fields(tenant_id, timeline_id))]
    1793              :     async fn process_query(
    1794              :         &mut self,
    1795              :         pgb: &mut PostgresBackend<IO>,
    1796              :         query_string: &str,
    1797              :     ) -> Result<(), QueryError> {
    1798            0 :         fail::fail_point!("simulated-bad-compute-connection", |_| {
    1799            0 :             info!("Hit failpoint for bad connection");
    1800            0 :             Err(QueryError::SimulatedConnectionError)
    1801            0 :         });
    1802              : 
    1803              :         fail::fail_point!("ps::connection-start::process-query");
    1804              : 
    1805              :         let ctx = self.connection_ctx.attached_child();
    1806              :         debug!("process query {query_string}");
    1807              :         let query = PageServiceCmd::parse(query_string)?;
    1808              :         match query {
    1809              :             PageServiceCmd::PageStream(PageStreamCmd {
    1810              :                 tenant_id,
    1811              :                 timeline_id,
    1812              :             }) => {
    1813              :                 tracing::Span::current()
    1814              :                     .record("tenant_id", field::display(tenant_id))
    1815              :                     .record("timeline_id", field::display(timeline_id));
    1816              : 
    1817              :                 self.check_permission(Some(tenant_id))?;
    1818              : 
    1819              :                 COMPUTE_COMMANDS_COUNTERS
    1820              :                     .for_command(ComputeCommandKind::PageStreamV2)
    1821              :                     .inc();
    1822              : 
    1823              :                 self.handle_pagerequests(
    1824              :                     pgb,
    1825              :                     tenant_id,
    1826              :                     timeline_id,
    1827              :                     PagestreamProtocolVersion::V2,
    1828              :                     ctx,
    1829              :                 )
    1830              :                 .await?;
    1831              :             }
    1832              :             PageServiceCmd::BaseBackup(BaseBackupCmd {
    1833              :                 tenant_id,
    1834              :                 timeline_id,
    1835              :                 lsn,
    1836              :                 gzip,
    1837              :                 replica,
    1838              :             }) => {
    1839              :                 tracing::Span::current()
    1840              :                     .record("tenant_id", field::display(tenant_id))
    1841              :                     .record("timeline_id", field::display(timeline_id));
    1842              : 
    1843              :                 self.check_permission(Some(tenant_id))?;
    1844              : 
    1845              :                 COMPUTE_COMMANDS_COUNTERS
    1846              :                     .for_command(ComputeCommandKind::Basebackup)
    1847              :                     .inc();
    1848              :                 let metric_recording = metrics::BASEBACKUP_QUERY_TIME.start_recording(&ctx);
    1849            0 :                 let res = async {
    1850            0 :                     self.handle_basebackup_request(
    1851            0 :                         pgb,
    1852            0 :                         tenant_id,
    1853            0 :                         timeline_id,
    1854            0 :                         lsn,
    1855            0 :                         None,
    1856            0 :                         false,
    1857            0 :                         gzip,
    1858            0 :                         replica,
    1859            0 :                         &ctx,
    1860            0 :                     )
    1861            0 :                     .await?;
    1862            0 :                     pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
    1863            0 :                     Result::<(), QueryError>::Ok(())
    1864            0 :                 }
    1865              :                 .await;
    1866              :                 metric_recording.observe(&res);
    1867              :                 res?;
    1868              :             }
    1869              :             // same as basebackup, but the result also includes relation data
    1870              :             PageServiceCmd::FullBackup(FullBackupCmd {
    1871              :                 tenant_id,
    1872              :                 timeline_id,
    1873              :                 lsn,
    1874              :                 prev_lsn,
    1875              :             }) => {
    1876              :                 tracing::Span::current()
    1877              :                     .record("tenant_id", field::display(tenant_id))
    1878              :                     .record("timeline_id", field::display(timeline_id));
    1879              : 
    1880              :                 self.check_permission(Some(tenant_id))?;
    1881              : 
    1882              :                 COMPUTE_COMMANDS_COUNTERS
    1883              :                     .for_command(ComputeCommandKind::Fullbackup)
    1884              :                     .inc();
    1885              : 
    1886              :                 // Check that the timeline exists and serve the full backup
    1887              :                 self.handle_basebackup_request(
    1888              :                     pgb,
    1889              :                     tenant_id,
    1890              :                     timeline_id,
    1891              :                     lsn,
    1892              :                     prev_lsn,
    1893              :                     true,
    1894              :                     false,
    1895              :                     false,
    1896              :                     &ctx,
    1897              :                 )
    1898              :                 .await?;
    1899              :                 pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
    1900              :             }
    1901              :             PageServiceCmd::Set => {
    1902              :                 // important: psycopg2 executes "SET datestyle TO 'ISO'"
    1903              :                 // on connect, so SET must be accepted (and is ignored)
    1904              :                 pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
    1905              :             }
    1906              :             PageServiceCmd::LeaseLsn(LeaseLsnCmd {
    1907              :                 tenant_shard_id,
    1908              :                 timeline_id,
    1909              :                 lsn,
    1910              :             }) => {
    1911              :                 tracing::Span::current()
    1912              :                     .record("tenant_id", field::display(tenant_shard_id))
    1913              :                     .record("timeline_id", field::display(timeline_id));
    1914              : 
    1915              :                 self.check_permission(Some(tenant_shard_id.tenant_id))?;
    1916              : 
    1917              :                 COMPUTE_COMMANDS_COUNTERS
    1918              :                     .for_command(ComputeCommandKind::LeaseLsn)
    1919              :                     .inc();
    1920              : 
    1921              :                 match self
    1922              :                     .handle_make_lsn_lease(pgb, tenant_shard_id, timeline_id, lsn, &ctx)
    1923              :                     .await
    1924              :                 {
    1925              :                     Ok(()) => {
    1926              :                         pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?
    1927              :                     }
    1928              :                     Err(e) => {
    1929              :                         error!("error obtaining lsn lease for {lsn}: {e:?}");
    1930              :                         pgb.write_message_noflush(&BeMessage::ErrorResponse(
    1931              :                             &e.to_string(),
    1932              :                             Some(e.pg_error_code()),
    1933              :                         ))?
    1934              :                     }
    1935              :                 };
    1936              :             }
    1937              :         }
    1938              : 
    1939              :         Ok(())
    1940              :     }
    1941              : }
    1942              : 
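
For reference, the command strings this dispatcher accepts (as exercised by
the parser tests below) have these shapes; ids are hex-encoded and LSNs use
the Postgres X/Y form:

    pagestream_v2 <tenant_id> <timeline_id>
    basebackup    <tenant_id> <timeline_id> [<lsn> | latest] [--gzip] [--replica]
    fullbackup    <tenant_id> <timeline_id> [<lsn> [<prev_lsn>]]
    set <anything>                           (accepted and ignored)
    lease lsn <tenant_shard_id> <timeline_id> <lsn>
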
    1943              : impl From<GetActiveTenantError> for QueryError {
    1944            0 :     fn from(e: GetActiveTenantError) -> Self {
    1945            0 :         match e {
    1946            0 :             GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected(
    1947            0 :                 ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
    1948            0 :             ),
    1949              :             GetActiveTenantError::Cancelled
    1950              :             | GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => {
    1951            0 :                 QueryError::Shutdown
    1952              :             }
    1953            0 :             e @ GetActiveTenantError::NotFound(_) => QueryError::NotFound(format!("{e}").into()),
    1954            0 :             e => QueryError::Other(anyhow::anyhow!(e)),
    1955              :         }
    1956            0 :     }
    1957              : }
    1958              : 
    1959            0 : #[derive(Debug, thiserror::Error)]
    1960              : pub(crate) enum GetActiveTimelineError {
    1961              :     #[error(transparent)]
    1962              :     Tenant(GetActiveTenantError),
    1963              :     #[error(transparent)]
    1964              :     Timeline(#[from] GetTimelineError),
    1965              : }
    1966              : 
    1967              : impl From<GetActiveTimelineError> for QueryError {
    1968            0 :     fn from(e: GetActiveTimelineError) -> Self {
    1969            0 :         match e {
    1970            0 :             GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => QueryError::Shutdown,
    1971            0 :             GetActiveTimelineError::Tenant(e) => e.into(),
    1972            0 :             GetActiveTimelineError::Timeline(e) => QueryError::NotFound(format!("{e}").into()),
    1973              :         }
    1974            0 :     }
    1975              : }
    1976              : 
    1977            0 : fn set_tracing_field_shard_id(timeline: &Timeline) {
    1978            0 :     debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
    1979            0 :     tracing::Span::current().record(
    1980            0 :         "shard_id",
    1981            0 :         tracing::field::display(timeline.tenant_shard_id.shard_slug()),
    1982            0 :     );
    1983            0 :     debug_assert_current_span_has_tenant_and_timeline_id();
    1984            0 : }
    1985              : 
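
Span::record only takes effect for fields declared at span creation. A
minimal sketch of the declare-empty-then-record pattern used above, assuming
the tracing crate:

    use tracing::field;

    // Declare shard_id as an empty field up front...
    let span = tracing::info_span!("page_request", shard_id = field::Empty);
    let _entered = span.enter();
    // ...then fill it in once the value is known.
    tracing::Span::current().record("shard_id", field::display("0001"));
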
    1986              : struct WaitedForLsn(Lsn);
    1987              : impl From<WaitedForLsn> for Lsn {
    1988            0 :     fn from(WaitedForLsn(lsn): WaitedForLsn) -> Self {
    1989            0 :         lsn
    1990            0 :     }
    1991              : }
    1992              : 
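
WaitedForLsn is a zero-cost newtype: in the surrounding code it is presumably
constructed only after the LSN wait has completed, so holding one is proof
that the wait happened. A hedged sketch of a consumer built on that invariant
(read_page_at is hypothetical):

    // The signature alone guarantees the LSN was waited on.
    fn read_page_at(lsn: WaitedForLsn) {
        let lsn: Lsn = lsn.into(); // unwrap via the From impl above
        // ... safe to serve reads at `lsn` here ...
        let _ = lsn;
    }
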
    1993              : #[cfg(test)]
    1994              : mod tests {
    1995              :     use utils::shard::ShardCount;
    1996              : 
    1997              :     use super::*;
    1998              : 
    1999              :     #[test]
    2000            2 :     fn pageservice_cmd_parse() {
    2001            2 :         let tenant_id = TenantId::generate();
    2002            2 :         let timeline_id = TimelineId::generate();
    2003            2 :         let cmd =
    2004            2 :             PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id} {timeline_id}")).unwrap();
    2005            2 :         assert_eq!(
    2006            2 :             cmd,
    2007            2 :             PageServiceCmd::PageStream(PageStreamCmd {
    2008            2 :                 tenant_id,
    2009            2 :                 timeline_id
    2010            2 :             })
    2011            2 :         );
    2012            2 :         let cmd = PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id}")).unwrap();
    2013            2 :         assert_eq!(
    2014            2 :             cmd,
    2015            2 :             PageServiceCmd::BaseBackup(BaseBackupCmd {
    2016            2 :                 tenant_id,
    2017            2 :                 timeline_id,
    2018            2 :                 lsn: None,
    2019            2 :                 gzip: false,
    2020            2 :                 replica: false
    2021            2 :             })
    2022            2 :         );
    2023            2 :         let cmd =
    2024            2 :             PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} --gzip")).unwrap();
    2025            2 :         assert_eq!(
    2026            2 :             cmd,
    2027            2 :             PageServiceCmd::BaseBackup(BaseBackupCmd {
    2028            2 :                 tenant_id,
    2029            2 :                 timeline_id,
    2030            2 :                 lsn: None,
    2031            2 :                 gzip: true,
    2032            2 :                 replica: false
    2033            2 :             })
    2034            2 :         );
    2035            2 :         let cmd =
    2036            2 :             PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} latest")).unwrap();
    2037            2 :         assert_eq!(
    2038            2 :             cmd,
    2039            2 :             PageServiceCmd::BaseBackup(BaseBackupCmd {
    2040            2 :                 tenant_id,
    2041            2 :                 timeline_id,
    2042            2 :                 lsn: None,
    2043            2 :                 gzip: false,
    2044            2 :                 replica: false
    2045            2 :             })
    2046            2 :         );
    2047            2 :         let cmd = PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} 0/16ABCDE"))
    2048            2 :             .unwrap();
    2049            2 :         assert_eq!(
    2050            2 :             cmd,
    2051            2 :             PageServiceCmd::BaseBackup(BaseBackupCmd {
    2052            2 :                 tenant_id,
    2053            2 :                 timeline_id,
    2054            2 :                 lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
    2055            2 :                 gzip: false,
    2056            2 :                 replica: false
    2057            2 :             })
    2058            2 :         );
    2059            2 :         let cmd = PageServiceCmd::parse(&format!(
    2060            2 :             "basebackup {tenant_id} {timeline_id} --replica --gzip"
    2061            2 :         ))
    2062            2 :         .unwrap();
    2063            2 :         assert_eq!(
    2064            2 :             cmd,
    2065            2 :             PageServiceCmd::BaseBackup(BaseBackupCmd {
    2066            2 :                 tenant_id,
    2067            2 :                 timeline_id,
    2068            2 :                 lsn: None,
    2069            2 :                 gzip: true,
    2070            2 :                 replica: true
    2071            2 :             })
    2072            2 :         );
    2073            2 :         let cmd = PageServiceCmd::parse(&format!(
    2074            2 :             "basebackup {tenant_id} {timeline_id} 0/16ABCDE --replica --gzip"
    2075            2 :         ))
    2076            2 :         .unwrap();
    2077            2 :         assert_eq!(
    2078            2 :             cmd,
    2079            2 :             PageServiceCmd::BaseBackup(BaseBackupCmd {
    2080            2 :                 tenant_id,
    2081            2 :                 timeline_id,
    2082            2 :                 lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
    2083            2 :                 gzip: true,
    2084            2 :                 replica: true
    2085            2 :             })
    2086            2 :         );
    2087            2 :         let cmd = PageServiceCmd::parse(&format!("fullbackup {tenant_id} {timeline_id}")).unwrap();
    2088            2 :         assert_eq!(
    2089            2 :             cmd,
    2090            2 :             PageServiceCmd::FullBackup(FullBackupCmd {
    2091            2 :                 tenant_id,
    2092            2 :                 timeline_id,
    2093            2 :                 lsn: None,
    2094            2 :                 prev_lsn: None
    2095            2 :             })
    2096            2 :         );
    2097            2 :         let cmd = PageServiceCmd::parse(&format!(
    2098            2 :             "fullbackup {tenant_id} {timeline_id} 0/16ABCDE 0/16ABCDF"
    2099            2 :         ))
    2100            2 :         .unwrap();
    2101            2 :         assert_eq!(
    2102            2 :             cmd,
    2103            2 :             PageServiceCmd::FullBackup(FullBackupCmd {
    2104            2 :                 tenant_id,
    2105            2 :                 timeline_id,
    2106            2 :                 lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
    2107            2 :                 prev_lsn: Some(Lsn::from_str("0/16ABCDF").unwrap()),
    2108            2 :             })
    2109            2 :         );
    2110            2 :         let tenant_shard_id = TenantShardId::unsharded(tenant_id);
    2111            2 :         let cmd = PageServiceCmd::parse(&format!(
    2112            2 :             "lease lsn {tenant_shard_id} {timeline_id} 0/16ABCDE"
    2113            2 :         ))
    2114            2 :         .unwrap();
    2115            2 :         assert_eq!(
    2116            2 :             cmd,
    2117            2 :             PageServiceCmd::LeaseLsn(LeaseLsnCmd {
    2118            2 :                 tenant_shard_id,
    2119            2 :                 timeline_id,
    2120            2 :                 lsn: Lsn::from_str("0/16ABCDE").unwrap(),
    2121            2 :             })
    2122            2 :         );
    2123            2 :         let tenant_shard_id = TenantShardId::split(&tenant_shard_id, ShardCount(8))[1];
    2124            2 :         let cmd = PageServiceCmd::parse(&format!(
    2125            2 :             "lease lsn {tenant_shard_id} {timeline_id} 0/16ABCDE"
    2126            2 :         ))
    2127            2 :         .unwrap();
    2128            2 :         assert_eq!(
    2129            2 :             cmd,
    2130            2 :             PageServiceCmd::LeaseLsn(LeaseLsnCmd {
    2131            2 :                 tenant_shard_id,
    2132            2 :                 timeline_id,
    2133            2 :                 lsn: Lsn::from_str("0/16ABCDE").unwrap(),
    2134            2 :             })
    2135            2 :         );
    2136            2 :         let cmd = PageServiceCmd::parse("set a = b").unwrap();
    2137            2 :         assert_eq!(cmd, PageServiceCmd::Set);
    2138            2 :         let cmd = PageServiceCmd::parse("SET foo").unwrap();
    2139            2 :         assert_eq!(cmd, PageServiceCmd::Set);
    2140            2 :     }
    2141              : 
    2142              :     #[test]
    2143            2 :     fn pageservice_cmd_err_handling() {
    2144            2 :         let tenant_id = TenantId::generate();
    2145            2 :         let timeline_id = TimelineId::generate();
    2146            2 :         let cmd = PageServiceCmd::parse("unknown_command");
    2147            2 :         assert!(cmd.is_err());
    2148            2 :         let cmd = PageServiceCmd::parse("pagestream_v2");
    2149            2 :         assert!(cmd.is_err());
    2150            2 :         let cmd = PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id}xxx"));
    2151            2 :         assert!(cmd.is_err());
    2152            2 :         let cmd = PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id}xxx {timeline_id}xxx"));
    2153            2 :         assert!(cmd.is_err());
    2154            2 :         let cmd = PageServiceCmd::parse(&format!(
    2155            2 :             "basebackup {tenant_id} {timeline_id} --gzip --gzip"
    2156            2 :         ));
    2157            2 :         assert!(cmd.is_err());
    2158            2 :         let cmd = PageServiceCmd::parse(&format!(
    2159            2 :             "basebackup {tenant_id} {timeline_id} --gzip --unknown"
    2160            2 :         ));
    2161            2 :         assert!(cmd.is_err());
    2162            2 :         let cmd = PageServiceCmd::parse(&format!(
    2163            2 :             "basebackup {tenant_id} {timeline_id} --gzip 0/16ABCDE"
    2164            2 :         ));
    2165            2 :         assert!(cmd.is_err());
    2166            2 :         let cmd = PageServiceCmd::parse(&format!("lease {tenant_id} {timeline_id} gzip 0/16ABCDE"));
    2167            2 :         assert!(cmd.is_err());
    2168            2 :     }
    2169              : }
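
These parser tests need no running pageserver; assuming the standard cargo
workspace layout, they can be run with, e.g., cargo test -p pageserver
pageservice_cmd.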
        

Generated by: LCOV version 2.1-beta