Line data Source code
//! Defines per timeline data stored persistently (TimelinePersistentState)
//! and its wrapper with an in memory layer (TimelineState).
3 :
4 : use std::ops::Deref;
5 :
6 : use anyhow::Result;
7 : use serde::{Deserialize, Serialize};
8 : use utils::{
9 : id::{NodeId, TenantId, TenantTimelineId, TimelineId},
10 : lsn::Lsn,
11 : };
12 :
13 : use crate::{
14 : control_file,
15 : safekeeper::{AcceptorState, PersistedPeerInfo, PgUuid, ServerInfo, TermHistory},
16 : };
17 :
18 : /// Persistent information stored on safekeeper node about timeline.
19 : /// On disk data is prefixed by magic and format version and followed by checksum.
20 232255 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
21 : pub struct TimelinePersistentState {
22 : #[serde(with = "hex")]
23 : pub tenant_id: TenantId,
24 : #[serde(with = "hex")]
25 : pub timeline_id: TimelineId,
26 : /// persistent acceptor state
27 : pub acceptor_state: AcceptorState,
28 : /// information about server
29 : pub server: ServerInfo,
30 : /// Unique id of the last *elected* proposer we dealt with. Not needed
31 : /// for correctness, exists for monitoring purposes.
32 : #[serde(with = "hex")]
33 : pub proposer_uuid: PgUuid,
34 : /// Since which LSN this timeline generally starts. Safekeeper might have
35 : /// joined later.
36 : pub timeline_start_lsn: Lsn,
37 : /// Since which LSN safekeeper has (had) WAL for this timeline.
38 : /// All WAL segments next to one containing local_start_lsn are
39 : /// filled with data from the beginning.
40 : pub local_start_lsn: Lsn,
41 : /// Part of WAL acknowledged by quorum *and available locally*. Always points
42 : /// to record boundary.
43 : pub commit_lsn: Lsn,
44 : /// LSN that points to the end of the last backed up segment. Useful to
45 : /// persist to avoid finding out offloading progress on boot.
46 : pub backup_lsn: Lsn,
47 : /// Minimal LSN which may be needed for recovery of some safekeeper (end_lsn
48 : /// of last record streamed to everyone). Persisting it helps skipping
49 : /// recovery in walproposer, generally we compute it from peers. In
50 : /// walproposer proto called 'truncate_lsn'. Updates are currently drived
51 : /// only by walproposer.
52 : pub peer_horizon_lsn: Lsn,
53 : /// LSN of the oldest known checkpoint made by pageserver and successfully
54 : /// pushed to s3. We don't remove WAL beyond it. Persisted only for
55 : /// informational purposes, we receive it from pageserver (or broker).
56 : pub remote_consistent_lsn: Lsn,
57 : // Peers and their state as we remember it. Knowing peers themselves is
58 : // fundamental; but state is saved here only for informational purposes and
59 : // obviously can be stale. (Currently not saved at all, but let's provision
60 : // place to have less file version upgrades).
61 : pub peers: PersistedPeers,
62 : }
63 :
64 232255 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
65 : pub struct PersistedPeers(pub Vec<(NodeId, PersistedPeerInfo)>);
66 :
67 : impl TimelinePersistentState {
68 11918 : pub fn new(
69 11918 : ttid: &TenantTimelineId,
70 11918 : server_info: ServerInfo,
71 11918 : peers: Vec<NodeId>,
72 11918 : commit_lsn: Lsn,
73 11918 : local_start_lsn: Lsn,
74 11918 : ) -> TimelinePersistentState {
75 11918 : TimelinePersistentState {
76 11918 : tenant_id: ttid.tenant_id,
77 11918 : timeline_id: ttid.timeline_id,
78 11918 : acceptor_state: AcceptorState {
79 11918 : term: 0,
80 11918 : term_history: TermHistory::empty(),
81 11918 : },
82 11918 : server: server_info,
83 11918 : proposer_uuid: [0; 16],
84 11918 : timeline_start_lsn: Lsn(0),
85 11918 : local_start_lsn,
86 11918 : commit_lsn,
87 11918 : backup_lsn: local_start_lsn,
88 11918 : peer_horizon_lsn: local_start_lsn,
89 11918 : remote_consistent_lsn: Lsn(0),
90 11918 : peers: PersistedPeers(
91 11918 : peers
92 11918 : .iter()
93 11918 : .map(|p| (*p, PersistedPeerInfo::new()))
94 11918 : .collect(),
95 11918 : ),
96 11918 : }
97 11918 : }
98 :
99 : #[cfg(test)]
100 8 : pub fn empty() -> Self {
101 8 : use crate::safekeeper::UNKNOWN_SERVER_VERSION;
102 8 :
103 8 : TimelinePersistentState::new(
104 8 : &TenantTimelineId::empty(),
105 8 : ServerInfo {
106 8 : pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */
107 8 : system_id: 0, /* Postgres system identifier */
108 8 : wal_seg_size: 0,
109 8 : },
110 8 : vec![],
111 8 : Lsn::INVALID,
112 8 : Lsn::INVALID,
113 8 : )
114 8 : }
115 : }
116 :
117 2950 : #[derive(Debug, Clone, Serialize, Deserialize)]
118 : // In memory safekeeper state. Fields mirror ones in `SafeKeeperPersistentState`; values
119 : // are not flushed yet.
120 : pub struct TimelineMemState {
121 : pub commit_lsn: Lsn,
122 : pub backup_lsn: Lsn,
123 : pub peer_horizon_lsn: Lsn,
124 : pub remote_consistent_lsn: Lsn,
125 : #[serde(with = "hex")]
126 : pub proposer_uuid: PgUuid,
127 : }
128 :
129 : /// Safekeeper persistent state plus in memory layer, to avoid frequent fsyncs
130 : /// when we update fields like commit_lsn which don't need immediate
131 : /// persistence. Provides transactional like API to atomically update the state.
132 : ///
133 : /// Implements Deref into *persistent* part.
134 : pub struct TimelineState<CTRL: control_file::Storage> {
135 : pub inmem: TimelineMemState,
136 : pub pers: CTRL, // persistent
137 : }
138 :
139 : impl<CTRL> TimelineState<CTRL>
140 : where
141 : CTRL: control_file::Storage,
142 : {
143 71044 : pub fn new(state: CTRL) -> Self {
144 71044 : TimelineState {
145 71044 : inmem: TimelineMemState {
146 71044 : commit_lsn: state.commit_lsn,
147 71044 : backup_lsn: state.backup_lsn,
148 71044 : peer_horizon_lsn: state.peer_horizon_lsn,
149 71044 : remote_consistent_lsn: state.remote_consistent_lsn,
150 71044 : proposer_uuid: state.proposer_uuid,
151 71044 : },
152 71044 : pers: state,
153 71044 : }
154 71044 : }
155 :
156 : /// Start atomic change. Returns SafeKeeperPersistentState with in memory
157 : /// values applied; the protocol is to 1) change returned struct as desired
158 : /// 2) atomically persist it with finish_change.
159 34898 : pub fn start_change(&self) -> TimelinePersistentState {
160 34898 : let mut s = self.pers.clone();
161 34898 : s.commit_lsn = self.inmem.commit_lsn;
162 34898 : s.backup_lsn = self.inmem.backup_lsn;
163 34898 : s.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
164 34898 : s.remote_consistent_lsn = self.inmem.remote_consistent_lsn;
165 34898 : s.proposer_uuid = self.inmem.proposer_uuid;
166 34898 : s
167 34898 : }
168 :
169 : /// Persist given state. c.f. start_change.
170 34898 : pub async fn finish_change(&mut self, s: &TimelinePersistentState) -> Result<()> {
171 34898 : self.pers.persist(s).await?;
172 : // keep in memory values up to date
173 34898 : self.inmem.commit_lsn = s.commit_lsn;
174 34898 : self.inmem.backup_lsn = s.backup_lsn;
175 34898 : self.inmem.peer_horizon_lsn = s.peer_horizon_lsn;
176 34898 : self.inmem.remote_consistent_lsn = s.remote_consistent_lsn;
177 34898 : self.inmem.proposer_uuid = s.proposer_uuid;
178 34898 : Ok(())
179 34898 : }
180 :
181 : /// Flush in memory values.
182 2682 : pub async fn flush(&mut self) -> Result<()> {
183 2682 : let s = self.start_change();
184 5571 : self.finish_change(&s).await
185 2682 : }
186 : }
187 :
188 : impl<CTRL> Deref for TimelineState<CTRL>
189 : where
190 : CTRL: control_file::Storage,
191 : {
192 : type Target = TimelinePersistentState;
193 :
194 28387829 : fn deref(&self) -> &Self::Target {
195 28387829 : &self.pers
196 28387829 : }
197 : }
|