Line data Source code
1 : //! Defines per-timeline data stored persistently (`TimelinePersistentState`)
2 : //! and its wrapper with an in-memory layer (`TimelineState`).
3 :
4 : use std::{cmp::max, ops::Deref, time::SystemTime};
5 :
6 : use anyhow::{bail, Result};
7 : use postgres_ffi::WAL_SEGMENT_SIZE;
8 : use safekeeper_api::{
9 : membership::Configuration,
10 : models::{TimelineMembershipSwitchResponse, TimelineTermBumpResponse},
11 : ServerInfo, Term, INITIAL_TERM,
12 : };
13 : use serde::{Deserialize, Serialize};
14 : use tracing::info;
15 : use utils::{
16 : id::{TenantId, TenantTimelineId, TimelineId},
17 : lsn::Lsn,
18 : };
19 :
20 : use crate::{
21 : control_file,
22 : safekeeper::{AcceptorState, PgUuid, TermHistory, TermLsn, UNKNOWN_SERVER_VERSION},
23 : timeline::TimelineError,
24 : wal_backup_partial::{self},
25 : };
26 :
27 : /// Persistent information stored on safekeeper node about timeline.
28 : /// On disk data is prefixed by magic and format version and followed by checksum.
29 12 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
30 : pub struct TimelinePersistentState {
31 : #[serde(with = "hex")]
32 : pub tenant_id: TenantId,
33 : #[serde(with = "hex")]
34 : pub timeline_id: TimelineId,
35 : /// Membership configuration.
36 : pub mconf: Configuration,
37 : /// persistent acceptor state
38 : pub acceptor_state: AcceptorState,
39 : /// information about server
40 : pub server: ServerInfo,
41 : /// Unique id of the last *elected* proposer we dealt with. Not needed
42 : /// for correctness, exists for monitoring purposes.
43 : #[serde(with = "hex")]
44 : pub proposer_uuid: PgUuid,
45 : /// Since which LSN this timeline generally starts. Safekeeper might have
46 : /// joined later.
47 : pub timeline_start_lsn: Lsn,
48 : /// Since which LSN safekeeper has (had) WAL for this timeline.
49 : /// All WAL segments next to one containing local_start_lsn are
50 : /// filled with data from the beginning.
51 : pub local_start_lsn: Lsn,
52 : /// Part of WAL acknowledged by quorum *and available locally*. Always points
53 : /// to record boundary.
54 : pub commit_lsn: Lsn,
55 : /// LSN that points to the end of the last backed up segment. Useful to
56 : /// persist to avoid finding out offloading progress on boot.
57 : pub backup_lsn: Lsn,
58 : /// Minimal LSN which may be needed for recovery of some safekeeper (end_lsn
59 : /// of last record streamed to everyone). Persisting it helps skipping
60 : /// recovery in walproposer, generally we compute it from peers. In
61 : /// walproposer proto called 'truncate_lsn'. Updates are currently drived
62 : /// only by walproposer.
63 : pub peer_horizon_lsn: Lsn,
64 : /// LSN of the oldest known checkpoint made by pageserver and successfully
65 : /// pushed to s3. We don't remove WAL beyond it. Persisted only for
66 : /// informational purposes, we receive it from pageserver (or broker).
67 : pub remote_consistent_lsn: Lsn,
68 : /// Holds names of partial segments uploaded to remote storage. Used to
69 : /// clean up old objects without leaving garbage in remote storage.
70 : pub partial_backup: wal_backup_partial::State,
71 : /// Eviction state of the timeline. If it's Offloaded, we should download
72 : /// WAL files from remote storage to serve the timeline.
73 : pub eviction_state: EvictionState,
74 : pub creation_ts: SystemTime,
75 : }
76 :
77 : /// State of the local WAL files. Used to track current timeline state,
78 : /// that can be either WAL files are present on disk or last partial segment
79 : /// is offloaded to remote storage.
80 2 : #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
81 : pub enum EvictionState {
82 : /// WAL files are present on disk.
83 : Present,
84 : /// Last partial segment is offloaded to remote storage.
85 : /// Contains flush_lsn of the last offloaded segment.
86 : Offloaded(Lsn),
87 : }
88 :
89 : impl TimelinePersistentState {
90 : /// commit_lsn is the same as start_lsn in the normal creaiton; see
91 : /// `TimelineCreateRequest` comments.`
92 1440 : pub fn new(
93 1440 : ttid: &TenantTimelineId,
94 1440 : mconf: Configuration,
95 1440 : server_info: ServerInfo,
96 1440 : start_lsn: Lsn,
97 1440 : commit_lsn: Lsn,
98 1440 : ) -> anyhow::Result<TimelinePersistentState> {
99 1440 : if server_info.wal_seg_size == 0 {
100 0 : bail!(TimelineError::UninitializedWalSegSize(*ttid));
101 1440 : }
102 1440 :
103 1440 : if server_info.pg_version == UNKNOWN_SERVER_VERSION {
104 0 : bail!(TimelineError::UninitialinzedPgVersion(*ttid));
105 1440 : }
106 1440 :
107 1440 : if commit_lsn < start_lsn {
108 0 : bail!(
109 0 : "commit_lsn {} is smaller than start_lsn {}",
110 0 : commit_lsn,
111 0 : start_lsn
112 0 : );
113 1440 : }
114 :
115 : // If we are given with init LSN, initialize term history with it. It
116 : // ensures that walproposer always must be able to find a common point
117 : // in histories; if it can't something is corrupted. Not having LSN here
118 : // is so far left for legacy case where timeline is created by compute
119 : // and LSN during creation is not known yet.
120 1440 : let term_history = if commit_lsn != Lsn::INVALID {
121 0 : TermHistory(vec![TermLsn {
122 0 : term: INITIAL_TERM,
123 0 : lsn: start_lsn,
124 0 : }])
125 : } else {
126 1440 : TermHistory::empty()
127 : };
128 :
129 1440 : Ok(TimelinePersistentState {
130 1440 : tenant_id: ttid.tenant_id,
131 1440 : timeline_id: ttid.timeline_id,
132 1440 : mconf,
133 1440 : acceptor_state: AcceptorState {
134 1440 : term: INITIAL_TERM,
135 1440 : term_history,
136 1440 : },
137 1440 : server: server_info,
138 1440 : proposer_uuid: [0; 16],
139 1440 : timeline_start_lsn: start_lsn,
140 1440 : local_start_lsn: start_lsn,
141 1440 : commit_lsn,
142 1440 : backup_lsn: start_lsn,
143 1440 : peer_horizon_lsn: start_lsn,
144 1440 : remote_consistent_lsn: Lsn(0),
145 1440 : partial_backup: wal_backup_partial::State::default(),
146 1440 : eviction_state: EvictionState::Present,
147 1440 : creation_ts: SystemTime::now(),
148 1440 : })
149 1440 : }
150 :
151 8 : pub fn empty() -> Self {
152 8 : TimelinePersistentState::new(
153 8 : &TenantTimelineId::empty(),
154 8 : Configuration::empty(),
155 8 : ServerInfo {
156 8 : pg_version: 170000, /* Postgres server version (major * 10000) */
157 8 : system_id: 0, /* Postgres system identifier */
158 8 : wal_seg_size: WAL_SEGMENT_SIZE as u32,
159 8 : },
160 8 : Lsn::INVALID,
161 8 : Lsn::INVALID,
162 8 : )
163 8 : .unwrap()
164 8 : }
165 : }
166 :
167 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
168 : // In memory safekeeper state. Fields mirror ones in `SafeKeeperPersistentState`; values
169 : // are not flushed yet.
170 : pub struct TimelineMemState {
171 : pub commit_lsn: Lsn,
172 : pub backup_lsn: Lsn,
173 : pub peer_horizon_lsn: Lsn,
174 : pub remote_consistent_lsn: Lsn,
175 : #[serde(with = "hex")]
176 : pub proposer_uuid: PgUuid,
177 : }
178 :
179 : /// Safekeeper persistent state plus in memory layer.
180 : ///
181 : /// Allows us to avoid frequent fsyncs when we update fields like commit_lsn
182 : /// which don't need immediate persistence. Provides transactional like API
183 : /// to atomically update the state.
184 : ///
185 : /// Implements Deref into *persistent* part.
186 : pub struct TimelineState<CTRL: control_file::Storage> {
187 : pub inmem: TimelineMemState,
188 : pub pers: CTRL, // persistent
189 : }
190 :
191 : impl<CTRL> TimelineState<CTRL>
192 : where
193 : CTRL: control_file::Storage,
194 : {
195 8439 : pub fn new(state: CTRL) -> Self {
196 8439 : TimelineState {
197 8439 : inmem: TimelineMemState {
198 8439 : commit_lsn: state.commit_lsn,
199 8439 : backup_lsn: state.backup_lsn,
200 8439 : peer_horizon_lsn: state.peer_horizon_lsn,
201 8439 : remote_consistent_lsn: state.remote_consistent_lsn,
202 8439 : proposer_uuid: state.proposer_uuid,
203 8439 : },
204 8439 : pers: state,
205 8439 : }
206 8439 : }
207 :
208 : /// Start atomic change. Returns SafeKeeperPersistentState with in memory
209 : /// values applied; the protocol is to 1) change returned struct as desired
210 : /// 2) atomically persist it with finish_change.
211 4101 : pub fn start_change(&self) -> TimelinePersistentState {
212 4101 : let mut s = self.pers.clone();
213 4101 : s.commit_lsn = self.inmem.commit_lsn;
214 4101 : s.backup_lsn = self.inmem.backup_lsn;
215 4101 : s.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
216 4101 : s.remote_consistent_lsn = self.inmem.remote_consistent_lsn;
217 4101 : s.proposer_uuid = self.inmem.proposer_uuid;
218 4101 : s
219 4101 : }
220 :
221 : /// Persist given state. c.f. start_change.
222 4101 : pub async fn finish_change(&mut self, s: &TimelinePersistentState) -> Result<()> {
223 4101 : if s.eq(&*self.pers) {
224 50 : // nothing to do if state didn't change
225 50 : } else {
226 4051 : self.pers.persist(s).await?;
227 : }
228 :
229 : // keep in memory values up to date
230 4101 : self.inmem.commit_lsn = s.commit_lsn;
231 4101 : self.inmem.backup_lsn = s.backup_lsn;
232 4101 : self.inmem.peer_horizon_lsn = s.peer_horizon_lsn;
233 4101 : self.inmem.remote_consistent_lsn = s.remote_consistent_lsn;
234 4101 : self.inmem.proposer_uuid = s.proposer_uuid;
235 4101 : Ok(())
236 4101 : }
237 :
238 : /// Flush in memory values.
239 127 : pub async fn flush(&mut self) -> Result<()> {
240 127 : let s = self.start_change();
241 127 : self.finish_change(&s).await
242 127 : }
243 :
244 : /// Make term at least as `to`. If `to` is None, increment current one. This
245 : /// is not in safekeeper.rs because we want to be able to do it even if
246 : /// timeline is offloaded.
247 0 : pub async fn term_bump(&mut self, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
248 0 : let before = self.acceptor_state.term;
249 0 : let mut state = self.start_change();
250 0 : let new = match to {
251 0 : Some(to) => max(state.acceptor_state.term, to),
252 0 : None => state.acceptor_state.term + 1,
253 : };
254 0 : if new > state.acceptor_state.term {
255 0 : state.acceptor_state.term = new;
256 0 : self.finish_change(&state).await?;
257 0 : }
258 0 : let after = self.acceptor_state.term;
259 0 : Ok(TimelineTermBumpResponse {
260 0 : previous_term: before,
261 0 : current_term: after,
262 0 : })
263 0 : }
264 :
265 : /// Switch into membership configuration `to` if it is higher than the
266 : /// current one.
267 0 : pub async fn membership_switch(
268 0 : &mut self,
269 0 : to: Configuration,
270 0 : ) -> Result<TimelineMembershipSwitchResponse> {
271 0 : let before = self.mconf.clone();
272 0 : // Is switch allowed?
273 0 : if to.generation <= self.mconf.generation {
274 0 : info!(
275 0 : "ignoring request to switch membership conf to lower {}, current conf {}",
276 0 : to, self.mconf
277 : );
278 : } else {
279 0 : let mut state = self.start_change();
280 0 : state.mconf = to.clone();
281 0 : self.finish_change(&state).await?;
282 0 : info!("switched membership conf to {} from {}", to, before);
283 : }
284 0 : Ok(TimelineMembershipSwitchResponse {
285 0 : previous_conf: before,
286 0 : current_conf: self.mconf.clone(),
287 0 : })
288 0 : }
289 : }
290 :
291 : impl<CTRL> Deref for TimelineState<CTRL>
292 : where
293 : CTRL: control_file::Storage,
294 : {
295 : type Target = TimelinePersistentState;
296 :
297 194565 : fn deref(&self) -> &Self::Target {
298 194565 : &self.pers
299 194565 : }
300 : }
|