LCOV - code coverage report
Current view: top level - pageserver/src - page_service.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 87.0 % 585 509
Test Date: 2023-09-06 10:18:01 Functions: 39.8 % 176 70

            Line data    Source code
       1              : //
       2              : //! The Page Service listens for client connections and serves their GetPage@LSN
       3              : //! requests.
       4              : //
       5              : //   It is possible to connect here using usual psql/pgbench/libpq. Following
       6              : // commands are supported now:
       7              : //     *status* -- show actual info about this pageserver,
       8              : //     *pagestream* -- enter mode where smgr and pageserver talk with their
       9              : //  custom protocol.
      10              : //
      11              : 
      12              : use anyhow::Context;
      13              : use async_compression::tokio::write::GzipEncoder;
      14              : use bytes::Buf;
      15              : use bytes::Bytes;
      16              : use futures::Stream;
      17              : use pageserver_api::models::TenantState;
      18              : use pageserver_api::models::{
      19              :     PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
      20              :     PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
      21              :     PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
      22              :     PagestreamNblocksRequest, PagestreamNblocksResponse,
      23              : };
      24              : use postgres_backend::{self, is_expected_io_error, AuthType, PostgresBackend, QueryError};
      25              : use pq_proto::framed::ConnectionError;
      26              : use pq_proto::FeStartupPacket;
      27              : use pq_proto::{BeMessage, FeMessage, RowDescriptor};
      28              : use std::io;
      29              : use std::net::TcpListener;
      30              : use std::pin::pin;
      31              : use std::str;
      32              : use std::str::FromStr;
      33              : use std::sync::Arc;
      34              : use std::time::Duration;
      35              : use tokio::io::AsyncWriteExt;
      36              : use tokio::io::{AsyncRead, AsyncWrite};
      37              : use tokio_util::io::StreamReader;
      38              : use tracing::field;
      39              : use tracing::*;
      40              : use utils::id::ConnectionId;
      41              : use utils::{
      42              :     auth::{Claims, JwtAuth, Scope},
      43              :     id::{TenantId, TimelineId},
      44              :     lsn::Lsn,
      45              :     simple_rcu::RcuReadGuard,
      46              : };
      47              : 
      48              : use crate::auth::check_permission;
      49              : use crate::basebackup;
      50              : use crate::config::PageServerConf;
      51              : use crate::context::{DownloadBehavior, RequestContext};
      52              : use crate::import_datadir::import_wal_from_tar;
      53              : use crate::metrics;
      54              : use crate::metrics::LIVE_CONNECTIONS_COUNT;
      55              : use crate::task_mgr;
      56              : use crate::task_mgr::TaskKind;
      57              : use crate::tenant;
      58              : use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
      59              : use crate::tenant::mgr;
      60              : use crate::tenant::mgr::GetTenantError;
      61              : use crate::tenant::{Tenant, Timeline};
      62              : use crate::trace::Tracer;
      63              : 
      64              : use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
      65              : use postgres_ffi::BLCKSZ;
      66              : 
      67            7 : fn copyin_stream<IO>(pgb: &mut PostgresBackend<IO>) -> impl Stream<Item = io::Result<Bytes>> + '_
      68            7 : where
      69            7 :     IO: AsyncRead + AsyncWrite + Unpin,
      70            7 : {
      71            7 :     async_stream::try_stream! {
      72            7 :         loop {
      73        26355 :             let msg = tokio::select! {
      74            7 :                 biased;
      75            7 : 
      76            7 :                 _ = task_mgr::shutdown_watcher() => {
      77            7 :                     // We were requested to shut down.
      78            7 :                     let msg = "pageserver is shutting down";
      79            7 :                     let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, None));
      80            7 :                     Err(QueryError::Other(anyhow::anyhow!(msg)))
      81            7 :                 }
      82            7 : 
      83        26355 :                 msg = pgb.read_message() => { msg.map_err(QueryError::from)}
      84            7 :             };
      85            7 : 
      86        26355 :             match msg {
      87        26355 :                 Ok(Some(message)) => {
      88        26355 :                     let copy_data_bytes = match message {
      89        26342 :                         FeMessage::CopyData(bytes) => bytes,
      90            7 :                         FeMessage::CopyDone => { break },
      91            7 :                         FeMessage::Sync => continue,
      92            7 :                         FeMessage::Terminate => {
      93            7 :                             let msg = "client terminated connection with Terminate message during COPY";
      94            0 :                             let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
      95            0 :                             // error can't happen here, ErrorResponse serialization should be always ok
      96            0 :                             pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
      97            7 :                             Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
      98            7 :                             break;
      99            7 :                         }
     100            7 :                         m => {
     101            0 :                             let msg = format!("unexpected message {m:?}");
     102            0 :                             // error can't happen here, ErrorResponse serialization should be always ok
     103            0 :                             pgb.write_message_noflush(&BeMessage::ErrorResponse(&msg, None)).map_err(|e| e.into_io_error())?;
     104            7 :                             Err(io::Error::new(io::ErrorKind::Other, msg))?;
     105            7 :                             break;
     106            7 :                         }
     107            7 :                     };
     108            7 : 
     109        26342 :                     yield copy_data_bytes;
     110            7 :                 }
     111            7 :                 Ok(None) => {
     112            7 :                     let msg = "client closed connection during COPY";
     113            0 :                     let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
     114            0 :                     // error can't happen here, ErrorResponse serialization should be always ok
     115            0 :                     pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
     116            7 :                     pgb.flush().await?;
     117            7 :                     Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
     118            7 :                 }
     119            7 :                 Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
     120            0 :                     Err(io_error)?;
     121            7 :                 }
     122            7 :                 Err(other) => {
     123            0 :                     Err(io::Error::new(io::ErrorKind::Other, other.to_string()))?;
     124            7 :                 }
     125            7 :             };
     126            7 :         }
     127            7 :     }
     128            7 : }
     129              : 
     130              : /// Read the end of a tar archive.
     131              : ///
     132              : /// A tar archive normally ends with two consecutive blocks of zeros, 512 bytes each.
     133              : /// `tokio_tar` already read the first such block. Read the second all-zeros block,
     134              : /// and check that there is no more data after the EOF marker.
     135              : ///
     136              : /// XXX: Currently, any trailing data after the EOF marker prints a warning.
     137              : /// Perhaps it should be a hard error?
     138            5 : async fn read_tar_eof(mut reader: (impl AsyncRead + Unpin)) -> anyhow::Result<()> {
     139            5 :     use tokio::io::AsyncReadExt;
     140            5 :     let mut buf = [0u8; 512];
     141            5 : 
     142            5 :     // Read the all-zeros block, and verify it
     143            5 :     let mut total_bytes = 0;
     144           10 :     while total_bytes < 512 {
     145            5 :         let nbytes = reader.read(&mut buf[total_bytes..]).await?;
     146            5 :         total_bytes += nbytes;
     147            5 :         if nbytes == 0 {
     148            0 :             break;
     149            5 :         }
     150              :     }
     151            5 :     if total_bytes < 512 {
     152            0 :         anyhow::bail!("incomplete or invalid tar EOF marker");
     153            5 :     }
     154         2560 :     if !buf.iter().all(|&x| x == 0) {
     155            0 :         anyhow::bail!("invalid tar EOF marker");
     156            5 :     }
     157            5 : 
     158            5 :     // Drain any data after the EOF marker
     159            5 :     let mut trailing_bytes = 0;
     160              :     loop {
     161            6 :         let nbytes = reader.read(&mut buf).await?;
     162            6 :         trailing_bytes += nbytes;
     163            6 :         if nbytes == 0 {
     164            5 :             break;
     165            1 :         }
     166              :     }
     167            5 :     if trailing_bytes > 0 {
     168            1 :         warn!("ignored {trailing_bytes} unexpected bytes after the tar archive");
     169            4 :     }
     170            5 :     Ok(())
     171            5 : }
     172              : 
     173              : ///////////////////////////////////////////////////////////////////////////////
     174              : 
     175              : ///
     176              : /// Main loop of the page service.
     177              : ///
     178              : /// Listens for connections, and launches a new handler task for each.
     179              : ///
     180          575 : pub async fn libpq_listener_main(
     181          575 :     conf: &'static PageServerConf,
     182          575 :     broker_client: storage_broker::BrokerClientChannel,
     183          575 :     auth: Option<Arc<JwtAuth>>,
     184          575 :     listener: TcpListener,
     185          575 :     auth_type: AuthType,
     186          575 :     listener_ctx: RequestContext,
     187          575 : ) -> anyhow::Result<()> {
     188          575 :     listener.set_nonblocking(true)?;
     189          575 :     let tokio_listener = tokio::net::TcpListener::from_std(listener)?;
     190              : 
     191              :     // Wait for a new connection to arrive, or for server shutdown.
     192         5902 :     while let Some(res) = tokio::select! {
     193              :         biased;
     194              : 
     195              :         _ = task_mgr::shutdown_watcher() => {
     196              :             // We were requested to shut down.
     197              :             None
     198              :         }
     199              : 
     200         5327 :         res = tokio_listener.accept() => {
     201              :             Some(res)
     202              :         }
     203              :     } {
     204         5327 :         match res {
     205         5327 :             Ok((socket, peer_addr)) => {
     206              :                 // Connection established. Spawn a new task to handle it.
     207            0 :                 debug!("accepted connection from {}", peer_addr);
     208         5327 :                 let local_auth = auth.clone();
     209         5327 : 
     210         5327 :                 let connection_ctx = listener_ctx
     211         5327 :                     .detached_child(TaskKind::PageRequestHandler, DownloadBehavior::Download);
     212         5327 : 
     213         5327 :                 // PageRequestHandler tasks are not associated with any particular
     214         5327 :                 // timeline in the task manager. In practice most connections will
     215         5327 :                 // only deal with a particular timeline, but we don't know which one
     216         5327 :                 // yet.
     217         5327 :                 task_mgr::spawn(
     218         5327 :                     &tokio::runtime::Handle::current(),
     219         5327 :                     TaskKind::PageRequestHandler,
     220         5327 :                     None,
     221         5327 :                     None,
     222         5327 :                     "serving compute connection task",
     223         5327 :                     false,
     224         5327 :                     page_service_conn_main(
     225         5327 :                         conf,
     226         5327 :                         broker_client.clone(),
     227         5327 :                         local_auth,
     228         5327 :                         socket,
     229         5327 :                         auth_type,
     230         5327 :                         connection_ctx,
     231         5327 :                     ),
     232         5327 :                 );
     233              :             }
     234            0 :             Err(err) => {
     235              :                 // accept() failed. Log the error, and loop back to retry on next connection.
     236            0 :                 error!("accept() failed: {:?}", err);
     237              :             }
     238              :         }
     239              :     }
     240              : 
     241            0 :     debug!("page_service loop terminated");
     242              : 
     243          148 :     Ok(())
     244          148 : }
     245              : 
     246        15981 : #[instrument(skip_all, fields(peer_addr))]
     247              : async fn page_service_conn_main(
     248              :     conf: &'static PageServerConf,
     249              :     broker_client: storage_broker::BrokerClientChannel,
     250              :     auth: Option<Arc<JwtAuth>>,
     251              :     socket: tokio::net::TcpStream,
     252              :     auth_type: AuthType,
     253              :     connection_ctx: RequestContext,
     254              : ) -> anyhow::Result<()> {
     255              :     // Immediately increment the gauge, then create a job to decrement it on task exit.
     256              :     // One of the pros of `defer!` is that this will *most probably*
     257              :     // get called, even in presence of panics.
     258              :     let gauge = LIVE_CONNECTIONS_COUNT.with_label_values(&["page_service"]);
     259              :     gauge.inc();
     260         5260 :     scopeguard::defer! {
     261         5260 :         gauge.dec();
     262         5260 :     }
     263              : 
     264              :     socket
     265              :         .set_nodelay(true)
     266              :         .context("could not set TCP_NODELAY")?;
     267              : 
     268              :     let peer_addr = socket.peer_addr().context("get peer address")?;
     269              :     tracing::Span::current().record("peer_addr", field::display(peer_addr));
     270              : 
     271              :     // setup read timeout of 10 minutes. the timeout is rather arbitrary for requirements:
     272              :     // - long enough for most valid compute connections
     273              :     // - less than infinite to stop us from "leaking" connections to long-gone computes
     274              :     //
     275              :     // no write timeout is used, because the kernel is assumed to error writes after some time.
     276              :     let mut socket = tokio_io_timeout::TimeoutReader::new(socket);
     277              : 
     278              :     // timeout should be lower, but trying out multiple days for
     279              :     // <https://github.com/neondatabase/neon/issues/4205>
     280              :     socket.set_timeout(Some(std::time::Duration::from_secs(60 * 60 * 24 * 3)));
     281              :     let socket = std::pin::pin!(socket);
     282              : 
     283              :     // XXX: pgbackend.run() should take the connection_ctx,
     284              :     // and create a child per-query context when it invokes process_query.
     285              :     // But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
     286              :     // and create the per-query context in process_query ourselves.
     287              :     let mut conn_handler = PageServerHandler::new(conf, broker_client, auth, connection_ctx);
     288              :     let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
     289              : 
     290              :     match pgbackend
     291              :         .run(&mut conn_handler, task_mgr::shutdown_watcher)
     292              :         .await
     293              :     {
     294              :         Ok(()) => {
     295              :             // we've been requested to shut down
     296              :             Ok(())
     297              :         }
     298              :         Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
     299              :             if is_expected_io_error(&io_error) {
     300            7 :                 info!("Postgres client disconnected ({io_error})");
     301              :                 Ok(())
     302              :             } else {
     303              :                 Err(io_error).context("Postgres connection error")
     304              :             }
     305              :         }
     306              :         other => other.context("Postgres query error"),
     307              :     }
     308              : }
     309              : 
     310              : struct PageServerHandler {
     311              :     _conf: &'static PageServerConf,
     312              :     broker_client: storage_broker::BrokerClientChannel,
     313              :     auth: Option<Arc<JwtAuth>>,
     314              :     claims: Option<Claims>,
     315              : 
     316              :     /// The context created for the lifetime of the connection
     317              :     /// services by this PageServerHandler.
     318              :     /// For each query received over the connection,
     319              :     /// `process_query` creates a child context from this one.
     320              :     connection_ctx: RequestContext,
     321              : }
     322              : 
     323              : impl PageServerHandler {
     324         5327 :     pub fn new(
     325         5327 :         conf: &'static PageServerConf,
     326         5327 :         broker_client: storage_broker::BrokerClientChannel,
     327         5327 :         auth: Option<Arc<JwtAuth>>,
     328         5327 :         connection_ctx: RequestContext,
     329         5327 :     ) -> Self {
     330         5327 :         PageServerHandler {
     331         5327 :             _conf: conf,
     332         5327 :             broker_client,
     333         5327 :             auth,
     334         5327 :             claims: None,
     335         5327 :             connection_ctx,
     336         5327 :         }
     337         5327 :     }
     338              : 
     339        22972 :     #[instrument(skip_all)]
     340              :     async fn handle_pagerequests<IO>(
     341              :         &self,
     342              :         pgb: &mut PostgresBackend<IO>,
     343              :         tenant_id: TenantId,
     344              :         timeline_id: TimelineId,
     345              :         ctx: RequestContext,
     346              :     ) -> Result<(), QueryError>
     347              :     where
     348              :         IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
     349              :     {
     350              :         debug_assert_current_span_has_tenant_and_timeline_id();
     351              : 
     352              :         // NOTE: pagerequests handler exits when connection is closed,
     353              :         //       so there is no need to reset the association
     354              :         task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
     355              : 
     356              :         // Make request tracer if needed
     357              :         let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
     358              :         let mut tracer = if tenant.get_trace_read_requests() {
     359              :             let connection_id = ConnectionId::generate();
     360              :             let path = tenant
     361              :                 .conf
     362              :                 .trace_path(&tenant_id, &timeline_id, &connection_id);
     363              :             Some(Tracer::new(path))
     364              :         } else {
     365              :             None
     366              :         };
     367              : 
     368              :         // Check that the timeline exists
     369              :         let timeline = tenant
     370              :             .get_timeline(timeline_id, true)
     371            0 :             .map_err(|e| anyhow::anyhow!(e))?;
     372              : 
     373              :         // switch client to COPYBOTH
     374              :         pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
     375              :         pgb.flush().await?;
     376              : 
     377              :         let metrics = metrics::SmgrQueryTimePerTimeline::new(&tenant_id, &timeline_id);
     378              : 
     379              :         loop {
     380      8851539 :             let msg = tokio::select! {
     381              :                 biased;
     382              : 
     383              :                 _ = task_mgr::shutdown_watcher() => {
     384              :                     // We were requested to shut down.
     385           96 :                     info!("shutdown request received in page handler");
     386              :                     break;
     387              :                 }
     388              : 
     389      4604004 :                 msg = pgb.read_message() => { msg }
     390              :             };
     391              : 
     392              :             let copy_data_bytes = match msg? {
     393              :                 Some(FeMessage::CopyData(bytes)) => bytes,
     394              :                 Some(FeMessage::Terminate) => break,
     395              :                 Some(m) => {
     396              :                     return Err(QueryError::Other(anyhow::anyhow!(
     397              :                         "unexpected message: {m:?} during COPY"
     398              :                     )));
     399              :                 }
     400              :                 None => break, // client disconnected
     401              :             };
     402              : 
     403            0 :             trace!("query: {copy_data_bytes:?}");
     404              : 
     405              :             // Trace request if needed
     406              :             if let Some(t) = tracer.as_mut() {
     407              :                 t.trace(&copy_data_bytes)
     408              :             }
     409              : 
     410              :             let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
     411              : 
     412              :             // TODO: We could create a new per-request context here, with unique ID.
     413              :             // Currently we use the same per-timeline context for all requests
     414              : 
     415              :             let response = match neon_fe_msg {
     416              :                 PagestreamFeMessage::Exists(req) => {
     417              :                     let _timer = metrics.start_timer(metrics::SmgrQueryType::GetRelExists);
     418              :                     self.handle_get_rel_exists_request(&timeline, &req, &ctx)
     419              :                         .await
     420              :                 }
     421              :                 PagestreamFeMessage::Nblocks(req) => {
     422              :                     let _timer = metrics.start_timer(metrics::SmgrQueryType::GetRelSize);
     423              :                     self.handle_get_nblocks_request(&timeline, &req, &ctx).await
     424              :                 }
     425              :                 PagestreamFeMessage::GetPage(req) => {
     426              :                     let _timer = metrics.start_timer(metrics::SmgrQueryType::GetPageAtLsn);
     427              :                     self.handle_get_page_at_lsn_request(&timeline, &req, &ctx)
     428              :                         .await
     429              :                 }
     430              :                 PagestreamFeMessage::DbSize(req) => {
     431              :                     let _timer = metrics.start_timer(metrics::SmgrQueryType::GetDbSize);
     432              :                     self.handle_db_size_request(&timeline, &req, &ctx).await
     433              :                 }
     434              :             };
     435              : 
     436            1 :             let response = response.unwrap_or_else(|e| {
     437            1 :                 // print the all details to the log with {:#}, but for the client the
     438            1 :                 // error message is enough
     439            1 :                 error!("error reading relation or page version: {:?}", e);
     440            1 :                 PagestreamBeMessage::Error(PagestreamErrorResponse {
     441            1 :                     message: e.to_string(),
     442            1 :                 })
     443            1 :             });
     444              : 
     445              :             pgb.write_message_noflush(&BeMessage::CopyData(&response.serialize()))?;
     446              :             pgb.flush().await?;
     447              :         }
     448              :         Ok(())
     449              :     }
     450              : 
     451              :     #[allow(clippy::too_many_arguments)]
     452           10 :     #[instrument(skip_all, fields(%base_lsn, end_lsn=%_end_lsn, %pg_version))]
     453              :     async fn handle_import_basebackup<IO>(
     454              :         &self,
     455              :         pgb: &mut PostgresBackend<IO>,
     456              :         tenant_id: TenantId,
     457              :         timeline_id: TimelineId,
     458              :         base_lsn: Lsn,
     459              :         _end_lsn: Lsn,
     460              :         pg_version: u32,
     461              :         ctx: RequestContext,
     462              :     ) -> Result<(), QueryError>
     463              :     where
     464              :         IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
     465              :     {
     466              :         debug_assert_current_span_has_tenant_and_timeline_id();
     467              : 
     468              :         task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
     469              :         // Create empty timeline
     470            5 :         info!("creating new timeline");
     471              :         let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
     472              :         let timeline = tenant
     473              :             .create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
     474              :             .await?;
     475              : 
     476              :         // TODO mark timeline as not ready until it reaches end_lsn.
     477              :         // We might have some wal to import as well, and we should prevent compute
     478              :         // from connecting before that and writing conflicting wal.
     479              :         //
     480              :         // This is not relevant for pageserver->pageserver migrations, since there's
     481              :         // no wal to import. But should be fixed if we want to import from postgres.
     482              : 
     483              :         // TODO leave clean state on error. For now you can use detach to clean
     484              :         // up broken state from a failed import.
     485              : 
     486              :         // Import basebackup provided via CopyData
     487            5 :         info!("importing basebackup");
     488              :         pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
     489              :         pgb.flush().await?;
     490              : 
     491              :         let mut copyin_reader = pin!(StreamReader::new(copyin_stream(pgb)));
     492              :         timeline
     493              :             .import_basebackup_from_tar(
     494              :                 &mut copyin_reader,
     495              :                 base_lsn,
     496              :                 self.broker_client.clone(),
     497              :                 &ctx,
     498              :             )
     499              :             .await?;
     500              : 
     501              :         // Read the end of the tar archive.
     502              :         read_tar_eof(copyin_reader).await?;
     503              : 
     504              :         // TODO check checksum
     505              :         // Meanwhile you can verify client-side by taking fullbackup
     506              :         // and checking that it matches in size with what was imported.
     507              :         // It wouldn't work if base came from vanilla postgres though,
     508              :         // since we discard some log files.
     509              : 
     510            3 :         info!("done");
     511              :         Ok(())
     512              :     }
     513              : 
     514            6 :     #[instrument(skip_all, fields(%start_lsn, %end_lsn))]
     515              :     async fn handle_import_wal<IO>(
     516              :         &self,
     517              :         pgb: &mut PostgresBackend<IO>,
     518              :         tenant_id: TenantId,
     519              :         timeline_id: TimelineId,
     520              :         start_lsn: Lsn,
     521              :         end_lsn: Lsn,
     522              :         ctx: RequestContext,
     523              :     ) -> Result<(), QueryError>
     524              :     where
     525              :         IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
     526              :     {
     527              :         debug_assert_current_span_has_tenant_and_timeline_id();
     528              :         task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
     529              : 
     530              :         let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?;
     531              :         let last_record_lsn = timeline.get_last_record_lsn();
     532              :         if last_record_lsn != start_lsn {
     533              :             return Err(QueryError::Other(
     534              :                 anyhow::anyhow!("Cannot import WAL from Lsn {start_lsn} because timeline does not start from the same lsn: {last_record_lsn}"))
     535              :             );
     536              :         }
     537              : 
     538              :         // TODO leave clean state on error. For now you can use detach to clean
     539              :         // up broken state from a failed import.
     540              : 
     541              :         // Import wal provided via CopyData
     542            2 :         info!("importing wal");
     543              :         pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
     544              :         pgb.flush().await?;
     545              :         let mut copyin_reader = pin!(StreamReader::new(copyin_stream(pgb)));
     546              :         import_wal_from_tar(&timeline, &mut copyin_reader, start_lsn, end_lsn, &ctx).await?;
     547            2 :         info!("wal import complete");
     548              : 
     549              :         // Read the end of the tar archive.
     550              :         read_tar_eof(copyin_reader).await?;
     551              : 
     552              :         // TODO Does it make sense to overshoot?
     553              :         if timeline.get_last_record_lsn() < end_lsn {
     554              :             return Err(QueryError::Other(
     555              :                 anyhow::anyhow!("Cannot import WAL from Lsn {start_lsn} because timeline does not start from the same lsn: {last_record_lsn}"))
     556              :             );
     557              :         }
     558              : 
     559              :         // Flush data to disk, then upload to s3. No need for a forced checkpoint.
     560              :         // We only want to persist the data, and it doesn't matter if it's in the
     561              :         // shape of deltas or images.
     562            2 :         info!("flushing layers");
     563              :         timeline.freeze_and_flush().await?;
     564              : 
     565            2 :         info!("done");
     566              :         Ok(())
     567              :     }
     568              : 
     569              :     /// Helper function to handle the LSN from client request.
     570              :     ///
     571              :     /// Each GetPage (and Exists and Nblocks) request includes information about
     572              :     /// which version of the page is being requested. The client can request the
     573              :     /// latest version of the page, or the version that's valid at a particular
     574              :     /// LSN. The primary compute node will always request the latest page
     575              :     /// version, while a standby will request a version at the LSN that it's
     576              :     /// currently caught up to.
     577              :     ///
     578              :     /// In either case, if the page server hasn't received the WAL up to the
     579              :     /// requested LSN yet, we will wait for it to arrive. The return value is
     580              :     /// the LSN that should be used to look up the page versions.
     581      4599615 :     async fn wait_or_get_last_lsn(
     582      4599615 :         timeline: &Timeline,
     583      4599615 :         mut lsn: Lsn,
     584      4599615 :         latest: bool,
     585      4599615 :         latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
     586      4599615 :         ctx: &RequestContext,
     587      4599615 :     ) -> anyhow::Result<Lsn> {
     588      4599615 :         if latest {
     589              :             // Latest page version was requested. If LSN is given, it is a hint
     590              :             // to the page server that there have been no modifications to the
     591              :             // page after that LSN. If we haven't received WAL up to that point,
     592              :             // wait until it arrives.
     593      4497372 :             let last_record_lsn = timeline.get_last_record_lsn();
     594      4497372 : 
     595      4497372 :             // Note: this covers the special case that lsn == Lsn(0). That
     596      4497372 :             // special case means "return the latest version whatever it is",
     597      4497372 :             // and it's used for bootstrapping purposes, when the page server is
     598      4497372 :             // connected directly to the compute node. That is needed because
     599      4497372 :             // when you connect to the compute node, to receive the WAL, the
     600      4497372 :             // walsender process will do a look up in the pg_authid catalog
     601      4497372 :             // table for authentication. That poses a deadlock problem: the
     602      4497372 :             // catalog table lookup will send a GetPage request, but the GetPage
     603      4497372 :             // request will block in the page server because the recent WAL
     604      4497372 :             // hasn't been received yet, and it cannot be received until the
     605      4497372 :             // walsender completes the authentication and starts streaming the
     606      4497372 :             // WAL.
     607      4497372 :             if lsn <= last_record_lsn {
     608      4372604 :                 lsn = last_record_lsn;
     609      4372604 :             } else {
     610       169751 :                 timeline.wait_lsn(lsn, ctx).await?;
     611              :                 // Since we waited for 'lsn' to arrive, that is now the last
     612              :                 // record LSN. (Or close enough for our purposes; the
     613              :                 // last-record LSN can advance immediately after we return
     614              :                 // anyway)
     615              :             }
     616              :         } else {
     617       102243 :             if lsn == Lsn(0) {
     618            1 :                 anyhow::bail!("invalid LSN(0) in request");
     619       102242 :             }
     620       102242 :             timeline.wait_lsn(lsn, ctx).await?;
     621              :         }
     622      4599614 :         anyhow::ensure!(
     623      4599614 :             lsn >= **latest_gc_cutoff_lsn,
     624            0 :             "tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
     625            0 :             lsn, **latest_gc_cutoff_lsn
     626              :         );
     627      4599614 :         Ok(lsn)
     628      4599615 :     }
     629              : 
     630       191820 :     #[instrument(skip(self, timeline, req, ctx), fields(rel = %req.rel, req_lsn = %req.lsn))]
     631              :     async fn handle_get_rel_exists_request(
     632              :         &self,
     633              :         timeline: &Timeline,
     634              :         req: &PagestreamExistsRequest,
     635              :         ctx: &RequestContext,
     636              :     ) -> anyhow::Result<PagestreamBeMessage> {
     637              :         let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
     638              :         let lsn =
     639              :             Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
     640              :                 .await?;
     641              : 
     642              :         let exists = timeline
     643              :             .get_rel_exists(req.rel, lsn, req.latest, ctx)
     644              :             .await?;
     645              : 
     646              :         Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
     647              :             exists,
     648              :         }))
     649              :     }
     650              : 
     651        72780 :     #[instrument(skip(self, timeline, req, ctx), fields(rel = %req.rel, req_lsn = %req.lsn))]
     652              :     async fn handle_get_nblocks_request(
     653              :         &self,
     654              :         timeline: &Timeline,
     655              :         req: &PagestreamNblocksRequest,
     656              :         ctx: &RequestContext,
     657              :     ) -> anyhow::Result<PagestreamBeMessage> {
     658              :         let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
     659              :         let lsn =
     660              :             Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
     661              :                 .await?;
     662              : 
     663              :         let n_blocks = timeline.get_rel_size(req.rel, lsn, req.latest, ctx).await?;
     664              : 
     665              :         Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
     666              :             n_blocks,
     667              :         }))
     668              :     }
     669              : 
     670           20 :     #[instrument(skip(self, timeline, req, ctx), fields(dbnode = %req.dbnode, req_lsn = %req.lsn))]
     671              :     async fn handle_db_size_request(
     672              :         &self,
     673              :         timeline: &Timeline,
     674              :         req: &PagestreamDbSizeRequest,
     675              :         ctx: &RequestContext,
     676              :     ) -> anyhow::Result<PagestreamBeMessage> {
     677              :         let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
     678              :         let lsn =
     679              :             Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
     680              :                 .await?;
     681              : 
     682              :         let total_blocks = timeline
     683              :             .get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, lsn, req.latest, ctx)
     684              :             .await?;
     685              :         let db_size = total_blocks as i64 * BLCKSZ as i64;
     686              : 
     687              :         Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse {
     688              :             db_size,
     689              :         }))
     690              :     }
     691              : 
     692     13600380 :     #[instrument(skip(self, timeline, req, ctx), fields(rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn))]
     693              :     async fn handle_get_page_at_lsn_request(
     694              :         &self,
     695              :         timeline: &Timeline,
     696              :         req: &PagestreamGetPageRequest,
     697              :         ctx: &RequestContext,
     698              :     ) -> anyhow::Result<PagestreamBeMessage> {
     699              :         let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
     700              :         let lsn =
     701              :             Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
     702              :                 .await?;
     703              :         /*
     704              :         // Add a 1s delay to some requests. The delay helps the requests to
     705              :         // hit the race condition from github issue #1047 more easily.
     706              :         use rand::Rng;
     707              :         if rand::thread_rng().gen::<u8>() < 5 {
     708              :             std::thread::sleep(std::time::Duration::from_millis(1000));
     709              :         }
     710              :         */
     711              : 
     712              :         let page = timeline
     713              :             .get_rel_page_at_lsn(req.rel, req.blkno, lsn, req.latest, ctx)
     714              :             .await?;
     715              : 
     716              :         Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
     717              :             page,
     718              :         }))
     719              :     }
     720              : 
     721              :     #[allow(clippy::too_many_arguments)]
     722         2007 :     #[instrument(skip_all, fields(?lsn, ?prev_lsn, %full_backup))]
     723              :     async fn handle_basebackup_request<IO>(
     724              :         &mut self,
     725              :         pgb: &mut PostgresBackend<IO>,
     726              :         tenant_id: TenantId,
     727              :         timeline_id: TimelineId,
     728              :         lsn: Option<Lsn>,
     729              :         prev_lsn: Option<Lsn>,
     730              :         full_backup: bool,
     731              :         gzip: bool,
     732              :         ctx: RequestContext,
     733              :     ) -> anyhow::Result<()>
     734              :     where
     735              :         IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
     736              :     {
     737              :         debug_assert_current_span_has_tenant_and_timeline_id();
     738              : 
     739              :         let started = std::time::Instant::now();
     740              : 
     741              :         // check that the timeline exists
     742              :         let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?;
     743              :         let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
     744              :         if let Some(lsn) = lsn {
     745              :             // Backup was requested at a particular LSN. Wait for it to arrive.
     746          220 :             info!("waiting for {}", lsn);
     747              :             timeline.wait_lsn(lsn, &ctx).await?;
     748              :             timeline
     749              :                 .check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
     750              :                 .context("invalid basebackup lsn")?;
     751              :         }
     752              : 
     753              :         let lsn_awaited_after = started.elapsed();
     754              : 
     755              :         // switch client to COPYOUT
     756              :         pgb.write_message_noflush(&BeMessage::CopyOutResponse)?;
     757              :         pgb.flush().await?;
     758              : 
     759              :         // Send a tarball of the latest layer on the timeline. Compress if not
     760              :         // fullbackup. TODO Compress in that case too (tests need to be updated)
     761              :         if full_backup {
     762              :             let mut writer = pgb.copyout_writer();
     763              :             basebackup::send_basebackup_tarball(
     764              :                 &mut writer,
     765              :                 &timeline,
     766              :                 lsn,
     767              :                 prev_lsn,
     768              :                 full_backup,
     769              :                 &ctx,
     770              :             )
     771              :             .await?;
     772              :         } else {
     773              :             let mut writer = pgb.copyout_writer();
     774              :             if gzip {
     775              :                 let mut encoder = GzipEncoder::with_quality(
     776              :                     writer,
     777              :                     // NOTE using fast compression because it's on the critical path
     778              :                     //      for compute startup. For an empty database, we get
     779              :                     //      <100KB with this method. The Level::Best compression method
     780              :                     //      gives us <20KB, but maybe we should add basebackup caching
     781              :                     //      on compute shutdown first.
     782              :                     async_compression::Level::Fastest,
     783              :                 );
     784              :                 basebackup::send_basebackup_tarball(
     785              :                     &mut encoder,
     786              :                     &timeline,
     787              :                     lsn,
     788              :                     prev_lsn,
     789              :                     full_backup,
     790              :                     &ctx,
     791              :                 )
     792              :                 .await?;
     793              :                 // shutdown the encoder to ensure the gzip footer is written
     794              :                 encoder.shutdown().await?;
     795              :             } else {
     796              :                 basebackup::send_basebackup_tarball(
     797              :                     &mut writer,
     798              :                     &timeline,
     799              :                     lsn,
     800              :                     prev_lsn,
     801              :                     full_backup,
     802              :                     &ctx,
     803              :                 )
     804              :                 .await?;
     805              :             }
     806              :         }
     807              : 
     808              :         pgb.write_message_noflush(&BeMessage::CopyDone)?;
     809              :         pgb.flush().await?;
     810              : 
     811              :         let basebackup_after = started
     812              :             .elapsed()
     813              :             .checked_sub(lsn_awaited_after)
     814              :             .unwrap_or(Duration::ZERO);
     815              : 
     816          660 :         info!(
     817          660 :             lsn_await_millis = lsn_awaited_after.as_millis(),
     818          660 :             basebackup_millis = basebackup_after.as_millis(),
     819          660 :             "basebackup complete"
     820          660 :         );
     821              : 
     822              :         Ok(())
     823              :     }
     824              : 
     825              :     // when accessing management api supply None as an argument
     826              :     // when using to authorize tenant pass corresponding tenant id
     827         5314 :     fn check_permission(&self, tenant_id: Option<TenantId>) -> anyhow::Result<()> {
     828         5314 :         if self.auth.is_none() {
     829              :             // auth is set to Trust, nothing to check so just return ok
     830         5236 :             return Ok(());
     831           78 :         }
     832           78 :         // auth is some, just checked above, when auth is some
     833           78 :         // then claims are always present because of checks during connection init
     834           78 :         // so this expect won't trigger
     835           78 :         let claims = self
     836           78 :             .claims
     837           78 :             .as_ref()
     838           78 :             .expect("claims presence already checked");
     839           78 :         check_permission(claims, tenant_id)
     840         5314 :     }
     841              : }
     842              : 
     843              : #[async_trait::async_trait]
     844              : impl<IO> postgres_backend::Handler<IO> for PageServerHandler
     845              : where
     846              :     IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
     847              : {
     848           80 :     fn check_auth_jwt(
     849           80 :         &mut self,
     850           80 :         _pgb: &mut PostgresBackend<IO>,
     851           80 :         jwt_response: &[u8],
     852           80 :     ) -> Result<(), QueryError> {
     853              :         // this unwrap is never triggered, because check_auth_jwt only called when auth_type is NeonJWT
     854              :         // which requires auth to be present
     855           80 :         let data = self
     856           80 :             .auth
     857           80 :             .as_ref()
     858           80 :             .unwrap()
     859           80 :             .decode(str::from_utf8(jwt_response).context("jwt response is not UTF-8")?)?;
     860              : 
     861           80 :         if matches!(data.claims.scope, Scope::Tenant) && data.claims.tenant_id.is_none() {
     862            0 :             return Err(QueryError::Other(anyhow::anyhow!(
     863            0 :                 "jwt token scope is Tenant, but tenant id is missing"
     864            0 :             )));
     865           80 :         }
     866           80 : 
     867           80 :         info!(
     868           80 :             "jwt auth succeeded for scope: {:#?} by tenant id: {:?}",
     869           80 :             data.claims.scope, data.claims.tenant_id,
     870           80 :         );
     871              : 
     872           80 :         self.claims = Some(data.claims);
     873           80 :         Ok(())
     874           80 :     }
     875              : 
     876         5327 :     fn startup(
     877         5327 :         &mut self,
     878         5327 :         _pgb: &mut PostgresBackend<IO>,
     879         5327 :         _sm: &FeStartupPacket,
     880         5327 :     ) -> Result<(), QueryError> {
     881         5327 :         Ok(())
     882         5327 :     }
     883              : 
     884        10668 :     #[instrument(skip_all, fields(tenant_id, timeline_id))]
     885              :     async fn process_query(
     886              :         &mut self,
     887              :         pgb: &mut PostgresBackend<IO>,
     888              :         query_string: &str,
     889         5334 :     ) -> Result<(), QueryError> {
     890         5334 :         let ctx = self.connection_ctx.attached_child();
     891         5334 :         debug!("process query {query_string:?}");
     892              : 
     893         5334 :         if query_string.starts_with("pagestream ") {
     894         4622 :             let (_, params_raw) = query_string.split_at("pagestream ".len());
     895         4622 :             let params = params_raw.split(' ').collect::<Vec<_>>();
     896         4622 :             if params.len() != 2 {
     897            0 :                 return Err(QueryError::Other(anyhow::anyhow!(
     898            0 :                     "invalid param number for pagestream command"
     899            0 :                 )));
     900         4622 :             }
     901         4622 :             let tenant_id = TenantId::from_str(params[0])
     902         4622 :                 .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
     903         4622 :             let timeline_id = TimelineId::from_str(params[1])
     904         4622 :                 .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
     905              : 
     906         4622 :             tracing::Span::current()
     907         4622 :                 .record("tenant_id", field::display(tenant_id))
     908         4622 :                 .record("timeline_id", field::display(timeline_id));
     909         4622 : 
     910         4622 :             self.check_permission(Some(tenant_id))?;
     911              : 
     912         4622 :             self.handle_pagerequests(pgb, tenant_id, timeline_id, ctx)
     913     12981472 :                 .await?;
     914          712 :         } else if query_string.starts_with("basebackup ") {
     915          660 :             let (_, params_raw) = query_string.split_at("basebackup ".len());
     916          660 :             let params = params_raw.split_whitespace().collect::<Vec<_>>();
     917          660 : 
     918          660 :             if params.len() < 2 {
     919            0 :                 return Err(QueryError::Other(anyhow::anyhow!(
     920            0 :                     "invalid param number for basebackup command"
     921            0 :                 )));
     922          660 :             }
     923              : 
     924          660 :             let tenant_id = TenantId::from_str(params[0])
     925          660 :                 .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
     926          660 :             let timeline_id = TimelineId::from_str(params[1])
     927          660 :                 .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
     928              : 
     929          660 :             tracing::Span::current()
     930          660 :                 .record("tenant_id", field::display(tenant_id))
     931          660 :                 .record("timeline_id", field::display(timeline_id));
     932          660 : 
     933          660 :             self.check_permission(Some(tenant_id))?;
     934              : 
     935          660 :             let lsn = if params.len() >= 3 {
     936              :                 Some(
     937          213 :                     Lsn::from_str(params[2])
     938          213 :                         .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?,
     939              :                 )
     940              :             } else {
     941          447 :                 None
     942              :             };
     943              : 
     944          660 :             let gzip = if params.len() >= 4 {
     945          213 :                 if params[3] == "--gzip" {
     946          213 :                     true
     947              :                 } else {
     948            0 :                     return Err(QueryError::Other(anyhow::anyhow!(
     949            0 :                         "Parameter in position 3 unknown {}",
     950            0 :                         params[3],
     951            0 :                     )));
     952              :                 }
     953              :             } else {
     954          447 :                 false
     955              :             };
     956              : 
     957          660 :             ::metrics::metric_vec_duration::observe_async_block_duration_by_result(
     958          660 :                 &*metrics::BASEBACKUP_QUERY_TIME,
     959          660 :                 async move {
     960          660 :                     self.handle_basebackup_request(
     961          660 :                         pgb,
     962          660 :                         tenant_id,
     963          660 :                         timeline_id,
     964          660 :                         lsn,
     965          660 :                         None,
     966          660 :                         false,
     967          660 :                         gzip,
     968          660 :                         ctx,
     969          660 :                     )
     970        21790 :                     .await?;
     971          651 :                     pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
     972          651 :                     anyhow::Ok(())
     973          660 :                 },
     974          660 :             )
     975        21790 :             .await?;
     976              :         }
     977              :         // return pair of prev_lsn and last_lsn
     978           52 :         else if query_string.starts_with("get_last_record_rlsn ") {
     979           11 :             let (_, params_raw) = query_string.split_at("get_last_record_rlsn ".len());
     980           11 :             let params = params_raw.split_whitespace().collect::<Vec<_>>();
     981           11 : 
     982           11 :             if params.len() != 2 {
     983            0 :                 return Err(QueryError::Other(anyhow::anyhow!(
     984            0 :                     "invalid param number for get_last_record_rlsn command"
     985            0 :                 )));
     986           11 :             }
     987              : 
     988           11 :             let tenant_id = TenantId::from_str(params[0])
     989           11 :                 .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
     990           11 :             let timeline_id = TimelineId::from_str(params[1])
     991           11 :                 .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
     992              : 
     993           11 :             tracing::Span::current()
     994           11 :                 .record("tenant_id", field::display(tenant_id))
     995           11 :                 .record("timeline_id", field::display(timeline_id));
     996           11 : 
     997           11 :             self.check_permission(Some(tenant_id))?;
     998            9 :             let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?;
     999              : 
    1000            9 :             let end_of_timeline = timeline.get_last_record_rlsn();
    1001            9 : 
    1002            9 :             pgb.write_message_noflush(&BeMessage::RowDescription(&[
    1003            9 :                 RowDescriptor::text_col(b"prev_lsn"),
    1004            9 :                 RowDescriptor::text_col(b"last_lsn"),
    1005            9 :             ]))?
    1006            9 :             .write_message_noflush(&BeMessage::DataRow(&[
    1007            9 :                 Some(end_of_timeline.prev.to_string().as_bytes()),
    1008            9 :                 Some(end_of_timeline.last.to_string().as_bytes()),
    1009            9 :             ]))?
    1010            9 :             .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
    1011              :         }
    1012              :         // same as basebackup, but result includes relational data as well
    1013           41 :         else if query_string.starts_with("fullbackup ") {
    1014            9 :             let (_, params_raw) = query_string.split_at("fullbackup ".len());
    1015            9 :             let params = params_raw.split_whitespace().collect::<Vec<_>>();
    1016            9 : 
    1017            9 :             if params.len() < 2 {
    1018            0 :                 return Err(QueryError::Other(anyhow::anyhow!(
    1019            0 :                     "invalid param number for fullbackup command"
    1020            0 :                 )));
    1021            9 :             }
    1022              : 
    1023            9 :             let tenant_id = TenantId::from_str(params[0])
    1024            9 :                 .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
    1025            9 :             let timeline_id = TimelineId::from_str(params[1])
    1026            9 :                 .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
    1027              : 
    1028            9 :             tracing::Span::current()
    1029            9 :                 .record("tenant_id", field::display(tenant_id))
    1030            9 :                 .record("timeline_id", field::display(timeline_id));
    1031              : 
    1032              :             // The caller is responsible for providing correct lsn and prev_lsn.
    1033            9 :             let lsn = if params.len() > 2 {
    1034              :                 Some(
    1035            9 :                     Lsn::from_str(params[2])
    1036            9 :                         .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?,
    1037              :                 )
    1038              :             } else {
    1039            0 :                 None
    1040              :             };
    1041            9 :             let prev_lsn = if params.len() > 3 {
    1042              :                 Some(
    1043            6 :                     Lsn::from_str(params[3])
    1044            6 :                         .with_context(|| format!("Failed to parse Lsn from {}", params[3]))?,
    1045              :                 )
    1046              :             } else {
    1047            3 :                 None
    1048              :             };
    1049              : 
    1050            9 :             self.check_permission(Some(tenant_id))?;
    1051              : 
    1052              :             // Check that the timeline exists
    1053            9 :             self.handle_basebackup_request(
    1054            9 :                 pgb,
    1055            9 :                 tenant_id,
    1056            9 :                 timeline_id,
    1057            9 :                 lsn,
    1058            9 :                 prev_lsn,
    1059            9 :                 true,
    1060            9 :                 false,
    1061            9 :                 ctx,
    1062            9 :             )
    1063         3527 :             .await?;
    1064            9 :             pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
    1065           32 :         } else if query_string.starts_with("import basebackup ") {
    1066              :             // Import the `base` section (everything but the wal) of a basebackup.
    1067              :             // Assumes the tenant already exists on this pageserver.
    1068              :             //
    1069              :             // Files are scheduled to be persisted to remote storage, and the
    1070              :             // caller should poll the http api to check when that is done.
    1071              :             //
    1072              :             // Example import command:
    1073              :             // 1. Get start/end LSN from backup_manifest file
    1074              :             // 2. Run:
    1075              :             // cat my_backup/base.tar | psql -h $PAGESERVER \
    1076              :             //     -c "import basebackup $TENANT $TIMELINE $START_LSN $END_LSN $PG_VERSION"
    1077            5 :             let (_, params_raw) = query_string.split_at("import basebackup ".len());
    1078            5 :             let params = params_raw.split_whitespace().collect::<Vec<_>>();
    1079            5 :             if params.len() != 5 {
    1080            0 :                 return Err(QueryError::Other(anyhow::anyhow!(
    1081            0 :                     "invalid param number for import basebackup command"
    1082            0 :                 )));
    1083            5 :             }
    1084            5 :             let tenant_id = TenantId::from_str(params[0])
    1085            5 :                 .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
    1086            5 :             let timeline_id = TimelineId::from_str(params[1])
    1087            5 :                 .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
    1088            5 :             let base_lsn = Lsn::from_str(params[2])
    1089            5 :                 .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?;
    1090            5 :             let end_lsn = Lsn::from_str(params[3])
    1091            5 :                 .with_context(|| format!("Failed to parse Lsn from {}", params[3]))?;
    1092            5 :             let pg_version = u32::from_str(params[4])
    1093            5 :                 .with_context(|| format!("Failed to parse pg_version from {}", params[4]))?;
    1094              : 
    1095            5 :             tracing::Span::current()
    1096            5 :                 .record("tenant_id", field::display(tenant_id))
    1097            5 :                 .record("timeline_id", field::display(timeline_id));
    1098            5 : 
    1099            5 :             self.check_permission(Some(tenant_id))?;
    1100              : 
    1101            5 :             match self
    1102            5 :                 .handle_import_basebackup(
    1103            5 :                     pgb,
    1104            5 :                     tenant_id,
    1105            5 :                     timeline_id,
    1106            5 :                     base_lsn,
    1107            5 :                     end_lsn,
    1108            5 :                     pg_version,
    1109            5 :                     ctx,
    1110            5 :                 )
    1111          546 :                 .await
    1112              :             {
    1113            3 :                 Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
    1114            2 :                 Err(e) => {
    1115            2 :                     error!("error importing base backup between {base_lsn} and {end_lsn}: {e:?}");
    1116            2 :                     pgb.write_message_noflush(&BeMessage::ErrorResponse(
    1117            2 :                         &e.to_string(),
    1118            2 :                         Some(e.pg_error_code()),
    1119            2 :                     ))?
    1120              :                 }
    1121              :             };
    1122           27 :         } else if query_string.starts_with("import wal ") {
    1123              :             // Import the `pg_wal` section of a basebackup.
    1124              :             //
    1125              :             // Files are scheduled to be persisted to remote storage, and the
    1126              :             // caller should poll the http api to check when that is done.
    1127            2 :             let (_, params_raw) = query_string.split_at("import wal ".len());
    1128            2 :             let params = params_raw.split_whitespace().collect::<Vec<_>>();
    1129            2 :             if params.len() != 4 {
    1130            0 :                 return Err(QueryError::Other(anyhow::anyhow!(
    1131            0 :                     "invalid param number for import wal command"
    1132            0 :                 )));
    1133            2 :             }
    1134            2 :             let tenant_id = TenantId::from_str(params[0])
    1135            2 :                 .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
    1136            2 :             let timeline_id = TimelineId::from_str(params[1])
    1137            2 :                 .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
    1138            2 :             let start_lsn = Lsn::from_str(params[2])
    1139            2 :                 .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?;
    1140            2 :             let end_lsn = Lsn::from_str(params[3])
    1141            2 :                 .with_context(|| format!("Failed to parse Lsn from {}", params[3]))?;
    1142              : 
    1143            2 :             tracing::Span::current()
    1144            2 :                 .record("tenant_id", field::display(tenant_id))
    1145            2 :                 .record("timeline_id", field::display(timeline_id));
    1146            2 : 
    1147            2 :             self.check_permission(Some(tenant_id))?;
    1148              : 
    1149            2 :             match self
    1150            2 :                 .handle_import_wal(pgb, tenant_id, timeline_id, start_lsn, end_lsn, ctx)
    1151          124 :                 .await
    1152              :             {
    1153            2 :                 Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
    1154            0 :                 Err(e) => {
    1155            0 :                     error!("error importing WAL between {start_lsn} and {end_lsn}: {e:?}");
    1156            0 :                     pgb.write_message_noflush(&BeMessage::ErrorResponse(
    1157            0 :                         &e.to_string(),
    1158            0 :                         Some(e.pg_error_code()),
    1159            0 :                     ))?
    1160              :                 }
    1161              :             };
    1162           25 :         } else if query_string.to_ascii_lowercase().starts_with("set ") {
    1163              :             // important because psycopg2 executes "SET datestyle TO 'ISO'"
    1164              :             // on connect
    1165           20 :             pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
    1166            5 :         } else if query_string.starts_with("show ") {
    1167              :             // show <tenant_id>
    1168            5 :             let (_, params_raw) = query_string.split_at("show ".len());
    1169            5 :             let params = params_raw.split(' ').collect::<Vec<_>>();
    1170            5 :             if params.len() != 1 {
    1171            0 :                 return Err(QueryError::Other(anyhow::anyhow!(
    1172            0 :                     "invalid param number for config command"
    1173            0 :                 )));
    1174            5 :             }
    1175            5 :             let tenant_id = TenantId::from_str(params[0])
    1176            5 :                 .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
    1177              : 
    1178            5 :             tracing::Span::current().record("tenant_id", field::display(tenant_id));
    1179            5 : 
    1180            5 :             self.check_permission(Some(tenant_id))?;
    1181              : 
    1182            5 :             let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
    1183            5 :             pgb.write_message_noflush(&BeMessage::RowDescription(&[
    1184            5 :                 RowDescriptor::int8_col(b"checkpoint_distance"),
    1185            5 :                 RowDescriptor::int8_col(b"checkpoint_timeout"),
    1186            5 :                 RowDescriptor::int8_col(b"compaction_target_size"),
    1187            5 :                 RowDescriptor::int8_col(b"compaction_period"),
    1188            5 :                 RowDescriptor::int8_col(b"compaction_threshold"),
    1189            5 :                 RowDescriptor::int8_col(b"gc_horizon"),
    1190            5 :                 RowDescriptor::int8_col(b"gc_period"),
    1191            5 :                 RowDescriptor::int8_col(b"image_creation_threshold"),
    1192            5 :                 RowDescriptor::int8_col(b"pitr_interval"),
    1193            5 :             ]))?
    1194            5 :             .write_message_noflush(&BeMessage::DataRow(&[
    1195            5 :                 Some(tenant.get_checkpoint_distance().to_string().as_bytes()),
    1196            5 :                 Some(
    1197            5 :                     tenant
    1198            5 :                         .get_checkpoint_timeout()
    1199            5 :                         .as_secs()
    1200            5 :                         .to_string()
    1201            5 :                         .as_bytes(),
    1202            5 :                 ),
    1203            5 :                 Some(tenant.get_compaction_target_size().to_string().as_bytes()),
    1204            5 :                 Some(
    1205            5 :                     tenant
    1206            5 :                         .get_compaction_period()
    1207            5 :                         .as_secs()
    1208            5 :                         .to_string()
    1209            5 :                         .as_bytes(),
    1210            5 :                 ),
    1211            5 :                 Some(tenant.get_compaction_threshold().to_string().as_bytes()),
    1212            5 :                 Some(tenant.get_gc_horizon().to_string().as_bytes()),
    1213            5 :                 Some(tenant.get_gc_period().as_secs().to_string().as_bytes()),
    1214            5 :                 Some(tenant.get_image_creation_threshold().to_string().as_bytes()),
    1215            5 :                 Some(tenant.get_pitr_interval().as_secs().to_string().as_bytes()),
    1216            5 :             ]))?
    1217            5 :             .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
    1218              :         } else {
    1219            0 :             return Err(QueryError::Other(anyhow::anyhow!(
    1220            0 :                 "unknown command {query_string}"
    1221            0 :             )));
    1222              :         }
    1223              : 
    1224         5181 :         Ok(())
    1225        10601 :     }
    1226              : }
    1227              : 
    1228            6 : #[derive(thiserror::Error, Debug)]
    1229              : enum GetActiveTenantError {
    1230              :     #[error(
    1231              :         "Timed out waiting {wait_time:?} for tenant active state. Latest state: {latest_state:?}"
    1232              :     )]
    1233              :     WaitForActiveTimeout {
    1234              :         latest_state: TenantState,
    1235              :         wait_time: Duration,
    1236              :     },
    1237              :     #[error(transparent)]
    1238              :     NotFound(GetTenantError),
    1239              :     #[error(transparent)]
    1240              :     WaitTenantActive(tenant::WaitToBecomeActiveError),
    1241              : }
    1242              : 
    1243              : impl From<GetActiveTenantError> for QueryError {
    1244           69 :     fn from(e: GetActiveTenantError) -> Self {
    1245           69 :         match e {
    1246            0 :             GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected(
    1247            0 :                 ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
    1248            0 :             ),
    1249           64 :             GetActiveTenantError::WaitTenantActive(e) => QueryError::Other(anyhow::Error::new(e)),
    1250            5 :             GetActiveTenantError::NotFound(e) => QueryError::Other(anyhow::Error::new(e)),
    1251              :         }
    1252           69 :     }
    1253              : }
    1254              : 
    1255              : /// Get active tenant.
    1256              : ///
    1257              : /// If the tenant is Loading, waits for it to become Active, for up to 30 s. That
    1258              : /// ensures that queries don't fail immediately after pageserver startup, because
    1259              : /// all tenants are still loading.
    1260         5312 : async fn get_active_tenant_with_timeout(
    1261         5312 :     tenant_id: TenantId,
    1262         5312 :     _ctx: &RequestContext, /* require get a context to support cancellation in the future */
    1263         5312 : ) -> Result<Arc<Tenant>, GetActiveTenantError> {
    1264         5312 :     let tenant = match mgr::get_tenant(tenant_id, false).await {
    1265         5307 :         Ok(tenant) => tenant,
    1266            5 :         Err(e @ GetTenantError::NotFound(_)) => return Err(GetActiveTenantError::NotFound(e)),
    1267              :         Err(GetTenantError::NotActive(_)) => {
    1268            0 :             unreachable!("we're calling get_tenant with active=false")
    1269              :         }
    1270              :     };
    1271         5307 :     let wait_time = Duration::from_secs(30);
    1272         5307 :     match tokio::time::timeout(wait_time, tenant.wait_to_become_active()).await {
    1273         5241 :         Ok(Ok(())) => Ok(tenant),
    1274              :         // no .context(), the error message is good enough and some tests depend on it
    1275           66 :         Ok(Err(e)) => Err(GetActiveTenantError::WaitTenantActive(e)),
    1276              :         Err(_) => {
    1277            0 :             let latest_state = tenant.current_state();
    1278            0 :             if latest_state == TenantState::Active {
    1279            0 :                 Ok(tenant)
    1280              :             } else {
    1281            0 :                 Err(GetActiveTenantError::WaitForActiveTimeout {
    1282            0 :                     latest_state,
    1283            0 :                     wait_time,
    1284            0 :                 })
    1285              :             }
    1286              :         }
    1287              :     }
    1288         5312 : }
    1289              : 
    1290            6 : #[derive(Debug, thiserror::Error)]
    1291              : enum GetActiveTimelineError {
    1292              :     #[error(transparent)]
    1293              :     Tenant(GetActiveTenantError),
    1294              :     #[error(transparent)]
    1295              :     Timeline(anyhow::Error),
    1296              : }
    1297              : 
    1298              : impl From<GetActiveTimelineError> for QueryError {
    1299            0 :     fn from(e: GetActiveTimelineError) -> Self {
    1300            0 :         match e {
    1301            0 :             GetActiveTimelineError::Tenant(e) => e.into(),
    1302            0 :             GetActiveTimelineError::Timeline(e) => QueryError::Other(e),
    1303              :         }
    1304            0 :     }
    1305              : }
    1306              : 
    1307              : /// Shorthand for getting a reference to a Timeline of an Active tenant.
    1308          680 : async fn get_active_tenant_timeline(
    1309          680 :     tenant_id: TenantId,
    1310          680 :     timeline_id: TimelineId,
    1311          680 :     ctx: &RequestContext,
    1312          680 : ) -> Result<Arc<Timeline>, GetActiveTimelineError> {
    1313          680 :     let tenant = get_active_tenant_with_timeout(tenant_id, ctx)
    1314            8 :         .await
    1315          680 :         .map_err(GetActiveTimelineError::Tenant)?;
    1316          678 :     let timeline = tenant
    1317          678 :         .get_timeline(timeline_id, true)
    1318          678 :         .map_err(|e| GetActiveTimelineError::Timeline(anyhow::anyhow!(e)))?;
    1319          678 :     Ok(timeline)
    1320          680 : }
        

Generated by: LCOV version 2.1-beta