LCOV - code coverage report
Current view: top level - pageserver/src - page_cache.rs (source / functions) Coverage Total Hit
Test: fcf55189004bd3119eed75e2873a97da8078700c.info Lines: 87.9 % 298 262
Test Date: 2024-06-25 12:07:31 Functions: 94.3 % 35 33

            Line data    Source code
       1              : //!
       2              : //! Global page cache
       3              : //!
       4              : //! The page cache uses up most of the memory in the page server. It is shared
       5              : //! by all tenants, and it is used to store different kinds of pages. Sharing
       6              : //! the cache allows memory to be dynamically allocated where it's needed the
       7              : //! most.
       8              : //!
       9              : //! The page cache consists of fixed-size buffers, 8 kB each to match the
      10              : //! PostgreSQL buffer size, and a Slot struct for each buffer to contain
      11              : //! information about what's stored in the buffer.
      12              : //!
      13              : //! # Types Of Pages
      14              : //!
      15              : //! [`PageCache`] only supports immutable pages.
      16              : //! Hence there is no need to worry about coherency.
      17              : //!
      18              : //! One type of page is supported:
      19              : //!
      20              : //! * **Immutable File pages**, filled & used by [`crate::tenant::block_io`] and [`crate::tenant::ephemeral_file`].
      21              : //!
      22              : //! Note that [`crate::tenant::ephemeral_file::EphemeralFile`] is generally mutable, but, it's append-only.
      23              : //! It uses the page cache only for the blocks that are already fully written and immutable.
      24              : //!
      25              : //! # Filling The Page Cache
      26              : //!
      27              : //! Page cache maps from a cache key to a buffer slot.
      28              : //! The cache key uniquely identifies the piece of data that is being cached.
      29              : //!
      30              : //! The cache key for **immutable file** pages is [`FileId`] and a block number.
      31              : //! Users of page cache that wish to page-cache an arbitrary (immutable!) on-disk file do the following:
      32              : //! * Have a mechanism to deterministically associate the on-disk file with a [`FileId`].
      33              : //! * Get a [`FileId`] using [`next_file_id`].
      34              : //! * Use the mechanism to associate the on-disk file with the returned [`FileId`].
      35              : //! * Use [`PageCache::read_immutable_buf`] to get a [`ReadBufResult`].
      36              : //! * If the page was already cached, it'll be the [`ReadBufResult::Found`] variant that contains
      37              : //!   a read guard for the page. Just use it.
      38              : //! * If the page was not cached, it'll be the [`ReadBufResult::NotFound`] variant that contains
      39              : //!   a write guard for the page. Fill the page with the contents of the on-disk file.
      40              : //!   Then call [`PageWriteGuard::mark_valid`] to mark the page as valid.
      41              : //!   That call consumes the write guard and returns a [`PageReadGuard`] for the
      42              : //!   same slot (the slot lock is downgraded from write to read mode), so there
      43              : //!   is no need to call [`PageCache::read_immutable_buf`] again.
      44              : //!
      45              : //! # Locking
      46              : //!
      47              : //! There are two levels of locking involved: There's one lock for the "mapping"
      48              : //! from cache key ([`FileId`] and block number) to the buffer
      49              : //! slot, and a separate lock on each slot. To read or write the contents of a
      50              : //! slot, you must hold the lock on the slot in read or write mode,
      51              : //! respectively. To change the mapping of a slot, i.e. to evict a page or to
      52              : //! assign a buffer for a page, you must hold the mapping lock and the lock on
      53              : //! the slot at the same time.
      54              : //!
      55              : //! Whenever you need to hold both locks simultaneously, the slot lock must be
      56              : //! acquired first. This consistent ordering avoids deadlocks. To look up a page
      57              : //! in the cache, you would first look up the mapping, while holding the mapping
      58              : //! lock, and then lock the slot. You must release the mapping lock in between,
      59              : //! to obey the lock ordering and avoid deadlock.
      60              : //!
      61              : //! A slot can momentarily have invalid contents, even if it's already been
      62              : //! inserted to the mapping, but you must hold the write-lock on the slot until
      63              : //! the contents are valid. If you need to release the lock without initializing
      64              : //! the contents, you must remove the mapping first. We make that easy for the
      65              : //! callers with PageWriteGuard: the caller must explicitly call guard.mark_valid() after it has
      66              : //! initialized it. If the guard is dropped without calling mark_valid(), the
      67              : //! mapping is automatically removed and the slot is marked free.
      68              : //!
      69              : 
      70              : use std::{
      71              :     collections::{hash_map::Entry, HashMap},
      72              :     sync::{
      73              :         atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
      74              :         Arc, Weak,
      75              :     },
      76              :     time::Duration,
      77              : };
      78              : 
      79              : use anyhow::Context;
      80              : use once_cell::sync::OnceCell;
      81              : 
      82              : use crate::{
      83              :     context::RequestContext,
      84              :     metrics::{page_cache_eviction_metrics, PageCacheSizeMetrics},
      85              : };
      86              : 
      87              : static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
      88              : const TEST_PAGE_CACHE_SIZE: usize = 50;
      89              : 
      90              : ///
      91              : /// Initialize the page cache. This must be called once at page server startup.
      92              : ///
      93            0 : pub fn init(size: usize) {
      94            0 :     if PAGE_CACHE.set(PageCache::new(size)).is_err() {
      95            0 :         panic!("page cache already initialized");
      96            0 :     }
      97            0 : }
      98              : 
      99              : ///
     100              : /// Get a handle to the page cache.
     101              : ///
     102      7049166 : pub fn get() -> &'static PageCache {
     103      7049166 :     //
     104      7049166 :     // In unit tests, page server startup doesn't happen and no one calls
     105      7049166 :     // page_cache::init(). Initialize it here with a tiny cache, so that the
     106      7049166 :     // page cache is usable in unit tests.
     107      7049166 :     //
     108      7049166 :     if cfg!(test) {
     109      7049166 :         PAGE_CACHE.get_or_init(|| PageCache::new(TEST_PAGE_CACHE_SIZE))
     110              :     } else {
     111            0 :         PAGE_CACHE.get().expect("page cache not initialized")
     112              :     }
     113      7049166 : }
     114              : 
     115              : pub const PAGE_SZ: usize = postgres_ffi::BLCKSZ as usize;
     116              : const MAX_USAGE_COUNT: u8 = 5;
     117              : 
     118              : /// See module-level comment.
     119              : #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
     120              : pub struct FileId(u64);
     121              : 
     122              : static NEXT_ID: AtomicU64 = AtomicU64::new(1);
     123              : 
     124              : /// See module-level comment.
     125         2328 : pub fn next_file_id() -> FileId {
     126         2328 :     FileId(NEXT_ID.fetch_add(1, Ordering::Relaxed))
     127         2328 : }
     128              : 
     129              : ///
     130              : /// CacheKey uniquely identifies a "thing" to cache in the page cache.
     131              : ///
     132              : #[derive(Debug, PartialEq, Eq, Clone)]
     133              : #[allow(clippy::enum_variant_names)]
     134              : enum CacheKey {
     135              :     ImmutableFilePage { file_id: FileId, blkno: u32 },
     136              : }
     137              : 
     138              : struct Slot {
     139              :     inner: tokio::sync::RwLock<SlotInner>,
     140              :     usage_count: AtomicU8,
     141              : }
     142              : 
     143              : struct SlotInner {
     144              :     key: Option<CacheKey>,
     145              :     // for `coalesce_readers_permit`
     146              :     permit: std::sync::Mutex<Weak<PinnedSlotsPermit>>,
     147              :     buf: &'static mut [u8; PAGE_SZ],
     148              : }
     149              : 
     150              : impl Slot {
     151              :     /// Increment usage count on the buffer, with ceiling at MAX_USAGE_COUNT.
     152      6928495 :     fn inc_usage_count(&self) {
     153      6928495 :         let _ = self
     154      6928495 :             .usage_count
     155      6928495 :             .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
     156      6928495 :                 if val == MAX_USAGE_COUNT {
     157      6607100 :                     None
     158              :                 } else {
     159       321395 :                     Some(val + 1)
     160              :                 }
     161      6928495 :             });
     162      6928495 :     }
     163              : 
     164              :     /// Decrement usage count on the buffer, unless it's already zero.  Returns
     165              :     /// the old usage count.
     166       651994 :     fn dec_usage_count(&self) -> u8 {
     167       651994 :         let count_res =
     168       651994 :             self.usage_count
     169       651994 :                 .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
     170       651994 :                     if val == 0 {
     171       166943 :                         None
     172              :                     } else {
     173       485051 :                         Some(val - 1)
     174              :                     }
     175       651994 :                 });
     176       651994 : 
     177       651994 :         match count_res {
     178       485051 :             Ok(usage_count) => usage_count,
     179       166943 :             Err(usage_count) => usage_count,
     180              :         }
     181       651994 :     }
     182              : 
     183              :     /// Sets the usage count to a specific value.
     184       166941 :     fn set_usage_count(&self, count: u8) {
     185       166941 :         self.usage_count.store(count, Ordering::Relaxed);
     186       166941 :     }
     187              : }
     188              : 
     189              : impl SlotInner {
     190              :     /// If there is already a reader, drop our permit and share its permit, just like we share read access.
     191      6928495 :     fn coalesce_readers_permit(&self, permit: PinnedSlotsPermit) -> Arc<PinnedSlotsPermit> {
     192      6928495 :         let mut guard = self.permit.lock().unwrap();
     193      6928495 :         if let Some(existing_permit) = guard.upgrade() {
     194            0 :             drop(guard);
     195            0 :             drop(permit);
     196            0 :             existing_permit
     197              :         } else {
     198      6928495 :             let permit = Arc::new(permit);
     199      6928495 :             *guard = Arc::downgrade(&permit);
     200      6928495 :             permit
     201              :         }
     202      6928495 :     }
     203              : }
     204              : 
     205              : pub struct PageCache {
     206              :     immutable_page_map: std::sync::RwLock<HashMap<(FileId, u32), usize>>,
     207              : 
     208              :     /// The actual buffers with their metadata.
     209              :     slots: Box<[Slot]>,
     210              : 
     211              :     pinned_slots: Arc<tokio::sync::Semaphore>,
     212              : 
     213              :     /// Index of the next candidate to evict, for the Clock replacement algorithm.
     214              :     /// This is interpreted modulo the page cache size.
     215              :     next_evict_slot: AtomicUsize,
     216              : 
     217              :     size_metrics: &'static PageCacheSizeMetrics,
     218              : }
     219              : 
     220              : struct PinnedSlotsPermit {
     221              :     _permit: tokio::sync::OwnedSemaphorePermit,
     222              : }
     223              : 
     224              : ///
     225              : /// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked
     226              : /// until the guard is dropped.
     227              : ///
     228              : pub struct PageReadGuard<'i> {
     229              :     _permit: Arc<PinnedSlotsPermit>,
     230              :     slot_guard: tokio::sync::RwLockReadGuard<'i, SlotInner>,
     231              : }
     232              : 
     233              : impl std::ops::Deref for PageReadGuard<'_> {
     234              :     type Target = [u8; PAGE_SZ];
     235              : 
     236     13499331 :     fn deref(&self) -> &Self::Target {
     237     13499331 :         self.slot_guard.buf
     238     13499331 :     }
     239              : }
     240              : 
     241              : impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
     242            0 :     fn as_ref(&self) -> &[u8; PAGE_SZ] {
     243            0 :         self.slot_guard.buf
     244            0 :     }
     245              : }
     246              : 
     247              : ///
     248              : /// PageWriteGuard is a lease on a buffer for modifying it. The page is kept locked
     249              : /// until the guard is dropped.
     250              : ///
     251              : /// Counterintuitively, this is used even for a read, if the requested page is not
     252              : /// currently found in the page cache. In that case, the caller of lock_for_read()
     253              : /// is expected to fill in the page contents and call mark_valid().
     254              : pub struct PageWriteGuard<'i> {
     255              :     state: PageWriteGuardState<'i>,
     256              : }
     257              : 
     258              : enum PageWriteGuardState<'i> {
     259              :     Invalid {
     260              :         inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
     261              :         _permit: PinnedSlotsPermit,
     262              :     },
     263              :     Downgraded,
     264              : }
     265              : 
     266              : impl std::ops::DerefMut for PageWriteGuard<'_> {
     267       166941 :     fn deref_mut(&mut self) -> &mut Self::Target {
     268       166941 :         match &mut self.state {
     269       166941 :             PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
     270            0 :             PageWriteGuardState::Downgraded => unreachable!(),
     271              :         }
     272       166941 :     }
     273              : }
     274              : 
     275              : impl std::ops::Deref for PageWriteGuard<'_> {
     276              :     type Target = [u8; PAGE_SZ];
     277              : 
     278       402281 :     fn deref(&self) -> &Self::Target {
     279       402281 :         match &self.state {
     280       402281 :             PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
     281            0 :             PageWriteGuardState::Downgraded => unreachable!(),
     282              :         }
     283       402281 :     }
     284              : }
     285              : 
     286              : impl<'a> PageWriteGuard<'a> {
     287              :     /// Mark that the buffer contents are now valid.
     288              :     #[must_use]
     289       166941 :     pub fn mark_valid(mut self) -> PageReadGuard<'a> {
     290       166941 :         let prev = std::mem::replace(&mut self.state, PageWriteGuardState::Downgraded);
     291       166941 :         match prev {
     292       166941 :             PageWriteGuardState::Invalid { inner, _permit } => {
     293       166941 :                 assert!(inner.key.is_some());
     294       166941 :                 PageReadGuard {
     295       166941 :                     _permit: Arc::new(_permit),
     296       166941 :                     slot_guard: inner.downgrade(),
     297       166941 :                 }
     298              :             }
     299            0 :             PageWriteGuardState::Downgraded => unreachable!(),
     300              :         }
     301       166941 :     }
     302              : }
     303              : 
     304              : impl Drop for PageWriteGuard<'_> {
     305              :     ///
     306              :     /// If the buffer was allocated for a page that was not already in the
     307              :     /// cache, but the lock_for_read/write() caller dropped the buffer without
     308              :     /// initializing it, remove the mapping from the page cache.
     309              :     ///
     310       166941 :     fn drop(&mut self) {
     311       166941 :         match &mut self.state {
     312            0 :             PageWriteGuardState::Invalid { inner, _permit } => {
     313            0 :                 assert!(inner.key.is_some());
     314            0 :                 let self_key = inner.key.as_ref().unwrap();
     315            0 :                 PAGE_CACHE.get().unwrap().remove_mapping(self_key);
     316            0 :                 inner.key = None;
     317              :             }
     318       166941 :             PageWriteGuardState::Downgraded => {}
     319              :         }
     320       166941 :     }
     321              : }
     322              : 
     323              : /// lock_for_read() return value
     324              : pub enum ReadBufResult<'a> {
     325              :     Found(PageReadGuard<'a>),
     326              :     NotFound(PageWriteGuard<'a>),
     327              : }
     328              : 
     329              : impl PageCache {
     330      7095436 :     pub async fn read_immutable_buf(
     331      7095436 :         &self,
     332      7095436 :         file_id: FileId,
     333      7095436 :         blkno: u32,
     334      7095436 :         ctx: &RequestContext,
     335      7095436 :     ) -> anyhow::Result<ReadBufResult> {
     336      7095436 :         self.lock_for_read(&(CacheKey::ImmutableFilePage { file_id, blkno }), ctx)
     337        87741 :             .await
     338      7095436 :     }
     339              : 
     340              :     //
     341              :     // Section 2: Internal interface functions for lookup/update.
     342              :     //
     343              :     // To add support for a new kind of "thing" to cache, you will need
     344              :     // to add public interface routines above, and code to deal with the
     345              :     // "mappings" after this section. But the routines in this section should
     346              :     // not require changes.
     347              : 
     348      7095436 :     async fn try_get_pinned_slot_permit(&self) -> anyhow::Result<PinnedSlotsPermit> {
     349      7095436 :         match tokio::time::timeout(
     350      7095436 :             // Choose small timeout, neon_smgr does its own retries.
     351      7095436 :             // https://neondb.slack.com/archives/C04DGM6SMTM/p1694786876476869
     352      7095436 :             Duration::from_secs(10),
     353      7095436 :             Arc::clone(&self.pinned_slots).acquire_owned(),
     354      7095436 :         )
     355        57756 :         .await
     356              :         {
     357      7095436 :             Ok(res) => Ok(PinnedSlotsPermit {
     358      7095436 :                 _permit: res.expect("this semaphore is never closed"),
     359      7095436 :             }),
     360            0 :             Err(_timeout) => {
     361            0 :                 crate::metrics::page_cache_errors_inc(
     362            0 :                     crate::metrics::PageCacheErrorKind::AcquirePinnedSlotTimeout,
     363            0 :                 );
     364            0 :                 anyhow::bail!("timeout: there were page guards alive for all page cache slots")
     365              :             }
     366              :         }
     367      7095436 :     }
     368              : 
     369              :     /// Look up a page in the cache.
     370              :     ///
     371      7095436 :     async fn try_lock_for_read(
     372      7095436 :         &self,
     373      7095436 :         cache_key: &CacheKey,
     374      7095436 :         permit: &mut Option<PinnedSlotsPermit>,
     375      7095436 :     ) -> Option<PageReadGuard> {
     376      7095436 :         if let Some(slot_idx) = self.search_mapping(cache_key) {
     377              :             // The page was found in the mapping. Lock the slot, and re-check
     378              :             // that it's still what we expected (because we released the mapping
     379              :             // lock already, another thread could have evicted the page)
     380      6928495 :             let slot = &self.slots[slot_idx];
     381      6928495 :             let inner = slot.inner.read().await;
     382      6928495 :             if inner.key.as_ref() == Some(cache_key) {
     383      6928495 :                 slot.inc_usage_count();
     384      6928495 :                 return Some(PageReadGuard {
     385      6928495 :                     _permit: inner.coalesce_readers_permit(permit.take().unwrap()),
     386      6928495 :                     slot_guard: inner,
     387      6928495 :                 });
     388            0 :             }
     389       166941 :         }
     390       166941 :         None
     391      7095436 :     }
     392              : 
     393              :     /// Return a locked buffer for given block.
     394              :     ///
     395              :     /// If the page is already in the cache, a read guard for it is returned
     396              :     /// as [`ReadBufResult::Found`]; `cache_key` is never modified.
     397              :     ///
     398              :     /// If the page is not found in the cache, this allocates a new buffer for
     399              :     /// it. The caller may then initialize the buffer with the contents, and
     400              :     /// call mark_valid().
     401              :     ///
     402              :     /// Example usage:
     403              :     ///
     404              :     /// ```ignore
     405              :     /// let cache = page_cache::get();
     406              :     ///
     407              :     /// match cache.lock_for_read(&key, ctx).await? {
     408              :     ///     ReadBufResult::Found(read_guard) => {
     409              :     ///         // The page was found in cache. Use it
     410              :     ///     },
     411              :     ///     ReadBufResult::NotFound(write_guard) => {
     412              :     ///         // The page was not found in cache. Read it from disk into the
     413              :     ///         // buffer.
     414              :     ///         //read_my_page_from_disk(write_guard);
     415              :     ///
     416              :     ///         // The buffer contents are now valid. Tell the page cache.
     417              :     ///         let _read_guard = write_guard.mark_valid();
     418              :     ///     },
     419              :     /// }
     420              :     /// ```
     421              :     ///
     422      7095436 :     async fn lock_for_read(
     423      7095436 :         &self,
     424      7095436 :         cache_key: &CacheKey,
     425      7095436 :         ctx: &RequestContext,
     426      7095436 :     ) -> anyhow::Result<ReadBufResult> {
     427      7095436 :         let mut permit = Some(self.try_get_pinned_slot_permit().await?);
     428              : 
     429      7095436 :         let (read_access, hit) = match cache_key {
     430      7095436 :             CacheKey::ImmutableFilePage { .. } => (
     431      7095436 :                 &crate::metrics::PAGE_CACHE
     432      7095436 :                     .for_ctx(ctx)
     433      7095436 :                     .read_accesses_immutable,
     434      7095436 :                 &crate::metrics::PAGE_CACHE.for_ctx(ctx).read_hits_immutable,
     435      7095436 :             ),
     436      7095436 :         };
     437      7095436 :         read_access.inc();
     438      7095436 : 
     439      7095436 :         let mut is_first_iteration = true;
     440              :         loop {
     441              :             // First check if the key already exists in the cache.
     442      7095436 :             if let Some(read_guard) = self.try_lock_for_read(cache_key, &mut permit).await {
     443      6928495 :                 debug_assert!(permit.is_none());
     444      6928495 :                 if is_first_iteration {
     445      6928495 :                     hit.inc();
     446      6928495 :                 }
     447      6928495 :                 return Ok(ReadBufResult::Found(read_guard));
     448       166941 :             }
     449       166941 :             debug_assert!(permit.is_some());
     450       166941 :             is_first_iteration = false;
     451              : 
     452              :             // Not found. Find a victim buffer
     453       166941 :             let (slot_idx, mut inner) = self
     454       166941 :                 .find_victim(permit.as_ref().unwrap())
     455            0 :                 .await
     456       166941 :                 .context("Failed to find evict victim")?;
     457              : 
     458              :             // Insert mapping for this. At this point, we may find that another
     459              :             // thread did the same thing concurrently. In that case, we evicted
     460              :             // our victim buffer unnecessarily. Put it into the free list and
     461              :             // continue with the slot that the other thread chose.
     462       166941 :             if let Some(_existing_slot_idx) = self.try_insert_mapping(cache_key, slot_idx) {
     463              :                 // TODO: put to free list
     464              : 
     465              :                 // We now just loop back to start from beginning. This is not
     466              :                 // optimal, we'll perform the lookup in the mapping again, which
     467              :                 // is not really necessary because we already got
     468              :                 // 'existing_slot_idx'.  But this shouldn't happen often enough
     469              :                 // to matter much.
     470            0 :                 continue;
     471       166941 :             }
     472       166941 : 
     473       166941 :             // Make the slot ready
     474       166941 :             let slot = &self.slots[slot_idx];
     475       166941 :             inner.key = Some(cache_key.clone());
     476       166941 :             slot.set_usage_count(1);
     477       166941 : 
     478       166941 :             debug_assert!(
     479              :                 {
     480       166941 :                     let guard = inner.permit.lock().unwrap();
     481       166941 :                     guard.upgrade().is_none()
     482              :                 },
     483            0 :                 "we hold a write lock, so, no one else should have a permit"
     484              :             );
     485              : 
     486       166941 :             return Ok(ReadBufResult::NotFound(PageWriteGuard {
     487       166941 :                 state: PageWriteGuardState::Invalid {
     488       166941 :                     _permit: permit.take().unwrap(),
     489       166941 :                     inner,
     490       166941 :                 },
     491       166941 :             }));
     492              :         }
     493      7095436 :     }
     494              : 
     495              :     //
     496              :     // Section 3: Mapping functions
     497              :     //
     498              : 
     499              :     /// Search for a page in the cache using the given search key.
     500              :     ///
     501              :     /// Returns the slot index, if any.
     502              :     ///
     503              :     /// NOTE: We don't hold any lock on the mapping on return, so the slot might
     504              :     /// get recycled for an unrelated page immediately after this function
     505              :     /// returns.  The caller is responsible for re-checking that the slot still
     506              :     /// contains the page with the same key before using it.
     507              :     ///
     508      7095436 :     fn search_mapping(&self, cache_key: &CacheKey) -> Option<usize> {
     509      7095436 :         match cache_key {
     510      7095436 :             CacheKey::ImmutableFilePage { file_id, blkno } => {
     511      7095436 :                 let map = self.immutable_page_map.read().unwrap();
     512      7095436 :                 Some(*map.get(&(*file_id, *blkno))?)
     513              :             }
     514              :         }
     515      7095436 :     }
     516              : 
     517              :     ///
     518              :     /// Remove mapping for given key.
     519              :     ///
     520       165053 :     fn remove_mapping(&self, old_key: &CacheKey) {
     521       165053 :         match old_key {
     522       165053 :             CacheKey::ImmutableFilePage { file_id, blkno } => {
     523       165053 :                 let mut map = self.immutable_page_map.write().unwrap();
     524       165053 :                 map.remove(&(*file_id, *blkno))
     525       165053 :                     .expect("could not find old key in mapping");
     526       165053 :                 self.size_metrics.current_bytes_immutable.sub_page_sz(1);
     527       165053 :             }
     528       165053 :         }
     529       165053 :     }
     530              : 
     531              :     ///
     532              :     /// Insert mapping for given key.
     533              :     ///
     534              :     /// If a mapping already existed for the given key, returns the slot index
     535              :     /// of the existing mapping and leaves it untouched.
     536       166941 :     fn try_insert_mapping(&self, new_key: &CacheKey, slot_idx: usize) -> Option<usize> {
     537       166941 :         match new_key {
     538       166941 :             CacheKey::ImmutableFilePage { file_id, blkno } => {
     539       166941 :                 let mut map = self.immutable_page_map.write().unwrap();
     540       166941 :                 match map.entry((*file_id, *blkno)) {
     541            0 :                     Entry::Occupied(entry) => Some(*entry.get()),
     542       166941 :                     Entry::Vacant(entry) => {
     543       166941 :                         entry.insert(slot_idx);
     544       166941 :                         self.size_metrics.current_bytes_immutable.add_page_sz(1);
     545       166941 :                         None
     546              :                     }
     547              :                 }
     548              :             }
     549              :         }
     550       166941 :     }
     551              : 
     552              :     //
     553              :     // Section 4: Misc internal helpers
     554              :     //
     555              : 
     556              :     /// Find a slot to evict.
     557              :     ///
     558              :     /// On return, the slot is empty and write-locked.
     559       166941 :     async fn find_victim(
     560       166941 :         &self,
     561       166941 :         _permit_witness: &PinnedSlotsPermit,
     562       166941 :     ) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
     563       166941 :         let iter_limit = self.slots.len() * 10;
     564       166941 :         let mut iters = 0;
     565       651994 :         loop {
     566       651994 :             iters += 1;
     567       651994 :             let slot_idx = self.next_evict_slot.fetch_add(1, Ordering::Relaxed) % self.slots.len();
     568       651994 : 
     569       651994 :             let slot = &self.slots[slot_idx];
     570       651994 : 
     571       651994 :             if slot.dec_usage_count() == 0 {
     572       166943 :                 let mut inner = match slot.inner.try_write() {
     573       166941 :                     Ok(inner) => inner,
     574            2 :                     Err(_err) => {
     575            2 :                         if iters > iter_limit {
     576              :                             // NB: Even with the permits, there's no hard guarantee that we will find a slot with
     577              :                             // any particular number of iterations: other threads might race ahead and acquire and
     578              :                             // release pins just as we're scanning the array.
     579              :                             //
     580              :                             // Imagine that nslots is 2, and as starting point, usage_count==1 on all
     581              :                             // slots. There are two threads running concurrently, A and B. A has just
     582              :                             // acquired the permit from the semaphore.
     583              :                             //
     584              :                             //   A: Look at slot 1. Its usage_count == 1, so decrement it to zero, and continue the search
     585              :                             //   B: Acquire permit.
     586              :                             //   B: Look at slot 2, decrement its usage_count to zero and continue the search
     587              :                             //   B: Look at slot 1. Its usage_count is zero, so pin it and bump up its usage_count to 1.
     588              :                             //   B: Release pin and permit again
     589              :                             //   B: Acquire permit.
     590              :                             //   B: Look at slot 2. Its usage_count is zero, so pin it and bump up its usage_count to 1.
     591              :                             //   B: Release pin and permit again
     592              :                             //
     593              :                             // Now we're back in the starting situation that both slots have
     594              :                             // usage_count 1, but A has now been through one iteration of the
     595              :                             // find_victim() loop. This can repeat indefinitely and on each
     596              :                             // iteration, A's iteration count increases by one.
     597              :                             //
     598              :                             // So, even though the semaphore for the permits is fair, the victim search
     599              :                             // itself happens in parallel and is not fair.
     600              :                             // Hence even with a permit, a task can theoretically be starved.
     601              :                             // To avoid this, we'd need tokio to give priority to tasks that are holding
     602              :                             // permits for longer.
     603              :                             // Note that just yielding to tokio during iteration without such
     604              :                             // priority boosting is likely counter-productive. We'd just give more opportunities
     605              :                             // for B to bump usage count, further starving A.
     606            0 :                             page_cache_eviction_metrics::observe(
     607            0 :                                 page_cache_eviction_metrics::Outcome::ItersExceeded {
     608            0 :                                     iters: iters.try_into().unwrap(),
     609            0 :                                 },
     610            0 :                             );
     611            0 :                             anyhow::bail!("exceeded evict iter limit");
     612            2 :                         }
     613            2 :                         continue;
     614              :                     }
     615              :                 };
     616       166941 :                 if let Some(old_key) = &inner.key {
     617       165053 :                     // remove mapping for old buffer
     618       165053 :                     self.remove_mapping(old_key);
     619       165053 :                     inner.key = None;
     620       165053 :                     page_cache_eviction_metrics::observe(
     621       165053 :                         page_cache_eviction_metrics::Outcome::FoundSlotEvicted {
     622       165053 :                             iters: iters.try_into().unwrap(),
     623       165053 :                         },
     624       165053 :                     );
     625       165053 :                 } else {
     626         1888 :                     page_cache_eviction_metrics::observe(
     627         1888 :                         page_cache_eviction_metrics::Outcome::FoundSlotUnused {
     628         1888 :                             iters: iters.try_into().unwrap(),
     629         1888 :                         },
     630         1888 :                     );
     631         1888 :                 }
     632       166941 :                 return Ok((slot_idx, inner));
     633       485051 :             }
     634              :         }
     635       166941 :     }
     636              : 
     637              :     /// Initialize a new page cache
     638              :     ///
     639              :     /// This should be called only once at page server startup.
     640           72 :     fn new(num_pages: usize) -> Self {
     641           72 :         assert!(num_pages > 0, "page cache size must be > 0");
     642              : 
     643              :         // We could use Vec::leak here, but that potentially also leaks
     644              :         // uninitialized reserved capacity. With into_boxed_slice and Box::leak
     645              :         // this is avoided.
     646           72 :         let page_buffer = Box::leak(vec![0u8; num_pages * PAGE_SZ].into_boxed_slice());
     647           72 : 
     648           72 :         let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;
     649           72 :         size_metrics.max_bytes.set_page_sz(num_pages);
     650           72 :         size_metrics.current_bytes_immutable.set_page_sz(0);
     651           72 : 
     652           72 :         let slots = page_buffer
     653           72 :             .chunks_exact_mut(PAGE_SZ)
     654         3600 :             .map(|chunk| {
     655         3600 :                 let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap();
     656         3600 : 
     657         3600 :                 Slot {
     658         3600 :                     inner: tokio::sync::RwLock::new(SlotInner {
     659         3600 :                         key: None,
     660         3600 :                         buf,
     661         3600 :                         permit: std::sync::Mutex::new(Weak::new()),
     662         3600 :                     }),
     663         3600 :                     usage_count: AtomicU8::new(0),
     664         3600 :                 }
     665         3600 :             })
     666           72 :             .collect();
     667           72 : 
     668           72 :         Self {
     669           72 :             immutable_page_map: Default::default(),
     670           72 :             slots,
     671           72 :             next_evict_slot: AtomicUsize::new(0),
     672           72 :             size_metrics,
     673           72 :             pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)),
     674           72 :         }
     675           72 :     }
     676              : }
     677              : 
     678              : trait PageSzBytesMetric {
     679              :     fn set_page_sz(&self, count: usize);
     680              :     fn add_page_sz(&self, count: usize);
     681              :     fn sub_page_sz(&self, count: usize);
     682              : }
     683              : 
     684              : #[inline(always)]
     685       332138 : fn count_times_page_sz(count: usize) -> u64 {
     686       332138 :     u64::try_from(count).unwrap() * u64::try_from(PAGE_SZ).unwrap()
     687       332138 : }
     688              : 
     689              : impl PageSzBytesMetric for metrics::UIntGauge {
     690          144 :     fn set_page_sz(&self, count: usize) {
     691          144 :         self.set(count_times_page_sz(count));
     692          144 :     }
     693       166941 :     fn add_page_sz(&self, count: usize) {
     694       166941 :         self.add(count_times_page_sz(count));
     695       166941 :     }
     696       165053 :     fn sub_page_sz(&self, count: usize) {
     697       165053 :         self.sub(count_times_page_sz(count));
     698       165053 :     }
     699              : }
        

Generated by: LCOV version 2.1-beta