Line data Source code
//! Defines per-timeline data stored persistently (`TimelinePersistentState`)
//! and its wrapper with an in-memory layer (`TimelineState`).
3 :
4 : use std::ops::Deref;
5 :
6 : use anyhow::Result;
7 : use serde::{Deserialize, Serialize};
8 : use utils::{
9 : id::{NodeId, TenantId, TenantTimelineId, TimelineId},
10 : lsn::Lsn,
11 : };
12 :
13 : use crate::{
14 : control_file,
15 : safekeeper::{AcceptorState, PersistedPeerInfo, PgUuid, ServerInfo, TermHistory},
16 : };
17 :
/// Persistent information stored on safekeeper node about timeline.
/// On disk data is prefixed by magic and format version and followed by checksum.
///
/// NOTE(review): this struct is (de)serialized as the persisted state, so field
/// order presumably matters for the on-disk format — do not reorder fields
/// without confirming against `control_file` (and bumping the format version).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct TimelinePersistentState {
    /// Tenant this timeline belongs to; (de)serialized via the `hex` module.
    #[serde(with = "hex")]
    pub tenant_id: TenantId,
    /// Id of this timeline; (de)serialized via the `hex` module.
    #[serde(with = "hex")]
    pub timeline_id: TimelineId,
    /// persistent acceptor state
    pub acceptor_state: AcceptorState,
    /// information about server
    pub server: ServerInfo,
    /// Unique id of the last *elected* proposer we dealt with. Not needed
    /// for correctness, exists for monitoring purposes.
    #[serde(with = "hex")]
    pub proposer_uuid: PgUuid,
    /// Since which LSN this timeline generally starts. Safekeeper might have
    /// joined later.
    pub timeline_start_lsn: Lsn,
    /// Since which LSN safekeeper has (had) WAL for this timeline.
    /// All WAL segments next to one containing local_start_lsn are
    /// filled with data from the beginning.
    pub local_start_lsn: Lsn,
    /// Part of WAL acknowledged by quorum *and available locally*. Always points
    /// to record boundary.
    pub commit_lsn: Lsn,
    /// LSN that points to the end of the last backed up segment. Useful to
    /// persist to avoid finding out offloading progress on boot.
    pub backup_lsn: Lsn,
    /// Minimal LSN which may be needed for recovery of some safekeeper (end_lsn
    /// of last record streamed to everyone). Persisting it helps skipping
    /// recovery in walproposer, generally we compute it from peers. In
    /// walproposer proto called 'truncate_lsn'. Updates are currently driven
    /// only by walproposer.
    pub peer_horizon_lsn: Lsn,
    /// LSN of the oldest known checkpoint made by pageserver and successfully
    /// pushed to s3. We don't remove WAL beyond it. Persisted only for
    /// informational purposes, we receive it from pageserver (or broker).
    pub remote_consistent_lsn: Lsn,
    /// Peers and their state as we remember it. Knowing peers themselves is
    /// fundamental; but state is saved here only for informational purposes and
    /// obviously can be stale. (Currently not saved at all, but let's provision
    /// place to have less file version upgrades).
    pub peers: PersistedPeers,
}
63 :
/// Remembered peers of a timeline: a list of `(node id, info)` pairs, one per
/// peer safekeeper.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PersistedPeers(pub Vec<(NodeId, PersistedPeerInfo)>);
66 :
67 : impl TimelinePersistentState {
68 535 : pub fn new(
69 535 : ttid: &TenantTimelineId,
70 535 : server_info: ServerInfo,
71 535 : peers: Vec<NodeId>,
72 535 : commit_lsn: Lsn,
73 535 : local_start_lsn: Lsn,
74 535 : ) -> TimelinePersistentState {
75 535 : TimelinePersistentState {
76 535 : tenant_id: ttid.tenant_id,
77 535 : timeline_id: ttid.timeline_id,
78 535 : acceptor_state: AcceptorState {
79 535 : term: 0,
80 535 : term_history: TermHistory::empty(),
81 535 : },
82 535 : server: server_info,
83 535 : proposer_uuid: [0; 16],
84 535 : timeline_start_lsn: Lsn(0),
85 535 : local_start_lsn,
86 535 : commit_lsn,
87 535 : backup_lsn: local_start_lsn,
88 535 : peer_horizon_lsn: local_start_lsn,
89 535 : remote_consistent_lsn: Lsn(0),
90 535 : peers: PersistedPeers(
91 535 : peers
92 535 : .iter()
93 535 : .map(|p| (*p, PersistedPeerInfo::new()))
94 535 : .collect(),
95 535 : ),
96 535 : }
97 535 : }
98 :
99 : #[cfg(test)]
100 8 : pub fn empty() -> Self {
101 8 : use crate::safekeeper::UNKNOWN_SERVER_VERSION;
102 8 :
103 8 : TimelinePersistentState::new(
104 8 : &TenantTimelineId::empty(),
105 8 : ServerInfo {
106 8 : pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */
107 8 : system_id: 0, /* Postgres system identifier */
108 8 : wal_seg_size: 0,
109 8 : },
110 8 : vec![],
111 8 : Lsn::INVALID,
112 8 : Lsn::INVALID,
113 8 : )
114 8 : }
115 : }
116 :
#[derive(Debug, Clone, Serialize, Deserialize)]
/// In memory safekeeper state. Fields mirror ones in `TimelinePersistentState`;
/// values here may be newer than the persisted ones (not flushed yet).
pub struct TimelineMemState {
    /// In-memory counterpart of `TimelinePersistentState::commit_lsn`.
    pub commit_lsn: Lsn,
    /// In-memory counterpart of `TimelinePersistentState::backup_lsn`.
    pub backup_lsn: Lsn,
    /// In-memory counterpart of `TimelinePersistentState::peer_horizon_lsn`.
    pub peer_horizon_lsn: Lsn,
    /// In-memory counterpart of `TimelinePersistentState::remote_consistent_lsn`.
    pub remote_consistent_lsn: Lsn,
    /// In-memory counterpart of `TimelinePersistentState::proposer_uuid`;
    /// (de)serialized via the `hex` module.
    #[serde(with = "hex")]
    pub proposer_uuid: PgUuid,
}
128 :
/// Safekeeper persistent state plus in memory layer, to avoid frequent fsyncs
/// when we update fields like commit_lsn which don't need immediate
/// persistence. Provides transactional like API to atomically update the state.
///
/// Implements Deref into *persistent* part.
pub struct TimelineState<CTRL: control_file::Storage> {
    /// Not-yet-flushed values of the frequently updated fields; these may be
    /// ahead of the corresponding fields in `pers`.
    pub inmem: TimelineMemState,
    /// Persistent storage holding the last persisted state.
    pub pers: CTRL, // persistent
}
138 :
139 : impl<CTRL> TimelineState<CTRL>
140 : where
141 : CTRL: control_file::Storage,
142 : {
143 618 : pub fn new(state: CTRL) -> Self {
144 618 : TimelineState {
145 618 : inmem: TimelineMemState {
146 618 : commit_lsn: state.commit_lsn,
147 618 : backup_lsn: state.backup_lsn,
148 618 : peer_horizon_lsn: state.peer_horizon_lsn,
149 618 : remote_consistent_lsn: state.remote_consistent_lsn,
150 618 : proposer_uuid: state.proposer_uuid,
151 618 : },
152 618 : pers: state,
153 618 : }
154 618 : }
155 :
156 : /// Start atomic change. Returns SafeKeeperPersistentState with in memory
157 : /// values applied; the protocol is to 1) change returned struct as desired
158 : /// 2) atomically persist it with finish_change.
159 4951 : pub fn start_change(&self) -> TimelinePersistentState {
160 4951 : let mut s = self.pers.clone();
161 4951 : s.commit_lsn = self.inmem.commit_lsn;
162 4951 : s.backup_lsn = self.inmem.backup_lsn;
163 4951 : s.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
164 4951 : s.remote_consistent_lsn = self.inmem.remote_consistent_lsn;
165 4951 : s.proposer_uuid = self.inmem.proposer_uuid;
166 4951 : s
167 4951 : }
168 :
169 : /// Persist given state. c.f. start_change.
170 4951 : pub async fn finish_change(&mut self, s: &TimelinePersistentState) -> Result<()> {
171 14875 : self.pers.persist(s).await?;
172 : // keep in memory values up to date
173 4951 : self.inmem.commit_lsn = s.commit_lsn;
174 4951 : self.inmem.backup_lsn = s.backup_lsn;
175 4951 : self.inmem.peer_horizon_lsn = s.peer_horizon_lsn;
176 4951 : self.inmem.remote_consistent_lsn = s.remote_consistent_lsn;
177 4951 : self.inmem.proposer_uuid = s.proposer_uuid;
178 4951 : Ok(())
179 4951 : }
180 :
181 : /// Flush in memory values.
182 1806 : pub async fn flush(&mut self) -> Result<()> {
183 1806 : let s = self.start_change();
184 5506 : self.finish_change(&s).await
185 1806 : }
186 : }
187 :
188 : impl<CTRL> Deref for TimelineState<CTRL>
189 : where
190 : CTRL: control_file::Storage,
191 : {
192 : type Target = TimelinePersistentState;
193 :
194 29917445 : fn deref(&self) -> &Self::Target {
195 29917445 : &self.pers
196 29917445 : }
197 : }
|