Line data Source code
//! Defines per-timeline data stored persistently (`TimelinePersistentState`)
//! and its wrapper with an in-memory layer (`TimelineState`).
3 :
4 : use std::cmp::max;
5 : use std::ops::Deref;
6 : use std::time::SystemTime;
7 :
8 : use anyhow::{Result, bail};
9 : use postgres_ffi::WAL_SEGMENT_SIZE;
10 : use postgres_versioninfo::{PgMajorVersion, PgVersionId};
11 : use safekeeper_api::membership::Configuration;
12 : use safekeeper_api::models::TimelineTermBumpResponse;
13 : use safekeeper_api::{INITIAL_TERM, ServerInfo, Term};
14 : use serde::{Deserialize, Serialize};
15 : use tracing::info;
16 : use utils::id::{TenantId, TenantTimelineId, TimelineId};
17 : use utils::lsn::Lsn;
18 :
19 : use crate::control_file;
20 : use crate::safekeeper::{AcceptorState, PgUuid, TermHistory, TermLsn, UNKNOWN_SERVER_VERSION};
21 : use crate::timeline::TimelineError;
22 : use crate::wal_backup_partial::{self};
23 :
24 : /// Persistent information stored on safekeeper node about timeline.
25 : /// On disk data is prefixed by magic and format version and followed by checksum.
26 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
27 : pub struct TimelinePersistentState {
28 : #[serde(with = "hex")]
29 : pub tenant_id: TenantId,
30 : #[serde(with = "hex")]
31 : pub timeline_id: TimelineId,
32 : /// Membership configuration.
33 : pub mconf: Configuration,
34 : /// persistent acceptor state
35 : pub acceptor_state: AcceptorState,
36 : /// information about server
37 : pub server: ServerInfo,
38 : /// Unique id of the last *elected* proposer we dealt with. Not needed
39 : /// for correctness, exists for monitoring purposes.
40 : #[serde(with = "hex")]
41 : pub proposer_uuid: PgUuid,
42 : /// Since which LSN this timeline generally starts. Safekeeper might have
43 : /// joined later.
44 : pub timeline_start_lsn: Lsn,
45 : /// Since which LSN safekeeper has (had) WAL for this timeline.
46 : /// All WAL segments next to one containing local_start_lsn are
47 : /// filled with data from the beginning.
48 : pub local_start_lsn: Lsn,
49 : /// Part of WAL acknowledged by quorum *and available locally*. Always points
50 : /// to record boundary.
51 : pub commit_lsn: Lsn,
52 : /// LSN that points to the end of the last backed up segment. Useful to
53 : /// persist to avoid finding out offloading progress on boot.
54 : pub backup_lsn: Lsn,
55 : /// Minimal LSN which may be needed for recovery of some safekeeper (end_lsn
56 : /// of last record streamed to everyone). Persisting it helps skipping
57 : /// recovery in walproposer, generally we compute it from peers. In
58 : /// walproposer proto called 'truncate_lsn'. Updates are currently drived
59 : /// only by walproposer.
60 : pub peer_horizon_lsn: Lsn,
61 : /// LSN of the oldest known checkpoint made by pageserver and successfully
62 : /// pushed to s3. We don't remove WAL beyond it. Persisted only for
63 : /// informational purposes, we receive it from pageserver (or broker).
64 : pub remote_consistent_lsn: Lsn,
65 : /// Holds names of partial segments uploaded to remote storage. Used to
66 : /// clean up old objects without leaving garbage in remote storage.
67 : pub partial_backup: wal_backup_partial::State,
68 : /// Eviction state of the timeline. If it's Offloaded, we should download
69 : /// WAL files from remote storage to serve the timeline.
70 : pub eviction_state: EvictionState,
71 : pub creation_ts: SystemTime,
72 : }
73 :
74 : /// State of the local WAL files. Used to track current timeline state,
75 : /// that can be either WAL files are present on disk or last partial segment
76 : /// is offloaded to remote storage.
77 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
78 : pub enum EvictionState {
79 : /// WAL files are present on disk.
80 : Present,
81 : /// Last partial segment is offloaded to remote storage.
82 : /// Contains flush_lsn of the last offloaded segment.
83 : Offloaded(Lsn),
84 : }
85 :
86 : pub struct MembershipSwitchResult {
87 : pub previous_conf: Configuration,
88 : pub current_conf: Configuration,
89 : }
90 :
91 : impl TimelinePersistentState {
92 : /// commit_lsn is the same as start_lsn in the normal creaiton; see
93 : /// `TimelineCreateRequest` comments.`
94 1471 : pub fn new(
95 1471 : ttid: &TenantTimelineId,
96 1471 : mconf: Configuration,
97 1471 : server_info: ServerInfo,
98 1471 : start_lsn: Lsn,
99 1471 : commit_lsn: Lsn,
100 1471 : ) -> anyhow::Result<TimelinePersistentState> {
101 1471 : if server_info.wal_seg_size == 0 {
102 0 : bail!(TimelineError::UninitializedWalSegSize(*ttid));
103 1471 : }
104 :
105 1471 : if server_info.pg_version == UNKNOWN_SERVER_VERSION {
106 0 : bail!(TimelineError::UninitialinzedPgVersion(*ttid));
107 1471 : }
108 :
109 1471 : if commit_lsn < start_lsn {
110 0 : bail!(
111 0 : "commit_lsn {} is smaller than start_lsn {}",
112 : commit_lsn,
113 : start_lsn
114 : );
115 1471 : }
116 :
117 : // If we are given with init LSN, initialize term history with it. It
118 : // ensures that walproposer always must be able to find a common point
119 : // in histories; if it can't something is corrupted. Not having LSN here
120 : // is so far left for legacy case where timeline is created by compute
121 : // and LSN during creation is not known yet.
122 1471 : let term_history = if commit_lsn != Lsn::INVALID {
123 0 : TermHistory(vec![TermLsn {
124 0 : term: INITIAL_TERM,
125 0 : lsn: start_lsn,
126 0 : }])
127 : } else {
128 1471 : TermHistory::empty()
129 : };
130 :
131 1471 : Ok(TimelinePersistentState {
132 1471 : tenant_id: ttid.tenant_id,
133 1471 : timeline_id: ttid.timeline_id,
134 1471 : mconf,
135 1471 : acceptor_state: AcceptorState {
136 1471 : term: INITIAL_TERM,
137 1471 : term_history,
138 1471 : },
139 1471 : server: server_info,
140 1471 : proposer_uuid: [0; 16],
141 1471 : timeline_start_lsn: start_lsn,
142 1471 : local_start_lsn: start_lsn,
143 1471 : commit_lsn,
144 1471 : backup_lsn: start_lsn,
145 1471 : peer_horizon_lsn: start_lsn,
146 1471 : remote_consistent_lsn: Lsn(0),
147 1471 : partial_backup: wal_backup_partial::State::default(),
148 1471 : eviction_state: EvictionState::Present,
149 1471 : creation_ts: SystemTime::now(),
150 1471 : })
151 1471 : }
152 :
153 10 : pub fn empty() -> Self {
154 10 : TimelinePersistentState::new(
155 10 : &TenantTimelineId::empty(),
156 10 : Configuration::empty(),
157 10 : ServerInfo {
158 10 : pg_version: PgVersionId::from(PgMajorVersion::PG17),
159 10 : system_id: 0, /* Postgres system identifier */
160 10 : wal_seg_size: WAL_SEGMENT_SIZE as u32,
161 10 : },
162 : Lsn::INVALID,
163 : Lsn::INVALID,
164 : )
165 10 : .unwrap()
166 10 : }
167 : }
168 :
169 : #[derive(Debug, Clone, Serialize, Deserialize)]
170 : // In memory safekeeper state. Fields mirror ones in `SafeKeeperPersistentState`; values
171 : // are not flushed yet.
172 : pub struct TimelineMemState {
173 : pub commit_lsn: Lsn,
174 : pub backup_lsn: Lsn,
175 : pub peer_horizon_lsn: Lsn,
176 : pub remote_consistent_lsn: Lsn,
177 : #[serde(with = "hex")]
178 : pub proposer_uuid: PgUuid,
179 : }
180 :
181 : /// Safekeeper persistent state plus in memory layer.
182 : ///
183 : /// Allows us to avoid frequent fsyncs when we update fields like commit_lsn
184 : /// which don't need immediate persistence. Provides transactional like API
185 : /// to atomically update the state.
186 : ///
187 : /// Implements Deref into *persistent* part.
188 : pub struct TimelineState<CTRL: control_file::Storage> {
189 : pub inmem: TimelineMemState,
190 : pub pers: CTRL, // persistent
191 : }
192 :
193 : impl<CTRL> TimelineState<CTRL>
194 : where
195 : CTRL: control_file::Storage,
196 : {
197 9112 : pub fn new(state: CTRL) -> Self {
198 9112 : TimelineState {
199 9112 : inmem: TimelineMemState {
200 9112 : commit_lsn: state.commit_lsn,
201 9112 : backup_lsn: state.backup_lsn,
202 9112 : peer_horizon_lsn: state.peer_horizon_lsn,
203 9112 : remote_consistent_lsn: state.remote_consistent_lsn,
204 9112 : proposer_uuid: state.proposer_uuid,
205 9112 : },
206 9112 : pers: state,
207 9112 : }
208 9 : }
209 :
210 : /// Start atomic change. Returns SafeKeeperPersistentState with in memory
211 : /// values applied; the protocol is to 1) change returned struct as desired
212 : /// 2) atomically persist it with finish_change.
213 3276 : pub fn start_change(&self) -> TimelinePersistentState {
214 3276 : let mut s = self.pers.clone();
215 3276 : s.commit_lsn = self.inmem.commit_lsn;
216 3276 : s.backup_lsn = self.inmem.backup_lsn;
217 3276 : s.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
218 3276 : s.remote_consistent_lsn = self.inmem.remote_consistent_lsn;
219 3276 : s.proposer_uuid = self.inmem.proposer_uuid;
220 3276 : s
221 40 : }
222 :
223 : /// Persist given state. c.f. start_change.
224 3276 : pub async fn finish_change(&mut self, s: &TimelinePersistentState) -> Result<()> {
225 3276 : if s.eq(&*self.pers) {
226 43 : // nothing to do if state didn't change
227 43 : } else {
228 3233 : self.pers.persist(s).await?;
229 : }
230 :
231 : // keep in memory values up to date
232 3275 : self.inmem.commit_lsn = s.commit_lsn;
233 3275 : self.inmem.backup_lsn = s.backup_lsn;
234 3275 : self.inmem.peer_horizon_lsn = s.peer_horizon_lsn;
235 3275 : self.inmem.remote_consistent_lsn = s.remote_consistent_lsn;
236 3275 : self.inmem.proposer_uuid = s.proposer_uuid;
237 3275 : Ok(())
238 39 : }
239 :
240 : /// Flush in memory values.
241 110 : pub async fn flush(&mut self) -> Result<()> {
242 110 : let s = self.start_change();
243 110 : self.finish_change(&s).await
244 24 : }
245 :
246 : /// Make term at least as `to`. If `to` is None, increment current one. This
247 : /// is not in safekeeper.rs because we want to be able to do it even if
248 : /// timeline is offloaded.
249 0 : pub async fn term_bump(&mut self, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
250 0 : let before = self.acceptor_state.term;
251 0 : let mut state = self.start_change();
252 0 : let new = match to {
253 0 : Some(to) => max(state.acceptor_state.term, to),
254 0 : None => state.acceptor_state.term + 1,
255 : };
256 0 : if new > state.acceptor_state.term {
257 0 : state.acceptor_state.term = new;
258 0 : self.finish_change(&state).await?;
259 0 : }
260 0 : let after = self.acceptor_state.term;
261 0 : Ok(TimelineTermBumpResponse {
262 0 : previous_term: before,
263 0 : current_term: after,
264 0 : })
265 0 : }
266 :
267 : /// Switch into membership configuration `to` if it is higher than the
268 : /// current one.
269 20318 : pub async fn membership_switch(&mut self, to: Configuration) -> Result<MembershipSwitchResult> {
270 20318 : let before = self.mconf.clone();
271 : // Is switch allowed?
272 20318 : if to.generation <= self.mconf.generation {
273 20318 : info!(
274 0 : "ignoring request to switch membership conf to {}, current conf {}",
275 0 : to, self.mconf
276 : );
277 : } else {
278 0 : let mut state = self.start_change();
279 0 : state.mconf = to.clone();
280 0 : self.finish_change(&state).await?;
281 0 : info!("switched membership conf to {} from {}", to, before);
282 : }
283 20318 : Ok(MembershipSwitchResult {
284 20318 : previous_conf: before,
285 20318 : current_conf: self.mconf.clone(),
286 20318 : })
287 0 : }
288 : }
289 :
290 : impl<CTRL> Deref for TimelineState<CTRL>
291 : where
292 : CTRL: control_file::Storage,
293 : {
294 : type Target = TimelinePersistentState;
295 :
296 286451 : fn deref(&self) -> &Self::Target {
297 286451 : &self.pers
298 7955 : }
299 : }
|