LCOV - code coverage report
Current view: top level - pageserver/client_grpc/src - client.rs (source / functions) Coverage Total Hit
Test: 15f04989d2faf4ce76cecb56042184aca56ebae6.info Lines: 0.0 % 238 0
Test Date: 2025-07-14 11:50:36 Functions: 0.0 % 31 0

            Line data    Source code
       1              : use std::collections::HashMap;
       2              : use std::num::NonZero;
       3              : use std::sync::Arc;
       4              : 
       5              : use anyhow::anyhow;
       6              : use arc_swap::ArcSwap;
       7              : use futures::stream::FuturesUnordered;
       8              : use futures::{FutureExt as _, StreamExt as _};
       9              : use tonic::codec::CompressionEncoding;
      10              : use tracing::instrument;
      11              : 
      12              : use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamGuard, StreamPool};
      13              : use crate::retry::Retry;
      14              : use crate::split::GetPageSplitter;
      15              : use compute_api::spec::PageserverProtocol;
      16              : use pageserver_api::shard::ShardStripeSize;
      17              : use pageserver_page_api as page_api;
      18              : use utils::id::{TenantId, TimelineId};
      19              : use utils::shard::{ShardCount, ShardIndex, ShardNumber};
      20              : 
      21              : /// Max number of concurrent clients per channel (i.e. TCP connection). New channels will be spun up
      22              : /// when full.
      23              : ///
      24              : /// TODO: tune all of these constants, and consider making them configurable.
      25              : /// TODO: consider separate limits for unary and streaming clients, so we don't fill up channels
      26              : /// with only streams.
                        ///
                        /// NB: every limit in this group is built with `NonZero::new(..).unwrap()` inside a `const`
                        /// initializer, so a zero literal is rejected at compile time rather than panicking at runtime.
      27              : const MAX_CLIENTS_PER_CHANNEL: NonZero<usize> = NonZero::new(16).unwrap();
      28              : 
      29              : /// Max number of concurrent unary request clients per shard.
      30              : const MAX_UNARY_CLIENTS: NonZero<usize> = NonZero::new(64).unwrap();
      31              : 
      32              : /// Max number of concurrent GetPage streams per shard. The max number of concurrent GetPage
      33              : /// requests is given by `MAX_STREAMS * MAX_STREAM_QUEUE_DEPTH`.
                        ///
                        /// With the current values that is 64 * 2 = 128 in-flight GetPage requests per shard.
      34              : const MAX_STREAMS: NonZero<usize> = NonZero::new(64).unwrap();
      35              : 
      36              : /// Max number of pipelined requests per stream.
      37              : const MAX_STREAM_QUEUE_DEPTH: NonZero<usize> = NonZero::new(2).unwrap();
      38              : 
      39              : /// Max number of concurrent bulk GetPage streams per shard, used e.g. for prefetches. Because these
      40              : /// are more throughput-oriented, we have a smaller limit but higher queue depth.
      41              : const MAX_BULK_STREAMS: NonZero<usize> = NonZero::new(16).unwrap();
      42              : 
      43              : /// Max number of pipelined requests per bulk stream. These are more throughput-oriented and thus
      44              : /// get a larger queue depth.
                        ///
                        /// NOTE(review): by analogy with `MAX_STREAMS`, this presumably allows 16 * 4 = 64 in-flight bulk
                        /// requests per shard -- confirm against `StreamPool`.
      45              : const MAX_BULK_STREAM_QUEUE_DEPTH: NonZero<usize> = NonZero::new(4).unwrap();
      46              : 
      47              : /// A rich Pageserver gRPC client for a single tenant timeline. This client is more capable than the
      48              : /// basic `page_api::Client` gRPC client, and supports:
      49              : ///
      50              : /// * Sharded tenants across multiple Pageservers.
      51              : /// * Pooling of connections, clients, and streams for efficient resource use.
      52              : /// * Concurrent use by many callers.
      53              : /// * Internal handling of GetPage bidirectional streams, with pipelining and error handling.
      54              : /// * Automatic retries.
      55              : /// * Observability.
      56              : ///
      57              : /// TODO: this client does not support base backups or LSN leases, as these are only used by
      58              : /// compute_ctl. Consider adding this, but LSN leases need concurrent requests on all shards.
                        ///
                        /// All request methods and `update_shards()` take `&self`; the only state that changes after
                        /// construction is the shard set, which is replaced as a whole through the `ArcSwap`.
      59              : pub struct PageserverClient {
      60              :     /// The tenant ID.
      61              :     tenant_id: TenantId,
      62              :     /// The timeline ID.
      63              :     timeline_id: TimelineId,
      64              :     /// The JWT auth token for this tenant, if any.
                            ///
                            /// Retained (in addition to the copy handed to the shards) so that `update_shards()` can build
                            /// replacement shards.
      65              :     auth_token: Option<String>,
      66              :     /// The compression to use, if any.
      67              :     compression: Option<CompressionEncoding>,
      68              :     /// The shards for this tenant.
                            ///
                            /// Requests take a snapshot via `load_full()`; `update_shards()` swaps in a new set via `store()`.
      69              :     shards: ArcSwap<Shards>,
      70              :     /// The retry configuration.
      71              :     retry: Retry,
      72              : }
      73              : 
      74              : impl PageserverClient {
      75              :     /// Creates a new Pageserver client for a given tenant and timeline. Uses the Pageservers given
      76              :     /// in the shard spec, which must be complete and must use gRPC URLs.
                            ///
                            /// # Errors
                            ///
                            /// Returns an error if `Shards::new()` fails for the given shard spec.
      77            0 :     pub fn new(
      78            0 :         tenant_id: TenantId,
      79            0 :         timeline_id: TimelineId,
      80            0 :         shard_spec: ShardSpec,
      81            0 :         auth_token: Option<String>,
      82            0 :         compression: Option<CompressionEncoding>,
      83            0 :     ) -> anyhow::Result<Self> {
      84            0 :         let shards = Shards::new(
      85            0 :             tenant_id,
      86            0 :             timeline_id,
      87            0 :             shard_spec,
                                    // Cloned: the shards take one copy, and the client keeps the original below so that
                                    // `update_shards()` can construct replacement shards later.
      88            0 :             auth_token.clone(),
      89            0 :             compression,
      90            0 :         )?;
      91            0 :         Ok(Self {
      92            0 :             tenant_id,
      93            0 :             timeline_id,
      94            0 :             auth_token,
      95            0 :             compression,
      96            0 :             shards: ArcSwap::new(Arc::new(shards)),
      97            0 :             retry: Retry,
      98            0 :         })
      99            0 :     }
     100              : 
     101              :     /// Updates the shards from the given shard spec. In-flight requests will complete using the
     102              :     /// existing shards, but may retry with the new shards if they fail.
     103              :     ///
     104              :     /// TODO: verify that in-flight requests are allowed to complete, and that the old pools are
     105              :     /// properly spun down and dropped afterwards.
                            ///
                            /// # Errors
                            ///
                            /// Returns an error, leaving the current shards in place, if:
                            ///
                            /// * the new shard count is lower than the current one;
                            /// * the current tenant is sharded and the stripe size differs;
                            /// * `Shards::new()` fails for the new spec.
     106            0 :     pub fn update_shards(&self, shard_spec: ShardSpec) -> anyhow::Result<()> {
     107              :         // Validate the shard spec. We should really use `ArcSwap::rcu` for this, to avoid races
     108              :         // with concurrent updates, but that involves creating a new `Shards` on every attempt,
     109              :         // which spins up a bunch of Tokio tasks and such. These should already be checked elsewhere
     110              :         // in the stack, and if they're violated then we already have problems elsewhere, so a
     111              :         // best-effort but possibly-racy check is okay here.
     112            0 :         let old = self.shards.load_full();
                                // NB: unsharded tenants use count 0, so unsharded -> sharded passes this check while
                                // sharded -> unsharded is rejected.
                                // NOTE(review): assumes `ShardCount` orders by its inner number -- confirm.
     113            0 :         if shard_spec.count < old.count {
     114            0 :             return Err(anyhow!(
     115            0 :                 "can't reduce shard count from {} to {}",
     116            0 :                 old.count,
     117            0 :                 shard_spec.count
     118            0 :             ));
     119            0 :         }
                                // The stripe size is irrelevant while unsharded, so it may only be set freely then.
     120            0 :         if !old.count.is_unsharded() && shard_spec.stripe_size != old.stripe_size {
     121            0 :             return Err(anyhow!(
     122            0 :                 "can't change stripe size from {} to {}",
     123            0 :                 old.stripe_size,
     124            0 :                 shard_spec.stripe_size
     125            0 :             ));
     126            0 :         }
     127              : 
                                // Build the complete replacement first; the swap below only happens if that succeeded.
     128            0 :         let shards = Shards::new(
     129            0 :             self.tenant_id,
     130            0 :             self.timeline_id,
     131            0 :             shard_spec,
     132            0 :             self.auth_token.clone(),
     133            0 :             self.compression,
     134            0 :         )?;
     135            0 :         self.shards.store(Arc::new(shards));
     136            0 :         Ok(())
     137            0 :     }
     138              : 
     139              :     /// Returns whether a relation exists.
                            ///
                            /// The request is always sent to shard 0 and runs under `self.retry`. The shard set is
                            /// re-loaded inside the closure, so each invocation sees the shards current at that time.
                            ///
                            /// # Errors
                            ///
                            /// Fails if no pooled client could be obtained or if the `check_rel_exists` RPC fails, as
                            /// surfaced by the retry wrapper.
     140              :     #[instrument(skip_all, fields(rel=%req.rel, lsn=%req.read_lsn))]
     141              :     pub async fn check_rel_exists(
     142              :         &self,
     143              :         req: page_api::CheckRelExistsRequest,
     144              :     ) -> tonic::Result<page_api::CheckRelExistsResponse> {
     145              :         self.retry
                                    // The attempt number is not needed for unary requests.
                                    // NOTE(review): `req` is passed by value on each invocation without an explicit clone, so
                                    // the request type is presumably `Copy` -- confirm.
     146            0 :             .with(async |_| {
     147              :                 // Relation metadata is only available on shard 0.
     148            0 :                 let mut client = self.shards.load_full().get_zero().client().await?;
     149            0 :                 client.check_rel_exists(req).await
     150            0 :             })
     151              :             .await
     152              :     }
     153              : 
     154              :     /// Returns the total size of a database, as # of bytes.
                            ///
                            /// The request is always sent to shard 0 and runs under `self.retry`. The shard set is
                            /// re-loaded inside the closure, so each invocation sees the shards current at that time.
                            ///
                            /// # Errors
                            ///
                            /// Fails if no pooled client could be obtained or if the `get_db_size` RPC fails, as surfaced
                            /// by the retry wrapper.
     155              :     #[instrument(skip_all, fields(db_oid=%req.db_oid, lsn=%req.read_lsn))]
     156              :     pub async fn get_db_size(
     157              :         &self,
     158              :         req: page_api::GetDbSizeRequest,
     159              :     ) -> tonic::Result<page_api::GetDbSizeResponse> {
     160              :         self.retry
                                    // The attempt number is not needed for unary requests.
     161            0 :             .with(async |_| {
     162              :                 // Database size is requested from shard 0, like relation metadata.
     163            0 :                 let mut client = self.shards.load_full().get_zero().client().await?;
     164            0 :                 client.get_db_size(req).await
     165            0 :             })
     166              :             .await
     167              :     }
     168              : 
     169              :     /// Fetches pages. The `request_id` must be unique across all in-flight requests, and the
     170              :     /// `attempt` must be 0 (incremented on retry). Automatically splits requests that straddle
     171              :     /// shard boundaries, and assembles the responses.
     172              :     ///
     173              :     /// Unlike `page_api::Client`, this automatically converts `status_code` into `tonic::Status`
     174              :     /// errors. All responses will have `GetPageStatusCode::Ok`.
                            ///
                            /// # Errors
                            ///
                            /// Returns `invalid_argument` if the request has no block numbers or a non-zero attempt.
                            /// Other failures come from `get_page_with_shards()`, as surfaced by `self.retry.with()`.
                            //
                            // NB: the span fields below are evaluated when the span is created, i.e. before the function
                            // body and its emptiness check run. `blkno` therefore must not index into `block_numbers`:
                            // an empty request records the default block number and is then rejected with
                            // `invalid_argument` instead of panicking.
     175              :     #[instrument(skip_all, fields(
     176              :         req_id = %req.request_id,
     177              :         class = %req.request_class,
     178              :         rel = %req.rel,
     179            0 :         blkno = %req.block_numbers.first().copied().unwrap_or_default(),
     180              :         blks = %req.block_numbers.len(),
     181              :         lsn = %req.read_lsn,
     182              :     ))]
     183              :     pub async fn get_page(
     184              :         &self,
     185              :         req: page_api::GetPageRequest,
     186              :     ) -> tonic::Result<page_api::GetPageResponse> {
     187              :         // Make sure we have at least one page.
     188              :         if req.block_numbers.is_empty() {
     189              :             return Err(tonic::Status::invalid_argument("no block number"));
     190              :         }
     191              :         // The request attempt must be 0. The client will increment it internally.
     192              :         if req.request_id.attempt != 0 {
     193              :             return Err(tonic::Status::invalid_argument("request attempt must be 0"));
     194              :         }
     195              : 
     196              :         // The shards may change while we're fetching pages. We execute the request using a stable
     197              :         // view of the shards (especially important for requests that span shards), but retry the
     198              :         // top-level (pre-split) request to pick up shard changes. This can lead to unnecessary
     199              :         // retries and re-splits in some cases where requests span shards, but these are expected to
     200              :         // be rare.
     201              :         //
     202              :         // TODO: the gRPC server and client doesn't yet properly support shard splits. Revisit this
     203              :         // once we figure out how to handle these.
     204              :         self.retry
     205            0 :             .with(async |attempt| {
                                        // Each attempt works on its own copy, so the caller's request keeps attempt 0.
     206            0 :                 let mut req = req.clone();
                                        // NOTE(review): `as u32` truncates silently if `attempt` is wider than 32 bits;
                                        // presumably the retry count never gets near that -- confirm against `Retry`.
     207            0 :                 req.request_id.attempt = attempt as u32;
     208            0 :                 Self::get_page_with_shards(req, &self.shards.load_full()).await
     209            0 :             })
     210              :             .await
     211              :     }
     212              : 
     213              :     /// Fetches pages using the given shards. This uses a stable view of the shards, regardless of
     214              :     /// concurrent shard updates. Does not retry internally, but is retried by `get_page()`.
                            ///
                            /// # Errors
                            ///
                            /// Returns `not_found` if the splitter names a shard that is missing from `shards`, the first
                            /// error returned by any per-shard request, or an error from the splitter while adding or
                            /// assembling responses.
     215            0 :     async fn get_page_with_shards(
     216            0 :         req: page_api::GetPageRequest,
     217            0 :         shards: &Shards,
     218            0 :     ) -> tonic::Result<page_api::GetPageResponse> {
     219              :         // Fast path: request is for a single shard.
     220            0 :         if let Some(shard_id) =
     221            0 :             GetPageSplitter::for_single_shard(&req, shards.count, shards.stripe_size)
     222              :         {
     223            0 :             return Self::get_page_with_shard(req, shards.get(shard_id)?).await;
     224            0 :         }
     225              : 
     226              :         // Request spans multiple shards. Split it, dispatch concurrent per-shard requests, and
     227              :         // reassemble the responses.
     228            0 :         let mut splitter = GetPageSplitter::split(req, shards.count, shards.stripe_size);
     229              : 
                                // The futures are only created here; they make progress once polled via `next()` below.
     230            0 :         let mut shard_requests = FuturesUnordered::new();
     231            0 :         for (shard_id, shard_req) in splitter.drain_requests() {
     232            0 :             let future = Self::get_page_with_shard(shard_req, shards.get(shard_id)?)
                                        // Responses arrive in completion order, so tag each one with its shard.
     233            0 :                 .map(move |result| result.map(|resp| (shard_id, resp)));
     234            0 :             shard_requests.push(future);
     235              :         }
     236              : 
                                // `transpose()?` returns on the first failed shard; the remaining futures are dropped
                                // together with `shard_requests` when the function returns.
     237            0 :         while let Some((shard_id, shard_response)) = shard_requests.next().await.transpose()? {
     238            0 :             splitter.add_response(shard_id, shard_response)?;
     239              :         }
     240              : 
     241            0 :         splitter.get_response()
     242            0 :     }
     243              : 
     244              :     /// Fetches pages on the given shard. Does not retry internally.
                            ///
                            /// Picks the bulk or regular stream pool based on the request class, sends the request, and
                            /// validates the response against the request.
                            ///
                            /// # Errors
                            ///
                            /// Returns the stream's send error, a status built from a non-`Ok` response status code
                            /// (with the response's reason, or "unknown error"), or `internal` if the response carries a
                            /// different relation or a different sequence of block numbers than was requested.
     245            0 :     async fn get_page_with_shard(
     246            0 :         req: page_api::GetPageRequest,
     247            0 :         shard: &Shard,
     248            0 :     ) -> tonic::Result<page_api::GetPageResponse> {
     249            0 :         let stream = shard.stream(req.request_class.is_bulk()).await;
                                // Send a clone: the original request is still needed for the validation below.
     250            0 :         let resp = stream.send(req.clone()).await?;
     251              : 
     252              :         // Convert per-request errors into a tonic::Status.
     253            0 :         if resp.status_code != page_api::GetPageStatusCode::Ok {
     254            0 :             return Err(tonic::Status::new(
     255            0 :                 resp.status_code.into(),
     256            0 :                 resp.reason.unwrap_or_else(|| String::from("unknown error")),
     257              :             ));
     258            0 :         }
     259              : 
     260              :         // Check that we received the expected pages.
     261            0 :         if req.rel != resp.rel {
     262            0 :             return Err(tonic::Status::internal(format!(
     263            0 :                 "shard {} returned wrong relation, expected {} got {}",
     264            0 :                 shard.id, req.rel, resp.rel
     265            0 :             )));
     266            0 :         }
                                // Iterator equality requires the same length and the same block numbers in the same order.
     267            0 :         if !req
     268            0 :             .block_numbers
     269            0 :             .iter()
     270            0 :             .copied()
     271            0 :             .eq(resp.pages.iter().map(|p| p.block_number))
     272              :         {
     273            0 :             return Err(tonic::Status::internal(format!(
     274            0 :                 "shard {} returned wrong pages, expected {:?} got {:?}",
     275              :                 shard.id,
     276              :                 req.block_numbers,
     277            0 :                 resp.pages
     278            0 :                     .iter()
     279            0 :                     .map(|page| page.block_number)
     280            0 :                     .collect::<Vec<_>>()
     281              :             )));
     282            0 :         }
     283              : 
     284            0 :         Ok(resp)
     285            0 :     }
     286              : 
     287              :     /// Returns the size of a relation, as # of blocks.
                            ///
                            /// The request is always sent to shard 0 and runs under `self.retry`. The shard set is
                            /// re-loaded inside the closure, so each invocation sees the shards current at that time.
                            ///
                            /// # Errors
                            ///
                            /// Fails if no pooled client could be obtained or if the `get_rel_size` RPC fails, as
                            /// surfaced by the retry wrapper.
     288              :     #[instrument(skip_all, fields(rel=%req.rel, lsn=%req.read_lsn))]
     289              :     pub async fn get_rel_size(
     290              :         &self,
     291              :         req: page_api::GetRelSizeRequest,
     292              :     ) -> tonic::Result<page_api::GetRelSizeResponse> {
     293              :         self.retry
                                    // The attempt number is not needed for unary requests.
     294            0 :             .with(async |_| {
     295              :                 // Relation metadata is only available on shard 0.
     296            0 :                 let mut client = self.shards.load_full().get_zero().client().await?;
     297            0 :                 client.get_rel_size(req).await
     298            0 :             })
     299              :             .await
     300              :     }
     301              : 
     302              :     /// Fetches an SLRU segment.
                            ///
                            /// The request is always sent to shard 0 and runs under `self.retry`. The shard set is
                            /// re-loaded inside the closure, so each invocation sees the shards current at that time.
                            ///
                            /// # Errors
                            ///
                            /// Fails if no pooled client could be obtained or if the `get_slru_segment` RPC fails, as
                            /// surfaced by the retry wrapper.
     303              :     #[instrument(skip_all, fields(kind=%req.kind, segno=%req.segno, lsn=%req.read_lsn))]
     304              :     pub async fn get_slru_segment(
     305              :         &self,
     306              :         req: page_api::GetSlruSegmentRequest,
     307              :     ) -> tonic::Result<page_api::GetSlruSegmentResponse> {
     308              :         self.retry
                                    // The attempt number is not needed for unary requests.
     309            0 :             .with(async |_| {
     310              :                 // SLRU segments are only available on shard 0.
     311            0 :                 let mut client = self.shards.load_full().get_zero().client().await?;
     312            0 :                 client.get_slru_segment(req).await
     313            0 :             })
     314              :             .await
     315              :     }
     316              : }
     317              : 
     318              : /// Shard specification for a PageserverClient.
                        ///
                        /// The fields are private; in the code visible here the only constructor is `ShardSpec::new()`,
                        /// which checks the invariants listed below before returning a value.
     319              : pub struct ShardSpec {
     320              :     /// Maps shard indices to gRPC URLs.
     321              :     ///
     322              :     /// INVARIANT: every shard 0..count is present, and shard 0 is always present.
     323              :     /// INVARIANT: every URL is valid and uses grpc:// scheme.
     324              :     urls: HashMap<ShardIndex, String>,
     325              :     /// The shard count.
     326              :     ///
     327              :     /// NB: this is 0 for unsharded tenants, following `ShardIndex::unsharded()` convention.
     328              :     count: ShardCount,
     329              :     /// The stripe size for these shards.
                            ///
                            /// For unsharded tenants this is the default stripe size when none was given.
     330              :     stripe_size: ShardStripeSize,
     331              : }
     332              : 
     333              : impl ShardSpec {
     334              :     /// Creates a new shard spec with the given URLs and stripe size. All shards must be given.
     335              :     /// The stripe size may be omitted for unsharded tenants.
                            ///
                            /// The shard count is derived from the number of URLs: a single URL means an unsharded
                            /// tenant (count 0), so its key must be an index with shard count 0 as well.
                            ///
                            /// # Errors
                            ///
                            /// Returns an error if:
                            ///
                            /// * `urls` is empty or has more than `u8::MAX` entries;
                            /// * `stripe_size` is `None` for a sharded tenant;
                            /// * any shard index has a shard count different from the derived count, or a shard number
                            ///   that is not below its shard count;
                            /// * any URL fails to parse or does not use the gRPC protocol.
     336            0 :     pub fn new(
     337            0 :         urls: HashMap<ShardIndex, String>,
     338            0 :         stripe_size: Option<ShardStripeSize>,
     339            0 :     ) -> anyhow::Result<Self> {
     340              :         // Compute the shard count.
     341            0 :         let count = match urls.len() {
     342            0 :             0 => return Err(anyhow!("no shards provided")),
     343            0 :             1 => ShardCount::new(0), // NB: unsharded tenants use 0, like `ShardIndex::unsharded()`
     344            0 :             n if n > u8::MAX as usize => return Err(anyhow!("too many shards: {n}")),
                                    // The guard above ensures `n` fits in a `u8`, so this cast cannot truncate.
     345            0 :             n => ShardCount::new(n as u8),
     346              :         };
     347              : 
     348              :         // Determine the stripe size. It doesn't matter for unsharded tenants.
     349            0 :         if stripe_size.is_none() && !count.is_unsharded() {
     350            0 :             return Err(anyhow!("stripe size must be given for sharded tenants"));
     351            0 :         }
                                // Only falls back to the default for unsharded tenants, given the check above.
     352            0 :         let stripe_size = stripe_size.unwrap_or_default();
     353              : 
     354              :         // Validate the shard spec.
     355            0 :         for (shard_id, url) in &urls {
     356              :             // The shard index must match the computed shard count, even for unsharded tenants.
     357            0 :             if shard_id.shard_count != count {
     358            0 :                 return Err(anyhow!("invalid shard index {shard_id}, expected {count}"));
     359            0 :             }
     360              :             // The shard index' number and count must be consistent.
     361            0 :             if !shard_id.is_unsharded() && shard_id.shard_number.0 >= shard_id.shard_count.0 {
     362            0 :                 return Err(anyhow!("invalid shard index {shard_id}"));
     363            0 :             }
     364              :             // The above conditions guarantee that we have all shards 0..count: len() matches count,
     365              :             // shard number < count, and numbers are unique (via hashmap).
     366              : 
     367              :             // Validate the URL.
     368            0 :             if PageserverProtocol::from_connstring(url)? != PageserverProtocol::Grpc {
     369            0 :                 return Err(anyhow!("invalid shard URL {url}: must use gRPC"));
     370            0 :             }
     371              :         }
     372              : 
     373            0 :         Ok(Self {
     374            0 :             urls,
     375            0 :             count,
     376            0 :             stripe_size,
     377            0 :         })
     378            0 :     }
     379              : }
     380              : 
     381              : /// Tracks the tenant's shards.
                        ///
                        /// Built from a `ShardSpec` by `Shards::new()`, which copies the spec's count and stripe size and
                        /// creates one `Shard` per URL.
     382              : struct Shards {
     383              :     /// Shards by shard index.
     384              :     ///
     385              :     /// INVARIANT: every shard 0..count is present.
     386              :     /// INVARIANT: shard 0 is always present.
                            ///
                            /// `get_zero()` relies on the second invariant and panics if it is violated.
     387              :     by_index: HashMap<ShardIndex, Shard>,
     388              :     /// The shard count.
     389              :     ///
     390              :     /// NB: this is 0 for unsharded tenants, following `ShardIndex::unsharded()` convention.
     391              :     count: ShardCount,
     392              :     /// The stripe size. Only used for sharded tenants.
     393              :     stripe_size: ShardStripeSize,
     394              : }
     395              : 
     396              : impl Shards {
     397              :     /// Creates a new set of shards based on a shard spec.
                            ///
                            /// # Errors
                            ///
                            /// Returns the first error from `Shard::new()`; no `Shards` value is produced in that case.
     398            0 :     fn new(
     399            0 :         tenant_id: TenantId,
     400            0 :         timeline_id: TimelineId,
     401            0 :         shard_spec: ShardSpec,
     402            0 :         auth_token: Option<String>,
     403            0 :         compression: Option<CompressionEncoding>,
     404            0 :     ) -> anyhow::Result<Self> {
     405              :         // NB: the shard spec has already been validated when constructed.
     406            0 :         let mut shards = HashMap::with_capacity(shard_spec.urls.len());
     407            0 :         for (shard_id, url) in shard_spec.urls {
     408            0 :             shards.insert(
     409            0 :                 shard_id,
     410            0 :                 Shard::new(
     411            0 :                     url,
     412            0 :                     tenant_id,
     413            0 :                     timeline_id,
     414            0 :                     shard_id,
                                            // Every shard gets its own copy of the token.
     415            0 :                     auth_token.clone(),
     416            0 :                     compression,
     417            0 :                 )?,
     418              :             );
     419              :         }
     420              : 
     421            0 :         Ok(Self {
     422            0 :             by_index: shards,
     423            0 :             count: shard_spec.count,
     424            0 :             stripe_size: shard_spec.stripe_size,
     425            0 :         })
     426            0 :     }
     427              : 
     428              :     /// Looks up the given shard.
                            ///
                            /// # Errors
                            ///
                            /// Returns a `not_found` status if `shard_id` is not in the map.
     429              :     #[allow(clippy::result_large_err)] // TODO: check perf impact
     430            0 :     fn get(&self, shard_id: ShardIndex) -> tonic::Result<&Shard> {
     431            0 :         self.by_index
     432            0 :             .get(&shard_id)
     433            0 :             .ok_or_else(|| tonic::Status::not_found(format!("unknown shard {shard_id}")))
     434            0 :     }
     435              : 
     436              :     /// Returns shard 0.
                            ///
                            /// The index is built from shard number 0 and this set's own count, so it also resolves the
                            /// single shard of an unsharded tenant (count 0).
                            ///
                            /// # Panics
                            ///
                            /// Panics if shard 0 is missing, i.e. if the `by_index` invariant has been violated.
     437            0 :     fn get_zero(&self) -> &Shard {
     438            0 :         self.get(ShardIndex::new(ShardNumber(0), self.count))
     439            0 :             .expect("always present")
     440            0 :     }
     441              : }
     442              : 
     443              : /// A single shard. Uses dedicated resource pools with the following structure:
     444              : ///
     445              : /// * Channel pool: unbounded.
     446              : ///   * Unary client pool: MAX_UNARY_CLIENTS.
     447              : ///   * Stream client pool: unbounded.
     448              : ///     * Stream pool: MAX_STREAMS and MAX_STREAM_QUEUE_DEPTH.
     449              : /// * Bulk channel pool: unbounded.
     450              : ///   * Bulk client pool: unbounded.
     451              : ///     * Bulk stream pool: MAX_BULK_STREAMS and MAX_BULK_STREAM_QUEUE_DEPTH.
                        ///
                        /// Only the three top-level pools are stored as fields; the channel pools and the stream
                        /// client pools are handed to the pools built on top of them in `Shard::new()`.
     452              : struct Shard {
     453              :     /// The shard ID.
     454              :     id: ShardIndex,
     455              :     /// Unary gRPC client pool.
     456              :     client_pool: Arc<ClientPool>,
     457              :     /// GetPage stream pool.
     458              :     stream_pool: Arc<StreamPool>,
     459              :     /// GetPage stream pool for bulk requests, e.g. prefetches.
     460              :     bulk_stream_pool: Arc<StreamPool>,
     461              : }
     462              : 
     463              : impl Shard {
     464              :     /// Creates a new shard. It has its own dedicated resource pools.
                            ///
                            /// # Errors
                            ///
                            /// Returns an error if either `ChannelPool::new()` call fails.
     465            0 :     fn new(
     466            0 :         url: String,
     467            0 :         tenant_id: TenantId,
     468            0 :         timeline_id: TimelineId,
     469            0 :         shard_id: ShardIndex,
     470            0 :         auth_token: Option<String>,
     471            0 :         compression: Option<CompressionEncoding>,
     472            0 :     ) -> anyhow::Result<Self> {
     473              :         // Common channel pool for unary and stream requests. Bounded by client/stream pools.
                                // The URL is cloned here because the bulk channel pool below consumes the original.
     474            0 :         let channel_pool = ChannelPool::new(url.clone(), MAX_CLIENTS_PER_CHANNEL)?;
     475              : 
     476              :         // Client pool for unary requests.
     477            0 :         let client_pool = ClientPool::new(
     478            0 :             channel_pool.clone(),
     479            0 :             tenant_id,
     480            0 :             timeline_id,
     481            0 :             shard_id,
     482            0 :             auth_token.clone(),
     483            0 :             compression,
     484            0 :             Some(MAX_UNARY_CLIENTS),
     485              :         );
     486              : 
     487              :         // GetPage stream pool. Uses a dedicated client pool to avoid starving out unary clients,
     488              :         // but shares a channel pool with it (as it's unbounded).
     489            0 :         let stream_pool = StreamPool::new(
     490            0 :             ClientPool::new(
     491            0 :                 channel_pool.clone(),
     492            0 :                 tenant_id,
     493            0 :                 timeline_id,
     494            0 :                 shard_id,
     495            0 :                 auth_token.clone(),
     496            0 :                 compression,
     497            0 :                 None, // unbounded, limited by stream pool
     498              :             ),
     499            0 :             Some(MAX_STREAMS),
     500              :             MAX_STREAM_QUEUE_DEPTH,
     501              :         );
     502              : 
     503              :         // Bulk GetPage stream pool, e.g. for prefetches. Uses dedicated channel/client/stream pools
     504              :         // to avoid head-of-line blocking of latency-sensitive requests.
     505            0 :         let bulk_stream_pool = StreamPool::new(
     506            0 :             ClientPool::new(
                                        // Last use of `url` and `auth_token`, so both are moved rather than cloned.
     507            0 :                 ChannelPool::new(url, MAX_CLIENTS_PER_CHANNEL)?,
     508            0 :                 tenant_id,
     509            0 :                 timeline_id,
     510            0 :                 shard_id,
     511            0 :                 auth_token,
     512            0 :                 compression,
     513            0 :                 None, // unbounded, limited by stream pool
     514              :             ),
     515            0 :             Some(MAX_BULK_STREAMS),
     516              :             MAX_BULK_STREAM_QUEUE_DEPTH,
     517              :         );
     518              : 
     519            0 :         Ok(Self {
     520            0 :             id: shard_id,
     521            0 :             client_pool,
     522            0 :             stream_pool,
     523            0 :             bulk_stream_pool,
     524            0 :         })
     525            0 :     }
     526              : 
     527              :     /// Returns a pooled client for this shard.
                            ///
                            /// # Errors
                            ///
                            /// Maps any error from the unary client pool to an `internal` status with the message
                            /// "failed to get client: ...".
     528            0 :     async fn client(&self) -> tonic::Result<ClientGuard> {
     529            0 :         self.client_pool
     530            0 :             .get()
     531            0 :             .await
     532            0 :             .map_err(|err| tonic::Status::internal(format!("failed to get client: {err}")))
     533            0 :     }
     534              : 
     535              :     /// Returns a pooled stream for this shard. If `bulk` is `true`, uses the dedicated bulk stream
     536              :     /// pool (e.g. for prefetches).
                            ///
                            /// Unlike `client()`, this returns the guard directly rather than a `Result`.
     537            0 :     async fn stream(&self, bulk: bool) -> StreamGuard {
     538            0 :         match bulk {
     539            0 :             false => self.stream_pool.get().await,
     540            0 :             true => self.bulk_stream_pool.get().await,
     541              :         }
     542            0 :     }
     543              : }
        

Generated by: LCOV version 2.1-beta