LCOV - code coverage report
Current view: top level - pageserver/page_api/src - client.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 0.0 % 116 0
Test Date: 2025-07-16 12:29:03 Functions: 0.0 % 36 0

            Line data    Source code
       1              : use anyhow::Context as _;
       2              : use futures::future::ready;
       3              : use futures::{Stream, StreamExt as _, TryStreamExt as _};
       4              : use tokio::io::AsyncRead;
       5              : use tokio_util::io::StreamReader;
       6              : use tonic::codec::CompressionEncoding;
       7              : use tonic::metadata::AsciiMetadataValue;
       8              : use tonic::service::Interceptor;
       9              : use tonic::service::interceptor::InterceptedService;
      10              : use tonic::transport::{Channel, Endpoint};
      11              : 
      12              : use utils::id::{TenantId, TimelineId};
      13              : use utils::shard::ShardIndex;
      14              : 
      15              : use crate::model::*;
      16              : use crate::proto;
      17              : 
      18              : /// A basic Pageserver gRPC client, for a single tenant shard. This API uses native Rust domain
      19              : /// types from `model` rather than generated Protobuf types.
      20              : pub struct Client {
      21              :     inner: proto::PageServiceClient<InterceptedService<Channel, AuthInterceptor>>,
      22              : }
      23              : 
      24              : impl Client {
      25              :     /// Connects to the given gRPC endpoint.
      26            0 :     pub async fn connect<E>(
      27            0 :         endpoint: E,
      28            0 :         tenant_id: TenantId,
      29            0 :         timeline_id: TimelineId,
      30            0 :         shard_id: ShardIndex,
      31            0 :         auth_token: Option<String>,
      32            0 :         compression: Option<CompressionEncoding>,
      33            0 :     ) -> anyhow::Result<Self>
      34            0 :     where
      35            0 :         E: TryInto<Endpoint> + Send + Sync + 'static,
      36            0 :         <E as TryInto<Endpoint>>::Error: std::error::Error + Send + Sync,
      37            0 :     {
      38            0 :         let endpoint: Endpoint = endpoint.try_into().context("invalid endpoint")?;
      39            0 :         let channel = endpoint.connect().await?;
      40            0 :         Self::new(
      41            0 :             channel,
      42            0 :             tenant_id,
      43            0 :             timeline_id,
      44            0 :             shard_id,
      45            0 :             auth_token,
      46            0 :             compression,
      47              :         )
      48            0 :     }
      49              : 
      50              :     /// Creates a new client using the given gRPC channel.
      51            0 :     pub fn new(
      52            0 :         channel: Channel,
      53            0 :         tenant_id: TenantId,
      54            0 :         timeline_id: TimelineId,
      55            0 :         shard_id: ShardIndex,
      56            0 :         auth_token: Option<String>,
      57            0 :         compression: Option<CompressionEncoding>,
      58            0 :     ) -> anyhow::Result<Self> {
      59            0 :         let auth = AuthInterceptor::new(tenant_id, timeline_id, shard_id, auth_token)?;
      60            0 :         let mut inner = proto::PageServiceClient::with_interceptor(channel, auth);
      61              : 
      62            0 :         if let Some(compression) = compression {
      63            0 :             // TODO: benchmark this (including network latency).
      64            0 :             inner = inner
      65            0 :                 .accept_compressed(compression)
      66            0 :                 .send_compressed(compression);
      67            0 :         }
      68              : 
      69            0 :         Ok(Self { inner })
      70            0 :     }
      71              : 
      72              :     /// Returns whether a relation exists.
      73            0 :     pub async fn check_rel_exists(
      74            0 :         &mut self,
      75            0 :         req: CheckRelExistsRequest,
      76            0 :     ) -> tonic::Result<CheckRelExistsResponse> {
      77            0 :         let req = proto::CheckRelExistsRequest::from(req);
      78            0 :         let resp = self.inner.check_rel_exists(req).await?.into_inner();
      79            0 :         Ok(resp.into())
      80            0 :     }
      81              : 
      82              :     /// Fetches a base backup.
      83            0 :     pub async fn get_base_backup(
      84            0 :         &mut self,
      85            0 :         req: GetBaseBackupRequest,
      86            0 :     ) -> tonic::Result<impl AsyncRead + use<>> {
      87            0 :         let req = proto::GetBaseBackupRequest::from(req);
      88            0 :         let chunks = self.inner.get_base_backup(req).await?.into_inner();
      89            0 :         Ok(StreamReader::new(
      90            0 :             chunks
      91            0 :                 .map_ok(|resp| resp.chunk)
      92            0 :                 .map_err(std::io::Error::other),
      93              :         ))
      94            0 :     }
      95              : 
      96              :     /// Returns the total size of a database, as # of bytes.
      97            0 :     pub async fn get_db_size(&mut self, req: GetDbSizeRequest) -> tonic::Result<GetDbSizeResponse> {
      98            0 :         let req = proto::GetDbSizeRequest::from(req);
      99            0 :         let resp = self.inner.get_db_size(req).await?.into_inner();
     100            0 :         Ok(resp.into())
     101            0 :     }
     102              : 
     103              :     /// Fetches pages.
     104              :     ///
     105              :     /// This is implemented as a bidirectional streaming RPC for performance. Per-request errors are
     106              :     /// typically returned as status_code instead of errors, to avoid tearing down the entire stream
     107              :     /// via a tonic::Status error.
     108            0 :     pub async fn get_pages(
     109            0 :         &mut self,
     110            0 :         reqs: impl Stream<Item = GetPageRequest> + Send + 'static,
     111            0 :     ) -> tonic::Result<impl Stream<Item = tonic::Result<GetPageResponse>> + Send + 'static> {
     112            0 :         let reqs = reqs.map(proto::GetPageRequest::from);
     113            0 :         let resps = self.inner.get_pages(reqs).await?.into_inner();
     114            0 :         Ok(resps.and_then(|resp| ready(GetPageResponse::try_from(resp).map_err(|err| err.into()))))
     115            0 :     }
     116              : 
     117              :     /// Returns the size of a relation, as # of blocks.
     118            0 :     pub async fn get_rel_size(
     119            0 :         &mut self,
     120            0 :         req: GetRelSizeRequest,
     121            0 :     ) -> tonic::Result<GetRelSizeResponse> {
     122            0 :         let req = proto::GetRelSizeRequest::from(req);
     123            0 :         let resp = self.inner.get_rel_size(req).await?.into_inner();
     124            0 :         Ok(resp.into())
     125            0 :     }
     126              : 
     127              :     /// Fetches an SLRU segment.
     128            0 :     pub async fn get_slru_segment(
     129            0 :         &mut self,
     130            0 :         req: GetSlruSegmentRequest,
     131            0 :     ) -> tonic::Result<GetSlruSegmentResponse> {
     132            0 :         let req = proto::GetSlruSegmentRequest::from(req);
     133            0 :         let resp = self.inner.get_slru_segment(req).await?.into_inner();
     134            0 :         Ok(resp.try_into()?)
     135            0 :     }
     136              : 
     137              :     /// Acquires or extends a lease on the given LSN. This guarantees that the Pageserver won't
     138              :     /// garbage collect the LSN until the lease expires. Must be acquired on all relevant shards.
     139              :     ///
     140              :     /// Returns the lease expiration time, or a FailedPrecondition status if the lease could not be
     141              :     /// acquired because the LSN has already been garbage collected.
     142            0 :     pub async fn lease_lsn(&mut self, req: LeaseLsnRequest) -> tonic::Result<LeaseLsnResponse> {
     143            0 :         let req = proto::LeaseLsnRequest::from(req);
     144            0 :         let resp = self.inner.lease_lsn(req).await?.into_inner();
     145            0 :         Ok(resp.try_into()?)
     146            0 :     }
     147              : }
     148              : 
     149              : /// Adds authentication metadata to gRPC requests.
     150              : #[derive(Clone)]
     151              : struct AuthInterceptor {
     152              :     tenant_id: AsciiMetadataValue,
     153              :     timeline_id: AsciiMetadataValue,
     154              :     shard_id: AsciiMetadataValue,
     155              :     auth_header: Option<AsciiMetadataValue>, // including "Bearer " prefix
     156              : }
     157              : 
     158              : impl AuthInterceptor {
     159            0 :     fn new(
     160            0 :         tenant_id: TenantId,
     161            0 :         timeline_id: TimelineId,
     162            0 :         shard_id: ShardIndex,
     163            0 :         auth_token: Option<String>,
     164            0 :     ) -> anyhow::Result<Self> {
     165              :         Ok(Self {
     166            0 :             tenant_id: tenant_id.to_string().try_into()?,
     167            0 :             timeline_id: timeline_id.to_string().try_into()?,
     168            0 :             shard_id: shard_id.to_string().try_into()?,
     169            0 :             auth_header: auth_token
     170            0 :                 .map(|token| format!("Bearer {token}").try_into())
     171            0 :                 .transpose()?,
     172              :         })
     173            0 :     }
     174              : }
     175              : 
     176              : impl Interceptor for AuthInterceptor {
     177            0 :     fn call(&mut self, mut req: tonic::Request<()>) -> tonic::Result<tonic::Request<()>> {
     178            0 :         let metadata = req.metadata_mut();
     179            0 :         metadata.insert("neon-tenant-id", self.tenant_id.clone());
     180            0 :         metadata.insert("neon-timeline-id", self.timeline_id.clone());
     181            0 :         metadata.insert("neon-shard-id", self.shard_id.clone());
     182            0 :         if let Some(ref auth_header) = self.auth_header {
     183            0 :             metadata.insert("authorization", auth_header.clone());
     184            0 :         }
     185            0 :         Ok(req)
     186            0 :     }
     187              : }
        

Generated by: LCOV version 2.1-beta