LCOV - code coverage report
Current view: top level - pageserver/client_grpc/src - client.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 0.0 % 244 0
Test Date: 2025-07-16 12:29:03 Functions: 0.0 % 52 0

            Line data    Source code
       1              : use std::collections::HashMap;
       2              : use std::num::NonZero;
       3              : use std::pin::pin;
       4              : use std::sync::Arc;
       5              : use std::time::{Duration, Instant};
       6              : 
       7              : use anyhow::anyhow;
       8              : use arc_swap::ArcSwap;
       9              : use futures::stream::FuturesUnordered;
      10              : use futures::{FutureExt as _, StreamExt as _};
      11              : use tonic::codec::CompressionEncoding;
      12              : use tracing::{debug, instrument};
      13              : use utils::logging::warn_slow;
      14              : 
      15              : use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamGuard, StreamPool};
      16              : use crate::retry::Retry;
      17              : use crate::split::GetPageSplitter;
      18              : use compute_api::spec::PageserverProtocol;
      19              : use pageserver_api::shard::ShardStripeSize;
      20              : use pageserver_page_api as page_api;
      21              : use utils::id::{TenantId, TimelineId};
      22              : use utils::shard::{ShardCount, ShardIndex, ShardNumber};
      23              : 
      24              : /// Max number of concurrent clients per channel (i.e. TCP connection). New channels will be spun up
      25              : /// when full.
      26              : ///
      27              : /// Normal requests are small, and we don't pipeline them, so we can afford a large number of
      28              : /// streams per connection.
      29              : ///
      30              : /// TODO: tune all of these constants, and consider making them configurable.
      31              : const MAX_CLIENTS_PER_CHANNEL: NonZero<usize> = NonZero::new(64).unwrap();
      32              : 
      33              : /// Max number of concurrent bulk GetPage streams per channel (i.e. TCP connection). These use a
      34              : /// dedicated channel pool with a lower client limit, to avoid TCP-level head-of-line blocking and
      35              : /// transmission delays. This also concentrates large window sizes on a smaller set of
      36              : /// streams/connections, presumably reducing memory use.
      37              : const MAX_BULK_CLIENTS_PER_CHANNEL: NonZero<usize> = NonZero::new(16).unwrap();
      38              : 
      39              : /// The batch size threshold at which a GetPage request will use the bulk stream pool.
      40              : ///
      41              : /// The gRPC initial window size is 64 KB. Each page is 8 KB, so let's avoid increasing the window
      42              : /// size for the normal stream pool, and route requests for >= 5 pages (>32 KB) to the bulk pool.
      43              : const BULK_THRESHOLD_BATCH_SIZE: usize = 5;
      44              : 
      45              : /// The overall request call timeout, including retries and pool acquisition.
      46              : /// TODO: should we retry forever? Should the caller decide?
      47              : const CALL_TIMEOUT: Duration = Duration::from_secs(60);
      48              : 
      49              : /// The per-request (retry attempt) timeout, including any lazy connection establishment.
      50              : const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
      51              : 
      52              : /// The initial request retry backoff duration. The first retry does not back off.
      53              : /// TODO: use a different backoff for ResourceExhausted (rate limiting)? Needs server support.
      54              : const BASE_BACKOFF: Duration = Duration::from_millis(5);
      55              : 
      56              : /// The maximum request retry backoff duration.
      57              : const MAX_BACKOFF: Duration = Duration::from_secs(5);
      58              : 
      59              : /// Threshold and interval for warning about slow operation.
      60              : const SLOW_THRESHOLD: Duration = Duration::from_secs(3);
      61              : 
      62              : /// A rich Pageserver gRPC client for a single tenant timeline. This client is more capable than the
      63              : /// basic `page_api::Client` gRPC client, and supports:
      64              : ///
      65              : /// * Sharded tenants across multiple Pageservers.
      66              : /// * Pooling of connections, clients, and streams for efficient resource use.
      67              : /// * Concurrent use by many callers.
      68              : /// * Internal handling of GetPage bidirectional streams.
      69              : /// * Automatic retries.
      70              : /// * Observability.
      71              : ///
      72              : /// The client has dedicated connection/client/stream pools per shard, for resource reuse. These
      73              : /// pools are unbounded: we allow scaling out as many concurrent streams as needed to serve all
      74              : /// concurrent callers, which mostly eliminates head-of-line blocking. Idle streams are fairly
      75              : /// cheap: the server task currently uses 26 KB of memory, so we can comfortably fit 100,000
      76              : /// concurrent idle streams (2.5 GB memory). The worst case degenerates to the old libpq case with
      77              : /// one stream per backend, but without the TCP connection overhead. In the common case we expect
      78              : /// significantly lower stream counts due to stream sharing, driven e.g. by idle backends, LFC hits,
      79              : /// read coalescing, sharding (backends typically only talk to one shard at a time), etc.
      80              : ///
      81              : /// TODO: this client does not support base backups or LSN leases, as these are only used by
      82              : /// compute_ctl. Consider adding this, but LSN leases need concurrent requests on all shards.
      83              : pub struct PageserverClient {
      84              :     /// The tenant ID.
      85              :     tenant_id: TenantId,
      86              :     /// The timeline ID.
      87              :     timeline_id: TimelineId,
      88              :     /// The JWT auth token for this tenant, if any.
      89              :     auth_token: Option<String>,
      90              :     /// The compression to use, if any.
      91              :     compression: Option<CompressionEncoding>,
      92              :     /// The shards for this tenant.
      93              :     shards: ArcSwap<Shards>,
      94              : }
      95              : 
      96              : impl PageserverClient {
      97              :     /// Creates a new Pageserver client for a given tenant and timeline. Uses the Pageservers given
      98              :     /// in the shard spec, which must be complete and must use gRPC URLs.
      99            0 :     pub fn new(
     100            0 :         tenant_id: TenantId,
     101            0 :         timeline_id: TimelineId,
     102            0 :         shard_spec: ShardSpec,
     103            0 :         auth_token: Option<String>,
     104            0 :         compression: Option<CompressionEncoding>,
     105            0 :     ) -> anyhow::Result<Self> {
     106            0 :         let shards = Shards::new(
     107            0 :             tenant_id,
     108            0 :             timeline_id,
     109            0 :             shard_spec,
     110            0 :             auth_token.clone(),
     111            0 :             compression,
     112            0 :         )?;
     113            0 :         Ok(Self {
     114            0 :             tenant_id,
     115            0 :             timeline_id,
     116            0 :             auth_token,
     117            0 :             compression,
     118            0 :             shards: ArcSwap::new(Arc::new(shards)),
     119            0 :         })
     120            0 :     }
     121              : 
     122              :     /// Updates the shards from the given shard spec. In-flight requests will complete using the
     123              :     /// existing shards, but may retry with the new shards if they fail.
     124              :     ///
     125              :     /// TODO: verify that in-flight requests are allowed to complete, and that the old pools are
     126              :     /// properly spun down and dropped afterwards.
     127            0 :     pub fn update_shards(&self, shard_spec: ShardSpec) -> anyhow::Result<()> {
     128              :         // Validate the shard spec. We should really use `ArcSwap::rcu` for this, to avoid races
     129              :         // with concurrent updates, but that involves creating a new `Shards` on every attempt,
     130              :         // which spins up a bunch of Tokio tasks and such. These should already be checked elsewhere
     131              :         // in the stack, and if they're violated then we already have problems elsewhere, so a
     132              :         // best-effort but possibly-racy check is okay here.
     133            0 :         let old = self.shards.load_full();
     134            0 :         if shard_spec.count < old.count {
     135            0 :             return Err(anyhow!(
     136            0 :                 "can't reduce shard count from {} to {}",
     137            0 :                 old.count,
     138            0 :                 shard_spec.count
     139            0 :             ));
     140            0 :         }
     141            0 :         if !old.count.is_unsharded() && shard_spec.stripe_size != old.stripe_size {
     142            0 :             return Err(anyhow!(
     143            0 :                 "can't change stripe size from {} to {}",
     144            0 :                 old.stripe_size,
     145            0 :                 shard_spec.stripe_size
     146            0 :             ));
     147            0 :         }
     148              : 
     149            0 :         let shards = Shards::new(
     150            0 :             self.tenant_id,
     151            0 :             self.timeline_id,
     152            0 :             shard_spec,
     153            0 :             self.auth_token.clone(),
     154            0 :             self.compression,
     155            0 :         )?;
     156            0 :         self.shards.store(Arc::new(shards));
     157            0 :         Ok(())
     158            0 :     }
     159              : 
     160              :     /// Returns whether a relation exists.
     161              :     #[instrument(skip_all, fields(rel=%req.rel, lsn=%req.read_lsn))]
     162              :     pub async fn check_rel_exists(
     163              :         &self,
     164              :         req: page_api::CheckRelExistsRequest,
     165              :     ) -> tonic::Result<page_api::CheckRelExistsResponse> {
     166              :         debug!("sending request: {req:?}");
     167            0 :         let resp = Self::with_retries(CALL_TIMEOUT, async |_| {
     168              :             // Relation metadata is only available on shard 0.
     169            0 :             let mut client = self.shards.load_full().get_zero().client().await?;
     170            0 :             Self::with_timeout(REQUEST_TIMEOUT, client.check_rel_exists(req)).await
     171            0 :         })
     172              :         .await?;
     173              :         debug!("received response: {resp:?}");
     174              :         Ok(resp)
     175              :     }
     176              : 
     177              :     /// Returns the total size of a database, as # of bytes.
     178              :     #[instrument(skip_all, fields(db_oid=%req.db_oid, lsn=%req.read_lsn))]
     179              :     pub async fn get_db_size(
     180              :         &self,
     181              :         req: page_api::GetDbSizeRequest,
     182              :     ) -> tonic::Result<page_api::GetDbSizeResponse> {
     183              :         debug!("sending request: {req:?}");
     184            0 :         let resp = Self::with_retries(CALL_TIMEOUT, async |_| {
     185              :             // Relation metadata is only available on shard 0.
     186            0 :             let mut client = self.shards.load_full().get_zero().client().await?;
     187            0 :             Self::with_timeout(REQUEST_TIMEOUT, client.get_db_size(req)).await
     188            0 :         })
     189              :         .await?;
     190              :         debug!("received response: {resp:?}");
     191              :         Ok(resp)
     192              :     }
     193              : 
     194              :     /// Fetches pages. The `request_id` must be unique across all in-flight requests, and the
     195              :     /// `attempt` must be 0 (incremented on retry). Automatically splits requests that straddle
     196              :     /// shard boundaries, and assembles the responses.
     197              :     ///
     198              :     /// Unlike `page_api::Client`, this automatically converts `status_code` into `tonic::Status`
     199              :     /// errors. All responses will have `GetPageStatusCode::Ok`.
     200              :     #[instrument(skip_all, fields(
     201              :         req_id = %req.request_id,
     202              :         class = %req.request_class,
     203              :         rel = %req.rel,
     204            0 :         blkno = %req.block_numbers[0],
     205              :         blks = %req.block_numbers.len(),
     206              :         lsn = %req.read_lsn,
     207              :     ))]
     208              :     pub async fn get_page(
     209              :         &self,
     210              :         req: page_api::GetPageRequest,
     211              :     ) -> tonic::Result<page_api::GetPageResponse> {
     212              :         // Make sure we have at least one page.
     213              :         if req.block_numbers.is_empty() {
     214              :             return Err(tonic::Status::invalid_argument("no block number"));
     215              :         }
     216              :         // The request attempt must be 0. The client will increment it internally.
     217              :         if req.request_id.attempt != 0 {
     218              :             return Err(tonic::Status::invalid_argument("request attempt must be 0"));
     219              :         }
     220              : 
     221              :         debug!("sending request: {req:?}");
     222              : 
     223              :         // The shards may change while we're fetching pages. We execute the request using a stable
     224              :         // view of the shards (especially important for requests that span shards), but retry the
     225              :         // top-level (pre-split) request to pick up shard changes. This can lead to unnecessary
     226              :         // retries and re-splits in some cases where requests span shards, but these are expected to
     227              :         // be rare.
     228              :         //
     229              :         // TODO: the gRPC server and client doesn't yet properly support shard splits. Revisit this
     230              :         // once we figure out how to handle these.
     231            0 :         let resp = Self::with_retries(CALL_TIMEOUT, async |attempt| {
     232            0 :             let mut req = req.clone();
     233            0 :             req.request_id.attempt = attempt as u32;
     234            0 :             let shards = self.shards.load_full();
     235            0 :             Self::with_timeout(REQUEST_TIMEOUT, Self::get_page_with_shards(req, &shards)).await
     236            0 :         })
     237              :         .await?;
     238              : 
     239              :         debug!("received response: {resp:?}");
     240              :         Ok(resp)
     241              :     }
     242              : 
     243              :     /// Fetches pages using the given shards. This uses a stable view of the shards, regardless of
     244              :     /// concurrent shard updates. Does not retry internally, but is retried by `get_page()`.
     245            0 :     async fn get_page_with_shards(
     246            0 :         req: page_api::GetPageRequest,
     247            0 :         shards: &Shards,
     248            0 :     ) -> tonic::Result<page_api::GetPageResponse> {
     249              :         // Fast path: request is for a single shard.
     250            0 :         if let Some(shard_id) =
     251            0 :             GetPageSplitter::for_single_shard(&req, shards.count, shards.stripe_size)
     252              :         {
     253            0 :             return Self::get_page_with_shard(req, shards.get(shard_id)?).await;
     254            0 :         }
     255              : 
     256              :         // Request spans multiple shards. Split it, dispatch concurrent per-shard requests, and
     257              :         // reassemble the responses.
     258            0 :         let mut splitter = GetPageSplitter::split(req, shards.count, shards.stripe_size);
     259              : 
     260            0 :         let mut shard_requests = FuturesUnordered::new();
     261            0 :         for (shard_id, shard_req) in splitter.drain_requests() {
     262            0 :             let future = Self::get_page_with_shard(shard_req, shards.get(shard_id)?)
     263            0 :                 .map(move |result| result.map(|resp| (shard_id, resp)));
     264            0 :             shard_requests.push(future);
     265              :         }
     266              : 
     267            0 :         while let Some((shard_id, shard_response)) = shard_requests.next().await.transpose()? {
     268            0 :             splitter.add_response(shard_id, shard_response)?;
     269              :         }
     270              : 
     271            0 :         splitter.get_response()
     272            0 :     }
     273              : 
     274              :     /// Fetches pages on the given shard. Does not retry internally.
     275            0 :     async fn get_page_with_shard(
     276            0 :         req: page_api::GetPageRequest,
     277            0 :         shard: &Shard,
     278            0 :     ) -> tonic::Result<page_api::GetPageResponse> {
     279            0 :         let mut stream = shard.stream(Self::is_bulk(&req)).await?;
     280            0 :         let resp = stream.send(req.clone()).await?;
     281              : 
     282              :         // Convert per-request errors into a tonic::Status.
     283            0 :         if resp.status_code != page_api::GetPageStatusCode::Ok {
     284            0 :             return Err(tonic::Status::new(
     285            0 :                 resp.status_code.into(),
     286            0 :                 resp.reason.unwrap_or_else(|| String::from("unknown error")),
     287              :             ));
     288            0 :         }
     289              : 
     290              :         // Check that we received the expected pages.
     291            0 :         if req.rel != resp.rel {
     292            0 :             return Err(tonic::Status::internal(format!(
     293            0 :                 "shard {} returned wrong relation, expected {} got {}",
     294            0 :                 shard.id, req.rel, resp.rel
     295            0 :             )));
     296            0 :         }
     297            0 :         if !req
     298            0 :             .block_numbers
     299            0 :             .iter()
     300            0 :             .copied()
     301            0 :             .eq(resp.pages.iter().map(|p| p.block_number))
     302              :         {
     303            0 :             return Err(tonic::Status::internal(format!(
     304            0 :                 "shard {} returned wrong pages, expected {:?} got {:?}",
     305              :                 shard.id,
     306              :                 req.block_numbers,
     307            0 :                 resp.pages
     308            0 :                     .iter()
     309            0 :                     .map(|page| page.block_number)
     310            0 :                     .collect::<Vec<_>>()
     311              :             )));
     312            0 :         }
     313              : 
     314            0 :         Ok(resp)
     315            0 :     }
     316              : 
     317              :     /// Returns the size of a relation, as # of blocks.
     318              :     #[instrument(skip_all, fields(rel=%req.rel, lsn=%req.read_lsn))]
     319              :     pub async fn get_rel_size(
     320              :         &self,
     321              :         req: page_api::GetRelSizeRequest,
     322              :     ) -> tonic::Result<page_api::GetRelSizeResponse> {
     323              :         debug!("sending request: {req:?}");
     324            0 :         let resp = Self::with_retries(CALL_TIMEOUT, async |_| {
     325              :             // Relation metadata is only available on shard 0.
     326            0 :             let mut client = self.shards.load_full().get_zero().client().await?;
     327            0 :             Self::with_timeout(REQUEST_TIMEOUT, client.get_rel_size(req)).await
     328            0 :         })
     329              :         .await?;
     330              :         debug!("received response: {resp:?}");
     331              :         Ok(resp)
     332              :     }
     333              : 
     334              :     /// Fetches an SLRU segment.
     335              :     #[instrument(skip_all, fields(kind=%req.kind, segno=%req.segno, lsn=%req.read_lsn))]
     336              :     pub async fn get_slru_segment(
     337              :         &self,
     338              :         req: page_api::GetSlruSegmentRequest,
     339              :     ) -> tonic::Result<page_api::GetSlruSegmentResponse> {
     340              :         debug!("sending request: {req:?}");
     341            0 :         let resp = Self::with_retries(CALL_TIMEOUT, async |_| {
     342              :             // SLRU segments are only available on shard 0.
     343            0 :             let mut client = self.shards.load_full().get_zero().client().await?;
     344            0 :             Self::with_timeout(REQUEST_TIMEOUT, client.get_slru_segment(req)).await
     345            0 :         })
     346              :         .await?;
     347              :         debug!("received response: {resp:?}");
     348              :         Ok(resp)
     349              :     }
     350              : 
     351              :     /// Runs the given async closure with retries up to the given timeout. Only certain gRPC status
     352              :     /// codes are retried, see [`Retry::should_retry`]. Returns `DeadlineExceeded` on timeout.
     353            0 :     async fn with_retries<T, F, O>(timeout: Duration, f: F) -> tonic::Result<T>
     354            0 :     where
     355            0 :         F: FnMut(usize) -> O, // pass attempt number, starting at 0
     356            0 :         O: Future<Output = tonic::Result<T>>,
     357            0 :     {
     358            0 :         Retry {
     359            0 :             timeout: Some(timeout),
     360            0 :             base_backoff: BASE_BACKOFF,
     361            0 :             max_backoff: MAX_BACKOFF,
     362            0 :         }
     363            0 :         .with(f)
     364            0 :         .await
     365            0 :     }
     366              : 
     367              :     /// Runs the given future with a timeout. Returns `DeadlineExceeded` on timeout.
     368            0 :     async fn with_timeout<T>(
     369            0 :         timeout: Duration,
     370            0 :         f: impl Future<Output = tonic::Result<T>>,
     371            0 :     ) -> tonic::Result<T> {
     372            0 :         let started = Instant::now();
     373            0 :         tokio::time::timeout(timeout, f).await.map_err(|_| {
     374            0 :             tonic::Status::deadline_exceeded(format!(
     375            0 :                 "request timed out after {:.3}s",
     376            0 :                 started.elapsed().as_secs_f64()
     377              :             ))
     378            0 :         })?
     379            0 :     }
     380              : 
     381              :     /// Returns true if the request is considered a bulk request and should use the bulk pool.
     382            0 :     fn is_bulk(req: &page_api::GetPageRequest) -> bool {
     383            0 :         req.block_numbers.len() >= BULK_THRESHOLD_BATCH_SIZE
     384            0 :     }
     385              : }
     386              : 
     387              : /// Shard specification for a PageserverClient.
     388              : pub struct ShardSpec {
     389              :     /// Maps shard indices to gRPC URLs.
     390              :     ///
     391              :     /// INVARIANT: every shard 0..count is present, and shard 0 is always present.
     392              :     /// INVARIANT: every URL is valid and uses grpc:// scheme.
     393              :     urls: HashMap<ShardIndex, String>,
     394              :     /// The shard count.
     395              :     ///
     396              :     /// NB: this is 0 for unsharded tenants, following `ShardIndex::unsharded()` convention.
     397              :     count: ShardCount,
     398              :     /// The stripe size for these shards.
     399              :     stripe_size: ShardStripeSize,
     400              : }
     401              : 
     402              : impl ShardSpec {
     403              :     /// Creates a new shard spec with the given URLs and stripe size. All shards must be given.
     404              :     /// The stripe size may be omitted for unsharded tenants.
     405            0 :     pub fn new(
     406            0 :         urls: HashMap<ShardIndex, String>,
     407            0 :         stripe_size: Option<ShardStripeSize>,
     408            0 :     ) -> anyhow::Result<Self> {
     409              :         // Compute the shard count.
     410            0 :         let count = match urls.len() {
     411            0 :             0 => return Err(anyhow!("no shards provided")),
     412            0 :             1 => ShardCount::new(0), // NB: unsharded tenants use 0, like `ShardIndex::unsharded()`
     413            0 :             n if n > u8::MAX as usize => return Err(anyhow!("too many shards: {n}")),
     414            0 :             n => ShardCount::new(n as u8),
     415              :         };
     416              : 
     417              :         // Determine the stripe size. It doesn't matter for unsharded tenants.
     418            0 :         if stripe_size.is_none() && !count.is_unsharded() {
     419            0 :             return Err(anyhow!("stripe size must be given for sharded tenants"));
     420            0 :         }
     421            0 :         let stripe_size = stripe_size.unwrap_or_default();
     422              : 
     423              :         // Validate the shard spec.
     424            0 :         for (shard_id, url) in &urls {
     425              :             // The shard index must match the computed shard count, even for unsharded tenants.
     426            0 :             if shard_id.shard_count != count {
     427            0 :                 return Err(anyhow!("invalid shard index {shard_id}, expected {count}"));
     428            0 :             }
     429              :             // The shard index' number and count must be consistent.
     430            0 :             if !shard_id.is_unsharded() && shard_id.shard_number.0 >= shard_id.shard_count.0 {
     431            0 :                 return Err(anyhow!("invalid shard index {shard_id}"));
     432            0 :             }
     433              :             // The above conditions guarantee that we have all shards 0..count: len() matches count,
     434              :             // shard number < count, and numbers are unique (via hashmap).
     435              : 
     436              :             // Validate the URL.
     437            0 :             if PageserverProtocol::from_connstring(url)? != PageserverProtocol::Grpc {
     438            0 :                 return Err(anyhow!("invalid shard URL {url}: must use gRPC"));
     439            0 :             }
     440              :         }
     441              : 
     442            0 :         Ok(Self {
     443            0 :             urls,
     444            0 :             count,
     445            0 :             stripe_size,
     446            0 :         })
     447            0 :     }
     448              : }
     449              : 
     450              : /// Tracks the tenant's shards.
     451              : struct Shards {
     452              :     /// Shards by shard index.
     453              :     ///
     454              :     /// INVARIANT: every shard 0..count is present.
     455              :     /// INVARIANT: shard 0 is always present.
     456              :     by_index: HashMap<ShardIndex, Shard>,
     457              :     /// The shard count.
     458              :     ///
     459              :     /// NB: this is 0 for unsharded tenants, following `ShardIndex::unsharded()` convention.
     460              :     count: ShardCount,
     461              :     /// The stripe size. Only used for sharded tenants.
     462              :     stripe_size: ShardStripeSize,
     463              : }
     464              : 
     465              : impl Shards {
     466              :     /// Creates one [`Shard`] per `shard_spec.urls` entry, keyed by shard index; any `Shard::new` error is returned.
     467            0 :     fn new(
     468            0 :         tenant_id: TenantId,
     469            0 :         timeline_id: TimelineId,
     470            0 :         shard_spec: ShardSpec,
     471            0 :         auth_token: Option<String>,
     472            0 :         compression: Option<CompressionEncoding>,
     473            0 :     ) -> anyhow::Result<Self> {
     474              :         // NB: the shard spec has already been validated when constructed.
     475            0 :         let mut shards = HashMap::with_capacity(shard_spec.urls.len());
     476            0 :         for (shard_id, url) in shard_spec.urls {
     477            0 :             shards.insert(
     478            0 :                 shard_id,
     479            0 :                 Shard::new(
     480            0 :                     url,
     481            0 :                     tenant_id,
     482            0 :                     timeline_id,
     483            0 :                     shard_id,
     484            0 :                     auth_token.clone(), // each shard receives its own copy of the token
     485            0 :                     compression,
     486            0 :                 )?, // stop at the first shard that fails to construct
     487              :             );
     488              :         }
     489              : 
     490            0 :         Ok(Self {
     491            0 :             by_index: shards,
     492            0 :             count: shard_spec.count,
     493            0 :             stripe_size: shard_spec.stripe_size,
     494            0 :         })
     495            0 :     }
     496              : 
     497              :     /// Looks up the given shard, returning a `NotFound` status ("unknown shard ...") if it is not in the map.
     498              :     #[allow(clippy::result_large_err)] // TODO: check perf impact
     499            0 :     fn get(&self, shard_id: ShardIndex) -> tonic::Result<&Shard> {
     500            0 :         self.by_index
     501            0 :             .get(&shard_id)
     502            0 :             .ok_or_else(|| tonic::Status::not_found(format!("unknown shard {shard_id}")))
     503            0 :     }
     504              : 
     505              :     /// Returns shard 0, looked up with `self.count` as the shard count. Panics if it is absent (invariant violation).
     506            0 :     fn get_zero(&self) -> &Shard {
     507            0 :         self.get(ShardIndex::new(ShardNumber(0), self.count))
     508            0 :             .expect("always present")
     509            0 :     }
     510              : }
     511              : 
     512              : /// A single shard. Has dedicated resource pools with the following structure:
     513              : ///
     514              : /// * Channel pool: MAX_CLIENTS_PER_CHANNEL.
     515              : ///   * Client pool: unbounded.
     516              : ///     * Stream pool: unbounded.
     517              : /// * Bulk channel pool: MAX_BULK_CLIENTS_PER_CHANNEL.
     518              : ///   * Bulk client pool: unbounded.
     519              : ///     * Bulk stream pool: unbounded.
     520              : ///
     521              : /// We use a separate bulk channel pool with a lower concurrency limit for large batch requests.
     522              : /// This avoids TCP-level head-of-line blocking, and also concentrates large window sizes on a
     523              : /// smaller set of streams/connections, which presumably reduces memory use. Neither of these pools
     524              : /// is bounded, nor do they pipeline requests, so the latency characteristics should be mostly
     525              : /// similar (except for TCP transmission time).
     526              : ///
     527              : /// TODO: since we never use bounded pools, we could consider removing the pool limiters. However,
     528              : /// the code is fairly trivial, so we may as well keep them around for now in case we need them.
     529              : struct Shard {
     530              :     /// The shard ID.
     531              :     id: ShardIndex,
     532              :     /// Unary gRPC client pool. Also backs `stream_pool`.
     533              :     client_pool: Arc<ClientPool>,
     534              :     /// GetPage stream pool, layered on top of `client_pool`.
     535              :     stream_pool: Arc<StreamPool>,
     536              :     /// GetPage stream pool for bulk requests, backed by its own client and channel pools.
     537              :     bulk_stream_pool: Arc<StreamPool>,
     538              : }
     539              : 
     540              : impl Shard {
     541              :     /// Creates a new shard with its own dedicated resource pools. Fails if either `ChannelPool::new` call fails.
     542            0 :     fn new(
     543            0 :         url: String,
     544            0 :         tenant_id: TenantId,
     545            0 :         timeline_id: TimelineId,
     546            0 :         shard_id: ShardIndex,
     547            0 :         auth_token: Option<String>,
     548            0 :         compression: Option<CompressionEncoding>,
     549            0 :     ) -> anyhow::Result<Self> {
     550              :         // Shard pools for unary requests and non-bulk GetPage requests.
     551            0 :         let client_pool = ClientPool::new(
     552            0 :             ChannelPool::new(url.clone(), MAX_CLIENTS_PER_CHANNEL)?,
     553            0 :             tenant_id,
     554            0 :             timeline_id,
     555            0 :             shard_id,
     556            0 :             auth_token.clone(),
     557            0 :             compression,
     558            0 :             None, // unbounded
     559              :         );
     560            0 :         let stream_pool = StreamPool::new(client_pool.clone(), None); // unbounded
     561              : 
     562              :         // Bulk GetPage stream pool for large batches (prefetches, sequential scans, vacuum, etc.).
     563            0 :         let bulk_stream_pool = StreamPool::new(
     564            0 :             ClientPool::new(
     565            0 :                 ChannelPool::new(url, MAX_BULK_CLIENTS_PER_CHANNEL)?,
     566            0 :                 tenant_id,
     567            0 :                 timeline_id,
     568            0 :                 shard_id,
     569            0 :                 auth_token,
     570            0 :                 compression,
     571            0 :                 None, // unbounded
     572              :             ),
     573            0 :             None, // unbounded
     574              :         );
     575              : 
     576            0 :         Ok(Self {
     577            0 :             id: shard_id,
     578            0 :             client_pool,
     579            0 :             stream_pool,
     580            0 :             bulk_stream_pool,
     581            0 :         })
     582            0 :     }
     583              : 
     584              :     /// Returns a pooled client for this shard. The acquisition is wrapped in `warn_slow` with `SLOW_THRESHOLD`.
     585              :     #[instrument(skip_all)]
     586              :     async fn client(&self) -> tonic::Result<ClientGuard> {
     587              :         warn_slow(
     588              :             "client pool acquisition",
     589              :             SLOW_THRESHOLD,
     590              :             pin!(self.client_pool.get()),
     591              :         )
     592              :         .await
     593              :     }
     594              : 
     595              :     /// Returns a pooled stream for this shard, from the bulk pool if `bulk` is `true`, wrapped in `warn_slow`.
     596              :     #[instrument(skip_all, fields(bulk))]
     597              :     async fn stream(&self, bulk: bool) -> tonic::Result<StreamGuard> {
     598              :         let pool = match bulk {
     599              :             false => &self.stream_pool,
     600              :             true => &self.bulk_stream_pool,
     601              :         };
     602              :         warn_slow("stream pool acquisition", SLOW_THRESHOLD, pin!(pool.get())).await
     603              :     }
     604              : }
        

Generated by: LCOV version 2.1-beta