Line data Source code
1 : //!
2 : //! Global page cache
3 : //!
4 : //! The page cache uses up most of the memory in the page server. It is shared
5 : //! by all tenants, and it is used to store different kinds of pages. Sharing
6 : //! the cache allows memory to be dynamically allocated where it's needed the
7 : //! most.
8 : //!
9 : //! The page cache consists of fixed-size buffers, 8 kB each to match the
10 : //! PostgreSQL buffer size, and a Slot struct for each buffer to contain
11 : //! information about what's stored in the buffer.
12 : //!
13 : //! # Types Of Pages
14 : //!
15 : //! [`PageCache`] only supports immutable pages.
16 : //! Hence there is no need to worry about coherency.
17 : //!
//! Only one type of page is currently supported:
19 : //!
20 : //! * **Immutable File pages**, filled & used by [`crate::tenant::block_io`] and [`crate::tenant::ephemeral_file`].
21 : //!
//! Note that [`crate::tenant::ephemeral_file::EphemeralFile`] is generally mutable, but it's append-only.
23 : //! It uses the page cache only for the blocks that are already fully written and immutable.
24 : //!
25 : //! # Filling The Page Cache
26 : //!
27 : //! Page cache maps from a cache key to a buffer slot.
28 : //! The cache key uniquely identifies the piece of data that is being cached.
29 : //!
30 : //! The cache key for **immutable file** pages is [`FileId`] and a block number.
31 : //! Users of page cache that wish to page-cache an arbitrary (immutable!) on-disk file do the following:
32 : //! * Have a mechanism to deterministically associate the on-disk file with a [`FileId`].
33 : //! * Get a [`FileId`] using [`next_file_id`].
34 : //! * Use the mechanism to associate the on-disk file with the returned [`FileId`].
35 : //! * Use [`PageCache::read_immutable_buf`] to get a [`ReadBufResult`].
36 : //! * If the page was already cached, it'll be the [`ReadBufResult::Found`] variant that contains
37 : //! a read guard for the page. Just use it.
38 : //! * If the page was not cached, it'll be the [`ReadBufResult::NotFound`] variant that contains
39 : //! a write guard for the page. Fill the page with the contents of the on-disk file.
//!   Then call [`PageWriteGuard::mark_valid`] to mark the page as valid.
//!   `mark_valid` consumes the write guard and downgrades it into a [`PageReadGuard`]
//!   for the page that was just filled, so there is no need to look the page up again.
//!   If the write guard is dropped without calling `mark_valid`, the mapping is removed.
44 : //!
45 : //! # Locking
46 : //!
//! There are two levels of locking involved: There's one lock for the "mapping"
//! from cache key (file ID and block number) to the buffer
//! slot, and a separate lock on each slot. To read or write the contents of a
50 : //! slot, you must hold the lock on the slot in read or write mode,
51 : //! respectively. To change the mapping of a slot, i.e. to evict a page or to
52 : //! assign a buffer for a page, you must hold the mapping lock and the lock on
53 : //! the slot at the same time.
54 : //!
55 : //! Whenever you need to hold both locks simultaneously, the slot lock must be
56 : //! acquired first. This consistent ordering avoids deadlocks. To look up a page
57 : //! in the cache, you would first look up the mapping, while holding the mapping
58 : //! lock, and then lock the slot. You must release the mapping lock in between,
59 : //! to obey the lock ordering and avoid deadlock.
60 : //!
61 : //! A slot can momentarily have invalid contents, even if it's already been
62 : //! inserted to the mapping, but you must hold the write-lock on the slot until
63 : //! the contents are valid. If you need to release the lock without initializing
64 : //! the contents, you must remove the mapping first. We make that easy for the
65 : //! callers with PageWriteGuard: the caller must explicitly call guard.mark_valid() after it has
66 : //! initialized it. If the guard is dropped without calling mark_valid(), the
67 : //! mapping is automatically removed and the slot is marked free.
68 : //!
69 :
70 : use std::{
71 : collections::{hash_map::Entry, HashMap},
72 : sync::{
73 : atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
74 : Arc, Weak,
75 : },
76 : time::Duration,
77 : };
78 :
79 : use anyhow::Context;
80 : use once_cell::sync::OnceCell;
81 :
82 : use crate::{
83 : context::RequestContext,
84 : metrics::{page_cache_eviction_metrics, PageCacheSizeMetrics},
85 : virtual_file::{IoBufferMut, IoPageSlice},
86 : };
87 :
/// The process-wide page cache. Set once by [`init`], or created lazily by
/// [`get`] in unit-test builds.
static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
/// Number of pages in the small cache that [`get`] creates for unit tests.
const TEST_PAGE_CACHE_SIZE: usize = 50;
90 :
91 : ///
92 : /// Initialize the page cache. This must be called once at page server startup.
93 : ///
94 0 : pub fn init(size: usize) {
95 0 : if PAGE_CACHE.set(PageCache::new(size)).is_err() {
96 0 : panic!("page cache already initialized");
97 0 : }
98 0 : }
99 :
100 : ///
101 : /// Get a handle to the page cache.
102 : ///
103 485383 : pub fn get() -> &'static PageCache {
104 485383 : //
105 485383 : // In unit tests, page server startup doesn't happen and no one calls
106 485383 : // page_cache::init(). Initialize it here with a tiny cache, so that the
107 485383 : // page cache is usable in unit tests.
108 485383 : //
109 485383 : if cfg!(test) {
110 485383 : PAGE_CACHE.get_or_init(|| PageCache::new(TEST_PAGE_CACHE_SIZE))
111 : } else {
112 0 : PAGE_CACHE.get().expect("page cache not initialized")
113 : }
114 485383 : }
115 :
/// Size in bytes of one page-cache buffer; equal to PostgreSQL's block size (`BLCKSZ`).
pub const PAGE_SZ: usize = postgres_ffi::BLCKSZ as usize;
/// Ceiling for a slot's usage counter in the clock replacement algorithm
/// (see `Slot::inc_usage_count`).
const MAX_USAGE_COUNT: u8 = 5;
118 :
119 : /// See module-level comment.
/// Opaque identifier for an on-disk file whose pages are cached.
///
/// Obtain fresh values from [`next_file_id`]; the module-level comment
/// describes how callers associate a file with its id.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub struct FileId(u64);

