Line data Source code
1 : //! The Page Service listens for client connections and serves their GetPage@LSN
2 : //! requests.
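//!
//! Rough flow (a sketch inferred from the handlers below, not an exhaustive spec):
//! a compute opens a libpq connection, authenticates, and issues a command such as
//! `pagestream_v2 <tenant_id> <timeline_id>`. The connection is then switched into
//! COPYBOTH mode; the client sends CopyData-framed [`PagestreamFeMessage`]s
//! (GetPage, Exists, Nblocks, DbSize, GetSlruSegment) and the pageserver answers
//! each with a CopyData-framed [`PagestreamBeMessage`], in request order.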
3 :
4 : use anyhow::{bail, Context};
5 : use async_compression::tokio::write::GzipEncoder;
6 : use bytes::Buf;
7 : use futures::FutureExt;
8 : use itertools::Itertools;
9 : use once_cell::sync::OnceCell;
10 : use pageserver_api::models::{self, TenantState};
11 : use pageserver_api::models::{
12 : PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
13 : PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
14 : PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetSlruSegmentRequest,
15 : PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest, PagestreamNblocksResponse,
16 : PagestreamProtocolVersion,
17 : };
18 : use pageserver_api::shard::TenantShardId;
19 : use postgres_backend::{is_expected_io_error, AuthType, PostgresBackend, QueryError};
20 : use pq_proto::framed::ConnectionError;
21 : use pq_proto::FeStartupPacket;
22 : use pq_proto::{BeMessage, FeMessage, RowDescriptor};
23 : use std::borrow::Cow;
24 : use std::io;
25 : use std::str;
26 : use std::str::FromStr;
27 : use std::sync::Arc;
28 : use std::time::SystemTime;
29 : use std::time::{Duration, Instant};
30 : use tokio::io::{AsyncRead, AsyncWrite};
31 : use tokio::io::{AsyncWriteExt, BufWriter};
32 : use tokio::task::JoinHandle;
33 : use tokio_util::sync::CancellationToken;
34 : use tracing::*;
35 : use utils::{
36 : auth::{Claims, Scope, SwappableJwtAuth},
37 : id::{TenantId, TimelineId},
38 : lsn::Lsn,
39 : simple_rcu::RcuReadGuard,
40 : };
41 :
42 : use crate::auth::check_permission;
43 : use crate::basebackup;
44 : use crate::basebackup::BasebackupError;
45 : use crate::config::PageServerConf;
46 : use crate::context::{DownloadBehavior, RequestContext};
47 : use crate::metrics::{self};
48 : use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS};
49 : use crate::pgdatadir_mapping::Version;
50 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
51 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
52 : use crate::task_mgr::TaskKind;
53 : use crate::task_mgr::{self, COMPUTE_REQUEST_RUNTIME};
54 : use crate::tenant::mgr::ShardSelector;
55 : use crate::tenant::mgr::TenantManager;
56 : use crate::tenant::mgr::{GetActiveTenantError, GetTenantError, ShardResolveResult};
57 : use crate::tenant::timeline::{self, WaitLsnError};
58 : use crate::tenant::GetTimelineError;
59 : use crate::tenant::PageReconstructError;
60 : use crate::tenant::Timeline;
61 : use pageserver_api::key::rel_block_to_key;
62 : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
63 : use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
64 : use postgres_ffi::BLCKSZ;
65 :
66 : /// How long we may wait for a [`crate::tenant::mgr::TenantSlot::InProgress`] and/or a [`crate::tenant::Tenant`] which
67 : /// is not yet in state [`TenantState::Active`].
68 : ///
69 : /// NB: this is a different value than [`crate::http::routes::ACTIVE_TENANT_TIMEOUT`].
70 : const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
71 :
72 : ///////////////////////////////////////////////////////////////////////////////
73 :
74 : pub struct Listener {
75 : cancel: CancellationToken,
76 : /// Cancel the listener task through `cancel` to shut down the listener
77 : /// and get a handle on the existing connections.
78 : task: JoinHandle<Connections>,
79 : }
80 :
81 : pub struct Connections {
82 : cancel: CancellationToken,
83 : tasks: tokio::task::JoinSet<ConnectionHandlerResult>,
84 : }
85 :
86 0 : pub fn spawn(
87 0 : conf: &'static PageServerConf,
88 0 : tenant_manager: Arc<TenantManager>,
89 0 : pg_auth: Option<Arc<SwappableJwtAuth>>,
90 0 : tcp_listener: tokio::net::TcpListener,
91 0 : ) -> Listener {
92 0 : let cancel = CancellationToken::new();
93 0 : let libpq_ctx = RequestContext::todo_child(
94 0 : TaskKind::LibpqEndpointListener,
95 0 : // listener task shouldn't need to download anything. (We will
96 0 : // create a separate sub-context for each connection, with its
97 0 : // own download behavior. This context is used only to listen and
98 0 : // accept connections.)
99 0 : DownloadBehavior::Error,
100 0 : );
101 0 : let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
102 0 : "libpq listener",
103 0 : libpq_listener_main(
104 0 : tenant_manager,
105 0 : pg_auth,
106 0 : tcp_listener,
107 0 : conf.pg_auth_type,
108 0 : conf.server_side_batch_timeout,
109 0 : libpq_ctx,
110 0 : cancel.clone(),
111 0 : )
112 0 : .map(anyhow::Ok),
113 0 : ));
114 0 :
115 0 : Listener { cancel, task }
116 0 : }
117 :
118 : impl Listener {
119 0 : pub async fn stop_accepting(self) -> Connections {
120 0 : self.cancel.cancel();
121 0 : self.task
122 0 : .await
123 0 : .expect("unreachable: we wrap the listener task in task_mgr::exit_on_panic_or_error")
124 0 : }
125 : }
126 : impl Connections {
127 0 : pub(crate) async fn shutdown(self) {
128 0 : let Self { cancel, mut tasks } = self;
129 0 : cancel.cancel();
130 0 : while let Some(res) = tasks.join_next().await {
131 0 : Self::handle_connection_completion(res);
132 0 : }
133 0 : }
134 :
135 0 : fn handle_connection_completion(res: Result<anyhow::Result<()>, tokio::task::JoinError>) {
136 0 : match res {
137 0 : Ok(Ok(())) => {}
138 0 : Ok(Err(e)) => error!("error in page_service connection task: {:?}", e),
139 0 : Err(e) => error!("page_service connection task panicked: {:?}", e),
140 : }
141 0 : }
142 : }
143 :
144 : ///
145 : /// Main loop of the page service.
146 : ///
147 : /// Listens for connections, and launches a new handler task for each.
148 : ///
149 : /// Upon cancellation via `listener_cancel`, returns the set of
150 : /// open connections.
151 : ///
152 0 : pub async fn libpq_listener_main(
153 0 : tenant_manager: Arc<TenantManager>,
154 0 : auth: Option<Arc<SwappableJwtAuth>>,
155 0 : listener: tokio::net::TcpListener,
156 0 : auth_type: AuthType,
157 0 : server_side_batch_timeout: Option<Duration>,
158 0 : listener_ctx: RequestContext,
159 0 : listener_cancel: CancellationToken,
160 0 : ) -> Connections {
161 0 : let connections_cancel = CancellationToken::new();
162 0 : let mut connection_handler_tasks = tokio::task::JoinSet::default();
163 :
164 : loop {
165 0 : let accepted = tokio::select! {
166 : biased;
167 0 : _ = listener_cancel.cancelled() => break,
168 0 : next = connection_handler_tasks.join_next(), if !connection_handler_tasks.is_empty() => {
169 0 : let res = next.expect("we don't poll while empty");
170 0 : Connections::handle_connection_completion(res);
171 0 : continue;
172 : }
173 0 : accepted = listener.accept() => accepted,
174 0 : };
175 0 :
176 0 : match accepted {
177 0 : Ok((socket, peer_addr)) => {
178 0 : // Connection established. Spawn a new task to handle it.
179 0 : debug!("accepted connection from {}", peer_addr);
180 0 : let local_auth = auth.clone();
181 0 : let connection_ctx = listener_ctx
182 0 : .detached_child(TaskKind::PageRequestHandler, DownloadBehavior::Download);
183 0 : connection_handler_tasks.spawn(page_service_conn_main(
184 0 : tenant_manager.clone(),
185 0 : local_auth,
186 0 : socket,
187 0 : auth_type,
188 0 : server_side_batch_timeout,
189 0 : connection_ctx,
190 0 : connections_cancel.child_token(),
191 0 : ));
192 : }
193 0 : Err(err) => {
194 0 : // accept() failed. Log the error, and loop back to retry on next connection.
195 0 : error!("accept() failed: {:?}", err);
196 : }
197 : }
198 : }
199 :
200 0 : debug!("page_service listener loop terminated");
201 :
202 0 : Connections {
203 0 : cancel: connections_cancel,
204 0 : tasks: connection_handler_tasks,
205 0 : }
206 0 : }
207 :
208 : type ConnectionHandlerResult = anyhow::Result<()>;
209 :
210 0 : #[instrument(skip_all, fields(peer_addr))]
211 : async fn page_service_conn_main(
212 : tenant_manager: Arc<TenantManager>,
213 : auth: Option<Arc<SwappableJwtAuth>>,
214 : socket: tokio::net::TcpStream,
215 : auth_type: AuthType,
216 : server_side_batch_timeout: Option<Duration>,
217 : connection_ctx: RequestContext,
218 : cancel: CancellationToken,
219 : ) -> ConnectionHandlerResult {
220 : let _guard = LIVE_CONNECTIONS
221 : .with_label_values(&["page_service"])
222 : .guard();
223 :
224 : socket
225 : .set_nodelay(true)
226 : .context("could not set TCP_NODELAY")?;
227 :
228 : let peer_addr = socket.peer_addr().context("get peer address")?;
229 : tracing::Span::current().record("peer_addr", field::display(peer_addr));
230 :
231 : // Set up a read timeout of 10 minutes. The exact value is somewhat arbitrary; it needs to be:
232 : // - long enough for most valid compute connections
233 : // - less than infinite to stop us from "leaking" connections to long-gone computes
234 : //
235 : // no write timeout is used, because the kernel is assumed to error writes after some time.
236 : let mut socket = tokio_io_timeout::TimeoutReader::new(socket);
237 :
238 : let default_timeout_ms = 10 * 60 * 1000; // 10 minutes by default
239 0 : let socket_timeout_ms = (|| {
240 0 : fail::fail_point!("simulated-bad-compute-connection", |avg_timeout_ms| {
241 : // Exponential distribution for simulating
242 : // poor network conditions, expect about avg_timeout_ms to be around 15
243 : // in tests
244 0 : if let Some(avg_timeout_ms) = avg_timeout_ms {
245 0 : let avg = avg_timeout_ms.parse::<i64>().unwrap() as f32;
246 0 : let u = rand::random::<f32>();
247 0 : ((1.0 - u).ln() / (-avg)) as u64
248 : } else {
249 0 : default_timeout_ms
250 : }
251 0 : });
252 0 : default_timeout_ms
253 : })();
254 :
255 : // A timeout here does not mean the client died; it can happen if it's just idle for
256 : // a while: we will tear down this PageServerHandler and instantiate a new one if/when
257 : // they reconnect.
258 : socket.set_timeout(Some(std::time::Duration::from_millis(socket_timeout_ms)));
259 : let socket = std::pin::pin!(socket);
260 :
261 : fail::fail_point!("ps::connection-start::pre-login");
262 :
263 : // XXX: pgbackend.run() should take the connection_ctx,
264 : // and create a child per-query context when it invokes process_query.
265 : // But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
266 : // and create the per-query context in process_query ourselves.
267 : let mut conn_handler = PageServerHandler::new(
268 : tenant_manager,
269 : auth,
270 : server_side_batch_timeout,
271 : connection_ctx,
272 : cancel.clone(),
273 : );
274 : let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
275 :
276 : match pgbackend.run(&mut conn_handler, &cancel).await {
277 : Ok(()) => {
278 : // we've been requested to shut down
279 : Ok(())
280 : }
281 : Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
282 : if is_expected_io_error(&io_error) {
283 : info!("Postgres client disconnected ({io_error})");
284 : Ok(())
285 : } else {
286 : let tenant_id = conn_handler.timeline_handles.tenant_id();
287 : Err(io_error).context(format!(
288 : "Postgres connection error for tenant_id={:?} client at peer_addr={}",
289 : tenant_id, peer_addr
290 : ))
291 : }
292 : }
293 : other => {
294 : let tenant_id = conn_handler.timeline_handles.tenant_id();
295 : other.context(format!(
296 : "Postgres query error for tenant_id={:?} client peer_addr={}",
297 : tenant_id, peer_addr
298 : ))
299 : }
300 : }
301 : }
302 :
303 : struct PageServerHandler {
304 : auth: Option<Arc<SwappableJwtAuth>>,
305 : claims: Option<Claims>,
306 :
307 : /// The context created for the lifetime of the connection
308 : /// serviced by this PageServerHandler.
309 : /// For each query received over the connection,
310 : /// `process_query` creates a child context from this one.
311 : connection_ctx: RequestContext,
312 :
313 : cancel: CancellationToken,
314 :
315 : timeline_handles: TimelineHandles,
316 :
317 : /// Messages queued up for the next processing batch
318 : next_batch: Option<BatchedFeMessage>,
319 :
320 : /// See [`PageServerConf::server_side_batch_timeout`]
321 : server_side_batch_timeout: Option<Duration>,
322 : }
323 :
324 : struct TimelineHandles {
325 : wrapper: TenantManagerWrapper,
326 : /// Note on size: the typical size of this map is 1. The largest size we expect
327 : /// to see is the number of shards divided by the number of pageservers (typically < 2),
328 : /// or the ratio used when splitting shards (i.e. how many children are created from one
329 : /// parent shard), where a "large" number might be ~8.
330 : handles: timeline::handle::Cache<TenantManagerTypes>,
331 : }
332 :
333 : impl TimelineHandles {
334 0 : fn new(tenant_manager: Arc<TenantManager>) -> Self {
335 0 : Self {
336 0 : wrapper: TenantManagerWrapper {
337 0 : tenant_manager,
338 0 : tenant_id: OnceCell::new(),
339 0 : },
340 0 : handles: Default::default(),
341 0 : }
342 0 : }
343 0 : async fn get(
344 0 : &mut self,
345 0 : tenant_id: TenantId,
346 0 : timeline_id: TimelineId,
347 0 : shard_selector: ShardSelector,
348 0 : ) -> Result<timeline::handle::Handle<TenantManagerTypes>, GetActiveTimelineError> {
349 0 : if *self.wrapper.tenant_id.get_or_init(|| tenant_id) != tenant_id {
350 0 : return Err(GetActiveTimelineError::Tenant(
351 0 : GetActiveTenantError::SwitchedTenant,
352 0 : ));
353 0 : }
354 0 : self.handles
355 0 : .get(timeline_id, shard_selector, &self.wrapper)
356 0 : .await
357 0 : .map_err(|e| match e {
358 0 : timeline::handle::GetError::TenantManager(e) => e,
359 : timeline::handle::GetError::TimelineGateClosed => {
360 0 : trace!("timeline gate closed");
361 0 : GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown)
362 : }
363 : timeline::handle::GetError::PerTimelineStateShutDown => {
364 0 : trace!("per-timeline state shut down");
365 0 : GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown)
366 : }
367 0 : })
368 0 : }
369 :
370 0 : fn tenant_id(&self) -> Option<TenantId> {
371 0 : self.wrapper.tenant_id.get().copied()
372 0 : }
373 : }
374 :
375 : pub(crate) struct TenantManagerWrapper {
376 : tenant_manager: Arc<TenantManager>,
377 : // We do not support switching tenant_id on a connection at this point.
378 : // We can add support for this later if needed without changing
379 : // the protocol.
380 : tenant_id: once_cell::sync::OnceCell<TenantId>,
381 : }
382 :
383 : #[derive(Debug)]
384 : pub(crate) struct TenantManagerTypes;
385 :
386 : impl timeline::handle::Types for TenantManagerTypes {
387 : type TenantManagerError = GetActiveTimelineError;
388 : type TenantManager = TenantManagerWrapper;
389 : type Timeline = Arc<Timeline>;
390 : }
391 :
392 : impl timeline::handle::ArcTimeline<TenantManagerTypes> for Arc<Timeline> {
393 0 : fn gate(&self) -> &utils::sync::gate::Gate {
394 0 : &self.gate
395 0 : }
396 :
397 0 : fn shard_timeline_id(&self) -> timeline::handle::ShardTimelineId {
398 0 : Timeline::shard_timeline_id(self)
399 0 : }
400 :
401 0 : fn per_timeline_state(&self) -> &timeline::handle::PerTimelineState<TenantManagerTypes> {
402 0 : &self.handles
403 0 : }
404 :
405 0 : fn get_shard_identity(&self) -> &pageserver_api::shard::ShardIdentity {
406 0 : Timeline::get_shard_identity(self)
407 0 : }
408 : }
409 :
410 : impl timeline::handle::TenantManager<TenantManagerTypes> for TenantManagerWrapper {
411 0 : async fn resolve(
412 0 : &self,
413 0 : timeline_id: TimelineId,
414 0 : shard_selector: ShardSelector,
415 0 : ) -> Result<Arc<Timeline>, GetActiveTimelineError> {
416 0 : let tenant_id = self.tenant_id.get().expect("we set this in get()");
417 0 : let timeout = ACTIVE_TENANT_TIMEOUT;
418 0 : let wait_start = Instant::now();
419 0 : let deadline = wait_start + timeout;
420 0 : let tenant_shard = loop {
421 0 : let resolved = self
422 0 : .tenant_manager
423 0 : .resolve_attached_shard(tenant_id, shard_selector);
424 0 : match resolved {
425 0 : ShardResolveResult::Found(tenant_shard) => break tenant_shard,
426 : ShardResolveResult::NotFound => {
427 0 : return Err(GetActiveTimelineError::Tenant(
428 0 : GetActiveTenantError::NotFound(GetTenantError::NotFound(*tenant_id)),
429 0 : ));
430 : }
431 0 : ShardResolveResult::InProgress(barrier) => {
432 0 : // We can't authoritatively answer right now: wait for InProgress state
433 0 : // to end, then try again
434 0 : tokio::select! {
435 0 : _ = barrier.wait() => {
436 0 : // The barrier completed: proceed around the loop to try looking up again
437 0 : },
438 0 : _ = tokio::time::sleep(deadline.duration_since(Instant::now())) => {
439 0 : return Err(GetActiveTimelineError::Tenant(GetActiveTenantError::WaitForActiveTimeout {
440 0 : latest_state: None,
441 0 : wait_time: timeout,
442 0 : }));
443 : }
444 : }
445 : }
446 : };
447 : };
448 :
449 0 : tracing::debug!("Waiting for tenant to enter active state...");
450 0 : tenant_shard
451 0 : .wait_to_become_active(deadline.duration_since(Instant::now()))
452 0 : .await
453 0 : .map_err(GetActiveTimelineError::Tenant)?;
454 :
455 0 : let timeline = tenant_shard
456 0 : .get_timeline(timeline_id, true)
457 0 : .map_err(GetActiveTimelineError::Timeline)?;
458 0 : set_tracing_field_shard_id(&timeline);
459 0 : Ok(timeline)
460 0 : }
461 : }
462 :
463 0 : #[derive(thiserror::Error, Debug)]
464 : enum PageStreamError {
465 : /// We encountered an error that should prompt the client to reconnect:
466 : /// in practice this means we drop the connection without sending a response.
467 : #[error("Reconnect required: {0}")]
468 : Reconnect(Cow<'static, str>),
469 :
470 : /// We were instructed to shutdown while processing the query
471 : #[error("Shutting down")]
472 : Shutdown,
473 :
474 : /// Something went wrong reading a page: this likely indicates a pageserver bug
475 : #[error("Read error")]
476 : Read(#[source] PageReconstructError),
477 :
478 : /// Ran out of time waiting for an LSN
479 : #[error("LSN timeout: {0}")]
480 : LsnTimeout(WaitLsnError),
481 :
482 : /// The entity required to serve the request (tenant or timeline) is not found,
483 : /// or is not found in a suitable state to serve a request.
484 : #[error("Not found: {0}")]
485 : NotFound(Cow<'static, str>),
486 :
487 : /// Request asked for something that doesn't make sense, like an invalid LSN
488 : #[error("Bad request: {0}")]
489 : BadRequest(Cow<'static, str>),
490 : }
491 :
492 : impl From<PageReconstructError> for PageStreamError {
493 0 : fn from(value: PageReconstructError) -> Self {
494 0 : match value {
495 0 : PageReconstructError::Cancelled => Self::Shutdown,
496 0 : e => Self::Read(e),
497 : }
498 0 : }
499 : }
500 :
501 : impl From<GetActiveTimelineError> for PageStreamError {
502 0 : fn from(value: GetActiveTimelineError) -> Self {
503 0 : match value {
504 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled)
505 : | GetActiveTimelineError::Tenant(GetActiveTenantError::WillNotBecomeActive(
506 : TenantState::Stopping { .. },
507 : ))
508 0 : | GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown) => Self::Shutdown,
509 0 : GetActiveTimelineError::Tenant(e) => Self::NotFound(format!("{e}").into()),
510 0 : GetActiveTimelineError::Timeline(e) => Self::NotFound(format!("{e}").into()),
511 : }
512 0 : }
513 : }
514 :
515 : impl From<WaitLsnError> for PageStreamError {
516 0 : fn from(value: WaitLsnError) -> Self {
517 0 : match value {
518 0 : e @ WaitLsnError::Timeout(_) => Self::LsnTimeout(e),
519 0 : WaitLsnError::Shutdown => Self::Shutdown,
520 0 : e @ WaitLsnError::BadState { .. } => Self::Reconnect(format!("{e}").into()),
521 : }
522 0 : }
523 : }
524 :
525 : impl From<WaitLsnError> for QueryError {
526 0 : fn from(value: WaitLsnError) -> Self {
527 0 : match value {
528 0 : e @ WaitLsnError::Timeout(_) => Self::Other(anyhow::Error::new(e)),
529 0 : WaitLsnError::Shutdown => Self::Shutdown,
530 0 : WaitLsnError::BadState { .. } => Self::Reconnect,
531 : }
532 0 : }
533 : }
534 :
535 : enum BatchedFeMessage {
536 : Exists {
537 : span: Span,
538 : req: models::PagestreamExistsRequest,
539 : },
540 : Nblocks {
541 : span: Span,
542 : req: models::PagestreamNblocksRequest,
543 : },
544 : GetPage {
545 : span: Span,
546 : shard: timeline::handle::Handle<TenantManagerTypes>,
547 : effective_request_lsn: Lsn,
548 : pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
549 : },
550 : DbSize {
551 : span: Span,
552 : req: models::PagestreamDbSizeRequest,
553 : },
554 : GetSlruSegment {
555 : span: Span,
556 : req: models::PagestreamGetSlruSegmentRequest,
557 : },
558 : RespondError {
559 : span: Span,
560 : error: PageStreamError,
561 : },
562 : }
563 :
564 : enum BatchOrEof {
565 : /// In the common case, this has one entry.
566 : /// At most, it has two entries: the first is the leftover batch, the second is an error.
567 : Batch(smallvec::SmallVec<[BatchedFeMessage; 1]>),
568 : Eof,
569 : }
570 :
571 : impl PageServerHandler {
572 0 : pub fn new(
573 0 : tenant_manager: Arc<TenantManager>,
574 0 : auth: Option<Arc<SwappableJwtAuth>>,
575 0 : server_side_batch_timeout: Option<Duration>,
576 0 : connection_ctx: RequestContext,
577 0 : cancel: CancellationToken,
578 0 : ) -> Self {
579 0 : PageServerHandler {
580 0 : auth,
581 0 : claims: None,
582 0 : connection_ctx,
583 0 : timeline_handles: TimelineHandles::new(tenant_manager),
584 0 : cancel,
585 0 : next_batch: None,
586 0 : server_side_batch_timeout,
587 0 : }
588 0 : }
589 :
590 : /// This function always respects cancellation of any timeline in [`Self::shard_timelines`]. Pass in
591 : /// a cancellation token at the next scope up (such as a tenant cancellation token) to ensure we respect
592 : /// cancellation if there aren't any timelines in the cache.
593 : ///
594 : /// If calling from a function that doesn't use the [`Self::shard_timelines`] cache, then pass in the
595 : /// timeline cancellation token.
596 0 : async fn flush_cancellable<IO>(
597 0 : &self,
598 0 : pgb: &mut PostgresBackend<IO>,
599 0 : cancel: &CancellationToken,
600 0 : ) -> Result<(), QueryError>
601 0 : where
602 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
603 0 : {
604 0 : tokio::select!(
605 0 : flush_r = pgb.flush() => {
606 0 : Ok(flush_r?)
607 : },
608 0 : _ = cancel.cancelled() => {
609 0 : Err(QueryError::Shutdown)
610 : }
611 : )
612 0 : }
613 :
614 0 : async fn read_batch_from_connection<IO>(
615 0 : &mut self,
616 0 : pgb: &mut PostgresBackend<IO>,
617 0 : tenant_id: &TenantId,
618 0 : timeline_id: &TimelineId,
619 0 : ctx: &RequestContext,
620 0 : ) -> Result<Option<BatchOrEof>, QueryError>
621 0 : where
622 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
623 0 : {
624 0 : let mut batch = self.next_batch.take();
625 0 : let mut batch_started_at: Option<std::time::Instant> = None;
626 :
627 0 : let next_batch: Option<BatchedFeMessage> = loop {
628 0 : let sleep_fut = match (self.server_side_batch_timeout, batch_started_at) {
629 0 : (Some(batch_timeout), Some(started_at)) => futures::future::Either::Left(
630 0 : tokio::time::sleep_until((started_at + batch_timeout).into()),
631 0 : ),
632 0 : _ => futures::future::Either::Right(futures::future::pending()),
633 : };
634 :
635 0 : let msg = tokio::select! {
636 : biased;
637 0 : _ = self.cancel.cancelled() => {
638 0 : return Err(QueryError::Shutdown)
639 : }
640 0 : msg = pgb.read_message() => {
641 0 : msg
642 : }
643 0 : _ = sleep_fut => {
644 0 : assert!(batch.is_some());
645 0 : break None;
646 : }
647 : };
648 0 : let copy_data_bytes = match msg? {
649 0 : Some(FeMessage::CopyData(bytes)) => bytes,
650 : Some(FeMessage::Terminate) => {
651 0 : return Ok(Some(BatchOrEof::Eof));
652 : }
653 0 : Some(m) => {
654 0 : return Err(QueryError::Other(anyhow::anyhow!(
655 0 : "unexpected message: {m:?} during COPY"
656 0 : )));
657 : }
658 : None => {
659 0 : return Ok(Some(BatchOrEof::Eof));
660 : } // client disconnected
661 : };
662 0 : trace!("query: {copy_data_bytes:?}");
663 0 : fail::fail_point!("ps::handle-pagerequest-message");
664 :
665 : // parse request
666 0 : let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
667 :
668 0 : let this_msg = match neon_fe_msg {
669 0 : PagestreamFeMessage::Exists(req) => BatchedFeMessage::Exists {
670 0 : span: tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn),
671 0 : req,
672 : },
673 0 : PagestreamFeMessage::Nblocks(req) => BatchedFeMessage::Nblocks {
674 0 : span: tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn),
675 0 : req,
676 : },
677 0 : PagestreamFeMessage::DbSize(req) => BatchedFeMessage::DbSize {
678 0 : span: tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn),
679 0 : req,
680 : },
681 0 : PagestreamFeMessage::GetSlruSegment(req) => BatchedFeMessage::GetSlruSegment {
682 0 : span: tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn),
683 0 : req,
684 : },
685 : PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
686 0 : request_lsn,
687 0 : not_modified_since,
688 0 : rel,
689 0 : blkno,
690 : }) => {
691 : // shard_id is filled in by the handler
692 0 : let span = tracing::info_span!(
693 0 : "handle_get_page_at_lsn_request_batched",
694 0 : %tenant_id, %timeline_id, shard_id = tracing::field::Empty, req_lsn = %request_lsn,
695 0 : batch_size = tracing::field::Empty, batch_id = tracing::field::Empty
696 0 : );
697 :
698 : macro_rules! current_batch_and_error {
699 : ($error:expr) => {{
700 : let error = BatchedFeMessage::RespondError {
701 : span,
702 : error: $error,
703 : };
704 : let batch_and_error = match batch {
705 : Some(b) => smallvec::smallvec![b, error],
706 : None => smallvec::smallvec![error],
707 : };
708 : Ok(Some(BatchOrEof::Batch(batch_and_error)))
709 : }};
710 : }
711 :
712 0 : let key = rel_block_to_key(rel, blkno);
713 0 : let shard = match self
714 0 : .timeline_handles
715 0 : .get(*tenant_id, *timeline_id, ShardSelector::Page(key))
716 0 : .instrument(span.clone())
717 0 : .await
718 : {
719 0 : Ok(tl) => tl,
720 : Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
721 : // We already know this tenant exists in general, because we resolved it at
722 : // start of connection. Getting a NotFound here indicates that the shard containing
723 : // the requested page is not present on this node: the client's knowledge of shard->pageserver
724 : // mapping is out of date.
725 : //
726 : // Closing the connection by returning `PageStreamError::Reconnect` has the side effect of rate-limiting the above message, via the
727 : // client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
728 : // and talk to a different pageserver.
729 0 : return current_batch_and_error!(PageStreamError::Reconnect(
730 0 : "getpage@lsn request routed to wrong shard".into()
731 0 : ));
732 : }
733 0 : Err(e) => {
734 0 : return current_batch_and_error!(e.into());
735 : }
736 : };
737 0 : let effective_request_lsn = match Self::wait_or_get_last_lsn(
738 0 : &shard,
739 0 : request_lsn,
740 0 : not_modified_since,
741 0 : &shard.get_latest_gc_cutoff_lsn(),
742 0 : ctx,
743 0 : )
744 : // TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait
745 0 : .await
746 : {
747 0 : Ok(lsn) => lsn,
748 0 : Err(e) => {
749 0 : return current_batch_and_error!(e);
750 : }
751 : };
752 : BatchedFeMessage::GetPage {
753 0 : span,
754 0 : shard,
755 0 : effective_request_lsn,
756 0 : pages: smallvec::smallvec![(rel, blkno)],
757 : }
758 : }
759 : };
760 :
761 0 : let batch_timeout = match self.server_side_batch_timeout {
762 0 : Some(value) => value,
763 : None => {
764 : // Batching is not enabled - stop on the first message.
765 0 : return Ok(Some(BatchOrEof::Batch(smallvec::smallvec![this_msg])));
766 : }
767 : };
768 :
769 : // check if we can batch
770 0 : match (&mut batch, this_msg) {
771 0 : (None, this_msg) => {
772 0 : batch = Some(this_msg);
773 0 : }
774 : (
775 : Some(BatchedFeMessage::GetPage {
776 : span: _,
777 0 : shard: accum_shard,
778 0 : pages: accum_pages,
779 0 : effective_request_lsn: accum_lsn,
780 : }),
781 : BatchedFeMessage::GetPage {
782 : span: _,
783 0 : shard: this_shard,
784 0 : pages: this_pages,
785 0 : effective_request_lsn: this_lsn,
786 : },
787 0 : ) if async {
788 0 : assert_eq!(this_pages.len(), 1);
789 0 : if accum_pages.len() >= Timeline::MAX_GET_VECTORED_KEYS as usize {
790 0 : assert_eq!(accum_pages.len(), Timeline::MAX_GET_VECTORED_KEYS as usize);
791 0 : return false;
792 0 : }
793 0 : if (accum_shard.tenant_shard_id, accum_shard.timeline_id)
794 0 : != (this_shard.tenant_shard_id, this_shard.timeline_id)
795 : {
796 : // TODO: we _could_ batch & execute each shard separately (and in parallel).
797 : // But the current logic for keeping responses in order does not support that.
798 0 : return false;
799 0 : }
800 0 : // the vectored get currently only supports a single LSN, so bounce as soon
801 0 : // as the effective request_lsn changes
802 0 : if *accum_lsn != this_lsn {
803 0 : return false;
804 0 : }
805 0 : true
806 0 : }
807 0 : .await =>
808 0 : {
809 0 : // ok to batch
810 0 : accum_pages.extend(this_pages);
811 0 : }
812 0 : (Some(_), this_msg) => {
813 0 : // by default, don't continue batching
814 0 : break Some(this_msg);
815 : }
816 : }
817 :
818 : // batching impl piece
819 0 : let started_at = batch_started_at.get_or_insert_with(Instant::now);
820 0 : if started_at.elapsed() > batch_timeout {
821 0 : break None;
822 0 : }
823 : };
824 :
825 0 : self.next_batch = next_batch;
826 0 : Ok(batch.map(|b| BatchOrEof::Batch(smallvec::smallvec![b])))
827 0 : }
828 :
829 : /// Pagestream sub-protocol handler.
830 : ///
831 : /// It is a simple request-response protocol inside a COPYBOTH session.
832 : ///
833 : /// # Coding Discipline
834 : ///
835 : /// Coding discipline within this function: all interaction with the `pgb` connection
836 : /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
837 : /// This is so that we can shut down page_service quickly.
838 0 : #[instrument(skip_all)]
839 : async fn handle_pagerequests<IO>(
840 : &mut self,
841 : pgb: &mut PostgresBackend<IO>,
842 : tenant_id: TenantId,
843 : timeline_id: TimelineId,
844 : _protocol_version: PagestreamProtocolVersion,
845 : ctx: RequestContext,
846 : ) -> Result<(), QueryError>
847 : where
848 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
849 : {
850 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
851 :
852 : // switch client to COPYBOTH
853 : pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
854 : tokio::select! {
855 : biased;
856 : _ = self.cancel.cancelled() => {
857 : return Err(QueryError::Shutdown)
858 : }
859 : res = pgb.flush() => {
860 : res?;
861 : }
862 : }
863 :
864 : // If [`PageServerHandler`] is reused for multiple pagestreams,
865 : // then make sure to not process requests from the previous ones.
866 : self.next_batch = None;
867 :
868 : loop {
869 : let maybe_batched = self
870 : .read_batch_from_connection(pgb, &tenant_id, &timeline_id, &ctx)
871 : .await?;
872 : let batched = match maybe_batched {
873 : Some(BatchOrEof::Batch(b)) => b,
874 : Some(BatchOrEof::Eof) => {
875 : break;
876 : }
877 : None => {
878 : continue;
879 : }
880 : };
881 :
882 : for batch in batched {
883 : // invoke handler function
884 : let (handler_results, span): (
885 : Vec<Result<PagestreamBeMessage, PageStreamError>>,
886 : _,
887 : ) = match batch {
888 : BatchedFeMessage::Exists { span, req } => {
889 : fail::fail_point!("ps::handle-pagerequest-message::exists");
890 : (
891 : vec![
892 : self.handle_get_rel_exists_request(
893 : tenant_id,
894 : timeline_id,
895 : &req,
896 : &ctx,
897 : )
898 : .instrument(span.clone())
899 : .await,
900 : ],
901 : span,
902 : )
903 : }
904 : BatchedFeMessage::Nblocks { span, req } => {
905 : fail::fail_point!("ps::handle-pagerequest-message::nblocks");
906 : (
907 : vec![
908 : self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx)
909 : .instrument(span.clone())
910 : .await,
911 : ],
912 : span,
913 : )
914 : }
915 : BatchedFeMessage::GetPage {
916 : span,
917 : shard,
918 : effective_request_lsn,
919 : pages,
920 : } => {
921 : fail::fail_point!("ps::handle-pagerequest-message::getpage");
922 : (
923 : {
924 : let npages = pages.len();
925 : let res = self
926 : .handle_get_page_at_lsn_request_batched(
927 : &shard,
928 : effective_request_lsn,
929 : pages,
930 : &ctx,
931 : )
932 : .instrument(span.clone())
933 : .await;
934 : assert_eq!(res.len(), npages);
935 : res
936 : },
937 : span,
938 : )
939 : }
940 : BatchedFeMessage::DbSize { span, req } => {
941 : fail::fail_point!("ps::handle-pagerequest-message::dbsize");
942 : (
943 : vec![
944 : self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx)
945 : .instrument(span.clone())
946 : .await,
947 : ],
948 : span,
949 : )
950 : }
951 : BatchedFeMessage::GetSlruSegment { span, req } => {
952 : fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
953 : (
954 : vec![
955 : self.handle_get_slru_segment_request(
956 : tenant_id,
957 : timeline_id,
958 : &req,
959 : &ctx,
960 : )
961 : .instrument(span.clone())
962 : .await,
963 : ],
964 : span,
965 : )
966 : }
967 : BatchedFeMessage::RespondError { span, error } => {
968 : // We've already decided to respond with an error, so we don't need to
969 : // call the handler.
970 : (vec![Err(error)], span)
971 : }
972 : };
973 :
974 : // Map handler result to protocol behavior.
975 : // Some handler errors cause exit from pagestream protocol.
976 : // Other handler errors are sent back as an error message and we stay in pagestream protocol.
977 : for handler_result in handler_results {
978 : let response_msg = match handler_result {
979 : Err(e) => match &e {
980 : PageStreamError::Shutdown => {
981 : // If we fail to fulfil a request during shutdown, which may be _because_ of
982 : // shutdown, then do not send the error to the client. Instead just drop the
983 : // connection.
984 0 : span.in_scope(|| info!("dropping connection due to shutdown"));
985 : return Err(QueryError::Shutdown);
986 : }
987 : PageStreamError::Reconnect(reason) => {
988 0 : span.in_scope(|| info!("handler requested reconnect: {reason}"));
989 : return Err(QueryError::Reconnect);
990 : }
991 : PageStreamError::Read(_)
992 : | PageStreamError::LsnTimeout(_)
993 : | PageStreamError::NotFound(_)
994 : | PageStreamError::BadRequest(_) => {
995 : // print all the details to the log with {:#}, but for the client the
996 : // error message is enough. Do not log if shutting down, as the anyhow::Error
997 : // here includes cancellation which is not an error.
998 : let full = utils::error::report_compact_sources(&e);
999 0 : span.in_scope(|| {
1000 0 : error!("error reading relation or page version: {full:#}")
1001 0 : });
1002 : PagestreamBeMessage::Error(PagestreamErrorResponse {
1003 : message: e.to_string(),
1004 : })
1005 : }
1006 : },
1007 : Ok(response_msg) => response_msg,
1008 : };
1009 :
1010 : // marshal & transmit response message
1011 : pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
1012 : }
1013 : tokio::select! {
1014 : biased;
1015 : _ = self.cancel.cancelled() => {
1016 : // We were requested to shut down.
1017 : info!("shutdown request received in page handler");
1018 : return Err(QueryError::Shutdown)
1019 : }
1020 : res = pgb.flush() => {
1021 : res?;
1022 : }
1023 : }
1024 : }
1025 : }
1026 : Ok(())
1027 : }
1028 :
1029 : /// Helper function to handle the LSN from the client request.
1030 : ///
1031 : /// Each GetPage (and Exists and Nblocks) request includes information about
1032 : /// which version of the page is being requested. The primary compute node
1033 : /// will always request the latest page version, by setting 'request_lsn' to
1034 : /// the last inserted or flushed WAL position, while a standby will request
1035 : /// a version at the LSN that it's currently caught up to.
1036 : ///
1037 : /// In either case, if the page server hasn't received the WAL up to the
1038 : /// requested LSN yet, we will wait for it to arrive. The return value is
1039 : /// the LSN that should be used to look up the page versions.
1040 : ///
1041 : /// In addition to the request LSN, each request carries another LSN,
1042 : /// 'not_modified_since', which is a hint to the pageserver that the client
1043 : /// knows that the page has not been modified between 'not_modified_since'
1044 : /// and the request LSN. This allows skipping the wait, as long as the WAL
1045 : /// up to 'not_modified_since' has arrived. If the client doesn't have any
1046 : /// information about when the page was modified, it will use
1047 : /// not_modified_since == lsn. If the client lies and sends a too low
1048 : /// not_modified_hint such that there are in fact later page versions, the
1049 : /// behavior is undefined: the pageserver may return any of the page versions
1050 : /// or an error.
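///
/// Illustrative example (hypothetical LSNs, not taken from any test): a compute
/// that last modified the page at 0/5000 and has flushed WAL up to 0/9000 sends
/// `request_lsn = 0/9000, not_modified_since = 0/5000`. If the pageserver's
/// last-record LSN is already at or past 0/5000, no waiting is needed and the
/// lookup uses `min(last_record_lsn, request_lsn)`; otherwise we first wait for
/// WAL up to 0/5000 to arrive and then read at 0/5000.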
1051 0 : async fn wait_or_get_last_lsn(
1052 0 : timeline: &Timeline,
1053 0 : request_lsn: Lsn,
1054 0 : not_modified_since: Lsn,
1055 0 : latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
1056 0 : ctx: &RequestContext,
1057 0 : ) -> Result<Lsn, PageStreamError> {
1058 0 : let last_record_lsn = timeline.get_last_record_lsn();
1059 0 :
1060 0 : // Sanity check the request
1061 0 : if request_lsn < not_modified_since {
1062 0 : return Err(PageStreamError::BadRequest(
1063 0 : format!(
1064 0 : "invalid request with request LSN {} and not_modified_since {}",
1065 0 : request_lsn, not_modified_since,
1066 0 : )
1067 0 : .into(),
1068 0 : ));
1069 0 : }
1070 0 :
1071 0 : // Check explicitly for INVALID just to get a less scary error message if the request is obviously bogus
1072 0 : if request_lsn == Lsn::INVALID {
1073 0 : return Err(PageStreamError::BadRequest(
1074 0 : "invalid LSN(0) in request".into(),
1075 0 : ));
1076 0 : }
1077 0 :
1078 0 : // Clients should only read from recent LSNs on their timeline, or from locations holding an LSN lease.
1079 0 : //
1080 0 : // We may have older data available, but we make a best effort to detect this case and return an error,
1081 0 : // to distinguish a misbehaving client (asking for old LSN) from a storage issue (data missing at a legitimate LSN).
1082 0 : if request_lsn < **latest_gc_cutoff_lsn && !timeline.is_gc_blocked_by_lsn_lease_deadline() {
1083 0 : let gc_info = &timeline.gc_info.read().unwrap();
1084 0 : if !gc_info.leases.contains_key(&request_lsn) {
1085 0 : return Err(
1086 0 : PageStreamError::BadRequest(format!(
1087 0 : "tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
1088 0 : request_lsn, **latest_gc_cutoff_lsn
1089 0 : ).into())
1090 0 : );
1091 0 : }
1092 0 : }
1093 :
1094 : // Wait for WAL up to 'not_modified_since' to arrive, if necessary
1095 0 : if not_modified_since > last_record_lsn {
1096 0 : timeline
1097 0 : .wait_lsn(
1098 0 : not_modified_since,
1099 0 : crate::tenant::timeline::WaitLsnWaiter::PageService,
1100 0 : ctx,
1101 0 : )
1102 0 : .await?;
1103 : // Since we waited for 'not_modified_since' to arrive, that is now the last
1104 : // record LSN. (Or close enough for our purposes; the last-record LSN can
1105 : // advance immediately after we return anyway)
1106 0 : Ok(not_modified_since)
1107 : } else {
1108 : // It might be better to use max(not_modified_since, latest_gc_cutoff_lsn)
1109 : // here instead. That would give the same result, since we know that there
1110 : // haven't been any modifications since 'not_modified_since'. Using an older
1111 : // LSN might be faster, because that could allow skipping recent layers when
1112 : // finding the page. However, we have historically used 'last_record_lsn', so
1113 : // stick to that for now.
1114 0 : Ok(std::cmp::min(last_record_lsn, request_lsn))
1115 : }
1116 0 : }
1117 :
1118 : /// Handles the lsn lease request.
1119 : /// If a lease cannot be obtained, the client will receive NULL.
1120 0 : #[instrument(skip_all, fields(shard_id, %lsn))]
1121 : async fn handle_make_lsn_lease<IO>(
1122 : &mut self,
1123 : pgb: &mut PostgresBackend<IO>,
1124 : tenant_shard_id: TenantShardId,
1125 : timeline_id: TimelineId,
1126 : lsn: Lsn,
1127 : ctx: &RequestContext,
1128 : ) -> Result<(), QueryError>
1129 : where
1130 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1131 : {
1132 : let timeline = self
1133 : .timeline_handles
1134 : .get(
1135 : tenant_shard_id.tenant_id,
1136 : timeline_id,
1137 : ShardSelector::Known(tenant_shard_id.to_index()),
1138 : )
1139 : .await?;
1140 : set_tracing_field_shard_id(&timeline);
1141 :
1142 : let lease = timeline
1143 : .renew_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx)
1144 0 : .inspect_err(|e| {
1145 0 : warn!("{e}");
1146 0 : })
1147 : .ok();
1148 0 : let valid_until_str = lease.map(|l| {
1149 0 : l.valid_until
1150 0 : .duration_since(SystemTime::UNIX_EPOCH)
1151 0 : .expect("valid_until is earlier than UNIX_EPOCH")
1152 0 : .as_millis()
1153 0 : .to_string()
1154 0 : });
1155 0 : let bytes = valid_until_str.as_ref().map(|x| x.as_bytes());
1156 :
1157 : pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col(
1158 : b"valid_until",
1159 : )]))?
1160 : .write_message_noflush(&BeMessage::DataRow(&[bytes]))?;
1161 :
1162 : Ok(())
1163 : }
1164 :
1165 0 : #[instrument(skip_all, fields(shard_id))]
1166 : async fn handle_get_rel_exists_request(
1167 : &mut self,
1168 : tenant_id: TenantId,
1169 : timeline_id: TimelineId,
1170 : req: &PagestreamExistsRequest,
1171 : ctx: &RequestContext,
1172 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1173 : let timeline = self
1174 : .timeline_handles
1175 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1176 : .await?;
1177 : let _timer = timeline
1178 : .query_metrics
1179 : .start_timer(metrics::SmgrQueryType::GetRelExists, ctx);
1180 :
1181 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1182 : let lsn = Self::wait_or_get_last_lsn(
1183 : &timeline,
1184 : req.request_lsn,
1185 : req.not_modified_since,
1186 : &latest_gc_cutoff_lsn,
1187 : ctx,
1188 : )
1189 : .await?;
1190 :
1191 : let exists = timeline
1192 : .get_rel_exists(req.rel, Version::Lsn(lsn), ctx)
1193 : .await?;
1194 :
1195 : Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
1196 : exists,
1197 : }))
1198 : }
1199 :
1200 0 : #[instrument(skip_all, fields(shard_id))]
1201 : async fn handle_get_nblocks_request(
1202 : &mut self,
1203 : tenant_id: TenantId,
1204 : timeline_id: TimelineId,
1205 : req: &PagestreamNblocksRequest,
1206 : ctx: &RequestContext,
1207 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1208 : let timeline = self
1209 : .timeline_handles
1210 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1211 : .await?;
1212 :
1213 : let _timer = timeline
1214 : .query_metrics
1215 : .start_timer(metrics::SmgrQueryType::GetRelSize, ctx);
1216 :
1217 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1218 : let lsn = Self::wait_or_get_last_lsn(
1219 : &timeline,
1220 : req.request_lsn,
1221 : req.not_modified_since,
1222 : &latest_gc_cutoff_lsn,
1223 : ctx,
1224 : )
1225 : .await?;
1226 :
1227 : let n_blocks = timeline
1228 : .get_rel_size(req.rel, Version::Lsn(lsn), ctx)
1229 : .await?;
1230 :
1231 : Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
1232 : n_blocks,
1233 : }))
1234 : }
1235 :
1236 0 : #[instrument(skip_all, fields(shard_id))]
1237 : async fn handle_db_size_request(
1238 : &mut self,
1239 : tenant_id: TenantId,
1240 : timeline_id: TimelineId,
1241 : req: &PagestreamDbSizeRequest,
1242 : ctx: &RequestContext,
1243 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1244 : let timeline = self
1245 : .timeline_handles
1246 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1247 : .await?;
1248 :
1249 : let _timer = timeline
1250 : .query_metrics
1251 : .start_timer(metrics::SmgrQueryType::GetDbSize, ctx);
1252 :
1253 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1254 : let lsn = Self::wait_or_get_last_lsn(
1255 : &timeline,
1256 : req.request_lsn,
1257 : req.not_modified_since,
1258 : &latest_gc_cutoff_lsn,
1259 : ctx,
1260 : )
1261 : .await?;
1262 :
1263 : let total_blocks = timeline
1264 : .get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, Version::Lsn(lsn), ctx)
1265 : .await?;
1266 : let db_size = total_blocks as i64 * BLCKSZ as i64;
1267 :
1268 : Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse {
1269 : db_size,
1270 : }))
1271 : }
1272 :
1273 0 : #[instrument(skip_all)]
1274 : async fn handle_get_page_at_lsn_request_batched(
1275 : &mut self,
1276 : timeline: &Timeline,
1277 : effective_lsn: Lsn,
1278 : pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
1279 : ctx: &RequestContext,
1280 : ) -> Vec<Result<PagestreamBeMessage, PageStreamError>> {
1281 : debug_assert_current_span_has_tenant_and_timeline_id();
1282 : let _timer = timeline.query_metrics.start_timer_many(
1283 : metrics::SmgrQueryType::GetPageAtLsn,
1284 : pages.len(),
1285 : ctx,
1286 : );
1287 :
1288 : let pages = timeline
1289 : .get_rel_page_at_lsn_batched(pages, effective_lsn, ctx)
1290 : .await;
1291 :
1292 0 : Vec::from_iter(pages.into_iter().map(|page| {
1293 0 : page.map(|page| {
1294 0 : PagestreamBeMessage::GetPage(models::PagestreamGetPageResponse { page })
1295 0 : })
1296 0 : .map_err(PageStreamError::from)
1297 0 : }))
1298 : }
1299 :
1300 0 : #[instrument(skip_all, fields(shard_id))]
1301 : async fn handle_get_slru_segment_request(
1302 : &mut self,
1303 : tenant_id: TenantId,
1304 : timeline_id: TimelineId,
1305 : req: &PagestreamGetSlruSegmentRequest,
1306 : ctx: &RequestContext,
1307 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1308 : let timeline = self
1309 : .timeline_handles
1310 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1311 : .await?;
1312 :
1313 : let _timer = timeline
1314 : .query_metrics
1315 : .start_timer(metrics::SmgrQueryType::GetSlruSegment, ctx);
1316 :
1317 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1318 : let lsn = Self::wait_or_get_last_lsn(
1319 : &timeline,
1320 : req.request_lsn,
1321 : req.not_modified_since,
1322 : &latest_gc_cutoff_lsn,
1323 : ctx,
1324 : )
1325 : .await?;
1326 :
1327 : let kind = SlruKind::from_repr(req.kind)
1328 : .ok_or(PageStreamError::BadRequest("invalid SLRU kind".into()))?;
1329 : let segment = timeline.get_slru_segment(kind, req.segno, lsn, ctx).await?;
1330 :
1331 : Ok(PagestreamBeMessage::GetSlruSegment(
1332 : PagestreamGetSlruSegmentResponse { segment },
1333 : ))
1334 : }
1335 :
1336 : /// Note on "fullbackup":
1337 : /// Full basebackups should only be used for debugging purposes.
1338 : /// Originally, it was introduced to enable breaking storage format changes,
1339 : /// but that is not applicable anymore.
1340 : ///
1341 : /// # Coding Discipline
1342 : ///
1343 : /// Coding discipline within this function: all interaction with the `pgb` connection
1344 : /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
1345 : /// This is so that we can shut down page_service quickly.
1346 : ///
1347 : /// TODO: wrap the pgb that we pass to the basebackup handler so that it's sensitive
1348 : /// to connection cancellation.
1349 : #[allow(clippy::too_many_arguments)]
1350 0 : #[instrument(skip_all, fields(shard_id, ?lsn, ?prev_lsn, %full_backup))]
1351 : async fn handle_basebackup_request<IO>(
1352 : &mut self,
1353 : pgb: &mut PostgresBackend<IO>,
1354 : tenant_id: TenantId,
1355 : timeline_id: TimelineId,
1356 : lsn: Option<Lsn>,
1357 : prev_lsn: Option<Lsn>,
1358 : full_backup: bool,
1359 : gzip: bool,
1360 : replica: bool,
1361 : ctx: &RequestContext,
1362 : ) -> Result<(), QueryError>
1363 : where
1364 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1365 : {
1366 0 : fn map_basebackup_error(err: BasebackupError) -> QueryError {
1367 0 : match err {
1368 0 : BasebackupError::Client(e) => QueryError::Disconnected(ConnectionError::Io(e)),
1369 0 : BasebackupError::Server(e) => QueryError::Other(e),
1370 : }
1371 0 : }
1372 :
1373 : let started = std::time::Instant::now();
1374 :
1375 : let timeline = self
1376 : .timeline_handles
1377 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1378 : .await?;
1379 :
1380 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1381 : if let Some(lsn) = lsn {
1382 : // Backup was requested at a particular LSN. Wait for it to arrive.
1383 : info!("waiting for {}", lsn);
1384 : timeline
1385 : .wait_lsn(
1386 : lsn,
1387 : crate::tenant::timeline::WaitLsnWaiter::PageService,
1388 : ctx,
1389 : )
1390 : .await?;
1391 : timeline
1392 : .check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
1393 : .context("invalid basebackup lsn")?;
1394 : }
1395 :
1396 : let lsn_awaited_after = started.elapsed();
1397 :
1398 : // switch client to COPYOUT
1399 : pgb.write_message_noflush(&BeMessage::CopyOutResponse)
1400 : .map_err(QueryError::Disconnected)?;
1401 : self.flush_cancellable(pgb, &self.cancel).await?;
1402 :
1403 : // Send a tarball of the latest layer on the timeline. Compress if not
1404 : // fullbackup. TODO Compress in that case too (tests need to be updated)
1405 : if full_backup {
1406 : let mut writer = pgb.copyout_writer();
1407 : basebackup::send_basebackup_tarball(
1408 : &mut writer,
1409 : &timeline,
1410 : lsn,
1411 : prev_lsn,
1412 : full_backup,
1413 : replica,
1414 : ctx,
1415 : )
1416 : .await
1417 : .map_err(map_basebackup_error)?;
1418 : } else {
1419 : let mut writer = BufWriter::new(pgb.copyout_writer());
1420 : if gzip {
1421 : let mut encoder = GzipEncoder::with_quality(
1422 : &mut writer,
1423 : // NOTE using fast compression because it's on the critical path
1424 : // for compute startup. For an empty database, we get
1425 : // <100KB with this method. The Level::Best compression method
1426 : // gives us <20KB, but maybe we should add basebackup caching
1427 : // on compute shutdown first.
1428 : async_compression::Level::Fastest,
1429 : );
1430 : basebackup::send_basebackup_tarball(
1431 : &mut encoder,
1432 : &timeline,
1433 : lsn,
1434 : prev_lsn,
1435 : full_backup,
1436 : replica,
1437 : ctx,
1438 : )
1439 : .await
1440 : .map_err(map_basebackup_error)?;
1441 : // shutdown the encoder to ensure the gzip footer is written
1442 : encoder
1443 : .shutdown()
1444 : .await
1445 0 : .map_err(|e| QueryError::Disconnected(ConnectionError::Io(e)))?;
1446 : } else {
1447 : basebackup::send_basebackup_tarball(
1448 : &mut writer,
1449 : &timeline,
1450 : lsn,
1451 : prev_lsn,
1452 : full_backup,
1453 : replica,
1454 : ctx,
1455 : )
1456 : .await
1457 : .map_err(map_basebackup_error)?;
1458 : }
1459 : writer
1460 : .flush()
1461 : .await
1462 0 : .map_err(|e| map_basebackup_error(BasebackupError::Client(e)))?;
1463 : }
1464 :
1465 : pgb.write_message_noflush(&BeMessage::CopyDone)
1466 : .map_err(QueryError::Disconnected)?;
1467 : self.flush_cancellable(pgb, &timeline.cancel).await?;
1468 :
1469 : let basebackup_after = started
1470 : .elapsed()
1471 : .checked_sub(lsn_awaited_after)
1472 : .unwrap_or(Duration::ZERO);
1473 :
1474 : info!(
1475 : lsn_await_millis = lsn_awaited_after.as_millis(),
1476 : basebackup_millis = basebackup_after.as_millis(),
1477 : "basebackup complete"
1478 : );
1479 :
1480 : Ok(())
1481 : }
1482 :
1483 : // When accessing the management API, supply None as an argument;
1484 : // when authorizing a tenant, pass the corresponding tenant id.
1485 0 : fn check_permission(&self, tenant_id: Option<TenantId>) -> Result<(), QueryError> {
1486 0 : if self.auth.is_none() {
1487 : // auth is set to Trust, nothing to check so just return ok
1488 0 : return Ok(());
1489 0 : }
1490 0 : // auth is some, just checked above, when auth is some
1491 0 : // then claims are always present because of checks during connection init
1492 0 : // so this expect won't trigger
1493 0 : let claims = self
1494 0 : .claims
1495 0 : .as_ref()
1496 0 : .expect("claims presence already checked");
1497 0 : check_permission(claims, tenant_id).map_err(|e| QueryError::Unauthorized(e.0))
1498 0 : }
1499 : }
1500 :
1501 : /// `basebackup tenant timeline [lsn] [--gzip] [--replica]`
1502 : #[derive(Debug, Clone, Eq, PartialEq)]
1503 : struct BaseBackupCmd {
1504 : tenant_id: TenantId,
1505 : timeline_id: TimelineId,
1506 : lsn: Option<Lsn>,
1507 : gzip: bool,
1508 : replica: bool,
1509 : }
1510 :
1511 : /// `fullbackup tenant timeline [lsn] [prev_lsn]`
1512 : #[derive(Debug, Clone, Eq, PartialEq)]
1513 : struct FullBackupCmd {
1514 : tenant_id: TenantId,
1515 : timeline_id: TimelineId,
1516 : lsn: Option<Lsn>,
1517 : prev_lsn: Option<Lsn>,
1518 : }
1519 :
1520 : /// `pagestream_v2 tenant timeline`
1521 : #[derive(Debug, Clone, Eq, PartialEq)]
1522 : struct PageStreamCmd {
1523 : tenant_id: TenantId,
1524 : timeline_id: TimelineId,
1525 : }
1526 :
1527 : /// `lease lsn tenant timeline lsn`
1528 : #[derive(Debug, Clone, Eq, PartialEq)]
1529 : struct LeaseLsnCmd {
1530 : tenant_shard_id: TenantShardId,
1531 : timeline_id: TimelineId,
1532 : lsn: Lsn,
1533 : }
1534 :
1535 : #[derive(Debug, Clone, Eq, PartialEq)]
1536 : enum PageServiceCmd {
1537 : Set,
1538 : PageStream(PageStreamCmd),
1539 : BaseBackup(BaseBackupCmd),
1540 : FullBackup(FullBackupCmd),
1541 : LeaseLsn(LeaseLsnCmd),
1542 : }
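// Illustrative mapping performed by `PageServiceCmd::parse` below (IDs and LSNs are
// placeholders):
//   "pagestream_v2 <tenant_id> <timeline_id>"               -> Self::PageStream(..)
//   "basebackup <tenant_id> <timeline_id> [lsn] --gzip"     -> Self::BaseBackup(.. gzip: true ..)
//   "fullbackup <tenant_id> <timeline_id> [lsn] [prev_lsn]" -> Self::FullBackup(..)
//   "lease lsn <tenant_shard_id> <timeline_id> <lsn>"       -> Self::LeaseLsn(..)
//   "set ..."                                               -> Self::Set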
1543 :
1544 : impl PageStreamCmd {
1545 6 : fn parse(query: &str) -> anyhow::Result<Self> {
1546 6 : let parameters = query.split_whitespace().collect_vec();
1547 6 : if parameters.len() != 2 {
1548 2 : bail!(
1549 2 : "invalid number of parameters for pagestream command: {}",
1550 2 : query
1551 2 : );
1552 4 : }
1553 4 : let tenant_id = TenantId::from_str(parameters[0])
1554 4 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
1555 2 : let timeline_id = TimelineId::from_str(parameters[1])
1556 2 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
1557 2 : Ok(Self {
1558 2 : tenant_id,
1559 2 : timeline_id,
1560 2 : })
1561 6 : }
1562 : }
1563 :
1564 : impl FullBackupCmd {
1565 4 : fn parse(query: &str) -> anyhow::Result<Self> {
1566 4 : let parameters = query.split_whitespace().collect_vec();
1567 4 : if parameters.len() < 2 || parameters.len() > 4 {
1568 0 : bail!(
1569 0 : "invalid number of parameters for basebackup command: {}",
1570 0 : query
1571 0 : );
1572 4 : }
1573 4 : let tenant_id = TenantId::from_str(parameters[0])
1574 4 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
1575 4 : let timeline_id = TimelineId::from_str(parameters[1])
1576 4 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
1577 : // The caller is responsible for providing correct lsn and prev_lsn.
1578 4 : let lsn = if let Some(lsn_str) = parameters.get(2) {
1579 : Some(
1580 2 : Lsn::from_str(lsn_str)
1581 2 : .with_context(|| format!("Failed to parse Lsn from {lsn_str}"))?,
1582 : )
1583 : } else {
1584 2 : None
1585 : };
1586 4 : let prev_lsn = if let Some(prev_lsn_str) = parameters.get(3) {
1587 : Some(
1588 2 : Lsn::from_str(prev_lsn_str)
1589 2 : .with_context(|| format!("Failed to parse Lsn from {prev_lsn_str}"))?,
1590 : )
1591 : } else {
1592 2 : None
1593 : };
1594 4 : Ok(Self {
1595 4 : tenant_id,
1596 4 : timeline_id,
1597 4 : lsn,
1598 4 : prev_lsn,
1599 4 : })
1600 4 : }
1601 : }
1602 :
1603 : impl BaseBackupCmd {
1604 18 : fn parse(query: &str) -> anyhow::Result<Self> {
1605 18 : let parameters = query.split_whitespace().collect_vec();
1606 18 : if parameters.len() < 2 {
1607 0 : bail!(
1608 0 : "invalid number of parameters for basebackup command: {}",
1609 0 : query
1610 0 : );
1611 18 : }
1612 18 : let tenant_id = TenantId::from_str(parameters[0])
1613 18 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
1614 18 : let timeline_id = TimelineId::from_str(parameters[1])
1615 18 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
1616 : let lsn;
1617 : let flags_parse_from;
1618 18 : if let Some(maybe_lsn) = parameters.get(2) {
1619 16 : if *maybe_lsn == "latest" {
1620 2 : lsn = None;
1621 2 : flags_parse_from = 3;
1622 14 : } else if maybe_lsn.starts_with("--") {
1623 10 : lsn = None;
1624 10 : flags_parse_from = 2;
1625 10 : } else {
1626 : lsn = Some(
1627 4 : Lsn::from_str(maybe_lsn)
1628 4 : .with_context(|| format!("Failed to parse lsn from {maybe_lsn}"))?,
1629 : );
1630 4 : flags_parse_from = 3;
1631 : }
1632 2 : } else {
1633 2 : lsn = None;
1634 2 : flags_parse_from = 2;
1635 2 : }
1636 :
1637 18 : let mut gzip = false;
1638 18 : let mut replica = false;
1639 :
1640 22 : for ¶m in ¶meters[flags_parse_from..] {
1641 22 : match param {
1642 22 : "--gzip" => {
1643 14 : if gzip {
1644 2 : bail!("duplicate parameter for basebackup command: {param}")
1645 12 : }
1646 12 : gzip = true
1647 : }
1648 8 : "--replica" => {
1649 4 : if replica {
1650 0 : bail!("duplicate parameter for basebackup command: {param}")
1651 4 : }
1652 4 : replica = true
1653 : }
1654 4 : _ => bail!("invalid parameter for basebackup command: {param}"),
1655 : }
1656 : }
1657 12 : Ok(Self {
1658 12 : tenant_id,
1659 12 : timeline_id,
1660 12 : lsn,
1661 12 : gzip,
1662 12 : replica,
1663 12 : })
1664 18 : }
1665 : }
1666 :
1667 : impl LeaseLsnCmd {
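       : // Parses the arguments that follow the `lease lsn` keywords:
       : //     <tenant_shard_id> <timeline_id> <lsn>
       : // Unlike the other commands, the first argument is a tenant *shard* id,
       : // so it may carry a shard suffix (see the sharded case in the tests below).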
1668 4 : fn parse(query: &str) -> anyhow::Result<Self> {
1669 4 : let parameters = query.split_whitespace().collect_vec();
1670 4 : if parameters.len() != 3 {
1671 0 : bail!(
1672 0 : "invalid number of parameters for lease lsn command: {}",
1673 0 : query
1674 0 : );
1675 4 : }
1676 4 : let tenant_shard_id = TenantShardId::from_str(parameters[0])
1677 4 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
1678 4 : let timeline_id = TimelineId::from_str(parameters[1])
1679 4 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
1680 4 : let lsn = Lsn::from_str(parameters[2])
1681 4 : .with_context(|| format!("Failed to parse lsn from {}", parameters[2]))?;
1682 4 : Ok(Self {
1683 4 : tenant_shard_id,
1684 4 : timeline_id,
1685 4 : lsn,
1686 4 : })
1687 4 : }
1688 : }
1689 :
1690 : impl PageServiceCmd {
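       : // Top-level dispatcher: the first whitespace-separated word selects the
       : // command (case-insensitively) and the remainder of the query is handed to
       : // the matching parser above. `set ...` is accepted but otherwise ignored,
       : // so client libraries that issue SET statements on connect keep working.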
1691 42 : fn parse(query: &str) -> anyhow::Result<Self> {
1692 42 : let query = query.trim();
1693 42 : let Some((cmd, other)) = query.split_once(' ') else {
1694 4 : bail!("cannot parse query: {query}")
1695 : };
1696 38 : match cmd.to_ascii_lowercase().as_str() {
1697 38 : "pagestream_v2" => Ok(Self::PageStream(PageStreamCmd::parse(other)?)),
1698 32 : "basebackup" => Ok(Self::BaseBackup(BaseBackupCmd::parse(other)?)),
1699 14 : "fullbackup" => Ok(Self::FullBackup(FullBackupCmd::parse(other)?)),
1700 10 : "lease" => {
1701 6 : let Some((cmd2, other)) = other.split_once(' ') else {
1702 0 : bail!("invalid lease command: {cmd}");
1703 : };
1704 6 : let cmd2 = cmd2.to_ascii_lowercase();
1705 6 : if cmd2 == "lsn" {
1706 4 : Ok(Self::LeaseLsn(LeaseLsnCmd::parse(other)?))
1707 : } else {
1708 2 : bail!("invalid lease command: {cmd}");
1709 : }
1710 : }
1711 4 : "set" => Ok(Self::Set),
1712 0 : _ => Err(anyhow::anyhow!("unsupported command {cmd} in {query}")),
1713 : }
1714 42 : }
1715 : }
1716 :
1717 : impl<IO> postgres_backend::Handler<IO> for PageServerHandler
1718 : where
1719 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1720 : {
1721 0 : fn check_auth_jwt(
1722 0 : &mut self,
1723 0 : _pgb: &mut PostgresBackend<IO>,
1724 0 : jwt_response: &[u8],
1725 0 : ) -> Result<(), QueryError> {
1726 : // this unwrap is never triggered, because check_auth_jwt is only called when auth_type is NeonJWT,
1727 : // which requires auth to be present
1728 0 : let data = self
1729 0 : .auth
1730 0 : .as_ref()
1731 0 : .unwrap()
1732 0 : .decode(str::from_utf8(jwt_response).context("jwt response is not UTF-8")?)
1733 0 : .map_err(|e| QueryError::Unauthorized(e.0))?;
1734 :
1735 0 : if matches!(data.claims.scope, Scope::Tenant) && data.claims.tenant_id.is_none() {
1736 0 : return Err(QueryError::Unauthorized(
1737 0 : "jwt token scope is Tenant, but tenant id is missing".into(),
1738 0 : ));
1739 0 : }
1740 0 :
1741 0 : debug!(
1742 0 : "jwt scope check succeeded for scope: {:#?} by tenant id: {:?}",
1743 : data.claims.scope, data.claims.tenant_id,
1744 : );
1745 :
1746 0 : self.claims = Some(data.claims);
1747 0 : Ok(())
1748 0 : }
1749 :
1750 0 : fn startup(
1751 0 : &mut self,
1752 0 : _pgb: &mut PostgresBackend<IO>,
1753 0 : _sm: &FeStartupPacket,
1754 0 : ) -> Result<(), QueryError> {
1755 0 : fail::fail_point!("ps::connection-start::startup-packet");
1756 0 : Ok(())
1757 0 : }
1758 :
1759 0 : #[instrument(skip_all, fields(tenant_id, timeline_id))]
1760 : async fn process_query(
1761 : &mut self,
1762 : pgb: &mut PostgresBackend<IO>,
1763 : query_string: &str,
1764 : ) -> Result<(), QueryError> {
1765 0 : fail::fail_point!("simulated-bad-compute-connection", |_| {
1766 0 : info!("Hit failpoint for bad connection");
1767 0 : Err(QueryError::SimulatedConnectionError)
1768 0 : });
1769 :
1770 : fail::fail_point!("ps::connection-start::process-query");
1771 :
1772 : let ctx = self.connection_ctx.attached_child();
1773 : debug!("process query {query_string}");
1774 : let query = PageServiceCmd::parse(query_string)?;
1775 : match query {
1776 : PageServiceCmd::PageStream(PageStreamCmd {
1777 : tenant_id,
1778 : timeline_id,
1779 : }) => {
1780 : tracing::Span::current()
1781 : .record("tenant_id", field::display(tenant_id))
1782 : .record("timeline_id", field::display(timeline_id));
1783 :
1784 : self.check_permission(Some(tenant_id))?;
1785 :
1786 : COMPUTE_COMMANDS_COUNTERS
1787 : .for_command(ComputeCommandKind::PageStreamV2)
1788 : .inc();
1789 :
1790 : self.handle_pagerequests(
1791 : pgb,
1792 : tenant_id,
1793 : timeline_id,
1794 : PagestreamProtocolVersion::V2,
1795 : ctx,
1796 : )
1797 : .await?;
1798 : }
1799 : PageServiceCmd::BaseBackup(BaseBackupCmd {
1800 : tenant_id,
1801 : timeline_id,
1802 : lsn,
1803 : gzip,
1804 : replica,
1805 : }) => {
1806 : tracing::Span::current()
1807 : .record("tenant_id", field::display(tenant_id))
1808 : .record("timeline_id", field::display(timeline_id));
1809 :
1810 : self.check_permission(Some(tenant_id))?;
1811 :
1812 : COMPUTE_COMMANDS_COUNTERS
1813 : .for_command(ComputeCommandKind::Basebackup)
1814 : .inc();
1815 : let metric_recording = metrics::BASEBACKUP_QUERY_TIME.start_recording(&ctx);
1816 0 : let res = async {
1817 0 : self.handle_basebackup_request(
1818 0 : pgb,
1819 0 : tenant_id,
1820 0 : timeline_id,
1821 0 : lsn,
1822 0 : None,
1823 0 : false,
1824 0 : gzip,
1825 0 : replica,
1826 0 : &ctx,
1827 0 : )
1828 0 : .await?;
1829 0 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1830 0 : Result::<(), QueryError>::Ok(())
1831 0 : }
1832 : .await;
1833 : metric_recording.observe(&res);
1834 : res?;
1835 : }
1836 : // same as basebackup, but result includes relational data as well
1837 : PageServiceCmd::FullBackup(FullBackupCmd {
1838 : tenant_id,
1839 : timeline_id,
1840 : lsn,
1841 : prev_lsn,
1842 : }) => {
1843 : tracing::Span::current()
1844 : .record("tenant_id", field::display(tenant_id))
1845 : .record("timeline_id", field::display(timeline_id));
1846 :
1847 : self.check_permission(Some(tenant_id))?;
1848 :
1849 : COMPUTE_COMMANDS_COUNTERS
1850 : .for_command(ComputeCommandKind::Fullbackup)
1851 : .inc();
1852 :
1853 : // Stream the full backup (this also verifies that the timeline exists)
1854 : self.handle_basebackup_request(
1855 : pgb,
1856 : tenant_id,
1857 : timeline_id,
1858 : lsn,
1859 : prev_lsn,
1860 : true,
1861 : false,
1862 : false,
1863 : &ctx,
1864 : )
1865 : .await?;
1866 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1867 : }
1868 : PageServiceCmd::Set => {
1869 : // important because psycopg2 executes "SET datestyle TO 'ISO'"
1870 : // on connect
1871 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1872 : }
1873 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
1874 : tenant_shard_id,
1875 : timeline_id,
1876 : lsn,
1877 : }) => {
1878 : tracing::Span::current()
1879 : .record("tenant_id", field::display(tenant_shard_id))
1880 : .record("timeline_id", field::display(timeline_id));
1881 :
1882 : self.check_permission(Some(tenant_shard_id.tenant_id))?;
1883 :
1884 : COMPUTE_COMMANDS_COUNTERS
1885 : .for_command(ComputeCommandKind::LeaseLsn)
1886 : .inc();
1887 :
1888 : match self
1889 : .handle_make_lsn_lease(pgb, tenant_shard_id, timeline_id, lsn, &ctx)
1890 : .await
1891 : {
1892 : Ok(()) => {
1893 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?
1894 : }
1895 : Err(e) => {
1896 : error!("error obtaining lsn lease for {lsn}: {e:?}");
1897 : pgb.write_message_noflush(&BeMessage::ErrorResponse(
1898 : &e.to_string(),
1899 : Some(e.pg_error_code()),
1900 : ))?
1901 : }
1902 : };
1903 : }
1904 : }
1905 :
1906 : Ok(())
1907 : }
1908 : }
1909 :
1910 : impl From<GetActiveTenantError> for QueryError {
1911 0 : fn from(e: GetActiveTenantError) -> Self {
1912 0 : match e {
1913 0 : GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected(
1914 0 : ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
1915 0 : ),
1916 : GetActiveTenantError::Cancelled
1917 : | GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => {
1918 0 : QueryError::Shutdown
1919 : }
1920 0 : e @ GetActiveTenantError::NotFound(_) => QueryError::NotFound(format!("{e}").into()),
1921 0 : e => QueryError::Other(anyhow::anyhow!(e)),
1922 : }
1923 0 : }
1924 : }
1925 :
1926 0 : #[derive(Debug, thiserror::Error)]
1927 : pub(crate) enum GetActiveTimelineError {
1928 : #[error(transparent)]
1929 : Tenant(GetActiveTenantError),
1930 : #[error(transparent)]
1931 : Timeline(#[from] GetTimelineError),
1932 : }
1933 :
1934 : impl From<GetActiveTimelineError> for QueryError {
1935 0 : fn from(e: GetActiveTimelineError) -> Self {
1936 0 : match e {
1937 0 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => QueryError::Shutdown,
1938 0 : GetActiveTimelineError::Tenant(e) => e.into(),
1939 0 : GetActiveTimelineError::Timeline(e) => QueryError::NotFound(format!("{e}").into()),
1940 : }
1941 0 : }
1942 : }
1943 :
1944 0 : fn set_tracing_field_shard_id(timeline: &Timeline) {
1945 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1946 0 : tracing::Span::current().record(
1947 0 : "shard_id",
1948 0 : tracing::field::display(timeline.tenant_shard_id.shard_slug()),
1949 0 : );
1950 0 : debug_assert_current_span_has_tenant_and_timeline_id();
1951 0 : }
1952 :
1953 : struct WaitedForLsn(Lsn);
1954 : impl From<WaitedForLsn> for Lsn {
1955 0 : fn from(WaitedForLsn(lsn): WaitedForLsn) -> Self {
1956 0 : lsn
1957 0 : }
1958 : }
1959 :
1960 : #[cfg(test)]
1961 : mod tests {
1962 : use utils::shard::ShardCount;
1963 :
1964 : use super::*;
1965 :
1966 : #[test]
1967 2 : fn pageservice_cmd_parse() {
1968 2 : let tenant_id = TenantId::generate();
1969 2 : let timeline_id = TimelineId::generate();
1970 2 : let cmd =
1971 2 : PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id} {timeline_id}")).unwrap();
1972 2 : assert_eq!(
1973 2 : cmd,
1974 2 : PageServiceCmd::PageStream(PageStreamCmd {
1975 2 : tenant_id,
1976 2 : timeline_id
1977 2 : })
1978 2 : );
1979 2 : let cmd = PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id}")).unwrap();
1980 2 : assert_eq!(
1981 2 : cmd,
1982 2 : PageServiceCmd::BaseBackup(BaseBackupCmd {
1983 2 : tenant_id,
1984 2 : timeline_id,
1985 2 : lsn: None,
1986 2 : gzip: false,
1987 2 : replica: false
1988 2 : })
1989 2 : );
1990 2 : let cmd =
1991 2 : PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} --gzip")).unwrap();
1992 2 : assert_eq!(
1993 2 : cmd,
1994 2 : PageServiceCmd::BaseBackup(BaseBackupCmd {
1995 2 : tenant_id,
1996 2 : timeline_id,
1997 2 : lsn: None,
1998 2 : gzip: true,
1999 2 : replica: false
2000 2 : })
2001 2 : );
2002 2 : let cmd =
2003 2 : PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} latest")).unwrap();
2004 2 : assert_eq!(
2005 2 : cmd,
2006 2 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2007 2 : tenant_id,
2008 2 : timeline_id,
2009 2 : lsn: None,
2010 2 : gzip: false,
2011 2 : replica: false
2012 2 : })
2013 2 : );
2014 2 : let cmd = PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} 0/16ABCDE"))
2015 2 : .unwrap();
2016 2 : assert_eq!(
2017 2 : cmd,
2018 2 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2019 2 : tenant_id,
2020 2 : timeline_id,
2021 2 : lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
2022 2 : gzip: false,
2023 2 : replica: false
2024 2 : })
2025 2 : );
2026 2 : let cmd = PageServiceCmd::parse(&format!(
2027 2 : "basebackup {tenant_id} {timeline_id} --replica --gzip"
2028 2 : ))
2029 2 : .unwrap();
2030 2 : assert_eq!(
2031 2 : cmd,
2032 2 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2033 2 : tenant_id,
2034 2 : timeline_id,
2035 2 : lsn: None,
2036 2 : gzip: true,
2037 2 : replica: true
2038 2 : })
2039 2 : );
2040 2 : let cmd = PageServiceCmd::parse(&format!(
2041 2 : "basebackup {tenant_id} {timeline_id} 0/16ABCDE --replica --gzip"
2042 2 : ))
2043 2 : .unwrap();
2044 2 : assert_eq!(
2045 2 : cmd,
2046 2 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2047 2 : tenant_id,
2048 2 : timeline_id,
2049 2 : lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
2050 2 : gzip: true,
2051 2 : replica: true
2052 2 : })
2053 2 : );
2054 2 : let cmd = PageServiceCmd::parse(&format!("fullbackup {tenant_id} {timeline_id}")).unwrap();
2055 2 : assert_eq!(
2056 2 : cmd,
2057 2 : PageServiceCmd::FullBackup(FullBackupCmd {
2058 2 : tenant_id,
2059 2 : timeline_id,
2060 2 : lsn: None,
2061 2 : prev_lsn: None
2062 2 : })
2063 2 : );
2064 2 : let cmd = PageServiceCmd::parse(&format!(
2065 2 : "fullbackup {tenant_id} {timeline_id} 0/16ABCDE 0/16ABCDF"
2066 2 : ))
2067 2 : .unwrap();
2068 2 : assert_eq!(
2069 2 : cmd,
2070 2 : PageServiceCmd::FullBackup(FullBackupCmd {
2071 2 : tenant_id,
2072 2 : timeline_id,
2073 2 : lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
2074 2 : prev_lsn: Some(Lsn::from_str("0/16ABCDF").unwrap()),
2075 2 : })
2076 2 : );
2077 2 : let tenant_shard_id = TenantShardId::unsharded(tenant_id);
2078 2 : let cmd = PageServiceCmd::parse(&format!(
2079 2 : "lease lsn {tenant_shard_id} {timeline_id} 0/16ABCDE"
2080 2 : ))
2081 2 : .unwrap();
2082 2 : assert_eq!(
2083 2 : cmd,
2084 2 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
2085 2 : tenant_shard_id,
2086 2 : timeline_id,
2087 2 : lsn: Lsn::from_str("0/16ABCDE").unwrap(),
2088 2 : })
2089 2 : );
2090 2 : let tenant_shard_id = TenantShardId::split(&tenant_shard_id, ShardCount(8))[1];
2091 2 : let cmd = PageServiceCmd::parse(&format!(
2092 2 : "lease lsn {tenant_shard_id} {timeline_id} 0/16ABCDE"
2093 2 : ))
2094 2 : .unwrap();
2095 2 : assert_eq!(
2096 2 : cmd,
2097 2 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
2098 2 : tenant_shard_id,
2099 2 : timeline_id,
2100 2 : lsn: Lsn::from_str("0/16ABCDE").unwrap(),
2101 2 : })
2102 2 : );
2103 2 : let cmd = PageServiceCmd::parse("set a = b").unwrap();
2104 2 : assert_eq!(cmd, PageServiceCmd::Set);
2105 2 : let cmd = PageServiceCmd::parse("SET foo").unwrap();
2106 2 : assert_eq!(cmd, PageServiceCmd::Set);
2107 2 : }
2108 :
2109 : #[test]
2110 2 : fn pageservice_cmd_err_handling() {
2111 2 : let tenant_id = TenantId::generate();
2112 2 : let timeline_id = TimelineId::generate();
2113 2 : let cmd = PageServiceCmd::parse("unknown_command");
2114 2 : assert!(cmd.is_err());
2115 2 : let cmd = PageServiceCmd::parse("pagestream_v2");
2116 2 : assert!(cmd.is_err());
2117 2 : let cmd = PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id}xxx"));
2118 2 : assert!(cmd.is_err());
2119 2 : let cmd = PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id}xxx {timeline_id}xxx"));
2120 2 : assert!(cmd.is_err());
2121 2 : let cmd = PageServiceCmd::parse(&format!(
2122 2 : "basebackup {tenant_id} {timeline_id} --gzip --gzip"
2123 2 : ));
2124 2 : assert!(cmd.is_err());
2125 2 : let cmd = PageServiceCmd::parse(&format!(
2126 2 : "basebackup {tenant_id} {timeline_id} --gzip --unknown"
2127 2 : ));
2128 2 : assert!(cmd.is_err());
2129 2 : let cmd = PageServiceCmd::parse(&format!(
2130 2 : "basebackup {tenant_id} {timeline_id} --gzip 0/16ABCDE"
2131 2 : ));
2132 2 : assert!(cmd.is_err());
2133 2 : let cmd = PageServiceCmd::parse(&format!("lease {tenant_id} {timeline_id} gzip 0/16ABCDE"));
2134 2 : assert!(cmd.is_err());
2135 2 : }
2136 : }