Line data Source code
1 : //
2 : //! The Page Service listens for client connections and serves their GetPage@LSN
3 : //! requests.
4 : //
5 : // It is possible to connect here using the usual psql/pgbench/libpq. The
6 : // following commands are currently supported:
7 : // *status* -- show current info about this pageserver,
8 : // *pagestream* -- enter a mode where the smgr and pageserver talk their
9 : // custom protocol.
10 : //
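//
// As a rough illustration only (host/port are placeholders; the command form
// matches what `process_query` below parses), a pagestream session can be
// opened like this:
//
//     psql -h <pageserver-host> -p <libpq-port> \
//         -c "pagestream <tenant_id> <timeline_id>"
//
// which switches the connection into the COPYBOTH protocol served by
// `handle_pagerequests`.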
11 :
12 : use anyhow::Context;
13 : use async_compression::tokio::write::GzipEncoder;
14 : use bytes::Buf;
15 : use bytes::Bytes;
16 : use futures::stream::FuturesUnordered;
17 : use futures::Stream;
18 : use futures::StreamExt;
19 : use pageserver_api::key::Key;
20 : use pageserver_api::models::TenantState;
21 : use pageserver_api::models::{
22 : PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
23 : PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
24 : PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
25 : PagestreamGetSlruSegmentRequest, PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest,
26 : PagestreamNblocksResponse,
27 : };
28 : use pageserver_api::shard::ShardIndex;
29 : use pageserver_api::shard::{ShardCount, ShardNumber};
30 : use postgres_backend::{self, is_expected_io_error, AuthType, PostgresBackend, QueryError};
31 : use pq_proto::framed::ConnectionError;
32 : use pq_proto::FeStartupPacket;
33 : use pq_proto::{BeMessage, FeMessage, RowDescriptor};
34 : use std::borrow::Cow;
35 : use std::collections::HashMap;
36 : use std::io;
37 : use std::net::TcpListener;
38 : use std::pin::pin;
39 : use std::str;
40 : use std::str::FromStr;
41 : use std::sync::Arc;
42 : use std::time::Duration;
43 : use tokio::io::AsyncWriteExt;
44 : use tokio::io::{AsyncRead, AsyncWrite};
45 : use tokio_util::io::StreamReader;
46 : use tokio_util::sync::CancellationToken;
47 : use tracing::field;
48 : use tracing::*;
49 : use utils::id::ConnectionId;
50 : use utils::sync::gate::GateGuard;
51 : use utils::{
52 : auth::{Claims, Scope, SwappableJwtAuth},
53 : id::{TenantId, TimelineId},
54 : lsn::Lsn,
55 : simple_rcu::RcuReadGuard,
56 : };
57 :
58 : use crate::auth::check_permission;
59 : use crate::basebackup;
60 : use crate::config::PageServerConf;
61 : use crate::context::{DownloadBehavior, RequestContext};
62 : use crate::import_datadir::import_wal_from_tar;
63 : use crate::metrics;
64 : use crate::metrics::LIVE_CONNECTIONS_COUNT;
65 : use crate::pgdatadir_mapping::Version;
66 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
67 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
68 : use crate::task_mgr;
69 : use crate::task_mgr::TaskKind;
70 : use crate::tenant::mgr;
71 : use crate::tenant::mgr::get_active_tenant_with_timeout;
72 : use crate::tenant::mgr::GetActiveTenantError;
73 : use crate::tenant::mgr::ShardSelector;
74 : use crate::tenant::timeline::WaitLsnError;
75 : use crate::tenant::GetTimelineError;
76 : use crate::tenant::PageReconstructError;
77 : use crate::tenant::Timeline;
78 : use crate::trace::Tracer;
79 : use pageserver_api::key::rel_block_to_key;
80 : use pageserver_api::reltag::SlruKind;
81 : use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
82 : use postgres_ffi::BLCKSZ;
83 :
84 : // How long we may wait for a [`TenantSlot::InProgress`] and/or a [`Tenant`] which
85 : // is not yet in state [`TenantState::Active`].
86 : const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
87 :
88 : /// Read the end of a tar archive.
89 : ///
90 : /// A tar archive normally ends with two consecutive blocks of zeros, 512 bytes each.
91 : /// `tokio_tar` already read the first such block. Read the second all-zeros block,
92 : /// and check that there is no more data after the EOF marker.
93 : ///
94 : /// XXX: Currently, any trailing data after the EOF marker prints a warning.
95 : /// Perhaps it should be a hard error?
96 13 : async fn read_tar_eof(mut reader: (impl AsyncRead + Unpin)) -> anyhow::Result<()> {
97 13 : use tokio::io::AsyncReadExt;
98 13 : let mut buf = [0u8; 512];
99 13 :
100 13 : // Read the all-zeros block, and verify it
101 13 : let mut total_bytes = 0;
102 26 : while total_bytes < 512 {
103 13 : let nbytes = reader.read(&mut buf[total_bytes..]).await?;
104 13 : total_bytes += nbytes;
105 13 : if nbytes == 0 {
106 0 : break;
107 13 : }
108 : }
109 13 : if total_bytes < 512 {
110 0 : anyhow::bail!("incomplete or invalid tar EOF marker");
111 13 : }
112 6656 : if !buf.iter().all(|&x| x == 0) {
113 0 : anyhow::bail!("invalid tar EOF marker");
114 13 : }
115 13 :
116 13 : // Drain any data after the EOF marker
117 13 : let mut trailing_bytes = 0;
118 : loop {
119 86 : let nbytes = reader.read(&mut buf).await?;
120 86 : trailing_bytes += nbytes;
121 86 : if nbytes == 0 {
122 13 : break;
123 73 : }
124 : }
125 13 : if trailing_bytes > 0 {
126 7 : warn!("ignored {trailing_bytes} unexpected bytes after the tar archive");
127 6 : }
128 13 : Ok(())
129 13 : }
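// A minimal sketch of the trailer layout verified above (standard tar framing,
// block sizes as described in the doc comment):
//
//     [ file data blocks ][ 512 zero bytes ][ 512 zero bytes ][ optional trailing bytes ]
//                          consumed by        read + verified    drained here; warned
//                          tokio_tar           here               about if non-empty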
130 :
131 : ///////////////////////////////////////////////////////////////////////////////
132 :
133 : ///
134 : /// Main loop of the page service.
135 : ///
136 : /// Listens for connections, and launches a new handler task for each.
137 : ///
138 604 : pub async fn libpq_listener_main(
139 604 : conf: &'static PageServerConf,
140 604 : broker_client: storage_broker::BrokerClientChannel,
141 604 : auth: Option<Arc<SwappableJwtAuth>>,
142 604 : listener: TcpListener,
143 604 : auth_type: AuthType,
144 604 : listener_ctx: RequestContext,
145 604 : cancel: CancellationToken,
146 604 : ) -> anyhow::Result<()> {
147 604 : listener.set_nonblocking(true)?;
148 604 : let tokio_listener = tokio::net::TcpListener::from_std(listener)?;
149 :
150 : // Wait for a new connection to arrive, or for server shutdown.
151 11323 : while let Some(res) = tokio::select! {
152 : biased;
153 :
154 : _ = cancel.cancelled() => {
155 : // We were requested to shut down.
156 : None
157 : }
158 :
159 10719 : res = tokio_listener.accept() => {
160 : Some(res)
161 : }
162 : } {
163 10719 : match res {
164 10719 : Ok((socket, peer_addr)) => {
165 : // Connection established. Spawn a new task to handle it.
166 0 : debug!("accepted connection from {}", peer_addr);
167 10719 : let local_auth = auth.clone();
168 10719 :
169 10719 : let connection_ctx = listener_ctx
170 10719 : .detached_child(TaskKind::PageRequestHandler, DownloadBehavior::Download);
171 10719 :
172 10719 : // PageRequestHandler tasks are not associated with any particular
173 10719 : // timeline in the task manager. In practice most connections will
174 10719 : // only deal with a particular timeline, but we don't know which one
175 10719 : // yet.
176 10719 : task_mgr::spawn(
177 10719 : &tokio::runtime::Handle::current(),
178 10719 : TaskKind::PageRequestHandler,
179 10719 : None,
180 10719 : None,
181 10719 : "serving compute connection task",
182 10719 : false,
183 10719 : page_service_conn_main(
184 10719 : conf,
185 10719 : broker_client.clone(),
186 10719 : local_auth,
187 10719 : socket,
188 10719 : auth_type,
189 10719 : connection_ctx,
190 10719 : ),
191 10719 : );
192 : }
193 0 : Err(err) => {
194 : // accept() failed. Log the error, and loop back to retry on next connection.
195 0 : error!("accept() failed: {:?}", err);
196 : }
197 : }
198 : }
199 :
200 0 : debug!("page_service loop terminated");
201 :
202 172 : Ok(())
203 172 : }
204 :
205 0 : #[instrument(skip_all, fields(peer_addr))]
206 : async fn page_service_conn_main(
207 : conf: &'static PageServerConf,
208 : broker_client: storage_broker::BrokerClientChannel,
209 : auth: Option<Arc<SwappableJwtAuth>>,
210 : socket: tokio::net::TcpStream,
211 : auth_type: AuthType,
212 : connection_ctx: RequestContext,
213 : ) -> anyhow::Result<()> {
214 : // Immediately increment the gauge, then create a job to decrement it on task exit.
215 : // One of the pros of `defer!` is that this will *most probably*
216 : // get called, even in the presence of panics.
217 : let gauge = LIVE_CONNECTIONS_COUNT.with_label_values(&["page_service"]);
218 : gauge.inc();
219 10705 : scopeguard::defer! {
220 10705 : gauge.dec();
221 10705 : }
222 :
223 : socket
224 : .set_nodelay(true)
225 : .context("could not set TCP_NODELAY")?;
226 :
227 : let peer_addr = socket.peer_addr().context("get peer address")?;
228 : tracing::Span::current().record("peer_addr", field::display(peer_addr));
229 :
230 : // Set up a read timeout of 10 minutes. The exact value is rather arbitrary; the requirements are:
231 : // - long enough for most valid compute connections
232 : // - less than infinite to stop us from "leaking" connections to long-gone computes
233 : //
234 : // no write timeout is used, because the kernel is assumed to error writes after some time.
235 : let mut socket = tokio_io_timeout::TimeoutReader::new(socket);
236 :
237 : let default_timeout_ms = 10 * 60 * 1000; // 10 minutes by default
238 10719 : let socket_timeout_ms = (|| {
239 10719 : fail::fail_point!("simulated-bad-compute-connection", |avg_timeout_ms| {
240 : // Exponential distribution for simulating
241 : // poor network conditions, expect about avg_timeout_ms to be around 15
242 : // in tests
243 4 : if let Some(avg_timeout_ms) = avg_timeout_ms {
244 4 : let avg = avg_timeout_ms.parse::<i64>().unwrap() as f32;
245 4 : let u = rand::random::<f32>();
246 4 : ((1.0 - u).ln() / (-avg)) as u64
247 : } else {
248 0 : default_timeout_ms
249 : }
250 10719 : });
251 10715 : default_timeout_ms
252 : })();
253 :
254 : // A timeout here does not mean the client died, it can happen if it's just idle for
255 : // a while: we will tear down this PageServerHandler and instantiate a new one if/when
256 : // they reconnect.
257 : socket.set_timeout(Some(std::time::Duration::from_millis(socket_timeout_ms)));
258 : let socket = std::pin::pin!(socket);
259 :
260 : // XXX: pgbackend.run() should take the connection_ctx,
261 : // and create a child per-query context when it invokes process_query.
262 : // But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
263 : // and create the per-query context in process_query ourselves.
264 : let mut conn_handler = PageServerHandler::new(conf, broker_client, auth, connection_ctx);
265 : let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
266 :
267 : match pgbackend
268 : .run(&mut conn_handler, task_mgr::shutdown_watcher)
269 : .await
270 : {
271 : Ok(()) => {
272 : // we've been requested to shut down
273 : Ok(())
274 : }
275 : Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
276 : if is_expected_io_error(&io_error) {
277 0 : info!("Postgres client disconnected ({io_error})");
278 : Ok(())
279 : } else {
280 : Err(io_error).context("Postgres connection error")
281 : }
282 : }
283 : other => other.context("Postgres query error"),
284 : }
285 : }
286 :
287 : /// While a handler holds a reference to a Timeline, it also holds the
288 : /// timeline's Gate open.
289 : struct HandlerTimeline {
290 : timeline: Arc<Timeline>,
291 : _guard: GateGuard,
292 : }
293 :
294 : struct PageServerHandler {
295 : _conf: &'static PageServerConf,
296 : broker_client: storage_broker::BrokerClientChannel,
297 : auth: Option<Arc<SwappableJwtAuth>>,
298 : claims: Option<Claims>,
299 :
300 : /// The context created for the lifetime of the connection
301 : /// serviced by this PageServerHandler.
302 : /// For each query received over the connection,
303 : /// `process_query` creates a child context from this one.
304 : connection_ctx: RequestContext,
305 :
306 : /// See [`Self::cache_timeline`] for usage.
307 : ///
308 : /// Note on size: the typical size of this map is 1. The largest size we expect
309 : /// to see is the number of shards divided by the number of pageservers (typically < 2),
310 : /// or the ratio used when splitting shards (i.e. how many children created from one)
311 : /// parent shard, where a "large" number might be ~8.
312 : shard_timelines: HashMap<ShardIndex, HandlerTimeline>,
313 : }
314 :
315 9 : #[derive(thiserror::Error, Debug)]
316 : enum PageStreamError {
317 : /// We encountered an error that should prompt the client to reconnect:
318 : /// in practice this means we drop the connection without sending a response.
319 : #[error("Reconnect required: {0}")]
320 : Reconnect(Cow<'static, str>),
321 :
322 : /// We were instructed to shutdown while processing the query
323 : #[error("Shutting down")]
324 : Shutdown,
325 :
326 : /// Something went wrong reading a page: this likely indicates a pageserver bug
327 : #[error("Read error")]
328 : Read(#[source] PageReconstructError),
329 :
330 : /// Ran out of time waiting for an LSN
331 : #[error("LSN timeout: {0}")]
332 : LsnTimeout(WaitLsnError),
333 :
334 : /// The entity required to serve the request (tenant or timeline) is not found,
335 : /// or is not found in a suitable state to serve a request.
336 : #[error("Not found: {0}")]
337 : NotFound(Cow<'static, str>),
338 :
339 : /// Request asked for something that doesn't make sense, like an invalid LSN
340 : #[error("Bad request: {0}")]
341 : BadRequest(Cow<'static, str>),
342 : }
343 :
344 : impl From<PageReconstructError> for PageStreamError {
345 6 : fn from(value: PageReconstructError) -> Self {
346 6 : match value {
347 6 : PageReconstructError::Cancelled => Self::Shutdown,
348 0 : e => Self::Read(e),
349 : }
350 6 : }
351 : }
352 :
353 : impl From<GetActiveTimelineError> for PageStreamError {
354 0 : fn from(value: GetActiveTimelineError) -> Self {
355 0 : match value {
356 0 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => Self::Shutdown,
357 0 : GetActiveTimelineError::Tenant(e) => Self::NotFound(format!("{e}").into()),
358 0 : GetActiveTimelineError::Timeline(e) => Self::NotFound(format!("{e}").into()),
359 : }
360 0 : }
361 : }
362 :
363 : impl From<WaitLsnError> for PageStreamError {
364 0 : fn from(value: WaitLsnError) -> Self {
365 0 : match value {
366 0 : e @ WaitLsnError::Timeout(_) => Self::LsnTimeout(e),
367 0 : WaitLsnError::Shutdown => Self::Shutdown,
368 0 : WaitLsnError::BadState => Self::Reconnect("Timeline is not active".into()),
369 : }
370 0 : }
371 : }
372 :
373 : impl From<WaitLsnError> for QueryError {
374 24 : fn from(value: WaitLsnError) -> Self {
375 24 : match value {
376 24 : e @ WaitLsnError::Timeout(_) => Self::Other(anyhow::Error::new(e)),
377 0 : WaitLsnError::Shutdown => Self::Shutdown,
378 0 : WaitLsnError::BadState => Self::Reconnect,
379 : }
380 24 : }
381 : }
382 :
383 : impl PageServerHandler {
384 10719 : pub fn new(
385 10719 : conf: &'static PageServerConf,
386 10719 : broker_client: storage_broker::BrokerClientChannel,
387 10719 : auth: Option<Arc<SwappableJwtAuth>>,
388 10719 : connection_ctx: RequestContext,
389 10719 : ) -> Self {
390 10719 : PageServerHandler {
391 10719 : _conf: conf,
392 10719 : broker_client,
393 10719 : auth,
394 10719 : claims: None,
395 10719 : connection_ctx,
396 10719 : shard_timelines: HashMap::new(),
397 10719 : }
398 10719 : }
399 :
400 : /// Future that completes when we need to shut down the connection.
401 : ///
402 : /// We currently need to shut down when any of the following happens:
403 : /// 1. any of the timelines we hold GateGuards for in `shard_timelines` is cancelled
404 : /// 2. task_mgr requests shutdown of the connection
405 : ///
406 : /// NB on (1): the connection's lifecycle is not actually tied to any of the
407 : /// `shard_timelines`s' lifecycles. But it's _necessary_ in the current
408 : /// implementation to be responsive to timeline cancellation because
409 : /// the connection holds their `GateGuards` open (sored in `shard_timelines`).
410 : /// We currently do the easy thing and terminate the connection if any of the
411 : /// shard_timelines gets cancelled. But really, we cuold spend more effort
412 : /// and simply remove the cancelled timeline from the `shard_timelines`, thereby
413 : /// dropping the guard.
414 : ///
415 : /// NB: keep in sync with [`Self::is_connection_cancelled`]
416 11257880 : async fn await_connection_cancelled(&self) {
417 7509870 : // A short wait before we expend the cycles to walk our timeline map. This avoids incurring
418 7509870 : // that cost every time we check for cancellation.
419 7509870 : tokio::time::sleep(Duration::from_millis(10)).await;
420 :
421 : // This function is never called concurrently with code that adds timelines to shard_timelines,
422 : // which is enforced by the borrow checker (the future returned by this function carries the
423 : // immutable &self). So it's fine to evaluate shard_timelines after the sleep, we don't risk
424 : // missing any inserts to the map.
425 :
426 51613 : let mut cancellation_sources = Vec::with_capacity(1 + self.shard_timelines.len());
427 51613 : use futures::future::Either;
428 51613 : cancellation_sources.push(Either::Left(task_mgr::shutdown_watcher()));
429 51613 : cancellation_sources.extend(
430 51613 : self.shard_timelines
431 51613 : .values()
432 51613 : .map(|ht| Either::Right(ht.timeline.cancel.cancelled())),
433 51613 : );
434 51613 : FuturesUnordered::from_iter(cancellation_sources)
435 51613 : .next()
436 97144 : .await;
437 177 : }
438 :
439 : /// Checking variant of [`Self::await_connection_cancelled`].
440 3 : fn is_connection_cancelled(&self) -> bool {
441 3 : task_mgr::is_shutdown_requested()
442 3 : || self
443 3 : .shard_timelines
444 3 : .values()
445 3 : .any(|ht| ht.timeline.cancel.is_cancelled() || ht.timeline.is_stopping())
446 3 : }
447 :
448 : /// This function always respects cancellation of any timeline in [`Self::shard_timelines`]. Pass in
449 : /// a cancellation token at the next scope up (such as a tenant cancellation token) to ensure we respect
450 : /// cancellation if there aren't any timelines in the cache.
451 : ///
452 : /// If calling from a function that doesn't use the [`Self::shard_timelines`] cache, then pass in the
453 : /// timeline cancellation token.
454 5629593 : async fn flush_cancellable<IO>(
455 5629593 : &self,
456 5629593 : pgb: &mut PostgresBackend<IO>,
457 5629593 : cancel: &CancellationToken,
458 5629593 : ) -> Result<(), QueryError>
459 5629593 : where
460 5629593 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
461 5629593 : {
462 5637198 : tokio::select!(
463 5629593 : flush_r = pgb.flush() => {
464 : Ok(flush_r?)
465 : },
466 : _ = self.await_connection_cancelled() => {
467 : Err(QueryError::Shutdown)
468 : }
469 : _ = cancel.cancelled() => {
470 : Err(QueryError::Shutdown)
471 : }
472 : )
473 5629593 : }
474 :
475 15 : fn copyin_stream<'a, IO>(
476 15 : &'a self,
477 15 : pgb: &'a mut PostgresBackend<IO>,
478 15 : cancel: &'a CancellationToken,
479 15 : ) -> impl Stream<Item = io::Result<Bytes>> + 'a
480 15 : where
481 15 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
482 15 : {
483 15 : async_stream::try_stream! {
484 : loop {
485 97020 : let msg = tokio::select! {
486 : biased;
487 :
488 : _ = cancel.cancelled() => {
489 : // We were requested to shut down.
490 : let msg = "pageserver is shutting down";
491 : let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, None));
492 : Err(QueryError::Shutdown)
493 : }
494 :
495 97020 : msg = pgb.read_message() => { msg.map_err(QueryError::from)}
496 : };
497 :
498 97020 : match msg {
499 97020 : Ok(Some(message)) => {
500 97020 : let copy_data_bytes = match message {
501 96997 : FeMessage::CopyData(bytes) => bytes,
502 14 : FeMessage::CopyDone => { break },
503 9 : FeMessage::Sync => continue,
504 : FeMessage::Terminate => {
505 0 : let msg = "client terminated connection with Terminate message during COPY";
506 0 : let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
507 0 : // error can't happen here, ErrorResponse serialization should always be ok
508 0 : pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
509 0 : Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
510 0 : break;
511 : }
512 0 : m => {
513 0 : let msg = format!("unexpected message {m:?}");
514 0 : // error can't happen here, ErrorResponse serialization should always be ok
515 0 : pgb.write_message_noflush(&BeMessage::ErrorResponse(&msg, None)).map_err(|e| e.into_io_error())?;
516 0 : Err(io::Error::new(io::ErrorKind::Other, msg))?;
517 0 : break;
518 : }
519 : };
520 :
521 96997 : yield copy_data_bytes;
522 : }
523 : Ok(None) => {
524 0 : let msg = "client closed connection during COPY";
525 0 : let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
526 0 : // error can't happen here, ErrorResponse serialization should always be ok
527 0 : pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
528 0 : self.flush_cancellable(pgb, cancel).await.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
529 0 : Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
530 : }
531 0 : Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
532 0 : Err(io_error)?;
533 : }
534 0 : Err(other) => {
535 0 : Err(io::Error::new(io::ErrorKind::Other, other.to_string()))?;
536 : }
537 : };
538 : }
539 : }
540 15 : }
541 :
542 5628287 : #[instrument(skip_all)]
543 : async fn handle_pagerequests<IO>(
544 : &mut self,
545 : pgb: &mut PostgresBackend<IO>,
546 : tenant_id: TenantId,
547 : timeline_id: TimelineId,
548 : ctx: RequestContext,
549 : ) -> Result<(), QueryError>
550 : where
551 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
552 : {
553 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
554 :
555 : let tenant = mgr::get_active_tenant_with_timeout(
556 : tenant_id,
557 : ShardSelector::First,
558 : ACTIVE_TENANT_TIMEOUT,
559 : &task_mgr::shutdown_token(),
560 : )
561 : .await?;
562 :
563 : // Make request tracer if needed
564 : let mut tracer = if tenant.get_trace_read_requests() {
565 : let connection_id = ConnectionId::generate();
566 : let path =
567 : tenant
568 : .conf
569 : .trace_path(&tenant.tenant_shard_id(), &timeline_id, &connection_id);
570 : Some(Tracer::new(path))
571 : } else {
572 : None
573 : };
574 :
575 : // switch client to COPYBOTH
576 : pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
577 : self.flush_cancellable(pgb, &tenant.cancel).await?;
578 :
579 : loop {
580 10957102 : let msg = tokio::select! {
581 : biased;
582 :
583 : _ = self.await_connection_cancelled() => {
584 : // We were requested to shut down.
585 177 : info!("shutdown request received in page handler");
586 : return Err(QueryError::Shutdown)
587 : }
588 :
589 5628097 : msg = pgb.read_message() => { msg }
590 : };
591 :
592 : let copy_data_bytes = match msg? {
593 : Some(FeMessage::CopyData(bytes)) => bytes,
594 : Some(FeMessage::Terminate) => break,
595 : Some(m) => {
596 : return Err(QueryError::Other(anyhow::anyhow!(
597 : "unexpected message: {m:?} during COPY"
598 : )));
599 : }
600 : None => break, // client disconnected
601 : };
602 :
603 0 : trace!("query: {copy_data_bytes:?}");
604 :
605 : // Trace request if needed
606 : if let Some(t) = tracer.as_mut() {
607 : t.trace(&copy_data_bytes)
608 : }
609 :
610 : let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
611 :
612 : // TODO: We could create a new per-request context here, with unique ID.
613 : // Currently we use the same per-timeline context for all requests
614 :
615 : let (response, span) = match neon_fe_msg {
616 : PagestreamFeMessage::Exists(req) => {
617 : let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.lsn);
618 : (
619 : self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx)
620 : .instrument(span.clone())
621 : .await,
622 : span,
623 : )
624 : }
625 : PagestreamFeMessage::Nblocks(req) => {
626 : let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.lsn);
627 : (
628 : self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx)
629 : .instrument(span.clone())
630 : .await,
631 : span,
632 : )
633 : }
634 : PagestreamFeMessage::GetLatestPage(old_req) => {
635 : let req = PagestreamGetPageRequest {
636 : horizon: if old_req.latest {
637 : Lsn::MAX
638 : } else {
639 : old_req.lsn
640 : },
641 : lsn: old_req.lsn,
642 : rel: old_req.rel,
643 : blkno: old_req.blkno,
644 : };
645 : let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn);
646 : (
647 : self.handle_get_page_at_lsn_request(tenant_id, timeline_id, &req, &ctx)
648 : .instrument(span.clone())
649 : .await,
650 : span,
651 : )
652 : }
653 : PagestreamFeMessage::GetPage(req) => {
654 : // shard_id is filled in by the handler
655 : let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn);
656 : (
657 : self.handle_get_page_at_lsn_request(tenant_id, timeline_id, &req, &ctx)
658 : .instrument(span.clone())
659 : .await,
660 : span,
661 : )
662 : }
663 : PagestreamFeMessage::DbSize(req) => {
664 : let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.lsn);
665 : (
666 : self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx)
667 : .instrument(span.clone())
668 : .await,
669 : span,
670 : )
671 : }
672 : PagestreamFeMessage::GetSlruSegment(req) => {
673 : let span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.lsn);
674 : (
675 : self.handle_get_slru_segment_request(tenant_id, timeline_id, &req, &ctx)
676 : .instrument(span.clone())
677 : .await,
678 : span,
679 : )
680 : }
681 : };
682 :
683 : match response {
684 : Err(PageStreamError::Shutdown) => {
685 : // If we fail to fulfil a request during shutdown, which may be _because_ of
686 : // shutdown, then do not send the error to the client. Instead just drop the
687 : // connection.
688 6 : span.in_scope(|| info!("dropping connection due to shutdown"));
689 : return Err(QueryError::Shutdown);
690 : }
691 : Err(PageStreamError::Reconnect(reason)) => {
692 0 : span.in_scope(|| info!("handler requested reconnect: {reason}"));
693 : return Err(QueryError::Reconnect);
694 : }
695 : Err(e) if self.is_connection_cancelled() => {
696 : // This branch accomodates code within request handlers that returns an anyhow::Error instead of a clean
697 : // shutdown error, this may be buried inside a PageReconstructError::Other for example.
698 : //
699 : // Requests may fail as soon as we are Stopping, even if the Timeline's cancellation token wasn't fired yet,
700 : // because wait_lsn etc will drop out
701 : // is_stopping(): [`Timeline::flush_and_shutdown`] has entered
702 : // is_canceled(): [`Timeline::shutdown`]` has entered
703 0 : span.in_scope(|| info!("dropped error response during shutdown: {e:#}"));
704 : return Err(QueryError::Shutdown);
705 : }
706 : r => {
707 3 : let response_msg = r.unwrap_or_else(|e| {
708 3 : // print all the details to the log with {:#}, but for the client the
709 3 : // error message is enough. Do not log if shutting down, as the anyhow::Error
710 3 : // here includes cancellation which is not an error.
711 3 : let full = utils::error::report_compact_sources(&e);
712 3 : span.in_scope(|| {
713 3 : error!("error reading relation or page version: {full:#}")
714 3 : });
715 3 : PagestreamBeMessage::Error(PagestreamErrorResponse {
716 3 : message: e.to_string(),
717 3 : })
718 3 : });
719 :
720 : pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
721 : self.flush_cancellable(pgb, &tenant.cancel).await?;
722 : }
723 : }
724 : }
725 : Ok(())
726 : }
727 :
728 : #[allow(clippy::too_many_arguments)]
729 12 : #[instrument(skip_all, fields(%base_lsn, end_lsn=%_end_lsn, %pg_version))]
730 : async fn handle_import_basebackup<IO>(
731 : &self,
732 : pgb: &mut PostgresBackend<IO>,
733 : tenant_id: TenantId,
734 : timeline_id: TimelineId,
735 : base_lsn: Lsn,
736 : _end_lsn: Lsn,
737 : pg_version: u32,
738 : ctx: RequestContext,
739 : ) -> Result<(), QueryError>
740 : where
741 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
742 : {
743 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
744 :
745 : // Create empty timeline
746 12 : info!("creating new timeline");
747 : let tenant = get_active_tenant_with_timeout(
748 : tenant_id,
749 : ShardSelector::Zero,
750 : ACTIVE_TENANT_TIMEOUT,
751 : &task_mgr::shutdown_token(),
752 : )
753 : .await?;
754 : let timeline = tenant
755 : .create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
756 : .await?;
757 :
758 : // TODO mark timeline as not ready until it reaches end_lsn.
759 : // We might have some wal to import as well, and we should prevent compute
760 : // from connecting before that and writing conflicting wal.
761 : //
762 : // This is not relevant for pageserver->pageserver migrations, since there's
763 : // no wal to import. But should be fixed if we want to import from postgres.
764 :
765 : // TODO leave clean state on error. For now you can use detach to clean
766 : // up broken state from a failed import.
767 :
768 : // Import basebackup provided via CopyData
769 12 : info!("importing basebackup");
770 : pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
771 : self.flush_cancellable(pgb, &tenant.cancel).await?;
772 :
773 : let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb, &tenant.cancel)));
774 : timeline
775 : .import_basebackup_from_tar(
776 : &mut copyin_reader,
777 : base_lsn,
778 : self.broker_client.clone(),
779 : &ctx,
780 : )
781 : .await?;
782 :
783 : // Read the end of the tar archive.
784 : read_tar_eof(copyin_reader).await?;
785 :
786 : // TODO check checksum
787 : // Meanwhile you can verify client-side by taking fullbackup
788 : // and checking that it matches in size with what was imported.
789 : // It wouldn't work if base came from vanilla postgres though,
790 : // since we discard some log files.
791 :
792 10 : info!("done");
793 : Ok(())
794 : }
795 :
796 0 : #[instrument(skip_all, fields(shard_id, %start_lsn, %end_lsn))]
797 : async fn handle_import_wal<IO>(
798 : &self,
799 : pgb: &mut PostgresBackend<IO>,
800 : tenant_id: TenantId,
801 : timeline_id: TimelineId,
802 : start_lsn: Lsn,
803 : end_lsn: Lsn,
804 : ctx: RequestContext,
805 : ) -> Result<(), QueryError>
806 : where
807 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
808 : {
809 : let timeline = self
810 : .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero)
811 : .await?;
812 : let last_record_lsn = timeline.get_last_record_lsn();
813 : if last_record_lsn != start_lsn {
814 : return Err(QueryError::Other(
815 : anyhow::anyhow!("Cannot import WAL from Lsn {start_lsn} because timeline does not start from the same lsn: {last_record_lsn}"))
816 : );
817 : }
818 :
819 : // TODO leave clean state on error. For now you can use detach to clean
820 : // up broken state from a failed import.
821 :
822 : // Import wal provided via CopyData
823 3 : info!("importing wal");
824 : pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
825 : self.flush_cancellable(pgb, &timeline.cancel).await?;
826 : let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb, &timeline.cancel)));
827 : import_wal_from_tar(&timeline, &mut copyin_reader, start_lsn, end_lsn, &ctx).await?;
828 3 : info!("wal import complete");
829 :
830 : // Read the end of the tar archive.
831 : read_tar_eof(copyin_reader).await?;
832 :
833 : // TODO Does it make sense to overshoot?
834 : if timeline.get_last_record_lsn() < end_lsn {
835 : return Err(QueryError::Other(
836 : anyhow::anyhow!("Cannot import WAL from Lsn {start_lsn} because timeline does not start from the same lsn: {last_record_lsn}"))
837 : );
838 : }
839 :
840 : // Flush data to disk, then upload to s3. No need for a forced checkpoint.
841 : // We only want to persist the data, and it doesn't matter if it's in the
842 : // shape of deltas or images.
843 3 : info!("flushing layers");
844 : timeline.freeze_and_flush().await?;
845 :
846 3 : info!("done");
847 : Ok(())
848 : }
849 :
850 : /// Helper function to handle the LSN from client request.
851 : ///
852 : /// Each GetPage (and Exists and Nblocks) request includes information about
853 : /// which version of the page is being requested. The client can request the
854 : /// latest version of the page, or the version that's valid at a particular
855 : /// LSN. The primary compute node will always request the latest page
856 : /// version, while a standby will request a version at the LSN that it's
857 : /// currently caught up to.
858 : ///
859 : /// In either case, if the page server hasn't received the WAL up to the
860 : /// requested LSN yet, we will wait for it to arrive. The return value is
861 : /// the LSN that should be used to look up the page versions.
862 5618354 : async fn wait_or_get_last_lsn(
863 5618354 : timeline: &Timeline,
864 5618354 : lsn: Lsn,
865 5618354 : horizon: Lsn,
866 5618354 : latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
867 5618354 : ctx: &RequestContext,
868 5618354 : ) -> Result<Lsn, PageStreamError> {
869 5618354 : let last_record_lsn = timeline.get_last_record_lsn();
870 : // Horizon = 0 (INVALID) is treated as the LSN interval degenerated to the point [lsn,lsn].
871 : // This is done mostly for convenience (because such get_page commands are widely used in tests) and
872 : // also seems logical: Lsn::MAX moves the upper boundary of the LSN interval up to last_record_lsn, and
873 : // Lsn(0) moves the upper boundary down to the lower boundary.
874 5618354 : let request_horizon = if horizon == Lsn::INVALID {
875 4 : lsn
876 : } else {
877 5618350 : horizon
878 : };
879 5618354 : let effective_lsn = Lsn::max(lsn, Lsn::min(request_horizon, last_record_lsn));
880 5618354 : if effective_lsn > last_record_lsn {
881 112396 : timeline.wait_lsn(effective_lsn, ctx).await?;
882 : // Since we waited for 'lsn' to arrive, that is now the last
883 : // record LSN. (Or close enough for our purposes; the
884 : // last-record LSN can advance immediately after we return
885 : // anyway)
886 5559069 : } else if effective_lsn == Lsn(0) {
887 1 : return Err(PageStreamError::BadRequest(
888 1 : "invalid LSN(0) in request".into(),
889 1 : ));
890 5559068 : }
891 :
892 5618352 : if effective_lsn < **latest_gc_cutoff_lsn {
893 2 : return Err(PageStreamError::BadRequest(format!(
894 2 : "tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
895 2 : effective_lsn, **latest_gc_cutoff_lsn
896 2 : ).into()));
897 5618350 : }
898 5618350 : Ok(effective_lsn)
899 5618353 : }
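// A worked example of the effective-LSN rule above (all LSN values hypothetical):
//
//     effective_lsn = max(lsn, min(request_horizon, last_record_lsn))
//
// * "latest page" request: lsn=0/100, horizon=Lsn::MAX, last_record_lsn=0/500
//     -> min(MAX, 0/500) = 0/500, max(0/100, 0/500) = 0/500: read at the newest LSN.
// * standby pinned to its replay LSN: lsn=0/100, horizon=0/100, last_record_lsn=0/500
//     -> effective_lsn = 0/100: read exactly at the requested LSN.
// * request ahead of the pageserver: lsn=0/800, horizon=0/800, last_record_lsn=0/500
//     -> effective_lsn = 0/800 > last_record_lsn, so wait_lsn(0/800) runs first.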
900 :
901 56246 : #[instrument(skip_all, fields(shard_id))]
902 : async fn handle_get_rel_exists_request(
903 : &mut self,
904 : tenant_id: TenantId,
905 : timeline_id: TimelineId,
906 : req: &PagestreamExistsRequest,
907 : ctx: &RequestContext,
908 : ) -> Result<PagestreamBeMessage, PageStreamError> {
909 : let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
910 : let _timer = timeline
911 : .query_metrics
912 : .start_timer(metrics::SmgrQueryType::GetRelExists);
913 :
914 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
915 : let lsn =
916 : Self::wait_or_get_last_lsn(timeline, req.lsn, req.horizon, &latest_gc_cutoff_lsn, ctx)
917 : .await?;
918 :
919 : let exists = timeline
920 : .get_rel_exists(req.rel, Version::Lsn(lsn), req.horizon, ctx)
921 : .await?;
922 :
923 : Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
924 : exists,
925 : }))
926 : }
927 :
928 19792 : #[instrument(skip_all, fields(shard_id))]
929 : async fn handle_get_nblocks_request(
930 : &mut self,
931 : tenant_id: TenantId,
932 : timeline_id: TimelineId,
933 : req: &PagestreamNblocksRequest,
934 : ctx: &RequestContext,
935 : ) -> Result<PagestreamBeMessage, PageStreamError> {
936 : let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
937 :
938 : let _timer = timeline
939 : .query_metrics
940 : .start_timer(metrics::SmgrQueryType::GetRelSize);
941 :
942 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
943 : let lsn =
944 : Self::wait_or_get_last_lsn(timeline, req.lsn, req.horizon, &latest_gc_cutoff_lsn, ctx)
945 : .await?;
946 :
947 : let n_blocks = timeline
948 : .get_rel_size(req.rel, Version::Lsn(lsn), req.horizon, ctx)
949 : .await?;
950 :
951 : Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
952 : n_blocks,
953 : }))
954 : }
955 :
956 5 : #[instrument(skip_all, fields(shard_id))]
957 : async fn handle_db_size_request(
958 : &mut self,
959 : tenant_id: TenantId,
960 : timeline_id: TimelineId,
961 : req: &PagestreamDbSizeRequest,
962 : ctx: &RequestContext,
963 : ) -> Result<PagestreamBeMessage, PageStreamError> {
964 : let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
965 :
966 : let _timer = timeline
967 : .query_metrics
968 : .start_timer(metrics::SmgrQueryType::GetDbSize);
969 :
970 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
971 : let lsn =
972 : Self::wait_or_get_last_lsn(timeline, req.lsn, req.horizon, &latest_gc_cutoff_lsn, ctx)
973 : .await?;
974 :
975 : let total_blocks = timeline
976 : .get_db_size(
977 : DEFAULTTABLESPACE_OID,
978 : req.dbnode,
979 : Version::Lsn(lsn),
980 : req.horizon,
981 : ctx,
982 : )
983 : .await?;
984 : let db_size = total_blocks as i64 * BLCKSZ as i64;
985 :
986 : Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse {
987 : db_size,
988 : }))
989 : }
990 :
991 : /// For most getpage requests, we will already have a Timeline to serve the request: this function
992 : /// looks up such a Timeline synchronously and without touching any global state.
993 5542311 : fn get_cached_timeline_for_page(
994 5542311 : &mut self,
995 5542311 : req: &PagestreamGetPageRequest,
996 5542311 : ) -> Result<&Arc<Timeline>, Key> {
997 5542311 : let key = if let Some((first_idx, first_timeline)) = self.shard_timelines.iter().next() {
998 : // Fastest path: single sharded case
999 5534486 : if first_idx.shard_count < ShardCount(2) {
1000 4998252 : return Ok(&first_timeline.timeline);
1001 536234 : }
1002 536234 :
1003 536234 : let key = rel_block_to_key(req.rel, req.blkno);
1004 536234 : let shard_num = first_timeline
1005 536234 : .timeline
1006 536234 : .get_shard_identity()
1007 536234 : .get_shard_number(&key);
1008 536234 :
1009 536234 : // Fast path: matched the first timeline in our local handler map. This case is common if
1010 536234 : // only one shard per tenant is attached to this pageserver.
1011 536234 : if first_timeline.timeline.get_shard_identity().number == shard_num {
1012 536234 : return Ok(&first_timeline.timeline);
1013 0 : }
1014 0 :
1015 0 : let shard_index = ShardIndex {
1016 0 : shard_number: shard_num,
1017 0 : shard_count: first_timeline.timeline.get_shard_identity().count,
1018 0 : };
1019 :
1020 : // Fast-ish path: timeline is in the connection handler's local cache
1021 0 : if let Some(found) = self.shard_timelines.get(&shard_index) {
1022 0 : return Ok(&found.timeline);
1023 0 : }
1024 0 :
1025 0 : key
1026 : } else {
1027 7825 : rel_block_to_key(req.rel, req.blkno)
1028 : };
1029 :
1030 7825 : Err(key)
1031 5542311 : }
1032 :
1033 : /// Having looked up the [`Timeline`] instance for a particular shard, cache it to enable
1034 : /// use in future requests without having to traverse [`crate::tenant::mgr::TenantManager`]
1035 : /// again.
1036 : ///
1037 : /// Note that all the Timelines in this cache are for the same timeline_id: they differ
1038 : /// in which shard they belong to. When we serve a getpage@lsn request, we choose a shard
1039 : /// based on key.
1040 : ///
1041 : /// The typical size of this cache is 1, as we generally create shards to distribute work
1042 : /// across pageservers, so don't tend to have multiple shards for the same tenant on the
1043 : /// same pageserver.
1044 9985 : fn cache_timeline(
1045 9985 : &mut self,
1046 9985 : timeline: Arc<Timeline>,
1047 9985 : ) -> Result<&Arc<Timeline>, GetActiveTimelineError> {
1048 9985 : let gate_guard = timeline
1049 9985 : .gate
1050 9985 : .enter()
1051 9985 : .map_err(|_| GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled))?;
1052 :
1053 9985 : let shard_index = timeline.tenant_shard_id.to_index();
1054 9985 : let entry = self
1055 9985 : .shard_timelines
1056 9985 : .entry(shard_index)
1057 9985 : .or_insert(HandlerTimeline {
1058 9985 : timeline,
1059 9985 : _guard: gate_guard,
1060 9985 : });
1061 9985 :
1062 9985 : Ok(&entry.timeline)
1063 9985 : }
1064 :
1065 : /// If [`Self::get_cached_timeline_for_page`] missed, then this function is used to populate the cache with
1066 : /// a Timeline to serve requests for this key, if such a Timeline is present on this pageserver. If no such
1067 : /// Timeline is found, then we will return an error (this indicates that the client is talking to the wrong node).
1068 7825 : async fn load_timeline_for_page(
1069 7825 : &mut self,
1070 7825 : tenant_id: TenantId,
1071 7825 : timeline_id: TimelineId,
1072 7825 : key: Key,
1073 7825 : ) -> anyhow::Result<&Arc<Timeline>, GetActiveTimelineError> {
1074 : // Slow path: we must call out to the TenantManager to find the timeline for this Key
1075 7825 : let timeline = self
1076 7825 : .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Page(key))
1077 0 : .await?;
1078 :
1079 7825 : self.cache_timeline(timeline)
1080 7825 : }
1081 :
1082 76043 : async fn get_timeline_shard_zero(
1083 76043 : &mut self,
1084 76043 : tenant_id: TenantId,
1085 76043 : timeline_id: TimelineId,
1086 76043 : ) -> anyhow::Result<&Arc<Timeline>, GetActiveTimelineError> {
1087 : // This is a borrow-checker workaround: we can't return from inside of the `if let Some` because
1088 : // that would be an immutable-borrow-self return, whereas later in the function we will use a mutable
1089 : // ref to self. So instead, we first build a bool, and then return while not borrowing self.
1090 76043 : let have_cached = if let Some((idx, _tl)) = self.shard_timelines.iter().next() {
1091 73883 : idx.shard_number == ShardNumber(0)
1092 : } else {
1093 2160 : false
1094 : };
1095 :
1096 76043 : if have_cached {
1097 73883 : let entry = self.shard_timelines.iter().next().unwrap();
1098 73883 : Ok(&entry.1.timeline)
1099 : } else {
1100 2160 : let timeline = self
1101 2160 : .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero)
1102 0 : .await?;
1103 2160 : Ok(self.cache_timeline(timeline)?)
1104 : }
1105 76043 : }
1106 :
1107 5542311 : #[instrument(skip_all, fields(shard_id))]
1108 : async fn handle_get_page_at_lsn_request(
1109 : &mut self,
1110 : tenant_id: TenantId,
1111 : timeline_id: TimelineId,
1112 : req: &PagestreamGetPageRequest,
1113 : ctx: &RequestContext,
1114 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1115 : let timeline = match self.get_cached_timeline_for_page(req) {
1116 : Ok(tl) => tl,
1117 : Err(key) => {
1118 : match self
1119 : .load_timeline_for_page(tenant_id, timeline_id, key)
1120 : .await
1121 : {
1122 : Ok(t) => t,
1123 : Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
1124 : // We already know this tenant exists in general, because we resolved it at
1125 : // start of connection. Getting a NotFound here indicates that the shard containing
1126 : // the requested page is not present on this node: the client's knowledge of shard->pageserver
1127 : // mapping is out of date.
1128 : //
1129 : // Closing the connection by returning `PageStreamError::Reconnect` has the side effect of rate-limiting the above message, via
1130 : // client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
1131 : // and talk to a different pageserver.
1132 : return Err(PageStreamError::Reconnect(
1133 : "getpage@lsn request routed to wrong shard".into(),
1134 : ));
1135 : }
1136 : Err(e) => return Err(e.into()),
1137 : }
1138 : }
1139 : };
1140 :
1141 : // load_timeline_for_page sets shard_id, but get_cached_timeline_for_page doesn't
1142 : set_tracing_field_shard_id(timeline);
1143 :
1144 : let _timer = timeline
1145 : .query_metrics
1146 : .start_timer(metrics::SmgrQueryType::GetPageAtLsn);
1147 :
1148 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1149 : let lsn =
1150 : Self::wait_or_get_last_lsn(timeline, req.lsn, req.horizon, &latest_gc_cutoff_lsn, ctx)
1151 : .await?;
1152 :
1153 : let page = timeline
1154 : .get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), req.horizon, ctx)
1155 : .await?;
1156 :
1157 : Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
1158 : page,
1159 : }))
1160 : }
1161 :
1162 0 : #[instrument(skip_all, fields(shard_id))]
1163 : async fn handle_get_slru_segment_request(
1164 : &mut self,
1165 : tenant_id: TenantId,
1166 : timeline_id: TimelineId,
1167 : req: &PagestreamGetSlruSegmentRequest,
1168 : ctx: &RequestContext,
1169 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1170 : let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
1171 :
1172 : let _timer = timeline
1173 : .query_metrics
1174 : .start_timer(metrics::SmgrQueryType::GetSlruSegment);
1175 :
1176 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1177 : let lsn =
1178 : Self::wait_or_get_last_lsn(timeline, req.lsn, req.horizon, &latest_gc_cutoff_lsn, ctx)
1179 : .await?;
1180 :
1181 : let kind = SlruKind::from_repr(req.kind)
1182 : .ok_or(PageStreamError::BadRequest("invalid SLRU kind".into()))?;
1183 : let segment = timeline.get_slru_segment(kind, req.segno, lsn, ctx).await?;
1184 :
1185 : Ok(PagestreamBeMessage::GetSlruSegment(
1186 : PagestreamGetSlruSegmentResponse { segment },
1187 : ))
1188 : }
1189 :
1190 : #[allow(clippy::too_many_arguments)]
1191 220 : #[instrument(skip_all, fields(shard_id, ?lsn, ?prev_lsn, %full_backup))]
1192 : async fn handle_basebackup_request<IO>(
1193 : &mut self,
1194 : pgb: &mut PostgresBackend<IO>,
1195 : tenant_id: TenantId,
1196 : timeline_id: TimelineId,
1197 : lsn: Option<Lsn>,
1198 : prev_lsn: Option<Lsn>,
1199 : full_backup: bool,
1200 : gzip: bool,
1201 : ctx: RequestContext,
1202 : ) -> Result<(), QueryError>
1203 : where
1204 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1205 : {
1206 : let started = std::time::Instant::now();
1207 :
1208 : // check that the timeline exists
1209 : let timeline = self
1210 : .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero)
1211 : .await?;
1212 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1213 : if let Some(lsn) = lsn {
1214 : // Backup was requested at a particular LSN. Wait for it to arrive.
1215 220 : info!("waiting for {}", lsn);
1216 : timeline.wait_lsn(lsn, &ctx).await?;
1217 : timeline
1218 : .check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
1219 : .context("invalid basebackup lsn")?;
1220 : }
1221 :
1222 : let lsn_awaited_after = started.elapsed();
1223 :
1224 : // switch client to COPYOUT
1225 : pgb.write_message_noflush(&BeMessage::CopyOutResponse)?;
1226 : self.flush_cancellable(pgb, &timeline.cancel).await?;
1227 :
1228 : // Send a tarball of the latest layer on the timeline. Compress if not
1229 : // fullbackup. TODO Compress in that case too (tests need to be updated)
1230 : if full_backup {
1231 : let mut writer = pgb.copyout_writer();
1232 : basebackup::send_basebackup_tarball(
1233 : &mut writer,
1234 : &timeline,
1235 : lsn,
1236 : prev_lsn,
1237 : full_backup,
1238 : &ctx,
1239 : )
1240 : .await?;
1241 : } else {
1242 : let mut writer = pgb.copyout_writer();
1243 : if gzip {
1244 : let mut encoder = GzipEncoder::with_quality(
1245 : writer,
1246 : // NOTE using fast compression because it's on the critical path
1247 : // for compute startup. For an empty database, we get
1248 : // <100KB with this method. The Level::Best compression method
1249 : // gives us <20KB, but maybe we should add basebackup caching
1250 : // on compute shutdown first.
1251 : async_compression::Level::Fastest,
1252 : );
1253 : basebackup::send_basebackup_tarball(
1254 : &mut encoder,
1255 : &timeline,
1256 : lsn,
1257 : prev_lsn,
1258 : full_backup,
1259 : &ctx,
1260 : )
1261 : .await?;
1262 : // shutdown the encoder to ensure the gzip footer is written
1263 : encoder.shutdown().await?;
1264 : } else {
1265 : basebackup::send_basebackup_tarball(
1266 : &mut writer,
1267 : &timeline,
1268 : lsn,
1269 : prev_lsn,
1270 : full_backup,
1271 : &ctx,
1272 : )
1273 : .await?;
1274 : }
1275 : }
1276 :
1277 : pgb.write_message_noflush(&BeMessage::CopyDone)?;
1278 : self.flush_cancellable(pgb, &timeline.cancel).await?;
1279 :
1280 : let basebackup_after = started
1281 : .elapsed()
1282 : .checked_sub(lsn_awaited_after)
1283 : .unwrap_or(Duration::ZERO);
1284 :
1285 597 : info!(
1286 597 : lsn_await_millis = lsn_awaited_after.as_millis(),
1287 597 : basebackup_millis = basebackup_after.as_millis(),
1288 597 : "basebackup complete"
1289 597 : );
1290 :
1291 : Ok(())
1292 : }
1293 :
1294 : // When accessing the management api, supply None as an argument.
1295 : // When authorizing a tenant, pass the corresponding tenant id.
1296 10713 : fn check_permission(&self, tenant_id: Option<TenantId>) -> Result<(), QueryError> {
1297 10713 : if self.auth.is_none() {
1298 : // auth is set to Trust, nothing to check so just return ok
1299 10630 : return Ok(());
1300 83 : }
1301 83 : // auth is some, just checked above, when auth is some
1302 83 : // then claims are always present because of checks during connection init
1303 83 : // so this expect won't trigger
1304 83 : let claims = self
1305 83 : .claims
1306 83 : .as_ref()
1307 83 : .expect("claims presence already checked");
1308 83 : check_permission(claims, tenant_id).map_err(|e| QueryError::Unauthorized(e.0))
1309 10713 : }
1310 :
1311 : /// Shorthand for getting a reference to a Timeline of an Active tenant.
1312 10642 : async fn get_active_tenant_timeline(
1313 10642 : &self,
1314 10642 : tenant_id: TenantId,
1315 10642 : timeline_id: TimelineId,
1316 10642 : selector: ShardSelector,
1317 10642 : ) -> Result<Arc<Timeline>, GetActiveTimelineError> {
1318 10642 : let tenant = get_active_tenant_with_timeout(
1319 10642 : tenant_id,
1320 10642 : selector,
1321 10642 : ACTIVE_TENANT_TIMEOUT,
1322 10642 : &task_mgr::shutdown_token(),
1323 10642 : )
1324 3 : .await
1325 10642 : .map_err(GetActiveTimelineError::Tenant)?;
1326 10642 : let timeline = tenant.get_timeline(timeline_id, true)?;
1327 10636 : set_tracing_field_shard_id(&timeline);
1328 10636 : Ok(timeline)
1329 10642 : }
1330 : }
1331 :
1332 : #[async_trait::async_trait]
1333 : impl<IO> postgres_backend::Handler<IO> for PageServerHandler
1334 : where
1335 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1336 : {
1337 85 : fn check_auth_jwt(
1338 85 : &mut self,
1339 85 : _pgb: &mut PostgresBackend<IO>,
1340 85 : jwt_response: &[u8],
1341 85 : ) -> Result<(), QueryError> {
1342 : // this unwrap is never triggered, because check_auth_jwt only called when auth_type is NeonJWT
1343 : // which requires auth to be present
1344 85 : let data = self
1345 85 : .auth
1346 85 : .as_ref()
1347 85 : .unwrap()
1348 85 : .decode(str::from_utf8(jwt_response).context("jwt response is not UTF-8")?)
1349 85 : .map_err(|e| QueryError::Unauthorized(e.0))?;
1350 :
1351 85 : if matches!(data.claims.scope, Scope::Tenant) && data.claims.tenant_id.is_none() {
1352 0 : return Err(QueryError::Unauthorized(
1353 0 : "jwt token scope is Tenant, but tenant id is missing".into(),
1354 0 : ));
1355 85 : }
1356 85 :
1357 85 : debug!(
1358 0 : "jwt scope check succeeded for scope: {:#?} by tenant id: {:?}",
1359 0 : data.claims.scope, data.claims.tenant_id,
1360 0 : );
1361 :
1362 85 : self.claims = Some(data.claims);
1363 85 : Ok(())
1364 85 : }
1365 :
1366 10719 : fn startup(
1367 10719 : &mut self,
1368 10719 : _pgb: &mut PostgresBackend<IO>,
1369 10719 : _sm: &FeStartupPacket,
1370 10719 : ) -> Result<(), QueryError> {
1371 10719 : Ok(())
1372 10719 : }
1373 :
1374 0 : #[instrument(skip_all, fields(tenant_id, timeline_id))]
1375 : async fn process_query(
1376 : &mut self,
1377 : pgb: &mut PostgresBackend<IO>,
1378 : query_string: &str,
1379 10735 : ) -> Result<(), QueryError> {
1380 10735 : fail::fail_point!("simulated-bad-compute-connection", |_| {
1381 2 : info!("Hit failpoint for bad connection");
1382 2 : Err(QueryError::SimulatedConnectionError)
1383 10735 : });
1384 :
1385 10733 : let ctx = self.connection_ctx.attached_child();
1386 10733 : debug!("process query {query_string:?}");
1387 10733 : if query_string.starts_with("pagestream ") {
1388 10037 : let (_, params_raw) = query_string.split_at("pagestream ".len());
1389 10037 : let params = params_raw.split(' ').collect::<Vec<_>>();
1390 10037 : if params.len() != 2 {
1391 0 : return Err(QueryError::Other(anyhow::anyhow!(
1392 0 : "invalid param number for pagestream command"
1393 0 : )));
1394 10037 : }
1395 10037 : let tenant_id = TenantId::from_str(params[0])
1396 10037 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1397 10037 : let timeline_id = TimelineId::from_str(params[1])
1398 10037 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1399 :
1400 10037 : tracing::Span::current()
1401 10037 : .record("tenant_id", field::display(tenant_id))
1402 10037 : .record("timeline_id", field::display(timeline_id));
1403 10037 :
1404 10037 : self.check_permission(Some(tenant_id))?;
1405 :
1406 10037 : self.handle_pagerequests(pgb, tenant_id, timeline_id, ctx)
1407 7236081 : .await?;
1408 696 : } else if query_string.starts_with("basebackup ") {
1409 630 : let (_, params_raw) = query_string.split_at("basebackup ".len());
1410 630 : let params = params_raw.split_whitespace().collect::<Vec<_>>();
1411 630 :
1412 630 : if params.len() < 2 {
1413 0 : return Err(QueryError::Other(anyhow::anyhow!(
1414 0 : "invalid param number for basebackup command"
1415 0 : )));
1416 630 : }
1417 :
1418 630 : let tenant_id = TenantId::from_str(params[0])
1419 630 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1420 630 : let timeline_id = TimelineId::from_str(params[1])
1421 630 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1422 :
1423 630 : tracing::Span::current()
1424 630 : .record("tenant_id", field::display(tenant_id))
1425 630 : .record("timeline_id", field::display(timeline_id));
1426 630 :
1427 630 : self.check_permission(Some(tenant_id))?;
1428 :
1429 630 : let lsn = if params.len() >= 3 {
1430 : Some(
1431 205 : Lsn::from_str(params[2])
1432 205 : .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?,
1433 : )
1434 : } else {
1435 425 : None
1436 : };
1437 :
1438 630 : let gzip = if params.len() >= 4 {
1439 205 : if params[3] == "--gzip" {
1440 205 : true
1441 : } else {
1442 0 : return Err(QueryError::Other(anyhow::anyhow!(
1443 0 : "Parameter in position 3 unknown {}",
1444 0 : params[3],
1445 0 : )));
1446 : }
1447 : } else {
1448 425 : false
1449 : };
1450 :
1451 630 : ::metrics::metric_vec_duration::observe_async_block_duration_by_result(
1452 630 : &*metrics::BASEBACKUP_QUERY_TIME,
1453 630 : async move {
1454 630 : self.handle_basebackup_request(
1455 630 : pgb,
1456 630 : tenant_id,
1457 630 : timeline_id,
1458 630 : lsn,
1459 630 : None,
1460 630 : false,
1461 630 : gzip,
1462 630 : ctx,
1463 630 : )
1464 46070 : .await?;
1465 582 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1466 582 : Result::<(), QueryError>::Ok(())
1467 630 : },
1468 630 : )
1469 46070 : .await?;
1470 : }
1471 : // return pair of prev_lsn and last_lsn
1472 66 : else if query_string.starts_with("get_last_record_rlsn ") {
1473 11 : let (_, params_raw) = query_string.split_at("get_last_record_rlsn ".len());
1474 11 : let params = params_raw.split_whitespace().collect::<Vec<_>>();
1475 11 :
1476 11 : if params.len() != 2 {
1477 0 : return Err(QueryError::Other(anyhow::anyhow!(
1478 0 : "invalid param number for get_last_record_rlsn command"
1479 0 : )));
1480 11 : }
1481 :
1482 11 : let tenant_id = TenantId::from_str(params[0])
1483 11 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1484 11 : let timeline_id = TimelineId::from_str(params[1])
1485 11 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1486 :
1487 11 : tracing::Span::current()
1488 11 : .record("tenant_id", field::display(tenant_id))
1489 11 : .record("timeline_id", field::display(timeline_id));
1490 11 :
1491 11 : self.check_permission(Some(tenant_id))?;
1492 9 : async {
1493 9 : let timeline = self
1494 9 : .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero)
1495 0 : .await?;
1496 :
1497 9 : let end_of_timeline = timeline.get_last_record_rlsn();
1498 9 :
1499 9 : pgb.write_message_noflush(&BeMessage::RowDescription(&[
1500 9 : RowDescriptor::text_col(b"prev_lsn"),
1501 9 : RowDescriptor::text_col(b"last_lsn"),
1502 9 : ]))?
1503 9 : .write_message_noflush(&BeMessage::DataRow(&[
1504 9 : Some(end_of_timeline.prev.to_string().as_bytes()),
1505 9 : Some(end_of_timeline.last.to_string().as_bytes()),
1506 9 : ]))?
1507 9 : .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1508 9 : anyhow::Ok(())
1509 9 : }
1510 9 : .instrument(info_span!(
1511 9 : "handle_get_last_record_lsn",
1512 9 : shard_id = tracing::field::Empty
1513 9 : ))
1514 0 : .await?;
1515 : }
1516 : // Same as basebackup, but the result also includes relation data (the actual table and index files).
1517 55 : else if query_string.starts_with("fullbackup ") {
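     : // Syntax, as parsed below (illustrative summary):
     : //   fullbackup <tenant_id> <timeline_id> [lsn [prev_lsn]]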
1518 15 : let (_, params_raw) = query_string.split_at("fullbackup ".len());
1519 15 : let params = params_raw.split_whitespace().collect::<Vec<_>>();
1520 15 :
1521 15 : if params.len() < 2 {
1522 0 : return Err(QueryError::Other(anyhow::anyhow!(
1523 0 : "invalid param number for fullbackup command"
1524 0 : )));
1525 15 : }
1526 :
1527 15 : let tenant_id = TenantId::from_str(params[0])
1528 15 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1529 15 : let timeline_id = TimelineId::from_str(params[1])
1530 15 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1531 :
1532 15 : tracing::Span::current()
1533 15 : .record("tenant_id", field::display(tenant_id))
1534 15 : .record("timeline_id", field::display(timeline_id));
1535 :
1536 : // The caller is responsible for providing correct lsn and prev_lsn.
1537 15 : let lsn = if params.len() > 2 {
1538 : Some(
1539 15 : Lsn::from_str(params[2])
1540 15 : .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?,
1541 : )
1542 : } else {
1543 0 : None
1544 : };
1545 15 : let prev_lsn = if params.len() > 3 {
1546 : Some(
1547 12 : Lsn::from_str(params[3])
1548 12 : .with_context(|| format!("Failed to parse Lsn from {}", params[3]))?,
1549 : )
1550 : } else {
1551 3 : None
1552 : };
1553 :
1554 15 : self.check_permission(Some(tenant_id))?;
1555 :
1556 : // Stream the full backup; this also verifies that the timeline exists.
1557 15 : self.handle_basebackup_request(
1558 15 : pgb,
1559 15 : tenant_id,
1560 15 : timeline_id,
1561 15 : lsn,
1562 15 : prev_lsn,
1563 15 : true,
1564 15 : false,
1565 15 : ctx,
1566 15 : )
1567 8802 : .await?;
1568 15 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1569 40 : } else if query_string.starts_with("import basebackup ") {
1570 : // Import the `base` section (everything but the wal) of a basebackup.
1571 : // Assumes the tenant already exists on this pageserver.
1572 : //
1573 : // Files are scheduled to be persisted to remote storage, and the
1574 : // caller should poll the http api to check when that is done.
1575 : //
1576 : // Example import command:
1577 : // 1. Get start/end LSN from backup_manifest file
1578 : // 2. Run:
1579 : // cat my_backup/base.tar | psql -h $PAGESERVER \
1580 : // -c "import basebackup $TENANT $TIMELINE $START_LSN $END_LSN $PG_VERSION"
1581 12 : let (_, params_raw) = query_string.split_at("import basebackup ".len());
1582 12 : let params = params_raw.split_whitespace().collect::<Vec<_>>();
1583 12 : if params.len() != 5 {
1584 0 : return Err(QueryError::Other(anyhow::anyhow!(
1585 0 : "invalid param number for import basebackup command"
1586 0 : )));
1587 12 : }
1588 12 : let tenant_id = TenantId::from_str(params[0])
1589 12 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1590 12 : let timeline_id = TimelineId::from_str(params[1])
1591 12 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1592 12 : let base_lsn = Lsn::from_str(params[2])
1593 12 : .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?;
1594 12 : let end_lsn = Lsn::from_str(params[3])
1595 12 : .with_context(|| format!("Failed to parse Lsn from {}", params[3]))?;
1596 12 : let pg_version = u32::from_str(params[4])
1597 12 : .with_context(|| format!("Failed to parse pg_version from {}", params[4]))?;
1598 :
1599 12 : tracing::Span::current()
1600 12 : .record("tenant_id", field::display(tenant_id))
1601 12 : .record("timeline_id", field::display(timeline_id));
1602 12 :
1603 12 : self.check_permission(Some(tenant_id))?;
1604 :
1605 12 : match self
1606 12 : .handle_import_basebackup(
1607 12 : pgb,
1608 12 : tenant_id,
1609 12 : timeline_id,
1610 12 : base_lsn,
1611 12 : end_lsn,
1612 12 : pg_version,
1613 12 : ctx,
1614 12 : )
1615 7707 : .await
1616 : {
1617 10 : Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
1618 2 : Err(e) => {
1619 2 : error!("error importing base backup between {base_lsn} and {end_lsn}: {e:?}");
1620 2 : pgb.write_message_noflush(&BeMessage::ErrorResponse(
1621 2 : &e.to_string(),
1622 2 : Some(e.pg_error_code()),
1623 2 : ))?
1624 : }
1625 : };
1626 28 : } else if query_string.starts_with("import wal ") {
1627 : // Import the `pg_wal` section of a basebackup.
1628 : //
1629 : // Files are scheduled to be persisted to remote storage, and the
1630 : // caller should poll the http api to check when that is done.
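     : //
     : // Example import command (illustrative; the archive name is arbitrary and
     : // the parameter order mirrors the parsing below):
     : //   cat my_backup/pg_wal.tar | psql -h $PAGESERVER \
     : //     -c "import wal $TENANT $TIMELINE $START_LSN $END_LSN"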
1631 3 : let (_, params_raw) = query_string.split_at("import wal ".len());
1632 3 : let params = params_raw.split_whitespace().collect::<Vec<_>>();
1633 3 : if params.len() != 4 {
1634 0 : return Err(QueryError::Other(anyhow::anyhow!(
1635 0 : "invalid param number for import wal command"
1636 0 : )));
1637 3 : }
1638 3 : let tenant_id = TenantId::from_str(params[0])
1639 3 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1640 3 : let timeline_id = TimelineId::from_str(params[1])
1641 3 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1642 3 : let start_lsn = Lsn::from_str(params[2])
1643 3 : .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?;
1644 3 : let end_lsn = Lsn::from_str(params[3])
1645 3 : .with_context(|| format!("Failed to parse Lsn from {}", params[3]))?;
1646 :
1647 3 : tracing::Span::current()
1648 3 : .record("tenant_id", field::display(tenant_id))
1649 3 : .record("timeline_id", field::display(timeline_id));
1650 3 :
1651 3 : self.check_permission(Some(tenant_id))?;
1652 :
1653 3 : match self
1654 3 : .handle_import_wal(pgb, tenant_id, timeline_id, start_lsn, end_lsn, ctx)
1655 3685 : .await
1656 : {
1657 3 : Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
1658 0 : Err(e) => {
1659 0 : error!("error importing WAL between {start_lsn} and {end_lsn}: {e:?}");
1660 0 : pgb.write_message_noflush(&BeMessage::ErrorResponse(
1661 0 : &e.to_string(),
1662 0 : Some(e.pg_error_code()),
1663 0 : ))?
1664 : }
1665 : };
1666 25 : } else if query_string.to_ascii_lowercase().starts_with("set ") {
1667 : // Accept SET as a no-op: this matters because psycopg2 executes
1668 : // "SET datestyle TO 'ISO'" on connect.
1669 20 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1670 5 : } else if query_string.starts_with("show ") {
1671 : // show <tenant_id>
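     : // e.g. `psql -h $PAGESERVER -c "show $TENANT"` (illustrative); returns one
     : // row with the tenant configuration values listed below.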
1672 5 : let (_, params_raw) = query_string.split_at("show ".len());
1673 5 : let params = params_raw.split(' ').collect::<Vec<_>>();
1674 5 : if params.len() != 1 {
1675 0 : return Err(QueryError::Other(anyhow::anyhow!(
1676 0 : "invalid param number for config command"
1677 0 : )));
1678 5 : }
1679 5 : let tenant_id = TenantId::from_str(params[0])
1680 5 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1681 :
1682 5 : tracing::Span::current().record("tenant_id", field::display(tenant_id));
1683 5 :
1684 5 : self.check_permission(Some(tenant_id))?;
1685 :
1686 5 : let tenant = get_active_tenant_with_timeout(
1687 5 : tenant_id,
1688 5 : ShardSelector::Zero,
1689 5 : ACTIVE_TENANT_TIMEOUT,
1690 5 : &task_mgr::shutdown_token(),
1691 5 : )
1692 0 : .await?;
1693 5 : pgb.write_message_noflush(&BeMessage::RowDescription(&[
1694 5 : RowDescriptor::int8_col(b"checkpoint_distance"),
1695 5 : RowDescriptor::int8_col(b"checkpoint_timeout"),
1696 5 : RowDescriptor::int8_col(b"compaction_target_size"),
1697 5 : RowDescriptor::int8_col(b"compaction_period"),
1698 5 : RowDescriptor::int8_col(b"compaction_threshold"),
1699 5 : RowDescriptor::int8_col(b"gc_horizon"),
1700 5 : RowDescriptor::int8_col(b"gc_period"),
1701 5 : RowDescriptor::int8_col(b"image_creation_threshold"),
1702 5 : RowDescriptor::int8_col(b"pitr_interval"),
1703 5 : ]))?
1704 5 : .write_message_noflush(&BeMessage::DataRow(&[
1705 5 : Some(tenant.get_checkpoint_distance().to_string().as_bytes()),
1706 5 : Some(
1707 5 : tenant
1708 5 : .get_checkpoint_timeout()
1709 5 : .as_secs()
1710 5 : .to_string()
1711 5 : .as_bytes(),
1712 5 : ),
1713 5 : Some(tenant.get_compaction_target_size().to_string().as_bytes()),
1714 5 : Some(
1715 5 : tenant
1716 5 : .get_compaction_period()
1717 5 : .as_secs()
1718 5 : .to_string()
1719 5 : .as_bytes(),
1720 5 : ),
1721 5 : Some(tenant.get_compaction_threshold().to_string().as_bytes()),
1722 5 : Some(tenant.get_gc_horizon().to_string().as_bytes()),
1723 5 : Some(tenant.get_gc_period().as_secs().to_string().as_bytes()),
1724 5 : Some(tenant.get_image_creation_threshold().to_string().as_bytes()),
1725 5 : Some(tenant.get_pitr_interval().as_secs().to_string().as_bytes()),
1726 5 : ]))?
1727 5 : .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1728 : } else {
1729 0 : return Err(QueryError::Other(anyhow::anyhow!(
1730 0 : "unknown command {query_string}"
1731 0 : )));
1732 : }
1733 :
1734 10358 : Ok(())
1735 21456 : }
1736 : }
1737 :
1738 : impl From<GetActiveTenantError> for QueryError {
1739 12 : fn from(e: GetActiveTenantError) -> Self {
1740 0 : match e {
1741 0 : GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected(
1742 0 : ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
1743 0 : ),
1744 : GetActiveTenantError::Cancelled
1745 : | GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => {
1746 0 : QueryError::Shutdown
1747 : }
1748 12 : e @ GetActiveTenantError::NotFound(_) => QueryError::NotFound(format!("{e}").into()),
1749 0 : e => QueryError::Other(anyhow::anyhow!(e)),
1750 : }
1751 12 : }
1752 : }
1753 :
1754 0 : #[derive(Debug, thiserror::Error)]
1755 : enum GetActiveTimelineError {
1756 : #[error(transparent)]
1757 : Tenant(GetActiveTenantError),
1758 : #[error(transparent)]
1759 : Timeline(#[from] GetTimelineError),
1760 : }
1761 :
1762 : impl From<GetActiveTimelineError> for QueryError {
1763 6 : fn from(e: GetActiveTimelineError) -> Self {
1764 0 : match e {
1765 0 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => QueryError::Shutdown,
1766 0 : GetActiveTimelineError::Tenant(e) => e.into(),
1767 6 : GetActiveTimelineError::Timeline(e) => QueryError::NotFound(format!("{e}").into()),
1768 : }
1769 6 : }
1770 : }
1771 :
1772 5552947 : fn set_tracing_field_shard_id(timeline: &Timeline) {
1773 5552947 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1774 5552947 : tracing::Span::current().record(
1775 5552947 : "shard_id",
1776 5552947 : tracing::field::display(timeline.tenant_shard_id.shard_slug()),
1777 5552947 : );
1778 5552947 : debug_assert_current_span_has_tenant_and_timeline_id();
1779 5552947 : }