Line data Source code
1 : //! Defines per timeline data stored persistently (TimelinePersistentState)
2 : //! and its wrapper with in memory layer (TimelineState).
3 :
4 : use std::ops::Deref;
5 :
6 : use anyhow::Result;
7 : use serde::{Deserialize, Serialize};
8 : use utils::{
9 : id::{NodeId, TenantId, TenantTimelineId, TimelineId},
10 : lsn::Lsn,
11 : };
12 :
13 : use crate::{
14 : control_file,
15 : safekeeper::{AcceptorState, PersistedPeerInfo, PgUuid, ServerInfo, TermHistory},
16 : wal_backup_partial::{self},
17 : };
18 :
19 : /// Persistent information stored on safekeeper node about timeline.
20 : /// On disk data is prefixed by magic and format version and followed by checksum.
21 6 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
22 : pub struct TimelinePersistentState {
23 : #[serde(with = "hex")]
24 : pub tenant_id: TenantId,
25 : #[serde(with = "hex")]
26 : pub timeline_id: TimelineId,
27 : /// persistent acceptor state
28 : pub acceptor_state: AcceptorState,
29 : /// information about server
30 : pub server: ServerInfo,
31 : /// Unique id of the last *elected* proposer we dealt with. Not needed
32 : /// for correctness, exists for monitoring purposes.
33 : #[serde(with = "hex")]
34 : pub proposer_uuid: PgUuid,
35 : /// Since which LSN this timeline generally starts. Safekeeper might have
36 : /// joined later.
37 : pub timeline_start_lsn: Lsn,
38 : /// Since which LSN safekeeper has (had) WAL for this timeline.
39 : /// All WAL segments next to one containing local_start_lsn are
40 : /// filled with data from the beginning.
41 : pub local_start_lsn: Lsn,
42 : /// Part of WAL acknowledged by quorum *and available locally*. Always points
43 : /// to record boundary.
44 : pub commit_lsn: Lsn,
45 : /// LSN that points to the end of the last backed up segment. Useful to
46 : /// persist to avoid finding out offloading progress on boot.
47 : pub backup_lsn: Lsn,
48 : /// Minimal LSN which may be needed for recovery of some safekeeper (end_lsn
49 : /// of last record streamed to everyone). Persisting it helps skipping
50 : /// recovery in walproposer, generally we compute it from peers. In
51 : /// walproposer proto called 'truncate_lsn'. Updates are currently drived
52 : /// only by walproposer.
53 : pub peer_horizon_lsn: Lsn,
54 : /// LSN of the oldest known checkpoint made by pageserver and successfully
55 : /// pushed to s3. We don't remove WAL beyond it. Persisted only for
56 : /// informational purposes, we receive it from pageserver (or broker).
57 : pub remote_consistent_lsn: Lsn,
58 : /// Peers and their state as we remember it. Knowing peers themselves is
59 : /// fundamental; but state is saved here only for informational purposes and
60 : /// obviously can be stale. (Currently not saved at all, but let's provision
61 : /// place to have less file version upgrades).
62 : pub peers: PersistedPeers,
63 : /// Holds names of partial segments uploaded to remote storage. Used to
64 : /// clean up old objects without leaving garbage in remote storage.
65 : pub partial_backup: wal_backup_partial::State,
66 : /// Eviction state of the timeline. If it's Offloaded, we should download
67 : /// WAL files from remote storage to serve the timeline.
68 : pub eviction_state: EvictionState,
69 : }
70 :
71 4 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
72 : pub struct PersistedPeers(pub Vec<(NodeId, PersistedPeerInfo)>);
73 :
74 : /// State of the local WAL files. Used to track current timeline state,
75 : /// that can be either WAL files are present on disk or last partial segment
76 : /// is offloaded to remote storage.
77 2 : #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
78 : pub enum EvictionState {
79 : /// WAL files are present on disk.
80 : Present,
81 : /// Last partial segment is offloaded to remote storage.
82 : /// Contains flush_lsn of the last offloaded segment.
83 : Offloaded(Lsn),
84 : }
85 :
86 : impl TimelinePersistentState {
87 5736 : pub fn new(
88 5736 : ttid: &TenantTimelineId,
89 5736 : server_info: ServerInfo,
90 5736 : peers: Vec<NodeId>,
91 5736 : commit_lsn: Lsn,
92 5736 : local_start_lsn: Lsn,
93 5736 : ) -> TimelinePersistentState {
94 5736 : TimelinePersistentState {
95 5736 : tenant_id: ttid.tenant_id,
96 5736 : timeline_id: ttid.timeline_id,
97 5736 : acceptor_state: AcceptorState {
98 5736 : term: 0,
99 5736 : term_history: TermHistory::empty(),
100 5736 : },
101 5736 : server: server_info,
102 5736 : proposer_uuid: [0; 16],
103 5736 : timeline_start_lsn: Lsn(0),
104 5736 : local_start_lsn,
105 5736 : commit_lsn,
106 5736 : backup_lsn: local_start_lsn,
107 5736 : peer_horizon_lsn: local_start_lsn,
108 5736 : remote_consistent_lsn: Lsn(0),
109 5736 : peers: PersistedPeers(
110 5736 : peers
111 5736 : .iter()
112 5736 : .map(|p| (*p, PersistedPeerInfo::new()))
113 5736 : .collect(),
114 5736 : ),
115 5736 : partial_backup: wal_backup_partial::State::default(),
116 5736 : eviction_state: EvictionState::Present,
117 5736 : }
118 5736 : }
119 :
120 : #[cfg(test)]
121 5 : pub fn empty() -> Self {
122 : use crate::safekeeper::UNKNOWN_SERVER_VERSION;
123 :
124 5 : TimelinePersistentState::new(
125 5 : &TenantTimelineId::empty(),
126 5 : ServerInfo {
127 5 : pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */
128 5 : system_id: 0, /* Postgres system identifier */
129 5 : wal_seg_size: 0,
130 5 : },
131 5 : vec![],
132 5 : Lsn::INVALID,
133 5 : Lsn::INVALID,
134 5 : )
135 5 : }
136 : }
137 :
138 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
139 : // In memory safekeeper state. Fields mirror ones in `SafeKeeperPersistentState`; values
140 : // are not flushed yet.
141 : pub struct TimelineMemState {
142 : pub commit_lsn: Lsn,
143 : pub backup_lsn: Lsn,
144 : pub peer_horizon_lsn: Lsn,
145 : pub remote_consistent_lsn: Lsn,
146 : #[serde(with = "hex")]
147 : pub proposer_uuid: PgUuid,
148 : }
149 :
150 : /// Safekeeper persistent state plus in memory layer.
151 : ///
152 : /// Allows us to avoid frequent fsyncs when we update fields like commit_lsn
153 : /// which don't need immediate persistence. Provides transactional like API
154 : /// to atomically update the state.
155 : ///
156 : /// Implements Deref into *persistent* part.
157 : pub struct TimelineState<CTRL: control_file::Storage> {
158 : pub inmem: TimelineMemState,
159 : pub pers: CTRL, // persistent
160 : }
161 :
162 : impl<CTRL> TimelineState<CTRL>
163 : where
164 : CTRL: control_file::Storage,
165 : {
166 35172 : pub fn new(state: CTRL) -> Self {
167 35172 : TimelineState {
168 35172 : inmem: TimelineMemState {
169 35172 : commit_lsn: state.commit_lsn,
170 35172 : backup_lsn: state.backup_lsn,
171 35172 : peer_horizon_lsn: state.peer_horizon_lsn,
172 35172 : remote_consistent_lsn: state.remote_consistent_lsn,
173 35172 : proposer_uuid: state.proposer_uuid,
174 35172 : },
175 35172 : pers: state,
176 35172 : }
177 35172 : }
178 :
179 : /// Start atomic change. Returns SafeKeeperPersistentState with in memory
180 : /// values applied; the protocol is to 1) change returned struct as desired
181 : /// 2) atomically persist it with finish_change.
182 15466 : pub fn start_change(&self) -> TimelinePersistentState {
183 15466 : let mut s = self.pers.clone();
184 15466 : s.commit_lsn = self.inmem.commit_lsn;
185 15466 : s.backup_lsn = self.inmem.backup_lsn;
186 15466 : s.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
187 15466 : s.remote_consistent_lsn = self.inmem.remote_consistent_lsn;
188 15466 : s.proposer_uuid = self.inmem.proposer_uuid;
189 15466 : s
190 15466 : }
191 :
192 : /// Persist given state. c.f. start_change.
193 15466 : pub async fn finish_change(&mut self, s: &TimelinePersistentState) -> Result<()> {
194 15466 : if s.eq(&*self.pers) {
195 165 : // nothing to do if state didn't change
196 165 : } else {
197 15301 : self.pers.persist(s).await?;
198 : }
199 :
200 : // keep in memory values up to date
201 15466 : self.inmem.commit_lsn = s.commit_lsn;
202 15466 : self.inmem.backup_lsn = s.backup_lsn;
203 15466 : self.inmem.peer_horizon_lsn = s.peer_horizon_lsn;
204 15466 : self.inmem.remote_consistent_lsn = s.remote_consistent_lsn;
205 15466 : self.inmem.proposer_uuid = s.proposer_uuid;
206 15466 : Ok(())
207 15466 : }
208 :
209 : /// Flush in memory values.
210 489 : pub async fn flush(&mut self) -> Result<()> {
211 489 : let s = self.start_change();
212 489 : self.finish_change(&s).await
213 489 : }
214 : }
215 :
216 : impl<CTRL> Deref for TimelineState<CTRL>
217 : where
218 : CTRL: control_file::Storage,
219 : {
220 : type Target = TimelinePersistentState;
221 :
222 752244 : fn deref(&self) -> &Self::Target {
223 752244 : &self.pers
224 752244 : }
225 : }
|