Line data Source code
1 : //! A cache for [`crate::tenant::mgr`]+`Tenant::get_timeline`+`Timeline::gate.enter()`.
2 : //!
3 : //! # Motivation
4 : //!
5 : //! On a single page service connection, we're typically serving a single TenantTimelineId.
6 : //!
7 : //! Without sharding, there is a single Timeline object to which we dispatch
8 : //! all requests. For example, a getpage request gets dispatched to the
9 : //! Timeline::get method of the Timeline object that represents the
10 : //! (tenant,timeline) of that connection.
11 : //!
12 : //! With sharding, for each request that comes in on the connection,
13 : //! we first have to perform shard routing based on the requested key (=~ page number).
14 : //! The result of shard routing is a Timeline object.
15 : //! We then dispatch the request to that Timeline object.
16 : //!
17 : //! Regardless of whether the tenant is sharded or not, we want to ensure that
18 : //! we hold the Timeline gate open while we're invoking the method on the
19 : //! Timeline object.
20 : //!
21 : //! We want to avoid the overhead of doing, for each incoming request,
22 : //! - tenant manager lookup (global rwlock + btreemap lookup for shard routing)
23 : //! - cloning the `Arc<Timeline>` out of the tenant manager so we can
24 : //! release the mgr rwlock before doing any request processing work
25 : //! - re-entering the Timeline gate for each Timeline method invocation.
26 : //!
27 : //! Regardless of how we accomplish the above, it should not
28 : //! prevent the Timeline from shutting down promptly.
29 : //!
30 : //!
31 : //! # Design
32 : //!
33 : //! ## Data Structures
34 : //!
35 : //! There are two concepts expressed as associated types in the `Types` trait:
36 : //! - `TenantManager`: the thing that performs the expensive work. It produces
37 : //! a `Timeline` object, which is the other associated type.
38 : //! - `Timeline`: the item that we cache for fast (TenantTimelineId,ShardSelector) lookup.
39 : //!
40 : //! There are four user-facing data structures exposed by this module:
41 : //! - `PerTimelineState`: a struct embedded into each Timeline struct. Lifetime == Timeline lifetime.
42 : //! - `Cache`: a struct private to each connection handler; Lifetime == connection lifetime.
43 : //! - `Handle`: a smart pointer that derefs to the Types::Timeline.
44 : //! - `WeakHandle`: downgrade of a `Handle` that does not keep the gate open, but allows
45 : //! trying to upgrade back to a `Handle`. If successful, a re-upgraded Handle will always
46 : //! point to the same cached `Types::Timeline`. Upgrades never invoke the `TenantManager`.
47 : //!
48 : //! Internally, there is 0 or 1 `HandleInner` per `(Cache,Timeline)`.
49 : //! Since Cache:Connection is 1:1, there is 0 or 1 `HandleInner` per `(Connection,Timeline)`.
50 : //!
51 : //! The `HandleInner` is allocated as a `Arc<Mutex<HandleInner>>` and
52 : //! referenced weakly and strongly from various places which we are now illustrating.
53 : //! For brevity, we will omit the `Arc<Mutex<>>` part in the following and instead
54 : //! use `strong ref` and `weak ref` when referring to the `Arc<Mutex<HandleInner>>`
55 : //! or `Weak<Mutex<HandleInner>>`, respectively.
56 : //!
57 : //! - The `Handle` is a strong ref.
58 : //! - The `WeakHandle` is a weak ref.
59 : //! - The `PerTimelineState` contains a `HashMap<CacheId, strong ref>`.
60 : //! - The `Cache` is a `HashMap<ShardTimelineId, weak ref>`.
61 : //!
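//! In diagram form (arrows denote references to the `Arc<Mutex<HandleInner>>` allocation):
//!
//! ```text
//! Cache (one per connection)       --weak-->   Arc<Mutex<HandleInner>>
//! PerTimelineState (per timeline)  --strong--> Arc<Mutex<HandleInner>>
//! Handle                           --strong--> Arc<Mutex<HandleInner>>
//! WeakHandle                       --weak-->   Arc<Mutex<HandleInner>>
//! ```
//!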
62 : //! Lifetimes:
63 : //! - `WeakHandle` and `Handle`: single pagestream request.
64 : //! - `Cache`: single page service connection.
65 : //! - `PerTimelineState`: lifetime of the Timeline object (i.e., until `Timeline::shutdown`).
66 : //!
67 : //! ## Request Handling Flow (= filling and using the `Cache`)
68 : //!
69 : //! To dispatch a request, the page service connection calls `Cache::get`.
70 : //!
71 : //! A cache miss means we call Types::TenantManager::resolve for shard routing,
72 : //! cloning the `Arc<Timeline>` out of it, and entering the gate. The result of
73 : //! resolve() is the object we want to cache and hand out `Handle`s to, on this and subsequent `Cache::get` calls.
74 : //!
75 : //! We wrap the object returned from resolve() in an `Arc` and store that inside the
76 : //! `Arc<Mutex<HandleInner>>`. A weak ref to the HandleInner is stored in the `Cache`
77 : //! and a strong ref in the `PerTimelineState`.
78 : //! Another strong ref is returned wrapped in a `Handle`.
79 : //!
80 : //! For subsequent requests, `Cache::get` will perform a "fast path" shard routing
81 : //! and find the weak ref in the cache.
82 : //! We upgrade the weak ref to a strong ref and return it wrapped in a `Handle`.
83 : //!
84 : //! The pagestream processing is pipelined and involves a batching step.
85 : //! While a request is waiting in a batch, the `Handle` is downgraded to a `WeakHandle`.
86 : //! When the batch is ready to be executed, the `WeakHandle` is upgraded back to a `Handle`
87 : //! and the request handler dispatches the request to the right `<Handle as Deref<Target = Timeline>>::$request_method`.
88 : //! It then drops the `Handle`, and thus the `Arc<Mutex<HandleInner>>` inside it.
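//!
//! As a minimal usage sketch (the `TestTypes`, `mgr`, `key`, and `getpage` names come from
//! this module's tests; the real page service wiring differs), a connection handler does
//! roughly the following:
//!
//! ```ignore
//! // One Cache per connection.
//! let mut cache = Cache::<TestTypes>::default();
//!
//! // Miss: resolves via the TenantManager, enters the gate, and caches a weak ref.
//! let handle = cache
//!     .get(timeline_id, ShardSelector::Page(key), &mgr)
//!     .await
//!     .expect("timeline exists");
//!
//! // While the request sits in a batch, keep only a WeakHandle (does not hold the gate open).
//! let weak = handle.downgrade();
//! drop(handle);
//!
//! // When the batch executes: upgrade again. This never calls into the TenantManager.
//! let handle = weak.upgrade().ok().expect("timeline not shut down");
//! handle.getpage();
//! drop(handle);
//! ```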
89 : //!
90 : //! # Performance
91 : //!
92 : //! Remember from the introductory section:
93 : //!
94 : //! > We want to avoid the overhead of doing, for each incoming request,
95 : //! > - tenant manager lookup (global rwlock + btreemap lookup for shard routing)
96 : //! > - cloning the `Arc<Timeline>` out of the tenant manager so we can
97 : //! > release the mgr rwlock before doing any request processing work
98 : //! > - re-entering the Timeline gate for each Timeline method invocation.
99 : //!
100 : //! All of these boil down to some state that is either globally shared among all shards
101 : //! or state shared among all tasks that serve a particular timeline.
102 : //! It is either protected by RwLock or manipulated via atomics.
103 : //! Even atomics are costly when shared across multiple cores.
104 : //! So, we want to avoid any permanent need for coordination between page_service tasks.
105 : //!
106 : //! The solution is to add indirection: we wrap the Types::Timeline object that is
107 : //! returned by `Types::TenantManager::resolve` into an `Arc` that is private to the `HandleInner`
108 : //! and hence to the single Cache / page_service connection.
109 : //! (Review the "Data Structures" section if that is unclear to you.)
110 : //!
111 : //!
112 : //! When upgrading a `WeakHandle`, we upgrade its weak to a strong ref (of the `Mutex<HandleInner>`),
113 : //! lock the mutex, take out a clone of the `Arc<Types::Timeline>`, and drop the lock guard.
114 : //! The Mutex is not contended because it is private to the connection.
115 : //! And again, the `Arc<Types::Timeline>` clone is cheap because that wrapper
116 : //! Arc's refcounts are private to the connection.
117 : //!
118 : //! Downgrading drops these two Arcs, which, again, manipulates refcounts that are private to the connection.
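//!
//! In code, an upgrade is essentially the following (this mirrors `WeakHandle::upgrade`
//! further down in this file); every refcount and the mutex involved are private to this
//! connection:
//!
//! ```ignore
//! // Weak -> strong on the Arc<Mutex<HandleInner>>; refcount private to this connection.
//! let Some(inner) = Weak::upgrade(&self.inner) else {
//!     return Err(HandleUpgradeError::ShutDown);
//! };
//! // Uncontended: only this connection ever locks this particular mutex.
//! let lock_guard = inner.lock().expect("poisoned");
//! match &*lock_guard {
//!     HandleInner::Open(open) => {
//!         // Cheap clone: this Arc wraps the Types::Timeline and is private to the connection.
//!         let open = Arc::clone(open);
//!         drop(lock_guard);
//!         Ok(Handle { open, inner })
//!     }
//!     HandleInner::ShutDown => Err(HandleUpgradeError::ShutDown),
//! }
//! ```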
119 : //!
120 : //!
121 : //! # Shutdown
122 : //!
123 : //! The attentive reader may have noticed the following reference cycle around the `Arc<Timeline>`:
124 : //!
125 : //! ```text
126 : //! Timeline --owns--> PerTimelineState --strong--> HandleInner --strong--> Types::Timeline --strong--> Timeline
127 : //! ```
128 : //!
129 : //! Further, there is this cycle:
130 : //!
131 : //! ```text
132 : //! Timeline --owns--> PerTimelineState --strong--> HandleInner --strong--> Types::Timeline --strong--> GateGuard --keepalive--> Timeline
133 : //! ```
134 : //!
135 : //! The former cycle is a memory leak if not broken.
136 : //! The latter cycle further prevents the Timeline from shutting down
137 : //! because we certainly won't drop the Timeline while the GateGuard is alive.
138 : //! Holding the gate open while requests are in flight (and thereby delaying shutdown) is the whole point of this handle/cache system,
139 : //! but when the Timeline needs to shut down, we need to break the cycle.
140 : //!
141 : //! The cycle is broken by either
142 : //! - Timeline shutdown (=> `PerTimelineState::shutdown`)
143 : //! - Connection shutdown (=> dropping the `Cache`).
144 : //!
145 : //! Both transition the `HandleInner` from [`HandleInner::Open`] to
146 : //! [`HandleInner::ShutDown`], which drops the only long-lived
147 : //! `Arc<Types::Timeline>`. Once the last short-lived `Arc<Types::Timeline>`
148 : //! is dropped, the `Types::Timeline` gets dropped, and with it
149 : //! the `GateGuard` and the `Arc<Timeline>` that it stores,
150 : //! breaking both cycles.
151 : //!
152 : //! `PerTimelineState::shutdown` drops all the `HandleInner`s it contains,
153 : //! thereby breaking the cycle.
154 : //! It also initiates draining of the already existing `Handle`s:
155 : //! it poisons its map so that no new `HandleInner`s can be added
156 : //! to the `PerTimelineState`, which makes subsequent `Cache::get` calls fail.
157 : //!
158 : //! Concurrently existing / already upgraded `Handle`s will extend the
159 : //! lifetime of the `Arc<Mutex<HandleInner>>` and hence the cycles.
160 : //! However, since `Handle`s are short-lived and new `Handle`s are not
161 : //! handed out from `Cache::get` or `WeakHandle::upgrade` after
162 : //! `PerTimelineState::shutdown`, that extension of the cycle is bounded.
163 : //!
164 : //! Concurrently existing `WeakHandle`s will fail to `upgrade()`:
165 : //! while they will succeed in upgrading `Weak<Mutex<HandleInner>>`,
166 : //! they will find the inner in the `HandleInner::ShutDown` state, where the
167 : //! `Arc<GateGuard>` and Timeline have already been dropped.
168 : //!
169 : //! Dropping the `Cache` undoes the registration of this `Cache`'s
170 : //! `HandleInner`s with all the `PerTimelineState`s, i.e., it
171 : //! removes the strong ref to each of its `HandleInner`s
172 : //! from the respective `PerTimelineState`.
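//!
//! In code, the two cycle-breaking paths look like this (using the stub types from this
//! module's tests; the real shutdown wiring lives in the surrounding pageserver code):
//!
//! ```ignore
//! // Path 1: timeline shutdown (e.g. detach / delete / pageserver shutdown).
//! // Drops the PerTimelineState's strong refs and poisons its map so future
//! // Cache::get misses fail with GetError::PerTimelineStateShutDown.
//! shard0.per_timeline_state.shutdown();
//!
//! // Path 2: connection shutdown. Dropping the Cache shuts down all of this
//! // cache's HandleInners and removes them from their PerTimelineStates.
//! drop(cache);
//! ```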
173 : //!
174 : //! # Locking Rules
175 : //!
176 : //! To prevent deadlocks we:
177 : //!
178 : //! 1. Only ever hold one of the locks at a time.
179 : //! 2. Don't add more than one Drop impl that takes the locks involved in
180 : //! the cycles above.
181 : //!
182 : //! As per (2), that impl is in `Drop for Cache`.
183 : //!
184 : //! # Fast Path for Shard Routing
185 : //!
186 : //! The `Cache` has a fast path for shard routing to avoid calling into
187 : //! the tenant manager for every request.
188 : //!
189 : //! The `Cache` maintains a hash map of `ShardTimelineId` to `WeakHandle`s.
190 : //!
191 : //! The current implementation uses the first entry in the hash map
192 : //! to determine the `ShardIdentity` and derive the correct
193 : //! `ShardIndex` for the requested key.
194 : //!
195 : //! It then looks up that `ShardTimelineId := {ShardIndex,TimelineId}` in the hash map.
196 : //!
197 : //! If the lookup is successful and the `WeakHandle` can be upgraded,
198 : //! it's a hit.
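//!
//! A simplified sketch of that fast path (the real logic, including stale-entry cleanup,
//! lives in `Cache::shard_routing` below; `key` is the page key from the request):
//!
//! ```ignore
//! // Use any cached entry's ShardIdentity to route the key to a shard index ...
//! let (_, any_cached) = self.map.iter().next().expect("cache not empty");
//! let any_cached = any_cached.upgrade().ok().expect("entry not stale");
//! let identity = any_cached.get_shard_identity();
//! let want = ShardTimelineId {
//!     shard_index: ShardIndex {
//!         shard_number: identity.get_shard_number(&key),
//!         shard_count: identity.count,
//!     },
//!     timeline_id,
//! };
//! // ... then it's a hit iff that ShardTimelineId is cached and its WeakHandle upgrades.
//! let hit = self.map.get(&want).and_then(|weak| weak.upgrade().ok());
//! ```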
199 : //!
200 : //! ## Cache invalidation
201 : //!
202 : //! The insight is that cache invalidation is sufficient and most efficient if done lazily.
203 : //! The only reasons why an entry in the cache can become stale are:
204 : //! 1. The `PerTimelineState` / Timeline is shutting down, e.g., because the shard is
205 : //! being detached, the timeline or shard is being deleted, or the pageserver is shutting down.
206 : //! 2. We're doing a shard split and new traffic should be routed to the child shards.
207 : //!
208 : //! Regarding (1), we will eventually fail to upgrade the `WeakHandle` once the
209 : //! timeline has shut down, and when that happens, we remove the entry from the cache.
210 : //!
211 : //! Regarding (2), the insight is that it is totally fine to keep dispatching requests
212 : //! to the parent shard during a shard split. Eventually, the shard split task will
213 : //! shut down the parent => case (1).
214 :
215 : use std::collections::{HashMap, hash_map};
216 : use std::sync::{Arc, Mutex, Weak};
217 :
218 : use pageserver_api::shard::ShardIdentity;
219 : use tracing::{instrument, trace};
220 : use utils::id::TimelineId;
221 : use utils::shard::{ShardIndex, ShardNumber};
222 :
223 : use crate::tenant::mgr::ShardSelector;
224 :
225 : /// The requirement for Debug is so that #[derive(Debug)] works in some places.
226 : pub(crate) trait Types: Sized + std::fmt::Debug {
227 : type TenantManagerError: Sized + std::fmt::Debug;
228 : type TenantManager: TenantManager<Self> + Sized;
229 : type Timeline: Timeline<Self> + Sized;
230 : }
231 :
232 : /// Uniquely identifies a [`Cache`] instance over the lifetime of the process.
233 : /// Required so [`Cache::drop`] can take out the handles from the [`PerTimelineState`].
234 : /// An alternative would be to allocate the [`Cache`] in a `Box` and identify it by its pointer.
235 : #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
236 : struct CacheId(u64);
237 :
238 : impl CacheId {
239 64 : fn next() -> Self {
240 : static NEXT_ID: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
241 64 : let id = NEXT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
242 64 : if id == 0 {
243 0 : panic!("CacheId::next() returned 0, overflow");
244 64 : }
245 64 : Self(id)
246 64 : }
247 : }
248 :
249 : /// See module-level comment.
250 : pub(crate) struct Cache<T: Types> {
251 : id: CacheId,
252 : map: Map<T>,
253 : }
254 :
255 : type Map<T> = HashMap<ShardTimelineId, WeakHandle<T>>;
256 :
257 : impl<T: Types> Default for Cache<T> {
258 64 : fn default() -> Self {
259 64 : Self {
260 64 : id: CacheId::next(),
261 64 : map: Default::default(),
262 64 : }
263 64 : }
264 : }
265 :
266 : #[derive(PartialEq, Eq, Debug, Hash, Clone, Copy)]
267 : pub(crate) struct ShardTimelineId {
268 : pub(crate) shard_index: ShardIndex,
269 : pub(crate) timeline_id: TimelineId,
270 : }
271 :
272 : /// See module-level comment.
273 : pub(crate) struct Handle<T: Types> {
274 : inner: Arc<Mutex<HandleInner<T>>>,
275 : open: Arc<T::Timeline>,
276 : }
277 : pub(crate) struct WeakHandle<T: Types> {
278 : inner: Weak<Mutex<HandleInner<T>>>,
279 : }
280 :
281 : enum HandleInner<T: Types> {
282 : Open(Arc<T::Timeline>),
283 : ShutDown,
284 : }
285 :
286 : /// Embedded in each [`Types::Timeline`] as the anchor for the only long-lived strong ref to `HandleInner`.
287 : ///
288 : /// See module-level comment for details.
289 : pub struct PerTimelineState<T: Types> {
290 : // None = shutting down
291 : #[allow(clippy::type_complexity)]
292 : handles: Mutex<Option<HashMap<CacheId, Arc<Mutex<HandleInner<T>>>>>>,
293 : }
294 :
295 : impl<T: Types> Default for PerTimelineState<T> {
296 944 : fn default() -> Self {
297 944 : Self {
298 944 : handles: Mutex::new(Some(Default::default())),
299 944 : }
300 944 : }
301 : }
302 :
303 : /// Abstract view of [`crate::tenant::mgr`], for testability.
304 : pub(crate) trait TenantManager<T: Types> {
305 : /// Invoked by [`Cache::get`] to resolve a [`TimelineId`] plus [`ShardSelector`] to a [`Types::Timeline`].
306 : /// Errors are returned as [`GetError::TenantManager`].
307 : async fn resolve(
308 : &self,
309 : timeline_id: TimelineId,
310 : shard_selector: ShardSelector,
311 : ) -> Result<T::Timeline, T::TenantManagerError>;
312 : }
313 :
314 : /// Abstract view of an [`Arc<Timeline>`], for testability.
315 : pub(crate) trait Timeline<T: Types> {
316 : fn shard_timeline_id(&self) -> ShardTimelineId;
317 : fn get_shard_identity(&self) -> &ShardIdentity;
318 : fn per_timeline_state(&self) -> &PerTimelineState<T>;
319 : }
320 :
321 : /// Errors returned by [`Cache::get`].
322 : #[derive(Debug)]
323 : pub(crate) enum GetError<T: Types> {
324 : TenantManager(T::TenantManagerError),
325 : PerTimelineStateShutDown,
326 : }
327 :
328 : /// Internal type used in [`Cache::get`].
329 : enum RoutingResult<T: Types> {
330 : FastPath(Handle<T>),
331 : SlowPath(ShardTimelineId),
332 : NeedConsultTenantManager,
333 : }
334 :
335 : impl<T: Types> Cache<T> {
336 : /// See module-level comment for details.
337 : ///
338 : /// Does NOT check for the shutdown state of [`Types::Timeline`].
339 : /// Instead, the methods of [`Types::Timeline`] that are invoked through
340 : /// the [`Handle`] are responsible for checking the shutdown state and,
341 : /// if the timeline is shutting down, returning an error that causes the page service to
342 : /// close the connection.
343 : #[instrument(level = "trace", skip_all)]
344 : pub(crate) async fn get(
345 : &mut self,
346 : timeline_id: TimelineId,
347 : shard_selector: ShardSelector,
348 : tenant_manager: &T::TenantManager,
349 : ) -> Result<Handle<T>, GetError<T>> {
350 : // terminates because every iteration that continues removes an element from the map
351 : let miss: ShardSelector = loop {
352 : let routing_state = self.shard_routing(timeline_id, shard_selector);
353 : match routing_state {
354 : RoutingResult::FastPath(handle) => return Ok(handle),
355 : RoutingResult::SlowPath(key) => match self.map.get(&key) {
356 : Some(cached) => match cached.upgrade() {
357 : Ok(upgraded) => return Ok(upgraded),
358 : Err(HandleUpgradeError::ShutDown) => {
359 : // TODO: dedup with shard_routing()
360 : trace!("handle cache stale");
361 : self.map.remove(&key).unwrap();
362 : continue;
363 : }
364 : },
365 : None => break ShardSelector::Known(key.shard_index),
366 : },
367 : RoutingResult::NeedConsultTenantManager => break shard_selector,
368 : }
369 : };
370 : self.get_miss(timeline_id, miss, tenant_manager).await
371 : }
372 :
373 : #[inline(always)]
374 112 : fn shard_routing(
375 112 : &mut self,
376 112 : timeline_id: TimelineId,
377 112 : shard_selector: ShardSelector,
378 112 : ) -> RoutingResult<T> {
379 : loop {
380 : // terminates because every iteration that continues removes an element from the map
381 124 : let Some((first_key, first_handle)) = self.map.iter().next() else {
382 76 : return RoutingResult::NeedConsultTenantManager;
383 : };
384 48 : let Ok(first_handle) = first_handle.upgrade() else {
385 : // TODO: dedup with get()
386 12 : trace!("handle cache stale");
387 12 : let first_key_owned = *first_key;
388 12 : self.map.remove(&first_key_owned).unwrap();
389 12 : continue;
390 : };
391 :
392 36 : let first_handle_shard_identity = first_handle.get_shard_identity();
393 36 : let make_shard_index = |shard_num: ShardNumber| ShardIndex {
394 36 : shard_number: shard_num,
395 36 : shard_count: first_handle_shard_identity.count,
396 36 : };
397 :
398 36 : let need_idx = match shard_selector {
399 36 : ShardSelector::Page(key) => {
400 36 : make_shard_index(first_handle_shard_identity.get_shard_number(&key))
401 : }
402 0 : ShardSelector::Zero => make_shard_index(ShardNumber(0)),
403 0 : ShardSelector::Known(shard_idx) => shard_idx,
404 : };
405 36 : let need_shard_timeline_id = ShardTimelineId {
406 36 : shard_index: need_idx,
407 36 : timeline_id,
408 36 : };
409 36 : let first_handle_shard_timeline_id = ShardTimelineId {
410 36 : shard_index: first_handle_shard_identity.shard_index(),
411 36 : timeline_id: first_handle.shard_timeline_id().timeline_id,
412 36 : };
413 36 :
414 36 : if need_shard_timeline_id == first_handle_shard_timeline_id {
415 24 : return RoutingResult::FastPath(first_handle);
416 : } else {
417 12 : return RoutingResult::SlowPath(need_shard_timeline_id);
418 : }
419 : }
420 112 : }
421 :
422 : #[instrument(level = "trace", skip_all)]
423 : #[inline(always)]
424 : async fn get_miss(
425 : &mut self,
426 : timeline_id: TimelineId,
427 : shard_selector: ShardSelector,
428 : tenant_manager: &T::TenantManager,
429 : ) -> Result<Handle<T>, GetError<T>> {
430 : match tenant_manager.resolve(timeline_id, shard_selector).await {
431 : Ok(timeline) => {
432 : let key = timeline.shard_timeline_id();
433 : match &shard_selector {
434 : ShardSelector::Zero => assert_eq!(key.shard_index.shard_number, ShardNumber(0)),
435 : ShardSelector::Page(_) => (), // gotta trust tenant_manager
436 : ShardSelector::Known(idx) => assert_eq!(idx, &key.shard_index),
437 : }
438 :
439 : trace!("creating new HandleInner");
440 : let timeline = Arc::new(timeline);
441 : let handle_inner_arc =
442 : Arc::new(Mutex::new(HandleInner::Open(Arc::clone(&timeline))));
443 : let handle_weak = WeakHandle {
444 : inner: Arc::downgrade(&handle_inner_arc),
445 : };
446 : let handle = handle_weak
447 : .upgrade()
448 : .ok()
449 : .expect("we just created it and it's not linked anywhere yet");
450 : {
451 : let mut lock_guard = timeline
452 : .per_timeline_state()
453 : .handles
454 : .lock()
455 : .expect("mutex poisoned");
456 : match &mut *lock_guard {
457 : Some(per_timeline_state) => {
458 : let replaced =
459 : per_timeline_state.insert(self.id, Arc::clone(&handle_inner_arc));
460 : assert!(replaced.is_none(), "some earlier code left a stale handle");
461 : match self.map.entry(key) {
462 : hash_map::Entry::Occupied(_o) => {
463 : // This cannot happen because
464 : // 1. we're in the _miss_ path, i.e., `self.map` didn't contain an entry and
465 : // 2. we were holding &mut self during .resolve().await above, so, no other thread can have inserted a handle
466 : // while we were waiting for the tenant manager.
467 : unreachable!()
468 : }
469 : hash_map::Entry::Vacant(v) => {
470 : v.insert(handle_weak);
471 : }
472 : }
473 : }
474 : None => {
475 : return Err(GetError::PerTimelineStateShutDown);
476 : }
477 : }
478 : }
479 : Ok(handle)
480 : }
481 : Err(e) => Err(GetError::TenantManager(e)),
482 : }
483 : }
484 : }
485 :
486 : pub(crate) enum HandleUpgradeError {
487 : ShutDown,
488 : }
489 :
490 : impl<T: Types> WeakHandle<T> {
491 140 : pub(crate) fn upgrade(&self) -> Result<Handle<T>, HandleUpgradeError> {
492 140 : let Some(inner) = Weak::upgrade(&self.inner) else {
493 8 : return Err(HandleUpgradeError::ShutDown);
494 : };
495 132 : let lock_guard = inner.lock().expect("poisoned");
496 132 : match &*lock_guard {
497 120 : HandleInner::Open(open) => {
498 120 : let open = Arc::clone(open);
499 120 : drop(lock_guard);
500 120 : Ok(Handle { open, inner })
501 : }
502 12 : HandleInner::ShutDown => Err(HandleUpgradeError::ShutDown),
503 : }
504 140 : }
505 :
506 0 : pub(crate) fn is_same_handle_as(&self, other: &WeakHandle<T>) -> bool {
507 0 : Weak::ptr_eq(&self.inner, &other.inner)
508 0 : }
509 : }
510 :
511 : impl<T: Types> std::ops::Deref for Handle<T> {
512 : type Target = T::Timeline;
513 204 : fn deref(&self) -> &Self::Target {
514 204 : &self.open
515 204 : }
516 : }
517 :
518 : impl<T: Types> Handle<T> {
519 4 : pub(crate) fn downgrade(&self) -> WeakHandle<T> {
520 4 : WeakHandle {
521 4 : inner: Arc::downgrade(&self.inner),
522 4 : }
523 4 : }
524 : }
525 :
526 : impl<T: Types> PerTimelineState<T> {
527 : /// After this method returns, [`Cache::get`] will never again return a [`Handle`]
528 : /// to the [`Types::Timeline`] that embeds this per-timeline state,
529 : /// even if [`TenantManager::resolve`] would still resolve to it.
530 : ///
531 : /// Already-alive [`Handle`]s will remain open, usable, and keep the [`Types::Timeline`] alive.
532 : /// That's ok because they're short-lived. See module-level comment for details.
533 : #[instrument(level = "trace", skip_all)]
534 : pub(super) fn shutdown(&self) {
535 : let handles = self
536 : .handles
537 : .lock()
538 : .expect("mutex poisoned")
539 : // NB: this .take() sets locked to None.
540 : // That's what makes future `Cache::get` misses fail.
541 : // Cache hits are taken care of below.
542 : .take();
543 : let Some(handles) = handles else {
544 : trace!("already shut down");
545 : return;
546 : };
547 : for handle_inner_arc in handles.values() {
548 : // Make hits fail.
549 : let mut lock_guard = handle_inner_arc.lock().expect("poisoned");
550 : lock_guard.shutdown();
551 : }
552 : drop(handles);
553 : }
554 : }
555 :
556 : // When dropping a [`Cache`], prune its handles in the [`PerTimelineState`] to break the reference cycle.
557 : impl<T: Types> Drop for Cache<T> {
558 64 : fn drop(&mut self) {
559 : for (
560 : _,
561 : WeakHandle {
562 64 : inner: handle_inner_weak,
563 : },
564 64 : ) in self.map.drain()
565 : {
566 64 : let Some(handle_inner_arc) = handle_inner_weak.upgrade() else {
567 8 : continue;
568 : };
569 56 : let Some(handle_timeline) = handle_inner_arc
570 56 : // locking rules: drop lock before acquiring other lock below
571 56 : .lock()
572 56 : .expect("poisoned")
573 56 : .shutdown()
574 : else {
575 : // Concurrent PerTimelineState::shutdown.
576 0 : continue;
577 : };
578 : // Clean up per_timeline_state so the HandleInner allocation can be dropped.
579 56 : let per_timeline_state = handle_timeline.per_timeline_state();
580 56 : let mut handles_lock_guard = per_timeline_state.handles.lock().expect("mutex poisoned");
581 56 : let Some(handles) = &mut *handles_lock_guard else {
582 0 : continue;
583 : };
584 56 : let Some(removed_handle_inner_arc) = handles.remove(&self.id) else {
585 : // Concurrent PerTimelineState::shutdown.
586 0 : continue;
587 : };
588 56 : drop(handles_lock_guard); // locking rules!
589 56 : assert!(Arc::ptr_eq(&removed_handle_inner_arc, &handle_inner_arc));
590 : }
591 64 : }
592 : }
593 :
594 : impl<T: Types> HandleInner<T> {
595 76 : fn shutdown(&mut self) -> Option<Arc<T::Timeline>> {
596 76 : match std::mem::replace(self, HandleInner::ShutDown) {
597 76 : HandleInner::Open(timeline) => Some(timeline),
598 : HandleInner::ShutDown => {
599 : // Duplicate shutdowns are expected: both Cache::drop and PerTimelineState::shutdown
600 : // may call this concurrently, and the locking rules disallow holding the per-timeline-state
601 : // lock and the handle lock at the same time, so the two paths cannot coordinate.
602 0 : None
603 : }
604 : }
605 76 : }
606 : }
607 :
608 : #[cfg(test)]
609 : mod tests {
610 : use std::sync::Weak;
611 :
612 : use pageserver_api::key::{DBDIR_KEY, Key, rel_block_to_key};
613 : use pageserver_api::models::ShardParameters;
614 : use pageserver_api::reltag::RelTag;
615 : use pageserver_api::shard::ShardStripeSize;
616 : use utils::shard::ShardCount;
617 : use utils::sync::gate::GateGuard;
618 :
619 : use super::*;
620 :
621 : const FOREVER: std::time::Duration = std::time::Duration::from_secs(u64::MAX);
622 :
623 : #[derive(Debug)]
624 : struct TestTypes;
625 : impl Types for TestTypes {
626 : type TenantManagerError = anyhow::Error;
627 : type TenantManager = StubManager;
628 : type Timeline = Entered;
629 : }
630 :
631 : struct StubManager {
632 : shards: Vec<Arc<StubTimeline>>,
633 : }
634 :
635 : struct StubTimeline {
636 : gate: utils::sync::gate::Gate,
637 : id: TimelineId,
638 : shard: ShardIdentity,
639 : per_timeline_state: PerTimelineState<TestTypes>,
640 : myself: Weak<StubTimeline>,
641 : }
642 :
643 : struct Entered {
644 : timeline: Arc<StubTimeline>,
645 : #[allow(dead_code)] // it's stored here to keep the gate open
646 : gate_guard: Arc<GateGuard>,
647 : }
648 :
649 : impl StubTimeline {
650 44 : fn getpage(&self) {
651 44 : // do nothing
652 44 : }
653 : }
654 :
655 : impl Timeline<TestTypes> for Entered {
656 116 : fn shard_timeline_id(&self) -> ShardTimelineId {
657 116 : ShardTimelineId {
658 116 : shard_index: self.shard.shard_index(),
659 116 : timeline_id: self.id,
660 116 : }
661 116 : }
662 :
663 36 : fn get_shard_identity(&self) -> &ShardIdentity {
664 36 : &self.shard
665 36 : }
666 :
667 136 : fn per_timeline_state(&self) -> &PerTimelineState<TestTypes> {
668 136 : &self.per_timeline_state
669 136 : }
670 : }
671 :
672 : impl TenantManager<TestTypes> for StubManager {
673 92 : async fn resolve(
674 92 : &self,
675 92 : timeline_id: TimelineId,
676 92 : shard_selector: ShardSelector,
677 92 : ) -> anyhow::Result<Entered> {
678 108 : for timeline in &self.shards {
679 100 : if timeline.id == timeline_id {
680 88 : let enter_gate = || {
681 84 : let gate_guard = timeline.gate.enter()?;
682 80 : let gate_guard = Arc::new(gate_guard);
683 80 : anyhow::Ok(gate_guard)
684 84 : };
685 0 : match &shard_selector {
686 0 : ShardSelector::Zero if timeline.shard.is_shard_zero() => {
687 0 : return Ok(Entered {
688 0 : timeline: Arc::clone(timeline),
689 0 : gate_guard: enter_gate()?,
690 : });
691 : }
692 0 : ShardSelector::Zero => continue,
693 76 : ShardSelector::Page(key) if timeline.shard.is_key_local(key) => {
694 76 : return Ok(Entered {
695 76 : timeline: Arc::clone(timeline),
696 76 : gate_guard: enter_gate()?,
697 : });
698 : }
699 0 : ShardSelector::Page(_) => continue,
700 12 : ShardSelector::Known(idx) if idx == &timeline.shard.shard_index() => {
701 8 : return Ok(Entered {
702 8 : timeline: Arc::clone(timeline),
703 8 : gate_guard: enter_gate()?,
704 : });
705 : }
706 4 : ShardSelector::Known(_) => continue,
707 : }
708 12 : }
709 : }
710 8 : anyhow::bail!("not found")
711 92 : }
712 : }
713 :
714 : impl std::ops::Deref for Entered {
715 : type Target = StubTimeline;
716 536 : fn deref(&self) -> &Self::Target {
717 536 : &self.timeline
718 536 : }
719 : }
720 :
721 : #[tokio::test(start_paused = true)]
722 4 : async fn test_timeline_shutdown() {
723 4 : crate::tenant::harness::setup_logging();
724 4 :
725 4 : let timeline_id = TimelineId::generate();
726 4 : let shard0 = Arc::new_cyclic(|myself| StubTimeline {
727 4 : gate: Default::default(),
728 4 : id: timeline_id,
729 4 : shard: ShardIdentity::unsharded(),
730 4 : per_timeline_state: PerTimelineState::default(),
731 4 : myself: myself.clone(),
732 4 : });
733 4 : let mgr = StubManager {
734 4 : shards: vec![shard0.clone()],
735 4 : };
736 4 : let key = DBDIR_KEY;
737 4 :
738 4 : let mut cache = Cache::<TestTypes>::default();
739 4 :
740 4 : //
741 4 : // fill the cache
742 4 : //
743 4 : let handle: Handle<_> = cache
744 4 : .get(timeline_id, ShardSelector::Page(key), &mgr)
745 4 : .await
746 4 : .expect("we have the timeline");
747 4 : assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
748 4 : assert_eq!(cache.map.len(), 1);
749 4 : drop(handle);
750 4 :
751 4 : //
752 4 : // demonstrate that Handle holds up gate closure
753 4 : // but shutdown prevents new handles from being handed out
754 4 : //
755 4 :
756 4 : tokio::select! {
757 4 : _ = shard0.gate.close() => {
758 4 : panic!("cache and per-timeline handler state keep the gate open");
759 4 : }
760 4 : _ = tokio::time::sleep(FOREVER) => {
761 4 : // NB: first poll of close() makes it enter closing state
762 4 : }
763 4 : }
764 4 :
765 4 : let handle = cache
766 4 : .get(timeline_id, ShardSelector::Page(key), &mgr)
767 4 : .await
768 4 : .expect("we have the timeline");
769 4 : assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
770 4 :
771 4 : // SHUTDOWN
772 4 : shard0.per_timeline_state.shutdown(); // keeping handle alive across shutdown
773 4 :
774 4 : assert_eq!(
775 4 : cache.map.len(),
776 4 : 1,
777 4 : "this is an implementation detail but worth pointing out: we can't clear the cache from shutdown(), it's cleared on first access after"
778 4 : );
779 4 :
780 4 : // this handle is perfectly usable
781 4 : handle.getpage();
782 4 :
783 4 : cache
784 4 : .get(timeline_id, ShardSelector::Page(key), &mgr)
785 4 : .await
786 4 : .err()
787 4 : .expect("documented behavior: can't get new handle after shutdown, even if there is an alive Handle");
788 4 : assert_eq!(
789 4 : cache.map.len(),
790 4 : 0,
791 4 : "first access after shutdown cleans up the Weak's from the cache"
792 4 : );
793 4 :
794 4 : tokio::select! {
795 4 : _ = shard0.gate.close() => {
796 4 : panic!("handle is keeping gate open");
797 4 : }
798 4 : _ = tokio::time::sleep(FOREVER) => { }
799 4 : }
800 4 :
801 4 : drop(handle);
802 4 :
803 4 : // closing gate succeeds after dropping handle
804 4 : tokio::select! {
805 4 : _ = shard0.gate.close() => { }
806 4 : _ = tokio::time::sleep(FOREVER) => {
807 4 : panic!("handle is dropped, no other gate holders exist")
808 4 : }
809 4 : }
810 4 :
811 4 : // map gets cleaned on next lookup
812 4 : cache
813 4 : .get(timeline_id, ShardSelector::Page(key), &mgr)
814 4 : .await
815 4 : .err()
816 4 : .expect("documented behavior: can't get new handle after shutdown");
817 4 : assert_eq!(cache.map.len(), 0);
818 4 :
819 4 : // ensure all refs to shard0 are gone and we're not leaking anything
820 4 : drop(shard0);
821 4 : drop(mgr);
822 4 : }
823 :
824 : #[tokio::test]
825 4 : async fn test_multiple_timelines_and_deletion() {
826 4 : crate::tenant::harness::setup_logging();
827 4 :
828 4 : let timeline_a = TimelineId::generate();
829 4 : let timeline_b = TimelineId::generate();
830 4 : assert_ne!(timeline_a, timeline_b);
831 4 : let timeline_a = Arc::new_cyclic(|myself| StubTimeline {
832 4 : gate: Default::default(),
833 4 : id: timeline_a,
834 4 : shard: ShardIdentity::unsharded(),
835 4 : per_timeline_state: PerTimelineState::default(),
836 4 : myself: myself.clone(),
837 4 : });
838 4 : let timeline_b = Arc::new_cyclic(|myself| StubTimeline {
839 4 : gate: Default::default(),
840 4 : id: timeline_b,
841 4 : shard: ShardIdentity::unsharded(),
842 4 : per_timeline_state: PerTimelineState::default(),
843 4 : myself: myself.clone(),
844 4 : });
845 4 : let mut mgr = StubManager {
846 4 : shards: vec![timeline_a.clone(), timeline_b.clone()],
847 4 : };
848 4 : let key = DBDIR_KEY;
849 4 :
850 4 : let mut cache = Cache::<TestTypes>::default();
851 4 :
852 4 : cache
853 4 : .get(timeline_a.id, ShardSelector::Page(key), &mgr)
854 4 : .await
855 4 : .expect("we have it");
856 4 : cache
857 4 : .get(timeline_b.id, ShardSelector::Page(key), &mgr)
858 4 : .await
859 4 : .expect("we have it");
860 4 : assert_eq!(cache.map.len(), 2);
861 4 :
862 4 : // delete timeline A
863 4 : timeline_a.per_timeline_state.shutdown();
864 8 : mgr.shards.retain(|t| t.id != timeline_a.id);
865 4 : assert!(
866 4 : mgr.resolve(timeline_a.id, ShardSelector::Page(key))
867 4 : .await
868 4 : .is_err(),
869 4 : "broken StubManager implementation"
870 4 : );
871 4 :
872 4 : assert_eq!(
873 4 : cache.map.len(),
874 4 : 2,
875 4 : "cache still has a Weak handle to Timeline A"
876 4 : );
877 4 : cache
878 4 : .get(timeline_a.id, ShardSelector::Page(key), &mgr)
879 4 : .await
880 4 : .err()
881 4 : .expect("documented behavior: can't get new handle after shutdown");
882 4 : assert_eq!(cache.map.len(), 1, "next access cleans up the cache");
883 4 :
884 4 : cache
885 4 : .get(timeline_b.id, ShardSelector::Page(key), &mgr)
886 4 : .await
887 4 : .expect("we still have it");
888 4 : }
889 :
890 28 : fn make_relation_key_for_shard(shard: ShardNumber, params: &ShardParameters) -> Key {
891 28 : rel_block_to_key(
892 28 : RelTag {
893 28 : spcnode: 1663,
894 28 : dbnode: 208101,
895 28 : relnode: 2620,
896 28 : forknum: 0,
897 28 : },
898 28 : shard.0 as u32 * params.stripe_size.0,
899 28 : )
900 28 : }
901 :
902 : #[tokio::test(start_paused = true)]
903 4 : async fn test_shard_split() {
904 4 : crate::tenant::harness::setup_logging();
905 4 : let timeline_id = TimelineId::generate();
906 4 : let parent = Arc::new_cyclic(|myself| StubTimeline {
907 4 : gate: Default::default(),
908 4 : id: timeline_id,
909 4 : shard: ShardIdentity::unsharded(),
910 4 : per_timeline_state: PerTimelineState::default(),
911 4 : myself: myself.clone(),
912 4 : });
913 4 : let child_params = ShardParameters {
914 4 : count: ShardCount(2),
915 4 : stripe_size: ShardStripeSize::default(),
916 4 : };
917 4 : let child0 = Arc::new_cyclic(|myself| StubTimeline {
918 4 : gate: Default::default(),
919 4 : id: timeline_id,
920 4 : shard: ShardIdentity::from_params(ShardNumber(0), &child_params),
921 4 : per_timeline_state: PerTimelineState::default(),
922 4 : myself: myself.clone(),
923 4 : });
924 4 : let child1 = Arc::new_cyclic(|myself| StubTimeline {
925 4 : gate: Default::default(),
926 4 : id: timeline_id,
927 4 : shard: ShardIdentity::from_params(ShardNumber(1), &child_params),
928 4 : per_timeline_state: PerTimelineState::default(),
929 4 : myself: myself.clone(),
930 4 : });
931 4 : let child_shards_by_shard_number = [child0.clone(), child1.clone()];
932 4 :
933 4 : let mut cache = Cache::<TestTypes>::default();
934 4 :
935 4 : // fill the cache with the parent
936 12 : for i in 0..2 {
937 8 : let handle = cache
938 8 : .get(
939 8 : timeline_id,
940 8 : ShardSelector::Page(make_relation_key_for_shard(ShardNumber(i), &child_params)),
941 8 : &StubManager {
942 8 : shards: vec![parent.clone()],
943 8 : },
944 8 : )
945 8 : .await
946 8 : .expect("we have it");
947 8 : assert!(
948 8 : Weak::ptr_eq(&handle.myself, &parent.myself),
949 4 : "mgr returns parent first"
950 4 : );
951 8 : drop(handle);
952 4 : }
953 4 :
954 4 : //
955 4 : // SHARD SPLIT: tenant manager changes, but the cache isn't informed
956 4 : //
957 4 :
958 4 : // while we haven't shut down the parent, the cache will return the cached parent, even
959 4 : // if the tenant manager returns the child
960 12 : for i in 0..2 {
961 8 : let handle = cache
962 8 : .get(
963 8 : timeline_id,
964 8 : ShardSelector::Page(make_relation_key_for_shard(ShardNumber(i), &child_params)),
965 8 : &StubManager {
966 8 : shards: vec![], // doesn't matter what's in here, the cache is fully loaded
967 8 : },
968 8 : )
969 8 : .await
970 8 : .expect("we have it");
971 8 : assert!(
972 8 : Weak::ptr_eq(&handle.myself, &parent.myself),
973 4 : "mgr returns parent"
974 4 : );
975 8 : drop(handle);
976 4 : }
977 4 :
978 4 : let parent_handle = cache
979 4 : .get(
980 4 : timeline_id,
981 4 : ShardSelector::Page(make_relation_key_for_shard(ShardNumber(0), &child_params)),
982 4 : &StubManager {
983 4 : shards: vec![parent.clone()],
984 4 : },
985 4 : )
986 4 : .await
987 4 : .expect("we have it");
988 4 : assert!(Weak::ptr_eq(&parent_handle.myself, &parent.myself));
989 4 :
990 4 : // invalidate the cache
991 4 : parent.per_timeline_state.shutdown();
992 4 :
993 4 : // the cache will now return the child, even though the parent handle still exists
994 12 : for i in 0..2 {
995 8 : let handle = cache
996 8 : .get(
997 8 : timeline_id,
998 8 : ShardSelector::Page(make_relation_key_for_shard(ShardNumber(i), &child_params)),
999 8 : &StubManager {
1000 8 : shards: vec![child0.clone(), child1.clone()], // <====== this changed compared to previous loop
1001 8 : },
1002 8 : )
1003 8 : .await
1004 8 : .expect("we have it");
1005 8 : assert!(
1006 8 : Weak::ptr_eq(
1007 8 : &handle.myself,
1008 8 : &child_shards_by_shard_number[i as usize].myself
1009 8 : ),
1010 4 : "mgr returns child"
1011 4 : );
1012 8 : drop(handle);
1013 4 : }
1014 4 :
1015 4 : // all the while the parent handle kept the parent gate open
1016 4 : tokio::select! {
1017 4 : _ = parent_handle.gate.close() => {
1018 4 : panic!("parent handle is keeping gate open");
1019 4 : }
1020 4 : _ = tokio::time::sleep(FOREVER) => { }
1021 4 : }
1022 4 : drop(parent_handle);
1023 4 : tokio::select! {
1024 4 : _ = parent.gate.close() => { }
1025 4 : _ = tokio::time::sleep(FOREVER) => {
1026 4 : panic!("parent handle is dropped, no other gate holders exist")
1027 4 : }
1028 4 : }
1029 4 : }
1030 :
1031 : #[tokio::test(start_paused = true)]
1032 4 : async fn test_connection_handler_exit() {
1033 4 : crate::tenant::harness::setup_logging();
1034 4 : let timeline_id = TimelineId::generate();
1035 4 : let shard0 = Arc::new_cyclic(|myself| StubTimeline {
1036 4 : gate: Default::default(),
1037 4 : id: timeline_id,
1038 4 : shard: ShardIdentity::unsharded(),
1039 4 : per_timeline_state: PerTimelineState::default(),
1040 4 : myself: myself.clone(),
1041 4 : });
1042 4 : let mgr = StubManager {
1043 4 : shards: vec![shard0.clone()],
1044 4 : };
1045 4 : let key = DBDIR_KEY;
1046 4 :
1047 4 : // Simulate 10 connections that are opened, used, and closed
1048 44 : for _ in 0..10 {
1049 40 : let mut cache = Cache::<TestTypes>::default();
1050 40 : let handle = {
1051 40 : let handle = cache
1052 40 : .get(timeline_id, ShardSelector::Page(key), &mgr)
1053 40 : .await
1054 40 : .expect("we have the timeline");
1055 40 : assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
1056 40 : handle
1057 40 : };
1058 40 : handle.getpage();
1059 4 : }
1060 4 :
1061 4 : // No handles or caches exist anymore, so no gate guards are held.
1062 4 : // Thus the gate should close immediately, even without per-timeline-state shutdown.
1063 4 : tokio::select! {
1064 4 : _ = shard0.gate.close() => { }
1065 4 : _ = tokio::time::sleep(FOREVER) => {
1066 4 : panic!("handle is dropped, no other gate holders exist")
1067 4 : }
1068 4 : }
1069 4 : }
1070 :
1071 : #[tokio::test(start_paused = true)]
1072 4 : async fn test_weak_handles() {
1073 4 : crate::tenant::harness::setup_logging();
1074 4 : let timeline_id = TimelineId::generate();
1075 4 : let shard0 = Arc::new_cyclic(|myself| StubTimeline {
1076 4 : gate: Default::default(),
1077 4 : id: timeline_id,
1078 4 : shard: ShardIdentity::unsharded(),
1079 4 : per_timeline_state: PerTimelineState::default(),
1080 4 : myself: myself.clone(),
1081 4 : });
1082 4 : let mgr = StubManager {
1083 4 : shards: vec![shard0.clone()],
1084 4 : };
1085 4 :
1086 4 : let refcount_start = Arc::strong_count(&shard0);
1087 4 :
1088 4 : let key = DBDIR_KEY;
1089 4 :
1090 4 : let mut cache = Cache::<TestTypes>::default();
1091 4 :
1092 4 : let handle = cache
1093 4 : .get(timeline_id, ShardSelector::Page(key), &mgr)
1094 4 : .await
1095 4 : .expect("we have the timeline");
1096 4 : assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
1097 4 :
1098 4 : let weak_handle = handle.downgrade();
1099 4 :
1100 4 : drop(handle);
1101 4 :
1102 4 : let upgraded_handle = weak_handle.upgrade().ok().expect("we can upgrade it");
1103 4 :
1104 4 : // Start shutdown
1105 4 : shard0.per_timeline_state.shutdown();
1106 4 :
1107 4 : // Upgrades during shutdown don't work, even if upgraded_handle exists.
1108 4 : weak_handle
1109 4 : .upgrade()
1110 4 : .err()
1111 4 : .expect("can't upgrade weak handle as soon as shutdown started");
1112 4 :
1113 4 : // But upgraded_handle is still alive, so the gate won't close.
1114 4 : tokio::select! {
1115 4 : _ = shard0.gate.close() => {
1116 4 : panic!("handle is keeping gate open");
1117 4 : }
1118 4 : _ = tokio::time::sleep(FOREVER) => { }
1119 4 : }
1120 4 :
1121 4 : // Drop the last handle.
1122 4 : drop(upgraded_handle);
1123 4 :
1124 4 : // The gate should close now, despite there still being a weak_handle.
1125 4 : tokio::select! {
1126 4 : _ = shard0.gate.close() => { }
1127 4 : _ = tokio::time::sleep(FOREVER) => {
1128 4 : panic!("only strong handle is dropped and we shut down per-timeline-state")
1129 4 : }
1130 4 : }
1131 4 :
1132 4 : // The weak handle still can't be upgraded.
1133 4 : weak_handle
1134 4 : .upgrade()
1135 4 : .err()
1136 4 : .expect("still shouldn't be able to upgrade the weak handle");
1137 4 :
1138 4 : // There should be no strong references to the timeline object except the one on the stack.
1139 4 : assert_eq!(Arc::strong_count(&shard0), refcount_start);
1140 4 : }
1141 :
1142 : #[tokio::test(start_paused = true)]
1143 4 : async fn test_reference_cycle_broken_when_cache_is_dropped() {
1144 4 : crate::tenant::harness::setup_logging();
1145 4 : let timeline_id = TimelineId::generate();
1146 4 : let shard0 = Arc::new_cyclic(|myself| StubTimeline {
1147 4 : gate: Default::default(),
1148 4 : id: timeline_id,
1149 4 : shard: ShardIdentity::unsharded(),
1150 4 : per_timeline_state: PerTimelineState::default(),
1151 4 : myself: myself.clone(),
1152 4 : });
1153 4 : let mgr = StubManager {
1154 4 : shards: vec![shard0.clone()],
1155 4 : };
1156 4 : let key = DBDIR_KEY;
1157 4 :
1158 4 : let mut cache = Cache::<TestTypes>::default();
1159 4 :
1160 4 : // helper to check if a handle is referenced by per_timeline_state
1161 8 : let per_timeline_state_refs_handle = |handle_weak: &Weak<Mutex<HandleInner<_>>>| {
1162 8 : let per_timeline_state = shard0.per_timeline_state.handles.lock().unwrap();
1163 8 : let per_timeline_state = per_timeline_state.as_ref().unwrap();
1164 8 : per_timeline_state
1165 8 : .values()
1166 8 : .any(|v| Weak::ptr_eq(&Arc::downgrade(v), handle_weak))
1167 8 : };
1168 4 :
1169 4 : // Fill the cache.
1170 4 : let handle = cache
1171 4 : .get(timeline_id, ShardSelector::Page(key), &mgr)
1172 4 : .await
1173 4 : .expect("we have the timeline");
1174 4 : assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
1175 4 : let handle_inner_weak = Arc::downgrade(&handle.inner);
1176 4 : assert!(
1177 4 : per_timeline_state_refs_handle(&handle_inner_weak),
1178 4 : "we still hold `handle` _and_ haven't dropped `cache` yet"
1179 4 : );
1180 4 :
1181 4 : // Drop the cache.
1182 4 : drop(cache);
1183 4 :
1184 4 : assert!(
1185 4 : !(per_timeline_state_refs_handle(&handle_inner_weak)),
1186 4 : "nothing should reference the handle allocation anymore"
1187 4 : );
1188 4 : assert!(
1189 4 : Weak::upgrade(&handle_inner_weak).is_some(),
1190 4 : "the local `handle` still keeps the allocation alive"
1191 4 : );
1192 4 : // but obviously the cache is gone, so no new handles can be handed out.
1193 4 :
1194 4 : // Drop handle.
1195 4 : drop(handle);
1196 4 : assert!(
1197 4 : Weak::upgrade(&handle_inner_weak).is_none(),
1198 4 : "the local `handle` is dropped, so the allocation should be dropped by now"
1199 4 : );
1200 4 : }
1201 :
1202 : #[tokio::test(start_paused = true)]
1203 4 : async fn test_reference_cycle_broken_when_per_timeline_state_shutdown() {
1204 4 : crate::tenant::harness::setup_logging();
1205 4 : let timeline_id = TimelineId::generate();
1206 4 : let shard0 = Arc::new_cyclic(|myself| StubTimeline {
1207 4 : gate: Default::default(),
1208 4 : id: timeline_id,
1209 4 : shard: ShardIdentity::unsharded(),
1210 4 : per_timeline_state: PerTimelineState::default(),
1211 4 : myself: myself.clone(),
1212 4 : });
1213 4 : let mgr = StubManager {
1214 4 : shards: vec![shard0.clone()],
1215 4 : };
1216 4 : let key = DBDIR_KEY;
1217 4 :
1218 4 : let mut cache = Cache::<TestTypes>::default();
1219 4 : let handle = cache
1220 4 : .get(timeline_id, ShardSelector::Page(key), &mgr)
1221 4 : .await
1222 4 : .expect("we have the timeline");
1223 4 : // grab a weak reference to the inner so we can later try to Weak::upgrade it and assert that it fails
1224 4 : let handle_inner_weak = Arc::downgrade(&handle.inner);
1225 4 :
1226 4 : // drop the handle, obviously the lifetime of `inner` is at least as long as each strong reference to it
1227 4 : drop(handle);
1228 4 : assert!(Weak::upgrade(&handle_inner_weak).is_some(), "can still");
1229 4 :
1230 4 : // Shutdown the per_timeline_state.
1231 4 : shard0.per_timeline_state.shutdown();
1232 4 : assert!(Weak::upgrade(&handle_inner_weak).is_none(), "can no longer");
1233 4 :
1234 4 : // The cache only contains Weak refs, so it can outlive the per_timeline_state without issue.
1235 4 : // We drop it explicitly solely to make this point.
1236 4 : drop(cache);
1237 4 : }
1238 : }
|