Line data Source code
//! Defines per-timeline data stored persistently (`TimelinePersistentState`)
//! and its wrapper with an in-memory layer (`TimelineState`).
3 :
4 : use std::{cmp::max, ops::Deref};
5 :
6 : use anyhow::{bail, Result};
7 : use postgres_ffi::WAL_SEGMENT_SIZE;
8 : use safekeeper_api::{models::TimelineTermBumpResponse, ServerInfo, Term};
9 : use serde::{Deserialize, Serialize};
10 : use utils::{
11 : id::{NodeId, TenantId, TenantTimelineId, TimelineId},
12 : lsn::Lsn,
13 : };
14 :
15 : use crate::{
16 : control_file,
17 : safekeeper::{AcceptorState, PersistedPeerInfo, PgUuid, TermHistory, UNKNOWN_SERVER_VERSION},
18 : timeline::TimelineError,
19 : wal_backup_partial::{self},
20 : };
21 :
/// Persistent information stored on safekeeper node about timeline.
/// On disk data is prefixed by magic and format version and followed by checksum.
///
/// NOTE(review): the serialized layout presumably follows field declaration
/// order — do not reorder, add or remove fields without a control file format
/// upgrade. TODO confirm against the control file (de)serializer.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct TimelinePersistentState {
    /// Tenant this timeline belongs to; (de)serialized via the `hex` serde helper.
    #[serde(with = "hex")]
    pub tenant_id: TenantId,
    /// Timeline id; (de)serialized via the `hex` serde helper.
    #[serde(with = "hex")]
    pub timeline_id: TimelineId,
    /// Persistent acceptor state.
    pub acceptor_state: AcceptorState,
    /// Information about the Postgres server (version, system id, WAL segment size).
    pub server: ServerInfo,
    /// Unique id of the last *elected* proposer we dealt with. Not needed
    /// for correctness, exists for monitoring purposes.
    #[serde(with = "hex")]
    pub proposer_uuid: PgUuid,
    /// LSN at which this timeline generally starts. This safekeeper might have
    /// joined later (see `local_start_lsn`).
    pub timeline_start_lsn: Lsn,
    /// LSN from which this safekeeper has (had) WAL for this timeline.
    /// All WAL segments following the one containing `local_start_lsn` are
    /// filled with data from their beginning.
    pub local_start_lsn: Lsn,
    /// Part of WAL acknowledged by quorum *and available locally*. Always points
    /// to a record boundary.
    pub commit_lsn: Lsn,
    /// LSN that points to the end of the last backed up segment. Useful to
    /// persist to avoid finding out offloading progress on boot.
    pub backup_lsn: Lsn,
    /// Minimal LSN which may be needed for recovery of some safekeeper (end_lsn
    /// of last record streamed to everyone). Persisting it helps skipping
    /// recovery in walproposer, generally we compute it from peers. In
    /// walproposer proto called 'truncate_lsn'. Updates are currently driven
    /// only by walproposer.
    pub peer_horizon_lsn: Lsn,
    /// LSN of the oldest known checkpoint made by pageserver and successfully
    /// pushed to s3. We don't remove WAL beyond it. Persisted only for
    /// informational purposes, we receive it from pageserver (or broker).
    pub remote_consistent_lsn: Lsn,
    /// Peers and their state as we remember it. Knowing peers themselves is
    /// fundamental; but state is saved here only for informational purposes and
    /// obviously can be stale. (Currently not saved at all, but let's provision
    /// place to have fewer file version upgrades).
    pub peers: PersistedPeers,
    /// Holds names of partial segments uploaded to remote storage. Used to
    /// clean up old objects without leaving garbage in remote storage.
    pub partial_backup: wal_backup_partial::State,
    /// Eviction state of the timeline. If it's Offloaded, we should download
    /// WAL files from remote storage to serve the timeline.
    pub eviction_state: EvictionState,
}
73 :
/// Peer safekeepers of a timeline, keyed by node id, each paired with the
/// last state we remember for it (stored in `TimelinePersistentState::peers`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PersistedPeers(pub Vec<(NodeId, PersistedPeerInfo)>);
76 :
/// State of the local WAL files of a timeline: either the WAL files are
/// present on disk, or the last partial segment has been offloaded to remote
/// storage.
///
/// NOTE(review): this enum is part of the persisted state; variant order is
/// presumably significant for its serialized form — append new variants only.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
pub enum EvictionState {
    /// WAL files are present on disk.
    Present,
    /// Last partial segment is offloaded to remote storage.
    /// Contains flush_lsn of the last offloaded segment.
    Offloaded(Lsn),
}
88 :
89 : impl TimelinePersistentState {
90 1461 : pub fn new(
91 1461 : ttid: &TenantTimelineId,
92 1461 : server_info: ServerInfo,
93 1461 : peers: Vec<NodeId>,
94 1461 : commit_lsn: Lsn,
95 1461 : local_start_lsn: Lsn,
96 1461 : ) -> anyhow::Result<TimelinePersistentState> {
97 1461 : if server_info.wal_seg_size == 0 {
98 0 : bail!(TimelineError::UninitializedWalSegSize(*ttid));
99 1461 : }
100 1461 :
101 1461 : if server_info.pg_version == UNKNOWN_SERVER_VERSION {
102 0 : bail!(TimelineError::UninitialinzedPgVersion(*ttid));
103 1461 : }
104 1461 :
105 1461 : if commit_lsn < local_start_lsn {
106 0 : bail!(
107 0 : "commit_lsn {} is smaller than local_start_lsn {}",
108 0 : commit_lsn,
109 0 : local_start_lsn
110 0 : );
111 1461 : }
112 1461 :
113 1461 : Ok(TimelinePersistentState {
114 1461 : tenant_id: ttid.tenant_id,
115 1461 : timeline_id: ttid.timeline_id,
116 1461 : acceptor_state: AcceptorState {
117 1461 : term: 0,
118 1461 : term_history: TermHistory::empty(),
119 1461 : },
120 1461 : server: server_info,
121 1461 : proposer_uuid: [0; 16],
122 1461 : timeline_start_lsn: Lsn(0),
123 1461 : local_start_lsn,
124 1461 : commit_lsn,
125 1461 : backup_lsn: local_start_lsn,
126 1461 : peer_horizon_lsn: local_start_lsn,
127 1461 : remote_consistent_lsn: Lsn(0),
128 1461 : peers: PersistedPeers(
129 1461 : peers
130 1461 : .iter()
131 1461 : .map(|p| (*p, PersistedPeerInfo::new()))
132 1461 : .collect(),
133 1461 : ),
134 1461 : partial_backup: wal_backup_partial::State::default(),
135 1461 : eviction_state: EvictionState::Present,
136 1461 : })
137 1461 : }
138 :
139 5 : pub fn empty() -> Self {
140 5 : TimelinePersistentState::new(
141 5 : &TenantTimelineId::empty(),
142 5 : ServerInfo {
143 5 : pg_version: 170000, /* Postgres server version (major * 10000) */
144 5 : system_id: 0, /* Postgres system identifier */
145 5 : wal_seg_size: WAL_SEGMENT_SIZE as u32,
146 5 : },
147 5 : vec![],
148 5 : Lsn::INVALID,
149 5 : Lsn::INVALID,
150 5 : )
151 5 : .unwrap()
152 5 : }
153 : }
154 :
/// In-memory timeline state. Fields mirror the same-named fields of
/// `TimelinePersistentState`; values here may not have been flushed to the
/// persistent state yet.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimelineMemState {
    /// In-memory copy of `TimelinePersistentState::commit_lsn`.
    pub commit_lsn: Lsn,
    /// In-memory copy of `TimelinePersistentState::backup_lsn`.
    pub backup_lsn: Lsn,
    /// In-memory copy of `TimelinePersistentState::peer_horizon_lsn`.
    pub peer_horizon_lsn: Lsn,
    /// In-memory copy of `TimelinePersistentState::remote_consistent_lsn`.
    pub remote_consistent_lsn: Lsn,
    /// In-memory copy of `TimelinePersistentState::proposer_uuid`;
    /// (de)serialized via the `hex` serde helper.
    #[serde(with = "hex")]
    pub proposer_uuid: PgUuid,
}
166 :
/// Timeline persistent state plus an in-memory layer.
///
/// Allows us to avoid frequent fsyncs when we update fields like commit_lsn
/// which don't need immediate persistence. Provides a transaction-like API
/// (`start_change` / `finish_change`) to atomically update the state.
///
/// Implements `Deref` into the *persistent* part only: reads through it do
/// not observe values that so far live only in `inmem`.
pub struct TimelineState<CTRL: control_file::Storage> {
    /// Values that may be newer than the persisted ones (not flushed yet).
    pub inmem: TimelineMemState,
    /// Control file storage holding the persisted state.
    pub pers: CTRL, // persistent
}
178 :
179 : impl<CTRL> TimelineState<CTRL>
180 : where
181 : CTRL: control_file::Storage,
182 : {
183 8922 : pub fn new(state: CTRL) -> Self {
184 8922 : TimelineState {
185 8922 : inmem: TimelineMemState {
186 8922 : commit_lsn: state.commit_lsn,
187 8922 : backup_lsn: state.backup_lsn,
188 8922 : peer_horizon_lsn: state.peer_horizon_lsn,
189 8922 : remote_consistent_lsn: state.remote_consistent_lsn,
190 8922 : proposer_uuid: state.proposer_uuid,
191 8922 : },
192 8922 : pers: state,
193 8922 : }
194 8922 : }
195 :
196 : /// Start atomic change. Returns SafeKeeperPersistentState with in memory
197 : /// values applied; the protocol is to 1) change returned struct as desired
198 : /// 2) atomically persist it with finish_change.
199 3407 : pub fn start_change(&self) -> TimelinePersistentState {
200 3407 : let mut s = self.pers.clone();
201 3407 : s.commit_lsn = self.inmem.commit_lsn;
202 3407 : s.backup_lsn = self.inmem.backup_lsn;
203 3407 : s.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
204 3407 : s.remote_consistent_lsn = self.inmem.remote_consistent_lsn;
205 3407 : s.proposer_uuid = self.inmem.proposer_uuid;
206 3407 : s
207 3407 : }
208 :
209 : /// Persist given state. c.f. start_change.
210 3407 : pub async fn finish_change(&mut self, s: &TimelinePersistentState) -> Result<()> {
211 3407 : if s.eq(&*self.pers) {
212 31 : // nothing to do if state didn't change
213 31 : } else {
214 3376 : self.pers.persist(s).await?;
215 : }
216 :
217 : // keep in memory values up to date
218 3407 : self.inmem.commit_lsn = s.commit_lsn;
219 3407 : self.inmem.backup_lsn = s.backup_lsn;
220 3407 : self.inmem.peer_horizon_lsn = s.peer_horizon_lsn;
221 3407 : self.inmem.remote_consistent_lsn = s.remote_consistent_lsn;
222 3407 : self.inmem.proposer_uuid = s.proposer_uuid;
223 3407 : Ok(())
224 3407 : }
225 :
226 : /// Flush in memory values.
227 83 : pub async fn flush(&mut self) -> Result<()> {
228 83 : let s = self.start_change();
229 83 : self.finish_change(&s).await
230 83 : }
231 :
232 : /// Make term at least as `to`. If `to` is None, increment current one. This
233 : /// is not in safekeeper.rs because we want to be able to do it even if
234 : /// timeline is offloaded.
235 0 : pub async fn term_bump(&mut self, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
236 0 : let before = self.acceptor_state.term;
237 0 : let mut state = self.start_change();
238 0 : let new = match to {
239 0 : Some(to) => max(state.acceptor_state.term, to),
240 0 : None => state.acceptor_state.term + 1,
241 : };
242 0 : if new > state.acceptor_state.term {
243 0 : state.acceptor_state.term = new;
244 0 : self.finish_change(&state).await?;
245 0 : }
246 0 : let after = self.acceptor_state.term;
247 0 : Ok(TimelineTermBumpResponse {
248 0 : previous_term: before,
249 0 : current_term: after,
250 0 : })
251 0 : }
252 : }
253 :
254 : impl<CTRL> Deref for TimelineState<CTRL>
255 : where
256 : CTRL: control_file::Storage,
257 : {
258 : type Target = TimelinePersistentState;
259 :
260 184348 : fn deref(&self) -> &Self::Target {
261 184348 : &self.pers
262 184348 : }
263 : }
|