Line data Source code
1 : //
2 : //! The Page Service listens for client connections and serves their GetPage@LSN
3 : //! requests.
4 : //
5 : // It is possible to connect here using the usual psql/pgbench/libpq. The
6 : // following commands are currently supported:
7 : // *status* -- show current info about this pageserver,
8 : // *pagestream* -- enter a mode where the smgr and the pageserver talk using
9 : // their custom protocol.
10 : //
11 :
12 : use anyhow::Context;
13 : use async_compression::tokio::write::GzipEncoder;
14 : use bytes::Buf;
15 : use bytes::Bytes;
16 : use futures::stream::FuturesUnordered;
17 : use futures::Stream;
18 : use futures::StreamExt;
19 : use pageserver_api::key::Key;
20 : use pageserver_api::models::TenantState;
21 : use pageserver_api::models::{
22 : PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
23 : PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
24 : PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
25 : PagestreamGetSlruSegmentRequest, PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest,
26 : PagestreamNblocksResponse,
27 : };
28 : use pageserver_api::shard::ShardIndex;
29 : use pageserver_api::shard::ShardNumber;
30 : use postgres_backend::{is_expected_io_error, AuthType, PostgresBackend, QueryError};
31 : use pq_proto::framed::ConnectionError;
32 : use pq_proto::FeStartupPacket;
33 : use pq_proto::{BeMessage, FeMessage, RowDescriptor};
34 : use std::borrow::Cow;
35 : use std::collections::HashMap;
36 : use std::io;
37 : use std::net::TcpListener;
38 : use std::pin::pin;
39 : use std::str;
40 : use std::str::FromStr;
41 : use std::sync::Arc;
42 : use std::time::Duration;
43 : use tokio::io::AsyncWriteExt;
44 : use tokio::io::{AsyncRead, AsyncWrite};
45 : use tokio_util::io::StreamReader;
46 : use tokio_util::sync::CancellationToken;
47 : use tracing::*;
48 : use utils::id::ConnectionId;
49 : use utils::sync::gate::GateGuard;
50 : use utils::{
51 : auth::{Claims, Scope, SwappableJwtAuth},
52 : id::{TenantId, TimelineId},
53 : lsn::Lsn,
54 : simple_rcu::RcuReadGuard,
55 : };
56 :
57 : use crate::auth::check_permission;
58 : use crate::basebackup;
59 : use crate::config::PageServerConf;
60 : use crate::context::{DownloadBehavior, RequestContext};
61 : use crate::import_datadir::import_wal_from_tar;
62 : use crate::metrics;
63 : use crate::metrics::LIVE_CONNECTIONS_COUNT;
64 : use crate::pgdatadir_mapping::Version;
65 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
66 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
67 : use crate::task_mgr;
68 : use crate::task_mgr::TaskKind;
69 : use crate::tenant::mgr;
70 : use crate::tenant::mgr::get_active_tenant_with_timeout;
71 : use crate::tenant::mgr::GetActiveTenantError;
72 : use crate::tenant::mgr::ShardSelector;
73 : use crate::tenant::timeline::WaitLsnError;
74 : use crate::tenant::GetTimelineError;
75 : use crate::tenant::PageReconstructError;
76 : use crate::tenant::Timeline;
77 : use crate::trace::Tracer;
78 : use pageserver_api::key::rel_block_to_key;
79 : use pageserver_api::reltag::SlruKind;
80 : use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
81 : use postgres_ffi::BLCKSZ;
82 :
83 : // How long we may wait for a [`TenantSlot::InProgress`] and/or a [`Tenant`] which
84 : // is not yet in state [`TenantState::Active`].
85 : const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
86 :
87 : /// Read the end of a tar archive.
88 : ///
89 : /// A tar archive normally ends with two consecutive blocks of zeros, 512 bytes each.
90 : /// `tokio_tar` already read the first such block. Read the second all-zeros block,
91 : /// and check that there is no more data after the EOF marker.
92 : ///
93 : /// The 'tar' command can also write extra blocks of zeros, up to a record
94 : /// size, controlled by the --record-size argument. Ignore them too.
95 0 : async fn read_tar_eof(mut reader: (impl AsyncRead + Unpin)) -> anyhow::Result<()> {
96 0 : use tokio::io::AsyncReadExt;
97 0 : let mut buf = [0u8; 512];
98 0 :
99 0 : // Read the all-zeros block, and verify it
100 0 : let mut total_bytes = 0;
101 0 : while total_bytes < 512 {
102 0 : let nbytes = reader.read(&mut buf[total_bytes..]).await?;
103 0 : total_bytes += nbytes;
104 0 : if nbytes == 0 {
105 0 : break;
106 0 : }
107 : }
108 0 : if total_bytes < 512 {
109 0 : anyhow::bail!("incomplete or invalid tar EOF marker");
110 0 : }
111 0 : if !buf.iter().all(|&x| x == 0) {
112 0 : anyhow::bail!("invalid tar EOF marker");
113 0 : }
114 0 :
115 0 : // Drain any extra zero-blocks after the EOF marker
116 0 : let mut trailing_bytes = 0;
117 0 : let mut seen_nonzero_bytes = false;
118 : loop {
119 0 : let nbytes = reader.read(&mut buf).await?;
120 0 : trailing_bytes += nbytes;
121 0 : if !buf.iter().all(|&x| x == 0) {
122 0 : seen_nonzero_bytes = true;
123 0 : }
124 0 : if nbytes == 0 {
125 0 : break;
126 0 : }
127 : }
128 0 : if seen_nonzero_bytes {
129 0 : anyhow::bail!("unexpected non-zero bytes after the tar archive");
130 0 : }
131 0 : if trailing_bytes % 512 != 0 {
132 0 : anyhow::bail!("unexpected number of zeros ({trailing_bytes}), not divisible by tar block size (512 bytes), after the tar archive");
133 0 : }
134 0 : Ok(())
135 0 : }
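// A minimal, hypothetical test sketch (not part of the original file; it assumes the
// crate has tokio's test macros available) illustrating `read_tar_eof`: extra all-zeros
// record padding after the EOF marker is accepted, while trailing garbage is rejected.
#[cfg(test)]
mod read_tar_eof_sketch {
    use super::read_tar_eof;

    #[tokio::test]
    async fn accepts_zero_padding_and_rejects_garbage() {
        // The second all-zeros block plus two extra zero blocks of record padding.
        let ok = vec![0u8; 512 * 3];
        assert!(read_tar_eof(&ok[..]).await.is_ok());

        // Non-zero bytes after the EOF marker must be rejected.
        let mut bad = vec![0u8; 512];
        bad.extend_from_slice(b"unexpected trailing data");
        assert!(read_tar_eof(&bad[..]).await.is_err());
    }
}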
136 :
137 : ///////////////////////////////////////////////////////////////////////////////
138 :
139 : ///
140 : /// Main loop of the page service.
141 : ///
142 : /// Listens for connections, and launches a new handler task for each.
143 : ///
144 0 : pub async fn libpq_listener_main(
145 0 : conf: &'static PageServerConf,
146 0 : broker_client: storage_broker::BrokerClientChannel,
147 0 : auth: Option<Arc<SwappableJwtAuth>>,
148 0 : listener: TcpListener,
149 0 : auth_type: AuthType,
150 0 : listener_ctx: RequestContext,
151 0 : cancel: CancellationToken,
152 0 : ) -> anyhow::Result<()> {
153 0 : listener.set_nonblocking(true)?;
154 0 : let tokio_listener = tokio::net::TcpListener::from_std(listener)?;
155 :
156 : // Wait for a new connection to arrive, or for server shutdown.
157 0 : while let Some(res) = tokio::select! {
158 : biased;
159 :
160 : _ = cancel.cancelled() => {
161 : // We were requested to shut down.
162 : None
163 : }
164 :
165 0 : res = tokio_listener.accept() => {
166 : Some(res)
167 : }
168 : } {
169 0 : match res {
170 0 : Ok((socket, peer_addr)) => {
171 0 : // Connection established. Spawn a new task to handle it.
172 0 : debug!("accepted connection from {}", peer_addr);
173 0 : let local_auth = auth.clone();
174 0 :
175 0 : let connection_ctx = listener_ctx
176 0 : .detached_child(TaskKind::PageRequestHandler, DownloadBehavior::Download);
177 0 :
178 0 : // PageRequestHandler tasks are not associated with any particular
179 0 : // timeline in the task manager. In practice most connections will
180 0 : // only deal with a particular timeline, but we don't know which one
181 0 : // yet.
182 0 : task_mgr::spawn(
183 0 : &tokio::runtime::Handle::current(),
184 0 : TaskKind::PageRequestHandler,
185 0 : None,
186 0 : None,
187 0 : "serving compute connection task",
188 0 : false,
189 0 : page_service_conn_main(
190 0 : conf,
191 0 : broker_client.clone(),
192 0 : local_auth,
193 0 : socket,
194 0 : auth_type,
195 0 : connection_ctx,
196 0 : ),
197 0 : );
198 : }
199 0 : Err(err) => {
200 0 : // accept() failed. Log the error, and loop back to retry on next connection.
201 0 : error!("accept() failed: {:?}", err);
202 : }
203 : }
204 : }
205 :
206 0 : debug!("page_service loop terminated");
207 :
208 0 : Ok(())
209 0 : }
210 :
211 0 : #[instrument(skip_all, fields(peer_addr))]
212 : async fn page_service_conn_main(
213 : conf: &'static PageServerConf,
214 : broker_client: storage_broker::BrokerClientChannel,
215 : auth: Option<Arc<SwappableJwtAuth>>,
216 : socket: tokio::net::TcpStream,
217 : auth_type: AuthType,
218 : connection_ctx: RequestContext,
219 : ) -> anyhow::Result<()> {
220 : // Immediately increment the gauge, then create a job to decrement it on task exit.
221 : // One of the pros of `defer!` is that this will *most probably*
222 : // get called, even in presence of panics.
223 : let gauge = LIVE_CONNECTIONS_COUNT.with_label_values(&["page_service"]);
224 : gauge.inc();
225 0 : scopeguard::defer! {
226 0 : gauge.dec();
227 0 : }
228 :
229 : socket
230 : .set_nodelay(true)
231 : .context("could not set TCP_NODELAY")?;
232 :
233 : let peer_addr = socket.peer_addr().context("get peer address")?;
234 : tracing::Span::current().record("peer_addr", field::display(peer_addr));
235 :
236 : // set up a read timeout of 10 minutes. the timeout is rather arbitrary; the requirements are:
237 : // - long enough for most valid compute connections
238 : // - less than infinite to stop us from "leaking" connections to long-gone computes
239 : //
240 : // no write timeout is used, because the kernel is assumed to error writes after some time.
241 : let mut socket = tokio_io_timeout::TimeoutReader::new(socket);
242 :
243 : let default_timeout_ms = 10 * 60 * 1000; // 10 minutes by default
244 0 : let socket_timeout_ms = (|| {
245 0 : fail::fail_point!("simulated-bad-compute-connection", |avg_timeout_ms| {
246 : // Exponential distribution for simulating
247 : // poor network conditions; avg_timeout_ms is expected to be around 15
248 : // in tests.
249 0 : if let Some(avg_timeout_ms) = avg_timeout_ms {
250 0 : let avg = avg_timeout_ms.parse::<i64>().unwrap() as f32;
251 0 : let u = rand::random::<f32>();
252 0 : ((1.0 - u).ln() / (-avg)) as u64
253 : } else {
254 0 : default_timeout_ms
255 : }
256 0 : });
257 0 : default_timeout_ms
258 : })();
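// Note (added for orientation): the failpoint above uses inverse-transform sampling:
// for u ~ Uniform(0, 1), -ln(1 - u) is a standard Exp(1) sample, and dividing by `avg`
// rescales it, so a larger `avg` yields shorter simulated timeouts on average.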
259 :
260 : // A timeout here does not mean the client died, it can happen if it's just idle for
261 : // a while: we will tear down this PageServerHandler and instantiate a new one if/when
262 : // they reconnect.
263 : socket.set_timeout(Some(std::time::Duration::from_millis(socket_timeout_ms)));
264 : let socket = std::pin::pin!(socket);
265 :
266 : // XXX: pgbackend.run() should take the connection_ctx,
267 : // and create a child per-query context when it invokes process_query.
268 : // But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
269 : // and create the per-query context in process_query ourselves.
270 : let mut conn_handler = PageServerHandler::new(conf, broker_client, auth, connection_ctx);
271 : let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
272 :
273 : match pgbackend
274 : .run(&mut conn_handler, task_mgr::shutdown_watcher)
275 : .await
276 : {
277 : Ok(()) => {
278 : // we've been requested to shut down
279 : Ok(())
280 : }
281 : Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
282 : if is_expected_io_error(&io_error) {
283 0 : info!("Postgres client disconnected ({io_error})");
284 : Ok(())
285 : } else {
286 : Err(io_error).context("Postgres connection error")
287 : }
288 : }
289 : other => other.context("Postgres query error"),
290 : }
291 : }
292 :
293 : /// While a handler holds a reference to a Timeline, it also holds the
294 : /// timeline's Gate open.
295 : struct HandlerTimeline {
296 : timeline: Arc<Timeline>,
297 : _guard: GateGuard,
298 : }
299 :
300 : struct PageServerHandler {
301 : _conf: &'static PageServerConf,
302 : broker_client: storage_broker::BrokerClientChannel,
303 : auth: Option<Arc<SwappableJwtAuth>>,
304 : claims: Option<Claims>,
305 :
306 : /// The context created for the lifetime of the connection
307 : /// serviced by this PageServerHandler.
308 : /// For each query received over the connection,
309 : /// `process_query` creates a child context from this one.
310 : connection_ctx: RequestContext,
311 :
312 : /// See [`Self::cache_timeline`] for usage.
313 : ///
314 : /// Note on size: the typical size of this map is 1. The largest size we expect
315 : /// to see is the number of shards divided by the number of pageservers (typically < 2),
316 : /// or the ratio used when splitting shards (i.e. how many children are created from one
317 : /// parent shard), where a "large" number might be ~8.
318 : shard_timelines: HashMap<ShardIndex, HandlerTimeline>,
319 : }
320 :
321 0 : #[derive(thiserror::Error, Debug)]
322 : enum PageStreamError {
323 : /// We encountered an error that should prompt the client to reconnect:
324 : /// in practice this means we drop the connection without sending a response.
325 : #[error("Reconnect required: {0}")]
326 : Reconnect(Cow<'static, str>),
327 :
328 : /// We were instructed to shutdown while processing the query
329 : #[error("Shutting down")]
330 : Shutdown,
331 :
332 : /// Something went wrong reading a page: this likely indicates a pageserver bug
333 : #[error("Read error")]
334 : Read(#[source] PageReconstructError),
335 :
336 : /// Ran out of time waiting for an LSN
337 : #[error("LSN timeout: {0}")]
338 : LsnTimeout(WaitLsnError),
339 :
340 : /// The entity required to serve the request (tenant or timeline) is not found,
341 : /// or is not in a suitable state to serve the request.
342 : #[error("Not found: {0}")]
343 : NotFound(Cow<'static, str>),
344 :
345 : /// Request asked for something that doesn't make sense, like an invalid LSN
346 : #[error("Bad request: {0}")]
347 : BadRequest(Cow<'static, str>),
348 : }
349 :
350 : impl From<PageReconstructError> for PageStreamError {
351 0 : fn from(value: PageReconstructError) -> Self {
352 0 : match value {
353 0 : PageReconstructError::Cancelled => Self::Shutdown,
354 0 : e => Self::Read(e),
355 : }
356 0 : }
357 : }
358 :
359 : impl From<GetActiveTimelineError> for PageStreamError {
360 0 : fn from(value: GetActiveTimelineError) -> Self {
361 0 : match value {
362 0 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => Self::Shutdown,
363 0 : GetActiveTimelineError::Tenant(e) => Self::NotFound(format!("{e}").into()),
364 0 : GetActiveTimelineError::Timeline(e) => Self::NotFound(format!("{e}").into()),
365 : }
366 0 : }
367 : }
368 :
369 : impl From<WaitLsnError> for PageStreamError {
370 0 : fn from(value: WaitLsnError) -> Self {
371 0 : match value {
372 0 : e @ WaitLsnError::Timeout(_) => Self::LsnTimeout(e),
373 0 : WaitLsnError::Shutdown => Self::Shutdown,
374 0 : WaitLsnError::BadState => Self::Reconnect("Timeline is not active".into()),
375 : }
376 0 : }
377 : }
378 :
379 : impl From<WaitLsnError> for QueryError {
380 0 : fn from(value: WaitLsnError) -> Self {
381 0 : match value {
382 0 : e @ WaitLsnError::Timeout(_) => Self::Other(anyhow::Error::new(e)),
383 0 : WaitLsnError::Shutdown => Self::Shutdown,
384 0 : WaitLsnError::BadState => Self::Reconnect,
385 : }
386 0 : }
387 : }
388 :
389 : impl PageServerHandler {
390 0 : pub fn new(
391 0 : conf: &'static PageServerConf,
392 0 : broker_client: storage_broker::BrokerClientChannel,
393 0 : auth: Option<Arc<SwappableJwtAuth>>,
394 0 : connection_ctx: RequestContext,
395 0 : ) -> Self {
396 0 : PageServerHandler {
397 0 : _conf: conf,
398 0 : broker_client,
399 0 : auth,
400 0 : claims: None,
401 0 : connection_ctx,
402 0 : shard_timelines: HashMap::new(),
403 0 : }
404 0 : }
405 :
406 : /// Future that completes when we need to shut down the connection.
407 : ///
408 : /// We currently need to shut down when any of the following happens:
409 : /// 1. any of the timelines we hold GateGuards for in `shard_timelines` is cancelled
410 : /// 2. task_mgr requests shutdown of the connection
411 : ///
412 : /// NB on (1): the connection's lifecycle is not actually tied to any of the
413 : /// `shard_timelines`s' lifecycles. But it's _necessary_ in the current
414 : /// implementation to be responsive to timeline cancellation because
415 : /// the connection holds their `GateGuards` open (stored in `shard_timelines`).
416 : /// We currently do the easy thing and terminate the connection if any of the
417 : /// shard_timelines gets cancelled. But really, we could spend more effort
418 : /// and simply remove the cancelled timeline from the `shard_timelines`, thereby
419 : /// dropping the guard.
420 : ///
421 : /// NB: keep in sync with [`Self::is_connection_cancelled`]
422 0 : async fn await_connection_cancelled(&self) {
423 0 : // A short wait before we expend the cycles to walk our timeline map. This avoids incurring
424 0 : // that cost every time we check for cancellation.
425 0 : tokio::time::sleep(Duration::from_millis(10)).await;
426 :
427 : // This function is never called concurrently with code that adds timelines to shard_timelines,
428 : // which is enforced by the borrow checker (the future returned by this function carries the
429 : // immutable &self). So it's fine to evaluate shard_timelines after the sleep; we don't risk
430 : // missing any inserts to the map.
431 :
432 0 : let mut cancellation_sources = Vec::with_capacity(1 + self.shard_timelines.len());
433 0 : use futures::future::Either;
434 0 : cancellation_sources.push(Either::Left(task_mgr::shutdown_watcher()));
435 0 : cancellation_sources.extend(
436 0 : self.shard_timelines
437 0 : .values()
438 0 : .map(|ht| Either::Right(ht.timeline.cancel.cancelled())),
439 0 : );
440 0 : FuturesUnordered::from_iter(cancellation_sources)
441 0 : .next()
442 0 : .await;
443 0 : }
444 :
445 : /// Checking variant of [`Self::await_connection_cancelled`].
446 0 : fn is_connection_cancelled(&self) -> bool {
447 0 : task_mgr::is_shutdown_requested()
448 0 : || self
449 0 : .shard_timelines
450 0 : .values()
451 0 : .any(|ht| ht.timeline.cancel.is_cancelled() || ht.timeline.is_stopping())
452 0 : }
453 :
454 : /// This function always respects cancellation of any timeline in [`Self::shard_timelines`]. Pass in
455 : /// a cancellation token at the next scope up (such as a tenant cancellation token) to ensure we respect
456 : /// cancellation if there aren't any timelines in the cache.
457 : ///
458 : /// If calling from a function that doesn't use the [`Self::shard_timelines`] cache, then pass in the
459 : /// timeline cancellation token.
460 0 : async fn flush_cancellable<IO>(
461 0 : &self,
462 0 : pgb: &mut PostgresBackend<IO>,
463 0 : cancel: &CancellationToken,
464 0 : ) -> Result<(), QueryError>
465 0 : where
466 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
467 0 : {
468 0 : tokio::select!(
469 0 : flush_r = pgb.flush() => {
470 : Ok(flush_r?)
471 : },
472 : _ = self.await_connection_cancelled() => {
473 : Err(QueryError::Shutdown)
474 : }
475 : _ = cancel.cancelled() => {
476 : Err(QueryError::Shutdown)
477 : }
478 : )
479 0 : }
480 :
481 0 : fn copyin_stream<'a, IO>(
482 0 : &'a self,
483 0 : pgb: &'a mut PostgresBackend<IO>,
484 0 : cancel: &'a CancellationToken,
485 0 : ) -> impl Stream<Item = io::Result<Bytes>> + 'a
486 0 : where
487 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
488 0 : {
489 0 : async_stream::try_stream! {
490 : loop {
491 0 : let msg = tokio::select! {
492 : biased;
493 :
494 : _ = cancel.cancelled() => {
495 : // We were requested to shut down.
496 : let msg = "pageserver is shutting down";
497 : let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, None));
498 : Err(QueryError::Shutdown)
499 : }
500 :
501 0 : msg = pgb.read_message() => { msg.map_err(QueryError::from)}
502 : };
503 :
504 0 : match msg {
505 0 : Ok(Some(message)) => {
506 0 : let copy_data_bytes = match message {
507 0 : FeMessage::CopyData(bytes) => bytes,
508 0 : FeMessage::CopyDone => { break },
509 0 : FeMessage::Sync => continue,
510 : FeMessage::Terminate => {
511 0 : let msg = "client terminated connection with Terminate message during COPY";
512 0 : let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
513 0 : // error can't happen here, ErrorResponse serialization should be always ok
514 0 : pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
515 0 : Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
516 0 : break;
517 : }
518 0 : m => {
519 0 : let msg = format!("unexpected message {m:?}");
520 0 : // error can't happen here, ErrorResponse serialization should be always ok
521 0 : pgb.write_message_noflush(&BeMessage::ErrorResponse(&msg, None)).map_err(|e| e.into_io_error())?;
522 0 : Err(io::Error::new(io::ErrorKind::Other, msg))?;
523 0 : break;
524 : }
525 : };
526 :
527 0 : yield copy_data_bytes;
528 : }
529 : Ok(None) => {
530 0 : let msg = "client closed connection during COPY";
531 0 : let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
532 0 : // error can't happen here, ErrorResponse serialization should be always ok
533 0 : pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
534 0 : self.flush_cancellable(pgb, cancel).await.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
535 0 : Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
536 : }
537 0 : Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
538 0 : Err(io_error)?;
539 : }
540 0 : Err(other) => {
541 0 : Err(io::Error::new(io::ErrorKind::Other, other.to_string()))?;
542 : }
543 : };
544 : }
545 : }
546 0 : }
547 :
548 0 : #[instrument(skip_all)]
549 : async fn handle_pagerequests<IO>(
550 : &mut self,
551 : pgb: &mut PostgresBackend<IO>,
552 : tenant_id: TenantId,
553 : timeline_id: TimelineId,
554 : ctx: RequestContext,
555 : ) -> Result<(), QueryError>
556 : where
557 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
558 : {
559 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
560 :
561 : let tenant = mgr::get_active_tenant_with_timeout(
562 : tenant_id,
563 : ShardSelector::First,
564 : ACTIVE_TENANT_TIMEOUT,
565 : &task_mgr::shutdown_token(),
566 : )
567 : .await?;
568 :
569 : // Make request tracer if needed
570 : let mut tracer = if tenant.get_trace_read_requests() {
571 : let connection_id = ConnectionId::generate();
572 : let path =
573 : tenant
574 : .conf
575 : .trace_path(&tenant.tenant_shard_id(), &timeline_id, &connection_id);
576 : Some(Tracer::new(path))
577 : } else {
578 : None
579 : };
580 :
581 : // switch client to COPYBOTH
582 : pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
583 : self.flush_cancellable(pgb, &tenant.cancel).await?;
584 :
585 : loop {
586 0 : let msg = tokio::select! {
587 : biased;
588 :
589 : _ = self.await_connection_cancelled() => {
590 : // We were requested to shut down.
591 0 : info!("shutdown request received in page handler");
592 : return Err(QueryError::Shutdown)
593 : }
594 :
595 0 : msg = pgb.read_message() => { msg }
596 : };
597 :
598 : let copy_data_bytes = match msg? {
599 : Some(FeMessage::CopyData(bytes)) => bytes,
600 : Some(FeMessage::Terminate) => break,
601 : Some(m) => {
602 : return Err(QueryError::Other(anyhow::anyhow!(
603 : "unexpected message: {m:?} during COPY"
604 : )));
605 : }
606 : None => break, // client disconnected
607 : };
608 :
609 0 : trace!("query: {copy_data_bytes:?}");
610 :
611 : // Trace request if needed
612 : if let Some(t) = tracer.as_mut() {
613 : t.trace(&copy_data_bytes)
614 : }
615 :
616 : let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
617 :
618 : // TODO: We could create a new per-request context here, with unique ID.
619 : // Currently we use the same per-timeline context for all requests
620 :
621 : let (response, span) = match neon_fe_msg {
622 : PagestreamFeMessage::Exists(req) => {
623 : let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.lsn);
624 : (
625 : self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx)
626 : .instrument(span.clone())
627 : .await,
628 : span,
629 : )
630 : }
631 : PagestreamFeMessage::Nblocks(req) => {
632 : let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.lsn);
633 : (
634 : self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx)
635 : .instrument(span.clone())
636 : .await,
637 : span,
638 : )
639 : }
640 : PagestreamFeMessage::GetPage(req) => {
641 : // shard_id is filled in by the handler
642 : let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn);
643 : (
644 : self.handle_get_page_at_lsn_request(tenant_id, timeline_id, &req, &ctx)
645 : .instrument(span.clone())
646 : .await,
647 : span,
648 : )
649 : }
650 : PagestreamFeMessage::DbSize(req) => {
651 : let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.lsn);
652 : (
653 : self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx)
654 : .instrument(span.clone())
655 : .await,
656 : span,
657 : )
658 : }
659 : PagestreamFeMessage::GetSlruSegment(req) => {
660 : let span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.lsn);
661 : (
662 : self.handle_get_slru_segment_request(tenant_id, timeline_id, &req, &ctx)
663 : .instrument(span.clone())
664 : .await,
665 : span,
666 : )
667 : }
668 : };
669 :
670 : match response {
671 : Err(PageStreamError::Shutdown) => {
672 : // If we fail to fulfil a request during shutdown, which may be _because_ of
673 : // shutdown, then do not send the error to the client. Instead just drop the
674 : // connection.
675 0 : span.in_scope(|| info!("dropping connection due to shutdown"));
676 : return Err(QueryError::Shutdown);
677 : }
678 : Err(PageStreamError::Reconnect(reason)) => {
679 0 : span.in_scope(|| info!("handler requested reconnect: {reason}"));
680 : return Err(QueryError::Reconnect);
681 : }
682 : Err(e) if self.is_connection_cancelled() => {
683 : // This branch accommodates code within request handlers that returns an anyhow::Error instead of a clean
684 : // shutdown error; this may be buried inside a PageReconstructError::Other, for example.
685 : //
686 : // Requests may fail as soon as we are Stopping, even if the Timeline's cancellation token wasn't fired yet,
687 : // because wait_lsn etc. will drop out:
688 : // is_stopping(): [`Timeline::flush_and_shutdown`] has entered
689 : // is_canceled(): [`Timeline::shutdown`] has entered
690 0 : span.in_scope(|| info!("dropped error response during shutdown: {e:#}"));
691 : return Err(QueryError::Shutdown);
692 : }
693 : r => {
694 0 : let response_msg = r.unwrap_or_else(|e| {
695 0 : // print all the details to the log with {:#}, but for the client the
696 0 : // error message is enough. Do not log if shutting down, as the anyhow::Error
697 0 : // here includes cancellation which is not an error.
698 0 : let full = utils::error::report_compact_sources(&e);
699 0 : span.in_scope(|| {
700 0 : error!("error reading relation or page version: {full:#}")
701 0 : });
702 0 : PagestreamBeMessage::Error(PagestreamErrorResponse {
703 0 : message: e.to_string(),
704 0 : })
705 0 : });
706 :
707 : pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
708 : self.flush_cancellable(pgb, &tenant.cancel).await?;
709 : }
710 : }
711 : }
712 : Ok(())
713 : }
714 :
715 : #[allow(clippy::too_many_arguments)]
716 0 : #[instrument(skip_all, fields(%base_lsn, end_lsn=%_end_lsn, %pg_version))]
717 : async fn handle_import_basebackup<IO>(
718 : &self,
719 : pgb: &mut PostgresBackend<IO>,
720 : tenant_id: TenantId,
721 : timeline_id: TimelineId,
722 : base_lsn: Lsn,
723 : _end_lsn: Lsn,
724 : pg_version: u32,
725 : ctx: RequestContext,
726 : ) -> Result<(), QueryError>
727 : where
728 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
729 : {
730 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
731 :
732 : // Create empty timeline
733 0 : info!("creating new timeline");
734 : let tenant = get_active_tenant_with_timeout(
735 : tenant_id,
736 : ShardSelector::Zero,
737 : ACTIVE_TENANT_TIMEOUT,
738 : &task_mgr::shutdown_token(),
739 : )
740 : .await?;
741 : let timeline = tenant
742 : .create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
743 : .await?;
744 :
745 : // TODO mark timeline as not ready until it reaches end_lsn.
746 : // We might have some wal to import as well, and we should prevent compute
747 : // from connecting before that and writing conflicting wal.
748 : //
749 : // This is not relevant for pageserver->pageserver migrations, since there's
750 : // no wal to import. But should be fixed if we want to import from postgres.
751 :
752 : // TODO leave clean state on error. For now you can use detach to clean
753 : // up broken state from a failed import.
754 :
755 : // Import basebackup provided via CopyData
756 0 : info!("importing basebackup");
757 : pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
758 : self.flush_cancellable(pgb, &tenant.cancel).await?;
759 :
760 : let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb, &tenant.cancel)));
761 : timeline
762 : .import_basebackup_from_tar(
763 : tenant.clone(),
764 : &mut copyin_reader,
765 : base_lsn,
766 : self.broker_client.clone(),
767 : &ctx,
768 : )
769 : .await?;
770 :
771 : // Read the end of the tar archive.
772 : read_tar_eof(copyin_reader).await?;
773 :
774 : // TODO check checksum
775 : // Meanwhile you can verify client-side by taking fullbackup
776 : // and checking that it matches in size with what was imported.
777 : // It wouldn't work if base came from vanilla postgres though,
778 : // since we discard some log files.
779 :
780 0 : info!("done");
781 : Ok(())
782 : }
783 :
784 0 : #[instrument(skip_all, fields(shard_id, %start_lsn, %end_lsn))]
785 : async fn handle_import_wal<IO>(
786 : &self,
787 : pgb: &mut PostgresBackend<IO>,
788 : tenant_id: TenantId,
789 : timeline_id: TimelineId,
790 : start_lsn: Lsn,
791 : end_lsn: Lsn,
792 : ctx: RequestContext,
793 : ) -> Result<(), QueryError>
794 : where
795 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
796 : {
797 : let timeline = self
798 : .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero)
799 : .await?;
800 : let last_record_lsn = timeline.get_last_record_lsn();
801 : if last_record_lsn != start_lsn {
802 : return Err(QueryError::Other(
803 : anyhow::anyhow!("Cannot import WAL from Lsn {start_lsn} because timeline does not start from the same lsn: {last_record_lsn}"))
804 : );
805 : }
806 :
807 : // TODO leave clean state on error. For now you can use detach to clean
808 : // up broken state from a failed import.
809 :
810 : // Import wal provided via CopyData
811 0 : info!("importing wal");
812 : pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
813 : self.flush_cancellable(pgb, &timeline.cancel).await?;
814 : let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb, &timeline.cancel)));
815 : import_wal_from_tar(&timeline, &mut copyin_reader, start_lsn, end_lsn, &ctx).await?;
816 0 : info!("wal import complete");
817 :
818 : // Read the end of the tar archive.
819 : read_tar_eof(copyin_reader).await?;
820 :
821 : // TODO Does it make sense to overshoot?
822 : if timeline.get_last_record_lsn() < end_lsn {
823 : return Err(QueryError::Other(
824 : anyhow::anyhow!("WAL import from {start_lsn} did not reach the expected end LSN {end_lsn}"))
825 : );
826 : }
827 :
828 : // Flush data to disk, then upload to s3. No need for a forced checkpoint.
829 : // We only want to persist the data, and it doesn't matter if it's in the
830 : // shape of deltas or images.
831 0 : info!("flushing layers");
832 : timeline.freeze_and_flush().await?;
833 :
834 0 : info!("done");
835 : Ok(())
836 : }
837 :
838 : /// Helper function to handle the LSN from client request.
839 : ///
840 : /// Each GetPage (and Exists and Nblocks) request includes information about
841 : /// which version of the page is being requested. The client can request the
842 : /// latest version of the page, or the version that's valid at a particular
843 : /// LSN. The primary compute node will always request the latest page
844 : /// version, while a standby will request a version at the LSN that it's
845 : /// currently caught up to.
846 : ///
847 : /// In either case, if the page server hasn't received the WAL up to the
848 : /// requested LSN yet, we will wait for it to arrive. The return value is
849 : /// the LSN that should be used to look up the page versions.
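///
/// Illustrative summary (derived from the logic below):
/// - `latest == true` and `lsn <= last_record_lsn`: return `last_record_lsn` immediately.
/// - `latest == true` and `lsn > last_record_lsn`: wait for `lsn` to arrive, then return `lsn`.
/// - `latest == false`: `lsn` must be non-zero; wait for it to arrive, then return it.
/// - In every case the resolved LSN must not be below the GC cutoff, otherwise `BadRequest` is returned.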
850 0 : async fn wait_or_get_last_lsn(
851 0 : timeline: &Timeline,
852 0 : mut lsn: Lsn,
853 0 : latest: bool,
854 0 : latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
855 0 : ctx: &RequestContext,
856 0 : ) -> Result<Lsn, PageStreamError> {
857 0 : if latest {
858 : // Latest page version was requested. If LSN is given, it is a hint
859 : // to the page server that there have been no modifications to the
860 : // page after that LSN. If we haven't received WAL up to that point,
861 : // wait until it arrives.
862 0 : let last_record_lsn = timeline.get_last_record_lsn();
863 0 :
864 0 : // Note: this covers the special case that lsn == Lsn(0). That
865 0 : // special case means "return the latest version whatever it is",
866 0 : // and it's used for bootstrapping purposes, when the page server is
867 0 : // connected directly to the compute node. That is needed because
868 0 : // when you connect to the compute node, to receive the WAL, the
869 0 : // walsender process will do a look up in the pg_authid catalog
870 0 : // table for authentication. That poses a deadlock problem: the
871 0 : // catalog table lookup will send a GetPage request, but the GetPage
872 0 : // request will block in the page server because the recent WAL
873 0 : // hasn't been received yet, and it cannot be received until the
874 0 : // walsender completes the authentication and starts streaming the
875 0 : // WAL.
876 0 : if lsn <= last_record_lsn {
877 0 : lsn = last_record_lsn;
878 0 : } else {
879 0 : timeline
880 0 : .wait_lsn(
881 0 : lsn,
882 0 : crate::tenant::timeline::WaitLsnWaiter::PageService,
883 0 : ctx,
884 0 : )
885 0 : .await?;
886 : // Since we waited for 'lsn' to arrive, that is now the last
887 : // record LSN. (Or close enough for our purposes; the
888 : // last-record LSN can advance immediately after we return
889 : // anyway)
890 : }
891 : } else {
892 0 : if lsn == Lsn(0) {
893 0 : return Err(PageStreamError::BadRequest(
894 0 : "invalid LSN(0) in request".into(),
895 0 : ));
896 0 : }
897 0 : timeline
898 0 : .wait_lsn(
899 0 : lsn,
900 0 : crate::tenant::timeline::WaitLsnWaiter::PageService,
901 0 : ctx,
902 0 : )
903 0 : .await?;
904 : }
905 :
906 0 : if lsn < **latest_gc_cutoff_lsn {
907 0 : return Err(PageStreamError::BadRequest(format!(
908 0 : "tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
909 0 : lsn, **latest_gc_cutoff_lsn
910 0 : ).into()));
911 0 : }
912 0 : Ok(lsn)
913 0 : }
914 :
915 0 : #[instrument(skip_all, fields(shard_id))]
916 : async fn handle_get_rel_exists_request(
917 : &mut self,
918 : tenant_id: TenantId,
919 : timeline_id: TimelineId,
920 : req: &PagestreamExistsRequest,
921 : ctx: &RequestContext,
922 : ) -> Result<PagestreamBeMessage, PageStreamError> {
923 : let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
924 : let _timer = timeline
925 : .query_metrics
926 : .start_timer(metrics::SmgrQueryType::GetRelExists, ctx);
927 :
928 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
929 : let lsn =
930 : Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
931 : .await?;
932 :
933 : let exists = timeline
934 : .get_rel_exists(req.rel, Version::Lsn(lsn), req.latest, ctx)
935 : .await?;
936 :
937 : Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
938 : exists,
939 : }))
940 : }
941 :
942 0 : #[instrument(skip_all, fields(shard_id))]
943 : async fn handle_get_nblocks_request(
944 : &mut self,
945 : tenant_id: TenantId,
946 : timeline_id: TimelineId,
947 : req: &PagestreamNblocksRequest,
948 : ctx: &RequestContext,
949 : ) -> Result<PagestreamBeMessage, PageStreamError> {
950 : let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
951 :
952 : let _timer = timeline
953 : .query_metrics
954 : .start_timer(metrics::SmgrQueryType::GetRelSize, ctx);
955 :
956 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
957 : let lsn =
958 : Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
959 : .await?;
960 :
961 : let n_blocks = timeline
962 : .get_rel_size(req.rel, Version::Lsn(lsn), req.latest, ctx)
963 : .await?;
964 :
965 : Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
966 : n_blocks,
967 : }))
968 : }
969 :
970 0 : #[instrument(skip_all, fields(shard_id))]
971 : async fn handle_db_size_request(
972 : &mut self,
973 : tenant_id: TenantId,
974 : timeline_id: TimelineId,
975 : req: &PagestreamDbSizeRequest,
976 : ctx: &RequestContext,
977 : ) -> Result<PagestreamBeMessage, PageStreamError> {
978 : let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
979 :
980 : let _timer = timeline
981 : .query_metrics
982 : .start_timer(metrics::SmgrQueryType::GetDbSize, ctx);
983 :
984 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
985 : let lsn =
986 : Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
987 : .await?;
988 :
989 : let total_blocks = timeline
990 : .get_db_size(
991 : DEFAULTTABLESPACE_OID,
992 : req.dbnode,
993 : Version::Lsn(lsn),
994 : req.latest,
995 : ctx,
996 : )
997 : .await?;
998 : let db_size = total_blocks as i64 * BLCKSZ as i64;
999 :
1000 : Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse {
1001 : db_size,
1002 : }))
1003 : }
1004 :
1005 : /// For most getpage requests, we will already have a Timeline to serve the request: this function
1006 : /// looks up such a Timeline synchronously and without touching any global state.
1007 0 : fn get_cached_timeline_for_page(
1008 0 : &mut self,
1009 0 : req: &PagestreamGetPageRequest,
1010 0 : ) -> Result<&Arc<Timeline>, Key> {
1011 0 : let key = if let Some((first_idx, first_timeline)) = self.shard_timelines.iter().next() {
1012 : // Fastest path: single sharded case
1013 0 : if first_idx.shard_count.count() == 1 {
1014 0 : return Ok(&first_timeline.timeline);
1015 0 : }
1016 0 :
1017 0 : let key = rel_block_to_key(req.rel, req.blkno);
1018 0 : let shard_num = first_timeline
1019 0 : .timeline
1020 0 : .get_shard_identity()
1021 0 : .get_shard_number(&key);
1022 0 :
1023 0 : // Fast path: matched the first timeline in our local handler map. This case is common if
1024 0 : // only one shard per tenant is attached to this pageserver.
1025 0 : if first_timeline.timeline.get_shard_identity().number == shard_num {
1026 0 : return Ok(&first_timeline.timeline);
1027 0 : }
1028 0 :
1029 0 : let shard_index = ShardIndex {
1030 0 : shard_number: shard_num,
1031 0 : shard_count: first_timeline.timeline.get_shard_identity().count,
1032 0 : };
1033 :
1034 : // Fast-ish path: timeline is in the connection handler's local cache
1035 0 : if let Some(found) = self.shard_timelines.get(&shard_index) {
1036 0 : return Ok(&found.timeline);
1037 0 : }
1038 0 :
1039 0 : key
1040 : } else {
1041 0 : rel_block_to_key(req.rel, req.blkno)
1042 : };
1043 :
1044 0 : Err(key)
1045 0 : }
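// Putting the pieces together (an illustrative summary of the code above and below, not
// new behavior): a GetPage request's (rel, blkno) is mapped to a Key via
// `rel_block_to_key`, the shard identity maps that Key to a ShardNumber, and the handler
// serves the request from a locally cached Timeline when possible. On a cache miss it
// falls back to `load_timeline_for_page`, which consults the tenant manager and then
// stores the result via `cache_timeline` for future requests.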
1046 :
1047 : /// Having looked up the [`Timeline`] instance for a particular shard, cache it to enable
1048 : /// use in future requests without having to traverse [`crate::tenant::mgr::TenantManager`]
1049 : /// again.
1050 : ///
1051 : /// Note that all the Timelines in this cache are for the same timeline_id: they differ
1052 : /// in which shard they belong to. When we serve a getpage@lsn request, we choose a shard
1053 : /// based on key.
1054 : ///
1055 : /// The typical size of this cache is 1, as we generally create shards to distribute work
1056 : /// across pageservers, so don't tend to have multiple shards for the same tenant on the
1057 : /// same pageserver.
1058 0 : fn cache_timeline(
1059 0 : &mut self,
1060 0 : timeline: Arc<Timeline>,
1061 0 : ) -> Result<&Arc<Timeline>, GetActiveTimelineError> {
1062 0 : let gate_guard = timeline
1063 0 : .gate
1064 0 : .enter()
1065 0 : .map_err(|_| GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled))?;
1066 :
1067 0 : let shard_index = timeline.tenant_shard_id.to_index();
1068 0 : let entry = self
1069 0 : .shard_timelines
1070 0 : .entry(shard_index)
1071 0 : .or_insert(HandlerTimeline {
1072 0 : timeline,
1073 0 : _guard: gate_guard,
1074 0 : });
1075 0 :
1076 0 : Ok(&entry.timeline)
1077 0 : }
1078 :
1079 : /// If [`Self::get_cached_timeline_for_page`] missed, then this function is used to populate the cache with
1080 : /// a Timeline to serve requests for this key, if such a Timeline is present on this pageserver. If no such
1081 : /// Timeline is found, then we will return an error (this indicates that the client is talking to the wrong node).
1082 0 : async fn load_timeline_for_page(
1083 0 : &mut self,
1084 0 : tenant_id: TenantId,
1085 0 : timeline_id: TimelineId,
1086 0 : key: Key,
1087 0 : ) -> anyhow::Result<&Arc<Timeline>, GetActiveTimelineError> {
1088 : // Slow path: we must call out to the TenantManager to find the timeline for this Key
1089 0 : let timeline = self
1090 0 : .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Page(key))
1091 0 : .await?;
1092 :
1093 0 : self.cache_timeline(timeline)
1094 0 : }
1095 :
1096 0 : async fn get_timeline_shard_zero(
1097 0 : &mut self,
1098 0 : tenant_id: TenantId,
1099 0 : timeline_id: TimelineId,
1100 0 : ) -> anyhow::Result<&Arc<Timeline>, GetActiveTimelineError> {
1101 : // This is a borrow-checker workaround: we can't return from inside of the `if let Some` because
1102 : // that would be an immutable-borrow-self return, whereas later in the function we will use a mutable
1103 : // ref to self. So instead, we first build a bool, and then return while not borrowing self.
1104 0 : let have_cached = if let Some((idx, _tl)) = self.shard_timelines.iter().next() {
1105 0 : idx.shard_number == ShardNumber(0)
1106 : } else {
1107 0 : false
1108 : };
1109 :
1110 0 : if have_cached {
1111 0 : let entry = self.shard_timelines.iter().next().unwrap();
1112 0 : Ok(&entry.1.timeline)
1113 : } else {
1114 0 : let timeline = self
1115 0 : .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero)
1116 0 : .await?;
1117 0 : Ok(self.cache_timeline(timeline)?)
1118 : }
1119 0 : }
1120 :
1121 0 : #[instrument(skip_all, fields(shard_id))]
1122 : async fn handle_get_page_at_lsn_request(
1123 : &mut self,
1124 : tenant_id: TenantId,
1125 : timeline_id: TimelineId,
1126 : req: &PagestreamGetPageRequest,
1127 : ctx: &RequestContext,
1128 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1129 : let timeline = match self.get_cached_timeline_for_page(req) {
1130 : Ok(tl) => {
1131 : set_tracing_field_shard_id(tl);
1132 : tl
1133 : }
1134 : Err(key) => {
1135 : match self
1136 : .load_timeline_for_page(tenant_id, timeline_id, key)
1137 : .await
1138 : {
1139 : Ok(t) => t,
1140 : Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
1141 : // We already know this tenant exists in general, because we resolved it at
1142 : // start of connection. Getting a NotFound here indicates that the shard containing
1143 : // the requested page is not present on this node: the client's knowledge of shard->pageserver
1144 : // mapping is out of date.
1145 : //
1146 : // Closing the connection by returning `Reconnect` has the side effect of rate-limiting the above message, via the
1147 : // client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
1148 : // and talk to a different pageserver.
1149 : return Err(PageStreamError::Reconnect(
1150 : "getpage@lsn request routed to wrong shard".into(),
1151 : ));
1152 : }
1153 : Err(e) => return Err(e.into()),
1154 : }
1155 : }
1156 : };
1157 :
1158 : let _timer = timeline
1159 : .query_metrics
1160 : .start_timer(metrics::SmgrQueryType::GetPageAtLsn, ctx);
1161 :
1162 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1163 : let lsn =
1164 : Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
1165 : .await?;
1166 :
1167 : let page = timeline
1168 : .get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), req.latest, ctx)
1169 : .await?;
1170 :
1171 : Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
1172 : page,
1173 : }))
1174 : }
1175 :
1176 0 : #[instrument(skip_all, fields(shard_id))]
1177 : async fn handle_get_slru_segment_request(
1178 : &mut self,
1179 : tenant_id: TenantId,
1180 : timeline_id: TimelineId,
1181 : req: &PagestreamGetSlruSegmentRequest,
1182 : ctx: &RequestContext,
1183 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1184 : let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
1185 :
1186 : let _timer = timeline
1187 : .query_metrics
1188 : .start_timer(metrics::SmgrQueryType::GetSlruSegment, ctx);
1189 :
1190 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1191 : let lsn =
1192 : Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
1193 : .await?;
1194 :
1195 : let kind = SlruKind::from_repr(req.kind)
1196 : .ok_or(PageStreamError::BadRequest("invalid SLRU kind".into()))?;
1197 : let segment = timeline.get_slru_segment(kind, req.segno, lsn, ctx).await?;
1198 :
1199 : Ok(PagestreamBeMessage::GetSlruSegment(
1200 : PagestreamGetSlruSegmentResponse { segment },
1201 : ))
1202 : }
1203 :
1204 : #[allow(clippy::too_many_arguments)]
1205 0 : #[instrument(skip_all, fields(shard_id, ?lsn, ?prev_lsn, %full_backup))]
1206 : async fn handle_basebackup_request<IO>(
1207 : &mut self,
1208 : pgb: &mut PostgresBackend<IO>,
1209 : tenant_id: TenantId,
1210 : timeline_id: TimelineId,
1211 : lsn: Option<Lsn>,
1212 : prev_lsn: Option<Lsn>,
1213 : full_backup: bool,
1214 : gzip: bool,
1215 : ctx: &RequestContext,
1216 : ) -> Result<(), QueryError>
1217 : where
1218 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1219 : {
1220 : let started = std::time::Instant::now();
1221 :
1222 : // check that the timeline exists
1223 : let timeline = self
1224 : .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero)
1225 : .await?;
1226 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1227 : if let Some(lsn) = lsn {
1228 : // Backup was requested at a particular LSN. Wait for it to arrive.
1229 0 : info!("waiting for {}", lsn);
1230 : timeline
1231 : .wait_lsn(
1232 : lsn,
1233 : crate::tenant::timeline::WaitLsnWaiter::PageService,
1234 : ctx,
1235 : )
1236 : .await?;
1237 : timeline
1238 : .check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
1239 : .context("invalid basebackup lsn")?;
1240 : }
1241 :
1242 : let lsn_awaited_after = started.elapsed();
1243 :
1244 : // switch client to COPYOUT
1245 : pgb.write_message_noflush(&BeMessage::CopyOutResponse)?;
1246 : self.flush_cancellable(pgb, &timeline.cancel).await?;
1247 :
1248 : // Send a tarball of the latest layer on the timeline. Compress if not
1249 : // fullbackup. TODO Compress in that case too (tests need to be updated)
1250 : if full_backup {
1251 : let mut writer = pgb.copyout_writer();
1252 : basebackup::send_basebackup_tarball(
1253 : &mut writer,
1254 : &timeline,
1255 : lsn,
1256 : prev_lsn,
1257 : full_backup,
1258 : ctx,
1259 : )
1260 : .await?;
1261 : } else {
1262 : let mut writer = pgb.copyout_writer();
1263 : if gzip {
1264 : let mut encoder = GzipEncoder::with_quality(
1265 : writer,
1266 : // NOTE using fast compression because it's on the critical path
1267 : // for compute startup. For an empty database, we get
1268 : // <100KB with this method. The Level::Best compression method
1269 : // gives us <20KB, but maybe we should add basebackup caching
1270 : // on compute shutdown first.
1271 : async_compression::Level::Fastest,
1272 : );
1273 : basebackup::send_basebackup_tarball(
1274 : &mut encoder,
1275 : &timeline,
1276 : lsn,
1277 : prev_lsn,
1278 : full_backup,
1279 : ctx,
1280 : )
1281 : .await?;
1282 : // shutdown the encoder to ensure the gzip footer is written
1283 : encoder.shutdown().await?;
1284 : } else {
1285 : basebackup::send_basebackup_tarball(
1286 : &mut writer,
1287 : &timeline,
1288 : lsn,
1289 : prev_lsn,
1290 : full_backup,
1291 : ctx,
1292 : )
1293 : .await?;
1294 : }
1295 : }
1296 :
1297 : pgb.write_message_noflush(&BeMessage::CopyDone)?;
1298 : self.flush_cancellable(pgb, &timeline.cancel).await?;
1299 :
1300 : let basebackup_after = started
1301 : .elapsed()
1302 : .checked_sub(lsn_awaited_after)
1303 : .unwrap_or(Duration::ZERO);
1304 :
1305 0 : info!(
1306 0 : lsn_await_millis = lsn_awaited_after.as_millis(),
1307 0 : basebackup_millis = basebackup_after.as_millis(),
1308 0 : "basebackup complete"
1309 0 : );
1310 :
1311 : Ok(())
1312 : }
1313 :
1314 : // when accessing the management api, supply None as the argument
1315 : // when authorizing a tenant, pass the corresponding tenant id
1316 0 : fn check_permission(&self, tenant_id: Option<TenantId>) -> Result<(), QueryError> {
1317 0 : if self.auth.is_none() {
1318 : // auth is set to Trust, nothing to check so just return ok
1319 0 : return Ok(());
1320 0 : }
1321 0 : // auth is Some (just checked above), and when auth is Some,
1322 0 : // claims are always present because of checks during connection init,
1323 0 : // so this expect won't trigger
1324 0 : let claims = self
1325 0 : .claims
1326 0 : .as_ref()
1327 0 : .expect("claims presence already checked");
1328 0 : check_permission(claims, tenant_id).map_err(|e| QueryError::Unauthorized(e.0))
1329 0 : }
1330 :
1331 : /// Shorthand for getting a reference to a Timeline of an Active tenant.
1332 0 : async fn get_active_tenant_timeline(
1333 0 : &self,
1334 0 : tenant_id: TenantId,
1335 0 : timeline_id: TimelineId,
1336 0 : selector: ShardSelector,
1337 0 : ) -> Result<Arc<Timeline>, GetActiveTimelineError> {
1338 0 : let tenant = get_active_tenant_with_timeout(
1339 0 : tenant_id,
1340 0 : selector,
1341 0 : ACTIVE_TENANT_TIMEOUT,
1342 0 : &task_mgr::shutdown_token(),
1343 0 : )
1344 0 : .await
1345 0 : .map_err(GetActiveTimelineError::Tenant)?;
1346 0 : let timeline = tenant.get_timeline(timeline_id, true)?;
1347 0 : set_tracing_field_shard_id(&timeline);
1348 0 : Ok(timeline)
1349 0 : }
1350 : }
1351 :
1352 : #[async_trait::async_trait]
1353 : impl<IO> postgres_backend::Handler<IO> for PageServerHandler
1354 : where
1355 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1356 : {
1357 0 : fn check_auth_jwt(
1358 0 : &mut self,
1359 0 : _pgb: &mut PostgresBackend<IO>,
1360 0 : jwt_response: &[u8],
1361 0 : ) -> Result<(), QueryError> {
1362 : // this unwrap is never triggered, because check_auth_jwt is only called when auth_type is NeonJWT,
1363 : // which requires auth to be present
1364 0 : let data = self
1365 0 : .auth
1366 0 : .as_ref()
1367 0 : .unwrap()
1368 0 : .decode(str::from_utf8(jwt_response).context("jwt response is not UTF-8")?)
1369 0 : .map_err(|e| QueryError::Unauthorized(e.0))?;
1370 :
1371 0 : if matches!(data.claims.scope, Scope::Tenant) && data.claims.tenant_id.is_none() {
1372 0 : return Err(QueryError::Unauthorized(
1373 0 : "jwt token scope is Tenant, but tenant id is missing".into(),
1374 0 : ));
1375 0 : }
1376 0 :
1377 0 : debug!(
1378 0 : "jwt scope check succeeded for scope: {:#?} by tenant id: {:?}",
1379 0 : data.claims.scope, data.claims.tenant_id,
1380 0 : );
1381 :
1382 0 : self.claims = Some(data.claims);
1383 0 : Ok(())
1384 0 : }
1385 :
1386 0 : fn startup(
1387 0 : &mut self,
1388 0 : _pgb: &mut PostgresBackend<IO>,
1389 0 : _sm: &FeStartupPacket,
1390 0 : ) -> Result<(), QueryError> {
1391 0 : Ok(())
1392 0 : }
1393 :
1394 0 : #[instrument(skip_all, fields(tenant_id, timeline_id))]
1395 : async fn process_query(
1396 : &mut self,
1397 : pgb: &mut PostgresBackend<IO>,
1398 : query_string: &str,
1399 0 : ) -> Result<(), QueryError> {
1400 0 : fail::fail_point!("simulated-bad-compute-connection", |_| {
1401 0 : info!("Hit failpoint for bad connection");
1402 0 : Err(QueryError::SimulatedConnectionError)
1403 0 : });
1404 :
1405 0 : let ctx = self.connection_ctx.attached_child();
1406 0 : debug!("process query {query_string:?}");
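// Some of the commands handled below (an illustrative summary; optional parts in
// brackets; see the parsing for the full set):
//   pagestream <tenant_id> <timeline_id>
//   basebackup <tenant_id> <timeline_id> [<lsn> [--gzip]]
//   get_last_record_rlsn <tenant_id> <timeline_id>
//   fullbackup <tenant_id> <timeline_id> [<lsn> [<prev_lsn>]]
//   import basebackup <tenant_id> <timeline_id> <base_lsn> <end_lsn> <pg_version>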
1407 0 : if query_string.starts_with("pagestream ") {
1408 0 : let (_, params_raw) = query_string.split_at("pagestream ".len());
1409 0 : let params = params_raw.split(' ').collect::<Vec<_>>();
1410 0 : if params.len() != 2 {
1411 0 : return Err(QueryError::Other(anyhow::anyhow!(
1412 0 : "invalid param number for pagestream command"
1413 0 : )));
1414 0 : }
1415 0 : let tenant_id = TenantId::from_str(params[0])
1416 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1417 0 : let timeline_id = TimelineId::from_str(params[1])
1418 0 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1419 :
1420 0 : tracing::Span::current()
1421 0 : .record("tenant_id", field::display(tenant_id))
1422 0 : .record("timeline_id", field::display(timeline_id));
1423 0 :
1424 0 : self.check_permission(Some(tenant_id))?;
1425 :
1426 0 : self.handle_pagerequests(pgb, tenant_id, timeline_id, ctx)
1427 0 : .await?;
1428 0 : } else if query_string.starts_with("basebackup ") {
1429 0 : let (_, params_raw) = query_string.split_at("basebackup ".len());
1430 0 : let params = params_raw.split_whitespace().collect::<Vec<_>>();
1431 0 :
1432 0 : if params.len() < 2 {
1433 0 : return Err(QueryError::Other(anyhow::anyhow!(
1434 0 : "invalid param number for basebackup command"
1435 0 : )));
1436 0 : }
1437 :
1438 0 : let tenant_id = TenantId::from_str(params[0])
1439 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1440 0 : let timeline_id = TimelineId::from_str(params[1])
1441 0 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1442 :
1443 0 : tracing::Span::current()
1444 0 : .record("tenant_id", field::display(tenant_id))
1445 0 : .record("timeline_id", field::display(timeline_id));
1446 0 :
1447 0 : self.check_permission(Some(tenant_id))?;
1448 :
1449 0 : let lsn = if params.len() >= 3 {
1450 : Some(
1451 0 : Lsn::from_str(params[2])
1452 0 : .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?,
1453 : )
1454 : } else {
1455 0 : None
1456 : };
1457 :
1458 0 : let gzip = if params.len() >= 4 {
1459 0 : if params[3] == "--gzip" {
1460 0 : true
1461 : } else {
1462 0 : return Err(QueryError::Other(anyhow::anyhow!(
1463 0 : "Parameter in position 3 unknown {}",
1464 0 : params[3],
1465 0 : )));
1466 : }
1467 : } else {
1468 0 : false
1469 : };
1470 :
1471 0 : let metric_recording = metrics::BASEBACKUP_QUERY_TIME.start_recording(&ctx);
1472 0 : let res = async {
1473 0 : self.handle_basebackup_request(
1474 0 : pgb,
1475 0 : tenant_id,
1476 0 : timeline_id,
1477 0 : lsn,
1478 0 : None,
1479 0 : false,
1480 0 : gzip,
1481 0 : &ctx,
1482 0 : )
1483 0 : .await?;
1484 0 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1485 0 : Result::<(), QueryError>::Ok(())
1486 0 : }
1487 0 : .await;
1488 0 : metric_recording.observe(&res);
1489 0 : res?;
1490 : }
1491 : // return pair of prev_lsn and last_lsn
1492 0 : else if query_string.starts_with("get_last_record_rlsn ") {
1493 0 : let (_, params_raw) = query_string.split_at("get_last_record_rlsn ".len());
1494 0 : let params = params_raw.split_whitespace().collect::<Vec<_>>();
1495 0 :
1496 0 : if params.len() != 2 {
1497 0 : return Err(QueryError::Other(anyhow::anyhow!(
1498 0 : "invalid param number for get_last_record_rlsn command"
1499 0 : )));
1500 0 : }
1501 :
1502 0 : let tenant_id = TenantId::from_str(params[0])
1503 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1504 0 : let timeline_id = TimelineId::from_str(params[1])
1505 0 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1506 :
1507 0 : tracing::Span::current()
1508 0 : .record("tenant_id", field::display(tenant_id))
1509 0 : .record("timeline_id", field::display(timeline_id));
1510 0 :
1511 0 : self.check_permission(Some(tenant_id))?;
1512 0 : async {
1513 0 : let timeline = self
1514 0 : .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero)
1515 0 : .await?;
1516 :
1517 0 : let end_of_timeline = timeline.get_last_record_rlsn();
1518 0 :
1519 0 : pgb.write_message_noflush(&BeMessage::RowDescription(&[
1520 0 : RowDescriptor::text_col(b"prev_lsn"),
1521 0 : RowDescriptor::text_col(b"last_lsn"),
1522 0 : ]))?
1523 0 : .write_message_noflush(&BeMessage::DataRow(&[
1524 0 : Some(end_of_timeline.prev.to_string().as_bytes()),
1525 0 : Some(end_of_timeline.last.to_string().as_bytes()),
1526 0 : ]))?
1527 0 : .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1528 0 : anyhow::Ok(())
1529 0 : }
1530 0 : .instrument(info_span!(
1531 0 : "handle_get_last_record_lsn",
1532 0 : shard_id = tracing::field::Empty
1533 0 : ))
1534 0 : .await?;
1535 : }
1536 : // Same as basebackup, but the result also includes the relation (table) data.
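     : // For illustration only (placeholder values): the optional lsn and prev_lsn
     : // arguments follow the tenant and timeline ids,
     : //   fullbackup <tenant_id> <timeline_id> [<lsn> [<prev_lsn>]]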
1537 0 : else if query_string.starts_with("fullbackup ") {
1538 0 : let (_, params_raw) = query_string.split_at("fullbackup ".len());
1539 0 : let params = params_raw.split_whitespace().collect::<Vec<_>>();
1540 0 :
1541 0 : if params.len() < 2 {
1542 0 : return Err(QueryError::Other(anyhow::anyhow!(
1543 0 : "invalid param number for fullbackup command"
1544 0 : )));
1545 0 : }
1546 :
1547 0 : let tenant_id = TenantId::from_str(params[0])
1548 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1549 0 : let timeline_id = TimelineId::from_str(params[1])
1550 0 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1551 :
1552 0 : tracing::Span::current()
1553 0 : .record("tenant_id", field::display(tenant_id))
1554 0 : .record("timeline_id", field::display(timeline_id));
1555 :
1556 : // The caller is responsible for providing correct lsn and prev_lsn.
1557 0 : let lsn = if params.len() > 2 {
1558 : Some(
1559 0 : Lsn::from_str(params[2])
1560 0 : .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?,
1561 : )
1562 : } else {
1563 0 : None
1564 : };
1565 0 : let prev_lsn = if params.len() > 3 {
1566 : Some(
1567 0 : Lsn::from_str(params[3])
1568 0 : .with_context(|| format!("Failed to parse Lsn from {}", params[3]))?,
1569 : )
1570 : } else {
1571 0 : None
1572 : };
1573 :
1574 0 : self.check_permission(Some(tenant_id))?;
1575 :
1576 : // Stream the full backup; this also verifies that the timeline exists.
1577 0 : self.handle_basebackup_request(
1578 0 : pgb,
1579 0 : tenant_id,
1580 0 : timeline_id,
1581 0 : lsn,
1582 0 : prev_lsn,
1583 0 : true,
1584 0 : false,
1585 0 : &ctx,
1586 0 : )
1587 0 : .await?;
1588 0 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1589 0 : } else if query_string.starts_with("import basebackup ") {
1590 : // Import the `base` section (everything but the wal) of a basebackup.
1591 : // Assumes the tenant already exists on this pageserver.
1592 : //
1593 : // Files are scheduled to be persisted to remote storage, and the
1594 : // caller should poll the http api to check when that is done.
1595 : //
1596 : // Example import command:
1597 : // 1. Get start/end LSN from backup_manifest file
1598 : // 2. Run:
1599 : // cat my_backup/base.tar | psql -h $PAGESERVER \
1600 : // -c "import basebackup $TENANT $TIMELINE $START_LSN $END_LSN $PG_VERSION"
1601 0 : let (_, params_raw) = query_string.split_at("import basebackup ".len());
1602 0 : let params = params_raw.split_whitespace().collect::<Vec<_>>();
1603 0 : if params.len() != 5 {
1604 0 : return Err(QueryError::Other(anyhow::anyhow!(
1605 0 : "invalid param number for import basebackup command"
1606 0 : )));
1607 0 : }
1608 0 : let tenant_id = TenantId::from_str(params[0])
1609 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1610 0 : let timeline_id = TimelineId::from_str(params[1])
1611 0 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1612 0 : let base_lsn = Lsn::from_str(params[2])
1613 0 : .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?;
1614 0 : let end_lsn = Lsn::from_str(params[3])
1615 0 : .with_context(|| format!("Failed to parse Lsn from {}", params[3]))?;
1616 0 : let pg_version = u32::from_str(params[4])
1617 0 : .with_context(|| format!("Failed to parse pg_version from {}", params[4]))?;
1618 :
1619 0 : tracing::Span::current()
1620 0 : .record("tenant_id", field::display(tenant_id))
1621 0 : .record("timeline_id", field::display(timeline_id));
1622 0 :
1623 0 : self.check_permission(Some(tenant_id))?;
1624 :
1625 0 : match self
1626 0 : .handle_import_basebackup(
1627 0 : pgb,
1628 0 : tenant_id,
1629 0 : timeline_id,
1630 0 : base_lsn,
1631 0 : end_lsn,
1632 0 : pg_version,
1633 0 : ctx,
1634 0 : )
1635 0 : .await
1636 : {
1637 0 : Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
1638 0 : Err(e) => {
1639 0 : error!("error importing base backup between {base_lsn} and {end_lsn}: {e:?}");
1640 0 : pgb.write_message_noflush(&BeMessage::ErrorResponse(
1641 0 : &e.to_string(),
1642 0 : Some(e.pg_error_code()),
1643 0 : ))?
1644 : }
1645 : };
1646 0 : } else if query_string.starts_with("import wal ") {
1647 : // Import the `pg_wal` section of a basebackup.
1648 : //
1649 : // Files are scheduled to be persisted to remote storage, and the
1650 : // caller should poll the http api to check when that is done.
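     : // Example import command, mirroring the "import basebackup" example above
     : // (placeholder paths and values; the WAL archive name is an assumption):
     : //   cat my_backup/pg_wal.tar | psql -h $PAGESERVER \
     : //       -c "import wal $TENANT $TIMELINE $START_LSN $END_LSN"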
1651 0 : let (_, params_raw) = query_string.split_at("import wal ".len());
1652 0 : let params = params_raw.split_whitespace().collect::<Vec<_>>();
1653 0 : if params.len() != 4 {
1654 0 : return Err(QueryError::Other(anyhow::anyhow!(
1655 0 : "invalid param number for import wal command"
1656 0 : )));
1657 0 : }
1658 0 : let tenant_id = TenantId::from_str(params[0])
1659 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1660 0 : let timeline_id = TimelineId::from_str(params[1])
1661 0 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1662 0 : let start_lsn = Lsn::from_str(params[2])
1663 0 : .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?;
1664 0 : let end_lsn = Lsn::from_str(params[3])
1665 0 : .with_context(|| format!("Failed to parse Lsn from {}", params[3]))?;
1666 :
1667 0 : tracing::Span::current()
1668 0 : .record("tenant_id", field::display(tenant_id))
1669 0 : .record("timeline_id", field::display(timeline_id));
1670 0 :
1671 0 : self.check_permission(Some(tenant_id))?;
1672 :
1673 0 : match self
1674 0 : .handle_import_wal(pgb, tenant_id, timeline_id, start_lsn, end_lsn, ctx)
1675 0 : .await
1676 : {
1677 0 : Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
1678 0 : Err(e) => {
1679 0 : error!("error importing WAL between {start_lsn} and {end_lsn}: {e:?}");
1680 0 : pgb.write_message_noflush(&BeMessage::ErrorResponse(
1681 0 : &e.to_string(),
1682 0 : Some(e.pg_error_code()),
1683 0 : ))?
1684 : }
1685 : };
1686 0 : } else if query_string.to_ascii_lowercase().starts_with("set ") {
1687 : // Accept and ignore SET commands. This is important because psycopg2
1688 : // executes "SET datestyle TO 'ISO'" on connect.
1689 0 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1690 0 : } else if query_string.starts_with("show ") {
1691 : // show <tenant_id>
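     : // For illustration only (placeholder tenant id):
     : //   psql -h $PAGESERVER -c "show $TENANT"
     : // returns a single row with the tenant's effective settings
     : // (checkpoint_distance, compaction settings, gc_horizon, gc_period,
     : // image_creation_threshold, pitr_interval).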
1692 0 : let (_, params_raw) = query_string.split_at("show ".len());
1693 0 : let params = params_raw.split_whitespace().collect::<Vec<_>>();
1694 0 : if params.len() != 1 {
1695 0 : return Err(QueryError::Other(anyhow::anyhow!(
1696 0 : "invalid param number for show command"
1697 0 : )));
1698 0 : }
1699 0 : let tenant_id = TenantId::from_str(params[0])
1700 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1701 :
1702 0 : tracing::Span::current().record("tenant_id", field::display(tenant_id));
1703 0 :
1704 0 : self.check_permission(Some(tenant_id))?;
1705 :
1706 0 : let tenant = get_active_tenant_with_timeout(
1707 0 : tenant_id,
1708 0 : ShardSelector::Zero,
1709 0 : ACTIVE_TENANT_TIMEOUT,
1710 0 : &task_mgr::shutdown_token(),
1711 0 : )
1712 0 : .await?;
1713 0 : pgb.write_message_noflush(&BeMessage::RowDescription(&[
1714 0 : RowDescriptor::int8_col(b"checkpoint_distance"),
1715 0 : RowDescriptor::int8_col(b"checkpoint_timeout"),
1716 0 : RowDescriptor::int8_col(b"compaction_target_size"),
1717 0 : RowDescriptor::int8_col(b"compaction_period"),
1718 0 : RowDescriptor::int8_col(b"compaction_threshold"),
1719 0 : RowDescriptor::int8_col(b"gc_horizon"),
1720 0 : RowDescriptor::int8_col(b"gc_period"),
1721 0 : RowDescriptor::int8_col(b"image_creation_threshold"),
1722 0 : RowDescriptor::int8_col(b"pitr_interval"),
1723 0 : ]))?
1724 0 : .write_message_noflush(&BeMessage::DataRow(&[
1725 0 : Some(tenant.get_checkpoint_distance().to_string().as_bytes()),
1726 0 : Some(
1727 0 : tenant
1728 0 : .get_checkpoint_timeout()
1729 0 : .as_secs()
1730 0 : .to_string()
1731 0 : .as_bytes(),
1732 0 : ),
1733 0 : Some(tenant.get_compaction_target_size().to_string().as_bytes()),
1734 0 : Some(
1735 0 : tenant
1736 0 : .get_compaction_period()
1737 0 : .as_secs()
1738 0 : .to_string()
1739 0 : .as_bytes(),
1740 0 : ),
1741 0 : Some(tenant.get_compaction_threshold().to_string().as_bytes()),
1742 0 : Some(tenant.get_gc_horizon().to_string().as_bytes()),
1743 0 : Some(tenant.get_gc_period().as_secs().to_string().as_bytes()),
1744 0 : Some(tenant.get_image_creation_threshold().to_string().as_bytes()),
1745 0 : Some(tenant.get_pitr_interval().as_secs().to_string().as_bytes()),
1746 0 : ]))?
1747 0 : .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1748 : } else {
1749 0 : return Err(QueryError::Other(anyhow::anyhow!(
1750 0 : "unknown command {query_string}"
1751 0 : )));
1752 : }
1753 :
1754 0 : Ok(())
1755 0 : }
1756 : }
1757 :
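     : // Map tenant-activation failures to the query-level error space: a timeout while
     : // waiting for the tenant to become active is reported as a connection-level
     : // timeout, cancellation or a stopping tenant becomes Shutdown, a missing tenant
     : // becomes NotFound, and everything else is wrapped as a generic error.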
1758 : impl From<GetActiveTenantError> for QueryError {
1759 0 : fn from(e: GetActiveTenantError) -> Self {
1760 0 : match e {
1761 0 : GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected(
1762 0 : ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
1763 0 : ),
1764 : GetActiveTenantError::Cancelled
1765 : | GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => {
1766 0 : QueryError::Shutdown
1767 : }
1768 0 : e @ GetActiveTenantError::NotFound(_) => QueryError::NotFound(format!("{e}").into()),
1769 0 : e => QueryError::Other(anyhow::anyhow!(e)),
1770 : }
1771 0 : }
1772 : }
1773 :
1774 0 : #[derive(Debug, thiserror::Error)]
1775 : enum GetActiveTimelineError {
1776 : #[error(transparent)]
1777 : Tenant(GetActiveTenantError),
1778 : #[error(transparent)]
1779 : Timeline(#[from] GetTimelineError),
1780 : }
1781 :
1782 : impl From<GetActiveTimelineError> for QueryError {
1783 0 : fn from(e: GetActiveTimelineError) -> Self {
1784 0 : match e {
1785 0 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => QueryError::Shutdown,
1786 0 : GetActiveTimelineError::Tenant(e) => e.into(),
1787 0 : GetActiveTimelineError::Timeline(e) => QueryError::NotFound(format!("{e}").into()),
1788 : }
1789 0 : }
1790 : }
1791 :
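     : /// Record the `shard_id` field on the current tracing span once the shard that
     : /// serves the request is known. The span is expected to already carry the
     : /// tenant and timeline ids, as asserted below.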
1792 0 : fn set_tracing_field_shard_id(timeline: &Timeline) {
1793 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1794 0 : tracing::Span::current().record(
1795 0 : "shard_id",
1796 0 : tracing::field::display(timeline.tenant_shard_id.shard_slug()),
1797 0 : );
1798 0 : debug_assert_current_span_has_tenant_and_timeline_id();
1799 0 : }