Line data Source code
1 : use std::collections::HashMap;
2 : use std::num::NonZero;
3 : use std::pin::pin;
4 : use std::sync::Arc;
5 : use std::time::{Duration, Instant};
6 :
7 : use anyhow::anyhow;
8 : use arc_swap::ArcSwap;
9 : use futures::stream::FuturesUnordered;
10 : use futures::{FutureExt as _, StreamExt as _};
11 : use tonic::codec::CompressionEncoding;
12 : use tracing::{debug, instrument};
13 : use utils::logging::warn_slow;
14 :
15 : use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamGuard, StreamPool};
16 : use crate::retry::Retry;
17 : use crate::split::GetPageSplitter;
18 : use compute_api::spec::PageserverProtocol;
19 : use pageserver_page_api as page_api;
20 : use utils::id::{TenantId, TimelineId};
21 : use utils::shard::{ShardCount, ShardIndex, ShardNumber, ShardStripeSize};
22 :
23 : /// Max number of concurrent clients per channel (i.e. TCP connection). New channels will be spun up
24 : /// when full.
25 : ///
26 : /// Normal requests are small, and we don't pipeline them, so we can afford a large number of
27 : /// streams per connection.
28 : ///
29 : /// TODO: tune all of these constants, and consider making them configurable.
30 : const MAX_CLIENTS_PER_CHANNEL: NonZero<usize> = NonZero::new(64).unwrap();
31 :
32 : /// Max number of concurrent bulk GetPage streams per channel (i.e. TCP connection). These use a
33 : /// dedicated channel pool with a lower client limit, to avoid TCP-level head-of-line blocking and
34 : /// transmission delays. This also concentrates large window sizes on a smaller set of
35 : /// streams/connections, presumably reducing memory use.
36 : const MAX_BULK_CLIENTS_PER_CHANNEL: NonZero<usize> = NonZero::new(16).unwrap();
37 :
38 : /// The batch size threshold at which a GetPage request will use the bulk stream pool.
39 : ///
40 : /// The gRPC initial window size is 64 KB. Each page is 8 KB, so let's avoid increasing the window
41 : /// size for the normal stream pool, and route requests for >= 5 pages (>32 KB) to the bulk pool.
42 : const BULK_THRESHOLD_BATCH_SIZE: usize = 5;
43 :
44 : /// The overall request call timeout, including retries and pool acquisition.
45 : /// TODO: should we retry forever? Should the caller decide?
46 : const CALL_TIMEOUT: Duration = Duration::from_secs(60);
47 :
48 : /// The per-request (retry attempt) timeout, including any lazy connection establishment.
49 : const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
50 :
51 : /// The initial request retry backoff duration. The first retry does not back off.
52 : /// TODO: use a different backoff for ResourceExhausted (rate limiting)? Needs server support.
53 : const BASE_BACKOFF: Duration = Duration::from_millis(5);
54 :
55 : /// The maximum request retry backoff duration.
56 : const MAX_BACKOFF: Duration = Duration::from_secs(5);
57 :
58 : /// Threshold and interval for warning about slow operations.
59 : const SLOW_THRESHOLD: Duration = Duration::from_secs(3);
60 :
61 : /// A rich Pageserver gRPC client for a single tenant timeline. This client is more capable than the
62 : /// basic `page_api::Client` gRPC client, and supports:
63 : ///
64 : /// * Sharded tenants across multiple Pageservers.
65 : /// * Pooling of connections, clients, and streams for efficient resource use.
66 : /// * Concurrent use by many callers.
67 : /// * Internal handling of GetPage bidirectional streams.
68 : /// * Automatic retries.
69 : /// * Observability.
70 : ///
71 : /// The client has dedicated connection/client/stream pools per shard, for resource reuse. These
72 : /// pools are unbounded: we allow scaling out as many concurrent streams as needed to serve all
73 : /// concurrent callers, which mostly eliminates head-of-line blocking. Idle streams are fairly
74 : /// cheap: the server task currently uses 26 KB of memory, so we can comfortably fit 100,000
75 : /// concurrent idle streams (2.5 GB memory). The worst case degenerates to the old libpq case with
76 : /// one stream per backend, but without the TCP connection overhead. In the common case we expect
77 : /// significantly lower stream counts due to stream sharing, driven e.g. by idle backends, LFC hits,
78 : /// read coalescing, sharding (backends typically only talk to one shard at a time), etc.
79 : ///
80 : /// TODO: this client does not support base backups or LSN leases, as these are only used by
81 : /// compute_ctl. Consider adding this, but LSN leases need concurrent requests on all shards.
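///
/// A minimal construction sketch (hedged: the URLs, IDs, and request values below are
/// illustrative placeholders, not part of this module):
///
/// ```ignore
/// // One unsharded pageserver; `urls` maps ShardIndex::unsharded() to a grpc:// URL.
/// let spec = ShardSpec::new(urls, None)?;
/// let client = PageserverClient::new(tenant_id, timeline_id, spec, None, None)?;
/// // `req` is a page_api::GetRelSizeRequest prepared by the caller.
/// let size = client.get_rel_size(req).await?;
/// ```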
82 : pub struct PageserverClient {
83 : /// The tenant ID.
84 : tenant_id: TenantId,
85 : /// The timeline ID.
86 : timeline_id: TimelineId,
87 : /// The JWT auth token for this tenant, if any.
88 : auth_token: Option<String>,
89 : /// The compression to use, if any.
90 : compression: Option<CompressionEncoding>,
91 : /// The shards for this tenant.
92 : shards: ArcSwap<Shards>,
93 : }
94 :
95 : impl PageserverClient {
96 : /// Creates a new Pageserver client for a given tenant and timeline. Uses the Pageservers given
97 : /// in the shard spec, which must be complete and must use gRPC URLs.
98 0 : pub fn new(
99 0 : tenant_id: TenantId,
100 0 : timeline_id: TimelineId,
101 0 : shard_spec: ShardSpec,
102 0 : auth_token: Option<String>,
103 0 : compression: Option<CompressionEncoding>,
104 0 : ) -> anyhow::Result<Self> {
105 0 : let shards = Shards::new(
106 0 : tenant_id,
107 0 : timeline_id,
108 0 : shard_spec,
109 0 : auth_token.clone(),
110 0 : compression,
111 0 : )?;
112 0 : Ok(Self {
113 0 : tenant_id,
114 0 : timeline_id,
115 0 : auth_token,
116 0 : compression,
117 0 : shards: ArcSwap::new(Arc::new(shards)),
118 0 : })
119 0 : }
120 :
121 : /// Updates the shards from the given shard spec. In-flight requests will complete using the
122 : /// existing shards, but may retry with the new shards if they fail.
123 : ///
124 : /// TODO: verify that in-flight requests are allowed to complete, and that the old pools are
125 : /// properly spun down and dropped afterwards.
126 0 : pub fn update_shards(&self, shard_spec: ShardSpec) -> anyhow::Result<()> {
127 : // Validate the shard spec. We should really use `ArcSwap::rcu` for this, to avoid races
128 : // with concurrent updates, but that involves creating a new `Shards` on every attempt,
129 : // which spins up a bunch of Tokio tasks and such. These invariants should already be enforced
130 : // elsewhere in the stack, and if they're violated then we already have bigger problems, so a
131 : // best-effort but possibly-racy check is okay here.
132 0 : let old = self.shards.load_full();
133 0 : if shard_spec.count < old.count {
134 0 : return Err(anyhow!(
135 0 : "can't reduce shard count from {} to {}",
136 0 : old.count,
137 0 : shard_spec.count
138 0 : ));
139 0 : }
140 0 : if !old.count.is_unsharded() && shard_spec.stripe_size != old.stripe_size {
141 0 : return Err(anyhow!(
142 0 : "can't change stripe size from {} to {}",
143 0 : old.stripe_size.expect("always Some when sharded"),
144 0 : shard_spec.stripe_size.expect("always Some when sharded")
145 0 : ));
146 0 : }
147 :
148 0 : let shards = Shards::new(
149 0 : self.tenant_id,
150 0 : self.timeline_id,
151 0 : shard_spec,
152 0 : self.auth_token.clone(),
153 0 : self.compression,
154 0 : )?;
155 0 : self.shards.store(Arc::new(shards));
156 0 : Ok(())
157 0 : }
158 :
159 : /// Returns the total size of a database, as # of bytes.
160 : #[instrument(skip_all, fields(db_oid=%req.db_oid, lsn=%req.read_lsn))]
161 : pub async fn get_db_size(
162 : &self,
163 : req: page_api::GetDbSizeRequest,
164 : ) -> tonic::Result<page_api::GetDbSizeResponse> {
165 : debug!("sending request: {req:?}");
166 0 : let resp = Self::with_retries(CALL_TIMEOUT, async |_| {
167 : // Relation metadata is only available on shard 0.
168 0 : let mut client = self.shards.load_full().get_zero().client().await?;
169 0 : Self::with_timeout(REQUEST_TIMEOUT, client.get_db_size(req)).await
170 0 : })
171 : .await?;
172 : debug!("received response: {resp:?}");
173 : Ok(resp)
174 : }
175 :
176 : /// Fetches pages. The `request_id` must be unique across all in-flight requests, and the
177 : /// `attempt` must be 0 (incremented on retry). Automatically splits requests that straddle
178 : /// shard boundaries, and assembles the responses.
179 : ///
180 : /// Unlike `page_api::Client`, this automatically converts `status_code` into `tonic::Status`
181 : /// errors. All responses will have `GetPageStatusCode::Ok`.
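///
/// A hedged calling sketch (the request is assumed to be built by the caller elsewhere):
///
/// ```ignore
/// // `req` is a page_api::GetPageRequest with request_id.attempt == 0 and at least one block.
/// let resp = client.get_page(req).await?;
/// // On success, resp.status_code is GetPageStatusCode::Ok and resp.pages matches
/// // req.block_numbers in order, even if the request was split across shards.
/// ```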
182 : #[instrument(skip_all, fields(
183 : req_id = %req.request_id,
184 : class = %req.request_class,
185 : rel = %req.rel,
186 0 : blkno = %req.block_numbers[0],
187 : blks = %req.block_numbers.len(),
188 : lsn = %req.read_lsn,
189 : ))]
190 : pub async fn get_page(
191 : &self,
192 : req: page_api::GetPageRequest,
193 : ) -> tonic::Result<page_api::GetPageResponse> {
194 : // Make sure we have at least one page.
195 : if req.block_numbers.is_empty() {
196 : return Err(tonic::Status::invalid_argument("no block number"));
197 : }
198 : // The request attempt must be 0. The client will increment it internally.
199 : if req.request_id.attempt != 0 {
200 : return Err(tonic::Status::invalid_argument("request attempt must be 0"));
201 : }
202 :
203 : debug!("sending request: {req:?}");
204 :
205 : // The shards may change while we're fetching pages. We execute the request using a stable
206 : // view of the shards (especially important for requests that span shards), but retry the
207 : // top-level (pre-split) request to pick up shard changes. This can lead to unnecessary
208 : // retries and re-splits in some cases where requests span shards, but these are expected to
209 : // be rare.
210 : //
211 : // TODO: the gRPC server and client don't yet properly support shard splits. Revisit this
212 : // once we figure out how to handle these.
213 0 : let resp = Self::with_retries(CALL_TIMEOUT, async |attempt| {
214 0 : let mut req = req.clone();
215 0 : req.request_id.attempt = attempt as u32;
216 0 : let shards = self.shards.load_full();
217 0 : Self::with_timeout(REQUEST_TIMEOUT, Self::get_page_with_shards(req, &shards)).await
218 0 : })
219 : .await?;
220 :
221 : debug!("received response: {resp:?}");
222 : Ok(resp)
223 : }
224 :
225 : /// Fetches pages using the given shards. This uses a stable view of the shards, regardless of
226 : /// concurrent shard updates. Does not retry internally, but is retried by `get_page()`.
227 0 : async fn get_page_with_shards(
228 0 : req: page_api::GetPageRequest,
229 0 : shards: &Shards,
230 0 : ) -> tonic::Result<page_api::GetPageResponse> {
231 : // Fast path: request is for a single shard.
232 0 : if let Some(shard_id) =
233 0 : GetPageSplitter::for_single_shard(&req, shards.count, shards.stripe_size)
234 0 : .map_err(|err| tonic::Status::internal(err.to_string()))?
235 : {
236 0 : return Self::get_page_with_shard(req, shards.get(shard_id)?).await;
237 0 : }
238 :
239 : // Request spans multiple shards. Split it, dispatch concurrent per-shard requests, and
240 : // reassemble the responses.
241 0 : let mut splitter = GetPageSplitter::split(req, shards.count, shards.stripe_size)
242 0 : .map_err(|err| tonic::Status::internal(err.to_string()))?;
243 :
244 0 : let mut shard_requests = FuturesUnordered::new();
245 0 : for (shard_id, shard_req) in splitter.drain_requests() {
246 0 : let future = Self::get_page_with_shard(shard_req, shards.get(shard_id)?)
247 0 : .map(move |result| result.map(|resp| (shard_id, resp)));
248 0 : shard_requests.push(future);
249 : }
250 :
251 0 : while let Some((shard_id, shard_response)) = shard_requests.next().await.transpose()? {
252 0 : splitter
253 0 : .add_response(shard_id, shard_response)
254 0 : .map_err(|err| tonic::Status::internal(err.to_string()))?;
255 : }
256 :
257 0 : splitter
258 0 : .get_response()
259 0 : .map_err(|err| tonic::Status::internal(err.to_string()))
260 0 : }
261 :
262 : /// Fetches pages on the given shard. Does not retry internally.
263 0 : async fn get_page_with_shard(
264 0 : req: page_api::GetPageRequest,
265 0 : shard: &Shard,
266 0 : ) -> tonic::Result<page_api::GetPageResponse> {
267 0 : let mut stream = shard.stream(Self::is_bulk(&req)).await?;
268 0 : let resp = stream.send(req.clone()).await?;
269 :
270 : // Convert per-request errors into a tonic::Status.
271 0 : if resp.status_code != page_api::GetPageStatusCode::Ok {
272 0 : return Err(tonic::Status::new(
273 0 : resp.status_code.into(),
274 0 : resp.reason.unwrap_or_else(|| String::from("unknown error")),
275 : ));
276 0 : }
277 :
278 : // Check that we received the expected pages.
279 0 : if req.rel != resp.rel {
280 0 : return Err(tonic::Status::internal(format!(
281 0 : "shard {} returned wrong relation, expected {} got {}",
282 0 : shard.id, req.rel, resp.rel
283 0 : )));
284 0 : }
285 0 : if !req
286 0 : .block_numbers
287 0 : .iter()
288 0 : .copied()
289 0 : .eq(resp.pages.iter().map(|p| p.block_number))
290 : {
291 0 : return Err(tonic::Status::internal(format!(
292 0 : "shard {} returned wrong pages, expected {:?} got {:?}",
293 : shard.id,
294 : req.block_numbers,
295 0 : resp.pages
296 0 : .iter()
297 0 : .map(|page| page.block_number)
298 0 : .collect::<Vec<_>>()
299 : )));
300 0 : }
301 :
302 0 : Ok(resp)
303 0 : }
304 :
305 : /// Returns the size of a relation, as # of blocks.
306 : #[instrument(skip_all, fields(rel=%req.rel, lsn=%req.read_lsn))]
307 : pub async fn get_rel_size(
308 : &self,
309 : req: page_api::GetRelSizeRequest,
310 : ) -> tonic::Result<page_api::GetRelSizeResponse> {
311 : debug!("sending request: {req:?}");
312 0 : let resp = Self::with_retries(CALL_TIMEOUT, async |_| {
313 : // Relation metadata is only available on shard 0.
314 0 : let mut client = self.shards.load_full().get_zero().client().await?;
315 0 : Self::with_timeout(REQUEST_TIMEOUT, client.get_rel_size(req)).await
316 0 : })
317 : .await?;
318 : debug!("received response: {resp:?}");
319 : Ok(resp)
320 : }
321 :
322 : /// Fetches an SLRU segment.
323 : #[instrument(skip_all, fields(kind=%req.kind, segno=%req.segno, lsn=%req.read_lsn))]
324 : pub async fn get_slru_segment(
325 : &self,
326 : req: page_api::GetSlruSegmentRequest,
327 : ) -> tonic::Result<page_api::GetSlruSegmentResponse> {
328 : debug!("sending request: {req:?}");
329 0 : let resp = Self::with_retries(CALL_TIMEOUT, async |_| {
330 : // SLRU segments are only available on shard 0.
331 0 : let mut client = self.shards.load_full().get_zero().client().await?;
332 0 : Self::with_timeout(REQUEST_TIMEOUT, client.get_slru_segment(req)).await
333 0 : })
334 : .await?;
335 : debug!("received response: {resp:?}");
336 : Ok(resp)
337 : }
338 :
339 : /// Runs the given async closure with retries up to the given timeout. Only certain gRPC status
340 : /// codes are retried, see [`Retry::should_retry`]. Returns `DeadlineExceeded` on timeout.
341 0 : async fn with_retries<T, F, O>(timeout: Duration, f: F) -> tonic::Result<T>
342 0 : where
343 0 : F: FnMut(usize) -> O, // pass attempt number, starting at 0
344 0 : O: Future<Output = tonic::Result<T>>,
345 0 : {
346 0 : Retry {
347 0 : timeout: Some(timeout),
348 0 : base_backoff: BASE_BACKOFF,
349 0 : max_backoff: MAX_BACKOFF,
350 0 : }
351 0 : .with(f)
352 0 : .await
353 0 : }
354 :
355 : /// Runs the given future with a timeout. Returns `DeadlineExceeded` on timeout.
356 0 : async fn with_timeout<T>(
357 0 : timeout: Duration,
358 0 : f: impl Future<Output = tonic::Result<T>>,
359 0 : ) -> tonic::Result<T> {
360 0 : let started = Instant::now();
361 0 : tokio::time::timeout(timeout, f).await.map_err(|_| {
362 0 : tonic::Status::deadline_exceeded(format!(
363 0 : "request timed out after {:.3}s",
364 0 : started.elapsed().as_secs_f64()
365 : ))
366 0 : })?
367 0 : }
368 :
369 : /// Returns true if the request is considered a bulk request and should use the bulk pool.
370 0 : fn is_bulk(req: &page_api::GetPageRequest) -> bool {
371 0 : req.block_numbers.len() >= BULK_THRESHOLD_BATCH_SIZE
372 0 : }
373 : }
374 :
375 : /// Shard specification for a PageserverClient.
376 : pub struct ShardSpec {
377 : /// Maps shard indices to gRPC URLs.
378 : ///
379 : /// INVARIANT: every shard 0..count is present, and shard 0 is always present.
380 : /// INVARIANT: every URL is valid and uses grpc:// scheme.
381 : urls: HashMap<ShardIndex, String>,
382 : /// The shard count.
383 : ///
384 : /// NB: this is 0 for unsharded tenants, following `ShardIndex::unsharded()` convention.
385 : count: ShardCount,
386 : /// The stripe size for these shards.
387 : ///
388 : /// INVARIANT: None for unsharded tenants, Some for sharded.
389 : stripe_size: Option<ShardStripeSize>,
390 : }
391 :
392 : impl ShardSpec {
393 : /// Creates a new shard spec with the given URLs and stripe size. All shards must be given.
394 : /// The stripe size must be Some for sharded tenants, or None for unsharded tenants.
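///
/// A construction sketch for the unsharded case (the URL is a hypothetical placeholder):
///
/// ```ignore
/// let mut urls = HashMap::new();
/// urls.insert(ShardIndex::unsharded(), "grpc://pageserver:51051".to_string());
/// let spec = ShardSpec::new(urls, None)?; // no stripe size for unsharded tenants
/// ```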
395 0 : pub fn new(
396 0 : urls: HashMap<ShardIndex, String>,
397 0 : stripe_size: Option<ShardStripeSize>,
398 0 : ) -> anyhow::Result<Self> {
399 : // Compute the shard count.
400 0 : let count = match urls.len() {
401 0 : 0 => return Err(anyhow!("no shards provided")),
402 0 : 1 => ShardCount::new(0), // NB: unsharded tenants use 0, like `ShardIndex::unsharded()`
403 0 : n if n > u8::MAX as usize => return Err(anyhow!("too many shards: {n}")),
404 0 : n => ShardCount::new(n as u8),
405 : };
406 :
407 : // Validate the stripe size.
408 0 : if stripe_size.is_none() && !count.is_unsharded() {
409 0 : return Err(anyhow!("stripe size must be given for sharded tenants"));
410 0 : }
411 0 : if stripe_size.is_some() && count.is_unsharded() {
412 0 : return Err(anyhow!("stripe size can't be given for unsharded tenants"));
413 0 : }
414 :
415 : // Validate the shard spec.
416 0 : for (shard_id, url) in &urls {
417 : // The shard index's count must match the computed shard count, even for unsharded tenants.
418 0 : if shard_id.shard_count != count {
419 0 : return Err(anyhow!("invalid shard index {shard_id}, expected {count}"));
420 0 : }
421 : // The shard index's number and count must be consistent.
422 0 : if !shard_id.is_unsharded() && shard_id.shard_number.0 >= shard_id.shard_count.0 {
423 0 : return Err(anyhow!("invalid shard index {shard_id}"));
424 0 : }
425 : // The above conditions guarantee that we have all shards 0..count: len() matches count,
426 : // shard number < count, and numbers are unique (via hashmap).
427 :
428 : // Validate the URL.
429 0 : if PageserverProtocol::from_connstring(url)? != PageserverProtocol::Grpc {
430 0 : return Err(anyhow!("invalid shard URL {url}: must use gRPC"));
431 0 : }
432 : }
433 :
434 0 : Ok(Self {
435 0 : urls,
436 0 : count,
437 0 : stripe_size,
438 0 : })
439 0 : }
440 : }
441 :
442 : /// Tracks the tenant's shards.
443 : struct Shards {
444 : /// Shards by shard index.
445 : ///
446 : /// INVARIANT: every shard 0..count is present.
447 : /// INVARIANT: shard 0 is always present.
448 : by_index: HashMap<ShardIndex, Shard>,
449 : /// The shard count.
450 : ///
451 : /// NB: this is 0 for unsharded tenants, following `ShardIndex::unsharded()` convention.
452 : count: ShardCount,
453 : /// The stripe size.
454 : ///
455 : /// INVARIANT: None for unsharded tenants, Some for sharded.
456 : stripe_size: Option<ShardStripeSize>,
457 : }
458 :
459 : impl Shards {
460 : /// Creates a new set of shards based on a shard spec.
461 0 : fn new(
462 0 : tenant_id: TenantId,
463 0 : timeline_id: TimelineId,
464 0 : shard_spec: ShardSpec,
465 0 : auth_token: Option<String>,
466 0 : compression: Option<CompressionEncoding>,
467 0 : ) -> anyhow::Result<Self> {
468 : // NB: the shard spec has already been validated when constructed.
469 0 : let mut shards = HashMap::with_capacity(shard_spec.urls.len());
470 0 : for (shard_id, url) in shard_spec.urls {
471 0 : shards.insert(
472 0 : shard_id,
473 0 : Shard::new(
474 0 : url,
475 0 : tenant_id,
476 0 : timeline_id,
477 0 : shard_id,
478 0 : auth_token.clone(),
479 0 : compression,
480 0 : )?,
481 : );
482 : }
483 :
484 0 : Ok(Self {
485 0 : by_index: shards,
486 0 : count: shard_spec.count,
487 0 : stripe_size: shard_spec.stripe_size,
488 0 : })
489 0 : }
490 :
491 : /// Looks up the given shard.
492 : #[allow(clippy::result_large_err)] // TODO: check perf impact
493 0 : fn get(&self, shard_id: ShardIndex) -> tonic::Result<&Shard> {
494 0 : self.by_index
495 0 : .get(&shard_id)
496 0 : .ok_or_else(|| tonic::Status::not_found(format!("unknown shard {shard_id}")))
497 0 : }
498 :
499 : /// Returns shard 0.
500 0 : fn get_zero(&self) -> &Shard {
501 0 : self.get(ShardIndex::new(ShardNumber(0), self.count))
502 0 : .expect("always present")
503 0 : }
504 : }
505 :
506 : /// A single shard. Has dedicated resource pools with the following structure:
507 : ///
508 : /// * Channel pool: MAX_CLIENTS_PER_CHANNEL.
509 : /// * Client pool: unbounded.
510 : /// * Stream pool: unbounded.
511 : /// * Bulk channel pool: MAX_BULK_CLIENTS_PER_CHANNEL.
512 : /// * Bulk client pool: unbounded.
513 : /// * Bulk stream pool: unbounded.
514 : ///
515 : /// We use a separate bulk channel pool with a lower concurrency limit for large batch requests.
516 : /// This avoids TCP-level head-of-line blocking, and also concentrates large window sizes on a
517 : /// smaller set of streams/connections, which presumably reduces memory use. Neither pool is
518 : /// bounded, and neither pipelines requests, so the latency characteristics should be mostly
519 : /// similar (except for TCP transmission time).
520 : ///
521 : /// TODO: since we never use bounded pools, we could consider removing the pool limiters. However,
522 : /// the code is fairly trivial, so we may as well keep them around for now in case we need them.
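///
/// A hedged sketch of how the pools are used by the client internals:
///
/// ```ignore
/// let mut client = shard.client().await?;       // pooled unary client (sizes, SLRU, ...)
/// let mut stream = shard.stream(false).await?;  // GetPage stream from the normal pool
/// let mut bulk = shard.stream(true).await?;     // large batches go to the bulk pool
/// ```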
523 : struct Shard {
524 : /// The shard ID.
525 : id: ShardIndex,
526 : /// Unary gRPC client pool.
527 : client_pool: Arc<ClientPool>,
528 : /// GetPage stream pool.
529 : stream_pool: Arc<StreamPool>,
530 : /// GetPage stream pool for bulk requests.
531 : bulk_stream_pool: Arc<StreamPool>,
532 : }
533 :
534 : impl Shard {
535 : /// Creates a new shard. It has its own dedicated resource pools.
536 0 : fn new(
537 0 : url: String,
538 0 : tenant_id: TenantId,
539 0 : timeline_id: TimelineId,
540 0 : shard_id: ShardIndex,
541 0 : auth_token: Option<String>,
542 0 : compression: Option<CompressionEncoding>,
543 0 : ) -> anyhow::Result<Self> {
544 : // Shard pools for unary requests and non-bulk GetPage requests.
545 0 : let client_pool = ClientPool::new(
546 0 : ChannelPool::new(url.clone(), MAX_CLIENTS_PER_CHANNEL)?,
547 0 : tenant_id,
548 0 : timeline_id,
549 0 : shard_id,
550 0 : auth_token.clone(),
551 0 : compression,
552 0 : None, // unbounded
553 : );
554 0 : let stream_pool = StreamPool::new(client_pool.clone(), None); // unbounded
555 :
556 : // Bulk GetPage stream pool for large batches (prefetches, sequential scans, vacuum, etc.).
557 0 : let bulk_stream_pool = StreamPool::new(
558 0 : ClientPool::new(
559 0 : ChannelPool::new(url, MAX_BULK_CLIENTS_PER_CHANNEL)?,
560 0 : tenant_id,
561 0 : timeline_id,
562 0 : shard_id,
563 0 : auth_token,
564 0 : compression,
565 0 : None, // unbounded
566 : ),
567 0 : None, // unbounded
568 : );
569 :
570 0 : Ok(Self {
571 0 : id: shard_id,
572 0 : client_pool,
573 0 : stream_pool,
574 0 : bulk_stream_pool,
575 0 : })
576 0 : }
577 :
578 : /// Returns a pooled client for this shard.
579 : #[instrument(skip_all)]
580 : async fn client(&self) -> tonic::Result<ClientGuard> {
581 : warn_slow(
582 : "client pool acquisition",
583 : SLOW_THRESHOLD,
584 : pin!(self.client_pool.get()),
585 : )
586 : .await
587 : }
588 :
589 : /// Returns a pooled stream for this shard. If `bulk` is `true`, uses the dedicated bulk pool.
590 : #[instrument(skip_all, fields(bulk))]
591 : async fn stream(&self, bulk: bool) -> tonic::Result<StreamGuard> {
592 : let pool = match bulk {
593 : false => &self.stream_pool,
594 : true => &self.bulk_stream_pool,
595 : };
596 : warn_slow("stream pool acquisition", SLOW_THRESHOLD, pin!(pool.get())).await
597 : }
598 : }