LCOV - code coverage report
Current view: top level - pageserver/src - page_cache.rs (source / functions)
Test:      32f4a56327bc9da697706839ed4836b2a00a408f.info
Test Date: 2024-02-07 07:37:29
Coverage:  Lines: 91.4 % (423 of 463)   Functions: 91.1 % (51 of 56)

            Line data    Source code
       1              : //!
       2              : //! Global page cache
       3              : //!
       4              : //! The page cache uses up most of the memory in the page server. It is shared
       5              : //! by all tenants, and it is used to store different kinds of pages. Sharing
       6              : //! the cache allows memory to be dynamically allocated where it's needed the
       7              : //! most.
       8              : //!
       9              : //! The page cache consists of fixed-size buffers, 8 kB each to match the
      10              : //! PostgreSQL buffer size, and a Slot struct for each buffer to contain
      11              : //! information about what's stored in the buffer.
      12              : //!
      13              : //! # Types Of Pages
      14              : //!
      15              : //! [`PageCache`] only supports immutable pages.
      16              : //! Hence there is no need to worry about coherency.
      17              : //!
      18              : //! Two types of pages are supported:
      19              : //!
      20              : //! * **Materialized pages**, filled & used by page reconstruction
      21              : //! * **Immutable File pages**, filled & used by [`crate::tenant::block_io`] and [`crate::tenant::ephemeral_file`].
      22              : //!
      23              : //! Note that [`crate::tenant::ephemeral_file::EphemeralFile`] is generally mutable, but it's append-only.
      24              : //! It uses the page cache only for the blocks that are already fully written and immutable.
      25              : //!
      26              : //! # Filling The Page Cache
      27              : //!
      28              : //! The page cache maps from a cache key to a buffer slot.
      29              : //! The cache key uniquely identifies the piece of data that is being cached.
      30              : //!
      31              : //! The cache key for **materialized pages** is [`TenantShardId`], [`TimelineId`], [`Key`], and [`Lsn`].
      32              : //! Use [`PageCache::memorize_materialized_page`] and [`PageCache::lookup_materialized_page`] for fill & access.
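                      : //!
                      : //! A hedged sketch of that fill & access flow (assumes `tenant_shard_id`,
                      : //! `timeline_id`, `key`, `lsn`, `img`, and `ctx` are already in scope):
                      : //!
                      : //! ```ignore
                      : //! let cache = page_cache::get();
                      : //! cache
                      : //!     .memorize_materialized_page(tenant_shard_id, timeline_id, key, lsn, &img)
                      : //!     .await?;
                      : //! // Later: look up the latest cached version at or below some `lsn`.
                      : //! if let Some((found_lsn, guard)) = cache
                      : //!     .lookup_materialized_page(tenant_shard_id, timeline_id, &key, lsn, ctx)
                      : //!     .await
                      : //! {
                      : //!     assert!(found_lsn <= lsn);
                      : //!     // `guard` derefs to `&[u8; PAGE_SZ]`.
                      : //! }
                      : //! ```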
      33              : //!
      34              : //! The cache key for **immutable file** pages is [`FileId`] and a block number.
      35              : //! Users of the page cache that wish to page-cache an arbitrary (immutable!) on-disk file do the following (see the sketch after this list):
      36              : //! * Have a mechanism to deterministically associate the on-disk file with a [`FileId`].
      37              : //! * Get a [`FileId`] using [`next_file_id`].
      38              : //! * Use the mechanism to associate the on-disk file with the returned [`FileId`].
      39              : //! * Use [`PageCache::read_immutable_buf`] to get a [`ReadBufResult`].
      40              : //! * If the page was already cached, it'll be the [`ReadBufResult::Found`] variant that contains
      41              : //!   a read guard for the page. Just use it.
      42              : //! * If the page was not cached, it'll be the [`ReadBufResult::NotFound`] variant that contains
      43              : //!   a write guard for the page. Fill the page with the contents of the on-disk file.
      44              : //!   Then call [`PageWriteGuard::mark_valid`] to mark the page as valid;
      45              : //!   it downgrades the write guard to a read guard for the now-valid page,
      46              : //!   so the contents can be used directly without a second lookup. The
      47              : //!   downgrade guarantees forward progress even under high cache pressure.
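                      : //!
                      : //! A minimal sketch of that read path, assuming a hypothetical
                      : //! `read_block_from_disk` helper and a `blkno`/`ctx` already in scope:
                      : //!
                      : //! ```ignore
                      : //! let file_id = page_cache::next_file_id();
                      : //! // ... associate the on-disk file with `file_id` in caller-side bookkeeping ...
                      : //! let cache = page_cache::get();
                      : //! let read_guard = match cache.read_immutable_buf(file_id, blkno, ctx).await? {
                      : //!     ReadBufResult::Found(guard) => guard, // already cached, just use it
                      : //!     ReadBufResult::NotFound(mut write_guard) => {
                      : //!         // Fill the buffer from disk, then mark it valid; this downgrades
                      : //!         // the write guard to a read guard.
                      : //!         read_block_from_disk(blkno, &mut *write_guard)?;
                      : //!         write_guard.mark_valid()
                      : //!     }
                      : //! };
                      : //! // `read_guard` derefs to `&[u8; PAGE_SZ]`.
                      : //! ```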
      48              : //!
      49              : //! # Locking
      50              : //!
      51              : //! There are two levels of locking involved: There's one lock for the "mapping"
      52              : //! from page identifier (tenant ID, timeline ID, rel, block, LSN) to the buffer
      53              : //! slot, and a separate lock on each slot. To read or write the contents of a
      54              : //! slot, you must hold the lock on the slot in read or write mode,
      55              : //! respectively. To change the mapping of a slot, i.e. to evict a page or to
      56              : //! assign a buffer for a page, you must hold the mapping lock and the lock on
      57              : //! the slot at the same time.
      58              : //!
      59              : //! Whenever you need to hold both locks simultaneously, the slot lock must be
      60              : //! acquired first; this consistent ordering avoids deadlocks. To look up a page
      61              : //! in the cache, you first consult the mapping while holding the mapping lock,
      62              : //! then release the mapping lock before locking the slot, so that the lock
      63              : //! ordering is obeyed and deadlock is avoided.
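                      : //!
                      : //! Schematically, with simplified names (the real lookup is
                      : //! [`PageCache::try_lock_for_read`] below):
                      : //!
                      : //! ```ignore
                      : //! let slot_idx = {
                      : //!     let map = mapping.read().unwrap(); // mapping lock held...
                      : //!     *map.get(&cache_key)?              // ...and released at end of scope
                      : //! };
                      : //! let inner = slots[slot_idx].inner.read().await; // now take the slot lock
                      : //! if inner.key.as_ref() != Some(&cache_key) {
                      : //!     // Lost a race: the slot was recycled after we dropped the mapping
                      : //!     // lock, so the caller must retry the lookup.
                      : //!     return None;
                      : //! }
                      : //! ```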
      64              : //!
      65              : //! A slot can momentarily have invalid contents, even if it's already been
      66              : //! inserted into the mapping, but you must hold the write-lock on the slot until
      67              : //! the contents are valid. If you need to release the lock without initializing
      68              : //! the contents, you must remove the mapping first. We make that easy for the
      69              : //! callers with PageWriteGuard: the caller must explicitly call guard.mark_valid()
      70              : //! after initializing the contents. If the guard is dropped without calling
      71              : //! mark_valid(), the mapping is automatically removed and the slot is marked free.
      72              : //!
      73              : 
      74              : use std::{
      75              :     collections::{hash_map::Entry, HashMap},
      76              :     convert::TryInto,
      77              :     sync::{
      78              :         atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
      79              :         Arc, Weak,
      80              :     },
      81              :     time::Duration,
      82              : };
      83              : 
      84              : use anyhow::Context;
      85              : use once_cell::sync::OnceCell;
      86              : use pageserver_api::shard::TenantShardId;
      87              : use utils::{id::TimelineId, lsn::Lsn};
      88              : 
      89              : use crate::{
      90              :     context::RequestContext,
      91              :     metrics::{page_cache_eviction_metrics, PageCacheSizeMetrics},
      92              :     repository::Key,
      93              : };
      94              : 
      95              : static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
      96              : const TEST_PAGE_CACHE_SIZE: usize = 50;
      97              : 
      98              : ///
      99              : /// Initialize the page cache. This must be called once at page server startup.
     100              : ///
     101          604 : pub fn init(size: usize) {
     102          604 :     if PAGE_CACHE.set(PageCache::new(size)).is_err() {
     103            0 :         panic!("page cache already initialized");
     104          604 :     }
     105          604 : }
     106              : 
     107              : ///
     108              : /// Get a handle to the page cache.
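                      : ///
                      : /// Panics if [`init`] has not been called. In unit tests (`cfg!(test)`), a
                      : /// tiny cache is lazily initialized instead, so no explicit `init` is needed.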
     109              : ///
     110    195785441 : pub fn get() -> &'static PageCache {
     111    195785441 :     //
     112    195785441 :     // In unit tests, page server startup doesn't happen and no one calls
     113    195785441 :     // page_cache::init(). Initialize it here with a tiny cache, so that the
     114    195785441 :     // page cache is usable in unit tests.
     115    195785441 :     //
     116    195785441 :     if cfg!(test) {
     117      3336990 :         PAGE_CACHE.get_or_init(|| PageCache::new(TEST_PAGE_CACHE_SIZE))
     118              :     } else {
     119    192448451 :         PAGE_CACHE.get().expect("page cache not initialized")
     120              :     }
     121    195785441 : }
     122              : 
     123              : pub const PAGE_SZ: usize = postgres_ffi::BLCKSZ as usize;
     124              : const MAX_USAGE_COUNT: u8 = 5;
     125              : 
     126              : /// See module-level comment.
     127    358902761 : #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
     128              : pub struct FileId(u64);
     129              : 
     130              : static NEXT_ID: AtomicU64 = AtomicU64::new(1);
     131              : 
     132              : /// See module-level comment.
     133        40122 : pub fn next_file_id() -> FileId {
     134        40122 :     FileId(NEXT_ID.fetch_add(1, Ordering::Relaxed))
     135        40122 : }
     136              : 
     137              : ///
     138              : /// CacheKey uniquely identifies a "thing" to cache in the page cache.
     139              : ///
     140    205800103 : #[derive(Debug, PartialEq, Eq, Clone)]
     141              : #[allow(clippy::enum_variant_names)]
     142              : enum CacheKey {
     143              :     MaterializedPage {
     144              :         hash_key: MaterializedPageHashKey,
     145              :         lsn: Lsn,
     146              :     },
     147              :     ImmutableFilePage {
     148              :         file_id: FileId,
     149              :         blkno: u32,
     150              :     },
     151              : }
     152              : 
     153     17013914 : #[derive(Debug, PartialEq, Eq, Hash, Clone)]
     154              : struct MaterializedPageHashKey {
     155              :     /// Why is this TenantShardId rather than TenantId?
     156              :     ///
      157              : ///     Usually, the materialized value of a page@lsn is identical on any shard in the same tenant.  However, this
      158              : ///     is not the case for certain internally-generated pages (e.g. relation sizes).  In future, we may make this
     159              :     /// key smaller by omitting the shard, if we ensure that reads to such pages always skip the cache, or are
     160              :     /// special-cased in some other way.
     161              :     tenant_shard_id: TenantShardId,
     162              :     timeline_id: TimelineId,
     163              :     key: Key,
     164              : }
     165              : 
     166            0 : #[derive(Clone)]
     167              : struct Version {
     168              :     lsn: Lsn,
     169              :     slot_idx: usize,
     170              : }
     171              : 
     172              : struct Slot {
     173              :     inner: tokio::sync::RwLock<SlotInner>,
     174              :     usage_count: AtomicU8,
     175              : }
     176              : 
     177              : struct SlotInner {
     178              :     key: Option<CacheKey>,
     179              :     // for `coalesce_readers_permit`
     180              :     permit: std::sync::Mutex<Weak<PinnedSlotsPermit>>,
     181              :     buf: &'static mut [u8; PAGE_SZ],
     182              : }
     183              : 
     184              : impl Slot {
     185              :     /// Increment usage count on the buffer, with ceiling at MAX_USAGE_COUNT.
     186    176065896 :     fn inc_usage_count(&self) {
     187    176065896 :         let _ = self
     188    176065896 :             .usage_count
     189    176067664 :             .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
     190    176067664 :                 if val == MAX_USAGE_COUNT {
     191    153568314 :                     None
     192              :                 } else {
     193     22499350 :                     Some(val + 1)
     194              :                 }
     195    176067664 :             });
     196    176065896 :     }
     197              : 
     198              :     /// Decrement usage count on the buffer, unless it's already zero.  Returns
     199              :     /// the old usage count.
     200     41696902 :     fn dec_usage_count(&self) -> u8 {
     201     41696902 :         let count_res =
     202     41696902 :             self.usage_count
     203     41696902 :                 .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
     204     41696889 :                     if val == 0 {
     205     12871934 :                         None
     206              :                     } else {
     207     28824955 :                         Some(val - 1)
     208              :                     }
     209     41696902 :                 });
     210     41696902 : 
     211     41696902 :         match count_res {
     212     28824963 :             Ok(usage_count) => usage_count,
     213     12871939 :             Err(usage_count) => usage_count,
     214              :         }
     215     41696902 :     }
     216              : 
     217              :     /// Sets the usage count to a specific value.
     218     12871093 :     fn set_usage_count(&self, count: u8) {
     219     12871093 :         self.usage_count.store(count, Ordering::Relaxed);
     220     12871093 :     }
     221              : }
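                      : 
                      : // Together, these counters implement the Clock replacement heuristic: each
                      : // access bumps a slot's usage count via `inc_usage_count` (capped at
                      : // MAX_USAGE_COUNT), while the eviction sweep in `find_victim` calls
                      : // `dec_usage_count` and reclaims a slot only once its count has reached zero.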
     222              : 
     223              : impl SlotInner {
      224              : /// If there is already a reader, drop our permit and share its permit, just like we share read access.
     225    176065741 :     fn coalesce_readers_permit(&self, permit: PinnedSlotsPermit) -> Arc<PinnedSlotsPermit> {
     226    176065741 :         let mut guard = self.permit.lock().unwrap();
     227    176065741 :         if let Some(existing_permit) = guard.upgrade() {
     228       167940 :             drop(guard);
     229       167940 :             drop(permit);
     230       167940 :             existing_permit
     231              :         } else {
     232    175897801 :             let permit = Arc::new(permit);
     233    175897801 :             *guard = Arc::downgrade(&permit);
     234    175897801 :             permit
     235              :         }
     236    176065741 :     }
     237              : }
     238              : 
     239              : pub struct PageCache {
     240              :     /// This contains the mapping from the cache key to buffer slot that currently
     241              :     /// contains the page, if any.
     242              :     ///
     243              :     /// TODO: This is protected by a single lock. If that becomes a bottleneck,
     244              :     /// this HashMap can be replaced with a more concurrent version, there are
     245              :     /// plenty of such crates around.
     246              :     ///
     247              :     /// If you add support for caching different kinds of objects, each object kind
     248              :     /// can have a separate mapping map, next to this field.
     249              :     materialized_page_map: std::sync::RwLock<HashMap<MaterializedPageHashKey, Vec<Version>>>,
     250              : 
     251              :     immutable_page_map: std::sync::RwLock<HashMap<(FileId, u32), usize>>,
     252              : 
     253              :     /// The actual buffers with their metadata.
     254              :     slots: Box<[Slot]>,
     255              : 
     256              :     pinned_slots: Arc<tokio::sync::Semaphore>,
     257              : 
     258              :     /// Index of the next candidate to evict, for the Clock replacement algorithm.
     259              :     /// This is interpreted modulo the page cache size.
     260              :     next_evict_slot: AtomicUsize,
     261              : 
     262              :     size_metrics: &'static PageCacheSizeMetrics,
     263              : }
     264              : 
     265              : struct PinnedSlotsPermit(tokio::sync::OwnedSemaphorePermit);
     266              : 
     267              : ///
     268              : /// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked
     269              : /// until the guard is dropped.
     270              : ///
     271              : pub struct PageReadGuard<'i> {
     272              :     _permit: Arc<PinnedSlotsPermit>,
     273              :     slot_guard: tokio::sync::RwLockReadGuard<'i, SlotInner>,
     274              : }
     275              : 
     276              : impl std::ops::Deref for PageReadGuard<'_> {
     277              :     type Target = [u8; PAGE_SZ];
     278              : 
     279    356745121 :     fn deref(&self) -> &Self::Target {
     280    356745121 :         self.slot_guard.buf
     281    356745121 :     }
     282              : }
     283              : 
     284              : impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
     285            0 :     fn as_ref(&self) -> &[u8; PAGE_SZ] {
     286            0 :         self.slot_guard.buf
     287            0 :     }
     288              : }
     289              : 
     290              : ///
     291              : /// PageWriteGuard is a lease on a buffer for modifying it. The page is kept locked
     292              : /// until the guard is dropped.
     293              : ///
     294              : /// Counterintuitively, this is used even for a read, if the requested page is not
     295              : /// currently found in the page cache. In that case, the caller of lock_for_read()
     296              : /// is expected to fill in the page contents and call mark_valid().
     297              : pub struct PageWriteGuard<'i> {
     298              :     state: PageWriteGuardState<'i>,
     299              : }
     300              : 
     301              : enum PageWriteGuardState<'i> {
     302              :     Invalid {
     303              :         inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
     304              :         _permit: PinnedSlotsPermit,
     305              :     },
     306              :     Downgraded,
     307              : }
     308              : 
     309              : impl std::ops::DerefMut for PageWriteGuard<'_> {
     310     12871093 :     fn deref_mut(&mut self) -> &mut Self::Target {
     311     12871093 :         match &mut self.state {
     312     12871093 :             PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
     313            0 :             PageWriteGuardState::Downgraded => unreachable!(),
     314              :         }
     315     12871093 :     }
     316              : }
     317              : 
     318              : impl std::ops::Deref for PageWriteGuard<'_> {
     319              :     type Target = [u8; PAGE_SZ];
     320              : 
     321     23654036 :     fn deref(&self) -> &Self::Target {
     322     23654036 :         match &self.state {
     323     23654036 :             PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
     324            0 :             PageWriteGuardState::Downgraded => unreachable!(),
     325              :         }
     326     23654036 :     }
     327              : }
     328              : 
     329              : impl<'a> PageWriteGuard<'a> {
     330              :     /// Mark that the buffer contents are now valid.
     331              :     #[must_use]
     332     12871091 :     pub fn mark_valid(mut self) -> PageReadGuard<'a> {
     333     12871091 :         let prev = std::mem::replace(&mut self.state, PageWriteGuardState::Downgraded);
     334     12871091 :         match prev {
     335     12871091 :             PageWriteGuardState::Invalid { inner, _permit } => {
     336     12871091 :                 assert!(inner.key.is_some());
     337     12871091 :                 PageReadGuard {
     338     12871091 :                     _permit: Arc::new(_permit),
     339     12871091 :                     slot_guard: inner.downgrade(),
     340     12871091 :                 }
     341              :             }
     342            0 :             PageWriteGuardState::Downgraded => unreachable!(),
     343              :         }
     344     12871091 :     }
     345              : }
     346              : 
     347              : impl Drop for PageWriteGuard<'_> {
     348              :     ///
     349              :     /// If the buffer was allocated for a page that was not already in the
     350              :     /// cache, but the lock_for_read/write() caller dropped the buffer without
     351              :     /// initializing it, remove the mapping from the page cache.
     352              :     ///
     353     12871076 :     fn drop(&mut self) {
     354     12871076 :         match &mut self.state {
     355            0 :             PageWriteGuardState::Invalid { inner, _permit } => {
     356            0 :                 assert!(inner.key.is_some());
     357            0 :                 let self_key = inner.key.as_ref().unwrap();
     358            0 :                 PAGE_CACHE.get().unwrap().remove_mapping(self_key);
     359            0 :                 inner.key = None;
     360              :             }
     361     12871076 :             PageWriteGuardState::Downgraded => {}
     362              :         }
     363     12871076 :     }
     364              : }
     365              : 
     366              : /// lock_for_read() return value
     367              : pub enum ReadBufResult<'a> {
     368              :     Found(PageReadGuard<'a>),
     369              :     NotFound(PageWriteGuard<'a>),
     370              : }
     371              : 
     372              : impl PageCache {
     373              :     //
     374              :     // Section 1.1: Public interface functions for looking up and memorizing materialized page
     375              :     // versions in the page cache
     376              :     //
     377              : 
     378              :     /// Look up a materialized page version.
     379              :     ///
      380              : /// The 'lsn' is an upper bound: this will return the latest version of
      381              : /// the given block that is not newer than 'lsn'. Returns the actual LSN of the
      382              : /// returned page.
     383      8625194 :     pub async fn lookup_materialized_page(
     384      8625194 :         &self,
     385      8625194 :         tenant_shard_id: TenantShardId,
     386      8625194 :         timeline_id: TimelineId,
     387      8625194 :         key: &Key,
     388      8625194 :         lsn: Lsn,
     389      8625194 :         ctx: &RequestContext,
     390      8625194 :     ) -> Option<(Lsn, PageReadGuard)> {
     391      8625192 :         let Ok(permit) = self.try_get_pinned_slot_permit().await else {
     392            0 :             return None;
     393              :         };
     394              : 
     395      8625192 :         crate::metrics::PAGE_CACHE
     396      8625192 :             .for_ctx(ctx)
     397      8625192 :             .read_accesses_materialized_page
     398      8625192 :             .inc();
     399      8625192 : 
     400      8625192 :         let mut cache_key = CacheKey::MaterializedPage {
     401      8625192 :             hash_key: MaterializedPageHashKey {
     402      8625192 :                 tenant_shard_id,
     403      8625192 :                 timeline_id,
     404      8625192 :                 key: *key,
     405      8625192 :             },
     406      8625192 :             lsn,
     407      8625192 :         };
     408              : 
     409      8625192 :         if let Some(guard) = self
     410      8625192 :             .try_lock_for_read(&mut cache_key, &mut Some(permit))
     411         1375 :             .await
     412              :         {
     413              :             if let CacheKey::MaterializedPage {
     414              :                 hash_key: _,
     415      1776743 :                 lsn: available_lsn,
     416      1776743 :             } = cache_key
     417              :             {
     418      1776743 :                 if available_lsn == lsn {
     419           17 :                     crate::metrics::PAGE_CACHE
     420           17 :                         .for_ctx(ctx)
     421           17 :                         .read_hits_materialized_page_exact
     422           17 :                         .inc();
     423      1776726 :                 } else {
     424      1776726 :                     crate::metrics::PAGE_CACHE
     425      1776726 :                         .for_ctx(ctx)
     426      1776726 :                         .read_hits_materialized_page_older_lsn
     427      1776726 :                         .inc();
     428      1776726 :                 }
     429      1776743 :                 Some((available_lsn, guard))
     430              :             } else {
     431            0 :                 panic!("unexpected key type in slot");
     432              :             }
     433              :         } else {
     434      6848449 :             None
     435              :         }
     436      8625192 :     }
     437              : 
     438              :     ///
     439              :     /// Store an image of the given page in the cache.
     440              :     ///
     441      2857268 :     pub async fn memorize_materialized_page(
     442      2857268 :         &self,
     443      2857268 :         tenant_shard_id: TenantShardId,
     444      2857268 :         timeline_id: TimelineId,
     445      2857268 :         key: Key,
     446      2857268 :         lsn: Lsn,
     447      2857268 :         img: &[u8],
     448      2857268 :     ) -> anyhow::Result<()> {
     449      2857268 :         let cache_key = CacheKey::MaterializedPage {
     450      2857268 :             hash_key: MaterializedPageHashKey {
     451      2857268 :                 tenant_shard_id,
     452      2857268 :                 timeline_id,
     453      2857268 :                 key,
     454      2857268 :             },
     455      2857268 :             lsn,
     456      2857268 :         };
     457              : 
     458      2857268 :         let mut permit = Some(self.try_get_pinned_slot_permit().await?);
     459              :         loop {
     460              :             // First check if the key already exists in the cache.
     461      2857276 :             if let Some(slot_idx) = self.search_mapping_exact(&cache_key) {
     462              :                 // The page was found in the mapping. Lock the slot, and re-check
      463              : // that it's still what we expected (because we released the mapping
      464              : // lock already, another thread could have evicted the page)
     465          155 :                 let slot = &self.slots[slot_idx];
     466          155 :                 let inner = slot.inner.write().await;
     467          155 :                 if inner.key.as_ref() == Some(&cache_key) {
     468          155 :                     slot.inc_usage_count();
     469              :                     debug_assert!(
     470              :                         {
     471          155 :                             let guard = inner.permit.lock().unwrap();
     472          155 :                             guard.upgrade().is_none()
     473              :                         },
     474            0 :                         "we hold a write lock, so, no one else should have a permit"
     475              :                     );
     476          155 :                     debug_assert_eq!(inner.buf.len(), img.len());
     477              :                     // We already had it in cache. Another thread must've put it there
     478              :                     // concurrently. Check that it had the same contents that we
     479              :                     // replayed.
     480          155 :                     assert!(inner.buf == img);
     481          155 :                     return Ok(());
     482            0 :                 }
     483      2857121 :             }
     484      2857121 :             debug_assert!(permit.is_some());
     485              : 
     486              :             // Not found. Find a victim buffer
     487      2857121 :             let (slot_idx, mut inner) = self
     488      2857121 :                 .find_victim(permit.as_ref().unwrap())
     489            0 :                 .await
     490      2857121 :                 .context("Failed to find evict victim")?;
     491              : 
     492              :             // Insert mapping for this. At this point, we may find that another
     493              :             // thread did the same thing concurrently. In that case, we evicted
     494              :             // our victim buffer unnecessarily. Put it into the free list and
     495              :             // continue with the slot that the other thread chose.
     496      2857121 :             if let Some(_existing_slot_idx) = self.try_insert_mapping(&cache_key, slot_idx) {
     497              :                 // TODO: put to free list
     498              : 
     499              :                 // We now just loop back to start from beginning. This is not
     500              :                 // optimal, we'll perform the lookup in the mapping again, which
     501              :                 // is not really necessary because we already got
     502              :                 // 'existing_slot_idx'.  But this shouldn't happen often enough
     503              :                 // to matter much.
     504            8 :                 continue;
     505      2857113 :             }
     506      2857113 : 
     507      2857113 :             // Make the slot ready
     508      2857113 :             let slot = &self.slots[slot_idx];
     509      2857113 :             inner.key = Some(cache_key.clone());
     510      2857113 :             slot.set_usage_count(1);
     511              :             // Create a write guard for the slot so we go through the expected motions.
     512              :             debug_assert!(
     513              :                 {
     514      2857113 :                     let guard = inner.permit.lock().unwrap();
     515      2857113 :                     guard.upgrade().is_none()
     516              :                 },
     517            0 :                 "we hold a write lock, so, no one else should have a permit"
     518              :             );
     519      2857113 :             let mut write_guard = PageWriteGuard {
     520      2857113 :                 state: PageWriteGuardState::Invalid {
     521      2857113 :                     _permit: permit.take().unwrap(),
     522      2857113 :                     inner,
     523      2857113 :                 },
     524      2857113 :             };
     525      2857113 :             write_guard.copy_from_slice(img);
     526      2857113 :             let _ = write_guard.mark_valid();
     527      2857113 :             return Ok(());
     528              :         }
     529      2857268 :     }
     530              : 
     531              :     // Section 1.2: Public interface functions for working with immutable file pages.
     532              : 
     533    184302979 :     pub async fn read_immutable_buf(
     534    184302979 :         &self,
     535    184302979 :         file_id: FileId,
     536    184302979 :         blkno: u32,
     537    184302979 :         ctx: &RequestContext,
     538    184302979 :     ) -> anyhow::Result<ReadBufResult> {
     539    184302937 :         let mut cache_key = CacheKey::ImmutableFilePage { file_id, blkno };
     540    184302937 : 
     541    184302937 :         self.lock_for_read(&mut cache_key, ctx).await
     542    184302933 :     }
     543              : 
     544              :     //
     545              :     // Section 2: Internal interface functions for lookup/update.
     546              :     //
     547              :     // To add support for a new kind of "thing" to cache, you will need
     548              :     // to add public interface routines above, and code to deal with the
     549              :     // "mappings" after this section. But the routines in this section should
     550              :     // not require changes.
     551              : 
     552    195785440 :     async fn try_get_pinned_slot_permit(&self) -> anyhow::Result<PinnedSlotsPermit> {
     553    195785396 :         match tokio::time::timeout(
      554    195785396 :             // Choose a small timeout; neon_smgr does its own retries.
     555    195785396 :             // https://neondb.slack.com/archives/C04DGM6SMTM/p1694786876476869
     556    195785396 :             Duration::from_secs(10),
     557    195785396 :             Arc::clone(&self.pinned_slots).acquire_owned(),
     558    195785396 :         )
     559      1080486 :         .await
     560              :         {
     561    195785395 :             Ok(res) => Ok(PinnedSlotsPermit(
     562    195785395 :                 res.expect("this semaphore is never closed"),
     563    195785395 :             )),
     564            0 :             Err(_timeout) => {
     565            0 :                 crate::metrics::page_cache_errors_inc(
     566            0 :                     crate::metrics::PageCacheErrorKind::AcquirePinnedSlotTimeout,
     567            0 :                 );
     568            0 :                 anyhow::bail!("timeout: there were page guards alive for all page cache slots")
     569              :             }
     570              :         }
     571    195785395 :     }
     572              : 
     573              :     /// Look up a page in the cache.
     574              :     ///
      575              : /// If the search criteria are not exact, *cache_key is updated with the
      576              : /// exact key of the returned page. (For materialized pages, that means
     577              :     /// that the LSN in 'cache_key' is updated with the LSN of the returned page
     578              :     /// version.)
     579              :     ///
     580              :     /// If no page is found, returns None and *cache_key is left unmodified.
     581              :     ///
     582    192929010 :     async fn try_lock_for_read(
     583    192929010 :         &self,
     584    192929010 :         cache_key: &mut CacheKey,
     585    192929010 :         permit: &mut Option<PinnedSlotsPermit>,
     586    192929010 :     ) -> Option<PageReadGuard> {
     587    192928964 :         let cache_key_orig = cache_key.clone();
     588    192928964 :         if let Some(slot_idx) = self.search_mapping(cache_key) {
     589              :             // The page was found in the mapping. Lock the slot, and re-check
     590              :             // that it's still what we expected (because we released the mapping
     591              :             // lock already, another thread could have evicted the page)
     592    176065706 :             let slot = &self.slots[slot_idx];
     593    176065706 :             let inner = slot.inner.read().await;
     594    176065706 :             if inner.key.as_ref() == Some(cache_key) {
     595    176065702 :                 slot.inc_usage_count();
     596    176065702 :                 return Some(PageReadGuard {
     597    176065702 :                     _permit: inner.coalesce_readers_permit(permit.take().unwrap()),
     598    176065702 :                     slot_guard: inner,
     599    176065702 :                 });
     600            4 :             } else {
     601            4 :                 // search_mapping might have modified the search key; restore it.
     602            4 :                 *cache_key = cache_key_orig;
     603            4 :             }
     604     16863258 :         }
     605     16863262 :         None
     606    192928964 :     }
     607              : 
     608              :     /// Return a locked buffer for given block.
     609              :     ///
      610              : /// Like try_lock_for_read(), if the search criteria are not exact and the
     611              :     /// page is already found in the cache, *cache_key is updated.
     612              :     ///
     613              :     /// If the page is not found in the cache, this allocates a new buffer for
     614              :     /// it. The caller may then initialize the buffer with the contents, and
     615              :     /// call mark_valid().
     616              :     ///
     617              :     /// Example usage:
     618              :     ///
     619              :     /// ```ignore
     620              :     /// let cache = page_cache::get();
     621              :     ///
      622              : /// match cache.lock_for_read(&mut cache_key, ctx).await? {
     623              :     ///     ReadBufResult::Found(read_guard) => {
     624              :     ///         // The page was found in cache. Use it
     625              :     ///     },
     626              :     ///     ReadBufResult::NotFound(write_guard) => {
     627              :     ///         // The page was not found in cache. Read it from disk into the
     628              :     ///         // buffer.
     629              :     ///         //read_my_page_from_disk(write_guard);
     630              :     ///
     631              :     ///         // The buffer contents are now valid. Tell the page cache.
      632              : ///         let _read_guard = write_guard.mark_valid();
     633              :     ///     },
     634              :     /// }
     635              :     /// ```
     636              :     ///
     637    184302979 :     async fn lock_for_read(
     638    184302979 :         &self,
     639    184302979 :         cache_key: &mut CacheKey,
     640    184302979 :         ctx: &RequestContext,
     641    184302979 :     ) -> anyhow::Result<ReadBufResult> {
     642    184302936 :         let mut permit = Some(self.try_get_pinned_slot_permit().await?);
     643              : 
     644    184302935 :         let (read_access, hit) = match cache_key {
     645              :             CacheKey::MaterializedPage { .. } => {
     646            0 :                 unreachable!("Materialized pages use lookup_materialized_page")
     647              :             }
     648    184302935 :             CacheKey::ImmutableFilePage { .. } => (
     649    184302935 :                 &crate::metrics::PAGE_CACHE
     650    184302935 :                     .for_ctx(ctx)
     651    184302935 :                     .read_accesses_immutable,
     652    184302935 :                 &crate::metrics::PAGE_CACHE.for_ctx(ctx).read_hits_immutable,
     653    184302935 :             ),
     654    184302935 :         };
     655    184302935 :         read_access.inc();
     656    184302935 : 
     657    184302935 :         let mut is_first_iteration = true;
     658              :         loop {
     659              :             // First check if the key already exists in the cache.
     660    184303773 :             if let Some(read_guard) = self.try_lock_for_read(cache_key, &mut permit).await {
     661    174288959 :                 debug_assert!(permit.is_none());
     662    174288959 :                 if is_first_iteration {
     663    174288121 :                     hit.inc();
     664    174288121 :                 }
     665    174288959 :                 return Ok(ReadBufResult::Found(read_guard));
     666     10014813 :             }
     667     10014813 :             debug_assert!(permit.is_some());
     668     10014813 :             is_first_iteration = false;
     669              : 
     670              :             // Not found. Find a victim buffer
     671     10014813 :             let (slot_idx, mut inner) = self
     672     10014813 :                 .find_victim(permit.as_ref().unwrap())
     673            0 :                 .await
     674     10014812 :                 .context("Failed to find evict victim")?;
     675              : 
     676              :             // Insert mapping for this. At this point, we may find that another
     677              :             // thread did the same thing concurrently. In that case, we evicted
     678              :             // our victim buffer unnecessarily. Put it into the free list and
     679              :             // continue with the slot that the other thread chose.
     680     10014812 :             if let Some(_existing_slot_idx) = self.try_insert_mapping(cache_key, slot_idx) {
     681              :                 // TODO: put to free list
     682              : 
     683              :                 // We now just loop back to start from beginning. This is not
     684              :                 // optimal, we'll perform the lookup in the mapping again, which
     685              :                 // is not really necessary because we already got
     686              :                 // 'existing_slot_idx'.  But this shouldn't happen often enough
     687              :                 // to matter much.
     688          838 :                 continue;
     689     10013974 :             }
     690     10013974 : 
     691     10013974 :             // Make the slot ready
     692     10013974 :             let slot = &self.slots[slot_idx];
     693     10013974 :             inner.key = Some(cache_key.clone());
     694     10013974 :             slot.set_usage_count(1);
     695              : 
     696              :             debug_assert!(
     697              :                 {
     698     10013974 :                     let guard = inner.permit.lock().unwrap();
     699     10013974 :                     guard.upgrade().is_none()
     700              :                 },
     701            0 :                 "we hold a write lock, so, no one else should have a permit"
     702              :             );
     703              : 
     704     10013974 :             return Ok(ReadBufResult::NotFound(PageWriteGuard {
     705     10013974 :                 state: PageWriteGuardState::Invalid {
     706     10013974 :                     _permit: permit.take().unwrap(),
     707     10013974 :                     inner,
     708     10013974 :                 },
     709     10013974 :             }));
     710              :         }
     711    184302933 :     }
     712              : 
     713              :     //
     714              :     // Section 3: Mapping functions
     715              :     //
     716              : 
     717              :     /// Search for a page in the cache using the given search key.
     718              :     ///
      719              : /// Returns the slot index, if any. If the search criteria are not exact,
     720              :     /// *cache_key is updated with the actual key of the found page.
     721              :     ///
     722              :     /// NOTE: We don't hold any lock on the mapping on return, so the slot might
     723              :     /// get recycled for an unrelated page immediately after this function
     724              :     /// returns.  The caller is responsible for re-checking that the slot still
     725              :     /// contains the page with the same key before using it.
     726              :     ///
     727    192929010 :     fn search_mapping(&self, cache_key: &mut CacheKey) -> Option<usize> {
     728    192929010 :         match cache_key {
     729      8625194 :             CacheKey::MaterializedPage { hash_key, lsn } => {
     730      8625194 :                 let map = self.materialized_page_map.read().unwrap();
     731      8625194 :                 let versions = map.get(hash_key)?;
     732              : 
     733      2214950 :                 let version_idx = match versions.binary_search_by_key(lsn, |v| v.lsn) {
     734           17 :                     Ok(version_idx) => version_idx,
     735          460 :                     Err(0) => return None,
     736      1776730 :                     Err(version_idx) => version_idx - 1,
     737              :                 };
     738      1776747 :                 let version = &versions[version_idx];
     739      1776747 :                 *lsn = version.lsn;
     740      1776747 :                 Some(version.slot_idx)
     741              :             }
     742    184303816 :             CacheKey::ImmutableFilePage { file_id, blkno } => {
     743    184303816 :                 let map = self.immutable_page_map.read().unwrap();
     744    184303816 :                 Some(*map.get(&(*file_id, *blkno))?)
     745              :             }
     746              :         }
     747    192929010 :     }
     748              : 
     749              :     /// Search for a page in the cache using the given search key.
     750              :     ///
      751              : /// Like 'search_mapping', but performs an "exact" search. Used for
     752              :     /// allocating a new buffer.
     753      2857276 :     fn search_mapping_exact(&self, key: &CacheKey) -> Option<usize> {
     754      2857276 :         match key {
     755      2857276 :             CacheKey::MaterializedPage { hash_key, lsn } => {
     756      2857276 :                 let map = self.materialized_page_map.read().unwrap();
     757      2857276 :                 let versions = map.get(hash_key)?;
     758              : 
     759       678017 :                 if let Ok(version_idx) = versions.binary_search_by_key(lsn, |v| v.lsn) {
     760          155 :                     Some(versions[version_idx].slot_idx)
     761              :                 } else {
     762       438637 :                     None
     763              :                 }
     764              :             }
     765            0 :             CacheKey::ImmutableFilePage { file_id, blkno } => {
     766            0 :                 let map = self.immutable_page_map.read().unwrap();
     767            0 :                 Some(*map.get(&(*file_id, *blkno))?)
     768              :             }
     769              :         }
     770      2857276 :     }
     771              : 
     772              :     ///
     773              :     /// Remove mapping for given key.
     774              :     ///
     775     10020320 :     fn remove_mapping(&self, old_key: &CacheKey) {
     776     10020320 :         match old_key {
     777              :             CacheKey::MaterializedPage {
     778      2674486 :                 hash_key: old_hash_key,
     779      2674486 :                 lsn: old_lsn,
     780      2674486 :             } => {
     781      2674486 :                 let mut map = self.materialized_page_map.write().unwrap();
     782      2674486 :                 if let Entry::Occupied(mut old_entry) = map.entry(old_hash_key.clone()) {
     783      2674486 :                     let versions = old_entry.get_mut();
     784              : 
     785      3216787 :                     if let Ok(version_idx) = versions.binary_search_by_key(old_lsn, |v| v.lsn) {
     786      2674486 :                         versions.remove(version_idx);
     787      2674486 :                         self.size_metrics
     788      2674486 :                             .current_bytes_materialized_page
     789      2674486 :                             .sub_page_sz(1);
     790      2674486 :                         if versions.is_empty() {
     791      2258724 :                             old_entry.remove_entry();
     792      2258724 :                         }
     793            0 :                     }
     794              :                 } else {
     795            0 :                     panic!("could not find old key in mapping")
     796              :                 }
     797              :             }
     798      7345834 :             CacheKey::ImmutableFilePage { file_id, blkno } => {
     799      7345834 :                 let mut map = self.immutable_page_map.write().unwrap();
     800      7345834 :                 map.remove(&(*file_id, *blkno))
     801      7345834 :                     .expect("could not find old key in mapping");
     802      7345834 :                 self.size_metrics.current_bytes_immutable.sub_page_sz(1);
     803      7345834 :             }
     804              :         }
     805     10020320 :     }
     806              : 
     807              :     ///
     808              :     /// Insert mapping for given key.
     809              :     ///
     810              :     /// If a mapping already existed for the given key, returns the slot index
     811              :     /// of the existing mapping and leaves it untouched.
     812     12871939 :     fn try_insert_mapping(&self, new_key: &CacheKey, slot_idx: usize) -> Option<usize> {
     813     12871939 :         match new_key {
     814              :             CacheKey::MaterializedPage {
     815      2857121 :                 hash_key: new_key,
     816      2857121 :                 lsn: new_lsn,
     817      2857121 :             } => {
     818      2857121 :                 let mut map = self.materialized_page_map.write().unwrap();
     819      2857121 :                 let versions = map.entry(new_key.clone()).or_default();
     820      2857121 :                 match versions.binary_search_by_key(new_lsn, |v| v.lsn) {
     821            8 :                     Ok(version_idx) => Some(versions[version_idx].slot_idx),
     822      2857113 :                     Err(version_idx) => {
     823      2857113 :                         versions.insert(
     824      2857113 :                             version_idx,
     825      2857113 :                             Version {
     826      2857113 :                                 lsn: *new_lsn,
     827      2857113 :                                 slot_idx,
     828      2857113 :                             },
     829      2857113 :                         );
     830      2857113 :                         self.size_metrics
     831      2857113 :                             .current_bytes_materialized_page
     832      2857113 :                             .add_page_sz(1);
     833      2857113 :                         None
     834              :                     }
     835              :                 }
     836              :             }
     837              : 
     838     10014818 :             CacheKey::ImmutableFilePage { file_id, blkno } => {
     839     10014818 :                 let mut map = self.immutable_page_map.write().unwrap();
     840     10014818 :                 match map.entry((*file_id, *blkno)) {
     841          838 :                     Entry::Occupied(entry) => Some(*entry.get()),
     842     10013980 :                     Entry::Vacant(entry) => {
     843     10013980 :                         entry.insert(slot_idx);
     844     10013980 :                         self.size_metrics.current_bytes_immutable.add_page_sz(1);
     845     10013980 :                         None
     846              :                     }
     847              :                 }
     848              :             }
     849              :         }
     850     12871939 :     }
     851              : 
     852              :     //
     853              :     // Section 4: Misc internal helpers
     854              :     //
     855              : 
     856              :     /// Find a slot to evict.
     857              :     ///
     858              :     /// On return, the slot is empty and write-locked.
     859     12871939 :     async fn find_victim(
     860     12871939 :         &self,
     861     12871939 :         _permit_witness: &PinnedSlotsPermit,
     862     12871939 :     ) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
     863     12871934 :         let iter_limit = self.slots.len() * 10;
     864     12871934 :         let mut iters = 0;
     865     41696889 :         loop {
     866     41696889 :             iters += 1;
     867     41696889 :             let slot_idx = self.next_evict_slot.fetch_add(1, Ordering::Relaxed) % self.slots.len();
     868     41696889 : 
     869     41696889 :             let slot = &self.slots[slot_idx];
     870     41696889 : 
     871     41696889 :             if slot.dec_usage_count() == 0 {
     872     12871934 :                 let mut inner = match slot.inner.try_write() {
     873     12871933 :                     Ok(inner) => inner,
     874            1 :                     Err(_err) => {
     875            1 :                         if iters > iter_limit {
     876              :                             // NB: Even with the permits, there's no hard guarantee that we will find a slot with
     877              :                             // any particular number of iterations: other threads might race ahead and acquire and
     878              :                             // release pins just as we're scanning the array.
     879              :                             //
     880              :                             // Imagine that nslots is 2, and as starting point, usage_count==1 on all
     881              :                             // slots. There are two threads running concurrently, A and B. A has just
     882              :                             // acquired the permit from the semaphore.
     883              :                             //
     884              :                             //   A: Look at slot 1. Its usage_count == 1, so decrement it to zero, and continue the search
     885              :                             //   B: Acquire permit.
     886              :                             //   B: Look at slot 2, decrement its usage_count to zero and continue the search
     887              :                             //   B: Look at slot 1. Its usage_count is zero, so pin it and bump up its usage_count to 1.
     888              :                             //   B: Release pin and permit again
     889              :                             //   B: Acquire permit.
     890              :                             //   B: Look at slot 2. Its usage_count is zero, so pin it and bump up its usage_count to 1.
     891              :                             //   B: Release pin and permit again
     892              :                             //
     893              :                             // Now we're back in the starting situation that both slots have
     894              :                             // usage_count 1, but A has now been through one iteration of the
     895              :                             // find_victim() loop. This can repeat indefinitely and on each
     896              :                             // iteration, A's iteration count increases by one.
     897              :                             //
     898              :                             // So, even though the semaphore for the permits is fair, the victim search
     899              :                             // itself happens in parallel and is not fair.
     900              :                             // Hence even with a permit, a task can theoretically be starved.
     901              :                             // To avoid this, we'd need tokio to give priority to tasks that are holding
     902              :                             // permits for longer.
     903              :                             // Note that just yielding to tokio during iteration without such
     904              :                             // priority boosting is likely counter-productive. We'd just give more opportunities
     905              :                             // for B to bump usage count, further starving A.
     906            0 :                             page_cache_eviction_metrics::observe(
     907            0 :                                 page_cache_eviction_metrics::Outcome::ItersExceeded {
     908            0 :                                     iters: iters.try_into().unwrap(),
     909            0 :                                 },
     910            0 :                             );
     911            0 :                             anyhow::bail!("exceeded evict iter limit");
     912            0 :                         }
     913            0 :                         continue;
     914              :                     }
     915              :                 };
     916     12871933 :                 if let Some(old_key) = &inner.key {
     917     10020315 :                     // remove mapping for old buffer
     918     10020315 :                     self.remove_mapping(old_key);
     919     10020315 :                     inner.key = None;
     920     10020315 :                     page_cache_eviction_metrics::observe(
     921     10020315 :                         page_cache_eviction_metrics::Outcome::FoundSlotEvicted {
     922     10020315 :                             iters: iters.try_into().unwrap(),
     923     10020315 :                         },
     924     10020315 :                     );
     925     10020315 :                 } else {
     926      2851618 :                     page_cache_eviction_metrics::observe(
     927      2851618 :                         page_cache_eviction_metrics::Outcome::FoundSlotUnused {
     928      2851618 :                             iters: iters.try_into().unwrap(),
     929      2851618 :                         },
     930      2851618 :                     );
     931      2851618 :                 }
     932     12871933 :                 return Ok((slot_idx, inner));
     933     28824955 :             }
     934              :         }
     935     12871933 :     }
     936              : 
     937              :     /// Initialize a new page cache
     938              :     ///
     939              :     /// This should be called only once at page server startup.
     940          678 :     fn new(num_pages: usize) -> Self {
     941          678 :         assert!(num_pages > 0, "page cache size must be > 0");
     942              : 
     943              :         // We could use Vec::leak here, but that potentially also leaks
     944              :         // uninitialized reserved capacity. With into_boxed_slice and Box::leak
     945              :         // this is avoided.
     946          678 :         let page_buffer = Box::leak(vec![0u8; num_pages * PAGE_SZ].into_boxed_slice());
     947          678 : 
     948          678 :         let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;
     949          678 :         size_metrics.max_bytes.set_page_sz(num_pages);
     950          678 :         size_metrics.current_bytes_immutable.set_page_sz(0);
     951          678 :         size_metrics.current_bytes_materialized_page.set_page_sz(0);
     952          678 : 
     953          678 :         let slots = page_buffer
     954          678 :             .chunks_exact_mut(PAGE_SZ)
     955      4928424 :             .map(|chunk| {
     956      4928424 :                 let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap();
     957      4928424 : 
     958      4928424 :                 Slot {
     959      4928424 :                     inner: tokio::sync::RwLock::new(SlotInner {
     960      4928424 :                         key: None,
     961      4928424 :                         buf,
     962      4928424 :                         permit: std::sync::Mutex::new(Weak::new()),
     963      4928424 :                     }),
     964      4928424 :                     usage_count: AtomicU8::new(0),
     965      4928424 :                 }
     966      4928424 :             })
     967          678 :             .collect();
     968          678 : 
     969          678 :         Self {
     970          678 :             materialized_page_map: Default::default(),
     971          678 :             immutable_page_map: Default::default(),
     972          678 :             slots,
     973          678 :             next_evict_slot: AtomicUsize::new(0),
     974          678 :             size_metrics,
     975          678 :             pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)),
     976          678 :         }
     977          678 :     }
     978              : }
     979              : 
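                      : /// Helper for the page-cache size metrics: converts a count of pages into a
                      : /// byte value (`count * PAGE_SZ`) before updating the underlying gauges.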
     980              : trait PageSzBytesMetric {
     981              :     fn set_page_sz(&self, count: usize);
     982              :     fn add_page_sz(&self, count: usize);
     983              :     fn sub_page_sz(&self, count: usize);
     984              : }
     985              : 
     986              : #[inline(always)]
     987     22893429 : fn count_times_page_sz(count: usize) -> u64 {
     988     22893429 :     u64::try_from(count).unwrap() * u64::try_from(PAGE_SZ).unwrap()
     989     22893429 : }
     990              : 
     991              : impl PageSzBytesMetric for metrics::UIntGauge {
     992         2034 :     fn set_page_sz(&self, count: usize) {
     993         2034 :         self.set(count_times_page_sz(count));
     994         2034 :     }
     995     12871083 :     fn add_page_sz(&self, count: usize) {
     996     12871083 :         self.add(count_times_page_sz(count));
     997     12871083 :     }
     998     10020312 :     fn sub_page_sz(&self, count: usize) {
     999     10020312 :         self.sub(count_times_page_sz(count));
    1000     10020312 :     }
    1001              : }
        

Generated by: LCOV version 2.1-beta