Line data Source code
1 : use std::ffi::CStr;
2 : use std::sync::Arc;
3 :
4 : use parking_lot::{Mutex, MutexGuard};
5 : use postgres_ffi::v16::wal_generator::{LogicalMessageGenerator, WalGenerator};
6 : use utils::lsn::Lsn;
7 :
8 : use super::block_storage::BlockStorage;
9 :
10 : /// Simulation implementation of walproposer WAL storage.
11 : pub struct DiskWalProposer {
12 : state: Mutex<State>,
13 : }
14 :
15 : impl DiskWalProposer {
16 9338 : pub fn new() -> Arc<DiskWalProposer> {
17 9338 : Arc::new(DiskWalProposer {
18 9338 : state: Mutex::new(State {
19 9338 : internal_available_lsn: Lsn(0),
20 9338 : prev_lsn: Lsn(0),
21 9338 : disk: BlockStorage::new(),
22 9338 : wal_generator: WalGenerator::new(LogicalMessageGenerator::new(c"", &[]), Lsn(0)),
23 9338 : }),
24 9338 : })
25 9338 : }
26 :
27 15313 : pub fn lock(&self) -> MutexGuard<State> {
28 15313 : self.state.lock()
29 15313 : }
30 : }
31 :
32 : pub struct State {
33 : // flush_lsn
34 : internal_available_lsn: Lsn,
35 : // needed for WAL generation
36 : prev_lsn: Lsn,
37 : // actual WAL storage
38 : disk: BlockStorage,
39 : // WAL record generator
40 : wal_generator: WalGenerator<LogicalMessageGenerator>,
41 : }
42 :
43 : impl State {
44 1027 : pub fn read(&self, pos: u64, buf: &mut [u8]) {
45 1027 : self.disk.read(pos, buf);
46 1027 : // TODO: fail on reading uninitialized data
47 1027 : }
48 :
49 193 : pub fn write(&mut self, pos: u64, buf: &[u8]) {
50 193 : self.disk.write(pos, buf);
51 193 : }
52 :
53 : /// Update the internal available LSN to the given value.
54 353 : pub fn reset_to(&mut self, lsn: Lsn) {
55 353 : self.internal_available_lsn = lsn;
56 353 : self.prev_lsn = Lsn(0); // Safekeeper doesn't care if this is omitted
57 353 : self.wal_generator.lsn = self.internal_available_lsn;
58 353 : self.wal_generator.prev_lsn = self.prev_lsn;
59 353 : }
60 :
61 : /// Get current LSN.
62 2011 : pub fn flush_rec_ptr(&self) -> Lsn {
63 2011 : self.internal_available_lsn
64 2011 : }
65 :
66 : /// Inserts a logical record in the WAL at the current LSN.
67 11729 : pub fn insert_logical_message(&mut self, prefix: &CStr, msg: &[u8]) {
68 11729 : let (_, record) = self.wal_generator.append_logical_message(prefix, msg);
69 11729 : self.disk.write(self.internal_available_lsn.into(), &record);
70 11729 : self.prev_lsn = self.internal_available_lsn;
71 11729 : self.internal_available_lsn += record.len() as u64;
72 11729 : }
73 : }
|