LCOV - code coverage report
Current view: top level - pageserver/src - page_cache.rs (source / functions)
Test:      f8d8f5b90fa487a9e82c42da223f012f5d4fece7.info
Test Date: 2024-09-19 20:36:02
Coverage:  Lines: 86.5 % (257 of 297)    Functions: 94.3 % (33 of 35)

            Line data    Source code
       1              : //!
       2              : //! Global page cache
       3              : //!
       4              : //! The page cache uses up most of the memory in the page server. It is shared
       5              : //! by all tenants, and it is used to store different kinds of pages. Sharing
       6              : //! the cache allows memory to be dynamically allocated where it's needed the
       7              : //! most.
       8              : //!
       9              : //! The page cache consists of fixed-size buffers, 8 kB each to match the
      10              : //! PostgreSQL buffer size, and a Slot struct for each buffer to contain
      11              : //! information about what's stored in the buffer.
      12              : //!
      13              : //! # Types Of Pages
      14              : //!
      15              : //! [`PageCache`] only supports immutable pages.
      16              : //! Hence there is no need to worry about coherency.
      17              : //!
       18              : //! One type of page is currently supported:
      19              : //!
      20              : //! * **Immutable File pages**, filled & used by [`crate::tenant::block_io`] and [`crate::tenant::ephemeral_file`].
      21              : //!
       22              : //! Note that [`crate::tenant::ephemeral_file::EphemeralFile`] is generally mutable, but it's append-only.
      23              : //! It uses the page cache only for the blocks that are already fully written and immutable.
      24              : //!
      25              : //! # Filling The Page Cache
      26              : //!
       27              : //! The page cache maps from a cache key to a buffer slot.
      28              : //! The cache key uniquely identifies the piece of data that is being cached.
      29              : //!
      30              : //! The cache key for **immutable file** pages is [`FileId`] and a block number.
       31              : //! Users of the page cache that wish to page-cache an arbitrary (immutable!) on-disk file do the following:
      32              : //! * Have a mechanism to deterministically associate the on-disk file with a [`FileId`].
      33              : //! * Get a [`FileId`] using [`next_file_id`].
      34              : //! * Use the mechanism to associate the on-disk file with the returned [`FileId`].
      35              : //! * Use [`PageCache::read_immutable_buf`] to get a [`ReadBufResult`].
      36              : //! * If the page was already cached, it'll be the [`ReadBufResult::Found`] variant that contains
      37              : //!   a read guard for the page. Just use it.
      38              : //! * If the page was not cached, it'll be the [`ReadBufResult::NotFound`] variant that contains
      39              : //!   a write guard for the page. Fill the page with the contents of the on-disk file.
       40              : //!   Then call [`PageWriteGuard::mark_valid`] to mark the page as valid.
       41              : //!   `mark_valid` consumes the write guard and downgrades it into a
       42              : //!   [`PageReadGuard`] for the now-cached page, so no second lookup is needed
       43              : //!   and forward progress is guaranteed. (See the usage sketch below.)
      44              : //!
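//!
//! A minimal sketch of that sequence (illustrative, not taken from a caller in this
//! crate): it assumes a block number `blkno`, a `RequestContext` `ctx`, and a
//! hypothetical `read_block_from_disk` helper that fills the 8 kB buffer from the
//! caller's own file.
//!
//! ```ignore
//! let file_id = page_cache::next_file_id();   // associate the on-disk file with this id
//! let cache = page_cache::get();
//! let page = match cache.read_immutable_buf(file_id, blkno, ctx).await? {
//!     ReadBufResult::Found(read_guard) => read_guard,   // already cached, just use it
//!     ReadBufResult::NotFound(mut write_guard) => {
//!         // Cache miss: fill the buffer ourselves, then mark it valid.
//!         // mark_valid() downgrades the write guard into a read guard.
//!         read_block_from_disk(file_id, blkno, &mut *write_guard)?;
//!         write_guard.mark_valid()
//!     }
//! };
//! // `page` now derefs to the cached &[u8; PAGE_SZ].
//! ```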
      45              : //! # Locking
      46              : //!
       47              : //! There are two levels of locking involved: there's one lock for the "mapping"
       48              : //! from page identifier ([`FileId`] plus block number) to the buffer
      49              : //! slot, and a separate lock on each slot. To read or write the contents of a
      50              : //! slot, you must hold the lock on the slot in read or write mode,
      51              : //! respectively. To change the mapping of a slot, i.e. to evict a page or to
      52              : //! assign a buffer for a page, you must hold the mapping lock and the lock on
      53              : //! the slot at the same time.
      54              : //!
      55              : //! Whenever you need to hold both locks simultaneously, the slot lock must be
      56              : //! acquired first. This consistent ordering avoids deadlocks. To look up a page
      57              : //! in the cache, you would first look up the mapping, while holding the mapping
      58              : //! lock, and then lock the slot. You must release the mapping lock in between,
      59              : //! to obey the lock ordering and avoid deadlock.
      60              : //!
      61              : //! A slot can momentarily have invalid contents, even if it's already been
      62              : //! inserted to the mapping, but you must hold the write-lock on the slot until
      63              : //! the contents are valid. If you need to release the lock without initializing
       64              : //! the contents, you must remove the mapping first. PageWriteGuard makes this
       65              : //! easy for callers: the caller must explicitly call guard.mark_valid() after
       66              : //! initializing the contents. If the guard is dropped without calling
       67              : //! mark_valid(), the mapping is automatically removed and the slot is marked free.
      68              : //!
      69              : 
      70              : use std::{
      71              :     collections::{hash_map::Entry, HashMap},
      72              :     sync::{
      73              :         atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
      74              :         Arc, Weak,
      75              :     },
      76              :     time::Duration,
      77              : };
      78              : 
      79              : use anyhow::Context;
      80              : use once_cell::sync::OnceCell;
      81              : 
      82              : use crate::{
      83              :     context::RequestContext,
      84              :     metrics::{page_cache_eviction_metrics, PageCacheSizeMetrics},
      85              : };
      86              : 
      87              : static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
      88              : const TEST_PAGE_CACHE_SIZE: usize = 50;
      89              : 
      90              : ///
      91              : /// Initialize the page cache. This must be called once at page server startup.
      92              : ///
      93            0 : pub fn init(size: usize) {
      94            0 :     if PAGE_CACHE.set(PageCache::new(size)).is_err() {
      95            0 :         panic!("page cache already initialized");
      96            0 :     }
      97            0 : }
      98              : 
      99              : ///
     100              : /// Get a handle to the page cache.
     101              : ///
     102      1650306 : pub fn get() -> &'static PageCache {
     103      1650306 :     //
     104      1650306 :     // In unit tests, page server startup doesn't happen and no one calls
     105      1650306 :     // page_cache::init(). Initialize it here with a tiny cache, so that the
     106      1650306 :     // page cache is usable in unit tests.
     107      1650306 :     //
     108      1650306 :     if cfg!(test) {
     109      1650306 :         PAGE_CACHE.get_or_init(|| PageCache::new(TEST_PAGE_CACHE_SIZE))
     110              :     } else {
     111            0 :         PAGE_CACHE.get().expect("page cache not initialized")
     112              :     }
     113      1650306 : }
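// A minimal usage sketch (assumed caller code, not part of this module): the page
// server binary is expected to call init() exactly once during startup, with the size
// taken from its configuration, and everything else obtains the shared instance via
// get(). The config field name below is hypothetical.
//
//     page_cache::init(conf.page_cache_size);   // once, at startup
//     let cache = page_cache::get();            // anywhere afterwards (or in tests)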
     114              : 
     115              : pub const PAGE_SZ: usize = postgres_ffi::BLCKSZ as usize;
     116              : const MAX_USAGE_COUNT: u8 = 5;
     117              : 
     118              : /// See module-level comment.
     119              : #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
     120              : pub struct FileId(u64);
     121              : 
     122              : static NEXT_ID: AtomicU64 = AtomicU64::new(1);
     123              : 
     124              : /// See module-level comment.
     125         7290 : pub fn next_file_id() -> FileId {
     126         7290 :     FileId(NEXT_ID.fetch_add(1, Ordering::Relaxed))
     127         7290 : }
     128              : 
     129              : ///
     130              : /// CacheKey uniquely identifies a "thing" to cache in the page cache.
     131              : ///
     132              : #[derive(Debug, PartialEq, Eq, Clone)]
     133              : #[allow(clippy::enum_variant_names)]
     134              : enum CacheKey {
     135              :     ImmutableFilePage { file_id: FileId, blkno: u32 },
     136              : }
     137              : 
     138              : struct Slot {
     139              :     inner: tokio::sync::RwLock<SlotInner>,
     140              :     usage_count: AtomicU8,
     141              : }
     142              : 
     143              : struct SlotInner {
     144              :     key: Option<CacheKey>,
     145              :     // for `coalesce_readers_permit`
     146              :     permit: std::sync::Mutex<Weak<PinnedSlotsPermit>>,
     147              :     buf: &'static mut [u8; PAGE_SZ],
     148              : }
     149              : 
     150              : impl Slot {
     151              :     /// Increment usage count on the buffer, with ceiling at MAX_USAGE_COUNT.
     152      1553956 :     fn inc_usage_count(&self) {
     153      1553956 :         let _ = self
     154      1553956 :             .usage_count
     155      1553956 :             .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
     156      1553956 :                 if val == MAX_USAGE_COUNT {
     157      1482934 :                     None
     158              :                 } else {
     159        71022 :                     Some(val + 1)
     160              :                 }
     161      1553956 :             });
     162      1553956 :     }
     163              : 
     164              :     /// Decrement usage count on the buffer, unless it's already zero.  Returns
     165              :     /// the old usage count.
     166       257261 :     fn dec_usage_count(&self) -> u8 {
     167       257261 :         let count_res =
     168       257261 :             self.usage_count
     169       257261 :                 .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
     170       257261 :                     if val == 0 {
     171        96350 :                         None
     172              :                     } else {
     173       160911 :                         Some(val - 1)
     174              :                     }
     175       257261 :                 });
     176       257261 : 
     177       257261 :         match count_res {
     178       160911 :             Ok(usage_count) => usage_count,
     179        96350 :             Err(usage_count) => usage_count,
     180              :         }
     181       257261 :     }
     182              : 
     183              :     /// Sets the usage count to a specific value.
     184        96350 :     fn set_usage_count(&self, count: u8) {
     185        96350 :         self.usage_count.store(count, Ordering::Relaxed);
     186        96350 :     }
     187              : }
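// A small worked example (hypothetical, given some `slot: &Slot`; not part of this
// module's tests) of how the usage count drives the clock replacement policy in
// find_victim() below: hits saturate the counter at MAX_USAGE_COUNT, and each pass of
// the clock hand decrements it, so a slot survives at most MAX_USAGE_COUNT sweeps
// after its last hit.
//
//     for _ in 0..10 { slot.inc_usage_count(); }   // saturates at 5, not 10
//     assert_eq!(slot.dec_usage_count(), 5);        // sweep 1: old value 5, now 4
//     assert_eq!(slot.dec_usage_count(), 4);        // sweep 2
//     assert_eq!(slot.dec_usage_count(), 3);        // sweep 3
//     assert_eq!(slot.dec_usage_count(), 2);        // sweep 4
//     assert_eq!(slot.dec_usage_count(), 1);        // sweep 5: counter reaches 0
//     assert_eq!(slot.dec_usage_count(), 0);        // sweep 6: already 0 => evictable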
     188              : 
     189              : impl SlotInner {
      190              :     /// If there is already a reader, drop our permit and share its permit, just like we share read access.
     191      1553956 :     fn coalesce_readers_permit(&self, permit: PinnedSlotsPermit) -> Arc<PinnedSlotsPermit> {
     192      1553956 :         let mut guard = self.permit.lock().unwrap();
     193      1553956 :         if let Some(existing_permit) = guard.upgrade() {
     194            0 :             drop(guard);
     195            0 :             drop(permit);
     196            0 :             existing_permit
     197              :         } else {
     198      1553956 :             let permit = Arc::new(permit);
     199      1553956 :             *guard = Arc::downgrade(&permit);
     200      1553956 :             permit
     201              :         }
     202      1553956 :     }
     203              : }
     204              : 
     205              : pub struct PageCache {
     206              :     immutable_page_map: std::sync::RwLock<HashMap<(FileId, u32), usize>>,
     207              : 
     208              :     /// The actual buffers with their metadata.
     209              :     slots: Box<[Slot]>,
     210              : 
     211              :     pinned_slots: Arc<tokio::sync::Semaphore>,
     212              : 
     213              :     /// Index of the next candidate to evict, for the Clock replacement algorithm.
     214              :     /// This is interpreted modulo the page cache size.
     215              :     next_evict_slot: AtomicUsize,
     216              : 
     217              :     size_metrics: &'static PageCacheSizeMetrics,
     218              : }
     219              : 
     220              : struct PinnedSlotsPermit {
     221              :     _permit: tokio::sync::OwnedSemaphorePermit,
     222              : }
     223              : 
     224              : ///
     225              : /// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked
     226              : /// until the guard is dropped.
     227              : ///
     228              : pub struct PageReadGuard<'i> {
     229              :     _permit: Arc<PinnedSlotsPermit>,
     230              :     slot_guard: tokio::sync::RwLockReadGuard<'i, SlotInner>,
     231              : }
     232              : 
     233              : impl std::ops::Deref for PageReadGuard<'_> {
     234              :     type Target = [u8; PAGE_SZ];
     235              : 
     236      1650546 :     fn deref(&self) -> &Self::Target {
     237      1650546 :         self.slot_guard.buf
     238      1650546 :     }
     239              : }
     240              : 
     241              : impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
     242            0 :     fn as_ref(&self) -> &[u8; PAGE_SZ] {
     243            0 :         self.slot_guard.buf
     244            0 :     }
     245              : }
     246              : 
     247              : ///
     248              : /// PageWriteGuard is a lease on a buffer for modifying it. The page is kept locked
     249              : /// until the guard is dropped.
     250              : ///
     251              : /// Counterintuitively, this is used even for a read, if the requested page is not
     252              : /// currently found in the page cache. In that case, the caller of lock_for_read()
     253              : /// is expected to fill in the page contents and call mark_valid().
     254              : pub struct PageWriteGuard<'i> {
     255              :     state: PageWriteGuardState<'i>,
     256              : }
     257              : 
     258              : enum PageWriteGuardState<'i> {
     259              :     Invalid {
     260              :         inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
     261              :         _permit: PinnedSlotsPermit,
     262              :     },
     263              :     Downgraded,
     264              : }
     265              : 
     266              : impl std::ops::DerefMut for PageWriteGuard<'_> {
     267       144566 :     fn deref_mut(&mut self) -> &mut Self::Target {
     268       144566 :         match &mut self.state {
     269       144566 :             PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
     270            0 :             PageWriteGuardState::Downgraded => unreachable!(),
     271              :         }
     272       144566 :     }
     273              : }
     274              : 
     275              : impl std::ops::Deref for PageWriteGuard<'_> {
     276              :     type Target = [u8; PAGE_SZ];
     277              : 
     278      1589980 :     fn deref(&self) -> &Self::Target {
     279      1589980 :         match &self.state {
     280      1589980 :             PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
     281            0 :             PageWriteGuardState::Downgraded => unreachable!(),
     282              :         }
     283      1589980 :     }
     284              : }
     285              : 
     286              : impl<'a> PageWriteGuard<'a> {
     287              :     /// Mark that the buffer contents are now valid.
     288              :     #[must_use]
     289        96350 :     pub fn mark_valid(mut self) -> PageReadGuard<'a> {
     290        96350 :         let prev = std::mem::replace(&mut self.state, PageWriteGuardState::Downgraded);
     291        96350 :         match prev {
     292        96350 :             PageWriteGuardState::Invalid { inner, _permit } => {
     293        96350 :                 assert!(inner.key.is_some());
     294        96350 :                 PageReadGuard {
     295        96350 :                     _permit: Arc::new(_permit),
     296        96350 :                     slot_guard: inner.downgrade(),
     297        96350 :                 }
     298              :             }
     299            0 :             PageWriteGuardState::Downgraded => unreachable!(),
     300              :         }
     301        96350 :     }
     302              : }
     303              : 
     304              : impl Drop for PageWriteGuard<'_> {
     305              :     ///
     306              :     /// If the buffer was allocated for a page that was not already in the
      307              :     /// cache, but the lock_for_read() caller dropped the guard without
     308              :     /// initializing it, remove the mapping from the page cache.
     309              :     ///
     310        96350 :     fn drop(&mut self) {
     311        96350 :         match &mut self.state {
     312            0 :             PageWriteGuardState::Invalid { inner, _permit } => {
     313            0 :                 assert!(inner.key.is_some());
     314            0 :                 let self_key = inner.key.as_ref().unwrap();
     315            0 :                 PAGE_CACHE.get().unwrap().remove_mapping(self_key);
     316            0 :                 inner.key = None;
     317              :             }
     318        96350 :             PageWriteGuardState::Downgraded => {}
     319              :         }
     320        96350 :     }
     321              : }
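// A hedged sketch (assumed caller code, with a hypothetical fill_from_disk helper) of
// the error path this Drop impl exists for: if filling the buffer fails, dropping the
// write guard removes the half-initialized mapping, so later lookups see a clean miss
// instead of garbage.
//
//     let mut write_guard = match cache.read_immutable_buf(file_id, blkno, ctx).await? {
//         ReadBufResult::Found(read_guard) => return Ok(read_guard),
//         ReadBufResult::NotFound(write_guard) => write_guard,
//     };
//     if let Err(e) = fill_from_disk(file_id, blkno, &mut *write_guard) {
//         drop(write_guard);   // runs the Drop impl above: mapping removed, slot freed
//         return Err(e);
//     }
//     Ok(write_guard.mark_valid())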
     322              : 
     323              : /// lock_for_read() return value
     324              : pub enum ReadBufResult<'a> {
     325              :     Found(PageReadGuard<'a>),
     326              :     NotFound(PageWriteGuard<'a>),
     327              : }
     328              : 
     329              : impl PageCache {
     330      1650306 :     pub async fn read_immutable_buf(
     331      1650306 :         &self,
     332      1650306 :         file_id: FileId,
     333      1650306 :         blkno: u32,
     334      1650306 :         ctx: &RequestContext,
     335      1650306 :     ) -> anyhow::Result<ReadBufResult> {
     336      1650306 :         self.lock_for_read(&(CacheKey::ImmutableFilePage { file_id, blkno }), ctx)
     337        14762 :             .await
     338      1650306 :     }
     339              : 
     340              :     //
     341              :     // Section 2: Internal interface functions for lookup/update.
     342              :     //
     343              :     // To add support for a new kind of "thing" to cache, you will need
     344              :     // to add public interface routines above, and code to deal with the
     345              :     // "mappings" after this section. But the routines in this section should
     346              :     // not require changes.
     347              : 
     348      1650306 :     async fn try_get_pinned_slot_permit(&self) -> anyhow::Result<PinnedSlotsPermit> {
     349      1650306 :         match tokio::time::timeout(
     350      1650306 :             // Choose small timeout, neon_smgr does its own retries.
     351      1650306 :             // https://neondb.slack.com/archives/C04DGM6SMTM/p1694786876476869
     352      1650306 :             Duration::from_secs(10),
     353      1650306 :             Arc::clone(&self.pinned_slots).acquire_owned(),
     354      1650306 :         )
     355         6882 :         .await
     356              :         {
     357      1650306 :             Ok(res) => Ok(PinnedSlotsPermit {
     358      1650306 :                 _permit: res.expect("this semaphore is never closed"),
     359      1650306 :             }),
     360            0 :             Err(_timeout) => {
     361            0 :                 crate::metrics::page_cache_errors_inc(
     362            0 :                     crate::metrics::PageCacheErrorKind::AcquirePinnedSlotTimeout,
     363            0 :                 );
     364            0 :                 anyhow::bail!("timeout: there were page guards alive for all page cache slots")
     365              :             }
     366              :         }
     367      1650306 :     }
     368              : 
     369              :     /// Look up a page in the cache.
     370              :     ///
     371      1650306 :     async fn try_lock_for_read(
     372      1650306 :         &self,
     373      1650306 :         cache_key: &CacheKey,
     374      1650306 :         permit: &mut Option<PinnedSlotsPermit>,
     375      1650306 :     ) -> Option<PageReadGuard> {
     376      1650306 :         if let Some(slot_idx) = self.search_mapping(cache_key) {
     377              :             // The page was found in the mapping. Lock the slot, and re-check
     378              :             // that it's still what we expected (because we released the mapping
     379              :             // lock already, another thread could have evicted the page)
     380      1553956 :             let slot = &self.slots[slot_idx];
     381      1553956 :             let inner = slot.inner.read().await;
     382      1553956 :             if inner.key.as_ref() == Some(cache_key) {
     383      1553956 :                 slot.inc_usage_count();
     384      1553956 :                 return Some(PageReadGuard {
     385      1553956 :                     _permit: inner.coalesce_readers_permit(permit.take().unwrap()),
     386      1553956 :                     slot_guard: inner,
     387      1553956 :                 });
     388            0 :             }
     389        96350 :         }
     390        96350 :         None
     391      1650306 :     }
     392              : 
     393              :     /// Return a locked buffer for given block.
     394              :     ///
      395              :     /// Like try_lock_for_read(), the page is looked up by its exact cache key;
      396              :     /// the key is never modified by this function.
     397              :     ///
     398              :     /// If the page is not found in the cache, this allocates a new buffer for
     399              :     /// it. The caller may then initialize the buffer with the contents, and
     400              :     /// call mark_valid().
     401              :     ///
     402              :     /// Example usage:
     403              :     ///
     404              :     /// ```ignore
     405              :     /// let cache = page_cache::get();
     406              :     ///
      407              :     /// match cache.lock_for_read(&key, ctx).await? {
     408              :     ///     ReadBufResult::Found(read_guard) => {
     409              :     ///         // The page was found in cache. Use it
     410              :     ///     },
     411              :     ///     ReadBufResult::NotFound(write_guard) => {
     412              :     ///         // The page was not found in cache. Read it from disk into the
     413              :     ///         // buffer.
     414              :     ///         //read_my_page_from_disk(write_guard);
     415              :     ///
      416              :     ///         // The buffer contents are now valid. Tell the page cache.
      417              :     ///         let read_guard = write_guard.mark_valid();
     418              :     ///     },
     419              :     /// }
     420              :     /// ```
     421              :     ///
     422      1650306 :     async fn lock_for_read(
     423      1650306 :         &self,
     424      1650306 :         cache_key: &CacheKey,
     425      1650306 :         ctx: &RequestContext,
     426      1650306 :     ) -> anyhow::Result<ReadBufResult> {
     427      1650306 :         let mut permit = Some(self.try_get_pinned_slot_permit().await?);
     428              : 
     429      1650306 :         let (read_access, hit) = match cache_key {
     430      1650306 :             CacheKey::ImmutableFilePage { .. } => (
     431      1650306 :                 &crate::metrics::PAGE_CACHE
     432      1650306 :                     .for_ctx(ctx)
     433      1650306 :                     .read_accesses_immutable,
     434      1650306 :                 &crate::metrics::PAGE_CACHE.for_ctx(ctx).read_hits_immutable,
     435      1650306 :             ),
     436      1650306 :         };
     437      1650306 :         read_access.inc();
     438      1650306 : 
     439      1650306 :         let mut is_first_iteration = true;
     440              :         loop {
     441              :             // First check if the key already exists in the cache.
     442      1650306 :             if let Some(read_guard) = self.try_lock_for_read(cache_key, &mut permit).await {
     443      1553956 :                 debug_assert!(permit.is_none());
     444      1553956 :                 if is_first_iteration {
     445      1553956 :                     hit.inc();
     446      1553956 :                 }
     447      1553956 :                 return Ok(ReadBufResult::Found(read_guard));
     448        96350 :             }
     449        96350 :             debug_assert!(permit.is_some());
     450        96350 :             is_first_iteration = false;
     451              : 
     452              :             // Not found. Find a victim buffer
     453        96350 :             let (slot_idx, mut inner) = self
     454        96350 :                 .find_victim(permit.as_ref().unwrap())
     455            0 :                 .await
     456        96350 :                 .context("Failed to find evict victim")?;
     457              : 
     458              :             // Insert mapping for this. At this point, we may find that another
     459              :             // thread did the same thing concurrently. In that case, we evicted
     460              :             // our victim buffer unnecessarily. Put it into the free list and
     461              :             // continue with the slot that the other thread chose.
     462        96350 :             if let Some(_existing_slot_idx) = self.try_insert_mapping(cache_key, slot_idx) {
     463              :                 // TODO: put to free list
     464              : 
     465              :                 // We now just loop back to start from beginning. This is not
     466              :                 // optimal, we'll perform the lookup in the mapping again, which
     467              :                 // is not really necessary because we already got
     468              :                 // 'existing_slot_idx'.  But this shouldn't happen often enough
     469              :                 // to matter much.
     470            0 :                 continue;
     471        96350 :             }
     472        96350 : 
     473        96350 :             // Make the slot ready
     474        96350 :             let slot = &self.slots[slot_idx];
     475        96350 :             inner.key = Some(cache_key.clone());
     476        96350 :             slot.set_usage_count(1);
     477        96350 : 
     478        96350 :             debug_assert!(
     479              :                 {
     480        96350 :                     let guard = inner.permit.lock().unwrap();
     481        96350 :                     guard.upgrade().is_none()
     482              :                 },
     483            0 :                 "we hold a write lock, so, no one else should have a permit"
     484              :             );
     485              : 
     486        96350 :             return Ok(ReadBufResult::NotFound(PageWriteGuard {
     487        96350 :                 state: PageWriteGuardState::Invalid {
     488        96350 :                     _permit: permit.take().unwrap(),
     489        96350 :                     inner,
     490        96350 :                 },
     491        96350 :             }));
     492              :         }
     493      1650306 :     }
     494              : 
     495              :     //
     496              :     // Section 3: Mapping functions
     497              :     //
     498              : 
     499              :     /// Search for a page in the cache using the given search key.
     500              :     ///
     501              :     /// Returns the slot index, if any.
     502              :     ///
     503              :     /// NOTE: We don't hold any lock on the mapping on return, so the slot might
     504              :     /// get recycled for an unrelated page immediately after this function
     505              :     /// returns.  The caller is responsible for re-checking that the slot still
     506              :     /// contains the page with the same key before using it.
     507              :     ///
     508      1650306 :     fn search_mapping(&self, cache_key: &CacheKey) -> Option<usize> {
     509      1650306 :         match cache_key {
     510      1650306 :             CacheKey::ImmutableFilePage { file_id, blkno } => {
     511      1650306 :                 let map = self.immutable_page_map.read().unwrap();
     512      1650306 :                 Some(*map.get(&(*file_id, *blkno))?)
     513              :             }
     514              :         }
     515      1650306 :     }
     516              : 
     517              :     ///
     518              :     /// Remove mapping for given key.
     519              :     ///
     520        91910 :     fn remove_mapping(&self, old_key: &CacheKey) {
     521        91910 :         match old_key {
     522        91910 :             CacheKey::ImmutableFilePage { file_id, blkno } => {
     523        91910 :                 let mut map = self.immutable_page_map.write().unwrap();
     524        91910 :                 map.remove(&(*file_id, *blkno))
     525        91910 :                     .expect("could not find old key in mapping");
     526        91910 :                 self.size_metrics.current_bytes_immutable.sub_page_sz(1);
     527        91910 :             }
     528        91910 :         }
     529        91910 :     }
     530              : 
     531              :     ///
     532              :     /// Insert mapping for given key.
     533              :     ///
     534              :     /// If a mapping already existed for the given key, returns the slot index
     535              :     /// of the existing mapping and leaves it untouched.
     536        96350 :     fn try_insert_mapping(&self, new_key: &CacheKey, slot_idx: usize) -> Option<usize> {
     537        96350 :         match new_key {
     538        96350 :             CacheKey::ImmutableFilePage { file_id, blkno } => {
     539        96350 :                 let mut map = self.immutable_page_map.write().unwrap();
     540        96350 :                 match map.entry((*file_id, *blkno)) {
     541            0 :                     Entry::Occupied(entry) => Some(*entry.get()),
     542        96350 :                     Entry::Vacant(entry) => {
     543        96350 :                         entry.insert(slot_idx);
     544        96350 :                         self.size_metrics.current_bytes_immutable.add_page_sz(1);
     545        96350 :                         None
     546              :                     }
     547              :                 }
     548              :             }
     549              :         }
     550        96350 :     }
     551              : 
     552              :     //
     553              :     // Section 4: Misc internal helpers
     554              :     //
     555              : 
     556              :     /// Find a slot to evict.
     557              :     ///
     558              :     /// On return, the slot is empty and write-locked.
     559        96350 :     async fn find_victim(
     560        96350 :         &self,
     561        96350 :         _permit_witness: &PinnedSlotsPermit,
     562        96350 :     ) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
     563        96350 :         let iter_limit = self.slots.len() * 10;
     564        96350 :         let mut iters = 0;
     565              :         loop {
     566       257261 :             iters += 1;
     567       257261 :             let slot_idx = self.next_evict_slot.fetch_add(1, Ordering::Relaxed) % self.slots.len();
     568       257261 : 
     569       257261 :             let slot = &self.slots[slot_idx];
     570       257261 : 
     571       257261 :             if slot.dec_usage_count() == 0 {
     572        96350 :                 let mut inner = match slot.inner.try_write() {
     573        96350 :                     Ok(inner) => inner,
     574            0 :                     Err(_err) => {
     575            0 :                         if iters > iter_limit {
     576              :                             // NB: Even with the permits, there's no hard guarantee that we will find a slot with
     577              :                             // any particular number of iterations: other threads might race ahead and acquire and
     578              :                             // release pins just as we're scanning the array.
     579              :                             //
     580              :                             // Imagine that nslots is 2, and as starting point, usage_count==1 on all
     581              :                             // slots. There are two threads running concurrently, A and B. A has just
     582              :                             // acquired the permit from the semaphore.
     583              :                             //
     584              :                             //   A: Look at slot 1. Its usage_count == 1, so decrement it to zero, and continue the search
     585              :                             //   B: Acquire permit.
     586              :                             //   B: Look at slot 2, decrement its usage_count to zero and continue the search
     587              :                             //   B: Look at slot 1. Its usage_count is zero, so pin it and bump up its usage_count to 1.
     588              :                             //   B: Release pin and permit again
     589              :                             //   B: Acquire permit.
     590              :                             //   B: Look at slot 2. Its usage_count is zero, so pin it and bump up its usage_count to 1.
     591              :                             //   B: Release pin and permit again
     592              :                             //
     593              :                             // Now we're back in the starting situation that both slots have
     594              :                             // usage_count 1, but A has now been through one iteration of the
     595              :                             // find_victim() loop. This can repeat indefinitely and on each
     596              :                             // iteration, A's iteration count increases by one.
     597              :                             //
     598              :                             // So, even though the semaphore for the permits is fair, the victim search
     599              :                             // itself happens in parallel and is not fair.
     600              :                             // Hence even with a permit, a task can theoretically be starved.
     601              :                             // To avoid this, we'd need tokio to give priority to tasks that are holding
     602              :                             // permits for longer.
     603              :                             // Note that just yielding to tokio during iteration without such
     604              :                             // priority boosting is likely counter-productive. We'd just give more opportunities
     605              :                             // for B to bump usage count, further starving A.
     606            0 :                             page_cache_eviction_metrics::observe(
     607            0 :                                 page_cache_eviction_metrics::Outcome::ItersExceeded {
     608            0 :                                     iters: iters.try_into().unwrap(),
     609            0 :                                 },
     610            0 :                             );
     611            0 :                             anyhow::bail!("exceeded evict iter limit");
     612            0 :                         }
     613            0 :                         continue;
     614              :                     }
     615              :                 };
     616        96350 :                 if let Some(old_key) = &inner.key {
     617        91910 :                     // remove mapping for old buffer
     618        91910 :                     self.remove_mapping(old_key);
     619        91910 :                     inner.key = None;
     620        91910 :                     page_cache_eviction_metrics::observe(
     621        91910 :                         page_cache_eviction_metrics::Outcome::FoundSlotEvicted {
     622        91910 :                             iters: iters.try_into().unwrap(),
     623        91910 :                         },
     624        91910 :                     );
     625        91910 :                 } else {
     626         4440 :                     page_cache_eviction_metrics::observe(
     627         4440 :                         page_cache_eviction_metrics::Outcome::FoundSlotUnused {
     628         4440 :                             iters: iters.try_into().unwrap(),
     629         4440 :                         },
     630         4440 :                     );
     631         4440 :                 }
     632        96350 :                 return Ok((slot_idx, inner));
     633       160911 :             }
     634              :         }
     635        96350 :     }
     636              : 
     637              :     /// Initialize a new page cache
     638              :     ///
     639              :     /// This should be called only once at page server startup.
     640          258 :     fn new(num_pages: usize) -> Self {
     641          258 :         assert!(num_pages > 0, "page cache size must be > 0");
     642              : 
     643              :         // We could use Vec::leak here, but that potentially also leaks
     644              :         // uninitialized reserved capacity. With into_boxed_slice and Box::leak
     645              :         // this is avoided.
     646          258 :         let page_buffer = Box::leak(vec![0u8; num_pages * PAGE_SZ].into_boxed_slice());
     647          258 : 
     648          258 :         let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;
     649          258 :         size_metrics.max_bytes.set_page_sz(num_pages);
     650          258 :         size_metrics.current_bytes_immutable.set_page_sz(0);
     651          258 : 
     652          258 :         let slots = page_buffer
     653          258 :             .chunks_exact_mut(PAGE_SZ)
     654        12900 :             .map(|chunk| {
     655        12900 :                 let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap();
     656        12900 : 
     657        12900 :                 Slot {
     658        12900 :                     inner: tokio::sync::RwLock::new(SlotInner {
     659        12900 :                         key: None,
     660        12900 :                         buf,
     661        12900 :                         permit: std::sync::Mutex::new(Weak::new()),
     662        12900 :                     }),
     663        12900 :                     usage_count: AtomicU8::new(0),
     664        12900 :                 }
     665        12900 :             })
     666          258 :             .collect();
     667          258 : 
     668          258 :         Self {
     669          258 :             immutable_page_map: Default::default(),
     670          258 :             slots,
     671          258 :             next_evict_slot: AtomicUsize::new(0),
     672          258 :             size_metrics,
     673          258 :             pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)),
     674          258 :         }
     675          258 :     }
     676              : }
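// A standalone sketch (illustrative only; smaller than the real cache) of the
// allocation pattern new() uses above: leak one boxed slice for the whole cache, then
// hand out disjoint &'static mut fixed-size pages into it. Calling into_boxed_slice
// before Box::leak ensures no unused Vec capacity is leaked along with the buffer.
//
//     const SZ: usize = 8192;
//     let backing: &'static mut [u8] = Box::leak(vec![0u8; 3 * SZ].into_boxed_slice());
//     let pages: Vec<&'static mut [u8; SZ]> = backing
//         .chunks_exact_mut(SZ)
//         .map(|chunk| chunk.try_into().unwrap())
//         .collect();
//     assert_eq!(pages.len(), 3);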
     677              : 
     678              : trait PageSzBytesMetric {
     679              :     fn set_page_sz(&self, count: usize);
     680              :     fn add_page_sz(&self, count: usize);
     681              :     fn sub_page_sz(&self, count: usize);
     682              : }
     683              : 
     684              : #[inline(always)]
     685       188776 : fn count_times_page_sz(count: usize) -> u64 {
     686       188776 :     u64::try_from(count).unwrap() * u64::try_from(PAGE_SZ).unwrap()
     687       188776 : }
     688              : 
     689              : impl PageSzBytesMetric for metrics::UIntGauge {
     690          516 :     fn set_page_sz(&self, count: usize) {
     691          516 :         self.set(count_times_page_sz(count));
     692          516 :     }
     693        96350 :     fn add_page_sz(&self, count: usize) {
     694        96350 :         self.add(count_times_page_sz(count));
     695        96350 :     }
     696        91910 :     fn sub_page_sz(&self, count: usize) {
     697        91910 :         self.sub(count_times_page_sz(count));
     698        91910 :     }
     699              : }
        

Generated by: LCOV version 2.1-beta