LCOV - code coverage report
Current view: top level - safekeeper/tests/walproposer_sim - safekeeper_disk.rs (source / functions) Coverage Total Hit
Test: 2b0730d767f560e20b6748f57465922aa8bb805e.info Lines: 82.8 % 157 130
Test Date: 2024-09-25 14:04:07 Functions: 71.8 % 78 56

            Line data    Source code
       1              : use std::collections::HashMap;
       2              : use std::sync::Arc;
       3              : 
       4              : use parking_lot::Mutex;
       5              : use safekeeper::state::TimelinePersistentState;
       6              : use utils::id::TenantTimelineId;
       7              : 
       8              : use super::block_storage::BlockStorage;
       9              : 
      10              : use std::{ops::Deref, time::Instant};
      11              : 
      12              : use anyhow::Result;
      13              : use bytes::{Buf, BytesMut};
      14              : use futures::future::BoxFuture;
      15              : use postgres_ffi::{waldecoder::WalStreamDecoder, XLogSegNo};
      16              : use safekeeper::{control_file, metrics::WalStorageMetrics, wal_storage};
      17              : use tracing::{debug, info};
      18              : use utils::lsn::Lsn;
      19              : 
      20              : /// All safekeeper state that is usually saved to disk.
      21              : pub struct SafekeeperDisk {
      22              :     pub timelines: Mutex<HashMap<TenantTimelineId, Arc<TimelineDisk>>>,
      23              : }
      24              : 
      25              : impl Default for SafekeeperDisk {
      26            0 :     fn default() -> Self {
      27            0 :         Self::new()
      28            0 :     }
      29              : }
      30              : 
      31              : impl SafekeeperDisk {
      32         6024 :     pub fn new() -> Self {
      33         6024 :         SafekeeperDisk {
      34         6024 :             timelines: Mutex::new(HashMap::new()),
      35         6024 :         }
      36         6024 :     }
      37              : 
      38         5722 :     pub fn put_state(
      39         5722 :         &self,
      40         5722 :         ttid: &TenantTimelineId,
      41         5722 :         state: TimelinePersistentState,
      42         5722 :     ) -> Arc<TimelineDisk> {
      43         5722 :         self.timelines
      44         5722 :             .lock()
      45         5722 :             .entry(*ttid)
      46         5722 :             .and_modify(|e| {
      47            0 :                 let mut mu = e.state.lock();
      48            0 :                 *mu = state.clone();
      49         5722 :             })
      50         5722 :             .or_insert_with(|| {
      51         5722 :                 Arc::new(TimelineDisk {
      52         5722 :                     state: Mutex::new(state),
      53         5722 :                     wal: Mutex::new(BlockStorage::new()),
      54         5722 :                 })
      55         5722 :             })
      56         5722 :             .clone()
      57         5722 :     }
      58              : }
      59              : 
      60              : /// Control file state and WAL storage.
      61              : pub struct TimelineDisk {
      62              :     pub state: Mutex<TimelinePersistentState>,
      63              :     pub wal: Mutex<BlockStorage>,
      64              : }
      65              : 
      66              : /// Implementation of `control_file::Storage` trait.
      67              : pub struct DiskStateStorage {
      68              :     persisted_state: TimelinePersistentState,
      69              :     disk: Arc<TimelineDisk>,
      70              :     last_persist_at: Instant,
      71              : }
      72              : 
      73              : impl DiskStateStorage {
      74        35173 :     pub fn new(disk: Arc<TimelineDisk>) -> Self {
      75        35173 :         let guard = disk.state.lock();
      76        35173 :         let state = guard.clone();
      77        35173 :         drop(guard);
      78        35173 :         DiskStateStorage {
      79        35173 :             persisted_state: state,
      80        35173 :             disk,
      81        35173 :             last_persist_at: Instant::now(),
      82        35173 :         }
      83        35173 :     }
      84              : }
      85              : 
      86              : impl control_file::Storage for DiskStateStorage {
      87              :     /// Persist safekeeper state on disk and update internal state.
      88        16227 :     async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> {
      89        16227 :         self.persisted_state = s.clone();
      90        16227 :         *self.disk.state.lock() = s.clone();
      91        16227 :         Ok(())
      92        16227 :     }
      93              : 
      94              :     /// Timestamp of last persist. Here it is set once in `new` and is not updated by `persist`.
      95            0 :     fn last_persist_at(&self) -> Instant {
      96            0 :         // TODO: don't rely on it in tests
      97            0 :         self.last_persist_at
      98            0 :     }
      99              : }
     100              : 
     101              : impl Deref for DiskStateStorage {
     102              :     type Target = TimelinePersistentState;
     103              : 
     104      1016638 :     fn deref(&self) -> &Self::Target {
     105      1016638 :         &self.persisted_state
     106      1016638 :     }
     107              : }
     108              : 
     109              : /// Implementation of `wal_storage::Storage` trait.
     110              : pub struct DiskWALStorage {
     111              :     /// Written to disk, but possibly still in the cache and not fully persisted.
     112              :     /// It can also be ahead of write_record_lsn, if we happen to be in the middle of a WAL record.
     113              :     write_lsn: Lsn,
     114              : 
     115              :     /// The LSN of the last WAL record written to disk. It may not be fully flushed yet.
     116              :     write_record_lsn: Lsn,
     117              : 
     118              :     /// The LSN of the last WAL record flushed to disk.
     119              :     flush_record_lsn: Lsn,
     120              : 
     121              :     /// Decoder is required for detecting boundaries of WAL records.
     122              :     decoder: WalStreamDecoder,
     123              : 
     124              :     /// Bytes of WAL records that are not yet written to disk.
     125              :     unflushed_bytes: BytesMut,
     126              : 
     127              :     /// Contains BlockStorage for WAL.
     128              :     disk: Arc<TimelineDisk>,
     129              : }
     130              : 
     131              : impl DiskWALStorage {
     132        35173 :     pub fn new(disk: Arc<TimelineDisk>, state: &TimelinePersistentState) -> Result<Self> {
     133        35173 :         let write_lsn = if state.commit_lsn == Lsn(0) {
     134        32690 :             Lsn(0)
     135              :         } else {
     136         2483 :             Self::find_end_of_wal(disk.clone(), state.commit_lsn)?
     137              :         };
     138              : 
     139        35173 :         let flush_lsn = write_lsn;
     140        35173 :         Ok(DiskWALStorage {
     141        35173 :             write_lsn,
     142        35173 :             write_record_lsn: flush_lsn,
     143        35173 :             flush_record_lsn: flush_lsn,
     144        35173 :             decoder: WalStreamDecoder::new(flush_lsn, 16),
     145        35173 :             unflushed_bytes: BytesMut::new(),
     146        35173 :             disk,
     147        35173 :         })
     148        35173 :     }
     149              : 
     150         2483 :     fn find_end_of_wal(disk: Arc<TimelineDisk>, start_lsn: Lsn) -> Result<Lsn> {
     151         2483 :         let mut buf = [0; 8192];
     152         2483 :         let mut pos = start_lsn.0;
     153         2483 :         let mut decoder = WalStreamDecoder::new(start_lsn, 16);
     154         2483 :         let mut result = start_lsn;
     155              :         loop {
     156         2483 :             disk.wal.lock().read(pos, &mut buf);
     157         2483 :             pos += buf.len() as u64;
     158         2483 :             decoder.feed_bytes(&buf);
     159              : 
     160              :             loop {
     161        11841 :                 match decoder.poll_decode() {
     162         9358 :                     Ok(Some(record)) => result = record.0,
     163         2483 :                     Err(e) => {
     164         2483 :                         debug!(
     165            0 :                             "find_end_of_wal reached end at {:?}, decode error: {:?}",
     166              :                             result, e
     167              :                         );
     168         2483 :                         return Ok(result);
     169              :                     }
     170            0 :                     Ok(None) => break, // need more data
     171              :                 }
     172              :             }
     173              :         }
     174         2483 :     }
     175              : }
     176              : 
     177              : impl wal_storage::Storage for DiskWALStorage {
     178              :     /// Last written LSN.
     179        11818 :     fn write_lsn(&self) -> Lsn {
     180        11818 :         self.write_lsn
     181        11818 :     }
     182              :     /// LSN of last durably stored WAL record.
     183        81928 :     fn flush_lsn(&self) -> Lsn {
     184        81928 :         self.flush_record_lsn
     185        81928 :     }
     186              : 
     187          584 :     async fn initialize_first_segment(&mut self, _init_lsn: Lsn) -> Result<()> {
     188          584 :         Ok(())
     189          584 :     }
     190              : 
     191              :     /// Write piece of WAL from buf to disk, but not necessarily sync it.
     192         2125 :     async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
     193         2125 :         if self.write_lsn != startpos {
     194            0 :             panic!("write_wal called with wrong startpos");
     195         2125 :         }
     196         2125 : 
     197         2125 :         self.unflushed_bytes.extend_from_slice(buf);
     198         2125 :         self.write_lsn += buf.len() as u64;
     199         2125 : 
     200         2125 :         if self.decoder.available() != startpos {
     201            0 :             info!(
     202            0 :                 "restart decoder from {} to {}",
     203            0 :                 self.decoder.available(),
     204              :                 startpos,
     205              :             );
     206            0 :             self.decoder = WalStreamDecoder::new(startpos, 16);
     207         2125 :         }
     208         2125 :         self.decoder.feed_bytes(buf);
     209              :         loop {
     210        32723 :             match self.decoder.poll_decode()? {
     211         2125 :                 None => break, // no full record yet
     212        30598 :                 Some((lsn, _rec)) => {
     213        30598 :                     self.write_record_lsn = lsn;
     214        30598 :                 }
     215              :             }
     216              :         }
     217              : 
     218         2125 :         Ok(())
     219         2125 :     }
     220              : 
     221              :     /// Truncate WAL at specified LSN, which must be the end of WAL record.
     222         3647 :     async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
     223         3647 :         if self.write_lsn != Lsn(0) && end_pos > self.write_lsn {
     224            0 :             panic!(
     225            0 :                 "truncate_wal called on non-written WAL, write_lsn={}, end_pos={}",
     226            0 :                 self.write_lsn, end_pos
     227            0 :             );
     228         3647 :         }
     229         3647 : 
     230         3647 :         self.flush_wal().await?;
     231              : 
     232              :         // write zeroes to disk from end_pos in whole 8192-byte chunks until self.write_lsn is covered (the last chunk may extend past it)
     233         3647 :         let buf = [0; 8192];
     234         3647 :         let mut pos = end_pos.0;
     235         3657 :         while pos < self.write_lsn.0 {
     236           10 :             self.disk.wal.lock().write(pos, &buf);
     237           10 :             pos += buf.len() as u64;
     238           10 :         }
     239              : 
     240         3647 :         self.write_lsn = end_pos;
     241         3647 :         self.write_record_lsn = end_pos;
     242         3647 :         self.flush_record_lsn = end_pos;
     243         3647 :         self.unflushed_bytes.clear();
     244         3647 :         self.decoder = WalStreamDecoder::new(end_pos, 16);
     245         3647 : 
     246         3647 :         Ok(())
     247         3647 :     }
     248              : 
     249              :     /// Durably store WAL on disk, up to the last written WAL record.
     250        26310 :     async fn flush_wal(&mut self) -> Result<()> {
     251        26310 :         if self.flush_record_lsn == self.write_record_lsn {
     252              :             // no need to do extra flush
     253        24196 :             return Ok(());
     254         2114 :         }
     255         2114 : 
     256         2114 :         let num_bytes = self.write_record_lsn.0 - self.flush_record_lsn.0;
     257         2114 : 
     258         2114 :         self.disk.wal.lock().write(
     259         2114 :             self.flush_record_lsn.0,
     260         2114 :             &self.unflushed_bytes[..num_bytes as usize],
     261         2114 :         );
     262         2114 :         self.unflushed_bytes.advance(num_bytes as usize);
     263         2114 :         self.flush_record_lsn = self.write_record_lsn;
     264         2114 : 
     265         2114 :         Ok(())
     266        26310 :     }
     267              : 
     268              :     /// Remove all segments <= given segno. Returns a future doing that, as we
     269              :     /// want to perform it without the timeline lock. Here the future just resolves to `Ok(())`.
     270            0 :     fn remove_up_to(&self, _segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> {
     271            0 :         Box::pin(async move { Ok(()) })
     272            0 :     }
     273              : 
     274              :     /// Release resources associated with the storage -- technically, close FDs.
     275              :     /// Currently we don't remove timelines until restart (#3146), so we need to
     276              :     /// spare descriptors. This would be useful for temporary timeline detach as
     277              :     /// well. The body of this implementation is empty.
     278            0 :     fn close(&mut self) {}
     279              : 
     280              :     /// Get metrics for this timeline.
     281            0 :     fn get_metrics(&self) -> WalStorageMetrics {
     282            0 :         WalStorageMetrics::default()
     283            0 :     }
     284              : }
        

Generated by: LCOV version 2.1-beta