Line data Source code
1 : //! The Page Service listens for client connections and serves their GetPage@LSN
2 : //! requests.
3 :
4 : use anyhow::{bail, Context};
5 : use async_compression::tokio::write::GzipEncoder;
6 : use async_timer::Timer;
7 : use bytes::Buf;
8 : use futures::FutureExt;
9 : use itertools::Itertools;
10 : use once_cell::sync::OnceCell;
11 : use pageserver_api::models::{self, TenantState};
12 : use pageserver_api::models::{
13 : PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
14 : PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
15 : PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetSlruSegmentRequest,
16 : PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest, PagestreamNblocksResponse,
17 : PagestreamProtocolVersion,
18 : };
19 : use pageserver_api::shard::TenantShardId;
20 : use postgres_backend::{is_expected_io_error, AuthType, PostgresBackend, QueryError};
21 : use pq_proto::framed::ConnectionError;
22 : use pq_proto::FeStartupPacket;
23 : use pq_proto::{BeMessage, FeMessage, RowDescriptor};
24 : use std::borrow::Cow;
25 : use std::io;
26 : use std::pin::Pin;
27 : use std::str;
28 : use std::str::FromStr;
29 : use std::sync::Arc;
30 : use std::time::SystemTime;
31 : use std::time::{Duration, Instant};
32 : use tokio::io::{AsyncRead, AsyncWrite};
33 : use tokio::io::{AsyncWriteExt, BufWriter};
34 : use tokio::task::JoinHandle;
35 : use tokio_util::sync::CancellationToken;
36 : use tracing::*;
37 : use utils::{
38 : auth::{Claims, Scope, SwappableJwtAuth},
39 : id::{TenantId, TimelineId},
40 : lsn::Lsn,
41 : simple_rcu::RcuReadGuard,
42 : };
43 :
44 : use crate::auth::check_permission;
45 : use crate::basebackup;
46 : use crate::basebackup::BasebackupError;
47 : use crate::config::PageServerConf;
48 : use crate::context::{DownloadBehavior, RequestContext};
49 : use crate::metrics::{self};
50 : use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS};
51 : use crate::pgdatadir_mapping::Version;
52 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
53 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
54 : use crate::task_mgr::TaskKind;
55 : use crate::task_mgr::{self, COMPUTE_REQUEST_RUNTIME};
56 : use crate::tenant::mgr::ShardSelector;
57 : use crate::tenant::mgr::TenantManager;
58 : use crate::tenant::mgr::{GetActiveTenantError, GetTenantError, ShardResolveResult};
59 : use crate::tenant::timeline::{self, WaitLsnError};
60 : use crate::tenant::GetTimelineError;
61 : use crate::tenant::PageReconstructError;
62 : use crate::tenant::Timeline;
63 : use pageserver_api::key::rel_block_to_key;
64 : use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
65 : use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
66 : use postgres_ffi::BLCKSZ;
67 :
68 : /// How long we may wait for a [`crate::tenant::mgr::TenantSlot::InProgress`] and/or a [`crate::tenant::Tenant`] which
69 : /// is not yet in state [`TenantState::Active`].
70 : ///
71 : /// NB: this is a different value than [`crate::http::routes::ACTIVE_TENANT_TIMEOUT`].
72 : const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
73 :
74 : ///////////////////////////////////////////////////////////////////////////////
75 :
76 : pub struct Listener {
77 : cancel: CancellationToken,
78 : /// Cancel the listener task through `cancel` to shut down the listener
79 : /// and get a handle on the existing connections.
80 : task: JoinHandle<Connections>,
81 : }
82 :
83 : pub struct Connections {
84 : cancel: CancellationToken,
85 : tasks: tokio::task::JoinSet<ConnectionHandlerResult>,
86 : }
87 :
88 0 : pub fn spawn(
89 0 : conf: &'static PageServerConf,
90 0 : tenant_manager: Arc<TenantManager>,
91 0 : pg_auth: Option<Arc<SwappableJwtAuth>>,
92 0 : tcp_listener: tokio::net::TcpListener,
93 0 : ) -> Listener {
94 0 : let cancel = CancellationToken::new();
95 0 : let libpq_ctx = RequestContext::todo_child(
96 0 : TaskKind::LibpqEndpointListener,
97 0 : // listener task shouldn't need to download anything. (We will
98 0 : // create a separate sub-context for each connection, with its
99 0 : // own download behavior. This context is used only to listen and
100 0 : // accept connections.)
101 0 : DownloadBehavior::Error,
102 0 : );
103 0 : let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
104 0 : "libpq listener",
105 0 : libpq_listener_main(
106 0 : tenant_manager,
107 0 : pg_auth,
108 0 : tcp_listener,
109 0 : conf.pg_auth_type,
110 0 : conf.server_side_batch_timeout,
111 0 : libpq_ctx,
112 0 : cancel.clone(),
113 0 : )
114 0 : .map(anyhow::Ok),
115 0 : ));
116 0 :
117 0 : Listener { cancel, task }
118 0 : }
119 :
120 : impl Listener {
121 0 : pub async fn stop_accepting(self) -> Connections {
122 0 : self.cancel.cancel();
123 0 : self.task
124 0 : .await
125 0 : .expect("unreachable: we wrap the listener task in task_mgr::exit_on_panic_or_error")
126 0 : }
127 : }
128 : impl Connections {
129 0 : pub(crate) async fn shutdown(self) {
130 0 : let Self { cancel, mut tasks } = self;
131 0 : cancel.cancel();
132 0 : while let Some(res) = tasks.join_next().await {
133 0 : Self::handle_connection_completion(res);
134 0 : }
135 0 : }
136 :
137 0 : fn handle_connection_completion(res: Result<anyhow::Result<()>, tokio::task::JoinError>) {
138 0 : match res {
139 0 : Ok(Ok(())) => {}
140 0 : Ok(Err(e)) => error!("error in page_service connection task: {:?}", e),
141 0 : Err(e) => error!("page_service connection task panicked: {:?}", e),
142 : }
143 0 : }
144 : }
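// Editorial sketch (not part of the original source): how the two-phase shutdown
// above is typically driven by the caller. `demo_page_service_shutdown` is a
// hypothetical helper used only for illustration.
#[allow(dead_code)]
async fn demo_page_service_shutdown(listener: Listener) {
    // Phase 1: stop accepting new connections; existing ones keep running.
    let connections = listener.stop_accepting().await;
    // Phase 2: cancel all per-connection tasks and wait for them to finish.
    connections.shutdown().await;
}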
145 :
146 : ///
147 : /// Main loop of the page service.
148 : ///
149 : /// Listens for connections, and launches a new handler task for each.
150 : ///
151 : /// Returns the set of open connections upon cancellation via
152 : /// `listener_cancel`.
153 : ///
154 0 : pub async fn libpq_listener_main(
155 0 : tenant_manager: Arc<TenantManager>,
156 0 : auth: Option<Arc<SwappableJwtAuth>>,
157 0 : listener: tokio::net::TcpListener,
158 0 : auth_type: AuthType,
159 0 : server_side_batch_timeout: Option<Duration>,
160 0 : listener_ctx: RequestContext,
161 0 : listener_cancel: CancellationToken,
162 0 : ) -> Connections {
163 0 : let connections_cancel = CancellationToken::new();
164 0 : let mut connection_handler_tasks = tokio::task::JoinSet::default();
165 :
166 : loop {
167 0 : let accepted = tokio::select! {
168 : biased;
169 0 : _ = listener_cancel.cancelled() => break,
170 0 : next = connection_handler_tasks.join_next(), if !connection_handler_tasks.is_empty() => {
171 0 :                 let res = next.expect("we don't poll while empty");
172 0 : Connections::handle_connection_completion(res);
173 0 : continue;
174 : }
175 0 : accepted = listener.accept() => accepted,
176 0 : };
177 0 :
178 0 : match accepted {
179 0 : Ok((socket, peer_addr)) => {
180 0 : // Connection established. Spawn a new task to handle it.
181 0 : debug!("accepted connection from {}", peer_addr);
182 0 : let local_auth = auth.clone();
183 0 : let connection_ctx = listener_ctx
184 0 : .detached_child(TaskKind::PageRequestHandler, DownloadBehavior::Download);
185 0 : connection_handler_tasks.spawn(page_service_conn_main(
186 0 : tenant_manager.clone(),
187 0 : local_auth,
188 0 : socket,
189 0 : auth_type,
190 0 : server_side_batch_timeout,
191 0 : connection_ctx,
192 0 : connections_cancel.child_token(),
193 0 : ));
194 : }
195 0 : Err(err) => {
196 0 : // accept() failed. Log the error, and loop back to retry on next connection.
197 0 : error!("accept() failed: {:?}", err);
198 : }
199 : }
200 : }
201 :
202 0 : debug!("page_service listener loop terminated");
203 :
204 0 : Connections {
205 0 : cancel: connections_cancel,
206 0 : tasks: connection_handler_tasks,
207 0 : }
208 0 : }
209 :
210 : type ConnectionHandlerResult = anyhow::Result<()>;
211 :
212 0 : #[instrument(skip_all, fields(peer_addr))]
213 : async fn page_service_conn_main(
214 : tenant_manager: Arc<TenantManager>,
215 : auth: Option<Arc<SwappableJwtAuth>>,
216 : socket: tokio::net::TcpStream,
217 : auth_type: AuthType,
218 : server_side_batch_timeout: Option<Duration>,
219 : connection_ctx: RequestContext,
220 : cancel: CancellationToken,
221 : ) -> ConnectionHandlerResult {
222 : let _guard = LIVE_CONNECTIONS
223 : .with_label_values(&["page_service"])
224 : .guard();
225 :
226 : socket
227 : .set_nodelay(true)
228 : .context("could not set TCP_NODELAY")?;
229 :
230 : let peer_addr = socket.peer_addr().context("get peer address")?;
231 : tracing::Span::current().record("peer_addr", field::display(peer_addr));
232 :
233 :     // Set up a read timeout of 10 minutes. The value is rather arbitrary; the requirements are:
234 : // - long enough for most valid compute connections
235 : // - less than infinite to stop us from "leaking" connections to long-gone computes
236 : //
237 :     // No write timeout is used, because the kernel is assumed to fail writes after some time.
238 : let mut socket = tokio_io_timeout::TimeoutReader::new(socket);
239 :
240 : let default_timeout_ms = 10 * 60 * 1000; // 10 minutes by default
241 0 : let socket_timeout_ms = (|| {
242 0 : fail::fail_point!("simulated-bad-compute-connection", |avg_timeout_ms| {
243 :             // Use an exponential distribution to simulate poor network
244 :             // conditions; in tests, avg_timeout_ms is expected to be
245 :             // around 15.
246 0 : if let Some(avg_timeout_ms) = avg_timeout_ms {
247 0 : let avg = avg_timeout_ms.parse::<i64>().unwrap() as f32;
248 0 : let u = rand::random::<f32>();
249 0 : ((1.0 - u).ln() / (-avg)) as u64
250 : } else {
251 0 : default_timeout_ms
252 : }
253 0 : });
254 0 : default_timeout_ms
255 : })();
256 :
257 :     // A timeout here does not necessarily mean the client died; it can also fire if the client
258 :     // is just idle for a while. We will tear down this PageServerHandler and instantiate a new
259 :     // one if/when they reconnect.
260 : socket.set_timeout(Some(std::time::Duration::from_millis(socket_timeout_ms)));
261 : let socket = std::pin::pin!(socket);
262 :
263 : fail::fail_point!("ps::connection-start::pre-login");
264 :
265 : // XXX: pgbackend.run() should take the connection_ctx,
266 : // and create a child per-query context when it invokes process_query.
267 : // But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
268 : // and create the per-query context in process_query ourselves.
269 : let mut conn_handler = PageServerHandler::new(
270 : tenant_manager,
271 : auth,
272 : server_side_batch_timeout,
273 : connection_ctx,
274 : cancel.clone(),
275 : );
276 : let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
277 :
278 : match pgbackend.run(&mut conn_handler, &cancel).await {
279 : Ok(()) => {
280 : // we've been requested to shut down
281 : Ok(())
282 : }
283 : Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
284 : if is_expected_io_error(&io_error) {
285 : info!("Postgres client disconnected ({io_error})");
286 : Ok(())
287 : } else {
288 : let tenant_id = conn_handler.timeline_handles.tenant_id();
289 : Err(io_error).context(format!(
290 : "Postgres connection error for tenant_id={:?} client at peer_addr={}",
291 : tenant_id, peer_addr
292 : ))
293 : }
294 : }
295 : other => {
296 : let tenant_id = conn_handler.timeline_handles.tenant_id();
297 : other.context(format!(
298 : "Postgres query error for tenant_id={:?} client peer_addr={}",
299 : tenant_id, peer_addr
300 : ))
301 : }
302 : }
303 : }
304 :
305 : struct PageServerHandler {
306 : auth: Option<Arc<SwappableJwtAuth>>,
307 : claims: Option<Claims>,
308 :
309 : /// The context created for the lifetime of the connection
310 :     /// serviced by this PageServerHandler.
311 : /// For each query received over the connection,
312 : /// `process_query` creates a child context from this one.
313 : connection_ctx: RequestContext,
314 :
315 : cancel: CancellationToken,
316 :
317 : timeline_handles: TimelineHandles,
318 :
319 : /// See [`PageServerConf::server_side_batch_timeout`]
320 : server_side_batch_timeout: Option<Duration>,
321 :
322 : server_side_batch_timer: Pin<Box<async_timer::timer::Platform>>,
323 : }
324 :
325 : struct Carry {
326 : msg: BatchedFeMessage,
327 : started_at: Instant,
328 : }
329 :
330 : struct TimelineHandles {
331 : wrapper: TenantManagerWrapper,
332 : /// Note on size: the typical size of this map is 1. The largest size we expect
333 : /// to see is the number of shards divided by the number of pageservers (typically < 2),
334 :     /// or the ratio used when splitting shards (i.e. how many children are created from one
335 :     /// parent shard), where a "large" number might be ~8.
336 : handles: timeline::handle::Cache<TenantManagerTypes>,
337 : }
338 :
339 : impl TimelineHandles {
340 0 : fn new(tenant_manager: Arc<TenantManager>) -> Self {
341 0 : Self {
342 0 : wrapper: TenantManagerWrapper {
343 0 : tenant_manager,
344 0 : tenant_id: OnceCell::new(),
345 0 : },
346 0 : handles: Default::default(),
347 0 : }
348 0 : }
349 0 : async fn get(
350 0 : &mut self,
351 0 : tenant_id: TenantId,
352 0 : timeline_id: TimelineId,
353 0 : shard_selector: ShardSelector,
354 0 : ) -> Result<timeline::handle::Handle<TenantManagerTypes>, GetActiveTimelineError> {
355 0 : if *self.wrapper.tenant_id.get_or_init(|| tenant_id) != tenant_id {
356 0 : return Err(GetActiveTimelineError::Tenant(
357 0 : GetActiveTenantError::SwitchedTenant,
358 0 : ));
359 0 : }
360 0 : self.handles
361 0 : .get(timeline_id, shard_selector, &self.wrapper)
362 0 : .await
363 0 : .map_err(|e| match e {
364 0 : timeline::handle::GetError::TenantManager(e) => e,
365 : timeline::handle::GetError::TimelineGateClosed => {
366 0 : trace!("timeline gate closed");
367 0 : GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown)
368 : }
369 : timeline::handle::GetError::PerTimelineStateShutDown => {
370 0 : trace!("per-timeline state shut down");
371 0 : GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown)
372 : }
373 0 : })
374 0 : }
375 :
376 0 : fn tenant_id(&self) -> Option<TenantId> {
377 0 : self.wrapper.tenant_id.get().copied()
378 0 : }
379 : }
380 :
381 : pub(crate) struct TenantManagerWrapper {
382 : tenant_manager: Arc<TenantManager>,
383 : // We do not support switching tenant_id on a connection at this point.
384 :     // We can add support for this later if needed without changing
385 : // the protocol.
386 : tenant_id: once_cell::sync::OnceCell<TenantId>,
387 : }
388 :
389 : #[derive(Debug)]
390 : pub(crate) struct TenantManagerTypes;
391 :
392 : impl timeline::handle::Types for TenantManagerTypes {
393 : type TenantManagerError = GetActiveTimelineError;
394 : type TenantManager = TenantManagerWrapper;
395 : type Timeline = Arc<Timeline>;
396 : }
397 :
398 : impl timeline::handle::ArcTimeline<TenantManagerTypes> for Arc<Timeline> {
399 0 : fn gate(&self) -> &utils::sync::gate::Gate {
400 0 : &self.gate
401 0 : }
402 :
403 0 : fn shard_timeline_id(&self) -> timeline::handle::ShardTimelineId {
404 0 : Timeline::shard_timeline_id(self)
405 0 : }
406 :
407 0 : fn per_timeline_state(&self) -> &timeline::handle::PerTimelineState<TenantManagerTypes> {
408 0 : &self.handles
409 0 : }
410 :
411 0 : fn get_shard_identity(&self) -> &pageserver_api::shard::ShardIdentity {
412 0 : Timeline::get_shard_identity(self)
413 0 : }
414 : }
415 :
416 : impl timeline::handle::TenantManager<TenantManagerTypes> for TenantManagerWrapper {
417 0 : async fn resolve(
418 0 : &self,
419 0 : timeline_id: TimelineId,
420 0 : shard_selector: ShardSelector,
421 0 : ) -> Result<Arc<Timeline>, GetActiveTimelineError> {
422 0 : let tenant_id = self.tenant_id.get().expect("we set this in get()");
423 0 : let timeout = ACTIVE_TENANT_TIMEOUT;
424 0 : let wait_start = Instant::now();
425 0 : let deadline = wait_start + timeout;
426 0 : let tenant_shard = loop {
427 0 : let resolved = self
428 0 : .tenant_manager
429 0 : .resolve_attached_shard(tenant_id, shard_selector);
430 0 : match resolved {
431 0 : ShardResolveResult::Found(tenant_shard) => break tenant_shard,
432 : ShardResolveResult::NotFound => {
433 0 : return Err(GetActiveTimelineError::Tenant(
434 0 : GetActiveTenantError::NotFound(GetTenantError::NotFound(*tenant_id)),
435 0 : ));
436 : }
437 0 : ShardResolveResult::InProgress(barrier) => {
438 0 : // We can't authoritatively answer right now: wait for InProgress state
439 0 : // to end, then try again
440 0 : tokio::select! {
441 0 : _ = barrier.wait() => {
442 0 : // The barrier completed: proceed around the loop to try looking up again
443 0 : },
444 0 : _ = tokio::time::sleep(deadline.duration_since(Instant::now())) => {
445 0 : return Err(GetActiveTimelineError::Tenant(GetActiveTenantError::WaitForActiveTimeout {
446 0 : latest_state: None,
447 0 : wait_time: timeout,
448 0 : }));
449 : }
450 : }
451 : }
452 : };
453 : };
454 :
455 0 : tracing::debug!("Waiting for tenant to enter active state...");
456 0 : tenant_shard
457 0 : .wait_to_become_active(deadline.duration_since(Instant::now()))
458 0 : .await
459 0 : .map_err(GetActiveTimelineError::Tenant)?;
460 :
461 0 : let timeline = tenant_shard
462 0 : .get_timeline(timeline_id, true)
463 0 : .map_err(GetActiveTimelineError::Timeline)?;
464 0 : set_tracing_field_shard_id(&timeline);
465 0 : Ok(timeline)
466 0 : }
467 : }
468 :
469 0 : #[derive(thiserror::Error, Debug)]
470 : enum PageStreamError {
471 : /// We encountered an error that should prompt the client to reconnect:
472 : /// in practice this means we drop the connection without sending a response.
473 : #[error("Reconnect required: {0}")]
474 : Reconnect(Cow<'static, str>),
475 :
476 : /// We were instructed to shutdown while processing the query
477 : #[error("Shutting down")]
478 : Shutdown,
479 :
480 : /// Something went wrong reading a page: this likely indicates a pageserver bug
481 : #[error("Read error")]
482 : Read(#[source] PageReconstructError),
483 :
484 : /// Ran out of time waiting for an LSN
485 : #[error("LSN timeout: {0}")]
486 : LsnTimeout(WaitLsnError),
487 :
488 : /// The entity required to serve the request (tenant or timeline) is not found,
489 : /// or is not found in a suitable state to serve a request.
490 : #[error("Not found: {0}")]
491 : NotFound(Cow<'static, str>),
492 :
493 : /// Request asked for something that doesn't make sense, like an invalid LSN
494 : #[error("Bad request: {0}")]
495 : BadRequest(Cow<'static, str>),
496 : }
497 :
498 : impl From<PageReconstructError> for PageStreamError {
499 0 : fn from(value: PageReconstructError) -> Self {
500 0 : match value {
501 0 : PageReconstructError::Cancelled => Self::Shutdown,
502 0 : e => Self::Read(e),
503 : }
504 0 : }
505 : }
506 :
507 : impl From<GetActiveTimelineError> for PageStreamError {
508 0 : fn from(value: GetActiveTimelineError) -> Self {
509 0 : match value {
510 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled)
511 : | GetActiveTimelineError::Tenant(GetActiveTenantError::WillNotBecomeActive(
512 : TenantState::Stopping { .. },
513 : ))
514 0 : | GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown) => Self::Shutdown,
515 0 : GetActiveTimelineError::Tenant(e) => Self::NotFound(format!("{e}").into()),
516 0 : GetActiveTimelineError::Timeline(e) => Self::NotFound(format!("{e}").into()),
517 : }
518 0 : }
519 : }
520 :
521 : impl From<WaitLsnError> for PageStreamError {
522 0 : fn from(value: WaitLsnError) -> Self {
523 0 : match value {
524 0 : e @ WaitLsnError::Timeout(_) => Self::LsnTimeout(e),
525 0 : WaitLsnError::Shutdown => Self::Shutdown,
526 0 : e @ WaitLsnError::BadState { .. } => Self::Reconnect(format!("{e}").into()),
527 : }
528 0 : }
529 : }
530 :
531 : impl From<WaitLsnError> for QueryError {
532 0 : fn from(value: WaitLsnError) -> Self {
533 0 : match value {
534 0 : e @ WaitLsnError::Timeout(_) => Self::Other(anyhow::Error::new(e)),
535 0 : WaitLsnError::Shutdown => Self::Shutdown,
536 0 : WaitLsnError::BadState { .. } => Self::Reconnect,
537 : }
538 0 : }
539 : }
540 :
541 : enum BatchedFeMessage {
542 : Exists {
543 : span: Span,
544 : req: models::PagestreamExistsRequest,
545 : },
546 : Nblocks {
547 : span: Span,
548 : req: models::PagestreamNblocksRequest,
549 : },
550 : GetPage {
551 : span: Span,
552 : shard: timeline::handle::Handle<TenantManagerTypes>,
553 : effective_request_lsn: Lsn,
554 : pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
555 : },
556 : DbSize {
557 : span: Span,
558 : req: models::PagestreamDbSizeRequest,
559 : },
560 : GetSlruSegment {
561 : span: Span,
562 : req: models::PagestreamGetSlruSegmentRequest,
563 : },
564 : RespondError {
565 : span: Span,
566 : error: PageStreamError,
567 : },
568 : }
569 :
570 : enum BatchOrEof {
571 : /// In the common case, this has one entry.
572 : /// At most, it has two entries: the first is the leftover batch, the second is an error.
573 : Batch(smallvec::SmallVec<[BatchedFeMessage; 1]>),
574 : Eof,
575 : }
576 :
577 : impl PageServerHandler {
578 0 : pub fn new(
579 0 : tenant_manager: Arc<TenantManager>,
580 0 : auth: Option<Arc<SwappableJwtAuth>>,
581 0 : server_side_batch_timeout: Option<Duration>,
582 0 : connection_ctx: RequestContext,
583 0 : cancel: CancellationToken,
584 0 : ) -> Self {
585 0 : PageServerHandler {
586 0 : auth,
587 0 : claims: None,
588 0 : connection_ctx,
589 0 : timeline_handles: TimelineHandles::new(tenant_manager),
590 0 : cancel,
591 0 : server_side_batch_timeout,
592 0 : server_side_batch_timer: Box::pin(async_timer::new_timer(Duration::from_secs(999))), // reset each iteration
593 0 : }
594 0 : }
595 :
596 :     /// Flush the `pgb` connection, bailing out with [`QueryError::Shutdown`] if `cancel`
597 :     /// fires first.
598 :     ///
599 :     /// Pass in a cancellation token from the appropriate scope: the per-connection token
600 :     /// in [`Self::cancel`], or a specific timeline's cancellation token when flushing on
601 :     /// behalf of that timeline.
602 0 : async fn flush_cancellable<IO>(
603 0 : &self,
604 0 : pgb: &mut PostgresBackend<IO>,
605 0 : cancel: &CancellationToken,
606 0 : ) -> Result<(), QueryError>
607 0 : where
608 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
609 0 : {
610 0 : tokio::select!(
611 0 : flush_r = pgb.flush() => {
612 0 : Ok(flush_r?)
613 : },
614 0 : _ = cancel.cancelled() => {
615 0 : Err(QueryError::Shutdown)
616 : }
617 : )
618 0 : }
619 :
620 0 : #[instrument(skip_all, level = tracing::Level::TRACE)]
621 : async fn read_batch_from_connection<IO>(
622 : &mut self,
623 : pgb: &mut PostgresBackend<IO>,
624 : tenant_id: &TenantId,
625 : timeline_id: &TimelineId,
626 : maybe_carry: &mut Option<Carry>,
627 : ctx: &RequestContext,
628 : ) -> Result<BatchOrEof, QueryError>
629 : where
630 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
631 : {
632 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
633 :
634 : let mut batching_deadline_storage = None; // TODO: can this just be an unsync once_cell?
635 :
636 : loop {
637 : // Create a future that will become ready when we need to stop batching.
638 : use futures::future::Either;
639 : let batching_deadline = match (
640 : &*maybe_carry as &Option<Carry>,
641 : &mut batching_deadline_storage,
642 : ) {
643 : (None, None) => Either::Left(futures::future::pending()), // there's no deadline before we have something batched
644 : (None, Some(_)) => unreachable!(),
645 : (Some(_), Some(fut)) => Either::Right(fut), // below arm already ran
646 : (Some(carry), None) => {
647 : match self.server_side_batch_timeout {
648 : None => {
649 : return Ok(BatchOrEof::Batch(smallvec::smallvec![
650 : maybe_carry
651 : .take()
652 : .expect("we already checked it's Some")
653 : .msg
654 : ]))
655 : }
656 : Some(batch_timeout) => {
657 : // Take into consideration the time the carry spent waiting.
658 : let batch_timeout =
659 : batch_timeout.saturating_sub(carry.started_at.elapsed());
660 : if batch_timeout.is_zero() {
661 : // the timer doesn't support restarting with zero duration
662 : return Ok(BatchOrEof::Batch(smallvec::smallvec![
663 : maybe_carry
664 : .take()
665 : .expect("we already checked it's Some")
666 : .msg
667 : ]));
668 : } else {
669 : self.server_side_batch_timer.restart(batch_timeout);
670 : batching_deadline_storage = Some(&mut self.server_side_batch_timer);
671 : Either::Right(
672 : batching_deadline_storage.as_mut().expect("we just set it"),
673 : )
674 : }
675 : }
676 : }
677 : }
678 : };
679 : let msg = tokio::select! {
680 : biased;
681 : _ = self.cancel.cancelled() => {
682 : return Err(QueryError::Shutdown)
683 : }
684 : _ = batching_deadline => {
685 : return Ok(BatchOrEof::Batch(smallvec::smallvec![maybe_carry.take().expect("per construction of batching_deadline").msg]));
686 : }
687 : msg = pgb.read_message() => { msg }
688 : };
689 :
690 : let msg_start = Instant::now();
691 :
692 :             // The rest of this loop body tries to batch `msg` into the current carry.
693 :             // If we can add `msg` to the batch, we continue into the next loop iteration.
694 :             // If we can't add `msg` to the batch, we carry `msg` over to the next call.
695 :
696 : let copy_data_bytes = match msg? {
697 : Some(FeMessage::CopyData(bytes)) => bytes,
698 : Some(FeMessage::Terminate) => {
699 : return Ok(BatchOrEof::Eof);
700 : }
701 : Some(m) => {
702 : return Err(QueryError::Other(anyhow::anyhow!(
703 : "unexpected message: {m:?} during COPY"
704 : )));
705 : }
706 : None => {
707 : return Ok(BatchOrEof::Eof);
708 : } // client disconnected
709 : };
710 : trace!("query: {copy_data_bytes:?}");
711 :
712 : fail::fail_point!("ps::handle-pagerequest-message");
713 :
714 : // parse request
715 : let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
716 :
717 : let this_msg = match neon_fe_msg {
718 : PagestreamFeMessage::Exists(req) => BatchedFeMessage::Exists {
719 : span: tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn),
720 : req,
721 : },
722 : PagestreamFeMessage::Nblocks(req) => BatchedFeMessage::Nblocks {
723 : span: tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn),
724 : req,
725 : },
726 : PagestreamFeMessage::DbSize(req) => BatchedFeMessage::DbSize {
727 : span: tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn),
728 : req,
729 : },
730 : PagestreamFeMessage::GetSlruSegment(req) => BatchedFeMessage::GetSlruSegment {
731 : span: tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn),
732 : req,
733 : },
734 : PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
735 : request_lsn,
736 : not_modified_since,
737 : rel,
738 : blkno,
739 : }) => {
740 : // shard_id is filled in by the handler
741 : let span = tracing::info_span!(
742 : "handle_get_page_at_lsn_request_batched",
743 : %tenant_id, %timeline_id, shard_id = tracing::field::Empty, req_lsn = %request_lsn,
744 : batch_size = tracing::field::Empty, batch_id = tracing::field::Empty
745 : );
746 :
747 : macro_rules! current_batch_and_error {
748 : ($error:expr) => {{
749 : let error = BatchedFeMessage::RespondError {
750 : span,
751 : error: $error,
752 : };
753 : let batch_and_error = match maybe_carry.take() {
754 : Some(carry) => smallvec::smallvec![carry.msg, error],
755 : None => smallvec::smallvec![error],
756 : };
757 : Ok(BatchOrEof::Batch(batch_and_error))
758 : }};
759 : }
760 :
761 : let key = rel_block_to_key(rel, blkno);
762 : let shard = match self
763 : .timeline_handles
764 : .get(*tenant_id, *timeline_id, ShardSelector::Page(key))
765 : .instrument(span.clone())
766 : .await
767 : {
768 : Ok(tl) => tl,
769 : Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
770 : // We already know this tenant exists in general, because we resolved it at
771 : // start of connection. Getting a NotFound here indicates that the shard containing
772 : // the requested page is not present on this node: the client's knowledge of shard->pageserver
773 : // mapping is out of date.
774 : //
775 :                             // Closing the connection by returning `PageStreamError::Reconnect` has the side effect of rate-limiting the above message, via the
776 : // client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
777 : // and talk to a different pageserver.
778 : return current_batch_and_error!(PageStreamError::Reconnect(
779 : "getpage@lsn request routed to wrong shard".into()
780 : ));
781 : }
782 : Err(e) => {
783 : return current_batch_and_error!(e.into());
784 : }
785 : };
786 : let effective_request_lsn = match Self::wait_or_get_last_lsn(
787 : &shard,
788 : request_lsn,
789 : not_modified_since,
790 : &shard.get_latest_gc_cutoff_lsn(),
791 : ctx,
792 : )
793 : // TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait
794 : .await
795 : {
796 : Ok(lsn) => lsn,
797 : Err(e) => {
798 : return current_batch_and_error!(e);
799 : }
800 : };
801 : BatchedFeMessage::GetPage {
802 : span,
803 : shard,
804 : effective_request_lsn,
805 : pages: smallvec::smallvec![(rel, blkno)],
806 : }
807 : }
808 : };
809 :
810 : //
811 : // batch
812 : //
813 : match (maybe_carry.as_mut(), this_msg) {
814 : (None, this_msg) => {
815 : *maybe_carry = Some(Carry { msg: this_msg, started_at: msg_start });
816 : }
817 : (
818 : Some(Carry { msg: BatchedFeMessage::GetPage {
819 : span: _,
820 : shard: accum_shard,
821 : pages: ref mut accum_pages,
822 : effective_request_lsn: accum_lsn,
823 : }, started_at: _}),
824 : BatchedFeMessage::GetPage {
825 : span: _,
826 : shard: this_shard,
827 : pages: this_pages,
828 : effective_request_lsn: this_lsn,
829 : },
830 0 : ) if async {
831 0 : assert_eq!(this_pages.len(), 1);
832 0 : if accum_pages.len() >= Timeline::MAX_GET_VECTORED_KEYS as usize {
833 0 : trace!(%accum_lsn, %this_lsn, "stopping batching because of batch size");
834 0 : assert_eq!(accum_pages.len(), Timeline::MAX_GET_VECTORED_KEYS as usize);
835 0 : return false;
836 0 : }
837 0 : if (accum_shard.tenant_shard_id, accum_shard.timeline_id)
838 0 : != (this_shard.tenant_shard_id, this_shard.timeline_id)
839 : {
840 0 : trace!(%accum_lsn, %this_lsn, "stopping batching because timeline object mismatch");
841 :                     // TODO: we _could_ batch & execute each shard separately (and in parallel).
842 : // But the current logic for keeping responses in order does not support that.
843 0 : return false;
844 0 : }
845 0 : // the vectored get currently only supports a single LSN, so, bounce as soon
846 0 : // as the effective request_lsn changes
847 0 : if *accum_lsn != this_lsn {
848 0 : trace!(%accum_lsn, %this_lsn, "stopping batching because LSN changed");
849 0 : return false;
850 0 : }
851 0 : true
852 0 : }
853 : .await =>
854 : {
855 : // ok to batch
856 : accum_pages.extend(this_pages);
857 : }
858 : (Some(carry), this_msg) => {
859 : // by default, don't continue batching
860 : let carry = std::mem::replace(carry,
861 : Carry {
862 : msg: this_msg,
863 : started_at: msg_start,
864 : });
865 : return Ok(BatchOrEof::Batch(smallvec::smallvec![carry.msg]));
866 : }
867 : }
868 : }
869 : }
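    // Editorial note (illustrative, not from the original source): with
    // `server_side_batch_timeout = Some(10ms)`, consecutive GetPage requests for the
    // same shard and the same effective request LSN that arrive within the window are
    // coalesced into a single `BatchedFeMessage::GetPage` whose `pages` vector holds
    // one entry per request (up to `Timeline::MAX_GET_VECTORED_KEYS`). A request that
    // cannot be merged (different message type, different shard/timeline object, or a
    // different effective LSN) becomes the `Carry` for the next call, and the batch
    // accumulated so far is returned immediately.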
870 :
871 : /// Pagestream sub-protocol handler.
872 : ///
873 : /// It is a simple request-response protocol inside a COPYBOTH session.
874 : ///
875 : /// # Coding Discipline
876 : ///
877 : /// Coding discipline within this function: all interaction with the `pgb` connection
878 : /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
879 :     /// This is so that we can shut down page_service quickly.
880 0 : #[instrument(skip_all)]
881 : async fn handle_pagerequests<IO>(
882 : &mut self,
883 : pgb: &mut PostgresBackend<IO>,
884 : tenant_id: TenantId,
885 : timeline_id: TimelineId,
886 : _protocol_version: PagestreamProtocolVersion,
887 : ctx: RequestContext,
888 : ) -> Result<(), QueryError>
889 : where
890 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
891 : {
892 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
893 :
894 : // switch client to COPYBOTH
895 : pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
896 : tokio::select! {
897 : biased;
898 : _ = self.cancel.cancelled() => {
899 : return Err(QueryError::Shutdown)
900 : }
901 : res = pgb.flush() => {
902 : res?;
903 : }
904 : }
905 :
906 : let mut carry: Option<Carry> = None;
907 :
908 : loop {
909 : let maybe_batched = self
910 : .read_batch_from_connection(pgb, &tenant_id, &timeline_id, &mut carry, &ctx)
911 : .await?;
912 : let batched = match maybe_batched {
913 : BatchOrEof::Batch(b) => b,
914 : BatchOrEof::Eof => {
915 : break;
916 : }
917 : };
918 :
919 : for batch in batched {
920 : // invoke handler function
921 : let (handler_results, span): (
922 : Vec<Result<PagestreamBeMessage, PageStreamError>>,
923 : _,
924 : ) = match batch {
925 : BatchedFeMessage::Exists { span, req } => {
926 : fail::fail_point!("ps::handle-pagerequest-message::exists");
927 : (
928 : vec![
929 : self.handle_get_rel_exists_request(
930 : tenant_id,
931 : timeline_id,
932 : &req,
933 : &ctx,
934 : )
935 : .instrument(span.clone())
936 : .await,
937 : ],
938 : span,
939 : )
940 : }
941 : BatchedFeMessage::Nblocks { span, req } => {
942 : fail::fail_point!("ps::handle-pagerequest-message::nblocks");
943 : (
944 : vec![
945 : self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx)
946 : .instrument(span.clone())
947 : .await,
948 : ],
949 : span,
950 : )
951 : }
952 : BatchedFeMessage::GetPage {
953 : span,
954 : shard,
955 : effective_request_lsn,
956 : pages,
957 : } => {
958 : fail::fail_point!("ps::handle-pagerequest-message::getpage");
959 : (
960 : {
961 : let npages = pages.len();
962 : trace!(npages, "handling getpage request");
963 : let res = self
964 : .handle_get_page_at_lsn_request_batched(
965 : &shard,
966 : effective_request_lsn,
967 : pages,
968 : &ctx,
969 : )
970 : .instrument(span.clone())
971 : .await;
972 : assert_eq!(res.len(), npages);
973 : res
974 : },
975 : span,
976 : )
977 : }
978 : BatchedFeMessage::DbSize { span, req } => {
979 : fail::fail_point!("ps::handle-pagerequest-message::dbsize");
980 : (
981 : vec![
982 : self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx)
983 : .instrument(span.clone())
984 : .await,
985 : ],
986 : span,
987 : )
988 : }
989 : BatchedFeMessage::GetSlruSegment { span, req } => {
990 : fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
991 : (
992 : vec![
993 : self.handle_get_slru_segment_request(
994 : tenant_id,
995 : timeline_id,
996 : &req,
997 : &ctx,
998 : )
999 : .instrument(span.clone())
1000 : .await,
1001 : ],
1002 : span,
1003 : )
1004 : }
1005 : BatchedFeMessage::RespondError { span, error } => {
1006 : // We've already decided to respond with an error, so we don't need to
1007 : // call the handler.
1008 : (vec![Err(error)], span)
1009 : }
1010 : };
1011 :
1012 : // Map handler result to protocol behavior.
1013 : // Some handler errors cause exit from pagestream protocol.
1014 : // Other handler errors are sent back as an error message and we stay in pagestream protocol.
1015 : for handler_result in handler_results {
1016 : let response_msg = match handler_result {
1017 : Err(e) => match &e {
1018 : PageStreamError::Shutdown => {
1019 : // If we fail to fulfil a request during shutdown, which may be _because_ of
1020 : // shutdown, then do not send the error to the client. Instead just drop the
1021 : // connection.
1022 0 : span.in_scope(|| info!("dropping connection due to shutdown"));
1023 : return Err(QueryError::Shutdown);
1024 : }
1025 : PageStreamError::Reconnect(reason) => {
1026 0 : span.in_scope(|| info!("handler requested reconnect: {reason}"));
1027 : return Err(QueryError::Reconnect);
1028 : }
1029 : PageStreamError::Read(_)
1030 : | PageStreamError::LsnTimeout(_)
1031 : | PageStreamError::NotFound(_)
1032 : | PageStreamError::BadRequest(_) => {
1033 :                             // print all the details to the log with {:#}, but for the client the
1034 : // error message is enough. Do not log if shutting down, as the anyhow::Error
1035 : // here includes cancellation which is not an error.
1036 : let full = utils::error::report_compact_sources(&e);
1037 0 : span.in_scope(|| {
1038 0 : error!("error reading relation or page version: {full:#}")
1039 0 : });
1040 : PagestreamBeMessage::Error(PagestreamErrorResponse {
1041 : message: e.to_string(),
1042 : })
1043 : }
1044 : },
1045 : Ok(response_msg) => response_msg,
1046 : };
1047 :
1048 : // marshal & transmit response message
1049 : pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
1050 : }
1051 : tokio::select! {
1052 : biased;
1053 : _ = self.cancel.cancelled() => {
1054 : // We were requested to shut down.
1055 : info!("shutdown request received in page handler");
1056 : return Err(QueryError::Shutdown)
1057 : }
1058 : res = pgb.flush() => {
1059 : res?;
1060 : }
1061 : }
1062 : }
1063 : }
1064 : Ok(())
1065 : }
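    // Editorial sketch (not from the original source): the pagestream wire flow
    // handled above, in the order the messages appear on the connection.
    //
    //   client:  "pagestream_v2 <tenant_id> <timeline_id>"     (startup query)
    //   server:  CopyBothResponse                               (switch to COPYBOTH)
    //   client:  CopyData(PagestreamFeMessage::GetPage { .. })  (and the other request types)
    //   server:  CopyData(PagestreamBeMessage::GetPage { .. })  (one response per request,
    //            or PagestreamBeMessage::Error on failure)
    //   client:  Terminate                                      (ends the sub-protocol)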
1066 :
1067 : /// Helper function to handle the LSN from client request.
1068 : ///
1069 : /// Each GetPage (and Exists and Nblocks) request includes information about
1070 : /// which version of the page is being requested. The primary compute node
1071 : /// will always request the latest page version, by setting 'request_lsn' to
1072 : /// the last inserted or flushed WAL position, while a standby will request
1073 : /// a version at the LSN that it's currently caught up to.
1074 : ///
1075 : /// In either case, if the page server hasn't received the WAL up to the
1076 : /// requested LSN yet, we will wait for it to arrive. The return value is
1077 : /// the LSN that should be used to look up the page versions.
1078 : ///
1079 : /// In addition to the request LSN, each request carries another LSN,
1080 : /// 'not_modified_since', which is a hint to the pageserver that the client
1081 : /// knows that the page has not been modified between 'not_modified_since'
1082 : /// and the request LSN. This allows skipping the wait, as long as the WAL
1083 : /// up to 'not_modified_since' has arrived. If the client doesn't have any
1084 : /// information about when the page was modified, it will use
1085 : /// not_modified_since == lsn. If the client lies and sends a too low
1086 : /// not_modified_hint such that there are in fact later page versions, the
1087 : /// behavior is undefined: the pageserver may return any of the page versions
1088 : /// or an error.
1089 0 : async fn wait_or_get_last_lsn(
1090 0 : timeline: &Timeline,
1091 0 : request_lsn: Lsn,
1092 0 : not_modified_since: Lsn,
1093 0 : latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
1094 0 : ctx: &RequestContext,
1095 0 : ) -> Result<Lsn, PageStreamError> {
1096 0 : let last_record_lsn = timeline.get_last_record_lsn();
1097 0 :
1098 0 : // Sanity check the request
1099 0 : if request_lsn < not_modified_since {
1100 0 : return Err(PageStreamError::BadRequest(
1101 0 : format!(
1102 0 : "invalid request with request LSN {} and not_modified_since {}",
1103 0 : request_lsn, not_modified_since,
1104 0 : )
1105 0 : .into(),
1106 0 : ));
1107 0 : }
1108 0 :
1109 0 : if request_lsn < **latest_gc_cutoff_lsn {
1110 0 : let gc_info = &timeline.gc_info.read().unwrap();
1111 0 : if !gc_info.leases.contains_key(&request_lsn) {
1112 : // The requested LSN is below gc cutoff and is not guarded by a lease.
1113 :
1114 : // Check explicitly for INVALID just to get a less scary error message if the
1115 : // request is obviously bogus
1116 0 : return Err(if request_lsn == Lsn::INVALID {
1117 0 : PageStreamError::BadRequest("invalid LSN(0) in request".into())
1118 : } else {
1119 0 : PageStreamError::BadRequest(format!(
1120 0 : "tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
1121 0 : request_lsn, **latest_gc_cutoff_lsn
1122 0 : ).into())
1123 : });
1124 0 : }
1125 0 : }
1126 :
1127 : // Wait for WAL up to 'not_modified_since' to arrive, if necessary
1128 0 : if not_modified_since > last_record_lsn {
1129 0 : timeline
1130 0 : .wait_lsn(
1131 0 : not_modified_since,
1132 0 : crate::tenant::timeline::WaitLsnWaiter::PageService,
1133 0 : ctx,
1134 0 : )
1135 0 : .await?;
1136 : // Since we waited for 'not_modified_since' to arrive, that is now the last
1137 : // record LSN. (Or close enough for our purposes; the last-record LSN can
1138 : // advance immediately after we return anyway)
1139 0 : Ok(not_modified_since)
1140 : } else {
1141 : // It might be better to use max(not_modified_since, latest_gc_cutoff_lsn)
1142 : // here instead. That would give the same result, since we know that there
1143 : // haven't been any modifications since 'not_modified_since'. Using an older
1144 : // LSN might be faster, because that could allow skipping recent layers when
1145 : // finding the page. However, we have historically used 'last_record_lsn', so
1146 : // stick to that for now.
1147 0 : Ok(std::cmp::min(last_record_lsn, request_lsn))
1148 : }
1149 0 : }
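    // Editorial worked example (hypothetical values, not part of the original source)
    // for the LSN selection above:
    //
    //   request_lsn        = 0/2000
    //   not_modified_since = 0/1000
    //   last_record_lsn    = 0/1800  -> no wait needed, since WAL up to 0/1000 has already
    //                                   arrived; the result is min(last_record_lsn, request_lsn)
    //                                   = 0/1800.
    //
    // If last_record_lsn were 0/0800 instead, the handler would wait for WAL up to
    // 0/1000 to arrive and then return 0/1000.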
1150 :
1151 : /// Handles the lsn lease request.
1152 : /// If a lease cannot be obtained, the client will receive NULL.
1153 0 : #[instrument(skip_all, fields(shard_id, %lsn))]
1154 : async fn handle_make_lsn_lease<IO>(
1155 : &mut self,
1156 : pgb: &mut PostgresBackend<IO>,
1157 : tenant_shard_id: TenantShardId,
1158 : timeline_id: TimelineId,
1159 : lsn: Lsn,
1160 : ctx: &RequestContext,
1161 : ) -> Result<(), QueryError>
1162 : where
1163 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1164 : {
1165 : let timeline = self
1166 : .timeline_handles
1167 : .get(
1168 : tenant_shard_id.tenant_id,
1169 : timeline_id,
1170 : ShardSelector::Known(tenant_shard_id.to_index()),
1171 : )
1172 : .await?;
1173 : set_tracing_field_shard_id(&timeline);
1174 :
1175 : let lease = timeline
1176 : .renew_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx)
1177 0 : .inspect_err(|e| {
1178 0 : warn!("{e}");
1179 0 : })
1180 : .ok();
1181 0 : let valid_until_str = lease.map(|l| {
1182 0 : l.valid_until
1183 0 : .duration_since(SystemTime::UNIX_EPOCH)
1184 0 : .expect("valid_until is earlier than UNIX_EPOCH")
1185 0 : .as_millis()
1186 0 : .to_string()
1187 0 : });
1188 0 : let bytes = valid_until_str.as_ref().map(|x| x.as_bytes());
1189 :
1190 : pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col(
1191 : b"valid_until",
1192 : )]))?
1193 : .write_message_noflush(&BeMessage::DataRow(&[bytes]))?;
1194 :
1195 : Ok(())
1196 : }
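    // Editorial note (illustrative, not from the original source): the lease command
    // above returns a single row with a single text column. When the lease is granted,
    // `valid_until` carries the expiry time as milliseconds since the UNIX epoch
    // (e.g. "1719500000000"); when the lease cannot be obtained, the column is NULL.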
1197 :
1198 0 : #[instrument(skip_all, fields(shard_id))]
1199 : async fn handle_get_rel_exists_request(
1200 : &mut self,
1201 : tenant_id: TenantId,
1202 : timeline_id: TimelineId,
1203 : req: &PagestreamExistsRequest,
1204 : ctx: &RequestContext,
1205 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1206 : let timeline = self
1207 : .timeline_handles
1208 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1209 : .await?;
1210 : let _timer = timeline
1211 : .query_metrics
1212 : .start_timer(metrics::SmgrQueryType::GetRelExists, ctx);
1213 :
1214 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1215 : let lsn = Self::wait_or_get_last_lsn(
1216 : &timeline,
1217 : req.request_lsn,
1218 : req.not_modified_since,
1219 : &latest_gc_cutoff_lsn,
1220 : ctx,
1221 : )
1222 : .await?;
1223 :
1224 : let exists = timeline
1225 : .get_rel_exists(req.rel, Version::Lsn(lsn), ctx)
1226 : .await?;
1227 :
1228 : Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
1229 : exists,
1230 : }))
1231 : }
1232 :
1233 0 : #[instrument(skip_all, fields(shard_id))]
1234 : async fn handle_get_nblocks_request(
1235 : &mut self,
1236 : tenant_id: TenantId,
1237 : timeline_id: TimelineId,
1238 : req: &PagestreamNblocksRequest,
1239 : ctx: &RequestContext,
1240 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1241 : let timeline = self
1242 : .timeline_handles
1243 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1244 : .await?;
1245 :
1246 : let _timer = timeline
1247 : .query_metrics
1248 : .start_timer(metrics::SmgrQueryType::GetRelSize, ctx);
1249 :
1250 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1251 : let lsn = Self::wait_or_get_last_lsn(
1252 : &timeline,
1253 : req.request_lsn,
1254 : req.not_modified_since,
1255 : &latest_gc_cutoff_lsn,
1256 : ctx,
1257 : )
1258 : .await?;
1259 :
1260 : let n_blocks = timeline
1261 : .get_rel_size(req.rel, Version::Lsn(lsn), ctx)
1262 : .await?;
1263 :
1264 : Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
1265 : n_blocks,
1266 : }))
1267 : }
1268 :
1269 0 : #[instrument(skip_all, fields(shard_id))]
1270 : async fn handle_db_size_request(
1271 : &mut self,
1272 : tenant_id: TenantId,
1273 : timeline_id: TimelineId,
1274 : req: &PagestreamDbSizeRequest,
1275 : ctx: &RequestContext,
1276 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1277 : let timeline = self
1278 : .timeline_handles
1279 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1280 : .await?;
1281 :
1282 : let _timer = timeline
1283 : .query_metrics
1284 : .start_timer(metrics::SmgrQueryType::GetDbSize, ctx);
1285 :
1286 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1287 : let lsn = Self::wait_or_get_last_lsn(
1288 : &timeline,
1289 : req.request_lsn,
1290 : req.not_modified_since,
1291 : &latest_gc_cutoff_lsn,
1292 : ctx,
1293 : )
1294 : .await?;
1295 :
1296 : let total_blocks = timeline
1297 : .get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, Version::Lsn(lsn), ctx)
1298 : .await?;
1299 : let db_size = total_blocks as i64 * BLCKSZ as i64;
1300 :
1301 : Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse {
1302 : db_size,
1303 : }))
1304 : }
1305 :
1306 0 : #[instrument(skip_all)]
1307 : async fn handle_get_page_at_lsn_request_batched(
1308 : &mut self,
1309 : timeline: &Timeline,
1310 : effective_lsn: Lsn,
1311 : pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
1312 : ctx: &RequestContext,
1313 : ) -> Vec<Result<PagestreamBeMessage, PageStreamError>> {
1314 : debug_assert_current_span_has_tenant_and_timeline_id();
1315 : let _timer = timeline.query_metrics.start_timer_many(
1316 : metrics::SmgrQueryType::GetPageAtLsn,
1317 : pages.len(),
1318 : ctx,
1319 : );
1320 :
1321 : let pages = timeline
1322 : .get_rel_page_at_lsn_batched(pages, effective_lsn, ctx)
1323 : .await;
1324 :
1325 0 : Vec::from_iter(pages.into_iter().map(|page| {
1326 0 : page.map(|page| {
1327 0 : PagestreamBeMessage::GetPage(models::PagestreamGetPageResponse { page })
1328 0 : })
1329 0 : .map_err(PageStreamError::from)
1330 0 : }))
1331 : }
1332 :
1333 0 : #[instrument(skip_all, fields(shard_id))]
1334 : async fn handle_get_slru_segment_request(
1335 : &mut self,
1336 : tenant_id: TenantId,
1337 : timeline_id: TimelineId,
1338 : req: &PagestreamGetSlruSegmentRequest,
1339 : ctx: &RequestContext,
1340 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1341 : let timeline = self
1342 : .timeline_handles
1343 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1344 : .await?;
1345 :
1346 : let _timer = timeline
1347 : .query_metrics
1348 : .start_timer(metrics::SmgrQueryType::GetSlruSegment, ctx);
1349 :
1350 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1351 : let lsn = Self::wait_or_get_last_lsn(
1352 : &timeline,
1353 : req.request_lsn,
1354 : req.not_modified_since,
1355 : &latest_gc_cutoff_lsn,
1356 : ctx,
1357 : )
1358 : .await?;
1359 :
1360 : let kind = SlruKind::from_repr(req.kind)
1361 : .ok_or(PageStreamError::BadRequest("invalid SLRU kind".into()))?;
1362 : let segment = timeline.get_slru_segment(kind, req.segno, lsn, ctx).await?;
1363 :
1364 : Ok(PagestreamBeMessage::GetSlruSegment(
1365 : PagestreamGetSlruSegmentResponse { segment },
1366 : ))
1367 : }
1368 :
1369 : /// Note on "fullbackup":
1370 : /// Full basebackups should only be used for debugging purposes.
1371 :     /// Originally, they were introduced to enable breaking storage format changes,
1372 : /// but that is not applicable anymore.
1373 : ///
1374 : /// # Coding Discipline
1375 : ///
1376 : /// Coding discipline within this function: all interaction with the `pgb` connection
1377 : /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
1378 :     /// This is so that we can shut down page_service quickly.
1379 : ///
1380 : /// TODO: wrap the pgb that we pass to the basebackup handler so that it's sensitive
1381 : /// to connection cancellation.
1382 : #[allow(clippy::too_many_arguments)]
1383 0 : #[instrument(skip_all, fields(shard_id, ?lsn, ?prev_lsn, %full_backup))]
1384 : async fn handle_basebackup_request<IO>(
1385 : &mut self,
1386 : pgb: &mut PostgresBackend<IO>,
1387 : tenant_id: TenantId,
1388 : timeline_id: TimelineId,
1389 : lsn: Option<Lsn>,
1390 : prev_lsn: Option<Lsn>,
1391 : full_backup: bool,
1392 : gzip: bool,
1393 : replica: bool,
1394 : ctx: &RequestContext,
1395 : ) -> Result<(), QueryError>
1396 : where
1397 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1398 : {
1399 0 : fn map_basebackup_error(err: BasebackupError) -> QueryError {
1400 0 : match err {
1401 0 : BasebackupError::Client(e) => QueryError::Disconnected(ConnectionError::Io(e)),
1402 0 : BasebackupError::Server(e) => QueryError::Other(e),
1403 : }
1404 0 : }
1405 :
1406 : let started = std::time::Instant::now();
1407 :
1408 : let timeline = self
1409 : .timeline_handles
1410 : .get(tenant_id, timeline_id, ShardSelector::Zero)
1411 : .await?;
1412 :
1413 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1414 : if let Some(lsn) = lsn {
1415 : // Backup was requested at a particular LSN. Wait for it to arrive.
1416 : info!("waiting for {}", lsn);
1417 : timeline
1418 : .wait_lsn(
1419 : lsn,
1420 : crate::tenant::timeline::WaitLsnWaiter::PageService,
1421 : ctx,
1422 : )
1423 : .await?;
1424 : timeline
1425 : .check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
1426 : .context("invalid basebackup lsn")?;
1427 : }
1428 :
1429 : let lsn_awaited_after = started.elapsed();
1430 :
1431 : // switch client to COPYOUT
1432 : pgb.write_message_noflush(&BeMessage::CopyOutResponse)
1433 : .map_err(QueryError::Disconnected)?;
1434 : self.flush_cancellable(pgb, &self.cancel).await?;
1435 :
1436 : // Send a tarball of the latest layer on the timeline. Compress if not
1437 : // fullbackup. TODO Compress in that case too (tests need to be updated)
1438 : if full_backup {
1439 : let mut writer = pgb.copyout_writer();
1440 : basebackup::send_basebackup_tarball(
1441 : &mut writer,
1442 : &timeline,
1443 : lsn,
1444 : prev_lsn,
1445 : full_backup,
1446 : replica,
1447 : ctx,
1448 : )
1449 : .await
1450 : .map_err(map_basebackup_error)?;
1451 : } else {
1452 : let mut writer = BufWriter::new(pgb.copyout_writer());
1453 : if gzip {
1454 : let mut encoder = GzipEncoder::with_quality(
1455 : &mut writer,
1456 : // NOTE using fast compression because it's on the critical path
1457 : // for compute startup. For an empty database, we get
1458 : // <100KB with this method. The Level::Best compression method
1459 : // gives us <20KB, but maybe we should add basebackup caching
1460 : // on compute shutdown first.
1461 : async_compression::Level::Fastest,
1462 : );
1463 : basebackup::send_basebackup_tarball(
1464 : &mut encoder,
1465 : &timeline,
1466 : lsn,
1467 : prev_lsn,
1468 : full_backup,
1469 : replica,
1470 : ctx,
1471 : )
1472 : .await
1473 : .map_err(map_basebackup_error)?;
1474 :             // shut down the encoder to ensure the gzip footer is written
1475 : encoder
1476 : .shutdown()
1477 : .await
1478 0 : .map_err(|e| QueryError::Disconnected(ConnectionError::Io(e)))?;
1479 : } else {
1480 : basebackup::send_basebackup_tarball(
1481 : &mut writer,
1482 : &timeline,
1483 : lsn,
1484 : prev_lsn,
1485 : full_backup,
1486 : replica,
1487 : ctx,
1488 : )
1489 : .await
1490 : .map_err(map_basebackup_error)?;
1491 : }
1492 : writer
1493 : .flush()
1494 : .await
1495 0 : .map_err(|e| map_basebackup_error(BasebackupError::Client(e)))?;
1496 : }
1497 :
1498 : pgb.write_message_noflush(&BeMessage::CopyDone)
1499 : .map_err(QueryError::Disconnected)?;
1500 : self.flush_cancellable(pgb, &timeline.cancel).await?;
1501 :
1502 : let basebackup_after = started
1503 : .elapsed()
1504 : .checked_sub(lsn_awaited_after)
1505 : .unwrap_or(Duration::ZERO);
1506 :
1507 : info!(
1508 : lsn_await_millis = lsn_awaited_after.as_millis(),
1509 : basebackup_millis = basebackup_after.as_millis(),
1510 : "basebackup complete"
1511 : );
1512 :
1513 : Ok(())
1514 : }
1515 :
1516 :     // When accessing the management api, supply None as an argument.
1517 :     // When authorizing a tenant, pass the corresponding tenant id.
1518 0 : fn check_permission(&self, tenant_id: Option<TenantId>) -> Result<(), QueryError> {
1519 0 : if self.auth.is_none() {
1520 : // auth is set to Trust, nothing to check so just return ok
1521 0 : return Ok(());
1522 0 : }
1523 0 :         // auth is Some, as just checked above; when auth is Some,
1524 0 :         // claims are always present because of checks during connection init,
1525 0 :         // so this expect won't trigger
1526 0 : let claims = self
1527 0 : .claims
1528 0 : .as_ref()
1529 0 : .expect("claims presence already checked");
1530 0 : check_permission(claims, tenant_id).map_err(|e| QueryError::Unauthorized(e.0))
1531 0 : }
1532 : }
1533 :
1534 : /// `basebackup tenant timeline [lsn] [--gzip] [--replica]`
1535 : #[derive(Debug, Clone, Eq, PartialEq)]
1536 : struct BaseBackupCmd {
1537 : tenant_id: TenantId,
1538 : timeline_id: TimelineId,
1539 : lsn: Option<Lsn>,
1540 : gzip: bool,
1541 : replica: bool,
1542 : }
1543 :
1544 : /// `fullbackup tenant timeline [lsn] [prev_lsn]`
1545 : #[derive(Debug, Clone, Eq, PartialEq)]
1546 : struct FullBackupCmd {
1547 : tenant_id: TenantId,
1548 : timeline_id: TimelineId,
1549 : lsn: Option<Lsn>,
1550 : prev_lsn: Option<Lsn>,
1551 : }
1552 :
1553 : /// `pagestream_v2 tenant timeline`
1554 : #[derive(Debug, Clone, Eq, PartialEq)]
1555 : struct PageStreamCmd {
1556 : tenant_id: TenantId,
1557 : timeline_id: TimelineId,
1558 : }
1559 :
1560 : /// `lease lsn tenant timeline lsn`
1561 : #[derive(Debug, Clone, Eq, PartialEq)]
1562 : struct LeaseLsnCmd {
1563 : tenant_shard_id: TenantShardId,
1564 : timeline_id: TimelineId,
1565 : lsn: Lsn,
1566 : }
1567 :
1568 : #[derive(Debug, Clone, Eq, PartialEq)]
1569 : enum PageServiceCmd {
1570 : Set,
1571 : PageStream(PageStreamCmd),
1572 : BaseBackup(BaseBackupCmd),
1573 : FullBackup(FullBackupCmd),
1574 : LeaseLsn(LeaseLsnCmd),
1575 : }
1576 :
1577 : impl PageStreamCmd {
1578 6 : fn parse(query: &str) -> anyhow::Result<Self> {
1579 6 : let parameters = query.split_whitespace().collect_vec();
1580 6 : if parameters.len() != 2 {
1581 2 : bail!(
1582 2 : "invalid number of parameters for pagestream command: {}",
1583 2 : query
1584 2 : );
1585 4 : }
1586 4 : let tenant_id = TenantId::from_str(parameters[0])
1587 4 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
1588 2 : let timeline_id = TimelineId::from_str(parameters[1])
1589 2 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
1590 2 : Ok(Self {
1591 2 : tenant_id,
1592 2 : timeline_id,
1593 2 : })
1594 6 : }
1595 : }
1596 :
1597 : impl FullBackupCmd {
1598 4 : fn parse(query: &str) -> anyhow::Result<Self> {
1599 4 : let parameters = query.split_whitespace().collect_vec();
1600 4 : if parameters.len() < 2 || parameters.len() > 4 {
1601 0 : bail!(
1602 0 : "invalid number of parameters for basebackup command: {}",
1603 0 : query
1604 0 : );
1605 4 : }
1606 4 : let tenant_id = TenantId::from_str(parameters[0])
1607 4 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
1608 4 : let timeline_id = TimelineId::from_str(parameters[1])
1609 4 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
1610 : // The caller is responsible for providing correct lsn and prev_lsn.
1611 4 : let lsn = if let Some(lsn_str) = parameters.get(2) {
1612 : Some(
1613 2 : Lsn::from_str(lsn_str)
1614 2 : .with_context(|| format!("Failed to parse Lsn from {lsn_str}"))?,
1615 : )
1616 : } else {
1617 2 : None
1618 : };
1619 4 : let prev_lsn = if let Some(prev_lsn_str) = parameters.get(3) {
1620 : Some(
1621 2 : Lsn::from_str(prev_lsn_str)
1622 2 : .with_context(|| format!("Failed to parse Lsn from {prev_lsn_str}"))?,
1623 : )
1624 : } else {
1625 2 : None
1626 : };
1627 4 : Ok(Self {
1628 4 : tenant_id,
1629 4 : timeline_id,
1630 4 : lsn,
1631 4 : prev_lsn,
1632 4 : })
1633 4 : }
1634 : }
1635 :
1636 : impl BaseBackupCmd {
1637 18 : fn parse(query: &str) -> anyhow::Result<Self> {
1638 18 : let parameters = query.split_whitespace().collect_vec();
1639 18 : if parameters.len() < 2 {
1640 0 : bail!(
1641 0 : "invalid number of parameters for basebackup command: {}",
1642 0 : query
1643 0 : );
1644 18 : }
1645 18 : let tenant_id = TenantId::from_str(parameters[0])
1646 18 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
1647 18 : let timeline_id = TimelineId::from_str(parameters[1])
1648 18 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
1649 : let lsn;
1650 : let flags_parse_from;
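        // The third parameter is overloaded: it may be the literal "latest",
        // an option flag (anything starting with "--"), or an explicit LSN.
        // Flags therefore start at index 2 or 3, depending on which case hits.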
1651 18 : if let Some(maybe_lsn) = parameters.get(2) {
1652 16 : if *maybe_lsn == "latest" {
1653 2 : lsn = None;
1654 2 : flags_parse_from = 3;
1655 14 : } else if maybe_lsn.starts_with("--") {
1656 10 : lsn = None;
1657 10 : flags_parse_from = 2;
1658 10 : } else {
1659 : lsn = Some(
1660 4 : Lsn::from_str(maybe_lsn)
1661 4 : .with_context(|| format!("Failed to parse lsn from {maybe_lsn}"))?,
1662 : );
1663 4 : flags_parse_from = 3;
1664 : }
1665 2 : } else {
1666 2 : lsn = None;
1667 2 : flags_parse_from = 2;
1668 2 : }
1669 :
1670 18 : let mut gzip = false;
1671 18 : let mut replica = false;
1672 :
1673 22 : for ¶m in ¶meters[flags_parse_from..] {
1674 22 : match param {
1675 22 : "--gzip" => {
1676 14 : if gzip {
1677 2 : bail!("duplicate parameter for basebackup command: {param}")
1678 12 : }
1679 12 : gzip = true
1680 : }
1681 8 : "--replica" => {
1682 4 : if replica {
1683 0 : bail!("duplicate parameter for basebackup command: {param}")
1684 4 : }
1685 4 : replica = true
1686 : }
1687 4 : _ => bail!("invalid parameter for basebackup command: {param}"),
1688 : }
1689 : }
1690 12 : Ok(Self {
1691 12 : tenant_id,
1692 12 : timeline_id,
1693 12 : lsn,
1694 12 : gzip,
1695 12 : replica,
1696 12 : })
1697 18 : }
1698 : }
1699 :
1700 : impl LeaseLsnCmd {
1701 4 : fn parse(query: &str) -> anyhow::Result<Self> {
1702 4 : let parameters = query.split_whitespace().collect_vec();
1703 4 : if parameters.len() != 3 {
1704 0 : bail!(
1705 0 : "invalid number of parameters for lease lsn command: {}",
1706 0 : query
1707 0 : );
1708 4 : }
1709 4 : let tenant_shard_id = TenantShardId::from_str(parameters[0])
1710 4 : .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
1711 4 : let timeline_id = TimelineId::from_str(parameters[1])
1712 4 : .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
1713 4 : let lsn = Lsn::from_str(parameters[2])
1714 4 : .with_context(|| format!("Failed to parse lsn from {}", parameters[2]))?;
1715 4 : Ok(Self {
1716 4 : tenant_shard_id,
1717 4 : timeline_id,
1718 4 : lsn,
1719 4 : })
1720 4 : }
1721 : }
1722 :
1723 : impl PageServiceCmd {
1724 42 : fn parse(query: &str) -> anyhow::Result<Self> {
1725 42 : let query = query.trim();
1726 42 : let Some((cmd, other)) = query.split_once(' ') else {
1727 4 : bail!("cannot parse query: {query}")
1728 : };
1729 38 : match cmd.to_ascii_lowercase().as_str() {
1730 38 : "pagestream_v2" => Ok(Self::PageStream(PageStreamCmd::parse(other)?)),
1731 32 : "basebackup" => Ok(Self::BaseBackup(BaseBackupCmd::parse(other)?)),
1732 14 : "fullbackup" => Ok(Self::FullBackup(FullBackupCmd::parse(other)?)),
1733 10 : "lease" => {
1734 6 : let Some((cmd2, other)) = other.split_once(' ') else {
1735 0 : bail!("invalid lease command: {cmd}");
1736 : };
1737 6 : let cmd2 = cmd2.to_ascii_lowercase();
1738 6 : if cmd2 == "lsn" {
1739 4 : Ok(Self::LeaseLsn(LeaseLsnCmd::parse(other)?))
1740 : } else {
1741 2 : bail!("invalid lease command: {cmd2}");
1742 : }
1743 : }
1744 4 : "set" => Ok(Self::Set),
1745 0 : _ => Err(anyhow::anyhow!("unsupported command {cmd} in {query}")),
1746 : }
1747 42 : }
1748 : }
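// Illustrative sketch of how a frontend query string maps to a parsed command
// (editorial example; the hex tenant/timeline ids below are placeholder values):
//
//     let cmd = PageServiceCmd::parse(
//         "basebackup 0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f 1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a --gzip",
//     )?;
//     assert!(matches!(
//         cmd,
//         PageServiceCmd::BaseBackup(BaseBackupCmd { gzip: true, replica: false, .. })
//     ));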
1749 :
1750 : impl<IO> postgres_backend::Handler<IO> for PageServerHandler
1751 : where
1752 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1753 : {
1754 0 : fn check_auth_jwt(
1755 0 : &mut self,
1756 0 : _pgb: &mut PostgresBackend<IO>,
1757 0 : jwt_response: &[u8],
1758 0 : ) -> Result<(), QueryError> {
1759 : // This unwrap never fires: check_auth_jwt is only called when auth_type is NeonJWT,
1760 : // which requires auth to be present.
1761 0 : let data = self
1762 0 : .auth
1763 0 : .as_ref()
1764 0 : .unwrap()
1765 0 : .decode(str::from_utf8(jwt_response).context("jwt response is not UTF-8")?)
1766 0 : .map_err(|e| QueryError::Unauthorized(e.0))?;
1767 :
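        // A tenant-scoped token must name the tenant it is scoped to.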
1768 0 : if matches!(data.claims.scope, Scope::Tenant) && data.claims.tenant_id.is_none() {
1769 0 : return Err(QueryError::Unauthorized(
1770 0 : "jwt token scope is Tenant, but tenant id is missing".into(),
1771 0 : ));
1772 0 : }
1773 0 :
1774 0 : debug!(
1775 0 : "jwt scope check succeeded for scope: {:#?} by tenant id: {:?}",
1776 : data.claims.scope, data.claims.tenant_id,
1777 : );
1778 :
1779 0 : self.claims = Some(data.claims);
1780 0 : Ok(())
1781 0 : }
1782 :
1783 0 : fn startup(
1784 0 : &mut self,
1785 0 : _pgb: &mut PostgresBackend<IO>,
1786 0 : _sm: &FeStartupPacket,
1787 0 : ) -> Result<(), QueryError> {
1788 0 : fail::fail_point!("ps::connection-start::startup-packet");
1789 0 : Ok(())
1790 0 : }
1791 :
1792 0 : #[instrument(skip_all, fields(tenant_id, timeline_id))]
1793 : async fn process_query(
1794 : &mut self,
1795 : pgb: &mut PostgresBackend<IO>,
1796 : query_string: &str,
1797 : ) -> Result<(), QueryError> {
1798 0 : fail::fail_point!("simulated-bad-compute-connection", |_| {
1799 0 : info!("Hit failpoint for bad connection");
1800 0 : Err(QueryError::SimulatedConnectionError)
1801 0 : });
1802 :
1803 : fail::fail_point!("ps::connection-start::process-query");
1804 :
1805 : let ctx = self.connection_ctx.attached_child();
1806 : debug!("process query {query_string}");
1807 : let query = PageServiceCmd::parse(query_string)?;
1808 : match query {
1809 : PageServiceCmd::PageStream(PageStreamCmd {
1810 : tenant_id,
1811 : timeline_id,
1812 : }) => {
1813 : tracing::Span::current()
1814 : .record("tenant_id", field::display(tenant_id))
1815 : .record("timeline_id", field::display(timeline_id));
1816 :
1817 : self.check_permission(Some(tenant_id))?;
1818 :
1819 : COMPUTE_COMMANDS_COUNTERS
1820 : .for_command(ComputeCommandKind::PageStreamV2)
1821 : .inc();
1822 :
1823 : self.handle_pagerequests(
1824 : pgb,
1825 : tenant_id,
1826 : timeline_id,
1827 : PagestreamProtocolVersion::V2,
1828 : ctx,
1829 : )
1830 : .await?;
1831 : }
1832 : PageServiceCmd::BaseBackup(BaseBackupCmd {
1833 : tenant_id,
1834 : timeline_id,
1835 : lsn,
1836 : gzip,
1837 : replica,
1838 : }) => {
1839 : tracing::Span::current()
1840 : .record("tenant_id", field::display(tenant_id))
1841 : .record("timeline_id", field::display(timeline_id));
1842 :
1843 : self.check_permission(Some(tenant_id))?;
1844 :
1845 : COMPUTE_COMMANDS_COUNTERS
1846 : .for_command(ComputeCommandKind::Basebackup)
1847 : .inc();
1848 : let metric_recording = metrics::BASEBACKUP_QUERY_TIME.start_recording(&ctx);
1849 0 : let res = async {
1850 0 : self.handle_basebackup_request(
1851 0 : pgb,
1852 0 : tenant_id,
1853 0 : timeline_id,
1854 0 : lsn,
1855 0 : None,
1856 0 : false,
1857 0 : gzip,
1858 0 : replica,
1859 0 : &ctx,
1860 0 : )
1861 0 : .await?;
1862 0 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1863 0 : Result::<(), QueryError>::Ok(())
1864 0 : }
1865 : .await;
1866 : metric_recording.observe(&res);
1867 : res?;
1868 : }
1869 : // same as basebackup, but result includes relational data as well
1870 : PageServiceCmd::FullBackup(FullBackupCmd {
1871 : tenant_id,
1872 : timeline_id,
1873 : lsn,
1874 : prev_lsn,
1875 : }) => {
1876 : tracing::Span::current()
1877 : .record("tenant_id", field::display(tenant_id))
1878 : .record("timeline_id", field::display(timeline_id));
1879 :
1880 : self.check_permission(Some(tenant_id))?;
1881 :
1882 : COMPUTE_COMMANDS_COUNTERS
1883 : .for_command(ComputeCommandKind::Fullbackup)
1884 : .inc();
1885 :
1886 : // Check that the timeline exists
1887 : self.handle_basebackup_request(
1888 : pgb,
1889 : tenant_id,
1890 : timeline_id,
1891 : lsn,
1892 : prev_lsn,
1893 : true,
1894 : false,
1895 : false,
1896 : &ctx,
1897 : )
1898 : .await?;
1899 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1900 : }
1901 : PageServiceCmd::Set => {
1902 : // important because psycopg2 executes "SET datestyle TO 'ISO'"
1903 : // on connect
1904 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1905 : }
1906 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
1907 : tenant_shard_id,
1908 : timeline_id,
1909 : lsn,
1910 : }) => {
1911 : tracing::Span::current()
1912 : .record("tenant_id", field::display(tenant_shard_id))
1913 : .record("timeline_id", field::display(timeline_id));
1914 :
1915 : self.check_permission(Some(tenant_shard_id.tenant_id))?;
1916 :
1917 : COMPUTE_COMMANDS_COUNTERS
1918 : .for_command(ComputeCommandKind::LeaseLsn)
1919 : .inc();
1920 :
1921 : match self
1922 : .handle_make_lsn_lease(pgb, tenant_shard_id, timeline_id, lsn, &ctx)
1923 : .await
1924 : {
1925 : Ok(()) => {
1926 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?
1927 : }
1928 : Err(e) => {
1929 : error!("error obtaining lsn lease for {lsn}: {e:?}");
1930 : pgb.write_message_noflush(&BeMessage::ErrorResponse(
1931 : &e.to_string(),
1932 : Some(e.pg_error_code()),
1933 : ))?
1934 : }
1935 : };
1936 : }
1937 : }
1938 :
1939 : Ok(())
1940 : }
1941 : }
1942 :
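// Map tenant-activation failures onto protocol-level errors: an activation
// timeout surfaces as an I/O "timed out" disconnect, cancellation and a
// stopping tenant become `QueryError::Shutdown`, a missing tenant becomes
// `QueryError::NotFound`, and anything else is passed through as an opaque
// error.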
1943 : impl From<GetActiveTenantError> for QueryError {
1944 0 : fn from(e: GetActiveTenantError) -> Self {
1945 0 : match e {
1946 0 : GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected(
1947 0 : ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
1948 0 : ),
1949 : GetActiveTenantError::Cancelled
1950 : | GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => {
1951 0 : QueryError::Shutdown
1952 : }
1953 0 : e @ GetActiveTenantError::NotFound(_) => QueryError::NotFound(format!("{e}").into()),
1954 0 : e => QueryError::Other(anyhow::anyhow!(e)),
1955 : }
1956 0 : }
1957 : }
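/// Failure to resolve an active tenant and timeline for a request: either the
/// tenant could not be obtained in an active state, or the timeline lookup on
/// that tenant failed.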
1958 :
1959 0 : #[derive(Debug, thiserror::Error)]
1960 : pub(crate) enum GetActiveTimelineError {
1961 : #[error(transparent)]
1962 : Tenant(GetActiveTenantError),
1963 : #[error(transparent)]
1964 : Timeline(#[from] GetTimelineError),
1965 : }
1966 :
1967 : impl From<GetActiveTimelineError> for QueryError {
1968 0 : fn from(e: GetActiveTimelineError) -> Self {
1969 0 : match e {
1970 0 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => QueryError::Shutdown,
1971 0 : GetActiveTimelineError::Tenant(e) => e.into(),
1972 0 : GetActiveTimelineError::Timeline(e) => QueryError::NotFound(format!("{e}").into()),
1973 : }
1974 0 : }
1975 : }
1976 :
1977 0 : fn set_tracing_field_shard_id(timeline: &Timeline) {
1978 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1979 0 : tracing::Span::current().record(
1980 0 : "shard_id",
1981 0 : tracing::field::display(timeline.tenant_shard_id.shard_slug()),
1982 0 : );
1983 0 : debug_assert_current_span_has_tenant_and_timeline_id();
1984 0 : }
1985 :
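/// Newtype marker around an `Lsn`. Going by the name and the one-way
/// conversion below, it tags an LSN that has already been waited for before
/// use (inference from the name; the constructing code is elsewhere in this
/// file).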
1986 : struct WaitedForLsn(Lsn);
1987 : impl From<WaitedForLsn> for Lsn {
1988 0 : fn from(WaitedForLsn(lsn): WaitedForLsn) -> Self {
1989 0 : lsn
1990 0 : }
1991 : }
1992 :
1993 : #[cfg(test)]
1994 : mod tests {
1995 : use utils::shard::ShardCount;
1996 :
1997 : use super::*;
1998 :
1999 : #[test]
2000 2 : fn pageservice_cmd_parse() {
2001 2 : let tenant_id = TenantId::generate();
2002 2 : let timeline_id = TimelineId::generate();
2003 2 : let cmd =
2004 2 : PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id} {timeline_id}")).unwrap();
2005 2 : assert_eq!(
2006 2 : cmd,
2007 2 : PageServiceCmd::PageStream(PageStreamCmd {
2008 2 : tenant_id,
2009 2 : timeline_id
2010 2 : })
2011 2 : );
2012 2 : let cmd = PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id}")).unwrap();
2013 2 : assert_eq!(
2014 2 : cmd,
2015 2 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2016 2 : tenant_id,
2017 2 : timeline_id,
2018 2 : lsn: None,
2019 2 : gzip: false,
2020 2 : replica: false
2021 2 : })
2022 2 : );
2023 2 : let cmd =
2024 2 : PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} --gzip")).unwrap();
2025 2 : assert_eq!(
2026 2 : cmd,
2027 2 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2028 2 : tenant_id,
2029 2 : timeline_id,
2030 2 : lsn: None,
2031 2 : gzip: true,
2032 2 : replica: false
2033 2 : })
2034 2 : );
2035 2 : let cmd =
2036 2 : PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} latest")).unwrap();
2037 2 : assert_eq!(
2038 2 : cmd,
2039 2 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2040 2 : tenant_id,
2041 2 : timeline_id,
2042 2 : lsn: None,
2043 2 : gzip: false,
2044 2 : replica: false
2045 2 : })
2046 2 : );
2047 2 : let cmd = PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} 0/16ABCDE"))
2048 2 : .unwrap();
2049 2 : assert_eq!(
2050 2 : cmd,
2051 2 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2052 2 : tenant_id,
2053 2 : timeline_id,
2054 2 : lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
2055 2 : gzip: false,
2056 2 : replica: false
2057 2 : })
2058 2 : );
2059 2 : let cmd = PageServiceCmd::parse(&format!(
2060 2 : "basebackup {tenant_id} {timeline_id} --replica --gzip"
2061 2 : ))
2062 2 : .unwrap();
2063 2 : assert_eq!(
2064 2 : cmd,
2065 2 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2066 2 : tenant_id,
2067 2 : timeline_id,
2068 2 : lsn: None,
2069 2 : gzip: true,
2070 2 : replica: true
2071 2 : })
2072 2 : );
2073 2 : let cmd = PageServiceCmd::parse(&format!(
2074 2 : "basebackup {tenant_id} {timeline_id} 0/16ABCDE --replica --gzip"
2075 2 : ))
2076 2 : .unwrap();
2077 2 : assert_eq!(
2078 2 : cmd,
2079 2 : PageServiceCmd::BaseBackup(BaseBackupCmd {
2080 2 : tenant_id,
2081 2 : timeline_id,
2082 2 : lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
2083 2 : gzip: true,
2084 2 : replica: true
2085 2 : })
2086 2 : );
2087 2 : let cmd = PageServiceCmd::parse(&format!("fullbackup {tenant_id} {timeline_id}")).unwrap();
2088 2 : assert_eq!(
2089 2 : cmd,
2090 2 : PageServiceCmd::FullBackup(FullBackupCmd {
2091 2 : tenant_id,
2092 2 : timeline_id,
2093 2 : lsn: None,
2094 2 : prev_lsn: None
2095 2 : })
2096 2 : );
2097 2 : let cmd = PageServiceCmd::parse(&format!(
2098 2 : "fullbackup {tenant_id} {timeline_id} 0/16ABCDE 0/16ABCDF"
2099 2 : ))
2100 2 : .unwrap();
2101 2 : assert_eq!(
2102 2 : cmd,
2103 2 : PageServiceCmd::FullBackup(FullBackupCmd {
2104 2 : tenant_id,
2105 2 : timeline_id,
2106 2 : lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
2107 2 : prev_lsn: Some(Lsn::from_str("0/16ABCDF").unwrap()),
2108 2 : })
2109 2 : );
2110 2 : let tenant_shard_id = TenantShardId::unsharded(tenant_id);
2111 2 : let cmd = PageServiceCmd::parse(&format!(
2112 2 : "lease lsn {tenant_shard_id} {timeline_id} 0/16ABCDE"
2113 2 : ))
2114 2 : .unwrap();
2115 2 : assert_eq!(
2116 2 : cmd,
2117 2 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
2118 2 : tenant_shard_id,
2119 2 : timeline_id,
2120 2 : lsn: Lsn::from_str("0/16ABCDE").unwrap(),
2121 2 : })
2122 2 : );
2123 2 : let tenant_shard_id = TenantShardId::split(&tenant_shard_id, ShardCount(8))[1];
2124 2 : let cmd = PageServiceCmd::parse(&format!(
2125 2 : "lease lsn {tenant_shard_id} {timeline_id} 0/16ABCDE"
2126 2 : ))
2127 2 : .unwrap();
2128 2 : assert_eq!(
2129 2 : cmd,
2130 2 : PageServiceCmd::LeaseLsn(LeaseLsnCmd {
2131 2 : tenant_shard_id,
2132 2 : timeline_id,
2133 2 : lsn: Lsn::from_str("0/16ABCDE").unwrap(),
2134 2 : })
2135 2 : );
2136 2 : let cmd = PageServiceCmd::parse("set a = b").unwrap();
2137 2 : assert_eq!(cmd, PageServiceCmd::Set);
2138 2 : let cmd = PageServiceCmd::parse("SET foo").unwrap();
2139 2 : assert_eq!(cmd, PageServiceCmd::Set);
2140 2 : }
2141 :
2142 : #[test]
2143 2 : fn pageservice_cmd_err_handling() {
2144 2 : let tenant_id = TenantId::generate();
2145 2 : let timeline_id = TimelineId::generate();
2146 2 : let cmd = PageServiceCmd::parse("unknown_command");
2147 2 : assert!(cmd.is_err());
2148 2 : let cmd = PageServiceCmd::parse("pagestream_v2");
2149 2 : assert!(cmd.is_err());
2150 2 : let cmd = PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id}xxx"));
2151 2 : assert!(cmd.is_err());
2152 2 : let cmd = PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id}xxx {timeline_id}xxx"));
2153 2 : assert!(cmd.is_err());
2154 2 : let cmd = PageServiceCmd::parse(&format!(
2155 2 : "basebackup {tenant_id} {timeline_id} --gzip --gzip"
2156 2 : ));
2157 2 : assert!(cmd.is_err());
2158 2 : let cmd = PageServiceCmd::parse(&format!(
2159 2 : "basebackup {tenant_id} {timeline_id} --gzip --unknown"
2160 2 : ));
2161 2 : assert!(cmd.is_err());
2162 2 : let cmd = PageServiceCmd::parse(&format!(
2163 2 : "basebackup {tenant_id} {timeline_id} --gzip 0/16ABCDE"
2164 2 : ));
2165 2 : assert!(cmd.is_err());
2166 2 : let cmd = PageServiceCmd::parse(&format!("lease {tenant_id} {timeline_id} gzip 0/16ABCDE"));
2167 2 : assert!(cmd.is_err());
2168 2 : }
2169 : }