Line data Source code
1 : //! Defines per timeline data stored persistently (TimelinePersistentState)
2 : //! and its wrapper with in memory layer (TimelineState).
3 :
4 : use std::{cmp::max, ops::Deref};
5 :
6 : use anyhow::{bail, Result};
7 : use safekeeper_api::models::TimelineTermBumpResponse;
8 : use serde::{Deserialize, Serialize};
9 : use utils::{
10 : id::{NodeId, TenantId, TenantTimelineId, TimelineId},
11 : lsn::Lsn,
12 : };
13 :
14 : use crate::{
15 : control_file,
16 : safekeeper::{
17 : AcceptorState, PersistedPeerInfo, PgUuid, ServerInfo, Term, TermHistory,
18 : UNKNOWN_SERVER_VERSION,
19 : },
20 : timeline::TimelineError,
21 : wal_backup_partial::{self},
22 : };
23 :
24 : /// Persistent information stored on a safekeeper node about a timeline.
25 : /// On disk, the data is prefixed by magic and format version and followed by a checksum.
/// NOTE(review): the struct derives Serialize/Deserialize, so field order and
/// types presumably define the stored layout; confirm before reordering fields.
26 6 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
27 : pub struct TimelinePersistentState {
28 : #[serde(with = "hex")]
29 : pub tenant_id: TenantId,
30 : #[serde(with = "hex")]
31 : pub timeline_id: TimelineId,
32 : /// Persistent acceptor state.
33 : pub acceptor_state: AcceptorState,
34 : /// Information about the server.
35 : pub server: ServerInfo,
36 : /// Unique id of the last *elected* proposer we dealt with. Not needed
37 : /// for correctness, exists for monitoring purposes.
38 : #[serde(with = "hex")]
39 : pub proposer_uuid: PgUuid,
40 : /// LSN at which this timeline generally starts. This safekeeper might
41 : /// have joined later.
42 : pub timeline_start_lsn: Lsn,
43 : /// LSN since which this safekeeper has (had) WAL for this timeline.
44 : /// All WAL segments following the one containing local_start_lsn are
45 : /// filled with data from the beginning.
46 : pub local_start_lsn: Lsn,
47 : /// Part of WAL acknowledged by quorum *and available locally*. Always points
48 : /// to a record boundary.
49 : pub commit_lsn: Lsn,
50 : /// LSN that points to the end of the last backed up segment. Useful to
51 : /// persist to avoid finding out offloading progress on boot.
52 : pub backup_lsn: Lsn,
53 : /// Minimal LSN which may be needed for recovery of some safekeeper (end_lsn
54 : /// of last record streamed to everyone). Persisting it helps skipping
55 : /// recovery in walproposer; generally we compute it from peers. In the
56 : /// walproposer protocol it is called 'truncate_lsn'. Updates are currently
57 : /// driven only by walproposer.
58 : pub peer_horizon_lsn: Lsn,
59 : /// LSN of the oldest known checkpoint made by pageserver and successfully
60 : /// pushed to s3. We don't remove WAL beyond it. Persisted only for
61 : /// informational purposes, we receive it from pageserver (or broker).
62 : pub remote_consistent_lsn: Lsn,
63 : /// Peers and their state as we remember it. Knowing peers themselves is
64 : /// fundamental; but state is saved here only for informational purposes and
65 : /// obviously can be stale. (Currently not saved at all, but let's provision
66 : /// place to have fewer file version upgrades).
67 : pub peers: PersistedPeers,
68 : /// Holds names of partial segments uploaded to remote storage. Used to
69 : /// clean up old objects without leaving garbage in remote storage.
70 : pub partial_backup: wal_backup_partial::State,
71 : /// Eviction state of the timeline. If it's Offloaded, we should download
72 : /// WAL files from remote storage to serve the timeline.
73 : pub eviction_state: EvictionState,
74 : }
75 :
/// Peers of a timeline: a list of (node id, info persisted for that node) pairs.
76 4 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
77 : pub struct PersistedPeers(pub Vec<(NodeId, PersistedPeerInfo)>);
78 :
79 : /// State of the local WAL files. Tracks whether the WAL files are present
80 : /// on disk or the last partial segment has been offloaded to remote
81 : /// storage.
82 2 : #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
83 : pub enum EvictionState {
84 : /// WAL files are present on disk.
85 : Present,
86 : /// Last partial segment is offloaded to remote storage.
87 : /// Contains flush_lsn of the last offloaded segment.
88 : Offloaded(Lsn),
89 : }
90 :
91 : impl TimelinePersistentState {
92 1446 : pub fn new(
93 1446 : ttid: &TenantTimelineId,
94 1446 : server_info: ServerInfo,
95 1446 : peers: Vec<NodeId>,
96 1446 : commit_lsn: Lsn,
97 1446 : local_start_lsn: Lsn,
98 1446 : ) -> anyhow::Result<TimelinePersistentState> {
99 1446 : if server_info.wal_seg_size == 0 {
100 0 : bail!(TimelineError::UninitializedWalSegSize(*ttid));
101 1446 : }
102 1446 :
103 1446 : if server_info.pg_version == UNKNOWN_SERVER_VERSION {
104 0 : bail!(TimelineError::UninitialinzedPgVersion(*ttid));
105 1446 : }
106 1446 :
107 1446 : if commit_lsn < local_start_lsn {
108 0 : bail!(
109 0 : "commit_lsn {} is smaller than local_start_lsn {}",
110 0 : commit_lsn,
111 0 : local_start_lsn
112 0 : );
113 1446 : }
114 1446 :
115 1446 : Ok(TimelinePersistentState {
116 1446 : tenant_id: ttid.tenant_id,
117 1446 : timeline_id: ttid.timeline_id,
118 1446 : acceptor_state: AcceptorState {
119 1446 : term: 0,
120 1446 : term_history: TermHistory::empty(),
121 1446 : },
122 1446 : server: server_info,
123 1446 : proposer_uuid: [0; 16],
124 1446 : timeline_start_lsn: Lsn(0),
125 1446 : local_start_lsn,
126 1446 : commit_lsn,
127 1446 : backup_lsn: local_start_lsn,
128 1446 : peer_horizon_lsn: local_start_lsn,
129 1446 : remote_consistent_lsn: Lsn(0),
130 1446 : peers: PersistedPeers(
131 1446 : peers
132 1446 : .iter()
133 1446 : .map(|p| (*p, PersistedPeerInfo::new()))
134 1446 : .collect(),
135 1446 : ),
136 1446 : partial_backup: wal_backup_partial::State::default(),
137 1446 : eviction_state: EvictionState::Present,
138 1446 : })
139 1446 : }
140 :
141 : #[cfg(test)]
142 5 : pub fn empty() -> Self {
143 5 : TimelinePersistentState::new(
144 5 : &TenantTimelineId::empty(),
145 5 : ServerInfo {
146 5 : pg_version: 17, /* Postgres server version */
147 5 : system_id: 0, /* Postgres system identifier */
148 5 : wal_seg_size: 16 * 1024 * 1024,
149 5 : },
150 5 : vec![],
151 5 : Lsn::INVALID,
152 5 : Lsn::INVALID,
153 5 : )
154 5 : .unwrap()
155 5 : }
156 : }
157 :
158 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
159 : // In-memory safekeeper state. Fields mirror the same-named ones in
160 : // `TimelinePersistentState`; values here may not have been persisted yet.
161 : pub struct TimelineMemState {
162 : pub commit_lsn: Lsn,
163 : pub backup_lsn: Lsn,
164 : pub peer_horizon_lsn: Lsn,
165 : pub remote_consistent_lsn: Lsn,
166 : #[serde(with = "hex")]
167 : pub proposer_uuid: PgUuid,
168 : }
169 :
170 : /// Safekeeper persistent state plus an in-memory layer.
171 : ///
172 : /// Allows us to avoid frequent fsyncs when we update fields like commit_lsn
173 : /// which don't need immediate persistence. Provides a transaction-like API
174 : /// (start_change / finish_change) to atomically update the state.
175 : ///
176 : /// Implements Deref into the *persistent* part.
177 : pub struct TimelineState<CTRL: control_file::Storage> {
178 : pub inmem: TimelineMemState, // in-memory layer
179 : pub pers: CTRL, // persistent
180 : }
181 :
182 : impl<CTRL> TimelineState<CTRL>
183 : where
184 : CTRL: control_file::Storage,
185 : {
186 8903 : pub fn new(state: CTRL) -> Self {
187 8903 : TimelineState {
188 8903 : inmem: TimelineMemState {
189 8903 : commit_lsn: state.commit_lsn,
190 8903 : backup_lsn: state.backup_lsn,
191 8903 : peer_horizon_lsn: state.peer_horizon_lsn,
192 8903 : remote_consistent_lsn: state.remote_consistent_lsn,
193 8903 : proposer_uuid: state.proposer_uuid,
194 8903 : },
195 8903 : pers: state,
196 8903 : }
197 8903 : }
198 :
199 : /// Start atomic change. Returns SafeKeeperPersistentState with in memory
200 : /// values applied; the protocol is to 1) change returned struct as desired
201 : /// 2) atomically persist it with finish_change.
202 3517 : pub fn start_change(&self) -> TimelinePersistentState {
203 3517 : let mut s = self.pers.clone();
204 3517 : s.commit_lsn = self.inmem.commit_lsn;
205 3517 : s.backup_lsn = self.inmem.backup_lsn;
206 3517 : s.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
207 3517 : s.remote_consistent_lsn = self.inmem.remote_consistent_lsn;
208 3517 : s.proposer_uuid = self.inmem.proposer_uuid;
209 3517 : s
210 3517 : }
211 :
212 : /// Persist given state. c.f. start_change.
213 3517 : pub async fn finish_change(&mut self, s: &TimelinePersistentState) -> Result<()> {
214 3517 : if s.eq(&*self.pers) {
215 50 : // nothing to do if state didn't change
216 50 : } else {
217 3467 : self.pers.persist(s).await?;
218 : }
219 :
220 : // keep in memory values up to date
221 3517 : self.inmem.commit_lsn = s.commit_lsn;
222 3517 : self.inmem.backup_lsn = s.backup_lsn;
223 3517 : self.inmem.peer_horizon_lsn = s.peer_horizon_lsn;
224 3517 : self.inmem.remote_consistent_lsn = s.remote_consistent_lsn;
225 3517 : self.inmem.proposer_uuid = s.proposer_uuid;
226 3517 : Ok(())
227 3517 : }
228 :
229 : /// Flush in memory values.
230 77 : pub async fn flush(&mut self) -> Result<()> {
231 77 : let s = self.start_change();
232 77 : self.finish_change(&s).await
233 77 : }
234 :
235 : /// Make term at least as `to`. If `to` is None, increment current one. This
236 : /// is not in safekeeper.rs because we want to be able to do it even if
237 : /// timeline is offloaded.
238 0 : pub async fn term_bump(&mut self, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
239 0 : let before = self.acceptor_state.term;
240 0 : let mut state = self.start_change();
241 0 : let new = match to {
242 0 : Some(to) => max(state.acceptor_state.term, to),
243 0 : None => state.acceptor_state.term + 1,
244 : };
245 0 : if new > state.acceptor_state.term {
246 0 : state.acceptor_state.term = new;
247 0 : self.finish_change(&state).await?;
248 0 : }
249 0 : let after = self.acceptor_state.term;
250 0 : Ok(TimelineTermBumpResponse {
251 0 : previous_term: before,
252 0 : current_term: after,
253 0 : })
254 0 : }
255 : }
256 :
257 : impl<CTRL> Deref for TimelineState<CTRL>
258 : where
259 : CTRL: control_file::Storage,
260 : {
261 : type Target = TimelinePersistentState;
262 :
263 192586 : fn deref(&self) -> &Self::Target {
264 192586 : &self.pers
265 192586 : }
266 : }
|