Line data Source code
1 : //! The Page Service listens for client connections and serves their GetPage@LSN
2 : //! requests.
3 :
4 : use anyhow::{bail, Context};
5 : use async_compression::tokio::write::GzipEncoder;
6 : use bytes::Buf;
7 : use futures::FutureExt;
8 : use itertools::Itertools;
9 : use once_cell::sync::OnceCell;
10 : use pageserver_api::config::{
11 : PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
12 : PageServiceProtocolPipelinedExecutionStrategy,
13 : };
14 : use pageserver_api::models::{self, TenantState};
15 : use pageserver_api::models::{
16 : PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
17 : PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
18 : PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetSlruSegmentRequest,
19 : PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest, PagestreamNblocksResponse,
20 : PagestreamProtocolVersion, PagestreamRequest,
21 : };
22 : use pageserver_api::shard::TenantShardId;
23 : use postgres_backend::{
24 : is_expected_io_error, AuthType, PostgresBackend, PostgresBackendReader, QueryError,
25 : };
26 : use pq_proto::framed::ConnectionError;
27 : use pq_proto::FeStartupPacket;
28 : use pq_proto::{BeMessage, FeMessage, RowDescriptor};
29 : use std::borrow::Cow;
30 : use std::io;
31 : use std::num::NonZeroUsize;
32 : use std::str;
33 : use std::str::FromStr;
34 : use std::sync::Arc;
35 : use std::time::SystemTime;
36 : use std::time::{Duration, Instant};
37 : use tokio::io::{AsyncRead, AsyncWrite};
38 : use tokio::io::{AsyncWriteExt, BufWriter};
39 : use tokio::task::JoinHandle;
40 : use tokio_util::sync::CancellationToken;
41 : use tracing::*;
42 : use utils::sync::spsc_fold;
43 : use utils::{
44 : auth::{Claims, Scope, SwappableJwtAuth},
45 : id::{TenantId, TimelineId},
46 : lsn::Lsn,
47 : simple_rcu::RcuReadGuard,
48 : };
49 :
50 : use crate::auth::check_permission;
51 : use crate::basebackup::BasebackupError;
52 : use crate::config::PageServerConf;
53 : use crate::context::{DownloadBehavior, RequestContext};
54 : use crate::metrics::{self, SmgrOpTimer};
55 : use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS};
56 : use crate::pgdatadir_mapping::Version;
57 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
58 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
59 : use crate::task_mgr::TaskKind;
60 : use crate::task_mgr::{self, COMPUTE_REQUEST_RUNTIME};
61 : use crate::tenant::mgr::ShardSelector;
62 : use crate::tenant::mgr::TenantManager;
63 : use crate::tenant::mgr::{GetActiveTenantError, GetTenantError, ShardResolveResult};
64 : use crate::tenant::timeline::{self, WaitLsnError};
65 : use crate::tenant::GetTimelineError;
66 : use crate::tenant::PageReconstructError;
67 : use crate::tenant::Timeline;
68 : use crate::{basebackup, timed_after_cancellation};
69 : use pageserver_api::key::rel_block_to_key;
70 : use pageserver_api::reltag::SlruKind;
71 : use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
72 : use postgres_ffi::BLCKSZ;
73 :
74 : /// How long we may wait for a [`crate::tenant::mgr::TenantSlot::InProgress`] and/or a [`crate::tenant::Tenant`] which
75 : /// is not yet in state [`TenantState::Active`].
76 : ///
77 : /// NB: this is a different value than [`crate::http::routes::ACTIVE_TENANT_TIMEOUT`].
78 : const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
79 :
80 : ///////////////////////////////////////////////////////////////////////////////
81 :
82 : pub struct Listener {
83 : cancel: CancellationToken,
84 : /// Cancel the listener task through `cancel` to shut down the listener
85 : /// and get a handle on the existing connections.
86 : task: JoinHandle<Connections>,
87 : }
88 :
89 : pub struct Connections {
90 : cancel: CancellationToken,
91 : tasks: tokio::task::JoinSet<ConnectionHandlerResult>,
92 : }
93 :
94 0 : pub fn spawn(
95 0 : conf: &'static PageServerConf,
96 0 : tenant_manager: Arc<TenantManager>,
97 0 : pg_auth: Option<Arc<SwappableJwtAuth>>,
98 0 : tcp_listener: tokio::net::TcpListener,
99 0 : ) -> Listener {
100 0 : let cancel = CancellationToken::new();
101 0 : let libpq_ctx = RequestContext::todo_child(
102 0 : TaskKind::LibpqEndpointListener,
103 0 : // listener task shouldn't need to download anything. (We will
104 0 : // create a separate sub-context for each connection, with its
105 0 : // own download behavior. This context is used only to listen and
106 0 : // accept connections.)
107 0 : DownloadBehavior::Error,
108 0 : );
109 0 : let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
110 0 : "libpq listener",
111 0 : libpq_listener_main(
112 0 : tenant_manager,
113 0 : pg_auth,
114 0 : tcp_listener,
115 0 : conf.pg_auth_type,
116 0 : conf.page_service_pipelining.clone(),
117 0 : libpq_ctx,
118 0 : cancel.clone(),
119 0 : )
120 0 : .map(anyhow::Ok),
121 0 : ));
122 0 :
123 0 : Listener { cancel, task }
124 0 : }
125 :
126 : impl Listener {
127 0 : pub async fn stop_accepting(self) -> Connections {
128 0 : self.cancel.cancel();
129 0 : self.task
130 0 : .await
131 0 : .expect("unreachable: we wrap the listener task in task_mgr::exit_on_panic_or_error")
132 0 : }
133 : }
134 : impl Connections {
135 0 : pub(crate) async fn shutdown(self) {
136 0 : let Self { cancel, mut tasks } = self;
137 0 : cancel.cancel();
138 0 : while let Some(res) = tasks.join_next().await {
139 0 : Self::handle_connection_completion(res);
140 0 : }
141 0 : }
142 :
143 0 : fn handle_connection_completion(res: Result<anyhow::Result<()>, tokio::task::JoinError>) {
144 0 : match res {
145 0 : Ok(Ok(())) => {}
146 0 : Ok(Err(e)) => error!("error in page_service connection task: {:?}", e),
147 0 : Err(e) => error!("page_service connection task panicked: {:?}", e),
148 : }
149 0 : }
150 : }
151 :
152 : ///
153 : /// Main loop of the page service.
154 : ///
155 : /// Listens for connections, and launches a new handler task for each.
156 : ///
157 : /// Upon cancellation via `listener_cancel`, returns the set of
158 : /// open connections.
159 : ///
160 0 : pub async fn libpq_listener_main(
161 0 : tenant_manager: Arc<TenantManager>,
162 0 : auth: Option<Arc<SwappableJwtAuth>>,
163 0 : listener: tokio::net::TcpListener,
164 0 : auth_type: AuthType,
165 0 : pipelining_config: PageServicePipeliningConfig,
166 0 : listener_ctx: RequestContext,
167 0 : listener_cancel: CancellationToken,
168 0 : ) -> Connections {
169 0 : let connections_cancel = CancellationToken::new();
170 0 : let mut connection_handler_tasks = tokio::task::JoinSet::default();
171 :
172 : loop {
173 0 : let accepted = tokio::select! {
174 : biased;
175 0 : _ = listener_cancel.cancelled() => break,
176 0 : next = connection_handler_tasks.join_next(), if !connection_handler_tasks.is_empty() => {
177 0 : let res = next.expect("we dont poll while empty");
178 0 : Connections::handle_connection_completion(res);
179 0 : continue;
180 : }
181 0 : accepted = listener.accept() => accepted,
182 0 : };
183 0 :
184 0 : match accepted {
185 0 : Ok((socket, peer_addr)) => {
186 0 : // Connection established. Spawn a new task to handle it.
187 0 : debug!("accepted connection from {}", peer_addr);
188 0 : let local_auth = auth.clone();
189 0 : let connection_ctx = listener_ctx
190 0 : .detached_child(TaskKind::PageRequestHandler, DownloadBehavior::Download);
191 0 : connection_handler_tasks.spawn(page_service_conn_main(
192 0 : tenant_manager.clone(),
193 0 : local_auth,
194 0 : socket,
195 0 : auth_type,
196 0 : pipelining_config.clone(),
197 0 : connection_ctx,
198 0 : connections_cancel.child_token(),
199 0 : ));
200 : }
201 0 : Err(err) => {
202 0 : // accept() failed. Log the error, and loop back to retry on next connection.
203 0 : error!("accept() failed: {:?}", err);
204 : }
205 : }
206 : }
207 :
208 0 : debug!("page_service listener loop terminated");
209 :
210 0 : Connections {
211 0 : cancel: connections_cancel,
212 0 : tasks: connection_handler_tasks,
213 0 : }
214 0 : }
215 :
216 : type ConnectionHandlerResult = anyhow::Result<()>;
217 :
218 : #[instrument(skip_all, fields(peer_addr))]
219 : async fn page_service_conn_main(
220 : tenant_manager: Arc<TenantManager>,
221 : auth: Option<Arc<SwappableJwtAuth>>,
222 : socket: tokio::net::TcpStream,
223 : auth_type: AuthType,
224 : pipelining_config: PageServicePipeliningConfig,
225 : connection_ctx: RequestContext,
226 : cancel: CancellationToken,
227 : ) -> ConnectionHandlerResult {
228 : let _guard = LIVE_CONNECTIONS
229 : .with_label_values(&["page_service"])
230 : .guard();
231 :
232 : socket
233 : .set_nodelay(true)
234 : .context("could not set TCP_NODELAY")?;
235 :
236 : let peer_addr = socket.peer_addr().context("get peer address")?;
237 : tracing::Span::current().record("peer_addr", field::display(peer_addr));
238 :
239 : // Set up a read timeout of 10 minutes. The exact value is somewhat arbitrary; the requirements are:
240 : // - long enough for most valid compute connections
241 : // - less than infinite to stop us from "leaking" connections to long-gone computes
242 : //
243 : // no write timeout is used, because the kernel is assumed to error writes after some time.
244 : let mut socket = tokio_io_timeout::TimeoutReader::new(socket);
245 :
246 : let default_timeout_ms = 10 * 60 * 1000; // 10 minutes by default
247 0 : let socket_timeout_ms = (|| {
248 0 : fail::fail_point!("simulated-bad-compute-connection", |avg_timeout_ms| {
249 : // Exponential distribution for simulating poor network
250 : // conditions; in tests, avg_timeout_ms is expected to be
251 : // around 15.
252 0 : if let Some(avg_timeout_ms) = avg_timeout_ms {
253 0 : let avg = avg_timeout_ms.parse::<i64>().unwrap() as f32;
254 0 : let u = rand::random::<f32>();
255 0 : ((1.0 - u).ln() / (-avg)) as u64
256 : } else {
257 0 : default_timeout_ms
258 : }
259 0 : });
260 0 : default_timeout_ms
261 : })();
262 :
263 : // A timeout here does not mean the client died; it can happen if it's just idle for
264 : // a while: we will tear down this PageServerHandler and instantiate a new one if/when
265 : // they reconnect.
266 : socket.set_timeout(Some(std::time::Duration::from_millis(socket_timeout_ms)));
267 : let socket = Box::pin(socket);
268 :
269 : fail::fail_point!("ps::connection-start::pre-login");
270 :
271 : // XXX: pgbackend.run() should take the connection_ctx,
272 : // and create a child per-query context when it invokes process_query.
273 : // But it's in a shared crate, so we store connection_ctx inside PageServerHandler
274 : // and create the per-query context in process_query ourselves.
275 : let mut conn_handler = PageServerHandler::new(
276 : tenant_manager,
277 : auth,
278 : pipelining_config,
279 : connection_ctx,
280 : cancel.clone(),
281 : );
282 : let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
283 :
284 : match pgbackend.run(&mut conn_handler, &cancel).await {
285 : Ok(()) => {
286 : // we've been requested to shut down
287 : Ok(())
288 : }
289 : Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
290 : if is_expected_io_error(&io_error) {
291 : info!("Postgres client disconnected ({io_error})");
292 : Ok(())
293 : } else {
294 : let tenant_id = conn_handler.timeline_handles.as_ref().unwrap().tenant_id();
295 : Err(io_error).context(format!(
296 : "Postgres connection error for tenant_id={:?} client at peer_addr={}",
297 : tenant_id, peer_addr
298 : ))
299 : }
300 : }
301 : other => {
302 : let tenant_id = conn_handler.timeline_handles.as_ref().unwrap().tenant_id();
303 : other.context(format!(
304 : "Postgres query error for tenant_id={:?} client peer_addr={}",
305 : tenant_id, peer_addr
306 : ))
307 : }
308 : }
309 : }
310 :
311 : struct PageServerHandler {
312 : auth: Option<Arc<SwappableJwtAuth>>,
313 : claims: Option<Claims>,
314 :
315 : /// The context created for the lifetime of the connection
316 : /// serviced by this PageServerHandler.
317 : /// For each query received over the connection,
318 : /// `process_query` creates a child context from this one.
319 : connection_ctx: RequestContext,
320 :
321 : cancel: CancellationToken,
322 :
323 : /// None only while pagestream protocol is being processed.
324 : timeline_handles: Option<TimelineHandles>,
325 :
326 : pipelining_config: PageServicePipeliningConfig,
327 : }
328 :
329 : struct TimelineHandles {
330 : wrapper: TenantManagerWrapper,
331 : /// Note on size: the typical size of this map is 1. The largest size we expect
332 : /// to see is the number of shards divided by the number of pageservers (typically < 2),
333 : /// or the ratio used when splitting shards (i.e. how many children are created from
334 : /// one parent shard), where a "large" number might be ~8.
335 : handles: timeline::handle::Cache<TenantManagerTypes>,
336 : }
337 :
338 : impl TimelineHandles {
339 0 : fn new(tenant_manager: Arc<TenantManager>) -> Self {
340 0 : Self {
341 0 : wrapper: TenantManagerWrapper {
342 0 : tenant_manager,
343 0 : tenant_id: OnceCell::new(),
344 0 : },
345 0 : handles: Default::default(),
346 0 : }
347 0 : }
348 0 : async fn get(
349 0 : &mut self,
350 0 : tenant_id: TenantId,
351 0 : timeline_id: TimelineId,
352 0 : shard_selector: ShardSelector,
353 0 : ) -> Result<timeline::handle::Handle<TenantManagerTypes>, GetActiveTimelineError> {
354 0 : if *self.wrapper.tenant_id.get_or_init(|| tenant_id) != tenant_id {
355 0 : return Err(GetActiveTimelineError::Tenant(
356 0 : GetActiveTenantError::SwitchedTenant,
357 0 : ));
358 0 : }
359 0 : self.handles
360 0 : .get(timeline_id, shard_selector, &self.wrapper)
361 0 : .await
362 0 : .map_err(|e| match e {
363 0 : timeline::handle::GetError::TenantManager(e) => e,
364 : timeline::handle::GetError::TimelineGateClosed => {
365 0 : trace!("timeline gate closed");
366 0 : GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown)
367 : }
368 : timeline::handle::GetError::PerTimelineStateShutDown => {
369 0 : trace!("per-timeline state shut down");
370 0 : GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown)
371 : }
372 0 : })
373 0 : }
374 :
375 0 : fn tenant_id(&self) -> Option<TenantId> {
376 0 : self.wrapper.tenant_id.get().copied()
377 0 : }
378 : }
379 :
380 : pub(crate) struct TenantManagerWrapper {
381 : tenant_manager: Arc<TenantManager>,
382 : // We do not support switching tenant_id on a connection at this point.
383 : // We can add support for this later if needed without changing
384 : // the protocol.
385 : tenant_id: once_cell::sync::OnceCell<TenantId>,
386 : }
387 :
388 : #[derive(Debug)]
389 : pub(crate) struct TenantManagerTypes;
390 :
391 : impl timeline::handle::Types for TenantManagerTypes {
392 : type TenantManagerError = GetActiveTimelineError;
393 : type TenantManager = TenantManagerWrapper;
394 : type Timeline = Arc<Timeline>;
395 : }
396 :
397 : impl timeline::handle::ArcTimeline<TenantManagerTypes> for Arc<Timeline> {
398 0 : fn gate(&self) -> &utils::sync::gate::Gate {
399 0 : &self.gate
400 0 : }
401 :
402 0 : fn shard_timeline_id(&self) -> timeline::handle::ShardTimelineId {
403 0 : Timeline::shard_timeline_id(self)
404 0 : }
405 :
406 0 : fn per_timeline_state(&self) -> &timeline::handle::PerTimelineState<TenantManagerTypes> {
407 0 : &self.handles
408 0 : }
409 :
410 0 : fn get_shard_identity(&self) -> &pageserver_api::shard::ShardIdentity {
411 0 : Timeline::get_shard_identity(self)
412 0 : }
413 : }
414 :
415 : impl timeline::handle::TenantManager<TenantManagerTypes> for TenantManagerWrapper {
416 0 : async fn resolve(
417 0 : &self,
418 0 : timeline_id: TimelineId,
419 0 : shard_selector: ShardSelector,
420 0 : ) -> Result<Arc<Timeline>, GetActiveTimelineError> {
421 0 : let tenant_id = self.tenant_id.get().expect("we set this in get()");
422 0 : let timeout = ACTIVE_TENANT_TIMEOUT;
423 0 : let wait_start = Instant::now();
424 0 : let deadline = wait_start + timeout;
425 0 : let tenant_shard = loop {
426 0 : let resolved = self
427 0 : .tenant_manager
428 0 : .resolve_attached_shard(tenant_id, shard_selector);
429 0 : match resolved {
430 0 : ShardResolveResult::Found(tenant_shard) => break tenant_shard,
431 : ShardResolveResult::NotFound => {
432 0 : return Err(GetActiveTimelineError::Tenant(
433 0 : GetActiveTenantError::NotFound(GetTenantError::NotFound(*tenant_id)),
434 0 : ));
435 : }
436 0 : ShardResolveResult::InProgress(barrier) => {
437 0 : // We can't authoritatively answer right now: wait for InProgress state
438 0 : // to end, then try again
439 0 : tokio::select! {
440 0 : _ = barrier.wait() => {
441 0 : // The barrier completed: proceed around the loop to try looking up again
442 0 : },
443 0 : _ = tokio::time::sleep(deadline.duration_since(Instant::now())) => {
444 0 : return Err(GetActiveTimelineError::Tenant(GetActiveTenantError::WaitForActiveTimeout {
445 0 : latest_state: None,
446 0 : wait_time: timeout,
447 0 : }));
448 : }
449 : }
450 : }
451 : };
452 : };
453 :
454 0 : tracing::debug!("Waiting for tenant to enter active state...");
455 0 : tenant_shard
456 0 : .wait_to_become_active(deadline.duration_since(Instant::now()))
457 0 : .await
458 0 : .map_err(GetActiveTimelineError::Tenant)?;
459 :
460 0 : let timeline = tenant_shard
461 0 : .get_timeline(timeline_id, true)
462 0 : .map_err(GetActiveTimelineError::Timeline)?;
463 0 : set_tracing_field_shard_id(&timeline);
464 0 : Ok(timeline)
465 0 : }
466 : }
467 :
468 : #[derive(thiserror::Error, Debug)]
469 : enum PageStreamError {
470 : /// We encountered an error that should prompt the client to reconnect:
471 : /// in practice this means we drop the connection without sending a response.
472 : #[error("Reconnect required: {0}")]
473 : Reconnect(Cow<'static, str>),
474 :
475 : /// We were instructed to shutdown while processing the query
476 : #[error("Shutting down")]
477 : Shutdown,
478 :
479 : /// Something went wrong reading a page: this likely indicates a pageserver bug
480 : #[error("Read error")]
481 : Read(#[source] PageReconstructError),
482 :
483 : /// Ran out of time waiting for an LSN
484 : #[error("LSN timeout: {0}")]
485 : LsnTimeout(WaitLsnError),
486 :
487 : /// The entity required to serve the request (tenant or timeline) is not found,
488 : /// or is not found in a suitable state to serve a request.
489 : #[error("Not found: {0}")]
490 : NotFound(Cow<'static, str>),
491 :
492 : /// Request asked for something that doesn't make sense, like an invalid LSN
493 : #[error("Bad request: {0}")]
494 : BadRequest(Cow<'static, str>),
495 : }
496 :
497 : impl From<PageReconstructError> for PageStreamError {
498 0 : fn from(value: PageReconstructError) -> Self {
499 0 : match value {
500 0 : PageReconstructError::Cancelled => Self::Shutdown,
501 0 : e => Self::Read(e),
502 : }
503 0 : }
504 : }
505 :
506 : impl From<GetActiveTimelineError> for PageStreamError {
507 0 : fn from(value: GetActiveTimelineError) -> Self {
508 0 : match value {
509 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled)
510 : | GetActiveTimelineError::Tenant(GetActiveTenantError::WillNotBecomeActive(
511 : TenantState::Stopping { .. },
512 : ))
513 0 : | GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown) => Self::Shutdown,
514 0 : GetActiveTimelineError::Tenant(e) => Self::NotFound(format!("{e}").into()),
515 0 : GetActiveTimelineError::Timeline(e) => Self::NotFound(format!("{e}").into()),
516 : }
517 0 : }
518 : }
519 :
520 : impl From<WaitLsnError> for PageStreamError {
521 0 : fn from(value: WaitLsnError) -> Self {
522 0 : match value {
523 0 : e @ WaitLsnError::Timeout(_) => Self::LsnTimeout(e),
524 0 : WaitLsnError::Shutdown => Self::Shutdown,
525 0 : e @ WaitLsnError::BadState { .. } => Self::Reconnect(format!("{e}").into()),
526 : }
527 0 : }
528 : }
529 :
530 : impl From<WaitLsnError> for QueryError {
531 0 : fn from(value: WaitLsnError) -> Self {
532 0 : match value {
533 0 : e @ WaitLsnError::Timeout(_) => Self::Other(anyhow::Error::new(e)),
534 0 : WaitLsnError::Shutdown => Self::Shutdown,
535 0 : WaitLsnError::BadState { .. } => Self::Reconnect,
536 : }
537 0 : }
538 : }
539 :
540 : #[derive(thiserror::Error, Debug)]
541 : struct BatchedPageStreamError {
542 : req: PagestreamRequest,
543 : err: PageStreamError,
544 : }
545 :
546 : impl std::fmt::Display for BatchedPageStreamError {
547 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
548 0 : self.err.fmt(f)
549 0 : }
550 : }
551 :
552 : struct BatchedGetPageRequest {
553 : req: PagestreamGetPageRequest,
554 : timer: SmgrOpTimer,
555 : }
556 :
557 : enum BatchedFeMessage {
558 : Exists {
559 : span: Span,
560 : timer: SmgrOpTimer,
561 : shard: timeline::handle::Handle<TenantManagerTypes>,
562 : req: models::PagestreamExistsRequest,
563 : },
564 : Nblocks {
565 : span: Span,
566 : timer: SmgrOpTimer,
567 : shard: timeline::handle::Handle<TenantManagerTypes>,
568 : req: models::PagestreamNblocksRequest,
569 : },
570 : GetPage {
571 : span: Span,
572 : shard: timeline::handle::Handle<TenantManagerTypes>,
573 : effective_request_lsn: Lsn,
574 : pages: smallvec::SmallVec<[BatchedGetPageRequest; 1]>,
575 : },
576 : DbSize {
577 : span: Span,
578 : timer: SmgrOpTimer,
579 : shard: timeline::handle::Handle<TenantManagerTypes>,
580 : req: models::PagestreamDbSizeRequest,
581 : },
582 : GetSlruSegment {
583 : span: Span,
584 : timer: SmgrOpTimer,
585 : shard: timeline::handle::Handle<TenantManagerTypes>,
586 : req: models::PagestreamGetSlruSegmentRequest,
587 : },
588 : RespondError {
589 : span: Span,
590 : error: BatchedPageStreamError,
591 : },
592 : }
593 :
594 : impl BatchedFeMessage {
595 0 : async fn throttle_and_record_start_processing(
596 0 : &mut self,
597 0 : cancel: &CancellationToken,
598 0 : ) -> Result<(), QueryError> {
599 0 : let (shard, tokens, timers) = match self {
600 0 : BatchedFeMessage::Exists { shard, timer, .. }
601 0 : | BatchedFeMessage::Nblocks { shard, timer, .. }
602 0 : | BatchedFeMessage::DbSize { shard, timer, .. }
603 0 : | BatchedFeMessage::GetSlruSegment { shard, timer, .. } => {
604 0 : (
605 0 : shard,
606 0 : // 1 token is probably under-estimating because these
607 0 : // request handlers typically do several Timeline::get calls.
608 0 : 1,
609 0 : itertools::Either::Left(std::iter::once(timer)),
610 0 : )
611 : }
612 0 : BatchedFeMessage::GetPage { shard, pages, .. } => (
613 0 : shard,
614 0 : pages.len(),
615 0 : itertools::Either::Right(pages.iter_mut().map(|p| &mut p.timer)),
616 0 : ),
617 0 : BatchedFeMessage::RespondError { .. } => return Ok(()),
618 : };
619 0 : let throttled = tokio::select! {
620 0 : throttled = shard.pagestream_throttle.throttle(tokens) => { throttled }
621 0 : _ = shard.cancel.cancelled() => {
622 0 : return Err(QueryError::Shutdown);
623 : }
624 0 : _ = cancel.cancelled() => {
625 0 : return Err(QueryError::Shutdown);
626 : }
627 : };
628 0 : for timer in timers {
629 0 : timer.observe_throttle_done_execution_starting(&throttled);
630 0 : }
631 0 : Ok(())
632 0 : }
633 : }
634 :
635 : impl PageServerHandler {
636 0 : pub fn new(
637 0 : tenant_manager: Arc<TenantManager>,
638 0 : auth: Option<Arc<SwappableJwtAuth>>,
639 0 : pipelining_config: PageServicePipeliningConfig,
640 0 : connection_ctx: RequestContext,
641 0 : cancel: CancellationToken,
642 0 : ) -> Self {
643 0 : PageServerHandler {
644 0 : auth,
645 0 : claims: None,
646 0 : connection_ctx,
647 0 : timeline_handles: Some(TimelineHandles::new(tenant_manager)),
648 0 : cancel,
649 0 : pipelining_config,
650 0 : }
651 0 : }
652 :
653 : /// This function always respects cancellation of any timeline in `[Self::shard_timelines]`. Pass in
654 : /// a cancellation token at the next scope up (such as a tenant cancellation token) to ensure we respect
655 : /// cancellation if there aren't any timelines in the cache.
656 : ///
657 : /// If calling from a function that doesn't use the `[Self::shard_timelines]` cache, then pass in the
658 : /// timeline cancellation token.
659 0 : async fn flush_cancellable<IO>(
660 0 : &self,
661 0 : pgb: &mut PostgresBackend<IO>,
662 0 : cancel: &CancellationToken,
663 0 : ) -> Result<(), QueryError>
664 0 : where
665 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
666 0 : {
667 0 : tokio::select!(
668 0 : flush_r = pgb.flush() => {
669 0 : Ok(flush_r?)
670 : },
671 0 : _ = cancel.cancelled() => {
672 0 : Err(QueryError::Shutdown)
673 : }
674 : )
675 0 : }
676 :
677 : #[allow(clippy::too_many_arguments)]
678 0 : async fn pagestream_read_message<IO>(
679 0 : pgb: &mut PostgresBackendReader<IO>,
680 0 : tenant_id: TenantId,
681 0 : timeline_id: TimelineId,
682 0 : timeline_handles: &mut TimelineHandles,
683 0 : cancel: &CancellationToken,
684 0 : ctx: &RequestContext,
685 0 : protocol_version: PagestreamProtocolVersion,
686 0 : parent_span: Span,
687 0 : ) -> Result<Option<BatchedFeMessage>, QueryError>
688 0 : where
689 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
690 0 : {
691 0 : let msg = tokio::select! {
692 : biased;
693 0 : _ = cancel.cancelled() => {
694 0 : return Err(QueryError::Shutdown)
695 : }
696 0 : msg = pgb.read_message() => { msg }
697 0 : };
698 0 :
699 0 : let received_at = Instant::now();
700 :
701 0 : let copy_data_bytes = match msg? {
702 0 : Some(FeMessage::CopyData(bytes)) => bytes,
703 : Some(FeMessage::Terminate) => {
704 0 : return Ok(None);
705 : }
706 0 : Some(m) => {
707 0 : return Err(QueryError::Other(anyhow::anyhow!(
708 0 : "unexpected message: {m:?} during COPY"
709 0 : )));
710 : }
711 : None => {
712 0 : return Ok(None);
713 : } // client disconnected
714 : };
715 0 : trace!("query: {copy_data_bytes:?}");
716 :
717 0 : fail::fail_point!("ps::handle-pagerequest-message");
718 :
719 : // parse request
720 0 : let neon_fe_msg =
721 0 : PagestreamFeMessage::parse(&mut copy_data_bytes.reader(), protocol_version)?;
722 :
723 0 : let batched_msg = match neon_fe_msg {
724 0 : PagestreamFeMessage::Exists(req) => {
725 0 : let span = tracing::info_span!(parent: parent_span, "handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn);
726 0 : let shard = timeline_handles
727 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
728 0 : .instrument(span.clone()) // sets `shard_id` field
729 0 : .await?;
730 0 : let timer = shard
731 0 : .query_metrics
732 0 : .start_smgr_op(metrics::SmgrQueryType::GetRelExists, received_at);
733 0 : BatchedFeMessage::Exists {
734 0 : span,
735 0 : timer,
736 0 : shard,
737 0 : req,
738 0 : }
739 : }
740 0 : PagestreamFeMessage::Nblocks(req) => {
741 0 : let span = tracing::info_span!(parent: parent_span, "handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn);
742 0 : let shard = timeline_handles
743 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
744 0 : .instrument(span.clone()) // sets `shard_id` field
745 0 : .await?;
746 0 : let timer = shard
747 0 : .query_metrics
748 0 : .start_smgr_op(metrics::SmgrQueryType::GetRelSize, received_at);
749 0 : BatchedFeMessage::Nblocks {
750 0 : span,
751 0 : timer,
752 0 : shard,
753 0 : req,
754 0 : }
755 : }
756 0 : PagestreamFeMessage::DbSize(req) => {
757 0 : let span = tracing::info_span!(parent: parent_span, "handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.hdr.request_lsn);
758 0 : let shard = timeline_handles
759 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
760 0 : .instrument(span.clone()) // sets `shard_id` field
761 0 : .await?;
762 0 : let timer = shard
763 0 : .query_metrics
764 0 : .start_smgr_op(metrics::SmgrQueryType::GetDbSize, received_at);
765 0 : BatchedFeMessage::DbSize {
766 0 : span,
767 0 : timer,
768 0 : shard,
769 0 : req,
770 0 : }
771 : }
772 0 : PagestreamFeMessage::GetSlruSegment(req) => {
773 0 : let span = tracing::info_span!(parent: parent_span, "handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.hdr.request_lsn);
774 0 : let shard = timeline_handles
775 0 : .get(tenant_id, timeline_id, ShardSelector::Zero)
776 0 : .instrument(span.clone()) // sets `shard_id` field
777 0 : .await?;
778 0 : let timer = shard
779 0 : .query_metrics
780 0 : .start_smgr_op(metrics::SmgrQueryType::GetSlruSegment, received_at);
781 0 : BatchedFeMessage::GetSlruSegment {
782 0 : span,
783 0 : timer,
784 0 : shard,
785 0 : req,
786 0 : }
787 : }
788 0 : PagestreamFeMessage::GetPage(req) => {
789 0 : let span = tracing::info_span!(parent: parent_span, "handle_get_page_at_lsn_request_batched", req_lsn = %req.hdr.request_lsn);
790 :
791 : macro_rules! respond_error {
792 : ($error:expr) => {{
793 : let error = BatchedFeMessage::RespondError {
794 : span,
795 : error: BatchedPageStreamError {
796 : req: req.hdr,
797 : err: $error,
798 : },
799 : };
800 : Ok(Some(error))
801 : }};
802 : }
803 :
804 0 : let key = rel_block_to_key(req.rel, req.blkno);
805 0 : let shard = match timeline_handles
806 0 : .get(tenant_id, timeline_id, ShardSelector::Page(key))
807 0 : .instrument(span.clone()) // sets `shard_id` field
808 0 : .await
809 : {
810 0 : Ok(tl) => tl,
811 : Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
812 : // We already know this tenant exists in general, because we resolved it at
813 : // start of connection. Getting a NotFound here indicates that the shard containing
814 : // the requested page is not present on this node: the client's knowledge of shard->pageserver
815 : // mapping is out of date.
816 : //
817 : // Closing the connection by returning `PageStreamError::Reconnect` has the side effect of rate-limiting the above message, via the
818 : // client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
819 : // and talk to a different pageserver.
820 0 : return respond_error!(PageStreamError::Reconnect(
821 0 : "getpage@lsn request routed to wrong shard".into()
822 0 : ));
823 : }
824 0 : Err(e) => {
825 0 : return respond_error!(e.into());
826 : }
827 : };
828 :
829 : // It's important to start the timer before waiting for the LSN
830 : // so that the _started counters are incremented before we do
831 : // any serious waiting, e.g., for LSNs.
832 0 : let timer = shard
833 0 : .query_metrics
834 0 : .start_smgr_op(metrics::SmgrQueryType::GetPageAtLsn, received_at);
835 :
836 0 : let effective_request_lsn = match Self::wait_or_get_last_lsn(
837 0 : &shard,
838 0 : req.hdr.request_lsn,
839 0 : req.hdr.not_modified_since,
840 0 : &shard.get_latest_gc_cutoff_lsn(),
841 0 : ctx,
842 0 : )
843 0 : // TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait
844 0 : .await
845 : {
846 0 : Ok(lsn) => lsn,
847 0 : Err(e) => {
848 0 : return respond_error!(e);
849 : }
850 : };
851 : BatchedFeMessage::GetPage {
852 0 : span,
853 0 : shard,
854 0 : effective_request_lsn,
855 0 : pages: smallvec::smallvec![BatchedGetPageRequest { req, timer }],
856 : }
857 : }
858 : };
859 0 : Ok(Some(batched_msg))
860 0 : }
861 :
862 : /// Post-condition: `batch` is Some()
863 : #[instrument(skip_all, level = tracing::Level::TRACE)]
864 : #[allow(clippy::boxed_local)]
865 : fn pagestream_do_batch(
866 : max_batch_size: NonZeroUsize,
867 : batch: &mut Result<BatchedFeMessage, QueryError>,
868 : this_msg: Result<BatchedFeMessage, QueryError>,
869 : ) -> Result<(), Result<BatchedFeMessage, QueryError>> {
870 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
871 :
872 : let this_msg = match this_msg {
873 : Ok(this_msg) => this_msg,
874 : Err(e) => return Err(Err(e)),
875 : };
876 :
877 : match (&mut *batch, this_msg) {
878 : // something batched already, let's see if we can add this message to the batch
879 : (
880 : Ok(BatchedFeMessage::GetPage {
881 : span: _,
882 : shard: accum_shard,
883 : pages: ref mut accum_pages,
884 : effective_request_lsn: accum_lsn,
885 : }),
886 : BatchedFeMessage::GetPage {
887 : span: _,
888 : shard: this_shard,
889 : pages: this_pages,
890 : effective_request_lsn: this_lsn,
891 : },
892 0 : ) if (|| {
893 0 : assert_eq!(this_pages.len(), 1);
894 0 : if accum_pages.len() >= max_batch_size.get() {
895 0 : trace!(%accum_lsn, %this_lsn, %max_batch_size, "stopping batching because of batch size");
896 0 : assert_eq!(accum_pages.len(), max_batch_size.get());
897 0 : return false;
898 0 : }
899 0 : if (accum_shard.tenant_shard_id, accum_shard.timeline_id)
900 0 : != (this_shard.tenant_shard_id, this_shard.timeline_id)
901 : {
902 0 : trace!(%accum_lsn, %this_lsn, "stopping batching because timeline object mismatch");
903 : // TODO: we _could_ batch & execute each shard separately (and in parallel).
904 : // But the current logic for keeping responses in order does not support that.
905 0 : return false;
906 0 : }
907 0 : // the vectored get currently only supports a single LSN, so stop batching as soon
908 0 : // as the effective request_lsn changes
909 0 : if *accum_lsn != this_lsn {
910 0 : trace!(%accum_lsn, %this_lsn, "stopping batching because LSN changed");
911 0 : return false;
912 0 : }
913 0 : true
914 : })() =>
915 : {
916 : // ok to batch
917 : accum_pages.extend(this_pages);
918 : Ok(())
919 : }
920 : // something batched already but this message is unbatchable
921 : (_, this_msg) => {
922 : // by default, don't continue batching
923 : Err(Ok(this_msg))
924 : }
925 : }
926 : }
927 :
928 0 : #[instrument(level = tracing::Level::DEBUG, skip_all)]
929 : async fn pagesteam_handle_batched_message<IO>(
930 : &mut self,
931 : pgb_writer: &mut PostgresBackend<IO>,
932 : batch: BatchedFeMessage,
933 : cancel: &CancellationToken,
934 : protocol_version: PagestreamProtocolVersion,
935 : ctx: &RequestContext,
936 : ) -> Result<(), QueryError>
937 : where
938 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
939 : {
940 : // invoke handler function
941 : let (handler_results, span): (
942 : Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>>,
943 : _,
944 : ) = match batch {
945 : BatchedFeMessage::Exists {
946 : span,
947 : timer,
948 : shard,
949 : req,
950 : } => {
951 : fail::fail_point!("ps::handle-pagerequest-message::exists");
952 : (
953 : vec![self
954 : .handle_get_rel_exists_request(&shard, &req, ctx)
955 : .instrument(span.clone())
956 : .await
957 0 : .map(|msg| (msg, timer))
958 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr })],
959 : span,
960 : )
961 : }
962 : BatchedFeMessage::Nblocks {
963 : span,
964 : timer,
965 : shard,
966 : req,
967 : } => {
968 : fail::fail_point!("ps::handle-pagerequest-message::nblocks");
969 : (
970 : vec![self
971 : .handle_get_nblocks_request(&shard, &req, ctx)
972 : .instrument(span.clone())
973 : .await
974 0 : .map(|msg| (msg, timer))
975 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr })],
976 : span,
977 : )
978 : }
979 : BatchedFeMessage::GetPage {
980 : span,
981 : shard,
982 : effective_request_lsn,
983 : pages,
984 : } => {
985 : fail::fail_point!("ps::handle-pagerequest-message::getpage");
986 : (
987 : {
988 : let npages = pages.len();
989 : trace!(npages, "handling getpage request");
990 : let res = self
991 : .handle_get_page_at_lsn_request_batched(
992 : &shard,
993 : effective_request_lsn,
994 : pages,
995 : ctx,
996 : )
997 : .instrument(span.clone())
998 : .await;
999 : assert_eq!(res.len(), npages);
1000 : res
1001 : },
1002 : span,
1003 : )
1004 : }
1005 : BatchedFeMessage::DbSize {
1006 : span,
1007 : timer,
1008 : shard,
1009 : req,
1010 : } => {
1011 : fail::fail_point!("ps::handle-pagerequest-message::dbsize");
1012 : (
1013 : vec![self
1014 : .handle_db_size_request(&shard, &req, ctx)
1015 : .instrument(span.clone())
1016 : .await
1017 0 : .map(|msg| (msg, timer))
1018 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr })],
1019 : span,
1020 : )
1021 : }
1022 : BatchedFeMessage::GetSlruSegment {
1023 : span,
1024 : timer,
1025 : shard,
1026 : req,
1027 : } => {
1028 : fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
1029 : (
1030 : vec![self
1031 : .handle_get_slru_segment_request(&shard, &req, ctx)
1032 : .instrument(span.clone())
1033 : .await
1034 0 : .map(|msg| (msg, timer))
1035 0 : .map_err(|err| BatchedPageStreamError { err, req: req.hdr })],
1036 : span,
1037 : )
1038 : }
1039 : BatchedFeMessage::RespondError { span, error } => {
1040 : // We've already decided to respond with an error, so we don't need to
1041 : // call the handler.
1042 : (vec![Err(error)], span)
1043 : }
1044 : };
1045 :
1046 : // Map handler result to protocol behavior.
1047 : // Some handler errors cause exit from pagestream protocol.
1048 : // Other handler errors are sent back as an error message and we stay in pagestream protocol.
1049 : for handler_result in handler_results {
1050 : let (response_msg, timer) = match handler_result {
1051 : Err(e) => match &e.err {
1052 : PageStreamError::Shutdown => {
1053 : // If we fail to fulfil a request during shutdown, which may be _because_ of
1054 : // shutdown, then do not send the error to the client. Instead just drop the
1055 : // connection.
1056 0 : span.in_scope(|| info!("dropping connection due to shutdown"));
1057 : return Err(QueryError::Shutdown);
1058 : }
1059 : PageStreamError::Reconnect(reason) => {
1060 0 : span.in_scope(|| info!("handler requested reconnect: {reason}"));
1061 : return Err(QueryError::Reconnect);
1062 : }
1063 : PageStreamError::Read(_)
1064 : | PageStreamError::LsnTimeout(_)
1065 : | PageStreamError::NotFound(_)
1066 : | PageStreamError::BadRequest(_) => {
1067 : // print all the details to the log with {:#}, but for the client the
1068 : // error message is enough. Do not log if shutting down, as the anyhow::Error
1069 : // here includes cancellation which is not an error.
1070 : let full = utils::error::report_compact_sources(&e.err);
1071 0 : span.in_scope(|| {
1072 0 : error!("error reading relation or page version: {full:#}")
1073 0 : });
1074 : (
1075 : PagestreamBeMessage::Error(PagestreamErrorResponse {
1076 : req: e.req,
1077 : message: e.err.to_string(),
1078 : }),
1079 : None, // TODO: measure errors
1080 : )
1081 : }
1082 : },
1083 : Ok((response_msg, timer)) => (response_msg, Some(timer)),
1084 : };
1085 :
1086 : //
1087 : // marshal & transmit response message
1088 : //
1089 :
1090 : pgb_writer.write_message_noflush(&BeMessage::CopyData(
1091 : &response_msg.serialize(protocol_version),
1092 : ))?;
1093 :
1094 : // We purposefully don't count flush time into the timer.
1095 : //
1096 : // The reason is that the current compute client will not perform protocol processing
1097 : // if the postgres backend process is doing things other than `->smgr_read()`.
1098 : // This is especially the case for prefetch.
1099 : //
1100 : // If the compute doesn't read from the connection, eventually TCP will backpressure
1101 : // all the way into our flush call below.
1102 : //
1103 : // The timer's underlying metric is used for a storage-internal latency SLO and
1104 : // we don't want to include latency in it that we can't control.
1105 : // And as pointed out above, in this case, we don't control the time that flush will take.
1106 : let flushing_timer =
1107 0 : timer.map(|timer| timer.observe_smgr_op_completion_and_start_flushing());
1108 :
1109 : // what we want to do
1110 : let flush_fut = pgb_writer.flush();
1111 : // metric for how long flushing takes
1112 : let flush_fut = match flushing_timer {
1113 : Some(flushing_timer) => {
1114 : futures::future::Either::Left(flushing_timer.measure(flush_fut))
1115 : }
1116 : None => futures::future::Either::Right(flush_fut),
1117 : };
1118 : // do it while respecting cancellation
1119 0 : let _: () = async move {
1120 0 : tokio::select! {
1121 : biased;
1122 0 : _ = cancel.cancelled() => {
1123 : // We were requested to shut down.
1124 0 : info!("shutdown request received in page handler");
1125 0 : return Err(QueryError::Shutdown)
1126 : }
1127 0 : res = flush_fut => {
1128 0 : res?;
1129 : }
1130 : }
1131 0 : Ok(())
1132 0 : }
1133 : // and log the info! line inside the request span
1134 : .instrument(span.clone())
1135 : .await?;
1136 : }
1137 : Ok(())
1138 : }
1139 :
1140 : /// Pagestream sub-protocol handler.
1141 : ///
1142 : /// It is a simple request-response protocol inside a COPYBOTH session.
1143 : ///
1144 : /// # Coding Discipline
1145 : ///
1146 : /// Coding discipline within this function: all interaction with the `pgb` connection
1147 : /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
1148 : /// This is so that we can shut down page_service quickly.
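     : ///
     : /// # Wire flow (illustrative summary of the handlers below)
     : ///
     : /// - the server switches the session into COPYBOTH by sending `CopyBothResponse`;
     : /// - the client sends each [`PagestreamFeMessage`], serialized inside a `CopyData` frame;
     : /// - the server replies with a serialized [`PagestreamBeMessage`], also wrapped in `CopyData`;
     : /// - the session ends when the client sends `Terminate` or closes the connection.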
1149 : #[instrument(skip_all)]
1150 : async fn handle_pagerequests<IO>(
1151 : &mut self,
1152 : pgb: &mut PostgresBackend<IO>,
1153 : tenant_id: TenantId,
1154 : timeline_id: TimelineId,
1155 : protocol_version: PagestreamProtocolVersion,
1156 : ctx: RequestContext,
1157 : ) -> Result<(), QueryError>
1158 : where
1159 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
1160 : {
1161 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1162 :
1163 : // switch client to COPYBOTH
1164 : pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
1165 : tokio::select! {
1166 : biased;
1167 : _ = self.cancel.cancelled() => {
1168 : return Err(QueryError::Shutdown)
1169 : }
1170 : res = pgb.flush() => {
1171 : res?;
1172 : }
1173 : }
1174 :
1175 : let pgb_reader = pgb
1176 : .split()
1177 : .context("implementation error: split pgb into reader and writer")?;
1178 :
1179 : let timeline_handles = self
1180 : .timeline_handles
1181 : .take()
1182 : .expect("implementation error: timeline_handles should not be locked");
1183 :
1184 : let request_span = info_span!("request", shard_id = tracing::field::Empty);
1185 : let ((pgb_reader, timeline_handles), result) = match self.pipelining_config.clone() {
1186 : PageServicePipeliningConfig::Pipelined(pipelining_config) => {
1187 : self.handle_pagerequests_pipelined(
1188 : pgb,
1189 : pgb_reader,
1190 : tenant_id,
1191 : timeline_id,
1192 : timeline_handles,
1193 : request_span,
1194 : pipelining_config,
1195 : protocol_version,
1196 : &ctx,
1197 : )
1198 : .await
1199 : }
1200 : PageServicePipeliningConfig::Serial => {
1201 : self.handle_pagerequests_serial(
1202 : pgb,
1203 : pgb_reader,
1204 : tenant_id,
1205 : timeline_id,
1206 : timeline_handles,
1207 : request_span,
1208 : protocol_version,
1209 : &ctx,
1210 : )
1211 : .await
1212 : }
1213 : };
1214 :
1215 : debug!("pagestream subprotocol shut down cleanly");
1216 :
1217 : pgb.unsplit(pgb_reader)
1218 : .context("implementation error: unsplit pgb")?;
1219 :
1220 : let replaced = self.timeline_handles.replace(timeline_handles);
1221 : assert!(replaced.is_none());
1222 :
1223 : result
1224 : }
1225 :
1226 : #[allow(clippy::too_many_arguments)]
1227 0 : async fn handle_pagerequests_serial<IO>(
1228 0 : &mut self,
1229 0 : pgb_writer: &mut PostgresBackend<IO>,
1230 0 : mut pgb_reader: PostgresBackendReader<IO>,
1231 0 : tenant_id: TenantId,
1232 0 : timeline_id: TimelineId,
1233 0 : mut timeline_handles: TimelineHandles,
1234 0 : request_span: Span,
1235 0 : protocol_version: PagestreamProtocolVersion,
1236 0 : ctx: &RequestContext,
1237 0 : ) -> (
1238 0 : (PostgresBackendReader<IO>, TimelineHandles),
1239 0 : Result<(), QueryError>,
1240 0 : )
1241 0 : where
1242 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
1243 0 : {
1244 0 : let cancel = self.cancel.clone();
1245 0 : let err = loop {
1246 0 : let msg = Self::pagestream_read_message(
1247 0 : &mut pgb_reader,
1248 0 : tenant_id,
1249 0 : timeline_id,
1250 0 : &mut timeline_handles,
1251 0 : &cancel,
1252 0 : ctx,
1253 0 : protocol_version,
1254 0 : request_span.clone(),
1255 0 : )
1256 0 : .await;
1257 0 : let msg = match msg {
1258 0 : Ok(msg) => msg,
1259 0 : Err(e) => break e,
1260 : };
1261 0 : let mut msg = match msg {
1262 0 : Some(msg) => msg,
1263 : None => {
1264 0 : debug!("pagestream subprotocol end observed");
1265 0 : return ((pgb_reader, timeline_handles), Ok(()));
1266 : }
1267 : };
1268 :
1269 0 : if let Err(cancelled) = msg.throttle_and_record_start_processing(&self.cancel).await {
1270 0 : break cancelled;
1271 0 : }
1272 :
1273 0 : let err = self
1274 0 : .pagesteam_handle_batched_message(pgb_writer, msg, &cancel, protocol_version, ctx)
1275 0 : .await;
1276 0 : match err {
1277 0 : Ok(()) => {}
1278 0 : Err(e) => break e,
1279 : }
1280 : };
1281 0 : ((pgb_reader, timeline_handles), Err(err))
1282 0 : }
1283 :
1284 : /// # Cancel-Safety
1285 : ///
1286 : /// May leak tokio tasks if not polled to completion.
1287 : #[allow(clippy::too_many_arguments)]
1288 0 : async fn handle_pagerequests_pipelined<IO>(
1289 0 : &mut self,
1290 0 : pgb_writer: &mut PostgresBackend<IO>,
1291 0 : pgb_reader: PostgresBackendReader<IO>,
1292 0 : tenant_id: TenantId,
1293 0 : timeline_id: TimelineId,
1294 0 : mut timeline_handles: TimelineHandles,
1295 0 : request_span: Span,
1296 0 : pipelining_config: PageServicePipeliningConfigPipelined,
1297 0 : protocol_version: PagestreamProtocolVersion,
1298 0 : ctx: &RequestContext,
1299 0 : ) -> (
1300 0 : (PostgresBackendReader<IO>, TimelineHandles),
1301 0 : Result<(), QueryError>,
1302 0 : )
1303 0 : where
1304 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
1305 0 : {
1306 0 : //
1307 0 : // Pipelined pagestream handling consists of
1308 0 : // - a Batcher that reads requests off the wire
1309 0 : // and batches them if possible,
1310 0 : // - an Executor that processes the batched requests.
1311 0 : //
1312 0 : // The batch is built up inside an `spsc_fold` channel,
1313 0 : // shared between Batcher (Sender) and Executor (Receiver).
1314 0 : //
1315 0 : // The Batcher continuously folds client requests into the batch,
1316 0 : // while the Executor can at any time take out what's in the batch
1317 0 : // in order to process it.
1318 0 : // This means the next batch builds up while the Executor
1319 0 : // executes the last batch.
1320 0 : //
1321 0 : // CANCELLATION
1322 0 : //
1323 0 : // We run both Batcher and Executor futures to completion before
1324 0 : // returning from this function.
1325 0 : //
1326 0 : // If Executor exits first, it signals cancellation to the Batcher
1327 0 : // via a CancellationToken that is child of `self.cancel`.
1328 0 : // If Batcher exits first, it signals cancellation to the Executor
1329 0 : // by dropping the spsc_fold channel Sender.
1330 0 : //
1331 0 : // CLEAN SHUTDOWN
1332 0 : //
1333 0 : // Clean shutdown means that the client ends the COPYBOTH session.
1334 0 : // In response to such a client message, the Batcher exits.
1335 0 : // The Executor continues to run, draining the spsc_fold channel.
1336 0 : // Once drained, the spsc_fold recv will fail with a distinct error
1337 0 : // indicating that the sender disconnected.
1338 0 : // The Executor exits with Ok(()) in response to that error.
1339 0 : //
1340 0 : // Server initiated shutdown is not clean shutdown, but instead
1341 0 : // is an error Err(QueryError::Shutdown) that is propagated through
1342 0 : // error propagation.
1343 0 : //
1344 0 : // ERROR PROPAGATION
1345 0 : //
1346 0 : // When the Batcher encounter an error, it sends it as a value
1347 0 : // through the spsc_fold channel and exits afterwards.
1348 0 : // When the Executor observes such an error in the channel,
1349 0 : // it exits returning that error value.
1350 0 : //
1351 0 : // This design ensures that the Executor stage will still process
1352 0 : // the batch that was in flight when the Batcher encountered an error,
1353 0 : // thereby behaving identically to a serial implementation.
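     : //
     : // BATCHING EXAMPLE (illustrative, not from any particular trace)
     : //
     : // With max_batch_size = 4, four consecutive GetPage requests that resolve
     : // to the same shard and the same effective_request_lsn are folded by
     : // pagestream_do_batch into a single BatchedFeMessage::GetPage whose
     : // pages.len() == 4. A request for a different shard, a different effective
     : // LSN, or the fifth request in a row starts a new batch; non-GetPage
     : // messages are never folded into an existing batch.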
1354 0 :
1355 0 : let PageServicePipeliningConfigPipelined {
1356 0 : max_batch_size,
1357 0 : execution,
1358 0 : } = pipelining_config;
1359 :
1360 : // Macro to _define_ a pipeline stage.
1361 : macro_rules! pipeline_stage {
1362 : ($name:literal, $cancel:expr, $make_fut:expr) => {{
1363 : let cancel: CancellationToken = $cancel;
1364 : let stage_fut = $make_fut(cancel.clone());
1365 0 : async move {
1366 0 : scopeguard::defer! {
1367 0 : debug!("exiting");
1368 0 : }
1369 0 : timed_after_cancellation(stage_fut, $name, Duration::from_millis(100), &cancel)
1370 0 : .await
1371 0 : }
1372 : .instrument(tracing::info_span!($name))
1373 : }};
1374 : }
1375 :
1376 : //
1377 : // Batcher
1378 : //
1379 :
1380 0 : let cancel_batcher = self.cancel.child_token();
1381 0 : let (mut batch_tx, mut batch_rx) = spsc_fold::channel();
1382 0 : let batcher = pipeline_stage!("batcher", cancel_batcher.clone(), move |cancel_batcher| {
1383 0 : let ctx = ctx.attached_child();
1384 0 : async move {
1385 0 : let mut pgb_reader = pgb_reader;
1386 0 : let mut exit = false;
1387 0 : while !exit {
1388 0 : let read_res = Self::pagestream_read_message(
1389 0 : &mut pgb_reader,
1390 0 : tenant_id,
1391 0 : timeline_id,
1392 0 : &mut timeline_handles,
1393 0 : &cancel_batcher,
1394 0 : &ctx,
1395 0 : protocol_version,
1396 0 : request_span.clone(),
1397 0 : )
1398 0 : .await;
1399 0 : let Some(read_res) = read_res.transpose() else {
1400 0 : debug!("client-initiated shutdown");
1401 0 : break;
1402 : };
1403 0 : exit |= read_res.is_err();
1404 0 : let could_send = batch_tx
1405 0 : .send(read_res, |batch, res| {
1406 0 : Self::pagestream_do_batch(max_batch_size, batch, res)
1407 0 : })
1408 0 : .await;
1409 0 : exit |= could_send.is_err();
1410 : }
1411 0 : (pgb_reader, timeline_handles)
1412 0 : }
1413 0 : });
1414 :
1415 : //
1416 : // Executor
1417 : //
1418 :
1419 0 : let executor = pipeline_stage!("executor", self.cancel.clone(), move |cancel| {
1420 0 : let ctx = ctx.attached_child();
1421 0 : async move {
1422 0 : let _cancel_batcher = cancel_batcher.drop_guard();
1423 : loop {
1424 0 : let maybe_batch = batch_rx.recv().await;
1425 0 : let batch = match maybe_batch {
1426 0 : Ok(batch) => batch,
1427 : Err(spsc_fold::RecvError::SenderGone) => {
1428 0 : debug!("upstream gone");
1429 0 : return Ok(());
1430 : }
1431 : };
1432 0 : let mut batch = match batch {
1433 0 : Ok(batch) => batch,
1434 0 : Err(e) => {
1435 0 : return Err(e);
1436 : }
1437 : };
1438 0 : batch
1439 0 : .throttle_and_record_start_processing(&self.cancel)
1440 0 : .await?;
1441 0 : self.pagesteam_handle_batched_message(
1442 0 : pgb_writer,
1443 0 : batch,
1444 0 : &cancel,
1445 0 : protocol_version,
1446 0 : &ctx,
1447 0 : )
1448 0 : .await?;
1449 : }
1450 0 : }
1451 0 : });
1452 :
1453 : //
1454 : // Execute the stages.
1455 : //
1456 :
1457 0 : match execution {
1458 : PageServiceProtocolPipelinedExecutionStrategy::ConcurrentFutures => {
1459 0 : tokio::join!(batcher, executor)
1460 : }
1461 : PageServiceProtocolPipelinedExecutionStrategy::Tasks => {
1462 : // These tasks are not tracked anywhere.
1463 0 : let read_messages_task = tokio::spawn(batcher);
1464 0 : let (read_messages_task_res, executor_res_) =
1465 0 : tokio::join!(read_messages_task, executor,);
1466 0 : (
1467 0 : read_messages_task_res.expect("propagated panic from read_messages"),
1468 0 : executor_res_,
1469 0 : )
1470 : }
1471 : }
1472 0 : }
1473 :
1474 : /// Helper function to handle the LSN from client request.
1475 : ///
1476 : /// Each GetPage (and Exists and Nblocks) request includes information about
1477 : /// which version of the page is being requested. The primary compute node
1478 : /// will always request the latest page version, by setting 'request_lsn' to
1479 : /// the last inserted or flushed WAL position, while a standby will request
1480 : /// a version at the LSN that it's currently caught up to.
1481 : ///
1482 : /// In either case, if the page server hasn't received the WAL up to the
1483 : /// requested LSN yet, we will wait for it to arrive. The return value is
1484 : /// the LSN that should be used to look up the page versions.
1485 : ///
1486 : /// In addition to the request LSN, each request carries another LSN,
1487 : /// 'not_modified_since', which is a hint to the pageserver that the client
1488 : /// knows that the page has not been modified between 'not_modified_since'
1489 : /// and the request LSN. This allows skipping the wait, as long as the WAL
1490 : /// up to 'not_modified_since' has arrived. If the client doesn't have any
1491 : /// information about when the page was modified, it will use
1492 : /// not_modified_since == request_lsn. If the client lies and sends a
1493 : /// not_modified_since that is too low, such that there are in fact later page versions, the
1494 : /// behavior is undefined: the pageserver may return any of the page versions
1495 : /// or an error.
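     : ///
     : /// # Worked example (illustrative LSNs)
     : ///
     : /// Suppose last_record_lsn is 0/5000 and the client sends request_lsn = 0/7000:
     : /// - with not_modified_since = 0/4000 there is nothing to wait for
     : ///   (0/4000 <= 0/5000), and the returned read LSN is
     : ///   min(last_record_lsn, request_lsn) = 0/5000;
     : /// - with not_modified_since = 0/6000, the function first waits for WAL up to
     : ///   0/6000 to arrive and then returns 0/6000.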
1496 0 : async fn wait_or_get_last_lsn(
1497 0 : timeline: &Timeline,
1498 0 : request_lsn: Lsn,
1499 0 : not_modified_since: Lsn,
1500 0 : latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
1501 0 : ctx: &RequestContext,
1502 0 : ) -> Result<Lsn, PageStreamError> {
1503 0 : let last_record_lsn = timeline.get_last_record_lsn();
1504 0 :
1505 0 : // Sanity check the request
1506 0 : if request_lsn < not_modified_since {
1507 0 : return Err(PageStreamError::BadRequest(
1508 0 : format!(
1509 0 : "invalid request with request LSN {} and not_modified_since {}",
1510 0 : request_lsn, not_modified_since,
1511 0 : )
1512 0 : .into(),
1513 0 : ));
1514 0 : }
1515 0 :
1516 0 : // Check explicitly for INVALID just to get a less scary error message if the request is obviously bogus
1517 0 : if request_lsn == Lsn::INVALID {
1518 0 : return Err(PageStreamError::BadRequest(
1519 0 : "invalid LSN(0) in request".into(),
1520 0 : ));
1521 0 : }
1522 0 :
1523 0 : // Clients should only read from recent LSNs on their timeline, or from locations holding an LSN lease.
1524 0 : //
1525 0 : // We may have older data available, but we make a best effort to detect this case and return an error,
1526 0 : // to distinguish a misbehaving client (asking for old LSN) from a storage issue (data missing at a legitimate LSN).
1527 0 : if request_lsn < **latest_gc_cutoff_lsn && !timeline.is_gc_blocked_by_lsn_lease_deadline() {
1528 0 : let gc_info = &timeline.gc_info.read().unwrap();
1529 0 : if !gc_info.leases.contains_key(&request_lsn) {
1530 0 : return Err(
1531 0 : PageStreamError::BadRequest(format!(
1532 0 : "tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
1533 0 : request_lsn, **latest_gc_cutoff_lsn
1534 0 : ).into())
1535 0 : );
1536 0 : }
1537 0 : }
1538 :
1539 : // Wait for WAL up to 'not_modified_since' to arrive, if necessary
1540 0 : if not_modified_since > last_record_lsn {
1541 0 : timeline
1542 0 : .wait_lsn(
1543 0 : not_modified_since,
1544 0 : crate::tenant::timeline::WaitLsnWaiter::PageService,
1545 0 : ctx,
1546 0 : )
1547 0 : .await?;
1548 : // Since we waited for 'not_modified_since' to arrive, that is now the last
1549 : // record LSN. (Or close enough for our purposes; the last-record LSN can
1550 : // advance immediately after we return anyway)
1551 0 : Ok(not_modified_since)
1552 : } else {
1553 : // It might be better to use max(not_modified_since, latest_gc_cutoff_lsn)
1554 : // here instead. That would give the same result, since we know that there
1555 : // haven't been any modifications since 'not_modified_since'. Using an older
1556 : // LSN might be faster, because that could allow skipping recent layers when
1557 : // finding the page. However, we have historically used 'last_record_lsn', so
1558 : // stick to that for now.
1559 0 : Ok(std::cmp::min(last_record_lsn, request_lsn))
1560 : }
1561 0 : }
1562 :
1563 : /// Handles the lsn lease request.
1564 : /// If a lease cannot be obtained, the client will receive NULL.
1565 : #[instrument(skip_all, fields(shard_id, %lsn))]
1566 : async fn handle_make_lsn_lease<IO>(
1567 : &mut self,
1568 : pgb: &mut PostgresBackend<IO>,
1569 : tenant_shard_id: TenantShardId,
1570 : timeline_id: TimelineId,
1571 : lsn: Lsn,
1572 : ctx: &RequestContext,
1573 : ) -> Result<(), QueryError>
1574 : where
1575 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1576 : {
1577 : let timeline = self
1578 : .timeline_handles
1579 : .as_mut()
1580 : .unwrap()
1581 : .get(
1582 : tenant_shard_id.tenant_id,
1583 : timeline_id,
1584 : ShardSelector::Known(tenant_shard_id.to_index()),
1585 : )
1586 : .await?;
1587 : set_tracing_field_shard_id(&timeline);
1588 :
1589 : let lease = timeline
1590 : .renew_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx)
1591 0 : .inspect_err(|e| {
1592 0 : warn!("{e}");
1593 0 : })
1594 : .ok();
1595 0 : let valid_until_str = lease.map(|l| {
1596 0 : l.valid_until
1597 0 : .duration_since(SystemTime::UNIX_EPOCH)
1598 0 : .expect("valid_until is earlier than UNIX_EPOCH")
1599 0 : .as_millis()
1600 0 : .to_string()
1601 0 : });
1602 0 : let bytes = valid_until_str.as_ref().map(|x| x.as_bytes());
1603 :
1604 : pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col(
1605 : b"valid_until",
1606 : )]))?
1607 : .write_message_noflush(&BeMessage::DataRow(&[bytes]))?;
1608 :
1609 : Ok(())
1610 : }
1611 :
1612 : #[instrument(skip_all, fields(shard_id))]
1613 : async fn handle_get_rel_exists_request(
1614 : &mut self,
1615 : timeline: &Timeline,
1616 : req: &PagestreamExistsRequest,
1617 : ctx: &RequestContext,
1618 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1619 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1620 : let lsn = Self::wait_or_get_last_lsn(
1621 : timeline,
1622 : req.hdr.request_lsn,
1623 : req.hdr.not_modified_since,
1624 : &latest_gc_cutoff_lsn,
1625 : ctx,
1626 : )
1627 : .await?;
1628 :
1629 : let exists = timeline
1630 : .get_rel_exists(req.rel, Version::Lsn(lsn), ctx)
1631 : .await?;
1632 :
1633 : Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
1634 : req: *req,
1635 : exists,
1636 : }))
1637 : }
1638 :
1639 : #[instrument(skip_all, fields(shard_id))]
1640 : async fn handle_get_nblocks_request(
1641 : &mut self,
1642 : timeline: &Timeline,
1643 : req: &PagestreamNblocksRequest,
1644 : ctx: &RequestContext,
1645 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1646 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1647 : let lsn = Self::wait_or_get_last_lsn(
1648 : timeline,
1649 : req.hdr.request_lsn,
1650 : req.hdr.not_modified_since,
1651 : &latest_gc_cutoff_lsn,
1652 : ctx,
1653 : )
1654 : .await?;
1655 :
1656 : let n_blocks = timeline
1657 : .get_rel_size(req.rel, Version::Lsn(lsn), ctx)
1658 : .await?;
1659 :
1660 : Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
1661 : req: *req,
1662 : n_blocks,
1663 : }))
1664 : }
1665 :
1666 : #[instrument(skip_all, fields(shard_id))]
1667 : async fn handle_db_size_request(
1668 : &mut self,
1669 : timeline: &Timeline,
1670 : req: &PagestreamDbSizeRequest,
1671 : ctx: &RequestContext,
1672 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1673 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1674 : let lsn = Self::wait_or_get_last_lsn(
1675 : timeline,
1676 : req.hdr.request_lsn,
1677 : req.hdr.not_modified_since,
1678 : &latest_gc_cutoff_lsn,
1679 : ctx,
1680 : )
1681 : .await?;
1682 :
1683 : let total_blocks = timeline
1684 : .get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, Version::Lsn(lsn), ctx)
1685 : .await?;
1686 : let db_size = total_blocks as i64 * BLCKSZ as i64;
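// Worked example: with the standard 8 KiB Postgres block size (BLCKSZ = 8192), a
// database spanning 1_000 blocks is reported as 1_000 * 8192 = 8_192_000 bytes.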
1687 :
1688 : Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse {
1689 : req: *req,
1690 : db_size,
1691 : }))
1692 : }
1693 :
1694 : #[instrument(skip_all)]
1695 : async fn handle_get_page_at_lsn_request_batched(
1696 : &mut self,
1697 : timeline: &Timeline,
1698 : effective_lsn: Lsn,
1699 : requests: smallvec::SmallVec<[BatchedGetPageRequest; 1]>,
1700 : ctx: &RequestContext,
1701 : ) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>> {
1702 : debug_assert_current_span_has_tenant_and_timeline_id();
1703 :
1704 : timeline
1705 : .query_metrics
1706 : .observe_getpage_batch_start(requests.len());
1707 :
1708 : let results = timeline
1709 : .get_rel_page_at_lsn_batched(
1710 0 : requests.iter().map(|p| (&p.req.rel, &p.req.blkno)),
1711 : effective_lsn,
1712 : ctx,
1713 : )
1714 : .await;
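// The batched read returns exactly one result per requested (rel, blkno), in request
// order; that invariant is what allows zipping requests and results back together below.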
1715 : assert_eq!(results.len(), requests.len());
1716 :
1717 : // TODO: avoid creating the new Vec here
1718 : Vec::from_iter(
1719 : requests
1720 : .into_iter()
1721 : .zip(results.into_iter())
1722 0 : .map(|(req, res)| {
1723 0 : res.map(|page| {
1724 0 : (
1725 0 : PagestreamBeMessage::GetPage(models::PagestreamGetPageResponse {
1726 0 : req: req.req,
1727 0 : page,
1728 0 : }),
1729 0 : req.timer,
1730 0 : )
1731 0 : })
1732 0 : .map_err(|e| BatchedPageStreamError {
1733 0 : err: PageStreamError::from(e),
1734 0 : req: req.req.hdr,
1735 0 : })
1736 0 : }),
1737 : )
1738 : }
1739 :
1740 : #[instrument(skip_all, fields(shard_id))]
1741 : async fn handle_get_slru_segment_request(
1742 : &mut self,
1743 : timeline: &Timeline,
1744 : req: &PagestreamGetSlruSegmentRequest,
1745 : ctx: &RequestContext,
1746 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1747 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1748 : let lsn = Self::wait_or_get_last_lsn(
1749 : timeline,
1750 : req.hdr.request_lsn,
1751 : req.hdr.not_modified_since,
1752 : &latest_gc_cutoff_lsn,
1753 : ctx,
1754 : )
1755 : .await?;
1756 :
1757 : let kind = SlruKind::from_repr(req.kind)
1758 : .ok_or(PageStreamError::BadRequest("invalid SLRU kind".into()))?;
1759 : let segment = timeline.get_slru_segment(kind, req.segno, lsn, ctx).await?;
1760 :
1761 : Ok(PagestreamBeMessage::GetSlruSegment(
1762 : PagestreamGetSlruSegmentResponse { req: *req, segment },
1763 : ))
1764 : }
1765 :
1766 : /// Note on "fullbackup":
1767 : /// Full basebackups should only be used for debugging purposes.
1768 : /// Originally, fullbackup was introduced to enable breaking storage format changes,
1769 : /// but that is no longer applicable.
1770 : ///
1771 : /// # Coding Discipline
1772 : ///
1773 : /// Coding discipline within this function: all interaction with the `pgb` connection
1774 : /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
1775 : /// This is so that we can shutdown page_service quickly.
1776 : ///
1777 : /// TODO: wrap the pgb that we pass to the basebackup handler so that it's sensitive
1778 : /// to connection cancellation.
1779 : #[allow(clippy::too_many_arguments)]
1780 : #[instrument(skip_all, fields(shard_id, ?lsn, ?prev_lsn, %full_backup))]
1781 : async fn handle_basebackup_request<IO>(
1782 : &mut self,
1783 : pgb: &mut PostgresBackend<IO>,
1784 : tenant_id: TenantId,
1785 : timeline_id: TimelineId,
1786 : lsn: Option<Lsn>,
1787 : prev_lsn: Option<Lsn>,
1788 : full_backup: bool,
1789 : gzip: bool,
1790 : replica: bool,
1791 : ctx: &RequestContext,
1792 : ) -> Result<(), QueryError>
1793 : where
1794 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1795 : {
1796 0 : fn map_basebackup_error(err: BasebackupError) -> QueryError {
1797 0 : match err {
1798 0 : BasebackupError::Client(e) => QueryError::Disconnected(ConnectionError::Io(e)),
1799 0 : BasebackupError::Server(e) => QueryError::Other(e),
1800 : }
1801 0 : }
1802 :
1803 : let started = std::time::Instant::now();
1804 :
1805 : let timeline = self
1806 : .timeline_handles
1807 : .as_mut()
1808 : .unwrap()
1809 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1810 : .await?;
1811 :
1812 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1813 : if let Some(lsn) = lsn {
1814 : // Backup was requested at a particular LSN. Wait for it to arrive.
1815 : info!("waiting for {}", lsn);
1816 : timeline
1817 : .wait_lsn(
1818 : lsn,
1819 : crate::tenant::timeline::WaitLsnWaiter::PageService,
1820 : ctx,
1821 : )
1822 : .await?;
1823 : timeline
1824 : .check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
1825 : .context("invalid basebackup lsn")?;
1826 : }
1827 :
1828 : let lsn_awaited_after = started.elapsed();
1829 :
1830 : // switch client to COPYOUT
1831 : pgb.write_message_noflush(&BeMessage::CopyOutResponse)
1832 : .map_err(QueryError::Disconnected)?;
1833 : self.flush_cancellable(pgb, &self.cancel).await?;
1834 :
1835 : // Send a tarball of the latest layer on the timeline. Compress if not
1836 : // fullbackup. TODO Compress in that case too (tests need to be updated)
1837 : if full_backup {
1838 : let mut writer = pgb.copyout_writer();
1839 : basebackup::send_basebackup_tarball(
1840 : &mut writer,
1841 : &timeline,
1842 : lsn,
1843 : prev_lsn,
1844 : full_backup,
1845 : replica,
1846 : ctx,
1847 : )
1848 : .await
1849 : .map_err(map_basebackup_error)?;
1850 : } else {
1851 : let mut writer = BufWriter::new(pgb.copyout_writer());
1852 : if gzip {
1853 : let mut encoder = GzipEncoder::with_quality(
1854 : &mut writer,
1855 : // NOTE using fast compression because it's on the critical path
1856 : // for compute startup. For an empty database, we get
1857 : // <100KB with this method. The Level::Best compression method
1858 : // gives us <20KB, but maybe we should add basebackup caching
1859 : // on compute shutdown first.
1860 : async_compression::Level::Fastest,
1861 : );
1862 : basebackup::send_basebackup_tarball(
1863 : &mut encoder,
1864 : &timeline,
1865 : lsn,
1866 : prev_lsn,
1867 : full_backup,
1868 : replica,
1869 : ctx,
1870 : )
1871 : .await
1872 : .map_err(map_basebackup_error)?;
1873 : // shutdown the encoder to ensure the gzip footer is written
1874 : encoder
1875 : .shutdown()
1876 : .await
1877 0 : .map_err(|e| QueryError::Disconnected(ConnectionError::Io(e)))?;
1878 : } else {
1879 : basebackup::send_basebackup_tarball(
1880 : &mut writer,
1881 : &timeline,
1882 : lsn,
1883 : prev_lsn,
1884 : full_backup,
1885 : replica,
1886 : ctx,
1887 : )
1888 : .await
1889 : .map_err(map_basebackup_error)?;
1890 : }
1891 : writer
1892 : .flush()
1893 : .await
1894 0 : .map_err(|e| map_basebackup_error(BasebackupError::Client(e)))?;
1895 : }
1896 :
1897 : pgb.write_message_noflush(&BeMessage::CopyDone)
1898 : .map_err(QueryError::Disconnected)?;
1899 : self.flush_cancellable(pgb, &timeline.cancel).await?;
1900 :
1901 : let basebackup_after = started
1902 : .elapsed()
1903 : .checked_sub(lsn_awaited_after)
1904 : .unwrap_or(Duration::ZERO);
1905 :
1906 : info!(
1907 : lsn_await_millis = lsn_awaited_after.as_millis(),
1908 : basebackup_millis = basebackup_after.as_millis(),
1909 : "basebackup complete"
1910 : );
1911 :
1912 : Ok(())
1913 : }
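// Illustration of the cancellation-sensitive flush pattern that the "Coding Discipline"
// note above refers to. This is a standalone sketch, not the actual `flush_cancellable`
// helper (which is defined elsewhere in this file); it simply races the flush against
// the cancellation token.
#[cfg(test)]
#[allow(dead_code)]
async fn flush_cancellable_sketch<W>(
    writer: &mut W,
    cancel: &CancellationToken,
) -> Result<(), std::io::Error>
where
    W: AsyncWrite + Unpin,
{
    tokio::select! {
        res = writer.flush() => res,
        _ = cancel.cancelled() => Err(std::io::Error::new(
            std::io::ErrorKind::Interrupted,
            "flush cancelled by page_service shutdown",
        )),
    }
}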
1914 :
1915 : // When accessing the management API, supply None as an argument.
1916 : // When authorizing access to a specific tenant, pass the corresponding tenant id.
1917 0 : fn check_permission(&self, tenant_id: Option<TenantId>) -> Result<(), QueryError> {
1918 0 : if self.auth.is_none() {
1919 : // auth is set to Trust, nothing to check so just return ok
1920 0 : return Ok(());
1921 0 : }
1922 0 : // auth is some, just checked above, when auth is some
1923 0 : // then claims are always present because of checks during connection init
1924 0 : // so this expect won't trigger
1925 0 : let claims = self
1926 0 : .claims
1927 0 : .as_ref()
1928 0 : .expect("claims presence already checked");
1929 0 : check_permission(claims, tenant_id).map_err(|e| QueryError::Unauthorized(e.0))
1930 0 : }
1931 : }
1932 :
1933 : /// `basebackup tenant timeline [lsn] [--gzip] [--replica]`
1934 : #[derive(Debug, Clone, Eq, PartialEq)]
1935 : struct BaseBackupCmd {
1936 : tenant_id: TenantId,
1937 : timeline_id: TimelineId,
1938 : lsn: Option<Lsn>,
1939 : gzip: bool,
1940 : replica: bool,
1941 : }
1942 :
1943 : /// `fullbackup tenant timeline [lsn] [prev_lsn]`
1944 : #[derive(Debug, Clone, Eq, PartialEq)]
1945 : struct FullBackupCmd {
1946 : tenant_id: TenantId,
1947 : timeline_id: TimelineId,
1948 : lsn: Option<Lsn>,
1949 : prev_lsn: Option<Lsn>,
1950 : }
1951 :
1952 : /// `pagestream_v2 tenant timeline`
1953 : #[derive(Debug, Clone, Eq, PartialEq)]
1954 : struct PageStreamCmd {
1955 : tenant_id: TenantId,
1956 : timeline_id: TimelineId,
1957 : protocol_version: PagestreamProtocolVersion,
1958 : }
1959 :
1960 : /// `lease lsn tenant timeline lsn`
1961 : #[derive(Debug, Clone, Eq, PartialEq)]
1962 : struct LeaseLsnCmd {
1963 : tenant_shard_id: TenantShardId,
1964 : timeline_id: TimelineId,
1965 : lsn: Lsn,
1966 : }
1967 :
1968 : #[derive(Debug, Clone, Eq, PartialEq)]
1969 : enum PageServiceCmd {
1970 : Set,
1971 : PageStream(PageStreamCmd),
1972 : BaseBackup(BaseBackupCmd),
1973 : FullBackup(FullBackupCmd),
1974 : LeaseLsn(LeaseLsnCmd),
1975 : }
1976 :
1977 : impl PageStreamCmd {
1978 6 : fn parse(query: &str, protocol_version: PagestreamProtocolVersion) -> anyhow::Result<Self> {
1979 6 : let parameters = query.split_whitespace().collect_vec();
1980 6 : if parameters.len() != 2 {
1981 2 : bail!(
1982 2 : "invalid number of parameters for pagestream command: {}",
1983 2 : query
1984 2 : );
1985 4 : }
1986 4 : let tenant_id = TenantId::from_str(parameters[0])
1987 4 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
1988 2 : let timeline_id = TimelineId::from_str(parameters[1])
1989 2 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
1990 2 : Ok(Self {
1991 2 : tenant_id,
1992 2 : timeline_id,
1993 2 : protocol_version,
1994 2 : })
1995 6 : }
1996 : }
1997 :
1998 : impl FullBackupCmd {
1999 4 : fn parse(query: &str) -> anyhow::Result<Self> {
2000 4 : let parameters = query.split_whitespace().collect_vec();
2001 4 : if parameters.len() < 2 || parameters.len() > 4 {
2002 0 : bail!(
2003 0 : "invalid number of parameters for basebackup command: {}",
2004 0 : query
2005 0 : );
2006 4 : }
2007 4 : let tenant_id = TenantId::from_str(parameters[0])
2008 4 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2009 4 : let timeline_id = TimelineId::from_str(parameters[1])
2010 4 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2011 : // The caller is responsible for providing correct lsn and prev_lsn.
2012 4 : let lsn = if let Some(lsn_str) = parameters.get(2) {
2013 : Some(
2014 2 : Lsn::from_str(lsn_str)
2015 2 : .with_context(|| format!("Failed to parse Lsn from {lsn_str}"))?,
2016 : )
2017 : } else {
2018 2 : None
2019 : };
2020 4 : let prev_lsn = if let Some(prev_lsn_str) = parameters.get(3) {
2021 : Some(
2022 2 : Lsn::from_str(prev_lsn_str)
2023 2 : .with_context(|| format!("Failed to parse Lsn from {prev_lsn_str}"))?,
2024 : )
2025 : } else {
2026 2 : None
2027 : };
2028 4 : Ok(Self {
2029 4 : tenant_id,
2030 4 : timeline_id,
2031 4 : lsn,
2032 4 : prev_lsn,
2033 4 : })
2034 4 : }
2035 : }
2036 :
2037 : impl BaseBackupCmd {
2038 18 : fn parse(query: &str) -> anyhow::Result<Self> {
2039 18 : let parameters = query.split_whitespace().collect_vec();
2040 18 : if parameters.len() < 2 {
2041 0 : bail!(
2042 0 : "invalid number of parameters for basebackup command: {}",
2043 0 : query
2044 0 : );
2045 18 : }
2046 18 : let tenant_id = TenantId::from_str(parameters[0])
2047 18 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2048 18 : let timeline_id = TimelineId::from_str(parameters[1])
2049 18 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2050 : let lsn;
2051 : let flags_parse_from;
2052 18 : if let Some(maybe_lsn) = parameters.get(2) {
2053 16 : if *maybe_lsn == "latest" {
2054 2 : lsn = None;
2055 2 : flags_parse_from = 3;
2056 14 : } else if maybe_lsn.starts_with("--") {
2057 10 : lsn = None;
2058 10 : flags_parse_from = 2;
2059 10 : } else {
2060 : lsn = Some(
2061 4 : Lsn::from_str(maybe_lsn)
2062 4 : .with_context(|| format!("Failed to parse lsn from {maybe_lsn}"))?,
2063 : );
2064 4 : flags_parse_from = 3;
2065 : }
2066 2 : } else {
2067 2 : lsn = None;
2068 2 : flags_parse_from = 2;
2069 2 : }
2070 :
2071 18 : let mut gzip = false;
2072 18 : let mut replica = false;
2073 :
2074 22 : for ¶m in ¶meters[flags_parse_from..] {
2075 22 : match param {
2076 22 : "--gzip" => {
2077 14 : if gzip {
2078 2 : bail!("duplicate parameter for basebackup command: {param}")
2079 12 : }
2080 12 : gzip = true
2081 : }
2082 8 : "--replica" => {
2083 4 : if replica {
2084 0 : bail!("duplicate parameter for basebackup command: {param}")
2085 4 : }
2086 4 : replica = true
2087 : }
2088 4 : _ => bail!("invalid parameter for basebackup command: {param}"),
2089 : }
2090 : }
2091 12 : Ok(Self {
2092 12 : tenant_id,
2093 12 : timeline_id,
2094 12 : lsn,
2095 12 : gzip,
2096 12 : replica,
2097 12 : })
2098 18 : }
2099 : }
2100 :
2101 : impl LeaseLsnCmd {
2102 4 : fn parse(query: &str) -> anyhow::Result<Self> {
2103 4 : let parameters = query.split_whitespace().collect_vec();
2104 4 : if parameters.len() != 3 {
2105 0 : bail!(
2106 0 : "invalid number of parameters for lease lsn command: {}",
2107 0 : query
2108 0 : );
2109 4 : }
2110 4 : let tenant_shard_id = TenantShardId::from_str(parameters[0])
2111 4 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
2112 4 : let timeline_id = TimelineId::from_str(parameters[1])
2113 4 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
2114 4 : let lsn = Lsn::from_str(parameters[2])
2115 4 : .with_context(|| format!("Failed to parse lsn from {}", parameters[2]))?;
2116 4 : Ok(Self {
2117 4 : tenant_shard_id,
2118 4 : timeline_id,
2119 4 : lsn,
2120 4 : })
2121 4 : }
2122 : }
2123 :
2124 : impl PageServiceCmd {
2125 42 : fn parse(query: &str) -> anyhow::Result<Self> {
2126 42 : let query = query.trim();
2127 42 : let Some((cmd, other)) = query.split_once(' ') else {
2128 4 : bail!("cannot parse query: {query}")
2129 : };
2130 38 : match cmd.to_ascii_lowercase().as_str() {
2131 38 : "pagestream_v2" => Ok(Self::PageStream(PageStreamCmd::parse(
2132 6 : other,
2133 6 : PagestreamProtocolVersion::V2,
2134 6 : )?)),
2135 32 : "pagestream_v3" => Ok(Self::PageStream(PageStreamCmd::parse(
2136 0 : other,
2137 0 : PagestreamProtocolVersion::V3,
2138 0 : )?)),
2139 32 : "basebackup" => Ok(Self::BaseBackup(BaseBackupCmd::parse(other)?)),
2140 14 : "fullbackup" => Ok(Self::FullBackup(FullBackupCmd::parse(other)?)),
2141 10 : "lease" => {
2142 6 : let Some((cmd2, other)) = other.split_once(' ') else {
2143 0 : bail!("invalid lease command: {cmd}");
2144 : };
2145 6 : let cmd2 = cmd2.to_ascii_lowercase();
2146 6 : if cmd2 == "lsn" {
2147 4 : Ok(Self::LeaseLsn(LeaseLsnCmd::parse(other)?))
2148 : } else {
2149 2 : bail!("invalid lease command: {cmd}");
2150 : }
2151 : }
2152 4 : "set" => Ok(Self::Set),
2153 0 : _ => Err(anyhow::anyhow!("unsupported command {cmd} in {query}")),
2154 : }
2155 42 : }
2156 : }
2157 :
2158 : impl<IO> postgres_backend::Handler<IO> for PageServerHandler
2159 : where
2160 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
2161 : {
2162 0 : fn check_auth_jwt(
2163 0 : &mut self,
2164 0 : _pgb: &mut PostgresBackend<IO>,
2165 0 : jwt_response: &[u8],
2166 0 : ) -> Result<(), QueryError> {
2167 : // This unwrap is never triggered, because check_auth_jwt is only called when auth_type is NeonJWT,
2168 : // which requires auth to be present.
2169 0 : let data = self
2170 0 : .auth
2171 0 : .as_ref()
2172 0 : .unwrap()
2173 0 : .decode(str::from_utf8(jwt_response).context("jwt response is not UTF-8")?)
2174 0 : .map_err(|e| QueryError::Unauthorized(e.0))?;
2175 :
2176 0 : if matches!(data.claims.scope, Scope::Tenant) && data.claims.tenant_id.is_none() {
2177 0 : return Err(QueryError::Unauthorized(
2178 0 : "jwt token scope is Tenant, but tenant id is missing".into(),
2179 0 : ));
2180 0 : }
2181 0 :
2182 0 : debug!(
2183 0 : "jwt scope check succeeded for scope: {:#?} by tenant id: {:?}",
2184 : data.claims.scope, data.claims.tenant_id,
2185 : );
2186 :
2187 0 : self.claims = Some(data.claims);
2188 0 : Ok(())
2189 0 : }
2190 :
2191 0 : fn startup(
2192 0 : &mut self,
2193 0 : _pgb: &mut PostgresBackend<IO>,
2194 0 : _sm: &FeStartupPacket,
2195 0 : ) -> Result<(), QueryError> {
2196 0 : fail::fail_point!("ps::connection-start::startup-packet");
2197 0 : Ok(())
2198 0 : }
2199 :
2200 : #[instrument(skip_all, fields(tenant_id, timeline_id))]
2201 : async fn process_query(
2202 : &mut self,
2203 : pgb: &mut PostgresBackend<IO>,
2204 : query_string: &str,
2205 : ) -> Result<(), QueryError> {
2206 0 : fail::fail_point!("simulated-bad-compute-connection", |_| {
2207 0 : info!("Hit failpoint for bad connection");
2208 0 : Err(QueryError::SimulatedConnectionError)
2209 0 : });
2210 :
2211 : fail::fail_point!("ps::connection-start::process-query");
2212 :
2213 : let ctx = self.connection_ctx.attached_child();
2214 : debug!("process query {query_string}");
2215 : let query = PageServiceCmd::parse(query_string)?;
2216 : match query {
2217 : PageServiceCmd::PageStream(PageStreamCmd {
2218 : tenant_id,
2219 : timeline_id,
2220 : protocol_version,
2221 : }) => {
2222 : tracing::Span::current()
2223 : .record("tenant_id", field::display(tenant_id))
2224 : .record("timeline_id", field::display(timeline_id));
2225 :
2226 : self.check_permission(Some(tenant_id))?;
2227 : let command_kind = match protocol_version {
2228 : PagestreamProtocolVersion::V2 => ComputeCommandKind::PageStreamV2,
2229 : PagestreamProtocolVersion::V3 => ComputeCommandKind::PageStreamV3,
2230 : };
2231 : COMPUTE_COMMANDS_COUNTERS.for_command(command_kind).inc();
2232 :
2233 : self.handle_pagerequests(pgb, tenant_id, timeline_id, protocol_version, ctx)
2234 : .await?;
2235 : }
2236 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2237 : tenant_id,
2238 : timeline_id,
2239 : lsn,
2240 : gzip,
2241 : replica,
2242 : }) => {
2243 : tracing::Span::current()
2244 : .record("tenant_id", field::display(tenant_id))
2245 : .record("timeline_id", field::display(timeline_id));
2246 :
2247 : self.check_permission(Some(tenant_id))?;
2248 :
2249 : COMPUTE_COMMANDS_COUNTERS
2250 : .for_command(ComputeCommandKind::Basebackup)
2251 : .inc();
2252 : let metric_recording = metrics::BASEBACKUP_QUERY_TIME.start_recording();
2253 0 : let res = async {
2254 0 : self.handle_basebackup_request(
2255 0 : pgb,
2256 0 : tenant_id,
2257 0 : timeline_id,
2258 0 : lsn,
2259 0 : None,
2260 0 : false,
2261 0 : gzip,
2262 0 : replica,
2263 0 : &ctx,
2264 0 : )
2265 0 : .await?;
2266 0 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
2267 0 : Result::<(), QueryError>::Ok(())
2268 0 : }
2269 : .await;
2270 : metric_recording.observe(&res);
2271 : res?;
2272 : }
2273 : // same as basebackup, but result includes relational data as well
2274 : PageServiceCmd::FullBackup(FullBackupCmd {
2275 : tenant_id,
2276 : timeline_id,
2277 : lsn,
2278 : prev_lsn,
2279 : }) => {
2280 : tracing::Span::current()
2281 : .record("tenant_id", field::display(tenant_id))
2282 : .record("timeline_id", field::display(timeline_id));
2283 :
2284 : self.check_permission(Some(tenant_id))?;
2285 :
2286 : COMPUTE_COMMANDS_COUNTERS
2287 : .for_command(ComputeCommandKind::Fullbackup)
2288 : .inc();
2289 :
2290 : // Check that the timeline exists
2291 : self.handle_basebackup_request(
2292 : pgb,
2293 : tenant_id,
2294 : timeline_id,
2295 : lsn,
2296 : prev_lsn,
2297 : true,
2298 : false,
2299 : false,
2300 : &ctx,
2301 : )
2302 : .await?;
2303 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
2304 : }
2305 : PageServiceCmd::Set => {
2306 : // important because psycopg2 executes "SET datestyle TO 'ISO'"
2307 : // on connect
2308 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
2309 : }
2310 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
2311 : tenant_shard_id,
2312 : timeline_id,
2313 : lsn,
2314 : }) => {
2315 : tracing::Span::current()
2316 : .record("tenant_id", field::display(tenant_shard_id))
2317 : .record("timeline_id", field::display(timeline_id));
2318 :
2319 : self.check_permission(Some(tenant_shard_id.tenant_id))?;
2320 :
2321 : COMPUTE_COMMANDS_COUNTERS
2322 : .for_command(ComputeCommandKind::LeaseLsn)
2323 : .inc();
2324 :
2325 : match self
2326 : .handle_make_lsn_lease(pgb, tenant_shard_id, timeline_id, lsn, &ctx)
2327 : .await
2328 : {
2329 : Ok(()) => {
2330 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?
2331 : }
2332 : Err(e) => {
2333 : error!("error obtaining lsn lease for {lsn}: {e:?}");
2334 : pgb.write_message_noflush(&BeMessage::ErrorResponse(
2335 : &e.to_string(),
2336 : Some(e.pg_error_code()),
2337 : ))?
2338 : }
2339 : };
2340 : }
2341 : }
2342 :
2343 : Ok(())
2344 : }
2345 : }
2346 :
2347 : impl From<GetActiveTenantError> for QueryError {
2348 0 : fn from(e: GetActiveTenantError) -> Self {
2349 0 : match e {
2350 0 : GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected(
2351 0 : ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
2352 0 : ),
2353 : GetActiveTenantError::Cancelled
2354 : | GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => {
2355 0 : QueryError::Shutdown
2356 : }
2357 0 : e @ GetActiveTenantError::NotFound(_) => QueryError::NotFound(format!("{e}").into()),
2358 0 : e => QueryError::Other(anyhow::anyhow!(e)),
2359 : }
2360 0 : }
2361 : }
2362 :
2363 : #[derive(Debug, thiserror::Error)]
2364 : pub(crate) enum GetActiveTimelineError {
2365 : #[error(transparent)]
2366 : Tenant(GetActiveTenantError),
2367 : #[error(transparent)]
2368 : Timeline(#[from] GetTimelineError),
2369 : }
2370 :
2371 : impl From<GetActiveTimelineError> for QueryError {
2372 0 : fn from(e: GetActiveTimelineError) -> Self {
2373 0 : match e {
2374 0 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => QueryError::Shutdown,
2375 0 : GetActiveTimelineError::Tenant(e) => e.into(),
2376 0 : GetActiveTimelineError::Timeline(e) => QueryError::NotFound(format!("{e}").into()),
2377 : }
2378 0 : }
2379 : }
2380 :
2381 0 : fn set_tracing_field_shard_id(timeline: &Timeline) {
2382 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
2383 0 : tracing::Span::current().record(
2384 0 : "shard_id",
2385 0 : tracing::field::display(timeline.tenant_shard_id.shard_slug()),
2386 0 : );
2387 0 : debug_assert_current_span_has_tenant_and_timeline_id();
2388 0 : }
2389 :
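/// Newtype wrapper marking an [`Lsn`] that has already been waited for (i.e. the WAL
/// up to it has arrived), letting downstream code require that guarantee in the type
/// system rather than accepting a bare [`Lsn`].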
2390 : struct WaitedForLsn(Lsn);
2391 : impl From<WaitedForLsn> for Lsn {
2392 0 : fn from(WaitedForLsn(lsn): WaitedForLsn) -> Self {
2393 0 : lsn
2394 0 : }
2395 : }
2396 :
2397 : #[cfg(test)]
2398 : mod tests {
2399 : use utils::shard::ShardCount;
2400 :
2401 : use super::*;
2402 :
2403 : #[test]
2404 2 : fn pageservice_cmd_parse() {
2405 2 : let tenant_id = TenantId::generate();
2406 2 : let timeline_id = TimelineId::generate();
2407 2 : let cmd =
2408 2 : PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id} {timeline_id}")).unwrap();
2409 2 : assert_eq!(
2410 2 : cmd,
2411 2 : PageServiceCmd::PageStream(PageStreamCmd {
2412 2 : tenant_id,
2413 2 : timeline_id,
2414 2 : protocol_version: PagestreamProtocolVersion::V2,
2415 2 : })
2416 2 : );
2417 2 : let cmd = PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id}")).unwrap();
2418 2 : assert_eq!(
2419 2 : cmd,
2420 2 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2421 2 : tenant_id,
2422 2 : timeline_id,
2423 2 : lsn: None,
2424 2 : gzip: false,
2425 2 : replica: false
2426 2 : })
2427 2 : );
2428 2 : let cmd =
2429 2 : PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} --gzip")).unwrap();
2430 2 : assert_eq!(
2431 2 : cmd,
2432 2 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2433 2 : tenant_id,
2434 2 : timeline_id,
2435 2 : lsn: None,
2436 2 : gzip: true,
2437 2 : replica: false
2438 2 : })
2439 2 : );
2440 2 : let cmd =
2441 2 : PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} latest")).unwrap();
2442 2 : assert_eq!(
2443 2 : cmd,
2444 2 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2445 2 : tenant_id,
2446 2 : timeline_id,
2447 2 : lsn: None,
2448 2 : gzip: false,
2449 2 : replica: false
2450 2 : })
2451 2 : );
2452 2 : let cmd = PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} 0/16ABCDE"))
2453 2 : .unwrap();
2454 2 : assert_eq!(
2455 2 : cmd,
2456 2 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2457 2 : tenant_id,
2458 2 : timeline_id,
2459 2 : lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
2460 2 : gzip: false,
2461 2 : replica: false
2462 2 : })
2463 2 : );
2464 2 : let cmd = PageServiceCmd::parse(&format!(
2465 2 : "basebackup {tenant_id} {timeline_id} --replica --gzip"
2466 2 : ))
2467 2 : .unwrap();
2468 2 : assert_eq!(
2469 2 : cmd,
2470 2 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2471 2 : tenant_id,
2472 2 : timeline_id,
2473 2 : lsn: None,
2474 2 : gzip: true,
2475 2 : replica: true
2476 2 : })
2477 2 : );
2478 2 : let cmd = PageServiceCmd::parse(&format!(
2479 2 : "basebackup {tenant_id} {timeline_id} 0/16ABCDE --replica --gzip"
2480 2 : ))
2481 2 : .unwrap();
2482 2 : assert_eq!(
2483 2 : cmd,
2484 2 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2485 2 : tenant_id,
2486 2 : timeline_id,
2487 2 : lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
2488 2 : gzip: true,
2489 2 : replica: true
2490 2 : })
2491 2 : );
2492 2 : let cmd = PageServiceCmd::parse(&format!("fullbackup {tenant_id} {timeline_id}")).unwrap();
2493 2 : assert_eq!(
2494 2 : cmd,
2495 2 : PageServiceCmd::FullBackup(FullBackupCmd {
2496 2 : tenant_id,
2497 2 : timeline_id,
2498 2 : lsn: None,
2499 2 : prev_lsn: None
2500 2 : })
2501 2 : );
2502 2 : let cmd = PageServiceCmd::parse(&format!(
2503 2 : "fullbackup {tenant_id} {timeline_id} 0/16ABCDE 0/16ABCDF"
2504 2 : ))
2505 2 : .unwrap();
2506 2 : assert_eq!(
2507 2 : cmd,
2508 2 : PageServiceCmd::FullBackup(FullBackupCmd {
2509 2 : tenant_id,
2510 2 : timeline_id,
2511 2 : lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
2512 2 : prev_lsn: Some(Lsn::from_str("0/16ABCDF").unwrap()),
2513 2 : })
2514 2 : );
2515 2 : let tenant_shard_id = TenantShardId::unsharded(tenant_id);
2516 2 : let cmd = PageServiceCmd::parse(&format!(
2517 2 : "lease lsn {tenant_shard_id} {timeline_id} 0/16ABCDE"
2518 2 : ))
2519 2 : .unwrap();
2520 2 : assert_eq!(
2521 2 : cmd,
2522 2 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
2523 2 : tenant_shard_id,
2524 2 : timeline_id,
2525 2 : lsn: Lsn::from_str("0/16ABCDE").unwrap(),
2526 2 : })
2527 2 : );
2528 2 : let tenant_shard_id = TenantShardId::split(&tenant_shard_id, ShardCount(8))[1];
2529 2 : let cmd = PageServiceCmd::parse(&format!(
2530 2 : "lease lsn {tenant_shard_id} {timeline_id} 0/16ABCDE"
2531 2 : ))
2532 2 : .unwrap();
2533 2 : assert_eq!(
2534 2 : cmd,
2535 2 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
2536 2 : tenant_shard_id,
2537 2 : timeline_id,
2538 2 : lsn: Lsn::from_str("0/16ABCDE").unwrap(),
2539 2 : })
2540 2 : );
2541 2 : let cmd = PageServiceCmd::parse("set a = b").unwrap();
2542 2 : assert_eq!(cmd, PageServiceCmd::Set);
2543 2 : let cmd = PageServiceCmd::parse("SET foo").unwrap();
2544 2 : assert_eq!(cmd, PageServiceCmd::Set);
2545 2 : }
2546 :
2547 : #[test]
2548 2 : fn pageservice_cmd_err_handling() {
2549 2 : let tenant_id = TenantId::generate();
2550 2 : let timeline_id = TimelineId::generate();
2551 2 : let cmd = PageServiceCmd::parse("unknown_command");
2552 2 : assert!(cmd.is_err());
2553 2 : let cmd = PageServiceCmd::parse("pagestream_v2");
2554 2 : assert!(cmd.is_err());
2555 2 : let cmd = PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id}xxx"));
2556 2 : assert!(cmd.is_err());
2557 2 : let cmd = PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id}xxx {timeline_id}xxx"));
2558 2 : assert!(cmd.is_err());
2559 2 : let cmd = PageServiceCmd::parse(&format!(
2560 2 : "basebackup {tenant_id} {timeline_id} --gzip --gzip"
2561 2 : ));
2562 2 : assert!(cmd.is_err());
2563 2 : let cmd = PageServiceCmd::parse(&format!(
2564 2 : "basebackup {tenant_id} {timeline_id} --gzip --unknown"
2565 2 : ));
2566 2 : assert!(cmd.is_err());
2567 2 : let cmd = PageServiceCmd::parse(&format!(
2568 2 : "basebackup {tenant_id} {timeline_id} --gzip 0/16ABCDE"
2569 2 : ));
2570 2 : assert!(cmd.is_err());
2571 2 : let cmd = PageServiceCmd::parse(&format!("lease {tenant_id} {timeline_id} gzip 0/16ABCDE"));
2572 2 : assert!(cmd.is_err());
2573 2 : }
2574 : }