Line data Source code
1 : //!
2 : //! Global page cache
3 : //!
4 : //! The page cache uses up most of the memory in the page server. It is shared
5 : //! by all tenants, and it is used to store different kinds of pages. Sharing
6 : //! the cache allows memory to be dynamically allocated where it's needed the
7 : //! most.
8 : //!
9 : //! The page cache consists of fixed-size buffers, 8 kB each to match the
10 : //! PostgreSQL buffer size, and a Slot struct for each buffer to contain
11 : //! information about what's stored in the buffer.
12 : //!
13 : //! # Types Of Pages
14 : //!
15 : //! [`PageCache`] only supports immutable pages.
16 : //! Hence there is no need to worry about coherency.
17 : //!
18 : //! Two types of pages are supported:
19 : //!
20 : //! * **Materialized pages**, filled & used by page reconstruction
21 : //! * **Immutable File pages**, filled & used by [`crate::tenant::block_io`] and [`crate::tenant::ephemeral_file`].
22 : //!
23 : //! Note that [`crate::tenant::ephemeral_file::EphemeralFile`] is generally mutable, but it's append-only.
24 : //! It uses the page cache only for the blocks that are already fully written and immutable.
25 : //!
26 : //! # Filling The Page Cache
27 : //!
28 : //! The page cache maps a cache key to a buffer slot.
29 : //! The cache key uniquely identifies the piece of data that is being cached.
30 : //!
31 : //! The cache key for **materialized pages** is [`TenantId`], [`TimelineId`], [`Key`], and [`Lsn`].
32 : //! Use [`PageCache::memorize_materialized_page`] and [`PageCache::lookup_materialized_page`] for fill & access.
33 : //!
34 : //! The cache key for **immutable file** pages is [`FileId`] and a block number.
35 : //! Users of the page cache that wish to page-cache an arbitrary (immutable!) on-disk file do the following (see the sketch after this list):
36 : //! * Have a mechanism to deterministically associate the on-disk file with a [`FileId`].
37 : //! * Get a [`FileId`] using [`next_file_id`].
38 : //! * Use the mechanism to associate the on-disk file with the returned [`FileId`].
39 : //! * Use [`PageCache::read_immutable_buf`] to get a [`ReadBufResult`].
40 : //! * If the page was already cached, it'll be the [`ReadBufResult::Found`] variant that contains
41 : //! a read guard for the page. Just use it.
42 : //! * If the page was not cached, it'll be the [`ReadBufResult::NotFound`] variant that contains
43 : //! a write guard for the page. Fill the page with the contents of the on-disk file.
44 : //! Then call [`PageWriteGuard::mark_valid`] to mark the page as valid.
45 : //! Then try again to [`PageCache::read_immutable_buf`].
46 : //! Unless there's high cache pressure, the page should now be cached.
47 : //! (TODO: allow downgrading the write guard to a read guard to ensure forward progress.)
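//!
//! For illustration, here is a minimal sketch of that access pattern; `blkno`, `read_block_from_disk`,
//! and the error handling are hypothetical, only the page cache calls are real:
//!
//! ```ignore
//! let file_id = page_cache::next_file_id();
//! // ... record the association between `file_id` and the on-disk file ...
//! let cache = page_cache::get();
//! let read_guard = loop {
//!     match cache.read_immutable_buf(file_id, blkno).await? {
//!         ReadBufResult::Found(guard) => break guard,
//!         ReadBufResult::NotFound(mut write_guard) => {
//!             // Fill the buffer from the on-disk file, publish it, then retry the lookup.
//!             read_block_from_disk(blkno, &mut *write_guard)?;
//!             write_guard.mark_valid();
//!         }
//!     }
//! };
//! // `read_guard` now derefs to the cached 8 kB page.
//! ```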
48 : //!
49 : //! # Locking
50 : //!
51 : //! There are two levels of locking involved: There's one lock for the "mapping"
52 : //! from page identifier (tenant ID, timeline ID, key, LSN) to the buffer
53 : //! slot, and a separate lock on each slot. To read or write the contents of a
54 : //! slot, you must hold the lock on the slot in read or write mode,
55 : //! respectively. To change the mapping of a slot, i.e. to evict a page or to
56 : //! assign a buffer for a page, you must hold the mapping lock and the lock on
57 : //! the slot at the same time.
58 : //!
59 : //! Whenever you need to hold both locks simultaneously, the slot lock must be
60 : //! acquired first. This consistent ordering avoids deadlocks. To look up a page
61 : //! in the cache, you would first look up the mapping, while holding the mapping
62 : //! lock, and then lock the slot. You must release the mapping lock in between,
63 : //! to obey the lock ordering and avoid deadlock.
64 : //!
65 : //! A slot can momentarily have invalid contents, even if it's already been
66 : //! inserted into the mapping, but you must hold the write-lock on the slot until
67 : //! the contents are valid. If you need to release the lock without initializing
68 : //! the contents, you must remove the mapping first. We make that easy for the
69 : //! callers with PageWriteGuard: when lock_for_write() returns an uninitialized
70 : //! page, the caller must explicitly call guard.mark_valid() after it has
71 : //! initialized it. If the guard is dropped without calling mark_valid(), the
72 : //! mapping is automatically removed and the slot is marked free.
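//!
//! The following pseudocode sketch summarizes the two paths; `mapping`, `slots`, `key`,
//! `old_key` and `victim_idx` are illustrative names, not the actual fields:
//!
//! ```ignore
//! // Lookup: never holds both locks at the same time.
//! let slot_idx = {
//!     let map = mapping.read().unwrap();   // mapping lock
//!     *map.get(&key)?                      // find the slot index
//! };                                       // mapping lock released here
//! let inner = slots[slot_idx].inner.read().await; // slot lock, taken afterwards
//! if inner.key.as_ref() != Some(&key) {
//!     // Evicted between the two steps; treat it as a cache miss.
//! }
//!
//! // Eviction + insertion: holds both locks, slot lock first.
//! let mut inner = slots[victim_idx].inner.try_write()?; // slot lock
//! {
//!     let mut map = mapping.write().unwrap(); // mapping lock, taken second
//!     map.remove(&old_key);                   // evict the old page
//!     map.insert(key.clone(), victim_idx);    // map the new page to this slot
//! }
//! // Keep the slot write-locked until the buffer is filled and mark_valid() is
//! // called; dropping the guard earlier removes the mapping again.
//! ```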
73 : //!
74 :
75 : use std::{
76 : collections::{hash_map::Entry, HashMap},
77 : convert::TryInto,
78 : sync::atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
79 : };
80 :
81 : use anyhow::Context;
82 : use once_cell::sync::OnceCell;
83 : use utils::{
84 : id::{TenantId, TimelineId},
85 : lsn::Lsn,
86 : };
87 :
88 : use crate::{metrics::PageCacheSizeMetrics, repository::Key};
89 :
90 : static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
91 : const TEST_PAGE_CACHE_SIZE: usize = 50;
92 :
93 : ///
94 : /// Initialize the page cache. This must be called once at page server startup.
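///
/// A minimal usage sketch; the slot count below is a made-up example value:
///
/// ```ignore
/// // Once, at pageserver startup:
/// page_cache::init(16384); // number of 8 kB slots
///
/// // Anywhere afterwards:
/// let cache = page_cache::get();
/// ```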
95 : ///
96 575 : pub fn init(size: usize) {
97 575 : if PAGE_CACHE.set(PageCache::new(size)).is_err() {
98 0 : panic!("page cache already initialized");
99 575 : }
100 575 : }
101 :
102 : ///
103 : /// Get a handle to the page cache.
104 : ///
105 399446033 : pub fn get() -> &'static PageCache {
106 399446033 : //
107 399446033 : // In unit tests, page server startup doesn't happen and no one calls
108 399446033 : // page_cache::init(). Initialize it here with a tiny cache, so that the
109 399446033 : // page cache is usable in unit tests.
110 399446033 : //
111 399446033 : if cfg!(test) {
112 1651689 : PAGE_CACHE.get_or_init(|| PageCache::new(TEST_PAGE_CACHE_SIZE))
113 : } else {
114 397794344 : PAGE_CACHE.get().expect("page cache not initialized")
115 : }
116 399446033 : }
117 :
118 : pub const PAGE_SZ: usize = postgres_ffi::BLCKSZ as usize;
119 : const MAX_USAGE_COUNT: u8 = 5;
120 :
121 : /// See module-level comment.
122 784690923 : #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
123 : pub struct FileId(u64);
124 :
125 : static NEXT_ID: AtomicU64 = AtomicU64::new(1);
126 :
127 : /// See module-level comment.
128 19009 : pub fn next_file_id() -> FileId {
129 19009 : FileId(NEXT_ID.fetch_add(1, Ordering::Relaxed))
130 19009 : }
131 :
132 : ///
133 : /// CacheKey uniquely identifies a "thing" to cache in the page cache.
134 : ///
135 417416954 : #[derive(Debug, PartialEq, Eq, Clone)]
136 : #[allow(clippy::enum_variant_names)]
137 : enum CacheKey {
138 : MaterializedPage {
139 : hash_key: MaterializedPageHashKey,
140 : lsn: Lsn,
141 : },
142 : ImmutableFilePage {
143 : file_id: FileId,
144 : blkno: u32,
145 : },
146 : }
147 :
148 15268817 : #[derive(Debug, PartialEq, Eq, Hash, Clone)]
149 : struct MaterializedPageHashKey {
150 : tenant_id: TenantId,
151 : timeline_id: TimelineId,
152 : key: Key,
153 : }
154 :
155 0 : #[derive(Clone)]
156 : struct Version {
157 : lsn: Lsn,
158 : slot_idx: usize,
159 : }
160 :
161 : struct Slot {
162 : inner: tokio::sync::RwLock<SlotInner>,
163 : usage_count: AtomicU8,
164 : }
165 :
166 : struct SlotInner {
167 : key: Option<CacheKey>,
168 : buf: &'static mut [u8; PAGE_SZ],
169 : }
170 :
171 : impl Slot {
172 : /// Increment usage count on the buffer, with ceiling at MAX_USAGE_COUNT.
173 387218430 : fn inc_usage_count(&self) {
174 387218430 : let _ = self
175 387218430 : .usage_count
176 387219677 : .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
177 387219677 : if val == MAX_USAGE_COUNT {
178 345664814 : None
179 : } else {
180 41554863 : Some(val + 1)
181 : }
182 387219677 : });
183 387218430 : }
184 :
185 : /// Decrement usage count on the buffer, unless it's already zero. Returns
186 : /// the old usage count.
187 61375494 : fn dec_usage_count(&self) -> u8 {
188 61375494 : let count_res =
189 61375494 : self.usage_count
190 61375521 : .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
191 61375521 : if val == 0 {
192 13717901 : None
193 : } else {
194 47657620 : Some(val - 1)
195 : }
196 61375521 : });
197 61375494 :
198 61375494 : match count_res {
199 47657600 : Ok(usage_count) => usage_count,
200 13717894 : Err(usage_count) => usage_count,
201 : }
202 61375494 : }
203 :
204 : /// Sets the usage count to a specific value.
205 13717435 : fn set_usage_count(&self, count: u8) {
206 13717435 : self.usage_count.store(count, Ordering::Relaxed);
207 13717435 : }
208 : }
209 :
210 : pub struct PageCache {
211 : /// This contains the mapping from the cache key to buffer slot that currently
212 : /// contains the page, if any.
213 : ///
214 : /// TODO: This is protected by a single lock. If that becomes a bottleneck,
215 : /// this HashMap can be replaced with a more concurrent version; there are
216 : /// plenty of such crates around.
217 : ///
218 : /// If you add support for caching different kinds of objects, each object kind
219 : /// can have a separate mapping map, next to this field.
220 : materialized_page_map: std::sync::RwLock<HashMap<MaterializedPageHashKey, Vec<Version>>>,
221 :
222 : immutable_page_map: std::sync::RwLock<HashMap<(FileId, u32), usize>>,
223 :
224 : /// The actual buffers with their metadata.
225 : slots: Box<[Slot]>,
226 :
227 : /// Index of the next candidate to evict, for the Clock replacement algorithm.
228 : /// This is interpreted modulo the page cache size.
229 : next_evict_slot: AtomicUsize,
230 :
231 : size_metrics: &'static PageCacheSizeMetrics,
232 : }
233 :
234 : ///
235 : /// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked
236 : /// until the guard is dropped.
237 : ///
238 : pub struct PageReadGuard<'i>(tokio::sync::RwLockReadGuard<'i, SlotInner>);
239 :
240 : impl std::ops::Deref for PageReadGuard<'_> {
241 : type Target = [u8; PAGE_SZ];
242 :
243 786427816 : fn deref(&self) -> &Self::Target {
244 786427816 : self.0.buf
245 786427816 : }
246 : }
247 :
248 : impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
249 0 : fn as_ref(&self) -> &[u8; PAGE_SZ] {
250 0 : self.0.buf
251 0 : }
252 : }
253 :
254 : ///
255 : /// PageWriteGuard is a lease on a buffer for modifying it. The page is kept locked
256 : /// until the guard is dropped.
257 : ///
258 : /// Counterintuitively, this is used even for a read, if the requested page is not
259 : /// currently found in the page cache. In that case, the caller of lock_for_read()
260 : /// is expected to fill in the page contents and call mark_valid(). Similarly
261 : /// lock_for_write() can return an invalid buffer that the caller is expected
262 : /// to initialize.
263 : ///
264 : pub struct PageWriteGuard<'i> {
265 : inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
266 :
267 : // Are the page contents currently valid?
268 : // Used to mark pages as invalid that are assigned but not yet filled with data.
269 : valid: bool,
270 : }
271 :
272 : impl std::ops::DerefMut for PageWriteGuard<'_> {
273 13717435 : fn deref_mut(&mut self) -> &mut Self::Target {
274 13717435 : self.inner.buf
275 13717435 : }
276 : }
277 :
278 : impl std::ops::Deref for PageWriteGuard<'_> {
279 : type Target = [u8; PAGE_SZ];
280 :
281 3721 : fn deref(&self) -> &Self::Target {
282 3721 : self.inner.buf
283 3721 : }
284 : }
285 :
286 : impl AsMut<[u8; PAGE_SZ]> for PageWriteGuard<'_> {
287 0 : fn as_mut(&mut self) -> &mut [u8; PAGE_SZ] {
288 0 : self.inner.buf
289 0 : }
290 : }
291 :
292 : impl PageWriteGuard<'_> {
293 : /// Mark that the buffer contents are now valid.
294 13717423 : pub fn mark_valid(&mut self) {
295 13717423 : assert!(self.inner.key.is_some());
296 13717423 : assert!(
297 13717423 : !self.valid,
298 0 : "mark_valid called on a buffer that was already valid"
299 : );
300 13717423 : self.valid = true;
301 13717423 : }
302 : }
303 :
304 : impl Drop for PageWriteGuard<'_> {
305 : ///
306 : /// If the buffer was allocated for a page that was not already in the
307 : /// cache, but the lock_for_read/write() caller dropped the buffer without
308 : /// initializing it, remove the mapping from the page cache.
309 : ///
310 13721145 : fn drop(&mut self) {
311 13721145 : assert!(self.inner.key.is_some());
312 13721145 : if !self.valid {
313 11 : let self_key = self.inner.key.as_ref().unwrap();
314 11 : PAGE_CACHE.get().unwrap().remove_mapping(self_key);
315 11 : self.inner.key = None;
316 13721134 : }
317 13721145 : }
318 : }
319 :
320 : /// lock_for_read() return value
321 : pub enum ReadBufResult<'a> {
322 : Found(PageReadGuard<'a>),
323 : NotFound(PageWriteGuard<'a>),
324 : }
325 :
326 : /// lock_for_write() return value
327 : pub enum WriteBufResult<'a> {
328 : Found(PageWriteGuard<'a>),
329 : NotFound(PageWriteGuard<'a>),
330 : }
331 :
332 : impl PageCache {
333 : //
334 : // Section 1.1: Public interface functions for looking up and memorizing materialized page
335 : // versions in the page cache
336 : //
337 :
338 : /// Look up a materialized page version.
339 : ///
340 : /// The 'lsn' is an upper bound: this will return the latest version of
341 : /// the given block, but not newer than 'lsn'. Returns the actual LSN of the
342 : /// returned page.
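///
/// A sketch of the memorize/lookup pair; `tenant_id`, `timeline_id`, `key`, `img`
/// and the LSN values are placeholders:
///
/// ```ignore
/// let cache = page_cache::get();
/// // Remember the page image that was reconstructed at LSN 0x10.
/// cache
///     .memorize_materialized_page(tenant_id, timeline_id, key, Lsn(0x10), &img)
///     .await?;
/// // A later lookup with upper bound 0x20 returns the 0x10 version, together
/// // with its actual LSN (assuming the entry has not been evicted meanwhile).
/// if let Some((found_lsn, _guard)) = cache
///     .lookup_materialized_page(tenant_id, timeline_id, &key, Lsn(0x20))
///     .await
/// {
///     assert_eq!(found_lsn, Lsn(0x10));
/// }
/// ```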
343 7262299 : pub async fn lookup_materialized_page(
344 7262299 : &self,
345 7262299 : tenant_id: TenantId,
346 7262299 : timeline_id: TimelineId,
347 7262299 : key: &Key,
348 7262299 : lsn: Lsn,
349 7262300 : ) -> Option<(Lsn, PageReadGuard)> {
350 7262300 : crate::metrics::PAGE_CACHE
351 7262300 : .read_accesses_materialized_page
352 7262300 : .inc();
353 7262300 :
354 7262300 : let mut cache_key = CacheKey::MaterializedPage {
355 7262300 : hash_key: MaterializedPageHashKey {
356 7262300 : tenant_id,
357 7262300 : timeline_id,
358 7262300 : key: *key,
359 7262300 : },
360 7262300 : lsn,
361 7262300 : };
362 :
363 7262300 : if let Some(guard) = self.try_lock_for_read(&mut cache_key).await {
364 : if let CacheKey::MaterializedPage {
365 : hash_key: _,
366 1740445 : lsn: available_lsn,
367 1740445 : } = cache_key
368 : {
369 1740445 : if available_lsn == lsn {
370 40 : crate::metrics::PAGE_CACHE
371 40 : .read_hits_materialized_page_exact
372 40 : .inc();
373 1740405 : } else {
374 1740405 : crate::metrics::PAGE_CACHE
375 1740405 : .read_hits_materialized_page_older_lsn
376 1740405 : .inc();
377 1740405 : }
378 1740445 : Some((available_lsn, guard))
379 : } else {
380 0 : panic!("unexpected key type in slot");
381 : }
382 : } else {
383 5521855 : None
384 : }
385 7262300 : }
386 :
387 : ///
388 : /// Store an image of the given page in the cache.
389 : ///
390 2758635 : pub async fn memorize_materialized_page(
391 2758635 : &self,
392 2758635 : tenant_id: TenantId,
393 2758635 : timeline_id: TimelineId,
394 2758635 : key: Key,
395 2758635 : lsn: Lsn,
396 2758635 : img: &[u8],
397 2758635 : ) -> anyhow::Result<()> {
398 2758635 : let cache_key = CacheKey::MaterializedPage {
399 2758635 : hash_key: MaterializedPageHashKey {
400 2758635 : tenant_id,
401 2758635 : timeline_id,
402 2758635 : key,
403 2758635 : },
404 2758635 : lsn,
405 2758635 : };
406 2758635 :
407 2758635 : match self.lock_for_write(&cache_key).await? {
408 3721 : WriteBufResult::Found(write_guard) => {
409 3721 : // We already had it in cache. Another thread must've put it there
410 3721 : // concurrently. Check that it had the same contents that we
411 3721 : // replayed.
412 3721 : assert!(*write_guard == img);
413 : }
414 2754914 : WriteBufResult::NotFound(mut write_guard) => {
415 2754914 : write_guard.copy_from_slice(img);
416 2754914 : write_guard.mark_valid();
417 2754914 : }
418 : }
419 :
420 2758635 : Ok(())
421 2758635 : }
422 :
423 : // Section 1.2: Public interface functions for working with immutable file pages.
424 :
425 396436785 : pub async fn read_immutable_buf(
426 396436785 : &self,
427 396436785 : file_id: FileId,
428 396436785 : blkno: u32,
429 396437058 : ) -> anyhow::Result<ReadBufResult> {
430 396437058 : let mut cache_key = CacheKey::ImmutableFilePage { file_id, blkno };
431 396437058 :
432 396437058 : self.lock_for_read(&mut cache_key).await
433 396437055 : }
434 :
435 : //
436 : // Section 2: Internal interface functions for lookup/update.
437 : //
438 : // To add support for a new kind of "thing" to cache, you will need
439 : // to add public interface routines above, and code to deal with the
440 : // "mappings" after this section. But the routines in this section should
441 : // not require changes.
442 :
443 : /// Look up a page in the cache.
444 : ///
445 : /// If the search criteria is not exact, *cache_key is updated with the exact
446 : /// key of the returned page. (For materialized pages, that means
447 : /// that the LSN in 'cache_key' is updated with the LSN of the returned page
448 : /// version.)
449 : ///
450 : /// If no page is found, returns None and *cache_key is left unmodified.
451 : ///
452 403699793 : async fn try_lock_for_read(&self, cache_key: &mut CacheKey) -> Option<PageReadGuard> {
453 403699793 : let cache_key_orig = cache_key.clone();
454 403699793 : if let Some(slot_idx) = self.search_mapping(cache_key) {
455 : // The page was found in the mapping. Lock the slot, and re-check
456 : // that it's still what we expected (because we released the mapping
457 : // lock already, another thread could have evicted the page)
458 387214987 : let slot = &self.slots[slot_idx];
459 387214987 : let inner = slot.inner.read().await;
460 387214984 : if inner.key.as_ref() == Some(cache_key) {
461 387214972 : slot.inc_usage_count();
462 387214972 : return Some(PageReadGuard(inner));
463 12 : } else {
464 12 : // search_mapping might have modified the search key; restore it.
465 12 : *cache_key = cache_key_orig;
466 12 : }
467 16484806 : }
468 16484818 : None
469 403699790 : }
470 :
471 : /// Return a locked buffer for given block.
472 : ///
473 : /// Like try_lock_for_read(), if the search criteria is not exact and the
474 : /// page is already found in the cache, *cache_key is updated.
475 : ///
476 : /// If the page is not found in the cache, this allocates a new buffer for
477 : /// it. The caller may then initialize the buffer with the contents, and
478 : /// call mark_valid().
479 : ///
480 : /// Example usage:
481 : ///
482 : /// ```ignore
483 : /// let cache = page_cache::get();
484 : ///
485 : /// match cache.lock_for_read(&mut key).await? {
486 : /// ReadBufResult::Found(read_guard) => {
487 : /// // The page was found in cache. Use it
488 : /// },
489 : /// ReadBufResult::NotFound(mut write_guard) => {
490 : /// // The page was not found in cache. Read it from disk into the
491 : /// // buffer.
492 : /// //read_my_page_from_disk(write_guard);
493 : ///
494 : /// // The buffer contents are now valid. Tell the page cache.
495 : /// write_guard.mark_valid();
496 : /// },
497 : /// }
498 : /// ```
499 : ///
500 396437058 : async fn lock_for_read(&self, cache_key: &mut CacheKey) -> anyhow::Result<ReadBufResult> {
501 396437058 : let (read_access, hit) = match cache_key {
502 : CacheKey::MaterializedPage { .. } => {
503 0 : unreachable!("Materialized pages use lookup_materialized_page")
504 : }
505 396437058 : CacheKey::ImmutableFilePage { .. } => (
506 396437058 : &crate::metrics::PAGE_CACHE.read_accesses_immutable,
507 396437058 : &crate::metrics::PAGE_CACHE.read_hits_immutable,
508 396437058 : ),
509 396437058 : };
510 396437058 : read_access.inc();
511 396437058 :
512 396437058 : let mut is_first_iteration = true;
513 : loop {
514 : // First check if the key already exists in the cache.
515 396437493 : if let Some(read_guard) = self.try_lock_for_read(cache_key).await {
516 385474527 : if is_first_iteration {
517 385474092 : hit.inc();
518 385474092 : }
519 385474527 : return Ok(ReadBufResult::Found(read_guard));
520 10962963 : }
521 10962963 : is_first_iteration = false;
522 :
523 : // Not found. Find a victim buffer
524 10962963 : let (slot_idx, mut inner) =
525 10962963 : self.find_victim().context("Failed to find evict victim")?;
526 :
527 : // Insert mapping for this. At this point, we may find that another
528 : // thread did the same thing concurrently. In that case, we evicted
529 : // our victim buffer unnecessarily. Put it into the free list and
530 : // continue with the slot that the other thread chose.
531 10962963 : if let Some(_existing_slot_idx) = self.try_insert_mapping(cache_key, slot_idx) {
532 : // TODO: put to free list
533 :
534 : // We now just loop back to start from beginning. This is not
535 : // optimal, we'll perform the lookup in the mapping again, which
536 : // is not really necessary because we already got
537 : // 'existing_slot_idx'. But this shouldn't happen often enough
538 : // to matter much.
539 435 : continue;
540 10962528 : }
541 10962528 :
542 10962528 : // Make the slot ready
543 10962528 : let slot = &self.slots[slot_idx];
544 10962528 : inner.key = Some(cache_key.clone());
545 10962528 : slot.set_usage_count(1);
546 10962528 :
547 10962528 : return Ok(ReadBufResult::NotFound(PageWriteGuard {
548 10962528 : inner,
549 10962528 : valid: false,
550 10962528 : }));
551 : }
552 396437055 : }
553 :
554 : /// Look up a page in the cache and lock it in write mode. If it's not
555 : /// found, returns None.
556 : ///
557 : /// When locking a page for writing, the search criteria is always "exact".
558 2758653 : async fn try_lock_for_write(&self, cache_key: &CacheKey) -> Option<PageWriteGuard> {
559 2758653 : if let Some(slot_idx) = self.search_mapping_for_write(cache_key) {
560 : // The page was found in the mapping. Lock the slot, and re-check
561 : // that it's still what we expected (because we released the mapping
562 : // lock already, another thread could have evicted the page)
563 3721 : let slot = &self.slots[slot_idx];
564 3721 : let inner = slot.inner.write().await;
565 3721 : if inner.key.as_ref() == Some(cache_key) {
566 3721 : slot.inc_usage_count();
567 3721 : return Some(PageWriteGuard { inner, valid: true });
568 0 : }
569 2754932 : }
570 2754932 : None
571 2758653 : }
572 :
573 : /// Return a write-locked buffer for given block.
574 : ///
575 : /// Similar to lock_for_read(), but the returned buffer is write-locked and
576 : /// may be modified by the caller even if it's already found in the cache.
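///
/// A sketch mirroring how memorize_materialized_page() uses this; `key` is a
/// placeholder and `img` is assumed to be a full 8 kB page image:
///
/// ```ignore
/// match self.lock_for_write(&key).await? {
///     WriteBufResult::Found(write_guard) => {
///         // Already cached; e.g. verify it matches what we were about to write.
///         assert!(*write_guard == img);
///     }
///     WriteBufResult::NotFound(mut write_guard) => {
///         write_guard.copy_from_slice(&img);
///         write_guard.mark_valid();
///     }
/// }
/// ```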
577 2758635 : async fn lock_for_write(&self, cache_key: &CacheKey) -> anyhow::Result<WriteBufResult> {
578 : loop {
579 : // First check if the key already exists in the cache.
580 2758653 : if let Some(write_guard) = self.try_lock_for_write(cache_key).await {
581 3721 : return Ok(WriteBufResult::Found(write_guard));
582 2754932 : }
583 :
584 : // Not found. Find a victim buffer
585 2754932 : let (slot_idx, mut inner) =
586 2754932 : self.find_victim().context("Failed to find evict victim")?;
587 :
588 : // Insert mapping for this. At this point, we may find that another
589 : // thread did the same thing concurrently. In that case, we evicted
590 : // our victim buffer unnecessarily. Put it into the free list and
591 : // continue with the slot that the other thread chose.
592 2754932 : if let Some(_existing_slot_idx) = self.try_insert_mapping(cache_key, slot_idx) {
593 : // TODO: put to free list
594 :
595 : // We now just loop back to start from beginning. This is not
596 : // optimal, we'll perform the lookup in the mapping again, which
597 : // is not really necessary because we already got
598 : // 'existing_slot_idx'. But this shouldn't happen often enough
599 : // to matter much.
600 18 : continue;
601 2754914 : }
602 2754914 :
603 2754914 : // Make the slot ready
604 2754914 : let slot = &self.slots[slot_idx];
605 2754914 : inner.key = Some(cache_key.clone());
606 2754914 : slot.set_usage_count(1);
607 2754914 :
608 2754914 : return Ok(WriteBufResult::NotFound(PageWriteGuard {
609 2754914 : inner,
610 2754914 : valid: false,
611 2754914 : }));
612 : }
613 2758635 : }
614 :
615 : //
616 : // Section 3: Mapping functions
617 : //
618 :
619 : /// Search for a page in the cache using the given search key.
620 : ///
621 : /// Returns the slot index, if any. If the search criteria is not exact,
622 : /// *cache_key is updated with the actual key of the found page.
623 : ///
624 : /// NOTE: We don't hold any lock on the mapping on return, so the slot might
625 : /// get recycled for an unrelated page immediately after this function
626 : /// returns. The caller is responsible for re-checking that the slot still
627 : /// contains the page with the same key before using it.
628 : ///
629 403699519 : fn search_mapping(&self, cache_key: &mut CacheKey) -> Option<usize> {
630 403699519 : match cache_key {
631 7262299 : CacheKey::MaterializedPage { hash_key, lsn } => {
632 7262299 : let map = self.materialized_page_map.read().unwrap();
633 7262299 : let versions = map.get(hash_key)?;
634 :
635 1986120 : let version_idx = match versions.binary_search_by_key(lsn, |v| v.lsn) {
636 40 : Ok(version_idx) => version_idx,
637 1787 : Err(0) => return None,
638 1740408 : Err(version_idx) => version_idx - 1,
639 : };
640 1740448 : let version = &versions[version_idx];
641 1740448 : *lsn = version.lsn;
642 1740448 : Some(version.slot_idx)
643 : }
644 396437220 : CacheKey::ImmutableFilePage { file_id, blkno } => {
645 396437220 : let map = self.immutable_page_map.read().unwrap();
646 396437220 : Some(*map.get(&(*file_id, *blkno))?)
647 : }
648 : }
649 403699519 : }
650 :
651 : /// Search for a page in the cache using the given search key.
652 : ///
653 : /// Like 'search_mapping', but performs an "exact" search. Used for
654 : /// allocating a new buffer.
655 2758653 : fn search_mapping_for_write(&self, key: &CacheKey) -> Option<usize> {
656 2758653 : match key {
657 2758653 : CacheKey::MaterializedPage { hash_key, lsn } => {
658 2758653 : let map = self.materialized_page_map.read().unwrap();
659 2758653 : let versions = map.get(hash_key)?;
660 :
661 719054 : if let Ok(version_idx) = versions.binary_search_by_key(lsn, |v| v.lsn) {
662 3721 : Some(versions[version_idx].slot_idx)
663 : } else {
664 542620 : None
665 : }
666 : }
667 0 : CacheKey::ImmutableFilePage { file_id, blkno } => {
668 0 : let map = self.immutable_page_map.read().unwrap();
669 0 : Some(*map.get(&(*file_id, *blkno))?)
670 : }
671 : }
672 2758653 : }
673 :
674 : ///
675 : /// Remove mapping for given key.
676 : ///
677 10815562 : fn remove_mapping(&self, old_key: &CacheKey) {
678 10815562 : match old_key {
679 : CacheKey::MaterializedPage {
680 2496672 : hash_key: old_hash_key,
681 2496672 : lsn: old_lsn,
682 2496672 : } => {
683 2496672 : let mut map = self.materialized_page_map.write().unwrap();
684 2496672 : if let Entry::Occupied(mut old_entry) = map.entry(old_hash_key.clone()) {
685 2496672 : let versions = old_entry.get_mut();
686 :
687 3070504 : if let Ok(version_idx) = versions.binary_search_by_key(old_lsn, |v| v.lsn) {
688 2496672 : versions.remove(version_idx);
689 2496672 : self.size_metrics
690 2496672 : .current_bytes_materialized_page
691 2496672 : .sub_page_sz(1);
692 2496672 : if versions.is_empty() {
693 2018601 : old_entry.remove_entry();
694 2018601 : }
695 0 : }
696 : } else {
697 0 : panic!("could not find old key in mapping")
698 : }
699 : }
700 8318890 : CacheKey::ImmutableFilePage { file_id, blkno } => {
701 8318890 : let mut map = self.immutable_page_map.write().unwrap();
702 8318890 : map.remove(&(*file_id, *blkno))
703 8318890 : .expect("could not find old key in mapping");
704 8318890 : self.size_metrics.current_bytes_immutable.sub_page_sz(1);
705 8318890 : }
706 : }
707 10815562 : }
708 :
709 : ///
710 : /// Insert mapping for given key.
711 : ///
712 : /// If a mapping already existed for the given key, returns the slot index
713 : /// of the existing mapping and leaves it untouched.
714 13717888 : fn try_insert_mapping(&self, new_key: &CacheKey, slot_idx: usize) -> Option<usize> {
715 13717888 : match new_key {
716 : CacheKey::MaterializedPage {
717 2754932 : hash_key: new_key,
718 2754932 : lsn: new_lsn,
719 2754932 : } => {
720 2754932 : let mut map = self.materialized_page_map.write().unwrap();
721 2754932 : let versions = map.entry(new_key.clone()).or_default();
722 2754932 : match versions.binary_search_by_key(new_lsn, |v| v.lsn) {
723 18 : Ok(version_idx) => Some(versions[version_idx].slot_idx),
724 2754914 : Err(version_idx) => {
725 2754914 : versions.insert(
726 2754914 : version_idx,
727 2754914 : Version {
728 2754914 : lsn: *new_lsn,
729 2754914 : slot_idx,
730 2754914 : },
731 2754914 : );
732 2754914 : self.size_metrics
733 2754914 : .current_bytes_materialized_page
734 2754914 : .add_page_sz(1);
735 2754914 : None
736 : }
737 : }
738 : }
739 :
740 10962956 : CacheKey::ImmutableFilePage { file_id, blkno } => {
741 10962956 : let mut map = self.immutable_page_map.write().unwrap();
742 10962956 : match map.entry((*file_id, *blkno)) {
743 435 : Entry::Occupied(entry) => Some(*entry.get()),
744 10962521 : Entry::Vacant(entry) => {
745 10962521 : entry.insert(slot_idx);
746 10962521 : self.size_metrics.current_bytes_immutable.add_page_sz(1);
747 10962521 : None
748 : }
749 : }
750 : }
751 : }
752 13717888 : }
753 :
754 : //
755 : // Section 4: Misc internal helpers
756 : //
757 :
758 : /// Find a slot to evict.
759 : ///
760 : /// On return, the slot is empty and write-locked.
761 13717888 : fn find_victim(&self) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
762 13717888 : let iter_limit = self.slots.len() * 10;
763 13717888 : let mut iters = 0;
764 61375494 : loop {
765 61375494 : iters += 1;
766 61375494 : let slot_idx = self.next_evict_slot.fetch_add(1, Ordering::Relaxed) % self.slots.len();
767 61375494 :
768 61375494 : let slot = &self.slots[slot_idx];
769 61375494 :
770 61375494 : if slot.dec_usage_count() == 0 {
771 13717894 : let mut inner = match slot.inner.try_write() {
772 13717888 : Ok(inner) => inner,
773 6 : Err(_err) => {
774 6 : // If we have looped through the whole buffer pool 10 times
775 6 : // and still haven't found a victim buffer, something's wrong.
776 6 : // Maybe all the buffers were locked. That could happen in
777 6 : // theory, if you have more threads holding buffers locked than
778 6 : // there are buffers in the pool. In practice, with a reasonably
779 6 : // large buffer pool it really shouldn't happen.
780 6 : if iters > iter_limit {
781 0 : anyhow::bail!("exceeded evict iter limit");
782 6 : }
783 6 : continue;
784 : }
785 : };
786 13717888 : if let Some(old_key) = &inner.key {
787 10815551 : // remove mapping for old buffer
788 10815551 : self.remove_mapping(old_key);
789 10815551 : inner.key = None;
790 10815551 : }
791 13717888 : return Ok((slot_idx, inner));
792 47657600 : }
793 : }
794 13717888 : }
795 :
796 : /// Initialize a new page cache
797 : ///
798 : /// This should be called only once at page server startup.
799 576 : fn new(num_pages: usize) -> Self {
800 576 : assert!(num_pages > 0, "page cache size must be > 0");
801 :
802 : // We convert the Vec into a boxed slice before Box::leak, so that we don't
803 : // leak any spare, uninitialized capacity the Vec might still hold.
804 576 : let page_buffer = Box::leak(vec![0u8; num_pages * PAGE_SZ].into_boxed_slice());
805 576 :
806 576 : let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;
807 576 : size_metrics.max_bytes.set_page_sz(num_pages);
808 576 : size_metrics.current_bytes_immutable.set_page_sz(0);
809 576 : size_metrics.current_bytes_materialized_page.set_page_sz(0);
810 576 :
811 576 : let slots = page_buffer
812 576 : .chunks_exact_mut(PAGE_SZ)
813 4687206 : .map(|chunk| {
814 4687206 : let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap();
815 4687206 :
816 4687206 : Slot {
817 4687206 : inner: tokio::sync::RwLock::new(SlotInner { key: None, buf }),
818 4687206 : usage_count: AtomicU8::new(0),
819 4687206 : }
820 4687206 : })
821 576 : .collect();
822 576 :
823 576 : Self {
824 576 : materialized_page_map: Default::default(),
825 576 : immutable_page_map: Default::default(),
826 576 : slots,
827 576 : next_evict_slot: AtomicUsize::new(0),
828 576 : size_metrics,
829 576 : }
830 576 : }
831 : }
832 :
833 : trait PageSzBytesMetric {
834 : fn set_page_sz(&self, count: usize);
835 : fn add_page_sz(&self, count: usize);
836 : fn sub_page_sz(&self, count: usize);
837 : }
838 :
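/// Converts a page count to a byte count; for example, with PAGE_SZ == 8192,
/// `count_times_page_sz(3)` is 24576.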
839 : #[inline(always)]
840 24534710 : fn count_times_page_sz(count: usize) -> u64 {
841 24534710 : u64::try_from(count).unwrap() * u64::try_from(PAGE_SZ).unwrap()
842 24534710 : }
843 :
844 : impl PageSzBytesMetric for metrics::UIntGauge {
845 1728 : fn set_page_sz(&self, count: usize) {
846 1728 : self.set(count_times_page_sz(count));
847 1728 : }
848 13717426 : fn add_page_sz(&self, count: usize) {
849 13717426 : self.add(count_times_page_sz(count));
850 13717426 : }
851 10815556 : fn sub_page_sz(&self, count: usize) {
852 10815556 : self.sub(count_times_page_sz(count));
853 10815556 : }
854 : }