//! This module provides various Pageserver gRPC client resource pools.
//!
//! These pools are designed to reuse gRPC resources (connections, clients, and streams) across
//! multiple concurrent callers (i.e. Postgres backends). This avoids the resource cost and latency
//! of creating dedicated TCP connections and server tasks for every Postgres backend.
//!
//! Each resource has its own nested pool. The pools are custom-built for the properties of each
//! resource -- they are different enough that a generic pool isn't suitable.
//!
//! * ChannelPool: manages gRPC channels (TCP connections) to a single Pageserver. Multiple clients
//!   can acquire and use the same channel concurrently (via HTTP/2 stream multiplexing), up to a
//!   per-channel client limit. Channels may be closed when they are no longer used by any clients.
//!
//! * ClientPool: manages gRPC clients for a single tenant shard. Each client acquires a (shared)
//!   channel from the ChannelPool for the client's lifetime. A client can only be acquired by a
//!   single caller at a time, and is returned to the pool when dropped. Idle clients may be removed
//!   from the pool after some time, to free up the channel.
//!
//! * StreamPool: manages bidirectional gRPC GetPage streams. Each stream acquires a client from the
//!   ClientPool for the stream's lifetime. Internal streams are not exposed to callers; instead, the
//!   pool returns a guard that can be used to send a single request, which properly enforces queue
//!   depth and routes responses. Internally, the pool will reuse or spin up a suitable stream for
//!   the request, possibly pipelining multiple requests from multiple callers on the same stream (up
//!   to some queue depth). Idle streams may be removed from the pool after a while to free up the
//!   client.
//!
//! Each channel corresponds to one TCP connection. Each client unary request and each stream
//! corresponds to one HTTP/2 stream and server task.
//!
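//! # Example
//!
//! A sketch of how the pools nest (not a doctest; assumes `tenant_id`, `timeline_id`, `shard_id`,
//! and a GetPage `request` are in scope, and that the placeholder endpoint hosts the tenant shard):
//!
//! ```ignore
//! // One channel pool per Pageserver, shared by all client pools for that Pageserver.
//! let channel_pool = ChannelPool::new("http://pageserver:51051", NonZero::new(16).unwrap())?;
//!
//! // A dedicated, unbounded client pool for the stream pool (see `StreamPool::new`).
//! let client_pool = ClientPool::new(
//!     channel_pool.clone(),
//!     tenant_id,
//!     timeline_id,
//!     shard_id,
//!     None,                            // no auth token
//!     Some(CompressionEncoding::Zstd), // assumed compression choice
//!     None,                            // unbounded; StreamPool enforces its own limits
//! );
//!
//! // Multiplexes GetPage requests onto shared bidirectional streams.
//! let stream_pool = StreamPool::new(client_pool, NonZero::new(64), NonZero::new(2).unwrap());
//!
//! // Each guard is valid for exactly one request, which enforces queue depth.
//! let resp = stream_pool.get().await.send(request).await?;
//! ```
//!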
//! TODO: error handling (including custom error types).
//! TODO: observability.

use std::collections::{BTreeMap, HashMap};
use std::num::NonZero;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, Weak};
use std::time::{Duration, Instant};

use futures::StreamExt as _;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{OwnedSemaphorePermit, Semaphore, mpsc, oneshot};
use tokio_util::sync::CancellationToken;
use tonic::codec::CompressionEncoding;
use tonic::transport::{Channel, Endpoint};
use tracing::{error, warn};

use pageserver_page_api as page_api;
use utils::id::{TenantId, TimelineId};
use utils::shard::ShardIndex;

/// Reap channels/clients/streams that have been idle for this long.
///
/// TODO: this is per-pool. For nested pools, it can take up to 3x as long for a TCP connection to
/// be reaped (e.g. up to 9 minutes with a 3-minute threshold). First, we must wait for an idle
/// stream to be reaped, which marks its client as idle. Then, we must wait for the idle client to
/// be reaped, which marks its channel as idle. Then, we must wait for the idle channel to be
/// reaped. Is that a problem? Maybe not, we just have to account for it when setting the reap
/// threshold. Alternatively, we can immediately reap empty channels, and/or stream pool clients.
const REAP_IDLE_THRESHOLD: Duration = match cfg!(any(test, feature = "testing")) {
    false => Duration::from_secs(180),
    true => Duration::from_secs(1), // exercise reaping in tests
};

/// Reap idle resources with this interval.
const REAP_IDLE_INTERVAL: Duration = match cfg!(any(test, feature = "testing")) {
    false => Duration::from_secs(10),
    true => Duration::from_secs(1), // exercise reaping in tests
};

/// A gRPC channel pool, for a single Pageserver. A channel is shared by many clients (via HTTP/2
/// stream multiplexing), up to `max_clients_per_channel` -- a new channel will be spun up beyond
/// this. The pool does not limit the number of channels, and instead relies on `ClientPool` or
/// `StreamPool` to limit the number of concurrent clients.
///
/// The pool is always wrapped in an outer `Arc`, to allow long-lived guards across tasks/threads.
///
/// TODO: consider prewarming a set of channels, to avoid initial connection latency.
/// TODO: consider adding a circuit breaker to fail fast on errors.
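///
/// # Example
///
/// A minimal sketch (not a doctest; the endpoint is a placeholder):
///
/// ```ignore
/// let pool = ChannelPool::new("http://pageserver:51051", NonZero::new(16).unwrap())?;
/// let mut guard = pool.get(); // never blocks; the channel connects lazily on first use
/// let channel = guard.take(); // owned channel, as required by the gRPC client
/// // ... use `channel`, holding `guard` for as long as the channel is in use ...
/// drop(guard); // decrements the channel's client count, possibly marking it idle
/// ```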
pub struct ChannelPool {
    /// Pageserver endpoint to connect to.
    endpoint: Endpoint,
    /// Max number of clients per channel. Beyond this, a new channel will be created.
    max_clients_per_channel: NonZero<usize>,
    /// Open channels.
    channels: Mutex<BTreeMap<ChannelID, ChannelEntry>>,
    /// Reaps idle channels.
    idle_reaper: Reaper,
    /// Channel ID generator.
    next_channel_id: AtomicUsize,
}

type ChannelID = usize;

struct ChannelEntry {
    /// The gRPC channel (i.e. TCP connection). Shared by multiple clients.
    channel: Channel,
    /// Number of clients using this channel.
    clients: usize,
    /// The channel has been idle (no clients) since this time. None if channel is in use.
    /// INVARIANT: Some if clients == 0, otherwise None.
    idle_since: Option<Instant>,
}

impl ChannelPool {
    /// Creates a new channel pool for the given Pageserver endpoint.
    pub fn new<E>(endpoint: E, max_clients_per_channel: NonZero<usize>) -> anyhow::Result<Arc<Self>>
    where
        E: TryInto<Endpoint> + Send + Sync + 'static,
        <E as TryInto<Endpoint>>::Error: std::error::Error + Send + Sync,
    {
        let pool = Arc::new(Self {
            endpoint: endpoint.try_into()?,
            max_clients_per_channel,
            channels: Mutex::default(),
            idle_reaper: Reaper::new(REAP_IDLE_THRESHOLD, REAP_IDLE_INTERVAL),
            next_channel_id: AtomicUsize::default(),
        });
        pool.idle_reaper.spawn(&pool);
        Ok(pool)
    }

    /// Acquires a gRPC channel for a client. Multiple clients may acquire the same channel.
    ///
    /// This never blocks (except for mutex acquisition). The channel is connected lazily on first
    /// use, and the `ChannelPool` does not have a channel limit. Channels will be re-established
    /// automatically on failure (TODO: verify).
    ///
    /// Callers should not clone the returned channel, and must hold onto the returned guard as long
    /// as the channel is in use. It is unfortunately not possible to enforce this: the Protobuf
    /// client requires an owned `Channel` and we don't have access to the channel's internal
    /// refcount.
    ///
    /// This is not performance-sensitive. It is only called when creating a new client, and clients
    /// are pooled and reused by `ClientPool`. The total number of channels will also be small. O(n)
    /// performance is therefore okay.
    pub fn get(self: &Arc<Self>) -> ChannelGuard {
        let mut channels = self.channels.lock().unwrap();

        // Try to find an existing channel with available capacity. We check entries in BTreeMap
        // order, to fill up the lower-ordered channels first. The ClientPool also prefers clients
        // with lower-ordered channel IDs first. This will cluster clients in lower-ordered
        // channels, and free up higher-ordered channels such that they can be reaped.
        for (&id, entry) in channels.iter_mut() {
            assert!(
                entry.clients <= self.max_clients_per_channel.get(),
                "channel overflow"
            );
            assert_eq!(
                entry.idle_since.is_some(),
                entry.clients == 0,
                "incorrect channel idle state"
            );
            if entry.clients < self.max_clients_per_channel.get() {
                entry.clients += 1;
                entry.idle_since = None;
                return ChannelGuard {
                    pool: Arc::downgrade(self),
                    id,
                    channel: Some(entry.channel.clone()),
                };
            }
        }

        // Create a new channel. We connect lazily on first use, such that we don't block here and
        // other clients can join onto the same channel while it's connecting.
        let channel = self.endpoint.connect_lazy();

        let id = self.next_channel_id.fetch_add(1, Ordering::Relaxed);
        let entry = ChannelEntry {
            channel: channel.clone(),
            clients: 1, // account for the guard below
            idle_since: None,
        };
        channels.insert(id, entry);

        ChannelGuard {
            pool: Arc::downgrade(self),
            id,
            channel: Some(channel),
        }
    }
}

impl Reapable for ChannelPool {
    /// Reaps channels that have been idle since before the cutoff.
    fn reap_idle(&self, cutoff: Instant) {
        self.channels.lock().unwrap().retain(|_, entry| {
            let Some(idle_since) = entry.idle_since else {
                assert_ne!(entry.clients, 0, "empty channel not marked idle");
                return true;
            };
            assert_eq!(entry.clients, 0, "idle channel has clients");
            idle_since >= cutoff
        })
    }
}

/// Tracks a channel acquired from the pool. The owned inner channel can be obtained with `take()`,
/// since the gRPC client requires an owned `Channel`.
pub struct ChannelGuard {
    pool: Weak<ChannelPool>,
    id: ChannelID,
    channel: Option<Channel>,
}

impl ChannelGuard {
    /// Returns the inner owned channel. Panics if called more than once. The caller must hold onto
    /// the guard as long as the channel is in use, and should not clone it.
    pub fn take(&mut self) -> Channel {
        self.channel.take().expect("channel already taken")
    }
}

/// Returns the channel to the pool.
impl Drop for ChannelGuard {
    fn drop(&mut self) {
        let Some(pool) = self.pool.upgrade() else {
            return; // pool was dropped
        };

        let mut channels = pool.channels.lock().unwrap();
        let entry = channels.get_mut(&self.id).expect("unknown channel");
        assert!(entry.idle_since.is_none(), "active channel marked idle");
        assert!(entry.clients > 0, "channel underflow");
        entry.clients -= 1;
        if entry.clients == 0 {
            entry.idle_since = Some(Instant::now()); // mark channel as idle
        }
    }
}

/// A pool of gRPC clients for a single tenant shard. Each client acquires a channel from the inner
/// `ChannelPool`. A client is only given out to a single caller at a time. The pool limits the
/// total number of concurrent clients to `max_clients` via a semaphore.
///
/// The pool is always wrapped in an outer `Arc`, to allow long-lived guards across tasks/threads.
pub struct ClientPool {
    /// Tenant ID.
    tenant_id: TenantId,
    /// Timeline ID.
    timeline_id: TimelineId,
    /// Shard ID.
    shard_id: ShardIndex,
    /// Authentication token, if any.
    auth_token: Option<String>,
    /// Compression to use.
    compression: Option<CompressionEncoding>,
    /// Channel pool to acquire channels from.
    channel_pool: Arc<ChannelPool>,
    /// Limits the max number of concurrent clients for this pool. None if the pool is unbounded.
    limiter: Option<Arc<Semaphore>>,
    /// Idle pooled clients. Acquired clients are removed from here and returned on drop.
    ///
    /// The first client in the map will be acquired next. The map is sorted by client ID, which is
    /// ordered by channel ID first, such that we prefer acquiring idle clients from lower-ordered
    /// channels. This allows us to free up and reap higher-ordered channels as idle clients are
    /// reaped.
    idle: Mutex<BTreeMap<ClientID, ClientEntry>>,
    /// Reaps idle clients.
    idle_reaper: Reaper,
    /// Unique client ID generator.
    next_client_id: AtomicUsize,
}

type ClientID = (ChannelID, usize);

struct ClientEntry {
    /// The pooled gRPC client.
    client: page_api::Client,
    /// The channel guard for the channel used by the client.
    channel_guard: ChannelGuard,
    /// The client has been idle since this time. All clients in `ClientPool::idle` are idle by
    /// definition, so this is the time when it was added back to the pool.
    idle_since: Instant,
}

impl ClientPool {
    /// Creates a new client pool for the given tenant shard. Channels are acquired from the given
    /// `ChannelPool`, which must point to a Pageserver that hosts the tenant shard. Allows up to
    /// `max_clients` concurrent clients, or unbounded if None.
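    ///
    /// # Example
    ///
    /// A sketch (not a doctest; the IDs, token, and channel pool are assumed to be in scope):
    ///
    /// ```ignore
    /// let client_pool = ClientPool::new(
    ///     channel_pool,
    ///     tenant_id,
    ///     timeline_id,
    ///     shard_id,
    ///     Some(auth_token),
    ///     Some(CompressionEncoding::Zstd),
    ///     NonZero::new(64), // allow up to 64 concurrent clients
    /// );
    /// ```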
    pub fn new(
        channel_pool: Arc<ChannelPool>,
        tenant_id: TenantId,
        timeline_id: TimelineId,
        shard_id: ShardIndex,
        auth_token: Option<String>,
        compression: Option<CompressionEncoding>,
        max_clients: Option<NonZero<usize>>,
    ) -> Arc<Self> {
        let pool = Arc::new(Self {
            tenant_id,
            timeline_id,
            shard_id,
            auth_token,
            compression,
            channel_pool,
            idle: Mutex::default(),
            idle_reaper: Reaper::new(REAP_IDLE_THRESHOLD, REAP_IDLE_INTERVAL),
            limiter: max_clients.map(|max| Arc::new(Semaphore::new(max.get()))),
            next_client_id: AtomicUsize::default(),
        });
        pool.idle_reaper.spawn(&pool);
        pool
    }

    /// Gets a client from the pool, or creates a new one if necessary. Connections are established
    /// lazily and do not block, but this call can block if the pool is at `max_clients`. The client
    /// is returned to the pool when the guard is dropped.
    ///
    /// This is moderately performance-sensitive. It is called for every unary request, but these
    /// establish a new gRPC stream per request so they're already expensive. GetPage requests use
    /// the `StreamPool` instead.
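    ///
    /// # Example
    ///
    /// A sketch (not a doctest; `some_unary_request` is a hypothetical stand-in for any unary
    /// method on `page_api::Client`):
    ///
    /// ```ignore
    /// let mut client = client_pool.get().await?; // may block if the pool is at `max_clients`
    /// let resp = client.some_unary_request(req).await?; // guard derefs to `page_api::Client`
    /// drop(client); // returns the client to the pool for reuse
    /// ```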
    pub async fn get(self: &Arc<Self>) -> anyhow::Result<ClientGuard> {
        // Acquire a permit if the pool is bounded.
        let mut permit = None;
        if let Some(limiter) = self.limiter.clone() {
            permit = Some(limiter.acquire_owned().await.expect("never closed"));
        }

        // Fast path: acquire an idle client from the pool.
        if let Some((id, entry)) = self.idle.lock().unwrap().pop_first() {
            return Ok(ClientGuard {
                pool: Arc::downgrade(self),
                id,
                client: Some(entry.client),
                channel_guard: Some(entry.channel_guard),
                permit,
            });
        }

        // Slow path: construct a new client.
        let mut channel_guard = self.channel_pool.get();
        let client = page_api::Client::new(
            channel_guard.take(),
            self.tenant_id,
            self.timeline_id,
            self.shard_id,
            self.auth_token.clone(),
            self.compression,
        )?;

        Ok(ClientGuard {
            pool: Arc::downgrade(self),
            id: (
                channel_guard.id,
                self.next_client_id.fetch_add(1, Ordering::Relaxed),
            ),
            client: Some(client),
            channel_guard: Some(channel_guard),
            permit,
        })
    }
}

impl Reapable for ClientPool {
    /// Reaps clients that have been idle since before the cutoff.
    fn reap_idle(&self, cutoff: Instant) {
        self.idle
            .lock()
            .unwrap()
            .retain(|_, entry| entry.idle_since >= cutoff)
    }
}

/// A client acquired from the pool. The inner client can be accessed via Deref. The client is
/// returned to the pool when dropped.
pub struct ClientGuard {
    pool: Weak<ClientPool>,
    id: ClientID,
    client: Option<page_api::Client>,     // Some until dropped
    channel_guard: Option<ChannelGuard>,  // Some until dropped
    permit: Option<OwnedSemaphorePermit>, // None if pool is unbounded
}

impl Deref for ClientGuard {
    type Target = page_api::Client;

    fn deref(&self) -> &Self::Target {
        self.client.as_ref().expect("not dropped")
    }
}

impl DerefMut for ClientGuard {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.client.as_mut().expect("not dropped")
    }
}

/// Returns the client to the pool.
impl Drop for ClientGuard {
    fn drop(&mut self) {
        let Some(pool) = self.pool.upgrade() else {
            return; // pool was dropped
        };

        let entry = ClientEntry {
            client: self.client.take().expect("dropped once"),
            channel_guard: self.channel_guard.take().expect("dropped once"),
            idle_since: Instant::now(),
        };
        pool.idle.lock().unwrap().insert(self.id, entry);

        _ = self.permit; // returned on drop, referenced for visibility
    }
}

/// A pool of bidirectional gRPC streams. Currently only used for GetPage streams. Each stream
/// acquires a client from the inner `ClientPool` for the stream's lifetime.
///
/// Individual streams are not exposed to callers -- instead, the returned guard can be used to send
/// a single request and await the response. Internally, requests are multiplexed across streams and
/// channels. This allows proper queue depth enforcement and response routing.
///
/// TODO: consider making this generic over request and response types; not currently needed.
pub struct StreamPool {
    /// The client pool to acquire clients from. Must be unbounded.
    client_pool: Arc<ClientPool>,
    /// All pooled streams.
    ///
    /// Incoming requests will be sent over an existing stream with available capacity. If all
    /// streams are full, a new one is spun up and added to the pool (up to `max_streams`). Each
    /// stream has an associated Tokio task that processes requests and responses.
    streams: Mutex<HashMap<StreamID, StreamEntry>>,
    /// The max number of concurrent streams, or None if unbounded.
    max_streams: Option<NonZero<usize>>,
    /// The max number of concurrent requests per stream.
    max_queue_depth: NonZero<usize>,
    /// Limits the max number of concurrent requests, given by `max_streams * max_queue_depth`.
    /// None if the pool is unbounded.
    limiter: Option<Arc<Semaphore>>,
    /// Reaps idle streams.
    idle_reaper: Reaper,
    /// Stream ID generator.
    next_stream_id: AtomicUsize,
}

type StreamID = usize;
type RequestSender = Sender<(page_api::GetPageRequest, ResponseSender)>;
type RequestReceiver = Receiver<(page_api::GetPageRequest, ResponseSender)>;
type ResponseSender = oneshot::Sender<tonic::Result<page_api::GetPageResponse>>;

struct StreamEntry {
    /// Sends caller requests to the stream task. The stream task exits when this is dropped.
    sender: RequestSender,
    /// Number of in-flight requests on this stream.
    queue_depth: usize,
    /// The time when this stream went idle (queue_depth == 0).
    /// INVARIANT: Some if queue_depth == 0, otherwise None.
    idle_since: Option<Instant>,
}

impl StreamPool {
    /// Creates a new stream pool, using the given client pool. It will send up to `max_queue_depth`
    /// concurrent requests on each stream, and use up to `max_streams` concurrent streams.
    ///
    /// The client pool must be unbounded. The stream pool will enforce its own limits, and because
    /// streams are long-lived they can cause persistent starvation if they exhaust the client pool.
    /// The stream pool should generally have its own dedicated client pool (but it can share a
    /// channel pool with others since these are always unbounded).
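    ///
    /// # Example
    ///
    /// A sketch (not a doctest): a dedicated, unbounded client pool for the streams, sharing a
    /// channel pool with other client pools:
    ///
    /// ```ignore
    /// let stream_client_pool = ClientPool::new(
    ///     channel_pool.clone(), // shared; channel pools don't limit clients
    ///     tenant_id,
    ///     timeline_id,
    ///     shard_id,
    ///     None,
    ///     None,
    ///     None, // must be unbounded, or `StreamPool::new` panics
    /// );
    /// let stream_pool = StreamPool::new(
    ///     stream_client_pool,
    ///     NonZero::new(64),         // max_streams
    ///     NonZero::new(2).unwrap(), // max_queue_depth
    /// );
    /// ```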
    pub fn new(
        client_pool: Arc<ClientPool>,
        max_streams: Option<NonZero<usize>>,
        max_queue_depth: NonZero<usize>,
    ) -> Arc<Self> {
        assert!(client_pool.limiter.is_none(), "bounded client pool");
        let pool = Arc::new(Self {
            client_pool,
            streams: Mutex::default(),
            limiter: max_streams.map(|max_streams| {
                Arc::new(Semaphore::new(max_streams.get() * max_queue_depth.get()))
            }),
            max_streams,
            max_queue_depth,
            idle_reaper: Reaper::new(REAP_IDLE_THRESHOLD, REAP_IDLE_INTERVAL),
            next_stream_id: AtomicUsize::default(),
        });
        pool.idle_reaper.spawn(&pool);
        pool
    }

    /// Acquires an available stream from the pool, or spins up a new stream async if all streams
    /// are full. Returns a guard that can be used to send a single request on the stream and await
    /// the response, with queue depth quota already acquired. Blocks if the pool is at capacity
    /// (i.e. `max_streams * max_queue_depth` requests in flight).
    ///
    /// This is very performance-sensitive, as it is on the GetPage hot path.
    ///
    /// TODO: this must do something more sophisticated for performance. We want:
    ///
    /// * Cheap, concurrent access in the common case where we can use a pooled stream.
    /// * Quick acquisition of pooled streams with available capacity.
    /// * Prefer streams that belong to lower-numbered channels, to reap idle channels.
    /// * Prefer filling up existing streams' queue depth before spinning up new streams.
    /// * Don't hold a lock while spinning up new streams.
    /// * Allow concurrent clients to join onto streams while they're spun up.
    /// * Allow spinning up multiple streams concurrently, but don't overshoot limits.
    ///
    /// For now, we just do something simple but inefficient (linear scan under mutex).
    pub async fn get(self: &Arc<Self>) -> StreamGuard {
        // Acquire a permit if the pool is bounded.
        let mut permit = None;
        if let Some(limiter) = self.limiter.clone() {
            permit = Some(limiter.acquire_owned().await.expect("never closed"));
        }
        let mut streams = self.streams.lock().unwrap();

        // Look for a pooled stream with available capacity.
        for (&id, entry) in streams.iter_mut() {
            assert!(
                entry.queue_depth <= self.max_queue_depth.get(),
                "stream queue overflow"
            );
            assert_eq!(
                entry.idle_since.is_some(),
                entry.queue_depth == 0,
                "incorrect stream idle state"
            );
            if entry.queue_depth < self.max_queue_depth.get() {
                entry.queue_depth += 1;
                entry.idle_since = None;
                return StreamGuard {
                    pool: Arc::downgrade(self),
                    id,
                    sender: entry.sender.clone(),
                    permit,
                };
            }
        }

        // No available stream, spin up a new one. We install the stream entry in the pool first and
        // return the guard, while spinning up the stream task async. This allows other callers to
        // join onto this stream and also create additional streams concurrently if this fills up.
        let id = self.next_stream_id.fetch_add(1, Ordering::Relaxed);
        let (req_tx, req_rx) = mpsc::channel(self.max_queue_depth.get());
        let entry = StreamEntry {
            sender: req_tx.clone(),
            queue_depth: 1, // reserve quota for this caller
            idle_since: None,
        };
        streams.insert(id, entry);

        if let Some(max_streams) = self.max_streams {
            assert!(streams.len() <= max_streams.get(), "stream overflow");
        };

        let client_pool = self.client_pool.clone();
        let pool = Arc::downgrade(self);

        tokio::spawn(async move {
            if let Err(err) = Self::run_stream(client_pool, req_rx).await {
                error!("stream failed: {err}");
            }
            // Remove stream from pool on exit. Weak reference to avoid holding the pool alive.
            if let Some(pool) = pool.upgrade() {
                let entry = pool.streams.lock().unwrap().remove(&id);
                assert!(entry.is_some(), "unknown stream ID: {id}");
            }
        });

        StreamGuard {
            pool: Arc::downgrade(self),
            id,
            sender: req_tx,
            permit,
        }
    }

    /// Runs a stream task. This acquires a client from the `ClientPool` and establishes a
    /// bidirectional GetPage stream, then forwards requests and responses between callers and the
    /// stream. It does not track or enforce queue depths -- that's done by `get()` since it must be
    /// atomic with pool stream acquisition.
    ///
    /// The task exits when the request channel is closed, or on a stream error. The caller is
    /// responsible for removing the stream from the pool on exit.
    async fn run_stream(
        client_pool: Arc<ClientPool>,
        mut caller_rx: RequestReceiver,
    ) -> anyhow::Result<()> {
        // Acquire a client from the pool and create a stream.
        let mut client = client_pool.get().await?;

        // NB: use an unbounded channel such that the stream send never blocks. Otherwise, we could
        // theoretically deadlock if both the client and server block on sends (since we're not
        // reading responses while sending). This is unlikely to happen due to gRPC/TCP buffers and
        // low queue depths, but it was seen to happen with the libpq protocol so better safe than
        // sorry. It should never buffer more than the queue depth anyway, but using an unbounded
        // channel guarantees that it will never block.
        let (req_tx, req_rx) = mpsc::unbounded_channel();
        let req_stream = tokio_stream::wrappers::UnboundedReceiverStream::new(req_rx);
        let mut resp_stream = client.get_pages(req_stream).await?;

        // Track caller response channels by request ID. If the task returns early, these response
        // channels will be dropped and the waiting callers will receive an error.
        //
        // NB: this will leak entries if the server doesn't respond to a request (by request ID).
        // It shouldn't happen, and if it does it will often hold onto queue depth quota anyway and
        // block further use. But we could consider reaping closed channels after some time.
        let mut callers = HashMap::new();

        // Process requests and responses.
        loop {
            tokio::select! {
                // Receive requests from callers and send them to the stream.
                req = caller_rx.recv() => {
                    // Shut down if request channel is closed.
                    let Some((req, resp_tx)) = req else {
                        return Ok(());
                    };

                    // Store the response channel by request ID.
                    if callers.contains_key(&req.request_id) {
                        // Error on request ID duplicates. Ignore callers that went away.
                        _ = resp_tx.send(Err(tonic::Status::invalid_argument(
                            format!("duplicate request ID: {}", req.request_id),
                        )));
                        continue;
                    }
                    callers.insert(req.request_id, resp_tx);

                    // Send the request on the stream. Bail out if the stream is closed.
                    req_tx.send(req).map_err(|_| {
                        tonic::Status::unavailable("stream closed")
                    })?;
                }

                // Receive responses from the stream and send them to callers.
                resp = resp_stream.next() => {
                    // Shut down if the stream is closed, and bail out on stream errors.
                    let Some(resp) = resp.transpose()? else {
                        return Ok(())
                    };

                    // Send the response to the caller. Ignore errors if the caller went away.
                    let Some(resp_tx) = callers.remove(&resp.request_id) else {
                        warn!("received response for unknown request ID: {}", resp.request_id);
                        continue;
                    };
                    _ = resp_tx.send(Ok(resp));
                }
            }
        }
    }
}

impl Reapable for StreamPool {
    /// Reaps streams that have been idle since before the cutoff.
    fn reap_idle(&self, cutoff: Instant) {
        self.streams.lock().unwrap().retain(|_, entry| {
            let Some(idle_since) = entry.idle_since else {
                assert_ne!(entry.queue_depth, 0, "empty stream not marked idle");
                return true;
            };
            assert_eq!(entry.queue_depth, 0, "idle stream has requests");
            idle_since >= cutoff
        });
    }
}

/// A pooled stream reference. Can be used to send a single request, to properly enforce queue
/// depth. Queue depth is already reserved and will be returned on drop.
pub struct StreamGuard {
    pool: Weak<StreamPool>,
    id: StreamID,
    sender: RequestSender,
    permit: Option<OwnedSemaphorePermit>, // None if pool is unbounded
}

impl StreamGuard {
    /// Sends a request on the stream and awaits the response. Consumes the guard, since it is only
    /// valid for a single request (to enforce queue depth); the guard is dropped on return, which
    /// releases the queue depth quota back to the pool.
    ///
    /// The `GetPageRequest::request_id` must be unique across in-flight requests.
    ///
    /// NB: errors are often returned as `GetPageResponse::status_code` instead of `tonic::Status`,
    /// to avoid tearing down the stream for per-request errors. Callers must check this.
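    ///
    /// # Example
    ///
    /// A sketch (not a doctest; the request is assumed to be constructed elsewhere, and the
    /// status-code type and its `Ok` variant are assumed names):
    ///
    /// ```ignore
    /// let resp = stream_pool.get().await.send(req).await?;
    /// if resp.status_code != page_api::GetPageStatusCode::Ok {
    ///     // Handle the per-request error carried in the response.
    /// }
    /// ```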
    pub async fn send(
        self,
        req: page_api::GetPageRequest,
    ) -> tonic::Result<page_api::GetPageResponse> {
        let (resp_tx, resp_rx) = oneshot::channel();

        self.sender
            .send((req, resp_tx))
            .await
            .map_err(|_| tonic::Status::unavailable("stream closed"))?;

        resp_rx
            .await
            .map_err(|_| tonic::Status::unavailable("stream closed"))?
    }
}

impl Drop for StreamGuard {
    fn drop(&mut self) {
        let Some(pool) = self.pool.upgrade() else {
            return; // pool was dropped
        };

        // Release the queue depth reservation on drop. This can prematurely decrement it if dropped
        // before the response is received, but that's okay.
        //
        // TODO: actually, it's probably not okay. Queue depth release should be moved into the
        // stream task, such that it continues to account for the queue depth slot until the server
        // responds. Otherwise, if a slow request times out and keeps blocking the stream, the
        // server will keep waiting on it and we can pile on subsequent requests (including the
        // timeout retry) in the same stream and get blocked. But we may also want to avoid blocking
        // requests on e.g. LSN waits and layer downloads, instead returning early to free up the
        // stream. Or just scale out streams with a queue depth of 1 to sidestep all head-of-line
        // blocking. TBD.
        let mut streams = pool.streams.lock().unwrap();
        let entry = streams.get_mut(&self.id).expect("unknown stream");
        assert!(entry.idle_since.is_none(), "active stream marked idle");
        assert!(entry.queue_depth > 0, "stream queue underflow");
        entry.queue_depth -= 1;
        if entry.queue_depth == 0 {
            entry.idle_since = Some(Instant::now()); // mark stream as idle
        }

        _ = self.permit; // returned on drop, referenced for visibility
    }
}

/// Periodically reaps idle resources from a pool.
struct Reaper {
    /// The task check interval.
    interval: Duration,
    /// The threshold for reaping idle resources.
    threshold: Duration,
    /// Cancels the reaper task. Cancelled when the reaper is dropped.
    cancel: CancellationToken,
}

impl Reaper {
    /// Creates a new reaper.
    pub fn new(threshold: Duration, interval: Duration) -> Self {
        Self {
            cancel: CancellationToken::new(),
            threshold,
            interval,
        }
    }

    /// Spawns a task that periodically reaps idle resources from the given pool. The task is
    /// cancelled when the reaper is dropped.
    pub fn spawn(&self, pool: &Arc<impl Reapable>) {
        // NB: hold a weak pool reference, otherwise the task will prevent dropping the pool.
        let pool = Arc::downgrade(pool);
        let cancel = self.cancel.clone();
        let (interval, threshold) = (self.interval, self.threshold);

        tokio::spawn(async move {
            loop {
                tokio::select! {
                    _ = tokio::time::sleep(interval) => {
                        let Some(pool) = pool.upgrade() else {
                            return; // pool was dropped
                        };
                        pool.reap_idle(Instant::now() - threshold);
                    }

                    _ = cancel.cancelled() => return,
                }
            }
        });
    }
}

impl Drop for Reaper {
    fn drop(&mut self) {
        self.cancel.cancel(); // cancel reaper task
    }
}

/// A reapable resource pool.
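///
/// # Example
///
/// A minimal implementation sketch for a hypothetical pool, mirroring the pools above:
///
/// ```ignore
/// struct MyPool {
///     /// Idle-since timestamps of idle entries.
///     idle: Mutex<Vec<Instant>>,
/// }
///
/// impl Reapable for MyPool {
///     fn reap_idle(&self, cutoff: Instant) {
///         // Keep only entries that went idle at or after the cutoff.
///         self.idle.lock().unwrap().retain(|&idle_since| idle_since >= cutoff);
///     }
/// }
/// ```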
trait Reapable: Send + Sync + 'static {
    /// Reaps resources that have been idle since before the given cutoff.
    fn reap_idle(&self, cutoff: Instant);
}