use std::collections::HashMap;
use std::num::NonZero;
use std::pin::pin;
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::anyhow;
use arc_swap::ArcSwap;
use futures::stream::FuturesUnordered;
use futures::{FutureExt as _, StreamExt as _};
use tonic::codec::CompressionEncoding;
use tracing::{debug, instrument};
use utils::logging::warn_slow;

use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamGuard, StreamPool};
use crate::retry::Retry;
use crate::split::GetPageSplitter;
use compute_api::spec::PageserverProtocol;
use pageserver_api::shard::ShardStripeSize;
use pageserver_page_api as page_api;
use utils::id::{TenantId, TimelineId};
use utils::shard::{ShardCount, ShardIndex, ShardNumber};

/// Max number of concurrent clients per channel (i.e. TCP connection). New channels will be spun up
/// when full.
///
/// Normal requests are small, and we don't pipeline them, so we can afford a large number of
/// streams per connection.
///
/// TODO: tune all of these constants, and consider making them configurable.
const MAX_CLIENTS_PER_CHANNEL: NonZero<usize> = NonZero::new(64).unwrap();

/// Max number of concurrent bulk GetPage streams per channel (i.e. TCP connection). These use a
/// dedicated channel pool with a lower client limit, to avoid TCP-level head-of-line blocking and
/// transmission delays. This also concentrates large window sizes on a smaller set of
/// streams/connections, presumably reducing memory use.
const MAX_BULK_CLIENTS_PER_CHANNEL: NonZero<usize> = NonZero::new(16).unwrap();

/// The batch size threshold at which a GetPage request will use the bulk stream pool.
///
/// The gRPC initial window size is 64 KB. Each page is 8 KB, so let's avoid increasing the window
/// size for the normal stream pool, and route requests for >= 5 pages (>32 KB) to the bulk pool.
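///
/// For example, under this threshold a 4-page batch (32 KB) stays on the normal stream pool,
/// while a 5-page batch (40 KB) is routed to the bulk pool.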
const BULK_THRESHOLD_BATCH_SIZE: usize = 5;

/// The overall request call timeout, including retries and pool acquisition.
/// TODO: should we retry forever? Should the caller decide?
const CALL_TIMEOUT: Duration = Duration::from_secs(60);

/// The per-request (retry attempt) timeout, including any lazy connection establishment.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);

/// The initial request retry backoff duration. The first retry does not back off.
/// TODO: use a different backoff for ResourceExhausted (rate limiting)? Needs server support.
const BASE_BACKOFF: Duration = Duration::from_millis(5);

/// The maximum request retry backoff duration.
const MAX_BACKOFF: Duration = Duration::from_secs(5);

/// Threshold and interval for warning about slow operations.
const SLOW_THRESHOLD: Duration = Duration::from_secs(3);

/// A rich Pageserver gRPC client for a single tenant timeline. This client is more capable than the
/// basic `page_api::Client` gRPC client, and supports:
///
/// * Sharded tenants across multiple Pageservers.
/// * Pooling of connections, clients, and streams for efficient resource use.
/// * Concurrent use by many callers.
/// * Internal handling of GetPage bidirectional streams.
/// * Automatic retries.
/// * Observability.
///
/// The client has dedicated connection/client/stream pools per shard, for resource reuse. These
/// pools are unbounded: we allow scaling out as many concurrent streams as needed to serve all
/// concurrent callers, which mostly eliminates head-of-line blocking. Idle streams are fairly
/// cheap: the server task currently uses 26 KB of memory, so we can comfortably fit 100,000
/// concurrent idle streams (2.5 GB memory). The worst case degenerates to the old libpq case with
/// one stream per backend, but without the TCP connection overhead. In the common case we expect
/// significantly lower stream counts due to stream sharing, driven e.g. by idle backends, LFC hits,
/// read coalescing, sharding (backends typically only talk to one shard at a time), etc.
///
/// TODO: this client does not support base backups or LSN leases, as these are only used by
/// compute_ctl. Consider adding them, but LSN leases need concurrent requests on all shards.
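///
/// # Example
///
/// A minimal, hypothetical usage sketch; `urls`, `tenant_id`, `timeline_id`, and the request
/// values are placeholders assumed to be constructed by the caller:
///
/// ```ignore
/// let spec = ShardSpec::new(urls, None)?; // stripe size may be omitted when unsharded
/// let client = PageserverClient::new(tenant_id, timeline_id, spec, None, None)?;
/// let exists = client.check_rel_exists(req).await?;
///
/// // On a shard map change, swap in a new spec; in-flight requests complete on the old shards.
/// client.update_shards(new_spec)?;
/// ```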
pub struct PageserverClient {
    /// The tenant ID.
    tenant_id: TenantId,
    /// The timeline ID.
    timeline_id: TimelineId,
    /// The JWT auth token for this tenant, if any.
    auth_token: Option<String>,
    /// The compression to use, if any.
    compression: Option<CompressionEncoding>,
    /// The shards for this tenant.
    shards: ArcSwap<Shards>,
}

impl PageserverClient {
    /// Creates a new Pageserver client for a given tenant and timeline. Uses the Pageservers given
    /// in the shard spec, which must be complete and must use gRPC URLs.
    pub fn new(
        tenant_id: TenantId,
        timeline_id: TimelineId,
        shard_spec: ShardSpec,
        auth_token: Option<String>,
        compression: Option<CompressionEncoding>,
    ) -> anyhow::Result<Self> {
        let shards = Shards::new(
            tenant_id,
            timeline_id,
            shard_spec,
            auth_token.clone(),
            compression,
        )?;
        Ok(Self {
            tenant_id,
            timeline_id,
            auth_token,
            compression,
            shards: ArcSwap::new(Arc::new(shards)),
        })
    }

    /// Updates the shards from the given shard spec. In-flight requests will complete using the
    /// existing shards, but may retry with the new shards if they fail.
    ///
    /// TODO: verify that in-flight requests are allowed to complete, and that the old pools are
    /// properly spun down and dropped afterwards.
    pub fn update_shards(&self, shard_spec: ShardSpec) -> anyhow::Result<()> {
        // Validate the shard spec. We should really use `ArcSwap::rcu` for this, to avoid races
        // with concurrent updates, but that involves creating a new `Shards` on every attempt,
        // which spins up a bunch of Tokio tasks and such. These should already be checked elsewhere
        // in the stack, and if they're violated then we already have problems elsewhere, so a
        // best-effort but possibly-racy check is okay here.
        let old = self.shards.load_full();
        if shard_spec.count < old.count {
            return Err(anyhow!(
                "can't reduce shard count from {} to {}",
                old.count,
                shard_spec.count
            ));
        }
        if !old.count.is_unsharded() && shard_spec.stripe_size != old.stripe_size {
            return Err(anyhow!(
                "can't change stripe size from {} to {}",
                old.stripe_size,
                shard_spec.stripe_size
            ));
        }

        let shards = Shards::new(
            self.tenant_id,
            self.timeline_id,
            shard_spec,
            self.auth_token.clone(),
            self.compression,
        )?;
        self.shards.store(Arc::new(shards));
        Ok(())
    }

    /// Returns whether a relation exists.
    #[instrument(skip_all, fields(rel=%req.rel, lsn=%req.read_lsn))]
    pub async fn check_rel_exists(
        &self,
        req: page_api::CheckRelExistsRequest,
    ) -> tonic::Result<page_api::CheckRelExistsResponse> {
        debug!("sending request: {req:?}");
        let resp = Self::with_retries(CALL_TIMEOUT, async |_| {
            // Relation metadata is only available on shard 0.
            let mut client = self.shards.load_full().get_zero().client().await?;
            Self::with_timeout(REQUEST_TIMEOUT, client.check_rel_exists(req)).await
        })
        .await?;
        debug!("received response: {resp:?}");
        Ok(resp)
    }

    /// Returns the total size of a database, as # of bytes.
    #[instrument(skip_all, fields(db_oid=%req.db_oid, lsn=%req.read_lsn))]
    pub async fn get_db_size(
        &self,
        req: page_api::GetDbSizeRequest,
    ) -> tonic::Result<page_api::GetDbSizeResponse> {
        debug!("sending request: {req:?}");
        let resp = Self::with_retries(CALL_TIMEOUT, async |_| {
            // Relation metadata is only available on shard 0.
            let mut client = self.shards.load_full().get_zero().client().await?;
            Self::with_timeout(REQUEST_TIMEOUT, client.get_db_size(req)).await
        })
        .await?;
        debug!("received response: {resp:?}");
        Ok(resp)
    }

    /// Fetches pages. The `request_id` must be unique across all in-flight requests, and the
    /// `attempt` must be 0 (incremented on retry). Automatically splits requests that straddle
    /// shard boundaries, and assembles the responses.
    ///
    /// Unlike `page_api::Client`, this automatically converts `status_code` into `tonic::Status`
    /// errors. All responses will have `GetPageStatusCode::Ok`.
    #[instrument(skip_all, fields(
        req_id = %req.request_id,
        class = %req.request_class,
        rel = %req.rel,
        blkno = %req.block_numbers[0],
        blks = %req.block_numbers.len(),
        lsn = %req.read_lsn,
    ))]
    pub async fn get_page(
        &self,
        req: page_api::GetPageRequest,
    ) -> tonic::Result<page_api::GetPageResponse> {
        // Make sure we have at least one page.
        if req.block_numbers.is_empty() {
            return Err(tonic::Status::invalid_argument("no block number"));
        }
        // The request attempt must be 0. The client will increment it internally.
        if req.request_id.attempt != 0 {
            return Err(tonic::Status::invalid_argument("request attempt must be 0"));
        }

        debug!("sending request: {req:?}");

        // The shards may change while we're fetching pages. We execute the request using a stable
        // view of the shards (especially important for requests that span shards), but retry the
        // top-level (pre-split) request to pick up shard changes. This can lead to unnecessary
        // retries and re-splits in some cases where requests span shards, but these are expected to
        // be rare.
        //
        // TODO: the gRPC server and client don't yet properly support shard splits. Revisit this
        // once we figure out how to handle these.
        let resp = Self::with_retries(CALL_TIMEOUT, async |attempt| {
            let mut req = req.clone();
            req.request_id.attempt = attempt as u32;
            let shards = self.shards.load_full();
            Self::with_timeout(REQUEST_TIMEOUT, Self::get_page_with_shards(req, &shards)).await
        })
        .await?;

        debug!("received response: {resp:?}");
        Ok(resp)
    }

    /// Fetches pages using the given shards. This uses a stable view of the shards, regardless of
    /// concurrent shard updates. Does not retry internally, but is retried by `get_page()`.
    async fn get_page_with_shards(
        req: page_api::GetPageRequest,
        shards: &Shards,
    ) -> tonic::Result<page_api::GetPageResponse> {
        // Fast path: request is for a single shard.
        if let Some(shard_id) =
            GetPageSplitter::for_single_shard(&req, shards.count, shards.stripe_size)
        {
            return Self::get_page_with_shard(req, shards.get(shard_id)?).await;
        }

        // Request spans multiple shards. Split it, dispatch concurrent per-shard requests, and
        // reassemble the responses.
        let mut splitter = GetPageSplitter::split(req, shards.count, shards.stripe_size);

        let mut shard_requests = FuturesUnordered::new();
        for (shard_id, shard_req) in splitter.drain_requests() {
            let future = Self::get_page_with_shard(shard_req, shards.get(shard_id)?)
                .map(move |result| result.map(|resp| (shard_id, resp)));
            shard_requests.push(future);
        }

        while let Some((shard_id, shard_response)) = shard_requests.next().await.transpose()? {
            splitter.add_response(shard_id, shard_response)?;
        }

        splitter.get_response()
    }

    /// Fetches pages on the given shard. Does not retry internally.
    async fn get_page_with_shard(
        req: page_api::GetPageRequest,
        shard: &Shard,
    ) -> tonic::Result<page_api::GetPageResponse> {
        let mut stream = shard.stream(Self::is_bulk(&req)).await?;
        let resp = stream.send(req.clone()).await?;

        // Convert per-request errors into a tonic::Status.
        if resp.status_code != page_api::GetPageStatusCode::Ok {
            return Err(tonic::Status::new(
                resp.status_code.into(),
                resp.reason.unwrap_or_else(|| String::from("unknown error")),
            ));
        }

        // Check that we received the expected pages.
        if req.rel != resp.rel {
            return Err(tonic::Status::internal(format!(
                "shard {} returned wrong relation, expected {} got {}",
                shard.id, req.rel, resp.rel
            )));
        }
        if !req
            .block_numbers
            .iter()
            .copied()
            .eq(resp.pages.iter().map(|p| p.block_number))
        {
            return Err(tonic::Status::internal(format!(
                "shard {} returned wrong pages, expected {:?} got {:?}",
                shard.id,
                req.block_numbers,
                resp.pages
                    .iter()
                    .map(|page| page.block_number)
                    .collect::<Vec<_>>()
            )));
        }

        Ok(resp)
    }

    /// Returns the size of a relation, as # of blocks.
    #[instrument(skip_all, fields(rel=%req.rel, lsn=%req.read_lsn))]
    pub async fn get_rel_size(
        &self,
        req: page_api::GetRelSizeRequest,
    ) -> tonic::Result<page_api::GetRelSizeResponse> {
        debug!("sending request: {req:?}");
        let resp = Self::with_retries(CALL_TIMEOUT, async |_| {
            // Relation metadata is only available on shard 0.
            let mut client = self.shards.load_full().get_zero().client().await?;
            Self::with_timeout(REQUEST_TIMEOUT, client.get_rel_size(req)).await
        })
        .await?;
        debug!("received response: {resp:?}");
        Ok(resp)
    }

    /// Fetches an SLRU segment.
    #[instrument(skip_all, fields(kind=%req.kind, segno=%req.segno, lsn=%req.read_lsn))]
    pub async fn get_slru_segment(
        &self,
        req: page_api::GetSlruSegmentRequest,
    ) -> tonic::Result<page_api::GetSlruSegmentResponse> {
        debug!("sending request: {req:?}");
        let resp = Self::with_retries(CALL_TIMEOUT, async |_| {
            // SLRU segments are only available on shard 0.
            let mut client = self.shards.load_full().get_zero().client().await?;
            Self::with_timeout(REQUEST_TIMEOUT, client.get_slru_segment(req)).await
        })
        .await?;
        debug!("received response: {resp:?}");
        Ok(resp)
    }

    /// Runs the given async closure with retries up to the given timeout. Only certain gRPC status
    /// codes are retried, see [`Retry::should_retry`]. Returns `DeadlineExceeded` on timeout.
    async fn with_retries<T, F, O>(timeout: Duration, f: F) -> tonic::Result<T>
    where
        F: FnMut(usize) -> O, // pass attempt number, starting at 0
        O: Future<Output = tonic::Result<T>>,
    {
        Retry {
            timeout: Some(timeout),
            base_backoff: BASE_BACKOFF,
            max_backoff: MAX_BACKOFF,
        }
        .with(f)
        .await
    }

    /// Runs the given future with a timeout. Returns `DeadlineExceeded` on timeout.
    async fn with_timeout<T>(
        timeout: Duration,
        f: impl Future<Output = tonic::Result<T>>,
    ) -> tonic::Result<T> {
        let started = Instant::now();
        tokio::time::timeout(timeout, f).await.map_err(|_| {
            tonic::Status::deadline_exceeded(format!(
                "request timed out after {:.3}s",
                started.elapsed().as_secs_f64()
            ))
        })?
    }

    /// Returns true if the request is considered a bulk request and should use the bulk pool.
    fn is_bulk(req: &page_api::GetPageRequest) -> bool {
        req.block_numbers.len() >= BULK_THRESHOLD_BATCH_SIZE
    }
}

/// Shard specification for a PageserverClient.
pub struct ShardSpec {
    /// Maps shard indices to gRPC URLs.
    ///
    /// INVARIANT: every shard 0..count is present, and shard 0 is always present.
    /// INVARIANT: every URL is valid and uses grpc:// scheme.
    urls: HashMap<ShardIndex, String>,
    /// The shard count.
    ///
    /// NB: this is 0 for unsharded tenants, following `ShardIndex::unsharded()` convention.
    count: ShardCount,
    /// The stripe size for these shards.
    stripe_size: ShardStripeSize,
}

impl ShardSpec {
    /// Creates a new shard spec with the given URLs and stripe size. All shards must be given.
    /// The stripe size may be omitted for unsharded tenants.
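    ///
    /// # Example
    ///
    /// A hypothetical sketch for an unsharded tenant; the URL is a placeholder:
    ///
    /// ```ignore
    /// let mut urls = HashMap::new();
    /// urls.insert(ShardIndex::unsharded(), "grpc://pageserver:51051".to_string());
    /// let spec = ShardSpec::new(urls, None)?;
    /// ```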
    pub fn new(
        urls: HashMap<ShardIndex, String>,
        stripe_size: Option<ShardStripeSize>,
    ) -> anyhow::Result<Self> {
        // Compute the shard count.
        let count = match urls.len() {
            0 => return Err(anyhow!("no shards provided")),
            1 => ShardCount::new(0), // NB: unsharded tenants use 0, like `ShardIndex::unsharded()`
            n if n > u8::MAX as usize => return Err(anyhow!("too many shards: {n}")),
            n => ShardCount::new(n as u8),
        };

        // Determine the stripe size. It doesn't matter for unsharded tenants.
        if stripe_size.is_none() && !count.is_unsharded() {
            return Err(anyhow!("stripe size must be given for sharded tenants"));
        }
        let stripe_size = stripe_size.unwrap_or_default();

        // Validate the shard spec.
        for (shard_id, url) in &urls {
            // The shard index must match the computed shard count, even for unsharded tenants.
            if shard_id.shard_count != count {
                return Err(anyhow!("invalid shard index {shard_id}, expected {count}"));
            }
            // The shard index's number and count must be consistent.
            if !shard_id.is_unsharded() && shard_id.shard_number.0 >= shard_id.shard_count.0 {
                return Err(anyhow!("invalid shard index {shard_id}"));
            }
            // The above conditions guarantee that we have all shards 0..count: len() matches count,
            // shard number < count, and numbers are unique (via hashmap).

            // Validate the URL.
            if PageserverProtocol::from_connstring(url)? != PageserverProtocol::Grpc {
                return Err(anyhow!("invalid shard URL {url}: must use gRPC"));
            }
        }

        Ok(Self {
            urls,
            count,
            stripe_size,
        })
    }
}

/// Tracks the tenant's shards.
struct Shards {
    /// Shards by shard index.
    ///
    /// INVARIANT: every shard 0..count is present.
    /// INVARIANT: shard 0 is always present.
    by_index: HashMap<ShardIndex, Shard>,
    /// The shard count.
    ///
    /// NB: this is 0 for unsharded tenants, following `ShardIndex::unsharded()` convention.
    count: ShardCount,
    /// The stripe size. Only used for sharded tenants.
    stripe_size: ShardStripeSize,
}

impl Shards {
    /// Creates a new set of shards based on a shard spec.
    fn new(
        tenant_id: TenantId,
        timeline_id: TimelineId,
        shard_spec: ShardSpec,
        auth_token: Option<String>,
        compression: Option<CompressionEncoding>,
    ) -> anyhow::Result<Self> {
        // NB: the shard spec has already been validated when constructed.
        let mut shards = HashMap::with_capacity(shard_spec.urls.len());
        for (shard_id, url) in shard_spec.urls {
            shards.insert(
                shard_id,
                Shard::new(
                    url,
                    tenant_id,
                    timeline_id,
                    shard_id,
                    auth_token.clone(),
                    compression,
                )?,
            );
        }

        Ok(Self {
            by_index: shards,
            count: shard_spec.count,
            stripe_size: shard_spec.stripe_size,
        })
    }

    /// Looks up the given shard.
    #[allow(clippy::result_large_err)] // TODO: check perf impact
    fn get(&self, shard_id: ShardIndex) -> tonic::Result<&Shard> {
        self.by_index
            .get(&shard_id)
            .ok_or_else(|| tonic::Status::not_found(format!("unknown shard {shard_id}")))
    }

    /// Returns shard 0.
    fn get_zero(&self) -> &Shard {
        self.get(ShardIndex::new(ShardNumber(0), self.count))
            .expect("always present")
    }
}

/// A single shard. Has dedicated resource pools with the following structure:
///
/// * Channel pool: MAX_CLIENTS_PER_CHANNEL.
/// * Client pool: unbounded.
/// * Stream pool: unbounded.
/// * Bulk channel pool: MAX_BULK_CLIENTS_PER_CHANNEL.
/// * Bulk client pool: unbounded.
/// * Bulk stream pool: unbounded.
///
/// We use a separate bulk channel pool with a lower concurrency limit for large batch requests.
/// This avoids TCP-level head-of-line blocking, and also concentrates large window sizes on a
/// smaller set of streams/connections, which presumably reduces memory use. Neither of these pools
/// is bounded, nor do they pipeline requests, so the latency characteristics should be mostly
/// similar (except for TCP transmission time).
///
/// TODO: since we never use bounded pools, we could consider removing the pool limiters. However,
/// the code is fairly trivial, so we may as well keep them around for now in case we need them.
struct Shard {
    /// The shard ID.
    id: ShardIndex,
    /// Unary gRPC client pool.
    client_pool: Arc<ClientPool>,
    /// GetPage stream pool.
    stream_pool: Arc<StreamPool>,
    /// GetPage stream pool for bulk requests.
    bulk_stream_pool: Arc<StreamPool>,
}

impl Shard {
    /// Creates a new shard. It has its own dedicated resource pools.
    fn new(
        url: String,
        tenant_id: TenantId,
        timeline_id: TimelineId,
        shard_id: ShardIndex,
        auth_token: Option<String>,
        compression: Option<CompressionEncoding>,
    ) -> anyhow::Result<Self> {
        // Shard pools for unary requests and non-bulk GetPage requests.
        let client_pool = ClientPool::new(
            ChannelPool::new(url.clone(), MAX_CLIENTS_PER_CHANNEL)?,
            tenant_id,
            timeline_id,
            shard_id,
            auth_token.clone(),
            compression,
            None, // unbounded
        );
        let stream_pool = StreamPool::new(client_pool.clone(), None); // unbounded

        // Bulk GetPage stream pool for large batches (prefetches, sequential scans, vacuum, etc.).
        let bulk_stream_pool = StreamPool::new(
            ClientPool::new(
                ChannelPool::new(url, MAX_BULK_CLIENTS_PER_CHANNEL)?,
                tenant_id,
                timeline_id,
                shard_id,
                auth_token,
                compression,
                None, // unbounded
            ),
            None, // unbounded
        );

        Ok(Self {
            id: shard_id,
            client_pool,
            stream_pool,
            bulk_stream_pool,
        })
    }

    /// Returns a pooled client for this shard.
    #[instrument(skip_all)]
    async fn client(&self) -> tonic::Result<ClientGuard> {
        warn_slow(
            "client pool acquisition",
            SLOW_THRESHOLD,
            pin!(self.client_pool.get()),
        )
        .await
    }

    /// Returns a pooled stream for this shard. If `bulk` is `true`, uses the dedicated bulk pool.
    #[instrument(skip_all, fields(bulk))]
    async fn stream(&self, bulk: bool) -> tonic::Result<StreamGuard> {
        let pool = match bulk {
            false => &self.stream_pool,
            true => &self.bulk_stream_pool,
        };
        warn_slow("stream pool acquisition", SLOW_THRESHOLD, pin!(pool.get())).await
    }
}