Line data Source code
1 : //! An efficient way to keep the timeline gate open without preventing
2 : //! timeline shutdown for longer than a single call to a timeline method.
3 : //!
4 : //! # Motivation
5 : //!
6 : //! On a single page service connection, we're typically serving a single TenantTimelineId.
7 : //!
8 : //! Without sharding, there is a single Timeline object to which we dispatch
9 : //! all requests. For example, a getpage request gets dispatched to the
10 : //! Timeline::get method of the Timeline object that represents the
11 : //! (tenant,timeline) of that connection.
12 : //!
13 : //! With sharding, for each request that comes in on the connection,
14 : //! we first have to perform shard routing based on the requested key (=~ page number).
15 : //! The result of shard routing is a Timeline object.
16 : //! We then dispatch the request to that Timeline object.
17 : //!
18 : //! Regardless of whether the tenant is sharded or not, we want to ensure that
19 : //! we hold the Timeline gate open while we're invoking the method on the
20 : //! Timeline object.
21 : //!
22 : //! However, we want to avoid the overhead of entering the gate for every
23 : //! method invocation.
24 : //!
25 : //! Further, for shard routing, we want to avoid calling the tenant manager to
26 : //! resolve the shard for every request. Instead, we want to cache the
27 : //! routing result so we can bypass the tenant manager for all subsequent requests
28 : //! that get routed to that shard.
29 : //!
30 : //! Regardless of how we accomplish the above, it should not
31 : //! prevent the Timeline from shutting down promptly.
32 : //!
33 : //! # Design
34 : //!
35 : //! There are three user-facing data structures:
36 : //! - `PerTimelineState`: a struct embedded into each Timeline struct. Lifetime == Timeline lifetime.
37 : //! - `Cache`: a struct private to each connection handler; Lifetime == connection lifetime.
38 : //! - `Handle`: a smart pointer that holds the Timeline gate open and derefs to `&Timeline`.
39 : //! Lifetime: for a single request dispatch on the Timeline (i.e., one getpage request)
40 : //!
41 : //! The `Handle` is just a wrapper around an `Arc<HandleInner>`.
42 : //!
43 : //! There is one long-lived `Arc<HandleInner>` per (`Cache`, Timeline) pair, stored in the `PerTimelineState`.
44 : //! The `Cache` stores a `Weak<HandleInner>` for each cached Timeline.
45 : //!
46 : //! To dispatch a request, the page service connection calls `Cache::get`.
47 : //!
48 : //! A cache miss means we consult the tenant manager for shard routing,
49 : //! resulting in an `Arc<Timeline>`. We enter its gate _once_ and construct an
50 : //! `Arc<HandleInner>`. We store a `Weak<HandleInner>` in the cache
51 : //! and the `Arc<HandleInner>` in the `PerTimelineState`.
52 : //!
53 : //! For subsequent requests, `Cache::get` will perform a "fast path" shard routing
54 : //! and find the `Weak<HandleInner>` in the cache.
55 : //! We upgrade the `Weak<HandleInner>` to an `Arc<HandleInner>` and wrap it in the user-facing `Handle` type.
56 : //!
57 : //! The request handler dispatches the request to the right `<Handle as Deref<Target = Timeline>>::$request_method`.
58 : //! It then drops the `Handle`, which drops the `Arc<HandleInner>`.
59 : //!
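//! As a rough usage sketch from the connection handler's point of view (simplified;
//! `T`, `read_next_request`, and `tenant_manager` are placeholders for whatever the
//! page service actually has at hand, not items defined in this module):
//!
//! ```rust,ignore
//! // One `Cache` per page service connection, dropped when the connection closes.
//! let mut cache = Cache::<T>::default();
//! loop {
//!     let (timeline_id, key) = read_next_request().await;
//!     // `get` resolves the shard (fast path or via the tenant manager) and returns
//!     // a `Handle` that keeps the timeline gate open for this one dispatch.
//!     let handle = cache
//!         .get(timeline_id, ShardSelector::Page(key), &tenant_manager)
//!         .await?;
//!     // `Handle` derefs to the Timeline; dispatch the request, then drop the `Handle`.
//!     let _result = handle.get(key /* , ... */);
//!     drop(handle);
//! }
//! ```
//!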
60 : //! # Memory Management / How The Reference Cycle Is Broken
61 : //!
62 : //! The attentive reader may have noticed the strong reference cycle
63 : //! from `Arc<HandleInner>` to `PerTimelineState` to `Arc<Timeline>`.
64 : //!
65 : //! This cycle is intentional: while it exists, the `Cache` can upgrade its
66 : //! `Weak<HandleInner>` to an `Arc<HandleInner>` in a single atomic operation.
67 : //!
68 : //! The cycle is broken by either
69 : //! - `PerTimelineState::shutdown` or
70 : //! - dropping the `Cache`.
71 : //!
72 : //! Concurrently existing `Handle`s will extend the existence of the cycle.
73 : //! However, since `Handle`s are short-lived and new `Handle`s are not
74 : //! handed out after either `PerTimelineState::shutdown` or `Cache` drop,
75 : //! that extension of the cycle is bounded.
76 : //!
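//! For orientation, the ownership edges between the types are roughly the following
//! (only the `Cache` edge is weak; `HandleInner -> Timeline -> PerTimelineState ->
//! HandleInner` is the intentional strong cycle described above):
//!
//! ```text
//! Handle            --strong--> HandleInner
//! Cache             ---weak---> HandleInner
//! HandleInner       --strong--> Timeline
//! Timeline          --embeds--> PerTimelineState
//! PerTimelineState  --strong--> HandleInner     (closes the cycle)
//! ```
//!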
77 : //! # Fast Path for Shard Routing
78 : //!
79 : //! The `Cache` has a fast path for shard routing to avoid calling into
80 : //! the tenant manager for every request.
81 : //!
82 : //! The `Cache` maintains a hash map of `ShardTimelineId` to `Weak<HandleInner>`.
83 : //!
84 : //! The current implementation uses the first entry in the hash map
85 : //! to determine the `ShardParameters` and derive the correct
86 : //! `ShardIndex` for the requested key.
87 : //!
88 : //! It then looks up that `ShardTimelineId := {ShardIndex,TimelineId}` in the hash map.
89 : //!
90 : //! If the lookup is successful and the `Weak<HandleInner>` can be upgraded,
91 : //! it's a hit.
92 : //!
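//! In sketch form, the fast path does roughly the following (staleness handling and the
//! fast/slow distinction omitted; the real logic lives in `Cache::shard_routing` below):
//!
//! ```rust,ignore
//! // Use any cached entry to learn this tenant's shard layout.
//! let (_, first) = self.map.iter().next()?;
//! let first = first.upgrade()?;
//! let identity = first.timeline.get_shard_identity();
//! // Derive the shard that owns the requested key ...
//! let shard_index = ShardIndex {
//!     shard_number: identity.get_shard_number(&key),
//!     shard_count: identity.count,
//! };
//! // ... and look it up directly, without going through the tenant manager.
//! let handle = Handle(self.map.get(&ShardTimelineId { shard_index, timeline_id })?.upgrade()?);
//! ```
//!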
93 : //! ## Cache invalidation
94 : //!
95 : //! The insight is that lazy cache invalidation is both sufficient and most efficient.
96 : //! The only reasons why an entry in the cache can become stale are:
97 : //! 1. The `PerTimelineState` / Timeline is shutting down, e.g., because the shard is
98 : //! being detached, the timeline or shard is being deleted, or the pageserver is shutting down.
99 : //! 2. We're doing a shard split and new traffic should be routed to the child shards.
100 : //!
101 : //! Regarding (1), we will eventually fail to upgrade the `Weak<HandleInner>` once the
102 : //! timeline has shut down, and when that happens, we remove the entry from the cache.
103 : //!
104 : //! Regarding (2), the insight is that it is totally fine to keep dispatching requests
105 : //! to the parent shard during a shard split. Eventually, the shard split task will
106 : //! shut down the parent => case (1).
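//!
//! In other words, invalidation is a by-product of the lookup itself. A minimal sketch of
//! what `Cache::get` / `Cache::get_impl` do with a cached entry (simplified, error paths
//! and instrumentation omitted):
//!
//! ```rust,ignore
//! match self.map.get(&key).and_then(Weak::upgrade) {
//!     // Entry is alive and the timeline has not shut down: cache hit.
//!     Some(inner) if !inner.shut_down.load(Ordering::Relaxed) => Ok(Handle(inner)),
//!     // Stale entry (failed upgrade or shut-down timeline): drop it and fall back
//!     // to the miss path, i.e., resolve via the tenant manager again.
//!     _ => {
//!         self.map.remove(&key);
//!         self.get_miss(timeline_id, ShardSelector::Known(key.shard_index), tenant_manager)
//!             .await
//!     }
//! }
//! ```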
107 :
108 : use std::collections::hash_map;
109 : use std::collections::HashMap;
110 : use std::sync::atomic::AtomicBool;
111 : use std::sync::atomic::Ordering;
112 : use std::sync::Arc;
113 : use std::sync::Mutex;
114 : use std::sync::Weak;
115 :
116 : use pageserver_api::shard::ShardIdentity;
117 : use tracing::instrument;
118 : use tracing::trace;
119 : use utils::id::TimelineId;
120 : use utils::shard::ShardIndex;
121 : use utils::shard::ShardNumber;
122 :
123 : use crate::tenant::mgr::ShardSelector;
124 :
125 : /// The `Debug` bounds exist so that #[derive(Debug)] works in some places.
126 : pub(crate) trait Types: Sized + std::fmt::Debug {
127 : type TenantManagerError: Sized + std::fmt::Debug;
128 : type TenantManager: TenantManager<Self> + Sized;
129 : type Timeline: ArcTimeline<Self> + Sized;
130 : }
131 :
132 : /// Uniquely identifies a [`Cache`] instance over the lifetime of the process.
133 : /// Required so [`Cache::drop`] can remove the cache's handles from the [`PerTimelineState`].
134 : /// Alternative to this would be to allocate [`Cache`] in a `Box` and identify it by the pointer.
135 : #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
136 : struct CacheId(u64);
137 :
138 : impl CacheId {
139 26 : fn next() -> Self {
140 : static NEXT_ID: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
141 26 : let id = NEXT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
142 26 : if id == 0 {
143 0 : panic!("CacheId::new() returned 0, overflow");
144 26 : }
145 26 : Self(id)
146 26 : }
147 : }
148 :
149 : /// See module-level comment.
150 : pub(crate) struct Cache<T: Types> {
151 : id: CacheId,
152 : map: Map<T>,
153 : }
154 :
155 : type Map<T> = HashMap<ShardTimelineId, Weak<HandleInner<T>>>;
156 :
157 : impl<T: Types> Default for Cache<T> {
158 26 : fn default() -> Self {
159 26 : Self {
160 26 : id: CacheId::next(),
161 26 : map: Default::default(),
162 26 : }
163 26 : }
164 : }
165 :
166 : #[derive(PartialEq, Eq, Debug, Hash, Clone, Copy)]
167 : pub(crate) struct ShardTimelineId {
168 : pub(crate) shard_index: ShardIndex,
169 : pub(crate) timeline_id: TimelineId,
170 : }
171 :
172 : /// See module-level comment.
173 : pub(crate) struct Handle<T: Types>(Arc<HandleInner<T>>);
174 : struct HandleInner<T: Types> {
175 : shut_down: AtomicBool,
176 : timeline: T::Timeline,
177 : // The timeline's gate held open.
178 : _gate_guard: utils::sync::gate::GateGuard,
179 : }
180 :
181 : /// Embedded in each [`Types::Timeline`] as the anchor for the only long-lived strong ref to `HandleInner`.
182 : ///
183 : /// See module-level comment for details.
184 : pub struct PerTimelineState<T: Types> {
185 : // None = shutting down
186 : handles: Mutex<Option<HashMap<CacheId, Arc<HandleInner<T>>>>>,
187 : }
188 :
189 : impl<T: Types> Default for PerTimelineState<T> {
190 424 : fn default() -> Self {
191 424 : Self {
192 424 : handles: Mutex::new(Some(Default::default())),
193 424 : }
194 424 : }
195 : }
196 :
197 : /// Abstract view of [`crate::tenant::mgr`], for testability.
198 : pub(crate) trait TenantManager<T: Types> {
199 : /// Invoked by [`Cache::get`] to resolve a [`ShardTimelineId`] to a [`Types::Timeline`].
200 : /// Errors are returned as [`GetError::TenantManager`].
201 : async fn resolve(
202 : &self,
203 : timeline_id: TimelineId,
204 : shard_selector: ShardSelector,
205 : ) -> Result<T::Timeline, T::TenantManagerError>;
206 : }
207 :
208 : /// Abstract view of an [`Arc<Timeline>`], for testability.
209 : pub(crate) trait ArcTimeline<T: Types>: Clone {
210 : fn gate(&self) -> &utils::sync::gate::Gate;
211 : fn shard_timeline_id(&self) -> ShardTimelineId;
212 : fn get_shard_identity(&self) -> &ShardIdentity;
213 : fn per_timeline_state(&self) -> &PerTimelineState<T>;
214 : }
215 :
216 : /// Errors returned by [`Cache::get`].
217 : #[derive(Debug)]
218 : pub(crate) enum GetError<T: Types> {
219 : TenantManager(T::TenantManagerError),
220 : TimelineGateClosed,
221 : PerTimelineStateShutDown,
222 : }
223 :
224 : /// Internal type used in [`Cache::get`].
225 : enum RoutingResult<T: Types> {
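/// The first cached entry is already the shard that owns the requested key; its upgraded handle is returned directly.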
226 : FastPath(Handle<T>),
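/// Routing determined the target [`ShardTimelineId`] from a cached entry's shard parameters, but the caller still needs to look it up in the map or consult the tenant manager.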
227 : SlowPath(ShardTimelineId),
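/// The cache has no usable entries, so shard routing cannot be done locally; the original [`ShardSelector`] is passed to the tenant manager.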
228 : NeedConsultTenantManager,
229 : }
230 :
231 : impl<T: Types> Cache<T> {
232 : /// See module-level comment for details.
233 : ///
234 : /// Does NOT check for the shutdown state of [`Types::Timeline`].
235 : /// Instead, the methods of [`Types::Timeline`] that are invoked through
236 : /// the [`Handle`] are responsible for checking these conditions
237 : /// and, if the timeline is shut down, returning an error that causes
238 : /// the page service to close the connection.
239 50 : #[instrument(level = "trace", skip_all)]
240 : pub(crate) async fn get(
241 : &mut self,
242 : timeline_id: TimelineId,
243 : shard_selector: ShardSelector,
244 : tenant_manager: &T::TenantManager,
245 : ) -> Result<Handle<T>, GetError<T>> {
246 : // terminates because each iteration removes an element from the map
247 : loop {
248 : let handle = self
249 : .get_impl(timeline_id, shard_selector, tenant_manager)
250 : .await?;
251 : if handle.0.shut_down.load(Ordering::Relaxed) {
252 : let removed = self
253 : .map
254 : .remove(&handle.0.timeline.shard_timeline_id())
255 : .expect("invariant of get_impl is that the returned handle is in the map");
256 : assert!(
257 : Weak::ptr_eq(&removed, &Arc::downgrade(&handle.0)),
258 : "shard_timeline_id() incorrect?"
259 : );
260 : } else {
261 : return Ok(handle);
262 : }
263 : }
264 : }
265 :
266 54 : #[instrument(level = "trace", skip_all)]
267 : async fn get_impl(
268 : &mut self,
269 : timeline_id: TimelineId,
270 : shard_selector: ShardSelector,
271 : tenant_manager: &T::TenantManager,
272 : ) -> Result<Handle<T>, GetError<T>> {
273 : let miss: ShardSelector = {
274 : let routing_state = self.shard_routing(timeline_id, shard_selector);
275 : match routing_state {
276 : RoutingResult::FastPath(handle) => return Ok(handle),
277 : RoutingResult::SlowPath(key) => match self.map.get(&key) {
278 : Some(cached) => match cached.upgrade() {
279 : Some(upgraded) => return Ok(Handle(upgraded)),
280 : None => {
281 : trace!("handle cache stale");
282 : self.map.remove(&key).unwrap();
283 : ShardSelector::Known(key.shard_index)
284 : }
285 : },
286 : None => ShardSelector::Known(key.shard_index),
287 : },
288 : RoutingResult::NeedConsultTenantManager => shard_selector,
289 : }
290 : };
291 : self.get_miss(timeline_id, miss, tenant_manager).await
292 : }
293 :
294 : #[inline(always)]
295 54 : fn shard_routing(
296 54 : &mut self,
297 54 : timeline_id: TimelineId,
298 54 : shard_selector: ShardSelector,
299 54 : ) -> RoutingResult<T> {
300 : loop {
301 : // terminates because each iteration removes an element from the map
302 54 : let Some((first_key, first_handle)) = self.map.iter().next() else {
303 32 : return RoutingResult::NeedConsultTenantManager;
304 : };
305 22 : let Some(first_handle) = first_handle.upgrade() else {
306 : // TODO: dedup with get()
307 0 : trace!("handle cache stale");
308 0 : let first_key_owned = *first_key;
309 0 : self.map.remove(&first_key_owned).unwrap();
310 0 : continue;
311 : };
312 :
313 22 : let first_handle_shard_identity = first_handle.timeline.get_shard_identity();
314 22 : let make_shard_index = |shard_num: ShardNumber| ShardIndex {
315 22 : shard_number: shard_num,
316 22 : shard_count: first_handle_shard_identity.count,
317 22 : };
318 :
319 22 : let need_idx = match shard_selector {
320 22 : ShardSelector::Page(key) => {
321 22 : make_shard_index(first_handle_shard_identity.get_shard_number(&key))
322 : }
323 0 : ShardSelector::Zero => make_shard_index(ShardNumber(0)),
324 0 : ShardSelector::Known(shard_idx) => shard_idx,
325 : };
326 22 : let need_shard_timeline_id = ShardTimelineId {
327 22 : shard_index: need_idx,
328 22 : timeline_id,
329 22 : };
330 22 : let first_handle_shard_timeline_id = ShardTimelineId {
331 22 : shard_index: first_handle_shard_identity.shard_index(),
332 22 : timeline_id: first_handle.timeline.shard_timeline_id().timeline_id,
333 22 : };
334 22 :
335 22 : if need_shard_timeline_id == first_handle_shard_timeline_id {
336 16 : return RoutingResult::FastPath(Handle(first_handle));
337 : } else {
338 6 : return RoutingResult::SlowPath(need_shard_timeline_id);
339 : }
340 : }
341 54 : }
342 :
343 38 : #[instrument(level = "trace", skip_all)]
344 : #[inline(always)]
345 : async fn get_miss(
346 : &mut self,
347 : timeline_id: TimelineId,
348 : shard_selector: ShardSelector,
349 : tenant_manager: &T::TenantManager,
350 : ) -> Result<Handle<T>, GetError<T>> {
351 : match tenant_manager.resolve(timeline_id, shard_selector).await {
352 : Ok(timeline) => {
353 : let key = timeline.shard_timeline_id();
354 : match &shard_selector {
355 : ShardSelector::Zero => assert_eq!(key.shard_index.shard_number, ShardNumber(0)),
356 : ShardSelector::Page(_) => (), // gotta trust tenant_manager
357 : ShardSelector::Known(idx) => assert_eq!(idx, &key.shard_index),
358 : }
359 :
360 : let gate_guard = match timeline.gate().enter() {
361 : Ok(guard) => guard,
362 : Err(_) => {
363 : return Err(GetError::TimelineGateClosed);
364 : }
365 : };
366 : trace!("creating new HandleInner");
367 : let handle = Arc::new(
368 : // TODO: global metric that keeps track of the number of live HandleInner instances
369 : // so we can identify reference cycle bugs.
370 : HandleInner {
371 : shut_down: AtomicBool::new(false),
372 : _gate_guard: gate_guard,
373 : timeline: timeline.clone(),
374 : },
375 : );
376 : let handle = {
377 : let mut lock_guard = timeline
378 : .per_timeline_state()
379 : .handles
380 : .lock()
381 : .expect("mutex poisoned");
382 : match &mut *lock_guard {
383 : Some(per_timeline_state) => {
384 : let replaced = per_timeline_state.insert(self.id, Arc::clone(&handle));
385 : assert!(replaced.is_none(), "some earlier code left a stale handle");
386 : match self.map.entry(key) {
387 : hash_map::Entry::Occupied(_o) => {
388 : // This cannot happen because
389 : // 1. we're in the miss path, i.e., `self.map` didn't contain an entry, and
390 : // 2. we were holding &mut self during .resolve().await above, so no other thread can have inserted a handle
391 : // while we were waiting for the tenant manager.
392 : unreachable!()
393 : }
394 : hash_map::Entry::Vacant(v) => {
395 : v.insert(Arc::downgrade(&handle));
396 : handle
397 : }
398 : }
399 : }
400 : None => {
401 : return Err(GetError::PerTimelineStateShutDown);
402 : }
403 : }
404 : };
405 : Ok(Handle(handle))
406 : }
407 : Err(e) => Err(GetError::TenantManager(e)),
408 : }
409 : }
410 : }
411 :
412 : impl<T: Types> PerTimelineState<T> {
413 : /// After this method returns, [`Cache::get`] will never again return a [`Handle`]
414 : /// to the [`Types::Timeline`] that embeds this per-timeline state,
415 : /// even if [`TenantManager::resolve`] would still resolve to it.
416 : ///
417 : /// Already-alive [`Handle`]s will remain open and usable, and will keep the [`ArcTimeline`] alive.
418 : /// That's ok because they're short-lived. See module-level comment for details.
419 14 : #[instrument(level = "trace", skip_all)]
420 : pub(super) fn shutdown(&self) {
421 : let handles = self
422 : .handles
423 : .lock()
424 : .expect("mutex poisoned")
425 : // NB: this .take() sets locked to None.
426 : // That's what makes future `Cache::get` misses fail.
427 : // Cache hits are taken care of below.
428 : .take();
429 : let Some(handles) = handles else {
430 : trace!("already shut down");
431 : return;
432 : };
433 : for handle in handles.values() {
434 : // Make hits fail.
435 : handle.shut_down.store(true, Ordering::Relaxed);
436 : }
437 : drop(handles);
438 : }
439 : }
440 :
441 : impl<T: Types> std::ops::Deref for Handle<T> {
442 : type Target = T::Timeline;
443 62 : fn deref(&self) -> &Self::Target {
444 62 : &self.0.timeline
445 62 : }
446 : }
447 :
448 : #[cfg(test)]
449 : impl<T: Types> Drop for HandleInner<T> {
450 34 : fn drop(&mut self) {
451 34 : trace!("HandleInner dropped");
452 34 : }
453 : }
454 :
455 : // When dropping a [`Cache`], prune its handles in the [`PerTimelineState`] to break the reference cycle.
456 : impl<T: Types> Drop for Cache<T> {
457 26 : fn drop(&mut self) {
458 26 : for (_, weak) in self.map.drain() {
459 26 : if let Some(strong) = weak.upgrade() {
460 : // handle is still being kept alive in PerTimelineState
461 26 : let timeline = strong.timeline.per_timeline_state();
462 26 : let mut handles = timeline.handles.lock().expect("mutex poisoned");
463 26 : if let Some(handles) = &mut *handles {
464 26 : let Some(removed) = handles.remove(&self.id) else {
465 : // There could have been a shutdown between us upgrading the weak and locking the mutex.
466 0 : continue;
467 : };
468 26 : assert!(Arc::ptr_eq(&removed, &strong));
469 0 : }
470 0 : }
471 : }
472 26 : }
473 : }
474 :
475 : #[cfg(test)]
476 : mod tests {
477 : use pageserver_api::{
478 : key::{rel_block_to_key, Key, DBDIR_KEY},
479 : models::ShardParameters,
480 : reltag::RelTag,
481 : shard::ShardStripeSize,
482 : };
483 : use utils::shard::ShardCount;
484 :
485 : use super::*;
486 :
487 : const FOREVER: std::time::Duration = std::time::Duration::from_secs(u64::MAX);
488 :
489 : #[derive(Debug)]
490 : struct TestTypes;
491 : impl Types for TestTypes {
492 : type TenantManagerError = anyhow::Error;
493 : type TenantManager = StubManager;
494 : type Timeline = Arc<StubTimeline>;
495 : }
496 :
497 : struct StubManager {
498 : shards: Vec<Arc<StubTimeline>>,
499 : }
500 :
501 : struct StubTimeline {
502 : gate: utils::sync::gate::Gate,
503 : id: TimelineId,
504 : shard: ShardIdentity,
505 : per_timeline_state: PerTimelineState<TestTypes>,
506 : myself: Weak<StubTimeline>,
507 : }
508 :
509 : impl StubTimeline {
510 22 : fn getpage(&self) {
511 22 : // do nothing
512 22 : }
513 : }
514 :
515 : impl ArcTimeline<TestTypes> for Arc<StubTimeline> {
516 36 : fn gate(&self) -> &utils::sync::gate::Gate {
517 36 : &self.gate
518 36 : }
519 :
520 62 : fn shard_timeline_id(&self) -> ShardTimelineId {
521 62 : ShardTimelineId {
522 62 : shard_index: self.shard.shard_index(),
523 62 : timeline_id: self.id,
524 62 : }
525 62 : }
526 :
527 22 : fn get_shard_identity(&self) -> &ShardIdentity {
528 22 : &self.shard
529 22 : }
530 :
531 60 : fn per_timeline_state(&self) -> &PerTimelineState<TestTypes> {
532 60 : &self.per_timeline_state
533 60 : }
534 : }
535 :
536 : impl TenantManager<TestTypes> for StubManager {
537 40 : async fn resolve(
538 40 : &self,
539 40 : timeline_id: TimelineId,
540 40 : shard_selector: ShardSelector,
541 40 : ) -> anyhow::Result<Arc<StubTimeline>> {
542 48 : for timeline in &self.shards {
543 44 : if timeline.id == timeline_id {
544 0 : match &shard_selector {
545 0 : ShardSelector::Zero if timeline.shard.is_shard_zero() => {
546 0 : return Ok(Arc::clone(timeline));
547 : }
548 0 : ShardSelector::Zero => continue,
549 32 : ShardSelector::Page(key) if timeline.shard.is_key_local(key) => {
550 32 : return Ok(Arc::clone(timeline));
551 : }
552 0 : ShardSelector::Page(_) => continue,
553 6 : ShardSelector::Known(idx) if idx == &timeline.shard.shard_index() => {
554 4 : return Ok(Arc::clone(timeline));
555 : }
556 2 : ShardSelector::Known(_) => continue,
557 : }
558 6 : }
559 : }
560 4 : anyhow::bail!("not found")
561 40 : }
562 : }
563 :
564 : #[tokio::test(start_paused = true)]
565 2 : async fn test_timeline_shutdown() {
566 2 : crate::tenant::harness::setup_logging();
567 2 :
568 2 : let timeline_id = TimelineId::generate();
569 2 : let shard0 = Arc::new_cyclic(|myself| StubTimeline {
570 2 : gate: Default::default(),
571 2 : id: timeline_id,
572 2 : shard: ShardIdentity::unsharded(),
573 2 : per_timeline_state: PerTimelineState::default(),
574 2 : myself: myself.clone(),
575 2 : });
576 2 : let mgr = StubManager {
577 2 : shards: vec![shard0.clone()],
578 2 : };
579 2 : let key = DBDIR_KEY;
580 2 :
581 2 : let mut cache = Cache::<TestTypes>::default();
582 2 :
583 2 : //
584 2 : // fill the cache
585 2 : //
586 2 : assert_eq!(
587 2 : (Arc::strong_count(&shard0), Arc::weak_count(&shard0)),
588 2 : (2, 1),
589 2 : "strong: shard0, mgr; weak: myself"
590 2 : );
591 2 :
592 2 : let handle: Handle<_> = cache
593 2 : .get(timeline_id, ShardSelector::Page(key), &mgr)
594 2 : .await
595 2 : .expect("we have the timeline");
596 2 : let handle_inner_weak = Arc::downgrade(&handle.0);
597 2 : assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
598 2 : assert_eq!(
599 2 : (
600 2 : Weak::strong_count(&handle_inner_weak),
601 2 : Weak::weak_count(&handle_inner_weak)
602 2 : ),
603 2 : (2, 2),
604 2 : "strong: handle, per_timeline_state, weak: handle_inner_weak, cache"
605 2 : );
606 2 : assert_eq!(cache.map.len(), 1);
607 2 :
608 2 : assert_eq!(
609 2 : (Arc::strong_count(&shard0), Arc::weak_count(&shard0)),
610 2 : (3, 1),
611 2 : "strong: handleinner(per_timeline_state), shard0, mgr; weak: myself"
612 2 : );
613 2 : drop(handle);
614 2 : assert_eq!(
615 2 : (Arc::strong_count(&shard0), Arc::weak_count(&shard0)),
616 2 : (3, 1),
617 2 : "strong: handleinner(per_timeline_state), shard0, mgr; weak: myself"
618 2 : );
619 2 :
620 2 : //
621 2 : // demonstrate that Handle holds up gate closure
622 2 : // but shutdown prevents new handles from being handed out
623 2 : //
624 2 :
625 2 : tokio::select! {
626 2 : _ = shard0.gate.close() => {
627 2 : panic!("cache and per-timeline handler state keep cache open");
628 2 : }
629 2 : _ = tokio::time::sleep(FOREVER) => {
630 2 : // NB: first poll of close() makes it enter closing state
631 2 : }
632 2 : }
633 2 :
634 2 : let handle = cache
635 2 : .get(timeline_id, ShardSelector::Page(key), &mgr)
636 2 : .await
637 2 : .expect("we have the timeline");
638 2 : assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
639 2 :
640 2 : // SHUTDOWN
641 2 : shard0.per_timeline_state.shutdown(); // keeping handle alive across shutdown
642 2 :
643 2 : assert_eq!(
644 2 : 1,
645 2 : Weak::strong_count(&handle_inner_weak),
646 2 : "through local var handle"
647 2 : );
648 2 : assert_eq!(
649 2 : cache.map.len(),
650 2 : 1,
651 2 : "this is an implementation detail but worth pointing out: we can't clear the cache from shutdown(), it's cleared on first access after"
652 2 : );
653 2 : assert_eq!(
654 2 : (Arc::strong_count(&shard0), Arc::weak_count(&shard0)),
655 2 : (3, 1),
656 2 : "strong: handleinner(via handle), shard0, mgr; weak: myself"
657 2 : );
658 2 :
659 2 : // this handle is perfectly usable
660 2 : handle.getpage();
661 2 :
662 2 : cache
663 2 : .get(timeline_id, ShardSelector::Page(key), &mgr)
664 2 : .await
665 2 : .err()
666 2 : .expect("documented behavior: can't get new handle after shutdown, even if there is an alive Handle");
667 2 : assert_eq!(
668 2 : cache.map.len(),
669 2 : 0,
670 2 : "first access after shutdown cleans up the Weak's from the cache"
671 2 : );
672 2 :
673 2 : tokio::select! {
674 2 : _ = shard0.gate.close() => {
675 2 : panic!("handle is keeping gate open");
676 2 : }
677 2 : _ = tokio::time::sleep(FOREVER) => { }
678 2 : }
679 2 :
680 2 : drop(handle);
681 2 : assert_eq!(
682 2 : 0,
683 2 : Weak::strong_count(&handle_inner_weak),
684 2 : "the HandleInner destructor already ran"
685 2 : );
686 2 : assert_eq!(
687 2 : (Arc::strong_count(&shard0), Arc::weak_count(&shard0)),
688 2 : (2, 1),
689 2 : "strong: shard0, mgr; weak: myself"
690 2 : );
691 2 :
692 2 : // closing gate succeeds after dropping handle
693 2 : tokio::select! {
694 2 : _ = shard0.gate.close() => { }
695 2 : _ = tokio::time::sleep(FOREVER) => {
696 2 : panic!("handle is dropped, no other gate holders exist")
697 2 : }
698 2 : }
699 2 :
700 2 : // map gets cleaned on next lookup
701 2 : cache
702 2 : .get(timeline_id, ShardSelector::Page(key), &mgr)
703 2 : .await
704 2 : .err()
705 2 : .expect("documented behavior: can't get new handle after shutdown");
706 2 : assert_eq!(cache.map.len(), 0);
707 2 :
708 2 : // ensure all refs to shard0 are gone and we're not leaking anything
709 2 : let myself = Weak::clone(&shard0.myself);
710 2 : drop(shard0);
711 2 : drop(mgr);
712 2 : assert_eq!(Weak::strong_count(&myself), 0);
713 2 : }
714 :
715 : #[tokio::test]
716 2 : async fn test_multiple_timelines_and_deletion() {
717 2 : crate::tenant::harness::setup_logging();
718 2 :
719 2 : let timeline_a = TimelineId::generate();
720 2 : let timeline_b = TimelineId::generate();
721 2 : assert_ne!(timeline_a, timeline_b);
722 2 : let timeline_a = Arc::new_cyclic(|myself| StubTimeline {
723 2 : gate: Default::default(),
724 2 : id: timeline_a,
725 2 : shard: ShardIdentity::unsharded(),
726 2 : per_timeline_state: PerTimelineState::default(),
727 2 : myself: myself.clone(),
728 2 : });
729 2 : let timeline_b = Arc::new_cyclic(|myself| StubTimeline {
730 2 : gate: Default::default(),
731 2 : id: timeline_b,
732 2 : shard: ShardIdentity::unsharded(),
733 2 : per_timeline_state: PerTimelineState::default(),
734 2 : myself: myself.clone(),
735 2 : });
736 2 : let mut mgr = StubManager {
737 2 : shards: vec![timeline_a.clone(), timeline_b.clone()],
738 2 : };
739 2 : let key = DBDIR_KEY;
740 2 :
741 2 : let mut cache = Cache::<TestTypes>::default();
742 2 :
743 2 : cache
744 2 : .get(timeline_a.id, ShardSelector::Page(key), &mgr)
745 2 : .await
746 2 : .expect("we have it");
747 2 : cache
748 2 : .get(timeline_b.id, ShardSelector::Page(key), &mgr)
749 2 : .await
750 2 : .expect("we have it");
751 2 : assert_eq!(cache.map.len(), 2);
752 2 :
753 2 : // delete timeline A
754 2 : timeline_a.per_timeline_state.shutdown();
755 4 : mgr.shards.retain(|t| t.id != timeline_a.id);
756 2 : assert!(
757 2 : mgr.resolve(timeline_a.id, ShardSelector::Page(key))
758 2 : .await
759 2 : .is_err(),
760 2 : "broken StubManager implementation"
761 2 : );
762 2 :
763 2 : assert_eq!(
764 2 : cache.map.len(),
765 2 : 2,
766 2 : "cache still has a Weak handle to Timeline A"
767 2 : );
768 2 : cache
769 2 : .get(timeline_a.id, ShardSelector::Page(key), &mgr)
770 2 : .await
771 2 : .err()
772 2 : .expect("documented behavior: can't get new handle after shutdown");
773 2 : assert_eq!(cache.map.len(), 1, "next access cleans up the cache");
774 2 :
775 2 : cache
776 2 : .get(timeline_b.id, ShardSelector::Page(key), &mgr)
777 2 : .await
778 2 : .expect("we still have it");
779 2 : }
780 :
781 14 : fn make_relation_key_for_shard(shard: ShardNumber, params: &ShardParameters) -> Key {
782 14 : rel_block_to_key(
783 14 : RelTag {
784 14 : spcnode: 1663,
785 14 : dbnode: 208101,
786 14 : relnode: 2620,
787 14 : forknum: 0,
788 14 : },
789 14 : shard.0 as u32 * params.stripe_size.0,
790 14 : )
791 14 : }
792 :
793 : #[tokio::test(start_paused = true)]
794 2 : async fn test_shard_split() {
795 2 : crate::tenant::harness::setup_logging();
796 2 : let timeline_id = TimelineId::generate();
797 2 : let parent = Arc::new_cyclic(|myself| StubTimeline {
798 2 : gate: Default::default(),
799 2 : id: timeline_id,
800 2 : shard: ShardIdentity::unsharded(),
801 2 : per_timeline_state: PerTimelineState::default(),
802 2 : myself: myself.clone(),
803 2 : });
804 2 : let child_params = ShardParameters {
805 2 : count: ShardCount(2),
806 2 : stripe_size: ShardStripeSize::default(),
807 2 : };
808 2 : let child0 = Arc::new_cyclic(|myself| StubTimeline {
809 2 : gate: Default::default(),
810 2 : id: timeline_id,
811 2 : shard: ShardIdentity::from_params(ShardNumber(0), &child_params),
812 2 : per_timeline_state: PerTimelineState::default(),
813 2 : myself: myself.clone(),
814 2 : });
815 2 : let child1 = Arc::new_cyclic(|myself| StubTimeline {
816 2 : gate: Default::default(),
817 2 : id: timeline_id,
818 2 : shard: ShardIdentity::from_params(ShardNumber(1), &child_params),
819 2 : per_timeline_state: PerTimelineState::default(),
820 2 : myself: myself.clone(),
821 2 : });
822 2 : let child_shards_by_shard_number = [child0.clone(), child1.clone()];
823 2 :
824 2 : let mut cache = Cache::<TestTypes>::default();
825 2 :
826 2 : // fill the cache with the parent
827 6 : for i in 0..2 {
828 4 : let handle = cache
829 4 : .get(
830 4 : timeline_id,
831 4 : ShardSelector::Page(make_relation_key_for_shard(ShardNumber(i), &child_params)),
832 4 : &StubManager {
833 4 : shards: vec![parent.clone()],
834 4 : },
835 4 : )
836 2 : .await
837 4 : .expect("we have it");
838 4 : assert!(
839 4 : Weak::ptr_eq(&handle.myself, &parent.myself),
840 2 : "mgr returns parent first"
841 2 : );
842 4 : drop(handle);
843 2 : }
844 2 :
845 2 : //
846 2 : // SHARD SPLIT: tenant manager changes, but the cache isn't informed
847 2 : //
848 2 :
849 2 : // while we haven't shut down the parent, the cache will return the cached parent, even
850 2 : // if the tenant manager returns the child
851 6 : for i in 0..2 {
852 4 : let handle = cache
853 4 : .get(
854 4 : timeline_id,
855 4 : ShardSelector::Page(make_relation_key_for_shard(ShardNumber(i), &child_params)),
856 4 : &StubManager {
857 4 : shards: vec![], // doesn't matter what's in here, the cache is fully loaded
858 4 : },
859 4 : )
860 2 : .await
861 4 : .expect("we have it");
862 4 : assert!(
863 4 : Weak::ptr_eq(&handle.myself, &parent.myself),
864 2 : "mgr returns parent"
865 2 : );
866 4 : drop(handle);
867 2 : }
868 2 :
869 2 : let parent_handle = cache
870 2 : .get(
871 2 : timeline_id,
872 2 : ShardSelector::Page(make_relation_key_for_shard(ShardNumber(0), &child_params)),
873 2 : &StubManager {
874 2 : shards: vec![parent.clone()],
875 2 : },
876 2 : )
877 2 : .await
878 2 : .expect("we have it");
879 2 : assert!(Weak::ptr_eq(&parent_handle.myself, &parent.myself));
880 2 :
881 2 : // invalidate the cache
882 2 : parent.per_timeline_state.shutdown();
883 2 :
884 2 : // the cache will now return the child, even though the parent handle still exists
885 6 : for i in 0..2 {
886 4 : let handle = cache
887 4 : .get(
888 4 : timeline_id,
889 4 : ShardSelector::Page(make_relation_key_for_shard(ShardNumber(i), &child_params)),
890 4 : &StubManager {
891 4 : shards: vec![child0.clone(), child1.clone()], // <====== this changed compared to previous loop
892 4 : },
893 4 : )
894 2 : .await
895 4 : .expect("we have it");
896 4 : assert!(
897 4 : Weak::ptr_eq(
898 4 : &handle.myself,
899 4 : &child_shards_by_shard_number[i as usize].myself
900 4 : ),
901 2 : "mgr returns child"
902 2 : );
903 4 : drop(handle);
904 2 : }
905 2 :
906 2 : // all the while the parent handle kept the parent gate open
907 2 : tokio::select! {
908 2 : _ = parent_handle.gate.close() => {
909 2 : panic!("parent handle is keeping gate open");
910 2 : }
911 2 : _ = tokio::time::sleep(FOREVER) => { }
912 2 : }
913 2 : drop(parent_handle);
914 2 : tokio::select! {
915 2 : _ = parent.gate.close() => { }
916 2 : _ = tokio::time::sleep(FOREVER) => {
917 2 : panic!("parent handle is dropped, no other gate holders exist")
918 2 : }
919 2 : }
920 2 : }
921 :
922 : #[tokio::test(start_paused = true)]
923 2 : async fn test_connection_handler_exit() {
924 2 : crate::tenant::harness::setup_logging();
925 2 : let timeline_id = TimelineId::generate();
926 2 : let shard0 = Arc::new_cyclic(|myself| StubTimeline {
927 2 : gate: Default::default(),
928 2 : id: timeline_id,
929 2 : shard: ShardIdentity::unsharded(),
930 2 : per_timeline_state: PerTimelineState::default(),
931 2 : myself: myself.clone(),
932 2 : });
933 2 : let mgr = StubManager {
934 2 : shards: vec![shard0.clone()],
935 2 : };
936 2 : let key = DBDIR_KEY;
937 2 :
938 2 : // Simulate 10 connections that are opened, used, and closed
939 2 : let mut used_handles = vec![];
940 22 : for _ in 0..10 {
941 20 : let mut cache = Cache::<TestTypes>::default();
942 20 : let handle = {
943 20 : let handle = cache
944 20 : .get(timeline_id, ShardSelector::Page(key), &mgr)
945 2 : .await
946 20 : .expect("we have the timeline");
947 20 : assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
948 20 : handle
949 20 : };
950 20 : handle.getpage();
951 20 : used_handles.push(Arc::downgrade(&handle.0));
952 2 : }
953 2 :
954 2 : // No handles exist anymore, so nothing is holding the gate open
955 2 : assert!(used_handles
956 2 : .iter()
957 20 : .all(|weak| Weak::strong_count(weak) == 0));
958 2 :
959 2 : // ... thus the gate should close immediately, even without shutdown
960 2 : tokio::select! {
961 2 : _ = shard0.gate.close() => { }
962 2 : _ = tokio::time::sleep(FOREVER) => {
963 2 : panic!("handle is dropped, no other gate holders exist")
964 2 : }
965 2 : }
966 2 : }
967 : }
|