Line data Source code
1 : use anyhow::Context as _;
2 : use futures::future::ready;
3 : use futures::{Stream, StreamExt as _, TryStreamExt as _};
4 : use tokio::io::AsyncRead;
5 : use tokio_util::io::StreamReader;
6 : use tonic::codec::CompressionEncoding;
7 : use tonic::metadata::AsciiMetadataValue;
8 : use tonic::service::Interceptor;
9 : use tonic::service::interceptor::InterceptedService;
10 : use tonic::transport::{Channel, Endpoint};
11 :
12 : use utils::id::{TenantId, TimelineId};
13 : use utils::shard::ShardIndex;
14 :
15 : use crate::model::*;
16 : use crate::proto;
17 :
18 : /// A basic Pageserver gRPC client, for a single tenant shard. This API uses native Rust domain
19 : /// types from `model` rather than generated Protobuf types.
20 : pub struct Client {
21 : inner: proto::PageServiceClient<InterceptedService<Channel, AuthInterceptor>>,
22 : }
23 :
24 : impl Client {
25 : /// Connects to the given gRPC endpoint.
26 0 : pub async fn connect<E>(
27 0 : endpoint: E,
28 0 : tenant_id: TenantId,
29 0 : timeline_id: TimelineId,
30 0 : shard_id: ShardIndex,
31 0 : auth_token: Option<String>,
32 0 : compression: Option<CompressionEncoding>,
33 0 : ) -> anyhow::Result<Self>
34 0 : where
35 0 : E: TryInto<Endpoint> + Send + Sync + 'static,
36 0 : <E as TryInto<Endpoint>>::Error: std::error::Error + Send + Sync,
37 0 : {
38 0 : let endpoint: Endpoint = endpoint.try_into().context("invalid endpoint")?;
39 0 : let channel = endpoint.connect().await?;
40 0 : Self::new(
41 0 : channel,
42 0 : tenant_id,
43 0 : timeline_id,
44 0 : shard_id,
45 0 : auth_token,
46 0 : compression,
47 : )
48 0 : }
49 :
50 : /// Creates a new client using the given gRPC channel.
51 0 : pub fn new(
52 0 : channel: Channel,
53 0 : tenant_id: TenantId,
54 0 : timeline_id: TimelineId,
55 0 : shard_id: ShardIndex,
56 0 : auth_token: Option<String>,
57 0 : compression: Option<CompressionEncoding>,
58 0 : ) -> anyhow::Result<Self> {
59 0 : let auth = AuthInterceptor::new(tenant_id, timeline_id, shard_id, auth_token)?;
60 0 : let mut inner = proto::PageServiceClient::with_interceptor(channel, auth);
61 :
62 0 : if let Some(compression) = compression {
63 0 : // TODO: benchmark this (including network latency).
64 0 : inner = inner
65 0 : .accept_compressed(compression)
66 0 : .send_compressed(compression);
67 0 : }
68 :
69 0 : Ok(Self { inner })
70 0 : }
71 :
72 : /// Fetches a base backup.
73 0 : pub async fn get_base_backup(
74 0 : &mut self,
75 0 : req: GetBaseBackupRequest,
76 0 : ) -> tonic::Result<impl AsyncRead + use<>> {
77 0 : let req = proto::GetBaseBackupRequest::from(req);
78 0 : let chunks = self.inner.get_base_backup(req).await?.into_inner();
79 0 : Ok(StreamReader::new(
80 0 : chunks
81 0 : .map_ok(|resp| resp.chunk)
82 0 : .map_err(std::io::Error::other),
83 : ))
84 0 : }
85 :
86 : /// Returns the total size of a database, as # of bytes.
87 0 : pub async fn get_db_size(&mut self, req: GetDbSizeRequest) -> tonic::Result<GetDbSizeResponse> {
88 0 : let req = proto::GetDbSizeRequest::from(req);
89 0 : let resp = self.inner.get_db_size(req).await?.into_inner();
90 0 : Ok(resp.into())
91 0 : }
92 :
93 : /// Fetches pages.
94 : ///
95 : /// This is implemented as a bidirectional streaming RPC for performance. Per-request errors are
96 : /// typically returned as status_code instead of errors, to avoid tearing down the entire stream
97 : /// via a tonic::Status error.
98 0 : pub async fn get_pages(
99 0 : &mut self,
100 0 : reqs: impl Stream<Item = GetPageRequest> + Send + 'static,
101 0 : ) -> tonic::Result<impl Stream<Item = tonic::Result<GetPageResponse>> + Send + 'static> {
102 0 : let reqs = reqs.map(proto::GetPageRequest::from);
103 0 : let resps = self.inner.get_pages(reqs).await?.into_inner();
104 0 : Ok(resps.and_then(|resp| ready(GetPageResponse::try_from(resp).map_err(|err| err.into()))))
105 0 : }
106 :
107 : /// Returns the size of a relation as # of blocks, or None if allow_missing=true and the
108 : /// relation does not exist.
109 0 : pub async fn get_rel_size(
110 0 : &mut self,
111 0 : req: GetRelSizeRequest,
112 0 : ) -> tonic::Result<GetRelSizeResponse> {
113 0 : let req = proto::GetRelSizeRequest::from(req);
114 0 : let resp = self.inner.get_rel_size(req).await?.into_inner();
115 0 : Ok(resp.into())
116 0 : }
117 :
118 : /// Fetches an SLRU segment.
119 0 : pub async fn get_slru_segment(
120 0 : &mut self,
121 0 : req: GetSlruSegmentRequest,
122 0 : ) -> tonic::Result<GetSlruSegmentResponse> {
123 0 : let req = proto::GetSlruSegmentRequest::from(req);
124 0 : let resp = self.inner.get_slru_segment(req).await?.into_inner();
125 0 : Ok(resp.try_into()?)
126 0 : }
127 :
128 : /// Acquires or extends a lease on the given LSN. This guarantees that the Pageserver won't
129 : /// garbage collect the LSN until the lease expires. Must be acquired on all relevant shards.
130 : ///
131 : /// Returns the lease expiration time, or a FailedPrecondition status if the lease could not be
132 : /// acquired because the LSN has already been garbage collected.
133 0 : pub async fn lease_lsn(&mut self, req: LeaseLsnRequest) -> tonic::Result<LeaseLsnResponse> {
134 0 : let req = proto::LeaseLsnRequest::from(req);
135 0 : let resp = self.inner.lease_lsn(req).await?.into_inner();
136 0 : Ok(resp.try_into()?)
137 0 : }
138 : }
139 :
140 : /// Adds authentication metadata to gRPC requests.
141 : #[derive(Clone)]
142 : struct AuthInterceptor {
143 : tenant_id: AsciiMetadataValue,
144 : timeline_id: AsciiMetadataValue,
145 : shard_id: AsciiMetadataValue,
146 : auth_header: Option<AsciiMetadataValue>, // including "Bearer " prefix
147 : }
148 :
149 : impl AuthInterceptor {
150 0 : fn new(
151 0 : tenant_id: TenantId,
152 0 : timeline_id: TimelineId,
153 0 : shard_id: ShardIndex,
154 0 : auth_token: Option<String>,
155 0 : ) -> anyhow::Result<Self> {
156 : Ok(Self {
157 0 : tenant_id: tenant_id.to_string().try_into()?,
158 0 : timeline_id: timeline_id.to_string().try_into()?,
159 0 : shard_id: shard_id.to_string().try_into()?,
160 0 : auth_header: auth_token
161 0 : .map(|token| format!("Bearer {token}").try_into())
162 0 : .transpose()?,
163 : })
164 0 : }
165 : }
166 :
167 : impl Interceptor for AuthInterceptor {
168 0 : fn call(&mut self, mut req: tonic::Request<()>) -> tonic::Result<tonic::Request<()>> {
169 0 : let metadata = req.metadata_mut();
170 0 : metadata.insert("neon-tenant-id", self.tenant_id.clone());
171 0 : metadata.insert("neon-timeline-id", self.timeline_id.clone());
172 0 : metadata.insert("neon-shard-id", self.shard_id.clone());
173 0 : if let Some(ref auth_header) = self.auth_header {
174 0 : metadata.insert("authorization", auth_header.clone());
175 0 : }
176 0 : Ok(req)
177 0 : }
178 : }
|