LCOV - code coverage report
Current view: top level - pageserver/client_grpc/src - client.rs (source / functions) Coverage Total Hit
Test: 4be46b1c0003aa3bbac9ade362c676b419df4c20.info Lines: 0.0 % 248 0
Test Date: 2025-07-22 17:50:06 Functions: 0.0 % 49 0

            Line data    Source code
       1              : use std::collections::HashMap;
       2              : use std::num::NonZero;
       3              : use std::pin::pin;
       4              : use std::sync::Arc;
       5              : use std::time::{Duration, Instant};
       6              : 
       7              : use anyhow::anyhow;
       8              : use arc_swap::ArcSwap;
       9              : use futures::stream::FuturesUnordered;
      10              : use futures::{FutureExt as _, StreamExt as _};
      11              : use tonic::codec::CompressionEncoding;
      12              : use tracing::{debug, instrument};
      13              : use utils::logging::warn_slow;
      14              : 
      15              : use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamGuard, StreamPool};
      16              : use crate::retry::Retry;
      17              : use crate::split::GetPageSplitter;
      18              : use compute_api::spec::PageserverProtocol;
      19              : use pageserver_page_api as page_api;
      20              : use utils::id::{TenantId, TimelineId};
      21              : use utils::shard::{ShardCount, ShardIndex, ShardNumber, ShardStripeSize};
      22              : 
      23              : /// Max number of concurrent clients per channel (i.e. TCP connection). New channels will be spun up
      24              : /// when full.
      25              : ///
      26              : /// Normal requests are small, and we don't pipeline them, so we can afford a large number of
      27              : /// streams per connection.
      28              : ///
      29              : /// TODO: tune all of these constants, and consider making them configurable.
      30              : const MAX_CLIENTS_PER_CHANNEL: NonZero<usize> = NonZero::new(64).unwrap();
      31              : 
      32              : /// Max number of concurrent bulk GetPage streams per channel (i.e. TCP connection). These use a
      33              : /// dedicated channel pool with a lower client limit, to avoid TCP-level head-of-line blocking and
      34              : /// transmission delays. This also concentrates large window sizes on a smaller set of
      35              : /// streams/connections, presumably reducing memory use.
      36              : const MAX_BULK_CLIENTS_PER_CHANNEL: NonZero<usize> = NonZero::new(16).unwrap();
      37              : 
      38              : /// The batch size threshold at which a GetPage request will use the bulk stream pool.
      39              : ///
      40              : /// The gRPC initial window size is 64 KB. Each page is 8 KB, so let's avoid increasing the window
      41              : /// size for the normal stream pool, and route requests for >= 5 pages (>32 KB) to the bulk pool.
      42              : const BULK_THRESHOLD_BATCH_SIZE: usize = 5;
      43              : 
      44              : /// The overall request call timeout, including retries and pool acquisition.
      45              : /// TODO: should we retry forever? Should the caller decide?
      46              : const CALL_TIMEOUT: Duration = Duration::from_secs(60);
      47              : 
      48              : /// The per-request (retry attempt) timeout, including any lazy connection establishment.
      49              : const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
      50              : 
      51              : /// The initial request retry backoff duration. The first retry does not back off.
      52              : /// TODO: use a different backoff for ResourceExhausted (rate limiting)? Needs server support.
      53              : const BASE_BACKOFF: Duration = Duration::from_millis(5);
      54              : 
      55              : /// The maximum request retry backoff duration.
      56              : const MAX_BACKOFF: Duration = Duration::from_secs(5);
      57              : 
      58              : /// Threshold and interval for warning about slow operation.
      59              : const SLOW_THRESHOLD: Duration = Duration::from_secs(3);
      60              : 
      61              : /// A rich Pageserver gRPC client for a single tenant timeline. This client is more capable than the
      62              : /// basic `page_api::Client` gRPC client, and supports:
      63              : ///
      64              : /// * Sharded tenants across multiple Pageservers.
      65              : /// * Pooling of connections, clients, and streams for efficient resource use.
      66              : /// * Concurrent use by many callers.
      67              : /// * Internal handling of GetPage bidirectional streams.
      68              : /// * Automatic retries.
      69              : /// * Observability.
      70              : ///
      71              : /// The client has dedicated connection/client/stream pools per shard, for resource reuse. These
      72              : /// pools are unbounded: we allow scaling out as many concurrent streams as needed to serve all
      73              : /// concurrent callers, which mostly eliminates head-of-line blocking. Idle streams are fairly
      74              : /// cheap: the server task currently uses 26 KB of memory, so we can comfortably fit 100,000
      75              : /// concurrent idle streams (2.5 GB memory). The worst case degenerates to the old libpq case with
      76              : /// one stream per backend, but without the TCP connection overhead. In the common case we expect
      77              : /// significantly lower stream counts due to stream sharing, driven e.g. by idle backends, LFC hits,
      78              : /// read coalescing, sharding (backends typically only talk to one shard at a time), etc.
      79              : ///
      80              : /// TODO: this client does not support base backups or LSN leases, as these are only used by
      81              : /// compute_ctl. Consider adding this, but LSN leases need concurrent requests on all shards.
      82              : pub struct PageserverClient {
      83              :     /// The tenant ID.
      84              :     tenant_id: TenantId,
      85              :     /// The timeline ID.
      86              :     timeline_id: TimelineId,
      87              :     /// The JWT auth token for this tenant, if any.
      88              :     auth_token: Option<String>,
      89              :     /// The compression to use, if any.
      90              :     compression: Option<CompressionEncoding>,
      91              :     /// The shards for this tenant.
      92              :     shards: ArcSwap<Shards>,
      93              : }
      94              : 
      95              : impl PageserverClient {
      96              :     /// Creates a new Pageserver client for a given tenant and timeline. Uses the Pageservers given
      97              :     /// in the shard spec, which must be complete and must use gRPC URLs.
      98            0 :     pub fn new(
      99            0 :         tenant_id: TenantId,
     100            0 :         timeline_id: TimelineId,
     101            0 :         shard_spec: ShardSpec,
     102            0 :         auth_token: Option<String>,
     103            0 :         compression: Option<CompressionEncoding>,
     104            0 :     ) -> anyhow::Result<Self> {
     105            0 :         let shards = Shards::new(
     106            0 :             tenant_id,
     107            0 :             timeline_id,
     108            0 :             shard_spec,
     109            0 :             auth_token.clone(),
     110            0 :             compression,
     111            0 :         )?;
     112            0 :         Ok(Self {
     113            0 :             tenant_id,
     114            0 :             timeline_id,
     115            0 :             auth_token,
     116            0 :             compression,
     117            0 :             shards: ArcSwap::new(Arc::new(shards)),
     118            0 :         })
     119            0 :     }
     120              : 
     121              :     /// Updates the shards from the given shard spec. In-flight requests will complete using the
     122              :     /// existing shards, but may retry with the new shards if they fail.
     123              :     ///
     124              :     /// TODO: verify that in-flight requests are allowed to complete, and that the old pools are
     125              :     /// properly spun down and dropped afterwards.
     126            0 :     pub fn update_shards(&self, shard_spec: ShardSpec) -> anyhow::Result<()> {
     127              :         // Validate the shard spec. We should really use `ArcSwap::rcu` for this, to avoid races
     128              :         // with concurrent updates, but that involves creating a new `Shards` on every attempt,
     129              :         // which spins up a bunch of Tokio tasks and such. These should already be checked elsewhere
     130              :         // in the stack, and if they're violated then we already have problems elsewhere, so a
     131              :         // best-effort but possibly-racy check is okay here.
     132            0 :         let old = self.shards.load_full();
     133            0 :         if shard_spec.count < old.count {
     134            0 :             return Err(anyhow!(
     135            0 :                 "can't reduce shard count from {} to {}",
     136            0 :                 old.count,
     137            0 :                 shard_spec.count
     138            0 :             ));
     139            0 :         }
     140            0 :         if !old.count.is_unsharded() && shard_spec.stripe_size != old.stripe_size {
     141            0 :             return Err(anyhow!(
     142            0 :                 "can't change stripe size from {} to {}",
     143            0 :                 old.stripe_size.expect("always Some when sharded"),
     144            0 :                 shard_spec.stripe_size.expect("always Some when sharded")
     145            0 :             ));
     146            0 :         }
     147              : 
     148            0 :         let shards = Shards::new(
     149            0 :             self.tenant_id,
     150            0 :             self.timeline_id,
     151            0 :             shard_spec,
     152            0 :             self.auth_token.clone(),
     153            0 :             self.compression,
     154            0 :         )?;
     155            0 :         self.shards.store(Arc::new(shards));
     156            0 :         Ok(())
     157            0 :     }
     158              : 
     159              :     /// Returns the total size of a database, as # of bytes.
     160              :     #[instrument(skip_all, fields(db_oid=%req.db_oid, lsn=%req.read_lsn))]
     161              :     pub async fn get_db_size(
     162              :         &self,
     163              :         req: page_api::GetDbSizeRequest,
     164              :     ) -> tonic::Result<page_api::GetDbSizeResponse> {
     165              :         debug!("sending request: {req:?}");
     166            0 :         let resp = Self::with_retries(CALL_TIMEOUT, async |_| {
     167              :             // Relation metadata is only available on shard 0.
     168            0 :             let mut client = self.shards.load_full().get_zero().client().await?;
     169            0 :             Self::with_timeout(REQUEST_TIMEOUT, client.get_db_size(req)).await
     170            0 :         })
     171              :         .await?;
     172              :         debug!("received response: {resp:?}");
     173              :         Ok(resp)
     174              :     }
     175              : 
     176              :     /// Fetches pages. The `request_id` must be unique across all in-flight requests, and the
     177              :     /// `attempt` must be 0 (incremented on retry). Automatically splits requests that straddle
     178              :     /// shard boundaries, and assembles the responses.
     179              :     ///
     180              :     /// Unlike `page_api::Client`, this automatically converts `status_code` into `tonic::Status`
     181              :     /// errors. All responses will have `GetPageStatusCode::Ok`.
     182              :     #[instrument(skip_all, fields(
     183              :         req_id = %req.request_id,
     184              :         class = %req.request_class,
     185              :         rel = %req.rel,
     186            0 :         blkno = %req.block_numbers[0],
     187              :         blks = %req.block_numbers.len(),
     188              :         lsn = %req.read_lsn,
     189              :     ))]
     190              :     pub async fn get_page(
     191              :         &self,
     192              :         req: page_api::GetPageRequest,
     193              :     ) -> tonic::Result<page_api::GetPageResponse> {
     194              :         // Make sure we have at least one page.
     195              :         if req.block_numbers.is_empty() {
     196              :             return Err(tonic::Status::invalid_argument("no block number"));
     197              :         }
     198              :         // The request attempt must be 0. The client will increment it internally.
     199              :         if req.request_id.attempt != 0 {
     200              :             return Err(tonic::Status::invalid_argument("request attempt must be 0"));
     201              :         }
     202              : 
     203              :         debug!("sending request: {req:?}");
     204              : 
     205              :         // The shards may change while we're fetching pages. We execute the request using a stable
     206              :         // view of the shards (especially important for requests that span shards), but retry the
     207              :         // top-level (pre-split) request to pick up shard changes. This can lead to unnecessary
     208              :         // retries and re-splits in some cases where requests span shards, but these are expected to
     209              :         // be rare.
     210              :         //
     211              :         // TODO: the gRPC server and client doesn't yet properly support shard splits. Revisit this
     212              :         // once we figure out how to handle these.
     213            0 :         let resp = Self::with_retries(CALL_TIMEOUT, async |attempt| {
     214            0 :             let mut req = req.clone();
     215            0 :             req.request_id.attempt = attempt as u32;
     216            0 :             let shards = self.shards.load_full();
     217            0 :             Self::with_timeout(REQUEST_TIMEOUT, Self::get_page_with_shards(req, &shards)).await
     218            0 :         })
     219              :         .await?;
     220              : 
     221              :         debug!("received response: {resp:?}");
     222              :         Ok(resp)
     223              :     }
     224              : 
     225              :     /// Fetches pages using the given shards. This uses a stable view of the shards, regardless of
     226              :     /// concurrent shard updates. Does not retry internally, but is retried by `get_page()`.
     227            0 :     async fn get_page_with_shards(
     228            0 :         req: page_api::GetPageRequest,
     229            0 :         shards: &Shards,
     230            0 :     ) -> tonic::Result<page_api::GetPageResponse> {
     231              :         // Fast path: request is for a single shard.
     232            0 :         if let Some(shard_id) =
     233            0 :             GetPageSplitter::for_single_shard(&req, shards.count, shards.stripe_size)
     234            0 :                 .map_err(|err| tonic::Status::internal(err.to_string()))?
     235              :         {
     236            0 :             return Self::get_page_with_shard(req, shards.get(shard_id)?).await;
     237            0 :         }
     238              : 
     239              :         // Request spans multiple shards. Split it, dispatch concurrent per-shard requests, and
     240              :         // reassemble the responses.
     241            0 :         let mut splitter = GetPageSplitter::split(req, shards.count, shards.stripe_size)
     242            0 :             .map_err(|err| tonic::Status::internal(err.to_string()))?;
     243              : 
     244            0 :         let mut shard_requests = FuturesUnordered::new();
     245            0 :         for (shard_id, shard_req) in splitter.drain_requests() {
     246            0 :             let future = Self::get_page_with_shard(shard_req, shards.get(shard_id)?)
     247            0 :                 .map(move |result| result.map(|resp| (shard_id, resp)));
     248            0 :             shard_requests.push(future);
     249              :         }
     250              : 
     251            0 :         while let Some((shard_id, shard_response)) = shard_requests.next().await.transpose()? {
     252            0 :             splitter
     253            0 :                 .add_response(shard_id, shard_response)
     254            0 :                 .map_err(|err| tonic::Status::internal(err.to_string()))?;
     255              :         }
     256              : 
     257            0 :         splitter
     258            0 :             .get_response()
     259            0 :             .map_err(|err| tonic::Status::internal(err.to_string()))
     260            0 :     }
     261              : 
     262              :     /// Fetches pages on the given shard. Does not retry internally.
     263            0 :     async fn get_page_with_shard(
     264            0 :         req: page_api::GetPageRequest,
     265            0 :         shard: &Shard,
     266            0 :     ) -> tonic::Result<page_api::GetPageResponse> {
     267            0 :         let mut stream = shard.stream(Self::is_bulk(&req)).await?;
     268            0 :         let resp = stream.send(req.clone()).await?;
     269              : 
     270              :         // Convert per-request errors into a tonic::Status.
     271            0 :         if resp.status_code != page_api::GetPageStatusCode::Ok {
     272            0 :             return Err(tonic::Status::new(
     273            0 :                 resp.status_code.into(),
     274            0 :                 resp.reason.unwrap_or_else(|| String::from("unknown error")),
     275              :             ));
     276            0 :         }
     277              : 
     278              :         // Check that we received the expected pages.
     279            0 :         if req.rel != resp.rel {
     280            0 :             return Err(tonic::Status::internal(format!(
     281            0 :                 "shard {} returned wrong relation, expected {} got {}",
     282            0 :                 shard.id, req.rel, resp.rel
     283            0 :             )));
     284            0 :         }
     285            0 :         if !req
     286            0 :             .block_numbers
     287            0 :             .iter()
     288            0 :             .copied()
     289            0 :             .eq(resp.pages.iter().map(|p| p.block_number))
     290              :         {
     291            0 :             return Err(tonic::Status::internal(format!(
     292            0 :                 "shard {} returned wrong pages, expected {:?} got {:?}",
     293              :                 shard.id,
     294              :                 req.block_numbers,
     295            0 :                 resp.pages
     296            0 :                     .iter()
     297            0 :                     .map(|page| page.block_number)
     298            0 :                     .collect::<Vec<_>>()
     299              :             )));
     300            0 :         }
     301              : 
     302            0 :         Ok(resp)
     303            0 :     }
     304              : 
     305              :     /// Returns the size of a relation, as # of blocks.
     306              :     #[instrument(skip_all, fields(rel=%req.rel, lsn=%req.read_lsn))]
     307              :     pub async fn get_rel_size(
     308              :         &self,
     309              :         req: page_api::GetRelSizeRequest,
     310              :     ) -> tonic::Result<page_api::GetRelSizeResponse> {
     311              :         debug!("sending request: {req:?}");
     312            0 :         let resp = Self::with_retries(CALL_TIMEOUT, async |_| {
     313              :             // Relation metadata is only available on shard 0.
     314            0 :             let mut client = self.shards.load_full().get_zero().client().await?;
     315            0 :             Self::with_timeout(REQUEST_TIMEOUT, client.get_rel_size(req)).await
     316            0 :         })
     317              :         .await?;
     318              :         debug!("received response: {resp:?}");
     319              :         Ok(resp)
     320              :     }
     321              : 
     322              :     /// Fetches an SLRU segment.
     323              :     #[instrument(skip_all, fields(kind=%req.kind, segno=%req.segno, lsn=%req.read_lsn))]
     324              :     pub async fn get_slru_segment(
     325              :         &self,
     326              :         req: page_api::GetSlruSegmentRequest,
     327              :     ) -> tonic::Result<page_api::GetSlruSegmentResponse> {
     328              :         debug!("sending request: {req:?}");
     329            0 :         let resp = Self::with_retries(CALL_TIMEOUT, async |_| {
     330              :             // SLRU segments are only available on shard 0.
     331            0 :             let mut client = self.shards.load_full().get_zero().client().await?;
     332            0 :             Self::with_timeout(REQUEST_TIMEOUT, client.get_slru_segment(req)).await
     333            0 :         })
     334              :         .await?;
     335              :         debug!("received response: {resp:?}");
     336              :         Ok(resp)
     337              :     }
     338              : 
     339              :     /// Runs the given async closure with retries up to the given timeout. Only certain gRPC status
     340              :     /// codes are retried, see [`Retry::should_retry`]. Returns `DeadlineExceeded` on timeout.
     341            0 :     async fn with_retries<T, F, O>(timeout: Duration, f: F) -> tonic::Result<T>
     342            0 :     where
     343            0 :         F: FnMut(usize) -> O, // pass attempt number, starting at 0
     344            0 :         O: Future<Output = tonic::Result<T>>,
     345            0 :     {
     346            0 :         Retry {
     347            0 :             timeout: Some(timeout),
     348            0 :             base_backoff: BASE_BACKOFF,
     349            0 :             max_backoff: MAX_BACKOFF,
     350            0 :         }
     351            0 :         .with(f)
     352            0 :         .await
     353            0 :     }
     354              : 
     355              :     /// Runs the given future with a timeout. Returns `DeadlineExceeded` on timeout.
     356            0 :     async fn with_timeout<T>(
     357            0 :         timeout: Duration,
     358            0 :         f: impl Future<Output = tonic::Result<T>>,
     359            0 :     ) -> tonic::Result<T> {
     360            0 :         let started = Instant::now();
     361            0 :         tokio::time::timeout(timeout, f).await.map_err(|_| {
     362            0 :             tonic::Status::deadline_exceeded(format!(
     363            0 :                 "request timed out after {:.3}s",
     364            0 :                 started.elapsed().as_secs_f64()
     365              :             ))
     366            0 :         })?
     367            0 :     }
     368              : 
     369              :     /// Returns true if the request is considered a bulk request and should use the bulk pool.
     370            0 :     fn is_bulk(req: &page_api::GetPageRequest) -> bool {
     371            0 :         req.block_numbers.len() >= BULK_THRESHOLD_BATCH_SIZE
     372            0 :     }
     373              : }
     374              : 
     375              : /// Shard specification for a PageserverClient.
     376              : pub struct ShardSpec {
     377              :     /// Maps shard indices to gRPC URLs.
     378              :     ///
     379              :     /// INVARIANT: every shard 0..count is present, and shard 0 is always present.
     380              :     /// INVARIANT: every URL is valid and uses grpc:// scheme.
     381              :     urls: HashMap<ShardIndex, String>,
     382              :     /// The shard count.
     383              :     ///
     384              :     /// NB: this is 0 for unsharded tenants, following `ShardIndex::unsharded()` convention.
     385              :     count: ShardCount,
     386              :     /// The stripe size for these shards.
     387              :     ///
     388              :     /// INVARIANT: None for unsharded tenants, Some for sharded.
     389              :     stripe_size: Option<ShardStripeSize>,
     390              : }
     391              : 
     392              : impl ShardSpec {
     393              :     /// Creates a new shard spec with the given URLs and stripe size. All shards must be given.
     394              :     /// The stripe size must be Some for sharded tenants, or None for unsharded tenants.
     395            0 :     pub fn new(
     396            0 :         urls: HashMap<ShardIndex, String>,
     397            0 :         stripe_size: Option<ShardStripeSize>,
     398            0 :     ) -> anyhow::Result<Self> {
     399              :         // Compute the shard count.
     400            0 :         let count = match urls.len() {
     401            0 :             0 => return Err(anyhow!("no shards provided")),
     402            0 :             1 => ShardCount::new(0), // NB: unsharded tenants use 0, like `ShardIndex::unsharded()`
     403            0 :             n if n > u8::MAX as usize => return Err(anyhow!("too many shards: {n}")),
     404            0 :             n => ShardCount::new(n as u8),
     405              :         };
     406              : 
     407              :         // Validate the stripe size.
     408            0 :         if stripe_size.is_none() && !count.is_unsharded() {
     409            0 :             return Err(anyhow!("stripe size must be given for sharded tenants"));
     410            0 :         }
     411            0 :         if stripe_size.is_some() && count.is_unsharded() {
     412            0 :             return Err(anyhow!("stripe size can't be given for unsharded tenants"));
     413            0 :         }
     414              : 
     415              :         // Validate the shard spec.
     416            0 :         for (shard_id, url) in &urls {
     417              :             // The shard index must match the computed shard count, even for unsharded tenants.
     418            0 :             if shard_id.shard_count != count {
     419            0 :                 return Err(anyhow!("invalid shard index {shard_id}, expected {count}"));
     420            0 :             }
     421              :             // The shard index' number and count must be consistent.
     422            0 :             if !shard_id.is_unsharded() && shard_id.shard_number.0 >= shard_id.shard_count.0 {
     423            0 :                 return Err(anyhow!("invalid shard index {shard_id}"));
     424            0 :             }
     425              :             // The above conditions guarantee that we have all shards 0..count: len() matches count,
     426              :             // shard number < count, and numbers are unique (via hashmap).
     427              : 
     428              :             // Validate the URL.
     429            0 :             if PageserverProtocol::from_connstring(url)? != PageserverProtocol::Grpc {
     430            0 :                 return Err(anyhow!("invalid shard URL {url}: must use gRPC"));
     431            0 :             }
     432              :         }
     433              : 
     434            0 :         Ok(Self {
     435            0 :             urls,
     436            0 :             count,
     437            0 :             stripe_size,
     438            0 :         })
     439            0 :     }
     440              : }
     441              : 
     442              : /// Tracks the tenant's shards.
     443              : struct Shards {
     444              :     /// Shards by shard index.
     445              :     ///
     446              :     /// INVARIANT: every shard 0..count is present.
     447              :     /// INVARIANT: shard 0 is always present.
     448              :     by_index: HashMap<ShardIndex, Shard>,
     449              :     /// The shard count.
     450              :     ///
     451              :     /// NB: this is 0 for unsharded tenants, following `ShardIndex::unsharded()` convention.
     452              :     count: ShardCount,
     453              :     /// The stripe size.
     454              :     ///
     455              :     /// INVARIANT: None for unsharded tenants, Some for sharded.
     456              :     stripe_size: Option<ShardStripeSize>,
     457              : }
     458              : 
     459              : impl Shards {
     460              :     /// Creates a new set of shards based on a shard spec.
     461            0 :     fn new(
     462            0 :         tenant_id: TenantId,
     463            0 :         timeline_id: TimelineId,
     464            0 :         shard_spec: ShardSpec,
     465            0 :         auth_token: Option<String>,
     466            0 :         compression: Option<CompressionEncoding>,
     467            0 :     ) -> anyhow::Result<Self> {
     468              :         // NB: the shard spec has already been validated when constructed.
     469            0 :         let mut shards = HashMap::with_capacity(shard_spec.urls.len());
     470            0 :         for (shard_id, url) in shard_spec.urls {
     471            0 :             shards.insert(
     472            0 :                 shard_id,
     473            0 :                 Shard::new(
     474            0 :                     url,
     475            0 :                     tenant_id,
     476            0 :                     timeline_id,
     477            0 :                     shard_id,
     478            0 :                     auth_token.clone(),
     479            0 :                     compression,
     480            0 :                 )?,
     481              :             );
     482              :         }
     483              : 
     484            0 :         Ok(Self {
     485            0 :             by_index: shards,
     486            0 :             count: shard_spec.count,
     487            0 :             stripe_size: shard_spec.stripe_size,
     488            0 :         })
     489            0 :     }
     490              : 
     491              :     /// Looks up the given shard.
     492              :     #[allow(clippy::result_large_err)] // TODO: check perf impact
     493            0 :     fn get(&self, shard_id: ShardIndex) -> tonic::Result<&Shard> {
     494            0 :         self.by_index
     495            0 :             .get(&shard_id)
     496            0 :             .ok_or_else(|| tonic::Status::not_found(format!("unknown shard {shard_id}")))
     497            0 :     }
     498              : 
     499              :     /// Returns shard 0.
     500            0 :     fn get_zero(&self) -> &Shard {
     501            0 :         self.get(ShardIndex::new(ShardNumber(0), self.count))
     502            0 :             .expect("always present")
     503            0 :     }
     504              : }
     505              : 
     506              : /// A single shard. Has dedicated resource pools with the following structure:
     507              : ///
     508              : /// * Channel pool: MAX_CLIENTS_PER_CHANNEL.
     509              : ///   * Client pool: unbounded.
     510              : ///     * Stream pool: unbounded.
     511              : /// * Bulk channel pool: MAX_BULK_CLIENTS_PER_CHANNEL.
     512              : ///   * Bulk client pool: unbounded.
     513              : ///     * Bulk stream pool: unbounded.
     514              : ///
     515              : /// We use a separate bulk channel pool with a lower concurrency limit for large batch requests.
     516              : /// This avoids TCP-level head-of-line blocking, and also concentrates large window sizes on a
     517              : /// smaller set of streams/connections, which presumably reduces memory use. Neither of these pools
     518              : /// are bounded, nor do they pipeline requests, so the latency characteristics should be mostly
     519              : /// similar (except for TCP transmission time).
     520              : ///
     521              : /// TODO: since we never use bounded pools, we could consider removing the pool limiters. However,
     522              : /// the code is fairly trivial, so we may as well keep them around for now in case we need them.
     523              : struct Shard {
     524              :     /// The shard ID.
     525              :     id: ShardIndex,
     526              :     /// Unary gRPC client pool.
     527              :     client_pool: Arc<ClientPool>,
     528              :     /// GetPage stream pool.
     529              :     stream_pool: Arc<StreamPool>,
     530              :     /// GetPage stream pool for bulk requests.
     531              :     bulk_stream_pool: Arc<StreamPool>,
     532              : }
     533              : 
     534              : impl Shard {
     535              :     /// Creates a new shard. It has its own dedicated resource pools.
     536            0 :     fn new(
     537            0 :         url: String,
     538            0 :         tenant_id: TenantId,
     539            0 :         timeline_id: TimelineId,
     540            0 :         shard_id: ShardIndex,
     541            0 :         auth_token: Option<String>,
     542            0 :         compression: Option<CompressionEncoding>,
     543            0 :     ) -> anyhow::Result<Self> {
     544              :         // Shard pools for unary requests and non-bulk GetPage requests.
     545            0 :         let client_pool = ClientPool::new(
     546            0 :             ChannelPool::new(url.clone(), MAX_CLIENTS_PER_CHANNEL)?,
     547            0 :             tenant_id,
     548            0 :             timeline_id,
     549            0 :             shard_id,
     550            0 :             auth_token.clone(),
     551            0 :             compression,
     552            0 :             None, // unbounded
     553              :         );
     554            0 :         let stream_pool = StreamPool::new(client_pool.clone(), None); // unbounded
     555              : 
     556              :         // Bulk GetPage stream pool for large batches (prefetches, sequential scans, vacuum, etc.).
     557            0 :         let bulk_stream_pool = StreamPool::new(
     558            0 :             ClientPool::new(
     559            0 :                 ChannelPool::new(url, MAX_BULK_CLIENTS_PER_CHANNEL)?,
     560            0 :                 tenant_id,
     561            0 :                 timeline_id,
     562            0 :                 shard_id,
     563            0 :                 auth_token,
     564            0 :                 compression,
     565            0 :                 None, // unbounded,
     566              :             ),
     567            0 :             None, // unbounded
     568              :         );
     569              : 
     570            0 :         Ok(Self {
     571            0 :             id: shard_id,
     572            0 :             client_pool,
     573            0 :             stream_pool,
     574            0 :             bulk_stream_pool,
     575            0 :         })
     576            0 :     }
     577              : 
     578              :     /// Returns a pooled client for this shard.
     579              :     #[instrument(skip_all)]
     580              :     async fn client(&self) -> tonic::Result<ClientGuard> {
     581              :         warn_slow(
     582              :             "client pool acquisition",
     583              :             SLOW_THRESHOLD,
     584              :             pin!(self.client_pool.get()),
     585              :         )
     586              :         .await
     587              :     }
     588              : 
     589              :     /// Returns a pooled stream for this shard. If `bulk` is `true`, uses the dedicated bulk pool.
     590              :     #[instrument(skip_all, fields(bulk))]
     591              :     async fn stream(&self, bulk: bool) -> tonic::Result<StreamGuard> {
     592              :         let pool = match bulk {
     593              :             false => &self.stream_pool,
     594              :             true => &self.bulk_stream_pool,
     595              :         };
     596              :         warn_slow("stream pool acquisition", SLOW_THRESHOLD, pin!(pool.get())).await
     597              :     }
     598              : }
        

Generated by: LCOV version 2.1-beta