Line data Source code
1 : use std::{ffi::CStr, sync::Arc};
2 :
3 : use parking_lot::{Mutex, MutexGuard};
4 : use postgres_ffi::v16::wal_generator::{LogicalMessageGenerator, WalGenerator};
5 : use utils::lsn::Lsn;
6 :
7 : use super::block_storage::BlockStorage;
8 :
9 : /// Simulation implementation of walproposer WAL storage.
10 : pub struct DiskWalProposer {
11 : state: Mutex<State>,
12 : }
13 :
14 : impl DiskWalProposer {
15 9341 : pub fn new() -> Arc<DiskWalProposer> {
16 9341 : Arc::new(DiskWalProposer {
17 9341 : state: Mutex::new(State {
18 9341 : internal_available_lsn: Lsn(0),
19 9341 : prev_lsn: Lsn(0),
20 9341 : disk: BlockStorage::new(),
21 9341 : wal_generator: WalGenerator::new(LogicalMessageGenerator::new(c"", &[])),
22 9341 : }),
23 9341 : })
24 9341 : }
25 :
26 15991 : pub fn lock(&self) -> MutexGuard<State> {
27 15991 : self.state.lock()
28 15991 : }
29 : }
30 :
31 : pub struct State {
32 : // flush_lsn
33 : internal_available_lsn: Lsn,
34 : // needed for WAL generation
35 : prev_lsn: Lsn,
36 : // actual WAL storage
37 : disk: BlockStorage,
38 : // WAL record generator
39 : wal_generator: WalGenerator<LogicalMessageGenerator>,
40 : }
41 :
42 : impl State {
43 1030 : pub fn read(&self, pos: u64, buf: &mut [u8]) {
44 1030 : self.disk.read(pos, buf);
45 1030 : // TODO: fail on reading uninitialized data
46 1030 : }
47 :
48 303 : pub fn write(&mut self, pos: u64, buf: &[u8]) {
49 303 : self.disk.write(pos, buf);
50 303 : }
51 :
52 : /// Update the internal available LSN to the given value.
53 456 : pub fn reset_to(&mut self, lsn: Lsn) {
54 456 : self.internal_available_lsn = lsn;
55 456 : self.prev_lsn = Lsn(0); // Safekeeper doesn't care if this is omitted
56 456 : self.wal_generator.lsn = self.internal_available_lsn;
57 456 : self.wal_generator.prev_lsn = self.prev_lsn;
58 456 : }
59 :
60 : /// Get current LSN.
61 2190 : pub fn flush_rec_ptr(&self) -> Lsn {
62 2190 : self.internal_available_lsn
63 2190 : }
64 :
65 : /// Inserts a logical record in the WAL at the current LSN.
66 12012 : pub fn insert_logical_message(&mut self, prefix: &CStr, msg: &[u8]) {
67 12012 : let (_, record) = self.wal_generator.append_logical_message(prefix, msg);
68 12012 : self.disk.write(self.internal_available_lsn.into(), &record);
69 12012 : self.prev_lsn = self.internal_available_lsn;
70 12012 : self.internal_available_lsn += record.len() as u64;
71 12012 : }
72 : }
|