Line data Source code
1 : //! Defines per timeline data stored persistently (SafeKeeperPersistentState)
2 : //! and its wrapper with in memory layer (SafekeeperState).
3 :
4 : use std::ops::Deref;
5 :
6 : use anyhow::Result;
7 : use serde::{Deserialize, Serialize};
8 : use utils::{
9 : id::{NodeId, TenantId, TenantTimelineId, TimelineId},
10 : lsn::Lsn,
11 : };
12 :
13 : use crate::{
14 : control_file,
15 : safekeeper::{AcceptorState, PersistedPeerInfo, PgUuid, ServerInfo, TermHistory},
16 : wal_backup_partial::{self},
17 : };
18 :
19 : /// Persistent information stored on safekeeper node about timeline.
20 : /// On disk data is prefixed by magic and format version and followed by checksum.
21 12 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
22 : pub struct TimelinePersistentState {
23 : #[serde(with = "hex")]
24 : pub tenant_id: TenantId,
25 : #[serde(with = "hex")]
26 : pub timeline_id: TimelineId,
27 : /// persistent acceptor state
28 : pub acceptor_state: AcceptorState,
29 : /// information about server
30 : pub server: ServerInfo,
31 : /// Unique id of the last *elected* proposer we dealt with. Not needed
32 : /// for correctness, exists for monitoring purposes.
33 : #[serde(with = "hex")]
34 : pub proposer_uuid: PgUuid,
35 : /// Since which LSN this timeline generally starts. Safekeeper might have
36 : /// joined later.
37 : pub timeline_start_lsn: Lsn,
38 : /// Since which LSN safekeeper has (had) WAL for this timeline.
39 : /// All WAL segments next to one containing local_start_lsn are
40 : /// filled with data from the beginning.
41 : pub local_start_lsn: Lsn,
42 : /// Part of WAL acknowledged by quorum *and available locally*. Always points
43 : /// to record boundary.
44 : pub commit_lsn: Lsn,
45 : /// LSN that points to the end of the last backed up segment. Useful to
46 : /// persist to avoid finding out offloading progress on boot.
47 : pub backup_lsn: Lsn,
48 : /// Minimal LSN which may be needed for recovery of some safekeeper (end_lsn
49 : /// of last record streamed to everyone). Persisting it helps skipping
50 : /// recovery in walproposer, generally we compute it from peers. In
51 : /// walproposer proto called 'truncate_lsn'. Updates are currently drived
52 : /// only by walproposer.
53 : pub peer_horizon_lsn: Lsn,
54 : /// LSN of the oldest known checkpoint made by pageserver and successfully
55 : /// pushed to s3. We don't remove WAL beyond it. Persisted only for
56 : /// informational purposes, we receive it from pageserver (or broker).
57 : pub remote_consistent_lsn: Lsn,
58 : /// Peers and their state as we remember it. Knowing peers themselves is
59 : /// fundamental; but state is saved here only for informational purposes and
60 : /// obviously can be stale. (Currently not saved at all, but let's provision
61 : /// place to have less file version upgrades).
62 : pub peers: PersistedPeers,
63 : /// Holds names of partial segments uploaded to remote storage. Used to
64 : /// clean up old objects without leaving garbage in remote storage.
65 : pub partial_backup: wal_backup_partial::State,
66 : /// Eviction state of the timeline. If it's Offloaded, we should download
67 : /// WAL files from remote storage to serve the timeline.
68 : pub eviction_state: EvictionState,
69 : }
70 :
71 8 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
72 : pub struct PersistedPeers(pub Vec<(NodeId, PersistedPeerInfo)>);
73 :
74 : /// State of the local WAL files. Used to track current timeline state,
75 : /// that can be either WAL files are present on disk or last partial segment
76 : /// is offloaded to remote storage.
77 4 : #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
78 : pub enum EvictionState {
79 : /// WAL files are present on disk.
80 : Present,
81 : /// Last partial segment is offloaded to remote storage.
82 : /// Contains flush_lsn of the last offloaded segment.
83 : Offloaded(Lsn),
84 : }
85 :
86 : impl TimelinePersistentState {
87 11507 : pub fn new(
88 11507 : ttid: &TenantTimelineId,
89 11507 : server_info: ServerInfo,
90 11507 : peers: Vec<NodeId>,
91 11507 : commit_lsn: Lsn,
92 11507 : local_start_lsn: Lsn,
93 11507 : ) -> TimelinePersistentState {
94 11507 : TimelinePersistentState {
95 11507 : tenant_id: ttid.tenant_id,
96 11507 : timeline_id: ttid.timeline_id,
97 11507 : acceptor_state: AcceptorState {
98 11507 : term: 0,
99 11507 : term_history: TermHistory::empty(),
100 11507 : },
101 11507 : server: server_info,
102 11507 : proposer_uuid: [0; 16],
103 11507 : timeline_start_lsn: Lsn(0),
104 11507 : local_start_lsn,
105 11507 : commit_lsn,
106 11507 : backup_lsn: local_start_lsn,
107 11507 : peer_horizon_lsn: local_start_lsn,
108 11507 : remote_consistent_lsn: Lsn(0),
109 11507 : peers: PersistedPeers(
110 11507 : peers
111 11507 : .iter()
112 11507 : .map(|p| (*p, PersistedPeerInfo::new()))
113 11507 : .collect(),
114 11507 : ),
115 11507 : partial_backup: wal_backup_partial::State::default(),
116 11507 : eviction_state: EvictionState::Present,
117 11507 : }
118 11507 : }
119 :
120 : #[cfg(test)]
121 8 : pub fn empty() -> Self {
122 8 : use crate::safekeeper::UNKNOWN_SERVER_VERSION;
123 8 :
124 8 : TimelinePersistentState::new(
125 8 : &TenantTimelineId::empty(),
126 8 : ServerInfo {
127 8 : pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */
128 8 : system_id: 0, /* Postgres system identifier */
129 8 : wal_seg_size: 0,
130 8 : },
131 8 : vec![],
132 8 : Lsn::INVALID,
133 8 : Lsn::INVALID,
134 8 : )
135 8 : }
136 : }
137 :
138 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
139 : // In memory safekeeper state. Fields mirror ones in `SafeKeeperPersistentState`; values
140 : // are not flushed yet.
141 : pub struct TimelineMemState {
142 : pub commit_lsn: Lsn,
143 : pub backup_lsn: Lsn,
144 : pub peer_horizon_lsn: Lsn,
145 : pub remote_consistent_lsn: Lsn,
146 : #[serde(with = "hex")]
147 : pub proposer_uuid: PgUuid,
148 : }
149 :
150 : /// Safekeeper persistent state plus in memory layer, to avoid frequent fsyncs
151 : /// when we update fields like commit_lsn which don't need immediate
152 : /// persistence. Provides transactional like API to atomically update the state.
153 : ///
154 : /// Implements Deref into *persistent* part.
155 : pub struct TimelineState<CTRL: control_file::Storage> {
156 : pub inmem: TimelineMemState,
157 : pub pers: CTRL, // persistent
158 : }
159 :
160 : impl<CTRL> TimelineState<CTRL>
161 : where
162 : CTRL: control_file::Storage,
163 : {
164 71280 : pub fn new(state: CTRL) -> Self {
165 71280 : TimelineState {
166 71280 : inmem: TimelineMemState {
167 71280 : commit_lsn: state.commit_lsn,
168 71280 : backup_lsn: state.backup_lsn,
169 71280 : peer_horizon_lsn: state.peer_horizon_lsn,
170 71280 : remote_consistent_lsn: state.remote_consistent_lsn,
171 71280 : proposer_uuid: state.proposer_uuid,
172 71280 : },
173 71280 : pers: state,
174 71280 : }
175 71280 : }
176 :
177 : /// Start atomic change. Returns SafeKeeperPersistentState with in memory
178 : /// values applied; the protocol is to 1) change returned struct as desired
179 : /// 2) atomically persist it with finish_change.
180 31328 : pub fn start_change(&self) -> TimelinePersistentState {
181 31328 : let mut s = self.pers.clone();
182 31328 : s.commit_lsn = self.inmem.commit_lsn;
183 31328 : s.backup_lsn = self.inmem.backup_lsn;
184 31328 : s.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
185 31328 : s.remote_consistent_lsn = self.inmem.remote_consistent_lsn;
186 31328 : s.proposer_uuid = self.inmem.proposer_uuid;
187 31328 : s
188 31328 : }
189 :
190 : /// Persist given state. c.f. start_change.
191 31328 : pub async fn finish_change(&mut self, s: &TimelinePersistentState) -> Result<()> {
192 31328 : self.pers.persist(s).await?;
193 : // keep in memory values up to date
194 31328 : self.inmem.commit_lsn = s.commit_lsn;
195 31328 : self.inmem.backup_lsn = s.backup_lsn;
196 31328 : self.inmem.peer_horizon_lsn = s.peer_horizon_lsn;
197 31328 : self.inmem.remote_consistent_lsn = s.remote_consistent_lsn;
198 31328 : self.inmem.proposer_uuid = s.proposer_uuid;
199 31328 : Ok(())
200 31328 : }
201 :
202 : /// Flush in memory values.
203 880 : pub async fn flush(&mut self) -> Result<()> {
204 880 : let s = self.start_change();
205 880 : self.finish_change(&s).await
206 880 : }
207 : }
208 :
209 : impl<CTRL> Deref for TimelineState<CTRL>
210 : where
211 : CTRL: control_file::Storage,
212 : {
213 : type Target = TimelinePersistentState;
214 :
215 1380448 : fn deref(&self) -> &Self::Target {
216 1380448 : &self.pers
217 1380448 : }
218 : }
|