//! This module provides various Pageserver gRPC client resource pools.
//!
//! These pools are designed to reuse gRPC resources (connections, clients, and streams) across
//! multiple concurrent callers (i.e. Postgres backends). This avoids the resource cost and latency
//! of creating dedicated TCP connections and server tasks for every Postgres backend.
//!
//! Each resource has its own, nested pool. The pools are custom-built for the properties of each
//! resource -- they are different enough that a generic pool isn't suitable.
//!
//! * ChannelPool: manages gRPC channels (TCP connections) to a single Pageserver. Multiple clients
//!   can acquire and use the same channel concurrently (via HTTP/2 stream multiplexing), up to a
//!   per-channel client limit. Channels are closed immediately when empty, and indirectly rely on
//!   client/stream idle timeouts.
//!
//! * ClientPool: manages gRPC clients for a single tenant shard. Each client acquires a (shared)
//!   channel from the ChannelPool for the client's lifetime. A client can only be acquired by a
//!   single caller at a time, and is returned to the pool when dropped. Idle clients are removed
//!   from the pool after a while to free up resources.
//!
//! * StreamPool: manages bidirectional gRPC GetPage streams. Each stream acquires a client from the
//!   ClientPool for the stream's lifetime. A stream can only be acquired by a single caller at a
//!   time, and is returned to the pool when dropped. Idle streams are removed from the pool after
//!   a while to free up resources.
//!
//!   A stream only supports sending a single, synchronous request at a time, and does not support
//!   pipelining multiple requests from different callers onto the same stream -- instead, we scale
//!   out concurrent streams to improve throughput. There are many reasons for this design choice:
//!
//!     * It (mostly) eliminates head-of-line blocking. A single stream is processed sequentially by
//!       a single server task, which may block e.g. on layer downloads, LSN waits, etc.
//!
//!     * Cancellation becomes trivial, by closing the stream. Otherwise, if a caller goes away
//!       (e.g. because of a timeout), the request would still be processed by the server and block
//!       requests behind it in the stream. It might even block its own timeout retry.
//!
//!     * Stream scheduling becomes significantly simpler and cheaper.
//!
//!     * Individual callers can still use client-side batching for pipelining.
//!
//!     * Idle streams are cheap. Benchmarks show that an idle GetPage stream takes up about 26 KB
//!       per stream (2.5 GB for 100,000 streams), so we can afford to scale out.
//!
//! Each channel corresponds to one TCP connection. Each unary client request and each stream
//! corresponds to one HTTP/2 stream and server task.
//!
//! TODO: error handling (including custom error types).
//! TODO: observability.
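//!
//! A hypothetical end-to-end wiring of the pools (the endpoint address, IDs, and limits below are
//! illustrative assumptions, not defaults; `req` is an elided GetPage request):
//!
//! ```ignore
//! let channel_pool = ChannelPool::new("http://pageserver:51051", NonZero::new(16).unwrap())?;
//! // Unary RPCs go through a bounded client pool.
//! let client_pool = ClientPool::new(
//!     channel_pool.clone(), tenant_id, timeline_id, shard_id,
//!     None, None, Some(NonZero::new(64).unwrap()),
//! );
//! // GetPage streams get a dedicated, unbounded client pool (required by StreamPool),
//! // which may still share the channel pool.
//! let stream_client_pool = ClientPool::new(
//!     channel_pool.clone(), tenant_id, timeline_id, shard_id, None, None, None,
//! );
//! let stream_pool = StreamPool::new(stream_client_pool, Some(NonZero::new(1024).unwrap()));
//! let resp = stream_pool.get().await?.send(req).await?;
//! ```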

use std::collections::BTreeMap;
use std::num::NonZero;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, Weak};
use std::time::{Duration, Instant};

use futures::{Stream, StreamExt as _};
use tokio::sync::{OwnedSemaphorePermit, Semaphore, watch};
use tokio_stream::wrappers::WatchStream;
use tokio_util::sync::CancellationToken;
use tonic::codec::CompressionEncoding;
use tonic::transport::{Channel, Endpoint};

use pageserver_page_api as page_api;
use utils::id::{TenantId, TimelineId};
use utils::shard::ShardIndex;

/// Reap clients/streams that have been idle for this long. Channels are reaped immediately when
/// empty, and indirectly rely on the client/stream idle timeouts.
///
/// A stream's client will be reaped after 2x the idle threshold (first the stream, then the
/// client), but that's okay -- if the stream closes abruptly (e.g. due to timeout or cancellation),
/// we want to keep its client around in the pool for a while.
const REAP_IDLE_THRESHOLD: Duration = match cfg!(any(test, feature = "testing")) {
    false => Duration::from_secs(180),
    true => Duration::from_secs(1), // exercise reaping in tests
};

/// Reap idle resources with this interval.
const REAP_IDLE_INTERVAL: Duration = match cfg!(any(test, feature = "testing")) {
    false => Duration::from_secs(10),
    true => Duration::from_secs(1), // exercise reaping in tests
};
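
// Illustrative timeline (assuming the non-testing values above): a stream goes idle at t=0 and is
// reaped on the first sweep after t=180s (sweeps run every 10s); only then is its client returned
// to the ClientPool's idle set, so the client is reaped around t=360s. This is the "2x the idle
// threshold" for a stream's client mentioned above.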

/// A gRPC channel pool, for a single Pageserver. A channel is shared by many clients (via HTTP/2
/// stream multiplexing), up to `max_clients_per_channel` -- a new channel will be spun up beyond
/// this. The pool does not limit the number of channels, and instead relies on `ClientPool` or
/// `StreamPool` to limit the number of concurrent clients.
///
/// The pool is always wrapped in an outer `Arc`, to allow long-lived guards across tasks/threads.
///
/// TODO: consider prewarming a set of channels, to avoid initial connection latency.
/// TODO: consider adding a circuit breaker, to fail fast on errors.
pub struct ChannelPool {
    /// Pageserver endpoint to connect to.
    endpoint: Endpoint,
    /// Max number of clients per channel. Beyond this, a new channel will be created.
    max_clients_per_channel: NonZero<usize>,
    /// Open channels.
    channels: Mutex<BTreeMap<ChannelID, ChannelEntry>>,
    /// Channel ID generator.
    next_channel_id: AtomicUsize,
}

type ChannelID = usize;

struct ChannelEntry {
    /// The gRPC channel (i.e. TCP connection). Shared by multiple clients.
    channel: Channel,
    /// Number of clients using this channel.
    clients: usize,
}
impl ChannelPool {
    /// Creates a new channel pool for the given Pageserver endpoint.
    pub fn new<E>(endpoint: E, max_clients_per_channel: NonZero<usize>) -> anyhow::Result<Arc<Self>>
    where
        E: TryInto<Endpoint> + Send + Sync + 'static,
        <E as TryInto<Endpoint>>::Error: std::error::Error + Send + Sync,
    {
        Ok(Arc::new(Self {
            endpoint: endpoint.try_into()?,
            max_clients_per_channel,
            channels: Mutex::default(),
            next_channel_id: AtomicUsize::default(),
        }))
    }

    /// Acquires a gRPC channel for a client. Multiple clients may acquire the same channel.
    ///
    /// This never blocks (except for mutex acquisition). The channel is connected lazily on first
    /// use, and the `ChannelPool` does not have a channel limit. Channels will be re-established
    /// automatically on failure (TODO: verify).
    ///
    /// Callers should not clone the returned channel, and must hold onto the returned guard as long
    /// as the channel is in use. It is unfortunately not possible to enforce this: the Protobuf
    /// client requires an owned `Channel` and we don't have access to the channel's internal
    /// refcount.
    ///
    /// This is not performance-sensitive. It is only called when creating a new client, and clients
    /// are pooled and reused by `ClientPool`. The total number of channels will also be small. O(n)
    /// performance is therefore okay.
    pub fn get(self: &Arc<Self>) -> ChannelGuard {
        let mut channels = self.channels.lock().unwrap();

        // Try to find an existing channel with available capacity. We check entries in BTreeMap
        // order, to fill up the lower-ordered channels first. The client/stream pools also prefer
        // clients with lower-ordered channel IDs first. This will cluster clients in lower-ordered
        // channels, and free up higher-ordered channels such that they can be reaped.
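        //
        // For example (illustrative counts): with `max_clients_per_channel` = 8 and current
        // client counts {channel 0: 8, channel 1: 3}, channel 0 is full, so the new client
        // joins channel 1. Only if all existing channels are full do we create a new one below.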
        for (&id, entry) in channels.iter_mut() {
            assert!(
                entry.clients <= self.max_clients_per_channel.get(),
                "channel overflow"
            );
            assert_ne!(entry.clients, 0, "empty channel not reaped");
            if entry.clients < self.max_clients_per_channel.get() {
                entry.clients += 1;
                return ChannelGuard {
                    pool: Arc::downgrade(self),
                    id,
                    channel: Some(entry.channel.clone()),
                };
            }
        }

        // Create a new channel. We connect lazily on first use, such that we don't block here and
        // other clients can join onto the same channel while it's connecting.
        let channel = self.endpoint.connect_lazy();

        let id = self.next_channel_id.fetch_add(1, Ordering::Relaxed);
        let entry = ChannelEntry {
            channel: channel.clone(),
            clients: 1, // account for the guard below
        };
        channels.insert(id, entry);

        ChannelGuard {
            pool: Arc::downgrade(self),
            id,
            channel: Some(channel),
        }
    }
}

/// Tracks a channel acquired from the pool. The owned inner channel can be obtained with `take()`,
/// since the gRPC client requires an owned `Channel`.
pub struct ChannelGuard {
    pool: Weak<ChannelPool>,
    id: ChannelID,
    channel: Option<Channel>,
}

impl ChannelGuard {
    /// Returns the inner owned channel. Panics if called more than once. The caller must hold onto
    /// the guard as long as the channel is in use, and should not clone it.
    pub fn take(&mut self) -> Channel {
        self.channel.take().expect("channel already taken")
    }
}

/// Returns the channel to the pool. The channel is closed when empty.
impl Drop for ChannelGuard {
    fn drop(&mut self) {
        let Some(pool) = self.pool.upgrade() else {
            return; // pool was dropped
        };

        let mut channels = pool.channels.lock().unwrap();
        let entry = channels.get_mut(&self.id).expect("unknown channel");
        assert!(entry.clients > 0, "channel underflow");
        entry.clients -= 1;

        // Reap empty channels immediately.
        if entry.clients == 0 {
            channels.remove(&self.id);
        }
    }
}

/// A pool of gRPC clients for a single tenant shard. Each client acquires a channel from the inner
/// `ChannelPool`. A client is only given out to a single caller at a time. The pool limits the
/// total number of concurrent clients to `max_clients` via a semaphore.
///
/// The pool is always wrapped in an outer `Arc`, to allow long-lived guards across tasks/threads.
pub struct ClientPool {
    /// Tenant ID.
    tenant_id: TenantId,
    /// Timeline ID.
    timeline_id: TimelineId,
    /// Shard ID.
    shard_id: ShardIndex,
    /// Authentication token, if any.
    auth_token: Option<String>,
    /// Compression to use.
    compression: Option<CompressionEncoding>,
    /// Channel pool to acquire channels from.
    channel_pool: Arc<ChannelPool>,
    /// Limits the max number of concurrent clients for this pool. None if the pool is unbounded.
    limiter: Option<Arc<Semaphore>>,
    /// Idle pooled clients. Acquired clients are removed from here and returned on drop.
    ///
    /// The first client in the map will be acquired next. The map is sorted by client ID, which in
    /// turn is sorted by its channel ID, such that we prefer acquiring idle clients from
    /// lower-ordered channels. This allows us to free up and reap higher-ordered channels.
    idle: Mutex<BTreeMap<ClientID, ClientEntry>>,
    /// Reaps idle clients.
    idle_reaper: Reaper,
    /// Unique client ID generator.
    next_client_id: AtomicUsize,
}

type ClientID = (ChannelID, usize);

struct ClientEntry {
    /// The pooled gRPC client.
    client: page_api::Client,
    /// The channel guard for the channel used by the client.
    channel_guard: ChannelGuard,
    /// The client has been idle since this time. All clients in `ClientPool::idle` are idle by
    /// definition, so this is the time when it was added back to the pool.
    idle_since: Instant,
}

impl ClientPool {
    /// Creates a new client pool for the given tenant shard. Channels are acquired from the given
    /// `ChannelPool`, which must point to a Pageserver that hosts the tenant shard. Allows up to
    /// `max_clients` concurrent clients, or unbounded if None.
    pub fn new(
        channel_pool: Arc<ChannelPool>,
        tenant_id: TenantId,
        timeline_id: TimelineId,
        shard_id: ShardIndex,
        auth_token: Option<String>,
        compression: Option<CompressionEncoding>,
        max_clients: Option<NonZero<usize>>,
    ) -> Arc<Self> {
        let pool = Arc::new(Self {
            tenant_id,
            timeline_id,
            shard_id,
            auth_token,
            compression,
            channel_pool,
            idle: Mutex::default(),
            idle_reaper: Reaper::new(REAP_IDLE_THRESHOLD, REAP_IDLE_INTERVAL),
            limiter: max_clients.map(|max| Arc::new(Semaphore::new(max.get()))),
            next_client_id: AtomicUsize::default(),
        });
        pool.idle_reaper.spawn(&pool);
        pool
    }

    /// Gets a client from the pool, or creates a new one if necessary. Connections are established
    /// lazily and do not block, but this call can block if the pool is at `max_clients`. The client
    /// is returned to the pool when the guard is dropped.
    ///
    /// This is moderately performance-sensitive. It is called for every unary request, but these
    /// establish a new gRPC stream per request so they're already expensive. GetPage requests use
    /// the `StreamPool` instead.
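    ///
    /// A hypothetical call pattern (`some_unary_rpc` is an illustrative stand-in, not an actual
    /// `page_api::Client` method):
    ///
    /// ```ignore
    /// let mut client = client_pool.get().await?;    // may wait for a semaphore permit
    /// let resp = client.some_unary_rpc(req).await?; // derefs to page_api::Client
    /// drop(client);                                 // returns the client to the pool
    /// ```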
    pub async fn get(self: &Arc<Self>) -> tonic::Result<ClientGuard> {
        // Acquire a permit if the pool is bounded.
        let mut permit = None;
        if let Some(limiter) = self.limiter.clone() {
            permit = Some(limiter.acquire_owned().await.expect("never closed"));
        }

        // Fast path: acquire an idle client from the pool.
        if let Some((id, entry)) = self.idle.lock().unwrap().pop_first() {
            return Ok(ClientGuard {
                pool: Arc::downgrade(self),
                id,
                client: Some(entry.client),
                channel_guard: Some(entry.channel_guard),
                permit,
            });
        }

        // Construct a new client.
        let mut channel_guard = self.channel_pool.get();
        let client = page_api::Client::new(
            channel_guard.take(),
            self.tenant_id,
            self.timeline_id,
            self.shard_id,
            self.auth_token.clone(),
            self.compression,
        )
        .map_err(|err| tonic::Status::internal(format!("failed to create client: {err}")))?;

        Ok(ClientGuard {
            pool: Arc::downgrade(self),
            id: (
                channel_guard.id,
                self.next_client_id.fetch_add(1, Ordering::Relaxed),
            ),
            client: Some(client),
            channel_guard: Some(channel_guard),
            permit,
        })
    }
}

impl Reapable for ClientPool {
    /// Reaps clients that have been idle since before the cutoff.
    fn reap_idle(&self, cutoff: Instant) {
        self.idle
            .lock()
            .unwrap()
            .retain(|_, entry| entry.idle_since >= cutoff)
    }
}

/// A client acquired from the pool. The inner client can be accessed via Deref. The client is
/// returned to the pool when dropped.
pub struct ClientGuard {
    pool: Weak<ClientPool>,
    id: ClientID,
    client: Option<page_api::Client>,     // Some until dropped
    channel_guard: Option<ChannelGuard>,  // Some until dropped
    permit: Option<OwnedSemaphorePermit>, // None if pool is unbounded
}

impl Deref for ClientGuard {
    type Target = page_api::Client;

    fn deref(&self) -> &Self::Target {
        self.client.as_ref().expect("not dropped")
    }
}

impl DerefMut for ClientGuard {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.client.as_mut().expect("not dropped")
    }
}

/// Returns the client to the pool.
impl Drop for ClientGuard {
    fn drop(&mut self) {
        let Some(pool) = self.pool.upgrade() else {
            return; // pool was dropped
        };

        let entry = ClientEntry {
            client: self.client.take().expect("dropped once"),
            channel_guard: self.channel_guard.take().expect("dropped once"),
            idle_since: Instant::now(),
        };
        pool.idle.lock().unwrap().insert(self.id, entry);

        _ = self.permit; // returned on drop, referenced for visibility
    }
}

/// A pool of bidirectional gRPC streams. Currently only used for GetPage streams. Each stream
/// acquires a client from the inner `ClientPool` for the stream's lifetime.
///
/// Individual streams only send a single request at a time, and do not pipeline multiple callers
/// onto the same stream. Instead, we scale out the number of concurrent streams. This is primarily
/// to eliminate head-of-line blocking. See the module documentation for more details.
///
/// TODO: consider making this generic over request and response types; not currently needed.
pub struct StreamPool {
    /// The client pool to acquire clients from. Must be unbounded.
    client_pool: Arc<ClientPool>,
    /// Idle pooled streams. Acquired streams are removed from here and returned on drop.
    ///
    /// The first stream in the map will be acquired next. The map is sorted by stream ID, which is
    /// equivalent to the client ID and in turn sorted by its channel ID. This way we prefer
    /// acquiring idle streams from lower-ordered channels, which allows us to free up and reap
    /// higher-ordered channels.
    idle: Mutex<BTreeMap<StreamID, StreamEntry>>,
    /// Limits the max number of concurrent streams. None if the pool is unbounded.
    limiter: Option<Arc<Semaphore>>,
    /// Reaps idle streams.
    idle_reaper: Reaper,
}

/// The stream ID. Reuses the inner client ID.
type StreamID = ClientID;

/// A pooled stream.
struct StreamEntry {
    /// The bidirectional stream.
    stream: BiStream,
    /// The time when this stream was last used, i.e. when it was put back into `StreamPool::idle`.
    idle_since: Instant,
}

/// A bidirectional GetPage stream and its client. Can send requests and receive responses.
struct BiStream {
    /// The owning client. Holds onto the channel slot while the stream is alive.
    client: ClientGuard,
    /// Stream for sending requests. Uses a watch channel, so it can only send a single request at a
    /// time, and the caller must await the response before sending another request. This is
    /// enforced by `StreamGuard::send`.
    sender: watch::Sender<page_api::GetPageRequest>,
    /// Stream for receiving responses.
    receiver: Pin<Box<dyn Stream<Item = tonic::Result<page_api::GetPageResponse>> + Send>>,
}

impl StreamPool {
    /// Creates a new stream pool, using the given client pool. It will use up to `max_streams`
    /// concurrent streams.
    ///
    /// The client pool must be unbounded. The stream pool will enforce its own limits, and because
    /// streams are long-lived they can cause persistent starvation if they exhaust the client pool.
    /// The stream pool should generally have its own dedicated client pool (but it can share a
    /// channel pool with others, since channel pools are always unbounded).
    pub fn new(client_pool: Arc<ClientPool>, max_streams: Option<NonZero<usize>>) -> Arc<Self> {
        assert!(client_pool.limiter.is_none(), "bounded client pool");
        let pool = Arc::new(Self {
            client_pool,
            idle: Mutex::default(),
            limiter: max_streams.map(|max_streams| Arc::new(Semaphore::new(max_streams.get()))),
            idle_reaper: Reaper::new(REAP_IDLE_THRESHOLD, REAP_IDLE_INTERVAL),
        });
        pool.idle_reaper.spawn(&pool);
        pool
    }
    /// Acquires an idle stream from the pool, or spins up a new stream if none are available.
    /// Returns a guard that can be used to send requests and await the responses. Blocks if the
    /// pool is at `max_streams`.
    ///
    /// This is very performance-sensitive, as it is on the GetPage hot path.
    ///
    /// TODO: is a `Mutex<BTreeMap>` performant enough? Will it become too contended? We can't
    /// trivially use e.g. DashMap or sharding, because we want to pop lower-ordered streams first
    /// to free up higher-ordered channels.
    pub async fn get(self: &Arc<Self>) -> tonic::Result<StreamGuard> {
        // Acquire a permit if the pool is bounded.
        let mut permit = None;
        if let Some(limiter) = self.limiter.clone() {
            permit = Some(limiter.acquire_owned().await.expect("never closed"));
        }

        // Fast path: acquire an idle stream from the pool.
        if let Some((_, entry)) = self.idle.lock().unwrap().pop_first() {
            return Ok(StreamGuard {
                pool: Arc::downgrade(self),
                stream: Some(entry.stream),
                can_reuse: true,
                permit,
            });
        }

        // Spin up a new stream. Uses a watch channel to send a single request at a time, since
        // `StreamGuard::send` enforces this anyway and it avoids unnecessary channel overhead.
        let mut client = self.client_pool.get().await?;

        let (req_tx, req_rx) = watch::channel(page_api::GetPageRequest::default());
        let req_stream = WatchStream::from_changes(req_rx);
        let resp_stream = client.get_pages(req_stream).await?;

        Ok(StreamGuard {
            pool: Arc::downgrade(self),
            stream: Some(BiStream {
                client,
                sender: req_tx,
                receiver: Box::pin(resp_stream),
            }),
            can_reuse: true,
            permit,
        })
    }
}

impl Reapable for StreamPool {
    /// Reaps streams that have been idle since before the cutoff.
    fn reap_idle(&self, cutoff: Instant) {
        self.idle
            .lock()
            .unwrap()
            .retain(|_, entry| entry.idle_since >= cutoff);
    }
}

/// A stream acquired from the pool. Returned to the pool when dropped, unless there are still
/// in-flight requests on the stream, or the stream failed.
pub struct StreamGuard {
    pool: Weak<StreamPool>,
    stream: Option<BiStream>,             // Some until dropped
    can_reuse: bool,                      // returned to pool if true
    permit: Option<OwnedSemaphorePermit>, // None if pool is unbounded
}

impl StreamGuard {
    /// Sends a request on the stream and awaits the response. If the future is dropped before it
    /// resolves (e.g. due to a timeout or cancellation), the stream will be closed to cancel the
    /// request and is not returned to the pool. The same is true if the stream errors, in which
    /// case the caller can't send further requests on the stream.
    ///
    /// We only support sending a single request at a time, to eliminate head-of-line blocking. See
    /// the module documentation for details.
    ///
    /// NB: errors are often returned as `GetPageResponse::status_code` instead of `tonic::Status`
    /// to avoid tearing down the stream for per-request errors. Callers must check this.
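    ///
    /// A minimal usage sketch (request construction elided):
    ///
    /// ```ignore
    /// let mut stream = stream_pool.get().await?;
    /// let resp = stream.send(req).await?; // one request in flight at a time
    /// // Check `resp.status_code` for per-request errors; the stream remains usable.
    /// ```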
    pub async fn send(
        &mut self,
        req: page_api::GetPageRequest,
    ) -> tonic::Result<page_api::GetPageResponse> {
        let req_id = req.request_id;
        let stream = self.stream.as_mut().expect("not dropped");

        // Mark the stream as not reusable while the request is in flight. We can't return the
        // stream to the pool until we receive the response, to avoid head-of-line blocking and
        // stale responses. Failed streams can't be reused either.
        if !self.can_reuse {
            return Err(tonic::Status::internal("stream can't be reused"));
        }
        self.can_reuse = false;

        // Send the request and receive the response.
        //
        // NB: this uses a watch channel, so it's unsafe to change this code to pipeline requests.
        stream
            .sender
            .send(req)
            .map_err(|_| tonic::Status::unavailable("stream closed"))?;

        let resp = stream
            .receiver
            .next()
            .await
            .ok_or_else(|| tonic::Status::unavailable("stream closed"))??;

        if resp.request_id != req_id {
            return Err(tonic::Status::internal(format!(
                "response ID {} does not match request ID {}",
                resp.request_id, req_id
            )));
        }

        // Success, mark the stream as reusable.
        self.can_reuse = true;

        Ok(resp)
    }
}

impl Drop for StreamGuard {
    fn drop(&mut self) {
        let Some(pool) = self.pool.upgrade() else {
            return; // pool was dropped
        };

        // If the stream isn't reusable, it can't be returned to the pool.
        if !self.can_reuse {
            return;
        }

        // Place the idle stream back into the pool.
        let entry = StreamEntry {
            stream: self.stream.take().expect("dropped once"),
            idle_since: Instant::now(),
        };
        pool.idle
            .lock()
            .unwrap()
            .insert(entry.stream.client.id, entry);

        _ = self.permit; // returned on drop, referenced for visibility
    }
}

/// Periodically reaps idle resources from a pool.
struct Reaper {
    /// The task check interval.
    interval: Duration,
    /// The threshold for reaping idle resources.
    threshold: Duration,
    /// Cancels the reaper task. Cancelled when the reaper is dropped.
    cancel: CancellationToken,
}

impl Reaper {
    /// Creates a new reaper.
    pub fn new(threshold: Duration, interval: Duration) -> Self {
        Self {
            cancel: CancellationToken::new(),
            threshold,
            interval,
        }
    }

    /// Spawns a task to periodically reap idle resources from the given pool. The task is
    /// cancelled when the reaper is dropped.
    pub fn spawn(&self, pool: &Arc<impl Reapable>) {
        // NB: hold a weak pool reference, otherwise the task will prevent dropping the pool.
        let pool = Arc::downgrade(pool);
        let cancel = self.cancel.clone();
        let (interval, threshold) = (self.interval, self.threshold);

        tokio::spawn(async move {
            loop {
                tokio::select! {
                    _ = tokio::time::sleep(interval) => {
                        let Some(pool) = pool.upgrade() else {
                            return; // pool was dropped
                        };
                        pool.reap_idle(Instant::now() - threshold);
                    }

                    _ = cancel.cancelled() => return,
                }
            }
        });
    }
}

impl Drop for Reaper {
    fn drop(&mut self) {
        self.cancel.cancel(); // cancel reaper task
    }
}

/// A reapable resource pool.
trait Reapable: Send + Sync + 'static {
    /// Reaps resources that have been idle since before the given cutoff.
    fn reap_idle(&self, cutoff: Instant);
}
