Line data Source code
1 : //! The Page Service listens for client connections and serves their GetPage@LSN
2 : //! requests.
3 :
4 : use anyhow::{bail, Context};
5 : use async_compression::tokio::write::GzipEncoder;
6 : use bytes::Buf;
7 : use futures::FutureExt;
8 : use itertools::Itertools;
9 : use once_cell::sync::OnceCell;
10 : use pageserver_api::models::TenantState;
11 : use pageserver_api::models::{
12 : PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
13 : PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
14 : PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
15 : PagestreamGetSlruSegmentRequest, PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest,
16 : PagestreamNblocksResponse, PagestreamProtocolVersion,
17 : };
18 : use pageserver_api::shard::TenantShardId;
19 : use postgres_backend::{is_expected_io_error, AuthType, PostgresBackend, QueryError};
20 : use pq_proto::framed::ConnectionError;
21 : use pq_proto::FeStartupPacket;
22 : use pq_proto::{BeMessage, FeMessage, RowDescriptor};
23 : use std::borrow::Cow;
24 : use std::io;
25 : use std::str;
26 : use std::str::FromStr;
27 : use std::sync::Arc;
28 : use std::time::SystemTime;
29 : use std::time::{Duration, Instant};
30 : use tokio::io::{AsyncRead, AsyncWrite};
31 : use tokio::io::{AsyncWriteExt, BufWriter};
32 : use tokio::task::JoinHandle;
33 : use tokio_util::sync::CancellationToken;
34 : use tracing::*;
35 : use utils::{
36 : auth::{Claims, Scope, SwappableJwtAuth},
37 : id::{TenantId, TimelineId},
38 : lsn::Lsn,
39 : simple_rcu::RcuReadGuard,
40 : };
41 :
42 : use crate::auth::check_permission;
43 : use crate::basebackup;
44 : use crate::basebackup::BasebackupError;
45 : use crate::config::PageServerConf;
46 : use crate::context::{DownloadBehavior, RequestContext};
47 : use crate::metrics;
48 : use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS};
49 : use crate::pgdatadir_mapping::Version;
50 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
51 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
52 : use crate::task_mgr::TaskKind;
53 : use crate::task_mgr::{self, COMPUTE_REQUEST_RUNTIME};
54 : use crate::tenant::mgr::ShardSelector;
55 : use crate::tenant::mgr::TenantManager;
56 : use crate::tenant::mgr::{GetActiveTenantError, GetTenantError, ShardResolveResult};
57 : use crate::tenant::timeline::{self, WaitLsnError};
58 : use crate::tenant::GetTimelineError;
59 : use crate::tenant::PageReconstructError;
60 : use crate::tenant::Timeline;
61 : use pageserver_api::key::rel_block_to_key;
62 : use pageserver_api::reltag::SlruKind;
63 : use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
64 : use postgres_ffi::BLCKSZ;
65 :
66 : /// How long we may wait for a [`crate::tenant::mgr::TenantSlot::InProgress`] and/or a [`crate::tenant::Tenant`] which
67 : /// is not yet in state [`TenantState::Active`].
68 : ///
69 : /// NB: this is a different value than [`crate::http::routes::ACTIVE_TENANT_TIMEOUT`].
70 : const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
71 :
72 : ///////////////////////////////////////////////////////////////////////////////
73 :
74 : pub struct Listener {
75 : cancel: CancellationToken,
76 : /// Cancel the listener task through `cancel` (see [`Listener::stop_accepting`]) to shut down
77 : /// the listener and get a handle on the existing connections.
78 : task: JoinHandle<Connections>,
79 : }
80 :
81 : pub struct Connections {
82 : cancel: CancellationToken,
83 : tasks: tokio::task::JoinSet<ConnectionHandlerResult>,
84 : }
85 :
86 0 : pub fn spawn(
87 0 : conf: &'static PageServerConf,
88 0 : tenant_manager: Arc<TenantManager>,
89 0 : pg_auth: Option<Arc<SwappableJwtAuth>>,
90 0 : tcp_listener: tokio::net::TcpListener,
91 0 : ) -> Listener {
92 0 : let cancel = CancellationToken::new();
93 0 : let libpq_ctx = RequestContext::todo_child(
94 0 : TaskKind::LibpqEndpointListener,
95 0 : // The listener task shouldn't need to download anything. (We will
96 0 : // create a separate sub-context for each connection, with its
97 0 : // own download behavior. This context is used only to listen and
98 0 : // accept connections.)
99 0 : DownloadBehavior::Error,
100 0 : );
101 0 : let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
102 0 : "libpq listener",
103 0 : libpq_listener_main(
104 0 : tenant_manager,
105 0 : pg_auth,
106 0 : tcp_listener,
107 0 : conf.pg_auth_type,
108 0 : libpq_ctx,
109 0 : cancel.clone(),
110 0 : )
111 0 : .map(anyhow::Ok),
112 0 : ));
113 0 :
114 0 : Listener { cancel, task }
115 0 : }
116 :
117 : impl Listener {
118 0 : pub async fn stop_accepting(self) -> Connections {
119 0 : self.cancel.cancel();
120 0 : self.task
121 0 : .await
122 0 : .expect("unreachable: we wrap the listener task in task_mgr::exit_on_panic_or_error")
123 0 : }
124 : }
125 : impl Connections {
126 0 : pub(crate) async fn shutdown(self) {
127 0 : let Self { cancel, mut tasks } = self;
128 0 : cancel.cancel();
129 0 : while let Some(res) = tasks.join_next().await {
130 0 : Self::handle_connection_completion(res);
131 0 : }
132 0 : }
133 :
134 0 : fn handle_connection_completion(res: Result<anyhow::Result<()>, tokio::task::JoinError>) {
135 0 : match res {
136 0 : Ok(Ok(())) => {}
137 0 : Ok(Err(e)) => error!("error in page_service connection task: {:?}", e),
138 0 : Err(e) => error!("page_service connection task panicked: {:?}", e),
139 : }
140 0 : }
141 : }
142 :
143 : ///
144 : /// Main loop of the page service.
145 : ///
146 : /// Listens for connections, and launches a new handler task for each.
147 : ///
148 : /// Returns the set of still-open connections upon cancellation via
149 : /// `listener_cancel`.
150 : ///
151 0 : pub async fn libpq_listener_main(
152 0 : tenant_manager: Arc<TenantManager>,
153 0 : auth: Option<Arc<SwappableJwtAuth>>,
154 0 : listener: tokio::net::TcpListener,
155 0 : auth_type: AuthType,
156 0 : listener_ctx: RequestContext,
157 0 : listener_cancel: CancellationToken,
158 0 : ) -> Connections {
159 0 : let connections_cancel = CancellationToken::new();
160 0 : let mut connection_handler_tasks = tokio::task::JoinSet::default();
161 :
162 : loop {
163 0 : let accepted = tokio::select! {
164 : biased;
165 0 : _ = listener_cancel.cancelled() => break,
166 0 : next = connection_handler_tasks.join_next(), if !connection_handler_tasks.is_empty() => {
167 0 : let res = next.expect("we dont poll while empty");
168 0 : Connections::handle_connection_completion(res);
169 0 : continue;
170 : }
171 0 : accepted = listener.accept() => accepted,
172 0 : };
173 0 :
174 0 : match accepted {
175 0 : Ok((socket, peer_addr)) => {
176 0 : // Connection established. Spawn a new task to handle it.
177 0 : debug!("accepted connection from {}", peer_addr);
178 0 : let local_auth = auth.clone();
179 0 : let connection_ctx = listener_ctx
180 0 : .detached_child(TaskKind::PageRequestHandler, DownloadBehavior::Download);
181 0 : connection_handler_tasks.spawn(page_service_conn_main(
182 0 : tenant_manager.clone(),
183 0 : local_auth,
184 0 : socket,
185 0 : auth_type,
186 0 : connection_ctx,
187 0 : connections_cancel.child_token(),
188 0 : ));
189 : }
190 0 : Err(err) => {
191 0 : // accept() failed. Log the error, and loop back to retry on next connection.
192 0 : error!("accept() failed: {:?}", err);
193 : }
194 : }
195 : }
196 :
197 0 : debug!("page_service listener loop terminated");
198 :
199 0 : Connections {
200 0 : cancel: connections_cancel,
201 0 : tasks: connection_handler_tasks,
202 0 : }
203 0 : }
204 :
205 : type ConnectionHandlerResult = anyhow::Result<()>;
206 :
207 0 : #[instrument(skip_all, fields(peer_addr))]
208 : async fn page_service_conn_main(
209 : tenant_manager: Arc<TenantManager>,
210 : auth: Option<Arc<SwappableJwtAuth>>,
211 : socket: tokio::net::TcpStream,
212 : auth_type: AuthType,
213 : connection_ctx: RequestContext,
214 : cancel: CancellationToken,
215 : ) -> ConnectionHandlerResult {
216 : let _guard = LIVE_CONNECTIONS
217 : .with_label_values(&["page_service"])
218 : .guard();
219 :
220 : socket
221 : .set_nodelay(true)
222 : .context("could not set TCP_NODELAY")?;
223 :
224 : let peer_addr = socket.peer_addr().context("get peer address")?;
225 : tracing::Span::current().record("peer_addr", field::display(peer_addr));
226 :
227 : // Set up a read timeout of 10 minutes. The exact value is somewhat arbitrary; the requirements are:
228 : // - long enough for most valid compute connections
229 : // - less than infinite to stop us from "leaking" connections to long-gone computes
230 : //
231 : // No write timeout is used, because the kernel is assumed to error out writes after some time.
232 : let mut socket = tokio_io_timeout::TimeoutReader::new(socket);
233 :
234 : let default_timeout_ms = 10 * 60 * 1000; // 10 minutes by default
235 0 : let socket_timeout_ms = (|| {
236 0 : fail::fail_point!("simulated-bad-compute-connection", |avg_timeout_ms| {
237 : // Use an exponential distribution to simulate poor network
238 : // conditions; avg_timeout_ms is expected to be around 15
239 : // in tests.
240 0 : if let Some(avg_timeout_ms) = avg_timeout_ms {
241 0 : let avg = avg_timeout_ms.parse::<i64>().unwrap() as f32;
242 0 : let u = rand::random::<f32>();
243 0 : ((1.0 - u).ln() / (-avg)) as u64
244 : } else {
245 0 : default_timeout_ms
246 : }
247 0 : });
248 0 : default_timeout_ms
249 : })();
250 :
251 : // A timeout here does not mean the client died; it can also happen if it's just idle for
252 : // a while: we will tear down this PageServerHandler and instantiate a new one if/when
253 : // they reconnect.
254 : socket.set_timeout(Some(std::time::Duration::from_millis(socket_timeout_ms)));
255 : let socket = std::pin::pin!(socket);
256 :
257 : fail::fail_point!("ps::connection-start::pre-login");
258 :
259 : // XXX: pgbackend.run() should take the connection_ctx,
260 : // and create a child per-query context when it invokes process_query.
261 : // But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
262 : // and create the per-query context in process_query ourselves.
263 : let mut conn_handler =
264 : PageServerHandler::new(tenant_manager, auth, connection_ctx, cancel.clone());
265 : let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
266 :
267 : match pgbackend.run(&mut conn_handler, &cancel).await {
268 : Ok(()) => {
269 : // we've been requested to shut down
270 : Ok(())
271 : }
272 : Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
273 : if is_expected_io_error(&io_error) {
274 : info!("Postgres client disconnected ({io_error})");
275 : Ok(())
276 : } else {
277 : let tenant_id = conn_handler.timeline_handles.tenant_id();
278 : Err(io_error).context(format!(
279 : "Postgres connection error for tenant_id={:?} client at peer_addr={}",
280 : tenant_id, peer_addr
281 : ))
282 : }
283 : }
284 : other => {
285 : let tenant_id = conn_handler.timeline_handles.tenant_id();
286 : other.context(format!(
287 : "Postgres query error for tenant_id={:?} client peer_addr={}",
288 : tenant_id, peer_addr
289 : ))
290 : }
291 : }
292 : }
293 :
294 : struct PageServerHandler {
295 : auth: Option<Arc<SwappableJwtAuth>>,
296 : claims: Option<Claims>,
297 :
298 : /// The context created for the lifetime of the connection
299 : /// serviced by this PageServerHandler.
300 : /// For each query received over the connection,
301 : /// `process_query` creates a child context from this one.
302 : connection_ctx: RequestContext,
303 :
304 : cancel: CancellationToken,
305 :
306 : timeline_handles: TimelineHandles,
307 : }
308 :
309 : struct TimelineHandles {
310 : wrapper: TenantManagerWrapper,
311 : /// Note on size: the typical size of this map is 1. The largest size we expect
312 : /// to see is the number of shards divided by the number of pageservers (typically < 2),
313 : /// or the ratio used when splitting shards (i.e. how many children are created from one
314 : /// parent shard), where a "large" number might be ~8.
315 : handles: timeline::handle::Cache<TenantManagerTypes>,
316 : }
317 :
318 : impl TimelineHandles {
319 0 : fn new(tenant_manager: Arc<TenantManager>) -> Self {
320 0 : Self {
321 0 : wrapper: TenantManagerWrapper {
322 0 : tenant_manager,
323 0 : tenant_id: OnceCell::new(),
324 0 : },
325 0 : handles: Default::default(),
326 0 : }
327 0 : }
328 0 : async fn get(
329 0 : &mut self,
330 0 : tenant_id: TenantId,
331 0 : timeline_id: TimelineId,
332 0 : shard_selector: ShardSelector,
333 0 : ) -> Result<timeline::handle::Handle<TenantManagerTypes>, GetActiveTimelineError> {
334 0 : if *self.wrapper.tenant_id.get_or_init(|| tenant_id) != tenant_id {
335 0 : return Err(GetActiveTimelineError::Tenant(
336 0 : GetActiveTenantError::SwitchedTenant,
337 0 : ));
338 0 : }
339 0 : self.handles
340 0 : .get(timeline_id, shard_selector, &self.wrapper)
341 0 : .await
342 0 : .map_err(|e| match e {
343 0 : timeline::handle::GetError::TenantManager(e) => e,
344 : timeline::handle::GetError::TimelineGateClosed => {
345 0 : trace!("timeline gate closed");
346 0 : GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown)
347 : }
348 : timeline::handle::GetError::PerTimelineStateShutDown => {
349 0 : trace!("per-timeline state shut down");
350 0 : GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown)
351 : }
352 0 : })
353 0 : }
354 :
355 0 : fn tenant_id(&self) -> Option<TenantId> {
356 0 : self.wrapper.tenant_id.get().copied()
357 0 : }
358 : }
359 :
360 : pub(crate) struct TenantManagerWrapper {
361 : tenant_manager: Arc<TenantManager>,
362 : // We do not support switching tenant_id on a connection at this point.
363 : // We can add support for this later if needed without changing
364 : // the protocol.
365 : tenant_id: once_cell::sync::OnceCell<TenantId>,
366 : }
367 :
368 : #[derive(Debug)]
369 : pub(crate) struct TenantManagerTypes;
370 :
371 : impl timeline::handle::Types for TenantManagerTypes {
372 : type TenantManagerError = GetActiveTimelineError;
373 : type TenantManager = TenantManagerWrapper;
374 : type Timeline = Arc<Timeline>;
375 : }
376 :
377 : impl timeline::handle::ArcTimeline<TenantManagerTypes> for Arc<Timeline> {
378 0 : fn gate(&self) -> &utils::sync::gate::Gate {
379 0 : &self.gate
380 0 : }
381 :
382 0 : fn shard_timeline_id(&self) -> timeline::handle::ShardTimelineId {
383 0 : Timeline::shard_timeline_id(self)
384 0 : }
385 :
386 0 : fn per_timeline_state(&self) -> &timeline::handle::PerTimelineState<TenantManagerTypes> {
387 0 : &self.handles
388 0 : }
389 :
390 0 : fn get_shard_identity(&self) -> &pageserver_api::shard::ShardIdentity {
391 0 : Timeline::get_shard_identity(self)
392 0 : }
393 : }
394 :
395 : impl timeline::handle::TenantManager<TenantManagerTypes> for TenantManagerWrapper {
396 0 : async fn resolve(
397 0 : &self,
398 0 : timeline_id: TimelineId,
399 0 : shard_selector: ShardSelector,
400 0 : ) -> Result<Arc<Timeline>, GetActiveTimelineError> {
401 0 : let tenant_id = self.tenant_id.get().expect("we set this in get()");
402 0 : let timeout = ACTIVE_TENANT_TIMEOUT;
403 0 : let wait_start = Instant::now();
404 0 : let deadline = wait_start + timeout;
405 0 : let tenant_shard = loop {
406 0 : let resolved = self
407 0 : .tenant_manager
408 0 : .resolve_attached_shard(tenant_id, shard_selector);
409 0 : match resolved {
410 0 : ShardResolveResult::Found(tenant_shard) => break tenant_shard,
411 : ShardResolveResult::NotFound => {
412 0 : return Err(GetActiveTimelineError::Tenant(
413 0 : GetActiveTenantError::NotFound(GetTenantError::NotFound(*tenant_id)),
414 0 : ));
415 : }
416 0 : ShardResolveResult::InProgress(barrier) => {
417 0 : // We can't authoritatively answer right now: wait for InProgress state
418 0 : // to end, then try again
419 0 : tokio::select! {
420 0 : _ = barrier.wait() => {
421 0 : // The barrier completed: proceed around the loop to try looking up again
422 0 : },
423 0 : _ = tokio::time::sleep(deadline.duration_since(Instant::now())) => {
424 0 : return Err(GetActiveTimelineError::Tenant(GetActiveTenantError::WaitForActiveTimeout {
425 0 : latest_state: None,
426 0 : wait_time: timeout,
427 0 : }));
428 : }
429 : }
430 : }
431 : };
432 : };
433 :
434 0 : tracing::debug!("Waiting for tenant to enter active state...");
435 0 : tenant_shard
436 0 : .wait_to_become_active(deadline.duration_since(Instant::now()))
437 0 : .await
438 0 : .map_err(GetActiveTimelineError::Tenant)?;
439 :
440 0 : let timeline = tenant_shard
441 0 : .get_timeline(timeline_id, true)
442 0 : .map_err(GetActiveTimelineError::Timeline)?;
443 0 : set_tracing_field_shard_id(&timeline);
444 0 : Ok(timeline)
445 0 : }
446 : }
447 :
448 0 : #[derive(thiserror::Error, Debug)]
449 : enum PageStreamError {
450 : /// We encountered an error that should prompt the client to reconnect:
451 : /// in practice this means we drop the connection without sending a response.
452 : #[error("Reconnect required: {0}")]
453 : Reconnect(Cow<'static, str>),
454 :
455 : /// We were instructed to shutdown while processing the query
456 : #[error("Shutting down")]
457 : Shutdown,
458 :
459 : /// Something went wrong reading a page: this likely indicates a pageserver bug
460 : #[error("Read error")]
461 : Read(#[source] PageReconstructError),
462 :
463 : /// Ran out of time waiting for an LSN
464 : #[error("LSN timeout: {0}")]
465 : LsnTimeout(WaitLsnError),
466 :
467 : /// The entity required to serve the request (tenant or timeline) is not found,
468 : /// or is not found in a suitable state to serve a request.
469 : #[error("Not found: {0}")]
470 : NotFound(Cow<'static, str>),
471 :
472 : /// Request asked for something that doesn't make sense, like an invalid LSN
473 : #[error("Bad request: {0}")]
474 : BadRequest(Cow<'static, str>),
475 : }
476 :
477 : impl From<PageReconstructError> for PageStreamError {
478 0 : fn from(value: PageReconstructError) -> Self {
479 0 : match value {
480 0 : PageReconstructError::Cancelled => Self::Shutdown,
481 0 : e => Self::Read(e),
482 : }
483 0 : }
484 : }
485 :
486 : impl From<GetActiveTimelineError> for PageStreamError {
487 0 : fn from(value: GetActiveTimelineError) -> Self {
488 0 : match value {
489 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled)
490 : | GetActiveTimelineError::Tenant(GetActiveTenantError::WillNotBecomeActive(
491 : TenantState::Stopping { .. },
492 : ))
493 0 : | GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown) => Self::Shutdown,
494 0 : GetActiveTimelineError::Tenant(e) => Self::NotFound(format!("{e}").into()),
495 0 : GetActiveTimelineError::Timeline(e) => Self::NotFound(format!("{e}").into()),
496 : }
497 0 : }
498 : }
499 :
500 : impl From<WaitLsnError> for PageStreamError {
501 0 : fn from(value: WaitLsnError) -> Self {
502 0 : match value {
503 0 : e @ WaitLsnError::Timeout(_) => Self::LsnTimeout(e),
504 0 : WaitLsnError::Shutdown => Self::Shutdown,
505 0 : e @ WaitLsnError::BadState { .. } => Self::Reconnect(format!("{e}").into()),
506 : }
507 0 : }
508 : }
509 :
510 : impl From<WaitLsnError> for QueryError {
511 0 : fn from(value: WaitLsnError) -> Self {
512 0 : match value {
513 0 : e @ WaitLsnError::Timeout(_) => Self::Other(anyhow::Error::new(e)),
514 0 : WaitLsnError::Shutdown => Self::Shutdown,
515 0 : WaitLsnError::BadState { .. } => Self::Reconnect,
516 : }
517 0 : }
518 : }
519 :
520 : impl PageServerHandler {
521 0 : pub fn new(
522 0 : tenant_manager: Arc<TenantManager>,
523 0 : auth: Option<Arc<SwappableJwtAuth>>,
524 0 : connection_ctx: RequestContext,
525 0 : cancel: CancellationToken,
526 0 : ) -> Self {
527 0 : PageServerHandler {
528 0 : auth,
529 0 : claims: None,
530 0 : connection_ctx,
531 0 : timeline_handles: TimelineHandles::new(tenant_manager),
532 0 : cancel,
533 0 : }
534 0 : }
535 :
536 : /// Flush the `pgb` write buffer, returning [`QueryError::Shutdown`] if the supplied
537 : /// cancellation token fires first.
538 : ///
539 : /// Callers pass in the token appropriate for the current scope: the connection-level
540 : /// [`Self::cancel`] token, or a timeline's cancellation token when the flush is tied
541 : /// to a specific timeline.
542 0 : async fn flush_cancellable<IO>(
543 0 : &self,
544 0 : pgb: &mut PostgresBackend<IO>,
545 0 : cancel: &CancellationToken,
546 0 : ) -> Result<(), QueryError>
547 0 : where
548 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
549 0 : {
550 0 : tokio::select!(
551 0 : flush_r = pgb.flush() => {
552 0 : Ok(flush_r?)
553 : },
554 0 : _ = cancel.cancelled() => {
555 0 : Err(QueryError::Shutdown)
556 : }
557 : )
558 0 : }
559 :
560 : /// Pagestream sub-protocol handler.
561 : ///
562 : /// It is a simple request-response protocol inside a COPYBOTH session.
563 : ///
564 : /// # Coding Discipline
565 : ///
566 : /// Coding discipline within this function: all interaction with the `pgb` connection
567 : /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
568 : /// This is so that we can shutdown page_service quickly.
569 0 : #[instrument(skip_all)]
570 : async fn handle_pagerequests<IO>(
571 : &mut self,
572 : pgb: &mut PostgresBackend<IO>,
573 : tenant_id: TenantId,
574 : timeline_id: TimelineId,
575 : _protocol_version: PagestreamProtocolVersion,
576 : ctx: RequestContext,
577 : ) -> Result<(), QueryError>
578 : where
579 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
580 : {
581 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
582 :
583 : // switch client to COPYBOTH
584 : pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
585 : tokio::select! {
586 : biased;
587 : _ = self.cancel.cancelled() => {
588 : return Err(QueryError::Shutdown)
589 : }
590 : res = pgb.flush() => {
591 : res?;
592 : }
593 : }
594 :
595 : loop {
596 : // read request bytes (it's exactly 1 PagestreamFeMessage per CopyData)
597 : let msg = tokio::select! {
598 : biased;
599 : _ = self.cancel.cancelled() => {
600 : return Err(QueryError::Shutdown)
601 : }
602 : msg = pgb.read_message() => { msg }
603 : };
604 : let copy_data_bytes = match msg? {
605 : Some(FeMessage::CopyData(bytes)) => bytes,
606 : Some(FeMessage::Terminate) => break,
607 : Some(m) => {
608 : return Err(QueryError::Other(anyhow::anyhow!(
609 : "unexpected message: {m:?} during COPY"
610 : )));
611 : }
612 : None => break, // client disconnected
613 : };
614 :
615 : trace!("query: {copy_data_bytes:?}");
616 : fail::fail_point!("ps::handle-pagerequest-message");
617 :
618 : // parse request
619 : let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
620 :
621 : // invoke handler function
622 : let (handler_result, span) = match neon_fe_msg {
623 : PagestreamFeMessage::Exists(req) => {
624 : fail::fail_point!("ps::handle-pagerequest-message::exists");
625 : let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn);
626 : (
627 : self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx)
628 : .instrument(span.clone())
629 : .await,
630 : span,
631 : )
632 : }
633 : PagestreamFeMessage::Nblocks(req) => {
634 : fail::fail_point!("ps::handle-pagerequest-message::nblocks");
635 : let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn);
636 : (
637 : self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx)
638 : .instrument(span.clone())
639 : .await,
640 : span,
641 : )
642 : }
643 : PagestreamFeMessage::GetPage(req) => {
644 : fail::fail_point!("ps::handle-pagerequest-message::getpage");
645 : // shard_id is filled in by the handler
646 : let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.request_lsn);
647 : (
648 : self.handle_get_page_at_lsn_request(tenant_id, timeline_id, &req, &ctx)
649 : .instrument(span.clone())
650 : .await,
651 : span,
652 : )
653 : }
654 : PagestreamFeMessage::DbSize(req) => {
655 : fail::fail_point!("ps::handle-pagerequest-message::dbsize");
656 : let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn);
657 : (
658 : self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx)
659 : .instrument(span.clone())
660 : .await,
661 : span,
662 : )
663 : }
664 : PagestreamFeMessage::GetSlruSegment(req) => {
665 : fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
666 : let span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn);
667 : (
668 : self.handle_get_slru_segment_request(tenant_id, timeline_id, &req, &ctx)
669 : .instrument(span.clone())
670 : .await,
671 : span,
672 : )
673 : }
674 : };
675 :
676 : // Map handler result to protocol behavior.
677 : // Some handler errors cause exit from pagestream protocol.
678 : // Other handler errors are sent back as an error message and we stay in pagestream protocol.
679 : let response_msg = match handler_result {
680 : Err(e) => match &e {
681 : PageStreamError::Shutdown => {
682 : // If we fail to fulfil a request during shutdown, which may be _because_ of
683 : // shutdown, then do not send the error to the client. Instead just drop the
684 : // connection.
685 0 : span.in_scope(|| info!("dropping connection due to shutdown"));
686 : return Err(QueryError::Shutdown);
687 : }
688 : PageStreamError::Reconnect(reason) => {
689 0 : span.in_scope(|| info!("handler requested reconnect: {reason}"));
690 : return Err(QueryError::Reconnect);
691 : }
692 : PageStreamError::Read(_)
693 : | PageStreamError::LsnTimeout(_)
694 : | PageStreamError::NotFound(_)
695 : | PageStreamError::BadRequest(_) => {
696 : // Print all the details to the log with {:#}, but for the client the
697 : // error message is enough. Do not log if shutting down, as the anyhow::Error
698 : // here includes cancellation which is not an error.
699 : let full = utils::error::report_compact_sources(&e);
700 0 : span.in_scope(|| {
701 0 : error!("error reading relation or page version: {full:#}")
702 0 : });
703 : PagestreamBeMessage::Error(PagestreamErrorResponse {
704 : message: e.to_string(),
705 : })
706 : }
707 : },
708 : Ok(response_msg) => response_msg,
709 : };
710 :
711 : // marshal & transmit response message
712 : pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
713 : tokio::select! {
714 : biased;
715 : _ = self.cancel.cancelled() => {
716 : // We were requested to shut down.
717 : info!("shutdown request received in page handler");
718 : return Err(QueryError::Shutdown)
719 : }
720 : res = pgb.flush() => {
721 : res?;
722 : }
723 : }
724 : }
725 : Ok(())
726 : }
727 :
728 : /// Helper function to handle the LSN from client request.
729 : ///
730 : /// Each GetPage (and Exists and Nblocks) request includes information about
731 : /// which version of the page is being requested. The primary compute node
732 : /// will always request the latest page version, by setting 'request_lsn' to
733 : /// the last inserted or flushed WAL position, while a standby will request
734 : /// a version at the LSN that it's currently caught up to.
735 : ///
736 : /// In either case, if the page server hasn't received the WAL up to the
737 : /// requested LSN yet, we will wait for it to arrive. The return value is
738 : /// the LSN that should be used to look up the page versions.
739 : ///
740 : /// In addition to the request LSN, each request carries another LSN,
741 : /// 'not_modified_since', which is a hint to the pageserver that the client
742 : /// knows that the page has not been modified between 'not_modified_since'
743 : /// and the request LSN. This allows skipping the wait, as long as the WAL
744 : /// up to 'not_modified_since' has arrived. If the client doesn't have any
745 : /// information about when the page was modified, it will use
746 : /// not_modified_since == request_lsn. If the client lies and sends a not_modified_since
747 : /// that is too low, such that there are in fact later page versions, the
748 : /// behavior is undefined: the pageserver may return any of the page versions
749 : /// or an error. (A worked example follows this function.)
750 0 : async fn wait_or_get_last_lsn(
751 0 : timeline: &Timeline,
752 0 : request_lsn: Lsn,
753 0 : not_modified_since: Lsn,
754 0 : latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
755 0 : ctx: &RequestContext,
756 0 : ) -> Result<Lsn, PageStreamError> {
757 0 : let last_record_lsn = timeline.get_last_record_lsn();
758 0 :
759 0 : // Sanity check the request
760 0 : if request_lsn < not_modified_since {
761 0 : return Err(PageStreamError::BadRequest(
762 0 : format!(
763 0 : "invalid request with request LSN {} and not_modified_since {}",
764 0 : request_lsn, not_modified_since,
765 0 : )
766 0 : .into(),
767 0 : ));
768 0 : }
769 0 :
770 0 : if request_lsn < **latest_gc_cutoff_lsn {
771 0 : let gc_info = &timeline.gc_info.read().unwrap();
772 0 : if !gc_info.leases.contains_key(&request_lsn) {
773 : // The requested LSN is below gc cutoff and is not guarded by a lease.
774 :
775 : // Check explicitly for INVALID just to get a less scary error message if the
776 : // request is obviously bogus
777 0 : return Err(if request_lsn == Lsn::INVALID {
778 0 : PageStreamError::BadRequest("invalid LSN(0) in request".into())
779 : } else {
780 0 : PageStreamError::BadRequest(format!(
781 0 : "tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
782 0 : request_lsn, **latest_gc_cutoff_lsn
783 0 : ).into())
784 : });
785 0 : }
786 0 : }
787 :
788 : // Wait for WAL up to 'not_modified_since' to arrive, if necessary
789 0 : if not_modified_since > last_record_lsn {
790 0 : timeline
791 0 : .wait_lsn(
792 0 : not_modified_since,
793 0 : crate::tenant::timeline::WaitLsnWaiter::PageService,
794 0 : ctx,
795 0 : )
796 0 : .await?;
797 : // Since we waited for 'not_modified_since' to arrive, that is now the last
798 : // record LSN. (Or close enough for our purposes; the last-record LSN can
799 : // advance immediately after we return anyway)
800 0 : Ok(not_modified_since)
801 : } else {
802 : // It might be better to use max(not_modified_since, latest_gc_cutoff_lsn)
803 : // here instead. That would give the same result, since we know that there
804 : // haven't been any modifications since 'not_modified_since'. Using an older
805 : // LSN might be faster, because that could allow skipping recent layers when
806 : // finding the page. However, we have historically used 'last_record_lsn', so
807 : // stick to that for now.
808 0 : Ok(std::cmp::min(last_record_lsn, request_lsn))
809 : }
810 0 : }
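// Worked example for `wait_or_get_last_lsn` (values are illustrative, not from the
// original source). Suppose last_record_lsn = 0/5000 and the GC cutoff is 0/1000:
//
//   request_lsn = 0/6000, not_modified_since = 0/4000
//     -> no wait (0/4000 <= 0/5000); returns min(last_record_lsn, request_lsn) = 0/5000
//   request_lsn = 0/6000, not_modified_since = 0/6000
//     -> waits for WAL up to 0/6000 to arrive, then returns 0/6000
//   request_lsn = 0/3000, not_modified_since = 0/4000
//     -> BadRequest, because request_lsn < not_modified_since
//   request_lsn = 0/0500, not_modified_since = 0/0500
//     -> BadRequest, because request_lsn is below the GC cutoff and not covered by a lease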
811 :
812 : /// Handles the lsn lease request.
813 : /// If a lease cannot be obtained, the client will receive NULL.
814 0 : #[instrument(skip_all, fields(shard_id, %lsn))]
815 : async fn handle_make_lsn_lease<IO>(
816 : &mut self,
817 : pgb: &mut PostgresBackend<IO>,
818 : tenant_shard_id: TenantShardId,
819 : timeline_id: TimelineId,
820 : lsn: Lsn,
821 : ctx: &RequestContext,
822 : ) -> Result<(), QueryError>
823 : where
824 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
825 : {
826 : let timeline = self
827 : .timeline_handles
828 : .get(
829 : tenant_shard_id.tenant_id,
830 : timeline_id,
831 : ShardSelector::Known(tenant_shard_id.to_index()),
832 : )
833 : .await?;
834 : set_tracing_field_shard_id(&timeline);
835 :
836 : let lease = timeline
837 : .renew_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx)
838 0 : .inspect_err(|e| {
839 0 : warn!("{e}");
840 0 : })
841 : .ok();
842 0 : let valid_until_str = lease.map(|l| {
843 0 : l.valid_until
844 0 : .duration_since(SystemTime::UNIX_EPOCH)
845 0 : .expect("valid_until is earlier than UNIX_EPOCH")
846 0 : .as_millis()
847 0 : .to_string()
848 0 : });
849 0 : let bytes = valid_until_str.as_ref().map(|x| x.as_bytes());
850 :
851 : pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col(
852 : b"valid_until",
853 : )]))?
854 : .write_message_noflush(&BeMessage::DataRow(&[bytes]))?;
855 :
856 : Ok(())
857 : }
858 :
859 0 : #[instrument(skip_all, fields(shard_id))]
860 : async fn handle_get_rel_exists_request(
861 : &mut self,
862 : tenant_id: TenantId,
863 : timeline_id: TimelineId,
864 : req: &PagestreamExistsRequest,
865 : ctx: &RequestContext,
866 : ) -> Result<PagestreamBeMessage, PageStreamError> {
867 : let timeline = self
868 : .timeline_handles
869 : .get(tenant_id, timeline_id, ShardSelector::Zero)
870 : .await?;
871 : let _timer = timeline
872 : .query_metrics
873 : .start_timer(metrics::SmgrQueryType::GetRelExists, ctx);
874 :
875 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
876 : let lsn = Self::wait_or_get_last_lsn(
877 : &timeline,
878 : req.request_lsn,
879 : req.not_modified_since,
880 : &latest_gc_cutoff_lsn,
881 : ctx,
882 : )
883 : .await?;
884 :
885 : let exists = timeline
886 : .get_rel_exists(req.rel, Version::Lsn(lsn), ctx)
887 : .await?;
888 :
889 : Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
890 : exists,
891 : }))
892 : }
893 :
894 0 : #[instrument(skip_all, fields(shard_id))]
895 : async fn handle_get_nblocks_request(
896 : &mut self,
897 : tenant_id: TenantId,
898 : timeline_id: TimelineId,
899 : req: &PagestreamNblocksRequest,
900 : ctx: &RequestContext,
901 : ) -> Result<PagestreamBeMessage, PageStreamError> {
902 : let timeline = self
903 : .timeline_handles
904 : .get(tenant_id, timeline_id, ShardSelector::Zero)
905 : .await?;
906 :
907 : let _timer = timeline
908 : .query_metrics
909 : .start_timer(metrics::SmgrQueryType::GetRelSize, ctx);
910 :
911 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
912 : let lsn = Self::wait_or_get_last_lsn(
913 : &timeline,
914 : req.request_lsn,
915 : req.not_modified_since,
916 : &latest_gc_cutoff_lsn,
917 : ctx,
918 : )
919 : .await?;
920 :
921 : let n_blocks = timeline
922 : .get_rel_size(req.rel, Version::Lsn(lsn), ctx)
923 : .await?;
924 :
925 : Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
926 : n_blocks,
927 : }))
928 : }
929 :
930 0 : #[instrument(skip_all, fields(shard_id))]
931 : async fn handle_db_size_request(
932 : &mut self,
933 : tenant_id: TenantId,
934 : timeline_id: TimelineId,
935 : req: &PagestreamDbSizeRequest,
936 : ctx: &RequestContext,
937 : ) -> Result<PagestreamBeMessage, PageStreamError> {
938 : let timeline = self
939 : .timeline_handles
940 : .get(tenant_id, timeline_id, ShardSelector::Zero)
941 : .await?;
942 :
943 : let _timer = timeline
944 : .query_metrics
945 : .start_timer(metrics::SmgrQueryType::GetDbSize, ctx);
946 :
947 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
948 : let lsn = Self::wait_or_get_last_lsn(
949 : &timeline,
950 : req.request_lsn,
951 : req.not_modified_since,
952 : &latest_gc_cutoff_lsn,
953 : ctx,
954 : )
955 : .await?;
956 :
957 : let total_blocks = timeline
958 : .get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, Version::Lsn(lsn), ctx)
959 : .await?;
960 : let db_size = total_blocks as i64 * BLCKSZ as i64;
961 :
962 : Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse {
963 : db_size,
964 : }))
965 : }
966 :
967 0 : #[instrument(skip_all, fields(shard_id))]
968 : async fn handle_get_page_at_lsn_request(
969 : &mut self,
970 : tenant_id: TenantId,
971 : timeline_id: TimelineId,
972 : req: &PagestreamGetPageRequest,
973 : ctx: &RequestContext,
974 : ) -> Result<PagestreamBeMessage, PageStreamError> {
975 : let timeline = match self
976 : .timeline_handles
977 : .get(
978 : tenant_id,
979 : timeline_id,
980 : ShardSelector::Page(rel_block_to_key(req.rel, req.blkno)),
981 : )
982 : .await
983 : {
984 : Ok(tl) => tl,
985 : Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
986 : // We already know this tenant exists in general, because we resolved it at
987 : // start of connection. Getting a NotFound here indicates that the shard containing
988 : // the requested page is not present on this node: the client's knowledge of shard->pageserver
989 : // mapping is out of date.
990 : //
991 : // Closing the connection by returning `PageStreamError::Reconnect` has the side effect of rate-limiting the above message, via the
992 : // client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
993 : // and talk to a different pageserver.
994 : return Err(PageStreamError::Reconnect(
995 : "getpage@lsn request routed to wrong shard".into(),
996 : ));
997 : }
998 : Err(e) => return Err(e.into()),
999 : };
1000 :
1001 : let _timer = timeline
1002 : .query_metrics
1003 : .start_timer(metrics::SmgrQueryType::GetPageAtLsn, ctx);
1004 :
1005 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1006 : let lsn = Self::wait_or_get_last_lsn(
1007 : &timeline,
1008 : req.request_lsn,
1009 : req.not_modified_since,
1010 : &latest_gc_cutoff_lsn,
1011 : ctx,
1012 : )
1013 : .await?;
1014 :
1015 : let page = timeline
1016 : .get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), ctx)
1017 : .await?;
1018 :
1019 : Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
1020 : page,
1021 : }))
1022 : }
1023 :
1024 0 : #[instrument(skip_all, fields(shard_id))]
1025 : async fn handle_get_slru_segment_request(
1026 : &mut self,
1027 : tenant_id: TenantId,
1028 : timeline_id: TimelineId,
1029 : req: &PagestreamGetSlruSegmentRequest,
1030 : ctx: &RequestContext,
1031 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1032 : let timeline = self
1033 : .timeline_handles
1034 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1035 : .await?;
1036 :
1037 : let _timer = timeline
1038 : .query_metrics
1039 : .start_timer(metrics::SmgrQueryType::GetSlruSegment, ctx);
1040 :
1041 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1042 : let lsn = Self::wait_or_get_last_lsn(
1043 : &timeline,
1044 : req.request_lsn,
1045 : req.not_modified_since,
1046 : &latest_gc_cutoff_lsn,
1047 : ctx,
1048 : )
1049 : .await?;
1050 :
1051 : let kind = SlruKind::from_repr(req.kind)
1052 : .ok_or(PageStreamError::BadRequest("invalid SLRU kind".into()))?;
1053 : let segment = timeline.get_slru_segment(kind, req.segno, lsn, ctx).await?;
1054 :
1055 : Ok(PagestreamBeMessage::GetSlruSegment(
1056 : PagestreamGetSlruSegmentResponse { segment },
1057 : ))
1058 : }
1059 :
1060 : /// Note on "fullbackup":
1061 : /// Full basebackups should only be used for debugging purposes.
1062 : /// Originally, the full backup option was introduced to enable breaking storage format changes,
1063 : /// but that is not applicable anymore.
1064 : ///
1065 : /// # Coding Discipline
1066 : ///
1067 : /// Coding discipline within this function: all interaction with the `pgb` connection
1068 : /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
1069 : /// This is so that we can shutdown page_service quickly.
1070 : ///
1071 : /// TODO: wrap the pgb that we pass to the basebackup handler so that it's sensitive
1072 : /// to connection cancellation.
1073 : #[allow(clippy::too_many_arguments)]
1074 0 : #[instrument(skip_all, fields(shard_id, ?lsn, ?prev_lsn, %full_backup))]
1075 : async fn handle_basebackup_request<IO>(
1076 : &mut self,
1077 : pgb: &mut PostgresBackend<IO>,
1078 : tenant_id: TenantId,
1079 : timeline_id: TimelineId,
1080 : lsn: Option<Lsn>,
1081 : prev_lsn: Option<Lsn>,
1082 : full_backup: bool,
1083 : gzip: bool,
1084 : replica: bool,
1085 : ctx: &RequestContext,
1086 : ) -> Result<(), QueryError>
1087 : where
1088 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1089 : {
1090 0 : fn map_basebackup_error(err: BasebackupError) -> QueryError {
1091 0 : match err {
1092 0 : BasebackupError::Client(e) => QueryError::Disconnected(ConnectionError::Io(e)),
1093 0 : BasebackupError::Server(e) => QueryError::Other(e),
1094 : }
1095 0 : }
1096 :
1097 : let started = std::time::Instant::now();
1098 :
1099 : let timeline = self
1100 : .timeline_handles
1101 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1102 : .await?;
1103 :
1104 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1105 : if let Some(lsn) = lsn {
1106 : // Backup was requested at a particular LSN. Wait for it to arrive.
1107 : info!("waiting for {}", lsn);
1108 : timeline
1109 : .wait_lsn(
1110 : lsn,
1111 : crate::tenant::timeline::WaitLsnWaiter::PageService,
1112 : ctx,
1113 : )
1114 : .await?;
1115 : timeline
1116 : .check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
1117 : .context("invalid basebackup lsn")?;
1118 : }
1119 :
1120 : let lsn_awaited_after = started.elapsed();
1121 :
1122 : // switch client to COPYOUT
1123 : pgb.write_message_noflush(&BeMessage::CopyOutResponse)
1124 : .map_err(QueryError::Disconnected)?;
1125 : self.flush_cancellable(pgb, &self.cancel).await?;
1126 :
1127 : // Send a tarball of the latest layer on the timeline. Compress if not
1128 : // fullbackup. TODO Compress in that case too (tests need to be updated)
1129 : if full_backup {
1130 : let mut writer = pgb.copyout_writer();
1131 : basebackup::send_basebackup_tarball(
1132 : &mut writer,
1133 : &timeline,
1134 : lsn,
1135 : prev_lsn,
1136 : full_backup,
1137 : replica,
1138 : ctx,
1139 : )
1140 : .await
1141 : .map_err(map_basebackup_error)?;
1142 : } else {
1143 : let mut writer = BufWriter::new(pgb.copyout_writer());
1144 : if gzip {
1145 : let mut encoder = GzipEncoder::with_quality(
1146 : &mut writer,
1147 : // NOTE using fast compression because it's on the critical path
1148 : // for compute startup. For an empty database, we get
1149 : // <100KB with this method. The Level::Best compression method
1150 : // gives us <20KB, but maybe we should add basebackup caching
1151 : // on compute shutdown first.
1152 : async_compression::Level::Fastest,
1153 : );
1154 : basebackup::send_basebackup_tarball(
1155 : &mut encoder,
1156 : &timeline,
1157 : lsn,
1158 : prev_lsn,
1159 : full_backup,
1160 : replica,
1161 : ctx,
1162 : )
1163 : .await
1164 : .map_err(map_basebackup_error)?;
1165 : // shutdown the encoder to ensure the gzip footer is written
1166 : encoder
1167 : .shutdown()
1168 : .await
1169 0 : .map_err(|e| QueryError::Disconnected(ConnectionError::Io(e)))?;
1170 : } else {
1171 : basebackup::send_basebackup_tarball(
1172 : &mut writer,
1173 : &timeline,
1174 : lsn,
1175 : prev_lsn,
1176 : full_backup,
1177 : replica,
1178 : ctx,
1179 : )
1180 : .await
1181 : .map_err(map_basebackup_error)?;
1182 : }
1183 : writer
1184 : .flush()
1185 : .await
1186 0 : .map_err(|e| map_basebackup_error(BasebackupError::Client(e)))?;
1187 : }
1188 :
1189 : pgb.write_message_noflush(&BeMessage::CopyDone)
1190 : .map_err(QueryError::Disconnected)?;
1191 : self.flush_cancellable(pgb, &timeline.cancel).await?;
1192 :
1193 : let basebackup_after = started
1194 : .elapsed()
1195 : .checked_sub(lsn_awaited_after)
1196 : .unwrap_or(Duration::ZERO);
1197 :
1198 : info!(
1199 : lsn_await_millis = lsn_awaited_after.as_millis(),
1200 : basebackup_millis = basebackup_after.as_millis(),
1201 : "basebackup complete"
1202 : );
1203 :
1204 : Ok(())
1205 : }
1206 :
1207 : // When accessing the management API, supply None as an argument.
1208 : // When authorizing a tenant, pass the corresponding tenant id.
1209 0 : fn check_permission(&self, tenant_id: Option<TenantId>) -> Result<(), QueryError> {
1210 0 : if self.auth.is_none() {
1211 : // auth is set to Trust, nothing to check so just return ok
1212 0 : return Ok(());
1213 0 : }
1214 0 : // auth is some, just checked above, when auth is some
1215 0 : // then claims are always present because of checks during connection init
1216 0 : // so this expect won't trigger
1217 0 : let claims = self
1218 0 : .claims
1219 0 : .as_ref()
1220 0 : .expect("claims presence already checked");
1221 0 : check_permission(claims, tenant_id).map_err(|e| QueryError::Unauthorized(e.0))
1222 0 : }
1223 : }
1224 :
1225 : /// `basebackup tenant timeline [lsn] [--gzip] [--replica]`
1226 : #[derive(Debug, Clone, Eq, PartialEq)]
1227 : struct BaseBackupCmd {
1228 : tenant_id: TenantId,
1229 : timeline_id: TimelineId,
1230 : lsn: Option<Lsn>,
1231 : gzip: bool,
1232 : replica: bool,
1233 : }
1234 :
1235 : /// `fullbackup tenant timeline [lsn] [prev_lsn]`
1236 : #[derive(Debug, Clone, Eq, PartialEq)]
1237 : struct FullBackupCmd {
1238 : tenant_id: TenantId,
1239 : timeline_id: TimelineId,
1240 : lsn: Option<Lsn>,
1241 : prev_lsn: Option<Lsn>,
1242 : }
1243 :
1244 : /// `pagestream_v2 tenant timeline`
1245 : #[derive(Debug, Clone, Eq, PartialEq)]
1246 : struct PageStreamCmd {
1247 : tenant_id: TenantId,
1248 : timeline_id: TimelineId,
1249 : }
1250 :
1251 : /// `lease lsn tenant timeline lsn`
1252 : #[derive(Debug, Clone, Eq, PartialEq)]
1253 : struct LeaseLsnCmd {
1254 : tenant_shard_id: TenantShardId,
1255 : timeline_id: TimelineId,
1256 : lsn: Lsn,
1257 : }
1258 :
1259 : #[derive(Debug, Clone, Eq, PartialEq)]
1260 : enum PageServiceCmd {
1261 : Set,
1262 : PageStream(PageStreamCmd),
1263 : BaseBackup(BaseBackupCmd),
1264 : FullBackup(FullBackupCmd),
1265 : LeaseLsn(LeaseLsnCmd),
1266 : }
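// Summary of query strings accepted by `PageServiceCmd::parse` (collected from the
// per-command docs above; bracketed parts are optional):
//
//   pagestream_v2 <tenant_id> <timeline_id>
//   basebackup    <tenant_id> <timeline_id> [<lsn> | latest] [--gzip] [--replica]
//   fullbackup    <tenant_id> <timeline_id> [<lsn>] [<prev_lsn>]
//   lease lsn     <tenant_shard_id> <timeline_id> <lsn>
//   set <anything>        (accepted and ignored; psycopg2 sends SET on connect)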
1267 :
1268 : impl PageStreamCmd {
1269 6 : fn parse(query: &str) -> anyhow::Result<Self> {
1270 6 : let parameters = query.split_whitespace().collect_vec();
1271 6 : if parameters.len() != 2 {
1272 2 : bail!(
1273 2 : "invalid number of parameters for pagestream command: {}",
1274 2 : query
1275 2 : );
1276 4 : }
1277 4 : let tenant_id = TenantId::from_str(parameters[0])
1278 4 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
1279 2 : let timeline_id = TimelineId::from_str(parameters[1])
1280 2 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
1281 2 : Ok(Self {
1282 2 : tenant_id,
1283 2 : timeline_id,
1284 2 : })
1285 6 : }
1286 : }
1287 :
1288 : impl FullBackupCmd {
1289 4 : fn parse(query: &str) -> anyhow::Result<Self> {
1290 4 : let parameters = query.split_whitespace().collect_vec();
1291 4 : if parameters.len() < 2 || parameters.len() > 4 {
1292 0 : bail!(
1293 0 : "invalid number of parameters for basebackup command: {}",
1294 0 : query
1295 0 : );
1296 4 : }
1297 4 : let tenant_id = TenantId::from_str(parameters[0])
1298 4 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
1299 4 : let timeline_id = TimelineId::from_str(parameters[1])
1300 4 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
1301 : // The caller is responsible for providing correct lsn and prev_lsn.
1302 4 : let lsn = if let Some(lsn_str) = parameters.get(2) {
1303 : Some(
1304 2 : Lsn::from_str(lsn_str)
1305 2 : .with_context(|| format!("Failed to parse Lsn from {lsn_str}"))?,
1306 : )
1307 : } else {
1308 2 : None
1309 : };
1310 4 : let prev_lsn = if let Some(prev_lsn_str) = parameters.get(3) {
1311 : Some(
1312 2 : Lsn::from_str(prev_lsn_str)
1313 2 : .with_context(|| format!("Failed to parse Lsn from {prev_lsn_str}"))?,
1314 : )
1315 : } else {
1316 2 : None
1317 : };
1318 4 : Ok(Self {
1319 4 : tenant_id,
1320 4 : timeline_id,
1321 4 : lsn,
1322 4 : prev_lsn,
1323 4 : })
1324 4 : }
1325 : }
1326 :
1327 : impl BaseBackupCmd {
1328 18 : fn parse(query: &str) -> anyhow::Result<Self> {
1329 18 : let parameters = query.split_whitespace().collect_vec();
1330 18 : if parameters.len() < 2 {
1331 0 : bail!(
1332 0 : "invalid number of parameters for basebackup command: {}",
1333 0 : query
1334 0 : );
1335 18 : }
1336 18 : let tenant_id = TenantId::from_str(parameters[0])
1337 18 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
1338 18 : let timeline_id = TimelineId::from_str(parameters[1])
1339 18 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
1340 : let lsn;
1341 : let flags_parse_from;
1342 18 : if let Some(maybe_lsn) = parameters.get(2) {
1343 16 : if *maybe_lsn == "latest" {
1344 2 : lsn = None;
1345 2 : flags_parse_from = 3;
1346 14 : } else if maybe_lsn.starts_with("--") {
1347 10 : lsn = None;
1348 10 : flags_parse_from = 2;
1349 10 : } else {
1350 : lsn = Some(
1351 4 : Lsn::from_str(maybe_lsn)
1352 4 : .with_context(|| format!("Failed to parse lsn from {maybe_lsn}"))?,
1353 : );
1354 4 : flags_parse_from = 3;
1355 : }
1356 2 : } else {
1357 2 : lsn = None;
1358 2 : flags_parse_from = 2;
1359 2 : }
1360 :
1361 18 : let mut gzip = false;
1362 18 : let mut replica = false;
1363 :
1364 22 : for ¶m in ¶meters[flags_parse_from..] {
1365 22 : match param {
1366 22 : "--gzip" => {
1367 14 : if gzip {
1368 2 : bail!("duplicate parameter for basebackup command: {param}")
1369 12 : }
1370 12 : gzip = true
1371 : }
1372 8 : "--replica" => {
1373 4 : if replica {
1374 0 : bail!("duplicate parameter for basebackup command: {param}")
1375 4 : }
1376 4 : replica = true
1377 : }
1378 4 : _ => bail!("invalid parameter for basebackup command: {param}"),
1379 : }
1380 : }
1381 12 : Ok(Self {
1382 12 : tenant_id,
1383 12 : timeline_id,
1384 12 : lsn,
1385 12 : gzip,
1386 12 : replica,
1387 12 : })
1388 18 : }
1389 : }
1390 :
1391 : impl LeaseLsnCmd {
1392 4 : fn parse(query: &str) -> anyhow::Result<Self> {
1393 4 : let parameters = query.split_whitespace().collect_vec();
1394 4 : if parameters.len() != 3 {
1395 0 : bail!(
1396 0 : "invalid number of parameters for lease lsn command: {}",
1397 0 : query
1398 0 : );
1399 4 : }
1400 4 : let tenant_shard_id = TenantShardId::from_str(parameters[0])
1401 4 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
1402 4 : let timeline_id = TimelineId::from_str(parameters[1])
1403 4 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
1404 4 : let lsn = Lsn::from_str(parameters[2])
1405 4 : .with_context(|| format!("Failed to parse lsn from {}", parameters[2]))?;
1406 4 : Ok(Self {
1407 4 : tenant_shard_id,
1408 4 : timeline_id,
1409 4 : lsn,
1410 4 : })
1411 4 : }
1412 : }
1413 :
1414 : impl PageServiceCmd {
1415 42 : fn parse(query: &str) -> anyhow::Result<Self> {
1416 42 : let query = query.trim();
1417 42 : let Some((cmd, other)) = query.split_once(' ') else {
1418 4 : bail!("cannot parse query: {query}")
1419 : };
1420 38 : match cmd.to_ascii_lowercase().as_str() {
1421 38 : "pagestream_v2" => Ok(Self::PageStream(PageStreamCmd::parse(other)?)),
1422 32 : "basebackup" => Ok(Self::BaseBackup(BaseBackupCmd::parse(other)?)),
1423 14 : "fullbackup" => Ok(Self::FullBackup(FullBackupCmd::parse(other)?)),
1424 10 : "lease" => {
1425 6 : let Some((cmd2, other)) = other.split_once(' ') else {
1426 0 : bail!("invalid lease command: {cmd}");
1427 : };
1428 6 : let cmd2 = cmd2.to_ascii_lowercase();
1429 6 : if cmd2 == "lsn" {
1430 4 : Ok(Self::LeaseLsn(LeaseLsnCmd::parse(other)?))
1431 : } else {
1432 2 : bail!("invalid lease command: {cmd}");
1433 : }
1434 : }
1435 4 : "set" => Ok(Self::Set),
1436 0 : _ => Err(anyhow::anyhow!("unsupported command {cmd} in {query}")),
1437 : }
1438 42 : }
1439 : }
1440 :
1441 : impl<IO> postgres_backend::Handler<IO> for PageServerHandler
1442 : where
1443 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1444 : {
1445 0 : fn check_auth_jwt(
1446 0 : &mut self,
1447 0 : _pgb: &mut PostgresBackend<IO>,
1448 0 : jwt_response: &[u8],
1449 0 : ) -> Result<(), QueryError> {
1450 : // this unwrap is never triggered, because check_auth_jwt only called when auth_type is NeonJWT
1451 : // which requires auth to be present
1452 0 : let data = self
1453 0 : .auth
1454 0 : .as_ref()
1455 0 : .unwrap()
1456 0 : .decode(str::from_utf8(jwt_response).context("jwt response is not UTF-8")?)
1457 0 : .map_err(|e| QueryError::Unauthorized(e.0))?;
1458 :
1459 0 : if matches!(data.claims.scope, Scope::Tenant) && data.claims.tenant_id.is_none() {
1460 0 : return Err(QueryError::Unauthorized(
1461 0 : "jwt token scope is Tenant, but tenant id is missing".into(),
1462 0 : ));
1463 0 : }
1464 0 :
1465 0 : debug!(
1466 0 : "jwt scope check succeeded for scope: {:#?} by tenant id: {:?}",
1467 : data.claims.scope, data.claims.tenant_id,
1468 : );
1469 :
1470 0 : self.claims = Some(data.claims);
1471 0 : Ok(())
1472 0 : }
1473 :
1474 0 : fn startup(
1475 0 : &mut self,
1476 0 : _pgb: &mut PostgresBackend<IO>,
1477 0 : _sm: &FeStartupPacket,
1478 0 : ) -> Result<(), QueryError> {
1479 0 : fail::fail_point!("ps::connection-start::startup-packet");
1480 0 : Ok(())
1481 0 : }
1482 :
1483 0 : #[instrument(skip_all, fields(tenant_id, timeline_id))]
1484 : async fn process_query(
1485 : &mut self,
1486 : pgb: &mut PostgresBackend<IO>,
1487 : query_string: &str,
1488 : ) -> Result<(), QueryError> {
1489 0 : fail::fail_point!("simulated-bad-compute-connection", |_| {
1490 0 : info!("Hit failpoint for bad connection");
1491 0 : Err(QueryError::SimulatedConnectionError)
1492 0 : });
1493 :
1494 : fail::fail_point!("ps::connection-start::process-query");
1495 :
1496 : let ctx = self.connection_ctx.attached_child();
1497 : debug!("process query {query_string}");
1498 : let query = PageServiceCmd::parse(query_string)?;
1499 : match query {
1500 : PageServiceCmd::PageStream(PageStreamCmd {
1501 : tenant_id,
1502 : timeline_id,
1503 : }) => {
1504 : tracing::Span::current()
1505 : .record("tenant_id", field::display(tenant_id))
1506 : .record("timeline_id", field::display(timeline_id));
1507 :
1508 : self.check_permission(Some(tenant_id))?;
1509 :
1510 : COMPUTE_COMMANDS_COUNTERS
1511 : .for_command(ComputeCommandKind::PageStreamV2)
1512 : .inc();
1513 :
1514 : self.handle_pagerequests(
1515 : pgb,
1516 : tenant_id,
1517 : timeline_id,
1518 : PagestreamProtocolVersion::V2,
1519 : ctx,
1520 : )
1521 : .await?;
1522 : }
1523 : PageServiceCmd::BaseBackup(BaseBackupCmd {
1524 : tenant_id,
1525 : timeline_id,
1526 : lsn,
1527 : gzip,
1528 : replica,
1529 : }) => {
1530 : tracing::Span::current()
1531 : .record("tenant_id", field::display(tenant_id))
1532 : .record("timeline_id", field::display(timeline_id));
1533 :
1534 : self.check_permission(Some(tenant_id))?;
1535 :
1536 : COMPUTE_COMMANDS_COUNTERS
1537 : .for_command(ComputeCommandKind::Basebackup)
1538 : .inc();
1539 : let metric_recording = metrics::BASEBACKUP_QUERY_TIME.start_recording(&ctx);
1540 0 : let res = async {
1541 0 : self.handle_basebackup_request(
1542 0 : pgb,
1543 0 : tenant_id,
1544 0 : timeline_id,
1545 0 : lsn,
1546 0 : None,
1547 0 : false,
1548 0 : gzip,
1549 0 : replica,
1550 0 : &ctx,
1551 0 : )
1552 0 : .await?;
1553 0 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1554 0 : Result::<(), QueryError>::Ok(())
1555 0 : }
1556 : .await;
1557 : metric_recording.observe(&res);
1558 : res?;
1559 : }
1560 : // same as basebackup, but result includes relational data as well
1561 : PageServiceCmd::FullBackup(FullBackupCmd {
1562 : tenant_id,
1563 : timeline_id,
1564 : lsn,
1565 : prev_lsn,
1566 : }) => {
1567 : tracing::Span::current()
1568 : .record("tenant_id", field::display(tenant_id))
1569 : .record("timeline_id", field::display(timeline_id));
1570 :
1571 : self.check_permission(Some(tenant_id))?;
1572 :
1573 : COMPUTE_COMMANDS_COUNTERS
1574 : .for_command(ComputeCommandKind::Fullbackup)
1575 : .inc();
1576 :
1577 : // Check that the timeline exists
1578 : self.handle_basebackup_request(
1579 : pgb,
1580 : tenant_id,
1581 : timeline_id,
1582 : lsn,
1583 : prev_lsn,
1584 : true,
1585 : false,
1586 : false,
1587 : &ctx,
1588 : )
1589 : .await?;
1590 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1591 : }
1592 : PageServiceCmd::Set => {
1593 : // important because psycopg2 executes "SET datestyle TO 'ISO'"
1594 : // on connect
1595 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1596 : }
1597 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
1598 : tenant_shard_id,
1599 : timeline_id,
1600 : lsn,
1601 : }) => {
1602 : tracing::Span::current()
1603 : .record("tenant_id", field::display(tenant_shard_id))
1604 : .record("timeline_id", field::display(timeline_id));
1605 :
1606 : self.check_permission(Some(tenant_shard_id.tenant_id))?;
1607 :
1608 : COMPUTE_COMMANDS_COUNTERS
1609 : .for_command(ComputeCommandKind::LeaseLsn)
1610 : .inc();
1611 :
1612 : match self
1613 : .handle_make_lsn_lease(pgb, tenant_shard_id, timeline_id, lsn, &ctx)
1614 : .await
1615 : {
1616 : Ok(()) => {
1617 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?
1618 : }
1619 : Err(e) => {
1620 : error!("error obtaining lsn lease for {lsn}: {e:?}");
1621 : pgb.write_message_noflush(&BeMessage::ErrorResponse(
1622 : &e.to_string(),
1623 : Some(e.pg_error_code()),
1624 : ))?
1625 : }
1626 : };
1627 : }
1628 : }
1629 :
1630 : Ok(())
1631 : }
1632 : }
1633 :
1634 : impl From<GetActiveTenantError> for QueryError {
1635 0 : fn from(e: GetActiveTenantError) -> Self {
1636 0 : match e {
1637 0 : GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected(
1638 0 : ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
1639 0 : ),
1640 : GetActiveTenantError::Cancelled
1641 : | GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => {
1642 0 : QueryError::Shutdown
1643 : }
1644 0 : e @ GetActiveTenantError::NotFound(_) => QueryError::NotFound(format!("{e}").into()),
1645 0 : e => QueryError::Other(anyhow::anyhow!(e)),
1646 : }
1647 0 : }
1648 : }
1649 :
1650 0 : #[derive(Debug, thiserror::Error)]
1651 : pub(crate) enum GetActiveTimelineError {
1652 : #[error(transparent)]
1653 : Tenant(GetActiveTenantError),
1654 : #[error(transparent)]
1655 : Timeline(#[from] GetTimelineError),
1656 : }
1657 :
1658 : impl From<GetActiveTimelineError> for QueryError {
1659 0 : fn from(e: GetActiveTimelineError) -> Self {
1660 0 : match e {
1661 0 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => QueryError::Shutdown,
1662 0 : GetActiveTimelineError::Tenant(e) => e.into(),
1663 0 : GetActiveTimelineError::Timeline(e) => QueryError::NotFound(format!("{e}").into()),
1664 : }
1665 0 : }
1666 : }
1667 :
1668 0 : fn set_tracing_field_shard_id(timeline: &Timeline) {
1669 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1670 0 : tracing::Span::current().record(
1671 0 : "shard_id",
1672 0 : tracing::field::display(timeline.tenant_shard_id.shard_slug()),
1673 0 : );
1674 0 : debug_assert_current_span_has_tenant_and_timeline_id();
1675 0 : }
1676 :
1677 : #[cfg(test)]
1678 : mod tests {
1679 : use utils::shard::ShardCount;
1680 :
1681 : use super::*;
1682 :
1683 : #[test]
1684 2 : fn pageservice_cmd_parse() {
1685 2 : let tenant_id = TenantId::generate();
1686 2 : let timeline_id = TimelineId::generate();
1687 2 : let cmd =
1688 2 : PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id} {timeline_id}")).unwrap();
1689 2 : assert_eq!(
1690 2 : cmd,
1691 2 : PageServiceCmd::PageStream(PageStreamCmd {
1692 2 : tenant_id,
1693 2 : timeline_id
1694 2 : })
1695 2 : );
1696 2 : let cmd = PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id}")).unwrap();
1697 2 : assert_eq!(
1698 2 : cmd,
1699 2 : PageServiceCmd::BaseBackup(BaseBackupCmd {
1700 2 : tenant_id,
1701 2 : timeline_id,
1702 2 : lsn: None,
1703 2 : gzip: false,
1704 2 : replica: false
1705 2 : })
1706 2 : );
1707 2 : let cmd =
1708 2 : PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} --gzip")).unwrap();
1709 2 : assert_eq!(
1710 2 : cmd,
1711 2 : PageServiceCmd::BaseBackup(BaseBackupCmd {
1712 2 : tenant_id,
1713 2 : timeline_id,
1714 2 : lsn: None,
1715 2 : gzip: true,
1716 2 : replica: false
1717 2 : })
1718 2 : );
1719 2 : let cmd =
1720 2 : PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} latest")).unwrap();
1721 2 : assert_eq!(
1722 2 : cmd,
1723 2 : PageServiceCmd::BaseBackup(BaseBackupCmd {
1724 2 : tenant_id,
1725 2 : timeline_id,
1726 2 : lsn: None,
1727 2 : gzip: false,
1728 2 : replica: false
1729 2 : })
1730 2 : );
1731 2 : let cmd = PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} 0/16ABCDE"))
1732 2 : .unwrap();
1733 2 : assert_eq!(
1734 2 : cmd,
1735 2 : PageServiceCmd::BaseBackup(BaseBackupCmd {
1736 2 : tenant_id,
1737 2 : timeline_id,
1738 2 : lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
1739 2 : gzip: false,
1740 2 : replica: false
1741 2 : })
1742 2 : );
1743 2 : let cmd = PageServiceCmd::parse(&format!(
1744 2 : "basebackup {tenant_id} {timeline_id} --replica --gzip"
1745 2 : ))
1746 2 : .unwrap();
1747 2 : assert_eq!(
1748 2 : cmd,
1749 2 : PageServiceCmd::BaseBackup(BaseBackupCmd {
1750 2 : tenant_id,
1751 2 : timeline_id,
1752 2 : lsn: None,
1753 2 : gzip: true,
1754 2 : replica: true
1755 2 : })
1756 2 : );
1757 2 : let cmd = PageServiceCmd::parse(&format!(
1758 2 : "basebackup {tenant_id} {timeline_id} 0/16ABCDE --replica --gzip"
1759 2 : ))
1760 2 : .unwrap();
1761 2 : assert_eq!(
1762 2 : cmd,
1763 2 : PageServiceCmd::BaseBackup(BaseBackupCmd {
1764 2 : tenant_id,
1765 2 : timeline_id,
1766 2 : lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
1767 2 : gzip: true,
1768 2 : replica: true
1769 2 : })
1770 2 : );
1771 2 : let cmd = PageServiceCmd::parse(&format!("fullbackup {tenant_id} {timeline_id}")).unwrap();
1772 2 : assert_eq!(
1773 2 : cmd,
1774 2 : PageServiceCmd::FullBackup(FullBackupCmd {
1775 2 : tenant_id,
1776 2 : timeline_id,
1777 2 : lsn: None,
1778 2 : prev_lsn: None
1779 2 : })
1780 2 : );
1781 2 : let cmd = PageServiceCmd::parse(&format!(
1782 2 : "fullbackup {tenant_id} {timeline_id} 0/16ABCDE 0/16ABCDF"
1783 2 : ))
1784 2 : .unwrap();
1785 2 : assert_eq!(
1786 2 : cmd,
1787 2 : PageServiceCmd::FullBackup(FullBackupCmd {
1788 2 : tenant_id,
1789 2 : timeline_id,
1790 2 : lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
1791 2 : prev_lsn: Some(Lsn::from_str("0/16ABCDF").unwrap()),
1792 2 : })
1793 2 : );
1794 2 : let tenant_shard_id = TenantShardId::unsharded(tenant_id);
1795 2 : let cmd = PageServiceCmd::parse(&format!(
1796 2 : "lease lsn {tenant_shard_id} {timeline_id} 0/16ABCDE"
1797 2 : ))
1798 2 : .unwrap();
1799 2 : assert_eq!(
1800 2 : cmd,
1801 2 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
1802 2 : tenant_shard_id,
1803 2 : timeline_id,
1804 2 : lsn: Lsn::from_str("0/16ABCDE").unwrap(),
1805 2 : })
1806 2 : );
1807 2 : let tenant_shard_id = TenantShardId::split(&tenant_shard_id, ShardCount(8))[1];
1808 2 : let cmd = PageServiceCmd::parse(&format!(
1809 2 : "lease lsn {tenant_shard_id} {timeline_id} 0/16ABCDE"
1810 2 : ))
1811 2 : .unwrap();
1812 2 : assert_eq!(
1813 2 : cmd,
1814 2 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
1815 2 : tenant_shard_id,
1816 2 : timeline_id,
1817 2 : lsn: Lsn::from_str("0/16ABCDE").unwrap(),
1818 2 : })
1819 2 : );
1820 2 : let cmd = PageServiceCmd::parse("set a = b").unwrap();
1821 2 : assert_eq!(cmd, PageServiceCmd::Set);
1822 2 : let cmd = PageServiceCmd::parse("SET foo").unwrap();
1823 2 : assert_eq!(cmd, PageServiceCmd::Set);
1824 2 : }
1825 :
1826 : #[test]
1827 2 : fn pageservice_cmd_err_handling() {
1828 2 : let tenant_id = TenantId::generate();
1829 2 : let timeline_id = TimelineId::generate();
1830 2 : let cmd = PageServiceCmd::parse("unknown_command");
1831 2 : assert!(cmd.is_err());
1832 2 : let cmd = PageServiceCmd::parse("pagestream_v2");
1833 2 : assert!(cmd.is_err());
1834 2 : let cmd = PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id}xxx"));
1835 2 : assert!(cmd.is_err());
1836 2 : let cmd = PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id}xxx {timeline_id}xxx"));
1837 2 : assert!(cmd.is_err());
1838 2 : let cmd = PageServiceCmd::parse(&format!(
1839 2 : "basebackup {tenant_id} {timeline_id} --gzip --gzip"
1840 2 : ));
1841 2 : assert!(cmd.is_err());
1842 2 : let cmd = PageServiceCmd::parse(&format!(
1843 2 : "basebackup {tenant_id} {timeline_id} --gzip --unknown"
1844 2 : ));
1845 2 : assert!(cmd.is_err());
1846 2 : let cmd = PageServiceCmd::parse(&format!(
1847 2 : "basebackup {tenant_id} {timeline_id} --gzip 0/16ABCDE"
1848 2 : ));
1849 2 : assert!(cmd.is_err());
1850 2 : let cmd = PageServiceCmd::parse(&format!("lease {tenant_id} {timeline_id} gzip 0/16ABCDE"));
1851 2 : assert!(cmd.is_err());
1852 2 : }
1853 : }