Line data Source code
1 : //! A cache for [`crate::tenant::mgr`]+`Tenant::get_timeline`+`Timeline::gate.enter()`.
2 : //!
3 : //! # Motivation
4 : //!
5 : //! On a single page service connection, we're typically serving a single TenantTimelineId.
6 : //!
7 : //! Without sharding, there is a single Timeline object to which we dispatch
8 : //! all requests. For example, a getpage request gets dispatched to the
9 : //! Timeline::get method of the Timeline object that represents the
10 : //! (tenant,timeline) of that connection.
11 : //!
12 : //! With sharding, for each request that comes in on the connection,
13 : //! we first have to perform shard routing based on the requested key (=~ page number).
14 : //! The result of shard routing is a Timeline object.
15 : //! We then dispatch the request to that Timeline object.
16 : //!
17 : //! Regardless of whether the tenant is sharded or not, we want to ensure that
18 : //! we hold the Timeline gate open while we're invoking the method on the
19 : //! Timeline object.
20 : //!
21 : //! We want to avoid the overhead of doing, for each incoming request,
22 : //! - tenant manager lookup (global rwlock + btreemap lookup for shard routing)
23 : //! - cloning the `Arc<Timeline>` out of the tenant manager so we can
24 : //! release the mgr rwlock before doing any request processing work
25 : //! - re-entering the Timeline gate for each Timeline method invocation.
26 : //!
27 : //! Regardless of how we accomplish the above, it should not
28 : //! prevent the Timeline from shutting down promptly.
29 : //!
30 : //!
31 : //! # Design
32 : //!
33 : //! ## Data Structures
34 : //!
35 : //! There are two concepts expressed as associated types in the `Types` trait:
36 : //! - `TenantManager`: the thing that performs the expensive work. It produces
37 : //! a `Timeline` object, which is the other associated type.
38 : //! - `Timeline`: the item that we cache for fast (TenantTimelineId,ShardSelector) lookup.
39 : //!
40 : //! There are four user-facing data structures exposed by this module:
41 : //! - `PerTimelineState`: a struct embedded into each Timeline struct. Lifetime == Timeline lifetime.
42 : //! - `Cache`: a struct private to each connection handler; Lifetime == connection lifetime.
43 : //! - `Handle`: a smart pointer that derefs to the Types::Timeline.
44 : //! - `WeakHandle`: downgrade of a `Handle` that does not keep the gate open, but allows
45 : //! trying to upgrade back to a `Handle`. If successful, a re-upgraded Handle will always
46 : //! point to the same cached `Types::Timeline`. Upgrades never invoke the `TenantManager`.
47 : //!
48 : //! Internally, there is 0 or 1 `HandleInner` per `(Cache,Timeline)`.
49 : //! Since Cache:Connection is 1:1, there is 0 or 1 `HandleInner` per `(Connection,Timeline)`.
50 : //!
51 : //! The `HandleInner` is allocated as a `Arc<Mutex<HandleInner>>` and
52 : //! referenced weakly and strongly from various places which we are now illustrating.
53 : //! For brevity, we will omit the `Arc<Mutex<>>` part in the following and instead
54 : //! use `strong ref` and `weak ref` when referring to the `Arc<Mutex<HandleInner>>`
55 : //! or `Weak<Mutex<HandleInner>>`, respectively.
56 : //!
57 : //! - The `Handle` is a strong ref.
58 : //! - The `WeakHandle` is a weak ref.
59 : //! - The `PerTimelineState` contains a `HashMap<CacheId, strong ref>`.
60 : //! - The `Cache` is a `HashMap<unique identifier for the shard, weak ref>`.
61 : //!
62 : //! Lifetimes:
63 : //! - `WeakHandle` and `Handle`: single pagestream request.
64 : //! - `Cache`: single page service connection.
65 : //! - `PerTimelineState`: lifetime of the Timeline object (i.e., until `Timeline::shutdown`).
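//!
//! To make the associated types concrete, here is a minimal sketch of how the pieces are
//! wired together. The `MyTypes`/`MyTimeline`/`MyTenantManager` names are hypothetical;
//! the real wiring lives in the page_service code, and the test module at the bottom of
//! this file does the same thing with stub types.
//!
//! ```rust,ignore
//! #[derive(Debug)]
//! struct MyTypes;
//! impl Types for MyTypes {
//!     type TenantManagerError = anyhow::Error;
//!     type TenantManager = MyTenantManager; // performs the expensive resolve()
//!     type Timeline = MyTimeline;           // what we cache and hand out `Handle`s to
//! }
//!
//! // MyTimeline must implement the `Timeline<MyTypes>` trait defined below.
//! struct MyTimeline {
//!     // Embedded in the timeline object; lives as long as the timeline itself.
//!     per_timeline_state: PerTimelineState<MyTypes>,
//!     // ... gate, shard identity, etc.
//! }
//!
//! // Each page_service connection handler owns exactly one of these.
//! let mut cache = Cache::<MyTypes>::default();
//! ```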
66 : //!
67 : //! ## Request Handling Flow (= filling and using the `Cache`)
68 : //!
69 : //! To dispatch a request, the page service connection calls `Cache::get`.
70 : //!
71 : //! A cache miss means we call Types::TenantManager::resolve for shard routing,
72 : //! cloning the `Arc<Timeline>` out of it, and entering the gate. The result of
73 : //! resolve() is the object we want to cache and hand out `Handle`s to for subsequent `Cache::get` calls.
74 : //!
75 : //! We wrap the object returned from resolve() in an `Arc` and store that inside the
76 : //! `Arc<Mutex<HandleInner>>`. A weak ref to the HandleInner is stored in the `Cache`
77 : //! and a strong ref in the `PerTimelineState`.
78 : //! Another strong ref is returned wrapped in a `Handle`.
79 : //!
80 : //! For subsequent requests, `Cache::get` will perform a "fast path" shard routing
81 : //! and find the weak ref in the cache.
82 : //! We upgrade the weak ref to a strong ref and return it wrapped in a `Handle`.
83 : //!
84 : //! The pagestream processing is pipelined and involves a batching step.
85 : //! While a request is batching, the `Handle` is downgraded to a `WeakHandle`.
86 : //! When the batch is ready to be executed, the `WeakHandle` is upgraded back to a `Handle`
87 : //! and the request handler dispatches the request to the right `<Handle as Deref<Target = Timeline>>::$request_method`.
88 : //! It then drops the `Handle`, and thus the `Arc<Mutex<HandleInner>>` inside it.
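//!
//! In pseudo-code, a request handler's use of the cache therefore looks roughly like the
//! sketch below. The names (`tenant_manager`, `key`, `getpage`) are illustrative only;
//! the real call sites live in the page_service request loop.
//!
//! ```rust,ignore
//! // Miss: resolves via the TenantManager, enters the gate, fills the cache.
//! // Hit: upgrades the cached weak ref; the TenantManager is not consulted.
//! let handle: Handle<_> = cache
//!     .get(timeline_id, ShardSelector::Page(key), &tenant_manager)
//!     .await?;
//!
//! // While the request sits in a batch, don't keep the gate open.
//! let weak: WeakHandle<_> = handle.downgrade();
//! drop(handle);
//!
//! // When the batch executes, re-upgrade; this never invokes the TenantManager.
//! let handle = match weak.upgrade() {
//!     Ok(handle) => handle,
//!     Err(HandleUpgradeError::ShutDown) => anyhow::bail!("timeline shut down"),
//! };
//! handle.getpage(); // dispatch via <Handle as Deref<Target = Types::Timeline>>
//! drop(handle);     // drops the short-lived strong refs again
//! ```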
89 : //!
90 : //! # Performance
91 : //!
92 : //! Remember from the introductory section:
93 : //!
94 : //! > We want to avoid the overhead of doing, for each incoming request,
95 : //! > - tenant manager lookup (global rwlock + btreemap lookup for shard routing)
96 : //! > - cloning the `Arc<Timeline>` out of the tenant manager so we can
97 : //! > release the mgr rwlock before doing any request processing work
98 : //! > - re-entering the Timeline gate for each Timeline method invocation.
99 : //!
100 : //! All of these boil down to some state that is either globally shared among all shards
101 : //! or state shared among all tasks that serve a particular timeline.
102 : //! It is either protected by RwLock or manipulated via atomics.
103 : //! Even atomics are costly when shared across multiple cores.
104 : //! So, we want to avoid any permanent need for coordination between page_service tasks.
105 : //!
106 : //! The solution is to add indirection: we wrap the Types::Timeline object that is
107 : //! returned by Types::TenantManager into an Arc that is private to the `HandleInner`
108 : //! and hence to the single Cache / page_service connection.
109 : //! (Review the "Data Structures" section if that is unclear to you.)
110 : //!
111 : //!
112 : //! When upgrading a `WeakHandle`, we upgrade its weak to a strong ref (of the `Mutex<HandleInner>`),
113 : //! lock the mutex, take out a clone of the `Arc<Types::Timeline>`, and drop the lock guard.
114 : //! The Mutex is not contended because it is private to the connection.
115 : //! And again, the `Arc<Types::Timeline>` clone is cheap because that wrapper
116 : //! Arc's refcounts are private to the connection.
117 : //!
118 : //! Downgrading drops these two Arcs, which, again, manipulates refcounts that are private to the connection.
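//!
//! The following self-contained example illustrates that indirection pattern with toy
//! types (not this module's actual types): the outer `Weak`/`Arc<Mutex<..>>` and the
//! inner `Arc` are both private to one connection, so upgrading and downgrading never
//! touch refcounts that other cores contend on.
//!
//! ```rust
//! use std::sync::{Arc, Mutex, Weak};
//!
//! struct TimelineView; // stand-in for Types::Timeline (which holds Arc<Timeline> + GateGuard)
//!
//! // Connection-private slot, analogous to HandleInner.
//! enum Slot {
//!     Open(Arc<TimelineView>),
//!     ShutDown,
//! }
//!
//! // Analogous to WeakHandle::upgrade: upgrade, lock, clone the inner Arc, drop the guard.
//! fn upgrade(weak: &Weak<Mutex<Slot>>) -> Option<Arc<TimelineView>> {
//!     let strong = Weak::upgrade(weak)?;          // refcount private to this connection
//!     let guard = strong.lock().unwrap();         // uncontended: only this connection locks it
//!     match &*guard {
//!         Slot::Open(tl) => Some(Arc::clone(tl)), // cheap: this Arc is connection-private too
//!         Slot::ShutDown => None,
//!     }
//! }
//!
//! fn main() {
//!     let slot = Arc::new(Mutex::new(Slot::Open(Arc::new(TimelineView))));
//!     let weak = Arc::downgrade(&slot);
//!     assert!(upgrade(&weak).is_some());
//!     *slot.lock().unwrap() = Slot::ShutDown;     // what shutdown does, conceptually
//!     assert!(upgrade(&weak).is_none());
//! }
//! ```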
119 : //!
120 : //!
121 : //! # Shutdown
122 : //!
123 : //! The attentive reader may have noticed the following reference cycle around the `Arc<Timeline>`:
124 : //!
125 : //! ```text
126 : //! Timeline --owns--> PerTimelineState --strong--> HandleInner --strong--> Types::Timeline --strong--> Timeline
127 : //! ```
128 : //!
129 : //! Further, there is this cycle:
130 : //!
131 : //! ```text
132 : //! Timeline --owns--> PerTimelineState --strong--> HandleInner --strong--> Types::Timeline --strong--> GateGuard --keepalive--> Timeline
133 : //! ```
134 : //!
135 : //! The former cycle is a memory leak if not broken.
136 : //! The latter cycle further prevents the Timeline from shutting down
137 : //! because we certainly won't drop the Timeline while the GateGuard is alive.
138 : //! Keeping the gate open (and thereby delaying shutdown) while a `Handle` is in use is intentional,
139 : //! but when the Timeline needs to shut down, we need to break the cycle.
140 : //!
141 : //! The cycle is broken by either
142 : //! - Timeline shutdown (=> `PerTimelineState::shutdown`)
143 : //! - Connection shutdown (=> dropping the `Cache`).
144 : //!
145 : //! Both transition the `HandleInner` from [`HandleInner::Open`] to
146 : //! [`HandleInner::ShutDown`], which drops the only long-lived
147 : //! `Arc<Types::Timeline>`. Once the last short-lived `Arc<Types::Timeline>`
148 : //! is dropped, the `Types::Timeline` gets dropped, and with it
149 : //! the `GateGuard` and the `Arc<Timeline>` that it stores,
150 : //! thereby breaking both cycles.
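//!
//! The following self-contained example (toy types, not this module's) shows the
//! essential cycle-breaking move: swapping the `Open` variant for `ShutDown` drops the
//! long-lived inner `Arc`, so the strongly-referenced slot no longer keeps the
//! timeline alive.
//!
//! ```rust
//! use std::sync::{Arc, Mutex, Weak};
//!
//! struct Timeline; // stand-in for what the handle keeps alive (Arc<Timeline>, GateGuard)
//!
//! enum Slot {
//!     Open(Arc<Timeline>),
//!     ShutDown,
//! }
//!
//! fn main() {
//!     let timeline = Arc::new(Timeline);
//!     let timeline_weak = Arc::downgrade(&timeline);
//!
//!     // Analog of the PerTimelineState: a long-lived strong ref to the slot.
//!     let slot = Arc::new(Mutex::new(Slot::Open(timeline)));
//!
//!     // Shutdown: replace Open -> ShutDown, dropping the long-lived Arc<Timeline>.
//!     let prev = std::mem::replace(&mut *slot.lock().unwrap(), Slot::ShutDown);
//!     drop(prev);
//!
//!     // The slot allocation is still strongly referenced, but it no longer
//!     // keeps the timeline alive: the cycle is broken.
//!     assert!(Weak::upgrade(&timeline_weak).is_none());
//! }
//! ```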
151 : //!
152 : //! `PerTimelineState::shutdown` drops all the `HandleInner`s it contains,
153 : //! thereby breaking the cycle.
154 : //! It also sets its handles map to `None` so that no new `HandleInner`s
155 : //! can be added to the `PerTimelineState`, which makes subsequent
156 : //! `Cache::get` calls fail; already existing `Handle`s simply drain.
157 : //!
158 : //! Concurrently existing / already upgraded `Handle`s will extend the
159 : //! lifetime of the `Arc<Mutex<HandleInner>>` and hence the cycles.
160 : //! However, since `Handle`s are short-lived and new `Handle`s are not
161 : //! handed out from `Cache::get` or `WeakHandle::upgrade` after
162 : //! `PerTimelineState::shutdown`, that extension of the cycle is bounded.
163 : //!
164 : //! Concurrently existing `WeakHandle`s will fail to `upgrade()`:
165 : //! while they will succeed in upgrading `Weak<Mutex<HandleInner>>`,
166 : //! they will find the inner in the `HandleInner::ShutDown` state, where the
167 : //! `Arc<GateGuard>` and `Arc<Timeline>` have already been dropped.
168 : //!
169 : //! Dropping the `Cache` undoes the registration of this `Cache`'s
170 : //! `HandleInner`s in the `PerTimelineState`s, i.e., it removes
171 : //! the strong ref to each of its `HandleInner`s from the corresponding
172 : //! `PerTimelineState`.
173 : //!
174 : //! # Locking Rules
175 : //!
176 : //! To prevent deadlocks we:
177 : //!
178 : //! 1. Only ever hold one of the locks at a time.
179 : //! 2. Don't add more than one Drop impl that takes any of the locks
180 : //! involved in the cycles above.
181 : //!
182 : //! As per (2), that impl is in `Drop for Cache`.
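//!
//! Rule (1) in practice means extracting what is needed from one lock and dropping its
//! guard before acquiring the next one, as `Drop for Cache` does below. A minimal
//! self-contained illustration of the pattern (toy locks, not this module's types):
//!
//! ```rust
//! use std::sync::Mutex;
//!
//! fn main() {
//!     let handle_lock = Mutex::new(42u32);                    // analog of Mutex<HandleInner>
//!     let per_timeline_lock = Mutex::new(Vec::<u32>::new());  // analog of PerTimelineState::handles
//!
//!     // Take what we need out of the first lock and let its guard drop...
//!     let value = {
//!         let guard = handle_lock.lock().unwrap();
//!         *guard
//!     }; // <- guard dropped here, before we touch the other lock
//!
//!     // ...only then acquire the second lock.
//!     per_timeline_lock.lock().unwrap().push(value);
//! }
//! ```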
183 : //!
184 : //! # Fast Path for Shard Routing
185 : //!
186 : //! The `Cache` has a fast path for shard routing to avoid calling into
187 : //! the tenant manager for every request.
188 : //!
189 : //! The `Cache` maintains a hash map of `ShardTimelineId` to `WeakHandle`s.
190 : //!
191 : //! The current implementation uses the first entry in the hash map
192 : //! to determine the `ShardParameters` and derive the correct
193 : //! `ShardIndex` for the requested key.
194 : //!
195 : //! It then looks up the hash map for that `ShardTimelineId := {ShardIndex,TimelineId}`.
196 : //!
197 : //! If the lookup is successful and the `WeakHandle` can be upgraded,
198 : //! it's a hit.
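//!
//! Condensed, the fast-path decision in `shard_routing` below looks like this
//! (stale-entry cleanup and error handling omitted; `key` is the requested page key):
//!
//! ```rust,ignore
//! let (_any_key, first_handle) = self.map.iter().next()?; // any cached shard will do
//! let first = first_handle.upgrade().ok()?;
//! let ident = first.get_shard_identity();                 // shard count, stripe size, ...
//! let need = ShardTimelineId {
//!     shard_index: ShardIndex {
//!         shard_number: ident.get_shard_number(&key),      // route the requested key
//!         shard_count: ident.count,
//!     },
//!     timeline_id,
//! };
//! if need == first.shard_timeline_id() {
//!     // fast path: the shard we already hold is the right one
//! } else {
//!     // slow path: look up `need` in the map; only if that misses too
//!     // do we fall back to the TenantManager
//! }
//! ```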
199 : //!
200 : //! ## Cache invalidation
201 : //!
202 : //! The insight is that cache invalidation is sufficient, and most efficient, if done lazily.
203 : //! The only reasons why an entry in the cache can become stale are:
204 : //! 1. The `PerTimelineState` / Timeline is shutting down, e.g., because the shard is
205 : //! being detached, the timeline or shard is being deleted, or the pageserver is shutting down.
206 : //! 2. We're doing a shard split and new traffic should be routed to the child shards.
207 : //!
208 : //! Regarding (1), we will eventually fail to upgrade the `WeakHandle` once the
209 : //! timeline has shut down, and when that happens, we remove the entry from the cache.
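//!
//! The lazy cleanup for case (1) is just a failed upgrade followed by a removal;
//! condensed from `do_get` below:
//!
//! ```rust,ignore
//! match cached.upgrade() {
//!     Ok(handle) => return Ok(handle),      // entry still valid, use it
//!     Err(HandleUpgradeError::ShutDown) => {
//!         // Stale: the timeline shut down after we cached this entry.
//!         // Drop the entry and fall through to the tenant-manager miss path.
//!         self.map.remove(&key);
//!     }
//! }
//! ```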
210 : //!
211 : //! Regarding (2), the insight is that it is totally fine to keep dispatching requests
212 : //! to the parent shard during a shard split. Eventually, the shard split task will
213 : //! shut down the parent => case (1).
214 :
215 : use std::collections::HashMap;
216 : use std::collections::hash_map;
217 : use std::sync::Arc;
218 : use std::sync::Mutex;
219 : use std::sync::Weak;
220 : use std::time::Duration;
221 :
222 : use pageserver_api::shard::ShardIdentity;
223 : use tracing::{instrument, trace};
224 : use utils::id::TimelineId;
225 : use utils::shard::{ShardIndex, ShardNumber};
226 :
227 : use crate::tenant::mgr::ShardSelector;
228 :
229 : /// The requirement for Debug is so that #[derive(Debug)] works in some places.
230 : pub(crate) trait Types: Sized + std::fmt::Debug {
231 : type TenantManagerError: Sized + std::fmt::Debug;
232 : type TenantManager: TenantManager<Self> + Sized;
233 : type Timeline: Timeline<Self> + Sized;
234 : }
235 :
236 : /// Uniquely identifies a [`Cache`] instance over the lifetime of the process.
237 : /// Required so [`Cache::drop`] can take out the handles from the [`PerTimelineState`].
238 : /// Alternative to this would be to allocate [`Cache`] in a `Box` and identify it by the pointer.
239 : #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
240 : struct CacheId(u64);
241 :
242 : impl CacheId {
243 16 : fn next() -> Self {
244 : static NEXT_ID: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
245 16 : let id = NEXT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
246 16 : if id == 0 {
247 0 : panic!("CacheId::new() returned 0, overflow");
248 16 : }
249 16 : Self(id)
250 16 : }
251 : }
252 :
253 : /// See module-level comment.
254 : pub(crate) struct Cache<T: Types> {
255 : id: CacheId,
256 : map: Map<T>,
257 : }
258 :
259 : type Map<T> = HashMap<ShardTimelineId, WeakHandle<T>>;
260 :
261 : impl<T: Types> Default for Cache<T> {
262 16 : fn default() -> Self {
263 16 : Self {
264 16 : id: CacheId::next(),
265 16 : map: Default::default(),
266 16 : }
267 16 : }
268 : }
269 :
270 : #[derive(PartialEq, Eq, Debug, Hash, Clone, Copy)]
271 : pub(crate) struct ShardTimelineId {
272 : pub(crate) shard_index: ShardIndex,
273 : pub(crate) timeline_id: TimelineId,
274 : }
275 :
276 : /// See module-level comment.
277 : pub(crate) struct Handle<T: Types> {
278 : inner: Arc<Mutex<HandleInner<T>>>,
279 : open: Arc<T::Timeline>,
280 : }
281 : pub(crate) struct WeakHandle<T: Types> {
282 : inner: Weak<Mutex<HandleInner<T>>>,
283 : }
284 :
285 : enum HandleInner<T: Types> {
286 : Open(Arc<T::Timeline>),
287 : ShutDown,
288 : }
289 :
290 : /// Embedded in each [`Types::Timeline`] as the anchor for the only long-lived strong ref to `HandleInner`.
291 : ///
292 : /// See module-level comment for details.
293 : pub struct PerTimelineState<T: Types> {
294 : // None = shutting down
295 : #[allow(clippy::type_complexity)]
296 : handles: Mutex<Option<HashMap<CacheId, Arc<Mutex<HandleInner<T>>>>>>,
297 : }
298 :
299 : impl<T: Types> Default for PerTimelineState<T> {
300 245 : fn default() -> Self {
301 245 : Self {
302 245 : handles: Mutex::new(Some(Default::default())),
303 245 : }
304 245 : }
305 : }
306 :
307 : /// Abstract view of [`crate::tenant::mgr`], for testability.
308 : pub(crate) trait TenantManager<T: Types> {
309 : /// Invoked by [`Cache::get`] to resolve a [`TimelineId`] and [`ShardSelector`] to a [`Types::Timeline`].
310 : /// Errors are returned as [`GetError::TenantManager`].
311 : async fn resolve(
312 : &self,
313 : timeline_id: TimelineId,
314 : shard_selector: ShardSelector,
315 : ) -> Result<T::Timeline, T::TenantManagerError>;
316 : }
317 :
318 : /// Abstract view of an [`Arc<Timeline>`], for testability.
319 : pub(crate) trait Timeline<T: Types> {
320 : fn shard_timeline_id(&self) -> ShardTimelineId;
321 : fn get_shard_identity(&self) -> &ShardIdentity;
322 : fn per_timeline_state(&self) -> &PerTimelineState<T>;
323 : }
324 :
325 : /// Errors returned by [`Cache::get`].
326 : #[derive(Debug)]
327 : pub(crate) enum GetError<T: Types> {
328 : TenantManager(T::TenantManagerError),
329 : PerTimelineStateShutDown,
330 : }
331 :
332 : /// Internal type used in [`Cache::get`].
333 : enum RoutingResult<T: Types> {
334 : FastPath(Handle<T>),
335 : SlowPath(ShardTimelineId),
336 : NeedConsultTenantManager,
337 : }
338 :
339 : impl<T: Types> Cache<T> {
340 : /* BEGIN_HADRON */
341 : /// A retrying wrapper around `do_get` that resolves the tenant shard for a getpage request.
342 : #[instrument(level = "trace", skip_all)]
343 : pub(crate) async fn get(
344 : &mut self,
345 : timeline_id: TimelineId,
346 : shard_selector: ShardSelector,
347 : tenant_manager: &T::TenantManager,
348 : ) -> Result<Handle<T>, GetError<T>> {
349 : const GET_MAX_RETRIES: usize = 10;
350 : const RETRY_BACKOFF: Duration = Duration::from_millis(100);
351 : let mut attempt = 0;
352 : loop {
353 : attempt += 1;
354 : match self
355 : .do_get(timeline_id, shard_selector, tenant_manager)
356 : .await
357 : {
358 : Ok(handle) => return Ok(handle),
359 : Err(e) => {
360 : // Retry on tenant manager error to handle tenant split more gracefully
361 : if attempt < GET_MAX_RETRIES {
362 : tokio::time::sleep(RETRY_BACKOFF).await;
363 : continue;
364 : } else {
365 : tracing::warn!(
366 : "Failed to resolve tenant shard after {} attempts: {:?}",
367 : GET_MAX_RETRIES,
368 : e
369 : );
370 : return Err(e);
371 : }
372 : }
373 : }
374 : }
375 : }
376 : /* END_HADRON */
377 :
378 : /// See module-level comment for details.
379 : ///
380 : /// Does NOT check for the shutdown state of [`Types::Timeline`].
381 : /// Instead, the methods of [`Types::Timeline`] that are invoked through
382 : /// the [`Handle`] are responsible for checking these conditions
383 : /// and, if so, returning an error that causes the page service to
384 : /// close the connection.
385 : #[instrument(level = "trace", skip_all)]
386 : async fn do_get(
387 : &mut self,
388 : timeline_id: TimelineId,
389 : shard_selector: ShardSelector,
390 : tenant_manager: &T::TenantManager,
391 : ) -> Result<Handle<T>, GetError<T>> {
392 : // terminates because each iteration either exits the loop or removes an element from the map
393 : let miss: ShardSelector = loop {
394 : let routing_state = self.shard_routing(timeline_id, shard_selector);
395 : match routing_state {
396 : RoutingResult::FastPath(handle) => return Ok(handle),
397 : RoutingResult::SlowPath(key) => match self.map.get(&key) {
398 : Some(cached) => match cached.upgrade() {
399 : Ok(upgraded) => return Ok(upgraded),
400 : Err(HandleUpgradeError::ShutDown) => {
401 : // TODO: dedup with shard_routing()
402 : trace!("handle cache stale");
403 : self.map.remove(&key).unwrap();
404 : continue;
405 : }
406 : },
407 : None => break ShardSelector::Known(key.shard_index),
408 : },
409 : RoutingResult::NeedConsultTenantManager => break shard_selector,
410 : }
411 : };
412 : self.get_miss(timeline_id, miss, tenant_manager).await
413 : }
414 :
415 : #[inline(always)]
416 56 : fn shard_routing(
417 56 : &mut self,
418 56 : timeline_id: TimelineId,
419 56 : shard_selector: ShardSelector,
420 56 : ) -> RoutingResult<T> {
421 : loop {
422 : // terminates because each iteration either returns or removes an element from the map
423 58 : let Some((first_key, first_handle)) = self.map.iter().next() else {
424 37 : return RoutingResult::NeedConsultTenantManager;
425 : };
426 21 : let Ok(first_handle) = first_handle.upgrade() else {
427 : // TODO: dedup with get()
428 2 : trace!("handle cache stale");
429 2 : let first_key_owned = *first_key;
430 2 : self.map.remove(&first_key_owned).unwrap();
431 2 : continue;
432 : };
433 :
434 19 : let first_handle_shard_identity = first_handle.get_shard_identity();
435 19 : let make_shard_index = |shard_num: ShardNumber| ShardIndex {
436 19 : shard_number: shard_num,
437 19 : shard_count: first_handle_shard_identity.count,
438 19 : };
439 :
440 19 : let need_idx = match shard_selector {
441 19 : ShardSelector::Page(key) => {
442 19 : make_shard_index(first_handle_shard_identity.get_shard_number(&key))
443 : }
444 0 : ShardSelector::Zero => make_shard_index(ShardNumber(0)),
445 0 : ShardSelector::Known(shard_idx) => shard_idx,
446 : };
447 19 : let need_shard_timeline_id = ShardTimelineId {
448 19 : shard_index: need_idx,
449 19 : timeline_id,
450 19 : };
451 19 : let first_handle_shard_timeline_id = ShardTimelineId {
452 19 : shard_index: first_handle_shard_identity.shard_index(),
453 19 : timeline_id: first_handle.shard_timeline_id().timeline_id,
454 19 : };
455 :
456 19 : if need_shard_timeline_id == first_handle_shard_timeline_id {
457 6 : return RoutingResult::FastPath(first_handle);
458 : } else {
459 13 : return RoutingResult::SlowPath(need_shard_timeline_id);
460 : }
461 : }
462 56 : }
463 :
464 : #[instrument(level = "trace", skip_all)]
465 : #[inline(always)]
466 : async fn get_miss(
467 : &mut self,
468 : timeline_id: TimelineId,
469 : shard_selector: ShardSelector,
470 : tenant_manager: &T::TenantManager,
471 : ) -> Result<Handle<T>, GetError<T>> {
472 : match tenant_manager.resolve(timeline_id, shard_selector).await {
473 : Ok(timeline) => {
474 : let key = timeline.shard_timeline_id();
475 : match &shard_selector {
476 : ShardSelector::Zero => assert_eq!(key.shard_index.shard_number, ShardNumber(0)),
477 : ShardSelector::Page(_) => (), // gotta trust tenant_manager
478 : ShardSelector::Known(idx) => assert_eq!(idx, &key.shard_index),
479 : }
480 :
481 : trace!("creating new HandleInner");
482 : let timeline = Arc::new(timeline);
483 : let handle_inner_arc =
484 : Arc::new(Mutex::new(HandleInner::Open(Arc::clone(&timeline))));
485 : let handle_weak = WeakHandle {
486 : inner: Arc::downgrade(&handle_inner_arc),
487 : };
488 : let handle = handle_weak
489 : .upgrade()
490 : .ok()
491 : .expect("we just created it and it's not linked anywhere yet");
492 : {
493 : let mut lock_guard = timeline
494 : .per_timeline_state()
495 : .handles
496 : .lock()
497 : .expect("mutex poisoned");
498 : match &mut *lock_guard {
499 : Some(per_timeline_state) => {
500 : let replaced =
501 : per_timeline_state.insert(self.id, Arc::clone(&handle_inner_arc));
502 : assert!(replaced.is_none(), "some earlier code left a stale handle");
503 : match self.map.entry(key) {
504 : hash_map::Entry::Occupied(_o) => {
505 : // This cannot happen because
506 : // 1. we're the _miss_ handle, i.e., `self.map` didn't contain an entry and
507 : // 2. we were holding &mut self during .resolve().await above, so, no other thread can have inserted a handle
508 : // while we were waiting for the tenant manager.
509 : unreachable!()
510 : }
511 : hash_map::Entry::Vacant(v) => {
512 : v.insert(handle_weak);
513 : }
514 : }
515 : }
516 : None => {
517 : return Err(GetError::PerTimelineStateShutDown);
518 : }
519 : }
520 : }
521 : Ok(handle)
522 : }
523 : Err(e) => Err(GetError::TenantManager(e)),
524 : }
525 : }
526 : }
527 :
528 : pub(crate) enum HandleUpgradeError {
529 : ShutDown,
530 : }
531 :
532 : impl<T: Types> WeakHandle<T> {
533 54 : pub(crate) fn upgrade(&self) -> Result<Handle<T>, HandleUpgradeError> {
534 54 : let Some(inner) = Weak::upgrade(&self.inner) else {
535 2 : return Err(HandleUpgradeError::ShutDown);
536 : };
537 52 : let lock_guard = inner.lock().expect("poisoned");
538 52 : match &*lock_guard {
539 49 : HandleInner::Open(open) => {
540 49 : let open = Arc::clone(open);
541 49 : drop(lock_guard);
542 49 : Ok(Handle { open, inner })
543 : }
544 3 : HandleInner::ShutDown => Err(HandleUpgradeError::ShutDown),
545 : }
546 54 : }
547 :
548 0 : pub(crate) fn is_same_handle_as(&self, other: &WeakHandle<T>) -> bool {
549 0 : Weak::ptr_eq(&self.inner, &other.inner)
550 0 : }
551 : }
552 :
553 : impl<T: Types> std::ops::Deref for Handle<T> {
554 : type Target = T::Timeline;
555 71 : fn deref(&self) -> &Self::Target {
556 71 : &self.open
557 71 : }
558 : }
559 :
560 : impl<T: Types> Handle<T> {
561 1 : pub(crate) fn downgrade(&self) -> WeakHandle<T> {
562 1 : WeakHandle {
563 1 : inner: Arc::downgrade(&self.inner),
564 1 : }
565 1 : }
566 : }
567 :
568 : impl<T: Types> PerTimelineState<T> {
569 : /// After this method returns, [`Cache::get`] will never again return a [`Handle`]
570 : /// to the [`Types::Timeline`] that embeds this per-timeline state.
571 : /// Even if [`TenantManager::resolve`] would still resolve to it.
572 : ///
573 : /// Already-alive [`Handle`]s will remain open, usable, and keep the [`Types::Timeline`] alive.
574 : /// That's ok because they're short-lived. See module-level comment for details.
575 : #[instrument(level = "trace", skip_all)]
576 : pub(super) fn shutdown(&self) {
577 : let handles = self
578 : .handles
579 : .lock()
580 : .expect("mutex poisoned")
581 : // NB: this .take() sets locked to None.
582 : // That's what makes future `Cache::get` misses fail.
583 : // Cache hits are taken care of below.
584 : .take();
585 : let Some(handles) = handles else {
586 : trace!("already shut down");
587 : return;
588 : };
589 : for handle_inner_arc in handles.values() {
590 : // Make hits fail.
591 : let mut lock_guard = handle_inner_arc.lock().expect("poisoned");
592 : lock_guard.shutdown();
593 : }
594 : drop(handles);
595 : }
596 : }
597 :
598 : // When dropping a [`Cache`], prune its handles in the [`PerTimelineState`] to break the reference cycle.
599 : impl<T: Types> Drop for Cache<T> {
600 16 : fn drop(&mut self) {
601 : for (
602 : _,
603 : WeakHandle {
604 16 : inner: handle_inner_weak,
605 : },
606 16 : ) in self.map.drain()
607 : {
608 16 : let Some(handle_inner_arc) = handle_inner_weak.upgrade() else {
609 2 : continue;
610 : };
611 14 : let Some(handle_timeline) = handle_inner_arc
612 14 : // locking rules: drop lock before acquiring other lock below
613 14 : .lock()
614 14 : .expect("poisoned")
615 14 : .shutdown()
616 : else {
617 : // Concurrent PerTimelineState::shutdown.
618 0 : continue;
619 : };
620 : // Clean up per_timeline_state so the HandleInner allocation can be dropped.
621 14 : let per_timeline_state = handle_timeline.per_timeline_state();
622 14 : let mut handles_lock_guard = per_timeline_state.handles.lock().expect("mutex poisoned");
623 14 : let Some(handles) = &mut *handles_lock_guard else {
624 0 : continue;
625 : };
626 14 : let Some(removed_handle_inner_arc) = handles.remove(&self.id) else {
627 : // Concurrent PerTimelineState::shutdown.
628 0 : continue;
629 : };
630 14 : drop(handles_lock_guard); // locking rules!
631 14 : assert!(Arc::ptr_eq(&removed_handle_inner_arc, &handle_inner_arc));
632 : }
633 16 : }
634 : }
635 :
636 : impl<T: Types> HandleInner<T> {
637 19 : fn shutdown(&mut self) -> Option<Arc<T::Timeline>> {
638 19 : match std::mem::replace(self, HandleInner::ShutDown) {
639 19 : HandleInner::Open(timeline) => Some(timeline),
640 : HandleInner::ShutDown => {
641 : // Duplicate shutdowns are possible because both Cache::drop and PerTimelineState::shutdown
642 : // may do it concurrently, but locking rules disallow holding per-timeline-state lock and
643 : // the handle lock at the same time.
644 0 : None
645 : }
646 : }
647 19 : }
648 : }
649 :
650 : #[cfg(test)]
651 : mod tests {
652 : use std::sync::Weak;
653 :
654 : use pageserver_api::key::{DBDIR_KEY, Key, rel_block_to_key};
655 : use pageserver_api::models::ShardParameters;
656 : use pageserver_api::reltag::RelTag;
657 : use pageserver_api::shard::ShardStripeSize;
658 : use utils::shard::ShardCount;
659 : use utils::sync::gate::GateGuard;
660 :
661 : use super::*;
662 :
663 : const FOREVER: std::time::Duration = std::time::Duration::from_secs(u64::MAX);
664 :
665 : #[derive(Debug)]
666 : struct TestTypes;
667 : impl Types for TestTypes {
668 : type TenantManagerError = anyhow::Error;
669 : type TenantManager = StubManager;
670 : type Timeline = Entered;
671 : }
672 :
673 : struct StubManager {
674 : shards: Vec<Arc<StubTimeline>>,
675 : }
676 :
677 : struct StubTimeline {
678 : gate: utils::sync::gate::Gate,
679 : id: TimelineId,
680 : shard: ShardIdentity,
681 : per_timeline_state: PerTimelineState<TestTypes>,
682 : myself: Weak<StubTimeline>,
683 : }
684 :
685 : struct Entered {
686 : timeline: Arc<StubTimeline>,
687 : #[allow(dead_code)] // it's stored here to keep the gate open
688 : gate_guard: Arc<GateGuard>,
689 : }
690 :
691 : impl StubTimeline {
692 11 : fn getpage(&self) {
693 : // do nothing
694 11 : }
695 : }
696 :
697 : impl Timeline<TestTypes> for Entered {
698 48 : fn shard_timeline_id(&self) -> ShardTimelineId {
699 48 : ShardTimelineId {
700 48 : shard_index: self.shard.shard_index(),
701 48 : timeline_id: self.id,
702 48 : }
703 48 : }
704 :
705 19 : fn get_shard_identity(&self) -> &ShardIdentity {
706 19 : &self.shard
707 19 : }
708 :
709 43 : fn per_timeline_state(&self) -> &PerTimelineState<TestTypes> {
710 43 : &self.per_timeline_state
711 43 : }
712 : }
713 :
714 : impl TenantManager<TestTypes> for StubManager {
715 50 : async fn resolve(
716 50 : &self,
717 50 : timeline_id: TimelineId,
718 50 : shard_selector: ShardSelector,
719 50 : ) -> anyhow::Result<Entered> {
720 63 : for timeline in &self.shards {
721 52 : if timeline.id == timeline_id {
722 40 : let enter_gate = || {
723 39 : let gate_guard = timeline.gate.enter()?;
724 29 : let gate_guard = Arc::new(gate_guard);
725 29 : anyhow::Ok(gate_guard)
726 39 : };
727 0 : match &shard_selector {
728 0 : ShardSelector::Zero if timeline.shard.is_shard_zero() => {
729 : return Ok(Entered {
730 0 : timeline: Arc::clone(timeline),
731 0 : gate_guard: enter_gate()?,
732 : });
733 : }
734 0 : ShardSelector::Zero => continue,
735 37 : ShardSelector::Page(key) if timeline.shard.is_key_local(key) => {
736 : return Ok(Entered {
737 37 : timeline: Arc::clone(timeline),
738 37 : gate_guard: enter_gate()?,
739 : });
740 : }
741 0 : ShardSelector::Page(_) => continue,
742 3 : ShardSelector::Known(idx) if idx == &timeline.shard.shard_index() => {
743 : return Ok(Entered {
744 2 : timeline: Arc::clone(timeline),
745 2 : gate_guard: enter_gate()?,
746 : });
747 : }
748 1 : ShardSelector::Known(_) => continue,
749 : }
750 12 : }
751 : }
752 11 : anyhow::bail!("not found")
753 50 : }
754 : }
755 :
756 : impl std::ops::Deref for Entered {
757 : type Target = StubTimeline;
758 191 : fn deref(&self) -> &Self::Target {
759 191 : &self.timeline
760 191 : }
761 : }
762 :
763 : #[tokio::test(start_paused = true)]
764 1 : async fn test_timeline_shutdown() {
765 1 : crate::tenant::harness::setup_logging();
766 :
767 1 : let timeline_id = TimelineId::generate();
768 1 : let shard0 = Arc::new_cyclic(|myself| StubTimeline {
769 1 : gate: Default::default(),
770 1 : id: timeline_id,
771 1 : shard: ShardIdentity::unsharded(),
772 1 : per_timeline_state: PerTimelineState::default(),
773 1 : myself: myself.clone(),
774 1 : });
775 1 : let mgr = StubManager {
776 1 : shards: vec![shard0.clone()],
777 1 : };
778 1 : let key = DBDIR_KEY;
779 :
780 1 : let mut cache = Cache::<TestTypes>::default();
781 :
782 : //
783 : // fill the cache
784 : //
785 1 : let handle: Handle<_> = cache
786 1 : .get(timeline_id, ShardSelector::Page(key), &mgr)
787 1 : .await
788 1 : .expect("we have the timeline");
789 1 : assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
790 1 : assert_eq!(cache.map.len(), 1);
791 1 : drop(handle);
792 :
793 : //
794 : // demonstrate that Handle holds up gate closure
795 : // but shutdown prevents new handles from being handed out
796 : //
797 :
798 1 : tokio::select! {
799 1 : _ = shard0.gate.close() => {
800 0 : panic!("cache and per-timeline handler state keep cache open");
801 : }
802 1 : _ = tokio::time::sleep(FOREVER) => {
803 1 : // NB: first poll of close() makes it enter closing state
804 1 : }
805 : }
806 :
807 1 : let handle = cache
808 1 : .get(timeline_id, ShardSelector::Page(key), &mgr)
809 1 : .await
810 1 : .expect("we have the timeline");
811 1 : assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
812 :
813 : // SHUTDOWN
814 1 : shard0.per_timeline_state.shutdown(); // keeping handle alive across shutdown
815 :
816 1 : assert_eq!(
817 1 : cache.map.len(),
818 : 1,
819 0 : "this is an implementation detail but worth pointing out: we can't clear the cache from shutdown(), it's cleared on first access after"
820 : );
821 :
822 : // this handle is perfectly usable
823 1 : handle.getpage();
824 :
825 1 : cache
826 1 : .get(timeline_id, ShardSelector::Page(key), &mgr)
827 1 : .await
828 1 : .err()
829 1 : .expect("documented behavior: can't get new handle after shutdown, even if there is an alive Handle");
830 1 : assert_eq!(
831 1 : cache.map.len(),
832 : 0,
833 0 : "first access after shutdown cleans up the Weak's from the cache"
834 : );
835 :
836 1 : tokio::select! {
837 1 : _ = shard0.gate.close() => {
838 0 : panic!("handle is keeping gate open");
839 : }
840 1 : _ = tokio::time::sleep(FOREVER) => { }
841 : }
842 :
843 1 : drop(handle);
844 :
845 : // closing gate succeeds after dropping handle
846 1 : tokio::select! {
847 1 : _ = shard0.gate.close() => { }
848 1 : _ = tokio::time::sleep(FOREVER) => {
849 0 : panic!("handle is dropped, no other gate holders exist")
850 : }
851 : }
852 :
853 : // map gets cleaned on next lookup
854 1 : cache
855 1 : .get(timeline_id, ShardSelector::Page(key), &mgr)
856 1 : .await
857 1 : .err()
858 1 : .expect("documented behavior: can't get new handle after shutdown");
859 1 : assert_eq!(cache.map.len(), 0);
860 :
861 : // ensure all refs to shard0 are gone and we're not leaking anything
862 1 : drop(shard0);
863 1 : drop(mgr);
864 1 : }
865 :
866 : #[tokio::test]
867 1 : async fn test_multiple_timelines_and_deletion() {
868 1 : crate::tenant::harness::setup_logging();
869 :
870 1 : let timeline_a = TimelineId::generate();
871 1 : let timeline_b = TimelineId::generate();
872 1 : assert_ne!(timeline_a, timeline_b);
873 1 : let timeline_a = Arc::new_cyclic(|myself| StubTimeline {
874 1 : gate: Default::default(),
875 1 : id: timeline_a,
876 1 : shard: ShardIdentity::unsharded(),
877 1 : per_timeline_state: PerTimelineState::default(),
878 1 : myself: myself.clone(),
879 1 : });
880 1 : let timeline_b = Arc::new_cyclic(|myself| StubTimeline {
881 1 : gate: Default::default(),
882 1 : id: timeline_b,
883 1 : shard: ShardIdentity::unsharded(),
884 1 : per_timeline_state: PerTimelineState::default(),
885 1 : myself: myself.clone(),
886 1 : });
887 1 : let mut mgr = StubManager {
888 1 : shards: vec![timeline_a.clone(), timeline_b.clone()],
889 1 : };
890 1 : let key = DBDIR_KEY;
891 :
892 1 : let mut cache = Cache::<TestTypes>::default();
893 :
894 1 : cache
895 1 : .get(timeline_a.id, ShardSelector::Page(key), &mgr)
896 1 : .await
897 1 : .expect("we have it");
898 1 : cache
899 1 : .get(timeline_b.id, ShardSelector::Page(key), &mgr)
900 1 : .await
901 1 : .expect("we have it");
902 1 : assert_eq!(cache.map.len(), 2);
903 :
904 : // delete timeline A
905 1 : timeline_a.per_timeline_state.shutdown();
906 2 : mgr.shards.retain(|t| t.id != timeline_a.id);
907 1 : assert!(
908 1 : mgr.resolve(timeline_a.id, ShardSelector::Page(key))
909 1 : .await
910 1 : .is_err(),
911 0 : "broken StubManager implementation"
912 : );
913 :
914 1 : assert_eq!(
915 1 : cache.map.len(),
916 : 2,
917 0 : "cache still has a Weak handle to Timeline A"
918 : );
919 1 : cache
920 1 : .get(timeline_a.id, ShardSelector::Page(key), &mgr)
921 1 : .await
922 1 : .err()
923 1 : .expect("documented behavior: can't get new handle after shutdown");
924 :
925 1 : assert_eq!(cache.map.len(), 1, "next access cleans up the cache");
926 :
927 1 : cache
928 1 : .get(timeline_b.id, ShardSelector::Page(key), &mgr)
929 1 : .await
930 1 : .expect("we still have it");
931 1 : }
932 :
933 7 : fn make_relation_key_for_shard(shard: ShardNumber, params: ShardParameters) -> Key {
934 7 : rel_block_to_key(
935 7 : RelTag {
936 7 : spcnode: 1663,
937 7 : dbnode: 208101,
938 7 : relnode: 2620,
939 7 : forknum: 0,
940 7 : },
941 7 : shard.0 as u32 * params.stripe_size.0,
942 : )
943 7 : }
944 :
945 : #[tokio::test(start_paused = true)]
946 1 : async fn test_shard_split() {
947 1 : crate::tenant::harness::setup_logging();
948 1 : let timeline_id = TimelineId::generate();
949 1 : let parent = Arc::new_cyclic(|myself| StubTimeline {
950 1 : gate: Default::default(),
951 1 : id: timeline_id,
952 1 : shard: ShardIdentity::unsharded(),
953 1 : per_timeline_state: PerTimelineState::default(),
954 1 : myself: myself.clone(),
955 1 : });
956 1 : let child_params = ShardParameters {
957 1 : count: ShardCount(2),
958 1 : stripe_size: ShardStripeSize::default(),
959 1 : };
960 1 : let child0 = Arc::new_cyclic(|myself| StubTimeline {
961 1 : gate: Default::default(),
962 1 : id: timeline_id,
963 1 : shard: ShardIdentity::from_params(ShardNumber(0), child_params),
964 1 : per_timeline_state: PerTimelineState::default(),
965 1 : myself: myself.clone(),
966 1 : });
967 1 : let child1 = Arc::new_cyclic(|myself| StubTimeline {
968 1 : gate: Default::default(),
969 1 : id: timeline_id,
970 1 : shard: ShardIdentity::from_params(ShardNumber(1), child_params),
971 1 : per_timeline_state: PerTimelineState::default(),
972 1 : myself: myself.clone(),
973 1 : });
974 1 : let child_shards_by_shard_number = [child0.clone(), child1.clone()];
975 :
976 1 : let mut cache = Cache::<TestTypes>::default();
977 :
978 : // fill the cache with the parent
979 3 : for i in 0..2 {
980 2 : let handle = cache
981 2 : .get(
982 2 : timeline_id,
983 2 : ShardSelector::Page(make_relation_key_for_shard(ShardNumber(i), child_params)),
984 2 : &StubManager {
985 2 : shards: vec![parent.clone()],
986 2 : },
987 2 : )
988 2 : .await
989 2 : .expect("we have it");
990 2 : assert!(
991 2 : Weak::ptr_eq(&handle.myself, &parent.myself),
992 0 : "mgr returns parent first"
993 : );
994 2 : drop(handle);
995 : }
996 :
997 : //
998 : // SHARD SPLIT: tenant manager changes, but the cache isn't informed
999 : //
1000 :
1001 : // while we haven't shut down the parent, the cache will return the cached parent, even
1002 : // if the tenant manager returns the child
1003 3 : for i in 0..2 {
1004 2 : let handle = cache
1005 2 : .get(
1006 2 : timeline_id,
1007 2 : ShardSelector::Page(make_relation_key_for_shard(ShardNumber(i), child_params)),
1008 2 : &StubManager {
1009 2 : shards: vec![], // doesn't matter what's in here, the cache is fully loaded
1010 2 : },
1011 2 : )
1012 2 : .await
1013 2 : .expect("we have it");
1014 2 : assert!(
1015 2 : Weak::ptr_eq(&handle.myself, &parent.myself),
1016 0 : "mgr returns parent"
1017 : );
1018 2 : drop(handle);
1019 : }
1020 :
1021 1 : let parent_handle = cache
1022 1 : .get(
1023 1 : timeline_id,
1024 1 : ShardSelector::Page(make_relation_key_for_shard(ShardNumber(0), child_params)),
1025 1 : &StubManager {
1026 1 : shards: vec![parent.clone()],
1027 1 : },
1028 1 : )
1029 1 : .await
1030 1 : .expect("we have it");
1031 1 : assert!(Weak::ptr_eq(&parent_handle.myself, &parent.myself));
1032 :
1033 : // invalidate the cache
1034 1 : parent.per_timeline_state.shutdown();
1035 :
1036 : // the cache will now return the child, even though the parent handle still exists
1037 3 : for i in 0..2 {
1038 2 : let handle = cache
1039 2 : .get(
1040 2 : timeline_id,
1041 2 : ShardSelector::Page(make_relation_key_for_shard(ShardNumber(i), child_params)),
1042 2 : &StubManager {
1043 2 : shards: vec![child0.clone(), child1.clone()], // <====== this changed compared to previous loop
1044 2 : },
1045 2 : )
1046 2 : .await
1047 2 : .expect("we have it");
1048 2 : assert!(
1049 2 : Weak::ptr_eq(
1050 2 : &handle.myself,
1051 2 : &child_shards_by_shard_number[i as usize].myself
1052 : ),
1053 0 : "mgr returns child"
1054 : );
1055 2 : drop(handle);
1056 : }
1057 :
1058 : // all the while the parent handle kept the parent gate open
1059 1 : tokio::select! {
1060 1 : _ = parent_handle.gate.close() => {
1061 0 : panic!("parent handle is keeping gate open");
1062 : }
1063 1 : _ = tokio::time::sleep(FOREVER) => { }
1064 : }
1065 1 : drop(parent_handle);
1066 1 : tokio::select! {
1067 1 : _ = parent.gate.close() => { }
1068 1 : _ = tokio::time::sleep(FOREVER) => {
1069 1 : panic!("parent handle is dropped, no other gate holders exist")
1070 1 : }
1071 1 : }
1072 1 : }
1073 :
1074 : #[tokio::test(start_paused = true)]
1075 1 : async fn test_connection_handler_exit() {
1076 1 : crate::tenant::harness::setup_logging();
1077 1 : let timeline_id = TimelineId::generate();
1078 1 : let shard0 = Arc::new_cyclic(|myself| StubTimeline {
1079 1 : gate: Default::default(),
1080 1 : id: timeline_id,
1081 1 : shard: ShardIdentity::unsharded(),
1082 1 : per_timeline_state: PerTimelineState::default(),
1083 1 : myself: myself.clone(),
1084 1 : });
1085 1 : let mgr = StubManager {
1086 1 : shards: vec![shard0.clone()],
1087 1 : };
1088 1 : let key = DBDIR_KEY;
1089 :
1090 : // Simulate 10 connections that are opened, used, and closed
1091 11 : for _ in 0..10 {
1092 10 : let mut cache = Cache::<TestTypes>::default();
1093 10 : let handle = {
1094 10 : let handle = cache
1095 10 : .get(timeline_id, ShardSelector::Page(key), &mgr)
1096 10 : .await
1097 10 : .expect("we have the timeline");
1098 10 : assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
1099 10 : handle
1100 1 : };
1101 10 : handle.getpage();
1102 1 : }
1103 1 :
1104 1 : // No handles exist, thus gates are closed and don't require shutdown.
1105 1 : // Thus the gate should close immediately, even without shutdown.
1106 1 : tokio::select! {
1107 1 : _ = shard0.gate.close() => { }
1108 1 : _ = tokio::time::sleep(FOREVER) => {
1109 1 : panic!("handle is dropped, no other gate holders exist")
1110 1 : }
1111 1 : }
1112 1 : }
1113 :
1114 : #[tokio::test(start_paused = true)]
1115 1 : async fn test_weak_handles() {
1116 1 : crate::tenant::harness::setup_logging();
1117 1 : let timeline_id = TimelineId::generate();
1118 1 : let shard0 = Arc::new_cyclic(|myself| StubTimeline {
1119 1 : gate: Default::default(),
1120 1 : id: timeline_id,
1121 1 : shard: ShardIdentity::unsharded(),
1122 1 : per_timeline_state: PerTimelineState::default(),
1123 1 : myself: myself.clone(),
1124 1 : });
1125 1 : let mgr = StubManager {
1126 1 : shards: vec![shard0.clone()],
1127 1 : };
1128 :
1129 1 : let refcount_start = Arc::strong_count(&shard0);
1130 :
1131 1 : let key = DBDIR_KEY;
1132 :
1133 1 : let mut cache = Cache::<TestTypes>::default();
1134 :
1135 1 : let handle = cache
1136 1 : .get(timeline_id, ShardSelector::Page(key), &mgr)
1137 1 : .await
1138 1 : .expect("we have the timeline");
1139 1 : assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
1140 :
1141 1 : let weak_handle = handle.downgrade();
1142 :
1143 1 : drop(handle);
1144 :
1145 1 : let upgraded_handle = weak_handle.upgrade().ok().expect("we can upgrade it");
1146 :
1147 : // Start shutdown
1148 1 : shard0.per_timeline_state.shutdown();
1149 :
1150 : // Upgrades during shutdown don't work, even if upgraded_handle exists.
1151 1 : weak_handle
1152 1 : .upgrade()
1153 1 : .err()
1154 1 : .expect("can't upgrade weak handle as soon as shutdown started");
1155 :
1156 : // But upgraded_handle is still alive, so the gate won't close.
1157 1 : tokio::select! {
1158 1 : _ = shard0.gate.close() => {
1159 0 : panic!("handle is keeping gate open");
1160 : }
1161 1 : _ = tokio::time::sleep(FOREVER) => { }
1162 : }
1163 :
1164 : // Drop the last handle.
1165 1 : drop(upgraded_handle);
1166 :
1167 : // The gate should close now, despite there still being a weak_handle.
1168 1 : tokio::select! {
1169 1 : _ = shard0.gate.close() => { }
1170 1 : _ = tokio::time::sleep(FOREVER) => {
1171 0 : panic!("only strong handle is dropped and we shut down per-timeline-state")
1172 : }
1173 : }
1174 :
1175 : // The weak handle still can't be upgraded.
1176 1 : weak_handle
1177 1 : .upgrade()
1178 1 : .err()
1179 1 : .expect("still shouldn't be able to upgrade the weak handle");
1180 :
1181 : // There should be no strong references to the timeline object except the one on "stack".
1182 1 : assert_eq!(Arc::strong_count(&shard0), refcount_start);
1183 1 : }
1184 :
1185 : #[tokio::test(start_paused = true)]
1186 1 : async fn test_reference_cycle_broken_when_cache_is_dropped() {
1187 1 : crate::tenant::harness::setup_logging();
1188 1 : let timeline_id = TimelineId::generate();
1189 1 : let shard0 = Arc::new_cyclic(|myself| StubTimeline {
1190 1 : gate: Default::default(),
1191 1 : id: timeline_id,
1192 1 : shard: ShardIdentity::unsharded(),
1193 1 : per_timeline_state: PerTimelineState::default(),
1194 1 : myself: myself.clone(),
1195 1 : });
1196 1 : let mgr = StubManager {
1197 1 : shards: vec![shard0.clone()],
1198 1 : };
1199 1 : let key = DBDIR_KEY;
1200 :
1201 1 : let mut cache = Cache::<TestTypes>::default();
1202 :
1203 : // helper to check if a handle is referenced by per_timeline_state
1204 2 : let per_timeline_state_refs_handle = |handle_weak: &Weak<Mutex<HandleInner<_>>>| {
1205 2 : let per_timeline_state = shard0.per_timeline_state.handles.lock().unwrap();
1206 2 : let per_timeline_state = per_timeline_state.as_ref().unwrap();
1207 2 : per_timeline_state
1208 2 : .values()
1209 2 : .any(|v| Weak::ptr_eq(&Arc::downgrade(v), handle_weak))
1210 2 : };
1211 :
1212 : // Fill the cache.
1213 1 : let handle = cache
1214 1 : .get(timeline_id, ShardSelector::Page(key), &mgr)
1215 1 : .await
1216 1 : .expect("we have the timeline");
1217 1 : assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
1218 1 : let handle_inner_weak = Arc::downgrade(&handle.inner);
1219 1 : assert!(
1220 1 : per_timeline_state_refs_handle(&handle_inner_weak),
1221 0 : "we still hold `handle` _and_ haven't dropped `cache` yet"
1222 : );
1223 :
1224 : // Drop the cache.
1225 1 : drop(cache);
1226 :
1227 1 : assert!(
1228 1 : !(per_timeline_state_refs_handle(&handle_inner_weak)),
1229 0 : "nothing should reference the handle allocation anymore"
1230 : );
1231 1 : assert!(
1232 1 : Weak::upgrade(&handle_inner_weak).is_some(),
1233 0 : "the local `handle` still keeps the allocation alive"
1234 : );
1235 : // but obviously the cache is gone so no new allocations can be handed out.
1236 :
1237 : // Drop handle.
1238 1 : drop(handle);
1239 1 : assert!(
1240 1 : Weak::upgrade(&handle_inner_weak).is_none(),
1241 1 : "the local `handle` is dropped, so the allocation should be dropped by now"
1242 1 : );
1243 1 : }
1244 :
1245 : #[tokio::test(start_paused = true)]
1246 1 : async fn test_reference_cycle_broken_when_per_timeline_state_shutdown() {
1247 1 : crate::tenant::harness::setup_logging();
1248 1 : let timeline_id = TimelineId::generate();
1249 1 : let shard0 = Arc::new_cyclic(|myself| StubTimeline {
1250 1 : gate: Default::default(),
1251 1 : id: timeline_id,
1252 1 : shard: ShardIdentity::unsharded(),
1253 1 : per_timeline_state: PerTimelineState::default(),
1254 1 : myself: myself.clone(),
1255 1 : });
1256 1 : let mgr = StubManager {
1257 1 : shards: vec![shard0.clone()],
1258 1 : };
1259 1 : let key = DBDIR_KEY;
1260 :
1261 1 : let mut cache = Cache::<TestTypes>::default();
1262 1 : let handle = cache
1263 1 : .get(timeline_id, ShardSelector::Page(key), &mgr)
1264 1 : .await
1265 1 : .expect("we have the timeline");
1266 : // grab a weak reference to the inner so we can later try to Weak::upgrade it and assert that it fails
1267 1 : let handle_inner_weak = Arc::downgrade(&handle.inner);
1268 :
1269 : // drop the handle, obviously the lifetime of `inner` is at least as long as each strong reference to it
1270 1 : drop(handle);
1271 1 : assert!(Weak::upgrade(&handle_inner_weak).is_some(), "can still");
1272 :
1273 : // Shutdown the per_timeline_state.
1274 1 : shard0.per_timeline_state.shutdown();
1275 1 : assert!(Weak::upgrade(&handle_inner_weak).is_none(), "can no longer");
1276 :
1277 : // The cache only contains Weak refs, so it can outlive the per_timeline_state
1278 : // without keeping anything alive. Drop the cache explicitly solely to make this point.
1279 1 : drop(cache);
1280 1 : }
1281 : }
|