LCOV - differential code coverage report
Current view: top level - pageserver/src - page_cache.rs (source / functions) Coverage Total Hit UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 90.9 % 453 412 41 412
Current Date: 2023-10-19 02:04:12 Functions: 89.3 % 56 50 6 50
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : //!
       2                 : //! Global page cache
       3                 : //!
       4                 : //! The page cache uses up most of the memory in the page server. It is shared
       5                 : //! by all tenants, and it is used to store different kinds of pages. Sharing
       6                 : //! the cache allows memory to be dynamically allocated where it's needed the
       7                 : //! most.
       8                 : //!
       9                 : //! The page cache consists of fixed-size buffers, 8 kB each to match the
      10                 : //! PostgreSQL buffer size, and a Slot struct for each buffer to contain
      11                 : //! information about what's stored in the buffer.
      12                 : //!
      13                 : //! # Types Of Pages
      14                 : //!
      15                 : //! [`PageCache`] only supports immutable pages.
      16                 : //! Hence there is no need to worry about coherency.
      17                 : //!
      18                 : //! Two types of pages are supported:
      19                 : //!
      20                 : //! * **Materialized pages**, filled & used by page reconstruction
      21                 : //! * **Immutable File pages**, filled & used by [`crate::tenant::block_io`] and [`crate::tenant::ephemeral_file`].
      22                 : //!
      23                 : //! Note that [`crate::tenant::ephemeral_file::EphemeralFile`] is generally mutable, but, it's append-only.
      24                 : //! It uses the page cache only for the blocks that are already fully written and immutable.
      25                 : //!
      26                 : //! # Filling The Page Cache
      27                 : //!
      28                 : //! Page cache maps from a cache key to a buffer slot.
      29                 : //! The cache key uniquely identifies the piece of data that is being cached.
      30                 : //!
      31                 : //! The cache key for **materialized pages** is [`TenantId`], [`TimelineId`], [`Key`], and [`Lsn`].
      32                 : //! Use [`PageCache::memorize_materialized_page`] and [`PageCache::lookup_materialized_page`] for fill & access.
      33                 : //!
      34                 : //! The cache key for **immutable file** pages is [`FileId`] and a block number.
      35                 : //! Users of page cache that wish to page-cache an arbitrary (immutable!) on-disk file do the following:
      36                 : //! * Have a mechanism to deterministically associate the on-disk file with a [`FileId`].
      37                 : //! * Get a [`FileId`] using [`next_file_id`].
      38                 : //! * Use the mechanism to associate the on-disk file with the returned [`FileId`].
      39                 : //! * Use [`PageCache::read_immutable_buf`] to get a [`ReadBufResult`].
      40                 : //! * If the page was already cached, it'll be the [`ReadBufResult::Found`] variant that contains
      41                 : //!   a read guard for the page. Just use it.
      42                 : //! * If the page was not cached, it'll be the [`ReadBufResult::NotFound`] variant that contains
      43                 : //!   a write guard for the page. Fill the page with the contents of the on-disk file.
      44                 : //!   Then call [`PageWriteGuard::mark_valid`] to mark the page as valid.
      45                 : //!   Then try again to [`PageCache::read_immutable_buf`].
      46                 : //!   Unless there's high cache pressure, the page should now be cached.
      47                 : //!   (TODO: allow downgrading the write guard to a read guard to ensure forward progress.)
      48                 : //!
      49                 : //! # Locking
      50                 : //!
      51                 : //! There are two levels of locking involved: There's one lock for the "mapping"
      52                 : //! from page identifier (tenant ID, timeline ID, rel, block, LSN) to the buffer
      53                 : //! slot, and a separate lock on each slot. To read or write the contents of a
      54                 : //! slot, you must hold the lock on the slot in read or write mode,
      55                 : //! respectively. To change the mapping of a slot, i.e. to evict a page or to
      56                 : //! assign a buffer for a page, you must hold the mapping lock and the lock on
      57                 : //! the slot at the same time.
      58                 : //!
      59                 : //! Whenever you need to hold both locks simultaneously, the slot lock must be
      60                 : //! acquired first. This consistent ordering avoids deadlocks. To look up a page
      61                 : //! in the cache, you would first look up the mapping, while holding the mapping
      62                 : //! lock, and then lock the slot. You must release the mapping lock in between,
      63                 : //! to obey the lock ordering and avoid deadlock.
      64                 : //!
      65                 : //! A slot can momentarily have invalid contents, even if it's already been
      66                 : //! inserted to the mapping, but you must hold the write-lock on the slot until
      67                 : //! the contents are valid. If you need to release the lock without initializing
      68                 : //! the contents, you must remove the mapping first. We make that easy for the
      69                 : //! callers with PageWriteGuard: the caller must explicitly call guard.mark_valid() after it has
      70                 : //! initialized it. If the guard is dropped without calling mark_valid(), the
      71                 : //! mapping is automatically removed and the slot is marked free.
      72                 : //!
      73                 : 
      74                 : use std::{
      75                 :     collections::{hash_map::Entry, HashMap},
      76                 :     convert::TryInto,
      77                 :     sync::{
      78                 :         atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
      79                 :         Arc, Weak,
      80                 :     },
      81                 :     time::Duration,
      82                 : };
      83                 : 
      84                 : use anyhow::Context;
      85                 : use once_cell::sync::OnceCell;
      86                 : use utils::{
      87                 :     id::{TenantId, TimelineId},
      88                 :     lsn::Lsn,
      89                 : };
      90                 : 
      91                 : use crate::{context::RequestContext, metrics::PageCacheSizeMetrics, repository::Key};
      92                 : 
      93                 : static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
      94                 : const TEST_PAGE_CACHE_SIZE: usize = 50;
      95                 : 
      96                 : ///
      97                 : /// Initialize the page cache. This must be called once at page server startup.
      98                 : ///
      99 CBC         560 : pub fn init(size: usize) {
     100             560 :     if PAGE_CACHE.set(PageCache::new(size)).is_err() {
     101 UBC           0 :         panic!("page cache already initialized");
     102 CBC         560 :     }
     103             560 : }
     104                 : 
     105                 : ///
     106                 : /// Get a handle to the page cache.
     107                 : ///
     108       300896511 : pub fn get() -> &'static PageCache {
     109       300896511 :     //
     110       300896511 :     // In unit tests, page server startup doesn't happen and no one calls
     111       300896511 :     // page_cache::init(). Initialize it here with a tiny cache, so that the
     112       300896511 :     // page cache is usable in unit tests.
     113       300896511 :     //
     114       300896511 :     if cfg!(test) {
     115         1652600 :         PAGE_CACHE.get_or_init(|| PageCache::new(TEST_PAGE_CACHE_SIZE))
     116                 :     } else {
     117       299243911 :         PAGE_CACHE.get().expect("page cache not initialized")
     118                 :     }
     119       300896511 : }
     120                 : 
     121                 : pub const PAGE_SZ: usize = postgres_ffi::BLCKSZ as usize;
     122                 : const MAX_USAGE_COUNT: u8 = 5;
     123                 : 
     124                 : /// See module-level comment.
     125       576067250 : #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
     126                 : pub struct FileId(u64);
     127                 : 
     128                 : static NEXT_ID: AtomicU64 = AtomicU64::new(1);
     129                 : 
     130                 : /// See module-level comment.
     131           20489 : pub fn next_file_id() -> FileId {
     132           20489 :     FileId(NEXT_ID.fetch_add(1, Ordering::Relaxed))
     133           20489 : }
     134                 : 
     135                 : ///
     136                 : /// CacheKey uniquely identifies a "thing" to cache in the page cache.
     137                 : ///
     138       311165775 : #[derive(Debug, PartialEq, Eq, Clone)]
     139                 : #[allow(clippy::enum_variant_names)]
     140                 : enum CacheKey {
     141                 :     MaterializedPage {
     142                 :         hash_key: MaterializedPageHashKey,
     143                 :         lsn: Lsn,
     144                 :     },
     145                 :     ImmutableFilePage {
     146                 :         file_id: FileId,
     147                 :         blkno: u32,
     148                 :     },
     149                 : }
     150                 : 
     151        12948523 : #[derive(Debug, PartialEq, Eq, Hash, Clone)]
     152                 : struct MaterializedPageHashKey {
     153                 :     tenant_id: TenantId,
     154                 :     timeline_id: TimelineId,
     155                 :     key: Key,
     156                 : }
     157                 : 
     158 UBC           0 : #[derive(Clone)]
     159                 : struct Version {
     160                 :     lsn: Lsn,
     161                 :     slot_idx: usize,
     162                 : }
     163                 : 
     164                 : struct Slot {
     165                 :     inner: tokio::sync::RwLock<SlotInner>,
     166                 :     usage_count: AtomicU8,
     167                 : }
     168                 : 
     169                 : struct SlotInner {
     170                 :     key: Option<CacheKey>,
     171                 :     // for `coalesce_readers_permit`
     172                 :     permit: std::sync::Mutex<Weak<PinnedSlotsPermit>>,
     173                 :     buf: &'static mut [u8; PAGE_SZ],
     174                 : }
     175                 : 
     176                 : impl Slot {
     177                 :     /// Increment usage count on the buffer, with ceiling at MAX_USAGE_COUNT.
     178 CBC   283300727 :     fn inc_usage_count(&self) {
     179       283300727 :         let _ = self
     180       283300727 :             .usage_count
     181       283300992 :             .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
     182       283300992 :                 if val == MAX_USAGE_COUNT {
     183       257003237 :                     None
     184                 :                 } else {
     185        26297755 :                     Some(val + 1)
     186                 :                 }
     187       283300992 :             });
     188       283300727 :     }
     189                 : 
     190                 :     /// Decrement usage count on the buffer, unless it's already zero.  Returns
     191                 :     /// the old usage count.
     192        44918480 :     fn dec_usage_count(&self) -> u8 {
     193        44918480 :         let count_res =
     194        44918480 :             self.usage_count
     195        44918480 :                 .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
     196        44918464 :                     if val == 0 {
     197        12521726 :                         None
     198                 :                     } else {
     199        32396738 :                         Some(val - 1)
     200                 :                     }
     201        44918480 :                 });
     202        44918480 : 
     203        44918480 :         match count_res {
     204        32396749 :             Ok(usage_count) => usage_count,
     205        12521731 :             Err(usage_count) => usage_count,
     206                 :         }
     207        44918480 :     }
     208                 : 
     209                 :     /// Sets the usage count to a specific value.
     210        12521258 :     fn set_usage_count(&self, count: u8) {
     211        12521258 :         self.usage_count.store(count, Ordering::Relaxed);
     212        12521258 :     }
     213                 : }
     214                 : 
     215                 : impl SlotInner {
     216                 :     /// If there is already a reader, drop our permit and share its permit, just like we share read access.
     217       283296540 :     fn coalesce_readers_permit(&self, permit: PinnedSlotsPermit) -> Arc<PinnedSlotsPermit> {
     218       283296540 :         let mut guard = self.permit.lock().unwrap();
     219       283296540 :         if let Some(existing_permit) = guard.upgrade() {
     220          147452 :             drop(guard);
     221          147452 :             drop(permit);
     222          147452 :             existing_permit
     223                 :         } else {
     224       283149088 :             let permit = Arc::new(permit);
     225       283149088 :             *guard = Arc::downgrade(&permit);
     226       283149088 :             permit
     227                 :         }
     228       283296540 :     }
     229                 : }
     230                 : 
     231                 : pub struct PageCache {
     232                 :     /// This contains the mapping from the cache key to buffer slot that currently
     233                 :     /// contains the page, if any.
     234                 :     ///
     235                 :     /// TODO: This is protected by a single lock. If that becomes a bottleneck,
     236                 :     /// this HashMap can be replaced with a more concurrent version, there are
     237                 :     /// plenty of such crates around.
     238                 :     ///
     239                 :     /// If you add support for caching different kinds of objects, each object kind
     240                 :     /// can have a separate mapping map, next to this field.
     241                 :     materialized_page_map: std::sync::RwLock<HashMap<MaterializedPageHashKey, Vec<Version>>>,
     242                 : 
     243                 :     immutable_page_map: std::sync::RwLock<HashMap<(FileId, u32), usize>>,
     244                 : 
     245                 :     /// The actual buffers with their metadata.
     246                 :     slots: Box<[Slot]>,
     247                 : 
     248                 :     pinned_slots: Arc<tokio::sync::Semaphore>,
     249                 : 
     250                 :     /// Index of the next candidate to evict, for the Clock replacement algorithm.
     251                 :     /// This is interpreted modulo the page cache size.
     252                 :     next_evict_slot: AtomicUsize,
     253                 : 
     254                 :     size_metrics: &'static PageCacheSizeMetrics,
     255                 : }
     256                 : 
     257                 : struct PinnedSlotsPermit(tokio::sync::OwnedSemaphorePermit);
     258                 : 
     259                 : ///
     260                 : /// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked
     261                 : /// until the guard is dropped.
     262                 : ///
     263                 : pub struct PageReadGuard<'i> {
     264                 :     _permit: Arc<PinnedSlotsPermit>,
     265                 :     slot_guard: tokio::sync::RwLockReadGuard<'i, SlotInner>,
     266                 : }
     267                 : 
     268                 : impl std::ops::Deref for PageReadGuard<'_> {
     269                 :     type Target = [u8; PAGE_SZ];
     270                 : 
     271       622763980 :     fn deref(&self) -> &Self::Target {
     272       622763980 :         self.slot_guard.buf
     273       622763980 :     }
     274                 : }
     275                 : 
     276                 : impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
     277 UBC           0 :     fn as_ref(&self) -> &[u8; PAGE_SZ] {
     278               0 :         self.slot_guard.buf
     279               0 :     }
     280                 : }
     281                 : 
     282                 : ///
     283                 : /// PageWriteGuard is a lease on a buffer for modifying it. The page is kept locked
     284                 : /// until the guard is dropped.
     285                 : ///
     286                 : /// Counterintuitively, this is used even for a read, if the requested page is not
     287                 : /// currently found in the page cache. In that case, the caller of lock_for_read()
     288                 : /// is expected to fill in the page contents and call mark_valid().
     289                 : pub struct PageWriteGuard<'i> {
     290                 :     state: PageWriteGuardState<'i>,
     291                 : }
     292                 : 
     293                 : enum PageWriteGuardState<'i> {
     294                 :     Invalid {
     295                 :         inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
     296                 :         _permit: PinnedSlotsPermit,
     297                 :     },
     298                 :     Downgraded,
     299                 : }
     300                 : 
     301                 : impl std::ops::DerefMut for PageWriteGuard<'_> {
     302 CBC    12521258 :     fn deref_mut(&mut self) -> &mut Self::Target {
     303        12521258 :         match &mut self.state {
     304        12521258 :             PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
     305 UBC           0 :             PageWriteGuardState::Downgraded => unreachable!(),
     306                 :         }
     307 CBC    12521258 :     }
     308                 : }
     309                 : 
     310                 : impl std::ops::Deref for PageWriteGuard<'_> {
     311                 :     type Target = [u8; PAGE_SZ];
     312                 : 
     313 UBC           0 :     fn deref(&self) -> &Self::Target {
     314               0 :         match &self.state {
     315               0 :             PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
     316               0 :             PageWriteGuardState::Downgraded => unreachable!(),
     317                 :         }
     318               0 :     }
     319                 : }
     320                 : 
     321                 : impl<'a> PageWriteGuard<'a> {
     322                 :     /// Mark that the buffer contents are now valid.
     323                 :     #[must_use]
     324 CBC    12521257 :     pub fn mark_valid(mut self) -> PageReadGuard<'a> {
     325        12521257 :         let prev = std::mem::replace(&mut self.state, PageWriteGuardState::Downgraded);
     326        12521257 :         match prev {
     327        12521257 :             PageWriteGuardState::Invalid { inner, _permit } => {
     328        12521257 :                 assert!(inner.key.is_some());
     329        12521257 :                 PageReadGuard {
     330        12521257 :                     _permit: Arc::new(_permit),
     331        12521257 :                     slot_guard: inner.downgrade(),
     332        12521257 :                 }
     333                 :             }
     334 UBC           0 :             PageWriteGuardState::Downgraded => unreachable!(),
     335                 :         }
     336 CBC    12521257 :     }
     337                 : }
     338                 : 
     339                 : impl Drop for PageWriteGuard<'_> {
     340                 :     ///
     341                 :     /// If the buffer was allocated for a page that was not already in the
     342                 :     /// cache, but the lock_for_read/write() caller dropped the buffer without
     343                 :     /// initializing it, remove the mapping from the page cache.
     344                 :     ///
     345        12521246 :     fn drop(&mut self) {
     346        12521246 :         match &mut self.state {
     347 UBC           0 :             PageWriteGuardState::Invalid { inner, _permit } => {
     348               0 :                 assert!(inner.key.is_some());
     349               0 :                 let self_key = inner.key.as_ref().unwrap();
     350               0 :                 PAGE_CACHE.get().unwrap().remove_mapping(self_key);
     351               0 :                 inner.key = None;
     352                 :             }
     353 CBC    12521246 :             PageWriteGuardState::Downgraded => {}
     354                 :         }
     355        12521246 :     }
     356                 : }
     357                 : 
     358                 : /// lock_for_read() return value
     359                 : pub enum ReadBufResult<'a> {
     360                 :     Found(PageReadGuard<'a>),
     361                 :     NotFound(PageWriteGuard<'a>),
     362                 : }
     363                 : 
     364                 : impl PageCache {
     365                 :     //
     366                 :     // Section 1.1: Public interface functions for looking up and memorizing materialized page
     367                 :     // versions in the page cache
     368                 :     //
     369                 : 
     370                 :     /// Look up a materialized page version.
     371                 :     ///
     372                 :     /// The 'lsn' is an upper bound, this will return the latest version of
     373                 :     /// the given block, but not newer than 'lsn'. Returns the actual LSN of the
     374                 :     /// returned page.
     375         6372324 :     pub async fn lookup_materialized_page(
     376         6372324 :         &self,
     377         6372324 :         tenant_id: TenantId,
     378         6372324 :         timeline_id: TimelineId,
     379         6372324 :         key: &Key,
     380         6372324 :         lsn: Lsn,
     381         6372324 :         ctx: &RequestContext,
     382         6372324 :     ) -> Option<(Lsn, PageReadGuard)> {
     383         6372322 :         let Ok(permit) = self.try_get_pinned_slot_permit().await else {
     384 UBC           0 :             return None;
     385                 :         };
     386                 : 
     387 CBC     6372322 :         crate::metrics::PAGE_CACHE
     388         6372322 :             .for_ctx(ctx)
     389         6372322 :             .read_accesses_materialized_page
     390         6372322 :             .inc();
     391         6372322 : 
     392         6372322 :         let mut cache_key = CacheKey::MaterializedPage {
     393         6372322 :             hash_key: MaterializedPageHashKey {
     394         6372322 :                 tenant_id,
     395         6372322 :                 timeline_id,
     396         6372322 :                 key: *key,
     397         6372322 :             },
     398         6372322 :             lsn,
     399         6372322 :         };
     400                 : 
     401         6372322 :         if let Some(guard) = self
     402         6372322 :             .try_lock_for_read(&mut cache_key, &mut Some(permit))
     403            1419 :             .await
     404                 :         {
     405                 :             if let CacheKey::MaterializedPage {
     406                 :                 hash_key: _,
     407         1297801 :                 lsn: available_lsn,
     408         1297801 :             } = cache_key
     409                 :             {
     410         1297801 :                 if available_lsn == lsn {
     411              16 :                     crate::metrics::PAGE_CACHE
     412              16 :                         .for_ctx(ctx)
     413              16 :                         .read_hits_materialized_page_exact
     414              16 :                         .inc();
     415         1297785 :                 } else {
     416         1297785 :                     crate::metrics::PAGE_CACHE
     417         1297785 :                         .for_ctx(ctx)
     418         1297785 :                         .read_hits_materialized_page_older_lsn
     419         1297785 :                         .inc();
     420         1297785 :                 }
     421         1297801 :                 Some((available_lsn, guard))
     422                 :             } else {
     423 UBC           0 :                 panic!("unexpected key type in slot");
     424                 :             }
     425                 :         } else {
     426 CBC     5074521 :             None
     427                 :         }
     428         6372322 :     }
     429                 : 
     430                 :     ///
     431                 :     /// Store an image of the given page in the cache.
     432                 :     ///
     433         2252422 :     pub async fn memorize_materialized_page(
     434         2252422 :         &self,
     435         2252422 :         tenant_id: TenantId,
     436         2252422 :         timeline_id: TimelineId,
     437         2252422 :         key: Key,
     438         2252422 :         lsn: Lsn,
     439         2252422 :         img: &[u8],
     440         2252422 :     ) -> anyhow::Result<()> {
     441         2252422 :         let cache_key = CacheKey::MaterializedPage {
     442         2252422 :             hash_key: MaterializedPageHashKey {
     443         2252422 :                 tenant_id,
     444         2252422 :                 timeline_id,
     445         2252422 :                 key,
     446         2252422 :             },
     447         2252422 :             lsn,
     448         2252422 :         };
     449                 : 
     450         2252422 :         let mut permit = Some(self.try_get_pinned_slot_permit().await?);
     451                 :         loop {
     452                 :             // First check if the key already exists in the cache.
     453         2252459 :             if let Some(slot_idx) = self.search_mapping_exact(&cache_key) {
     454                 :                 // The page was found in the mapping. Lock the slot, and re-check
     455                 :                 // that it's still what we expected (because we released the mapping
     456                 :                 // lock already, another thread could have evicted the page)
     457            4187 :                 let slot = &self.slots[slot_idx];
     458            4187 :                 let inner = slot.inner.write().await;
     459            4187 :                 if inner.key.as_ref() == Some(&cache_key) {
     460            4187 :                     slot.inc_usage_count();
     461                 :                     debug_assert!(
     462                 :                         {
     463            4187 :                             let guard = inner.permit.lock().unwrap();
     464            4187 :                             guard.upgrade().is_none()
     465                 :                         },
     466 UBC           0 :                         "we hold a write lock, so, no one else should have a permit"
     467                 :                     );
     468 CBC        4187 :                     debug_assert_eq!(inner.buf.len(), img.len());
     469                 :                     // We already had it in cache. Another thread must've put it there
     470                 :                     // concurrently. Check that it had the same contents that we
     471                 :                     // replayed.
     472            4187 :                     assert!(inner.buf == img);
     473            4187 :                     return Ok(());
     474 UBC           0 :                 }
     475 CBC     2248272 :             }
     476         2248272 :             debug_assert!(permit.is_some());
     477                 : 
     478                 :             // Not found. Find a victim buffer
     479         2248272 :             let (slot_idx, mut inner) = self
     480         2248272 :                 .find_victim(permit.as_ref().unwrap())
     481 UBC           0 :                 .await
     482 CBC     2248272 :                 .context("Failed to find evict victim")?;
     483                 : 
     484                 :             // Insert mapping for this. At this point, we may find that another
     485                 :             // thread did the same thing concurrently. In that case, we evicted
     486                 :             // our victim buffer unnecessarily. Put it into the free list and
     487                 :             // continue with the slot that the other thread chose.
     488         2248272 :             if let Some(_existing_slot_idx) = self.try_insert_mapping(&cache_key, slot_idx) {
     489                 :                 // TODO: put to free list
     490                 : 
     491                 :                 // We now just loop back to start from beginning. This is not
     492                 :                 // optimal, we'll perform the lookup in the mapping again, which
     493                 :                 // is not really necessary because we already got
     494                 :                 // 'existing_slot_idx'.  But this shouldn't happen often enough
     495                 :                 // to matter much.
     496              37 :                 continue;
     497         2248235 :             }
     498         2248235 : 
     499         2248235 :             // Make the slot ready
     500         2248235 :             let slot = &self.slots[slot_idx];
     501         2248235 :             inner.key = Some(cache_key.clone());
     502         2248235 :             slot.set_usage_count(1);
     503                 :             // Create a write guard for the slot so we go through the expected motions.
     504                 :             debug_assert!(
     505                 :                 {
     506         2248235 :                     let guard = inner.permit.lock().unwrap();
     507         2248235 :                     guard.upgrade().is_none()
     508                 :                 },
     509 UBC           0 :                 "we hold a write lock, so, no one else should have a permit"
     510                 :             );
     511 CBC     2248235 :             let mut write_guard = PageWriteGuard {
     512         2248235 :                 state: PageWriteGuardState::Invalid {
     513         2248235 :                     _permit: permit.take().unwrap(),
     514         2248235 :                     inner,
     515         2248235 :                 },
     516         2248235 :             };
     517         2248235 :             write_guard.copy_from_slice(img);
     518         2248235 :             let _ = write_guard.mark_valid();
     519         2248235 :             return Ok(());
     520                 :         }
     521         2252422 :     }
     522                 : 
     523                 :     // Section 1.2: Public interface functions for working with immutable file pages.
     524                 : 
     525       292271765 :     pub async fn read_immutable_buf(
     526       292271765 :         &self,
     527       292271765 :         file_id: FileId,
     528       292271765 :         blkno: u32,
     529       292271765 :         ctx: &RequestContext,
     530       292271765 :     ) -> anyhow::Result<ReadBufResult> {
     531       292271415 :         let mut cache_key = CacheKey::ImmutableFilePage { file_id, blkno };
     532       292271415 : 
     533       292271415 :         self.lock_for_read(&mut cache_key, ctx).await
     534       292271411 :     }
     535                 : 
     536                 :     //
     537                 :     // Section 2: Internal interface functions for lookup/update.
     538                 :     //
     539                 :     // To add support for a new kind of "thing" to cache, you will need
     540                 :     // to add public interface routines above, and code to deal with the
     541                 :     // "mappings" after this section. But the routines in this section should
     542                 :     // not require changes.
     543                 : 
     544       300896511 :     async fn try_get_pinned_slot_permit(&self) -> anyhow::Result<PinnedSlotsPermit> {
     545       300896158 :         let timer = crate::metrics::PAGE_CACHE_ACQUIRE_PINNED_SLOT_TIME.start_timer();
     546       300896158 :         match tokio::time::timeout(
     547       300896158 :             // Choose small timeout, neon_smgr does its own retries.
     548       300896158 :             // https://neondb.slack.com/archives/C04DGM6SMTM/p1694786876476869
     549       300896158 :             Duration::from_secs(10),
     550       300896158 :             Arc::clone(&self.pinned_slots).acquire_owned(),
     551       300896158 :         )
     552         1740328 :         .await
     553                 :         {
     554       300896156 :             Ok(res) => Ok(PinnedSlotsPermit(
     555       300896156 :                 res.expect("this semaphore is never closed"),
     556       300896156 :             )),
     557 UBC           0 :             Err(_timeout) => {
     558               0 :                 timer.stop_and_discard();
     559               0 :                 crate::metrics::page_cache_errors_inc(
     560               0 :                     crate::metrics::PageCacheErrorKind::AcquirePinnedSlotTimeout,
     561               0 :                 );
     562               0 :                 anyhow::bail!("timeout: there were page guards alive for all page cache slots")
     563                 :             }
     564                 :         }
     565 CBC   300896156 :     }
     566                 : 
     567                 :     /// Look up a page in the cache.
     568                 :     ///
     569                 :     /// If the search criteria is not exact, *cache_key is updated with the
     570                 :     /// exact key of the returned page. (For materialized pages, that means
     571                 :     /// that the LSN in 'cache_key' is updated with the LSN of the returned page
     572                 :     /// version.)
     573                 :     ///
     574                 :     /// If no page is found, returns None and *cache_key is left unmodified.
     575                 :     ///
     576       298644517 :     async fn try_lock_for_read(
     577       298644517 :         &self,
     578       298644517 :         cache_key: &mut CacheKey,
     579       298644517 :         permit: &mut Option<PinnedSlotsPermit>,
     580       298644517 :     ) -> Option<PageReadGuard> {
     581       298644164 :         let cache_key_orig = cache_key.clone();
     582       298644164 :         if let Some(slot_idx) = self.search_mapping(cache_key) {
     583                 :             // The page was found in the mapping. Lock the slot, and re-check
     584                 :             // that it's still what we expected (because we released the mapping
     585                 :             // lock already, another thread could have evicted the page)
     586       283296204 :             let slot = &self.slots[slot_idx];
     587       283296204 :             let inner = slot.inner.read().await;
     588       283296204 :             if inner.key.as_ref() == Some(cache_key) {
     589       283296194 :                 slot.inc_usage_count();
     590       283296194 :                 return Some(PageReadGuard {
     591       283296194 :                     _permit: inner.coalesce_readers_permit(permit.take().unwrap()),
     592       283296194 :                     slot_guard: inner,
     593       283296194 :                 });
     594              10 :             } else {
     595              10 :                 // search_mapping might have modified the search key; restore it.
     596              10 :                 *cache_key = cache_key_orig;
     597              10 :             }
     598        15347960 :         }
     599        15347970 :         None
     600       298644164 :     }
     601                 : 
     602                 :     /// Return a locked buffer for given block.
     603                 :     ///
     604                 :     /// Like try_lock_for_read(), if the search criteria is not exact and the
     605                 :     /// page is already found in the cache, *cache_key is updated.
     606                 :     ///
     607                 :     /// If the page is not found in the cache, this allocates a new buffer for
     608                 :     /// it. The caller may then initialize the buffer with the contents, and
     609                 :     /// call mark_valid().
     610                 :     ///
     611                 :     /// Example usage:
     612                 :     ///
     613                 :     /// ```ignore
     614                 :     /// let cache = page_cache::get();
     615                 :     ///
     616                 :     /// match cache.lock_for_read(&mut key, ctx).await? {
     617                 :     ///     ReadBufResult::Found(read_guard) => {
     618                 :     ///         // The page was found in cache. Use it
     619                 :     ///     },
     620                 :     ///     ReadBufResult::NotFound(write_guard) => {
     621                 :     ///         // The page was not found in cache. Read it from disk into the
     622                 :     ///         // buffer.
     623                 :     ///         //read_my_page_from_disk(write_guard);
     624                 :     ///
     625                 :     ///         // The buffer contents are now valid. Tell the page cache.
     626                 :     ///         write_guard.mark_valid();
     627                 :     ///     },
     628                 :     /// }
     629                 :     /// ```
     630                 :     ///
     631       292271765 :     async fn lock_for_read(
     632       292271765 :         &self,
     633       292271765 :         cache_key: &mut CacheKey,
     634       292271765 :         ctx: &RequestContext,
     635       292271765 :     ) -> anyhow::Result<ReadBufResult> {
     636       292271415 :         let mut permit = Some(self.try_get_pinned_slot_permit().await?);
     637                 : 
     638       292271411 :         let (read_access, hit) = match cache_key {
     639                 :             CacheKey::MaterializedPage { .. } => {
     640 UBC           0 :                 unreachable!("Materialized pages use lookup_materialized_page")
     641                 :             }
     642 CBC   292271411 :             CacheKey::ImmutableFilePage { .. } => (
     643       292271411 :                 &crate::metrics::PAGE_CACHE
     644       292271411 :                     .for_ctx(ctx)
     645       292271411 :                     .read_accesses_immutable,
     646       292271411 :                 &crate::metrics::PAGE_CACHE.for_ctx(ctx).read_hits_immutable,
     647       292271411 :             ),
     648       292271411 :         };
     649       292271411 :         read_access.inc();
     650       292271411 : 
     651       292271411 :         let mut is_first_iteration = true;
     652                 :         loop {
     653                 :             // First check if the key already exists in the cache.
     654       292271842 :             if let Some(read_guard) = self.try_lock_for_read(cache_key, &mut permit).await {
     655       281998393 :                 debug_assert!(permit.is_none());
     656       281998393 :                 if is_first_iteration {
     657       281997962 :                     hit.inc();
     658       281997962 :                 }
     659       281998393 :                 return Ok(ReadBufResult::Found(read_guard));
     660        10273449 :             }
     661        10273449 :             debug_assert!(permit.is_some());
     662        10273449 :             is_first_iteration = false;
     663                 : 
     664                 :             // Not found. Find a victim buffer
     665        10273449 :             let (slot_idx, mut inner) = self
     666        10273449 :                 .find_victim(permit.as_ref().unwrap())
     667 UBC           0 :                 .await
     668 CBC    10273449 :                 .context("Failed to find evict victim")?;
     669                 : 
     670                 :             // Insert mapping for this. At this point, we may find that another
     671                 :             // thread did the same thing concurrently. In that case, we evicted
     672                 :             // our victim buffer unnecessarily. Put it into the free list and
     673                 :             // continue with the slot that the other thread chose.
     674        10273449 :             if let Some(_existing_slot_idx) = self.try_insert_mapping(cache_key, slot_idx) {
     675                 :                 // TODO: put to free list
     676                 : 
     677                 :                 // We now just loop back to start from the beginning. This is not
     678                 :                 // optimal, we'll perform the lookup in the mapping again, which
     679                 :                 // is not really necessary because we already got
     680                 :                 // 'existing_slot_idx'.  But this shouldn't happen often enough
     681                 :                 // to matter much.
     682             431 :                 continue;
     683        10273018 :             }
     684        10273018 : 
     685        10273018 :             // Make the slot ready
     686        10273018 :             let slot = &self.slots[slot_idx];
     687        10273018 :             inner.key = Some(cache_key.clone());
     688        10273018 :             slot.set_usage_count(1);
     689                 : 
     690                 :             debug_assert!(
     691                 :                 {
     692        10273018 :                     let guard = inner.permit.lock().unwrap();
     693        10273018 :                     guard.upgrade().is_none()
     694                 :                 },
     695 UBC           0 :                 "we hold a write lock, so, no one else should have a permit"
     696                 :             );
     697                 : 
     698 CBC    10273018 :             return Ok(ReadBufResult::NotFound(PageWriteGuard {
     699        10273018 :                 state: PageWriteGuardState::Invalid {
     700        10273018 :                     _permit: permit.take().unwrap(),
     701        10273018 :                     inner,
     702        10273018 :                 },
     703        10273018 :             }));
     704                 :         }
     705       292271411 :     }
     706                 : 
     707                 :     //
     708                 :     // Section 3: Mapping functions
     709                 :     //
     710                 : 
     711                 :     /// Search for a page in the cache using the given search key.
     712                 :     ///
     713                 :     /// Returns the slot index, if any. If the search criteria is not exact,
     714                 :     /// *cache_key is updated with the actual key of the found page.
     715                 :     ///
     716                 :     /// NOTE: We don't hold any lock on the mapping on return, so the slot might
     717                 :     /// get recycled for an unrelated page immediately after this function
     718                 :     /// returns.  The caller is responsible for re-checking that the slot still
     719                 :     /// contains the page with the same key before using it.
     720                 :     ///
     721       298644517 :     fn search_mapping(&self, cache_key: &mut CacheKey) -> Option<usize> {
     722       298644517 :         match cache_key {
     723         6372324 :             CacheKey::MaterializedPage { hash_key, lsn } => {
     724         6372324 :                 let map = self.materialized_page_map.read().unwrap();
     725         6372324 :                 let versions = map.get(hash_key)?;
     726                 : 
     727         1456250 :                 let version_idx = match versions.binary_search_by_key(lsn, |v| v.lsn) {
     728              16 :                     Ok(version_idx) => version_idx,
     729            1270 :                     Err(0) => return None,
     730         1297786 :                     Err(version_idx) => version_idx - 1,
     731                 :                 };
     732         1297802 :                 let version = &versions[version_idx];
     733         1297802 :                 *lsn = version.lsn;
     734         1297802 :                 Some(version.slot_idx)
     735                 :             }
     736       292272193 :             CacheKey::ImmutableFilePage { file_id, blkno } => {
     737       292272193 :                 let map = self.immutable_page_map.read().unwrap();
     738       292272193 :                 Some(*map.get(&(*file_id, *blkno))?)
     739                 :             }
     740                 :         }
     741       298644517 :     }
     742                 : 
     743                 :     /// Search for a page in the cache using the given search key.
     744                 :     ///
     745                 :     /// Like 'search_mapping', but performs an "exact" search. Used for
     746                 :     /// allocating a new buffer.
     747         2252459 :     fn search_mapping_exact(&self, key: &CacheKey) -> Option<usize> {
     748         2252459 :         match key {
     749         2252459 :             CacheKey::MaterializedPage { hash_key, lsn } => {
     750         2252459 :                 let map = self.materialized_page_map.read().unwrap();
     751         2252459 :                 let versions = map.get(hash_key)?;
     752                 : 
     753          440552 :                 if let Ok(version_idx) = versions.binary_search_by_key(lsn, |v| v.lsn) {
     754            4187 :                     Some(versions[version_idx].slot_idx)
     755                 :                 } else {
     756          336486 :                     None
     757                 :                 }
     758                 :             }
     759 UBC           0 :             CacheKey::ImmutableFilePage { file_id, blkno } => {
     760               0 :                 let map = self.immutable_page_map.read().unwrap();
     761               0 :                 Some(*map.get(&(*file_id, *blkno))?)
     762                 :             }
     763                 :         }
     764 CBC     2252459 :     }
     765                 : 
     766                 :     ///
     767                 :     /// Remove mapping for given key.
     768                 :     ///
     769         9824374 :     fn remove_mapping(&self, old_key: &CacheKey) {
     770         9824374 :         match old_key {
     771                 :             CacheKey::MaterializedPage {
     772         2079692 :                 hash_key: old_hash_key,
     773         2079692 :                 lsn: old_lsn,
     774         2079692 :             } => {
     775         2079692 :                 let mut map = self.materialized_page_map.write().unwrap();
     776         2079692 :                 if let Entry::Occupied(mut old_entry) = map.entry(old_hash_key.clone()) {
     777         2079692 :                     let versions = old_entry.get_mut();
     778                 : 
     779         2444267 :                     if let Ok(version_idx) = versions.binary_search_by_key(old_lsn, |v| v.lsn) {
     780         2079692 :                         versions.remove(version_idx);
     781         2079692 :                         self.size_metrics
     782         2079692 :                             .current_bytes_materialized_page
     783         2079692 :                             .sub_page_sz(1);
     784         2079692 :                         if versions.is_empty() {
     785         1762097 :                             old_entry.remove_entry();
     786         1762097 :                         }
     787 UBC           0 :                     }
     788                 :                 } else {
     789               0 :                     panic!("could not find old key in mapping")
     790                 :                 }
     791                 :             }
     792 CBC     7744682 :             CacheKey::ImmutableFilePage { file_id, blkno } => {
     793         7744682 :                 let mut map = self.immutable_page_map.write().unwrap();
     794         7744682 :                 map.remove(&(*file_id, *blkno))
     795         7744682 :                     .expect("could not find old key in mapping");
     796         7744682 :                 self.size_metrics.current_bytes_immutable.sub_page_sz(1);
     797         7744682 :             }
     798                 :         }
     799         9824374 :     }
     800                 : 
     801                 :     ///
     802                 :     /// Insert mapping for given key.
     803                 :     ///
     804                 :     /// If a mapping already existed for the given key, returns the slot index
     805                 :     /// of the existing mapping and leaves it untouched.
     806        12521726 :     fn try_insert_mapping(&self, new_key: &CacheKey, slot_idx: usize) -> Option<usize> {
     807        12521726 :         match new_key {
     808                 :             CacheKey::MaterializedPage {
     809         2248272 :                 hash_key: new_key,
     810         2248272 :                 lsn: new_lsn,
     811         2248272 :             } => {
     812         2248272 :                 let mut map = self.materialized_page_map.write().unwrap();
     813         2248272 :                 let versions = map.entry(new_key.clone()).or_default();
     814         2248272 :                 match versions.binary_search_by_key(new_lsn, |v| v.lsn) {
     815              37 :                     Ok(version_idx) => Some(versions[version_idx].slot_idx),
     816         2248235 :                     Err(version_idx) => {
     817         2248235 :                         versions.insert(
     818         2248235 :                             version_idx,
     819         2248235 :                             Version {
     820         2248235 :                                 lsn: *new_lsn,
     821         2248235 :                                 slot_idx,
     822         2248235 :                             },
     823         2248235 :                         );
     824         2248235 :                         self.size_metrics
     825         2248235 :                             .current_bytes_materialized_page
     826         2248235 :                             .add_page_sz(1);
     827         2248235 :                         None
     828                 :                     }
     829                 :                 }
     830                 :             }
     831                 : 
     832        10273454 :             CacheKey::ImmutableFilePage { file_id, blkno } => {
     833        10273454 :                 let mut map = self.immutable_page_map.write().unwrap();
     834        10273454 :                 match map.entry((*file_id, *blkno)) {
     835             431 :                     Entry::Occupied(entry) => Some(*entry.get()),
     836        10273023 :                     Entry::Vacant(entry) => {
     837        10273023 :                         entry.insert(slot_idx);
     838        10273023 :                         self.size_metrics.current_bytes_immutable.add_page_sz(1);
     839        10273023 :                         None
     840                 :                     }
     841                 :                 }
     842                 :             }
     843                 :         }
     844        12521726 :     }
     845                 : 
     846                 :     //
     847                 :     // Section 4: Misc internal helpers
     848                 :     //
     849                 : 
     850                 :     /// Find a slot to evict.
     851                 :     ///
     852                 :     /// On return, the slot is empty and write-locked.
     853        12521726 :     async fn find_victim(
     854        12521726 :         &self,
     855        12521726 :         _permit_witness: &PinnedSlotsPermit,
     856        12521726 :     ) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
     857        12521721 :         let iter_limit = self.slots.len() * 10;
     858        12521721 :         let mut iters = 0;
     859        44918460 :         loop {
     860        44918460 :             iters += 1;
     861        44918460 :             let slot_idx = self.next_evict_slot.fetch_add(1, Ordering::Relaxed) % self.slots.len();
     862        44918460 : 
     863        44918460 :             let slot = &self.slots[slot_idx];
     864        44918460 : 
     865        44918460 :             if slot.dec_usage_count() == 0 {
     866        12521726 :                 let mut inner = match slot.inner.try_write() {
     867        12521721 :                     Ok(inner) => inner,
     868               5 :                     Err(_err) => {
     869               5 :                         if iters > iter_limit {
     870                 :                             // NB: Even with the permits, there's no hard guarantee that we will find a slot within
     871                 :                             // any particular number of iterations: other threads might race ahead and acquire and
     872                 :                             // release pins just as we're scanning the array.
     873                 :                             //
     874                 :                             // Imagine that nslots is 2, and as starting point, usage_count==1 on all
     875                 :                             // slots. There are two threads running concurrently, A and B. A has just
     876                 :                             // acquired the permit from the semaphore.
     877                 :                             //
     878                 :                             //   A: Look at slot 1. Its usage_count == 1, so decrement it to zero, and continue the search
     879                 :                             //   B: Acquire permit.
     880                 :                             //   B: Look at slot 2, decrement its usage_count to zero and continue the search
     881                 :                             //   B: Look at slot 1. Its usage_count is zero, so pin it and bump up its usage_count to 1.
     882                 :                             //   B: Release pin and permit again
     883                 :                             //   B: Acquire permit.
     884                 :                             //   B: Look at slot 2. Its usage_count is zero, so pin it and bump up its usage_count to 1.
     885                 :                             //   B: Release pin and permit again
     886                 :                             //
     887                 :                             // Now we're back in the starting situation that both slots have
     888                 :                             // usage_count 1, but A has now been through one iteration of the
     889                 :                             // find_victim() loop. This can repeat indefinitely and on each
     890                 :                             // iteration, A's iteration count increases by one.
     891                 :                             //
     892                 :                             // So, even though the semaphore for the permits is fair, the victim search
     893                 :                             // itself happens in parallel and is not fair.
     894                 :                             // Hence even with a permit, a task can theoretically be starved.
     895                 :                             // To avoid this, we'd need tokio to give priority to tasks that are holding
     896                 :                             // permits for longer.
     897                 :                             // Note that just yielding to tokio during iteration without such
     898                 :                             // priority boosting is likely counter-productive. We'd just give more opportunities
     899                 :                             // for B to bump usage count, further starving A.
     900 UBC           0 :                             crate::metrics::page_cache_errors_inc(
     901               0 :                                 crate::metrics::PageCacheErrorKind::EvictIterLimit,
     902               0 :                             );
     903               0 :                             anyhow::bail!("exceeded evict iter limit");
     904 CBC           5 :                         }
     905               5 :                         continue;
     906                 :                     }
     907                 :                 };
     908        12521721 :                 if let Some(old_key) = &inner.key {
     909         9824369 :                     // remove mapping for old buffer
     910         9824369 :                     self.remove_mapping(old_key);
     911         9824369 :                     inner.key = None;
     912         9824369 :                 }
     913        12521721 :                 crate::metrics::PAGE_CACHE_FIND_VICTIMS_ITERS_TOTAL.inc_by(iters as u64);
     914        12521721 :                 return Ok((slot_idx, inner));
     915        32396734 :             }
     916                 :         }
     917        12521721 :     }
     918                 : 
     919                 :     /// Initialize a new page cache
     920                 :     ///
     921                 :     /// This should be called only once at page server startup.
     922             561 :     fn new(num_pages: usize) -> Self {
     923             561 :         assert!(num_pages > 0, "page cache size must be > 0");
     924                 : 
     925                 :         // We could use Vec::leak here, but that potentially also leaks
     926                 :         // uninitialized reserved capacity. With into_boxed_slice and Box::leak
     927                 :         // this is avoided.
     928             561 :         let page_buffer = Box::leak(vec![0u8; num_pages * PAGE_SZ].into_boxed_slice());
     929             561 : 
     930             561 :         let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;
     931             561 :         size_metrics.max_bytes.set_page_sz(num_pages);
     932             561 :         size_metrics.current_bytes_immutable.set_page_sz(0);
     933             561 :         size_metrics.current_bytes_materialized_page.set_page_sz(0);
     934             561 : 
     935             561 :         let slots = page_buffer
     936             561 :             .chunks_exact_mut(PAGE_SZ)
     937         4564326 :             .map(|chunk| {
     938         4564326 :                 let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap();
     939         4564326 : 
     940         4564326 :                 Slot {
     941         4564326 :                     inner: tokio::sync::RwLock::new(SlotInner {
     942         4564326 :                         key: None,
     943         4564326 :                         buf,
     944         4564326 :                         permit: std::sync::Mutex::new(Weak::new()),
     945         4564326 :                     }),
     946         4564326 :                     usage_count: AtomicU8::new(0),
     947         4564326 :                 }
     948         4564326 :             })
     949             561 :             .collect();
     950             561 : 
     951             561 :         Self {
     952             561 :             materialized_page_map: Default::default(),
     953             561 :             immutable_page_map: Default::default(),
     954             561 :             slots,
     955             561 :             next_evict_slot: AtomicUsize::new(0),
     956             561 :             size_metrics,
     957             561 :             pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)),
     958             561 :         }
     959             561 :     }
     960                 : }
     961                 : 
     962                 : trait PageSzBytesMetric {
     963                 :     fn set_page_sz(&self, count: usize);
     964                 :     fn add_page_sz(&self, count: usize);
     965                 :     fn sub_page_sz(&self, count: usize);
     966                 : }
     967                 : 
     968                 : #[inline(always)]
     969        22347267 : fn count_times_page_sz(count: usize) -> u64 {
     970        22347267 :     u64::try_from(count).unwrap() * u64::try_from(PAGE_SZ).unwrap()
     971        22347267 : }
     972                 : 
     973                 : impl PageSzBytesMetric for metrics::UIntGauge {
     974            1683 :     fn set_page_sz(&self, count: usize) {
     975            1683 :         self.set(count_times_page_sz(count));
     976            1683 :     }
     977        12521234 :     fn add_page_sz(&self, count: usize) {
     978        12521234 :         self.add(count_times_page_sz(count));
     979        12521234 :     }
     980         9824350 :     fn sub_page_sz(&self, count: usize) {
     981         9824350 :         self.sub(count_times_page_sz(count));
     982         9824350 :     }
     983                 : }
        

Generated by: LCOV version 2.1-beta