LCOV - differential code coverage report
Current view: top level - pageserver/src - page_service.rs (source / functions) Coverage Total Hit LBC UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 87.9 % 663 583 2 78 583
Current Date: 2023-10-19 02:04:12 Functions: 38.5 % 182 70 1 111 70
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : //
       2                 : //! The Page Service listens for client connections and serves their GetPage@LSN
       3                 : //! requests.
       4                 : //
       5                 : //   It is possible to connect here using usual psql/pgbench/libpq. Following
       6                 : // commands are supported now:
       7                 : //     *status* -- show actual info about this pageserver,
       8                 : //     *pagestream* -- enter mode where smgr and pageserver talk with their
       9                 : //  custom protocol.
      10                 : //
      11                 : 
      12                 : use anyhow::Context;
      13                 : use async_compression::tokio::write::GzipEncoder;
      14                 : use bytes::Buf;
      15                 : use bytes::Bytes;
      16                 : use futures::Stream;
      17                 : use pageserver_api::models::TenantState;
      18                 : use pageserver_api::models::{
      19                 :     PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
      20                 :     PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
      21                 :     PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
      22                 :     PagestreamNblocksRequest, PagestreamNblocksResponse,
      23                 : };
      24                 : use postgres_backend::{self, is_expected_io_error, AuthType, PostgresBackend, QueryError};
      25                 : use pq_proto::framed::ConnectionError;
      26                 : use pq_proto::FeStartupPacket;
      27                 : use pq_proto::{BeMessage, FeMessage, RowDescriptor};
      28                 : use std::io;
      29                 : use std::net::TcpListener;
      30                 : use std::pin::pin;
      31                 : use std::str;
      32                 : use std::str::FromStr;
      33                 : use std::sync::Arc;
      34                 : use std::time::Duration;
      35                 : use tokio::io::AsyncWriteExt;
      36                 : use tokio::io::{AsyncRead, AsyncWrite};
      37                 : use tokio_util::io::StreamReader;
      38                 : use tokio_util::sync::CancellationToken;
      39                 : use tracing::field;
      40                 : use tracing::*;
      41                 : use utils::id::ConnectionId;
      42                 : use utils::{
      43                 :     auth::{Claims, JwtAuth, Scope},
      44                 :     id::{TenantId, TimelineId},
      45                 :     lsn::Lsn,
      46                 :     simple_rcu::RcuReadGuard,
      47                 : };
      48                 : 
      49                 : use crate::auth::check_permission;
      50                 : use crate::basebackup;
      51                 : use crate::config::PageServerConf;
      52                 : use crate::context::{DownloadBehavior, RequestContext};
      53                 : use crate::import_datadir::import_wal_from_tar;
      54                 : use crate::metrics;
      55                 : use crate::metrics::LIVE_CONNECTIONS_COUNT;
      56                 : use crate::task_mgr;
      57                 : use crate::task_mgr::TaskKind;
      58                 : use crate::tenant;
      59                 : use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
      60                 : use crate::tenant::mgr;
      61                 : use crate::tenant::mgr::GetTenantError;
      62                 : use crate::tenant::{Tenant, Timeline};
      63                 : use crate::trace::Tracer;
      64                 : 
      65                 : use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
      66                 : use postgres_ffi::BLCKSZ;
      67                 : 
      68                 : /// Read the end of a tar archive.
      69                 : ///
      70                 : /// A tar archive normally ends with two consecutive blocks of zeros, 512 bytes each.
      71                 : /// `tokio_tar` already read the first such block. Read the second all-zeros block,
      72                 : /// and check that there is no more data after the EOF marker.
      73                 : ///
      74                 : /// XXX: Currently, any trailing data after the EOF marker prints a warning.
      75                 : /// Perhaps it should be a hard error?
      76 CBC           5 : async fn read_tar_eof(mut reader: (impl AsyncRead + Unpin)) -> anyhow::Result<()> {
      77               5 :     use tokio::io::AsyncReadExt;
      78               5 :     let mut buf = [0u8; 512];
      79               5 : 
      80               5 :     // Read the all-zeros block, and verify it
      81               5 :     let mut total_bytes = 0;
      82              10 :     while total_bytes < 512 {
      83               5 :         let nbytes = reader.read(&mut buf[total_bytes..]).await?;
      84               5 :         total_bytes += nbytes;
      85               5 :         if nbytes == 0 {
      86 UBC           0 :             break;
      87 CBC           5 :         }
      88                 :     }
      89               5 :     if total_bytes < 512 {
      90 UBC           0 :         anyhow::bail!("incomplete or invalid tar EOF marker");
      91 CBC           5 :     }
      92            2560 :     if !buf.iter().all(|&x| x == 0) {
      93 UBC           0 :         anyhow::bail!("invalid tar EOF marker");
      94 CBC           5 :     }
      95               5 : 
      96               5 :     // Drain any data after the EOF marker
      97               5 :     let mut trailing_bytes = 0;
      98                 :     loop {
      99               6 :         let nbytes = reader.read(&mut buf).await?;
     100               6 :         trailing_bytes += nbytes;
     101               6 :         if nbytes == 0 {
     102               5 :             break;
     103               1 :         }
     104                 :     }
     105               5 :     if trailing_bytes > 0 {
     106               1 :         warn!("ignored {trailing_bytes} unexpected bytes after the tar archive");
     107               4 :     }
     108               5 :     Ok(())
     109               5 : }
     110                 : 
     111                 : ///////////////////////////////////////////////////////////////////////////////
     112                 : 
     113                 : ///
     114                 : /// Main loop of the page service.
     115                 : ///
     116                 : /// Listens for connections, and launches a new handler task for each.
     117                 : ///
     118             560 : pub async fn libpq_listener_main(
     119             560 :     conf: &'static PageServerConf,
     120             560 :     broker_client: storage_broker::BrokerClientChannel,
     121             560 :     auth: Option<Arc<JwtAuth>>,
     122             560 :     listener: TcpListener,
     123             560 :     auth_type: AuthType,
     124             560 :     listener_ctx: RequestContext,
     125             560 :     cancel: CancellationToken,
     126             560 : ) -> anyhow::Result<()> {
     127             560 :     listener.set_nonblocking(true)?;
     128             560 :     let tokio_listener = tokio::net::TcpListener::from_std(listener)?;
     129                 : 
     130                 :     // Wait for a new connection to arrive, or for server shutdown.
     131            5679 :     while let Some(res) = tokio::select! {
     132                 :         biased;
     133                 : 
     134                 :         _ = cancel.cancelled() => {
     135                 :             // We were requested to shut down.
     136                 :             None
     137                 :         }
     138                 : 
     139            5119 :         res = tokio_listener.accept() => {
     140                 :             Some(res)
     141                 :         }
     142                 :     } {
     143            5119 :         match res {
     144            5119 :             Ok((socket, peer_addr)) => {
     145                 :                 // Connection established. Spawn a new task to handle it.
     146 UBC           0 :                 debug!("accepted connection from {}", peer_addr);
     147 CBC        5119 :                 let local_auth = auth.clone();
     148            5119 : 
     149            5119 :                 let connection_ctx = listener_ctx
     150            5119 :                     .detached_child(TaskKind::PageRequestHandler, DownloadBehavior::Download);
     151            5119 : 
     152            5119 :                 // PageRequestHandler tasks are not associated with any particular
     153            5119 :                 // timeline in the task manager. In practice most connections will
     154            5119 :                 // only deal with a particular timeline, but we don't know which one
     155            5119 :                 // yet.
     156            5119 :                 task_mgr::spawn(
     157            5119 :                     &tokio::runtime::Handle::current(),
     158            5119 :                     TaskKind::PageRequestHandler,
     159            5119 :                     None,
     160            5119 :                     None,
     161            5119 :                     "serving compute connection task",
     162            5119 :                     false,
     163            5119 :                     page_service_conn_main(
     164            5119 :                         conf,
     165            5119 :                         broker_client.clone(),
     166            5119 :                         local_auth,
     167            5119 :                         socket,
     168            5119 :                         auth_type,
     169            5119 :                         connection_ctx,
     170            5119 :                     ),
     171            5119 :                 );
     172                 :             }
     173 UBC           0 :             Err(err) => {
     174                 :                 // accept() failed. Log the error, and loop back to retry on next connection.
     175               0 :                 error!("accept() failed: {:?}", err);
     176                 :             }
     177                 :         }
     178                 :     }
     179                 : 
     180               0 :     debug!("page_service loop terminated");
     181                 : 
     182 CBC         144 :     Ok(())
     183             144 : }
     184                 : 
     185           15357 : #[instrument(skip_all, fields(peer_addr))]
     186                 : async fn page_service_conn_main(
     187                 :     conf: &'static PageServerConf,
     188                 :     broker_client: storage_broker::BrokerClientChannel,
     189                 :     auth: Option<Arc<JwtAuth>>,
     190                 :     socket: tokio::net::TcpStream,
     191                 :     auth_type: AuthType,
     192                 :     connection_ctx: RequestContext,
     193                 : ) -> anyhow::Result<()> {
     194                 :     // Immediately increment the gauge, then create a job to decrement it on task exit.
     195                 :     // One of the pros of `defer!` is that this will *most probably*
     196                 :     // get called, even in presence of panics.
     197                 :     let gauge = LIVE_CONNECTIONS_COUNT.with_label_values(&["page_service"]);
     198                 :     gauge.inc();
     199            5043 :     scopeguard::defer! {
     200            5043 :         gauge.dec();
     201            5043 :     }
     202                 : 
     203                 :     socket
     204                 :         .set_nodelay(true)
     205                 :         .context("could not set TCP_NODELAY")?;
     206                 : 
     207                 :     let peer_addr = socket.peer_addr().context("get peer address")?;
     208                 :     tracing::Span::current().record("peer_addr", field::display(peer_addr));
     209                 : 
     210                 :     // setup read timeout of 10 minutes. the timeout is rather arbitrary for requirements:
     211                 :     // - long enough for most valid compute connections
     212                 :     // - less than infinite to stop us from "leaking" connections to long-gone computes
     213                 :     //
     214                 :     // no write timeout is used, because the kernel is assumed to error writes after some time.
     215                 :     let mut socket = tokio_io_timeout::TimeoutReader::new(socket);
     216                 : 
     217                 :     // timeout should be lower, but trying out multiple days for
     218                 :     // <https://github.com/neondatabase/neon/issues/4205>
     219                 :     socket.set_timeout(Some(std::time::Duration::from_secs(60 * 60 * 24 * 3)));
     220                 :     let socket = std::pin::pin!(socket);
     221                 : 
     222                 :     // XXX: pgbackend.run() should take the connection_ctx,
     223                 :     // and create a child per-query context when it invokes process_query.
     224                 :     // But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
     225                 :     // and create the per-query context in process_query ourselves.
     226                 :     let mut conn_handler = PageServerHandler::new(
     227                 :         conf,
     228                 :         broker_client,
     229                 :         auth,
     230                 :         connection_ctx,
     231                 :         task_mgr::shutdown_token(),
     232                 :     );
     233                 :     let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
     234                 : 
     235                 :     match pgbackend
     236                 :         .run(&mut conn_handler, task_mgr::shutdown_watcher)
     237                 :         .await
     238                 :     {
     239                 :         Ok(()) => {
     240                 :             // we've been requested to shut down
     241                 :             Ok(())
     242                 :         }
     243                 :         Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
     244                 :             if is_expected_io_error(&io_error) {
     245 UBC           0 :                 info!("Postgres client disconnected ({io_error})");
     246                 :                 Ok(())
     247                 :             } else {
     248                 :                 Err(io_error).context("Postgres connection error")
     249                 :             }
     250                 :         }
     251                 :         other => other.context("Postgres query error"),
     252                 :     }
     253                 : }
     254                 : 
     255                 : struct PageServerHandler {
     256                 :     _conf: &'static PageServerConf,
     257                 :     broker_client: storage_broker::BrokerClientChannel,
     258                 :     auth: Option<Arc<JwtAuth>>,
     259                 :     claims: Option<Claims>,
     260                 : 
     261                 :     /// The context created for the lifetime of the connection
     262                 :     /// services by this PageServerHandler.
     263                 :     /// For each query received over the connection,
     264                 :     /// `process_query` creates a child context from this one.
     265                 :     connection_ctx: RequestContext,
     266                 : 
     267                 :     /// A token that should fire when the tenant transitions from
     268                 :     /// attached state, or when the pageserver is shutting down.
     269                 :     cancel: CancellationToken,
     270                 : }
     271                 : 
     272                 : impl PageServerHandler {
     273 CBC        5119 :     pub fn new(
     274            5119 :         conf: &'static PageServerConf,
     275            5119 :         broker_client: storage_broker::BrokerClientChannel,
     276            5119 :         auth: Option<Arc<JwtAuth>>,
     277            5119 :         connection_ctx: RequestContext,
     278            5119 :         cancel: CancellationToken,
     279            5119 :     ) -> Self {
     280            5119 :         PageServerHandler {
     281            5119 :             _conf: conf,
     282            5119 :             broker_client,
     283            5119 :             auth,
     284            5119 :             claims: None,
     285            5119 :             connection_ctx,
     286            5119 :             cancel,
     287            5119 :         }
     288            5119 :     }
     289                 : 
     290                 :     /// Wrap PostgresBackend::flush to respect our CancellationToken: it is important to use
     291                 :     /// this rather than naked flush() in order to shut down promptly.  Without this, we would
     292                 :     /// block shutdown of a tenant if a postgres client was failing to consume bytes we send
     293                 :     /// in the flush.
     294         3784731 :     async fn flush_cancellable<IO>(&self, pgb: &mut PostgresBackend<IO>) -> Result<(), QueryError>
     295         3784731 :     where
     296         3784731 :         IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
     297         3784731 :     {
     298         3793867 :         tokio::select!(
     299         3784727 :             flush_r = pgb.flush() => {
     300                 :                 Ok(flush_r?)
     301                 :             },
     302                 :             _ = self.cancel.cancelled() => {
     303                 :                 Err(QueryError::Shutdown)
     304                 :             }
     305                 :         )
     306         3784731 :     }
     307                 : 
     308               7 :     fn copyin_stream<'a, IO>(
     309               7 :         &'a self,
     310               7 :         pgb: &'a mut PostgresBackend<IO>,
     311               7 :     ) -> impl Stream<Item = io::Result<Bytes>> + 'a
     312               7 :     where
     313               7 :         IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
     314               7 :     {
     315               7 :         async_stream::try_stream! {
     316               7 :             loop {
     317           26355 :                 let msg = tokio::select! {
     318               7 :                     biased;
     319               7 : 
     320               7 :                     _ = self.cancel.cancelled() => {
     321               7 :                         // We were requested to shut down.
     322               7 :                         let msg = "pageserver is shutting down";
     323               7 :                         let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, None));
     324               7 :                         Err(QueryError::Shutdown)
     325               7 :                     }
     326               7 : 
     327           26355 :                     msg = pgb.read_message() => { msg.map_err(QueryError::from)}
     328               7 :                 };
     329               7 : 
     330           26355 :                 match msg {
     331           26355 :                     Ok(Some(message)) => {
     332           26355 :                         let copy_data_bytes = match message {
     333           26342 :                             FeMessage::CopyData(bytes) => bytes,
     334               7 :                             FeMessage::CopyDone => { break },
     335               7 :                             FeMessage::Sync => continue,
     336               7 :                             FeMessage::Terminate => {
     337               7 :                                 let msg = "client terminated connection with Terminate message during COPY";
     338 UBC           0 :                                 let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
     339               0 :                                 // error can't happen here, ErrorResponse serialization should be always ok
     340               0 :                                 pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
     341 CBC           7 :                                 Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
     342               7 :                                 break;
     343               7 :                             }
     344               7 :                             m => {
     345 UBC           0 :                                 let msg = format!("unexpected message {m:?}");
     346               0 :                                 // error can't happen here, ErrorResponse serialization should be always ok
     347               0 :                                 pgb.write_message_noflush(&BeMessage::ErrorResponse(&msg, None)).map_err(|e| e.into_io_error())?;
     348 CBC           7 :                                 Err(io::Error::new(io::ErrorKind::Other, msg))?;
     349               7 :                                 break;
     350               7 :                             }
     351               7 :                         };
     352               7 : 
     353           26342 :                         yield copy_data_bytes;
     354               7 :                     }
     355               7 :                     Ok(None) => {
     356               7 :                         let msg = "client closed connection during COPY";
     357 UBC           0 :                         let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
     358               0 :                         // error can't happen here, ErrorResponse serialization should be always ok
     359               0 :                         pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
     360 CBC           7 :                         self.flush_cancellable(pgb).await.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
     361               7 :                         Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
     362               7 :                     }
     363               7 :                     Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
     364 UBC           0 :                         Err(io_error)?;
     365 CBC           7 :                     }
     366               7 :                     Err(other) => {
     367 UBC           0 :                         Err(io::Error::new(io::ErrorKind::Other, other.to_string()))?;
     368 CBC           7 :                     }
     369               7 :                 };
     370               7 :             }
     371               7 :         }
     372               7 :     }
     373                 : 
     374           22110 :     #[instrument(skip_all)]
     375                 :     async fn handle_pagerequests<IO>(
     376                 :         &self,
     377                 :         pgb: &mut PostgresBackend<IO>,
     378                 :         tenant_id: TenantId,
     379                 :         timeline_id: TimelineId,
     380                 :         ctx: RequestContext,
     381                 :     ) -> Result<(), QueryError>
     382                 :     where
     383                 :         IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
     384                 :     {
     385                 :         debug_assert_current_span_has_tenant_and_timeline_id();
     386                 : 
     387                 :         // NOTE: pagerequests handler exits when connection is closed,
     388                 :         //       so there is no need to reset the association
     389                 :         task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
     390                 : 
     391                 :         // Make request tracer if needed
     392                 :         let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
     393                 :         let mut tracer = if tenant.get_trace_read_requests() {
     394                 :             let connection_id = ConnectionId::generate();
     395                 :             let path = tenant
     396                 :                 .conf
     397                 :                 .trace_path(&tenant_id, &timeline_id, &connection_id);
     398                 :             Some(Tracer::new(path))
     399                 :         } else {
     400                 :             None
     401                 :         };
     402                 : 
     403                 :         // Check that the timeline exists
     404                 :         let timeline = tenant
     405                 :             .get_timeline(timeline_id, true)
     406 LBC         (1) :             .map_err(|e| anyhow::anyhow!(e))?;
     407                 : 
     408                 :         // switch client to COPYBOTH
     409                 :         pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
     410                 :         self.flush_cancellable(pgb).await?;
     411                 : 
     412                 :         let metrics = metrics::SmgrQueryTimePerTimeline::new(&tenant_id, &timeline_id);
     413                 : 
     414                 :         loop {
     415 CBC     7193794 :             let msg = tokio::select! {
     416                 :                 biased;
     417                 : 
     418                 :                 _ = self.cancel.cancelled() => {
     419                 :                     // We were requested to shut down.
     420             105 :                     info!("shutdown request received in page handler");
     421                 :                     return Err(QueryError::Shutdown)
     422                 :                 }
     423                 : 
     424         3783293 :                 msg = pgb.read_message() => { msg }
     425                 :             };
     426                 : 
     427                 :             let copy_data_bytes = match msg? {
     428                 :                 Some(FeMessage::CopyData(bytes)) => bytes,
     429                 :                 Some(FeMessage::Terminate) => break,
     430                 :                 Some(m) => {
     431                 :                     return Err(QueryError::Other(anyhow::anyhow!(
     432                 :                         "unexpected message: {m:?} during COPY"
     433                 :                     )));
     434                 :                 }
     435                 :                 None => break, // client disconnected
     436                 :             };
     437                 : 
     438 UBC           0 :             trace!("query: {copy_data_bytes:?}");
     439                 : 
     440                 :             // Trace request if needed
     441                 :             if let Some(t) = tracer.as_mut() {
     442                 :                 t.trace(&copy_data_bytes)
     443                 :             }
     444                 : 
     445                 :             let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
     446                 : 
     447                 :             // TODO: We could create a new per-request context here, with unique ID.
     448                 :             // Currently we use the same per-timeline context for all requests
     449                 : 
     450                 :             let (response, span) = match neon_fe_msg {
     451                 :                 PagestreamFeMessage::Exists(req) => {
     452                 :                     let _timer = metrics.start_timer(metrics::SmgrQueryType::GetRelExists);
     453                 :                     let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.lsn);
     454                 :                     (
     455                 :                         self.handle_get_rel_exists_request(&timeline, &req, &ctx)
     456                 :                             .instrument(span.clone())
     457                 :                             .await,
     458                 :                         span,
     459                 :                     )
     460                 :                 }
     461                 :                 PagestreamFeMessage::Nblocks(req) => {
     462                 :                     let _timer = metrics.start_timer(metrics::SmgrQueryType::GetRelSize);
     463                 :                     let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.lsn);
     464                 :                     (
     465                 :                         self.handle_get_nblocks_request(&timeline, &req, &ctx)
     466                 :                             .instrument(span.clone())
     467                 :                             .await,
     468                 :                         span,
     469                 :                     )
     470                 :                 }
     471                 :                 PagestreamFeMessage::GetPage(req) => {
     472                 :                     let _timer = metrics.start_timer(metrics::SmgrQueryType::GetPageAtLsn);
     473                 :                     let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn);
     474                 :                     (
     475                 :                         self.handle_get_page_at_lsn_request(&timeline, &req, &ctx)
     476                 :                             .instrument(span.clone())
     477                 :                             .await,
     478                 :                         span,
     479                 :                     )
     480                 :                 }
     481                 :                 PagestreamFeMessage::DbSize(req) => {
     482                 :                     let _timer = metrics.start_timer(metrics::SmgrQueryType::GetDbSize);
     483                 :                     let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.lsn);
     484                 :                     (
     485                 :                         self.handle_db_size_request(&timeline, &req, &ctx)
     486                 :                             .instrument(span.clone())
     487                 :                             .await,
     488                 :                         span,
     489                 :                     )
     490                 :                 }
     491                 :             };
     492                 : 
     493 CBC           1 :             let response = response.unwrap_or_else(|e| {
     494               1 :                 // print the all details to the log with {:#}, but for the client the
     495               1 :                 // error message is enough
     496               1 :                 span.in_scope(|| error!("error reading relation or page version: {:#}", e));
     497               1 :                 PagestreamBeMessage::Error(PagestreamErrorResponse {
     498               1 :                     message: e.to_string(),
     499               1 :                 })
     500               1 :             });
     501                 : 
     502                 :             pgb.write_message_noflush(&BeMessage::CopyData(&response.serialize()))?;
     503                 :             self.flush_cancellable(pgb).await?;
     504                 :         }
     505                 :         Ok(())
     506                 :     }
     507                 : 
     508                 :     #[allow(clippy::too_many_arguments)]
     509              10 :     #[instrument(skip_all, fields(%base_lsn, end_lsn=%_end_lsn, %pg_version))]
     510                 :     async fn handle_import_basebackup<IO>(
     511                 :         &self,
     512                 :         pgb: &mut PostgresBackend<IO>,
     513                 :         tenant_id: TenantId,
     514                 :         timeline_id: TimelineId,
     515                 :         base_lsn: Lsn,
     516                 :         _end_lsn: Lsn,
     517                 :         pg_version: u32,
     518                 :         ctx: RequestContext,
     519                 :     ) -> Result<(), QueryError>
     520                 :     where
     521                 :         IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
     522                 :     {
     523                 :         debug_assert_current_span_has_tenant_and_timeline_id();
     524                 : 
     525                 :         task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
     526                 :         // Create empty timeline
     527               5 :         info!("creating new timeline");
     528                 :         let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
     529                 :         let timeline = tenant
     530                 :             .create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
     531                 :             .await?;
     532                 : 
     533                 :         // TODO mark timeline as not ready until it reaches end_lsn.
     534                 :         // We might have some wal to import as well, and we should prevent compute
     535                 :         // from connecting before that and writing conflicting wal.
     536                 :         //
     537                 :         // This is not relevant for pageserver->pageserver migrations, since there's
     538                 :         // no wal to import. But should be fixed if we want to import from postgres.
     539                 : 
     540                 :         // TODO leave clean state on error. For now you can use detach to clean
     541                 :         // up broken state from a failed import.
     542                 : 
     543                 :         // Import basebackup provided via CopyData
     544               5 :         info!("importing basebackup");
     545                 :         pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
     546                 :         self.flush_cancellable(pgb).await?;
     547                 : 
     548                 :         let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb)));
     549                 :         timeline
     550                 :             .import_basebackup_from_tar(
     551                 :                 &mut copyin_reader,
     552                 :                 base_lsn,
     553                 :                 self.broker_client.clone(),
     554                 :                 &ctx,
     555                 :             )
     556                 :             .await?;
     557                 : 
     558                 :         // Read the end of the tar archive.
     559                 :         read_tar_eof(copyin_reader).await?;
     560                 : 
     561                 :         // TODO check checksum
     562                 :         // Meanwhile you can verify client-side by taking fullbackup
     563                 :         // and checking that it matches in size with what was imported.
     564                 :         // It wouldn't work if base came from vanilla postgres though,
     565                 :         // since we discard some log files.
     566                 : 
     567               3 :         info!("done");
     568                 :         Ok(())
     569                 :     }
     570                 : 
     571               6 :     #[instrument(skip_all, fields(%start_lsn, %end_lsn))]
     572                 :     async fn handle_import_wal<IO>(
     573                 :         &self,
     574                 :         pgb: &mut PostgresBackend<IO>,
     575                 :         tenant_id: TenantId,
     576                 :         timeline_id: TimelineId,
     577                 :         start_lsn: Lsn,
     578                 :         end_lsn: Lsn,
     579                 :         ctx: RequestContext,
     580                 :     ) -> Result<(), QueryError>
     581                 :     where
     582                 :         IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
     583                 :     {
     584                 :         debug_assert_current_span_has_tenant_and_timeline_id();
     585                 :         task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
     586                 : 
     587                 :         let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?;
     588                 :         let last_record_lsn = timeline.get_last_record_lsn();
     589                 :         if last_record_lsn != start_lsn {
     590                 :             return Err(QueryError::Other(
     591                 :                 anyhow::anyhow!("Cannot import WAL from Lsn {start_lsn} because timeline does not start from the same lsn: {last_record_lsn}"))
     592                 :             );
     593                 :         }
     594                 : 
     595                 :         // TODO leave clean state on error. For now you can use detach to clean
     596                 :         // up broken state from a failed import.
     597                 : 
     598                 :         // Import wal provided via CopyData
     599               2 :         info!("importing wal");
     600                 :         pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
     601                 :         self.flush_cancellable(pgb).await?;
     602                 :         let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb)));
     603                 :         import_wal_from_tar(&timeline, &mut copyin_reader, start_lsn, end_lsn, &ctx).await?;
     604               2 :         info!("wal import complete");
     605                 : 
     606                 :         // Read the end of the tar archive.
     607                 :         read_tar_eof(copyin_reader).await?;
     608                 : 
     609                 :         // TODO Does it make sense to overshoot?
     610                 :         if timeline.get_last_record_lsn() < end_lsn {
     611                 :             return Err(QueryError::Other(
     612                 :                 anyhow::anyhow!("Cannot import WAL from Lsn {start_lsn} because timeline does not start from the same lsn: {last_record_lsn}"))
     613                 :             );
     614                 :         }
     615                 : 
     616                 :         // Flush data to disk, then upload to s3. No need for a forced checkpoint.
     617                 :         // We only want to persist the data, and it doesn't matter if it's in the
     618                 :         // shape of deltas or images.
     619               2 :         info!("flushing layers");
     620                 :         timeline.freeze_and_flush().await?;
     621                 : 
     622               2 :         info!("done");
     623                 :         Ok(())
     624                 :     }
     625                 : 
     626                 :     /// Helper function to handle the LSN from client request.
     627                 :     ///
     628                 :     /// Each GetPage (and Exists and Nblocks) request includes information about
     629                 :     /// which version of the page is being requested. The client can request the
     630                 :     /// latest version of the page, or the version that's valid at a particular
     631                 :     /// LSN. The primary compute node will always request the latest page
     632                 :     /// version, while a standby will request a version at the LSN that it's
     633                 :     /// currently caught up to.
     634                 :     ///
     635                 :     /// In either case, if the page server hasn't received the WAL up to the
     636                 :     /// requested LSN yet, we will wait for it to arrive. The return value is
     637                 :     /// the LSN that should be used to look up the page versions.
     638         3779079 :     async fn wait_or_get_last_lsn(
     639         3779079 :         timeline: &Timeline,
     640         3779079 :         mut lsn: Lsn,
     641         3779079 :         latest: bool,
     642         3779079 :         latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
     643         3779079 :         ctx: &RequestContext,
     644         3779079 :     ) -> anyhow::Result<Lsn> {
     645         3779077 :         if latest {
     646                 :             // Latest page version was requested. If LSN is given, it is a hint
     647                 :             // to the page server that there have been no modifications to the
     648                 :             // page after that LSN. If we haven't received WAL up to that point,
     649                 :             // wait until it arrives.
     650         3665291 :             let last_record_lsn = timeline.get_last_record_lsn();
     651         3665291 : 
     652         3665291 :             // Note: this covers the special case that lsn == Lsn(0). That
     653         3665291 :             // special case means "return the latest version whatever it is",
     654         3665291 :             // and it's used for bootstrapping purposes, when the page server is
     655         3665291 :             // connected directly to the compute node. That is needed because
     656         3665291 :             // when you connect to the compute node, to receive the WAL, the
     657         3665291 :             // walsender process will do a look up in the pg_authid catalog
     658         3665291 :             // table for authentication. That poses a deadlock problem: the
     659         3665291 :             // catalog table lookup will send a GetPage request, but the GetPage
     660         3665291 :             // request will block in the page server because the recent WAL
     661         3665291 :             // hasn't been received yet, and it cannot be received until the
     662         3665291 :             // walsender completes the authentication and starts streaming the
     663         3665291 :             // WAL.
     664         3665291 :             if lsn <= last_record_lsn {
     665         3562783 :                 lsn = last_record_lsn;
     666         3562783 :             } else {
     667          145031 :                 timeline.wait_lsn(lsn, ctx).await?;
     668                 :                 // Since we waited for 'lsn' to arrive, that is now the last
     669                 :                 // record LSN. (Or close enough for our purposes; the
     670                 :                 // last-record LSN can advance immediately after we return
     671                 :                 // anyway)
     672                 :             }
     673                 :         } else {
     674          113786 :             if lsn == Lsn(0) {
     675               1 :                 anyhow::bail!("invalid LSN(0) in request");
     676          113785 :             }
     677          113785 :             timeline.wait_lsn(lsn, ctx).await?;
     678                 :         }
     679         3779075 :         anyhow::ensure!(
     680         3779075 :             lsn >= **latest_gc_cutoff_lsn,
     681 UBC           0 :             "tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
     682               0 :             lsn, **latest_gc_cutoff_lsn
     683                 :         );
     684 CBC     3779075 :         Ok(lsn)
     685         3779076 :     }
     686                 : 
     687           46663 :     async fn handle_get_rel_exists_request(
     688           46663 :         &self,
     689           46663 :         timeline: &Timeline,
     690           46663 :         req: &PagestreamExistsRequest,
     691           46663 :         ctx: &RequestContext,
     692           46663 :     ) -> anyhow::Result<PagestreamBeMessage> {
     693           46663 :         let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
     694           46663 :         let lsn =
     695           46663 :             Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
     696 UBC           0 :                 .await?;
     697                 : 
     698 CBC       46663 :         let exists = timeline
     699           46663 :             .get_rel_exists(req.rel, lsn, req.latest, ctx)
     700           31900 :             .await?;
     701                 : 
     702           46663 :         Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
     703           46663 :             exists,
     704           46663 :         }))
     705           46663 :     }
     706                 : 
     707           17067 :     async fn handle_get_nblocks_request(
     708           17067 :         &self,
     709           17067 :         timeline: &Timeline,
     710           17067 :         req: &PagestreamNblocksRequest,
     711           17067 :         ctx: &RequestContext,
     712           17067 :     ) -> anyhow::Result<PagestreamBeMessage> {
     713           17067 :         let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
     714           17067 :         let lsn =
     715           17067 :             Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
     716             183 :                 .await?;
     717                 : 
     718           17067 :         let n_blocks = timeline.get_rel_size(req.rel, lsn, req.latest, ctx).await?;
     719                 : 
     720           17067 :         Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
     721           17067 :             n_blocks,
     722           17067 :         }))
     723           17067 :     }
     724                 : 
     725               5 :     async fn handle_db_size_request(
     726               5 :         &self,
     727               5 :         timeline: &Timeline,
     728               5 :         req: &PagestreamDbSizeRequest,
     729               5 :         ctx: &RequestContext,
     730               5 :     ) -> anyhow::Result<PagestreamBeMessage> {
     731               5 :         let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
     732               5 :         let lsn =
     733               5 :             Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
     734 LBC         (2) :                 .await?;
     735                 : 
     736 CBC           5 :         let total_blocks = timeline
     737               5 :             .get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, lsn, req.latest, ctx)
     738             107 :             .await?;
     739               5 :         let db_size = total_blocks as i64 * BLCKSZ as i64;
     740               5 : 
     741               5 :         Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse {
     742               5 :             db_size,
     743               5 :         }))
     744               5 :     }
     745                 : 
     746         3715344 :     async fn handle_get_page_at_lsn_request(
     747         3715344 :         &self,
     748         3715344 :         timeline: &Timeline,
     749         3715344 :         req: &PagestreamGetPageRequest,
     750         3715344 :         ctx: &RequestContext,
     751         3715344 :     ) -> anyhow::Result<PagestreamBeMessage> {
     752         3715342 :         let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
     753         3715340 :         let lsn =
     754         3715342 :             Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
     755          144848 :                 .await?;
     756                 :         /*
     757                 :         // Add a 1s delay to some requests. The delay helps the requests to
     758                 :         // hit the race condition from github issue #1047 more easily.
     759                 :         use rand::Rng;
     760                 :         if rand::thread_rng().gen::<u8>() < 5 {
     761                 :             std::thread::sleep(std::time::Duration::from_millis(1000));
     762                 :         }
     763                 :         */
     764                 : 
     765         3715340 :         let page = timeline
     766         3715340 :             .get_rel_page_at_lsn(req.rel, req.blkno, lsn, req.latest, ctx)
     767         5377724 :             .await?;
     768                 : 
     769         3715309 :         Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
     770         3715309 :             page,
     771         3715309 :         }))
     772         3715310 :     }
     773                 : 
     774                 :     #[allow(clippy::too_many_arguments)]
     775            1941 :     #[instrument(skip_all, fields(?lsn, ?prev_lsn, %full_backup))]
     776                 :     async fn handle_basebackup_request<IO>(
     777                 :         &mut self,
     778                 :         pgb: &mut PostgresBackend<IO>,
     779                 :         tenant_id: TenantId,
     780                 :         timeline_id: TimelineId,
     781                 :         lsn: Option<Lsn>,
     782                 :         prev_lsn: Option<Lsn>,
     783                 :         full_backup: bool,
     784                 :         gzip: bool,
     785                 :         ctx: RequestContext,
     786                 :     ) -> anyhow::Result<()>
     787                 :     where
     788                 :         IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
     789                 :     {
     790                 :         debug_assert_current_span_has_tenant_and_timeline_id();
     791                 : 
     792                 :         let started = std::time::Instant::now();
     793                 : 
     794                 :         // check that the timeline exists
     795                 :         let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?;
     796                 :         let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
     797                 :         if let Some(lsn) = lsn {
     798                 :             // Backup was requested at a particular LSN. Wait for it to arrive.
     799             237 :             info!("waiting for {}", lsn);
     800                 :             timeline.wait_lsn(lsn, &ctx).await?;
     801                 :             timeline
     802                 :                 .check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
     803                 :                 .context("invalid basebackup lsn")?;
     804                 :         }
     805                 : 
     806                 :         let lsn_awaited_after = started.elapsed();
     807                 : 
     808                 :         // switch client to COPYOUT
     809                 :         pgb.write_message_noflush(&BeMessage::CopyOutResponse)?;
     810                 :         self.flush_cancellable(pgb).await?;
     811                 : 
     812                 :         // Send a tarball of the latest layer on the timeline. Compress if not
     813                 :         // fullbackup. TODO Compress in that case too (tests need to be updated)
     814                 :         if full_backup {
     815                 :             let mut writer = pgb.copyout_writer();
     816                 :             basebackup::send_basebackup_tarball(
     817                 :                 &mut writer,
     818                 :                 &timeline,
     819                 :                 lsn,
     820                 :                 prev_lsn,
     821                 :                 full_backup,
     822                 :                 &ctx,
     823                 :             )
     824                 :             .await?;
     825                 :         } else {
     826                 :             let mut writer = pgb.copyout_writer();
     827                 :             if gzip {
     828                 :                 let mut encoder = GzipEncoder::with_quality(
     829                 :                     writer,
     830                 :                     // NOTE using fast compression because it's on the critical path
     831                 :                     //      for compute startup. For an empty database, we get
     832                 :                     //      <100KB with this method. The Level::Best compression method
     833                 :                     //      gives us <20KB, but maybe we should add basebackup caching
     834                 :                     //      on compute shutdown first.
     835                 :                     async_compression::Level::Fastest,
     836                 :                 );
     837                 :                 basebackup::send_basebackup_tarball(
     838                 :                     &mut encoder,
     839                 :                     &timeline,
     840                 :                     lsn,
     841                 :                     prev_lsn,
     842                 :                     full_backup,
     843                 :                     &ctx,
     844                 :                 )
     845                 :                 .await?;
     846                 :                 // shutdown the encoder to ensure the gzip footer is written
     847                 :                 encoder.shutdown().await?;
     848                 :             } else {
     849                 :                 basebackup::send_basebackup_tarball(
     850                 :                     &mut writer,
     851                 :                     &timeline,
     852                 :                     lsn,
     853                 :                     prev_lsn,
     854                 :                     full_backup,
     855                 :                     &ctx,
     856                 :                 )
     857                 :                 .await?;
     858                 :             }
     859                 :         }
     860                 : 
     861                 :         pgb.write_message_noflush(&BeMessage::CopyDone)?;
     862                 :         self.flush_cancellable(pgb).await?;
     863                 : 
     864                 :         let basebackup_after = started
     865                 :             .elapsed()
     866                 :             .checked_sub(lsn_awaited_after)
     867                 :             .unwrap_or(Duration::ZERO);
     868                 : 
     869             638 :         info!(
     870             638 :             lsn_await_millis = lsn_awaited_after.as_millis(),
     871             638 :             basebackup_millis = basebackup_after.as_millis(),
     872             638 :             "basebackup complete"
     873             638 :         );
     874                 : 
     875                 :         Ok(())
     876                 :     }
     877                 : 
     878                 :     // when accessing management api supply None as an argument
     879                 :     // when using to authorize tenant pass corresponding tenant id
     880            5106 :     fn check_permission(&self, tenant_id: Option<TenantId>) -> anyhow::Result<()> {
     881            5106 :         if self.auth.is_none() {
     882                 :             // auth is set to Trust, nothing to check so just return ok
     883            5027 :             return Ok(());
     884              79 :         }
     885              79 :         // auth is some, just checked above, when auth is some
     886              79 :         // then claims are always present because of checks during connection init
     887              79 :         // so this expect won't trigger
     888              79 :         let claims = self
     889              79 :             .claims
     890              79 :             .as_ref()
     891              79 :             .expect("claims presence already checked");
     892              79 :         check_permission(claims, tenant_id)
     893            5106 :     }
     894                 : }
     895                 : 
     896                 : #[async_trait::async_trait]
     897                 : impl<IO> postgres_backend::Handler<IO> for PageServerHandler
     898                 : where
     899                 :     IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
     900                 : {
     901              81 :     fn check_auth_jwt(
     902              81 :         &mut self,
     903              81 :         _pgb: &mut PostgresBackend<IO>,
     904              81 :         jwt_response: &[u8],
     905              81 :     ) -> Result<(), QueryError> {
     906                 :         // this unwrap is never triggered, because check_auth_jwt only called when auth_type is NeonJWT
     907                 :         // which requires auth to be present
     908              81 :         let data = self
     909              81 :             .auth
     910              81 :             .as_ref()
     911              81 :             .unwrap()
     912              81 :             .decode(str::from_utf8(jwt_response).context("jwt response is not UTF-8")?)?;
     913                 : 
     914              81 :         if matches!(data.claims.scope, Scope::Tenant) && data.claims.tenant_id.is_none() {
     915 UBC           0 :             return Err(QueryError::Other(anyhow::anyhow!(
     916               0 :                 "jwt token scope is Tenant, but tenant id is missing"
     917               0 :             )));
     918 CBC          81 :         }
     919              81 : 
     920              81 :         info!(
     921              81 :             "jwt auth succeeded for scope: {:#?} by tenant id: {:?}",
     922              81 :             data.claims.scope, data.claims.tenant_id,
     923              81 :         );
     924                 : 
     925              81 :         self.claims = Some(data.claims);
     926              81 :         Ok(())
     927              81 :     }
     928                 : 
     929            5119 :     fn startup(
     930            5119 :         &mut self,
     931            5119 :         _pgb: &mut PostgresBackend<IO>,
     932            5119 :         _sm: &FeStartupPacket,
     933            5119 :     ) -> Result<(), QueryError> {
     934            5119 :         Ok(())
     935            5119 :     }
     936                 : 
     937           10252 :     #[instrument(skip_all, fields(tenant_id, timeline_id))]
     938                 :     async fn process_query(
     939                 :         &mut self,
     940                 :         pgb: &mut PostgresBackend<IO>,
     941                 :         query_string: &str,
     942            5126 :     ) -> Result<(), QueryError> {
     943            5126 :         let ctx = self.connection_ctx.attached_child();
     944            5126 :         debug!("process query {query_string:?}");
     945                 : 
     946            5126 :         if query_string.starts_with("pagestream ") {
     947            4436 :             let (_, params_raw) = query_string.split_at("pagestream ".len());
     948            4436 :             let params = params_raw.split(' ').collect::<Vec<_>>();
     949            4436 :             if params.len() != 2 {
     950 UBC           0 :                 return Err(QueryError::Other(anyhow::anyhow!(
     951               0 :                     "invalid param number for pagestream command"
     952               0 :                 )));
     953 CBC        4436 :             }
     954            4436 :             let tenant_id = TenantId::from_str(params[0])
     955            4436 :                 .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
     956            4436 :             let timeline_id = TimelineId::from_str(params[1])
     957            4436 :                 .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
     958                 : 
     959            4436 :             tracing::Span::current()
     960            4436 :                 .record("tenant_id", field::display(tenant_id))
     961            4436 :                 .record("timeline_id", field::display(timeline_id));
     962            4436 : 
     963            4436 :             self.check_permission(Some(tenant_id))?;
     964                 : 
     965            4436 :             self.handle_pagerequests(pgb, tenant_id, timeline_id, ctx)
     966         8974679 :                 .await?;
     967             690 :         } else if query_string.starts_with("basebackup ") {
     968             638 :             let (_, params_raw) = query_string.split_at("basebackup ".len());
     969             638 :             let params = params_raw.split_whitespace().collect::<Vec<_>>();
     970             638 : 
     971             638 :             if params.len() < 2 {
     972 UBC           0 :                 return Err(QueryError::Other(anyhow::anyhow!(
     973               0 :                     "invalid param number for basebackup command"
     974               0 :                 )));
     975 CBC         638 :             }
     976                 : 
     977             638 :             let tenant_id = TenantId::from_str(params[0])
     978             638 :                 .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
     979             638 :             let timeline_id = TimelineId::from_str(params[1])
     980             638 :                 .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
     981                 : 
     982             638 :             tracing::Span::current()
     983             638 :                 .record("tenant_id", field::display(tenant_id))
     984             638 :                 .record("timeline_id", field::display(timeline_id));
     985             638 : 
     986             638 :             self.check_permission(Some(tenant_id))?;
     987                 : 
     988             638 :             let lsn = if params.len() >= 3 {
     989                 :                 Some(
     990             229 :                     Lsn::from_str(params[2])
     991             229 :                         .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?,
     992                 :                 )
     993                 :             } else {
     994             409 :                 None
     995                 :             };
     996                 : 
     997             638 :             let gzip = if params.len() >= 4 {
     998             229 :                 if params[3] == "--gzip" {
     999             229 :                     true
    1000                 :                 } else {
    1001 UBC           0 :                     return Err(QueryError::Other(anyhow::anyhow!(
    1002               0 :                         "Parameter in position 3 unknown {}",
    1003               0 :                         params[3],
    1004               0 :                     )));
    1005                 :                 }
    1006                 :             } else {
    1007 CBC         409 :                 false
    1008                 :             };
    1009                 : 
    1010             638 :             ::metrics::metric_vec_duration::observe_async_block_duration_by_result(
    1011             638 :                 &*metrics::BASEBACKUP_QUERY_TIME,
    1012             638 :                 async move {
    1013             638 :                     self.handle_basebackup_request(
    1014             638 :                         pgb,
    1015             638 :                         tenant_id,
    1016             638 :                         timeline_id,
    1017             638 :                         lsn,
    1018             638 :                         None,
    1019             638 :                         false,
    1020             638 :                         gzip,
    1021             638 :                         ctx,
    1022             638 :                     )
    1023           40022 :                     .await?;
    1024             629 :                     pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
    1025             629 :                     anyhow::Ok(())
    1026             638 :                 },
    1027             638 :             )
    1028           40022 :             .await?;
    1029                 :         }
    1030                 :         // return pair of prev_lsn and last_lsn
    1031              52 :         else if query_string.starts_with("get_last_record_rlsn ") {
    1032              11 :             let (_, params_raw) = query_string.split_at("get_last_record_rlsn ".len());
    1033              11 :             let params = params_raw.split_whitespace().collect::<Vec<_>>();
    1034              11 : 
    1035              11 :             if params.len() != 2 {
    1036 UBC           0 :                 return Err(QueryError::Other(anyhow::anyhow!(
    1037               0 :                     "invalid param number for get_last_record_rlsn command"
    1038               0 :                 )));
    1039 CBC          11 :             }
    1040                 : 
    1041              11 :             let tenant_id = TenantId::from_str(params[0])
    1042              11 :                 .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
    1043              11 :             let timeline_id = TimelineId::from_str(params[1])
    1044              11 :                 .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
    1045                 : 
    1046              11 :             tracing::Span::current()
    1047              11 :                 .record("tenant_id", field::display(tenant_id))
    1048              11 :                 .record("timeline_id", field::display(timeline_id));
    1049              11 : 
    1050              11 :             self.check_permission(Some(tenant_id))?;
    1051               9 :             let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?;
    1052                 : 
    1053               9 :             let end_of_timeline = timeline.get_last_record_rlsn();
    1054               9 : 
    1055               9 :             pgb.write_message_noflush(&BeMessage::RowDescription(&[
    1056               9 :                 RowDescriptor::text_col(b"prev_lsn"),
    1057               9 :                 RowDescriptor::text_col(b"last_lsn"),
    1058               9 :             ]))?
    1059               9 :             .write_message_noflush(&BeMessage::DataRow(&[
    1060               9 :                 Some(end_of_timeline.prev.to_string().as_bytes()),
    1061               9 :                 Some(end_of_timeline.last.to_string().as_bytes()),
    1062               9 :             ]))?
    1063               9 :             .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
    1064                 :         }
    1065                 :         // same as basebackup, but result includes relational data as well
    1066              41 :         else if query_string.starts_with("fullbackup ") {
    1067               9 :             let (_, params_raw) = query_string.split_at("fullbackup ".len());
    1068               9 :             let params = params_raw.split_whitespace().collect::<Vec<_>>();
    1069               9 : 
    1070               9 :             if params.len() < 2 {
    1071 UBC           0 :                 return Err(QueryError::Other(anyhow::anyhow!(
    1072               0 :                     "invalid param number for fullbackup command"
    1073               0 :                 )));
    1074 CBC           9 :             }
    1075                 : 
    1076               9 :             let tenant_id = TenantId::from_str(params[0])
    1077               9 :                 .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
    1078               9 :             let timeline_id = TimelineId::from_str(params[1])
    1079               9 :                 .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
    1080                 : 
    1081               9 :             tracing::Span::current()
    1082               9 :                 .record("tenant_id", field::display(tenant_id))
    1083               9 :                 .record("timeline_id", field::display(timeline_id));
    1084                 : 
    1085                 :             // The caller is responsible for providing correct lsn and prev_lsn.
    1086               9 :             let lsn = if params.len() > 2 {
    1087                 :                 Some(
    1088               9 :                     Lsn::from_str(params[2])
    1089               9 :                         .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?,
    1090                 :                 )
    1091                 :             } else {
    1092 UBC           0 :                 None
    1093                 :             };
    1094 CBC           9 :             let prev_lsn = if params.len() > 3 {
    1095                 :                 Some(
    1096               6 :                     Lsn::from_str(params[3])
    1097               6 :                         .with_context(|| format!("Failed to parse Lsn from {}", params[3]))?,
    1098                 :                 )
    1099                 :             } else {
    1100               3 :                 None
    1101                 :             };
    1102                 : 
    1103               9 :             self.check_permission(Some(tenant_id))?;
    1104                 : 
    1105                 :             // Check that the timeline exists
    1106               9 :             self.handle_basebackup_request(
    1107               9 :                 pgb,
    1108               9 :                 tenant_id,
    1109               9 :                 timeline_id,
    1110               9 :                 lsn,
    1111               9 :                 prev_lsn,
    1112               9 :                 true,
    1113               9 :                 false,
    1114               9 :                 ctx,
    1115               9 :             )
    1116            5788 :             .await?;
    1117               9 :             pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
    1118              32 :         } else if query_string.starts_with("import basebackup ") {
    1119                 :             // Import the `base` section (everything but the wal) of a basebackup.
    1120                 :             // Assumes the tenant already exists on this pageserver.
    1121                 :             //
    1122                 :             // Files are scheduled to be persisted to remote storage, and the
    1123                 :             // caller should poll the http api to check when that is done.
    1124                 :             //
    1125                 :             // Example import command:
    1126                 :             // 1. Get start/end LSN from backup_manifest file
    1127                 :             // 2. Run:
    1128                 :             // cat my_backup/base.tar | psql -h $PAGESERVER \
    1129                 :             //     -c "import basebackup $TENANT $TIMELINE $START_LSN $END_LSN $PG_VERSION"
    1130               5 :             let (_, params_raw) = query_string.split_at("import basebackup ".len());
    1131               5 :             let params = params_raw.split_whitespace().collect::<Vec<_>>();
    1132               5 :             if params.len() != 5 {
    1133 UBC           0 :                 return Err(QueryError::Other(anyhow::anyhow!(
    1134               0 :                     "invalid param number for import basebackup command"
    1135               0 :                 )));
    1136 CBC           5 :             }
    1137               5 :             let tenant_id = TenantId::from_str(params[0])
    1138               5 :                 .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
    1139               5 :             let timeline_id = TimelineId::from_str(params[1])
    1140               5 :                 .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
    1141               5 :             let base_lsn = Lsn::from_str(params[2])
    1142               5 :                 .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?;
    1143               5 :             let end_lsn = Lsn::from_str(params[3])
    1144               5 :                 .with_context(|| format!("Failed to parse Lsn from {}", params[3]))?;
    1145               5 :             let pg_version = u32::from_str(params[4])
    1146               5 :                 .with_context(|| format!("Failed to parse pg_version from {}", params[4]))?;
    1147                 : 
    1148               5 :             tracing::Span::current()
    1149               5 :                 .record("tenant_id", field::display(tenant_id))
    1150               5 :                 .record("timeline_id", field::display(timeline_id));
    1151               5 : 
    1152               5 :             self.check_permission(Some(tenant_id))?;
    1153                 : 
    1154               5 :             match self
    1155               5 :                 .handle_import_basebackup(
    1156               5 :                     pgb,
    1157               5 :                     tenant_id,
    1158               5 :                     timeline_id,
    1159               5 :                     base_lsn,
    1160               5 :                     end_lsn,
    1161               5 :                     pg_version,
    1162               5 :                     ctx,
    1163               5 :                 )
    1164             814 :                 .await
    1165                 :             {
    1166               3 :                 Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
    1167               2 :                 Err(e) => {
    1168               2 :                     error!("error importing base backup between {base_lsn} and {end_lsn}: {e:?}");
    1169               2 :                     pgb.write_message_noflush(&BeMessage::ErrorResponse(
    1170               2 :                         &e.to_string(),
    1171               2 :                         Some(e.pg_error_code()),
    1172               2 :                     ))?
    1173                 :                 }
    1174                 :             };
    1175              27 :         } else if query_string.starts_with("import wal ") {
    1176                 :             // Import the `pg_wal` section of a basebackup.
    1177                 :             //
    1178                 :             // Files are scheduled to be persisted to remote storage, and the
    1179                 :             // caller should poll the http api to check when that is done.
    1180               2 :             let (_, params_raw) = query_string.split_at("import wal ".len());
    1181               2 :             let params = params_raw.split_whitespace().collect::<Vec<_>>();
    1182               2 :             if params.len() != 4 {
    1183 UBC           0 :                 return Err(QueryError::Other(anyhow::anyhow!(
    1184               0 :                     "invalid param number for import wal command"
    1185               0 :                 )));
    1186 CBC           2 :             }
    1187               2 :             let tenant_id = TenantId::from_str(params[0])
    1188               2 :                 .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
    1189               2 :             let timeline_id = TimelineId::from_str(params[1])
    1190               2 :                 .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
    1191               2 :             let start_lsn = Lsn::from_str(params[2])
    1192               2 :                 .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?;
    1193               2 :             let end_lsn = Lsn::from_str(params[3])
    1194               2 :                 .with_context(|| format!("Failed to parse Lsn from {}", params[3]))?;
    1195                 : 
    1196               2 :             tracing::Span::current()
    1197               2 :                 .record("tenant_id", field::display(tenant_id))
    1198               2 :                 .record("timeline_id", field::display(timeline_id));
    1199               2 : 
    1200               2 :             self.check_permission(Some(tenant_id))?;
    1201                 : 
    1202               2 :             match self
    1203               2 :                 .handle_import_wal(pgb, tenant_id, timeline_id, start_lsn, end_lsn, ctx)
    1204             246 :                 .await
    1205                 :             {
    1206               2 :                 Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
    1207 UBC           0 :                 Err(e) => {
    1208               0 :                     error!("error importing WAL between {start_lsn} and {end_lsn}: {e:?}");
    1209               0 :                     pgb.write_message_noflush(&BeMessage::ErrorResponse(
    1210               0 :                         &e.to_string(),
    1211               0 :                         Some(e.pg_error_code()),
    1212               0 :                     ))?
    1213                 :                 }
    1214                 :             };
    1215 CBC          25 :         } else if query_string.to_ascii_lowercase().starts_with("set ") {
    1216                 :             // important because psycopg2 executes "SET datestyle TO 'ISO'"
    1217                 :             // on connect
    1218              20 :             pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
    1219               5 :         } else if query_string.starts_with("show ") {
    1220                 :             // show <tenant_id>
    1221               5 :             let (_, params_raw) = query_string.split_at("show ".len());
    1222               5 :             let params = params_raw.split(' ').collect::<Vec<_>>();
    1223               5 :             if params.len() != 1 {
    1224 UBC           0 :                 return Err(QueryError::Other(anyhow::anyhow!(
    1225               0 :                     "invalid param number for config command"
    1226               0 :                 )));
    1227 CBC           5 :             }
    1228               5 :             let tenant_id = TenantId::from_str(params[0])
    1229               5 :                 .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
    1230                 : 
    1231               5 :             tracing::Span::current().record("tenant_id", field::display(tenant_id));
    1232               5 : 
    1233               5 :             self.check_permission(Some(tenant_id))?;
    1234                 : 
    1235               5 :             let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
    1236               5 :             pgb.write_message_noflush(&BeMessage::RowDescription(&[
    1237               5 :                 RowDescriptor::int8_col(b"checkpoint_distance"),
    1238               5 :                 RowDescriptor::int8_col(b"checkpoint_timeout"),
    1239               5 :                 RowDescriptor::int8_col(b"compaction_target_size"),
    1240               5 :                 RowDescriptor::int8_col(b"compaction_period"),
    1241               5 :                 RowDescriptor::int8_col(b"compaction_threshold"),
    1242               5 :                 RowDescriptor::int8_col(b"gc_horizon"),
    1243               5 :                 RowDescriptor::int8_col(b"gc_period"),
    1244               5 :                 RowDescriptor::int8_col(b"image_creation_threshold"),
    1245               5 :                 RowDescriptor::int8_col(b"pitr_interval"),
    1246               5 :             ]))?
    1247               5 :             .write_message_noflush(&BeMessage::DataRow(&[
    1248               5 :                 Some(tenant.get_checkpoint_distance().to_string().as_bytes()),
    1249               5 :                 Some(
    1250               5 :                     tenant
    1251               5 :                         .get_checkpoint_timeout()
    1252               5 :                         .as_secs()
    1253               5 :                         .to_string()
    1254               5 :                         .as_bytes(),
    1255               5 :                 ),
    1256               5 :                 Some(tenant.get_compaction_target_size().to_string().as_bytes()),
    1257               5 :                 Some(
    1258               5 :                     tenant
    1259               5 :                         .get_compaction_period()
    1260               5 :                         .as_secs()
    1261               5 :                         .to_string()
    1262               5 :                         .as_bytes(),
    1263               5 :                 ),
    1264               5 :                 Some(tenant.get_compaction_threshold().to_string().as_bytes()),
    1265               5 :                 Some(tenant.get_gc_horizon().to_string().as_bytes()),
    1266               5 :                 Some(tenant.get_gc_period().as_secs().to_string().as_bytes()),
    1267               5 :                 Some(tenant.get_image_creation_threshold().to_string().as_bytes()),
    1268               5 :                 Some(tenant.get_pitr_interval().as_secs().to_string().as_bytes()),
    1269               5 :             ]))?
    1270               5 :             .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
    1271                 :         } else {
    1272 UBC           0 :             return Err(QueryError::Other(anyhow::anyhow!(
    1273               0 :                 "unknown command {query_string}"
    1274               0 :             )));
    1275                 :         }
    1276                 : 
    1277 CBC        4888 :         Ok(())
    1278           10176 :     }
    1279                 : }
    1280                 : 
    1281               3 : #[derive(thiserror::Error, Debug)]
    1282                 : enum GetActiveTenantError {
    1283                 :     #[error(
    1284                 :         "Timed out waiting {wait_time:?} for tenant active state. Latest state: {latest_state:?}"
    1285                 :     )]
    1286                 :     WaitForActiveTimeout {
    1287                 :         latest_state: TenantState,
    1288                 :         wait_time: Duration,
    1289                 :     },
    1290                 :     #[error(transparent)]
    1291                 :     NotFound(GetTenantError),
    1292                 :     #[error(transparent)]
    1293                 :     WaitTenantActive(tenant::WaitToBecomeActiveError),
    1294                 : }
    1295                 : 
    1296                 : impl From<GetActiveTenantError> for QueryError {
    1297              35 :     fn from(e: GetActiveTenantError) -> Self {
    1298              35 :         match e {
    1299 UBC           0 :             GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected(
    1300               0 :                 ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
    1301               0 :             ),
    1302 CBC          30 :             GetActiveTenantError::WaitTenantActive(e) => QueryError::Other(anyhow::Error::new(e)),
    1303               5 :             GetActiveTenantError::NotFound(e) => QueryError::Other(anyhow::Error::new(e)),
    1304                 :         }
    1305              35 :     }
    1306                 : }
    1307                 : 
    1308                 : /// Get active tenant.
    1309                 : ///
    1310                 : /// If the tenant is Loading, waits for it to become Active, for up to 30 s. That
    1311                 : /// ensures that queries don't fail immediately after pageserver startup, because
    1312                 : /// all tenants are still loading.
    1313            5104 : async fn get_active_tenant_with_timeout(
    1314            5104 :     tenant_id: TenantId,
    1315            5104 :     _ctx: &RequestContext, /* require get a context to support cancellation in the future */
    1316            5104 : ) -> Result<Arc<Tenant>, GetActiveTenantError> {
    1317            5104 :     let tenant = match mgr::get_tenant(tenant_id, false).await {
    1318            5099 :         Ok(tenant) => tenant,
    1319               5 :         Err(e @ GetTenantError::NotFound(_)) => return Err(GetActiveTenantError::NotFound(e)),
    1320                 :         Err(GetTenantError::NotActive(_)) => {
    1321 UBC           0 :             unreachable!("we're calling get_tenant with active_only=false")
    1322                 :         }
    1323                 :         Err(GetTenantError::Broken(_)) => {
    1324               0 :             unreachable!("we're calling get_tenant with active_only=false")
    1325                 :         }
    1326                 :     };
    1327 CBC        5099 :     let wait_time = Duration::from_secs(30);
    1328            5099 :     match tokio::time::timeout(wait_time, tenant.wait_to_become_active()).await {
    1329            5068 :         Ok(Ok(())) => Ok(tenant),
    1330                 :         // no .context(), the error message is good enough and some tests depend on it
    1331              31 :         Ok(Err(e)) => Err(GetActiveTenantError::WaitTenantActive(e)),
    1332                 :         Err(_) => {
    1333 UBC           0 :             let latest_state = tenant.current_state();
    1334               0 :             if latest_state == TenantState::Active {
    1335               0 :                 Ok(tenant)
    1336                 :             } else {
    1337               0 :                 Err(GetActiveTenantError::WaitForActiveTimeout {
    1338               0 :                     latest_state,
    1339               0 :                     wait_time,
    1340               0 :                 })
    1341                 :             }
    1342                 :         }
    1343                 :     }
    1344 CBC        5104 : }
    1345                 : 
    1346               6 : #[derive(Debug, thiserror::Error)]
    1347                 : enum GetActiveTimelineError {
    1348                 :     #[error(transparent)]
    1349                 :     Tenant(GetActiveTenantError),
    1350                 :     #[error(transparent)]
    1351                 :     Timeline(anyhow::Error),
    1352                 : }
    1353                 : 
    1354                 : impl From<GetActiveTimelineError> for QueryError {
    1355 UBC           0 :     fn from(e: GetActiveTimelineError) -> Self {
    1356               0 :         match e {
    1357               0 :             GetActiveTimelineError::Tenant(e) => e.into(),
    1358               0 :             GetActiveTimelineError::Timeline(e) => QueryError::Other(e),
    1359                 :         }
    1360               0 :     }
    1361                 : }
    1362                 : 
    1363                 : /// Shorthand for getting a reference to a Timeline of an Active tenant.
    1364 CBC         658 : async fn get_active_tenant_timeline(
    1365             658 :     tenant_id: TenantId,
    1366             658 :     timeline_id: TimelineId,
    1367             658 :     ctx: &RequestContext,
    1368             658 : ) -> Result<Arc<Timeline>, GetActiveTimelineError> {
    1369             658 :     let tenant = get_active_tenant_with_timeout(tenant_id, ctx)
    1370               7 :         .await
    1371             658 :         .map_err(GetActiveTimelineError::Tenant)?;
    1372             657 :     let timeline = tenant
    1373             657 :         .get_timeline(timeline_id, true)
    1374             657 :         .map_err(|e| GetActiveTimelineError::Timeline(anyhow::anyhow!(e)))?;
    1375             656 :     Ok(timeline)
    1376             658 : }
        

Generated by: LCOV version 2.1-beta