|             Line data    Source code 
       1              : use std::sync::{Arc, Mutex};
       2              : 
       3              : use futures::{
       4              :     stream::{SplitSink, SplitStream},
       5              :     SinkExt, StreamExt,
       6              : };
       7              : use pageserver_api::{
       8              :     models::{
       9              :         PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest,
      10              :         PagestreamGetPageResponse,
      11              :     },
      12              :     reltag::RelTag,
      13              : };
      14              : use tokio::task::JoinHandle;
      15              : use tokio_postgres::CopyOutStream;
      16              : use tokio_util::sync::CancellationToken;
      17              : use utils::{
      18              :     id::{TenantId, TimelineId},
      19              :     lsn::Lsn,
      20              : };
      21              : 
      /// A connection to the server, created with `Client::new`.
      ///
      /// Owns the `tokio_postgres` client handle together with the background task that
      /// polls the underlying connection.
      22              : pub struct Client {
      /// Handle used to issue the `pagestream_v3` and `basebackup` commands.
      23              :     client: tokio_postgres::Client,
      /// Guard obtained from the connection task's `CancellationToken::drop_guard()`;
      /// dropping it cancels the token the connection task is waiting on.
      24              :     cancel_on_client_drop: Option<tokio_util::sync::DropGuard>,
      /// Join handle of the spawned task that polls the connection until it
      /// finishes or the token is cancelled.
      25              :     conn_task: JoinHandle<()>,
      26              : }
      27              : 
      /// Parameters of a `basebackup` command, consumed by `Client::basebackup`.
      28              : pub struct BasebackupRequest {
      /// Tenant id, formatted as the first argument after `basebackup`.
      29              :     pub tenant_id: TenantId,
      /// Timeline id, formatted as the second argument.
      30              :     pub timeline_id: TimelineId,
      /// Optional LSN; when `Some`, it is appended as a further argument.
      31              :     pub lsn: Option<Lsn>,
      /// When `true`, the `--gzip` flag is appended to the command.
      32              :     pub gzip: bool,
      33              : }
      34              : 
      35              : impl Client {
      36            0 :     pub async fn new(connstring: String) -> anyhow::Result<Self> {
      37            0 :         let (client, connection) = tokio_postgres::connect(&connstring, postgres::NoTls).await?;
      38              : 
      39            0 :         let conn_task_cancel = CancellationToken::new();
      40            0 :         let conn_task = tokio::spawn({
      41            0 :             let conn_task_cancel = conn_task_cancel.clone();
      42            0 :             async move {
      43            0 :                 tokio::select! {
      44            0 :                     _ = conn_task_cancel.cancelled() => { }
      45            0 :                     res = connection => {
      46            0 :                         res.unwrap();
      47            0 :                     }
      48              :                 }
      49            0 :             }
      50            0 :         });
      51            0 :         Ok(Self {
      52            0 :             cancel_on_client_drop: Some(conn_task_cancel.drop_guard()),
      53            0 :             conn_task,
      54            0 :             client,
      55            0 :         })
      56            0 :     }
      57              : 
      58            0 :     pub async fn pagestream(
      59            0 :         self,
      60            0 :         tenant_id: TenantId,
      61            0 :         timeline_id: TimelineId,
      62            0 :     ) -> anyhow::Result<PagestreamClient> {
      63            0 :         let copy_both: tokio_postgres::CopyBothDuplex<bytes::Bytes> = self
      64            0 :             .client
      65            0 :             .copy_both_simple(&format!("pagestream_v3 {tenant_id} {timeline_id}"))
      66            0 :             .await?;
      67            0 :         let (sink, stream) = copy_both.split(); // TODO: actually support splitting of the CopyBothDuplex so the lock inside this split adaptor goes away.
      68            0 :         let Client {
      69            0 :             cancel_on_client_drop,
      70            0 :             conn_task,
      71            0 :             client: _,
      72            0 :         } = self;
      73            0 :         let shared = Arc::new(Mutex::new(PagestreamShared::ConnTaskRunning(
      74            0 :             ConnTaskRunning {
      75            0 :                 cancel_on_client_drop,
      76            0 :                 conn_task,
      77            0 :             },
      78            0 :         )));
      79            0 :         Ok(PagestreamClient {
      80            0 :             sink: PagestreamSender {
      81            0 :                 shared: shared.clone(),
      82            0 :                 sink,
      83            0 :             },
      84            0 :             stream: PagestreamReceiver {
      85            0 :                 shared: shared.clone(),
      86            0 :                 stream,
      87            0 :             },
      88            0 :             shared,
      89            0 :         })
      90            0 :     }
      91              : 
      92            0 :     pub async fn basebackup(&self, req: &BasebackupRequest) -> anyhow::Result<CopyOutStream> {
      93            0 :         let BasebackupRequest {
      94            0 :             tenant_id,
      95            0 :             timeline_id,
      96            0 :             lsn,
      97            0 :             gzip,
      98            0 :         } = req;
      99            0 :         let mut args = Vec::with_capacity(5);
     100            0 :         args.push("basebackup".to_string());
     101            0 :         args.push(format!("{tenant_id}"));
     102            0 :         args.push(format!("{timeline_id}"));
     103            0 :         if let Some(lsn) = lsn {
     104            0 :             args.push(format!("{lsn}"));
     105            0 :         }
     106            0 :         if *gzip {
     107            0 :             args.push("--gzip".to_string())
     108            0 :         }
     109            0 :         Ok(self.client.copy_out(&args.join(" ")).await?)
     110            0 :     }
     111              : }
     112              : 
     113              : /// Create using [`Client::pagestream`].
     ///
     /// Bundles the sending and receiving halves of a pagestream connection together
     /// with the state they share. Use `split` to obtain the halves separately, or
     /// `shutdown` to stop the connection task.
     114              : pub struct PagestreamClient {
     /// State shared with both halves; holds the connection task until `shutdown` takes it.
     115              :     shared: Arc<Mutex<PagestreamShared>>,
     /// Sending half, used by `getpage_send`.
     116              :     sink: PagestreamSender,
     /// Receiving half, used by `getpage_recv`.
     117              :     stream: PagestreamReceiver,
     118              : }
     119              : 
     /// Sending half of a pagestream connection; serialized frontend messages are
     /// written to `sink`.
     120              : pub struct PagestreamSender {
     // Never read in the code visible here, hence the `dead_code` allowance.
     // NOTE(review): presumably held so the shared connection-task state stays alive
     // as long as this half exists — confirm.
     121              :     #[allow(dead_code)]
     122              :     shared: Arc<Mutex<PagestreamShared>>,
     /// Sink half produced by splitting the `CopyBothDuplex`.
     123              :     sink: SplitSink<tokio_postgres::CopyBothDuplex<bytes::Bytes>, bytes::Bytes>,
     124              : }
     125              : 
     /// Receiving half of a pagestream connection; backend messages are read from
     /// `stream` and deserialized.
     126              : pub struct PagestreamReceiver {
     // Never read in the code visible here, hence the `dead_code` allowance.
     // NOTE(review): presumably held so the shared connection-task state stays alive
     // as long as this half exists — confirm.
     127              :     #[allow(dead_code)]
     128              :     shared: Arc<Mutex<PagestreamShared>>,
     /// Stream half produced by splitting the `CopyBothDuplex`.
     129              :     stream: SplitStream<tokio_postgres::CopyBothDuplex<bytes::Bytes>>,
     130              : }
     131              : 
     /// State shared between a `PagestreamClient` and its sender/receiver halves.
     132              : enum PagestreamShared {
     /// Initial state set by `Client::pagestream`: carries the connection task and
     /// its cancellation guard.
     133              :     ConnTaskRunning(ConnTaskRunning),
     /// State left behind by `PagestreamClient::shutdown` after it has taken the
     /// task and guard out of the shared slot.
     134              :     ConnTaskCancelledJoinHandleReturnedOrDropped,
     135              : }
     /// Payload of `PagestreamShared::ConnTaskRunning`: what `Client::pagestream`
     /// moved out of the `Client`.
     136              : struct ConnTaskRunning {
     /// Drop guard for the connection task's cancellation token; `shutdown` unwraps
     /// and drops it before awaiting the task.
     137              :     cancel_on_client_drop: Option<tokio_util::sync::DropGuard>,
     /// Join handle of the task polling the connection; awaited by `shutdown`.
     138              :     conn_task: JoinHandle<()>,
     139              : }
     140              : 
     /// A relation tag paired with a block number.
     ///
     /// NOTE(review): not referenced by any code visible in this file; presumably
     /// identifies a single page for callers elsewhere — confirm.
     141              : pub struct RelTagBlockNo {
     /// The relation tag.
     142              :     pub rel_tag: RelTag,
     /// The block number.
     143              :     pub block_no: u32,
     144              : }
     145              : 
     146              : impl PagestreamClient {
     147            0 :     pub async fn shutdown(self) {
     148            0 :         let Self {
     149            0 :             shared,
     150            0 :             sink,
     151            0 :             stream,
     152            0 :         } = { self };
     153              :         // The `copy_both` split into `sink` and `stream` contains internal channel sender, the receiver of which is polled by `conn_task`.
     154              :         // When `conn_task` observes the sender has been dropped, it sends a `FeMessage::CopyFail` into the connection.
     155              :         // (see https://github.com/neondatabase/rust-postgres/blob/2005bf79573b8add5cf205b52a2b208e356cc8b0/tokio-postgres/src/copy_both.rs#L56).
     156              :         //
     157              :         // If we drop(copy_both) first, but then immediately drop the `cancel_on_client_drop`,
     158              :         // the CopyFail mesage only makes it to the socket sometimes (i.e., it's a race).
     159              :         //
     160              :         // Further, the pageserver makes a lot of noise when it receives CopyFail.
     161              :         // Computes don't send it in practice, they just hard-close the connection.
     162              :         //
     163              :         // So, let's behave like the computes and suppress the CopyFail as follows:
     164              :         // kill the socket first, then drop copy_both.
     165              :         //
     166              :         // See also: https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-COPY
     167              :         //
     168              :         // NB: page_service doesn't have a use case to exit the `pagestream` mode currently.
     169              :         // => https://github.com/neondatabase/neon/issues/6390
     170              :         let ConnTaskRunning {
     171            0 :             cancel_on_client_drop,
     172            0 :             conn_task,
     173              :         } = {
     174            0 :             let mut guard = shared.lock().unwrap();
     175            0 :             match std::mem::replace(
     176            0 :                 &mut *guard,
     177            0 :                 PagestreamShared::ConnTaskCancelledJoinHandleReturnedOrDropped,
     178            0 :             ) {
     179            0 :                 PagestreamShared::ConnTaskRunning(conn_task_running) => conn_task_running,
     180            0 :                 PagestreamShared::ConnTaskCancelledJoinHandleReturnedOrDropped => unreachable!(),
     181              :             }
     182              :         };
     183            0 :         let _ = cancel_on_client_drop.unwrap();
     184            0 :         conn_task.await.unwrap();
     185            0 : 
     186            0 :         // Now drop the split copy_both.
     187            0 :         drop(sink);
     188            0 :         drop(stream);
     189            0 :     }
     190              : 
     191            0 :     pub fn split(self) -> (PagestreamSender, PagestreamReceiver) {
     192            0 :         let Self {
     193            0 :             shared: _,
     194            0 :             sink,
     195            0 :             stream,
     196            0 :         } = self;
     197            0 :         (sink, stream)
     198            0 :     }
     199              : 
     200            0 :     pub async fn getpage(
     201            0 :         &mut self,
     202            0 :         req: PagestreamGetPageRequest,
     203            0 :     ) -> anyhow::Result<PagestreamGetPageResponse> {
     204            0 :         self.getpage_send(req).await?;
     205            0 :         self.getpage_recv().await
     206            0 :     }
     207              : 
     208            0 :     pub async fn getpage_send(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()> {
     209            0 :         self.sink.getpage_send(req).await
     210            0 :     }
     211              : 
     212            0 :     pub async fn getpage_recv(&mut self) -> anyhow::Result<PagestreamGetPageResponse> {
     213            0 :         self.stream.getpage_recv().await
     214            0 :     }
     215              : }
     216              : 
     217              : impl PagestreamSender {
     218              :     // TODO: maybe make this impl Sink instead for better composability?
     219            0 :     pub async fn send(&mut self, msg: PagestreamFeMessage) -> anyhow::Result<()> {
     220            0 :         let msg = msg.serialize();
     221            0 :         self.sink.send_all(&mut tokio_stream::once(Ok(msg))).await?;
     222            0 :         Ok(())
     223            0 :     }
     224              : 
     225            0 :     pub async fn getpage_send(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()> {
     226            0 :         self.send(PagestreamFeMessage::GetPage(req)).await
     227            0 :     }
     228              : }
     229              : 
     230              : impl PagestreamReceiver {
     231              :     // TODO: maybe make this impl Stream instead for better composability?
     232            0 :     pub async fn recv(&mut self) -> anyhow::Result<PagestreamBeMessage> {
     233            0 :         let next: Option<Result<bytes::Bytes, _>> = self.stream.next().await;
     234            0 :         let next: bytes::Bytes = next.unwrap()?;
     235            0 :         PagestreamBeMessage::deserialize(next)
     236            0 :     }
     237              : 
     238            0 :     pub async fn getpage_recv(&mut self) -> anyhow::Result<PagestreamGetPageResponse> {
     239            0 :         let next: PagestreamBeMessage = self.recv().await?;
     240            0 :         match next {
     241            0 :             PagestreamBeMessage::GetPage(p) => Ok(p),
     242            0 :             PagestreamBeMessage::Error(e) => anyhow::bail!("Error: {:?}", e),
     243              :             PagestreamBeMessage::Exists(_)
     244              :             | PagestreamBeMessage::Nblocks(_)
     245              :             | PagestreamBeMessage::DbSize(_)
     246              :             | PagestreamBeMessage::GetSlruSegment(_) => {
     247            0 :                 anyhow::bail!(
     248            0 :                     "unexpected be message kind in response to getpage request: {}",
     249            0 :                     next.kind()
     250            0 :                 )
     251              :             }
     252              :             #[cfg(feature = "testing")]
     253              :             PagestreamBeMessage::Test(_) => {
     254            0 :                 anyhow::bail!(
     255            0 :                     "unexpected be message kind in response to getpage request: {}",
     256            0 :                     next.kind()
     257            0 :                 )
     258              :             }
     259              :         }
     260            0 :     }
     261              : }
         |