use std::collections::HashMap;
use std::num::NonZero;
use std::pin::pin;
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::anyhow;
use arc_swap::ArcSwap;
use futures::stream::FuturesUnordered;
use futures::{FutureExt as _, StreamExt as _};
use tonic::codec::CompressionEncoding;
use tracing::{debug, instrument};
use utils::logging::warn_slow;

use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamGuard, StreamPool};
use crate::retry::Retry;
use compute_api::spec::PageserverProtocol;
use pageserver_page_api as page_api;
use pageserver_page_api::GetPageSplitter;
use utils::id::{TenantId, TimelineId};
use utils::shard::{ShardCount, ShardIndex, ShardNumber, ShardStripeSize};

/// Max number of concurrent clients per channel (i.e. TCP connection). New channels will be spun up
/// when full.
///
/// Normal requests are small, and we don't pipeline them, so we can afford a large number of
/// streams per connection.
///
/// TODO: tune all of these constants, and consider making them configurable.
const MAX_CLIENTS_PER_CHANNEL: NonZero<usize> = NonZero::new(64).unwrap();

/// Max number of concurrent bulk GetPage streams per channel (i.e. TCP connection). These use a
/// dedicated channel pool with a lower client limit, to avoid TCP-level head-of-line blocking and
/// transmission delays. This also concentrates large window sizes on a smaller set of
/// streams/connections, presumably reducing memory use.
const MAX_BULK_CLIENTS_PER_CHANNEL: NonZero<usize> = NonZero::new(16).unwrap();

/// The batch size threshold at which a GetPage request will use the bulk stream pool.
///
/// The gRPC initial window size is 64 KB. Each page is 8 KB, so let's avoid increasing the window
/// size for the normal stream pool, and route requests for >= 5 pages (>32 KB) to the bulk pool.
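///
/// For example: a 4-page batch (32 KB) stays on the normal stream pool, while a 5-page batch
/// (40 KB) crosses the threshold and is routed to the bulk pool (see `is_bulk` below).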
const BULK_THRESHOLD_BATCH_SIZE: usize = 5;

/// The overall request call timeout, including retries and pool acquisition.
/// TODO: should we retry forever? Should the caller decide?
const CALL_TIMEOUT: Duration = Duration::from_secs(60);

/// The per-request (retry attempt) timeout, including any lazy connection establishment.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);

/// The initial request retry backoff duration. The first retry does not back off.
/// TODO: use a different backoff for ResourceExhausted (rate limiting)? Needs server support.
const BASE_BACKOFF: Duration = Duration::from_millis(5);

/// The maximum request retry backoff duration.
const MAX_BACKOFF: Duration = Duration::from_secs(5);

/// Threshold and interval for warning about slow operations.
const SLOW_THRESHOLD: Duration = Duration::from_secs(3);

/// A rich Pageserver gRPC client for a single tenant timeline. This client is more capable than the
/// basic `page_api::Client` gRPC client, and supports:
///
/// * Sharded tenants across multiple Pageservers.
/// * Pooling of connections, clients, and streams for efficient resource use.
/// * Concurrent use by many callers.
/// * Internal handling of GetPage bidirectional streams.
/// * Automatic retries.
/// * Observability.
///
/// The client has dedicated connection/client/stream pools per shard, for resource reuse. These
/// pools are unbounded: we allow scaling out as many concurrent streams as needed to serve all
/// concurrent callers, which mostly eliminates head-of-line blocking. Idle streams are fairly
/// cheap: the server task currently uses 26 KB of memory, so we can comfortably fit 100,000
/// concurrent idle streams (2.6 GB of memory). The worst case degenerates to the old libpq case
/// with one stream per backend, but without the TCP connection overhead. In the common case we
/// expect significantly lower stream counts due to stream sharing, driven e.g. by idle backends,
/// LFC hits, read coalescing, sharding (backends typically only talk to one shard at a time), etc.
///
/// TODO: this client does not support base backups or LSN leases, as these are only used by
/// compute_ctl. Consider adding this, but LSN leases need concurrent requests on all shards.
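///
/// # Example
///
/// A minimal usage sketch (illustrative only, not a compiled doctest; the URL, IDs, and request
/// values are placeholders assumed to be in scope):
///
/// ```ignore
/// use std::collections::HashMap;
/// use utils::shard::ShardIndex;
///
/// // Unsharded tenant: a single shard at `ShardIndex::unsharded()`, no stripe size.
/// let urls = HashMap::from([(ShardIndex::unsharded(), "grpc://pageserver:51051".to_string())]);
/// let spec = ShardSpec::new(urls, None)?;
/// let client = PageserverClient::new(tenant_id, timeline_id, spec, None, None)?;
///
/// // The client can be shared and called concurrently; pooling and retries are handled internally.
/// let resp = client.get_rel_size(req).await?;
/// ```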
pub struct PageserverClient {
    /// The tenant ID.
    tenant_id: TenantId,
    /// The timeline ID.
    timeline_id: TimelineId,
    /// The JWT auth token for this tenant, if any.
    auth_token: Option<String>,
    /// The compression to use, if any.
    compression: Option<CompressionEncoding>,
    /// The shards for this tenant.
    shards: ArcSwap<Shards>,
}

impl PageserverClient {
    /// Creates a new Pageserver client for a given tenant and timeline. Uses the Pageservers given
    /// in the shard spec, which must be complete and must use gRPC URLs.
    pub fn new(
        tenant_id: TenantId,
        timeline_id: TimelineId,
        shard_spec: ShardSpec,
        auth_token: Option<String>,
        compression: Option<CompressionEncoding>,
    ) -> anyhow::Result<Self> {
        let shards = Shards::new(
            tenant_id,
            timeline_id,
            shard_spec,
            auth_token.clone(),
            compression,
        )?;
        Ok(Self {
            tenant_id,
            timeline_id,
            auth_token,
            compression,
            shards: ArcSwap::new(Arc::new(shards)),
        })
    }

    /// Updates the shards from the given shard spec. In-flight requests will complete using the
    /// existing shards, but may retry with the new shards if they fail.
    ///
    /// TODO: verify that in-flight requests are allowed to complete, and that the old pools are
    /// properly spun down and dropped afterwards.
    pub fn update_shards(&self, shard_spec: ShardSpec) -> anyhow::Result<()> {
        // Validate the shard spec. We should really use `ArcSwap::rcu` for this, to avoid races
        // with concurrent updates, but that involves creating a new `Shards` on every attempt,
        // which spins up a bunch of Tokio tasks and such. These invariants should already be
        // enforced elsewhere in the stack, and if they're violated then we already have bigger
        // problems, so a best-effort but possibly-racy check is okay here.
        let old = self.shards.load_full();
        if shard_spec.count < old.count {
            return Err(anyhow!(
                "can't reduce shard count from {} to {}",
                old.count,
                shard_spec.count
            ));
        }
        if !old.count.is_unsharded() && shard_spec.stripe_size != old.stripe_size {
            return Err(anyhow!(
                "can't change stripe size from {} to {}",
                old.stripe_size.expect("always Some when sharded"),
                shard_spec.stripe_size.expect("always Some when sharded")
            ));
        }

        let shards = Shards::new(
            self.tenant_id,
            self.timeline_id,
            shard_spec,
            self.auth_token.clone(),
            self.compression,
        )?;
        self.shards.store(Arc::new(shards));
        Ok(())
    }

    /// Returns the total size of a database, as # of bytes.
    #[instrument(skip_all, fields(db_oid=%req.db_oid, lsn=%req.read_lsn))]
    pub async fn get_db_size(
        &self,
        req: page_api::GetDbSizeRequest,
    ) -> tonic::Result<page_api::GetDbSizeResponse> {
        debug!("sending request: {req:?}");
        let resp = Self::with_retries(CALL_TIMEOUT, async |_| {
            // Relation metadata is only available on shard 0.
            let mut client = self.shards.load_full().get_zero().client().await?;
            Self::with_timeout(REQUEST_TIMEOUT, client.get_db_size(req)).await
        })
        .await?;
        debug!("received response: {resp:?}");
        Ok(resp)
    }

    /// Fetches pages. The `request_id` must be unique across all in-flight requests, and the
    /// `attempt` must be 0 (incremented on retry). Automatically splits requests that straddle
    /// shard boundaries, and assembles the responses.
    ///
    /// Unlike `page_api::Client`, this automatically converts `status_code` into `tonic::Status`
    /// errors. All responses will have `GetPageStatusCode::Ok`.
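    ///
    /// # Example
    ///
    /// A minimal sketch (illustrative, not a compiled doctest): `rel`, `read_lsn`, and the request
    /// ID are placeholders, and `..Default::default()` assumes a `Default` impl on the request.
    ///
    /// ```ignore
    /// // Fetch two pages in one batch; blocks may straddle shard boundaries.
    /// let resp = client
    ///     .get_page(page_api::GetPageRequest {
    ///         rel,
    ///         block_numbers: vec![0, 1],
    ///         read_lsn,
    ///         ..Default::default() // request_id.attempt must remain 0
    ///     })
    ///     .await?;
    /// assert_eq!(resp.pages.len(), 2);
    /// ```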
     182              :     #[instrument(skip_all, fields(
     183              :         req_id = %req.request_id,
     184              :         class = %req.request_class,
     185              :         rel = %req.rel,
     186            0 :         blkno = %req.block_numbers[0],
     187              :         blks = %req.block_numbers.len(),
     188              :         lsn = %req.read_lsn,
     189              :     ))]
     190              :     pub async fn get_page(
     191              :         &self,
     192              :         req: page_api::GetPageRequest,
     193              :     ) -> tonic::Result<page_api::GetPageResponse> {
     194              :         // Make sure we have at least one page.
     195              :         if req.block_numbers.is_empty() {
     196              :             return Err(tonic::Status::invalid_argument("no block number"));
     197              :         }
     198              :         // The request attempt must be 0. The client will increment it internally.
     199              :         if req.request_id.attempt != 0 {
     200              :             return Err(tonic::Status::invalid_argument("request attempt must be 0"));
     201              :         }
     202              : 
     203              :         debug!("sending request: {req:?}");
     204              : 
     205              :         // The shards may change while we're fetching pages. We execute the request using a stable
     206              :         // view of the shards (especially important for requests that span shards), but retry the
     207              :         // top-level (pre-split) request to pick up shard changes. This can lead to unnecessary
     208              :         // retries and re-splits in some cases where requests span shards, but these are expected to
     209              :         // be rare.
     210              :         //
     211              :         // TODO: the gRPC server and client doesn't yet properly support shard splits. Revisit this
     212              :         // once we figure out how to handle these.
     213            0 :         let resp = Self::with_retries(CALL_TIMEOUT, async |attempt| {
     214            0 :             let mut req = req.clone();
     215            0 :             req.request_id.attempt = attempt as u32;
     216            0 :             let shards = self.shards.load_full();
     217            0 :             Self::with_timeout(REQUEST_TIMEOUT, Self::get_page_with_shards(req, &shards)).await
     218            0 :         })
     219              :         .await?;
     220              : 
     221              :         debug!("received response: {resp:?}");
     222              :         Ok(resp)
     223              :     }
     224              : 
     225              :     /// Fetches pages using the given shards. This uses a stable view of the shards, regardless of
     226              :     /// concurrent shard updates. Does not retry internally, but is retried by `get_page()`.
     227            0 :     async fn get_page_with_shards(
     228            0 :         req: page_api::GetPageRequest,
     229            0 :         shards: &Shards,
     230            0 :     ) -> tonic::Result<page_api::GetPageResponse> {
     231              :         // Fast path: request is for a single shard.
     232            0 :         if let Some(shard_id) =
     233            0 :             GetPageSplitter::for_single_shard(&req, shards.count, shards.stripe_size)?
     234              :         {
     235            0 :             return Self::get_page_with_shard(req, shards.get(shard_id)?).await;
     236            0 :         }
     237              : 
     238              :         // Request spans multiple shards. Split it, dispatch concurrent per-shard requests, and
     239              :         // reassemble the responses.
     240            0 :         let mut splitter = GetPageSplitter::split(req, shards.count, shards.stripe_size)?;
     241              : 
     242            0 :         let mut shard_requests = FuturesUnordered::new();
     243            0 :         for (shard_id, shard_req) in splitter.drain_requests() {
     244            0 :             let future = Self::get_page_with_shard(shard_req, shards.get(shard_id)?)
     245            0 :                 .map(move |result| result.map(|resp| (shard_id, resp)));
     246            0 :             shard_requests.push(future);
     247              :         }
     248              : 
     249            0 :         while let Some((shard_id, shard_response)) = shard_requests.next().await.transpose()? {
     250            0 :             splitter.add_response(shard_id, shard_response)?;
     251              :         }
     252              : 
     253            0 :         Ok(splitter.collect_response()?)
     254            0 :     }
     255              : 
     256              :     /// Fetches pages on the given shard. Does not retry internally.
     257            0 :     async fn get_page_with_shard(
     258            0 :         req: page_api::GetPageRequest,
     259            0 :         shard: &Shard,
     260            0 :     ) -> tonic::Result<page_api::GetPageResponse> {
     261            0 :         let mut stream = shard.stream(Self::is_bulk(&req)).await?;
     262            0 :         let resp = stream.send(req.clone()).await?;
     263              : 
     264              :         // Convert per-request errors into a tonic::Status.
     265            0 :         if resp.status_code != page_api::GetPageStatusCode::Ok {
     266            0 :             return Err(tonic::Status::new(
     267            0 :                 resp.status_code.into(),
     268            0 :                 resp.reason.unwrap_or_else(|| String::from("unknown error")),
     269              :             ));
     270            0 :         }
     271              : 
     272              :         // Check that we received the expected pages.
     273            0 :         if req.rel != resp.rel {
     274            0 :             return Err(tonic::Status::internal(format!(
     275            0 :                 "shard {} returned wrong relation, expected {} got {}",
     276            0 :                 shard.id, req.rel, resp.rel
     277            0 :             )));
     278            0 :         }
     279            0 :         if !req
     280            0 :             .block_numbers
     281            0 :             .iter()
     282            0 :             .copied()
     283            0 :             .eq(resp.pages.iter().map(|p| p.block_number))
     284              :         {
     285            0 :             return Err(tonic::Status::internal(format!(
     286            0 :                 "shard {} returned wrong pages, expected {:?} got {:?}",
     287              :                 shard.id,
     288              :                 req.block_numbers,
     289            0 :                 resp.pages
     290            0 :                     .iter()
     291            0 :                     .map(|page| page.block_number)
     292            0 :                     .collect::<Vec<_>>()
     293              :             )));
     294            0 :         }
     295              : 
     296            0 :         Ok(resp)
     297            0 :     }
     298              : 
     299              :     /// Returns the size of a relation, as # of blocks.
     300              :     #[instrument(skip_all, fields(rel=%req.rel, lsn=%req.read_lsn))]
     301              :     pub async fn get_rel_size(
     302              :         &self,
     303              :         req: page_api::GetRelSizeRequest,
     304              :     ) -> tonic::Result<page_api::GetRelSizeResponse> {
     305              :         debug!("sending request: {req:?}");
     306            0 :         let resp = Self::with_retries(CALL_TIMEOUT, async |_| {
     307              :             // Relation metadata is only available on shard 0.
     308            0 :             let mut client = self.shards.load_full().get_zero().client().await?;
     309            0 :             Self::with_timeout(REQUEST_TIMEOUT, client.get_rel_size(req)).await
     310            0 :         })
     311              :         .await?;
     312              :         debug!("received response: {resp:?}");
     313              :         Ok(resp)
     314              :     }
     315              : 
     316              :     /// Fetches an SLRU segment.
     317              :     #[instrument(skip_all, fields(kind=%req.kind, segno=%req.segno, lsn=%req.read_lsn))]
     318              :     pub async fn get_slru_segment(
     319              :         &self,
     320              :         req: page_api::GetSlruSegmentRequest,
     321              :     ) -> tonic::Result<page_api::GetSlruSegmentResponse> {
     322              :         debug!("sending request: {req:?}");
     323            0 :         let resp = Self::with_retries(CALL_TIMEOUT, async |_| {
     324              :             // SLRU segments are only available on shard 0.
     325            0 :             let mut client = self.shards.load_full().get_zero().client().await?;
     326            0 :             Self::with_timeout(REQUEST_TIMEOUT, client.get_slru_segment(req)).await
     327            0 :         })
     328              :         .await?;
     329              :         debug!("received response: {resp:?}");
     330              :         Ok(resp)
     331              :     }
     332              : 
     333              :     /// Runs the given async closure with retries up to the given timeout. Only certain gRPC status
     334              :     /// codes are retried, see [`Retry::should_retry`]. Returns `DeadlineExceeded` on timeout.
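    ///
    /// A usage sketch, mirroring the unary request methods above (illustrative):
    ///
    /// ```ignore
    /// let resp = Self::with_retries(CALL_TIMEOUT, async |_attempt| {
    ///     // Each attempt gets its own REQUEST_TIMEOUT, including lazy connection setup.
    ///     Self::with_timeout(REQUEST_TIMEOUT, client.get_db_size(req)).await
    /// })
    /// .await?;
    /// ```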
    async fn with_retries<T, F, O>(timeout: Duration, f: F) -> tonic::Result<T>
    where
        F: FnMut(usize) -> O, // pass attempt number, starting at 0
        O: Future<Output = tonic::Result<T>>,
    {
        Retry {
            timeout: Some(timeout),
            base_backoff: BASE_BACKOFF,
            max_backoff: MAX_BACKOFF,
        }
        .with(f)
        .await
    }

    /// Runs the given future with a timeout. Returns `DeadlineExceeded` on timeout.
    async fn with_timeout<T>(
        timeout: Duration,
        f: impl Future<Output = tonic::Result<T>>,
    ) -> tonic::Result<T> {
        let started = Instant::now();
        tokio::time::timeout(timeout, f).await.map_err(|_| {
            tonic::Status::deadline_exceeded(format!(
                "request timed out after {:.3}s",
                started.elapsed().as_secs_f64()
            ))
        })?
    }

    /// Returns true if the request is considered a bulk request and should use the bulk pool.
    fn is_bulk(req: &page_api::GetPageRequest) -> bool {
        req.block_numbers.len() >= BULK_THRESHOLD_BATCH_SIZE
    }
}

/// Shard specification for a PageserverClient.
pub struct ShardSpec {
    /// Maps shard indices to gRPC URLs.
    ///
    /// INVARIANT: every shard 0..count is present, and shard 0 is always present.
    /// INVARIANT: every URL is valid and uses grpc:// scheme.
    urls: HashMap<ShardIndex, String>,
    /// The shard count.
    ///
    /// NB: this is 0 for unsharded tenants, following `ShardIndex::unsharded()` convention.
    count: ShardCount,
    /// The stripe size for these shards.
    ///
    /// INVARIANT: None for unsharded tenants, Some for sharded.
    stripe_size: Option<ShardStripeSize>,
}

impl ShardSpec {
    /// Creates a new shard spec with the given URLs and stripe size. All shards must be given.
    /// The stripe size must be Some for sharded tenants, or None for unsharded tenants.
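    ///
    /// # Example
    ///
    /// A sketch of a two-shard spec (illustrative; the URLs and stripe size are placeholder
    /// values):
    ///
    /// ```ignore
    /// use std::collections::HashMap;
    /// use utils::shard::{ShardCount, ShardIndex, ShardNumber, ShardStripeSize};
    ///
    /// // Every shard index 0..count must be present, and all URLs must use grpc://.
    /// let count = ShardCount::new(2);
    /// let urls = HashMap::from([
    ///     (ShardIndex::new(ShardNumber(0), count), "grpc://pageserver-0:51051".to_string()),
    ///     (ShardIndex::new(ShardNumber(1), count), "grpc://pageserver-1:51051".to_string()),
    /// ]);
    /// let spec = ShardSpec::new(urls, Some(ShardStripeSize(2048)))?;
    /// ```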
    pub fn new(
        urls: HashMap<ShardIndex, String>,
        stripe_size: Option<ShardStripeSize>,
    ) -> anyhow::Result<Self> {
        // Compute the shard count.
        let count = match urls.len() {
            0 => return Err(anyhow!("no shards provided")),
            1 => ShardCount::new(0), // NB: unsharded tenants use 0, like `ShardIndex::unsharded()`
            n if n > u8::MAX as usize => return Err(anyhow!("too many shards: {n}")),
            n => ShardCount::new(n as u8),
        };

        // Validate the stripe size.
        if stripe_size.is_none() && !count.is_unsharded() {
            return Err(anyhow!("stripe size must be given for sharded tenants"));
        }
        if stripe_size.is_some() && count.is_unsharded() {
            return Err(anyhow!("stripe size can't be given for unsharded tenants"));
        }

        // Validate the shard spec.
        for (shard_id, url) in &urls {
            // The shard index must match the computed shard count, even for unsharded tenants.
            if shard_id.shard_count != count {
                return Err(anyhow!("invalid shard index {shard_id}, expected {count}"));
            }
            // The shard index's number and count must be consistent.
            if !shard_id.is_unsharded() && shard_id.shard_number.0 >= shard_id.shard_count.0 {
                return Err(anyhow!("invalid shard index {shard_id}"));
            }
            // The above conditions guarantee that we have all shards 0..count: len() matches count,
            // shard number < count, and numbers are unique (via hashmap).

            // Validate the URL.
            if PageserverProtocol::from_connstring(url)? != PageserverProtocol::Grpc {
                return Err(anyhow!("invalid shard URL {url}: must use gRPC"));
            }
        }

        Ok(Self {
            urls,
            count,
            stripe_size,
        })
    }
}

/// Tracks the tenant's shards.
struct Shards {
    /// Shards by shard index.
    ///
    /// INVARIANT: every shard 0..count is present.
    /// INVARIANT: shard 0 is always present.
    by_index: HashMap<ShardIndex, Shard>,
    /// The shard count.
    ///
    /// NB: this is 0 for unsharded tenants, following `ShardIndex::unsharded()` convention.
    count: ShardCount,
    /// The stripe size.
    ///
    /// INVARIANT: None for unsharded tenants, Some for sharded.
    stripe_size: Option<ShardStripeSize>,
}

impl Shards {
    /// Creates a new set of shards based on a shard spec.
    fn new(
        tenant_id: TenantId,
        timeline_id: TimelineId,
        shard_spec: ShardSpec,
        auth_token: Option<String>,
        compression: Option<CompressionEncoding>,
    ) -> anyhow::Result<Self> {
        // NB: the shard spec has already been validated when constructed.
        let mut shards = HashMap::with_capacity(shard_spec.urls.len());
        for (shard_id, url) in shard_spec.urls {
            shards.insert(
                shard_id,
                Shard::new(
                    url,
                    tenant_id,
                    timeline_id,
                    shard_id,
                    auth_token.clone(),
                    compression,
                )?,
            );
        }

        Ok(Self {
            by_index: shards,
            count: shard_spec.count,
            stripe_size: shard_spec.stripe_size,
        })
    }

    /// Looks up the given shard.
    #[allow(clippy::result_large_err)] // TODO: check perf impact
    fn get(&self, shard_id: ShardIndex) -> tonic::Result<&Shard> {
        self.by_index
            .get(&shard_id)
            .ok_or_else(|| tonic::Status::not_found(format!("unknown shard {shard_id}")))
    }

    /// Returns shard 0.
    fn get_zero(&self) -> &Shard {
        self.get(ShardIndex::new(ShardNumber(0), self.count))
            .expect("always present")
    }
}

/// A single shard. Has dedicated resource pools with the following structure:
///
/// * Channel pool: MAX_CLIENTS_PER_CHANNEL.
///   * Client pool: unbounded.
///     * Stream pool: unbounded.
/// * Bulk channel pool: MAX_BULK_CLIENTS_PER_CHANNEL.
///   * Bulk client pool: unbounded.
///     * Bulk stream pool: unbounded.
///
/// We use a separate bulk channel pool with a lower concurrency limit for large batch requests.
/// This avoids TCP-level head-of-line blocking, and also concentrates large window sizes on a
/// smaller set of streams/connections, which presumably reduces memory use. Neither of these pools
/// is bounded, nor do they pipeline requests, so the latency characteristics should be mostly
/// similar (except for TCP transmission time).
///
/// TODO: since we never use bounded pools, we could consider removing the pool limiters. However,
/// the code is fairly trivial, so we may as well keep them around for now in case we need them.
struct Shard {
    /// The shard ID.
    id: ShardIndex,
    /// Unary gRPC client pool.
    client_pool: Arc<ClientPool>,
    /// GetPage stream pool.
    stream_pool: Arc<StreamPool>,
    /// GetPage stream pool for bulk requests.
    bulk_stream_pool: Arc<StreamPool>,
}

impl Shard {
    /// Creates a new shard. It has its own dedicated resource pools.
    fn new(
        url: String,
        tenant_id: TenantId,
        timeline_id: TimelineId,
        shard_id: ShardIndex,
        auth_token: Option<String>,
        compression: Option<CompressionEncoding>,
    ) -> anyhow::Result<Self> {
        // Shard pools for unary requests and non-bulk GetPage requests.
        let client_pool = ClientPool::new(
            ChannelPool::new(url.clone(), MAX_CLIENTS_PER_CHANNEL)?,
            tenant_id,
            timeline_id,
            shard_id,
            auth_token.clone(),
            compression,
            None, // unbounded
        );
        let stream_pool = StreamPool::new(client_pool.clone(), None); // unbounded

        // Bulk GetPage stream pool for large batches (prefetches, sequential scans, vacuum, etc.).
        let bulk_stream_pool = StreamPool::new(
            ClientPool::new(
                ChannelPool::new(url, MAX_BULK_CLIENTS_PER_CHANNEL)?,
                tenant_id,
                timeline_id,
                shard_id,
                auth_token,
                compression,
                None, // unbounded
            ),
            None, // unbounded
        );

        Ok(Self {
            id: shard_id,
            client_pool,
            stream_pool,
            bulk_stream_pool,
        })
    }

    /// Returns a pooled client for this shard.
    #[instrument(skip_all)]
    async fn client(&self) -> tonic::Result<ClientGuard> {
        warn_slow(
            "client pool acquisition",
            SLOW_THRESHOLD,
            pin!(self.client_pool.get()),
        )
        .await
    }

    /// Returns a pooled stream for this shard. If `bulk` is `true`, uses the dedicated bulk pool.
    #[instrument(skip_all, fields(bulk))]
    async fn stream(&self, bulk: bool) -> tonic::Result<StreamGuard> {
        let pool = match bulk {
            false => &self.stream_pool,
            true => &self.bulk_stream_pool,
        };
        warn_slow("stream pool acquisition", SLOW_THRESHOLD, pin!(pool.get())).await
    }
}