//!
//! Global page cache
//!
//! The page cache uses up most of the memory in the page server. It is shared
//! by all tenants, and it is used to store different kinds of pages. Sharing
//! the cache allows memory to be dynamically allocated where it's needed the
//! most.
//!
//! The page cache consists of fixed-size buffers, 8 kB each to match the
//! PostgreSQL buffer size, and a Slot struct for each buffer to contain
//! information about what's stored in the buffer.
//!
//! # Types Of Pages
//!
//! [`PageCache`] only supports immutable pages.
//! Hence there is no need to worry about coherency.
//!
//! Two types of pages are supported:
//!
//! * **Materialized pages**, filled & used by page reconstruction
//! * **Immutable file pages**, filled & used by [`crate::tenant::block_io`] and [`crate::tenant::ephemeral_file`].
//!
//! Note that [`crate::tenant::ephemeral_file::EphemeralFile`] is generally mutable, but it's append-only.
//! It uses the page cache only for the blocks that are already fully written and immutable.
//!
//! # Filling The Page Cache
//!
//! The page cache maps from a cache key to a buffer slot.
//! The cache key uniquely identifies the piece of data that is being cached.
//!
//! The cache key for **materialized pages** is [`TenantShardId`], [`TimelineId`], [`Key`], and [`Lsn`].
//! Use [`PageCache::memorize_materialized_page`] and [`PageCache::lookup_materialized_page`] for fill & access.
//!
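//! A hedged sketch of that fill & access pair (`tenant_shard_id`, `timeline_id`,
//! `key`, `lsn`, and `ctx` are assumed to be in scope, and `reconstruct_page` is
//! a hypothetical stand-in for the page reconstruction step):
//!
//! ```ignore
//! let cache = page_cache::get();
//! match cache
//!     .lookup_materialized_page(tenant_shard_id, timeline_id, &key, lsn, ctx)
//!     .await
//! {
//!     // `cached_lsn` is the actual LSN of the cached version, <= the requested `lsn`.
//!     Some((cached_lsn, read_guard)) => { /* use &*read_guard */ }
//!     None => {
//!         let img: Vec<u8> = reconstruct_page(/* ... */);
//!         cache
//!             .memorize_materialized_page(tenant_shard_id, timeline_id, key, lsn, &img)
//!             .await?;
//!     }
//! }
//! ```
//!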
//! The cache key for **immutable file** pages is [`FileId`] and a block number.
//! Users of page cache that wish to page-cache an arbitrary (immutable!) on-disk file do the following:
//! * Have a mechanism to deterministically associate the on-disk file with a [`FileId`].
//! * Get a [`FileId`] using [`next_file_id`].
//! * Use the mechanism to associate the on-disk file with the returned [`FileId`].
//! * Use [`PageCache::read_immutable_buf`] to get a [`ReadBufResult`].
//! * If the page was already cached, it'll be the [`ReadBufResult::Found`] variant that contains
//!   a read guard for the page. Just use it.
//! * If the page was not cached, it'll be the [`ReadBufResult::NotFound`] variant that contains
//!   a write guard for the page. Fill the page with the contents of the on-disk file.
//!   Then call [`PageWriteGuard::mark_valid`] to mark the page as valid: it downgrades the
//!   write guard into a [`PageReadGuard`] for the now-cached page, so there is no need to
//!   look the page up again. A sketch of this flow follows this list.
//!
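//! A minimal sketch of those steps, assuming a hypothetical `read_block_from_disk`
//! helper and a `file_id` that the caller has already associated with the file:
//!
//! ```ignore
//! let cache = page_cache::get();
//! let read_guard = match cache.read_immutable_buf(file_id, blkno, ctx).await? {
//!     ReadBufResult::Found(read_guard) => read_guard,
//!     ReadBufResult::NotFound(mut write_guard) => {
//!         // Fill the 8 kB buffer with the on-disk contents, then mark it
//!         // valid; mark_valid() downgrades the write guard to a read guard.
//!         read_block_from_disk(file_id, blkno, &mut *write_guard)?;
//!         write_guard.mark_valid()
//!     }
//! };
//! // read_guard derefs to &[u8; PAGE_SZ]; the page stays pinned until it's dropped.
//! ```
//!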
//! # Locking
//!
//! There are two levels of locking involved: There's one lock for the "mapping"
//! from page identifier (tenant ID, timeline ID, rel, block, LSN) to the buffer
//! slot, and a separate lock on each slot. To read or write the contents of a
//! slot, you must hold the lock on the slot in read or write mode,
//! respectively. To change the mapping of a slot, i.e. to evict a page or to
//! assign a buffer for a page, you must hold the mapping lock and the lock on
//! the slot at the same time.
//!
//! Whenever you need to hold both locks simultaneously, the slot lock must be
//! acquired first. This consistent ordering avoids deadlocks. To look up a page
//! in the cache, you would first look up the mapping, while holding the mapping
//! lock, and then lock the slot. You must release the mapping lock in between,
//! to obey the lock ordering and avoid deadlock.
//!
//! A slot can momentarily have invalid contents, even if it's already been
//! inserted to the mapping, but you must hold the write-lock on the slot until
//! the contents are valid. If you need to release the lock without initializing
//! the contents, you must remove the mapping first. We make that easy for the
//! callers with PageWriteGuard: the caller must explicitly call guard.mark_valid()
//! after it has initialized the contents. If the guard is dropped without calling
//! mark_valid(), the mapping is automatically removed and the slot is marked free.
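//!
//! A condensed sketch of the lookup pattern described above, as implemented by
//! `search_mapping` and `try_lock_for_read` below (names simplified):
//!
//! ```ignore
//! let slot_idx = {
//!     let map = mapping.read().unwrap(); // 1. take the mapping lock
//!     *map.get(&key)?                    // 2. find the slot index
//! };                                     // 3. mapping lock released here
//! let inner = slots[slot_idx].inner.read().await; // 4. only now lock the slot
//! if inner.key.as_ref() != Some(&key) {
//!     // 5. re-check: the slot may have been recycled for another page
//!     //    between steps 3 and 4, in which case the caller retries.
//! }
//! ```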
//!

use std::{
    collections::{hash_map::Entry, HashMap},
    sync::{
        atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
        Arc, Weak,
    },
    time::Duration,
};

use anyhow::Context;
use once_cell::sync::OnceCell;
use pageserver_api::shard::TenantShardId;
use utils::{id::TimelineId, lsn::Lsn};

use crate::{
    context::RequestContext,
    metrics::{page_cache_eviction_metrics, PageCacheSizeMetrics},
    repository::Key,
};

static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
const TEST_PAGE_CACHE_SIZE: usize = 50;

///
/// Initialize the page cache. This must be called once at page server startup.
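///
/// Sizing note: `size` is a number of [`PAGE_SZ`] (8 kB) buffers, so, for
/// example, `init(8192)` allocates 8192 * 8 kB = 64 MiB of cache. A hedged
/// sketch of the startup wiring (the real call site presumably passes a
/// configured size):
///
/// ```ignore
/// page_cache::init(8192);
/// let cache = page_cache::get();
/// ```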
///
pub fn init(size: usize) {
    if PAGE_CACHE.set(PageCache::new(size)).is_err() {
        panic!("page cache already initialized");
    }
}

///
/// Get a handle to the page cache.
///
pub fn get() -> &'static PageCache {
    //
    // In unit tests, page server startup doesn't happen and no one calls
    // page_cache::init(). Initialize it here with a tiny cache, so that the
    // page cache is usable in unit tests.
    //
    if cfg!(test) {
        PAGE_CACHE.get_or_init(|| PageCache::new(TEST_PAGE_CACHE_SIZE))
    } else {
        PAGE_CACHE.get().expect("page cache not initialized")
    }
}

pub const PAGE_SZ: usize = postgres_ffi::BLCKSZ as usize;
const MAX_USAGE_COUNT: u8 = 5;

/// See module-level comment.
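///
/// A hedged sketch of the "associate the file with a [`FileId`]" step from the
/// module docs (the wrapper struct and its field names are illustrative, not an
/// API of this module):
///
/// ```ignore
/// struct OnDiskFile {
///     file: VirtualFile,
///     /// Identifies `file`'s blocks in the global page cache.
///     page_cache_file_id: page_cache::FileId, // from page_cache::next_file_id()
/// }
/// ```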
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub struct FileId(u64);

static NEXT_ID: AtomicU64 = AtomicU64::new(1);

/// See module-level comment.
pub fn next_file_id() -> FileId {
    FileId(NEXT_ID.fetch_add(1, Ordering::Relaxed))
}

///
/// CacheKey uniquely identifies a "thing" to cache in the page cache.
///
#[derive(Debug, PartialEq, Eq, Clone)]
#[allow(clippy::enum_variant_names)]
enum CacheKey {
    MaterializedPage {
        hash_key: MaterializedPageHashKey,
        lsn: Lsn,
    },
    ImmutableFilePage {
        file_id: FileId,
        blkno: u32,
    },
}

#[derive(Debug, PartialEq, Eq, Hash, Clone)]
struct MaterializedPageHashKey {
    /// Why is this TenantShardId rather than TenantId?
    ///
    /// Usually, the materialized value of a page@lsn is identical on any shard in the same tenant. However, this
    /// is not the case for certain internally-generated pages (e.g. relation sizes). In future, we may make this
    /// key smaller by omitting the shard, if we ensure that reads to such pages always skip the cache, or are
    /// special-cased in some other way.
    tenant_shard_id: TenantShardId,
    timeline_id: TimelineId,
    key: Key,
}

#[derive(Clone)]
struct Version {
    lsn: Lsn,
    slot_idx: usize,
}

struct Slot {
    inner: tokio::sync::RwLock<SlotInner>,
    usage_count: AtomicU8,
}

struct SlotInner {
    key: Option<CacheKey>,
    // for `coalesce_readers_permit`
    permit: std::sync::Mutex<Weak<PinnedSlotsPermit>>,
    buf: &'static mut [u8; PAGE_SZ],
}

impl Slot {
    /// Increment usage count on the buffer, with ceiling at MAX_USAGE_COUNT.
    fn inc_usage_count(&self) {
        let _ = self
            .usage_count
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
                if val == MAX_USAGE_COUNT {
                    None
                } else {
                    Some(val + 1)
                }
            });
    }

    /// Decrement usage count on the buffer, unless it's already zero. Returns
    /// the old usage count.
    fn dec_usage_count(&self) -> u8 {
        let count_res =
            self.usage_count
                .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
                    if val == 0 {
                        None
                    } else {
                        Some(val - 1)
                    }
                });

        match count_res {
            Ok(usage_count) => usage_count,
            Err(usage_count) => usage_count,
        }
    }

    /// Sets the usage count to a specific value.
    fn set_usage_count(&self, count: u8) {
        self.usage_count.store(count, Ordering::Relaxed);
    }
}

impl SlotInner {
    /// If there is already a reader, drop our permit and share its permit, just like we share read access.
    fn coalesce_readers_permit(&self, permit: PinnedSlotsPermit) -> Arc<PinnedSlotsPermit> {
        let mut guard = self.permit.lock().unwrap();
        if let Some(existing_permit) = guard.upgrade() {
            drop(guard);
            drop(permit);
            existing_permit
        } else {
            let permit = Arc::new(permit);
            *guard = Arc::downgrade(&permit);
            permit
        }
    }
}

pub struct PageCache {
    /// This contains the mapping from the cache key to buffer slot that currently
    /// contains the page, if any.
    ///
    /// TODO: This is protected by a single lock. If that becomes a bottleneck,
    /// this HashMap can be replaced with a more concurrent version, there are
    /// plenty of such crates around.
    ///
    /// If you add support for caching different kinds of objects, each object kind
    /// can have a separate mapping map, next to this field.
    materialized_page_map: std::sync::RwLock<HashMap<MaterializedPageHashKey, Vec<Version>>>,

    immutable_page_map: std::sync::RwLock<HashMap<(FileId, u32), usize>>,

    /// The actual buffers with their metadata.
    slots: Box<[Slot]>,

    pinned_slots: Arc<tokio::sync::Semaphore>,

    /// Index of the next candidate to evict, for the Clock replacement algorithm.
    /// This is interpreted modulo the page cache size.
    next_evict_slot: AtomicUsize,

    size_metrics: &'static PageCacheSizeMetrics,
}

struct PinnedSlotsPermit {
    _permit: tokio::sync::OwnedSemaphorePermit,
}

///
/// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked
/// until the guard is dropped.
///
pub struct PageReadGuard<'i> {
    _permit: Arc<PinnedSlotsPermit>,
    slot_guard: tokio::sync::RwLockReadGuard<'i, SlotInner>,
}

impl std::ops::Deref for PageReadGuard<'_> {
    type Target = [u8; PAGE_SZ];

    fn deref(&self) -> &Self::Target {
        self.slot_guard.buf
    }
}

impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
    fn as_ref(&self) -> &[u8; PAGE_SZ] {
        self.slot_guard.buf
    }
}

///
/// PageWriteGuard is a lease on a buffer for modifying it. The page is kept locked
/// until the guard is dropped.
///
/// Counterintuitively, this is used even for a read, if the requested page is not
/// currently found in the page cache. In that case, the caller of lock_for_read()
/// is expected to fill in the page contents and call mark_valid().
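///
/// A hedged sketch of that fill-then-validate step (`page_bytes` is assumed to
/// be an 8 kB slice; `copy_from_slice` goes through this guard's `DerefMut`
/// to `[u8; PAGE_SZ]`):
///
/// ```ignore
/// let mut write_guard = /* obtained from ReadBufResult::NotFound */;
/// write_guard.copy_from_slice(&page_bytes);
/// let read_guard = write_guard.mark_valid(); // downgrades to a PageReadGuard
/// ```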
pub struct PageWriteGuard<'i> {
    state: PageWriteGuardState<'i>,
}

enum PageWriteGuardState<'i> {
    Invalid {
        inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
        _permit: PinnedSlotsPermit,
    },
    Downgraded,
}

impl std::ops::DerefMut for PageWriteGuard<'_> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        match &mut self.state {
            PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
            PageWriteGuardState::Downgraded => unreachable!(),
        }
    }
}

impl std::ops::Deref for PageWriteGuard<'_> {
    type Target = [u8; PAGE_SZ];

    fn deref(&self) -> &Self::Target {
        match &self.state {
            PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
            PageWriteGuardState::Downgraded => unreachable!(),
        }
    }
}

impl<'a> PageWriteGuard<'a> {
    /// Mark that the buffer contents are now valid.
    #[must_use]
    pub fn mark_valid(mut self) -> PageReadGuard<'a> {
        let prev = std::mem::replace(&mut self.state, PageWriteGuardState::Downgraded);
        match prev {
            PageWriteGuardState::Invalid { inner, _permit } => {
                assert!(inner.key.is_some());
                PageReadGuard {
                    _permit: Arc::new(_permit),
                    slot_guard: inner.downgrade(),
                }
            }
            PageWriteGuardState::Downgraded => unreachable!(),
        }
    }
}

impl Drop for PageWriteGuard<'_> {
    ///
    /// If the buffer was allocated for a page that was not already in the
    /// cache, but the lock_for_read/write() caller dropped the buffer without
    /// initializing it, remove the mapping from the page cache.
    ///
    fn drop(&mut self) {
        match &mut self.state {
            PageWriteGuardState::Invalid { inner, _permit } => {
                assert!(inner.key.is_some());
                let self_key = inner.key.as_ref().unwrap();
                PAGE_CACHE.get().unwrap().remove_mapping(self_key);
                inner.key = None;
            }
            PageWriteGuardState::Downgraded => {}
        }
    }
}

/// lock_for_read() return value
pub enum ReadBufResult<'a> {
    Found(PageReadGuard<'a>),
    NotFound(PageWriteGuard<'a>),
}

impl PageCache {
    //
    // Section 1.1: Public interface functions for looking up and memorizing materialized page
    // versions in the page cache
    //

    /// Look up a materialized page version.
    ///
    /// The 'lsn' is an upper bound: this will return the latest version of
    /// the given block, but not newer than 'lsn'. Returns the actual LSN of the
    /// returned page.
    pub async fn lookup_materialized_page(
        &self,
        tenant_shard_id: TenantShardId,
        timeline_id: TimelineId,
        key: &Key,
        lsn: Lsn,
        ctx: &RequestContext,
    ) -> Option<(Lsn, PageReadGuard)> {
        let Ok(permit) = self.try_get_pinned_slot_permit().await else {
            return None;
        };

        crate::metrics::PAGE_CACHE
            .for_ctx(ctx)
            .read_accesses_materialized_page
            .inc();

        let mut cache_key = CacheKey::MaterializedPage {
            hash_key: MaterializedPageHashKey {
                tenant_shard_id,
                timeline_id,
                key: *key,
            },
            lsn,
        };

        if let Some(guard) = self
            .try_lock_for_read(&mut cache_key, &mut Some(permit))
            .await
        {
            if let CacheKey::MaterializedPage {
                hash_key: _,
                lsn: available_lsn,
            } = cache_key
            {
                if available_lsn == lsn {
                    crate::metrics::PAGE_CACHE
                        .for_ctx(ctx)
                        .read_hits_materialized_page_exact
                        .inc();
                } else {
                    crate::metrics::PAGE_CACHE
                        .for_ctx(ctx)
                        .read_hits_materialized_page_older_lsn
                        .inc();
                }
                Some((available_lsn, guard))
            } else {
                panic!("unexpected key type in slot");
            }
        } else {
            None
        }
    }

    ///
    /// Store an image of the given page in the cache.
    ///
    pub async fn memorize_materialized_page(
        &self,
        tenant_shard_id: TenantShardId,
        timeline_id: TimelineId,
        key: Key,
        lsn: Lsn,
        img: &[u8],
    ) -> anyhow::Result<()> {
        let cache_key = CacheKey::MaterializedPage {
            hash_key: MaterializedPageHashKey {
                tenant_shard_id,
                timeline_id,
                key,
            },
            lsn,
        };

        let mut permit = Some(self.try_get_pinned_slot_permit().await?);
        loop {
            // First check if the key already exists in the cache.
            if let Some(slot_idx) = self.search_mapping_exact(&cache_key) {
                // The page was found in the mapping. Lock the slot, and re-check
                // that it's still what we expected (because we released the mapping
                // lock already, another thread could have evicted the page)
                let slot = &self.slots[slot_idx];
                let inner = slot.inner.write().await;
                if inner.key.as_ref() == Some(&cache_key) {
                    slot.inc_usage_count();
                    debug_assert!(
                        {
                            let guard = inner.permit.lock().unwrap();
                            guard.upgrade().is_none()
                        },
                        "we hold a write lock, so, no one else should have a permit"
                    );
                    debug_assert_eq!(inner.buf.len(), img.len());
                    // We already had it in cache. Another thread must've put it there
                    // concurrently. Check that it had the same contents that we
                    // replayed.
                    assert!(inner.buf == img);
                    return Ok(());
                }
            }
            debug_assert!(permit.is_some());

            // Not found. Find a victim buffer
            let (slot_idx, mut inner) = self
                .find_victim(permit.as_ref().unwrap())
                .await
                .context("Failed to find evict victim")?;

            // Insert mapping for this. At this point, we may find that another
            // thread did the same thing concurrently. In that case, we evicted
            // our victim buffer unnecessarily. Put it into the free list and
            // continue with the slot that the other thread chose.
            if let Some(_existing_slot_idx) = self.try_insert_mapping(&cache_key, slot_idx) {
                // TODO: put to free list

                // We now just loop back to start from beginning. This is not
                // optimal, we'll perform the lookup in the mapping again, which
                // is not really necessary because we already got
                // 'existing_slot_idx'. But this shouldn't happen often enough
                // to matter much.
                continue;
            }

            // Make the slot ready
            let slot = &self.slots[slot_idx];
            inner.key = Some(cache_key.clone());
            slot.set_usage_count(1);
            // Create a write guard for the slot so we go through the expected motions.
            debug_assert!(
                {
                    let guard = inner.permit.lock().unwrap();
                    guard.upgrade().is_none()
                },
                "we hold a write lock, so, no one else should have a permit"
            );
            let mut write_guard = PageWriteGuard {
                state: PageWriteGuardState::Invalid {
                    _permit: permit.take().unwrap(),
                    inner,
                },
            };
            write_guard.copy_from_slice(img);
            let _ = write_guard.mark_valid();
            return Ok(());
        }
    }

    // Section 1.2: Public interface functions for working with immutable file pages.

    pub async fn read_immutable_buf(
        &self,
        file_id: FileId,
        blkno: u32,
        ctx: &RequestContext,
    ) -> anyhow::Result<ReadBufResult> {
        let mut cache_key = CacheKey::ImmutableFilePage { file_id, blkno };

        self.lock_for_read(&mut cache_key, ctx).await
    }

    //
    // Section 2: Internal interface functions for lookup/update.
    //
    // To add support for a new kind of "thing" to cache, you will need
    // to add public interface routines above, and code to deal with the
    // "mappings" after this section. But the routines in this section should
    // not require changes.

    async fn try_get_pinned_slot_permit(&self) -> anyhow::Result<PinnedSlotsPermit> {
        match tokio::time::timeout(
            // Choose small timeout, neon_smgr does its own retries.
            // https://neondb.slack.com/archives/C04DGM6SMTM/p1694786876476869
            Duration::from_secs(10),
            Arc::clone(&self.pinned_slots).acquire_owned(),
        )
        .await
        {
            Ok(res) => Ok(PinnedSlotsPermit {
                _permit: res.expect("this semaphore is never closed"),
            }),
            Err(_timeout) => {
                crate::metrics::page_cache_errors_inc(
                    crate::metrics::PageCacheErrorKind::AcquirePinnedSlotTimeout,
                );
                anyhow::bail!("timeout: there were page guards alive for all page cache slots")
            }
        }
    }

    /// Look up a page in the cache.
    ///
    /// If the search criteria is not exact, *cache_key is updated with the
    /// exact key of the returned page. (For materialized pages, that means
    /// that the LSN in 'cache_key' is updated with the LSN of the returned page
    /// version.)
    ///
    /// If no page is found, returns None and *cache_key is left unmodified.
    ///
    async fn try_lock_for_read(
        &self,
        cache_key: &mut CacheKey,
        permit: &mut Option<PinnedSlotsPermit>,
    ) -> Option<PageReadGuard> {
        let cache_key_orig = cache_key.clone();
        if let Some(slot_idx) = self.search_mapping(cache_key) {
            // The page was found in the mapping. Lock the slot, and re-check
            // that it's still what we expected (because we released the mapping
            // lock already, another thread could have evicted the page)
            let slot = &self.slots[slot_idx];
            let inner = slot.inner.read().await;
            if inner.key.as_ref() == Some(cache_key) {
                slot.inc_usage_count();
                return Some(PageReadGuard {
                    _permit: inner.coalesce_readers_permit(permit.take().unwrap()),
                    slot_guard: inner,
                });
            } else {
                // search_mapping might have modified the search key; restore it.
                *cache_key = cache_key_orig;
            }
        }
        None
    }

    /// Return a locked buffer for given block.
    ///
    /// Like try_lock_for_read(), if the search criteria is not exact and the
    /// page is already found in the cache, *cache_key is updated.
    ///
    /// If the page is not found in the cache, this allocates a new buffer for
    /// it. The caller may then initialize the buffer with the contents, and
    /// call mark_valid().
    ///
    /// Example usage:
    ///
    /// ```ignore
    /// let cache = page_cache::get();
    ///
    /// match cache.lock_for_read(&mut cache_key, ctx).await? {
    ///     ReadBufResult::Found(read_guard) => {
    ///         // The page was found in cache. Use it.
    ///     }
    ///     ReadBufResult::NotFound(write_guard) => {
    ///         // The page was not found in cache. Read it from disk into the
    ///         // buffer.
    ///         //read_my_page_from_disk(write_guard);
    ///
    ///         // The buffer contents are now valid. Tell the page cache, which
    ///         // downgrades the write guard into a read guard.
    ///         let read_guard = write_guard.mark_valid();
    ///     }
    /// }
    /// ```
    ///
    async fn lock_for_read(
        &self,
        cache_key: &mut CacheKey,
        ctx: &RequestContext,
    ) -> anyhow::Result<ReadBufResult> {
        let mut permit = Some(self.try_get_pinned_slot_permit().await?);

        let (read_access, hit) = match cache_key {
            CacheKey::MaterializedPage { .. } => {
                unreachable!("Materialized pages use lookup_materialized_page")
            }
            CacheKey::ImmutableFilePage { .. } => (
                &crate::metrics::PAGE_CACHE
                    .for_ctx(ctx)
                    .read_accesses_immutable,
                &crate::metrics::PAGE_CACHE.for_ctx(ctx).read_hits_immutable,
            ),
        };
        read_access.inc();

        let mut is_first_iteration = true;
        loop {
            // First check if the key already exists in the cache.
            if let Some(read_guard) = self.try_lock_for_read(cache_key, &mut permit).await {
                debug_assert!(permit.is_none());
                if is_first_iteration {
                    hit.inc();
                }
                return Ok(ReadBufResult::Found(read_guard));
            }
            debug_assert!(permit.is_some());
            is_first_iteration = false;

            // Not found. Find a victim buffer
            let (slot_idx, mut inner) = self
                .find_victim(permit.as_ref().unwrap())
                .await
                .context("Failed to find evict victim")?;

            // Insert mapping for this. At this point, we may find that another
            // thread did the same thing concurrently. In that case, we evicted
            // our victim buffer unnecessarily. Put it into the free list and
            // continue with the slot that the other thread chose.
            if let Some(_existing_slot_idx) = self.try_insert_mapping(cache_key, slot_idx) {
                // TODO: put to free list

                // We now just loop back to start from beginning. This is not
                // optimal, we'll perform the lookup in the mapping again, which
                // is not really necessary because we already got
                // 'existing_slot_idx'. But this shouldn't happen often enough
                // to matter much.
                continue;
            }

            // Make the slot ready
            let slot = &self.slots[slot_idx];
            inner.key = Some(cache_key.clone());
            slot.set_usage_count(1);

            debug_assert!(
                {
                    let guard = inner.permit.lock().unwrap();
                    guard.upgrade().is_none()
                },
                "we hold a write lock, so, no one else should have a permit"
            );

            return Ok(ReadBufResult::NotFound(PageWriteGuard {
                state: PageWriteGuardState::Invalid {
                    _permit: permit.take().unwrap(),
                    inner,
                },
            }));
        }
    }

    //
    // Section 3: Mapping functions
    //

    /// Search for a page in the cache using the given search key.
    ///
    /// Returns the slot index, if any. If the search criteria is not exact,
    /// *cache_key is updated with the actual key of the found page.
    ///
    /// NOTE: We don't hold any lock on the mapping on return, so the slot might
    /// get recycled for an unrelated page immediately after this function
    /// returns. The caller is responsible for re-checking that the slot still
    /// contains the page with the same key before using it.
    ///
    fn search_mapping(&self, cache_key: &mut CacheKey) -> Option<usize> {
        match cache_key {
            CacheKey::MaterializedPage { hash_key, lsn } => {
                let map = self.materialized_page_map.read().unwrap();
                let versions = map.get(hash_key)?;

                let version_idx = match versions.binary_search_by_key(lsn, |v| v.lsn) {
                    Ok(version_idx) => version_idx,
                    Err(0) => return None,
                    Err(version_idx) => version_idx - 1,
                };
                let version = &versions[version_idx];
                *lsn = version.lsn;
                Some(version.slot_idx)
            }
            CacheKey::ImmutableFilePage { file_id, blkno } => {
                let map = self.immutable_page_map.read().unwrap();
                Some(*map.get(&(*file_id, *blkno))?)
            }
        }
    }

    /// Search for a page in the cache using the given search key.
    ///
    /// Like 'search_mapping', but performs an "exact" search. Used for
    /// allocating a new buffer.
    fn search_mapping_exact(&self, key: &CacheKey) -> Option<usize> {
        match key {
            CacheKey::MaterializedPage { hash_key, lsn } => {
                let map = self.materialized_page_map.read().unwrap();
                let versions = map.get(hash_key)?;

                if let Ok(version_idx) = versions.binary_search_by_key(lsn, |v| v.lsn) {
                    Some(versions[version_idx].slot_idx)
                } else {
                    None
                }
            }
            CacheKey::ImmutableFilePage { file_id, blkno } => {
                let map = self.immutable_page_map.read().unwrap();
                Some(*map.get(&(*file_id, *blkno))?)
            }
        }
    }

    ///
    /// Remove mapping for given key.
    ///
    fn remove_mapping(&self, old_key: &CacheKey) {
        match old_key {
            CacheKey::MaterializedPage {
                hash_key: old_hash_key,
                lsn: old_lsn,
            } => {
                let mut map = self.materialized_page_map.write().unwrap();
                if let Entry::Occupied(mut old_entry) = map.entry(old_hash_key.clone()) {
                    let versions = old_entry.get_mut();

                    if let Ok(version_idx) = versions.binary_search_by_key(old_lsn, |v| v.lsn) {
                        versions.remove(version_idx);
                        self.size_metrics
                            .current_bytes_materialized_page
                            .sub_page_sz(1);
                        if versions.is_empty() {
                            old_entry.remove_entry();
                        }
                    }
                } else {
                    panic!("could not find old key in mapping")
                }
            }
            CacheKey::ImmutableFilePage { file_id, blkno } => {
                let mut map = self.immutable_page_map.write().unwrap();
                map.remove(&(*file_id, *blkno))
                    .expect("could not find old key in mapping");
                self.size_metrics.current_bytes_immutable.sub_page_sz(1);
            }
        }
    }

    ///
    /// Insert mapping for given key.
    ///
    /// If a mapping already existed for the given key, returns the slot index
    /// of the existing mapping and leaves it untouched.
    fn try_insert_mapping(&self, new_key: &CacheKey, slot_idx: usize) -> Option<usize> {
        match new_key {
            CacheKey::MaterializedPage {
                hash_key: new_key,
                lsn: new_lsn,
            } => {
                let mut map = self.materialized_page_map.write().unwrap();
                let versions = map.entry(new_key.clone()).or_default();
                match versions.binary_search_by_key(new_lsn, |v| v.lsn) {
                    Ok(version_idx) => Some(versions[version_idx].slot_idx),
                    Err(version_idx) => {
                        versions.insert(
                            version_idx,
                            Version {
                                lsn: *new_lsn,
                                slot_idx,
                            },
                        );
                        self.size_metrics
                            .current_bytes_materialized_page
                            .add_page_sz(1);
                        None
                    }
                }
            }

            CacheKey::ImmutableFilePage { file_id, blkno } => {
                let mut map = self.immutable_page_map.write().unwrap();
                match map.entry((*file_id, *blkno)) {
                    Entry::Occupied(entry) => Some(*entry.get()),
                    Entry::Vacant(entry) => {
                        entry.insert(slot_idx);
                        self.size_metrics.current_bytes_immutable.add_page_sz(1);
                        None
                    }
                }
            }
        }
    }

    //
    // Section 4: Misc internal helpers
    //

    /// Find a slot to evict.
    ///
    /// On return, the slot is empty and write-locked.
    async fn find_victim(
        &self,
        _permit_witness: &PinnedSlotsPermit,
    ) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
        let iter_limit = self.slots.len() * 10;
        let mut iters = 0;
        loop {
            iters += 1;
            let slot_idx = self.next_evict_slot.fetch_add(1, Ordering::Relaxed) % self.slots.len();

            let slot = &self.slots[slot_idx];

            if slot.dec_usage_count() == 0 {
                let mut inner = match slot.inner.try_write() {
                    Ok(inner) => inner,
                    Err(_err) => {
                        if iters > iter_limit {
                            // NB: Even with the permits, there's no hard guarantee that we will find a slot with
                            // any particular number of iterations: other threads might race ahead and acquire and
                            // release pins just as we're scanning the array.
                            //
                            // Imagine that nslots is 2, and as starting point, usage_count==1 on all
                            // slots. There are two threads running concurrently, A and B. A has just
                            // acquired the permit from the semaphore.
                            //
                            // A: Look at slot 1. Its usage_count == 1, so decrement it to zero, and continue the search
                            // B: Acquire permit.
                            // B: Look at slot 2, decrement its usage_count to zero and continue the search
                            // B: Look at slot 1. Its usage_count is zero, so pin it and bump up its usage_count to 1.
                            // B: Release pin and permit again
                            // B: Acquire permit.
                            // B: Look at slot 2. Its usage_count is zero, so pin it and bump up its usage_count to 1.
                            // B: Release pin and permit again
                            //
                            // Now we're back in the starting situation that both slots have
                            // usage_count 1, but A has now been through one iteration of the
                            // find_victim() loop. This can repeat indefinitely and on each
                            // iteration, A's iteration count increases by one.
                            //
                            // So, even though the semaphore for the permits is fair, the victim search
                            // itself happens in parallel and is not fair.
                            // Hence even with a permit, a task can theoretically be starved.
                            // To avoid this, we'd need tokio to give priority to tasks that are holding
                            // permits for longer.
                            // Note that just yielding to tokio during iteration without such
                            // priority boosting is likely counter-productive. We'd just give more opportunities
                            // for B to bump usage count, further starving A.
                            page_cache_eviction_metrics::observe(
                                page_cache_eviction_metrics::Outcome::ItersExceeded {
                                    iters: iters.try_into().unwrap(),
                                },
                            );
                            anyhow::bail!("exceeded evict iter limit");
                        }
                        continue;
                    }
                };
                if let Some(old_key) = &inner.key {
                    // remove mapping for old buffer
                    self.remove_mapping(old_key);
                    inner.key = None;
                    page_cache_eviction_metrics::observe(
                        page_cache_eviction_metrics::Outcome::FoundSlotEvicted {
                            iters: iters.try_into().unwrap(),
                        },
                    );
                } else {
                    page_cache_eviction_metrics::observe(
                        page_cache_eviction_metrics::Outcome::FoundSlotUnused {
                            iters: iters.try_into().unwrap(),
                        },
                    );
                }
                return Ok((slot_idx, inner));
            }
        }
    }

    /// Initialize a new page cache
    ///
    /// This should be called only once at page server startup.
    fn new(num_pages: usize) -> Self {
        assert!(num_pages > 0, "page cache size must be > 0");

        // We could use Vec::leak here, but that potentially also leaks
        // uninitialized reserved capacity. With into_boxed_slice and Box::leak
        // this is avoided.
        let page_buffer = Box::leak(vec![0u8; num_pages * PAGE_SZ].into_boxed_slice());

        let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;
        size_metrics.max_bytes.set_page_sz(num_pages);
        size_metrics.current_bytes_immutable.set_page_sz(0);
        size_metrics.current_bytes_materialized_page.set_page_sz(0);

        let slots = page_buffer
            .chunks_exact_mut(PAGE_SZ)
            .map(|chunk| {
                let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap();

                Slot {
                    inner: tokio::sync::RwLock::new(SlotInner {
                        key: None,
                        buf,
                        permit: std::sync::Mutex::new(Weak::new()),
                    }),
                    usage_count: AtomicU8::new(0),
                }
            })
            .collect();

        Self {
            materialized_page_map: Default::default(),
            immutable_page_map: Default::default(),
            slots,
            next_evict_slot: AtomicUsize::new(0),
            size_metrics,
            pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)),
        }
    }
}

trait PageSzBytesMetric {
    fn set_page_sz(&self, count: usize);
    fn add_page_sz(&self, count: usize);
    fn sub_page_sz(&self, count: usize);
}

#[inline(always)]
fn count_times_page_sz(count: usize) -> u64 {
    u64::try_from(count).unwrap() * u64::try_from(PAGE_SZ).unwrap()
}

impl PageSzBytesMetric for metrics::UIntGauge {
    fn set_page_sz(&self, count: usize) {
        self.set(count_times_page_sz(count));
    }
    fn add_page_sz(&self, count: usize) {
        self.add(count_times_page_sz(count));
    }
    fn sub_page_sz(&self, count: usize) {
        self.sub(count_times_page_sz(count));
    }
}