Line data Source code
//! Defines per-timeline data stored persistently (TimelinePersistentState)
//! and its wrapper with an in-memory layer (TimelineState).
3 :
4 : use std::cmp::max;
5 : use std::ops::Deref;
6 : use std::time::SystemTime;
7 :
8 : use anyhow::{Result, bail};
9 : use postgres_ffi::WAL_SEGMENT_SIZE;
10 : use safekeeper_api::membership::Configuration;
11 : use safekeeper_api::models::{TimelineMembershipSwitchResponse, TimelineTermBumpResponse};
12 : use safekeeper_api::{INITIAL_TERM, ServerInfo, Term};
13 : use serde::{Deserialize, Serialize};
14 : use tracing::info;
15 : use utils::id::{TenantId, TenantTimelineId, TimelineId};
16 : use utils::lsn::Lsn;
17 :
18 : use crate::control_file;
19 : use crate::safekeeper::{AcceptorState, PgUuid, TermHistory, TermLsn, UNKNOWN_SERVER_VERSION};
20 : use crate::timeline::TimelineError;
21 : use crate::wal_backup_partial::{self};
22 :
/// Persistent information stored on safekeeper node about timeline.
/// On disk data is prefixed by magic and format version and followed by checksum.
///
/// NOTE(review): this type derives `Serialize`/`Deserialize`. If the control
/// file uses a positional encoding, field order is part of the on-disk format,
/// so do not reorder or remove fields without checking `control_file` — TODO confirm.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct TimelinePersistentState {
    /// Tenant this timeline belongs to (serialized as hex).
    #[serde(with = "hex")]
    pub tenant_id: TenantId,
    /// Timeline identifier (serialized as hex).
    #[serde(with = "hex")]
    pub timeline_id: TimelineId,
    /// Membership configuration.
    pub mconf: Configuration,
    /// persistent acceptor state
    pub acceptor_state: AcceptorState,
    /// information about server
    pub server: ServerInfo,
    /// Unique id of the last *elected* proposer we dealt with. Not needed
    /// for correctness, exists for monitoring purposes.
    #[serde(with = "hex")]
    pub proposer_uuid: PgUuid,
    /// Since which LSN this timeline generally starts. Safekeeper might have
    /// joined later.
    pub timeline_start_lsn: Lsn,
    /// Since which LSN safekeeper has (had) WAL for this timeline.
    /// All WAL segments next to one containing local_start_lsn are
    /// filled with data from the beginning.
    pub local_start_lsn: Lsn,
    /// Part of WAL acknowledged by quorum *and available locally*. Always points
    /// to record boundary.
    pub commit_lsn: Lsn,
    /// LSN that points to the end of the last backed up segment. Useful to
    /// persist to avoid finding out offloading progress on boot.
    pub backup_lsn: Lsn,
    /// Minimal LSN which may be needed for recovery of some safekeeper (end_lsn
    /// of last record streamed to everyone). Persisting it helps skipping
    /// recovery in walproposer, generally we compute it from peers. In
    /// walproposer proto called 'truncate_lsn'. Updates are currently driven
    /// only by walproposer.
    pub peer_horizon_lsn: Lsn,
    /// LSN of the oldest known checkpoint made by pageserver and successfully
    /// pushed to s3. We don't remove WAL beyond it. Persisted only for
    /// informational purposes, we receive it from pageserver (or broker).
    pub remote_consistent_lsn: Lsn,
    /// Holds names of partial segments uploaded to remote storage. Used to
    /// clean up old objects without leaving garbage in remote storage.
    pub partial_backup: wal_backup_partial::State,
    /// Eviction state of the timeline. If it's Offloaded, we should download
    /// WAL files from remote storage to serve the timeline.
    pub eviction_state: EvictionState,
    /// Creation timestamp; `TimelinePersistentState::new` sets it to
    /// `SystemTime::now()`.
    pub creation_ts: SystemTime,
}
72 :
/// State of the timeline's local WAL files: either the WAL files are present
/// on disk, or the last partial segment has been offloaded to remote storage.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
pub enum EvictionState {
    /// WAL files are present on disk.
    Present,
    /// Last partial segment is offloaded to remote storage.
    /// Contains flush_lsn of the last offloaded segment.
    Offloaded(Lsn),
}
84 :
85 : impl TimelinePersistentState {
86 : /// commit_lsn is the same as start_lsn in the normal creaiton; see
87 : /// `TimelineCreateRequest` comments.`
88 1434 : pub fn new(
89 1434 : ttid: &TenantTimelineId,
90 1434 : mconf: Configuration,
91 1434 : server_info: ServerInfo,
92 1434 : start_lsn: Lsn,
93 1434 : commit_lsn: Lsn,
94 1434 : ) -> anyhow::Result<TimelinePersistentState> {
95 1434 : if server_info.wal_seg_size == 0 {
96 0 : bail!(TimelineError::UninitializedWalSegSize(*ttid));
97 1434 : }
98 1434 :
99 1434 : if server_info.pg_version == UNKNOWN_SERVER_VERSION {
100 0 : bail!(TimelineError::UninitialinzedPgVersion(*ttid));
101 1434 : }
102 1434 :
103 1434 : if commit_lsn < start_lsn {
104 0 : bail!(
105 0 : "commit_lsn {} is smaller than start_lsn {}",
106 0 : commit_lsn,
107 0 : start_lsn
108 0 : );
109 1434 : }
110 :
111 : // If we are given with init LSN, initialize term history with it. It
112 : // ensures that walproposer always must be able to find a common point
113 : // in histories; if it can't something is corrupted. Not having LSN here
114 : // is so far left for legacy case where timeline is created by compute
115 : // and LSN during creation is not known yet.
116 1434 : let term_history = if commit_lsn != Lsn::INVALID {
117 0 : TermHistory(vec![TermLsn {
118 0 : term: INITIAL_TERM,
119 0 : lsn: start_lsn,
120 0 : }])
121 : } else {
122 1434 : TermHistory::empty()
123 : };
124 :
125 1434 : Ok(TimelinePersistentState {
126 1434 : tenant_id: ttid.tenant_id,
127 1434 : timeline_id: ttid.timeline_id,
128 1434 : mconf,
129 1434 : acceptor_state: AcceptorState {
130 1434 : term: INITIAL_TERM,
131 1434 : term_history,
132 1434 : },
133 1434 : server: server_info,
134 1434 : proposer_uuid: [0; 16],
135 1434 : timeline_start_lsn: start_lsn,
136 1434 : local_start_lsn: start_lsn,
137 1434 : commit_lsn,
138 1434 : backup_lsn: start_lsn,
139 1434 : peer_horizon_lsn: start_lsn,
140 1434 : remote_consistent_lsn: Lsn(0),
141 1434 : partial_backup: wal_backup_partial::State::default(),
142 1434 : eviction_state: EvictionState::Present,
143 1434 : creation_ts: SystemTime::now(),
144 1434 : })
145 1434 : }
146 :
147 10 : pub fn empty() -> Self {
148 10 : TimelinePersistentState::new(
149 10 : &TenantTimelineId::empty(),
150 10 : Configuration::empty(),
151 10 : ServerInfo {
152 10 : pg_version: 170000, /* Postgres server version (major * 10000) */
153 10 : system_id: 0, /* Postgres system identifier */
154 10 : wal_seg_size: WAL_SEGMENT_SIZE as u32,
155 10 : },
156 10 : Lsn::INVALID,
157 10 : Lsn::INVALID,
158 10 : )
159 10 : .unwrap()
160 10 : }
161 : }
162 :
/// In-memory safekeeper state. Fields mirror the same-named ones in
/// `TimelinePersistentState`; the values here may not have been persisted yet.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimelineMemState {
    /// In-memory counterpart of `TimelinePersistentState::commit_lsn`.
    pub commit_lsn: Lsn,
    /// In-memory counterpart of `TimelinePersistentState::backup_lsn`.
    pub backup_lsn: Lsn,
    /// In-memory counterpart of `TimelinePersistentState::peer_horizon_lsn`.
    pub peer_horizon_lsn: Lsn,
    /// In-memory counterpart of `TimelinePersistentState::remote_consistent_lsn`.
    pub remote_consistent_lsn: Lsn,
    /// In-memory counterpart of `TimelinePersistentState::proposer_uuid`
    /// (serialized as hex).
    #[serde(with = "hex")]
    pub proposer_uuid: PgUuid,
}
174 :
/// Safekeeper persistent state plus in memory layer.
///
/// Allows us to avoid frequent fsyncs when we update fields like commit_lsn
/// which don't need immediate persistence. Provides transactional like API
/// to atomically update the state.
///
/// Implements Deref into *persistent* part.
pub struct TimelineState<CTRL: control_file::Storage> {
    /// Latest values of the in-memory-tracked fields; they may be ahead of
    /// what `pers` holds until the next `finish_change`/`flush`.
    pub inmem: TimelineMemState,
    /// Persistent storage handle; derefs to a `TimelinePersistentState` and
    /// is written through `persist`.
    pub pers: CTRL, // persistent
}
186 :
187 : impl<CTRL> TimelineState<CTRL>
188 : where
189 : CTRL: control_file::Storage,
190 : {
191 8712 : pub fn new(state: CTRL) -> Self {
192 8712 : TimelineState {
193 8712 : inmem: TimelineMemState {
194 8712 : commit_lsn: state.commit_lsn,
195 8712 : backup_lsn: state.backup_lsn,
196 8712 : peer_horizon_lsn: state.peer_horizon_lsn,
197 8712 : remote_consistent_lsn: state.remote_consistent_lsn,
198 8712 : proposer_uuid: state.proposer_uuid,
199 8712 : },
200 8712 : pers: state,
201 8712 : }
202 8712 : }
203 :
204 : /// Start atomic change. Returns SafeKeeperPersistentState with in memory
205 : /// values applied; the protocol is to 1) change returned struct as desired
206 : /// 2) atomically persist it with finish_change.
207 4004 : pub fn start_change(&self) -> TimelinePersistentState {
208 4004 : let mut s = self.pers.clone();
209 4004 : s.commit_lsn = self.inmem.commit_lsn;
210 4004 : s.backup_lsn = self.inmem.backup_lsn;
211 4004 : s.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
212 4004 : s.remote_consistent_lsn = self.inmem.remote_consistent_lsn;
213 4004 : s.proposer_uuid = self.inmem.proposer_uuid;
214 4004 : s
215 4004 : }
216 :
217 : /// Persist given state. c.f. start_change.
218 4004 : pub async fn finish_change(&mut self, s: &TimelinePersistentState) -> Result<()> {
219 4004 : if s.eq(&*self.pers) {
220 42 : // nothing to do if state didn't change
221 42 : } else {
222 3962 : self.pers.persist(s).await?;
223 : }
224 :
225 : // keep in memory values up to date
226 4004 : self.inmem.commit_lsn = s.commit_lsn;
227 4004 : self.inmem.backup_lsn = s.backup_lsn;
228 4004 : self.inmem.peer_horizon_lsn = s.peer_horizon_lsn;
229 4004 : self.inmem.remote_consistent_lsn = s.remote_consistent_lsn;
230 4004 : self.inmem.proposer_uuid = s.proposer_uuid;
231 4004 : Ok(())
232 19 : }
233 :
234 : /// Flush in memory values.
235 130 : pub async fn flush(&mut self) -> Result<()> {
236 130 : let s = self.start_change();
237 130 : self.finish_change(&s).await
238 4 : }
239 :
240 : /// Make term at least as `to`. If `to` is None, increment current one. This
241 : /// is not in safekeeper.rs because we want to be able to do it even if
242 : /// timeline is offloaded.
243 0 : pub async fn term_bump(&mut self, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
244 0 : let before = self.acceptor_state.term;
245 0 : let mut state = self.start_change();
246 0 : let new = match to {
247 0 : Some(to) => max(state.acceptor_state.term, to),
248 0 : None => state.acceptor_state.term + 1,
249 : };
250 0 : if new > state.acceptor_state.term {
251 0 : state.acceptor_state.term = new;
252 0 : self.finish_change(&state).await?;
253 0 : }
254 0 : let after = self.acceptor_state.term;
255 0 : Ok(TimelineTermBumpResponse {
256 0 : previous_term: before,
257 0 : current_term: after,
258 0 : })
259 0 : }
260 :
261 : /// Switch into membership configuration `to` if it is higher than the
262 : /// current one.
263 19681 : pub async fn membership_switch(
264 19681 : &mut self,
265 19681 : to: Configuration,
266 19681 : ) -> Result<TimelineMembershipSwitchResponse> {
267 19681 : let before = self.mconf.clone();
268 19681 : // Is switch allowed?
269 19681 : if to.generation <= self.mconf.generation {
270 19681 : info!(
271 0 : "ignoring request to switch membership conf to {}, current conf {}",
272 0 : to, self.mconf
273 : );
274 : } else {
275 0 : let mut state = self.start_change();
276 0 : state.mconf = to.clone();
277 0 : self.finish_change(&state).await?;
278 0 : info!("switched membership conf to {} from {}", to, before);
279 : }
280 19681 : Ok(TimelineMembershipSwitchResponse {
281 19681 : previous_conf: before,
282 19681 : current_conf: self.mconf.clone(),
283 19681 : })
284 0 : }
285 : }
286 :
287 : impl<CTRL> Deref for TimelineState<CTRL>
288 : where
289 : CTRL: control_file::Storage,
290 : {
291 : type Target = TimelinePersistentState;
292 :
293 290427 : fn deref(&self) -> &Self::Target {
294 290427 : &self.pers
295 290427 : }
296 : }
|