Line data Source code
1 : //! This module provides various Pageserver gRPC client resource pools.
2 : //!
3 : //! These pools are designed to reuse gRPC resources (connections, clients, and streams) across
4 : //! multiple concurrent callers (i.e. Postgres backends). This avoids the resource cost and latency
5 : //! of creating dedicated TCP connections and server tasks for every Postgres backend.
6 : //!
7 : //! Each resource has its own, nested pool. The pools are custom-built for the properties of each
8 : //! resource -- they are different enough that a generic pool isn't suitable.
9 : //!
10 : //! * ChannelPool: manages gRPC channels (TCP connections) to a single Pageserver. Multiple clients
11 : //! can acquire and use the same channel concurrently (via HTTP/2 stream multiplexing), up to a
12 : //! per-channel client limit. Channels may be closed when they are no longer used by any clients.
13 : //!
14 : //! * ClientPool: manages gRPC clients for a single tenant shard. Each client acquires a (shared)
15 : //! channel from the ChannelPool for the client's lifetime. A client can only be acquired by a
16 : //! single caller at a time, and is returned to the pool when dropped. Idle clients may be removed
17 : //! from the pool after some time, to free up the channel.
18 : //!
19 : //! * StreamPool: manages bidirectional gRPC GetPage streams. Each stream acquires a client from the
20 : //! ClientPool for the stream's lifetime. Internal streams are not exposed to callers; instead, the
21 : //! pool returns a guard that can be used to send a single request, to properly enforce queue depth and
22 : //! route responses. Internally, the pool will reuse or spin up a suitable stream for the request,
23 : //! possibly pipelining multiple requests from multiple callers on the same stream (up to some
24 : //! queue depth). Idle streams may be removed from the pool after a while to free up the client.
25 : //!
26 : //! Each channel corresponds to one TCP connection. Each client unary request and each stream
27 : //! corresponds to one HTTP/2 stream and server task.
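//!
//! A minimal usage sketch of how the pools nest (the endpoint, IDs, limits, and the request `req`
//! are placeholder assumptions; error handling is elided):
//!
//! ```ignore
//! let channel_pool = ChannelPool::new("http://pageserver:51051", NonZero::new(16).unwrap())?;
//! let client_pool = ClientPool::new(
//!     channel_pool.clone(),
//!     tenant_id,
//!     timeline_id,
//!     shard_id,
//!     None, // auth token
//!     None, // compression
//!     None, // unbounded, as required by StreamPool
//! );
//! let stream_pool = StreamPool::new(client_pool, NonZero::new(8), NonZero::new(4).unwrap());
//!
//! // Each guard is valid for a single GetPage request; queue depth is reserved by `get()`.
//! let resp = stream_pool.get().await.send(req).await?;
//! ```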
28 : //!
29 : //! TODO: error handling (including custom error types).
30 : //! TODO: observability.
31 :
32 : use std::collections::{BTreeMap, HashMap};
33 : use std::num::NonZero;
34 : use std::ops::{Deref, DerefMut};
35 : use std::sync::atomic::{AtomicUsize, Ordering};
36 : use std::sync::{Arc, Mutex, Weak};
37 : use std::time::{Duration, Instant};
38 :
39 : use futures::StreamExt as _;
40 : use tokio::sync::mpsc::{Receiver, Sender};
41 : use tokio::sync::{OwnedSemaphorePermit, Semaphore, mpsc, oneshot};
42 : use tokio_util::sync::CancellationToken;
43 : use tonic::codec::CompressionEncoding;
44 : use tonic::transport::{Channel, Endpoint};
45 : use tracing::{error, warn};
46 :
47 : use pageserver_page_api as page_api;
48 : use utils::id::{TenantId, TimelineId};
49 : use utils::shard::ShardIndex;
50 :
51 : /// Reap channels/clients/streams that have been idle for this long.
52 : ///
53 : /// TODO: this is per-pool. For nested pools, it can take up to 3x as long for a TCP connection to
54 : /// be reaped. First, we must wait for an idle stream to be reaped, which marks its client as idle.
55 : /// Then, we must wait for the idle client to be reaped, which marks its channel as idle. Then, we
56 : /// must wait for the idle channel to be reaped. Is that a problem? Maybe not, we just have to
57 : /// account for it when setting the reap threshold. Alternatively, we can immediately reap empty
58 : /// channels, and/or stream pool clients.
59 : const REAP_IDLE_THRESHOLD: Duration = match cfg!(any(test, feature = "testing")) {
60 : false => Duration::from_secs(180),
61 : true => Duration::from_secs(1), // exercise reaping in tests
62 : };
63 :
64 : /// Reap idle resources with this interval.
65 : const REAP_IDLE_INTERVAL: Duration = match cfg!(any(test, feature = "testing")) {
66 : false => Duration::from_secs(10),
67 : true => Duration::from_secs(1), // exercise reaping in tests
68 : };
69 :
70 : /// A gRPC channel pool, for a single Pageserver. A channel is shared by many clients (via HTTP/2
71 : /// stream multiplexing), up to `max_clients_per_channel` -- a new channel will be spun up beyond this.
72 : /// The pool does not limit the number of channels, and instead relies on `ClientPool` or
73 : /// `StreamPool` to limit the number of concurrent clients.
74 : ///
75 : /// The pool is always wrapped in an outer `Arc`, to allow long-lived guards across tasks/threads.
76 : ///
77 : /// TODO: consider prewarming a set of channels, to avoid initial connection latency.
78 : /// TODO: consider adding a circuit breaker to fail fast on errors.
79 : pub struct ChannelPool {
80 : /// Pageserver endpoint to connect to.
81 : endpoint: Endpoint,
82 : /// Max number of clients per channel. Beyond this, a new channel will be created.
83 : max_clients_per_channel: NonZero<usize>,
84 : /// Open channels.
85 : channels: Mutex<BTreeMap<ChannelID, ChannelEntry>>,
86 : /// Reaps idle channels.
87 : idle_reaper: Reaper,
88 : /// Channel ID generator.
89 : next_channel_id: AtomicUsize,
90 : }
91 :
92 : type ChannelID = usize;
93 :
94 : struct ChannelEntry {
95 : /// The gRPC channel (i.e. TCP connection). Shared by multiple clients.
96 : channel: Channel,
97 : /// Number of clients using this channel.
98 : clients: usize,
99 : /// The channel has been idle (no clients) since this time. None if channel is in use.
100 : /// INVARIANT: Some if clients == 0, otherwise None.
101 : idle_since: Option<Instant>,
102 : }
103 :
104 : impl ChannelPool {
105 : /// Creates a new channel pool for the given Pageserver endpoint.
106 0 : pub fn new<E>(endpoint: E, max_clients_per_channel: NonZero<usize>) -> anyhow::Result<Arc<Self>>
107 0 : where
108 0 : E: TryInto<Endpoint> + Send + Sync + 'static,
109 0 : <E as TryInto<Endpoint>>::Error: std::error::Error + Send + Sync,
110 : {
111 0 : let pool = Arc::new(Self {
112 0 : endpoint: endpoint.try_into()?,
113 0 : max_clients_per_channel,
114 0 : channels: Mutex::default(),
115 0 : idle_reaper: Reaper::new(REAP_IDLE_THRESHOLD, REAP_IDLE_INTERVAL),
116 0 : next_channel_id: AtomicUsize::default(),
117 : });
118 0 : pool.idle_reaper.spawn(&pool);
119 0 : Ok(pool)
120 0 : }
121 :
122 : /// Acquires a gRPC channel for a client. Multiple clients may acquire the same channel.
123 : ///
124 : /// This never blocks (except for mutex acquisition). The channel is connected lazily on first
125 : /// use, and the `ChannelPool` does not have a channel limit. Channels will be re-established
126 : /// automatically on failure (TODO: verify).
127 : ///
128 : /// Callers should not clone the returned channel, and must hold onto the returned guard as long
129 : /// as the channel is in use. It is unfortunately not possible to enforce this: the Protobuf
130 : /// client requires an owned `Channel` and we don't have access to the channel's internal
131 : /// refcount.
132 : ///
133 : /// This is not performance-sensitive. It is only called when creating a new client, and clients
134 : /// are pooled and reused by `ClientPool`. The total number of channels will also be small. O(n)
135 : /// performance is therefore okay.
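///
/// A sketch of the intended usage, mirroring `ClientPool::get()` (the IDs and options are
/// placeholder assumptions):
///
/// ```ignore
/// let mut guard = channel_pool.get();
/// let client = page_api::Client::new(
///     guard.take(), // the client needs an owned `Channel`
///     tenant_id,
///     timeline_id,
///     shard_id,
///     None, // auth token
///     None, // compression
/// )?;
/// // Keep `guard` alive for as long as `client` uses the channel.
/// ```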
136 0 : pub fn get(self: &Arc<Self>) -> ChannelGuard {
137 0 : let mut channels = self.channels.lock().unwrap();
138 :
139 : // Try to find an existing channel with available capacity. We check entries in BTreeMap
140 : // order, to fill up the lower-ordered channels first. The ClientPool also prefers clients
141 : // with lower-ordered channel IDs first. This will cluster clients in lower-ordered
142 : // channels, and free up higher-ordered channels such that they can be reaped.
143 0 : for (&id, entry) in channels.iter_mut() {
144 0 : assert!(
145 0 : entry.clients <= self.max_clients_per_channel.get(),
146 0 : "channel overflow"
147 : );
148 0 : assert_eq!(
149 0 : entry.idle_since.is_some(),
150 0 : entry.clients == 0,
151 0 : "incorrect channel idle state"
152 : );
153 0 : if entry.clients < self.max_clients_per_channel.get() {
154 0 : entry.clients += 1;
155 0 : entry.idle_since = None;
156 0 : return ChannelGuard {
157 0 : pool: Arc::downgrade(self),
158 0 : id,
159 0 : channel: Some(entry.channel.clone()),
160 0 : };
161 0 : }
162 : }
163 :
164 : // Create a new channel. We connect lazily on first use, such that we don't block here and
165 : // other clients can join onto the same channel while it's connecting.
166 0 : let channel = self.endpoint.connect_lazy();
167 :
168 0 : let id = self.next_channel_id.fetch_add(1, Ordering::Relaxed);
169 0 : let entry = ChannelEntry {
170 0 : channel: channel.clone(),
171 0 : clients: 1, // account for the guard below
172 0 : idle_since: None,
173 0 : };
174 0 : channels.insert(id, entry);
175 :
176 0 : ChannelGuard {
177 0 : pool: Arc::downgrade(self),
178 0 : id,
179 0 : channel: Some(channel),
180 0 : }
181 0 : }
182 : }
183 :
184 : impl Reapable for ChannelPool {
185 : /// Reaps channels that have been idle since before the cutoff.
186 0 : fn reap_idle(&self, cutoff: Instant) {
187 0 : self.channels.lock().unwrap().retain(|_, entry| {
188 0 : let Some(idle_since) = entry.idle_since else {
189 0 : assert_ne!(entry.clients, 0, "empty channel not marked idle");
190 0 : return true;
191 : };
192 0 : assert_eq!(entry.clients, 0, "idle channel has clients");
193 0 : idle_since >= cutoff
194 0 : })
195 0 : }
196 : }
197 :
198 : /// Tracks a channel acquired from the pool. The owned inner channel can be obtained with `take()`,
199 : /// since the gRPC client requires an owned `Channel`.
200 : pub struct ChannelGuard {
201 : pool: Weak<ChannelPool>,
202 : id: ChannelID,
203 : channel: Option<Channel>,
204 : }
205 :
206 : impl ChannelGuard {
207 : /// Returns the inner owned channel. Panics if called more than once. The caller must hold onto
208 : /// the guard as long as the channel is in use, and should not clone it.
209 0 : pub fn take(&mut self) -> Channel {
210 0 : self.channel.take().expect("channel already taken")
211 0 : }
212 : }
213 :
214 : /// Returns the channel to the pool.
215 : impl Drop for ChannelGuard {
216 0 : fn drop(&mut self) {
217 0 : let Some(pool) = self.pool.upgrade() else {
218 0 : return; // pool was dropped
219 : };
220 :
221 0 : let mut channels = pool.channels.lock().unwrap();
222 0 : let entry = channels.get_mut(&self.id).expect("unknown channel");
223 0 : assert!(entry.idle_since.is_none(), "active channel marked idle");
224 0 : assert!(entry.clients > 0, "channel underflow");
225 0 : entry.clients -= 1;
226 0 : if entry.clients == 0 {
227 0 : entry.idle_since = Some(Instant::now()); // mark channel as idle
228 0 : }
229 0 : }
230 : }
231 :
232 : /// A pool of gRPC clients for a single tenant shard. Each client acquires a channel from the inner
233 : /// `ChannelPool`. A client is only given out to a single caller at a time. The pool limits the total
234 : /// number of concurrent clients to `max_clients` via a semaphore.
235 : ///
236 : /// The pool is always wrapped in an outer `Arc`, to allow long-lived guards across tasks/threads.
237 : pub struct ClientPool {
238 : /// Tenant ID.
239 : tenant_id: TenantId,
240 : /// Timeline ID.
241 : timeline_id: TimelineId,
242 : /// Shard ID.
243 : shard_id: ShardIndex,
244 : /// Authentication token, if any.
245 : auth_token: Option<String>,
246 : /// Compression to use.
247 : compression: Option<CompressionEncoding>,
248 : /// Channel pool to acquire channels from.
249 : channel_pool: Arc<ChannelPool>,
250 : /// Limits the max number of concurrent clients for this pool. None if the pool is unbounded.
251 : limiter: Option<Arc<Semaphore>>,
252 : /// Idle pooled clients. Acquired clients are removed from here and returned on drop.
253 : ///
254 : /// The first client in the map will be acquired next. The map is sorted by client ID, which in
255 : /// turn is sorted by its channel ID, such that we prefer acquiring idle clients from
256 : /// lower-ordered channels. This allows us to free up and reap higher-numbered channels as idle
257 : /// clients are reaped.
258 : idle: Mutex<BTreeMap<ClientID, ClientEntry>>,
259 : /// Reaps idle clients.
260 : idle_reaper: Reaper,
261 : /// Unique client ID generator.
262 : next_client_id: AtomicUsize,
263 : }
264 :
265 : type ClientID = (ChannelID, usize);
266 :
267 : struct ClientEntry {
268 : /// The pooled gRPC client.
269 : client: page_api::Client,
270 : /// The channel guard for the channel used by the client.
271 : channel_guard: ChannelGuard,
272 : /// The client has been idle since this time. All clients in `ClientPool::idle` are idle by
273 : /// definition, so this is the time when it was added back to the pool.
274 : idle_since: Instant,
275 : }
276 :
277 : impl ClientPool {
278 : /// Creates a new client pool for the given tenant shard. Channels are acquired from the given
279 : /// `ChannelPool`, which must point to a Pageserver that hosts the tenant shard. Allows up to
280 : /// `max_clients` concurrent clients, or unbounded if None.
281 0 : pub fn new(
282 0 : channel_pool: Arc<ChannelPool>,
283 0 : tenant_id: TenantId,
284 0 : timeline_id: TimelineId,
285 0 : shard_id: ShardIndex,
286 0 : auth_token: Option<String>,
287 0 : compression: Option<CompressionEncoding>,
288 0 : max_clients: Option<NonZero<usize>>,
289 0 : ) -> Arc<Self> {
290 0 : let pool = Arc::new(Self {
291 0 : tenant_id,
292 0 : timeline_id,
293 0 : shard_id,
294 0 : auth_token,
295 0 : compression,
296 0 : channel_pool,
297 0 : idle: Mutex::default(),
298 0 : idle_reaper: Reaper::new(REAP_IDLE_THRESHOLD, REAP_IDLE_INTERVAL),
299 0 : limiter: max_clients.map(|max| Arc::new(Semaphore::new(max.get()))),
300 0 : next_client_id: AtomicUsize::default(),
301 : });
302 0 : pool.idle_reaper.spawn(&pool);
303 0 : pool
304 0 : }
305 :
306 : /// Gets a client from the pool, or creates a new one if necessary. Connections are established
307 : /// lazily and do not block, but this call can block if the pool is at `max_clients`. The client
308 : /// is returned to the pool when the guard is dropped.
309 : ///
310 : /// This is moderately performance-sensitive. It is called for every unary request, but these
311 : /// establish a new gRPC stream per request so they're already expensive. GetPage requests use
312 : /// the `StreamPool` instead.
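///
/// A usage sketch (the unary RPC shown is hypothetical; the guard derefs to `page_api::Client`):
///
/// ```ignore
/// let mut client = client_pool.get().await?;
/// let resp = client.some_unary_rpc(req).await?; // hypothetical RPC method
/// drop(client); // returns the client to the pool
/// ```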
313 0 : pub async fn get(self: &Arc<Self>) -> anyhow::Result<ClientGuard> {
314 : // Acquire a permit if the pool is bounded.
315 0 : let mut permit = None;
316 0 : if let Some(limiter) = self.limiter.clone() {
317 0 : permit = Some(limiter.acquire_owned().await.expect("never closed"));
318 0 : }
319 :
320 : // Fast path: acquire an idle client from the pool.
321 0 : if let Some((id, entry)) = self.idle.lock().unwrap().pop_first() {
322 0 : return Ok(ClientGuard {
323 0 : pool: Arc::downgrade(self),
324 0 : id,
325 0 : client: Some(entry.client),
326 0 : channel_guard: Some(entry.channel_guard),
327 0 : permit,
328 0 : });
329 0 : }
330 :
331 : // Slow path: construct a new client.
332 0 : let mut channel_guard = self.channel_pool.get();
333 0 : let client = page_api::Client::new(
334 0 : channel_guard.take(),
335 0 : self.tenant_id,
336 0 : self.timeline_id,
337 0 : self.shard_id,
338 0 : self.auth_token.clone(),
339 0 : self.compression,
340 0 : )?;
341 :
342 0 : Ok(ClientGuard {
343 0 : pool: Arc::downgrade(self),
344 0 : id: (
345 0 : channel_guard.id,
346 0 : self.next_client_id.fetch_add(1, Ordering::Relaxed),
347 0 : ),
348 0 : client: Some(client),
349 0 : channel_guard: Some(channel_guard),
350 0 : permit,
351 0 : })
352 0 : }
353 : }
354 :
355 : impl Reapable for ClientPool {
356 : /// Reaps clients that have been idle since before the cutoff.
357 0 : fn reap_idle(&self, cutoff: Instant) {
358 0 : self.idle
359 0 : .lock()
360 0 : .unwrap()
361 0 : .retain(|_, entry| entry.idle_since >= cutoff)
362 0 : }
363 : }
364 :
365 : /// A client acquired from the pool. The inner client can be accessed via Deref. The client is
366 : /// returned to the pool when dropped.
367 : pub struct ClientGuard {
368 : pool: Weak<ClientPool>,
369 : id: ClientID,
370 : client: Option<page_api::Client>, // Some until dropped
371 : channel_guard: Option<ChannelGuard>, // Some until dropped
372 : permit: Option<OwnedSemaphorePermit>, // None if pool is unbounded
373 : }
374 :
375 : impl Deref for ClientGuard {
376 : type Target = page_api::Client;
377 :
378 0 : fn deref(&self) -> &Self::Target {
379 0 : self.client.as_ref().expect("not dropped")
380 0 : }
381 : }
382 :
383 : impl DerefMut for ClientGuard {
384 0 : fn deref_mut(&mut self) -> &mut Self::Target {
385 0 : self.client.as_mut().expect("not dropped")
386 0 : }
387 : }
388 :
389 : /// Returns the client to the pool.
390 : impl Drop for ClientGuard {
391 0 : fn drop(&mut self) {
392 0 : let Some(pool) = self.pool.upgrade() else {
393 0 : return; // pool was dropped
394 : };
395 :
396 0 : let entry = ClientEntry {
397 0 : client: self.client.take().expect("dropped once"),
398 0 : channel_guard: self.channel_guard.take().expect("dropped once"),
399 0 : idle_since: Instant::now(),
400 0 : };
401 0 : pool.idle.lock().unwrap().insert(self.id, entry);
402 :
403 0 : _ = self.permit; // returned on drop, referenced for visibility
404 0 : }
405 : }
406 :
407 : /// A pool of bidirectional gRPC streams. Currently only used for GetPage streams. Each stream
408 : /// acquires a client from the inner `ClientPool` for the stream's lifetime.
409 : ///
410 : /// Individual streams are not exposed to callers -- instead, the returned guard can be used to send
411 : /// a single request and await the response. Internally, requests are multiplexed across streams and
412 : /// channels. This allows proper queue depth enforcement and response routing.
413 : ///
414 : /// TODO: consider making this generic over request and response types; not currently needed.
415 : pub struct StreamPool {
416 : /// The client pool to acquire clients from. Must be unbounded.
417 : client_pool: Arc<ClientPool>,
418 : /// All pooled streams.
419 : ///
420 : /// Incoming requests will be sent over an existing stream with available capacity. If all
421 : /// streams are full, a new one is spun up and added to the pool (up to `max_streams`). Each
422 : /// stream has an associated Tokio task that processes requests and responses.
423 : streams: Mutex<HashMap<StreamID, StreamEntry>>,
424 : /// The max number of concurrent streams, or None if unbounded.
425 : max_streams: Option<NonZero<usize>>,
426 : /// The max number of concurrent requests per stream.
427 : max_queue_depth: NonZero<usize>,
428 : /// Limits the max number of concurrent requests, given by `max_streams * max_queue_depth`.
429 : /// None if the pool is unbounded.
430 : limiter: Option<Arc<Semaphore>>,
431 : /// Reaps idle streams.
432 : idle_reaper: Reaper,
433 : /// Stream ID generator.
434 : next_stream_id: AtomicUsize,
435 : }
436 :
437 : type StreamID = usize;
438 : type RequestSender = Sender<(page_api::GetPageRequest, ResponseSender)>;
439 : type RequestReceiver = Receiver<(page_api::GetPageRequest, ResponseSender)>;
440 : type ResponseSender = oneshot::Sender<tonic::Result<page_api::GetPageResponse>>;
441 :
442 : struct StreamEntry {
443 : /// Sends caller requests to the stream task. The stream task exits when this is dropped.
444 : sender: RequestSender,
445 : /// Number of in-flight requests on this stream.
446 : queue_depth: usize,
447 : /// The time when this stream went idle (queue_depth == 0).
448 : /// INVARIANT: Some if queue_depth == 0, otherwise None.
449 : idle_since: Option<Instant>,
450 : }
451 :
452 : impl StreamPool {
453 : /// Creates a new stream pool, using the given client pool. It will send up to `max_queue_depth`
454 : /// concurrent requests on each stream, and use up to `max_streams` concurrent streams.
455 : ///
456 : /// The client pool must be unbounded. The stream pool will enforce its own limits, and because
457 : /// streams are long-lived they can cause persistent starvation if they exhaust the client pool.
458 : /// The stream pool should generally have its own dedicated client pool (but it can share a
459 : /// channel pool with others since these are always unbounded).
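///
/// A construction sketch under these constraints (IDs, options, and limits are placeholder
/// assumptions; note the unbounded client pool):
///
/// ```ignore
/// let stream_client_pool = ClientPool::new(
///     channel_pool.clone(), // channel pools can be shared; they are always unbounded
///     tenant_id,
///     timeline_id,
///     shard_id,
///     auth_token,
///     compression,
///     None, // must be unbounded; the stream pool enforces its own limits
/// );
/// let stream_pool = StreamPool::new(stream_client_pool, NonZero::new(64), NonZero::new(2).unwrap());
/// ```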
460 0 : pub fn new(
461 0 : client_pool: Arc<ClientPool>,
462 0 : max_streams: Option<NonZero<usize>>,
463 0 : max_queue_depth: NonZero<usize>,
464 0 : ) -> Arc<Self> {
465 0 : assert!(client_pool.limiter.is_none(), "bounded client pool");
466 0 : let pool = Arc::new(Self {
467 0 : client_pool,
468 0 : streams: Mutex::default(),
469 0 : limiter: max_streams.map(|max_streams| {
470 0 : Arc::new(Semaphore::new(max_streams.get() * max_queue_depth.get()))
471 0 : }),
472 0 : max_streams,
473 0 : max_queue_depth,
474 0 : idle_reaper: Reaper::new(REAP_IDLE_THRESHOLD, REAP_IDLE_INTERVAL),
475 0 : next_stream_id: AtomicUsize::default(),
476 : });
477 0 : pool.idle_reaper.spawn(&pool);
478 0 : pool
479 0 : }
480 :
481 : /// Acquires an available stream from the pool, or spins up a new stream async if all streams
482 : /// are full. Returns a guard that can be used to send a single request on the stream and await
483 : /// the response, with queue depth quota already acquired. Blocks if the pool is at capacity
484 : /// (i.e. `max_streams * max_queue_depth` requests in flight).
485 : ///
486 : /// This is very performance-sensitive, as it is on the GetPage hot path.
487 : ///
488 : /// TODO: this must do something more sophisticated for performance. We want:
489 : ///
490 : /// * Cheap, concurrent access in the common case where we can use a pooled stream.
491 : /// * Quick acquisition of pooled streams with available capacity.
492 : /// * Prefer streams that belong to lower-numbered channels, to reap idle channels.
493 : /// * Prefer filling up existing streams' queue depth before spinning up new streams.
494 : /// * Don't hold a lock while spinning up new streams.
495 : /// * Allow concurrent clients to join onto streams while they're spun up.
496 : /// * Allow spinning up multiple streams concurrently, but don't overshoot limits.
497 : ///
498 : /// For now, we just do something simple but inefficient (linear scan under mutex).
499 0 : pub async fn get(self: &Arc<Self>) -> StreamGuard {
500 : // Acquire a permit if the pool is bounded.
501 0 : let mut permit = None;
502 0 : if let Some(limiter) = self.limiter.clone() {
503 0 : permit = Some(limiter.acquire_owned().await.expect("never closed"));
504 0 : }
505 0 : let mut streams = self.streams.lock().unwrap();
506 :
507 : // Look for a pooled stream with available capacity.
508 0 : for (&id, entry) in streams.iter_mut() {
509 0 : assert!(
510 0 : entry.queue_depth <= self.max_queue_depth.get(),
511 0 : "stream queue overflow"
512 : );
513 0 : assert_eq!(
514 0 : entry.idle_since.is_some(),
515 0 : entry.queue_depth == 0,
516 0 : "incorrect stream idle state"
517 : );
518 0 : if entry.queue_depth < self.max_queue_depth.get() {
519 0 : entry.queue_depth += 1;
520 0 : entry.idle_since = None;
521 0 : return StreamGuard {
522 0 : pool: Arc::downgrade(self),
523 0 : id,
524 0 : sender: entry.sender.clone(),
525 0 : permit,
526 0 : };
527 0 : }
528 : }
529 :
530 : // No available stream, spin up a new one. We install the stream entry in the pool first and
531 : // return the guard, while spinning up the stream task async. This allows other callers to
532 : // join onto this stream and also create additional streams concurrently if this fills up.
533 0 : let id = self.next_stream_id.fetch_add(1, Ordering::Relaxed);
534 0 : let (req_tx, req_rx) = mpsc::channel(self.max_queue_depth.get());
535 0 : let entry = StreamEntry {
536 0 : sender: req_tx.clone(),
537 0 : queue_depth: 1, // reserve quota for this caller
538 0 : idle_since: None,
539 0 : };
540 0 : streams.insert(id, entry);
541 :
542 0 : if let Some(max_streams) = self.max_streams {
543 0 : assert!(streams.len() <= max_streams.get(), "stream overflow");
544 0 : };
545 :
546 0 : let client_pool = self.client_pool.clone();
547 0 : let pool = Arc::downgrade(self);
548 :
549 0 : tokio::spawn(async move {
550 0 : if let Err(err) = Self::run_stream(client_pool, req_rx).await {
551 0 : error!("stream failed: {err}");
552 0 : }
553 : // Remove stream from pool on exit. Weak reference to avoid holding the pool alive.
554 0 : if let Some(pool) = pool.upgrade() {
555 0 : let entry = pool.streams.lock().unwrap().remove(&id);
556 0 : assert!(entry.is_some(), "unknown stream ID: {id}");
557 0 : }
558 0 : });
559 :
560 0 : StreamGuard {
561 0 : pool: Arc::downgrade(self),
562 0 : id,
563 0 : sender: req_tx,
564 0 : permit,
565 0 : }
566 0 : }
567 :
568 : /// Runs a stream task. This acquires a client from the `ClientPool` and establishes a
569 : /// bidirectional GetPage stream, then forwards requests and responses between callers and the
570 : /// stream. It does not track or enforce queue depths -- that's done by `get()` since it must be
571 : /// atomic with pool stream acquisition.
572 : ///
573 : /// The task exits when the request channel is closed, or on a stream error. The caller is
574 : /// responsible for removing the stream from the pool on exit.
575 0 : async fn run_stream(
576 0 : client_pool: Arc<ClientPool>,
577 0 : mut caller_rx: RequestReceiver,
578 0 : ) -> anyhow::Result<()> {
579 : // Acquire a client from the pool and create a stream.
580 0 : let mut client = client_pool.get().await?;
581 :
582 : // NB: use an unbounded channel such that the stream send never blocks. Otherwise, we could
583 : // theoretically deadlock if both the client and server block on sends (since we're not
584 : // reading responses while sending). This is unlikely to happen due to gRPC/TCP buffers and
585 : // low queue depths, but it was seen to happen with the libpq protocol so better safe than
586 : // sorry. It should never buffer more than the queue depth anyway, but using an unbounded
587 : // channel guarantees that it will never block.
588 0 : let (req_tx, req_rx) = mpsc::unbounded_channel();
589 0 : let req_stream = tokio_stream::wrappers::UnboundedReceiverStream::new(req_rx);
590 0 : let mut resp_stream = client.get_pages(req_stream).await?;
591 :
592 : // Track caller response channels by request ID. If the task returns early, these response
593 : // channels will be dropped and the waiting callers will receive an error.
594 : //
595 : // NB: this will leak entries if the server doesn't respond to a request (by request ID).
596 : // It shouldn't happen, and if it does it will often hold onto queue depth quota anyway and
597 : // block further use. But we could consider reaping closed channels after some time.
598 0 : let mut callers = HashMap::new();
599 :
600 : // Process requests and responses.
601 : loop {
602 0 : tokio::select! {
603 : // Receive requests from callers and send them to the stream.
604 0 : req = caller_rx.recv() => {
605 : // Shut down if request channel is closed.
606 0 : let Some((req, resp_tx)) = req else {
607 0 : return Ok(());
608 : };
609 :
610 : // Store the response channel by request ID.
611 0 : if callers.contains_key(&req.request_id) {
612 : // Error on request ID duplicates. Ignore callers that went away.
613 0 : _ = resp_tx.send(Err(tonic::Status::invalid_argument(
614 0 : format!("duplicate request ID: {}", req.request_id),
615 0 : )));
616 0 : continue;
617 0 : }
618 0 : callers.insert(req.request_id, resp_tx);
619 :
620 : // Send the request on the stream. Bail out if the stream is closed.
621 0 : req_tx.send(req).map_err(|_| {
622 0 : tonic::Status::unavailable("stream closed")
623 0 : })?;
624 : }
625 :
626 : // Receive responses from the stream and send them to callers.
627 0 : resp = resp_stream.next() => {
628 : // Shut down if the stream is closed, and bail out on stream errors.
629 0 : let Some(resp) = resp.transpose()? else {
630 0 : return Ok(())
631 : };
632 :
633 : // Send the response to the caller. Ignore errors if the caller went away.
634 0 : let Some(resp_tx) = callers.remove(&resp.request_id) else {
635 0 : warn!("received response for unknown request ID: {}", resp.request_id);
636 0 : continue;
637 : };
638 0 : _ = resp_tx.send(Ok(resp));
639 : }
640 : }
641 : }
642 0 : }
643 : }
644 :
645 : impl Reapable for StreamPool {
646 : /// Reaps streams that have been idle since before the cutoff.
647 0 : fn reap_idle(&self, cutoff: Instant) {
648 0 : self.streams.lock().unwrap().retain(|_, entry| {
649 0 : let Some(idle_since) = entry.idle_since else {
650 0 : assert_ne!(entry.queue_depth, 0, "empty stream not marked idle");
651 0 : return true;
652 : };
653 0 : assert_eq!(entry.queue_depth, 0, "idle stream has requests");
654 0 : idle_since >= cutoff
655 0 : });
656 0 : }
657 : }
658 :
659 : /// A pooled stream reference. Can be used to send a single request, to properly enforce queue
660 : /// depth. Queue depth is already reserved and will be returned on drop.
661 : pub struct StreamGuard {
662 : pool: Weak<StreamPool>,
663 : id: StreamID,
664 : sender: RequestSender,
665 : permit: Option<OwnedSemaphorePermit>, // None if pool is unbounded
666 : }
667 :
668 : impl StreamGuard {
669 : /// Sends a request on the stream and awaits the response. Consumes the guard, since it's only
670 : /// valid for a single request (to enforce queue depth). This also drops the guard on return and
671 : /// returns the queue depth quota to the pool.
672 : ///
673 : /// The `GetPageRequest::request_id` must be unique across in-flight requests.
674 : ///
675 : /// NB: errors are often returned as `GetPageResponse::status_code` instead of `tonic::Status`
676 : /// to avoid tearing down the stream for per-request errors. Callers must check this.
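///
/// A usage sketch (`req` is assumed to be a fully populated `GetPageRequest` with a unique
/// `request_id`):
///
/// ```ignore
/// let resp = stream_pool.get().await.send(req).await?; // transport/stream errors surface here
/// // Per-request errors are reported via `resp.status_code`; check it before using the page data.
/// ```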
677 0 : pub async fn send(
678 0 : self,
679 0 : req: page_api::GetPageRequest,
680 0 : ) -> tonic::Result<page_api::GetPageResponse> {
681 0 : let (resp_tx, resp_rx) = oneshot::channel();
682 :
683 0 : self.sender
684 0 : .send((req, resp_tx))
685 0 : .await
686 0 : .map_err(|_| tonic::Status::unavailable("stream closed"))?;
687 :
688 0 : resp_rx
689 0 : .await
690 0 : .map_err(|_| tonic::Status::unavailable("stream closed"))?
691 0 : }
692 : }
693 :
694 : impl Drop for StreamGuard {
695 0 : fn drop(&mut self) {
696 0 : let Some(pool) = self.pool.upgrade() else {
697 0 : return; // pool was dropped
698 : };
699 :
700 : // Release the queue depth reservation on drop. This can prematurely decrement it if dropped
701 : // before the response is received, but that's okay.
702 : //
703 : // TODO: actually, it's probably not okay. Queue depth release should be moved into the
704 : // stream task, such that it continues to account for the queue depth slot until the server
705 : // responds. Otherwise, if a slow request times out and keeps blocking the stream, the
706 : // server will keep waiting on it and we can pile on subsequent requests (including the
707 : // timeout retry) in the same stream and get blocked. But we may also want to avoid blocking
708 : // requests on e.g. LSN waits and layer downloads, instead returning early to free up the
709 : // stream. Or just scale out streams with a queue depth of 1 to sidestep all head-of-line
710 : // blocking. TBD.
711 0 : let mut streams = pool.streams.lock().unwrap();
712 0 : let entry = streams.get_mut(&self.id).expect("unknown stream");
713 0 : assert!(entry.idle_since.is_none(), "active stream marked idle");
714 0 : assert!(entry.queue_depth > 0, "stream queue underflow");
715 0 : entry.queue_depth -= 1;
716 0 : if entry.queue_depth == 0 {
717 0 : entry.idle_since = Some(Instant::now()); // mark stream as idle
718 0 : }
719 :
720 0 : _ = self.permit; // returned on drop, referenced for visibility
721 0 : }
722 : }
723 :
724 : /// Periodically reaps idle resources from a pool.
725 : struct Reaper {
726 : /// The task check interval.
727 : interval: Duration,
728 : /// The threshold for reaping idle resources.
729 : threshold: Duration,
730 : /// Cancels the reaper task. Cancelled when the reaper is dropped.
731 : cancel: CancellationToken,
732 : }
733 :
734 : impl Reaper {
735 : /// Creates a new reaper.
736 0 : pub fn new(threshold: Duration, interval: Duration) -> Self {
737 0 : Self {
738 0 : cancel: CancellationToken::new(),
739 0 : threshold,
740 0 : interval,
741 0 : }
742 0 : }
743 :
744 : /// Spawns a task to periodically reap idle resources from the given pool. The task is
745 : /// cancelled when the reaper is dropped.
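///
/// A sketch of the wiring used by the pools above (`MyPool` is hypothetical and must
/// implement `Reapable`):
///
/// ```ignore
/// let pool = Arc::new(MyPool {
///     idle_reaper: Reaper::new(REAP_IDLE_THRESHOLD, REAP_IDLE_INTERVAL),
///     // ...other fields...
/// });
/// pool.idle_reaper.spawn(&pool); // holds only a weak reference to `pool`
/// ```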
746 0 : pub fn spawn(&self, pool: &Arc<impl Reapable>) {
747 : // NB: hold a weak pool reference, otherwise the task will prevent dropping the pool.
748 0 : let pool = Arc::downgrade(pool);
749 0 : let cancel = self.cancel.clone();
750 0 : let (interval, threshold) = (self.interval, self.threshold);
751 :
752 0 : tokio::spawn(async move {
753 : loop {
754 0 : tokio::select! {
755 0 : _ = tokio::time::sleep(interval) => {
756 0 : let Some(pool) = pool.upgrade() else {
757 0 : return; // pool was dropped
758 : };
759 0 : pool.reap_idle(Instant::now() - threshold);
760 : }
761 :
762 0 : _ = cancel.cancelled() => return,
763 : }
764 : }
765 0 : });
766 0 : }
767 : }
768 :
769 : impl Drop for Reaper {
770 0 : fn drop(&mut self) {
771 0 : self.cancel.cancel(); // cancel reaper task
772 0 : }
773 : }
774 :
775 : /// A reapable resource pool.
776 : trait Reapable: Send + Sync + 'static {
777 : /// Reaps resources that have been idle since before the given cutoff.
778 : fn reap_idle(&self, cutoff: Instant);
779 : }
780 : }