TLA Line data Source code
1 : //
2 : //! The Page Service listens for client connections and serves their GetPage@LSN
3 : //! requests.
4 : //
5 : // It is possible to connect here using the usual psql/pgbench/libpq. The
6 : // following commands are currently supported (see `process_query` for the full list):
7 : // *pagestream* -- enter the mode where the smgr and the pageserver talk
8 : // their custom protocol,
9 : // *basebackup* -- stream a tarball of the timeline contents for compute startup.
10 : //
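 : // For example (illustrative invocations; the host, port, and IDs depend on the
 : // deployment):
 : //
 : //   psql -h <pageserver> -c "pagestream <tenant_id> <timeline_id>"
 : //   psql -h <pageserver> -c "basebackup <tenant_id> <timeline_id> [lsn] [--gzip]"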
11 :
12 : use anyhow::Context;
13 : use async_compression::tokio::write::GzipEncoder;
14 : use bytes::Buf;
15 : use bytes::Bytes;
16 : use futures::Stream;
17 : use pageserver_api::models::TenantState;
18 : use pageserver_api::models::{
19 : PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
20 : PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
21 : PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
22 : PagestreamNblocksRequest, PagestreamNblocksResponse,
23 : };
24 : use postgres_backend::{self, is_expected_io_error, AuthType, PostgresBackend, QueryError};
25 : use pq_proto::framed::ConnectionError;
26 : use pq_proto::FeStartupPacket;
27 : use pq_proto::{BeMessage, FeMessage, RowDescriptor};
28 : use std::io;
29 : use std::net::TcpListener;
30 : use std::pin::pin;
31 : use std::str;
32 : use std::str::FromStr;
33 : use std::sync::Arc;
34 : use std::time::Duration;
35 : use tokio::io::AsyncWriteExt;
36 : use tokio::io::{AsyncRead, AsyncWrite};
37 : use tokio_util::io::StreamReader;
38 : use tokio_util::sync::CancellationToken;
39 : use tracing::field;
40 : use tracing::*;
41 : use utils::id::ConnectionId;
42 : use utils::{
43 : auth::{Claims, JwtAuth, Scope},
44 : id::{TenantId, TimelineId},
45 : lsn::Lsn,
46 : simple_rcu::RcuReadGuard,
47 : };
48 :
49 : use crate::auth::check_permission;
50 : use crate::basebackup;
51 : use crate::config::PageServerConf;
52 : use crate::context::{DownloadBehavior, RequestContext};
53 : use crate::import_datadir::import_wal_from_tar;
54 : use crate::metrics;
55 : use crate::metrics::LIVE_CONNECTIONS_COUNT;
56 : use crate::task_mgr;
57 : use crate::task_mgr::TaskKind;
58 : use crate::tenant;
59 : use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
60 : use crate::tenant::mgr;
61 : use crate::tenant::mgr::GetTenantError;
62 : use crate::tenant::{Tenant, Timeline};
63 : use crate::trace::Tracer;
64 :
65 : use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
66 : use postgres_ffi::BLCKSZ;
67 :
68 : /// Read the end of a tar archive.
69 : ///
70 : /// A tar archive normally ends with two consecutive blocks of zeros, 512 bytes each.
71 : /// `tokio_tar` has already read the first such block. Read the second all-zeros block,
72 : /// and check that there is no more data after the EOF marker.
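 : ///
 : /// Schematically, a well-formed archive ends like this (sizes in bytes):
 : ///
 : ///   [entry data ...][512 zeros][512 zeros]
 : ///                   ^ reader position on entry to this function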
73 : ///
74 : /// XXX: Currently, trailing data after the EOF marker only produces a warning.
75 : /// Perhaps it should be a hard error?
76 CBC 5 : async fn read_tar_eof(mut reader: (impl AsyncRead + Unpin)) -> anyhow::Result<()> {
77 5 : use tokio::io::AsyncReadExt;
78 5 : let mut buf = [0u8; 512];
79 5 :
80 5 : // Read the all-zeros block, and verify it
81 5 : let mut total_bytes = 0;
82 10 : while total_bytes < 512 {
83 5 : let nbytes = reader.read(&mut buf[total_bytes..]).await?;
84 5 : total_bytes += nbytes;
85 5 : if nbytes == 0 {
86 UBC 0 : break;
87 CBC 5 : }
88 : }
89 5 : if total_bytes < 512 {
90 UBC 0 : anyhow::bail!("incomplete or invalid tar EOF marker");
91 CBC 5 : }
92 2560 : if !buf.iter().all(|&x| x == 0) {
93 UBC 0 : anyhow::bail!("invalid tar EOF marker");
94 CBC 5 : }
95 5 :
96 5 : // Drain any data after the EOF marker
97 5 : let mut trailing_bytes = 0;
98 : loop {
99 6 : let nbytes = reader.read(&mut buf).await?;
100 6 : trailing_bytes += nbytes;
101 6 : if nbytes == 0 {
102 5 : break;
103 1 : }
104 : }
105 5 : if trailing_bytes > 0 {
106 1 : warn!("ignored {trailing_bytes} unexpected bytes after the tar archive");
107 4 : }
108 5 : Ok(())
109 5 : }
110 :
111 : ///////////////////////////////////////////////////////////////////////////////
112 :
113 : ///
114 : /// Main loop of the page service.
115 : ///
116 : /// Listens for connections, and launches a new handler task for each.
117 : ///
118 560 : pub async fn libpq_listener_main(
119 560 : conf: &'static PageServerConf,
120 560 : broker_client: storage_broker::BrokerClientChannel,
121 560 : auth: Option<Arc<JwtAuth>>,
122 560 : listener: TcpListener,
123 560 : auth_type: AuthType,
124 560 : listener_ctx: RequestContext,
125 560 : cancel: CancellationToken,
126 560 : ) -> anyhow::Result<()> {
127 560 : listener.set_nonblocking(true)?;
128 560 : let tokio_listener = tokio::net::TcpListener::from_std(listener)?;
129 :
130 : // Wait for a new connection to arrive, or for server shutdown.
131 5679 : while let Some(res) = tokio::select! {
132 : biased;
133 :
134 : _ = cancel.cancelled() => {
135 : // We were requested to shut down.
136 : None
137 : }
138 :
139 5119 : res = tokio_listener.accept() => {
140 : Some(res)
141 : }
142 : } {
143 5119 : match res {
144 5119 : Ok((socket, peer_addr)) => {
145 : // Connection established. Spawn a new task to handle it.
146 UBC 0 : debug!("accepted connection from {}", peer_addr);
147 CBC 5119 : let local_auth = auth.clone();
148 5119 :
149 5119 : let connection_ctx = listener_ctx
150 5119 : .detached_child(TaskKind::PageRequestHandler, DownloadBehavior::Download);
151 5119 :
152 5119 : // PageRequestHandler tasks are not associated with any particular
153 5119 : // timeline in the task manager. In practice most connections will
154 5119 : // only deal with a particular timeline, but we don't know which one
155 5119 : // yet.
156 5119 : task_mgr::spawn(
157 5119 : &tokio::runtime::Handle::current(),
158 5119 : TaskKind::PageRequestHandler,
159 5119 : None,
160 5119 : None,
161 5119 : "serving compute connection task",
162 5119 : false,
163 5119 : page_service_conn_main(
164 5119 : conf,
165 5119 : broker_client.clone(),
166 5119 : local_auth,
167 5119 : socket,
168 5119 : auth_type,
169 5119 : connection_ctx,
170 5119 : ),
171 5119 : );
172 : }
173 UBC 0 : Err(err) => {
174 : // accept() failed. Log the error, and loop back to retry on next connection.
175 0 : error!("accept() failed: {:?}", err);
176 : }
177 : }
178 : }
179 :
180 0 : debug!("page_service loop terminated");
181 :
182 CBC 144 : Ok(())
183 144 : }
184 :
185 15357 : #[instrument(skip_all, fields(peer_addr))]
186 : async fn page_service_conn_main(
187 : conf: &'static PageServerConf,
188 : broker_client: storage_broker::BrokerClientChannel,
189 : auth: Option<Arc<JwtAuth>>,
190 : socket: tokio::net::TcpStream,
191 : auth_type: AuthType,
192 : connection_ctx: RequestContext,
193 : ) -> anyhow::Result<()> {
194 : // Immediately increment the gauge, then register a scope guard to decrement it on task exit.
195 : // One of the pros of `defer!` is that this will *most probably*
196 : // get called, even in the presence of panics.
197 : let gauge = LIVE_CONNECTIONS_COUNT.with_label_values(&["page_service"]);
198 : gauge.inc();
199 5043 : scopeguard::defer! {
200 5043 : gauge.dec();
201 5043 : }
202 :
203 : socket
204 : .set_nodelay(true)
205 : .context("could not set TCP_NODELAY")?;
206 :
207 : let peer_addr = socket.peer_addr().context("get peer address")?;
208 : tracing::Span::current().record("peer_addr", field::display(peer_addr));
209 :
210 : // Set up a read timeout of 10 minutes. The timeout is rather arbitrary; the requirements are:
211 : // - long enough for most valid compute connections
212 : // - less than infinite, to stop us from "leaking" connections to long-gone computes
213 : //
214 : // No write timeout is used, because the kernel is assumed to fail writes after some time.
215 : let mut socket = tokio_io_timeout::TimeoutReader::new(socket);
216 :
217 : // The timeout should be lower, but we are trying out multiple days for
218 : // <https://github.com/neondatabase/neon/issues/4205>
219 : socket.set_timeout(Some(std::time::Duration::from_secs(60 * 60 * 24 * 3)));
220 : let socket = std::pin::pin!(socket);
221 :
222 : // XXX: pgbackend.run() should take the connection_ctx,
223 : // and create a child per-query context when it invokes process_query.
224 : // But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
225 : // and create the per-query context in process_query ourselves.
226 : let mut conn_handler = PageServerHandler::new(
227 : conf,
228 : broker_client,
229 : auth,
230 : connection_ctx,
231 : task_mgr::shutdown_token(),
232 : );
233 : let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
234 :
235 : match pgbackend
236 : .run(&mut conn_handler, task_mgr::shutdown_watcher)
237 : .await
238 : {
239 : Ok(()) => {
240 : // we've been requested to shut down
241 : Ok(())
242 : }
243 : Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
244 : if is_expected_io_error(&io_error) {
245 UBC 0 : info!("Postgres client disconnected ({io_error})");
246 : Ok(())
247 : } else {
248 : Err(io_error).context("Postgres connection error")
249 : }
250 : }
251 : other => other.context("Postgres query error"),
252 : }
253 : }
254 :
255 : struct PageServerHandler {
256 : _conf: &'static PageServerConf,
257 : broker_client: storage_broker::BrokerClientChannel,
258 : auth: Option<Arc<JwtAuth>>,
259 : claims: Option<Claims>,
260 :
261 : /// The context created for the lifetime of the connection
262 : /// serviced by this PageServerHandler.
263 : /// For each query received over the connection,
264 : /// `process_query` creates a child context from this one.
265 : connection_ctx: RequestContext,
266 :
267 : /// A token that fires when the tenant transitions out of the
268 : /// attached state, or when the pageserver is shutting down.
269 : cancel: CancellationToken,
270 : }
271 :
272 : impl PageServerHandler {
273 CBC 5119 : pub fn new(
274 5119 : conf: &'static PageServerConf,
275 5119 : broker_client: storage_broker::BrokerClientChannel,
276 5119 : auth: Option<Arc<JwtAuth>>,
277 5119 : connection_ctx: RequestContext,
278 5119 : cancel: CancellationToken,
279 5119 : ) -> Self {
280 5119 : PageServerHandler {
281 5119 : _conf: conf,
282 5119 : broker_client,
283 5119 : auth,
284 5119 : claims: None,
285 5119 : connection_ctx,
286 5119 : cancel,
287 5119 : }
288 5119 : }
289 :
290 : /// Wrap PostgresBackend::flush to respect our CancellationToken: it is important to use
291 : /// this rather than naked flush() in order to shut down promptly. Without this, we would
292 : /// block shutdown of a tenant if a postgres client was failing to consume bytes we send
293 : /// in the flush.
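 : ///
 : /// For example, handlers in this file write
 : /// `self.flush_cancellable(pgb).await?;` where a plain
 : /// `pgb.flush().await?;` would otherwise be used.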
294 3784731 : async fn flush_cancellable<IO>(&self, pgb: &mut PostgresBackend<IO>) -> Result<(), QueryError>
295 3784731 : where
296 3784731 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
297 3784731 : {
298 3793867 : tokio::select!(
299 3784727 : flush_r = pgb.flush() => {
300 : Ok(flush_r?)
301 : },
302 : _ = self.cancel.cancelled() => {
303 : Err(QueryError::Shutdown)
304 : }
305 : )
306 3784731 : }
307 :
308 7 : fn copyin_stream<'a, IO>(
309 7 : &'a self,
310 7 : pgb: &'a mut PostgresBackend<IO>,
311 7 : ) -> impl Stream<Item = io::Result<Bytes>> + 'a
312 7 : where
313 7 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
314 7 : {
315 7 : async_stream::try_stream! {
316 7 : loop {
317 26355 : let msg = tokio::select! {
318 7 : biased;
319 7 :
320 7 : _ = self.cancel.cancelled() => {
321 7 : // We were requested to shut down.
322 7 : let msg = "pageserver is shutting down";
323 7 : let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, None));
324 7 : Err(QueryError::Shutdown)
325 7 : }
326 7 :
327 26355 : msg = pgb.read_message() => { msg.map_err(QueryError::from)}
328 7 : };
329 7 :
330 26355 : match msg {
331 26355 : Ok(Some(message)) => {
332 26355 : let copy_data_bytes = match message {
333 26342 : FeMessage::CopyData(bytes) => bytes,
334 7 : FeMessage::CopyDone => { break },
335 7 : FeMessage::Sync => continue,
336 7 : FeMessage::Terminate => {
337 7 : let msg = "client terminated connection with Terminate message during COPY";
338 UBC 0 : let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
339 0 : // error can't happen here, ErrorResponse serialization should always be ok
340 0 : pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
341 CBC 7 : Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
342 7 : break;
343 7 : }
344 7 : m => {
345 UBC 0 : let msg = format!("unexpected message {m:?}");
346 0 : // error can't happen here, ErrorResponse serialization should always be ok
347 0 : pgb.write_message_noflush(&BeMessage::ErrorResponse(&msg, None)).map_err(|e| e.into_io_error())?;
348 CBC 7 : Err(io::Error::new(io::ErrorKind::Other, msg))?;
349 7 : break;
350 7 : }
351 7 : };
352 7 :
353 26342 : yield copy_data_bytes;
354 7 : }
355 7 : Ok(None) => {
356 7 : let msg = "client closed connection during COPY";
357 UBC 0 : let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
358 0 : // error can't happen here, ErrorResponse serialization should always be ok
359 0 : pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
360 CBC 7 : self.flush_cancellable(pgb).await.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
361 7 : Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
362 7 : }
363 7 : Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
364 UBC 0 : Err(io_error)?;
365 CBC 7 : }
366 7 : Err(other) => {
367 UBC 0 : Err(io::Error::new(io::ErrorKind::Other, other.to_string()))?;
368 CBC 7 : }
369 7 : };
370 7 : }
371 7 : }
372 7 : }
373 :
374 22110 : #[instrument(skip_all)]
375 : async fn handle_pagerequests<IO>(
376 : &self,
377 : pgb: &mut PostgresBackend<IO>,
378 : tenant_id: TenantId,
379 : timeline_id: TimelineId,
380 : ctx: RequestContext,
381 : ) -> Result<(), QueryError>
382 : where
383 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
384 : {
385 : debug_assert_current_span_has_tenant_and_timeline_id();
386 :
387 : // NOTE: pagerequests handler exits when connection is closed,
388 : // so there is no need to reset the association
389 : task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
390 :
391 : // Make request tracer if needed
392 : let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
393 : let mut tracer = if tenant.get_trace_read_requests() {
394 : let connection_id = ConnectionId::generate();
395 : let path = tenant
396 : .conf
397 : .trace_path(&tenant_id, &timeline_id, &connection_id);
398 : Some(Tracer::new(path))
399 : } else {
400 : None
401 : };
402 :
403 : // Check that the timeline exists
404 : let timeline = tenant
405 : .get_timeline(timeline_id, true)
406 LBC (1) : .map_err(|e| anyhow::anyhow!(e))?;
407 :
408 : // switch client to COPYBOTH
409 : pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
410 : self.flush_cancellable(pgb).await?;
411 :
412 : let metrics = metrics::SmgrQueryTimePerTimeline::new(&tenant_id, &timeline_id);
413 :
414 : loop {
415 CBC 7193794 : let msg = tokio::select! {
416 : biased;
417 :
418 : _ = self.cancel.cancelled() => {
419 : // We were requested to shut down.
420 105 : info!("shutdown request received in page handler");
421 : return Err(QueryError::Shutdown)
422 : }
423 :
424 3783293 : msg = pgb.read_message() => { msg }
425 : };
426 :
427 : let copy_data_bytes = match msg? {
428 : Some(FeMessage::CopyData(bytes)) => bytes,
429 : Some(FeMessage::Terminate) => break,
430 : Some(m) => {
431 : return Err(QueryError::Other(anyhow::anyhow!(
432 : "unexpected message: {m:?} during COPY"
433 : )));
434 : }
435 : None => break, // client disconnected
436 : };
437 :
438 UBC 0 : trace!("query: {copy_data_bytes:?}");
439 :
440 : // Trace request if needed
441 : if let Some(t) = tracer.as_mut() {
442 : t.trace(©_data_bytes)
443 : }
444 :
445 : let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
446 :
447 : // TODO: We could create a new per-request context here, with unique ID.
448 : // Currently we use the same per-timeline context for all requests
449 :
450 : let (response, span) = match neon_fe_msg {
451 : PagestreamFeMessage::Exists(req) => {
452 : let _timer = metrics.start_timer(metrics::SmgrQueryType::GetRelExists);
453 : let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.lsn);
454 : (
455 : self.handle_get_rel_exists_request(&timeline, &req, &ctx)
456 : .instrument(span.clone())
457 : .await,
458 : span,
459 : )
460 : }
461 : PagestreamFeMessage::Nblocks(req) => {
462 : let _timer = metrics.start_timer(metrics::SmgrQueryType::GetRelSize);
463 : let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.lsn);
464 : (
465 : self.handle_get_nblocks_request(&timeline, &req, &ctx)
466 : .instrument(span.clone())
467 : .await,
468 : span,
469 : )
470 : }
471 : PagestreamFeMessage::GetPage(req) => {
472 : let _timer = metrics.start_timer(metrics::SmgrQueryType::GetPageAtLsn);
473 : let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn);
474 : (
475 : self.handle_get_page_at_lsn_request(&timeline, &req, &ctx)
476 : .instrument(span.clone())
477 : .await,
478 : span,
479 : )
480 : }
481 : PagestreamFeMessage::DbSize(req) => {
482 : let _timer = metrics.start_timer(metrics::SmgrQueryType::GetDbSize);
483 : let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.lsn);
484 : (
485 : self.handle_db_size_request(&timeline, &req, &ctx)
486 : .instrument(span.clone())
487 : .await,
488 : span,
489 : )
490 : }
491 : };
492 :
493 CBC 1 : let response = response.unwrap_or_else(|e| {
494 1 : // print the all details to the log with {:#}, but for the client the
495 1 : // error message is enough
496 1 : span.in_scope(|| error!("error reading relation or page version: {:#}", e));
497 1 : PagestreamBeMessage::Error(PagestreamErrorResponse {
498 1 : message: e.to_string(),
499 1 : })
500 1 : });
501 :
502 : pgb.write_message_noflush(&BeMessage::CopyData(&response.serialize()))?;
503 : self.flush_cancellable(pgb).await?;
504 : }
505 : Ok(())
506 : }
507 :
508 : #[allow(clippy::too_many_arguments)]
509 10 : #[instrument(skip_all, fields(%base_lsn, end_lsn=%_end_lsn, %pg_version))]
510 : async fn handle_import_basebackup<IO>(
511 : &self,
512 : pgb: &mut PostgresBackend<IO>,
513 : tenant_id: TenantId,
514 : timeline_id: TimelineId,
515 : base_lsn: Lsn,
516 : _end_lsn: Lsn,
517 : pg_version: u32,
518 : ctx: RequestContext,
519 : ) -> Result<(), QueryError>
520 : where
521 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
522 : {
523 : debug_assert_current_span_has_tenant_and_timeline_id();
524 :
525 : task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
526 : // Create empty timeline
527 5 : info!("creating new timeline");
528 : let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
529 : let timeline = tenant
530 : .create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
531 : .await?;
532 :
533 : // TODO mark timeline as not ready until it reaches end_lsn.
534 : // We might have some wal to import as well, and we should prevent compute
535 : // from connecting before that and writing conflicting wal.
536 : //
537 : // This is not relevant for pageserver->pageserver migrations, since there's
538 : // no wal to import. But should be fixed if we want to import from postgres.
539 :
540 : // TODO leave clean state on error. For now you can use detach to clean
541 : // up broken state from a failed import.
542 :
543 : // Import basebackup provided via CopyData
544 5 : info!("importing basebackup");
545 : pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
546 : self.flush_cancellable(pgb).await?;
547 :
548 : let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb)));
549 : timeline
550 : .import_basebackup_from_tar(
551 : &mut copyin_reader,
552 : base_lsn,
553 : self.broker_client.clone(),
554 : &ctx,
555 : )
556 : .await?;
557 :
558 : // Read the end of the tar archive.
559 : read_tar_eof(copyin_reader).await?;
560 :
561 : // TODO check checksum
562 : // Meanwhile you can verify client-side by taking fullbackup
563 : // and checking that it matches in size with what was imported.
564 : // It wouldn't work if base came from vanilla postgres though,
565 : // since we discard some log files.
566 :
567 3 : info!("done");
568 : Ok(())
569 : }
570 :
571 6 : #[instrument(skip_all, fields(%start_lsn, %end_lsn))]
572 : async fn handle_import_wal<IO>(
573 : &self,
574 : pgb: &mut PostgresBackend<IO>,
575 : tenant_id: TenantId,
576 : timeline_id: TimelineId,
577 : start_lsn: Lsn,
578 : end_lsn: Lsn,
579 : ctx: RequestContext,
580 : ) -> Result<(), QueryError>
581 : where
582 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
583 : {
584 : debug_assert_current_span_has_tenant_and_timeline_id();
585 : task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
586 :
587 : let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?;
588 : let last_record_lsn = timeline.get_last_record_lsn();
589 : if last_record_lsn != start_lsn {
590 : return Err(QueryError::Other(
591 : anyhow::anyhow!("Cannot import WAL from Lsn {start_lsn} because timeline does not start from the same lsn: {last_record_lsn}"))
592 : );
593 : }
594 :
595 : // TODO leave clean state on error. For now you can use detach to clean
596 : // up broken state from a failed import.
597 :
598 : // Import wal provided via CopyData
599 2 : info!("importing wal");
600 : pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
601 : self.flush_cancellable(pgb).await?;
602 : let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb)));
603 : import_wal_from_tar(&timeline, &mut copyin_reader, start_lsn, end_lsn, &ctx).await?;
604 2 : info!("wal import complete");
605 :
606 : // Read the end of the tar archive.
607 : read_tar_eof(copyin_reader).await?;
608 :
609 : // TODO Does it make sense to overshoot?
610 : if timeline.get_last_record_lsn() < end_lsn {
611 : return Err(QueryError::Other(
612 : anyhow::anyhow!("WAL import did not reach the requested end LSN {end_lsn}; the timeline's last record LSN is {}", timeline.get_last_record_lsn()))
613 : );
614 : }
615 :
616 : // Flush data to disk, then upload to s3. No need for a forced checkpoint.
617 : // We only want to persist the data, and it doesn't matter if it's in the
618 : // shape of deltas or images.
619 2 : info!("flushing layers");
620 : timeline.freeze_and_flush().await?;
621 :
622 2 : info!("done");
623 : Ok(())
624 : }
625 :
626 : /// Helper function to handle the LSN from a client request.
627 : ///
628 : /// Each GetPage (and Exists and Nblocks) request includes information about
629 : /// which version of the page is being requested. The client can request the
630 : /// latest version of the page, or the version that's valid at a particular
631 : /// LSN. The primary compute node will always request the latest page
632 : /// version, while a standby will request a version at the LSN that it's
633 : /// currently caught up to.
634 : ///
635 : /// In either case, if the page server hasn't received the WAL up to the
636 : /// requested LSN yet, we will wait for it to arrive. The return value is
637 : /// the LSN that should be used to look up the page versions.
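 : ///
 : /// For example (illustrative numbers): if the last record LSN is 0/200, a
 : /// request with latest=true and lsn=0/150 is answered at 0/200 without
 : /// waiting, while latest=true with lsn=0/250, or latest=false with
 : /// lsn=0/250, first waits for WAL up to 0/250 to arrive.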
638 3779079 : async fn wait_or_get_last_lsn(
639 3779079 : timeline: &Timeline,
640 3779079 : mut lsn: Lsn,
641 3779079 : latest: bool,
642 3779079 : latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
643 3779079 : ctx: &RequestContext,
644 3779079 : ) -> anyhow::Result<Lsn> {
645 3779077 : if latest {
646 : // Latest page version was requested. If LSN is given, it is a hint
647 : // to the page server that there have been no modifications to the
648 : // page after that LSN. If we haven't received WAL up to that point,
649 : // wait until it arrives.
650 3665291 : let last_record_lsn = timeline.get_last_record_lsn();
651 3665291 :
652 3665291 : // Note: this covers the special case that lsn == Lsn(0). That
653 3665291 : // special case means "return the latest version whatever it is",
654 3665291 : // and it's used for bootstrapping purposes, when the page server is
655 3665291 : // connected directly to the compute node. That is needed because
656 3665291 : // when you connect to the compute node, to receive the WAL, the
657 3665291 : // walsender process will do a look up in the pg_authid catalog
658 3665291 : // table for authentication. That poses a deadlock problem: the
659 3665291 : // catalog table lookup will send a GetPage request, but the GetPage
660 3665291 : // request will block in the page server because the recent WAL
661 3665291 : // hasn't been received yet, and it cannot be received until the
662 3665291 : // walsender completes the authentication and starts streaming the
663 3665291 : // WAL.
664 3665291 : if lsn <= last_record_lsn {
665 3562783 : lsn = last_record_lsn;
666 3562783 : } else {
667 145031 : timeline.wait_lsn(lsn, ctx).await?;
668 : // Since we waited for 'lsn' to arrive, that is now the last
669 : // record LSN. (Or close enough for our purposes; the
670 : // last-record LSN can advance immediately after we return
671 : // anyway)
672 : }
673 : } else {
674 113786 : if lsn == Lsn(0) {
675 1 : anyhow::bail!("invalid LSN(0) in request");
676 113785 : }
677 113785 : timeline.wait_lsn(lsn, ctx).await?;
678 : }
679 3779075 : anyhow::ensure!(
680 3779075 : lsn >= **latest_gc_cutoff_lsn,
681 UBC 0 : "tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
682 0 : lsn, **latest_gc_cutoff_lsn
683 : );
684 CBC 3779075 : Ok(lsn)
685 3779076 : }
686 :
687 46663 : async fn handle_get_rel_exists_request(
688 46663 : &self,
689 46663 : timeline: &Timeline,
690 46663 : req: &PagestreamExistsRequest,
691 46663 : ctx: &RequestContext,
692 46663 : ) -> anyhow::Result<PagestreamBeMessage> {
693 46663 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
694 46663 : let lsn =
695 46663 : Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
696 UBC 0 : .await?;
697 :
698 CBC 46663 : let exists = timeline
699 46663 : .get_rel_exists(req.rel, lsn, req.latest, ctx)
700 31900 : .await?;
701 :
702 46663 : Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
703 46663 : exists,
704 46663 : }))
705 46663 : }
706 :
707 17067 : async fn handle_get_nblocks_request(
708 17067 : &self,
709 17067 : timeline: &Timeline,
710 17067 : req: &PagestreamNblocksRequest,
711 17067 : ctx: &RequestContext,
712 17067 : ) -> anyhow::Result<PagestreamBeMessage> {
713 17067 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
714 17067 : let lsn =
715 17067 : Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
716 183 : .await?;
717 :
718 17067 : let n_blocks = timeline.get_rel_size(req.rel, lsn, req.latest, ctx).await?;
719 :
720 17067 : Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
721 17067 : n_blocks,
722 17067 : }))
723 17067 : }
724 :
725 5 : async fn handle_db_size_request(
726 5 : &self,
727 5 : timeline: &Timeline,
728 5 : req: &PagestreamDbSizeRequest,
729 5 : ctx: &RequestContext,
730 5 : ) -> anyhow::Result<PagestreamBeMessage> {
731 5 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
732 5 : let lsn =
733 5 : Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
734 LBC (2) : .await?;
735 :
736 CBC 5 : let total_blocks = timeline
737 5 : .get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, lsn, req.latest, ctx)
738 107 : .await?;
739 5 : let db_size = total_blocks as i64 * BLCKSZ as i64;
740 5 :
741 5 : Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse {
742 5 : db_size,
743 5 : }))
744 5 : }
745 :
746 3715344 : async fn handle_get_page_at_lsn_request(
747 3715344 : &self,
748 3715344 : timeline: &Timeline,
749 3715344 : req: &PagestreamGetPageRequest,
750 3715344 : ctx: &RequestContext,
751 3715344 : ) -> anyhow::Result<PagestreamBeMessage> {
752 3715342 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
753 3715340 : let lsn =
754 3715342 : Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
755 144848 : .await?;
756 : /*
757 : // Add a 1s delay to some requests. The delay helps the requests to
758 : // hit the race condition from github issue #1047 more easily.
759 : use rand::Rng;
760 : if rand::thread_rng().gen::<u8>() < 5 {
761 : std::thread::sleep(std::time::Duration::from_millis(1000));
762 : }
763 : */
764 :
765 3715340 : let page = timeline
766 3715340 : .get_rel_page_at_lsn(req.rel, req.blkno, lsn, req.latest, ctx)
767 5377724 : .await?;
768 :
769 3715309 : Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
770 3715309 : page,
771 3715309 : }))
772 3715310 : }
773 :
774 : #[allow(clippy::too_many_arguments)]
775 1941 : #[instrument(skip_all, fields(?lsn, ?prev_lsn, %full_backup))]
776 : async fn handle_basebackup_request<IO>(
777 : &mut self,
778 : pgb: &mut PostgresBackend<IO>,
779 : tenant_id: TenantId,
780 : timeline_id: TimelineId,
781 : lsn: Option<Lsn>,
782 : prev_lsn: Option<Lsn>,
783 : full_backup: bool,
784 : gzip: bool,
785 : ctx: RequestContext,
786 : ) -> anyhow::Result<()>
787 : where
788 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
789 : {
790 : debug_assert_current_span_has_tenant_and_timeline_id();
791 :
792 : let started = std::time::Instant::now();
793 :
794 : // check that the timeline exists
795 : let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?;
796 : let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
797 : if let Some(lsn) = lsn {
798 : // Backup was requested at a particular LSN. Wait for it to arrive.
799 237 : info!("waiting for {}", lsn);
800 : timeline.wait_lsn(lsn, &ctx).await?;
801 : timeline
802 : .check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
803 : .context("invalid basebackup lsn")?;
804 : }
805 :
806 : let lsn_awaited_after = started.elapsed();
807 :
808 : // switch client to COPYOUT
809 : pgb.write_message_noflush(&BeMessage::CopyOutResponse)?;
810 : self.flush_cancellable(pgb).await?;
811 :
812 : // Send a tarball of the latest layer on the timeline. Compress if not
813 : // fullbackup. TODO Compress in that case too (tests need to be updated)
814 : if full_backup {
815 : let mut writer = pgb.copyout_writer();
816 : basebackup::send_basebackup_tarball(
817 : &mut writer,
818 : &timeline,
819 : lsn,
820 : prev_lsn,
821 : full_backup,
822 : &ctx,
823 : )
824 : .await?;
825 : } else {
826 : let mut writer = pgb.copyout_writer();
827 : if gzip {
828 : let mut encoder = GzipEncoder::with_quality(
829 : writer,
830 : // NOTE using fast compression because it's on the critical path
831 : // for compute startup. For an empty database, we get
832 : // <100KB with this method. The Level::Best compression method
833 : // gives us <20KB, but maybe we should add basebackup caching
834 : // on compute shutdown first.
835 : async_compression::Level::Fastest,
836 : );
837 : basebackup::send_basebackup_tarball(
838 : &mut encoder,
839 : &timeline,
840 : lsn,
841 : prev_lsn,
842 : full_backup,
843 : &ctx,
844 : )
845 : .await?;
846 : // shutdown the encoder to ensure the gzip footer is written
847 : encoder.shutdown().await?;
848 : } else {
849 : basebackup::send_basebackup_tarball(
850 : &mut writer,
851 : &timeline,
852 : lsn,
853 : prev_lsn,
854 : full_backup,
855 : &ctx,
856 : )
857 : .await?;
858 : }
859 : }
860 :
861 : pgb.write_message_noflush(&BeMessage::CopyDone)?;
862 : self.flush_cancellable(pgb).await?;
863 :
864 : let basebackup_after = started
865 : .elapsed()
866 : .checked_sub(lsn_awaited_after)
867 : .unwrap_or(Duration::ZERO);
868 :
869 638 : info!(
870 638 : lsn_await_millis = lsn_awaited_after.as_millis(),
871 638 : basebackup_millis = basebackup_after.as_millis(),
872 638 : "basebackup complete"
873 638 : );
874 :
875 : Ok(())
876 : }
877 :
878 : // When accessing the management API, supply None as an argument.
879 : // When authorizing access to a tenant, pass the corresponding tenant id.
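 : // For example, the query handlers in `process_query` call
 : // `self.check_permission(Some(tenant_id))?` before touching tenant data.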
880 5106 : fn check_permission(&self, tenant_id: Option<TenantId>) -> anyhow::Result<()> {
881 5106 : if self.auth.is_none() {
882 : // auth is set to Trust, nothing to check so just return ok
883 5027 : return Ok(());
884 79 : }
885 79 : // `auth` is Some, as just checked above. When auth is Some,
886 79 : // claims are always present because of checks during connection init,
887 79 : // so this expect won't trigger.
888 79 : let claims = self
889 79 : .claims
890 79 : .as_ref()
891 79 : .expect("claims presence already checked");
892 79 : check_permission(claims, tenant_id)
893 5106 : }
894 : }
895 :
896 : #[async_trait::async_trait]
897 : impl<IO> postgres_backend::Handler<IO> for PageServerHandler
898 : where
899 : IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
900 : {
901 81 : fn check_auth_jwt(
902 81 : &mut self,
903 81 : _pgb: &mut PostgresBackend<IO>,
904 81 : jwt_response: &[u8],
905 81 : ) -> Result<(), QueryError> {
906 : // this unwrap is never triggered, because check_auth_jwt is only called when auth_type is NeonJWT
907 : // which requires auth to be present
908 81 : let data = self
909 81 : .auth
910 81 : .as_ref()
911 81 : .unwrap()
912 81 : .decode(str::from_utf8(jwt_response).context("jwt response is not UTF-8")?)?;
913 :
914 81 : if matches!(data.claims.scope, Scope::Tenant) && data.claims.tenant_id.is_none() {
915 UBC 0 : return Err(QueryError::Other(anyhow::anyhow!(
916 0 : "jwt token scope is Tenant, but tenant id is missing"
917 0 : )));
918 CBC 81 : }
919 81 :
920 81 : info!(
921 81 : "jwt auth succeeded for scope: {:#?} by tenant id: {:?}",
922 81 : data.claims.scope, data.claims.tenant_id,
923 81 : );
924 :
925 81 : self.claims = Some(data.claims);
926 81 : Ok(())
927 81 : }
928 :
929 5119 : fn startup(
930 5119 : &mut self,
931 5119 : _pgb: &mut PostgresBackend<IO>,
932 5119 : _sm: &FeStartupPacket,
933 5119 : ) -> Result<(), QueryError> {
934 5119 : Ok(())
935 5119 : }
936 :
937 10252 : #[instrument(skip_all, fields(tenant_id, timeline_id))]
938 : async fn process_query(
939 : &mut self,
940 : pgb: &mut PostgresBackend<IO>,
941 : query_string: &str,
942 5126 : ) -> Result<(), QueryError> {
943 5126 : let ctx = self.connection_ctx.attached_child();
944 5126 : debug!("process query {query_string:?}");
945 :
946 5126 : if query_string.starts_with("pagestream ") {
947 4436 : let (_, params_raw) = query_string.split_at("pagestream ".len());
948 4436 : let params = params_raw.split(' ').collect::<Vec<_>>();
949 4436 : if params.len() != 2 {
950 UBC 0 : return Err(QueryError::Other(anyhow::anyhow!(
951 0 : "invalid param number for pagestream command"
952 0 : )));
953 CBC 4436 : }
954 4436 : let tenant_id = TenantId::from_str(params[0])
955 4436 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
956 4436 : let timeline_id = TimelineId::from_str(params[1])
957 4436 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
958 :
959 4436 : tracing::Span::current()
960 4436 : .record("tenant_id", field::display(tenant_id))
961 4436 : .record("timeline_id", field::display(timeline_id));
962 4436 :
963 4436 : self.check_permission(Some(tenant_id))?;
964 :
965 4436 : self.handle_pagerequests(pgb, tenant_id, timeline_id, ctx)
966 8974679 : .await?;
967 690 : } else if query_string.starts_with("basebackup ") {
968 638 : let (_, params_raw) = query_string.split_at("basebackup ".len());
969 638 : let params = params_raw.split_whitespace().collect::<Vec<_>>();
970 638 :
971 638 : if params.len() < 2 {
972 UBC 0 : return Err(QueryError::Other(anyhow::anyhow!(
973 0 : "invalid param number for basebackup command"
974 0 : )));
975 CBC 638 : }
976 :
977 638 : let tenant_id = TenantId::from_str(params[0])
978 638 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
979 638 : let timeline_id = TimelineId::from_str(params[1])
980 638 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
981 :
982 638 : tracing::Span::current()
983 638 : .record("tenant_id", field::display(tenant_id))
984 638 : .record("timeline_id", field::display(timeline_id));
985 638 :
986 638 : self.check_permission(Some(tenant_id))?;
987 :
988 638 : let lsn = if params.len() >= 3 {
989 : Some(
990 229 : Lsn::from_str(params[2])
991 229 : .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?,
992 : )
993 : } else {
994 409 : None
995 : };
996 :
997 638 : let gzip = if params.len() >= 4 {
998 229 : if params[3] == "--gzip" {
999 229 : true
1000 : } else {
1001 UBC 0 : return Err(QueryError::Other(anyhow::anyhow!(
1002 0 : "Parameter in position 3 unknown {}",
1003 0 : params[3],
1004 0 : )));
1005 : }
1006 : } else {
1007 CBC 409 : false
1008 : };
1009 :
1010 638 : ::metrics::metric_vec_duration::observe_async_block_duration_by_result(
1011 638 : &*metrics::BASEBACKUP_QUERY_TIME,
1012 638 : async move {
1013 638 : self.handle_basebackup_request(
1014 638 : pgb,
1015 638 : tenant_id,
1016 638 : timeline_id,
1017 638 : lsn,
1018 638 : None,
1019 638 : false,
1020 638 : gzip,
1021 638 : ctx,
1022 638 : )
1023 40022 : .await?;
1024 629 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1025 629 : anyhow::Ok(())
1026 638 : },
1027 638 : )
1028 40022 : .await?;
1029 : }
1030 : // Return the pair of prev_lsn and last_lsn.
1031 52 : else if query_string.starts_with("get_last_record_rlsn ") {
1032 11 : let (_, params_raw) = query_string.split_at("get_last_record_rlsn ".len());
1033 11 : let params = params_raw.split_whitespace().collect::<Vec<_>>();
1034 11 :
1035 11 : if params.len() != 2 {
1036 UBC 0 : return Err(QueryError::Other(anyhow::anyhow!(
1037 0 : "invalid param number for get_last_record_rlsn command"
1038 0 : )));
1039 CBC 11 : }
1040 :
1041 11 : let tenant_id = TenantId::from_str(params[0])
1042 11 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1043 11 : let timeline_id = TimelineId::from_str(params[1])
1044 11 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1045 :
1046 11 : tracing::Span::current()
1047 11 : .record("tenant_id", field::display(tenant_id))
1048 11 : .record("timeline_id", field::display(timeline_id));
1049 11 :
1050 11 : self.check_permission(Some(tenant_id))?;
1051 9 : let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?;
1052 :
1053 9 : let end_of_timeline = timeline.get_last_record_rlsn();
1054 9 :
1055 9 : pgb.write_message_noflush(&BeMessage::RowDescription(&[
1056 9 : RowDescriptor::text_col(b"prev_lsn"),
1057 9 : RowDescriptor::text_col(b"last_lsn"),
1058 9 : ]))?
1059 9 : .write_message_noflush(&BeMessage::DataRow(&[
1060 9 : Some(end_of_timeline.prev.to_string().as_bytes()),
1061 9 : Some(end_of_timeline.last.to_string().as_bytes()),
1062 9 : ]))?
1063 9 : .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1064 : }
1065 : // same as basebackup, but result includes relational data as well
1066 41 : else if query_string.starts_with("fullbackup ") {
1067 9 : let (_, params_raw) = query_string.split_at("fullbackup ".len());
1068 9 : let params = params_raw.split_whitespace().collect::<Vec<_>>();
1069 9 :
1070 9 : if params.len() < 2 {
1071 UBC 0 : return Err(QueryError::Other(anyhow::anyhow!(
1072 0 : "invalid param number for fullbackup command"
1073 0 : )));
1074 CBC 9 : }
1075 :
1076 9 : let tenant_id = TenantId::from_str(params[0])
1077 9 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1078 9 : let timeline_id = TimelineId::from_str(params[1])
1079 9 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1080 :
1081 9 : tracing::Span::current()
1082 9 : .record("tenant_id", field::display(tenant_id))
1083 9 : .record("timeline_id", field::display(timeline_id));
1084 :
1085 : // The caller is responsible for providing correct lsn and prev_lsn.
1086 9 : let lsn = if params.len() > 2 {
1087 : Some(
1088 9 : Lsn::from_str(params[2])
1089 9 : .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?,
1090 : )
1091 : } else {
1092 UBC 0 : None
1093 : };
1094 CBC 9 : let prev_lsn = if params.len() > 3 {
1095 : Some(
1096 6 : Lsn::from_str(params[3])
1097 6 : .with_context(|| format!("Failed to parse Lsn from {}", params[3]))?,
1098 : )
1099 : } else {
1100 3 : None
1101 : };
1102 :
1103 9 : self.check_permission(Some(tenant_id))?;
1104 :
1105 : // Check that the timeline exists
1106 9 : self.handle_basebackup_request(
1107 9 : pgb,
1108 9 : tenant_id,
1109 9 : timeline_id,
1110 9 : lsn,
1111 9 : prev_lsn,
1112 9 : true,
1113 9 : false,
1114 9 : ctx,
1115 9 : )
1116 5788 : .await?;
1117 9 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1118 32 : } else if query_string.starts_with("import basebackup ") {
1119 : // Import the `base` section (everything but the wal) of a basebackup.
1120 : // Assumes the tenant already exists on this pageserver.
1121 : //
1122 : // Files are scheduled to be persisted to remote storage, and the
1123 : // caller should poll the http api to check when that is done.
1124 : //
1125 : // Example import command:
1126 : // 1. Get start/end LSN from backup_manifest file
1127 : // 2. Run:
1128 : // cat my_backup/base.tar | psql -h $PAGESERVER \
1129 : // -c "import basebackup $TENANT $TIMELINE $START_LSN $END_LSN $PG_VERSION"
1130 5 : let (_, params_raw) = query_string.split_at("import basebackup ".len());
1131 5 : let params = params_raw.split_whitespace().collect::<Vec<_>>();
1132 5 : if params.len() != 5 {
1133 UBC 0 : return Err(QueryError::Other(anyhow::anyhow!(
1134 0 : "invalid param number for import basebackup command"
1135 0 : )));
1136 CBC 5 : }
1137 5 : let tenant_id = TenantId::from_str(params[0])
1138 5 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1139 5 : let timeline_id = TimelineId::from_str(params[1])
1140 5 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1141 5 : let base_lsn = Lsn::from_str(params[2])
1142 5 : .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?;
1143 5 : let end_lsn = Lsn::from_str(params[3])
1144 5 : .with_context(|| format!("Failed to parse Lsn from {}", params[3]))?;
1145 5 : let pg_version = u32::from_str(params[4])
1146 5 : .with_context(|| format!("Failed to parse pg_version from {}", params[4]))?;
1147 :
1148 5 : tracing::Span::current()
1149 5 : .record("tenant_id", field::display(tenant_id))
1150 5 : .record("timeline_id", field::display(timeline_id));
1151 5 :
1152 5 : self.check_permission(Some(tenant_id))?;
1153 :
1154 5 : match self
1155 5 : .handle_import_basebackup(
1156 5 : pgb,
1157 5 : tenant_id,
1158 5 : timeline_id,
1159 5 : base_lsn,
1160 5 : end_lsn,
1161 5 : pg_version,
1162 5 : ctx,
1163 5 : )
1164 814 : .await
1165 : {
1166 3 : Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
1167 2 : Err(e) => {
1168 2 : error!("error importing base backup between {base_lsn} and {end_lsn}: {e:?}");
1169 2 : pgb.write_message_noflush(&BeMessage::ErrorResponse(
1170 2 : &e.to_string(),
1171 2 : Some(e.pg_error_code()),
1172 2 : ))?
1173 : }
1174 : };
1175 27 : } else if query_string.starts_with("import wal ") {
1176 : // Import the `pg_wal` section of a basebackup.
1177 : //
1178 : // Files are scheduled to be persisted to remote storage, and the
1179 : // caller should poll the http api to check when that is done.
1180 2 : let (_, params_raw) = query_string.split_at("import wal ".len());
1181 2 : let params = params_raw.split_whitespace().collect::<Vec<_>>();
1182 2 : if params.len() != 4 {
1183 UBC 0 : return Err(QueryError::Other(anyhow::anyhow!(
1184 0 : "invalid param number for import wal command"
1185 0 : )));
1186 CBC 2 : }
1187 2 : let tenant_id = TenantId::from_str(params[0])
1188 2 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1189 2 : let timeline_id = TimelineId::from_str(params[1])
1190 2 : .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
1191 2 : let start_lsn = Lsn::from_str(params[2])
1192 2 : .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?;
1193 2 : let end_lsn = Lsn::from_str(params[3])
1194 2 : .with_context(|| format!("Failed to parse Lsn from {}", params[3]))?;
1195 :
1196 2 : tracing::Span::current()
1197 2 : .record("tenant_id", field::display(tenant_id))
1198 2 : .record("timeline_id", field::display(timeline_id));
1199 2 :
1200 2 : self.check_permission(Some(tenant_id))?;
1201 :
1202 2 : match self
1203 2 : .handle_import_wal(pgb, tenant_id, timeline_id, start_lsn, end_lsn, ctx)
1204 246 : .await
1205 : {
1206 2 : Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
1207 UBC 0 : Err(e) => {
1208 0 : error!("error importing WAL between {start_lsn} and {end_lsn}: {e:?}");
1209 0 : pgb.write_message_noflush(&BeMessage::ErrorResponse(
1210 0 : &e.to_string(),
1211 0 : Some(e.pg_error_code()),
1212 0 : ))?
1213 : }
1214 : };
1215 CBC 25 : } else if query_string.to_ascii_lowercase().starts_with("set ") {
1216 : // important because psycopg2 executes "SET datestyle TO 'ISO'"
1217 : // on connect
1218 20 : pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1219 5 : } else if query_string.starts_with("show ") {
1220 : // show <tenant_id>
1221 5 : let (_, params_raw) = query_string.split_at("show ".len());
1222 5 : let params = params_raw.split(' ').collect::<Vec<_>>();
1223 5 : if params.len() != 1 {
1224 UBC 0 : return Err(QueryError::Other(anyhow::anyhow!(
1225 0 : "invalid param number for config command"
1226 0 : )));
1227 CBC 5 : }
1228 5 : let tenant_id = TenantId::from_str(params[0])
1229 5 : .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
1230 :
1231 5 : tracing::Span::current().record("tenant_id", field::display(tenant_id));
1232 5 :
1233 5 : self.check_permission(Some(tenant_id))?;
1234 :
1235 5 : let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
1236 5 : pgb.write_message_noflush(&BeMessage::RowDescription(&[
1237 5 : RowDescriptor::int8_col(b"checkpoint_distance"),
1238 5 : RowDescriptor::int8_col(b"checkpoint_timeout"),
1239 5 : RowDescriptor::int8_col(b"compaction_target_size"),
1240 5 : RowDescriptor::int8_col(b"compaction_period"),
1241 5 : RowDescriptor::int8_col(b"compaction_threshold"),
1242 5 : RowDescriptor::int8_col(b"gc_horizon"),
1243 5 : RowDescriptor::int8_col(b"gc_period"),
1244 5 : RowDescriptor::int8_col(b"image_creation_threshold"),
1245 5 : RowDescriptor::int8_col(b"pitr_interval"),
1246 5 : ]))?
1247 5 : .write_message_noflush(&BeMessage::DataRow(&[
1248 5 : Some(tenant.get_checkpoint_distance().to_string().as_bytes()),
1249 5 : Some(
1250 5 : tenant
1251 5 : .get_checkpoint_timeout()
1252 5 : .as_secs()
1253 5 : .to_string()
1254 5 : .as_bytes(),
1255 5 : ),
1256 5 : Some(tenant.get_compaction_target_size().to_string().as_bytes()),
1257 5 : Some(
1258 5 : tenant
1259 5 : .get_compaction_period()
1260 5 : .as_secs()
1261 5 : .to_string()
1262 5 : .as_bytes(),
1263 5 : ),
1264 5 : Some(tenant.get_compaction_threshold().to_string().as_bytes()),
1265 5 : Some(tenant.get_gc_horizon().to_string().as_bytes()),
1266 5 : Some(tenant.get_gc_period().as_secs().to_string().as_bytes()),
1267 5 : Some(tenant.get_image_creation_threshold().to_string().as_bytes()),
1268 5 : Some(tenant.get_pitr_interval().as_secs().to_string().as_bytes()),
1269 5 : ]))?
1270 5 : .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
1271 : } else {
1272 UBC 0 : return Err(QueryError::Other(anyhow::anyhow!(
1273 0 : "unknown command {query_string}"
1274 0 : )));
1275 : }
1276 :
1277 CBC 4888 : Ok(())
1278 10176 : }
1279 : }
1280 :
1281 3 : #[derive(thiserror::Error, Debug)]
1282 : enum GetActiveTenantError {
1283 : #[error(
1284 : "Timed out waiting {wait_time:?} for tenant active state. Latest state: {latest_state:?}"
1285 : )]
1286 : WaitForActiveTimeout {
1287 : latest_state: TenantState,
1288 : wait_time: Duration,
1289 : },
1290 : #[error(transparent)]
1291 : NotFound(GetTenantError),
1292 : #[error(transparent)]
1293 : WaitTenantActive(tenant::WaitToBecomeActiveError),
1294 : }
1295 :
1296 : impl From<GetActiveTenantError> for QueryError {
1297 35 : fn from(e: GetActiveTenantError) -> Self {
1298 35 : match e {
1299 UBC 0 : GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected(
1300 0 : ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
1301 0 : ),
1302 CBC 30 : GetActiveTenantError::WaitTenantActive(e) => QueryError::Other(anyhow::Error::new(e)),
1303 5 : GetActiveTenantError::NotFound(e) => QueryError::Other(anyhow::Error::new(e)),
1304 : }
1305 35 : }
1306 : }
1307 :
1308 : /// Get active tenant.
1309 : ///
1310 : /// If the tenant is Loading, waits for it to become Active, for up to 30 s. That
1311 : /// ensures that queries don't fail immediately after pageserver startup, because
1312 : /// all tenants are still loading.
1313 5104 : async fn get_active_tenant_with_timeout(
1314 5104 : tenant_id: TenantId,
1315 5104 : _ctx: &RequestContext, /* require get a context to support cancellation in the future */
1316 5104 : ) -> Result<Arc<Tenant>, GetActiveTenantError> {
1317 5104 : let tenant = match mgr::get_tenant(tenant_id, false).await {
1318 5099 : Ok(tenant) => tenant,
1319 5 : Err(e @ GetTenantError::NotFound(_)) => return Err(GetActiveTenantError::NotFound(e)),
1320 : Err(GetTenantError::NotActive(_)) => {
1321 UBC 0 : unreachable!("we're calling get_tenant with active_only=false")
1322 : }
1323 : Err(GetTenantError::Broken(_)) => {
1324 0 : unreachable!("we're calling get_tenant with active_only=false")
1325 : }
1326 : };
1327 CBC 5099 : let wait_time = Duration::from_secs(30);
1328 5099 : match tokio::time::timeout(wait_time, tenant.wait_to_become_active()).await {
1329 5068 : Ok(Ok(())) => Ok(tenant),
1330 : // no .context(), the error message is good enough and some tests depend on it
1331 31 : Ok(Err(e)) => Err(GetActiveTenantError::WaitTenantActive(e)),
1332 : Err(_) => {
1333 UBC 0 : let latest_state = tenant.current_state();
1334 0 : if latest_state == TenantState::Active {
1335 0 : Ok(tenant)
1336 : } else {
1337 0 : Err(GetActiveTenantError::WaitForActiveTimeout {
1338 0 : latest_state,
1339 0 : wait_time,
1340 0 : })
1341 : }
1342 : }
1343 : }
1344 CBC 5104 : }
1345 :
1346 6 : #[derive(Debug, thiserror::Error)]
1347 : enum GetActiveTimelineError {
1348 : #[error(transparent)]
1349 : Tenant(GetActiveTenantError),
1350 : #[error(transparent)]
1351 : Timeline(anyhow::Error),
1352 : }
1353 :
1354 : impl From<GetActiveTimelineError> for QueryError {
1355 UBC 0 : fn from(e: GetActiveTimelineError) -> Self {
1356 0 : match e {
1357 0 : GetActiveTimelineError::Tenant(e) => e.into(),
1358 0 : GetActiveTimelineError::Timeline(e) => QueryError::Other(e),
1359 : }
1360 0 : }
1361 : }
1362 :
1363 : /// Shorthand for getting a reference to a Timeline of an Active tenant.
1364 CBC 658 : async fn get_active_tenant_timeline(
1365 658 : tenant_id: TenantId,
1366 658 : timeline_id: TimelineId,
1367 658 : ctx: &RequestContext,
1368 658 : ) -> Result<Arc<Timeline>, GetActiveTimelineError> {
1369 658 : let tenant = get_active_tenant_with_timeout(tenant_id, ctx)
1370 7 : .await
1371 658 : .map_err(GetActiveTimelineError::Tenant)?;
1372 657 : let timeline = tenant
1373 657 : .get_timeline(timeline_id, true)
1374 657 : .map_err(|e| GetActiveTimelineError::Timeline(anyhow::anyhow!(e)))?;
1375 656 : Ok(timeline)
1376 658 : }