Line data Source code
1 : //! A cache for [`crate::tenant::mgr`]+`Tenant::get_timeline`+`Timeline::gate.enter()`.
2 : //!
3 : //! # Motivation
4 : //!
5 : //! On a single page service connection, we're typically serving a single TenantTimelineId.
6 : //!
7 : //! Without sharding, there is a single Timeline object to which we dispatch
8 : //! all requests. For example, a getpage request gets dispatched to the
9 : //! Timeline::get method of the Timeline object that represents the
10 : //! (tenant,timeline) of that connection.
11 : //!
12 : //! With sharding, for each request that comes in on the connection,
13 : //! we first have to perform shard routing based on the requested key (=~ page number).
14 : //! The result of shard routing is a Timeline object.
15 : //! We then dispatch the request to that Timeline object.
16 : //!
17 : //! Regardless of whether the tenant is sharded or not, we want to ensure that
18 : //! we hold the Timeline gate open while we're invoking the method on the
19 : //! Timeline object.
20 : //!
21 : //! We want to avoid the overhead of doing, for each incoming request,
22 : //! - tenant manager lookup (global rwlock + btreemap lookup for shard routing)
23 : //! - cloning the `Arc<Timeline>` out of the tenant manager so we can
24 : //! release the mgr rwlock before doing any request processing work
25 : //! - re-entering the Timeline gate for each Timeline method invocation.
26 : //!
27 : //! Regardless of how we accomplish the above, it should not
28 : //! prevent the Timeline from shutting down promptly.
29 : //!
30 : //!
31 : //! # Design
32 : //!
33 : //! ## Data Structures
34 : //!
35 : //! There are two concepts expressed as associated types in the `Types` trait:
36 : //! - `TenantManager`: the thing that performs the expensive work. It produces
37 : //! a `Timeline` object, which is the other associated type.
38 : //! - `Timeline`: the item that we cache for fast (TenantTimelineId,ShardSelector) lookup.
39 : //!
40 : //! There are four user-facing data structures exposed by this module:
41 : //! - `PerTimelineState`: a struct embedded into each Timeline struct. Lifetime == Timeline lifetime.
42 : //! - `Cache`: a struct private to each connection handler; Lifetime == connection lifetime.
43 : //! - `Handle`: a smart pointer that derefs to the Types::Timeline.
44 : //! - `WeakHandle`: downgrade of a `Handle` that does not keep the gate open, but allows
45 : //! trying to upgrade back to a `Handle`. If successful, a re-upgraded Handle will always
46 : //! point to the same cached `Types::Timeline`. Upgrades never invoke the `TenantManager`.
47 : //!
48 : //! Internally, there is 0 or 1 `HandleInner` per `(Cache,Timeline)`.
49 : //! Since Cache:Connection is 1:1, there is 0 or 1 `HandleInner` per `(Connection,Timeline)`.
50 : //!
51 : //! The `HandleInner` is allocated as an `Arc<Mutex<HandleInner>>` and
52 : //! referenced weakly and strongly from various places, which we illustrate below.
53 : //! For brevity, we will omit the `Arc<Mutex<>>` part in the following and instead
54 : //! use `strong ref` and `weak ref` when referring to the `Arc<Mutex<HandleInner>>`
55 : //! or `Weak<Mutex<HandleInner>>`, respectively.
56 : //!
57 : //! - The `Handle` is a strong ref.
58 : //! - The `WeakHandle` is a weak ref.
59 : //! - The `PerTimelineState` contains a `HashMap<CacheId, strong ref>`.
60 : //! - The `Cache` is a `HashMap<ShardTimelineId, weak ref>`.
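//!
//! In diagram form (arrows denote the kind of reference held):
//!
//! ```text
//! Cache            --weak-->    Arc<Mutex<HandleInner>>
//! WeakHandle       --weak-->    Arc<Mutex<HandleInner>>
//! Handle           --strong-->  Arc<Mutex<HandleInner>>   (plus a cloned Arc<Types::Timeline>)
//! PerTimelineState --strong-->  Arc<Mutex<HandleInner>>   (one per CacheId)
//! ```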
61 : //!
62 : //! Lifetimes:
63 : //! - `WeakHandle` and `Handle`: single pagestream request.
64 : //! - `Cache`: single page service connection.
65 : //! - `PerTimelineState`: lifetime of the Timeline object (i.e., until `Timeline::shutdown`).
66 : //!
67 : //! ## Request Handling Flow (= filling and using the `Cache`)
68 : //!
69 : //! To dispatch a request, the page service connection calls `Cache::get`.
70 : //!
71 : //! A cache miss means we call Types::TenantManager::resolve for shard routing,
72 : //! cloning the `Arc<Timeline>` out of it, and entering the gate. The result of
73 : //! resolve() is the object we want to cache and return `Handle`s to, for subsequent `Cache::get` calls.
74 : //!
75 : //! We wrap the object returned from resolve() in an `Arc` and store that inside the
76 : //! `Arc<Mutex<HandleInner>>`. A weak ref to the HandleInner is stored in the `Cache`
77 : //! and a strong ref in the `PerTimelineState`.
78 : //! Another strong ref is returned wrapped in a `Handle`.
79 : //!
80 : //! For subsequent requests, `Cache::get` will perform a "fast path" shard routing
81 : //! and find the weak ref in the cache.
82 : //! We upgrade the weak ref to a strong ref and return it wrapped in a `Handle`.
83 : //!
84 : //! The pagestream processing is pipelined and involves a batching step.
85 : //! While a request is batching, the `Handle` is downgraded to a `WeakHandle`.
86 : //! When the batch is ready to be executed, the `WeakHandle` is upgraded back to a `Handle`
87 : //! and the request handler dispatches the request to the right `<Handle as Deref<Target = Timeline>>::$request_method`.
88 : //! It then drops the `Handle`, and thus the `Arc<Mutex<HandleInner>>` inside it.
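//!
//! A minimal sketch of this flow from the connection handler's perspective
//! (`MyTypes`, `tenant_manager`, and `some_request_method` are hypothetical stand-ins
//! for a concrete `Types` implementation, its `TenantManager`, and a Timeline method):
//!
//! ```text
//! let mut cache = Cache::<MyTypes>::default();       // one per connection
//! // per request:
//! let handle = cache.get(timeline_id, shard_selector, &tenant_manager).await?;
//! let weak = handle.downgrade();                     // park the timeline while the request batches
//! drop(handle);
//! // ... batching ...
//! let handle = weak.upgrade()?;                      // cheap, never calls the TenantManager
//! handle.some_request_method();                      // Handle derefs to Types::Timeline
//! drop(handle);
//! ```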
89 : //!
90 : //! # Performance
91 : //!
92 : //! Remember from the introductory section:
93 : //!
94 : //! > We want to avoid the overhead of doing, for each incoming request,
95 : //! > - tenant manager lookup (global rwlock + btreemap lookup for shard routing)
96 : //! > - cloning the `Arc<Timeline>` out of the tenant manager so we can
97 : //! > release the mgr rwlock before doing any request processing work
98 : //! > - re-entering the Timeline gate for each Timeline method invocation.
99 : //!
100 : //! All of these boil down to some state that is either globally shared among all shards
101 : //! or state shared among all tasks that serve a particular timeline.
102 : //! It is either protected by RwLock or manipulated via atomics.
103 : //! Even atomics are costly when shared across multiple cores.
104 : //! So, we want to avoid any permanent need for coordination between page_service tasks.
105 : //!
106 : //! The solution is to add indirection: we wrap the Types::Timeline object that is
107 : //! returned by Types::TenantManager into an Arc that is private to the `HandleInner`
108 : //! and hence to the single Cache / page_service connection.
109 : //! (Review the "Data Structures" section if that is unclear to you.)
110 : //!
111 : //!
112 : //! When upgrading a `WeakHandle`, we upgrade its weak to a strong ref (of the `Mutex<HandleInner>`),
113 : //! lock the mutex, take out a clone of the `Arc<Types::Timeline>`, and release the lock.
114 : //! The Mutex is not contended because it is private to the connection.
115 : //! And again, the `Arc<Types::Timeline>` clone is cheap because that wrapper
116 : //! Arc's refcounts are private to the connection.
117 : //!
118 : //! Downgrading drops these two Arcs, which, again, manipulates refcounts that are private to the connection.
119 : //!
120 : //!
121 : //! # Shutdown
122 : //!
123 : //! The attentive reader may have noticed the following reference cycle around the `Arc<Timeline>`:
124 : //!
125 : //! ```text
126 : //! Timeline --owns--> PerTimelineState --strong--> HandleInner --strong--> Types::Timeline --strong--> Timeline
127 : //! ```
128 : //!
129 : //! Further, there is this cycle:
130 : //!
131 : //! ```text
132 : //! Timeline --owns--> PerTimelineState --strong--> HandleInner --strong--> Types::Timeline --strong--> GateGuard --keepalive--> Timeline
133 : //! ```
134 : //!
135 : //! The former cycle is a memory leak if not broken.
136 : //! The latter cycle further prevents the Timeline from shutting down
137 : //! because we certainly won't drop the Timeline while the GateGuard is alive.
138 : //! Keeping the gate open while requests are in flight is the whole point of this handle/cache system,
139 : //! but when the Timeline needs to shut down, we need to break the cycle.
140 : //!
141 : //! The cycle is broken by either
142 : //! - Timeline shutdown (=> `PerTimelineState::shutdown`)
143 : //! - Connection shutdown (=> dropping the `Cache`).
144 : //!
145 : //! Both transition the `HandleInner` from [`HandleInner::Open`] to
146 : //! [`HandleInner::ShutDown`], which drops the only long-lived
147 : //! `Arc<Types::Timeline>`. Once the last short-lived `Arc<Types::Timeline>`
148 : //! is dropped, the `Types::Timeline` gets dropped, and with it
149 : //! the `GateGuard` and the `Arc<Timeline>` that it stores,
150 : //! breaking both cycles.
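//!
//! Schematically:
//!
//! ```text
//! HandleInner::Open(Arc<Types::Timeline>)  --shutdown-->  HandleInner::ShutDown
//!     (drops the long-lived Arc<Types::Timeline>; once the short-lived clones held by
//!      concurrently existing Handles are gone, the GateGuard and Arc<Timeline> drop too)
//! ```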
151 : //!
152 : //! `PerTimelineState::shutdown` shuts down and drops all the `HandleInner`s it contains,
153 : //! thereby breaking the cycle.
154 : //! It also initiates draining of already existing `Handle`s by
155 : //! setting its handles map to `None` so that no new `HandleInner`s can be added
156 : //! to the `PerTimelineState`, which will make subsequent `Cache::get` fail.
157 : //!
158 : //! Concurrently existing / already upgraded `Handle`s will extend the
159 : //! lifetime of the `Arc<Mutex<HandleInner>>` and hence the cycles.
160 : //! However, since `Handle`s are short-lived and new `Handle`s are not
161 : //! handed out from `Cache::get` or `WeakHandle::upgrade` after
162 : //! `PerTimelineState::shutdown`, that extension of the cycle is bounded.
163 : //!
164 : //! Concurrently existing `WeakHandle`s will fail to `upgrade()`:
165 : //! while they will succeed in upgrading `Weak<Mutex<HandleInner>>`,
166 : //! they will find the inner in the `HandleInner::ShutDown` state, where the
167 : //! `Arc<GateGuard>` and the `Arc<Timeline>` have already been dropped.
168 : //!
169 : //! Dropping the `Cache` undoes the registration of this `Cache`'s
170 : //! `HandleInner`s from all the `PerTimelineState`s, i.e., it
171 : //! removes the strong ref to each of its `HandleInner`s
172 : //! from their respective `PerTimelineState`s.
173 : //!
174 : //! # Locking Rules
175 : //!
176 : //! To prevent deadlocks we:
177 : //!
178 : //! 1. Only ever hold one of the locks at a time.
179 : //! 2. Don't add more than one Drop impl that takes the locks involved in the
180 : //! cycles above.
181 : //!
182 : //! As per (2), that impl is in `Drop for Cache`.
183 : //!
184 : //! # Fast Path for Shard Routing
185 : //!
186 : //! The `Cache` has a fast path for shard routing to avoid calling into
187 : //! the tenant manager for every request.
188 : //!
189 : //! The `Cache` maintains a hash map of `ShardTimelineId` to `WeakHandle`s.
190 : //!
191 : //! The current implementation uses the first entry in the hash map
192 : //! to determine the `ShardIdentity` and derive the correct
193 : //! `ShardIndex` for the requested key.
194 : //!
195 : //! It then looks up the hash map for that `ShardTimelineId := {ShardIndex,TimelineId}`.
196 : //!
197 : //! If the lookup is successful and the `WeakHandle` can be upgraded,
198 : //! it's a hit.
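//!
//! Roughly, in pseudocode (the authoritative version is `Cache::shard_routing` below):
//!
//! ```text
//! let (_, first) = map.iter().next()?;                   // any live entry will do
//! let identity = first.get_shard_identity();
//! let need = ShardTimelineId {
//!     shard_index: ShardIndex {
//!         shard_number: identity.get_shard_number(&key), // requested key -> shard number
//!         shard_count: identity.count,
//!     },
//!     timeline_id,
//! };
//! // `need` matches the first entry  => fast path hit
//! // otherwise: look up `need` in the map, or fall back to TenantManager::resolve
//! ```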
199 : //!
200 : //! ## Cache invalidation
201 : //!
202 : //! The insight is that cache invalidation is sufficient, and most efficient, if done lazily.
203 : //! The only reasons why an entry in the cache can become stale are:
204 : //! 1. The `PerTimelineState` / Timeline is shutting down, e.g., because the shard is
205 : //! being detached, the timeline or shard is being deleted, or the pageserver is shutting down.
206 : //! 2. We're doing a shard split and new traffic should be routed to the child shards.
207 : //!
208 : //! Regarding (1), we will eventually fail to upgrade the `WeakHandle` once the
209 : //! timeline has shut down, and when that happens, we remove the entry from the cache.
210 : //!
211 : //! Regarding (2), the insight is that it is totally fine to keep dispatching requests
212 : //! to the parent shard during a shard split. Eventually, the shard split task will
213 : //! shut down the parent => case (1).
214 :
215 : use std::collections::{HashMap, hash_map};
216 : use std::sync::{Arc, Mutex, Weak};
217 :
218 : use pageserver_api::shard::ShardIdentity;
219 : use tracing::{instrument, trace};
220 : use utils::id::TimelineId;
221 : use utils::shard::{ShardIndex, ShardNumber};
222 :
223 : use crate::tenant::mgr::ShardSelector;
224 :
225 : /// The requirement for Debug is so that #[derive(Debug)] works in some places.
226 : pub(crate) trait Types: Sized + std::fmt::Debug {
227 : type TenantManagerError: Sized + std::fmt::Debug;
228 : type TenantManager: TenantManager<Self> + Sized;
229 : type Timeline: Timeline<Self> + Sized;
230 : }
231 :
232 : /// Uniquely identifies a [`Cache`] instance over the lifetime of the process.
233 : /// Required so [`Cache::drop`] can take out the handles from the [`PerTimelineState`].
234 : /// Alternative to this would be to allocate [`Cache`] in a `Box` and identify it by the pointer.
235 : #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
236 : struct CacheId(u64);
237 :
238 : impl CacheId {
239 192 : fn next() -> Self {
240 : static NEXT_ID: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
241 192 : let id = NEXT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
242 192 : if id == 0 {
243 0 : panic!("CacheId::new() returned 0, overflow");
244 192 : }
245 192 : Self(id)
246 192 : }
247 : }
248 :
249 : /// See module-level comment.
250 : pub(crate) struct Cache<T: Types> {
251 : id: CacheId,
252 : map: Map<T>,
253 : }
254 :
255 : type Map<T> = HashMap<ShardTimelineId, WeakHandle<T>>;
256 :
257 : impl<T: Types> Default for Cache<T> {
258 192 : fn default() -> Self {
259 192 : Self {
260 192 : id: CacheId::next(),
261 192 : map: Default::default(),
262 192 : }
263 192 : }
264 : }
265 :
266 : #[derive(PartialEq, Eq, Debug, Hash, Clone, Copy)]
267 : pub(crate) struct ShardTimelineId {
268 : pub(crate) shard_index: ShardIndex,
269 : pub(crate) timeline_id: TimelineId,
270 : }
271 :
272 : /// See module-level comment.
273 : pub(crate) struct Handle<T: Types> {
274 : inner: Arc<Mutex<HandleInner<T>>>,
275 : open: Arc<T::Timeline>,
276 : }
277 : pub(crate) struct WeakHandle<T: Types> {
278 : inner: Weak<Mutex<HandleInner<T>>>,
279 : }
280 :
281 : enum HandleInner<T: Types> {
282 : Open(Arc<T::Timeline>),
283 : ShutDown,
284 : }
285 :
286 : /// Embedded in each [`Types::Timeline`] as the anchor for the only long-lived strong ref to `HandleInner`.
287 : ///
288 : /// See module-level comment for details.
289 : pub struct PerTimelineState<T: Types> {
290 : // None = shutting down
291 : #[allow(clippy::type_complexity)]
292 : handles: Mutex<Option<HashMap<CacheId, Arc<Mutex<HandleInner<T>>>>>>,
293 : }
294 :
295 : impl<T: Types> Default for PerTimelineState<T> {
296 2904 : fn default() -> Self {
297 2904 : Self {
298 2904 : handles: Mutex::new(Some(Default::default())),
299 2904 : }
300 2904 : }
301 : }
302 :
303 : /// Abstract view of [`crate::tenant::mgr`], for testability.
304 : pub(crate) trait TenantManager<T: Types> {
305 : /// Invoked by [`Cache::get`] to resolve a [`TimelineId`] + [`ShardSelector`] to a [`Types::Timeline`].
306 : /// Errors are returned as [`GetError::TenantManager`].
307 : async fn resolve(
308 : &self,
309 : timeline_id: TimelineId,
310 : shard_selector: ShardSelector,
311 : ) -> Result<T::Timeline, T::TenantManagerError>;
312 : }
313 :
314 : /// Abstract view of an [`Arc<Timeline>`], for testability.
315 : pub(crate) trait Timeline<T: Types> {
316 : fn shard_timeline_id(&self) -> ShardTimelineId;
317 : fn get_shard_identity(&self) -> &ShardIdentity;
318 : fn per_timeline_state(&self) -> &PerTimelineState<T>;
319 : }
320 :
321 : /// Errors returned by [`Cache::get`].
322 : #[derive(Debug)]
323 : pub(crate) enum GetError<T: Types> {
324 : TenantManager(T::TenantManagerError),
325 : PerTimelineStateShutDown,
326 : }
327 :
328 : /// Internal type used in [`Cache::get`].
329 : enum RoutingResult<T: Types> {
330 : FastPath(Handle<T>),
331 : SlowPath(ShardTimelineId),
332 : NeedConsultTenantManager,
333 : }
334 :
335 : impl<T: Types> Cache<T> {
336 : /// See module-level comment for details.
337 : ///
338 : /// Does NOT check for the shutdown state of [`Types::Timeline`].
339 : /// Instead, the methods of [`Types::Timeline`] that are invoked through
340 : /// the [`Handle`] are responsible for checking these conditions
341 : /// and, if the timeline is shutting down, returning an error that causes
342 : /// the page service to close the connection.
343 : #[instrument(level = "trace", skip_all)]
344 : pub(crate) async fn get(
345 : &mut self,
346 : timeline_id: TimelineId,
347 : shard_selector: ShardSelector,
348 : tenant_manager: &T::TenantManager,
349 : ) -> Result<Handle<T>, GetError<T>> {
350 : // terminates because every iteration that continues the loop removes an element from the map
351 : let miss: ShardSelector = loop {
352 : let routing_state = self.shard_routing(timeline_id, shard_selector);
353 : match routing_state {
354 : RoutingResult::FastPath(handle) => return Ok(handle),
355 : RoutingResult::SlowPath(key) => match self.map.get(&key) {
356 : Some(cached) => match cached.upgrade() {
357 : Ok(upgraded) => return Ok(upgraded),
358 : Err(HandleUpgradeError::ShutDown) => {
359 : // TODO: dedup with shard_routing()
360 : trace!("handle cache stale");
361 : self.map.remove(&key).unwrap();
362 : continue;
363 : }
364 : },
365 : None => break ShardSelector::Known(key.shard_index),
366 : },
367 : RoutingResult::NeedConsultTenantManager => break shard_selector,
368 : }
369 : };
370 : self.get_miss(timeline_id, miss, tenant_manager).await
371 : }
372 :
373 : #[inline(always)]
374 341 : fn shard_routing(
375 341 : &mut self,
376 341 : timeline_id: TimelineId,
377 341 : shard_selector: ShardSelector,
378 341 : ) -> RoutingResult<T> {
379 : loop {
380 : // terminates because every iteration that continues the loop removes an element from the map
381 372 : let Some((first_key, first_handle)) = self.map.iter().next() else {
382 228 : return RoutingResult::NeedConsultTenantManager;
383 : };
384 144 : let Ok(first_handle) = first_handle.upgrade() else {
385 : // TODO: dedup with get()
386 31 : trace!("handle cache stale");
387 31 : let first_key_owned = *first_key;
388 31 : self.map.remove(&first_key_owned).unwrap();
389 31 : continue;
390 : };
391 :
392 113 : let first_handle_shard_identity = first_handle.get_shard_identity();
393 113 : let make_shard_index = |shard_num: ShardNumber| ShardIndex {
394 113 : shard_number: shard_num,
395 113 : shard_count: first_handle_shard_identity.count,
396 113 : };
397 :
398 113 : let need_idx = match shard_selector {
399 113 : ShardSelector::Page(key) => {
400 113 : make_shard_index(first_handle_shard_identity.get_shard_number(&key))
401 : }
402 0 : ShardSelector::Zero => make_shard_index(ShardNumber(0)),
403 0 : ShardSelector::Known(shard_idx) => shard_idx,
404 : };
405 113 : let need_shard_timeline_id = ShardTimelineId {
406 113 : shard_index: need_idx,
407 113 : timeline_id,
408 113 : };
409 113 : let first_handle_shard_timeline_id = ShardTimelineId {
410 113 : shard_index: first_handle_shard_identity.shard_index(),
411 113 : timeline_id: first_handle.shard_timeline_id().timeline_id,
412 113 : };
413 113 :
414 113 : if need_shard_timeline_id == first_handle_shard_timeline_id {
415 72 : return RoutingResult::FastPath(first_handle);
416 : } else {
417 41 : return RoutingResult::SlowPath(need_shard_timeline_id);
418 : }
419 : }
420 341 : }
421 :
422 : #[instrument(level = "trace", skip_all)]
423 : #[inline(always)]
424 : async fn get_miss(
425 : &mut self,
426 : timeline_id: TimelineId,
427 : shard_selector: ShardSelector,
428 : tenant_manager: &T::TenantManager,
429 : ) -> Result<Handle<T>, GetError<T>> {
430 : match tenant_manager.resolve(timeline_id, shard_selector).await {
431 : Ok(timeline) => {
432 : let key = timeline.shard_timeline_id();
433 : match &shard_selector {
434 : ShardSelector::Zero => assert_eq!(key.shard_index.shard_number, ShardNumber(0)),
435 : ShardSelector::Page(_) => (), // gotta trust tenant_manager
436 : ShardSelector::Known(idx) => assert_eq!(idx, &key.shard_index),
437 : }
438 :
439 : trace!("creating new HandleInner");
440 : let timeline = Arc::new(timeline);
441 : let handle_inner_arc =
442 : Arc::new(Mutex::new(HandleInner::Open(Arc::clone(&timeline))));
443 : let handle_weak = WeakHandle {
444 : inner: Arc::downgrade(&handle_inner_arc),
445 : };
446 : let handle = handle_weak
447 : .upgrade()
448 : .ok()
449 : .expect("we just created it and it's not linked anywhere yet");
450 : {
451 : let mut lock_guard = timeline
452 : .per_timeline_state()
453 : .handles
454 : .lock()
455 : .expect("mutex poisoned");
456 : match &mut *lock_guard {
457 : Some(per_timeline_state) => {
458 : let replaced =
459 : per_timeline_state.insert(self.id, Arc::clone(&handle_inner_arc));
460 : assert!(replaced.is_none(), "some earlier code left a stale handle");
461 : match self.map.entry(key) {
462 : hash_map::Entry::Occupied(_o) => {
463 : // This cannot happen because
464 : // 1. we're the _miss_ handle, i.e., `self.map` didn't contain an entry and
465 : // 2. we were holding &mut self during .resolve().await above, so, no other thread can have inserted a handle
466 : // while we were waiting for the tenant manager.
467 : unreachable!()
468 : }
469 : hash_map::Entry::Vacant(v) => {
470 : v.insert(handle_weak);
471 : }
472 : }
473 : }
474 : None => {
475 : return Err(GetError::PerTimelineStateShutDown);
476 : }
477 : }
478 : }
479 : Ok(handle)
480 : }
481 : Err(e) => Err(GetError::TenantManager(e)),
482 : }
483 : }
484 : }
485 :
486 : pub(crate) enum HandleUpgradeError {
487 : ShutDown,
488 : }
489 :
490 : impl<T: Types> WeakHandle<T> {
491 425 : pub(crate) fn upgrade(&self) -> Result<Handle<T>, HandleUpgradeError> {
492 425 : let Some(inner) = Weak::upgrade(&self.inner) else {
493 24 : return Err(HandleUpgradeError::ShutDown);
494 : };
495 401 : let lock_guard = inner.lock().expect("poisoned");
496 401 : match &*lock_guard {
497 365 : HandleInner::Open(open) => {
498 365 : let open = Arc::clone(open);
499 365 : drop(lock_guard);
500 365 : Ok(Handle { open, inner })
501 : }
502 36 : HandleInner::ShutDown => Err(HandleUpgradeError::ShutDown),
503 : }
504 425 : }
505 :
506 0 : pub(crate) fn is_same_handle_as(&self, other: &WeakHandle<T>) -> bool {
507 0 : Weak::ptr_eq(&self.inner, &other.inner)
508 0 : }
509 : }
510 :
511 : impl<T: Types> std::ops::Deref for Handle<T> {
512 : type Target = T::Timeline;
513 622 : fn deref(&self) -> &Self::Target {
514 622 : &self.open
515 622 : }
516 : }
517 :
518 : impl<T: Types> Handle<T> {
519 12 : pub(crate) fn downgrade(&self) -> WeakHandle<T> {
520 12 : WeakHandle {
521 12 : inner: Arc::downgrade(&self.inner),
522 12 : }
523 12 : }
524 : }
525 :
526 : impl<T: Types> PerTimelineState<T> {
527 : /// After this method returns, [`Cache::get`] will never again return a [`Handle`]
528 : /// to the [`Types::Timeline`] that embeds this per-timeline state,
529 : /// even if [`TenantManager::resolve`] would still resolve to it.
530 : ///
531 : /// Already-alive [`Handle`]s will remain open, usable, and keep the [`Types::Timeline`] alive.
532 : /// That's ok because they're short-lived. See module-level comment for details.
533 : #[instrument(level = "trace", skip_all)]
534 : pub(super) fn shutdown(&self) {
535 : let handles = self
536 : .handles
537 : .lock()
538 : .expect("mutex poisoned")
539 : // NB: this .take() sets locked to None.
540 : // That's what makes future `Cache::get` misses fail.
541 : // Cache hits are taken care of below.
542 : .take();
543 : let Some(handles) = handles else {
544 : trace!("already shut down");
545 : return;
546 : };
547 : for handle_inner_arc in handles.values() {
548 : // Make hits fail.
549 : let mut lock_guard = handle_inner_arc.lock().expect("poisoned");
550 : lock_guard.shutdown();
551 : }
552 : drop(handles);
553 : }
554 : }
555 :
556 : // When dropping a [`Cache`], prune its handles in the [`PerTimelineState`] to break the reference cycle.
557 : impl<T: Types> Drop for Cache<T> {
558 192 : fn drop(&mut self) {
559 : for (
560 : _,
561 : WeakHandle {
562 192 : inner: handle_inner_weak,
563 : },
564 192 : ) in self.map.drain()
565 : {
566 192 : let Some(handle_inner_arc) = handle_inner_weak.upgrade() else {
567 24 : continue;
568 : };
569 168 : let Some(handle_timeline) = handle_inner_arc
570 168 : // locking rules: drop lock before acquiring other lock below
571 168 : .lock()
572 168 : .expect("poisoned")
573 168 : .shutdown()
574 : else {
575 : // Concurrent PerTimelineState::shutdown.
576 0 : continue;
577 : };
578 : // Clean up per_timeline_state so the HandleInner allocation can be dropped.
579 168 : let per_timeline_state = handle_timeline.per_timeline_state();
580 168 : let mut handles_lock_guard = per_timeline_state.handles.lock().expect("mutex poisoned");
581 168 : let Some(handles) = &mut *handles_lock_guard else {
582 0 : continue;
583 : };
584 168 : let Some(removed_handle_inner_arc) = handles.remove(&self.id) else {
585 : // Concurrent PerTimelineState::shutdown.
586 0 : continue;
587 : };
588 168 : drop(handles_lock_guard); // locking rules!
589 168 : assert!(Arc::ptr_eq(&removed_handle_inner_arc, &handle_inner_arc));
590 : }
591 192 : }
592 : }
593 :
594 : impl<T: Types> HandleInner<T> {
595 228 : fn shutdown(&mut self) -> Option<Arc<T::Timeline>> {
596 228 : match std::mem::replace(self, HandleInner::ShutDown) {
597 228 : HandleInner::Open(timeline) => Some(timeline),
598 : HandleInner::ShutDown => {
599 : // Duplicate shutdowns are possible because both Cache::drop and PerTimelineState::shutdown
600 : // may do it concurrently, but locking rules disallow holding the per-timeline-state lock and
601 : // the handle lock at the same time.
602 0 : None
603 : }
604 : }
605 228 : }
606 : }
607 :
608 : #[cfg(test)]
609 : mod tests {
610 : use std::sync::Weak;
611 :
612 : use pageserver_api::key::{DBDIR_KEY, Key, rel_block_to_key};
613 : use pageserver_api::models::ShardParameters;
614 : use pageserver_api::reltag::RelTag;
615 : use pageserver_api::shard::ShardStripeSize;
616 : use utils::shard::ShardCount;
617 : use utils::sync::gate::GateGuard;
618 :
619 : use super::*;
620 :
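// A duration that stands in for "never" in these paused-clock tests.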
621 : const FOREVER: std::time::Duration = std::time::Duration::from_secs(u64::MAX);
622 :
623 : #[derive(Debug)]
624 : struct TestTypes;
625 : impl Types for TestTypes {
626 : type TenantManagerError = anyhow::Error;
627 : type TenantManager = StubManager;
628 : type Timeline = Entered;
629 : }
630 :
631 : struct StubManager {
632 : shards: Vec<Arc<StubTimeline>>,
633 : }
634 :
635 : struct StubTimeline {
636 : gate: utils::sync::gate::Gate,
637 : id: TimelineId,
638 : shard: ShardIdentity,
639 : per_timeline_state: PerTimelineState<TestTypes>,
640 : myself: Weak<StubTimeline>,
641 : }
642 :
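// The `Types::Timeline` stub that `StubManager::resolve` hands out: the timeline itself
// plus the gate guard that keeps its gate open for as long as this object is alive.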
643 : struct Entered {
644 : timeline: Arc<StubTimeline>,
645 : #[allow(dead_code)] // it's stored here to keep the gate open
646 : gate_guard: Arc<GateGuard>,
647 : }
648 :
649 : impl StubTimeline {
650 132 : fn getpage(&self) {
651 132 : // do nothing
652 132 : }
653 : }
654 :
655 : impl Timeline<TestTypes> for Entered {
656 353 : fn shard_timeline_id(&self) -> ShardTimelineId {
657 353 : ShardTimelineId {
658 353 : shard_index: self.shard.shard_index(),
659 353 : timeline_id: self.id,
660 353 : }
661 353 : }
662 :
663 113 : fn get_shard_identity(&self) -> &ShardIdentity {
664 113 : &self.shard
665 113 : }
666 :
667 408 : fn per_timeline_state(&self) -> &PerTimelineState<TestTypes> {
668 408 : &self.per_timeline_state
669 408 : }
670 : }
671 :
672 : impl TenantManager<TestTypes> for StubManager {
673 276 : async fn resolve(
674 276 : &self,
675 276 : timeline_id: TimelineId,
676 276 : shard_selector: ShardSelector,
677 276 : ) -> anyhow::Result<Entered> {
678 324 : for timeline in &self.shards {
679 300 : if timeline.id == timeline_id {
680 264 : let enter_gate = || {
681 252 : let gate_guard = timeline.gate.enter()?;
682 240 : let gate_guard = Arc::new(gate_guard);
683 240 : anyhow::Ok(gate_guard)
684 252 : };
685 0 : match &shard_selector {
686 0 : ShardSelector::Zero if timeline.shard.is_shard_zero() => {
687 0 : return Ok(Entered {
688 0 : timeline: Arc::clone(timeline),
689 0 : gate_guard: enter_gate()?,
690 : });
691 : }
692 0 : ShardSelector::Zero => continue,
693 228 : ShardSelector::Page(key) if timeline.shard.is_key_local(key) => {
694 228 : return Ok(Entered {
695 228 : timeline: Arc::clone(timeline),
696 228 : gate_guard: enter_gate()?,
697 : });
698 : }
699 0 : ShardSelector::Page(_) => continue,
700 36 : ShardSelector::Known(idx) if idx == &timeline.shard.shard_index() => {
701 24 : return Ok(Entered {
702 24 : timeline: Arc::clone(timeline),
703 24 : gate_guard: enter_gate()?,
704 : });
705 : }
706 12 : ShardSelector::Known(_) => continue,
707 : }
708 36 : }
709 : }
710 24 : anyhow::bail!("not found")
711 276 : }
712 : }
713 :
714 : impl std::ops::Deref for Entered {
715 : type Target = StubTimeline;
716 1623 : fn deref(&self) -> &Self::Target {
717 1623 : &self.timeline
718 1623 : }
719 : }
720 :
721 : #[tokio::test(start_paused = true)]
722 12 : async fn test_timeline_shutdown() {
723 12 : crate::tenant::harness::setup_logging();
724 12 :
725 12 : let timeline_id = TimelineId::generate();
726 12 : let shard0 = Arc::new_cyclic(|myself| StubTimeline {
727 12 : gate: Default::default(),
728 12 : id: timeline_id,
729 12 : shard: ShardIdentity::unsharded(),
730 12 : per_timeline_state: PerTimelineState::default(),
731 12 : myself: myself.clone(),
732 12 : });
733 12 : let mgr = StubManager {
734 12 : shards: vec![shard0.clone()],
735 12 : };
736 12 : let key = DBDIR_KEY;
737 12 :
738 12 : let mut cache = Cache::<TestTypes>::default();
739 12 :
740 12 : //
741 12 : // fill the cache
742 12 : //
743 12 : let handle: Handle<_> = cache
744 12 : .get(timeline_id, ShardSelector::Page(key), &mgr)
745 12 : .await
746 12 : .expect("we have the timeline");
747 12 : assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
748 12 : assert_eq!(cache.map.len(), 1);
749 12 : drop(handle);
750 12 :
751 12 : //
752 12 : // demonstrate that Handle holds up gate closure
753 12 : // but shutdown prevents new handles from being handed out
754 12 : //
755 12 :
756 12 : tokio::select! {
757 12 : _ = shard0.gate.close() => {
758 12 : panic!("cache and per-timeline handler state keep cache open");
759 12 : }
760 12 : _ = tokio::time::sleep(FOREVER) => {
761 12 : // NB: first poll of close() makes it enter closing state
762 12 : }
763 12 : }
764 12 :
765 12 : let handle = cache
766 12 : .get(timeline_id, ShardSelector::Page(key), &mgr)
767 12 : .await
768 12 : .expect("we have the timeline");
769 12 : assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
770 12 :
771 12 : // SHUTDOWN
772 12 : shard0.per_timeline_state.shutdown(); // keeping handle alive across shutdown
773 12 :
774 12 : assert_eq!(
775 12 : cache.map.len(),
776 12 : 1,
777 12 : "this is an implementation detail but worth pointing out: we can't clear the cache from shutdown(), it's cleared on first access after"
778 12 : );
779 12 :
780 12 : // this handle is perfectly usable
781 12 : handle.getpage();
782 12 :
783 12 : cache
784 12 : .get(timeline_id, ShardSelector::Page(key), &mgr)
785 12 : .await
786 12 : .err()
787 12 : .expect("documented behavior: can't get new handle after shutdown, even if there is an alive Handle");
788 12 : assert_eq!(
789 12 : cache.map.len(),
790 12 : 0,
791 12 : "first access after shutdown cleans up the Weak's from the cache"
792 12 : );
793 12 :
794 12 : tokio::select! {
795 12 : _ = shard0.gate.close() => {
796 12 : panic!("handle is keeping gate open");
797 12 : }
798 12 : _ = tokio::time::sleep(FOREVER) => { }
799 12 : }
800 12 :
801 12 : drop(handle);
802 12 :
803 12 : // closing gate succeeds after dropping handle
804 12 : tokio::select! {
805 12 : _ = shard0.gate.close() => { }
806 12 : _ = tokio::time::sleep(FOREVER) => {
807 12 : panic!("handle is dropped, no other gate holders exist")
808 12 : }
809 12 : }
810 12 :
811 12 : // map gets cleaned on next lookup
812 12 : cache
813 12 : .get(timeline_id, ShardSelector::Page(key), &mgr)
814 12 : .await
815 12 : .err()
816 12 : .expect("documented behavior: can't get new handle after shutdown");
817 12 : assert_eq!(cache.map.len(), 0);
818 12 :
819 12 : // ensure all refs to shard0 are gone and we're not leaking anything
820 12 : drop(shard0);
821 12 : drop(mgr);
822 12 : }
823 :
824 : #[tokio::test]
825 12 : async fn test_multiple_timelines_and_deletion() {
826 12 : crate::tenant::harness::setup_logging();
827 12 :
828 12 : let timeline_a = TimelineId::generate();
829 12 : let timeline_b = TimelineId::generate();
830 12 : assert_ne!(timeline_a, timeline_b);
831 12 : let timeline_a = Arc::new_cyclic(|myself| StubTimeline {
832 12 : gate: Default::default(),
833 12 : id: timeline_a,
834 12 : shard: ShardIdentity::unsharded(),
835 12 : per_timeline_state: PerTimelineState::default(),
836 12 : myself: myself.clone(),
837 12 : });
838 12 : let timeline_b = Arc::new_cyclic(|myself| StubTimeline {
839 12 : gate: Default::default(),
840 12 : id: timeline_b,
841 12 : shard: ShardIdentity::unsharded(),
842 12 : per_timeline_state: PerTimelineState::default(),
843 12 : myself: myself.clone(),
844 12 : });
845 12 : let mut mgr = StubManager {
846 12 : shards: vec![timeline_a.clone(), timeline_b.clone()],
847 12 : };
848 12 : let key = DBDIR_KEY;
849 12 :
850 12 : let mut cache = Cache::<TestTypes>::default();
851 12 :
852 12 : cache
853 12 : .get(timeline_a.id, ShardSelector::Page(key), &mgr)
854 12 : .await
855 12 : .expect("we have it");
856 12 : cache
857 12 : .get(timeline_b.id, ShardSelector::Page(key), &mgr)
858 12 : .await
859 12 : .expect("we have it");
860 12 : assert_eq!(cache.map.len(), 2);
861 12 :
862 12 : // delete timeline A
863 12 : timeline_a.per_timeline_state.shutdown();
864 24 : mgr.shards.retain(|t| t.id != timeline_a.id);
865 12 : assert!(
866 12 : mgr.resolve(timeline_a.id, ShardSelector::Page(key))
867 12 : .await
868 12 : .is_err(),
869 12 : "broken StubManager implementation"
870 12 : );
871 12 :
872 12 : assert_eq!(
873 12 : cache.map.len(),
874 12 : 2,
875 12 : "cache still has a Weak handle to Timeline A"
876 12 : );
877 12 : cache
878 12 : .get(timeline_a.id, ShardSelector::Page(key), &mgr)
879 12 : .await
880 12 : .err()
881 12 : .expect("documented behavior: can't get new handle after shutdown");
882 12 : assert_eq!(cache.map.len(), 1, "next access cleans up the cache");
883 12 :
884 12 : cache
885 12 : .get(timeline_b.id, ShardSelector::Page(key), &mgr)
886 12 : .await
887 12 : .expect("we still have it");
888 12 : }
889 :
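// Builds a key at block number `shard * stripe_size`; the tests rely on this key routing
// to `shard` under `params`.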
890 84 : fn make_relation_key_for_shard(shard: ShardNumber, params: &ShardParameters) -> Key {
891 84 : rel_block_to_key(
892 84 : RelTag {
893 84 : spcnode: 1663,
894 84 : dbnode: 208101,
895 84 : relnode: 2620,
896 84 : forknum: 0,
897 84 : },
898 84 : shard.0 as u32 * params.stripe_size.0,
899 84 : )
900 84 : }
901 :
902 : #[tokio::test(start_paused = true)]
903 12 : async fn test_shard_split() {
904 12 : crate::tenant::harness::setup_logging();
905 12 : let timeline_id = TimelineId::generate();
906 12 : let parent = Arc::new_cyclic(|myself| StubTimeline {
907 12 : gate: Default::default(),
908 12 : id: timeline_id,
909 12 : shard: ShardIdentity::unsharded(),
910 12 : per_timeline_state: PerTimelineState::default(),
911 12 : myself: myself.clone(),
912 12 : });
913 12 : let child_params = ShardParameters {
914 12 : count: ShardCount(2),
915 12 : stripe_size: ShardStripeSize::default(),
916 12 : };
917 12 : let child0 = Arc::new_cyclic(|myself| StubTimeline {
918 12 : gate: Default::default(),
919 12 : id: timeline_id,
920 12 : shard: ShardIdentity::from_params(ShardNumber(0), &child_params),
921 12 : per_timeline_state: PerTimelineState::default(),
922 12 : myself: myself.clone(),
923 12 : });
924 12 : let child1 = Arc::new_cyclic(|myself| StubTimeline {
925 12 : gate: Default::default(),
926 12 : id: timeline_id,
927 12 : shard: ShardIdentity::from_params(ShardNumber(1), &child_params),
928 12 : per_timeline_state: PerTimelineState::default(),
929 12 : myself: myself.clone(),
930 12 : });
931 12 : let child_shards_by_shard_number = [child0.clone(), child1.clone()];
932 12 :
933 12 : let mut cache = Cache::<TestTypes>::default();
934 12 :
935 12 : // fill the cache with the parent
936 36 : for i in 0..2 {
937 24 : let handle = cache
938 24 : .get(
939 24 : timeline_id,
940 24 : ShardSelector::Page(make_relation_key_for_shard(ShardNumber(i), &child_params)),
941 24 : &StubManager {
942 24 : shards: vec![parent.clone()],
943 24 : },
944 24 : )
945 24 : .await
946 24 : .expect("we have it");
947 24 : assert!(
948 24 : Weak::ptr_eq(&handle.myself, &parent.myself),
949 12 : "mgr returns parent first"
950 12 : );
951 24 : drop(handle);
952 12 : }
953 12 :
954 12 : //
955 12 : // SHARD SPLIT: tenant manager changes, but the cache isn't informed
956 12 : //
957 12 :
958 12 : // while we haven't shut down the parent, the cache will return the cached parent, even
959 12 : // if the tenant manager returns the child
960 36 : for i in 0..2 {
961 24 : let handle = cache
962 24 : .get(
963 24 : timeline_id,
964 24 : ShardSelector::Page(make_relation_key_for_shard(ShardNumber(i), &child_params)),
965 24 : &StubManager {
966 24 : shards: vec![], // doesn't matter what's in here, the cache is fully loaded
967 24 : },
968 24 : )
969 24 : .await
970 24 : .expect("we have it");
971 24 : assert!(
972 24 : Weak::ptr_eq(&handle.myself, &parent.myself),
973 12 : "mgr returns parent"
974 12 : );
975 24 : drop(handle);
976 12 : }
977 12 :
978 12 : let parent_handle = cache
979 12 : .get(
980 12 : timeline_id,
981 12 : ShardSelector::Page(make_relation_key_for_shard(ShardNumber(0), &child_params)),
982 12 : &StubManager {
983 12 : shards: vec![parent.clone()],
984 12 : },
985 12 : )
986 12 : .await
987 12 : .expect("we have it");
988 12 : assert!(Weak::ptr_eq(&parent_handle.myself, &parent.myself));
989 12 :
990 12 : // invalidate the cache
991 12 : parent.per_timeline_state.shutdown();
992 12 :
993 12 : // the cache will now return the child, even though the parent handle still exists
994 36 : for i in 0..2 {
995 24 : let handle = cache
996 24 : .get(
997 24 : timeline_id,
998 24 : ShardSelector::Page(make_relation_key_for_shard(ShardNumber(i), &child_params)),
999 24 : &StubManager {
1000 24 : shards: vec![child0.clone(), child1.clone()], // <====== this changed compared to previous loop
1001 24 : },
1002 24 : )
1003 24 : .await
1004 24 : .expect("we have it");
1005 24 : assert!(
1006 24 : Weak::ptr_eq(
1007 24 : &handle.myself,
1008 24 : &child_shards_by_shard_number[i as usize].myself
1009 24 : ),
1010 12 : "mgr returns child"
1011 12 : );
1012 24 : drop(handle);
1013 12 : }
1014 12 :
1015 12 : // all the while the parent handle kept the parent gate open
1016 12 : tokio::select! {
1017 12 : _ = parent_handle.gate.close() => {
1018 12 : panic!("parent handle is keeping gate open");
1019 12 : }
1020 12 : _ = tokio::time::sleep(FOREVER) => { }
1021 12 : }
1022 12 : drop(parent_handle);
1023 12 : tokio::select! {
1024 12 : _ = parent.gate.close() => { }
1025 12 : _ = tokio::time::sleep(FOREVER) => {
1026 12 : panic!("parent handle is dropped, no other gate holders exist")
1027 12 : }
1028 12 : }
1029 12 : }
1030 :
1031 : #[tokio::test(start_paused = true)]
1032 12 : async fn test_connection_handler_exit() {
1033 12 : crate::tenant::harness::setup_logging();
1034 12 : let timeline_id = TimelineId::generate();
1035 12 : let shard0 = Arc::new_cyclic(|myself| StubTimeline {
1036 12 : gate: Default::default(),
1037 12 : id: timeline_id,
1038 12 : shard: ShardIdentity::unsharded(),
1039 12 : per_timeline_state: PerTimelineState::default(),
1040 12 : myself: myself.clone(),
1041 12 : });
1042 12 : let mgr = StubManager {
1043 12 : shards: vec![shard0.clone()],
1044 12 : };
1045 12 : let key = DBDIR_KEY;
1046 12 :
1047 12 : // Simulate 10 connections that's opened, used, and closed
1048 132 : for _ in 0..10 {
1049 120 : let mut cache = Cache::<TestTypes>::default();
1050 120 : let handle = {
1051 120 : let handle = cache
1052 120 : .get(timeline_id, ShardSelector::Page(key), &mgr)
1053 120 : .await
1054 120 : .expect("we have the timeline");
1055 120 : assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
1056 120 : handle
1057 120 : };
1058 120 : handle.getpage();
1059 12 : }
1060 12 :
1061 12 : // No handles exist, thus no gate guards are held.
1062 12 : // Thus the gate should close immediately, even without shutdown.
1063 12 : tokio::select! {
1064 12 : _ = shard0.gate.close() => { }
1065 12 : _ = tokio::time::sleep(FOREVER) => {
1066 12 : panic!("handle is dropped, no other gate holders exist")
1067 12 : }
1068 12 : }
1069 12 : }
1070 :
1071 : #[tokio::test(start_paused = true)]
1072 12 : async fn test_weak_handles() {
1073 12 : crate::tenant::harness::setup_logging();
1074 12 : let timeline_id = TimelineId::generate();
1075 12 : let shard0 = Arc::new_cyclic(|myself| StubTimeline {
1076 12 : gate: Default::default(),
1077 12 : id: timeline_id,
1078 12 : shard: ShardIdentity::unsharded(),
1079 12 : per_timeline_state: PerTimelineState::default(),
1080 12 : myself: myself.clone(),
1081 12 : });
1082 12 : let mgr = StubManager {
1083 12 : shards: vec![shard0.clone()],
1084 12 : };
1085 12 :
1086 12 : let refcount_start = Arc::strong_count(&shard0);
1087 12 :
1088 12 : let key = DBDIR_KEY;
1089 12 :
1090 12 : let mut cache = Cache::<TestTypes>::default();
1091 12 :
1092 12 : let handle = cache
1093 12 : .get(timeline_id, ShardSelector::Page(key), &mgr)
1094 12 : .await
1095 12 : .expect("we have the timeline");
1096 12 : assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
1097 12 :
1098 12 : let weak_handle = handle.downgrade();
1099 12 :
1100 12 : drop(handle);
1101 12 :
1102 12 : let upgraded_handle = weak_handle.upgrade().ok().expect("we can upgrade it");
1103 12 :
1104 12 : // Start shutdown
1105 12 : shard0.per_timeline_state.shutdown();
1106 12 :
1107 12 : // Upgrades during shutdown don't work, even if upgraded_handle exists.
1108 12 : weak_handle
1109 12 : .upgrade()
1110 12 : .err()
1111 12 : .expect("can't upgrade weak handle as soon as shutdown started");
1112 12 :
1113 12 : // But upgraded_handle is still alive, so the gate won't close.
1114 12 : tokio::select! {
1115 12 : _ = shard0.gate.close() => {
1116 12 : panic!("handle is keeping gate open");
1117 12 : }
1118 12 : _ = tokio::time::sleep(FOREVER) => { }
1119 12 : }
1120 12 :
1121 12 : // Drop the last handle.
1122 12 : drop(upgraded_handle);
1123 12 :
1124 12 : // The gate should close now, despite there still being a weak_handle.
1125 12 : tokio::select! {
1126 12 : _ = shard0.gate.close() => { }
1127 12 : _ = tokio::time::sleep(FOREVER) => {
1128 12 : panic!("only strong handle is dropped and we shut down per-timeline-state")
1129 12 : }
1130 12 : }
1131 12 :
1132 12 : // The weak handle still can't be upgraded.
1133 12 : weak_handle
1134 12 : .upgrade()
1135 12 : .err()
1136 12 : .expect("still shouldn't be able to upgrade the weak handle");
1137 12 :
1138 12 : // There should be no strong references to the timeline object except the one on "stack".
1139 12 : assert_eq!(Arc::strong_count(&shard0), refcount_start);
1140 12 : }
1141 :
1142 : #[tokio::test(start_paused = true)]
1143 12 : async fn test_reference_cycle_broken_when_cache_is_dropped() {
1144 12 : crate::tenant::harness::setup_logging();
1145 12 : let timeline_id = TimelineId::generate();
1146 12 : let shard0 = Arc::new_cyclic(|myself| StubTimeline {
1147 12 : gate: Default::default(),
1148 12 : id: timeline_id,
1149 12 : shard: ShardIdentity::unsharded(),
1150 12 : per_timeline_state: PerTimelineState::default(),
1151 12 : myself: myself.clone(),
1152 12 : });
1153 12 : let mgr = StubManager {
1154 12 : shards: vec![shard0.clone()],
1155 12 : };
1156 12 : let key = DBDIR_KEY;
1157 12 :
1158 12 : let mut cache = Cache::<TestTypes>::default();
1159 12 :
1160 12 : // helper to check if a handle is referenced by per_timeline_state
1161 24 : let per_timeline_state_refs_handle = |handle_weak: &Weak<Mutex<HandleInner<_>>>| {
1162 24 : let per_timeline_state = shard0.per_timeline_state.handles.lock().unwrap();
1163 24 : let per_timeline_state = per_timeline_state.as_ref().unwrap();
1164 24 : per_timeline_state
1165 24 : .values()
1166 24 : .any(|v| Weak::ptr_eq(&Arc::downgrade(v), handle_weak))
1167 24 : };
1168 12 :
1169 12 : // Fill the cache.
1170 12 : let handle = cache
1171 12 : .get(timeline_id, ShardSelector::Page(key), &mgr)
1172 12 : .await
1173 12 : .expect("we have the timeline");
1174 12 : assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
1175 12 : let handle_inner_weak = Arc::downgrade(&handle.inner);
1176 12 : assert!(
1177 12 : per_timeline_state_refs_handle(&handle_inner_weak),
1178 12 : "we still hold `handle` _and_ haven't dropped `cache` yet"
1179 12 : );
1180 12 :
1181 12 : // Drop the cache.
1182 12 : drop(cache);
1183 12 :
1184 12 : assert!(
1185 12 : !(per_timeline_state_refs_handle(&handle_inner_weak)),
1186 12 : "nothing should reference the handle allocation anymore"
1187 12 : );
1188 12 : assert!(
1189 12 : Weak::upgrade(&handle_inner_weak).is_some(),
1190 12 : "the local `handle` still keeps the allocation alive"
1191 12 : );
1192 12 : // but obviously the cache is gone so no new allocations can be handed out.
1193 12 :
1194 12 : // Drop handle.
1195 12 : drop(handle);
1196 12 : assert!(
1197 12 : Weak::upgrade(&handle_inner_weak).is_none(),
1198 12 : "the local `handle` is dropped, so the allocation should be dropped by now"
1199 12 : );
1200 12 : }
1201 :
1202 : #[tokio::test(start_paused = true)]
1203 12 : async fn test_reference_cycle_broken_when_per_timeline_state_shutdown() {
1204 12 : crate::tenant::harness::setup_logging();
1205 12 : let timeline_id = TimelineId::generate();
1206 12 : let shard0 = Arc::new_cyclic(|myself| StubTimeline {
1207 12 : gate: Default::default(),
1208 12 : id: timeline_id,
1209 12 : shard: ShardIdentity::unsharded(),
1210 12 : per_timeline_state: PerTimelineState::default(),
1211 12 : myself: myself.clone(),
1212 12 : });
1213 12 : let mgr = StubManager {
1214 12 : shards: vec![shard0.clone()],
1215 12 : };
1216 12 : let key = DBDIR_KEY;
1217 12 :
1218 12 : let mut cache = Cache::<TestTypes>::default();
1219 12 : let handle = cache
1220 12 : .get(timeline_id, ShardSelector::Page(key), &mgr)
1221 12 : .await
1222 12 : .expect("we have the timeline");
1223 12 : // grab a weak reference to the inner so can later try to Weak::upgrade it and assert that fails
1224 12 : let handle_inner_weak = Arc::downgrade(&handle.inner);
1225 12 :
1226 12 : // drop the handle, obviously the lifetime of `inner` is at least as long as each strong reference to it
1227 12 : drop(handle);
1228 12 : assert!(Weak::upgrade(&handle_inner_weak).is_some(), "can still");
1229 12 :
1230 12 : // Shutdown the per_timeline_state.
1231 12 : shard0.per_timeline_state.shutdown();
1232 12 : assert!(Weak::upgrade(&handle_inner_weak).is_none(), "can no longer");
1233 12 :
1234 12 : // The cache only contains Weak refs, so it can outlive the per_timeline_state
1235 12 : // without keeping anything alive. Drop it explicitly, solely to make this point.
1236 12 : drop(cache);
1237 12 : }
1238 : }
|