LCOV - code coverage report
Current view: top level - pageserver/src - page_cache.rs (source / functions)
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info
Test Date: 2023-09-06 10:18:01
Coverage:  Lines: 95.3 % (367 of 385 hit)   Functions: 89.5 % (51 of 57 hit)

            Line data    Source code
       1              : //!
       2              : //! Global page cache
       3              : //!
       4              : //! The page cache uses up most of the memory in the page server. It is shared
       5              : //! by all tenants, and it is used to store different kinds of pages. Sharing
       6              : //! the cache allows memory to be dynamically allocated where it's needed the
       7              : //! most.
       8              : //!
       9              : //! The page cache consists of fixed-size buffers, 8 kB each to match the
      10              : //! PostgreSQL buffer size, and a Slot struct for each buffer to contain
      11              : //! information about what's stored in the buffer.
      12              : //!
      13              : //! # Types Of Pages
      14              : //!
      15              : //! [`PageCache`] only supports immutable pages.
      16              : //! Hence there is no need to worry about coherency.
      17              : //!
      18              : //! Two types of pages are supported:
      19              : //!
      20              : //! * **Materialized pages**, filled & used by page reconstruction
      21              : //! * **Immutable File pages**, filled & used by [`crate::tenant::block_io`] and [`crate::tenant::ephemeral_file`].
      22              : //!
       23              : //! Note that [`crate::tenant::ephemeral_file::EphemeralFile`] is generally mutable, but it's append-only.
      24              : //! It uses the page cache only for the blocks that are already fully written and immutable.
      25              : //!
      26              : //! # Filling The Page Cache
      27              : //!
      28              : //! Page cache maps from a cache key to a buffer slot.
      29              : //! The cache key uniquely identifies the piece of data that is being cached.
      30              : //!
       31              : //! The cache key for **materialized pages** is [`TenantId`], [`TimelineId`], [`Key`], and [`Lsn`].
      32              : //! Use [`PageCache::memorize_materialized_page`] and [`PageCache::lookup_materialized_page`] for fill & access.
      33              : //!
      34              : //! The cache key for **immutable file** pages is [`FileId`] and a block number.
       35              : //! Users of the page cache that wish to page-cache an arbitrary (immutable!) on-disk file do the following (sketched in the example after this list):
      36              : //! * Have a mechanism to deterministically associate the on-disk file with a [`FileId`].
      37              : //! * Get a [`FileId`] using [`next_file_id`].
      38              : //! * Use the mechanism to associate the on-disk file with the returned [`FileId`].
      39              : //! * Use [`PageCache::read_immutable_buf`] to get a [`ReadBufResult`].
      40              : //! * If the page was already cached, it'll be the [`ReadBufResult::Found`] variant that contains
      41              : //!   a read guard for the page. Just use it.
      42              : //! * If the page was not cached, it'll be the [`ReadBufResult::NotFound`] variant that contains
      43              : //!   a write guard for the page. Fill the page with the contents of the on-disk file.
      44              : //!   Then call [`PageWriteGuard::mark_valid`] to mark the page as valid.
      45              : //!   Then try again to [`PageCache::read_immutable_buf`].
      46              : //!   Unless there's high cache pressure, the page should now be cached.
      47              : //!   (TODO: allow downgrading the write guard to a read guard to ensure forward progress.)
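//!
//! A minimal sketch of this workflow. `MyOnDiskFile` and `read_block_from_disk`
//! are hypothetical stand-ins for the caller's own file type and disk I/O, not
//! part of this module:
//!
//! ```ignore
//! struct MyOnDiskFile {
//!     file_id: page_cache::FileId, // obtained once via next_file_id()
//!     // ... handle to the underlying immutable file ...
//! }
//!
//! async fn read_block(file: &MyOnDiskFile, blkno: u32) -> anyhow::Result<Vec<u8>> {
//!     let cache = page_cache::get();
//!     loop {
//!         match cache.read_immutable_buf(file.file_id, blkno).await? {
//!             page_cache::ReadBufResult::Found(guard) => {
//!                 // Cache hit: the guard derefs to the 8 kB page.
//!                 return Ok(guard.to_vec());
//!             }
//!             page_cache::ReadBufResult::NotFound(mut write_guard) => {
//!                 // Cache miss: fill the buffer, mark it valid, then retry.
//!                 read_block_from_disk(file, blkno, &mut *write_guard)?;
//!                 write_guard.mark_valid();
//!             }
//!         }
//!     }
//! }
//! ```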
      48              : //!
      49              : //! # Locking
      50              : //!
      51              : //! There are two levels of locking involved: There's one lock for the "mapping"
      52              : //! from page identifier (tenant ID, timeline ID, rel, block, LSN) to the buffer
      53              : //! slot, and a separate lock on each slot. To read or write the contents of a
      54              : //! slot, you must hold the lock on the slot in read or write mode,
      55              : //! respectively. To change the mapping of a slot, i.e. to evict a page or to
      56              : //! assign a buffer for a page, you must hold the mapping lock and the lock on
      57              : //! the slot at the same time.
      58              : //!
      59              : //! Whenever you need to hold both locks simultaneously, the slot lock must be
      60              : //! acquired first. This consistent ordering avoids deadlocks. To look up a page
      61              : //! in the cache, you would first look up the mapping, while holding the mapping
      62              : //! lock, and then lock the slot. You must release the mapping lock in between,
      63              : //! to obey the lock ordering and avoid deadlock.
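//!
//! A condensed sketch of that lookup protocol (simplified from
//! `try_lock_for_read` below; key types and error handling elided):
//!
//! ```ignore
//! // 1. Find the slot index under the mapping lock, and release the
//! //    mapping lock before touching the slot lock.
//! let slot_idx = {
//!     let map = self.immutable_page_map.read().unwrap();
//!     *map.get(&(file_id, blkno))?
//! }; // <-- mapping read lock dropped here
//!
//! // 2. Lock the slot, then re-check the key: the page could have been
//! //    evicted between steps 1 and 2.
//! let inner = self.slots[slot_idx].inner.read().await;
//! if inner.key.as_ref() == Some(cache_key) {
//!     return Some(PageReadGuard(inner));
//! }
//! None
//! ```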
      64              : //!
      65              : //! A slot can momentarily have invalid contents, even if it's already been
      66              : //! inserted to the mapping, but you must hold the write-lock on the slot until
      67              : //! the contents are valid. If you need to release the lock without initializing
      68              : //! the contents, you must remove the mapping first. We make that easy for the
      69              : //! callers with PageWriteGuard: when lock_for_write() returns an uninitialized
      70              : //! page, the caller must explicitly call guard.mark_valid() after it has
      71              : //! initialized it. If the guard is dropped without calling mark_valid(), the
      72              : //! mapping is automatically removed and the slot is marked free.
      73              : //!
      74              : 
      75              : use std::{
      76              :     collections::{hash_map::Entry, HashMap},
      77              :     convert::TryInto,
      78              :     sync::atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
      79              : };
      80              : 
      81              : use anyhow::Context;
      82              : use once_cell::sync::OnceCell;
      83              : use utils::{
      84              :     id::{TenantId, TimelineId},
      85              :     lsn::Lsn,
      86              : };
      87              : 
      88              : use crate::{metrics::PageCacheSizeMetrics, repository::Key};
      89              : 
      90              : static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
      91              : const TEST_PAGE_CACHE_SIZE: usize = 50;
      92              : 
      93              : ///
      94              : /// Initialize the page cache. This must be called once at page server startup.
      95              : ///
      96          575 : pub fn init(size: usize) {
      97          575 :     if PAGE_CACHE.set(PageCache::new(size)).is_err() {
      98            0 :         panic!("page cache already initialized");
      99          575 :     }
     100          575 : }
     101              : 
     102              : ///
     103              : /// Get a handle to the page cache.
     104              : ///
     105    399446033 : pub fn get() -> &'static PageCache {
     106    399446033 :     //
     107    399446033 :     // In unit tests, page server startup doesn't happen and no one calls
     108    399446033 :     // page_cache::init(). Initialize it here with a tiny cache, so that the
     109    399446033 :     // page cache is usable in unit tests.
     110    399446033 :     //
     111    399446033 :     if cfg!(test) {
     112      1651689 :         PAGE_CACHE.get_or_init(|| PageCache::new(TEST_PAGE_CACHE_SIZE))
     113              :     } else {
     114    397794344 :         PAGE_CACHE.get().expect("page cache not initialized")
     115              :     }
     116    399446033 : }
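For reference, a short sketch of how these two entry points pair up; the size
value here is illustrative, not a recommended setting:

```rust
// Once, at page server startup. The size is the number of 8 kB slots;
// 262_144 slots would make a hypothetical 2 GiB cache.
page_cache::init(262_144);

// Afterwards, from anywhere in the page server:
let cache = page_cache::get();
```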
     117              : 
     118              : pub const PAGE_SZ: usize = postgres_ffi::BLCKSZ as usize;
     119              : const MAX_USAGE_COUNT: u8 = 5;
     120              : 
     121              : /// See module-level comment.
     122    784690923 : #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
     123              : pub struct FileId(u64);
     124              : 
     125              : static NEXT_ID: AtomicU64 = AtomicU64::new(1);
     126              : 
     127              : /// See module-level comment.
     128        19009 : pub fn next_file_id() -> FileId {
     129        19009 :     FileId(NEXT_ID.fetch_add(1, Ordering::Relaxed))
     130        19009 : }
     131              : 
     132              : ///
     133              : /// CacheKey uniquely identifies a "thing" to cache in the page cache.
     134              : ///
     135    417416954 : #[derive(Debug, PartialEq, Eq, Clone)]
     136              : #[allow(clippy::enum_variant_names)]
     137              : enum CacheKey {
     138              :     MaterializedPage {
     139              :         hash_key: MaterializedPageHashKey,
     140              :         lsn: Lsn,
     141              :     },
     142              :     ImmutableFilePage {
     143              :         file_id: FileId,
     144              :         blkno: u32,
     145              :     },
     146              : }
     147              : 
     148     15268817 : #[derive(Debug, PartialEq, Eq, Hash, Clone)]
     149              : struct MaterializedPageHashKey {
     150              :     tenant_id: TenantId,
     151              :     timeline_id: TimelineId,
     152              :     key: Key,
     153              : }
     154              : 
     155            0 : #[derive(Clone)]
     156              : struct Version {
     157              :     lsn: Lsn,
     158              :     slot_idx: usize,
     159              : }
     160              : 
     161              : struct Slot {
     162              :     inner: tokio::sync::RwLock<SlotInner>,
     163              :     usage_count: AtomicU8,
     164              : }
     165              : 
     166              : struct SlotInner {
     167              :     key: Option<CacheKey>,
     168              :     buf: &'static mut [u8; PAGE_SZ],
     169              : }
     170              : 
     171              : impl Slot {
     172              :     /// Increment usage count on the buffer, with ceiling at MAX_USAGE_COUNT.
     173    387218430 :     fn inc_usage_count(&self) {
     174    387218430 :         let _ = self
     175    387218430 :             .usage_count
     176    387219677 :             .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
     177    387219677 :                 if val == MAX_USAGE_COUNT {
     178    345664814 :                     None
     179              :                 } else {
     180     41554863 :                     Some(val + 1)
     181              :                 }
     182    387219677 :             });
     183    387218430 :     }
     184              : 
     185              :     /// Decrement usage count on the buffer, unless it's already zero.  Returns
     186              :     /// the old usage count.
     187     61375494 :     fn dec_usage_count(&self) -> u8 {
     188     61375494 :         let count_res =
     189     61375494 :             self.usage_count
     190     61375521 :                 .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
     191     61375521 :                     if val == 0 {
     192     13717901 :                         None
     193              :                     } else {
     194     47657620 :                         Some(val - 1)
     195              :                     }
     196     61375521 :                 });
     197     61375494 : 
     198     61375494 :         match count_res {
     199     47657600 :             Ok(usage_count) => usage_count,
     200     13717894 :             Err(usage_count) => usage_count,
     201              :         }
     202     61375494 :     }
     203              : 
     204              :     /// Sets the usage count to a specific value.
     205     13717435 :     fn set_usage_count(&self, count: u8) {
     206     13717435 :         self.usage_count.store(count, Ordering::Relaxed);
     207     13717435 :     }
     208              : }
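The usage count is the metadata that the clock replacement algorithm (see
find_victim below) sweeps over: increments saturate at MAX_USAGE_COUNT and
decrements stop at zero. A standalone sketch of the saturating increment,
using only the standard library:

```rust
use std::sync::atomic::{AtomicU8, Ordering};

const MAX_USAGE_COUNT: u8 = 5;

fn main() {
    let usage_count = AtomicU8::new(4);

    // As in Slot::inc_usage_count: returning None from the closure leaves
    // the value unchanged once the ceiling is reached.
    for _ in 0..3 {
        let _ = usage_count.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
            if val == MAX_USAGE_COUNT { None } else { Some(val + 1) }
        });
    }
    assert_eq!(usage_count.load(Ordering::Relaxed), MAX_USAGE_COUNT);
}
```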
     209              : 
     210              : pub struct PageCache {
     211              :     /// This contains the mapping from the cache key to buffer slot that currently
     212              :     /// contains the page, if any.
     213              :     ///
     214              :     /// TODO: This is protected by a single lock. If that becomes a bottleneck,
      215              :     /// this HashMap can be replaced with a more concurrent version; there are
     216              :     /// plenty of such crates around.
     217              :     ///
     218              :     /// If you add support for caching different kinds of objects, each object kind
     219              :     /// can have a separate mapping map, next to this field.
     220              :     materialized_page_map: std::sync::RwLock<HashMap<MaterializedPageHashKey, Vec<Version>>>,
     221              : 
     222              :     immutable_page_map: std::sync::RwLock<HashMap<(FileId, u32), usize>>,
     223              : 
     224              :     /// The actual buffers with their metadata.
     225              :     slots: Box<[Slot]>,
     226              : 
     227              :     /// Index of the next candidate to evict, for the Clock replacement algorithm.
     228              :     /// This is interpreted modulo the page cache size.
     229              :     next_evict_slot: AtomicUsize,
     230              : 
     231              :     size_metrics: &'static PageCacheSizeMetrics,
     232              : }
     233              : 
     234              : ///
     235              : /// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked
     236              : /// until the guard is dropped.
     237              : ///
     238              : pub struct PageReadGuard<'i>(tokio::sync::RwLockReadGuard<'i, SlotInner>);
     239              : 
     240              : impl std::ops::Deref for PageReadGuard<'_> {
     241              :     type Target = [u8; PAGE_SZ];
     242              : 
     243    786427816 :     fn deref(&self) -> &Self::Target {
     244    786427816 :         self.0.buf
     245    786427816 :     }
     246              : }
     247              : 
     248              : impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
     249            0 :     fn as_ref(&self) -> &[u8; PAGE_SZ] {
     250            0 :         self.0.buf
     251            0 :     }
     252              : }
     253              : 
     254              : ///
     255              : /// PageWriteGuard is a lease on a buffer for modifying it. The page is kept locked
     256              : /// until the guard is dropped.
     257              : ///
     258              : /// Counterintuitively, this is used even for a read, if the requested page is not
     259              : /// currently found in the page cache. In that case, the caller of lock_for_read()
     260              : /// is expected to fill in the page contents and call mark_valid(). Similarly
      261              : /// lock_for_write() can return an invalid buffer that the caller is expected
      262              : /// to initialize.
     263              : ///
     264              : pub struct PageWriteGuard<'i> {
     265              :     inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
     266              : 
     267              :     // Are the page contents currently valid?
     268              :     // Used to mark pages as invalid that are assigned but not yet filled with data.
     269              :     valid: bool,
     270              : }
     271              : 
     272              : impl std::ops::DerefMut for PageWriteGuard<'_> {
     273     13717435 :     fn deref_mut(&mut self) -> &mut Self::Target {
     274     13717435 :         self.inner.buf
     275     13717435 :     }
     276              : }
     277              : 
     278              : impl std::ops::Deref for PageWriteGuard<'_> {
     279              :     type Target = [u8; PAGE_SZ];
     280              : 
     281         3721 :     fn deref(&self) -> &Self::Target {
     282         3721 :         self.inner.buf
     283         3721 :     }
     284              : }
     285              : 
     286              : impl AsMut<[u8; PAGE_SZ]> for PageWriteGuard<'_> {
     287            0 :     fn as_mut(&mut self) -> &mut [u8; PAGE_SZ] {
     288            0 :         self.inner.buf
     289            0 :     }
     290              : }
     291              : 
     292              : impl PageWriteGuard<'_> {
     293              :     /// Mark that the buffer contents are now valid.
     294     13717423 :     pub fn mark_valid(&mut self) {
     295     13717423 :         assert!(self.inner.key.is_some());
     296     13717423 :         assert!(
     297     13717423 :             !self.valid,
     298            0 :             "mark_valid called on a buffer that was already valid"
     299              :         );
     300     13717423 :         self.valid = true;
     301     13717423 :     }
     302              : }
     303              : 
     304              : impl Drop for PageWriteGuard<'_> {
     305              :     ///
     306              :     /// If the buffer was allocated for a page that was not already in the
     307              :     /// cache, but the lock_for_read/write() caller dropped the buffer without
     308              :     /// initializing it, remove the mapping from the page cache.
     309              :     ///
     310     13721145 :     fn drop(&mut self) {
     311     13721145 :         assert!(self.inner.key.is_some());
     312     13721145 :         if !self.valid {
     313           11 :             let self_key = self.inner.key.as_ref().unwrap();
     314           11 :             PAGE_CACHE.get().unwrap().remove_mapping(self_key);
     315           11 :             self.inner.key = None;
     316     13721134 :         }
     317     13721145 :     }
     318              : }
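To make that contract concrete, here is a hedged sketch of a fill helper built
on the public API; `read_from_disk` is a hypothetical callback, not part of
this module:

```rust
async fn fill_page(
    cache: &page_cache::PageCache,
    file_id: page_cache::FileId,
    blkno: u32,
    read_from_disk: impl Fn(&mut [u8; page_cache::PAGE_SZ]) -> anyhow::Result<()>,
) -> anyhow::Result<()> {
    if let page_cache::ReadBufResult::NotFound(mut write_guard) =
        cache.read_immutable_buf(file_id, blkno).await?
    {
        // If this read fails, `write_guard` is dropped without mark_valid():
        // the Drop impl above removes the mapping, so no reader can ever
        // observe the half-filled buffer.
        read_from_disk(&mut *write_guard)?;
        write_guard.mark_valid();
    }
    Ok(())
}
```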
     319              : 
     320              : /// lock_for_read() return value
     321              : pub enum ReadBufResult<'a> {
     322              :     Found(PageReadGuard<'a>),
     323              :     NotFound(PageWriteGuard<'a>),
     324              : }
     325              : 
     326              : /// lock_for_write() return value
     327              : pub enum WriteBufResult<'a> {
     328              :     Found(PageWriteGuard<'a>),
     329              :     NotFound(PageWriteGuard<'a>),
     330              : }
     331              : 
     332              : impl PageCache {
     333              :     //
     334              :     // Section 1.1: Public interface functions for looking up and memorizing materialized page
     335              :     // versions in the page cache
     336              :     //
     337              : 
     338              :     /// Look up a materialized page version.
     339              :     ///
      340              :     /// The 'lsn' is an upper bound: this will return the latest version of
     341              :     /// the given block, but not newer than 'lsn'. Returns the actual LSN of the
     342              :     /// returned page.
     343      7262299 :     pub async fn lookup_materialized_page(
     344      7262299 :         &self,
     345      7262299 :         tenant_id: TenantId,
     346      7262299 :         timeline_id: TimelineId,
     347      7262299 :         key: &Key,
     348      7262299 :         lsn: Lsn,
     349      7262300 :     ) -> Option<(Lsn, PageReadGuard)> {
     350      7262300 :         crate::metrics::PAGE_CACHE
     351      7262300 :             .read_accesses_materialized_page
     352      7262300 :             .inc();
     353      7262300 : 
     354      7262300 :         let mut cache_key = CacheKey::MaterializedPage {
     355      7262300 :             hash_key: MaterializedPageHashKey {
     356      7262300 :                 tenant_id,
     357      7262300 :                 timeline_id,
     358      7262300 :                 key: *key,
     359      7262300 :             },
     360      7262300 :             lsn,
     361      7262300 :         };
     362              : 
     363      7262300 :         if let Some(guard) = self.try_lock_for_read(&mut cache_key).await {
     364              :             if let CacheKey::MaterializedPage {
     365              :                 hash_key: _,
     366      1740445 :                 lsn: available_lsn,
     367      1740445 :             } = cache_key
     368              :             {
     369      1740445 :                 if available_lsn == lsn {
     370           40 :                     crate::metrics::PAGE_CACHE
     371           40 :                         .read_hits_materialized_page_exact
     372           40 :                         .inc();
     373      1740405 :                 } else {
     374      1740405 :                     crate::metrics::PAGE_CACHE
     375      1740405 :                         .read_hits_materialized_page_older_lsn
     376      1740405 :                         .inc();
     377      1740405 :                 }
     378      1740445 :                 Some((available_lsn, guard))
     379              :             } else {
     380            0 :                 panic!("unexpected key type in slot");
     381              :             }
     382              :         } else {
     383      5521855 :             None
     384              :         }
     385      7262300 :     }
     386              : 
     387              :     ///
     388              :     /// Store an image of the given page in the cache.
     389              :     ///
     390      2758635 :     pub async fn memorize_materialized_page(
     391      2758635 :         &self,
     392      2758635 :         tenant_id: TenantId,
     393      2758635 :         timeline_id: TimelineId,
     394      2758635 :         key: Key,
     395      2758635 :         lsn: Lsn,
     396      2758635 :         img: &[u8],
     397      2758635 :     ) -> anyhow::Result<()> {
     398      2758635 :         let cache_key = CacheKey::MaterializedPage {
     399      2758635 :             hash_key: MaterializedPageHashKey {
     400      2758635 :                 tenant_id,
     401      2758635 :                 timeline_id,
     402      2758635 :                 key,
     403      2758635 :             },
     404      2758635 :             lsn,
     405      2758635 :         };
     406      2758635 : 
     407      2758635 :         match self.lock_for_write(&cache_key).await? {
     408         3721 :             WriteBufResult::Found(write_guard) => {
     409         3721 :                 // We already had it in cache. Another thread must've put it there
     410         3721 :                 // concurrently. Check that it had the same contents that we
     411         3721 :                 // replayed.
     412         3721 :                 assert!(*write_guard == img);
     413              :             }
     414      2754914 :             WriteBufResult::NotFound(mut write_guard) => {
     415      2754914 :                 write_guard.copy_from_slice(img);
     416      2754914 :                 write_guard.mark_valid();
     417      2754914 :             }
     418              :         }
     419              : 
     420      2758635 :         Ok(())
     421      2758635 :     }
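A hedged sketch of how this pairs with lookup_materialized_page during page
reconstruction; the WAL-apply step is elided and the surrounding function is
illustrative, not the actual reconstruction path:

```rust
async fn reconstruct_page(
    cache: &page_cache::PageCache,
    tenant_id: TenantId,
    timeline_id: TimelineId,
    key: Key,
    request_lsn: Lsn,
) -> anyhow::Result<Vec<u8>> {
    // Start from the newest cached image at or below the requested LSN...
    let (base_lsn, mut page) = match cache
        .lookup_materialized_page(tenant_id, timeline_id, &key, request_lsn)
        .await
    {
        Some((lsn, guard)) => (lsn, guard.to_vec()),
        None => (Lsn(0), vec![0u8; page_cache::PAGE_SZ]),
    };

    // ...apply WAL records in (base_lsn, request_lsn] to `page` (elided)...

    // ...then memorize the result for future lookups.
    cache
        .memorize_materialized_page(tenant_id, timeline_id, key, request_lsn, &page)
        .await?;
    Ok(page)
}
```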
     422              : 
     423              :     // Section 1.2: Public interface functions for working with immutable file pages.
     424              : 
     425    396436785 :     pub async fn read_immutable_buf(
     426    396436785 :         &self,
     427    396436785 :         file_id: FileId,
     428    396436785 :         blkno: u32,
     429    396437058 :     ) -> anyhow::Result<ReadBufResult> {
     430    396437058 :         let mut cache_key = CacheKey::ImmutableFilePage { file_id, blkno };
     431    396437058 : 
     432    396437058 :         self.lock_for_read(&mut cache_key).await
     433    396437055 :     }
     434              : 
     435              :     //
     436              :     // Section 2: Internal interface functions for lookup/update.
     437              :     //
     438              :     // To add support for a new kind of "thing" to cache, you will need
     439              :     // to add public interface routines above, and code to deal with the
     440              :     // "mappings" after this section. But the routines in this section should
     441              :     // not require changes.
     442              : 
     443              :     /// Look up a page in the cache.
     444              :     ///
      445              :     /// If the search criteria is not exact, *cache_key is updated with the
      446              :     /// exact key of the returned page. (For materialized pages, that means
     447              :     /// that the LSN in 'cache_key' is updated with the LSN of the returned page
     448              :     /// version.)
     449              :     ///
     450              :     /// If no page is found, returns None and *cache_key is left unmodified.
     451              :     ///
     452    403699793 :     async fn try_lock_for_read(&self, cache_key: &mut CacheKey) -> Option<PageReadGuard> {
     453    403699793 :         let cache_key_orig = cache_key.clone();
     454    403699793 :         if let Some(slot_idx) = self.search_mapping(cache_key) {
     455              :             // The page was found in the mapping. Lock the slot, and re-check
     456              :             // that it's still what we expected (because we released the mapping
     457              :             // lock already, another thread could have evicted the page)
     458    387214987 :             let slot = &self.slots[slot_idx];
     459    387214987 :             let inner = slot.inner.read().await;
     460    387214984 :             if inner.key.as_ref() == Some(cache_key) {
     461    387214972 :                 slot.inc_usage_count();
     462    387214972 :                 return Some(PageReadGuard(inner));
     463           12 :             } else {
     464           12 :                 // search_mapping might have modified the search key; restore it.
     465           12 :                 *cache_key = cache_key_orig;
     466           12 :             }
     467     16484806 :         }
     468     16484818 :         None
     469    403699790 :     }
     470              : 
     471              :     /// Return a locked buffer for given block.
     472              :     ///
     473              :     /// Like try_lock_for_read(), if the search criteria is not exact and the
     474              :     /// page is already found in the cache, *cache_key is updated.
     475              :     ///
     476              :     /// If the page is not found in the cache, this allocates a new buffer for
     477              :     /// it. The caller may then initialize the buffer with the contents, and
     478              :     /// call mark_valid().
     479              :     ///
     480              :     /// Example usage:
     481              :     ///
     482              :     /// ```ignore
     483              :     /// let cache = page_cache::get();
     484              :     ///
      485              :     /// match cache.lock_for_read(&mut key).await? {
     486              :     ///     ReadBufResult::Found(read_guard) => {
     487              :     ///         // The page was found in cache. Use it
     488              :     ///     },
      489              :     ///     ReadBufResult::NotFound(mut write_guard) => {
     490              :     ///         // The page was not found in cache. Read it from disk into the
     491              :     ///         // buffer.
      492              :     ///         // read_my_page_from_disk(&mut *write_guard);
     493              :     ///
     494              :     ///         // The buffer contents are now valid. Tell the page cache.
     495              :     ///         write_guard.mark_valid();
     496              :     ///     },
     497              :     /// }
     498              :     /// ```
     499              :     ///
     500    396437058 :     async fn lock_for_read(&self, cache_key: &mut CacheKey) -> anyhow::Result<ReadBufResult> {
     501    396437058 :         let (read_access, hit) = match cache_key {
     502              :             CacheKey::MaterializedPage { .. } => {
     503            0 :                 unreachable!("Materialized pages use lookup_materialized_page")
     504              :             }
     505    396437058 :             CacheKey::ImmutableFilePage { .. } => (
     506    396437058 :                 &crate::metrics::PAGE_CACHE.read_accesses_immutable,
     507    396437058 :                 &crate::metrics::PAGE_CACHE.read_hits_immutable,
     508    396437058 :             ),
     509    396437058 :         };
     510    396437058 :         read_access.inc();
     511    396437058 : 
     512    396437058 :         let mut is_first_iteration = true;
     513              :         loop {
     514              :             // First check if the key already exists in the cache.
     515    396437493 :             if let Some(read_guard) = self.try_lock_for_read(cache_key).await {
     516    385474527 :                 if is_first_iteration {
     517    385474092 :                     hit.inc();
     518    385474092 :                 }
     519    385474527 :                 return Ok(ReadBufResult::Found(read_guard));
     520     10962963 :             }
     521     10962963 :             is_first_iteration = false;
     522              : 
     523              :             // Not found. Find a victim buffer
     524     10962963 :             let (slot_idx, mut inner) =
     525     10962963 :                 self.find_victim().context("Failed to find evict victim")?;
     526              : 
     527              :             // Insert mapping for this. At this point, we may find that another
     528              :             // thread did the same thing concurrently. In that case, we evicted
     529              :             // our victim buffer unnecessarily. Put it into the free list and
     530              :             // continue with the slot that the other thread chose.
     531     10962963 :             if let Some(_existing_slot_idx) = self.try_insert_mapping(cache_key, slot_idx) {
     532              :                 // TODO: put to free list
     533              : 
      534              :                 // We now just loop back to start from the beginning. This is not
     535              :                 // optimal, we'll perform the lookup in the mapping again, which
     536              :                 // is not really necessary because we already got
     537              :                 // 'existing_slot_idx'.  But this shouldn't happen often enough
     538              :                 // to matter much.
     539          435 :                 continue;
     540     10962528 :             }
     541     10962528 : 
     542     10962528 :             // Make the slot ready
     543     10962528 :             let slot = &self.slots[slot_idx];
     544     10962528 :             inner.key = Some(cache_key.clone());
     545     10962528 :             slot.set_usage_count(1);
     546     10962528 : 
     547     10962528 :             return Ok(ReadBufResult::NotFound(PageWriteGuard {
     548     10962528 :                 inner,
     549     10962528 :                 valid: false,
     550     10962528 :             }));
     551              :         }
     552    396437055 :     }
     553              : 
     554              :     /// Look up a page in the cache and lock it in write mode. If it's not
     555              :     /// found, returns None.
     556              :     ///
     557              :     /// When locking a page for writing, the search criteria is always "exact".
     558      2758653 :     async fn try_lock_for_write(&self, cache_key: &CacheKey) -> Option<PageWriteGuard> {
     559      2758653 :         if let Some(slot_idx) = self.search_mapping_for_write(cache_key) {
     560              :             // The page was found in the mapping. Lock the slot, and re-check
      561              :             // that it's still what we expected (because we released the mapping
     562              :             // lock already, another thread could have evicted the page)
     563         3721 :             let slot = &self.slots[slot_idx];
     564         3721 :             let inner = slot.inner.write().await;
     565         3721 :             if inner.key.as_ref() == Some(cache_key) {
     566         3721 :                 slot.inc_usage_count();
     567         3721 :                 return Some(PageWriteGuard { inner, valid: true });
     568            0 :             }
     569      2754932 :         }
     570      2754932 :         None
     571      2758653 :     }
     572              : 
     573              :     /// Return a write-locked buffer for given block.
     574              :     ///
     575              :     /// Similar to lock_for_read(), but the returned buffer is write-locked and
     576              :     /// may be modified by the caller even if it's already found in the cache.
     577      2758635 :     async fn lock_for_write(&self, cache_key: &CacheKey) -> anyhow::Result<WriteBufResult> {
     578              :         loop {
     579              :             // First check if the key already exists in the cache.
     580      2758653 :             if let Some(write_guard) = self.try_lock_for_write(cache_key).await {
     581         3721 :                 return Ok(WriteBufResult::Found(write_guard));
     582      2754932 :             }
     583              : 
     584              :             // Not found. Find a victim buffer
     585      2754932 :             let (slot_idx, mut inner) =
     586      2754932 :                 self.find_victim().context("Failed to find evict victim")?;
     587              : 
     588              :             // Insert mapping for this. At this point, we may find that another
     589              :             // thread did the same thing concurrently. In that case, we evicted
     590              :             // our victim buffer unnecessarily. Put it into the free list and
     591              :             // continue with the slot that the other thread chose.
     592      2754932 :             if let Some(_existing_slot_idx) = self.try_insert_mapping(cache_key, slot_idx) {
     593              :                 // TODO: put to free list
     594              : 
      595              :                 // We now just loop back to start from the beginning. This is not
     596              :                 // optimal, we'll perform the lookup in the mapping again, which
     597              :                 // is not really necessary because we already got
     598              :                 // 'existing_slot_idx'.  But this shouldn't happen often enough
     599              :                 // to matter much.
     600           18 :                 continue;
     601      2754914 :             }
     602      2754914 : 
     603      2754914 :             // Make the slot ready
     604      2754914 :             let slot = &self.slots[slot_idx];
     605      2754914 :             inner.key = Some(cache_key.clone());
     606      2754914 :             slot.set_usage_count(1);
     607      2754914 : 
     608      2754914 :             return Ok(WriteBufResult::NotFound(PageWriteGuard {
     609      2754914 :                 inner,
     610      2754914 :                 valid: false,
     611      2754914 :             }));
     612              :         }
     613      2758635 :     }
     614              : 
     615              :     //
     616              :     // Section 3: Mapping functions
     617              :     //
     618              : 
     619              :     /// Search for a page in the cache using the given search key.
     620              :     ///
     621              :     /// Returns the slot index, if any. If the search criteria is not exact,
     622              :     /// *cache_key is updated with the actual key of the found page.
     623              :     ///
     624              :     /// NOTE: We don't hold any lock on the mapping on return, so the slot might
     625              :     /// get recycled for an unrelated page immediately after this function
     626              :     /// returns.  The caller is responsible for re-checking that the slot still
     627              :     /// contains the page with the same key before using it.
     628              :     ///
     629    403699519 :     fn search_mapping(&self, cache_key: &mut CacheKey) -> Option<usize> {
     630    403699519 :         match cache_key {
     631      7262299 :             CacheKey::MaterializedPage { hash_key, lsn } => {
     632      7262299 :                 let map = self.materialized_page_map.read().unwrap();
     633      7262299 :                 let versions = map.get(hash_key)?;
     634              : 
     635      1986120 :                 let version_idx = match versions.binary_search_by_key(lsn, |v| v.lsn) {
     636           40 :                     Ok(version_idx) => version_idx,
     637         1787 :                     Err(0) => return None,
     638      1740408 :                     Err(version_idx) => version_idx - 1,
     639              :                 };
     640      1740448 :                 let version = &versions[version_idx];
     641      1740448 :                 *lsn = version.lsn;
     642      1740448 :                 Some(version.slot_idx)
     643              :             }
     644    396437220 :             CacheKey::ImmutableFilePage { file_id, blkno } => {
     645    396437220 :                 let map = self.immutable_page_map.read().unwrap();
     646    396437220 :                 Some(*map.get(&(*file_id, *blkno))?)
     647              :             }
     648              :         }
     649    403699519 :     }
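The Err(idx) => version_idx - 1 arm above is what turns the exact-match binary
search into a "latest version not newer than lsn" lookup. A standalone
illustration of that index arithmetic:

```rust
fn main() {
    // LSNs of the cached versions of one page, sorted ascending, as in the
    // materialized page map.
    let version_lsns: Vec<u64> = vec![10, 20, 30];

    // Searching for lsn = 25: no exact match, and the insertion point would
    // be index 2, so index 1 (lsn 20) holds the latest version not newer
    // than 25. Err(0) would mean every cached version is newer than 25.
    match version_lsns.binary_search(&25) {
        Ok(idx) => println!("exact match at index {idx}"),
        Err(0) => println!("no version at or below the requested lsn"),
        Err(idx) => println!("use index {}, lsn {}", idx - 1, version_lsns[idx - 1]),
    }
}
```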
     650              : 
     651              :     /// Search for a page in the cache using the given search key.
     652              :     ///
      653              :     /// Like 'search_mapping', but performs an "exact" search. Used for
     654              :     /// allocating a new buffer.
     655      2758653 :     fn search_mapping_for_write(&self, key: &CacheKey) -> Option<usize> {
     656      2758653 :         match key {
     657      2758653 :             CacheKey::MaterializedPage { hash_key, lsn } => {
     658      2758653 :                 let map = self.materialized_page_map.read().unwrap();
     659      2758653 :                 let versions = map.get(hash_key)?;
     660              : 
     661       719054 :                 if let Ok(version_idx) = versions.binary_search_by_key(lsn, |v| v.lsn) {
     662         3721 :                     Some(versions[version_idx].slot_idx)
     663              :                 } else {
     664       542620 :                     None
     665              :                 }
     666              :             }
     667            0 :             CacheKey::ImmutableFilePage { file_id, blkno } => {
     668            0 :                 let map = self.immutable_page_map.read().unwrap();
     669            0 :                 Some(*map.get(&(*file_id, *blkno))?)
     670              :             }
     671              :         }
     672      2758653 :     }
     673              : 
     674              :     ///
     675              :     /// Remove mapping for given key.
     676              :     ///
     677     10815562 :     fn remove_mapping(&self, old_key: &CacheKey) {
     678     10815562 :         match old_key {
     679              :             CacheKey::MaterializedPage {
     680      2496672 :                 hash_key: old_hash_key,
     681      2496672 :                 lsn: old_lsn,
     682      2496672 :             } => {
     683      2496672 :                 let mut map = self.materialized_page_map.write().unwrap();
     684      2496672 :                 if let Entry::Occupied(mut old_entry) = map.entry(old_hash_key.clone()) {
     685      2496672 :                     let versions = old_entry.get_mut();
     686              : 
     687      3070504 :                     if let Ok(version_idx) = versions.binary_search_by_key(old_lsn, |v| v.lsn) {
     688      2496672 :                         versions.remove(version_idx);
     689      2496672 :                         self.size_metrics
     690      2496672 :                             .current_bytes_materialized_page
     691      2496672 :                             .sub_page_sz(1);
     692      2496672 :                         if versions.is_empty() {
     693      2018601 :                             old_entry.remove_entry();
     694      2018601 :                         }
     695            0 :                     }
     696              :                 } else {
     697            0 :                     panic!("could not find old key in mapping")
     698              :                 }
     699              :             }
     700      8318890 :             CacheKey::ImmutableFilePage { file_id, blkno } => {
     701      8318890 :                 let mut map = self.immutable_page_map.write().unwrap();
     702      8318890 :                 map.remove(&(*file_id, *blkno))
     703      8318890 :                     .expect("could not find old key in mapping");
     704      8318890 :                 self.size_metrics.current_bytes_immutable.sub_page_sz(1);
     705      8318890 :             }
     706              :         }
     707     10815562 :     }
     708              : 
     709              :     ///
     710              :     /// Insert mapping for given key.
     711              :     ///
     712              :     /// If a mapping already existed for the given key, returns the slot index
     713              :     /// of the existing mapping and leaves it untouched.
     714     13717888 :     fn try_insert_mapping(&self, new_key: &CacheKey, slot_idx: usize) -> Option<usize> {
     715     13717888 :         match new_key {
     716              :             CacheKey::MaterializedPage {
     717      2754932 :                 hash_key: new_key,
     718      2754932 :                 lsn: new_lsn,
     719      2754932 :             } => {
     720      2754932 :                 let mut map = self.materialized_page_map.write().unwrap();
     721      2754932 :                 let versions = map.entry(new_key.clone()).or_default();
     722      2754932 :                 match versions.binary_search_by_key(new_lsn, |v| v.lsn) {
     723           18 :                     Ok(version_idx) => Some(versions[version_idx].slot_idx),
     724      2754914 :                     Err(version_idx) => {
     725      2754914 :                         versions.insert(
     726      2754914 :                             version_idx,
     727      2754914 :                             Version {
     728      2754914 :                                 lsn: *new_lsn,
     729      2754914 :                                 slot_idx,
     730      2754914 :                             },
     731      2754914 :                         );
     732      2754914 :                         self.size_metrics
     733      2754914 :                             .current_bytes_materialized_page
     734      2754914 :                             .add_page_sz(1);
     735      2754914 :                         None
     736              :                     }
     737              :                 }
     738              :             }
     739              : 
     740     10962956 :             CacheKey::ImmutableFilePage { file_id, blkno } => {
     741     10962956 :                 let mut map = self.immutable_page_map.write().unwrap();
     742     10962956 :                 match map.entry((*file_id, *blkno)) {
     743          435 :                     Entry::Occupied(entry) => Some(*entry.get()),
     744     10962521 :                     Entry::Vacant(entry) => {
     745     10962521 :                         entry.insert(slot_idx);
     746     10962521 :                         self.size_metrics.current_bytes_immutable.add_page_sz(1);
     747     10962521 :                         None
     748              :                     }
     749              :                 }
     750              :             }
     751              :         }
     752     13717888 :     }
     753              : 
     754              :     //
     755              :     // Section 4: Misc internal helpers
     756              :     //
     757              : 
     758              :     /// Find a slot to evict.
     759              :     ///
     760              :     /// On return, the slot is empty and write-locked.
     761     13717888 :     fn find_victim(&self) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
     762     13717888 :         let iter_limit = self.slots.len() * 10;
     763     13717888 :         let mut iters = 0;
     764     61375494 :         loop {
     765     61375494 :             iters += 1;
     766     61375494 :             let slot_idx = self.next_evict_slot.fetch_add(1, Ordering::Relaxed) % self.slots.len();
     767     61375494 : 
     768     61375494 :             let slot = &self.slots[slot_idx];
     769     61375494 : 
     770     61375494 :             if slot.dec_usage_count() == 0 {
     771     13717894 :                 let mut inner = match slot.inner.try_write() {
     772     13717888 :                     Ok(inner) => inner,
     773            6 :                     Err(_err) => {
     774            6 :                         // If we have looped through the whole buffer pool 10 times
     775            6 :                         // and still haven't found a victim buffer, something's wrong.
      776            6 :                         // Maybe all the buffers were locked. That could happen in
     777            6 :                         // theory, if you have more threads holding buffers locked than
     778            6 :                         // there are buffers in the pool. In practice, with a reasonably
     779            6 :                         // large buffer pool it really shouldn't happen.
     780            6 :                         if iters > iter_limit {
     781            0 :                             anyhow::bail!("exceeded evict iter limit");
     782            6 :                         }
     783            6 :                         continue;
     784              :                     }
     785              :                 };
     786     13717888 :                 if let Some(old_key) = &inner.key {
     787     10815551 :                     // remove mapping for old buffer
     788     10815551 :                     self.remove_mapping(old_key);
     789     10815551 :                     inner.key = None;
     790     10815551 :                 }
     791     13717888 :                 return Ok((slot_idx, inner));
     792     47657600 :             }
     793              :         }
     794     13717888 :     }
     795              : 
     796              :     /// Initialize a new page cache
     797              :     ///
     798              :     /// This should be called only once at page server startup.
     799          576 :     fn new(num_pages: usize) -> Self {
     800          576 :         assert!(num_pages > 0, "page cache size must be > 0");
     801              : 
      802              :         // We use into_boxed_slice and Box::leak here to avoid leaking the
      803              :         // uninitialized spare capacity that a Vec might contain.
     804          576 :         let page_buffer = Box::leak(vec![0u8; num_pages * PAGE_SZ].into_boxed_slice());
     805          576 : 
     806          576 :         let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;
     807          576 :         size_metrics.max_bytes.set_page_sz(num_pages);
     808          576 :         size_metrics.current_bytes_immutable.set_page_sz(0);
     809          576 :         size_metrics.current_bytes_materialized_page.set_page_sz(0);
     810          576 : 
     811          576 :         let slots = page_buffer
     812          576 :             .chunks_exact_mut(PAGE_SZ)
     813      4687206 :             .map(|chunk| {
     814      4687206 :                 let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap();
     815      4687206 : 
     816      4687206 :                 Slot {
     817      4687206 :                     inner: tokio::sync::RwLock::new(SlotInner { key: None, buf }),
     818      4687206 :                     usage_count: AtomicU8::new(0),
     819      4687206 :                 }
     820      4687206 :             })
     821          576 :             .collect();
     822          576 : 
     823          576 :         Self {
     824          576 :             materialized_page_map: Default::default(),
     825          576 :             immutable_page_map: Default::default(),
     826          576 :             slots,
     827          576 :             next_evict_slot: AtomicUsize::new(0),
     828          576 :             size_metrics,
     829          576 :         }
     830          576 :     }
     831              : }
     832              : 
     833              : trait PageSzBytesMetric {
     834              :     fn set_page_sz(&self, count: usize);
     835              :     fn add_page_sz(&self, count: usize);
     836              :     fn sub_page_sz(&self, count: usize);
     837              : }
     838              : 
     839              : #[inline(always)]
     840     24534710 : fn count_times_page_sz(count: usize) -> u64 {
     841     24534710 :     u64::try_from(count).unwrap() * u64::try_from(PAGE_SZ).unwrap()
     842     24534710 : }
     843              : 
     844              : impl PageSzBytesMetric for metrics::UIntGauge {
     845         1728 :     fn set_page_sz(&self, count: usize) {
     846         1728 :         self.set(count_times_page_sz(count));
     847         1728 :     }
     848     13717426 :     fn add_page_sz(&self, count: usize) {
     849     13717426 :         self.add(count_times_page_sz(count));
     850     13717426 :     }
     851     10815556 :     fn sub_page_sz(&self, count: usize) {
     852     10815556 :         self.sub(count_times_page_sz(count));
     853     10815556 :     }
     854              : }
        

Generated by: LCOV version 2.1-beta