/// Counter backing [`next_file_id`]; the first id handed out is 1.
static NEXT_ID: AtomicU64 = AtomicU64::new(1);

/// Allocate a new, process-unique [`FileId`].
///
/// A relaxed increment is sufficient: the only requirement is that no two
/// calls observe the same counter value, which `fetch_add` guarantees under
/// any memory ordering.
pub fn next_file_id() -> FileId {
    let raw_id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
    FileId(raw_id)
}
129 :
///
/// CacheKey uniquely identifies a "thing" to cache in the page cache.
///
/// Currently the only kind of "thing" is a block of an immutable file.
///
#[derive(Debug, PartialEq, Eq, Clone)]
// NOTE(review): the reason for this allow is not evident with a single variant;
// presumably left over from when there were several `…Page` variants. Confirm
// with clippy before removing.
#[allow(clippy::enum_variant_names)]
enum CacheKey {
    /// Block number `blkno` of the file identified by `file_id`.
    ImmutableFilePage { file_id: FileId, blkno: u32 },
}
138 :
/// One page-sized buffer of the cache, together with its metadata.
struct Slot {
    /// Lock protecting the buffer contents and the key currently stored in it.
    inner: tokio::sync::RwLock<SlotInner>,
    /// Usage counter for the clock replacement algorithm, capped at `MAX_USAGE_COUNT`.
    usage_count: AtomicU8,
}

