LCOV - code coverage report
Current view: top level - safekeeper/tests/walproposer_sim - safekeeper_disk.rs (source / functions) Coverage Total Hit
Test: 6df3fc19ec669bcfbbf9aba41d1338898d24eaa0.info Lines: 82.8 % 157 130
Test Date: 2025-03-12 18:28:53 Functions: 71.8 % 78 56

            Line data    Source code
       1              : use std::collections::HashMap;
       2              : use std::ops::Deref;
       3              : use std::sync::Arc;
       4              : use std::time::Instant;
       5              : 
       6              : use anyhow::Result;
       7              : use bytes::{Buf, BytesMut};
       8              : use futures::future::BoxFuture;
       9              : use parking_lot::Mutex;
      10              : use postgres_ffi::XLogSegNo;
      11              : use postgres_ffi::waldecoder::WalStreamDecoder;
      12              : use safekeeper::metrics::WalStorageMetrics;
      13              : use safekeeper::state::TimelinePersistentState;
      14              : use safekeeper::{control_file, wal_storage};
      15              : use tracing::{debug, info};
      16              : use utils::id::TenantTimelineId;
      17              : use utils::lsn::Lsn;
      18              : 
      19              : use super::block_storage::BlockStorage;
      20              : 
/// All safekeeper state that is usually saved to disk.
///
/// Holds one shared [`TimelineDisk`] per timeline, keyed by tenant/timeline
/// id. The map sits behind a mutex so it can be modified through `&self`.
pub struct SafekeeperDisk {
    /// Per-timeline disk contents. `put_state` inserts a new entry or
    /// overwrites the control-file state of an existing one.
    pub timelines: Mutex<HashMap<TenantTimelineId, Arc<TimelineDisk>>>,
}
      25              : 
      26              : impl Default for SafekeeperDisk {
      27            0 :     fn default() -> Self {
      28            0 :         Self::new()
      29            0 :     }
      30              : }
      31              : 
      32              : impl SafekeeperDisk {
      33         1524 :     pub fn new() -> Self {
      34         1524 :         SafekeeperDisk {
      35         1524 :             timelines: Mutex::new(HashMap::new()),
      36         1524 :         }
      37         1524 :     }
      38              : 
      39         1424 :     pub fn put_state(
      40         1424 :         &self,
      41         1424 :         ttid: &TenantTimelineId,
      42         1424 :         state: TimelinePersistentState,
      43         1424 :     ) -> Arc<TimelineDisk> {
      44         1424 :         self.timelines
      45         1424 :             .lock()
      46         1424 :             .entry(*ttid)
      47         1424 :             .and_modify(|e| {
      48            0 :                 let mut mu = e.state.lock();
      49            0 :                 *mu = state.clone();
      50         1424 :             })
      51         1424 :             .or_insert_with(|| {
      52         1424 :                 Arc::new(TimelineDisk {
      53         1424 :                     state: Mutex::new(state),
      54         1424 :                     wal: Mutex::new(BlockStorage::new()),
      55         1424 :                 })
      56         1424 :             })
      57         1424 :             .clone()
      58         1424 :     }
      59              : }
      60              : 
/// Control file state and WAL storage.
///
/// Each part has its own mutex, so the control-file state and the WAL bytes
/// can be locked independently.
pub struct TimelineDisk {
    /// Persisted control-file state of the timeline.
    pub state: Mutex<TimelinePersistentState>,
    /// Block storage holding the timeline's WAL bytes, addressed by LSN offset.
    pub wal: Mutex<BlockStorage>,
}
      66              : 
/// Implementation of `control_file::Storage` trait.
pub struct DiskStateStorage {
    /// Local copy of the state most recently loaded or persisted through this
    /// storage; this is what `Deref` exposes.
    persisted_state: TimelinePersistentState,
    /// Shared timeline disk that holds the persisted state.
    disk: Arc<TimelineDisk>,
    /// Value reported by `last_persist_at()`. It is set when the storage is
    /// constructed; `persist` does not update it.
    last_persist_at: Instant,
}
      73              : 
      74              : impl DiskStateStorage {
      75         8703 :     pub fn new(disk: Arc<TimelineDisk>) -> Self {
      76         8703 :         let guard = disk.state.lock();
      77         8703 :         let state = guard.clone();
      78         8703 :         drop(guard);
      79         8703 :         DiskStateStorage {
      80         8703 :             persisted_state: state,
      81         8703 :             disk,
      82         8703 :             last_persist_at: Instant::now(),
      83         8703 :         }
      84         8703 :     }
      85              : }
      86              : 
      87              : impl control_file::Storage for DiskStateStorage {
      88              :     /// Persist safekeeper state on disk and update internal state.
      89         3943 :     async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> {
      90         3943 :         self.persisted_state = s.clone();
      91         3943 :         *self.disk.state.lock() = s.clone();
      92         3943 :         Ok(())
      93         3943 :     }
      94              : 
      95              :     /// Timestamp of last persist.
      96            0 :     fn last_persist_at(&self) -> Instant {
      97            0 :         // TODO: don't rely on it in tests
      98            0 :         self.last_persist_at
      99            0 :     }
     100              : }
     101              : 
     102              : impl Deref for DiskStateStorage {
     103              :     type Target = TimelinePersistentState;
     104              : 
     105       342755 :     fn deref(&self) -> &Self::Target {
     106       342755 :         &self.persisted_state
     107       342755 :     }
     108              : }
     109              : 
