//! This module provides various Pageserver gRPC client resource pools.
//!
//! These pools are designed to reuse gRPC resources (connections, clients, and streams) across
//! multiple concurrent callers (i.e. Postgres backends). This avoids the resource cost and latency
//! of creating dedicated TCP connections and server tasks for every Postgres backend.
//!
//! Each resource has its own, nested pool. The pools are custom-built for the properties of each
//! resource -- they are different enough that a generic pool isn't suitable.
//!
//! * ChannelPool: manages gRPC channels (TCP connections) to a single Pageserver. Multiple clients
//!   can acquire and use the same channel concurrently (via HTTP/2 stream multiplexing), up to a
//!   per-channel client limit. Channels are closed immediately when empty, and indirectly rely on
//!   client/stream idle timeouts.
//!
//! * ClientPool: manages gRPC clients for a single tenant shard. Each client acquires a (shared)
//!   channel from the ChannelPool for the client's lifetime. A client can only be acquired by a
//!   single caller at a time, and is returned to the pool when dropped. Idle clients are removed
//!   from the pool after a while to free up resources.
//!
//! * StreamPool: manages bidirectional gRPC GetPage streams. Each stream acquires a client from the
//!   ClientPool for the stream's lifetime. A stream can only be acquired by a single caller at a
//!   time, and is returned to the pool when dropped. Idle streams are removed from the pool after
//!   a while to free up resources.
//!
//! The stream only supports sending a single, synchronous request at a time, and does not support
//! pipelining multiple requests from different callers onto the same stream -- instead, we scale
//! out concurrent streams to improve throughput. There are many reasons for this design choice:
//!
//! * It (mostly) eliminates head-of-line blocking. A single stream is processed sequentially by
//!   a single server task, which may block e.g. on layer downloads, LSN waits, etc.
//!
//! * Cancellation becomes trivial, by closing the stream. Otherwise, if a caller goes away
//!   (e.g. because of a timeout), the request would still be processed by the server and block
//!   requests behind it in the stream. It might even block its own timeout retry.
//!
//! * Stream scheduling becomes significantly simpler and cheaper.
//!
//! * Individual callers can still use client-side batching for pipelining.
//!
//! * Idle streams are cheap. Benchmarks show that an idle GetPage stream takes up about 26 KB
//!   per stream (2.5 GB for 100,000 streams), so we can afford to scale out.
//!
//! Each channel corresponds to one TCP connection. Each client unary request and each stream
//! corresponds to one HTTP/2 stream and server task.
//!
//! TODO: error handling (including custom error types).
//! TODO: observability.
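//!
//! A rough usage sketch of how the nested pools fit together (the endpoint, limits, and IDs below
//! are illustrative, not prescriptive):
//!
//! ```ignore
//! // One channel pool per Pageserver, shared by the client/stream pools for that server.
//! let channel_pool = ChannelPool::new("http://pageserver:51051", NonZero::new(16).unwrap())?;
//!
//! // A dedicated, unbounded client pool for GetPage streams (see `StreamPool::new`).
//! let client_pool = ClientPool::new(
//!     channel_pool.clone(),
//!     tenant_id,
//!     timeline_id,
//!     shard_id,
//!     None, // auth token
//!     None, // compression
//!     None, // unbounded, as required by StreamPool
//! );
//!
//! // A stream pool with a bound on concurrent GetPage streams.
//! let stream_pool = StreamPool::new(client_pool, NonZero::new(64));
//!
//! // Send a request and await its response; the stream returns to the pool when the guard drops.
//! let mut stream = stream_pool.get().await?;
//! let resp = stream.send(request).await?;
//! ```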

use std::collections::BTreeMap;
use std::num::NonZero;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, Weak};
use std::time::{Duration, Instant};

use futures::{Stream, StreamExt as _};
use tokio::sync::{OwnedSemaphorePermit, Semaphore, watch};
use tokio_stream::wrappers::WatchStream;
use tokio_util::sync::CancellationToken;
use tonic::codec::CompressionEncoding;
use tonic::transport::{Channel, Endpoint};

use pageserver_page_api as page_api;
use utils::id::{TenantId, TimelineId};
use utils::shard::ShardIndex;

/// Reap clients/streams that have been idle for this long. Channels are reaped immediately when
/// empty, and indirectly rely on the client/stream idle timeouts.
///
/// A stream's client will be reaped after 2x the idle threshold (first the stream, then the
/// client), but that's okay -- if the stream closes abruptly (e.g. due to timeout or
/// cancellation), we want to keep its client around in the pool for a while.
const REAP_IDLE_THRESHOLD: Duration = match cfg!(any(test, feature = "testing")) {
    false => Duration::from_secs(180),
    true => Duration::from_secs(1), // exercise reaping in tests
};

/// Reap idle resources with this interval.
const REAP_IDLE_INTERVAL: Duration = match cfg!(any(test, feature = "testing")) {
    false => Duration::from_secs(10),
    true => Duration::from_secs(1), // exercise reaping in tests
};

/// A gRPC channel pool, for a single Pageserver. A channel is shared by many clients (via HTTP/2
/// stream multiplexing), up to `max_clients_per_channel` -- beyond this, a new channel is spun up.
/// The pool does not limit the number of channels, and instead relies on `ClientPool` or
/// `StreamPool` to limit the number of concurrent clients.
///
/// The pool is always wrapped in an outer `Arc`, to allow long-lived guards across tasks/threads.
///
/// TODO: consider prewarming a set of channels, to avoid initial connection latency.
/// TODO: consider adding a circuit breaker for errors, to fail fast.
pub struct ChannelPool {
    /// Pageserver endpoint to connect to.
    endpoint: Endpoint,
    /// Max number of clients per channel. Beyond this, a new channel will be created.
    max_clients_per_channel: NonZero<usize>,
    /// Open channels.
    channels: Mutex<BTreeMap<ChannelID, ChannelEntry>>,
    /// Channel ID generator.
    next_channel_id: AtomicUsize,
}

type ChannelID = usize;

struct ChannelEntry {
    /// The gRPC channel (i.e. TCP connection). Shared by multiple clients.
    channel: Channel,
    /// Number of clients using this channel.
    clients: usize,
}

impl ChannelPool {
    /// Creates a new channel pool for the given Pageserver endpoint.
    pub fn new<E>(endpoint: E, max_clients_per_channel: NonZero<usize>) -> anyhow::Result<Arc<Self>>
    where
        E: TryInto<Endpoint> + Send + Sync + 'static,
        <E as TryInto<Endpoint>>::Error: std::error::Error + Send + Sync,
    {
        Ok(Arc::new(Self {
            endpoint: endpoint.try_into()?,
            max_clients_per_channel,
            channels: Mutex::default(),
            next_channel_id: AtomicUsize::default(),
        }))
    }

    /// Acquires a gRPC channel for a client. Multiple clients may acquire the same channel.
    ///
    /// This never blocks (except for mutex acquisition). The channel is connected lazily on first
    /// use, and the `ChannelPool` does not have a channel limit. Channels will be re-established
    /// automatically on failure (TODO: verify).
    ///
    /// Callers should not clone the returned channel, and must hold onto the returned guard as
    /// long as the channel is in use. It is unfortunately not possible to enforce this: the
    /// Protobuf client requires an owned `Channel` and we don't have access to the channel's
    /// internal refcount.
    ///
    /// This is not performance-sensitive. It is only called when creating a new client, and
    /// clients are pooled and reused by `ClientPool`. The total number of channels will also be
    /// small. O(n) performance is therefore okay.
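    ///
    /// A minimal usage sketch (hypothetical call site; the endpoint and limit are illustrative):
    ///
    /// ```ignore
    /// let pool = ChannelPool::new("http://pageserver:51051", NonZero::new(16).unwrap())?;
    /// let mut guard = pool.get();
    /// let channel = guard.take(); // take() panics if called twice
    /// // Keep `guard` alive for as long as `channel` is in use; drop it to release the slot.
    /// ```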
    pub fn get(self: &Arc<Self>) -> ChannelGuard {
        let mut channels = self.channels.lock().unwrap();

        // Try to find an existing channel with available capacity. We check entries in BTreeMap
        // order, to fill up the lower-ordered channels first. The client/stream pools also prefer
        // clients with lower-ordered channel IDs first. This will cluster clients in lower-ordered
        // channels, and free up higher-ordered channels such that they can be reaped.
        for (&id, entry) in channels.iter_mut() {
            assert!(
                entry.clients <= self.max_clients_per_channel.get(),
                "channel overflow"
            );
            assert_ne!(entry.clients, 0, "empty channel not reaped");
            if entry.clients < self.max_clients_per_channel.get() {
                entry.clients += 1;
                return ChannelGuard {
                    pool: Arc::downgrade(self),
                    id,
                    channel: Some(entry.channel.clone()),
                };
            }
        }

        // Create a new channel. We connect lazily on first use, such that we don't block here and
        // other clients can join onto the same channel while it's connecting.
        let channel = self.endpoint.connect_lazy();

        let id = self.next_channel_id.fetch_add(1, Ordering::Relaxed);
        let entry = ChannelEntry {
            channel: channel.clone(),
            clients: 1, // account for the guard below
        };
        channels.insert(id, entry);

        ChannelGuard {
            pool: Arc::downgrade(self),
            id,
            channel: Some(channel),
        }
    }
}

/// Tracks a channel acquired from the pool. The owned inner channel can be obtained with `take()`,
/// since the gRPC client requires an owned `Channel`.
pub struct ChannelGuard {
    pool: Weak<ChannelPool>,
    id: ChannelID,
    channel: Option<Channel>,
}

impl ChannelGuard {
    /// Returns the inner owned channel. Panics if called more than once. The caller must hold onto
    /// the guard as long as the channel is in use, and should not clone it.
    pub fn take(&mut self) -> Channel {
        self.channel.take().expect("channel already taken")
    }
}

/// Returns the channel to the pool. The channel is closed when empty.
impl Drop for ChannelGuard {
    fn drop(&mut self) {
        let Some(pool) = self.pool.upgrade() else {
            return; // pool was dropped
        };

        let mut channels = pool.channels.lock().unwrap();
        let entry = channels.get_mut(&self.id).expect("unknown channel");
        assert!(entry.clients > 0, "channel underflow");
        entry.clients -= 1;

        // Reap empty channels immediately.
        if entry.clients == 0 {
            channels.remove(&self.id);
        }
    }
}

/// A pool of gRPC clients for a single tenant shard. Each client acquires a channel from the inner
/// `ChannelPool`. A client is only given out to a single caller at a time. The pool limits the
/// total number of concurrent clients to `max_clients` via a semaphore.
///
/// The pool is always wrapped in an outer `Arc`, to allow long-lived guards across tasks/threads.
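///
/// A minimal acquisition sketch (hypothetical call site; the IDs and limit are illustrative):
///
/// ```ignore
/// let clients = ClientPool::new(
///     channel_pool, tenant_id, timeline_id, shard_id,
///     None,             // auth token
///     None,             // compression
///     NonZero::new(64), // max concurrent clients
/// );
/// let mut client = clients.get().await?; // may wait for a free slot
/// // Use the client for unary RPCs; dropping the guard returns it to the pool.
/// ```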
pub struct ClientPool {
    /// Tenant ID.
    tenant_id: TenantId,
    /// Timeline ID.
    timeline_id: TimelineId,
    /// Shard ID.
    shard_id: ShardIndex,
    /// Authentication token, if any.
    auth_token: Option<String>,
    /// Compression to use.
    compression: Option<CompressionEncoding>,
    /// Channel pool to acquire channels from.
    channel_pool: Arc<ChannelPool>,
    /// Limits the max number of concurrent clients for this pool. None if the pool is unbounded.
    limiter: Option<Arc<Semaphore>>,
    /// Idle pooled clients. Acquired clients are removed from here and returned on drop.
    ///
    /// The first client in the map will be acquired next. The map is sorted by client ID, which in
    /// turn is sorted by its channel ID, such that we prefer acquiring idle clients from
    /// lower-ordered channels. This allows us to free up and reap higher-ordered channels.
    idle: Mutex<BTreeMap<ClientID, ClientEntry>>,
    /// Reaps idle clients.
    idle_reaper: Reaper,
    /// Unique client ID generator.
    next_client_id: AtomicUsize,
}

type ClientID = (ChannelID, usize);

struct ClientEntry {
    /// The pooled gRPC client.
    client: page_api::Client,
    /// The channel guard for the channel used by the client.
    channel_guard: ChannelGuard,
    /// The client has been idle since this time. All clients in `ClientPool::idle` are idle by
    /// definition, so this is the time when it was added back to the pool.
    idle_since: Instant,
}

impl ClientPool {
    /// Creates a new client pool for the given tenant shard. Channels are acquired from the given
    /// `ChannelPool`, which must point to a Pageserver that hosts the tenant shard. Allows up to
    /// `max_clients` concurrent clients, or unbounded if None.
    pub fn new(
        channel_pool: Arc<ChannelPool>,
        tenant_id: TenantId,
        timeline_id: TimelineId,
        shard_id: ShardIndex,
        auth_token: Option<String>,
        compression: Option<CompressionEncoding>,
        max_clients: Option<NonZero<usize>>,
    ) -> Arc<Self> {
        let pool = Arc::new(Self {
            tenant_id,
            timeline_id,
            shard_id,
            auth_token,
            compression,
            channel_pool,
            idle: Mutex::default(),
            idle_reaper: Reaper::new(REAP_IDLE_THRESHOLD, REAP_IDLE_INTERVAL),
            limiter: max_clients.map(|max| Arc::new(Semaphore::new(max.get()))),
            next_client_id: AtomicUsize::default(),
        });
        pool.idle_reaper.spawn(&pool);
        pool
    }

    /// Gets a client from the pool, or creates a new one if necessary. Connections are established
    /// lazily and do not block, but this call can block if the pool is at `max_clients`. The
    /// client is returned to the pool when the guard is dropped.
    ///
    /// This is moderately performance-sensitive. It is called for every unary request, but these
    /// establish a new gRPC stream per request so they're already expensive. GetPage requests use
    /// the `StreamPool` instead.
    pub async fn get(self: &Arc<Self>) -> tonic::Result<ClientGuard> {
        // Acquire a permit if the pool is bounded.
        let mut permit = None;
        if let Some(limiter) = self.limiter.clone() {
            permit = Some(limiter.acquire_owned().await.expect("never closed"));
        }

        // Fast path: acquire an idle client from the pool.
        if let Some((id, entry)) = self.idle.lock().unwrap().pop_first() {
            return Ok(ClientGuard {
                pool: Arc::downgrade(self),
                id,
                client: Some(entry.client),
                channel_guard: Some(entry.channel_guard),
                permit,
            });
        }

        // Construct a new client.
        let mut channel_guard = self.channel_pool.get();
        let client = page_api::Client::new(
            channel_guard.take(),
            self.tenant_id,
            self.timeline_id,
            self.shard_id,
            self.auth_token.clone(),
            self.compression,
        )
        .map_err(|err| tonic::Status::internal(format!("failed to create client: {err}")))?;

        Ok(ClientGuard {
            pool: Arc::downgrade(self),
            id: (
                channel_guard.id,
                self.next_client_id.fetch_add(1, Ordering::Relaxed),
            ),
            client: Some(client),
            channel_guard: Some(channel_guard),
            permit,
        })
    }
}

impl Reapable for ClientPool {
    /// Reaps clients that have been idle since before the cutoff.
    fn reap_idle(&self, cutoff: Instant) {
        self.idle
            .lock()
            .unwrap()
            .retain(|_, entry| entry.idle_since >= cutoff)
    }
}

/// A client acquired from the pool. The inner client can be accessed via Deref. The client is
/// returned to the pool when dropped.
pub struct ClientGuard {
    pool: Weak<ClientPool>,
    id: ClientID,
    client: Option<page_api::Client>,     // Some until dropped
    channel_guard: Option<ChannelGuard>,  // Some until dropped
    permit: Option<OwnedSemaphorePermit>, // None if pool is unbounded
}

impl Deref for ClientGuard {
    type Target = page_api::Client;

    fn deref(&self) -> &Self::Target {
        self.client.as_ref().expect("not dropped")
    }
}

impl DerefMut for ClientGuard {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.client.as_mut().expect("not dropped")
    }
}

/// Returns the client to the pool.
impl Drop for ClientGuard {
    fn drop(&mut self) {
        let Some(pool) = self.pool.upgrade() else {
            return; // pool was dropped
        };

        let entry = ClientEntry {
            client: self.client.take().expect("dropped once"),
            channel_guard: self.channel_guard.take().expect("dropped once"),
            idle_since: Instant::now(),
        };
        pool.idle.lock().unwrap().insert(self.id, entry);

        _ = self.permit; // returned on drop, referenced for visibility
    }
}

/// A pool of bidirectional gRPC streams. Currently only used for GetPage streams. Each stream
/// acquires a client from the inner `ClientPool` for the stream's lifetime.
///
/// Individual streams only send a single request at a time, and do not pipeline multiple callers
/// onto the same stream. Instead, we scale out the number of concurrent streams. This is primarily
/// to eliminate head-of-line blocking. See the module documentation for more details.
///
/// TODO: consider making this generic over request and response types; not currently needed.
pub struct StreamPool {
    /// The client pool to acquire clients from. Must be unbounded.
    client_pool: Arc<ClientPool>,
    /// Idle pooled streams. Acquired streams are removed from here and returned on drop.
    ///
    /// The first stream in the map will be acquired next. The map is sorted by stream ID, which is
    /// equivalent to the client ID and in turn sorted by its channel ID. This way we prefer
    /// acquiring idle streams from lower-ordered channels, which allows us to free up and reap
    /// higher-ordered channels.
    idle: Mutex<BTreeMap<StreamID, StreamEntry>>,
    /// Limits the max number of concurrent streams. None if the pool is unbounded.
    limiter: Option<Arc<Semaphore>>,
    /// Reaps idle streams.
    idle_reaper: Reaper,
}

/// The stream ID. Reuses the inner client ID.
type StreamID = ClientID;

/// A pooled stream.
struct StreamEntry {
    /// The bidirectional stream.
    stream: BiStream,
    /// The time when this stream was last used, i.e. when it was put back into `StreamPool::idle`.
    idle_since: Instant,
}

/// A bidirectional GetPage stream and its client. Can send requests and receive responses.
struct BiStream {
    /// The owning client. Holds onto the channel slot while the stream is alive.
    client: ClientGuard,
    /// Stream for sending requests. Uses a watch channel, so it can only send a single request at
    /// a time, and the caller must await the response before sending another request. This is
    /// enforced by `StreamGuard::send`.
    sender: watch::Sender<page_api::GetPageRequest>,
    /// Stream for receiving responses.
    receiver: Pin<Box<dyn Stream<Item = tonic::Result<page_api::GetPageResponse>> + Send>>,
}

impl StreamPool {
    /// Creates a new stream pool, using the given client pool. It will use up to `max_streams`
    /// concurrent streams.
    ///
    /// The client pool must be unbounded. The stream pool will enforce its own limits, and because
    /// streams are long-lived they can cause persistent starvation if they exhaust the client
    /// pool. The stream pool should generally have its own dedicated client pool (but it can share
    /// a channel pool with others since these are always unbounded).
    pub fn new(client_pool: Arc<ClientPool>, max_streams: Option<NonZero<usize>>) -> Arc<Self> {
        assert!(client_pool.limiter.is_none(), "bounded client pool");
        let pool = Arc::new(Self {
            client_pool,
            idle: Mutex::default(),
            limiter: max_streams.map(|max_streams| Arc::new(Semaphore::new(max_streams.get()))),
            idle_reaper: Reaper::new(REAP_IDLE_THRESHOLD, REAP_IDLE_INTERVAL),
        });
        pool.idle_reaper.spawn(&pool);
        pool
    }

    /// Acquires an available stream from the pool, or spins up a new stream if all streams are
    /// full. Returns a guard that can be used to send requests and await the responses. Blocks if
    /// the pool is full.
    ///
    /// This is very performance-sensitive, as it is on the GetPage hot path.
    ///
    /// TODO: is a `Mutex<BTreeMap>` performant enough? Will it become too contended? We can't
    /// trivially use e.g. DashMap or sharding, because we want to pop lower-ordered streams first
    /// to free up higher-ordered channels.
    pub async fn get(self: &Arc<Self>) -> tonic::Result<StreamGuard> {
        // Acquire a permit if the pool is bounded.
        let mut permit = None;
        if let Some(limiter) = self.limiter.clone() {
            permit = Some(limiter.acquire_owned().await.expect("never closed"));
        }

        // Fast path: acquire an idle stream from the pool.
        if let Some((_, entry)) = self.idle.lock().unwrap().pop_first() {
            return Ok(StreamGuard {
                pool: Arc::downgrade(self),
                stream: Some(entry.stream),
                can_reuse: true,
                permit,
            });
        }

        // Spin up a new stream. Uses a watch channel to send a single request at a time, since
        // `StreamGuard::send` enforces this anyway and it avoids unnecessary channel overhead.
        let mut client = self.client_pool.get().await?;

        let (req_tx, req_rx) = watch::channel(page_api::GetPageRequest::default());
        let req_stream = WatchStream::from_changes(req_rx);
        let resp_stream = client.get_pages(req_stream).await?;

        Ok(StreamGuard {
            pool: Arc::downgrade(self),
            stream: Some(BiStream {
                client,
                sender: req_tx,
                receiver: Box::pin(resp_stream),
            }),
            can_reuse: true,
            permit,
        })
    }
}

impl Reapable for StreamPool {
    /// Reaps streams that have been idle since before the cutoff.
    fn reap_idle(&self, cutoff: Instant) {
        self.idle
            .lock()
            .unwrap()
            .retain(|_, entry| entry.idle_since >= cutoff);
    }
}

/// A stream acquired from the pool. Returned to the pool when dropped, unless there are still
/// in-flight requests on the stream, or the stream failed.
pub struct StreamGuard {
    pool: Weak<StreamPool>,
    stream: Option<BiStream>,             // Some until dropped
    can_reuse: bool,                      // returned to pool if true
    permit: Option<OwnedSemaphorePermit>, // None if pool is unbounded
}

impl StreamGuard {
    /// Sends a request on the stream and awaits the response. If the future is dropped before it
    /// resolves (e.g. due to a timeout or cancellation), the stream will be closed to cancel the
    /// request and is not returned to the pool. The same is true if the stream errors, in which
    /// case the caller can't send further requests on the stream.
    ///
    /// We only support sending a single request at a time, to eliminate head-of-line blocking. See
    /// the module documentation for details.
    ///
    /// NB: errors are often returned as `GetPageResponse::status_code` instead of `tonic::Status`
    /// to avoid tearing down the stream for per-request errors. Callers must check this.
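    ///
    /// A hypothetical call site (the request construction and status handling shown here are
    /// illustrative; only `request_id` and `status_code` are referenced by this module):
    ///
    /// ```ignore
    /// let mut stream = stream_pool.get().await?;
    /// let resp = stream.send(request).await?;     // stream/transport-level errors surface here
    /// if resp.status_code != expected_ok_status { // per-request errors; the stream stays usable
    ///     /* handle the per-request error */
    /// }
    /// ```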
    pub async fn send(
        &mut self,
        req: page_api::GetPageRequest,
    ) -> tonic::Result<page_api::GetPageResponse> {
        let req_id = req.request_id;
        let stream = self.stream.as_mut().expect("not dropped");

        // Mark the stream as not reusable while the request is in flight. We can't return the
        // stream to the pool until we receive the response, to avoid head-of-line blocking and
        // stale responses. Failed streams can't be reused either.
        if !self.can_reuse {
            return Err(tonic::Status::internal("stream can't be reused"));
        }
        self.can_reuse = false;

        // Send the request and receive the response.
        //
        // NB: this uses a watch channel, so it's unsafe to change this code to pipeline requests.
        stream
            .sender
            .send(req)
            .map_err(|_| tonic::Status::unavailable("stream closed"))?;

        let resp = stream
            .receiver
            .next()
            .await
            .ok_or_else(|| tonic::Status::unavailable("stream closed"))??;

        if resp.request_id != req_id {
            return Err(tonic::Status::internal(format!(
                "response ID {} does not match request ID {}",
                resp.request_id, req_id
            )));
        }

        // Success, mark the stream as reusable.
        self.can_reuse = true;

        Ok(resp)
    }
}

impl Drop for StreamGuard {
    fn drop(&mut self) {
        let Some(pool) = self.pool.upgrade() else {
            return; // pool was dropped
        };

        // If the stream isn't reusable, it can't be returned to the pool.
        if !self.can_reuse {
            return;
        }

        // Place the idle stream back into the pool.
        let entry = StreamEntry {
            stream: self.stream.take().expect("dropped once"),
            idle_since: Instant::now(),
        };
        pool.idle
            .lock()
            .unwrap()
            .insert(entry.stream.client.id, entry);

        _ = self.permit; // returned on drop, referenced for visibility
    }
}

/// Periodically reaps idle resources from a pool.
struct Reaper {
    /// The task check interval.
    interval: Duration,
    /// The threshold for reaping idle resources.
    threshold: Duration,
    /// Cancels the reaper task. Cancelled when the reaper is dropped.
    cancel: CancellationToken,
}

impl Reaper {
    /// Creates a new reaper.
    pub fn new(threshold: Duration, interval: Duration) -> Self {
        Self {
            cancel: CancellationToken::new(),
            threshold,
            interval,
        }
    }

    /// Spawns a task to periodically reap idle resources from the given resource pool. The task is
    /// cancelled when the reaper is dropped.
    pub fn spawn(&self, pool: &Arc<impl Reapable>) {
        // NB: hold a weak pool reference, otherwise the task will prevent dropping the pool.
        let pool = Arc::downgrade(pool);
        let cancel = self.cancel.clone();
        let (interval, threshold) = (self.interval, self.threshold);

        tokio::spawn(async move {
            loop {
                tokio::select! {
                    _ = tokio::time::sleep(interval) => {
                        let Some(pool) = pool.upgrade() else {
                            return; // pool was dropped
                        };
                        pool.reap_idle(Instant::now() - threshold);
                    }

                    _ = cancel.cancelled() => return,
                }
            }
        });
    }
}

impl Drop for Reaper {
    fn drop(&mut self) {
        self.cancel.cancel(); // cancel reaper task
    }
}

/// A reapable resource pool.
trait Reapable: Send + Sync + 'static {
    /// Reaps resources that have been idle since before the given cutoff.
    fn reap_idle(&self, cutoff: Instant);
}