Line data Source code
1 : //!
2 : //! Global page cache
3 : //!
4 : //! The page cache uses up most of the memory in the page server. It is shared
5 : //! by all tenants, and it is used to store different kinds of pages. Sharing
6 : //! the cache allows memory to be dynamically allocated where it's needed the
7 : //! most.
8 : //!
9 : //! The page cache consists of fixed-size buffers, 8 kB each to match the
10 : //! PostgreSQL buffer size, and a Slot struct for each buffer to contain
11 : //! information about what's stored in the buffer.
12 : //!
13 : //! # Types Of Pages
14 : //!
15 : //! [`PageCache`] only supports immutable pages.
16 : //! Hence there is no need to worry about coherency.
17 : //!
18 : //! Two types of pages are supported:
19 : //!
20 : //! * **Materialized pages**, filled & used by page reconstruction
21 : //! * **Immutable File pages**, filled & used by [`crate::tenant::block_io`] and [`crate::tenant::ephemeral_file`].
22 : //!
23 : //! Note that [`crate::tenant::ephemeral_file::EphemeralFile`] is generally mutable, but it's append-only.
24 : //! It uses the page cache only for the blocks that are already fully written and immutable.
25 : //!
26 : //! # Filling The Page Cache
27 : //!
28 : //! The page cache maps from a cache key to a buffer slot.
29 : //! The cache key uniquely identifies the piece of data that is being cached.
30 : //!
31 : //! The cache key for **materialized pages** is [`TenantShardId`], [`TimelineId`], [`Key`], and [`Lsn`].
32 : //! Use [`PageCache::memorize_materialized_page`] and [`PageCache::lookup_materialized_page`] for fill & access.
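//! A minimal sketch of that pair (variable names are illustrative; `img` must be exactly
//! [`PAGE_SZ`] bytes):
//!
//! ```ignore
//! let cache = page_cache::get();
//! // Fill: remember the reconstructed image of `key` at `lsn`.
//! cache
//!     .memorize_materialized_page(tenant_shard_id, timeline_id, key, lsn, &img)
//!     .await?;
//! // Access: latest cached version of `key` that is not newer than `lsn`.
//! if let Some((cached_lsn, guard)) = cache
//!     .lookup_materialized_page(tenant_shard_id, timeline_id, &key, lsn, ctx)
//!     .await
//! {
//!     assert!(cached_lsn <= lsn);
//! }
//! ```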
33 : //!
34 : //! The cache key for **immutable file** pages is [`FileId`] and a block number.
35 : //! Users of the page cache that wish to page-cache an arbitrary (immutable!) on-disk file do the following:
36 : //! * Have a mechanism to deterministically associate the on-disk file with a [`FileId`].
37 : //! * Get a [`FileId`] using [`next_file_id`].
38 : //! * Use the mechanism to associate the on-disk file with the returned [`FileId`].
39 : //! * Use [`PageCache::read_immutable_buf`] to get a [`ReadBufResult`].
40 : //! * If the page was already cached, it'll be the [`ReadBufResult::Found`] variant that contains
41 : //! a read guard for the page. Just use it.
42 : //! * If the page was not cached, it'll be the [`ReadBufResult::NotFound`] variant that contains
43 : //! a write guard for the page. Fill the page with the contents of the on-disk file.
44 : //! Then call [`PageWriteGuard::mark_valid`] to mark the page as valid.
45 : //! This downgrades the write guard to a [`PageReadGuard`] that can be used to read
46 : //! the page directly; there is no need to call [`PageCache::read_immutable_buf`] again.
47 : //! A sketch of the whole flow follows below.
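//!
//! A sketch of that flow, assuming a hypothetical `read_block_from_disk` helper for the
//! caller's own I/O (error handling elided):
//!
//! ```ignore
//! let cache = page_cache::get();
//! match cache.read_immutable_buf(file_id, blkno, ctx).await? {
//!     ReadBufResult::Found(read_guard) => {
//!         // Cache hit: `read_guard` derefs to the 8 kB page.
//!         Ok(read_guard)
//!     }
//!     ReadBufResult::NotFound(mut write_guard) => {
//!         // Cache miss: fill the buffer from the (immutable) on-disk file.
//!         read_block_from_disk(file_id, blkno, &mut *write_guard)?; // hypothetical helper
//!         // Marking the buffer valid downgrades the write guard to a read guard.
//!         Ok(write_guard.mark_valid())
//!     }
//! }
//! ```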
48 : //!
49 : //! # Locking
50 : //!
51 : //! There are two levels of locking involved: There's one lock for the "mapping"
52 : //! from page identifier (tenant shard ID, timeline ID, key, LSN) to the buffer
53 : //! slot, and a separate lock on each slot. To read or write the contents of a
54 : //! slot, you must hold the lock on the slot in read or write mode,
55 : //! respectively. To change the mapping of a slot, i.e. to evict a page or to
56 : //! assign a buffer for a page, you must hold the mapping lock and the lock on
57 : //! the slot at the same time.
58 : //!
59 : //! Whenever you need to hold both locks simultaneously, the slot lock must be
60 : //! acquired first. This consistent ordering avoids deadlocks. To look up a page
61 : //! in the cache, you would first look up the mapping, while holding the mapping
62 : //! lock, and then lock the slot. You must release the mapping lock in between,
63 : //! to obey the lock ordering and avoid deadlock.
64 : //!
65 : //! A slot can momentarily have invalid contents, even if it's already been
66 : //! inserted into the mapping, but you must hold the write-lock on the slot until
67 : //! the contents are valid. If you need to release the lock without initializing
68 : //! the contents, you must remove the mapping first. We make that easy for the
69 : //! callers with PageWriteGuard: the caller must explicitly call guard.mark_valid()
70 : //! after initializing the contents. If the guard is dropped without calling mark_valid(),
71 : //! the mapping is automatically removed and the slot is marked free.
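//!
//! Schematically, the two access patterns look like this (a sketch of the pattern only,
//! not the actual function bodies; `mapping` stands for one of the mapping maps):
//!
//! ```ignore
//! // Lookup: take the mapping lock, find the slot, release the mapping lock,
//! // then take the slot lock and re-check the key (the slot may have been recycled).
//! let slot_idx = { mapping.read().unwrap().get(&key).copied() }?;
//! let inner = slots[slot_idx].inner.read().await;
//! if inner.key.as_ref() != Some(&key) {
//!     // evicted concurrently; treat as a miss
//! }
//!
//! // Eviction / assignment: take the slot write lock first, then the mapping lock.
//! let mut inner = slots[victim_idx].inner.write().await;
//! mapping.write().unwrap().remove(&old_key);
//! inner.key = Some(new_key);
//! ```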
72 : //!
73 :
74 : use std::{
75 : collections::{hash_map::Entry, HashMap},
76 : convert::TryInto,
77 : sync::{
78 : atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
79 : Arc, Weak,
80 : },
81 : time::Duration,
82 : };
83 :
84 : use anyhow::Context;
85 : use once_cell::sync::OnceCell;
86 : use pageserver_api::shard::TenantShardId;
87 : use utils::{id::TimelineId, lsn::Lsn};
88 :
89 : use crate::{
90 : context::RequestContext,
91 : metrics::{page_cache_eviction_metrics, PageCacheSizeMetrics},
92 : repository::Key,
93 : };
94 :
95 : static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
96 : const TEST_PAGE_CACHE_SIZE: usize = 50;
97 :
98 : ///
99 : /// Initialize the page cache. This must be called once at page server startup.
100 : ///
101 604 : pub fn init(size: usize) {
102 604 : if PAGE_CACHE.set(PageCache::new(size)).is_err() {
103 0 : panic!("page cache already initialized");
104 604 : }
105 604 : }
106 :
107 : ///
108 : /// Get a handle to the page cache.
109 : ///
110 195785441 : pub fn get() -> &'static PageCache {
111 195785441 : //
112 195785441 : // In unit tests, page server startup doesn't happen and no one calls
113 195785441 : // page_cache::init(). Initialize it here with a tiny cache, so that the
114 195785441 : // page cache is usable in unit tests.
115 195785441 : //
116 195785441 : if cfg!(test) {
117 3336990 : PAGE_CACHE.get_or_init(|| PageCache::new(TEST_PAGE_CACHE_SIZE))
118 : } else {
119 192448451 : PAGE_CACHE.get().expect("page cache not initialized")
120 : }
121 195785441 : }
122 :
123 : pub const PAGE_SZ: usize = postgres_ffi::BLCKSZ as usize;
124 : const MAX_USAGE_COUNT: u8 = 5;
125 :
126 : /// See module-level comment.
127 358902761 : #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
128 : pub struct FileId(u64);
129 :
130 : static NEXT_ID: AtomicU64 = AtomicU64::new(1);
131 :
132 : /// See module-level comment.
133 40122 : pub fn next_file_id() -> FileId {
134 40122 : FileId(NEXT_ID.fetch_add(1, Ordering::Relaxed))
135 40122 : }
136 :
137 : ///
138 : /// CacheKey uniquely identifies a "thing" to cache in the page cache.
139 : ///
140 205800103 : #[derive(Debug, PartialEq, Eq, Clone)]
141 : #[allow(clippy::enum_variant_names)]
142 : enum CacheKey {
143 : MaterializedPage {
144 : hash_key: MaterializedPageHashKey,
145 : lsn: Lsn,
146 : },
147 : ImmutableFilePage {
148 : file_id: FileId,
149 : blkno: u32,
150 : },
151 : }
152 :
153 17013914 : #[derive(Debug, PartialEq, Eq, Hash, Clone)]
154 : struct MaterializedPageHashKey {
155 : /// Why is this TenantShardId rather than TenantId?
156 : ///
157 : /// Usually, the materialized value of a page@lsn is identical on any shard in the same tenant. However, this
158 : /// is not the case for certain internally-generated pages (e.g. relation sizes). In the future, we may make this
159 : /// key smaller by omitting the shard, if we ensure that reads to such pages always skip the cache, or are
160 : /// special-cased in some other way.
161 : tenant_shard_id: TenantShardId,
162 : timeline_id: TimelineId,
163 : key: Key,
164 : }
165 :
166 0 : #[derive(Clone)]
167 : struct Version {
168 : lsn: Lsn,
169 : slot_idx: usize,
170 : }
171 :
172 : struct Slot {
173 : inner: tokio::sync::RwLock<SlotInner>,
174 : usage_count: AtomicU8,
175 : }
176 :
177 : struct SlotInner {
178 : key: Option<CacheKey>,
179 : // for `coalesce_readers_permit`
180 : permit: std::sync::Mutex<Weak<PinnedSlotsPermit>>,
181 : buf: &'static mut [u8; PAGE_SZ],
182 : }
183 :
184 : impl Slot {
185 : /// Increment usage count on the buffer, with ceiling at MAX_USAGE_COUNT.
186 176065896 : fn inc_usage_count(&self) {
187 176065896 : let _ = self
188 176065896 : .usage_count
189 176067664 : .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
190 176067664 : if val == MAX_USAGE_COUNT {
191 153568314 : None
192 : } else {
193 22499350 : Some(val + 1)
194 : }
195 176067664 : });
196 176065896 : }
197 :
198 : /// Decrement usage count on the buffer, unless it's already zero. Returns
199 : /// the old usage count.
200 41696902 : fn dec_usage_count(&self) -> u8 {
201 41696902 : let count_res =
202 41696902 : self.usage_count
203 41696902 : .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
204 41696889 : if val == 0 {
205 12871934 : None
206 : } else {
207 28824955 : Some(val - 1)
208 : }
209 41696902 : });
210 41696902 :
211 41696902 : match count_res {
212 28824963 : Ok(usage_count) => usage_count,
213 12871939 : Err(usage_count) => usage_count,
214 : }
215 41696902 : }
216 :
217 : /// Sets the usage count to a specific value.
218 12871093 : fn set_usage_count(&self, count: u8) {
219 12871093 : self.usage_count.store(count, Ordering::Relaxed);
220 12871093 : }
221 : }
222 :
223 : impl SlotInner {
224 : /// If there is already a reader, drop our permit and share its permit, just like we share read access.
225 176065741 : fn coalesce_readers_permit(&self, permit: PinnedSlotsPermit) -> Arc<PinnedSlotsPermit> {
226 176065741 : let mut guard = self.permit.lock().unwrap();
227 176065741 : if let Some(existing_permit) = guard.upgrade() {
228 167940 : drop(guard);
229 167940 : drop(permit);
230 167940 : existing_permit
231 : } else {
232 175897801 : let permit = Arc::new(permit);
233 175897801 : *guard = Arc::downgrade(&permit);
234 175897801 : permit
235 : }
236 176065741 : }
237 : }
238 :
239 : pub struct PageCache {
240 : /// This contains the mapping from the cache key to buffer slot that currently
241 : /// contains the page, if any.
242 : ///
243 : /// TODO: This is protected by a single lock. If that becomes a bottleneck,
244 : /// this HashMap can be replaced with a more concurrent version; there are
245 : /// plenty of such crates around.
246 : ///
247 : /// If you add support for caching different kinds of objects, each object kind
248 : /// can have a separate mapping map, next to this field.
249 : materialized_page_map: std::sync::RwLock<HashMap<MaterializedPageHashKey, Vec<Version>>>,
250 :
251 : immutable_page_map: std::sync::RwLock<HashMap<(FileId, u32), usize>>,
252 :
253 : /// The actual buffers with their metadata.
254 : slots: Box<[Slot]>,
255 :
256 : pinned_slots: Arc<tokio::sync::Semaphore>,
257 :
258 : /// Index of the next candidate to evict, for the Clock replacement algorithm.
259 : /// This is interpreted modulo the page cache size.
260 : next_evict_slot: AtomicUsize,
261 :
262 : size_metrics: &'static PageCacheSizeMetrics,
263 : }
264 :
265 : struct PinnedSlotsPermit(tokio::sync::OwnedSemaphorePermit);
266 :
267 : ///
268 : /// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked
269 : /// until the guard is dropped.
270 : ///
271 : pub struct PageReadGuard<'i> {
272 : _permit: Arc<PinnedSlotsPermit>,
273 : slot_guard: tokio::sync::RwLockReadGuard<'i, SlotInner>,
274 : }
275 :
276 : impl std::ops::Deref for PageReadGuard<'_> {
277 : type Target = [u8; PAGE_SZ];
278 :
279 356745121 : fn deref(&self) -> &Self::Target {
280 356745121 : self.slot_guard.buf
281 356745121 : }
282 : }
283 :
284 : impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
285 0 : fn as_ref(&self) -> &[u8; PAGE_SZ] {
286 0 : self.slot_guard.buf
287 0 : }
288 : }
289 :
290 : ///
291 : /// PageWriteGuard is a lease on a buffer for modifying it. The page is kept locked
292 : /// until the guard is dropped.
293 : ///
294 : /// Counterintuitively, this is used even for a read, if the requested page is not
295 : /// currently found in the page cache. In that case, the caller of lock_for_read()
296 : /// is expected to fill in the page contents and call mark_valid().
297 : pub struct PageWriteGuard<'i> {
298 : state: PageWriteGuardState<'i>,
299 : }
300 :
301 : enum PageWriteGuardState<'i> {
302 : Invalid {
303 : inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
304 : _permit: PinnedSlotsPermit,
305 : },
306 : Downgraded,
307 : }
308 :
309 : impl std::ops::DerefMut for PageWriteGuard<'_> {
310 12871093 : fn deref_mut(&mut self) -> &mut Self::Target {
311 12871093 : match &mut self.state {
312 12871093 : PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
313 0 : PageWriteGuardState::Downgraded => unreachable!(),
314 : }
315 12871093 : }
316 : }
317 :
318 : impl std::ops::Deref for PageWriteGuard<'_> {
319 : type Target = [u8; PAGE_SZ];
320 :
321 23654036 : fn deref(&self) -> &Self::Target {
322 23654036 : match &self.state {
323 23654036 : PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
324 0 : PageWriteGuardState::Downgraded => unreachable!(),
325 : }
326 23654036 : }
327 : }
328 :
329 : impl<'a> PageWriteGuard<'a> {
330 : /// Mark that the buffer contents are now valid.
331 : #[must_use]
332 12871091 : pub fn mark_valid(mut self) -> PageReadGuard<'a> {
333 12871091 : let prev = std::mem::replace(&mut self.state, PageWriteGuardState::Downgraded);
334 12871091 : match prev {
335 12871091 : PageWriteGuardState::Invalid { inner, _permit } => {
336 12871091 : assert!(inner.key.is_some());
337 12871091 : PageReadGuard {
338 12871091 : _permit: Arc::new(_permit),
339 12871091 : slot_guard: inner.downgrade(),
340 12871091 : }
341 : }
342 0 : PageWriteGuardState::Downgraded => unreachable!(),
343 : }
344 12871091 : }
345 : }
346 :
347 : impl Drop for PageWriteGuard<'_> {
348 : ///
349 : /// If the buffer was allocated for a page that was not already in the
350 : /// cache, but the caller dropped the write guard without
351 : /// initializing it, remove the mapping from the page cache.
352 : ///
353 12871076 : fn drop(&mut self) {
354 12871076 : match &mut self.state {
355 0 : PageWriteGuardState::Invalid { inner, _permit } => {
356 0 : assert!(inner.key.is_some());
357 0 : let self_key = inner.key.as_ref().unwrap();
358 0 : PAGE_CACHE.get().unwrap().remove_mapping(self_key);
359 0 : inner.key = None;
360 : }
361 12871076 : PageWriteGuardState::Downgraded => {}
362 : }
363 12871076 : }
364 : }
365 :
366 : /// lock_for_read() return value
367 : pub enum ReadBufResult<'a> {
368 : Found(PageReadGuard<'a>),
369 : NotFound(PageWriteGuard<'a>),
370 : }
371 :
372 : impl PageCache {
373 : //
374 : // Section 1.1: Public interface functions for looking up and memorizing materialized page
375 : // versions in the page cache
376 : //
377 :
378 : /// Look up a materialized page version.
379 : ///
380 : /// The 'lsn' is an upper bound: this will return the latest version of
381 : /// the given block, but not newer than 'lsn'. Returns the actual LSN of the
382 : /// returned page.
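///
/// A sketch of a call site (variable names are illustrative):
///
/// ```ignore
/// if let Some((cached_lsn, guard)) = cache
///     .lookup_materialized_page(tenant_shard_id, timeline_id, &key, request_lsn, ctx)
///     .await
/// {
///     // `cached_lsn` <= `request_lsn`; `guard` derefs to the cached 8 kB image.
/// }
/// ```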
383 8625194 : pub async fn lookup_materialized_page(
384 8625194 : &self,
385 8625194 : tenant_shard_id: TenantShardId,
386 8625194 : timeline_id: TimelineId,
387 8625194 : key: &Key,
388 8625194 : lsn: Lsn,
389 8625194 : ctx: &RequestContext,
390 8625194 : ) -> Option<(Lsn, PageReadGuard)> {
391 8625192 : let Ok(permit) = self.try_get_pinned_slot_permit().await else {
392 0 : return None;
393 : };
394 :
395 8625192 : crate::metrics::PAGE_CACHE
396 8625192 : .for_ctx(ctx)
397 8625192 : .read_accesses_materialized_page
398 8625192 : .inc();
399 8625192 :
400 8625192 : let mut cache_key = CacheKey::MaterializedPage {
401 8625192 : hash_key: MaterializedPageHashKey {
402 8625192 : tenant_shard_id,
403 8625192 : timeline_id,
404 8625192 : key: *key,
405 8625192 : },
406 8625192 : lsn,
407 8625192 : };
408 :
409 8625192 : if let Some(guard) = self
410 8625192 : .try_lock_for_read(&mut cache_key, &mut Some(permit))
411 1375 : .await
412 : {
413 : if let CacheKey::MaterializedPage {
414 : hash_key: _,
415 1776743 : lsn: available_lsn,
416 1776743 : } = cache_key
417 : {
418 1776743 : if available_lsn == lsn {
419 17 : crate::metrics::PAGE_CACHE
420 17 : .for_ctx(ctx)
421 17 : .read_hits_materialized_page_exact
422 17 : .inc();
423 1776726 : } else {
424 1776726 : crate::metrics::PAGE_CACHE
425 1776726 : .for_ctx(ctx)
426 1776726 : .read_hits_materialized_page_older_lsn
427 1776726 : .inc();
428 1776726 : }
429 1776743 : Some((available_lsn, guard))
430 : } else {
431 0 : panic!("unexpected key type in slot");
432 : }
433 : } else {
434 6848449 : None
435 : }
436 8625192 : }
437 :
438 : ///
439 : /// Store an image of the given page in the cache.
440 : ///
441 2857268 : pub async fn memorize_materialized_page(
442 2857268 : &self,
443 2857268 : tenant_shard_id: TenantShardId,
444 2857268 : timeline_id: TimelineId,
445 2857268 : key: Key,
446 2857268 : lsn: Lsn,
447 2857268 : img: &[u8],
448 2857268 : ) -> anyhow::Result<()> {
449 2857268 : let cache_key = CacheKey::MaterializedPage {
450 2857268 : hash_key: MaterializedPageHashKey {
451 2857268 : tenant_shard_id,
452 2857268 : timeline_id,
453 2857268 : key,
454 2857268 : },
455 2857268 : lsn,
456 2857268 : };
457 :
458 2857268 : let mut permit = Some(self.try_get_pinned_slot_permit().await?);
459 : loop {
460 : // First check if the key already exists in the cache.
461 2857276 : if let Some(slot_idx) = self.search_mapping_exact(&cache_key) {
462 : // The page was found in the mapping. Lock the slot, and re-check
463 : // that it's still what we expected (we have already released the mapping
464 : // lock, so another thread could have evicted the page)
465 155 : let slot = &self.slots[slot_idx];
466 155 : let inner = slot.inner.write().await;
467 155 : if inner.key.as_ref() == Some(&cache_key) {
468 155 : slot.inc_usage_count();
469 : debug_assert!(
470 : {
471 155 : let guard = inner.permit.lock().unwrap();
472 155 : guard.upgrade().is_none()
473 : },
474 0 : "we hold a write lock, so, no one else should have a permit"
475 : );
476 155 : debug_assert_eq!(inner.buf.len(), img.len());
477 : // We already had it in cache. Another thread must've put it there
478 : // concurrently. Check that it had the same contents that we
479 : // replayed.
480 155 : assert!(inner.buf == img);
481 155 : return Ok(());
482 0 : }
483 2857121 : }
484 2857121 : debug_assert!(permit.is_some());
485 :
486 : // Not found. Find a victim buffer
487 2857121 : let (slot_idx, mut inner) = self
488 2857121 : .find_victim(permit.as_ref().unwrap())
489 0 : .await
490 2857121 : .context("Failed to find evict victim")?;
491 :
492 : // Insert mapping for this. At this point, we may find that another
493 : // thread did the same thing concurrently. In that case, we evicted
494 : // our victim buffer unnecessarily. Put it into the free list and
495 : // continue with the slot that the other thread chose.
496 2857121 : if let Some(_existing_slot_idx) = self.try_insert_mapping(&cache_key, slot_idx) {
497 : // TODO: put to free list
498 :
499 : // We now just loop back to start from beginning. This is not
500 : // optimal, we'll perform the lookup in the mapping again, which
501 : // is not really necessary because we already got
502 : // 'existing_slot_idx'. But this shouldn't happen often enough
503 : // to matter much.
504 8 : continue;
505 2857113 : }
506 2857113 :
507 2857113 : // Make the slot ready
508 2857113 : let slot = &self.slots[slot_idx];
509 2857113 : inner.key = Some(cache_key.clone());
510 2857113 : slot.set_usage_count(1);
511 : // Create a write guard for the slot so we go through the expected motions.
512 : debug_assert!(
513 : {
514 2857113 : let guard = inner.permit.lock().unwrap();
515 2857113 : guard.upgrade().is_none()
516 : },
517 0 : "we hold a write lock, so, no one else should have a permit"
518 : );
519 2857113 : let mut write_guard = PageWriteGuard {
520 2857113 : state: PageWriteGuardState::Invalid {
521 2857113 : _permit: permit.take().unwrap(),
522 2857113 : inner,
523 2857113 : },
524 2857113 : };
525 2857113 : write_guard.copy_from_slice(img);
526 2857113 : let _ = write_guard.mark_valid();
527 2857113 : return Ok(());
528 : }
529 2857268 : }
530 :
531 : // Section 1.2: Public interface functions for working with immutable file pages.
532 :
533 184302979 : pub async fn read_immutable_buf(
534 184302979 : &self,
535 184302979 : file_id: FileId,
536 184302979 : blkno: u32,
537 184302979 : ctx: &RequestContext,
538 184302979 : ) -> anyhow::Result<ReadBufResult> {
539 184302937 : let mut cache_key = CacheKey::ImmutableFilePage { file_id, blkno };
540 184302937 :
541 184302937 : self.lock_for_read(&mut cache_key, ctx).await
542 184302933 : }
543 :
544 : //
545 : // Section 2: Internal interface functions for lookup/update.
546 : //
547 : // To add support for a new kind of "thing" to cache, you will need
548 : // to add public interface routines above, and code to deal with the
549 : // "mappings" after this section. But the routines in this section should
550 : // not require changes.
551 :
552 195785440 : async fn try_get_pinned_slot_permit(&self) -> anyhow::Result<PinnedSlotsPermit> {
553 195785396 : match tokio::time::timeout(
554 195785396 : // Choose small timeout, neon_smgr does its own retries.
555 195785396 : // https://neondb.slack.com/archives/C04DGM6SMTM/p1694786876476869
556 195785396 : Duration::from_secs(10),
557 195785396 : Arc::clone(&self.pinned_slots).acquire_owned(),
558 195785396 : )
559 1080486 : .await
560 : {
561 195785395 : Ok(res) => Ok(PinnedSlotsPermit(
562 195785395 : res.expect("this semaphore is never closed"),
563 195785395 : )),
564 0 : Err(_timeout) => {
565 0 : crate::metrics::page_cache_errors_inc(
566 0 : crate::metrics::PageCacheErrorKind::AcquirePinnedSlotTimeout,
567 0 : );
568 0 : anyhow::bail!("timeout: there were page guards alive for all page cache slots")
569 : }
570 : }
571 195785395 : }
572 :
573 : /// Look up a page in the cache.
574 : ///
575 : /// If the search criteria is not exact, *cache_key is updated with the exact
576 : /// key of the returned page. (For materialized pages, that means
577 : /// that the LSN in 'cache_key' is updated with the LSN of the returned page
578 : /// version.)
579 : ///
580 : /// If no page is found, returns None and *cache_key is left unmodified.
581 : ///
582 192929010 : async fn try_lock_for_read(
583 192929010 : &self,
584 192929010 : cache_key: &mut CacheKey,
585 192929010 : permit: &mut Option<PinnedSlotsPermit>,
586 192929010 : ) -> Option<PageReadGuard> {
587 192928964 : let cache_key_orig = cache_key.clone();
588 192928964 : if let Some(slot_idx) = self.search_mapping(cache_key) {
589 : // The page was found in the mapping. Lock the slot, and re-check
590 : // that it's still what we expected (because we released the mapping
591 : // lock already, another thread could have evicted the page)
592 176065706 : let slot = &self.slots[slot_idx];
593 176065706 : let inner = slot.inner.read().await;
594 176065706 : if inner.key.as_ref() == Some(cache_key) {
595 176065702 : slot.inc_usage_count();
596 176065702 : return Some(PageReadGuard {
597 176065702 : _permit: inner.coalesce_readers_permit(permit.take().unwrap()),
598 176065702 : slot_guard: inner,
599 176065702 : });
600 4 : } else {
601 4 : // search_mapping might have modified the search key; restore it.
602 4 : *cache_key = cache_key_orig;
603 4 : }
604 16863258 : }
605 16863262 : None
606 192928964 : }
607 :
608 : /// Return a locked buffer for given block.
609 : ///
610 : /// Like try_lock_for_read(), if the search criteria is not exact and the
611 : /// page is already found in the cache, *cache_key is updated.
612 : ///
613 : /// If the page is not found in the cache, this allocates a new buffer for
614 : /// it. The caller may then initialize the buffer with the contents, and
615 : /// call mark_valid().
616 : ///
617 : /// Example usage:
618 : ///
619 : /// ```ignore
620 : /// let cache = page_cache::get();
621 : ///
622 : /// match cache.lock_for_read(&mut cache_key, ctx).await? {
623 : /// ReadBufResult::Found(read_guard) => {
624 : /// // The page was found in cache. Use it
625 : /// },
626 : /// ReadBufResult::NotFound(write_guard) => {
627 : /// // The page was not found in cache. Read it from disk into the
628 : /// // buffer.
629 : /// //read_my_page_from_disk(write_guard);
630 : ///
631 : /// // The buffer contents are now valid. Tell the page cache.
632 : /// let read_guard = write_guard.mark_valid();
633 : /// },
634 : /// }
635 : /// ```
636 : ///
637 184302979 : async fn lock_for_read(
638 184302979 : &self,
639 184302979 : cache_key: &mut CacheKey,
640 184302979 : ctx: &RequestContext,
641 184302979 : ) -> anyhow::Result<ReadBufResult> {
642 184302936 : let mut permit = Some(self.try_get_pinned_slot_permit().await?);
643 :
644 184302935 : let (read_access, hit) = match cache_key {
645 : CacheKey::MaterializedPage { .. } => {
646 0 : unreachable!("Materialized pages use lookup_materialized_page")
647 : }
648 184302935 : CacheKey::ImmutableFilePage { .. } => (
649 184302935 : &crate::metrics::PAGE_CACHE
650 184302935 : .for_ctx(ctx)
651 184302935 : .read_accesses_immutable,
652 184302935 : &crate::metrics::PAGE_CACHE.for_ctx(ctx).read_hits_immutable,
653 184302935 : ),
654 184302935 : };
655 184302935 : read_access.inc();
656 184302935 :
657 184302935 : let mut is_first_iteration = true;
658 : loop {
659 : // First check if the key already exists in the cache.
660 184303773 : if let Some(read_guard) = self.try_lock_for_read(cache_key, &mut permit).await {
661 174288959 : debug_assert!(permit.is_none());
662 174288959 : if is_first_iteration {
663 174288121 : hit.inc();
664 174288121 : }
665 174288959 : return Ok(ReadBufResult::Found(read_guard));
666 10014813 : }
667 10014813 : debug_assert!(permit.is_some());
668 10014813 : is_first_iteration = false;
669 :
670 : // Not found. Find a victim buffer
671 10014813 : let (slot_idx, mut inner) = self
672 10014813 : .find_victim(permit.as_ref().unwrap())
673 0 : .await
674 10014812 : .context("Failed to find evict victim")?;
675 :
676 : // Insert mapping for this. At this point, we may find that another
677 : // thread did the same thing concurrently. In that case, we evicted
678 : // our victim buffer unnecessarily. Put it into the free list and
679 : // continue with the slot that the other thread chose.
680 10014812 : if let Some(_existing_slot_idx) = self.try_insert_mapping(cache_key, slot_idx) {
681 : // TODO: put to free list
682 :
683 : // We now just loop back to start from beginning. This is not
684 : // optimal, we'll perform the lookup in the mapping again, which
685 : // is not really necessary because we already got
686 : // 'existing_slot_idx'. But this shouldn't happen often enough
687 : // to matter much.
688 838 : continue;
689 10013974 : }
690 10013974 :
691 10013974 : // Make the slot ready
692 10013974 : let slot = &self.slots[slot_idx];
693 10013974 : inner.key = Some(cache_key.clone());
694 10013974 : slot.set_usage_count(1);
695 :
696 : debug_assert!(
697 : {
698 10013974 : let guard = inner.permit.lock().unwrap();
699 10013974 : guard.upgrade().is_none()
700 : },
701 0 : "we hold a write lock, so, no one else should have a permit"
702 : );
703 :
704 10013974 : return Ok(ReadBufResult::NotFound(PageWriteGuard {
705 10013974 : state: PageWriteGuardState::Invalid {
706 10013974 : _permit: permit.take().unwrap(),
707 10013974 : inner,
708 10013974 : },
709 10013974 : }));
710 : }
711 184302933 : }
712 :
713 : //
714 : // Section 3: Mapping functions
715 : //
716 :
717 : /// Search for a page in the cache using the given search key.
718 : ///
719 : /// Returns the slot index, if any. If the search criteria is not exact,
720 : /// *cache_key is updated with the actual key of the found page.
721 : ///
722 : /// NOTE: We don't hold any lock on the mapping on return, so the slot might
723 : /// get recycled for an unrelated page immediately after this function
724 : /// returns. The caller is responsible for re-checking that the slot still
725 : /// contains the page with the same key before using it.
726 : ///
727 192929010 : fn search_mapping(&self, cache_key: &mut CacheKey) -> Option<usize> {
728 192929010 : match cache_key {
729 8625194 : CacheKey::MaterializedPage { hash_key, lsn } => {
730 8625194 : let map = self.materialized_page_map.read().unwrap();
731 8625194 : let versions = map.get(hash_key)?;
732 :
733 2214950 : let version_idx = match versions.binary_search_by_key(lsn, |v| v.lsn) {
734 17 : Ok(version_idx) => version_idx,
735 460 : Err(0) => return None,
736 1776730 : Err(version_idx) => version_idx - 1,
737 : };
738 1776747 : let version = &versions[version_idx];
739 1776747 : *lsn = version.lsn;
740 1776747 : Some(version.slot_idx)
741 : }
742 184303816 : CacheKey::ImmutableFilePage { file_id, blkno } => {
743 184303816 : let map = self.immutable_page_map.read().unwrap();
744 184303816 : Some(*map.get(&(*file_id, *blkno))?)
745 : }
746 : }
747 192929010 : }
748 :
749 : /// Search for a page in the cache using the given search key.
750 : ///
751 : /// Like 'search_mapping', but performs an "exact" search. Used for
752 : /// allocating a new buffer.
753 2857276 : fn search_mapping_exact(&self, key: &CacheKey) -> Option<usize> {
754 2857276 : match key {
755 2857276 : CacheKey::MaterializedPage { hash_key, lsn } => {
756 2857276 : let map = self.materialized_page_map.read().unwrap();
757 2857276 : let versions = map.get(hash_key)?;
758 :
759 678017 : if let Ok(version_idx) = versions.binary_search_by_key(lsn, |v| v.lsn) {
760 155 : Some(versions[version_idx].slot_idx)
761 : } else {
762 438637 : None
763 : }
764 : }
765 0 : CacheKey::ImmutableFilePage { file_id, blkno } => {
766 0 : let map = self.immutable_page_map.read().unwrap();
767 0 : Some(*map.get(&(*file_id, *blkno))?)
768 : }
769 : }
770 2857276 : }
771 :
772 : ///
773 : /// Remove mapping for given key.
774 : ///
775 10020320 : fn remove_mapping(&self, old_key: &CacheKey) {
776 10020320 : match old_key {
777 : CacheKey::MaterializedPage {
778 2674486 : hash_key: old_hash_key,
779 2674486 : lsn: old_lsn,
780 2674486 : } => {
781 2674486 : let mut map = self.materialized_page_map.write().unwrap();
782 2674486 : if let Entry::Occupied(mut old_entry) = map.entry(old_hash_key.clone()) {
783 2674486 : let versions = old_entry.get_mut();
784 :
785 3216787 : if let Ok(version_idx) = versions.binary_search_by_key(old_lsn, |v| v.lsn) {
786 2674486 : versions.remove(version_idx);
787 2674486 : self.size_metrics
788 2674486 : .current_bytes_materialized_page
789 2674486 : .sub_page_sz(1);
790 2674486 : if versions.is_empty() {
791 2258724 : old_entry.remove_entry();
792 2258724 : }
793 0 : }
794 : } else {
795 0 : panic!("could not find old key in mapping")
796 : }
797 : }
798 7345834 : CacheKey::ImmutableFilePage { file_id, blkno } => {
799 7345834 : let mut map = self.immutable_page_map.write().unwrap();
800 7345834 : map.remove(&(*file_id, *blkno))
801 7345834 : .expect("could not find old key in mapping");
802 7345834 : self.size_metrics.current_bytes_immutable.sub_page_sz(1);
803 7345834 : }
804 : }
805 10020320 : }
806 :
807 : ///
808 : /// Insert mapping for given key.
809 : ///
810 : /// If a mapping already existed for the given key, returns the slot index
811 : /// of the existing mapping and leaves it untouched.
812 12871939 : fn try_insert_mapping(&self, new_key: &CacheKey, slot_idx: usize) -> Option<usize> {
813 12871939 : match new_key {
814 : CacheKey::MaterializedPage {
815 2857121 : hash_key: new_key,
816 2857121 : lsn: new_lsn,
817 2857121 : } => {
818 2857121 : let mut map = self.materialized_page_map.write().unwrap();
819 2857121 : let versions = map.entry(new_key.clone()).or_default();
820 2857121 : match versions.binary_search_by_key(new_lsn, |v| v.lsn) {
821 8 : Ok(version_idx) => Some(versions[version_idx].slot_idx),
822 2857113 : Err(version_idx) => {
823 2857113 : versions.insert(
824 2857113 : version_idx,
825 2857113 : Version {
826 2857113 : lsn: *new_lsn,
827 2857113 : slot_idx,
828 2857113 : },
829 2857113 : );
830 2857113 : self.size_metrics
831 2857113 : .current_bytes_materialized_page
832 2857113 : .add_page_sz(1);
833 2857113 : None
834 : }
835 : }
836 : }
837 :
838 10014818 : CacheKey::ImmutableFilePage { file_id, blkno } => {
839 10014818 : let mut map = self.immutable_page_map.write().unwrap();
840 10014818 : match map.entry((*file_id, *blkno)) {
841 838 : Entry::Occupied(entry) => Some(*entry.get()),
842 10013980 : Entry::Vacant(entry) => {
843 10013980 : entry.insert(slot_idx);
844 10013980 : self.size_metrics.current_bytes_immutable.add_page_sz(1);
845 10013980 : None
846 : }
847 : }
848 : }
849 : }
850 12871939 : }
851 :
852 : //
853 : // Section 4: Misc internal helpers
854 : //
855 :
856 : /// Find a slot to evict.
857 : ///
858 : /// On return, the slot is empty and write-locked.
859 12871939 : async fn find_victim(
860 12871939 : &self,
861 12871939 : _permit_witness: &PinnedSlotsPermit,
862 12871939 : ) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
863 12871934 : let iter_limit = self.slots.len() * 10;
864 12871934 : let mut iters = 0;
865 41696889 : loop {
866 41696889 : iters += 1;
867 41696889 : let slot_idx = self.next_evict_slot.fetch_add(1, Ordering::Relaxed) % self.slots.len();
868 41696889 :
869 41696889 : let slot = &self.slots[slot_idx];
870 41696889 :
871 41696889 : if slot.dec_usage_count() == 0 {
872 12871934 : let mut inner = match slot.inner.try_write() {
873 12871933 : Ok(inner) => inner,
874 1 : Err(_err) => {
875 1 : if iters > iter_limit {
876 : // NB: Even with the permits, there's no hard guarantee that we will find a slot within
877 : // any particular number of iterations: other threads might race ahead and acquire and
878 : // release pins just as we're scanning the array.
879 : //
880 : // Imagine that nslots is 2, and as starting point, usage_count==1 on all
881 : // slots. There are two threads running concurrently, A and B. A has just
882 : // acquired the permit from the semaphore.
883 : //
884 : // A: Look at slot 1. Its usage_count == 1, so decrement it to zero, and continue the search
885 : // B: Acquire permit.
886 : // B: Look at slot 2, decrement its usage_count to zero and continue the search
887 : // B: Look at slot 1. Its usage_count is zero, so pin it and bump up its usage_count to 1.
888 : // B: Release pin and permit again
889 : // B: Acquire permit.
890 : // B: Look at slot 2. Its usage_count is zero, so pin it and bump up its usage_count to 1.
891 : // B: Release pin and permit again
892 : //
893 : // Now we're back in the starting situation that both slots have
894 : // usage_count 1, but A has now been through one iteration of the
895 : // find_victim() loop. This can repeat indefinitely and on each
896 : // iteration, A's iteration count increases by one.
897 : //
898 : // So, even though the semaphore for the permits is fair, the victim search
899 : // itself happens in parallel and is not fair.
900 : // Hence even with a permit, a task can theoretically be starved.
901 : // To avoid this, we'd need tokio to give priority to tasks that are holding
902 : // permits for longer.
903 : // Note that just yielding to tokio during iteration without such
904 : // priority boosting is likely counter-productive. We'd just give more opportunities
905 : // for B to bump usage count, further starving A.
906 0 : page_cache_eviction_metrics::observe(
907 0 : page_cache_eviction_metrics::Outcome::ItersExceeded {
908 0 : iters: iters.try_into().unwrap(),
909 0 : },
910 0 : );
911 0 : anyhow::bail!("exceeded evict iter limit");
912 0 : }
913 0 : continue;
914 : }
915 : };
916 12871933 : if let Some(old_key) = &inner.key {
917 10020315 : // remove mapping for old buffer
918 10020315 : self.remove_mapping(old_key);
919 10020315 : inner.key = None;
920 10020315 : page_cache_eviction_metrics::observe(
921 10020315 : page_cache_eviction_metrics::Outcome::FoundSlotEvicted {
922 10020315 : iters: iters.try_into().unwrap(),
923 10020315 : },
924 10020315 : );
925 10020315 : } else {
926 2851618 : page_cache_eviction_metrics::observe(
927 2851618 : page_cache_eviction_metrics::Outcome::FoundSlotUnused {
928 2851618 : iters: iters.try_into().unwrap(),
929 2851618 : },
930 2851618 : );
931 2851618 : }
932 12871933 : return Ok((slot_idx, inner));
933 28824955 : }
934 : }
935 12871933 : }
936 :
937 : /// Initialize a new page cache
938 : ///
939 : /// This should be called only once at page server startup.
940 678 : fn new(num_pages: usize) -> Self {
941 678 : assert!(num_pages > 0, "page cache size must be > 0");
942 :
943 : // We could use Vec::leak here, but that potentially also leaks
944 : // uninitialized reserved capacity. With into_boxed_slice and Box::leak
945 : // this is avoided.
946 678 : let page_buffer = Box::leak(vec![0u8; num_pages * PAGE_SZ].into_boxed_slice());
947 678 :
948 678 : let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;
949 678 : size_metrics.max_bytes.set_page_sz(num_pages);
950 678 : size_metrics.current_bytes_immutable.set_page_sz(0);
951 678 : size_metrics.current_bytes_materialized_page.set_page_sz(0);
952 678 :
953 678 : let slots = page_buffer
954 678 : .chunks_exact_mut(PAGE_SZ)
955 4928424 : .map(|chunk| {
956 4928424 : let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap();
957 4928424 :
958 4928424 : Slot {
959 4928424 : inner: tokio::sync::RwLock::new(SlotInner {
960 4928424 : key: None,
961 4928424 : buf,
962 4928424 : permit: std::sync::Mutex::new(Weak::new()),
963 4928424 : }),
964 4928424 : usage_count: AtomicU8::new(0),
965 4928424 : }
966 4928424 : })
967 678 : .collect();
968 678 :
969 678 : Self {
970 678 : materialized_page_map: Default::default(),
971 678 : immutable_page_map: Default::default(),
972 678 : slots,
973 678 : next_evict_slot: AtomicUsize::new(0),
974 678 : size_metrics,
975 678 : pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)),
976 678 : }
977 678 : }
978 : }
979 :
980 : trait PageSzBytesMetric {
981 : fn set_page_sz(&self, count: usize);
982 : fn add_page_sz(&self, count: usize);
983 : fn sub_page_sz(&self, count: usize);
984 : }
985 :
986 : #[inline(always)]
987 22893429 : fn count_times_page_sz(count: usize) -> u64 {
988 22893429 : u64::try_from(count).unwrap() * u64::try_from(PAGE_SZ).unwrap()
989 22893429 : }
990 :
991 : impl PageSzBytesMetric for metrics::UIntGauge {
992 2034 : fn set_page_sz(&self, count: usize) {
993 2034 : self.set(count_times_page_sz(count));
994 2034 : }
995 12871083 : fn add_page_sz(&self, count: usize) {
996 12871083 : self.add(count_times_page_sz(count));
997 12871083 : }
998 10020312 : fn sub_page_sz(&self, count: usize) {
999 10020312 : self.sub(count_times_page_sz(count));
1000 10020312 : }
1001 : }