Line data Source code
1 : //! The Page Service listens for client connections and serves their GetPage@LSN
2 : //! requests.
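//!
//! Besides the GetPage@LSN request stream ("pagestream" / "pagestream_v2"),
//! the handler also serves the "basebackup", "fullbackup", and "lease lsn"
//! commands, and acknowledges "set" statements; see `process_query` below
//! for the dispatch.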
3 :
4 : use anyhow::Context;
5 : use async_compression::tokio::write::GzipEncoder;
6 : use bytes::Buf;
7 : use futures::stream::FuturesUnordered;
8 : use futures::StreamExt;
9 : use pageserver_api::key::Key;
10 : use pageserver_api::models::TenantState;
11 : use pageserver_api::models::{
12 : PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
13 : PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
14 : PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
15 : PagestreamGetSlruSegmentRequest, PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest,
16 : PagestreamNblocksResponse, PagestreamProtocolVersion,
17 : };
18 : use pageserver_api::shard::ShardIndex;
19 : use pageserver_api::shard::ShardNumber;
20 : use pageserver_api::shard::TenantShardId;
21 : use postgres_backend::{is_expected_io_error, AuthType, PostgresBackend, QueryError};
22 : use pq_proto::framed::ConnectionError;
23 : use pq_proto::FeStartupPacket;
24 : use pq_proto::{BeMessage, FeMessage, RowDescriptor};
25 : use std::borrow::Cow;
26 : use std::collections::HashMap;
27 : use std::io;
28 : use std::net::TcpListener;
29 : use std::str;
30 : use std::str::FromStr;
31 : use std::sync::Arc;
32 : use std::time::Duration;
33 : use std::time::Instant;
34 : use std::time::SystemTime;
35 : use tokio::io::AsyncWriteExt;
36 : use tokio::io::{AsyncRead, AsyncWrite};
37 : use tokio_util::sync::CancellationToken;
38 : use tracing::*;
39 : use utils::sync::gate::GateGuard;
40 : use utils::{
41 : auth::{Claims, Scope, SwappableJwtAuth},
42 : id::{TenantId, TimelineId},
43 : lsn::Lsn,
44 : simple_rcu::RcuReadGuard,
45 : };
46 :
47 : use crate::auth::check_permission;
48 : use crate::basebackup;
49 : use crate::basebackup::BasebackupError;
50 : use crate::context::{DownloadBehavior, RequestContext};
51 : use crate::metrics;
52 : use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS};
53 : use crate::pgdatadir_mapping::Version;
54 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
55 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
56 : use crate::task_mgr;
57 : use crate::task_mgr::TaskKind;
58 : use crate::tenant::mgr::GetActiveTenantError;
59 : use crate::tenant::mgr::GetTenantError;
60 : use crate::tenant::mgr::ShardResolveResult;
61 : use crate::tenant::mgr::ShardSelector;
62 : use crate::tenant::mgr::TenantManager;
63 : use crate::tenant::timeline::WaitLsnError;
64 : use crate::tenant::GetTimelineError;
65 : use crate::tenant::PageReconstructError;
66 : use crate::tenant::Tenant;
67 : use crate::tenant::Timeline;
68 : use pageserver_api::key::rel_block_to_key;
69 : use pageserver_api::reltag::SlruKind;
70 : use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
71 : use postgres_ffi::BLCKSZ;
72 :
73 : // How long we may wait for a [`TenantSlot::InProgress`] and/or a [`Tenant`] which
74 : // is not yet in state [`TenantState::Active`].
75 : const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
76 :
77 : ///////////////////////////////////////////////////////////////////////////////
78 :
79 : ///
80 : /// Main loop of the page service.
81 : ///
82 : /// Listens for connections, and launches a new handler task for each.
83 : ///
84 0 : pub async fn libpq_listener_main(
85 0 : tenant_manager: Arc<TenantManager>,
86 0 : auth: Option<Arc<SwappableJwtAuth>>,
87 0 : listener: TcpListener,
88 0 : auth_type: AuthType,
89 0 : listener_ctx: RequestContext,
90 0 : cancel: CancellationToken,
91 0 : ) -> anyhow::Result<()> {
92 0 : listener.set_nonblocking(true)?;
93 0 : let tokio_listener = tokio::net::TcpListener::from_std(listener)?;
94 :
95 : // Wait for a new connection to arrive, or for server shutdown.
96 0 : while let Some(res) = tokio::select! {
97 : biased;
98 :
99 : _ = cancel.cancelled() => {
100 : // We were requested to shut down.
101 : None
102 : }
103 :
104 : res = tokio_listener.accept() => {
105 : Some(res)
106 : }
107 : } {
108 0 : match res {
109 0 : Ok((socket, peer_addr)) => {
110 0 : // Connection established. Spawn a new task to handle it.
111 0 : debug!("accepted connection from {}", peer_addr);
112 0 : let local_auth = auth.clone();
113 0 :
114 0 : let connection_ctx = listener_ctx
115 0 : .detached_child(TaskKind::PageRequestHandler, DownloadBehavior::Download);
116 0 :
117 0 : // PageRequestHandler tasks are not associated with any particular
118 0 : // timeline in the task manager. In practice most connections will
119 0 : // only deal with a particular timeline, but we don't know which one
120 0 : // yet.
121 0 : task_mgr::spawn(
122 0 : &tokio::runtime::Handle::current(),
123 0 : TaskKind::PageRequestHandler,
124 0 : None,
125 0 : None,
126 0 : "serving compute connection task",
127 0 : false,
128 0 : page_service_conn_main(
129 0 : tenant_manager.clone(),
130 0 : local_auth,
131 0 : socket,
132 0 : auth_type,
133 0 : connection_ctx,
134 0 : ),
135 0 : );
136 : }
137 0 : Err(err) => {
138 0 : // accept() failed. Log the error, and loop back to retry on next connection.
139 0 : error!("accept() failed: {:?}", err);
140 : }
141 : }
142 : }
143 :
144 0 : debug!("page_service loop terminated");
145 :
146 0 : Ok(())
147 0 : }
148 :
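/// Handles a single incoming libpq connection: sets TCP_NODELAY, applies a
/// read timeout, and then runs the postgres backend protocol loop with a
/// fresh `PageServerHandler` until the client disconnects or shutdown is
/// requested.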
149 0 : #[instrument(skip_all, fields(peer_addr))]
150 : async fn page_service_conn_main(
151 : tenant_manager: Arc<TenantManager>,
152 : auth: Option<Arc<SwappableJwtAuth>>,
153 : socket: tokio::net::TcpStream,
154 : auth_type: AuthType,
155 : connection_ctx: RequestContext,
156 : ) -> anyhow::Result<()> {
157 : let _guard = LIVE_CONNECTIONS
158 : .with_label_values(&["page_service"])
159 : .guard();
160 :
161 : socket
162 : .set_nodelay(true)
163 : .context("could not set TCP_NODELAY")?;
164 :
165 : let peer_addr = socket.peer_addr().context("get peer address")?;
166 : tracing::Span::current().record("peer_addr", field::display(peer_addr));
167 :
168 : // Set up a read timeout of 10 minutes. The exact value is somewhat arbitrary; the requirements are:
169 : // - long enough for most valid compute connections
170 : // - less than infinite to stop us from "leaking" connections to long-gone computes
171 : //
172 : // no write timeout is used, because the kernel is assumed to error writes after some time.
173 : let mut socket = tokio_io_timeout::TimeoutReader::new(socket);
174 :
175 : let default_timeout_ms = 10 * 60 * 1000; // 10 minutes by default
176 0 : let socket_timeout_ms = (|| {
177 0 : fail::fail_point!("simulated-bad-compute-connection", |avg_timeout_ms| {
178 : // Exponential distribution for simulating
179 : // poor network conditions; in tests, avg_timeout_ms is expected
180 : // to be around 15
181 0 : if let Some(avg_timeout_ms) = avg_timeout_ms {
182 0 : let avg = avg_timeout_ms.parse::<i64>().unwrap() as f32;
183 0 : let u = rand::random::<f32>();
184 0 : ((1.0 - u).ln() / (-avg)) as u64
185 : } else {
186 0 : default_timeout_ms
187 : }
188 0 : });
189 0 : default_timeout_ms
190 : })();
191 :
192 : // A timeout here does not mean the client died; it can happen if it's just idle for
193 : // a while: we will tear down this PageServerHandler and instantiate a new one if/when
194 : // they reconnect.
195 : socket.set_timeout(Some(std::time::Duration::from_millis(socket_timeout_ms)));
196 : let socket = std::pin::pin!(socket);
197 :
198 : fail::fail_point!("ps::connection-start::pre-login");
199 :
200 : // XXX: pgbackend.run() should take the connection_ctx,
201 : // and create a child per-query context when it invokes process_query.
202 : // But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
203 : // and create the per-query context in process_query ourselves.
204 : let mut conn_handler = PageServerHandler::new(tenant_manager, auth, connection_ctx);
205 : let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
206 :
207 : match pgbackend
208 : .run(&mut conn_handler, &task_mgr::shutdown_token())
209 : .await
210 : {
211 : Ok(()) => {
212 : // we've been requested to shut down
213 : Ok(())
214 : }
215 : Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
216 : if is_expected_io_error(&io_error) {
217 : info!("Postgres client disconnected ({io_error})");
218 : Ok(())
219 : } else {
220 : Err(io_error).context("Postgres connection error")
221 : }
222 : }
223 : other => other.context("Postgres query error"),
224 : }
225 : }
226 :
227 : /// While a handler holds a reference to a Timeline, it also holds the
228 : /// timeline's Gate open.
229 : struct HandlerTimeline {
230 : timeline: Arc<Timeline>,
231 : _guard: GateGuard,
232 : }
233 :
234 : struct PageServerHandler {
235 : auth: Option<Arc<SwappableJwtAuth>>,
236 : claims: Option<Claims>,
237 :
238 : tenant_manager: Arc<TenantManager>,
239 :
240 : /// The context created for the lifetime of the connection
241 : /// serviced by this PageServerHandler.
242 : /// For each query received over the connection,
243 : /// `process_query` creates a child context from this one.
244 : connection_ctx: RequestContext,
245 :
246 : /// See [`Self::cache_timeline`] for usage.
247 : ///
248 : /// Note on size: the typical size of this map is 1. The largest size we expect
249 : /// to see is the number of shards divided by the number of pageservers (typically < 2),
250 : /// or the ratio used when splitting shards (i.e. how many children are created from
251 : /// one parent shard), where a "large" number might be ~8.
252 : shard_timelines: HashMap<ShardIndex, HandlerTimeline>,
253 : }
254 :
255 0 : #[derive(thiserror::Error, Debug)]
256 : enum PageStreamError {
257 : /// We encountered an error that should prompt the client to reconnect:
258 : /// in practice this means we drop the connection without sending a response.
259 : #[error("Reconnect required: {0}")]
260 : Reconnect(Cow<'static, str>),
261 :
262 : /// We were instructed to shutdown while processing the query
263 : #[error("Shutting down")]
264 : Shutdown,
265 :
266 : /// Something went wrong reading a page: this likely indicates a pageserver bug
267 : #[error("Read error")]
268 : Read(#[source] PageReconstructError),
269 :
270 : /// Ran out of time waiting for an LSN
271 : #[error("LSN timeout: {0}")]
272 : LsnTimeout(WaitLsnError),
273 :
274 : /// The entity required to serve the request (tenant or timeline) is not found,
275 : /// or is not found in a suitable state to serve a request.
276 : #[error("Not found: {0}")]
277 : NotFound(Cow<'static, str>),
278 :
279 : /// Request asked for something that doesn't make sense, like an invalid LSN
280 : #[error("Bad request: {0}")]
281 : BadRequest(Cow<'static, str>),
282 : }
283 :
284 : impl From<PageReconstructError> for PageStreamError {
285 0 : fn from(value: PageReconstructError) -> Self {
286 0 : match value {
287 0 : PageReconstructError::Cancelled => Self::Shutdown,
288 0 : e => Self::Read(e),
289 : }
290 0 : }
291 : }
292 :
293 : impl From<GetActiveTimelineError> for PageStreamError {
294 0 : fn from(value: GetActiveTimelineError) -> Self {
295 0 : match value {
296 0 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => Self::Shutdown,
297 0 : GetActiveTimelineError::Tenant(e) => Self::NotFound(format!("{e}").into()),
298 0 : GetActiveTimelineError::Timeline(e) => Self::NotFound(format!("{e}").into()),
299 : }
300 0 : }
301 : }
302 :
303 : impl From<WaitLsnError> for PageStreamError {
304 0 : fn from(value: WaitLsnError) -> Self {
305 0 : match value {
306 0 : e @ WaitLsnError::Timeout(_) => Self::LsnTimeout(e),
307 0 : WaitLsnError::Shutdown => Self::Shutdown,
308 0 : e @ WaitLsnError::BadState { .. } => Self::Reconnect(format!("{e}").into()),
309 : }
310 0 : }
311 : }
312 :
313 : impl From<WaitLsnError> for QueryError {
314 0 : fn from(value: WaitLsnError) -> Self {
315 0 : match value {
316 0 : e @ WaitLsnError::Timeout(_) => Self::Other(anyhow::Error::new(e)),
317 0 : WaitLsnError::Shutdown => Self::Shutdown,
318 0 : WaitLsnError::BadState { .. } => Self::Reconnect,
319 : }
320 0 : }
321 : }
322 :
323 : impl PageServerHandler {
324 0 : pub fn new(
325 0 : tenant_manager: Arc<TenantManager>,
326 0 : auth: Option<Arc<SwappableJwtAuth>>,
327 0 : connection_ctx: RequestContext,
328 0 : ) -> Self {
329 0 : PageServerHandler {
330 0 : tenant_manager,
331 0 : auth,
332 0 : claims: None,
333 0 : connection_ctx,
334 0 : shard_timelines: HashMap::new(),
335 0 : }
336 0 : }
337 :
338 : /// Future that completes when we need to shut down the connection.
339 : ///
340 : /// We currently need to shut down when any of the following happens:
341 : /// 1. any of the timelines we hold GateGuards for in `shard_timelines` is cancelled
342 : /// 2. task_mgr requests shutdown of the connection
343 : ///
344 : /// NB on (1): the connection's lifecycle is not actually tied to any of the
345 : /// `shard_timelines`s' lifecycles. But it's _necessary_ in the current
346 : /// implementation to be responsive to timeline cancellation because
347 : /// the connection holds their `GateGuards` open (stored in `shard_timelines`).
348 : /// We currently do the easy thing and terminate the connection if any of the
349 : /// shard_timelines gets cancelled. But really, we could spend more effort
350 : /// and simply remove the cancelled timeline from the `shard_timelines`, thereby
351 : /// dropping the guard.
352 : ///
353 : /// NB: keep in sync with [`Self::is_connection_cancelled`]
354 0 : async fn await_connection_cancelled(&self) {
355 0 : // A short wait before we expend the cycles to walk our timeline map. This avoids incurring
356 0 : // that cost every time we check for cancellation.
357 0 : tokio::time::sleep(Duration::from_millis(10)).await;
358 :
359 : // This function is never called concurrently with code that adds timelines to shard_timelines,
360 : // which is enforced by the borrow checker (the future returned by this function carries the
361 : // immutable &self). So it's fine to evaluate shard_timelines after the sleep, we don't risk
362 : // missing any inserts to the map.
363 :
364 0 : let mut cancellation_sources = Vec::with_capacity(1 + self.shard_timelines.len());
365 0 : use futures::future::Either;
366 0 : cancellation_sources.push(Either::Left(task_mgr::shutdown_watcher()));
367 0 : cancellation_sources.extend(
368 0 : self.shard_timelines
369 0 : .values()
370 0 : .map(|ht| Either::Right(ht.timeline.cancel.cancelled())),
371 0 : );
372 0 : FuturesUnordered::from_iter(cancellation_sources)
373 0 : .next()
374 0 : .await;
375 0 : }
376 :
377 : /// Checking variant of [`Self::await_connection_cancelled`].
378 0 : fn is_connection_cancelled(&self) -> bool {
379 0 : task_mgr::is_shutdown_requested()
380 0 : || self
381 0 : .shard_timelines
382 0 : .values()
383 0 : .any(|ht| ht.timeline.cancel.is_cancelled() || ht.timeline.is_stopping())
384 0 : }
385 :
386 : /// This function always respects cancellation of any timeline in [`Self::shard_timelines`]. Pass in
387 : /// a cancellation token at the next scope up (such as a tenant cancellation token) to ensure we respect
388 : /// cancellation if there aren't any timelines in the cache.
389 : ///
390 : /// If calling from a function that doesn't use the [`Self::shard_timelines`] cache, then pass in the
391 : /// timeline cancellation token.
392 0 : async fn flush_cancellable<IO>(
393 0 : &self,
394 0 : pgb: &mut PostgresBackend<IO>,
395 0 : cancel: &CancellationToken,
396 0 : ) -> Result<(), QueryError>
397 0 : where
398 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
399 0 : {
400 : tokio::select!(
401 : flush_r = pgb.flush() => {
402 : Ok(flush_r?)
403 : },
404 : _ = self.await_connection_cancelled() => {
405 : Err(QueryError::Shutdown)
406 : }
407 : _ = cancel.cancelled() => {
408 : Err(QueryError::Shutdown)
409 : }
410 : )
411 0 : }
412 :
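/// Serves the pagestream sub-protocol: switches the client into COPY-both
/// mode and then loops, parsing each CopyData message as a
/// `PagestreamFeMessage` and dispatching it to the matching request handler,
/// until the client terminates or shutdown is requested.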
413 0 : #[instrument(skip_all)]
414 : async fn handle_pagerequests<IO>(
415 : &mut self,
416 : pgb: &mut PostgresBackend<IO>,
417 : tenant_id: TenantId,
418 : timeline_id: TimelineId,
419 : protocol_version: PagestreamProtocolVersion,
420 : ctx: RequestContext,
421 : ) -> Result<(), QueryError>
422 : where
423 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
424 : {
425 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
426 :
427 : let tenant = self
428 : .get_active_tenant_with_timeout(tenant_id, ShardSelector::First, ACTIVE_TENANT_TIMEOUT)
429 : .await?;
430 :
431 : // switch client to COPYBOTH
432 : pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
433 : self.flush_cancellable(pgb, &tenant.cancel).await?;
434 :
435 : loop {
436 : let msg = tokio::select! {
437 : biased;
438 :
439 : _ = self.await_connection_cancelled() => {
440 : // We were requested to shut down.
441 : info!("shutdown request received in page handler");
442 : return Err(QueryError::Shutdown)
443 : }
444 :
445 : msg = pgb.read_message() => { msg }
446 : };
447 :
448 : let copy_data_bytes = match msg? {
449 : Some(FeMessage::CopyData(bytes)) => bytes,
450 : Some(FeMessage::Terminate) => break,
451 : Some(m) => {
452 : return Err(QueryError::Other(anyhow::anyhow!(
453 : "unexpected message: {m:?} during COPY"
454 : )));
455 : }
456 : None => break, // client disconnected
457 : };
458 :
459 : trace!("query: {copy_data_bytes:?}");
460 : fail::fail_point!("ps::handle-pagerequest-message");
461 :
462 : let neon_fe_msg =
463 : PagestreamFeMessage::parse(&mut copy_data_bytes.reader(), protocol_version)?;
464 :
465 : // TODO: We could create a new per-request context here, with unique ID.
466 : // Currently we use the same per-timeline context for all requests
467 :
468 : let (response, span) = match neon_fe_msg {
469 : PagestreamFeMessage::Exists(req) => {
470 : fail::fail_point!("ps::handle-pagerequest-message::exists");
471 : let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn);
472 : (
473 : self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx)
474 : .instrument(span.clone())
475 : .await,
476 : span,
477 : )
478 : }
479 : PagestreamFeMessage::Nblocks(req) => {
480 : fail::fail_point!("ps::handle-pagerequest-message::nblocks");
481 : let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn);
482 : (
483 : self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx)
484 : .instrument(span.clone())
485 : .await,
486 : span,
487 : )
488 : }
489 : PagestreamFeMessage::GetPage(req) => {
490 : fail::fail_point!("ps::handle-pagerequest-message::getpage");
491 : // shard_id is filled in by the handler
492 : let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.request_lsn);
493 : (
494 : self.handle_get_page_at_lsn_request(tenant_id, timeline_id, &req, &ctx)
495 : .instrument(span.clone())
496 : .await,
497 : span,
498 : )
499 : }
500 : PagestreamFeMessage::DbSize(req) => {
501 : fail::fail_point!("ps::handle-pagerequest-message::dbsize");
502 : let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn);
503 : (
504 : self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx)
505 : .instrument(span.clone())
506 : .await,
507 : span,
508 : )
509 : }
510 : PagestreamFeMessage::GetSlruSegment(req) => {
511 : fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
512 : let span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn);
513 : (
514 : self.handle_get_slru_segment_request(tenant_id, timeline_id, &req, &ctx)
515 : .instrument(span.clone())
516 : .await,
517 : span,
518 : )
519 : }
520 : };
521 :
522 : match response {
523 : Err(PageStreamError::Shutdown) => {
524 : // If we fail to fulfil a request during shutdown, which may be _because_ of
525 : // shutdown, then do not send the error to the client. Instead just drop the
526 : // connection.
527 0 : span.in_scope(|| info!("dropping connection due to shutdown"));
528 : return Err(QueryError::Shutdown);
529 : }
530 : Err(PageStreamError::Reconnect(reason)) => {
531 0 : span.in_scope(|| info!("handler requested reconnect: {reason}"));
532 : return Err(QueryError::Reconnect);
533 : }
534 : Err(e) if self.is_connection_cancelled() => {
535 : // This branch accommodates code within request handlers that returns an anyhow::Error instead of a clean
536 : // shutdown error, this may be buried inside a PageReconstructError::Other for example.
537 : //
538 : // Requests may fail as soon as we are Stopping, even if the Timeline's cancellation token wasn't fired yet,
539 : // because wait_lsn etc will drop out
540 : // is_stopping(): [`Timeline::flush_and_shutdown`] has entered
541 : // is_canceled(): [`Timeline::shutdown`] has entered
542 0 : span.in_scope(|| info!("dropped error response during shutdown: {e:#}"));
543 : return Err(QueryError::Shutdown);
544 : }
545 : r => {
546 0 : let response_msg = r.unwrap_or_else(|e| {
547 0 : // print all the details to the log with {:#}, but for the client the
548 0 : // error message is enough. Do not log if shutting down, as the anyhow::Error
549 0 : // here includes cancellation which is not an error.
550 0 : let full = utils::error::report_compact_sources(&e);
551 0 : span.in_scope(|| {
552 0 : error!("error reading relation or page version: {full:#}")
553 0 : });
554 0 : PagestreamBeMessage::Error(PagestreamErrorResponse {
555 0 : message: e.to_string(),
556 0 : })
557 0 : });
558 :
559 : pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
560 : self.flush_cancellable(pgb, &tenant.cancel).await?;
561 : }
562 : }
563 : }
564 : Ok(())
565 : }
566 :
567 : /// Helper function to handle the LSN from client request.
568 : ///
569 : /// Each GetPage (and Exists and Nblocks) request includes information about
570 : /// which version of the page is being requested. The primary compute node
571 : /// will always request the latest page version, by setting 'request_lsn' to
572 : /// the last inserted or flushed WAL position, while a standby will request
573 : /// a version at the LSN that it's currently caught up to.
574 : ///
575 : /// In either case, if the page server hasn't received the WAL up to the
576 : /// requested LSN yet, we will wait for it to arrive. The return value is
577 : /// the LSN that should be used to look up the page versions.
578 : ///
579 : /// In addition to the request LSN, each request carries another LSN,
580 : /// 'not_modified_since', which is a hint to the pageserver that the client
581 : /// knows that the page has not been modified between 'not_modified_since'
582 : /// and the request LSN. This allows skipping the wait, as long as the WAL
583 : /// up to 'not_modified_since' has arrived. If the client doesn't have any
584 : /// information about when the page was modified, it will use
585 : /// not_modified_since == request_lsn. If the client lies and sends a too low
586 : /// not_modified_since such that there are in fact later page versions, the
587 : /// behavior is undefined: the pageserver may return any of the page versions
588 : /// or an error.
589 0 : async fn wait_or_get_last_lsn(
590 0 : timeline: &Timeline,
591 0 : request_lsn: Lsn,
592 0 : not_modified_since: Lsn,
593 0 : latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
594 0 : ctx: &RequestContext,
595 0 : ) -> Result<Lsn, PageStreamError> {
596 0 : let last_record_lsn = timeline.get_last_record_lsn();
597 0 :
598 0 : // Sanity check the request
599 0 : if request_lsn < not_modified_since {
600 0 : return Err(PageStreamError::BadRequest(
601 0 : format!(
602 0 : "invalid request with request LSN {} and not_modified_since {}",
603 0 : request_lsn, not_modified_since,
604 0 : )
605 0 : .into(),
606 0 : ));
607 0 : }
608 0 :
609 0 : if request_lsn < **latest_gc_cutoff_lsn {
610 : // Check explicitly for INVALID just to get a less scary error message if the
611 : // request is obviously bogus
612 0 : return Err(if request_lsn == Lsn::INVALID {
613 0 : PageStreamError::BadRequest("invalid LSN(0) in request".into())
614 : } else {
615 0 : PageStreamError::BadRequest(format!(
616 0 : "tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
617 0 : request_lsn, **latest_gc_cutoff_lsn
618 0 : ).into())
619 : });
620 0 : }
621 0 :
622 0 : // Wait for WAL up to 'not_modified_since' to arrive, if necessary
623 0 : if not_modified_since > last_record_lsn {
624 0 : timeline
625 0 : .wait_lsn(
626 0 : not_modified_since,
627 0 : crate::tenant::timeline::WaitLsnWaiter::PageService,
628 0 : ctx,
629 0 : )
630 0 : .await?;
631 : // Since we waited for 'not_modified_since' to arrive, that is now the last
632 : // record LSN. (Or close enough for our purposes; the last-record LSN can
633 : // advance immediately after we return anyway)
634 0 : Ok(not_modified_since)
635 : } else {
636 : // It might be better to use max(not_modified_since, latest_gc_cutoff_lsn)
637 : // here instead. That would give the same result, since we know that there
638 : // haven't been any modifications since 'not_modified_since'. Using an older
639 : // LSN might be faster, because that could allow skipping recent layers when
640 : // finding the page. However, we have historically used 'last_record_lsn', so
641 : // stick to that for now.
642 0 : Ok(std::cmp::min(last_record_lsn, request_lsn))
643 : }
644 0 : }
645 :
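/// Handles the "lease lsn" command: obtains an LSN lease on the requested
/// timeline and reports the lease expiry ("valid_until", in milliseconds
/// since the Unix epoch) back to the client as a single-row result.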
646 0 : #[instrument(skip_all, fields(shard_id, %lsn))]
647 : async fn handle_make_lsn_lease<IO>(
648 : &self,
649 : pgb: &mut PostgresBackend<IO>,
650 : tenant_shard_id: TenantShardId,
651 : timeline_id: TimelineId,
652 : lsn: Lsn,
653 : ctx: &RequestContext,
654 : ) -> Result<(), QueryError>
655 : where
656 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
657 : {
658 : let shard_selector = ShardSelector::Known(tenant_shard_id.to_index());
659 : let timeline = self
660 : .get_active_tenant_timeline(tenant_shard_id.tenant_id, timeline_id, shard_selector)
661 : .await?;
662 : let lease = timeline.make_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx)?;
663 : let valid_until = lease
664 : .valid_until
665 : .duration_since(SystemTime::UNIX_EPOCH)
666 0 : .map_err(|e| QueryError::Other(e.into()))?;
667 :
668 : pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col(
669 : b"valid_until",
670 : )]))?
671 : .write_message_noflush(&BeMessage::DataRow(&[Some(
672 : &valid_until.as_millis().to_be_bytes(),
673 : )]))?
674 : .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
675 :
676 : Ok(())
677 : }
678 :
679 0 : #[instrument(skip_all, fields(shard_id))]
680 : async fn handle_get_rel_exists_request(
681 : &mut self,
682 : tenant_id: TenantId,
683 : timeline_id: TimelineId,
684 : req: &PagestreamExistsRequest,
685 : ctx: &RequestContext,
686 : ) -> Result<PagestreamBeMessage, PageStreamError> {
687 : let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
688 : let _timer = timeline
689 : .query_metrics
690 : .start_timer(metrics::SmgrQueryType::GetRelExists, ctx);
691 :
692 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
693 : let lsn = Self::wait_or_get_last_lsn(
694 : timeline,
695 : req.request_lsn,
696 : req.not_modified_since,
697 : &latest_gc_cutoff_lsn,
698 : ctx,
699 : )
700 : .await?;
701 :
702 : let exists = timeline
703 : .get_rel_exists(req.rel, Version::Lsn(lsn), ctx)
704 : .await?;
705 :
706 : Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
707 : exists,
708 : }))
709 : }
710 :
711 0 : #[instrument(skip_all, fields(shard_id))]
712 : async fn handle_get_nblocks_request(
713 : &mut self,
714 : tenant_id: TenantId,
715 : timeline_id: TimelineId,
716 : req: &PagestreamNblocksRequest,
717 : ctx: &RequestContext,
718 : ) -> Result<PagestreamBeMessage, PageStreamError> {
719 : let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
720 :
721 : let _timer = timeline
722 : .query_metrics
723 : .start_timer(metrics::SmgrQueryType::GetRelSize, ctx);
724 :
725 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
726 : let lsn = Self::wait_or_get_last_lsn(
727 : timeline,
728 : req.request_lsn,
729 : req.not_modified_since,
730 : &latest_gc_cutoff_lsn,
731 : ctx,
732 : )
733 : .await?;
734 :
735 : let n_blocks = timeline
736 : .get_rel_size(req.rel, Version::Lsn(lsn), ctx)
737 : .await?;
738 :
739 : Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
740 : n_blocks,
741 : }))
742 : }
743 :
744 0 : #[instrument(skip_all, fields(shard_id))]
745 : async fn handle_db_size_request(
746 : &mut self,
747 : tenant_id: TenantId,
748 : timeline_id: TimelineId,
749 : req: &PagestreamDbSizeRequest,
750 : ctx: &RequestContext,
751 : ) -> Result<PagestreamBeMessage, PageStreamError> {
752 : let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
753 :
754 : let _timer = timeline
755 : .query_metrics
756 : .start_timer(metrics::SmgrQueryType::GetDbSize, ctx);
757 :
758 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
759 : let lsn = Self::wait_or_get_last_lsn(
760 : timeline,
761 : req.request_lsn,
762 : req.not_modified_since,
763 : &latest_gc_cutoff_lsn,
764 : ctx,
765 : )
766 : .await?;
767 :
768 : let total_blocks = timeline
769 : .get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, Version::Lsn(lsn), ctx)
770 : .await?;
771 : let db_size = total_blocks as i64 * BLCKSZ as i64;
772 :
773 : Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse {
774 : db_size,
775 : }))
776 : }
777 :
778 : /// For most getpage requests, we will already have a Timeline to serve the request: this function
779 : /// looks up such a Timeline synchronously and without touching any global state.
780 0 : fn get_cached_timeline_for_page(
781 0 : &mut self,
782 0 : req: &PagestreamGetPageRequest,
783 0 : ) -> Result<&Arc<Timeline>, Key> {
784 0 : let key = if let Some((first_idx, first_timeline)) = self.shard_timelines.iter().next() {
785 : // Fastest path: single sharded case
786 0 : if first_idx.shard_count.count() == 1 {
787 0 : return Ok(&first_timeline.timeline);
788 0 : }
789 0 :
790 0 : let key = rel_block_to_key(req.rel, req.blkno);
791 0 : let shard_num = first_timeline
792 0 : .timeline
793 0 : .get_shard_identity()
794 0 : .get_shard_number(&key);
795 0 :
796 0 : // Fast path: matched the first timeline in our local handler map. This case is common if
797 0 : // only one shard per tenant is attached to this pageserver.
798 0 : if first_timeline.timeline.get_shard_identity().number == shard_num {
799 0 : return Ok(&first_timeline.timeline);
800 0 : }
801 0 :
802 0 : let shard_index = ShardIndex {
803 0 : shard_number: shard_num,
804 0 : shard_count: first_timeline.timeline.get_shard_identity().count,
805 0 : };
806 :
807 : // Fast-ish path: timeline is in the connection handler's local cache
808 0 : if let Some(found) = self.shard_timelines.get(&shard_index) {
809 0 : return Ok(&found.timeline);
810 0 : }
811 0 :
812 0 : key
813 : } else {
814 0 : rel_block_to_key(req.rel, req.blkno)
815 : };
816 :
817 0 : Err(key)
818 0 : }
819 :
820 : /// Having looked up the [`Timeline`] instance for a particular shard, cache it to enable
821 : /// use in future requests without having to traverse [`crate::tenant::mgr::TenantManager`]
822 : /// again.
823 : ///
824 : /// Note that all the Timelines in this cache are for the same timeline_id: they differ only
825 : /// in which shard they belong to. When we serve a getpage@lsn request, we choose a shard
826 : /// based on the key.
827 : ///
828 : /// The typical size of this cache is 1, as we generally create shards to distribute work
829 : /// across pageservers, so don't tend to have multiple shards for the same tenant on the
830 : /// same pageserver.
831 0 : fn cache_timeline(
832 0 : &mut self,
833 0 : timeline: Arc<Timeline>,
834 0 : ) -> Result<&Arc<Timeline>, GetActiveTimelineError> {
835 0 : let gate_guard = timeline
836 0 : .gate
837 0 : .enter()
838 0 : .map_err(|_| GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled))?;
839 :
840 0 : let shard_index = timeline.tenant_shard_id.to_index();
841 0 : let entry = self
842 0 : .shard_timelines
843 0 : .entry(shard_index)
844 0 : .or_insert(HandlerTimeline {
845 0 : timeline,
846 0 : _guard: gate_guard,
847 0 : });
848 0 :
849 0 : Ok(&entry.timeline)
850 0 : }
851 :
852 : /// If [`Self::get_cached_timeline_for_page`] missed, then this function is used to populate the cache with
853 : /// a Timeline to serve requests for this key, if such a Timeline is present on this pageserver. If no such
854 : /// Timeline is found, then we will return an error (this indicates that the client is talking to the wrong node).
855 0 : async fn load_timeline_for_page(
856 0 : &mut self,
857 0 : tenant_id: TenantId,
858 0 : timeline_id: TimelineId,
859 0 : key: Key,
860 0 : ) -> anyhow::Result<&Arc<Timeline>, GetActiveTimelineError> {
861 : // Slow path: we must call out to the TenantManager to find the timeline for this Key
862 0 : let timeline = self
863 0 : .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Page(key))
864 0 : .await?;
865 :
866 0 : self.cache_timeline(timeline)
867 0 : }
868 :
869 0 : async fn get_timeline_shard_zero(
870 0 : &mut self,
871 0 : tenant_id: TenantId,
872 0 : timeline_id: TimelineId,
873 0 : ) -> anyhow::Result<&Arc<Timeline>, GetActiveTimelineError> {
874 : // This is a borrow-checker workaround: we can't return from inside of the `if let Some` because
875 : // that would be an immutable-borrow-self return, whereas later in the function we will use a mutable
876 : // ref to self. So instead, we first build a bool, and then return while not borrowing self.
877 0 : let have_cached = if let Some((idx, _tl)) = self.shard_timelines.iter().next() {
878 0 : idx.shard_number == ShardNumber(0)
879 : } else {
880 0 : false
881 : };
882 :
883 0 : if have_cached {
884 0 : let entry = self.shard_timelines.iter().next().unwrap();
885 0 : Ok(&entry.1.timeline)
886 : } else {
887 0 : let timeline = self
888 0 : .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero)
889 0 : .await?;
890 0 : Ok(self.cache_timeline(timeline)?)
891 : }
892 0 : }
893 :
894 0 : #[instrument(skip_all, fields(shard_id))]
895 : async fn handle_get_page_at_lsn_request(
896 : &mut self,
897 : tenant_id: TenantId,
898 : timeline_id: TimelineId,
899 : req: &PagestreamGetPageRequest,
900 : ctx: &RequestContext,
901 : ) -> Result<PagestreamBeMessage, PageStreamError> {
902 : let timeline = match self.get_cached_timeline_for_page(req) {
903 : Ok(tl) => {
904 : set_tracing_field_shard_id(tl);
905 : tl
906 : }
907 : Err(key) => {
908 : match self
909 : .load_timeline_for_page(tenant_id, timeline_id, key)
910 : .await
911 : {
912 : Ok(t) => t,
913 : Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
914 : // We already know this tenant exists in general, because we resolved it at
915 : // start of connection. Getting a NotFound here indicates that the shard containing
916 : // the requested page is not present on this node: the client's knowledge of shard->pageserver
917 : // mapping is out of date.
918 : //
919 : // Closing the connection by returning `PageStreamError::Reconnect` has the side effect of rate-limiting the above message, via the
920 : // client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
921 : // and talk to a different pageserver.
922 : return Err(PageStreamError::Reconnect(
923 : "getpage@lsn request routed to wrong shard".into(),
924 : ));
925 : }
926 : Err(e) => return Err(e.into()),
927 : }
928 : }
929 : };
930 :
931 : let _timer = timeline
932 : .query_metrics
933 : .start_timer(metrics::SmgrQueryType::GetPageAtLsn, ctx);
934 :
935 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
936 : let lsn = Self::wait_or_get_last_lsn(
937 : timeline,
938 : req.request_lsn,
939 : req.not_modified_since,
940 : &latest_gc_cutoff_lsn,
941 : ctx,
942 : )
943 : .await?;
944 :
945 : let page = timeline
946 : .get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), ctx)
947 : .await?;
948 :
949 : Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
950 : page,
951 : }))
952 : }
953 :
954 0 : #[instrument(skip_all, fields(shard_id))]
955 : async fn handle_get_slru_segment_request(
956 : &mut self,
957 : tenant_id: TenantId,
958 : timeline_id: TimelineId,
959 : req: &PagestreamGetSlruSegmentRequest,
960 : ctx: &RequestContext,
961 : ) -> Result<PagestreamBeMessage, PageStreamError> {
962 : let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
963 :
964 : let _timer = timeline
965 : .query_metrics
966 : .start_timer(metrics::SmgrQueryType::GetSlruSegment, ctx);
967 :
968 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
969 : let lsn = Self::wait_or_get_last_lsn(
970 : timeline,
971 : req.request_lsn,
972 : req.not_modified_since,
973 : &latest_gc_cutoff_lsn,
974 : ctx,
975 : )
976 : .await?;
977 :
978 : let kind = SlruKind::from_repr(req.kind)
979 : .ok_or(PageStreamError::BadRequest("invalid SLRU kind".into()))?;
980 : let segment = timeline.get_slru_segment(kind, req.segno, lsn, ctx).await?;
981 :
982 : Ok(PagestreamBeMessage::GetSlruSegment(
983 : PagestreamGetSlruSegmentResponse { segment },
984 : ))
985 : }
986 :
987 : /// Note on "fullbackup":
988 : /// Full basebackups should only be used for debugging purposes.
989 : /// Originally, it was introduced to enable breaking storage format changes,
990 : /// but that is not applicable anymore.
991 : #[allow(clippy::too_many_arguments)]
992 0 : #[instrument(skip_all, fields(shard_id, ?lsn, ?prev_lsn, %full_backup))]
993 : async fn handle_basebackup_request<IO>(
994 : &mut self,
995 : pgb: &mut PostgresBackend<IO>,
996 : tenant_id: TenantId,
997 : timeline_id: TimelineId,
998 : lsn: Option<Lsn>,
999 : prev_lsn: Option<Lsn>,
1000 : full_backup: bool,
1001 : gzip: bool,
1002 : ctx: &RequestContext,
1003 : ) -> Result<(), QueryError>
1004 : where
1005 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1006 : {
1007 0 : fn map_basebackup_error(err: BasebackupError) -> QueryError {
1008 0 : match err {
1009 0 : BasebackupError::Client(e) => QueryError::Disconnected(ConnectionError::Io(e)),
1010 0 : BasebackupError::Server(e) => QueryError::Other(e),
1011 : }
1012 0 : }
1013 :
1014 : let started = std::time::Instant::now();
1015 :
1016 : // check that the timeline exists
1017 : let timeline = self
1018 : .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero)
1019 : .await?;
1020 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1021 : if let Some(lsn) = lsn {
1022 : // Backup was requested at a particular LSN. Wait for it to arrive.
1023 : info!("waiting for {}", lsn);
1024 : timeline
1025 : .wait_lsn(
1026 : lsn,
1027 : crate::tenant::timeline::WaitLsnWaiter::PageService,
1028 : ctx,
1029 : )
1030 : .await?;
1031 : timeline
1032 : .check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
1033 : .context("invalid basebackup lsn")?;
1034 : }
1035 :
1036 : let lsn_awaited_after = started.elapsed();
1037 :
1038 : // switch client to COPYOUT
1039 : pgb.write_message_noflush(&BeMessage::CopyOutResponse)
1040 : .map_err(QueryError::Disconnected)?;
1041 : self.flush_cancellable(pgb, &timeline.cancel).await?;
1042 :
1043 : // Send a tarball of the latest layer on the timeline. Compress if not
1044 : // fullbackup. TODO Compress in that case too (tests need to be updated)
1045 : if full_backup {
1046 : let mut writer = pgb.copyout_writer();
1047 : basebackup::send_basebackup_tarball(
1048 : &mut writer,
1049 : &timeline,
1050 : lsn,
1051 : prev_lsn,
1052 : full_backup,
1053 : ctx,
1054 : )
1055 : .await
1056 : .map_err(map_basebackup_error)?;
1057 : } else {
1058 : let mut writer = pgb.copyout_writer();
1059 : if gzip {
1060 : let mut encoder = GzipEncoder::with_quality(
1061 : writer,
1062 : // NOTE using fast compression because it's on the critical path
1063 : // for compute startup. For an empty database, we get
1064 : // <100KB with this method. The Level::Best compression method
1065 : // gives us <20KB, but maybe we should add basebackup caching
1066 : // on compute shutdown first.
1067 : async_compression::Level::Fastest,
1068 : );
1069 : basebackup::send_basebackup_tarball(
1070 : &mut encoder,
1071 : &timeline,
1072 : lsn,
1073 : prev_lsn,
1074 : full_backup,
1075 : ctx,
1076 : )
1077 : .await
1078 : .map_err(map_basebackup_error)?;
1079 : // shutdown the encoder to ensure the gzip footer is written
1080 : encoder
1081 : .shutdown()
1082 : .await
1083 0 : .map_err(|e| QueryError::Disconnected(ConnectionError::Io(e)))?;
1084 : } else {
1085 : basebackup::send_basebackup_tarball(
1086 : &mut writer,
1087 : &timeline,
1088 : lsn,
1089 : prev_lsn,
1090 : full_backup,
1091 : ctx,
1092 : )
1093 : .await
1094 : .map_err(map_basebackup_error)?;
1095 : }
1096 : }
1097 :
1098 : pgb.write_message_noflush(&BeMessage::CopyDone)
1099 : .map_err(QueryError::Disconnected)?;
1100 : self.flush_cancellable(pgb, &timeline.cancel).await?;
1101 :
1102 : let basebackup_after = started
1103 : .elapsed()
1104 : .checked_sub(lsn_awaited_after)
1105 : .unwrap_or(Duration::ZERO);
1106 :
1107 : info!(
1108 : lsn_await_millis = lsn_awaited_after.as_millis(),
1109 : basebackup_millis = basebackup_after.as_millis(),
1110 : "basebackup complete"
1111 : );
1112 :
1113 : Ok(())
1114 : }
1115 :
1116 : // When accessing the management API, supply None as an argument.
1117 : // When authorizing a tenant, pass the corresponding tenant id.
1118 0 : fn check_permission(&self, tenant_id: Option<TenantId>) -> Result<(), QueryError> {
1119 0 : if self.auth.is_none() {
1120 : // auth is set to Trust, nothing to check so just return ok
1121 0 : return Ok(());
1122 0 : }
1123 0 : // auth is Some, as just checked above. When auth is Some,
1124 0 : // claims are always present because of checks during connection init,
1125 0 : // so this expect won't trigger.
1126 0 : let claims = self
1127 0 : .claims
1128 0 : .as_ref()
1129 0 : .expect("claims presence already checked");
1130 0 : check_permission(claims, tenant_id).map_err(|e| QueryError::Unauthorized(e.0))
1131 0 : }
1132 :
1133 : /// Shorthand for getting a reference to a Timeline of an Active tenant.
1134 0 : async fn get_active_tenant_timeline(
1135 0 : &self,
1136 0 : tenant_id: TenantId,
1137 0 : timeline_id: TimelineId,
1138 0 : selector: ShardSelector,
1139 0 : ) -> Result<Arc<Timeline>, GetActiveTimelineError> {
1140 0 : let tenant = self
1141 0 : .get_active_tenant_with_timeout(tenant_id, selector, ACTIVE_TENANT_TIMEOUT)
1142 0 : .await
1143 0 : .map_err(GetActiveTimelineError::Tenant)?;
1144 0 : let timeline = tenant.get_timeline(timeline_id, true)?;
1145 0 : set_tracing_field_shard_id(&timeline);
1146 0 : Ok(timeline)
1147 0 : }
1148 :
1149 : /// Get a shard's [`Tenant`] in its active state, if present. If we don't find the shard and some
1150 : /// slots for this tenant are `InProgress` then we will wait.
1151 : /// If we find the [`Tenant`] and it's not yet in state [`TenantState::Active`], we will wait.
1152 : ///
1153 : /// `timeout` is used as a total timeout for the whole wait operation.
1154 0 : async fn get_active_tenant_with_timeout(
1155 0 : &self,
1156 0 : tenant_id: TenantId,
1157 0 : shard_selector: ShardSelector,
1158 0 : timeout: Duration,
1159 0 : ) -> Result<Arc<Tenant>, GetActiveTenantError> {
1160 0 : let wait_start = Instant::now();
1161 0 : let deadline = wait_start + timeout;
1162 :
1163 : // Resolve TenantId to TenantShardId. This is usually a quick one-shot thing; the loop is
1164 : // for handling the rare case that the slot we're accessing is InProgress.
1165 0 : let tenant_shard = loop {
1166 0 : let resolved = self
1167 0 : .tenant_manager
1168 0 : .resolve_attached_shard(&tenant_id, shard_selector);
1169 0 : match resolved {
1170 0 : ShardResolveResult::Found(tenant_shard) => break tenant_shard,
1171 : ShardResolveResult::NotFound => {
1172 0 : return Err(GetActiveTenantError::NotFound(GetTenantError::NotFound(
1173 0 : tenant_id,
1174 0 : )));
1175 : }
1176 0 : ShardResolveResult::InProgress(barrier) => {
1177 : // We can't authoritatively answer right now: wait for InProgress state
1178 : // to end, then try again
1179 : tokio::select! {
1180 : _ = self.await_connection_cancelled() => {
1181 : return Err(GetActiveTenantError::Cancelled)
1182 : },
1183 : _ = barrier.wait() => {
1184 : // The barrier completed: proceed around the loop to try looking up again
1185 : },
1186 : _ = tokio::time::sleep(deadline.duration_since(Instant::now())) => {
1187 : return Err(GetActiveTenantError::WaitForActiveTimeout {
1188 : latest_state: None,
1189 : wait_time: timeout,
1190 : });
1191 : }
1192 : }
1193 : }
1194 : };
1195 : };
1196 :
1197 0 : tracing::debug!("Waiting for tenant to enter active state...");
1198 0 : tenant_shard
1199 0 : .wait_to_become_active(deadline.duration_since(Instant::now()))
1200 0 : .await?;
1201 0 : Ok(tenant_shard)
1202 0 : }
1203 : }
1204 :
1205 : #[async_trait::async_trait]
1206 : impl<IO> postgres_backend::Handler<IO> for PageServerHandler
1207 : where
1208 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1209 : {
1210 0 : fn check_auth_jwt(
1211 0 : &mut self,
1212 0 : _pgb: &mut PostgresBackend<IO>,
1213 0 : jwt_response: &[u8],
1214 0 : ) -> Result<(), QueryError> {
1215 : // this unwrap is never triggered, because check_auth_jwt only called when auth_type is NeonJWT
1216 : // which requires auth to be present
1217 0 : let data = self
1218 0 : .auth
1219 0 : .as_ref()
1220 0 : .unwrap()
1221 0 : .decode(str::from_utf8(jwt_response).context("jwt response is not UTF-8")?)
1222 0 : .map_err(|e| QueryError::Unauthorized(e.0))?;
1223 :
1224 0 : if matches!(data.claims.scope, Scope::Tenant) && data.claims.tenant_id.is_none() {
1225 0 : return Err(QueryError::Unauthorized(
1226 0 : "jwt token scope is Tenant, but tenant id is missing".into(),
1227 0 : ));
1228 0 : }
1229 0 :
1230 0 : debug!(
1231 0 : "jwt scope check succeeded for scope: {:#?} by tenant id: {:?}",
1232 : data.claims.scope, data.claims.tenant_id,
1233 : );
1234 :
1235 0 : self.claims = Some(data.claims);
1236 0 : Ok(())
1237 0 : }
1238 :
1239 0 : fn startup(
1240 0 : &mut self,
1241 0 : _pgb: &mut PostgresBackend<IO>,
1242 0 : _sm: &FeStartupPacket,
1243 0 : ) -> Result<(), QueryError> {
1244 : fail::fail_point!("ps::connection-start::startup-packet");
1245 0 : Ok(())
1246 0 : }
1247 :
1248 0 : #[instrument(skip_all, fields(tenant_id, timeline_id))]
1249 : async fn process_query(
1250 : &mut self,
1251 : pgb: &mut PostgresBackend<IO>,
1252 : query_string: &str,
1253 0 : ) -> Result<(), QueryError> {
1254 0 : fail::fail_point!("simulated-bad-compute-connection", |_| {
1255 0 : info!("Hit failpoint for bad connection");
1256 0 : Err(QueryError::SimulatedConnectionError)
1257 0 : });
1258 :
1259 : fail::fail_point!("ps::connection-start::process-query");
1260 :
1261 0 : let ctx = self.connection_ctx.attached_child();
1262 0 : debug!("process query {query_string:?}");
1263 0 : let parts = query_string.split_whitespace().collect::<Vec<_>>();
1264 0 : if let Some(params) = parts.strip_prefix(&["pagestream_v2"]) {
1265 0 : if params.len() != 2 {
1266 0 : return Err(QueryError::Other(anyhow::anyhow!(
1267 0 : "invalid param number for pagestream command"
1268 0 : )));
1269 0 : }
1270 0 : let tenant_id = TenantId::from_str(params[0])
1271 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1272 0 : let timeline_id = TimelineId::from_str(params[1])
1273 0 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1274 :
1275 0 : tracing::Span::current()
1276 0 : .record("tenant_id", field::display(tenant_id))
1277 0 : .record("timeline_id", field::display(timeline_id));
1278 0 :
1279 0 : self.check_permission(Some(tenant_id))?;
1280 :
1281 0 : COMPUTE_COMMANDS_COUNTERS
1282 0 : .for_command(ComputeCommandKind::PageStreamV2)
1283 0 : .inc();
1284 0 :
1285 0 : self.handle_pagerequests(
1286 0 : pgb,
1287 0 : tenant_id,
1288 0 : timeline_id,
1289 0 : PagestreamProtocolVersion::V2,
1290 0 : ctx,
1291 0 : )
1292 0 : .await?;
1293 0 : } else if let Some(params) = parts.strip_prefix(&["pagestream"]) {
1294 0 : if params.len() != 2 {
1295 0 : return Err(QueryError::Other(anyhow::anyhow!(
1296 0 : "invalid param number for pagestream command"
1297 0 : )));
1298 0 : }
1299 0 : let tenant_id = TenantId::from_str(params[0])
1300 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1301 0 : let timeline_id = TimelineId::from_str(params[1])
1302 0 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1303 :
1304 0 : tracing::Span::current()
1305 0 : .record("tenant_id", field::display(tenant_id))
1306 0 : .record("timeline_id", field::display(timeline_id));
1307 0 :
1308 0 : self.check_permission(Some(tenant_id))?;
1309 :
1310 0 : COMPUTE_COMMANDS_COUNTERS
1311 0 : .for_command(ComputeCommandKind::PageStream)
1312 0 : .inc();
1313 0 :
1314 0 : self.handle_pagerequests(
1315 0 : pgb,
1316 0 : tenant_id,
1317 0 : timeline_id,
1318 0 : PagestreamProtocolVersion::V1,
1319 0 : ctx,
1320 0 : )
1321 0 : .await?;
1322 0 : } else if let Some(params) = parts.strip_prefix(&["basebackup"]) {
1323 0 : if params.len() < 2 {
1324 0 : return Err(QueryError::Other(anyhow::anyhow!(
1325 0 : "invalid param number for basebackup command"
1326 0 : )));
1327 0 : }
1328 :
1329 0 : let tenant_id = TenantId::from_str(params[0])
1330 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1331 0 : let timeline_id = TimelineId::from_str(params[1])
1332 0 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1333 :
1334 0 : tracing::Span::current()
1335 0 : .record("tenant_id", field::display(tenant_id))
1336 0 : .record("timeline_id", field::display(timeline_id));
1337 0 :
1338 0 : self.check_permission(Some(tenant_id))?;
1339 :
1340 0 : COMPUTE_COMMANDS_COUNTERS
1341 0 : .for_command(ComputeCommandKind::Basebackup)
1342 0 : .inc();
1343 :
1344 0 : let lsn = if let Some(lsn_str) = params.get(2) {
1345 : Some(
1346 0 : Lsn::from_str(lsn_str)
1347 0 : .with_context(|| format!("Failed to parse Lsn from {lsn_str}"))?,
1348 : )
1349 : } else {
1350 0 : None
1351 : };
1352 :
1353 0 : let gzip = match params.get(3) {
1354 0 : Some(&"--gzip") => true,
1355 0 : None => false,
1356 0 : Some(third_param) => {
1357 0 : return Err(QueryError::Other(anyhow::anyhow!(
1358 0 : "Parameter in position 3 unknown {third_param}",
1359 0 : )))
1360 : }
1361 : };
1362 :
1363 0 : let metric_recording = metrics::BASEBACKUP_QUERY_TIME.start_recording(&ctx);
1364 0 : let res = async {
1365 0 : self.handle_basebackup_request(
1366 0 : pgb,
1367 0 : tenant_id,
1368 0 : timeline_id,
1369 0 : lsn,
1370 0 : None,
1371 0 : false,
1372 0 : gzip,
1373 0 : &ctx,
1374 0 : )
1375 0 : .await?;
1376 0 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1377 0 : Result::<(), QueryError>::Ok(())
1378 0 : }
1379 0 : .await;
1380 0 : metric_recording.observe(&res);
1381 0 : res?;
1382 : }
1383 : // same as basebackup, but result includes relational data as well
1384 0 : else if let Some(params) = parts.strip_prefix(&["fullbackup"]) {
1385 0 : if params.len() < 2 {
1386 0 : return Err(QueryError::Other(anyhow::anyhow!(
1387 0 : "invalid param number for fullbackup command"
1388 0 : )));
1389 0 : }
1390 :
1391 0 : let tenant_id = TenantId::from_str(params[0])
1392 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1393 0 : let timeline_id = TimelineId::from_str(params[1])
1394 0 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1395 :
1396 0 : tracing::Span::current()
1397 0 : .record("tenant_id", field::display(tenant_id))
1398 0 : .record("timeline_id", field::display(timeline_id));
1399 :
1400 : // The caller is responsible for providing correct lsn and prev_lsn.
1401 0 : let lsn = if let Some(lsn_str) = params.get(2) {
1402 : Some(
1403 0 : Lsn::from_str(lsn_str)
1404 0 : .with_context(|| format!("Failed to parse Lsn from {lsn_str}"))?,
1405 : )
1406 : } else {
1407 0 : None
1408 : };
1409 0 : let prev_lsn = if let Some(prev_lsn_str) = params.get(3) {
1410 : Some(
1411 0 : Lsn::from_str(prev_lsn_str)
1412 0 : .with_context(|| format!("Failed to parse Lsn from {prev_lsn_str}"))?,
1413 : )
1414 : } else {
1415 0 : None
1416 : };
1417 :
1418 0 : self.check_permission(Some(tenant_id))?;
1419 :
1420 0 : COMPUTE_COMMANDS_COUNTERS
1421 0 : .for_command(ComputeCommandKind::Fullbackup)
1422 0 : .inc();
1423 0 :
1424 0 : // Check that the timeline exists
1425 0 : self.handle_basebackup_request(
1426 0 : pgb,
1427 0 : tenant_id,
1428 0 : timeline_id,
1429 0 : lsn,
1430 0 : prev_lsn,
1431 0 : true,
1432 0 : false,
1433 0 : &ctx,
1434 0 : )
1435 0 : .await?;
1436 0 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1437 0 : } else if query_string.to_ascii_lowercase().starts_with("set ") {
1438 : // important because psycopg2 executes "SET datestyle TO 'ISO'"
1439 : // on connect
1440 0 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1441 0 : } else if query_string.starts_with("lease lsn ") {
1442 0 : let params = &parts[2..];
1443 0 : if params.len() != 3 {
1444 0 : return Err(QueryError::Other(anyhow::anyhow!(
1445 0 : "invalid param number {} for lease lsn command",
1446 0 : params.len()
1447 0 : )));
1448 0 : }
1449 :
1450 0 : let tenant_shard_id = TenantShardId::from_str(params[0])
1451 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1452 0 : let timeline_id = TimelineId::from_str(params[1])
1453 0 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1454 :
1455 0 : tracing::Span::current()
1456 0 : .record("tenant_id", field::display(tenant_shard_id))
1457 0 : .record("timeline_id", field::display(timeline_id));
1458 0 :
1459 0 : self.check_permission(Some(tenant_shard_id.tenant_id))?;
1460 :
1461 0 : COMPUTE_COMMANDS_COUNTERS
1462 0 : .for_command(ComputeCommandKind::LeaseLsn)
1463 0 : .inc();
1464 :
1465 : // The caller is responsible for providing correct lsn.
1466 0 : let lsn = Lsn::from_str(params[2])
1467 0 : .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?;
1468 :
1469 0 : match self
1470 0 : .handle_make_lsn_lease(pgb, tenant_shard_id, timeline_id, lsn, &ctx)
1471 0 : .await
1472 : {
1473 0 : Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
1474 0 : Err(e) => {
1475 0 : error!("error obtaining lsn lease for {lsn}: {e:?}");
1476 0 : pgb.write_message_noflush(&BeMessage::ErrorResponse(
1477 0 : &e.to_string(),
1478 0 : Some(e.pg_error_code()),
1479 0 : ))?
1480 : }
1481 : };
1482 : } else {
1483 0 : return Err(QueryError::Other(anyhow::anyhow!(
1484 0 : "unknown command {query_string}"
1485 0 : )));
1486 : }
1487 :
1488 0 : Ok(())
1489 0 : }
1490 : }
1491 :
1492 : impl From<GetActiveTenantError> for QueryError {
1493 0 : fn from(e: GetActiveTenantError) -> Self {
1494 0 : match e {
1495 0 : GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected(
1496 0 : ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
1497 0 : ),
1498 : GetActiveTenantError::Cancelled
1499 : | GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => {
1500 0 : QueryError::Shutdown
1501 : }
1502 0 : e @ GetActiveTenantError::NotFound(_) => QueryError::NotFound(format!("{e}").into()),
1503 0 : e => QueryError::Other(anyhow::anyhow!(e)),
1504 : }
1505 0 : }
1506 : }
1507 :
1508 0 : #[derive(Debug, thiserror::Error)]
1509 : enum GetActiveTimelineError {
1510 : #[error(transparent)]
1511 : Tenant(GetActiveTenantError),
1512 : #[error(transparent)]
1513 : Timeline(#[from] GetTimelineError),
1514 : }
1515 :
1516 : impl From<GetActiveTimelineError> for QueryError {
1517 0 : fn from(e: GetActiveTimelineError) -> Self {
1518 0 : match e {
1519 0 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => QueryError::Shutdown,
1520 0 : GetActiveTimelineError::Tenant(e) => e.into(),
1521 0 : GetActiveTimelineError::Timeline(e) => QueryError::NotFound(format!("{e}").into()),
1522 : }
1523 0 : }
1524 : }
1525 :
1526 0 : fn set_tracing_field_shard_id(timeline: &Timeline) {
1527 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1528 0 : tracing::Span::current().record(
1529 0 : "shard_id",
1530 0 : tracing::field::display(timeline.tenant_shard_id.shard_slug()),
1531 0 : );
1532 0 : debug_assert_current_span_has_tenant_and_timeline_id();
1533 0 : }