Line data Source code
1 : use anyhow::Context as _;
2 : use futures::future::ready;
3 : use futures::{Stream, StreamExt as _, TryStreamExt as _};
4 : use tokio::io::AsyncRead;
5 : use tokio_util::io::StreamReader;
6 : use tonic::codec::CompressionEncoding;
7 : use tonic::metadata::AsciiMetadataValue;
8 : use tonic::service::Interceptor;
9 : use tonic::service::interceptor::InterceptedService;
10 : use tonic::transport::{Channel, Endpoint};
11 :
12 : use utils::id::{TenantId, TimelineId};
13 : use utils::shard::ShardIndex;
14 :
15 : use crate::model::*;
16 : use crate::proto;
17 :
18 : /// A basic Pageserver gRPC client, for a single tenant shard. This API uses native Rust domain
19 : /// types from `model` rather than generated Protobuf types.
20 : pub struct Client {
21 : inner: proto::PageServiceClient<InterceptedService<Channel, AuthInterceptor>>,
22 : }
23 :
24 : impl Client {
25 : /// Connects to the given gRPC endpoint.
26 0 : pub async fn connect<E>(
27 0 : endpoint: E,
28 0 : tenant_id: TenantId,
29 0 : timeline_id: TimelineId,
30 0 : shard_id: ShardIndex,
31 0 : auth_token: Option<String>,
32 0 : compression: Option<CompressionEncoding>,
33 0 : ) -> anyhow::Result<Self>
34 0 : where
35 0 : E: TryInto<Endpoint> + Send + Sync + 'static,
36 0 : <E as TryInto<Endpoint>>::Error: std::error::Error + Send + Sync,
37 0 : {
38 0 : let endpoint: Endpoint = endpoint.try_into().context("invalid endpoint")?;
39 0 : let channel = endpoint.connect().await?;
40 0 : Self::new(
41 0 : channel,
42 0 : tenant_id,
43 0 : timeline_id,
44 0 : shard_id,
45 0 : auth_token,
46 0 : compression,
47 : )
48 0 : }
49 :
50 : /// Creates a new client using the given gRPC channel.
51 0 : pub fn new(
52 0 : channel: Channel,
53 0 : tenant_id: TenantId,
54 0 : timeline_id: TimelineId,
55 0 : shard_id: ShardIndex,
56 0 : auth_token: Option<String>,
57 0 : compression: Option<CompressionEncoding>,
58 0 : ) -> anyhow::Result<Self> {
59 0 : let auth = AuthInterceptor::new(tenant_id, timeline_id, shard_id, auth_token)?;
60 0 : let mut inner = proto::PageServiceClient::with_interceptor(channel, auth);
61 :
62 0 : if let Some(compression) = compression {
63 0 : // TODO: benchmark this (including network latency).
64 0 : inner = inner
65 0 : .accept_compressed(compression)
66 0 : .send_compressed(compression);
67 0 : }
68 :
69 0 : Ok(Self { inner })
70 0 : }
71 :
72 : /// Returns whether a relation exists.
73 0 : pub async fn check_rel_exists(
74 0 : &mut self,
75 0 : req: CheckRelExistsRequest,
76 0 : ) -> tonic::Result<CheckRelExistsResponse> {
77 0 : let req = proto::CheckRelExistsRequest::from(req);
78 0 : let resp = self.inner.check_rel_exists(req).await?.into_inner();
79 0 : Ok(resp.into())
80 0 : }
81 :
82 : /// Fetches a base backup.
83 0 : pub async fn get_base_backup(
84 0 : &mut self,
85 0 : req: GetBaseBackupRequest,
86 0 : ) -> tonic::Result<impl AsyncRead + use<>> {
87 0 : let req = proto::GetBaseBackupRequest::from(req);
88 0 : let chunks = self.inner.get_base_backup(req).await?.into_inner();
89 0 : Ok(StreamReader::new(
90 0 : chunks
91 0 : .map_ok(|resp| resp.chunk)
92 0 : .map_err(std::io::Error::other),
93 : ))
94 0 : }
95 :
96 : /// Returns the total size of a database, as # of bytes.
97 0 : pub async fn get_db_size(&mut self, req: GetDbSizeRequest) -> tonic::Result<GetDbSizeResponse> {
98 0 : let req = proto::GetDbSizeRequest::from(req);
99 0 : let resp = self.inner.get_db_size(req).await?.into_inner();
100 0 : Ok(resp.into())
101 0 : }
102 :
103 : /// Fetches pages.
104 : ///
105 : /// This is implemented as a bidirectional streaming RPC for performance. Per-request errors are
106 : /// typically returned as status_code instead of errors, to avoid tearing down the entire stream
107 : /// via a tonic::Status error.
108 0 : pub async fn get_pages(
109 0 : &mut self,
110 0 : reqs: impl Stream<Item = GetPageRequest> + Send + 'static,
111 0 : ) -> tonic::Result<impl Stream<Item = tonic::Result<GetPageResponse>> + Send + 'static> {
112 0 : let reqs = reqs.map(proto::GetPageRequest::from);
113 0 : let resps = self.inner.get_pages(reqs).await?.into_inner();
114 0 : Ok(resps.and_then(|resp| ready(GetPageResponse::try_from(resp).map_err(|err| err.into()))))
115 0 : }
116 :
117 : /// Returns the size of a relation, as # of blocks.
118 0 : pub async fn get_rel_size(
119 0 : &mut self,
120 0 : req: GetRelSizeRequest,
121 0 : ) -> tonic::Result<GetRelSizeResponse> {
122 0 : let req = proto::GetRelSizeRequest::from(req);
123 0 : let resp = self.inner.get_rel_size(req).await?.into_inner();
124 0 : Ok(resp.into())
125 0 : }
126 :
127 : /// Fetches an SLRU segment.
128 0 : pub async fn get_slru_segment(
129 0 : &mut self,
130 0 : req: GetSlruSegmentRequest,
131 0 : ) -> tonic::Result<GetSlruSegmentResponse> {
132 0 : let req = proto::GetSlruSegmentRequest::from(req);
133 0 : let resp = self.inner.get_slru_segment(req).await?.into_inner();
134 0 : Ok(resp.try_into()?)
135 0 : }
136 :
137 : /// Acquires or extends a lease on the given LSN. This guarantees that the Pageserver won't
138 : /// garbage collect the LSN until the lease expires. Must be acquired on all relevant shards.
139 : ///
140 : /// Returns the lease expiration time, or a FailedPrecondition status if the lease could not be
141 : /// acquired because the LSN has already been garbage collected.
142 0 : pub async fn lease_lsn(&mut self, req: LeaseLsnRequest) -> tonic::Result<LeaseLsnResponse> {
143 0 : let req = proto::LeaseLsnRequest::from(req);
144 0 : let resp = self.inner.lease_lsn(req).await?.into_inner();
145 0 : Ok(resp.try_into()?)
146 0 : }
147 : }
148 :
149 : /// Adds authentication metadata to gRPC requests.
150 : #[derive(Clone)]
151 : struct AuthInterceptor {
152 : tenant_id: AsciiMetadataValue,
153 : timeline_id: AsciiMetadataValue,
154 : shard_id: AsciiMetadataValue,
155 : auth_header: Option<AsciiMetadataValue>, // including "Bearer " prefix
156 : }
157 :
158 : impl AuthInterceptor {
159 0 : fn new(
160 0 : tenant_id: TenantId,
161 0 : timeline_id: TimelineId,
162 0 : shard_id: ShardIndex,
163 0 : auth_token: Option<String>,
164 0 : ) -> anyhow::Result<Self> {
165 : Ok(Self {
166 0 : tenant_id: tenant_id.to_string().try_into()?,
167 0 : timeline_id: timeline_id.to_string().try_into()?,
168 0 : shard_id: shard_id.to_string().try_into()?,
169 0 : auth_header: auth_token
170 0 : .map(|token| format!("Bearer {token}").try_into())
171 0 : .transpose()?,
172 : })
173 0 : }
174 : }
175 :
176 : impl Interceptor for AuthInterceptor {
177 0 : fn call(&mut self, mut req: tonic::Request<()>) -> tonic::Result<tonic::Request<()>> {
178 0 : let metadata = req.metadata_mut();
179 0 : metadata.insert("neon-tenant-id", self.tenant_id.clone());
180 0 : metadata.insert("neon-timeline-id", self.timeline_id.clone());
181 0 : metadata.insert("neon-shard-id", self.shard_id.clone());
182 0 : if let Some(ref auth_header) = self.auth_header {
183 0 : metadata.insert("authorization", auth_header.clone());
184 0 : }
185 0 : Ok(req)
186 0 : }
187 : }
|