Line data Source code
1 : use std::collections::HashMap;
2 : use std::ops::Deref;
3 : use std::sync::Arc;
4 : use std::time::Instant;
5 :
6 : use anyhow::Result;
7 : use bytes::{Buf, BytesMut};
8 : use futures::future::BoxFuture;
9 : use parking_lot::Mutex;
10 : use postgres_ffi::waldecoder::WalStreamDecoder;
11 : use postgres_ffi::{PgMajorVersion, XLogSegNo};
12 : use safekeeper::metrics::WalStorageMetrics;
13 : use safekeeper::state::TimelinePersistentState;
14 : use safekeeper::{control_file, wal_storage};
15 : use tracing::{debug, info};
16 : use utils::id::TenantTimelineId;
17 : use utils::lsn::Lsn;
18 :
19 : use super::block_storage::BlockStorage;
20 :
21 : /// All safekeeper state that is usually saved to disk.
22 : pub struct SafekeeperDisk {
23 : pub timelines: Mutex<HashMap<TenantTimelineId, Arc<TimelineDisk>>>,
24 : }
25 :
26 : impl Default for SafekeeperDisk {
27 0 : fn default() -> Self {
28 0 : Self::new()
29 0 : }
30 : }
31 :
32 : impl SafekeeperDisk {
33 1524 : pub fn new() -> Self {
34 1524 : SafekeeperDisk {
35 1524 : timelines: Mutex::new(HashMap::new()),
36 1524 : }
37 1524 : }
38 :
39 1443 : pub fn put_state(
40 1443 : &self,
41 1443 : ttid: &TenantTimelineId,
42 1443 : state: TimelinePersistentState,
43 1443 : ) -> Arc<TimelineDisk> {
44 1443 : self.timelines
45 1443 : .lock()
46 1443 : .entry(*ttid)
47 1443 : .and_modify(|e| {
48 0 : let mut mu = e.state.lock();
49 0 : *mu = state.clone();
50 0 : })
51 1443 : .or_insert_with(|| {
52 1443 : Arc::new(TimelineDisk {
53 1443 : state: Mutex::new(state),
54 1443 : wal: Mutex::new(BlockStorage::new()),
55 1443 : })
56 1443 : })
57 1443 : .clone()
58 1443 : }
59 : }
60 :
61 : /// Control file state and WAL storage.
62 : pub struct TimelineDisk {
63 : pub state: Mutex<TimelinePersistentState>,
64 : pub wal: Mutex<BlockStorage>,
65 : }
66 :
67 : /// Implementation of `control_file::Storage` trait.
68 : pub struct DiskStateStorage {
69 : persisted_state: TimelinePersistentState,
70 : disk: Arc<TimelineDisk>,
71 : last_persist_at: Instant,
72 : }
73 :
74 : impl DiskStateStorage {
75 8736 : pub fn new(disk: Arc<TimelineDisk>) -> Self {
76 8736 : let guard = disk.state.lock();
77 8736 : let state = guard.clone();
78 8736 : drop(guard);
79 8736 : DiskStateStorage {
80 8736 : persisted_state: state,
81 8736 : disk,
82 8736 : last_persist_at: Instant::now(),
83 8736 : }
84 8736 : }
85 : }
86 :
87 : impl control_file::Storage for DiskStateStorage {
88 : /// Persist safekeeper state on disk and update internal state.
89 4438 : async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> {
90 4438 : self.persisted_state = s.clone();
91 4438 : *self.disk.state.lock() = s.clone();
92 4438 : Ok(())
93 4438 : }
94 :
95 : /// Timestamp of last persist.
96 0 : fn last_persist_at(&self) -> Instant {
97 : // TODO: don't rely on it in tests
98 0 : self.last_persist_at
99 0 : }
100 : }
101 :
102 : impl Deref for DiskStateStorage {
103 : type Target = TimelinePersistentState;
104 :
105 348215 : fn deref(&self) -> &Self::Target {
106 348215 : &self.persisted_state
107 348215 : }
108 : }
109 :
110 : /// Implementation of `wal_storage::Storage` trait.
111 : pub struct DiskWALStorage {
112 : /// Written to disk, but possibly still in the cache and not fully persisted.
113 : /// Also can be ahead of record_lsn, if happen to be in the middle of a WAL record.
114 : write_lsn: Lsn,
115 :
116 : /// The LSN of the last WAL record written to disk. Still can be not fully flushed.
117 : write_record_lsn: Lsn,
118 :
119 : /// The LSN of the last WAL record flushed to disk.
120 : flush_record_lsn: Lsn,
121 :
122 : /// Decoder is required for detecting boundaries of WAL records.
123 : decoder: WalStreamDecoder,
124 :
125 : /// Bytes of WAL records that are not yet written to disk.
126 : unflushed_bytes: BytesMut,
127 :
128 : /// Contains BlockStorage for WAL.
129 : disk: Arc<TimelineDisk>,
130 : }
131 :
132 : impl DiskWALStorage {
133 8736 : pub fn new(disk: Arc<TimelineDisk>, state: &TimelinePersistentState) -> Result<Self> {
134 8736 : let write_lsn = if state.commit_lsn == Lsn(0) {
135 7972 : Lsn(0)
136 : } else {
137 764 : Self::find_end_of_wal(disk.clone(), state.commit_lsn)?
138 : };
139 :
140 8736 : let flush_lsn = write_lsn;
141 8736 : Ok(DiskWALStorage {
142 8736 : write_lsn,
143 8736 : write_record_lsn: flush_lsn,
144 8736 : flush_record_lsn: flush_lsn,
145 8736 : decoder: WalStreamDecoder::new(flush_lsn, PgMajorVersion::PG16),
146 8736 : unflushed_bytes: BytesMut::new(),
147 8736 : disk,
148 8736 : })
149 8736 : }
150 :
151 764 : fn find_end_of_wal(disk: Arc<TimelineDisk>, start_lsn: Lsn) -> Result<Lsn> {
152 764 : let mut buf = [0; 8192];
153 764 : let mut pos = start_lsn.0;
154 764 : let mut decoder = WalStreamDecoder::new(start_lsn, PgMajorVersion::PG16);
155 764 : let mut result = start_lsn;
156 : loop {
157 764 : disk.wal.lock().read(pos, &mut buf);
158 764 : pos += buf.len() as u64;
159 764 : decoder.feed_bytes(&buf);
160 :
161 : loop {
162 4673 : match decoder.poll_decode() {
163 3909 : Ok(Some(record)) => result = record.0,
164 764 : Err(e) => {
165 764 : debug!(
166 0 : "find_end_of_wal reached end at {:?}, decode error: {:?}",
167 : result, e
168 : );
169 764 : return Ok(result);
170 : }
171 0 : Ok(None) => break, // need more data
172 : }
173 : }
174 : }
175 764 : }
176 : }
177 :
178 : impl wal_storage::Storage for DiskWALStorage {
179 : // Last written LSN.
180 3961 : fn write_lsn(&self) -> Lsn {
181 3961 : self.write_lsn
182 3961 : }
183 : /// LSN of last durably stored WAL record.
184 30852 : fn flush_lsn(&self) -> Lsn {
185 30852 : self.flush_record_lsn
186 30852 : }
187 :
188 189 : async fn initialize_first_segment(&mut self, _init_lsn: Lsn) -> Result<()> {
189 189 : Ok(())
190 189 : }
191 :
192 : /// Write piece of WAL from buf to disk, but not necessarily sync it.
193 1063 : async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
194 1063 : if self.write_lsn != startpos {
195 0 : panic!("write_wal called with wrong startpos");
196 1063 : }
197 :
198 1063 : self.unflushed_bytes.extend_from_slice(buf);
199 1063 : self.write_lsn += buf.len() as u64;
200 :
201 1063 : if self.decoder.available() != startpos {
202 0 : info!(
203 0 : "restart decoder from {} to {}",
204 0 : self.decoder.available(),
205 : startpos,
206 : );
207 0 : self.decoder = WalStreamDecoder::new(startpos, PgMajorVersion::PG16);
208 1063 : }
209 1063 : self.decoder.feed_bytes(buf);
210 : loop {
211 33548 : match self.decoder.poll_decode()? {
212 1063 : None => break, // no full record yet
213 32485 : Some((lsn, _rec)) => {
214 32485 : self.write_record_lsn = lsn;
215 32485 : }
216 : }
217 : }
218 :
219 1063 : Ok(())
220 1063 : }
221 :
222 : /// Truncate WAL at specified LSN, which must be the end of WAL record.
223 951 : async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
224 951 : if self.write_lsn != Lsn(0) && end_pos > self.write_lsn {
225 0 : panic!(
226 0 : "truncate_wal called on non-written WAL, write_lsn={}, end_pos={}",
227 : self.write_lsn, end_pos
228 : );
229 951 : }
230 :
231 951 : self.flush_wal().await?;
232 :
233 : // write zeroes to disk from end_pos until self.write_lsn
234 951 : let buf = [0; 8192];
235 951 : let mut pos = end_pos.0;
236 957 : while pos < self.write_lsn.0 {
237 6 : self.disk.wal.lock().write(pos, &buf);
238 6 : pos += buf.len() as u64;
239 6 : }
240 :
241 951 : self.write_lsn = end_pos;
242 951 : self.write_record_lsn = end_pos;
243 951 : self.flush_record_lsn = end_pos;
244 951 : self.unflushed_bytes.clear();
245 951 : self.decoder = WalStreamDecoder::new(end_pos, PgMajorVersion::PG16);
246 :
247 951 : Ok(())
248 951 : }
249 :
250 : /// Durably store WAL on disk, up to the last written WAL record.
251 7811 : async fn flush_wal(&mut self) -> Result<()> {
252 7811 : if self.flush_record_lsn == self.write_record_lsn {
253 : // no need to do extra flush
254 6754 : return Ok(());
255 1057 : }
256 :
257 1057 : let num_bytes = self.write_record_lsn.0 - self.flush_record_lsn.0;
258 :
259 1057 : self.disk.wal.lock().write(
260 1057 : self.flush_record_lsn.0,
261 1057 : &self.unflushed_bytes[..num_bytes as usize],
262 1057 : );
263 1057 : self.unflushed_bytes.advance(num_bytes as usize);
264 1057 : self.flush_record_lsn = self.write_record_lsn;
265 :
266 1057 : Ok(())
267 7811 : }
268 :
269 : /// Remove all segments <= given segno. Returns function doing that as we
270 : /// want to perform it without timeline lock.
271 0 : fn remove_up_to(&self, _segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> {
272 0 : Box::pin(async move { Ok(()) })
273 0 : }
274 :
275 : /// Release resources associated with the storage -- technically, close FDs.
276 : /// Currently we don't remove timelines until restart (#3146), so need to
277 : /// spare descriptors. This would be useful for temporary tli detach as
278 : /// well.
279 0 : fn close(&mut self) {}
280 :
281 : /// Get metrics for this timeline.
282 0 : fn get_metrics(&self) -> WalStorageMetrics {
283 0 : WalStorageMetrics::default()
284 0 : }
285 : }
|