/// The part of a slot that is protected by the slot's lock.
struct SlotInner {
    /// Key of the page held in `buf`, or `None` if the slot is unused.
    key: Option<CacheKey>,
    // for `coalesce_readers_permit`
    /// Weak handle to the pinned-slot permit shared by the slot's current
    /// readers; dangling when there are no readers.
    permit: std::sync::Mutex<Weak<PinnedSlotsPermit>>,
    /// The page-sized buffer itself.
    buf: IoPageSlice<'static>,
}
150 :
151 : impl Slot {
152 : /// Increment usage count on the buffer, with ceiling at MAX_USAGE_COUNT.
153 453172 : fn inc_usage_count(&self) {
154 453172 : let _ = self
155 453172 : .usage_count
156 453172 : .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
157 453172 : if val == MAX_USAGE_COUNT {
158 429226 : None
159 : } else {
160 23946 : Some(val + 1)
161 : }
162 453172 : });
163 453172 : }
164 :
165 : /// Decrement usage count on the buffer, unless it's already zero. Returns
166 : /// the old usage count.
167 86188 : fn dec_usage_count(&self) -> u8 {
168 86188 : let count_res =
169 86188 : self.usage_count
170 86188 : .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
171 86188 : if val == 0 {
172 32211 : None
173 : } else {
174 53977 : Some(val - 1)
175 : }
176 86188 : });
177 86188 :
178 86188 : match count_res {
179 53977 : Ok(usage_count) => usage_count,
180 32211 : Err(usage_count) => usage_count,
181 : }
182 86188 : }
183 :
184 : /// Sets the usage count to a specific value.
185 32211 : fn set_usage_count(&self, count: u8) {
186 32211 : self.usage_count.store(count, Ordering::Relaxed);
187 32211 : }
188 : }
189 :
190 : impl SlotInner {
191 : /// If there is aready a reader, drop our permit and share its permit, just like we share read access.
192 453172 : fn coalesce_readers_permit(&self, permit: PinnedSlotsPermit) -> Arc<PinnedSlotsPermit> {
193 453172 : let mut guard = self.permit.lock().unwrap();
194 453172 : if let Some(existing_permit) = guard.upgrade() {
195 0 : drop(guard);
196 0 : drop(permit);
197 0 : existing_permit
198 : } else {
199 453172 : let permit = Arc::new(permit);
200 453172 : *guard = Arc::downgrade(&permit);
201 453172 : permit
202 : }
203 453172 : }
204 : }
205 :
/// The page cache: a fixed array of page-sized slots plus a map from cache key
/// to slot index. See the module-level comment for the locking rules.
pub struct PageCache {
    /// Maps `(file id, block number)` to the index of the slot holding that page.
    immutable_page_map: std::sync::RwLock<HashMap<(FileId, u32), usize>>,

    /// The actual buffers with their metadata.
    slots: Box<[Slot]>,

    /// Bounds how many slots can be pinned by outstanding guards at once.
    /// `new` creates it with one permit per slot.
    pinned_slots: Arc<tokio::sync::Semaphore>,

    /// Index of the next candidate to evict, for the Clock replacement algorithm.
    /// This is interpreted modulo the page cache size.
    next_evict_slot: AtomicUsize,

    /// Gauges for the configured and currently used cache size, in bytes.
    size_metrics: &'static PageCacheSizeMetrics,
}

