LCOV - code coverage report
Current view: top level - pageserver/src - page_cache.rs (source / functions) Coverage Total Hit
Test: 49aa928ec5b4b510172d8b5c6d154da28e70a46c.info Lines: 86.6 % 298 258
Test Date: 2024-11-13 18:23:39 Functions: 94.3 % 35 33

            Line data    Source code
       1              : //!
       2              : //! Global page cache
       3              : //!
       4              : //! The page cache uses up most of the memory in the page server. It is shared
       5              : //! by all tenants, and it is used to store different kinds of pages. Sharing
       6              : //! the cache allows memory to be dynamically allocated where it's needed the
       7              : //! most.
       8              : //!
       9              : //! The page cache consists of fixed-size buffers, 8 kB each to match the
      10              : //! PostgreSQL buffer size, and a Slot struct for each buffer to contain
      11              : //! information about what's stored in the buffer.
      12              : //!
      13              : //! # Types Of Pages
      14              : //!
      15              : //! [`PageCache`] only supports immutable pages.
      16              : //! Hence there is no need to worry about coherency.
      17              : //!
      18              : //! A single type of page is currently supported:
      19              : //!
      20              : //! * **Immutable File pages**, filled & used by [`crate::tenant::block_io`] and [`crate::tenant::ephemeral_file`].
      21              : //!
      22              : //! Note that [`crate::tenant::ephemeral_file::EphemeralFile`] is generally mutable, but it is append-only.
      23              : //! It uses the page cache only for the blocks that are already fully written and immutable.
      24              : //!
      25              : //! # Filling The Page Cache
      26              : //!
      27              : //! Page cache maps from a cache key to a buffer slot.
      28              : //! The cache key uniquely identifies the piece of data that is being cached.
      29              : //!
      30              : //! The cache key for **immutable file** pages is [`FileId`] and a block number.
      31              : //! Users of page cache that wish to page-cache an arbitrary (immutable!) on-disk file do the following:
      32              : //! * Have a mechanism to deterministically associate the on-disk file with a [`FileId`].
      33              : //! * Get a [`FileId`] using [`next_file_id`].
      34              : //! * Use the mechanism to associate the on-disk file with the returned [`FileId`].
      35              : //! * Use [`PageCache::read_immutable_buf`] to get a [`ReadBufResult`].
      36              : //! * If the page was already cached, it'll be the [`ReadBufResult::Found`] variant that contains
      37              : //!   a read guard for the page. Just use it.
      38              : //! * If the page was not cached, it'll be the [`ReadBufResult::NotFound`] variant that contains
      39              : //!   a write guard for the page. Fill the page with the contents of the on-disk file.
      40              : //!   Then call [`PageWriteGuard::mark_valid`] to mark the page as valid.
      41              : //!   This downgrades the write guard to a [`PageReadGuard`] on the same slot,
      42              : //!   so the freshly filled page can be read right away, without a second
      43              : //!   call to [`PageCache::read_immutable_buf`].
      44              : //!
      45              : //! # Locking
      46              : //!
      47              : //! There are two levels of locking involved: There's one lock for the "mapping"
      48              : //! from page identifier (file ID and block number) to the buffer
      49              : //! slot, and a separate lock on each slot. To read or write the contents of a
      50              : //! slot, you must hold the lock on the slot in read or write mode,
      51              : //! respectively. To change the mapping of a slot, i.e. to evict a page or to
      52              : //! assign a buffer for a page, you must hold the mapping lock and the lock on
      53              : //! the slot at the same time.
      54              : //!
      55              : //! Whenever you need to hold both locks simultaneously, the slot lock must be
      56              : //! acquired first. This consistent ordering avoids deadlocks. To look up a page
      57              : //! in the cache, you would first look up the mapping, while holding the mapping
      58              : //! lock, and then lock the slot. You must release the mapping lock in between,
      59              : //! to obey the lock ordering and avoid deadlock.
      60              : //!
      61              : //! A slot can momentarily have invalid contents, even if it's already been
      62              : //! inserted to the mapping, but you must hold the write-lock on the slot until
      63              : //! the contents are valid. If you need to release the lock without initializing
      64              : //! the contents, you must remove the mapping first. We make that easy for the
      65              : //! callers with PageWriteGuard: the caller must explicitly call guard.mark_valid() after it has
      66              : //! initialized it. If the guard is dropped without calling mark_valid(), the
      67              : //! mapping is automatically removed and the slot is marked free.
      68              : //!
      69              : 
      70              : use std::{
      71              :     collections::{hash_map::Entry, HashMap},
      72              :     sync::{
      73              :         atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
      74              :         Arc, Weak,
      75              :     },
      76              :     time::Duration,
      77              : };
      78              : 
      79              : use anyhow::Context;
      80              : use once_cell::sync::OnceCell;
      81              : 
      82              : use crate::{
      83              :     context::RequestContext,
      84              :     metrics::{page_cache_eviction_metrics, PageCacheSizeMetrics},
      85              :     virtual_file::{IoBufferMut, IoPageSlice},
      86              : };
      87              : 
      88              : static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new(); // global singleton; set by init(), or lazily by get() under cfg!(test)
      89              : const TEST_PAGE_CACHE_SIZE: usize = 50; // size passed to PageCache::new when get() self-initializes in tests
      90              : 
      91              : ///
      92              : /// Initialize the page cache. This must be called once at page server startup.
      93              : /// `size` is passed to `PageCache::new`. Panics if the page cache was already initialized.
      94            0 : pub fn init(size: usize) {
      95            0 :     if PAGE_CACHE.set(PageCache::new(size)).is_err() {
      96            0 :         panic!("page cache already initialized");
      97            0 :     }
      98            0 : }
      99              : 
     100              : ///
     101              : /// Get a handle to the page cache.
     102              : /// Outside of tests this panics if [`init`] has not been called yet.
     103       485594 : pub fn get() -> &'static PageCache {
     104       485594 :     //
     105       485594 :     // In unit tests, page server startup doesn't happen and no one calls
     106       485594 :     // page_cache::init(). Initialize it here with a tiny cache, so that the
     107       485594 :     // page cache is usable in unit tests.
     108       485594 :     //
     109       485594 :     if cfg!(test) {
     110       485594 :         PAGE_CACHE.get_or_init(|| PageCache::new(TEST_PAGE_CACHE_SIZE))
     111              :     } else {
     112            0 :         PAGE_CACHE.get().expect("page cache not initialized")
     113              :     }
     114       485594 : }
     115              : 
     116              : pub const PAGE_SZ: usize = postgres_ffi::BLCKSZ as usize; // length in bytes of the buffer exposed by the page guards
     117              : const MAX_USAGE_COUNT: u8 = 5; // ceiling applied by Slot::inc_usage_count
     118              : 
     119              : /// Identifies a file in page cache keys; obtained from [`next_file_id`]. See module-level comment.
     120              : #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
     121              : pub struct FileId(u64);
     122              : 
     123              : static NEXT_ID: AtomicU64 = AtomicU64::new(1); // counter behind next_file_id(); the first id handed out is 1
     124              : 
     125              : /// Returns the next [`FileId`] by incrementing a global atomic counter. See module-level comment.
     126         2464 : pub fn next_file_id() -> FileId {
     127         2464 :     FileId(NEXT_ID.fetch_add(1, Ordering::Relaxed))
     128         2464 : }
     129              : 
     130              : ///
     131              : /// CacheKey uniquely identifies a "thing" to cache in the page cache.
     132              : /// The only variant is a block of an immutable file, identified by [`FileId`] and block number.
     133              : #[derive(Debug, PartialEq, Eq, Clone)]
     134              : #[allow(clippy::enum_variant_names)]
     135              : enum CacheKey {
     136              :     ImmutableFilePage { file_id: FileId, blkno: u32 },
     137              : }
     138              : 
     139              : struct Slot { // one cache slot: lock-protected contents plus a usage counter
     140              :     inner: tokio::sync::RwLock<SlotInner>, // key, reader permit and page buffer, guarded by the slot lock
     141              :     usage_count: AtomicU8, // adjusted by inc_usage_count / dec_usage_count / set_usage_count
     142              : }
     143              : 
     144              : struct SlotInner { // the part of a slot that is protected by the slot's RwLock
     145              :     key: Option<CacheKey>, // key of the page this slot is assigned to, if any
     146              :     // weak handle to the permit shared by current readers; used by `coalesce_readers_permit`
     147              :     permit: std::sync::Mutex<Weak<PinnedSlotsPermit>>,
     148              :     buf: IoPageSlice<'static>, // page contents; the guards deref it to [u8; PAGE_SZ]
     149              : }
     150              : 
     151              : impl Slot {
     152              :     /// Increment usage count on the buffer, with ceiling at MAX_USAGE_COUNT.
     153       453545 :     fn inc_usage_count(&self) {
     154       453545 :         let _ = self // result ignored: Err only means the count was already at the ceiling
     155       453545 :             .usage_count
     156       453545 :             .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
     157       453545 :                 if val == MAX_USAGE_COUNT {
     158       429832 :                     None // at the ceiling: leave the count unchanged
     159              :                 } else {
     160        23713 :                     Some(val + 1)
     161              :                 }
     162       453545 :             });
     163       453545 :     }
     164              : 
     165              :     /// Decrement usage count on the buffer, unless it's already zero.  Returns
     166              :     /// the old usage count.
     167        85601 :     fn dec_usage_count(&self) -> u8 {
     168        85601 :         let count_res =
     169        85601 :             self.usage_count
     170        85601 :                 .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
     171        85601 :                     if val == 0 {
     172        32049 :                         None // already zero: leave the count unchanged
     173              :                     } else {
     174        53552 :                         Some(val - 1)
     175              :                     }
     176        85601 :                 });
     177        85601 : 
     178        85601 :         match count_res { // fetch_update carries the previous value in both Ok and Err
     179        53552 :             Ok(usage_count) => usage_count,
     180        32049 :             Err(usage_count) => usage_count,
     181              :         }
     182        85601 :     }
     183              : 
     184              :     /// Sets the usage count to a specific value.
     185        32049 :     fn set_usage_count(&self, count: u8) {
     186        32049 :         self.usage_count.store(count, Ordering::Relaxed);
     187        32049 :     }
     188              : }
     189              : 
     190              : impl SlotInner {
     191              :     /// If there is already a reader, drop our permit and share its permit, just like we share read access.
     192       453545 :     fn coalesce_readers_permit(&self, permit: PinnedSlotsPermit) -> Arc<PinnedSlotsPermit> {
     193       453545 :         let mut guard = self.permit.lock().unwrap();
     194       453545 :         if let Some(existing_permit) = guard.upgrade() { // succeeds only while some holder keeps the shared permit alive
     195            0 :             drop(guard);
     196            0 :             drop(permit);
     197            0 :             existing_permit
     198              :         } else {
     199       453545 :             let permit = Arc::new(permit);
     200       453545 :             *guard = Arc::downgrade(&permit); // store only a Weak, so the slot itself does not keep the permit alive
     201       453545 :             permit
     202              :         }
     203       453545 :     }
     204              : }
     205              : 
     206              : pub struct PageCache { // see module-level comment
     207              :     immutable_page_map: std::sync::RwLock<HashMap<(FileId, u32), usize>>, // (file id, block number) -> index into `slots`
     208              : 
     209              :     /// The actual buffers with their metadata.
     210              :     slots: Box<[Slot]>,
     211              : 
     212              :     pinned_slots: Arc<tokio::sync::Semaphore>, // source of the permits handed out by try_get_pinned_slot_permit()
     213              : 
     214              :     /// Index of the next candidate to evict, for the Clock replacement algorithm.
     215              :     /// This is interpreted modulo the page cache size.
     216              :     next_evict_slot: AtomicUsize,
     217              : 
     218              :     size_metrics: &'static PageCacheSizeMetrics, // adjusted whenever a mapping is inserted or removed
     219              : }
     220              : 
     221              : struct PinnedSlotsPermit { // wrapper around one permit acquired from `PageCache::pinned_slots`
     222              :     _permit: tokio::sync::OwnedSemaphorePermit, // never read; stored so the permit lives as long as this struct
     223              : }
     224              : 
     225              : ///
     226              : /// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked
     227              : /// until the guard is dropped.
     228              : /// It also holds a pinned-slots permit, which may be shared with other readers of the same slot.
     229              : pub struct PageReadGuard<'i> {
     230              :     _permit: Arc<PinnedSlotsPermit>,
     231              :     slot_guard: tokio::sync::RwLockReadGuard<'i, SlotInner>,
     232              : }
     233              : 
     234              : impl std::ops::Deref for PageReadGuard<'_> {
     235              :     type Target = [u8; PAGE_SZ]; // the guard derefs to the page bytes
     236              : 
     237       485674 :     fn deref(&self) -> &Self::Target {
     238       485674 :         self.slot_guard.buf.deref() // forwards to the slot's buffer, read through the held read lock
     239       485674 :     }
     240              : }
     241              : 
     242              : impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
     243            0 :     fn as_ref(&self) -> &[u8; PAGE_SZ] {
     244            0 :         self.slot_guard.buf.as_ref() // same slot buffer that Deref exposes
     245            0 :     }
     246              : }
     247              : 
     248              : ///
     249              : /// PageWriteGuard is a lease on a buffer for modifying it. The page is kept locked
     250              : /// until the guard is dropped, or downgraded to a read guard with mark_valid().
     251              : ///
     252              : /// Counterintuitively, this is used even for a read, if the requested page is not
     253              : /// currently found in the page cache. In that case, the caller of lock_for_read()
     254              : /// is expected to fill in the page contents and call mark_valid().
     255              : pub struct PageWriteGuard<'i> {
     256              :     state: PageWriteGuardState<'i>,
     257              : }
     258              : 
     259              : enum PageWriteGuardState<'i> {
     260              :     Invalid { // contents not yet marked valid; holds the slot write lock and a permit
     261              :         inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
     262              :         _permit: PinnedSlotsPermit,
     263              :     },
     264              :     Downgraded, // left behind by mark_valid(); makes Drop a no-op
     265              : }
     266              : 
     267              : impl std::ops::DerefMut for PageWriteGuard<'_> {
     268        48089 :     fn deref_mut(&mut self) -> &mut Self::Target {
     269        48089 :         match &mut self.state {
     270        48089 :             PageWriteGuardState::Invalid { inner, _permit } => inner.buf.deref_mut(),
     271            0 :             PageWriteGuardState::Downgraded => unreachable!(), // mark_valid() consumes the guard, so only Drop sees this state
     272              :         }
     273        48089 :     }
     274              : }
     275              : 
     276              : impl std::ops::Deref for PageWriteGuard<'_> {
     277              :     type Target = [u8; PAGE_SZ]; // the guard derefs to the page bytes
     278              : 
     279       528886 :     fn deref(&self) -> &Self::Target {
     280       528886 :         match &self.state {
     281       528886 :             PageWriteGuardState::Invalid { inner, _permit } => inner.buf.deref(),
     282            0 :             PageWriteGuardState::Downgraded => unreachable!(), // mark_valid() consumes the guard, so only Drop sees this state
     283              :         }
     284       528886 :     }
     285              : }
     286              : 
     287              : impl<'a> PageWriteGuard<'a> {
     288              :     /// Mark that the buffer contents are now valid, downgrading this guard to a read guard on the same slot.
     289              :     #[must_use]
     290        32049 :     pub fn mark_valid(mut self) -> PageReadGuard<'a> {
     291        32049 :         let prev = std::mem::replace(&mut self.state, PageWriteGuardState::Downgraded); // Drop of `self` then sees Downgraded and does nothing
     292        32049 :         match prev {
     293        32049 :             PageWriteGuardState::Invalid { inner, _permit } => {
     294        32049 :                 assert!(inner.key.is_some());
     295        32049 :                 PageReadGuard {
     296        32049 :                     _permit: Arc::new(_permit), // the permit moves into the read guard
     297        32049 :                     slot_guard: inner.downgrade(), // write lock becomes a read lock
     298        32049 :                 }
     299              :             }
     300            0 :             PageWriteGuardState::Downgraded => unreachable!(),
     301              :         }
     302        32049 :     }
     303              : }
     304              : 
     305              : impl Drop for PageWriteGuard<'_> {
     306              :     ///
     307              :     /// If the buffer was allocated for a page that was not already in the
     308              :     /// cache, but the lock_for_read/write() caller dropped the buffer without
     309              :     /// initializing it, remove the mapping from the page cache.
     310              :     /// Does nothing if the guard was already downgraded by mark_valid().
     311        32049 :     fn drop(&mut self) {
     312        32049 :         match &mut self.state {
     313            0 :             PageWriteGuardState::Invalid { inner, _permit } => {
     314            0 :                 assert!(inner.key.is_some());
     315            0 :                 let self_key = inner.key.as_ref().unwrap();
     316            0 :                 PAGE_CACHE.get().unwrap().remove_mapping(self_key); // NOTE(review): assumes the guard came from the global PAGE_CACHE - confirm
     317            0 :                 inner.key = None; // the slot no longer holds any page
     318              :             }
     319        32049 :             PageWriteGuardState::Downgraded => {}
     320              :         }
     321        32049 :     }
     322              : }
     323              : 
     324              : /// lock_for_read() return value: a read guard on a hit, or a write guard for the caller to fill on a miss.
     325              : pub enum ReadBufResult<'a> {
     326              :     Found(PageReadGuard<'a>),
     327              :     NotFound(PageWriteGuard<'a>),
     328              : }
     329              : 
     330              : impl PageCache {
     331       485594 :     pub async fn read_immutable_buf(
     332       485594 :         &self,
     333       485594 :         file_id: FileId,
     334       485594 :         blkno: u32,
     335       485594 :         ctx: &RequestContext,
     336       485594 :     ) -> anyhow::Result<ReadBufResult> {
     337       485594 :         self.lock_for_read(&(CacheKey::ImmutableFilePage { file_id, blkno }), ctx) // builds the cache key and delegates to lock_for_read()
     338         3636 :             .await
     339       485594 :     }
     340              : 
     341              :     //
     342              :     // Section 2: Internal interface functions for lookup/update.
     343              :     //
     344              :     // To add support for a new kind of "thing" to cache, you will need
     345              :     // to add public interface routines above, and code to deal with the
     346              :     // "mappings" after this section. But the routines in this section should
     347              :     // not require changes.
     348              : 
     349       485594 :     async fn try_get_pinned_slot_permit(&self) -> anyhow::Result<PinnedSlotsPermit> { // acquire one permit from `pinned_slots`, or fail after the timeout below
     350       485594 :         match tokio::time::timeout(
     351       485594 :             // Choose small timeout, neon_smgr does its own retries.
     352       485594 :             // https://neondb.slack.com/archives/C04DGM6SMTM/p1694786876476869
     353       485594 :             Duration::from_secs(10),
     354       485594 :             Arc::clone(&self.pinned_slots).acquire_owned(),
     355       485594 :         )
     356         1801 :         .await
     357              :         {
     358       485594 :             Ok(res) => Ok(PinnedSlotsPermit {
     359       485594 :                 _permit: res.expect("this semaphore is never closed"),
     360       485594 :             }),
     361            0 :             Err(_timeout) => { // no permit within the timeout: count the error and bail
     362            0 :                 crate::metrics::page_cache_errors_inc(
     363            0 :                     crate::metrics::PageCacheErrorKind::AcquirePinnedSlotTimeout,
     364            0 :                 );
     365            0 :                 anyhow::bail!("timeout: there were page guards alive for all page cache slots")
     366              :             }
     367              :         }
     368       485594 :     }
     369              : 
     370              :     /// Look up a page in the cache.
     371              :     /// On a hit, takes `permit` (which must be `Some`) and returns a read guard; on a miss, leaves `permit` untouched and returns `None`.
     372       485594 :     async fn try_lock_for_read(
     373       485594 :         &self,
     374       485594 :         cache_key: &CacheKey,
     375       485594 :         permit: &mut Option<PinnedSlotsPermit>,
     376       485594 :     ) -> Option<PageReadGuard> {
     377       485594 :         if let Some(slot_idx) = self.search_mapping(cache_key) {
     378              :             // The page was found in the mapping. Lock the slot, and re-check
     379              :             // that it's still what we expected (because we released the mapping
     380              :             // lock already, another thread could have evicted the page)
     381       453545 :             let slot = &self.slots[slot_idx];
     382       453545 :             let inner = slot.inner.read().await;
     383       453545 :             if inner.key.as_ref() == Some(cache_key) {
     384       453545 :                 slot.inc_usage_count();
     385       453545 :                 return Some(PageReadGuard {
     386       453545 :                     _permit: inner.coalesce_readers_permit(permit.take().unwrap()),
     387       453545 :                     slot_guard: inner,
     388       453545 :                 });
     389            0 :             }
     390        32049 :         }
     391        32049 :         None
     392       485594 :     }
     393              : 
     394              :     /// Return a locked buffer for given block.
     395              :     ///
     396              :     /// If the page is already found in the cache, a read guard is returned
     397              :     /// ([`ReadBufResult::Found`]); `cache_key` itself is never modified.
     398              :     ///
     399              :     /// If the page is not found in the cache, this allocates a new buffer for
     400              :     /// it. The caller may then initialize the buffer with the contents, and
     401              :     /// call mark_valid().
     402              :     ///
     403              :     /// Example usage:
     404              :     ///
     405              :     /// ```ignore
     406              :     /// let cache = page_cache::get();
     407              :     ///
     408              :     /// match cache.lock_for_read(&key, ctx).await? {
     409              :     ///     ReadBufResult::Found(read_guard) => {
     410              :     ///         // The page was found in cache. Use it
     411              :     ///     },
     412              :     ///     ReadBufResult::NotFound(write_guard) => {
     413              :     ///         // The page was not found in cache. Read it from disk into the
     414              :     ///         // buffer.
     415              :     ///         //read_my_page_from_disk(write_guard);
     416              :     ///
     417              :     ///         // The buffer contents are now valid. Tell the page cache.
     418              :     ///         let read_guard = write_guard.mark_valid();
     419              :     ///     },
     420              :     /// }
     421              :     /// ```
     422              :     /// Returns an error if no pinned-slot permit can be acquired in time, or if no eviction victim is found.
     423       485594 :     async fn lock_for_read(
     424       485594 :         &self,
     425       485594 :         cache_key: &CacheKey,
     426       485594 :         ctx: &RequestContext,
     427       485594 :     ) -> anyhow::Result<ReadBufResult> {
     428       485594 :         let mut permit = Some(self.try_get_pinned_slot_permit().await?);
     429              : 
     430       485594 :         let (read_access, hit) = match cache_key {
     431       485594 :             CacheKey::ImmutableFilePage { .. } => (
     432       485594 :                 &crate::metrics::PAGE_CACHE
     433       485594 :                     .for_ctx(ctx)
     434       485594 :                     .read_accesses_immutable,
     435       485594 :                 &crate::metrics::PAGE_CACHE.for_ctx(ctx).read_hits_immutable,
     436       485594 :             ),
     437       485594 :         };
     438       485594 :         read_access.inc();
     439       485594 : 
     440       485594 :         let mut is_first_iteration = true; // a hit is only counted when the very first lookup finds the page
     441              :         loop {
     442              :             // First check if the key already exists in the cache.
     443       485594 :             if let Some(read_guard) = self.try_lock_for_read(cache_key, &mut permit).await {
     444       453545 :                 debug_assert!(permit.is_none());
     445       453545 :                 if is_first_iteration {
     446       453545 :                     hit.inc();
     447       453545 :                 }
     448       453545 :                 return Ok(ReadBufResult::Found(read_guard));
     449        32049 :             }
     450        32049 :             debug_assert!(permit.is_some());
     451        32049 :             is_first_iteration = false;
     452              : 
     453              :             // Not found. Find a victim buffer
     454        32049 :             let (slot_idx, mut inner) = self
     455        32049 :                 .find_victim(permit.as_ref().unwrap())
     456            0 :                 .await
     457        32049 :                 .context("Failed to find evict victim")?;
     458              : 
     459              :             // Insert mapping for this. At this point, we may find that another
     460              :             // thread did the same thing concurrently. In that case, we evicted
     461              :             // our victim buffer unnecessarily. Put it into the free list and
     462              :             // continue with the slot that the other thread chose.
     463        32049 :             if let Some(_existing_slot_idx) = self.try_insert_mapping(cache_key, slot_idx) {
     464              :                 // TODO: put to free list
     465              : 
     466              :                 // We now just loop back to start from beginning. This is not
     467              :                 // optimal, we'll perform the lookup in the mapping again, which
     468              :                 // is not really necessary because we already got
     469              :                 // 'existing_slot_idx'.  But this shouldn't happen often enough
     470              :                 // to matter much.
     471            0 :                 continue;
     472        32049 :             }
     473        32049 : 
     474        32049 :             // Make the slot ready
     475        32049 :             let slot = &self.slots[slot_idx];
     476        32049 :             inner.key = Some(cache_key.clone());
     477        32049 :             slot.set_usage_count(1);
     478        32049 : 
     479        32049 :             debug_assert!(
     480              :                 {
     481        32049 :                     let guard = inner.permit.lock().unwrap();
     482        32049 :                     guard.upgrade().is_none()
     483              :                 },
     484            0 :                 "we hold a write lock, so, no one else should have a permit"
     485              :             );
     486              : 
     487        32049 :             return Ok(ReadBufResult::NotFound(PageWriteGuard {
     488        32049 :                 state: PageWriteGuardState::Invalid {
     489        32049 :                     _permit: permit.take().unwrap(),
     490        32049 :                     inner,
     491        32049 :                 },
     492        32049 :             }));
     493              :         }
     494       485594 :     }
     495              : 
     496              :     //
     497              :     // Section 3: Mapping functions
     498              :     //
     499              : 
     500              :     /// Search for a page in the cache using the given search key.
     501              :     ///
     502              :     /// Returns the slot index, if any.
     503              :     ///
     504              :     /// NOTE: We don't hold any lock on the mapping on return, so the slot might
     505              :     /// get recycled for an unrelated page immediately after this function
     506              :     /// returns.  The caller is responsible for re-checking that the slot still
     507              :     /// contains the page with the same key before using it.
     508              :     ///
     509       485594 :     fn search_mapping(&self, cache_key: &CacheKey) -> Option<usize> {
     510       485594 :         match cache_key {
     511       485594 :             CacheKey::ImmutableFilePage { file_id, blkno } => {
     512       485594 :                 let map = self.immutable_page_map.read().unwrap(); // read lock is released when `map` goes out of scope
     513       485594 :                 Some(*map.get(&(*file_id, *blkno))?) // `?` yields None when the key has no mapping
     514              :             }
     515              :         }
     516       485594 :     }
     517              : 
     518              :     ///
     519              :     /// Remove mapping for given key.
     520              :     /// Panics if the key is not present in the mapping.
     521        30489 :     fn remove_mapping(&self, old_key: &CacheKey) {
     522        30489 :         match old_key {
     523        30489 :             CacheKey::ImmutableFilePage { file_id, blkno } => {
     524        30489 :                 let mut map = self.immutable_page_map.write().unwrap();
     525        30489 :                 map.remove(&(*file_id, *blkno))
     526        30489 :                     .expect("could not find old key in mapping");
     527        30489 :                 self.size_metrics.current_bytes_immutable.sub_page_sz(1);
     528        30489 :             }
     529        30489 :         }
     530        30489 :     }
     531              : 
     532              :     ///
     533              :     /// Insert mapping for given key.
     534              :     ///
     535              :     /// If a mapping already existed for the given key, returns the slot index
     536              :     /// of the existing mapping and leaves it untouched. Returns None if the new mapping was inserted.
     537        32049 :     fn try_insert_mapping(&self, new_key: &CacheKey, slot_idx: usize) -> Option<usize> {
     538        32049 :         match new_key {
     539        32049 :             CacheKey::ImmutableFilePage { file_id, blkno } => {
     540        32049 :                 let mut map = self.immutable_page_map.write().unwrap();
     541        32049 :                 match map.entry((*file_id, *blkno)) {
     542            0 :                     Entry::Occupied(entry) => Some(*entry.get()),
     543        32049 :                     Entry::Vacant(entry) => {
     544        32049 :                         entry.insert(slot_idx);
     545        32049 :                         self.size_metrics.current_bytes_immutable.add_page_sz(1);
     546        32049 :                         None
     547              :                     }
     548              :                 }
     549              :             }
     550              :         }
     551        32049 :     }
     552              : 
     553              :     //
     554              :     // Section 4: Misc internal helpers
     555              :     //
     556              : 
     557              :     /// Find a slot to evict.
     558              :     ///
     559              :     /// On return, the slot is empty and write-locked.
     560        32049 :     async fn find_victim(
     561        32049 :         &self,
     562        32049 :         _permit_witness: &PinnedSlotsPermit,
     563        32049 :     ) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
     564        32049 :         let iter_limit = self.slots.len() * 10;
     565        32049 :         let mut iters = 0;
     566              :         loop {
     567        85601 :             iters += 1;
     568        85601 :             let slot_idx = self.next_evict_slot.fetch_add(1, Ordering::Relaxed) % self.slots.len();
     569        85601 : 
     570        85601 :             let slot = &self.slots[slot_idx];
     571        85601 : 
     572        85601 :             if slot.dec_usage_count() == 0 {
     573        32049 :                 let mut inner = match slot.inner.try_write() {
     574        32049 :                     Ok(inner) => inner,
     575            0 :                     Err(_err) => {
     576            0 :                         if iters > iter_limit {
     577              :                             // NB: Even with the permits, there's no hard guarantee that we will find a slot with
     578              :                             // any particular number of iterations: other threads might race ahead and acquire and
     579              :                             // release pins just as we're scanning the array.
     580              :                             //
     581              :                             // Imagine that nslots is 2, and as starting point, usage_count==1 on all
     582              :                             // slots. There are two threads running concurrently, A and B. A has just
     583              :                             // acquired the permit from the semaphore.
     584              :                             //
     585              :                             //   A: Look at slot 1. Its usage_count == 1, so decrement it to zero, and continue the search
     586              :                             //   B: Acquire permit.
     587              :                             //   B: Look at slot 2, decrement its usage_count to zero and continue the search
     588              :                             //   B: Look at slot 1. Its usage_count is zero, so pin it and bump up its usage_count to 1.
     589              :                             //   B: Release pin and permit again
     590              :                             //   B: Acquire permit.
     591              :                             //   B: Look at slot 2. Its usage_count is zero, so pin it and bump up its usage_count to 1.
     592              :                             //   B: Release pin and permit again
     593              :                             //
     594              :                             // Now we're back in the starting situation that both slots have
     595              :                             // usage_count 1, but A has now been through one iteration of the
     596              :                             // find_victim() loop. This can repeat indefinitely and on each
     597              :                             // iteration, A's iteration count increases by one.
     598              :                             //
     599              :                             // So, even though the semaphore for the permits is fair, the victim search
     600              :                             // itself happens in parallel and is not fair.
     601              :                             // Hence even with a permit, a task can theoretically be starved.
     602              :                             // To avoid this, we'd need tokio to give priority to tasks that are holding
     603              :                             // permits for longer.
     604              :                             // Note that just yielding to tokio during iteration without such
     605              :                             // priority boosting is likely counter-productive. We'd just give more opportunities
     606              :                             // for B to bump usage count, further starving A.
     607            0 :                             page_cache_eviction_metrics::observe(
     608            0 :                                 page_cache_eviction_metrics::Outcome::ItersExceeded {
     609            0 :                                     iters: iters.try_into().unwrap(),
     610            0 :                                 },
     611            0 :                             );
     612            0 :                             anyhow::bail!("exceeded evict iter limit");
     613            0 :                         }
     614            0 :                         continue;
     615              :                     }
     616              :                 };
     617        32049 :                 if let Some(old_key) = &inner.key {
     618        30489 :                     // remove mapping for old buffer
     619        30489 :                     self.remove_mapping(old_key);
     620        30489 :                     inner.key = None;
     621        30489 :                     page_cache_eviction_metrics::observe(
     622        30489 :                         page_cache_eviction_metrics::Outcome::FoundSlotEvicted {
     623        30489 :                             iters: iters.try_into().unwrap(),
     624        30489 :                         },
     625        30489 :                     );
     626        30489 :                 } else {
     627         1560 :                     page_cache_eviction_metrics::observe(
     628         1560 :                         page_cache_eviction_metrics::Outcome::FoundSlotUnused {
     629         1560 :                             iters: iters.try_into().unwrap(),
     630         1560 :                         },
     631         1560 :                     );
     632         1560 :                 }
     633        32049 :                 return Ok((slot_idx, inner));
     634        53552 :             }
     635              :         }
     636        32049 :     }
     637              : 
     638              :     /// Initialize a new page cache
     639              :     ///
     640              :     /// This should be called only once at page server startup.
     641           88 :     fn new(num_pages: usize) -> Self {
     642           88 :         assert!(num_pages > 0, "page cache size must be > 0");
     643              : 
     644              :         // We could use Vec::leak here, but that potentially also leaks
     645              :         // uninitialized reserved capacity. With into_boxed_slice and Box::leak
     646              :         // this is avoided.
     647           88 :         let page_buffer = IoBufferMut::with_capacity_zeroed(num_pages * PAGE_SZ).leak();
     648           88 : 
     649           88 :         let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;
     650           88 :         size_metrics.max_bytes.set_page_sz(num_pages);
     651           88 :         size_metrics.current_bytes_immutable.set_page_sz(0);
     652           88 : 
     653           88 :         let slots = page_buffer
     654           88 :             .chunks_exact_mut(PAGE_SZ)
     655         4400 :             .map(|chunk| {
     656         4400 :                 // SAFETY: Each chunk has `PAGE_SZ` (8192) bytes, greater than 512, still aligned.
     657         4400 :                 let buf = unsafe { IoPageSlice::new_unchecked(chunk.try_into().unwrap()) };
     658         4400 : 
     659         4400 :                 Slot {
     660         4400 :                     inner: tokio::sync::RwLock::new(SlotInner {
     661         4400 :                         key: None,
     662         4400 :                         buf,
     663         4400 :                         permit: std::sync::Mutex::new(Weak::new()),
     664         4400 :                     }),
     665         4400 :                     usage_count: AtomicU8::new(0),
     666         4400 :                 }
     667         4400 :             })
     668           88 :             .collect();
     669           88 : 
     670           88 :         Self {
     671           88 :             immutable_page_map: Default::default(),
     672           88 :             slots,
     673           88 :             next_evict_slot: AtomicUsize::new(0),
     674           88 :             size_metrics,
     675           88 :             pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)),
     676           88 :         }
     677           88 :     }
     678              : }
     679              : 
     680              : trait PageSzBytesMetric {
     681              :     fn set_page_sz(&self, count: usize);
     682              :     fn add_page_sz(&self, count: usize);
     683              :     fn sub_page_sz(&self, count: usize);
     684              : }
     685              : 
     686              : #[inline(always)]
     687        62714 : fn count_times_page_sz(count: usize) -> u64 {
     688        62714 :     u64::try_from(count).unwrap() * u64::try_from(PAGE_SZ).unwrap()
     689        62714 : }
     690              : 
     691              : impl PageSzBytesMetric for metrics::UIntGauge {
     692          176 :     fn set_page_sz(&self, count: usize) {
     693          176 :         self.set(count_times_page_sz(count));
     694          176 :     }
     695        32049 :     fn add_page_sz(&self, count: usize) {
     696        32049 :         self.add(count_times_page_sz(count));
     697        32049 :     }
     698        30489 :     fn sub_page_sz(&self, count: usize) {
     699        30489 :         self.sub(count_times_page_sz(count));
     700        30489 :     }
     701              : }
        

Generated by: LCOV version 2.1-beta