LCOV - code coverage report
Current view: top level - pageserver/client/src - page_service.rs (source / functions) Coverage Total Hit
Test: 5fe7fa8d483b39476409aee736d6d5e32728bfac.info Lines: 0.0 % 142 0
Test Date: 2025-03-12 16:10:49 Functions: 0.0 % 24 0

            Line data    Source code
       1              : use std::sync::{Arc, Mutex};
       2              : 
       3              : use futures::stream::{SplitSink, SplitStream};
       4              : use futures::{SinkExt, StreamExt};
       5              : use pageserver_api::models::{
       6              :     PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
       7              : };
       8              : use pageserver_api::reltag::RelTag;
       9              : use tokio::task::JoinHandle;
      10              : use tokio_postgres::CopyOutStream;
      11              : use tokio_util::sync::CancellationToken;
      12              : use utils::id::{TenantId, TimelineId};
      13              : use utils::lsn::Lsn;
      14              : 
      15              : pub struct Client {
                            // Owns a `tokio_postgres` client handle together with the background task
                            // that drives its connection; constructed by `Client::new`.
                            /// Client half returned by `tokio_postgres::connect`; used to issue the
                            /// `pagestream_v3` and `basebackup` commands.
      16              :     client: tokio_postgres::Client,
                            /// Drop guard of the cancellation token that the connection task selects on.
      17              :     cancel_on_client_drop: Option<tokio_util::sync::DropGuard>,
                            /// Join handle of the spawned task that polls the connection half.
      18              :     conn_task: JoinHandle<()>,
      19              : }
      20              : 
      21              : pub struct BasebackupRequest {
                            // Arguments for `Client::basebackup`, which joins them into one
                            // space-separated `basebackup ...` command string.
      22              :     pub tenant_id: TenantId,
      23              :     pub timeline_id: TimelineId,
                            /// Appended to the command only when `Some`.
      24              :     pub lsn: Option<Lsn>,
                            /// When `true`, `--gzip` is appended to the command.
      25              :     pub gzip: bool,
      26              : }
      27              : 
      28              : impl Client {
                            /// Connects to `connstring` without TLS and spawns a task that drives the connection.
                            ///
                            /// The spawned task finishes when either the cancellation token fires or the
                            /// connection future completes.
                            ///
                            /// # Errors
                            ///
                            /// Returns an error if `tokio_postgres::connect` fails.
      29            0 :     pub async fn new(connstring: String) -> anyhow::Result<Self> {
      30            0 :         let (client, connection) =
      31            0 :             tokio_postgres::connect(&connstring, tokio_postgres::NoTls).await?;
      32              : 
      33            0 :         let conn_task_cancel = CancellationToken::new();
      34            0 :         let conn_task = tokio::spawn({
                                    // The task gets its own clone; the original token is turned into the
                                    // drop guard stored in `Self` below.
      35            0 :             let conn_task_cancel = conn_task_cancel.clone();
      36            0 :             async move {
      37            0 :                 tokio::select! {
      38            0 :                     _ = conn_task_cancel.cancelled() => { }
      39            0 :                     res = connection => {
                                                // NOTE(review): a connection error panics this task; that panic
                                                // is only observed where `conn_task` is awaited.
      40            0 :                         res.unwrap();
      41            0 :                     }
      42              :                 }
      43            0 :             }
      44            0 :         });
      45            0 :         Ok(Self {
      46            0 :             cancel_on_client_drop: Some(conn_task_cancel.drop_guard()),
      47            0 :             conn_task,
      48            0 :             client,
      49            0 :         })
      50            0 :     }
      51              : 
                            /// Consumes the client and enters pagestream mode for the given tenant and timeline.
                            ///
                            /// Sends `pagestream_v3 {tenant_id} {timeline_id}` via `copy_both_simple`, splits the
                            /// resulting duplex into a sink and a stream, and moves the cancel guard and the
                            /// connection task handle into state shared by the returned [`PagestreamClient`]
                            /// and both of its halves.
                            ///
                            /// # Errors
                            ///
                            /// Returns an error if `copy_both_simple` fails.
      52            0 :     pub async fn pagestream(
      53            0 :         self,
      54            0 :         tenant_id: TenantId,
      55            0 :         timeline_id: TimelineId,
      56            0 :     ) -> anyhow::Result<PagestreamClient> {
      57            0 :         let copy_both: tokio_postgres::CopyBothDuplex<bytes::Bytes> = self
      58            0 :             .client
      59            0 :             .copy_both_simple(&format!("pagestream_v3 {tenant_id} {timeline_id}"))
      60            0 :             .await?;
      61            0 :         let (sink, stream) = copy_both.split(); // TODO: actually support splitting of the CopyBothDuplex so the lock inside this split adaptor goes away.
                                // Take `self` apart; the `client` handle is not carried into the result.
      62            0 :         let Client {
      63            0 :             cancel_on_client_drop,
      64            0 :             conn_task,
      65            0 :             client: _,
      66            0 :         } = self;
      67            0 :         let shared = Arc::new(Mutex::new(PagestreamShared::ConnTaskRunning(
      68            0 :             ConnTaskRunning {
      69            0 :                 cancel_on_client_drop,
      70            0 :                 conn_task,
      71            0 :             },
      72            0 :         )));
      73            0 :         Ok(PagestreamClient {
      74            0 :             sink: PagestreamSender {
      75            0 :                 shared: shared.clone(),
      76            0 :                 sink,
      77            0 :             },
      78            0 :             stream: PagestreamReceiver {
      79            0 :                 shared: shared.clone(),
      80            0 :                 stream,
      81            0 :             },
      82            0 :             shared,
      83            0 :         })
      84            0 :     }
      85              : 
                            /// Issues a `basebackup` command built from `req` and returns the copy-out stream.
                            ///
                            /// The command is `basebackup <tenant_id> <timeline_id>`, followed by the LSN when
                            /// `req.lsn` is `Some`, followed by `--gzip` when `req.gzip` is set.
                            ///
                            /// # Errors
                            ///
                            /// Returns an error if `copy_out` fails.
      86            0 :     pub async fn basebackup(&self, req: &BasebackupRequest) -> anyhow::Result<CopyOutStream> {
      87            0 :         let BasebackupRequest {
      88            0 :             tenant_id,
      89            0 :             timeline_id,
      90            0 :             lsn,
      91            0 :             gzip,
      92            0 :         } = req;
                                // At most five arguments are pushed below.
      93            0 :         let mut args = Vec::with_capacity(5);
      94            0 :         args.push("basebackup".to_string());
      95            0 :         args.push(format!("{tenant_id}"));
      96            0 :         args.push(format!("{timeline_id}"));
      97            0 :         if let Some(lsn) = lsn {
      98            0 :             args.push(format!("{lsn}"));
      99            0 :         }
     100            0 :         if *gzip {
     101            0 :             args.push("--gzip".to_string())
     102            0 :         }
     103            0 :         Ok(self.client.copy_out(&args.join(" ")).await?)
     104            0 :     }
     105              : }
     106              : 
     107              : /// Create using [`Client::pagestream`].
                        ///
                        /// Bundles both halves of the split copy-both duplex with the shared connection-task
                        /// state. Use [`PagestreamClient::split`] to drive the halves independently, or
                        /// [`PagestreamClient::shutdown`] to stop the connection task.
     108              : pub struct PagestreamClient {
                            /// Shared connection-task state; `shutdown` takes the guard and join handle out of it.
     109              :     shared: Arc<Mutex<PagestreamShared>>,
                            /// Sending half.
     110              :     sink: PagestreamSender,
                            /// Receiving half.
     111              :     stream: PagestreamReceiver,
     112              : }
     113              : 
     114              : pub struct PagestreamSender {
                            // Sending half of a pagestream; handed out by `PagestreamClient::split`.
                            //
                            // `shared` is never read in the code visible here (hence the `allow`); presumably
                            // it is held so the connection-task state outlives the `PagestreamClient`
                            // after `split` — TODO confirm.
     115              :     #[allow(dead_code)]
     116              :     shared: Arc<Mutex<PagestreamShared>>,
                            /// Sink half of the split `CopyBothDuplex`; accepts serialized messages as `Bytes`.
     117              :     sink: SplitSink<tokio_postgres::CopyBothDuplex<bytes::Bytes>, bytes::Bytes>,
     118              : }
     119              : 
     120              : pub struct PagestreamReceiver {
                            // Receiving half of a pagestream; handed out by `PagestreamClient::split`.
                            //
                            // `shared` is never read in the code visible here (hence the `allow`); presumably
                            // it is held so the connection-task state outlives the `PagestreamClient`
                            // after `split` — TODO confirm.
     121              :     #[allow(dead_code)]
     122              :     shared: Arc<Mutex<PagestreamShared>>,
                            /// Stream half of the split `CopyBothDuplex`.
     123              :     stream: SplitStream<tokio_postgres::CopyBothDuplex<bytes::Bytes>>,
     124              : }
     125              : 
     126              : enum PagestreamShared {
                            // State kept behind the `Arc<Mutex<..>>` that `PagestreamClient` and its two
                            // halves each hold a handle to.
                            /// Initial state set by `Client::pagestream`: the connection task's cancel guard
                            /// and join handle are still stored here.
     127              :     ConnTaskRunning(ConnTaskRunning),
                            /// Set by `PagestreamClient::shutdown` once it has taken the guard and join handle.
     128              :     ConnTaskCancelledJoinHandleReturnedOrDropped,
     129              : }
     130              : struct ConnTaskRunning {
                            // Payload of `PagestreamShared::ConnTaskRunning`; both fields are moved here
                            // from `Client` by `Client::pagestream`.
                            /// Drop guard of the cancellation token that the connection task selects on.
     131              :     cancel_on_client_drop: Option<tokio_util::sync::DropGuard>,
                            /// Join handle of the spawned connection task.
     132              :     conn_task: JoinHandle<()>,
     133              : }
     134              : 
     135              : pub struct RelTagBlockNo {
                            // Pairs a relation tag with a block number.
                            // Not referenced by any other code visible in this listing.
     136              :     pub rel_tag: RelTag,
     137              :     pub block_no: u32,
     138              : }
     139              : 
     140              : impl PagestreamClient {
                            /// Stops the connection task, waits for it to finish, then drops both stream halves.
                            ///
                            /// # Panics
                            ///
                            /// Panics if the shared mutex is poisoned, if the cancel guard is `None`, or if
                            /// joining the connection task fails.
     141            0 :     pub async fn shutdown(self) {
     142            0 :         let Self {
     143            0 :             shared,
     144            0 :             sink,
     145            0 :             stream,
     146            0 :         } = { self };
     147              :         // The `copy_both` split into `sink` and `stream` contains an internal channel sender, the receiver of which is polled by `conn_task`.
     148              :         // When `conn_task` observes the sender has been dropped, it sends a `FeMessage::CopyFail` into the connection.
     149              :         // (see https://github.com/neondatabase/rust-postgres/blob/2005bf79573b8add5cf205b52a2b208e356cc8b0/tokio-postgres/src/copy_both.rs#L56).
     150              :         //
     151              :         // If we drop(copy_both) first, but then immediately drop the `cancel_on_client_drop`,
     152              :         // the CopyFail message only makes it to the socket sometimes (i.e., it's a race).
     153              :         //
     154              :         // Further, the pageserver makes a lot of noise when it receives CopyFail.
     155              :         // Computes don't send it in practice, they just hard-close the connection.
     156              :         //
     157              :         // So, let's behave like the computes and suppress the CopyFail as follows:
     158              :         // kill the socket first, then drop copy_both.
     159              :         //
     160              :         // See also: https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-COPY
     161              :         //
     162              :         // NB: page_service doesn't have a use case to exit the `pagestream` mode currently.
     163              :         // => https://github.com/neondatabase/neon/issues/6390
     164              :         let ConnTaskRunning {
     165            0 :             cancel_on_client_drop,
     166            0 :             conn_task,
     167              :         } = {
                                    // Swap the running state out while holding the lock; the guard is released
                                    // at the end of this block, before anything is awaited.
     168            0 :             let mut guard = shared.lock().unwrap();
     169            0 :             match std::mem::replace(
     170            0 :                 &mut *guard,
     171            0 :                 PagestreamShared::ConnTaskCancelledJoinHandleReturnedOrDropped,
     172            0 :             ) {
     173            0 :                 PagestreamShared::ConnTaskRunning(conn_task_running) => conn_task_running,
                                        // In the visible code only this method, which consumes `self`, ever
                                        // stores this variant.
     174            0 :                 PagestreamShared::ConnTaskCancelledJoinHandleReturnedOrDropped => unreachable!(),
     175              :             }
     176              :         };
                                // The unwrapped guard is a temporary bound to `_`, so it is dropped at the end of
                                // this statement; dropping a `DropGuard` cancels its token, which lets the
                                // connection task's `select!` take the cancelled branch.
     177            0 :         let _ = cancel_on_client_drop.unwrap();
     178            0 :         conn_task.await.unwrap();
     179            0 : 
     180            0 :         // Now drop the split copy_both.
     181            0 :         drop(sink);
     182            0 :         drop(stream);
     183            0 :     }
     184              : 
                            /// Splits the client into its sending and receiving halves.
                            ///
                            /// The client's own handle to the shared state is discarded; each half keeps the
                            /// handle it was given in `Client::pagestream`.
     185            0 :     pub fn split(self) -> (PagestreamSender, PagestreamReceiver) {
     186            0 :         let Self {
     187            0 :             shared: _,
     188            0 :             sink,
     189            0 :             stream,
     190            0 :         } = self;
     191            0 :         (sink, stream)
     192            0 :     }
     193              : 
                            /// Sends one getpage request and then waits for its response.
                            ///
                            /// # Errors
                            ///
                            /// Propagates any error from [`Self::getpage_send`] or [`Self::getpage_recv`].
     194            0 :     pub async fn getpage(
     195            0 :         &mut self,
     196            0 :         req: PagestreamGetPageRequest,
     197            0 :     ) -> anyhow::Result<PagestreamGetPageResponse> {
     198            0 :         self.getpage_send(req).await?;
     199            0 :         self.getpage_recv().await
     200            0 :     }
     201              : 
                            /// Sends a getpage request through the sending half without waiting for a response.
     202            0 :     pub async fn getpage_send(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()> {
     203            0 :         self.sink.getpage_send(req).await
     204            0 :     }
     205              : 
                            /// Receives the next message through the receiving half, expecting a getpage response.
     206            0 :     pub async fn getpage_recv(&mut self) -> anyhow::Result<PagestreamGetPageResponse> {
     207            0 :         self.stream.getpage_recv().await
     208            0 :     }
     209              : }
     210              : 
     211              : impl PagestreamSender {
     212              :     // TODO: maybe make this impl Sink instead for better composability?
                            /// Serializes `msg` and pushes it into the sink.
                            ///
                            /// # Errors
                            ///
                            /// Returns an error if the sink rejects the message.
     213            0 :     pub async fn send(&mut self, msg: PagestreamFeMessage) -> anyhow::Result<()> {
     214            0 :         let msg = msg.serialize();
                                // The serialized message is wrapped in a one-item stream for `send_all`.
     215            0 :         self.sink.send_all(&mut tokio_stream::once(Ok(msg))).await?;
     216            0 :         Ok(())
     217            0 :     }
     218              : 
                            /// Wraps `req` in `PagestreamFeMessage::GetPage` and sends it via [`Self::send`].
     219            0 :     pub async fn getpage_send(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()> {
     220            0 :         self.send(PagestreamFeMessage::GetPage(req)).await
     221            0 :     }
     222              : }
     223              : 
     224              : impl PagestreamReceiver {
     225              :     // TODO: maybe make this impl Stream instead for better composability?
                            /// Waits for the next message on the stream and deserializes it.
                            ///
                            /// # Errors
                            ///
                            /// Returns an error if the stream has ended, if the stream yields an error, or if
                            /// deserialization fails.
     226            0 :     pub async fn recv(&mut self) -> anyhow::Result<PagestreamBeMessage> {
     227            0 :         let next: Option<Result<bytes::Bytes, _>> = self.stream.next().await;
                                // `None` means the stream is exhausted (e.g. the peer closed the connection);
                                // report that as an error rather than panicking inside a fallible function.
     228            0 :         let next: bytes::Bytes = next.ok_or_else(|| anyhow::anyhow!("pagestream ended before a response was received"))??;
     229            0 :         PagestreamBeMessage::deserialize(next)
     230            0 :     }
     231              : 
                            /// Receives the next message and returns it if it is a getpage response.
                            ///
                            /// # Errors
                            ///
                            /// Returns an error if [`Self::recv`] fails, if the backend sent an `Error`
                            /// message, or if the message is of any other kind.
     232            0 :     pub async fn getpage_recv(&mut self) -> anyhow::Result<PagestreamGetPageResponse> {
     233            0 :         let next: PagestreamBeMessage = self.recv().await?;
     234            0 :         match next {
     235            0 :             PagestreamBeMessage::GetPage(p) => Ok(p),
     236            0 :             PagestreamBeMessage::Error(e) => anyhow::bail!("Error: {:?}", e),
     237              :             PagestreamBeMessage::Exists(_)
     238              :             | PagestreamBeMessage::Nblocks(_)
     239              :             | PagestreamBeMessage::DbSize(_)
     240              :             | PagestreamBeMessage::GetSlruSegment(_) => {
     241            0 :                 anyhow::bail!(
     242            0 :                     "unexpected be message kind in response to getpage request: {}",
     243            0 :                     next.kind()
     244            0 :                 )
     245              :             }
                                    // Kept as a separate arm because this arm is only compiled with the
                                    // `testing` feature.
     246              :             #[cfg(feature = "testing")]
     247              :             PagestreamBeMessage::Test(_) => {
     248            0 :                 anyhow::bail!(
     249            0 :                     "unexpected be message kind in response to getpage request: {}",
     250            0 :                     next.kind()
     251            0 :                 )
     252              :             }
     253              :         }
     254            0 :     }
     255              : }
        

Generated by: LCOV version 2.1-beta