Line data Source code
1 : //! Defines per-timeline data stored persistently (`TimelinePersistentState`)
2 : //! and its wrapper with an in-memory layer (`TimelineState`).
3 :
4 : use std::ops::Deref;
5 :
6 : use anyhow::Result;
7 : use serde::{Deserialize, Serialize};
8 : use utils::{
9 : id::{NodeId, TenantId, TenantTimelineId, TimelineId},
10 : lsn::Lsn,
11 : };
12 :
13 : use crate::{
14 : control_file,
15 : safekeeper::{AcceptorState, PersistedPeerInfo, PgUuid, ServerInfo, TermHistory},
16 : wal_backup_partial::{self},
17 : };
18 :
19 : /// Persistent information stored on safekeeper node about timeline.
20 : /// On disk data is prefixed by magic and format version and followed by checksum.
21 24 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
22 : pub struct TimelinePersistentState {
23 : #[serde(with = "hex")]
24 : pub tenant_id: TenantId,
25 : #[serde(with = "hex")]
26 : pub timeline_id: TimelineId,
27 : /// persistent acceptor state
28 : pub acceptor_state: AcceptorState,
29 : /// information about server
30 : pub server: ServerInfo,
31 : /// Unique id of the last *elected* proposer we dealt with. Not needed
32 : /// for correctness, exists for monitoring purposes.
33 : #[serde(with = "hex")]
34 : pub proposer_uuid: PgUuid,
35 : /// Since which LSN this timeline generally starts. Safekeeper might have
36 : /// joined later.
37 : pub timeline_start_lsn: Lsn,
38 : /// Since which LSN safekeeper has (had) WAL for this timeline.
39 : /// All WAL segments next to one containing local_start_lsn are
40 : /// filled with data from the beginning.
41 : pub local_start_lsn: Lsn,
42 : /// Part of WAL acknowledged by quorum *and available locally*. Always points
43 : /// to record boundary.
44 : pub commit_lsn: Lsn,
45 : /// LSN that points to the end of the last backed up segment. Useful to
46 : /// persist to avoid finding out offloading progress on boot.
47 : pub backup_lsn: Lsn,
48 : /// Minimal LSN which may be needed for recovery of some safekeeper (end_lsn
49 : /// of last record streamed to everyone). Persisting it helps skipping
50 : /// recovery in walproposer, generally we compute it from peers. In
51 : /// walproposer proto called 'truncate_lsn'. Updates are currently drived
52 : /// only by walproposer.
53 : pub peer_horizon_lsn: Lsn,
54 : /// LSN of the oldest known checkpoint made by pageserver and successfully
55 : /// pushed to s3. We don't remove WAL beyond it. Persisted only for
56 : /// informational purposes, we receive it from pageserver (or broker).
57 : pub remote_consistent_lsn: Lsn,
58 : /// Peers and their state as we remember it. Knowing peers themselves is
59 : /// fundamental; but state is saved here only for informational purposes and
60 : /// obviously can be stale. (Currently not saved at all, but let's provision
61 : /// place to have less file version upgrades).
62 : pub peers: PersistedPeers,
63 : /// Holds names of partial segments uploaded to remote storage. Used to
64 : /// clean up old objects without leaving garbage in remote storage.
65 : pub partial_backup: wal_backup_partial::State,
66 : }
67 :
68 8 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
69 : pub struct PersistedPeers(pub Vec<(NodeId, PersistedPeerInfo)>);
70 :
71 : impl TimelinePersistentState {
72 11460 : pub fn new(
73 11460 : ttid: &TenantTimelineId,
74 11460 : server_info: ServerInfo,
75 11460 : peers: Vec<NodeId>,
76 11460 : commit_lsn: Lsn,
77 11460 : local_start_lsn: Lsn,
78 11460 : ) -> TimelinePersistentState {
79 11460 : TimelinePersistentState {
80 11460 : tenant_id: ttid.tenant_id,
81 11460 : timeline_id: ttid.timeline_id,
82 11460 : acceptor_state: AcceptorState {
83 11460 : term: 0,
84 11460 : term_history: TermHistory::empty(),
85 11460 : },
86 11460 : server: server_info,
87 11460 : proposer_uuid: [0; 16],
88 11460 : timeline_start_lsn: Lsn(0),
89 11460 : local_start_lsn,
90 11460 : commit_lsn,
91 11460 : backup_lsn: local_start_lsn,
92 11460 : peer_horizon_lsn: local_start_lsn,
93 11460 : remote_consistent_lsn: Lsn(0),
94 11460 : peers: PersistedPeers(
95 11460 : peers
96 11460 : .iter()
97 11460 : .map(|p| (*p, PersistedPeerInfo::new()))
98 11460 : .collect(),
99 11460 : ),
100 11460 : partial_backup: wal_backup_partial::State::default(),
101 11460 : }
102 11460 : }
103 :
104 : #[cfg(test)]
105 8 : pub fn empty() -> Self {
106 8 : use crate::safekeeper::UNKNOWN_SERVER_VERSION;
107 8 :
108 8 : TimelinePersistentState::new(
109 8 : &TenantTimelineId::empty(),
110 8 : ServerInfo {
111 8 : pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */
112 8 : system_id: 0, /* Postgres system identifier */
113 8 : wal_seg_size: 0,
114 8 : },
115 8 : vec![],
116 8 : Lsn::INVALID,
117 8 : Lsn::INVALID,
118 8 : )
119 8 : }
120 : }
121 :
122 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
123 : // In memory safekeeper state. Fields mirror ones in `SafeKeeperPersistentState`; values
124 : // are not flushed yet.
125 : pub struct TimelineMemState {
126 : pub commit_lsn: Lsn,
127 : pub backup_lsn: Lsn,
128 : pub peer_horizon_lsn: Lsn,
129 : pub remote_consistent_lsn: Lsn,
130 : #[serde(with = "hex")]
131 : pub proposer_uuid: PgUuid,
132 : }
133 :
134 : /// Safekeeper persistent state plus in memory layer, to avoid frequent fsyncs
135 : /// when we update fields like commit_lsn which don't need immediate
136 : /// persistence. Provides transactional like API to atomically update the state.
137 : ///
138 : /// Implements Deref into *persistent* part.
139 : pub struct TimelineState<CTRL: control_file::Storage> {
140 : pub inmem: TimelineMemState,
141 : pub pers: CTRL, // persistent
142 : }
143 :
144 : impl<CTRL> TimelineState<CTRL>
145 : where
146 : CTRL: control_file::Storage,
147 : {
148 70214 : pub fn new(state: CTRL) -> Self {
149 70214 : TimelineState {
150 70214 : inmem: TimelineMemState {
151 70214 : commit_lsn: state.commit_lsn,
152 70214 : backup_lsn: state.backup_lsn,
153 70214 : peer_horizon_lsn: state.peer_horizon_lsn,
154 70214 : remote_consistent_lsn: state.remote_consistent_lsn,
155 70214 : proposer_uuid: state.proposer_uuid,
156 70214 : },
157 70214 : pers: state,
158 70214 : }
159 70214 : }
160 :
161 : /// Start atomic change. Returns SafeKeeperPersistentState with in memory
162 : /// values applied; the protocol is to 1) change returned struct as desired
163 : /// 2) atomically persist it with finish_change.
164 28024 : pub fn start_change(&self) -> TimelinePersistentState {
165 28024 : let mut s = self.pers.clone();
166 28024 : s.commit_lsn = self.inmem.commit_lsn;
167 28024 : s.backup_lsn = self.inmem.backup_lsn;
168 28024 : s.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
169 28024 : s.remote_consistent_lsn = self.inmem.remote_consistent_lsn;
170 28024 : s.proposer_uuid = self.inmem.proposer_uuid;
171 28024 : s
172 28024 : }
173 :
174 : /// Persist given state. c.f. start_change.
175 28024 : pub async fn finish_change(&mut self, s: &TimelinePersistentState) -> Result<()> {
176 28024 : self.pers.persist(s).await?;
177 : // keep in memory values up to date
178 28024 : self.inmem.commit_lsn = s.commit_lsn;
179 28024 : self.inmem.backup_lsn = s.backup_lsn;
180 28024 : self.inmem.peer_horizon_lsn = s.peer_horizon_lsn;
181 28024 : self.inmem.remote_consistent_lsn = s.remote_consistent_lsn;
182 28024 : self.inmem.proposer_uuid = s.proposer_uuid;
183 28024 : Ok(())
184 28024 : }
185 :
186 : /// Flush in memory values.
187 750 : pub async fn flush(&mut self) -> Result<()> {
188 750 : let s = self.start_change();
189 750 : self.finish_change(&s).await
190 750 : }
191 : }
192 :
193 : impl<CTRL> Deref for TimelineState<CTRL>
194 : where
195 : CTRL: control_file::Storage,
196 : {
197 : type Target = TimelinePersistentState;
198 :
199 1323592 : fn deref(&self) -> &Self::Target {
200 1323592 : &self.pers
201 1323592 : }
202 : }
|