Line data Source code
1 : //! The Page Service listens for client connections and serves their GetPage@LSN
2 : //! requests.
3 :
4 : use anyhow::Context;
5 : use async_compression::tokio::write::GzipEncoder;
6 : use bytes::Buf;
7 : use bytes::Bytes;
8 : use futures::stream::FuturesUnordered;
9 : use futures::Stream;
10 : use futures::StreamExt;
11 : use pageserver_api::key::Key;
12 : use pageserver_api::models::TenantState;
13 : use pageserver_api::models::{
14 : PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
15 : PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
16 : PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
17 : PagestreamGetSlruSegmentRequest, PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest,
18 : PagestreamNblocksResponse, PagestreamProtocolVersion,
19 : };
20 : use pageserver_api::shard::ShardIndex;
21 : use pageserver_api::shard::ShardNumber;
22 : use pageserver_api::shard::TenantShardId;
23 : use postgres_backend::{is_expected_io_error, AuthType, PostgresBackend, QueryError};
24 : use pq_proto::framed::ConnectionError;
25 : use pq_proto::FeStartupPacket;
26 : use pq_proto::{BeMessage, FeMessage, RowDescriptor};
27 : use std::borrow::Cow;
28 : use std::collections::HashMap;
29 : use std::io;
30 : use std::net::TcpListener;
31 : use std::pin::pin;
32 : use std::str;
33 : use std::str::FromStr;
34 : use std::sync::Arc;
35 : use std::time::Duration;
36 : use std::time::Instant;
37 : use std::time::SystemTime;
38 : use tokio::io::AsyncWriteExt;
39 : use tokio::io::{AsyncRead, AsyncWrite};
40 : use tokio_util::io::StreamReader;
41 : use tokio_util::sync::CancellationToken;
42 : use tracing::*;
43 : use utils::id::ConnectionId;
44 : use utils::sync::gate::GateGuard;
45 : use utils::{
46 : auth::{Claims, Scope, SwappableJwtAuth},
47 : id::{TenantId, TimelineId},
48 : lsn::Lsn,
49 : simple_rcu::RcuReadGuard,
50 : };
51 :
52 : use crate::auth::check_permission;
53 : use crate::basebackup;
54 : use crate::basebackup::BasebackupError;
55 : use crate::context::{DownloadBehavior, RequestContext};
56 : use crate::import_datadir::import_wal_from_tar;
57 : use crate::metrics;
58 : use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS};
59 : use crate::pgdatadir_mapping::Version;
60 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
61 : use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
62 : use crate::task_mgr;
63 : use crate::task_mgr::TaskKind;
64 : use crate::tenant::mgr::GetActiveTenantError;
65 : use crate::tenant::mgr::GetTenantError;
66 : use crate::tenant::mgr::ShardResolveResult;
67 : use crate::tenant::mgr::ShardSelector;
68 : use crate::tenant::mgr::TenantManager;
69 : use crate::tenant::timeline::FlushLayerError;
70 : use crate::tenant::timeline::WaitLsnError;
71 : use crate::tenant::GetTimelineError;
72 : use crate::tenant::PageReconstructError;
73 : use crate::tenant::Tenant;
74 : use crate::tenant::Timeline;
75 : use crate::trace::Tracer;
76 : use pageserver_api::key::rel_block_to_key;
77 : use pageserver_api::reltag::SlruKind;
78 : use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
79 : use postgres_ffi::BLCKSZ;
80 :
81           : // How long we may wait for a [`TenantSlot::InProgress`] and/or a [`Tenant`] which
82 : // is not yet in state [`TenantState::Active`].
83 : const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
84 :
85 : /// Read the end of a tar archive.
86 : ///
87 : /// A tar archive normally ends with two consecutive blocks of zeros, 512 bytes each.
88 : /// `tokio_tar` already read the first such block. Read the second all-zeros block,
89 : /// and check that there is no more data after the EOF marker.
90 : ///
91           : /// The 'tar' command can also write extra blocks of zeros, up to a record
92           : /// size controlled by the --record-size argument. Ignore those too.
93 0 : async fn read_tar_eof(mut reader: (impl AsyncRead + Unpin)) -> anyhow::Result<()> {
94 0 : use tokio::io::AsyncReadExt;
95 0 : let mut buf = [0u8; 512];
96 0 :
97 0 : // Read the all-zeros block, and verify it
98 0 : let mut total_bytes = 0;
99 0 : while total_bytes < 512 {
100 0 : let nbytes = reader.read(&mut buf[total_bytes..]).await?;
101 0 : total_bytes += nbytes;
102 0 : if nbytes == 0 {
103 0 : break;
104 0 : }
105 : }
106 0 : if total_bytes < 512 {
107 0 : anyhow::bail!("incomplete or invalid tar EOF marker");
108 0 : }
109 0 : if !buf.iter().all(|&x| x == 0) {
110 0 : anyhow::bail!("invalid tar EOF marker");
111 0 : }
112 0 :
113 0 : // Drain any extra zero-blocks after the EOF marker
114 0 : let mut trailing_bytes = 0;
115 0 : let mut seen_nonzero_bytes = false;
116 : loop {
117 0 : let nbytes = reader.read(&mut buf).await?;
118 0 : trailing_bytes += nbytes;
119 0 : if !buf.iter().all(|&x| x == 0) {
120 0 : seen_nonzero_bytes = true;
121 0 : }
122 0 : if nbytes == 0 {
123 0 : break;
124 0 : }
125 : }
126 0 : if seen_nonzero_bytes {
127 0 : anyhow::bail!("unexpected non-zero bytes after the tar archive");
128 0 : }
129 0 : if trailing_bytes % 512 != 0 {
130 0 : anyhow::bail!("unexpected number of zeros ({trailing_bytes}), not divisible by tar block size (512 bytes), after the tar archive");
131 0 : }
132 0 : Ok(())
133 0 : }
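
// Editor's illustrative sketch (not part of the original file): `read_tar_eof`
// accepts any `AsyncRead`, so its behavior can be exercised against in-memory
// buffers. Assumes `tokio`'s test macro is available in dev-dependencies.
#[cfg(test)]
mod read_tar_eof_sketch {
    use super::read_tar_eof;

    #[tokio::test]
    async fn accepts_extra_zero_blocks_and_rejects_garbage() {
        // The mandatory all-zeros block plus three extra zero blocks, as `tar`
        // may emit with --record-size: this should be accepted.
        let trailer = vec![0u8; 512 * 4];
        assert!(read_tar_eof(&trailer[..]).await.is_ok());

        // Non-zero bytes after the EOF marker must be rejected.
        let mut garbage = vec![0u8; 512];
        garbage.extend_from_slice(b"unexpected trailing data");
        assert!(read_tar_eof(&garbage[..]).await.is_err());
    }
}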
134 :
135 : ///////////////////////////////////////////////////////////////////////////////
136 :
137 : ///
138 : /// Main loop of the page service.
139 : ///
140 : /// Listens for connections, and launches a new handler task for each.
141 : ///
142 0 : pub async fn libpq_listener_main(
143 0 : tenant_manager: Arc<TenantManager>,
144 0 : broker_client: storage_broker::BrokerClientChannel,
145 0 : auth: Option<Arc<SwappableJwtAuth>>,
146 0 : listener: TcpListener,
147 0 : auth_type: AuthType,
148 0 : listener_ctx: RequestContext,
149 0 : cancel: CancellationToken,
150 0 : ) -> anyhow::Result<()> {
151 0 : listener.set_nonblocking(true)?;
152 0 : let tokio_listener = tokio::net::TcpListener::from_std(listener)?;
153 :
154 : // Wait for a new connection to arrive, or for server shutdown.
155 0 : while let Some(res) = tokio::select! {
156 : biased;
157 :
158 : _ = cancel.cancelled() => {
159 : // We were requested to shut down.
160 : None
161 : }
162 :
163 : res = tokio_listener.accept() => {
164 : Some(res)
165 : }
166 : } {
167 0 : match res {
168 0 : Ok((socket, peer_addr)) => {
169 0 : // Connection established. Spawn a new task to handle it.
170 0 : debug!("accepted connection from {}", peer_addr);
171 0 : let local_auth = auth.clone();
172 0 :
173 0 : let connection_ctx = listener_ctx
174 0 : .detached_child(TaskKind::PageRequestHandler, DownloadBehavior::Download);
175 0 :
176 0 : // PageRequestHandler tasks are not associated with any particular
177 0 : // timeline in the task manager. In practice most connections will
178 0 : // only deal with a particular timeline, but we don't know which one
179 0 : // yet.
180 0 : task_mgr::spawn(
181 0 : &tokio::runtime::Handle::current(),
182 0 : TaskKind::PageRequestHandler,
183 0 : None,
184 0 : None,
185 0 : "serving compute connection task",
186 0 : false,
187 0 : page_service_conn_main(
188 0 : tenant_manager.clone(),
189 0 : broker_client.clone(),
190 0 : local_auth,
191 0 : socket,
192 0 : auth_type,
193 0 : connection_ctx,
194 0 : ),
195 0 : );
196 : }
197 0 : Err(err) => {
198 0 : // accept() failed. Log the error, and loop back to retry on next connection.
199 0 : error!("accept() failed: {:?}", err);
200 : }
201 : }
202 : }
203 :
204 0 : debug!("page_service loop terminated");
205 :
206 0 : Ok(())
207 0 : }
208 :
209 0 : #[instrument(skip_all, fields(peer_addr))]
210 : async fn page_service_conn_main(
211 : tenant_manager: Arc<TenantManager>,
212 : broker_client: storage_broker::BrokerClientChannel,
213 : auth: Option<Arc<SwappableJwtAuth>>,
214 : socket: tokio::net::TcpStream,
215 : auth_type: AuthType,
216 : connection_ctx: RequestContext,
217 : ) -> anyhow::Result<()> {
218 : let _guard = LIVE_CONNECTIONS
219 : .with_label_values(&["page_service"])
220 : .guard();
221 :
222 : socket
223 : .set_nodelay(true)
224 : .context("could not set TCP_NODELAY")?;
225 :
226 : let peer_addr = socket.peer_addr().context("get peer address")?;
227 : tracing::Span::current().record("peer_addr", field::display(peer_addr));
228 :
229          :     // Set up a read timeout of 10 minutes. The value is somewhat arbitrary; it needs to be:
230          :     // - long enough for most valid compute connections
231          :     // - less than infinite, to stop us from "leaking" connections to long-gone computes
232          :     //
233          :     // No write timeout is used, because the kernel is assumed to fail writes after some time.
234 : let mut socket = tokio_io_timeout::TimeoutReader::new(socket);
235 :
236 : let default_timeout_ms = 10 * 60 * 1000; // 10 minutes by default
237 0 : let socket_timeout_ms = (|| {
238 0 : fail::fail_point!("simulated-bad-compute-connection", |avg_timeout_ms| {
239          :         // Sample from an exponential distribution to simulate
240          :         // poor network conditions; in tests, avg_timeout_ms is
241          :         // expected to be around 15.
242 0 : if let Some(avg_timeout_ms) = avg_timeout_ms {
243 0 : let avg = avg_timeout_ms.parse::<i64>().unwrap() as f32;
244 0 : let u = rand::random::<f32>();
245 0 : ((1.0 - u).ln() / (-avg)) as u64
246 : } else {
247 0 : default_timeout_ms
248 : }
249 0 : });
250 0 : default_timeout_ms
251 : })();
252 :
253          :     // A timeout here does not mean the client died; it can happen if it's just idle for
254 : // a while: we will tear down this PageServerHandler and instantiate a new one if/when
255 : // they reconnect.
256 : socket.set_timeout(Some(std::time::Duration::from_millis(socket_timeout_ms)));
257 : let socket = std::pin::pin!(socket);
258 :
259 : fail::fail_point!("ps::connection-start::pre-login");
260 :
261 : // XXX: pgbackend.run() should take the connection_ctx,
262 : // and create a child per-query context when it invokes process_query.
263 : // But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
264 : // and create the per-query context in process_query ourselves.
265 : let mut conn_handler =
266 : PageServerHandler::new(tenant_manager, broker_client, auth, connection_ctx);
267 : let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
268 :
269 : match pgbackend
270 : .run(&mut conn_handler, task_mgr::shutdown_watcher)
271 : .await
272 : {
273 : Ok(()) => {
274 : // we've been requested to shut down
275 : Ok(())
276 : }
277 : Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
278 : if is_expected_io_error(&io_error) {
279 : info!("Postgres client disconnected ({io_error})");
280 : Ok(())
281 : } else {
282 : Err(io_error).context("Postgres connection error")
283 : }
284 : }
285 : other => other.context("Postgres query error"),
286 : }
287 : }
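
// Editor's illustrative sketch (not part of the original file): the
// "simulated-bad-compute-connection" failpoint above derives a read timeout from a
// uniform random draw via inverse-transform sampling of an exponential
// distribution. The standard form with mean `mean_ms` is shown below; the
// failpoint uses a closely related expression, and `sample_exponential_ms` is a
// hypothetical helper, not used by the code above.
#[cfg(test)]
mod simulated_timeout_sketch {
    /// Draw a timeout (in milliseconds) from an exponential distribution with the
    /// given mean, using inverse-transform sampling: X = -mean * ln(1 - U).
    fn sample_exponential_ms(mean_ms: f32, u: f32) -> u64 {
        (-(1.0 - u).ln() * mean_ms) as u64
    }

    #[test]
    fn empirical_mean_tracks_requested_mean() {
        let mean_ms = 15.0;
        let n = 10_000;
        let total: u64 = (0..n)
            .map(|_| sample_exponential_ms(mean_ms, rand::random::<f32>()))
            .sum();
        let observed = total as f32 / n as f32;
        // Loose sanity bound; truncation to whole milliseconds pulls the mean down slightly.
        assert!(observed > 5.0 && observed < 30.0);
    }
}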
288 :
289          : /// While a handler holds a reference to a Timeline, it also holds the
290 : /// timeline's Gate open.
291 : struct HandlerTimeline {
292 : timeline: Arc<Timeline>,
293 : _guard: GateGuard,
294 : }
295 :
296 : struct PageServerHandler {
297 : broker_client: storage_broker::BrokerClientChannel,
298 : auth: Option<Arc<SwappableJwtAuth>>,
299 : claims: Option<Claims>,
300 :
301 : tenant_manager: Arc<TenantManager>,
302 :
303 : /// The context created for the lifetime of the connection
304          :     /// serviced by this PageServerHandler.
305 : /// For each query received over the connection,
306 : /// `process_query` creates a child context from this one.
307 : connection_ctx: RequestContext,
308 :
309 : /// See [`Self::cache_timeline`] for usage.
310 : ///
311 : /// Note on size: the typical size of this map is 1. The largest size we expect
312 : /// to see is the number of shards divided by the number of pageservers (typically < 2),
313          :     /// or the ratio used when splitting shards (i.e. how many children are created from one
314          :     /// parent shard), where a "large" number might be ~8.
315 : shard_timelines: HashMap<ShardIndex, HandlerTimeline>,
316 : }
317 :
318 0 : #[derive(thiserror::Error, Debug)]
319 : enum PageStreamError {
320 : /// We encountered an error that should prompt the client to reconnect:
321 : /// in practice this means we drop the connection without sending a response.
322 : #[error("Reconnect required: {0}")]
323 : Reconnect(Cow<'static, str>),
324 :
325 : /// We were instructed to shutdown while processing the query
326 : #[error("Shutting down")]
327 : Shutdown,
328 :
329 : /// Something went wrong reading a page: this likely indicates a pageserver bug
330 : #[error("Read error")]
331 : Read(#[source] PageReconstructError),
332 :
333 : /// Ran out of time waiting for an LSN
334 : #[error("LSN timeout: {0}")]
335 : LsnTimeout(WaitLsnError),
336 :
337 : /// The entity required to serve the request (tenant or timeline) is not found,
338 : /// or is not found in a suitable state to serve a request.
339 : #[error("Not found: {0}")]
340 : NotFound(Cow<'static, str>),
341 :
342 : /// Request asked for something that doesn't make sense, like an invalid LSN
343 : #[error("Bad request: {0}")]
344 : BadRequest(Cow<'static, str>),
345 : }
346 :
347 : impl From<PageReconstructError> for PageStreamError {
348 0 : fn from(value: PageReconstructError) -> Self {
349 0 : match value {
350 0 : PageReconstructError::Cancelled => Self::Shutdown,
351 0 : e => Self::Read(e),
352 : }
353 0 : }
354 : }
355 :
356 : impl From<GetActiveTimelineError> for PageStreamError {
357 0 : fn from(value: GetActiveTimelineError) -> Self {
358 0 : match value {
359 0 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => Self::Shutdown,
360 0 : GetActiveTimelineError::Tenant(e) => Self::NotFound(format!("{e}").into()),
361 0 : GetActiveTimelineError::Timeline(e) => Self::NotFound(format!("{e}").into()),
362 : }
363 0 : }
364 : }
365 :
366 : impl From<WaitLsnError> for PageStreamError {
367 0 : fn from(value: WaitLsnError) -> Self {
368 0 : match value {
369 0 : e @ WaitLsnError::Timeout(_) => Self::LsnTimeout(e),
370 0 : WaitLsnError::Shutdown => Self::Shutdown,
371 0 : e @ WaitLsnError::BadState { .. } => Self::Reconnect(format!("{e}").into()),
372 : }
373 0 : }
374 : }
375 :
376 : impl From<WaitLsnError> for QueryError {
377 0 : fn from(value: WaitLsnError) -> Self {
378 0 : match value {
379 0 : e @ WaitLsnError::Timeout(_) => Self::Other(anyhow::Error::new(e)),
380 0 : WaitLsnError::Shutdown => Self::Shutdown,
381 0 : WaitLsnError::BadState { .. } => Self::Reconnect,
382 : }
383 0 : }
384 : }
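
// Editor's illustrative sketch (not part of the original file): the conversions
// above collapse cancellation/shutdown at any layer into the corresponding
// `Shutdown` variant, which is what lets the request loop further below simply
// drop the connection instead of reporting an error to the client.
#[cfg(test)]
mod error_mapping_sketch {
    use super::*;

    #[test]
    fn shutdown_collapses_to_shutdown() {
        assert!(matches!(
            PageStreamError::from(WaitLsnError::Shutdown),
            PageStreamError::Shutdown
        ));
        assert!(matches!(
            QueryError::from(WaitLsnError::Shutdown),
            QueryError::Shutdown
        ));
    }
}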
385 :
386 : impl PageServerHandler {
387 0 : pub fn new(
388 0 : tenant_manager: Arc<TenantManager>,
389 0 : broker_client: storage_broker::BrokerClientChannel,
390 0 : auth: Option<Arc<SwappableJwtAuth>>,
391 0 : connection_ctx: RequestContext,
392 0 : ) -> Self {
393 0 : PageServerHandler {
394 0 : tenant_manager,
395 0 : broker_client,
396 0 : auth,
397 0 : claims: None,
398 0 : connection_ctx,
399 0 : shard_timelines: HashMap::new(),
400 0 : }
401 0 : }
402 :
403 : /// Future that completes when we need to shut down the connection.
404 : ///
405 : /// We currently need to shut down when any of the following happens:
406 : /// 1. any of the timelines we hold GateGuards for in `shard_timelines` is cancelled
407 : /// 2. task_mgr requests shutdown of the connection
408 : ///
409 : /// NB on (1): the connection's lifecycle is not actually tied to any of the
410 : /// `shard_timelines`s' lifecycles. But it's _necessary_ in the current
411 : /// implementation to be responsive to timeline cancellation because
412          :     /// the connection holds their `GateGuards` open (stored in `shard_timelines`).
413 : /// We currently do the easy thing and terminate the connection if any of the
414          :     /// shard_timelines gets cancelled. But really, we could spend more effort
415 : /// and simply remove the cancelled timeline from the `shard_timelines`, thereby
416 : /// dropping the guard.
417 : ///
418 : /// NB: keep in sync with [`Self::is_connection_cancelled`]
419 0 : async fn await_connection_cancelled(&self) {
420 0 : // A short wait before we expend the cycles to walk our timeline map. This avoids incurring
421 0 : // that cost every time we check for cancellation.
422 0 : tokio::time::sleep(Duration::from_millis(10)).await;
423 :
424 : // This function is never called concurrently with code that adds timelines to shard_timelines,
425 : // which is enforced by the borrow checker (the future returned by this function carries the
426 : // immutable &self). So it's fine to evaluate shard_timelines after the sleep, we don't risk
427 : // missing any inserts to the map.
428 :
429 0 : let mut cancellation_sources = Vec::with_capacity(1 + self.shard_timelines.len());
430 0 : use futures::future::Either;
431 0 : cancellation_sources.push(Either::Left(task_mgr::shutdown_watcher()));
432 0 : cancellation_sources.extend(
433 0 : self.shard_timelines
434 0 : .values()
435 0 : .map(|ht| Either::Right(ht.timeline.cancel.cancelled())),
436 0 : );
437 0 : FuturesUnordered::from_iter(cancellation_sources)
438 0 : .next()
439 0 : .await;
440 0 : }
441 :
442 : /// Checking variant of [`Self::await_connection_cancelled`].
443 0 : fn is_connection_cancelled(&self) -> bool {
444 0 : task_mgr::is_shutdown_requested()
445 0 : || self
446 0 : .shard_timelines
447 0 : .values()
448 0 : .any(|ht| ht.timeline.cancel.is_cancelled() || ht.timeline.is_stopping())
449 0 : }
450 :
451          :     /// This function always respects cancellation of any timeline in [`Self::shard_timelines`]. Pass in
452 : /// a cancellation token at the next scope up (such as a tenant cancellation token) to ensure we respect
453 : /// cancellation if there aren't any timelines in the cache.
454 : ///
455          :     /// If calling from a function that doesn't use the [`Self::shard_timelines`] cache, then pass in the
456 : /// timeline cancellation token.
457 0 : async fn flush_cancellable<IO>(
458 0 : &self,
459 0 : pgb: &mut PostgresBackend<IO>,
460 0 : cancel: &CancellationToken,
461 0 : ) -> Result<(), QueryError>
462 0 : where
463 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
464 0 : {
465 : tokio::select!(
466 : flush_r = pgb.flush() => {
467 : Ok(flush_r?)
468 : },
469 : _ = self.await_connection_cancelled() => {
470 : Err(QueryError::Shutdown)
471 : }
472 : _ = cancel.cancelled() => {
473 : Err(QueryError::Shutdown)
474 : }
475 : )
476 0 : }
477 :
478 0 : fn copyin_stream<'a, IO>(
479 0 : &'a self,
480 0 : pgb: &'a mut PostgresBackend<IO>,
481 0 : cancel: &'a CancellationToken,
482 0 : ) -> impl Stream<Item = io::Result<Bytes>> + 'a
483 0 : where
484 0 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
485 0 : {
486 : async_stream::try_stream! {
487 : loop {
488 : let msg = tokio::select! {
489 : biased;
490 :
491 : _ = cancel.cancelled() => {
492 : // We were requested to shut down.
493 : let msg = "pageserver is shutting down";
494 : let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, None));
495 : Err(QueryError::Shutdown)
496 : }
497 :
498 : msg = pgb.read_message() => { msg.map_err(QueryError::from)}
499 : };
500 :
501 : match msg {
502 : Ok(Some(message)) => {
503 : let copy_data_bytes = match message {
504 : FeMessage::CopyData(bytes) => bytes,
505 : FeMessage::CopyDone => { break },
506 : FeMessage::Sync => continue,
507 : FeMessage::Terminate => {
508 : let msg = "client terminated connection with Terminate message during COPY";
509 : let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
510 : // error can't happen here, ErrorResponse serialization should be always ok
511 0 : pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
512 : Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
513 : break;
514 : }
515 : m => {
516 : let msg = format!("unexpected message {m:?}");
517 : // error can't happen here, ErrorResponse serialization should be always ok
518 0 : pgb.write_message_noflush(&BeMessage::ErrorResponse(&msg, None)).map_err(|e| e.into_io_error())?;
519 : Err(io::Error::new(io::ErrorKind::Other, msg))?;
520 : break;
521 : }
522 : };
523 :
524 : yield copy_data_bytes;
525 : }
526 : Ok(None) => {
527 : let msg = "client closed connection during COPY";
528 : let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
529 : // error can't happen here, ErrorResponse serialization should be always ok
530 0 : pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
531 0 : self.flush_cancellable(pgb, cancel).await.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
532 : Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
533 : }
534 : Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
535 : Err(io_error)?;
536 : }
537 : Err(other) => {
538 : Err(io::Error::new(io::ErrorKind::Other, other.to_string()))?;
539 : }
540 : };
541 : }
542 : }
543 0 : }
544 :
545 0 : #[instrument(skip_all)]
546 : async fn handle_pagerequests<IO>(
547 : &mut self,
548 : pgb: &mut PostgresBackend<IO>,
549 : tenant_id: TenantId,
550 : timeline_id: TimelineId,
551 : protocol_version: PagestreamProtocolVersion,
552 : ctx: RequestContext,
553 : ) -> Result<(), QueryError>
554 : where
555 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
556 : {
557 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
558 :
559 : let tenant = self
560 : .get_active_tenant_with_timeout(tenant_id, ShardSelector::First, ACTIVE_TENANT_TIMEOUT)
561 : .await?;
562 :
563 : // Make request tracer if needed
564 : let mut tracer = if tenant.get_trace_read_requests() {
565 : let connection_id = ConnectionId::generate();
566 : let path =
567 : tenant
568 : .conf
569 : .trace_path(&tenant.tenant_shard_id(), &timeline_id, &connection_id);
570 : Some(Tracer::new(path))
571 : } else {
572 : None
573 : };
574 :
575 : // switch client to COPYBOTH
576 : pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
577 : self.flush_cancellable(pgb, &tenant.cancel).await?;
578 :
579 : loop {
580 : let msg = tokio::select! {
581 : biased;
582 :
583 : _ = self.await_connection_cancelled() => {
584 : // We were requested to shut down.
585 : info!("shutdown request received in page handler");
586 : return Err(QueryError::Shutdown)
587 : }
588 :
589 : msg = pgb.read_message() => { msg }
590 : };
591 :
592 : let copy_data_bytes = match msg? {
593 : Some(FeMessage::CopyData(bytes)) => bytes,
594 : Some(FeMessage::Terminate) => break,
595 : Some(m) => {
596 : return Err(QueryError::Other(anyhow::anyhow!(
597 : "unexpected message: {m:?} during COPY"
598 : )));
599 : }
600 : None => break, // client disconnected
601 : };
602 :
603 : trace!("query: {copy_data_bytes:?}");
604 : fail::fail_point!("ps::handle-pagerequest-message");
605 :
606 : // Trace request if needed
607 : if let Some(t) = tracer.as_mut() {
608          :                 t.trace(&copy_data_bytes)
609 : }
610 :
611 : let neon_fe_msg =
612 : PagestreamFeMessage::parse(&mut copy_data_bytes.reader(), protocol_version)?;
613 :
614 : // TODO: We could create a new per-request context here, with unique ID.
615 : // Currently we use the same per-timeline context for all requests
616 :
617 : let (response, span) = match neon_fe_msg {
618 : PagestreamFeMessage::Exists(req) => {
619 : fail::fail_point!("ps::handle-pagerequest-message::exists");
620 : let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn);
621 : (
622 : self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx)
623 : .instrument(span.clone())
624 : .await,
625 : span,
626 : )
627 : }
628 : PagestreamFeMessage::Nblocks(req) => {
629 : fail::fail_point!("ps::handle-pagerequest-message::nblocks");
630 : let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn);
631 : (
632 : self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx)
633 : .instrument(span.clone())
634 : .await,
635 : span,
636 : )
637 : }
638 : PagestreamFeMessage::GetPage(req) => {
639 : fail::fail_point!("ps::handle-pagerequest-message::getpage");
640 : // shard_id is filled in by the handler
641 : let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.request_lsn);
642 : (
643 : self.handle_get_page_at_lsn_request(tenant_id, timeline_id, &req, &ctx)
644 : .instrument(span.clone())
645 : .await,
646 : span,
647 : )
648 : }
649 : PagestreamFeMessage::DbSize(req) => {
650 : fail::fail_point!("ps::handle-pagerequest-message::dbsize");
651 : let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn);
652 : (
653 : self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx)
654 : .instrument(span.clone())
655 : .await,
656 : span,
657 : )
658 : }
659 : PagestreamFeMessage::GetSlruSegment(req) => {
660 : fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
661 : let span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn);
662 : (
663 : self.handle_get_slru_segment_request(tenant_id, timeline_id, &req, &ctx)
664 : .instrument(span.clone())
665 : .await,
666 : span,
667 : )
668 : }
669 : };
670 :
671 : match response {
672 : Err(PageStreamError::Shutdown) => {
673 : // If we fail to fulfil a request during shutdown, which may be _because_ of
674 : // shutdown, then do not send the error to the client. Instead just drop the
675 : // connection.
676 0 : span.in_scope(|| info!("dropping connection due to shutdown"));
677 : return Err(QueryError::Shutdown);
678 : }
679 : Err(PageStreamError::Reconnect(reason)) => {
680 0 : span.in_scope(|| info!("handler requested reconnect: {reason}"));
681 : return Err(QueryError::Reconnect);
682 : }
683 : Err(e) if self.is_connection_cancelled() => {
684          :                     // This branch accommodates code within request handlers that returns an anyhow::Error instead of a clean
685 : // shutdown error, this may be buried inside a PageReconstructError::Other for example.
686 : //
687 : // Requests may fail as soon as we are Stopping, even if the Timeline's cancellation token wasn't fired yet,
688 : // because wait_lsn etc will drop out
689 : // is_stopping(): [`Timeline::flush_and_shutdown`] has entered
690 : // is_canceled(): [`Timeline::shutdown`]` has entered
691 0 : span.in_scope(|| info!("dropped error response during shutdown: {e:#}"));
692 : return Err(QueryError::Shutdown);
693 : }
694 : r => {
695 0 : let response_msg = r.unwrap_or_else(|e| {
696        0 :                         // print all the details to the log with {:#}, but for the client the
697 0 : // error message is enough. Do not log if shutting down, as the anyhow::Error
698 0 : // here includes cancellation which is not an error.
699 0 : let full = utils::error::report_compact_sources(&e);
700 0 : span.in_scope(|| {
701 0 : error!("error reading relation or page version: {full:#}")
702 0 : });
703 0 : PagestreamBeMessage::Error(PagestreamErrorResponse {
704 0 : message: e.to_string(),
705 0 : })
706 0 : });
707 :
708 : pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
709 : self.flush_cancellable(pgb, &tenant.cancel).await?;
710 : }
711 : }
712 : }
713 : Ok(())
714 : }
715 :
716 : #[allow(clippy::too_many_arguments)]
717 0 : #[instrument(skip_all, fields(%base_lsn, end_lsn=%_end_lsn, %pg_version))]
718 : async fn handle_import_basebackup<IO>(
719 : &self,
720 : pgb: &mut PostgresBackend<IO>,
721 : tenant_id: TenantId,
722 : timeline_id: TimelineId,
723 : base_lsn: Lsn,
724 : _end_lsn: Lsn,
725 : pg_version: u32,
726 : ctx: RequestContext,
727 : ) -> Result<(), QueryError>
728 : where
729 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
730 : {
731 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
732 :
733 : // Create empty timeline
734 : info!("creating new timeline");
735 : let tenant = self
736 : .get_active_tenant_with_timeout(tenant_id, ShardSelector::Zero, ACTIVE_TENANT_TIMEOUT)
737 : .await?;
738 : let timeline = tenant
739 : .create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
740 : .await?;
741 :
742 : // TODO mark timeline as not ready until it reaches end_lsn.
743 : // We might have some wal to import as well, and we should prevent compute
744 : // from connecting before that and writing conflicting wal.
745 : //
746 : // This is not relevant for pageserver->pageserver migrations, since there's
747 : // no wal to import. But should be fixed if we want to import from postgres.
748 :
749 : // TODO leave clean state on error. For now you can use detach to clean
750 : // up broken state from a failed import.
751 :
752 : // Import basebackup provided via CopyData
753 : info!("importing basebackup");
754 : pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
755 : self.flush_cancellable(pgb, &tenant.cancel).await?;
756 :
757 : let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb, &tenant.cancel)));
758 : timeline
759 : .import_basebackup_from_tar(
760 : tenant.clone(),
761 : &mut copyin_reader,
762 : base_lsn,
763 : self.broker_client.clone(),
764 : &ctx,
765 : )
766 : .await?;
767 :
768 : // Read the end of the tar archive.
769 : read_tar_eof(copyin_reader).await?;
770 :
771 : // TODO check checksum
772 : // Meanwhile you can verify client-side by taking fullbackup
773 : // and checking that it matches in size with what was imported.
774 : // It wouldn't work if base came from vanilla postgres though,
775 : // since we discard some log files.
776 :
777 : info!("done");
778 : Ok(())
779 : }
780 :
781 0 : #[instrument(skip_all, fields(shard_id, %start_lsn, %end_lsn))]
782 : async fn handle_import_wal<IO>(
783 : &self,
784 : pgb: &mut PostgresBackend<IO>,
785 : tenant_id: TenantId,
786 : timeline_id: TimelineId,
787 : start_lsn: Lsn,
788 : end_lsn: Lsn,
789 : ctx: RequestContext,
790 : ) -> Result<(), QueryError>
791 : where
792 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
793 : {
794 : let timeline = self
795 : .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero)
796 : .await?;
797 : let last_record_lsn = timeline.get_last_record_lsn();
798 : if last_record_lsn != start_lsn {
799 : return Err(QueryError::Other(
800 : anyhow::anyhow!("Cannot import WAL from Lsn {start_lsn} because timeline does not start from the same lsn: {last_record_lsn}"))
801 : );
802 : }
803 :
804 : // TODO leave clean state on error. For now you can use detach to clean
805 : // up broken state from a failed import.
806 :
807 : // Import wal provided via CopyData
808 : info!("importing wal");
809 : pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
810 : self.flush_cancellable(pgb, &timeline.cancel).await?;
811 : let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb, &timeline.cancel)));
812 : import_wal_from_tar(&timeline, &mut copyin_reader, start_lsn, end_lsn, &ctx).await?;
813 : info!("wal import complete");
814 :
815 : // Read the end of the tar archive.
816 : read_tar_eof(copyin_reader).await?;
817 :
818 : // TODO Does it make sense to overshoot?
819 : if timeline.get_last_record_lsn() < end_lsn {
820 : return Err(QueryError::Other(
821          :                 anyhow::anyhow!("WAL import did not reach the requested end LSN {end_lsn}: timeline is only at {}", timeline.get_last_record_lsn()))
822 : );
823 : }
824 :
825 : // Flush data to disk, then upload to s3. No need for a forced checkpoint.
826 : // We only want to persist the data, and it doesn't matter if it's in the
827 : // shape of deltas or images.
828 : info!("flushing layers");
829 0 : timeline.freeze_and_flush().await.map_err(|e| match e {
830 0 : FlushLayerError::Cancelled => QueryError::Shutdown,
831 0 : other => QueryError::Other(other.into()),
832 0 : })?;
833 :
834 : info!("done");
835 : Ok(())
836 : }
837 :
838 : /// Helper function to handle the LSN from client request.
839 : ///
840 : /// Each GetPage (and Exists and Nblocks) request includes information about
841 : /// which version of the page is being requested. The primary compute node
842 : /// will always request the latest page version, by setting 'request_lsn' to
843 : /// the last inserted or flushed WAL position, while a standby will request
844 : /// a version at the LSN that it's currently caught up to.
845 : ///
846 : /// In either case, if the page server hasn't received the WAL up to the
847 : /// requested LSN yet, we will wait for it to arrive. The return value is
848 : /// the LSN that should be used to look up the page versions.
849 : ///
850 : /// In addition to the request LSN, each request carries another LSN,
851 : /// 'not_modified_since', which is a hint to the pageserver that the client
852 : /// knows that the page has not been modified between 'not_modified_since'
853 : /// and the request LSN. This allows skipping the wait, as long as the WAL
854 : /// up to 'not_modified_since' has arrived. If the client doesn't have any
855 : /// information about when the page was modified, it will use
856          :     /// not_modified_since == request_lsn. If the client lies and sends a too-low
857          :     /// not_modified_since such that there are in fact later page versions, the
858 : /// behavior is undefined: the pageserver may return any of the page versions
859 : /// or an error.
860 0 : async fn wait_or_get_last_lsn(
861 0 : timeline: &Timeline,
862 0 : request_lsn: Lsn,
863 0 : not_modified_since: Lsn,
864 0 : latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
865 0 : ctx: &RequestContext,
866 0 : ) -> Result<Lsn, PageStreamError> {
867 0 : let last_record_lsn = timeline.get_last_record_lsn();
868 0 :
869 0 : // Sanity check the request
870 0 : if request_lsn < not_modified_since {
871 0 : return Err(PageStreamError::BadRequest(
872 0 : format!(
873 0 : "invalid request with request LSN {} and not_modified_since {}",
874 0 : request_lsn, not_modified_since,
875 0 : )
876 0 : .into(),
877 0 : ));
878 0 : }
879 0 :
880 0 : if request_lsn < **latest_gc_cutoff_lsn {
881 : // Check explicitly for INVALID just to get a less scary error message if the
882 : // request is obviously bogus
883 0 : return Err(if request_lsn == Lsn::INVALID {
884 0 : PageStreamError::BadRequest("invalid LSN(0) in request".into())
885 : } else {
886 0 : PageStreamError::BadRequest(format!(
887 0 : "tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
888 0 : request_lsn, **latest_gc_cutoff_lsn
889 0 : ).into())
890 : });
891 0 : }
892 0 :
893 0 : // Wait for WAL up to 'not_modified_since' to arrive, if necessary
894 0 : if not_modified_since > last_record_lsn {
895 0 : timeline
896 0 : .wait_lsn(
897 0 : not_modified_since,
898 0 : crate::tenant::timeline::WaitLsnWaiter::PageService,
899 0 : ctx,
900 0 : )
901 0 : .await?;
902 : // Since we waited for 'not_modified_since' to arrive, that is now the last
903 : // record LSN. (Or close enough for our purposes; the last-record LSN can
904 : // advance immediately after we return anyway)
905 0 : Ok(not_modified_since)
906 : } else {
907 : // It might be better to use max(not_modified_since, latest_gc_cutoff_lsn)
908 : // here instead. That would give the same result, since we know that there
909 : // haven't been any modifications since 'not_modified_since'. Using an older
910 : // LSN might be faster, because that could allow skipping recent layers when
911 : // finding the page. However, we have historically used 'last_record_lsn', so
912 : // stick to that for now.
913 0 : Ok(std::cmp::min(last_record_lsn, request_lsn))
914 : }
915 0 : }
916 :
917 0 : #[instrument(skip_all, fields(shard_id, %lsn))]
918 : async fn handle_make_lsn_lease<IO>(
919 : &self,
920 : pgb: &mut PostgresBackend<IO>,
921 : tenant_shard_id: TenantShardId,
922 : timeline_id: TimelineId,
923 : lsn: Lsn,
924 : ctx: &RequestContext,
925 : ) -> Result<(), QueryError>
926 : where
927 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
928 : {
929 : let shard_selector = ShardSelector::Known(tenant_shard_id.to_index());
930 : let timeline = self
931 : .get_active_tenant_timeline(tenant_shard_id.tenant_id, timeline_id, shard_selector)
932 : .await?;
933 : let lease = timeline.make_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx)?;
934 : let valid_until = lease
935 : .valid_until
936 : .duration_since(SystemTime::UNIX_EPOCH)
937 0 : .map_err(|e| QueryError::Other(e.into()))?;
938 :
939 : pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col(
940 : b"valid_until",
941 : )]))?
942 : .write_message_noflush(&BeMessage::DataRow(&[Some(
943 : &valid_until.as_millis().to_be_bytes(),
944 : )]))?
945 : .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
946 :
947 : Ok(())
948 : }
949 :
950 0 : #[instrument(skip_all, fields(shard_id))]
951 : async fn handle_get_rel_exists_request(
952 : &mut self,
953 : tenant_id: TenantId,
954 : timeline_id: TimelineId,
955 : req: &PagestreamExistsRequest,
956 : ctx: &RequestContext,
957 : ) -> Result<PagestreamBeMessage, PageStreamError> {
958 : let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
959 : let _timer = timeline
960 : .query_metrics
961 : .start_timer(metrics::SmgrQueryType::GetRelExists, ctx);
962 :
963 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
964 : let lsn = Self::wait_or_get_last_lsn(
965 : timeline,
966 : req.request_lsn,
967 : req.not_modified_since,
968 : &latest_gc_cutoff_lsn,
969 : ctx,
970 : )
971 : .await?;
972 :
973 : let exists = timeline
974 : .get_rel_exists(req.rel, Version::Lsn(lsn), ctx)
975 : .await?;
976 :
977 : Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
978 : exists,
979 : }))
980 : }
981 :
982 0 : #[instrument(skip_all, fields(shard_id))]
983 : async fn handle_get_nblocks_request(
984 : &mut self,
985 : tenant_id: TenantId,
986 : timeline_id: TimelineId,
987 : req: &PagestreamNblocksRequest,
988 : ctx: &RequestContext,
989 : ) -> Result<PagestreamBeMessage, PageStreamError> {
990 : let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
991 :
992 : let _timer = timeline
993 : .query_metrics
994 : .start_timer(metrics::SmgrQueryType::GetRelSize, ctx);
995 :
996 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
997 : let lsn = Self::wait_or_get_last_lsn(
998 : timeline,
999 : req.request_lsn,
1000 : req.not_modified_since,
1001 : &latest_gc_cutoff_lsn,
1002 : ctx,
1003 : )
1004 : .await?;
1005 :
1006 : let n_blocks = timeline
1007 : .get_rel_size(req.rel, Version::Lsn(lsn), ctx)
1008 : .await?;
1009 :
1010 : Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
1011 : n_blocks,
1012 : }))
1013 : }
1014 :
1015 0 : #[instrument(skip_all, fields(shard_id))]
1016 : async fn handle_db_size_request(
1017 : &mut self,
1018 : tenant_id: TenantId,
1019 : timeline_id: TimelineId,
1020 : req: &PagestreamDbSizeRequest,
1021 : ctx: &RequestContext,
1022 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1023 : let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
1024 :
1025 : let _timer = timeline
1026 : .query_metrics
1027 : .start_timer(metrics::SmgrQueryType::GetDbSize, ctx);
1028 :
1029 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1030 : let lsn = Self::wait_or_get_last_lsn(
1031 : timeline,
1032 : req.request_lsn,
1033 : req.not_modified_since,
1034 : &latest_gc_cutoff_lsn,
1035 : ctx,
1036 : )
1037 : .await?;
1038 :
1039 : let total_blocks = timeline
1040 : .get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, Version::Lsn(lsn), ctx)
1041 : .await?;
1042 : let db_size = total_blocks as i64 * BLCKSZ as i64;
1043 :
1044 : Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse {
1045 : db_size,
1046 : }))
1047 : }
1048 :
1049 : /// For most getpage requests, we will already have a Timeline to serve the request: this function
1050 : /// looks up such a Timeline synchronously and without touching any global state.
1051 0 : fn get_cached_timeline_for_page(
1052 0 : &mut self,
1053 0 : req: &PagestreamGetPageRequest,
1054 0 : ) -> Result<&Arc<Timeline>, Key> {
1055 0 : let key = if let Some((first_idx, first_timeline)) = self.shard_timelines.iter().next() {
1056 : // Fastest path: single sharded case
1057 0 : if first_idx.shard_count.count() == 1 {
1058 0 : return Ok(&first_timeline.timeline);
1059 0 : }
1060 0 :
1061 0 : let key = rel_block_to_key(req.rel, req.blkno);
1062 0 : let shard_num = first_timeline
1063 0 : .timeline
1064 0 : .get_shard_identity()
1065 0 : .get_shard_number(&key);
1066 0 :
1067 0 : // Fast path: matched the first timeline in our local handler map. This case is common if
1068 0 : // only one shard per tenant is attached to this pageserver.
1069 0 : if first_timeline.timeline.get_shard_identity().number == shard_num {
1070 0 : return Ok(&first_timeline.timeline);
1071 0 : }
1072 0 :
1073 0 : let shard_index = ShardIndex {
1074 0 : shard_number: shard_num,
1075 0 : shard_count: first_timeline.timeline.get_shard_identity().count,
1076 0 : };
1077 :
1078 : // Fast-ish path: timeline is in the connection handler's local cache
1079 0 : if let Some(found) = self.shard_timelines.get(&shard_index) {
1080 0 : return Ok(&found.timeline);
1081 0 : }
1082 0 :
1083 0 : key
1084 : } else {
1085 0 : rel_block_to_key(req.rel, req.blkno)
1086 : };
1087 :
1088 0 : Err(key)
1089 0 : }
1090 :
1091 : /// Having looked up the [`Timeline`] instance for a particular shard, cache it to enable
1092 : /// use in future requests without having to traverse [`crate::tenant::mgr::TenantManager`]
1093 : /// again.
1094 : ///
1095         :     /// Note that all the Timelines in this cache are for the same timeline_id: they differ
1096 : /// in which shard they belong to. When we serve a getpage@lsn request, we choose a shard
1097 : /// based on key.
1098 : ///
1099 : /// The typical size of this cache is 1, as we generally create shards to distribute work
1100         :     /// across pageservers, so we don't tend to have multiple shards for the same tenant on the
1101 : /// same pageserver.
1102 0 : fn cache_timeline(
1103 0 : &mut self,
1104 0 : timeline: Arc<Timeline>,
1105 0 : ) -> Result<&Arc<Timeline>, GetActiveTimelineError> {
1106 0 : let gate_guard = timeline
1107 0 : .gate
1108 0 : .enter()
1109 0 : .map_err(|_| GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled))?;
1110 :
1111 0 : let shard_index = timeline.tenant_shard_id.to_index();
1112 0 : let entry = self
1113 0 : .shard_timelines
1114 0 : .entry(shard_index)
1115 0 : .or_insert(HandlerTimeline {
1116 0 : timeline,
1117 0 : _guard: gate_guard,
1118 0 : });
1119 0 :
1120 0 : Ok(&entry.timeline)
1121 0 : }
1122 :
1123 : /// If [`Self::get_cached_timeline_for_page`] missed, then this function is used to populate the cache with
1124 : /// a Timeline to serve requests for this key, if such a Timeline is present on this pageserver. If no such
1125 : /// Timeline is found, then we will return an error (this indicates that the client is talking to the wrong node).
1126 0 : async fn load_timeline_for_page(
1127 0 : &mut self,
1128 0 : tenant_id: TenantId,
1129 0 : timeline_id: TimelineId,
1130 0 : key: Key,
1131 0 : ) -> anyhow::Result<&Arc<Timeline>, GetActiveTimelineError> {
1132 : // Slow path: we must call out to the TenantManager to find the timeline for this Key
1133 0 : let timeline = self
1134 0 : .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Page(key))
1135 0 : .await?;
1136 :
1137 0 : self.cache_timeline(timeline)
1138 0 : }
1139 :
1140 0 : async fn get_timeline_shard_zero(
1141 0 : &mut self,
1142 0 : tenant_id: TenantId,
1143 0 : timeline_id: TimelineId,
1144 0 : ) -> anyhow::Result<&Arc<Timeline>, GetActiveTimelineError> {
1145 : // This is a borrow-checker workaround: we can't return from inside of the `if let Some` because
1146 : // that would be an immutable-borrow-self return, whereas later in the function we will use a mutable
1147         :         // ref to self. So instead, we first build a bool, and then return while not borrowing self.
1148 0 : let have_cached = if let Some((idx, _tl)) = self.shard_timelines.iter().next() {
1149 0 : idx.shard_number == ShardNumber(0)
1150 : } else {
1151 0 : false
1152 : };
1153 :
1154 0 : if have_cached {
1155 0 : let entry = self.shard_timelines.iter().next().unwrap();
1156 0 : Ok(&entry.1.timeline)
1157 : } else {
1158 0 : let timeline = self
1159 0 : .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero)
1160 0 : .await?;
1161 0 : Ok(self.cache_timeline(timeline)?)
1162 : }
1163 0 : }
1164 :
1165 0 : #[instrument(skip_all, fields(shard_id))]
1166 : async fn handle_get_page_at_lsn_request(
1167 : &mut self,
1168 : tenant_id: TenantId,
1169 : timeline_id: TimelineId,
1170 : req: &PagestreamGetPageRequest,
1171 : ctx: &RequestContext,
1172 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1173 : let timeline = match self.get_cached_timeline_for_page(req) {
1174 : Ok(tl) => {
1175 : set_tracing_field_shard_id(tl);
1176 : tl
1177 : }
1178 : Err(key) => {
1179 : match self
1180 : .load_timeline_for_page(tenant_id, timeline_id, key)
1181 : .await
1182 : {
1183 : Ok(t) => t,
1184 : Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
1185 : // We already know this tenant exists in general, because we resolved it at
1186         :                         // the start of the connection. Getting a NotFound here indicates that the shard containing
1187 : // the requested page is not present on this node: the client's knowledge of shard->pageserver
1188 : // mapping is out of date.
1189 : //
1190         :                         // Closing the connection by returning `::Reconnect` has the side effect of rate-limiting the above message, via
1191 : // client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
1192 : // and talk to a different pageserver.
1193 : return Err(PageStreamError::Reconnect(
1194 : "getpage@lsn request routed to wrong shard".into(),
1195 : ));
1196 : }
1197 : Err(e) => return Err(e.into()),
1198 : }
1199 : }
1200 : };
1201 :
1202 : let _timer = timeline
1203 : .query_metrics
1204 : .start_timer(metrics::SmgrQueryType::GetPageAtLsn, ctx);
1205 :
1206 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1207 : let lsn = Self::wait_or_get_last_lsn(
1208 : timeline,
1209 : req.request_lsn,
1210 : req.not_modified_since,
1211 : &latest_gc_cutoff_lsn,
1212 : ctx,
1213 : )
1214 : .await?;
1215 :
1216 : let page = timeline
1217 : .get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), ctx)
1218 : .await?;
1219 :
1220 : Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
1221 : page,
1222 : }))
1223 : }
1224 :
1225 0 : #[instrument(skip_all, fields(shard_id))]
1226 : async fn handle_get_slru_segment_request(
1227 : &mut self,
1228 : tenant_id: TenantId,
1229 : timeline_id: TimelineId,
1230 : req: &PagestreamGetSlruSegmentRequest,
1231 : ctx: &RequestContext,
1232 : ) -> Result<PagestreamBeMessage, PageStreamError> {
1233 : let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
1234 :
1235 : let _timer = timeline
1236 : .query_metrics
1237 : .start_timer(metrics::SmgrQueryType::GetSlruSegment, ctx);
1238 :
1239 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1240 : let lsn = Self::wait_or_get_last_lsn(
1241 : timeline,
1242 : req.request_lsn,
1243 : req.not_modified_since,
1244 : &latest_gc_cutoff_lsn,
1245 : ctx,
1246 : )
1247 : .await?;
1248 :
1249 : let kind = SlruKind::from_repr(req.kind)
1250 : .ok_or(PageStreamError::BadRequest("invalid SLRU kind".into()))?;
1251 : let segment = timeline.get_slru_segment(kind, req.segno, lsn, ctx).await?;
1252 :
1253 : Ok(PagestreamBeMessage::GetSlruSegment(
1254 : PagestreamGetSlruSegmentResponse { segment },
1255 : ))
1256 : }
1257 :
1258 : /// Note on "fullbackup":
1259 : /// Full basebackups should only be used for debugging purposes.
1260         :     /// Originally, they were introduced to enable breaking storage format changes,
1261 : /// but that is not applicable anymore.
1262 : #[allow(clippy::too_many_arguments)]
1263 0 : #[instrument(skip_all, fields(shard_id, ?lsn, ?prev_lsn, %full_backup))]
1264 : async fn handle_basebackup_request<IO>(
1265 : &mut self,
1266 : pgb: &mut PostgresBackend<IO>,
1267 : tenant_id: TenantId,
1268 : timeline_id: TimelineId,
1269 : lsn: Option<Lsn>,
1270 : prev_lsn: Option<Lsn>,
1271 : full_backup: bool,
1272 : gzip: bool,
1273 : ctx: &RequestContext,
1274 : ) -> Result<(), QueryError>
1275 : where
1276 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1277 : {
1278 0 : fn map_basebackup_error(err: BasebackupError) -> QueryError {
1279 0 : match err {
1280 0 : BasebackupError::Client(e) => QueryError::Disconnected(ConnectionError::Io(e)),
1281 0 : BasebackupError::Server(e) => QueryError::Other(e),
1282 : }
1283 0 : }
1284 :
1285 : let started = std::time::Instant::now();
1286 :
1287 : // check that the timeline exists
1288 : let timeline = self
1289 : .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero)
1290 : .await?;
1291 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
1292 : if let Some(lsn) = lsn {
1293 : // Backup was requested at a particular LSN. Wait for it to arrive.
1294 : info!("waiting for {}", lsn);
1295 : timeline
1296 : .wait_lsn(
1297 : lsn,
1298 : crate::tenant::timeline::WaitLsnWaiter::PageService,
1299 : ctx,
1300 : )
1301 : .await?;
1302 : timeline
1303 : .check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
1304 : .context("invalid basebackup lsn")?;
1305 : }
1306 :
1307 : let lsn_awaited_after = started.elapsed();
1308 :
1309 : // switch client to COPYOUT
1310 : pgb.write_message_noflush(&BeMessage::CopyOutResponse)
1311 : .map_err(QueryError::Disconnected)?;
1312 : self.flush_cancellable(pgb, &timeline.cancel).await?;
1313 :
1314         :         // Send a basebackup tarball of the timeline's contents. Compress unless this is a
1315         :         // fullbackup. TODO: compress in that case too (tests need to be updated)
1316 : if full_backup {
1317 : let mut writer = pgb.copyout_writer();
1318 : basebackup::send_basebackup_tarball(
1319 : &mut writer,
1320 : &timeline,
1321 : lsn,
1322 : prev_lsn,
1323 : full_backup,
1324 : ctx,
1325 : )
1326 : .await
1327 : .map_err(map_basebackup_error)?;
1328 : } else {
1329 : let mut writer = pgb.copyout_writer();
1330 : if gzip {
1331 : let mut encoder = GzipEncoder::with_quality(
1332 : writer,
1333 : // NOTE using fast compression because it's on the critical path
1334 : // for compute startup. For an empty database, we get
1335 : // <100KB with this method. The Level::Best compression method
1336 : // gives us <20KB, but maybe we should add basebackup caching
1337 : // on compute shutdown first.
1338 : async_compression::Level::Fastest,
1339 : );
1340 : basebackup::send_basebackup_tarball(
1341 : &mut encoder,
1342 : &timeline,
1343 : lsn,
1344 : prev_lsn,
1345 : full_backup,
1346 : ctx,
1347 : )
1348 : .await
1349 : .map_err(map_basebackup_error)?;
1350 : // shutdown the encoder to ensure the gzip footer is written
1351 : encoder
1352 : .shutdown()
1353 : .await
1354 0 : .map_err(|e| QueryError::Disconnected(ConnectionError::Io(e)))?;
1355 : } else {
1356 : basebackup::send_basebackup_tarball(
1357 : &mut writer,
1358 : &timeline,
1359 : lsn,
1360 : prev_lsn,
1361 : full_backup,
1362 : ctx,
1363 : )
1364 : .await
1365 : .map_err(map_basebackup_error)?;
1366 : }
1367 : }
1368 :
1369 : pgb.write_message_noflush(&BeMessage::CopyDone)
1370 : .map_err(QueryError::Disconnected)?;
1371 : self.flush_cancellable(pgb, &timeline.cancel).await?;
1372 :
1373 : let basebackup_after = started
1374 : .elapsed()
1375 : .checked_sub(lsn_awaited_after)
1376 : .unwrap_or(Duration::ZERO);
1377 :
1378 : info!(
1379 : lsn_await_millis = lsn_awaited_after.as_millis(),
1380 : basebackup_millis = basebackup_after.as_millis(),
1381 : "basebackup complete"
1382 : );
1383 :
1384 : Ok(())
1385 : }
1386 :
1387         :     // When accessing the management API, supply None as an argument.
1388         :     // When authorizing access to a tenant, pass the corresponding tenant id.
1389 0 : fn check_permission(&self, tenant_id: Option<TenantId>) -> Result<(), QueryError> {
1390 0 : if self.auth.is_none() {
1391 : // auth is set to Trust, nothing to check so just return ok
1392 0 : return Ok(());
1393 0 : }
1394       0 :         // auth is Some, as just checked above. When auth is Some,
1395       0 :         // claims are always present because of checks during connection init,
1396       0 :         // so this expect won't trigger.
1397 0 : let claims = self
1398 0 : .claims
1399 0 : .as_ref()
1400 0 : .expect("claims presence already checked");
1401 0 : check_permission(claims, tenant_id).map_err(|e| QueryError::Unauthorized(e.0))
1402 0 : }
1403 :
1404 : /// Shorthand for getting a reference to a Timeline of an Active tenant.
1405 0 : async fn get_active_tenant_timeline(
1406 0 : &self,
1407 0 : tenant_id: TenantId,
1408 0 : timeline_id: TimelineId,
1409 0 : selector: ShardSelector,
1410 0 : ) -> Result<Arc<Timeline>, GetActiveTimelineError> {
1411 0 : let tenant = self
1412 0 : .get_active_tenant_with_timeout(tenant_id, selector, ACTIVE_TENANT_TIMEOUT)
1413 0 : .await
1414 0 : .map_err(GetActiveTimelineError::Tenant)?;
1415 0 : let timeline = tenant.get_timeline(timeline_id, true)?;
1416 0 : set_tracing_field_shard_id(&timeline);
1417 0 : Ok(timeline)
1418 0 : }
1419 :
1420 : /// Get a shard's [`Tenant`] in its active state, if present. If we don't find the shard and some
1421 : /// slots for this tenant are `InProgress` then we will wait.
1422 : /// If we find the [`Tenant`] and it's not yet in state [`TenantState::Active`], we will wait.
1423 : ///
1424 : /// `timeout` is used as a total timeout for the whole wait operation.
1425 0 : async fn get_active_tenant_with_timeout(
1426 0 : &self,
1427 0 : tenant_id: TenantId,
1428 0 : shard_selector: ShardSelector,
1429 0 : timeout: Duration,
1430 0 : ) -> Result<Arc<Tenant>, GetActiveTenantError> {
1431 0 : let wait_start = Instant::now();
1432 0 : let deadline = wait_start + timeout;
1433 :
1434 : // Resolve TenantId to TenantShardId. This is usually a quick one-shot thing, the loop is
1435 : // for handling the rare case that the slot we're accessing is InProgress.
1436 0 : let tenant_shard = loop {
1437 0 : let resolved = self
1438 0 : .tenant_manager
1439 0 : .resolve_attached_shard(&tenant_id, shard_selector);
1440 0 : match resolved {
1441 0 : ShardResolveResult::Found(tenant_shard) => break tenant_shard,
1442 : ShardResolveResult::NotFound => {
1443 0 : return Err(GetActiveTenantError::NotFound(GetTenantError::NotFound(
1444 0 : tenant_id,
1445 0 : )));
1446 : }
1447 0 : ShardResolveResult::InProgress(barrier) => {
1448 : // We can't authoritatively answer right now: wait for InProgress state
1449 : // to end, then try again
1450 : tokio::select! {
1451 : _ = self.await_connection_cancelled() => {
1452 : return Err(GetActiveTenantError::Cancelled)
1453 : },
1454 : _ = barrier.wait() => {
1455 : // The barrier completed: proceed around the loop to try looking up again
1456 : },
1457 : _ = tokio::time::sleep(deadline.duration_since(Instant::now())) => {
1458 : return Err(GetActiveTenantError::WaitForActiveTimeout {
1459 : latest_state: None,
1460 : wait_time: timeout,
1461 : });
1462 : }
1463 : }
1464 : }
1465 : };
1466 : };
1467 :
1468 0 : tracing::debug!("Waiting for tenant to enter active state...");
1469 0 : tenant_shard
1470 0 : .wait_to_become_active(deadline.duration_since(Instant::now()))
1471 0 : .await?;
1472 0 : Ok(tenant_shard)
1473 0 : }
1474 : }
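
// Editor's illustrative sketch (not part of the original file): the LSN selection
// performed by `PageServerHandler::wait_or_get_last_lsn` above, reduced to a pure
// function over plain `Lsn` values so the decision can be shown with concrete
// numbers. `effective_lsn` is a hypothetical helper; the case where the real code
// waits for WAL is represented simply by returning `not_modified_since`.
#[cfg(test)]
mod effective_lsn_sketch {
    use super::*;

    fn effective_lsn(request_lsn: Lsn, not_modified_since: Lsn, last_record_lsn: Lsn) -> Lsn {
        if not_modified_since > last_record_lsn {
            // Real code: wait for WAL up to not_modified_since, then read at that LSN.
            not_modified_since
        } else {
            std::cmp::min(last_record_lsn, request_lsn)
        }
    }

    #[test]
    fn primary_and_standby_examples() {
        let last_record = Lsn(0x25);
        // Primary: request_lsn is a recent flush LSN ahead of the pageserver's
        // last_record_lsn, but the page was last modified long ago: no wait,
        // read at the last record LSN.
        assert_eq!(effective_lsn(Lsn(0x30), Lsn(0x10), last_record), last_record);
        // Standby caught up only to 0x20: no wait needed, read at 0x20.
        assert_eq!(effective_lsn(Lsn(0x20), Lsn(0x20), last_record), Lsn(0x20));
        // Page modified at 0x28, which the pageserver hasn't ingested yet:
        // the real code waits for WAL up to 0x28 and then reads there.
        assert_eq!(effective_lsn(Lsn(0x30), Lsn(0x28), last_record), Lsn(0x28));
    }
}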
1475 :
1476 : #[async_trait::async_trait]
1477 : impl<IO> postgres_backend::Handler<IO> for PageServerHandler
1478 : where
1479 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
1480 : {
1481 0 : fn check_auth_jwt(
1482 0 : &mut self,
1483 0 : _pgb: &mut PostgresBackend<IO>,
1484 0 : jwt_response: &[u8],
1485 0 : ) -> Result<(), QueryError> {
1486         :         // this unwrap is never triggered, because check_auth_jwt is only called when auth_type is NeonJWT,
1487 : // which requires auth to be present
1488 0 : let data = self
1489 0 : .auth
1490 0 : .as_ref()
1491 0 : .unwrap()
1492 0 : .decode(str::from_utf8(jwt_response).context("jwt response is not UTF-8")?)
1493 0 : .map_err(|e| QueryError::Unauthorized(e.0))?;
1494 :
1495 0 : if matches!(data.claims.scope, Scope::Tenant) && data.claims.tenant_id.is_none() {
1496 0 : return Err(QueryError::Unauthorized(
1497 0 : "jwt token scope is Tenant, but tenant id is missing".into(),
1498 0 : ));
1499 0 : }
1500 0 :
1501 0 : debug!(
1502 0 : "jwt scope check succeeded for scope: {:#?} by tenant id: {:?}",
1503 : data.claims.scope, data.claims.tenant_id,
1504 : );
1505 :
1506 0 : self.claims = Some(data.claims);
1507 0 : Ok(())
1508 0 : }
1509 :
1510 0 : fn startup(
1511 0 : &mut self,
1512 0 : _pgb: &mut PostgresBackend<IO>,
1513 0 : _sm: &FeStartupPacket,
1514 0 : ) -> Result<(), QueryError> {
1515 : fail::fail_point!("ps::connection-start::startup-packet");
1516 0 : Ok(())
1517 0 : }
1518 :
1519 0 : #[instrument(skip_all, fields(tenant_id, timeline_id))]
1520 : async fn process_query(
1521 : &mut self,
1522 : pgb: &mut PostgresBackend<IO>,
1523 : query_string: &str,
1524 0 : ) -> Result<(), QueryError> {
1525 0 : fail::fail_point!("simulated-bad-compute-connection", |_| {
1526 0 : info!("Hit failpoint for bad connection");
1527 0 : Err(QueryError::SimulatedConnectionError)
1528 0 : });
1529 :
1530 : fail::fail_point!("ps::connection-start::process-query");
1531 :
1532 0 : let ctx = self.connection_ctx.attached_child();
1533 0 : debug!("process query {query_string:?}");
1534 0 : let parts = query_string.split_whitespace().collect::<Vec<_>>();
1535 0 : if let Some(params) = parts.strip_prefix(&["pagestream_v2"]) {
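// Query shape handled by this branch (parameters are positional; values illustrative):
//   pagestream_v2 <tenant_id> <timeline_id>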
1536 0 : if params.len() != 2 {
1537 0 : return Err(QueryError::Other(anyhow::anyhow!(
1538 0 : "invalid param number for pagestream command"
1539 0 : )));
1540 0 : }
1541 0 : let tenant_id = TenantId::from_str(params[0])
1542 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1543 0 : let timeline_id = TimelineId::from_str(params[1])
1544 0 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1545 :
1546 0 : tracing::Span::current()
1547 0 : .record("tenant_id", field::display(tenant_id))
1548 0 : .record("timeline_id", field::display(timeline_id));
1549 0 :
1550 0 : self.check_permission(Some(tenant_id))?;
1551 :
1552 0 : COMPUTE_COMMANDS_COUNTERS
1553 0 : .for_command(ComputeCommandKind::PageStreamV2)
1554 0 : .inc();
1555 0 :
1556 0 : self.handle_pagerequests(
1557 0 : pgb,
1558 0 : tenant_id,
1559 0 : timeline_id,
1560 0 : PagestreamProtocolVersion::V2,
1561 0 : ctx,
1562 0 : )
1563 0 : .await?;
1564 0 : } else if let Some(params) = parts.strip_prefix(&["pagestream"]) {
1565 0 : if params.len() != 2 {
1566 0 : return Err(QueryError::Other(anyhow::anyhow!(
1567 0 : "invalid param number for pagestream command"
1568 0 : )));
1569 0 : }
1570 0 : let tenant_id = TenantId::from_str(params[0])
1571 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1572 0 : let timeline_id = TimelineId::from_str(params[1])
1573 0 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1574 :
1575 0 : tracing::Span::current()
1576 0 : .record("tenant_id", field::display(tenant_id))
1577 0 : .record("timeline_id", field::display(timeline_id));
1578 0 :
1579 0 : self.check_permission(Some(tenant_id))?;
1580 :
1581 0 : COMPUTE_COMMANDS_COUNTERS
1582 0 : .for_command(ComputeCommandKind::PageStream)
1583 0 : .inc();
1584 0 :
1585 0 : self.handle_pagerequests(
1586 0 : pgb,
1587 0 : tenant_id,
1588 0 : timeline_id,
1589 0 : PagestreamProtocolVersion::V1,
1590 0 : ctx,
1591 0 : )
1592 0 : .await?;
1593 0 : } else if let Some(params) = parts.strip_prefix(&["basebackup"]) {
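// Query shape handled by this branch (optional parts in brackets; values illustrative):
//   basebackup <tenant_id> <timeline_id> [<lsn>] [--gzip]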
1594 0 : if params.len() < 2 {
1595 0 : return Err(QueryError::Other(anyhow::anyhow!(
1596 0 : "invalid param number for basebackup command"
1597 0 : )));
1598 0 : }
1599 :
1600 0 : let tenant_id = TenantId::from_str(params[0])
1601 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1602 0 : let timeline_id = TimelineId::from_str(params[1])
1603 0 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1604 :
1605 0 : tracing::Span::current()
1606 0 : .record("tenant_id", field::display(tenant_id))
1607 0 : .record("timeline_id", field::display(timeline_id));
1608 0 :
1609 0 : self.check_permission(Some(tenant_id))?;
1610 :
1611 0 : COMPUTE_COMMANDS_COUNTERS
1612 0 : .for_command(ComputeCommandKind::Basebackup)
1613 0 : .inc();
1614 :
1615 0 : let lsn = if let Some(lsn_str) = params.get(2) {
1616 : Some(
1617 0 : Lsn::from_str(lsn_str)
1618 0 : .with_context(|| format!("Failed to parse Lsn from {lsn_str}"))?,
1619 : )
1620 : } else {
1621 0 : None
1622 : };
1623 :
1624 0 : let gzip = match params.get(3) {
1625 0 : Some(&"--gzip") => true,
1626 0 : None => false,
1627 0 : Some(third_param) => {
1628 0 : return Err(QueryError::Other(anyhow::anyhow!(
1629 0 : "Parameter in position 3 unknown {third_param}",
1630 0 : )))
1631 : }
1632 : };
1633 :
1634 0 : let metric_recording = metrics::BASEBACKUP_QUERY_TIME.start_recording(&ctx);
1635 0 : let res = async {
1636 0 : self.handle_basebackup_request(
1637 0 : pgb,
1638 0 : tenant_id,
1639 0 : timeline_id,
1640 0 : lsn,
1641 0 : None,
1642 0 : false,
1643 0 : gzip,
1644 0 : &ctx,
1645 0 : )
1646 0 : .await?;
1647 0 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1648 0 : Result::<(), QueryError>::Ok(())
1649 0 : }
1650 0 : .await;
1651 0 : metric_recording.observe(&res);
1652 0 : res?;
1653 : }
1654 : // same as basebackup, but result includes relational data as well
1655 0 : else if let Some(params) = parts.strip_prefix(&["fullbackup"]) {
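// Query shape handled by this branch (optional parts in brackets; values illustrative):
//   fullbackup <tenant_id> <timeline_id> [<lsn> [<prev_lsn>]]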
1656 0 : if params.len() < 2 {
1657 0 : return Err(QueryError::Other(anyhow::anyhow!(
1658 0 : "invalid param number for fullbackup command"
1659 0 : )));
1660 0 : }
1661 :
1662 0 : let tenant_id = TenantId::from_str(params[0])
1663 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1664 0 : let timeline_id = TimelineId::from_str(params[1])
1665 0 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1666 :
1667 0 : tracing::Span::current()
1668 0 : .record("tenant_id", field::display(tenant_id))
1669 0 : .record("timeline_id", field::display(timeline_id));
1670 :
1671 : // The caller is responsible for providing correct lsn and prev_lsn.
1672 0 : let lsn = if let Some(lsn_str) = params.get(2) {
1673 : Some(
1674 0 : Lsn::from_str(lsn_str)
1675 0 : .with_context(|| format!("Failed to parse Lsn from {lsn_str}"))?,
1676 : )
1677 : } else {
1678 0 : None
1679 : };
1680 0 : let prev_lsn = if let Some(prev_lsn_str) = params.get(3) {
1681 : Some(
1682 0 : Lsn::from_str(prev_lsn_str)
1683 0 : .with_context(|| format!("Failed to parse Lsn from {prev_lsn_str}"))?,
1684 : )
1685 : } else {
1686 0 : None
1687 : };
1688 :
1689 0 : self.check_permission(Some(tenant_id))?;
1690 :
1691 0 : COMPUTE_COMMANDS_COUNTERS
1692 0 : .for_command(ComputeCommandKind::Fullbackup)
1693 0 : .inc();
1694 0 :
1695 0 : // Serve the full backup; this also verifies that the timeline exists.
1696 0 : self.handle_basebackup_request(
1697 0 : pgb,
1698 0 : tenant_id,
1699 0 : timeline_id,
1700 0 : lsn,
1701 0 : prev_lsn,
1702 0 : true,
1703 0 : false,
1704 0 : &ctx,
1705 0 : )
1706 0 : .await?;
1707 0 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1708 0 : } else if query_string.starts_with("import basebackup ") {
1709 : // Import the `base` section (everything but the wal) of a basebackup.
1710 : // Assumes the tenant already exists on this pageserver.
1711 : //
1712 : // Files are scheduled to be persisted to remote storage, and the
1713 : // caller should poll the http api to check when that is done.
1714 : //
1715 : // Example import command:
1716 : // 1. Get start/end LSN from backup_manifest file
1717 : // 2. Run:
1718 : // cat my_backup/base.tar | psql -h $PAGESERVER \
1719 : // -c "import basebackup $TENANT $TIMELINE $START_LSN $END_LSN $PG_VERSION"
1720 0 : let params = &parts[2..];
1721 0 : if params.len() != 5 {
1722 0 : return Err(QueryError::Other(anyhow::anyhow!(
1723 0 : "invalid param number for import basebackup command"
1724 0 : )));
1725 0 : }
1726 0 : let tenant_id = TenantId::from_str(params[0])
1727 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1728 0 : let timeline_id = TimelineId::from_str(params[1])
1729 0 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1730 0 : let base_lsn = Lsn::from_str(params[2])
1731 0 : .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?;
1732 0 : let end_lsn = Lsn::from_str(params[3])
1733 0 : .with_context(|| format!("Failed to parse Lsn from {}", params[3]))?;
1734 0 : let pg_version = u32::from_str(params[4])
1735 0 : .with_context(|| format!("Failed to parse pg_version from {}", params[4]))?;
1736 :
1737 0 : tracing::Span::current()
1738 0 : .record("tenant_id", field::display(tenant_id))
1739 0 : .record("timeline_id", field::display(timeline_id));
1740 0 :
1741 0 : self.check_permission(Some(tenant_id))?;
1742 :
1743 0 : COMPUTE_COMMANDS_COUNTERS
1744 0 : .for_command(ComputeCommandKind::ImportBasebackup)
1745 0 : .inc();
1746 0 :
1747 0 : match self
1748 0 : .handle_import_basebackup(
1749 0 : pgb,
1750 0 : tenant_id,
1751 0 : timeline_id,
1752 0 : base_lsn,
1753 0 : end_lsn,
1754 0 : pg_version,
1755 0 : ctx,
1756 0 : )
1757 0 : .await
1758 : {
1759 0 : Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
1760 0 : Err(e) => {
1761 0 : error!("error importing base backup between {base_lsn} and {end_lsn}: {e:?}");
1762 0 : pgb.write_message_noflush(&BeMessage::ErrorResponse(
1763 0 : &e.to_string(),
1764 0 : Some(e.pg_error_code()),
1765 0 : ))?
1766 : }
1767 : };
1768 0 : } else if query_string.starts_with("import wal ") {
1769 : // Import the `pg_wal` section of a basebackup.
1770 : //
1771 : // Files are scheduled to be persisted to remote storage, and the
1772 : // caller should poll the http api to check when that is done.
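// Illustrative invocation, modeled on the "import basebackup" example above
// (the tar file name is hypothetical):
//   cat my_backup/pg_wal.tar | psql -h $PAGESERVER \
//       -c "import wal $TENANT $TIMELINE $START_LSN $END_LSN"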
1773 0 : let params = &parts[2..];
1774 0 : if params.len() != 4 {
1775 0 : return Err(QueryError::Other(anyhow::anyhow!(
1776 0 : "invalid param number for import wal command"
1777 0 : )));
1778 0 : }
1779 0 : let tenant_id = TenantId::from_str(params[0])
1780 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1781 0 : let timeline_id = TimelineId::from_str(params[1])
1782 0 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1783 0 : let start_lsn = Lsn::from_str(params[2])
1784 0 : .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?;
1785 0 : let end_lsn = Lsn::from_str(params[3])
1786 0 : .with_context(|| format!("Failed to parse Lsn from {}", params[3]))?;
1787 :
1788 0 : tracing::Span::current()
1789 0 : .record("tenant_id", field::display(tenant_id))
1790 0 : .record("timeline_id", field::display(timeline_id));
1791 0 :
1792 0 : self.check_permission(Some(tenant_id))?;
1793 :
1794 0 : COMPUTE_COMMANDS_COUNTERS
1795 0 : .for_command(ComputeCommandKind::ImportWal)
1796 0 : .inc();
1797 0 :
1798 0 : match self
1799 0 : .handle_import_wal(pgb, tenant_id, timeline_id, start_lsn, end_lsn, ctx)
1800 0 : .await
1801 : {
1802 0 : Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
1803 0 : Err(e) => {
1804 0 : error!("error importing WAL between {start_lsn} and {end_lsn}: {e:?}");
1805 0 : pgb.write_message_noflush(&BeMessage::ErrorResponse(
1806 0 : &e.to_string(),
1807 0 : Some(e.pg_error_code()),
1808 0 : ))?
1809 : }
1810 : };
1811 0 : } else if query_string.to_ascii_lowercase().starts_with("set ") {
1812 : // important because psycopg2 executes "SET datestyle TO 'ISO'"
1813 : // on connect
1814 0 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1815 0 : } else if query_string.starts_with("lease lsn ") {
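// Query shape handled by this branch (note the first parameter is a tenant *shard* id;
// values illustrative):
//   lease lsn <tenant_shard_id> <timeline_id> <lsn>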
1816 0 : let params = &parts[2..];
1817 0 : if params.len() != 3 {
1818 0 : return Err(QueryError::Other(anyhow::anyhow!(
1819 0 : "invalid param number {} for lease lsn command",
1820 0 : params.len()
1821 0 : )));
1822 0 : }
1823 :
1824 0 : let tenant_shard_id = TenantShardId::from_str(params[0])
1825 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1826 0 : let timeline_id = TimelineId::from_str(params[1])
1827 0 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1828 :
1829 0 : tracing::Span::current()
1830 0 : .record("tenant_id", field::display(tenant_shard_id))
1831 0 : .record("timeline_id", field::display(timeline_id));
1832 0 :
1833 0 : self.check_permission(Some(tenant_shard_id.tenant_id))?;
1834 :
1835 0 : COMPUTE_COMMANDS_COUNTERS
1836 0 : .for_command(ComputeCommandKind::LeaseLsn)
1837 0 : .inc();
1838 :
1839 : // The caller is responsible for providing correct lsn.
1840 0 : let lsn = Lsn::from_str(params[2])
1841 0 : .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?;
1842 :
1843 0 : match self
1844 0 : .handle_make_lsn_lease(pgb, tenant_shard_id, timeline_id, lsn, &ctx)
1845 0 : .await
1846 : {
1847 0 : Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
1848 0 : Err(e) => {
1849 0 : error!("error obtaining lsn lease for {lsn}: {e:?}");
1850 0 : pgb.write_message_noflush(&BeMessage::ErrorResponse(
1851 0 : &e.to_string(),
1852 0 : Some(e.pg_error_code()),
1853 0 : ))?
1854 : }
1855 : };
1856 0 : } else if let Some(params) = parts.strip_prefix(&["show"]) {
1857 : // show <tenant_id>
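// Illustrative use (values hypothetical): `psql -h $PAGESERVER -c "show $TENANT"`
// returns a single row with the tenant's effective config values, using the column
// set declared in the RowDescription below.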
1858 0 : if params.len() != 1 {
1859 0 : return Err(QueryError::Other(anyhow::anyhow!(
1860 0 : "invalid param number for config command"
1861 0 : )));
1862 0 : }
1863 0 : let tenant_id = TenantId::from_str(params[0])
1864 0 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1865 :
1866 0 : tracing::Span::current().record("tenant_id", field::display(tenant_id));
1867 0 :
1868 0 : self.check_permission(Some(tenant_id))?;
1869 :
1870 0 : COMPUTE_COMMANDS_COUNTERS
1871 0 : .for_command(ComputeCommandKind::Show)
1872 0 : .inc();
1873 :
1874 0 : let tenant = self
1875 0 : .get_active_tenant_with_timeout(
1876 0 : tenant_id,
1877 0 : ShardSelector::Zero,
1878 0 : ACTIVE_TENANT_TIMEOUT,
1879 0 : )
1880 0 : .await?;
1881 0 : pgb.write_message_noflush(&BeMessage::RowDescription(&[
1882 0 : RowDescriptor::int8_col(b"checkpoint_distance"),
1883 0 : RowDescriptor::int8_col(b"checkpoint_timeout"),
1884 0 : RowDescriptor::int8_col(b"compaction_target_size"),
1885 0 : RowDescriptor::int8_col(b"compaction_period"),
1886 0 : RowDescriptor::int8_col(b"compaction_threshold"),
1887 0 : RowDescriptor::int8_col(b"gc_horizon"),
1888 0 : RowDescriptor::int8_col(b"gc_period"),
1889 0 : RowDescriptor::int8_col(b"image_creation_threshold"),
1890 0 : RowDescriptor::int8_col(b"pitr_interval"),
1891 0 : ]))?
1892 0 : .write_message_noflush(&BeMessage::DataRow(&[
1893 0 : Some(tenant.get_checkpoint_distance().to_string().as_bytes()),
1894 0 : Some(
1895 0 : tenant
1896 0 : .get_checkpoint_timeout()
1897 0 : .as_secs()
1898 0 : .to_string()
1899 0 : .as_bytes(),
1900 0 : ),
1901 0 : Some(tenant.get_compaction_target_size().to_string().as_bytes()),
1902 0 : Some(
1903 0 : tenant
1904 0 : .get_compaction_period()
1905 0 : .as_secs()
1906 0 : .to_string()
1907 0 : .as_bytes(),
1908 0 : ),
1909 0 : Some(tenant.get_compaction_threshold().to_string().as_bytes()),
1910 0 : Some(tenant.get_gc_horizon().to_string().as_bytes()),
1911 0 : Some(tenant.get_gc_period().as_secs().to_string().as_bytes()),
1912 0 : Some(tenant.get_image_creation_threshold().to_string().as_bytes()),
1913 0 : Some(tenant.get_pitr_interval().as_secs().to_string().as_bytes()),
1914 0 : ]))?
1915 0 : .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1916 : } else {
1917 0 : return Err(QueryError::Other(anyhow::anyhow!(
1918 0 : "unknown command {query_string}"
1919 0 : )));
1920 : }
1921 :
1922 0 : Ok(())
1923 0 : }
1924 : }
1925 :
1926 : impl From<GetActiveTenantError> for QueryError {
1927 0 : fn from(e: GetActiveTenantError) -> Self {
1928 0 : match e {
1929 0 : GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected(
1930 0 : ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
1931 0 : ),
1932 : GetActiveTenantError::Cancelled
1933 : | GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => {
1934 0 : QueryError::Shutdown
1935 : }
1936 0 : e @ GetActiveTenantError::NotFound(_) => QueryError::NotFound(format!("{e}").into()),
1937 0 : e => QueryError::Other(anyhow::anyhow!(e)),
1938 : }
1939 0 : }
1940 : }
1941 :
1942 0 : #[derive(Debug, thiserror::Error)]
1943 : enum GetActiveTimelineError {
1944 : #[error(transparent)]
1945 : Tenant(GetActiveTenantError),
1946 : #[error(transparent)]
1947 : Timeline(#[from] GetTimelineError),
1948 : }
1949 :
1950 : impl From<GetActiveTimelineError> for QueryError {
1951 0 : fn from(e: GetActiveTimelineError) -> Self {
1952 0 : match e {
1953 0 : GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => QueryError::Shutdown,
1954 0 : GetActiveTimelineError::Tenant(e) => e.into(),
1955 0 : GetActiveTimelineError::Timeline(e) => QueryError::NotFound(format!("{e}").into()),
1956 : }
1957 0 : }
1958 : }
1959 :
1960 0 : fn set_tracing_field_shard_id(timeline: &Timeline) {
1961 0 : debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
1962 0 : tracing::Span::current().record(
1963 0 : "shard_id",
1964 0 : tracing::field::display(timeline.tenant_shard_id.shard_slug()),
1965 0 : );
1966 0 : debug_assert_current_span_has_tenant_and_timeline_id();
1967 0 : }
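// Illustrative call site (sketch): per the debug asserts above, the caller must already
// be inside a span carrying tenant_id and timeline_id; this fills in the shard_id field:
//   set_tracing_field_shard_id(&timeline);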