LCOV - code coverage report
Current view: top level - pageserver/src - page_cache.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 86.4 % 273 236
Test Date: 2025-07-16 12:29:03 Functions: 94.3 % 35 33

            Line data    Source code
       1              : //!
       2              : //! Global page cache
       3              : //!
       4              : //! The page cache uses up most of the memory in the page server. It is shared
       5              : //! by all tenants, and it is used to store different kinds of pages. Sharing
       6              : //! the cache allows memory to be dynamically allocated where it's needed the
       7              : //! most.
       8              : //!
       9              : //! The page cache consists of fixed-size buffers, 8 kB each to match the
      10              : //! PostgreSQL buffer size, and a Slot struct for each buffer to contain
      11              : //! information about what's stored in the buffer.
      12              : //!
      13              : //! # Types Of Pages
      14              : //!
      15              : //! [`PageCache`] only supports immutable pages.
      16              : //! Hence there is no need to worry about coherency.
      17              : //!
      18              : //! Two types of pages are supported:
      19              : //!
      20              : //! * **Immutable File pages**, filled & used by [`crate::tenant::block_io`] and [`crate::tenant::ephemeral_file`].
      21              : //!
      22              : //! Note that [`crate::tenant::ephemeral_file::EphemeralFile`] is generally mutable, but, it's append-only.
      23              : //! It uses the page cache only for the blocks that are already fully written and immutable.
      24              : //!
      25              : //! # Filling The Page Cache
      26              : //!
      27              : //! Page cache maps from a cache key to a buffer slot.
      28              : //! The cache key uniquely identifies the piece of data that is being cached.
      29              : //!
      30              : //! The cache key for **immutable file** pages is [`FileId`] and a block number.
      31              : //! Users of page cache that wish to page-cache an arbitrary (immutable!) on-disk file do the following:
      32              : //! * Have a mechanism to deterministically associate the on-disk file with a [`FileId`].
      33              : //! * Get a [`FileId`] using [`next_file_id`].
      34              : //! * Use the mechanism to associate the on-disk file with the returned [`FileId`].
      35              : //! * Use [`PageCache::read_immutable_buf`] to get a [`ReadBufResult`].
      36              : //! * If the page was already cached, it'll be the [`ReadBufResult::Found`] variant that contains
      37              : //!   a read guard for the page. Just use it.
      38              : //! * If the page was not cached, it'll be the [`ReadBufResult::NotFound`] variant that contains
      39              : //!   a write guard for the page. Fill the page with the contents of the on-disk file.
      40              : //!   Then call [`PageWriteGuard::mark_valid`] to mark the page as valid.
      41              : //!   Then try again to [`PageCache::read_immutable_buf`].
      42              : //!   Unless there's high cache pressure, the page should now be cached.
      43              : //!   (TODO: allow downgrading the write guard to a read guard to ensure forward progress.)
      44              : //!
      45              : //! # Locking
      46              : //!
      47              : //! There are two levels of locking involved: There's one lock for the "mapping"
      48              : //! from page identifier (tenant ID, timeline ID, rel, block, LSN) to the buffer
      49              : //! slot, and a separate lock on each slot. To read or write the contents of a
      50              : //! slot, you must hold the lock on the slot in read or write mode,
      51              : //! respectively. To change the mapping of a slot, i.e. to evict a page or to
      52              : //! assign a buffer for a page, you must hold the mapping lock and the lock on
      53              : //! the slot at the same time.
      54              : //!
      55              : //! Whenever you need to hold both locks simultaneously, the slot lock must be
      56              : //! acquired first. This consistent ordering avoids deadlocks. To look up a page
      57              : //! in the cache, you would first look up the mapping, while holding the mapping
      58              : //! lock, and then lock the slot. You must release the mapping lock in between,
      59              : //! to obey the lock ordering and avoid deadlock.
      60              : //!
      61              : //! A slot can momentarily have invalid contents, even if it's already been
      62              : //! inserted to the mapping, but you must hold the write-lock on the slot until
      63              : //! the contents are valid. If you need to release the lock without initializing
      64              : //! the contents, you must remove the mapping first. We make that easy for the
      65              : //! callers with PageWriteGuard: the caller must explicitly call guard.mark_valid() after it has
      66              : //! initialized it. If the guard is dropped without calling mark_valid(), the
      67              : //! mapping is automatically removed and the slot is marked free.
      68              : //!
      69              : 
      70              : use std::collections::HashMap;
      71              : use std::collections::hash_map::Entry;
      72              : use std::sync::atomic::{AtomicU8, AtomicU64, AtomicUsize, Ordering};
      73              : use std::sync::{Arc, Weak};
      74              : use std::time::Duration;
      75              : 
      76              : use anyhow::Context;
      77              : use once_cell::sync::OnceCell;
      78              : 
      79              : use crate::context::RequestContext;
      80              : use crate::metrics::{PageCacheSizeMetrics, page_cache_eviction_metrics};
      81              : use crate::virtual_file::{IoBufferMut, IoPageSlice};
      82              : 
      83              : static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
      84              : const TEST_PAGE_CACHE_SIZE: usize = 50;
      85              : 
      86              : ///
      87              : /// Initialize the page cache. This must be called once at page server startup.
      88              : ///
      89            0 : pub fn init(size: usize) {
      90            0 :     if PAGE_CACHE.set(PageCache::new(size)).is_err() {
      91            0 :         panic!("page cache already initialized");
      92            0 :     }
      93            0 : }
      94              : 
      95              : ///
      96              : /// Get a handle to the page cache.
      97              : ///
      98       295610 : pub fn get() -> &'static PageCache {
      99              :     //
     100              :     // In unit tests, page server startup doesn't happen and no one calls
     101              :     // page_cache::init(). Initialize it here with a tiny cache, so that the
     102              :     // page cache is usable in unit tests.
     103              :     //
     104       295610 :     if cfg!(test) {
     105       295610 :         PAGE_CACHE.get_or_init(|| PageCache::new(TEST_PAGE_CACHE_SIZE))
     106              :     } else {
     107            0 :         PAGE_CACHE.get().expect("page cache not initialized")
     108              :     }
     109       295610 : }
     110              : 
     111              : pub const PAGE_SZ: usize = postgres_ffi::BLCKSZ as usize;
     112              : const MAX_USAGE_COUNT: u8 = 5;
     113              : 
     114              : /// See module-level comment.
     115              : #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
     116              : pub struct FileId(u64);
     117              : 
     118              : static NEXT_ID: AtomicU64 = AtomicU64::new(1);
     119              : 
     120              : /// See module-level comment.
     121         1304 : pub fn next_file_id() -> FileId {
     122         1304 :     FileId(NEXT_ID.fetch_add(1, Ordering::Relaxed))
     123         1304 : }
     124              : 
     125              : ///
     126              : /// CacheKey uniquely identifies a "thing" to cache in the page cache.
     127              : ///
     128              : #[derive(Debug, PartialEq, Eq, Clone)]
     129              : #[allow(clippy::enum_variant_names)]
     130              : enum CacheKey {
     131              :     ImmutableFilePage { file_id: FileId, blkno: u32 },
     132              : }
     133              : 
     134              : struct Slot {
     135              :     inner: tokio::sync::RwLock<SlotInner>,
     136              :     usage_count: AtomicU8,
     137              : }
     138              : 
     139              : struct SlotInner {
     140              :     key: Option<CacheKey>,
     141              :     // for `coalesce_readers_permit`
     142              :     permit: std::sync::Mutex<Weak<PinnedSlotsPermit>>,
     143              :     buf: IoPageSlice<'static>,
     144              : }
     145              : 
     146              : impl Slot {
     147              :     /// Increment usage count on the buffer, with ceiling at MAX_USAGE_COUNT.
     148       278291 :     fn inc_usage_count(&self) {
     149       278291 :         let _ = self
     150       278291 :             .usage_count
     151       278291 :             .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
     152       278291 :                 if val == MAX_USAGE_COUNT {
     153       265023 :                     None
     154              :                 } else {
     155        13268 :                     Some(val + 1)
     156              :                 }
     157       278291 :             });
     158       278291 :     }
     159              : 
     160              :     /// Decrement usage count on the buffer, unless it's already zero.  Returns
     161              :     /// the old usage count.
     162        46510 :     fn dec_usage_count(&self) -> u8 {
     163        46510 :         let count_res =
     164        46510 :             self.usage_count
     165        46510 :                 .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
     166        46510 :                     if val == 0 { None } else { Some(val - 1) }
     167        46510 :                 });
     168              : 
     169        46510 :         match count_res {
     170        29191 :             Ok(usage_count) => usage_count,
     171        17319 :             Err(usage_count) => usage_count,
     172              :         }
     173        46510 :     }
     174              : 
     175              :     /// Sets the usage count to a specific value.
     176        17319 :     fn set_usage_count(&self, count: u8) {
     177        17319 :         self.usage_count.store(count, Ordering::Relaxed);
     178        17319 :     }
     179              : }
     180              : 
     181              : impl SlotInner {
     182              :     /// If there is aready a reader, drop our permit and share its permit, just like we share read access.
     183       278291 :     fn coalesce_readers_permit(&self, permit: PinnedSlotsPermit) -> Arc<PinnedSlotsPermit> {
     184       278291 :         let mut guard = self.permit.lock().unwrap();
     185       278291 :         if let Some(existing_permit) = guard.upgrade() {
     186            0 :             drop(guard);
     187            0 :             drop(permit);
     188            0 :             existing_permit
     189              :         } else {
     190       278291 :             let permit = Arc::new(permit);
     191       278291 :             *guard = Arc::downgrade(&permit);
     192       278291 :             permit
     193              :         }
     194       278291 :     }
     195              : }
     196              : 
     197              : pub struct PageCache {
     198              :     immutable_page_map: std::sync::RwLock<HashMap<(FileId, u32), usize>>,
     199              : 
     200              :     /// The actual buffers with their metadata.
     201              :     slots: Box<[Slot]>,
     202              : 
     203              :     pinned_slots: Arc<tokio::sync::Semaphore>,
     204              : 
     205              :     /// Index of the next candidate to evict, for the Clock replacement algorithm.
     206              :     /// This is interpreted modulo the page cache size.
     207              :     next_evict_slot: AtomicUsize,
     208              : 
     209              :     size_metrics: &'static PageCacheSizeMetrics,
     210              : }
     211              : 
     212              : struct PinnedSlotsPermit {
     213              :     _permit: tokio::sync::OwnedSemaphorePermit,
     214              : }
     215              : 
     216              : ///
     217              : /// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked
     218              : /// until the guard is dropped.
     219              : ///
     220              : pub struct PageReadGuard<'i> {
     221              :     _permit: Arc<PinnedSlotsPermit>,
     222              :     slot_guard: tokio::sync::RwLockReadGuard<'i, SlotInner>,
     223              : }
     224              : 
     225              : impl std::ops::Deref for PageReadGuard<'_> {
     226              :     type Target = [u8; PAGE_SZ];
     227              : 
     228       295650 :     fn deref(&self) -> &Self::Target {
     229       295650 :         self.slot_guard.buf.deref()
     230       295650 :     }
     231              : }
     232              : 
     233              : impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
     234            0 :     fn as_ref(&self) -> &[u8; PAGE_SZ] {
     235            0 :         self.slot_guard.buf.as_ref()
     236            0 :     }
     237              : }
     238              : 
     239              : ///
     240              : /// PageWriteGuard is a lease on a buffer for modifying it. The page is kept locked
     241              : /// until the guard is dropped.
     242              : ///
     243              : /// Counterintuitively, this is used even for a read, if the requested page is not
     244              : /// currently found in the page cache. In that case, the caller of lock_for_read()
     245              : /// is expected to fill in the page contents and call mark_valid().
     246              : pub struct PageWriteGuard<'i> {
     247              :     state: PageWriteGuardState<'i>,
     248              : }
     249              : 
     250              : enum PageWriteGuardState<'i> {
     251              :     Invalid {
     252              :         inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
     253              :         _permit: PinnedSlotsPermit,
     254              :     },
     255              :     Downgraded,
     256              : }
     257              : 
     258              : impl std::ops::DerefMut for PageWriteGuard<'_> {
     259        17319 :     fn deref_mut(&mut self) -> &mut Self::Target {
     260        17319 :         match &mut self.state {
     261        17319 :             PageWriteGuardState::Invalid { inner, _permit } => inner.buf.deref_mut(),
     262            0 :             PageWriteGuardState::Downgraded => unreachable!(),
     263              :         }
     264        17319 :     }
     265              : }
     266              : 
     267              : impl std::ops::Deref for PageWriteGuard<'_> {
     268              :     type Target = [u8; PAGE_SZ];
     269              : 
     270       277104 :     fn deref(&self) -> &Self::Target {
     271       277104 :         match &self.state {
     272       277104 :             PageWriteGuardState::Invalid { inner, _permit } => inner.buf.deref(),
     273            0 :             PageWriteGuardState::Downgraded => unreachable!(),
     274              :         }
     275       277104 :     }
     276              : }
     277              : 
     278              : impl<'a> PageWriteGuard<'a> {
     279              :     /// Mark that the buffer contents are now valid.
     280              :     #[must_use]
     281        17319 :     pub fn mark_valid(mut self) -> PageReadGuard<'a> {
     282        17319 :         let prev = std::mem::replace(&mut self.state, PageWriteGuardState::Downgraded);
     283        17319 :         match prev {
     284        17319 :             PageWriteGuardState::Invalid { inner, _permit } => {
     285        17319 :                 assert!(inner.key.is_some());
     286        17319 :                 PageReadGuard {
     287        17319 :                     _permit: Arc::new(_permit),
     288        17319 :                     slot_guard: inner.downgrade(),
     289        17319 :                 }
     290              :             }
     291            0 :             PageWriteGuardState::Downgraded => unreachable!(),
     292              :         }
     293        17319 :     }
     294              : }
     295              : 
     296              : impl Drop for PageWriteGuard<'_> {
     297              :     ///
     298              :     /// If the buffer was allocated for a page that was not already in the
     299              :     /// cache, but the lock_for_read/write() caller dropped the buffer without
     300              :     /// initializing it, remove the mapping from the page cache.
     301              :     ///
     302        17319 :     fn drop(&mut self) {
     303        17319 :         match &mut self.state {
     304            0 :             PageWriteGuardState::Invalid { inner, _permit } => {
     305            0 :                 assert!(inner.key.is_some());
     306            0 :                 let self_key = inner.key.as_ref().unwrap();
     307            0 :                 PAGE_CACHE.get().unwrap().remove_mapping(self_key);
     308            0 :                 inner.key = None;
     309              :             }
     310        17319 :             PageWriteGuardState::Downgraded => {}
     311              :         }
     312        17319 :     }
     313              : }
     314              : 
     315              : /// lock_for_read() return value
     316              : pub enum ReadBufResult<'a> {
     317              :     Found(PageReadGuard<'a>),
     318              :     NotFound(PageWriteGuard<'a>),
     319              : }
     320              : 
     321              : impl PageCache {
     322       295610 :     pub async fn read_immutable_buf(
     323       295610 :         &self,
     324       295610 :         file_id: FileId,
     325       295610 :         blkno: u32,
     326       295610 :         ctx: &RequestContext,
     327       295610 :     ) -> anyhow::Result<ReadBufResult> {
     328       295610 :         self.lock_for_read(&(CacheKey::ImmutableFilePage { file_id, blkno }), ctx)
     329       295610 :             .await
     330       295610 :     }
     331              : 
     332              :     //
     333              :     // Section 2: Internal interface functions for lookup/update.
     334              :     //
     335              :     // To add support for a new kind of "thing" to cache, you will need
     336              :     // to add public interface routines above, and code to deal with the
     337              :     // "mappings" after this section. But the routines in this section should
     338              :     // not require changes.
     339              : 
     340       295610 :     async fn try_get_pinned_slot_permit(&self) -> anyhow::Result<PinnedSlotsPermit> {
     341       295610 :         match tokio::time::timeout(
     342              :             // Choose small timeout, neon_smgr does its own retries.
     343              :             // https://neondb.slack.com/archives/C04DGM6SMTM/p1694786876476869
     344       295610 :             Duration::from_secs(10),
     345       295610 :             Arc::clone(&self.pinned_slots).acquire_owned(),
     346              :         )
     347       295610 :         .await
     348              :         {
     349       295610 :             Ok(res) => Ok(PinnedSlotsPermit {
     350       295610 :                 _permit: res.expect("this semaphore is never closed"),
     351       295610 :             }),
     352            0 :             Err(_timeout) => {
     353            0 :                 crate::metrics::page_cache_errors_inc(
     354            0 :                     crate::metrics::PageCacheErrorKind::AcquirePinnedSlotTimeout,
     355              :                 );
     356            0 :                 anyhow::bail!("timeout: there were page guards alive for all page cache slots")
     357              :             }
     358              :         }
     359       295610 :     }
     360              : 
     361              :     /// Look up a page in the cache.
     362              :     ///
     363       295610 :     async fn try_lock_for_read(
     364       295610 :         &self,
     365       295610 :         cache_key: &CacheKey,
     366       295610 :         permit: &mut Option<PinnedSlotsPermit>,
     367       295610 :     ) -> Option<PageReadGuard> {
     368       295610 :         if let Some(slot_idx) = self.search_mapping(cache_key) {
     369              :             // The page was found in the mapping. Lock the slot, and re-check
     370              :             // that it's still what we expected (because we released the mapping
     371              :             // lock already, another thread could have evicted the page)
     372       278291 :             let slot = &self.slots[slot_idx];
     373       278291 :             let inner = slot.inner.read().await;
     374       278291 :             if inner.key.as_ref() == Some(cache_key) {
     375       278291 :                 slot.inc_usage_count();
     376       278291 :                 return Some(PageReadGuard {
     377       278291 :                     _permit: inner.coalesce_readers_permit(permit.take().unwrap()),
     378       278291 :                     slot_guard: inner,
     379       278291 :                 });
     380            0 :             }
     381        17319 :         }
     382        17319 :         None
     383       295610 :     }
     384              : 
     385              :     /// Return a locked buffer for given block.
     386              :     ///
     387              :     /// Like try_lock_for_read(), if the search criteria is not exact and the
     388              :     /// page is already found in the cache, *cache_key is updated.
     389              :     ///
     390              :     /// If the page is not found in the cache, this allocates a new buffer for
     391              :     /// it. The caller may then initialize the buffer with the contents, and
     392              :     /// call mark_valid().
     393              :     ///
     394              :     /// Example usage:
     395              :     ///
     396              :     /// ```ignore
     397              :     /// let cache = page_cache::get();
     398              :     ///
     399              :     /// match cache.lock_for_read(&key) {
     400              :     ///     ReadBufResult::Found(read_guard) => {
     401              :     ///         // The page was found in cache. Use it
     402              :     ///     },
     403              :     ///     ReadBufResult::NotFound(write_guard) => {
     404              :     ///         // The page was not found in cache. Read it from disk into the
     405              :     ///         // buffer.
     406              :     ///         //read_my_page_from_disk(write_guard);
     407              :     ///
     408              :     ///         // The buffer contents are now valid. Tell the page cache.
     409              :     ///         write_guard.mark_valid();
     410              :     ///     },
     411              :     /// }
     412              :     /// ```
     413              :     ///
     414       295610 :     async fn lock_for_read(
     415       295610 :         &self,
     416       295610 :         cache_key: &CacheKey,
     417       295610 :         ctx: &RequestContext,
     418       295610 :     ) -> anyhow::Result<ReadBufResult> {
     419       295610 :         let mut permit = Some(self.try_get_pinned_slot_permit().await?);
     420              : 
     421       295610 :         let (read_access, hit) = match cache_key {
     422       295610 :             CacheKey::ImmutableFilePage { .. } => (
     423       295610 :                 &crate::metrics::PAGE_CACHE
     424       295610 :                     .for_ctx(ctx)
     425       295610 :                     .read_accesses_immutable,
     426       295610 :                 &crate::metrics::PAGE_CACHE.for_ctx(ctx).read_hits_immutable,
     427       295610 :             ),
     428       295610 :         };
     429       295610 :         read_access.inc();
     430              : 
     431       295610 :         let mut is_first_iteration = true;
     432              :         loop {
     433              :             // First check if the key already exists in the cache.
     434       295610 :             if let Some(read_guard) = self.try_lock_for_read(cache_key, &mut permit).await {
     435       278291 :                 debug_assert!(permit.is_none());
     436       278291 :                 if is_first_iteration {
     437       278291 :                     hit.inc();
     438       278291 :                 }
     439       278291 :                 return Ok(ReadBufResult::Found(read_guard));
     440        17319 :             }
     441        17319 :             debug_assert!(permit.is_some());
     442        17319 :             is_first_iteration = false;
     443              : 
     444              :             // Not found. Find a victim buffer
     445        17319 :             let (slot_idx, mut inner) = self
     446        17319 :                 .find_victim(permit.as_ref().unwrap())
     447        17319 :                 .await
     448        17319 :                 .context("Failed to find evict victim")?;
     449              : 
     450              :             // Insert mapping for this. At this point, we may find that another
     451              :             // thread did the same thing concurrently. In that case, we evicted
     452              :             // our victim buffer unnecessarily. Put it into the free list and
     453              :             // continue with the slot that the other thread chose.
     454        17319 :             if let Some(_existing_slot_idx) = self.try_insert_mapping(cache_key, slot_idx) {
     455              :                 // TODO: put to free list
     456              : 
     457              :                 // We now just loop back to start from beginning. This is not
     458              :                 // optimal, we'll perform the lookup in the mapping again, which
     459              :                 // is not really necessary because we already got
     460              :                 // 'existing_slot_idx'.  But this shouldn't happen often enough
     461              :                 // to matter much.
     462            0 :                 continue;
     463        17319 :             }
     464              : 
     465              :             // Make the slot ready
     466        17319 :             let slot = &self.slots[slot_idx];
     467        17319 :             inner.key = Some(cache_key.clone());
     468        17319 :             slot.set_usage_count(1);
     469              : 
     470        17319 :             debug_assert!(
     471              :                 {
     472        17319 :                     let guard = inner.permit.lock().unwrap();
     473        17319 :                     guard.upgrade().is_none()
     474              :                 },
     475            0 :                 "we hold a write lock, so, no one else should have a permit"
     476              :             );
     477              : 
     478        17319 :             return Ok(ReadBufResult::NotFound(PageWriteGuard {
     479        17319 :                 state: PageWriteGuardState::Invalid {
     480        17319 :                     _permit: permit.take().unwrap(),
     481        17319 :                     inner,
     482        17319 :                 },
     483        17319 :             }));
     484              :         }
     485       295610 :     }
     486              : 
     487              :     //
     488              :     // Section 3: Mapping functions
     489              :     //
     490              : 
     491              :     /// Search for a page in the cache using the given search key.
     492              :     ///
     493              :     /// Returns the slot index, if any.
     494              :     ///
     495              :     /// NOTE: We don't hold any lock on the mapping on return, so the slot might
     496              :     /// get recycled for an unrelated page immediately after this function
     497              :     /// returns.  The caller is responsible for re-checking that the slot still
     498              :     /// contains the page with the same key before using it.
     499              :     ///
     500       295610 :     fn search_mapping(&self, cache_key: &CacheKey) -> Option<usize> {
     501       295610 :         match cache_key {
     502       295610 :             CacheKey::ImmutableFilePage { file_id, blkno } => {
     503       295610 :                 let map = self.immutable_page_map.read().unwrap();
     504       295610 :                 Some(*map.get(&(*file_id, *blkno))?)
     505              :             }
     506              :         }
     507       295610 :     }
     508              : 
     509              :     ///
     510              :     /// Remove mapping for given key.
     511              :     ///
     512        16499 :     fn remove_mapping(&self, old_key: &CacheKey) {
     513        16499 :         match old_key {
     514        16499 :             CacheKey::ImmutableFilePage { file_id, blkno } => {
     515        16499 :                 let mut map = self.immutable_page_map.write().unwrap();
     516        16499 :                 map.remove(&(*file_id, *blkno))
     517        16499 :                     .expect("could not find old key in mapping");
     518        16499 :                 self.size_metrics.current_bytes_immutable.sub_page_sz(1);
     519        16499 :             }
     520              :         }
     521        16499 :     }
     522              : 
     523              :     ///
     524              :     /// Insert mapping for given key.
     525              :     ///
     526              :     /// If a mapping already existed for the given key, returns the slot index
     527              :     /// of the existing mapping and leaves it untouched.
     528        17319 :     fn try_insert_mapping(&self, new_key: &CacheKey, slot_idx: usize) -> Option<usize> {
     529        17319 :         match new_key {
     530        17319 :             CacheKey::ImmutableFilePage { file_id, blkno } => {
     531        17319 :                 let mut map = self.immutable_page_map.write().unwrap();
     532        17319 :                 match map.entry((*file_id, *blkno)) {
     533            0 :                     Entry::Occupied(entry) => Some(*entry.get()),
     534        17319 :                     Entry::Vacant(entry) => {
     535        17319 :                         entry.insert(slot_idx);
     536        17319 :                         self.size_metrics.current_bytes_immutable.add_page_sz(1);
     537        17319 :                         None
     538              :                     }
     539              :                 }
     540              :             }
     541              :         }
     542        17319 :     }
     543              : 
     544              :     //
     545              :     // Section 4: Misc internal helpers
     546              :     //
     547              : 
     548              :     /// Find a slot to evict.
     549              :     ///
     550              :     /// On return, the slot is empty and write-locked.
     551        17319 :     async fn find_victim(
     552        17319 :         &self,
     553        17319 :         _permit_witness: &PinnedSlotsPermit,
     554        17319 :     ) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
     555        17319 :         let iter_limit = self.slots.len() * 10;
     556        17319 :         let mut iters = 0;
     557              :         loop {
     558        46510 :             iters += 1;
     559        46510 :             let slot_idx = self.next_evict_slot.fetch_add(1, Ordering::Relaxed) % self.slots.len();
     560              : 
     561        46510 :             let slot = &self.slots[slot_idx];
     562              : 
     563        46510 :             if slot.dec_usage_count() == 0 {
     564        17319 :                 let mut inner = match slot.inner.try_write() {
     565        17319 :                     Ok(inner) => inner,
     566            0 :                     Err(_err) => {
     567            0 :                         if iters > iter_limit {
     568              :                             // NB: Even with the permits, there's no hard guarantee that we will find a slot with
     569              :                             // any particular number of iterations: other threads might race ahead and acquire and
     570              :                             // release pins just as we're scanning the array.
     571              :                             //
     572              :                             // Imagine that nslots is 2, and as starting point, usage_count==1 on all
     573              :                             // slots. There are two threads running concurrently, A and B. A has just
     574              :                             // acquired the permit from the semaphore.
     575              :                             //
     576              :                             //   A: Look at slot 1. Its usage_count == 1, so decrement it to zero, and continue the search
     577              :                             //   B: Acquire permit.
     578              :                             //   B: Look at slot 2, decrement its usage_count to zero and continue the search
     579              :                             //   B: Look at slot 1. Its usage_count is zero, so pin it and bump up its usage_count to 1.
     580              :                             //   B: Release pin and permit again
     581              :                             //   B: Acquire permit.
     582              :                             //   B: Look at slot 2. Its usage_count is zero, so pin it and bump up its usage_count to 1.
     583              :                             //   B: Release pin and permit again
     584              :                             //
     585              :                             // Now we're back in the starting situation that both slots have
     586              :                             // usage_count 1, but A has now been through one iteration of the
     587              :                             // find_victim() loop. This can repeat indefinitely and on each
     588              :                             // iteration, A's iteration count increases by one.
     589              :                             //
     590              :                             // So, even though the semaphore for the permits is fair, the victim search
     591              :                             // itself happens in parallel and is not fair.
     592              :                             // Hence even with a permit, a task can theoretically be starved.
     593              :                             // To avoid this, we'd need tokio to give priority to tasks that are holding
     594              :                             // permits for longer.
     595              :                             // Note that just yielding to tokio during iteration without such
     596              :                             // priority boosting is likely counter-productive. We'd just give more opportunities
     597              :                             // for B to bump usage count, further starving A.
     598            0 :                             page_cache_eviction_metrics::observe(
     599            0 :                                 page_cache_eviction_metrics::Outcome::ItersExceeded {
     600            0 :                                     iters: iters.try_into().unwrap(),
     601            0 :                                 },
     602              :                             );
     603            0 :                             anyhow::bail!("exceeded evict iter limit");
     604            0 :                         }
     605            0 :                         continue;
     606              :                     }
     607              :                 };
     608        17319 :                 if let Some(old_key) = &inner.key {
     609        16499 :                     // remove mapping for old buffer
     610        16499 :                     self.remove_mapping(old_key);
     611        16499 :                     inner.key = None;
     612        16499 :                     page_cache_eviction_metrics::observe(
     613        16499 :                         page_cache_eviction_metrics::Outcome::FoundSlotEvicted {
     614        16499 :                             iters: iters.try_into().unwrap(),
     615        16499 :                         },
     616        16499 :                     );
     617        16499 :                 } else {
     618          820 :                     page_cache_eviction_metrics::observe(
     619          820 :                         page_cache_eviction_metrics::Outcome::FoundSlotUnused {
     620          820 :                             iters: iters.try_into().unwrap(),
     621          820 :                         },
     622          820 :                     );
     623          820 :                 }
     624        17319 :                 return Ok((slot_idx, inner));
     625        29191 :             }
     626              :         }
     627        17319 :     }
     628              : 
     629              :     /// Initialize a new page cache
     630              :     ///
     631              :     /// This should be called only once at page server startup.
     632           50 :     fn new(num_pages: usize) -> Self {
     633           50 :         assert!(num_pages > 0, "page cache size must be > 0");
     634              : 
     635              :         // We could use Vec::leak here, but that potentially also leaks
     636              :         // uninitialized reserved capacity. With into_boxed_slice and Box::leak
     637              :         // this is avoided.
     638           50 :         let page_buffer = IoBufferMut::with_capacity_zeroed(num_pages * PAGE_SZ).leak();
     639              : 
     640           50 :         let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;
     641           50 :         size_metrics.max_bytes.set_page_sz(num_pages);
     642           50 :         size_metrics.current_bytes_immutable.set_page_sz(0);
     643              : 
     644           50 :         let slots = page_buffer
     645           50 :             .chunks_exact_mut(PAGE_SZ)
     646         2500 :             .map(|chunk| {
     647              :                 // SAFETY: Each chunk has `PAGE_SZ` (8192) bytes, greater than 512, still aligned.
     648         2500 :                 let buf = unsafe { IoPageSlice::new_unchecked(chunk.try_into().unwrap()) };
     649              : 
     650         2500 :                 Slot {
     651         2500 :                     inner: tokio::sync::RwLock::new(SlotInner {
     652         2500 :                         key: None,
     653         2500 :                         buf,
     654         2500 :                         permit: std::sync::Mutex::new(Weak::new()),
     655         2500 :                     }),
     656         2500 :                     usage_count: AtomicU8::new(0),
     657         2500 :                 }
     658         2500 :             })
     659           50 :             .collect();
     660              : 
     661           50 :         Self {
     662           50 :             immutable_page_map: Default::default(),
     663           50 :             slots,
     664           50 :             next_evict_slot: AtomicUsize::new(0),
     665           50 :             size_metrics,
     666           50 :             pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)),
     667           50 :         }
     668           50 :     }
     669              : }
     670              : 
     671              : trait PageSzBytesMetric {
     672              :     fn set_page_sz(&self, count: usize);
     673              :     fn add_page_sz(&self, count: usize);
     674              :     fn sub_page_sz(&self, count: usize);
     675              : }
     676              : 
     677              : #[inline(always)]
     678        33918 : fn count_times_page_sz(count: usize) -> u64 {
     679        33918 :     u64::try_from(count).unwrap() * u64::try_from(PAGE_SZ).unwrap()
     680        33918 : }
     681              : 
     682              : impl PageSzBytesMetric for metrics::UIntGauge {
     683          100 :     fn set_page_sz(&self, count: usize) {
     684          100 :         self.set(count_times_page_sz(count));
     685          100 :     }
     686        17319 :     fn add_page_sz(&self, count: usize) {
     687        17319 :         self.add(count_times_page_sz(count));
     688        17319 :     }
     689        16499 :     fn sub_page_sz(&self, count: usize) {
     690        16499 :         self.sub(count_times_page_sz(count));
     691        16499 :     }
     692              : }
        

Generated by: LCOV version 2.1-beta