Line data Source code
1 : //!
2 : //! Global page cache
3 : //!
4 : //! The page cache uses up most of the memory in the page server. It is shared
5 : //! by all tenants, and it is used to store different kinds of pages. Sharing
6 : //! the cache allows memory to be dynamically allocated where it's needed the
7 : //! most.
8 : //!
9 : //! The page cache consists of fixed-size buffers, 8 kB each to match the
10 : //! PostgreSQL buffer size, and a Slot struct for each buffer to contain
11 : //! information about what's stored in the buffer.
12 : //!
13 : //! # Types Of Pages
14 : //!
15 : //! [`PageCache`] only supports immutable pages.
16 : //! Hence there is no need to worry about coherency.
17 : //!
18 : //! Currently, only one type of page is supported:
19 : //!
20 : //! * **Immutable File pages**, filled & used by [`crate::tenant::block_io`] and [`crate::tenant::ephemeral_file`].
21 : //!
22 : //! Note that [`crate::tenant::ephemeral_file::EphemeralFile`] is generally mutable, but it is append-only:
23 : //! it uses the page cache only for blocks that are already fully written and therefore immutable.
24 : //!
25 : //! # Filling The Page Cache
26 : //!
27 : //! The page cache maps a cache key to a buffer slot.
28 : //! The cache key uniquely identifies the piece of data that is being cached.
29 : //!
30 : //! The cache key for **immutable file** pages is a [`FileId`] plus a block number.
31 : //! Users of the page cache that wish to cache an arbitrary (immutable!) on-disk file do the following:
32 : //! * Have a mechanism to deterministically associate the on-disk file with a [`FileId`].
33 : //! * Get a [`FileId`] using [`next_file_id`].
34 : //! * Use the mechanism to associate the on-disk file with the returned [`FileId`].
35 : //! * Use [`PageCache::read_immutable_buf`] to get a [`ReadBufResult`].
36 : //! * If the page was already cached, it'll be the [`ReadBufResult::Found`] variant that contains
37 : //! a read guard for the page. Just use it.
38 : //! * If the page was not cached, it'll be the [`ReadBufResult::NotFound`] variant that contains
39 : //! a write guard for the page. Fill the page with the contents of the on-disk file.
40 : //! Then call [`PageWriteGuard::mark_valid`] to mark the page as valid; this downgrades
41 : //! the write guard to a [`PageReadGuard`], so the freshly filled page can be read
42 : //! right away without looking it up again.
43 : //! (A sketch of the whole lookup-or-fill sequence follows after this list.)
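//!
//! A minimal sketch of that sequence (assuming `file_id`, `blkno`, and a `&RequestContext`
//! named `ctx` are in scope, and `fill_from_disk` is a hypothetical stand-in for the
//! caller's own I/O):
//!
//! ```ignore
//! let cache = page_cache::get();
//! let read_guard = match cache.read_immutable_buf(file_id, blkno, ctx).await? {
//!     // Cache hit: the slot already holds this block.
//!     ReadBufResult::Found(read_guard) => read_guard,
//!     // Cache miss: we got an empty, write-locked slot and must fill it ourselves.
//!     ReadBufResult::NotFound(mut write_guard) => {
//!         fill_from_disk(&mut *write_guard, blkno)?;
//!         // Publish the contents; this downgrades the write guard to a read guard.
//!         write_guard.mark_valid()
//!     }
//! };
//! // `read_guard` derefs to `&[u8; PAGE_SZ]` and keeps the slot pinned until dropped.
//! ```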
44 : //!
45 : //! # Locking
46 : //!
47 : //! There are two levels of locking involved: there's one lock for the "mapping"
48 : //! from cache key ([`FileId`] plus block number) to the buffer
49 : //! slot, and a separate lock on each slot. To read or write the contents of a
50 : //! slot, you must hold the lock on the slot in read or write mode,
51 : //! respectively. To change the mapping of a slot, i.e. to evict a page or to
52 : //! assign a buffer for a page, you must hold the mapping lock and the lock on
53 : //! the slot at the same time.
54 : //!
55 : //! Whenever you need to hold both locks simultaneously, the slot lock must be
56 : //! acquired first. This consistent ordering avoids deadlocks. To look up a page
57 : //! in the cache, you would first look up the mapping, while holding the mapping
58 : //! lock, and then lock the slot. You must release the mapping lock in between,
59 : //! to obey the lock ordering and avoid deadlock.
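//!
//! As a compressed sketch of how the code below follows this ordering (see
//! [`PageCache::try_lock_for_read`] and [`PageCache::find_victim`]; error handling and
//! usage-count bookkeeping omitted):
//!
//! ```ignore
//! // Lookup path: mapping lock, then (only after releasing it) the slot lock.
//! let slot_idx = self.search_mapping(cache_key)?;      // acquires + releases the mapping lock
//! let inner = self.slots[slot_idx].inner.read().await; // slot lock; must re-check inner.key
//!
//! // Eviction / assignment path: slot lock first, then the mapping lock.
//! let mut inner = slot.inner.try_write()?;             // slot write lock
//! self.remove_mapping(old_key);                        // takes the mapping write lock inside
//! self.try_insert_mapping(new_key, slot_idx);          // takes the mapping write lock again
//! ```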
60 : //!
61 : //! A slot can momentarily have invalid contents, even if it's already been
62 : //! inserted into the mapping, but you must hold the write-lock on the slot until
63 : //! the contents are valid. If you need to release the lock without initializing
64 : //! the contents, you must remove the mapping first. We make that easy for
65 : //! callers with [`PageWriteGuard`]: the caller must explicitly call [`PageWriteGuard::mark_valid`]
66 : //! after initializing the contents. If the guard is dropped without calling `mark_valid()`,
67 : //! the mapping is automatically removed and the slot is marked free.
68 : //!
69 :
70 : use std::{
71 : collections::{hash_map::Entry, HashMap},
72 : sync::{
73 : atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
74 : Arc, Weak,
75 : },
76 : time::Duration,
77 : };
78 :
79 : use anyhow::Context;
80 : use once_cell::sync::OnceCell;
81 :
82 : use crate::{
83 : context::RequestContext,
84 : metrics::{page_cache_eviction_metrics, PageCacheSizeMetrics},
85 : };
86 :
87 : static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
88 : const TEST_PAGE_CACHE_SIZE: usize = 50;
89 :
90 : ///
91 : /// Initialize the page cache. This must be called once at page server startup.
92 : ///
93 0 : pub fn init(size: usize) {
94 0 : if PAGE_CACHE.set(PageCache::new(size)).is_err() {
95 0 : panic!("page cache already initialized");
96 0 : }
97 0 : }
98 :
99 : ///
100 : /// Get a handle to the page cache.
101 : ///
102 1651139 : pub fn get() -> &'static PageCache {
103 1651139 : //
104 1651139 : // In unit tests, page server startup doesn't happen and no one calls
105 1651139 : // page_cache::init(). Initialize it here with a tiny cache, so that the
106 1651139 : // page cache is usable in unit tests.
107 1651139 : //
108 1651139 : if cfg!(test) {
109 1651139 : PAGE_CACHE.get_or_init(|| PageCache::new(TEST_PAGE_CACHE_SIZE))
110 : } else {
111 0 : PAGE_CACHE.get().expect("page cache not initialized")
112 : }
113 1651139 : }
114 :
115 : pub const PAGE_SZ: usize = postgres_ffi::BLCKSZ as usize;
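/// Ceiling for a slot's clock-algorithm usage count: each cache hit bumps the count up to
/// this value, and each pass of [`PageCache::find_victim`] decrements it, so a slot must go
/// unvisited by readers for several sweeps before it becomes an eviction candidate.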
116 : const MAX_USAGE_COUNT: u8 = 5;
117 :
118 : /// See module-level comment.
119 : #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
120 : pub struct FileId(u64);
121 :
122 : static NEXT_ID: AtomicU64 = AtomicU64::new(1);
123 :
124 : /// See module-level comment.
125 7302 : pub fn next_file_id() -> FileId {
126 7302 : FileId(NEXT_ID.fetch_add(1, Ordering::Relaxed))
127 7302 : }
128 :
129 : ///
130 : /// CacheKey uniquely identifies a "thing" to cache in the page cache.
131 : ///
132 : #[derive(Debug, PartialEq, Eq, Clone)]
133 : #[allow(clippy::enum_variant_names)]
134 : enum CacheKey {
135 : ImmutableFilePage { file_id: FileId, blkno: u32 },
136 : }
137 :
138 : struct Slot {
139 : inner: tokio::sync::RwLock<SlotInner>,
140 : usage_count: AtomicU8,
141 : }
142 :
143 : struct SlotInner {
144 : key: Option<CacheKey>,
145 : // for `coalesce_readers_permit`
146 : permit: std::sync::Mutex<Weak<PinnedSlotsPermit>>,
147 : buf: &'static mut [u8; PAGE_SZ],
148 : }
149 :
150 : impl Slot {
151 : /// Increment usage count on the buffer, with ceiling at MAX_USAGE_COUNT.
152 1554524 : fn inc_usage_count(&self) {
153 1554524 : let _ = self
154 1554524 : .usage_count
155 1554524 : .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
156 1554524 : if val == MAX_USAGE_COUNT {
157 1483309 : None
158 : } else {
159 71215 : Some(val + 1)
160 : }
161 1554524 : });
162 1554524 : }
163 :
164 : /// Decrement usage count on the buffer, unless it's already zero. Returns
165 : /// the old usage count.
166 258012 : fn dec_usage_count(&self) -> u8 {
167 258012 : let count_res =
168 258012 : self.usage_count
169 258012 : .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
170 258012 : if val == 0 {
171 96615 : None
172 : } else {
173 161397 : Some(val - 1)
174 : }
175 258012 : });
176 258012 :
177 258012 : match count_res {
178 161397 : Ok(usage_count) => usage_count,
179 96615 : Err(usage_count) => usage_count,
180 : }
181 258012 : }
182 :
183 : /// Sets the usage count to a specific value.
184 96615 : fn set_usage_count(&self, count: u8) {
185 96615 : self.usage_count.store(count, Ordering::Relaxed);
186 96615 : }
187 : }
188 :
189 : impl SlotInner {
190 : /// If there is already a reader, drop our permit and share its permit, just like we share read access.
191 1554524 : fn coalesce_readers_permit(&self, permit: PinnedSlotsPermit) -> Arc<PinnedSlotsPermit> {
192 1554524 : let mut guard = self.permit.lock().unwrap();
193 1554524 : if let Some(existing_permit) = guard.upgrade() {
194 0 : drop(guard);
195 0 : drop(permit);
196 0 : existing_permit
197 : } else {
198 1554524 : let permit = Arc::new(permit);
199 1554524 : *guard = Arc::downgrade(&permit);
200 1554524 : permit
201 : }
202 1554524 : }
203 : }
204 :
205 : pub struct PageCache {
206 : immutable_page_map: std::sync::RwLock<HashMap<(FileId, u32), usize>>,
207 :
208 : /// The actual buffers with their metadata.
209 : slots: Box<[Slot]>,
210 :
211 : pinned_slots: Arc<tokio::sync::Semaphore>,
212 :
213 : /// Index of the next candidate to evict, for the Clock replacement algorithm.
214 : /// This is interpreted modulo the page cache size.
215 : next_evict_slot: AtomicUsize,
216 :
217 : size_metrics: &'static PageCacheSizeMetrics,
218 : }
219 :
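/// A permit from [`PageCache::pinned_slots`]. Each [`PageReadGuard`] / [`PageWriteGuard`]
/// holds one (readers of the same slot share a permit via [`SlotInner::coalesce_readers_permit`]),
/// which bounds how many slots can be pinned at the same time.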
220 : struct PinnedSlotsPermit {
221 : _permit: tokio::sync::OwnedSemaphorePermit,
222 : }
223 :
224 : ///
225 : /// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked
226 : /// until the guard is dropped.
227 : ///
228 : pub struct PageReadGuard<'i> {
229 : _permit: Arc<PinnedSlotsPermit>,
230 : slot_guard: tokio::sync::RwLockReadGuard<'i, SlotInner>,
231 : }
232 :
233 : impl std::ops::Deref for PageReadGuard<'_> {
234 : type Target = [u8; PAGE_SZ];
235 :
236 1651379 : fn deref(&self) -> &Self::Target {
237 1651379 : self.slot_guard.buf
238 1651379 : }
239 : }
240 :
241 : impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
242 0 : fn as_ref(&self) -> &[u8; PAGE_SZ] {
243 0 : self.slot_guard.buf
244 0 : }
245 : }
246 :
247 : ///
248 : /// PageWriteGuard is a lease on a buffer for modifying it. The page is kept locked
249 : /// until the guard is dropped.
250 : ///
251 : /// Counterintuitively, this is used even for a read, if the requested page is not
252 : /// currently found in the page cache. In that case, the caller of lock_for_read()
253 : /// is expected to fill in the page contents and call mark_valid().
254 : pub struct PageWriteGuard<'i> {
255 : state: PageWriteGuardState<'i>,
256 : }
257 :
258 : enum PageWriteGuardState<'i> {
259 : Invalid {
260 : inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
261 : _permit: PinnedSlotsPermit,
262 : },
263 : Downgraded,
264 : }
265 :
266 : impl std::ops::DerefMut for PageWriteGuard<'_> {
267 144848 : fn deref_mut(&mut self) -> &mut Self::Target {
268 144848 : match &mut self.state {
269 144848 : PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
270 0 : PageWriteGuardState::Downgraded => unreachable!(),
271 : }
272 144848 : }
273 : }
274 :
275 : impl std::ops::Deref for PageWriteGuard<'_> {
276 : type Target = [u8; PAGE_SZ];
277 :
278 1593775 : fn deref(&self) -> &Self::Target {
279 1593775 : match &self.state {
280 1593775 : PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
281 0 : PageWriteGuardState::Downgraded => unreachable!(),
282 : }
283 1593775 : }
284 : }
285 :
286 : impl<'a> PageWriteGuard<'a> {
287 : /// Mark that the buffer contents are now valid.
288 : #[must_use]
289 96615 : pub fn mark_valid(mut self) -> PageReadGuard<'a> {
290 96615 : let prev = std::mem::replace(&mut self.state, PageWriteGuardState::Downgraded);
291 96615 : match prev {
292 96615 : PageWriteGuardState::Invalid { inner, _permit } => {
293 96615 : assert!(inner.key.is_some());
294 96615 : PageReadGuard {
295 96615 : _permit: Arc::new(_permit),
296 96615 : slot_guard: inner.downgrade(),
297 96615 : }
298 : }
299 0 : PageWriteGuardState::Downgraded => unreachable!(),
300 : }
301 96615 : }
302 : }
303 :
304 : impl Drop for PageWriteGuard<'_> {
305 : ///
306 : /// If the buffer was allocated for a page that was not already in the
307 : /// cache, but the lock_for_read() caller dropped the buffer without
308 : /// initializing it, remove the mapping from the page cache.
309 : ///
310 96615 : fn drop(&mut self) {
311 96615 : match &mut self.state {
312 0 : PageWriteGuardState::Invalid { inner, _permit } => {
313 0 : assert!(inner.key.is_some());
314 0 : let self_key = inner.key.as_ref().unwrap();
315 0 : PAGE_CACHE.get().unwrap().remove_mapping(self_key);
316 0 : inner.key = None;
317 : }
318 96615 : PageWriteGuardState::Downgraded => {}
319 : }
320 96615 : }
321 : }
322 :
323 : /// lock_for_read() return value
324 : pub enum ReadBufResult<'a> {
325 : Found(PageReadGuard<'a>),
326 : NotFound(PageWriteGuard<'a>),
327 : }
328 :
329 : impl PageCache {
330 1651139 : pub async fn read_immutable_buf(
331 1651139 : &self,
332 1651139 : file_id: FileId,
333 1651139 : blkno: u32,
334 1651139 : ctx: &RequestContext,
335 1651139 : ) -> anyhow::Result<ReadBufResult> {
336 1651139 : self.lock_for_read(&(CacheKey::ImmutableFilePage { file_id, blkno }), ctx)
337 14978 : .await
338 1651139 : }
339 :
340 : //
341 : // Section 2: Internal interface functions for lookup/update.
342 : //
343 : // To add support for a new kind of "thing" to cache, you will need
344 : // to add public interface routines above, and code to deal with the
345 : // "mappings" after this section. But the routines in this section should
346 : // not require changes.
347 :
348 1651139 : async fn try_get_pinned_slot_permit(&self) -> anyhow::Result<PinnedSlotsPermit> {
349 1651139 : match tokio::time::timeout(
350 1651139 : // Choose small timeout, neon_smgr does its own retries.
351 1651139 : // https://neondb.slack.com/archives/C04DGM6SMTM/p1694786876476869
352 1651139 : Duration::from_secs(10),
353 1651139 : Arc::clone(&self.pinned_slots).acquire_owned(),
354 1651139 : )
355 6834 : .await
356 : {
357 1651139 : Ok(res) => Ok(PinnedSlotsPermit {
358 1651139 : _permit: res.expect("this semaphore is never closed"),
359 1651139 : }),
360 0 : Err(_timeout) => {
361 0 : crate::metrics::page_cache_errors_inc(
362 0 : crate::metrics::PageCacheErrorKind::AcquirePinnedSlotTimeout,
363 0 : );
364 0 : anyhow::bail!("timeout: there were page guards alive for all page cache slots")
365 : }
366 : }
367 1651139 : }
368 :
369 : /// Look up a page in the cache.
370 : ///
371 1651139 : async fn try_lock_for_read(
372 1651139 : &self,
373 1651139 : cache_key: &CacheKey,
374 1651139 : permit: &mut Option<PinnedSlotsPermit>,
375 1651139 : ) -> Option<PageReadGuard> {
376 1651139 : if let Some(slot_idx) = self.search_mapping(cache_key) {
377 : // The page was found in the mapping. Lock the slot, and re-check
378 : // that it's still what we expected (because we released the mapping
379 : // lock already, another thread could have evicted the page)
380 1554524 : let slot = &self.slots[slot_idx];
381 1554524 : let inner = slot.inner.read().await;
382 1554524 : if inner.key.as_ref() == Some(cache_key) {
383 1554524 : slot.inc_usage_count();
384 1554524 : return Some(PageReadGuard {
385 1554524 : _permit: inner.coalesce_readers_permit(permit.take().unwrap()),
386 1554524 : slot_guard: inner,
387 1554524 : });
388 0 : }
389 96615 : }
390 96615 : None
391 1651139 : }
392 :
393 : /// Return a locked buffer for the given cache key.
394 : ///
395 : /// Like try_lock_for_read(), this first checks whether the page is already
396 : /// in the cache and, if so, returns a read guard for it.
397 : ///
398 : /// If the page is not found in the cache, this allocates a new buffer for
399 : /// it. The caller may then initialize the buffer with the contents, and
400 : /// call mark_valid().
401 : ///
402 : /// Example usage:
403 : ///
404 : /// ```ignore
405 : /// let cache = page_cache::get();
406 : ///
407 : /// match cache.lock_for_read(&key, ctx).await? {
408 : /// ReadBufResult::Found(read_guard) => {
409 : /// // The page was found in cache. Use it
410 : /// },
411 : /// ReadBufResult::NotFound(write_guard) => {
412 : /// // The page was not found in cache. Read it from disk into the
413 : /// // buffer.
414 : /// //read_my_page_from_disk(write_guard);
415 : ///
416 : /// // The buffer contents are now valid. Tell the page cache.
417 : /// let read_guard = write_guard.mark_valid();
418 : /// },
419 : /// }
420 : /// ```
421 : ///
422 1651139 : async fn lock_for_read(
423 1651139 : &self,
424 1651139 : cache_key: &CacheKey,
425 1651139 : ctx: &RequestContext,
426 1651139 : ) -> anyhow::Result<ReadBufResult> {
427 1651139 : let mut permit = Some(self.try_get_pinned_slot_permit().await?);
428 :
429 1651139 : let (read_access, hit) = match cache_key {
430 1651139 : CacheKey::ImmutableFilePage { .. } => (
431 1651139 : &crate::metrics::PAGE_CACHE
432 1651139 : .for_ctx(ctx)
433 1651139 : .read_accesses_immutable,
434 1651139 : &crate::metrics::PAGE_CACHE.for_ctx(ctx).read_hits_immutable,
435 1651139 : ),
436 1651139 : };
437 1651139 : read_access.inc();
438 1651139 :
439 1651139 : let mut is_first_iteration = true;
440 : loop {
441 : // First check if the key already exists in the cache.
442 1651139 : if let Some(read_guard) = self.try_lock_for_read(cache_key, &mut permit).await {
443 1554524 : debug_assert!(permit.is_none());
444 1554524 : if is_first_iteration {
445 1554524 : hit.inc();
446 1554524 : }
447 1554524 : return Ok(ReadBufResult::Found(read_guard));
448 96615 : }
449 96615 : debug_assert!(permit.is_some());
450 96615 : is_first_iteration = false;
451 :
452 : // Not found. Find a victim buffer
453 96615 : let (slot_idx, mut inner) = self
454 96615 : .find_victim(permit.as_ref().unwrap())
455 0 : .await
456 96615 : .context("Failed to find evict victim")?;
457 :
458 : // Insert mapping for this. At this point, we may find that another
459 : // thread did the same thing concurrently. In that case, we evicted
460 : // our victim buffer unnecessarily. Put it into the free list and
461 : // continue with the slot that the other thread chose.
462 96615 : if let Some(_existing_slot_idx) = self.try_insert_mapping(cache_key, slot_idx) {
463 : // TODO: put to free list
464 :
465 : // We now just loop back to start from beginning. This is not
466 : // optimal, we'll perform the lookup in the mapping again, which
467 : // is not really necessary because we already got
468 : // 'existing_slot_idx'. But this shouldn't happen often enough
469 : // to matter much.
470 0 : continue;
471 96615 : }
472 96615 :
473 96615 : // Make the slot ready
474 96615 : let slot = &self.slots[slot_idx];
475 96615 : inner.key = Some(cache_key.clone());
476 96615 : slot.set_usage_count(1);
477 96615 :
478 96615 : debug_assert!(
479 : {
480 96615 : let guard = inner.permit.lock().unwrap();
481 96615 : guard.upgrade().is_none()
482 : },
483 0 : "we hold a write lock, so, no one else should have a permit"
484 : );
485 :
486 96615 : return Ok(ReadBufResult::NotFound(PageWriteGuard {
487 96615 : state: PageWriteGuardState::Invalid {
488 96615 : _permit: permit.take().unwrap(),
489 96615 : inner,
490 96615 : },
491 96615 : }));
492 : }
493 1651139 : }
494 :
495 : //
496 : // Section 3: Mapping functions
497 : //
498 :
499 : /// Search for a page in the cache using the given search key.
500 : ///
501 : /// Returns the slot index, if any.
502 : ///
503 : /// NOTE: We don't hold any lock on the mapping on return, so the slot might
504 : /// get recycled for an unrelated page immediately after this function
505 : /// returns. The caller is responsible for re-checking that the slot still
506 : /// contains the page with the same key before using it.
507 : ///
508 1651139 : fn search_mapping(&self, cache_key: &CacheKey) -> Option<usize> {
509 1651139 : match cache_key {
510 1651139 : CacheKey::ImmutableFilePage { file_id, blkno } => {
511 1651139 : let map = self.immutable_page_map.read().unwrap();
512 1651139 : Some(*map.get(&(*file_id, *blkno))?)
513 : }
514 : }
515 1651139 : }
516 :
517 : ///
518 : /// Remove mapping for given key.
519 : ///
520 92163 : fn remove_mapping(&self, old_key: &CacheKey) {
521 92163 : match old_key {
522 92163 : CacheKey::ImmutableFilePage { file_id, blkno } => {
523 92163 : let mut map = self.immutable_page_map.write().unwrap();
524 92163 : map.remove(&(*file_id, *blkno))
525 92163 : .expect("could not find old key in mapping");
526 92163 : self.size_metrics.current_bytes_immutable.sub_page_sz(1);
527 92163 : }
528 92163 : }
529 92163 : }
530 :
531 : ///
532 : /// Insert mapping for given key.
533 : ///
534 : /// If a mapping already existed for the given key, returns the slot index
535 : /// of the existing mapping and leaves it untouched.
536 96615 : fn try_insert_mapping(&self, new_key: &CacheKey, slot_idx: usize) -> Option<usize> {
537 96615 : match new_key {
538 96615 : CacheKey::ImmutableFilePage { file_id, blkno } => {
539 96615 : let mut map = self.immutable_page_map.write().unwrap();
540 96615 : match map.entry((*file_id, *blkno)) {
541 0 : Entry::Occupied(entry) => Some(*entry.get()),
542 96615 : Entry::Vacant(entry) => {
543 96615 : entry.insert(slot_idx);
544 96615 : self.size_metrics.current_bytes_immutable.add_page_sz(1);
545 96615 : None
546 : }
547 : }
548 : }
549 : }
550 96615 : }
551 :
552 : //
553 : // Section 4: Misc internal helpers
554 : //
555 :
556 : /// Find a slot to evict.
557 : ///
558 : /// On return, the slot is empty and write-locked.
559 96615 : async fn find_victim(
560 96615 : &self,
561 96615 : _permit_witness: &PinnedSlotsPermit,
562 96615 : ) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
563 96615 : let iter_limit = self.slots.len() * 10;
564 96615 : let mut iters = 0;
565 : loop {
566 258012 : iters += 1;
567 258012 : let slot_idx = self.next_evict_slot.fetch_add(1, Ordering::Relaxed) % self.slots.len();
568 258012 :
569 258012 : let slot = &self.slots[slot_idx];
570 258012 :
571 258012 : if slot.dec_usage_count() == 0 {
572 96615 : let mut inner = match slot.inner.try_write() {
573 96615 : Ok(inner) => inner,
574 0 : Err(_err) => {
575 0 : if iters > iter_limit {
576 : // NB: Even with the permits, there's no hard guarantee that we will find a slot with
577 : // any particular number of iterations: other threads might race ahead and acquire and
578 : // release pins just as we're scanning the array.
579 : //
580 : // Imagine that nslots is 2, and as starting point, usage_count==1 on all
581 : // slots. There are two threads running concurrently, A and B. A has just
582 : // acquired the permit from the semaphore.
583 : //
584 : // A: Look at slot 1. Its usage_count == 1, so decrement it to zero, and continue the search
585 : // B: Acquire permit.
586 : // B: Look at slot 2, decrement its usage_count to zero and continue the search
587 : // B: Look at slot 1. Its usage_count is zero, so pin it and bump up its usage_count to 1.
588 : // B: Release pin and permit again
589 : // B: Acquire permit.
590 : // B: Look at slot 2. Its usage_count is zero, so pin it and bump up its usage_count to 1.
591 : // B: Release pin and permit again
592 : //
593 : // Now we're back in the starting situation that both slots have
594 : // usage_count 1, but A has now been through one iteration of the
595 : // find_victim() loop. This can repeat indefinitely and on each
596 : // iteration, A's iteration count increases by one.
597 : //
598 : // So, even though the semaphore for the permits is fair, the victim search
599 : // itself happens in parallel and is not fair.
600 : // Hence even with a permit, a task can theoretically be starved.
601 : // To avoid this, we'd need tokio to give priority to tasks that are holding
602 : // permits for longer.
603 : // Note that just yielding to tokio during iteration without such
604 : // priority boosting is likely counter-productive. We'd just give more opportunities
605 : // for B to bump usage count, further starving A.
606 0 : page_cache_eviction_metrics::observe(
607 0 : page_cache_eviction_metrics::Outcome::ItersExceeded {
608 0 : iters: iters.try_into().unwrap(),
609 0 : },
610 0 : );
611 0 : anyhow::bail!("exceeded evict iter limit");
612 0 : }
613 0 : continue;
614 : }
615 : };
616 96615 : if let Some(old_key) = &inner.key {
617 92163 : // remove mapping for old buffer
618 92163 : self.remove_mapping(old_key);
619 92163 : inner.key = None;
620 92163 : page_cache_eviction_metrics::observe(
621 92163 : page_cache_eviction_metrics::Outcome::FoundSlotEvicted {
622 92163 : iters: iters.try_into().unwrap(),
623 92163 : },
624 92163 : );
625 92163 : } else {
626 4452 : page_cache_eviction_metrics::observe(
627 4452 : page_cache_eviction_metrics::Outcome::FoundSlotUnused {
628 4452 : iters: iters.try_into().unwrap(),
629 4452 : },
630 4452 : );
631 4452 : }
632 96615 : return Ok((slot_idx, inner));
633 161397 : }
634 : }
635 96615 : }
636 :
637 : /// Initialize a new page cache
638 : ///
639 : /// This should be called only once at page server startup.
640 264 : fn new(num_pages: usize) -> Self {
641 264 : assert!(num_pages > 0, "page cache size must be > 0");
642 :
643 : // We could use Vec::leak here, but that potentially also leaks
644 : // uninitialized reserved capacity. With into_boxed_slice and Box::leak
645 : // this is avoided.
646 264 : let page_buffer = Box::leak(vec![0u8; num_pages * PAGE_SZ].into_boxed_slice());
647 264 :
648 264 : let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;
649 264 : size_metrics.max_bytes.set_page_sz(num_pages);
650 264 : size_metrics.current_bytes_immutable.set_page_sz(0);
651 264 :
652 264 : let slots = page_buffer
653 264 : .chunks_exact_mut(PAGE_SZ)
654 13200 : .map(|chunk| {
655 13200 : let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap();
656 13200 :
657 13200 : Slot {
658 13200 : inner: tokio::sync::RwLock::new(SlotInner {
659 13200 : key: None,
660 13200 : buf,
661 13200 : permit: std::sync::Mutex::new(Weak::new()),
662 13200 : }),
663 13200 : usage_count: AtomicU8::new(0),
664 13200 : }
665 13200 : })
666 264 : .collect();
667 264 :
668 264 : Self {
669 264 : immutable_page_map: Default::default(),
670 264 : slots,
671 264 : next_evict_slot: AtomicUsize::new(0),
672 264 : size_metrics,
673 264 : pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)),
674 264 : }
675 264 : }
676 : }
677 :
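/// Helpers for size gauges that are maintained in bytes but updated in whole pages of [`PAGE_SZ`].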
678 : trait PageSzBytesMetric {
679 : fn set_page_sz(&self, count: usize);
680 : fn add_page_sz(&self, count: usize);
681 : fn sub_page_sz(&self, count: usize);
682 : }
683 :
684 : #[inline(always)]
685 189306 : fn count_times_page_sz(count: usize) -> u64 {
686 189306 : u64::try_from(count).unwrap() * u64::try_from(PAGE_SZ).unwrap()
687 189306 : }
688 :
689 : impl PageSzBytesMetric for metrics::UIntGauge {
690 528 : fn set_page_sz(&self, count: usize) {
691 528 : self.set(count_times_page_sz(count));
692 528 : }
693 96615 : fn add_page_sz(&self, count: usize) {
694 96615 : self.add(count_times_page_sz(count));
695 96615 : }
696 92163 : fn sub_page_sz(&self, count: usize) {
697 92163 : self.sub(count_times_page_sz(count));
698 92163 : }
699 : }