//
//! The Page Service listens for client connections and serves their GetPage@LSN
//! requests.
//
// It is possible to connect here using the usual psql/pgbench/libpq. The
// following commands are currently supported:
// *status* -- show current info about this pageserver,
// *pagestream* -- enter a mode where smgr and pageserver talk with their
//                 custom protocol.
//

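// For example, assuming a pageserver listening on a hypothetical 127.0.0.1:64000
// with auth disabled, one could run:
//
//   psql -h 127.0.0.1 -p 64000 -c "show <tenant_id>"
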
use anyhow::Context;
use async_compression::tokio::write::GzipEncoder;
use bytes::Buf;
use bytes::Bytes;
use futures::Stream;
use pageserver_api::models::TenantState;
use pageserver_api::models::{
    PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
    PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
    PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
    PagestreamNblocksRequest, PagestreamNblocksResponse,
};
use postgres_backend::{self, is_expected_io_error, AuthType, PostgresBackend, QueryError};
use pq_proto::framed::ConnectionError;
use pq_proto::FeStartupPacket;
use pq_proto::{BeMessage, FeMessage, RowDescriptor};
use std::borrow::Cow;
use std::io;
use std::net::TcpListener;
use std::pin::pin;
use std::str;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::AsyncWriteExt;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::io::StreamReader;
use tokio_util::sync::CancellationToken;
use tracing::field;
use tracing::*;
use utils::id::ConnectionId;
use utils::{
    auth::{Claims, Scope, SwappableJwtAuth},
    id::{TenantId, TimelineId},
    lsn::Lsn,
    simple_rcu::RcuReadGuard,
};

use crate::auth::check_permission;
use crate::basebackup;
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::import_datadir::import_wal_from_tar;
use crate::metrics;
use crate::metrics::LIVE_CONNECTIONS_COUNT;
use crate::pgdatadir_mapping::{rel_block_to_key, Version};
use crate::task_mgr;
use crate::task_mgr::TaskKind;
use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::mgr;
use crate::tenant::mgr::get_active_tenant_with_timeout;
use crate::tenant::mgr::GetActiveTenantError;
use crate::tenant::mgr::ShardSelector;
use crate::tenant::timeline::WaitLsnError;
use crate::tenant::GetTimelineError;
use crate::tenant::PageReconstructError;
use crate::tenant::Timeline;
use crate::trace::Tracer;

use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
use postgres_ffi::BLCKSZ;

// How long we may wait for a [`TenantSlot::InProgress`] and/or a [`Tenant`] which
// is not yet in state [`TenantState::Active`].
const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);

/// Read the end of a tar archive.
///
/// A tar archive normally ends with two consecutive blocks of zeros, 512 bytes each.
/// `tokio_tar` already read the first such block. Read the second all-zeros block,
/// and check that there is no more data after the EOF marker.
///
/// XXX: Currently, any trailing data after the EOF marker prints a warning.
/// Perhaps it should be a hard error?
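/// (Equivalently: a well-formed archive ends with 1024 zero bytes in total,
/// and this function consumes the second 512 of them.)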
async fn read_tar_eof(mut reader: (impl AsyncRead + Unpin)) -> anyhow::Result<()> {
    use tokio::io::AsyncReadExt;
    let mut buf = [0u8; 512];

    // Read the all-zeros block, and verify it
    let mut total_bytes = 0;
    while total_bytes < 512 {
        let nbytes = reader.read(&mut buf[total_bytes..]).await?;
        total_bytes += nbytes;
        if nbytes == 0 {
            break;
        }
    }
    if total_bytes < 512 {
        anyhow::bail!("incomplete or invalid tar EOF marker");
    }
    if !buf.iter().all(|&x| x == 0) {
        anyhow::bail!("invalid tar EOF marker");
    }

    // Drain any data after the EOF marker
    let mut trailing_bytes = 0;
    loop {
        let nbytes = reader.read(&mut buf).await?;
        trailing_bytes += nbytes;
        if nbytes == 0 {
            break;
        }
    }
    if trailing_bytes > 0 {
        warn!("ignored {trailing_bytes} unexpected bytes after the tar archive");
    }
    Ok(())
}

///////////////////////////////////////////////////////////////////////////////

///
/// Main loop of the page service.
///
/// Listens for connections, and launches a new handler task for each.
///
pub async fn libpq_listener_main(
    conf: &'static PageServerConf,
    broker_client: storage_broker::BrokerClientChannel,
    auth: Option<Arc<SwappableJwtAuth>>,
    listener: TcpListener,
    auth_type: AuthType,
    listener_ctx: RequestContext,
    cancel: CancellationToken,
) -> anyhow::Result<()> {
    listener.set_nonblocking(true)?;
    let tokio_listener = tokio::net::TcpListener::from_std(listener)?;

    // Wait for a new connection to arrive, or for server shutdown.
    while let Some(res) = tokio::select! {
        biased;

        _ = cancel.cancelled() => {
            // We were requested to shut down.
            None
        }

        res = tokio_listener.accept() => {
            Some(res)
        }
    } {
        match res {
            Ok((socket, peer_addr)) => {
                // Connection established. Spawn a new task to handle it.
                debug!("accepted connection from {}", peer_addr);
                let local_auth = auth.clone();

                let connection_ctx = listener_ctx
                    .detached_child(TaskKind::PageRequestHandler, DownloadBehavior::Download);

                // PageRequestHandler tasks are not associated with any particular
                // timeline in the task manager. In practice most connections will
                // only deal with a particular timeline, but we don't know which one
                // yet.
                task_mgr::spawn(
                    &tokio::runtime::Handle::current(),
                    TaskKind::PageRequestHandler,
                    None,
                    None,
                    "serving compute connection task",
                    false,
                    page_service_conn_main(
                        conf,
                        broker_client.clone(),
                        local_auth,
                        socket,
                        auth_type,
                        connection_ctx,
                    ),
                );
            }
            Err(err) => {
                // accept() failed. Log the error, and loop back to retry on next connection.
                error!("accept() failed: {:?}", err);
            }
        }
    }

    debug!("page_service loop terminated");

    Ok(())
}

#[instrument(skip_all, fields(peer_addr))]
async fn page_service_conn_main(
    conf: &'static PageServerConf,
    broker_client: storage_broker::BrokerClientChannel,
    auth: Option<Arc<SwappableJwtAuth>>,
    socket: tokio::net::TcpStream,
    auth_type: AuthType,
    connection_ctx: RequestContext,
) -> anyhow::Result<()> {
    // Immediately increment the gauge, then create a scope guard to decrement it on task exit.
    // One of the pros of `defer!` is that this will *most probably*
    // get called, even in the presence of panics.
    let gauge = LIVE_CONNECTIONS_COUNT.with_label_values(&["page_service"]);
    gauge.inc();
    scopeguard::defer! {
        gauge.dec();
    }

    socket
        .set_nodelay(true)
        .context("could not set TCP_NODELAY")?;

    let peer_addr = socket.peer_addr().context("get peer address")?;
    tracing::Span::current().record("peer_addr", field::display(peer_addr));

    // Set up a read timeout of 10 minutes. The exact value is rather arbitrary; it should be:
    // - long enough for most valid compute connections
    // - less than infinite, to stop us from "leaking" connections to long-gone computes
    //
    // No write timeout is used, because the kernel is assumed to error out writes after some time.
    let mut socket = tokio_io_timeout::TimeoutReader::new(socket);

    let default_timeout_ms = 10 * 60 * 1000; // 10 minutes by default
    let socket_timeout_ms = (|| {
        fail::fail_point!("simulated-bad-compute-connection", |avg_timeout_ms| {
            // Sample an exponential distribution to simulate poor network conditions;
            // avg_timeout_ms is expected to be around 15 in tests.
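            // (This is inverse-CDF sampling: for u ~ Uniform(0, 1),
            // -ln(1 - u) / c is exponentially distributed for any constant c > 0.)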
            if let Some(avg_timeout_ms) = avg_timeout_ms {
                let avg = avg_timeout_ms.parse::<i64>().unwrap() as f32;
                let u = rand::random::<f32>();
                ((1.0 - u).ln() / (-avg)) as u64
            } else {
                default_timeout_ms
            }
        });
        default_timeout_ms
    })();

    // A timeout here does not mean the client died; it can happen if it's just idle for
    // a while: we will tear down this PageServerHandler and instantiate a new one if/when
    // they reconnect.
    socket.set_timeout(Some(std::time::Duration::from_millis(socket_timeout_ms)));
    let socket = std::pin::pin!(socket);

    // XXX: pgbackend.run() should take the connection_ctx,
    // and create a child per-query context when it invokes process_query.
    // But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
    // and create the per-query context in process_query ourselves.
    let mut conn_handler = PageServerHandler::new(conf, broker_client, auth, connection_ctx);
    let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;

    match pgbackend
        .run(&mut conn_handler, task_mgr::shutdown_watcher)
        .await
    {
        Ok(()) => {
            // we've been requested to shut down
            Ok(())
        }
        Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
            if is_expected_io_error(&io_error) {
                info!("Postgres client disconnected ({io_error})");
                Ok(())
            } else {
                Err(io_error).context("Postgres connection error")
            }
        }
        other => other.context("Postgres query error"),
    }
}

struct PageServerHandler {
    _conf: &'static PageServerConf,
    broker_client: storage_broker::BrokerClientChannel,
    auth: Option<Arc<SwappableJwtAuth>>,
    claims: Option<Claims>,

    /// The context created for the lifetime of the connection
    /// serviced by this PageServerHandler.
    /// For each query received over the connection,
    /// `process_query` creates a child context from this one.
    connection_ctx: RequestContext,
}

#[derive(thiserror::Error, Debug)]
enum PageStreamError {
    /// We encountered an error that should prompt the client to reconnect:
    /// in practice this means we drop the connection without sending a response.
    #[error("Reconnect required: {0}")]
    Reconnect(Cow<'static, str>),

    /// We were instructed to shutdown while processing the query
    #[error("Shutting down")]
    Shutdown,

    /// Something went wrong reading a page: this likely indicates a pageserver bug
    #[error("Read error: {0}")]
    Read(PageReconstructError),

    /// Ran out of time waiting for an LSN
    #[error("LSN timeout: {0}")]
    LsnTimeout(WaitLsnError),

    /// The entity required to serve the request (tenant or timeline) is not found,
    /// or is not found in a suitable state to serve a request.
    #[error("Not found: {0}")]
    NotFound(std::borrow::Cow<'static, str>),

    /// Request asked for something that doesn't make sense, like an invalid LSN
    #[error("Bad request: {0}")]
    BadRequest(std::borrow::Cow<'static, str>),
}

impl From<PageReconstructError> for PageStreamError {
    fn from(value: PageReconstructError) -> Self {
        match value {
            PageReconstructError::Cancelled => Self::Shutdown,
            e => Self::Read(e),
        }
    }
}

impl From<GetActiveTimelineError> for PageStreamError {
    fn from(value: GetActiveTimelineError) -> Self {
        match value {
            GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => Self::Shutdown,
            GetActiveTimelineError::Tenant(e) => Self::NotFound(format!("{e}").into()),
            GetActiveTimelineError::Timeline(e) => Self::NotFound(format!("{e}").into()),
        }
    }
}

impl From<WaitLsnError> for PageStreamError {
    fn from(value: WaitLsnError) -> Self {
        match value {
            e @ WaitLsnError::Timeout(_) => Self::LsnTimeout(e),
            WaitLsnError::Shutdown => Self::Shutdown,
            WaitLsnError::BadState => Self::Reconnect("Timeline is not active".into()),
        }
    }
}

impl PageServerHandler {
    pub fn new(
        conf: &'static PageServerConf,
        broker_client: storage_broker::BrokerClientChannel,
        auth: Option<Arc<SwappableJwtAuth>>,
        connection_ctx: RequestContext,
    ) -> Self {
        PageServerHandler {
            _conf: conf,
            broker_client,
            auth,
            claims: None,
            connection_ctx,
        }
    }

    /// Wrap PostgresBackend::flush to respect our CancellationToken: it is important to use
    /// this rather than a naked flush() in order to shut down promptly. Without this, we would
    /// block shutdown of a tenant if a postgres client was failing to consume bytes we send
    /// in the flush.
    async fn flush_cancellable<IO>(
        &self,
        pgb: &mut PostgresBackend<IO>,
        cancel: &CancellationToken,
    ) -> Result<(), QueryError>
    where
        IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
    {
        tokio::select!(
            flush_r = pgb.flush() => {
                Ok(flush_r?)
            },
            _ = cancel.cancelled() => {
                Err(QueryError::Shutdown)
            }
        )
    }

    fn copyin_stream<'a, IO>(
        &'a self,
        pgb: &'a mut PostgresBackend<IO>,
        cancel: &'a CancellationToken,
    ) -> impl Stream<Item = io::Result<Bytes>> + 'a
    where
        IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
    {
        async_stream::try_stream! {
            loop {
                let msg = tokio::select! {
                    biased;

                    _ = cancel.cancelled() => {
                        // We were requested to shut down.
                        let msg = "pageserver is shutting down";
                        let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, None));
                        Err(QueryError::Shutdown)
                    }

                    msg = pgb.read_message() => { msg.map_err(QueryError::from) }
                };

                match msg {
                    Ok(Some(message)) => {
                        let copy_data_bytes = match message {
                            FeMessage::CopyData(bytes) => bytes,
                            FeMessage::CopyDone => { break },
                            FeMessage::Sync => continue,
                            FeMessage::Terminate => {
                                let msg = "client terminated connection with Terminate message during COPY";
                                let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
                                // an error can't happen here, ErrorResponse serialization should always be ok
                                pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
                                Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
                                break;
                            }
                            m => {
                                let msg = format!("unexpected message {m:?}");
                                // an error can't happen here, ErrorResponse serialization should always be ok
                                pgb.write_message_noflush(&BeMessage::ErrorResponse(&msg, None)).map_err(|e| e.into_io_error())?;
                                Err(io::Error::new(io::ErrorKind::Other, msg))?;
                                break;
                            }
                        };

                        yield copy_data_bytes;
                    }
                    Ok(None) => {
                        let msg = "client closed connection during COPY";
                        let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
                        // an error can't happen here, ErrorResponse serialization should always be ok
                        pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
                        self.flush_cancellable(pgb, cancel).await.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
                        Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
                    }
                    Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
                        Err(io_error)?;
                    }
                    Err(other) => {
                        Err(io::Error::new(io::ErrorKind::Other, other.to_string()))?;
                    }
                };
            }
        }
    }

    #[instrument(skip_all)]
    async fn handle_pagerequests<IO>(
        &self,
        pgb: &mut PostgresBackend<IO>,
        tenant_id: TenantId,
        timeline_id: TimelineId,
        ctx: RequestContext,
    ) -> Result<(), QueryError>
    where
        IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
    {
        debug_assert_current_span_has_tenant_and_timeline_id();

        // Note that since one connection may contain getpage requests that target different
        // shards (e.g. during splitting when the compute is not yet aware of the split), the tenant
        // that we look up here may not be the one that serves all the actual requests: we will double
        // check the mapping of key->shard later before calling into Timeline for getpage requests.
        let tenant = mgr::get_active_tenant_with_timeout(
            tenant_id,
            ShardSelector::First,
            ACTIVE_TENANT_TIMEOUT,
            &task_mgr::shutdown_token(),
        )
        .await?;

        // Make a request tracer if needed
        let mut tracer = if tenant.get_trace_read_requests() {
            let connection_id = ConnectionId::generate();
            let path =
                tenant
                    .conf
                    .trace_path(&tenant.tenant_shard_id(), &timeline_id, &connection_id);
            Some(Tracer::new(path))
        } else {
            None
        };

        // Check that the timeline exists
        let timeline = tenant
            .get_timeline(timeline_id, true)
            .map_err(|e| QueryError::NotFound(format!("{e}").into()))?;

        // Avoid starting new requests if the timeline has already started shutting down,
        // and block timeline shutdown until this request is complete, or drops out due
        // to cancellation.
        let _timeline_guard = timeline.gate.enter().map_err(|_| QueryError::Shutdown)?;

        // switch client to COPYBOTH
        pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
        self.flush_cancellable(pgb, &timeline.cancel).await?;

        let metrics = metrics::SmgrQueryTimePerTimeline::new(&tenant_id, &timeline_id);

        loop {
            let msg = tokio::select! {
                biased;

                _ = timeline.cancel.cancelled() => {
                    // We were requested to shut down.
                    info!("shutdown request received in page handler");
                    return Err(QueryError::Shutdown)
                }

                msg = pgb.read_message() => { msg }
            };

            let copy_data_bytes = match msg? {
                Some(FeMessage::CopyData(bytes)) => bytes,
                Some(FeMessage::Terminate) => break,
                Some(m) => {
                    return Err(QueryError::Other(anyhow::anyhow!(
                        "unexpected message: {m:?} during COPY"
                    )));
                }
                None => break, // client disconnected
            };

            trace!("query: {copy_data_bytes:?}");

            // Trace the request if needed
            if let Some(t) = tracer.as_mut() {
                t.trace(&copy_data_bytes)
            }

            let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;

            // TODO: We could create a new per-request context here, with unique ID.
            // Currently we use the same per-timeline context for all requests

            let (response, span) = match neon_fe_msg {
                PagestreamFeMessage::Exists(req) => {
                    let _timer = metrics.start_timer(metrics::SmgrQueryType::GetRelExists);
                    let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.lsn);
                    (
                        self.handle_get_rel_exists_request(&timeline, &req, &ctx)
                            .instrument(span.clone())
                            .await,
                        span,
                    )
                }
                PagestreamFeMessage::Nblocks(req) => {
                    let _timer = metrics.start_timer(metrics::SmgrQueryType::GetRelSize);
                    let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.lsn);
                    (
                        self.handle_get_nblocks_request(&timeline, &req, &ctx)
                            .instrument(span.clone())
                            .await,
                        span,
                    )
                }
                PagestreamFeMessage::GetPage(req) => {
                    let _timer = metrics.start_timer(metrics::SmgrQueryType::GetPageAtLsn);
                    let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn);
                    (
                        self.handle_get_page_at_lsn_request(&timeline, &req, &ctx)
                            .instrument(span.clone())
                            .await,
                        span,
                    )
                }
                PagestreamFeMessage::DbSize(req) => {
                    let _timer = metrics.start_timer(metrics::SmgrQueryType::GetDbSize);
                    let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.lsn);
                    (
                        self.handle_db_size_request(&timeline, &req, &ctx)
                            .instrument(span.clone())
                            .await,
                        span,
                    )
                }
            };

            match response {
                Err(PageStreamError::Shutdown) => {
                    // If we fail to fulfil a request during shutdown, which may be _because_ of
                    // shutdown, then do not send the error to the client. Instead just drop the
                    // connection.
                    span.in_scope(|| info!("dropping connection due to shutdown"));
                    return Err(QueryError::Shutdown);
                }
                Err(PageStreamError::Reconnect(reason)) => {
                    span.in_scope(|| info!("handler requested reconnect: {reason}"));
                    return Err(QueryError::Reconnect);
                }
                Err(e) if timeline.cancel.is_cancelled() || timeline.is_stopping() => {
                    // This branch accommodates code within request handlers that returns an anyhow::Error
                    // instead of a clean shutdown error; this may be buried inside a
                    // PageReconstructError::Other, for example.
                    //
                    // Requests may fail as soon as we are Stopping, even if the Timeline's cancellation token wasn't fired yet,
                    // because wait_lsn etc will drop out
                    // is_stopping(): [`Timeline::flush_and_shutdown`] has entered
                    // is_canceled(): [`Timeline::shutdown`] has entered
                    span.in_scope(|| info!("dropped error response during shutdown: {e:#}"));
                    return Err(QueryError::Shutdown);
                }
                r => {
                    let response_msg = r.unwrap_or_else(|e| {
                        // print all the details to the log with {:#}, but for the client the
                        // error message is enough. Do not log if shutting down, as the anyhow::Error
                        // here includes cancellation which is not an error.
                        span.in_scope(|| error!("error reading relation or page version: {:#}", e));
                        PagestreamBeMessage::Error(PagestreamErrorResponse {
                            message: e.to_string(),
                        })
                    });

                    pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
                    self.flush_cancellable(pgb, &timeline.cancel).await?;
                }
            }
        }
        Ok(())
    }

    #[allow(clippy::too_many_arguments)]
    #[instrument(skip_all, fields(%base_lsn, end_lsn=%_end_lsn, %pg_version))]
    async fn handle_import_basebackup<IO>(
        &self,
        pgb: &mut PostgresBackend<IO>,
        tenant_id: TenantId,
        timeline_id: TimelineId,
        base_lsn: Lsn,
        _end_lsn: Lsn,
        pg_version: u32,
        ctx: RequestContext,
    ) -> Result<(), QueryError>
    where
        IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
    {
        debug_assert_current_span_has_tenant_and_timeline_id();

        // Create empty timeline
        info!("creating new timeline");
        let tenant = get_active_tenant_with_timeout(
            tenant_id,
            ShardSelector::Zero,
            ACTIVE_TENANT_TIMEOUT,
            &task_mgr::shutdown_token(),
        )
        .await?;
        let timeline = tenant
            .create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
            .await?;

        // TODO mark timeline as not ready until it reaches end_lsn.
        // We might have some wal to import as well, and we should prevent compute
        // from connecting before that and writing conflicting wal.
        //
        // This is not relevant for pageserver->pageserver migrations, since there's
        // no wal to import. But should be fixed if we want to import from postgres.

        // TODO leave clean state on error. For now you can use detach to clean
        // up broken state from a failed import.

        // Import basebackup provided via CopyData
        info!("importing basebackup");
        pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
        self.flush_cancellable(pgb, &tenant.cancel).await?;

        let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb, &tenant.cancel)));
        timeline
            .import_basebackup_from_tar(
                &mut copyin_reader,
                base_lsn,
                self.broker_client.clone(),
                &ctx,
            )
            .await?;

        // Read the end of the tar archive.
        read_tar_eof(copyin_reader).await?;

        // TODO check checksum
        // Meanwhile you can verify client-side by taking fullbackup
        // and checking that it matches in size with what was imported.
        // It wouldn't work if base came from vanilla postgres though,
        // since we discard some log files.

        info!("done");
        Ok(())
    }

    #[instrument(skip_all, fields(%start_lsn, %end_lsn))]
    async fn handle_import_wal<IO>(
        &self,
        pgb: &mut PostgresBackend<IO>,
        tenant_id: TenantId,
        timeline_id: TimelineId,
        start_lsn: Lsn,
        end_lsn: Lsn,
        ctx: RequestContext,
    ) -> Result<(), QueryError>
    where
        IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
    {
        debug_assert_current_span_has_tenant_and_timeline_id();

        let timeline = self
            .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero)
            .await?;
        let last_record_lsn = timeline.get_last_record_lsn();
        if last_record_lsn != start_lsn {
            return Err(QueryError::Other(
                anyhow::anyhow!("Cannot import WAL from Lsn {start_lsn} because timeline does not start from the same lsn: {last_record_lsn}"))
            );
        }

        // TODO leave clean state on error. For now you can use detach to clean
        // up broken state from a failed import.

        // Import wal provided via CopyData
        info!("importing wal");
        pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
        self.flush_cancellable(pgb, &timeline.cancel).await?;
        let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb, &timeline.cancel)));
        import_wal_from_tar(&timeline, &mut copyin_reader, start_lsn, end_lsn, &ctx).await?;
        info!("wal import complete");

        // Read the end of the tar archive.
        read_tar_eof(copyin_reader).await?;

        // TODO Does it make sense to overshoot?
        if timeline.get_last_record_lsn() < end_lsn {
            return Err(QueryError::Other(
                anyhow::anyhow!("WAL import did not reach the requested end LSN {end_lsn}: timeline ends at {}", timeline.get_last_record_lsn()))
            );
        }

        // Flush data to disk, then upload to s3. No need for a forced checkpoint.
        // We only want to persist the data, and it doesn't matter if it's in the
        // shape of deltas or images.
        info!("flushing layers");
        timeline.freeze_and_flush().await?;

        info!("done");
        Ok(())
    }

    /// Helper function to handle the LSN from a client request.
    ///
    /// Each GetPage (and Exists and Nblocks) request includes information about
    /// which version of the page is being requested. The client can request the
    /// latest version of the page, or the version that's valid at a particular
    /// LSN. The primary compute node will always request the latest page
    /// version, while a standby will request a version at the LSN that it's
    /// currently caught up to.
    ///
    /// In either case, if the page server hasn't received the WAL up to the
    /// requested LSN yet, we will wait for it to arrive. The return value is
    /// the LSN that should be used to look up the page versions.
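    ///
    /// For example (with hypothetical LSNs): if the client requests the latest
    /// version with a hint of 0/169C3D8 and our last-record LSN is already
    /// 0/16A0000, we serve the page as of 0/16A0000; if our last-record LSN is
    /// only 0/1690000, we first wait until 0/169C3D8 has arrived.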
    async fn wait_or_get_last_lsn(
        timeline: &Timeline,
        mut lsn: Lsn,
        latest: bool,
        latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
        ctx: &RequestContext,
    ) -> Result<Lsn, PageStreamError> {
        if latest {
            // Latest page version was requested. If LSN is given, it is a hint
            // to the page server that there have been no modifications to the
            // page after that LSN. If we haven't received WAL up to that point,
            // wait until it arrives.
            let last_record_lsn = timeline.get_last_record_lsn();

            // Note: this covers the special case that lsn == Lsn(0). That
            // special case means "return the latest version whatever it is",
            // and it's used for bootstrapping purposes, when the page server is
            // connected directly to the compute node. That is needed because
            // when you connect to the compute node, to receive the WAL, the
            // walsender process will do a look up in the pg_authid catalog
            // table for authentication. That poses a deadlock problem: the
            // catalog table lookup will send a GetPage request, but the GetPage
            // request will block in the page server because the recent WAL
            // hasn't been received yet, and it cannot be received until the
            // walsender completes the authentication and starts streaming the
            // WAL.
            if lsn <= last_record_lsn {
                lsn = last_record_lsn;
            } else {
                timeline.wait_lsn(lsn, ctx).await?;
                // Since we waited for 'lsn' to arrive, that is now the last
                // record LSN. (Or close enough for our purposes; the
                // last-record LSN can advance immediately after we return
                // anyway)
            }
        } else {
            if lsn == Lsn(0) {
                return Err(PageStreamError::BadRequest(
                    "invalid LSN(0) in request".into(),
                ));
            }
            timeline.wait_lsn(lsn, ctx).await?;
        }

        if lsn < **latest_gc_cutoff_lsn {
            return Err(PageStreamError::BadRequest(format!(
                "tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
                lsn, **latest_gc_cutoff_lsn
            ).into()));
        }
        Ok(lsn)
    }

    async fn handle_get_rel_exists_request(
        &self,
        timeline: &Timeline,
        req: &PagestreamExistsRequest,
        ctx: &RequestContext,
    ) -> Result<PagestreamBeMessage, PageStreamError> {
        let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
        let lsn =
            Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
                .await?;

        let exists = timeline
            .get_rel_exists(req.rel, Version::Lsn(lsn), req.latest, ctx)
            .await?;

        Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
            exists,
        }))
    }

    async fn handle_get_nblocks_request(
        &self,
        timeline: &Timeline,
        req: &PagestreamNblocksRequest,
        ctx: &RequestContext,
    ) -> Result<PagestreamBeMessage, PageStreamError> {
        let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
        let lsn =
            Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
                .await?;

        let n_blocks = timeline
            .get_rel_size(req.rel, Version::Lsn(lsn), req.latest, ctx)
            .await?;

        Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
            n_blocks,
        }))
    }

    async fn handle_db_size_request(
        &self,
        timeline: &Timeline,
        req: &PagestreamDbSizeRequest,
        ctx: &RequestContext,
    ) -> Result<PagestreamBeMessage, PageStreamError> {
        let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
        let lsn =
            Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
                .await?;

        let total_blocks = timeline
            .get_db_size(
                DEFAULTTABLESPACE_OID,
                req.dbnode,
                Version::Lsn(lsn),
                req.latest,
                ctx,
            )
            .await?;
        let db_size = total_blocks as i64 * BLCKSZ as i64;

        Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse {
            db_size,
        }))
    }

    async fn do_handle_get_page_at_lsn_request(
        &self,
        timeline: &Timeline,
        req: &PagestreamGetPageRequest,
        ctx: &RequestContext,
    ) -> Result<PagestreamBeMessage, PageStreamError> {
        let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
        let lsn =
            Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
                .await?;
        let page = timeline
            .get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), req.latest, ctx)
            .await?;

        Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
            page,
        }))
    }

    async fn handle_get_page_at_lsn_request(
        &self,
        timeline: &Timeline,
        req: &PagestreamGetPageRequest,
        ctx: &RequestContext,
    ) -> Result<PagestreamBeMessage, PageStreamError> {
        let key = rel_block_to_key(req.rel, req.blkno);
        if timeline.get_shard_identity().is_key_local(&key) {
            self.do_handle_get_page_at_lsn_request(timeline, req, ctx)
                .await
        } else {
            // The Tenant shard we looked up at connection start does not hold this particular
            // key: look for other shards in this tenant. This scenario occurs if a pageserver
            // has multiple shards for the same tenant.
            //
            // TODO: optimize this (https://github.com/neondatabase/neon/pull/6037)
            let timeline = match self
                .get_active_tenant_timeline(
                    timeline.tenant_shard_id.tenant_id,
                    timeline.timeline_id,
                    ShardSelector::Page(key),
                )
                .await
            {
                Ok(t) => t,
                Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
                    // We already know this tenant exists in general, because we resolved it at
                    // start of connection. Getting a NotFound here indicates that the shard containing
                    // the requested page is not present on this node: the client's knowledge of shard->pageserver
                    // mapping is out of date.
                    tracing::info!("Page request routed to wrong shard: my identity {:?}, should go to shard {}, key {}",
                        timeline.get_shard_identity(), timeline.get_shard_identity().get_shard_number(&key).0, key);
                    // Closing the connection by returning `PageStreamError::Reconnect` has the side effect of rate-limiting the above
                    // message, via the client's reconnect backoff, as well as hopefully prompting the client to load its updated
                    // configuration and talk to a different pageserver.
                    return Err(PageStreamError::Reconnect(
                        "getpage@lsn request routed to wrong shard".into(),
                    ));
                }
                Err(e) => return Err(e.into()),
            };

            // Take a GateGuard for the duration of this request. If we were using our main Timeline object,
            // the GateGuard was already held over the whole connection.
            let _timeline_guard = timeline
                .gate
                .enter()
                .map_err(|_| PageStreamError::Shutdown)?;

            self.do_handle_get_page_at_lsn_request(&timeline, req, ctx)
                .await
        }
    }

    #[allow(clippy::too_many_arguments)]
    #[instrument(skip_all, fields(?lsn, ?prev_lsn, %full_backup))]
    async fn handle_basebackup_request<IO>(
        &mut self,
        pgb: &mut PostgresBackend<IO>,
        tenant_id: TenantId,
        timeline_id: TimelineId,
        lsn: Option<Lsn>,
        prev_lsn: Option<Lsn>,
        full_backup: bool,
        gzip: bool,
        ctx: RequestContext,
    ) -> anyhow::Result<()>
    where
        IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
    {
        debug_assert_current_span_has_tenant_and_timeline_id();

        let started = std::time::Instant::now();

        // check that the timeline exists
        let timeline = self
            .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero)
            .await?;
        let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
        if let Some(lsn) = lsn {
            // Backup was requested at a particular LSN. Wait for it to arrive.
            info!("waiting for {}", lsn);
            timeline.wait_lsn(lsn, &ctx).await?;
            timeline
                .check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
                .context("invalid basebackup lsn")?;
        }

        let lsn_awaited_after = started.elapsed();

        // switch client to COPYOUT
        pgb.write_message_noflush(&BeMessage::CopyOutResponse)?;
        self.flush_cancellable(pgb, &timeline.cancel).await?;

        // Send a tarball of the latest layer on the timeline. Compress unless this is a
        // fullbackup. TODO Compress in that case too (tests need to be updated)
        if full_backup {
            let mut writer = pgb.copyout_writer();
            basebackup::send_basebackup_tarball(
                &mut writer,
                &timeline,
                lsn,
                prev_lsn,
                full_backup,
                &ctx,
            )
            .await?;
        } else {
            let mut writer = pgb.copyout_writer();
            if gzip {
                let mut encoder = GzipEncoder::with_quality(
                    writer,
                    // NOTE using fast compression because it's on the critical path
                    // for compute startup. For an empty database, we get
                    // <100KB with this method. The Level::Best compression method
                    // gives us <20KB, but maybe we should add basebackup caching
                    // on compute shutdown first.
                    async_compression::Level::Fastest,
                );
                basebackup::send_basebackup_tarball(
                    &mut encoder,
                    &timeline,
                    lsn,
                    prev_lsn,
                    full_backup,
                    &ctx,
                )
                .await?;
                // shutdown the encoder to ensure the gzip footer is written
                encoder.shutdown().await?;
            } else {
                basebackup::send_basebackup_tarball(
                    &mut writer,
                    &timeline,
                    lsn,
                    prev_lsn,
                    full_backup,
                    &ctx,
                )
                .await?;
            }
        }

        pgb.write_message_noflush(&BeMessage::CopyDone)?;
        self.flush_cancellable(pgb, &timeline.cancel).await?;

        let basebackup_after = started
            .elapsed()
            .checked_sub(lsn_awaited_after)
            .unwrap_or(Duration::ZERO);

        info!(
            lsn_await_millis = lsn_awaited_after.as_millis(),
            basebackup_millis = basebackup_after.as_millis(),
            "basebackup complete"
        );

        Ok(())
    }

    // When accessing the management api, supply None as an argument.
    // When using it to authorize a tenant, pass the corresponding tenant id.
    fn check_permission(&self, tenant_id: Option<TenantId>) -> Result<(), QueryError> {
        if self.auth.is_none() {
            // auth is set to Trust, nothing to check so just return ok
            return Ok(());
        }
        // auth is Some, as just checked above; when auth is Some,
        // claims are always present because of checks during connection init,
        // so this expect won't trigger
        let claims = self
            .claims
            .as_ref()
            .expect("claims presence already checked");
        check_permission(claims, tenant_id).map_err(|e| QueryError::Unauthorized(e.0))
    }

    /// Shorthand for getting a reference to a Timeline of an Active tenant.
    async fn get_active_tenant_timeline(
        &self,
        tenant_id: TenantId,
        timeline_id: TimelineId,
        selector: ShardSelector,
    ) -> Result<Arc<Timeline>, GetActiveTimelineError> {
        let tenant = get_active_tenant_with_timeout(
            tenant_id,
            selector,
            ACTIVE_TENANT_TIMEOUT,
            &task_mgr::shutdown_token(),
        )
        .await
        .map_err(GetActiveTimelineError::Tenant)?;
        let timeline = tenant.get_timeline(timeline_id, true)?;
        Ok(timeline)
    }
}

#[async_trait::async_trait]
impl<IO> postgres_backend::Handler<IO> for PageServerHandler
where
    IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
    fn check_auth_jwt(
        &mut self,
        _pgb: &mut PostgresBackend<IO>,
        jwt_response: &[u8],
    ) -> Result<(), QueryError> {
        // this unwrap is never triggered, because check_auth_jwt is only called when auth_type is NeonJWT,
        // which requires auth to be present
        let data = self
            .auth
            .as_ref()
            .unwrap()
            .decode(str::from_utf8(jwt_response).context("jwt response is not UTF-8")?)
            .map_err(|e| QueryError::Unauthorized(e.0))?;

        if matches!(data.claims.scope, Scope::Tenant) && data.claims.tenant_id.is_none() {
            return Err(QueryError::Unauthorized(
                "jwt token scope is Tenant, but tenant id is missing".into(),
            ));
        }

        debug!(
            "jwt scope check succeeded for scope: {:#?} by tenant id: {:?}",
            data.claims.scope, data.claims.tenant_id,
        );

        self.claims = Some(data.claims);
        Ok(())
    }

    fn startup(
        &mut self,
        _pgb: &mut PostgresBackend<IO>,
        _sm: &FeStartupPacket,
    ) -> Result<(), QueryError> {
        Ok(())
    }
    #[instrument(skip_all, fields(tenant_id, timeline_id))]
    async fn process_query(
        &mut self,
        pgb: &mut PostgresBackend<IO>,
        query_string: &str,
    ) -> Result<(), QueryError> {
        fail::fail_point!("simulated-bad-compute-connection", |_| {
            info!("Hit failpoint for bad connection");
            Err(QueryError::SimulatedConnectionError)
        });

        let ctx = self.connection_ctx.attached_child();
        debug!("process query {query_string:?}");
        if query_string.starts_with("pagestream ") {
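            // Command format: pagestream <tenant_id> <timeline_id>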
            let (_, params_raw) = query_string.split_at("pagestream ".len());
            let params = params_raw.split(' ').collect::<Vec<_>>();
            if params.len() != 2 {
                return Err(QueryError::Other(anyhow::anyhow!(
                    "invalid param number for pagestream command"
                )));
            }
            let tenant_id = TenantId::from_str(params[0])
                .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
            let timeline_id = TimelineId::from_str(params[1])
                .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;

            tracing::Span::current()
                .record("tenant_id", field::display(tenant_id))
                .record("timeline_id", field::display(timeline_id));

            self.check_permission(Some(tenant_id))?;

            self.handle_pagerequests(pgb, tenant_id, timeline_id, ctx)
                .await?;
        } else if query_string.starts_with("basebackup ") {
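            // Command format: basebackup <tenant_id> <timeline_id> [lsn] [--gzip]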
            let (_, params_raw) = query_string.split_at("basebackup ".len());
            let params = params_raw.split_whitespace().collect::<Vec<_>>();

            if params.len() < 2 {
                return Err(QueryError::Other(anyhow::anyhow!(
                    "invalid param number for basebackup command"
                )));
            }

            let tenant_id = TenantId::from_str(params[0])
                .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
            let timeline_id = TimelineId::from_str(params[1])
                .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;

            tracing::Span::current()
                .record("tenant_id", field::display(tenant_id))
                .record("timeline_id", field::display(timeline_id));

            self.check_permission(Some(tenant_id))?;

            let lsn = if params.len() >= 3 {
                Some(
                    Lsn::from_str(params[2])
                        .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?,
                )
            } else {
                None
            };

            let gzip = if params.len() >= 4 {
                if params[3] == "--gzip" {
                    true
                } else {
                    return Err(QueryError::Other(anyhow::anyhow!(
                        "Parameter in position 3 unknown {}",
                        params[3],
                    )));
                }
            } else {
                false
            };

            ::metrics::metric_vec_duration::observe_async_block_duration_by_result(
                &*metrics::BASEBACKUP_QUERY_TIME,
                async move {
                    self.handle_basebackup_request(
                        pgb,
                        tenant_id,
                        timeline_id,
                        lsn,
                        None,
                        false,
                        gzip,
                        ctx,
                    )
                    .await?;
                    pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
                    anyhow::Ok(())
                },
            )
            .await?;
        }
        // return the pair of prev_lsn and last_lsn
        else if query_string.starts_with("get_last_record_rlsn ") {
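            // Command format: get_last_record_rlsn <tenant_id> <timeline_id>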
            let (_, params_raw) = query_string.split_at("get_last_record_rlsn ".len());
            let params = params_raw.split_whitespace().collect::<Vec<_>>();

            if params.len() != 2 {
                return Err(QueryError::Other(anyhow::anyhow!(
                    "invalid param number for get_last_record_rlsn command"
                )));
            }

            let tenant_id = TenantId::from_str(params[0])
                .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
            let timeline_id = TimelineId::from_str(params[1])
                .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;

            tracing::Span::current()
                .record("tenant_id", field::display(tenant_id))
                .record("timeline_id", field::display(timeline_id));

            self.check_permission(Some(tenant_id))?;
            let timeline = self
                .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero)
                .await?;

            let end_of_timeline = timeline.get_last_record_rlsn();

            pgb.write_message_noflush(&BeMessage::RowDescription(&[
                RowDescriptor::text_col(b"prev_lsn"),
                RowDescriptor::text_col(b"last_lsn"),
            ]))?
            .write_message_noflush(&BeMessage::DataRow(&[
                Some(end_of_timeline.prev.to_string().as_bytes()),
                Some(end_of_timeline.last.to_string().as_bytes()),
            ]))?
            .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
        }
        // same as basebackup, but the result includes relational data as well
        else if query_string.starts_with("fullbackup ") {
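            // Command format: fullbackup <tenant_id> <timeline_id> [lsn [prev_lsn]]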
            let (_, params_raw) = query_string.split_at("fullbackup ".len());
            let params = params_raw.split_whitespace().collect::<Vec<_>>();

            if params.len() < 2 {
                return Err(QueryError::Other(anyhow::anyhow!(
                    "invalid param number for fullbackup command"
                )));
            }

            let tenant_id = TenantId::from_str(params[0])
                .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
            let timeline_id = TimelineId::from_str(params[1])
                .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;

            tracing::Span::current()
                .record("tenant_id", field::display(tenant_id))
                .record("timeline_id", field::display(timeline_id));

            // The caller is responsible for providing correct lsn and prev_lsn.
            let lsn = if params.len() > 2 {
                Some(
                    Lsn::from_str(params[2])
                        .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?,
                )
            } else {
                None
            };
            let prev_lsn = if params.len() > 3 {
                Some(
                    Lsn::from_str(params[3])
                        .with_context(|| format!("Failed to parse Lsn from {}", params[3]))?,
                )
            } else {
                None
            };

            self.check_permission(Some(tenant_id))?;

            // Check that the timeline exists
            self.handle_basebackup_request(
                pgb,
                tenant_id,
                timeline_id,
                lsn,
                prev_lsn,
                true,
                false,
                ctx,
            )
            .await?;
            pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
        } else if query_string.starts_with("import basebackup ") {
            // Import the `base` section (everything but the wal) of a basebackup.
            // Assumes the tenant already exists on this pageserver.
            //
            // Files are scheduled to be persisted to remote storage, and the
            // caller should poll the http api to check when that is done.
            //
            // Example import command:
            // 1. Get start/end LSN from backup_manifest file
            // 2. Run:
            // cat my_backup/base.tar | psql -h $PAGESERVER \
            //     -c "import basebackup $TENANT $TIMELINE $START_LSN $END_LSN $PG_VERSION"
            let (_, params_raw) = query_string.split_at("import basebackup ".len());
            let params = params_raw.split_whitespace().collect::<Vec<_>>();
            if params.len() != 5 {
                return Err(QueryError::Other(anyhow::anyhow!(
                    "invalid param number for import basebackup command"
                )));
            }
            let tenant_id = TenantId::from_str(params[0])
                .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
            let timeline_id = TimelineId::from_str(params[1])
                .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
            let base_lsn = Lsn::from_str(params[2])
                .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?;
            let end_lsn = Lsn::from_str(params[3])
                .with_context(|| format!("Failed to parse Lsn from {}", params[3]))?;
            let pg_version = u32::from_str(params[4])
                .with_context(|| format!("Failed to parse pg_version from {}", params[4]))?;

            tracing::Span::current()
                .record("tenant_id", field::display(tenant_id))
                .record("timeline_id", field::display(timeline_id));

            self.check_permission(Some(tenant_id))?;

            match self
                .handle_import_basebackup(
                    pgb,
                    tenant_id,
                    timeline_id,
                    base_lsn,
                    end_lsn,
                    pg_version,
                    ctx,
                )
                .await
            {
                Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
                Err(e) => {
                    error!("error importing base backup between {base_lsn} and {end_lsn}: {e:?}");
                    pgb.write_message_noflush(&BeMessage::ErrorResponse(
                        &e.to_string(),
                        Some(e.pg_error_code()),
                    ))?
                }
            };
        } else if query_string.starts_with("import wal ") {
            // Import the `pg_wal` section of a basebackup.
            //
            // Files are scheduled to be persisted to remote storage, and the
            // caller should poll the http api to check when that is done.
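            //
            // Command format: import wal <tenant_id> <timeline_id> <start_lsn> <end_lsn>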
            let (_, params_raw) = query_string.split_at("import wal ".len());
            let params = params_raw.split_whitespace().collect::<Vec<_>>();
            if params.len() != 4 {
                return Err(QueryError::Other(anyhow::anyhow!(
                    "invalid param number for import wal command"
                )));
            }
            let tenant_id = TenantId::from_str(params[0])
                .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
            let timeline_id = TimelineId::from_str(params[1])
                .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
            let start_lsn = Lsn::from_str(params[2])
                .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?;
            let end_lsn = Lsn::from_str(params[3])
                .with_context(|| format!("Failed to parse Lsn from {}", params[3]))?;

            tracing::Span::current()
                .record("tenant_id", field::display(tenant_id))
                .record("timeline_id", field::display(timeline_id));

            self.check_permission(Some(tenant_id))?;

            match self
                .handle_import_wal(pgb, tenant_id, timeline_id, start_lsn, end_lsn, ctx)
                .await
            {
                Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
                Err(e) => {
                    error!("error importing WAL between {start_lsn} and {end_lsn}: {e:?}");
                    pgb.write_message_noflush(&BeMessage::ErrorResponse(
                        &e.to_string(),
                        Some(e.pg_error_code()),
                    ))?
                }
            };
        } else if query_string.to_ascii_lowercase().starts_with("set ") {
            // important because psycopg2 executes "SET datestyle TO 'ISO'"
            // on connect
            pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
        } else if query_string.starts_with("show ") {
            // show <tenant_id>
            let (_, params_raw) = query_string.split_at("show ".len());
            let params = params_raw.split(' ').collect::<Vec<_>>();
            if params.len() != 1 {
                return Err(QueryError::Other(anyhow::anyhow!(
                    "invalid param number for show command"
                )));
            }
            let tenant_id = TenantId::from_str(params[0])
                .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;

            tracing::Span::current().record("tenant_id", field::display(tenant_id));

            self.check_permission(Some(tenant_id))?;

            let tenant = get_active_tenant_with_timeout(
                tenant_id,
                ShardSelector::Zero,
                ACTIVE_TENANT_TIMEOUT,
                &task_mgr::shutdown_token(),
            )
            .await?;
            pgb.write_message_noflush(&BeMessage::RowDescription(&[
                RowDescriptor::int8_col(b"checkpoint_distance"),
                RowDescriptor::int8_col(b"checkpoint_timeout"),
                RowDescriptor::int8_col(b"compaction_target_size"),
                RowDescriptor::int8_col(b"compaction_period"),
                RowDescriptor::int8_col(b"compaction_threshold"),
                RowDescriptor::int8_col(b"gc_horizon"),
                RowDescriptor::int8_col(b"gc_period"),
                RowDescriptor::int8_col(b"image_creation_threshold"),
                RowDescriptor::int8_col(b"pitr_interval"),
            ]))?
            .write_message_noflush(&BeMessage::DataRow(&[
                Some(tenant.get_checkpoint_distance().to_string().as_bytes()),
                Some(
                    tenant
                        .get_checkpoint_timeout()
                        .as_secs()
                        .to_string()
                        .as_bytes(),
                ),
                Some(tenant.get_compaction_target_size().to_string().as_bytes()),
                Some(
                    tenant
                        .get_compaction_period()
                        .as_secs()
                        .to_string()
                        .as_bytes(),
                ),
                Some(tenant.get_compaction_threshold().to_string().as_bytes()),
                Some(tenant.get_gc_horizon().to_string().as_bytes()),
                Some(tenant.get_gc_period().as_secs().to_string().as_bytes()),
                Some(tenant.get_image_creation_threshold().to_string().as_bytes()),
                Some(tenant.get_pitr_interval().as_secs().to_string().as_bytes()),
            ]))?
            .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
        } else {
            return Err(QueryError::Other(anyhow::anyhow!(
                "unknown command {query_string}"
            )));
        }

        Ok(())
    }
}

impl From<GetActiveTenantError> for QueryError {
    fn from(e: GetActiveTenantError) -> Self {
        match e {
            GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected(
                ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
            ),
            GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => {
                QueryError::Shutdown
            }
            e => QueryError::Other(anyhow::anyhow!(e)),
        }
    }
}

#[derive(Debug, thiserror::Error)]
enum GetActiveTimelineError {
    #[error(transparent)]
    Tenant(GetActiveTenantError),
    #[error(transparent)]
    Timeline(#[from] GetTimelineError),
}

impl From<GetActiveTimelineError> for QueryError {
    fn from(e: GetActiveTimelineError) -> Self {
        match e {
            GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => QueryError::Shutdown,
            GetActiveTimelineError::Tenant(e) => e.into(),
            GetActiveTimelineError::Timeline(e) => QueryError::NotFound(format!("{e}").into()),
        }
    }
}