Line data Source code
1 : //! Defines per timeline data stored persistently (SafeKeeperPersistentState)
2 : //! and its wrapper with in memory layer (SafekeeperState).
3 :
4 : use std::ops::Deref;
5 :
6 : use anyhow::Result;
7 : use serde::{Deserialize, Serialize};
8 : use utils::{
9 : id::{NodeId, TenantId, TenantTimelineId, TimelineId},
10 : lsn::Lsn,
11 : };
12 :
13 : use crate::{
14 : control_file,
15 : safekeeper::{AcceptorState, PersistedPeerInfo, PgUuid, ServerInfo, TermHistory},
16 : wal_backup_partial::{self},
17 : };
18 :
19 : /// Persistent information stored on safekeeper node about timeline.
20 : /// On disk data is prefixed by magic and format version and followed by checksum.
21 12 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
22 : pub struct TimelinePersistentState {
23 : #[serde(with = "hex")]
24 : pub tenant_id: TenantId,
25 : #[serde(with = "hex")]
26 : pub timeline_id: TimelineId,
27 : /// persistent acceptor state
28 : pub acceptor_state: AcceptorState,
29 : /// information about server
30 : pub server: ServerInfo,
31 : /// Unique id of the last *elected* proposer we dealt with. Not needed
32 : /// for correctness, exists for monitoring purposes.
33 : #[serde(with = "hex")]
34 : pub proposer_uuid: PgUuid,
35 : /// Since which LSN this timeline generally starts. Safekeeper might have
36 : /// joined later.
37 : pub timeline_start_lsn: Lsn,
38 : /// Since which LSN safekeeper has (had) WAL for this timeline.
39 : /// All WAL segments next to one containing local_start_lsn are
40 : /// filled with data from the beginning.
41 : pub local_start_lsn: Lsn,
42 : /// Part of WAL acknowledged by quorum *and available locally*. Always points
43 : /// to record boundary.
44 : pub commit_lsn: Lsn,
45 : /// LSN that points to the end of the last backed up segment. Useful to
46 : /// persist to avoid finding out offloading progress on boot.
47 : pub backup_lsn: Lsn,
48 : /// Minimal LSN which may be needed for recovery of some safekeeper (end_lsn
49 : /// of last record streamed to everyone). Persisting it helps skipping
50 : /// recovery in walproposer, generally we compute it from peers. In
51 : /// walproposer proto called 'truncate_lsn'. Updates are currently drived
52 : /// only by walproposer.
53 : pub peer_horizon_lsn: Lsn,
54 : /// LSN of the oldest known checkpoint made by pageserver and successfully
55 : /// pushed to s3. We don't remove WAL beyond it. Persisted only for
56 : /// informational purposes, we receive it from pageserver (or broker).
57 : pub remote_consistent_lsn: Lsn,
58 : /// Peers and their state as we remember it. Knowing peers themselves is
59 : /// fundamental; but state is saved here only for informational purposes and
60 : /// obviously can be stale. (Currently not saved at all, but let's provision
61 : /// place to have less file version upgrades).
62 : pub peers: PersistedPeers,
63 : /// Holds names of partial segments uploaded to remote storage. Used to
64 : /// clean up old objects without leaving garbage in remote storage.
65 : pub partial_backup: wal_backup_partial::State,
66 : /// Eviction state of the timeline. If it's Offloaded, we should download
67 : /// WAL files from remote storage to serve the timeline.
68 : pub eviction_state: EvictionState,
69 : }
70 :
71 8 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
72 : pub struct PersistedPeers(pub Vec<(NodeId, PersistedPeerInfo)>);
73 :
74 : /// State of the local WAL files. Used to track current timeline state,
75 : /// that can be either WAL files are present on disk or last partial segment
76 : /// is offloaded to remote storage.
77 4 : #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
78 : pub enum EvictionState {
79 : /// WAL files are present on disk.
80 : Present,
81 : /// Last partial segment is offloaded to remote storage.
82 : /// Contains flush_lsn of the last offloaded segment.
83 : Offloaded(Lsn),
84 : }
85 :
86 : impl TimelinePersistentState {
87 11446 : pub fn new(
88 11446 : ttid: &TenantTimelineId,
89 11446 : server_info: ServerInfo,
90 11446 : peers: Vec<NodeId>,
91 11446 : commit_lsn: Lsn,
92 11446 : local_start_lsn: Lsn,
93 11446 : ) -> TimelinePersistentState {
94 11446 : TimelinePersistentState {
95 11446 : tenant_id: ttid.tenant_id,
96 11446 : timeline_id: ttid.timeline_id,
97 11446 : acceptor_state: AcceptorState {
98 11446 : term: 0,
99 11446 : term_history: TermHistory::empty(),
100 11446 : },
101 11446 : server: server_info,
102 11446 : proposer_uuid: [0; 16],
103 11446 : timeline_start_lsn: Lsn(0),
104 11446 : local_start_lsn,
105 11446 : commit_lsn,
106 11446 : backup_lsn: local_start_lsn,
107 11446 : peer_horizon_lsn: local_start_lsn,
108 11446 : remote_consistent_lsn: Lsn(0),
109 11446 : peers: PersistedPeers(
110 11446 : peers
111 11446 : .iter()
112 11446 : .map(|p| (*p, PersistedPeerInfo::new()))
113 11446 : .collect(),
114 11446 : ),
115 11446 : partial_backup: wal_backup_partial::State::default(),
116 11446 : eviction_state: EvictionState::Present,
117 11446 : }
118 11446 : }
119 :
120 : #[cfg(test)]
121 8 : pub fn empty() -> Self {
122 8 : use crate::safekeeper::UNKNOWN_SERVER_VERSION;
123 8 :
124 8 : TimelinePersistentState::new(
125 8 : &TenantTimelineId::empty(),
126 8 : ServerInfo {
127 8 : pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */
128 8 : system_id: 0, /* Postgres system identifier */
129 8 : wal_seg_size: 0,
130 8 : },
131 8 : vec![],
132 8 : Lsn::INVALID,
133 8 : Lsn::INVALID,
134 8 : )
135 8 : }
136 : }
137 :
138 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
139 : // In memory safekeeper state. Fields mirror ones in `SafeKeeperPersistentState`; values
140 : // are not flushed yet.
141 : pub struct TimelineMemState {
142 : pub commit_lsn: Lsn,
143 : pub backup_lsn: Lsn,
144 : pub peer_horizon_lsn: Lsn,
145 : pub remote_consistent_lsn: Lsn,
146 : #[serde(with = "hex")]
147 : pub proposer_uuid: PgUuid,
148 : }
149 :
150 : /// Safekeeper persistent state plus in memory layer, to avoid frequent fsyncs
151 : /// when we update fields like commit_lsn which don't need immediate
152 : /// persistence. Provides transactional like API to atomically update the state.
153 : ///
154 : /// Implements Deref into *persistent* part.
155 : pub struct TimelineState<CTRL: control_file::Storage> {
156 : pub inmem: TimelineMemState,
157 : pub pers: CTRL, // persistent
158 : }
159 :
160 : impl<CTRL> TimelineState<CTRL>
161 : where
162 : CTRL: control_file::Storage,
163 : {
164 69825 : pub fn new(state: CTRL) -> Self {
165 69825 : TimelineState {
166 69825 : inmem: TimelineMemState {
167 69825 : commit_lsn: state.commit_lsn,
168 69825 : backup_lsn: state.backup_lsn,
169 69825 : peer_horizon_lsn: state.peer_horizon_lsn,
170 69825 : remote_consistent_lsn: state.remote_consistent_lsn,
171 69825 : proposer_uuid: state.proposer_uuid,
172 69825 : },
173 69825 : pers: state,
174 69825 : }
175 69825 : }
176 :
177 : /// Start atomic change. Returns SafeKeeperPersistentState with in memory
178 : /// values applied; the protocol is to 1) change returned struct as desired
179 : /// 2) atomically persist it with finish_change.
180 28810 : pub fn start_change(&self) -> TimelinePersistentState {
181 28810 : let mut s = self.pers.clone();
182 28810 : s.commit_lsn = self.inmem.commit_lsn;
183 28810 : s.backup_lsn = self.inmem.backup_lsn;
184 28810 : s.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
185 28810 : s.remote_consistent_lsn = self.inmem.remote_consistent_lsn;
186 28810 : s.proposer_uuid = self.inmem.proposer_uuid;
187 28810 : s
188 28810 : }
189 :
190 : /// Persist given state. c.f. start_change.
191 28810 : pub async fn finish_change(&mut self, s: &TimelinePersistentState) -> Result<()> {
192 28810 : self.pers.persist(s).await?;
193 : // keep in memory values up to date
194 28810 : self.inmem.commit_lsn = s.commit_lsn;
195 28810 : self.inmem.backup_lsn = s.backup_lsn;
196 28810 : self.inmem.peer_horizon_lsn = s.peer_horizon_lsn;
197 28810 : self.inmem.remote_consistent_lsn = s.remote_consistent_lsn;
198 28810 : self.inmem.proposer_uuid = s.proposer_uuid;
199 28810 : Ok(())
200 28810 : }
201 :
202 : /// Flush in memory values.
203 793 : pub async fn flush(&mut self) -> Result<()> {
204 793 : let s = self.start_change();
205 793 : self.finish_change(&s).await
206 793 : }
207 : }
208 :
209 : impl<CTRL> Deref for TimelineState<CTRL>
210 : where
211 : CTRL: control_file::Storage,
212 : {
213 : type Target = TimelinePersistentState;
214 :
215 1330869 : fn deref(&self) -> &Self::Target {
216 1330869 : &self.pers
217 1330869 : }
218 : }
|