use std::collections::HashMap;
use std::num::NonZero;
use std::sync::Arc;

use anyhow::anyhow;
use arc_swap::ArcSwap;
use futures::stream::FuturesUnordered;
use futures::{FutureExt as _, StreamExt as _};
use tonic::codec::CompressionEncoding;
use tracing::instrument;

use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamGuard, StreamPool};
use crate::retry::Retry;
use crate::split::GetPageSplitter;
use compute_api::spec::PageserverProtocol;
use pageserver_api::shard::ShardStripeSize;
use pageserver_page_api as page_api;
use utils::id::{TenantId, TimelineId};
use utils::shard::{ShardCount, ShardIndex, ShardNumber};

/// Max number of concurrent clients per channel (i.e. TCP connection). New channels will be spun up
/// when all existing channels are full.
///
/// TODO: tune all of these constants, and consider making them configurable.
/// TODO: consider separate limits for unary and streaming clients, so we don't fill up channels
/// with only streams.
const MAX_CLIENTS_PER_CHANNEL: NonZero<usize> = NonZero::new(16).unwrap();

/// Max number of concurrent unary request clients per shard.
const MAX_UNARY_CLIENTS: NonZero<usize> = NonZero::new(64).unwrap();

/// Max number of concurrent GetPage streams per shard. The max number of concurrent GetPage
/// requests is given by `MAX_STREAMS * MAX_STREAM_QUEUE_DEPTH`.
const MAX_STREAMS: NonZero<usize> = NonZero::new(64).unwrap();

/// Max number of pipelined requests per stream.
const MAX_STREAM_QUEUE_DEPTH: NonZero<usize> = NonZero::new(2).unwrap();

/// Max number of concurrent bulk GetPage streams per shard, used e.g. for prefetches. Because these
/// are more throughput-oriented, we have a smaller stream limit but a higher queue depth.
const MAX_BULK_STREAMS: NonZero<usize> = NonZero::new(16).unwrap();

/// Max number of pipelined requests per bulk stream. These are more throughput-oriented and thus
/// get a larger queue depth.
const MAX_BULK_STREAM_QUEUE_DEPTH: NonZero<usize> = NonZero::new(4).unwrap();
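
// For illustration (derived from the constants and the formula above): with these
// defaults, each shard admits up to MAX_STREAMS * MAX_STREAM_QUEUE_DEPTH = 64 * 2 = 128
// concurrent GetPage requests on the latency-sensitive stream pool, plus
// MAX_BULK_STREAMS * MAX_BULK_STREAM_QUEUE_DEPTH = 16 * 4 = 64 on the bulk pool.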

/// A rich Pageserver gRPC client for a single tenant timeline. This client is more capable than the
/// basic `page_api::Client` gRPC client, and supports:
///
/// * Sharded tenants across multiple Pageservers.
/// * Pooling of connections, clients, and streams for efficient resource use.
/// * Concurrent use by many callers.
/// * Internal handling of GetPage bidirectional streams, with pipelining and error handling.
/// * Automatic retries.
/// * Observability.
///
/// TODO: this client does not support base backups or LSN leases, as these are only used by
/// compute_ctl. Consider adding them, but note that LSN leases need concurrent requests on all
/// shards.
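///
/// # Example
///
/// A minimal usage sketch for an unsharded tenant (not compiled as a doctest; the
/// URL, IDs, and request fields are illustrative placeholders):
///
/// ```rust,ignore
/// let shard_spec = ShardSpec::new(
///     HashMap::from([(ShardIndex::unsharded(), "grpc://pageserver:51051".to_string())]),
///     None, // stripe size may be omitted for unsharded tenants
/// )?;
/// let client = PageserverClient::new(tenant_id, timeline_id, shard_spec, None, None)?;
/// let exists = client
///     .check_rel_exists(page_api::CheckRelExistsRequest { rel, read_lsn })
///     .await?;
/// ```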
pub struct PageserverClient {
    /// The tenant ID.
    tenant_id: TenantId,
    /// The timeline ID.
    timeline_id: TimelineId,
    /// The JWT auth token for this tenant, if any.
    auth_token: Option<String>,
    /// The compression to use, if any.
    compression: Option<CompressionEncoding>,
    /// The shards for this tenant.
    shards: ArcSwap<Shards>,
    /// The retry configuration.
    retry: Retry,
}

impl PageserverClient {
    /// Creates a new Pageserver client for a given tenant and timeline. Uses the Pageservers given
    /// in the shard spec, which must be complete and must use gRPC URLs.
    pub fn new(
        tenant_id: TenantId,
        timeline_id: TimelineId,
        shard_spec: ShardSpec,
        auth_token: Option<String>,
        compression: Option<CompressionEncoding>,
    ) -> anyhow::Result<Self> {
        let shards = Shards::new(
            tenant_id,
            timeline_id,
            shard_spec,
            auth_token.clone(),
            compression,
        )?;
        Ok(Self {
            tenant_id,
            timeline_id,
            auth_token,
            compression,
            shards: ArcSwap::new(Arc::new(shards)),
            retry: Retry,
        })
    }

    /// Updates the shards from the given shard spec. In-flight requests will complete using the
    /// existing shards, but may retry with the new shards if they fail.
    ///
    /// TODO: verify that in-flight requests are allowed to complete, and that the old pools are
    /// properly spun down and dropped afterwards.
105 : /// properly spun down and dropped afterwards.
106 0 : pub fn update_shards(&self, shard_spec: ShardSpec) -> anyhow::Result<()> {
107 : // Validate the shard spec. We should really use `ArcSwap::rcu` for this, to avoid races
108 : // with concurrent updates, but that involves creating a new `Shards` on every attempt,
109 : // which spins up a bunch of Tokio tasks and such. These should already be checked elsewhere
110 : // in the stack, and if they're violated then we already have problems elsewhere, so a
111 : // best-effort but possibly-racy check is okay here.
112 0 : let old = self.shards.load_full();
113 0 : if shard_spec.count < old.count {
114 0 : return Err(anyhow!(
115 0 : "can't reduce shard count from {} to {}",
116 0 : old.count,
117 0 : shard_spec.count
118 0 : ));
119 0 : }
120 0 : if !old.count.is_unsharded() && shard_spec.stripe_size != old.stripe_size {
121 0 : return Err(anyhow!(
122 0 : "can't change stripe size from {} to {}",
123 0 : old.stripe_size,
124 0 : shard_spec.stripe_size
125 0 : ));
126 0 : }
127 :
128 0 : let shards = Shards::new(
129 0 : self.tenant_id,
130 0 : self.timeline_id,
131 0 : shard_spec,
132 0 : self.auth_token.clone(),
133 0 : self.compression,
134 0 : )?;
135 0 : self.shards.store(Arc::new(shards));
136 0 : Ok(())
137 0 : }
138 :
139 : /// Returns whether a relation exists.
140 : #[instrument(skip_all, fields(rel=%req.rel, lsn=%req.read_lsn))]
141 : pub async fn check_rel_exists(
142 : &self,
143 : req: page_api::CheckRelExistsRequest,
144 : ) -> tonic::Result<page_api::CheckRelExistsResponse> {
145 : self.retry
146 0 : .with(async |_| {
147 : // Relation metadata is only available on shard 0.
148 0 : let mut client = self.shards.load_full().get_zero().client().await?;
149 0 : client.check_rel_exists(req).await
150 0 : })
151 : .await
152 : }
153 :
154 : /// Returns the total size of a database, as # of bytes.
155 : #[instrument(skip_all, fields(db_oid=%req.db_oid, lsn=%req.read_lsn))]
156 : pub async fn get_db_size(
157 : &self,
158 : req: page_api::GetDbSizeRequest,
159 : ) -> tonic::Result<page_api::GetDbSizeResponse> {
160 : self.retry
161 0 : .with(async |_| {
162 : // Relation metadata is only available on shard 0.
163 0 : let mut client = self.shards.load_full().get_zero().client().await?;
164 0 : client.get_db_size(req).await
165 0 : })
166 : .await
167 : }
168 :
169 : /// Fetches pages. The `request_id` must be unique across all in-flight requests, and the
170 : /// `attempt` must be 0 (incremented on retry). Automatically splits requests that straddle
171 : /// shard boundaries, and assembles the responses.
172 : ///
173 : /// Unlike `page_api::Client`, this automatically converts `status_code` into `tonic::Status`
174 : /// errors. All responses will have `GetPageStatusCode::Ok`.
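    ///
    /// # Example
    ///
    /// A sketch; constructing the `GetPageRequest` is elided, since it depends on
    /// caller context:
    ///
    /// ```rust,ignore
    /// req.request_id.attempt = 0; // must start at 0; retries increment it internally
    /// let resp = client.get_page(req).await?;
    /// // Per-request errors surface as `tonic::Status`, so every page here is OK and
    /// // arrives in the same order as the requested block numbers.
    /// for page in &resp.pages {
    ///     let _ = page.block_number;
    /// }
    /// ```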
    #[instrument(skip_all, fields(
        req_id = %req.request_id,
        class = %req.request_class,
        rel = %req.rel,
        blkno = %req.block_numbers[0],
        blks = %req.block_numbers.len(),
        lsn = %req.read_lsn,
    ))]
    pub async fn get_page(
        &self,
        req: page_api::GetPageRequest,
    ) -> tonic::Result<page_api::GetPageResponse> {
        // Make sure we have at least one page.
        if req.block_numbers.is_empty() {
            return Err(tonic::Status::invalid_argument("no block number"));
        }
        // The request attempt must be 0. The client will increment it internally.
        if req.request_id.attempt != 0 {
            return Err(tonic::Status::invalid_argument("request attempt must be 0"));
        }

        // The shards may change while we're fetching pages. We execute the request using a stable
        // view of the shards (especially important for requests that span shards), but retry the
        // top-level (pre-split) request to pick up shard changes. This can lead to unnecessary
        // retries and re-splits in some cases where requests span shards, but these are expected
        // to be rare.
        //
        // TODO: the gRPC server and client don't yet properly support shard splits. Revisit this
        // once we figure out how to handle them.
        self.retry
            .with(async |attempt| {
                let mut req = req.clone();
                req.request_id.attempt = attempt as u32;
                Self::get_page_with_shards(req, &self.shards.load_full()).await
            })
            .await
    }

    /// Fetches pages using the given shards. This uses a stable view of the shards, regardless of
    /// concurrent shard updates. Does not retry internally, but is retried by `get_page()`.
    async fn get_page_with_shards(
        req: page_api::GetPageRequest,
        shards: &Shards,
    ) -> tonic::Result<page_api::GetPageResponse> {
        // Fast path: the request is for a single shard.
        if let Some(shard_id) =
            GetPageSplitter::for_single_shard(&req, shards.count, shards.stripe_size)
        {
            return Self::get_page_with_shard(req, shards.get(shard_id)?).await;
        }

        // The request spans multiple shards. Split it, dispatch concurrent per-shard requests,
        // and reassemble the responses.
        let mut splitter = GetPageSplitter::split(req, shards.count, shards.stripe_size);

        let mut shard_requests = FuturesUnordered::new();
        for (shard_id, shard_req) in splitter.drain_requests() {
            let future = Self::get_page_with_shard(shard_req, shards.get(shard_id)?)
                .map(move |result| result.map(|resp| (shard_id, resp)));
            shard_requests.push(future);
        }

        while let Some((shard_id, shard_response)) = shard_requests.next().await.transpose()? {
            splitter.add_response(shard_id, shard_response)?;
        }

        splitter.get_response()
    }

    /// Fetches pages on the given shard. Does not retry internally.
    async fn get_page_with_shard(
        req: page_api::GetPageRequest,
        shard: &Shard,
    ) -> tonic::Result<page_api::GetPageResponse> {
        let stream = shard.stream(req.request_class.is_bulk()).await;
        let resp = stream.send(req.clone()).await?;

        // Convert per-request errors into a `tonic::Status`.
        if resp.status_code != page_api::GetPageStatusCode::Ok {
            return Err(tonic::Status::new(
                resp.status_code.into(),
                resp.reason.unwrap_or_else(|| String::from("unknown error")),
            ));
        }

        // Check that we received the expected pages.
        if req.rel != resp.rel {
            return Err(tonic::Status::internal(format!(
                "shard {} returned wrong relation, expected {} got {}",
                shard.id, req.rel, resp.rel
            )));
        }
        if !req
            .block_numbers
            .iter()
            .copied()
            .eq(resp.pages.iter().map(|p| p.block_number))
        {
            return Err(tonic::Status::internal(format!(
                "shard {} returned wrong pages, expected {:?} got {:?}",
                shard.id,
                req.block_numbers,
                resp.pages
                    .iter()
                    .map(|page| page.block_number)
                    .collect::<Vec<_>>()
            )));
        }

        Ok(resp)
    }

    /// Returns the size of a relation, as a number of blocks.
    #[instrument(skip_all, fields(rel=%req.rel, lsn=%req.read_lsn))]
    pub async fn get_rel_size(
        &self,
        req: page_api::GetRelSizeRequest,
    ) -> tonic::Result<page_api::GetRelSizeResponse> {
        self.retry
            .with(async |_| {
                // Relation metadata is only available on shard 0.
                let mut client = self.shards.load_full().get_zero().client().await?;
                client.get_rel_size(req).await
            })
            .await
    }

    /// Fetches an SLRU segment.
    #[instrument(skip_all, fields(kind=%req.kind, segno=%req.segno, lsn=%req.read_lsn))]
    pub async fn get_slru_segment(
        &self,
        req: page_api::GetSlruSegmentRequest,
    ) -> tonic::Result<page_api::GetSlruSegmentResponse> {
        self.retry
            .with(async |_| {
                // SLRU segments are only available on shard 0.
                let mut client = self.shards.load_full().get_zero().client().await?;
                client.get_slru_segment(req).await
            })
            .await
    }
}

/// Shard specification for a PageserverClient.
pub struct ShardSpec {
    /// Maps shard indices to gRPC URLs.
    ///
    /// INVARIANT: every shard 0..count is present, and shard 0 is always present.
    /// INVARIANT: every URL is valid and uses the grpc:// scheme.
    urls: HashMap<ShardIndex, String>,
    /// The shard count.
    ///
    /// NB: this is 0 for unsharded tenants, following the `ShardIndex::unsharded()` convention.
    count: ShardCount,
    /// The stripe size for these shards.
    stripe_size: ShardStripeSize,
}

impl ShardSpec {
    /// Creates a new shard spec with the given URLs and stripe size. All shards must be given.
    /// The stripe size may be omitted for unsharded tenants.
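    ///
    /// # Example
    ///
    /// A sketch of the validation behavior (URLs are placeholders):
    ///
    /// ```rust,ignore
    /// let urls = HashMap::from([
    ///     (ShardIndex::new(ShardNumber(0), ShardCount::new(2)), "grpc://ps-a:51051".to_string()),
    ///     (ShardIndex::new(ShardNumber(1), ShardCount::new(2)), "grpc://ps-b:51051".to_string()),
    /// ]);
    /// // Sharded tenants must provide a stripe size.
    /// assert!(ShardSpec::new(urls.clone(), None).is_err());
    /// assert!(ShardSpec::new(urls, Some(ShardStripeSize::default())).is_ok());
    /// ```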
    pub fn new(
        urls: HashMap<ShardIndex, String>,
        stripe_size: Option<ShardStripeSize>,
    ) -> anyhow::Result<Self> {
        // Compute the shard count.
        let count = match urls.len() {
            0 => return Err(anyhow!("no shards provided")),
            1 => ShardCount::new(0), // NB: unsharded tenants use 0, like `ShardIndex::unsharded()`
            n if n > u8::MAX as usize => return Err(anyhow!("too many shards: {n}")),
            n => ShardCount::new(n as u8),
        };

        // Determine the stripe size. It doesn't matter for unsharded tenants.
        if stripe_size.is_none() && !count.is_unsharded() {
            return Err(anyhow!("stripe size must be given for sharded tenants"));
        }
        let stripe_size = stripe_size.unwrap_or_default();

        // Validate the shard spec.
        for (shard_id, url) in &urls {
            // The shard index must match the computed shard count, even for unsharded tenants.
            if shard_id.shard_count != count {
                return Err(anyhow!("invalid shard index {shard_id}, expected {count}"));
            }
            // The shard index's number and count must be consistent.
            if !shard_id.is_unsharded() && shard_id.shard_number.0 >= shard_id.shard_count.0 {
                return Err(anyhow!("invalid shard index {shard_id}"));
            }
            // The above conditions guarantee that we have all shards 0..count: len() matches
            // count, shard number < count, and numbers are unique (via the hashmap).

            // Validate the URL.
            if PageserverProtocol::from_connstring(url)? != PageserverProtocol::Grpc {
                return Err(anyhow!("invalid shard URL {url}: must use gRPC"));
            }
        }

        Ok(Self {
            urls,
            count,
            stripe_size,
        })
    }
}

/// Tracks the tenant's shards.
struct Shards {
    /// Shards by shard index.
    ///
    /// INVARIANT: every shard 0..count is present.
    /// INVARIANT: shard 0 is always present.
    by_index: HashMap<ShardIndex, Shard>,
    /// The shard count.
    ///
    /// NB: this is 0 for unsharded tenants, following the `ShardIndex::unsharded()` convention.
    count: ShardCount,
    /// The stripe size. Only used for sharded tenants.
    stripe_size: ShardStripeSize,
}

impl Shards {
    /// Creates a new set of shards based on a shard spec.
    fn new(
        tenant_id: TenantId,
        timeline_id: TimelineId,
        shard_spec: ShardSpec,
        auth_token: Option<String>,
        compression: Option<CompressionEncoding>,
    ) -> anyhow::Result<Self> {
        // NB: the shard spec has already been validated when constructed.
        let mut shards = HashMap::with_capacity(shard_spec.urls.len());
        for (shard_id, url) in shard_spec.urls {
            shards.insert(
                shard_id,
                Shard::new(
                    url,
                    tenant_id,
                    timeline_id,
                    shard_id,
                    auth_token.clone(),
                    compression,
                )?,
            );
        }

        Ok(Self {
            by_index: shards,
            count: shard_spec.count,
            stripe_size: shard_spec.stripe_size,
        })
    }

    /// Looks up the given shard.
    #[allow(clippy::result_large_err)] // TODO: check perf impact
    fn get(&self, shard_id: ShardIndex) -> tonic::Result<&Shard> {
        self.by_index
            .get(&shard_id)
            .ok_or_else(|| tonic::Status::not_found(format!("unknown shard {shard_id}")))
    }

    /// Returns shard 0.
    fn get_zero(&self) -> &Shard {
        self.get(ShardIndex::new(ShardNumber(0), self.count))
            .expect("always present")
    }
}

/// A single shard. Uses dedicated resource pools with the following structure:
///
/// * Channel pool: unbounded.
/// * Unary client pool: MAX_UNARY_CLIENTS.
/// * Stream client pool: unbounded.
/// * Stream pool: MAX_STREAMS and MAX_STREAM_QUEUE_DEPTH.
/// * Bulk channel pool: unbounded.
/// * Bulk client pool: unbounded.
/// * Bulk stream pool: MAX_BULK_STREAMS and MAX_BULK_STREAM_QUEUE_DEPTH.
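///
/// For example, a regular (non-bulk) GetPage request acquires a stream from the
/// stream pool, which draws a client from its dedicated (unbounded) client pool,
/// which in turn multiplexes onto a channel from the shared channel pool, with up
/// to MAX_CLIENTS_PER_CHANNEL clients per channel.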
struct Shard {
    /// The shard ID.
    id: ShardIndex,
    /// Unary gRPC client pool.
    client_pool: Arc<ClientPool>,
    /// GetPage stream pool.
    stream_pool: Arc<StreamPool>,
    /// GetPage stream pool for bulk requests, e.g. prefetches.
    bulk_stream_pool: Arc<StreamPool>,
}

impl Shard {
    /// Creates a new shard. It has its own dedicated resource pools.
    fn new(
        url: String,
        tenant_id: TenantId,
        timeline_id: TimelineId,
        shard_id: ShardIndex,
        auth_token: Option<String>,
        compression: Option<CompressionEncoding>,
    ) -> anyhow::Result<Self> {
        // Common channel pool for unary and stream requests. Bounded by client/stream pools.
        let channel_pool = ChannelPool::new(url.clone(), MAX_CLIENTS_PER_CHANNEL)?;

        // Client pool for unary requests.
        let client_pool = ClientPool::new(
            channel_pool.clone(),
            tenant_id,
            timeline_id,
            shard_id,
            auth_token.clone(),
            compression,
            Some(MAX_UNARY_CLIENTS),
        );

        // GetPage stream pool. Uses a dedicated client pool to avoid starving out unary clients,
        // but shares a channel pool with it (as it's unbounded).
        let stream_pool = StreamPool::new(
            ClientPool::new(
                channel_pool.clone(),
                tenant_id,
                timeline_id,
                shard_id,
                auth_token.clone(),
                compression,
                None, // unbounded, limited by stream pool
            ),
            Some(MAX_STREAMS),
            MAX_STREAM_QUEUE_DEPTH,
        );

        // Bulk GetPage stream pool, e.g. for prefetches. Uses dedicated channel/client/stream
        // pools to avoid head-of-line blocking of latency-sensitive requests.
        let bulk_stream_pool = StreamPool::new(
            ClientPool::new(
                ChannelPool::new(url, MAX_CLIENTS_PER_CHANNEL)?,
                tenant_id,
                timeline_id,
                shard_id,
                auth_token,
                compression,
                None, // unbounded, limited by stream pool
            ),
            Some(MAX_BULK_STREAMS),
            MAX_BULK_STREAM_QUEUE_DEPTH,
        );

        Ok(Self {
            id: shard_id,
            client_pool,
            stream_pool,
            bulk_stream_pool,
        })
    }

    /// Returns a pooled client for this shard.
    async fn client(&self) -> tonic::Result<ClientGuard> {
        self.client_pool
            .get()
            .await
            .map_err(|err| tonic::Status::internal(format!("failed to get client: {err}")))
    }

    /// Returns a pooled stream for this shard. If `bulk` is `true`, uses the dedicated bulk
    /// stream pool (e.g. for prefetches).
    async fn stream(&self, bulk: bool) -> StreamGuard {
        match bulk {
            false => self.stream_pool.get().await,
            true => self.bulk_stream_pool.get().await,
        }
    }
}