|             Line data    Source code 
       1              : use std::pin::Pin;
       2              : 
       3              : use futures::SinkExt;
       4              : use pageserver_api::{
       5              :     models::{
       6              :         PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest,
       7              :         PagestreamGetPageResponse,
       8              :     },
       9              :     reltag::RelTag,
      10              : };
      11              : use tokio::task::JoinHandle;
      12              : use tokio_postgres::CopyOutStream;
      13              : use tokio_stream::StreamExt;
      14              : use tokio_util::sync::CancellationToken;
      15              : use utils::{
      16              :     id::{TenantId, TimelineId},
      17              :     lsn::Lsn,
      18              : };
      19              : 
      20              : pub struct Client {
      21              :     client: tokio_postgres::Client,
      22              :     cancel_on_client_drop: Option<tokio_util::sync::DropGuard>,
      23              :     conn_task: JoinHandle<()>,
      24              : }
      25              : 
      26              : pub struct BasebackupRequest {
      27              :     pub tenant_id: TenantId,
      28              :     pub timeline_id: TimelineId,
      29              :     pub lsn: Option<Lsn>,
      30              :     pub gzip: bool,
      31              : }
      32              : 
      33              : impl Client {
      34            0 :     pub async fn new(connstring: String) -> anyhow::Result<Self> {
      35            0 :         let (client, connection) = tokio_postgres::connect(&connstring, postgres::NoTls).await?;
      36              : 
      37            0 :         let conn_task_cancel = CancellationToken::new();
      38            0 :         let conn_task = tokio::spawn({
      39            0 :             let conn_task_cancel = conn_task_cancel.clone();
      40            0 :             async move {
      41              :                 tokio::select! {
      42              :                     _ = conn_task_cancel.cancelled() => { }
      43              :                     res = connection => {
      44              :                         res.unwrap();
      45              :                     }
      46              :                 }
      47            0 :             }
      48            0 :         });
      49            0 :         Ok(Self {
      50            0 :             cancel_on_client_drop: Some(conn_task_cancel.drop_guard()),
      51            0 :             conn_task,
      52            0 :             client,
      53            0 :         })
      54            0 :     }
      55              : 
      56            0 :     pub async fn pagestream(
      57            0 :         self,
      58            0 :         tenant_id: TenantId,
      59            0 :         timeline_id: TimelineId,
      60            0 :     ) -> anyhow::Result<PagestreamClient> {
      61            0 :         let copy_both: tokio_postgres::CopyBothDuplex<bytes::Bytes> = self
      62            0 :             .client
      63            0 :             .copy_both_simple(&format!("pagestream_v2 {tenant_id} {timeline_id}"))
      64            0 :             .await?;
      65              :         let Client {
      66            0 :             cancel_on_client_drop,
      67            0 :             conn_task,
      68            0 :             client: _,
      69            0 :         } = self;
      70            0 :         Ok(PagestreamClient {
      71            0 :             copy_both: Box::pin(copy_both),
      72            0 :             conn_task,
      73            0 :             cancel_on_client_drop,
      74            0 :         })
      75            0 :     }
      76              : 
      77            0 :     pub async fn basebackup(&self, req: &BasebackupRequest) -> anyhow::Result<CopyOutStream> {
      78            0 :         let BasebackupRequest {
      79            0 :             tenant_id,
      80            0 :             timeline_id,
      81            0 :             lsn,
      82            0 :             gzip,
      83            0 :         } = req;
      84            0 :         let mut args = Vec::with_capacity(5);
      85            0 :         args.push("basebackup".to_string());
      86            0 :         args.push(format!("{tenant_id}"));
      87            0 :         args.push(format!("{timeline_id}"));
      88            0 :         if let Some(lsn) = lsn {
      89            0 :             args.push(format!("{lsn}"));
      90            0 :         }
      91            0 :         if *gzip {
      92            0 :             args.push("--gzip".to_string())
      93            0 :         }
      94            0 :         Ok(self.client.copy_out(&args.join(" ")).await?)
      95            0 :     }
      96              : }
      97              : 
      98              : /// Create using [`Client::pagestream`].
      99              : pub struct PagestreamClient {
     100              :     copy_both: Pin<Box<tokio_postgres::CopyBothDuplex<bytes::Bytes>>>,
     101              :     cancel_on_client_drop: Option<tokio_util::sync::DropGuard>,
     102              :     conn_task: JoinHandle<()>,
     103              : }
     104              : 
     105              : pub struct RelTagBlockNo {
     106              :     pub rel_tag: RelTag,
     107              :     pub block_no: u32,
     108              : }
     109              : 
     110              : impl PagestreamClient {
     111            0 :     pub async fn shutdown(self) {
     112            0 :         let Self {
     113            0 :             copy_both,
     114            0 :             cancel_on_client_drop: cancel_conn_task,
     115            0 :             conn_task,
     116            0 :         } = self;
     117            0 :         // The `copy_both` contains internal channel sender, the receiver of which is polled by `conn_task`.
     118            0 :         // When `conn_task` observes the sender has been dropped, it sends a `FeMessage::CopyFail` into the connection.
     119            0 :         // (see https://github.com/neondatabase/rust-postgres/blob/2005bf79573b8add5cf205b52a2b208e356cc8b0/tokio-postgres/src/copy_both.rs#L56).
     120            0 :         //
     121            0 :         // If we drop(copy_both) first, but then immediately drop the `cancel_on_client_drop`,
     122            0 :         // the CopyFail mesage only makes it to the socket sometimes (i.e., it's a race).
     123            0 :         //
     124            0 :         // Further, the pageserver makes a lot of noise when it receives CopyFail.
     125            0 :         // Computes don't send it in practice, they just hard-close the connection.
     126            0 :         //
     127            0 :         // So, let's behave like the computes and suppress the CopyFail as follows:
     128            0 :         // kill the socket first, then drop copy_both.
     129            0 :         //
     130            0 :         // See also: https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-COPY
     131            0 :         //
     132            0 :         // NB: page_service doesn't have a use case to exit the `pagestream` mode currently.
     133            0 :         // => https://github.com/neondatabase/neon/issues/6390
     134            0 :         let _ = cancel_conn_task.unwrap();
     135            0 :         conn_task.await.unwrap();
     136            0 :         drop(copy_both);
     137            0 :     }
     138              : 
     139            0 :     pub async fn getpage(
     140            0 :         &mut self,
     141            0 :         req: PagestreamGetPageRequest,
     142            0 :     ) -> anyhow::Result<PagestreamGetPageResponse> {
     143            0 :         let req = PagestreamFeMessage::GetPage(req);
     144            0 :         let req: bytes::Bytes = req.serialize();
     145            0 :         // let mut req = tokio_util::io::ReaderStream::new(&req);
     146            0 :         let mut req = tokio_stream::once(Ok(req));
     147            0 : 
     148            0 :         self.copy_both.send_all(&mut req).await?;
     149              : 
     150            0 :         let next: Option<Result<bytes::Bytes, _>> = self.copy_both.next().await;
     151            0 :         let next: bytes::Bytes = next.unwrap()?;
     152              : 
     153            0 :         let msg = PagestreamBeMessage::deserialize(next)?;
     154            0 :         match msg {
     155            0 :             PagestreamBeMessage::GetPage(p) => Ok(p),
     156            0 :             PagestreamBeMessage::Error(e) => anyhow::bail!("Error: {:?}", e),
     157              :             PagestreamBeMessage::Exists(_)
     158              :             | PagestreamBeMessage::Nblocks(_)
     159              :             | PagestreamBeMessage::DbSize(_)
     160              :             | PagestreamBeMessage::GetSlruSegment(_) => {
     161            0 :                 anyhow::bail!(
     162            0 :                     "unexpected be message kind in response to getpage request: {}",
     163            0 :                     msg.kind()
     164            0 :                 )
     165              :             }
     166              :         }
     167            0 :     }
     168              : }
         |