Line data Source code
1 : //! The Page Service listens for client connections and serves their GetPage@LSN
2 : //! requests.
3 :
4 : use anyhow::Context;
5 : use async_compression::tokio::write::GzipEncoder;
6 : use bytes::Buf;
7 : use bytes::Bytes;
8 : use futures::stream::FuturesUnordered;
9 : use futures::Stream;
10 : use futures::StreamExt;
11 : use pageserver_api::key::Key;
12 : use pageserver_api::models::TenantState;
13 : use pageserver_api::models::{
14 : PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
15 : PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
16 : PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
17 : PagestreamGetSlruSegmentRequest, PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest,
18 : PagestreamNblocksResponse, PagestreamProtocolVersion,
19 : };
20 : use pageserver_api::shard::ShardIndex;
21 : use pageserver_api::shard::ShardNumber;
22 : use pageserver_api::shard::TenantShardId;
23 : use postgres_backend::{is_expected_io_error, AuthType, PostgresBackend, QueryError};
24 : use pq_proto::framed::ConnectionError;
25 : use pq_proto::FeStartupPacket;
26 : use pq_proto::{BeMessage, FeMessage, RowDescriptor};
27 : use std::borrow::Cow;
28 : use std::collections::HashMap;
29 : use std::io;
30 : use std::net::TcpListener;
31 : use std::pin::pin;
32 : use std::str;
33 : use std::str::FromStr;
34 : use std::sync::Arc;
35 : use std::time::Duration;
36 : use std::time::Instant;
37 : use std::time::SystemTime;
38 : use tokio::io::AsyncWriteExt;
39 : use tokio::io::{AsyncRead, AsyncWrite};
40 : use tokio_util::io::StreamReader;
41 : use tokio_util::sync::CancellationToken;
42 : use tracing::*;
43 : use utils::id::ConnectionId;
44 : use utils::sync::gate::GateGuard;
45 : use utils::{
46 : auth::{Claims, Scope, SwappableJwtAuth},
47 : id::{TenantId, TimelineId},
48 : lsn::Lsn,
49 : simple_rcu::RcuReadGuard,
50 : };
51 :
52 : use crate::auth::check_permission;
53 : use crate::basebackup;
54 : use crate::basebackup::BasebackupError;
55 : use crate::context::{DownloadBehavior, RequestContext};
56 : use crate::import_datadir::import_wal_from_tar;
57 : use crate::metrics;
58 : use crate::metrics::LIVE_CONNECTIONS_COUNT;
59 : use crate::pgdatadir_mapping::Version;
60 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
61 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
62 : use crate::task_mgr;
63 : use crate::task_mgr::TaskKind;
64 : use crate::tenant::mgr::GetActiveTenantError;
65 : use crate::tenant::mgr::GetTenantError;
66 : use crate::tenant::mgr::ShardResolveResult;
67 : use crate::tenant::mgr::ShardSelector;
68 : use crate::tenant::mgr::TenantManager;
69 : use crate::tenant::timeline::FlushLayerError;
70 : use crate::tenant::timeline::WaitLsnError;
71 : use crate::tenant::GetTimelineError;
72 : use crate::tenant::PageReconstructError;
73 : use crate::tenant::Tenant;
74 : use crate::tenant::Timeline;
75 : use crate::trace::Tracer;
76 : use pageserver_api::key::rel_block_to_key;
77 : use pageserver_api::reltag::SlruKind;
78 : use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
79 : use postgres_ffi::BLCKSZ;
80 :
81 : // How long we may wait for a [`TenantSlot::InProgress`] and/or a [`Tenant`] which
82 : // is not yet in state [`TenantState::Active`].
83 : const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
84 :
85 : /// Read the end of a tar archive.
86 : ///
87 : /// A tar archive normally ends with two consecutive blocks of zeros, 512 bytes each.
88 : /// `tokio_tar` already read the first such block. Read the second all-zeros block,
89 : /// and check that there is no more data after the EOF marker.
90 : ///
91 : /// The 'tar' command can also write extra blocks of zeros, up to a record
92 : /// size, controlled by the --record-size argument. Ignore them too.
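 : ///
 : /// For example (illustrative numbers): GNU tar's default record size is
 : /// 10240 bytes (20 blocks of 512), so the archive may be padded with extra
 : /// all-zero blocks up to that record boundary; the checks below accept any
 : /// whole number of trailing zero blocks.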
93 0 : async fn read_tar_eof(mut reader: (impl AsyncRead + Unpin)) -> anyhow::Result<()> {
94 0 : use tokio::io::AsyncReadExt;
95 0 : let mut buf = [0u8; 512];
96 0 :
97 0 : // Read the all-zeros block, and verify it
98 0 : let mut total_bytes = 0;
99 0 : while total_bytes < 512 {
100 0 : let nbytes = reader.read(&mut buf[total_bytes..]).await?;
101 0 : total_bytes += nbytes;
102 0 : if nbytes == 0 {
103 0 : break;
104 0 : }
105 : }
106 0 : if total_bytes < 512 {
107 0 : anyhow::bail!("incomplete or invalid tar EOF marker");
108 0 : }
109 0 : if !buf.iter().all(|&x| x == 0) {
110 0 : anyhow::bail!("invalid tar EOF marker");
111 0 : }
112 0 :
113 0 : // Drain any extra zero-blocks after the EOF marker
114 0 : let mut trailing_bytes = 0;
115 0 : let mut seen_nonzero_bytes = false;
116 : loop {
117 0 : let nbytes = reader.read(&mut buf).await?;
118 0 : trailing_bytes += nbytes;
119 0 : if !buf.iter().all(|&x| x == 0) {
120 0 : seen_nonzero_bytes = true;
121 0 : }
122 0 : if nbytes == 0 {
123 0 : break;
124 0 : }
125 : }
126 0 : if seen_nonzero_bytes {
127 0 : anyhow::bail!("unexpected non-zero bytes after the tar archive");
128 0 : }
129 0 : if trailing_bytes % 512 != 0 {
130 0 : anyhow::bail!("unexpected number of zeros ({trailing_bytes}), not divisible by tar block size (512 bytes), after the tar archive");
131 0 : }
132 0 : Ok(())
133 0 : }
134 :
135 : ///////////////////////////////////////////////////////////////////////////////
136 :
137 : ///
138 : /// Main loop of the page service.
139 : ///
140 : /// Listens for connections, and launches a new handler task for each.
141 : ///
142 0 : pub async fn libpq_listener_main(
143 0 : tenant_manager: Arc<TenantManager>,
144 0 : broker_client: storage_broker::BrokerClientChannel,
145 0 : auth: Option<Arc<SwappableJwtAuth>>,
146 0 : listener: TcpListener,
147 0 : auth_type: AuthType,
148 0 : listener_ctx: RequestContext,
149 0 : cancel: CancellationToken,
150 0 : ) -> anyhow::Result<()> {
151 0 : listener.set_nonblocking(true)?;
152 0 : let tokio_listener = tokio::net::TcpListener::from_std(listener)?;
153 :
154 : // Wait for a new connection to arrive, or for server shutdown.
155 0 : while let Some(res) = tokio::select! {
156 : biased;
157 :
158 : _ = cancel.cancelled() => {
159 : // We were requested to shut down.
160 : None
161 : }
162 :
163 : res = tokio_listener.accept() => {
164 : Some(res)
165 : }
166 : } {
167 0 : match res {
168 0 : Ok((socket, peer_addr)) => {
169 0 : // Connection established. Spawn a new task to handle it.
170 0 : debug!("accepted connection from {}", peer_addr);
171 0 : let local_auth = auth.clone();
172 0 :
173 0 : let connection_ctx = listener_ctx
174 0 : .detached_child(TaskKind::PageRequestHandler, DownloadBehavior::Download);
175 0 :
176 0 : // PageRequestHandler tasks are not associated with any particular
177 0 : // timeline in the task manager. In practice most connections will
178 0 : // only deal with a particular timeline, but we don't know which one
179 0 : // yet.
180 0 : task_mgr::spawn(
181 0 : &tokio::runtime::Handle::current(),
182 0 : TaskKind::PageRequestHandler,
183 0 : None,
184 0 : None,
185 0 : "serving compute connection task",
186 0 : false,
187 0 : page_service_conn_main(
188 0 : tenant_manager.clone(),
189 0 : broker_client.clone(),
190 0 : local_auth,
191 0 : socket,
192 0 : auth_type,
193 0 : connection_ctx,
194 0 : ),
195 0 : );
196 : }
197 0 : Err(err) => {
198 0 : // accept() failed. Log the error, and loop back to retry on next connection.
199 0 : error!("accept() failed: {:?}", err);
200 : }
201 : }
202 : }
203 :
204 0 : debug!("page_service loop terminated");
205 :
206 0 : Ok(())
207 0 : }
208 :
209 0 : #[instrument(skip_all, fields(peer_addr))]
210 : async fn page_service_conn_main(
211 : tenant_manager: Arc<TenantManager>,
212 : broker_client: storage_broker::BrokerClientChannel,
213 : auth: Option<Arc<SwappableJwtAuth>>,
214 : socket: tokio::net::TcpStream,
215 : auth_type: AuthType,
216 : connection_ctx: RequestContext,
217 : ) -> anyhow::Result<()> {
218 : // Immediately increment the gauge, then create a job to decrement it on task exit.
219 : // One of the pros of `defer!` is that this will *most probably*
220 : // get called, even in the presence of panics.
221 : let gauge = LIVE_CONNECTIONS_COUNT.with_label_values(&["page_service"]);
222 : gauge.inc();
223 : scopeguard::defer! {
224 : gauge.dec();
225 : }
226 :
227 : socket
228 : .set_nodelay(true)
229 : .context("could not set TCP_NODELAY")?;
230 :
231 : let peer_addr = socket.peer_addr().context("get peer address")?;
232 : tracing::Span::current().record("peer_addr", field::display(peer_addr));
233 :
234 : // Set up a read timeout of 10 minutes. The exact value is somewhat arbitrary; the requirements are:
235 : // - long enough for most valid compute connections
236 : // - less than infinite to stop us from "leaking" connections to long-gone computes
237 : //
238 : // No write timeout is used, because the kernel is assumed to fail writes after some time.
239 : let mut socket = tokio_io_timeout::TimeoutReader::new(socket);
240 :
241 : let default_timeout_ms = 10 * 60 * 1000; // 10 minutes by default
242 0 : let socket_timeout_ms = (|| {
243 0 : fail::fail_point!("simulated-bad-compute-connection", |avg_timeout_ms| {
244 : // Use an exponential distribution to simulate poor network conditions;
245 : // avg_timeout_ms is expected to be around 15 in tests.
247 0 : if let Some(avg_timeout_ms) = avg_timeout_ms {
248 0 : let avg = avg_timeout_ms.parse::<i64>().unwrap() as f32;
249 0 : let u = rand::random::<f32>();
250 0 : ((1.0 - u).ln() / (-avg)) as u64
251 : } else {
252 0 : default_timeout_ms
253 : }
254 0 : });
255 0 : default_timeout_ms
256 : })();
257 :
258 : // A timeout here does not mean the client died; it can happen if it's just idle for
259 : // a while: we will tear down this PageServerHandler and instantiate a new one if/when
260 : // they reconnect.
261 : socket.set_timeout(Some(std::time::Duration::from_millis(socket_timeout_ms)));
262 : let socket = std::pin::pin!(socket);
263 :
264 : fail::fail_point!("ps::connection-start::pre-login");
265 :
266 : // XXX: pgbackend.run() should take the connection_ctx,
267 : // and create a child per-query context when it invokes process_query.
268 : // But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
269 : // and create the per-query context in process_query ourselves.
270 : let mut conn_handler =
271 : PageServerHandler::new(tenant_manager, broker_client, auth, connection_ctx);
272 : let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
273 :
274 : match pgbackend
275 : .run(&mut conn_handler, task_mgr::shutdown_watcher)
276 : .await
277 : {
278 : Ok(()) => {
279 : // we've been requested to shut down
280 : Ok(())
281 : }
282 : Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
283 : if is_expected_io_error(&io_error) {
284 : info!("Postgres client disconnected ({io_error})");
285 : Ok(())
286 : } else {
287 : Err(io_error).context("Postgres connection error")
288 : }
289 : }
290 : other => other.context("Postgres query error"),
291 : }
292 : }
293 :
294 : /// While a handler holds a reference to a Timeline, it also holds the
295 : /// timeline's Gate open.
296 : struct HandlerTimeline {
297 : timeline: Arc<Timeline>,
298 : _guard: GateGuard,
299 : }
300 :
301 : struct PageServerHandler {
302 : broker_client: storage_broker::BrokerClientChannel,
303 : auth: Option<Arc<SwappableJwtAuth>>,
304 : claims: Option<Claims>,
305 :
306 : tenant_manager: Arc<TenantManager>,
307 :
308 : /// The context created for the lifetime of the connection
309 : /// serviced by this PageServerHandler.
310 : /// For each query received over the connection,
311 : /// `process_query` creates a child context from this one.
312 : connection_ctx: RequestContext,
313 :
314 : /// See [`Self::cache_timeline`] for usage.
315 : ///
316 : /// Note on size: the typical size of this map is 1. The largest size we expect
317 : /// to see is the number of shards divided by the number of pageservers (typically < 2),
318 : /// or the ratio used when splitting shards (i.e. how many children are created from one
319 : /// parent shard), where a "large" number might be ~8.
320 : shard_timelines: HashMap<ShardIndex, HandlerTimeline>,
321 : }
322 :
323 0 : #[derive(thiserror::Error, Debug)]
324 : enum PageStreamError {
325 : /// We encountered an error that should prompt the client to reconnect:
326 : /// in practice this means we drop the connection without sending a response.
327 : #[error("Reconnect required: {0}")]
328 : Reconnect(Cow<'static, str>),
329 :
330 : /// We were instructed to shutdown while processing the query
331 : #[error("Shutting down")]
332 : Shutdown,
333 :
334 : /// Something went wrong reading a page: this likely indicates a pageserver bug
335 : #[error("Read error")]
336 : Read(#[source] PageReconstructError),
337 :
338 : /// Ran out of time waiting for an LSN
339 : #[error("LSN timeout: {0}")]
340 : LsnTimeout(WaitLsnError),
341 :
342 : /// The entity required to serve the request (tenant or timeline) is not found,
343 : /// or is not found in a suitable state to serve a request.
344 : #[error("Not found: {0}")]
345 : NotFound(Cow<'static, str>),
346 :
347 : /// Request asked for something that doesn't make sense, like an invalid LSN
348 : #[error("Bad request: {0}")]
349 : BadRequest(Cow<'static, str>),
350 : }
351 :
352 : impl From<PageReconstructError> for PageStreamError {
353 0 : fn from(value: PageReconstructError) -> Self {
354 0 : match value {
355 0 : PageReconstructError::Cancelled => Self::Shutdown,
356 0 : e => Self::Read(e),
357 : }
358 0 : }
359 : }
360 :
361 : impl From<GetActiveTimelineError> for PageStreamError {
362 0 : fn from(value: GetActiveTimelineError) -> Self {
363 0 : match value {
364 0 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => Self::Shutdown,
365 0 : GetActiveTimelineError::Tenant(e) => Self::NotFound(format!("{e}").into()),
366 0 : GetActiveTimelineError::Timeline(e) => Self::NotFound(format!("{e}").into()),
367 : }
368 0 : }
369 : }
370 :
371 : impl From<WaitLsnError> for PageStreamError {
372 0 : fn from(value: WaitLsnError) -> Self {
373 0 : match value {
374 0 : e @ WaitLsnError::Timeout(_) => Self::LsnTimeout(e),
375 0 : WaitLsnError::Shutdown => Self::Shutdown,
376 0 : e @ WaitLsnError::BadState { .. } => Self::Reconnect(format!("{e}").into()),
377 : }
378 0 : }
379 : }
380 :
381 : impl From<WaitLsnError> for QueryError {
382 0 : fn from(value: WaitLsnError) -> Self {
383 0 : match value {
384 0 : e @ WaitLsnError::Timeout(_) => Self::Other(anyhow::Error::new(e)),
385 0 : WaitLsnError::Shutdown => Self::Shutdown,
386 0 : WaitLsnError::BadState { .. } => Self::Reconnect,
387 : }
388 0 : }
389 : }
390 :
391 : impl PageServerHandler {
392 0 : pub fn new(
393 0 : tenant_manager: Arc<TenantManager>,
394 0 : broker_client: storage_broker::BrokerClientChannel,
395 0 : auth: Option<Arc<SwappableJwtAuth>>,
396 0 : connection_ctx: RequestContext,
397 0 : ) -> Self {
398 0 : PageServerHandler {
399 0 : tenant_manager,
400 0 : broker_client,
401 0 : auth,
402 0 : claims: None,
403 0 : connection_ctx,
404 0 : shard_timelines: HashMap::new(),
405 0 : }
406 0 : }
407 :
408 : /// Future that completes when we need to shut down the connection.
409 : ///
410 : /// We currently need to shut down when any of the following happens:
411 : /// 1. any of the timelines we hold GateGuards for in `shard_timelines` is cancelled
412 : /// 2. task_mgr requests shutdown of the connection
413 : ///
414 : /// NB on (1): the connection's lifecycle is not actually tied to any of the
415 : /// `shard_timelines`s' lifecycles. But it's _necessary_ in the current
416 : /// implementation to be responsive to timeline cancellation because
417 : /// the connection holds their `GateGuards` open (stored in `shard_timelines`).
418 : /// We currently do the easy thing and terminate the connection if any of the
419 : /// shard_timelines gets cancelled. But really, we could spend more effort
420 : /// and simply remove the cancelled timeline from the `shard_timelines`, thereby
421 : /// dropping the guard.
422 : ///
423 : /// NB: keep in sync with [`Self::is_connection_cancelled`]
424 0 : async fn await_connection_cancelled(&self) {
425 0 : // A short wait before we expend the cycles to walk our timeline map. This avoids incurring
426 0 : // that cost every time we check for cancellation.
427 0 : tokio::time::sleep(Duration::from_millis(10)).await;
428 :
429 : // This function is never called concurrently with code that adds timelines to shard_timelines,
430 : // which is enforced by the borrow checker (the future returned by this function carries the
431 : // immutable &self). So it's fine to evaluate shard_timelines after the sleep, we don't risk
432 : // missing any inserts to the map.
433 :
434 0 : let mut cancellation_sources = Vec::with_capacity(1 + self.shard_timelines.len());
435 0 : use futures::future::Either;
436 0 : cancellation_sources.push(Either::Left(task_mgr::shutdown_watcher()));
437 0 : cancellation_sources.extend(
438 0 : self.shard_timelines
439 0 : .values()
440 0 : .map(|ht| Either::Right(ht.timeline.cancel.cancelled())),
441 0 : );
442 0 : FuturesUnordered::from_iter(cancellation_sources)
443 0 : .next()
444 0 : .await;
445 0 : }
446 :
447 : /// Checking variant of [`Self::await_connection_cancelled`].
448 0 : fn is_connection_cancelled(&self) -> bool {
449 0 : task_mgr::is_shutdown_requested()
450 0 : || self
451 0 : .shard_timelines
452 0 : .values()
453 0 : .any(|ht| ht.timeline.cancel.is_cancelled() || ht.timeline.is_stopping())
454 0 : }
455 :
456 : /// This function always respects cancellation of any timeline in [`Self::shard_timelines`]. Pass in
457 : /// a cancellation token at the next scope up (such as a tenant cancellation token) to ensure we respect
458 : /// cancellation if there aren't any timelines in the cache.
459 : ///
460 : /// If calling from a function that doesn't use the [`Self::shard_timelines`] cache, then pass in the
461 : /// timeline cancellation token.
462 0 : async fn flush_cancellable<IO>(
463 0 : &self,
464 0 : pgb: &mut PostgresBackend<IO>,
465 0 : cancel: &CancellationToken,
466 0 : ) -> Result<(), QueryError>
467 0 : where
468 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
469 0 : {
470 : tokio::select!(
471 : flush_r = pgb.flush() => {
472 : Ok(flush_r?)
473 : },
474 : _ = self.await_connection_cancelled() => {
475 : Err(QueryError::Shutdown)
476 : }
477 : _ = cancel.cancelled() => {
478 : Err(QueryError::Shutdown)
479 : }
480 : )
481 0 : }
482 :
483 0 : fn copyin_stream<'a, IO>(
484 0 : &'a self,
485 0 : pgb: &'a mut PostgresBackend<IO>,
486 0 : cancel: &'a CancellationToken,
487 0 : ) -> impl Stream<Item = io::Result<Bytes>> + 'a
488 0 : where
489 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
490 0 : {
491 : async_stream::try_stream! {
492 : loop {
493 : let msg = tokio::select! {
494 : biased;
495 :
496 : _ = cancel.cancelled() => {
497 : // We were requested to shut down.
498 : let msg = "pageserver is shutting down";
499 : let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, None));
500 : Err(QueryError::Shutdown)
501 : }
502 :
503 : msg = pgb.read_message() => { msg.map_err(QueryError::from)}
504 : };
505 :
506 : match msg {
507 : Ok(Some(message)) => {
508 : let copy_data_bytes = match message {
509 : FeMessage::CopyData(bytes) => bytes,
510 : FeMessage::CopyDone => { break },
511 : FeMessage::Sync => continue,
512 : FeMessage::Terminate => {
513 : let msg = "client terminated connection with Terminate message during COPY";
514 : let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
515 : // error can't happen here, ErrorResponse serialization should always be ok
516 0 : pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
517 : Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
518 : break;
519 : }
520 : m => {
521 : let msg = format!("unexpected message {m:?}");
522 : // error can't happen here, ErrorResponse serialization should always be ok
523 0 : pgb.write_message_noflush(&BeMessage::ErrorResponse(&msg, None)).map_err(|e| e.into_io_error())?;
524 : Err(io::Error::new(io::ErrorKind::Other, msg))?;
525 : break;
526 : }
527 : };
528 :
529 : yield copy_data_bytes;
530 : }
531 : Ok(None) => {
532 : let msg = "client closed connection during COPY";
533 : let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
534 : // error can't happen here, ErrorResponse serialization should always be ok
535 0 : pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
536 0 : self.flush_cancellable(pgb, cancel).await.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
537 : Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
538 : }
539 : Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
540 : Err(io_error)?;
541 : }
542 : Err(other) => {
543 : Err(io::Error::new(io::ErrorKind::Other, other.to_string()))?;
544 : }
545 : };
546 : }
547 : }
548 0 : }
549 :
550 0 : #[instrument(skip_all)]
551 : async fn handle_pagerequests<IO>(
552 : &mut self,
553 : pgb: &mut PostgresBackend<IO>,
554 : tenant_id: TenantId,
555 : timeline_id: TimelineId,
556 : protocol_version: PagestreamProtocolVersion,
557 : ctx: RequestContext,
558 : ) -> Result<(), QueryError>
559 : where
560 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
561 : {
562 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
563 :
564 : let tenant = self
565 : .get_active_tenant_with_timeout(tenant_id, ShardSelector::First, ACTIVE_TENANT_TIMEOUT)
566 : .await?;
567 :
568 : // Make request tracer if needed
569 : let mut tracer = if tenant.get_trace_read_requests() {
570 : let connection_id = ConnectionId::generate();
571 : let path =
572 : tenant
573 : .conf
574 : .trace_path(&tenant.tenant_shard_id(), &timeline_id, &connection_id);
575 : Some(Tracer::new(path))
576 : } else {
577 : None
578 : };
579 :
580 : // switch client to COPYBOTH
581 : pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
582 : self.flush_cancellable(pgb, &tenant.cancel).await?;
583 :
584 : loop {
585 : let msg = tokio::select! {
586 : biased;
587 :
588 : _ = self.await_connection_cancelled() => {
589 : // We were requested to shut down.
590 : info!("shutdown request received in page handler");
591 : return Err(QueryError::Shutdown)
592 : }
593 :
594 : msg = pgb.read_message() => { msg }
595 : };
596 :
597 : let copy_data_bytes = match msg? {
598 : Some(FeMessage::CopyData(bytes)) => bytes,
599 : Some(FeMessage::Terminate) => break,
600 : Some(m) => {
601 : return Err(QueryError::Other(anyhow::anyhow!(
602 : "unexpected message: {m:?} during COPY"
603 : )));
604 : }
605 : None => break, // client disconnected
606 : };
607 :
608 : trace!("query: {copy_data_bytes:?}");
609 : fail::fail_point!("ps::handle-pagerequest-message");
610 :
611 : // Trace request if needed
612 : if let Some(t) = tracer.as_mut() {
613 : t.trace(©_data_bytes)
614 : }
615 :
616 : let neon_fe_msg =
617 : PagestreamFeMessage::parse(&mut copy_data_bytes.reader(), protocol_version)?;
618 :
619 : // TODO: We could create a new per-request context here, with a unique ID.
620 : // Currently we use the same per-timeline context for all requests.
621 :
622 : let (response, span) = match neon_fe_msg {
623 : PagestreamFeMessage::Exists(req) => {
624 : fail::fail_point!("ps::handle-pagerequest-message::exists");
625 : let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn);
626 : (
627 : self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx)
628 : .instrument(span.clone())
629 : .await,
630 : span,
631 : )
632 : }
633 : PagestreamFeMessage::Nblocks(req) => {
634 : fail::fail_point!("ps::handle-pagerequest-message::nblocks");
635 : let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn);
636 : (
637 : self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx)
638 : .instrument(span.clone())
639 : .await,
640 : span,
641 : )
642 : }
643 : PagestreamFeMessage::GetPage(req) => {
644 : fail::fail_point!("ps::handle-pagerequest-message::getpage");
645 : // shard_id is filled in by the handler
646 : let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.request_lsn);
647 : (
648 : self.handle_get_page_at_lsn_request(tenant_id, timeline_id, &req, &ctx)
649 : .instrument(span.clone())
650 : .await,
651 : span,
652 : )
653 : }
654 : PagestreamFeMessage::DbSize(req) => {
655 : fail::fail_point!("ps::handle-pagerequest-message::dbsize");
656 : let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn);
657 : (
658 : self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx)
659 : .instrument(span.clone())
660 : .await,
661 : span,
662 : )
663 : }
664 : PagestreamFeMessage::GetSlruSegment(req) => {
665 : fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
666 : let span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn);
667 : (
668 : self.handle_get_slru_segment_request(tenant_id, timeline_id, &req, &ctx)
669 : .instrument(span.clone())
670 : .await,
671 : span,
672 : )
673 : }
674 : };
675 :
676 : match response {
677 : Err(PageStreamError::Shutdown) => {
678 : // If we fail to fulfil a request during shutdown, which may be _because_ of
679 : // shutdown, then do not send the error to the client. Instead just drop the
680 : // connection.
681 0 : span.in_scope(|| info!("dropping connection due to shutdown"));
682 : return Err(QueryError::Shutdown);
683 : }
684 : Err(PageStreamError::Reconnect(reason)) => {
685 0 : span.in_scope(|| info!("handler requested reconnect: {reason}"));
686 : return Err(QueryError::Reconnect);
687 : }
688 : Err(e) if self.is_connection_cancelled() => {
689 : // This branch accommodates code within request handlers that returns an anyhow::Error instead of a clean
690 : // shutdown error; this may be buried inside a PageReconstructError::Other, for example.
691 : //
692 : // Requests may fail as soon as we are Stopping, even if the Timeline's cancellation token wasn't fired yet,
693 : // because wait_lsn etc will drop out
694 : // is_stopping(): [`Timeline::flush_and_shutdown`] has entered
695 : // is_canceled(): [`Timeline::shutdown`] has entered
696 0 : span.in_scope(|| info!("dropped error response during shutdown: {e:#}"));
697 : return Err(QueryError::Shutdown);
698 : }
699 : r => {
700 0 : let response_msg = r.unwrap_or_else(|e| {
701 0 : // print all the details to the log with {:#}, but for the client the
702 0 : // error message is enough. Do not log if shutting down, as the anyhow::Error
703 0 : // here includes cancellation which is not an error.
704 0 : let full = utils::error::report_compact_sources(&e);
705 0 : span.in_scope(|| {
706 0 : error!("error reading relation or page version: {full:#}")
707 0 : });
708 0 : PagestreamBeMessage::Error(PagestreamErrorResponse {
709 0 : message: e.to_string(),
710 0 : })
711 0 : });
712 :
713 : pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
714 : self.flush_cancellable(pgb, &tenant.cancel).await?;
715 : }
716 : }
717 : }
718 : Ok(())
719 : }
720 :
721 : #[allow(clippy::too_many_arguments)]
722 0 : #[instrument(skip_all, fields(%base_lsn, end_lsn=%_end_lsn, %pg_version))]
723 : async fn handle_import_basebackup<IO>(
724 : &self,
725 : pgb: &mut PostgresBackend<IO>,
726 : tenant_id: TenantId,
727 : timeline_id: TimelineId,
728 : base_lsn: Lsn,
729 : _end_lsn: Lsn,
730 : pg_version: u32,
731 : ctx: RequestContext,
732 : ) -> Result<(), QueryError>
733 : where
734 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
735 : {
736 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
737 :
738 : // Create empty timeline
739 : info!("creating new timeline");
740 : let tenant = self
741 : .get_active_tenant_with_timeout(tenant_id, ShardSelector::Zero, ACTIVE_TENANT_TIMEOUT)
742 : .await?;
743 : let timeline = tenant
744 : .create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
745 : .await?;
746 :
747 : // TODO mark timeline as not ready until it reaches end_lsn.
748 : // We might have some wal to import as well, and we should prevent compute
749 : // from connecting before that and writing conflicting wal.
750 : //
751 : // This is not relevant for pageserver->pageserver migrations, since there's
752 : // no wal to import. But should be fixed if we want to import from postgres.
753 :
754 : // TODO leave clean state on error. For now you can use detach to clean
755 : // up broken state from a failed import.
756 :
757 : // Import basebackup provided via CopyData
758 : info!("importing basebackup");
759 : pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
760 : self.flush_cancellable(pgb, &tenant.cancel).await?;
761 :
762 : let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb, &tenant.cancel)));
763 : timeline
764 : .import_basebackup_from_tar(
765 : tenant.clone(),
766 : &mut copyin_reader,
767 : base_lsn,
768 : self.broker_client.clone(),
769 : &ctx,
770 : )
771 : .await?;
772 :
773 : // Read the end of the tar archive.
774 : read_tar_eof(copyin_reader).await?;
775 :
776 : // TODO check checksum
777 : // Meanwhile you can verify client-side by taking fullbackup
778 : // and checking that it matches in size with what was imported.
779 : // It wouldn't work if base came from vanilla postgres though,
780 : // since we discard some log files.
781 :
782 : info!("done");
783 : Ok(())
784 : }
785 :
786 0 : #[instrument(skip_all, fields(shard_id, %start_lsn, %end_lsn))]
787 : async fn handle_import_wal<IO>(
788 : &self,
789 : pgb: &mut PostgresBackend<IO>,
790 : tenant_id: TenantId,
791 : timeline_id: TimelineId,
792 : start_lsn: Lsn,
793 : end_lsn: Lsn,
794 : ctx: RequestContext,
795 : ) -> Result<(), QueryError>
796 : where
797 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
798 : {
799 : let timeline = self
800 : .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero)
801 : .await?;
802 : let last_record_lsn = timeline.get_last_record_lsn();
803 : if last_record_lsn != start_lsn {
804 : return Err(QueryError::Other(
805 : anyhow::anyhow!("Cannot import WAL from Lsn {start_lsn} because timeline does not start from the same lsn: {last_record_lsn}"))
806 : );
807 : }
808 :
809 : // TODO leave clean state on error. For now you can use detach to clean
810 : // up broken state from a failed import.
811 :
812 : // Import wal provided via CopyData
813 : info!("importing wal");
814 : pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
815 : self.flush_cancellable(pgb, &timeline.cancel).await?;
816 : let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb, &timeline.cancel)));
817 : import_wal_from_tar(&timeline, &mut copyin_reader, start_lsn, end_lsn, &ctx).await?;
818 : info!("wal import complete");
819 :
820 : // Read the end of the tar archive.
821 : read_tar_eof(copyin_reader).await?;
822 :
823 : // TODO Does it make sense to overshoot?
824 : if timeline.get_last_record_lsn() < end_lsn {
825 : return Err(QueryError::Other(
826 : anyhow::anyhow!("Cannot import WAL from Lsn {start_lsn} because timeline does not start from the same lsn: {last_record_lsn}"))
827 : );
828 : }
829 :
830 : // Flush data to disk, then upload to s3. No need for a forced checkpoint.
831 : // We only want to persist the data, and it doesn't matter if it's in the
832 : // shape of deltas or images.
833 : info!("flushing layers");
834 0 : timeline.freeze_and_flush().await.map_err(|e| match e {
835 0 : FlushLayerError::Cancelled => QueryError::Shutdown,
836 0 : other => QueryError::Other(other.into()),
837 0 : })?;
838 :
839 : info!("done");
840 : Ok(())
841 : }
842 :
843 : /// Helper function to handle the LSN from client request.
844 : ///
845 : /// Each GetPage (and Exists and Nblocks) request includes information about
846 : /// which version of the page is being requested. The primary compute node
847 : /// will always request the latest page version, by setting 'request_lsn' to
848 : /// the last inserted or flushed WAL position, while a standby will request
849 : /// a version at the LSN that it's currently caught up to.
850 : ///
851 : /// In either case, if the page server hasn't received the WAL up to the
852 : /// requested LSN yet, we will wait for it to arrive. The return value is
853 : /// the LSN that should be used to look up the page versions.
854 : ///
855 : /// In addition to the request LSN, each request carries another LSN,
856 : /// 'not_modified_since', which is a hint to the pageserver that the client
857 : /// knows that the page has not been modified between 'not_modified_since'
858 : /// and the request LSN. This allows skipping the wait, as long as the WAL
859 : /// up to 'not_modified_since' has arrived. If the client doesn't have any
860 : /// information about when the page was modified, it will use
861 : /// not_modified_since == lsn. If the client lies and sends a too low
862 : /// not_modified_hint such that there are in fact later page versions, the
863 : /// behavior is undefined: the pageserver may return any of the page versions
864 : /// or an error.
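 : ///
 : /// Illustrative example with made-up LSNs: a primary that last modified the
 : /// requested page at 0/169C3D0 but has flushed WAL up to 0/16A0000 can send
 : /// request_lsn = 0/16A0000 and not_modified_since = 0/169C3D0; we then only
 : /// have to wait until WAL up to 0/169C3D0 has arrived before serving the page.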
865 0 : async fn wait_or_get_last_lsn(
866 0 : timeline: &Timeline,
867 0 : request_lsn: Lsn,
868 0 : not_modified_since: Lsn,
869 0 : latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
870 0 : ctx: &RequestContext,
871 0 : ) -> Result<Lsn, PageStreamError> {
872 0 : let last_record_lsn = timeline.get_last_record_lsn();
873 0 :
874 0 : // Sanity check the request
875 0 : if request_lsn < not_modified_since {
876 0 : return Err(PageStreamError::BadRequest(
877 0 : format!(
878 0 : "invalid request with request LSN {} and not_modified_since {}",
879 0 : request_lsn, not_modified_since,
880 0 : )
881 0 : .into(),
882 0 : ));
883 0 : }
884 0 :
885 0 : if request_lsn < **latest_gc_cutoff_lsn {
886 : // Check explicitly for INVALID just to get a less scary error message if the
887 : // request is obviously bogus
888 0 : return Err(if request_lsn == Lsn::INVALID {
889 0 : PageStreamError::BadRequest("invalid LSN(0) in request".into())
890 : } else {
891 0 : PageStreamError::BadRequest(format!(
892 0 : "tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
893 0 : request_lsn, **latest_gc_cutoff_lsn
894 0 : ).into())
895 : });
896 0 : }
897 0 :
898 0 : // Wait for WAL up to 'not_modified_since' to arrive, if necessary
899 0 : if not_modified_since > last_record_lsn {
900 0 : timeline
901 0 : .wait_lsn(
902 0 : not_modified_since,
903 0 : crate::tenant::timeline::WaitLsnWaiter::PageService,
904 0 : ctx,
905 0 : )
906 0 : .await?;
907 : // Since we waited for 'not_modified_since' to arrive, that is now the last
908 : // record LSN. (Or close enough for our purposes; the last-record LSN can
909 : // advance immediately after we return anyway)
910 0 : Ok(not_modified_since)
911 : } else {
912 : // It might be better to use max(not_modified_since, latest_gc_cutoff_lsn)
913 : // here instead. That would give the same result, since we know that there
914 : // haven't been any modifications since 'not_modified_since'. Using an older
915 : // LSN might be faster, because that could allow skipping recent layers when
916 : // finding the page. However, we have historically used 'last_record_lsn', so
917 : // stick to that for now.
918 0 : Ok(std::cmp::min(last_record_lsn, request_lsn))
919 : }
920 0 : }
921 :
922 0 : #[instrument(skip_all, fields(shard_id, %lsn))]
923 : async fn handle_make_lsn_lease<IO>(
924 : &self,
925 : pgb: &mut PostgresBackend<IO>,
926 : tenant_shard_id: TenantShardId,
927 : timeline_id: TimelineId,
928 : lsn: Lsn,
929 : ctx: &RequestContext,
930 : ) -> Result<(), QueryError>
931 : where
932 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
933 : {
934 : let shard_selector = ShardSelector::Known(tenant_shard_id.to_index());
935 : let timeline = self
936 : .get_active_tenant_timeline(tenant_shard_id.tenant_id, timeline_id, shard_selector)
937 : .await?;
938 : let lease = timeline.make_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx)?;
939 : let valid_until = lease
940 : .valid_until
941 : .duration_since(SystemTime::UNIX_EPOCH)
942 0 : .map_err(|e| QueryError::Other(e.into()))?;
943 :
944 : pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col(
945 : b"valid_until",
946 : )]))?
947 : .write_message_noflush(&BeMessage::DataRow(&[Some(
948 : &valid_until.as_millis().to_be_bytes(),
949 : )]))?
950 : .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
951 :
952 : Ok(())
953 : }
954 :
955 0 : #[instrument(skip_all, fields(shard_id))]
956 : async fn handle_get_rel_exists_request(
957 : &mut self,
958 : tenant_id: TenantId,
959 : timeline_id: TimelineId,
960 : req: &PagestreamExistsRequest,
961 : ctx: &RequestContext,
962 : ) -> Result<PagestreamBeMessage, PageStreamError> {
963 : let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
964 : let _timer = timeline
965 : .query_metrics
966 : .start_timer(metrics::SmgrQueryType::GetRelExists, ctx);
967 :
968 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
969 : let lsn = Self::wait_or_get_last_lsn(
970 : timeline,
971 : req.request_lsn,
972 : req.not_modified_since,
973 : &latest_gc_cutoff_lsn,
974 : ctx,
975 : )
976 : .await?;
977 :
978 : let exists = timeline
979 : .get_rel_exists(req.rel, Version::Lsn(lsn), ctx)
980 : .await?;
981 :
982 : Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
983 : exists,
984 : }))
985 : }
986 :
987 0 : #[instrument(skip_all, fields(shard_id))]
988 : async fn handle_get_nblocks_request(
989 : &mut self,
990 : tenant_id: TenantId,
991 : timeline_id: TimelineId,
992 : req: &PagestreamNblocksRequest,
993 : ctx: &RequestContext,
994 : ) -> Result<PagestreamBeMessage, PageStreamError> {
995 : let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
996 :
997 : let _timer = timeline
998 : .query_metrics
999 : .start_timer(metrics::SmgrQueryType::GetRelSize, ctx);
1000 :
1001 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1002 : let lsn = Self::wait_or_get_last_lsn(
1003 : timeline,
1004 : req.request_lsn,
1005 : req.not_modified_since,
1006 : &latest_gc_cutoff_lsn,
1007 : ctx,
1008 : )
1009 : .await?;
1010 :
1011 : let n_blocks = timeline
1012 : .get_rel_size(req.rel, Version::Lsn(lsn), ctx)
1013 : .await?;
1014 :
1015 : Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
1016 : n_blocks,
1017 : }))
1018 : }
1019 :
1020 0 : #[instrument(skip_all, fields(shard_id))]
1021 : async fn handle_db_size_request(
1022 : &mut self,
1023 : tenant_id: TenantId,
1024 : timeline_id: TimelineId,
1025 : req: &PagestreamDbSizeRequest,
1026 : ctx: &RequestContext,
1027 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1028 : let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
1029 :
1030 : let _timer = timeline
1031 : .query_metrics
1032 : .start_timer(metrics::SmgrQueryType::GetDbSize, ctx);
1033 :
1034 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1035 : let lsn = Self::wait_or_get_last_lsn(
1036 : timeline,
1037 : req.request_lsn,
1038 : req.not_modified_since,
1039 : &latest_gc_cutoff_lsn,
1040 : ctx,
1041 : )
1042 : .await?;
1043 :
1044 : let total_blocks = timeline
1045 : .get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, Version::Lsn(lsn), ctx)
1046 : .await?;
1047 : let db_size = total_blocks as i64 * BLCKSZ as i64;
1048 :
1049 : Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse {
1050 : db_size,
1051 : }))
1052 : }
1053 :
1054 : /// For most getpage requests, we will already have a Timeline to serve the request: this function
1055 : /// looks up such a Timeline synchronously and without touching any global state.
1056 0 : fn get_cached_timeline_for_page(
1057 0 : &mut self,
1058 0 : req: &PagestreamGetPageRequest,
1059 0 : ) -> Result<&Arc<Timeline>, Key> {
1060 0 : let key = if let Some((first_idx, first_timeline)) = self.shard_timelines.iter().next() {
1061 : // Fastest path: single sharded case
1062 0 : if first_idx.shard_count.count() == 1 {
1063 0 : return Ok(&first_timeline.timeline);
1064 0 : }
1065 0 :
1066 0 : let key = rel_block_to_key(req.rel, req.blkno);
1067 0 : let shard_num = first_timeline
1068 0 : .timeline
1069 0 : .get_shard_identity()
1070 0 : .get_shard_number(&key);
1071 0 :
1072 0 : // Fast path: matched the first timeline in our local handler map. This case is common if
1073 0 : // only one shard per tenant is attached to this pageserver.
1074 0 : if first_timeline.timeline.get_shard_identity().number == shard_num {
1075 0 : return Ok(&first_timeline.timeline);
1076 0 : }
1077 0 :
1078 0 : let shard_index = ShardIndex {
1079 0 : shard_number: shard_num,
1080 0 : shard_count: first_timeline.timeline.get_shard_identity().count,
1081 0 : };
1082 :
1083 : // Fast-ish path: timeline is in the connection handler's local cache
1084 0 : if let Some(found) = self.shard_timelines.get(&shard_index) {
1085 0 : return Ok(&found.timeline);
1086 0 : }
1087 0 :
1088 0 : key
1089 : } else {
1090 0 : rel_block_to_key(req.rel, req.blkno)
1091 : };
1092 :
1093 0 : Err(key)
1094 0 : }
1095 :
1096 : /// Having looked up the [`Timeline`] instance for a particular shard, cache it to enable
1097 : /// use in future requests without having to traverse [`crate::tenant::mgr::TenantManager`]
1098 : /// again.
1099 : ///
1100 : /// Note that all the Timelines in this cache are for the same timeline_id: they differ
1101 : /// in which shard they belong to. When we serve a getpage@lsn request, we choose a shard
1102 : /// based on key.
1103 : ///
1104 : /// The typical size of this cache is 1, as we generally create shards to distribute work
1105 : /// across pageservers, so don't tend to have multiple shards for the same tenant on the
1106 : /// same pageserver.
1107 0 : fn cache_timeline(
1108 0 : &mut self,
1109 0 : timeline: Arc<Timeline>,
1110 0 : ) -> Result<&Arc<Timeline>, GetActiveTimelineError> {
1111 0 : let gate_guard = timeline
1112 0 : .gate
1113 0 : .enter()
1114 0 : .map_err(|_| GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled))?;
1115 :
1116 0 : let shard_index = timeline.tenant_shard_id.to_index();
1117 0 : let entry = self
1118 0 : .shard_timelines
1119 0 : .entry(shard_index)
1120 0 : .or_insert(HandlerTimeline {
1121 0 : timeline,
1122 0 : _guard: gate_guard,
1123 0 : });
1124 0 :
1125 0 : Ok(&entry.timeline)
1126 0 : }
1127 :
1128 : /// If [`Self::get_cached_timeline_for_page`] missed, then this function is used to populate the cache with
1129 : /// a Timeline to serve requests for this key, if such a Timeline is present on this pageserver. If no such
1130 : /// Timeline is found, then we will return an error (this indicates that the client is talking to the wrong node).
1131 0 : async fn load_timeline_for_page(
1132 0 : &mut self,
1133 0 : tenant_id: TenantId,
1134 0 : timeline_id: TimelineId,
1135 0 : key: Key,
1136 0 : ) -> anyhow::Result<&Arc<Timeline>, GetActiveTimelineError> {
1137 : // Slow path: we must call out to the TenantManager to find the timeline for this Key
1138 0 : let timeline = self
1139 0 : .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Page(key))
1140 0 : .await?;
1141 :
1142 0 : self.cache_timeline(timeline)
1143 0 : }
1144 :
1145 0 : async fn get_timeline_shard_zero(
1146 0 : &mut self,
1147 0 : tenant_id: TenantId,
1148 0 : timeline_id: TimelineId,
1149 0 : ) -> anyhow::Result<&Arc<Timeline>, GetActiveTimelineError> {
1150 : // This is a borrow-checker workaround: we can't return from inside of the `if let Some` because
1151 : // that would be an immutable-borrow-self return, whereas later in the function we will use a mutable
1152 : // ref to self. So instead, we first build a bool, and then return while not borrowing self.
1153 0 : let have_cached = if let Some((idx, _tl)) = self.shard_timelines.iter().next() {
1154 0 : idx.shard_number == ShardNumber(0)
1155 : } else {
1156 0 : false
1157 : };
1158 :
1159 0 : if have_cached {
1160 0 : let entry = self.shard_timelines.iter().next().unwrap();
1161 0 : Ok(&entry.1.timeline)
1162 : } else {
1163 0 : let timeline = self
1164 0 : .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero)
1165 0 : .await?;
1166 0 : Ok(self.cache_timeline(timeline)?)
1167 : }
1168 0 : }
1169 :
1170 0 : #[instrument(skip_all, fields(shard_id))]
1171 : async fn handle_get_page_at_lsn_request(
1172 : &mut self,
1173 : tenant_id: TenantId,
1174 : timeline_id: TimelineId,
1175 : req: &PagestreamGetPageRequest,
1176 : ctx: &RequestContext,
1177 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1178 : let timeline = match self.get_cached_timeline_for_page(req) {
1179 : Ok(tl) => {
1180 : set_tracing_field_shard_id(tl);
1181 : tl
1182 : }
1183 : Err(key) => {
1184 : match self
1185 : .load_timeline_for_page(tenant_id, timeline_id, key)
1186 : .await
1187 : {
1188 : Ok(t) => t,
1189 : Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
1190 : // We already know this tenant exists in general, because we resolved it at
1191 : // the start of the connection. Getting a NotFound here indicates that the shard containing
1192 : // the requested page is not present on this node: the client's knowledge of shard->pageserver
1193 : // mapping is out of date.
1194 : //
1195 : // Closing the connection by returning `PageStreamError::Reconnect` has the side effect of rate-limiting the above message, via
1196 : // client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
1197 : // and talk to a different pageserver.
1198 : return Err(PageStreamError::Reconnect(
1199 : "getpage@lsn request routed to wrong shard".into(),
1200 : ));
1201 : }
1202 : Err(e) => return Err(e.into()),
1203 : }
1204 : }
1205 : };
1206 :
1207 : let _timer = timeline
1208 : .query_metrics
1209 : .start_timer(metrics::SmgrQueryType::GetPageAtLsn, ctx);
1210 :
1211 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1212 : let lsn = Self::wait_or_get_last_lsn(
1213 : timeline,
1214 : req.request_lsn,
1215 : req.not_modified_since,
1216 : &latest_gc_cutoff_lsn,
1217 : ctx,
1218 : )
1219 : .await?;
1220 :
1221 : let page = timeline
1222 : .get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), ctx)
1223 : .await?;
1224 :
1225 : Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
1226 : page,
1227 : }))
1228 : }
1229 :
1230 0 : #[instrument(skip_all, fields(shard_id))]
1231 : async fn handle_get_slru_segment_request(
1232 : &mut self,
1233 : tenant_id: TenantId,
1234 : timeline_id: TimelineId,
1235 : req: &PagestreamGetSlruSegmentRequest,
1236 : ctx: &RequestContext,
1237 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1238 : let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
1239 :
1240 : let _timer = timeline
1241 : .query_metrics
1242 : .start_timer(metrics::SmgrQueryType::GetSlruSegment, ctx);
1243 :
1244 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1245 : let lsn = Self::wait_or_get_last_lsn(
1246 : timeline,
1247 : req.request_lsn,
1248 : req.not_modified_since,
1249 : &latest_gc_cutoff_lsn,
1250 : ctx,
1251 : )
1252 : .await?;
1253 :
1254 : let kind = SlruKind::from_repr(req.kind)
1255 : .ok_or(PageStreamError::BadRequest("invalid SLRU kind".into()))?;
1256 : let segment = timeline.get_slru_segment(kind, req.segno, lsn, ctx).await?;
1257 :
1258 : Ok(PagestreamBeMessage::GetSlruSegment(
1259 : PagestreamGetSlruSegmentResponse { segment },
1260 : ))
1261 : }
1262 :
1263 : /// Note on "fullbackup":
1264 : /// Full basebackups should only be used for debugging purposes.
1265 : /// Originally, it was introduced to enable breaking storage format changes,
1266 : /// but that is not applicable anymore.
1267 : #[allow(clippy::too_many_arguments)]
1268 0 : #[instrument(skip_all, fields(shard_id, ?lsn, ?prev_lsn, %full_backup))]
1269 : async fn handle_basebackup_request<IO>(
1270 : &mut self,
1271 : pgb: &mut PostgresBackend<IO>,
1272 : tenant_id: TenantId,
1273 : timeline_id: TimelineId,
1274 : lsn: Option<Lsn>,
1275 : prev_lsn: Option<Lsn>,
1276 : full_backup: bool,
1277 : gzip: bool,
1278 : ctx: &RequestContext,
1279 : ) -> Result<(), QueryError>
1280 : where
1281 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1282 : {
1283 0 : fn map_basebackup_error(err: BasebackupError) -> QueryError {
1284 0 : match err {
1285 0 : BasebackupError::Client(e) => QueryError::Disconnected(ConnectionError::Io(e)),
1286 0 : BasebackupError::Server(e) => QueryError::Other(e),
1287 : }
1288 0 : }
1289 :
1290 : let started = std::time::Instant::now();
1291 :
1292 : // check that the timeline exists
1293 : let timeline = self
1294 : .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero)
1295 : .await?;
1296 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1297 : if let Some(lsn) = lsn {
1298 : // Backup was requested at a particular LSN. Wait for it to arrive.
1299 : info!("waiting for {}", lsn);
1300 : timeline
1301 : .wait_lsn(
1302 : lsn,
1303 : crate::tenant::timeline::WaitLsnWaiter::PageService,
1304 : ctx,
1305 : )
1306 : .await?;
1307 : timeline
1308 : .check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
1309 : .context("invalid basebackup lsn")?;
1310 : }
1311 :
1312 : let lsn_awaited_after = started.elapsed();
1313 :
1314 : // switch client to COPYOUT
1315 : pgb.write_message_noflush(&BeMessage::CopyOutResponse)
1316 : .map_err(QueryError::Disconnected)?;
1317 : self.flush_cancellable(pgb, &timeline.cancel).await?;
1318 :
1319 : // Send a tarball of the latest layer on the timeline. Compress if not
1320 : // fullbackup. TODO Compress in that case too (tests need to be updated)
1321 : if full_backup {
1322 : let mut writer = pgb.copyout_writer();
1323 : basebackup::send_basebackup_tarball(
1324 : &mut writer,
1325 : &timeline,
1326 : lsn,
1327 : prev_lsn,
1328 : full_backup,
1329 : ctx,
1330 : )
1331 : .await
1332 : .map_err(map_basebackup_error)?;
1333 : } else {
1334 : let mut writer = pgb.copyout_writer();
1335 : if gzip {
1336 : let mut encoder = GzipEncoder::with_quality(
1337 : writer,
1338 : // NOTE using fast compression because it's on the critical path
1339 : // for compute startup. For an empty database, we get
1340 : // <100KB with this method. The Level::Best compression method
1341 : // gives us <20KB, but maybe we should add basebackup caching
1342 : // on compute shutdown first.
1343 : async_compression::Level::Fastest,
1344 : );
1345 : basebackup::send_basebackup_tarball(
1346 : &mut encoder,
1347 : &timeline,
1348 : lsn,
1349 : prev_lsn,
1350 : full_backup,
1351 : ctx,
1352 : )
1353 : .await
1354 : .map_err(map_basebackup_error)?;
1355 : // shutdown the encoder to ensure the gzip footer is written
1356 : encoder
1357 : .shutdown()
1358 : .await
1359 0 : .map_err(|e| QueryError::Disconnected(ConnectionError::Io(e)))?;
1360 : } else {
1361 : basebackup::send_basebackup_tarball(
1362 : &mut writer,
1363 : &timeline,
1364 : lsn,
1365 : prev_lsn,
1366 : full_backup,
1367 : ctx,
1368 : )
1369 : .await
1370 : .map_err(map_basebackup_error)?;
1371 : }
1372 : }
1373 :
1374 : pgb.write_message_noflush(&BeMessage::CopyDone)
1375 : .map_err(QueryError::Disconnected)?;
1376 : self.flush_cancellable(pgb, &timeline.cancel).await?;
1377 :
1378 : let basebackup_after = started
1379 : .elapsed()
1380 : .checked_sub(lsn_awaited_after)
1381 : .unwrap_or(Duration::ZERO);
1382 :
1383 : info!(
1384 : lsn_await_millis = lsn_awaited_after.as_millis(),
1385 : basebackup_millis = basebackup_after.as_millis(),
1386 : "basebackup complete"
1387 : );
1388 :
1389 : Ok(())
1390 : }
1391 :
1392 : // When accessing the management API, supply None as an argument.
1393 : // When authorizing a tenant, pass the corresponding tenant id.
1394 0 : fn check_permission(&self, tenant_id: Option<TenantId>) -> Result<(), QueryError> {
1395 0 : if self.auth.is_none() {
1396 : // auth is set to Trust, nothing to check so just return ok
1397 0 : return Ok(());
1398 0 : }
1399 0 : // auth is Some (just checked above). When auth is Some,
1400 0 : // claims are always present because of checks during connection init,
1401 0 : // so this expect won't trigger.
1402 0 : let claims = self
1403 0 : .claims
1404 0 : .as_ref()
1405 0 : .expect("claims presence already checked");
1406 0 : check_permission(claims, tenant_id).map_err(|e| QueryError::Unauthorized(e.0))
1407 0 : }
1408 :
1409 : /// Shorthand for getting a reference to a Timeline of an Active tenant.
1410 0 : async fn get_active_tenant_timeline(
1411 0 : &self,
1412 0 : tenant_id: TenantId,
1413 0 : timeline_id: TimelineId,
1414 0 : selector: ShardSelector,
1415 0 : ) -> Result<Arc<Timeline>, GetActiveTimelineError> {
1416 0 : let tenant = self
1417 0 : .get_active_tenant_with_timeout(tenant_id, selector, ACTIVE_TENANT_TIMEOUT)
1418 0 : .await
1419 0 : .map_err(GetActiveTimelineError::Tenant)?;
1420 0 : let timeline = tenant.get_timeline(timeline_id, true)?;
1421 0 : set_tracing_field_shard_id(&timeline);
1422 0 : Ok(timeline)
1423 0 : }
1424 :
1425 : /// Get a shard's [`Tenant`] in its active state, if present. If we don't find the shard and some
1426 : /// slots for this tenant are `InProgress` then we will wait.
1427 : /// If we find the [`Tenant`] and it's not yet in state [`TenantState::Active`], we will wait.
1428 : ///
1429 : /// `timeout` is used as a total timeout for the whole wait operation.
1430 0 : async fn get_active_tenant_with_timeout(
1431 0 : &self,
1432 0 : tenant_id: TenantId,
1433 0 : shard_selector: ShardSelector,
1434 0 : timeout: Duration,
1435 0 : ) -> Result<Arc<Tenant>, GetActiveTenantError> {
1436 0 : let wait_start = Instant::now();
1437 0 : let deadline = wait_start + timeout;
1438 :
1439 : // Resolve TenantId to TenantShardId. This is usually a quick one-shot thing; the loop is
1440 : // for handling the rare case that the slot we're accessing is InProgress.
1441 0 : let tenant_shard = loop {
1442 0 : let resolved = self
1443 0 : .tenant_manager
1444 0 : .resolve_attached_shard(&tenant_id, shard_selector);
1445 0 : match resolved {
1446 0 : ShardResolveResult::Found(tenant_shard) => break tenant_shard,
1447 : ShardResolveResult::NotFound => {
1448 0 : return Err(GetActiveTenantError::NotFound(GetTenantError::NotFound(
1449 0 : tenant_id,
1450 0 : )));
1451 : }
1452 0 : ShardResolveResult::InProgress(barrier) => {
1453 : // We can't authoritatively answer right now: wait for InProgress state
1454 : // to end, then try again
1455 : tokio::select! {
1456 : _ = self.await_connection_cancelled() => {
1457 : return Err(GetActiveTenantError::Cancelled)
1458 : },
1459 : _ = barrier.wait() => {
1460 : // The barrier completed: proceed around the loop to try looking up again
1461 : },
1462 : _ = tokio::time::sleep(deadline.duration_since(Instant::now())) => {
1463 : return Err(GetActiveTenantError::WaitForActiveTimeout {
1464 : latest_state: None,
1465 : wait_time: timeout,
1466 : });
1467 : }
1468 : }
1469 : }
1470 : };
1471 : };
1472 :
1473 0 : tracing::debug!("Waiting for tenant to enter active state...");
1474 0 : tenant_shard
1475 0 : .wait_to_become_active(deadline.duration_since(Instant::now()))
1476 0 : .await?;
1477 0 : Ok(tenant_shard)
1478 0 : }
1479 : }
1480 :
1481 : #[async_trait::async_trait]
1482 : impl<IO> postgres_backend::Handler<IO> for PageServerHandler
1483 : where
1484 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1485 : {
1486 0 : fn check_auth_jwt(
1487 0 : &mut self,
1488 0 : _pgb: &mut PostgresBackend<IO>,
1489 0 : jwt_response: &[u8],
1490 0 : ) -> Result<(), QueryError> {
1491 : // this unwrap is never triggered, because check_auth_jwt is only called when auth_type is NeonJWT
1492 : // which requires auth to be present
1493 0 : let data = self
1494 0 : .auth
1495 0 : .as_ref()
1496 0 : .unwrap()
1497 0 : .decode(str::from_utf8(jwt_response).context("jwt response is not UTF-8")?)
1498 0 : .map_err(|e| QueryError::Unauthorized(e.0))?;
1499 :
1500 0 : if matches!(data.claims.scope, Scope::Tenant) && data.claims.tenant_id.is_none() {
1501 0 : return Err(QueryError::Unauthorized(
1502 0 : "jwt token scope is Tenant, but tenant id is missing".into(),
1503 0 : ));
1504 0 : }
1505 0 :
1506 0 : debug!(
1507 0 : "jwt scope check succeeded for scope: {:#?} by tenant id: {:?}",
1508 : data.claims.scope, data.claims.tenant_id,
1509 : );
1510 :
1511 0 : self.claims = Some(data.claims);
1512 0 : Ok(())
1513 0 : }
1514 :
1515 0 : fn startup(
1516 0 : &mut self,
1517 0 : _pgb: &mut PostgresBackend<IO>,
1518 0 : _sm: &FeStartupPacket,
1519 0 : ) -> Result<(), QueryError> {
1520 : fail::fail_point!("ps::connection-start::startup-packet");
1521 0 : Ok(())
1522 0 : }
1523 :
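 : /// Dispatch one libpq query string to its handler. The commands recognized below are
 : /// `pagestream_v2`, `pagestream`, `basebackup`, `get_last_record_rlsn`, `fullbackup`,
 : /// `import basebackup`, `import wal`, `set ...` (accepted as a no-op), `lease lsn`,
 : /// and `show`.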
1524 0 : #[instrument(skip_all, fields(tenant_id, timeline_id))]
1525 : async fn process_query(
1526 : &mut self,
1527 : pgb: &mut PostgresBackend<IO>,
1528 : query_string: &str,
1529 0 : ) -> Result<(), QueryError> {
1530 0 : fail::fail_point!("simulated-bad-compute-connection", |_| {
1531 0 : info!("Hit failpoint for bad connection");
1532 0 : Err(QueryError::SimulatedConnectionError)
1533 0 : });
1534 :
1535 : fail::fail_point!("ps::connection-start::process-query");
1536 :
1537 0 : let ctx = self.connection_ctx.attached_child();
1538 0 : debug!("process query {query_string:?}");
1539 0 : let parts = query_string.split_whitespace().collect::<Vec<_>>();
1540 0 : if let Some(params) = parts.strip_prefix(&["pagestream_v2"]) {
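 : // pagestream_v2 <tenant_id> <timeline_id>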
1541 0 : if params.len() != 2 {
1542 0 : return Err(QueryError::Other(anyhow::anyhow!(
1543 0 : "invalid param number for pagestream command"
1544 0 : )));
1545 0 : }
1546 0 : let tenant_id = TenantId::from_str(params[0])
1547 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1548 0 : let timeline_id = TimelineId::from_str(params[1])
1549 0 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1550 :
1551 0 : tracing::Span::current()
1552 0 : .record("tenant_id", field::display(tenant_id))
1553 0 : .record("timeline_id", field::display(timeline_id));
1554 0 :
1555 0 : self.check_permission(Some(tenant_id))?;
1556 :
1557 0 : self.handle_pagerequests(
1558 0 : pgb,
1559 0 : tenant_id,
1560 0 : timeline_id,
1561 0 : PagestreamProtocolVersion::V2,
1562 0 : ctx,
1563 0 : )
1564 0 : .await?;
1565 0 : } else if let Some(params) = parts.strip_prefix(&["pagestream"]) {
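 : // pagestream <tenant_id> <timeline_id>  (protocol V1)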
1566 0 : if params.len() != 2 {
1567 0 : return Err(QueryError::Other(anyhow::anyhow!(
1568 0 : "invalid param number for pagestream command"
1569 0 : )));
1570 0 : }
1571 0 : let tenant_id = TenantId::from_str(params[0])
1572 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1573 0 : let timeline_id = TimelineId::from_str(params[1])
1574 0 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1575 :
1576 0 : tracing::Span::current()
1577 0 : .record("tenant_id", field::display(tenant_id))
1578 0 : .record("timeline_id", field::display(timeline_id));
1579 0 :
1580 0 : self.check_permission(Some(tenant_id))?;
1581 :
1582 0 : self.handle_pagerequests(
1583 0 : pgb,
1584 0 : tenant_id,
1585 0 : timeline_id,
1586 0 : PagestreamProtocolVersion::V1,
1587 0 : ctx,
1588 0 : )
1589 0 : .await?;
1590 0 : } else if let Some(params) = parts.strip_prefix(&["basebackup"]) {
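 : // basebackup <tenant_id> <timeline_id> [<lsn>] [--gzip]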
1591 0 : if params.len() < 2 {
1592 0 : return Err(QueryError::Other(anyhow::anyhow!(
1593 0 : "invalid param number for basebackup command"
1594 0 : )));
1595 0 : }
1596 :
1597 0 : let tenant_id = TenantId::from_str(params[0])
1598 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1599 0 : let timeline_id = TimelineId::from_str(params[1])
1600 0 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1601 :
1602 0 : tracing::Span::current()
1603 0 : .record("tenant_id", field::display(tenant_id))
1604 0 : .record("timeline_id", field::display(timeline_id));
1605 0 :
1606 0 : self.check_permission(Some(tenant_id))?;
1607 :
1608 0 : let lsn = if let Some(lsn_str) = params.get(2) {
1609 : Some(
1610 0 : Lsn::from_str(lsn_str)
1611 0 : .with_context(|| format!("Failed to parse Lsn from {lsn_str}"))?,
1612 : )
1613 : } else {
1614 0 : None
1615 : };
1616 :
1617 0 : let gzip = match params.get(3) {
1618 0 : Some(&"--gzip") => true,
1619 0 : None => false,
1620 0 : Some(third_param) => {
1621 0 : return Err(QueryError::Other(anyhow::anyhow!(
1622 0 : "Parameter in position 3 unknown {third_param}",
1623 0 : )))
1624 : }
1625 : };
1626 :
1627 0 : let metric_recording = metrics::BASEBACKUP_QUERY_TIME.start_recording(&ctx);
1628 0 : let res = async {
1629 0 : self.handle_basebackup_request(
1630 0 : pgb,
1631 0 : tenant_id,
1632 0 : timeline_id,
1633 0 : lsn,
1634 0 : None,
1635 0 : false,
1636 0 : gzip,
1637 0 : &ctx,
1638 0 : )
1639 0 : .await?;
1640 0 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1641 0 : Result::<(), QueryError>::Ok(())
1642 0 : }
1643 0 : .await;
1644 0 : metric_recording.observe(&res);
1645 0 : res?;
1646 : }
1647 : // Return the pair of prev_lsn and last_lsn for the timeline.
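 : // (get_last_record_rlsn <tenant_id> <timeline_id>; the reply is one row with the text
 : // columns `prev_lsn` and `last_lsn`, as written out below)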
1648 0 : else if let Some(params) = parts.strip_prefix(&["get_last_record_rlsn"]) {
1649 0 : if params.len() != 2 {
1650 0 : return Err(QueryError::Other(anyhow::anyhow!(
1651 0 : "invalid param number for get_last_record_rlsn command"
1652 0 : )));
1653 0 : }
1654 :
1655 0 : let tenant_id = TenantId::from_str(params[0])
1656 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1657 0 : let timeline_id = TimelineId::from_str(params[1])
1658 0 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1659 :
1660 0 : tracing::Span::current()
1661 0 : .record("tenant_id", field::display(tenant_id))
1662 0 : .record("timeline_id", field::display(timeline_id));
1663 0 :
1664 0 : self.check_permission(Some(tenant_id))?;
1665 0 : async {
1666 0 : let timeline = self
1667 0 : .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero)
1668 0 : .await?;
1669 :
1670 0 : let end_of_timeline = timeline.get_last_record_rlsn();
1671 0 :
1672 0 : pgb.write_message_noflush(&BeMessage::RowDescription(&[
1673 0 : RowDescriptor::text_col(b"prev_lsn"),
1674 0 : RowDescriptor::text_col(b"last_lsn"),
1675 0 : ]))?
1676 0 : .write_message_noflush(&BeMessage::DataRow(&[
1677 0 : Some(end_of_timeline.prev.to_string().as_bytes()),
1678 0 : Some(end_of_timeline.last.to_string().as_bytes()),
1679 0 : ]))?
1680 0 : .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1681 0 : anyhow::Ok(())
1682 0 : }
1683 0 : .instrument(info_span!(
1684 0 : "handle_get_last_record_lsn",
1685 0 : shard_id = tracing::field::Empty
1686 0 : ))
1687 0 : .await?;
1688 : }
1689 : // Same as basebackup, but the result includes relational data as well.
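 : // (fullbackup <tenant_id> <timeline_id> [<lsn> [<prev_lsn>]], following the parsing below)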
1690 0 : else if let Some(params) = parts.strip_prefix(&["fullbackup"]) {
1691 0 : if params.len() < 2 {
1692 0 : return Err(QueryError::Other(anyhow::anyhow!(
1693 0 : "invalid param number for fullbackup command"
1694 0 : )));
1695 0 : }
1696 :
1697 0 : let tenant_id = TenantId::from_str(params[0])
1698 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1699 0 : let timeline_id = TimelineId::from_str(params[1])
1700 0 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1701 :
1702 0 : tracing::Span::current()
1703 0 : .record("tenant_id", field::display(tenant_id))
1704 0 : .record("timeline_id", field::display(timeline_id));
1705 :
1706 : // The caller is responsible for providing correct lsn and prev_lsn.
1707 0 : let lsn = if let Some(lsn_str) = params.get(2) {
1708 : Some(
1709 0 : Lsn::from_str(lsn_str)
1710 0 : .with_context(|| format!("Failed to parse Lsn from {lsn_str}"))?,
1711 : )
1712 : } else {
1713 0 : None
1714 : };
1715 0 : let prev_lsn = if let Some(prev_lsn_str) = params.get(3) {
1716 : Some(
1717 0 : Lsn::from_str(prev_lsn_str)
1718 0 : .with_context(|| format!("Failed to parse Lsn from {prev_lsn_str}"))?,
1719 : )
1720 : } else {
1721 0 : None
1722 : };
1723 :
1724 0 : self.check_permission(Some(tenant_id))?;
1725 :
1726 : // Check that the timeline exists and stream the full backup.
1727 0 : self.handle_basebackup_request(
1728 0 : pgb,
1729 0 : tenant_id,
1730 0 : timeline_id,
1731 0 : lsn,
1732 0 : prev_lsn,
1733 0 : true,
1734 0 : false,
1735 0 : &ctx,
1736 0 : )
1737 0 : .await?;
1738 0 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1739 0 : } else if query_string.starts_with("import basebackup ") {
1740 : // Import the `base` section (everything but the wal) of a basebackup.
1741 : // Assumes the tenant already exists on this pageserver.
1742 : //
1743 : // Files are scheduled to be persisted to remote storage, and the
1744 : // caller should poll the http api to check when that is done.
1745 : //
1746 : // Example import command:
1747 : // 1. Get start/end LSN from backup_manifest file
1748 : // 2. Run:
1749 : // cat my_backup/base.tar | psql -h $PAGESERVER \
1750 : // -c "import basebackup $TENANT $TIMELINE $START_LSN $END_LSN $PG_VERSION"
1751 0 : let params = &parts[2..];
1752 0 : if params.len() != 5 {
1753 0 : return Err(QueryError::Other(anyhow::anyhow!(
1754 0 : "invalid param number for import basebackup command"
1755 0 : )));
1756 0 : }
1757 0 : let tenant_id = TenantId::from_str(params[0])
1758 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1759 0 : let timeline_id = TimelineId::from_str(params[1])
1760 0 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1761 0 : let base_lsn = Lsn::from_str(params[2])
1762 0 : .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?;
1763 0 : let end_lsn = Lsn::from_str(params[3])
1764 0 : .with_context(|| format!("Failed to parse Lsn from {}", params[3]))?;
1765 0 : let pg_version = u32::from_str(params[4])
1766 0 : .with_context(|| format!("Failed to parse pg_version from {}", params[4]))?;
1767 :
1768 0 : tracing::Span::current()
1769 0 : .record("tenant_id", field::display(tenant_id))
1770 0 : .record("timeline_id", field::display(timeline_id));
1771 0 :
1772 0 : self.check_permission(Some(tenant_id))?;
1773 :
1774 0 : match self
1775 0 : .handle_import_basebackup(
1776 0 : pgb,
1777 0 : tenant_id,
1778 0 : timeline_id,
1779 0 : base_lsn,
1780 0 : end_lsn,
1781 0 : pg_version,
1782 0 : ctx,
1783 0 : )
1784 0 : .await
1785 : {
1786 0 : Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
1787 0 : Err(e) => {
1788 0 : error!("error importing base backup between {base_lsn} and {end_lsn}: {e:?}");
1789 0 : pgb.write_message_noflush(&BeMessage::ErrorResponse(
1790 0 : &e.to_string(),
1791 0 : Some(e.pg_error_code()),
1792 0 : ))?
1793 : }
1794 : };
1795 0 : } else if query_string.starts_with("import wal ") {
1796 : // Import the `pg_wal` section of a basebackup.
1797 : //
1798 : // Files are scheduled to be persisted to remote storage, and the
1799 : // caller should poll the http api to check when that is done.
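 : //
 : // By analogy with the `import basebackup` example above, an invocation looks roughly like
 : // (the tar path is illustrative; the parameter order follows the parsing below):
 : // cat my_backup/pg_wal.tar | psql -h $PAGESERVER \
 : //     -c "import wal $TENANT $TIMELINE $START_LSN $END_LSN"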
1800 0 : let params = &parts[2..];
1801 0 : if params.len() != 4 {
1802 0 : return Err(QueryError::Other(anyhow::anyhow!(
1803 0 : "invalid param number for import wal command"
1804 0 : )));
1805 0 : }
1806 0 : let tenant_id = TenantId::from_str(params[0])
1807 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1808 0 : let timeline_id = TimelineId::from_str(params[1])
1809 0 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1810 0 : let start_lsn = Lsn::from_str(params[2])
1811 0 : .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?;
1812 0 : let end_lsn = Lsn::from_str(params[3])
1813 0 : .with_context(|| format!("Failed to parse Lsn from {}", params[3]))?;
1814 :
1815 0 : tracing::Span::current()
1816 0 : .record("tenant_id", field::display(tenant_id))
1817 0 : .record("timeline_id", field::display(timeline_id));
1818 0 :
1819 0 : self.check_permission(Some(tenant_id))?;
1820 :
1821 0 : match self
1822 0 : .handle_import_wal(pgb, tenant_id, timeline_id, start_lsn, end_lsn, ctx)
1823 0 : .await
1824 : {
1825 0 : Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
1826 0 : Err(e) => {
1827 0 : error!("error importing WAL between {start_lsn} and {end_lsn}: {e:?}");
1828 0 : pgb.write_message_noflush(&BeMessage::ErrorResponse(
1829 0 : &e.to_string(),
1830 0 : Some(e.pg_error_code()),
1831 0 : ))?
1832 : }
1833 : };
1834 0 : } else if query_string.to_ascii_lowercase().starts_with("set ") {
1835 : // Accept SET commands as a no-op. This matters because psycopg2 executes
1836 : // "SET datestyle TO 'ISO'" on connect.
1837 0 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1838 0 : } else if query_string.starts_with("lease lsn ") {
1839 0 : let params = &parts[2..];
1840 0 : if params.len() != 3 {
1841 0 : return Err(QueryError::Other(anyhow::anyhow!(
1842 0 : "invalid param number {} for lease lsn command",
1843 0 : params.len()
1844 0 : )));
1845 0 : }
1846 :
1847 0 : let tenant_shard_id = TenantShardId::from_str(params[0])
1848 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1849 0 : let timeline_id = TimelineId::from_str(params[1])
1850 0 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1851 :
1852 0 : tracing::Span::current()
1853 0 : .record("tenant_id", field::display(tenant_shard_id))
1854 0 : .record("timeline_id", field::display(timeline_id));
1855 0 :
1856 0 : self.check_permission(Some(tenant_shard_id.tenant_id))?;
1857 :
1858 : // The caller is responsible for providing correct lsn.
1859 0 : let lsn = Lsn::from_str(params[2])
1860 0 : .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?;
1861 :
1862 0 : match self
1863 0 : .handle_make_lsn_lease(pgb, tenant_shard_id, timeline_id, lsn, &ctx)
1864 0 : .await
1865 : {
1866 0 : Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
1867 0 : Err(e) => {
1868 0 : error!("error obtaining lsn lease for {lsn}: {e:?}");
1869 0 : pgb.write_message_noflush(&BeMessage::ErrorResponse(
1870 0 : &e.to_string(),
1871 0 : Some(e.pg_error_code()),
1872 0 : ))?
1873 : }
1874 : };
1875 0 : } else if let Some(params) = parts.strip_prefix(&["show"]) {
1876 : // show <tenant_id>
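 : // Returns one row of per-tenant settings (checkpoint_distance, checkpoint_timeout,
 : // compaction_*, gc_*, image_creation_threshold, pitr_interval), per the row description below.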
1877 0 : if params.len() != 1 {
1878 0 : return Err(QueryError::Other(anyhow::anyhow!(
1879 0 : "invalid param number for config command"
1880 0 : )));
1881 0 : }
1882 0 : let tenant_id = TenantId::from_str(params[0])
1883 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1884 :
1885 0 : tracing::Span::current().record("tenant_id", field::display(tenant_id));
1886 0 :
1887 0 : self.check_permission(Some(tenant_id))?;
1888 :
1889 0 : let tenant = self
1890 0 : .get_active_tenant_with_timeout(
1891 0 : tenant_id,
1892 0 : ShardSelector::Zero,
1893 0 : ACTIVE_TENANT_TIMEOUT,
1894 0 : )
1895 0 : .await?;
1896 0 : pgb.write_message_noflush(&BeMessage::RowDescription(&[
1897 0 : RowDescriptor::int8_col(b"checkpoint_distance"),
1898 0 : RowDescriptor::int8_col(b"checkpoint_timeout"),
1899 0 : RowDescriptor::int8_col(b"compaction_target_size"),
1900 0 : RowDescriptor::int8_col(b"compaction_period"),
1901 0 : RowDescriptor::int8_col(b"compaction_threshold"),
1902 0 : RowDescriptor::int8_col(b"gc_horizon"),
1903 0 : RowDescriptor::int8_col(b"gc_period"),
1904 0 : RowDescriptor::int8_col(b"image_creation_threshold"),
1905 0 : RowDescriptor::int8_col(b"pitr_interval"),
1906 0 : ]))?
1907 0 : .write_message_noflush(&BeMessage::DataRow(&[
1908 0 : Some(tenant.get_checkpoint_distance().to_string().as_bytes()),
1909 0 : Some(
1910 0 : tenant
1911 0 : .get_checkpoint_timeout()
1912 0 : .as_secs()
1913 0 : .to_string()
1914 0 : .as_bytes(),
1915 0 : ),
1916 0 : Some(tenant.get_compaction_target_size().to_string().as_bytes()),
1917 0 : Some(
1918 0 : tenant
1919 0 : .get_compaction_period()
1920 0 : .as_secs()
1921 0 : .to_string()
1922 0 : .as_bytes(),
1923 0 : ),
1924 0 : Some(tenant.get_compaction_threshold().to_string().as_bytes()),
1925 0 : Some(tenant.get_gc_horizon().to_string().as_bytes()),
1926 0 : Some(tenant.get_gc_period().as_secs().to_string().as_bytes()),
1927 0 : Some(tenant.get_image_creation_threshold().to_string().as_bytes()),
1928 0 : Some(tenant.get_pitr_interval().as_secs().to_string().as_bytes()),
1929 0 : ]))?
1930 0 : .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1931 : } else {
1932 0 : return Err(QueryError::Other(anyhow::anyhow!(
1933 0 : "unknown command {query_string}"
1934 0 : )));
1935 : }
1936 :
1937 0 : Ok(())
1938 0 : }
1939 : }
1940 :
1941 : impl From<GetActiveTenantError> for QueryError {
1942 0 : fn from(e: GetActiveTenantError) -> Self {
1943 0 : match e {
1944 0 : GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected(
1945 0 : ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
1946 0 : ),
1947 : GetActiveTenantError::Cancelled
1948 : | GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => {
1949 0 : QueryError::Shutdown
1950 : }
1951 0 : e @ GetActiveTenantError::NotFound(_) => QueryError::NotFound(format!("{e}").into()),
1952 0 : e => QueryError::Other(anyhow::anyhow!(e)),
1953 : }
1954 0 : }
1955 : }
1956 :
1957 0 : #[derive(Debug, thiserror::Error)]
1958 : enum GetActiveTimelineError {
1959 : #[error(transparent)]
1960 : Tenant(GetActiveTenantError),
1961 : #[error(transparent)]
1962 : Timeline(#[from] GetTimelineError),
1963 : }
1964 :
1965 : impl From<GetActiveTimelineError> for QueryError {
1966 0 : fn from(e: GetActiveTimelineError) -> Self {
1967 0 : match e {
1968 0 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => QueryError::Shutdown,
1969 0 : GetActiveTimelineError::Tenant(e) => e.into(),
1970 0 : GetActiveTimelineError::Timeline(e) => QueryError::NotFound(format!("{e}").into()),
1971 : }
1972 0 : }
1973 : }
1974 :
1975 0 : fn set_tracing_field_shard_id(timeline: &Timeline) {
1976 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1977 0 : tracing::Span::current().record(
1978 0 : "shard_id",
1979 0 : tracing::field::display(timeline.tenant_shard_id.shard_slug()),
1980 0 : );
1981 0 : debug_assert_current_span_has_tenant_and_timeline_id();
1982 0 : }