1 : //! The Page Service listens for client connections and serves their GetPage@LSN
2 : //! requests.
3 :
4 : use anyhow::{bail, Context};
5 : use async_compression::tokio::write::GzipEncoder;
6 : use bytes::Buf;
7 : use futures::FutureExt;
8 : use itertools::Itertools;
9 : use once_cell::sync::OnceCell;
10 : use pageserver_api::config::{
11 : PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
12 : PageServiceProtocolPipelinedExecutionStrategy,
13 : };
14 : use pageserver_api::models::{self, TenantState};
15 : use pageserver_api::models::{
16 : PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
17 : PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
18 : PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetSlruSegmentRequest,
19 : PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest, PagestreamNblocksResponse,
20 : PagestreamProtocolVersion, PagestreamRequest,
21 : };
22 : use pageserver_api::shard::TenantShardId;
23 : use postgres_backend::{
24 : is_expected_io_error, AuthType, PostgresBackend, PostgresBackendReader, QueryError,
25 : };
26 : use pq_proto::framed::ConnectionError;
27 : use pq_proto::FeStartupPacket;
28 : use pq_proto::{BeMessage, FeMessage, RowDescriptor};
29 : use std::borrow::Cow;
30 : use std::io;
31 : use std::num::NonZeroUsize;
32 : use std::str;
33 : use std::str::FromStr;
34 : use std::sync::Arc;
35 : use std::time::SystemTime;
36 : use std::time::{Duration, Instant};
37 : use tokio::io::{AsyncRead, AsyncWrite};
38 : use tokio::io::{AsyncWriteExt, BufWriter};
39 : use tokio::task::JoinHandle;
40 : use tokio_util::sync::CancellationToken;
41 : use tracing::*;
42 : use utils::sync::spsc_fold;
43 : use utils::{
44 : auth::{Claims, Scope, SwappableJwtAuth},
45 : id::{TenantId, TimelineId},
46 : lsn::Lsn,
47 : simple_rcu::RcuReadGuard,
48 : };
49 :
50 : use crate::auth::check_permission;
51 : use crate::basebackup::BasebackupError;
52 : use crate::config::PageServerConf;
53 : use crate::context::{DownloadBehavior, RequestContext};
54 : use crate::metrics::{self, SmgrOpTimer};
55 : use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS};
56 : use crate::pgdatadir_mapping::Version;
57 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
58 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
59 : use crate::task_mgr::TaskKind;
60 : use crate::task_mgr::{self, COMPUTE_REQUEST_RUNTIME};
61 : use crate::tenant::mgr::ShardSelector;
62 : use crate::tenant::mgr::TenantManager;
63 : use crate::tenant::mgr::{GetActiveTenantError, GetTenantError, ShardResolveResult};
64 : use crate::tenant::timeline::{self, WaitLsnError};
65 : use crate::tenant::GetTimelineError;
66 : use crate::tenant::PageReconstructError;
67 : use crate::tenant::Timeline;
68 : use crate::{basebackup, timed_after_cancellation};
69 : use pageserver_api::key::rel_block_to_key;
70 : use pageserver_api::models::PageTraceEvent;
71 : use pageserver_api::reltag::SlruKind;
72 : use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
73 : use postgres_ffi::BLCKSZ;
74 :
75 : /// How long we may wait for a [`crate::tenant::mgr::TenantSlot::InProgress`] and/or a [`crate::tenant::Tenant`] which
76 : /// is not yet in state [`TenantState::Active`].
77 : ///
78 : /// NB: this is a different value than [`crate::http::routes::ACTIVE_TENANT_TIMEOUT`].
79 : const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
80 :
81 : ///////////////////////////////////////////////////////////////////////////////
82 :
83 : pub struct Listener {
84 : cancel: CancellationToken,
85 : /// Cancel the listener task through `cancel` to shut down the listener
86 : /// and get a handle on the existing connections.
87 : task: JoinHandle<Connections>,
88 : }
89 :
90 : pub struct Connections {
91 : cancel: CancellationToken,
92 : tasks: tokio::task::JoinSet<ConnectionHandlerResult>,
93 : }
94 :
95 0 : pub fn spawn(
96 0 : conf: &'static PageServerConf,
97 0 : tenant_manager: Arc<TenantManager>,
98 0 : pg_auth: Option<Arc<SwappableJwtAuth>>,
99 0 : tcp_listener: tokio::net::TcpListener,
100 0 : ) -> Listener {
101 0 : let cancel = CancellationToken::new();
102 0 : let libpq_ctx = RequestContext::todo_child(
103 0 : TaskKind::LibpqEndpointListener,
104 0 : // listener task shouldn't need to download anything. (We will
105 0 : // create separate sub-contexts for each connection, with their
106 0 : // own download behavior. This context is used only to listen and
107 0 : // accept connections.)
108 0 : DownloadBehavior::Error,
109 0 : );
110 0 : let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
111 0 : "libpq listener",
112 0 : libpq_listener_main(
113 0 : tenant_manager,
114 0 : pg_auth,
115 0 : tcp_listener,
116 0 : conf.pg_auth_type,
117 0 : conf.page_service_pipelining.clone(),
118 0 : libpq_ctx,
119 0 : cancel.clone(),
120 0 : )
121 0 : .map(anyhow::Ok),
122 0 : ));
123 0 :
124 0 : Listener { cancel, task }
125 0 : }
126 :
127 : impl Listener {
128 0 : pub async fn stop_accepting(self) -> Connections {
129 0 : self.cancel.cancel();
130 0 : self.task
131 0 : .await
132 0 : .expect("unreachable: we wrap the listener task in task_mgr::exit_on_panic_or_error")
133 0 : }
134 : }
135 : impl Connections {
136 0 : pub(crate) async fn shutdown(self) {
137 0 : let Self { cancel, mut tasks } = self;
138 0 : cancel.cancel();
139 0 : while let Some(res) = tasks.join_next().await {
140 0 : Self::handle_connection_completion(res);
141 0 : }
142 0 : }
143 :
144 0 : fn handle_connection_completion(res: Result<anyhow::Result<()>, tokio::task::JoinError>) {
145 0 : match res {
146 0 : Ok(Ok(())) => {}
147 0 : Ok(Err(e)) => error!("error in page_service connection task: {:?}", e),
148 0 : Err(e) => error!("page_service connection task panicked: {:?}", e),
149 : }
150 0 : }
151 : }
152 :
153 : ///
154 : /// Main loop of the page service.
155 : ///
156 : /// Listens for connections, and launches a new handler task for each.
157 : ///
158 : /// Returns the set of open connections upon cancellation via
159 : /// `listener_cancel`.
160 : ///
161 0 : pub async fn libpq_listener_main(
162 0 : tenant_manager: Arc<TenantManager>,
163 0 : auth: Option<Arc<SwappableJwtAuth>>,
164 0 : listener: tokio::net::TcpListener,
165 0 : auth_type: AuthType,
166 0 : pipelining_config: PageServicePipeliningConfig,
167 0 : listener_ctx: RequestContext,
168 0 : listener_cancel: CancellationToken,
169 0 : ) -> Connections {
170 0 : let connections_cancel = CancellationToken::new();
171 0 : let mut connection_handler_tasks = tokio::task::JoinSet::default();
172 :
173 : loop {
174 0 : let accepted = tokio::select! {
175 : biased;
176 0 : _ = listener_cancel.cancelled() => break,
177 0 : next = connection_handler_tasks.join_next(), if !connection_handler_tasks.is_empty() => {
178 0 : let res = next.expect("we dont poll while empty");
179 0 : Connections::handle_connection_completion(res);
180 0 : continue;
181 : }
182 0 : accepted = listener.accept() => accepted,
183 0 : };
184 0 :
185 0 : match accepted {
186 0 : Ok((socket, peer_addr)) => {
187 0 : // Connection established. Spawn a new task to handle it.
188 0 : debug!("accepted connection from {}", peer_addr);
189 0 : let local_auth = auth.clone();
190 0 : let connection_ctx = listener_ctx
191 0 : .detached_child(TaskKind::PageRequestHandler, DownloadBehavior::Download);
192 0 : connection_handler_tasks.spawn(page_service_conn_main(
193 0 : tenant_manager.clone(),
194 0 : local_auth,
195 0 : socket,
196 0 : auth_type,
197 0 : pipelining_config.clone(),
198 0 : connection_ctx,
199 0 : connections_cancel.child_token(),
200 0 : ));
201 : }
202 0 : Err(err) => {
203 0 : // accept() failed. Log the error, and loop back to retry on next connection.
204 0 : error!("accept() failed: {:?}", err);
205 : }
206 : }
207 : }
208 :
209 0 : debug!("page_service listener loop terminated");
210 :
211 0 : Connections {
212 0 : cancel: connections_cancel,
213 0 : tasks: connection_handler_tasks,
214 0 : }
215 0 : }
216 :
217 : type ConnectionHandlerResult = anyhow::Result<()>;
218 :
219 : #[instrument(skip_all, fields(peer_addr))]
220 : async fn page_service_conn_main(
221 : tenant_manager: Arc<TenantManager>,
222 : auth: Option<Arc<SwappableJwtAuth>>,
223 : socket: tokio::net::TcpStream,
224 : auth_type: AuthType,
225 : pipelining_config: PageServicePipeliningConfig,
226 : connection_ctx: RequestContext,
227 : cancel: CancellationToken,
228 : ) -> ConnectionHandlerResult {
229 : let _guard = LIVE_CONNECTIONS
230 : .with_label_values(&["page_service"])
231 : .guard();
232 :
233 : socket
234 : .set_nodelay(true)
235 : .context("could not set TCP_NODELAY")?;
236 :
237 : let peer_addr = socket.peer_addr().context("get peer address")?;
238 : tracing::Span::current().record("peer_addr", field::display(peer_addr));
239 :
240 : // set up a read timeout of 10 minutes. the timeout is rather arbitrary; the requirements are:
241 : // - long enough for most valid compute connections
242 : // - less than infinite to stop us from "leaking" connections to long-gone computes
243 : //
244 : // no write timeout is used, because the kernel is assumed to error writes after some time.
245 : let mut socket = tokio_io_timeout::TimeoutReader::new(socket);
246 :
247 : let default_timeout_ms = 10 * 60 * 1000; // 10 minutes by default
248 0 : let socket_timeout_ms = (|| {
249 0 : fail::fail_point!("simulated-bad-compute-connection", |avg_timeout_ms| {
250 : // Exponential distribution for simulating
251 : // poor network conditions; expect avg_timeout_ms to be around 15
252 : // in tests
253 0 : if let Some(avg_timeout_ms) = avg_timeout_ms {
254 0 : let avg = avg_timeout_ms.parse::<i64>().unwrap() as f32;
255 0 : let u = rand::random::<f32>();
256 0 : ((1.0 - u).ln() / (-avg)) as u64
257 : } else {
258 0 : default_timeout_ms
259 : }
260 0 : });
261 0 : default_timeout_ms
262 : })();
263 :
264 : // A timeout here does not mean the client died; it can happen if it's just idle for
265 : // a while: we will tear down this PageServerHandler and instantiate a new one if/when
266 : // they reconnect.
267 : socket.set_timeout(Some(std::time::Duration::from_millis(socket_timeout_ms)));
268 : let socket = Box::pin(socket);
269 :
270 : fail::fail_point!("ps::connection-start::pre-login");
271 :
272 : // XXX: pgbackend.run() should take the connection_ctx,
273 : // and create a child per-query context when it invokes process_query.
274 : // But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
275 : // and create the per-query context in process_query ourselves.
276 : let mut conn_handler = PageServerHandler::new(
277 : tenant_manager,
278 : auth,
279 : pipelining_config,
280 : connection_ctx,
281 : cancel.clone(),
282 : );
283 : let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
284 :
285 : match pgbackend.run(&mut conn_handler, &cancel).await {
286 : Ok(()) => {
287 : // we've been requested to shut down
288 : Ok(())
289 : }
290 : Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
291 : if is_expected_io_error(&io_error) {
292 : info!("Postgres client disconnected ({io_error})");
293 : Ok(())
294 : } else {
295 : let tenant_id = conn_handler.timeline_handles.as_ref().unwrap().tenant_id();
296 : Err(io_error).context(format!(
297 : "Postgres connection error for tenant_id={:?} client at peer_addr={}",
298 : tenant_id, peer_addr
299 : ))
300 : }
301 : }
302 : other => {
303 : let tenant_id = conn_handler.timeline_handles.as_ref().unwrap().tenant_id();
304 : other.context(format!(
305 : "Postgres query error for tenant_id={:?} client peer_addr={}",
306 : tenant_id, peer_addr
307 : ))
308 : }
309 : }
310 : }
311 :
312 : struct PageServerHandler {
313 : auth: Option<Arc<SwappableJwtAuth>>,
314 : claims: Option<Claims>,
315 :
316 : /// The context created for the lifetime of the connection
317 : /// serviced by this PageServerHandler.
318 : /// For each query received over the connection,
319 : /// `process_query` creates a child context from this one.
320 : connection_ctx: RequestContext,
321 :
322 : cancel: CancellationToken,
323 :
324 : /// None only while pagestream protocol is being processed.
325 : timeline_handles: Option<TimelineHandles>,
326 :
327 : pipelining_config: PageServicePipeliningConfig,
328 : }
329 :
330 : struct TimelineHandles {
331 : wrapper: TenantManagerWrapper,
332 : /// Note on size: the typical size of this map is 1. The largest size we expect
333 : /// to see is the number of shards divided by the number of pageservers (typically < 2),
334 : /// or the ratio used when splitting shards (i.e. how many children are created from
335 : /// one parent shard), where a "large" number might be ~8.
336 : handles: timeline::handle::Cache<TenantManagerTypes>,
337 : }
338 :
339 : impl TimelineHandles {
340 0 : fn new(tenant_manager: Arc<TenantManager>) -> Self {
341 0 : Self {
342 0 : wrapper: TenantManagerWrapper {
343 0 : tenant_manager,
344 0 : tenant_id: OnceCell::new(),
345 0 : },
346 0 : handles: Default::default(),
347 0 : }
348 0 : }
349 0 : async fn get(
350 0 : &mut self,
351 0 : tenant_id: TenantId,
352 0 : timeline_id: TimelineId,
353 0 : shard_selector: ShardSelector,
354 0 : ) -> Result<timeline::handle::Handle<TenantManagerTypes>, GetActiveTimelineError> {
355 0 : if *self.wrapper.tenant_id.get_or_init(|| tenant_id) != tenant_id {
356 0 : return Err(GetActiveTimelineError::Tenant(
357 0 : GetActiveTenantError::SwitchedTenant,
358 0 : ));
359 0 : }
360 0 : self.handles
361 0 : .get(timeline_id, shard_selector, &self.wrapper)
362 0 : .await
363 0 : .map_err(|e| match e {
364 0 : timeline::handle::GetError::TenantManager(e) => e,
365 : timeline::handle::GetError::TimelineGateClosed => {
366 0 : trace!("timeline gate closed");
367 0 : GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown)
368 : }
369 : timeline::handle::GetError::PerTimelineStateShutDown => {
370 0 : trace!("per-timeline state shut down");
371 0 : GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown)
372 : }
373 0 : })
374 0 : }
375 :
376 0 : fn tenant_id(&self) -> Option<TenantId> {
377 0 : self.wrapper.tenant_id.get().copied()
378 0 : }
379 : }
380 :
381 : pub(crate) struct TenantManagerWrapper {
382 : tenant_manager: Arc<TenantManager>,
383 : // We do not support switching tenant_id on a connection at this point.
384 : // We can add support for this later if needed without changing
385 : // the protocol.
386 : tenant_id: once_cell::sync::OnceCell<TenantId>,
387 : }
388 :
389 : #[derive(Debug)]
390 : pub(crate) struct TenantManagerTypes;
391 :
392 : impl timeline::handle::Types for TenantManagerTypes {
393 : type TenantManagerError = GetActiveTimelineError;
394 : type TenantManager = TenantManagerWrapper;
395 : type Timeline = Arc<Timeline>;
396 : }
397 :
398 : impl timeline::handle::ArcTimeline<TenantManagerTypes> for Arc<Timeline> {
399 0 : fn gate(&self) -> &utils::sync::gate::Gate {
400 0 : &self.gate
401 0 : }
402 :
403 0 : fn shard_timeline_id(&self) -> timeline::handle::ShardTimelineId {
404 0 : Timeline::shard_timeline_id(self)
405 0 : }
406 :
407 0 : fn per_timeline_state(&self) -> &timeline::handle::PerTimelineState<TenantManagerTypes> {
408 0 : &self.handles
409 0 : }
410 :
411 0 : fn get_shard_identity(&self) -> &pageserver_api::shard::ShardIdentity {
412 0 : Timeline::get_shard_identity(self)
413 0 : }
414 : }
415 :
416 : impl timeline::handle::TenantManager<TenantManagerTypes> for TenantManagerWrapper {
417 0 : async fn resolve(
418 0 : &self,
419 0 : timeline_id: TimelineId,
420 0 : shard_selector: ShardSelector,
421 0 : ) -> Result<Arc<Timeline>, GetActiveTimelineError> {
422 0 : let tenant_id = self.tenant_id.get().expect("we set this in get()");
423 0 : let timeout = ACTIVE_TENANT_TIMEOUT;
424 0 : let wait_start = Instant::now();
425 0 : let deadline = wait_start + timeout;
426 0 : let tenant_shard = loop {
427 0 : let resolved = self
428 0 : .tenant_manager
429 0 : .resolve_attached_shard(tenant_id, shard_selector);
430 0 : match resolved {
431 0 : ShardResolveResult::Found(tenant_shard) => break tenant_shard,
432 : ShardResolveResult::NotFound => {
433 0 : return Err(GetActiveTimelineError::Tenant(
434 0 : GetActiveTenantError::NotFound(GetTenantError::NotFound(*tenant_id)),
435 0 : ));
436 : }
437 0 : ShardResolveResult::InProgress(barrier) => {
438 0 : // We can't authoritatively answer right now: wait for InProgress state
439 0 : // to end, then try again
440 0 : tokio::select! {
441 0 : _ = barrier.wait() => {
442 0 : // The barrier completed: proceed around the loop to try looking up again
443 0 : },
444 0 : _ = tokio::time::sleep(deadline.duration_since(Instant::now())) => {
445 0 : return Err(GetActiveTimelineError::Tenant(GetActiveTenantError::WaitForActiveTimeout {
446 0 : latest_state: None,
447 0 : wait_time: timeout,
448 0 : }));
449 : }
450 : }
451 : }
452 : };
453 : };
454 :
455 0 : tracing::debug!("Waiting for tenant to enter active state...");
456 0 : tenant_shard
457 0 : .wait_to_become_active(deadline.duration_since(Instant::now()))
458 0 : .await
459 0 : .map_err(GetActiveTimelineError::Tenant)?;
460 :
461 0 : let timeline = tenant_shard
462 0 : .get_timeline(timeline_id, true)
463 0 : .map_err(GetActiveTimelineError::Timeline)?;
464 0 : set_tracing_field_shard_id(&timeline);
465 0 : Ok(timeline)
466 0 : }
467 : }
468 :
469 : #[derive(thiserror::Error, Debug)]
470 : enum PageStreamError {
471 : /// We encountered an error that should prompt the client to reconnect:
472 : /// in practice this means we drop the connection without sending a response.
473 : #[error("Reconnect required: {0}")]
474 : Reconnect(Cow<'static, str>),
475 :
476 : /// We were instructed to shutdown while processing the query
477 : #[error("Shutting down")]
478 : Shutdown,
479 :
480 : /// Something went wrong reading a page: this likely indicates a pageserver bug
481 : #[error("Read error")]
482 : Read(#[source] PageReconstructError),
483 :
484 : /// Ran out of time waiting for an LSN
485 : #[error("LSN timeout: {0}")]
486 : LsnTimeout(WaitLsnError),
487 :
488 : /// The entity required to serve the request (tenant or timeline) is not found,
489 : /// or is not found in a suitable state to serve a request.
490 : #[error("Not found: {0}")]
491 : NotFound(Cow<'static, str>),
492 :
493 : /// Request asked for something that doesn't make sense, like an invalid LSN
494 : #[error("Bad request: {0}")]
495 : BadRequest(Cow<'static, str>),
496 : }
497 :
498 : impl From<PageReconstructError> for PageStreamError {
499 0 : fn from(value: PageReconstructError) -> Self {
500 0 : match value {
501 0 : PageReconstructError::Cancelled => Self::Shutdown,
502 0 : e => Self::Read(e),
503 : }
504 0 : }
505 : }
506 :
507 : impl From<GetActiveTimelineError> for PageStreamError {
508 0 : fn from(value: GetActiveTimelineError) -> Self {
509 0 : match value {
510 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled)
511 : | GetActiveTimelineError::Tenant(GetActiveTenantError::WillNotBecomeActive(
512 : TenantState::Stopping { .. },
513 : ))
514 0 : | GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown) => Self::Shutdown,
515 0 : GetActiveTimelineError::Tenant(e) => Self::NotFound(format!("{e}").into()),
516 0 : GetActiveTimelineError::Timeline(e) => Self::NotFound(format!("{e}").into()),
517 : }
518 0 : }
519 : }
520 :
521 : impl From<WaitLsnError> for PageStreamError {
522 0 : fn from(value: WaitLsnError) -> Self {
523 0 : match value {
524 0 : e @ WaitLsnError::Timeout(_) => Self::LsnTimeout(e),
525 0 : WaitLsnError::Shutdown => Self::Shutdown,
526 0 : e @ WaitLsnError::BadState { .. } => Self::Reconnect(format!("{e}").into()),
527 : }
528 0 : }
529 : }
530 :
531 : impl From<WaitLsnError> for QueryError {
532 0 : fn from(value: WaitLsnError) -> Self {
533 0 : match value {
534 0 : e @ WaitLsnError::Timeout(_) => Self::Other(anyhow::Error::new(e)),
535 0 : WaitLsnError::Shutdown => Self::Shutdown,
536 0 : WaitLsnError::BadState { .. } => Self::Reconnect,
537 : }
538 0 : }
539 : }
540 :
541 : #[derive(thiserror::Error, Debug)]
542 : struct BatchedPageStreamError {
543 : req: PagestreamRequest,
544 : err: PageStreamError,
545 : }
546 :
547 : impl std::fmt::Display for BatchedPageStreamError {
548 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
549 0 : self.err.fmt(f)
550 0 : }
551 : }
552 :
553 : struct BatchedGetPageRequest {
554 : req: PagestreamGetPageRequest,
555 : timer: SmgrOpTimer,
556 : }
557 :
558 : enum BatchedFeMessage {
559 : Exists {
560 : span: Span,
561 : timer: SmgrOpTimer,
562 : shard: timeline::handle::Handle<TenantManagerTypes>,
563 : req: models::PagestreamExistsRequest,
564 : },
565 : Nblocks {
566 : span: Span,
567 : timer: SmgrOpTimer,
568 : shard: timeline::handle::Handle<TenantManagerTypes>,
569 : req: models::PagestreamNblocksRequest,
570 : },
571 : GetPage {
572 : span: Span,
573 : shard: timeline::handle::Handle<TenantManagerTypes>,
574 : effective_request_lsn: Lsn,
575 : pages: smallvec::SmallVec<[BatchedGetPageRequest; 1]>,
576 : },
577 : DbSize {
578 : span: Span,
579 : timer: SmgrOpTimer,
580 : shard: timeline::handle::Handle<TenantManagerTypes>,
581 : req: models::PagestreamDbSizeRequest,
582 : },
583 : GetSlruSegment {
584 : span: Span,
585 : timer: SmgrOpTimer,
586 : shard: timeline::handle::Handle<TenantManagerTypes>,
587 : req: models::PagestreamGetSlruSegmentRequest,
588 : },
589 : RespondError {
590 : span: Span,
591 : error: BatchedPageStreamError,
592 : },
593 : }
594 :
595 : impl BatchedFeMessage {
596 0 : fn observe_execution_start(&mut self, at: Instant) {
597 0 : match self {
598 0 : BatchedFeMessage::Exists { timer, .. }
599 0 : | BatchedFeMessage::Nblocks { timer, .. }
600 0 : | BatchedFeMessage::DbSize { timer, .. }
601 0 : | BatchedFeMessage::GetSlruSegment { timer, .. } => {
602 0 : timer.observe_execution_start(at);
603 0 : }
604 0 : BatchedFeMessage::GetPage { pages, .. } => {
605 0 : for page in pages {
606 0 : page.timer.observe_execution_start(at);
607 0 : }
608 : }
609 0 : BatchedFeMessage::RespondError { .. } => {}
610 : }
611 0 : }
612 : }
613 :
614 : impl PageServerHandler {
615 0 : pub fn new(
616 0 : tenant_manager: Arc<TenantManager>,
617 0 : auth: Option<Arc<SwappableJwtAuth>>,
618 0 : pipelining_config: PageServicePipeliningConfig,
619 0 : connection_ctx: RequestContext,
620 0 : cancel: CancellationToken,
621 0 : ) -> Self {
622 0 : PageServerHandler {
623 0 : auth,
624 0 : claims: None,
625 0 : connection_ctx,
626 0 : timeline_handles: Some(TimelineHandles::new(tenant_manager)),
627 0 : cancel,
628 0 : pipelining_config,
629 0 : }
630 0 : }
631 :
632 : /// This function always respects cancellation of any timeline in `[Self::shard_timelines]`. Pass in
633 : /// a cancellation token at the next scope up (such as a tenant cancellation token) to ensure we respect
634 : /// cancellation if there aren't any timelines in the cache.
635 : ///
636 : /// If calling from a function that doesn't use the `[Self::shard_timelines]` cache, then pass in the
637 : /// timeline cancellation token.
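/// A sketch of intended usage (hypothetical call site; which cancellation
/// token to pass depends on the caller's context):
///
/// ```ignore
/// // Flush buffered responses, but bail out promptly if the timeline is
/// // shutting down instead of blocking on a stalled client.
/// self.flush_cancellable(pgb, &timeline.cancel).await?;
/// ```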
638 0 : async fn flush_cancellable<IO>(
639 0 : &self,
640 0 : pgb: &mut PostgresBackend<IO>,
641 0 : cancel: &CancellationToken,
642 0 : ) -> Result<(), QueryError>
643 0 : where
644 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
645 0 : {
646 0 : tokio::select!(
647 0 : flush_r = pgb.flush() => {
648 0 : Ok(flush_r?)
649 : },
650 0 : _ = cancel.cancelled() => {
651 0 : Err(QueryError::Shutdown)
652 : }
653 : )
654 0 : }
655 :
656 : #[allow(clippy::too_many_arguments)]
657 0 : async fn pagestream_read_message<IO>(
658 0 : pgb: &mut PostgresBackendReader<IO>,
659 0 : tenant_id: TenantId,
660 0 : timeline_id: TimelineId,
661 0 : timeline_handles: &mut TimelineHandles,
662 0 : cancel: &CancellationToken,
663 0 : ctx: &RequestContext,
664 0 : protocol_version: PagestreamProtocolVersion,
665 0 : parent_span: Span,
666 0 : ) -> Result<Option<BatchedFeMessage>, QueryError>
667 0 : where
668 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
669 0 : {
670 0 : let msg = tokio::select! {
671 : biased;
672 0 : _ = cancel.cancelled() => {
673 0 : return Err(QueryError::Shutdown)
674 : }
675 0 : msg = pgb.read_message() => { msg }
676 0 : };
677 0 :
678 0 : let received_at = Instant::now();
679 :
680 0 : let copy_data_bytes = match msg? {
681 0 : Some(FeMessage::CopyData(bytes)) => bytes,
682 : Some(FeMessage::Terminate) => {
683 0 : return Ok(None);
684 : }
685 0 : Some(m) => {
686 0 : return Err(QueryError::Other(anyhow::anyhow!(
687 0 : "unexpected message: {m:?} during COPY"
688 0 : )));
689 : }
690 : None => {
691 0 : return Ok(None);
692 : } // client disconnected
693 : };
694 0 : trace!("query: {copy_data_bytes:?}");
695 :
696 0 : fail::fail_point!("ps::handle-pagerequest-message");
697 :
698 : // parse request
699 0 : let neon_fe_msg =
700 0 : PagestreamFeMessage::parse(&mut copy_data_bytes.reader(), protocol_version)?;
701 :
702 : // TODO: turn into an async closure once available to avoid repeating received_at
703 0 : async fn record_op_start_and_throttle(
704 0 : shard: &timeline::handle::Handle<TenantManagerTypes>,
705 0 : op: metrics::SmgrQueryType,
706 0 : received_at: Instant,
707 0 : ) -> Result<SmgrOpTimer, QueryError> {
708 0 : // It's important to start the smgr op metric recorder as early as possible
709 0 : // so that the _started counters are incremented before we do
710 0 : // any serious waiting, e.g., for throttle, batching, or actual request handling.
711 0 : let mut timer = shard.query_metrics.start_smgr_op(op, received_at);
712 0 : let now = Instant::now();
713 0 : timer.observe_throttle_start(now);
714 0 : let throttled = tokio::select! {
715 0 : res = shard.pagestream_throttle.throttle(1, now) => res,
716 0 : _ = shard.cancel.cancelled() => return Err(QueryError::Shutdown),
717 : };
718 0 : timer.observe_throttle_done(throttled);
719 0 : Ok(timer)
720 0 : }
721 :
722 0 : let batched_msg = match neon_fe_msg {
723 0 : PagestreamFeMessage::Exists(req) => {
724 0 : let span = tracing::info_span!(parent: parent_span, "handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn);
725 0 : let shard = timeline_handles
726 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
727 0 : .instrument(span.clone()) // sets `shard_id` field
728 0 : .await?;
729 0 : let timer = record_op_start_and_throttle(
730 0 : &shard,
731 0 : metrics::SmgrQueryType::GetRelExists,
732 0 : received_at,
733 0 : )
734 0 : .await?;
735 0 : BatchedFeMessage::Exists {
736 0 : span,
737 0 : timer,
738 0 : shard,
739 0 : req,
740 0 : }
741 : }
742 0 : PagestreamFeMessage::Nblocks(req) => {
743 0 : let span = tracing::info_span!(parent: parent_span, "handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn);
744 0 : let shard = timeline_handles
745 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
746 0 : .instrument(span.clone()) // sets `shard_id` field
747 0 : .await?;
748 0 : let timer = record_op_start_and_throttle(
749 0 : &shard,
750 0 : metrics::SmgrQueryType::GetRelSize,
751 0 : received_at,
752 0 : )
753 0 : .await?;
754 0 : BatchedFeMessage::Nblocks {
755 0 : span,
756 0 : timer,
757 0 : shard,
758 0 : req,
759 0 : }
760 : }
761 0 : PagestreamFeMessage::DbSize(req) => {
762 0 : let span = tracing::info_span!(parent: parent_span, "handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.hdr.request_lsn);
763 0 : let shard = timeline_handles
764 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
765 0 : .instrument(span.clone()) // sets `shard_id` field
766 0 : .await?;
767 0 : let timer = record_op_start_and_throttle(
768 0 : &shard,
769 0 : metrics::SmgrQueryType::GetDbSize,
770 0 : received_at,
771 0 : )
772 0 : .await?;
773 0 : BatchedFeMessage::DbSize {
774 0 : span,
775 0 : timer,
776 0 : shard,
777 0 : req,
778 0 : }
779 : }
780 0 : PagestreamFeMessage::GetSlruSegment(req) => {
781 0 : let span = tracing::info_span!(parent: parent_span, "handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.hdr.request_lsn);
782 0 : let shard = timeline_handles
783 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
784 0 : .instrument(span.clone()) // sets `shard_id` field
785 0 : .await?;
786 0 : let timer = record_op_start_and_throttle(
787 0 : &shard,
788 0 : metrics::SmgrQueryType::GetSlruSegment,
789 0 : received_at,
790 0 : )
791 0 : .await?;
792 0 : BatchedFeMessage::GetSlruSegment {
793 0 : span,
794 0 : timer,
795 0 : shard,
796 0 : req,
797 0 : }
798 : }
799 0 : PagestreamFeMessage::GetPage(req) => {
800 0 : let span = tracing::info_span!(parent: parent_span, "handle_get_page_at_lsn_request_batched", req_lsn = %req.hdr.request_lsn);
801 :
802 : macro_rules! respond_error {
803 : ($error:expr) => {{
804 : let error = BatchedFeMessage::RespondError {
805 : span,
806 : error: BatchedPageStreamError {
807 : req: req.hdr,
808 : err: $error,
809 : },
810 : };
811 : Ok(Some(error))
812 : }};
813 : }
814 :
815 0 : let key = rel_block_to_key(req.rel, req.blkno);
816 0 : let shard = match timeline_handles
817 0 : .get(tenant_id, timeline_id, ShardSelector::Page(key))
818 0 : .instrument(span.clone()) // sets `shard_id` field
819 0 : .await
820 : {
821 0 : Ok(tl) => tl,
822 : Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
823 : // We already know this tenant exists in general, because we resolved it at
824 : // start of connection. Getting a NotFound here indicates that the shard containing
825 : // the requested page is not present on this node: the client's knowledge of shard->pageserver
826 : // mapping is out of date.
827 : //
827 : // Closing the connection by returning `PageStreamError::Reconnect` has the side effect of rate-limiting the above message, via
828 : // the client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
830 : // and talk to a different pageserver.
831 0 : return respond_error!(PageStreamError::Reconnect(
832 0 : "getpage@lsn request routed to wrong shard".into()
833 0 : ));
834 : }
835 0 : Err(e) => {
836 0 : return respond_error!(e.into());
837 : }
838 : };
839 :
840 0 : let timer = record_op_start_and_throttle(
841 0 : &shard,
842 0 : metrics::SmgrQueryType::GetPageAtLsn,
843 0 : received_at,
844 0 : )
845 0 : .await?;
846 :
847 0 : let effective_request_lsn = match Self::wait_or_get_last_lsn(
848 0 : &shard,
849 0 : req.hdr.request_lsn,
850 0 : req.hdr.not_modified_since,
851 0 : &shard.get_latest_gc_cutoff_lsn(),
852 0 : ctx,
853 0 : )
854 0 : // TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait
855 0 : .await
856 : {
857 0 : Ok(lsn) => lsn,
858 0 : Err(e) => {
859 0 : return respond_error!(e);
860 : }
861 : };
862 : BatchedFeMessage::GetPage {
863 0 : span,
864 0 : shard,
865 0 : effective_request_lsn,
866 0 : pages: smallvec::smallvec![BatchedGetPageRequest { req, timer }],
867 : }
868 : }
869 : };
870 0 : Ok(Some(batched_msg))
871 0 : }
872 :
873 : /// Post-condition: `batch` is Some()
874 : #[instrument(skip_all, level = tracing::Level::TRACE)]
875 : #[allow(clippy::boxed_local)]
876 : fn pagestream_do_batch(
877 : max_batch_size: NonZeroUsize,
878 : batch: &mut Result<BatchedFeMessage, QueryError>,
879 : this_msg: Result<BatchedFeMessage, QueryError>,
880 : ) -> Result<(), Result<BatchedFeMessage, QueryError>> {
881 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
882 :
883 : let this_msg = match this_msg {
884 : Ok(this_msg) => this_msg,
885 : Err(e) => return Err(Err(e)),
886 : };
887 :
888 : match (&mut *batch, this_msg) {
889 : // something batched already, let's see if we can add this message to the batch
890 : (
891 : Ok(BatchedFeMessage::GetPage {
892 : span: _,
893 : shard: accum_shard,
894 : pages: ref mut accum_pages,
895 : effective_request_lsn: accum_lsn,
896 : }),
897 : BatchedFeMessage::GetPage {
898 : span: _,
899 : shard: this_shard,
900 : pages: this_pages,
901 : effective_request_lsn: this_lsn,
902 : },
903 0 : ) if (|| {
904 0 : assert_eq!(this_pages.len(), 1);
905 0 : if accum_pages.len() >= max_batch_size.get() {
906 0 : trace!(%accum_lsn, %this_lsn, %max_batch_size, "stopping batching because of batch size");
907 0 : assert_eq!(accum_pages.len(), max_batch_size.get());
908 0 : return false;
909 0 : }
910 0 : if (accum_shard.tenant_shard_id, accum_shard.timeline_id)
911 0 : != (this_shard.tenant_shard_id, this_shard.timeline_id)
912 : {
913 0 : trace!(%accum_lsn, %this_lsn, "stopping batching because timeline object mismatch");
914 : // TODO: we _could_ batch & execute each shard separately (and in parallel).
915 : // But the current logic for keeping responses in order does not support that.
916 0 : return false;
917 0 : }
918 0 : // the vectored get currently only supports a single LSN, so bounce as soon
919 0 : // as the effective request_lsn changes
920 0 : if *accum_lsn != this_lsn {
921 0 : trace!(%accum_lsn, %this_lsn, "stopping batching because LSN changed");
922 0 : return false;
923 0 : }
924 0 : true
925 : })() =>
926 : {
927 : // ok to batch
928 : accum_pages.extend(this_pages);
929 : Ok(())
930 : }
931 : // something batched already but this message is unbatchable
932 : (_, this_msg) => {
933 : // by default, don't continue batching
934 : Err(Ok(this_msg))
935 : }
936 : }
937 : }
938 :
939 0 : #[instrument(level = tracing::Level::DEBUG, skip_all)]
940 : async fn pagesteam_handle_batched_message<IO>(
941 : &mut self,
942 : pgb_writer: &mut PostgresBackend<IO>,
943 : batch: BatchedFeMessage,
944 : cancel: &CancellationToken,
945 : protocol_version: PagestreamProtocolVersion,
946 : ctx: &RequestContext,
947 : ) -> Result<(), QueryError>
948 : where
949 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
950 : {
951 : let started_at = Instant::now();
952 : let batch = {
953 : let mut batch = batch;
954 : batch.observe_execution_start(started_at);
955 : batch
956 : };
957 :
958 : // invoke handler function
959 : let (handler_results, span): (
960 : Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>>,
961 : _,
962 : ) = match batch {
963 : BatchedFeMessage::Exists {
964 : span,
965 : timer,
966 : shard,
967 : req,
968 : } => {
969 : fail::fail_point!("ps::handle-pagerequest-message::exists");
970 : (
971 : vec![self
972 : .handle_get_rel_exists_request(&shard, &req, ctx)
973 : .instrument(span.clone())
974 : .await
975 0 : .map(|msg| (msg, timer))
976 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr })],
977 : span,
978 : )
979 : }
980 : BatchedFeMessage::Nblocks {
981 : span,
982 : timer,
983 : shard,
984 : req,
985 : } => {
986 : fail::fail_point!("ps::handle-pagerequest-message::nblocks");
987 : (
988 : vec![self
989 : .handle_get_nblocks_request(&shard, &req, ctx)
990 : .instrument(span.clone())
991 : .await
992 0 : .map(|msg| (msg, timer))
993 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr })],
994 : span,
995 : )
996 : }
997 : BatchedFeMessage::GetPage {
998 : span,
999 : shard,
1000 : effective_request_lsn,
1001 : pages,
1002 : } => {
1003 : fail::fail_point!("ps::handle-pagerequest-message::getpage");
1004 : (
1005 : {
1006 : let npages = pages.len();
1007 : trace!(npages, "handling getpage request");
1008 : let res = self
1009 : .handle_get_page_at_lsn_request_batched(
1010 : &shard,
1011 : effective_request_lsn,
1012 : pages,
1013 : ctx,
1014 : )
1015 : .instrument(span.clone())
1016 : .await;
1017 : assert_eq!(res.len(), npages);
1018 : res
1019 : },
1020 : span,
1021 : )
1022 : }
1023 : BatchedFeMessage::DbSize {
1024 : span,
1025 : timer,
1026 : shard,
1027 : req,
1028 : } => {
1029 : fail::fail_point!("ps::handle-pagerequest-message::dbsize");
1030 : (
1031 : vec![self
1032 : .handle_db_size_request(&shard, &req, ctx)
1033 : .instrument(span.clone())
1034 : .await
1035 0 : .map(|msg| (msg, timer))
1036 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr })],
1037 : span,
1038 : )
1039 : }
1040 : BatchedFeMessage::GetSlruSegment {
1041 : span,
1042 : timer,
1043 : shard,
1044 : req,
1045 : } => {
1046 : fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
1047 : (
1048 : vec![self
1049 : .handle_get_slru_segment_request(&shard, &req, ctx)
1050 : .instrument(span.clone())
1051 : .await
1052 0 : .map(|msg| (msg, timer))
1053 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr })],
1054 : span,
1055 : )
1056 : }
1057 : BatchedFeMessage::RespondError { span, error } => {
1058 : // We've already decided to respond with an error, so we don't need to
1059 : // call the handler.
1060 : (vec![Err(error)], span)
1061 : }
1062 : };
1063 :
1064 : // Map handler result to protocol behavior.
1065 : // Some handler errors cause exit from pagestream protocol.
1066 : // Other handler errors are sent back as an error message and we stay in pagestream protocol.
1067 : for handler_result in handler_results {
1068 : let (response_msg, timer) = match handler_result {
1069 : Err(e) => match &e.err {
1070 : PageStreamError::Shutdown => {
1071 : // If we fail to fulfil a request during shutdown, which may be _because_ of
1072 : // shutdown, then do not send the error to the client. Instead just drop the
1073 : // connection.
1074 0 : span.in_scope(|| info!("dropping connection due to shutdown"));
1075 : return Err(QueryError::Shutdown);
1076 : }
1077 : PageStreamError::Reconnect(reason) => {
1078 0 : span.in_scope(|| info!("handler requested reconnect: {reason}"));
1079 : return Err(QueryError::Reconnect);
1080 : }
1081 : PageStreamError::Read(_)
1082 : | PageStreamError::LsnTimeout(_)
1083 : | PageStreamError::NotFound(_)
1084 : | PageStreamError::BadRequest(_) => {
1085 : // print all the details to the log with {:#}, but for the client the
1086 : // error message is enough. Do not log if shutting down, as the anyhow::Error
1087 : // here includes cancellation, which is not an error.
1088 : let full = utils::error::report_compact_sources(&e.err);
1089 0 : span.in_scope(|| {
1090 0 : error!("error reading relation or page version: {full:#}")
1091 0 : });
1092 : (
1093 : PagestreamBeMessage::Error(PagestreamErrorResponse {
1094 : req: e.req,
1095 : message: e.err.to_string(),
1096 : }),
1097 : None, // TODO: measure errors
1098 : )
1099 : }
1100 : },
1101 : Ok((response_msg, timer)) => (response_msg, Some(timer)),
1102 : };
1103 :
1104 : //
1105 : // marshal & transmit response message
1106 : //
1107 :
1108 : pgb_writer.write_message_noflush(&BeMessage::CopyData(
1109 : &response_msg.serialize(protocol_version),
1110 : ))?;
1111 :
1112 : // We purposefully don't count flush time into the timer.
1113 : //
1114 : // The reason is that the current compute client will not perform protocol processing
1115 : // if the postgres backend process is doing things other than `->smgr_read()`.
1116 : // This is especially the case for prefetch.
1117 : //
1118 : // If the compute doesn't read from the connection, eventually TCP will backpressure
1119 : // all the way into our flush call below.
1120 : //
1121 : // The timer's underlying metric is used for a storage-internal latency SLO and
1122 : // we don't want to include latency in it that we can't control.
1123 : // And as pointed out above, in this case, we don't control the time that flush will take.
1124 0 : let flushing_timer = timer.map(|mut timer| {
1125 0 : timer
1126 0 : .observe_execution_end_flush_start(Instant::now())
1127 0 : .expect("we are the first caller")
1128 0 : });
1129 :
1130 : // what we want to do
1131 : let flush_fut = pgb_writer.flush();
1132 : // metric for how long flushing takes
1133 : let flush_fut = match flushing_timer {
1134 : Some(flushing_timer) => {
1135 : futures::future::Either::Left(flushing_timer.measure(flush_fut))
1136 : }
1137 : None => futures::future::Either::Right(flush_fut),
1138 : };
1139 : // do it while respecting cancellation
1140 0 : let _: () = async move {
1141 0 : tokio::select! {
1142 : biased;
1143 0 : _ = cancel.cancelled() => {
1144 : // We were requested to shut down.
1145 0 : info!("shutdown request received in page handler");
1146 0 : return Err(QueryError::Shutdown)
1147 : }
1148 0 : res = flush_fut => {
1149 0 : res?;
1150 : }
1151 : }
1152 0 : Ok(())
1153 0 : }
1154 : // and log the info! line inside the request span
1155 : .instrument(span.clone())
1156 : .await?;
1157 : }
1158 : Ok(())
1159 : }
1160 :
1161 : /// Pagestream sub-protocol handler.
1162 : ///
1163 : /// It is a simple request-response protocol inside a COPYBOTH session.
1164 : ///
1165 : /// # Coding Discipline
1166 : ///
1167 : /// Coding discipline within this function: all interaction with the `pgb` connection
1168 : /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
1169 : /// This is so that we can shut down page_service quickly.
1170 : #[instrument(skip_all)]
1171 : async fn handle_pagerequests<IO>(
1172 : &mut self,
1173 : pgb: &mut PostgresBackend<IO>,
1174 : tenant_id: TenantId,
1175 : timeline_id: TimelineId,
1176 : protocol_version: PagestreamProtocolVersion,
1177 : ctx: RequestContext,
1178 : ) -> Result<(), QueryError>
1179 : where
1180 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
1181 : {
1182 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1183 :
1184 : // switch client to COPYBOTH
1185 : pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
1186 : tokio::select! {
1187 : biased;
1188 : _ = self.cancel.cancelled() => {
1189 : return Err(QueryError::Shutdown)
1190 : }
1191 : res = pgb.flush() => {
1192 : res?;
1193 : }
1194 : }
1195 :
1196 : let pgb_reader = pgb
1197 : .split()
1198 : .context("implementation error: split pgb into reader and writer")?;
1199 :
1200 : let timeline_handles = self
1201 : .timeline_handles
1202 : .take()
1203 : .expect("implementation error: timeline_handles should not be locked");
1204 :
1205 : let request_span = info_span!("request", shard_id = tracing::field::Empty);
1206 : let ((pgb_reader, timeline_handles), result) = match self.pipelining_config.clone() {
1207 : PageServicePipeliningConfig::Pipelined(pipelining_config) => {
1208 : self.handle_pagerequests_pipelined(
1209 : pgb,
1210 : pgb_reader,
1211 : tenant_id,
1212 : timeline_id,
1213 : timeline_handles,
1214 : request_span,
1215 : pipelining_config,
1216 : protocol_version,
1217 : &ctx,
1218 : )
1219 : .await
1220 : }
1221 : PageServicePipeliningConfig::Serial => {
1222 : self.handle_pagerequests_serial(
1223 : pgb,
1224 : pgb_reader,
1225 : tenant_id,
1226 : timeline_id,
1227 : timeline_handles,
1228 : request_span,
1229 : protocol_version,
1230 : &ctx,
1231 : )
1232 : .await
1233 : }
1234 : };
1235 :
1236 : debug!("pagestream subprotocol shut down cleanly");
1237 :
1238 : pgb.unsplit(pgb_reader)
1239 : .context("implementation error: unsplit pgb")?;
1240 :
1241 : let replaced = self.timeline_handles.replace(timeline_handles);
1242 : assert!(replaced.is_none());
1243 :
1244 : result
1245 : }
1246 :
1247 : #[allow(clippy::too_many_arguments)]
1248 0 : async fn handle_pagerequests_serial<IO>(
1249 0 : &mut self,
1250 0 : pgb_writer: &mut PostgresBackend<IO>,
1251 0 : mut pgb_reader: PostgresBackendReader<IO>,
1252 0 : tenant_id: TenantId,
1253 0 : timeline_id: TimelineId,
1254 0 : mut timeline_handles: TimelineHandles,
1255 0 : request_span: Span,
1256 0 : protocol_version: PagestreamProtocolVersion,
1257 0 : ctx: &RequestContext,
1258 0 : ) -> (
1259 0 : (PostgresBackendReader<IO>, TimelineHandles),
1260 0 : Result<(), QueryError>,
1261 0 : )
1262 0 : where
1263 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
1264 0 : {
1265 0 : let cancel = self.cancel.clone();
1266 0 : let err = loop {
1267 0 : let msg = Self::pagestream_read_message(
1268 0 : &mut pgb_reader,
1269 0 : tenant_id,
1270 0 : timeline_id,
1271 0 : &mut timeline_handles,
1272 0 : &cancel,
1273 0 : ctx,
1274 0 : protocol_version,
1275 0 : request_span.clone(),
1276 0 : )
1277 0 : .await;
1278 0 : let msg = match msg {
1279 0 : Ok(msg) => msg,
1280 0 : Err(e) => break e,
1281 : };
1282 0 : let msg = match msg {
1283 0 : Some(msg) => msg,
1284 : None => {
1285 0 : debug!("pagestream subprotocol end observed");
1286 0 : return ((pgb_reader, timeline_handles), Ok(()));
1287 : }
1288 : };
1289 :
1290 0 : let err = self
1291 0 : .pagesteam_handle_batched_message(pgb_writer, msg, &cancel, protocol_version, ctx)
1292 0 : .await;
1293 0 : match err {
1294 0 : Ok(()) => {}
1295 0 : Err(e) => break e,
1296 : }
1297 : };
1298 0 : ((pgb_reader, timeline_handles), Err(err))
1299 0 : }
1300 :
1301 : /// # Cancel-Safety
1302 : ///
1303 : /// May leak tokio tasks if not polled to completion.
1304 : #[allow(clippy::too_many_arguments)]
1305 0 : async fn handle_pagerequests_pipelined<IO>(
1306 0 : &mut self,
1307 0 : pgb_writer: &mut PostgresBackend<IO>,
1308 0 : pgb_reader: PostgresBackendReader<IO>,
1309 0 : tenant_id: TenantId,
1310 0 : timeline_id: TimelineId,
1311 0 : mut timeline_handles: TimelineHandles,
1312 0 : request_span: Span,
1313 0 : pipelining_config: PageServicePipeliningConfigPipelined,
1314 0 : protocol_version: PagestreamProtocolVersion,
1315 0 : ctx: &RequestContext,
1316 0 : ) -> (
1317 0 : (PostgresBackendReader<IO>, TimelineHandles),
1318 0 : Result<(), QueryError>,
1319 0 : )
1320 0 : where
1321 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
1322 0 : {
1323 0 : //
1324 0 : // Pipelined pagestream handling consists of
1325 0 : // - a Batcher that reads requests off the wire
1326 0 : // and batches them if possible,
1327 0 : // - an Executor that processes the batched requests.
1328 0 : //
1329 0 : // The batch is built up inside an `spsc_fold` channel,
1330 0 : // shared betwen Batcher (Sender) and Executor (Receiver).
1331 0 : //
1332 0 : // The Batcher continously folds client requests into the batch,
1333 0 : // while the Executor can at any time take out what's in the batch
1334 0 : // in order to process it.
1335 0 : // This means the next batch builds up while the Executor
1336 0 : // executes the last batch.
1337 0 : //
1338 0 : // CANCELLATION
1339 0 : //
1340 0 : // We run both Batcher and Executor futures to completion before
1341 0 : // returning from this function.
1342 0 : //
1343 0 : // If Executor exits first, it signals cancellation to the Batcher
1344 0 : // via a CancellationToken that is child of `self.cancel`.
1345 0 : // If Batcher exits first, it signals cancellation to the Executor
1346 0 : // by dropping the spsc_fold channel Sender.
1347 0 : //
1348 0 : // CLEAN SHUTDOWN
1349 0 : //
1350 0 : // Clean shutdown means that the client ends the COPYBOTH session.
1351 0 : // In response to such a client message, the Batcher exits.
1352 0 : // The Executor continues to run, draining the spsc_fold channel.
1353 0 : // Once drained, the spsc_fold recv will fail with a distinct error
1354 0 : // indicating that the sender disconnected.
1355 0 : // The Executor exits with Ok(()) in response to that error.
1356 0 : //
1357 0 : // Server initiated shutdown is not clean shutdown, but instead
1358 0 : // is an error Err(QueryError::Shutdown) that is propagated through
1359 0 : // error propagation.
1360 0 : //
1361 0 : // ERROR PROPAGATION
1362 0 : //
1363 0 : // When the Batcher encounter an error, it sends it as a value
1364 0 : // through the spsc_fold channel and exits afterwards.
1365 0 : // When the Executor observes such an error in the channel,
1366 0 : // it exits returning that error value.
1367 0 : //
1368 0 : // This design ensures that the Executor stage will still process
1369 0 : // the batch that was in flight when the Batcher encountered an error,
1370 0 : // thereby behaving identically to a serial implementation.
1371 0 :
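// An illustrative timeline of the intended overlap (a sketch, not a
// guarantee of scheduling; assumes three mergeable GetPage requests A, B, C):
//
//   Batcher :  read A  fold A  read B  fold B   read C  fold C   ...
//   Executor:          take {A}  execute {A}    take {B,C}  execute {B,C}
//
// i.e. while the Executor processes the previous batch, the Batcher keeps
// folding newly read requests into the next batch held by the spsc_fold channel.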
1372 0 : let PageServicePipeliningConfigPipelined {
1373 0 : max_batch_size,
1374 0 : execution,
1375 0 : } = pipelining_config;
1376 :
1377 : // Macro to _define_ a pipeline stage.
1378 : macro_rules! pipeline_stage {
1379 : ($name:literal, $cancel:expr, $make_fut:expr) => {{
1380 : let cancel: CancellationToken = $cancel;
1381 : let stage_fut = $make_fut(cancel.clone());
1382 0 : async move {
1383 0 : scopeguard::defer! {
1384 0 : debug!("exiting");
1385 0 : }
1386 0 : timed_after_cancellation(stage_fut, $name, Duration::from_millis(100), &cancel)
1387 0 : .await
1388 0 : }
1389 : .instrument(tracing::info_span!($name))
1390 : }};
1391 : }
1392 :
1393 : //
1394 : // Batcher
1395 : //
1396 :
1397 0 : let cancel_batcher = self.cancel.child_token();
1398 0 : let (mut batch_tx, mut batch_rx) = spsc_fold::channel();
1399 0 : let batcher = pipeline_stage!("batcher", cancel_batcher.clone(), move |cancel_batcher| {
1400 0 : let ctx = ctx.attached_child();
1401 0 : async move {
1402 0 : let mut pgb_reader = pgb_reader;
1403 0 : let mut exit = false;
1404 0 : while !exit {
1405 0 : let read_res = Self::pagestream_read_message(
1406 0 : &mut pgb_reader,
1407 0 : tenant_id,
1408 0 : timeline_id,
1409 0 : &mut timeline_handles,
1410 0 : &cancel_batcher,
1411 0 : &ctx,
1412 0 : protocol_version,
1413 0 : request_span.clone(),
1414 0 : )
1415 0 : .await;
1416 0 : let Some(read_res) = read_res.transpose() else {
1417 0 : debug!("client-initiated shutdown");
1418 0 : break;
1419 : };
1420 0 : exit |= read_res.is_err();
1421 0 : let could_send = batch_tx
1422 0 : .send(read_res, |batch, res| {
1423 0 : Self::pagestream_do_batch(max_batch_size, batch, res)
1424 0 : })
1425 0 : .await;
1426 0 : exit |= could_send.is_err();
1427 : }
1428 0 : (pgb_reader, timeline_handles)
1429 0 : }
1430 0 : });
1431 :
1432 : //
1433 : // Executor
1434 : //
1435 :
1436 0 : let executor = pipeline_stage!("executor", self.cancel.clone(), move |cancel| {
1437 0 : let ctx = ctx.attached_child();
1438 0 : async move {
1439 0 : let _cancel_batcher = cancel_batcher.drop_guard();
1440 : loop {
1441 0 : let maybe_batch = batch_rx.recv().await;
1442 0 : let batch = match maybe_batch {
1443 0 : Ok(batch) => batch,
1444 : Err(spsc_fold::RecvError::SenderGone) => {
1445 0 : debug!("upstream gone");
1446 0 : return Ok(());
1447 : }
1448 : };
1449 0 : let batch = match batch {
1450 0 : Ok(batch) => batch,
1451 0 : Err(e) => {
1452 0 : return Err(e);
1453 : }
1454 : };
1455 0 : self.pagesteam_handle_batched_message(
1456 0 : pgb_writer,
1457 0 : batch,
1458 0 : &cancel,
1459 0 : protocol_version,
1460 0 : &ctx,
1461 0 : )
1462 0 : .await?;
1463 : }
1464 0 : }
1465 0 : });
1466 :
1467 : //
1468 : // Execute the stages.
1469 : //
1470 :
1471 0 : match execution {
1472 : PageServiceProtocolPipelinedExecutionStrategy::ConcurrentFutures => {
1473 0 : tokio::join!(batcher, executor)
1474 : }
1475 : PageServiceProtocolPipelinedExecutionStrategy::Tasks => {
1476 : // These tasks are not tracked anywhere.
1477 0 : let read_messages_task = tokio::spawn(batcher);
1478 0 : let (read_messages_task_res, executor_res_) =
1479 0 : tokio::join!(read_messages_task, executor,);
1480 0 : (
1481 0 : read_messages_task_res.expect("propagated panic from read_messages"),
1482 0 : executor_res_,
1483 0 : )
1484 : }
1485 : }
1486 0 : }
1487 :
1488 : /// Helper function to handle the LSN from client request.
1489 : ///
1490 : /// Each GetPage (and Exists and Nblocks) request includes information about
1491 : /// which version of the page is being requested. The primary compute node
1492 : /// will always request the latest page version, by setting 'request_lsn' to
1493 : /// the last inserted or flushed WAL position, while a standby will request
1494 : /// a version at the LSN that it's currently caught up to.
1495 : ///
1496 : /// In either case, if the page server hasn't received the WAL up to the
1497 : /// requested LSN yet, we will wait for it to arrive. The return value is
1498 : /// the LSN that should be used to look up the page versions.
1499 : ///
1500 : /// In addition to the request LSN, each request carries another LSN,
1501 : /// 'not_modified_since', which is a hint to the pageserver that the client
1502 : /// knows that the page has not been modified between 'not_modified_since'
1503 : /// and the request LSN. This allows skipping the wait, as long as the WAL
1504 : /// up to 'not_modified_since' has arrived. If the client doesn't have any
1505 : /// information about when the page was modified, it will use
1506 : /// not_modified_since == lsn. If the client lies and sends a too low
1507 : /// not_modified_hint such that there are in fact later page versions, the
1508 : /// behavior is undefined: the pageserver may return any of the page versions
1509 : /// or an error.
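/// A worked example (illustrative LSNs, not from any real workload):
/// suppose last_record_lsn = 0/5000 and a request arrives with
/// request_lsn = 0/7000, not_modified_since = 0/3000. Since
/// not_modified_since <= last_record_lsn, no wait is needed and the lookup
/// uses min(last_record_lsn, request_lsn) = 0/5000. If instead
/// not_modified_since were 0/6000, we would first wait for WAL up to 0/6000
/// to arrive and then use 0/6000 as the effective LSN.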
1510 0 : async fn wait_or_get_last_lsn(
1511 0 : timeline: &Timeline,
1512 0 : request_lsn: Lsn,
1513 0 : not_modified_since: Lsn,
1514 0 : latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
1515 0 : ctx: &RequestContext,
1516 0 : ) -> Result<Lsn, PageStreamError> {
1517 0 : let last_record_lsn = timeline.get_last_record_lsn();
1518 0 :
1519 0 : // Sanity check the request
1520 0 : if request_lsn < not_modified_since {
1521 0 : return Err(PageStreamError::BadRequest(
1522 0 : format!(
1523 0 : "invalid request with request LSN {} and not_modified_since {}",
1524 0 : request_lsn, not_modified_since,
1525 0 : )
1526 0 : .into(),
1527 0 : ));
1528 0 : }
1529 0 :
1530 0 : // Check explicitly for INVALID just to get a less scary error message if the request is obviously bogus
1531 0 : if request_lsn == Lsn::INVALID {
1532 0 : return Err(PageStreamError::BadRequest(
1533 0 : "invalid LSN(0) in request".into(),
1534 0 : ));
1535 0 : }
1536 0 :
1537 0 : // Clients should only read from recent LSNs on their timeline, or from locations holding an LSN lease.
1538 0 : //
1539 0 : // We may have older data available, but we make a best effort to detect this case and return an error,
1540 0 : // to distinguish a misbehaving client (asking for old LSN) from a storage issue (data missing at a legitimate LSN).
1541 0 : if request_lsn < **latest_gc_cutoff_lsn && !timeline.is_gc_blocked_by_lsn_lease_deadline() {
1542 0 : let gc_info = &timeline.gc_info.read().unwrap();
1543 0 : if !gc_info.leases.contains_key(&request_lsn) {
1544 0 : return Err(
1545 0 : PageStreamError::BadRequest(format!(
1546 0 : "tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
1547 0 : request_lsn, **latest_gc_cutoff_lsn
1548 0 : ).into())
1549 0 : );
1550 0 : }
1551 0 : }
1552 :
1553 : // Wait for WAL up to 'not_modified_since' to arrive, if necessary
1554 0 : if not_modified_since > last_record_lsn {
1555 0 : timeline
1556 0 : .wait_lsn(
1557 0 : not_modified_since,
1558 0 : crate::tenant::timeline::WaitLsnWaiter::PageService,
1559 0 : ctx,
1560 0 : )
1561 0 : .await?;
1562 : // Since we waited for 'not_modified_since' to arrive, that is now the last
1563 : // record LSN. (Or close enough for our purposes; the last-record LSN can
1564 : // advance immediately after we return anyway)
1565 0 : Ok(not_modified_since)
1566 : } else {
1567 : // It might be better to use max(not_modified_since, latest_gc_cutoff_lsn)
1568 : // here instead. That would give the same result, since we know that there
1569 : // haven't been any modifications since 'not_modified_since'. Using an older
1570 : // LSN might be faster, because that could allow skipping recent layers when
1571 : // finding the page. However, we have historically used 'last_record_lsn', so
1572 : // stick to that for now.
1573 0 : Ok(std::cmp::min(last_record_lsn, request_lsn))
1574 : }
1575 0 : }
1576 :
1577 : /// Handles the LSN lease request.
1578 : /// If a lease cannot be obtained, the client will receive NULL.
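/// On success the client receives a single row with one text column,
/// `valid_until`, containing the lease expiry as milliseconds since the
/// UNIX epoch; on failure the column is NULL. Sketch of the result set
/// (made-up timestamp):
///
/// ```text
///  valid_until
/// ---------------
///  1735689600000
/// ```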
1579 : #[instrument(skip_all, fields(shard_id, %lsn))]
1580 : async fn handle_make_lsn_lease<IO>(
1581 : &mut self,
1582 : pgb: &mut PostgresBackend<IO>,
1583 : tenant_shard_id: TenantShardId,
1584 : timeline_id: TimelineId,
1585 : lsn: Lsn,
1586 : ctx: &RequestContext,
1587 : ) -> Result<(), QueryError>
1588 : where
1589 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1590 : {
1591 : let timeline = self
1592 : .timeline_handles
1593 : .as_mut()
1594 : .unwrap()
1595 : .get(
1596 : tenant_shard_id.tenant_id,
1597 : timeline_id,
1598 : ShardSelector::Known(tenant_shard_id.to_index()),
1599 : )
1600 : .await?;
1601 : set_tracing_field_shard_id(&timeline);
1602 :
1603 : let lease = timeline
1604 : .renew_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx)
1605 0 : .inspect_err(|e| {
1606 0 : warn!("{e}");
1607 0 : })
1608 : .ok();
1609 0 : let valid_until_str = lease.map(|l| {
1610 0 : l.valid_until
1611 0 : .duration_since(SystemTime::UNIX_EPOCH)
1612 0 : .expect("valid_until is earlier than UNIX_EPOCH")
1613 0 : .as_millis()
1614 0 : .to_string()
1615 0 : });
1616 0 : let bytes = valid_until_str.as_ref().map(|x| x.as_bytes());
1617 :
1618 : pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col(
1619 : b"valid_until",
1620 : )]))?
1621 : .write_message_noflush(&BeMessage::DataRow(&[bytes]))?;
1622 :
1623 : Ok(())
1624 : }
1625 :
1626 : #[instrument(skip_all, fields(shard_id))]
1627 : async fn handle_get_rel_exists_request(
1628 : &mut self,
1629 : timeline: &Timeline,
1630 : req: &PagestreamExistsRequest,
1631 : ctx: &RequestContext,
1632 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1633 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1634 : let lsn = Self::wait_or_get_last_lsn(
1635 : timeline,
1636 : req.hdr.request_lsn,
1637 : req.hdr.not_modified_since,
1638 : &latest_gc_cutoff_lsn,
1639 : ctx,
1640 : )
1641 : .await?;
1642 :
1643 : let exists = timeline
1644 : .get_rel_exists(req.rel, Version::Lsn(lsn), ctx)
1645 : .await?;
1646 :
1647 : Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
1648 : req: *req,
1649 : exists,
1650 : }))
1651 : }
1652 :
1653 : #[instrument(skip_all, fields(shard_id))]
1654 : async fn handle_get_nblocks_request(
1655 : &mut self,
1656 : timeline: &Timeline,
1657 : req: &PagestreamNblocksRequest,
1658 : ctx: &RequestContext,
1659 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1660 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1661 : let lsn = Self::wait_or_get_last_lsn(
1662 : timeline,
1663 : req.hdr.request_lsn,
1664 : req.hdr.not_modified_since,
1665 : &latest_gc_cutoff_lsn,
1666 : ctx,
1667 : )
1668 : .await?;
1669 :
1670 : let n_blocks = timeline
1671 : .get_rel_size(req.rel, Version::Lsn(lsn), ctx)
1672 : .await?;
1673 :
1674 : Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
1675 : req: *req,
1676 : n_blocks,
1677 : }))
1678 : }
1679 :
1680 : #[instrument(skip_all, fields(shard_id))]
1681 : async fn handle_db_size_request(
1682 : &mut self,
1683 : timeline: &Timeline,
1684 : req: &PagestreamDbSizeRequest,
1685 : ctx: &RequestContext,
1686 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1687 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1688 : let lsn = Self::wait_or_get_last_lsn(
1689 : timeline,
1690 : req.hdr.request_lsn,
1691 : req.hdr.not_modified_since,
1692 : &latest_gc_cutoff_lsn,
1693 : ctx,
1694 : )
1695 : .await?;
1696 :
1697 : let total_blocks = timeline
1698 : .get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, Version::Lsn(lsn), ctx)
1699 : .await?;
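// The line below converts the block count into bytes; e.g. with the default 8192-byte
// BLCKSZ, a hypothetical 1_280 blocks yields 1_280 * 8192 = 10_485_760 bytes (10 MiB).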
1700 : let db_size = total_blocks as i64 * BLCKSZ as i64;
1701 :
1702 : Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse {
1703 : req: *req,
1704 : db_size,
1705 : }))
1706 : }
1707 :
1708 : #[instrument(skip_all)]
1709 : async fn handle_get_page_at_lsn_request_batched(
1710 : &mut self,
1711 : timeline: &Timeline,
1712 : effective_lsn: Lsn,
1713 : requests: smallvec::SmallVec<[BatchedGetPageRequest; 1]>,
1714 : ctx: &RequestContext,
1715 : ) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>> {
1716 : debug_assert_current_span_has_tenant_and_timeline_id();
1717 :
1718 : timeline
1719 : .query_metrics
1720 : .observe_getpage_batch_start(requests.len());
1721 :
1722 : // If a page trace is running, submit an event for this request.
1723 : if let Some(page_trace) = timeline.page_trace.load().as_ref() {
1724 : let time = SystemTime::now();
1725 : for batch in &requests {
1726 : let key = rel_block_to_key(batch.req.rel, batch.req.blkno).to_compact();
1727 : // Ignore error (trace buffer may be full or tracer may have disconnected).
1728 : _ = page_trace.try_send(PageTraceEvent {
1729 : key,
1730 : effective_lsn,
1731 : time,
1732 : });
1733 : }
1734 : }
1735 :
1736 : let results = timeline
1737 : .get_rel_page_at_lsn_batched(
1738 0 : requests.iter().map(|p| (&p.req.rel, &p.req.blkno)),
1739 : effective_lsn,
1740 : ctx,
1741 : )
1742 : .await;
1743 : assert_eq!(results.len(), requests.len());
1744 :
1745 : // TODO: avoid creating the new Vec here
1746 : Vec::from_iter(
1747 : requests
1748 : .into_iter()
1749 : .zip(results.into_iter())
1750 0 : .map(|(req, res)| {
1751 0 : res.map(|page| {
1752 0 : (
1753 0 : PagestreamBeMessage::GetPage(models::PagestreamGetPageResponse {
1754 0 : req: req.req,
1755 0 : page,
1756 0 : }),
1757 0 : req.timer,
1758 0 : )
1759 0 : })
1760 0 : .map_err(|e| BatchedPageStreamError {
1761 0 : err: PageStreamError::from(e),
1762 0 : req: req.req.hdr,
1763 0 : })
1764 0 : }),
1765 : )
1766 : }
1767 :
1768 : #[instrument(skip_all, fields(shard_id))]
1769 : async fn handle_get_slru_segment_request(
1770 : &mut self,
1771 : timeline: &Timeline,
1772 : req: &PagestreamGetSlruSegmentRequest,
1773 : ctx: &RequestContext,
1774 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1775 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1776 : let lsn = Self::wait_or_get_last_lsn(
1777 : timeline,
1778 : req.hdr.request_lsn,
1779 : req.hdr.not_modified_since,
1780 : &latest_gc_cutoff_lsn,
1781 : ctx,
1782 : )
1783 : .await?;
1784 :
1785 : let kind = SlruKind::from_repr(req.kind)
1786 : .ok_or(PageStreamError::BadRequest("invalid SLRU kind".into()))?;
1787 : let segment = timeline.get_slru_segment(kind, req.segno, lsn, ctx).await?;
1788 :
1789 : Ok(PagestreamBeMessage::GetSlruSegment(
1790 : PagestreamGetSlruSegmentResponse { req: *req, segment },
1791 : ))
1792 : }
1793 :
1794 : /// Note on "fullbackup":
1795 : /// Full basebackups should only be used for debugging purposes.
1796 : /// Originally, it was introduced to enable breaking storage format changes,
1797 : /// but that is not applicable anymore.
1798 : ///
1799 : /// # Coding Discipline
1800 : ///
1801 : /// Coding discipline within this function: all interaction with the `pgb` connection
1802 : /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
1803 : /// This is so that we can shutdown page_service quickly.
1804 : ///
1805 : /// TODO: wrap the pgb that we pass to the basebackup handler so that it's sensitive
1806 : /// to connection cancellation.
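///
/// A minimal, self-contained sketch of the pattern this discipline calls for (a
/// hypothetical helper, not the actual `flush_cancellable` used below): every await on
/// the connection is raced against the cancellation token.
///
/// ```ignore
/// use tokio_util::sync::CancellationToken;
///
/// /// Run `fut`, but give up early if `cancel` fires, so a slow or stuck client
/// /// cannot delay page_service shutdown.
/// async fn with_cancellation<T>(
///     cancel: &CancellationToken,
///     fut: impl std::future::Future<Output = T>,
/// ) -> Option<T> {
///     tokio::select! {
///         _ = cancel.cancelled() => None,
///         res = fut => Some(res),
///     }
/// }
/// ```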
1807 : #[allow(clippy::too_many_arguments)]
1808 : #[instrument(skip_all, fields(shard_id, ?lsn, ?prev_lsn, %full_backup))]
1809 : async fn handle_basebackup_request<IO>(
1810 : &mut self,
1811 : pgb: &mut PostgresBackend<IO>,
1812 : tenant_id: TenantId,
1813 : timeline_id: TimelineId,
1814 : lsn: Option<Lsn>,
1815 : prev_lsn: Option<Lsn>,
1816 : full_backup: bool,
1817 : gzip: bool,
1818 : replica: bool,
1819 : ctx: &RequestContext,
1820 : ) -> Result<(), QueryError>
1821 : where
1822 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1823 : {
1824 0 : fn map_basebackup_error(err: BasebackupError) -> QueryError {
1825 0 : match err {
1826 0 : BasebackupError::Client(e) => QueryError::Disconnected(ConnectionError::Io(e)),
1827 0 : BasebackupError::Server(e) => QueryError::Other(e),
1828 : }
1829 0 : }
1830 :
1831 : let started = std::time::Instant::now();
1832 :
1833 : let timeline = self
1834 : .timeline_handles
1835 : .as_mut()
1836 : .unwrap()
1837 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1838 : .await?;
1839 :
1840 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1841 : if let Some(lsn) = lsn {
1842 : // Backup was requested at a particular LSN. Wait for it to arrive.
1843 : info!("waiting for {}", lsn);
1844 : timeline
1845 : .wait_lsn(
1846 : lsn,
1847 : crate::tenant::timeline::WaitLsnWaiter::PageService,
1848 : ctx,
1849 : )
1850 : .await?;
1851 : timeline
1852 : .check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
1853 : .context("invalid basebackup lsn")?;
1854 : }
1855 :
1856 : let lsn_awaited_after = started.elapsed();
1857 :
1858 : // switch client to COPYOUT
1859 : pgb.write_message_noflush(&BeMessage::CopyOutResponse)
1860 : .map_err(QueryError::Disconnected)?;
1861 : self.flush_cancellable(pgb, &self.cancel).await?;
1862 :
1863 : // Send a tarball of the latest layer on the timeline. Compress if not
1864 : // fullbackup. TODO Compress in that case too (tests need to be updated)
1865 : if full_backup {
1866 : let mut writer = pgb.copyout_writer();
1867 : basebackup::send_basebackup_tarball(
1868 : &mut writer,
1869 : &timeline,
1870 : lsn,
1871 : prev_lsn,
1872 : full_backup,
1873 : replica,
1874 : ctx,
1875 : )
1876 : .await
1877 : .map_err(map_basebackup_error)?;
1878 : } else {
1879 : let mut writer = BufWriter::new(pgb.copyout_writer());
1880 : if gzip {
1881 : let mut encoder = GzipEncoder::with_quality(
1882 : &mut writer,
1883 : // NOTE using fast compression because it's on the critical path
1884 : // for compute startup. For an empty database, we get
1885 : // <100KB with this method. The Level::Best compression method
1886 : // gives us <20KB, but maybe we should add basebackup caching
1887 : // on compute shutdown first.
1888 : async_compression::Level::Fastest,
1889 : );
1890 : basebackup::send_basebackup_tarball(
1891 : &mut encoder,
1892 : &timeline,
1893 : lsn,
1894 : prev_lsn,
1895 : full_backup,
1896 : replica,
1897 : ctx,
1898 : )
1899 : .await
1900 : .map_err(map_basebackup_error)?;
1901 : // shutdown the encoder to ensure the gzip footer is written
1902 : encoder
1903 : .shutdown()
1904 : .await
1905 0 : .map_err(|e| QueryError::Disconnected(ConnectionError::Io(e)))?;
1906 : } else {
1907 : basebackup::send_basebackup_tarball(
1908 : &mut writer,
1909 : &timeline,
1910 : lsn,
1911 : prev_lsn,
1912 : full_backup,
1913 : replica,
1914 : ctx,
1915 : )
1916 : .await
1917 : .map_err(map_basebackup_error)?;
1918 : }
1919 : writer
1920 : .flush()
1921 : .await
1922 0 : .map_err(|e| map_basebackup_error(BasebackupError::Client(e)))?;
1923 : }
1924 :
1925 : pgb.write_message_noflush(&BeMessage::CopyDone)
1926 : .map_err(QueryError::Disconnected)?;
1927 : self.flush_cancellable(pgb, &timeline.cancel).await?;
1928 :
1929 : let basebackup_after = started
1930 : .elapsed()
1931 : .checked_sub(lsn_awaited_after)
1932 : .unwrap_or(Duration::ZERO);
1933 :
1934 : info!(
1935 : lsn_await_millis = lsn_awaited_after.as_millis(),
1936 : basebackup_millis = basebackup_after.as_millis(),
1937 : "basebackup complete"
1938 : );
1939 :
1940 : Ok(())
1941 : }
1942 :
1943 : // When accessing the management API, supply None as an argument.
1944 : // When authorizing access to a specific tenant, pass the corresponding tenant id.
1945 0 : fn check_permission(&self, tenant_id: Option<TenantId>) -> Result<(), QueryError> {
1946 0 : if self.auth.is_none() {
1947 : // auth is set to Trust, nothing to check so just return ok
1948 0 : return Ok(());
1949 0 : }
1950 0 : // auth is Some (just checked above). When auth is Some, claims are always
1951 0 : // present because of the checks during connection init, so this expect
1952 0 : // won't trigger.
1953 0 : let claims = self
1954 0 : .claims
1955 0 : .as_ref()
1956 0 : .expect("claims presence already checked");
1957 0 : check_permission(claims, tenant_id).map_err(|e| QueryError::Unauthorized(e.0))
1958 0 : }
1959 : }
1960 :
1961 : /// `basebackup tenant timeline [lsn] [--gzip] [--replica]`
1962 : #[derive(Debug, Clone, Eq, PartialEq)]
1963 : struct BaseBackupCmd {
1964 : tenant_id: TenantId,
1965 : timeline_id: TimelineId,
1966 : lsn: Option<Lsn>,
1967 : gzip: bool,
1968 : replica: bool,
1969 : }
1970 :
1971 : /// `fullbackup tenant timeline [lsn] [prev_lsn]`
1972 : #[derive(Debug, Clone, Eq, PartialEq)]
1973 : struct FullBackupCmd {
1974 : tenant_id: TenantId,
1975 : timeline_id: TimelineId,
1976 : lsn: Option<Lsn>,
1977 : prev_lsn: Option<Lsn>,
1978 : }
1979 :
1980 : /// `pagestream_v2 tenant timeline`
1981 : #[derive(Debug, Clone, Eq, PartialEq)]
1982 : struct PageStreamCmd {
1983 : tenant_id: TenantId,
1984 : timeline_id: TimelineId,
1985 : protocol_version: PagestreamProtocolVersion,
1986 : }
1987 :
1988 : /// `lease lsn tenant timeline lsn`
1989 : #[derive(Debug, Clone, Eq, PartialEq)]
1990 : struct LeaseLsnCmd {
1991 : tenant_shard_id: TenantShardId,
1992 : timeline_id: TimelineId,
1993 : lsn: Lsn,
1994 : }
1995 :
1996 : #[derive(Debug, Clone, Eq, PartialEq)]
1997 : enum PageServiceCmd {
1998 : Set,
1999 : PageStream(PageStreamCmd),
2000 : BaseBackup(BaseBackupCmd),
2001 : FullBackup(FullBackupCmd),
2002 : LeaseLsn(LeaseLsnCmd),
2003 : }
2004 :
2005 : impl PageStreamCmd {
2006 6 : fn parse(query: &str, protocol_version: PagestreamProtocolVersion) -> anyhow::Result<Self> {
2007 6 : let parameters = query.split_whitespace().collect_vec();
2008 6 : if parameters.len() != 2 {
2009 2 : bail!(
2010 2 : "invalid number of parameters for pagestream command: {}",
2011 2 : query
2012 2 : );
2013 4 : }
2014 4 : let tenant_id = TenantId::from_str(parameters[0])
2015 4 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2016 2 : let timeline_id = TimelineId::from_str(parameters[1])
2017 2 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2018 2 : Ok(Self {
2019 2 : tenant_id,
2020 2 : timeline_id,
2021 2 : protocol_version,
2022 2 : })
2023 6 : }
2024 : }
2025 :
2026 : impl FullBackupCmd {
2027 4 : fn parse(query: &str) -> anyhow::Result<Self> {
2028 4 : let parameters = query.split_whitespace().collect_vec();
2029 4 : if parameters.len() < 2 || parameters.len() > 4 {
2030 0 : bail!(
2031 0 : "invalid number of parameters for basebackup command: {}",
2032 0 : query
2033 0 : );
2034 4 : }
2035 4 : let tenant_id = TenantId::from_str(parameters[0])
2036 4 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2037 4 : let timeline_id = TimelineId::from_str(parameters[1])
2038 4 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2039 : // The caller is responsible for providing correct lsn and prev_lsn.
2040 4 : let lsn = if let Some(lsn_str) = parameters.get(2) {
2041 : Some(
2042 2 : Lsn::from_str(lsn_str)
2043 2 : .with_context(|| format!("Failed to parse Lsn from {lsn_str}"))?,
2044 : )
2045 : } else {
2046 2 : None
2047 : };
2048 4 : let prev_lsn = if let Some(prev_lsn_str) = parameters.get(3) {
2049 : Some(
2050 2 : Lsn::from_str(prev_lsn_str)
2051 2 : .with_context(|| format!("Failed to parse Lsn from {prev_lsn_str}"))?,
2052 : )
2053 : } else {
2054 2 : None
2055 : };
2056 4 : Ok(Self {
2057 4 : tenant_id,
2058 4 : timeline_id,
2059 4 : lsn,
2060 4 : prev_lsn,
2061 4 : })
2062 4 : }
2063 : }
2064 :
2065 : impl BaseBackupCmd {
2066 18 : fn parse(query: &str) -> anyhow::Result<Self> {
2067 18 : let parameters = query.split_whitespace().collect_vec();
2068 18 : if parameters.len() < 2 {
2069 0 : bail!(
2070 0 : "invalid number of parameters for basebackup command: {}",
2071 0 : query
2072 0 : );
2073 18 : }
2074 18 : let tenant_id = TenantId::from_str(parameters[0])
2075 18 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2076 18 : let timeline_id = TimelineId::from_str(parameters[1])
2077 18 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2078 : let lsn;
2079 : let flags_parse_from;
2080 18 : if let Some(maybe_lsn) = parameters.get(2) {
2081 16 : if *maybe_lsn == "latest" {
2082 2 : lsn = None;
2083 2 : flags_parse_from = 3;
2084 14 : } else if maybe_lsn.starts_with("--") {
2085 10 : lsn = None;
2086 10 : flags_parse_from = 2;
2087 10 : } else {
2088 : lsn = Some(
2089 4 : Lsn::from_str(maybe_lsn)
2090 4 : .with_context(|| format!("Failed to parse lsn from {maybe_lsn}"))?,
2091 : );
2092 4 : flags_parse_from = 3;
2093 : }
2094 2 : } else {
2095 2 : lsn = None;
2096 2 : flags_parse_from = 2;
2097 2 : }
2098 :
2099 18 : let mut gzip = false;
2100 18 : let mut replica = false;
2101 :
2102 22 : for ¶m in ¶meters[flags_parse_from..] {
2103 22 : match param {
2104 22 : "--gzip" => {
2105 14 : if gzip {
2106 2 : bail!("duplicate parameter for basebackup command: {param}")
2107 12 : }
2108 12 : gzip = true
2109 : }
2110 8 : "--replica" => {
2111 4 : if replica {
2112 0 : bail!("duplicate parameter for basebackup command: {param}")
2113 4 : }
2114 4 : replica = true
2115 : }
2116 4 : _ => bail!("invalid parameter for basebackup command: {param}"),
2117 : }
2118 : }
2119 12 : Ok(Self {
2120 12 : tenant_id,
2121 12 : timeline_id,
2122 12 : lsn,
2123 12 : gzip,
2124 12 : replica,
2125 12 : })
2126 18 : }
2127 : }
2128 :
2129 : impl LeaseLsnCmd {
2130 4 : fn parse(query: &str) -> anyhow::Result<Self> {
2131 4 : let parameters = query.split_whitespace().collect_vec();
2132 4 : if parameters.len() != 3 {
2133 0 : bail!(
2134 0 : "invalid number of parameters for lease lsn command: {}",
2135 0 : query
2136 0 : );
2137 4 : }
2138 4 : let tenant_shard_id = TenantShardId::from_str(parameters[0])
2139 4 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2140 4 : let timeline_id = TimelineId::from_str(parameters[1])
2141 4 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2142 4 : let lsn = Lsn::from_str(parameters[2])
2143 4 : .with_context(|| format!("Failed to parse lsn from {}", parameters[2]))?;
2144 4 : Ok(Self {
2145 4 : tenant_shard_id,
2146 4 : timeline_id,
2147 4 : lsn,
2148 4 : })
2149 4 : }
2150 : }
2151 :
2152 : impl PageServiceCmd {
2153 42 : fn parse(query: &str) -> anyhow::Result<Self> {
2154 42 : let query = query.trim();
2155 42 : let Some((cmd, other)) = query.split_once(' ') else {
2156 4 : bail!("cannot parse query: {query}")
2157 : };
2158 38 : match cmd.to_ascii_lowercase().as_str() {
2159 38 : "pagestream_v2" => Ok(Self::PageStream(PageStreamCmd::parse(
2160 6 : other,
2161 6 : PagestreamProtocolVersion::V2,
2162 6 : )?)),
2163 32 : "pagestream_v3" => Ok(Self::PageStream(PageStreamCmd::parse(
2164 0 : other,
2165 0 : PagestreamProtocolVersion::V3,
2166 0 : )?)),
2167 32 : "basebackup" => Ok(Self::BaseBackup(BaseBackupCmd::parse(other)?)),
2168 14 : "fullbackup" => Ok(Self::FullBackup(FullBackupCmd::parse(other)?)),
2169 10 : "lease" => {
2170 6 : let Some((cmd2, other)) = other.split_once(' ') else {
2171 0 : bail!("invalid lease command: {cmd}");
2172 : };
2173 6 : let cmd2 = cmd2.to_ascii_lowercase();
2174 6 : if cmd2 == "lsn" {
2175 4 : Ok(Self::LeaseLsn(LeaseLsnCmd::parse(other)?))
2176 : } else {
2177 2 : bail!("invalid lease command: {cmd}");
2178 : }
2179 : }
2180 4 : "set" => Ok(Self::Set),
2181 0 : _ => Err(anyhow::anyhow!("unsupported command {cmd} in {query}")),
2182 : }
2183 42 : }
2184 : }
2185 :
2186 : impl<IO> postgres_backend::Handler<IO> for PageServerHandler
2187 : where
2188 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
2189 : {
2190 0 : fn check_auth_jwt(
2191 0 : &mut self,
2192 0 : _pgb: &mut PostgresBackend<IO>,
2193 0 : jwt_response: &[u8],
2194 0 : ) -> Result<(), QueryError> {
2195 : // this unwrap is never triggered, because check_auth_jwt is only called when auth_type is NeonJWT,
2196 : // which requires auth to be present
2197 0 : let data = self
2198 0 : .auth
2199 0 : .as_ref()
2200 0 : .unwrap()
2201 0 : .decode(str::from_utf8(jwt_response).context("jwt response is not UTF-8")?)
2202 0 : .map_err(|e| QueryError::Unauthorized(e.0))?;
2203 :
2204 0 : if matches!(data.claims.scope, Scope::Tenant) && data.claims.tenant_id.is_none() {
2205 0 : return Err(QueryError::Unauthorized(
2206 0 : "jwt token scope is Tenant, but tenant id is missing".into(),
2207 0 : ));
2208 0 : }
2209 0 :
2210 0 : debug!(
2211 0 : "jwt scope check succeeded for scope: {:#?} by tenant id: {:?}",
2212 : data.claims.scope, data.claims.tenant_id,
2213 : );
2214 :
2215 0 : self.claims = Some(data.claims);
2216 0 : Ok(())
2217 0 : }
2218 :
2219 0 : fn startup(
2220 0 : &mut self,
2221 0 : _pgb: &mut PostgresBackend<IO>,
2222 0 : _sm: &FeStartupPacket,
2223 0 : ) -> Result<(), QueryError> {
2224 0 : fail::fail_point!("ps::connection-start::startup-packet");
2225 0 : Ok(())
2226 0 : }
2227 :
2228 : #[instrument(skip_all, fields(tenant_id, timeline_id))]
2229 : async fn process_query(
2230 : &mut self,
2231 : pgb: &mut PostgresBackend<IO>,
2232 : query_string: &str,
2233 : ) -> Result<(), QueryError> {
2234 0 : fail::fail_point!("simulated-bad-compute-connection", |_| {
2235 0 : info!("Hit failpoint for bad connection");
2236 0 : Err(QueryError::SimulatedConnectionError)
2237 0 : });
2238 :
2239 : fail::fail_point!("ps::connection-start::process-query");
2240 :
2241 : let ctx = self.connection_ctx.attached_child();
2242 : debug!("process query {query_string}");
2243 : let query = PageServiceCmd::parse(query_string)?;
2244 : match query {
2245 : PageServiceCmd::PageStream(PageStreamCmd {
2246 : tenant_id,
2247 : timeline_id,
2248 : protocol_version,
2249 : }) => {
2250 : tracing::Span::current()
2251 : .record("tenant_id", field::display(tenant_id))
2252 : .record("timeline_id", field::display(timeline_id));
2253 :
2254 : self.check_permission(Some(tenant_id))?;
2255 : let command_kind = match protocol_version {
2256 : PagestreamProtocolVersion::V2 => ComputeCommandKind::PageStreamV2,
2257 : PagestreamProtocolVersion::V3 => ComputeCommandKind::PageStreamV3,
2258 : };
2259 : COMPUTE_COMMANDS_COUNTERS.for_command(command_kind).inc();
2260 :
2261 : self.handle_pagerequests(pgb, tenant_id, timeline_id, protocol_version, ctx)
2262 : .await?;
2263 : }
2264 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2265 : tenant_id,
2266 : timeline_id,
2267 : lsn,
2268 : gzip,
2269 : replica,
2270 : }) => {
2271 : tracing::Span::current()
2272 : .record("tenant_id", field::display(tenant_id))
2273 : .record("timeline_id", field::display(timeline_id));
2274 :
2275 : self.check_permission(Some(tenant_id))?;
2276 :
2277 : COMPUTE_COMMANDS_COUNTERS
2278 : .for_command(ComputeCommandKind::Basebackup)
2279 : .inc();
2280 : let metric_recording = metrics::BASEBACKUP_QUERY_TIME.start_recording();
2281 0 : let res = async {
2282 0 : self.handle_basebackup_request(
2283 0 : pgb,
2284 0 : tenant_id,
2285 0 : timeline_id,
2286 0 : lsn,
2287 0 : None,
2288 0 : false,
2289 0 : gzip,
2290 0 : replica,
2291 0 : &ctx,
2292 0 : )
2293 0 : .await?;
2294 0 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
2295 0 : Result::<(), QueryError>::Ok(())
2296 0 : }
2297 : .await;
2298 : metric_recording.observe(&res);
2299 : res?;
2300 : }
2301 : // same as basebackup, but result includes relational data as well
2302 : PageServiceCmd::FullBackup(FullBackupCmd {
2303 : tenant_id,
2304 : timeline_id,
2305 : lsn,
2306 : prev_lsn,
2307 : }) => {
2308 : tracing::Span::current()
2309 : .record("tenant_id", field::display(tenant_id))
2310 : .record("timeline_id", field::display(timeline_id));
2311 :
2312 : self.check_permission(Some(tenant_id))?;
2313 :
2314 : COMPUTE_COMMANDS_COUNTERS
2315 : .for_command(ComputeCommandKind::Fullbackup)
2316 : .inc();
2317 :
2318 : // Check that the timeline exists
2319 : self.handle_basebackup_request(
2320 : pgb,
2321 : tenant_id,
2322 : timeline_id,
2323 : lsn,
2324 : prev_lsn,
2325 : true,
2326 : false,
2327 : false,
2328 : &ctx,
2329 : )
2330 : .await?;
2331 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
2332 : }
2333 : PageServiceCmd::Set => {
2334 : // important because psycopg2 executes "SET datestyle TO 'ISO'"
2335 : // on connect
2336 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
2337 : }
2338 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
2339 : tenant_shard_id,
2340 : timeline_id,
2341 : lsn,
2342 : }) => {
2343 : tracing::Span::current()
2344 : .record("tenant_id", field::display(tenant_shard_id))
2345 : .record("timeline_id", field::display(timeline_id));
2346 :
2347 : self.check_permission(Some(tenant_shard_id.tenant_id))?;
2348 :
2349 : COMPUTE_COMMANDS_COUNTERS
2350 : .for_command(ComputeCommandKind::LeaseLsn)
2351 : .inc();
2352 :
2353 : match self
2354 : .handle_make_lsn_lease(pgb, tenant_shard_id, timeline_id, lsn, &ctx)
2355 : .await
2356 : {
2357 : Ok(()) => {
2358 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?
2359 : }
2360 : Err(e) => {
2361 : error!("error obtaining lsn lease for {lsn}: {e:?}");
2362 : pgb.write_message_noflush(&BeMessage::ErrorResponse(
2363 : &e.to_string(),
2364 : Some(e.pg_error_code()),
2365 : ))?
2366 : }
2367 : };
2368 : }
2369 : }
2370 :
2371 : Ok(())
2372 : }
2373 : }
2374 :
2375 : impl From<GetActiveTenantError> for QueryError {
2376 0 : fn from(e: GetActiveTenantError) -> Self {
2377 0 : match e {
2378 0 : GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected(
2379 0 : ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
2380 0 : ),
2381 : GetActiveTenantError::Cancelled
2382 : | GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => {
2383 0 : QueryError::Shutdown
2384 : }
2385 0 : e @ GetActiveTenantError::NotFound(_) => QueryError::NotFound(format!("{e}").into()),
2386 0 : e => QueryError::Other(anyhow::anyhow!(e)),
2387 : }
2388 0 : }
2389 : }
2390 :
2391 : #[derive(Debug, thiserror::Error)]
2392 : pub(crate) enum GetActiveTimelineError {
2393 : #[error(transparent)]
2394 : Tenant(GetActiveTenantError),
2395 : #[error(transparent)]
2396 : Timeline(#[from] GetTimelineError),
2397 : }
2398 :
2399 : impl From<GetActiveTimelineError> for QueryError {
2400 0 : fn from(e: GetActiveTimelineError) -> Self {
2401 0 : match e {
2402 0 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => QueryError::Shutdown,
2403 0 : GetActiveTimelineError::Tenant(e) => e.into(),
2404 0 : GetActiveTimelineError::Timeline(e) => QueryError::NotFound(format!("{e}").into()),
2405 : }
2406 0 : }
2407 : }
2408 :
2409 0 : fn set_tracing_field_shard_id(timeline: &Timeline) {
2410 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
2411 0 : tracing::Span::current().record(
2412 0 : "shard_id",
2413 0 : tracing::field::display(timeline.tenant_shard_id.shard_slug()),
2414 0 : );
2415 0 : debug_assert_current_span_has_tenant_and_timeline_id();
2416 0 : }
2417 :
2418 : struct WaitedForLsn(Lsn);
2419 : impl From<WaitedForLsn> for Lsn {
2420 0 : fn from(WaitedForLsn(lsn): WaitedForLsn) -> Self {
2421 0 : lsn
2422 0 : }
2423 : }
2424 :
2425 : #[cfg(test)]
2426 : mod tests {
2427 : use utils::shard::ShardCount;
2428 :
2429 : use super::*;
2430 :
2431 : #[test]
2432 2 : fn pageservice_cmd_parse() {
2433 2 : let tenant_id = TenantId::generate();
2434 2 : let timeline_id = TimelineId::generate();
2435 2 : let cmd =
2436 2 : PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id} {timeline_id}")).unwrap();
2437 2 : assert_eq!(
2438 2 : cmd,
2439 2 : PageServiceCmd::PageStream(PageStreamCmd {
2440 2 : tenant_id,
2441 2 : timeline_id,
2442 2 : protocol_version: PagestreamProtocolVersion::V2,
2443 2 : })
2444 2 : );
2445 2 : let cmd = PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id}")).unwrap();
2446 2 : assert_eq!(
2447 2 : cmd,
2448 2 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2449 2 : tenant_id,
2450 2 : timeline_id,
2451 2 : lsn: None,
2452 2 : gzip: false,
2453 2 : replica: false
2454 2 : })
2455 2 : );
2456 2 : let cmd =
2457 2 : PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} --gzip")).unwrap();
2458 2 : assert_eq!(
2459 2 : cmd,
2460 2 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2461 2 : tenant_id,
2462 2 : timeline_id,
2463 2 : lsn: None,
2464 2 : gzip: true,
2465 2 : replica: false
2466 2 : })
2467 2 : );
2468 2 : let cmd =
2469 2 : PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} latest")).unwrap();
2470 2 : assert_eq!(
2471 2 : cmd,
2472 2 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2473 2 : tenant_id,
2474 2 : timeline_id,
2475 2 : lsn: None,
2476 2 : gzip: false,
2477 2 : replica: false
2478 2 : })
2479 2 : );
2480 2 : let cmd = PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} 0/16ABCDE"))
2481 2 : .unwrap();
2482 2 : assert_eq!(
2483 2 : cmd,
2484 2 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2485 2 : tenant_id,
2486 2 : timeline_id,
2487 2 : lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
2488 2 : gzip: false,
2489 2 : replica: false
2490 2 : })
2491 2 : );
2492 2 : let cmd = PageServiceCmd::parse(&format!(
2493 2 : "basebackup {tenant_id} {timeline_id} --replica --gzip"
2494 2 : ))
2495 2 : .unwrap();
2496 2 : assert_eq!(
2497 2 : cmd,
2498 2 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2499 2 : tenant_id,
2500 2 : timeline_id,
2501 2 : lsn: None,
2502 2 : gzip: true,
2503 2 : replica: true
2504 2 : })
2505 2 : );
2506 2 : let cmd = PageServiceCmd::parse(&format!(
2507 2 : "basebackup {tenant_id} {timeline_id} 0/16ABCDE --replica --gzip"
2508 2 : ))
2509 2 : .unwrap();
2510 2 : assert_eq!(
2511 2 : cmd,
2512 2 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2513 2 : tenant_id,
2514 2 : timeline_id,
2515 2 : lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
2516 2 : gzip: true,
2517 2 : replica: true
2518 2 : })
2519 2 : );
2520 2 : let cmd = PageServiceCmd::parse(&format!("fullbackup {tenant_id} {timeline_id}")).unwrap();
2521 2 : assert_eq!(
2522 2 : cmd,
2523 2 : PageServiceCmd::FullBackup(FullBackupCmd {
2524 2 : tenant_id,
2525 2 : timeline_id,
2526 2 : lsn: None,
2527 2 : prev_lsn: None
2528 2 : })
2529 2 : );
2530 2 : let cmd = PageServiceCmd::parse(&format!(
2531 2 : "fullbackup {tenant_id} {timeline_id} 0/16ABCDE 0/16ABCDF"
2532 2 : ))
2533 2 : .unwrap();
2534 2 : assert_eq!(
2535 2 : cmd,
2536 2 : PageServiceCmd::FullBackup(FullBackupCmd {
2537 2 : tenant_id,
2538 2 : timeline_id,
2539 2 : lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
2540 2 : prev_lsn: Some(Lsn::from_str("0/16ABCDF").unwrap()),
2541 2 : })
2542 2 : );
2543 2 : let tenant_shard_id = TenantShardId::unsharded(tenant_id);
2544 2 : let cmd = PageServiceCmd::parse(&format!(
2545 2 : "lease lsn {tenant_shard_id} {timeline_id} 0/16ABCDE"
2546 2 : ))
2547 2 : .unwrap();
2548 2 : assert_eq!(
2549 2 : cmd,
2550 2 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
2551 2 : tenant_shard_id,
2552 2 : timeline_id,
2553 2 : lsn: Lsn::from_str("0/16ABCDE").unwrap(),
2554 2 : })
2555 2 : );
2556 2 : let tenant_shard_id = TenantShardId::split(&tenant_shard_id, ShardCount(8))[1];
2557 2 : let cmd = PageServiceCmd::parse(&format!(
2558 2 : "lease lsn {tenant_shard_id} {timeline_id} 0/16ABCDE"
2559 2 : ))
2560 2 : .unwrap();
2561 2 : assert_eq!(
2562 2 : cmd,
2563 2 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
2564 2 : tenant_shard_id,
2565 2 : timeline_id,
2566 2 : lsn: Lsn::from_str("0/16ABCDE").unwrap(),
2567 2 : })
2568 2 : );
2569 2 : let cmd = PageServiceCmd::parse("set a = b").unwrap();
2570 2 : assert_eq!(cmd, PageServiceCmd::Set);
2571 2 : let cmd = PageServiceCmd::parse("SET foo").unwrap();
2572 2 : assert_eq!(cmd, PageServiceCmd::Set);
2573 2 : }
2574 :
2575 : #[test]
2576 2 : fn pageservice_cmd_err_handling() {
2577 2 : let tenant_id = TenantId::generate();
2578 2 : let timeline_id = TimelineId::generate();
2579 2 : let cmd = PageServiceCmd::parse("unknown_command");
2580 2 : assert!(cmd.is_err());
2581 2 : let cmd = PageServiceCmd::parse("pagestream_v2");
2582 2 : assert!(cmd.is_err());
2583 2 : let cmd = PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id}xxx"));
2584 2 : assert!(cmd.is_err());
2585 2 : let cmd = PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id}xxx {timeline_id}xxx"));
2586 2 : assert!(cmd.is_err());
2587 2 : let cmd = PageServiceCmd::parse(&format!(
2588 2 : "basebackup {tenant_id} {timeline_id} --gzip --gzip"
2589 2 : ));
2590 2 : assert!(cmd.is_err());
2591 2 : let cmd = PageServiceCmd::parse(&format!(
2592 2 : "basebackup {tenant_id} {timeline_id} --gzip --unknown"
2593 2 : ));
2594 2 : assert!(cmd.is_err());
2595 2 : let cmd = PageServiceCmd::parse(&format!(
2596 2 : "basebackup {tenant_id} {timeline_id} --gzip 0/16ABCDE"
2597 2 : ));
2598 2 : assert!(cmd.is_err());
2599 2 : let cmd = PageServiceCmd::parse(&format!("lease {tenant_id} {timeline_id} gzip 0/16ABCDE"));
2600 2 : assert!(cmd.is_err());
2601 2 : }
2602 : }
|