use std::collections::HashMap;
use std::num::NonZero;
use std::pin::pin;
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::anyhow;
use arc_swap::ArcSwap;
use futures::stream::FuturesUnordered;
use futures::{FutureExt as _, StreamExt as _};
use tonic::codec::CompressionEncoding;
use tracing::{debug, instrument};
use utils::logging::warn_slow;

use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamGuard, StreamPool};
use crate::retry::Retry;
use compute_api::spec::PageserverProtocol;
use pageserver_page_api as page_api;
use pageserver_page_api::GetPageSplitter;
use utils::id::{TenantId, TimelineId};
use utils::shard::{ShardCount, ShardIndex, ShardNumber, ShardStripeSize};

/// Max number of concurrent clients per channel (i.e. TCP connection). New channels will be spun up
/// when full.
///
/// Normal requests are small, and we don't pipeline them, so we can afford a large number of
/// streams per connection.
///
/// TODO: tune all of these constants, and consider making them configurable.
const MAX_CLIENTS_PER_CHANNEL: NonZero<usize> = NonZero::new(64).unwrap();

/// Max number of concurrent bulk GetPage streams per channel (i.e. TCP connection). These use a
/// dedicated channel pool with a lower client limit, to avoid TCP-level head-of-line blocking and
/// transmission delays. This also concentrates large window sizes on a smaller set of
/// streams/connections, presumably reducing memory use.
const MAX_BULK_CLIENTS_PER_CHANNEL: NonZero<usize> = NonZero::new(16).unwrap();

/// The batch size threshold at which a GetPage request will use the bulk stream pool.
///
/// The gRPC initial window size is 64 KB. Each page is 8 KB, so let's avoid increasing the window
/// size for the normal stream pool, and route requests for >= 5 pages (>32 KB) to the bulk pool.
const BULK_THRESHOLD_BATCH_SIZE: usize = 5;

/// The overall request call timeout, including retries and pool acquisition.
/// TODO: should we retry forever? Should the caller decide?
const CALL_TIMEOUT: Duration = Duration::from_secs(60);

/// The per-request (retry attempt) timeout, including any lazy connection establishment.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);

/// The initial request retry backoff duration. The first retry does not back off.
/// TODO: use a different backoff for ResourceExhausted (rate limiting)? Needs server support.
const BASE_BACKOFF: Duration = Duration::from_millis(5);

/// The maximum request retry backoff duration.
const MAX_BACKOFF: Duration = Duration::from_secs(5);

/// Threshold and interval for warning about slow operations.
const SLOW_THRESHOLD: Duration = Duration::from_secs(3);

/// A rich Pageserver gRPC client for a single tenant timeline. This client is more capable than the
/// basic `page_api::Client` gRPC client, and supports:
///
/// * Sharded tenants across multiple Pageservers.
/// * Pooling of connections, clients, and streams for efficient resource use.
/// * Concurrent use by many callers.
/// * Internal handling of GetPage bidirectional streams.
/// * Automatic retries.
/// * Observability.
///
/// The client has dedicated connection/client/stream pools per shard, for resource reuse. These
/// pools are unbounded: we allow scaling out as many concurrent streams as needed to serve all
/// concurrent callers, which mostly eliminates head-of-line blocking. Idle streams are fairly
/// cheap: the server task currently uses 26 KB of memory, so we can comfortably fit 100,000
/// concurrent idle streams (2.5 GB memory). The worst case degenerates to the old libpq case with
/// one stream per backend, but without the TCP connection overhead. In the common case we expect
/// significantly lower stream counts due to stream sharing, driven e.g. by idle backends, LFC hits,
/// read coalescing, sharding (backends typically only talk to one shard at a time), etc.
///
/// TODO: this client does not support base backups or LSN leases, as these are only used by
/// compute_ctl. Consider adding this, but LSN leases need concurrent requests on all shards.
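///
/// A minimal usage sketch (not part of the original docs): the tenant/timeline IDs, URL, and
/// request values below are illustrative placeholders, and error handling is elided.
///
/// ```ignore
/// // Unsharded tenant: a single shard index with one gRPC URL, and no stripe size.
/// let urls = HashMap::from([(ShardIndex::unsharded(), "grpc://pageserver:51051".to_string())]);
/// let shard_spec = ShardSpec::new(urls, None)?;
/// let client = PageserverClient::new(tenant_id, timeline_id, shard_spec, None, None)?;
///
/// // Requests are retried and routed to the appropriate shard(s) internally.
/// let resp = client.get_rel_size(rel_size_request).await?;
/// ```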
pub struct PageserverClient {
    /// The tenant ID.
    tenant_id: TenantId,
    /// The timeline ID.
    timeline_id: TimelineId,
    /// The JWT auth token for this tenant, if any.
    auth_token: Option<String>,
    /// The compression to use, if any.
    compression: Option<CompressionEncoding>,
    /// The shards for this tenant.
    shards: ArcSwap<Shards>,
}

impl PageserverClient {
    /// Creates a new Pageserver client for a given tenant and timeline. Uses the Pageservers given
    /// in the shard spec, which must be complete and must use gRPC URLs.
    pub fn new(
        tenant_id: TenantId,
        timeline_id: TimelineId,
        shard_spec: ShardSpec,
        auth_token: Option<String>,
        compression: Option<CompressionEncoding>,
    ) -> anyhow::Result<Self> {
        let shards = Shards::new(
            tenant_id,
            timeline_id,
            shard_spec,
            auth_token.clone(),
            compression,
        )?;
        Ok(Self {
            tenant_id,
            timeline_id,
            auth_token,
            compression,
            shards: ArcSwap::new(Arc::new(shards)),
        })
    }

    /// Updates the shards from the given shard spec. In-flight requests will complete using the
    /// existing shards, but may retry with the new shards if they fail.
    ///
    /// TODO: verify that in-flight requests are allowed to complete, and that the old pools are
    /// properly spun down and dropped afterwards.
    pub fn update_shards(&self, shard_spec: ShardSpec) -> anyhow::Result<()> {
        // Validate the shard spec. We should really use `ArcSwap::rcu` for this, to avoid races
        // with concurrent updates, but that involves creating a new `Shards` on every attempt,
        // which spins up a bunch of Tokio tasks and such. These invariants should already be
        // enforced elsewhere in the stack, and if they're violated we have bigger problems anyway,
        // so a best-effort but possibly-racy check is okay here.
        let old = self.shards.load_full();
        if shard_spec.count < old.count {
            return Err(anyhow!(
                "can't reduce shard count from {} to {}",
                old.count,
                shard_spec.count
            ));
        }
        if !old.count.is_unsharded() && shard_spec.stripe_size != old.stripe_size {
            return Err(anyhow!(
                "can't change stripe size from {} to {}",
                old.stripe_size.expect("always Some when sharded"),
                shard_spec.stripe_size.expect("always Some when sharded")
            ));
        }

        let shards = Shards::new(
            self.tenant_id,
            self.timeline_id,
            shard_spec,
            self.auth_token.clone(),
            self.compression,
        )?;
        self.shards.store(Arc::new(shards));
        Ok(())
    }

    /// Returns the total size of a database, as # of bytes.
    #[instrument(skip_all, fields(db_oid=%req.db_oid, lsn=%req.read_lsn))]
    pub async fn get_db_size(
        &self,
        req: page_api::GetDbSizeRequest,
    ) -> tonic::Result<page_api::GetDbSizeResponse> {
        debug!("sending request: {req:?}");
        let resp = Self::with_retries(CALL_TIMEOUT, async |_| {
            // Relation metadata is only available on shard 0.
            let mut client = self.shards.load_full().get_zero().client().await?;
            Self::with_timeout(REQUEST_TIMEOUT, client.get_db_size(req)).await
        })
        .await?;
        debug!("received response: {resp:?}");
        Ok(resp)
    }

    /// Fetches pages. The `request_id` must be unique across all in-flight requests, and the
    /// `attempt` must be 0 (incremented on retry). Automatically splits requests that straddle
    /// shard boundaries, and assembles the responses.
    ///
    /// Unlike `page_api::Client`, this automatically converts `status_code` into `tonic::Status`
    /// errors. All responses will have `GetPageStatusCode::Ok`.
    #[instrument(skip_all, fields(
        req_id = %req.request_id,
        class = %req.request_class,
        rel = %req.rel,
        blkno = %req.block_numbers[0],
        blks = %req.block_numbers.len(),
        lsn = %req.read_lsn,
    ))]
    pub async fn get_page(
        &self,
        req: page_api::GetPageRequest,
    ) -> tonic::Result<page_api::GetPageResponse> {
        // Make sure we have at least one page.
        if req.block_numbers.is_empty() {
            return Err(tonic::Status::invalid_argument("no block number"));
        }
        // The request attempt must be 0. The client will increment it internally.
        if req.request_id.attempt != 0 {
            return Err(tonic::Status::invalid_argument("request attempt must be 0"));
        }

        debug!("sending request: {req:?}");

        // The shards may change while we're fetching pages. We execute the request using a stable
        // view of the shards (especially important for requests that span shards), but retry the
        // top-level (pre-split) request to pick up shard changes. This can lead to unnecessary
        // retries and re-splits in some cases where requests span shards, but these are expected to
        // be rare.
        //
212 : // once we figure out how to handle these.
213 0 : let resp = Self::with_retries(CALL_TIMEOUT, async |attempt| {
214 0 : let mut req = req.clone();
215 0 : req.request_id.attempt = attempt as u32;
216 0 : let shards = self.shards.load_full();
217 0 : Self::with_timeout(REQUEST_TIMEOUT, Self::get_page_with_shards(req, &shards)).await
218 0 : })
219 : .await?;
220 :
221 : debug!("received response: {resp:?}");
222 : Ok(resp)
223 : }
224 :
225 : /// Fetches pages using the given shards. This uses a stable view of the shards, regardless of
226 : /// concurrent shard updates. Does not retry internally, but is retried by `get_page()`.
227 0 : async fn get_page_with_shards(
228 0 : req: page_api::GetPageRequest,
229 0 : shards: &Shards,
230 0 : ) -> tonic::Result<page_api::GetPageResponse> {
231 : // Fast path: request is for a single shard.
232 0 : if let Some(shard_id) =
233 0 : GetPageSplitter::for_single_shard(&req, shards.count, shards.stripe_size)?
234 : {
235 0 : return Self::get_page_with_shard(req, shards.get(shard_id)?).await;
236 0 : }
237 :
238 : // Request spans multiple shards. Split it, dispatch concurrent per-shard requests, and
239 : // reassemble the responses.
240 0 : let mut splitter = GetPageSplitter::split(req, shards.count, shards.stripe_size)?;
241 :
242 0 : let mut shard_requests = FuturesUnordered::new();
243 0 : for (shard_id, shard_req) in splitter.drain_requests() {
244 0 : let future = Self::get_page_with_shard(shard_req, shards.get(shard_id)?)
245 0 : .map(move |result| result.map(|resp| (shard_id, resp)));
246 0 : shard_requests.push(future);
247 : }
248 :
249 0 : while let Some((shard_id, shard_response)) = shard_requests.next().await.transpose()? {
250 0 : splitter.add_response(shard_id, shard_response)?;
251 : }
252 :
253 0 : Ok(splitter.collect_response()?)
254 0 : }
255 :
256 : /// Fetches pages on the given shard. Does not retry internally.
257 0 : async fn get_page_with_shard(
258 0 : req: page_api::GetPageRequest,
259 0 : shard: &Shard,
260 0 : ) -> tonic::Result<page_api::GetPageResponse> {
261 0 : let mut stream = shard.stream(Self::is_bulk(&req)).await?;
262 0 : let resp = stream.send(req.clone()).await?;
263 :
264 : // Convert per-request errors into a tonic::Status.
265 0 : if resp.status_code != page_api::GetPageStatusCode::Ok {
266 0 : return Err(tonic::Status::new(
267 0 : resp.status_code.into(),
268 0 : resp.reason.unwrap_or_else(|| String::from("unknown error")),
269 : ));
270 0 : }
271 :
272 : // Check that we received the expected pages.
273 0 : if req.rel != resp.rel {
274 0 : return Err(tonic::Status::internal(format!(
275 0 : "shard {} returned wrong relation, expected {} got {}",
276 0 : shard.id, req.rel, resp.rel
277 0 : )));
278 0 : }
279 0 : if !req
280 0 : .block_numbers
281 0 : .iter()
282 0 : .copied()
283 0 : .eq(resp.pages.iter().map(|p| p.block_number))
284 : {
285 0 : return Err(tonic::Status::internal(format!(
286 0 : "shard {} returned wrong pages, expected {:?} got {:?}",
287 : shard.id,
288 : req.block_numbers,
289 0 : resp.pages
290 0 : .iter()
291 0 : .map(|page| page.block_number)
292 0 : .collect::<Vec<_>>()
293 : )));
294 0 : }
295 :
296 0 : Ok(resp)
297 0 : }
298 :
    /// Returns the size of a relation, as # of blocks.
    #[instrument(skip_all, fields(rel=%req.rel, lsn=%req.read_lsn))]
    pub async fn get_rel_size(
        &self,
        req: page_api::GetRelSizeRequest,
    ) -> tonic::Result<page_api::GetRelSizeResponse> {
        debug!("sending request: {req:?}");
        let resp = Self::with_retries(CALL_TIMEOUT, async |_| {
            // Relation metadata is only available on shard 0.
            let mut client = self.shards.load_full().get_zero().client().await?;
            Self::with_timeout(REQUEST_TIMEOUT, client.get_rel_size(req)).await
        })
        .await?;
        debug!("received response: {resp:?}");
        Ok(resp)
    }

    /// Fetches an SLRU segment.
    #[instrument(skip_all, fields(kind=%req.kind, segno=%req.segno, lsn=%req.read_lsn))]
    pub async fn get_slru_segment(
        &self,
        req: page_api::GetSlruSegmentRequest,
    ) -> tonic::Result<page_api::GetSlruSegmentResponse> {
        debug!("sending request: {req:?}");
        let resp = Self::with_retries(CALL_TIMEOUT, async |_| {
            // SLRU segments are only available on shard 0.
            let mut client = self.shards.load_full().get_zero().client().await?;
            Self::with_timeout(REQUEST_TIMEOUT, client.get_slru_segment(req)).await
        })
        .await?;
        debug!("received response: {resp:?}");
        Ok(resp)
    }

    /// Runs the given async closure with retries up to the given timeout. Only certain gRPC status
    /// codes are retried, see [`Retry::should_retry`]. Returns `DeadlineExceeded` on timeout.
    async fn with_retries<T, F, O>(timeout: Duration, f: F) -> tonic::Result<T>
    where
        F: FnMut(usize) -> O, // pass attempt number, starting at 0
        O: Future<Output = tonic::Result<T>>,
    {
        Retry {
            timeout: Some(timeout),
            base_backoff: BASE_BACKOFF,
            max_backoff: MAX_BACKOFF,
        }
        .with(f)
        .await
    }

    /// Runs the given future with a timeout. Returns `DeadlineExceeded` on timeout.
    async fn with_timeout<T>(
        timeout: Duration,
        f: impl Future<Output = tonic::Result<T>>,
    ) -> tonic::Result<T> {
        let started = Instant::now();
        tokio::time::timeout(timeout, f).await.map_err(|_| {
            tonic::Status::deadline_exceeded(format!(
                "request timed out after {:.3}s",
                started.elapsed().as_secs_f64()
            ))
        })?
    }

    /// Returns true if the request is considered a bulk request and should use the bulk pool.
    fn is_bulk(req: &page_api::GetPageRequest) -> bool {
        req.block_numbers.len() >= BULK_THRESHOLD_BATCH_SIZE
    }
}

/// Shard specification for a PageserverClient.
pub struct ShardSpec {
    /// Maps shard indices to gRPC URLs.
    ///
    /// INVARIANT: every shard 0..count is present, and shard 0 is always present.
    /// INVARIANT: every URL is valid and uses the grpc:// scheme.
    urls: HashMap<ShardIndex, String>,
    /// The shard count.
    ///
    /// NB: this is 0 for unsharded tenants, following `ShardIndex::unsharded()` convention.
    count: ShardCount,
    /// The stripe size for these shards.
    ///
    /// INVARIANT: None for unsharded tenants, Some for sharded.
    stripe_size: Option<ShardStripeSize>,
}

impl ShardSpec {
    /// Creates a new shard spec with the given URLs and stripe size. All shards must be given.
    /// The stripe size must be Some for sharded tenants, or None for unsharded tenants.
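    ///
    /// A sketch of building a spec for a two-shard tenant (not part of the original docs; the
    /// URLs and stripe size below are placeholder values):
    ///
    /// ```ignore
    /// let count = ShardCount::new(2);
    /// let urls = HashMap::from([
    ///     (ShardIndex::new(ShardNumber(0), count), "grpc://pageserver-0:51051".to_string()),
    ///     (ShardIndex::new(ShardNumber(1), count), "grpc://pageserver-1:51051".to_string()),
    /// ]);
    /// let spec = ShardSpec::new(urls, Some(ShardStripeSize(32768)))?;
    /// ```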
    pub fn new(
        urls: HashMap<ShardIndex, String>,
        stripe_size: Option<ShardStripeSize>,
    ) -> anyhow::Result<Self> {
        // Compute the shard count.
        let count = match urls.len() {
            0 => return Err(anyhow!("no shards provided")),
            1 => ShardCount::new(0), // NB: unsharded tenants use 0, like `ShardIndex::unsharded()`
            n if n > u8::MAX as usize => return Err(anyhow!("too many shards: {n}")),
            n => ShardCount::new(n as u8),
        };

        // Validate the stripe size.
        if stripe_size.is_none() && !count.is_unsharded() {
            return Err(anyhow!("stripe size must be given for sharded tenants"));
        }
        if stripe_size.is_some() && count.is_unsharded() {
            return Err(anyhow!("stripe size can't be given for unsharded tenants"));
        }

        // Validate the shard spec.
        for (shard_id, url) in &urls {
            // The shard index must match the computed shard count, even for unsharded tenants.
            if shard_id.shard_count != count {
                return Err(anyhow!("invalid shard index {shard_id}, expected {count}"));
            }
            // The shard index's number and count must be consistent.
            if !shard_id.is_unsharded() && shard_id.shard_number.0 >= shard_id.shard_count.0 {
                return Err(anyhow!("invalid shard index {shard_id}"));
            }
            // The above conditions guarantee that we have all shards 0..count: len() matches count,
            // shard number < count, and numbers are unique (via hashmap).

            // Validate the URL.
            if PageserverProtocol::from_connstring(url)? != PageserverProtocol::Grpc {
                return Err(anyhow!("invalid shard URL {url}: must use gRPC"));
            }
        }

        Ok(Self {
            urls,
            count,
            stripe_size,
        })
    }
}

/// Tracks the tenant's shards.
struct Shards {
    /// Shards by shard index.
    ///
    /// INVARIANT: every shard 0..count is present.
    /// INVARIANT: shard 0 is always present.
    by_index: HashMap<ShardIndex, Shard>,
    /// The shard count.
    ///
    /// NB: this is 0 for unsharded tenants, following `ShardIndex::unsharded()` convention.
    count: ShardCount,
    /// The stripe size.
    ///
    /// INVARIANT: None for unsharded tenants, Some for sharded.
    stripe_size: Option<ShardStripeSize>,
}

impl Shards {
    /// Creates a new set of shards based on a shard spec.
    fn new(
        tenant_id: TenantId,
        timeline_id: TimelineId,
        shard_spec: ShardSpec,
        auth_token: Option<String>,
        compression: Option<CompressionEncoding>,
    ) -> anyhow::Result<Self> {
        // NB: the shard spec has already been validated when constructed.
        let mut shards = HashMap::with_capacity(shard_spec.urls.len());
        for (shard_id, url) in shard_spec.urls {
            shards.insert(
                shard_id,
                Shard::new(
                    url,
                    tenant_id,
                    timeline_id,
                    shard_id,
                    auth_token.clone(),
                    compression,
                )?,
            );
        }

        Ok(Self {
            by_index: shards,
            count: shard_spec.count,
            stripe_size: shard_spec.stripe_size,
        })
    }

    /// Looks up the given shard.
    #[allow(clippy::result_large_err)] // TODO: check perf impact
    fn get(&self, shard_id: ShardIndex) -> tonic::Result<&Shard> {
        self.by_index
            .get(&shard_id)
            .ok_or_else(|| tonic::Status::not_found(format!("unknown shard {shard_id}")))
    }

    /// Returns shard 0.
    fn get_zero(&self) -> &Shard {
        self.get(ShardIndex::new(ShardNumber(0), self.count))
            .expect("always present")
    }
}

/// A single shard. Has dedicated resource pools with the following structure:
///
/// * Channel pool: MAX_CLIENTS_PER_CHANNEL.
/// * Client pool: unbounded.
/// * Stream pool: unbounded.
/// * Bulk channel pool: MAX_BULK_CLIENTS_PER_CHANNEL.
/// * Bulk client pool: unbounded.
/// * Bulk stream pool: unbounded.
///
/// We use a separate bulk channel pool with a lower concurrency limit for large batch requests.
/// This avoids TCP-level head-of-line blocking, and also concentrates large window sizes on a
512 : /// are bounded, nor do they pipeline requests, so the latency characteristics should be mostly
513 : /// similar (except for TCP transmission time).
514 : ///
515 : /// TODO: since we never use bounded pools, we could consider removing the pool limiters. However,
516 : /// the code is fairly trivial, so we may as well keep them around for now in case we need them.
517 : struct Shard {
518 : /// The shard ID.
519 : id: ShardIndex,
520 : /// Unary gRPC client pool.
521 : client_pool: Arc<ClientPool>,
522 : /// GetPage stream pool.
523 : stream_pool: Arc<StreamPool>,
524 : /// GetPage stream pool for bulk requests.
525 : bulk_stream_pool: Arc<StreamPool>,
526 : }
527 :
528 : impl Shard {
529 : /// Creates a new shard. It has its own dedicated resource pools.
530 0 : fn new(
531 0 : url: String,
532 0 : tenant_id: TenantId,
533 0 : timeline_id: TimelineId,
534 0 : shard_id: ShardIndex,
535 0 : auth_token: Option<String>,
536 0 : compression: Option<CompressionEncoding>,
537 0 : ) -> anyhow::Result<Self> {
538 : // Shard pools for unary requests and non-bulk GetPage requests.
539 0 : let client_pool = ClientPool::new(
540 0 : ChannelPool::new(url.clone(), MAX_CLIENTS_PER_CHANNEL)?,
541 0 : tenant_id,
542 0 : timeline_id,
543 0 : shard_id,
544 0 : auth_token.clone(),
545 0 : compression,
546 0 : None, // unbounded
547 : );
548 0 : let stream_pool = StreamPool::new(client_pool.clone(), None); // unbounded
549 :
550 : // Bulk GetPage stream pool for large batches (prefetches, sequential scans, vacuum, etc.).
551 0 : let bulk_stream_pool = StreamPool::new(
552 0 : ClientPool::new(
553 0 : ChannelPool::new(url, MAX_BULK_CLIENTS_PER_CHANNEL)?,
554 0 : tenant_id,
555 0 : timeline_id,
556 0 : shard_id,
557 0 : auth_token,
558 0 : compression,
559 0 : None, // unbounded,
560 : ),
561 0 : None, // unbounded
562 : );
563 :
564 0 : Ok(Self {
565 0 : id: shard_id,
566 0 : client_pool,
567 0 : stream_pool,
568 0 : bulk_stream_pool,
569 0 : })
570 0 : }
571 :
572 : /// Returns a pooled client for this shard.
573 : #[instrument(skip_all)]
574 : async fn client(&self) -> tonic::Result<ClientGuard> {
575 : warn_slow(
576 : "client pool acquisition",
577 : SLOW_THRESHOLD,
578 : pin!(self.client_pool.get()),
579 : )
580 : .await
581 : }
582 :
583 : /// Returns a pooled stream for this shard. If `bulk` is `true`, uses the dedicated bulk pool.
584 : #[instrument(skip_all, fields(bulk))]
585 : async fn stream(&self, bulk: bool) -> tonic::Result<StreamGuard> {
586 : let pool = match bulk {
587 : false => &self.stream_pool,
588 : true => &self.bulk_stream_pool,
589 : };
590 : warn_slow("stream pool acquisition", SLOW_THRESHOLD, pin!(pool.get())).await
591 : }
592 : }