/// Implementation of `wal_storage::Storage` trait.
pub struct DiskWALStorage {
    /// Written to disk, but possibly still in the cache and not fully persisted.
    /// Can also be ahead of `write_record_lsn` if we happen to be in the middle
    /// of a WAL record.
    write_lsn: Lsn,

    /// The LSN of the last WAL record written to disk. It may not be fully
    /// flushed yet.
    write_record_lsn: Lsn,

    /// The LSN of the last WAL record flushed to disk.
    flush_record_lsn: Lsn,

    /// Decoder is required for detecting boundaries of WAL records.
    decoder: WalStreamDecoder,

    /// Bytes of WAL records that are not yet written to disk. `flush_wal`
    /// writes a prefix of this buffer at `flush_record_lsn`, so the first
    /// buffered byte corresponds to that LSN.
    unflushed_bytes: BytesMut,

    /// Contains BlockStorage for WAL.
    disk: Arc<TimelineDisk>,
}
     131              : 
     132              : impl DiskWALStorage {
     133         8703 :     pub fn new(disk: Arc<TimelineDisk>, state: &TimelinePersistentState) -> Result<Self> {
     134         8703 :         let write_lsn = if state.commit_lsn == Lsn(0) {
     135         8091 :             Lsn(0)
     136              :         } else {
     137          612 :             Self::find_end_of_wal(disk.clone(), state.commit_lsn)?
     138              :         };
     139              : 
     140         8703 :         let flush_lsn = write_lsn;
     141         8703 :         Ok(DiskWALStorage {
     142         8703 :             write_lsn,
     143         8703 :             write_record_lsn: flush_lsn,
     144         8703 :             flush_record_lsn: flush_lsn,
     145         8703 :             decoder: WalStreamDecoder::new(flush_lsn, 16),
     146         8703 :             unflushed_bytes: BytesMut::new(),
     147         8703 :             disk,
     148         8703 :         })
     149         8703 :     }
     150              : 
     151          612 :     fn find_end_of_wal(disk: Arc<TimelineDisk>, start_lsn: Lsn) -> Result<Lsn> {
     152          612 :         let mut buf = [0; 8192];
     153          612 :         let mut pos = start_lsn.0;
     154          612 :         let mut decoder = WalStreamDecoder::new(start_lsn, 16);
     155          612 :         let mut result = start_lsn;
     156              :         loop {
     157          612 :             disk.wal.lock().read(pos, &mut buf);
     158          612 :             pos += buf.len() as u64;
     159          612 :             decoder.feed_bytes(&buf);
     160              : 
     161              :             loop {
     162         2736 :                 match decoder.poll_decode() {
     163         2124 :                     Ok(Some(record)) => result = record.0,
     164          612 :                     Err(e) => {
     165          612 :                         debug!(
     166            0 :                             "find_end_of_wal reached end at {:?}, decode error: {:?}",
     167              :                             result, e
     168              :                         );
     169          612 :                         return Ok(result);
     170              :                     }
     171            0 :                     Ok(None) => break, // need more data
     172              :                 }
     173              :             }
     174              :         }
     175          612 :     }
     176              : }
     177              : 
     178              : impl wal_storage::Storage for DiskWALStorage {
     179              :     // Last written LSN.
     180         3551 :     fn write_lsn(&self) -> Lsn {
     181         3551 :         self.write_lsn
     182         3551 :     }
     183              :     /// LSN of last durably stored WAL record.
     184        28486 :     fn flush_lsn(&self) -> Lsn {
     185        28486 :         self.flush_record_lsn
     186        28486 :     }
     187              : 
     188          148 :     async fn initialize_first_segment(&mut self, _init_lsn: Lsn) -> Result<()> {
     189          148 :         Ok(())
     190          148 :     }
     191              : 
     192              :     /// Write piece of WAL from buf to disk, but not necessarily sync it.
     193          869 :     async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
     194          869 :         if self.write_lsn != startpos {
     195            0 :             panic!("write_wal called with wrong startpos");
     196          869 :         }
     197          869 : 
     198          869 :         self.unflushed_bytes.extend_from_slice(buf);
     199          869 :         self.write_lsn += buf.len() as u64;
     200          869 : 
     201          869 :         if self.decoder.available() != startpos {
     202            0 :             info!(
     203            0 :                 "restart decoder from {} to {}",
     204            0 :                 self.decoder.available(),
     205              :                 startpos,
     206              :             );
     207            0 :             self.decoder = WalStreamDecoder::new(startpos, 16);
     208          869 :         }
     209          869 :         self.decoder.feed_bytes(buf);
     210              :         loop {
     211        23136 :             match self.decoder.poll_decode()? {
     212          869 :                 None => break, // no full record yet
     213        22267 :                 Some((lsn, _rec)) => {
     214        22267 :                     self.write_record_lsn = lsn;
     215        22267 :                 }
     216              :             }
     217              :         }
     218              : 
     219          869 :         Ok(())
     220          869 :     }
     221              : 
     222              :     /// Truncate WAL at specified LSN, which must be the end of WAL record.
     223          860 :     async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
     224          860 :         if self.write_lsn != Lsn(0) && end_pos > self.write_lsn {
     225            0 :             panic!(
     226            0 :                 "truncate_wal called on non-written WAL, write_lsn={}, end_pos={}",
     227            0 :                 self.write_lsn, end_pos
     228            0 :             );
     229          860 :         }
     230          860 : 
     231          860 :         self.flush_wal().await?;
     232              : 
     233              :         // write zeroes to disk from end_pos until self.write_lsn
     234          860 :         let buf = [0; 8192];
     235          860 :         let mut pos = end_pos.0;
     236          864 :         while pos < self.write_lsn.0 {
     237            4 :             self.disk.wal.lock().write(pos, &buf);
     238            4 :             pos += buf.len() as u64;
     239            4 :         }
     240              : 
     241          860 :         self.write_lsn = end_pos;
     242          860 :         self.write_record_lsn = end_pos;
     243          860 :         self.flush_record_lsn = end_pos;
     244          860 :         self.unflushed_bytes.clear();
     245          860 :         self.decoder = WalStreamDecoder::new(end_pos, 16);
     246          860 : 
     247          860 :         Ok(())
     248          860 :     }
     249              : 
     250              :     /// Durably store WAL on disk, up to the last written WAL record.
     251         7020 :     async fn flush_wal(&mut self) -> Result<()> {
     252         7020 :         if self.flush_record_lsn == self.write_record_lsn {
     253              :             // no need to do extra flush
     254         6157 :             return Ok(());
     255          863 :         }
     256          863 : 
     257          863 :         let num_bytes = self.write_record_lsn.0 - self.flush_record_lsn.0;
     258          863 : 
     259          863 :         self.disk.wal.lock().write(
     260          863 :             self.flush_record_lsn.0,
     261          863 :             &self.unflushed_bytes[..num_bytes as usize],
     262          863 :         );
     263          863 :         self.unflushed_bytes.advance(num_bytes as usize);
     264          863 :         self.flush_record_lsn = self.write_record_lsn;
     265          863 : 
     266          863 :         Ok(())
     267         7020 :     }
     268              : 
     269              :     /// Remove all segments <= given segno. Returns function doing that as we
     270              :     /// want to perform it without timeline lock.
     271            0 :     fn remove_up_to(&self, _segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> {
     272            0 :         Box::pin(async move { Ok(()) })
     273            0 :     }
     274              : 
     275              :     /// Release resources associated with the storage -- technically, close FDs.
     276              :     /// Currently we don't remove timelines until restart (#3146), so need to
     277              :     /// spare descriptors. This would be useful for temporary tli detach as
     278              :     /// well.
     279            0 :     fn close(&mut self) {}
     280              : 
     281              :     /// Get metrics for this timeline.
     282            0 :     fn get_metrics(&self) -> WalStorageMetrics {
     283            0 :         WalStorageMetrics::default()
     284            0 :     }
     285              : }
        

Generated by: LCOV version 2.1-beta