LCOV - code coverage report
Current view: top level - pageserver/src - page_cache.rs
Test: c639aa5f7ab62b43d647b10f40d15a15686ce8a9.info    Test Date: 2024-02-12 20:26:03
Lines: 91.6 % (424 of 463 hit)    Functions: 91.1 % (51 of 56 hit)

            Line data    Source code
       1              : //!
       2              : //! Global page cache
       3              : //!
       4              : //! The page cache uses up most of the memory in the page server. It is shared
       5              : //! by all tenants, and it is used to store different kinds of pages. Sharing
       6              : //! the cache allows memory to be dynamically allocated where it's needed the
       7              : //! most.
       8              : //!
       9              : //! The page cache consists of fixed-size buffers, 8 kB each to match the
      10              : //! PostgreSQL buffer size, and a Slot struct for each buffer to contain
      11              : //! information about what's stored in the buffer.
      12              : //!
      13              : //! # Types Of Pages
      14              : //!
      15              : //! [`PageCache`] only supports immutable pages.
      16              : //! Hence there is no need to worry about coherency.
      17              : //!
      18              : //! Two types of pages are supported:
      19              : //!
      20              : //! * **Materialized pages**, filled & used by page reconstruction
      21              : //! * **Immutable File pages**, filled & used by [`crate::tenant::block_io`] and [`crate::tenant::ephemeral_file`].
      22              : //!
      23              : //! Note that [`crate::tenant::ephemeral_file::EphemeralFile`] is generally mutable, but, it's append-only.
      24              : //! It uses the page cache only for the blocks that are already fully written and immutable.
      25              : //!
      26              : //! # Filling The Page Cache
      27              : //!
      28              : //! Page cache maps from a cache key to a buffer slot.
      29              : //! The cache key uniquely identifies the piece of data that is being cached.
      30              : //!
       31              : //! The cache key for **materialized pages** is [`TenantShardId`], [`TimelineId`], [`Key`], and [`Lsn`].
      32              : //! Use [`PageCache::memorize_materialized_page`] and [`PageCache::lookup_materialized_page`] for fill & access.
      33              : //!
      34              : //! The cache key for **immutable file** pages is [`FileId`] and a block number.
      35              : //! Users of page cache that wish to page-cache an arbitrary (immutable!) on-disk file do the following:
      36              : //! * Have a mechanism to deterministically associate the on-disk file with a [`FileId`].
      37              : //! * Get a [`FileId`] using [`next_file_id`].
      38              : //! * Use the mechanism to associate the on-disk file with the returned [`FileId`].
      39              : //! * Use [`PageCache::read_immutable_buf`] to get a [`ReadBufResult`].
      40              : //! * If the page was already cached, it'll be the [`ReadBufResult::Found`] variant that contains
      41              : //!   a read guard for the page. Just use it.
      42              : //! * If the page was not cached, it'll be the [`ReadBufResult::NotFound`] variant that contains
      43              : //!   a write guard for the page. Fill the page with the contents of the on-disk file.
       44              : //!   a write guard for the page. Fill the page with the contents of the on-disk file.
       45              : //!   Then call [`PageWriteGuard::mark_valid`] to mark the page as valid; this
       46              : //!   downgrades the write guard into a [`PageReadGuard`], so the caller can use the
       47              : //!   just-filled page directly, and forward progress is guaranteed even under high
                       : //!   cache pressure. (See the sketch after this list.)
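                       : //!
                       : //! A minimal sketch of that workflow; `read_block_from_disk` is a hypothetical
                       : //! helper standing in for whatever fills the buffer:
                       : //!
                       : //! ```ignore
                       : //! let cache = page_cache::get();
                       : //! let read_guard = match cache.read_immutable_buf(file_id, blkno, ctx).await? {
                       : //!     // Cache hit: use the read guard directly.
                       : //!     ReadBufResult::Found(guard) => guard,
                       : //!     ReadBufResult::NotFound(mut write_guard) => {
                       : //!         // Cache miss: fill the buffer from the on-disk file (hypothetical
                       : //!         // I/O helper), then mark it valid, which downgrades the write
                       : //!         // guard into a read guard.
                       : //!         read_block_from_disk(blkno, &mut *write_guard)?;
                       : //!         write_guard.mark_valid()
                       : //!     }
                       : //! };
                       : //! // `read_guard` derefs to `&[u8; PAGE_SZ]` with the page contents.
                       : //! ```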
      48              : //!
      49              : //! # Locking
      50              : //!
      51              : //! There are two levels of locking involved: There's one lock for the "mapping"
      52              : //! from page identifier (tenant ID, timeline ID, rel, block, LSN) to the buffer
      53              : //! slot, and a separate lock on each slot. To read or write the contents of a
      54              : //! slot, you must hold the lock on the slot in read or write mode,
      55              : //! respectively. To change the mapping of a slot, i.e. to evict a page or to
      56              : //! assign a buffer for a page, you must hold the mapping lock and the lock on
      57              : //! the slot at the same time.
      58              : //!
      59              : //! Whenever you need to hold both locks simultaneously, the slot lock must be
      60              : //! acquired first. This consistent ordering avoids deadlocks. To look up a page
      61              : //! in the cache, you would first look up the mapping, while holding the mapping
      62              : //! lock, and then lock the slot. You must release the mapping lock in between,
      63              : //! to obey the lock ordering and avoid deadlock.
      64              : //!
      65              : //! A slot can momentarily have invalid contents, even if it's already been
      66              : //! inserted to the mapping, but you must hold the write-lock on the slot until
      67              : //! the contents are valid. If you need to release the lock without initializing
      68              : //! the contents, you must remove the mapping first. We make that easy for the
       69              : //! callers with PageWriteGuard: the caller must explicitly call guard.mark_valid() after
       70              : //! initializing the contents. If the guard is dropped without calling mark_valid(), the
      71              : //! mapping is automatically removed and the slot is marked free.
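                       : //!
                       : //! A simplified sketch of the lookup protocol described above (the real code is
                       : //! in `try_lock_for_read` below; `mapping` and `slots` are illustrative stand-ins
                       : //! for the real fields):
                       : //!
                       : //! ```ignore
                       : //! let slot_idx = {
                       : //!     let map = mapping.read().unwrap(); // take the mapping lock...
                       : //!     *map.get(&key)?                    // ...look up the slot index...
                       : //! };                                     // ...and release the mapping lock.
                       : //! let inner = slots[slot_idx].inner.read().await; // only now lock the slot
                       : //! if inner.key.as_ref() == Some(&key) {
                       : //!     // Still the page we looked up: safe to use.
                       : //! } else {
                       : //!     // The slot was recycled between the two locks: retry or give up.
                       : //! }
                       : //! ```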
      72              : //!
      73              : 
      74              : use std::{
      75              :     collections::{hash_map::Entry, HashMap},
      76              :     convert::TryInto,
      77              :     sync::{
      78              :         atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
      79              :         Arc, Weak,
      80              :     },
      81              :     time::Duration,
      82              : };
      83              : 
      84              : use anyhow::Context;
      85              : use once_cell::sync::OnceCell;
      86              : use pageserver_api::shard::TenantShardId;
      87              : use utils::{id::TimelineId, lsn::Lsn};
      88              : 
      89              : use crate::{
      90              :     context::RequestContext,
      91              :     metrics::{page_cache_eviction_metrics, PageCacheSizeMetrics},
      92              :     repository::Key,
      93              : };
      94              : 
      95              : static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
      96              : const TEST_PAGE_CACHE_SIZE: usize = 50;
      97              : 
      98              : ///
      99              : /// Initialize the page cache. This must be called once at page server startup.
     100              : ///
     101          624 : pub fn init(size: usize) {
     102          624 :     if PAGE_CACHE.set(PageCache::new(size)).is_err() {
     103            0 :         panic!("page cache already initialized");
     104          624 :     }
     105          624 : }
     106              : 
     107              : ///
     108              : /// Get a handle to the page cache.
     109              : ///
     110    180256134 : pub fn get() -> &'static PageCache {
     111    180256134 :     //
     112    180256134 :     // In unit tests, page server startup doesn't happen and no one calls
     113    180256134 :     // page_cache::init(). Initialize it here with a tiny cache, so that the
     114    180256134 :     // page cache is usable in unit tests.
     115    180256134 :     //
     116    180256134 :     if cfg!(test) {
     117      3338547 :         PAGE_CACHE.get_or_init(|| PageCache::new(TEST_PAGE_CACHE_SIZE))
     118              :     } else {
     119    176917587 :         PAGE_CACHE.get().expect("page cache not initialized")
     120              :     }
     121    180256134 : }
     122              : 
     123              : pub const PAGE_SZ: usize = postgres_ffi::BLCKSZ as usize;
     124              : const MAX_USAGE_COUNT: u8 = 5;
     125              : 
     126              : /// See module-level comment.
     127    332313253 : #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
     128              : pub struct FileId(u64);
     129              : 
     130              : static NEXT_ID: AtomicU64 = AtomicU64::new(1);
     131              : 
     132              : /// See module-level comment.
     133        39089 : pub fn next_file_id() -> FileId {
     134        39089 :     FileId(NEXT_ID.fetch_add(1, Ordering::Relaxed))
     135        39089 : }
     136              : 
     137              : ///
     138              : /// CacheKey uniquely identifies a "thing" to cache in the page cache.
     139              : ///
     140    189060688 : #[derive(Debug, PartialEq, Eq, Clone)]
     141              : #[allow(clippy::enum_variant_names)]
     142              : enum CacheKey {
     143              :     MaterializedPage {
     144              :         hash_key: MaterializedPageHashKey,
     145              :         lsn: Lsn,
     146              :     },
     147              :     ImmutableFilePage {
     148              :         file_id: FileId,
     149              :         blkno: u32,
     150              :     },
     151              : }
     152              : 
     153     14288667 : #[derive(Debug, PartialEq, Eq, Hash, Clone)]
     154              : struct MaterializedPageHashKey {
     155              :     /// Why is this TenantShardId rather than TenantId?
     156              :     ///
       157              :     /// Usually, the materialized value of a page@lsn is identical on any shard in the same tenant.  However, this
       158              :     /// is not the case for certain internally-generated pages (e.g. relation sizes).  In future, we may make this
     159              :     /// key smaller by omitting the shard, if we ensure that reads to such pages always skip the cache, or are
     160              :     /// special-cased in some other way.
     161              :     tenant_shard_id: TenantShardId,
     162              :     timeline_id: TimelineId,
     163              :     key: Key,
     164              : }
     165              : 
     166            0 : #[derive(Clone)]
     167              : struct Version {
     168              :     lsn: Lsn,
     169              :     slot_idx: usize,
     170              : }
     171              : 
     172              : struct Slot {
     173              :     inner: tokio::sync::RwLock<SlotInner>,
     174              :     usage_count: AtomicU8,
     175              : }
     176              : 
     177              : struct SlotInner {
     178              :     key: Option<CacheKey>,
     179              :     // for `coalesce_readers_permit`
     180              :     permit: std::sync::Mutex<Weak<PinnedSlotsPermit>>,
     181              :     buf: &'static mut [u8; PAGE_SZ],
     182              : }
     183              : 
     184              : impl Slot {
     185              :     /// Increment usage count on the buffer, with ceiling at MAX_USAGE_COUNT.
     186    163478344 :     fn inc_usage_count(&self) {
     187    163478344 :         let _ = self
     188    163478344 :             .usage_count
     189    163480187 :             .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
     190    163480187 :                 if val == MAX_USAGE_COUNT {
     191    142457391 :                     None
     192              :                 } else {
     193     21022796 :                     Some(val + 1)
     194              :                 }
     195    163480187 :             });
     196    163478344 :     }
     197              : 
     198              :     /// Decrement usage count on the buffer, unless it's already zero.  Returns
     199              :     /// the old usage count.
     200     36713175 :     fn dec_usage_count(&self) -> u8 {
     201     36713175 :         let count_res =
     202     36713175 :             self.usage_count
     203     36713177 :                 .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
     204     36713177 :                     if val == 0 {
     205     11148094 :                         None
     206              :                     } else {
     207     25565083 :                         Some(val - 1)
     208              :                     }
     209     36713177 :                 });
     210     36713175 : 
     211     36713175 :         match count_res {
     212     25565082 :             Ok(usage_count) => usage_count,
     213     11148093 :             Err(usage_count) => usage_count,
     214              :         }
     215     36713175 :     }
     216              : 
     217              :     /// Sets the usage count to a specific value.
     218     11147325 :     fn set_usage_count(&self, count: u8) {
     219     11147325 :         self.usage_count.store(count, Ordering::Relaxed);
     220     11147325 :     }
     221              : }
     222              : 
     223              : impl SlotInner {
       224              :     /// If there is already a reader, drop our permit and share its permit, just like we share read access.
     225    163478340 :     fn coalesce_readers_permit(&self, permit: PinnedSlotsPermit) -> Arc<PinnedSlotsPermit> {
     226    163478340 :         let mut guard = self.permit.lock().unwrap();
     227    163478340 :         if let Some(existing_permit) = guard.upgrade() {
     228        77389 :             drop(guard);
     229        77389 :             drop(permit);
     230        77389 :             existing_permit
     231              :         } else {
     232    163400951 :             let permit = Arc::new(permit);
     233    163400951 :             *guard = Arc::downgrade(&permit);
     234    163400951 :             permit
     235              :         }
     236    163478340 :     }
     237              : }
     238              : 
     239              : pub struct PageCache {
     240              :     /// This contains the mapping from the cache key to buffer slot that currently
     241              :     /// contains the page, if any.
     242              :     ///
     243              :     /// TODO: This is protected by a single lock. If that becomes a bottleneck,
     244              :     /// this HashMap can be replaced with a more concurrent version, there are
     245              :     /// plenty of such crates around.
     246              :     ///
     247              :     /// If you add support for caching different kinds of objects, each object kind
     248              :     /// can have a separate mapping map, next to this field.
     249              :     materialized_page_map: std::sync::RwLock<HashMap<MaterializedPageHashKey, Vec<Version>>>,
     250              : 
     251              :     immutable_page_map: std::sync::RwLock<HashMap<(FileId, u32), usize>>,
     252              : 
     253              :     /// The actual buffers with their metadata.
     254              :     slots: Box<[Slot]>,
     255              : 
     256              :     pinned_slots: Arc<tokio::sync::Semaphore>,
     257              : 
     258              :     /// Index of the next candidate to evict, for the Clock replacement algorithm.
     259              :     /// This is interpreted modulo the page cache size.
     260              :     next_evict_slot: AtomicUsize,
     261              : 
     262              :     size_metrics: &'static PageCacheSizeMetrics,
     263              : }
     264              : 
     265              : struct PinnedSlotsPermit(tokio::sync::OwnedSemaphorePermit);
     266              : 
     267              : ///
     268              : /// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked
     269              : /// until the guard is dropped.
     270              : ///
     271              : pub struct PageReadGuard<'i> {
     272              :     _permit: Arc<PinnedSlotsPermit>,
     273              :     slot_guard: tokio::sync::RwLockReadGuard<'i, SlotInner>,
     274              : }
     275              : 
     276              : impl std::ops::Deref for PageReadGuard<'_> {
     277              :     type Target = [u8; PAGE_SZ];
     278              : 
     279    339690066 :     fn deref(&self) -> &Self::Target {
     280    339690066 :         self.slot_guard.buf
     281    339690066 :     }
     282              : }
     283              : 
     284              : impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
     285            0 :     fn as_ref(&self) -> &[u8; PAGE_SZ] {
     286            0 :         self.slot_guard.buf
     287            0 :     }
     288              : }
     289              : 
     290              : ///
     291              : /// PageWriteGuard is a lease on a buffer for modifying it. The page is kept locked
     292              : /// until the guard is dropped.
     293              : ///
     294              : /// Counterintuitively, this is used even for a read, if the requested page is not
     295              : /// currently found in the page cache. In that case, the caller of lock_for_read()
     296              : /// is expected to fill in the page contents and call mark_valid().
     297              : pub struct PageWriteGuard<'i> {
     298              :     state: PageWriteGuardState<'i>,
     299              : }
     300              : 
     301              : enum PageWriteGuardState<'i> {
     302              :     Invalid {
     303              :         inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
     304              :         _permit: PinnedSlotsPermit,
     305              :     },
     306              :     Downgraded,
     307              : }
     308              : 
     309              : impl std::ops::DerefMut for PageWriteGuard<'_> {
     310     11147325 :     fn deref_mut(&mut self) -> &mut Self::Target {
     311     11147325 :         match &mut self.state {
     312     11147325 :             PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
     313            0 :             PageWriteGuardState::Downgraded => unreachable!(),
     314              :         }
     315     11147325 :     }
     316              : }
     317              : 
     318              : impl std::ops::Deref for PageWriteGuard<'_> {
     319              :     type Target = [u8; PAGE_SZ];
     320              : 
     321     18920954 :     fn deref(&self) -> &Self::Target {
     322     18920954 :         match &self.state {
     323     18920954 :             PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
     324            0 :             PageWriteGuardState::Downgraded => unreachable!(),
     325              :         }
     326     18920954 :     }
     327              : }
     328              : 
     329              : impl<'a> PageWriteGuard<'a> {
     330              :     /// Mark that the buffer contents are now valid.
     331              :     #[must_use]
     332     11147324 :     pub fn mark_valid(mut self) -> PageReadGuard<'a> {
     333     11147324 :         let prev = std::mem::replace(&mut self.state, PageWriteGuardState::Downgraded);
     334     11147324 :         match prev {
     335     11147324 :             PageWriteGuardState::Invalid { inner, _permit } => {
     336     11147324 :                 assert!(inner.key.is_some());
     337     11147324 :                 PageReadGuard {
     338     11147324 :                     _permit: Arc::new(_permit),
     339     11147324 :                     slot_guard: inner.downgrade(),
     340     11147324 :                 }
     341              :             }
     342            0 :             PageWriteGuardState::Downgraded => unreachable!(),
     343              :         }
     344     11147324 :     }
     345              : }
     346              : 
     347              : impl Drop for PageWriteGuard<'_> {
     348              :     ///
     349              :     /// If the buffer was allocated for a page that was not already in the
       350              :     /// cache, but the lock_for_read() caller dropped the buffer without
     351              :     /// initializing it, remove the mapping from the page cache.
     352              :     ///
     353     11147328 :     fn drop(&mut self) {
     354     11147328 :         match &mut self.state {
     355            0 :             PageWriteGuardState::Invalid { inner, _permit } => {
     356            0 :                 assert!(inner.key.is_some());
     357            0 :                 let self_key = inner.key.as_ref().unwrap();
     358            0 :                 PAGE_CACHE.get().unwrap().remove_mapping(self_key);
     359            0 :                 inner.key = None;
     360              :             }
     361     11147328 :             PageWriteGuardState::Downgraded => {}
     362              :         }
     363     11147328 :     }
     364              : }
     365              : 
     366              : /// lock_for_read() return value
     367              : pub enum ReadBufResult<'a> {
     368              :     Found(PageReadGuard<'a>),
     369              :     NotFound(PageWriteGuard<'a>),
     370              : }
     371              : 
     372              : impl PageCache {
     373              :     //
     374              :     // Section 1.1: Public interface functions for looking up and memorizing materialized page
     375              :     // versions in the page cache
     376              :     //
     377              : 
     378              :     /// Look up a materialized page version.
     379              :     ///
       380              :     /// The 'lsn' is an upper bound: this returns the latest version of
       381              :     /// the given block that is not newer than 'lsn', together with the actual
       382              :     /// LSN of the returned page.
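                       :     ///
                       :     /// A hedged usage sketch; the LSN values are purely illustrative:
                       :     ///
                       :     /// ```ignore
                       :     /// // Memorize the page image reconstructed at (illustrative) LSN 0x100...
                       :     /// cache
                       :     ///     .memorize_materialized_page(tenant_shard_id, timeline_id, key, Lsn(0x100), &img)
                       :     ///     .await?;
                       :     /// // ...then a lookup with upper bound 0x180 returns that version,
                       :     /// // along with its actual LSN.
                       :     /// if let Some((lsn, _guard)) = cache
                       :     ///     .lookup_materialized_page(tenant_shard_id, timeline_id, &key, Lsn(0x180), ctx)
                       :     ///     .await
                       :     /// {
                       :     ///     assert_eq!(lsn, Lsn(0x100));
                       :     /// }
                       :     /// ```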
     383      7427635 :     pub async fn lookup_materialized_page(
     384      7427635 :         &self,
     385      7427635 :         tenant_shard_id: TenantShardId,
     386      7427635 :         timeline_id: TimelineId,
     387      7427635 :         key: &Key,
     388      7427635 :         lsn: Lsn,
     389      7427635 :         ctx: &RequestContext,
     390      7427646 :     ) -> Option<(Lsn, PageReadGuard)> {
     391      7427646 :         let Ok(permit) = self.try_get_pinned_slot_permit().await else {
     392            0 :             return None;
     393              :         };
     394              : 
     395      7427646 :         crate::metrics::PAGE_CACHE
     396      7427646 :             .for_ctx(ctx)
     397      7427646 :             .read_accesses_materialized_page
     398      7427646 :             .inc();
     399      7427646 : 
     400      7427646 :         let mut cache_key = CacheKey::MaterializedPage {
     401      7427646 :             hash_key: MaterializedPageHashKey {
     402      7427646 :                 tenant_shard_id,
     403      7427646 :                 timeline_id,
     404      7427646 :                 key: *key,
     405      7427646 :             },
     406      7427646 :             lsn,
     407      7427646 :         };
     408              : 
     409      7427646 :         if let Some(guard) = self
     410      7427646 :             .try_lock_for_read(&mut cache_key, &mut Some(permit))
     411         1309 :             .await
     412              :         {
     413              :             if let CacheKey::MaterializedPage {
     414              :                 hash_key: _,
     415      1797170 :                 lsn: available_lsn,
     416      1797170 :             } = cache_key
     417              :             {
     418      1797170 :                 if available_lsn == lsn {
     419        76644 :                     crate::metrics::PAGE_CACHE
     420        76644 :                         .for_ctx(ctx)
     421        76644 :                         .read_hits_materialized_page_exact
     422        76644 :                         .inc();
     423      1720526 :                 } else {
     424      1720526 :                     crate::metrics::PAGE_CACHE
     425      1720526 :                         .for_ctx(ctx)
     426      1720526 :                         .read_hits_materialized_page_older_lsn
     427      1720526 :                         .inc();
     428      1720526 :                 }
     429      1797170 :                 Some((available_lsn, guard))
     430              :             } else {
     431            0 :                 panic!("unexpected key type in slot");
     432              :             }
     433              :         } else {
     434      5630476 :             None
     435              :         }
     436      7427646 :     }
     437              : 
     438              :     ///
     439              :     /// Store an image of the given page in the cache.
     440              :     ///
     441      2343539 :     pub async fn memorize_materialized_page(
     442      2343539 :         &self,
     443      2343539 :         tenant_shard_id: TenantShardId,
     444      2343539 :         timeline_id: TimelineId,
     445      2343539 :         key: Key,
     446      2343539 :         lsn: Lsn,
     447      2343539 :         img: &[u8],
     448      2343539 :     ) -> anyhow::Result<()> {
     449      2343539 :         let cache_key = CacheKey::MaterializedPage {
     450      2343539 :             hash_key: MaterializedPageHashKey {
     451      2343539 :                 tenant_shard_id,
     452      2343539 :                 timeline_id,
     453      2343539 :                 key,
     454      2343539 :             },
     455      2343539 :             lsn,
     456      2343539 :         };
     457              : 
     458      2343539 :         let mut permit = Some(self.try_get_pinned_slot_permit().await?);
     459              :         loop {
     460              :             // First check if the key already exists in the cache.
     461      2343539 :             if let Some(slot_idx) = self.search_mapping_exact(&cache_key) {
     462              :                 // The page was found in the mapping. Lock the slot, and re-check
     463              :                 // that it's still what we expected (because we don't released the mapping
     464              :                 // lock already, another thread could have evicted the page)
     465            4 :                 let slot = &self.slots[slot_idx];
     466            4 :                 let inner = slot.inner.write().await;
     467            4 :                 if inner.key.as_ref() == Some(&cache_key) {
     468            4 :                     slot.inc_usage_count();
     469              :                     debug_assert!(
     470              :                         {
     471            4 :                             let guard = inner.permit.lock().unwrap();
     472            4 :                             guard.upgrade().is_none()
     473              :                         },
     474            0 :                         "we hold a write lock, so, no one else should have a permit"
     475              :                     );
     476            4 :                     debug_assert_eq!(inner.buf.len(), img.len());
     477              :                     // We already had it in cache. Another thread must've put it there
     478              :                     // concurrently. Check that it had the same contents that we
     479              :                     // replayed.
     480            4 :                     assert!(inner.buf == img);
     481            4 :                     return Ok(());
     482            0 :                 }
     483      2343535 :             }
     484      2343535 :             debug_assert!(permit.is_some());
     485              : 
     486              :             // Not found. Find a victim buffer
     487      2343535 :             let (slot_idx, mut inner) = self
     488      2343535 :                 .find_victim(permit.as_ref().unwrap())
     489            0 :                 .await
     490      2343535 :                 .context("Failed to find evict victim")?;
     491              : 
     492              :             // Insert mapping for this. At this point, we may find that another
     493              :             // thread did the same thing concurrently. In that case, we evicted
     494              :             // our victim buffer unnecessarily. Put it into the free list and
     495              :             // continue with the slot that the other thread chose.
     496      2343535 :             if let Some(_existing_slot_idx) = self.try_insert_mapping(&cache_key, slot_idx) {
     497              :                 // TODO: put to free list
     498              : 
     499              :                 // We now just loop back to start from beginning. This is not
     500              :                 // optimal, we'll perform the lookup in the mapping again, which
     501              :                 // is not really necessary because we already got
     502              :                 // 'existing_slot_idx'.  But this shouldn't happen often enough
     503              :                 // to matter much.
     504            0 :                 continue;
     505      2343535 :             }
     506      2343535 : 
     507      2343535 :             // Make the slot ready
     508      2343535 :             let slot = &self.slots[slot_idx];
     509      2343535 :             inner.key = Some(cache_key.clone());
     510      2343535 :             slot.set_usage_count(1);
     511              :             // Create a write guard for the slot so we go through the expected motions.
     512              :             debug_assert!(
     513              :                 {
     514      2343535 :                     let guard = inner.permit.lock().unwrap();
     515      2343535 :                     guard.upgrade().is_none()
     516              :                 },
     517            0 :                 "we hold a write lock, so, no one else should have a permit"
     518              :             );
     519      2343535 :             let mut write_guard = PageWriteGuard {
     520      2343535 :                 state: PageWriteGuardState::Invalid {
     521      2343535 :                     _permit: permit.take().unwrap(),
     522      2343535 :                     inner,
     523      2343535 :                 },
     524      2343535 :             };
     525      2343535 :             write_guard.copy_from_slice(img);
     526      2343535 :             let _ = write_guard.mark_valid();
     527      2343535 :             return Ok(());
     528              :         }
     529      2343539 :     }
     530              : 
     531              :     // Section 1.2: Public interface functions for working with immutable file pages.
     532              : 
     533    170484960 :     pub async fn read_immutable_buf(
     534    170484960 :         &self,
     535    170484960 :         file_id: FileId,
     536    170484960 :         blkno: u32,
     537    170484960 :         ctx: &RequestContext,
     538    170485035 :     ) -> anyhow::Result<ReadBufResult> {
     539    170485035 :         let mut cache_key = CacheKey::ImmutableFilePage { file_id, blkno };
     540    170485035 : 
     541    170485035 :         self.lock_for_read(&mut cache_key, ctx).await
     542    170485032 :     }
     543              : 
     544              :     //
     545              :     // Section 2: Internal interface functions for lookup/update.
     546              :     //
     547              :     // To add support for a new kind of "thing" to cache, you will need
     548              :     // to add public interface routines above, and code to deal with the
     549              :     // "mappings" after this section. But the routines in this section should
     550              :     // not require changes.
     551              : 
     552    180256220 :     async fn try_get_pinned_slot_permit(&self) -> anyhow::Result<PinnedSlotsPermit> {
     553    180256220 :         match tokio::time::timeout(
     554    180256220 :             // Choose small timeout, neon_smgr does its own retries.
     555    180256220 :             // https://neondb.slack.com/archives/C04DGM6SMTM/p1694786876476869
     556    180256220 :             Duration::from_secs(10),
     557    180256220 :             Arc::clone(&self.pinned_slots).acquire_owned(),
     558    180256220 :         )
     559      1036947 :         .await
     560              :         {
     561    180256218 :             Ok(res) => Ok(PinnedSlotsPermit(
     562    180256218 :                 res.expect("this semaphore is never closed"),
     563    180256218 :             )),
     564            0 :             Err(_timeout) => {
     565            0 :                 crate::metrics::page_cache_errors_inc(
     566            0 :                     crate::metrics::PageCacheErrorKind::AcquirePinnedSlotTimeout,
     567            0 :                 );
     568            0 :                 anyhow::bail!("timeout: there were page guards alive for all page cache slots")
     569              :             }
     570              :         }
     571    180256218 :     }
     572              : 
     573              :     /// Look up a page in the cache.
     574              :     ///
       575              :     /// If the search criteria is not exact, *cache_key is updated with the
       576              :     /// exact key of the returned page. (For materialized pages, that means
     577              :     /// that the LSN in 'cache_key' is updated with the LSN of the returned page
     578              :     /// version.)
     579              :     ///
     580              :     /// If no page is found, returns None and *cache_key is left unmodified.
     581              :     ///
     582    177913362 :     async fn try_lock_for_read(
     583    177913362 :         &self,
     584    177913362 :         cache_key: &mut CacheKey,
     585    177913362 :         permit: &mut Option<PinnedSlotsPermit>,
     586    177913446 :     ) -> Option<PageReadGuard> {
     587    177913446 :         let cache_key_orig = cache_key.clone();
     588    177913446 :         if let Some(slot_idx) = self.search_mapping(cache_key) {
     589              :             // The page was found in the mapping. Lock the slot, and re-check
     590              :             // that it's still what we expected (because we released the mapping
     591              :             // lock already, another thread could have evicted the page)
     592    163478417 :             let slot = &self.slots[slot_idx];
     593    163478417 :             let inner = slot.inner.read().await;
     594    163478417 :             if inner.key.as_ref() == Some(cache_key) {
     595    163478412 :                 slot.inc_usage_count();
     596    163478412 :                 return Some(PageReadGuard {
     597    163478412 :                     _permit: inner.coalesce_readers_permit(permit.take().unwrap()),
     598    163478412 :                     slot_guard: inner,
     599    163478412 :                 });
     600            5 :             } else {
     601            5 :                 // search_mapping might have modified the search key; restore it.
     602            5 :                 *cache_key = cache_key_orig;
     603            5 :             }
     604     14435029 :         }
     605     14435034 :         None
     606    177913446 :     }
     607              : 
     608              :     /// Return a locked buffer for given block.
     609              :     ///
     610              :     /// Like try_lock_for_read(), if the search criteria is not exact and the
     611              :     /// page is already found in the cache, *cache_key is updated.
     612              :     ///
     613              :     /// If the page is not found in the cache, this allocates a new buffer for
     614              :     /// it. The caller may then initialize the buffer with the contents, and
     615              :     /// call mark_valid().
     616              :     ///
     617              :     /// Example usage:
     618              :     ///
     619              :     /// ```ignore
     620              :     /// let cache = page_cache::get();
     621              :     ///
       622              :     /// match cache.lock_for_read(&mut cache_key, ctx).await? {
       623              :     ///     ReadBufResult::Found(read_guard) => {
       624              :     ///         // The page was found in cache. Use it.
       625              :     ///     },
       626              :     ///     ReadBufResult::NotFound(mut write_guard) => {
       627              :     ///         // The page was not found in cache. Read it from disk into the
       628              :     ///         // buffer.
       629              :     ///         //read_my_page_from_disk(&mut write_guard);
       630              :     ///
       631              :     ///         // The buffer contents are now valid. Tell the page cache; this
                       :     ///         // downgrades the write guard into a read guard.
       632              :     ///         let read_guard = write_guard.mark_valid();
       633              :     ///     },
       634              :     /// }
     635              :     /// ```
     636              :     ///
     637    170484960 :     async fn lock_for_read(
     638    170484960 :         &self,
     639    170484960 :         cache_key: &mut CacheKey,
     640    170484960 :         ctx: &RequestContext,
     641    170485035 :     ) -> anyhow::Result<ReadBufResult> {
     642    170485035 :         let mut permit = Some(self.try_get_pinned_slot_permit().await?);
     643              : 
     644    170485033 :         let (read_access, hit) = match cache_key {
     645              :             CacheKey::MaterializedPage { .. } => {
     646            0 :                 unreachable!("Materialized pages use lookup_materialized_page")
     647              :             }
     648    170485033 :             CacheKey::ImmutableFilePage { .. } => (
     649    170485033 :                 &crate::metrics::PAGE_CACHE
     650    170485033 :                     .for_ctx(ctx)
     651    170485033 :                     .read_accesses_immutable,
     652    170485033 :                 &crate::metrics::PAGE_CACHE.for_ctx(ctx).read_hits_immutable,
     653    170485033 :             ),
     654    170485033 :         };
     655    170485033 :         read_access.inc();
     656    170485033 : 
     657    170485033 :         let mut is_first_iteration = true;
     658              :         loop {
     659              :             // First check if the key already exists in the cache.
     660    170485800 :             if let Some(read_guard) = self.try_lock_for_read(cache_key, &mut permit).await {
     661    161681242 :                 debug_assert!(permit.is_none());
     662    161681242 :                 if is_first_iteration {
     663    161680475 :                     hit.inc();
     664    161680475 :                 }
     665    161681242 :                 return Ok(ReadBufResult::Found(read_guard));
     666      8804558 :             }
     667      8804558 :             debug_assert!(permit.is_some());
     668      8804558 :             is_first_iteration = false;
     669              : 
     670              :             // Not found. Find a victim buffer
     671      8804558 :             let (slot_idx, mut inner) = self
     672      8804558 :                 .find_victim(permit.as_ref().unwrap())
     673            0 :                 .await
     674      8804558 :                 .context("Failed to find evict victim")?;
     675              : 
     676              :             // Insert mapping for this. At this point, we may find that another
     677              :             // thread did the same thing concurrently. In that case, we evicted
     678              :             // our victim buffer unnecessarily. Put it into the free list and
     679              :             // continue with the slot that the other thread chose.
     680      8804558 :             if let Some(_existing_slot_idx) = self.try_insert_mapping(cache_key, slot_idx) {
     681              :                 // TODO: put to free list
     682              : 
     683              :                 // We now just loop back to start from beginning. This is not
     684              :                 // optimal, we'll perform the lookup in the mapping again, which
     685              :                 // is not really necessary because we already got
     686              :                 // 'existing_slot_idx'.  But this shouldn't happen often enough
     687              :                 // to matter much.
     688          767 :                 continue;
     689      8803791 :             }
     690      8803791 : 
     691      8803791 :             // Make the slot ready
     692      8803791 :             let slot = &self.slots[slot_idx];
     693      8803791 :             inner.key = Some(cache_key.clone());
     694      8803791 :             slot.set_usage_count(1);
     695              : 
     696              :             debug_assert!(
     697              :                 {
     698      8803791 :                     let guard = inner.permit.lock().unwrap();
     699      8803791 :                     guard.upgrade().is_none()
     700              :                 },
     701            1 :                 "we hold a write lock, so, no one else should have a permit"
     702              :             );
     703              : 
     704      8803790 :             return Ok(ReadBufResult::NotFound(PageWriteGuard {
     705      8803790 :                 state: PageWriteGuardState::Invalid {
     706      8803790 :                     _permit: permit.take().unwrap(),
     707      8803790 :                     inner,
     708      8803790 :                 },
     709      8803790 :             }));
     710              :         }
     711    170485032 :     }
     712              : 
     713              :     //
     714              :     // Section 3: Mapping functions
     715              :     //
     716              : 
     717              :     /// Search for a page in the cache using the given search key.
     718              :     ///
     719              :     /// Returns the slot index, if any. If the search criteria is not exact,
     720              :     /// *cache_key is updated with the actual key of the found page.
     721              :     ///
     722              :     /// NOTE: We don't hold any lock on the mapping on return, so the slot might
     723              :     /// get recycled for an unrelated page immediately after this function
     724              :     /// returns.  The caller is responsible for re-checking that the slot still
     725              :     /// contains the page with the same key before using it.
     726              :     ///
     727    177913362 :     fn search_mapping(&self, cache_key: &mut CacheKey) -> Option<usize> {
     728    177913362 :         match cache_key {
     729      7427635 :             CacheKey::MaterializedPage { hash_key, lsn } => {
     730      7427635 :                 let map = self.materialized_page_map.read().unwrap();
     731      7427635 :                 let versions = map.get(hash_key)?;
     732              : 
     733      2251716 :                 let version_idx = match versions.binary_search_by_key(lsn, |v| v.lsn) {
     734        76644 :                     Ok(version_idx) => version_idx,
     735           14 :                     Err(0) => return None,
     736      1720531 :                     Err(version_idx) => version_idx - 1,
     737              :                 };
     738      1797175 :                 let version = &versions[version_idx];
     739      1797175 :                 *lsn = version.lsn;
     740      1797175 :                 Some(version.slot_idx)
     741              :             }
     742    170485727 :             CacheKey::ImmutableFilePage { file_id, blkno } => {
     743    170485727 :                 let map = self.immutable_page_map.read().unwrap();
     744    170485727 :                 Some(*map.get(&(*file_id, *blkno))?)
     745              :             }
     746              :         }
     747    177913363 :     }
     748              : 
     749              :     /// Search for a page in the cache using the given search key.
     750              :     ///
       751              :     /// Like 'search_mapping', but performs an "exact" search. Used for
     752              :     /// allocating a new buffer.
     753      2343539 :     fn search_mapping_exact(&self, key: &CacheKey) -> Option<usize> {
     754      2343539 :         match key {
     755      2343539 :             CacheKey::MaterializedPage { hash_key, lsn } => {
     756      2343539 :                 let map = self.materialized_page_map.read().unwrap();
     757      2343539 :                 let versions = map.get(hash_key)?;
     758              : 
     759       710634 :                 if let Ok(version_idx) = versions.binary_search_by_key(lsn, |v| v.lsn) {
     760            4 :                     Some(versions[version_idx].slot_idx)
     761              :                 } else {
     762       460727 :                     None
     763              :                 }
     764              :             }
     765            0 :             CacheKey::ImmutableFilePage { file_id, blkno } => {
     766            0 :                 let map = self.immutable_page_map.read().unwrap();
     767            0 :                 Some(*map.get(&(*file_id, *blkno))?)
     768              :             }
     769              :         }
     770      2343539 :     }
     771              : 
     772              :     ///
     773              :     /// Remove mapping for given key.
     774              :     ///
     775      8282048 :     fn remove_mapping(&self, old_key: &CacheKey) {
     776      8282048 :         match old_key {
     777              :             CacheKey::MaterializedPage {
     778      2173962 :                 hash_key: old_hash_key,
     779      2173962 :                 lsn: old_lsn,
     780      2173962 :             } => {
     781      2173962 :                 let mut map = self.materialized_page_map.write().unwrap();
     782      2173962 :                 if let Entry::Occupied(mut old_entry) = map.entry(old_hash_key.clone()) {
     783      2173962 :                     let versions = old_entry.get_mut();
     784              : 
     785      2741708 :                     if let Ok(version_idx) = versions.binary_search_by_key(old_lsn, |v| v.lsn) {
     786      2173962 :                         versions.remove(version_idx);
     787      2173962 :                         self.size_metrics
     788      2173962 :                             .current_bytes_materialized_page
     789      2173962 :                             .sub_page_sz(1);
     790      2173962 :                         if versions.is_empty() {
     791      1737282 :                             old_entry.remove_entry();
     792      1737282 :                         }
     793            0 :                     }
     794              :                 } else {
     795            0 :                     panic!("could not find old key in mapping")
     796              :                 }
     797              :             }
     798      6108086 :             CacheKey::ImmutableFilePage { file_id, blkno } => {
     799      6108086 :                 let mut map = self.immutable_page_map.write().unwrap();
     800      6108086 :                 map.remove(&(*file_id, *blkno))
     801      6108086 :                     .expect("could not find old key in mapping");
     802      6108086 :                 self.size_metrics.current_bytes_immutable.sub_page_sz(1);
     803      6108086 :             }
     804              :         }
     805      8282048 :     }
     806              : 
     807              :     ///
     808              :     /// Insert mapping for given key.
     809              :     ///
     810              :     /// If a mapping already existed for the given key, returns the slot index
     811              :     /// of the existing mapping and leaves it untouched.
     812     11148092 :     fn try_insert_mapping(&self, new_key: &CacheKey, slot_idx: usize) -> Option<usize> {
     813     11148092 :         match new_key {
     814              :             CacheKey::MaterializedPage {
     815      2343535 :                 hash_key: new_key,
     816      2343535 :                 lsn: new_lsn,
     817      2343535 :             } => {
     818      2343535 :                 let mut map = self.materialized_page_map.write().unwrap();
     819      2343535 :                 let versions = map.entry(new_key.clone()).or_default();
     820      2343535 :                 match versions.binary_search_by_key(new_lsn, |v| v.lsn) {
     821            0 :                     Ok(version_idx) => Some(versions[version_idx].slot_idx),
     822      2343535 :                     Err(version_idx) => {
     823      2343535 :                         versions.insert(
     824      2343535 :                             version_idx,
     825      2343535 :                             Version {
     826      2343535 :                                 lsn: *new_lsn,
     827      2343535 :                                 slot_idx,
     828      2343535 :                             },
     829      2343535 :                         );
     830      2343535 :                         self.size_metrics
     831      2343535 :                             .current_bytes_materialized_page
     832      2343535 :                             .add_page_sz(1);
     833      2343535 :                         None
     834              :                     }
     835              :                 }
     836              :             }
     837              : 
     838      8804557 :             CacheKey::ImmutableFilePage { file_id, blkno } => {
     839      8804557 :                 let mut map = self.immutable_page_map.write().unwrap();
     840      8804557 :                 match map.entry((*file_id, *blkno)) {
     841          767 :                     Entry::Occupied(entry) => Some(*entry.get()),
     842      8803790 :                     Entry::Vacant(entry) => {
     843      8803790 :                         entry.insert(slot_idx);
     844      8803790 :                         self.size_metrics.current_bytes_immutable.add_page_sz(1);
     845      8803790 :                         None
     846              :                     }
     847              :                 }
     848              :             }
     849              :         }
     850     11148092 :     }
     851              : 
     852              :     //
     853              :     // Section 4: Misc internal helpers
     854              :     //
     855              : 
     856              :     /// Find a slot to evict.
     857              :     ///
     858              :     /// On return, the slot is empty and write-locked.
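                       :     ///
                       :     /// This is the sweep of a clock ("second chance") replacement scheme: each
                       :     /// visited slot's usage count is decremented, and a slot is only taken once
                       :     /// its count has already reached zero. Worked example: a slot whose page was
                       :     /// hit three times since the last sweep has usage_count == 3 (capped at
                       :     /// MAX_USAGE_COUNT == 5), so it survives three passes of the sweep before
                       :     /// becoming an eviction candidate.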
     859     11148092 :     async fn find_victim(
     860     11148092 :         &self,
     861     11148092 :         _permit_witness: &PinnedSlotsPermit,
     862     11148093 :     ) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
     863     11148093 :         let iter_limit = self.slots.len() * 10;
     864     11148093 :         let mut iters = 0;
     865     36713176 :         loop {
     866     36713176 :             iters += 1;
     867     36713176 :             let slot_idx = self.next_evict_slot.fetch_add(1, Ordering::Relaxed) % self.slots.len();
     868     36713176 : 
     869     36713176 :             let slot = &self.slots[slot_idx];
     870     36713176 : 
     871     36713176 :             if slot.dec_usage_count() == 0 {
     872     11148094 :                 let mut inner = match slot.inner.try_write() {
     873     11148093 :                     Ok(inner) => inner,
     874            1 :                     Err(_err) => {
     875            1 :                         if iters > iter_limit {
     876              :                             // NB: Even with the permits, there's no hard guarantee that we will find a slot with
     877              :                             // any particular number of iterations: other threads might race ahead and acquire and
     878              :                             // release pins just as we're scanning the array.
     879              :                             //
     880              :                             // Imagine that nslots is 2, and as starting point, usage_count==1 on all
     881              :                             // slots. There are two threads running concurrently, A and B. A has just
     882              :                             // acquired the permit from the semaphore.
     883              :                             //
     884              :                             //   A: Look at slot 1. Its usage_count == 1, so decrement it to zero, and continue the search
     885              :                             //   B: Acquire permit.
     886              :                             //   B: Look at slot 2, decrement its usage_count to zero and continue the search
     887              :                             //   B: Look at slot 1. Its usage_count is zero, so pin it and bump up its usage_count to 1.
     888              :                             //   B: Release pin and permit again
     889              :                             //   B: Acquire permit.
     890              :                             //   B: Look at slot 2. Its usage_count is zero, so pin it and bump up its usage_count to 1.
     891              :                             //   B: Release pin and permit again
     892              :                             //
     893              :                             // Now we're back in the starting situation that both slots have
     894              :                             // usage_count 1, but A has now been through one iteration of the
     895              :                             // find_victim() loop. This can repeat indefinitely and on each
     896              :                             // iteration, A's iteration count increases by one.
     897              :                             //
     898              :                             // So, even though the semaphore for the permits is fair, the victim search
     899              :                             // itself happens in parallel and is not fair.
     900              :                             // Hence even with a permit, a task can theoretically be starved.
     901              :                             // To avoid this, we'd need tokio to give priority to tasks that are holding
     902              :                             // permits for longer.
     903              :                             // Note that just yielding to tokio during iteration without such
     904              :                             // priority boosting is likely counter-productive. We'd just give more opportunities
     905              :                             // for B to bump usage count, further starving A.
     906            0 :                             page_cache_eviction_metrics::observe(
     907            0 :                                 page_cache_eviction_metrics::Outcome::ItersExceeded {
     908            0 :                                     iters: iters.try_into().unwrap(),
     909            0 :                                 },
     910            0 :                             );
     911            0 :                             anyhow::bail!("exceeded evict iter limit");
     912            1 :                         }
     913            1 :                         continue;
     914              :                     }
     915              :                 };
     916     11148093 :                 if let Some(old_key) = &inner.key {
     917      8282048 :                     // remove mapping for old buffer
     918      8282048 :                     self.remove_mapping(old_key);
     919      8282048 :                     inner.key = None;
     920      8282048 :                     page_cache_eviction_metrics::observe(
     921      8282048 :                         page_cache_eviction_metrics::Outcome::FoundSlotEvicted {
     922      8282048 :                             iters: iters.try_into().unwrap(),
     923      8282048 :                         },
     924      8282048 :                     );
     925      8282048 :                 } else {
     926      2866045 :                     page_cache_eviction_metrics::observe(
     927      2866045 :                         page_cache_eviction_metrics::Outcome::FoundSlotUnused {
     928      2866045 :                             iters: iters.try_into().unwrap(),
     929      2866045 :                         },
     930      2866045 :                     );
     931      2866045 :                 }
     932     11148093 :                 return Ok((slot_idx, inner));
     933     25565082 :             }
     934              :         }
     935     11148093 :     }
     936              : 
     937              :     /// Initialize a new page cache
     938              :     ///
     939              :     /// This should be called only once at page server startup.
     940          698 :     fn new(num_pages: usize) -> Self {
     941          698 :         assert!(num_pages > 0, "page cache size must be > 0");
     942              : 
     943              :         // We could use Vec::leak here, but that potentially also leaks
     944              :         // uninitialized reserved capacity. With into_boxed_slice and Box::leak
     945              :         // this is avoided.
     946          698 :         let page_buffer = Box::leak(vec![0u8; num_pages * PAGE_SZ].into_boxed_slice());
     947          698 : 
     948          698 :         let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;
     949          698 :         size_metrics.max_bytes.set_page_sz(num_pages);
     950          698 :         size_metrics.current_bytes_immutable.set_page_sz(0);
     951          698 :         size_metrics.current_bytes_materialized_page.set_page_sz(0);
     952          698 : 
     953          698 :         let slots = page_buffer
     954          698 :             .chunks_exact_mut(PAGE_SZ)
     955      5092264 :             .map(|chunk| {
     956      5092264 :                 let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap();
     957      5092264 : 
     958      5092264 :                 Slot {
     959      5092264 :                     inner: tokio::sync::RwLock::new(SlotInner {
     960      5092264 :                         key: None,
     961      5092264 :                         buf,
     962      5092264 :                         permit: std::sync::Mutex::new(Weak::new()),
     963      5092264 :                     }),
     964      5092264 :                     usage_count: AtomicU8::new(0),
     965      5092264 :                 }
     966      5092264 :             })
     967          698 :             .collect();
     968          698 : 
     969          698 :         Self {
     970          698 :             materialized_page_map: Default::default(),
     971          698 :             immutable_page_map: Default::default(),
     972          698 :             slots,
     973          698 :             next_evict_slot: AtomicUsize::new(0),
     974          698 :             size_metrics,
     975          698 :             pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)),
     976          698 :         }
     977          698 :     }
     978              : }
     979              : 
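                       : /// The page cache accounts in whole pages, while these gauges track bytes.
                       : /// Worked example: with PAGE_SZ == 8192 (the 8 kB PostgreSQL block size),
                       : /// `add_page_sz(3)` increases the underlying byte gauge by 3 * 8192 = 24576.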
     980              : trait PageSzBytesMetric {
     981              :     fn set_page_sz(&self, count: usize);
     982              :     fn add_page_sz(&self, count: usize);
     983              :     fn sub_page_sz(&self, count: usize);
     984              : }
     985              : 
     986              : #[inline(always)]
     987     19431467 : fn count_times_page_sz(count: usize) -> u64 {
     988     19431467 :     u64::try_from(count).unwrap() * u64::try_from(PAGE_SZ).unwrap()
     989     19431467 : }
     990              : 
     991              : impl PageSzBytesMetric for metrics::UIntGauge {
     992         2094 :     fn set_page_sz(&self, count: usize) {
     993         2094 :         self.set(count_times_page_sz(count));
     994         2094 :     }
     995     11147325 :     fn add_page_sz(&self, count: usize) {
     996     11147325 :         self.add(count_times_page_sz(count));
     997     11147325 :     }
     998      8282048 :     fn sub_page_sz(&self, count: usize) {
     999      8282048 :         self.sub(count_times_page_sz(count));
    1000      8282048 :     }
    1001              : }
        

Generated by: LCOV version 2.1-beta