/// Holds one permit of `PageCache::pinned_slots`; the permit is returned to
/// the semaphore when this value is dropped.
struct PinnedSlotsPermit {
    _permit: tokio::sync::OwnedSemaphorePermit,
}
224 :
225 : ///
226 : /// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked
227 : /// until the guard is dropped.
228 : ///
229 : pub struct PageReadGuard<'i> {
230 : _permit: Arc<PinnedSlotsPermit>,
231 : slot_guard: tokio::sync::RwLockReadGuard<'i, SlotInner>,
232 : }
233 :
234 : impl std::ops::Deref for PageReadGuard<'_> {
235 : type Target = [u8; PAGE_SZ];
236 :
237 485463 : fn deref(&self) -> &Self::Target {
238 485463 : self.slot_guard.buf.deref()
239 485463 : }
240 : }
241 :
242 : impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
243 0 : fn as_ref(&self) -> &[u8; PAGE_SZ] {
244 0 : self.slot_guard.buf.as_ref()
245 0 : }
246 : }
247 :
248 : ///
249 : /// PageWriteGuard is a lease on a buffer for modifying it. The page is kept locked
250 : /// until the guard is dropped.
251 : ///
252 : /// Counterintuitively, this is used even for a read, if the requested page is not
253 : /// currently found in the page cache. In that case, the caller of lock_for_read()
254 : /// is expected to fill in the page contents and call mark_valid().
255 : pub struct PageWriteGuard<'i> {
256 : state: PageWriteGuardState<'i>,
257 : }
258 :
259 : enum PageWriteGuardState<'i> {
260 : Invalid {
261 : inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
262 : _permit: PinnedSlotsPermit,
263 : },
264 : Downgraded,
265 : }
266 :
267 : impl std::ops::DerefMut for PageWriteGuard<'_> {
268 48331 : fn deref_mut(&mut self) -> &mut Self::Target {
269 48331 : match &mut self.state {
270 48331 : PageWriteGuardState::Invalid { inner, _permit } => inner.buf.deref_mut(),
271 0 : PageWriteGuardState::Downgraded => unreachable!(),
272 : }
273 48331 : }
274 : }
275 :
276 : impl std::ops::Deref for PageWriteGuard<'_> {
277 : type Target = [u8; PAGE_SZ];
278 :
279 531554 : fn deref(&self) -> &Self::Target {
280 531554 : match &self.state {
281 531554 : PageWriteGuardState::Invalid { inner, _permit } => inner.buf.deref(),
282 0 : PageWriteGuardState::Downgraded => unreachable!(),
283 : }
284 531554 : }
285 : }
286 :
287 : impl<'a> PageWriteGuard<'a> {
288 : /// Mark that the buffer contents are now valid.
289 : #[must_use]
290 32211 : pub fn mark_valid(mut self) -> PageReadGuard<'a> {
291 32211 : let prev = std::mem::replace(&mut self.state, PageWriteGuardState::Downgraded);
292 32211 : match prev {
293 32211 : PageWriteGuardState::Invalid { inner, _permit } => {
294 32211 : assert!(inner.key.is_some());
295 32211 : PageReadGuard {
296 32211 : _permit: Arc::new(_permit),
297 32211 : slot_guard: inner.downgrade(),
298 32211 : }
299 : }
300 0 : PageWriteGuardState::Downgraded => unreachable!(),
301 : }
302 32211 : }
303 : }
304 :
305 : impl Drop for PageWriteGuard<'_> {
306 : ///
307 : /// If the buffer was allocated for a page that was not already in the
308 : /// cache, but the lock_for_read/write() caller dropped the buffer without
309 : /// initializing it, remove the mapping from the page cache.
310 : ///
311 32211 : fn drop(&mut self) {
312 32211 : match &mut self.state {
313 0 : PageWriteGuardState::Invalid { inner, _permit } => {
314 0 : assert!(inner.key.is_some());
315 0 : let self_key = inner.key.as_ref().unwrap();
316 0 : PAGE_CACHE.get().unwrap().remove_mapping(self_key);
317 0 : inner.key = None;
318 : }
319 32211 : PageWriteGuardState::Downgraded => {}
320 : }
321 32211 : }
322 : }
323 :
/// Result of [`PageCache::read_immutable_buf`].
pub enum ReadBufResult<'a> {
    /// The page was cached; the guard gives read access to its contents.
    Found(PageReadGuard<'a>),
    /// The page was not cached; the caller must fill the buffer and call
    /// [`PageWriteGuard::mark_valid`].
    NotFound(PageWriteGuard<'a>),
}
329 :
330 : impl PageCache {
331 485383 : pub async fn read_immutable_buf(
332 485383 : &self,
333 485383 : file_id: FileId,
334 485383 : blkno: u32,
335 485383 : ctx: &RequestContext,
336 485383 : ) -> anyhow::Result<ReadBufResult> {
337 485383 : self.lock_for_read(&(CacheKey::ImmutableFilePage { file_id, blkno }), ctx)
338 3651 : .await
339 485383 : }
340 :
341 : //
342 : // Section 2: Internal interface functions for lookup/update.
343 : //
344 : // To add support for a new kind of "thing" to cache, you will need
345 : // to add public interface routines above, and code to deal with the
346 : // "mappings" after this section. But the routines in this section should
347 : // not require changes.
348 :
349 485383 : async fn try_get_pinned_slot_permit(&self) -> anyhow::Result<PinnedSlotsPermit> {
350 485383 : match tokio::time::timeout(
351 485383 : // Choose small timeout, neon_smgr does its own retries.
352 485383 : // https://neondb.slack.com/archives/C04DGM6SMTM/p1694786876476869
353 485383 : Duration::from_secs(10),
354 485383 : Arc::clone(&self.pinned_slots).acquire_owned(),
355 485383 : )
356 1818 : .await
357 : {
358 485383 : Ok(res) => Ok(PinnedSlotsPermit {
359 485383 : _permit: res.expect("this semaphore is never closed"),
360 485383 : }),
361 0 : Err(_timeout) => {
362 0 : crate::metrics::page_cache_errors_inc(
363 0 : crate::metrics::PageCacheErrorKind::AcquirePinnedSlotTimeout,
364 0 : );
365 0 : anyhow::bail!("timeout: there were page guards alive for all page cache slots")
366 : }
367 : }
368 485383 : }
369 :
370 : /// Look up a page in the cache.
371 : ///
372 485383 : async fn try_lock_for_read(
373 485383 : &self,
374 485383 : cache_key: &CacheKey,
375 485383 : permit: &mut Option<PinnedSlotsPermit>,
376 485383 : ) -> Option<PageReadGuard> {
377 485383 : if let Some(slot_idx) = self.search_mapping(cache_key) {
378 : // The page was found in the mapping. Lock the slot, and re-check
379 : // that it's still what we expected (because we released the mapping
380 : // lock already, another thread could have evicted the page)
381 453172 : let slot = &self.slots[slot_idx];
382 453172 : let inner = slot.inner.read().await;
383 453172 : if inner.key.as_ref() == Some(cache_key) {
384 453172 : slot.inc_usage_count();
385 453172 : return Some(PageReadGuard {
386 453172 : _permit: inner.coalesce_readers_permit(permit.take().unwrap()),
387 453172 : slot_guard: inner,
388 453172 : });
389 0 : }
390 32211 : }
391 32211 : None
392 485383 : }
393 :
394 : /// Return a locked buffer for given block.
395 : ///
396 : /// Like try_lock_for_read(), if the search criteria is not exact and the
397 : /// page is already found in the cache, *cache_key is updated.
398 : ///
399 : /// If the page is not found in the cache, this allocates a new buffer for
400 : /// it. The caller may then initialize the buffer with the contents, and
401 : /// call mark_valid().
402 : ///
403 : /// Example usage:
404 : ///
405 : /// ```ignore
406 : /// let cache = page_cache::get();
407 : ///
408 : /// match cache.lock_for_read(&key) {
409 : /// ReadBufResult::Found(read_guard) => {
410 : /// // The page was found in cache. Use it
411 : /// },
412 : /// ReadBufResult::NotFound(write_guard) => {
413 : /// // The page was not found in cache. Read it from disk into the
414 : /// // buffer.
415 : /// //read_my_page_from_disk(write_guard);
416 : ///
417 : /// // The buffer contents are now valid. Tell the page cache.
418 : /// write_guard.mark_valid();
419 : /// },
420 : /// }
421 : /// ```
422 : ///
423 485383 : async fn lock_for_read(
424 485383 : &self,
425 485383 : cache_key: &CacheKey,
426 485383 : ctx: &RequestContext,
427 485383 : ) -> anyhow::Result<ReadBufResult> {
428 485383 : let mut permit = Some(self.try_get_pinned_slot_permit().await?);
429 :
430 485383 : let (read_access, hit) = match cache_key {
431 485383 : CacheKey::ImmutableFilePage { .. } => (
432 485383 : &crate::metrics::PAGE_CACHE
433 485383 : .for_ctx(ctx)
434 485383 : .read_accesses_immutable,
435 485383 : &crate::metrics::PAGE_CACHE.for_ctx(ctx).read_hits_immutable,
436 485383 : ),
437 485383 : };
438 485383 : read_access.inc();
439 485383 :
440 485383 : let mut is_first_iteration = true;
441 : loop {
442 : // First check if the key already exists in the cache.
443 485383 : if let Some(read_guard) = self.try_lock_for_read(cache_key, &mut permit).await {
444 453172 : debug_assert!(permit.is_none());
445 453172 : if is_first_iteration {
446 453172 : hit.inc();
447 453172 : }
448 453172 : return Ok(ReadBufResult::Found(read_guard));
449 32211 : }
450 32211 : debug_assert!(permit.is_some());
451 32211 : is_first_iteration = false;
452 :
453 : // Not found. Find a victim buffer
454 32211 : let (slot_idx, mut inner) = self
455 32211 : .find_victim(permit.as_ref().unwrap())
456 0 : .await
457 32211 : .context("Failed to find evict victim")?;
458 :
459 : // Insert mapping for this. At this point, we may find that another
460 : // thread did the same thing concurrently. In that case, we evicted
461 : // our victim buffer unnecessarily. Put it into the free list and
462 : // continue with the slot that the other thread chose.
463 32211 : if let Some(_existing_slot_idx) = self.try_insert_mapping(cache_key, slot_idx) {
464 : // TODO: put to free list
465 :
466 : // We now just loop back to start from beginning. This is not
467 : // optimal, we'll perform the lookup in the mapping again, which
468 : // is not really necessary because we already got
469 : // 'existing_slot_idx'. But this shouldn't happen often enough
470 : // to matter much.
471 0 : continue;
472 32211 : }
473 32211 :
474 32211 : // Make the slot ready
475 32211 : let slot = &self.slots[slot_idx];
476 32211 : inner.key = Some(cache_key.clone());
477 32211 : slot.set_usage_count(1);
478 32211 :
479 32211 : debug_assert!(
480 : {
481 32211 : let guard = inner.permit.lock().unwrap();
482 32211 : guard.upgrade().is_none()
483 : },
484 0 : "we hold a write lock, so, no one else should have a permit"
485 : );
486 :
487 32211 : return Ok(ReadBufResult::NotFound(PageWriteGuard {
488 32211 : state: PageWriteGuardState::Invalid {
489 32211 : _permit: permit.take().unwrap(),
490 32211 : inner,
491 32211 : },
492 32211 : }));
493 : }
494 485383 : }
495 :
496 : //
497 : // Section 3: Mapping functions
498 : //
499 :
500 : /// Search for a page in the cache using the given search key.
501 : ///
502 : /// Returns the slot index, if any.
503 : ///
504 : /// NOTE: We don't hold any lock on the mapping on return, so the slot might
505 : /// get recycled for an unrelated page immediately after this function
506 : /// returns. The caller is responsible for re-checking that the slot still
507 : /// contains the page with the same key before using it.
508 : ///
509 485383 : fn search_mapping(&self, cache_key: &CacheKey) -> Option<usize> {
510 485383 : match cache_key {
511 485383 : CacheKey::ImmutableFilePage { file_id, blkno } => {
512 485383 : let map = self.immutable_page_map.read().unwrap();
513 485383 : Some(*map.get(&(*file_id, *blkno))?)
514 : }
515 : }
516 485383 : }
517 :
518 : ///
519 : /// Remove mapping for given key.
520 : ///
521 30651 : fn remove_mapping(&self, old_key: &CacheKey) {
522 30651 : match old_key {
523 30651 : CacheKey::ImmutableFilePage { file_id, blkno } => {
524 30651 : let mut map = self.immutable_page_map.write().unwrap();
525 30651 : map.remove(&(*file_id, *blkno))
526 30651 : .expect("could not find old key in mapping");
527 30651 : self.size_metrics.current_bytes_immutable.sub_page_sz(1);
528 30651 : }
529 30651 : }
530 30651 : }
531 :
532 : ///
533 : /// Insert mapping for given key.
534 : ///
535 : /// If a mapping already existed for the given key, returns the slot index
536 : /// of the existing mapping and leaves it untouched.
537 32211 : fn try_insert_mapping(&self, new_key: &CacheKey, slot_idx: usize) -> Option<usize> {
538 32211 : match new_key {
539 32211 : CacheKey::ImmutableFilePage { file_id, blkno } => {
540 32211 : let mut map = self.immutable_page_map.write().unwrap();
541 32211 : match map.entry((*file_id, *blkno)) {
542 0 : Entry::Occupied(entry) => Some(*entry.get()),
543 32211 : Entry::Vacant(entry) => {
544 32211 : entry.insert(slot_idx);
545 32211 : self.size_metrics.current_bytes_immutable.add_page_sz(1);
546 32211 : None
547 : }
548 : }
549 : }
550 : }
551 32211 : }
552 :
553 : //
554 : // Section 4: Misc internal helpers
555 : //
556 :
557 : /// Find a slot to evict.
558 : ///
559 : /// On return, the slot is empty and write-locked.
560 32211 : async fn find_victim(
561 32211 : &self,
562 32211 : _permit_witness: &PinnedSlotsPermit,
563 32211 : ) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
564 32211 : let iter_limit = self.slots.len() * 10;
565 32211 : let mut iters = 0;
566 : loop {
567 86188 : iters += 1;
568 86188 : let slot_idx = self.next_evict_slot.fetch_add(1, Ordering::Relaxed) % self.slots.len();
569 86188 :
570 86188 : let slot = &self.slots[slot_idx];
571 86188 :
572 86188 : if slot.dec_usage_count() == 0 {
573 32211 : let mut inner = match slot.inner.try_write() {
574 32211 : Ok(inner) => inner,
575 0 : Err(_err) => {
576 0 : if iters > iter_limit {
577 : // NB: Even with the permits, there's no hard guarantee that we will find a slot with
578 : // any particular number of iterations: other threads might race ahead and acquire and
579 : // release pins just as we're scanning the array.
580 : //
581 : // Imagine that nslots is 2, and as starting point, usage_count==1 on all
582 : // slots. There are two threads running concurrently, A and B. A has just
583 : // acquired the permit from the semaphore.
584 : //
585 : // A: Look at slot 1. Its usage_count == 1, so decrement it to zero, and continue the search
586 : // B: Acquire permit.
587 : // B: Look at slot 2, decrement its usage_count to zero and continue the search
588 : // B: Look at slot 1. Its usage_count is zero, so pin it and bump up its usage_count to 1.
589 : // B: Release pin and permit again
590 : // B: Acquire permit.
591 : // B: Look at slot 2. Its usage_count is zero, so pin it and bump up its usage_count to 1.
592 : // B: Release pin and permit again
593 : //
594 : // Now we're back in the starting situation that both slots have
595 : // usage_count 1, but A has now been through one iteration of the
596 : // find_victim() loop. This can repeat indefinitely and on each
597 : // iteration, A's iteration count increases by one.
598 : //
599 : // So, even though the semaphore for the permits is fair, the victim search
600 : // itself happens in parallel and is not fair.
601 : // Hence even with a permit, a task can theoretically be starved.
602 : // To avoid this, we'd need tokio to give priority to tasks that are holding
603 : // permits for longer.
604 : // Note that just yielding to tokio during iteration without such
605 : // priority boosting is likely counter-productive. We'd just give more opportunities
606 : // for B to bump usage count, further starving A.
607 0 : page_cache_eviction_metrics::observe(
608 0 : page_cache_eviction_metrics::Outcome::ItersExceeded {
609 0 : iters: iters.try_into().unwrap(),
610 0 : },
611 0 : );
612 0 : anyhow::bail!("exceeded evict iter limit");
613 0 : }
614 0 : continue;
615 : }
616 : };
617 32211 : if let Some(old_key) = &inner.key {
618 30651 : // remove mapping for old buffer
619 30651 : self.remove_mapping(old_key);
620 30651 : inner.key = None;
621 30651 : page_cache_eviction_metrics::observe(
622 30651 : page_cache_eviction_metrics::Outcome::FoundSlotEvicted {
623 30651 : iters: iters.try_into().unwrap(),
624 30651 : },
625 30651 : );
626 30651 : } else {
627 1560 : page_cache_eviction_metrics::observe(
628 1560 : page_cache_eviction_metrics::Outcome::FoundSlotUnused {
629 1560 : iters: iters.try_into().unwrap(),
630 1560 : },
631 1560 : );
632 1560 : }
633 32211 : return Ok((slot_idx, inner));
634 53977 : }
635 : }
636 32211 : }
637 :
638 : /// Initialize a new page cache
639 : ///
640 : /// This should be called only once at page server startup.
641 88 : fn new(num_pages: usize) -> Self {
642 88 : assert!(num_pages > 0, "page cache size must be > 0");
643 :
644 : // We could use Vec::leak here, but that potentially also leaks
645 : // uninitialized reserved capacity. With into_boxed_slice and Box::leak
646 : // this is avoided.
647 88 : let page_buffer = IoBufferMut::with_capacity_zeroed(num_pages * PAGE_SZ).leak();
648 88 :
649 88 : let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;
650 88 : size_metrics.max_bytes.set_page_sz(num_pages);
651 88 : size_metrics.current_bytes_immutable.set_page_sz(0);
652 88 :
653 88 : let slots = page_buffer
654 88 : .chunks_exact_mut(PAGE_SZ)
655 4400 : .map(|chunk| {
656 4400 : // SAFETY: Each chunk has `PAGE_SZ` (8192) bytes, greater than 512, still aligned.
657 4400 : let buf = unsafe { IoPageSlice::new_unchecked(chunk.try_into().unwrap()) };
658 4400 :
659 4400 : Slot {
660 4400 : inner: tokio::sync::RwLock::new(SlotInner {
661 4400 : key: None,
662 4400 : buf,
663 4400 : permit: std::sync::Mutex::new(Weak::new()),
664 4400 : }),
665 4400 : usage_count: AtomicU8::new(0),
666 4400 : }
667 4400 : })
668 88 : .collect();
669 88 :
670 88 : Self {
671 88 : immutable_page_map: Default::default(),
672 88 : slots,
673 88 : next_evict_slot: AtomicUsize::new(0),
674 88 : size_metrics,
675 88 : pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)),
676 88 : }
677 88 : }
678 : }
679 :
680 : trait PageSzBytesMetric {
681 : fn set_page_sz(&self, count: usize);
682 : fn add_page_sz(&self, count: usize);
683 : fn sub_page_sz(&self, count: usize);
684 : }
685 :
686 : #[inline(always)]
687 63038 : fn count_times_page_sz(count: usize) -> u64 {
688 63038 : u64::try_from(count).unwrap() * u64::try_from(PAGE_SZ).unwrap()
689 63038 : }
690 :
691 : impl PageSzBytesMetric for metrics::UIntGauge {
692 176 : fn set_page_sz(&self, count: usize) {
693 176 : self.set(count_times_page_sz(count));
694 176 : }
695 32211 : fn add_page_sz(&self, count: usize) {
696 32211 : self.add(count_times_page_sz(count));
697 32211 : }
698 30651 : fn sub_page_sz(&self, count: usize) {
699 30651 : self.sub(count_times_page_sz(count));
700 30651 : }
701 : }
|