Line data Source code
1 : use std::collections::HashMap;
2 : use std::sync::Arc;
3 :
4 : use parking_lot::Mutex;
5 : use safekeeper::state::TimelinePersistentState;
6 : use utils::id::TenantTimelineId;
7 :
8 : use super::block_storage::BlockStorage;
9 :
10 : use std::{ops::Deref, time::Instant};
11 :
12 : use anyhow::Result;
13 : use bytes::{Buf, BytesMut};
14 : use futures::future::BoxFuture;
15 : use postgres_ffi::{waldecoder::WalStreamDecoder, XLogSegNo};
16 : use safekeeper::{control_file, metrics::WalStorageMetrics, wal_storage};
17 : use tracing::{debug, info};
18 : use utils::lsn::Lsn;
19 :
20 : /// Holds, per timeline (keyed by `TenantTimelineId`), all the safekeeper state that is usually saved to disk.
21 : pub struct SafekeeperDisk {
22 : pub timelines: Mutex<HashMap<TenantTimelineId, Arc<TimelineDisk>>>,
23 : }
24 :
25 : impl Default for SafekeeperDisk {
26 0 : fn default() -> Self {
27 0 : Self::new()
28 0 : }
29 : }
30 :
31 : impl SafekeeperDisk {
32 6024 : pub fn new() -> Self {
33 6024 : SafekeeperDisk {
34 6024 : timelines: Mutex::new(HashMap::new()),
35 6024 : }
36 6024 : }
37 :
38 5714 : pub fn put_state(
39 5714 : &self,
40 5714 : ttid: &TenantTimelineId,
41 5714 : state: TimelinePersistentState,
42 5714 : ) -> Arc<TimelineDisk> {
43 5714 : self.timelines
44 5714 : .lock()
45 5714 : .entry(*ttid)
46 5714 : .and_modify(|e| {
47 0 : let mut mu = e.state.lock();
48 0 : *mu = state.clone();
49 5714 : })
50 5714 : .or_insert_with(|| {
51 5714 : Arc::new(TimelineDisk {
52 5714 : state: Mutex::new(state),
53 5714 : wal: Mutex::new(BlockStorage::new()),
54 5714 : })
55 5714 : })
56 5714 : .clone()
57 5714 : }
58 : }
59 :
60 : /// Disk contents of one timeline: the persistent (control file) state and the WAL block storage, each behind its own mutex.
61 : pub struct TimelineDisk {
62 : pub state: Mutex<TimelinePersistentState>,
63 : pub wal: Mutex<BlockStorage>,
64 : }
65 :
66 : /// Implementation of the `control_file::Storage` trait that keeps its own copy of the state next to the shared `TimelineDisk`.
67 : pub struct DiskStateStorage {
68 : persisted_state: TimelinePersistentState,
69 : disk: Arc<TimelineDisk>,
70 : last_persist_at: Instant,
71 : }
72 :
73 : impl DiskStateStorage {
74 34770 : pub fn new(disk: Arc<TimelineDisk>) -> Self {
75 34770 : let guard = disk.state.lock();
76 34770 : let state = guard.clone();
77 34770 : drop(guard);
78 34770 : DiskStateStorage {
79 34770 : persisted_state: state,
80 34770 : disk,
81 34770 : last_persist_at: Instant::now(),
82 34770 : }
83 34770 : }
84 : }
85 :
86 : impl control_file::Storage for DiskStateStorage {
87 : /// Persist safekeeper state on disk and update internal state.
88 16444 : async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> {
89 16444 : self.persisted_state = s.clone();
90 16444 : *self.disk.state.lock() = s.clone();
91 16444 : Ok(())
92 16444 : }
93 :
94 : /// Timestamp of last persist.
95 0 : fn last_persist_at(&self) -> Instant {
96 0 : // TODO: don't rely on it in tests
97 0 : self.last_persist_at
98 0 : }
99 : }
100 :
101 : impl Deref for DiskStateStorage {
102 : type Target = TimelinePersistentState;
103 :
104 1016691 : fn deref(&self) -> &Self::Target {
105 1016691 : &self.persisted_state
106 1016691 : }
107 : }
108 :
109 : /// Implementation of the `wal_storage::Storage` trait that keeps WAL in the `BlockStorage` of a `TimelineDisk`.
110 : pub struct DiskWALStorage {
111 : /// End of the WAL accepted by `write_wal` so far; the bytes may still be buffered and not yet flushed.
112 : /// It can also be ahead of `write_record_lsn` if the last write ended in the middle of a WAL record.
113 : write_lsn: Lsn,
114 :
115 : /// The LSN the decoder returned for the last complete WAL record passed to `write_wal`. It may not be flushed yet.
116 : write_record_lsn: Lsn,
117 :
118 : /// The LSN of the last WAL record flushed to disk by `flush_wal`.
119 : flush_record_lsn: Lsn,
120 :
121 : /// The decoder is required for detecting the boundaries of WAL records.
122 : decoder: WalStreamDecoder,
123 :
124 : /// WAL bytes accepted by `write_wal` that `flush_wal` has not yet written to disk.
125 : unflushed_bytes: BytesMut,
126 :
127 : /// Shared timeline disk; its `wal` field is the `BlockStorage` holding the WAL.
128 : disk: Arc<TimelineDisk>,
129 : }
130 :
131 : impl DiskWALStorage {
132 34770 : pub fn new(disk: Arc<TimelineDisk>, state: &TimelinePersistentState) -> Result<Self> {
133 34770 : let write_lsn = if state.commit_lsn == Lsn(0) {
134 32377 : Lsn(0)
135 : } else {
136 2393 : Self::find_end_of_wal(disk.clone(), state.commit_lsn)?
137 : };
138 :
139 34770 : let flush_lsn = write_lsn;
140 34770 : Ok(DiskWALStorage {
141 34770 : write_lsn,
142 34770 : write_record_lsn: flush_lsn,
143 34770 : flush_record_lsn: flush_lsn,
144 34770 : decoder: WalStreamDecoder::new(flush_lsn, 16),
145 34770 : unflushed_bytes: BytesMut::new(),
146 34770 : disk,
147 34770 : })
148 34770 : }
149 :
150 2393 : fn find_end_of_wal(disk: Arc<TimelineDisk>, start_lsn: Lsn) -> Result<Lsn> {
151 2393 : let mut buf = [0; 8192];
152 2393 : let mut pos = start_lsn.0;
153 2393 : let mut decoder = WalStreamDecoder::new(start_lsn, 16);
154 2393 : let mut result = start_lsn;
155 : loop {
156 2393 : disk.wal.lock().read(pos, &mut buf);
157 2393 : pos += buf.len() as u64;
158 2393 : decoder.feed_bytes(&buf);
159 :
160 : loop {
161 11621 : match decoder.poll_decode() {
162 9228 : Ok(Some(record)) => result = record.0,
163 2393 : Err(e) => {
164 2393 : debug!(
165 0 : "find_end_of_wal reached end at {:?}, decode error: {:?}",
166 : result, e
167 : );
168 2393 : return Ok(result);
169 : }
170 0 : Ok(None) => break, // need more data
171 : }
172 : }
173 : }
174 2393 : }
175 : }
176 :
177 : impl wal_storage::Storage for DiskWALStorage {
178 : // Last written LSN.
179 13151 : fn write_lsn(&self) -> Lsn {
180 13151 : self.write_lsn
181 13151 : }
182 : /// LSN of last durably stored WAL record.
183 84355 : fn flush_lsn(&self) -> Lsn {
184 84355 : self.flush_record_lsn
185 84355 : }
186 :
187 563 : async fn initialize_first_segment(&mut self, _init_lsn: Lsn) -> Result<()> {
188 563 : Ok(())
189 563 : }
190 :
191 : /// Write piece of WAL from buf to disk, but not necessarily sync it.
192 2310 : async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
193 2310 : if self.write_lsn != startpos {
194 0 : panic!("write_wal called with wrong startpos");
195 2310 : }
196 2310 :
197 2310 : self.unflushed_bytes.extend_from_slice(buf);
198 2310 : self.write_lsn += buf.len() as u64;
199 2310 :
200 2310 : if self.decoder.available() != startpos {
201 0 : info!(
202 0 : "restart decoder from {} to {}",
203 0 : self.decoder.available(),
204 : startpos,
205 : );
206 0 : self.decoder = WalStreamDecoder::new(startpos, 16);
207 2310 : }
208 2310 : self.decoder.feed_bytes(buf);
209 : loop {
210 33858 : match self.decoder.poll_decode()? {
211 2310 : None => break, // no full record yet
212 31548 : Some((lsn, _rec)) => {
213 31548 : self.write_record_lsn = lsn;
214 31548 : }
215 : }
216 : }
217 :
218 2310 : Ok(())
219 2310 : }
220 :
221 : /// Truncate WAL at specified LSN, which must be the end of WAL record.
222 3913 : async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
223 3913 : if self.write_lsn != Lsn(0) && end_pos > self.write_lsn {
224 0 : panic!(
225 0 : "truncate_wal called on non-written WAL, write_lsn={}, end_pos={}",
226 0 : self.write_lsn, end_pos
227 0 : );
228 3913 : }
229 3913 :
230 3913 : self.flush_wal().await?;
231 :
232 : // write zeroes to disk from end_pos until self.write_lsn
233 3913 : let buf = [0; 8192];
234 3913 : let mut pos = end_pos.0;
235 3930 : while pos < self.write_lsn.0 {
236 17 : self.disk.wal.lock().write(pos, &buf);
237 17 : pos += buf.len() as u64;
238 17 : }
239 :
240 3913 : self.write_lsn = end_pos;
241 3913 : self.write_record_lsn = end_pos;
242 3913 : self.flush_record_lsn = end_pos;
243 3913 : self.unflushed_bytes.clear();
244 3913 : self.decoder = WalStreamDecoder::new(end_pos, 16);
245 3913 :
246 3913 : Ok(())
247 3913 : }
248 :
249 : /// Durably store WAL on disk, up to the last written WAL record.
250 27735 : async fn flush_wal(&mut self) -> Result<()> {
251 27735 : if self.flush_record_lsn == self.write_record_lsn {
252 : // no need to do extra flush
253 25437 : return Ok(());
254 2298 : }
255 2298 :
256 2298 : let num_bytes = self.write_record_lsn.0 - self.flush_record_lsn.0;
257 2298 :
258 2298 : self.disk.wal.lock().write(
259 2298 : self.flush_record_lsn.0,
260 2298 : &self.unflushed_bytes[..num_bytes as usize],
261 2298 : );
262 2298 : self.unflushed_bytes.advance(num_bytes as usize);
263 2298 : self.flush_record_lsn = self.write_record_lsn;
264 2298 :
265 2298 : Ok(())
266 27735 : }
267 :
268 : /// Remove all segments <= given segno. Returns function doing that as we
269 : /// want to perform it without timeline lock.
270 0 : fn remove_up_to(&self, _segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> {
271 0 : Box::pin(async move { Ok(()) })
272 0 : }
273 :
274 : /// Release resources associated with the storage -- technically, close FDs.
275 : /// Currently we don't remove timelines until restart (#3146), so need to
276 : /// spare descriptors. This would be useful for temporary tli detach as
277 : /// well.
278 0 : fn close(&mut self) {}
279 :
280 : /// Get metrics for this timeline.
281 0 : fn get_metrics(&self) -> WalStorageMetrics {
282 0 : WalStorageMetrics::default()
283 0 : }
284 : }
|