Line data Source code
1 : //! Defines per timeline data stored persistently (SafeKeeperPersistentState)
2 : //! and its wrapper with in memory layer (SafekeeperState).
3 :
4 : use std::{cmp::max, ops::Deref};
5 :
6 : use anyhow::{bail, Result};
7 : use postgres_ffi::WAL_SEGMENT_SIZE;
8 : use safekeeper_api::models::TimelineTermBumpResponse;
9 : use serde::{Deserialize, Serialize};
10 : use utils::{
11 : id::{NodeId, TenantId, TenantTimelineId, TimelineId},
12 : lsn::Lsn,
13 : };
14 :
15 : use crate::{
16 : control_file,
17 : safekeeper::{
18 : AcceptorState, PersistedPeerInfo, PgUuid, ServerInfo, Term, TermHistory,
19 : UNKNOWN_SERVER_VERSION,
20 : },
21 : timeline::TimelineError,
22 : wal_backup_partial::{self},
23 : };
24 :
25 : /// Persistent information stored on safekeeper node about timeline.
26 : /// On disk data is prefixed by magic and format version and followed by checksum.
27 6 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
28 : pub struct TimelinePersistentState {
29 : #[serde(with = "hex")]
30 : pub tenant_id: TenantId,
31 : #[serde(with = "hex")]
32 : pub timeline_id: TimelineId,
33 : /// persistent acceptor state
34 : pub acceptor_state: AcceptorState,
35 : /// information about server
36 : pub server: ServerInfo,
37 : /// Unique id of the last *elected* proposer we dealt with. Not needed
38 : /// for correctness, exists for monitoring purposes.
39 : #[serde(with = "hex")]
40 : pub proposer_uuid: PgUuid,
41 : /// Since which LSN this timeline generally starts. Safekeeper might have
42 : /// joined later.
43 : pub timeline_start_lsn: Lsn,
44 : /// Since which LSN safekeeper has (had) WAL for this timeline.
45 : /// All WAL segments next to one containing local_start_lsn are
46 : /// filled with data from the beginning.
47 : pub local_start_lsn: Lsn,
48 : /// Part of WAL acknowledged by quorum *and available locally*. Always points
49 : /// to record boundary.
50 : pub commit_lsn: Lsn,
51 : /// LSN that points to the end of the last backed up segment. Useful to
52 : /// persist to avoid finding out offloading progress on boot.
53 : pub backup_lsn: Lsn,
54 : /// Minimal LSN which may be needed for recovery of some safekeeper (end_lsn
55 : /// of last record streamed to everyone). Persisting it helps skipping
56 : /// recovery in walproposer, generally we compute it from peers. In
57 : /// walproposer proto called 'truncate_lsn'. Updates are currently drived
58 : /// only by walproposer.
59 : pub peer_horizon_lsn: Lsn,
60 : /// LSN of the oldest known checkpoint made by pageserver and successfully
61 : /// pushed to s3. We don't remove WAL beyond it. Persisted only for
62 : /// informational purposes, we receive it from pageserver (or broker).
63 : pub remote_consistent_lsn: Lsn,
64 : /// Peers and their state as we remember it. Knowing peers themselves is
65 : /// fundamental; but state is saved here only for informational purposes and
66 : /// obviously can be stale. (Currently not saved at all, but let's provision
67 : /// place to have less file version upgrades).
68 : pub peers: PersistedPeers,
69 : /// Holds names of partial segments uploaded to remote storage. Used to
70 : /// clean up old objects without leaving garbage in remote storage.
71 : pub partial_backup: wal_backup_partial::State,
72 : /// Eviction state of the timeline. If it's Offloaded, we should download
73 : /// WAL files from remote storage to serve the timeline.
74 : pub eviction_state: EvictionState,
75 : }
76 :
77 3 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
78 : pub struct PersistedPeers(pub Vec<(NodeId, PersistedPeerInfo)>);
79 :
80 : /// State of the local WAL files. Used to track current timeline state,
81 : /// that can be either WAL files are present on disk or last partial segment
82 : /// is offloaded to remote storage.
83 2 : #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
84 : pub enum EvictionState {
85 : /// WAL files are present on disk.
86 : Present,
87 : /// Last partial segment is offloaded to remote storage.
88 : /// Contains flush_lsn of the last offloaded segment.
89 : Offloaded(Lsn),
90 : }
91 :
92 : impl TimelinePersistentState {
93 1435 : pub fn new(
94 1435 : ttid: &TenantTimelineId,
95 1435 : server_info: ServerInfo,
96 1435 : peers: Vec<NodeId>,
97 1435 : commit_lsn: Lsn,
98 1435 : local_start_lsn: Lsn,
99 1435 : ) -> anyhow::Result<TimelinePersistentState> {
100 1435 : if server_info.wal_seg_size == 0 {
101 0 : bail!(TimelineError::UninitializedWalSegSize(*ttid));
102 1435 : }
103 1435 :
104 1435 : if server_info.pg_version == UNKNOWN_SERVER_VERSION {
105 0 : bail!(TimelineError::UninitialinzedPgVersion(*ttid));
106 1435 : }
107 1435 :
108 1435 : if commit_lsn < local_start_lsn {
109 0 : bail!(
110 0 : "commit_lsn {} is smaller than local_start_lsn {}",
111 0 : commit_lsn,
112 0 : local_start_lsn
113 0 : );
114 1435 : }
115 1435 :
116 1435 : Ok(TimelinePersistentState {
117 1435 : tenant_id: ttid.tenant_id,
118 1435 : timeline_id: ttid.timeline_id,
119 1435 : acceptor_state: AcceptorState {
120 1435 : term: 0,
121 1435 : term_history: TermHistory::empty(),
122 1435 : },
123 1435 : server: server_info,
124 1435 : proposer_uuid: [0; 16],
125 1435 : timeline_start_lsn: Lsn(0),
126 1435 : local_start_lsn,
127 1435 : commit_lsn,
128 1435 : backup_lsn: local_start_lsn,
129 1435 : peer_horizon_lsn: local_start_lsn,
130 1435 : remote_consistent_lsn: Lsn(0),
131 1435 : peers: PersistedPeers(
132 1435 : peers
133 1435 : .iter()
134 1435 : .map(|p| (*p, PersistedPeerInfo::new()))
135 1435 : .collect(),
136 1435 : ),
137 1435 : partial_backup: wal_backup_partial::State::default(),
138 1435 : eviction_state: EvictionState::Present,
139 1435 : })
140 1435 : }
141 :
142 5 : pub fn empty() -> Self {
143 5 : TimelinePersistentState::new(
144 5 : &TenantTimelineId::empty(),
145 5 : ServerInfo {
146 5 : pg_version: 170000, /* Postgres server version (major * 10000) */
147 5 : system_id: 0, /* Postgres system identifier */
148 5 : wal_seg_size: WAL_SEGMENT_SIZE as u32,
149 5 : },
150 5 : vec![],
151 5 : Lsn::INVALID,
152 5 : Lsn::INVALID,
153 5 : )
154 5 : .unwrap()
155 5 : }
156 : }
157 :
158 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
159 : // In memory safekeeper state. Fields mirror ones in `SafeKeeperPersistentState`; values
160 : // are not flushed yet.
161 : pub struct TimelineMemState {
162 : pub commit_lsn: Lsn,
163 : pub backup_lsn: Lsn,
164 : pub peer_horizon_lsn: Lsn,
165 : pub remote_consistent_lsn: Lsn,
166 : #[serde(with = "hex")]
167 : pub proposer_uuid: PgUuid,
168 : }
169 :
170 : /// Safekeeper persistent state plus in memory layer.
171 : ///
172 : /// Allows us to avoid frequent fsyncs when we update fields like commit_lsn
173 : /// which don't need immediate persistence. Provides transactional like API
174 : /// to atomically update the state.
175 : ///
176 : /// Implements Deref into *persistent* part.
177 : pub struct TimelineState<CTRL: control_file::Storage> {
178 : pub inmem: TimelineMemState,
179 : pub pers: CTRL, // persistent
180 : }
181 :
182 : impl<CTRL> TimelineState<CTRL>
183 : where
184 : CTRL: control_file::Storage,
185 : {
186 8397 : pub fn new(state: CTRL) -> Self {
187 8397 : TimelineState {
188 8397 : inmem: TimelineMemState {
189 8397 : commit_lsn: state.commit_lsn,
190 8397 : backup_lsn: state.backup_lsn,
191 8397 : peer_horizon_lsn: state.peer_horizon_lsn,
192 8397 : remote_consistent_lsn: state.remote_consistent_lsn,
193 8397 : proposer_uuid: state.proposer_uuid,
194 8397 : },
195 8397 : pers: state,
196 8397 : }
197 8397 : }
198 :
199 : /// Start atomic change. Returns SafeKeeperPersistentState with in memory
200 : /// values applied; the protocol is to 1) change returned struct as desired
201 : /// 2) atomically persist it with finish_change.
202 3614 : pub fn start_change(&self) -> TimelinePersistentState {
203 3614 : let mut s = self.pers.clone();
204 3614 : s.commit_lsn = self.inmem.commit_lsn;
205 3614 : s.backup_lsn = self.inmem.backup_lsn;
206 3614 : s.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
207 3614 : s.remote_consistent_lsn = self.inmem.remote_consistent_lsn;
208 3614 : s.proposer_uuid = self.inmem.proposer_uuid;
209 3614 : s
210 3614 : }
211 :
212 : /// Persist given state. c.f. start_change.
213 3614 : pub async fn finish_change(&mut self, s: &TimelinePersistentState) -> Result<()> {
214 3614 : if s.eq(&*self.pers) {
215 50 : // nothing to do if state didn't change
216 50 : } else {
217 3564 : self.pers.persist(s).await?;
218 : }
219 :
220 : // keep in memory values up to date
221 3614 : self.inmem.commit_lsn = s.commit_lsn;
222 3614 : self.inmem.backup_lsn = s.backup_lsn;
223 3614 : self.inmem.peer_horizon_lsn = s.peer_horizon_lsn;
224 3614 : self.inmem.remote_consistent_lsn = s.remote_consistent_lsn;
225 3614 : self.inmem.proposer_uuid = s.proposer_uuid;
226 3614 : Ok(())
227 3614 : }
228 :
229 : /// Flush in memory values.
230 99 : pub async fn flush(&mut self) -> Result<()> {
231 99 : let s = self.start_change();
232 99 : self.finish_change(&s).await
233 99 : }
234 :
235 : /// Make term at least as `to`. If `to` is None, increment current one. This
236 : /// is not in safekeeper.rs because we want to be able to do it even if
237 : /// timeline is offloaded.
238 0 : pub async fn term_bump(&mut self, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
239 0 : let before = self.acceptor_state.term;
240 0 : let mut state = self.start_change();
241 0 : let new = match to {
242 0 : Some(to) => max(state.acceptor_state.term, to),
243 0 : None => state.acceptor_state.term + 1,
244 : };
245 0 : if new > state.acceptor_state.term {
246 0 : state.acceptor_state.term = new;
247 0 : self.finish_change(&state).await?;
248 0 : }
249 0 : let after = self.acceptor_state.term;
250 0 : Ok(TimelineTermBumpResponse {
251 0 : previous_term: before,
252 0 : current_term: after,
253 0 : })
254 0 : }
255 : }
256 :
257 : impl<CTRL> Deref for TimelineState<CTRL>
258 : where
259 : CTRL: control_file::Storage,
260 : {
261 : type Target = TimelinePersistentState;
262 :
263 182376 : fn deref(&self) -> &Self::Target {
264 182376 : &self.pers
265 182376 : }
266 : }
|