Line data Source code
1 : //
2 : //! The Page Service listens for client connections and serves their GetPage@LSN
3 : //! requests.
4 : //
5 : // It is possible to connect here using the usual psql/pgbench/libpq tools. The
6 : // following commands are currently supported:
7 : // *status* -- show current info about this pageserver,
8 : // *pagestream* -- enter the mode in which smgr and pageserver talk over
9 : // their custom protocol.
10 : //
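// For example, assuming a pageserver listening on its libpq port locally
// (hypothetical host and port), a quick manual session could look like:
//
//   psql -h 127.0.0.1 -p 64000 -c "show <tenant_id>"
//
// which prints the tenant's current configuration via the `show` command
// handled in `process_query` below.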
11 :
12 : use anyhow::Context;
13 : use async_compression::tokio::write::GzipEncoder;
14 : use bytes::Buf;
15 : use bytes::Bytes;
16 : use futures::Stream;
17 : use pageserver_api::models::TenantState;
18 : use pageserver_api::models::{
19 : PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
20 : PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
21 : PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
22 : PagestreamNblocksRequest, PagestreamNblocksResponse,
23 : };
24 : use postgres_backend::{self, is_expected_io_error, AuthType, PostgresBackend, QueryError};
25 : use pq_proto::framed::ConnectionError;
26 : use pq_proto::FeStartupPacket;
27 : use pq_proto::{BeMessage, FeMessage, RowDescriptor};
28 : use std::io;
29 : use std::net::TcpListener;
30 : use std::pin::pin;
31 : use std::str;
32 : use std::str::FromStr;
33 : use std::sync::Arc;
34 : use std::time::Duration;
35 : use tokio::io::AsyncWriteExt;
36 : use tokio::io::{AsyncRead, AsyncWrite};
37 : use tokio_util::io::StreamReader;
38 : use tracing::field;
39 : use tracing::*;
40 : use utils::id::ConnectionId;
41 : use utils::{
42 : auth::{Claims, JwtAuth, Scope},
43 : id::{TenantId, TimelineId},
44 : lsn::Lsn,
45 : simple_rcu::RcuReadGuard,
46 : };
47 :
48 : use crate::auth::check_permission;
49 : use crate::basebackup;
50 : use crate::config::PageServerConf;
51 : use crate::context::{DownloadBehavior, RequestContext};
52 : use crate::import_datadir::import_wal_from_tar;
53 : use crate::metrics;
54 : use crate::metrics::LIVE_CONNECTIONS_COUNT;
55 : use crate::task_mgr;
56 : use crate::task_mgr::TaskKind;
57 : use crate::tenant;
58 : use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
59 : use crate::tenant::mgr;
60 : use crate::tenant::mgr::GetTenantError;
61 : use crate::tenant::{Tenant, Timeline};
62 : use crate::trace::Tracer;
63 :
64 : use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
65 : use postgres_ffi::BLCKSZ;
66 :
67 7 : fn copyin_stream<IO>(pgb: &mut PostgresBackend<IO>) -> impl Stream<Item = io::Result<Bytes>> + '_
68 7 : where
69 7 : IO: AsyncRead + AsyncWrite + Unpin,
70 7 : {
71 7 : async_stream::try_stream! {
72 7 : loop {
73 26355 : let msg = tokio::select! {
74 7 : biased;
75 7 :
76 7 : _ = task_mgr::shutdown_watcher() => {
77 7 : // We were requested to shut down.
78 7 : let msg = "pageserver is shutting down";
79 7 : let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, None));
80 7 : Err(QueryError::Other(anyhow::anyhow!(msg)))
81 7 : }
82 7 :
83 26355 : msg = pgb.read_message() => { msg.map_err(QueryError::from)}
84 7 : };
85 7 :
86 26355 : match msg {
87 26355 : Ok(Some(message)) => {
88 26355 : let copy_data_bytes = match message {
89 26342 : FeMessage::CopyData(bytes) => bytes,
90 7 : FeMessage::CopyDone => { break },
91 7 : FeMessage::Sync => continue,
92 7 : FeMessage::Terminate => {
93 7 : let msg = "client terminated connection with Terminate message during COPY";
94 0 : let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
95 0 : // the error can't happen here; ErrorResponse serialization should always succeed
96 0 : pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
97 7 : Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
98 7 : break;
99 7 : }
100 7 : m => {
101 0 : let msg = format!("unexpected message {m:?}");
102 0 : // the error can't happen here; ErrorResponse serialization should always succeed
103 0 : pgb.write_message_noflush(&BeMessage::ErrorResponse(&msg, None)).map_err(|e| e.into_io_error())?;
104 7 : Err(io::Error::new(io::ErrorKind::Other, msg))?;
105 7 : break;
106 7 : }
107 7 : };
108 7 :
109 26342 : yield copy_data_bytes;
110 7 : }
111 7 : Ok(None) => {
112 7 : let msg = "client closed connection during COPY";
113 0 : let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
114 0 : // the error can't happen here; ErrorResponse serialization should always succeed
115 0 : pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
116 7 : pgb.flush().await?;
117 7 : Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
118 7 : }
119 7 : Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
120 0 : Err(io_error)?;
121 7 : }
122 7 : Err(other) => {
123 0 : Err(io::Error::new(io::ErrorKind::Other, other.to_string()))?;
124 7 : }
125 7 : };
126 7 : }
127 7 : }
128 7 : }
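// A minimal sketch of how `copyin_stream` is consumed later in this file:
// the stream of CopyData payloads is adapted into an `AsyncRead` with
// `tokio_util::io::StreamReader`, so importers that expect a plain byte
// stream (such as a tar reader) can run directly on top of the COPY
// subprotocol.
//
// let mut copyin_reader = pin!(StreamReader::new(copyin_stream(pgb)));
// import_wal_from_tar(&timeline, &mut copyin_reader, start_lsn, end_lsn, &ctx).await?;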
129 :
130 : /// Read the end of a tar archive.
131 : ///
132 : /// A tar archive normally ends with two consecutive blocks of zeros, 512 bytes each.
133 : /// `tokio_tar` already read the first such block. Read the second all-zeros block,
134 : /// and check that there is no more data after the EOF marker.
135 : ///
136 : /// XXX: Currently, any trailing data after the EOF marker only triggers a
137 : /// warning. Perhaps it should be a hard error?
138 5 : async fn read_tar_eof(mut reader: (impl AsyncRead + Unpin)) -> anyhow::Result<()> {
139 5 : use tokio::io::AsyncReadExt;
140 5 : let mut buf = [0u8; 512];
141 5 :
142 5 : // Read the all-zeros block, and verify it
143 5 : let mut total_bytes = 0;
144 10 : while total_bytes < 512 {
145 5 : let nbytes = reader.read(&mut buf[total_bytes..]).await?;
146 5 : total_bytes += nbytes;
147 5 : if nbytes == 0 {
148 0 : break;
149 5 : }
150 : }
151 5 : if total_bytes < 512 {
152 0 : anyhow::bail!("incomplete or invalid tar EOF marker");
153 5 : }
154 2560 : if !buf.iter().all(|&x| x == 0) {
155 0 : anyhow::bail!("invalid tar EOF marker");
156 5 : }
157 5 :
158 5 : // Drain any data after the EOF marker
159 5 : let mut trailing_bytes = 0;
160 : loop {
161 6 : let nbytes = reader.read(&mut buf).await?;
162 6 : trailing_bytes += nbytes;
163 6 : if nbytes == 0 {
164 5 : break;
165 1 : }
166 : }
167 5 : if trailing_bytes > 0 {
168 1 : warn!("ignored {trailing_bytes} unexpected bytes after the tar archive");
169 4 : }
170 5 : Ok(())
171 5 : }
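// A small test sketch for `read_tar_eof`, assuming tokio's test macros are
// available in this crate. A `&[u8]` implements `AsyncRead`, so we can feed
// the function an in-memory tail of an archive: the second all-zeros
// 512-byte block (the first was already consumed by `tokio_tar`),
// optionally followed by trailing bytes.
#[cfg(test)]
mod read_tar_eof_tests {
    use super::read_tar_eof;

    #[tokio::test]
    async fn accepts_clean_eof_marker() {
        // Exactly one all-zeros block and nothing after it: success.
        let tail = vec![0u8; 512];
        assert!(read_tar_eof(tail.as_slice()).await.is_ok());
    }

    #[tokio::test]
    async fn rejects_truncated_marker() {
        // Fewer than 512 bytes left: "incomplete or invalid tar EOF marker".
        let tail = vec![0u8; 100];
        assert!(read_tar_eof(tail.as_slice()).await.is_err());
    }

    #[tokio::test]
    async fn tolerates_trailing_bytes() {
        // Data after the marker currently only produces a warning.
        let mut tail = vec![0u8; 512];
        tail.extend_from_slice(b"garbage");
        assert!(read_tar_eof(tail.as_slice()).await.is_ok());
    }
}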
172 :
173 : ///////////////////////////////////////////////////////////////////////////////
174 :
175 : ///
176 : /// Main loop of the page service.
177 : ///
178 : /// Listens for connections, and launches a new handler task for each.
179 : ///
180 575 : pub async fn libpq_listener_main(
181 575 : conf: &'static PageServerConf,
182 575 : broker_client: storage_broker::BrokerClientChannel,
183 575 : auth: Option<Arc<JwtAuth>>,
184 575 : listener: TcpListener,
185 575 : auth_type: AuthType,
186 575 : listener_ctx: RequestContext,
187 575 : ) -> anyhow::Result<()> {
188 575 : listener.set_nonblocking(true)?;
189 575 : let tokio_listener = tokio::net::TcpListener::from_std(listener)?;
190 :
191 : // Wait for a new connection to arrive, or for server shutdown.
192 5902 : while let Some(res) = tokio::select! {
193 : biased;
194 :
195 : _ = task_mgr::shutdown_watcher() => {
196 : // We were requested to shut down.
197 : None
198 : }
199 :
200 5327 : res = tokio_listener.accept() => {
201 : Some(res)
202 : }
203 : } {
204 5327 : match res {
205 5327 : Ok((socket, peer_addr)) => {
206 : // Connection established. Spawn a new task to handle it.
207 0 : debug!("accepted connection from {}", peer_addr);
208 5327 : let local_auth = auth.clone();
209 5327 :
210 5327 : let connection_ctx = listener_ctx
211 5327 : .detached_child(TaskKind::PageRequestHandler, DownloadBehavior::Download);
212 5327 :
213 5327 : // PageRequestHandler tasks are not associated with any particular
214 5327 : // timeline in the task manager. In practice most connections will
215 5327 : // only deal with a particular timeline, but we don't know which one
216 5327 : // yet.
217 5327 : task_mgr::spawn(
218 5327 : &tokio::runtime::Handle::current(),
219 5327 : TaskKind::PageRequestHandler,
220 5327 : None,
221 5327 : None,
222 5327 : "serving compute connection task",
223 5327 : false,
224 5327 : page_service_conn_main(
225 5327 : conf,
226 5327 : broker_client.clone(),
227 5327 : local_auth,
228 5327 : socket,
229 5327 : auth_type,
230 5327 : connection_ctx,
231 5327 : ),
232 5327 : );
233 : }
234 0 : Err(err) => {
235 : // accept() failed. Log the error, and loop back to retry on next connection.
236 0 : error!("accept() failed: {:?}", err);
237 : }
238 : }
239 : }
240 :
241 0 : debug!("page_service loop terminated");
242 :
243 148 : Ok(())
244 148 : }
245 :
246 15981 : #[instrument(skip_all, fields(peer_addr))]
247 : async fn page_service_conn_main(
248 : conf: &'static PageServerConf,
249 : broker_client: storage_broker::BrokerClientChannel,
250 : auth: Option<Arc<JwtAuth>>,
251 : socket: tokio::net::TcpStream,
252 : auth_type: AuthType,
253 : connection_ctx: RequestContext,
254 : ) -> anyhow::Result<()> {
255 : // Immediately increment the gauge, then install a guard to decrement it on task exit.
256 : // One of the advantages of `defer!` is that it will most probably
257 : // run even in the presence of panics.
258 : let gauge = LIVE_CONNECTIONS_COUNT.with_label_values(&["page_service"]);
259 : gauge.inc();
260 5260 : scopeguard::defer! {
261 5260 : gauge.dec();
262 5260 : }
263 :
264 : socket
265 : .set_nodelay(true)
266 : .context("could not set TCP_NODELAY")?;
267 :
268 : let peer_addr = socket.peer_addr().context("get peer address")?;
269 : tracing::Span::current().record("peer_addr", field::display(peer_addr));
270 :
271 : // Set up a read timeout of 10 minutes. The exact value is rather arbitrary; the requirements are:
272 : // - long enough for most valid compute connections
273 : // - short enough that we don't "leak" connections to long-gone computes
274 : //
275 : // No write timeout is used, because the kernel is assumed to fail writes after some time.
276 : let mut socket = tokio_io_timeout::TimeoutReader::new(socket);
277 :
278 : // The timeout should be lower, but we are trying out multiple days for
279 : // <https://github.com/neondatabase/neon/issues/4205>
280 : socket.set_timeout(Some(std::time::Duration::from_secs(60 * 60 * 24 * 3)));
281 : let socket = std::pin::pin!(socket);
282 :
283 : // XXX: pgbackend.run() should take the connection_ctx,
284 : // and create a child per-query context when it invokes process_query.
285 : // But it's in a shared crate, so we store connection_ctx inside PageServerHandler
286 : // and create the per-query context in process_query ourselves.
287 : let mut conn_handler = PageServerHandler::new(conf, broker_client, auth, connection_ctx);
288 : let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
289 :
290 : match pgbackend
291 : .run(&mut conn_handler, task_mgr::shutdown_watcher)
292 : .await
293 : {
294 : Ok(()) => {
295 : // we've been requested to shut down
296 : Ok(())
297 : }
298 : Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
299 : if is_expected_io_error(&io_error) {
300 7 : info!("Postgres client disconnected ({io_error})");
301 : Ok(())
302 : } else {
303 : Err(io_error).context("Postgres connection error")
304 : }
305 : }
306 : other => other.context("Postgres query error"),
307 : }
308 : }
309 :
310 : struct PageServerHandler {
311 : _conf: &'static PageServerConf,
312 : broker_client: storage_broker::BrokerClientChannel,
313 : auth: Option<Arc<JwtAuth>>,
314 : claims: Option<Claims>,
315 :
316 : /// The context created for the lifetime of the connection
317 : /// serviced by this PageServerHandler.
318 : /// For each query received over the connection,
319 : /// `process_query` creates a child context from this one.
320 : connection_ctx: RequestContext,
321 : }
322 :
323 : impl PageServerHandler {
324 5327 : pub fn new(
325 5327 : conf: &'static PageServerConf,
326 5327 : broker_client: storage_broker::BrokerClientChannel,
327 5327 : auth: Option<Arc<JwtAuth>>,
328 5327 : connection_ctx: RequestContext,
329 5327 : ) -> Self {
330 5327 : PageServerHandler {
331 5327 : _conf: conf,
332 5327 : broker_client,
333 5327 : auth,
334 5327 : claims: None,
335 5327 : connection_ctx,
336 5327 : }
337 5327 : }
338 :
339 22972 : #[instrument(skip_all)]
340 : async fn handle_pagerequests<IO>(
341 : &self,
342 : pgb: &mut PostgresBackend<IO>,
343 : tenant_id: TenantId,
344 : timeline_id: TimelineId,
345 : ctx: RequestContext,
346 : ) -> Result<(), QueryError>
347 : where
348 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
349 : {
350 : debug_assert_current_span_has_tenant_and_timeline_id();
351 :
352 : // NOTE: the pagerequests handler exits when the connection is closed,
353 : // so there is no need to reset the association.
354 : task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
355 :
356 : // Make request tracer if needed
357 : let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
358 : let mut tracer = if tenant.get_trace_read_requests() {
359 : let connection_id = ConnectionId::generate();
360 : let path = tenant
361 : .conf
362 : .trace_path(&tenant_id, &timeline_id, &connection_id);
363 : Some(Tracer::new(path))
364 : } else {
365 : None
366 : };
367 :
368 : // Check that the timeline exists
369 : let timeline = tenant
370 : .get_timeline(timeline_id, true)
371 0 : .map_err(|e| anyhow::anyhow!(e))?;
372 :
373 : // switch client to COPYBOTH
374 : pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
375 : pgb.flush().await?;
376 :
377 : let metrics = metrics::SmgrQueryTimePerTimeline::new(&tenant_id, &timeline_id);
378 :
379 : loop {
380 8851539 : let msg = tokio::select! {
381 : biased;
382 :
383 : _ = task_mgr::shutdown_watcher() => {
384 : // We were requested to shut down.
385 96 : info!("shutdown request received in page handler");
386 : break;
387 : }
388 :
389 4604004 : msg = pgb.read_message() => { msg }
390 : };
391 :
392 : let copy_data_bytes = match msg? {
393 : Some(FeMessage::CopyData(bytes)) => bytes,
394 : Some(FeMessage::Terminate) => break,
395 : Some(m) => {
396 : return Err(QueryError::Other(anyhow::anyhow!(
397 : "unexpected message: {m:?} during COPY"
398 : )));
399 : }
400 : None => break, // client disconnected
401 : };
402 :
403 0 : trace!("query: {copy_data_bytes:?}");
404 :
405 : // Trace request if needed
406 : if let Some(t) = tracer.as_mut() {
407 : t.trace(&copy_data_bytes)
408 : }
409 :
410 : let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
411 :
412 : // TODO: We could create a new per-request context here, with unique ID.
413 : // Currently we use the same per-timeline context for all requests
414 :
415 : let response = match neon_fe_msg {
416 : PagestreamFeMessage::Exists(req) => {
417 : let _timer = metrics.start_timer(metrics::SmgrQueryType::GetRelExists);
418 : self.handle_get_rel_exists_request(&timeline, &req, &ctx)
419 : .await
420 : }
421 : PagestreamFeMessage::Nblocks(req) => {
422 : let _timer = metrics.start_timer(metrics::SmgrQueryType::GetRelSize);
423 : self.handle_get_nblocks_request(&timeline, &req, &ctx).await
424 : }
425 : PagestreamFeMessage::GetPage(req) => {
426 : let _timer = metrics.start_timer(metrics::SmgrQueryType::GetPageAtLsn);
427 : self.handle_get_page_at_lsn_request(&timeline, &req, &ctx)
428 : .await
429 : }
430 : PagestreamFeMessage::DbSize(req) => {
431 : let _timer = metrics.start_timer(metrics::SmgrQueryType::GetDbSize);
432 : self.handle_db_size_request(&timeline, &req, &ctx).await
433 : }
434 : };
435 :
436 1 : let response = response.unwrap_or_else(|e| {
437 1 : // log all the details with {:?}, but for the client the
438 1 : // plain error message is enough
439 1 : error!("error reading relation or page version: {:?}", e);
440 1 : PagestreamBeMessage::Error(PagestreamErrorResponse {
441 1 : message: e.to_string(),
442 1 : })
443 1 : });
444 :
445 : pgb.write_message_noflush(&BeMessage::CopyData(&response.serialize()))?;
446 : pgb.flush().await?;
447 : }
448 : Ok(())
449 : }
450 :
451 : #[allow(clippy::too_many_arguments)]
452 10 : #[instrument(skip_all, fields(%base_lsn, end_lsn=%_end_lsn, %pg_version))]
453 : async fn handle_import_basebackup<IO>(
454 : &self,
455 : pgb: &mut PostgresBackend<IO>,
456 : tenant_id: TenantId,
457 : timeline_id: TimelineId,
458 : base_lsn: Lsn,
459 : _end_lsn: Lsn,
460 : pg_version: u32,
461 : ctx: RequestContext,
462 : ) -> Result<(), QueryError>
463 : where
464 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
465 : {
466 : debug_assert_current_span_has_tenant_and_timeline_id();
467 :
468 : task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
469 : // Create empty timeline
470 5 : info!("creating new timeline");
471 : let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
472 : let timeline = tenant
473 : .create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
474 : .await?;
475 :
476 : // TODO mark timeline as not ready until it reaches end_lsn.
477 : // We might have some wal to import as well, and we should prevent compute
478 : // from connecting before that and writing conflicting wal.
479 : //
480 : // This is not relevant for pageserver->pageserver migrations, since there's
481 : // no wal to import. But it should be fixed if we want to import from postgres.
482 :
483 : // TODO leave clean state on error. For now you can use detach to clean
484 : // up broken state from a failed import.
485 :
486 : // Import basebackup provided via CopyData
487 5 : info!("importing basebackup");
488 : pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
489 : pgb.flush().await?;
490 :
491 : let mut copyin_reader = pin!(StreamReader::new(copyin_stream(pgb)));
492 : timeline
493 : .import_basebackup_from_tar(
494 : &mut copyin_reader,
495 : base_lsn,
496 : self.broker_client.clone(),
497 : &ctx,
498 : )
499 : .await?;
500 :
501 : // Read the end of the tar archive.
502 : read_tar_eof(copyin_reader).await?;
503 :
504 : // TODO check checksum
505 : // Meanwhile you can verify client-side by taking a fullbackup
506 : // and checking that its size matches what was imported.
507 : // It wouldn't work if the base came from vanilla postgres though,
508 : // since we discard some log files.
509 :
510 3 : info!("done");
511 : Ok(())
512 : }
513 :
514 6 : #[instrument(skip_all, fields(%start_lsn, %end_lsn))]
515 : async fn handle_import_wal<IO>(
516 : &self,
517 : pgb: &mut PostgresBackend<IO>,
518 : tenant_id: TenantId,
519 : timeline_id: TimelineId,
520 : start_lsn: Lsn,
521 : end_lsn: Lsn,
522 : ctx: RequestContext,
523 : ) -> Result<(), QueryError>
524 : where
525 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
526 : {
527 : debug_assert_current_span_has_tenant_and_timeline_id();
528 : task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
529 :
530 : let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?;
531 : let last_record_lsn = timeline.get_last_record_lsn();
532 : if last_record_lsn != start_lsn {
533 : return Err(QueryError::Other(
534 : anyhow::anyhow!("Cannot import WAL from Lsn {start_lsn} because timeline does not start from the same lsn: {last_record_lsn}"))
535 : );
536 : }
537 :
538 : // TODO leave clean state on error. For now you can use detach to clean
539 : // up broken state from a failed import.
540 :
541 : // Import wal provided via CopyData
542 2 : info!("importing wal");
543 : pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
544 : pgb.flush().await?;
545 : let mut copyin_reader = pin!(StreamReader::new(copyin_stream(pgb)));
546 : import_wal_from_tar(&timeline, &mut copyin_reader, start_lsn, end_lsn, &ctx).await?;
547 2 : info!("wal import complete");
548 :
549 : // Read the end of the tar archive.
550 : read_tar_eof(copyin_reader).await?;
551 :
552 : // TODO Does it make sense to overshoot?
553 : if timeline.get_last_record_lsn() < end_lsn {
554 : return Err(QueryError::Other(
555 : anyhow::anyhow!("Cannot import WAL from Lsn {start_lsn} because timeline does not start from the same lsn: {last_record_lsn}"))
556 : );
557 : }
558 :
559 : // Flush data to disk, then upload to s3. No need for a forced checkpoint.
560 : // We only want to persist the data, and it doesn't matter if it's in the
561 : // shape of deltas or images.
562 2 : info!("flushing layers");
563 : timeline.freeze_and_flush().await?;
564 :
565 2 : info!("done");
566 : Ok(())
567 : }
568 :
569 : /// Helper function to handle the LSN from client request.
570 : ///
571 : /// Each GetPage (and Exists and Nblocks) request includes information about
572 : /// which version of the page is being requested. The client can request the
573 : /// latest version of the page, or the version that's valid at a particular
574 : /// LSN. The primary compute node will always request the latest page
575 : /// version, while a standby will request a version at the LSN that it's
576 : /// currently caught up to.
577 : ///
578 : /// In either case, if the page server hasn't received the WAL up to the
579 : /// requested LSN yet, we will wait for it to arrive. The return value is
580 : /// the LSN that should be used to look up the page versions.
581 4599615 : async fn wait_or_get_last_lsn(
582 4599615 : timeline: &Timeline,
583 4599615 : mut lsn: Lsn,
584 4599615 : latest: bool,
585 4599615 : latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
586 4599615 : ctx: &RequestContext,
587 4599615 : ) -> anyhow::Result<Lsn> {
588 4599615 : if latest {
589 : // Latest page version was requested. If LSN is given, it is a hint
590 : // to the page server that there have been no modifications to the
591 : // page after that LSN. If we haven't received WAL up to that point,
592 : // wait until it arrives.
593 4497372 : let last_record_lsn = timeline.get_last_record_lsn();
594 4497372 :
595 4497372 : // Note: this covers the special case that lsn == Lsn(0). That
596 4497372 : // special case means "return the latest version whatever it is",
597 4497372 : // and it's used for bootstrapping purposes, when the page server is
598 4497372 : // connected directly to the compute node. That is needed because
599 4497372 : // when you connect to the compute node, to receive the WAL, the
600 4497372 : // walsender process will do a look up in the pg_authid catalog
601 4497372 : // table for authentication. That poses a deadlock problem: the
602 4497372 : // catalog table lookup will send a GetPage request, but the GetPage
603 4497372 : // request will block in the page server because the recent WAL
604 4497372 : // hasn't been received yet, and it cannot be received until the
605 4497372 : // walsender completes the authentication and starts streaming the
606 4497372 : // WAL.
607 4497372 : if lsn <= last_record_lsn {
608 4372604 : lsn = last_record_lsn;
609 4372604 : } else {
610 169751 : timeline.wait_lsn(lsn, ctx).await?;
611 : // Since we waited for 'lsn' to arrive, that is now the last
612 : // record LSN. (Or close enough for our purposes; the
613 : // last-record LSN can advance immediately after we return
614 : // anyway)
615 : }
616 : } else {
617 102243 : if lsn == Lsn(0) {
618 1 : anyhow::bail!("invalid LSN(0) in request");
619 102242 : }
620 102242 : timeline.wait_lsn(lsn, ctx).await?;
621 : }
622 4599614 : anyhow::ensure!(
623 4599614 : lsn >= **latest_gc_cutoff_lsn,
624 0 : "tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
625 0 : lsn, **latest_gc_cutoff_lsn
626 : );
627 4599614 : Ok(lsn)
628 4599615 : }
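// A condensed, hypothetical sketch of the resolution rule implemented by
// `wait_or_get_last_lsn` above, with the WAL waiting and the GC-cutoff
// check omitted: `latest == true` treats the request LSN as a lower-bound
// hint, while `latest == false` pins the read to exactly that LSN.
#[allow(dead_code)]
fn effective_lsn_sketch(latest: bool, lsn: Lsn, last_record_lsn: Lsn) -> anyhow::Result<Lsn> {
    if latest {
        // Serve the newest version we have, but no older than the client's hint.
        Ok(std::cmp::max(lsn, last_record_lsn))
    } else if lsn == Lsn(0) {
        anyhow::bail!("invalid LSN(0) in request")
    } else {
        // Exact-LSN read; the real code waits for WAL up to `lsn` first.
        Ok(lsn)
    }
}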
629 :
630 191820 : #[instrument(skip(self, timeline, req, ctx), fields(rel = %req.rel, req_lsn = %req.lsn))]
631 : async fn handle_get_rel_exists_request(
632 : &self,
633 : timeline: &Timeline,
634 : req: &PagestreamExistsRequest,
635 : ctx: &RequestContext,
636 : ) -> anyhow::Result<PagestreamBeMessage> {
637 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
638 : let lsn =
639 : Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
640 : .await?;
641 :
642 : let exists = timeline
643 : .get_rel_exists(req.rel, lsn, req.latest, ctx)
644 : .await?;
645 :
646 : Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
647 : exists,
648 : }))
649 : }
650 :
651 72780 : #[instrument(skip(self, timeline, req, ctx), fields(rel = %req.rel, req_lsn = %req.lsn))]
652 : async fn handle_get_nblocks_request(
653 : &self,
654 : timeline: &Timeline,
655 : req: &PagestreamNblocksRequest,
656 : ctx: &RequestContext,
657 : ) -> anyhow::Result<PagestreamBeMessage> {
658 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
659 : let lsn =
660 : Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
661 : .await?;
662 :
663 : let n_blocks = timeline.get_rel_size(req.rel, lsn, req.latest, ctx).await?;
664 :
665 : Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
666 : n_blocks,
667 : }))
668 : }
669 :
670 20 : #[instrument(skip(self, timeline, req, ctx), fields(dbnode = %req.dbnode, req_lsn = %req.lsn))]
671 : async fn handle_db_size_request(
672 : &self,
673 : timeline: &Timeline,
674 : req: &PagestreamDbSizeRequest,
675 : ctx: &RequestContext,
676 : ) -> anyhow::Result<PagestreamBeMessage> {
677 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
678 : let lsn =
679 : Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
680 : .await?;
681 :
682 : let total_blocks = timeline
683 : .get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, lsn, req.latest, ctx)
684 : .await?;
685 : let db_size = total_blocks as i64 * BLCKSZ as i64;
686 :
687 : Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse {
688 : db_size,
689 : }))
690 : }
691 :
692 13600380 : #[instrument(skip(self, timeline, req, ctx), fields(rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn))]
693 : async fn handle_get_page_at_lsn_request(
694 : &self,
695 : timeline: &Timeline,
696 : req: &PagestreamGetPageRequest,
697 : ctx: &RequestContext,
698 : ) -> anyhow::Result<PagestreamBeMessage> {
699 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
700 : let lsn =
701 : Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
702 : .await?;
703 : /*
704 : // Add a 1s delay to some requests. The delay helps the requests to
705 : // hit the race condition from github issue #1047 more easily.
706 : use rand::Rng;
707 : if rand::thread_rng().gen::<u8>() < 5 {
708 : std::thread::sleep(std::time::Duration::from_millis(1000));
709 : }
710 : */
711 :
712 : let page = timeline
713 : .get_rel_page_at_lsn(req.rel, req.blkno, lsn, req.latest, ctx)
714 : .await?;
715 :
716 : Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
717 : page,
718 : }))
719 : }
720 :
721 : #[allow(clippy::too_many_arguments)]
722 2007 : #[instrument(skip_all, fields(?lsn, ?prev_lsn, %full_backup))]
723 : async fn handle_basebackup_request<IO>(
724 : &mut self,
725 : pgb: &mut PostgresBackend<IO>,
726 : tenant_id: TenantId,
727 : timeline_id: TimelineId,
728 : lsn: Option<Lsn>,
729 : prev_lsn: Option<Lsn>,
730 : full_backup: bool,
731 : gzip: bool,
732 : ctx: RequestContext,
733 : ) -> anyhow::Result<()>
734 : where
735 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
736 : {
737 : debug_assert_current_span_has_tenant_and_timeline_id();
738 :
739 : let started = std::time::Instant::now();
740 :
741 : // check that the timeline exists
742 : let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?;
743 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
744 : if let Some(lsn) = lsn {
745 : // Backup was requested at a particular LSN. Wait for it to arrive.
746 220 : info!("waiting for {}", lsn);
747 : timeline.wait_lsn(lsn, &ctx).await?;
748 : timeline
749 : .check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
750 : .context("invalid basebackup lsn")?;
751 : }
752 :
753 : let lsn_awaited_after = started.elapsed();
754 :
755 : // switch client to COPYOUT
756 : pgb.write_message_noflush(&BeMessage::CopyOutResponse)?;
757 : pgb.flush().await?;
758 :
759 : // Send a tarball of the latest layer on the timeline. Compress unless this
760 : // is a fullbackup. TODO: compress in that case too (tests need to be updated)
761 : if full_backup {
762 : let mut writer = pgb.copyout_writer();
763 : basebackup::send_basebackup_tarball(
764 : &mut writer,
765 : &timeline,
766 : lsn,
767 : prev_lsn,
768 : full_backup,
769 : &ctx,
770 : )
771 : .await?;
772 : } else {
773 : let mut writer = pgb.copyout_writer();
774 : if gzip {
775 : let mut encoder = GzipEncoder::with_quality(
776 : writer,
777 : // NOTE using fast compression because it's on the critical path
778 : // for compute startup. For an empty database, we get
779 : // <100KB with this method. The Level::Best compression method
780 : // gives us <20KB, but maybe we should add basebackup caching
781 : // on compute shutdown first.
782 : async_compression::Level::Fastest,
783 : );
784 : basebackup::send_basebackup_tarball(
785 : &mut encoder,
786 : &timeline,
787 : lsn,
788 : prev_lsn,
789 : full_backup,
790 : &ctx,
791 : )
792 : .await?;
793 : // shutdown the encoder to ensure the gzip footer is written
794 : encoder.shutdown().await?;
795 : } else {
796 : basebackup::send_basebackup_tarball(
797 : &mut writer,
798 : &timeline,
799 : lsn,
800 : prev_lsn,
801 : full_backup,
802 : &ctx,
803 : )
804 : .await?;
805 : }
806 : }
807 :
808 : pgb.write_message_noflush(&BeMessage::CopyDone)?;
809 : pgb.flush().await?;
810 :
811 : let basebackup_after = started
812 : .elapsed()
813 : .checked_sub(lsn_awaited_after)
814 : .unwrap_or(Duration::ZERO);
815 :
816 660 : info!(
817 660 : lsn_await_millis = lsn_awaited_after.as_millis(),
818 660 : basebackup_millis = basebackup_after.as_millis(),
819 660 : "basebackup complete"
820 660 : );
821 :
822 : Ok(())
823 : }
824 :
825 : // When accessing the management api, supply None as an argument.
826 : // When authorizing a tenant, pass the corresponding tenant id.
827 5314 : fn check_permission(&self, tenant_id: Option<TenantId>) -> anyhow::Result<()> {
828 5314 : if self.auth.is_none() {
829 : // auth is set to Trust, nothing to check so just return ok
830 5236 : return Ok(());
831 78 : }
832 78 : // auth is Some, as just checked above. When auth is Some,
833 78 : // claims are always present because of checks during connection init,
834 78 : // so this expect won't trigger.
835 78 : let claims = self
836 78 : .claims
837 78 : .as_ref()
838 78 : .expect("claims presence already checked");
839 78 : check_permission(claims, tenant_id)
840 5314 : }
841 : }
842 :
843 : #[async_trait::async_trait]
844 : impl<IO> postgres_backend::Handler<IO> for PageServerHandler
845 : where
846 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
847 : {
848 80 : fn check_auth_jwt(
849 80 : &mut self,
850 80 : _pgb: &mut PostgresBackend<IO>,
851 80 : jwt_response: &[u8],
852 80 : ) -> Result<(), QueryError> {
853 : // this unwrap never triggers, because check_auth_jwt is only called when auth_type is NeonJWT,
854 : // which requires auth to be present
855 80 : let data = self
856 80 : .auth
857 80 : .as_ref()
858 80 : .unwrap()
859 80 : .decode(str::from_utf8(jwt_response).context("jwt response is not UTF-8")?)?;
860 :
861 80 : if matches!(data.claims.scope, Scope::Tenant) && data.claims.tenant_id.is_none() {
862 0 : return Err(QueryError::Other(anyhow::anyhow!(
863 0 : "jwt token scope is Tenant, but tenant id is missing"
864 0 : )));
865 80 : }
866 80 :
867 80 : info!(
868 80 : "jwt auth succeeded for scope: {:#?} by tenant id: {:?}",
869 80 : data.claims.scope, data.claims.tenant_id,
870 80 : );
871 :
872 80 : self.claims = Some(data.claims);
873 80 : Ok(())
874 80 : }
875 :
876 5327 : fn startup(
877 5327 : &mut self,
878 5327 : _pgb: &mut PostgresBackend<IO>,
879 5327 : _sm: &FeStartupPacket,
880 5327 : ) -> Result<(), QueryError> {
881 5327 : Ok(())
882 5327 : }
883 :
884 10668 : #[instrument(skip_all, fields(tenant_id, timeline_id))]
885 : async fn process_query(
886 : &mut self,
887 : pgb: &mut PostgresBackend<IO>,
888 : query_string: &str,
889 5334 : ) -> Result<(), QueryError> {
890 5334 : let ctx = self.connection_ctx.attached_child();
891 5334 : debug!("process query {query_string:?}");
892 :
893 5334 : if query_string.starts_with("pagestream ") {
894 4622 : let (_, params_raw) = query_string.split_at("pagestream ".len());
895 4622 : let params = params_raw.split(' ').collect::<Vec<_>>();
896 4622 : if params.len() != 2 {
897 0 : return Err(QueryError::Other(anyhow::anyhow!(
898 0 : "invalid param number for pagestream command"
899 0 : )));
900 4622 : }
901 4622 : let tenant_id = TenantId::from_str(params[0])
902 4622 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
903 4622 : let timeline_id = TimelineId::from_str(params[1])
904 4622 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
905 :
906 4622 : tracing::Span::current()
907 4622 : .record("tenant_id", field::display(tenant_id))
908 4622 : .record("timeline_id", field::display(timeline_id));
909 4622 :
910 4622 : self.check_permission(Some(tenant_id))?;
911 :
912 4622 : self.handle_pagerequests(pgb, tenant_id, timeline_id, ctx)
913 12981472 : .await?;
914 712 : } else if query_string.starts_with("basebackup ") {
915 660 : let (_, params_raw) = query_string.split_at("basebackup ".len());
916 660 : let params = params_raw.split_whitespace().collect::<Vec<_>>();
917 660 :
918 660 : if params.len() < 2 {
919 0 : return Err(QueryError::Other(anyhow::anyhow!(
920 0 : "invalid param number for basebackup command"
921 0 : )));
922 660 : }
923 :
924 660 : let tenant_id = TenantId::from_str(params[0])
925 660 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
926 660 : let timeline_id = TimelineId::from_str(params[1])
927 660 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
928 :
929 660 : tracing::Span::current()
930 660 : .record("tenant_id", field::display(tenant_id))
931 660 : .record("timeline_id", field::display(timeline_id));
932 660 :
933 660 : self.check_permission(Some(tenant_id))?;
934 :
935 660 : let lsn = if params.len() >= 3 {
936 : Some(
937 213 : Lsn::from_str(params[2])
938 213 : .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?,
939 : )
940 : } else {
941 447 : None
942 : };
943 :
944 660 : let gzip = if params.len() >= 4 {
945 213 : if params[3] == "--gzip" {
946 213 : true
947 : } else {
948 0 : return Err(QueryError::Other(anyhow::anyhow!(
949 0 : "Parameter in position 3 unknown {}",
950 0 : params[3],
951 0 : )));
952 : }
953 : } else {
954 447 : false
955 : };
956 :
957 660 : ::metrics::metric_vec_duration::observe_async_block_duration_by_result(
958 660 : &*metrics::BASEBACKUP_QUERY_TIME,
959 660 : async move {
960 660 : self.handle_basebackup_request(
961 660 : pgb,
962 660 : tenant_id,
963 660 : timeline_id,
964 660 : lsn,
965 660 : None,
966 660 : false,
967 660 : gzip,
968 660 : ctx,
969 660 : )
970 21790 : .await?;
971 651 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
972 651 : anyhow::Ok(())
973 660 : },
974 660 : )
975 21790 : .await?;
976 : }
977 : // return pair of prev_lsn and last_lsn
978 52 : else if query_string.starts_with("get_last_record_rlsn ") {
979 11 : let (_, params_raw) = query_string.split_at("get_last_record_rlsn ".len());
980 11 : let params = params_raw.split_whitespace().collect::<Vec<_>>();
981 11 :
982 11 : if params.len() != 2 {
983 0 : return Err(QueryError::Other(anyhow::anyhow!(
984 0 : "invalid param number for get_last_record_rlsn command"
985 0 : )));
986 11 : }
987 :
988 11 : let tenant_id = TenantId::from_str(params[0])
989 11 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
990 11 : let timeline_id = TimelineId::from_str(params[1])
991 11 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
992 :
993 11 : tracing::Span::current()
994 11 : .record("tenant_id", field::display(tenant_id))
995 11 : .record("timeline_id", field::display(timeline_id));
996 11 :
997 11 : self.check_permission(Some(tenant_id))?;
998 9 : let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?;
999 :
1000 9 : let end_of_timeline = timeline.get_last_record_rlsn();
1001 9 :
1002 9 : pgb.write_message_noflush(&BeMessage::RowDescription(&[
1003 9 : RowDescriptor::text_col(b"prev_lsn"),
1004 9 : RowDescriptor::text_col(b"last_lsn"),
1005 9 : ]))?
1006 9 : .write_message_noflush(&BeMessage::DataRow(&[
1007 9 : Some(end_of_timeline.prev.to_string().as_bytes()),
1008 9 : Some(end_of_timeline.last.to_string().as_bytes()),
1009 9 : ]))?
1010 9 : .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1011 : }
1012 : // same as basebackup, but result includes relational data as well
1013 41 : else if query_string.starts_with("fullbackup ") {
1014 9 : let (_, params_raw) = query_string.split_at("fullbackup ".len());
1015 9 : let params = params_raw.split_whitespace().collect::<Vec<_>>();
1016 9 :
1017 9 : if params.len() < 2 {
1018 0 : return Err(QueryError::Other(anyhow::anyhow!(
1019 0 : "invalid param number for fullbackup command"
1020 0 : )));
1021 9 : }
1022 :
1023 9 : let tenant_id = TenantId::from_str(params[0])
1024 9 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1025 9 : let timeline_id = TimelineId::from_str(params[1])
1026 9 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1027 :
1028 9 : tracing::Span::current()
1029 9 : .record("tenant_id", field::display(tenant_id))
1030 9 : .record("timeline_id", field::display(timeline_id));
1031 :
1032 : // The caller is responsible for providing correct lsn and prev_lsn.
1033 9 : let lsn = if params.len() > 2 {
1034 : Some(
1035 9 : Lsn::from_str(params[2])
1036 9 : .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?,
1037 : )
1038 : } else {
1039 0 : None
1040 : };
1041 9 : let prev_lsn = if params.len() > 3 {
1042 : Some(
1043 6 : Lsn::from_str(params[3])
1044 6 : .with_context(|| format!("Failed to parse Lsn from {}", params[3]))?,
1045 : )
1046 : } else {
1047 3 : None
1048 : };
1049 :
1050 9 : self.check_permission(Some(tenant_id))?;
1051 :
1052 : // Check that the timeline exists
1053 9 : self.handle_basebackup_request(
1054 9 : pgb,
1055 9 : tenant_id,
1056 9 : timeline_id,
1057 9 : lsn,
1058 9 : prev_lsn,
1059 9 : true,
1060 9 : false,
1061 9 : ctx,
1062 9 : )
1063 3527 : .await?;
1064 9 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1065 32 : } else if query_string.starts_with("import basebackup ") {
1066 : // Import the `base` section (everything but the wal) of a basebackup.
1067 : // Assumes the tenant already exists on this pageserver.
1068 : //
1069 : // Files are scheduled to be persisted to remote storage, and the
1070 : // caller should poll the http api to check when that is done.
1071 : //
1072 : // Example import command:
1073 : // 1. Get start/end LSN from backup_manifest file
1074 : // 2. Run:
1075 : // cat my_backup/base.tar | psql -h $PAGESERVER \
1076 : // -c "import basebackup $TENANT $TIMELINE $START_LSN $END_LSN $PG_VERSION"
1077 5 : let (_, params_raw) = query_string.split_at("import basebackup ".len());
1078 5 : let params = params_raw.split_whitespace().collect::<Vec<_>>();
1079 5 : if params.len() != 5 {
1080 0 : return Err(QueryError::Other(anyhow::anyhow!(
1081 0 : "invalid param number for import basebackup command"
1082 0 : )));
1083 5 : }
1084 5 : let tenant_id = TenantId::from_str(params[0])
1085 5 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1086 5 : let timeline_id = TimelineId::from_str(params[1])
1087 5 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1088 5 : let base_lsn = Lsn::from_str(params[2])
1089 5 : .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?;
1090 5 : let end_lsn = Lsn::from_str(params[3])
1091 5 : .with_context(|| format!("Failed to parse Lsn from {}", params[3]))?;
1092 5 : let pg_version = u32::from_str(params[4])
1093 5 : .with_context(|| format!("Failed to parse pg_version from {}", params[4]))?;
1094 :
1095 5 : tracing::Span::current()
1096 5 : .record("tenant_id", field::display(tenant_id))
1097 5 : .record("timeline_id", field::display(timeline_id));
1098 5 :
1099 5 : self.check_permission(Some(tenant_id))?;
1100 :
1101 5 : match self
1102 5 : .handle_import_basebackup(
1103 5 : pgb,
1104 5 : tenant_id,
1105 5 : timeline_id,
1106 5 : base_lsn,
1107 5 : end_lsn,
1108 5 : pg_version,
1109 5 : ctx,
1110 5 : )
1111 546 : .await
1112 : {
1113 3 : Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
1114 2 : Err(e) => {
1115 2 : error!("error importing base backup between {base_lsn} and {end_lsn}: {e:?}");
1116 2 : pgb.write_message_noflush(&BeMessage::ErrorResponse(
1117 2 : &e.to_string(),
1118 2 : Some(e.pg_error_code()),
1119 2 : ))?
1120 : }
1121 : };
1122 27 : } else if query_string.starts_with("import wal ") {
1123 : // Import the `pg_wal` section of a basebackup.
1124 : //
1125 : // Files are scheduled to be persisted to remote storage, and the
1126 : // caller should poll the http api to check when that is done.
1127 2 : let (_, params_raw) = query_string.split_at("import wal ".len());
1128 2 : let params = params_raw.split_whitespace().collect::<Vec<_>>();
1129 2 : if params.len() != 4 {
1130 0 : return Err(QueryError::Other(anyhow::anyhow!(
1131 0 : "invalid param number for import wal command"
1132 0 : )));
1133 2 : }
1134 2 : let tenant_id = TenantId::from_str(params[0])
1135 2 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1136 2 : let timeline_id = TimelineId::from_str(params[1])
1137 2 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1138 2 : let start_lsn = Lsn::from_str(params[2])
1139 2 : .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?;
1140 2 : let end_lsn = Lsn::from_str(params[3])
1141 2 : .with_context(|| format!("Failed to parse Lsn from {}", params[3]))?;
1142 :
1143 2 : tracing::Span::current()
1144 2 : .record("tenant_id", field::display(tenant_id))
1145 2 : .record("timeline_id", field::display(timeline_id));
1146 2 :
1147 2 : self.check_permission(Some(tenant_id))?;
1148 :
1149 2 : match self
1150 2 : .handle_import_wal(pgb, tenant_id, timeline_id, start_lsn, end_lsn, ctx)
1151 124 : .await
1152 : {
1153 2 : Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
1154 0 : Err(e) => {
1155 0 : error!("error importing WAL between {start_lsn} and {end_lsn}: {e:?}");
1156 0 : pgb.write_message_noflush(&BeMessage::ErrorResponse(
1157 0 : &e.to_string(),
1158 0 : Some(e.pg_error_code()),
1159 0 : ))?
1160 : }
1161 : };
1162 25 : } else if query_string.to_ascii_lowercase().starts_with("set ") {
1163 : // important because psycopg2 executes "SET datestyle TO 'ISO'"
1164 : // on connect
1165 20 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1166 5 : } else if query_string.starts_with("show ") {
1167 : // show <tenant_id>
1168 5 : let (_, params_raw) = query_string.split_at("show ".len());
1169 5 : let params = params_raw.split(' ').collect::<Vec<_>>();
1170 5 : if params.len() != 1 {
1171 0 : return Err(QueryError::Other(anyhow::anyhow!(
1172 0 : "invalid param number for config command"
1173 0 : )));
1174 5 : }
1175 5 : let tenant_id = TenantId::from_str(params[0])
1176 5 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1177 :
1178 5 : tracing::Span::current().record("tenant_id", field::display(tenant_id));
1179 5 :
1180 5 : self.check_permission(Some(tenant_id))?;
1181 :
1182 5 : let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
1183 5 : pgb.write_message_noflush(&BeMessage::RowDescription(&[
1184 5 : RowDescriptor::int8_col(b"checkpoint_distance"),
1185 5 : RowDescriptor::int8_col(b"checkpoint_timeout"),
1186 5 : RowDescriptor::int8_col(b"compaction_target_size"),
1187 5 : RowDescriptor::int8_col(b"compaction_period"),
1188 5 : RowDescriptor::int8_col(b"compaction_threshold"),
1189 5 : RowDescriptor::int8_col(b"gc_horizon"),
1190 5 : RowDescriptor::int8_col(b"gc_period"),
1191 5 : RowDescriptor::int8_col(b"image_creation_threshold"),
1192 5 : RowDescriptor::int8_col(b"pitr_interval"),
1193 5 : ]))?
1194 5 : .write_message_noflush(&BeMessage::DataRow(&[
1195 5 : Some(tenant.get_checkpoint_distance().to_string().as_bytes()),
1196 5 : Some(
1197 5 : tenant
1198 5 : .get_checkpoint_timeout()
1199 5 : .as_secs()
1200 5 : .to_string()
1201 5 : .as_bytes(),
1202 5 : ),
1203 5 : Some(tenant.get_compaction_target_size().to_string().as_bytes()),
1204 5 : Some(
1205 5 : tenant
1206 5 : .get_compaction_period()
1207 5 : .as_secs()
1208 5 : .to_string()
1209 5 : .as_bytes(),
1210 5 : ),
1211 5 : Some(tenant.get_compaction_threshold().to_string().as_bytes()),
1212 5 : Some(tenant.get_gc_horizon().to_string().as_bytes()),
1213 5 : Some(tenant.get_gc_period().as_secs().to_string().as_bytes()),
1214 5 : Some(tenant.get_image_creation_threshold().to_string().as_bytes()),
1215 5 : Some(tenant.get_pitr_interval().as_secs().to_string().as_bytes()),
1216 5 : ]))?
1217 5 : .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1218 : } else {
1219 0 : return Err(QueryError::Other(anyhow::anyhow!(
1220 0 : "unknown command {query_string}"
1221 0 : )));
1222 : }
1223 :
1224 5181 : Ok(())
1225 10601 : }
1226 : }
1227 :
1228 6 : #[derive(thiserror::Error, Debug)]
1229 : enum GetActiveTenantError {
1230 : #[error(
1231 : "Timed out waiting {wait_time:?} for tenant active state. Latest state: {latest_state:?}"
1232 : )]
1233 : WaitForActiveTimeout {
1234 : latest_state: TenantState,
1235 : wait_time: Duration,
1236 : },
1237 : #[error(transparent)]
1238 : NotFound(GetTenantError),
1239 : #[error(transparent)]
1240 : WaitTenantActive(tenant::WaitToBecomeActiveError),
1241 : }
1242 :
1243 : impl From<GetActiveTenantError> for QueryError {
1244 69 : fn from(e: GetActiveTenantError) -> Self {
1245 69 : match e {
1246 0 : GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected(
1247 0 : ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
1248 0 : ),
1249 64 : GetActiveTenantError::WaitTenantActive(e) => QueryError::Other(anyhow::Error::new(e)),
1250 5 : GetActiveTenantError::NotFound(e) => QueryError::Other(anyhow::Error::new(e)),
1251 : }
1252 69 : }
1253 : }
1254 :
1255 : /// Get active tenant.
1256 : ///
1257 : /// If the tenant is Loading, waits for it to become Active, for up to 30 s. That
1258 : /// ensures that queries don't fail immediately after pageserver startup, because
1259 : /// all tenants are still loading.
1260 5312 : async fn get_active_tenant_with_timeout(
1261 5312 : tenant_id: TenantId,
1262 5312 : _ctx: &RequestContext, /* require get a context to support cancellation in the future */
1263 5312 : ) -> Result<Arc<Tenant>, GetActiveTenantError> {
1264 5312 : let tenant = match mgr::get_tenant(tenant_id, false).await {
1265 5307 : Ok(tenant) => tenant,
1266 5 : Err(e @ GetTenantError::NotFound(_)) => return Err(GetActiveTenantError::NotFound(e)),
1267 : Err(GetTenantError::NotActive(_)) => {
1268 0 : unreachable!("we're calling get_tenant with active=false")
1269 : }
1270 : };
1271 5307 : let wait_time = Duration::from_secs(30);
1272 5307 : match tokio::time::timeout(wait_time, tenant.wait_to_become_active()).await {
1273 5241 : Ok(Ok(())) => Ok(tenant),
1274 : // no .context(), the error message is good enough and some tests depend on it
1275 66 : Ok(Err(e)) => Err(GetActiveTenantError::WaitTenantActive(e)),
1276 : Err(_) => {
1277 0 : let latest_state = tenant.current_state();
1278 0 : if latest_state == TenantState::Active {
1279 0 : Ok(tenant)
1280 : } else {
1281 0 : Err(GetActiveTenantError::WaitForActiveTimeout {
1282 0 : latest_state,
1283 0 : wait_time,
1284 0 : })
1285 : }
1286 : }
1287 : }
1288 5312 : }
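// Typical call site, as used by the request handlers above; the
// `From<GetActiveTenantError> for QueryError` impl lets them propagate
// failures with `?`:
//
// let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;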
1289 :
1290 6 : #[derive(Debug, thiserror::Error)]
1291 : enum GetActiveTimelineError {
1292 : #[error(transparent)]
1293 : Tenant(GetActiveTenantError),
1294 : #[error(transparent)]
1295 : Timeline(anyhow::Error),
1296 : }
1297 :
1298 : impl From<GetActiveTimelineError> for QueryError {
1299 0 : fn from(e: GetActiveTimelineError) -> Self {
1300 0 : match e {
1301 0 : GetActiveTimelineError::Tenant(e) => e.into(),
1302 0 : GetActiveTimelineError::Timeline(e) => QueryError::Other(e),
1303 : }
1304 0 : }
1305 : }
1306 :
1307 : /// Shorthand for getting a reference to a Timeline of an Active tenant.
1308 680 : async fn get_active_tenant_timeline(
1309 680 : tenant_id: TenantId,
1310 680 : timeline_id: TimelineId,
1311 680 : ctx: &RequestContext,
1312 680 : ) -> Result<Arc<Timeline>, GetActiveTimelineError> {
1313 680 : let tenant = get_active_tenant_with_timeout(tenant_id, ctx)
1314 8 : .await
1315 680 : .map_err(GetActiveTimelineError::Tenant)?;
1316 678 : let timeline = tenant
1317 678 : .get_timeline(timeline_id, true)
1318 678 : .map_err(|e| GetActiveTimelineError::Timeline(anyhow::anyhow!(e)))?;
1319 678 : Ok(timeline)
1320 680 : }