1 : //!
2 : //! Global page cache
3 : //!
4 : //! The page cache uses up most of the memory in the page server. It is shared
5 : //! by all tenants, and it is used to store different kinds of pages. Sharing
6 : //! the cache allows memory to be dynamically allocated where it's needed the
7 : //! most.
8 : //!
9 : //! The page cache consists of fixed-size buffers, 8 kB each to match the
10 : //! PostgreSQL buffer size, and a Slot struct for each buffer to contain
11 : //! information about what's stored in the buffer.
12 : //!
13 : //! # Types Of Pages
14 : //!
15 : //! [`PageCache`] only supports immutable pages.
16 : //! Hence there is no need to worry about coherency.
17 : //!
18 : //! Two types of pages are supported:
19 : //!
20 : //! * **Materialized pages**, filled & used by page reconstruction
21 : //! * **Immutable File pages**, filled & used by [`crate::tenant::block_io`] and [`crate::tenant::ephemeral_file`].
22 : //!
23 : //! Note that [`crate::tenant::ephemeral_file::EphemeralFile`] is generally mutable, but it's append-only.
24 : //! It uses the page cache only for the blocks that are already fully written and immutable.
25 : //!
26 : //! # Filling The Page Cache
27 : //!
28 : //! The page cache maps from a cache key to a buffer slot.
29 : //! The cache key uniquely identifies the piece of data that is being cached.
30 : //!
31 : //! The cache key for **materialized pages** is [`TenantId`], [`TimelineId`], [`Key`], and [`Lsn`].
32 : //! Use [`PageCache::memorize_materialized_page`] and [`PageCache::lookup_materialized_page`] for fill & access.
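//! For illustration, a minimal sketch of that fill & access pattern (assumptions: `tenant_id`,
//! `timeline_id`, `key`, `lsn`, `ctx` and the page image `img: &[u8]` are already in scope;
//! error handling is elided):
//!
//! ```ignore
//! let cache = page_cache::get();
//! // Fill: remember the materialized image for (key, lsn).
//! cache.memorize_materialized_page(tenant_id, timeline_id, key, lsn, img).await?;
//! // Access: `lsn` is an upper bound; the newest cached version at or below it is returned.
//! if let Some((cached_lsn, guard)) = cache
//!     .lookup_materialized_page(tenant_id, timeline_id, &key, lsn, ctx)
//!     .await
//! {
//!     assert!(cached_lsn <= lsn);
//!     // `guard` derefs to the cached [u8; PAGE_SZ] page image.
//! }
//! ```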
33 : //!
34 : //! The cache key for **immutable file** pages is [`FileId`] and a block number.
35 : //! Users of the page cache that wish to page-cache an arbitrary (immutable!) on-disk file do the following (a sketch follows the list):
36 : //! * Have a mechanism to deterministically associate the on-disk file with a [`FileId`].
37 : //! * Get a [`FileId`] using [`next_file_id`].
38 : //! * Use the mechanism to associate the on-disk file with the returned [`FileId`].
39 : //! * Use [`PageCache::read_immutable_buf`] to get a [`ReadBufResult`].
40 : //! * If the page was already cached, it'll be the [`ReadBufResult::Found`] variant that contains
41 : //! a read guard for the page. Just use it.
42 : //! * If the page was not cached, it'll be the [`ReadBufResult::NotFound`] variant that contains
43 : //! a write guard for the page. Fill the page with the contents of the on-disk file.
44 : //! Then call [`PageWriteGuard::mark_valid`] to mark the page as valid.
45 : //! Then try again to [`PageCache::read_immutable_buf`].
46 : //! Unless there's high cache pressure, the page should now be cached.
47 : //! (TODO: allow downgrading the write guard to a read guard to ensure forward progress.)
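//!
//! A minimal sketch of that flow (assumptions: `file_id`, `blkno` and `ctx` are in scope, and
//! `read_block_from_disk` is a hypothetical helper standing in for whatever I/O the caller does):
//!
//! ```ignore
//! let cache = page_cache::get();
//! let read_guard = loop {
//!     match cache.read_immutable_buf(file_id, blkno, ctx).await? {
//!         // Already cached: use the read guard.
//!         ReadBufResult::Found(read_guard) => break read_guard,
//!         ReadBufResult::NotFound(mut write_guard) => {
//!             // Not cached: fill the buffer from the on-disk file, mark it valid, retry.
//!             read_block_from_disk(file_id, blkno, &mut *write_guard)?;
//!             let _ = write_guard.mark_valid();
//!         }
//!     }
//! };
//! // `read_guard` derefs to the cached [u8; PAGE_SZ] page contents.
//! ```
//!
//! Dropping the guard returned by `mark_valid()` unpins the slot but leaves the page cached with
//! a nonzero usage count, which is why the retry normally finds it.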
48 : //!
49 : //! # Locking
50 : //!
51 : //! There are two levels of locking involved: There's one lock for the "mapping"
52 : //! from page identifier (the cache key described above) to the buffer
53 : //! slot, and a separate lock on each slot. To read or write the contents of a
54 : //! slot, you must hold the lock on the slot in read or write mode,
55 : //! respectively. To change the mapping of a slot, i.e. to evict a page or to
56 : //! assign a buffer for a page, you must hold the mapping lock and the lock on
57 : //! the slot at the same time.
58 : //!
59 : //! Whenever you need to hold both locks simultaneously, the slot lock must be
60 : //! acquired first. This consistent ordering avoids deadlocks. To look up a page
61 : //! in the cache, you would first look up the mapping, while holding the mapping
62 : //! lock, and then lock the slot. You must release the mapping lock in between,
63 : //! to obey the lock ordering and avoid deadlock.
64 : //!
65 : //! A slot can momentarily have invalid contents, even if it's already been
66 : //! inserted into the mapping, but you must hold the write-lock on the slot until
67 : //! the contents are valid. If you need to release the lock without initializing
68 : //! the contents, you must remove the mapping first. We make that easy for
69 : //! callers with [`PageWriteGuard`]: the caller must explicitly call `guard.mark_valid()`
70 : //! after initializing the contents. If the guard is dropped without calling `mark_valid()`,
71 : //! the mapping is automatically removed and the slot is marked free.
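//!
//! In pseudocode, the lookup path therefore looks roughly like this (a sketch of what
//! `try_lock_for_read` does internally, not a public API):
//!
//! ```ignore
//! let slot_idx = {
//!     let map = mapping.read().unwrap(); // take the mapping lock...
//!     *map.get(&cache_key)?              // ...find the candidate slot...
//! };                                     // ...mapping lock is released here
//! let inner = slots[slot_idx].inner.read().await; // only then lock the slot
//! if inner.key.as_ref() != Some(&cache_key) {
//!     // The slot was recycled between the two locks; treat this as a cache miss.
//! }
//! ```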
72 : //!
73 :
74 : use std::{
75 : collections::{hash_map::Entry, HashMap},
76 : convert::TryInto,
77 : sync::{
78 : atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
79 : Arc, Weak,
80 : },
81 : time::Duration,
82 : };
83 :
84 : use anyhow::Context;
85 : use once_cell::sync::OnceCell;
86 : use utils::{
87 : id::{TenantId, TimelineId},
88 : lsn::Lsn,
89 : };
90 :
91 : use crate::{context::RequestContext, metrics::PageCacheSizeMetrics, repository::Key};
92 :
93 : static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
94 : const TEST_PAGE_CACHE_SIZE: usize = 50;
95 :
96 : ///
97 : /// Initialize the page cache. This must be called once at page server startup.
98 : ///
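/// For example (a hedged sketch; `page_cache_size` is a hypothetical configuration value giving
/// the number of PAGE_SZ-sized slots):
///
/// ```ignore
/// page_cache::init(page_cache_size);
/// ```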
99 CBC 560 : pub fn init(size: usize) {
100 560 : if PAGE_CACHE.set(PageCache::new(size)).is_err() {
101 UBC 0 : panic!("page cache already initialized");
102 CBC 560 : }
103 560 : }
104 :
105 : ///
106 : /// Get a handle to the page cache.
107 : ///
108 300896511 : pub fn get() -> &'static PageCache {
109 300896511 : //
110 300896511 : // In unit tests, page server startup doesn't happen and no one calls
111 300896511 : // page_cache::init(). Initialize it here with a tiny cache, so that the
112 300896511 : // page cache is usable in unit tests.
113 300896511 : //
114 300896511 : if cfg!(test) {
115 1652600 : PAGE_CACHE.get_or_init(|| PageCache::new(TEST_PAGE_CACHE_SIZE))
116 : } else {
117 299243911 : PAGE_CACHE.get().expect("page cache not initialized")
118 : }
119 300896511 : }
120 :
121 : pub const PAGE_SZ: usize = postgres_ffi::BLCKSZ as usize;
122 : const MAX_USAGE_COUNT: u8 = 5;
123 :
124 : /// See module-level comment.
125 576067250 : #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
126 : pub struct FileId(u64);
127 :
128 : static NEXT_ID: AtomicU64 = AtomicU64::new(1);
129 :
130 : /// See module-level comment.
131 20489 : pub fn next_file_id() -> FileId {
132 20489 : FileId(NEXT_ID.fetch_add(1, Ordering::Relaxed))
133 20489 : }
134 :
135 : ///
136 : /// CacheKey uniquely identifies a "thing" to cache in the page cache.
137 : ///
138 311165775 : #[derive(Debug, PartialEq, Eq, Clone)]
139 : #[allow(clippy::enum_variant_names)]
140 : enum CacheKey {
141 : MaterializedPage {
142 : hash_key: MaterializedPageHashKey,
143 : lsn: Lsn,
144 : },
145 : ImmutableFilePage {
146 : file_id: FileId,
147 : blkno: u32,
148 : },
149 : }
150 :
151 12948523 : #[derive(Debug, PartialEq, Eq, Hash, Clone)]
152 : struct MaterializedPageHashKey {
153 : tenant_id: TenantId,
154 : timeline_id: TimelineId,
155 : key: Key,
156 : }
157 :
158 UBC 0 : #[derive(Clone)]
159 : struct Version {
160 : lsn: Lsn,
161 : slot_idx: usize,
162 : }
163 :
164 : struct Slot {
165 : inner: tokio::sync::RwLock<SlotInner>,
166 : usage_count: AtomicU8,
167 : }
168 :
169 : struct SlotInner {
170 : key: Option<CacheKey>,
171 : // for `coalesce_readers_permit`
172 : permit: std::sync::Mutex<Weak<PinnedSlotsPermit>>,
173 : buf: &'static mut [u8; PAGE_SZ],
174 : }
175 :
176 : impl Slot {
177 : /// Increment usage count on the buffer, with ceiling at MAX_USAGE_COUNT.
178 CBC 283300727 : fn inc_usage_count(&self) {
179 283300727 : let _ = self
180 283300727 : .usage_count
181 283300992 : .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
182 283300992 : if val == MAX_USAGE_COUNT {
183 257003237 : None
184 : } else {
185 26297755 : Some(val + 1)
186 : }
187 283300992 : });
188 283300727 : }
189 :
190 : /// Decrement usage count on the buffer, unless it's already zero. Returns
191 : /// the old usage count.
192 44918480 : fn dec_usage_count(&self) -> u8 {
193 44918480 : let count_res =
194 44918480 : self.usage_count
195 44918480 : .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
196 44918464 : if val == 0 {
197 12521726 : None
198 : } else {
199 32396738 : Some(val - 1)
200 : }
201 44918480 : });
202 44918480 :
203 44918480 : match count_res {
204 32396749 : Ok(usage_count) => usage_count,
205 12521731 : Err(usage_count) => usage_count,
206 : }
207 44918480 : }
208 :
209 : /// Sets the usage count to a specific value.
210 12521258 : fn set_usage_count(&self, count: u8) {
211 12521258 : self.usage_count.store(count, Ordering::Relaxed);
212 12521258 : }
213 : }
214 :
215 : impl SlotInner {
216 : /// If there is already a reader, drop our permit and share its permit, just like we share read access.
217 283296540 : fn coalesce_readers_permit(&self, permit: PinnedSlotsPermit) -> Arc<PinnedSlotsPermit> {
218 283296540 : let mut guard = self.permit.lock().unwrap();
219 283296540 : if let Some(existing_permit) = guard.upgrade() {
220 147452 : drop(guard);
221 147452 : drop(permit);
222 147452 : existing_permit
223 : } else {
224 283149088 : let permit = Arc::new(permit);
225 283149088 : *guard = Arc::downgrade(&permit);
226 283149088 : permit
227 : }
228 283296540 : }
229 : }
230 :
231 : pub struct PageCache {
232 : /// This contains the mapping from the cache key to buffer slot that currently
233 : /// contains the page, if any.
234 : ///
235 : /// TODO: This is protected by a single lock. If that becomes a bottleneck,
236 : /// this HashMap can be replaced with a more concurrent version; there are
237 : /// plenty of such crates around.
238 : ///
239 : /// If you add support for caching different kinds of objects, each object kind
240 : /// can have a separate mapping map, next to this field.
241 : materialized_page_map: std::sync::RwLock<HashMap<MaterializedPageHashKey, Vec<Version>>>,
242 :
243 : immutable_page_map: std::sync::RwLock<HashMap<(FileId, u32), usize>>,
244 :
245 : /// The actual buffers with their metadata.
246 : slots: Box<[Slot]>,
247 :
248 : pinned_slots: Arc<tokio::sync::Semaphore>,
249 :
250 : /// Index of the next candidate to evict, for the Clock replacement algorithm.
251 : /// This is interpreted modulo the page cache size.
252 : next_evict_slot: AtomicUsize,
253 :
254 : size_metrics: &'static PageCacheSizeMetrics,
255 : }
256 :
257 : struct PinnedSlotsPermit(tokio::sync::OwnedSemaphorePermit);
258 :
259 : ///
260 : /// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked
261 : /// until the guard is dropped.
262 : ///
263 : pub struct PageReadGuard<'i> {
264 : _permit: Arc<PinnedSlotsPermit>,
265 : slot_guard: tokio::sync::RwLockReadGuard<'i, SlotInner>,
266 : }
267 :
268 : impl std::ops::Deref for PageReadGuard<'_> {
269 : type Target = [u8; PAGE_SZ];
270 :
271 622763980 : fn deref(&self) -> &Self::Target {
272 622763980 : self.slot_guard.buf
273 622763980 : }
274 : }
275 :
276 : impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
277 UBC 0 : fn as_ref(&self) -> &[u8; PAGE_SZ] {
278 0 : self.slot_guard.buf
279 0 : }
280 : }
281 :
282 : ///
283 : /// PageWriteGuard is a lease on a buffer for modifying it. The page is kept locked
284 : /// until the guard is dropped.
285 : ///
286 : /// Counterintuitively, this is used even for a read, if the requested page is not
287 : /// currently found in the page cache. In that case, the caller of lock_for_read()
288 : /// is expected to fill in the page contents and call mark_valid().
289 : pub struct PageWriteGuard<'i> {
290 : state: PageWriteGuardState<'i>,
291 : }
292 :
293 : enum PageWriteGuardState<'i> {
294 : Invalid {
295 : inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
296 : _permit: PinnedSlotsPermit,
297 : },
298 : Downgraded,
299 : }
300 :
301 : impl std::ops::DerefMut for PageWriteGuard<'_> {
302 CBC 12521258 : fn deref_mut(&mut self) -> &mut Self::Target {
303 12521258 : match &mut self.state {
304 12521258 : PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
305 UBC 0 : PageWriteGuardState::Downgraded => unreachable!(),
306 : }
307 CBC 12521258 : }
308 : }
309 :
310 : impl std::ops::Deref for PageWriteGuard<'_> {
311 : type Target = [u8; PAGE_SZ];
312 :
313 UBC 0 : fn deref(&self) -> &Self::Target {
314 0 : match &self.state {
315 0 : PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
316 0 : PageWriteGuardState::Downgraded => unreachable!(),
317 : }
318 0 : }
319 : }
320 :
321 : impl<'a> PageWriteGuard<'a> {
322 : /// Mark that the buffer contents are now valid.
323 : #[must_use]
324 CBC 12521257 : pub fn mark_valid(mut self) -> PageReadGuard<'a> {
325 12521257 : let prev = std::mem::replace(&mut self.state, PageWriteGuardState::Downgraded);
326 12521257 : match prev {
327 12521257 : PageWriteGuardState::Invalid { inner, _permit } => {
328 12521257 : assert!(inner.key.is_some());
329 12521257 : PageReadGuard {
330 12521257 : _permit: Arc::new(_permit),
331 12521257 : slot_guard: inner.downgrade(),
332 12521257 : }
333 : }
334 UBC 0 : PageWriteGuardState::Downgraded => unreachable!(),
335 : }
336 CBC 12521257 : }
337 : }
338 :
339 : impl Drop for PageWriteGuard<'_> {
340 : ///
341 : /// If the buffer was allocated for a page that was not already in the
342 : /// cache, but the caller that acquired the write guard dropped the buffer without
343 : /// initializing it, remove the mapping from the page cache.
344 : ///
345 12521246 : fn drop(&mut self) {
346 12521246 : match &mut self.state {
347 UBC 0 : PageWriteGuardState::Invalid { inner, _permit } => {
348 0 : assert!(inner.key.is_some());
349 0 : let self_key = inner.key.as_ref().unwrap();
350 0 : PAGE_CACHE.get().unwrap().remove_mapping(self_key);
351 0 : inner.key = None;
352 : }
353 CBC 12521246 : PageWriteGuardState::Downgraded => {}
354 : }
355 12521246 : }
356 : }
357 :
358 : /// lock_for_read() return value
359 : pub enum ReadBufResult<'a> {
360 : Found(PageReadGuard<'a>),
361 : NotFound(PageWriteGuard<'a>),
362 : }
363 :
364 : impl PageCache {
365 : //
366 : // Section 1.1: Public interface functions for looking up and memorizing materialized page
367 : // versions in the page cache
368 : //
369 :
370 : /// Look up a materialized page version.
371 : ///
372 : /// The 'lsn' is an upper bound, this will return the latest version of
373 : /// the given block, but not newer than 'lsn'. Returns the actual LSN of the
374 : /// returned page.
375 6372324 : pub async fn lookup_materialized_page(
376 6372324 : &self,
377 6372324 : tenant_id: TenantId,
378 6372324 : timeline_id: TimelineId,
379 6372324 : key: &Key,
380 6372324 : lsn: Lsn,
381 6372324 : ctx: &RequestContext,
382 6372324 : ) -> Option<(Lsn, PageReadGuard)> {
383 6372322 : let Ok(permit) = self.try_get_pinned_slot_permit().await else {
384 UBC 0 : return None;
385 : };
386 :
387 CBC 6372322 : crate::metrics::PAGE_CACHE
388 6372322 : .for_ctx(ctx)
389 6372322 : .read_accesses_materialized_page
390 6372322 : .inc();
391 6372322 :
392 6372322 : let mut cache_key = CacheKey::MaterializedPage {
393 6372322 : hash_key: MaterializedPageHashKey {
394 6372322 : tenant_id,
395 6372322 : timeline_id,
396 6372322 : key: *key,
397 6372322 : },
398 6372322 : lsn,
399 6372322 : };
400 :
401 6372322 : if let Some(guard) = self
402 6372322 : .try_lock_for_read(&mut cache_key, &mut Some(permit))
403 1419 : .await
404 : {
405 : if let CacheKey::MaterializedPage {
406 : hash_key: _,
407 1297801 : lsn: available_lsn,
408 1297801 : } = cache_key
409 : {
410 1297801 : if available_lsn == lsn {
411 16 : crate::metrics::PAGE_CACHE
412 16 : .for_ctx(ctx)
413 16 : .read_hits_materialized_page_exact
414 16 : .inc();
415 1297785 : } else {
416 1297785 : crate::metrics::PAGE_CACHE
417 1297785 : .for_ctx(ctx)
418 1297785 : .read_hits_materialized_page_older_lsn
419 1297785 : .inc();
420 1297785 : }
421 1297801 : Some((available_lsn, guard))
422 : } else {
423 UBC 0 : panic!("unexpected key type in slot");
424 : }
425 : } else {
426 CBC 5074521 : None
427 : }
428 6372322 : }
429 :
430 : ///
431 : /// Store an image of the given page in the cache.
432 : ///
433 2252422 : pub async fn memorize_materialized_page(
434 2252422 : &self,
435 2252422 : tenant_id: TenantId,
436 2252422 : timeline_id: TimelineId,
437 2252422 : key: Key,
438 2252422 : lsn: Lsn,
439 2252422 : img: &[u8],
440 2252422 : ) -> anyhow::Result<()> {
441 2252422 : let cache_key = CacheKey::MaterializedPage {
442 2252422 : hash_key: MaterializedPageHashKey {
443 2252422 : tenant_id,
444 2252422 : timeline_id,
445 2252422 : key,
446 2252422 : },
447 2252422 : lsn,
448 2252422 : };
449 :
450 2252422 : let mut permit = Some(self.try_get_pinned_slot_permit().await?);
451 : loop {
452 : // First check if the key already exists in the cache.
453 2252459 : if let Some(slot_idx) = self.search_mapping_exact(&cache_key) {
454 : // The page was found in the mapping. Lock the slot, and re-check
455 : // that it's still what we expected (because we released the mapping
456 : // lock already, another thread could have evicted the page)
457 4187 : let slot = &self.slots[slot_idx];
458 4187 : let inner = slot.inner.write().await;
459 4187 : if inner.key.as_ref() == Some(&cache_key) {
460 4187 : slot.inc_usage_count();
461 : debug_assert!(
462 : {
463 4187 : let guard = inner.permit.lock().unwrap();
464 4187 : guard.upgrade().is_none()
465 : },
466 UBC 0 : "we hold a write lock, so, no one else should have a permit"
467 : );
468 CBC 4187 : debug_assert_eq!(inner.buf.len(), img.len());
469 : // We already had it in cache. Another thread must've put it there
470 : // concurrently. Check that it had the same contents that we
471 : // replayed.
472 4187 : assert!(inner.buf == img);
473 4187 : return Ok(());
474 UBC 0 : }
475 CBC 2248272 : }
476 2248272 : debug_assert!(permit.is_some());
477 :
478 : // Not found. Find a victim buffer
479 2248272 : let (slot_idx, mut inner) = self
480 2248272 : .find_victim(permit.as_ref().unwrap())
481 UBC 0 : .await
482 CBC 2248272 : .context("Failed to find evict victim")?;
483 :
484 : // Insert mapping for this. At this point, we may find that another
485 : // thread did the same thing concurrently. In that case, we evicted
486 : // our victim buffer unnecessarily. Put it into the free list and
487 : // continue with the slot that the other thread chose.
488 2248272 : if let Some(_existing_slot_idx) = self.try_insert_mapping(&cache_key, slot_idx) {
489 : // TODO: put to free list
490 :
491 : // We now just loop back to start from beginning. This is not
492 : // optimal, we'll perform the lookup in the mapping again, which
493 : // is not really necessary because we already got
494 : // 'existing_slot_idx'. But this shouldn't happen often enough
495 : // to matter much.
496 37 : continue;
497 2248235 : }
498 2248235 :
499 2248235 : // Make the slot ready
500 2248235 : let slot = &self.slots[slot_idx];
501 2248235 : inner.key = Some(cache_key.clone());
502 2248235 : slot.set_usage_count(1);
503 : // Create a write guard for the slot so we go through the expected motions.
504 : debug_assert!(
505 : {
506 2248235 : let guard = inner.permit.lock().unwrap();
507 2248235 : guard.upgrade().is_none()
508 : },
509 UBC 0 : "we hold a write lock, so, no one else should have a permit"
510 : );
511 CBC 2248235 : let mut write_guard = PageWriteGuard {
512 2248235 : state: PageWriteGuardState::Invalid {
513 2248235 : _permit: permit.take().unwrap(),
514 2248235 : inner,
515 2248235 : },
516 2248235 : };
517 2248235 : write_guard.copy_from_slice(img);
518 2248235 : let _ = write_guard.mark_valid();
519 2248235 : return Ok(());
520 : }
521 2252422 : }
522 :
523 : // Section 1.2: Public interface functions for working with immutable file pages.
524 :
525 292271765 : pub async fn read_immutable_buf(
526 292271765 : &self,
527 292271765 : file_id: FileId,
528 292271765 : blkno: u32,
529 292271765 : ctx: &RequestContext,
530 292271765 : ) -> anyhow::Result<ReadBufResult> {
531 292271415 : let mut cache_key = CacheKey::ImmutableFilePage { file_id, blkno };
532 292271415 :
533 292271415 : self.lock_for_read(&mut cache_key, ctx).await
534 292271411 : }
535 :
536 : //
537 : // Section 2: Internal interface functions for lookup/update.
538 : //
539 : // To add support for a new kind of "thing" to cache, you will need
540 : // to add public interface routines above, and code to deal with the
541 : // "mappings" after this section. But the routines in this section should
542 : // not require changes.
543 :
544 300896511 : async fn try_get_pinned_slot_permit(&self) -> anyhow::Result<PinnedSlotsPermit> {
545 300896158 : let timer = crate::metrics::PAGE_CACHE_ACQUIRE_PINNED_SLOT_TIME.start_timer();
546 300896158 : match tokio::time::timeout(
547 300896158 : // Choose small timeout, neon_smgr does its own retries.
548 300896158 : // https://neondb.slack.com/archives/C04DGM6SMTM/p1694786876476869
549 300896158 : Duration::from_secs(10),
550 300896158 : Arc::clone(&self.pinned_slots).acquire_owned(),
551 300896158 : )
552 1740328 : .await
553 : {
554 300896156 : Ok(res) => Ok(PinnedSlotsPermit(
555 300896156 : res.expect("this semaphore is never closed"),
556 300896156 : )),
557 UBC 0 : Err(_timeout) => {
558 0 : timer.stop_and_discard();
559 0 : crate::metrics::page_cache_errors_inc(
560 0 : crate::metrics::PageCacheErrorKind::AcquirePinnedSlotTimeout,
561 0 : );
562 0 : anyhow::bail!("timeout: there were page guards alive for all page cache slots")
563 : }
564 : }
565 CBC 300896156 : }
566 :
567 : /// Look up a page in the cache.
568 : ///
569 : /// If the search criteria is not exact, *cache_key is updated with the
570 : /// exact key of the returned page. (For materialized pages, that means
571 : /// that the LSN in 'cache_key' is updated with the LSN of the returned page
572 : /// version.)
573 : ///
574 : /// If no page is found, returns None and *cache_key is left unmodified.
575 : ///
576 298644517 : async fn try_lock_for_read(
577 298644517 : &self,
578 298644517 : cache_key: &mut CacheKey,
579 298644517 : permit: &mut Option<PinnedSlotsPermit>,
580 298644517 : ) -> Option<PageReadGuard> {
581 298644164 : let cache_key_orig = cache_key.clone();
582 298644164 : if let Some(slot_idx) = self.search_mapping(cache_key) {
583 : // The page was found in the mapping. Lock the slot, and re-check
584 : // that it's still what we expected (because we released the mapping
585 : // lock already, another thread could have evicted the page)
586 283296204 : let slot = &self.slots[slot_idx];
587 283296204 : let inner = slot.inner.read().await;
588 283296204 : if inner.key.as_ref() == Some(cache_key) {
589 283296194 : slot.inc_usage_count();
590 283296194 : return Some(PageReadGuard {
591 283296194 : _permit: inner.coalesce_readers_permit(permit.take().unwrap()),
592 283296194 : slot_guard: inner,
593 283296194 : });
594 10 : } else {
595 10 : // search_mapping might have modified the search key; restore it.
596 10 : *cache_key = cache_key_orig;
597 10 : }
598 15347960 : }
599 15347970 : None
600 298644164 : }
601 :
602 : /// Return a locked buffer for given block.
603 : ///
604 : /// Like try_lock_for_read(), if the search criteria is not exact and the
605 : /// page is already found in the cache, *cache_key is updated.
606 : ///
607 : /// If the page is not found in the cache, this allocates a new buffer for
608 : /// it. The caller may then initialize the buffer with the contents, and
609 : /// call mark_valid().
610 : ///
611 : /// Example usage:
612 : ///
613 : /// ```ignore
614 : /// let cache = page_cache::get();
615 : ///
616 : /// match cache.lock_for_read(&mut cache_key, ctx).await? {
617 : ///     ReadBufResult::Found(read_guard) => {
618 : ///         // The page was found in cache. Use it.
619 : ///     },
620 : ///     ReadBufResult::NotFound(mut write_guard) => {
621 : ///         // The page was not found in cache. Read it from disk into the
622 : ///         // buffer.
623 : ///         //read_my_page_from_disk(&mut write_guard);
624 : ///
625 : ///         // The buffer contents are now valid. Tell the page cache.
626 : ///         let read_guard = write_guard.mark_valid();
627 : ///     },
628 : /// }
629 : /// ```
630 : ///
631 292271765 : async fn lock_for_read(
632 292271765 : &self,
633 292271765 : cache_key: &mut CacheKey,
634 292271765 : ctx: &RequestContext,
635 292271765 : ) -> anyhow::Result<ReadBufResult> {
636 292271415 : let mut permit = Some(self.try_get_pinned_slot_permit().await?);
637 :
638 292271411 : let (read_access, hit) = match cache_key {
639 : CacheKey::MaterializedPage { .. } => {
640 UBC 0 : unreachable!("Materialized pages use lookup_materialized_page")
641 : }
642 CBC 292271411 : CacheKey::ImmutableFilePage { .. } => (
643 292271411 : &crate::metrics::PAGE_CACHE
644 292271411 : .for_ctx(ctx)
645 292271411 : .read_accesses_immutable,
646 292271411 : &crate::metrics::PAGE_CACHE.for_ctx(ctx).read_hits_immutable,
647 292271411 : ),
648 292271411 : };
649 292271411 : read_access.inc();
650 292271411 :
651 292271411 : let mut is_first_iteration = true;
652 : loop {
653 : // First check if the key already exists in the cache.
654 292271842 : if let Some(read_guard) = self.try_lock_for_read(cache_key, &mut permit).await {
655 281998393 : debug_assert!(permit.is_none());
656 281998393 : if is_first_iteration {
657 281997962 : hit.inc();
658 281997962 : }
659 281998393 : return Ok(ReadBufResult::Found(read_guard));
660 10273449 : }
661 10273449 : debug_assert!(permit.is_some());
662 10273449 : is_first_iteration = false;
663 :
664 : // Not found. Find a victim buffer
665 10273449 : let (slot_idx, mut inner) = self
666 10273449 : .find_victim(permit.as_ref().unwrap())
667 UBC 0 : .await
668 CBC 10273449 : .context("Failed to find evict victim")?;
669 :
670 : // Insert mapping for this. At this point, we may find that another
671 : // thread did the same thing concurrently. In that case, we evicted
672 : // our victim buffer unnecessarily. Put it into the free list and
673 : // continue with the slot that the other thread chose.
674 10273449 : if let Some(_existing_slot_idx) = self.try_insert_mapping(cache_key, slot_idx) {
675 : // TODO: put to free list
676 :
677 : // We now just loop back to start from beginning. This is not
678 : // optimal, we'll perform the lookup in the mapping again, which
679 : // is not really necessary because we already got
680 : // 'existing_slot_idx'. But this shouldn't happen often enough
681 : // to matter much.
682 431 : continue;
683 10273018 : }
684 10273018 :
685 10273018 : // Make the slot ready
686 10273018 : let slot = &self.slots[slot_idx];
687 10273018 : inner.key = Some(cache_key.clone());
688 10273018 : slot.set_usage_count(1);
689 :
690 : debug_assert!(
691 : {
692 10273018 : let guard = inner.permit.lock().unwrap();
693 10273018 : guard.upgrade().is_none()
694 : },
695 UBC 0 : "we hold a write lock, so, no one else should have a permit"
696 : );
697 :
698 CBC 10273018 : return Ok(ReadBufResult::NotFound(PageWriteGuard {
699 10273018 : state: PageWriteGuardState::Invalid {
700 10273018 : _permit: permit.take().unwrap(),
701 10273018 : inner,
702 10273018 : },
703 10273018 : }));
704 : }
705 292271411 : }
706 :
707 : //
708 : // Section 3: Mapping functions
709 : //
710 :
711 : /// Search for a page in the cache using the given search key.
712 : ///
713 : /// Returns the slot index, if any. If the search criteria is not exact,
714 : /// *cache_key is updated with the actual key of the found page.
715 : ///
716 : /// NOTE: We don't hold any lock on the mapping on return, so the slot might
717 : /// get recycled for an unrelated page immediately after this function
718 : /// returns. The caller is responsible for re-checking that the slot still
719 : /// contains the page with the same key before using it.
720 : ///
721 298644517 : fn search_mapping(&self, cache_key: &mut CacheKey) -> Option<usize> {
722 298644517 : match cache_key {
723 6372324 : CacheKey::MaterializedPage { hash_key, lsn } => {
724 6372324 : let map = self.materialized_page_map.read().unwrap();
725 6372324 : let versions = map.get(hash_key)?;
726 :
727 1456250 : let version_idx = match versions.binary_search_by_key(lsn, |v| v.lsn) {
728 16 : Ok(version_idx) => version_idx,
729 1270 : Err(0) => return None,
730 1297786 : Err(version_idx) => version_idx - 1,
731 : };
732 1297802 : let version = &versions[version_idx];
733 1297802 : *lsn = version.lsn;
734 1297802 : Some(version.slot_idx)
735 : }
736 292272193 : CacheKey::ImmutableFilePage { file_id, blkno } => {
737 292272193 : let map = self.immutable_page_map.read().unwrap();
738 292272193 : Some(*map.get(&(*file_id, *blkno))?)
739 : }
740 : }
741 298644517 : }
742 :
743 : /// Search for a page in the cache using the given search key.
744 : ///
745 : /// Like 'search_mapping', but performs an "exact" search. Used for
746 : /// allocating a new buffer.
747 2252459 : fn search_mapping_exact(&self, key: &CacheKey) -> Option<usize> {
748 2252459 : match key {
749 2252459 : CacheKey::MaterializedPage { hash_key, lsn } => {
750 2252459 : let map = self.materialized_page_map.read().unwrap();
751 2252459 : let versions = map.get(hash_key)?;
752 :
753 440552 : if let Ok(version_idx) = versions.binary_search_by_key(lsn, |v| v.lsn) {
754 4187 : Some(versions[version_idx].slot_idx)
755 : } else {
756 336486 : None
757 : }
758 : }
759 UBC 0 : CacheKey::ImmutableFilePage { file_id, blkno } => {
760 0 : let map = self.immutable_page_map.read().unwrap();
761 0 : Some(*map.get(&(*file_id, *blkno))?)
762 : }
763 : }
764 CBC 2252459 : }
765 :
766 : ///
767 : /// Remove mapping for given key.
768 : ///
769 9824374 : fn remove_mapping(&self, old_key: &CacheKey) {
770 9824374 : match old_key {
771 : CacheKey::MaterializedPage {
772 2079692 : hash_key: old_hash_key,
773 2079692 : lsn: old_lsn,
774 2079692 : } => {
775 2079692 : let mut map = self.materialized_page_map.write().unwrap();
776 2079692 : if let Entry::Occupied(mut old_entry) = map.entry(old_hash_key.clone()) {
777 2079692 : let versions = old_entry.get_mut();
778 :
779 2444267 : if let Ok(version_idx) = versions.binary_search_by_key(old_lsn, |v| v.lsn) {
780 2079692 : versions.remove(version_idx);
781 2079692 : self.size_metrics
782 2079692 : .current_bytes_materialized_page
783 2079692 : .sub_page_sz(1);
784 2079692 : if versions.is_empty() {
785 1762097 : old_entry.remove_entry();
786 1762097 : }
787 UBC 0 : }
788 : } else {
789 0 : panic!("could not find old key in mapping")
790 : }
791 : }
792 CBC 7744682 : CacheKey::ImmutableFilePage { file_id, blkno } => {
793 7744682 : let mut map = self.immutable_page_map.write().unwrap();
794 7744682 : map.remove(&(*file_id, *blkno))
795 7744682 : .expect("could not find old key in mapping");
796 7744682 : self.size_metrics.current_bytes_immutable.sub_page_sz(1);
797 7744682 : }
798 : }
799 9824374 : }
800 :
801 : ///
802 : /// Insert mapping for given key.
803 : ///
804 : /// If a mapping already existed for the given key, returns the slot index
805 : /// of the existing mapping and leaves it untouched.
806 12521726 : fn try_insert_mapping(&self, new_key: &CacheKey, slot_idx: usize) -> Option<usize> {
807 12521726 : match new_key {
808 : CacheKey::MaterializedPage {
809 2248272 : hash_key: new_key,
810 2248272 : lsn: new_lsn,
811 2248272 : } => {
812 2248272 : let mut map = self.materialized_page_map.write().unwrap();
813 2248272 : let versions = map.entry(new_key.clone()).or_default();
814 2248272 : match versions.binary_search_by_key(new_lsn, |v| v.lsn) {
815 37 : Ok(version_idx) => Some(versions[version_idx].slot_idx),
816 2248235 : Err(version_idx) => {
817 2248235 : versions.insert(
818 2248235 : version_idx,
819 2248235 : Version {
820 2248235 : lsn: *new_lsn,
821 2248235 : slot_idx,
822 2248235 : },
823 2248235 : );
824 2248235 : self.size_metrics
825 2248235 : .current_bytes_materialized_page
826 2248235 : .add_page_sz(1);
827 2248235 : None
828 : }
829 : }
830 : }
831 :
832 10273454 : CacheKey::ImmutableFilePage { file_id, blkno } => {
833 10273454 : let mut map = self.immutable_page_map.write().unwrap();
834 10273454 : match map.entry((*file_id, *blkno)) {
835 431 : Entry::Occupied(entry) => Some(*entry.get()),
836 10273023 : Entry::Vacant(entry) => {
837 10273023 : entry.insert(slot_idx);
838 10273023 : self.size_metrics.current_bytes_immutable.add_page_sz(1);
839 10273023 : None
840 : }
841 : }
842 : }
843 : }
844 12521726 : }
845 :
846 : //
847 : // Section 4: Misc internal helpers
848 : //
849 :
850 : /// Find a slot to evict.
851 : ///
852 : /// On return, the slot is empty and write-locked.
853 12521726 : async fn find_victim(
854 12521726 : &self,
855 12521726 : _permit_witness: &PinnedSlotsPermit,
856 12521726 : ) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
857 12521721 : let iter_limit = self.slots.len() * 10;
858 12521721 : let mut iters = 0;
859 44918460 : loop {
860 44918460 : iters += 1;
861 44918460 : let slot_idx = self.next_evict_slot.fetch_add(1, Ordering::Relaxed) % self.slots.len();
862 44918460 :
863 44918460 : let slot = &self.slots[slot_idx];
864 44918460 :
865 44918460 : if slot.dec_usage_count() == 0 {
866 12521726 : let mut inner = match slot.inner.try_write() {
867 12521721 : Ok(inner) => inner,
868 5 : Err(_err) => {
869 5 : if iters > iter_limit {
870 : // NB: Even with the permits, there's no hard guarantee that we will find a slot with
871 : // any particular number of iterations: other threads might race ahead and acquire and
872 : // release pins just as we're scanning the array.
873 : //
874 : // Imagine that nslots is 2, and as a starting point, usage_count==1 on all
875 : // slots. There are two threads running concurrently, A and B. A has just
876 : // acquired the permit from the semaphore.
877 : //
878 : // A: Look at slot 1. Its usage_count == 1, so decrement it to zero, and continue the search
879 : // B: Acquire permit.
880 : // B: Look at slot 2, decrement its usage_count to zero and continue the search
881 : // B: Look at slot 1. Its usage_count is zero, so pin it and bump up its usage_count to 1.
882 : // B: Release pin and permit again
883 : // B: Acquire permit.
884 : // B: Look at slot 2. Its usage_count is zero, so pin it and bump up its usage_count to 1.
885 : // B: Release pin and permit again
886 : //
887 : // Now we're back in the starting situation that both slots have
888 : // usage_count 1, but A has now been through one iteration of the
889 : // find_victim() loop. This can repeat indefinitely and on each
890 : // iteration, A's iteration count increases by one.
891 : //
892 : // So, even though the semaphore for the permits is fair, the victim search
893 : // itself happens in parallel and is not fair.
894 : // Hence even with a permit, a task can theoretically be starved.
895 : // To avoid this, we'd need tokio to give priority to tasks that are holding
896 : // permits for longer.
897 : // Note that just yielding to tokio during iteration without such
898 : // priority boosting is likely counter-productive. We'd just give more opportunities
899 : // for B to bump usage count, further starving A.
900 UBC 0 : crate::metrics::page_cache_errors_inc(
901 0 : crate::metrics::PageCacheErrorKind::EvictIterLimit,
902 0 : );
903 0 : anyhow::bail!("exceeded evict iter limit");
904 CBC 5 : }
905 5 : continue;
906 : }
907 : };
908 12521721 : if let Some(old_key) = &inner.key {
909 9824369 : // remove mapping for old buffer
910 9824369 : self.remove_mapping(old_key);
911 9824369 : inner.key = None;
912 9824369 : }
913 12521721 : crate::metrics::PAGE_CACHE_FIND_VICTIMS_ITERS_TOTAL.inc_by(iters as u64);
914 12521721 : return Ok((slot_idx, inner));
915 32396734 : }
916 : }
917 12521721 : }
918 :
919 : /// Initialize a new page cache
920 : ///
921 : /// This should be called only once at page server startup.
922 561 : fn new(num_pages: usize) -> Self {
923 561 : assert!(num_pages > 0, "page cache size must be > 0");
924 :
925 : // We could use Vec::leak here, but that potentially also leaks
926 : // uninitialized reserved capacity. With into_boxed_slice and Box::leak
927 : // this is avoided.
928 561 : let page_buffer = Box::leak(vec![0u8; num_pages * PAGE_SZ].into_boxed_slice());
929 561 :
930 561 : let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;
931 561 : size_metrics.max_bytes.set_page_sz(num_pages);
932 561 : size_metrics.current_bytes_immutable.set_page_sz(0);
933 561 : size_metrics.current_bytes_materialized_page.set_page_sz(0);
934 561 :
935 561 : let slots = page_buffer
936 561 : .chunks_exact_mut(PAGE_SZ)
937 4564326 : .map(|chunk| {
938 4564326 : let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap();
939 4564326 :
940 4564326 : Slot {
941 4564326 : inner: tokio::sync::RwLock::new(SlotInner {
942 4564326 : key: None,
943 4564326 : buf,
944 4564326 : permit: std::sync::Mutex::new(Weak::new()),
945 4564326 : }),
946 4564326 : usage_count: AtomicU8::new(0),
947 4564326 : }
948 4564326 : })
949 561 : .collect();
950 561 :
951 561 : Self {
952 561 : materialized_page_map: Default::default(),
953 561 : immutable_page_map: Default::default(),
954 561 : slots,
955 561 : next_evict_slot: AtomicUsize::new(0),
956 561 : size_metrics,
957 561 : pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)),
958 561 : }
959 561 : }
960 : }
961 :
962 : trait PageSzBytesMetric {
963 : fn set_page_sz(&self, count: usize);
964 : fn add_page_sz(&self, count: usize);
965 : fn sub_page_sz(&self, count: usize);
966 : }
967 :
968 : #[inline(always)]
969 22347267 : fn count_times_page_sz(count: usize) -> u64 {
970 22347267 : u64::try_from(count).unwrap() * u64::try_from(PAGE_SZ).unwrap()
971 22347267 : }
972 :
973 : impl PageSzBytesMetric for metrics::UIntGauge {
974 1683 : fn set_page_sz(&self, count: usize) {
975 1683 : self.set(count_times_page_sz(count));
976 1683 : }
977 12521234 : fn add_page_sz(&self, count: usize) {
978 12521234 : self.add(count_times_page_sz(count));
979 12521234 : }
980 9824350 : fn sub_page_sz(&self, count: usize) {
981 9824350 : self.sub(count_times_page_sz(count));
982 9824350 : }
983 : }
|