LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - handle.rs (source / functions)
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info
Test Date: 2025-07-16 12:29:03
Coverage: Lines: 93.9 % (491 of 523 hit) | Functions: 83.9 % (47 of 56 hit)

            Line data    Source code
       1              : //! A cache for [`crate::tenant::mgr`]+`Tenant::get_timeline`+`Timeline::gate.enter()`.
       2              : //!
       3              : //! # Motivation
       4              : //!
       5              : //! On a single page service connection, we're typically serving a single TenantTimelineId.
       6              : //!
       7              : //! Without sharding, there is a single Timeline object to which we dispatch
       8              : //! all requests. For example, a getpage request gets dispatched to the
       9              : //! Timeline::get method of the Timeline object that represents the
      10              : //! (tenant,timeline) of that connection.
      11              : //!
      12              : //! With sharding, for each request that comes in on the connection,
      13              : //! we first have to perform shard routing based on the requested key (=~ page number).
      14              : //! The result of shard routing is a Timeline object.
      15              : //! We then dispatch the request to that Timeline object.
      16              : //!
      17              : //! Regardless of whether the tenant is sharded or not, we want to ensure that
      18              : //! we hold the Timeline gate open while we're invoking the method on the
      19              : //! Timeline object.
      20              : //!
      21              : //! We want to avoid the overhead of doing, for each incoming request,
      22              : //! - tenant manager lookup (global rwlock + btreemap lookup for shard routing)
      23              : //! - cloning the `Arc<Timeline>` out of the tenant manager so we can
      24              : //!   release the mgr rwlock before doing any request processing work
      25              : //! - re-entering the Timeline gate for each Timeline method invocation.
      26              : //!
      27              : //! Regardless of how we accomplish the above, it should not
      28              : //! prevent the Timeline from shutting down promptly.
      29              : //!
      30              : //!
      31              : //! # Design
      32              : //!
      33              : //! ## Data Structures
      34              : //!
      35              : //! There are two concepts expressed as associated types in the `Types` trait:
      36              : //! - `TenantManager`: the thing that performs the expensive work. It produces
      37              : //!   a `Timeline` object, which is the other associated type.
      38              : //! - `Timeline`: the item that we cache for fast (TenantTimelineId,ShardSelector) lookup.
      39              : //!
      40              : //! There are four user-facing data structures exposed by this module:
      41              : //! - `PerTimelineState`: a struct embedded into each Timeline struct. Lifetime == Timeline lifetime.
      42              : //! - `Cache`: a struct private to each connection handler; Lifetime == connection lifetime.
      43              : //! - `Handle`: a smart pointer that derefs to the Types::Timeline.
      44              : //! - `WeakHandle`: downgrade of a `Handle` that does not keep the gate open, but allows
      45              : //!   trying to upgrade back to a `Handle`. If successful, a re-upgraded Handle will always
      46              : //!   point to the same cached `Types::Timeline`. Upgrades never invoke the `TenantManager`.
      47              : //!
      48              : //! Internally, there is 0 or 1 `HandleInner` per `(Cache,Timeline)`.
      49              : //! Since Cache:Connection is 1:1, there is 0 or 1 `HandleInner` per `(Connection,Timeline)`.
      50              : //!
      51              : //! The `HandleInner` is allocated as an `Arc<Mutex<HandleInner>>` and
      52              : //! referenced weakly and strongly from various places, which we illustrate below.
      53              : //! For brevity, we will omit the `Arc<Mutex<>>` part in the following and instead
      54              : //! use `strong ref` and `weak ref` when referring to the `Arc<Mutex<HandleInner>>`
      55              : //! or `Weak<Mutex<HandleInner>>`, respectively.
      56              : //!
      57              : //! - The `Handle` is a strong ref.
      58              : //! - The `WeakHandle` is a weak ref.
      59              : //! - The `PerTimelineState` contains a `HashMap<CacheId, strong ref>`.
      60              : //! - The `Cache` is a `HashMap<unique identifier for the shard, weak ref>`.
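                       : //!
                       : //! As a rough sketch (not normative), who holds which kind of reference:
                       : //!
                       : //! ```text
                       : //! Cache (per connection)           --weak-->    Mutex<HandleInner>
                       : //! PerTimelineState (per Timeline)  --strong-->  Mutex<HandleInner>
                       : //! Handle                           --strong-->  Mutex<HandleInner>
                       : //! WeakHandle                       --weak-->    Mutex<HandleInner>
                       : //! HandleInner::Open                --strong-->  Arc<Types::Timeline>
                       : //! ```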
      61              : //!
      62              : //! Lifetimes:
      63              : //! - `WeakHandle` and `Handle`: single pagestream request.
      64              : //! - `Cache`: single page service connection.
      65              : //! - `PerTimelineState`: lifetime of the Timeline object (i.e., until `Timeline::shutdown`).
      66              : //!
      67              : //! ## Request Handling Flow (= filling and using the `Cache`)
      68              : //!
      69              : //! To dispatch a request, the page service connection calls `Cache::get`.
      70              : //!
      71              : //! A cache miss means we call Types::TenantManager::resolve for shard routing,
      72              : //! cloning the `Arc<Timeline>` out of it, and entering the gate. The result of
      73              : //! resolve() is the object we want to cache and hand out `Handle`s to for subsequent `Cache::get` calls.
      74              : //!
      75              : //! We wrap the object returned from resolve() in an `Arc` and store that inside the
      76              : //! `Arc<Mutex<HandleInner>>`. A weak ref to the HandleInner is stored in the `Cache`
      77              : //! and a strong ref in the `PerTimelineState`.
      78              : //! Another strong ref is returned wrapped in a `Handle`.
      79              : //!
      80              : //! For subsequent requests, `Cache::get` will perform a "fast path" shard routing
      81              : //! and find the weak ref in the cache.
      82              : //! We upgrade the weak ref to a strong ref and return it wrapped in a `Handle`.
      83              : //!
      84              : //! The pagestream processing is pipelined and involves a batching step.
      85              : //! While a request is batching, the `Handle` is downgraded to a `WeakHandle`.
      86              : //! When the batch is ready to be executed, the `WeakHandle` is upgraded back to a `Handle`
      87              : //! and the request handler dispatches the request to the right `<Handle as Deref<Target = Timeline>>::$request_method`.
      88              : //! It then drops the `Handle`, and thus the `Arc<Mutex<HandleInner>>` inside it.
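                       : //!
                       : //! As a condensed, illustrative sketch of that flow (error handling elided;
                       : //! `MyTypes`, `tenant_manager`, and `some_timeline_method` are stand-in names):
                       : //!
                       : //! ```ignore
                       : //! let mut cache = Cache::<MyTypes>::default(); // one per page_service connection
                       : //!
                       : //! // Miss: calls TenantManager::resolve, enters the gate, fills the cache.
                       : //! // Hit: upgrades the cached weak ref; the tenant manager is not consulted.
                       : //! let handle: Handle<MyTypes> = cache
                       : //!     .get(timeline_id, ShardSelector::Page(key), &tenant_manager)
                       : //!     .await?;
                       : //!
                       : //! // While the request sits in a batch, hold only a WeakHandle;
                       : //! // it does not by itself keep the gate open.
                       : //! let weak: WeakHandle<MyTypes> = handle.downgrade();
                       : //! drop(handle);
                       : //!
                       : //! // When the batch executes, upgrade again; this fails if the
                       : //! // Timeline has shut down in the meantime.
                       : //! let handle = weak.upgrade()?;
                       : //! handle.some_timeline_method(); // Handle derefs to Types::Timeline
                       : //! ```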
      89              : //!
      90              : //! # Performance
      91              : //!
      92              : //! Remember from the introductory section:
      93              : //!
      94              : //! > We want to avoid the overhead of doing, for each incoming request,
      95              : //! > - tenant manager lookup (global rwlock + btreemap lookup for shard routing)
      96              : //! > - cloning the `Arc<Timeline>` out of the tenant manager so we can
      97              : //! >   release the mgr rwlock before doing any request processing work
      98              : //! > - re-entering the Timeline gate for each Timeline method invocation.
      99              : //!
     100              : //! All of these boil down to some state that is either globally shared among all shards
     101              : //! or state shared among all tasks that serve a particular timeline.
     102              : //! It is either protected by RwLock or manipulated via atomics.
     103              : //! Even atomics are costly when shared across multiple cores.
     104              : //! So, we want to avoid any permanent need for coordination between page_service tasks.
     105              : //!
     106              : //! The solution is to add indirection: we wrap the Types::Timeline object that is
      107              : //! returned by Types::TenantManager into an Arc that is private to the `HandleInner`
     108              : //! and hence to the single Cache / page_service connection.
     109              : //! (Review the "Data Structures" section if that is unclear to you.)
     110              : //!
     111              : //!
     112              : //! When upgrading a `WeakHandle`, we upgrade its weak to a strong ref (of the `Mutex<HandleInner>`),
     113              : //! lock the mutex, take out a clone of the `Arc<Types::Timeline>`, and drop the Mutex.
     114              : //! The Mutex is not contended because it is private to the connection.
      115              : //! And again, the `Arc<Types::Timeline>` clone is cheap because that wrapper
     116              : //! Arc's refcounts are private to the connection.
     117              : //!
      118              : //! Downgrading drops these two Arcs, which again manipulates refcounts that are private to the connection.
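                       : //!
                       : //! Schematically (a restatement of the above, not an additional guarantee):
                       : //!
                       : //! ```text
                       : //! WeakHandle::upgrade():
                       : //!   1. Weak::upgrade the weak ref      -> strong ref to the Mutex<HandleInner>
                       : //!   2. lock the Mutex                   (uncontended: private to this connection)
                       : //!   3. clone the Arc<Types::Timeline>   (refcounts private to this connection)
                       : //!   4. drop the lock
                       : //!
                       : //! Handle::downgrade() / drop(Handle): drop both Arcs again.
                       : //! ```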
     119              : //!
     120              : //!
     121              : //! # Shutdown
     122              : //!
     123              : //! The attentive reader may have noticed the following reference cycle around the `Arc<Timeline>`:
     124              : //!
     125              : //! ```text
     126              : //! Timeline --owns--> PerTimelineState --strong--> HandleInner --strong--> Types::Timeline --strong--> Timeline
     127              : //! ```
     128              : //!
     129              : //! Further, there is this cycle:
     130              : //!
     131              : //! ```text
     132              : //! Timeline --owns--> PerTimelineState --strong--> HandleInner --strong--> Types::Timeline --strong--> GateGuard --keepalive--> Timeline
     133              : //! ```
     134              : //!
     135              : //! The former cycle is a memory leak if not broken.
     136              : //! The latter cycle further prevents the Timeline from shutting down
     137              : //! because we certainly won't drop the Timeline while the GateGuard is alive.
     138              : //! Preventing shutdown is the whole point of this handle/cache system,
     139              : //! but when the Timeline needs to shut down, we need to break the cycle.
     140              : //!
     141              : //! The cycle is broken by either
     142              : //! - Timeline shutdown (=> `PerTimelineState::shutdown`)
     143              : //! - Connection shutdown (=> dropping the `Cache`).
     144              : //!
     145              : //! Both transition the `HandleInner` from [`HandleInner::Open`] to
     146              : //! [`HandleInner::ShutDown`], which drops the only long-lived
      147              : //! `Arc<Types::Timeline>`. Once the last short-lived `Arc<Types::Timeline>`
      148              : //! is dropped, the `Types::Timeline` gets dropped, and with it
      149              : //! the `GateGuard` and the `Arc<Timeline>` that it stores,
      150              : //! thereby breaking both cycles.
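                       : //!
                       : //! As a sketch, the state transition that both paths drive is:
                       : //!
                       : //! ```text
                       : //! HandleInner::Open(Arc<Types::Timeline>)
                       : //!   --(PerTimelineState::shutdown  or  Drop for Cache)-->
                       : //! HandleInner::ShutDown   // the long-lived Arc<Types::Timeline> is dropped here
                       : //! ```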
     151              : //!
      152              : //! `PerTimelineState::shutdown` drops all the `HandleInner`s it contains,
     153              : //! thereby breaking the cycle.
     154              : //! It also initiates draining of already existing `Handle`s by
      155              : //! poisoning things so that no new `HandleInner`s can be added
     156              : //! to the `PerTimelineState`, which will make subsequent `Cache::get` fail.
     157              : //!
     158              : //! Concurrently existing / already upgraded `Handle`s will extend the
      159              : //! lifetime of the `Arc<Mutex<HandleInner>>` and hence the cycles.
     160              : //! However, since `Handle`s are short-lived and new `Handle`s are not
     161              : //! handed out from `Cache::get` or `WeakHandle::upgrade` after
     162              : //! `PerTimelineState::shutdown`, that extension of the cycle is bounded.
     163              : //!
     164              : //! Concurrently existing `WeakHandle`s will fail to `upgrade()`:
     165              : //! while they will succeed in upgrading `Weak<Mutex<HandleInner>>`,
      166              : //! they will find the inner in the `HandleInner::ShutDown` state, where the
      167              : //! `Arc<GateGuard>` and the `Types::Timeline` have already been dropped.
     168              : //!
      169              : //! Dropping the `Cache` undoes the registration of this `Cache`'s
      170              : //! `HandleInner`s in the `PerTimelineState`s, i.e., it
      171              : //! removes the strong ref to each of its `HandleInner`s
      172              : //! from the respective `PerTimelineState`.
     173              : //!
     174              : //! # Locking Rules
     175              : //!
     176              : //! To prevent deadlocks we:
     177              : //!
     178              : //! 1. Only ever hold one of the locks at a time.
     179              : //! 2. Don't add more than one Drop impl that locks on the
     180              : //!    cycles above.
     181              : //!
     182              : //! As per (2), that impl is in `Drop for Cache`.
     183              : //!
     184              : //! # Fast Path for Shard Routing
     185              : //!
     186              : //! The `Cache` has a fast path for shard routing to avoid calling into
     187              : //! the tenant manager for every request.
     188              : //!
     189              : //! The `Cache` maintains a hash map of `ShardTimelineId` to `WeakHandle`s.
     190              : //!
     191              : //! The current implementation uses the first entry in the hash map
     192              : //! to determine the `ShardParameters` and derive the correct
     193              : //! `ShardIndex` for the requested key.
     194              : //!
     195              : //! It then looks up the hash map for that `ShardTimelineId := {ShardIndex,TimelineId}`.
     196              : //!
     197              : //! If the lookup is successful and the `WeakHandle` can be upgraded,
     198              : //! it's a hit.
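                       : //!
                       : //! Sketched in pseudocode (the authoritative logic is `Cache::shard_routing` below):
                       : //!
                       : //! ```text
                       : //! (first_key, first_handle) = any entry of the map    // else: consult tenant manager
                       : //! need_idx = shard index for the requested key, per first_handle.get_shard_identity()
                       : //! need_key = ShardTimelineId { shard_index: need_idx, timeline_id }
                       : //! if need_key == first_handle.shard_timeline_id()  => fast path: reuse first_handle
                       : //! else                                              => slow path: look up map[need_key]
                       : //! ```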
     199              : //!
     200              : //! ## Cache invalidation
     201              : //!
      202              : //! The insight is that cache invalidation is sufficient and most efficient if done lazily.
      203              : //! The only reasons why an entry in the cache can become stale are:
      204              : //! 1. The `PerTimelineState` / Timeline is shutting down, e.g. because the shard is
      205              : //!    being detached, the timeline or shard is being deleted, or the pageserver is shutting down.
     206              : //! 2. We're doing a shard split and new traffic should be routed to the child shards.
     207              : //!
     208              : //! Regarding (1), we will eventually fail to upgrade the `WeakHandle` once the
     209              : //! timeline has shut down, and when that happens, we remove the entry from the cache.
     210              : //!
      211              : //! Regarding (2), the insight is that it is totally fine to keep dispatching requests
     212              : //! to the parent shard during a shard split. Eventually, the shard split task will
     213              : //! shut down the parent => case (1).
     214              : 
     215              : use std::collections::HashMap;
     216              : use std::collections::hash_map;
     217              : use std::sync::Arc;
     218              : use std::sync::Mutex;
     219              : use std::sync::Weak;
     220              : use std::time::Duration;
     221              : 
     222              : use pageserver_api::shard::ShardIdentity;
     223              : use tracing::{instrument, trace};
     224              : use utils::id::TimelineId;
     225              : use utils::shard::{ShardIndex, ShardNumber};
     226              : 
     227              : use crate::tenant::mgr::ShardSelector;
     228              : 
     229              : /// The requirement for Debug is so that #[derive(Debug)] works in some places.
     230              : pub(crate) trait Types: Sized + std::fmt::Debug {
     231              :     type TenantManagerError: Sized + std::fmt::Debug;
     232              :     type TenantManager: TenantManager<Self> + Sized;
     233              :     type Timeline: Timeline<Self> + Sized;
     234              : }
     235              : 
     236              : /// Uniquely identifies a [`Cache`] instance over the lifetime of the process.
     237              : /// Required so [`Cache::drop`] can take out the handles from the [`PerTimelineState`].
      238              : /// An alternative would be to allocate the [`Cache`] in a `Box` and identify it by its pointer.
     239              : #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
     240              : struct CacheId(u64);
     241              : 
     242              : impl CacheId {
     243           16 :     fn next() -> Self {
     244              :         static NEXT_ID: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
     245           16 :         let id = NEXT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
     246           16 :         if id == 0 {
      247            0 :             panic!("CacheId::next() returned 0, overflow");
     248           16 :         }
     249           16 :         Self(id)
     250           16 :     }
     251              : }
     252              : 
     253              : /// See module-level comment.
     254              : pub(crate) struct Cache<T: Types> {
     255              :     id: CacheId,
     256              :     map: Map<T>,
     257              : }
     258              : 
     259              : type Map<T> = HashMap<ShardTimelineId, WeakHandle<T>>;
     260              : 
     261              : impl<T: Types> Default for Cache<T> {
     262           16 :     fn default() -> Self {
     263           16 :         Self {
     264           16 :             id: CacheId::next(),
     265           16 :             map: Default::default(),
     266           16 :         }
     267           16 :     }
     268              : }
     269              : 
     270              : #[derive(PartialEq, Eq, Debug, Hash, Clone, Copy)]
     271              : pub(crate) struct ShardTimelineId {
     272              :     pub(crate) shard_index: ShardIndex,
     273              :     pub(crate) timeline_id: TimelineId,
     274              : }
     275              : 
     276              : /// See module-level comment.
     277              : pub(crate) struct Handle<T: Types> {
     278              :     inner: Arc<Mutex<HandleInner<T>>>,
     279              :     open: Arc<T::Timeline>,
     280              : }
     281              : pub(crate) struct WeakHandle<T: Types> {
     282              :     inner: Weak<Mutex<HandleInner<T>>>,
     283              : }
     284              : 
     285              : enum HandleInner<T: Types> {
     286              :     Open(Arc<T::Timeline>),
     287              :     ShutDown,
     288              : }
     289              : 
     290              : /// Embedded in each [`Types::Timeline`] as the anchor for the only long-lived strong ref to `HandleInner`.
     291              : ///
     292              : /// See module-level comment for details.
     293              : pub struct PerTimelineState<T: Types> {
     294              :     // None = shutting down
     295              :     #[allow(clippy::type_complexity)]
     296              :     handles: Mutex<Option<HashMap<CacheId, Arc<Mutex<HandleInner<T>>>>>>,
     297              : }
     298              : 
     299              : impl<T: Types> Default for PerTimelineState<T> {
     300          245 :     fn default() -> Self {
     301          245 :         Self {
     302          245 :             handles: Mutex::new(Some(Default::default())),
     303          245 :         }
     304          245 :     }
     305              : }
     306              : 
     307              : /// Abstract view of [`crate::tenant::mgr`], for testability.
     308              : pub(crate) trait TenantManager<T: Types> {
     309              :     /// Invoked by [`Cache::get`] to resolve a [`ShardTimelineId`] to a [`Types::Timeline`].
     310              :     /// Errors are returned as [`GetError::TenantManager`].
     311              :     async fn resolve(
     312              :         &self,
     313              :         timeline_id: TimelineId,
     314              :         shard_selector: ShardSelector,
     315              :     ) -> Result<T::Timeline, T::TenantManagerError>;
     316              : }
     317              : 
     318              : /// Abstract view of an [`Arc<Timeline>`], for testability.
     319              : pub(crate) trait Timeline<T: Types> {
     320              :     fn shard_timeline_id(&self) -> ShardTimelineId;
     321              :     fn get_shard_identity(&self) -> &ShardIdentity;
     322              :     fn per_timeline_state(&self) -> &PerTimelineState<T>;
     323              : }
     324              : 
     325              : /// Errors returned by [`Cache::get`].
     326              : #[derive(Debug)]
     327              : pub(crate) enum GetError<T: Types> {
     328              :     TenantManager(T::TenantManagerError),
     329              :     PerTimelineStateShutDown,
     330              : }
     331              : 
     332              : /// Internal type used in [`Cache::get`].
     333              : enum RoutingResult<T: Types> {
     334              :     FastPath(Handle<T>),
     335              :     SlowPath(ShardTimelineId),
     336              :     NeedConsultTenantManager,
     337              : }
     338              : 
     339              : impl<T: Types> Cache<T> {
     340              :     /* BEGIN_HADRON */
      341              :     /// A retrying wrapper around `do_get` that resolves the tenant shard for a get page request.
     342              :     #[instrument(level = "trace", skip_all)]
     343              :     pub(crate) async fn get(
     344              :         &mut self,
     345              :         timeline_id: TimelineId,
     346              :         shard_selector: ShardSelector,
     347              :         tenant_manager: &T::TenantManager,
     348              :     ) -> Result<Handle<T>, GetError<T>> {
     349              :         const GET_MAX_RETRIES: usize = 10;
     350              :         const RETRY_BACKOFF: Duration = Duration::from_millis(100);
     351              :         let mut attempt = 0;
     352              :         loop {
     353              :             attempt += 1;
     354              :             match self
     355              :                 .do_get(timeline_id, shard_selector, tenant_manager)
     356              :                 .await
     357              :             {
     358              :                 Ok(handle) => return Ok(handle),
     359              :                 Err(e) => {
     360              :                     // Retry on tenant manager error to handle tenant split more gracefully
     361              :                     if attempt < GET_MAX_RETRIES {
     362              :                         tokio::time::sleep(RETRY_BACKOFF).await;
     363              :                         continue;
     364              :                     } else {
     365              :                         tracing::warn!(
     366              :                             "Failed to resolve tenant shard after {} attempts: {:?}",
     367              :                             GET_MAX_RETRIES,
     368              :                             e
     369              :                         );
     370              :                         return Err(e);
     371              :                     }
     372              :                 }
     373              :             }
     374              :         }
     375              :     }
     376              :     /* END_HADRON */
     377              : 
     378              :     /// See module-level comment for details.
     379              :     ///
     380              :     /// Does NOT check for the shutdown state of [`Types::Timeline`].
     381              :     /// Instead, the methods of [`Types::Timeline`] that are invoked through
     382              :     /// the [`Handle`] are responsible for checking these conditions
      383              :     /// and, if it has shut down, returning an error that causes the page service to
     384              :     /// close the connection.
     385              :     #[instrument(level = "trace", skip_all)]
     386              :     async fn do_get(
     387              :         &mut self,
     388              :         timeline_id: TimelineId,
     389              :         shard_selector: ShardSelector,
     390              :         tenant_manager: &T::TenantManager,
     391              :     ) -> Result<Handle<T>, GetError<T>> {
      392              :         // terminates because every iteration either returns or removes an element from the map
     393              :         let miss: ShardSelector = loop {
     394              :             let routing_state = self.shard_routing(timeline_id, shard_selector);
     395              :             match routing_state {
     396              :                 RoutingResult::FastPath(handle) => return Ok(handle),
     397              :                 RoutingResult::SlowPath(key) => match self.map.get(&key) {
     398              :                     Some(cached) => match cached.upgrade() {
     399              :                         Ok(upgraded) => return Ok(upgraded),
     400              :                         Err(HandleUpgradeError::ShutDown) => {
     401              :                             // TODO: dedup with shard_routing()
     402              :                             trace!("handle cache stale");
     403              :                             self.map.remove(&key).unwrap();
     404              :                             continue;
     405              :                         }
     406              :                     },
     407              :                     None => break ShardSelector::Known(key.shard_index),
     408              :                 },
     409              :                 RoutingResult::NeedConsultTenantManager => break shard_selector,
     410              :             }
     411              :         };
     412              :         self.get_miss(timeline_id, miss, tenant_manager).await
     413              :     }
     414              : 
     415              :     #[inline(always)]
     416           56 :     fn shard_routing(
     417           56 :         &mut self,
     418           56 :         timeline_id: TimelineId,
     419           56 :         shard_selector: ShardSelector,
     420           56 :     ) -> RoutingResult<T> {
     421              :         loop {
      422              :             // terminates because every iteration either returns or removes an element from the map
     423           58 :             let Some((first_key, first_handle)) = self.map.iter().next() else {
     424           37 :                 return RoutingResult::NeedConsultTenantManager;
     425              :             };
     426           21 :             let Ok(first_handle) = first_handle.upgrade() else {
     427              :                 // TODO: dedup with get()
     428            2 :                 trace!("handle cache stale");
     429            2 :                 let first_key_owned = *first_key;
     430            2 :                 self.map.remove(&first_key_owned).unwrap();
     431            2 :                 continue;
     432              :             };
     433              : 
     434           19 :             let first_handle_shard_identity = first_handle.get_shard_identity();
     435           19 :             let make_shard_index = |shard_num: ShardNumber| ShardIndex {
     436           19 :                 shard_number: shard_num,
     437           19 :                 shard_count: first_handle_shard_identity.count,
     438           19 :             };
     439              : 
     440           19 :             let need_idx = match shard_selector {
     441           19 :                 ShardSelector::Page(key) => {
     442           19 :                     make_shard_index(first_handle_shard_identity.get_shard_number(&key))
     443              :                 }
     444            0 :                 ShardSelector::Zero => make_shard_index(ShardNumber(0)),
     445            0 :                 ShardSelector::Known(shard_idx) => shard_idx,
     446              :             };
     447           19 :             let need_shard_timeline_id = ShardTimelineId {
     448           19 :                 shard_index: need_idx,
     449           19 :                 timeline_id,
     450           19 :             };
     451           19 :             let first_handle_shard_timeline_id = ShardTimelineId {
     452           19 :                 shard_index: first_handle_shard_identity.shard_index(),
     453           19 :                 timeline_id: first_handle.shard_timeline_id().timeline_id,
     454           19 :             };
     455              : 
     456           19 :             if need_shard_timeline_id == first_handle_shard_timeline_id {
     457            6 :                 return RoutingResult::FastPath(first_handle);
     458              :             } else {
     459           13 :                 return RoutingResult::SlowPath(need_shard_timeline_id);
     460              :             }
     461              :         }
     462           56 :     }
     463              : 
     464              :     #[instrument(level = "trace", skip_all)]
     465              :     #[inline(always)]
     466              :     async fn get_miss(
     467              :         &mut self,
     468              :         timeline_id: TimelineId,
     469              :         shard_selector: ShardSelector,
     470              :         tenant_manager: &T::TenantManager,
     471              :     ) -> Result<Handle<T>, GetError<T>> {
     472              :         match tenant_manager.resolve(timeline_id, shard_selector).await {
     473              :             Ok(timeline) => {
     474              :                 let key = timeline.shard_timeline_id();
     475              :                 match &shard_selector {
     476              :                     ShardSelector::Zero => assert_eq!(key.shard_index.shard_number, ShardNumber(0)),
     477              :                     ShardSelector::Page(_) => (), // gotta trust tenant_manager
     478              :                     ShardSelector::Known(idx) => assert_eq!(idx, &key.shard_index),
     479              :                 }
     480              : 
     481              :                 trace!("creating new HandleInner");
     482              :                 let timeline = Arc::new(timeline);
     483              :                 let handle_inner_arc =
     484              :                     Arc::new(Mutex::new(HandleInner::Open(Arc::clone(&timeline))));
     485              :                 let handle_weak = WeakHandle {
     486              :                     inner: Arc::downgrade(&handle_inner_arc),
     487              :                 };
     488              :                 let handle = handle_weak
     489              :                     .upgrade()
     490              :                     .ok()
     491              :                     .expect("we just created it and it's not linked anywhere yet");
     492              :                 {
     493              :                     let mut lock_guard = timeline
     494              :                         .per_timeline_state()
     495              :                         .handles
     496              :                         .lock()
     497              :                         .expect("mutex poisoned");
     498              :                     match &mut *lock_guard {
     499              :                         Some(per_timeline_state) => {
     500              :                             let replaced =
     501              :                                 per_timeline_state.insert(self.id, Arc::clone(&handle_inner_arc));
     502              :                             assert!(replaced.is_none(), "some earlier code left a stale handle");
     503              :                             match self.map.entry(key) {
     504              :                                 hash_map::Entry::Occupied(_o) => {
      505              :                                     // This cannot happen because
     506              :                                     // 1. we're the _miss_ handle, i.e., `self.map` didn't contain an entry and
     507              :                                     // 2. we were holding &mut self during .resolve().await above, so, no other thread can have inserted a handle
     508              :                                     //    while we were waiting for the tenant manager.
     509              :                                     unreachable!()
     510              :                                 }
     511              :                                 hash_map::Entry::Vacant(v) => {
     512              :                                     v.insert(handle_weak);
     513              :                                 }
     514              :                             }
     515              :                         }
     516              :                         None => {
     517              :                             return Err(GetError::PerTimelineStateShutDown);
     518              :                         }
     519              :                     }
     520              :                 }
     521              :                 Ok(handle)
     522              :             }
     523              :             Err(e) => Err(GetError::TenantManager(e)),
     524              :         }
     525              :     }
     526              : }
     527              : 
     528              : pub(crate) enum HandleUpgradeError {
     529              :     ShutDown,
     530              : }
     531              : 
     532              : impl<T: Types> WeakHandle<T> {
     533           54 :     pub(crate) fn upgrade(&self) -> Result<Handle<T>, HandleUpgradeError> {
     534           54 :         let Some(inner) = Weak::upgrade(&self.inner) else {
     535            2 :             return Err(HandleUpgradeError::ShutDown);
     536              :         };
     537           52 :         let lock_guard = inner.lock().expect("poisoned");
     538           52 :         match &*lock_guard {
     539           49 :             HandleInner::Open(open) => {
     540           49 :                 let open = Arc::clone(open);
     541           49 :                 drop(lock_guard);
     542           49 :                 Ok(Handle { open, inner })
     543              :             }
     544            3 :             HandleInner::ShutDown => Err(HandleUpgradeError::ShutDown),
     545              :         }
     546           54 :     }
     547              : 
     548            0 :     pub(crate) fn is_same_handle_as(&self, other: &WeakHandle<T>) -> bool {
     549            0 :         Weak::ptr_eq(&self.inner, &other.inner)
     550            0 :     }
     551              : }
     552              : 
     553              : impl<T: Types> std::ops::Deref for Handle<T> {
     554              :     type Target = T::Timeline;
     555           71 :     fn deref(&self) -> &Self::Target {
     556           71 :         &self.open
     557           71 :     }
     558              : }
     559              : 
     560              : impl<T: Types> Handle<T> {
     561            1 :     pub(crate) fn downgrade(&self) -> WeakHandle<T> {
     562            1 :         WeakHandle {
     563            1 :             inner: Arc::downgrade(&self.inner),
     564            1 :         }
     565            1 :     }
     566              : }
     567              : 
     568              : impl<T: Types> PerTimelineState<T> {
     569              :     /// After this method returns, [`Cache::get`] will never again return a [`Handle`]
     570              :     /// to the [`Types::Timeline`] that embeds this per-timeline state.
     571              :     /// Even if [`TenantManager::resolve`] would still resolve to it.
     572              :     ///
      573              :     /// Already-alive [`Handle`]s will remain open, usable, and keep the [`Types::Timeline`] alive.
     574              :     /// That's ok because they're short-lived. See module-level comment for details.
     575              :     #[instrument(level = "trace", skip_all)]
     576              :     pub(super) fn shutdown(&self) {
     577              :         let handles = self
     578              :             .handles
     579              :             .lock()
     580              :             .expect("mutex poisoned")
     581              :             // NB: this .take() sets locked to None.
     582              :             // That's what makes future `Cache::get` misses fail.
     583              :             // Cache hits are taken care of below.
     584              :             .take();
     585              :         let Some(handles) = handles else {
     586              :             trace!("already shut down");
     587              :             return;
     588              :         };
     589              :         for handle_inner_arc in handles.values() {
     590              :             // Make hits fail.
     591              :             let mut lock_guard = handle_inner_arc.lock().expect("poisoned");
     592              :             lock_guard.shutdown();
     593              :         }
     594              :         drop(handles);
     595              :     }
     596              : }
     597              : 
     598              : // When dropping a [`Cache`], prune its handles in the [`PerTimelineState`] to break the reference cycle.
     599              : impl<T: Types> Drop for Cache<T> {
     600           16 :     fn drop(&mut self) {
     601              :         for (
     602              :             _,
     603              :             WeakHandle {
     604           16 :                 inner: handle_inner_weak,
     605              :             },
     606           16 :         ) in self.map.drain()
     607              :         {
     608           16 :             let Some(handle_inner_arc) = handle_inner_weak.upgrade() else {
     609            2 :                 continue;
     610              :             };
     611           14 :             let Some(handle_timeline) = handle_inner_arc
     612           14 :                 // locking rules: drop lock before acquiring other lock below
     613           14 :                 .lock()
     614           14 :                 .expect("poisoned")
     615           14 :                 .shutdown()
     616              :             else {
     617              :                 // Concurrent PerTimelineState::shutdown.
     618            0 :                 continue;
     619              :             };
     620              :             // Clean up per_timeline_state so the HandleInner allocation can be dropped.
     621           14 :             let per_timeline_state = handle_timeline.per_timeline_state();
     622           14 :             let mut handles_lock_guard = per_timeline_state.handles.lock().expect("mutex poisoned");
     623           14 :             let Some(handles) = &mut *handles_lock_guard else {
     624            0 :                 continue;
     625              :             };
     626           14 :             let Some(removed_handle_inner_arc) = handles.remove(&self.id) else {
     627              :                 // Concurrent PerTimelineState::shutdown.
     628            0 :                 continue;
     629              :             };
     630           14 :             drop(handles_lock_guard); // locking rules!
     631           14 :             assert!(Arc::ptr_eq(&removed_handle_inner_arc, &handle_inner_arc));
     632              :         }
     633           16 :     }
     634              : }
     635              : 
     636              : impl<T: Types> HandleInner<T> {
     637           19 :     fn shutdown(&mut self) -> Option<Arc<T::Timeline>> {
     638           19 :         match std::mem::replace(self, HandleInner::ShutDown) {
     639           19 :             HandleInner::Open(timeline) => Some(timeline),
     640              :             HandleInner::ShutDown => {
     641              :                 // Duplicate shutdowns are possible because both Cache::drop and PerTimelineState::shutdown
     642              :                 // may do it concurrently, but locking rules disallow holding per-timeline-state lock and
     643              :                 // the handle lock at the same time.
     644            0 :                 None
     645              :             }
     646              :         }
     647           19 :     }
     648              : }
     649              : 
     650              : #[cfg(test)]
     651              : mod tests {
     652              :     use std::sync::Weak;
     653              : 
     654              :     use pageserver_api::key::{DBDIR_KEY, Key, rel_block_to_key};
     655              :     use pageserver_api::models::ShardParameters;
     656              :     use pageserver_api::reltag::RelTag;
     657              :     use pageserver_api::shard::ShardStripeSize;
     658              :     use utils::shard::ShardCount;
     659              :     use utils::sync::gate::GateGuard;
     660              : 
     661              :     use super::*;
     662              : 
     663              :     const FOREVER: std::time::Duration = std::time::Duration::from_secs(u64::MAX);
     664              : 
     665              :     #[derive(Debug)]
     666              :     struct TestTypes;
     667              :     impl Types for TestTypes {
     668              :         type TenantManagerError = anyhow::Error;
     669              :         type TenantManager = StubManager;
     670              :         type Timeline = Entered;
     671              :     }
     672              : 
     673              :     struct StubManager {
     674              :         shards: Vec<Arc<StubTimeline>>,
     675              :     }
     676              : 
     677              :     struct StubTimeline {
     678              :         gate: utils::sync::gate::Gate,
     679              :         id: TimelineId,
     680              :         shard: ShardIdentity,
     681              :         per_timeline_state: PerTimelineState<TestTypes>,
     682              :         myself: Weak<StubTimeline>,
     683              :     }
     684              : 
     685              :     struct Entered {
     686              :         timeline: Arc<StubTimeline>,
     687              :         #[allow(dead_code)] // it's stored here to keep the gate open
     688              :         gate_guard: Arc<GateGuard>,
     689              :     }
     690              : 
     691              :     impl StubTimeline {
     692           11 :         fn getpage(&self) {
     693              :             // do nothing
     694           11 :         }
     695              :     }
     696              : 
     697              :     impl Timeline<TestTypes> for Entered {
     698           48 :         fn shard_timeline_id(&self) -> ShardTimelineId {
     699           48 :             ShardTimelineId {
     700           48 :                 shard_index: self.shard.shard_index(),
     701           48 :                 timeline_id: self.id,
     702           48 :             }
     703           48 :         }
     704              : 
     705           19 :         fn get_shard_identity(&self) -> &ShardIdentity {
     706           19 :             &self.shard
     707           19 :         }
     708              : 
     709           43 :         fn per_timeline_state(&self) -> &PerTimelineState<TestTypes> {
     710           43 :             &self.per_timeline_state
     711           43 :         }
     712              :     }
     713              : 
     714              :     impl TenantManager<TestTypes> for StubManager {
     715           50 :         async fn resolve(
     716           50 :             &self,
     717           50 :             timeline_id: TimelineId,
     718           50 :             shard_selector: ShardSelector,
     719           50 :         ) -> anyhow::Result<Entered> {
     720           63 :             for timeline in &self.shards {
     721           52 :                 if timeline.id == timeline_id {
     722           40 :                     let enter_gate = || {
     723           39 :                         let gate_guard = timeline.gate.enter()?;
     724           29 :                         let gate_guard = Arc::new(gate_guard);
     725           29 :                         anyhow::Ok(gate_guard)
     726           39 :                     };
     727            0 :                     match &shard_selector {
     728            0 :                         ShardSelector::Zero if timeline.shard.is_shard_zero() => {
     729              :                             return Ok(Entered {
     730            0 :                                 timeline: Arc::clone(timeline),
     731            0 :                                 gate_guard: enter_gate()?,
     732              :                             });
     733              :                         }
     734            0 :                         ShardSelector::Zero => continue,
     735           37 :                         ShardSelector::Page(key) if timeline.shard.is_key_local(key) => {
     736              :                             return Ok(Entered {
     737           37 :                                 timeline: Arc::clone(timeline),
     738           37 :                                 gate_guard: enter_gate()?,
     739              :                             });
     740              :                         }
     741            0 :                         ShardSelector::Page(_) => continue,
     742            3 :                         ShardSelector::Known(idx) if idx == &timeline.shard.shard_index() => {
     743              :                             return Ok(Entered {
     744            2 :                                 timeline: Arc::clone(timeline),
     745            2 :                                 gate_guard: enter_gate()?,
     746              :                             });
     747              :                         }
     748            1 :                         ShardSelector::Known(_) => continue,
     749              :                     }
     750           12 :                 }
     751              :             }
     752           11 :             anyhow::bail!("not found")
     753           50 :         }
     754              :     }
     755              : 
     756              :     impl std::ops::Deref for Entered {
     757              :         type Target = StubTimeline;
     758          191 :         fn deref(&self) -> &Self::Target {
     759          191 :             &self.timeline
     760          191 :         }
     761              :     }
     762              : 
     763              :     #[tokio::test(start_paused = true)]
     764            1 :     async fn test_timeline_shutdown() {
     765            1 :         crate::tenant::harness::setup_logging();
     766              : 
     767            1 :         let timeline_id = TimelineId::generate();
     768            1 :         let shard0 = Arc::new_cyclic(|myself| StubTimeline {
     769            1 :             gate: Default::default(),
     770            1 :             id: timeline_id,
     771            1 :             shard: ShardIdentity::unsharded(),
     772            1 :             per_timeline_state: PerTimelineState::default(),
     773            1 :             myself: myself.clone(),
     774            1 :         });
     775            1 :         let mgr = StubManager {
     776            1 :             shards: vec![shard0.clone()],
     777            1 :         };
     778            1 :         let key = DBDIR_KEY;
     779              : 
     780            1 :         let mut cache = Cache::<TestTypes>::default();
     781              : 
     782              :         //
     783              :         // fill the cache
     784              :         //
     785            1 :         let handle: Handle<_> = cache
     786            1 :             .get(timeline_id, ShardSelector::Page(key), &mgr)
     787            1 :             .await
     788            1 :             .expect("we have the timeline");
     789            1 :         assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
     790            1 :         assert_eq!(cache.map.len(), 1);
     791            1 :         drop(handle);
     792              : 
     793              :         //
     794              :         // demonstrate that Handle holds up gate closure
     795              :         // but shutdown prevents new handles from being handed out
     796              :         //
     797              : 
     798            1 :         tokio::select! {
     799            1 :             _ = shard0.gate.close() => {
     800            0 :                 panic!("cache and per-timeline handler state keep cache open");
     801              :             }
     802            1 :             _ = tokio::time::sleep(FOREVER) => {
     803            1 :                 // NB: first poll of close() makes it enter closing state
     804            1 :             }
     805              :         }
     806              : 
     807            1 :         let handle = cache
     808            1 :             .get(timeline_id, ShardSelector::Page(key), &mgr)
     809            1 :             .await
     810            1 :             .expect("we have the timeline");
     811            1 :         assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
     812              : 
     813              :         // SHUTDOWN
     814            1 :         shard0.per_timeline_state.shutdown(); // keeping handle alive across shutdown
     815              : 
     816            1 :         assert_eq!(
     817            1 :             cache.map.len(),
     818              :             1,
     819            0 :             "this is an implementation detail but worth pointing out: we can't clear the cache from shutdown(), it's cleared on first access after"
     820              :         );
     821              : 
     822              :         // this handle is perfectly usable
     823            1 :         handle.getpage();
     824              : 
     825            1 :         cache
     826            1 :             .get(timeline_id, ShardSelector::Page(key), &mgr)
     827            1 :             .await
     828            1 :             .err()
     829            1 :             .expect("documented behavior: can't get new handle after shutdown, even if there is an alive Handle");
     830            1 :         assert_eq!(
     831            1 :             cache.map.len(),
     832              :             0,
     833            0 :             "first access after shutdown cleans up the Weak's from the cache"
     834              :         );
     835              : 
     836            1 :         tokio::select! {
     837            1 :             _ = shard0.gate.close() => {
     838            0 :                 panic!("handle is keeping gate open");
     839              :             }
     840            1 :             _ = tokio::time::sleep(FOREVER) => { }
     841              :         }
     842              : 
     843            1 :         drop(handle);
     844              : 
     845              :         // closing gate succeeds after dropping handle
     846            1 :         tokio::select! {
     847            1 :             _ = shard0.gate.close() => { }
     848            1 :             _ = tokio::time::sleep(FOREVER) => {
     849            0 :                 panic!("handle is dropped, no other gate holders exist")
     850              :             }
     851              :         }
     852              : 
     853              :         // map gets cleaned on next lookup
     854            1 :         cache
     855            1 :             .get(timeline_id, ShardSelector::Page(key), &mgr)
     856            1 :             .await
     857            1 :             .err()
     858            1 :             .expect("documented behavior: can't get new handle after shutdown");
     859            1 :         assert_eq!(cache.map.len(), 0);
     860              : 
     861              :         // ensure all refs to shard0 are gone and we're not leaking anything
     862            1 :         drop(shard0);
     863            1 :         drop(mgr);
     864            1 :     }
     865              : 
     866              :     #[tokio::test]
     867            1 :     async fn test_multiple_timelines_and_deletion() {
     868            1 :         crate::tenant::harness::setup_logging();
     869              : 
     870            1 :         let timeline_a = TimelineId::generate();
     871            1 :         let timeline_b = TimelineId::generate();
     872            1 :         assert_ne!(timeline_a, timeline_b);
     873            1 :         let timeline_a = Arc::new_cyclic(|myself| StubTimeline {
     874            1 :             gate: Default::default(),
     875            1 :             id: timeline_a,
     876            1 :             shard: ShardIdentity::unsharded(),
     877            1 :             per_timeline_state: PerTimelineState::default(),
     878            1 :             myself: myself.clone(),
     879            1 :         });
     880            1 :         let timeline_b = Arc::new_cyclic(|myself| StubTimeline {
     881            1 :             gate: Default::default(),
     882            1 :             id: timeline_b,
     883            1 :             shard: ShardIdentity::unsharded(),
     884            1 :             per_timeline_state: PerTimelineState::default(),
     885            1 :             myself: myself.clone(),
     886            1 :         });
     887            1 :         let mut mgr = StubManager {
     888            1 :             shards: vec![timeline_a.clone(), timeline_b.clone()],
     889            1 :         };
     890            1 :         let key = DBDIR_KEY;
     891              : 
     892            1 :         let mut cache = Cache::<TestTypes>::default();
     893              : 
     894            1 :         cache
     895            1 :             .get(timeline_a.id, ShardSelector::Page(key), &mgr)
     896            1 :             .await
     897            1 :             .expect("we have it");
     898            1 :         cache
     899            1 :             .get(timeline_b.id, ShardSelector::Page(key), &mgr)
     900            1 :             .await
     901            1 :             .expect("we have it");
     902            1 :         assert_eq!(cache.map.len(), 2);
     903              : 
     904              :         // delete timeline A
     905            1 :         timeline_a.per_timeline_state.shutdown();
     906            2 :         mgr.shards.retain(|t| t.id != timeline_a.id);
     907            1 :         assert!(
     908            1 :             mgr.resolve(timeline_a.id, ShardSelector::Page(key))
     909            1 :                 .await
     910            1 :                 .is_err(),
     911            0 :             "broken StubManager implementation"
     912              :         );
     913              : 
     914            1 :         assert_eq!(
     915            1 :             cache.map.len(),
     916              :             2,
     917            0 :             "cache still has a Weak handle to Timeline A"
     918              :         );
     919            1 :         cache
     920            1 :             .get(timeline_a.id, ShardSelector::Page(key), &mgr)
     921            1 :             .await
     922            1 :             .err()
     923            1 :             .expect("documented behavior: can't get new handle after shutdown");
     924              : 
     925            1 :         assert_eq!(cache.map.len(), 1, "next access cleans up the cache");
     926              : 
     927            1 :         cache
     928            1 :             .get(timeline_b.id, ShardSelector::Page(key), &mgr)
     929            1 :             .await
     930            1 :             .expect("we still have it");
     931            1 :     }
     932              : 
     933            7 :     fn make_relation_key_for_shard(shard: ShardNumber, params: ShardParameters) -> Key {
     934            7 :         rel_block_to_key(
     935            7 :             RelTag {
     936            7 :                 spcnode: 1663,
     937            7 :                 dbnode: 208101,
     938            7 :                 relnode: 2620,
     939            7 :                 forknum: 0,
     940            7 :             },
     941            7 :             shard.0 as u32 * params.stripe_size.0,
     942              :         )
     943            7 :     }
     944              : 
     945              :     #[tokio::test(start_paused = true)]
     946            1 :     async fn test_shard_split() {
     947            1 :         crate::tenant::harness::setup_logging();
     948            1 :         let timeline_id = TimelineId::generate();
     949            1 :         let parent = Arc::new_cyclic(|myself| StubTimeline {
     950            1 :             gate: Default::default(),
     951            1 :             id: timeline_id,
     952            1 :             shard: ShardIdentity::unsharded(),
     953            1 :             per_timeline_state: PerTimelineState::default(),
     954            1 :             myself: myself.clone(),
     955            1 :         });
     956            1 :         let child_params = ShardParameters {
     957            1 :             count: ShardCount(2),
     958            1 :             stripe_size: ShardStripeSize::default(),
     959            1 :         };
     960            1 :         let child0 = Arc::new_cyclic(|myself| StubTimeline {
     961            1 :             gate: Default::default(),
     962            1 :             id: timeline_id,
     963            1 :             shard: ShardIdentity::from_params(ShardNumber(0), child_params),
     964            1 :             per_timeline_state: PerTimelineState::default(),
     965            1 :             myself: myself.clone(),
     966            1 :         });
     967            1 :         let child1 = Arc::new_cyclic(|myself| StubTimeline {
     968            1 :             gate: Default::default(),
     969            1 :             id: timeline_id,
     970            1 :             shard: ShardIdentity::from_params(ShardNumber(1), child_params),
     971            1 :             per_timeline_state: PerTimelineState::default(),
     972            1 :             myself: myself.clone(),
     973            1 :         });
     974            1 :         let child_shards_by_shard_number = [child0.clone(), child1.clone()];
     975              : 
     976            1 :         let mut cache = Cache::<TestTypes>::default();
     977              : 
     978              :         // fill the cache with the parent
     979            3 :         for i in 0..2 {
     980            2 :             let handle = cache
     981            2 :                 .get(
     982            2 :                     timeline_id,
     983            2 :                     ShardSelector::Page(make_relation_key_for_shard(ShardNumber(i), child_params)),
     984            2 :                     &StubManager {
     985            2 :                         shards: vec![parent.clone()],
     986            2 :                     },
     987            2 :                 )
     988            2 :                 .await
     989            2 :                 .expect("we have it");
     990            2 :             assert!(
     991            2 :                 Weak::ptr_eq(&handle.myself, &parent.myself),
     992            0 :                 "mgr returns parent first"
     993              :             );
     994            2 :             drop(handle);
     995              :         }
     996              : 
     997              :         //
     998              :         // SHARD SPLIT: tenant manager changes, but the cache isn't informed
     999              :         //
    1000              : 
    1001              :         // as long as we haven't shut down the parent, the cache keeps returning the cached
    1002              :         // parent, regardless of what the tenant manager would return
    1003            3 :         for i in 0..2 {
    1004            2 :             let handle = cache
    1005            2 :                 .get(
    1006            2 :                     timeline_id,
    1007            2 :                     ShardSelector::Page(make_relation_key_for_shard(ShardNumber(i), child_params)),
    1008            2 :                     &StubManager {
    1009            2 :                         shards: vec![], // doesn't matter what's in here, the cache is fully loaded
    1010            2 :                     },
    1011            2 :                 )
    1012            2 :                 .await
    1013            2 :                 .expect("we have it");
    1014            2 :             assert!(
    1015            2 :                 Weak::ptr_eq(&handle.myself, &parent.myself),
    1016            0 :                 "cache returns the cached parent"
    1017              :             );
    1018            2 :             drop(handle);
    1019              :         }
    1020              : 
    1021            1 :         let parent_handle = cache
    1022            1 :             .get(
    1023            1 :                 timeline_id,
    1024            1 :                 ShardSelector::Page(make_relation_key_for_shard(ShardNumber(0), child_params)),
    1025            1 :                 &StubManager {
    1026            1 :                     shards: vec![parent.clone()],
    1027            1 :                 },
    1028            1 :             )
    1029            1 :             .await
    1030            1 :             .expect("we have it");
    1031            1 :         assert!(Weak::ptr_eq(&parent_handle.myself, &parent.myself));
    1032              : 
    1033              :         // invalidate the cache
    1034            1 :         parent.per_timeline_state.shutdown();
    1035              : 
    1036              :         // the cache will now return the child, even though the parent handle still exists
    1037            3 :         for i in 0..2 {
    1038            2 :             let handle = cache
    1039            2 :                 .get(
    1040            2 :                     timeline_id,
    1041            2 :                     ShardSelector::Page(make_relation_key_for_shard(ShardNumber(i), child_params)),
    1042            2 :                     &StubManager {
    1043            2 :                         shards: vec![child0.clone(), child1.clone()], // <====== this changed compared to previous loop
    1044            2 :                     },
    1045            2 :                 )
    1046            2 :                 .await
    1047            2 :                 .expect("we have it");
    1048            2 :             assert!(
    1049            2 :                 Weak::ptr_eq(
    1050            2 :                     &handle.myself,
    1051            2 :                     &child_shards_by_shard_number[i as usize].myself
    1052              :                 ),
    1053            0 :                 "mgr returns child"
    1054              :             );
    1055            2 :             drop(handle);
    1056              :         }
    1057              : 
    1058              :         // all the while the parent handle kept the parent gate open
    1059            1 :         tokio::select! {
    1060            1 :             _ = parent_handle.gate.close() => {
    1061            0 :                 panic!("parent handle is keeping gate open");
    1062              :             }
    1063            1 :             _ = tokio::time::sleep(FOREVER) => { }
    1064              :         }
    1065            1 :         drop(parent_handle);
    1066            1 :         tokio::select! {
    1067            1 :             _ = parent.gate.close() => { }
    1068            1 :             _ = tokio::time::sleep(FOREVER) => {
    1069            1 :                 panic!("parent handle is dropped, no other gate holders exist")
    1070            1 :             }
    1071            1 :         }
    1072            1 :     }
    1073              : 
    1074              :     #[tokio::test(start_paused = true)]
    1075            1 :     async fn test_connection_handler_exit() {
    1076            1 :         crate::tenant::harness::setup_logging();
    1077            1 :         let timeline_id = TimelineId::generate();
    1078            1 :         let shard0 = Arc::new_cyclic(|myself| StubTimeline {
    1079            1 :             gate: Default::default(),
    1080            1 :             id: timeline_id,
    1081            1 :             shard: ShardIdentity::unsharded(),
    1082            1 :             per_timeline_state: PerTimelineState::default(),
    1083            1 :             myself: myself.clone(),
    1084            1 :         });
    1085            1 :         let mgr = StubManager {
    1086            1 :             shards: vec![shard0.clone()],
    1087            1 :         };
    1088            1 :         let key = DBDIR_KEY;
    1089              : 
    1090              :         // Simulate 10 connections that are opened, used, and closed
    1091           11 :         for _ in 0..10 {
    1092           10 :             let mut cache = Cache::<TestTypes>::default();
    1093           10 :             let handle = {
    1094           10 :                 let handle = cache
    1095           10 :                     .get(timeline_id, ShardSelector::Page(key), &mgr)
    1096           10 :                     .await
    1097           10 :                     .expect("we have the timeline");
    1098           10 :                 assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
    1099           10 :                 handle
    1100            1 :             };
    1101           10 :             handle.getpage();
    1102            1 :         }
    1103            1 : 
    1104            1 :         // No handles exist anymore, so nothing is holding the gate open.
    1105            1 :         // Thus the gate should close immediately, even without an explicit shutdown.
    1106            1 :         tokio::select! {
    1107            1 :             _ = shard0.gate.close() => { }
    1108            1 :             _ = tokio::time::sleep(FOREVER) => {
    1109            1 :                 panic!("handle is dropped, no other gate holders exist")
    1110            1 :             }
    1111            1 :         }
    1112            1 :     }
    1113              : 
    1114              :     #[tokio::test(start_paused = true)]
    1115            1 :     async fn test_weak_handles() {
    1116            1 :         crate::tenant::harness::setup_logging();
    1117            1 :         let timeline_id = TimelineId::generate();
    1118            1 :         let shard0 = Arc::new_cyclic(|myself| StubTimeline {
    1119            1 :             gate: Default::default(),
    1120            1 :             id: timeline_id,
    1121            1 :             shard: ShardIdentity::unsharded(),
    1122            1 :             per_timeline_state: PerTimelineState::default(),
    1123            1 :             myself: myself.clone(),
    1124            1 :         });
    1125            1 :         let mgr = StubManager {
    1126            1 :             shards: vec![shard0.clone()],
    1127            1 :         };
    1128              : 
    1129            1 :         let refcount_start = Arc::strong_count(&shard0);
    1130              : 
    1131            1 :         let key = DBDIR_KEY;
    1132              : 
    1133            1 :         let mut cache = Cache::<TestTypes>::default();
    1134              : 
    1135            1 :         let handle = cache
    1136            1 :             .get(timeline_id, ShardSelector::Page(key), &mgr)
    1137            1 :             .await
    1138            1 :             .expect("we have the timeline");
    1139            1 :         assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
    1140              : 
    1141            1 :         let weak_handle = handle.downgrade();
    1142              : 
    1143            1 :         drop(handle);
    1144              : 
    1145            1 :         let upgraded_handle = weak_handle.upgrade().ok().expect("we can upgrade it");
    1146              : 
    1147              :         // Start shutdown
    1148            1 :         shard0.per_timeline_state.shutdown();
    1149              : 
    1150              :         // Upgrades during shutdown don't work, even if upgraded_handle exists.
    1151            1 :         weak_handle
    1152            1 :             .upgrade()
    1153            1 :             .err()
    1154            1 :             .expect("can't upgrade weak handle as soon as shutdown started");
    1155              : 
    1156              :         // But upgraded_handle is still alive, so the gate won't close.
    1157            1 :         tokio::select! {
    1158            1 :             _ = shard0.gate.close() => {
    1159            0 :                 panic!("handle is keeping gate open");
    1160              :             }
    1161            1 :             _ = tokio::time::sleep(FOREVER) => { }
    1162              :         }
    1163              : 
    1164              :         // Drop the last handle.
    1165            1 :         drop(upgraded_handle);
    1166              : 
    1167              :         // The gate should close now, despite there still being a weak_handle.
    1168            1 :         tokio::select! {
    1169            1 :             _ = shard0.gate.close() => { }
    1170            1 :             _ = tokio::time::sleep(FOREVER) => {
    1171            0 :                 panic!("only strong handle is dropped and we shut down per-timeline-state")
    1172              :             }
    1173              :         }
    1174              : 
    1175              :         // The weak handle still can't be upgraded.
    1176            1 :         weak_handle
    1177            1 :             .upgrade()
    1178            1 :             .err()
    1179            1 :             .expect("still shouldn't be able to upgrade the weak handle");
    1180              : 
    1181              :         // There should be no strong references to the timeline object except the one on the stack (`shard0`).
    1182            1 :         assert_eq!(Arc::strong_count(&shard0), refcount_start);
    1183            1 :     }
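The downgrade/upgrade pattern exercised by `test_weak_handles` is what a long-lived task would use to avoid keeping the timeline gate open between uses. A minimal sketch under the same assumptions (the `TestTypes` parameter on `WeakHandle` and the `use_timeline` name are illustrative):

    // Sketch: hold only a WeakHandle between uses; the gate is held open just
    // while an upgraded, strong Handle exists.
    fn use_timeline(weak: &WeakHandle<TestTypes>) {
        match weak.upgrade() {
            Ok(handle) => {
                handle.getpage(); // gate stays open only for the duration of this call
                drop(handle);
            }
            Err(_) => {
                // PerTimelineState::shutdown() has run; the timeline is going away,
                // so stop using it and re-resolve via the tenant manager if needed.
            }
        }
    }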
    1184              : 
    1185              :     #[tokio::test(start_paused = true)]
    1186            1 :     async fn test_reference_cycle_broken_when_cache_is_dropped() {
    1187            1 :         crate::tenant::harness::setup_logging();
    1188            1 :         let timeline_id = TimelineId::generate();
    1189            1 :         let shard0 = Arc::new_cyclic(|myself| StubTimeline {
    1190            1 :             gate: Default::default(),
    1191            1 :             id: timeline_id,
    1192            1 :             shard: ShardIdentity::unsharded(),
    1193            1 :             per_timeline_state: PerTimelineState::default(),
    1194            1 :             myself: myself.clone(),
    1195            1 :         });
    1196            1 :         let mgr = StubManager {
    1197            1 :             shards: vec![shard0.clone()],
    1198            1 :         };
    1199            1 :         let key = DBDIR_KEY;
    1200              : 
    1201            1 :         let mut cache = Cache::<TestTypes>::default();
    1202              : 
    1203              :         // helper to check if a handle is referenced by per_timeline_state
    1204            2 :         let per_timeline_state_refs_handle = |handle_weak: &Weak<Mutex<HandleInner<_>>>| {
    1205            2 :             let per_timeline_state = shard0.per_timeline_state.handles.lock().unwrap();
    1206            2 :             let per_timeline_state = per_timeline_state.as_ref().unwrap();
    1207            2 :             per_timeline_state
    1208            2 :                 .values()
    1209            2 :                 .any(|v| Weak::ptr_eq(&Arc::downgrade(v), handle_weak))
    1210            2 :         };
    1211              : 
    1212              :         // Fill the cache.
    1213            1 :         let handle = cache
    1214            1 :             .get(timeline_id, ShardSelector::Page(key), &mgr)
    1215            1 :             .await
    1216            1 :             .expect("we have the timeline");
    1217            1 :         assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
    1218            1 :         let handle_inner_weak = Arc::downgrade(&handle.inner);
    1219            1 :         assert!(
    1220            1 :             per_timeline_state_refs_handle(&handle_inner_weak),
    1221            0 :             "we still hold `handle` _and_ haven't dropped `cache` yet"
    1222              :         );
    1223              : 
    1224              :         // Drop the cache.
    1225            1 :         drop(cache);
    1226              : 
    1227            1 :         assert!(
    1228            1 :             !(per_timeline_state_refs_handle(&handle_inner_weak)),
    1229            0 :             "nothing should reference the handle allocation anymore"
    1230              :         );
    1231            1 :         assert!(
    1232            1 :             Weak::upgrade(&handle_inner_weak).is_some(),
    1233            0 :             "the local `handle` still keeps the allocation alive"
    1234              :         );
    1235              :         // but obviously the cache is gone, so no new handles can be handed out.
    1236              : 
    1237              :         // Drop handle.
    1238            1 :         drop(handle);
    1239            1 :         assert!(
    1240            1 :             Weak::upgrade(&handle_inner_weak).is_none(),
    1241            1 :             "the local `handle` is dropped, so the allocation should be dropped by now"
    1242            1 :         );
    1243            1 :     }
    1244              : 
    1245              :     #[tokio::test(start_paused = true)]
    1246            1 :     async fn test_reference_cycle_broken_when_per_timeline_state_shutdown() {
    1247            1 :         crate::tenant::harness::setup_logging();
    1248            1 :         let timeline_id = TimelineId::generate();
    1249            1 :         let shard0 = Arc::new_cyclic(|myself| StubTimeline {
    1250            1 :             gate: Default::default(),
    1251            1 :             id: timeline_id,
    1252            1 :             shard: ShardIdentity::unsharded(),
    1253            1 :             per_timeline_state: PerTimelineState::default(),
    1254            1 :             myself: myself.clone(),
    1255            1 :         });
    1256            1 :         let mgr = StubManager {
    1257            1 :             shards: vec![shard0.clone()],
    1258            1 :         };
    1259            1 :         let key = DBDIR_KEY;
    1260              : 
    1261            1 :         let mut cache = Cache::<TestTypes>::default();
    1262            1 :         let handle = cache
    1263            1 :             .get(timeline_id, ShardSelector::Page(key), &mgr)
    1264            1 :             .await
    1265            1 :             .expect("we have the timeline");
    1266              :         // grab a weak reference to the inner so we can later try to Weak::upgrade it and assert that it fails
    1267            1 :         let handle_inner_weak = Arc::downgrade(&handle.inner);
    1268              : 
    1269              :         // drop the handle; `inner` stays alive as long as any strong reference to it exists (the per-timeline state still holds one)
    1270            1 :         drop(handle);
    1271            1 :         assert!(Weak::upgrade(&handle_inner_weak).is_some(), "can still upgrade");
    1272              : 
    1273              :         // Shutdown the per_timeline_state.
    1274            1 :         shard0.per_timeline_state.shutdown();
    1275            1 :         assert!(Weak::upgrade(&handle_inner_weak).is_none(), "can no longer upgrade");
    1276              : 
    1277              :         // The cache only holds Weak references, so it can outlive the per_timeline_state.
    1278              :         // Drop it explicitly, solely to make this point.
    1279            1 :         drop(cache);
    1280            1 :     }
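Taken together, these tests pin down the shutdown ordering the handle cache relies on. A minimal sketch of that ordering, assuming a timeline shaped like the `StubTimeline` fixture (the function name is illustrative):

    // Sketch: first cut off new and upgraded handles, then wait for in-flight
    // requests to drop their strong Handles so the gate can close.
    async fn shutdown_stub_timeline(timeline: &StubTimeline) {
        timeline.per_timeline_state.shutdown(); // no new Handles, no WeakHandle upgrades
        timeline.gate.close().await; // resolves once every strong Handle is dropped
    }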
    1281              : }
        

Generated by: LCOV version 2.1-beta