Line data Source code
1 : //! An efficient way to keep the timeline gate open without preventing
2 : //! timeline shutdown for longer than a single call to a timeline method.
3 : //!
4 : //! # Motivation
5 : //!
6 : //! On a single page service connection, we're typically serving a single TenantTimelineId.
7 : //!
8 : //! Without sharding, there is a single Timeline object to which we dispatch
9 : //! all requests. For example, a getpage request gets dispatched to the
10 : //! Timeline::get method of the Timeline object that represents the
11 : //! (tenant,timeline) of that connection.
12 : //!
13 : //! With sharding, for each request that comes in on the connection,
14 : //! we first have to perform shard routing based on the requested key (=~ page number).
15 : //! The result of shard routing is a Timeline object.
16 : //! We then dispatch the request to that Timeline object.
17 : //!
18 : //! Regardless of whether the tenant is sharded or not, we want to ensure that
19 : //! we hold the Timeline gate open while we're invoking the method on the
20 : //! Timeline object.
21 : //!
22 : //! However, we want to avoid the overhead of entering the gate for every
23 : //! method invocation.
24 : //!
25 : //! Further, for shard routing, we want to avoid calling the tenant manager to
26 : //! resolve the shard for every request. Instead, we want to cache the
27 : //! routing result so we can bypass the tenant manager for all subsequent requests
28 : //! that get routed to that shard.
29 : //!
30 : //! Regardless of how we accomplish the above, it should not
31 : //! prevent the Timeline from shutting down promptly.
32 : //!
33 : //! # Design
34 : //!
35 : //! ## Data Structures
36 : //!
37 : //! There are four user-facing data structures:
38 : //! - `PerTimelineState`: a struct embedded into each Timeline struct. Lifetime == Timeline lifetime.
39 : //! - `Cache`: a struct private to each connection handler; Lifetime == connection lifetime.
40 : //! - `Handle`: a smart pointer that holds the Timeline gate open and derefs to `&Timeline`.
41 : //! - `WeakHandle`: downgrade of a `Handle` that does not keep the gate open, but allows
42 : //!   trying to upgrade back to a `Handle`, guaranteeing it's the same `Timeline` *object*.
43 : //!
44 : //! Internally, there is 0 or 1 `HandleInner` per `(Cache,Timeline)`.
45 : //! Since Cache:Connection is 1:1, there is 0 or 1 `HandleInner` per `(Connection,Timeline)`.
46 : //!
47 : //! The `HandleInner` is allocated as an `Arc<Mutex<HandleInner>>` and is
48 : //! referenced weakly and strongly from various places, which we now illustrate.
49 : //! For brevity, we will omit the `Arc<Mutex<>>` part in the following and instead
50 : //! use `strong ref` and `weak ref` when referring to the `Arc<Mutex<HandleInner>>`
51 : //! or `Weak<Mutex<HandleInner>>`, respectively.
52 : //!
53 : //! - The `Handle` is a strong ref.
54 : //! - The `WeakHandle` is a weak ref.
55 : //! - The `PerTimelineState` contains a `HashMap<CacheId, strong ref>`.
56 : //! - The `Cache` is a `HashMap<unique identifier for the shard, weak ref>`.
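//!
//! The same references in sketch form (the `HandleInner` box stands for the
//! `Arc<Mutex<HandleInner>>` allocation; its contents are shown for the open state):
//!
//! ```text
//! Cache ------------weak------+
//! WeakHandle -------weak------+
//! Handle -----------strong----+--> HandleInner --strong--> { Arc<GateGuard>, Arc<Timeline> }
//! PerTimelineState -strong----+
//! ```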
57 : //!
58 : //! Lifetimes:
59 : //! - `WeakHandle` and `Handle`: single pagestream request.
60 : //! - `Cache`: single page service connection.
61 : //! - `PerTimelineState`: lifetime of the Timeline object (i.e., until `Timeline::shutdown`).
62 : //!
63 : //! ## Request Handling Flow (= filling and using the `Cache`)
64 : //!
65 : //! To dispatch a request, the page service connection calls `Cache::get`.
66 : //!
67 : //! A cache miss means we consult the tenant manager for shard routing,
68 : //! resulting in an `Arc<Timeline>`. We enter its gate _once_ and store it in the
69 : //! `Arc<Mutex<HandleInner>>`. A weak ref is stored in the `Cache`
70 : //! and a strong ref in the `PerTimelineState`.
71 : //! A strong ref is returned wrapped in a `Handle`.
72 : //!
73 : //! For subsequent requests, `Cache::get` will perform a "fast path" shard routing
74 : //! and find the weak ref in the cache.
75 : //! We upgrade the weak ref to a strong ref and return it wrapped in a `Handle`.
76 : //!
77 : //! The pagestream processing is pipelined and involves a batching step.
78 : //! While a request is batching, the `Handle` is downgraded to a `WeakHandle`.
79 : //! When the batch is ready to be executed, the `WeakHandle` is upgraded back to a `Handle`
80 : //! and the request handler dispatches the request to the right `<Handle as Deref<Target = Timeline>>::$request_method`.
81 : //! It then drops the `Handle`, which drops its strong ref to the `HandleInner`.
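//!
//! A minimal sketch of that flow from the connection handler's point of view
//! (hypothetical pseudocode, not the actual page_service code; error handling elided):
//!
//! ```text
//! // connection start
//! let mut cache = Cache::default();
//!
//! // request arrives: shard-route it, entering the gate on miss or upgrading the cached weak ref on hit
//! let handle: Handle<_> = cache.get(timeline_id, ShardSelector::Page(key), &tenant_manager).await?;
//!
//! // while the request sits in a batch, don't keep the gate open
//! let weak: WeakHandle<_> = handle.downgrade();
//! drop(handle);
//!
//! // batch is ready: re-acquire a gate-holding Handle and dispatch
//! let handle = weak.upgrade()?;
//! let response = handle.$request_method(..).await;  // Deref<Target = Timeline>
//! drop(handle);
//! ```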
82 : //!
83 : //! # Performance
84 : //!
85 : //! Remember from the introductory section:
86 : //!
87 : //! > However, we want to avoid the overhead of entering the gate for every
88 : //! > method invocation.
89 : //!
90 : //! Why do we want to avoid that?
91 : //! Because the gate is a shared location in memory and entering it involves
92 : //! bumping refcounts, which leads to cache contention if done frequently
93 : //! from multiple cores in parallel.
94 : //!
95 : //! So, we only acquire the `GateGuard` once on `Cache` miss, and wrap it in an `Arc`.
96 : //! That `Arc` is private to the `HandleInner` and hence to the connection.
97 : //! (Review the "Data Structures" section if that is unclear to you.)
98 : //!
99 : //! A `WeakHandle` is a weak ref to the `HandleInner`.
100 : //! When upgrading a `WeakHandle`, we upgrade to a strong ref to the `HandleInner` and
101 : //! further acquire an additional strong ref to the `Arc<GateGuard>` inside it.
102 : //! Again, this manipulation of ref counts is cheap because the `Arc` is private to the connection.
103 : //!
104 : //! When downgrading a `Handle` to a `WeakHandle`, we drop the `Arc<GateGuard>`.
105 : //! Again, this is cheap because the `Arc` is private to the connection.
106 : //!
107 : //! In addition to the GateGuard, we need to provide a `Deref<Target=Timeline>` impl.
108 : //! For this, the `Handle` needs infallible access to an `Arc<Timeline>`.
109 : //! We could clone the `Arc<Timeline>` when upgrading a `WeakHandle`, but that would cause contention
110 : //! on the shared memory location that tracks the refcount of the `Arc<Timeline>`.
111 : //! Instead, we wrap the `Arc<Timeline>` into another `Arc`
112 : //! so that we can clone it cheaply when upgrading a `WeakHandle`.
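//!
//! Concretely, assuming the production instantiation where `Types::Timeline` is `Arc<Timeline>`,
//! the open state of a `HandleInner` holds (sketch of the fields defined in this module):
//!
//! ```text
//! HandleInner::KeepingTimelineGateOpen {
//!     gate_guard: Arc<GateGuard>,      // connection-private Arc => cheap to clone on upgrade
//!     timeline:   Arc<Arc<Timeline>>,  // outer Arc is connection-private => cheap to clone;
//!                                      // only cloning the inner Arc<Timeline> would hit the global refcount
//! }
//! ```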
113 : //!
114 : //! # Shutdown
115 : //!
116 : //! The attentive reader may have noticed the following reference cycle around the `Arc<Timeline>`:
117 : //!
118 : //! ```text
119 : //! Timeline --owns--> PerTimelineState --strong--> HandleInner --strong--> Timeline
120 : //! ```
121 : //!
122 : //! Further, there is this cycle:
123 : //!
124 : //! ```text
125 : //! Timeline --owns--> PerTimelineState --strong--> HandleInner --strong--> GateGuard --keepalive--> Timeline
126 : //! ```
127 : //!
128 : //! The former cycle is a memory leak if not broken.
129 : //! The latter cycle further prevents the Timeline from shutting down
130 : //! because we certainly won't drop the Timeline while the GateGuard is alive.
131 : //! Holding the gate open is the whole point of this handle/cache system,
132 : //! but when the Timeline needs to shut down, we need to break the cycle.
133 : //!
134 : //! The cycle is broken by either
135 : //! - Timeline shutdown (=> `PerTimelineState::shutdown`)
136 : //! - Connection shutdown (=> dropping the `Cache`).
137 : //!
138 : //! Both transition the `HandleInner` from [`HandleInner::KeepingTimelineGateOpen`] to
139 : //! [`HandleInner::ShutDown`], which drops the only long-lived strong ref to the
140 : //! `Arc<GateGuard>`.
141 : //!
142 : //! `PerTimelineState::shutdown` drops all the `HandleInner`s it contains,
143 : //! thereby breaking the cycle.
144 : //! It also initiates draining of already existing `Handle`s by
145 : //! setting its handles map to `None` so that no new `HandleInner`s can be added
146 : //! to the `PerTimelineState`, which makes subsequent `Cache::get` calls fail.
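//!
//! In sketch form (the actual implementation is `PerTimelineState::shutdown` further down in this file):
//!
//! ```text
//! // PerTimelineState::shutdown, sketched
//! let handles = self.handles.lock().take();          // None from now on => future Cache::get misses fail
//! for inner in handles { inner.lock().shutdown(); }  // drops Arc<GateGuard> + Arc<Timeline> => hits fail
//! ```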
147 : //!
148 : //! Concurrently existing / already upgraded `Handle`s will extend the
149 : //! lifetime of the `Arc<Mutex<HandleInner>>` and hence the cycles.
150 : //! However, since `Handle`s are short-lived and new `Handle`s are not
151 : //! handed out from `Cache::get` or `WeakHandle::upgrade` after
152 : //! `PerTimelineState::shutdown`, that extension of the cycle is bounded.
153 : //!
154 : //! Concurrently existing `WeakHandle`s will fail to `upgrade()`:
155 : //! while they will succeed in upgrading `Weak<Mutex<HandleInner>>`,
156 : //! they will find the inner in the `HandleInner::ShutDown` state, where the
157 : //! `Arc<GateGuard>` and the Timeline have already been dropped.
158 : //!
159 : //! Dropping the `Cache` undoes the registration of this `Cache`'s
160 : //! `HandleInner`s from all the `PerTimelineState`s, i.e., it
161 : //! removes the strong ref to each of its `HandleInner`s
162 : //! from each `PerTimelineState`.
163 : //!
164 : //! # Locking Rules
165 : //!
166 : //! To prevent deadlocks we:
167 : //!
168 : //! 1. Only ever hold one of the locks at a time.
169 : //! 2. Don't add more than one Drop impl that locks on the
170 : //! cycles above.
171 : //!
172 : //! As per (2), that impl is in `Drop for Cache`.
173 : //!
174 : //! # Fast Path for Shard Routing
175 : //!
176 : //! The `Cache` has a fast path for shard routing to avoid calling into
177 : //! the tenant manager for every request.
178 : //!
179 : //! The `Cache` maintains a hash map of `ShardTimelineId` to `WeakHandle`s.
180 : //!
181 : //! The current implementation uses the first entry in the hash map
182 : //! to determine the `ShardParameters` and derive the correct
183 : //! `ShardIndex` for the requested key.
184 : //!
185 : //! It then looks up the hash map for that `ShardTimelineId := {ShardIndex,TimelineId}`.
186 : //!
187 : //! If the lookup is successful and the `WeakHandle` can be upgraded,
188 : //! it's a hit.
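//!
//! A simplified sketch of the fast path (pseudocode; the real logic lives in
//! `Cache::shard_routing` and `Cache::get` below):
//!
//! ```text
//! let (_, first) = cache.map.iter().next()?;           // any cached entry
//! let ident = first.upgrade()?.get_shard_identity();   // ShardIdentity of that shard
//! let idx = ShardIndex {
//!     shard_number: ident.get_shard_number(&key),
//!     shard_count: ident.count,
//! };
//! let hit = cache.map.get(&ShardTimelineId { shard_index: idx, timeline_id })?.upgrade()?;
//! ```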
189 : //!
190 : //! ## Cache invalidation
191 : //!
192 : //! The insight is that cache invalidation is sufficient and most efficient if done lazily.
193 : //! The only reasons why an entry in the cache can become stale are:
194 : //! 1. The `PerTimelineState` / Timeline is shutting down, e.g., because the shard is
195 : //!    being detached, the timeline or shard is being deleted, or the pageserver is shutting down.
196 : //! 2. We're doing a shard split and new traffic should be routed to the child shards.
197 : //!
198 : //! Regarding (1), we will eventually fail to upgrade the `WeakHandle` once the
199 : //! timeline has shut down, and when that happens, we remove the entry from the cache.
200 : //!
201 : //! Regarding (2), the insight is that it is totally fine to keep dispatching requests
202 : //! to the parent shard during a shard split. Eventually, the shard split task will
203 : //! shut down the parent => case (1).
204 :
205 : use std::collections::{HashMap, hash_map};
206 : use std::sync::{Arc, Mutex, Weak};
207 :
208 : use pageserver_api::shard::ShardIdentity;
209 : use tracing::{instrument, trace};
210 : use utils::id::TimelineId;
211 : use utils::shard::{ShardIndex, ShardNumber};
212 :
213 : use crate::tenant::mgr::ShardSelector;
214 :
215 : /// The requirement for Debug is so that #[derive(Debug)] works in some places.
216 : pub(crate) trait Types: Sized + std::fmt::Debug {
217 : type TenantManagerError: Sized + std::fmt::Debug;
218 : type TenantManager: TenantManager<Self> + Sized;
219 : type Timeline: ArcTimeline<Self> + Sized;
220 : }
221 :
222 : /// Uniquely identifies a [`Cache`] instance over the lifetime of the process.
223 : /// Required so [`Cache::drop`] can take out the handles from the [`PerTimelineState`].
224 : /// Alternative to this would be to allocate [`Cache`] in a `Box` and identify it by the pointer.
225 : #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
226 : struct CacheId(u64);
227 :
228 : impl CacheId {
229 64 : fn next() -> Self {
230 : static NEXT_ID: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
231 64 : let id = NEXT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
232 64 : if id == 0 {
233 0 : panic!("CacheId::new() returned 0, overflow");
234 64 : }
235 64 : Self(id)
236 64 : }
237 : }
238 :
239 : /// See module-level comment.
240 : pub(crate) struct Cache<T: Types> {
241 : id: CacheId,
242 : map: Map<T>,
243 : }
244 :
245 : type Map<T> = HashMap<ShardTimelineId, WeakHandle<T>>;
246 :
247 : impl<T: Types> Default for Cache<T> {
248 64 : fn default() -> Self {
249 64 : Self {
250 64 : id: CacheId::next(),
251 64 : map: Default::default(),
252 64 : }
253 64 : }
254 : }
255 :
256 : #[derive(PartialEq, Eq, Debug, Hash, Clone, Copy)]
257 : pub(crate) struct ShardTimelineId {
258 : pub(crate) shard_index: ShardIndex,
259 : pub(crate) timeline_id: TimelineId,
260 : }
261 :
262 : /// See module-level comment.
263 : pub(crate) struct Handle<T: Types> {
264 : timeline: Arc<T::Timeline>,
265 : #[allow(dead_code)] // the field exists to keep the gate open
266 : gate_guard: Arc<utils::sync::gate::GateGuard>,
267 : inner: Arc<Mutex<HandleInner<T>>>,
268 : }
269 : pub(crate) struct WeakHandle<T: Types> {
270 : inner: Weak<Mutex<HandleInner<T>>>,
271 : }
272 : enum HandleInner<T: Types> {
273 : KeepingTimelineGateOpen {
274 : #[allow(dead_code)]
275 : gate_guard: Arc<utils::sync::gate::GateGuard>,
276 : timeline: Arc<T::Timeline>,
277 : },
278 : ShutDown,
279 : }
280 :
281 : /// Embedded in each [`Types::Timeline`] as the anchor for the only long-lived strong ref to `HandleInner`.
282 : ///
283 : /// See module-level comment for details.
284 : pub struct PerTimelineState<T: Types> {
285 : // None = shutting down
286 : #[allow(clippy::type_complexity)]
287 : handles: Mutex<Option<HashMap<CacheId, Arc<Mutex<HandleInner<T>>>>>>,
288 : }
289 :
290 : impl<T: Types> Default for PerTimelineState<T> {
291 936 : fn default() -> Self {
292 936 : Self {
293 936 : handles: Mutex::new(Some(Default::default())),
294 936 : }
295 936 : }
296 : }
297 :
298 : /// Abstract view of [`crate::tenant::mgr`], for testability.
299 : pub(crate) trait TenantManager<T: Types> {
300 : /// Invoked by [`Cache::get`] to resolve a [`ShardTimelineId`] to a [`Types::Timeline`].
301 : /// Errors are returned as [`GetError::TenantManager`].
302 : async fn resolve(
303 : &self,
304 : timeline_id: TimelineId,
305 : shard_selector: ShardSelector,
306 : ) -> Result<T::Timeline, T::TenantManagerError>;
307 : }
308 :
309 : /// Abstract view of an [`Arc<Timeline>`], for testability.
310 : pub(crate) trait ArcTimeline<T: Types>: Clone {
311 : fn gate(&self) -> &utils::sync::gate::Gate;
312 : fn shard_timeline_id(&self) -> ShardTimelineId;
313 : fn get_shard_identity(&self) -> &ShardIdentity;
314 : fn per_timeline_state(&self) -> &PerTimelineState<T>;
315 : }
316 :
317 : /// Errors returned by [`Cache::get`].
318 : #[derive(Debug)]
319 : pub(crate) enum GetError<T: Types> {
320 : TenantManager(T::TenantManagerError),
321 : TimelineGateClosed,
322 : PerTimelineStateShutDown,
323 : }
324 :
325 : /// Internal type used in [`Cache::get`].
326 : enum RoutingResult<T: Types> {
327 : FastPath(Handle<T>),
328 : SlowPath(ShardTimelineId),
329 : NeedConsultTenantManager,
330 : }
331 :
332 : impl<T: Types> Cache<T> {
333 : /// See module-level comment for details.
334 : ///
335 : /// Does NOT check for the shutdown state of [`Types::Timeline`].
336 : /// Instead, the methods of [`Types::Timeline`] that are invoked through
337 : /// the [`Handle`] are responsible for checking the shutdown state
338 : /// and, if shut down, returning an error that causes the page service to
339 : /// close the connection.
340 : #[instrument(level = "trace", skip_all)]
341 : pub(crate) async fn get(
342 : &mut self,
343 : timeline_id: TimelineId,
344 : shard_selector: ShardSelector,
345 : tenant_manager: &T::TenantManager,
346 : ) -> Result<Handle<T>, GetError<T>> {
347 : // terminates because every iteration removes an element from the map
348 : let miss: ShardSelector = loop {
349 : let routing_state = self.shard_routing(timeline_id, shard_selector);
350 : match routing_state {
351 : RoutingResult::FastPath(handle) => return Ok(handle),
352 : RoutingResult::SlowPath(key) => match self.map.get(&key) {
353 : Some(cached) => match cached.upgrade() {
354 : Ok(upgraded) => return Ok(upgraded),
355 : Err(HandleUpgradeError::ShutDown) => {
356 : // TODO: dedup with shard_routing()
357 : trace!("handle cache stale");
358 : self.map.remove(&key).unwrap();
359 : continue;
360 : }
361 : },
362 : None => break ShardSelector::Known(key.shard_index),
363 : },
364 : RoutingResult::NeedConsultTenantManager => break shard_selector,
365 : }
366 : };
367 : self.get_miss(timeline_id, miss, tenant_manager).await
368 : }
369 :
370 : #[inline(always)]
371 114 : fn shard_routing(
372 114 : &mut self,
373 114 : timeline_id: TimelineId,
374 114 : shard_selector: ShardSelector,
375 114 : ) -> RoutingResult<T> {
376 : loop {
377 : // terminates because every iteration removes an element from the map
378 124 : let Some((first_key, first_handle)) = self.map.iter().next() else {
379 76 : return RoutingResult::NeedConsultTenantManager;
380 : };
381 48 : let Ok(first_handle) = first_handle.upgrade() else {
382 : // TODO: dedup with get()
383 10 : trace!("handle cache stale");
384 10 : let first_key_owned = *first_key;
385 10 : self.map.remove(&first_key_owned).unwrap();
386 10 : continue;
387 : };
388 :
389 38 : let first_handle_shard_identity = first_handle.get_shard_identity();
390 38 : let make_shard_index = |shard_num: ShardNumber| ShardIndex {
391 38 : shard_number: shard_num,
392 38 : shard_count: first_handle_shard_identity.count,
393 38 : };
394 :
395 38 : let need_idx = match shard_selector {
396 38 : ShardSelector::Page(key) => {
397 38 : make_shard_index(first_handle_shard_identity.get_shard_number(&key))
398 : }
399 0 : ShardSelector::Zero => make_shard_index(ShardNumber(0)),
400 0 : ShardSelector::Known(shard_idx) => shard_idx,
401 : };
402 38 : let need_shard_timeline_id = ShardTimelineId {
403 38 : shard_index: need_idx,
404 38 : timeline_id,
405 38 : };
406 38 : let first_handle_shard_timeline_id = ShardTimelineId {
407 38 : shard_index: first_handle_shard_identity.shard_index(),
408 38 : timeline_id: first_handle.shard_timeline_id().timeline_id,
409 38 : };
410 38 :
411 38 : if need_shard_timeline_id == first_handle_shard_timeline_id {
412 24 : return RoutingResult::FastPath(first_handle);
413 : } else {
414 14 : return RoutingResult::SlowPath(need_shard_timeline_id);
415 : }
416 : }
417 114 : }
418 :
419 : #[instrument(level = "trace", skip_all)]
420 : #[inline(always)]
421 : async fn get_miss(
422 : &mut self,
423 : timeline_id: TimelineId,
424 : shard_selector: ShardSelector,
425 : tenant_manager: &T::TenantManager,
426 : ) -> Result<Handle<T>, GetError<T>> {
427 : match tenant_manager.resolve(timeline_id, shard_selector).await {
428 : Ok(timeline) => {
429 : let key = timeline.shard_timeline_id();
430 : match &shard_selector {
431 : ShardSelector::Zero => assert_eq!(key.shard_index.shard_number, ShardNumber(0)),
432 : ShardSelector::Page(_) => (), // gotta trust tenant_manager
433 : ShardSelector::Known(idx) => assert_eq!(idx, &key.shard_index),
434 : }
435 :
436 : trace!("creating new HandleInner");
437 : let handle_inner_arc = Arc::new(Mutex::new(HandleInner::KeepingTimelineGateOpen {
438 : gate_guard: Arc::new(
439 : // this enter() is expensive in production code because
440 : // it hits the global Arc<Timeline>::gate refcounts
441 : match timeline.gate().enter() {
442 : Ok(guard) => guard,
443 : Err(_) => {
444 : return Err(GetError::TimelineGateClosed);
445 : }
446 : },
447 : ),
448 : // this clone is expensive in production code because
449 : // it hits the global Arc<Timeline>::clone refcounts
450 : timeline: Arc::new(timeline.clone()),
451 : }));
452 : let handle_weak = WeakHandle {
453 : inner: Arc::downgrade(&handle_inner_arc),
454 : };
455 : let handle = handle_weak
456 : .upgrade()
457 : .ok()
458 : .expect("we just created it and it's not linked anywhere yet");
459 : {
460 : let mut lock_guard = timeline
461 : .per_timeline_state()
462 : .handles
463 : .lock()
464 : .expect("mutex poisoned");
465 : match &mut *lock_guard {
466 : Some(per_timeline_state) => {
467 : let replaced =
468 : per_timeline_state.insert(self.id, Arc::clone(&handle_inner_arc));
469 : assert!(replaced.is_none(), "some earlier code left a stale handle");
470 : match self.map.entry(key) {
471 : hash_map::Entry::Occupied(_o) => {
472 : // This cannot happen because
473 : // 1. we're handling a cache _miss_, i.e., `self.map` didn't contain an entry and
474 : // 2. we were holding &mut self during .resolve().await above, so, no other thread can have inserted a handle
475 : // while we were waiting for the tenant manager.
476 : unreachable!()
477 : }
478 : hash_map::Entry::Vacant(v) => {
479 : v.insert(handle_weak);
480 : }
481 : }
482 : }
483 : None => {
484 : return Err(GetError::PerTimelineStateShutDown);
485 : }
486 : }
487 : }
488 : Ok(handle)
489 : }
490 : Err(e) => Err(GetError::TenantManager(e)),
491 : }
492 : }
493 : }
494 :
495 : pub(crate) enum HandleUpgradeError {
496 : ShutDown,
497 : }
498 :
499 : impl<T: Types> WeakHandle<T> {
500 142 : pub(crate) fn upgrade(&self) -> Result<Handle<T>, HandleUpgradeError> {
501 142 : let Some(inner) = Weak::upgrade(&self.inner) else {
502 8 : return Err(HandleUpgradeError::ShutDown);
503 : };
504 134 : let lock_guard = inner.lock().expect("poisoned");
505 134 : match &*lock_guard {
506 : HandleInner::KeepingTimelineGateOpen {
507 122 : timeline,
508 122 : gate_guard,
509 122 : } => {
510 122 : let gate_guard = Arc::clone(gate_guard);
511 122 : let timeline = Arc::clone(timeline);
512 122 : drop(lock_guard);
513 122 : Ok(Handle {
514 122 : timeline,
515 122 : gate_guard,
516 122 : inner,
517 122 : })
518 : }
519 12 : HandleInner::ShutDown => Err(HandleUpgradeError::ShutDown),
520 : }
521 142 : }
522 :
523 0 : pub(crate) fn is_same_handle_as(&self, other: &WeakHandle<T>) -> bool {
524 0 : Weak::ptr_eq(&self.inner, &other.inner)
525 0 : }
526 : }
527 :
528 : impl<T: Types> std::ops::Deref for Handle<T> {
529 : type Target = T::Timeline;
530 208 : fn deref(&self) -> &Self::Target {
531 208 : &self.timeline
532 208 : }
533 : }
534 :
535 : impl<T: Types> Handle<T> {
536 4 : pub(crate) fn downgrade(&self) -> WeakHandle<T> {
537 4 : WeakHandle {
538 4 : inner: Arc::downgrade(&self.inner),
539 4 : }
540 4 : }
541 : }
542 :
543 : impl<T: Types> PerTimelineState<T> {
544 : /// After this method returns, [`Cache::get`] will never again return a [`Handle`]
545 : /// to the [`Types::Timeline`] that embeds this per-timeline state.
546 : /// Even if [`TenantManager::resolve`] would still resolve to it.
547 : ///
548 : /// Already-alive [`Handle`]s will remain open and usable, and will keep the [`ArcTimeline`] alive.
549 : /// That's ok because they're short-lived. See module-level comment for details.
550 : #[instrument(level = "trace", skip_all)]
551 : pub(super) fn shutdown(&self) {
552 : let handles = self
553 : .handles
554 : .lock()
555 : .expect("mutex poisoned")
556 : // NB: this .take() sets the locked Option to None.
557 : // That's what makes future `Cache::get` misses fail.
558 : // Cache hits are taken care of below.
559 : .take();
560 : let Some(handles) = handles else {
561 : trace!("already shut down");
562 : return;
563 : };
564 : for handle_inner_arc in handles.values() {
565 : // Make hits fail.
566 : let mut lock_guard = handle_inner_arc.lock().expect("poisoned");
567 : lock_guard.shutdown();
568 : }
569 : drop(handles);
570 : }
571 : }
572 :
573 : // When dropping a [`Cache`], prune its handles in the [`PerTimelineState`] to break the reference cycle.
574 : impl<T: Types> Drop for Cache<T> {
575 64 : fn drop(&mut self) {
576 : for (
577 : _,
578 : WeakHandle {
579 64 : inner: handle_inner_weak,
580 : },
581 64 : ) in self.map.drain()
582 : {
583 64 : let Some(handle_inner_arc) = handle_inner_weak.upgrade() else {
584 8 : continue;
585 : };
586 56 : let Some(handle_timeline) = handle_inner_arc
587 56 : // locking rules: drop lock before acquiring other lock below
588 56 : .lock()
589 56 : .expect("poisoned")
590 56 : .shutdown()
591 : else {
592 : // Concurrent PerTimelineState::shutdown.
593 0 : continue;
594 : };
595 : // Clean up per_timeline_state so the HandleInner allocation can be dropped.
596 56 : let per_timeline_state = handle_timeline.per_timeline_state();
597 56 : let mut handles_lock_guard = per_timeline_state.handles.lock().expect("mutex poisoned");
598 56 : let Some(handles) = &mut *handles_lock_guard else {
599 0 : continue;
600 : };
601 56 : let Some(removed_handle_inner_arc) = handles.remove(&self.id) else {
602 : // Concurrent PerTimelineState::shutdown.
603 0 : continue;
604 : };
605 56 : drop(handles_lock_guard); // locking rules!
606 56 : assert!(Arc::ptr_eq(&removed_handle_inner_arc, &handle_inner_arc));
607 : }
608 64 : }
609 : }
610 :
611 : impl<T: Types> HandleInner<T> {
612 76 : fn shutdown(&mut self) -> Option<Arc<T::Timeline>> {
613 76 : match std::mem::replace(self, HandleInner::ShutDown) {
614 76 : HandleInner::KeepingTimelineGateOpen { timeline, .. } => Some(timeline),
615 : HandleInner::ShutDown => {
616 : // Duplicate shutdowns are possible because both Cache::drop and PerTimelineState::shutdown
617 : // may do it concurrently, but locking rules disallow holding per-timeline-state lock and
618 : // the handle lock at the same time.
619 0 : None
620 : }
621 : }
622 76 : }
623 : }
624 :
625 : #[cfg(test)]
626 : mod tests {
627 : use std::sync::Weak;
628 :
629 : use pageserver_api::key::{DBDIR_KEY, Key, rel_block_to_key};
630 : use pageserver_api::models::ShardParameters;
631 : use pageserver_api::reltag::RelTag;
632 : use pageserver_api::shard::ShardStripeSize;
633 : use utils::shard::ShardCount;
634 :
635 : use super::*;
636 :
637 : const FOREVER: std::time::Duration = std::time::Duration::from_secs(u64::MAX);
638 :
639 : #[derive(Debug)]
640 : struct TestTypes;
641 : impl Types for TestTypes {
642 : type TenantManagerError = anyhow::Error;
643 : type TenantManager = StubManager;
644 : type Timeline = Arc<StubTimeline>;
645 : }
646 :
647 : struct StubManager {
648 : shards: Vec<Arc<StubTimeline>>,
649 : }
650 :
651 : struct StubTimeline {
652 : gate: utils::sync::gate::Gate,
653 : id: TimelineId,
654 : shard: ShardIdentity,
655 : per_timeline_state: PerTimelineState<TestTypes>,
656 : myself: Weak<StubTimeline>,
657 : }
658 :
659 : impl StubTimeline {
660 44 : fn getpage(&self) {
661 44 : // do nothing
662 44 : }
663 : }
664 :
665 : impl ArcTimeline<TestTypes> for Arc<StubTimeline> {
666 84 : fn gate(&self) -> &utils::sync::gate::Gate {
667 84 : &self.gate
668 84 : }
669 :
670 122 : fn shard_timeline_id(&self) -> ShardTimelineId {
671 122 : ShardTimelineId {
672 122 : shard_index: self.shard.shard_index(),
673 122 : timeline_id: self.id,
674 122 : }
675 122 : }
676 :
677 38 : fn get_shard_identity(&self) -> &ShardIdentity {
678 38 : &self.shard
679 38 : }
680 :
681 136 : fn per_timeline_state(&self) -> &PerTimelineState<TestTypes> {
682 136 : &self.per_timeline_state
683 136 : }
684 : }
685 :
686 : impl TenantManager<TestTypes> for StubManager {
687 92 : async fn resolve(
688 92 : &self,
689 92 : timeline_id: TimelineId,
690 92 : shard_selector: ShardSelector,
691 92 : ) -> anyhow::Result<Arc<StubTimeline>> {
692 108 : for timeline in &self.shards {
693 100 : if timeline.id == timeline_id {
694 0 : match &shard_selector {
695 0 : ShardSelector::Zero if timeline.shard.is_shard_zero() => {
696 0 : return Ok(Arc::clone(timeline));
697 : }
698 0 : ShardSelector::Zero => continue,
699 76 : ShardSelector::Page(key) if timeline.shard.is_key_local(key) => {
700 76 : return Ok(Arc::clone(timeline));
701 : }
702 0 : ShardSelector::Page(_) => continue,
703 12 : ShardSelector::Known(idx) if idx == &timeline.shard.shard_index() => {
704 8 : return Ok(Arc::clone(timeline));
705 : }
706 4 : ShardSelector::Known(_) => continue,
707 : }
708 12 : }
709 : }
710 8 : anyhow::bail!("not found")
711 92 : }
712 : }
713 :
714 : #[tokio::test(start_paused = true)]
715 4 : async fn test_timeline_shutdown() {
716 4 : crate::tenant::harness::setup_logging();
717 4 :
718 4 : let timeline_id = TimelineId::generate();
719 4 : let shard0 = Arc::new_cyclic(|myself| StubTimeline {
720 4 : gate: Default::default(),
721 4 : id: timeline_id,
722 4 : shard: ShardIdentity::unsharded(),
723 4 : per_timeline_state: PerTimelineState::default(),
724 4 : myself: myself.clone(),
725 4 : });
726 4 : let mgr = StubManager {
727 4 : shards: vec![shard0.clone()],
728 4 : };
729 4 : let key = DBDIR_KEY;
730 4 :
731 4 : let mut cache = Cache::<TestTypes>::default();
732 4 :
733 4 : //
734 4 : // fill the cache
735 4 : //
736 4 : let handle: Handle<_> = cache
737 4 : .get(timeline_id, ShardSelector::Page(key), &mgr)
738 4 : .await
739 4 : .expect("we have the timeline");
740 4 : assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
741 4 : assert_eq!(cache.map.len(), 1);
742 4 : drop(handle);
743 4 :
744 4 : //
745 4 : // demonstrate that Handle holds up gate closure
746 4 : // but shutdown prevents new handles from being handed out
747 4 : //
748 4 :
749 4 : tokio::select! {
750 4 : _ = shard0.gate.close() => {
751 4 : panic!("cache and per-timeline handler state keep gate open");
752 4 : }
753 4 : _ = tokio::time::sleep(FOREVER) => {
754 4 : // NB: first poll of close() makes it enter closing state
755 4 : }
756 4 : }
757 4 :
758 4 : let handle = cache
759 4 : .get(timeline_id, ShardSelector::Page(key), &mgr)
760 4 : .await
761 4 : .expect("we have the timeline");
762 4 : assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
763 4 :
764 4 : // SHUTDOWN
765 4 : shard0.per_timeline_state.shutdown(); // keeping handle alive across shutdown
766 4 :
767 4 : assert_eq!(
768 4 : cache.map.len(),
769 4 : 1,
770 4 : "this is an implementation detail but worth pointing out: we can't clear the cache from shutdown(), it's cleared on first access after"
771 4 : );
772 4 :
773 4 : // this handle is perfectly usable
774 4 : handle.getpage();
775 4 :
776 4 : cache
777 4 : .get(timeline_id, ShardSelector::Page(key), &mgr)
778 4 : .await
779 4 : .err()
780 4 : .expect("documented behavior: can't get new handle after shutdown, even if there is an alive Handle");
781 4 : assert_eq!(
782 4 : cache.map.len(),
783 4 : 0,
784 4 : "first access after shutdown cleans up the Weak's from the cache"
785 4 : );
786 4 :
787 4 : tokio::select! {
788 4 : _ = shard0.gate.close() => {
789 4 : panic!("handle is keeping gate open");
790 4 : }
791 4 : _ = tokio::time::sleep(FOREVER) => { }
792 4 : }
793 4 :
794 4 : drop(handle);
795 4 :
796 4 : // closing gate succeeds after dropping handle
797 4 : tokio::select! {
798 4 : _ = shard0.gate.close() => { }
799 4 : _ = tokio::time::sleep(FOREVER) => {
800 4 : panic!("handle is dropped, no other gate holders exist")
801 4 : }
802 4 : }
803 4 :
804 4 : // map gets cleaned on next lookup
805 4 : cache
806 4 : .get(timeline_id, ShardSelector::Page(key), &mgr)
807 4 : .await
808 4 : .err()
809 4 : .expect("documented behavior: can't get new handle after shutdown");
810 4 : assert_eq!(cache.map.len(), 0);
811 4 :
812 4 : // ensure all refs to shard0 are gone and we're not leaking anything
813 4 : drop(shard0);
814 4 : drop(mgr);
815 4 : }
816 :
817 : #[tokio::test]
818 4 : async fn test_multiple_timelines_and_deletion() {
819 4 : crate::tenant::harness::setup_logging();
820 4 :
821 4 : let timeline_a = TimelineId::generate();
822 4 : let timeline_b = TimelineId::generate();
823 4 : assert_ne!(timeline_a, timeline_b);
824 4 : let timeline_a = Arc::new_cyclic(|myself| StubTimeline {
825 4 : gate: Default::default(),
826 4 : id: timeline_a,
827 4 : shard: ShardIdentity::unsharded(),
828 4 : per_timeline_state: PerTimelineState::default(),
829 4 : myself: myself.clone(),
830 4 : });
831 4 : let timeline_b = Arc::new_cyclic(|myself| StubTimeline {
832 4 : gate: Default::default(),
833 4 : id: timeline_b,
834 4 : shard: ShardIdentity::unsharded(),
835 4 : per_timeline_state: PerTimelineState::default(),
836 4 : myself: myself.clone(),
837 4 : });
838 4 : let mut mgr = StubManager {
839 4 : shards: vec![timeline_a.clone(), timeline_b.clone()],
840 4 : };
841 4 : let key = DBDIR_KEY;
842 4 :
843 4 : let mut cache = Cache::<TestTypes>::default();
844 4 :
845 4 : cache
846 4 : .get(timeline_a.id, ShardSelector::Page(key), &mgr)
847 4 : .await
848 4 : .expect("we have it");
849 4 : cache
850 4 : .get(timeline_b.id, ShardSelector::Page(key), &mgr)
851 4 : .await
852 4 : .expect("we have it");
853 4 : assert_eq!(cache.map.len(), 2);
854 4 :
855 4 : // delete timeline A
856 4 : timeline_a.per_timeline_state.shutdown();
857 8 : mgr.shards.retain(|t| t.id != timeline_a.id);
858 4 : assert!(
859 4 : mgr.resolve(timeline_a.id, ShardSelector::Page(key))
860 4 : .await
861 4 : .is_err(),
862 4 : "broken StubManager implementation"
863 4 : );
864 4 :
865 4 : assert_eq!(
866 4 : cache.map.len(),
867 4 : 2,
868 4 : "cache still has a Weak handle to Timeline A"
869 4 : );
870 4 : cache
871 4 : .get(timeline_a.id, ShardSelector::Page(key), &mgr)
872 4 : .await
873 4 : .err()
874 4 : .expect("documented behavior: can't get new handle after shutdown");
875 4 : assert_eq!(cache.map.len(), 1, "next access cleans up the cache");
876 4 :
877 4 : cache
878 4 : .get(timeline_b.id, ShardSelector::Page(key), &mgr)
879 4 : .await
880 4 : .expect("we still have it");
881 4 : }
882 :
883 28 : fn make_relation_key_for_shard(shard: ShardNumber, params: &ShardParameters) -> Key {
884 28 : rel_block_to_key(
885 28 : RelTag {
886 28 : spcnode: 1663,
887 28 : dbnode: 208101,
888 28 : relnode: 2620,
889 28 : forknum: 0,
890 28 : },
891 28 : shard.0 as u32 * params.stripe_size.0,
892 28 : )
893 28 : }
894 :
895 : #[tokio::test(start_paused = true)]
896 4 : async fn test_shard_split() {
897 4 : crate::tenant::harness::setup_logging();
898 4 : let timeline_id = TimelineId::generate();
899 4 : let parent = Arc::new_cyclic(|myself| StubTimeline {
900 4 : gate: Default::default(),
901 4 : id: timeline_id,
902 4 : shard: ShardIdentity::unsharded(),
903 4 : per_timeline_state: PerTimelineState::default(),
904 4 : myself: myself.clone(),
905 4 : });
906 4 : let child_params = ShardParameters {
907 4 : count: ShardCount(2),
908 4 : stripe_size: ShardStripeSize::default(),
909 4 : };
910 4 : let child0 = Arc::new_cyclic(|myself| StubTimeline {
911 4 : gate: Default::default(),
912 4 : id: timeline_id,
913 4 : shard: ShardIdentity::from_params(ShardNumber(0), &child_params),
914 4 : per_timeline_state: PerTimelineState::default(),
915 4 : myself: myself.clone(),
916 4 : });
917 4 : let child1 = Arc::new_cyclic(|myself| StubTimeline {
918 4 : gate: Default::default(),
919 4 : id: timeline_id,
920 4 : shard: ShardIdentity::from_params(ShardNumber(1), &child_params),
921 4 : per_timeline_state: PerTimelineState::default(),
922 4 : myself: myself.clone(),
923 4 : });
924 4 : let child_shards_by_shard_number = [child0.clone(), child1.clone()];
925 4 :
926 4 : let mut cache = Cache::<TestTypes>::default();
927 4 :
928 4 : // fill the cache with the parent
929 12 : for i in 0..2 {
930 8 : let handle = cache
931 8 : .get(
932 8 : timeline_id,
933 8 : ShardSelector::Page(make_relation_key_for_shard(ShardNumber(i), &child_params)),
934 8 : &StubManager {
935 8 : shards: vec![parent.clone()],
936 8 : },
937 8 : )
938 8 : .await
939 8 : .expect("we have it");
940 8 : assert!(
941 8 : Weak::ptr_eq(&handle.myself, &parent.myself),
942 4 : "mgr returns parent first"
943 4 : );
944 8 : drop(handle);
945 4 : }
946 4 :
947 4 : //
948 4 : // SHARD SPLIT: tenant manager changes, but the cache isn't informed
949 4 : //
950 4 :
951 4 : // while we haven't shut down the parent, the cache will return the cached parent, even
952 4 : // if the tenant manager returns the child
953 12 : for i in 0..2 {
954 8 : let handle = cache
955 8 : .get(
956 8 : timeline_id,
957 8 : ShardSelector::Page(make_relation_key_for_shard(ShardNumber(i), &child_params)),
958 8 : &StubManager {
959 8 : shards: vec![], // doesn't matter what's in here, the cache is fully loaded
960 8 : },
961 8 : )
962 8 : .await
963 8 : .expect("we have it");
964 8 : assert!(
965 8 : Weak::ptr_eq(&handle.myself, &parent.myself),
966 4 : "mgr returns parent"
967 4 : );
968 8 : drop(handle);
969 4 : }
970 4 :
971 4 : let parent_handle = cache
972 4 : .get(
973 4 : timeline_id,
974 4 : ShardSelector::Page(make_relation_key_for_shard(ShardNumber(0), &child_params)),
975 4 : &StubManager {
976 4 : shards: vec![parent.clone()],
977 4 : },
978 4 : )
979 4 : .await
980 4 : .expect("we have it");
981 4 : assert!(Weak::ptr_eq(&parent_handle.myself, &parent.myself));
982 4 :
983 4 : // invalidate the cache
984 4 : parent.per_timeline_state.shutdown();
985 4 :
986 4 : // the cache will now return the child, even though the parent handle still exists
987 12 : for i in 0..2 {
988 8 : let handle = cache
989 8 : .get(
990 8 : timeline_id,
991 8 : ShardSelector::Page(make_relation_key_for_shard(ShardNumber(i), &child_params)),
992 8 : &StubManager {
993 8 : shards: vec![child0.clone(), child1.clone()], // <====== this changed compared to previous loop
994 8 : },
995 8 : )
996 8 : .await
997 8 : .expect("we have it");
998 8 : assert!(
999 8 : Weak::ptr_eq(
1000 8 : &handle.myself,
1001 8 : &child_shards_by_shard_number[i as usize].myself
1002 8 : ),
1003 4 : "mgr returns child"
1004 4 : );
1005 8 : drop(handle);
1006 4 : }
1007 4 :
1008 4 : // all the while the parent handle kept the parent gate open
1009 4 : tokio::select! {
1010 4 : _ = parent_handle.gate.close() => {
1011 4 : panic!("parent handle is keeping gate open");
1012 4 : }
1013 4 : _ = tokio::time::sleep(FOREVER) => { }
1014 4 : }
1015 4 : drop(parent_handle);
1016 4 : tokio::select! {
1017 4 : _ = parent.gate.close() => { }
1018 4 : _ = tokio::time::sleep(FOREVER) => {
1019 4 : panic!("parent handle is dropped, no other gate holders exist")
1020 4 : }
1021 4 : }
1022 4 : }
1023 :
1024 : #[tokio::test(start_paused = true)]
1025 4 : async fn test_connection_handler_exit() {
1026 4 : crate::tenant::harness::setup_logging();
1027 4 : let timeline_id = TimelineId::generate();
1028 4 : let shard0 = Arc::new_cyclic(|myself| StubTimeline {
1029 4 : gate: Default::default(),
1030 4 : id: timeline_id,
1031 4 : shard: ShardIdentity::unsharded(),
1032 4 : per_timeline_state: PerTimelineState::default(),
1033 4 : myself: myself.clone(),
1034 4 : });
1035 4 : let mgr = StubManager {
1036 4 : shards: vec![shard0.clone()],
1037 4 : };
1038 4 : let key = DBDIR_KEY;
1039 4 :
1040 4 : // Simulate 10 connections that are opened, used, and closed
1041 4 : let mut used_handles = vec![];
1042 44 : for _ in 0..10 {
1043 40 : let mut cache = Cache::<TestTypes>::default();
1044 40 : let handle = {
1045 40 : let handle = cache
1046 40 : .get(timeline_id, ShardSelector::Page(key), &mgr)
1047 40 : .await
1048 40 : .expect("we have the timeline");
1049 40 : assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
1050 40 : handle
1051 40 : };
1052 40 : handle.getpage();
1053 40 : used_handles.push(Arc::downgrade(&handle.timeline));
1054 4 : }
1055 4 :
1056 4 : // No handles exist anymore, so nothing is holding the gate open.
1057 4 : // Thus the gate should close immediately, even without shutdown.
1058 4 : tokio::select! {
1059 4 : _ = shard0.gate.close() => { }
1060 4 : _ = tokio::time::sleep(FOREVER) => {
1061 4 : panic!("handle is dropped, no other gate holders exist")
1062 4 : }
1063 4 : }
1064 4 : }
1065 :
1066 : #[tokio::test(start_paused = true)]
1067 4 : async fn test_weak_handles() {
1068 4 : crate::tenant::harness::setup_logging();
1069 4 : let timeline_id = TimelineId::generate();
1070 4 : let shard0 = Arc::new_cyclic(|myself| StubTimeline {
1071 4 : gate: Default::default(),
1072 4 : id: timeline_id,
1073 4 : shard: ShardIdentity::unsharded(),
1074 4 : per_timeline_state: PerTimelineState::default(),
1075 4 : myself: myself.clone(),
1076 4 : });
1077 4 : let mgr = StubManager {
1078 4 : shards: vec![shard0.clone()],
1079 4 : };
1080 4 :
1081 4 : let refcount_start = Arc::strong_count(&shard0);
1082 4 :
1083 4 : let key = DBDIR_KEY;
1084 4 :
1085 4 : let mut cache = Cache::<TestTypes>::default();
1086 4 :
1087 4 : let handle = cache
1088 4 : .get(timeline_id, ShardSelector::Page(key), &mgr)
1089 4 : .await
1090 4 : .expect("we have the timeline");
1091 4 : assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
1092 4 :
1093 4 : let weak_handle = handle.downgrade();
1094 4 :
1095 4 : drop(handle);
1096 4 :
1097 4 : let upgraded_handle = weak_handle.upgrade().ok().expect("we can upgrade it");
1098 4 :
1099 4 : // Start shutdown
1100 4 : shard0.per_timeline_state.shutdown();
1101 4 :
1102 4 : // Upgrades during shutdown don't work, even if upgraded_handle exists.
1103 4 : weak_handle
1104 4 : .upgrade()
1105 4 : .err()
1106 4 : .expect("can't upgrade weak handle as soon as shutdown started");
1107 4 :
1108 4 : // But upgraded_handle is still alive, so the gate won't close.
1109 4 : tokio::select! {
1110 4 : _ = shard0.gate.close() => {
1111 4 : panic!("handle is keeping gate open");
1112 4 : }
1113 4 : _ = tokio::time::sleep(FOREVER) => { }
1114 4 : }
1115 4 :
1116 4 : // Drop the last handle.
1117 4 : drop(upgraded_handle);
1118 4 :
1119 4 : // The gate should close now, despite there still being a weak_handle.
1120 4 : tokio::select! {
1121 4 : _ = shard0.gate.close() => { }
1122 4 : _ = tokio::time::sleep(FOREVER) => {
1123 4 : panic!("only strong handle is dropped and we shut down per-timeline-state")
1124 4 : }
1125 4 : }
1126 4 :
1127 4 : // The weak handle still can't be upgraded.
1128 4 : weak_handle
1129 4 : .upgrade()
1130 4 : .err()
1131 4 : .expect("still shouldn't be able to upgrade the weak handle");
1132 4 :
1133 4 : // There should be no strong references to the timeline object except the one on "stack".
1134 4 : assert_eq!(Arc::strong_count(&shard0), refcount_start);
1135 4 : }
1136 :
1137 : #[tokio::test(start_paused = true)]
1138 4 : async fn test_reference_cycle_broken_when_cache_is_dropped() {
1139 4 : crate::tenant::harness::setup_logging();
1140 4 : let timeline_id = TimelineId::generate();
1141 4 : let shard0 = Arc::new_cyclic(|myself| StubTimeline {
1142 4 : gate: Default::default(),
1143 4 : id: timeline_id,
1144 4 : shard: ShardIdentity::unsharded(),
1145 4 : per_timeline_state: PerTimelineState::default(),
1146 4 : myself: myself.clone(),
1147 4 : });
1148 4 : let mgr = StubManager {
1149 4 : shards: vec![shard0.clone()],
1150 4 : };
1151 4 : let key = DBDIR_KEY;
1152 4 :
1153 4 : let mut cache = Cache::<TestTypes>::default();
1154 4 :
1155 4 : // helper to check if a handle is referenced by per_timeline_state
1156 8 : let per_timeline_state_refs_handle = |handle_weak: &Weak<Mutex<HandleInner<_>>>| {
1157 8 : let per_timeline_state = shard0.per_timeline_state.handles.lock().unwrap();
1158 8 : let per_timeline_state = per_timeline_state.as_ref().unwrap();
1159 8 : per_timeline_state
1160 8 : .values()
1161 8 : .any(|v| Weak::ptr_eq(&Arc::downgrade(v), handle_weak))
1162 8 : };
1163 4 :
1164 4 : // Fill the cache.
1165 4 : let handle = cache
1166 4 : .get(timeline_id, ShardSelector::Page(key), &mgr)
1167 4 : .await
1168 4 : .expect("we have the timeline");
1169 4 : assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
1170 4 : let handle_inner_weak = Arc::downgrade(&handle.inner);
1171 4 : assert!(
1172 4 : per_timeline_state_refs_handle(&handle_inner_weak),
1173 4 : "we still hold `handle` _and_ haven't dropped `cache` yet"
1174 4 : );
1175 4 :
1176 4 : // Drop the cache.
1177 4 : drop(cache);
1178 4 :
1179 4 : assert!(
1180 4 : !(per_timeline_state_refs_handle(&handle_inner_weak)),
1181 4 : "nothing should reference the handle allocation anymore"
1182 4 : );
1183 4 : assert!(
1184 4 : Weak::upgrade(&handle_inner_weak).is_some(),
1185 4 : "the local `handle` still keeps the allocation alive"
1186 4 : );
1187 4 : // but obviously the cache is gone so no new allocations can be handed out.
1188 4 :
1189 4 : // Drop handle.
1190 4 : drop(handle);
1191 4 : assert!(
1192 4 : Weak::upgrade(&handle_inner_weak).is_none(),
1193 4 : "the local `handle` is dropped, so the allocation should be dropped by now"
1194 4 : );
1195 4 : }
1196 :
1197 : #[tokio::test(start_paused = true)]
1198 4 : async fn test_reference_cycle_broken_when_per_timeline_state_shutdown() {
1199 4 : crate::tenant::harness::setup_logging();
1200 4 : let timeline_id = TimelineId::generate();
1201 4 : let shard0 = Arc::new_cyclic(|myself| StubTimeline {
1202 4 : gate: Default::default(),
1203 4 : id: timeline_id,
1204 4 : shard: ShardIdentity::unsharded(),
1205 4 : per_timeline_state: PerTimelineState::default(),
1206 4 : myself: myself.clone(),
1207 4 : });
1208 4 : let mgr = StubManager {
1209 4 : shards: vec![shard0.clone()],
1210 4 : };
1211 4 : let key = DBDIR_KEY;
1212 4 :
1213 4 : let mut cache = Cache::<TestTypes>::default();
1214 4 : let handle = cache
1215 4 : .get(timeline_id, ShardSelector::Page(key), &mgr)
1216 4 : .await
1217 4 : .expect("we have the timeline");
1218 4 : // grab a weak reference to the inner so we can later try to Weak::upgrade it and assert that it fails
1219 4 : let handle_inner_weak = Arc::downgrade(&handle.inner);
1220 4 :
1221 4 : // drop the handle, obviously the lifetime of `inner` is at least as long as each strong reference to it
1222 4 : drop(handle);
1223 4 : assert!(Weak::upgrade(&handle_inner_weak).is_some(), "can still");
1224 4 :
1225 4 : // Shutdown the per_timeline_state.
1226 4 : shard0.per_timeline_state.shutdown();
1227 4 : assert!(Weak::upgrade(&handle_inner_weak).is_none(), "can no longer");
1228 4 :
1229 4 : // cache only contains Weak's, so it can outlive the per_timeline_state.
1230 4 : // Drop it explicitly, solely to make this point.
1231 4 : drop(cache);
1232 4 : }
1233 : }
|