LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - handle.rs (source / functions) Coverage Total Hit
Test: 2b0730d767f560e20b6748f57465922aa8bb805e.info Lines: 97.8 % 508 497
Test Date: 2024-09-25 14:04:07 Functions: 81.2 % 48 39

            Line data    Source code
       1              : //! An efficient way to keep the timeline gate open without preventing
       2              : //! timeline shutdown for longer than a single call to a timeline method.
       3              : //!
       4              : //! # Motivation
       5              : //!
       6              : //! On a single page service connection, we're typically serving a single TenantTimelineId.
       7              : //!
       8              : //! Without sharding, there is a single Timeline object to which we dispatch
       9              : //! all requests. For example, a getpage request gets dispatched to the
      10              : //! Timeline::get method of the Timeline object that represents the
      11              : //! (tenant,timeline) of that connection.
      12              : //!
      13              : //! With sharding, for each request that comes in on the connection,
      14              : //! we first have to perform shard routing based on the requested key (=~ page number).
      15              : //! The result of shard routing is a Timeline object.
      16              : //! We then dispatch the request to that Timeline object.
      17              : //!
      18              : //! Regardless of whether the tenant is sharded or not, we want to ensure that
      19              : //! we hold the Timeline gate open while we're invoking the method on the
      20              : //! Timeline object.
      21              : //!
      22              : //! However, we want to avoid the overhead of entering the gate for every
      23              : //! method invocation.
      24              : //!
      25              : //! Further, for shard routing, we want to avoid calling the tenant manager to
      26              : //! resolve the shard for every request. Instead, we want to cache the
      27              : //! routing result so we can bypass the tenant manager for all subsequent requests
      28              : //! that get routed to that shard.
      29              : //!
      30              : //! Regardless of how we accomplish the above, it should not
      31              : //! prevent the Timeline from shutting down promptly.
      32              : //!
      33              : //! # Design
      34              : //!
      35              : //! There are three user-facing data structures:
      36              : //! - `PerTimelineState`: a struct embedded into each Timeline struct. Lifetime == Timeline lifetime.
      37              : //! - `Cache`: a struct private to each connection handler; Lifetime == connection lifetime.
      38              : //! - `Handle`: a smart pointer that holds the Timeline gate open and derefs to `&Timeline`.
      39              : //!   Lifetime: for a single request dispatch on the Timeline (i.e., one getpage request)
      40              : //!
      41              : //! The `Handle` is just a wrapper around an `Arc<HandleInner>`.
      42              : //!
      43              : //! There is one long-lived `Arc<HandleInner>`, which is stored in the `PerTimelineState`.
      44              : //! The `Cache` stores a `Weak<HandleInner>` for each cached Timeline.
      45              : //!
      46              : //! To dispatch a request, the page service connection calls `Cache::get`.
      47              : //!
      48              : //! A cache miss means we consult the tenant manager for shard routing,
      49              : //! resulting in an `Arc<Timeline>`. We enter its gate _once_ and construct an
      50              : //! `Arc<HandleInner>`. We store a `Weak<HandleInner>` in the cache
      51              : //! and the `Arc<HandleInner>` in the `PerTimelineState`.
      52              : //!
      53              : //! For subsequent requests, `Cache::get` will perform a "fast path" shard routing
      54              : //! and find the `Weak<HandleInner>` in the cache.
      55              : //! We upgrade the `Weak<HandleInner>` to an `Arc<HandleInner>` and wrap it in the user-facing `Handle` type.
      56              : //!
      57              : //! The request handler dispatches the request to the right `<Handle as Deref<Target = Timeline>>::$request_method`.
      58              : //! It then drops the `Handle`, which drops the `Arc<HandleInner>`.
      59              : //!
      60              : //! # Memory Management / How The Reference Cycle Is Broken
      61              : //!
      62              : //! The attentive reader may have noticed the strong reference cycle
      63              : //! from `Arc<HandleInner>` to `PerTimelineState` to `Arc<Timeline>`.
      64              : //!
      65              : //! This cycle is intentional: while it exists, the `Cache` can upgrade its
      66              : //! `Weak<HandleInner>` to an `Arc<HandleInner>` in a single atomic operation.
      67              : //!
      68              : //! The cycle is broken by either
      69              : //! - `PerTimelineState::shutdown` or
      70              : //! - dropping the `Cache`.
      71              : //!
      72              : //! Concurrently existing `Handle`s will extend the existence of the cycle.
      73              : //! However, since `Handle`s are short-lived and new `Handle`s are not
      74              : //! handed out after either `PerTimelineState::shutdown` or `Cache` drop,
      75              : //! that extension of the cycle is bounded.
      76              : //!
      77              : //! # Fast Path for Shard Routing
      78              : //!
      79              : //! The `Cache` has a fast path for shard routing to avoid calling into
      80              : //! the tenant manager for every request.
      81              : //!
      82              : //! The `Cache` maintains a hash map of `ShardTimelineId` to `Weak<HandleInner>`.
      83              : //!
      84              : //! The current implementation uses the first entry in the hash map
      85              : //! to determine the `ShardParameters` and derive the correct
      86              : //! `ShardIndex` for the requested key.
      87              : //!
      88              : //! It then looks up the hash map for that `ShardTimelineId := {ShardIndex,TimelineId}`.
      89              : //!
      90              : //! If the lookup is successful and the `Weak<HandleInner>` can be upgraded,
      91              : //! it's a hit.
      92              : //!
      93              : //! ## Cache invalidation
      94              : //!
      95              : //! The insight is that cache invalidation is sufficient and most efficiently done lazily.
      96              : //! The only reasons why an entry in the cache can become stale are:
      97              : //! 1. The `PerTimelineState` / Timeline is shutting down e.g. because the shard is
      98              : //!    being detached, timeline or shard deleted, or pageserver is shutting down.
      99              : //! 2. We're doing a shard split and new traffic should be routed to the child shards.
     100              : //!
     101              : //! Regarding (1), we will eventually fail to upgrade the `Weak<HandleInner>` once the
     102              : //! timeline has shut down, and when that happens, we remove the entry from the cache.
     103              : //!
     104              : //! Regarding (2), the insight is that it is toally fine to keep dispatching requests
     105              : //! to the parent shard during a shard split. Eventually, the shard split task will
     106              : //! shut down the parent => case (1).
     107              : 
     108              : use std::collections::hash_map;
     109              : use std::collections::HashMap;
     110              : use std::sync::atomic::AtomicBool;
     111              : use std::sync::atomic::Ordering;
     112              : use std::sync::Arc;
     113              : use std::sync::Mutex;
     114              : use std::sync::Weak;
     115              : 
     116              : use pageserver_api::shard::ShardIdentity;
     117              : use tracing::instrument;
     118              : use tracing::trace;
     119              : use utils::id::TimelineId;
     120              : use utils::shard::ShardIndex;
     121              : use utils::shard::ShardNumber;
     122              : 
     123              : use crate::tenant::mgr::ShardSelector;
     124              : 
     125              : /// The requirement for Debug is so that #[derive(Debug)] works in some places.
     126              : pub(crate) trait Types: Sized + std::fmt::Debug {
     127              :     type TenantManagerError: Sized + std::fmt::Debug;
     128              :     type TenantManager: TenantManager<Self> + Sized;
     129              :     type Timeline: ArcTimeline<Self> + Sized;
     130              : }
     131              : 
     132              : /// Uniquely identifies a [`Cache`] instance over the lifetime of the process.
     133              : /// Required so [`Cache::drop`] can take out the handles from the [`PerTimelineState`].
     134              : /// Alternative to this would be to allocate [`Cache`] in a `Box` and identify it by the pointer.
     135              : #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
     136              : struct CacheId(u64);
     137              : 
     138              : impl CacheId {
     139           78 :     fn next() -> Self {
     140              :         static NEXT_ID: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
     141           78 :         let id = NEXT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
     142           78 :         if id == 0 {
     143            0 :             panic!("CacheId::new() returned 0, overflow");
     144           78 :         }
     145           78 :         Self(id)
     146           78 :     }
     147              : }
     148              : 
     149              : /// See module-level comment.
     150              : pub(crate) struct Cache<T: Types> {
     151              :     id: CacheId,
     152              :     map: Map<T>,
     153              : }
     154              : 
     155              : type Map<T> = HashMap<ShardTimelineId, Weak<HandleInner<T>>>;
     156              : 
     157              : impl<T: Types> Default for Cache<T> {
     158           78 :     fn default() -> Self {
     159           78 :         Self {
     160           78 :             id: CacheId::next(),
     161           78 :             map: Default::default(),
     162           78 :         }
     163           78 :     }
     164              : }
     165              : 
     166              : #[derive(PartialEq, Eq, Debug, Hash, Clone, Copy)]
     167              : pub(crate) struct ShardTimelineId {
     168              :     pub(crate) shard_index: ShardIndex,
     169              :     pub(crate) timeline_id: TimelineId,
     170              : }
     171              : 
     172              : /// See module-level comment.
     173              : pub(crate) struct Handle<T: Types>(Arc<HandleInner<T>>);
     174              : struct HandleInner<T: Types> {
     175              :     shut_down: AtomicBool,
     176              :     timeline: T::Timeline,
     177              :     // The timeline's gate held open.
     178              :     _gate_guard: utils::sync::gate::GateGuard,
     179              : }
     180              : 
     181              : /// Embedded in each [`Types::Timeline`] as the anchor for the only long-lived strong ref to `HandleInner`.
     182              : ///
     183              : /// See module-level comment for details.
     184              : pub struct PerTimelineState<T: Types> {
     185              :     // None = shutting down
     186              :     handles: Mutex<Option<HashMap<CacheId, Arc<HandleInner<T>>>>>,
     187              : }
     188              : 
     189              : impl<T: Types> Default for PerTimelineState<T> {
     190         1284 :     fn default() -> Self {
     191         1284 :         Self {
     192         1284 :             handles: Mutex::new(Some(Default::default())),
     193         1284 :         }
     194         1284 :     }
     195              : }
     196              : 
     197              : /// Abstract view of [`crate::tenant::mgr`], for testability.
     198              : pub(crate) trait TenantManager<T: Types> {
     199              :     /// Invoked by [`Cache::get`] to resolve a [`ShardTimelineId`] to a [`Types::Timeline`].
     200              :     /// Errors are returned as [`GetError::TenantManager`].
     201              :     async fn resolve(
     202              :         &self,
     203              :         timeline_id: TimelineId,
     204              :         shard_selector: ShardSelector,
     205              :     ) -> Result<T::Timeline, T::TenantManagerError>;
     206              : }
     207              : 
     208              : /// Abstract view of an [`Arc<Timeline>`], for testability.
     209              : pub(crate) trait ArcTimeline<T: Types>: Clone {
     210              :     fn gate(&self) -> &utils::sync::gate::Gate;
     211              :     fn shard_timeline_id(&self) -> ShardTimelineId;
     212              :     fn get_shard_identity(&self) -> &ShardIdentity;
     213              :     fn per_timeline_state(&self) -> &PerTimelineState<T>;
     214              : }
     215              : 
     216              : /// Errors returned by [`Cache::get`].
     217              : #[derive(Debug)]
     218              : pub(crate) enum GetError<T: Types> {
     219              :     TenantManager(T::TenantManagerError),
     220              :     TimelineGateClosed,
     221              :     PerTimelineStateShutDown,
     222              : }
     223              : 
     224              : /// Internal type used in [`Cache::get`].
     225              : enum RoutingResult<T: Types> {
     226              :     FastPath(Handle<T>),
     227              :     SlowPath(ShardTimelineId),
     228              :     NeedConsultTenantManager,
     229              : }
     230              : 
     231              : impl<T: Types> Cache<T> {
     232              :     /// See module-level comment for details.
     233              :     ///
     234              :     /// Does NOT check for the shutdown state of [`Types::Timeline`].
     235              :     /// Instead, the methods of [`Types::Timeline`] that are invoked through
     236              :     /// the [`Handle`] are responsible for checking these conditions
     237              :     /// and if so, return an error that causes the page service to
     238              :     /// close the connection.
     239          150 :     #[instrument(level = "trace", skip_all)]
     240              :     pub(crate) async fn get(
     241              :         &mut self,
     242              :         timeline_id: TimelineId,
     243              :         shard_selector: ShardSelector,
     244              :         tenant_manager: &T::TenantManager,
     245              :     ) -> Result<Handle<T>, GetError<T>> {
     246              :         // terminates because each iteration removes an element from the map
     247              :         loop {
     248              :             let handle = self
     249              :                 .get_impl(timeline_id, shard_selector, tenant_manager)
     250              :                 .await?;
     251              :             if handle.0.shut_down.load(Ordering::Relaxed) {
     252              :                 let removed = self
     253              :                     .map
     254              :                     .remove(&handle.0.timeline.shard_timeline_id())
     255              :                     .expect("invariant of get_impl is that the returned handle is in the map");
     256              :                 assert!(
     257              :                     Weak::ptr_eq(&removed, &Arc::downgrade(&handle.0)),
     258              :                     "shard_timeline_id() incorrect?"
     259              :                 );
     260              :             } else {
     261              :                 return Ok(handle);
     262              :             }
     263              :         }
     264              :     }
     265              : 
     266          162 :     #[instrument(level = "trace", skip_all)]
     267              :     async fn get_impl(
     268              :         &mut self,
     269              :         timeline_id: TimelineId,
     270              :         shard_selector: ShardSelector,
     271              :         tenant_manager: &T::TenantManager,
     272              :     ) -> Result<Handle<T>, GetError<T>> {
     273              :         let miss: ShardSelector = {
     274              :             let routing_state = self.shard_routing(timeline_id, shard_selector);
     275              :             match routing_state {
     276              :                 RoutingResult::FastPath(handle) => return Ok(handle),
     277              :                 RoutingResult::SlowPath(key) => match self.map.get(&key) {
     278              :                     Some(cached) => match cached.upgrade() {
     279              :                         Some(upgraded) => return Ok(Handle(upgraded)),
     280              :                         None => {
     281              :                             trace!("handle cache stale");
     282              :                             self.map.remove(&key).unwrap();
     283              :                             ShardSelector::Known(key.shard_index)
     284              :                         }
     285              :                     },
     286              :                     None => ShardSelector::Known(key.shard_index),
     287              :                 },
     288              :                 RoutingResult::NeedConsultTenantManager => shard_selector,
     289              :             }
     290              :         };
     291              :         self.get_miss(timeline_id, miss, tenant_manager).await
     292              :     }
     293              : 
     294              :     #[inline(always)]
     295          162 :     fn shard_routing(
     296          162 :         &mut self,
     297          162 :         timeline_id: TimelineId,
     298          162 :         shard_selector: ShardSelector,
     299          162 :     ) -> RoutingResult<T> {
     300              :         loop {
     301              :             // terminates because when every iteration we remove an element from the map
     302          168 :             let Some((first_key, first_handle)) = self.map.iter().next() else {
     303           96 :                 return RoutingResult::NeedConsultTenantManager;
     304              :             };
     305           72 :             let Some(first_handle) = first_handle.upgrade() else {
     306              :                 // TODO: dedup with get()
     307            6 :                 trace!("handle cache stale");
     308            6 :                 let first_key_owned = *first_key;
     309            6 :                 self.map.remove(&first_key_owned).unwrap();
     310            6 :                 continue;
     311              :             };
     312              : 
     313           66 :             let first_handle_shard_identity = first_handle.timeline.get_shard_identity();
     314           66 :             let make_shard_index = |shard_num: ShardNumber| ShardIndex {
     315           66 :                 shard_number: shard_num,
     316           66 :                 shard_count: first_handle_shard_identity.count,
     317           66 :             };
     318              : 
     319           66 :             let need_idx = match shard_selector {
     320           66 :                 ShardSelector::Page(key) => {
     321           66 :                     make_shard_index(first_handle_shard_identity.get_shard_number(&key))
     322              :                 }
     323            0 :                 ShardSelector::Zero => make_shard_index(ShardNumber(0)),
     324            0 :                 ShardSelector::Known(shard_idx) => shard_idx,
     325              :             };
     326           66 :             let need_shard_timeline_id = ShardTimelineId {
     327           66 :                 shard_index: need_idx,
     328           66 :                 timeline_id,
     329           66 :             };
     330           66 :             let first_handle_shard_timeline_id = ShardTimelineId {
     331           66 :                 shard_index: first_handle_shard_identity.shard_index(),
     332           66 :                 timeline_id: first_handle.timeline.shard_timeline_id().timeline_id,
     333           66 :             };
     334           66 : 
     335           66 :             if need_shard_timeline_id == first_handle_shard_timeline_id {
     336           48 :                 return RoutingResult::FastPath(Handle(first_handle));
     337              :             } else {
     338           18 :                 return RoutingResult::SlowPath(need_shard_timeline_id);
     339              :             }
     340              :         }
     341          162 :     }
     342              : 
     343          114 :     #[instrument(level = "trace", skip_all)]
     344              :     #[inline(always)]
     345              :     async fn get_miss(
     346              :         &mut self,
     347              :         timeline_id: TimelineId,
     348              :         shard_selector: ShardSelector,
     349              :         tenant_manager: &T::TenantManager,
     350              :     ) -> Result<Handle<T>, GetError<T>> {
     351              :         match tenant_manager.resolve(timeline_id, shard_selector).await {
     352              :             Ok(timeline) => {
     353              :                 let key = timeline.shard_timeline_id();
     354              :                 match &shard_selector {
     355              :                     ShardSelector::Zero => assert_eq!(key.shard_index.shard_number, ShardNumber(0)),
     356              :                     ShardSelector::Page(_) => (), // gotta trust tenant_manager
     357              :                     ShardSelector::Known(idx) => assert_eq!(idx, &key.shard_index),
     358              :                 }
     359              : 
     360              :                 let gate_guard = match timeline.gate().enter() {
     361              :                     Ok(guard) => guard,
     362              :                     Err(_) => {
     363              :                         return Err(GetError::TimelineGateClosed);
     364              :                     }
     365              :                 };
     366              :                 trace!("creating new HandleInner");
     367              :                 let handle = Arc::new(
     368              :                     // TODO: global metric that keeps track of the number of live HandlerTimeline instances
     369              :                     // so we can identify reference cycle bugs.
     370              :                     HandleInner {
     371              :                         shut_down: AtomicBool::new(false),
     372              :                         _gate_guard: gate_guard,
     373              :                         timeline: timeline.clone(),
     374              :                     },
     375              :                 );
     376              :                 let handle = {
     377              :                     let mut lock_guard = timeline
     378              :                         .per_timeline_state()
     379              :                         .handles
     380              :                         .lock()
     381              :                         .expect("mutex poisoned");
     382              :                     match &mut *lock_guard {
     383              :                         Some(per_timeline_state) => {
     384              :                             let replaced = per_timeline_state.insert(self.id, Arc::clone(&handle));
     385              :                             assert!(replaced.is_none(), "some earlier code left a stale handle");
     386              :                             match self.map.entry(key) {
     387              :                                 hash_map::Entry::Occupied(_o) => {
     388              :                                     // This cannot not happen because
     389              :                                     // 1. we're the _miss_ handle, i.e., `self.map` didn't contain an entry and
     390              :                                     // 2. we were holding &mut self during .resolve().await above, so, no other thread can have inserted a handle
     391              :                                     //    while we were waiting for the tenant manager.
     392              :                                     unreachable!()
     393              :                                 }
     394              :                                 hash_map::Entry::Vacant(v) => {
     395              :                                     v.insert(Arc::downgrade(&handle));
     396              :                                     handle
     397              :                                 }
     398              :                             }
     399              :                         }
     400              :                         None => {
     401              :                             return Err(GetError::PerTimelineStateShutDown);
     402              :                         }
     403              :                     }
     404              :                 };
     405              :                 Ok(Handle(handle))
     406              :             }
     407              :             Err(e) => Err(GetError::TenantManager(e)),
     408              :         }
     409              :     }
     410              : }
     411              : 
     412              : impl<T: Types> PerTimelineState<T> {
     413              :     /// After this method returns, [`Cache::get`] will never again return a [`Handle`]
     414              :     /// to the [`Types::Timeline`] that embeds this per-timeline state.
     415              :     /// Even if [`TenantManager::resolve`] would still resolve to it.
     416              :     ///
     417              :     /// Already-alive [`Handle`]s for will remain open, usable, and keeping the [`ArcTimeline`] alive.
     418              :     /// That's ok because they're short-lived. See module-level comment for details.
     419           42 :     #[instrument(level = "trace", skip_all)]
     420              :     pub(super) fn shutdown(&self) {
     421              :         let handles = self
     422              :             .handles
     423              :             .lock()
     424              :             .expect("mutex poisoned")
     425              :             // NB: this .take() sets locked to None.
     426              :             // That's what makes future `Cache::get` misses fail.
     427              :             // Cache hits are taken care of below.
     428              :             .take();
     429              :         let Some(handles) = handles else {
     430              :             trace!("already shut down");
     431              :             return;
     432              :         };
     433              :         for handle in handles.values() {
     434              :             // Make hits fail.
     435              :             handle.shut_down.store(true, Ordering::Relaxed);
     436              :         }
     437              :         drop(handles);
     438              :     }
     439              : }
     440              : 
     441              : impl<T: Types> std::ops::Deref for Handle<T> {
     442              :     type Target = T::Timeline;
     443          186 :     fn deref(&self) -> &Self::Target {
     444          186 :         &self.0.timeline
     445          186 :     }
     446              : }
     447              : 
     448              : #[cfg(test)]
     449              : impl<T: Types> Drop for HandleInner<T> {
     450          102 :     fn drop(&mut self) {
     451          102 :         trace!("HandleInner dropped");
     452          102 :     }
     453              : }
     454              : 
     455              : // When dropping a [`Cache`], prune its handles in the [`PerTimelineState`] to break the reference cycle.
     456              : impl<T: Types> Drop for Cache<T> {
     457           78 :     fn drop(&mut self) {
     458           78 :         for (_, weak) in self.map.drain() {
     459           78 :             if let Some(strong) = weak.upgrade() {
     460              :                 // handle is still being kept alive in PerTimelineState
     461           78 :                 let timeline = strong.timeline.per_timeline_state();
     462           78 :                 let mut handles = timeline.handles.lock().expect("mutex poisoned");
     463           78 :                 if let Some(handles) = &mut *handles {
     464           78 :                     let Some(removed) = handles.remove(&self.id) else {
     465              :                         // There could have been a shutdown inbetween us upgrading the weak and locking the mutex.
     466            0 :                         continue;
     467              :                     };
     468           78 :                     assert!(Arc::ptr_eq(&removed, &strong));
     469            0 :                 }
     470            0 :             }
     471              :         }
     472           78 :     }
     473              : }
     474              : 
     475              : #[cfg(test)]
     476              : mod tests {
     477              :     use pageserver_api::{
     478              :         key::{rel_block_to_key, Key, DBDIR_KEY},
     479              :         models::ShardParameters,
     480              :         reltag::RelTag,
     481              :         shard::ShardStripeSize,
     482              :     };
     483              :     use utils::shard::ShardCount;
     484              : 
     485              :     use super::*;
     486              : 
     487              :     const FOREVER: std::time::Duration = std::time::Duration::from_secs(u64::MAX);
     488              : 
     489              :     #[derive(Debug)]
     490              :     struct TestTypes;
     491              :     impl Types for TestTypes {
     492              :         type TenantManagerError = anyhow::Error;
     493              :         type TenantManager = StubManager;
     494              :         type Timeline = Arc<StubTimeline>;
     495              :     }
     496              : 
     497              :     struct StubManager {
     498              :         shards: Vec<Arc<StubTimeline>>,
     499              :     }
     500              : 
     501              :     struct StubTimeline {
     502              :         gate: utils::sync::gate::Gate,
     503              :         id: TimelineId,
     504              :         shard: ShardIdentity,
     505              :         per_timeline_state: PerTimelineState<TestTypes>,
     506              :         myself: Weak<StubTimeline>,
     507              :     }
     508              : 
     509              :     impl StubTimeline {
     510           66 :         fn getpage(&self) {
     511           66 :             // do nothing
     512           66 :         }
     513              :     }
     514              : 
     515              :     impl ArcTimeline<TestTypes> for Arc<StubTimeline> {
     516          108 :         fn gate(&self) -> &utils::sync::gate::Gate {
     517          108 :             &self.gate
     518          108 :         }
     519              : 
     520          186 :         fn shard_timeline_id(&self) -> ShardTimelineId {
     521          186 :             ShardTimelineId {
     522          186 :                 shard_index: self.shard.shard_index(),
     523          186 :                 timeline_id: self.id,
     524          186 :             }
     525          186 :         }
     526              : 
     527           66 :         fn get_shard_identity(&self) -> &ShardIdentity {
     528           66 :             &self.shard
     529           66 :         }
     530              : 
     531          180 :         fn per_timeline_state(&self) -> &PerTimelineState<TestTypes> {
     532          180 :             &self.per_timeline_state
     533          180 :         }
     534              :     }
     535              : 
     536              :     impl TenantManager<TestTypes> for StubManager {
     537          120 :         async fn resolve(
     538          120 :             &self,
     539          120 :             timeline_id: TimelineId,
     540          120 :             shard_selector: ShardSelector,
     541          120 :         ) -> anyhow::Result<Arc<StubTimeline>> {
     542          144 :             for timeline in &self.shards {
     543          132 :                 if timeline.id == timeline_id {
     544            0 :                     match &shard_selector {
     545            0 :                         ShardSelector::Zero if timeline.shard.is_shard_zero() => {
     546            0 :                             return Ok(Arc::clone(timeline));
     547              :                         }
     548            0 :                         ShardSelector::Zero => continue,
     549           96 :                         ShardSelector::Page(key) if timeline.shard.is_key_local(key) => {
     550           96 :                             return Ok(Arc::clone(timeline));
     551              :                         }
     552            0 :                         ShardSelector::Page(_) => continue,
     553           18 :                         ShardSelector::Known(idx) if idx == &timeline.shard.shard_index() => {
     554           12 :                             return Ok(Arc::clone(timeline));
     555              :                         }
     556            6 :                         ShardSelector::Known(_) => continue,
     557              :                     }
     558           18 :                 }
     559              :             }
     560           12 :             anyhow::bail!("not found")
     561          120 :         }
     562              :     }
     563              : 
     564              :     #[tokio::test(start_paused = true)]
     565            6 :     async fn test_timeline_shutdown() {
     566            6 :         crate::tenant::harness::setup_logging();
     567            6 : 
     568            6 :         let timeline_id = TimelineId::generate();
     569            6 :         let shard0 = Arc::new_cyclic(|myself| StubTimeline {
     570            6 :             gate: Default::default(),
     571            6 :             id: timeline_id,
     572            6 :             shard: ShardIdentity::unsharded(),
     573            6 :             per_timeline_state: PerTimelineState::default(),
     574            6 :             myself: myself.clone(),
     575            6 :         });
     576            6 :         let mgr = StubManager {
     577            6 :             shards: vec![shard0.clone()],
     578            6 :         };
     579            6 :         let key = DBDIR_KEY;
     580            6 : 
     581            6 :         let mut cache = Cache::<TestTypes>::default();
     582            6 : 
     583            6 :         //
     584            6 :         // fill the cache
     585            6 :         //
     586            6 :         assert_eq!(
     587            6 :             (Arc::strong_count(&shard0), Arc::weak_count(&shard0)),
     588            6 :             (2, 1),
     589            6 :             "strong: shard0, mgr; weak: myself"
     590            6 :         );
     591            6 : 
     592            6 :         let handle: Handle<_> = cache
     593            6 :             .get(timeline_id, ShardSelector::Page(key), &mgr)
     594            6 :             .await
     595            6 :             .expect("we have the timeline");
     596            6 :         let handle_inner_weak = Arc::downgrade(&handle.0);
     597            6 :         assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
     598            6 :         assert_eq!(
     599            6 :             (
     600            6 :                 Weak::strong_count(&handle_inner_weak),
     601            6 :                 Weak::weak_count(&handle_inner_weak)
     602            6 :             ),
     603            6 :             (2, 2),
     604            6 :             "strong: handle, per_timeline_state, weak: handle_inner_weak, cache"
     605            6 :         );
     606            6 :         assert_eq!(cache.map.len(), 1);
     607            6 : 
     608            6 :         assert_eq!(
     609            6 :             (Arc::strong_count(&shard0), Arc::weak_count(&shard0)),
     610            6 :             (3, 1),
     611            6 :             "strong: handleinner(per_timeline_state), shard0, mgr; weak: myself"
     612            6 :         );
     613            6 :         drop(handle);
     614            6 :         assert_eq!(
     615            6 :             (Arc::strong_count(&shard0), Arc::weak_count(&shard0)),
     616            6 :             (3, 1),
     617            6 :             "strong: handleinner(per_timeline_state), shard0, mgr; weak: myself"
     618            6 :         );
     619            6 : 
     620            6 :         //
     621            6 :         // demonstrate that Handle holds up gate closure
     622            6 :         // but shutdown prevents new handles from being handed out
     623            6 :         //
     624            6 : 
     625            6 :         tokio::select! {
     626            6 :             _ = shard0.gate.close() => {
     627            6 :                 panic!("cache and per-timeline handler state keep cache open");
     628            6 :             }
     629            6 :             _ = tokio::time::sleep(FOREVER) => {
     630            6 :                 // NB: first poll of close() makes it enter closing state
     631            6 :             }
     632            6 :         }
     633            6 : 
     634            6 :         let handle = cache
     635            6 :             .get(timeline_id, ShardSelector::Page(key), &mgr)
     636            6 :             .await
     637            6 :             .expect("we have the timeline");
     638            6 :         assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
     639            6 : 
     640            6 :         // SHUTDOWN
     641            6 :         shard0.per_timeline_state.shutdown(); // keeping handle alive across shutdown
     642            6 : 
     643            6 :         assert_eq!(
     644            6 :             1,
     645            6 :             Weak::strong_count(&handle_inner_weak),
     646            6 :             "through local var handle"
     647            6 :         );
     648            6 :         assert_eq!(
     649            6 :             cache.map.len(),
     650            6 :             1,
     651            6 :             "this is an implementation detail but worth pointing out: we can't clear the cache from shutdown(), it's cleared on first access after"
     652            6 :         );
     653            6 :         assert_eq!(
     654            6 :             (Arc::strong_count(&shard0), Arc::weak_count(&shard0)),
     655            6 :             (3, 1),
     656            6 :             "strong: handleinner(via handle), shard0, mgr; weak: myself"
     657            6 :         );
     658            6 : 
     659            6 :         // this handle is perfectly usable
     660            6 :         handle.getpage();
     661            6 : 
     662            6 :         cache
     663            6 :             .get(timeline_id, ShardSelector::Page(key), &mgr)
     664            6 :             .await
     665            6 :             .err()
     666            6 :             .expect("documented behavior: can't get new handle after shutdown, even if there is an alive Handle");
     667            6 :         assert_eq!(
     668            6 :             cache.map.len(),
     669            6 :             0,
     670            6 :             "first access after shutdown cleans up the Weak's from the cache"
     671            6 :         );
     672            6 : 
     673            6 :         tokio::select! {
     674            6 :             _ = shard0.gate.close() => {
     675            6 :                 panic!("handle is keeping gate open");
     676            6 :             }
     677            6 :             _ = tokio::time::sleep(FOREVER) => { }
     678            6 :         }
     679            6 : 
     680            6 :         drop(handle);
     681            6 :         assert_eq!(
     682            6 :             0,
     683            6 :             Weak::strong_count(&handle_inner_weak),
     684            6 :             "the HandleInner destructor already ran"
     685            6 :         );
     686            6 :         assert_eq!(
     687            6 :             (Arc::strong_count(&shard0), Arc::weak_count(&shard0)),
     688            6 :             (2, 1),
     689            6 :             "strong: shard0, mgr; weak: myself"
     690            6 :         );
     691            6 : 
     692            6 :         // closing gate succeeds after dropping handle
     693            6 :         tokio::select! {
     694            6 :             _ = shard0.gate.close() => { }
     695            6 :             _ = tokio::time::sleep(FOREVER) => {
     696            6 :                 panic!("handle is dropped, no other gate holders exist")
     697            6 :             }
     698            6 :         }
     699            6 : 
     700            6 :         // map gets cleaned on next lookup
     701            6 :         cache
     702            6 :             .get(timeline_id, ShardSelector::Page(key), &mgr)
     703            6 :             .await
     704            6 :             .err()
     705            6 :             .expect("documented behavior: can't get new handle after shutdown");
     706            6 :         assert_eq!(cache.map.len(), 0);
     707            6 : 
     708            6 :         // ensure all refs to shard0 are gone and we're not leaking anything
     709            6 :         let myself = Weak::clone(&shard0.myself);
     710            6 :         drop(shard0);
     711            6 :         drop(mgr);
     712            6 :         assert_eq!(Weak::strong_count(&myself), 0);
     713            6 :     }
     714              : 
     715              :     #[tokio::test]
     716            6 :     async fn test_multiple_timelines_and_deletion() {
     717            6 :         crate::tenant::harness::setup_logging();
     718            6 : 
     719            6 :         let timeline_a = TimelineId::generate();
     720            6 :         let timeline_b = TimelineId::generate();
     721            6 :         assert_ne!(timeline_a, timeline_b);
     722            6 :         let timeline_a = Arc::new_cyclic(|myself| StubTimeline {
     723            6 :             gate: Default::default(),
     724            6 :             id: timeline_a,
     725            6 :             shard: ShardIdentity::unsharded(),
     726            6 :             per_timeline_state: PerTimelineState::default(),
     727            6 :             myself: myself.clone(),
     728            6 :         });
     729            6 :         let timeline_b = Arc::new_cyclic(|myself| StubTimeline {
     730            6 :             gate: Default::default(),
     731            6 :             id: timeline_b,
     732            6 :             shard: ShardIdentity::unsharded(),
     733            6 :             per_timeline_state: PerTimelineState::default(),
     734            6 :             myself: myself.clone(),
     735            6 :         });
     736            6 :         let mut mgr = StubManager {
     737            6 :             shards: vec![timeline_a.clone(), timeline_b.clone()],
     738            6 :         };
     739            6 :         let key = DBDIR_KEY;
     740            6 : 
     741            6 :         let mut cache = Cache::<TestTypes>::default();
     742            6 : 
     743            6 :         cache
     744            6 :             .get(timeline_a.id, ShardSelector::Page(key), &mgr)
     745            6 :             .await
     746            6 :             .expect("we have it");
     747            6 :         cache
     748            6 :             .get(timeline_b.id, ShardSelector::Page(key), &mgr)
     749            6 :             .await
     750            6 :             .expect("we have it");
     751            6 :         assert_eq!(cache.map.len(), 2);
     752            6 : 
     753            6 :         // delete timeline A
     754            6 :         timeline_a.per_timeline_state.shutdown();
     755           12 :         mgr.shards.retain(|t| t.id != timeline_a.id);
     756            6 :         assert!(
     757            6 :             mgr.resolve(timeline_a.id, ShardSelector::Page(key))
     758            6 :                 .await
     759            6 :                 .is_err(),
     760            6 :             "broken StubManager implementation"
     761            6 :         );
     762            6 : 
     763            6 :         assert_eq!(
     764            6 :             cache.map.len(),
     765            6 :             2,
     766            6 :             "cache still has a Weak handle to Timeline A"
     767            6 :         );
     768            6 :         cache
     769            6 :             .get(timeline_a.id, ShardSelector::Page(key), &mgr)
     770            6 :             .await
     771            6 :             .err()
     772            6 :             .expect("documented behavior: can't get new handle after shutdown");
     773            6 :         assert_eq!(cache.map.len(), 1, "next access cleans up the cache");
     774            6 : 
     775            6 :         cache
     776            6 :             .get(timeline_b.id, ShardSelector::Page(key), &mgr)
     777            6 :             .await
     778            6 :             .expect("we still have it");
     779            6 :     }
     780              : 
     781           42 :     fn make_relation_key_for_shard(shard: ShardNumber, params: &ShardParameters) -> Key {
     782           42 :         rel_block_to_key(
     783           42 :             RelTag {
     784           42 :                 spcnode: 1663,
     785           42 :                 dbnode: 208101,
     786           42 :                 relnode: 2620,
     787           42 :                 forknum: 0,
     788           42 :             },
     789           42 :             shard.0 as u32 * params.stripe_size.0,
     790           42 :         )
     791           42 :     }
     792              : 
     793              :     #[tokio::test(start_paused = true)]
     794            6 :     async fn test_shard_split() {
     795            6 :         crate::tenant::harness::setup_logging();
     796            6 :         let timeline_id = TimelineId::generate();
     797            6 :         let parent = Arc::new_cyclic(|myself| StubTimeline {
     798            6 :             gate: Default::default(),
     799            6 :             id: timeline_id,
     800            6 :             shard: ShardIdentity::unsharded(),
     801            6 :             per_timeline_state: PerTimelineState::default(),
     802            6 :             myself: myself.clone(),
     803            6 :         });
     804            6 :         let child_params = ShardParameters {
     805            6 :             count: ShardCount(2),
     806            6 :             stripe_size: ShardStripeSize::default(),
     807            6 :         };
     808            6 :         let child0 = Arc::new_cyclic(|myself| StubTimeline {
     809            6 :             gate: Default::default(),
     810            6 :             id: timeline_id,
     811            6 :             shard: ShardIdentity::from_params(ShardNumber(0), &child_params),
     812            6 :             per_timeline_state: PerTimelineState::default(),
     813            6 :             myself: myself.clone(),
     814            6 :         });
     815            6 :         let child1 = Arc::new_cyclic(|myself| StubTimeline {
     816            6 :             gate: Default::default(),
     817            6 :             id: timeline_id,
     818            6 :             shard: ShardIdentity::from_params(ShardNumber(1), &child_params),
     819            6 :             per_timeline_state: PerTimelineState::default(),
     820            6 :             myself: myself.clone(),
     821            6 :         });
     822            6 :         let child_shards_by_shard_number = [child0.clone(), child1.clone()];
     823            6 : 
     824            6 :         let mut cache = Cache::<TestTypes>::default();
     825            6 : 
     826            6 :         // fill the cache with the parent
     827           18 :         for i in 0..2 {
     828           12 :             let handle = cache
     829           12 :                 .get(
     830           12 :                     timeline_id,
     831           12 :                     ShardSelector::Page(make_relation_key_for_shard(ShardNumber(i), &child_params)),
     832           12 :                     &StubManager {
     833           12 :                         shards: vec![parent.clone()],
     834           12 :                     },
     835           12 :                 )
     836            6 :                 .await
     837           12 :                 .expect("we have it");
     838           12 :             assert!(
     839           12 :                 Weak::ptr_eq(&handle.myself, &parent.myself),
     840            6 :                 "mgr returns parent first"
     841            6 :             );
     842           12 :             drop(handle);
     843            6 :         }
     844            6 : 
     845            6 :         //
     846            6 :         // SHARD SPLIT: tenant manager changes, but the cache isn't informed
     847            6 :         //
     848            6 : 
     849            6 :         // while we haven't shut down the parent, the cache will return the cached parent, even
     850            6 :         // if the tenant manager returns the child
     851           18 :         for i in 0..2 {
     852           12 :             let handle = cache
     853           12 :                 .get(
     854           12 :                     timeline_id,
     855           12 :                     ShardSelector::Page(make_relation_key_for_shard(ShardNumber(i), &child_params)),
     856           12 :                     &StubManager {
     857           12 :                         shards: vec![], // doesn't matter what's in here, the cache is fully loaded
     858           12 :                     },
     859           12 :                 )
     860            6 :                 .await
     861           12 :                 .expect("we have it");
     862           12 :             assert!(
     863           12 :                 Weak::ptr_eq(&handle.myself, &parent.myself),
     864            6 :                 "mgr returns parent"
     865            6 :             );
     866           12 :             drop(handle);
     867            6 :         }
     868            6 : 
     869            6 :         let parent_handle = cache
     870            6 :             .get(
     871            6 :                 timeline_id,
     872            6 :                 ShardSelector::Page(make_relation_key_for_shard(ShardNumber(0), &child_params)),
     873            6 :                 &StubManager {
     874            6 :                     shards: vec![parent.clone()],
     875            6 :                 },
     876            6 :             )
     877            6 :             .await
     878            6 :             .expect("we have it");
     879            6 :         assert!(Weak::ptr_eq(&parent_handle.myself, &parent.myself));
     880            6 : 
     881            6 :         // invalidate the cache
     882            6 :         parent.per_timeline_state.shutdown();
     883            6 : 
     884            6 :         // the cache will now return the child, even though the parent handle still exists
     885           18 :         for i in 0..2 {
     886           12 :             let handle = cache
     887           12 :                 .get(
     888           12 :                     timeline_id,
     889           12 :                     ShardSelector::Page(make_relation_key_for_shard(ShardNumber(i), &child_params)),
     890           12 :                     &StubManager {
     891           12 :                         shards: vec![child0.clone(), child1.clone()], // <====== this changed compared to previous loop
     892           12 :                     },
     893           12 :                 )
     894            6 :                 .await
     895           12 :                 .expect("we have it");
     896           12 :             assert!(
     897           12 :                 Weak::ptr_eq(
     898           12 :                     &handle.myself,
     899           12 :                     &child_shards_by_shard_number[i as usize].myself
     900           12 :                 ),
     901            6 :                 "mgr returns child"
     902            6 :             );
     903           12 :             drop(handle);
     904            6 :         }
     905            6 : 
     906            6 :         // all the while the parent handle kept the parent gate open
     907            6 :         tokio::select! {
     908            6 :             _ = parent_handle.gate.close() => {
     909            6 :                 panic!("parent handle is keeping gate open");
     910            6 :             }
     911            6 :             _ = tokio::time::sleep(FOREVER) => { }
     912            6 :         }
     913            6 :         drop(parent_handle);
     914            6 :         tokio::select! {
     915            6 :             _ = parent.gate.close() => { }
     916            6 :             _ = tokio::time::sleep(FOREVER) => {
     917            6 :                 panic!("parent handle is dropped, no other gate holders exist")
     918            6 :             }
     919            6 :         }
     920            6 :     }
     921              : 
     922              :     #[tokio::test(start_paused = true)]
     923            6 :     async fn test_connection_handler_exit() {
     924            6 :         crate::tenant::harness::setup_logging();
     925            6 :         let timeline_id = TimelineId::generate();
     926            6 :         let shard0 = Arc::new_cyclic(|myself| StubTimeline {
     927            6 :             gate: Default::default(),
     928            6 :             id: timeline_id,
     929            6 :             shard: ShardIdentity::unsharded(),
     930            6 :             per_timeline_state: PerTimelineState::default(),
     931            6 :             myself: myself.clone(),
     932            6 :         });
     933            6 :         let mgr = StubManager {
     934            6 :             shards: vec![shard0.clone()],
     935            6 :         };
     936            6 :         let key = DBDIR_KEY;
     937            6 : 
     938            6 :         // Simulate 10 connections that's opened, used, and closed
     939            6 :         let mut used_handles = vec![];
     940           66 :         for _ in 0..10 {
     941           60 :             let mut cache = Cache::<TestTypes>::default();
     942           60 :             let handle = {
     943           60 :                 let handle = cache
     944           60 :                     .get(timeline_id, ShardSelector::Page(key), &mgr)
     945            6 :                     .await
     946           60 :                     .expect("we have the timeline");
     947           60 :                 assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
     948           60 :                 handle
     949           60 :             };
     950           60 :             handle.getpage();
     951           60 :             used_handles.push(Arc::downgrade(&handle.0));
     952            6 :         }
     953            6 : 
     954            6 :         // No handles exist, thus gates are closed and don't require shutdown
     955            6 :         assert!(used_handles
     956            6 :             .iter()
     957           60 :             .all(|weak| Weak::strong_count(weak) == 0));
     958            6 : 
     959            6 :         // ... thus the gate should close immediately, even without shutdown
     960            6 :         tokio::select! {
     961            6 :             _ = shard0.gate.close() => { }
     962            6 :             _ = tokio::time::sleep(FOREVER) => {
     963            6 :                 panic!("handle is dropped, no other gate holders exist")
     964            6 :             }
     965            6 :         }
     966            6 :     }
     967              : }
        

Generated by: LCOV version 2.1-beta