LCOV - code coverage report
Current view: top level - pageserver/src - page_cache.rs (source / functions) Coverage Total Hit
Test: 1b0a6a0c05cee5a7de360813c8034804e105ce1c.info Lines: 86.8 % 296 257
Test Date: 2025-03-12 00:01:28 Functions: 94.3 % 35 33

            Line data    Source code
       1              : //!
       2              : //! Global page cache
       3              : //!
       4              : //! The page cache uses up most of the memory in the page server. It is shared
       5              : //! by all tenants, and it is used to store different kinds of pages. Sharing
       6              : //! the cache allows memory to be dynamically allocated where it's needed the
       7              : //! most.
       8              : //!
       9              : //! The page cache consists of fixed-size buffers, 8 kB each to match the
      10              : //! PostgreSQL buffer size, and a Slot struct for each buffer to contain
      11              : //! information about what's stored in the buffer.
      12              : //!
      13              : //! # Types Of Pages
      14              : //!
      15              : //! [`PageCache`] only supports immutable pages.
      16              : //! Hence there is no need to worry about coherency.
      17              : //!
      18              : //! One type of page is supported:
      19              : //!
      20              : //! * **Immutable File pages**, filled & used by [`crate::tenant::block_io`] and [`crate::tenant::ephemeral_file`].
      21              : //!
      22              : //! Note that [`crate::tenant::ephemeral_file::EphemeralFile`] is generally mutable, but, it's append-only.
      23              : //! It uses the page cache only for the blocks that are already fully written and immutable.
      24              : //!
      25              : //! # Filling The Page Cache
      26              : //!
      27              : //! Page cache maps from a cache key to a buffer slot.
      28              : //! The cache key uniquely identifies the piece of data that is being cached.
      29              : //!
      30              : //! The cache key for **immutable file** pages is [`FileId`] and a block number.
      31              : //! Users of page cache that wish to page-cache an arbitrary (immutable!) on-disk file do the following:
      32              : //! * Have a mechanism to deterministically associate the on-disk file with a [`FileId`].
      33              : //! * Get a [`FileId`] using [`next_file_id`].
      34              : //! * Use the mechanism to associate the on-disk file with the returned [`FileId`].
      35              : //! * Use [`PageCache::read_immutable_buf`] to get a [`ReadBufResult`].
      36              : //! * If the page was already cached, it'll be the [`ReadBufResult::Found`] variant that contains
      37              : //!   a read guard for the page. Just use it.
      38              : //! * If the page was not cached, it'll be the [`ReadBufResult::NotFound`] variant that contains
      39              : //!   a write guard for the page. Fill the page with the contents of the on-disk file.
      40              : //!   Then call [`PageWriteGuard::mark_valid`] to mark the page as valid.
      41              : //!   It downgrades the write guard to a [`PageReadGuard`] on the same slot,
      42              : //!   so there is no need to call [`PageCache::read_immutable_buf`] again,
      43              : //!   even when there's high cache pressure.
      44              : //!
      45              : //! # Locking
      46              : //!
      47              : //! There are two levels of locking involved: There's one lock for the "mapping"
      48              : //! from the cache key ([`FileId`] and block number) to the buffer
      49              : //! slot, and a separate lock on each slot. To read or write the contents of a
      50              : //! slot, you must hold the lock on the slot in read or write mode,
      51              : //! respectively. To change the mapping of a slot, i.e. to evict a page or to
      52              : //! assign a buffer for a page, you must hold the mapping lock and the lock on
      53              : //! the slot at the same time.
      54              : //!
      55              : //! Whenever you need to hold both locks simultaneously, the slot lock must be
      56              : //! acquired first. This consistent ordering avoids deadlocks. To look up a page
      57              : //! in the cache, you would first look up the mapping, while holding the mapping
      58              : //! lock, and then lock the slot. You must release the mapping lock in between,
      59              : //! to obey the lock ordering and avoid deadlock.
      60              : //!
      61              : //! A slot can momentarily have invalid contents, even if it's already been
      62              : //! inserted to the mapping, but you must hold the write-lock on the slot until
      63              : //! the contents are valid. If you need to release the lock without initializing
      64              : //! the contents, you must remove the mapping first. We make that easy for the
      65              : //! callers with PageWriteGuard: the caller must explicitly call guard.mark_valid() after it has
      66              : //! initialized it. If the guard is dropped without calling mark_valid(), the
      67              : //! mapping is automatically removed and the slot is marked free.
      68              : //!
      69              : 
      70              : use std::collections::HashMap;
      71              : use std::collections::hash_map::Entry;
      72              : use std::sync::atomic::{AtomicU8, AtomicU64, AtomicUsize, Ordering};
      73              : use std::sync::{Arc, Weak};
      74              : use std::time::Duration;
      75              : 
      76              : use anyhow::Context;
      77              : use once_cell::sync::OnceCell;
      78              : 
      79              : use crate::context::RequestContext;
      80              : use crate::metrics::{PageCacheSizeMetrics, page_cache_eviction_metrics};
      81              : use crate::virtual_file::{IoBufferMut, IoPageSlice};
      82              : 
      83              : static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
      84              : const TEST_PAGE_CACHE_SIZE: usize = 50;
      85              : 
      86              : ///
      87              : /// Initialize the page cache. This must be called once at page server startup.
      88              : ///
      89            0 : pub fn init(size: usize) {
      90            0 :     if PAGE_CACHE.set(PageCache::new(size)).is_err() {
      91            0 :         panic!("page cache already initialized");
      92            0 :     }
      93            0 : }
      94              : 
      95              : ///
      96              : /// Get a handle to the page cache.
      97              : ///
      98       974273 : pub fn get() -> &'static PageCache {
      99       974273 :     //
     100       974273 :     // In unit tests, page server startup doesn't happen and no one calls
     101       974273 :     // page_cache::init(). Initialize it here with a tiny cache, so that the
     102       974273 :     // page cache is usable in unit tests.
     103       974273 :     //
     104       974273 :     if cfg!(test) {
     105       974273 :         PAGE_CACHE.get_or_init(|| PageCache::new(TEST_PAGE_CACHE_SIZE))
     106              :     } else {
     107            0 :         PAGE_CACHE.get().expect("page cache not initialized")
     108              :     }
     109       974273 : }
     110              : 
     111              : pub const PAGE_SZ: usize = postgres_ffi::BLCKSZ as usize;
     112              : const MAX_USAGE_COUNT: u8 = 5;
     113              : 
     114              : /// See module-level comment.
     115              : #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
     116              : pub struct FileId(u64);
     117              : 
     118              : static NEXT_ID: AtomicU64 = AtomicU64::new(1);
     119              : 
     120              : /// See module-level comment.
     121         5096 : pub fn next_file_id() -> FileId {
     122         5096 :     FileId(NEXT_ID.fetch_add(1, Ordering::Relaxed))
     123         5096 : }
     124              : 
     125              : ///
     126              : /// CacheKey uniquely identifies a "thing" to cache in the page cache.
     127              : ///
     128              : #[derive(Debug, PartialEq, Eq, Clone)]
     129              : #[allow(clippy::enum_variant_names)]
     130              : enum CacheKey {
     131              :     ImmutableFilePage { file_id: FileId, blkno: u32 },
     132              : }
     133              : 
     134              : struct Slot {
     135              :     inner: tokio::sync::RwLock<SlotInner>,
     136              :     usage_count: AtomicU8,
     137              : }
     138              : 
     139              : struct SlotInner {
     140              :     key: Option<CacheKey>,
     141              :     // for `coalesce_readers_permit`
     142              :     permit: std::sync::Mutex<Weak<PinnedSlotsPermit>>,
     143              :     buf: IoPageSlice<'static>,
     144              : }
     145              : 
     146              : impl Slot {
     147              :     /// Increment usage count on the buffer, with ceiling at MAX_USAGE_COUNT.
     148       910805 :     fn inc_usage_count(&self) {
     149       910805 :         let _ = self
     150       910805 :             .usage_count
     151       910805 :             .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
     152       910805 :                 if val == MAX_USAGE_COUNT {
     153       863307 :                     None
     154              :                 } else {
     155        47498 :                     Some(val + 1)
     156              :                 }
     157       910805 :             });
     158       910805 :     }
     159              : 
     160              :     /// Decrement usage count on the buffer, unless it's already zero.  Returns
     161              :     /// the old usage count.
     162       169657 :     fn dec_usage_count(&self) -> u8 {
     163       169657 :         let count_res =
     164       169657 :             self.usage_count
     165       169657 :                 .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
     166       169657 :                     if val == 0 { None } else { Some(val - 1) }
     167       169657 :                 });
     168       169657 : 
     169       169657 :         match count_res {
     170       106189 :             Ok(usage_count) => usage_count,
     171        63468 :             Err(usage_count) => usage_count,
     172              :         }
     173       169657 :     }
     174              : 
     175              :     /// Sets the usage count to a specific value.
     176        63468 :     fn set_usage_count(&self, count: u8) {
     177        63468 :         self.usage_count.store(count, Ordering::Relaxed);
     178        63468 :     }
     179              : }
     180              : 
     181              : impl SlotInner {
     182              :     /// If there is already a reader, drop our permit and share its permit, just like we share read access.
     183       910805 :     fn coalesce_readers_permit(&self, permit: PinnedSlotsPermit) -> Arc<PinnedSlotsPermit> {
     184       910805 :         let mut guard = self.permit.lock().unwrap();
     185       910805 :         if let Some(existing_permit) = guard.upgrade() {
     186            0 :             drop(guard);
     187            0 :             drop(permit);
     188            0 :             existing_permit
     189              :         } else {
     190       910805 :             let permit = Arc::new(permit);
     191       910805 :             *guard = Arc::downgrade(&permit);
     192       910805 :             permit
     193              :         }
     194       910805 :     }
     195              : }
     196              : 
     197              : pub struct PageCache {
     198              :     immutable_page_map: std::sync::RwLock<HashMap<(FileId, u32), usize>>,
     199              : 
     200              :     /// The actual buffers with their metadata.
     201              :     slots: Box<[Slot]>,
     202              : 
     203              :     pinned_slots: Arc<tokio::sync::Semaphore>,
     204              : 
     205              :     /// Index of the next candidate to evict, for the Clock replacement algorithm.
     206              :     /// This is interpreted modulo the page cache size.
     207              :     next_evict_slot: AtomicUsize,
     208              : 
     209              :     size_metrics: &'static PageCacheSizeMetrics,
     210              : }
     211              : 
     212              : struct PinnedSlotsPermit {
     213              :     _permit: tokio::sync::OwnedSemaphorePermit,
     214              : }
     215              : 
     216              : ///
     217              : /// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked
     218              : /// until the guard is dropped.
     219              : ///
     220              : pub struct PageReadGuard<'i> {
     221              :     _permit: Arc<PinnedSlotsPermit>,
     222              :     slot_guard: tokio::sync::RwLockReadGuard<'i, SlotInner>,
     223              : }
     224              : 
     225              : impl std::ops::Deref for PageReadGuard<'_> {
     226              :     type Target = [u8; PAGE_SZ];
     227              : 
     228       974433 :     fn deref(&self) -> &Self::Target {
     229       974433 :         self.slot_guard.buf.deref()
     230       974433 :     }
     231              : }
     232              : 
     233              : impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
     234            0 :     fn as_ref(&self) -> &[u8; PAGE_SZ] {
     235            0 :         self.slot_guard.buf.as_ref()
     236            0 :     }
     237              : }
     238              : 
     239              : ///
     240              : /// PageWriteGuard is a lease on a buffer for modifying it. The page is kept locked
     241              : /// until the guard is dropped.
     242              : ///
     243              : /// Counterintuitively, this is used even for a read, if the requested page is not
     244              : /// currently found in the page cache. In that case, the caller of lock_for_read()
     245              : /// is expected to fill in the page contents and call mark_valid().
     246              : pub struct PageWriteGuard<'i> {
     247              :     state: PageWriteGuardState<'i>,
     248              : }
     249              : 
     250              : enum PageWriteGuardState<'i> {
     251              :     Invalid {
     252              :         inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
     253              :         _permit: PinnedSlotsPermit,
     254              :     },
     255              :     Downgraded,
     256              : }
     257              : 
     258              : impl std::ops::DerefMut for PageWriteGuard<'_> {
     259        95213 :     fn deref_mut(&mut self) -> &mut Self::Target {
     260        95213 :         match &mut self.state {
     261        95213 :             PageWriteGuardState::Invalid { inner, _permit } => inner.buf.deref_mut(),
     262            0 :             PageWriteGuardState::Downgraded => unreachable!(),
     263              :         }
     264        95213 :     }
     265              : }
     266              : 
     267              : impl std::ops::Deref for PageWriteGuard<'_> {
     268              :     type Target = [u8; PAGE_SZ];
     269              : 
     270      1047277 :     fn deref(&self) -> &Self::Target {
     271      1047277 :         match &self.state {
     272      1047277 :             PageWriteGuardState::Invalid { inner, _permit } => inner.buf.deref(),
     273            0 :             PageWriteGuardState::Downgraded => unreachable!(),
     274              :         }
     275      1047277 :     }
     276              : }
     277              : 
     278              : impl<'a> PageWriteGuard<'a> {
     279              :     /// Mark that the buffer contents are now valid.
     280              :     #[must_use]
     281        63468 :     pub fn mark_valid(mut self) -> PageReadGuard<'a> {
     282        63468 :         let prev = std::mem::replace(&mut self.state, PageWriteGuardState::Downgraded);
     283        63468 :         match prev {
     284        63468 :             PageWriteGuardState::Invalid { inner, _permit } => {
     285        63468 :                 assert!(inner.key.is_some());
     286        63468 :                 PageReadGuard {
     287        63468 :                     _permit: Arc::new(_permit),
     288        63468 :                     slot_guard: inner.downgrade(),
     289        63468 :                 }
     290              :             }
     291            0 :             PageWriteGuardState::Downgraded => unreachable!(),
     292              :         }
     293        63468 :     }
     294              : }
     295              : 
     296              : impl Drop for PageWriteGuard<'_> {
     297              :     ///
     298              :     /// If the buffer was allocated for a page that was not already in the
     299              :     /// cache, but the lock_for_read/write() caller dropped the buffer without
     300              :     /// initializing it, remove the mapping from the page cache.
     301              :     ///
     302        63468 :     fn drop(&mut self) {
     303        63468 :         match &mut self.state {
     304            0 :             PageWriteGuardState::Invalid { inner, _permit } => {
     305            0 :                 assert!(inner.key.is_some());
     306            0 :                 let self_key = inner.key.as_ref().unwrap();
     307            0 :                 PAGE_CACHE.get().unwrap().remove_mapping(self_key);
     308            0 :                 inner.key = None;
     309              :             }
     310        63468 :             PageWriteGuardState::Downgraded => {}
     311              :         }
     312        63468 :     }
     313              : }
     314              : 
     315              : /// lock_for_read() return value
     316              : pub enum ReadBufResult<'a> {
     317              :     Found(PageReadGuard<'a>),
     318              :     NotFound(PageWriteGuard<'a>),
     319              : }
     320              : 
     321              : impl PageCache {
     322       974273 :     pub async fn read_immutable_buf(
     323       974273 :         &self,
     324       974273 :         file_id: FileId,
     325       974273 :         blkno: u32,
     326       974273 :         ctx: &RequestContext,
     327       974273 :     ) -> anyhow::Result<ReadBufResult> {
     328       974273 :         self.lock_for_read(&(CacheKey::ImmutableFilePage { file_id, blkno }), ctx)
     329       974273 :             .await
     330       974273 :     }
     331              : 
     332              :     //
     333              :     // Section 2: Internal interface functions for lookup/update.
     334              :     //
     335              :     // To add support for a new kind of "thing" to cache, you will need
     336              :     // to add public interface routines above, and code to deal with the
     337              :     // "mappings" after this section. But the routines in this section should
     338              :     // not require changes.
     339              : 
     340       974273 :     async fn try_get_pinned_slot_permit(&self) -> anyhow::Result<PinnedSlotsPermit> {
     341       974273 :         match tokio::time::timeout(
     342       974273 :             // Choose small timeout, neon_smgr does its own retries.
     343       974273 :             // https://neondb.slack.com/archives/C04DGM6SMTM/p1694786876476869
     344       974273 :             Duration::from_secs(10),
     345       974273 :             Arc::clone(&self.pinned_slots).acquire_owned(),
     346       974273 :         )
     347       974273 :         .await
     348              :         {
     349       974273 :             Ok(res) => Ok(PinnedSlotsPermit {
     350       974273 :                 _permit: res.expect("this semaphore is never closed"),
     351       974273 :             }),
     352            0 :             Err(_timeout) => {
     353            0 :                 crate::metrics::page_cache_errors_inc(
     354            0 :                     crate::metrics::PageCacheErrorKind::AcquirePinnedSlotTimeout,
     355            0 :                 );
     356            0 :                 anyhow::bail!("timeout: there were page guards alive for all page cache slots")
     357              :             }
     358              :         }
     359       974273 :     }
     360              : 
     361              :     /// Look up a page in the cache.
     362              :     ///
     363       974273 :     async fn try_lock_for_read(
     364       974273 :         &self,
     365       974273 :         cache_key: &CacheKey,
     366       974273 :         permit: &mut Option<PinnedSlotsPermit>,
     367       974273 :     ) -> Option<PageReadGuard> {
     368       974273 :         if let Some(slot_idx) = self.search_mapping(cache_key) {
     369              :             // The page was found in the mapping. Lock the slot, and re-check
     370              :             // that it's still what we expected (because we released the mapping
     371              :             // lock already, another thread could have evicted the page)
     372       910805 :             let slot = &self.slots[slot_idx];
     373       910805 :             let inner = slot.inner.read().await;
     374       910805 :             if inner.key.as_ref() == Some(cache_key) {
     375       910805 :                 slot.inc_usage_count();
     376       910805 :                 return Some(PageReadGuard {
     377       910805 :                     _permit: inner.coalesce_readers_permit(permit.take().unwrap()),
     378       910805 :                     slot_guard: inner,
     379       910805 :                 });
     380            0 :             }
     381        63468 :         }
     382        63468 :         None
     383       974273 :     }
     384              : 
     385              :     /// Return a locked buffer for given block.
     386              :     ///
     387              :     /// Like try_lock_for_read(), the lookup is an exact match on the key;
     388              :     /// `cache_key` is taken by shared reference and is never modified.
     389              :     ///
     390              :     /// If the page is not found in the cache, this allocates a new buffer for
     391              :     /// it. The caller may then initialize the buffer with the contents, and
     392              :     /// call mark_valid().
     393              :     ///
     394              :     /// Example usage:
     395              :     ///
     396              :     /// ```ignore
     397              :     /// let cache = page_cache::get();
     398              :     ///
     399              :     /// match cache.lock_for_read(&key) {
     400              :     ///     ReadBufResult::Found(read_guard) => {
     401              :     ///         // The page was found in cache. Use it
     402              :     ///     },
     403              :     ///     ReadBufResult::NotFound(write_guard) => {
     404              :     ///         // The page was not found in cache. Read it from disk into the
     405              :     ///         // buffer.
     406              :     ///         //read_my_page_from_disk(write_guard);
     407              :     ///
     408              :     ///         // The buffer contents are now valid. Tell the page cache.
     409              :     ///         write_guard.mark_valid();
     410              :     ///     },
     411              :     /// }
     412              :     /// ```
     413              :     ///
     414       974273 :     async fn lock_for_read(
     415       974273 :         &self,
     416       974273 :         cache_key: &CacheKey,
     417       974273 :         ctx: &RequestContext,
     418       974273 :     ) -> anyhow::Result<ReadBufResult> {
     419       974273 :         let mut permit = Some(self.try_get_pinned_slot_permit().await?);
     420              : 
     421       974273 :         let (read_access, hit) = match cache_key {
     422       974273 :             CacheKey::ImmutableFilePage { .. } => (
     423       974273 :                 &crate::metrics::PAGE_CACHE
     424       974273 :                     .for_ctx(ctx)
     425       974273 :                     .read_accesses_immutable,
     426       974273 :                 &crate::metrics::PAGE_CACHE.for_ctx(ctx).read_hits_immutable,
     427       974273 :             ),
     428       974273 :         };
     429       974273 :         read_access.inc();
     430       974273 : 
     431       974273 :         let mut is_first_iteration = true;
     432              :         loop {
     433              :             // First check if the key already exists in the cache.
     434       974273 :             if let Some(read_guard) = self.try_lock_for_read(cache_key, &mut permit).await {
     435       910805 :                 debug_assert!(permit.is_none());
     436       910805 :                 if is_first_iteration {
     437       910805 :                     hit.inc();
     438       910805 :                 }
     439       910805 :                 return Ok(ReadBufResult::Found(read_guard));
     440        63468 :             }
     441        63468 :             debug_assert!(permit.is_some());
     442        63468 :             is_first_iteration = false;
     443              : 
     444              :             // Not found. Find a victim buffer
     445        63468 :             let (slot_idx, mut inner) = self
     446        63468 :                 .find_victim(permit.as_ref().unwrap())
     447        63468 :                 .await
     448        63468 :                 .context("Failed to find evict victim")?;
     449              : 
     450              :             // Insert mapping for this. At this point, we may find that another
     451              :             // thread did the same thing concurrently. In that case, we evicted
     452              :             // our victim buffer unnecessarily. Put it into the free list and
     453              :             // continue with the slot that the other thread chose.
     454        63468 :             if let Some(_existing_slot_idx) = self.try_insert_mapping(cache_key, slot_idx) {
     455              :                 // TODO: put to free list
     456              : 
     457              :                 // We now just loop back to start from beginning. This is not
     458              :                 // optimal, we'll perform the lookup in the mapping again, which
     459              :                 // is not really necessary because we already got
     460              :                 // 'existing_slot_idx'.  But this shouldn't happen often enough
     461              :                 // to matter much.
     462            0 :                 continue;
     463        63468 :             }
     464        63468 : 
     465        63468 :             // Make the slot ready
     466        63468 :             let slot = &self.slots[slot_idx];
     467        63468 :             inner.key = Some(cache_key.clone());
     468        63468 :             slot.set_usage_count(1);
     469        63468 : 
     470        63468 :             debug_assert!(
     471              :                 {
     472        63468 :                     let guard = inner.permit.lock().unwrap();
     473        63468 :                     guard.upgrade().is_none()
     474              :                 },
     475            0 :                 "we hold a write lock, so, no one else should have a permit"
     476              :             );
     477              : 
     478        63468 :             return Ok(ReadBufResult::NotFound(PageWriteGuard {
     479        63468 :                 state: PageWriteGuardState::Invalid {
     480        63468 :                     _permit: permit.take().unwrap(),
     481        63468 :                     inner,
     482        63468 :                 },
     483        63468 :             }));
     484              :         }
     485       974273 :     }
     486              : 
     487              :     //
     488              :     // Section 3: Mapping functions
     489              :     //
     490              : 
     491              :     /// Search for a page in the cache using the given search key.
     492              :     ///
     493              :     /// Returns the slot index, if any.
     494              :     ///
     495              :     /// NOTE: We don't hold any lock on the mapping on return, so the slot might
     496              :     /// get recycled for an unrelated page immediately after this function
     497              :     /// returns.  The caller is responsible for re-checking that the slot still
     498              :     /// contains the page with the same key before using it.
     499              :     ///
     500       974273 :     fn search_mapping(&self, cache_key: &CacheKey) -> Option<usize> {
     501       974273 :         match cache_key {
     502       974273 :             CacheKey::ImmutableFilePage { file_id, blkno } => {
     503       974273 :                 let map = self.immutable_page_map.read().unwrap();
     504       974273 :                 Some(*map.get(&(*file_id, *blkno))?)
     505              :             }
     506              :         }
     507       974273 :     }
     508              : 
     509              :     ///
     510              :     /// Remove mapping for given key.
     511              :     ///
     512        60180 :     fn remove_mapping(&self, old_key: &CacheKey) {
     513        60180 :         match old_key {
     514        60180 :             CacheKey::ImmutableFilePage { file_id, blkno } => {
     515        60180 :                 let mut map = self.immutable_page_map.write().unwrap();
     516        60180 :                 map.remove(&(*file_id, *blkno))
     517        60180 :                     .expect("could not find old key in mapping");
     518        60180 :                 self.size_metrics.current_bytes_immutable.sub_page_sz(1);
     519        60180 :             }
     520        60180 :         }
     521        60180 :     }
     522              : 
     523              :     ///
     524              :     /// Insert mapping for given key.
     525              :     ///
     526              :     /// If a mapping already existed for the given key, returns the slot index
     527              :     /// of the existing mapping and leaves it untouched.
     528        63468 :     fn try_insert_mapping(&self, new_key: &CacheKey, slot_idx: usize) -> Option<usize> {
     529        63468 :         match new_key {
     530        63468 :             CacheKey::ImmutableFilePage { file_id, blkno } => {
     531        63468 :                 let mut map = self.immutable_page_map.write().unwrap();
     532        63468 :                 match map.entry((*file_id, *blkno)) {
     533            0 :                     Entry::Occupied(entry) => Some(*entry.get()),
     534        63468 :                     Entry::Vacant(entry) => {
     535        63468 :                         entry.insert(slot_idx);
     536        63468 :                         self.size_metrics.current_bytes_immutable.add_page_sz(1);
     537        63468 :                         None
     538              :                     }
     539              :                 }
     540              :             }
     541              :         }
     542        63468 :     }
     543              : 
     544              :     //
     545              :     // Section 4: Misc internal helpers
     546              :     //
     547              : 
     548              :     /// Find a slot to evict.
     549              :     ///
     550              :     /// On return, the slot is empty and write-locked.
     551        63468 :     async fn find_victim(
     552        63468 :         &self,
     553        63468 :         _permit_witness: &PinnedSlotsPermit,
     554        63468 :     ) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
     555        63468 :         let iter_limit = self.slots.len() * 10;
     556        63468 :         let mut iters = 0;
     557              :         loop {
     558       169657 :             iters += 1;
     559       169657 :             let slot_idx = self.next_evict_slot.fetch_add(1, Ordering::Relaxed) % self.slots.len();
     560       169657 : 
     561       169657 :             let slot = &self.slots[slot_idx];
     562       169657 : 
     563       169657 :             if slot.dec_usage_count() == 0 {
     564        63468 :                 let mut inner = match slot.inner.try_write() {
     565        63468 :                     Ok(inner) => inner,
     566            0 :                     Err(_err) => {
     567            0 :                         if iters > iter_limit {
     568              :                             // NB: Even with the permits, there's no hard guarantee that we will find a slot with
     569              :                             // any particular number of iterations: other threads might race ahead and acquire and
     570              :                             // release pins just as we're scanning the array.
     571              :                             //
     572              :                             // Imagine that nslots is 2, and as starting point, usage_count==1 on all
     573              :                             // slots. There are two threads running concurrently, A and B. A has just
     574              :                             // acquired the permit from the semaphore.
     575              :                             //
     576              :                             //   A: Look at slot 1. Its usage_count == 1, so decrement it to zero, and continue the search
     577              :                             //   B: Acquire permit.
     578              :                             //   B: Look at slot 2, decrement its usage_count to zero and continue the search
     579              :                             //   B: Look at slot 1. Its usage_count is zero, so pin it and bump up its usage_count to 1.
     580              :                             //   B: Release pin and permit again
     581              :                             //   B: Acquire permit.
     582              :                             //   B: Look at slot 2. Its usage_count is zero, so pin it and bump up its usage_count to 1.
     583              :                             //   B: Release pin and permit again
     584              :                             //
     585              :                             // Now we're back in the starting situation that both slots have
     586              :                             // usage_count 1, but A has now been through one iteration of the
     587              :                             // find_victim() loop. This can repeat indefinitely and on each
     588              :                             // iteration, A's iteration count increases by one.
     589              :                             //
     590              :                             // So, even though the semaphore for the permits is fair, the victim search
     591              :                             // itself happens in parallel and is not fair.
     592              :                             // Hence even with a permit, a task can theoretically be starved.
     593              :                             // To avoid this, we'd need tokio to give priority to tasks that are holding
     594              :                             // permits for longer.
     595              :                             // Note that just yielding to tokio during iteration without such
     596              :                             // priority boosting is likely counter-productive. We'd just give more opportunities
     597              :                             // for B to bump usage count, further starving A.
     598            0 :                             page_cache_eviction_metrics::observe(
     599            0 :                                 page_cache_eviction_metrics::Outcome::ItersExceeded {
     600            0 :                                     iters: iters.try_into().unwrap(),
     601            0 :                                 },
     602            0 :                             );
     603            0 :                             anyhow::bail!("exceeded evict iter limit");
     604            0 :                         }
     605            0 :                         continue;
     606              :                     }
     607              :                 };
     608        63468 :                 if let Some(old_key) = &inner.key {
     609        60180 :                     // remove mapping for old buffer
     610        60180 :                     self.remove_mapping(old_key);
     611        60180 :                     inner.key = None;
     612        60180 :                     page_cache_eviction_metrics::observe(
     613        60180 :                         page_cache_eviction_metrics::Outcome::FoundSlotEvicted {
     614        60180 :                             iters: iters.try_into().unwrap(),
     615        60180 :                         },
     616        60180 :                     );
     617        60180 :                 } else {
     618         3288 :                     page_cache_eviction_metrics::observe(
     619         3288 :                         page_cache_eviction_metrics::Outcome::FoundSlotUnused {
     620         3288 :                             iters: iters.try_into().unwrap(),
     621         3288 :                         },
     622         3288 :                     );
     623         3288 :                 }
     624        63468 :                 return Ok((slot_idx, inner));
     625       106189 :             }
     626              :         }
     627        63468 :     }
     628              : 
     629              :     /// Initialize a new page cache
     630              :     ///
     631              :     /// This should be called only once at page server startup.
     632          188 :     fn new(num_pages: usize) -> Self {
     633          188 :         assert!(num_pages > 0, "page cache size must be > 0");
     634              : 
     635              :         // We could use Vec::leak here, but that potentially also leaks
     636              :         // uninitialized reserved capacity. With into_boxed_slice and Box::leak
     637              :         // this is avoided.
     638          188 :         let page_buffer = IoBufferMut::with_capacity_zeroed(num_pages * PAGE_SZ).leak();
     639          188 : 
     640          188 :         let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;
     641          188 :         size_metrics.max_bytes.set_page_sz(num_pages);
     642          188 :         size_metrics.current_bytes_immutable.set_page_sz(0);
     643          188 : 
     644          188 :         let slots = page_buffer
     645          188 :             .chunks_exact_mut(PAGE_SZ)
     646         9400 :             .map(|chunk| {
     647         9400 :                 // SAFETY: Each chunk has `PAGE_SZ` (8192) bytes, greater than 512, still aligned.
     648         9400 :                 let buf = unsafe { IoPageSlice::new_unchecked(chunk.try_into().unwrap()) };
     649         9400 : 
     650         9400 :                 Slot {
     651         9400 :                     inner: tokio::sync::RwLock::new(SlotInner {
     652         9400 :                         key: None,
     653         9400 :                         buf,
     654         9400 :                         permit: std::sync::Mutex::new(Weak::new()),
     655         9400 :                     }),
     656         9400 :                     usage_count: AtomicU8::new(0),
     657         9400 :                 }
     658         9400 :             })
     659          188 :             .collect();
     660          188 : 
     661          188 :         Self {
     662          188 :             immutable_page_map: Default::default(),
     663          188 :             slots,
     664          188 :             next_evict_slot: AtomicUsize::new(0),
     665          188 :             size_metrics,
     666          188 :             pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)),
     667          188 :         }
     668          188 :     }
     669              : }
     670              : 
     671              : trait PageSzBytesMetric {
     672              :     fn set_page_sz(&self, count: usize);
     673              :     fn add_page_sz(&self, count: usize);
     674              :     fn sub_page_sz(&self, count: usize);
     675              : }
     676              : 
     677              : #[inline(always)]
     678       124024 : fn count_times_page_sz(count: usize) -> u64 {
     679       124024 :     u64::try_from(count).unwrap() * u64::try_from(PAGE_SZ).unwrap()
     680       124024 : }
     681              : 
     682              : impl PageSzBytesMetric for metrics::UIntGauge {
     683          376 :     fn set_page_sz(&self, count: usize) {
     684          376 :         self.set(count_times_page_sz(count));
     685          376 :     }
     686        63468 :     fn add_page_sz(&self, count: usize) {
     687        63468 :         self.add(count_times_page_sz(count));
     688        63468 :     }
     689        60180 :     fn sub_page_sz(&self, count: usize) {
     690        60180 :         self.sub(count_times_page_sz(count));
     691        60180 :     }
     692              : }
        

Generated by: LCOV version 2.1-beta