Line data Source code
1 : //!
2 : //! Global page cache
3 : //!
4 : //! The page cache uses up most of the memory in the page server. It is shared
5 : //! by all tenants, and it is used to store different kinds of pages. Sharing
6 : //! the cache allows memory to be dynamically allocated where it's needed the
7 : //! most.
8 : //!
9 : //! The page cache consists of fixed-size buffers, 8 kB each to match the
10 : //! PostgreSQL buffer size, and a Slot struct for each buffer to contain
11 : //! information about what's stored in the buffer.
12 : //!
13 : //! # Types Of Pages
14 : //!
15 : //! [`PageCache`] only supports immutable pages.
16 : //! Hence there is no need to worry about coherency.
17 : //!
18 : //! One type of page is supported:
19 : //!
20 : //! * **Immutable File pages**, filled & used by [`crate::tenant::block_io`] and [`crate::tenant::ephemeral_file`].
21 : //!
22 : //! Note that [`crate::tenant::ephemeral_file::EphemeralFile`] is generally mutable, but it is append-only.
23 : //! It uses the page cache only for the blocks that are already fully written and immutable.
24 : //!
25 : //! # Filling The Page Cache
26 : //!
27 : //! The page cache maps from a cache key to a buffer slot.
28 : //! The cache key uniquely identifies the piece of data that is being cached.
29 : //!
30 : //! The cache key for **immutable file** pages is [`FileId`] and a block number.
31 : //! Users of the page cache that wish to page-cache an arbitrary (immutable!) on-disk file do the following:
32 : //! * Have a mechanism to deterministically associate the on-disk file with a [`FileId`].
33 : //! * Get a [`FileId`] using [`next_file_id`].
34 : //! * Use the mechanism to associate the on-disk file with the returned [`FileId`].
35 : //! * Use [`PageCache::read_immutable_buf`] to get a [`ReadBufResult`].
36 : //! * If the page was already cached, it'll be the [`ReadBufResult::Found`] variant that contains
37 : //! a read guard for the page. Just use it.
38 : //! * If the page was not cached, it'll be the [`ReadBufResult::NotFound`] variant that contains
39 : //! a write guard for the page. Fill the page with the contents of the on-disk file.
40 : //! Then call [`PageWriteGuard::mark_valid`] to mark the page as valid; this
41 : //! downgrades the write guard into a [`PageReadGuard`] for the now-valid page,
42 : //! so the just-filled page can be read without a second lookup (see the sketch below).
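//!
//! A minimal sketch of that workflow, assuming `file_id`, `blkno`, and `ctx` are in scope
//! and using a hypothetical `fill_from_disk` helper for the disk read:
//!
//! ```ignore
//! let cache = page_cache::get();
//! let read_guard = match cache.read_immutable_buf(file_id, blkno, ctx).await? {
//!     // Cache hit: the read guard gives access to the cached page.
//!     ReadBufResult::Found(guard) => guard,
//!     // Cache miss: fill the buffer from the on-disk file, then mark it valid,
//!     // which downgrades the write guard into a read guard.
//!     ReadBufResult::NotFound(mut guard) => {
//!         fill_from_disk(file_id, blkno, &mut *guard)?;
//!         guard.mark_valid()
//!     }
//! };
//! // `read_guard` dereferences to `&[u8; PAGE_SZ]`.
//! ```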
44 : //!
45 : //! # Locking
46 : //!
47 : //! There are two levels of locking involved: There's one lock for the "mapping"
48 : //! from the cache key ([`FileId`] and block number) to the buffer
49 : //! slot, and a separate lock on each slot. To read or write the contents of a
50 : //! slot, you must hold the lock on the slot in read or write mode,
51 : //! respectively. To change the mapping of a slot, i.e. to evict a page or to
52 : //! assign a buffer for a page, you must hold the mapping lock and the lock on
53 : //! the slot at the same time.
54 : //!
55 : //! Whenever you need to hold both locks simultaneously, the slot lock must be
56 : //! acquired first. This consistent ordering avoids deadlocks. To look up a page
57 : //! in the cache, you would first look up the mapping, while holding the mapping
58 : //! lock, and then lock the slot. You must release the mapping lock in between,
59 : //! to obey the lock ordering and avoid deadlock.
60 : //!
61 : //! A slot can momentarily have invalid contents, even if it's already been
62 : //! inserted into the mapping, but you must hold the write-lock on the slot until
63 : //! the contents are valid. If you need to release the lock without initializing
64 : //! the contents, you must remove the mapping first. We make that easy for
65 : //! callers with [`PageWriteGuard`]: the caller must explicitly call [`PageWriteGuard::mark_valid`]
66 : //! after initializing the contents. If the guard is dropped without calling mark_valid(), the
67 : //! mapping is automatically removed and the slot is marked free.
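//!
//! As an illustration of this ordering, here is a simplified sketch of the lookup path
//! implemented by `try_lock_for_read` below (permit handling and usage-count bookkeeping omitted):
//!
//! ```ignore
//! let slot_idx = {
//!     // Hold the mapping lock only long enough to find the slot index...
//!     let map = self.immutable_page_map.read().unwrap();
//!     map.get(&(file_id, blkno)).copied()
//! }; // ...and release it here, before locking the slot.
//!
//! if let Some(slot_idx) = slot_idx {
//!     let slot = &self.slots[slot_idx];
//!     // Take the slot lock, then re-check the key: another thread may have
//!     // evicted and recycled the slot after we released the mapping lock.
//!     let inner = slot.inner.read().await;
//!     if inner.key.as_ref() == Some(cache_key) {
//!         // The slot still holds our page; read it through `inner.buf`.
//!     }
//! }
//! ```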
68 : //!
69 :
70 : use std::collections::HashMap;
71 : use std::collections::hash_map::Entry;
72 : use std::sync::atomic::{AtomicU8, AtomicU64, AtomicUsize, Ordering};
73 : use std::sync::{Arc, Weak};
74 : use std::time::Duration;
75 :
76 : use anyhow::Context;
77 : use once_cell::sync::OnceCell;
78 :
79 : use crate::context::RequestContext;
80 : use crate::metrics::{PageCacheSizeMetrics, page_cache_eviction_metrics};
81 : use crate::virtual_file::{IoBufferMut, IoPageSlice};
82 :
83 : static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
84 : const TEST_PAGE_CACHE_SIZE: usize = 50;
85 :
86 : ///
87 : /// Initialize the page cache. This must be called once at page server startup.
88 : ///
89 0 : pub fn init(size: usize) {
90 0 : if PAGE_CACHE.set(PageCache::new(size)).is_err() {
91 0 : panic!("page cache already initialized");
92 0 : }
93 0 : }
94 :
95 : ///
96 : /// Get a handle to the page cache.
97 : ///
98 974273 : pub fn get() -> &'static PageCache {
99 974273 : //
100 974273 : // In unit tests, page server startup doesn't happen and no one calls
101 974273 : // page_cache::init(). Initialize it here with a tiny cache, so that the
102 974273 : // page cache is usable in unit tests.
103 974273 : //
104 974273 : if cfg!(test) {
105 974273 : PAGE_CACHE.get_or_init(|| PageCache::new(TEST_PAGE_CACHE_SIZE))
106 : } else {
107 0 : PAGE_CACHE.get().expect("page cache not initialized")
108 : }
109 974273 : }
110 :
111 : pub const PAGE_SZ: usize = postgres_ffi::BLCKSZ as usize;
112 : const MAX_USAGE_COUNT: u8 = 5;
113 :
114 : /// See module-level comment.
115 : #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
116 : pub struct FileId(u64);
117 :
118 : static NEXT_ID: AtomicU64 = AtomicU64::new(1);
119 :
120 : /// See module-level comment.
121 5096 : pub fn next_file_id() -> FileId {
122 5096 : FileId(NEXT_ID.fetch_add(1, Ordering::Relaxed))
123 5096 : }
124 :
125 : ///
126 : /// CacheKey uniquely identifies a "thing" to cache in the page cache.
127 : ///
128 : #[derive(Debug, PartialEq, Eq, Clone)]
129 : #[allow(clippy::enum_variant_names)]
130 : enum CacheKey {
131 : ImmutableFilePage { file_id: FileId, blkno: u32 },
132 : }
133 :
134 : struct Slot {
135 : inner: tokio::sync::RwLock<SlotInner>,
136 : usage_count: AtomicU8,
137 : }
138 :
139 : struct SlotInner {
140 : key: Option<CacheKey>,
141 : // for `coalesce_readers_permit`
142 : permit: std::sync::Mutex<Weak<PinnedSlotsPermit>>,
143 : buf: IoPageSlice<'static>,
144 : }
145 :
146 : impl Slot {
147 : /// Increment usage count on the buffer, with ceiling at MAX_USAGE_COUNT.
148 910805 : fn inc_usage_count(&self) {
149 910805 : let _ = self
150 910805 : .usage_count
151 910805 : .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
152 910805 : if val == MAX_USAGE_COUNT {
153 863307 : None
154 : } else {
155 47498 : Some(val + 1)
156 : }
157 910805 : });
158 910805 : }
159 :
160 : /// Decrement usage count on the buffer, unless it's already zero. Returns
161 : /// the old usage count.
162 169657 : fn dec_usage_count(&self) -> u8 {
163 169657 : let count_res =
164 169657 : self.usage_count
165 169657 : .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
166 169657 : if val == 0 { None } else { Some(val - 1) }
167 169657 : });
168 169657 :
169 169657 : match count_res {
170 106189 : Ok(usage_count) => usage_count,
171 63468 : Err(usage_count) => usage_count,
172 : }
173 169657 : }
174 :
175 : /// Sets the usage count to a specific value.
176 63468 : fn set_usage_count(&self, count: u8) {
177 63468 : self.usage_count.store(count, Ordering::Relaxed);
178 63468 : }
179 : }
180 :
181 : impl SlotInner {
182 : /// If there is already a reader, drop our permit and share its permit, just like we share read access.
183 910805 : fn coalesce_readers_permit(&self, permit: PinnedSlotsPermit) -> Arc<PinnedSlotsPermit> {
184 910805 : let mut guard = self.permit.lock().unwrap();
185 910805 : if let Some(existing_permit) = guard.upgrade() {
186 0 : drop(guard);
187 0 : drop(permit);
188 0 : existing_permit
189 : } else {
190 910805 : let permit = Arc::new(permit);
191 910805 : *guard = Arc::downgrade(&permit);
192 910805 : permit
193 : }
194 910805 : }
195 : }
196 :
197 : pub struct PageCache {
198 : immutable_page_map: std::sync::RwLock<HashMap<(FileId, u32), usize>>,
199 :
200 : /// The actual buffers with their metadata.
201 : slots: Box<[Slot]>,
202 :
203 : pinned_slots: Arc<tokio::sync::Semaphore>,
204 :
205 : /// Index of the next candidate to evict, for the Clock replacement algorithm.
206 : /// This is interpreted modulo the page cache size.
207 : next_evict_slot: AtomicUsize,
208 :
209 : size_metrics: &'static PageCacheSizeMetrics,
210 : }
211 :
212 : struct PinnedSlotsPermit {
213 : _permit: tokio::sync::OwnedSemaphorePermit,
214 : }
215 :
216 : ///
217 : /// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked
218 : /// until the guard is dropped.
219 : ///
220 : pub struct PageReadGuard<'i> {
221 : _permit: Arc<PinnedSlotsPermit>,
222 : slot_guard: tokio::sync::RwLockReadGuard<'i, SlotInner>,
223 : }
224 :
225 : impl std::ops::Deref for PageReadGuard<'_> {
226 : type Target = [u8; PAGE_SZ];
227 :
228 974433 : fn deref(&self) -> &Self::Target {
229 974433 : self.slot_guard.buf.deref()
230 974433 : }
231 : }
232 :
233 : impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
234 0 : fn as_ref(&self) -> &[u8; PAGE_SZ] {
235 0 : self.slot_guard.buf.as_ref()
236 0 : }
237 : }
238 :
239 : ///
240 : /// PageWriteGuard is a lease on a buffer for modifying it. The page is kept locked
241 : /// until the guard is dropped.
242 : ///
243 : /// Counterintuitively, this is used even for a read, if the requested page is not
244 : /// currently found in the page cache. In that case, the caller of lock_for_read()
245 : /// is expected to fill in the page contents and call mark_valid().
246 : pub struct PageWriteGuard<'i> {
247 : state: PageWriteGuardState<'i>,
248 : }
249 :
250 : enum PageWriteGuardState<'i> {
251 : Invalid {
252 : inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
253 : _permit: PinnedSlotsPermit,
254 : },
255 : Downgraded,
256 : }
257 :
258 : impl std::ops::DerefMut for PageWriteGuard<'_> {
259 95213 : fn deref_mut(&mut self) -> &mut Self::Target {
260 95213 : match &mut self.state {
261 95213 : PageWriteGuardState::Invalid { inner, _permit } => inner.buf.deref_mut(),
262 0 : PageWriteGuardState::Downgraded => unreachable!(),
263 : }
264 95213 : }
265 : }
266 :
267 : impl std::ops::Deref for PageWriteGuard<'_> {
268 : type Target = [u8; PAGE_SZ];
269 :
270 1047277 : fn deref(&self) -> &Self::Target {
271 1047277 : match &self.state {
272 1047277 : PageWriteGuardState::Invalid { inner, _permit } => inner.buf.deref(),
273 0 : PageWriteGuardState::Downgraded => unreachable!(),
274 : }
275 1047277 : }
276 : }
277 :
278 : impl<'a> PageWriteGuard<'a> {
279 : /// Mark that the buffer contents are now valid.
280 : #[must_use]
281 63468 : pub fn mark_valid(mut self) -> PageReadGuard<'a> {
282 63468 : let prev = std::mem::replace(&mut self.state, PageWriteGuardState::Downgraded);
283 63468 : match prev {
284 63468 : PageWriteGuardState::Invalid { inner, _permit } => {
285 63468 : assert!(inner.key.is_some());
286 63468 : PageReadGuard {
287 63468 : _permit: Arc::new(_permit),
288 63468 : slot_guard: inner.downgrade(),
289 63468 : }
290 : }
291 0 : PageWriteGuardState::Downgraded => unreachable!(),
292 : }
293 63468 : }
294 : }
295 :
296 : impl Drop for PageWriteGuard<'_> {
297 : ///
298 : /// If the buffer was allocated for a page that was not already in the
299 : /// cache, but the lock_for_read/write() caller dropped the buffer without
300 : /// initializing it, remove the mapping from the page cache.
301 : ///
302 63468 : fn drop(&mut self) {
303 63468 : match &mut self.state {
304 0 : PageWriteGuardState::Invalid { inner, _permit } => {
305 0 : assert!(inner.key.is_some());
306 0 : let self_key = inner.key.as_ref().unwrap();
307 0 : PAGE_CACHE.get().unwrap().remove_mapping(self_key);
308 0 : inner.key = None;
309 : }
310 63468 : PageWriteGuardState::Downgraded => {}
311 : }
312 63468 : }
313 : }
314 :
315 : /// lock_for_read() return value
316 : pub enum ReadBufResult<'a> {
317 : Found(PageReadGuard<'a>),
318 : NotFound(PageWriteGuard<'a>),
319 : }
320 :
321 : impl PageCache {
322 974273 : pub async fn read_immutable_buf(
323 974273 : &self,
324 974273 : file_id: FileId,
325 974273 : blkno: u32,
326 974273 : ctx: &RequestContext,
327 974273 : ) -> anyhow::Result<ReadBufResult> {
328 974273 : self.lock_for_read(&(CacheKey::ImmutableFilePage { file_id, blkno }), ctx)
329 974273 : .await
330 974273 : }
331 :
332 : //
333 : // Section 2: Internal interface functions for lookup/update.
334 : //
335 : // To add support for a new kind of "thing" to cache, you will need
336 : // to add public interface routines above, and code to deal with the
337 : // "mappings" after this section. But the routines in this section should
338 : // not require changes.
339 :
340 974273 : async fn try_get_pinned_slot_permit(&self) -> anyhow::Result<PinnedSlotsPermit> {
341 974273 : match tokio::time::timeout(
342 974273 : // Choose small timeout, neon_smgr does its own retries.
343 974273 : // https://neondb.slack.com/archives/C04DGM6SMTM/p1694786876476869
344 974273 : Duration::from_secs(10),
345 974273 : Arc::clone(&self.pinned_slots).acquire_owned(),
346 974273 : )
347 974273 : .await
348 : {
349 974273 : Ok(res) => Ok(PinnedSlotsPermit {
350 974273 : _permit: res.expect("this semaphore is never closed"),
351 974273 : }),
352 0 : Err(_timeout) => {
353 0 : crate::metrics::page_cache_errors_inc(
354 0 : crate::metrics::PageCacheErrorKind::AcquirePinnedSlotTimeout,
355 0 : );
356 0 : anyhow::bail!("timeout: there were page guards alive for all page cache slots")
357 : }
358 : }
359 974273 : }
360 :
361 : /// Look up a page in the cache.
362 : ///
363 974273 : async fn try_lock_for_read(
364 974273 : &self,
365 974273 : cache_key: &CacheKey,
366 974273 : permit: &mut Option<PinnedSlotsPermit>,
367 974273 : ) -> Option<PageReadGuard> {
368 974273 : if let Some(slot_idx) = self.search_mapping(cache_key) {
369 : // The page was found in the mapping. Lock the slot, and re-check
370 : // that it's still what we expected (because we released the mapping
371 : // lock already, another thread could have evicted the page)
372 910805 : let slot = &self.slots[slot_idx];
373 910805 : let inner = slot.inner.read().await;
374 910805 : if inner.key.as_ref() == Some(cache_key) {
375 910805 : slot.inc_usage_count();
376 910805 : return Some(PageReadGuard {
377 910805 : _permit: inner.coalesce_readers_permit(permit.take().unwrap()),
378 910805 : slot_guard: inner,
379 910805 : });
380 0 : }
381 63468 : }
382 63468 : None
383 974273 : }
384 :
385 : /// Return a locked buffer for the given block.
386 : ///
390 : /// If the page is not found in the cache, this allocates a new buffer for
391 : /// it. The caller may then initialize the buffer with the contents, and
392 : /// call mark_valid().
393 : ///
394 : /// Example usage:
395 : ///
396 : /// ```ignore
397 : /// let cache = page_cache::get();
398 : ///
399 : /// match cache.lock_for_read(&key) {
400 : /// ReadBufResult::Found(read_guard) => {
401 : /// // The page was found in cache. Use it
402 : /// },
403 : /// ReadBufResult::NotFound(write_guard) => {
404 : /// // The page was not found in cache. Read it from disk into the
405 : /// // buffer.
406 : /// //read_my_page_from_disk(write_guard);
407 : ///
408 : /// // The buffer contents are now valid. Tell the page cache.
409 : /// write_guard.mark_valid();
410 : /// },
411 : /// }
412 : /// ```
413 : ///
414 974273 : async fn lock_for_read(
415 974273 : &self,
416 974273 : cache_key: &CacheKey,
417 974273 : ctx: &RequestContext,
418 974273 : ) -> anyhow::Result<ReadBufResult> {
419 974273 : let mut permit = Some(self.try_get_pinned_slot_permit().await?);
420 :
421 974273 : let (read_access, hit) = match cache_key {
422 974273 : CacheKey::ImmutableFilePage { .. } => (
423 974273 : &crate::metrics::PAGE_CACHE
424 974273 : .for_ctx(ctx)
425 974273 : .read_accesses_immutable,
426 974273 : &crate::metrics::PAGE_CACHE.for_ctx(ctx).read_hits_immutable,
427 974273 : ),
428 974273 : };
429 974273 : read_access.inc();
430 974273 :
431 974273 : let mut is_first_iteration = true;
432 : loop {
433 : // First check if the key already exists in the cache.
434 974273 : if let Some(read_guard) = self.try_lock_for_read(cache_key, &mut permit).await {
435 910805 : debug_assert!(permit.is_none());
436 910805 : if is_first_iteration {
437 910805 : hit.inc();
438 910805 : }
439 910805 : return Ok(ReadBufResult::Found(read_guard));
440 63468 : }
441 63468 : debug_assert!(permit.is_some());
442 63468 : is_first_iteration = false;
443 :
444 : // Not found. Find a victim buffer
445 63468 : let (slot_idx, mut inner) = self
446 63468 : .find_victim(permit.as_ref().unwrap())
447 63468 : .await
448 63468 : .context("Failed to find evict victim")?;
449 :
450 : // Insert mapping for this. At this point, we may find that another
451 : // thread did the same thing concurrently. In that case, we evicted
452 : // our victim buffer unnecessarily. Put it into the free list and
453 : // continue with the slot that the other thread chose.
454 63468 : if let Some(_existing_slot_idx) = self.try_insert_mapping(cache_key, slot_idx) {
455 : // TODO: put to free list
456 :
457 : // We now just loop back to start from beginning. This is not
458 : // optimal, we'll perform the lookup in the mapping again, which
459 : // is not really necessary because we already got
460 : // 'existing_slot_idx'. But this shouldn't happen often enough
461 : // to matter much.
462 0 : continue;
463 63468 : }
464 63468 :
465 63468 : // Make the slot ready
466 63468 : let slot = &self.slots[slot_idx];
467 63468 : inner.key = Some(cache_key.clone());
468 63468 : slot.set_usage_count(1);
469 63468 :
470 63468 : debug_assert!(
471 : {
472 63468 : let guard = inner.permit.lock().unwrap();
473 63468 : guard.upgrade().is_none()
474 : },
475 0 : "we hold a write lock, so, no one else should have a permit"
476 : );
477 :
478 63468 : return Ok(ReadBufResult::NotFound(PageWriteGuard {
479 63468 : state: PageWriteGuardState::Invalid {
480 63468 : _permit: permit.take().unwrap(),
481 63468 : inner,
482 63468 : },
483 63468 : }));
484 : }
485 974273 : }
486 :
487 : //
488 : // Section 3: Mapping functions
489 : //
490 :
491 : /// Search for a page in the cache using the given search key.
492 : ///
493 : /// Returns the slot index, if any.
494 : ///
495 : /// NOTE: We don't hold any lock on the mapping on return, so the slot might
496 : /// get recycled for an unrelated page immediately after this function
497 : /// returns. The caller is responsible for re-checking that the slot still
498 : /// contains the page with the same key before using it.
499 : ///
500 974273 : fn search_mapping(&self, cache_key: &CacheKey) -> Option<usize> {
501 974273 : match cache_key {
502 974273 : CacheKey::ImmutableFilePage { file_id, blkno } => {
503 974273 : let map = self.immutable_page_map.read().unwrap();
504 974273 : Some(*map.get(&(*file_id, *blkno))?)
505 : }
506 : }
507 974273 : }
508 :
509 : ///
510 : /// Remove mapping for given key.
511 : ///
512 60180 : fn remove_mapping(&self, old_key: &CacheKey) {
513 60180 : match old_key {
514 60180 : CacheKey::ImmutableFilePage { file_id, blkno } => {
515 60180 : let mut map = self.immutable_page_map.write().unwrap();
516 60180 : map.remove(&(*file_id, *blkno))
517 60180 : .expect("could not find old key in mapping");
518 60180 : self.size_metrics.current_bytes_immutable.sub_page_sz(1);
519 60180 : }
520 60180 : }
521 60180 : }
522 :
523 : ///
524 : /// Insert mapping for given key.
525 : ///
526 : /// If a mapping already existed for the given key, returns the slot index
527 : /// of the existing mapping and leaves it untouched.
528 63468 : fn try_insert_mapping(&self, new_key: &CacheKey, slot_idx: usize) -> Option<usize> {
529 63468 : match new_key {
530 63468 : CacheKey::ImmutableFilePage { file_id, blkno } => {
531 63468 : let mut map = self.immutable_page_map.write().unwrap();
532 63468 : match map.entry((*file_id, *blkno)) {
533 0 : Entry::Occupied(entry) => Some(*entry.get()),
534 63468 : Entry::Vacant(entry) => {
535 63468 : entry.insert(slot_idx);
536 63468 : self.size_metrics.current_bytes_immutable.add_page_sz(1);
537 63468 : None
538 : }
539 : }
540 : }
541 : }
542 63468 : }
543 :
544 : //
545 : // Section 4: Misc internal helpers
546 : //
547 :
548 : /// Find a slot to evict.
549 : ///
550 : /// On return, the slot is empty and write-locked.
551 63468 : async fn find_victim(
552 63468 : &self,
553 63468 : _permit_witness: &PinnedSlotsPermit,
554 63468 : ) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
555 63468 : let iter_limit = self.slots.len() * 10;
556 63468 : let mut iters = 0;
557 : loop {
558 169657 : iters += 1;
559 169657 : let slot_idx = self.next_evict_slot.fetch_add(1, Ordering::Relaxed) % self.slots.len();
560 169657 :
561 169657 : let slot = &self.slots[slot_idx];
562 169657 :
563 169657 : if slot.dec_usage_count() == 0 {
564 63468 : let mut inner = match slot.inner.try_write() {
565 63468 : Ok(inner) => inner,
566 0 : Err(_err) => {
567 0 : if iters > iter_limit {
568 : // NB: Even with the permits, there's no hard guarantee that we will find a slot with
569 : // any particular number of iterations: other threads might race ahead and acquire and
570 : // release pins just as we're scanning the array.
571 : //
572 : // Imagine that nslots is 2, and as starting point, usage_count==1 on all
573 : // slots. There are two threads running concurrently, A and B. A has just
574 : // acquired the permit from the semaphore.
575 : //
576 : // A: Look at slot 1. Its usage_count == 1, so decrement it to zero, and continue the search
577 : // B: Acquire permit.
578 : // B: Look at slot 2, decrement its usage_count to zero and continue the search
579 : // B: Look at slot 1. Its usage_count is zero, so pin it and bump up its usage_count to 1.
580 : // B: Release pin and permit again
581 : // B: Acquire permit.
582 : // B: Look at slot 2. Its usage_count is zero, so pin it and bump up its usage_count to 1.
583 : // B: Release pin and permit again
584 : //
585 : // Now we're back in the starting situation that both slots have
586 : // usage_count 1, but A has now been through one iteration of the
587 : // find_victim() loop. This can repeat indefinitely and on each
588 : // iteration, A's iteration count increases by one.
589 : //
590 : // So, even though the semaphore for the permits is fair, the victim search
591 : // itself happens in parallel and is not fair.
592 : // Hence even with a permit, a task can theoretically be starved.
593 : // To avoid this, we'd need tokio to give priority to tasks that are holding
594 : // permits for longer.
595 : // Note that just yielding to tokio during iteration without such
596 : // priority boosting is likely counter-productive. We'd just give more opportunities
597 : // for B to bump usage count, further starving A.
598 0 : page_cache_eviction_metrics::observe(
599 0 : page_cache_eviction_metrics::Outcome::ItersExceeded {
600 0 : iters: iters.try_into().unwrap(),
601 0 : },
602 0 : );
603 0 : anyhow::bail!("exceeded evict iter limit");
604 0 : }
605 0 : continue;
606 : }
607 : };
608 63468 : if let Some(old_key) = &inner.key {
609 60180 : // remove mapping for old buffer
610 60180 : self.remove_mapping(old_key);
611 60180 : inner.key = None;
612 60180 : page_cache_eviction_metrics::observe(
613 60180 : page_cache_eviction_metrics::Outcome::FoundSlotEvicted {
614 60180 : iters: iters.try_into().unwrap(),
615 60180 : },
616 60180 : );
617 60180 : } else {
618 3288 : page_cache_eviction_metrics::observe(
619 3288 : page_cache_eviction_metrics::Outcome::FoundSlotUnused {
620 3288 : iters: iters.try_into().unwrap(),
621 3288 : },
622 3288 : );
623 3288 : }
624 63468 : return Ok((slot_idx, inner));
625 106189 : }
626 : }
627 63468 : }
628 :
629 : /// Initialize a new page cache
630 : ///
631 : /// This should be called only once at page server startup.
632 188 : fn new(num_pages: usize) -> Self {
633 188 : assert!(num_pages > 0, "page cache size must be > 0");
634 :
635 : // We could use Vec::leak here, but that potentially also leaks
636 : // uninitialized reserved capacity. With into_boxed_slice and Box::leak
637 : // this is avoided.
638 188 : let page_buffer = IoBufferMut::with_capacity_zeroed(num_pages * PAGE_SZ).leak();
639 188 :
640 188 : let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;
641 188 : size_metrics.max_bytes.set_page_sz(num_pages);
642 188 : size_metrics.current_bytes_immutable.set_page_sz(0);
643 188 :
644 188 : let slots = page_buffer
645 188 : .chunks_exact_mut(PAGE_SZ)
646 9400 : .map(|chunk| {
647 9400 : // SAFETY: Each chunk has `PAGE_SZ` (8192) bytes, greater than 512, still aligned.
648 9400 : let buf = unsafe { IoPageSlice::new_unchecked(chunk.try_into().unwrap()) };
649 9400 :
650 9400 : Slot {
651 9400 : inner: tokio::sync::RwLock::new(SlotInner {
652 9400 : key: None,
653 9400 : buf,
654 9400 : permit: std::sync::Mutex::new(Weak::new()),
655 9400 : }),
656 9400 : usage_count: AtomicU8::new(0),
657 9400 : }
658 9400 : })
659 188 : .collect();
660 188 :
661 188 : Self {
662 188 : immutable_page_map: Default::default(),
663 188 : slots,
664 188 : next_evict_slot: AtomicUsize::new(0),
665 188 : size_metrics,
666 188 : pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)),
667 188 : }
668 188 : }
669 : }
670 :
671 : trait PageSzBytesMetric {
672 : fn set_page_sz(&self, count: usize);
673 : fn add_page_sz(&self, count: usize);
674 : fn sub_page_sz(&self, count: usize);
675 : }
676 :
677 : #[inline(always)]
678 124024 : fn count_times_page_sz(count: usize) -> u64 {
679 124024 : u64::try_from(count).unwrap() * u64::try_from(PAGE_SZ).unwrap()
680 124024 : }
681 :
682 : impl PageSzBytesMetric for metrics::UIntGauge {
683 376 : fn set_page_sz(&self, count: usize) {
684 376 : self.set(count_times_page_sz(count));
685 376 : }
686 63468 : fn add_page_sz(&self, count: usize) {
687 63468 : self.add(count_times_page_sz(count));
688 63468 : }
689 60180 : fn sub_page_sz(&self, count: usize) {
690 60180 : self.sub(count_times_page_sz(count));
691 60180 : }
692 : }