LCOV - differential code coverage report
Current view: top level - pageserver/client/src - page_service.rs (source / functions) Coverage Total Hit UBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 0.0 % 79 0 79
Current Date: 2024-01-09 02:06:09 Functions: 0.0 % 12 0 12
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : use std::pin::Pin;
       2                 : 
       3                 : use futures::SinkExt;
       4                 : use pageserver_api::{
       5                 :     models::{
       6                 :         PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest,
       7                 :         PagestreamGetPageResponse,
       8                 :     },
       9                 :     reltag::RelTag,
      10                 : };
      11                 : use tokio::task::JoinHandle;
      12                 : use tokio_postgres::CopyOutStream;
      13                 : use tokio_stream::StreamExt;
      14                 : use tokio_util::sync::CancellationToken;
      15                 : use utils::{
      16                 :     id::{TenantId, TimelineId},
      17                 :     lsn::Lsn,
      18                 : };
      19                 : 
      20                 : pub struct Client {
      21                 :     client: tokio_postgres::Client,
      22                 :     cancel_on_client_drop: Option<tokio_util::sync::DropGuard>,
      23                 :     conn_task: JoinHandle<()>,
      24                 : }
      25                 : 
      26                 : pub struct BasebackupRequest {
      27                 :     pub tenant_id: TenantId,
      28                 :     pub timeline_id: TimelineId,
      29                 :     pub lsn: Option<Lsn>,
      30                 :     pub gzip: bool,
      31                 : }
      32                 : 
      33                 : impl Client {
      34 UBC           0 :     pub async fn new(connstring: String) -> anyhow::Result<Self> {
      35               0 :         let (client, connection) = tokio_postgres::connect(&connstring, postgres::NoTls).await?;
      36                 : 
      37               0 :         let conn_task_cancel = CancellationToken::new();
      38               0 :         let conn_task = tokio::spawn({
      39               0 :             let conn_task_cancel = conn_task_cancel.clone();
      40               0 :             async move {
      41               0 :                 tokio::select! {
      42                 :                     _ = conn_task_cancel.cancelled() => { }
      43               0 :                     res = connection => {
      44                 :                         res.unwrap();
      45                 :                     }
      46                 :                 }
      47               0 :             }
      48               0 :         });
      49               0 :         Ok(Self {
      50               0 :             cancel_on_client_drop: Some(conn_task_cancel.drop_guard()),
      51               0 :             conn_task,
      52               0 :             client,
      53               0 :         })
      54               0 :     }
      55                 : 
      56               0 :     pub async fn pagestream(
      57               0 :         self,
      58               0 :         tenant_id: TenantId,
      59               0 :         timeline_id: TimelineId,
      60               0 :     ) -> anyhow::Result<PagestreamClient> {
      61               0 :         let copy_both: tokio_postgres::CopyBothDuplex<bytes::Bytes> = self
      62               0 :             .client
      63               0 :             .copy_both_simple(&format!("pagestream {tenant_id} {timeline_id}"))
      64               0 :             .await?;
      65                 :         let Client {
      66               0 :             cancel_on_client_drop,
      67               0 :             conn_task,
      68               0 :             client: _,
      69               0 :         } = self;
      70               0 :         Ok(PagestreamClient {
      71               0 :             copy_both: Box::pin(copy_both),
      72               0 :             conn_task,
      73               0 :             cancel_on_client_drop,
      74               0 :         })
      75               0 :     }
      76                 : 
      77               0 :     pub async fn basebackup(&self, req: &BasebackupRequest) -> anyhow::Result<CopyOutStream> {
      78               0 :         let BasebackupRequest {
      79               0 :             tenant_id,
      80               0 :             timeline_id,
      81               0 :             lsn,
      82               0 :             gzip,
      83               0 :         } = req;
      84               0 :         let mut args = Vec::with_capacity(5);
      85               0 :         args.push("basebackup".to_string());
      86               0 :         args.push(format!("{tenant_id}"));
      87               0 :         args.push(format!("{timeline_id}"));
      88               0 :         if let Some(lsn) = lsn {
      89               0 :             args.push(format!("{lsn}"));
      90               0 :         }
      91               0 :         if *gzip {
      92               0 :             args.push("--gzip".to_string())
      93               0 :         }
      94               0 :         Ok(self.client.copy_out(&args.join(" ")).await?)
      95               0 :     }
      96                 : }
      97                 : 
      98                 : /// Create using [`Client::pagestream`].
      99                 : pub struct PagestreamClient {
     100                 :     copy_both: Pin<Box<tokio_postgres::CopyBothDuplex<bytes::Bytes>>>,
     101                 :     cancel_on_client_drop: Option<tokio_util::sync::DropGuard>,
     102                 :     conn_task: JoinHandle<()>,
     103                 : }
     104                 : 
     105                 : pub struct RelTagBlockNo {
     106                 :     pub rel_tag: RelTag,
     107                 :     pub block_no: u32,
     108                 : }
     109                 : 
     110                 : impl PagestreamClient {
     111               0 :     pub async fn shutdown(mut self) {
     112               0 :         let _ = self.cancel_on_client_drop.take();
     113               0 :         self.conn_task.await.unwrap();
     114               0 :     }
     115                 : 
     116               0 :     pub async fn getpage(
     117               0 :         &mut self,
     118               0 :         req: PagestreamGetPageRequest,
     119               0 :     ) -> anyhow::Result<PagestreamGetPageResponse> {
     120               0 :         let req = PagestreamFeMessage::GetPage(req);
     121               0 :         let req: bytes::Bytes = req.serialize();
     122               0 :         // let mut req = tokio_util::io::ReaderStream::new(&req);
     123               0 :         let mut req = tokio_stream::once(Ok(req));
     124               0 : 
     125               0 :         self.copy_both.send_all(&mut req).await?;
     126                 : 
     127               0 :         let next: Option<Result<bytes::Bytes, _>> = self.copy_both.next().await;
     128               0 :         let next: bytes::Bytes = next.unwrap()?;
     129                 : 
     130               0 :         let msg = PagestreamBeMessage::deserialize(next)?;
     131               0 :         match msg {
     132               0 :             PagestreamBeMessage::GetPage(p) => Ok(p),
     133               0 :             PagestreamBeMessage::Error(e) => anyhow::bail!("Error: {:?}", e),
     134                 :             PagestreamBeMessage::Exists(_)
     135                 :             | PagestreamBeMessage::Nblocks(_)
     136                 :             | PagestreamBeMessage::DbSize(_) => {
     137               0 :                 anyhow::bail!(
     138               0 :                     "unexpected be message kind in response to getpage request: {}",
     139               0 :                     msg.kind()
     140               0 :                 )
     141                 :             }
     142                 :         }
     143               0 :     }
     144                 : }
        

Generated by: LCOV version 2.1-beta