LCOV - code coverage report
Current view: top level - pageserver/src - page_cache.rs (source / functions)
Test: c789ec21f6053d4c25d2419c4a34ed298d5f69f5.info
Test Date: 2024-06-20 08:12:09
Coverage: Lines: 63.4 % (294 of 464) | Functions: 79.5 % (35 of 44)

            Line data    Source code
       1              : //!
       2              : //! Global page cache
       3              : //!
       4              : //! The page cache uses up most of the memory in the page server. It is shared
       5              : //! by all tenants, and it is used to store different kinds of pages. Sharing
       6              : //! the cache allows memory to be dynamically allocated where it's needed the
       7              : //! most.
       8              : //!
       9              : //! The page cache consists of fixed-size buffers, 8 kB each to match the
      10              : //! PostgreSQL buffer size, and a Slot struct for each buffer to contain
      11              : //! information about what's stored in the buffer.
      12              : //!
      13              : //! # Types Of Pages
      14              : //!
      15              : //! [`PageCache`] only supports immutable pages.
      16              : //! Hence there is no need to worry about coherency.
      17              : //!
      18              : //! Two types of pages are supported:
      19              : //!
      20              : //! * **Materialized pages**, filled & used by page reconstruction
      21              : //! * **Immutable File pages**, filled & used by [`crate::tenant::block_io`] and [`crate::tenant::ephemeral_file`].
      22              : //!
      23              : //! Note that [`crate::tenant::ephemeral_file::EphemeralFile`] is generally mutable, but it's append-only.
      24              : //! It uses the page cache only for the blocks that are already fully written and immutable.
      25              : //!
      26              : //! # Filling The Page Cache
      27              : //!
      28              : //! The page cache maps a cache key to a buffer slot.
      29              : //! The cache key uniquely identifies the piece of data that is being cached.
      30              : //!
      31              : //! The cache key for **materialized pages** is [`TenantShardId`], [`TimelineId`], [`Key`], and [`Lsn`].
      32              : //! Use [`PageCache::memorize_materialized_page`] and [`PageCache::lookup_materialized_page`] for fill & access.
      33              : //!
      34              : //! The cache key for **immutable file** pages is [`FileId`] and a block number.
      35              : //! Users of the page cache that wish to page-cache an arbitrary (immutable!) on-disk file do the following:
      36              : //! * Have a mechanism to deterministically associate the on-disk file with a [`FileId`].
      37              : //! * Get a [`FileId`] using [`next_file_id`].
      38              : //! * Use the mechanism to associate the on-disk file with the returned [`FileId`].
      39              : //! * Use [`PageCache::read_immutable_buf`] to get a [`ReadBufResult`].
      40              : //! * If the page was already cached, it'll be the [`ReadBufResult::Found`] variant that contains
      41              : //!   a read guard for the page. Just use it.
      42              : //! * If the page was not cached, it'll be the [`ReadBufResult::NotFound`] variant that contains
      43              : //!   a write guard for the page. Fill the page with the contents of the on-disk file.
      44              : //!   Then call [`PageWriteGuard::mark_valid`] to mark the page as valid.
      45              : //!   Then try again to [`PageCache::read_immutable_buf`].
      46              : //!   Unless there's high cache pressure, the page should now be cached.
      47              : //!   (TODO: allow downgrading the write guard to a read guard to ensure forward progress.)
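                      : //!
                      : //! As a rough sketch, the steps above look like this in code (`file_id`,
                      : //! `blkno` and `ctx` are assumed to be in scope, and `read_block_from_disk`
                      : //! is a hypothetical stand-in for the caller's own I/O):
                      : //!
                      : //! ```ignore
                      : //! let cache = page_cache::get();
                      : //! let read_guard = match cache.read_immutable_buf(file_id, blkno, ctx).await? {
                      : //!     // Already cached: just use the read guard.
                      : //!     ReadBufResult::Found(guard) => guard,
                      : //!     ReadBufResult::NotFound(mut write_guard) => {
                      : //!         // Not cached: fill the buffer from disk, then publish it.
                      : //!         read_block_from_disk(file_id, blkno, &mut *write_guard)?;
                      : //!         // mark_valid() downgrades the write guard into a read guard.
                      : //!         write_guard.mark_valid()
                      : //!     }
                      : //! };
                      : //! ```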
      48              : //!
      49              : //! # Locking
      50              : //!
      51              : //! There are two levels of locking involved: There's one lock for the "mapping"
      52              : //! from page identifier (tenant shard ID, timeline ID, key, LSN) to the buffer
      53              : //! slot, and a separate lock on each slot. To read or write the contents of a
      54              : //! slot, you must hold the lock on the slot in read or write mode,
      55              : //! respectively. To change the mapping of a slot, i.e. to evict a page or to
      56              : //! assign a buffer for a page, you must hold the mapping lock and the lock on
      57              : //! the slot at the same time.
      58              : //!
      59              : //! Whenever you need to hold both locks simultaneously, the slot lock must be
      60              : //! acquired first. This consistent ordering avoids deadlocks. To look up a page
      61              : //! in the cache, you would first look up the mapping, while holding the mapping
      62              : //! lock, and then lock the slot. You must release the mapping lock in between,
      63              : //! to obey the lock ordering and avoid deadlock.
      64              : //!
      65              : //! A slot can momentarily have invalid contents, even if it's already been
      66              : //! inserted to the mapping, but you must hold the write-lock on the slot until
      67              : //! the contents are valid. If you need to release the lock without initializing
      68              : //! the contents, you must remove the mapping first. We make that easy for the
      69              : //! callers with PageWriteGuard: the caller must explicitly call guard.mark_valid() after it has
      70              : //! initialized the contents. If the guard is dropped without calling mark_valid(), the
      71              : //! mapping is automatically removed and the slot is marked free.
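                      : //!
                      : //! As an illustration of that ordering, eviction (see `find_victim` below)
                      : //! proceeds roughly like this sketch:
                      : //!
                      : //! ```ignore
                      : //! // Slot lock first: write-lock the victim slot.
                      : //! let mut inner = slot.inner.try_write()?;
                      : //! // Then, still holding the slot lock, take the mapping lock (inside
                      : //! // remove_mapping) and unlink the old key before reusing the slot.
                      : //! if let Some(old_key) = &inner.key {
                      : //!     self.remove_mapping(old_key);
                      : //!     inner.key = None;
                      : //! }
                      : //! ```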
      72              : //!
      73              : 
      74              : use std::{
      75              :     collections::{hash_map::Entry, HashMap},
      76              :     sync::{
      77              :         atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
      78              :         Arc, Weak,
      79              :     },
      80              :     time::Duration,
      81              : };
      82              : 
      83              : use anyhow::Context;
      84              : use once_cell::sync::OnceCell;
      85              : use pageserver_api::shard::TenantShardId;
      86              : use utils::{id::TimelineId, lsn::Lsn};
      87              : 
      88              : use crate::{
      89              :     context::RequestContext,
      90              :     metrics::{page_cache_eviction_metrics, PageCacheSizeMetrics},
      91              :     repository::Key,
      92              : };
      93              : 
      94              : static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
      95              : const TEST_PAGE_CACHE_SIZE: usize = 50;
      96              : 
      97              : ///
      98              : /// Initialize the page cache. This must be called once at page server startup.
      99              : ///
     100            0 : pub fn init(size: usize) {
     101            0 :     if PAGE_CACHE.set(PageCache::new(size)).is_err() {
     102            0 :         panic!("page cache already initialized");
     103            0 :     }
     104            0 : }
     105              : 
     106              : ///
     107              : /// Get a handle to the page cache.
     108              : ///
     109      7671985 : pub fn get() -> &'static PageCache {
     110      7671985 :     //
     111      7671985 :     // In unit tests, page server startup doesn't happen and no one calls
     112      7671985 :     // page_cache::init(). Initialize it here with a tiny cache, so that the
     113      7671985 :     // page cache is usable in unit tests.
     114      7671985 :     //
     115      7671985 :     if cfg!(test) {
     116      7671985 :         PAGE_CACHE.get_or_init(|| PageCache::new(TEST_PAGE_CACHE_SIZE))
     117              :     } else {
     118            0 :         PAGE_CACHE.get().expect("page cache not initialized")
     119              :     }
     120      7671985 : }
     121              : 
     122              : pub const PAGE_SZ: usize = postgres_ffi::BLCKSZ as usize;
     123              : const MAX_USAGE_COUNT: u8 = 5;
     124              : 
     125              : /// See module-level comment.
     126              : #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
     127              : pub struct FileId(u64);
     128              : 
     129              : static NEXT_ID: AtomicU64 = AtomicU64::new(1);
     130              : 
     131              : /// See module-level comment.
     132         2309 : pub fn next_file_id() -> FileId {
     133         2309 :     FileId(NEXT_ID.fetch_add(1, Ordering::Relaxed))
     134         2309 : }
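                      : 
                      : // Hypothetical usage sketch: the association mechanism is the caller's own,
                      : // not part of this module. For example, a user might register each opened
                      : // file once:
                      : //
                      : //     let file_id = page_cache::next_file_id();
                      : //     my_open_files.insert(file_id, path.clone());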
     135              : 
     136              : ///
     137              : /// CacheKey uniquely identifies a "thing" to cache in the page cache.
     138              : ///
     139              : #[derive(Debug, PartialEq, Eq, Clone)]
     140              : #[allow(clippy::enum_variant_names)]
     141              : enum CacheKey {
     142              :     MaterializedPage {
     143              :         hash_key: MaterializedPageHashKey,
     144              :         lsn: Lsn,
     145              :     },
     146              :     ImmutableFilePage {
     147              :         file_id: FileId,
     148              :         blkno: u32,
     149              :     },
     150              : }
     151              : 
     152              : #[derive(Debug, PartialEq, Eq, Hash, Clone)]
     153              : struct MaterializedPageHashKey {
     154              :     /// Why is this TenantShardId rather than TenantId?
     155              :     ///
     156              :     /// Usually, the materialized value of a page@lsn is identical on any shard in the same tenant.  However, this
     157              :     /// is not the case for certain internally-generated pages (e.g. relation sizes).  In future, we may make this
     158              :     /// key smaller by omitting the shard, if we ensure that reads to such pages always skip the cache, or are
     159              :     /// special-cased in some other way.
     160              :     tenant_shard_id: TenantShardId,
     161              :     timeline_id: TimelineId,
     162              :     key: Key,
     163              : }
     164              : 
     165              : #[derive(Clone)]
     166              : struct Version {
     167              :     lsn: Lsn,
     168              :     slot_idx: usize,
     169              : }
     170              : 
     171              : struct Slot {
     172              :     inner: tokio::sync::RwLock<SlotInner>,
     173              :     usage_count: AtomicU8,
     174              : }
     175              : 
     176              : struct SlotInner {
     177              :     key: Option<CacheKey>,
     178              :     // for `coalesce_readers_permit`
     179              :     permit: std::sync::Mutex<Weak<PinnedSlotsPermit>>,
     180              :     buf: &'static mut [u8; PAGE_SZ],
     181              : }
     182              : 
     183              : impl Slot {
     184              :     /// Increment usage count on the buffer, with ceiling at MAX_USAGE_COUNT.
     185      6927487 :     fn inc_usage_count(&self) {
     186      6927487 :         let _ = self
     187      6927487 :             .usage_count
     188      6927487 :             .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
     189      6927487 :                 if val == MAX_USAGE_COUNT {
     190      6606892 :                     None
     191              :                 } else {
     192       320595 :                     Some(val + 1)
     193              :                 }
     194      6927487 :             });
     195      6927487 :     }
     196              : 
     197              :     /// Decrement usage count on the buffer, unless it's already zero.  Returns
     198              :     /// the old usage count.
     199       650804 :     fn dec_usage_count(&self) -> u8 {
     200       650804 :         let count_res =
     201       650804 :             self.usage_count
     202       650804 :                 .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
     203       650804 :                     if val == 0 {
     204       166630 :                         None
     205              :                     } else {
     206       484174 :                         Some(val - 1)
     207              :                     }
     208       650804 :                 });
     209       650804 : 
     210       650804 :         match count_res {
     211       484174 :             Ok(usage_count) => usage_count,
     212       166630 :             Err(usage_count) => usage_count,
     213              :         }
     214       650804 :     }
     215              : 
     216              :     /// Sets the usage count to a specific value.
     217       166628 :     fn set_usage_count(&self, count: u8) {
     218       166628 :         self.usage_count.store(count, Ordering::Relaxed);
     219       166628 :     }
     220              : }
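                      : 
                      : // Taken together, these helpers implement the usage counter of the Clock
                      : // replacement algorithm: every cache hit calls inc_usage_count() (capped at
                      : // MAX_USAGE_COUNT = 5), every pass of the clock hand in find_victim() calls
                      : // dec_usage_count(), and a slot whose count was already zero is eligible for
                      : // eviction. A page can thus survive at most five hand passes without being
                      : // accessed again.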
     221              : 
     222              : impl SlotInner {
     223              :     /// If there is already a reader, drop our permit and share its permit, just like we share read access.
     224      6927487 :     fn coalesce_readers_permit(&self, permit: PinnedSlotsPermit) -> Arc<PinnedSlotsPermit> {
     225      6927487 :         let mut guard = self.permit.lock().unwrap();
     226      6927487 :         if let Some(existing_permit) = guard.upgrade() {
     227            0 :             drop(guard);
     228            0 :             drop(permit);
     229            0 :             existing_permit
     230              :         } else {
     231      6927487 :             let permit = Arc::new(permit);
     232      6927487 :             *guard = Arc::downgrade(&permit);
     233      6927487 :             permit
     234              :         }
     235      6927487 :     }
     236              : }
     237              : 
     238              : pub struct PageCache {
     239              :     /// This contains the mapping from the cache key to buffer slot that currently
     240              :     /// contains the page, if any.
     241              :     ///
     242              :     /// TODO: This is protected by a single lock. If that becomes a bottleneck,
     243              :     /// this HashMap can be replaced with a more concurrent version, there are
     244              :     /// plenty of such crates around.
     245              :     ///
     246              :     /// If you add support for caching different kinds of objects, each object kind
     247              :     /// can have a separate mapping map, next to this field.
     248              :     materialized_page_map: std::sync::RwLock<HashMap<MaterializedPageHashKey, Vec<Version>>>,
     249              : 
     250              :     immutable_page_map: std::sync::RwLock<HashMap<(FileId, u32), usize>>,
     251              : 
     252              :     /// The actual buffers with their metadata.
     253              :     slots: Box<[Slot]>,
     254              : 
     255              :     pinned_slots: Arc<tokio::sync::Semaphore>,
     256              : 
     257              :     /// Index of the next candidate to evict, for the Clock replacement algorithm.
     258              :     /// This is interpreted modulo the page cache size.
     259              :     next_evict_slot: AtomicUsize,
     260              : 
     261              :     size_metrics: &'static PageCacheSizeMetrics,
     262              : }
     263              : 
     264              : struct PinnedSlotsPermit {
     265              :     _permit: tokio::sync::OwnedSemaphorePermit,
     266              : }
     267              : 
     268              : ///
     269              : /// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked
     270              : /// until the guard is dropped.
     271              : ///
     272              : pub struct PageReadGuard<'i> {
     273              :     _permit: Arc<PinnedSlotsPermit>,
     274              :     slot_guard: tokio::sync::RwLockReadGuard<'i, SlotInner>,
     275              : }
     276              : 
     277              : impl std::ops::Deref for PageReadGuard<'_> {
     278              :     type Target = [u8; PAGE_SZ];
     279              : 
     280     13497675 :     fn deref(&self) -> &Self::Target {
     281     13497675 :         self.slot_guard.buf
     282     13497675 :     }
     283              : }
     284              : 
     285              : impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
     286            0 :     fn as_ref(&self) -> &[u8; PAGE_SZ] {
     287            0 :         self.slot_guard.buf
     288            0 :     }
     289              : }
     290              : 
     291              : ///
     292              : /// PageWriteGuard is a lease on a buffer for modifying it. The page is kept locked
     293              : /// until the guard is dropped.
     294              : ///
     295              : /// Counterintuitively, this is used even for a read, if the requested page is not
     296              : /// currently found in the page cache. In that case, the caller of lock_for_read()
     297              : /// is expected to fill in the page contents and call mark_valid().
     298              : pub struct PageWriteGuard<'i> {
     299              :     state: PageWriteGuardState<'i>,
     300              : }
     301              : 
     302              : enum PageWriteGuardState<'i> {
     303              :     Invalid {
     304              :         inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
     305              :         _permit: PinnedSlotsPermit,
     306              :     },
     307              :     Downgraded,
     308              : }
     309              : 
     310              : impl std::ops::DerefMut for PageWriteGuard<'_> {
     311       166628 :     fn deref_mut(&mut self) -> &mut Self::Target {
     312       166628 :         match &mut self.state {
     313       166628 :             PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
     314            0 :             PageWriteGuardState::Downgraded => unreachable!(),
     315              :         }
     316       166628 :     }
     317              : }
     318              : 
     319              : impl std::ops::Deref for PageWriteGuard<'_> {
     320              :     type Target = [u8; PAGE_SZ];
     321              : 
     322       401069 :     fn deref(&self) -> &Self::Target {
     323       401069 :         match &self.state {
     324       401069 :             PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
     325            0 :             PageWriteGuardState::Downgraded => unreachable!(),
     326              :         }
     327       401069 :     }
     328              : }
     329              : 
     330              : impl<'a> PageWriteGuard<'a> {
     331              :     /// Mark that the buffer contents are now valid.
     332              :     #[must_use]
     333       166628 :     pub fn mark_valid(mut self) -> PageReadGuard<'a> {
     334       166628 :         let prev = std::mem::replace(&mut self.state, PageWriteGuardState::Downgraded);
     335       166628 :         match prev {
     336       166628 :             PageWriteGuardState::Invalid { inner, _permit } => {
     337       166628 :                 assert!(inner.key.is_some());
     338       166628 :                 PageReadGuard {
     339       166628 :                     _permit: Arc::new(_permit),
     340       166628 :                     slot_guard: inner.downgrade(),
     341       166628 :                 }
     342              :             }
     343            0 :             PageWriteGuardState::Downgraded => unreachable!(),
     344              :         }
     345       166628 :     }
     346              : }
     347              : 
     348              : impl Drop for PageWriteGuard<'_> {
     349              :     ///
     350              :     /// If the buffer was allocated for a page that was not already in the
     351              :     /// cache, but the lock_for_read/write() caller dropped the buffer without
     352              :     /// initializing it, remove the mapping from the page cache.
     353              :     ///
     354       166628 :     fn drop(&mut self) {
     355       166628 :         match &mut self.state {
     356            0 :             PageWriteGuardState::Invalid { inner, _permit } => {
     357            0 :                 assert!(inner.key.is_some());
     358            0 :                 let self_key = inner.key.as_ref().unwrap();
     359            0 :                 PAGE_CACHE.get().unwrap().remove_mapping(self_key);
     360            0 :                 inner.key = None;
     361              :             }
     362       166628 :             PageWriteGuardState::Downgraded => {}
     363              :         }
     364       166628 :     }
     365              : }
     366              : 
     367              : /// lock_for_read() return value
     368              : pub enum ReadBufResult<'a> {
     369              :     Found(PageReadGuard<'a>),
     370              :     NotFound(PageWriteGuard<'a>),
     371              : }
     372              : 
     373              : impl PageCache {
     374              :     //
     375              :     // Section 1.1: Public interface functions for looking up and memorizing materialized page
     376              :     // versions in the page cache
     377              :     //
     378              : 
     379              :     /// Look up a materialized page version.
     380              :     ///
     381              :     /// The 'lsn' is an upper bound; this will return the latest version of
     382              :     /// the given block, but not newer than 'lsn'. Returns the actual LSN of the
     383              :     /// returned page.
     384       624140 :     pub async fn lookup_materialized_page(
     385       624140 :         &self,
     386       624140 :         tenant_shard_id: TenantShardId,
     387       624140 :         timeline_id: TimelineId,
     388       624140 :         key: &Key,
     389       624140 :         lsn: Lsn,
     390       624140 :         ctx: &RequestContext,
     391       624140 :     ) -> Option<(Lsn, PageReadGuard)> {
     392       624140 :         let Ok(permit) = self.try_get_pinned_slot_permit().await else {
     393            0 :             return None;
     394              :         };
     395              : 
     396       624140 :         crate::metrics::PAGE_CACHE
     397       624140 :             .for_ctx(ctx)
     398       624140 :             .read_accesses_materialized_page
     399       624140 :             .inc();
     400       624140 : 
     401       624140 :         let mut cache_key = CacheKey::MaterializedPage {
     402       624140 :             hash_key: MaterializedPageHashKey {
     403       624140 :                 tenant_shard_id,
     404       624140 :                 timeline_id,
     405       624140 :                 key: *key,
     406       624140 :             },
     407       624140 :             lsn,
     408       624140 :         };
     409              : 
     410       624140 :         if let Some(guard) = self
     411       624140 :             .try_lock_for_read(&mut cache_key, &mut Some(permit))
     412            0 :             .await
     413              :         {
     414              :             if let CacheKey::MaterializedPage {
     415              :                 hash_key: _,
     416            0 :                 lsn: available_lsn,
     417            0 :             } = cache_key
     418              :             {
     419            0 :                 if available_lsn == lsn {
     420            0 :                     crate::metrics::PAGE_CACHE
     421            0 :                         .for_ctx(ctx)
     422            0 :                         .read_hits_materialized_page_exact
     423            0 :                         .inc();
     424            0 :                 } else {
     425            0 :                     crate::metrics::PAGE_CACHE
     426            0 :                         .for_ctx(ctx)
     427            0 :                         .read_hits_materialized_page_older_lsn
     428            0 :                         .inc();
     429            0 :                 }
     430            0 :                 Some((available_lsn, guard))
     431              :             } else {
     432            0 :                 panic!("unexpected key type in slot");
     433              :             }
     434              :         } else {
     435       624140 :             None
     436              :         }
     437       624140 :     }
     438              : 
     439              :     ///
     440              :     /// Store an image of the given page in the cache.
     441              :     ///
     442            0 :     pub async fn memorize_materialized_page(
     443            0 :         &self,
     444            0 :         tenant_shard_id: TenantShardId,
     445            0 :         timeline_id: TimelineId,
     446            0 :         key: Key,
     447            0 :         lsn: Lsn,
     448            0 :         img: &[u8],
     449            0 :     ) -> anyhow::Result<()> {
     450            0 :         let cache_key = CacheKey::MaterializedPage {
     451            0 :             hash_key: MaterializedPageHashKey {
     452            0 :                 tenant_shard_id,
     453            0 :                 timeline_id,
     454            0 :                 key,
     455            0 :             },
     456            0 :             lsn,
     457            0 :         };
     458              : 
     459            0 :         let mut permit = Some(self.try_get_pinned_slot_permit().await?);
     460              :         loop {
     461              :             // First check if the key already exists in the cache.
     462            0 :             if let Some(slot_idx) = self.search_mapping_exact(&cache_key) {
     463              :                 // The page was found in the mapping. Lock the slot, and re-check
     464              :                 // that it's still what we expected (because we don't released the mapping
     465              :                 // that it's still what we expected (because we released the mapping
     466            0 :                 let slot = &self.slots[slot_idx];
     467            0 :                 let inner = slot.inner.write().await;
     468            0 :                 if inner.key.as_ref() == Some(&cache_key) {
     469            0 :                     slot.inc_usage_count();
     470            0 :                     debug_assert!(
     471              :                         {
     472            0 :                             let guard = inner.permit.lock().unwrap();
     473            0 :                             guard.upgrade().is_none()
     474              :                         },
     475            0 :                         "we hold a write lock, so, no one else should have a permit"
     476              :                     );
     477            0 :                     debug_assert_eq!(inner.buf.len(), img.len());
     478              :                     // We already had it in cache. Another thread must've put it there
     479              :                     // concurrently. Check that it had the same contents that we
     480              :                     // replayed.
     481            0 :                     assert!(inner.buf == img);
     482            0 :                     return Ok(());
     483            0 :                 }
     484            0 :             }
     485            0 :             debug_assert!(permit.is_some());
     486              : 
     487              :             // Not found. Find a victim buffer
     488            0 :             let (slot_idx, mut inner) = self
     489            0 :                 .find_victim(permit.as_ref().unwrap())
     490            0 :                 .await
     491            0 :                 .context("Failed to find evict victim")?;
     492              : 
     493              :             // Insert mapping for this. At this point, we may find that another
     494              :             // thread did the same thing concurrently. In that case, we evicted
     495              :             // our victim buffer unnecessarily. Put it into the free list and
     496              :             // continue with the slot that the other thread chose.
     497            0 :             if let Some(_existing_slot_idx) = self.try_insert_mapping(&cache_key, slot_idx) {
     498              :                 // TODO: put to free list
     499              : 
     500              :                 // We now just loop back to start from beginning. This is not
     501              :                 // optimal, we'll perform the lookup in the mapping again, which
     502              :                 // is not really necessary because we already got
     503              :                 // 'existing_slot_idx'.  But this shouldn't happen often enough
     504              :                 // to matter much.
     505            0 :                 continue;
     506            0 :             }
     507            0 : 
     508            0 :             // Make the slot ready
     509            0 :             let slot = &self.slots[slot_idx];
     510            0 :             inner.key = Some(cache_key.clone());
     511            0 :             slot.set_usage_count(1);
     512            0 :             // Create a write guard for the slot so we go through the expected motions.
     513            0 :             debug_assert!(
     514              :                 {
     515            0 :                     let guard = inner.permit.lock().unwrap();
     516            0 :                     guard.upgrade().is_none()
     517              :                 },
     518            0 :                 "we hold a write lock, so, no one else should have a permit"
     519              :             );
     520            0 :             let mut write_guard = PageWriteGuard {
     521            0 :                 state: PageWriteGuardState::Invalid {
     522            0 :                     _permit: permit.take().unwrap(),
     523            0 :                     inner,
     524            0 :                 },
     525            0 :             };
     526            0 :             write_guard.copy_from_slice(img);
     527            0 :             let _ = write_guard.mark_valid();
     528            0 :             return Ok(());
     529              :         }
     530            0 :     }
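                      : 
                      :     // Sketch of how the two halves fit together (hypothetical caller code):
                      :     // after reconstructing page `key` at `lsn`, memorize the image; a later
                      :     // lookup at `lsn` or any newer LSN can then be served from the cache:
                      :     //
                      :     //     cache.memorize_materialized_page(tenant_shard_id, timeline_id, key, lsn, &img).await?;
                      :     //     if let Some((cached_lsn, guard)) = cache
                      :     //         .lookup_materialized_page(tenant_shard_id, timeline_id, &key, lsn, ctx)
                      :     //         .await
                      :     //     {
                      :     //         assert!(cached_lsn <= lsn); // upper-bound semantics
                      :     //     }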
     531              : 
     532              :     // Section 1.2: Public interface functions for working with immutable file pages.
     533              : 
     534      7094115 :     pub async fn read_immutable_buf(
     535      7094115 :         &self,
     536      7094115 :         file_id: FileId,
     537      7094115 :         blkno: u32,
     538      7094115 :         ctx: &RequestContext,
     539      7094115 :     ) -> anyhow::Result<ReadBufResult> {
     540      7094115 :         let mut cache_key = CacheKey::ImmutableFilePage { file_id, blkno };
     541      7094115 : 
     542      7094115 :         self.lock_for_read(&mut cache_key, ctx).await
     543      7094115 :     }
     544              : 
     545              :     //
     546              :     // Section 2: Internal interface functions for lookup/update.
     547              :     //
     548              :     // To add support for a new kind of "thing" to cache, you will need
     549              :     // to add public interface routines above, and code to deal with the
     550              :     // "mappings" after this section. But the routines in this section should
     551              :     // not require changes.
     552              : 
     553      7718255 :     async fn try_get_pinned_slot_permit(&self) -> anyhow::Result<PinnedSlotsPermit> {
     554      7718255 :         match tokio::time::timeout(
     555      7718255 :             // Choose small timeout, neon_smgr does its own retries.
     556      7718255 :             // https://neondb.slack.com/archives/C04DGM6SMTM/p1694786876476869
     557      7718255 :             Duration::from_secs(10),
     558      7718255 :             Arc::clone(&self.pinned_slots).acquire_owned(),
     559      7718255 :         )
     560        63263 :         .await
     561              :         {
     562      7718255 :             Ok(res) => Ok(PinnedSlotsPermit {
     563      7718255 :                 _permit: res.expect("this semaphore is never closed"),
     564      7718255 :             }),
     565            0 :             Err(_timeout) => {
     566            0 :                 crate::metrics::page_cache_errors_inc(
     567            0 :                     crate::metrics::PageCacheErrorKind::AcquirePinnedSlotTimeout,
     568            0 :                 );
     569            0 :                 anyhow::bail!("timeout: there were page guards alive for all page cache slots")
     570              :             }
     571              :         }
     572      7718255 :     }
     573              : 
     574              :     /// Look up a page in the cache.
     575              :     ///
     576              :     /// If the search key is not exact, *cache_key is updated with the exact
     577              :     /// key of the returned page. (For materialized pages, that means
     578              :     /// that the LSN in 'cache_key' is updated with the LSN of the returned page
     579              :     /// version.)
     580              :     ///
     581              :     /// If no page is found, returns None and *cache_key is left unmodified.
     582              :     ///
     583      7718255 :     async fn try_lock_for_read(
     584      7718255 :         &self,
     585      7718255 :         cache_key: &mut CacheKey,
     586      7718255 :         permit: &mut Option<PinnedSlotsPermit>,
     587      7718255 :     ) -> Option<PageReadGuard> {
     588      7718255 :         let cache_key_orig = cache_key.clone();
     589      7718255 :         if let Some(slot_idx) = self.search_mapping(cache_key) {
     590              :             // The page was found in the mapping. Lock the slot, and re-check
     591              :             // that it's still what we expected (because we released the mapping
     592              :             // lock already, another thread could have evicted the page)
     593      6927487 :             let slot = &self.slots[slot_idx];
     594      6927487 :             let inner = slot.inner.read().await;
     595      6927487 :             if inner.key.as_ref() == Some(cache_key) {
     596      6927487 :                 slot.inc_usage_count();
     597      6927487 :                 return Some(PageReadGuard {
     598      6927487 :                     _permit: inner.coalesce_readers_permit(permit.take().unwrap()),
     599      6927487 :                     slot_guard: inner,
     600      6927487 :                 });
     601            0 :             } else {
     602            0 :                 // search_mapping might have modified the search key; restore it.
     603            0 :                 *cache_key = cache_key_orig;
     604            0 :             }
     605       790768 :         }
     606       790768 :         None
     607      7718255 :     }
     608              : 
     609              :     /// Return a locked buffer for given block.
     610              :     ///
     611              :     /// Like try_lock_for_read(), if the search key is not exact and the
     612              :     /// page is already found in the cache, *cache_key is updated.
     613              :     ///
     614              :     /// If the page is not found in the cache, this allocates a new buffer for
     615              :     /// it. The caller may then initialize the buffer with the contents, and
     616              :     /// call mark_valid().
     617              :     ///
     618              :     /// Example usage:
     619              :     ///
     620              :     /// ```ignore
     621              :     /// let cache = page_cache::get();
     622              :     ///
     623              :     /// match cache.lock_for_read(&mut cache_key, ctx).await? {
     624              :     ///     ReadBufResult::Found(read_guard) => {
     625              :     ///         // The page was found in cache. Use it
     626              :     ///     },
     627              :     ///     ReadBufResult::NotFound(mut write_guard) => {
     628              :     ///         // The page was not found in cache. Read it from disk into the
     629              :     ///         // buffer.
     630              :     ///         // read_my_page_from_disk(&mut write_guard);
     631              :     ///
     632              :     ///         // The buffer contents are now valid. Tell the page cache.
     633              :     ///         let _read_guard = write_guard.mark_valid();
     634              :     ///     },
     635              :     /// }
     636              :     /// ```
     637              :     ///
     638      7094115 :     async fn lock_for_read(
     639      7094115 :         &self,
     640      7094115 :         cache_key: &mut CacheKey,
     641      7094115 :         ctx: &RequestContext,
     642      7094115 :     ) -> anyhow::Result<ReadBufResult> {
     643      7094115 :         let mut permit = Some(self.try_get_pinned_slot_permit().await?);
     644              : 
     645      7094115 :         let (read_access, hit) = match cache_key {
     646              :             CacheKey::MaterializedPage { .. } => {
     647            0 :                 unreachable!("Materialized pages use lookup_materialized_page")
     648              :             }
     649      7094115 :             CacheKey::ImmutableFilePage { .. } => (
     650      7094115 :                 &crate::metrics::PAGE_CACHE
     651      7094115 :                     .for_ctx(ctx)
     652      7094115 :                     .read_accesses_immutable,
     653      7094115 :                 &crate::metrics::PAGE_CACHE.for_ctx(ctx).read_hits_immutable,
     654      7094115 :             ),
     655      7094115 :         };
     656      7094115 :         read_access.inc();
     657      7094115 : 
     658      7094115 :         let mut is_first_iteration = true;
     659              :         loop {
     660              :             // First check if the key already exists in the cache.
     661      7094115 :             if let Some(read_guard) = self.try_lock_for_read(cache_key, &mut permit).await {
     662      6927487 :                 debug_assert!(permit.is_none());
     663      6927487 :                 if is_first_iteration {
     664      6927487 :                     hit.inc();
     665      6927487 :                 }
     666      6927487 :                 return Ok(ReadBufResult::Found(read_guard));
     667       166628 :             }
     668       166628 :             debug_assert!(permit.is_some());
     669       166628 :             is_first_iteration = false;
     670              : 
     671              :             // Not found. Find a victim buffer
     672       166628 :             let (slot_idx, mut inner) = self
     673       166628 :                 .find_victim(permit.as_ref().unwrap())
     674            0 :                 .await
     675       166628 :                 .context("Failed to find evict victim")?;
     676              : 
     677              :             // Insert mapping for this. At this point, we may find that another
     678              :             // thread did the same thing concurrently. In that case, we evicted
     679              :             // our victim buffer unnecessarily. Put it into the free list and
     680              :             // continue with the slot that the other thread chose.
     681       166628 :             if let Some(_existing_slot_idx) = self.try_insert_mapping(cache_key, slot_idx) {
     682              :                 // TODO: put to free list
     683              : 
     684              :                 // We now just loop back to start from beginning. This is not
     685              :                 // optimal, we'll perform the lookup in the mapping again, which
     686              :                 // is not really necessary because we already got
     687              :                 // 'existing_slot_idx'.  But this shouldn't happen often enough
     688              :                 // to matter much.
     689            0 :                 continue;
     690       166628 :             }
     691       166628 : 
     692       166628 :             // Make the slot ready
     693       166628 :             let slot = &self.slots[slot_idx];
     694       166628 :             inner.key = Some(cache_key.clone());
     695       166628 :             slot.set_usage_count(1);
     696       166628 : 
     697       166628 :             debug_assert!(
     698              :                 {
     699       166628 :                     let guard = inner.permit.lock().unwrap();
     700       166628 :                     guard.upgrade().is_none()
     701              :                 },
     702            0 :                 "we hold a write lock, so, no one else should have a permit"
     703              :             );
     704              : 
     705       166628 :             return Ok(ReadBufResult::NotFound(PageWriteGuard {
     706       166628 :                 state: PageWriteGuardState::Invalid {
     707       166628 :                     _permit: permit.take().unwrap(),
     708       166628 :                     inner,
     709       166628 :                 },
     710       166628 :             }));
     711              :         }
     712      7094115 :     }
     713              : 
     714              :     //
     715              :     // Section 3: Mapping functions
     716              :     //
     717              : 
     718              :     /// Search for a page in the cache using the given search key.
     719              :     ///
     720              :     /// Returns the slot index, if any. If the search key is not exact,
     721              :     /// *cache_key is updated with the actual key of the found page.
     722              :     ///
     723              :     /// NOTE: We don't hold any lock on the mapping on return, so the slot might
     724              :     /// get recycled for an unrelated page immediately after this function
     725              :     /// returns.  The caller is responsible for re-checking that the slot still
     726              :     /// contains the page with the same key before using it.
     727              :     ///
     728      7718255 :     fn search_mapping(&self, cache_key: &mut CacheKey) -> Option<usize> {
     729      7718255 :         match cache_key {
     730       624140 :             CacheKey::MaterializedPage { hash_key, lsn } => {
     731       624140 :                 let map = self.materialized_page_map.read().unwrap();
     732       624140 :                 let versions = map.get(hash_key)?;
     733              : 
     734            0 :                 let version_idx = match versions.binary_search_by_key(lsn, |v| v.lsn) {
     735            0 :                     Ok(version_idx) => version_idx,
     736            0 :                     Err(0) => return None,
     737            0 :                     Err(version_idx) => version_idx - 1,
     738              :                 };
     739            0 :                 let version = &versions[version_idx];
     740            0 :                 *lsn = version.lsn;
     741            0 :                 Some(version.slot_idx)
     742              :             }
     743      7094115 :             CacheKey::ImmutableFilePage { file_id, blkno } => {
     744      7094115 :                 let map = self.immutable_page_map.read().unwrap();
     745      7094115 :                 Some(*map.get(&(*file_id, *blkno))?)
     746              :             }
     747              :         }
     748      7718255 :     }
     749              : 
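                      :     // Worked example for the version lookup above: with cached versions at
                      :     // LSNs [10, 20, 30] and a search for lsn = 25, binary_search_by_key
                      :     // returns Err(2), so version_idx = 1 and the version at LSN 20 is chosen:
                      :     // the latest version not newer than the requested LSN.
                      : 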
     750              :     /// Search for a page in the cache using the given search key.
     751              :     ///
     752              :     /// Like 'search_mapping', but performs an "exact" search. Used for
     753              :     /// allocating a new buffer.
     754            0 :     fn search_mapping_exact(&self, key: &CacheKey) -> Option<usize> {
     755            0 :         match key {
     756            0 :             CacheKey::MaterializedPage { hash_key, lsn } => {
     757            0 :                 let map = self.materialized_page_map.read().unwrap();
     758            0 :                 let versions = map.get(hash_key)?;
     759              : 
     760            0 :                 if let Ok(version_idx) = versions.binary_search_by_key(lsn, |v| v.lsn) {
     761            0 :                     Some(versions[version_idx].slot_idx)
     762              :                 } else {
     763            0 :                     None
     764              :                 }
     765              :             }
     766            0 :             CacheKey::ImmutableFilePage { file_id, blkno } => {
     767            0 :                 let map = self.immutable_page_map.read().unwrap();
     768            0 :                 Some(*map.get(&(*file_id, *blkno))?)
     769              :             }
     770              :         }
     771            0 :     }
     772              : 
     773              :     ///
     774              :     /// Remove mapping for given key.
     775              :     ///
     776       164796 :     fn remove_mapping(&self, old_key: &CacheKey) {
     777       164796 :         match old_key {
     778              :             CacheKey::MaterializedPage {
     779            0 :                 hash_key: old_hash_key,
     780            0 :                 lsn: old_lsn,
     781            0 :             } => {
     782            0 :                 let mut map = self.materialized_page_map.write().unwrap();
     783            0 :                 if let Entry::Occupied(mut old_entry) = map.entry(old_hash_key.clone()) {
     784            0 :                     let versions = old_entry.get_mut();
     785              : 
     786            0 :                     if let Ok(version_idx) = versions.binary_search_by_key(old_lsn, |v| v.lsn) {
     787            0 :                         versions.remove(version_idx);
     788            0 :                         self.size_metrics
     789            0 :                             .current_bytes_materialized_page
     790            0 :                             .sub_page_sz(1);
     791            0 :                         if versions.is_empty() {
     792            0 :                             old_entry.remove_entry();
     793            0 :                         }
     794            0 :                     }
     795              :                 } else {
     796            0 :                     panic!("could not find old key in mapping")
     797              :                 }
     798              :             }
     799       164796 :             CacheKey::ImmutableFilePage { file_id, blkno } => {
     800       164796 :                 let mut map = self.immutable_page_map.write().unwrap();
     801       164796 :                 map.remove(&(*file_id, *blkno))
     802       164796 :                     .expect("could not find old key in mapping");
     803       164796 :                 self.size_metrics.current_bytes_immutable.sub_page_sz(1);
     804       164796 :             }
     805              :         }
     806       164796 :     }
     807              : 
     808              :     ///
     809              :     /// Insert mapping for given key.
     810              :     ///
     811              :     /// If a mapping already existed for the given key, returns the slot index
     812              :     /// of the existing mapping and leaves it untouched.
     813       166628 :     fn try_insert_mapping(&self, new_key: &CacheKey, slot_idx: usize) -> Option<usize> {
     814       166628 :         match new_key {
     815              :             CacheKey::MaterializedPage {
     816            0 :                 hash_key: new_key,
     817            0 :                 lsn: new_lsn,
     818            0 :             } => {
     819            0 :                 let mut map = self.materialized_page_map.write().unwrap();
     820            0 :                 let versions = map.entry(new_key.clone()).or_default();
     821            0 :                 match versions.binary_search_by_key(new_lsn, |v| v.lsn) {
     822            0 :                     Ok(version_idx) => Some(versions[version_idx].slot_idx),
     823            0 :                     Err(version_idx) => {
     824            0 :                         versions.insert(
     825            0 :                             version_idx,
     826            0 :                             Version {
     827            0 :                                 lsn: *new_lsn,
     828            0 :                                 slot_idx,
     829            0 :                             },
     830            0 :                         );
     831            0 :                         self.size_metrics
     832            0 :                             .current_bytes_materialized_page
     833            0 :                             .add_page_sz(1);
     834            0 :                         None
     835              :                     }
     836              :                 }
     837              :             }
     838              : 
     839       166628 :             CacheKey::ImmutableFilePage { file_id, blkno } => {
     840       166628 :                 let mut map = self.immutable_page_map.write().unwrap();
     841       166628 :                 match map.entry((*file_id, *blkno)) {
     842            0 :                     Entry::Occupied(entry) => Some(*entry.get()),
     843       166628 :                     Entry::Vacant(entry) => {
     844       166628 :                         entry.insert(slot_idx);
     845       166628 :                         self.size_metrics.current_bytes_immutable.add_page_sz(1);
     846       166628 :                         None
     847              :                     }
     848              :                 }
     849              :             }
     850              :         }
     851       166628 :     }
     852              : 
     853              :     //
     854              :     // Section 4: Misc internal helpers
     855              :     //
     856              : 
     857              :     /// Find a slot to evict.
     858              :     ///
     859              :     /// On return, the slot is empty and write-locked.
     860       166628 :     async fn find_victim(
     861       166628 :         &self,
     862       166628 :         _permit_witness: &PinnedSlotsPermit,
     863       166628 :     ) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
     864       166628 :         let iter_limit = self.slots.len() * 10;
     865       166628 :         let mut iters = 0;
     866       650804 :         loop {
     867       650804 :             iters += 1;
     868       650804 :             let slot_idx = self.next_evict_slot.fetch_add(1, Ordering::Relaxed) % self.slots.len();
     869       650804 : 
     870       650804 :             let slot = &self.slots[slot_idx];
     871       650804 : 
     872       650804 :             if slot.dec_usage_count() == 0 {
     873       166630 :                 let mut inner = match slot.inner.try_write() {
     874       166628 :                     Ok(inner) => inner,
     875            2 :                     Err(_err) => {
     876            2 :                         if iters > iter_limit {
     877              :                             // NB: Even with the permits, there's no hard guarantee that we will find a slot with
     878              :                             // any particular number of iterations: other threads might race ahead and acquire and
     879              :                             // release pins just as we're scanning the array.
     880              :                             //
     881              :                             // Imagine that nslots is 2, and as starting point, usage_count==1 on all
     882              :                             // slots. There are two threads running concurrently, A and B. A has just
     883              :                             // acquired the permit from the semaphore.
     884              :                             //
     885              :                             //   A: Look at slot 1. Its usage_count == 1, so decrement it to zero, and continue the search
     886              :                             //   B: Acquire permit.
     887              :                             //   B: Look at slot 2, decrement its usage_count to zero and continue the search
     888              :                             //   B: Look at slot 1. Its usage_count is zero, so pin it and bump up its usage_count to 1.
     889              :                             //   B: Release pin and permit again
     890              :                             //   B: Acquire permit.
     891              :                             //   B: Look at slot 2. Its usage_count is zero, so pin it and bump up its usage_count to 1.
     892              :                             //   B: Release pin and permit again
     893              :                             //
     894              :                             // Now we're back in the starting situation that both slots have
     895              :                             // usage_count 1, but A has now been through one iteration of the
     896              :                             // find_victim() loop. This can repeat indefinitely and on each
     897              :                             // iteration, A's iteration count increases by one.
     898              :                             //
     899              :                             // So, even though the semaphore for the permits is fair, the victim search
     900              :                             // itself happens in parallel and is not fair.
     901              :                             // Hence even with a permit, a task can theoretically be starved.
     902              :                             // To avoid this, we'd need tokio to give priority to tasks that are holding
     903              :                             // permits for longer.
     904              :                             // Note that just yielding to tokio during iteration without such
     905              :                             // priority boosting is likely counter-productive. We'd just give more opportunities
     906              :                             // for B to bump usage count, further starving A.
     907            0 :                             page_cache_eviction_metrics::observe(
     908            0 :                                 page_cache_eviction_metrics::Outcome::ItersExceeded {
     909            0 :                                     iters: iters.try_into().unwrap(),
     910            0 :                                 },
     911            0 :                             );
     912            0 :                             anyhow::bail!("exceeded evict iter limit");
     913            2 :                         }
     914            2 :                         continue;
     915              :                     }
     916              :                 };
     917       166628 :                 if let Some(old_key) = &inner.key {
     918       164796 :                     // remove mapping for old buffer
     919       164796 :                     self.remove_mapping(old_key);
     920       164796 :                     inner.key = None;
     921       164796 :                     page_cache_eviction_metrics::observe(
     922       164796 :                         page_cache_eviction_metrics::Outcome::FoundSlotEvicted {
     923       164796 :                             iters: iters.try_into().unwrap(),
     924       164796 :                         },
     925       164796 :                     );
     926       164796 :                 } else {
     927         1832 :                     page_cache_eviction_metrics::observe(
     928         1832 :                         page_cache_eviction_metrics::Outcome::FoundSlotUnused {
     929         1832 :                             iters: iters.try_into().unwrap(),
     930         1832 :                         },
     931         1832 :                     );
     932         1832 :                 }
     933       166628 :                 return Ok((slot_idx, inner));
     934       484174 :             }
     935              :         }
     936       166628 :     }
     937              : 
     938              :     /// Initialize a new page cache
     939              :     ///
     940              :     /// This should be called only once at page server startup.
     941          137 :     fn new(num_pages: usize) -> Self {
     942          137 :         assert!(num_pages > 0, "page cache size must be > 0");
     943              : 
     944              :         // We could use Vec::leak here, but that potentially also leaks
     945              :         // uninitialized reserved capacity. With into_boxed_slice and Box::leak
     946              :         // this is avoided.
     947          137 :         let page_buffer = Box::leak(vec![0u8; num_pages * PAGE_SZ].into_boxed_slice());
     948          137 : 
     949          137 :         let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;
     950          137 :         size_metrics.max_bytes.set_page_sz(num_pages);
     951          137 :         size_metrics.current_bytes_immutable.set_page_sz(0);
     952          137 :         size_metrics.current_bytes_materialized_page.set_page_sz(0);
     953          137 : 
     954          137 :         let slots = page_buffer
     955          137 :             .chunks_exact_mut(PAGE_SZ)
     956         6850 :             .map(|chunk| {
     957         6850 :                 let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap();
     958         6850 : 
     959         6850 :                 Slot {
     960         6850 :                     inner: tokio::sync::RwLock::new(SlotInner {
     961         6850 :                         key: None,
     962         6850 :                         buf,
     963         6850 :                         permit: std::sync::Mutex::new(Weak::new()),
     964         6850 :                     }),
     965         6850 :                     usage_count: AtomicU8::new(0),
     966         6850 :                 }
     967         6850 :             })
     968          137 :             .collect();
     969          137 : 
     970          137 :         Self {
     971          137 :             materialized_page_map: Default::default(),
     972          137 :             immutable_page_map: Default::default(),
     973          137 :             slots,
     974          137 :             next_evict_slot: AtomicUsize::new(0),
     975          137 :             size_metrics,
     976          137 :             pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)),
     977          137 :         }
     978          137 :     }
     979              : }
     980              : 
     981              : trait PageSzBytesMetric {
     982              :     fn set_page_sz(&self, count: usize);
     983              :     fn add_page_sz(&self, count: usize);
     984              :     fn sub_page_sz(&self, count: usize);
     985              : }
     986              : 
     987              : #[inline(always)]
     988       331835 : fn count_times_page_sz(count: usize) -> u64 {
     989       331835 :     u64::try_from(count).unwrap() * u64::try_from(PAGE_SZ).unwrap()
     990       331835 : }
     991              : 
     992              : impl PageSzBytesMetric for metrics::UIntGauge {
     993          411 :     fn set_page_sz(&self, count: usize) {
     994          411 :         self.set(count_times_page_sz(count));
     995          411 :     }
     996       166628 :     fn add_page_sz(&self, count: usize) {
     997       166628 :         self.add(count_times_page_sz(count));
     998       166628 :     }
     999       164796 :     fn sub_page_sz(&self, count: usize) {
    1000       164796 :         self.sub(count_times_page_sz(count));
    1001       164796 :     }
    1002              : }
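                      : 
                      : // For example, with PAGE_SZ = 8192 (PostgreSQL's BLCKSZ), add_page_sz(1)
                      : // increases a gauge by 8192 bytes, and the 50-slot test cache reports
                      : // max_bytes = 50 * 8192 = 409600.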
        

Generated by: LCOV version 2.1-beta