Line data Source code
1 : //! A cache for [`crate::tenant::mgr`]+`Tenant::get_timeline`+`Timeline::gate.enter()`.
2 : //!
3 : //! # Motivation
4 : //!
5 : //! On a single page service connection, we're typically serving a single TenantTimelineId.
6 : //!
7 : //! Without sharding, there is a single Timeline object to which we dispatch
8 : //! all requests. For example, a getpage request gets dispatched to the
9 : //! Timeline::get method of the Timeline object that represents the
10 : //! (tenant,timeline) of that connection.
11 : //!
12 : //! With sharding, for each request that comes in on the connection,
13 : //! we first have to perform shard routing based on the requested key (=~ page number).
14 : //! The result of shard routing is a Timeline object.
15 : //! We then dispatch the request to that Timeline object.
16 : //!
17 : //! Regardless of whether the tenant is sharded or not, we want to ensure that
18 : //! we hold the Timeline gate open while we're invoking the method on the
19 : //! Timeline object.
20 : //!
21 : //! We want to avoid the overhead of doing, for each incoming request,
22 : //! - tenant manager lookup (global rwlock + btreemap lookup for shard routing)
23 : //! - cloning the `Arc<Timeline>` out of the tenant manager so we can
24 : //! release the mgr rwlock before doing any request processing work
25 : //! - re-entering the Timeline gate for each Timeline method invocation.
26 : //!
27 : //! Regardless of how we accomplish the above, it should not
28 : //! prevent the Timeline from shutting down promptly.
29 : //!
30 : //!
31 : //! # Design
32 : //!
33 : //! ## Data Structures
34 : //!
35 : //! There are two concepts expressed as associated types in the `Types` trait:
36 : //! - `TenantManager`: the thing that performs the expensive work. It produces
37 : //! a `Timeline` object, which is the other associated type.
38 : //! - `Timeline`: the item that we cache for fast (TenantTimelineId,ShardSelector) lookup.
39 : //!
40 : //! There are four user-facing data structures exposed by this module:
41 : //! - `PerTimelineState`: a struct embedded into each Timeline struct. Lifetime == Timeline lifetime.
42 : //! - `Cache`: a struct private to each connection handler; Lifetime == connection lifetime.
43 : //! - `Handle`: a smart pointer that derefs to the Types::Timeline.
44 : //! - `WeakHandle`: downgrade of a `Handle` that does not keep the gate open, but allows
45 : //! trying to upgrade back to a `Handle`. If successful, a re-upgraded Handle will always
46 : //! point to the same cached `Types::Timeline`. Upgrades never invoke the `TenantManager`.
47 : //!
48 : //! Internally, there is 0 or 1 `HandleInner` per `(Cache,Timeline)`.
49 : //! Since Cache:Connection is 1:1, there is 0 or 1 `HandleInner` per `(Connection,Timeline)`.
50 : //!
51 : //! The `HandleInner` is allocated as a `Arc<Mutex<HandleInner>>` and
52 : //! referenced weakly and strongly from various places which we are now illustrating.
53 : //! For brevity, we will omit the `Arc<Mutex<>>` part in the following and instead
54 : //! use `strong ref` and `weak ref` when referring to the `Arc<Mutex<HandleInner>>`
55 : //! or `Weak<Mutex<HandleInner>>`, respectively.
56 : //!
57 : //! - The `Handle` is a strong ref.
58 : //! - The `WeakHandle` is a weak ref.
59 : //! - The `PerTimelineState` contains a `HashMap<CacheId, strong ref>`.
60 : //! - The `Cache` is a `HashMap<unique identifier for the shard, weak ref>`.
61 : //!
62 : //! Lifetimes:
63 : //! - `WeakHandle` and `Handle`: single pagestream request.
64 : //! - `Cache`: single page service connection.
65 : //! - `PerTimelineState`: lifetime of the Timeline object (i.e., until `Timeline::shutdown`).
66 : //!
67 : //! ## Request Handling Flow (= filling and using the `Cache`)
68 : //!
69 : //! To dispatch a request, the page service connection calls `Cache::get`.
70 : //!
71 : //! A cache miss means we call Types::TenantManager::resolve for shard routing,
72 : //! cloning the `Arc<Timeline>` out of it, and entering the gate. The result of
73 : //! resolve() is the object we want to cache and hand out `Handle`s to for subsequent `Cache::get` calls.
74 : //!
75 : //! We wrap the object returned from resolve() in an `Arc` and store that inside the
76 : //! `Arc<Mutex<HandleInner>>`. A weak ref to the HandleInner is stored in the `Cache`
77 : //! and a strong ref in the `PerTimelineState`.
78 : //! Another strong ref is returned wrapped in a `Handle`.
79 : //!
80 : //! For subsequent requests, `Cache::get` will perform a "fast path" shard routing
81 : //! and find the weak ref in the cache.
82 : //! We upgrade the weak ref to a strong ref and return it wrapped in a `Handle`.
83 : //!
84 : //! The pagestream processing is pipelined and involves a batching step.
85 : //! While a request is batching, the `Handle` is downgraded to a `WeakHandle`.
86 : //! When the batch is ready to be executed, the `WeakHandle` is upgraded back to a `Handle`
87 : //! and the request handler dispatches the request to the right `<Handle as Deref<Target = Timeline>>::$request_method`.
88 : //! It then drops the `Handle`, and thus the `Arc<Mutex<HandleInner>>` inside it.
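//!
//! To make this flow concrete, here is a minimal, self-contained sketch of the miss/hit
//! cycle using only the standard library. `FakeTimeline`, `Inner`, and `resolve` are
//! hypothetical stand-ins for `Types::Timeline`, `HandleInner`, and `TenantManager::resolve`;
//! the real types in this module are generic and carry more state.
//!
//! ```rust
//! use std::collections::HashMap;
//! use std::sync::{Arc, Mutex, Weak};
//!
//! struct FakeTimeline { name: &'static str }       // stand-in for Types::Timeline
//! enum Inner { Open(Arc<FakeTimeline>), ShutDown } // stand-in for HandleInner
//!
//! // Per-connection cache: weak refs only, so it never keeps a timeline alive by itself.
//! let mut cache: HashMap<u32, Weak<Mutex<Inner>>> = HashMap::new();
//!
//! // Cache miss: "resolve" the timeline (the expensive tenant manager step in the real code),
//! // wrap it in the connection-private Arc<Mutex<Inner>>, and remember a weak ref.
//! let resolve = || Arc::new(FakeTimeline { name: "shard-0" });
//! let timeline = resolve();
//! let inner = Arc::new(Mutex::new(Inner::Open(Arc::clone(&timeline))));
//! cache.insert(0, Arc::downgrade(&inner));
//! // In the real module, PerTimelineState would additionally store a strong ref to `inner`.
//!
//! // Cache hit: upgrade the weak ref, lock, clone the inner Arc, drop the lock, dispatch.
//! let handle = cache.get(&0).and_then(Weak::upgrade).expect("still open");
//! let open = match &*handle.lock().unwrap() {
//!     Inner::Open(tl) => Arc::clone(tl),
//!     Inner::ShutDown => panic!("stale entry: would be removed lazily"),
//! };
//! assert_eq!(open.name, "shard-0");
//! ```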
89 : //!
90 : //! # Performance
91 : //!
92 : //! Remember from the introductory section:
93 : //!
94 : //! > We want to avoid the overhead of doing, for each incoming request,
95 : //! > - tenant manager lookup (global rwlock + btreemap lookup for shard routing)
96 : //! > - cloning the `Arc<Timeline>` out of the tenant manager so we can
97 : //! > release the mgr rwlock before doing any request processing work
98 : //! > - re-entering the Timeline gate for each Timeline method invocation.
99 : //!
100 : //! All of these boil down to some state that is either globally shared among all shards
101 : //! or state shared among all tasks that serve a particular timeline.
102 : //! It is either protected by RwLock or manipulated via atomics.
103 : //! Even atomics are costly when shared across multiple cores.
104 : //! So, we want to avoid any permanent need for coordination between page_service tasks.
105 : //!
106 : //! The solution is to add indirection: we wrap the Types::Timeline object that is
107 : //! returned by Types::TenantManager into an Arc that is private to the `HandleInner`
108 : //! and hence to the single Cache / page_service connection.
109 : //! (Review the "Data Structures" section if that is unclear to you.)
110 : //!
111 : //!
112 : //! When upgrading a `WeakHandle`, we upgrade its weak to a strong ref (of the `Mutex<HandleInner>`),
113 : //! lock the mutex, take out a clone of the `Arc<Types::Timeline>`, and drop the Mutex.
114 : //! The Mutex is not contended because it is private to the connection.
115 : //! And again, the `Arc<Types::Timeline>` clone is cheap because that wrapper
116 : //! Arc's refcounts are private to the connection.
117 : //!
118 : //! Downgrading drops these two Arcs, which again, manipulates refcounts that are private to the connection.
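//!
//! The following std-only sketch illustrates why upgrading and downgrading stay cheap:
//! they only touch refcounts of allocations that are private to the connection (the
//! `Mutex<HandleInner>` allocation and the wrapper `Arc`), never the refcount of the
//! globally shared timeline. `SharedTimeline`, `Wrapper`, and `Inner` are hypothetical
//! stand-ins, not this module's real types.
//!
//! ```rust
//! use std::sync::{Arc, Mutex, Weak};
//!
//! struct SharedTimeline;                       // stand-in for the globally shared Arc<Timeline>
//! struct Wrapper(Arc<SharedTimeline>);         // stand-in for Types::Timeline (timeline + gate guard)
//! enum Inner { Open(Arc<Wrapper>), ShutDown }  // stand-in for HandleInner
//!
//! let shared = Arc::new(SharedTimeline);                    // global state
//! let wrapper = Arc::new(Wrapper(Arc::clone(&shared)));     // created once, on cache miss
//! let inner = Arc::new(Mutex::new(Inner::Open(Arc::clone(&wrapper))));
//! let weak: Weak<Mutex<Inner>> = Arc::downgrade(&inner);    // what the Cache stores
//!
//! let shared_count_before = Arc::strong_count(&shared);
//!
//! // Upgrade: Weak -> Arc (connection-private refcount), lock, clone the
//! // connection-private Arc<Wrapper>; the lock is released at the end of the match.
//! let strong = weak.upgrade().expect("not shut down");
//! let open = match &*strong.lock().unwrap() {
//!     Inner::Open(w) => Arc::clone(w),
//!     Inner::ShutDown => unreachable!(),
//! };
//!
//! // The globally shared Arc's refcount was not touched at all.
//! assert_eq!(Arc::strong_count(&shared), shared_count_before);
//!
//! // Downgrade: drop the two connection-private strong refs again.
//! drop(open);
//! drop(strong);
//! assert_eq!(Arc::strong_count(&shared), shared_count_before);
//! ```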
119 : //!
120 : //!
121 : //! # Shutdown
122 : //!
123 : //! The attentive reader may have noticed the following reference cycle around the `Arc<Timeline>`:
124 : //!
125 : //! ```text
126 : //! Timeline --owns--> PerTimelineState --strong--> HandleInner --strong--> Types::Timeline --strong--> Timeline
127 : //! ```
128 : //!
129 : //! Further, there is this cycle:
130 : //!
131 : //! ```text
132 : //! Timeline --owns--> PerTimelineState --strong--> HandleInner --strong--> Types::Timeline --strong--> GateGuard --keepalive--> Timeline
133 : //! ```
134 : //!
135 : //! The former cycle is a memory leak if not broken.
136 : //! The latter cycle further prevents the Timeline from shutting down
137 : //! because we certainly won't drop the Timeline while the GateGuard is alive.
138 : //! Preventing shutdown is the whole point of this handle/cache system,
139 : //! but when the Timeline needs to shut down, we need to break the cycle.
140 : //!
141 : //! The cycle is broken by either
142 : //! - Timeline shutdown (=> `PerTimelineState::shutdown`)
143 : //! - Connection shutdown (=> dropping the `Cache`).
144 : //!
145 : //! Both transition the `HandleInner` from [`HandleInner::Open`] to
146 : //! [`HandleInner::ShutDown`], which drops the only long-lived
147 : //! `Arc<Types::Timeline>`. Once the last short-lived Arc<Types::Timeline>
148 : //! is dropped, the `Types::Timeline` gets dropped and thereby
149 : //! the `GateGuard` and the `Arc<Timeline>` that it stores,
150 : //! thereby breaking both cycles.
151 : //!
152 : //! `PerTimelineState::shutdown` drops all the `HandleInners` it contains,
153 : //! thereby breaking the cycle.
154 : //! It also initiates draining of already existing `Handle`s by
155 : //! setting the handles map to `None` so that no new `HandleInner`s can be added
156 : //! to the `PerTimelineState`, which makes subsequent `Cache::get` calls fail.
157 : //!
158 : //! Concurrently existing / already upgraded `Handle`s will extend the
159 : //! lifetime of the `Arc<Mutex<HandleInner>>` and hence cycles.
160 : //! However, since `Handle`s are short-lived and new `Handle`s are not
161 : //! handed out from `Cache::get` or `WeakHandle::upgrade` after
162 : //! `PerTimelineState::shutdown`, that extension of the cycle is bounded.
163 : //!
164 : //! Concurrently existing `WeakHandle`s will fail to `upgrade()`:
165 : //! while they will succeed in upgrading `Weak<Mutex<HandleInner>>`,
166 : //! they will find the inner in the `HandleInner::ShutDown` state, where the
167 : //! `Arc<GateGuard>` and the Timeline have already been dropped.
168 : //!
169 : //! Dropping the `Cache` undoes the registration of this `Cache`'s
170 : //! `HandleInner`s from all the `PerTimelineState`s, i.e., it
171 : //! removes the strong ref to each of its `HandleInner`s
172 : //! from all the `PerTimelineState`.
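//!
//! The cycle-breaking can be pictured with another std-only sketch, again with
//! hypothetical stand-in types: `PerTimelineState::shutdown` is modeled by taking the
//! handles map and overwriting each inner with `ShutDown`, after which an existing weak
//! ref can still be upgraded to the allocation but yields no usable handle, and nothing
//! but local bindings keeps the timeline alive.
//!
//! ```rust
//! use std::collections::HashMap;
//! use std::sync::{Arc, Mutex};
//!
//! struct FakeTimeline;                              // stand-in for Types::Timeline (would own the GateGuard)
//! enum Inner { Open(Arc<FakeTimeline>), ShutDown }  // stand-in for HandleInner
//!
//! let timeline = Arc::new(FakeTimeline);
//! let inner = Arc::new(Mutex::new(Inner::Open(Arc::clone(&timeline))));
//!
//! // PerTimelineState analogue: holds the long-lived strong ref to the HandleInner.
//! let mut per_timeline: Option<HashMap<u64, Arc<Mutex<Inner>>>> =
//!     Some(HashMap::from([(1 /* CacheId */, Arc::clone(&inner))]));
//! // Cache analogue: weak ref only.
//! let weak = Arc::downgrade(&inner);
//!
//! // PerTimelineState::shutdown analogue: poison the map and shut down each HandleInner,
//! // which drops the long-lived Arc<FakeTimeline> and thereby breaks the cycle.
//! for (_cache_id, handle_inner) in per_timeline.take().expect("not yet shut down") {
//!     *handle_inner.lock().unwrap() = Inner::ShutDown;
//! }
//!
//! // A surviving WeakHandle can still upgrade the Weak<Mutex<..>> ...
//! let still_there = weak.upgrade().expect("local `inner` still keeps the allocation alive");
//! // ... but finds it shut down, so no usable Handle is produced.
//! assert!(matches!(&*still_there.lock().unwrap(), Inner::ShutDown));
//!
//! // Only the local `timeline` binding keeps the timeline alive now.
//! assert_eq!(Arc::strong_count(&timeline), 1);
//! ```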
173 : //!
174 : //! # Locking Rules
175 : //!
176 : //! To prevent deadlocks we:
177 : //!
178 : //! 1. Only ever hold one of the locks at a time.
179 : //! 2. Don't add more than one Drop impl that locks on the
180 : //! cycles above.
181 : //!
182 : //! As per (2), that impl is in `Drop for Cache`.
183 : //!
184 : //! # Fast Path for Shard Routing
185 : //!
186 : //! The `Cache` has a fast path for shard routing to avoid calling into
187 : //! the tenant manager for every request.
188 : //!
189 : //! The `Cache` maintains a hash map of `ShardTimelineId` to `WeakHandle`s.
190 : //!
191 : //! The current implementation uses the first entry in the hash map
192 : //! to determine the `ShardParameters` and derive the correct
193 : //! `ShardIndex` for the requested key.
194 : //!
195 : //! It then looks up the hash map for that `ShardTimelineId := {ShardIndex,TimelineId}`.
196 : //!
197 : //! If the lookup is successful and the `WeakHandle` can be upgraded,
198 : //! it's a hit.
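//!
//! The routing step can be pictured with the following simplified sketch. Everything in it
//! is a hypothetical stand-in: the real implementation derives the shard number from the
//! `ShardIdentity`'s striping parameters and also has to upgrade the `WeakHandle`, while
//! this sketch reduces those steps to a modulus and a `contains_key` check.
//!
//! ```rust
//! use std::collections::HashMap;
//!
//! #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
//! struct ShardTimelineId { shard_number: u8, timeline_id: u64 } // simplified stand-in
//! struct CachedShard { shard_count: u8 }                        // simplified stand-in
//!
//! fn route(map: &HashMap<ShardTimelineId, CachedShard>, timeline_id: u64, page: u64)
//!     -> Option<ShardTimelineId>
//! {
//!     // Fast path: borrow the shard parameters from whatever entry happens to be first.
//!     let (_, any_entry) = map.iter().next()?;
//!     let key = ShardTimelineId {
//!         shard_number: (page % any_entry.shard_count as u64) as u8, // stand-in for striping math
//!         timeline_id,
//!     };
//!     // Hit if that exact (shard, timeline) is cached; otherwise the caller falls back
//!     // to the tenant manager (slow path).
//!     map.contains_key(&key).then_some(key)
//! }
//!
//! let mut map = HashMap::new();
//! map.insert(ShardTimelineId { shard_number: 1, timeline_id: 7 }, CachedShard { shard_count: 4 });
//! assert_eq!(route(&map, 7, 13), Some(ShardTimelineId { shard_number: 1, timeline_id: 7 }));
//! assert_eq!(route(&map, 7, 12), None); // shard 0 not cached -> consult tenant manager
//! ```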
199 : //!
200 : //! ## Cache invalidation
201 : //!
202 : //! The insight is that cache invalidation is sufficient and most efficient if done lazily.
203 : //! The only reasons why an entry in the cache can become stale are:
204 : //! 1. The `PerTimelineState` / Timeline is shutting down, e.g., because the shard is
205 : //! being detached, the timeline or shard is being deleted, or the pageserver is shutting down.
206 : //! 2. We're doing a shard split and new traffic should be routed to the child shards.
207 : //!
208 : //! Regarding (1), we will eventually fail to upgrade the `WeakHandle` once the
209 : //! timeline has shut down, and when that happens, we remove the entry from the cache.
210 : //!
211 : //! Regarding (2), the insight is that it is totally fine to keep dispatching requests
212 : //! to the parent shard during a shard split. Eventually, the shard split task will
213 : //! shut down the parent => case (1).
214 :
215 : use std::collections::HashMap;
216 : use std::collections::hash_map;
217 : use std::sync::Arc;
218 : use std::sync::Mutex;
219 : use std::sync::Weak;
220 : use std::time::Duration;
221 :
222 : use pageserver_api::shard::ShardIdentity;
223 : use tracing::{instrument, trace};
224 : use utils::id::TimelineId;
225 : use utils::shard::{ShardIndex, ShardNumber};
226 :
227 : use crate::page_service::GetActiveTimelineError;
228 : use crate::tenant::GetTimelineError;
229 : use crate::tenant::mgr::{GetActiveTenantError, ShardSelector};
230 :
231 : pub(crate) trait Types: Sized {
232 : type TenantManager: TenantManager<Self> + Sized;
233 : type Timeline: Timeline<Self> + Sized;
234 : }
235 :
236 : /// Uniquely identifies a [`Cache`] instance over the lifetime of the process.
237 : /// Required so [`Cache::drop`] can take out the handles from the [`PerTimelineState`].
238 : /// Alternative to this would be to allocate [`Cache`] in a `Box` and identify it by the pointer.
239 : #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
240 : struct CacheId(u64);
241 :
242 : impl CacheId {
243 16 : fn next() -> Self {
244 : static NEXT_ID: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
245 16 : let id = NEXT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
246 16 : if id == 0 {
247 0 : panic!("CacheId::new() returned 0, overflow");
248 16 : }
249 16 : Self(id)
250 16 : }
251 : }
252 :
253 : /// See module-level comment.
254 : pub(crate) struct Cache<T: Types> {
255 : id: CacheId,
256 : map: Map<T>,
257 : }
258 :
259 : type Map<T> = HashMap<ShardTimelineId, WeakHandle<T>>;
260 :
261 : impl<T: Types> Default for Cache<T> {
262 16 : fn default() -> Self {
263 16 : Self {
264 16 : id: CacheId::next(),
265 16 : map: Default::default(),
266 16 : }
267 16 : }
268 : }
269 :
270 : #[derive(PartialEq, Eq, Debug, Hash, Clone, Copy)]
271 : pub(crate) struct ShardTimelineId {
272 : pub(crate) shard_index: ShardIndex,
273 : pub(crate) timeline_id: TimelineId,
274 : }
275 :
276 : /// See module-level comment.
277 : pub(crate) struct Handle<T: Types> {
278 : inner: Arc<Mutex<HandleInner<T>>>,
279 : open: Arc<T::Timeline>,
280 : }
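/// A downgraded [`Handle`] that does not keep the timeline gate open; it can be re-upgraded via [`WeakHandle::upgrade`]. See module-level comment.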
281 : pub(crate) struct WeakHandle<T: Types> {
282 : inner: Weak<Mutex<HandleInner<T>>>,
283 : }
284 :
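/// Shared core of a [`Handle`]/[`WeakHandle`] pair: either holds the long-lived `Arc<T::Timeline>` or has been shut down. See module-level comment.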
285 : enum HandleInner<T: Types> {
286 : Open(Arc<T::Timeline>),
287 : ShutDown,
288 : }
289 :
290 : /// Embedded in each [`Types::Timeline`] as the anchor for the only long-lived strong ref to `HandleInner`.
291 : ///
292 : /// See module-level comment for details.
293 : pub struct PerTimelineState<T: Types> {
294 : // None = shutting down
295 : #[allow(clippy::type_complexity)]
296 : handles: Mutex<Option<HashMap<CacheId, Arc<Mutex<HandleInner<T>>>>>>,
297 : }
298 :
299 : impl<T: Types> Default for PerTimelineState<T> {
300 245 : fn default() -> Self {
301 245 : Self {
302 245 : handles: Mutex::new(Some(Default::default())),
303 245 : }
304 245 : }
305 : }
306 :
307 : /// Abstract view of [`crate::tenant::mgr`], for testability.
308 : pub(crate) trait TenantManager<T: Types> {
309 : /// Invoked by [`Cache::get`] to resolve a `TimelineId` and [`ShardSelector`] to a [`Types::Timeline`].
310 : async fn resolve(
311 : &self,
312 : timeline_id: TimelineId,
313 : shard_selector: ShardSelector,
314 : ) -> Result<T::Timeline, GetActiveTimelineError>;
315 : }
316 :
317 : /// Abstract view of an [`Arc<Timeline>`], for testability.
318 : pub(crate) trait Timeline<T: Types> {
319 : fn shard_timeline_id(&self) -> ShardTimelineId;
320 : fn get_shard_identity(&self) -> &ShardIdentity;
321 : fn per_timeline_state(&self) -> &PerTimelineState<T>;
322 : }
323 :
324 : /// Internal type used in [`Cache::get`].
325 : enum RoutingResult<T: Types> {
326 : FastPath(Handle<T>),
327 : SlowPath(ShardTimelineId),
328 : NeedConsultTenantManager,
329 : }
330 :
331 : impl<T: Types> Cache<T> {
332 : /* BEGIN_HADRON */
333 : /// A wrapper around `do_get` that retries tenant shard resolution for a get page request.
334 : #[instrument(level = "trace", skip_all)]
335 : pub(crate) async fn get(
336 : &mut self,
337 : timeline_id: TimelineId,
338 : shard_selector: ShardSelector,
339 : tenant_manager: &T::TenantManager,
340 : ) -> Result<Handle<T>, GetActiveTimelineError> {
341 : const GET_MAX_RETRIES: usize = 10;
342 : const RETRY_BACKOFF: Duration = Duration::from_millis(100);
343 : let mut attempt = 0;
344 : loop {
345 : attempt += 1;
346 : match self
347 : .do_get(timeline_id, shard_selector, tenant_manager)
348 : .await
349 : {
350 : Ok(handle) => return Ok(handle),
351 : Err(
352 : e @ GetActiveTimelineError::Tenant(GetActiveTenantError::WaitForActiveTimeout {
353 : ..
354 : }),
355 : ) => {
356 : // Retry on tenant manager error to handle tenant split more gracefully
357 : if attempt < GET_MAX_RETRIES {
358 : tokio::time::sleep(RETRY_BACKOFF).await;
359 : continue;
360 : } else {
361 : tracing::info!(
362 : "Failed to resolve tenant shard after {} attempts: {:?}",
363 : GET_MAX_RETRIES,
364 : e
365 : );
366 : return Err(e);
367 : }
368 : }
369 : Err(err) => return Err(err),
370 : }
371 : }
372 : }
373 : /* END_HADRON */
374 :
375 : /// See module-level comment for details.
376 : ///
377 : /// Does NOT check for the shutdown state of [`Types::Timeline`].
378 : /// Instead, the methods of [`Types::Timeline`] that are invoked through
379 : /// the [`Handle`] are responsible for checking these conditions
380 : /// and, if they hold, returning an error that causes the page service to
381 : /// close the connection.
382 : #[instrument(level = "trace", skip_all)]
383 : async fn do_get(
384 : &mut self,
385 : timeline_id: TimelineId,
386 : shard_selector: ShardSelector,
387 : tenant_manager: &T::TenantManager,
388 : ) -> Result<Handle<T>, GetActiveTimelineError> {
389 : // terminates because every iteration that doesn't return or break removes an element from the map
390 : let miss: ShardSelector = loop {
391 : let routing_state = self.shard_routing(timeline_id, shard_selector);
392 : match routing_state {
393 : RoutingResult::FastPath(handle) => return Ok(handle),
394 : RoutingResult::SlowPath(key) => match self.map.get(&key) {
395 : Some(cached) => match cached.upgrade() {
396 : Ok(upgraded) => return Ok(upgraded),
397 : Err(HandleUpgradeError::ShutDown) => {
398 : // TODO: dedup with shard_routing()
399 : trace!("handle cache stale");
400 : self.map.remove(&key).unwrap();
401 : continue;
402 : }
403 : },
404 : None => break ShardSelector::Known(key.shard_index),
405 : },
406 : RoutingResult::NeedConsultTenantManager => break shard_selector,
407 : }
408 : };
409 : self.get_miss(timeline_id, miss, tenant_manager).await
410 : }
411 :
412 : #[inline(always)]
413 29 : fn shard_routing(
414 29 : &mut self,
415 29 : timeline_id: TimelineId,
416 29 : shard_selector: ShardSelector,
417 29 : ) -> RoutingResult<T> {
418 : loop {
419 : // terminates because every iteration that doesn't return removes an element from the map
420 31 : let Some((first_key, first_handle)) = self.map.iter().next() else {
421 19 : return RoutingResult::NeedConsultTenantManager;
422 : };
423 12 : let Ok(first_handle) = first_handle.upgrade() else {
424 : // TODO: dedup with get()
425 2 : trace!("handle cache stale");
426 2 : let first_key_owned = *first_key;
427 2 : self.map.remove(&first_key_owned).unwrap();
428 2 : continue;
429 : };
430 :
431 10 : let first_handle_shard_identity = first_handle.get_shard_identity();
432 10 : let make_shard_index = |shard_num: ShardNumber| ShardIndex {
433 10 : shard_number: shard_num,
434 10 : shard_count: first_handle_shard_identity.count,
435 10 : };
436 :
437 10 : let need_idx = match shard_selector {
438 10 : ShardSelector::Page(key) => {
439 10 : make_shard_index(first_handle_shard_identity.get_shard_number(&key))
440 : }
441 0 : ShardSelector::Zero => make_shard_index(ShardNumber(0)),
442 0 : ShardSelector::Known(shard_idx) => shard_idx,
443 : };
444 10 : let need_shard_timeline_id = ShardTimelineId {
445 10 : shard_index: need_idx,
446 10 : timeline_id,
447 10 : };
448 10 : let first_handle_shard_timeline_id = ShardTimelineId {
449 10 : shard_index: first_handle_shard_identity.shard_index(),
450 10 : timeline_id: first_handle.shard_timeline_id().timeline_id,
451 10 : };
452 :
453 10 : if need_shard_timeline_id == first_handle_shard_timeline_id {
454 6 : return RoutingResult::FastPath(first_handle);
455 : } else {
456 4 : return RoutingResult::SlowPath(need_shard_timeline_id);
457 : }
458 : }
459 29 : }
460 :
461 : #[instrument(level = "trace", skip_all)]
462 : #[inline(always)]
463 : async fn get_miss(
464 : &mut self,
465 : timeline_id: TimelineId,
466 : shard_selector: ShardSelector,
467 : tenant_manager: &T::TenantManager,
468 : ) -> Result<Handle<T>, GetActiveTimelineError> {
469 : let timeline = tenant_manager.resolve(timeline_id, shard_selector).await?;
470 : let key = timeline.shard_timeline_id();
471 : match &shard_selector {
472 : ShardSelector::Zero => assert_eq!(key.shard_index.shard_number, ShardNumber(0)),
473 : ShardSelector::Page(_) => (), // gotta trust tenant_manager
474 : ShardSelector::Known(idx) => assert_eq!(idx, &key.shard_index),
475 : }
476 :
477 : trace!("creating new HandleInner");
478 : let timeline = Arc::new(timeline);
479 : let handle_inner_arc = Arc::new(Mutex::new(HandleInner::Open(Arc::clone(&timeline))));
480 : let handle_weak = WeakHandle {
481 : inner: Arc::downgrade(&handle_inner_arc),
482 : };
483 : let handle = handle_weak
484 : .upgrade()
485 : .ok()
486 : .expect("we just created it and it's not linked anywhere yet");
487 : let mut lock_guard = timeline
488 : .per_timeline_state()
489 : .handles
490 : .lock()
491 : .expect("mutex poisoned");
492 : let Some(per_timeline_state) = &mut *lock_guard else {
493 : return Err(GetActiveTimelineError::Timeline(
494 : GetTimelineError::ShuttingDown,
495 : ));
496 : };
497 : let replaced = per_timeline_state.insert(self.id, Arc::clone(&handle_inner_arc));
498 : assert!(replaced.is_none(), "some earlier code left a stale handle");
499 : match self.map.entry(key) {
500 : hash_map::Entry::Occupied(_o) => {
501 : // This cannot happen because
502 : // 1. we're the _miss_ handle, i.e., `self.map` didn't contain an entry and
503 : // 2. we were holding &mut self during .resolve().await above, so, no other thread can have inserted a handle
504 : // while we were waiting for the tenant manager.
505 : unreachable!()
506 : }
507 : hash_map::Entry::Vacant(v) => {
508 : v.insert(handle_weak);
509 : }
510 : }
511 : Ok(handle)
512 : }
513 : }
514 :
515 : pub(crate) enum HandleUpgradeError {
516 : ShutDown,
517 : }
518 :
519 : impl<T: Types> WeakHandle<T> {
520 36 : pub(crate) fn upgrade(&self) -> Result<Handle<T>, HandleUpgradeError> {
521 36 : let Some(inner) = Weak::upgrade(&self.inner) else {
522 2 : return Err(HandleUpgradeError::ShutDown);
523 : };
524 34 : let lock_guard = inner.lock().expect("poisoned");
525 34 : match &*lock_guard {
526 31 : HandleInner::Open(open) => {
527 31 : let open = Arc::clone(open);
528 31 : drop(lock_guard);
529 31 : Ok(Handle { open, inner })
530 : }
531 3 : HandleInner::ShutDown => Err(HandleUpgradeError::ShutDown),
532 : }
533 36 : }
534 :
535 0 : pub(crate) fn is_same_handle_as(&self, other: &WeakHandle<T>) -> bool {
536 0 : Weak::ptr_eq(&self.inner, &other.inner)
537 0 : }
538 : }
539 :
540 : impl<T: Types> std::ops::Deref for Handle<T> {
541 : type Target = T::Timeline;
542 53 : fn deref(&self) -> &Self::Target {
543 53 : &self.open
544 53 : }
545 : }
546 :
547 : impl<T: Types> Handle<T> {
548 1 : pub(crate) fn downgrade(&self) -> WeakHandle<T> {
549 1 : WeakHandle {
550 1 : inner: Arc::downgrade(&self.inner),
551 1 : }
552 1 : }
553 : }
554 :
555 : impl<T: Types> PerTimelineState<T> {
556 : /// After this method returns, [`Cache::get`] will never again return a [`Handle`]
557 : /// to the [`Types::Timeline`] that embeds this per-timeline state.
558 : /// Even if [`TenantManager::resolve`] would still resolve to it.
559 : ///
560 : /// Already-alive [`Handle`]s will remain open, usable, and will keep the [`Types::Timeline`] alive.
561 : /// That's ok because they're short-lived. See module-level comment for details.
562 : #[instrument(level = "trace", skip_all)]
563 : pub(super) fn shutdown(&self) {
564 : let handles = self
565 : .handles
566 : .lock()
567 : .expect("mutex poisoned")
568 : // NB: this .take() sets locked to None.
569 : // That's what makes future `Cache::get` misses fail.
570 : // Cache hits are taken care of below.
571 : .take();
572 : let Some(handles) = handles else {
573 : trace!("already shut down");
574 : return;
575 : };
576 : for handle_inner_arc in handles.values() {
577 : // Make hits fail.
578 : let mut lock_guard = handle_inner_arc.lock().expect("poisoned");
579 : lock_guard.shutdown();
580 : }
581 : drop(handles);
582 : }
583 : }
584 :
585 : // When dropping a [`Cache`], prune its handles in the [`PerTimelineState`] to break the reference cycle.
586 : impl<T: Types> Drop for Cache<T> {
587 16 : fn drop(&mut self) {
588 : for (
589 : _,
590 : WeakHandle {
591 16 : inner: handle_inner_weak,
592 : },
593 16 : ) in self.map.drain()
594 : {
595 16 : let Some(handle_inner_arc) = handle_inner_weak.upgrade() else {
596 2 : continue;
597 : };
598 14 : let Some(handle_timeline) = handle_inner_arc
599 14 : // locking rules: drop lock before acquiring other lock below
600 14 : .lock()
601 14 : .expect("poisoned")
602 14 : .shutdown()
603 : else {
604 : // Concurrent PerTimelineState::shutdown.
605 0 : continue;
606 : };
607 : // Clean up per_timeline_state so the HandleInner allocation can be dropped.
608 14 : let per_timeline_state = handle_timeline.per_timeline_state();
609 14 : let mut handles_lock_guard = per_timeline_state.handles.lock().expect("mutex poisoned");
610 14 : let Some(handles) = &mut *handles_lock_guard else {
611 0 : continue;
612 : };
613 14 : let Some(removed_handle_inner_arc) = handles.remove(&self.id) else {
614 : // Concurrent PerTimelineState::shutdown.
615 0 : continue;
616 : };
617 14 : drop(handles_lock_guard); // locking rules!
618 14 : assert!(Arc::ptr_eq(&removed_handle_inner_arc, &handle_inner_arc));
619 : }
620 16 : }
621 : }
622 :
623 : impl<T: Types> HandleInner<T> {
624 19 : fn shutdown(&mut self) -> Option<Arc<T::Timeline>> {
625 19 : match std::mem::replace(self, HandleInner::ShutDown) {
626 19 : HandleInner::Open(timeline) => Some(timeline),
627 : HandleInner::ShutDown => {
628 : // Duplicate shutdowns are possible because both Cache::drop and PerTimelineState::shutdown
629 : // may do it concurrently, but locking rules disallow holding per-timeline-state lock and
630 : // the handle lock at the same time.
631 0 : None
632 : }
633 : }
634 19 : }
635 : }
636 :
637 : #[cfg(test)]
638 : mod tests {
639 : use std::sync::Weak;
640 :
641 : use pageserver_api::key::{DBDIR_KEY, Key, rel_block_to_key};
642 : use pageserver_api::models::ShardParameters;
643 : use pageserver_api::reltag::RelTag;
644 : use pageserver_api::shard::DEFAULT_STRIPE_SIZE;
645 : use utils::id::TenantId;
646 : use utils::shard::{ShardCount, TenantShardId};
647 : use utils::sync::gate::GateGuard;
648 :
649 : use super::*;
650 :
651 : const FOREVER: std::time::Duration = std::time::Duration::from_secs(u64::MAX);
652 :
653 : #[derive(Debug)]
654 : struct TestTypes;
655 : impl Types for TestTypes {
656 : type TenantManager = StubManager;
657 : type Timeline = Entered;
658 : }
659 :
660 : struct StubManager {
661 : shards: Vec<Arc<StubTimeline>>,
662 : }
663 :
664 : struct StubTimeline {
665 : gate: utils::sync::gate::Gate,
666 : id: TimelineId,
667 : shard: ShardIdentity,
668 : per_timeline_state: PerTimelineState<TestTypes>,
669 : myself: Weak<StubTimeline>,
670 : }
671 :
672 : struct Entered {
673 : timeline: Arc<StubTimeline>,
674 : #[allow(dead_code)] // it's stored here to keep the gate open
675 : gate_guard: Arc<GateGuard>,
676 : }
677 :
678 : impl StubTimeline {
679 11 : fn getpage(&self) {
680 : // do nothing
681 11 : }
682 : }
683 :
684 : impl Timeline<TestTypes> for Entered {
685 30 : fn shard_timeline_id(&self) -> ShardTimelineId {
686 30 : ShardTimelineId {
687 30 : shard_index: self.shard.shard_index(),
688 30 : timeline_id: self.id,
689 30 : }
690 30 : }
691 :
692 10 : fn get_shard_identity(&self) -> &ShardIdentity {
693 10 : &self.shard
694 10 : }
695 :
696 34 : fn per_timeline_state(&self) -> &PerTimelineState<TestTypes> {
697 34 : &self.per_timeline_state
698 34 : }
699 : }
700 :
701 : impl TenantManager<TestTypes> for StubManager {
702 23 : async fn resolve(
703 23 : &self,
704 23 : timeline_id: TimelineId,
705 23 : shard_selector: ShardSelector,
706 23 : ) -> Result<Entered, GetActiveTimelineError> {
707 21 : fn enter_gate(
708 21 : timeline: &StubTimeline,
709 21 : ) -> Result<Arc<GateGuard>, GetActiveTimelineError> {
710 21 : Ok(Arc::new(timeline.gate.enter().map_err(|_| {
711 1 : GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown)
712 1 : })?))
713 21 : }
714 :
715 27 : for timeline in &self.shards {
716 25 : if timeline.id == timeline_id {
717 0 : match &shard_selector {
718 0 : ShardSelector::Zero if timeline.shard.is_shard_zero() => {
719 : return Ok(Entered {
720 0 : timeline: Arc::clone(timeline),
721 0 : gate_guard: enter_gate(timeline)?,
722 : });
723 : }
724 0 : ShardSelector::Zero => continue,
725 19 : ShardSelector::Page(key) if timeline.shard.is_key_local(key) => {
726 : return Ok(Entered {
727 19 : timeline: Arc::clone(timeline),
728 19 : gate_guard: enter_gate(timeline)?,
729 : });
730 : }
731 0 : ShardSelector::Page(_) => continue,
732 3 : ShardSelector::Known(idx) if idx == &timeline.shard.shard_index() => {
733 : return Ok(Entered {
734 2 : timeline: Arc::clone(timeline),
735 2 : gate_guard: enter_gate(timeline)?,
736 : });
737 : }
738 1 : ShardSelector::Known(_) => continue,
739 : }
740 3 : }
741 : }
742 2 : Err(GetActiveTimelineError::Timeline(
743 2 : GetTimelineError::NotFound {
744 2 : tenant_id: TenantShardId::unsharded(TenantId::from([0; 16])),
745 2 : timeline_id,
746 2 : },
747 2 : ))
748 23 : }
749 : }
750 :
751 : impl std::ops::Deref for Entered {
752 : type Target = StubTimeline;
753 137 : fn deref(&self) -> &Self::Target {
754 137 : &self.timeline
755 137 : }
756 : }
757 :
758 : #[tokio::test(start_paused = true)]
759 1 : async fn test_timeline_shutdown() {
760 1 : crate::tenant::harness::setup_logging();
761 :
762 1 : let timeline_id = TimelineId::generate();
763 1 : let shard0 = Arc::new_cyclic(|myself| StubTimeline {
764 1 : gate: Default::default(),
765 1 : id: timeline_id,
766 1 : shard: ShardIdentity::unsharded(),
767 1 : per_timeline_state: PerTimelineState::default(),
768 1 : myself: myself.clone(),
769 1 : });
770 1 : let mgr = StubManager {
771 1 : shards: vec![shard0.clone()],
772 1 : };
773 1 : let key = DBDIR_KEY;
774 :
775 1 : let mut cache = Cache::<TestTypes>::default();
776 :
777 : //
778 : // fill the cache
779 : //
780 1 : let handle: Handle<_> = cache
781 1 : .get(timeline_id, ShardSelector::Page(key), &mgr)
782 1 : .await
783 1 : .expect("we have the timeline");
784 1 : assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
785 1 : assert_eq!(cache.map.len(), 1);
786 1 : drop(handle);
787 :
788 : //
789 : // demonstrate that Handle holds up gate closure
790 : // but shutdown prevents new handles from being handed out
791 : //
792 :
793 1 : tokio::select! {
794 1 : _ = shard0.gate.close() => {
795 0 : panic!("cache and per-timeline handler state keep the gate open");
796 : }
797 1 : _ = tokio::time::sleep(FOREVER) => {
798 1 : // NB: first poll of close() makes it enter closing state
799 1 : }
800 : }
801 :
802 1 : let handle = cache
803 1 : .get(timeline_id, ShardSelector::Page(key), &mgr)
804 1 : .await
805 1 : .expect("we have the timeline");
806 1 : assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
807 :
808 : // SHUTDOWN
809 1 : shard0.per_timeline_state.shutdown(); // keeping handle alive across shutdown
810 :
811 1 : assert_eq!(
812 1 : cache.map.len(),
813 : 1,
814 0 : "this is an implementation detail but worth pointing out: we can't clear the cache from shutdown(), it's cleared on first access after"
815 : );
816 :
817 : // this handle is perfectly usable
818 1 : handle.getpage();
819 :
820 1 : cache
821 1 : .get(timeline_id, ShardSelector::Page(key), &mgr)
822 1 : .await
823 1 : .err()
824 1 : .expect("documented behavior: can't get new handle after shutdown, even if there is an alive Handle");
825 1 : assert_eq!(
826 1 : cache.map.len(),
827 : 0,
828 0 : "first access after shutdown cleans up the Weak's from the cache"
829 : );
830 :
831 1 : tokio::select! {
832 1 : _ = shard0.gate.close() => {
833 0 : panic!("handle is keeping gate open");
834 : }
835 1 : _ = tokio::time::sleep(FOREVER) => { }
836 : }
837 :
838 1 : drop(handle);
839 :
840 : // closing gate succeeds after dropping handle
841 1 : tokio::select! {
842 1 : _ = shard0.gate.close() => { }
843 1 : _ = tokio::time::sleep(FOREVER) => {
844 0 : panic!("handle is dropped, no other gate holders exist")
845 : }
846 : }
847 :
848 : // map gets cleaned on next lookup
849 1 : cache
850 1 : .get(timeline_id, ShardSelector::Page(key), &mgr)
851 1 : .await
852 1 : .err()
853 1 : .expect("documented behavior: can't get new handle after shutdown");
854 1 : assert_eq!(cache.map.len(), 0);
855 :
856 : // ensure all refs to shard0 are gone and we're not leaking anything
857 1 : drop(shard0);
858 1 : drop(mgr);
859 1 : }
860 :
861 : #[tokio::test]
862 1 : async fn test_multiple_timelines_and_deletion() {
863 1 : crate::tenant::harness::setup_logging();
864 :
865 1 : let timeline_a = TimelineId::generate();
866 1 : let timeline_b = TimelineId::generate();
867 1 : assert_ne!(timeline_a, timeline_b);
868 1 : let timeline_a = Arc::new_cyclic(|myself| StubTimeline {
869 1 : gate: Default::default(),
870 1 : id: timeline_a,
871 1 : shard: ShardIdentity::unsharded(),
872 1 : per_timeline_state: PerTimelineState::default(),
873 1 : myself: myself.clone(),
874 1 : });
875 1 : let timeline_b = Arc::new_cyclic(|myself| StubTimeline {
876 1 : gate: Default::default(),
877 1 : id: timeline_b,
878 1 : shard: ShardIdentity::unsharded(),
879 1 : per_timeline_state: PerTimelineState::default(),
880 1 : myself: myself.clone(),
881 1 : });
882 1 : let mut mgr = StubManager {
883 1 : shards: vec![timeline_a.clone(), timeline_b.clone()],
884 1 : };
885 1 : let key = DBDIR_KEY;
886 :
887 1 : let mut cache = Cache::<TestTypes>::default();
888 :
889 1 : cache
890 1 : .get(timeline_a.id, ShardSelector::Page(key), &mgr)
891 1 : .await
892 1 : .expect("we have it");
893 1 : cache
894 1 : .get(timeline_b.id, ShardSelector::Page(key), &mgr)
895 1 : .await
896 1 : .expect("we have it");
897 1 : assert_eq!(cache.map.len(), 2);
898 :
899 : // delete timeline A
900 1 : timeline_a.per_timeline_state.shutdown();
901 2 : mgr.shards.retain(|t| t.id != timeline_a.id);
902 1 : assert!(
903 1 : mgr.resolve(timeline_a.id, ShardSelector::Page(key))
904 1 : .await
905 1 : .is_err(),
906 0 : "broken StubManager implementation"
907 : );
908 :
909 1 : assert_eq!(
910 1 : cache.map.len(),
911 : 2,
912 0 : "cache still has a Weak handle to Timeline A"
913 : );
914 1 : cache
915 1 : .get(timeline_a.id, ShardSelector::Page(key), &mgr)
916 1 : .await
917 1 : .err()
918 1 : .expect("documented behavior: can't get new handle after shutdown");
919 :
920 1 : assert_eq!(cache.map.len(), 1, "next access cleans up the cache");
921 :
922 1 : cache
923 1 : .get(timeline_b.id, ShardSelector::Page(key), &mgr)
924 1 : .await
925 1 : .expect("we still have it");
926 1 : }
927 :
928 7 : fn make_relation_key_for_shard(shard: ShardNumber, params: ShardParameters) -> Key {
929 7 : rel_block_to_key(
930 7 : RelTag {
931 7 : spcnode: 1663,
932 7 : dbnode: 208101,
933 7 : relnode: 2620,
934 7 : forknum: 0,
935 7 : },
936 7 : shard.0 as u32 * params.stripe_size.0,
937 : )
938 7 : }
939 :
940 : #[tokio::test(start_paused = true)]
941 1 : async fn test_shard_split() {
942 1 : crate::tenant::harness::setup_logging();
943 1 : let timeline_id = TimelineId::generate();
944 1 : let parent = Arc::new_cyclic(|myself| StubTimeline {
945 1 : gate: Default::default(),
946 1 : id: timeline_id,
947 1 : shard: ShardIdentity::unsharded(),
948 1 : per_timeline_state: PerTimelineState::default(),
949 1 : myself: myself.clone(),
950 1 : });
951 1 : let child_params = ShardParameters {
952 1 : count: ShardCount(2),
953 1 : stripe_size: DEFAULT_STRIPE_SIZE,
954 1 : };
955 1 : let child0 = Arc::new_cyclic(|myself| StubTimeline {
956 1 : gate: Default::default(),
957 1 : id: timeline_id,
958 1 : shard: ShardIdentity::from_params(ShardNumber(0), child_params),
959 1 : per_timeline_state: PerTimelineState::default(),
960 1 : myself: myself.clone(),
961 1 : });
962 1 : let child1 = Arc::new_cyclic(|myself| StubTimeline {
963 1 : gate: Default::default(),
964 1 : id: timeline_id,
965 1 : shard: ShardIdentity::from_params(ShardNumber(1), child_params),
966 1 : per_timeline_state: PerTimelineState::default(),
967 1 : myself: myself.clone(),
968 1 : });
969 1 : let child_shards_by_shard_number = [child0.clone(), child1.clone()];
970 :
971 1 : let mut cache = Cache::<TestTypes>::default();
972 :
973 : // fill the cache with the parent
974 3 : for i in 0..2 {
975 2 : let handle = cache
976 2 : .get(
977 2 : timeline_id,
978 2 : ShardSelector::Page(make_relation_key_for_shard(ShardNumber(i), child_params)),
979 2 : &StubManager {
980 2 : shards: vec![parent.clone()],
981 2 : },
982 2 : )
983 2 : .await
984 2 : .expect("we have it");
985 2 : assert!(
986 2 : Weak::ptr_eq(&handle.myself, &parent.myself),
987 0 : "mgr returns parent first"
988 : );
989 2 : drop(handle);
990 : }
991 :
992 : //
993 : // SHARD SPLIT: tenant manager changes, but the cache isn't informed
994 : //
995 :
996 : // while we haven't shut down the parent, the cache will return the cached parent, even
997 : // if the tenant manager returns the child
998 3 : for i in 0..2 {
999 2 : let handle = cache
1000 2 : .get(
1001 2 : timeline_id,
1002 2 : ShardSelector::Page(make_relation_key_for_shard(ShardNumber(i), child_params)),
1003 2 : &StubManager {
1004 2 : shards: vec![], // doesn't matter what's in here, the cache is fully loaded
1005 2 : },
1006 2 : )
1007 2 : .await
1008 2 : .expect("we have it");
1009 2 : assert!(
1010 2 : Weak::ptr_eq(&handle.myself, &parent.myself),
1011 0 : "mgr returns parent"
1012 : );
1013 2 : drop(handle);
1014 : }
1015 :
1016 1 : let parent_handle = cache
1017 1 : .get(
1018 1 : timeline_id,
1019 1 : ShardSelector::Page(make_relation_key_for_shard(ShardNumber(0), child_params)),
1020 1 : &StubManager {
1021 1 : shards: vec![parent.clone()],
1022 1 : },
1023 1 : )
1024 1 : .await
1025 1 : .expect("we have it");
1026 1 : assert!(Weak::ptr_eq(&parent_handle.myself, &parent.myself));
1027 :
1028 : // invalidate the cache
1029 1 : parent.per_timeline_state.shutdown();
1030 :
1031 : // the cache will now return the child, even though the parent handle still exists
1032 3 : for i in 0..2 {
1033 2 : let handle = cache
1034 2 : .get(
1035 2 : timeline_id,
1036 2 : ShardSelector::Page(make_relation_key_for_shard(ShardNumber(i), child_params)),
1037 2 : &StubManager {
1038 2 : shards: vec![child0.clone(), child1.clone()], // <====== this changed compared to previous loop
1039 2 : },
1040 2 : )
1041 2 : .await
1042 2 : .expect("we have it");
1043 2 : assert!(
1044 2 : Weak::ptr_eq(
1045 2 : &handle.myself,
1046 2 : &child_shards_by_shard_number[i as usize].myself
1047 : ),
1048 0 : "mgr returns child"
1049 : );
1050 2 : drop(handle);
1051 : }
1052 :
1053 : // all the while the parent handle kept the parent gate open
1054 1 : tokio::select! {
1055 1 : _ = parent_handle.gate.close() => {
1056 0 : panic!("parent handle is keeping gate open");
1057 : }
1058 1 : _ = tokio::time::sleep(FOREVER) => { }
1059 : }
1060 1 : drop(parent_handle);
1061 1 : tokio::select! {
1062 1 : _ = parent.gate.close() => { }
1063 1 : _ = tokio::time::sleep(FOREVER) => {
1064 1 : panic!("parent handle is dropped, no other gate holders exist")
1065 1 : }
1066 1 : }
1067 1 : }
1068 :
1069 : #[tokio::test(start_paused = true)]
1070 1 : async fn test_connection_handler_exit() {
1071 1 : crate::tenant::harness::setup_logging();
1072 1 : let timeline_id = TimelineId::generate();
1073 1 : let shard0 = Arc::new_cyclic(|myself| StubTimeline {
1074 1 : gate: Default::default(),
1075 1 : id: timeline_id,
1076 1 : shard: ShardIdentity::unsharded(),
1077 1 : per_timeline_state: PerTimelineState::default(),
1078 1 : myself: myself.clone(),
1079 1 : });
1080 1 : let mgr = StubManager {
1081 1 : shards: vec![shard0.clone()],
1082 1 : };
1083 1 : let key = DBDIR_KEY;
1084 :
1085 : // Simulate 10 connections that are opened, used, and closed
1086 11 : for _ in 0..10 {
1087 10 : let mut cache = Cache::<TestTypes>::default();
1088 10 : let handle = {
1089 10 : let handle = cache
1090 10 : .get(timeline_id, ShardSelector::Page(key), &mgr)
1091 10 : .await
1092 10 : .expect("we have the timeline");
1093 10 : assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
1094 10 : handle
1095 1 : };
1096 10 : handle.getpage();
1097 1 : }
1098 1 :
1099 1 : // No handles exist, so nothing is holding the gate open.
1100 1 : // Thus the gate should close immediately, even without an explicit shutdown.
1101 1 : tokio::select! {
1102 1 : _ = shard0.gate.close() => { }
1103 1 : _ = tokio::time::sleep(FOREVER) => {
1104 1 : panic!("handle is dropped, no other gate holders exist")
1105 1 : }
1106 1 : }
1107 1 : }
1108 :
1109 : #[tokio::test(start_paused = true)]
1110 1 : async fn test_weak_handles() {
1111 1 : crate::tenant::harness::setup_logging();
1112 1 : let timeline_id = TimelineId::generate();
1113 1 : let shard0 = Arc::new_cyclic(|myself| StubTimeline {
1114 1 : gate: Default::default(),
1115 1 : id: timeline_id,
1116 1 : shard: ShardIdentity::unsharded(),
1117 1 : per_timeline_state: PerTimelineState::default(),
1118 1 : myself: myself.clone(),
1119 1 : });
1120 1 : let mgr = StubManager {
1121 1 : shards: vec![shard0.clone()],
1122 1 : };
1123 :
1124 1 : let refcount_start = Arc::strong_count(&shard0);
1125 :
1126 1 : let key = DBDIR_KEY;
1127 :
1128 1 : let mut cache = Cache::<TestTypes>::default();
1129 :
1130 1 : let handle = cache
1131 1 : .get(timeline_id, ShardSelector::Page(key), &mgr)
1132 1 : .await
1133 1 : .expect("we have the timeline");
1134 1 : assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
1135 :
1136 1 : let weak_handle = handle.downgrade();
1137 :
1138 1 : drop(handle);
1139 :
1140 1 : let upgraded_handle = weak_handle.upgrade().ok().expect("we can upgrade it");
1141 :
1142 : // Start shutdown
1143 1 : shard0.per_timeline_state.shutdown();
1144 :
1145 : // Upgrades during shutdown don't work, even if upgraded_handle exists.
1146 1 : weak_handle
1147 1 : .upgrade()
1148 1 : .err()
1149 1 : .expect("can't upgrade weak handle as soon as shutdown started");
1150 :
1151 : // But upgraded_handle is still alive, so the gate won't close.
1152 1 : tokio::select! {
1153 1 : _ = shard0.gate.close() => {
1154 0 : panic!("handle is keeping gate open");
1155 : }
1156 1 : _ = tokio::time::sleep(FOREVER) => { }
1157 : }
1158 :
1159 : // Drop the last handle.
1160 1 : drop(upgraded_handle);
1161 :
1162 : // The gate should close now, despite there still being a weak_handle.
1163 1 : tokio::select! {
1164 1 : _ = shard0.gate.close() => { }
1165 1 : _ = tokio::time::sleep(FOREVER) => {
1166 0 : panic!("only strong handle is dropped and we shut down per-timeline-state")
1167 : }
1168 : }
1169 :
1170 : // The weak handle still can't be upgraded.
1171 1 : weak_handle
1172 1 : .upgrade()
1173 1 : .err()
1174 1 : .expect("still shouldn't be able to upgrade the weak handle");
1175 :
1176 : // There should be no strong references to the timeline object except the one on the stack.
1177 1 : assert_eq!(Arc::strong_count(&shard0), refcount_start);
1178 1 : }
1179 :
1180 : #[tokio::test(start_paused = true)]
1181 1 : async fn test_reference_cycle_broken_when_cache_is_dropped() {
1182 1 : crate::tenant::harness::setup_logging();
1183 1 : let timeline_id = TimelineId::generate();
1184 1 : let shard0 = Arc::new_cyclic(|myself| StubTimeline {
1185 1 : gate: Default::default(),
1186 1 : id: timeline_id,
1187 1 : shard: ShardIdentity::unsharded(),
1188 1 : per_timeline_state: PerTimelineState::default(),
1189 1 : myself: myself.clone(),
1190 1 : });
1191 1 : let mgr = StubManager {
1192 1 : shards: vec![shard0.clone()],
1193 1 : };
1194 1 : let key = DBDIR_KEY;
1195 :
1196 1 : let mut cache = Cache::<TestTypes>::default();
1197 :
1198 : // helper to check if a handle is referenced by per_timeline_state
1199 2 : let per_timeline_state_refs_handle = |handle_weak: &Weak<Mutex<HandleInner<_>>>| {
1200 2 : let per_timeline_state = shard0.per_timeline_state.handles.lock().unwrap();
1201 2 : let per_timeline_state = per_timeline_state.as_ref().unwrap();
1202 2 : per_timeline_state
1203 2 : .values()
1204 2 : .any(|v| Weak::ptr_eq(&Arc::downgrade(v), handle_weak))
1205 2 : };
1206 :
1207 : // Fill the cache.
1208 1 : let handle = cache
1209 1 : .get(timeline_id, ShardSelector::Page(key), &mgr)
1210 1 : .await
1211 1 : .expect("we have the timeline");
1212 1 : assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
1213 1 : let handle_inner_weak = Arc::downgrade(&handle.inner);
1214 1 : assert!(
1215 1 : per_timeline_state_refs_handle(&handle_inner_weak),
1216 0 : "we still hold `handle` _and_ haven't dropped `cache` yet"
1217 : );
1218 :
1219 : // Drop the cache.
1220 1 : drop(cache);
1221 :
1222 1 : assert!(
1223 1 : !(per_timeline_state_refs_handle(&handle_inner_weak)),
1224 0 : "nothing should reference the handle allocation anymore"
1225 : );
1226 1 : assert!(
1227 1 : Weak::upgrade(&handle_inner_weak).is_some(),
1228 0 : "the local `handle` still keeps the allocation alive"
1229 : );
1230 : // but obviously the cache is gone so no new allocations can be handed out.
1231 :
1232 : // Drop handle.
1233 1 : drop(handle);
1234 1 : assert!(
1235 1 : Weak::upgrade(&handle_inner_weak).is_none(),
1236 1 : "the local `handle` is dropped, so the allocation should be dropped by now"
1237 1 : );
1238 1 : }
1239 :
1240 : #[tokio::test(start_paused = true)]
1241 1 : async fn test_reference_cycle_broken_when_per_timeline_state_shutdown() {
1242 1 : crate::tenant::harness::setup_logging();
1243 1 : let timeline_id = TimelineId::generate();
1244 1 : let shard0 = Arc::new_cyclic(|myself| StubTimeline {
1245 1 : gate: Default::default(),
1246 1 : id: timeline_id,
1247 1 : shard: ShardIdentity::unsharded(),
1248 1 : per_timeline_state: PerTimelineState::default(),
1249 1 : myself: myself.clone(),
1250 1 : });
1251 1 : let mgr = StubManager {
1252 1 : shards: vec![shard0.clone()],
1253 1 : };
1254 1 : let key = DBDIR_KEY;
1255 :
1256 1 : let mut cache = Cache::<TestTypes>::default();
1257 1 : let handle = cache
1258 1 : .get(timeline_id, ShardSelector::Page(key), &mgr)
1259 1 : .await
1260 1 : .expect("we have the timeline");
1261 : // grab a weak reference to the inner so we can later try to Weak::upgrade it and assert that it fails
1262 1 : let handle_inner_weak = Arc::downgrade(&handle.inner);
1263 :
1264 : // drop the handle, obviously the lifetime of `inner` is at least as long as each strong reference to it
1265 1 : drop(handle);
1266 1 : assert!(Weak::upgrade(&handle_inner_weak).is_some(), "can still");
1267 :
1268 : // Shutdown the per_timeline_state.
1269 1 : shard0.per_timeline_state.shutdown();
1270 1 : assert!(Weak::upgrade(&handle_inner_weak).is_none(), "can no longer");
1271 :
1272 : // cache only contains Weak's, so it can safely outlive the per_timeline_state.
1273 : // Drop it explicitly solely to make this point.
1274 1 : drop(cache);
1275 1 : }
1276 : }
|