Line data Source code
1 : //
2 : //! The Page Service listens for client connections and serves their GetPage@LSN
3 : //! requests.
4 : //
5 : // It is possible to connect here using the usual psql/pgbench/libpq tools. The
6 : // following commands are currently supported:
7 : // *status* -- show current status information about this pageserver,
8 : // *pagestream* -- enter a mode in which the smgr and the pageserver talk their
9 : // custom protocol.
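//
// For example (illustrative only; substitute the pageserver's actual host and
// libpq port for your deployment):
//
//   psql "host=<pageserver-host> port=<libpq-port>" -c "status"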
10 : //
11 :
12 : use anyhow::Context;
13 : use async_compression::tokio::write::GzipEncoder;
14 : use bytes::Buf;
15 : use bytes::Bytes;
16 : use futures::stream::FuturesUnordered;
17 : use futures::Stream;
18 : use futures::StreamExt;
19 : use pageserver_api::key::Key;
20 : use pageserver_api::models::TenantState;
21 : use pageserver_api::models::{
22 : PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
23 : PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
24 : PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
25 : PagestreamGetSlruSegmentRequest, PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest,
26 : PagestreamNblocksResponse,
27 : };
28 : use pageserver_api::shard::ShardIndex;
29 : use pageserver_api::shard::{ShardCount, ShardNumber};
30 : use postgres_backend::{self, is_expected_io_error, AuthType, PostgresBackend, QueryError};
31 : use pq_proto::framed::ConnectionError;
32 : use pq_proto::FeStartupPacket;
33 : use pq_proto::{BeMessage, FeMessage, RowDescriptor};
34 : use std::borrow::Cow;
35 : use std::collections::HashMap;
36 : use std::io;
37 : use std::net::TcpListener;
38 : use std::pin::pin;
39 : use std::str;
40 : use std::str::FromStr;
41 : use std::sync::Arc;
42 : use std::time::Duration;
43 : use tokio::io::AsyncWriteExt;
44 : use tokio::io::{AsyncRead, AsyncWrite};
45 : use tokio_util::io::StreamReader;
46 : use tokio_util::sync::CancellationToken;
47 : use tracing::field;
48 : use tracing::*;
49 : use utils::id::ConnectionId;
50 : use utils::sync::gate::GateGuard;
51 : use utils::{
52 : auth::{Claims, Scope, SwappableJwtAuth},
53 : id::{TenantId, TimelineId},
54 : lsn::Lsn,
55 : simple_rcu::RcuReadGuard,
56 : };
57 :
58 : use crate::auth::check_permission;
59 : use crate::basebackup;
60 : use crate::config::PageServerConf;
61 : use crate::context::{DownloadBehavior, RequestContext};
62 : use crate::import_datadir::import_wal_from_tar;
63 : use crate::metrics;
64 : use crate::metrics::LIVE_CONNECTIONS_COUNT;
65 : use crate::pgdatadir_mapping::Version;
66 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
67 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
68 : use crate::task_mgr;
69 : use crate::task_mgr::TaskKind;
70 : use crate::tenant::mgr;
71 : use crate::tenant::mgr::get_active_tenant_with_timeout;
72 : use crate::tenant::mgr::GetActiveTenantError;
73 : use crate::tenant::mgr::ShardSelector;
74 : use crate::tenant::timeline::WaitLsnError;
75 : use crate::tenant::GetTimelineError;
76 : use crate::tenant::PageReconstructError;
77 : use crate::tenant::Timeline;
78 : use crate::trace::Tracer;
79 : use pageserver_api::key::rel_block_to_key;
80 : use pageserver_api::reltag::SlruKind;
81 : use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
82 : use postgres_ffi::BLCKSZ;
83 :
84 : // How long we may wait for a [`TenantSlot::InProgress`] and/or a [`Tenant`] which
85 : // is not yet in state [`TenantState::Active`].
86 : const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
87 :
88 : /// Read the end of a tar archive.
89 : ///
90 : /// A tar archive normally ends with two consecutive blocks of zeros, 512 bytes each.
91 : /// `tokio_tar` already read the first such block. Read the second all-zeros block,
92 : /// and check that there is no more data after the EOF marker.
93 : ///
94 : /// The 'tar' command can also write extra blocks of zeros, up to a full record
95 : /// size, controlled by the --record-size argument. Ignore those too.
96 12 : async fn read_tar_eof(mut reader: (impl AsyncRead + Unpin)) -> anyhow::Result<()> {
97 12 : use tokio::io::AsyncReadExt;
98 12 : let mut buf = [0u8; 512];
99 12 :
100 12 : // Read the all-zeros block, and verify it
101 12 : let mut total_bytes = 0;
102 24 : while total_bytes < 512 {
103 12 : let nbytes = reader.read(&mut buf[total_bytes..]).await?;
104 12 : total_bytes += nbytes;
105 12 : if nbytes == 0 {
106 0 : break;
107 12 : }
108 : }
109 12 : if total_bytes < 512 {
110 0 : anyhow::bail!("incomplete or invalid tar EOF marker");
111 12 : }
112 6144 : if !buf.iter().all(|&x| x == 0) {
113 0 : anyhow::bail!("invalid tar EOF marker");
114 12 : }
115 12 :
116 12 : // Drain any extra zero-blocks after the EOF marker
117 12 : let mut trailing_bytes = 0;
118 12 : let mut seen_nonzero_bytes = false;
119 : loop {
120 93 : let nbytes = reader.read(&mut buf).await?;
121 93 : trailing_bytes += nbytes;
122 46594 : if !buf.iter().all(|&x| x == 0) {
123 2 : seen_nonzero_bytes = true;
124 91 : }
125 93 : if nbytes == 0 {
126 12 : break;
127 81 : }
128 : }
129 12 : if seen_nonzero_bytes {
130 1 : anyhow::bail!("unexpected non-zero bytes after the tar archive");
131 11 : }
132 11 : if trailing_bytes % 512 != 0 {
133 0 : anyhow::bail!("unexpected number of zeros ({trailing_bytes}), not divisible by tar block size (512 bytes), after the tar archive");
134 11 : }
135 11 : Ok(())
136 12 : }
137 :
138 : ///////////////////////////////////////////////////////////////////////////////
139 :
140 : ///
141 : /// Main loop of the page service.
142 : ///
143 : /// Listens for connections, and launches a new handler task for each.
144 : ///
145 624 : pub async fn libpq_listener_main(
146 624 : conf: &'static PageServerConf,
147 624 : broker_client: storage_broker::BrokerClientChannel,
148 624 : auth: Option<Arc<SwappableJwtAuth>>,
149 624 : listener: TcpListener,
150 624 : auth_type: AuthType,
151 624 : listener_ctx: RequestContext,
152 624 : cancel: CancellationToken,
153 624 : ) -> anyhow::Result<()> {
154 624 : listener.set_nonblocking(true)?;
155 624 : let tokio_listener = tokio::net::TcpListener::from_std(listener)?;
156 :
157 : // Wait for a new connection to arrive, or for server shutdown.
158 11190 : while let Some(res) = tokio::select! {
159 : biased;
160 :
161 : _ = cancel.cancelled() => {
162 : // We were requested to shut down.
163 : None
164 : }
165 :
166 10566 : res = tokio_listener.accept() => {
167 : Some(res)
168 : }
169 : } {
170 10566 : match res {
171 10566 : Ok((socket, peer_addr)) => {
172 : // Connection established. Spawn a new task to handle it.
173 0 : debug!("accepted connection from {}", peer_addr);
174 10566 : let local_auth = auth.clone();
175 10566 :
176 10566 : let connection_ctx = listener_ctx
177 10566 : .detached_child(TaskKind::PageRequestHandler, DownloadBehavior::Download);
178 10566 :
179 10566 : // PageRequestHandler tasks are not associated with any particular
180 10566 : // timeline in the task manager. In practice most connections will
181 10566 : // only deal with a particular timeline, but we don't know which one
182 10566 : // yet.
183 10566 : task_mgr::spawn(
184 10566 : &tokio::runtime::Handle::current(),
185 10566 : TaskKind::PageRequestHandler,
186 10566 : None,
187 10566 : None,
188 10566 : "serving compute connection task",
189 10566 : false,
190 10566 : page_service_conn_main(
191 10566 : conf,
192 10566 : broker_client.clone(),
193 10566 : local_auth,
194 10566 : socket,
195 10566 : auth_type,
196 10566 : connection_ctx,
197 10566 : ),
198 10566 : );
199 : }
200 0 : Err(err) => {
201 : // accept() failed. Log the error, and loop back to retry on next connection.
202 0 : error!("accept() failed: {:?}", err);
203 : }
204 : }
205 : }
206 :
207 0 : debug!("page_service loop terminated");
208 :
209 182 : Ok(())
210 182 : }
211 :
212 21132 : #[instrument(skip_all, fields(peer_addr))]
213 : async fn page_service_conn_main(
214 : conf: &'static PageServerConf,
215 : broker_client: storage_broker::BrokerClientChannel,
216 : auth: Option<Arc<SwappableJwtAuth>>,
217 : socket: tokio::net::TcpStream,
218 : auth_type: AuthType,
219 : connection_ctx: RequestContext,
220 : ) -> anyhow::Result<()> {
221 : // Immediately increment the gauge, then create a job to decrement it on task exit.
222 : // One of the pros of `defer!` is that this will *most probably*
223 : // get called, even in the presence of panics.
224 : let gauge = LIVE_CONNECTIONS_COUNT.with_label_values(&["page_service"]);
225 : gauge.inc();
226 10552 : scopeguard::defer! {
227 10552 : gauge.dec();
228 10552 : }
229 :
230 : socket
231 : .set_nodelay(true)
232 : .context("could not set TCP_NODELAY")?;
233 :
234 : let peer_addr = socket.peer_addr().context("get peer address")?;
235 : tracing::Span::current().record("peer_addr", field::display(peer_addr));
236 :
237 : // Set up a read timeout of 10 minutes. The exact value is somewhat arbitrary; the requirements are:
238 : // - long enough for most valid compute connections
239 : // - less than infinite to stop us from "leaking" connections to long-gone computes
240 : //
241 : // no write timeout is used, because the kernel is assumed to error writes after some time.
242 : let mut socket = tokio_io_timeout::TimeoutReader::new(socket);
243 :
244 : let default_timeout_ms = 10 * 60 * 1000; // 10 minutes by default
245 10566 : let socket_timeout_ms = (|| {
246 10566 : fail::fail_point!("simulated-bad-compute-connection", |avg_timeout_ms| {
247 : // Use an exponential distribution to simulate poor network
248 : // conditions; in tests, avg_timeout_ms is expected to be around
249 : // 15.
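// (For reference: the expression below is the standard inverse-CDF transform;
// for `u` uniform in [0, 1), ln(1 - u) divided by -avg yields an exponentially
// distributed sample.)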
250 22 : if let Some(avg_timeout_ms) = avg_timeout_ms {
251 22 : let avg = avg_timeout_ms.parse::<i64>().unwrap() as f32;
252 22 : let u = rand::random::<f32>();
253 22 : ((1.0 - u).ln() / (-avg)) as u64
254 : } else {
255 0 : default_timeout_ms
256 : }
257 10566 : });
258 10544 : default_timeout_ms
259 : })();
260 :
261 : // A timeout here does not mean the client died; it can also happen if the client is just idle for
262 : // a while. We will tear down this PageServerHandler and instantiate a new one if/when
263 : // the client reconnects.
264 : socket.set_timeout(Some(std::time::Duration::from_millis(socket_timeout_ms)));
265 : let socket = std::pin::pin!(socket);
266 :
267 : // XXX: pgbackend.run() should take the connection_ctx,
268 : // and create a child per-query context when it invokes process_query.
269 : // But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
270 : // and create the per-query context in process_query ourselves.
271 : let mut conn_handler = PageServerHandler::new(conf, broker_client, auth, connection_ctx);
272 : let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
273 :
274 : match pgbackend
275 : .run(&mut conn_handler, task_mgr::shutdown_watcher)
276 : .await
277 : {
278 : Ok(()) => {
279 : // we've been requested to shut down
280 : Ok(())
281 : }
282 : Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
283 : if is_expected_io_error(&io_error) {
284 0 : info!("Postgres client disconnected ({io_error})");
285 : Ok(())
286 : } else {
287 : Err(io_error).context("Postgres connection error")
288 : }
289 : }
290 : other => other.context("Postgres query error"),
291 : }
292 : }
293 :
294 : /// While a handler holds a reference to a Timeline, it also holds the
295 : /// timeline's Gate open.
296 : struct HandlerTimeline {
297 : timeline: Arc<Timeline>,
298 : _guard: GateGuard,
299 : }
300 :
301 : struct PageServerHandler {
302 : _conf: &'static PageServerConf,
303 : broker_client: storage_broker::BrokerClientChannel,
304 : auth: Option<Arc<SwappableJwtAuth>>,
305 : claims: Option<Claims>,
306 :
307 : /// The context created for the lifetime of the connection
307 : /// serviced by this PageServerHandler.
309 : /// For each query received over the connection,
310 : /// `process_query` creates a child context from this one.
311 : connection_ctx: RequestContext,
312 :
313 : /// See [`Self::cache_timeline`] for usage.
314 : ///
315 : /// Note on size: the typical size of this map is 1. The largest size we expect
316 : /// to see is the number of shards divided by the number of pageservers (typically < 2),
317 : /// or the ratio used when splitting shards (i.e. how many children are created from one
318 : /// parent shard), where a "large" number might be ~8.
319 : shard_timelines: HashMap<ShardIndex, HandlerTimeline>,
320 : }
321 :
322 3 : #[derive(thiserror::Error, Debug)]
323 : enum PageStreamError {
324 : /// We encountered an error that should prompt the client to reconnect:
325 : /// in practice this means we drop the connection without sending a response.
326 : #[error("Reconnect required: {0}")]
327 : Reconnect(Cow<'static, str>),
328 :
329 : /// We were instructed to shutdown while processing the query
330 : #[error("Shutting down")]
331 : Shutdown,
332 :
333 : /// Something went wrong reading a page: this likely indicates a pageserver bug
334 : #[error("Read error")]
335 : Read(#[source] PageReconstructError),
336 :
337 : /// Ran out of time waiting for an LSN
338 : #[error("LSN timeout: {0}")]
339 : LsnTimeout(WaitLsnError),
340 :
341 : /// The entity required to serve the request (tenant or timeline) is not found,
342 : /// or is not found in a suitable state to serve a request.
343 : #[error("Not found: {0}")]
344 : NotFound(Cow<'static, str>),
345 :
346 : /// Request asked for something that doesn't make sense, like an invalid LSN
347 : #[error("Bad request: {0}")]
348 : BadRequest(Cow<'static, str>),
349 : }
350 :
351 : impl From<PageReconstructError> for PageStreamError {
352 4 : fn from(value: PageReconstructError) -> Self {
353 4 : match value {
354 4 : PageReconstructError::Cancelled => Self::Shutdown,
355 0 : e => Self::Read(e),
356 : }
357 4 : }
358 : }
359 :
360 : impl From<GetActiveTimelineError> for PageStreamError {
361 0 : fn from(value: GetActiveTimelineError) -> Self {
362 0 : match value {
363 0 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => Self::Shutdown,
364 0 : GetActiveTimelineError::Tenant(e) => Self::NotFound(format!("{e}").into()),
365 0 : GetActiveTimelineError::Timeline(e) => Self::NotFound(format!("{e}").into()),
366 : }
367 0 : }
368 : }
369 :
370 : impl From<WaitLsnError> for PageStreamError {
371 0 : fn from(value: WaitLsnError) -> Self {
372 0 : match value {
373 0 : e @ WaitLsnError::Timeout(_) => Self::LsnTimeout(e),
374 0 : WaitLsnError::Shutdown => Self::Shutdown,
375 0 : WaitLsnError::BadState => Self::Reconnect("Timeline is not active".into()),
376 : }
377 0 : }
378 : }
379 :
380 : impl From<WaitLsnError> for QueryError {
381 24 : fn from(value: WaitLsnError) -> Self {
382 24 : match value {
383 24 : e @ WaitLsnError::Timeout(_) => Self::Other(anyhow::Error::new(e)),
384 0 : WaitLsnError::Shutdown => Self::Shutdown,
385 0 : WaitLsnError::BadState => Self::Reconnect,
386 : }
387 24 : }
388 : }
389 :
390 : impl PageServerHandler {
391 10566 : pub fn new(
392 10566 : conf: &'static PageServerConf,
393 10566 : broker_client: storage_broker::BrokerClientChannel,
394 10566 : auth: Option<Arc<SwappableJwtAuth>>,
395 10566 : connection_ctx: RequestContext,
396 10566 : ) -> Self {
397 10566 : PageServerHandler {
398 10566 : _conf: conf,
399 10566 : broker_client,
400 10566 : auth,
401 10566 : claims: None,
402 10566 : connection_ctx,
403 10566 : shard_timelines: HashMap::new(),
404 10566 : }
405 10566 : }
406 :
407 : /// Future that completes when we need to shut down the connection.
408 : ///
409 : /// We currently need to shut down when any of the following happens:
410 : /// 1. any of the timelines we hold GateGuards for in `shard_timelines` is cancelled
411 : /// 2. task_mgr requests shutdown of the connection
412 : ///
413 : /// NB on (1): the connection's lifecycle is not actually tied to any of the
414 : /// `shard_timelines`s' lifecycles. But it's _necessary_ in the current
415 : /// implementation to be responsive to timeline cancellation because
416 : /// the connection holds their `GateGuards` open (stored in `shard_timelines`).
417 : /// We currently do the easy thing and terminate the connection if any of the
418 : /// shard_timelines gets cancelled. But really, we could spend more effort
419 : /// and simply remove the cancelled timeline from the `shard_timelines`, thereby
420 : /// dropping the guard.
421 : ///
422 : /// NB: keep in sync with [`Self::is_connection_cancelled`]
423 9039091 : async fn await_connection_cancelled(&self) {
424 6032686 : // A short wait before we expend the cycles to walk our timeline map. This avoids incurring
425 6032686 : // that cost every time we check for cancellation.
426 6032686 : tokio::time::sleep(Duration::from_millis(10)).await;
427 :
428 : // This function is never called concurrently with code that adds timelines to shard_timelines,
429 : // which is enforced by the borrow checker (the future returned by this function carries the
430 : // immutable &self). So it's fine to evaluate shard_timelines after the sleep, we don't risk
431 : // missing any inserts to the map.
432 :
433 51213 : let mut cancellation_sources = Vec::with_capacity(1 + self.shard_timelines.len());
434 51213 : use futures::future::Either;
435 51213 : cancellation_sources.push(Either::Left(task_mgr::shutdown_watcher()));
436 51213 : cancellation_sources.extend(
437 51213 : self.shard_timelines
438 51213 : .values()
439 51213 : .map(|ht| Either::Right(ht.timeline.cancel.cancelled())),
440 51213 : );
441 51213 : FuturesUnordered::from_iter(cancellation_sources)
442 51213 : .next()
443 96407 : .await;
444 187 : }
445 :
446 : /// Checking variant of [`Self::await_connection_cancelled`].
447 1 : fn is_connection_cancelled(&self) -> bool {
448 1 : task_mgr::is_shutdown_requested()
449 1 : || self
450 1 : .shard_timelines
451 1 : .values()
452 1 : .any(|ht| ht.timeline.cancel.is_cancelled() || ht.timeline.is_stopping())
453 1 : }
454 :
455 : /// This function always respects cancellation of any timeline in [`Self::shard_timelines`]. Pass in
456 : /// a cancellation token at the next scope up (such as a tenant cancellation token) to ensure we respect
457 : /// cancellation if there aren't any timelines in the cache.
458 : ///
459 : /// If calling from a function that doesn't use the [`Self::shard_timelines`] cache, then pass in the
460 : /// timeline cancellation token.
461 4520196 : async fn flush_cancellable<IO>(
462 4520196 : &self,
463 4520196 : pgb: &mut PostgresBackend<IO>,
464 4520196 : cancel: &CancellationToken,
465 4520196 : ) -> Result<(), QueryError>
466 4520196 : where
467 4520196 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
468 4520196 : {
469 4529379 : tokio::select!(
470 4520196 : flush_r = pgb.flush() => {
471 : Ok(flush_r?)
472 : },
473 : _ = self.await_connection_cancelled() => {
474 : Err(QueryError::Shutdown)
475 : }
476 : _ = cancel.cancelled() => {
477 : Err(QueryError::Shutdown)
478 : }
479 : )
480 4520196 : }
481 :
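/// Adapts the client's COPY IN data into a byte stream: each CopyData payload is
/// yielded as a `Bytes` chunk, and the stream terminates when the client sends
/// CopyDone (or fails on disconnect/shutdown). The import handlers below wrap it
/// in a `StreamReader` to obtain an `AsyncRead`, roughly (sketch of the call site,
/// with `cancel` standing in for the tenant's or timeline's cancellation token):
///
/// ```ignore
/// let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb, &cancel)));
/// ```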
482 14 : fn copyin_stream<'a, IO>(
483 14 : &'a self,
484 14 : pgb: &'a mut PostgresBackend<IO>,
485 14 : cancel: &'a CancellationToken,
486 14 : ) -> impl Stream<Item = io::Result<Bytes>> + 'a
487 14 : where
488 14 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
489 14 : {
490 14 : async_stream::try_stream! {
491 : loop {
492 93934 : let msg = tokio::select! {
493 : biased;
494 :
495 : _ = cancel.cancelled() => {
496 : // We were requested to shut down.
497 : let msg = "pageserver is shutting down";
498 : let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, None));
499 : Err(QueryError::Shutdown)
500 : }
501 :
502 93934 : msg = pgb.read_message() => { msg.map_err(QueryError::from)}
503 : };
504 :
505 93934 : match msg {
506 93934 : Ok(Some(message)) => {
507 93934 : let copy_data_bytes = match message {
508 93913 : FeMessage::CopyData(bytes) => bytes,
509 13 : FeMessage::CopyDone => { break },
510 8 : FeMessage::Sync => continue,
511 : FeMessage::Terminate => {
512 0 : let msg = "client terminated connection with Terminate message during COPY";
513 0 : let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
514 0 : // error can't happen here, ErrorResponse serialization should be always ok
515 0 : pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
516 0 : Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
517 0 : break;
518 : }
519 0 : m => {
520 0 : let msg = format!("unexpected message {m:?}");
521 0 : // error can't happen here, ErrorResponse serialization should be always ok
522 0 : pgb.write_message_noflush(&BeMessage::ErrorResponse(&msg, None)).map_err(|e| e.into_io_error())?;
523 0 : Err(io::Error::new(io::ErrorKind::Other, msg))?;
524 0 : break;
525 : }
526 : };
527 :
528 93913 : yield copy_data_bytes;
529 : }
530 : Ok(None) => {
531 0 : let msg = "client closed connection during COPY";
532 0 : let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
533 0 : // error can't happen here, ErrorResponse serialization should be always ok
534 0 : pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
535 0 : self.flush_cancellable(pgb, cancel).await.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
536 0 : Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
537 : }
538 0 : Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
539 0 : Err(io_error)?;
540 : }
541 0 : Err(other) => {
542 0 : Err(io::Error::new(io::ErrorKind::Other, other.to_string()))?;
543 : }
544 : };
545 : }
546 : }
547 14 : }
548 :
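/// Serves the "pagestream" protocol on this connection: switches the client into
/// COPY BOTH mode and then answers GetPage/Exists/Nblocks/DbSize/GetSlruSegment
/// requests in a loop until the client disconnects, an error requires dropping the
/// connection, or the pageserver shuts down.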
549 19748 : #[instrument(skip_all)]
550 : async fn handle_pagerequests<IO>(
551 : &mut self,
552 : pgb: &mut PostgresBackend<IO>,
553 : tenant_id: TenantId,
554 : timeline_id: TimelineId,
555 : ctx: RequestContext,
556 : ) -> Result<(), QueryError>
557 : where
558 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
559 : {
560 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
561 :
562 : let tenant = mgr::get_active_tenant_with_timeout(
563 : tenant_id,
564 : ShardSelector::First,
565 : ACTIVE_TENANT_TIMEOUT,
566 : &task_mgr::shutdown_token(),
567 : )
568 : .await?;
569 :
570 : // Make request tracer if needed
571 : let mut tracer = if tenant.get_trace_read_requests() {
572 : let connection_id = ConnectionId::generate();
573 : let path =
574 : tenant
575 : .conf
576 : .trace_path(&tenant.tenant_shard_id(), &timeline_id, &connection_id);
577 : Some(Tracer::new(path))
578 : } else {
579 : None
580 : };
581 :
582 : // switch client to COPYBOTH
583 : pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
584 : self.flush_cancellable(pgb, &tenant.cancel).await?;
585 :
586 : loop {
587 8734645 : let msg = tokio::select! {
588 : biased;
589 :
590 : _ = self.await_connection_cancelled() => {
591 : // We were requested to shut down.
592 187 : info!("shutdown request received in page handler");
593 : return Err(QueryError::Shutdown)
594 : }
595 :
596 4518694 : msg = pgb.read_message() => { msg }
597 : };
598 :
599 : let copy_data_bytes = match msg? {
600 : Some(FeMessage::CopyData(bytes)) => bytes,
601 : Some(FeMessage::Terminate) => break,
602 : Some(m) => {
603 : return Err(QueryError::Other(anyhow::anyhow!(
604 : "unexpected message: {m:?} during COPY"
605 : )));
606 : }
607 : None => break, // client disconnected
608 : };
609 :
610 0 : trace!("query: {copy_data_bytes:?}");
611 :
612 : // Trace request if needed
613 : if let Some(t) = tracer.as_mut() {
614 : t.trace(©_data_bytes)
615 : }
616 :
617 : let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
618 :
619 : // TODO: We could create a new per-request context here, with unique ID.
620 : // Currently we use the same per-timeline context for all requests
621 :
622 : let (response, span) = match neon_fe_msg {
623 : PagestreamFeMessage::Exists(req) => {
624 : let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.lsn);
625 : (
626 : self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx)
627 : .instrument(span.clone())
628 : .await,
629 : span,
630 : )
631 : }
632 : PagestreamFeMessage::Nblocks(req) => {
633 : let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.lsn);
634 : (
635 : self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx)
636 : .instrument(span.clone())
637 : .await,
638 : span,
639 : )
640 : }
641 : PagestreamFeMessage::GetPage(req) => {
642 : // shard_id is filled in by the handler
643 : let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn);
644 : (
645 : self.handle_get_page_at_lsn_request(tenant_id, timeline_id, &req, &ctx)
646 : .instrument(span.clone())
647 : .await,
648 : span,
649 : )
650 : }
651 : PagestreamFeMessage::DbSize(req) => {
652 : let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.lsn);
653 : (
654 : self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx)
655 : .instrument(span.clone())
656 : .await,
657 : span,
658 : )
659 : }
660 : PagestreamFeMessage::GetSlruSegment(req) => {
661 : let span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.lsn);
662 : (
663 : self.handle_get_slru_segment_request(tenant_id, timeline_id, &req, &ctx)
664 : .instrument(span.clone())
665 : .await,
666 : span,
667 : )
668 : }
669 : };
670 :
671 : match response {
672 : Err(PageStreamError::Shutdown) => {
673 : // If we fail to fulfil a request during shutdown, which may be _because_ of
674 : // shutdown, then do not send the error to the client. Instead just drop the
675 : // connection.
676 4 : span.in_scope(|| info!("dropping connection due to shutdown"));
677 : return Err(QueryError::Shutdown);
678 : }
679 : Err(PageStreamError::Reconnect(reason)) => {
680 0 : span.in_scope(|| info!("handler requested reconnect: {reason}"));
681 : return Err(QueryError::Reconnect);
682 : }
683 : Err(e) if self.is_connection_cancelled() => {
684 : // This branch accommodates code within request handlers that returns an anyhow::Error instead of a clean
685 : // shutdown error; this may be buried inside a PageReconstructError::Other, for example.
686 : //
687 : // Requests may fail as soon as we are Stopping, even if the Timeline's cancellation token wasn't fired yet,
688 : // because wait_lsn etc will drop out
689 : // is_stopping(): [`Timeline::flush_and_shutdown`] has entered
690 : // is_canceled(): [`Timeline::shutdown`] has entered
691 0 : span.in_scope(|| info!("dropped error response during shutdown: {e:#}"));
692 : return Err(QueryError::Shutdown);
693 : }
694 : r => {
695 1 : let response_msg = r.unwrap_or_else(|e| {
696 1 : // print the all details to the log with {:#}, but for the client the
697 1 : // error message is enough. Do not log if shutting down, as the anyhow::Error
698 1 : // here includes cancellation which is not an error.
699 1 : let full = utils::error::report_compact_sources(&e);
700 1 : span.in_scope(|| {
701 1 : error!("error reading relation or page version: {full:#}")
702 1 : });
703 1 : PagestreamBeMessage::Error(PagestreamErrorResponse {
704 1 : message: e.to_string(),
705 1 : })
706 1 : });
707 :
708 : pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
709 : self.flush_cancellable(pgb, &tenant.cancel).await?;
710 : }
711 : }
712 : }
713 : Ok(())
714 : }
715 :
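/// Handles the "import basebackup" command: creates a new, empty timeline at
/// `base_lsn` and populates it from a base backup tarball that the client streams
/// over COPY IN.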
716 : #[allow(clippy::too_many_arguments)]
717 24 : #[instrument(skip_all, fields(%base_lsn, end_lsn=%_end_lsn, %pg_version))]
718 : async fn handle_import_basebackup<IO>(
719 : &self,
720 : pgb: &mut PostgresBackend<IO>,
721 : tenant_id: TenantId,
722 : timeline_id: TimelineId,
723 : base_lsn: Lsn,
724 : _end_lsn: Lsn,
725 : pg_version: u32,
726 : ctx: RequestContext,
727 : ) -> Result<(), QueryError>
728 : where
729 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
730 : {
731 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
732 :
733 : // Create empty timeline
734 12 : info!("creating new timeline");
735 : let tenant = get_active_tenant_with_timeout(
736 : tenant_id,
737 : ShardSelector::Zero,
738 : ACTIVE_TENANT_TIMEOUT,
739 : &task_mgr::shutdown_token(),
740 : )
741 : .await?;
742 : let timeline = tenant
743 : .create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
744 : .await?;
745 :
746 : // TODO mark timeline as not ready until it reaches end_lsn.
747 : // We might have some wal to import as well, and we should prevent compute
748 : // from connecting before that and writing conflicting wal.
749 : //
750 : // This is not relevant for pageserver->pageserver migrations, since there's
751 : // no wal to import. But should be fixed if we want to import from postgres.
752 :
753 : // TODO leave clean state on error. For now you can use detach to clean
754 : // up broken state from a failed import.
755 :
756 : // Import basebackup provided via CopyData
757 12 : info!("importing basebackup");
758 : pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
759 : self.flush_cancellable(pgb, &tenant.cancel).await?;
760 :
761 : let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb, &tenant.cancel)));
762 : timeline
763 : .import_basebackup_from_tar(
764 : &mut copyin_reader,
765 : base_lsn,
766 : self.broker_client.clone(),
767 : &ctx,
768 : )
769 : .await?;
770 :
771 : // Read the end of the tar archive.
772 : read_tar_eof(copyin_reader).await?;
773 :
774 : // TODO check checksum
775 : // Meanwhile you can verify client-side by taking fullbackup
776 : // and checking that it matches in size with what was imported.
777 : // It wouldn't work if base came from vanilla postgres though,
778 : // since we discard some log files.
779 :
780 9 : info!("done");
781 : Ok(())
782 : }
783 :
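/// Handles the "import wal" command: imports WAL from a tarball streamed over
/// COPY IN. `start_lsn` must match the timeline's current last-record LSN, and the
/// imported WAL is expected to advance the timeline to at least `end_lsn`.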
784 4 : #[instrument(skip_all, fields(shard_id, %start_lsn, %end_lsn))]
785 : async fn handle_import_wal<IO>(
786 : &self,
787 : pgb: &mut PostgresBackend<IO>,
788 : tenant_id: TenantId,
789 : timeline_id: TimelineId,
790 : start_lsn: Lsn,
791 : end_lsn: Lsn,
792 : ctx: RequestContext,
793 : ) -> Result<(), QueryError>
794 : where
795 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
796 : {
797 : let timeline = self
798 : .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero)
799 : .await?;
800 : let last_record_lsn = timeline.get_last_record_lsn();
801 : if last_record_lsn != start_lsn {
802 : return Err(QueryError::Other(
803 : anyhow::anyhow!("Cannot import WAL from Lsn {start_lsn} because timeline does not start from the same lsn: {last_record_lsn}"))
804 : );
805 : }
806 :
807 : // TODO leave clean state on error. For now you can use detach to clean
808 : // up broken state from a failed import.
809 :
810 : // Import wal provided via CopyData
811 2 : info!("importing wal");
812 : pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
813 : self.flush_cancellable(pgb, &timeline.cancel).await?;
814 : let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb, &timeline.cancel)));
815 : import_wal_from_tar(&timeline, &mut copyin_reader, start_lsn, end_lsn, &ctx).await?;
816 2 : info!("wal import complete");
817 :
818 : // Read the end of the tar archive.
819 : read_tar_eof(copyin_reader).await?;
820 :
821 : // TODO Does it make sense to overshoot?
822 : if timeline.get_last_record_lsn() < end_lsn {
823 : return Err(QueryError::Other(
824 : anyhow::anyhow!("Cannot import WAL from Lsn {start_lsn} because timeline does not start from the same lsn: {last_record_lsn}"))
825 : );
826 : }
827 :
828 : // Flush data to disk, then upload to s3. No need for a forced checkpoint.
829 : // We only want to persist the data, and it doesn't matter if it's in the
830 : // shape of deltas or images.
831 2 : info!("flushing layers");
832 : timeline.freeze_and_flush().await?;
833 :
834 2 : info!("done");
835 : Ok(())
836 : }
837 :
838 : /// Helper function to handle the LSN from client request.
839 : ///
840 : /// Each GetPage (and Exists and Nblocks) request includes information about
841 : /// which version of the page is being requested. The client can request the
842 : /// latest version of the page, or the version that's valid at a particular
843 : /// LSN. The primary compute node will always request the latest page
844 : /// version, while a standby will request a version at the LSN that it's
845 : /// currently caught up to.
846 : ///
847 : /// In either case, if the page server hasn't received the WAL up to the
848 : /// requested LSN yet, we will wait for it to arrive. The return value is
849 : /// the LSN that should be used to look up the page versions.
850 4509137 : async fn wait_or_get_last_lsn(
851 4509137 : timeline: &Timeline,
852 4509137 : mut lsn: Lsn,
853 4509137 : latest: bool,
854 4509137 : latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
855 4509137 : ctx: &RequestContext,
856 4509137 : ) -> Result<Lsn, PageStreamError> {
857 4509137 : if latest {
858 : // Latest page version was requested. If LSN is given, it is a hint
859 : // to the page server that there have been no modifications to the
860 : // page after that LSN. If we haven't received WAL up to that point,
861 : // wait until it arrives.
862 4338656 : let last_record_lsn = timeline.get_last_record_lsn();
863 4338656 :
864 4338656 : // Note: this covers the special case that lsn == Lsn(0). That
865 4338656 : // special case means "return the latest version whatever it is",
866 4338656 : // and it's used for bootstrapping purposes, when the page server is
867 4338656 : // connected directly to the compute node. That is needed because
868 4338656 : // when you connect to the compute node, to receive the WAL, the
869 4338656 : // walsender process will do a look up in the pg_authid catalog
870 4338656 : // table for authentication. That poses a deadlock problem: the
871 4338656 : // catalog table lookup will send a GetPage request, but the GetPage
872 4338656 : // request will block in the page server because the recent WAL
873 4338656 : // hasn't been received yet, and it cannot be received until the
874 4338656 : // walsender completes the authentication and starts streaming the
875 4338656 : // WAL.
876 4338656 : if lsn <= last_record_lsn {
877 4280798 : lsn = last_record_lsn;
878 4280798 : } else {
879 110326 : timeline.wait_lsn(lsn, ctx).await?;
880 : // Since we waited for 'lsn' to arrive, that is now the last
881 : // record LSN. (Or close enough for our purposes; the
882 : // last-record LSN can advance immediately after we return
883 : // anyway)
884 : }
885 : } else {
886 170481 : if lsn == Lsn(0) {
887 1 : return Err(PageStreamError::BadRequest(
888 1 : "invalid LSN(0) in request".into(),
889 1 : ));
890 170480 : }
891 170480 : timeline.wait_lsn(lsn, ctx).await?;
892 : }
893 :
894 4509136 : if lsn < **latest_gc_cutoff_lsn {
895 0 : return Err(PageStreamError::BadRequest(format!(
896 0 : "tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
897 0 : lsn, **latest_gc_cutoff_lsn
898 0 : ).into()));
899 4509136 : }
900 4509136 : Ok(lsn)
901 4509137 : }
902 :
903 113406 : #[instrument(skip_all, fields(shard_id))]
904 : async fn handle_get_rel_exists_request(
905 : &mut self,
906 : tenant_id: TenantId,
907 : timeline_id: TimelineId,
908 : req: &PagestreamExistsRequest,
909 : ctx: &RequestContext,
910 : ) -> Result<PagestreamBeMessage, PageStreamError> {
911 : let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
912 : let _timer = timeline
913 : .query_metrics
914 : .start_timer(metrics::SmgrQueryType::GetRelExists);
915 :
916 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
917 : let lsn =
918 : Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
919 : .await?;
920 :
921 : let exists = timeline
922 : .get_rel_exists(req.rel, Version::Lsn(lsn), req.latest, ctx)
923 : .await?;
924 :
925 : Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
926 : exists,
927 : }))
928 : }
929 :
930 40446 : #[instrument(skip_all, fields(shard_id))]
931 : async fn handle_get_nblocks_request(
932 : &mut self,
933 : tenant_id: TenantId,
934 : timeline_id: TimelineId,
935 : req: &PagestreamNblocksRequest,
936 : ctx: &RequestContext,
937 : ) -> Result<PagestreamBeMessage, PageStreamError> {
938 : let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
939 :
940 : let _timer = timeline
941 : .query_metrics
942 : .start_timer(metrics::SmgrQueryType::GetRelSize);
943 :
944 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
945 : let lsn =
946 : Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
947 : .await?;
948 :
949 : let n_blocks = timeline
950 : .get_rel_size(req.rel, Version::Lsn(lsn), req.latest, ctx)
951 : .await?;
952 :
953 : Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
954 : n_blocks,
955 : }))
956 : }
957 :
958 10 : #[instrument(skip_all, fields(shard_id))]
959 : async fn handle_db_size_request(
960 : &mut self,
961 : tenant_id: TenantId,
962 : timeline_id: TimelineId,
963 : req: &PagestreamDbSizeRequest,
964 : ctx: &RequestContext,
965 : ) -> Result<PagestreamBeMessage, PageStreamError> {
966 : let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
967 :
968 : let _timer = timeline
969 : .query_metrics
970 : .start_timer(metrics::SmgrQueryType::GetDbSize);
971 :
972 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
973 : let lsn =
974 : Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
975 : .await?;
976 :
977 : let total_blocks = timeline
978 : .get_db_size(
979 : DEFAULTTABLESPACE_OID,
980 : req.dbnode,
981 : Version::Lsn(lsn),
982 : req.latest,
983 : ctx,
984 : )
985 : .await?;
986 : let db_size = total_blocks as i64 * BLCKSZ as i64;
987 :
988 : Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse {
989 : db_size,
990 : }))
991 : }
992 :
993 : /// For most getpage requests, we will already have a Timeline to serve the request: this function
994 : /// looks up such a Timeline synchronously and without touching any global state.
995 4432206 : fn get_cached_timeline_for_page(
996 4432206 : &mut self,
997 4432206 : req: &PagestreamGetPageRequest,
998 4432206 : ) -> Result<&Arc<Timeline>, Key> {
999 4432206 : let key = if let Some((first_idx, first_timeline)) = self.shard_timelines.iter().next() {
1000 : // Fastest path: single sharded case
1001 4424515 : if first_idx.shard_count < ShardCount(2) {
1002 3857014 : return Ok(&first_timeline.timeline);
1003 567501 : }
1004 567501 :
1005 567501 : let key = rel_block_to_key(req.rel, req.blkno);
1006 567501 : let shard_num = first_timeline
1007 567501 : .timeline
1008 567501 : .get_shard_identity()
1009 567501 : .get_shard_number(&key);
1010 567501 :
1011 567501 : // Fast path: matched the first timeline in our local handler map. This case is common if
1012 567501 : // only one shard per tenant is attached to this pageserver.
1013 567501 : if first_timeline.timeline.get_shard_identity().number == shard_num {
1014 567501 : return Ok(&first_timeline.timeline);
1015 0 : }
1016 0 :
1017 0 : let shard_index = ShardIndex {
1018 0 : shard_number: shard_num,
1019 0 : shard_count: first_timeline.timeline.get_shard_identity().count,
1020 0 : };
1021 :
1022 : // Fast-ish path: timeline is in the connection handler's local cache
1023 0 : if let Some(found) = self.shard_timelines.get(&shard_index) {
1024 0 : return Ok(&found.timeline);
1025 0 : }
1026 0 :
1027 0 : key
1028 : } else {
1029 7691 : rel_block_to_key(req.rel, req.blkno)
1030 : };
1031 :
1032 7691 : Err(key)
1033 4432206 : }
1034 :
1035 : /// Having looked up the [`Timeline`] instance for a particular shard, cache it to enable
1036 : /// use in future requests without having to traverse [`crate::tenant::mgr::TenantManager`]
1037 : /// again.
1038 : ///
1039 : /// Note that all the Timelines in this cache are for the same timeline_id: they differ only
1040 : /// in which shard they belong to. When we serve a getpage@lsn request, we choose a shard
1041 : /// based on key.
1042 : ///
1043 : /// The typical size of this cache is 1, as we generally create shards to distribute work
1044 : /// across pageservers, so don't tend to have multiple shards for the same tenant on the
1045 : /// same pageserver.
1046 9844 : fn cache_timeline(
1047 9844 : &mut self,
1048 9844 : timeline: Arc<Timeline>,
1049 9844 : ) -> Result<&Arc<Timeline>, GetActiveTimelineError> {
1050 9844 : let gate_guard = timeline
1051 9844 : .gate
1052 9844 : .enter()
1053 9844 : .map_err(|_| GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled))?;
1054 :
1055 9844 : let shard_index = timeline.tenant_shard_id.to_index();
1056 9844 : let entry = self
1057 9844 : .shard_timelines
1058 9844 : .entry(shard_index)
1059 9844 : .or_insert(HandlerTimeline {
1060 9844 : timeline,
1061 9844 : _guard: gate_guard,
1062 9844 : });
1063 9844 :
1064 9844 : Ok(&entry.timeline)
1065 9844 : }
1066 :
1067 : /// If [`Self::get_cached_timeline_for_page`] missed, then this function is used to populate the cache with
1068 : /// a Timeline to serve requests for this key, if such a Timeline is present on this pageserver. If no such
1069 : /// Timeline is found, then we will return an error (this indicates that the client is talking to the wrong node).
1070 7691 : async fn load_timeline_for_page(
1071 7691 : &mut self,
1072 7691 : tenant_id: TenantId,
1073 7691 : timeline_id: TimelineId,
1074 7691 : key: Key,
1075 7691 : ) -> anyhow::Result<&Arc<Timeline>, GetActiveTimelineError> {
1076 : // Slow path: we must call out to the TenantManager to find the timeline for this Key
1077 7691 : let timeline = self
1078 7691 : .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Page(key))
1079 0 : .await?;
1080 :
1081 7691 : self.cache_timeline(timeline)
1082 7691 : }
1083 :
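/// Returns the shard-zero Timeline for this tenant, caching it in `shard_timelines`
/// on first use. Requests that are always served by shard zero (relation existence
/// and size, database size, SLRU segments) go through this helper.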
1084 76931 : async fn get_timeline_shard_zero(
1085 76931 : &mut self,
1086 76931 : tenant_id: TenantId,
1087 76931 : timeline_id: TimelineId,
1088 76931 : ) -> anyhow::Result<&Arc<Timeline>, GetActiveTimelineError> {
1089 : // This is a borrow-checker workaround: we can't return from inside of the `if let Some` because
1090 : // that would be an immutable-borrow-self return, whereas later in the function we will use a mutable
1091 : // ref to self. So instead, we first build a bool, and then return while not borrowing self.
1092 76931 : let have_cached = if let Some((idx, _tl)) = self.shard_timelines.iter().next() {
1093 74778 : idx.shard_number == ShardNumber(0)
1094 : } else {
1095 2153 : false
1096 : };
1097 :
1098 76931 : if have_cached {
1099 74778 : let entry = self.shard_timelines.iter().next().unwrap();
1100 74778 : Ok(&entry.1.timeline)
1101 : } else {
1102 2153 : let timeline = self
1103 2153 : .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero)
1104 0 : .await?;
1105 2153 : Ok(self.cache_timeline(timeline)?)
1106 : }
1107 76931 : }
1108 :
1109 8864412 : #[instrument(skip_all, fields(shard_id))]
1110 : async fn handle_get_page_at_lsn_request(
1111 : &mut self,
1112 : tenant_id: TenantId,
1113 : timeline_id: TimelineId,
1114 : req: &PagestreamGetPageRequest,
1115 : ctx: &RequestContext,
1116 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1117 : let timeline = match self.get_cached_timeline_for_page(req) {
1118 : Ok(tl) => tl,
1119 : Err(key) => {
1120 : match self
1121 : .load_timeline_for_page(tenant_id, timeline_id, key)
1122 : .await
1123 : {
1124 : Ok(t) => t,
1125 : Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
1126 : // We already know this tenant exists in general, because we resolved it at
1127 : // start of connection. Getting a NotFound here indicates that the shard containing
1128 : // the requested page is not present on this node: the client's knowledge of shard->pageserver
1129 : // mapping is out of date.
1130 : //
1131 : // Closing the connection by returning `PageStreamError::Reconnect` has the side effect of rate-limiting the above message, via the
1132 : // client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
1133 : // and talk to a different pageserver.
1134 : return Err(PageStreamError::Reconnect(
1135 : "getpage@lsn request routed to wrong shard".into(),
1136 : ));
1137 : }
1138 : Err(e) => return Err(e.into()),
1139 : }
1140 : }
1141 : };
1142 :
1143 : // load_timeline_for_page sets shard_id, but get_cached_timeline_for_page doesn't
1144 : set_tracing_field_shard_id(timeline);
1145 :
1146 : let _timer = timeline
1147 : .query_metrics
1148 : .start_timer(metrics::SmgrQueryType::GetPageAtLsn);
1149 :
1150 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1151 : let lsn =
1152 : Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
1153 : .await?;
1154 :
1155 : let page = timeline
1156 : .get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), req.latest, ctx)
1157 : .await?;
1158 :
1159 : Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
1160 : page,
1161 : }))
1162 : }
1163 :
1164 0 : #[instrument(skip_all, fields(shard_id))]
1165 : async fn handle_get_slru_segment_request(
1166 : &mut self,
1167 : tenant_id: TenantId,
1168 : timeline_id: TimelineId,
1169 : req: &PagestreamGetSlruSegmentRequest,
1170 : ctx: &RequestContext,
1171 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1172 : let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
1173 :
1174 : let _timer = timeline
1175 : .query_metrics
1176 : .start_timer(metrics::SmgrQueryType::GetSlruSegment);
1177 :
1178 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1179 : let lsn =
1180 : Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
1181 : .await?;
1182 :
1183 : let kind = SlruKind::from_repr(req.kind)
1184 : .ok_or(PageStreamError::BadRequest("invalid SLRU kind".into()))?;
1185 : let segment = timeline.get_slru_segment(kind, req.segno, lsn, ctx).await?;
1186 :
1187 : Ok(PagestreamBeMessage::GetSlruSegment(
1188 : PagestreamGetSlruSegmentResponse { segment },
1189 : ))
1190 : }
1191 :
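/// Streams a base backup tarball of the timeline to the client over COPY OUT,
/// optionally gzip-compressed. With `full_backup` set, relational data is included
/// as well (this is what the "fullbackup" command uses).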
1192 : #[allow(clippy::too_many_arguments)]
1193 1286 : #[instrument(skip_all, fields(shard_id, ?lsn, ?prev_lsn, %full_backup))]
1194 : async fn handle_basebackup_request<IO>(
1195 : &mut self,
1196 : pgb: &mut PostgresBackend<IO>,
1197 : tenant_id: TenantId,
1198 : timeline_id: TimelineId,
1199 : lsn: Option<Lsn>,
1200 : prev_lsn: Option<Lsn>,
1201 : full_backup: bool,
1202 : gzip: bool,
1203 : ctx: RequestContext,
1204 : ) -> Result<(), QueryError>
1205 : where
1206 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1207 : {
1208 : let started = std::time::Instant::now();
1209 :
1210 : // check that the timeline exists
1211 : let timeline = self
1212 : .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero)
1213 : .await?;
1214 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1215 : if let Some(lsn) = lsn {
1216 : // Backup was requested at a particular LSN. Wait for it to arrive.
1217 228 : info!("waiting for {}", lsn);
1218 : timeline.wait_lsn(lsn, &ctx).await?;
1219 : timeline
1220 : .check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
1221 : .context("invalid basebackup lsn")?;
1222 : }
1223 :
1224 : let lsn_awaited_after = started.elapsed();
1225 :
1226 : // switch client to COPYOUT
1227 : pgb.write_message_noflush(&BeMessage::CopyOutResponse)?;
1228 : self.flush_cancellable(pgb, &timeline.cancel).await?;
1229 :
1230 : // Send a tarball of the latest layer on the timeline. Compress if not
1231 : // fullbackup. TODO Compress in that case too (tests need to be updated)
1232 : if full_backup {
1233 : let mut writer = pgb.copyout_writer();
1234 : basebackup::send_basebackup_tarball(
1235 : &mut writer,
1236 : &timeline,
1237 : lsn,
1238 : prev_lsn,
1239 : full_backup,
1240 : &ctx,
1241 : )
1242 : .await?;
1243 : } else {
1244 : let mut writer = pgb.copyout_writer();
1245 : if gzip {
1246 : let mut encoder = GzipEncoder::with_quality(
1247 : writer,
1248 : // NOTE using fast compression because it's on the critical path
1249 : // for compute startup. For an empty database, we get
1250 : // <100KB with this method. The Level::Best compression method
1251 : // gives us <20KB, but maybe we should add basebackup caching
1252 : // on compute shutdown first.
1253 : async_compression::Level::Fastest,
1254 : );
1255 : basebackup::send_basebackup_tarball(
1256 : &mut encoder,
1257 : &timeline,
1258 : lsn,
1259 : prev_lsn,
1260 : full_backup,
1261 : &ctx,
1262 : )
1263 : .await?;
1264 : // shutdown the encoder to ensure the gzip footer is written
1265 : encoder.shutdown().await?;
1266 : } else {
1267 : basebackup::send_basebackup_tarball(
1268 : &mut writer,
1269 : &timeline,
1270 : lsn,
1271 : prev_lsn,
1272 : full_backup,
1273 : &ctx,
1274 : )
1275 : .await?;
1276 : }
1277 : }
1278 :
1279 : pgb.write_message_noflush(&BeMessage::CopyDone)?;
1280 : self.flush_cancellable(pgb, &timeline.cancel).await?;
1281 :
1282 : let basebackup_after = started
1283 : .elapsed()
1284 : .checked_sub(lsn_awaited_after)
1285 : .unwrap_or(Duration::ZERO);
1286 :
1287 595 : info!(
1288 595 : lsn_await_millis = lsn_awaited_after.as_millis(),
1289 595 : basebackup_millis = basebackup_after.as_millis(),
1290 595 : "basebackup complete"
1291 595 : );
1292 :
1293 : Ok(())
1294 : }
1295 :
1296 : // When accessing the management API, supply None as the tenant id argument.
1297 : // When authorizing access to a tenant, pass the corresponding tenant id.
1298 10547 : fn check_permission(&self, tenant_id: Option<TenantId>) -> Result<(), QueryError> {
1299 10547 : if self.auth.is_none() {
1300 : // auth is set to Trust, nothing to check so just return ok
1301 10464 : return Ok(());
1302 83 : }
1302 83 : // auth is Some, as just checked above. When auth is Some,
1303 83 : // claims are always present because of checks during connection init,
1304 83 : // so this expect won't trigger.
1306 83 : let claims = self
1307 83 : .claims
1308 83 : .as_ref()
1309 83 : .expect("claims presence already checked");
1310 83 : check_permission(claims, tenant_id).map_err(|e| QueryError::Unauthorized(e.0))
1311 10547 : }
1312 :
1313 : /// Shorthand for getting a reference to a Timeline of an Active tenant.
1314 10498 : async fn get_active_tenant_timeline(
1315 10498 : &self,
1316 10498 : tenant_id: TenantId,
1317 10498 : timeline_id: TimelineId,
1318 10498 : selector: ShardSelector,
1319 10498 : ) -> Result<Arc<Timeline>, GetActiveTimelineError> {
1320 10498 : let tenant = get_active_tenant_with_timeout(
1321 10498 : tenant_id,
1322 10498 : selector,
1323 10498 : ACTIVE_TENANT_TIMEOUT,
1324 10498 : &task_mgr::shutdown_token(),
1325 10498 : )
1326 3 : .await
1327 10498 : .map_err(GetActiveTimelineError::Tenant)?;
1328 10498 : let timeline = tenant.get_timeline(timeline_id, true)?;
1329 10492 : set_tracing_field_shard_id(&timeline);
1330 10492 : Ok(timeline)
1331 10498 : }
1332 : }
1333 :
1334 : #[async_trait::async_trait]
1335 : impl<IO> postgres_backend::Handler<IO> for PageServerHandler
1336 : where
1337 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1338 : {
1339 85 : fn check_auth_jwt(
1340 85 : &mut self,
1341 85 : _pgb: &mut PostgresBackend<IO>,
1342 85 : jwt_response: &[u8],
1343 85 : ) -> Result<(), QueryError> {
1344 : // this unwrap is never triggered, because check_auth_jwt is only called when auth_type is NeonJWT
1345 : // which requires auth to be present
1346 85 : let data = self
1347 85 : .auth
1348 85 : .as_ref()
1349 85 : .unwrap()
1350 85 : .decode(str::from_utf8(jwt_response).context("jwt response is not UTF-8")?)
1351 85 : .map_err(|e| QueryError::Unauthorized(e.0))?;
1352 :
1353 85 : if matches!(data.claims.scope, Scope::Tenant) && data.claims.tenant_id.is_none() {
1354 0 : return Err(QueryError::Unauthorized(
1355 0 : "jwt token scope is Tenant, but tenant id is missing".into(),
1356 0 : ));
1357 85 : }
1358 85 :
1359 85 : debug!(
1360 0 : "jwt scope check succeeded for scope: {:#?} by tenant id: {:?}",
1361 0 : data.claims.scope, data.claims.tenant_id,
1362 0 : );
1363 :
1364 85 : self.claims = Some(data.claims);
1365 85 : Ok(())
1366 85 : }
1367 :
1368 10565 : fn startup(
1369 10565 : &mut self,
1370 10565 : _pgb: &mut PostgresBackend<IO>,
1371 10565 : _sm: &FeStartupPacket,
1372 10565 : ) -> Result<(), QueryError> {
1373 10565 : Ok(())
1374 10565 : }
1375 :
1376 10580 : #[instrument(skip_all, fields(tenant_id, timeline_id))]
1377 : async fn process_query(
1378 : &mut self,
1379 : pgb: &mut PostgresBackend<IO>,
1380 : query_string: &str,
1381 10580 : ) -> Result<(), QueryError> {
1382 10580 : fail::fail_point!("simulated-bad-compute-connection", |_| {
1383 13 : info!("Hit failpoint for bad connection");
1384 13 : Err(QueryError::SimulatedConnectionError)
1385 10580 : });
1386 :
1387 10567 : let ctx = self.connection_ctx.attached_child();
1388 10567 : debug!("process query {query_string:?}");
1389 10567 : if query_string.starts_with("pagestream ") {
1390 9874 : let (_, params_raw) = query_string.split_at("pagestream ".len());
1391 9874 : let params = params_raw.split(' ').collect::<Vec<_>>();
1392 9874 : if params.len() != 2 {
1393 0 : return Err(QueryError::Other(anyhow::anyhow!(
1394 0 : "invalid param number for pagestream command"
1395 0 : )));
1396 9874 : }
1397 9874 : let tenant_id = TenantId::from_str(params[0])
1398 9874 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1399 9874 : let timeline_id = TimelineId::from_str(params[1])
1400 9874 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1401 :
1402 9874 : tracing::Span::current()
1403 9874 : .record("tenant_id", field::display(tenant_id))
1404 9874 : .record("timeline_id", field::display(timeline_id));
1405 9874 :
1406 9874 : self.check_permission(Some(tenant_id))?;
1407 :
1408 9874 : self.handle_pagerequests(pgb, tenant_id, timeline_id, ctx)
1409 5937354 : .await?;
1410 693 : } else if query_string.starts_with("basebackup ") {
1411 628 : let (_, params_raw) = query_string.split_at("basebackup ".len());
1412 628 : let params = params_raw.split_whitespace().collect::<Vec<_>>();
1413 628 :
1414 628 : if params.len() < 2 {
1415 0 : return Err(QueryError::Other(anyhow::anyhow!(
1416 0 : "invalid param number for basebackup command"
1417 0 : )));
1418 628 : }
1419 :
1420 628 : let tenant_id = TenantId::from_str(params[0])
1421 628 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1422 628 : let timeline_id = TimelineId::from_str(params[1])
1423 628 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1424 :
1425 628 : tracing::Span::current()
1426 628 : .record("tenant_id", field::display(tenant_id))
1427 628 : .record("timeline_id", field::display(timeline_id));
1428 628 :
1429 628 : self.check_permission(Some(tenant_id))?;
1430 :
1431 628 : let lsn = if params.len() >= 3 {
1432 : Some(
1433 213 : Lsn::from_str(params[2])
1434 213 : .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?,
1435 : )
1436 : } else {
1437 415 : None
1438 : };
1439 :
1440 628 : let gzip = if params.len() >= 4 {
1441 206 : if params[3] == "--gzip" {
1442 206 : true
1443 : } else {
1444 0 : return Err(QueryError::Other(anyhow::anyhow!(
1445 0 : "Parameter in position 3 unknown {}",
1446 0 : params[3],
1447 0 : )));
1448 : }
1449 : } else {
1450 422 : false
1451 : };
1452 :
1453 628 : ::metrics::metric_vec_duration::observe_async_block_duration_by_result(
1454 628 : &*metrics::BASEBACKUP_QUERY_TIME,
1455 628 : async move {
1456 628 : self.handle_basebackup_request(
1457 628 : pgb,
1458 628 : tenant_id,
1459 628 : timeline_id,
1460 628 : lsn,
1461 628 : None,
1462 628 : false,
1463 628 : gzip,
1464 628 : ctx,
1465 628 : )
1466 46549 : .await?;
1467 580 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1468 580 : Result::<(), QueryError>::Ok(())
1469 628 : },
1470 628 : )
1471 46549 : .await?;
1472 : }
1473 : // return pair of prev_lsn and last_lsn
1474 65 : else if query_string.starts_with("get_last_record_rlsn ") {
1475 11 : let (_, params_raw) = query_string.split_at("get_last_record_rlsn ".len());
1476 11 : let params = params_raw.split_whitespace().collect::<Vec<_>>();
1477 11 :
1478 11 : if params.len() != 2 {
1479 0 : return Err(QueryError::Other(anyhow::anyhow!(
1480 0 : "invalid param number for get_last_record_rlsn command"
1481 0 : )));
1482 11 : }
1483 :
1484 11 : let tenant_id = TenantId::from_str(params[0])
1485 11 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1486 11 : let timeline_id = TimelineId::from_str(params[1])
1487 11 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1488 :
1489 11 : tracing::Span::current()
1490 11 : .record("tenant_id", field::display(tenant_id))
1491 11 : .record("timeline_id", field::display(timeline_id));
1492 11 :
1493 11 : self.check_permission(Some(tenant_id))?;
1494 9 : async {
1495 9 : let timeline = self
1496 9 : .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero)
1497 0 : .await?;
1498 :
1499 9 : let end_of_timeline = timeline.get_last_record_rlsn();
1500 9 :
1501 9 : pgb.write_message_noflush(&BeMessage::RowDescription(&[
1502 9 : RowDescriptor::text_col(b"prev_lsn"),
1503 9 : RowDescriptor::text_col(b"last_lsn"),
1504 9 : ]))?
1505 9 : .write_message_noflush(&BeMessage::DataRow(&[
1506 9 : Some(end_of_timeline.prev.to_string().as_bytes()),
1507 9 : Some(end_of_timeline.last.to_string().as_bytes()),
1508 9 : ]))?
1509 9 : .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1510 9 : anyhow::Ok(())
1511 9 : }
1512 9 : .instrument(info_span!(
1513 9 : "handle_get_last_record_lsn",
1514 9 : shard_id = tracing::field::Empty
1515 9 : ))
1516 0 : .await?;
1517 : }
1518 :         // Same as basebackup, but the result also includes the contents of the relations (table/index data).
1519 54 : else if query_string.starts_with("fullbackup ") {
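 :             // Illustrative invocation ($PAGESERVER etc. are placeholders; lsn and
 :             // prev_lsn are optional, see the parsing below):
 :             //   psql -h $PAGESERVER -c "fullbackup $TENANT $TIMELINE [$LSN [$PREV_LSN]]"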
1520 15 : let (_, params_raw) = query_string.split_at("fullbackup ".len());
1521 15 : let params = params_raw.split_whitespace().collect::<Vec<_>>();
1522 15 :
1523 15 : if params.len() < 2 {
1524 0 : return Err(QueryError::Other(anyhow::anyhow!(
1525 0 : "invalid param number for fullbackup command"
1526 0 : )));
1527 15 : }
1528 :
1529 15 : let tenant_id = TenantId::from_str(params[0])
1530 15 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1531 15 : let timeline_id = TimelineId::from_str(params[1])
1532 15 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1533 :
1534 15 : tracing::Span::current()
1535 15 : .record("tenant_id", field::display(tenant_id))
1536 15 : .record("timeline_id", field::display(timeline_id));
1537 :
1538 : // The caller is responsible for providing correct lsn and prev_lsn.
1539 15 : let lsn = if params.len() > 2 {
1540 : Some(
1541 15 : Lsn::from_str(params[2])
1542 15 : .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?,
1543 : )
1544 : } else {
1545 0 : None
1546 : };
1547 15 : let prev_lsn = if params.len() > 3 {
1548 : Some(
1549 12 : Lsn::from_str(params[3])
1550 12 : .with_context(|| format!("Failed to parse Lsn from {}", params[3]))?,
1551 : )
1552 : } else {
1553 3 : None
1554 : };
1555 :
1556 15 : self.check_permission(Some(tenant_id))?;
1557 :
1558 :             // Check that the timeline exists and stream the full backup.
1559 15 : self.handle_basebackup_request(
1560 15 : pgb,
1561 15 : tenant_id,
1562 15 : timeline_id,
1563 15 : lsn,
1564 15 : prev_lsn,
1565 15 : true,
1566 15 : false,
1567 15 : ctx,
1568 15 : )
1569 7864 : .await?;
1570 15 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1571 39 : } else if query_string.starts_with("import basebackup ") {
1572 : // Import the `base` section (everything but the wal) of a basebackup.
1573 : // Assumes the tenant already exists on this pageserver.
1574 : //
1575 : // Files are scheduled to be persisted to remote storage, and the
1576 : // caller should poll the http api to check when that is done.
1577 : //
1578 : // Example import command:
1579 : // 1. Get start/end LSN from backup_manifest file
1580 : // 2. Run:
1581 : // cat my_backup/base.tar | psql -h $PAGESERVER \
1582 : // -c "import basebackup $TENANT $TIMELINE $START_LSN $END_LSN $PG_VERSION"
1583 12 : let (_, params_raw) = query_string.split_at("import basebackup ".len());
1584 12 : let params = params_raw.split_whitespace().collect::<Vec<_>>();
1585 12 : if params.len() != 5 {
1586 0 : return Err(QueryError::Other(anyhow::anyhow!(
1587 0 : "invalid param number for import basebackup command"
1588 0 : )));
1589 12 : }
1590 12 : let tenant_id = TenantId::from_str(params[0])
1591 12 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1592 12 : let timeline_id = TimelineId::from_str(params[1])
1593 12 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1594 12 : let base_lsn = Lsn::from_str(params[2])
1595 12 : .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?;
1596 12 : let end_lsn = Lsn::from_str(params[3])
1597 12 : .with_context(|| format!("Failed to parse Lsn from {}", params[3]))?;
1598 12 : let pg_version = u32::from_str(params[4])
1599 12 : .with_context(|| format!("Failed to parse pg_version from {}", params[4]))?;
1600 :
1601 12 : tracing::Span::current()
1602 12 : .record("tenant_id", field::display(tenant_id))
1603 12 : .record("timeline_id", field::display(timeline_id));
1604 12 :
1605 12 : self.check_permission(Some(tenant_id))?;
1606 :
1607 12 : match self
1608 12 : .handle_import_basebackup(
1609 12 : pgb,
1610 12 : tenant_id,
1611 12 : timeline_id,
1612 12 : base_lsn,
1613 12 : end_lsn,
1614 12 : pg_version,
1615 12 : ctx,
1616 12 : )
1617 6394 : .await
1618 : {
1619 9 : Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
1620 3 : Err(e) => {
1621 3 : error!("error importing base backup between {base_lsn} and {end_lsn}: {e:?}");
1622 3 : pgb.write_message_noflush(&BeMessage::ErrorResponse(
1623 3 : &e.to_string(),
1624 3 : Some(e.pg_error_code()),
1625 3 : ))?
1626 : }
1627 : };
1628 27 : } else if query_string.starts_with("import wal ") {
1629 : // Import the `pg_wal` section of a basebackup.
1630 : //
1631 : // Files are scheduled to be persisted to remote storage, and the
1632 : // caller should poll the http api to check when that is done.
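 :             //
 :             // Example import command (analogous to `import basebackup`; the tar
 :             // file name below is illustrative):
 :             //   cat my_backup/pg_wal.tar | psql -h $PAGESERVER \
 :             //     -c "import wal $TENANT $TIMELINE $START_LSN $END_LSN"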
1633 2 : let (_, params_raw) = query_string.split_at("import wal ".len());
1634 2 : let params = params_raw.split_whitespace().collect::<Vec<_>>();
1635 2 : if params.len() != 4 {
1636 0 : return Err(QueryError::Other(anyhow::anyhow!(
1637 0 : "invalid param number for import wal command"
1638 0 : )));
1639 2 : }
1640 2 : let tenant_id = TenantId::from_str(params[0])
1641 2 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1642 2 : let timeline_id = TimelineId::from_str(params[1])
1643 2 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1644 2 : let start_lsn = Lsn::from_str(params[2])
1645 2 : .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?;
1646 2 : let end_lsn = Lsn::from_str(params[3])
1647 2 : .with_context(|| format!("Failed to parse Lsn from {}", params[3]))?;
1648 :
1649 2 : tracing::Span::current()
1650 2 : .record("tenant_id", field::display(tenant_id))
1651 2 : .record("timeline_id", field::display(timeline_id));
1652 2 :
1653 2 : self.check_permission(Some(tenant_id))?;
1654 :
1655 2 : match self
1656 2 : .handle_import_wal(pgb, tenant_id, timeline_id, start_lsn, end_lsn, ctx)
1657 2265 : .await
1658 : {
1659 2 : Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
1660 0 : Err(e) => {
1661 0 : error!("error importing WAL between {start_lsn} and {end_lsn}: {e:?}");
1662 0 : pgb.write_message_noflush(&BeMessage::ErrorResponse(
1663 0 : &e.to_string(),
1664 0 : Some(e.pg_error_code()),
1665 0 : ))?
1666 : }
1667 : };
1668 25 : } else if query_string.to_ascii_lowercase().starts_with("set ") {
1669 :             // Accept and ignore SET commands: this is important because psycopg2
1670 :             // executes "SET datestyle TO 'ISO'" on connect.
1671 20 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1672 5 : } else if query_string.starts_with("show ") {
1673 : // show <tenant_id>
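 :             // Illustrative invocation ($PAGESERVER / $TENANT are placeholders):
 :             //   psql -h $PAGESERVER -c "show $TENANT"
 :             // Returns one row with the tenant's effective configuration values
 :             // (checkpoint, compaction, GC and PITR settings), per the row
 :             // description below.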
1674 5 : let (_, params_raw) = query_string.split_at("show ".len());
1675 5 : let params = params_raw.split(' ').collect::<Vec<_>>();
1676 5 : if params.len() != 1 {
1677 0 : return Err(QueryError::Other(anyhow::anyhow!(
1678 0 :                     "invalid param number for show command"
1679 0 : )));
1680 5 : }
1681 5 : let tenant_id = TenantId::from_str(params[0])
1682 5 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1683 :
1684 5 : tracing::Span::current().record("tenant_id", field::display(tenant_id));
1685 5 :
1686 5 : self.check_permission(Some(tenant_id))?;
1687 :
1688 5 : let tenant = get_active_tenant_with_timeout(
1689 5 : tenant_id,
1690 5 : ShardSelector::Zero,
1691 5 : ACTIVE_TENANT_TIMEOUT,
1692 5 : &task_mgr::shutdown_token(),
1693 5 : )
1694 0 : .await?;
1695 5 : pgb.write_message_noflush(&BeMessage::RowDescription(&[
1696 5 : RowDescriptor::int8_col(b"checkpoint_distance"),
1697 5 : RowDescriptor::int8_col(b"checkpoint_timeout"),
1698 5 : RowDescriptor::int8_col(b"compaction_target_size"),
1699 5 : RowDescriptor::int8_col(b"compaction_period"),
1700 5 : RowDescriptor::int8_col(b"compaction_threshold"),
1701 5 : RowDescriptor::int8_col(b"gc_horizon"),
1702 5 : RowDescriptor::int8_col(b"gc_period"),
1703 5 : RowDescriptor::int8_col(b"image_creation_threshold"),
1704 5 : RowDescriptor::int8_col(b"pitr_interval"),
1705 5 : ]))?
1706 5 : .write_message_noflush(&BeMessage::DataRow(&[
1707 5 : Some(tenant.get_checkpoint_distance().to_string().as_bytes()),
1708 5 : Some(
1709 5 : tenant
1710 5 : .get_checkpoint_timeout()
1711 5 : .as_secs()
1712 5 : .to_string()
1713 5 : .as_bytes(),
1714 5 : ),
1715 5 : Some(tenant.get_compaction_target_size().to_string().as_bytes()),
1716 5 : Some(
1717 5 : tenant
1718 5 : .get_compaction_period()
1719 5 : .as_secs()
1720 5 : .to_string()
1721 5 : .as_bytes(),
1722 5 : ),
1723 5 : Some(tenant.get_compaction_threshold().to_string().as_bytes()),
1724 5 : Some(tenant.get_gc_horizon().to_string().as_bytes()),
1725 5 : Some(tenant.get_gc_period().as_secs().to_string().as_bytes()),
1726 5 : Some(tenant.get_image_creation_threshold().to_string().as_bytes()),
1727 5 : Some(tenant.get_pitr_interval().as_secs().to_string().as_bytes()),
1728 5 : ]))?
1729 5 : .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1730 : } else {
1731 0 : return Err(QueryError::Other(anyhow::anyhow!(
1732 0 : "unknown command {query_string}"
1733 0 : )));
1734 : }
1735 :
1736 10161 : Ok(())
1737 21146 : }
1738 : }
1739 :
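 : // Map tenant-activation failures onto protocol-level errors: an activation
 : // timeout surfaces as a timed-out I/O "disconnect", cancellation or a stopping
 : // tenant as QueryError::Shutdown, and an unknown tenant as QueryError::NotFound.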
1740 : impl From<GetActiveTenantError> for QueryError {
1741 27 : fn from(e: GetActiveTenantError) -> Self {
1742 0 : match e {
1743 0 : GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected(
1744 0 : ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
1745 0 : ),
1746 : GetActiveTenantError::Cancelled
1747 : | GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => {
1748 0 : QueryError::Shutdown
1749 : }
1750 27 : e @ GetActiveTenantError::NotFound(_) => QueryError::NotFound(format!("{e}").into()),
1751 0 : e => QueryError::Other(anyhow::anyhow!(e)),
1752 : }
1753 27 : }
1754 : }
1755 :
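 : /// Failure to resolve an active timeline for a request: either the tenant
 : /// could not be obtained in an active state, or the timeline lookup failed.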
1756 0 : #[derive(Debug, thiserror::Error)]
1757 : enum GetActiveTimelineError {
1758 : #[error(transparent)]
1759 : Tenant(GetActiveTenantError),
1760 : #[error(transparent)]
1761 : Timeline(#[from] GetTimelineError),
1762 : }
1763 :
1764 : impl From<GetActiveTimelineError> for QueryError {
1765 6 : fn from(e: GetActiveTimelineError) -> Self {
1766 0 : match e {
1767 0 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => QueryError::Shutdown,
1768 0 : GetActiveTimelineError::Tenant(e) => e.into(),
1769 6 : GetActiveTimelineError::Timeline(e) => QueryError::NotFound(format!("{e}").into()),
1770 : }
1771 6 : }
1772 : }
1773 :
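 : /// Record the timeline's shard id on the current tracing span. The span is
 : /// expected to already carry tenant_id and timeline_id but no shard_id yet
 : /// (asserted in debug builds before and after the update).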
1774 4442698 : fn set_tracing_field_shard_id(timeline: &Timeline) {
1775 4442698 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1776 4442698 : tracing::Span::current().record(
1777 4442698 : "shard_id",
1778 4442698 : tracing::field::display(timeline.tenant_shard_id.shard_slug()),
1779 4442698 : );
1780 4442698 : debug_assert_current_span_has_tenant_and_timeline_id();
1781 4442698 : }