Line data Source code
1 : //! An efficient way to keep the timeline gate open without preventing
2 : //! timeline shutdown for longer than a single call to a timeline method.
3 : //!
4 : //! # Motivation
5 : //!
6 : //! On a single page service connection, we're typically serving a single TenantTimelineId.
7 : //!
8 : //! Without sharding, there is a single Timeline object to which we dispatch
9 : //! all requests. For example, a getpage request gets dispatched to the
10 : //! Timeline::get method of the Timeline object that represents the
11 : //! (tenant,timeline) of that connection.
12 : //!
13 : //! With sharding, for each request that comes in on the connection,
14 : //! we first have to perform shard routing based on the requested key (=~ page number).
15 : //! The result of shard routing is a Timeline object.
16 : //! We then dispatch the request to that Timeline object.
17 : //!
18 : //! Regardless of whether the tenant is sharded or not, we want to ensure that
19 : //! we hold the Timeline gate open while we're invoking the method on the
20 : //! Timeline object.
21 : //!
22 : //! However, we want to avoid the overhead of entering the gate for every
23 : //! method invocation.
24 : //!
25 : //! Further, for shard routing, we want to avoid calling the tenant manager to
26 : //! resolve the shard for every request. Instead, we want to cache the
27 : //! routing result so we can bypass the tenant manager for all subsequent requests
28 : //! that get routed to that shard.
29 : //!
30 : //! Regardless of how we accomplish the above, it should not
31 : //! prevent the Timeline from shutting down promptly.
32 : //!
33 : //! # Design
34 : //!
35 : //! ## Data Structures
36 : //!
37 : //! There are four user-facing data structures:
38 : //! - `PerTimelineState`: a struct embedded into each Timeline struct. Lifetime == Timeline lifetime.
39 : //! - `Cache`: a struct private to each connection handler; Lifetime == connection lifetime.
40 : //! - `Handle`: a smart pointer that holds the Timeline gate open and derefs to `&Timeline`.
41 : //! - `WeakHandle`: downgrade of a `Handle` that does not keep the gate open, but allows
42 : //!   trying to upgrade back to a `Handle`, guaranteeing it's the same `Timeline` *object*.
43 : //!
44 : //! Internally, there is 0 or 1 `HandleInner` per `(Cache,Timeline)`.
45 : //! Since Cache:Connection is 1:1, there is 0 or 1 `HandleInner` per `(Connection,Timeline)`.
46 : //!
47 : //! The `HandleInner` is allocated as an `Arc<Mutex<HandleInner>>` and
48 : //! referenced weakly and strongly from various places which we are now illustrating.
49 : //! For brevity, we will omit the `Arc<Mutex<>>` part in the following and instead
50 : //! use `strong ref` and `weak ref` when referring to the `Arc<Mutex<HandleInner>>`
51 : //! or `Weak<Mutex<HandleInner>>`, respectively.
52 : //!
53 : //! - The `Handle` is a strong ref.
54 : //! - The `WeakHandle` is a weak ref.
55 : //! - The `PerTimelineState` contains a `HashMap<CacheId, strong ref>`.
56 : //! - The `Cache` is a `HashMap<unique identifier for the shard, weak ref>`.
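//!
//! Schematically, the references between these data structures are (a sketch,
//! using the same arrow notation as the cycle diagrams in the Shutdown section):
//!
//! ```text
//! Cache            --weak-->   Arc<Mutex<HandleInner>>
//! WeakHandle       --weak-->   Arc<Mutex<HandleInner>>
//! Handle           --strong--> Arc<Mutex<HandleInner>>
//! PerTimelineState --strong--> Arc<Mutex<HandleInner>>
//! HandleInner      --strong--> GateGuard
//! HandleInner      --strong--> Timeline
//! ```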
57 : //!
58 : //! Lifetimes:
59 : //! - `WeakHandle` and `Handle`: single pagestream request.
60 : //! - `Cache`: single page service connection.
61 : //! - `PerTimelineState`: lifetime of the Timeline object (i.e., until `Timeline::shutdown`).
62 : //!
63 : //! ## Request Handling Flow (= filling and using the `Cache`)
64 : //!
65 : //! To dispatch a request, the page service connection calls `Cache::get`.
66 : //!
67 : //! A cache miss means we consult the tenant manager for shard routing,
68 : //! resulting in an `Arc<Timeline>`. We enter its gate _once_ and store it in the
69 : //! `Arc<Mutex<HandleInner>>`. A weak ref is stored in the `Cache`
70 : //! and a strong ref in the `PerTimelineState`.
71 : //! A strong ref is returned wrapped in a `Handle`.
72 : //!
73 : //! For subsequent requests, `Cache::get` will perform a "fast path" shard routing
74 : //! and find the weak ref in the cache.
75 : //! We upgrade the weak ref to a strong ref and return it wrapped in a `Handle`.
76 : //!
77 : //! The pagestream processing is pipelined and involves a batching step.
78 : //! While a request is being batched, the `Handle` is downgraded to a `WeakHandle`.
79 : //! When the batch is ready to be executed, the `WeakHandle` is upgraded back to a `Handle`
80 : //! and the request handler dispatches the request to the right `<Handle as Deref<Target = Timeline>>::$request_method`.
81 : //! It then drops the `Handle`, which drops its strong refs to the `Arc<Mutex<HandleInner>>` and the `Arc<GateGuard>`.
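//!
//! For illustration, a simplified connection handler could use this module like the
//! following sketch (pseudo-Rust; `read_request`, `batch_ready`, `handle_request`, and the
//! request fields are hypothetical, while `Cache::get`, `Handle::downgrade`,
//! `WeakHandle::upgrade`, and the `Deref<Target = Timeline>` impl are the real API):
//!
//! ```text
//! let mut cache = Cache::default();                  // lives as long as the connection
//! let mut batch = Vec::new();
//! loop {
//!     let req = read_request().await?;               // hypothetical
//!     // miss: resolve via tenant manager + enter gate once; hit: upgrade cached weak ref
//!     let handle = cache.get(req.timeline_id, ShardSelector::Page(req.key), &tenant_mgr).await?;
//!     batch.push((req, handle.downgrade()));         // don't keep the gate open while batching
//!     drop(handle);
//!     if batch_ready(&batch) {                       // hypothetical
//!         for (req, weak) in batch.drain(..) {
//!             let handle = weak.upgrade()?;          // re-acquire the gate-holding Handle
//!             handle.handle_request(req).await?;     // dispatch via Deref<Target = Timeline> (hypothetical method)
//!         }
//!     }
//! }
//! ```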
82 : //!
83 : //! # Performance
84 : //!
85 : //! Remember from the introductory section:
86 : //!
87 : //! > However, we want to avoid the overhead of entering the gate for every
88 : //! > method invocation.
89 : //!
90 : //! Why do we want to avoid that?
91 : //! Because the gate is a shared location in memory and entering it involves
92 : //! bumping refcounts, which leads to cache contention if done frequently
93 : //! from multiple cores in parallel.
94 : //!
95 : //! So, we only acquire the `GateGuard` once on `Cache` miss, and wrap it in an `Arc`.
96 : //! That `Arc` is private to the `HandleInner` and hence to the connection.
97 : //! (Review the "Data Structures" section if that is unclear to you.)
98 : //!
99 : //! A `WeakHandle` is a weak ref to the `HandleInner`.
100 : //! When upgrading a `WeakHandle`, we upgrade to a strong ref to the `HandleInner` and
101 : //! further acquire an additional strong ref to the `Arc<GateGuard>` inside it.
102 : //! Again, this manipulation of ref counts is cheap because the `Arc` is private to the connection.
103 : //!
104 : //! When downgrading a `Handle` to a `WeakHandle`, we drop the `Arc<GateGuard>`.
105 : //! Again, this is cheap because the `Arc` is private to the connection.
106 : //!
107 : //! In addition to the GateGuard, we need to provide a `Deref<Target = Timeline>` impl.
108 : //! For this, the `Handle` needs infallible access to an `Arc<Timeline>`.
109 : //! We could clone the `Arc<Timeline>` when upgrading a `WeakHandle`, but that would cause contention
110 : //! on the shared memory location that tracks the refcount of the `Arc<Timeline>`.
111 : //! Instead, we wrap the `Arc<Timeline>` into another `Arc`
112 : //! so that we can clone it cheaply when upgrading a `WeakHandle`.
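//!
//! Schematically, `WeakHandle::upgrade` therefore only touches connection-private refcounts
//! (a condensed sketch of the actual implementation further down in this file):
//!
//! ```text
//! fn upgrade(&self) -> Result<Handle, HandleUpgradeError> {
//!     let inner = Weak::upgrade(&self.inner).ok_or(HandleUpgradeError::ShutDown)?;
//!     match &*inner.lock().expect("poisoned") {
//!         KeepingTimelineGateOpen { gate_guard, timeline } => Ok(Handle {
//!             gate_guard: Arc::clone(gate_guard), // connection-private Arc<GateGuard>
//!             timeline: Arc::clone(timeline),     // connection-private outer Arc around Arc<Timeline>
//!             inner: Arc::clone(&inner),
//!         }),
//!         ShutDown => Err(HandleUpgradeError::ShutDown),
//!     }
//! }
//! ```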
113 : //!
114 : //! # Shutdown
115 : //!
116 : //! The attentive reader may have noticed the following reference cycle around the `Arc<Timeline>`:
117 : //!
118 : //! ```text
119 : //! Timeline --owns--> PerTimelineState --strong--> HandleInner --strong--> Timeline
120 : //! ```
121 : //!
122 : //! Further, there is this cycle:
123 : //!
124 : //! ```text
125 : //! Timeline --owns--> PerTimelineState --strong--> HandleInner --strong--> GateGuard --keepalive--> Timeline
126 : //! ```
127 : //!
128 : //! The former cycle is a memory leak if not broken.
129 : //! The latter cycle further prevents the Timeline from shutting down,
130 : //! because shutdown waits for the gate to close, which cannot happen while a GateGuard is alive.
131 : //! Holding the gate open to delay shutdown is the whole point of this handle/cache system,
132 : //! but when the Timeline needs to shut down, we need to break the cycles.
133 : //!
134 : //! The cycles are broken by either
135 : //! - Timeline shutdown (=> `PerTimelineState::shutdown`)
136 : //! - Connection shutdown (=> dropping the `Cache`).
137 : //!
138 : //! Both transition the `HandleInner` from [`HandleInner::KeepingTimelineGateOpen`] to
139 : //! [`HandleInner::ShutDown`], which drops the only long-lived strong ref to the
140 : //! `Arc<GateGuard>`.
141 : //!
142 : //! `PerTimelineState::shutdown` shuts down and drops all the `HandleInner`s it contains,
143 : //! thereby breaking the cycles.
144 : //! It also initiates draining of already existing `Handle`s by
145 : //! setting its handle map to `None` so that no new `HandleInner`s can be added
146 : //! to the `PerTimelineState`, which will make subsequent `Cache::get` calls fail.
147 : //!
148 : //! Concurrently existing / already upgraded `Handle`s will extend the
149 : //! lifetime of the `Arc<Mutex<HandleInner>>` and hence the cycles.
150 : //! However, since `Handle`s are short-lived and new `Handle`s are not
151 : //! handed out from `Cache::get` or `WeakHandle::upgrade` after
152 : //! `PerTimelineState::shutdown`, that extension of the cycle is bounded.
153 : //!
154 : //! Concurrently existing `WeakHandle`s will fail to `upgrade()`:
155 : //! while they will succeed in upgrading the `Weak<Mutex<HandleInner>>`,
156 : //! they will find the inner in `HandleInner::ShutDown` state, where the
157 : //! `Arc<GateGuard>` and Timeline have already been dropped.
158 : //!
159 : //! Dropping the `Cache` undoes the registration of this `Cache`'s
160 : //! `HandleInner`s in all the `PerTimelineState`s, i.e., it
161 : //! removes the strong ref to each of its `HandleInner`s
162 : //! from each `PerTimelineState`.
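//!
//! In terms of this module's API, the two ways of breaking the cycles look roughly like this
//! (a sketch; on the Timeline side, this is what happens as part of `Timeline::shutdown`):
//!
//! ```text
//! // Timeline side:
//! timeline.per_timeline_state().shutdown(); // HandleInner -> ShutDown: drops Arc<GateGuard> + Timeline strong refs
//! timeline.gate().close().await;            // completes once remaining short-lived Handles are dropped
//!
//! // Connection side:
//! drop(cache);                              // shuts down this Cache's HandleInners and removes their
//!                                           // strong refs from the respective PerTimelineState
//! ```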
163 : //!
164 : //! # Locking Rules
165 : //!
166 : //! To prevent deadlocks we:
167 : //!
168 : //! 1. Only ever hold one of the locks at a time.
169 : //! 2. Don't add more than one Drop impl that acquires the locks
170 : //!    that participate in the cycles above.
171 : //!
172 : //! As per (2), that impl is in `Drop for Cache`.
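//!
//! For reference, the locks in question are (types abbreviated, see the definitions below):
//!
//! ```text
//! PerTimelineState { handles: Mutex<Option<HashMap<CacheId, Arc<Mutex<HandleInner>>>>> }
//! HandleInner      // only ever shared as Arc<Mutex<HandleInner>>
//! ```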
173 : //!
174 : //! # Fast Path for Shard Routing
175 : //!
176 : //! The `Cache` has a fast path for shard routing to avoid calling into
177 : //! the tenant manager for every request.
178 : //!
179 : //! The `Cache` maintains a hash map of `ShardTimelineId` to `WeakHandle`s.
180 : //!
181 : //! The current implementation uses the first entry in the hash map
182 : //! to determine the `ShardIdentity` and derive the correct
183 : //! `ShardIndex` for the requested key.
184 : //!
185 : //! It then looks up the hash map for that `ShardTimelineId := {ShardIndex,TimelineId}`.
186 : //!
187 : //! If the lookup is successful and the `WeakHandle` can be upgraded,
188 : //! it's a hit.
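//!
//! Schematically (a condensed sketch of `Cache::shard_routing` further down in this file):
//!
//! ```text
//! let identity  = first_cached_handle.get_shard_identity();
//! let shard_idx = ShardIndex {
//!     shard_number: identity.get_shard_number(&key),
//!     shard_count:  identity.count,
//! };
//! let cache_key = ShardTimelineId { shard_index: shard_idx, timeline_id };
//! // fast path: cache_key matches the first handle's own ShardTimelineId
//! // slow path: look up cache_key in the map and try to upgrade the WeakHandle
//! ```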
189 : //!
190 : //! ## Cache invalidation
191 : //!
192 : //! The insight is that cache invalidation is sufficient and most efficient if done lazily.
193 : //! The only reasons why an entry in the cache can become stale are:
194 : //! 1. The `PerTimelineState` / Timeline is shutting down, e.g., because the shard is
195 : //!    being detached, the timeline or shard is being deleted, or the pageserver is shutting down.
196 : //! 2. We're doing a shard split and new traffic should be routed to the child shards.
197 : //!
198 : //! Regarding (1), we will eventually fail to upgrade the `WeakHandle` once the
199 : //! timeline has shut down, and when that happens, we remove the entry from the cache.
200 : //!
201 : //! Regarding (2), the insight is that it is totally fine to keep dispatching requests
202 : //! to the parent shard during a shard split. Eventually, the shard split task will
203 : //! shut down the parent => case (1).
204 :
205 : use std::collections::hash_map;
206 : use std::collections::HashMap;
207 : use std::sync::Arc;
208 : use std::sync::Mutex;
209 : use std::sync::Weak;
210 :
211 : use pageserver_api::shard::ShardIdentity;
212 : use tracing::instrument;
213 : use tracing::trace;
214 : use utils::id::TimelineId;
215 : use utils::shard::ShardIndex;
216 : use utils::shard::ShardNumber;
217 :
218 : use crate::tenant::mgr::ShardSelector;
219 :
220 : /// The requirement for Debug is so that #[derive(Debug)] works in some places.
221 : pub(crate) trait Types: Sized + std::fmt::Debug {
222 : type TenantManagerError: Sized + std::fmt::Debug;
223 : type TenantManager: TenantManager<Self> + Sized;
224 : type Timeline: ArcTimeline<Self> + Sized;
225 : }
226 :
227 : /// Uniquely identifies a [`Cache`] instance over the lifetime of the process.
228 : /// Required so [`Cache::drop`] can take out the handles from the [`PerTimelineState`].
229 : /// Alternative to this would be to allocate [`Cache`] in a `Box` and identify it by the pointer.
230 : #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
231 : struct CacheId(u64);
232 :
233 : impl CacheId {
234 64 : fn next() -> Self {
235 : static NEXT_ID: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
236 64 : let id = NEXT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
237 64 : if id == 0 {
238 0 : panic!("CacheId::next() returned 0, overflow");
239 64 : }
240 64 : Self(id)
241 64 : }
242 : }
243 :
244 : /// See module-level comment.
245 : pub(crate) struct Cache<T: Types> {
246 : id: CacheId,
247 : map: Map<T>,
248 : }
249 :
250 : type Map<T> = HashMap<ShardTimelineId, WeakHandle<T>>;
251 :
252 : impl<T: Types> Default for Cache<T> {
253 64 : fn default() -> Self {
254 64 : Self {
255 64 : id: CacheId::next(),
256 64 : map: Default::default(),
257 64 : }
258 64 : }
259 : }
260 :
261 : #[derive(PartialEq, Eq, Debug, Hash, Clone, Copy)]
262 : pub(crate) struct ShardTimelineId {
263 : pub(crate) shard_index: ShardIndex,
264 : pub(crate) timeline_id: TimelineId,
265 : }
266 :
267 : /// See module-level comment.
268 : pub(crate) struct Handle<T: Types> {
269 : timeline: Arc<T::Timeline>,
270 : #[allow(dead_code)] // the field exists to keep the gate open
271 : gate_guard: Arc<utils::sync::gate::GateGuard>,
272 : inner: Arc<Mutex<HandleInner<T>>>,
273 : }
274 : pub(crate) struct WeakHandle<T: Types> {
275 : inner: Weak<Mutex<HandleInner<T>>>,
276 : }
277 : enum HandleInner<T: Types> {
278 : KeepingTimelineGateOpen {
279 : #[allow(dead_code)]
280 : gate_guard: Arc<utils::sync::gate::GateGuard>,
281 : timeline: Arc<T::Timeline>,
282 : },
283 : ShutDown,
284 : }
285 :
286 : /// Embedded in each [`Types::Timeline`] as the anchor for the only long-lived strong ref to `HandleInner`.
287 : ///
288 : /// See module-level comment for details.
289 : pub struct PerTimelineState<T: Types> {
290 : // None = shutting down
291 : #[allow(clippy::type_complexity)]
292 : handles: Mutex<Option<HashMap<CacheId, Arc<Mutex<HandleInner<T>>>>>>,
293 : }
294 :
295 : impl<T: Types> Default for PerTimelineState<T> {
296 936 : fn default() -> Self {
297 936 : Self {
298 936 : handles: Mutex::new(Some(Default::default())),
299 936 : }
300 936 : }
301 : }
302 :
303 : /// Abstract view of [`crate::tenant::mgr`], for testability.
304 : pub(crate) trait TenantManager<T: Types> {
305 : /// Invoked by [`Cache::get`] to resolve a [`ShardTimelineId`] to a [`Types::Timeline`].
306 : /// Errors are returned as [`GetError::TenantManager`].
307 : async fn resolve(
308 : &self,
309 : timeline_id: TimelineId,
310 : shard_selector: ShardSelector,
311 : ) -> Result<T::Timeline, T::TenantManagerError>;
312 : }
313 :
314 : /// Abstract view of an [`Arc<Timeline>`], for testability.
315 : pub(crate) trait ArcTimeline<T: Types>: Clone {
316 : fn gate(&self) -> &utils::sync::gate::Gate;
317 : fn shard_timeline_id(&self) -> ShardTimelineId;
318 : fn get_shard_identity(&self) -> &ShardIdentity;
319 : fn per_timeline_state(&self) -> &PerTimelineState<T>;
320 : }
321 :
322 : /// Errors returned by [`Cache::get`].
323 : #[derive(Debug)]
324 : pub(crate) enum GetError<T: Types> {
325 : TenantManager(T::TenantManagerError),
326 : TimelineGateClosed,
327 : PerTimelineStateShutDown,
328 : }
329 :
330 : /// Internal type used in [`Cache::get`].
331 : enum RoutingResult<T: Types> {
332 : FastPath(Handle<T>),
333 : SlowPath(ShardTimelineId),
334 : NeedConsultTenantManager,
335 : }
336 :
337 : impl<T: Types> Cache<T> {
338 : /// See module-level comment for details.
339 : ///
340 : /// Does NOT check for the shutdown state of [`Types::Timeline`].
341 : /// Instead, the methods of [`Types::Timeline`] that are invoked through
342 : /// the [`Handle`] are responsible for checking these conditions
343 : /// and, if the timeline is shutting down, returning an error that causes the page service to
344 : /// close the connection.
345 : #[instrument(level = "trace", skip_all)]
346 : pub(crate) async fn get(
347 : &mut self,
348 : timeline_id: TimelineId,
349 : shard_selector: ShardSelector,
350 : tenant_manager: &T::TenantManager,
351 : ) -> Result<Handle<T>, GetError<T>> {
352 : // terminates because each iteration either exits the loop or removes an element from the map
353 : let miss: ShardSelector = loop {
354 : let routing_state = self.shard_routing(timeline_id, shard_selector);
355 : match routing_state {
356 : RoutingResult::FastPath(handle) => return Ok(handle),
357 : RoutingResult::SlowPath(key) => match self.map.get(&key) {
358 : Some(cached) => match cached.upgrade() {
359 : Ok(upgraded) => return Ok(upgraded),
360 : Err(HandleUpgradeError::ShutDown) => {
361 : // TODO: dedup with shard_routing()
362 : trace!("handle cache stale");
363 : self.map.remove(&key).unwrap();
364 : continue;
365 : }
366 : },
367 : None => break ShardSelector::Known(key.shard_index),
368 : },
369 : RoutingResult::NeedConsultTenantManager => break shard_selector,
370 : }
371 : };
372 : self.get_miss(timeline_id, miss, tenant_manager).await
373 : }
374 :
375 : #[inline(always)]
376 113 : fn shard_routing(
377 113 : &mut self,
378 113 : timeline_id: TimelineId,
379 113 : shard_selector: ShardSelector,
380 113 : ) -> RoutingResult<T> {
381 : loop {
382 : // terminates because each iteration either returns or removes an element from the map
383 124 : let Some((first_key, first_handle)) = self.map.iter().next() else {
384 76 : return RoutingResult::NeedConsultTenantManager;
385 : };
386 48 : let Ok(first_handle) = first_handle.upgrade() else {
387 : // TODO: dedup with get()
388 11 : trace!("handle cache stale");
389 11 : let first_key_owned = *first_key;
390 11 : self.map.remove(&first_key_owned).unwrap();
391 11 : continue;
392 : };
393 :
394 37 : let first_handle_shard_identity = first_handle.get_shard_identity();
395 37 : let make_shard_index = |shard_num: ShardNumber| ShardIndex {
396 37 : shard_number: shard_num,
397 37 : shard_count: first_handle_shard_identity.count,
398 37 : };
399 :
400 37 : let need_idx = match shard_selector {
401 37 : ShardSelector::Page(key) => {
402 37 : make_shard_index(first_handle_shard_identity.get_shard_number(&key))
403 : }
404 0 : ShardSelector::Zero => make_shard_index(ShardNumber(0)),
405 0 : ShardSelector::Known(shard_idx) => shard_idx,
406 : };
407 37 : let need_shard_timeline_id = ShardTimelineId {
408 37 : shard_index: need_idx,
409 37 : timeline_id,
410 37 : };
411 37 : let first_handle_shard_timeline_id = ShardTimelineId {
412 37 : shard_index: first_handle_shard_identity.shard_index(),
413 37 : timeline_id: first_handle.shard_timeline_id().timeline_id,
414 37 : };
415 37 :
416 37 : if need_shard_timeline_id == first_handle_shard_timeline_id {
417 24 : return RoutingResult::FastPath(first_handle);
418 : } else {
419 13 : return RoutingResult::SlowPath(need_shard_timeline_id);
420 : }
421 : }
422 113 : }
423 :
424 : #[instrument(level = "trace", skip_all)]
425 : #[inline(always)]
426 : async fn get_miss(
427 : &mut self,
428 : timeline_id: TimelineId,
429 : shard_selector: ShardSelector,
430 : tenant_manager: &T::TenantManager,
431 : ) -> Result<Handle<T>, GetError<T>> {
432 : match tenant_manager.resolve(timeline_id, shard_selector).await {
433 : Ok(timeline) => {
434 : let key = timeline.shard_timeline_id();
435 : match &shard_selector {
436 : ShardSelector::Zero => assert_eq!(key.shard_index.shard_number, ShardNumber(0)),
437 : ShardSelector::Page(_) => (), // gotta trust tenant_manager
438 : ShardSelector::Known(idx) => assert_eq!(idx, &key.shard_index),
439 : }
440 :
441 : trace!("creating new HandleInner");
442 : let handle_inner_arc = Arc::new(Mutex::new(HandleInner::KeepingTimelineGateOpen {
443 : gate_guard: Arc::new(
444 : // this enter() is expensive in production code because
445 : // it hits the global Arc<Timeline>::gate refcounts
446 : match timeline.gate().enter() {
447 : Ok(guard) => guard,
448 : Err(_) => {
449 : return Err(GetError::TimelineGateClosed);
450 : }
451 : },
452 : ),
453 : // this clone is expensive in production code because
454 : // it hits the global Arc<Timeline>::clone refcounts
455 : timeline: Arc::new(timeline.clone()),
456 : }));
457 : let handle_weak = WeakHandle {
458 : inner: Arc::downgrade(&handle_inner_arc),
459 : };
460 : let handle = handle_weak
461 : .upgrade()
462 : .ok()
463 : .expect("we just created it and it's not linked anywhere yet");
464 : {
465 : let mut lock_guard = timeline
466 : .per_timeline_state()
467 : .handles
468 : .lock()
469 : .expect("mutex poisoned");
470 : match &mut *lock_guard {
471 : Some(per_timeline_state) => {
472 : let replaced =
473 : per_timeline_state.insert(self.id, Arc::clone(&handle_inner_arc));
474 : assert!(replaced.is_none(), "some earlier code left a stale handle");
475 : match self.map.entry(key) {
476 : hash_map::Entry::Occupied(_o) => {
477 : // This cannot happen because
478 : // 1. we're the _miss_ handle, i.e., `self.map` didn't contain an entry and
479 : // 2. we were holding &mut self during .resolve().await above, so, no other thread can have inserted a handle
480 : // while we were waiting for the tenant manager.
481 : unreachable!()
482 : }
483 : hash_map::Entry::Vacant(v) => {
484 : v.insert(handle_weak);
485 : }
486 : }
487 : }
488 : None => {
489 : return Err(GetError::PerTimelineStateShutDown);
490 : }
491 : }
492 : }
493 : Ok(handle)
494 : }
495 : Err(e) => Err(GetError::TenantManager(e)),
496 : }
497 : }
498 : }
499 :
500 : pub(crate) enum HandleUpgradeError {
501 : ShutDown,
502 : }
503 :
504 : impl<T: Types> WeakHandle<T> {
505 141 : pub(crate) fn upgrade(&self) -> Result<Handle<T>, HandleUpgradeError> {
506 141 : let Some(inner) = Weak::upgrade(&self.inner) else {
507 8 : return Err(HandleUpgradeError::ShutDown);
508 : };
509 133 : let lock_guard = inner.lock().expect("poisoned");
510 133 : match &*lock_guard {
511 : HandleInner::KeepingTimelineGateOpen {
512 121 : timeline,
513 121 : gate_guard,
514 121 : } => {
515 121 : let gate_guard = Arc::clone(gate_guard);
516 121 : let timeline = Arc::clone(timeline);
517 121 : drop(lock_guard);
518 121 : Ok(Handle {
519 121 : timeline,
520 121 : gate_guard,
521 121 : inner,
522 121 : })
523 : }
524 12 : HandleInner::ShutDown => Err(HandleUpgradeError::ShutDown),
525 : }
526 141 : }
527 :
528 0 : pub(crate) fn is_same_handle_as(&self, other: &WeakHandle<T>) -> bool {
529 0 : Weak::ptr_eq(&self.inner, &other.inner)
530 0 : }
531 : }
532 :
533 : impl<T: Types> std::ops::Deref for Handle<T> {
534 : type Target = T::Timeline;
535 206 : fn deref(&self) -> &Self::Target {
536 206 : &self.timeline
537 206 : }
538 : }
539 :
540 : impl<T: Types> Handle<T> {
541 4 : pub(crate) fn downgrade(&self) -> WeakHandle<T> {
542 4 : WeakHandle {
543 4 : inner: Arc::downgrade(&self.inner),
544 4 : }
545 4 : }
546 : }
547 :
548 : impl<T: Types> PerTimelineState<T> {
549 : /// After this method returns, [`Cache::get`] will never again return a [`Handle`]
550 : /// to the [`Types::Timeline`] that embeds this per-timeline state.
551 : /// Even if [`TenantManager::resolve`] would still resolve to it.
552 : ///
553 : /// Already-alive [`Handle`]s will remain usable and keep the [`ArcTimeline`] alive.
554 : /// That's ok because they're short-lived. See module-level comment for details.
555 : #[instrument(level = "trace", skip_all)]
556 : pub(super) fn shutdown(&self) {
557 : let handles = self
558 : .handles
559 : .lock()
560 : .expect("mutex poisoned")
561 : // NB: this .take() sets locked to None.
562 : // That's what makes future `Cache::get` misses fail.
563 : // Cache hits are taken care of below.
564 : .take();
565 : let Some(handles) = handles else {
566 : trace!("already shut down");
567 : return;
568 : };
569 : for handle_inner_arc in handles.values() {
570 : // Make hits fail.
571 : let mut lock_guard = handle_inner_arc.lock().expect("poisoned");
572 : lock_guard.shutdown();
573 : }
574 : drop(handles);
575 : }
576 : }
577 :
578 : // When dropping a [`Cache`], prune its handles in the [`PerTimelineState`] to break the reference cycle.
579 : impl<T: Types> Drop for Cache<T> {
580 64 : fn drop(&mut self) {
581 : for (
582 : _,
583 : WeakHandle {
584 64 : inner: handle_inner_weak,
585 : },
586 64 : ) in self.map.drain()
587 : {
588 64 : let Some(handle_inner_arc) = handle_inner_weak.upgrade() else {
589 8 : continue;
590 : };
591 56 : let Some(handle_timeline) = handle_inner_arc
592 56 : // locking rules: drop lock before acquiring other lock below
593 56 : .lock()
594 56 : .expect("poisoned")
595 56 : .shutdown()
596 : else {
597 : // Concurrent PerTimelineState::shutdown.
598 0 : continue;
599 : };
600 : // Clean up per_timeline_state so the HandleInner allocation can be dropped.
601 56 : let per_timeline_state = handle_timeline.per_timeline_state();
602 56 : let mut handles_lock_guard = per_timeline_state.handles.lock().expect("mutex poisoned");
603 56 : let Some(handles) = &mut *handles_lock_guard else {
604 0 : continue;
605 : };
606 56 : let Some(removed_handle_inner_arc) = handles.remove(&self.id) else {
607 : // Concurrent PerTimelineState::shutdown.
608 0 : continue;
609 : };
610 56 : drop(handles_lock_guard); // locking rules!
611 56 : assert!(Arc::ptr_eq(&removed_handle_inner_arc, &handle_inner_arc));
612 : }
613 64 : }
614 : }
615 :
616 : impl<T: Types> HandleInner<T> {
617 76 : fn shutdown(&mut self) -> Option<Arc<T::Timeline>> {
618 76 : match std::mem::replace(self, HandleInner::ShutDown) {
619 76 : HandleInner::KeepingTimelineGateOpen { timeline, .. } => Some(timeline),
620 : HandleInner::ShutDown => {
621 : // Duplicate shutdowns are possible because both Cache::drop and PerTimelineState::shutdown
622 : // may do it concurrently, but locking rules disallow holding per-timeline-state lock and
623 : // the handle lock at the same time.
624 0 : None
625 : }
626 : }
627 76 : }
628 : }
629 :
630 : #[cfg(test)]
631 : mod tests {
632 : use std::sync::Weak;
633 :
634 : use pageserver_api::{
635 : key::{rel_block_to_key, Key, DBDIR_KEY},
636 : models::ShardParameters,
637 : reltag::RelTag,
638 : shard::ShardStripeSize,
639 : };
640 : use utils::shard::ShardCount;
641 :
642 : use super::*;
643 :
644 : const FOREVER: std::time::Duration = std::time::Duration::from_secs(u64::MAX);
645 :
646 : #[derive(Debug)]
647 : struct TestTypes;
648 : impl Types for TestTypes {
649 : type TenantManagerError = anyhow::Error;
650 : type TenantManager = StubManager;
651 : type Timeline = Arc<StubTimeline>;
652 : }
653 :
654 : struct StubManager {
655 : shards: Vec<Arc<StubTimeline>>,
656 : }
657 :
658 : struct StubTimeline {
659 : gate: utils::sync::gate::Gate,
660 : id: TimelineId,
661 : shard: ShardIdentity,
662 : per_timeline_state: PerTimelineState<TestTypes>,
663 : myself: Weak<StubTimeline>,
664 : }
665 :
666 : impl StubTimeline {
667 44 : fn getpage(&self) {
668 44 : // do nothing
669 44 : }
670 : }
671 :
672 : impl ArcTimeline<TestTypes> for Arc<StubTimeline> {
673 84 : fn gate(&self) -> &utils::sync::gate::Gate {
674 84 : &self.gate
675 84 : }
676 :
677 121 : fn shard_timeline_id(&self) -> ShardTimelineId {
678 121 : ShardTimelineId {
679 121 : shard_index: self.shard.shard_index(),
680 121 : timeline_id: self.id,
681 121 : }
682 121 : }
683 :
684 37 : fn get_shard_identity(&self) -> &ShardIdentity {
685 37 : &self.shard
686 37 : }
687 :
688 136 : fn per_timeline_state(&self) -> &PerTimelineState<TestTypes> {
689 136 : &self.per_timeline_state
690 136 : }
691 : }
692 :
693 : impl TenantManager<TestTypes> for StubManager {
694 92 : async fn resolve(
695 92 : &self,
696 92 : timeline_id: TimelineId,
697 92 : shard_selector: ShardSelector,
698 92 : ) -> anyhow::Result<Arc<StubTimeline>> {
699 108 : for timeline in &self.shards {
700 100 : if timeline.id == timeline_id {
701 0 : match &shard_selector {
702 0 : ShardSelector::Zero if timeline.shard.is_shard_zero() => {
703 0 : return Ok(Arc::clone(timeline));
704 : }
705 0 : ShardSelector::Zero => continue,
706 76 : ShardSelector::Page(key) if timeline.shard.is_key_local(key) => {
707 76 : return Ok(Arc::clone(timeline));
708 : }
709 0 : ShardSelector::Page(_) => continue,
710 12 : ShardSelector::Known(idx) if idx == &timeline.shard.shard_index() => {
711 8 : return Ok(Arc::clone(timeline));
712 : }
713 4 : ShardSelector::Known(_) => continue,
714 : }
715 12 : }
716 : }
717 8 : anyhow::bail!("not found")
718 92 : }
719 : }
720 :
721 : #[tokio::test(start_paused = true)]
722 4 : async fn test_timeline_shutdown() {
723 4 : crate::tenant::harness::setup_logging();
724 4 :
725 4 : let timeline_id = TimelineId::generate();
726 4 : let shard0 = Arc::new_cyclic(|myself| StubTimeline {
727 4 : gate: Default::default(),
728 4 : id: timeline_id,
729 4 : shard: ShardIdentity::unsharded(),
730 4 : per_timeline_state: PerTimelineState::default(),
731 4 : myself: myself.clone(),
732 4 : });
733 4 : let mgr = StubManager {
734 4 : shards: vec![shard0.clone()],
735 4 : };
736 4 : let key = DBDIR_KEY;
737 4 :
738 4 : let mut cache = Cache::<TestTypes>::default();
739 4 :
740 4 : //
741 4 : // fill the cache
742 4 : //
743 4 : let handle: Handle<_> = cache
744 4 : .get(timeline_id, ShardSelector::Page(key), &mgr)
745 4 : .await
746 4 : .expect("we have the timeline");
747 4 : assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
748 4 : assert_eq!(cache.map.len(), 1);
749 4 : drop(handle);
750 4 :
751 4 : //
752 4 : // demonstrate that Handle holds up gate closure
753 4 : // but shutdown prevents new handles from being handed out
754 4 : //
755 4 :
756 4 : tokio::select! {
757 4 : _ = shard0.gate.close() => {
758 4 : panic!("cache and per-timeline handler state keep gate open");
759 4 : }
760 4 : _ = tokio::time::sleep(FOREVER) => {
761 4 : // NB: first poll of close() makes it enter closing state
762 4 : }
763 4 : }
764 4 :
765 4 : let handle = cache
766 4 : .get(timeline_id, ShardSelector::Page(key), &mgr)
767 4 : .await
768 4 : .expect("we have the timeline");
769 4 : assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
770 4 :
771 4 : // SHUTDOWN
772 4 : shard0.per_timeline_state.shutdown(); // keeping handle alive across shutdown
773 4 :
774 4 : assert_eq!(
775 4 : cache.map.len(),
776 4 : 1,
777 4 : "this is an implementation detail but worth pointing out: we can't clear the cache from shutdown(), it's cleared on first access after"
778 4 : );
779 4 :
780 4 : // this handle is perfectly usable
781 4 : handle.getpage();
782 4 :
783 4 : cache
784 4 : .get(timeline_id, ShardSelector::Page(key), &mgr)
785 4 : .await
786 4 : .err()
787 4 : .expect("documented behavior: can't get new handle after shutdown, even if there is an alive Handle");
788 4 : assert_eq!(
789 4 : cache.map.len(),
790 4 : 0,
791 4 : "first access after shutdown cleans up the Weak's from the cache"
792 4 : );
793 4 :
794 4 : tokio::select! {
795 4 : _ = shard0.gate.close() => {
796 4 : panic!("handle is keeping gate open");
797 4 : }
798 4 : _ = tokio::time::sleep(FOREVER) => { }
799 4 : }
800 4 :
801 4 : drop(handle);
802 4 :
803 4 : // closing gate succeeds after dropping handle
804 4 : tokio::select! {
805 4 : _ = shard0.gate.close() => { }
806 4 : _ = tokio::time::sleep(FOREVER) => {
807 4 : panic!("handle is dropped, no other gate holders exist")
808 4 : }
809 4 : }
810 4 :
811 4 : // map gets cleaned on next lookup
812 4 : cache
813 4 : .get(timeline_id, ShardSelector::Page(key), &mgr)
814 4 : .await
815 4 : .err()
816 4 : .expect("documented behavior: can't get new handle after shutdown");
817 4 : assert_eq!(cache.map.len(), 0);
818 4 :
819 4 : // ensure all refs to shard0 are gone and we're not leaking anything
820 4 : drop(shard0);
821 4 : drop(mgr);
822 4 : }
823 :
824 : #[tokio::test]
825 4 : async fn test_multiple_timelines_and_deletion() {
826 4 : crate::tenant::harness::setup_logging();
827 4 :
828 4 : let timeline_a = TimelineId::generate();
829 4 : let timeline_b = TimelineId::generate();
830 4 : assert_ne!(timeline_a, timeline_b);
831 4 : let timeline_a = Arc::new_cyclic(|myself| StubTimeline {
832 4 : gate: Default::default(),
833 4 : id: timeline_a,
834 4 : shard: ShardIdentity::unsharded(),
835 4 : per_timeline_state: PerTimelineState::default(),
836 4 : myself: myself.clone(),
837 4 : });
838 4 : let timeline_b = Arc::new_cyclic(|myself| StubTimeline {
839 4 : gate: Default::default(),
840 4 : id: timeline_b,
841 4 : shard: ShardIdentity::unsharded(),
842 4 : per_timeline_state: PerTimelineState::default(),
843 4 : myself: myself.clone(),
844 4 : });
845 4 : let mut mgr = StubManager {
846 4 : shards: vec![timeline_a.clone(), timeline_b.clone()],
847 4 : };
848 4 : let key = DBDIR_KEY;
849 4 :
850 4 : let mut cache = Cache::<TestTypes>::default();
851 4 :
852 4 : cache
853 4 : .get(timeline_a.id, ShardSelector::Page(key), &mgr)
854 4 : .await
855 4 : .expect("we have it");
856 4 : cache
857 4 : .get(timeline_b.id, ShardSelector::Page(key), &mgr)
858 4 : .await
859 4 : .expect("we have it");
860 4 : assert_eq!(cache.map.len(), 2);
861 4 :
862 4 : // delete timeline A
863 4 : timeline_a.per_timeline_state.shutdown();
864 8 : mgr.shards.retain(|t| t.id != timeline_a.id);
865 4 : assert!(
866 4 : mgr.resolve(timeline_a.id, ShardSelector::Page(key))
867 4 : .await
868 4 : .is_err(),
869 4 : "broken StubManager implementation"
870 4 : );
871 4 :
872 4 : assert_eq!(
873 4 : cache.map.len(),
874 4 : 2,
875 4 : "cache still has a Weak handle to Timeline A"
876 4 : );
877 4 : cache
878 4 : .get(timeline_a.id, ShardSelector::Page(key), &mgr)
879 4 : .await
880 4 : .err()
881 4 : .expect("documented behavior: can't get new handle after shutdown");
882 4 : assert_eq!(cache.map.len(), 1, "next access cleans up the cache");
883 4 :
884 4 : cache
885 4 : .get(timeline_b.id, ShardSelector::Page(key), &mgr)
886 4 : .await
887 4 : .expect("we still have it");
888 4 : }
889 :
890 28 : fn make_relation_key_for_shard(shard: ShardNumber, params: &ShardParameters) -> Key {
891 28 : rel_block_to_key(
892 28 : RelTag {
893 28 : spcnode: 1663,
894 28 : dbnode: 208101,
895 28 : relnode: 2620,
896 28 : forknum: 0,
897 28 : },
898 28 : shard.0 as u32 * params.stripe_size.0,
899 28 : )
900 28 : }
901 :
902 : #[tokio::test(start_paused = true)]
903 4 : async fn test_shard_split() {
904 4 : crate::tenant::harness::setup_logging();
905 4 : let timeline_id = TimelineId::generate();
906 4 : let parent = Arc::new_cyclic(|myself| StubTimeline {
907 4 : gate: Default::default(),
908 4 : id: timeline_id,
909 4 : shard: ShardIdentity::unsharded(),
910 4 : per_timeline_state: PerTimelineState::default(),
911 4 : myself: myself.clone(),
912 4 : });
913 4 : let child_params = ShardParameters {
914 4 : count: ShardCount(2),
915 4 : stripe_size: ShardStripeSize::default(),
916 4 : };
917 4 : let child0 = Arc::new_cyclic(|myself| StubTimeline {
918 4 : gate: Default::default(),
919 4 : id: timeline_id,
920 4 : shard: ShardIdentity::from_params(ShardNumber(0), &child_params),
921 4 : per_timeline_state: PerTimelineState::default(),
922 4 : myself: myself.clone(),
923 4 : });
924 4 : let child1 = Arc::new_cyclic(|myself| StubTimeline {
925 4 : gate: Default::default(),
926 4 : id: timeline_id,
927 4 : shard: ShardIdentity::from_params(ShardNumber(1), &child_params),
928 4 : per_timeline_state: PerTimelineState::default(),
929 4 : myself: myself.clone(),
930 4 : });
931 4 : let child_shards_by_shard_number = [child0.clone(), child1.clone()];
932 4 :
933 4 : let mut cache = Cache::<TestTypes>::default();
934 4 :
935 4 : // fill the cache with the parent
936 12 : for i in 0..2 {
937 8 : let handle = cache
938 8 : .get(
939 8 : timeline_id,
940 8 : ShardSelector::Page(make_relation_key_for_shard(ShardNumber(i), &child_params)),
941 8 : &StubManager {
942 8 : shards: vec![parent.clone()],
943 8 : },
944 8 : )
945 8 : .await
946 8 : .expect("we have it");
947 8 : assert!(
948 8 : Weak::ptr_eq(&handle.myself, &parent.myself),
949 4 : "mgr returns parent first"
950 4 : );
951 8 : drop(handle);
952 4 : }
953 4 :
954 4 : //
955 4 : // SHARD SPLIT: tenant manager changes, but the cache isn't informed
956 4 : //
957 4 :
958 4 : // while we haven't shut down the parent, the cache will return the cached parent, even
959 4 : // if the tenant manager returns the child
960 12 : for i in 0..2 {
961 8 : let handle = cache
962 8 : .get(
963 8 : timeline_id,
964 8 : ShardSelector::Page(make_relation_key_for_shard(ShardNumber(i), &child_params)),
965 8 : &StubManager {
966 8 : shards: vec![], // doesn't matter what's in here, the cache is fully loaded
967 8 : },
968 8 : )
969 8 : .await
970 8 : .expect("we have it");
971 8 : assert!(
972 8 : Weak::ptr_eq(&handle.myself, &parent.myself),
973 4 : "mgr returns parent"
974 4 : );
975 8 : drop(handle);
976 4 : }
977 4 :
978 4 : let parent_handle = cache
979 4 : .get(
980 4 : timeline_id,
981 4 : ShardSelector::Page(make_relation_key_for_shard(ShardNumber(0), &child_params)),
982 4 : &StubManager {
983 4 : shards: vec![parent.clone()],
984 4 : },
985 4 : )
986 4 : .await
987 4 : .expect("we have it");
988 4 : assert!(Weak::ptr_eq(&parent_handle.myself, &parent.myself));
989 4 :
990 4 : // invalidate the cache
991 4 : parent.per_timeline_state.shutdown();
992 4 :
993 4 : // the cache will now return the child, even though the parent handle still exists
994 12 : for i in 0..2 {
995 8 : let handle = cache
996 8 : .get(
997 8 : timeline_id,
998 8 : ShardSelector::Page(make_relation_key_for_shard(ShardNumber(i), &child_params)),
999 8 : &StubManager {
1000 8 : shards: vec![child0.clone(), child1.clone()], // <====== this changed compared to previous loop
1001 8 : },
1002 8 : )
1003 8 : .await
1004 8 : .expect("we have it");
1005 8 : assert!(
1006 8 : Weak::ptr_eq(
1007 8 : &handle.myself,
1008 8 : &child_shards_by_shard_number[i as usize].myself
1009 8 : ),
1010 4 : "mgr returns child"
1011 4 : );
1012 8 : drop(handle);
1013 4 : }
1014 4 :
1015 4 : // all the while the parent handle kept the parent gate open
1016 4 : tokio::select! {
1017 4 : _ = parent_handle.gate.close() => {
1018 4 : panic!("parent handle is keeping gate open");
1019 4 : }
1020 4 : _ = tokio::time::sleep(FOREVER) => { }
1021 4 : }
1022 4 : drop(parent_handle);
1023 4 : tokio::select! {
1024 4 : _ = parent.gate.close() => { }
1025 4 : _ = tokio::time::sleep(FOREVER) => {
1026 4 : panic!("parent handle is dropped, no other gate holders exist")
1027 4 : }
1028 4 : }
1029 4 : }
1030 :
1031 : #[tokio::test(start_paused = true)]
1032 4 : async fn test_connection_handler_exit() {
1033 4 : crate::tenant::harness::setup_logging();
1034 4 : let timeline_id = TimelineId::generate();
1035 4 : let shard0 = Arc::new_cyclic(|myself| StubTimeline {
1036 4 : gate: Default::default(),
1037 4 : id: timeline_id,
1038 4 : shard: ShardIdentity::unsharded(),
1039 4 : per_timeline_state: PerTimelineState::default(),
1040 4 : myself: myself.clone(),
1041 4 : });
1042 4 : let mgr = StubManager {
1043 4 : shards: vec![shard0.clone()],
1044 4 : };
1045 4 : let key = DBDIR_KEY;
1046 4 :
1047 4 : // Simulate 10 connections that are opened, used, and closed
1048 4 : let mut used_handles = vec![];
1049 44 : for _ in 0..10 {
1050 40 : let mut cache = Cache::<TestTypes>::default();
1051 40 : let handle = {
1052 40 : let handle = cache
1053 40 : .get(timeline_id, ShardSelector::Page(key), &mgr)
1054 40 : .await
1055 40 : .expect("we have the timeline");
1056 40 : assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
1057 40 : handle
1058 40 : };
1059 40 : handle.getpage();
1060 40 : used_handles.push(Arc::downgrade(&handle.timeline));
1061 4 : }
1062 4 :
1063 4 : // No handles or caches exist anymore, so nothing is holding the gate open.
1064 4 : // Thus the gate should close immediately, even without a prior shutdown().
1065 4 : tokio::select! {
1066 4 : _ = shard0.gate.close() => { }
1067 4 : _ = tokio::time::sleep(FOREVER) => {
1068 4 : panic!("handle is dropped, no other gate holders exist")
1069 4 : }
1070 4 : }
1071 4 : }
1072 :
1073 : #[tokio::test(start_paused = true)]
1074 4 : async fn test_weak_handles() {
1075 4 : crate::tenant::harness::setup_logging();
1076 4 : let timeline_id = TimelineId::generate();
1077 4 : let shard0 = Arc::new_cyclic(|myself| StubTimeline {
1078 4 : gate: Default::default(),
1079 4 : id: timeline_id,
1080 4 : shard: ShardIdentity::unsharded(),
1081 4 : per_timeline_state: PerTimelineState::default(),
1082 4 : myself: myself.clone(),
1083 4 : });
1084 4 : let mgr = StubManager {
1085 4 : shards: vec![shard0.clone()],
1086 4 : };
1087 4 :
1088 4 : let refcount_start = Arc::strong_count(&shard0);
1089 4 :
1090 4 : let key = DBDIR_KEY;
1091 4 :
1092 4 : let mut cache = Cache::<TestTypes>::default();
1093 4 :
1094 4 : let handle = cache
1095 4 : .get(timeline_id, ShardSelector::Page(key), &mgr)
1096 4 : .await
1097 4 : .expect("we have the timeline");
1098 4 : assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
1099 4 :
1100 4 : let weak_handle = handle.downgrade();
1101 4 :
1102 4 : drop(handle);
1103 4 :
1104 4 : let upgraded_handle = weak_handle.upgrade().ok().expect("we can upgrade it");
1105 4 :
1106 4 : // Start shutdown
1107 4 : shard0.per_timeline_state.shutdown();
1108 4 :
1109 4 : // Upgrades during shutdown don't work, even if upgraded_handle exists.
1110 4 : weak_handle
1111 4 : .upgrade()
1112 4 : .err()
1113 4 : .expect("can't upgrade weak handle as soon as shutdown started");
1114 4 :
1115 4 : // But upgraded_handle is still alive, so the gate won't close.
1116 4 : tokio::select! {
1117 4 : _ = shard0.gate.close() => {
1118 4 : panic!("handle is keeping gate open");
1119 4 : }
1120 4 : _ = tokio::time::sleep(FOREVER) => { }
1121 4 : }
1122 4 :
1123 4 : // Drop the last handle.
1124 4 : drop(upgraded_handle);
1125 4 :
1126 4 : // The gate should close now, despite there still being a weak_handle.
1127 4 : tokio::select! {
1128 4 : _ = shard0.gate.close() => { }
1129 4 : _ = tokio::time::sleep(FOREVER) => {
1130 4 : panic!("only strong handle is dropped and we shut down per-timeline-state")
1131 4 : }
1132 4 : }
1133 4 :
1134 4 : // The weak handle still can't be upgraded.
1135 4 : weak_handle
1136 4 : .upgrade()
1137 4 : .err()
1138 4 : .expect("still shouldn't be able to upgrade the weak handle");
1139 4 :
1140 4 : // There should be no strong references to the timeline object except the one on "stack".
1141 4 : assert_eq!(Arc::strong_count(&shard0), refcount_start);
1142 4 : }
1143 :
1144 : #[tokio::test(start_paused = true)]
1145 4 : async fn test_reference_cycle_broken_when_cache_is_dropped() {
1146 4 : crate::tenant::harness::setup_logging();
1147 4 : let timeline_id = TimelineId::generate();
1148 4 : let shard0 = Arc::new_cyclic(|myself| StubTimeline {
1149 4 : gate: Default::default(),
1150 4 : id: timeline_id,
1151 4 : shard: ShardIdentity::unsharded(),
1152 4 : per_timeline_state: PerTimelineState::default(),
1153 4 : myself: myself.clone(),
1154 4 : });
1155 4 : let mgr = StubManager {
1156 4 : shards: vec![shard0.clone()],
1157 4 : };
1158 4 : let key = DBDIR_KEY;
1159 4 :
1160 4 : let mut cache = Cache::<TestTypes>::default();
1161 4 :
1162 4 : // helper to check if a handle is referenced by per_timeline_state
1163 8 : let per_timeline_state_refs_handle = |handle_weak: &Weak<Mutex<HandleInner<_>>>| {
1164 8 : let per_timeline_state = shard0.per_timeline_state.handles.lock().unwrap();
1165 8 : let per_timeline_state = per_timeline_state.as_ref().unwrap();
1166 8 : per_timeline_state
1167 8 : .values()
1168 8 : .any(|v| Weak::ptr_eq(&Arc::downgrade(v), handle_weak))
1169 8 : };
1170 4 :
1171 4 : // Fill the cache.
1172 4 : let handle = cache
1173 4 : .get(timeline_id, ShardSelector::Page(key), &mgr)
1174 4 : .await
1175 4 : .expect("we have the timeline");
1176 4 : assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
1177 4 : let handle_inner_weak = Arc::downgrade(&handle.inner);
1178 4 : assert!(
1179 4 : per_timeline_state_refs_handle(&handle_inner_weak),
1180 4 : "we still hold `handle` _and_ haven't dropped `cache` yet"
1181 4 : );
1182 4 :
1183 4 : // Drop the cache.
1184 4 : drop(cache);
1185 4 :
1186 4 : assert!(
1187 4 : !(per_timeline_state_refs_handle(&handle_inner_weak)),
1188 4 : "nothing should reference the handle allocation anymore"
1189 4 : );
1190 4 : assert!(
1191 4 : Weak::upgrade(&handle_inner_weak).is_some(),
1192 4 : "the local `handle` still keeps the allocation alive"
1193 4 : );
1194 4 : // but obviously the cache is gone so no new allocations can be handed out.
1195 4 :
1196 4 : // Drop handle.
1197 4 : drop(handle);
1198 4 : assert!(
1199 4 : Weak::upgrade(&handle_inner_weak).is_none(),
1200 4 : "the local `handle` is dropped, so the allocation should be dropped by now"
1201 4 : );
1202 4 : }
1203 :
1204 : #[tokio::test(start_paused = true)]
1205 4 : async fn test_reference_cycle_broken_when_per_timeline_state_shutdown() {
1206 4 : crate::tenant::harness::setup_logging();
1207 4 : let timeline_id = TimelineId::generate();
1208 4 : let shard0 = Arc::new_cyclic(|myself| StubTimeline {
1209 4 : gate: Default::default(),
1210 4 : id: timeline_id,
1211 4 : shard: ShardIdentity::unsharded(),
1212 4 : per_timeline_state: PerTimelineState::default(),
1213 4 : myself: myself.clone(),
1214 4 : });
1215 4 : let mgr = StubManager {
1216 4 : shards: vec![shard0.clone()],
1217 4 : };
1218 4 : let key = DBDIR_KEY;
1219 4 :
1220 4 : let mut cache = Cache::<TestTypes>::default();
1221 4 : let handle = cache
1222 4 : .get(timeline_id, ShardSelector::Page(key), &mgr)
1223 4 : .await
1224 4 : .expect("we have the timeline");
1225 4 : // grab a weak reference to the inner so can later try to Weak::upgrade it and assert that fails
1226 4 : // grab a weak reference to the inner so we can later try to Weak::upgrade it and assert that it fails
1227 4 :
1228 4 : // drop the handle, obviously the lifetime of `inner` is at least as long as each strong reference to it
1229 4 : drop(handle);
1230 4 : assert!(Weak::upgrade(&handle_inner_weak).is_some(), "can still");
1231 4 :
1232 4 : // Shutdown the per_timeline_state.
1233 4 : shard0.per_timeline_state.shutdown();
1234 4 : assert!(Weak::upgrade(&handle_inner_weak).is_none(), "can no longer");
1235 4 :
1236 4 : // The cache only contains Weak refs, so it can outlive the per_timeline_state without issue.
1237 4 : // Drop it explicitly solely to make this point.
1238 4 : drop(cache);
1239 4 : }
1240 : }
|