LCOV - differential code coverage report
Current view: top level - pageserver/src - page_service.rs (source / functions) Coverage Total Hit UBC GBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 81.5 % 712 580 132 6 574
Current Date: 2024-01-09 02:06:09 Functions: 32.3 % 198 64 134 3 61
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : //
       2                 : //! The Page Service listens for client connections and serves their GetPage@LSN
       3                 : //! requests.
       4                 : //
       5                 : //   It is possible to connect here using the usual psql/pgbench/libpq. The following
       6                 : // commands are currently supported:
       7                 : //     *status* -- show actual info about this pageserver,
       8                 : //     *pagestream* -- enter mode where smgr and pageserver talk with their
       9                 : //  custom protocol.
      10                 : //
      11                 : 
      12                 : use anyhow::Context;
      13                 : use async_compression::tokio::write::GzipEncoder;
      14                 : use bytes::Buf;
      15                 : use bytes::Bytes;
      16                 : use futures::Stream;
      17                 : use pageserver_api::models::TenantState;
      18                 : use pageserver_api::models::{
      19                 :     PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
      20                 :     PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
      21                 :     PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
      22                 :     PagestreamNblocksRequest, PagestreamNblocksResponse,
      23                 : };
      24                 : use postgres_backend::{self, is_expected_io_error, AuthType, PostgresBackend, QueryError};
      25                 : use pq_proto::framed::ConnectionError;
      26                 : use pq_proto::FeStartupPacket;
      27                 : use pq_proto::{BeMessage, FeMessage, RowDescriptor};
      28                 : use std::borrow::Cow;
      29                 : use std::io;
      30                 : use std::net::TcpListener;
      31                 : use std::pin::pin;
      32                 : use std::str;
      33                 : use std::str::FromStr;
      34                 : use std::sync::Arc;
      35                 : use std::time::Duration;
      36                 : use tokio::io::AsyncWriteExt;
      37                 : use tokio::io::{AsyncRead, AsyncWrite};
      38                 : use tokio_util::io::StreamReader;
      39                 : use tokio_util::sync::CancellationToken;
      40                 : use tracing::field;
      41                 : use tracing::*;
      42                 : use utils::id::ConnectionId;
      43                 : use utils::{
      44                 :     auth::{Claims, Scope, SwappableJwtAuth},
      45                 :     id::{TenantId, TimelineId},
      46                 :     lsn::Lsn,
      47                 :     simple_rcu::RcuReadGuard,
      48                 : };
      49                 : 
      50                 : use crate::auth::check_permission;
      51                 : use crate::basebackup;
      52                 : use crate::config::PageServerConf;
      53                 : use crate::context::{DownloadBehavior, RequestContext};
      54                 : use crate::import_datadir::import_wal_from_tar;
      55                 : use crate::metrics;
      56                 : use crate::metrics::LIVE_CONNECTIONS_COUNT;
      57                 : use crate::pgdatadir_mapping::{rel_block_to_key, Version};
      58                 : use crate::task_mgr;
      59                 : use crate::task_mgr::TaskKind;
      60                 : use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
      61                 : use crate::tenant::mgr;
      62                 : use crate::tenant::mgr::get_active_tenant_with_timeout;
      63                 : use crate::tenant::mgr::GetActiveTenantError;
      64                 : use crate::tenant::mgr::ShardSelector;
      65                 : use crate::tenant::timeline::WaitLsnError;
      66                 : use crate::tenant::GetTimelineError;
      67                 : use crate::tenant::PageReconstructError;
      68                 : use crate::tenant::Timeline;
      69                 : use crate::trace::Tracer;
      70                 : 
      71                 : use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
      72                 : use postgres_ffi::BLCKSZ;
      73                 : 
      74                 : // How long we may wait for a [`TenantSlot::InProgress`] and/or a [`Tenant`] which
      75                 : // is not yet in state [`TenantState::Active`].
      76                 : const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
      77                 : 
      78                 : /// Read the end of a tar archive.
      79                 : ///
      80                 : /// A tar archive normally ends with two consecutive blocks of zeros, 512 bytes each.
      81                 : /// `tokio_tar` already read the first such block. Read the second all-zeros block,
      82                 : /// and check that there is no more data after the EOF marker.
      83                 : ///
      84                 : /// XXX: Currently, any trailing data after the EOF marker prints a warning.
      85                 : /// Perhaps it should be a hard error?
      86 CBC          11 : async fn read_tar_eof(mut reader: (impl AsyncRead + Unpin)) -> anyhow::Result<()> {
      87              11 :     use tokio::io::AsyncReadExt;
      88              11 :     let mut buf = [0u8; 512];
      89              11 : 
      90              11 :     // Read the all-zeros block, and verify it
      91              11 :     let mut total_bytes = 0;
      92              22 :     while total_bytes < 512 {
      93              11 :         let nbytes = reader.read(&mut buf[total_bytes..]).await?;
      94              11 :         total_bytes += nbytes;
      95              11 :         if nbytes == 0 {
      96 UBC           0 :             break;
      97 CBC          11 :         }
      98                 :     }
      99              11 :     if total_bytes < 512 {
     100 UBC           0 :         anyhow::bail!("incomplete or invalid tar EOF marker");
     101 CBC          11 :     }
     102            5632 :     if !buf.iter().all(|&x| x == 0) {
     103 UBC           0 :         anyhow::bail!("invalid tar EOF marker");
     104 CBC          11 :     }
     105              11 : 
     106              11 :     // Drain any data after the EOF marker
     107              11 :     let mut trailing_bytes = 0;
     108                 :     loop {
     109              84 :         let nbytes = reader.read(&mut buf).await?;
     110              84 :         trailing_bytes += nbytes;
     111              84 :         if nbytes == 0 {
     112              11 :             break;
     113              73 :         }
     114                 :     }
     115              11 :     if trailing_bytes > 0 {
     116               7 :         warn!("ignored {trailing_bytes} unexpected bytes after the tar archive");
     117               4 :     }
     118              11 :     Ok(())
     119              11 : }
     120                 : 
     121                 : ///////////////////////////////////////////////////////////////////////////////
     122                 : 
     123                 : ///
     124                 : /// Main loop of the page service.
     125                 : ///
     126                 : /// Listens for connections, and launches a new handler task for each.
     127                 : ///
     128             557 : pub async fn libpq_listener_main(
     129             557 :     conf: &'static PageServerConf,
     130             557 :     broker_client: storage_broker::BrokerClientChannel,
     131             557 :     auth: Option<Arc<SwappableJwtAuth>>,
     132             557 :     listener: TcpListener,
     133             557 :     auth_type: AuthType,
     134             557 :     listener_ctx: RequestContext,
     135             557 :     cancel: CancellationToken,
     136             557 : ) -> anyhow::Result<()> {
     137             557 :     listener.set_nonblocking(true)?;
     138             557 :     let tokio_listener = tokio::net::TcpListener::from_std(listener)?;
     139                 : 
     140                 :     // Wait for a new connection to arrive, or for server shutdown.
     141            8638 :     while let Some(res) = tokio::select! {
     142                 :         biased;
     143                 : 
     144                 :         _ = cancel.cancelled() => {
     145                 :             // We were requested to shut down.
     146                 :             None
     147                 :         }
     148                 : 
     149            8081 :         res = tokio_listener.accept() => {
     150                 :             Some(res)
     151                 :         }
     152                 :     } {
     153            8081 :         match res {
     154            8081 :             Ok((socket, peer_addr)) => {
     155                 :                 // Connection established. Spawn a new task to handle it.
     156 UBC           0 :                 debug!("accepted connection from {}", peer_addr);
     157 CBC        8081 :                 let local_auth = auth.clone();
     158            8081 : 
     159            8081 :                 let connection_ctx = listener_ctx
     160            8081 :                     .detached_child(TaskKind::PageRequestHandler, DownloadBehavior::Download);
     161            8081 : 
     162            8081 :                 // PageRequestHandler tasks are not associated with any particular
     163            8081 :                 // timeline in the task manager. In practice most connections will
     164            8081 :                 // only deal with a particular timeline, but we don't know which one
     165            8081 :                 // yet.
     166            8081 :                 task_mgr::spawn(
     167            8081 :                     &tokio::runtime::Handle::current(),
     168            8081 :                     TaskKind::PageRequestHandler,
     169            8081 :                     None,
     170            8081 :                     None,
     171            8081 :                     "serving compute connection task",
     172            8081 :                     false,
     173            8081 :                     page_service_conn_main(
     174            8081 :                         conf,
     175            8081 :                         broker_client.clone(),
     176            8081 :                         local_auth,
     177            8081 :                         socket,
     178            8081 :                         auth_type,
     179            8081 :                         connection_ctx,
     180            8081 :                     ),
     181            8081 :                 );
     182                 :             }
     183 UBC           0 :             Err(err) => {
     184                 :                 // accept() failed. Log the error, and loop back to retry on next connection.
     185               0 :                 error!("accept() failed: {:?}", err);
     186                 :             }
     187                 :         }
     188                 :     }
     189                 : 
     190               0 :     debug!("page_service loop terminated");
     191                 : 
     192 CBC         159 :     Ok(())
     193             159 : }
     194                 : 
     195 UBC           0 : #[instrument(skip_all, fields(peer_addr))]
     196                 : async fn page_service_conn_main(
     197                 :     conf: &'static PageServerConf,
     198                 :     broker_client: storage_broker::BrokerClientChannel,
     199                 :     auth: Option<Arc<SwappableJwtAuth>>,
     200                 :     socket: tokio::net::TcpStream,
     201                 :     auth_type: AuthType,
     202                 :     connection_ctx: RequestContext,
     203                 : ) -> anyhow::Result<()> {
     204                 :     // Immediately increment the gauge, then create a job to decrement it on task exit.
     205                 :     // One of the pros of `defer!` is that this will *most probably*
     206                 :     // get called, even in the presence of panics.
     207                 :     let gauge = LIVE_CONNECTIONS_COUNT.with_label_values(&["page_service"]);
     208                 :     gauge.inc();
     209 CBC        8029 :     scopeguard::defer! {
     210            8029 :         gauge.dec();
     211            8029 :     }
     212                 : 
     213                 :     socket
     214                 :         .set_nodelay(true)
     215                 :         .context("could not set TCP_NODELAY")?;
     216                 : 
     217                 :     let peer_addr = socket.peer_addr().context("get peer address")?;
     218                 :     tracing::Span::current().record("peer_addr", field::display(peer_addr));
     219                 : 
     220                 :     // Set up a read timeout of 10 minutes. The exact value is rather arbitrary; the requirements are:
     221                 :     // - long enough for most valid compute connections
     222                 :     // - less than infinite to stop us from "leaking" connections to long-gone computes
     223                 :     //
     224                 :     // no write timeout is used, because the kernel is assumed to error writes after some time.
     225                 :     let mut socket = tokio_io_timeout::TimeoutReader::new(socket);
     226                 : 
     227                 :     let default_timeout_ms = 10 * 60 * 1000; // 10 minutes by default
     228            8081 :     let socket_timeout_ms = (|| {
     229            8081 :         fail::fail_point!("simulated-bad-compute-connection", |avg_timeout_ms| {
     230                 :             // Exponential distribution for simulating
     231                 :             // poor network conditions, expect about avg_timeout_ms to be around 15
     232                 :             // in tests
     233 GBC           5 :             if let Some(avg_timeout_ms) = avg_timeout_ms {
     234               5 :                 let avg = avg_timeout_ms.parse::<i64>().unwrap() as f32;
     235               5 :                 let u = rand::random::<f32>();
     236               5 :                 ((1.0 - u).ln() / (-avg)) as u64
     237                 :             } else {
     238 UBC           0 :                 default_timeout_ms
     239                 :             }
     240 CBC        8081 :         });
     241            8076 :         default_timeout_ms
     242                 :     })();
     243                 : 
     244                 :     // A timeout here does not mean the client died, it can happen if it's just idle for
     245                 :     // a while: we will tear down this PageServerHandler and instantiate a new one if/when
     246                 :     // they reconnect.
     247                 :     socket.set_timeout(Some(std::time::Duration::from_millis(socket_timeout_ms)));
     248                 :     let socket = std::pin::pin!(socket);
     249                 : 
     250                 :     // XXX: pgbackend.run() should take the connection_ctx,
     251                 :     // and create a child per-query context when it invokes process_query.
     252                 :     // But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
     253                 :     // and create the per-query context in process_query ourselves.
     254                 :     let mut conn_handler = PageServerHandler::new(conf, broker_client, auth, connection_ctx);
     255                 :     let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
     256                 : 
     257                 :     match pgbackend
     258                 :         .run(&mut conn_handler, task_mgr::shutdown_watcher)
     259                 :         .await
     260                 :     {
     261                 :         Ok(()) => {
     262                 :             // we've been requested to shut down
     263                 :             Ok(())
     264                 :         }
     265                 :         Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
     266                 :             if is_expected_io_error(&io_error) {
     267 UBC           0 :                 info!("Postgres client disconnected ({io_error})");
     268                 :                 Ok(())
     269                 :             } else {
     270                 :                 Err(io_error).context("Postgres connection error")
     271                 :             }
     272                 :         }
     273                 :         other => other.context("Postgres query error"),
     274                 :     }
     275                 : }
     276                 : 
     277                 : struct PageServerHandler {
     278                 :     _conf: &'static PageServerConf,
     279                 :     broker_client: storage_broker::BrokerClientChannel,
     280                 :     auth: Option<Arc<SwappableJwtAuth>>,
     281                 :     claims: Option<Claims>,
     282                 : 
     283                 :     /// The context created for the lifetime of the connection
     284                 :     /// serviced by this PageServerHandler.
     285                 :     /// For each query received over the connection,
     286                 :     /// `process_query` creates a child context from this one.
     287                 :     connection_ctx: RequestContext,
     288                 : }
     289                 : 
     290 CBC           2 : #[derive(thiserror::Error, Debug)]
     291                 : enum PageStreamError {
     292                 :     /// We encountered an error that should prompt the client to reconnect:
     293                 :     /// in practice this means we drop the connection without sending a response.
     294                 :     #[error("Reconnect required: {0}")]
     295                 :     Reconnect(Cow<'static, str>),
     296                 : 
     297                 :     /// We were instructed to shutdown while processing the query
     298                 :     #[error("Shutting down")]
     299                 :     Shutdown,
     300                 : 
     301                 :     /// Something went wrong reading a page: this likely indicates a pageserver bug
     302                 :     #[error("Read error: {0}")]
     303                 :     Read(PageReconstructError),
     304                 : 
     305                 :     /// Ran out of time waiting for an LSN
     306                 :     #[error("LSN timeout: {0}")]
     307                 :     LsnTimeout(WaitLsnError),
     308                 : 
     309                 :     /// The entity required to serve the request (tenant or timeline) is not found,
     310                 :     /// or is not found in a suitable state to serve a request.
     311                 :     #[error("Not found: {0}")]
     312                 :     NotFound(std::borrow::Cow<'static, str>),
     313                 : 
     314                 :     /// Request asked for something that doesn't make sense, like an invalid LSN
     315                 :     #[error("Bad request: {0}")]
     316                 :     BadRequest(std::borrow::Cow<'static, str>),
     317                 : }
     318                 : 
     319                 : impl From<PageReconstructError> for PageStreamError {
     320               3 :     fn from(value: PageReconstructError) -> Self {
     321               3 :         match value {
     322               3 :             PageReconstructError::Cancelled => Self::Shutdown,
     323 UBC           0 :             e => Self::Read(e),
     324                 :         }
     325 CBC           3 :     }
     326                 : }
     327                 : 
     328                 : impl From<GetActiveTimelineError> for PageStreamError {
     329 UBC           0 :     fn from(value: GetActiveTimelineError) -> Self {
     330               0 :         match value {
     331               0 :             GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => Self::Shutdown,
     332               0 :             GetActiveTimelineError::Tenant(e) => Self::NotFound(format!("{e}").into()),
     333               0 :             GetActiveTimelineError::Timeline(e) => Self::NotFound(format!("{e}").into()),
     334                 :         }
     335               0 :     }
     336                 : }
     337                 : 
     338                 : impl From<WaitLsnError> for PageStreamError {
     339               0 :     fn from(value: WaitLsnError) -> Self {
     340               0 :         match value {
     341               0 :             e @ WaitLsnError::Timeout(_) => Self::LsnTimeout(e),
     342               0 :             WaitLsnError::Shutdown => Self::Shutdown,
     343               0 :             WaitLsnError::BadState => Self::Reconnect("Timeline is not active".into()),
     344                 :         }
     345               0 :     }
     346                 : }
     347                 : 
     348                 : impl PageServerHandler {
     349 CBC        8081 :     pub fn new(
     350            8081 :         conf: &'static PageServerConf,
     351            8081 :         broker_client: storage_broker::BrokerClientChannel,
     352            8081 :         auth: Option<Arc<SwappableJwtAuth>>,
     353            8081 :         connection_ctx: RequestContext,
     354            8081 :     ) -> Self {
     355            8081 :         PageServerHandler {
     356            8081 :             _conf: conf,
     357            8081 :             broker_client,
     358            8081 :             auth,
     359            8081 :             claims: None,
     360            8081 :             connection_ctx,
     361            8081 :         }
     362            8081 :     }
     363                 : 
     364                 :     /// Wrap PostgresBackend::flush to respect our CancellationToken: it is important to use
     365                 :     /// this rather than naked flush() in order to shut down promptly.  Without this, we would
     366                 :     /// block shutdown of a tenant if a postgres client was failing to consume bytes we send
     367                 :     /// in the flush.
     368         3649597 :     async fn flush_cancellable<IO>(
     369         3649597 :         &self,
     370         3649597 :         pgb: &mut PostgresBackend<IO>,
     371         3649597 :         cancel: &CancellationToken,
     372         3649597 :     ) -> Result<(), QueryError>
     373         3649597 :     where
     374         3649597 :         IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
     375         3649597 :     {
     376         3655643 :         tokio::select!(
     377         3649597 :             flush_r = pgb.flush() => {
     378                 :                 Ok(flush_r?)
     379                 :             },
     380                 :             _ = cancel.cancelled() => {
     381                 :                 Err(QueryError::Shutdown)
     382                 :             }
     383                 :         )
     384         3649597 :     }
     385                 : 
     386              13 :     fn copyin_stream<'a, IO>(
     387              13 :         &'a self,
     388              13 :         pgb: &'a mut PostgresBackend<IO>,
     389              13 :         cancel: &'a CancellationToken,
     390              13 :     ) -> impl Stream<Item = io::Result<Bytes>> + 'a
     391              13 :     where
     392              13 :         IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
     393              13 :     {
     394              13 :         async_stream::try_stream! {
     395                 :             loop {
     396           83402 :                 let msg = tokio::select! {
     397                 :                     biased;
     398                 : 
     399                 :                     _ = cancel.cancelled() => {
     400                 :                         // We were requested to shut down.
     401                 :                         let msg = "pageserver is shutting down";
     402                 :                         let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, None));
     403                 :                         Err(QueryError::Shutdown)
     404                 :                     }
     405                 : 
     406           83402 :                     msg = pgb.read_message() => { msg.map_err(QueryError::from)}
     407                 :                 };
     408                 : 
     409           83402 :                 match msg {
     410           83402 :                     Ok(Some(message)) => {
     411           83402 :                         let copy_data_bytes = match message {
     412           83383 :                             FeMessage::CopyData(bytes) => bytes,
     413              12 :                             FeMessage::CopyDone => { break },
     414               7 :                             FeMessage::Sync => continue,
     415                 :                             FeMessage::Terminate => {
     416 UBC           0 :                                 let msg = "client terminated connection with Terminate message during COPY";
     417               0 :                                 let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
     418               0 :                                 // error can't happen here, ErrorResponse serialization should be always ok
     419               0 :                                 pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
     420               0 :                                 Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
     421               0 :                                 break;
     422                 :                             }
     423               0 :                             m => {
     424               0 :                                 let msg = format!("unexpected message {m:?}");
     425               0 :                                 // error can't happen here, ErrorResponse serialization should be always ok
     426               0 :                                 pgb.write_message_noflush(&BeMessage::ErrorResponse(&msg, None)).map_err(|e| e.into_io_error())?;
     427               0 :                                 Err(io::Error::new(io::ErrorKind::Other, msg))?;
     428               0 :                                 break;
     429                 :                             }
     430                 :                         };
     431                 : 
     432 CBC       83383 :                         yield copy_data_bytes;
     433                 :                     }
     434                 :                     Ok(None) => {
     435 UBC           0 :                         let msg = "client closed connection during COPY";
     436               0 :                         let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
     437               0 :                         // error can't happen here, ErrorResponse serialization should be always ok
     438               0 :                         pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
     439               0 :                         self.flush_cancellable(pgb, cancel).await.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
     440               0 :                         Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
     441                 :                     }
     442               0 :                     Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
     443               0 :                         Err(io_error)?;
     444                 :                     }
     445               0 :                     Err(other) => {
     446               0 :                         Err(io::Error::new(io::ErrorKind::Other, other.to_string()))?;
     447                 :                     }
     448                 :                 };
     449                 :             }
     450                 :         }
     451 CBC          13 :     }
     452                 : 
     453            6961 :     #[instrument(skip_all)]
     454                 :     async fn handle_pagerequests<IO>(
     455                 :         &self,
     456                 :         pgb: &mut PostgresBackend<IO>,
     457                 :         tenant_id: TenantId,
     458                 :         timeline_id: TimelineId,
     459                 :         ctx: RequestContext,
     460                 :     ) -> Result<(), QueryError>
     461                 :     where
     462                 :         IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
     463                 :     {
     464                 :         debug_assert_current_span_has_tenant_and_timeline_id();
     465                 : 
     466                 :         // Note that since one connection may contain getpage requests that target different
     467                 :         // shards (e.g. during splitting when the compute is not yet aware of the split), the tenant
     468                 :         // that we look up here may not be the one that serves all the actual requests: we will double
     469                 :         // check the mapping of key->shard later before calling into Timeline for getpage requests.
     470                 :         let tenant = mgr::get_active_tenant_with_timeout(
     471                 :             tenant_id,
     472                 :             ShardSelector::First,
     473                 :             ACTIVE_TENANT_TIMEOUT,
     474                 :             &task_mgr::shutdown_token(),
     475                 :         )
     476                 :         .await?;
     477                 : 
     478                 :         // Make request tracer if needed
     479                 :         let mut tracer = if tenant.get_trace_read_requests() {
     480                 :             let connection_id = ConnectionId::generate();
     481                 :             let path =
     482                 :                 tenant
     483                 :                     .conf
     484                 :                     .trace_path(&tenant.tenant_shard_id(), &timeline_id, &connection_id);
     485                 :             Some(Tracer::new(path))
     486                 :         } else {
     487                 :             None
     488                 :         };
     489                 : 
     490                 :         // Check that the timeline exists
     491                 :         let timeline = tenant
     492                 :             .get_timeline(timeline_id, true)
     493 UBC           0 :             .map_err(|e| QueryError::NotFound(format!("{e}").into()))?;
     494                 : 
     495                 :         // Avoid starting new requests if the timeline has already started shutting down,
     496                 :         // and block timeline shutdown until this request is complete, or drops out due
     497                 :         // to cancellation.
     498               0 :         let _timeline_guard = timeline.gate.enter().map_err(|_| QueryError::Shutdown)?;
     499                 : 
     500                 :         // switch client to COPYBOTH
     501                 :         pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
     502                 :         self.flush_cancellable(pgb, &timeline.cancel).await?;
     503                 : 
     504                 :         let metrics = metrics::SmgrQueryTimePerTimeline::new(&tenant_id, &timeline_id);
     505                 : 
     506                 :         loop {
     507 CBC     7029451 :             let msg = tokio::select! {
     508                 :                 biased;
     509                 : 
     510                 :                 _ = timeline.cancel.cancelled() => {
     511                 :                     // We were requested to shut down.
     512             178 :                     info!("shutdown request received in page handler");
     513                 :                     return Err(QueryError::Shutdown)
     514                 :                 }
     515                 : 
     516         3648235 :                 msg = pgb.read_message() => { msg }
     517                 :             };
     518                 : 
     519                 :             let copy_data_bytes = match msg? {
     520                 :                 Some(FeMessage::CopyData(bytes)) => bytes,
     521                 :                 Some(FeMessage::Terminate) => break,
     522                 :                 Some(m) => {
     523                 :                     return Err(QueryError::Other(anyhow::anyhow!(
     524                 :                         "unexpected message: {m:?} during COPY"
     525                 :                     )));
     526                 :                 }
     527                 :                 None => break, // client disconnected
     528                 :             };
     529                 : 
     530 UBC           0 :             trace!("query: {copy_data_bytes:?}");
     531                 : 
     532                 :             // Trace request if needed
     533                 :             if let Some(t) = tracer.as_mut() {
     534                 :                 t.trace(&copy_data_bytes)
     535                 :             }
     536                 : 
     537                 :             let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
     538                 : 
     539                 :             // TODO: We could create a new per-request context here, with unique ID.
     540                 :             // Currently we use the same per-timeline context for all requests
     541                 : 
     542                 :             let (response, span) = match neon_fe_msg {
     543                 :                 PagestreamFeMessage::Exists(req) => {
     544                 :                     let _timer = metrics.start_timer(metrics::SmgrQueryType::GetRelExists);
     545                 :                     let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.lsn);
     546                 :                     (
     547                 :                         self.handle_get_rel_exists_request(&timeline, &req, &ctx)
     548                 :                             .instrument(span.clone())
     549                 :                             .await,
     550                 :                         span,
     551                 :                     )
     552                 :                 }
     553                 :                 PagestreamFeMessage::Nblocks(req) => {
     554                 :                     let _timer = metrics.start_timer(metrics::SmgrQueryType::GetRelSize);
     555                 :                     let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.lsn);
     556                 :                     (
     557                 :                         self.handle_get_nblocks_request(&timeline, &req, &ctx)
     558                 :                             .instrument(span.clone())
     559                 :                             .await,
     560                 :                         span,
     561                 :                     )
     562                 :                 }
     563                 :                 PagestreamFeMessage::GetPage(req) => {
     564                 :                     let _timer = metrics.start_timer(metrics::SmgrQueryType::GetPageAtLsn);
     565                 :                     let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn);
     566                 :                     (
     567                 :                         self.handle_get_page_at_lsn_request(&timeline, &req, &ctx)
     568                 :                             .instrument(span.clone())
     569                 :                             .await,
     570                 :                         span,
     571                 :                     )
     572                 :                 }
     573                 :                 PagestreamFeMessage::DbSize(req) => {
     574                 :                     let _timer = metrics.start_timer(metrics::SmgrQueryType::GetDbSize);
     575                 :                     let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.lsn);
     576                 :                     (
     577                 :                         self.handle_db_size_request(&timeline, &req, &ctx)
     578                 :                             .instrument(span.clone())
     579                 :                             .await,
     580                 :                         span,
     581                 :                     )
     582                 :                 }
     583                 :             };
     584                 : 
     585                 :             match response {
     586                 :                 Err(PageStreamError::Shutdown) => {
     587                 :                     // If we fail to fulfil a request during shutdown, which may be _because_ of
     588                 :                     // shutdown, then do not send the error to the client.  Instead just drop the
     589                 :                     // connection.
     590 CBC           3 :                     span.in_scope(|| info!("dropping connection due to shutdown"));
     591                 :                     return Err(QueryError::Shutdown);
     592                 :                 }
     593                 :                 Err(PageStreamError::Reconnect(reason)) => {
     594 UBC           0 :                     span.in_scope(|| info!("handler requested reconnect: {reason}"));
     595                 :                     return Err(QueryError::Reconnect);
     596                 :                 }
     597                 :                 Err(e) if timeline.cancel.is_cancelled() || timeline.is_stopping() => {
     598                 :                     // This branch accomodates code within request handlers that returns an anyhow::Error instead of a clean
     599                 :                     // shutdown error, this may be buried inside a PageReconstructError::Other for example.
     600                 :                     //
     601                 :                     // Requests may fail as soon as we are Stopping, even if the Timeline's cancellation token wasn't fired yet,
     602                 :                     // because wait_lsn etc will drop out
     603                 :                     // is_stopping(): [`Timeline::flush_and_shutdown`] has entered
     604                 :                     // is_canceled(): [`Timeline::shutdown`]` has entered
     605               0 :                     span.in_scope(|| info!("dropped error response during shutdown: {e:#}"));
     606                 :                     return Err(QueryError::Shutdown);
     607                 :                 }
     608                 :                 r => {
     609 CBC           1 :                     let response_msg = r.unwrap_or_else(|e| {
     610               1 :                         // print the all details to the log with {:#}, but for the client the
     611               1 :                         // error message is enough.  Do not log if shutting down, as the anyhow::Error
     612               1 :                         // here includes cancellation which is not an error.
     613               1 :                         span.in_scope(|| error!("error reading relation or page version: {:#}", e));
     614               1 :                         PagestreamBeMessage::Error(PagestreamErrorResponse {
     615               1 :                             message: e.to_string(),
     616               1 :                         })
     617               1 :                     });
     618                 : 
     619                 :                     pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
     620                 :                     self.flush_cancellable(pgb, &timeline.cancel).await?;
     621                 :                 }
     622                 :             }
     623                 :         }
     624                 :         Ok(())
     625                 :     }
     626                 : 
     627                 :     #[allow(clippy::too_many_arguments)]
     628              11 :     #[instrument(skip_all, fields(%base_lsn, end_lsn=%_end_lsn, %pg_version))]
     629                 :     async fn handle_import_basebackup<IO>(
     630                 :         &self,
     631                 :         pgb: &mut PostgresBackend<IO>,
     632                 :         tenant_id: TenantId,
     633                 :         timeline_id: TimelineId,
     634                 :         base_lsn: Lsn,
     635                 :         _end_lsn: Lsn,
     636                 :         pg_version: u32,
     637                 :         ctx: RequestContext,
     638                 :     ) -> Result<(), QueryError>
     639                 :     where
     640                 :         IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
     641                 :     {
     642                 :         debug_assert_current_span_has_tenant_and_timeline_id();
     643                 : 
     644                 :         // Create empty timeline
     645              11 :         info!("creating new timeline");
     646                 :         let tenant = get_active_tenant_with_timeout(
     647                 :             tenant_id,
     648                 :             ShardSelector::Zero,
     649                 :             ACTIVE_TENANT_TIMEOUT,
     650                 :             &task_mgr::shutdown_token(),
     651                 :         )
     652                 :         .await?;
     653                 :         let timeline = tenant
     654                 :             .create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
     655                 :             .await?;
     656                 : 
     657                 :         // TODO mark timeline as not ready until it reaches end_lsn.
     658                 :         // We might have some wal to import as well, and we should prevent compute
     659                 :         // from connecting before that and writing conflicting wal.
     660                 :         //
     661                 :         // This is not relevant for pageserver->pageserver migrations, since there's
     662                 :         // no wal to import. But should be fixed if we want to import from postgres.
     663                 : 
     664                 :         // TODO leave clean state on error. For now you can use detach to clean
     665                 :         // up broken state from a failed import.
     666                 : 
     667                 :         // Import basebackup provided via CopyData
     668              11 :         info!("importing basebackup");
     669                 :         pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
     670                 :         self.flush_cancellable(pgb, &tenant.cancel).await?;
     671                 : 
     672                 :         let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb, &tenant.cancel)));
     673                 :         timeline
     674                 :             .import_basebackup_from_tar(
     675                 :                 &mut copyin_reader,
     676                 :                 base_lsn,
     677                 :                 self.broker_client.clone(),
     678                 :                 &ctx,
     679                 :             )
     680                 :             .await?;
     681                 : 
     682                 :         // Read the end of the tar archive.
     683                 :         read_tar_eof(copyin_reader).await?;
     684                 : 
     685                 :         // TODO check checksum
     686                 :         // Meanwhile you can verify client-side by taking fullbackup
     687                 :         // and checking that it matches in size with what was imported.
     688                 :         // It wouldn't work if base came from vanilla postgres though,
     689                 :         // since we discard some log files.
     690                 : 
     691               9 :         info!("done");
     692                 :         Ok(())
     693                 :     }
     694                 : 
     695 UBC           0 :     #[instrument(skip_all, fields(%start_lsn, %end_lsn))]
     696                 :     async fn handle_import_wal<IO>(
     697                 :         &self,
     698                 :         pgb: &mut PostgresBackend<IO>,
     699                 :         tenant_id: TenantId,
     700                 :         timeline_id: TimelineId,
     701                 :         start_lsn: Lsn,
     702                 :         end_lsn: Lsn,
     703                 :         ctx: RequestContext,
     704                 :     ) -> Result<(), QueryError>
     705                 :     where
     706                 :         IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
     707                 :     {
     708                 :         debug_assert_current_span_has_tenant_and_timeline_id();
     709                 : 
     710                 :         let timeline = self
     711                 :             .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero)
     712                 :             .await?;
     713                 :         let last_record_lsn = timeline.get_last_record_lsn();
     714                 :         if last_record_lsn != start_lsn {
     715                 :             return Err(QueryError::Other(
     716                 :                 anyhow::anyhow!("Cannot import WAL from Lsn {start_lsn} because timeline does not start from the same lsn: {last_record_lsn}"))
     717                 :             );
     718                 :         }
     719                 : 
     720                 :         // TODO leave clean state on error. For now you can use detach to clean
     721                 :         // up broken state from a failed import.
     722                 : 
     723                 :         // Import wal provided via CopyData
     724 CBC           2 :         info!("importing wal");
     725                 :         pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
     726                 :         self.flush_cancellable(pgb, &timeline.cancel).await?;
     727                 :         let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb, &timeline.cancel)));
     728                 :         import_wal_from_tar(&timeline, &mut copyin_reader, start_lsn, end_lsn, &ctx).await?;
     729               2 :         info!("wal import complete");
     730                 : 
     731                 :         // Read the end of the tar archive.
     732                 :         read_tar_eof(copyin_reader).await?;
     733                 : 
     734                 :         // TODO Does it make sense to overshoot?
     735                 :         if timeline.get_last_record_lsn() < end_lsn {
     736                 :             return Err(QueryError::Other(
     737                 :                 anyhow::anyhow!("Cannot import WAL from Lsn {start_lsn} because timeline does not start from the same lsn: {last_record_lsn}"))
     738                 :             );
     739                 :         }
     740                 : 
     741                 :         // Flush data to disk, then upload to s3. No need for a forced checkpoint.
     742                 :         // We only want to persist the data, and it doesn't matter if it's in the
     743                 :         // shape of deltas or images.
     744               2 :         info!("flushing layers");
     745                 :         timeline.freeze_and_flush().await?;
     746                 : 
     747               2 :         info!("done");
     748                 :         Ok(())
     749                 :     }
     750                 : 
     751                 :     /// Helper function to handle the LSN from client request.
     752                 :     ///
     753                 :     /// Each GetPage (and Exists and Nblocks) request includes information about
     754                 :     /// which version of the page is being requested. The client can request the
     755                 :     /// latest version of the page, or the version that's valid at a particular
     756                 :     /// LSN. The primary compute node will always request the latest page
     757                 :     /// version, while a standby will request a version at the LSN that it's
     758                 :     /// currently caught up to.
     759                 :     ///
     760                 :     /// In either case, if the page server hasn't received the WAL up to the
     761                 :     /// requested LSN yet, we will wait for it to arrive. The return value is
     762                 :     /// the LSN that should be used to look up the page versions.
     763         3641536 :     async fn wait_or_get_last_lsn(
     764         3641536 :         timeline: &Timeline,
     765         3641536 :         mut lsn: Lsn,
     766         3641536 :         latest: bool,
     767         3641536 :         latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
     768         3641536 :         ctx: &RequestContext,
     769         3641536 :     ) -> Result<Lsn, PageStreamError> {
     770         3641536 :         if latest {
     771                 :             // Latest page version was requested. If LSN is given, it is a hint
     772                 :             // to the page server that there have been no modifications to the
     773                 :             // page after that LSN. If we haven't received WAL up to that point,
     774                 :             // wait until it arrives.
     775         3473262 :             let last_record_lsn = timeline.get_last_record_lsn();
     776         3473262 : 
     777         3473262 :             // Note: this covers the special case that lsn == Lsn(0). That
     778         3473262 :             // special case means "return the latest version whatever it is",
     779         3473262 :             // and it's used for bootstrapping purposes, when the page server is
     780         3473262 :             // connected directly to the compute node. That is needed because
     781         3473262 :             // when you connect to the compute node, to receive the WAL, the
     782         3473262 :             // walsender process will do a look up in the pg_authid catalog
     783         3473262 :             // table for authentication. That poses a deadlock problem: the
     784         3473262 :             // catalog table lookup will send a GetPage request, but the GetPage
     785         3473262 :             // request will block in the page server because the recent WAL
     786         3473262 :             // hasn't been received yet, and it cannot be received until the
     787         3473262 :             // walsender completes the authentication and starts streaming the
     788         3473262 :             // WAL.
     789         3473262 :             if lsn <= last_record_lsn {
     790         3420188 :                 lsn = last_record_lsn;
     791         3420188 :             } else {
     792          103368 :                 timeline.wait_lsn(lsn, ctx).await?;
     793                 :                 // Since we waited for 'lsn' to arrive, that is now the last
     794                 :                 // record LSN. (Or close enough for our purposes; the
     795                 :                 // last-record LSN can advance immediately after we return
     796                 :                 // anyway)
     797                 :             }
     798                 :         } else {
     799          168274 :             if lsn == Lsn(0) {
     800               1 :                 return Err(PageStreamError::BadRequest(
     801               1 :                     "invalid LSN(0) in request".into(),
     802               1 :                 ));
     803          168273 :             }
     804          168273 :             timeline.wait_lsn(lsn, ctx).await?;
     805                 :         }
     806                 : 
     807         3641535 :         if lsn < **latest_gc_cutoff_lsn {
     808 UBC           0 :             return Err(PageStreamError::BadRequest(format!(
     809               0 :                 "tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
     810               0 :                 lsn, **latest_gc_cutoff_lsn
     811               0 :             ).into()));
     812 CBC     3641535 :         }
     813         3641535 :         Ok(lsn)
     814         3641536 :     }
     815                 : 
     816           42614 :     async fn handle_get_rel_exists_request(
     817           42614 :         &self,
     818           42614 :         timeline: &Timeline,
     819           42614 :         req: &PagestreamExistsRequest,
     820           42614 :         ctx: &RequestContext,
     821           42614 :     ) -> Result<PagestreamBeMessage, PageStreamError> {
     822           42614 :         let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
     823           42614 :         let lsn =
     824           42614 :             Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
     825 UBC           0 :                 .await?;
     826                 : 
     827 CBC       42614 :         let exists = timeline
     828           42614 :             .get_rel_exists(req.rel, Version::Lsn(lsn), req.latest, ctx)
     829            9224 :             .await?;
     830                 : 
     831           42614 :         Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
     832           42614 :             exists,
     833           42614 :         }))
     834           42614 :     }
     835                 : 
     836           15892 :     async fn handle_get_nblocks_request(
     837           15892 :         &self,
     838           15892 :         timeline: &Timeline,
     839           15892 :         req: &PagestreamNblocksRequest,
     840           15892 :         ctx: &RequestContext,
     841           15892 :     ) -> Result<PagestreamBeMessage, PageStreamError> {
     842           15892 :         let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
     843           15892 :         let lsn =
     844           15892 :             Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
     845             200 :                 .await?;
     846                 : 
     847           15892 :         let n_blocks = timeline
     848           15892 :             .get_rel_size(req.rel, Version::Lsn(lsn), req.latest, ctx)
     849             289 :             .await?;
     850                 : 
     851           15892 :         Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
     852           15892 :             n_blocks,
     853           15892 :         }))
     854           15892 :     }
     855                 : 
     856               5 :     async fn handle_db_size_request(
     857               5 :         &self,
     858               5 :         timeline: &Timeline,
     859               5 :         req: &PagestreamDbSizeRequest,
     860               5 :         ctx: &RequestContext,
     861               5 :     ) -> Result<PagestreamBeMessage, PageStreamError> {
     862               5 :         let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
     863               5 :         let lsn =
     864               5 :             Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
     865 UBC           0 :                 .await?;
     866                 : 
     867 CBC           5 :         let total_blocks = timeline
     868               5 :             .get_db_size(
     869               5 :                 DEFAULTTABLESPACE_OID,
     870               5 :                 req.dbnode,
     871               5 :                 Version::Lsn(lsn),
     872               5 :                 req.latest,
     873               5 :                 ctx,
     874               5 :             )
     875              91 :             .await?;
     876               5 :         let db_size = total_blocks as i64 * BLCKSZ as i64;
     877               5 : 
     878               5 :         Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse {
     879               5 :             db_size,
     880               5 :         }))
     881               5 :     }
     882                 : 
     883         3583025 :     async fn do_handle_get_page_at_lsn_request(
     884         3583025 :         &self,
     885         3583025 :         timeline: &Timeline,
     886         3583025 :         req: &PagestreamGetPageRequest,
     887         3583025 :         ctx: &RequestContext,
     888         3583025 :     ) -> Result<PagestreamBeMessage, PageStreamError> {
     889         3583025 :         let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
     890         3583024 :         let lsn =
     891         3583025 :             Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
     892          103170 :                 .await?;
     893         3583024 :         let page = timeline
     894         3583024 :             .get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), req.latest, ctx)
     895         1489524 :             .await?;
     896                 : 
     897         3582995 :         Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
     898         3582995 :             page,
     899         3582995 :         }))
     900         3582999 :     }
     901                 : 
     902         3583025 :     async fn handle_get_page_at_lsn_request(
     903         3583025 :         &self,
     904         3583025 :         timeline: &Timeline,
     905         3583025 :         req: &PagestreamGetPageRequest,
     906         3583025 :         ctx: &RequestContext,
     907         3583025 :     ) -> Result<PagestreamBeMessage, PageStreamError> {
     908         3583025 :         let key = rel_block_to_key(req.rel, req.blkno);
     909         3583025 :         if timeline.get_shard_identity().is_key_local(&key) {
     910         3583025 :             self.do_handle_get_page_at_lsn_request(timeline, req, ctx)
     911         1592694 :                 .await
     912                 :         } else {
     913                 :             // The Tenant shard we looked up at connection start does not hold this particular
     914                 :             // key: look for other shards in this tenant.  This scenario occurs if a pageserver
     915                 :             // has multiple shards for the same tenant.
     916                 :             //
     917                 :             // TODO: optimize this (https://github.com/neondatabase/neon/pull/6037)
     918 UBC           0 :             let timeline = match self
     919               0 :                 .get_active_tenant_timeline(
     920               0 :                     timeline.tenant_shard_id.tenant_id,
     921               0 :                     timeline.timeline_id,
     922               0 :                     ShardSelector::Page(key),
     923               0 :                 )
     924               0 :                 .await
     925                 :             {
     926               0 :                 Ok(t) => t,
     927                 :                 Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
     928                 :                     // We already know this tenant exists in general, because we resolved it at
     929                 :                     // start of connection.  Getting a NotFound here indicates that the shard containing
     930                 :                     // the requested page is not present on this node: the client's knowledge of shard->pageserver
     931                 :                     // mapping is out of date.
     932               0 :                     tracing::info!("Page request routed to wrong shard: my identity {:?}, should go to shard {}, key {}",
     933               0 :                         timeline.get_shard_identity(), timeline.get_shard_identity().get_shard_number(&key).0, key);
     934                 :                     // Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
     935                 :                     // client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
     936                 :                     // and talk to a different pageserver.
     937               0 :                     return Err(PageStreamError::Reconnect(
     938               0 :                         "getpage@lsn request routed to wrong shard".into(),
     939               0 :                     ));
     940                 :                 }
     941               0 :                 Err(e) => return Err(e.into()),
     942                 :             };
     943                 : 
     944                 :             // Take a GateGuard for the duration of this request.  If we were using our main Timeline object,
     945                 :             // the GateGuard was already held over the whole connection.
     946               0 :             let _timeline_guard = timeline
     947               0 :                 .gate
     948               0 :                 .enter()
     949               0 :                 .map_err(|_| PageStreamError::Shutdown)?;
     950                 : 
     951               0 :             self.do_handle_get_page_at_lsn_request(&timeline, req, ctx)
     952               0 :                 .await
     953                 :         }
     954 CBC     3582999 :     }
     955                 : 
     956                 :     #[allow(clippy::too_many_arguments)]
     957             185 :     #[instrument(skip_all, fields(?lsn, ?prev_lsn, %full_backup))]
     958                 :     async fn handle_basebackup_request<IO>(
     959                 :         &mut self,
     960                 :         pgb: &mut PostgresBackend<IO>,
     961                 :         tenant_id: TenantId,
     962                 :         timeline_id: TimelineId,
     963                 :         lsn: Option<Lsn>,
     964                 :         prev_lsn: Option<Lsn>,
     965                 :         full_backup: bool,
     966                 :         gzip: bool,
     967                 :         ctx: RequestContext,
     968                 :     ) -> anyhow::Result<()>
     969                 :     where
     970                 :         IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
     971                 :     {
     972                 :         debug_assert_current_span_has_tenant_and_timeline_id();
     973                 : 
     974                 :         let started = std::time::Instant::now();
     975                 : 
     976                 :         // check that the timeline exists
     977                 :         let timeline = self
     978                 :             .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero)
     979                 :             .await?;
     980                 :         let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
     981                 :         if let Some(lsn) = lsn {
     982                 :             // Backup was requested at a particular LSN. Wait for it to arrive.
     983             185 :             info!("waiting for {}", lsn);
     984                 :             timeline.wait_lsn(lsn, &ctx).await?;
     985                 :             timeline
     986                 :                 .check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
     987                 :                 .context("invalid basebackup lsn")?;
     988                 :         }
     989                 : 
     990                 :         let lsn_awaited_after = started.elapsed();
     991                 : 
     992                 :         // switch client to COPYOUT
     993                 :         pgb.write_message_noflush(&BeMessage::CopyOutResponse)?;
     994                 :         self.flush_cancellable(pgb, &timeline.cancel).await?;
     995                 : 
     996                 :         // Send a tarball of the latest layer on the timeline. Compress if not
     997                 :         // fullbackup. TODO Compress in that case too (tests need to be updated)
     998                 :         if full_backup {
     999                 :             let mut writer = pgb.copyout_writer();
    1000                 :             basebackup::send_basebackup_tarball(
    1001                 :                 &mut writer,
    1002                 :                 &timeline,
    1003                 :                 lsn,
    1004                 :                 prev_lsn,
    1005                 :                 full_backup,
    1006                 :                 &ctx,
    1007                 :             )
    1008                 :             .await?;
    1009                 :         } else {
    1010                 :             let mut writer = pgb.copyout_writer();
    1011                 :             if gzip {
    1012                 :                 let mut encoder = GzipEncoder::with_quality(
    1013                 :                     writer,
    1014                 :                     // NOTE using fast compression because it's on the critical path
    1015                 :                     //      for compute startup. For an empty database, we get
    1016                 :                     //      <100KB with this method. The Level::Best compression method
    1017                 :                     //      gives us <20KB, but maybe we should add basebackup caching
    1018                 :                     //      on compute shutdown first.
    1019                 :                     async_compression::Level::Fastest,
    1020                 :                 );
    1021                 :                 basebackup::send_basebackup_tarball(
    1022                 :                     &mut encoder,
    1023                 :                     &timeline,
    1024                 :                     lsn,
    1025                 :                     prev_lsn,
    1026                 :                     full_backup,
    1027                 :                     &ctx,
    1028                 :                 )
    1029                 :                 .await?;
    1030                 :                 // shutdown the encoder to ensure the gzip footer is written
    1031                 :                 encoder.shutdown().await?;
    1032                 :             } else {
    1033                 :                 basebackup::send_basebackup_tarball(
    1034                 :                     &mut writer,
    1035                 :                     &timeline,
    1036                 :                     lsn,
    1037                 :                     prev_lsn,
    1038                 :                     full_backup,
    1039                 :                     &ctx,
    1040                 :                 )
    1041                 :                 .await?;
    1042                 :             }
    1043                 :         }
    1044                 : 
    1045                 :         pgb.write_message_noflush(&BeMessage::CopyDone)?;
    1046                 :         self.flush_cancellable(pgb, &timeline.cancel).await?;
    1047                 : 
    1048                 :         let basebackup_after = started
    1049                 :             .elapsed()
    1050                 :             .checked_sub(lsn_awaited_after)
    1051                 :             .unwrap_or(Duration::ZERO);
    1052                 : 
    1053             557 :         info!(
    1054             557 :             lsn_await_millis = lsn_awaited_after.as_millis(),
    1055             557 :             basebackup_millis = basebackup_after.as_millis(),
    1056             557 :             "basebackup complete"
    1057             557 :         );
    1058                 : 
    1059                 :         Ok(())
    1060                 :     }
    1061                 : 
    1062                 :     // when accessing management api supply None as an argument
    1063                 :     // when using to authorize tenant pass corresponding tenant id
    1064            8069 :     fn check_permission(&self, tenant_id: Option<TenantId>) -> Result<(), QueryError> {
    1065            8069 :         if self.auth.is_none() {
    1066                 :             // auth is set to Trust, nothing to check so just return ok
    1067            7989 :             return Ok(());
    1068              80 :         }
    1069              80 :         // auth is some, just checked above, when auth is some
    1070              80 :         // then claims are always present because of checks during connection init
    1071              80 :         // so this expect won't trigger
    1072              80 :         let claims = self
    1073              80 :             .claims
    1074              80 :             .as_ref()
    1075              80 :             .expect("claims presence already checked");
    1076              80 :         check_permission(claims, tenant_id).map_err(|e| QueryError::Unauthorized(e.0))
    1077            8069 :     }
    1078                 : 
    1079                 :     /// Shorthand for getting a reference to a Timeline of an Active tenant.
    1080             576 :     async fn get_active_tenant_timeline(
    1081             576 :         &self,
    1082             576 :         tenant_id: TenantId,
    1083             576 :         timeline_id: TimelineId,
    1084             576 :         selector: ShardSelector,
    1085             576 :     ) -> Result<Arc<Timeline>, GetActiveTimelineError> {
    1086             576 :         let tenant = get_active_tenant_with_timeout(
    1087             576 :             tenant_id,
    1088             576 :             selector,
    1089             576 :             ACTIVE_TENANT_TIMEOUT,
    1090             576 :             &task_mgr::shutdown_token(),
    1091             576 :         )
    1092               3 :         .await
    1093             576 :         .map_err(GetActiveTimelineError::Tenant)?;
    1094             576 :         let timeline = tenant.get_timeline(timeline_id, true)?;
    1095             575 :         Ok(timeline)
    1096             576 :     }
    1097                 : }
    1098                 : 
    1099                 : #[async_trait::async_trait]
    1100                 : impl<IO> postgres_backend::Handler<IO> for PageServerHandler
    1101                 : where
    1102                 :     IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
    1103                 : {
    1104              82 :     fn check_auth_jwt(
    1105              82 :         &mut self,
    1106              82 :         _pgb: &mut PostgresBackend<IO>,
    1107              82 :         jwt_response: &[u8],
    1108              82 :     ) -> Result<(), QueryError> {
    1109                 :         // this unwrap is never triggered, because check_auth_jwt only called when auth_type is NeonJWT
    1110                 :         // which requires auth to be present
    1111              82 :         let data = self
    1112              82 :             .auth
    1113              82 :             .as_ref()
    1114              82 :             .unwrap()
    1115              82 :             .decode(str::from_utf8(jwt_response).context("jwt response is not UTF-8")?)
    1116              82 :             .map_err(|e| QueryError::Unauthorized(e.0))?;
    1117                 : 
    1118              82 :         if matches!(data.claims.scope, Scope::Tenant) && data.claims.tenant_id.is_none() {
    1119 UBC           0 :             return Err(QueryError::Unauthorized(
    1120               0 :                 "jwt token scope is Tenant, but tenant id is missing".into(),
    1121               0 :             ));
    1122 CBC          82 :         }
    1123              82 : 
    1124              82 :         debug!(
    1125 UBC           0 :             "jwt scope check succeeded for scope: {:#?} by tenant id: {:?}",
    1126               0 :             data.claims.scope, data.claims.tenant_id,
    1127               0 :         );
    1128                 : 
    1129 CBC          82 :         self.claims = Some(data.claims);
    1130              82 :         Ok(())
    1131              82 :     }
    1132                 : 
    1133            8081 :     fn startup(
    1134            8081 :         &mut self,
    1135            8081 :         _pgb: &mut PostgresBackend<IO>,
    1136            8081 :         _sm: &FeStartupPacket,
    1137            8081 :     ) -> Result<(), QueryError> {
    1138            8081 :         Ok(())
    1139            8081 :     }
    1140                 : 
    1141 UBC           0 :     #[instrument(skip_all, fields(tenant_id, timeline_id))]
    1142                 :     async fn process_query(
    1143                 :         &mut self,
    1144                 :         pgb: &mut PostgresBackend<IO>,
    1145                 :         query_string: &str,
    1146 CBC        8096 :     ) -> Result<(), QueryError> {
    1147            8096 :         fail::fail_point!("simulated-bad-compute-connection", |_| {
    1148 GBC           7 :             info!("Hit failpoint for bad connection");
    1149               7 :             Err(QueryError::SimulatedConnectionError)
    1150 CBC        8096 :         });
    1151                 : 
    1152            8089 :         let ctx = self.connection_ctx.attached_child();
    1153            8089 :         debug!("process query {query_string:?}");
    1154            8089 :         if query_string.starts_with("pagestream ") {
    1155            7475 :             let (_, params_raw) = query_string.split_at("pagestream ".len());
    1156            7475 :             let params = params_raw.split(' ').collect::<Vec<_>>();
    1157            7475 :             if params.len() != 2 {
    1158 UBC           0 :                 return Err(QueryError::Other(anyhow::anyhow!(
    1159               0 :                     "invalid param number for pagestream command"
    1160               0 :                 )));
    1161 CBC        7475 :             }
    1162            7475 :             let tenant_id = TenantId::from_str(params[0])
    1163            7475 :                 .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
    1164            7475 :             let timeline_id = TimelineId::from_str(params[1])
    1165            7475 :                 .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
    1166                 : 
    1167            7475 :             tracing::Span::current()
    1168            7475 :                 .record("tenant_id", field::display(tenant_id))
    1169            7475 :                 .record("timeline_id", field::display(timeline_id));
    1170            7475 : 
    1171            7475 :             self.check_permission(Some(tenant_id))?;
    1172                 : 
    1173            7475 :             self.handle_pagerequests(pgb, tenant_id, timeline_id, ctx)
    1174         4989978 :                 .await?;
    1175             614 :         } else if query_string.starts_with("basebackup ") {
    1176             550 :             let (_, params_raw) = query_string.split_at("basebackup ".len());
    1177             550 :             let params = params_raw.split_whitespace().collect::<Vec<_>>();
    1178             550 : 
    1179             550 :             if params.len() < 2 {
    1180 UBC           0 :                 return Err(QueryError::Other(anyhow::anyhow!(
    1181               0 :                     "invalid param number for basebackup command"
    1182               0 :                 )));
    1183 CBC         550 :             }
    1184                 : 
    1185             550 :             let tenant_id = TenantId::from_str(params[0])
    1186             550 :                 .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
    1187             550 :             let timeline_id = TimelineId::from_str(params[1])
    1188             550 :                 .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
    1189                 : 
    1190             550 :             tracing::Span::current()
    1191             550 :                 .record("tenant_id", field::display(tenant_id))
    1192             550 :                 .record("timeline_id", field::display(timeline_id));
    1193             550 : 
    1194             550 :             self.check_permission(Some(tenant_id))?;
    1195                 : 
    1196             550 :             let lsn = if params.len() >= 3 {
    1197                 :                 Some(
    1198             170 :                     Lsn::from_str(params[2])
    1199             170 :                         .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?,
    1200                 :                 )
    1201                 :             } else {
    1202             380 :                 None
    1203                 :             };
    1204                 : 
    1205             550 :             let gzip = if params.len() >= 4 {
    1206             170 :                 if params[3] == "--gzip" {
    1207             170 :                     true
    1208                 :                 } else {
    1209 UBC           0 :                     return Err(QueryError::Other(anyhow::anyhow!(
    1210               0 :                         "Parameter in position 3 unknown {}",
    1211               0 :                         params[3],
    1212               0 :                     )));
    1213                 :                 }
    1214                 :             } else {
    1215 CBC         380 :                 false
    1216                 :             };
    1217                 : 
    1218             550 :             ::metrics::metric_vec_duration::observe_async_block_duration_by_result(
    1219             550 :                 &*metrics::BASEBACKUP_QUERY_TIME,
    1220             550 :                 async move {
    1221             550 :                     self.handle_basebackup_request(
    1222             550 :                         pgb,
    1223             550 :                         tenant_id,
    1224             550 :                         timeline_id,
    1225             550 :                         lsn,
    1226             550 :                         None,
    1227             550 :                         false,
    1228             550 :                         gzip,
    1229             550 :                         ctx,
    1230             550 :                     )
    1231           28234 :                     .await?;
    1232             542 :                     pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
    1233             542 :                     anyhow::Ok(())
    1234             550 :                 },
    1235             550 :             )
    1236           28234 :             .await?;
    1237                 :         }
    1238                 :         // return pair of prev_lsn and last_lsn
    1239              64 :         else if query_string.starts_with("get_last_record_rlsn ") {
    1240              11 :             let (_, params_raw) = query_string.split_at("get_last_record_rlsn ".len());
    1241              11 :             let params = params_raw.split_whitespace().collect::<Vec<_>>();
    1242              11 : 
    1243              11 :             if params.len() != 2 {
    1244 UBC           0 :                 return Err(QueryError::Other(anyhow::anyhow!(
    1245               0 :                     "invalid param number for get_last_record_rlsn command"
    1246               0 :                 )));
    1247 CBC          11 :             }
    1248                 : 
    1249              11 :             let tenant_id = TenantId::from_str(params[0])
    1250              11 :                 .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
    1251              11 :             let timeline_id = TimelineId::from_str(params[1])
    1252              11 :                 .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
    1253                 : 
    1254              11 :             tracing::Span::current()
    1255              11 :                 .record("tenant_id", field::display(tenant_id))
    1256              11 :                 .record("timeline_id", field::display(timeline_id));
    1257              11 : 
    1258              11 :             self.check_permission(Some(tenant_id))?;
    1259               9 :             let timeline = self
    1260               9 :                 .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero)
    1261 UBC           0 :                 .await?;
    1262                 : 
    1263 CBC           9 :             let end_of_timeline = timeline.get_last_record_rlsn();
    1264               9 : 
    1265               9 :             pgb.write_message_noflush(&BeMessage::RowDescription(&[
    1266               9 :                 RowDescriptor::text_col(b"prev_lsn"),
    1267               9 :                 RowDescriptor::text_col(b"last_lsn"),
    1268               9 :             ]))?
    1269               9 :             .write_message_noflush(&BeMessage::DataRow(&[
    1270               9 :                 Some(end_of_timeline.prev.to_string().as_bytes()),
    1271               9 :                 Some(end_of_timeline.last.to_string().as_bytes()),
    1272               9 :             ]))?
    1273               9 :             .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
    1274                 :         }
    1275                 :         // same as basebackup, but result includes relational data as well
    1276              53 :         else if query_string.starts_with("fullbackup ") {
    1277              15 :             let (_, params_raw) = query_string.split_at("fullbackup ".len());
    1278              15 :             let params = params_raw.split_whitespace().collect::<Vec<_>>();
    1279              15 : 
    1280              15 :             if params.len() < 2 {
    1281 UBC           0 :                 return Err(QueryError::Other(anyhow::anyhow!(
    1282               0 :                     "invalid param number for fullbackup command"
    1283               0 :                 )));
    1284 CBC          15 :             }
    1285                 : 
    1286              15 :             let tenant_id = TenantId::from_str(params[0])
    1287              15 :                 .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
    1288              15 :             let timeline_id = TimelineId::from_str(params[1])
    1289              15 :                 .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
    1290                 : 
    1291              15 :             tracing::Span::current()
    1292              15 :                 .record("tenant_id", field::display(tenant_id))
    1293              15 :                 .record("timeline_id", field::display(timeline_id));
    1294                 : 
    1295                 :             // The caller is responsible for providing correct lsn and prev_lsn.
    1296              15 :             let lsn = if params.len() > 2 {
    1297                 :                 Some(
    1298              15 :                     Lsn::from_str(params[2])
    1299              15 :                         .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?,
    1300                 :                 )
    1301                 :             } else {
    1302 UBC           0 :                 None
    1303                 :             };
    1304 CBC          15 :             let prev_lsn = if params.len() > 3 {
    1305                 :                 Some(
    1306              12 :                     Lsn::from_str(params[3])
    1307              12 :                         .with_context(|| format!("Failed to parse Lsn from {}", params[3]))?,
    1308                 :                 )
    1309                 :             } else {
    1310               3 :                 None
    1311                 :             };
    1312                 : 
    1313              15 :             self.check_permission(Some(tenant_id))?;
    1314                 : 
    1315                 :             // Check that the timeline exists
    1316              15 :             self.handle_basebackup_request(
    1317              15 :                 pgb,
    1318              15 :                 tenant_id,
    1319              15 :                 timeline_id,
    1320              15 :                 lsn,
    1321              15 :                 prev_lsn,
    1322              15 :                 true,
    1323              15 :                 false,
    1324              15 :                 ctx,
    1325              15 :             )
    1326            7559 :             .await?;
    1327              15 :             pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
    1328              38 :         } else if query_string.starts_with("import basebackup ") {
    1329                 :             // Import the `base` section (everything but the wal) of a basebackup.
    1330                 :             // Assumes the tenant already exists on this pageserver.
    1331                 :             //
    1332                 :             // Files are scheduled to be persisted to remote storage, and the
    1333                 :             // caller should poll the http api to check when that is done.
    1334                 :             //
    1335                 :             // Example import command:
    1336                 :             // 1. Get start/end LSN from backup_manifest file
    1337                 :             // 2. Run:
    1338                 :             // cat my_backup/base.tar | psql -h $PAGESERVER \
    1339                 :             //     -c "import basebackup $TENANT $TIMELINE $START_LSN $END_LSN $PG_VERSION"
    1340              11 :             let (_, params_raw) = query_string.split_at("import basebackup ".len());
    1341              11 :             let params = params_raw.split_whitespace().collect::<Vec<_>>();
    1342              11 :             if params.len() != 5 {
    1343 UBC           0 :                 return Err(QueryError::Other(anyhow::anyhow!(
    1344               0 :                     "invalid param number for import basebackup command"
    1345               0 :                 )));
    1346 CBC          11 :             }
    1347              11 :             let tenant_id = TenantId::from_str(params[0])
    1348              11 :                 .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
    1349              11 :             let timeline_id = TimelineId::from_str(params[1])
    1350              11 :                 .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
    1351              11 :             let base_lsn = Lsn::from_str(params[2])
    1352              11 :                 .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?;
    1353              11 :             let end_lsn = Lsn::from_str(params[3])
    1354              11 :                 .with_context(|| format!("Failed to parse Lsn from {}", params[3]))?;
    1355              11 :             let pg_version = u32::from_str(params[4])
    1356              11 :                 .with_context(|| format!("Failed to parse pg_version from {}", params[4]))?;
    1357                 : 
    1358              11 :             tracing::Span::current()
    1359              11 :                 .record("tenant_id", field::display(tenant_id))
    1360              11 :                 .record("timeline_id", field::display(timeline_id));
    1361              11 : 
    1362              11 :             self.check_permission(Some(tenant_id))?;
    1363                 : 
    1364              11 :             match self
    1365              11 :                 .handle_import_basebackup(
    1366              11 :                     pgb,
    1367              11 :                     tenant_id,
    1368              11 :                     timeline_id,
    1369              11 :                     base_lsn,
    1370              11 :                     end_lsn,
    1371              11 :                     pg_version,
    1372              11 :                     ctx,
    1373              11 :                 )
    1374           10177 :                 .await
    1375                 :             {
    1376               9 :                 Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
    1377               2 :                 Err(e) => {
    1378               2 :                     error!("error importing base backup between {base_lsn} and {end_lsn}: {e:?}");
    1379               2 :                     pgb.write_message_noflush(&BeMessage::ErrorResponse(
    1380               2 :                         &e.to_string(),
    1381               2 :                         Some(e.pg_error_code()),
    1382               2 :                     ))?
    1383                 :                 }
    1384                 :             };
    1385              27 :         } else if query_string.starts_with("import wal ") {
    1386                 :             // Import the `pg_wal` section of a basebackup.
    1387                 :             //
    1388                 :             // Files are scheduled to be persisted to remote storage, and the
    1389                 :             // caller should poll the http api to check when that is done.
    1390               2 :             let (_, params_raw) = query_string.split_at("import wal ".len());
    1391               2 :             let params = params_raw.split_whitespace().collect::<Vec<_>>();
    1392               2 :             if params.len() != 4 {
    1393 UBC           0 :                 return Err(QueryError::Other(anyhow::anyhow!(
    1394               0 :                     "invalid param number for import wal command"
    1395               0 :                 )));
    1396 CBC           2 :             }
    1397               2 :             let tenant_id = TenantId::from_str(params[0])
    1398               2 :                 .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
    1399               2 :             let timeline_id = TimelineId::from_str(params[1])
    1400               2 :                 .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
    1401               2 :             let start_lsn = Lsn::from_str(params[2])
    1402               2 :                 .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?;
    1403               2 :             let end_lsn = Lsn::from_str(params[3])
    1404               2 :                 .with_context(|| format!("Failed to parse Lsn from {}", params[3]))?;
    1405                 : 
    1406               2 :             tracing::Span::current()
    1407               2 :                 .record("tenant_id", field::display(tenant_id))
    1408               2 :                 .record("timeline_id", field::display(timeline_id));
    1409               2 : 
    1410               2 :             self.check_permission(Some(tenant_id))?;
    1411                 : 
    1412               2 :             match self
    1413               2 :                 .handle_import_wal(pgb, tenant_id, timeline_id, start_lsn, end_lsn, ctx)
    1414            3781 :                 .await
    1415                 :             {
    1416               2 :                 Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
    1417 UBC           0 :                 Err(e) => {
    1418               0 :                     error!("error importing WAL between {start_lsn} and {end_lsn}: {e:?}");
    1419               0 :                     pgb.write_message_noflush(&BeMessage::ErrorResponse(
    1420               0 :                         &e.to_string(),
    1421               0 :                         Some(e.pg_error_code()),
    1422               0 :                     ))?
    1423                 :                 }
    1424                 :             };
    1425 CBC          25 :         } else if query_string.to_ascii_lowercase().starts_with("set ") {
    1426                 :             // important because psycopg2 executes "SET datestyle TO 'ISO'"
    1427                 :             // on connect
    1428              20 :             pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
    1429               5 :         } else if query_string.starts_with("show ") {
    1430                 :             // show <tenant_id>
    1431               5 :             let (_, params_raw) = query_string.split_at("show ".len());
    1432               5 :             let params = params_raw.split(' ').collect::<Vec<_>>();
    1433               5 :             if params.len() != 1 {
    1434 UBC           0 :                 return Err(QueryError::Other(anyhow::anyhow!(
    1435               0 :                     "invalid param number for config command"
    1436               0 :                 )));
    1437 CBC           5 :             }
    1438               5 :             let tenant_id = TenantId::from_str(params[0])
    1439               5 :                 .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
    1440                 : 
    1441               5 :             tracing::Span::current().record("tenant_id", field::display(tenant_id));
    1442               5 : 
    1443               5 :             self.check_permission(Some(tenant_id))?;
    1444                 : 
    1445               5 :             let tenant = get_active_tenant_with_timeout(
    1446               5 :                 tenant_id,
    1447               5 :                 ShardSelector::Zero,
    1448               5 :                 ACTIVE_TENANT_TIMEOUT,
    1449               5 :                 &task_mgr::shutdown_token(),
    1450               5 :             )
    1451 UBC           0 :             .await?;
    1452 CBC           5 :             pgb.write_message_noflush(&BeMessage::RowDescription(&[
    1453               5 :                 RowDescriptor::int8_col(b"checkpoint_distance"),
    1454               5 :                 RowDescriptor::int8_col(b"checkpoint_timeout"),
    1455               5 :                 RowDescriptor::int8_col(b"compaction_target_size"),
    1456               5 :                 RowDescriptor::int8_col(b"compaction_period"),
    1457               5 :                 RowDescriptor::int8_col(b"compaction_threshold"),
    1458               5 :                 RowDescriptor::int8_col(b"gc_horizon"),
    1459               5 :                 RowDescriptor::int8_col(b"gc_period"),
    1460               5 :                 RowDescriptor::int8_col(b"image_creation_threshold"),
    1461               5 :                 RowDescriptor::int8_col(b"pitr_interval"),
    1462               5 :             ]))?
    1463               5 :             .write_message_noflush(&BeMessage::DataRow(&[
    1464               5 :                 Some(tenant.get_checkpoint_distance().to_string().as_bytes()),
    1465               5 :                 Some(
    1466               5 :                     tenant
    1467               5 :                         .get_checkpoint_timeout()
    1468               5 :                         .as_secs()
    1469               5 :                         .to_string()
    1470               5 :                         .as_bytes(),
    1471               5 :                 ),
    1472               5 :                 Some(tenant.get_compaction_target_size().to_string().as_bytes()),
    1473               5 :                 Some(
    1474               5 :                     tenant
    1475               5 :                         .get_compaction_period()
    1476               5 :                         .as_secs()
    1477               5 :                         .to_string()
    1478               5 :                         .as_bytes(),
    1479               5 :                 ),
    1480               5 :                 Some(tenant.get_compaction_threshold().to_string().as_bytes()),
    1481               5 :                 Some(tenant.get_gc_horizon().to_string().as_bytes()),
    1482               5 :                 Some(tenant.get_gc_period().as_secs().to_string().as_bytes()),
    1483               5 :                 Some(tenant.get_image_creation_threshold().to_string().as_bytes()),
    1484               5 :                 Some(tenant.get_pitr_interval().as_secs().to_string().as_bytes()),
    1485               5 :             ]))?
    1486               5 :             .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
    1487                 :         } else {
    1488 UBC           0 :             return Err(QueryError::Other(anyhow::anyhow!(
    1489               0 :                 "unknown command {query_string}"
    1490               0 :             )));
    1491                 :         }
    1492                 : 
    1493 CBC        7294 :         Ok(())
    1494           16140 :     }
    1495                 : }
    1496                 : 
    1497                 : impl From<GetActiveTenantError> for QueryError {
    1498             514 :     fn from(e: GetActiveTenantError) -> Self {
    1499 UBC           0 :         match e {
    1500               0 :             GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected(
    1501               0 :                 ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
    1502               0 :             ),
    1503                 :             GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => {
    1504               0 :                 QueryError::Shutdown
    1505                 :             }
    1506 CBC         514 :             e => QueryError::Other(anyhow::anyhow!(e)),
    1507                 :         }
    1508             514 :     }
    1509                 : }
    1510                 : 
    /// Error type that combines a tenant-level failure with a timeline-level failure.
    ///
    /// Both variants are `#[error(transparent)]`, so display and source are delegated to
    /// the wrapped error. Where this type is produced is outside this excerpt.
    #[derive(Debug, thiserror::Error)]
    enum GetActiveTimelineError {
        /// A tenant-level failure. There is no `#[from]` on this variant, so it has to be
        /// constructed explicitly from a `GetActiveTenantError`.
        #[error(transparent)]
        Tenant(GetActiveTenantError),
        /// A timeline-level failure. `#[from]` generates `From<GetTimelineError>`, so `?`
        /// can convert a `GetTimelineError` into this variant.
        #[error(transparent)]
        Timeline(#[from] GetTimelineError),
    }
    1518                 : 
    1519                 : impl From<GetActiveTimelineError> for QueryError {
    1520 UBC           0 :     fn from(e: GetActiveTimelineError) -> Self {
    1521               0 :         match e {
    1522               0 :             GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => QueryError::Shutdown,
    1523               0 :             GetActiveTimelineError::Tenant(e) => e.into(),
    1524               0 :             GetActiveTimelineError::Timeline(e) => QueryError::NotFound(format!("{e}").into()),
    1525                 :         }
    1526               0 :     }
    1527                 : }
        

Generated by: LCOV version 2.1-beta