TLA Line data Source code
1 : //!
2 : //! Global page cache
3 : //!
4 : //! The page cache uses up most of the memory in the page server. It is shared
5 : //! by all tenants, and it is used to store different kinds of pages. Sharing
6 : //! the cache allows memory to be dynamically allocated where it's needed the
7 : //! most.
8 : //!
9 : //! The page cache consists of fixed-size buffers, 8 kB each to match the
10 : //! PostgreSQL buffer size, and a Slot struct for each buffer to contain
11 : //! information about what's stored in the buffer.
12 : //!
13 : //! # Types Of Pages
14 : //!
15 : //! [`PageCache`] only supports immutable pages.
16 : //! Hence there is no need to worry about coherency.
17 : //!
18 : //! Two types of pages are supported:
19 : //!
20 : //! * **Materialized pages**, filled & used by page reconstruction
21 : //! * **Immutable File pages**, filled & used by [`crate::tenant::block_io`] and [`crate::tenant::ephemeral_file`].
22 : //!
23 : //! Note that [`crate::tenant::ephemeral_file::EphemeralFile`] is generally mutable, but it's append-only.
24 : //! It uses the page cache only for the blocks that are already fully written and immutable.
25 : //!
26 : //! # Filling The Page Cache
27 : //!
28 : //! The page cache maps from a cache key to a buffer slot.
29 : //! The cache key uniquely identifies the piece of data that is being cached.
30 : //!
31 : //! The cache key for **materialized pages** is [`TenantShardId`], [`TimelineId`], [`Key`], and [`Lsn`].
32 : //! Use [`PageCache::memorize_materialized_page`] and [`PageCache::lookup_materialized_page`] for fill & access.
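//!
//! A minimal sketch of that fill & access pattern. The identifiers `tenant_shard_id`,
//! `timeline_id`, `key`, `lsn`, `img`, and `ctx` stand in for whatever the caller has in
//! scope; `img` is assumed to hold the reconstructed 8 kB page image:
//!
//! ```ignore
//! let cache = page_cache::get();
//! // Look for a cached image of `key` at or below `lsn`.
//! match cache
//!     .lookup_materialized_page(tenant_shard_id, timeline_id, &key, lsn, &ctx)
//!     .await
//! {
//!     Some((cached_lsn, read_guard)) => {
//!         // Hit: `read_guard` derefs to the page image materialized at `cached_lsn`.
//!     }
//!     None => {
//!         // Miss: reconstruct the page elsewhere, then memorize it for later lookups.
//!         cache
//!             .memorize_materialized_page(tenant_shard_id, timeline_id, key, lsn, &img)
//!             .await?;
//!     }
//! }
//! ```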
33 : //!
34 : //! The cache key for **immutable file** pages is [`FileId`] and a block number.
35 : //! Users of the page cache that wish to page-cache an arbitrary (immutable!) on-disk file do the following (see the sketch after this list):
36 : //! * Have a mechanism to deterministically associate the on-disk file with a [`FileId`].
37 : //! * Get a [`FileId`] using [`next_file_id`].
38 : //! * Use the mechanism to associate the on-disk file with the returned [`FileId`].
39 : //! * Use [`PageCache::read_immutable_buf`] to get a [`ReadBufResult`].
40 : //! * If the page was already cached, it'll be the [`ReadBufResult::Found`] variant that contains
41 : //! a read guard for the page. Just use it.
42 : //! * If the page was not cached, it'll be the [`ReadBufResult::NotFound`] variant that contains
43 : //! a write guard for the page. Fill the page with the contents of the on-disk file.
44 : //! Then call [`PageWriteGuard::mark_valid`] to mark the page as valid.
45 : //! Then try again to [`PageCache::read_immutable_buf`].
46 : //! Unless there's high cache pressure, the page should now be cached.
47 : //! (TODO: allow downgrading the write guard to a read guard to ensure forward progress.)
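//!
//! A minimal sketch of that workflow, assuming an async context with a `RequestContext`
//! at hand; `read_block_from_disk` is a hypothetical helper, not an API of this module:
//!
//! ```ignore
//! // Once per on-disk file: obtain a FileId and remember the association.
//! let file_id = page_cache::next_file_id();
//!
//! let cache = page_cache::get();
//! let read_guard = loop {
//!     match cache.read_immutable_buf(file_id, blkno, &ctx).await? {
//!         // Cache hit: the read guard derefs to the 8 kB page.
//!         ReadBufResult::Found(guard) => break guard,
//!         // Cache miss: fill the buffer, publish it, then retry the lookup.
//!         ReadBufResult::NotFound(mut write_guard) => {
//!             read_block_from_disk(blkno, &mut *write_guard)?;
//!             let _ = write_guard.mark_valid();
//!         }
//!     }
//! };
//! ```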
48 : //!
49 : //! # Locking
50 : //!
51 : //! There are two levels of locking involved: There's one lock for the "mapping"
52 : //! from page identifier (tenant shard ID, timeline ID, key, LSN, or file ID and block number) to the buffer
53 : //! slot, and a separate lock on each slot. To read or write the contents of a
54 : //! slot, you must hold the lock on the slot in read or write mode,
55 : //! respectively. To change the mapping of a slot, i.e. to evict a page or to
56 : //! assign a buffer for a page, you must hold the mapping lock and the lock on
57 : //! the slot at the same time.
58 : //!
59 : //! Whenever you need to hold both locks simultaneously, the slot lock must be
60 : //! acquired first. This consistent ordering avoids deadlocks. To look up a page
61 : //! in the cache, you would first look up the mapping, while holding the mapping
62 : //! lock, and then lock the slot. You must release the mapping lock in between,
63 : //! to obey the lock ordering and avoid deadlock.
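//!
//! A condensed sketch of both paths under that ordering. The names `mapping`, `slots`,
//! `key`, and `victim_idx` are simplified stand-ins for the fields defined further down,
//! not the exact code:
//!
//! ```ignore
//! // Lookup: take and drop the mapping lock, then lock the slot and re-check the key.
//! let slot_idx = mapping.read().unwrap().get(&key).copied()?;
//! let guard = slots[slot_idx].inner.read().await;
//! if guard.key.as_ref() != Some(&key) {
//!     return None; // the slot was recycled in between
//! }
//!
//! // Eviction / re-assignment: slot write-lock first, then the mapping lock.
//! let mut victim = slots[victim_idx].inner.write().await;
//! if let Some(old_key) = victim.key.take() {
//!     mapping.write().unwrap().remove(&old_key);
//! }
//! mapping.write().unwrap().insert(key.clone(), victim_idx);
//! victim.key = Some(key);
//! ```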
64 : //!
65 : //! A slot can momentarily have invalid contents, even if it's already been
66 : //! inserted to the mapping, but you must hold the write-lock on the slot until
67 : //! the contents are valid. If you need to release the lock without initializing
68 : //! the contents, you must remove the mapping first. We make that easy for the
69 : //! callers with PageWriteGuard: the caller must explicitly call guard.mark_valid() after
70 : //! initializing the contents. If the guard is dropped without calling mark_valid(), the
71 : //! mapping is automatically removed and the slot is marked free.
72 : //!
73 :
74 : use std::{
75 : collections::{hash_map::Entry, HashMap},
76 : convert::TryInto,
77 : sync::{
78 : atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
79 : Arc, Weak,
80 : },
81 : time::Duration,
82 : };
83 :
84 : use anyhow::Context;
85 : use once_cell::sync::OnceCell;
86 : use pageserver_api::shard::TenantShardId;
87 : use utils::{id::TimelineId, lsn::Lsn};
88 :
89 : use crate::{
90 : context::RequestContext,
91 : metrics::{page_cache_eviction_metrics, PageCacheSizeMetrics},
92 : repository::Key,
93 : };
94 :
95 : static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
96 : const TEST_PAGE_CACHE_SIZE: usize = 50;
97 :
98 : ///
99 : /// Initialize the page cache. This must be called once at page server startup.
100 : ///
101 CBC 557 : pub fn init(size: usize) {
102 557 : if PAGE_CACHE.set(PageCache::new(size)).is_err() {
103 UBC 0 : panic!("page cache already initialized");
104 CBC 557 : }
105 557 : }
106 :
107 : ///
108 : /// Get a handle to the page cache.
109 : ///
110 161197714 : pub fn get() -> &'static PageCache {
111 161197714 : //
112 161197714 : // In unit tests, page server startup doesn't happen and no one calls
113 161197714 : // page_cache::init(). Initialize it here with a tiny cache, so that the
114 161197714 : // page cache is usable in unit tests.
115 161197714 : //
116 161197714 : if cfg!(test) {
117 1670164 : PAGE_CACHE.get_or_init(|| PageCache::new(TEST_PAGE_CACHE_SIZE))
118 : } else {
119 159527550 : PAGE_CACHE.get().expect("page cache not initialized")
120 : }
121 161197714 : }
122 :
123 : pub const PAGE_SZ: usize = postgres_ffi::BLCKSZ as usize;
124 : const MAX_USAGE_COUNT: u8 = 5;
125 :
126 : /// See module-level comment.
127 298697992 : #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
128 : pub struct FileId(u64);
129 :
130 : static NEXT_ID: AtomicU64 = AtomicU64::new(1);
131 :
132 : /// See module-level comment.
133 38110 : pub fn next_file_id() -> FileId {
134 38110 : FileId(NEXT_ID.fetch_add(1, Ordering::Relaxed))
135 38110 : }
136 :
137 : ///
138 : /// CacheKey uniquely identifies a "thing" to cache in the page cache.
139 : ///
140 169029285 : #[derive(Debug, PartialEq, Eq, Clone)]
141 : #[allow(clippy::enum_variant_names)]
142 : enum CacheKey {
143 : MaterializedPage {
144 : hash_key: MaterializedPageHashKey,
145 : lsn: Lsn,
146 : },
147 : ImmutableFilePage {
148 : file_id: FileId,
149 : blkno: u32,
150 : },
151 : }
152 :
153 11957545 : #[derive(Debug, PartialEq, Eq, Hash, Clone)]
154 : struct MaterializedPageHashKey {
155 : /// Why is this TenantShardId rather than TenantId?
156 : ///
157 : /// Usually, the materialized value of a page@lsn is identical on any shard in the same tenant. However, this
158 : /// is not the case for certain internally-generated pages (e.g. relation sizes). In the future, we may make this
159 : /// key smaller by omitting the shard, if we ensure that reads to such pages always skip the cache, or are
160 : /// special-cased in some other way.
161 : tenant_shard_id: TenantShardId,
162 : timeline_id: TimelineId,
163 : key: Key,
164 : }
165 :
166 UBC 0 : #[derive(Clone)]
167 : struct Version {
168 : lsn: Lsn,
169 : slot_idx: usize,
170 : }
171 :
172 : struct Slot {
173 : inner: tokio::sync::RwLock<SlotInner>,
174 : usage_count: AtomicU8,
175 : }
176 :
177 : struct SlotInner {
178 : key: Option<CacheKey>,
179 : // for `coalesce_readers_permit`
180 : permit: std::sync::Mutex<Weak<PinnedSlotsPermit>>,
181 : buf: &'static mut [u8; PAGE_SZ],
182 : }
183 :
184 : impl Slot {
185 : /// Increment usage count on the buffer, with ceiling at MAX_USAGE_COUNT.
186 CBC 146713048 : fn inc_usage_count(&self) {
187 146713048 : let _ = self
188 146713048 : .usage_count
189 146714498 : .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
190 146714498 : if val == MAX_USAGE_COUNT {
191 128236445 : None
192 : } else {
193 18478053 : Some(val + 1)
194 : }
195 146714498 : });
196 146713048 : }
197 :
198 : /// Decrement usage count on the buffer, unless it's already zero. Returns
199 : /// the old usage count.
200 32163023 : fn dec_usage_count(&self) -> u8 {
201 32163023 : let count_res =
202 32163023 : self.usage_count
203 32163023 : .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
204 32163020 : if val == 0 {
205 9888455 : None
206 : } else {
207 22274565 : Some(val - 1)
208 : }
209 32163023 : });
210 32163023 :
211 32163023 : match count_res {
212 22274566 : Ok(usage_count) => usage_count,
213 9888457 : Err(usage_count) => usage_count,
214 : }
215 32163023 : }
216 :
217 : /// Sets the usage count to a specific value.
218 9887878 : fn set_usage_count(&self, count: u8) {
219 9887878 : self.usage_count.store(count, Ordering::Relaxed);
220 9887878 : }
221 : }
222 :
223 : impl SlotInner {
224 : /// If there is already a reader, drop our permit and share its permit, just like we share read access.
225 146713007 : fn coalesce_readers_permit(&self, permit: PinnedSlotsPermit) -> Arc<PinnedSlotsPermit> {
226 146713007 : let mut guard = self.permit.lock().unwrap();
227 146713007 : if let Some(existing_permit) = guard.upgrade() {
228 93440 : drop(guard);
229 93440 : drop(permit);
230 93440 : existing_permit
231 : } else {
232 146619567 : let permit = Arc::new(permit);
233 146619567 : *guard = Arc::downgrade(&permit);
234 146619567 : permit
235 : }
236 146713007 : }
237 : }
238 :
239 : pub struct PageCache {
240 : /// This contains the mapping from the cache key to buffer slot that currently
241 : /// contains the page, if any.
242 : ///
243 : /// TODO: This is protected by a single lock. If that becomes a bottleneck,
244 : /// this HashMap can be replaced with a more concurrent version, there are
245 : /// plenty of such crates around.
246 : ///
247 : /// If you add support for caching different kinds of objects, each object kind
248 : /// can have a separate mapping map, next to this field.
249 : materialized_page_map: std::sync::RwLock<HashMap<MaterializedPageHashKey, Vec<Version>>>,
250 :
251 : immutable_page_map: std::sync::RwLock<HashMap<(FileId, u32), usize>>,
252 :
253 : /// The actual buffers with their metadata.
254 : slots: Box<[Slot]>,
255 :
256 : pinned_slots: Arc<tokio::sync::Semaphore>,
257 :
258 : /// Index of the next candidate to evict, for the Clock replacement algorithm.
259 : /// This is interpreted modulo the page cache size.
260 : next_evict_slot: AtomicUsize,
261 :
262 : size_metrics: &'static PageCacheSizeMetrics,
263 : }
264 :
265 : struct PinnedSlotsPermit(tokio::sync::OwnedSemaphorePermit);
266 :
267 : ///
268 : /// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked
269 : /// until the guard is dropped.
270 : ///
271 : pub struct PageReadGuard<'i> {
272 : _permit: Arc<PinnedSlotsPermit>,
273 : slot_guard: tokio::sync::RwLockReadGuard<'i, SlotInner>,
274 : }
275 :
276 : impl std::ops::Deref for PageReadGuard<'_> {
277 : type Target = [u8; PAGE_SZ];
278 :
279 305328603 : fn deref(&self) -> &Self::Target {
280 305328603 : self.slot_guard.buf
281 305328603 : }
282 : }
283 :
284 : impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
285 UBC 0 : fn as_ref(&self) -> &[u8; PAGE_SZ] {
286 0 : self.slot_guard.buf
287 0 : }
288 : }
289 :
290 : ///
291 : /// PageWriteGuard is a lease on a buffer for modifying it. The page is kept locked
292 : /// until the guard is dropped.
293 : ///
294 : /// Counterintuitively, this is used even for a read, if the requested page is not
295 : /// currently found in the page cache. In that case, the caller of lock_for_read()
296 : /// is expected to fill in the page contents and call mark_valid().
297 : pub struct PageWriteGuard<'i> {
298 : state: PageWriteGuardState<'i>,
299 : }
300 :
301 : enum PageWriteGuardState<'i> {
302 : Invalid {
303 : inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
304 : _permit: PinnedSlotsPermit,
305 : },
306 : Downgraded,
307 : }
308 :
309 : impl std::ops::DerefMut for PageWriteGuard<'_> {
310 CBC 9887878 : fn deref_mut(&mut self) -> &mut Self::Target {
311 9887878 : match &mut self.state {
312 9887878 : PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
313 UBC 0 : PageWriteGuardState::Downgraded => unreachable!(),
314 : }
315 CBC 9887878 : }
316 : }
317 :
318 : impl std::ops::Deref for PageWriteGuard<'_> {
319 : type Target = [u8; PAGE_SZ];
320 :
321 UBC 0 : fn deref(&self) -> &Self::Target {
322 0 : match &self.state {
323 0 : PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
324 0 : PageWriteGuardState::Downgraded => unreachable!(),
325 : }
326 0 : }
327 : }
328 :
329 : impl<'a> PageWriteGuard<'a> {
330 : /// Mark that the buffer contents are now valid.
331 : #[must_use]
332 CBC 9887878 : pub fn mark_valid(mut self) -> PageReadGuard<'a> {
333 9887878 : let prev = std::mem::replace(&mut self.state, PageWriteGuardState::Downgraded);
334 9887878 : match prev {
335 9887878 : PageWriteGuardState::Invalid { inner, _permit } => {
336 9887878 : assert!(inner.key.is_some());
337 9887878 : PageReadGuard {
338 9887878 : _permit: Arc::new(_permit),
339 9887878 : slot_guard: inner.downgrade(),
340 9887878 : }
341 : }
342 UBC 0 : PageWriteGuardState::Downgraded => unreachable!(),
343 : }
344 CBC 9887878 : }
345 : }
346 :
347 : impl Drop for PageWriteGuard<'_> {
348 : ///
349 : /// If the buffer was allocated for a page that was not already in the
350 : /// cache, but the lock_for_read/write() caller dropped the buffer without
351 : /// initializing it, remove the mapping from the page cache.
352 : ///
353 9887873 : fn drop(&mut self) {
354 9887873 : match &mut self.state {
355 UBC 0 : PageWriteGuardState::Invalid { inner, _permit } => {
356 0 : assert!(inner.key.is_some());
357 0 : let self_key = inner.key.as_ref().unwrap();
358 0 : PAGE_CACHE.get().unwrap().remove_mapping(self_key);
359 0 : inner.key = None;
360 : }
361 CBC 9887873 : PageWriteGuardState::Downgraded => {}
362 : }
363 9887873 : }
364 : }
365 :
366 : /// lock_for_read() return value
367 : pub enum ReadBufResult<'a> {
368 : Found(PageReadGuard<'a>),
369 : NotFound(PageWriteGuard<'a>),
370 : }
371 :
372 : impl PageCache {
373 : //
374 : // Section 1.1: Public interface functions for looking up and memorizing materialized page
375 : // versions in the page cache
376 : //
377 :
378 : /// Look up a materialized page version.
379 : ///
380 : /// The 'lsn' is an upper bound: this will return the latest version of
381 : /// the given block, but not newer than 'lsn'. Returns the actual LSN of the
382 : /// returned page.
383 5939235 : pub async fn lookup_materialized_page(
384 5939235 : &self,
385 5939235 : tenant_shard_id: TenantShardId,
386 5939235 : timeline_id: TimelineId,
387 5939235 : key: &Key,
388 5939235 : lsn: Lsn,
389 5939235 : ctx: &RequestContext,
390 5939235 : ) -> Option<(Lsn, PageReadGuard)> {
391 5939232 : let Ok(permit) = self.try_get_pinned_slot_permit().await else {
392 UBC 0 : return None;
393 : };
394 :
395 CBC 5939232 : crate::metrics::PAGE_CACHE
396 5939232 : .for_ctx(ctx)
397 5939232 : .read_accesses_materialized_page
398 5939232 : .inc();
399 5939232 :
400 5939232 : let mut cache_key = CacheKey::MaterializedPage {
401 5939232 : hash_key: MaterializedPageHashKey {
402 5939232 : tenant_shard_id,
403 5939232 : timeline_id,
404 5939232 : key: *key,
405 5939232 : },
406 5939232 : lsn,
407 5939232 : };
408 :
409 5939232 : if let Some(guard) = self
410 5939232 : .try_lock_for_read(&mut cache_key, &mut Some(permit))
411 1119 : .await
412 : {
413 : if let CacheKey::MaterializedPage {
414 : hash_key: _,
415 1342450 : lsn: available_lsn,
416 1342450 : } = cache_key
417 : {
418 1342450 : if available_lsn == lsn {
419 76548 : crate::metrics::PAGE_CACHE
420 76548 : .for_ctx(ctx)
421 76548 : .read_hits_materialized_page_exact
422 76548 : .inc();
423 1265902 : } else {
424 1265902 : crate::metrics::PAGE_CACHE
425 1265902 : .for_ctx(ctx)
426 1265902 : .read_hits_materialized_page_older_lsn
427 1265902 : .inc();
428 1265902 : }
429 1342450 : Some((available_lsn, guard))
430 : } else {
431 UBC 0 : panic!("unexpected key type in slot");
432 : }
433 : } else {
434 CBC 4596782 : None
435 : }
436 5939232 : }
437 :
438 : ///
439 : /// Store an image of the given page in the cache.
440 : ///
441 2056879 : pub async fn memorize_materialized_page(
442 2056879 : &self,
443 2056879 : tenant_shard_id: TenantShardId,
444 2056879 : timeline_id: TimelineId,
445 2056879 : key: Key,
446 2056879 : lsn: Lsn,
447 2056879 : img: &[u8],
448 2056879 : ) -> anyhow::Result<()> {
449 2056879 : let cache_key = CacheKey::MaterializedPage {
450 2056879 : hash_key: MaterializedPageHashKey {
451 2056879 : tenant_shard_id,
452 2056879 : timeline_id,
453 2056879 : key,
454 2056879 : },
455 2056879 : lsn,
456 2056879 : };
457 :
458 2056879 : let mut permit = Some(self.try_get_pinned_slot_permit().await?);
459 : loop {
460 : // First check if the key already exists in the cache.
461 2056879 : if let Some(slot_idx) = self.search_mapping_exact(&cache_key) {
462 : // The page was found in the mapping. Lock the slot, and re-check
463 : // that it's still what we expected (because we already released the mapping
464 : // lock, another thread could have evicted the page)
465 41 : let slot = &self.slots[slot_idx];
466 41 : let inner = slot.inner.write().await;
467 41 : if inner.key.as_ref() == Some(&cache_key) {
468 41 : slot.inc_usage_count();
469 : debug_assert!(
470 : {
471 41 : let guard = inner.permit.lock().unwrap();
472 41 : guard.upgrade().is_none()
473 : },
474 UBC 0 : "we hold a write lock, so, no one else should have a permit"
475 : );
476 CBC 41 : debug_assert_eq!(inner.buf.len(), img.len());
477 : // We already had it in cache. Another thread must've put it there
478 : // concurrently. Check that it had the same contents that we
479 : // replayed.
480 41 : assert!(inner.buf == img);
481 41 : return Ok(());
482 UBC 0 : }
483 CBC 2056838 : }
484 2056838 : debug_assert!(permit.is_some());
485 :
486 : // Not found. Find a victim buffer
487 2056838 : let (slot_idx, mut inner) = self
488 2056838 : .find_victim(permit.as_ref().unwrap())
489 UBC 0 : .await
490 CBC 2056838 : .context("Failed to find evict victim")?;
491 :
492 : // Insert mapping for this. At this point, we may find that another
493 : // thread did the same thing concurrently. In that case, we evicted
494 : // our victim buffer unnecessarily. Put it into the free list and
495 : // continue with the slot that the other thread chose.
496 2056838 : if let Some(_existing_slot_idx) = self.try_insert_mapping(&cache_key, slot_idx) {
497 : // TODO: put to free list
498 :
499 : // We now just loop back to start from beginning. This is not
500 : // optimal, we'll perform the lookup in the mapping again, which
501 : // is not really necessary because we already got
502 : // 'existing_slot_idx'. But this shouldn't happen often enough
503 : // to matter much.
504 UBC 0 : continue;
505 CBC 2056838 : }
506 2056838 :
507 2056838 : // Make the slot ready
508 2056838 : let slot = &self.slots[slot_idx];
509 2056838 : inner.key = Some(cache_key.clone());
510 2056838 : slot.set_usage_count(1);
511 : // Create a write guard for the slot so we go through the expected motions.
512 : debug_assert!(
513 : {
514 2056838 : let guard = inner.permit.lock().unwrap();
515 2056838 : guard.upgrade().is_none()
516 : },
517 UBC 0 : "we hold a write lock, so, no one else should have a permit"
518 : );
519 CBC 2056838 : let mut write_guard = PageWriteGuard {
520 2056838 : state: PageWriteGuardState::Invalid {
521 2056838 : _permit: permit.take().unwrap(),
522 2056838 : inner,
523 2056838 : },
524 2056838 : };
525 2056838 : write_guard.copy_from_slice(img);
526 2056838 : let _ = write_guard.mark_valid();
527 2056838 : return Ok(());
528 : }
529 2056879 : }
530 :
531 : // Section 1.2: Public interface functions for working with immutable file pages.
532 :
533 153201600 : pub async fn read_immutable_buf(
534 153201600 : &self,
535 153201600 : file_id: FileId,
536 153201600 : blkno: u32,
537 153201600 : ctx: &RequestContext,
538 153201600 : ) -> anyhow::Result<ReadBufResult> {
539 153201546 : let mut cache_key = CacheKey::ImmutableFilePage { file_id, blkno };
540 153201546 :
541 153201546 : self.lock_for_read(&mut cache_key, ctx).await
542 153201542 : }
543 :
544 : //
545 : // Section 2: Internal interface functions for lookup/update.
546 : //
547 : // To add support for a new kind of "thing" to cache, you will need
548 : // to add public interface routines above, and code to deal with the
549 : // "mappings" after this section. But the routines in this section should
550 : // not require changes.
551 :
552 161197714 : async fn try_get_pinned_slot_permit(&self) -> anyhow::Result<PinnedSlotsPermit> {
553 161197657 : let timer = crate::metrics::PAGE_CACHE_ACQUIRE_PINNED_SLOT_TIME.start_timer();
554 161197657 : match tokio::time::timeout(
555 161197657 : // Choose small timeout, neon_smgr does its own retries.
556 161197657 : // https://neondb.slack.com/archives/C04DGM6SMTM/p1694786876476869
557 161197657 : Duration::from_secs(10),
558 161197657 : Arc::clone(&self.pinned_slots).acquire_owned(),
559 161197657 : )
560 900218 : .await
561 : {
562 161197656 : Ok(res) => Ok(PinnedSlotsPermit(
563 161197656 : res.expect("this semaphore is never closed"),
564 161197656 : )),
565 UBC 0 : Err(_timeout) => {
566 0 : timer.stop_and_discard();
567 0 : crate::metrics::page_cache_errors_inc(
568 0 : crate::metrics::PageCacheErrorKind::AcquirePinnedSlotTimeout,
569 0 : );
570 0 : anyhow::bail!("timeout: there were page guards alive for all page cache slots")
571 : }
572 : }
573 CBC 161197656 : }
574 :
575 : /// Look up a page in the cache.
576 : ///
577 : /// If the search criteria is not exact, *cache_key is updated with the
578 : /// exact key of the returned page. (For materialized pages, that means
579 : /// that the LSN in 'cache_key' is updated with the LSN of the returned page
580 : /// version.)
581 : ///
582 : /// If no page is found, returns None and *cache_key is left unmodified.
583 : ///
584 159141407 : async fn try_lock_for_read(
585 159141407 : &self,
586 159141407 : cache_key: &mut CacheKey,
587 159141407 : permit: &mut Option<PinnedSlotsPermit>,
588 159141407 : ) -> Option<PageReadGuard> {
589 159141352 : let cache_key_orig = cache_key.clone();
590 159141352 : if let Some(slot_idx) = self.search_mapping(cache_key) {
591 : // The page was found in the mapping. Lock the slot, and re-check
592 : // that it's still what we expected (because we released the mapping
593 : // lock already, another thread could have evicted the page)
594 146712959 : let slot = &self.slots[slot_idx];
595 146712959 : let inner = slot.inner.read().await;
596 146712958 : if inner.key.as_ref() == Some(cache_key) {
597 146712955 : slot.inc_usage_count();
598 146712955 : return Some(PageReadGuard {
599 146712955 : _permit: inner.coalesce_readers_permit(permit.take().unwrap()),
600 146712955 : slot_guard: inner,
601 146712955 : });
602 3 : } else {
603 3 : // search_mapping might have modified the search key; restore it.
604 3 : *cache_key = cache_key_orig;
605 3 : }
606 12428393 : }
607 12428396 : None
608 159141351 : }
609 :
610 : /// Return a locked buffer for given block.
611 : ///
612 : /// Like try_lock_for_read(), if the search criteria is not exact and the
613 : /// page is already found in the cache, *cache_key is updated.
614 : ///
615 : /// If the page is not found in the cache, this allocates a new buffer for
616 : /// it. The caller may then initialize the buffer with the contents, and
617 : /// call mark_valid().
618 : ///
619 : /// Example usage:
620 : ///
621 : /// ```ignore
622 : /// let cache = page_cache::get();
623 : ///
624 : /// match cache.lock_for_read(&key) {
625 : /// ReadBufResult::Found(read_guard) => {
626 : /// // The page was found in cache. Use it
627 : /// },
628 : /// ReadBufResult::NotFound(write_guard) => {
629 : /// // The page was not found in cache. Read it from disk into the
630 : /// // buffer.
631 : /// // read_my_page_from_disk(write_guard);
632 : ///
633 : /// // The buffer contents are now valid. Tell the page cache.
634 : /// write_guard.mark_valid();
635 : /// },
636 : /// }
637 : /// ```
638 : ///
639 153201600 : async fn lock_for_read(
640 153201600 : &self,
641 153201600 : cache_key: &mut CacheKey,
642 153201600 : ctx: &RequestContext,
643 153201600 : ) -> anyhow::Result<ReadBufResult> {
644 153201546 : let mut permit = Some(self.try_get_pinned_slot_permit().await?);
645 :
646 153201545 : let (read_access, hit) = match cache_key {
647 : CacheKey::MaterializedPage { .. } => {
648 UBC 0 : unreachable!("Materialized pages use lookup_materialized_page")
649 : }
650 CBC 153201545 : CacheKey::ImmutableFilePage { .. } => (
651 153201545 : &crate::metrics::PAGE_CACHE
652 153201545 : .for_ctx(ctx)
653 153201545 : .read_accesses_immutable,
654 153201545 : &crate::metrics::PAGE_CACHE.for_ctx(ctx).read_hits_immutable,
655 153201545 : ),
656 153201545 : };
657 153201545 : read_access.inc();
658 153201545 :
659 153201545 : let mut is_first_iteration = true;
660 : loop {
661 : // First check if the key already exists in the cache.
662 153202120 : if let Some(read_guard) = self.try_lock_for_read(cache_key, &mut permit).await {
663 145370504 : debug_assert!(permit.is_none());
664 145370504 : if is_first_iteration {
665 145369929 : hit.inc();
666 145369929 : }
667 145370504 : return Ok(ReadBufResult::Found(read_guard));
668 7831614 : }
669 7831614 : debug_assert!(permit.is_some());
670 7831614 : is_first_iteration = false;
671 :
672 : // Not found. Find a victim buffer
673 7831614 : let (slot_idx, mut inner) = self
674 7831614 : .find_victim(permit.as_ref().unwrap())
675 UBC 0 : .await
676 CBC 7831614 : .context("Failed to find evict victim")?;
677 :
678 : // Insert mapping for this. At this point, we may find that another
679 : // thread did the same thing concurrently. In that case, we evicted
680 : // our victim buffer unnecessarily. Put it into the free list and
681 : // continue with the slot that the other thread chose.
682 7831614 : if let Some(_existing_slot_idx) = self.try_insert_mapping(cache_key, slot_idx) {
683 : // TODO: put to free list
684 :
685 : // We now just loop back to start from beginning. This is not
686 : // optimal, we'll perform the lookup in the mapping again, which
687 : // is not really necessary because we already got
688 : // 'existing_slot_idx'. But this shouldn't happen often enough
689 : // to matter much.
690 575 : continue;
691 7831039 : }
692 7831039 :
693 7831039 : // Make the slot ready
694 7831039 : let slot = &self.slots[slot_idx];
695 7831039 : inner.key = Some(cache_key.clone());
696 7831039 : slot.set_usage_count(1);
697 :
698 : debug_assert!(
699 : {
700 7831039 : let guard = inner.permit.lock().unwrap();
701 7831039 : guard.upgrade().is_none()
702 : },
703 LBC (1) : "we hold a write lock, so, no one else should have a permit"
704 : );
705 :
706 CBC 7831039 : return Ok(ReadBufResult::NotFound(PageWriteGuard {
707 7831039 : state: PageWriteGuardState::Invalid {
708 7831039 : _permit: permit.take().unwrap(),
709 7831039 : inner,
710 7831039 : },
711 7831039 : }));
712 : }
713 153201543 : }
714 :
715 : //
716 : // Section 3: Mapping functions
717 : //
718 :
719 : /// Search for a page in the cache using the given search key.
720 : ///
721 : /// Returns the slot index, if any. If the search criteria is not exact,
722 : /// *cache_key is updated with the actual key of the found page.
723 : ///
724 : /// NOTE: We don't hold any lock on the mapping on return, so the slot might
725 : /// get recycled for an unrelated page immediately after this function
726 : /// returns. The caller is responsible for re-checking that the slot still
727 : /// contains the page with the same key before using it.
728 : ///
729 159141407 : fn search_mapping(&self, cache_key: &mut CacheKey) -> Option<usize> {
730 159141407 : match cache_key {
731 5939234 : CacheKey::MaterializedPage { hash_key, lsn } => {
732 5939234 : let map = self.materialized_page_map.read().unwrap();
733 5939234 : let versions = map.get(hash_key)?;
734 :
735 1518724 : let version_idx = match versions.binary_search_by_key(lsn, |v| v.lsn) {
736 76548 : Ok(version_idx) => version_idx,
737 167 : Err(0) => return None,
738 1265904 : Err(version_idx) => version_idx - 1,
739 : };
740 1342452 : let version = &versions[version_idx];
741 1342452 : *lsn = version.lsn;
742 1342452 : Some(version.slot_idx)
743 : }
744 153202173 : CacheKey::ImmutableFilePage { file_id, blkno } => {
745 153202173 : let map = self.immutable_page_map.read().unwrap();
746 153202173 : Some(*map.get(&(*file_id, *blkno))?)
747 : }
748 : }
749 159141407 : }
750 :
751 : /// Search for a page in the cache using the given search key.
752 : ///
753 : /// Like 'search_mapping', but performs an "exact" search. Used for
754 : /// allocating a new buffer.
755 2056879 : fn search_mapping_exact(&self, key: &CacheKey) -> Option<usize> {
756 2056879 : match key {
757 2056879 : CacheKey::MaterializedPage { hash_key, lsn } => {
758 2056879 : let map = self.materialized_page_map.read().unwrap();
759 2056879 : let versions = map.get(hash_key)?;
760 :
761 474290 : if let Ok(version_idx) = versions.binary_search_by_key(lsn, |v| v.lsn) {
762 41 : Some(versions[version_idx].slot_idx)
763 : } else {
764 359377 : None
765 : }
766 : }
767 UBC 0 : CacheKey::ImmutableFilePage { file_id, blkno } => {
768 0 : let map = self.immutable_page_map.read().unwrap();
769 0 : Some(*map.get(&(*file_id, *blkno))?)
770 : }
771 : }
772 CBC 2056879 : }
773 :
774 : ///
775 : /// Remove mapping for given key.
776 : ///
777 7295740 : fn remove_mapping(&self, old_key: &CacheKey) {
778 7295740 : match old_key {
779 : CacheKey::MaterializedPage {
780 1904635 : hash_key: old_hash_key,
781 1904635 : lsn: old_lsn,
782 1904635 : } => {
783 1904635 : let mut map = self.materialized_page_map.write().unwrap();
784 1904635 : if let Entry::Occupied(mut old_entry) = map.entry(old_hash_key.clone()) {
785 1904635 : let versions = old_entry.get_mut();
786 :
787 2295549 : if let Ok(version_idx) = versions.binary_search_by_key(old_lsn, |v| v.lsn) {
788 1904635 : versions.remove(version_idx);
789 1904635 : self.size_metrics
790 1904635 : .current_bytes_materialized_page
791 1904635 : .sub_page_sz(1);
792 1904635 : if versions.is_empty() {
793 1558559 : old_entry.remove_entry();
794 1558559 : }
795 UBC 0 : }
796 : } else {
797 0 : panic!("could not find old key in mapping")
798 : }
799 : }
800 CBC 5391105 : CacheKey::ImmutableFilePage { file_id, blkno } => {
801 5391105 : let mut map = self.immutable_page_map.write().unwrap();
802 5391105 : map.remove(&(*file_id, *blkno))
803 5391105 : .expect("could not find old key in mapping");
804 5391105 : self.size_metrics.current_bytes_immutable.sub_page_sz(1);
805 5391105 : }
806 : }
807 7295740 : }
808 :
809 : ///
810 : /// Insert mapping for given key.
811 : ///
812 : /// If a mapping already existed for the given key, returns the slot index
813 : /// of the existing mapping and leaves it untouched.
814 9888454 : fn try_insert_mapping(&self, new_key: &CacheKey, slot_idx: usize) -> Option<usize> {
815 9888454 : match new_key {
816 : CacheKey::MaterializedPage {
817 2056838 : hash_key: new_key,
818 2056838 : lsn: new_lsn,
819 2056838 : } => {
820 2056838 : let mut map = self.materialized_page_map.write().unwrap();
821 2056838 : let versions = map.entry(new_key.clone()).or_default();
822 2056838 : match versions.binary_search_by_key(new_lsn, |v| v.lsn) {
823 UBC 0 : Ok(version_idx) => Some(versions[version_idx].slot_idx),
824 CBC 2056838 : Err(version_idx) => {
825 2056838 : versions.insert(
826 2056838 : version_idx,
827 2056838 : Version {
828 2056838 : lsn: *new_lsn,
829 2056838 : slot_idx,
830 2056838 : },
831 2056838 : );
832 2056838 : self.size_metrics
833 2056838 : .current_bytes_materialized_page
834 2056838 : .add_page_sz(1);
835 2056838 : None
836 : }
837 : }
838 : }
839 :
840 7831616 : CacheKey::ImmutableFilePage { file_id, blkno } => {
841 7831616 : let mut map = self.immutable_page_map.write().unwrap();
842 7831616 : match map.entry((*file_id, *blkno)) {
843 575 : Entry::Occupied(entry) => Some(*entry.get()),
844 7831041 : Entry::Vacant(entry) => {
845 7831041 : entry.insert(slot_idx);
846 7831041 : self.size_metrics.current_bytes_immutable.add_page_sz(1);
847 7831041 : None
848 : }
849 : }
850 : }
851 : }
852 9888454 : }
853 :
854 : //
855 : // Section 4: Misc internal helpers
856 : //
857 :
858 : /// Find a slot to evict.
859 : ///
860 : /// On return, the slot is empty and write-locked.
861 9888454 : async fn find_victim(
862 9888454 : &self,
863 9888454 : _permit_witness: &PinnedSlotsPermit,
864 9888454 : ) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
865 9888452 : let iter_limit = self.slots.len() * 10;
866 9888452 : let mut iters = 0;
867 32163020 : loop {
868 32163020 : iters += 1;
869 32163020 : let slot_idx = self.next_evict_slot.fetch_add(1, Ordering::Relaxed) % self.slots.len();
870 32163020 :
871 32163020 : let slot = &self.slots[slot_idx];
872 32163020 :
873 32163020 : if slot.dec_usage_count() == 0 {
874 9888455 : let mut inner = match slot.inner.try_write() {
875 9888452 : Ok(inner) => inner,
876 3 : Err(_err) => {
877 3 : if iters > iter_limit {
878 : // NB: Even with the permits, there's no hard guarantee that we will find a slot with
879 : // any particular number of iterations: other threads might race ahead and acquire and
880 : // release pins just as we're scanning the array.
881 : //
882 : // Imagine that nslots is 2, and as starting point, usage_count==1 on all
883 : // slots. There are two threads running concurrently, A and B. A has just
884 : // acquired the permit from the semaphore.
885 : //
886 : // A: Look at slot 1. Its usage_count == 1, so decrement it to zero, and continue the search
887 : // B: Acquire permit.
888 : // B: Look at slot 2, decrement its usage_count to zero and continue the search
889 : // B: Look at slot 1. Its usage_count is zero, so pin it and bump up its usage_count to 1.
890 : // B: Release pin and permit again
891 : // B: Acquire permit.
892 : // B: Look at slot 2. Its usage_count is zero, so pin it and bump up its usage_count to 1.
893 : // B: Release pin and permit again
894 : //
895 : // Now we're back in the starting situation that both slots have
896 : // usage_count 1, but A has now been through one iteration of the
897 : // find_victim() loop. This can repeat indefinitely and on each
898 : // iteration, A's iteration count increases by one.
899 : //
900 : // So, even though the semaphore for the permits is fair, the victim search
901 : // itself happens in parallel and is not fair.
902 : // Hence even with a permit, a task can theoretically be starved.
903 : // To avoid this, we'd need tokio to give priority to tasks that are holding
904 : // permits for longer.
905 : // Note that just yielding to tokio during iteration without such
906 : // priority boosting is likely counter-productive. We'd just give more opportunities
907 : // for B to bump usage count, further starving A.
908 UBC 0 : page_cache_eviction_metrics::observe(
909 0 : page_cache_eviction_metrics::Outcome::ItersExceeded {
910 0 : iters: iters.try_into().unwrap(),
911 0 : },
912 0 : );
913 0 : anyhow::bail!("exceeded evict iter limit");
914 CBC 3 : }
915 3 : continue;
916 : }
917 : };
918 9888452 : if let Some(old_key) = &inner.key {
919 7295738 : // remove mapping for old buffer
920 7295738 : self.remove_mapping(old_key);
921 7295738 : inner.key = None;
922 7295738 : page_cache_eviction_metrics::observe(
923 7295738 : page_cache_eviction_metrics::Outcome::FoundSlotEvicted {
924 7295738 : iters: iters.try_into().unwrap(),
925 7295738 : },
926 7295738 : );
927 7295738 : } else {
928 2592714 : page_cache_eviction_metrics::observe(
929 2592714 : page_cache_eviction_metrics::Outcome::FoundSlotUnused {
930 2592714 : iters: iters.try_into().unwrap(),
931 2592714 : },
932 2592714 : );
933 2592714 : }
934 9888452 : return Ok((slot_idx, inner));
935 22274565 : }
936 : }
937 9888452 : }
938 :
939 : /// Initialize a new page cache
940 : ///
941 : /// This should be called only once at page server startup.
942 594 : fn new(num_pages: usize) -> Self {
943 594 : assert!(num_pages > 0, "page cache size must be > 0");
944 :
945 : // We could use Vec::leak here, but that potentially also leaks
946 : // uninitialized reserved capacity. With into_boxed_slice and Box::leak
947 : // this is avoided.
948 594 : let page_buffer = Box::leak(vec![0u8; num_pages * PAGE_SZ].into_boxed_slice());
949 594 :
950 594 : let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;
951 594 : size_metrics.max_bytes.set_page_sz(num_pages);
952 594 : size_metrics.current_bytes_immutable.set_page_sz(0);
953 594 : size_metrics.current_bytes_materialized_page.set_page_sz(0);
954 594 :
955 594 : let slots = page_buffer
956 594 : .chunks_exact_mut(PAGE_SZ)
957 4541550 : .map(|chunk| {
958 4541550 : let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap();
959 4541550 :
960 4541550 : Slot {
961 4541550 : inner: tokio::sync::RwLock::new(SlotInner {
962 4541550 : key: None,
963 4541550 : buf,
964 4541550 : permit: std::sync::Mutex::new(Weak::new()),
965 4541550 : }),
966 4541550 : usage_count: AtomicU8::new(0),
967 4541550 : }
968 4541550 : })
969 594 : .collect();
970 594 :
971 594 : Self {
972 594 : materialized_page_map: Default::default(),
973 594 : immutable_page_map: Default::default(),
974 594 : slots,
975 594 : next_evict_slot: AtomicUsize::new(0),
976 594 : size_metrics,
977 594 : pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)),
978 594 : }
979 594 : }
980 : }
981 :
982 : trait PageSzBytesMetric {
983 : fn set_page_sz(&self, count: usize);
984 : fn add_page_sz(&self, count: usize);
985 : fn sub_page_sz(&self, count: usize);
986 : }
987 :
988 : #[inline(always)]
989 17185392 : fn count_times_page_sz(count: usize) -> u64 {
990 17185392 : u64::try_from(count).unwrap() * u64::try_from(PAGE_SZ).unwrap()
991 17185392 : }
992 :
993 : impl PageSzBytesMetric for metrics::UIntGauge {
994 1782 : fn set_page_sz(&self, count: usize) {
995 1782 : self.set(count_times_page_sz(count));
996 1782 : }
997 9887874 : fn add_page_sz(&self, count: usize) {
998 9887874 : self.add(count_times_page_sz(count));
999 9887874 : }
1000 7295736 : fn sub_page_sz(&self, count: usize) {
1001 7295736 : self.sub(count_times_page_sz(count));
1002 7295736 : }
1003 : }
|