LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - handle.rs (source / functions) Coverage Total Hit
Test: 07bee600374ccd486c69370d0972d9035964fe68.info Lines: 97.7 % 650 635
Test Date: 2025-02-20 13:11:02 Functions: 83.6 % 55 46

            Line data    Source code
       1              : //! An efficient way to keep the timeline gate open without preventing
       2              : //! timeline shutdown for longer than a single call to a timeline method.
       3              : //!
       4              : //! # Motivation
       5              : //!
       6              : //! On a single page service connection, we're typically serving a single TenantTimelineId.
       7              : //!
       8              : //! Without sharding, there is a single Timeline object to which we dispatch
       9              : //! all requests. For example, a getpage request gets dispatched to the
      10              : //! Timeline::get method of the Timeline object that represents the
      11              : //! (tenant,timeline) of that connection.
      12              : //!
      13              : //! With sharding, for each request that comes in on the connection,
      14              : //! we first have to perform shard routing based on the requested key (=~ page number).
      15              : //! The result of shard routing is a Timeline object.
      16              : //! We then dispatch the request to that Timeline object.
      17              : //!
      18              : //! Regardless of whether the tenant is sharded or not, we want to ensure that
      19              : //! we hold the Timeline gate open while we're invoking the method on the
      20              : //! Timeline object.
      21              : //!
      22              : //! However, we want to avoid the overhead of entering the gate for every
      23              : //! method invocation.
      24              : //!
      25              : //! Further, for shard routing, we want to avoid calling the tenant manager to
      26              : //! resolve the shard for every request. Instead, we want to cache the
      27              : //! routing result so we can bypass the tenant manager for all subsequent requests
      28              : //! that get routed to that shard.
      29              : //!
      30              : //! Regardless of how we accomplish the above, it should not
      31              : //! prevent the Timeline from shutting down promptly.
      32              : //!
      33              : //! # Design
      34              : //!
      35              : //! ## Data Structures
      36              : //!
      37              : //! There are three user-facing data structures:
      38              : //! - `PerTimelineState`: a struct embedded into each Timeline struct. Lifetime == Timeline lifetime.
      39              : //! - `Cache`: a struct private to each connection handler; Lifetime == connection lifetime.
      40              : //! - `Handle`: a smart pointer that holds the Timeline gate open and derefs to `&Timeline`.
      41              : //! - `WeakHandle`: downgrade of a `Handle` that does not keep the gate open, but allows
      42              : //!   trying to ugprade back to a `Handle`, guaranteeing it's the same `Timeline` *object*.
      43              : //!
      44              : //! Internally, there is 0 or 1 `HandleInner` per `(Cache,Timeline)`.
      45              : //! Since Cache:Connection is 1:1, there is 0 or 1 `HandleInner` per `(Connection,Timeline)`.
      46              : //!
      47              : //! The `HandleInner`  is allocated as a `Arc<Mutex<HandleInner>>` and
      48              : //! referenced weakly and strongly from various places which we are now illustrating.
      49              : //! For brevity, we will omit the `Arc<Mutex<>>` part in the following and instead
      50              : //! use `strong ref` and `weak ref` when referring to the `Arc<Mutex<HandleInner>>`
      51              : //! or `Weak<Mutex<HandleInner>>`, respectively.
      52              : //!
      53              : //! - The `Handle` is a strong ref.
      54              : //! - The `WeakHandle` is a weak ref.
      55              : //! - The `PerTimelineState` contains a `HashMap<CacheId, strong ref>`.
      56              : //! - The `Cache` is a `HashMap<unique identifier for the shard, weak ref>`.
      57              : //!
      58              : //! Lifetimes:
      59              : //! - `WeakHandle` and `Handle`: single pagestream request.
      60              : //! - `Cache`: single page service connection.
      61              : //! - `PerTimelineState`:  lifetime of the Timeline object (i.e., i.e., till `Timeline::shutdown`).
      62              : //!
      63              : //! ## Request Handling Flow (= filling and using the `Cache``)
      64              : //!
      65              : //! To dispatch a request, the page service connection calls `Cache::get`.
      66              : //!
      67              : //! A cache miss means we consult the tenant manager for shard routing,
      68              : //! resulting in an `Arc<Timeline>`. We enter its gate _once_ and store it in the the
      69              : //! `Arc<Mutex<HandleInner>>>`. A weak ref is stored in the `Cache`
      70              : //! and a strong ref in the `PerTimelineState`.
      71              : //! A strong ref is returned wrapped in a `Handle`.
      72              : //!
      73              : //! For subsequent requests, `Cache::get` will perform a "fast path" shard routing
      74              : //! and find the weak ref in the cache.
      75              : //! We upgrade the weak ref to a strong ref and return it wrapped in a `Handle`.
      76              : //!
      77              : //! The pagestream processing is pipelined and involves a batching step.
      78              : //! While a request is batching, the `Handle` is downgraded to a `WeakHandle`.
      79              : //! When the batch is ready to be executed, the `WeakHandle` is upgraded back to a `Handle`
      80              : //! and the request handler dispatches the request to the right `<Handle as Deref<Target = Timeline>>::$request_method`.
      81              : //! It then drops the `Handle`, which drops the `Arc<HandleInner>`.
      82              : //!
      83              : //! # Performance
      84              : //!
      85              : //! Remember from the introductory section:
      86              : //!
      87              : //! > However, we want to avoid the overhead of entering the gate for every
      88              : //! > method invocation.
      89              : //!
      90              : //! Why do we want to avoid that?
      91              : //! Because the gate is a shared location in memory and entering it involves
      92              : //! bumping refcounts, which leads to cache contention if done frequently
      93              : //! from multiple cores in parallel.
      94              : //!
      95              : //! So, we only acquire the `GateGuard` once on `Cache` miss, and wrap it in an `Arc`.
      96              : //! That `Arc` is private to the `HandleInner` and hence to the connection.
      97              : //! (Review the "Data Structures" section if that is unclear to you.)
      98              : //!
      99              : //! A `WeakHandle` is a weak ref to the `HandleInner`.
     100              : //! When upgrading a `WeakHandle`, we upgrade to a strong ref to the `HandleInner` and
     101              : //! further acquire an additional strong ref to the `Arc<GateGuard>` inside it.
     102              : //! Again, this manipulation of ref counts is is cheap because `Arc` is private to the connection.
     103              : //!
     104              : //! When downgrading a `Handle` to a `WeakHandle`, we drop the `Arc<GateGuard>`.
     105              : //! Again, this is cheap because the `Arc` is private to the connection.
     106              : //!
     107              : //! In addition to the GateGuard, we need to provide `Deref<Target=Timeline>` impl.
     108              : //! For this, both `Handle` need infallible access to an `Arc<Timeline>`.
     109              : //! We could clone the `Arc<Timeline>` when upgrading a `WeakHandle`, but that would cause contention
     110              : //! on the shared memory location that trakcs the refcount of the `Arc<Timeline>`.
     111              : //! Instead, we wrap the `Arc<Timeline>` into another `Arc`.
     112              : //! so that we can clone it cheaply when upgrading a `WeakHandle`.
     113              : //!
     114              : //! # Shutdown
     115              : //!
     116              : //! The attentive reader may have noticed the following reference cycle around the `Arc<Timeline>`:
     117              : //!
     118              : //! ```text
     119              : //! Timeline --owns--> PerTimelineState --strong--> HandleInner --strong--> Timeline
     120              : //! ```
     121              : //!
     122              : //! Further, there is this cycle:
     123              : //!
     124              : //! ```text
     125              : //! Timeline --owns--> PerTimelineState --strong--> HandleInner --strong--> GateGuard --keepalive--> Timeline
     126              : //! ```
     127              : //!
     128              : //! The former cycle is a memory leak if not broken.
     129              : //! The latter cycle further prevents the Timeline from shutting down
     130              : //! because we certainly won't drop the Timeline while the GateGuard is alive.
     131              : //! Preventing shutdown is the whole point of this handle/cache system,
     132              : //! but when the Timeline needs to shut down, we need to break the cycle.
     133              : //!
     134              : //! The cycle is broken by either
     135              : //! - Timeline shutdown (=> `PerTimelineState::shutdown`)
     136              : //! - Connection shutdown (=> dropping the `Cache`).
     137              : //!
     138              : //! Both transition the `HandleInner` from [`HandleInner::KeepingTimelineGateOpen`] to
     139              : //! [`HandleInner::ShutDown`], which drops the only long-lived strong ref to the
     140              : //! `Arc<GateGuard>`.
     141              : //!
     142              : //! `PerTimelineState::shutdown` drops all the `HandleInners` it contains,
     143              : //! thereby breaking the cycle.
     144              : //! It also initiates draining of already existing `Handle`s by
     145              : //! poisoning things so that no new `HandleInner`'s can be added
     146              : //! to the `PerTimelineState`, which will make subsequent `Cache::get` fail.
     147              : //!
     148              : //! Concurrently existing / already upgraded `Handle`s will extend the
     149              : //! lifetime of the `Arc<Mutex<HandleInner>>` and hence cycles.
     150              : //! However, since `Handle`s are short-lived and new `Handle`s are not
     151              : //! handed out from `Cache::get` or `WeakHandle::upgrade` after
     152              : //! `PerTimelineState::shutdown`, that extension of the cycle is bounded.
     153              : //!
     154              : //! Concurrently existing `WeakHandle`s will fail to `upgrade()`:
     155              : //! while they will succeed in upgrading `Weak<Mutex<HandleInner>>`,
     156              : //! they will find the inner in state `HandleInner::ShutDown` state where the
     157              : //! `Arc<GateGuard>` and Timeline has already been dropped.
     158              : //!
     159              : //! Dropping the `Cache` undoes the registration of this `Cache`'s
     160              : //! `HandleInner`s from all the `PerTimelineState`s, i.e., it
     161              : //! removes the strong ref to each of its `HandleInner`s
     162              : //! from all the `PerTimelineState`.
     163              : //!
     164              : //! # Locking Rules
     165              : //!
     166              : //! To prevent deadlocks we:
     167              : //!
     168              : //! 1. Only ever hold one of the locks at a time.
     169              : //! 2. Don't add more than one Drop impl that locks on the
     170              : //!    cycles above.
     171              : //!
     172              : //! As per (2), that impl is in `Drop for Cache`.
     173              : //!
     174              : //! # Fast Path for Shard Routing
     175              : //!
     176              : //! The `Cache` has a fast path for shard routing to avoid calling into
     177              : //! the tenant manager for every request.
     178              : //!
     179              : //! The `Cache` maintains a hash map of `ShardTimelineId` to `WeakHandle`s.
     180              : //!
     181              : //! The current implementation uses the first entry in the hash map
     182              : //! to determine the `ShardParameters` and derive the correct
     183              : //! `ShardIndex` for the requested key.
     184              : //!
     185              : //! It then looks up the hash map for that `ShardTimelineId := {ShardIndex,TimelineId}`.
     186              : //!
     187              : //! If the lookup is successful and the `WeakHandle` can be upgraded,
     188              : //! it's a hit.
     189              : //!
     190              : //! ## Cache invalidation
     191              : //!
     192              : //! The insight is that cache invalidation is sufficient and most efficiently if done lazily.
     193              : //! The only reasons why an entry in the cache can become stale are:
     194              : //! 1. The `PerTimelineState` / Timeline is shutting down e.g. because the shard is
     195              : //!    being detached, timeline or shard deleted, or pageserver is shutting down.
     196              : //! 2. We're doing a shard split and new traffic should be routed to the child shards.
     197              : //!
     198              : //! Regarding (1), we will eventually fail to upgrade the `WeakHandle` once the
     199              : //! timeline has shut down, and when that happens, we remove the entry from the cache.
     200              : //!
     201              : //! Regarding (2), the insight is that it is toally fine to keep dispatching requests
     202              : //! to the parent shard during a shard split. Eventually, the shard split task will
     203              : //! shut down the parent => case (1).
     204              : 
     205              : use std::collections::hash_map;
     206              : use std::collections::HashMap;
     207              : use std::sync::Arc;
     208              : use std::sync::Mutex;
     209              : use std::sync::Weak;
     210              : 
     211              : use pageserver_api::shard::ShardIdentity;
     212              : use tracing::instrument;
     213              : use tracing::trace;
     214              : use utils::id::TimelineId;
     215              : use utils::shard::ShardIndex;
     216              : use utils::shard::ShardNumber;
     217              : 
     218              : use crate::tenant::mgr::ShardSelector;
     219              : 
     220              : /// The requirement for Debug is so that #[derive(Debug)] works in some places.
     221              : pub(crate) trait Types: Sized + std::fmt::Debug {
     222              :     type TenantManagerError: Sized + std::fmt::Debug;
     223              :     type TenantManager: TenantManager<Self> + Sized;
     224              :     type Timeline: ArcTimeline<Self> + Sized;
     225              : }
     226              : 
     227              : /// Uniquely identifies a [`Cache`] instance over the lifetime of the process.
     228              : /// Required so [`Cache::drop`] can take out the handles from the [`PerTimelineState`].
     229              : /// Alternative to this would be to allocate [`Cache`] in a `Box` and identify it by the pointer.
     230              : #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
     231              : struct CacheId(u64);
     232              : 
     233              : impl CacheId {
     234           64 :     fn next() -> Self {
     235              :         static NEXT_ID: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
     236           64 :         let id = NEXT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
     237           64 :         if id == 0 {
     238            0 :             panic!("CacheId::new() returned 0, overflow");
     239           64 :         }
     240           64 :         Self(id)
     241           64 :     }
     242              : }
     243              : 
     244              : /// See module-level comment.
     245              : pub(crate) struct Cache<T: Types> {
     246              :     id: CacheId,
     247              :     map: Map<T>,
     248              : }
     249              : 
     250              : type Map<T> = HashMap<ShardTimelineId, WeakHandle<T>>;
     251              : 
     252              : impl<T: Types> Default for Cache<T> {
     253           64 :     fn default() -> Self {
     254           64 :         Self {
     255           64 :             id: CacheId::next(),
     256           64 :             map: Default::default(),
     257           64 :         }
     258           64 :     }
     259              : }
     260              : 
     261              : #[derive(PartialEq, Eq, Debug, Hash, Clone, Copy)]
     262              : pub(crate) struct ShardTimelineId {
     263              :     pub(crate) shard_index: ShardIndex,
     264              :     pub(crate) timeline_id: TimelineId,
     265              : }
     266              : 
     267              : /// See module-level comment.
     268              : pub(crate) struct Handle<T: Types> {
     269              :     timeline: Arc<T::Timeline>,
     270              :     #[allow(dead_code)] // the field exists to keep the gate open
     271              :     gate_guard: Arc<utils::sync::gate::GateGuard>,
     272              :     inner: Arc<Mutex<HandleInner<T>>>,
     273              : }
     274              : pub(crate) struct WeakHandle<T: Types> {
     275              :     inner: Weak<Mutex<HandleInner<T>>>,
     276              : }
     277              : enum HandleInner<T: Types> {
     278              :     KeepingTimelineGateOpen {
     279              :         #[allow(dead_code)]
     280              :         gate_guard: Arc<utils::sync::gate::GateGuard>,
     281              :         timeline: Arc<T::Timeline>,
     282              :     },
     283              :     ShutDown,
     284              : }
     285              : 
     286              : /// Embedded in each [`Types::Timeline`] as the anchor for the only long-lived strong ref to `HandleInner`.
     287              : ///
     288              : /// See module-level comment for details.
     289              : pub struct PerTimelineState<T: Types> {
     290              :     // None = shutting down
     291              :     #[allow(clippy::type_complexity)]
     292              :     handles: Mutex<Option<HashMap<CacheId, Arc<Mutex<HandleInner<T>>>>>>,
     293              : }
     294              : 
     295              : impl<T: Types> Default for PerTimelineState<T> {
     296          936 :     fn default() -> Self {
     297          936 :         Self {
     298          936 :             handles: Mutex::new(Some(Default::default())),
     299          936 :         }
     300          936 :     }
     301              : }
     302              : 
     303              : /// Abstract view of [`crate::tenant::mgr`], for testability.
     304              : pub(crate) trait TenantManager<T: Types> {
     305              :     /// Invoked by [`Cache::get`] to resolve a [`ShardTimelineId`] to a [`Types::Timeline`].
     306              :     /// Errors are returned as [`GetError::TenantManager`].
     307              :     async fn resolve(
     308              :         &self,
     309              :         timeline_id: TimelineId,
     310              :         shard_selector: ShardSelector,
     311              :     ) -> Result<T::Timeline, T::TenantManagerError>;
     312              : }
     313              : 
     314              : /// Abstract view of an [`Arc<Timeline>`], for testability.
     315              : pub(crate) trait ArcTimeline<T: Types>: Clone {
     316              :     fn gate(&self) -> &utils::sync::gate::Gate;
     317              :     fn shard_timeline_id(&self) -> ShardTimelineId;
     318              :     fn get_shard_identity(&self) -> &ShardIdentity;
     319              :     fn per_timeline_state(&self) -> &PerTimelineState<T>;
     320              : }
     321              : 
     322              : /// Errors returned by [`Cache::get`].
     323              : #[derive(Debug)]
     324              : pub(crate) enum GetError<T: Types> {
     325              :     TenantManager(T::TenantManagerError),
     326              :     TimelineGateClosed,
     327              :     PerTimelineStateShutDown,
     328              : }
     329              : 
     330              : /// Internal type used in [`Cache::get`].
     331              : enum RoutingResult<T: Types> {
     332              :     FastPath(Handle<T>),
     333              :     SlowPath(ShardTimelineId),
     334              :     NeedConsultTenantManager,
     335              : }
     336              : 
     337              : impl<T: Types> Cache<T> {
     338              :     /// See module-level comment for details.
     339              :     ///
     340              :     /// Does NOT check for the shutdown state of [`Types::Timeline`].
     341              :     /// Instead, the methods of [`Types::Timeline`] that are invoked through
     342              :     /// the [`Handle`] are responsible for checking these conditions
     343              :     /// and if so, return an error that causes the page service to
     344              :     /// close the connection.
     345              :     #[instrument(level = "trace", skip_all)]
     346              :     pub(crate) async fn get(
     347              :         &mut self,
     348              :         timeline_id: TimelineId,
     349              :         shard_selector: ShardSelector,
     350              :         tenant_manager: &T::TenantManager,
     351              :     ) -> Result<Handle<T>, GetError<T>> {
     352              :         // terminates because when every iteration we remove an element from the map
     353              :         let miss: ShardSelector = loop {
     354              :             let routing_state = self.shard_routing(timeline_id, shard_selector);
     355              :             match routing_state {
     356              :                 RoutingResult::FastPath(handle) => return Ok(handle),
     357              :                 RoutingResult::SlowPath(key) => match self.map.get(&key) {
     358              :                     Some(cached) => match cached.upgrade() {
     359              :                         Ok(upgraded) => return Ok(upgraded),
     360              :                         Err(HandleUpgradeError::ShutDown) => {
     361              :                             // TODO: dedup with shard_routing()
     362              :                             trace!("handle cache stale");
     363              :                             self.map.remove(&key).unwrap();
     364              :                             continue;
     365              :                         }
     366              :                     },
     367              :                     None => break ShardSelector::Known(key.shard_index),
     368              :                 },
     369              :                 RoutingResult::NeedConsultTenantManager => break shard_selector,
     370              :             }
     371              :         };
     372              :         self.get_miss(timeline_id, miss, tenant_manager).await
     373              :     }
     374              : 
     375              :     #[inline(always)]
     376          113 :     fn shard_routing(
     377          113 :         &mut self,
     378          113 :         timeline_id: TimelineId,
     379          113 :         shard_selector: ShardSelector,
     380          113 :     ) -> RoutingResult<T> {
     381              :         loop {
     382              :             // terminates because when every iteration we remove an element from the map
     383          124 :             let Some((first_key, first_handle)) = self.map.iter().next() else {
     384           76 :                 return RoutingResult::NeedConsultTenantManager;
     385              :             };
     386           48 :             let Ok(first_handle) = first_handle.upgrade() else {
     387              :                 // TODO: dedup with get()
     388           11 :                 trace!("handle cache stale");
     389           11 :                 let first_key_owned = *first_key;
     390           11 :                 self.map.remove(&first_key_owned).unwrap();
     391           11 :                 continue;
     392              :             };
     393              : 
     394           37 :             let first_handle_shard_identity = first_handle.get_shard_identity();
     395           37 :             let make_shard_index = |shard_num: ShardNumber| ShardIndex {
     396           37 :                 shard_number: shard_num,
     397           37 :                 shard_count: first_handle_shard_identity.count,
     398           37 :             };
     399              : 
     400           37 :             let need_idx = match shard_selector {
     401           37 :                 ShardSelector::Page(key) => {
     402           37 :                     make_shard_index(first_handle_shard_identity.get_shard_number(&key))
     403              :                 }
     404            0 :                 ShardSelector::Zero => make_shard_index(ShardNumber(0)),
     405            0 :                 ShardSelector::Known(shard_idx) => shard_idx,
     406              :             };
     407           37 :             let need_shard_timeline_id = ShardTimelineId {
     408           37 :                 shard_index: need_idx,
     409           37 :                 timeline_id,
     410           37 :             };
     411           37 :             let first_handle_shard_timeline_id = ShardTimelineId {
     412           37 :                 shard_index: first_handle_shard_identity.shard_index(),
     413           37 :                 timeline_id: first_handle.shard_timeline_id().timeline_id,
     414           37 :             };
     415           37 : 
     416           37 :             if need_shard_timeline_id == first_handle_shard_timeline_id {
     417           24 :                 return RoutingResult::FastPath(first_handle);
     418              :             } else {
     419           13 :                 return RoutingResult::SlowPath(need_shard_timeline_id);
     420              :             }
     421              :         }
     422          113 :     }
     423              : 
     424              :     #[instrument(level = "trace", skip_all)]
     425              :     #[inline(always)]
     426              :     async fn get_miss(
     427              :         &mut self,
     428              :         timeline_id: TimelineId,
     429              :         shard_selector: ShardSelector,
     430              :         tenant_manager: &T::TenantManager,
     431              :     ) -> Result<Handle<T>, GetError<T>> {
     432              :         match tenant_manager.resolve(timeline_id, shard_selector).await {
     433              :             Ok(timeline) => {
     434              :                 let key = timeline.shard_timeline_id();
     435              :                 match &shard_selector {
     436              :                     ShardSelector::Zero => assert_eq!(key.shard_index.shard_number, ShardNumber(0)),
     437              :                     ShardSelector::Page(_) => (), // gotta trust tenant_manager
     438              :                     ShardSelector::Known(idx) => assert_eq!(idx, &key.shard_index),
     439              :                 }
     440              : 
     441              :                 trace!("creating new HandleInner");
     442              :                 let handle_inner_arc = Arc::new(Mutex::new(HandleInner::KeepingTimelineGateOpen {
     443              :                     gate_guard: Arc::new(
     444              :                         // this enter() is expensive in production code because
     445              :                         // it hits the global Arc<Timeline>::gate refcounts
     446              :                         match timeline.gate().enter() {
     447              :                             Ok(guard) => guard,
     448              :                             Err(_) => {
     449              :                                 return Err(GetError::TimelineGateClosed);
     450              :                             }
     451              :                         },
     452              :                     ),
     453              :                     // this clone is expensive in production code because
     454              :                     // it hits the global Arc<Timeline>::clone refcounts
     455              :                     timeline: Arc::new(timeline.clone()),
     456              :                 }));
     457              :                 let handle_weak = WeakHandle {
     458              :                     inner: Arc::downgrade(&handle_inner_arc),
     459              :                 };
     460              :                 let handle = handle_weak
     461              :                     .upgrade()
     462              :                     .ok()
     463              :                     .expect("we just created it and it's not linked anywhere yet");
     464              :                 {
     465              :                     let mut lock_guard = timeline
     466              :                         .per_timeline_state()
     467              :                         .handles
     468              :                         .lock()
     469              :                         .expect("mutex poisoned");
     470              :                     match &mut *lock_guard {
     471              :                         Some(per_timeline_state) => {
     472              :                             let replaced =
     473              :                                 per_timeline_state.insert(self.id, Arc::clone(&handle_inner_arc));
     474              :                             assert!(replaced.is_none(), "some earlier code left a stale handle");
     475              :                             match self.map.entry(key) {
     476              :                                 hash_map::Entry::Occupied(_o) => {
     477              :                                     // This cannot not happen because
     478              :                                     // 1. we're the _miss_ handle, i.e., `self.map` didn't contain an entry and
     479              :                                     // 2. we were holding &mut self during .resolve().await above, so, no other thread can have inserted a handle
     480              :                                     //    while we were waiting for the tenant manager.
     481              :                                     unreachable!()
     482              :                                 }
     483              :                                 hash_map::Entry::Vacant(v) => {
     484              :                                     v.insert(handle_weak);
     485              :                                 }
     486              :                             }
     487              :                         }
     488              :                         None => {
     489              :                             return Err(GetError::PerTimelineStateShutDown);
     490              :                         }
     491              :                     }
     492              :                 }
     493              :                 Ok(handle)
     494              :             }
     495              :             Err(e) => Err(GetError::TenantManager(e)),
     496              :         }
     497              :     }
     498              : }
     499              : 
     500              : pub(crate) enum HandleUpgradeError {
     501              :     ShutDown,
     502              : }
     503              : 
     504              : impl<T: Types> WeakHandle<T> {
     505          141 :     pub(crate) fn upgrade(&self) -> Result<Handle<T>, HandleUpgradeError> {
     506          141 :         let Some(inner) = Weak::upgrade(&self.inner) else {
     507            8 :             return Err(HandleUpgradeError::ShutDown);
     508              :         };
     509          133 :         let lock_guard = inner.lock().expect("poisoned");
     510          133 :         match &*lock_guard {
     511              :             HandleInner::KeepingTimelineGateOpen {
     512          121 :                 timeline,
     513          121 :                 gate_guard,
     514          121 :             } => {
     515          121 :                 let gate_guard = Arc::clone(gate_guard);
     516          121 :                 let timeline = Arc::clone(timeline);
     517          121 :                 drop(lock_guard);
     518          121 :                 Ok(Handle {
     519          121 :                     timeline,
     520          121 :                     gate_guard,
     521          121 :                     inner,
     522          121 :                 })
     523              :             }
     524           12 :             HandleInner::ShutDown => Err(HandleUpgradeError::ShutDown),
     525              :         }
     526          141 :     }
     527              : 
     528            0 :     pub(crate) fn is_same_handle_as(&self, other: &WeakHandle<T>) -> bool {
     529            0 :         Weak::ptr_eq(&self.inner, &other.inner)
     530            0 :     }
     531              : }
     532              : 
     533              : impl<T: Types> std::ops::Deref for Handle<T> {
     534              :     type Target = T::Timeline;
     535          206 :     fn deref(&self) -> &Self::Target {
     536          206 :         &self.timeline
     537          206 :     }
     538              : }
     539              : 
     540              : impl<T: Types> Handle<T> {
     541            4 :     pub(crate) fn downgrade(&self) -> WeakHandle<T> {
     542            4 :         WeakHandle {
     543            4 :             inner: Arc::downgrade(&self.inner),
     544            4 :         }
     545            4 :     }
     546              : }
     547              : 
     548              : impl<T: Types> PerTimelineState<T> {
     549              :     /// After this method returns, [`Cache::get`] will never again return a [`Handle`]
     550              :     /// to the [`Types::Timeline`] that embeds this per-timeline state.
     551              :     /// Even if [`TenantManager::resolve`] would still resolve to it.
     552              :     ///
     553              :     /// Already-alive [`Handle`]s for will remain open, usable, and keeping the [`ArcTimeline`] alive.
     554              :     /// That's ok because they're short-lived. See module-level comment for details.
     555              :     #[instrument(level = "trace", skip_all)]
     556              :     pub(super) fn shutdown(&self) {
     557              :         let handles = self
     558              :             .handles
     559              :             .lock()
     560              :             .expect("mutex poisoned")
     561              :             // NB: this .take() sets locked to None.
     562              :             // That's what makes future `Cache::get` misses fail.
     563              :             // Cache hits are taken care of below.
     564              :             .take();
     565              :         let Some(handles) = handles else {
     566              :             trace!("already shut down");
     567              :             return;
     568              :         };
     569              :         for handle_inner_arc in handles.values() {
     570              :             // Make hits fail.
     571              :             let mut lock_guard = handle_inner_arc.lock().expect("poisoned");
     572              :             lock_guard.shutdown();
     573              :         }
     574              :         drop(handles);
     575              :     }
     576              : }
     577              : 
     578              : // When dropping a [`Cache`], prune its handles in the [`PerTimelineState`] to break the reference cycle.
     579              : impl<T: Types> Drop for Cache<T> {
     580           64 :     fn drop(&mut self) {
     581              :         for (
     582              :             _,
     583              :             WeakHandle {
     584           64 :                 inner: handle_inner_weak,
     585              :             },
     586           64 :         ) in self.map.drain()
     587              :         {
     588           64 :             let Some(handle_inner_arc) = handle_inner_weak.upgrade() else {
     589            8 :                 continue;
     590              :             };
     591           56 :             let Some(handle_timeline) = handle_inner_arc
     592           56 :                 // locking rules: drop lock before acquiring other lock below
     593           56 :                 .lock()
     594           56 :                 .expect("poisoned")
     595           56 :                 .shutdown()
     596              :             else {
     597              :                 // Concurrent PerTimelineState::shutdown.
     598            0 :                 continue;
     599              :             };
     600              :             // Clean up per_timeline_state so the HandleInner allocation can be dropped.
     601           56 :             let per_timeline_state = handle_timeline.per_timeline_state();
     602           56 :             let mut handles_lock_guard = per_timeline_state.handles.lock().expect("mutex poisoned");
     603           56 :             let Some(handles) = &mut *handles_lock_guard else {
     604            0 :                 continue;
     605              :             };
     606           56 :             let Some(removed_handle_inner_arc) = handles.remove(&self.id) else {
     607              :                 // Concurrent PerTimelineState::shutdown.
     608            0 :                 continue;
     609              :             };
     610           56 :             drop(handles_lock_guard); // locking rules!
     611           56 :             assert!(Arc::ptr_eq(&removed_handle_inner_arc, &handle_inner_arc));
     612              :         }
     613           64 :     }
     614              : }
     615              : 
     616              : impl<T: Types> HandleInner<T> {
     617           76 :     fn shutdown(&mut self) -> Option<Arc<T::Timeline>> {
     618           76 :         match std::mem::replace(self, HandleInner::ShutDown) {
     619           76 :             HandleInner::KeepingTimelineGateOpen { timeline, .. } => Some(timeline),
     620              :             HandleInner::ShutDown => {
     621              :                 // Duplicate shutdowns are possible because both Cache::drop and PerTimelineState::shutdown
     622              :                 // may do it concurrently, but locking rules disallow holding per-timeline-state lock and
     623              :                 // the handle lock at the same time.
     624            0 :                 None
     625              :             }
     626              :         }
     627           76 :     }
     628              : }
     629              : 
     630              : #[cfg(test)]
     631              : mod tests {
     632              :     use std::sync::Weak;
     633              : 
     634              :     use pageserver_api::{
     635              :         key::{rel_block_to_key, Key, DBDIR_KEY},
     636              :         models::ShardParameters,
     637              :         reltag::RelTag,
     638              :         shard::ShardStripeSize,
     639              :     };
     640              :     use utils::shard::ShardCount;
     641              : 
     642              :     use super::*;
     643              : 
     644              :     const FOREVER: std::time::Duration = std::time::Duration::from_secs(u64::MAX);
     645              : 
     646              :     #[derive(Debug)]
     647              :     struct TestTypes;
     648              :     impl Types for TestTypes {
     649              :         type TenantManagerError = anyhow::Error;
     650              :         type TenantManager = StubManager;
     651              :         type Timeline = Arc<StubTimeline>;
     652              :     }
     653              : 
     654              :     struct StubManager {
     655              :         shards: Vec<Arc<StubTimeline>>,
     656              :     }
     657              : 
     658              :     struct StubTimeline {
     659              :         gate: utils::sync::gate::Gate,
     660              :         id: TimelineId,
     661              :         shard: ShardIdentity,
     662              :         per_timeline_state: PerTimelineState<TestTypes>,
     663              :         myself: Weak<StubTimeline>,
     664              :     }
     665              : 
     666              :     impl StubTimeline {
     667           44 :         fn getpage(&self) {
     668           44 :             // do nothing
     669           44 :         }
     670              :     }
     671              : 
     672              :     impl ArcTimeline<TestTypes> for Arc<StubTimeline> {
     673           84 :         fn gate(&self) -> &utils::sync::gate::Gate {
     674           84 :             &self.gate
     675           84 :         }
     676              : 
     677          121 :         fn shard_timeline_id(&self) -> ShardTimelineId {
     678          121 :             ShardTimelineId {
     679          121 :                 shard_index: self.shard.shard_index(),
     680          121 :                 timeline_id: self.id,
     681          121 :             }
     682          121 :         }
     683              : 
     684           37 :         fn get_shard_identity(&self) -> &ShardIdentity {
     685           37 :             &self.shard
     686           37 :         }
     687              : 
     688          136 :         fn per_timeline_state(&self) -> &PerTimelineState<TestTypes> {
     689          136 :             &self.per_timeline_state
     690          136 :         }
     691              :     }
     692              : 
     693              :     impl TenantManager<TestTypes> for StubManager {
     694           92 :         async fn resolve(
     695           92 :             &self,
     696           92 :             timeline_id: TimelineId,
     697           92 :             shard_selector: ShardSelector,
     698           92 :         ) -> anyhow::Result<Arc<StubTimeline>> {
     699          108 :             for timeline in &self.shards {
     700          100 :                 if timeline.id == timeline_id {
     701            0 :                     match &shard_selector {
     702            0 :                         ShardSelector::Zero if timeline.shard.is_shard_zero() => {
     703            0 :                             return Ok(Arc::clone(timeline));
     704              :                         }
     705            0 :                         ShardSelector::Zero => continue,
     706           76 :                         ShardSelector::Page(key) if timeline.shard.is_key_local(key) => {
     707           76 :                             return Ok(Arc::clone(timeline));
     708              :                         }
     709            0 :                         ShardSelector::Page(_) => continue,
     710           12 :                         ShardSelector::Known(idx) if idx == &timeline.shard.shard_index() => {
     711            8 :                             return Ok(Arc::clone(timeline));
     712              :                         }
     713            4 :                         ShardSelector::Known(_) => continue,
     714              :                     }
     715           12 :                 }
     716              :             }
     717            8 :             anyhow::bail!("not found")
     718           92 :         }
     719              :     }
     720              : 
     721              :     #[tokio::test(start_paused = true)]
     722            4 :     async fn test_timeline_shutdown() {
     723            4 :         crate::tenant::harness::setup_logging();
     724            4 : 
     725            4 :         let timeline_id = TimelineId::generate();
     726            4 :         let shard0 = Arc::new_cyclic(|myself| StubTimeline {
     727            4 :             gate: Default::default(),
     728            4 :             id: timeline_id,
     729            4 :             shard: ShardIdentity::unsharded(),
     730            4 :             per_timeline_state: PerTimelineState::default(),
     731            4 :             myself: myself.clone(),
     732            4 :         });
     733            4 :         let mgr = StubManager {
     734            4 :             shards: vec![shard0.clone()],
     735            4 :         };
     736            4 :         let key = DBDIR_KEY;
     737            4 : 
     738            4 :         let mut cache = Cache::<TestTypes>::default();
     739            4 : 
     740            4 :         //
     741            4 :         // fill the cache
     742            4 :         //
     743            4 :         let handle: Handle<_> = cache
     744            4 :             .get(timeline_id, ShardSelector::Page(key), &mgr)
     745            4 :             .await
     746            4 :             .expect("we have the timeline");
     747            4 :         assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
     748            4 :         assert_eq!(cache.map.len(), 1);
     749            4 :         drop(handle);
     750            4 : 
     751            4 :         //
     752            4 :         // demonstrate that Handle holds up gate closure
     753            4 :         // but shutdown prevents new handles from being handed out
     754            4 :         //
     755            4 : 
     756            4 :         tokio::select! {
     757            4 :             _ = shard0.gate.close() => {
     758            4 :                 panic!("cache and per-timeline handler state keep cache open");
     759            4 :             }
     760            4 :             _ = tokio::time::sleep(FOREVER) => {
     761            4 :                 // NB: first poll of close() makes it enter closing state
     762            4 :             }
     763            4 :         }
     764            4 : 
     765            4 :         let handle = cache
     766            4 :             .get(timeline_id, ShardSelector::Page(key), &mgr)
     767            4 :             .await
     768            4 :             .expect("we have the timeline");
     769            4 :         assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
     770            4 : 
     771            4 :         // SHUTDOWN
     772            4 :         shard0.per_timeline_state.shutdown(); // keeping handle alive across shutdown
     773            4 : 
     774            4 :         assert_eq!(
     775            4 :             cache.map.len(),
     776            4 :             1,
     777            4 :             "this is an implementation detail but worth pointing out: we can't clear the cache from shutdown(), it's cleared on first access after"
     778            4 :         );
     779            4 : 
     780            4 :         // this handle is perfectly usable
     781            4 :         handle.getpage();
     782            4 : 
     783            4 :         cache
     784            4 :             .get(timeline_id, ShardSelector::Page(key), &mgr)
     785            4 :             .await
     786            4 :             .err()
     787            4 :             .expect("documented behavior: can't get new handle after shutdown, even if there is an alive Handle");
     788            4 :         assert_eq!(
     789            4 :             cache.map.len(),
     790            4 :             0,
     791            4 :             "first access after shutdown cleans up the Weak's from the cache"
     792            4 :         );
     793            4 : 
     794            4 :         tokio::select! {
     795            4 :             _ = shard0.gate.close() => {
     796            4 :                 panic!("handle is keeping gate open");
     797            4 :             }
     798            4 :             _ = tokio::time::sleep(FOREVER) => { }
     799            4 :         }
     800            4 : 
     801            4 :         drop(handle);
     802            4 : 
     803            4 :         // closing gate succeeds after dropping handle
     804            4 :         tokio::select! {
     805            4 :             _ = shard0.gate.close() => { }
     806            4 :             _ = tokio::time::sleep(FOREVER) => {
     807            4 :                 panic!("handle is dropped, no other gate holders exist")
     808            4 :             }
     809            4 :         }
     810            4 : 
     811            4 :         // map gets cleaned on next lookup
     812            4 :         cache
     813            4 :             .get(timeline_id, ShardSelector::Page(key), &mgr)
     814            4 :             .await
     815            4 :             .err()
     816            4 :             .expect("documented behavior: can't get new handle after shutdown");
     817            4 :         assert_eq!(cache.map.len(), 0);
     818            4 : 
     819            4 :         // ensure all refs to shard0 are gone and we're not leaking anything
     820            4 :         drop(shard0);
     821            4 :         drop(mgr);
     822            4 :     }
     823              : 
     824              :     #[tokio::test]
     825            4 :     async fn test_multiple_timelines_and_deletion() {
     826            4 :         crate::tenant::harness::setup_logging();
     827            4 : 
     828            4 :         let timeline_a = TimelineId::generate();
     829            4 :         let timeline_b = TimelineId::generate();
     830            4 :         assert_ne!(timeline_a, timeline_b);
     831            4 :         let timeline_a = Arc::new_cyclic(|myself| StubTimeline {
     832            4 :             gate: Default::default(),
     833            4 :             id: timeline_a,
     834            4 :             shard: ShardIdentity::unsharded(),
     835            4 :             per_timeline_state: PerTimelineState::default(),
     836            4 :             myself: myself.clone(),
     837            4 :         });
     838            4 :         let timeline_b = Arc::new_cyclic(|myself| StubTimeline {
     839            4 :             gate: Default::default(),
     840            4 :             id: timeline_b,
     841            4 :             shard: ShardIdentity::unsharded(),
     842            4 :             per_timeline_state: PerTimelineState::default(),
     843            4 :             myself: myself.clone(),
     844            4 :         });
     845            4 :         let mut mgr = StubManager {
     846            4 :             shards: vec![timeline_a.clone(), timeline_b.clone()],
     847            4 :         };
     848            4 :         let key = DBDIR_KEY;
     849            4 : 
     850            4 :         let mut cache = Cache::<TestTypes>::default();
     851            4 : 
     852            4 :         cache
     853            4 :             .get(timeline_a.id, ShardSelector::Page(key), &mgr)
     854            4 :             .await
     855            4 :             .expect("we have it");
     856            4 :         cache
     857            4 :             .get(timeline_b.id, ShardSelector::Page(key), &mgr)
     858            4 :             .await
     859            4 :             .expect("we have it");
     860            4 :         assert_eq!(cache.map.len(), 2);
     861            4 : 
     862            4 :         // delete timeline A
     863            4 :         timeline_a.per_timeline_state.shutdown();
     864            8 :         mgr.shards.retain(|t| t.id != timeline_a.id);
     865            4 :         assert!(
     866            4 :             mgr.resolve(timeline_a.id, ShardSelector::Page(key))
     867            4 :                 .await
     868            4 :                 .is_err(),
     869            4 :             "broken StubManager implementation"
     870            4 :         );
     871            4 : 
     872            4 :         assert_eq!(
     873            4 :             cache.map.len(),
     874            4 :             2,
     875            4 :             "cache still has a Weak handle to Timeline A"
     876            4 :         );
     877            4 :         cache
     878            4 :             .get(timeline_a.id, ShardSelector::Page(key), &mgr)
     879            4 :             .await
     880            4 :             .err()
     881            4 :             .expect("documented behavior: can't get new handle after shutdown");
     882            4 :         assert_eq!(cache.map.len(), 1, "next access cleans up the cache");
     883            4 : 
     884            4 :         cache
     885            4 :             .get(timeline_b.id, ShardSelector::Page(key), &mgr)
     886            4 :             .await
     887            4 :             .expect("we still have it");
     888            4 :     }
     889              : 
     890           28 :     fn make_relation_key_for_shard(shard: ShardNumber, params: &ShardParameters) -> Key {
     891           28 :         rel_block_to_key(
     892           28 :             RelTag {
     893           28 :                 spcnode: 1663,
     894           28 :                 dbnode: 208101,
     895           28 :                 relnode: 2620,
     896           28 :                 forknum: 0,
     897           28 :             },
     898           28 :             shard.0 as u32 * params.stripe_size.0,
     899           28 :         )
     900           28 :     }
     901              : 
     902              :     #[tokio::test(start_paused = true)]
     903            4 :     async fn test_shard_split() {
     904            4 :         crate::tenant::harness::setup_logging();
     905            4 :         let timeline_id = TimelineId::generate();
     906            4 :         let parent = Arc::new_cyclic(|myself| StubTimeline {
     907            4 :             gate: Default::default(),
     908            4 :             id: timeline_id,
     909            4 :             shard: ShardIdentity::unsharded(),
     910            4 :             per_timeline_state: PerTimelineState::default(),
     911            4 :             myself: myself.clone(),
     912            4 :         });
     913            4 :         let child_params = ShardParameters {
     914            4 :             count: ShardCount(2),
     915            4 :             stripe_size: ShardStripeSize::default(),
     916            4 :         };
     917            4 :         let child0 = Arc::new_cyclic(|myself| StubTimeline {
     918            4 :             gate: Default::default(),
     919            4 :             id: timeline_id,
     920            4 :             shard: ShardIdentity::from_params(ShardNumber(0), &child_params),
     921            4 :             per_timeline_state: PerTimelineState::default(),
     922            4 :             myself: myself.clone(),
     923            4 :         });
     924            4 :         let child1 = Arc::new_cyclic(|myself| StubTimeline {
     925            4 :             gate: Default::default(),
     926            4 :             id: timeline_id,
     927            4 :             shard: ShardIdentity::from_params(ShardNumber(1), &child_params),
     928            4 :             per_timeline_state: PerTimelineState::default(),
     929            4 :             myself: myself.clone(),
     930            4 :         });
     931            4 :         let child_shards_by_shard_number = [child0.clone(), child1.clone()];
     932            4 : 
     933            4 :         let mut cache = Cache::<TestTypes>::default();
     934            4 : 
     935            4 :         // fill the cache with the parent
     936           12 :         for i in 0..2 {
     937            8 :             let handle = cache
     938            8 :                 .get(
     939            8 :                     timeline_id,
     940            8 :                     ShardSelector::Page(make_relation_key_for_shard(ShardNumber(i), &child_params)),
     941            8 :                     &StubManager {
     942            8 :                         shards: vec![parent.clone()],
     943            8 :                     },
     944            8 :                 )
     945            8 :                 .await
     946            8 :                 .expect("we have it");
     947            8 :             assert!(
     948            8 :                 Weak::ptr_eq(&handle.myself, &parent.myself),
     949            4 :                 "mgr returns parent first"
     950            4 :             );
     951            8 :             drop(handle);
     952            4 :         }
     953            4 : 
     954            4 :         //
     955            4 :         // SHARD SPLIT: tenant manager changes, but the cache isn't informed
     956            4 :         //
     957            4 : 
     958            4 :         // while we haven't shut down the parent, the cache will return the cached parent, even
     959            4 :         // if the tenant manager returns the child
     960           12 :         for i in 0..2 {
     961            8 :             let handle = cache
     962            8 :                 .get(
     963            8 :                     timeline_id,
     964            8 :                     ShardSelector::Page(make_relation_key_for_shard(ShardNumber(i), &child_params)),
     965            8 :                     &StubManager {
     966            8 :                         shards: vec![], // doesn't matter what's in here, the cache is fully loaded
     967            8 :                     },
     968            8 :                 )
     969            8 :                 .await
     970            8 :                 .expect("we have it");
     971            8 :             assert!(
     972            8 :                 Weak::ptr_eq(&handle.myself, &parent.myself),
     973            4 :                 "mgr returns parent"
     974            4 :             );
     975            8 :             drop(handle);
     976            4 :         }
     977            4 : 
     978            4 :         let parent_handle = cache
     979            4 :             .get(
     980            4 :                 timeline_id,
     981            4 :                 ShardSelector::Page(make_relation_key_for_shard(ShardNumber(0), &child_params)),
     982            4 :                 &StubManager {
     983            4 :                     shards: vec![parent.clone()],
     984            4 :                 },
     985            4 :             )
     986            4 :             .await
     987            4 :             .expect("we have it");
     988            4 :         assert!(Weak::ptr_eq(&parent_handle.myself, &parent.myself));
     989            4 : 
     990            4 :         // invalidate the cache
     991            4 :         parent.per_timeline_state.shutdown();
     992            4 : 
     993            4 :         // the cache will now return the child, even though the parent handle still exists
     994           12 :         for i in 0..2 {
     995            8 :             let handle = cache
     996            8 :                 .get(
     997            8 :                     timeline_id,
     998            8 :                     ShardSelector::Page(make_relation_key_for_shard(ShardNumber(i), &child_params)),
     999            8 :                     &StubManager {
    1000            8 :                         shards: vec![child0.clone(), child1.clone()], // <====== this changed compared to previous loop
    1001            8 :                     },
    1002            8 :                 )
    1003            8 :                 .await
    1004            8 :                 .expect("we have it");
    1005            8 :             assert!(
    1006            8 :                 Weak::ptr_eq(
    1007            8 :                     &handle.myself,
    1008            8 :                     &child_shards_by_shard_number[i as usize].myself
    1009            8 :                 ),
    1010            4 :                 "mgr returns child"
    1011            4 :             );
    1012            8 :             drop(handle);
    1013            4 :         }
    1014            4 : 
    1015            4 :         // all the while the parent handle kept the parent gate open
    1016            4 :         tokio::select! {
    1017            4 :             _ = parent_handle.gate.close() => {
    1018            4 :                 panic!("parent handle is keeping gate open");
    1019            4 :             }
    1020            4 :             _ = tokio::time::sleep(FOREVER) => { }
    1021            4 :         }
    1022            4 :         drop(parent_handle);
    1023            4 :         tokio::select! {
    1024            4 :             _ = parent.gate.close() => { }
    1025            4 :             _ = tokio::time::sleep(FOREVER) => {
    1026            4 :                 panic!("parent handle is dropped, no other gate holders exist")
    1027            4 :             }
    1028            4 :         }
    1029            4 :     }
    1030              : 
    1031              :     #[tokio::test(start_paused = true)]
    1032            4 :     async fn test_connection_handler_exit() {
    1033            4 :         crate::tenant::harness::setup_logging();
    1034            4 :         let timeline_id = TimelineId::generate();
    1035            4 :         let shard0 = Arc::new_cyclic(|myself| StubTimeline {
    1036            4 :             gate: Default::default(),
    1037            4 :             id: timeline_id,
    1038            4 :             shard: ShardIdentity::unsharded(),
    1039            4 :             per_timeline_state: PerTimelineState::default(),
    1040            4 :             myself: myself.clone(),
    1041            4 :         });
    1042            4 :         let mgr = StubManager {
    1043            4 :             shards: vec![shard0.clone()],
    1044            4 :         };
    1045            4 :         let key = DBDIR_KEY;
    1046            4 : 
    1047            4 :         // Simulate 10 connections that's opened, used, and closed
    1048            4 :         let mut used_handles = vec![];
    1049           44 :         for _ in 0..10 {
    1050           40 :             let mut cache = Cache::<TestTypes>::default();
    1051           40 :             let handle = {
    1052           40 :                 let handle = cache
    1053           40 :                     .get(timeline_id, ShardSelector::Page(key), &mgr)
    1054           40 :                     .await
    1055           40 :                     .expect("we have the timeline");
    1056           40 :                 assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
    1057           40 :                 handle
    1058           40 :             };
    1059           40 :             handle.getpage();
    1060           40 :             used_handles.push(Arc::downgrade(&handle.timeline));
    1061            4 :         }
    1062            4 : 
    1063            4 :         // No handles exist, thus gates are closed and don't require shutdown.
    1064            4 :         // Thus the gate should close immediately, even without shutdown.
    1065            4 :         tokio::select! {
    1066            4 :             _ = shard0.gate.close() => { }
    1067            4 :             _ = tokio::time::sleep(FOREVER) => {
    1068            4 :                 panic!("handle is dropped, no other gate holders exist")
    1069            4 :             }
    1070            4 :         }
    1071            4 :     }
    1072              : 
    1073              :     #[tokio::test(start_paused = true)]
    1074            4 :     async fn test_weak_handles() {
    1075            4 :         crate::tenant::harness::setup_logging();
    1076            4 :         let timeline_id = TimelineId::generate();
    1077            4 :         let shard0 = Arc::new_cyclic(|myself| StubTimeline {
    1078            4 :             gate: Default::default(),
    1079            4 :             id: timeline_id,
    1080            4 :             shard: ShardIdentity::unsharded(),
    1081            4 :             per_timeline_state: PerTimelineState::default(),
    1082            4 :             myself: myself.clone(),
    1083            4 :         });
    1084            4 :         let mgr = StubManager {
    1085            4 :             shards: vec![shard0.clone()],
    1086            4 :         };
    1087            4 : 
    1088            4 :         let refcount_start = Arc::strong_count(&shard0);
    1089            4 : 
    1090            4 :         let key = DBDIR_KEY;
    1091            4 : 
    1092            4 :         let mut cache = Cache::<TestTypes>::default();
    1093            4 : 
    1094            4 :         let handle = cache
    1095            4 :             .get(timeline_id, ShardSelector::Page(key), &mgr)
    1096            4 :             .await
    1097            4 :             .expect("we have the timeline");
    1098            4 :         assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
    1099            4 : 
    1100            4 :         let weak_handle = handle.downgrade();
    1101            4 : 
    1102            4 :         drop(handle);
    1103            4 : 
    1104            4 :         let upgraded_handle = weak_handle.upgrade().ok().expect("we can upgrade it");
    1105            4 : 
    1106            4 :         // Start shutdown
    1107            4 :         shard0.per_timeline_state.shutdown();
    1108            4 : 
    1109            4 :         // Upgrades during shutdown don't work, even if upgraded_handle exists.
    1110            4 :         weak_handle
    1111            4 :             .upgrade()
    1112            4 :             .err()
    1113            4 :             .expect("can't upgrade weak handle as soon as shutdown started");
    1114            4 : 
    1115            4 :         // But upgraded_handle is still alive, so the gate won't close.
    1116            4 :         tokio::select! {
    1117            4 :             _ = shard0.gate.close() => {
    1118            4 :                 panic!("handle is keeping gate open");
    1119            4 :             }
    1120            4 :             _ = tokio::time::sleep(FOREVER) => { }
    1121            4 :         }
    1122            4 : 
    1123            4 :         // Drop the last handle.
    1124            4 :         drop(upgraded_handle);
    1125            4 : 
    1126            4 :         // The gate should close now, despite there still being a weak_handle.
    1127            4 :         tokio::select! {
    1128            4 :             _ = shard0.gate.close() => { }
    1129            4 :             _ = tokio::time::sleep(FOREVER) => {
    1130            4 :                 panic!("only strong handle is dropped and we shut down per-timeline-state")
    1131            4 :             }
    1132            4 :         }
    1133            4 : 
    1134            4 :         // The weak handle still can't be upgraded.
    1135            4 :         weak_handle
    1136            4 :             .upgrade()
    1137            4 :             .err()
    1138            4 :             .expect("still shouldn't be able to upgrade the weak handle");
    1139            4 : 
    1140            4 :         // There should be no strong references to the timeline object except the one on "stack".
    1141            4 :         assert_eq!(Arc::strong_count(&shard0), refcount_start);
    1142            4 :     }
    1143              : 
    1144              :     #[tokio::test(start_paused = true)]
    1145            4 :     async fn test_reference_cycle_broken_when_cache_is_dropped() {
    1146            4 :         crate::tenant::harness::setup_logging();
    1147            4 :         let timeline_id = TimelineId::generate();
    1148            4 :         let shard0 = Arc::new_cyclic(|myself| StubTimeline {
    1149            4 :             gate: Default::default(),
    1150            4 :             id: timeline_id,
    1151            4 :             shard: ShardIdentity::unsharded(),
    1152            4 :             per_timeline_state: PerTimelineState::default(),
    1153            4 :             myself: myself.clone(),
    1154            4 :         });
    1155            4 :         let mgr = StubManager {
    1156            4 :             shards: vec![shard0.clone()],
    1157            4 :         };
    1158            4 :         let key = DBDIR_KEY;
    1159            4 : 
    1160            4 :         let mut cache = Cache::<TestTypes>::default();
    1161            4 : 
    1162            4 :         // helper to check if a handle is referenced by per_timeline_state
    1163            8 :         let per_timeline_state_refs_handle = |handle_weak: &Weak<Mutex<HandleInner<_>>>| {
    1164            8 :             let per_timeline_state = shard0.per_timeline_state.handles.lock().unwrap();
    1165            8 :             let per_timeline_state = per_timeline_state.as_ref().unwrap();
    1166            8 :             per_timeline_state
    1167            8 :                 .values()
    1168            8 :                 .any(|v| Weak::ptr_eq(&Arc::downgrade(v), handle_weak))
    1169            8 :         };
    1170            4 : 
    1171            4 :         // Fill the cache.
    1172            4 :         let handle = cache
    1173            4 :             .get(timeline_id, ShardSelector::Page(key), &mgr)
    1174            4 :             .await
    1175            4 :             .expect("we have the timeline");
    1176            4 :         assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
    1177            4 :         let handle_inner_weak = Arc::downgrade(&handle.inner);
    1178            4 :         assert!(
    1179            4 :             per_timeline_state_refs_handle(&handle_inner_weak),
    1180            4 :             "we still hold `handle` _and_ haven't dropped `cache` yet"
    1181            4 :         );
    1182            4 : 
    1183            4 :         // Drop the cache.
    1184            4 :         drop(cache);
    1185            4 : 
    1186            4 :         assert!(
    1187            4 :             !(per_timeline_state_refs_handle(&handle_inner_weak)),
    1188            4 :             "nothing should reference the handle allocation anymore"
    1189            4 :         );
    1190            4 :         assert!(
    1191            4 :             Weak::upgrade(&handle_inner_weak).is_some(),
    1192            4 :             "the local `handle` still keeps the allocation alive"
    1193            4 :         );
    1194            4 :         // but obviously the cache is gone so no new allocations can be handed out.
    1195            4 : 
    1196            4 :         // Drop handle.
    1197            4 :         drop(handle);
    1198            4 :         assert!(
    1199            4 :             Weak::upgrade(&handle_inner_weak).is_none(),
    1200            4 :             "the local `handle` is dropped, so the allocation should be dropped by now"
    1201            4 :         );
    1202            4 :     }
    1203              : 
    1204              :     #[tokio::test(start_paused = true)]
    1205            4 :     async fn test_reference_cycle_broken_when_per_timeline_state_shutdown() {
    1206            4 :         crate::tenant::harness::setup_logging();
    1207            4 :         let timeline_id = TimelineId::generate();
    1208            4 :         let shard0 = Arc::new_cyclic(|myself| StubTimeline {
    1209            4 :             gate: Default::default(),
    1210            4 :             id: timeline_id,
    1211            4 :             shard: ShardIdentity::unsharded(),
    1212            4 :             per_timeline_state: PerTimelineState::default(),
    1213            4 :             myself: myself.clone(),
    1214            4 :         });
    1215            4 :         let mgr = StubManager {
    1216            4 :             shards: vec![shard0.clone()],
    1217            4 :         };
    1218            4 :         let key = DBDIR_KEY;
    1219            4 : 
    1220            4 :         let mut cache = Cache::<TestTypes>::default();
    1221            4 :         let handle = cache
    1222            4 :             .get(timeline_id, ShardSelector::Page(key), &mgr)
    1223            4 :             .await
    1224            4 :             .expect("we have the timeline");
    1225            4 :         // grab a weak reference to the inner so can later try to Weak::upgrade it and assert that fails
    1226            4 :         let handle_inner_weak = Arc::downgrade(&handle.inner);
    1227            4 : 
    1228            4 :         // drop the handle, obviously the lifetime of `inner` is at least as long as each strong reference to it
    1229            4 :         drop(handle);
    1230            4 :         assert!(Weak::upgrade(&handle_inner_weak).is_some(), "can still");
    1231            4 : 
    1232            4 :         // Shutdown the per_timeline_state.
    1233            4 :         shard0.per_timeline_state.shutdown();
    1234            4 :         assert!(Weak::upgrade(&handle_inner_weak).is_none(), "can no longer");
    1235            4 : 
    1236            4 :         // cache only contains Weak's, so, it can outlive the per_timeline_state without
    1237            4 :         // Drop explicitly solely to make this point.
    1238            4 :         drop(cache);
    1239            4 :     }
    1240              : }
        

Generated by: LCOV version 2.1-beta