Line data Source code
1 : //! Defines per-timeline data stored persistently (`TimelinePersistentState`)
2 : //! and its wrapper with an in-memory layer (`TimelineState`).
3 :
4 : use std::{cmp::max, ops::Deref};
5 :
6 : use anyhow::Result;
7 : use safekeeper_api::models::TimelineTermBumpResponse;
8 : use serde::{Deserialize, Serialize};
9 : use utils::{
10 : id::{NodeId, TenantId, TenantTimelineId, TimelineId},
11 : lsn::Lsn,
12 : };
13 :
14 : use crate::{
15 : control_file,
16 : safekeeper::{AcceptorState, PersistedPeerInfo, PgUuid, ServerInfo, Term, TermHistory},
17 : wal_backup_partial::{self},
18 : };
19 :
20 : /// Persistent information stored on safekeeper node about timeline.
21 : /// On disk data is prefixed by magic and format version and followed by checksum.
22 6 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
23 : pub struct TimelinePersistentState {
24 : #[serde(with = "hex")]
25 : pub tenant_id: TenantId,
26 : #[serde(with = "hex")]
27 : pub timeline_id: TimelineId,
28 : /// persistent acceptor state
29 : pub acceptor_state: AcceptorState,
30 : /// information about server
31 : pub server: ServerInfo,
32 : /// Unique id of the last *elected* proposer we dealt with. Not needed
33 : /// for correctness, exists for monitoring purposes.
34 : #[serde(with = "hex")]
35 : pub proposer_uuid: PgUuid,
36 : /// Since which LSN this timeline generally starts. Safekeeper might have
37 : /// joined later.
38 : pub timeline_start_lsn: Lsn,
39 : /// Since which LSN safekeeper has (had) WAL for this timeline.
40 : /// All WAL segments next to one containing local_start_lsn are
41 : /// filled with data from the beginning.
42 : pub local_start_lsn: Lsn,
43 : /// Part of WAL acknowledged by quorum *and available locally*. Always points
44 : /// to record boundary.
45 : pub commit_lsn: Lsn,
46 : /// LSN that points to the end of the last backed up segment. Useful to
47 : /// persist to avoid finding out offloading progress on boot.
48 : pub backup_lsn: Lsn,
49 : /// Minimal LSN which may be needed for recovery of some safekeeper (end_lsn
50 : /// of last record streamed to everyone). Persisting it helps skipping
51 : /// recovery in walproposer, generally we compute it from peers. In
52 : /// walproposer proto called 'truncate_lsn'. Updates are currently drived
53 : /// only by walproposer.
54 : pub peer_horizon_lsn: Lsn,
55 : /// LSN of the oldest known checkpoint made by pageserver and successfully
56 : /// pushed to s3. We don't remove WAL beyond it. Persisted only for
57 : /// informational purposes, we receive it from pageserver (or broker).
58 : pub remote_consistent_lsn: Lsn,
59 : /// Peers and their state as we remember it. Knowing peers themselves is
60 : /// fundamental; but state is saved here only for informational purposes and
61 : /// obviously can be stale. (Currently not saved at all, but let's provision
62 : /// place to have less file version upgrades).
63 : pub peers: PersistedPeers,
64 : /// Holds names of partial segments uploaded to remote storage. Used to
65 : /// clean up old objects without leaving garbage in remote storage.
66 : pub partial_backup: wal_backup_partial::State,
67 : /// Eviction state of the timeline. If it's Offloaded, we should download
68 : /// WAL files from remote storage to serve the timeline.
69 : pub eviction_state: EvictionState,
70 : }
71 :
72 4 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
73 : pub struct PersistedPeers(pub Vec<(NodeId, PersistedPeerInfo)>);
74 :
75 : /// State of the local WAL files. Used to track current timeline state,
76 : /// that can be either WAL files are present on disk or last partial segment
77 : /// is offloaded to remote storage.
78 2 : #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
79 : pub enum EvictionState {
80 : /// WAL files are present on disk.
81 : Present,
82 : /// Last partial segment is offloaded to remote storage.
83 : /// Contains flush_lsn of the last offloaded segment.
84 : Offloaded(Lsn),
85 : }
86 :
87 : impl TimelinePersistentState {
88 1445 : pub fn new(
89 1445 : ttid: &TenantTimelineId,
90 1445 : server_info: ServerInfo,
91 1445 : peers: Vec<NodeId>,
92 1445 : commit_lsn: Lsn,
93 1445 : local_start_lsn: Lsn,
94 1445 : ) -> TimelinePersistentState {
95 1445 : TimelinePersistentState {
96 1445 : tenant_id: ttid.tenant_id,
97 1445 : timeline_id: ttid.timeline_id,
98 1445 : acceptor_state: AcceptorState {
99 1445 : term: 0,
100 1445 : term_history: TermHistory::empty(),
101 1445 : },
102 1445 : server: server_info,
103 1445 : proposer_uuid: [0; 16],
104 1445 : timeline_start_lsn: Lsn(0),
105 1445 : local_start_lsn,
106 1445 : commit_lsn,
107 1445 : backup_lsn: local_start_lsn,
108 1445 : peer_horizon_lsn: local_start_lsn,
109 1445 : remote_consistent_lsn: Lsn(0),
110 1445 : peers: PersistedPeers(
111 1445 : peers
112 1445 : .iter()
113 1445 : .map(|p| (*p, PersistedPeerInfo::new()))
114 1445 : .collect(),
115 1445 : ),
116 1445 : partial_backup: wal_backup_partial::State::default(),
117 1445 : eviction_state: EvictionState::Present,
118 1445 : }
119 1445 : }
120 :
121 : #[cfg(test)]
122 5 : pub fn empty() -> Self {
123 : use crate::safekeeper::UNKNOWN_SERVER_VERSION;
124 :
125 5 : TimelinePersistentState::new(
126 5 : &TenantTimelineId::empty(),
127 5 : ServerInfo {
128 5 : pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */
129 5 : system_id: 0, /* Postgres system identifier */
130 5 : wal_seg_size: 0,
131 5 : },
132 5 : vec![],
133 5 : Lsn::INVALID,
134 5 : Lsn::INVALID,
135 5 : )
136 5 : }
137 : }
138 :
139 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
140 : // In memory safekeeper state. Fields mirror ones in `SafeKeeperPersistentState`; values
141 : // are not flushed yet.
142 : pub struct TimelineMemState {
143 : pub commit_lsn: Lsn,
144 : pub backup_lsn: Lsn,
145 : pub peer_horizon_lsn: Lsn,
146 : pub remote_consistent_lsn: Lsn,
147 : #[serde(with = "hex")]
148 : pub proposer_uuid: PgUuid,
149 : }
150 :
151 : /// Safekeeper persistent state plus in memory layer.
152 : ///
153 : /// Allows us to avoid frequent fsyncs when we update fields like commit_lsn
154 : /// which don't need immediate persistence. Provides transactional like API
155 : /// to atomically update the state.
156 : ///
157 : /// Implements Deref into *persistent* part.
158 : pub struct TimelineState<CTRL: control_file::Storage> {
159 : pub inmem: TimelineMemState,
160 : pub pers: CTRL, // persistent
161 : }
162 :
163 : impl<CTRL> TimelineState<CTRL>
164 : where
165 : CTRL: control_file::Storage,
166 : {
167 8796 : pub fn new(state: CTRL) -> Self {
168 8796 : TimelineState {
169 8796 : inmem: TimelineMemState {
170 8796 : commit_lsn: state.commit_lsn,
171 8796 : backup_lsn: state.backup_lsn,
172 8796 : peer_horizon_lsn: state.peer_horizon_lsn,
173 8796 : remote_consistent_lsn: state.remote_consistent_lsn,
174 8796 : proposer_uuid: state.proposer_uuid,
175 8796 : },
176 8796 : pers: state,
177 8796 : }
178 8796 : }
179 :
180 : /// Start atomic change. Returns SafeKeeperPersistentState with in memory
181 : /// values applied; the protocol is to 1) change returned struct as desired
182 : /// 2) atomically persist it with finish_change.
183 4096 : pub fn start_change(&self) -> TimelinePersistentState {
184 4096 : let mut s = self.pers.clone();
185 4096 : s.commit_lsn = self.inmem.commit_lsn;
186 4096 : s.backup_lsn = self.inmem.backup_lsn;
187 4096 : s.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
188 4096 : s.remote_consistent_lsn = self.inmem.remote_consistent_lsn;
189 4096 : s.proposer_uuid = self.inmem.proposer_uuid;
190 4096 : s
191 4096 : }
192 :
193 : /// Persist given state. c.f. start_change.
194 4096 : pub async fn finish_change(&mut self, s: &TimelinePersistentState) -> Result<()> {
195 4096 : if s.eq(&*self.pers) {
196 57 : // nothing to do if state didn't change
197 57 : } else {
198 4039 : self.pers.persist(s).await?;
199 : }
200 :
201 : // keep in memory values up to date
202 4096 : self.inmem.commit_lsn = s.commit_lsn;
203 4096 : self.inmem.backup_lsn = s.backup_lsn;
204 4096 : self.inmem.peer_horizon_lsn = s.peer_horizon_lsn;
205 4096 : self.inmem.remote_consistent_lsn = s.remote_consistent_lsn;
206 4096 : self.inmem.proposer_uuid = s.proposer_uuid;
207 4096 : Ok(())
208 4096 : }
209 :
210 : /// Flush in memory values.
211 121 : pub async fn flush(&mut self) -> Result<()> {
212 121 : let s = self.start_change();
213 121 : self.finish_change(&s).await
214 121 : }
215 :
216 : /// Make term at least as `to`. If `to` is None, increment current one. This
217 : /// is not in safekeeper.rs because we want to be able to do it even if
218 : /// timeline is offloaded.
219 0 : pub async fn term_bump(&mut self, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
220 0 : let before = self.acceptor_state.term;
221 0 : let mut state = self.start_change();
222 0 : let new = match to {
223 0 : Some(to) => max(state.acceptor_state.term, to),
224 0 : None => state.acceptor_state.term + 1,
225 : };
226 0 : if new > state.acceptor_state.term {
227 0 : state.acceptor_state.term = new;
228 0 : self.finish_change(&state).await?;
229 0 : }
230 0 : let after = self.acceptor_state.term;
231 0 : Ok(TimelineTermBumpResponse {
232 0 : previous_term: before,
233 0 : current_term: after,
234 0 : })
235 0 : }
236 : }
237 :
238 : impl<CTRL> Deref for TimelineState<CTRL>
239 : where
240 : CTRL: control_file::Storage,
241 : {
242 : type Target = TimelinePersistentState;
243 :
244 198591 : fn deref(&self) -> &Self::Target {
245 198591 : &self.pers
246 198591 : }
247 : }
|