TLA Line data Source code
1 : //! Acceptor part of proposer-acceptor consensus algorithm.
2 :
3 : use anyhow::{bail, Context, Result};
4 : use byteorder::{LittleEndian, ReadBytesExt};
5 : use bytes::{Buf, BufMut, Bytes, BytesMut};
6 :
7 : use postgres_ffi::{TimeLineID, XLogSegNo, MAX_SEND_SIZE};
8 : use serde::{Deserialize, Serialize};
9 : use std::cmp::max;
10 : use std::cmp::min;
11 : use std::fmt;
12 : use std::io::Read;
13 : use std::time::Duration;
14 : use storage_broker::proto::SafekeeperTimelineInfo;
15 :
16 : use tracing::*;
17 :
18 : use crate::control_file;
19 : use crate::send_wal::HotStandbyFeedback;
20 :
21 : use crate::wal_storage;
22 : use pq_proto::SystemId;
23 : use utils::pageserver_feedback::PageserverFeedback;
24 : use utils::{
25 : bin_ser::LeSer,
26 : id::{NodeId, TenantId, TenantTimelineId, TimelineId},
27 : lsn::Lsn,
28 : };
29 :
30 : pub const SK_MAGIC: u32 = 0xcafeceefu32;
31 : pub const SK_FORMAT_VERSION: u32 = 7;
32 : const SK_PROTOCOL_VERSION: u32 = 2;
33 : pub const UNKNOWN_SERVER_VERSION: u32 = 0;
34 :
35 : /// Consensus logical timestamp.
36 : pub type Term = u64;
37 : pub const INVALID_TERM: Term = 0;
38 :
39 CBC 8907 : #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
40 : pub struct TermLsn {
41 : pub term: Term,
42 : pub lsn: Lsn,
43 : }
44 :
45 : // Creation from tuple provides less typing (e.g. for unit tests).
46 : impl From<(Term, Lsn)> for TermLsn {
47 3274993 : fn from(pair: (Term, Lsn)) -> TermLsn {
48 3274993 : TermLsn {
49 3274993 : term: pair.0,
50 3274993 : lsn: pair.1,
51 3274993 : }
52 3274993 : }
53 : }
54 :
55 13927 : #[derive(Clone, Serialize, Deserialize, PartialEq)]
56 : pub struct TermHistory(pub Vec<TermLsn>);
57 :
58 : impl TermHistory {
59 501 : pub fn empty() -> TermHistory {
60 501 : TermHistory(Vec::new())
61 501 : }
62 :
63 : // Parse TermHistory as n_entries followed by TermLsn pairs
64 830 : pub fn from_bytes(bytes: &mut Bytes) -> Result<TermHistory> {
65 830 : if bytes.remaining() < 4 {
66 UBC 0 : bail!("TermHistory misses len");
67 CBC 830 : }
68 830 : let n_entries = bytes.get_u32_le();
69 830 : let mut res = Vec::with_capacity(n_entries as usize);
70 830 : for _ in 0..n_entries {
71 3499 : if bytes.remaining() < 16 {
72 UBC 0 : bail!("TermHistory is incomplete");
73 CBC 3499 : }
74 3499 : res.push(TermLsn {
75 3499 : term: bytes.get_u64_le(),
76 3499 : lsn: bytes.get_u64_le().into(),
77 3499 : })
78 : }
79 830 : Ok(TermHistory(res))
80 830 : }
81 :
82 : /// Return copy of self with switches happening strictly after up_to
83 : /// truncated.
84 23337 : pub fn up_to(&self, up_to: Lsn) -> TermHistory {
85 23337 : let mut res = Vec::with_capacity(self.0.len());
86 62048 : for e in &self.0 {
87 38746 : if e.lsn > up_to {
88 35 : break;
89 38711 : }
90 38711 : res.push(*e);
91 : }
92 23337 : TermHistory(res)
93 23337 : }
94 :
95 : /// Find point of divergence between leader (walproposer) term history and
96 : /// safekeeper. Arguments are not symmetrics as proposer history ends at
97 : /// +infinity while safekeeper at flush_lsn.
98 : /// C version is at walproposer SendProposerElected.
99 5 : pub fn find_highest_common_point(
100 5 : prop_th: &TermHistory,
101 5 : sk_th: &TermHistory,
102 5 : sk_wal_end: Lsn,
103 5 : ) -> Option<TermLsn> {
104 5 : let (prop_th, sk_th) = (&prop_th.0, &sk_th.0); // avoid .0 below
105 :
106 5 : if let Some(sk_th_last) = sk_th.last() {
107 5 : assert!(
108 5 : sk_th_last.lsn <= sk_wal_end,
109 UBC 0 : "safekeeper term history end {:?} LSN is higher than WAL end {:?}",
110 : sk_th_last,
111 : sk_wal_end
112 : );
113 0 : }
114 :
115 : // find last common term, if any...
116 CBC 5 : let mut last_common_idx = None;
117 9 : for i in 0..min(sk_th.len(), prop_th.len()) {
118 9 : if prop_th[i].term != sk_th[i].term {
119 2 : break;
120 7 : }
121 : // If term is the same, LSN must be equal as well.
122 7 : assert!(
123 7 : prop_th[i].lsn == sk_th[i].lsn,
124 UBC 0 : "same term {} has different start LSNs: prop {}, sk {}",
125 0 : prop_th[i].term,
126 0 : prop_th[i].lsn,
127 0 : sk_th[i].lsn
128 : );
129 CBC 7 : last_common_idx = Some(i);
130 : }
131 5 : let last_common_idx = match last_common_idx {
132 1 : None => return None, // no common point
133 4 : Some(lci) => lci,
134 4 : };
135 4 : // Now find where it ends at both prop and sk and take min. End of
136 4 : // (common) term is the start of the next except it is the last one;
137 4 : // there it is flush_lsn in case of safekeeper or, in case of proposer
138 4 : // +infinity, so we just take flush_lsn then.
139 4 : if last_common_idx == prop_th.len() - 1 {
140 1 : Some(TermLsn {
141 1 : term: prop_th[last_common_idx].term,
142 1 : lsn: sk_wal_end,
143 1 : })
144 : } else {
145 3 : let prop_common_term_end = prop_th[last_common_idx + 1].lsn;
146 3 : let sk_common_term_end = if last_common_idx + 1 < sk_th.len() {
147 1 : sk_th[last_common_idx + 1].lsn
148 : } else {
149 2 : sk_wal_end
150 : };
151 3 : Some(TermLsn {
152 3 : term: prop_th[last_common_idx].term,
153 3 : lsn: min(prop_common_term_end, sk_common_term_end),
154 3 : })
155 : }
156 5 : }
157 : }
158 :
159 : /// Display only latest entries for Debug.
160 : impl fmt::Debug for TermHistory {
161 2577 : fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
162 2577 : let n_printed = 20;
163 2577 : write!(
164 2577 : fmt,
165 2577 : "{}{:?}",
166 2577 : if self.0.len() > n_printed { "... " } else { "" },
167 2577 : self.0
168 2577 : .iter()
169 2577 : .rev()
170 2577 : .take(n_printed)
171 6567 : .map(|&e| (e.term, e.lsn)) // omit TermSwitchEntry
172 2577 : .collect::<Vec<_>>()
173 2577 : )
174 2577 : }
175 : }
176 :
177 : /// Unique id of proposer. Not needed for correctness, used for monitoring.
178 : pub type PgUuid = [u8; 16];
179 :
180 : /// Persistent consensus state of the acceptor.
181 13092 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
182 : pub struct AcceptorState {
183 : /// acceptor's last term it voted for (advanced in 1 phase)
184 : pub term: Term,
185 : /// History of term switches for safekeeper's WAL.
186 : /// Actually it often goes *beyond* WAL contents as we adopt term history
187 : /// from the proposer before recovery.
188 : pub term_history: TermHistory,
189 : }
190 :
191 : impl AcceptorState {
192 : /// acceptor's epoch is the term of the highest entry in the log
193 21589 : pub fn get_epoch(&self, flush_lsn: Lsn) -> Term {
194 21589 : let th = self.term_history.up_to(flush_lsn);
195 21589 : match th.0.last() {
196 21098 : Some(e) => e.term,
197 491 : None => 0,
198 : }
199 21589 : }
200 : }
201 :
202 : /// Information about Postgres. Safekeeper gets it once and then verifies
203 : /// all further connections from computes match.
204 13140 : #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
205 : pub struct ServerInfo {
206 : /// Postgres server version
207 : pub pg_version: u32,
208 : pub system_id: SystemId,
209 : pub wal_seg_size: u32,
210 : }
211 :
212 4 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
213 : pub struct PersistedPeerInfo {
214 : /// LSN up to which safekeeper offloaded WAL to s3.
215 : pub backup_lsn: Lsn,
216 : /// Term of the last entry.
217 : pub term: Term,
218 : /// LSN of the last record.
219 : pub flush_lsn: Lsn,
220 : /// Up to which LSN safekeeper regards its WAL as committed.
221 : pub commit_lsn: Lsn,
222 : }
223 :
224 : impl PersistedPeerInfo {
225 UBC 0 : fn new() -> Self {
226 0 : Self {
227 0 : backup_lsn: Lsn::INVALID,
228 0 : term: INVALID_TERM,
229 0 : flush_lsn: Lsn(0),
230 0 : commit_lsn: Lsn(0),
231 0 : }
232 0 : }
233 : }
234 :
235 CBC 13092 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
236 : pub struct PersistedPeers(pub Vec<(NodeId, PersistedPeerInfo)>);
237 :
238 : /// Persistent information stored on safekeeper node
239 : /// On disk data is prefixed by magic and format version and followed by checksum.
240 13092 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
241 : pub struct SafeKeeperState {
242 : #[serde(with = "hex")]
243 : pub tenant_id: TenantId,
244 : #[serde(with = "hex")]
245 : pub timeline_id: TimelineId,
246 : /// persistent acceptor state
247 : pub acceptor_state: AcceptorState,
248 : /// information about server
249 : pub server: ServerInfo,
250 : /// Unique id of the last *elected* proposer we dealt with. Not needed
251 : /// for correctness, exists for monitoring purposes.
252 : #[serde(with = "hex")]
253 : pub proposer_uuid: PgUuid,
254 : /// Since which LSN this timeline generally starts. Safekeeper might have
255 : /// joined later.
256 : pub timeline_start_lsn: Lsn,
257 : /// Since which LSN safekeeper has (had) WAL for this timeline.
258 : /// All WAL segments next to one containing local_start_lsn are
259 : /// filled with data from the beginning.
260 : pub local_start_lsn: Lsn,
261 : /// Part of WAL acknowledged by quorum *and available locally*. Always points
262 : /// to record boundary.
263 : pub commit_lsn: Lsn,
264 : /// LSN that points to the end of the last backed up segment. Useful to
265 : /// persist to avoid finding out offloading progress on boot.
266 : pub backup_lsn: Lsn,
267 : /// Minimal LSN which may be needed for recovery of some safekeeper (end_lsn
268 : /// of last record streamed to everyone). Persisting it helps skipping
269 : /// recovery in walproposer, generally we compute it from peers. In
270 : /// walproposer proto called 'truncate_lsn'. Updates are currently drived
271 : /// only by walproposer.
272 : pub peer_horizon_lsn: Lsn,
273 : /// LSN of the oldest known checkpoint made by pageserver and successfully
274 : /// pushed to s3. We don't remove WAL beyond it. Persisted only for
275 : /// informational purposes, we receive it from pageserver (or broker).
276 : pub remote_consistent_lsn: Lsn,
277 : // Peers and their state as we remember it. Knowing peers themselves is
278 : // fundamental; but state is saved here only for informational purposes and
279 : // obviously can be stale. (Currently not saved at all, but let's provision
280 : // place to have less file version upgrades).
281 : pub peers: PersistedPeers,
282 : }
283 :
284 2792 : #[derive(Debug, Clone, Serialize, Deserialize)]
285 : // In memory safekeeper state. Fields mirror ones in `SafeKeeperState`; values
286 : // are not flushed yet.
287 : pub struct SafekeeperMemState {
288 : pub commit_lsn: Lsn,
289 : pub backup_lsn: Lsn,
290 : pub peer_horizon_lsn: Lsn,
291 : #[serde(with = "hex")]
292 : pub proposer_uuid: PgUuid,
293 : }
294 :
295 : impl SafeKeeperState {
296 501 : pub fn new(
297 501 : ttid: &TenantTimelineId,
298 501 : server_info: ServerInfo,
299 501 : peers: Vec<NodeId>,
300 501 : commit_lsn: Lsn,
301 501 : local_start_lsn: Lsn,
302 501 : ) -> SafeKeeperState {
303 501 : SafeKeeperState {
304 501 : tenant_id: ttid.tenant_id,
305 501 : timeline_id: ttid.timeline_id,
306 501 : acceptor_state: AcceptorState {
307 501 : term: 0,
308 501 : term_history: TermHistory::empty(),
309 501 : },
310 501 : server: server_info,
311 501 : proposer_uuid: [0; 16],
312 501 : timeline_start_lsn: Lsn(0),
313 501 : local_start_lsn,
314 501 : commit_lsn,
315 501 : backup_lsn: local_start_lsn,
316 501 : peer_horizon_lsn: local_start_lsn,
317 501 : remote_consistent_lsn: Lsn(0),
318 501 : peers: PersistedPeers(
319 501 : peers
320 501 : .iter()
321 501 : .map(|p| (*p, PersistedPeerInfo::new()))
322 501 : .collect(),
323 501 : ),
324 501 : }
325 501 : }
326 :
327 : #[cfg(test)]
328 4 : pub fn empty() -> Self {
329 4 : SafeKeeperState::new(
330 4 : &TenantTimelineId::empty(),
331 4 : ServerInfo {
332 4 : pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */
333 4 : system_id: 0, /* Postgres system identifier */
334 4 : wal_seg_size: 0,
335 4 : },
336 4 : vec![],
337 4 : Lsn::INVALID,
338 4 : Lsn::INVALID,
339 4 : )
340 4 : }
341 : }
342 :
343 : // protocol messages
344 :
345 : /// Initial Proposer -> Acceptor message
346 1745 : #[derive(Debug, Deserialize)]
347 : pub struct ProposerGreeting {
348 : /// proposer-acceptor protocol version
349 : pub protocol_version: u32,
350 : /// Postgres server version
351 : pub pg_version: u32,
352 : pub proposer_id: PgUuid,
353 : pub system_id: SystemId,
354 : pub timeline_id: TimelineId,
355 : pub tenant_id: TenantId,
356 : pub tli: TimeLineID,
357 : pub wal_seg_size: u32,
358 : }
359 :
360 : /// Acceptor -> Proposer initial response: the highest term known to me
361 : /// (acceptor voted for).
362 UBC 0 : #[derive(Debug, Serialize)]
363 : pub struct AcceptorGreeting {
364 : term: u64,
365 : node_id: NodeId,
366 : }
367 :
368 : /// Vote request sent from proposer to safekeepers
369 CBC 1742 : #[derive(Debug, Deserialize)]
370 : pub struct VoteRequest {
371 : pub term: Term,
372 : }
373 :
374 : /// Vote itself, sent from safekeeper to proposer
375 1743 : #[derive(Debug, Serialize)]
376 : pub struct VoteResponse {
377 : pub term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
378 : vote_given: u64, // fixme u64 due to padding
379 : // Safekeeper flush_lsn (end of WAL) + history of term switches allow
380 : // proposer to choose the most advanced one.
381 : pub flush_lsn: Lsn,
382 : truncate_lsn: Lsn,
383 : pub term_history: TermHistory,
384 : timeline_start_lsn: Lsn,
385 : }
386 :
387 : /*
388 : * Proposer -> Acceptor message announcing proposer is elected and communicating
389 : * term history to it.
390 : */
391 834 : #[derive(Debug)]
392 : pub struct ProposerElected {
393 : pub term: Term,
394 : pub start_streaming_at: Lsn,
395 : pub term_history: TermHistory,
396 : pub timeline_start_lsn: Lsn,
397 : }
398 :
399 : /// Request with WAL message sent from proposer to safekeeper. Along the way it
400 : /// communicates commit_lsn.
401 UBC 0 : #[derive(Debug)]
402 : pub struct AppendRequest {
403 : pub h: AppendRequestHeader,
404 : pub wal_data: Bytes,
405 : }
406 CBC 1975841 : #[derive(Debug, Clone, Deserialize)]
407 : pub struct AppendRequestHeader {
408 : // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
409 : pub term: Term,
410 : // TODO: remove this field, it in unused -- LSN of term switch can be taken
411 : // from ProposerElected (as well as from term history).
412 : pub epoch_start_lsn: Lsn,
413 : /// start position of message in WAL
414 : pub begin_lsn: Lsn,
415 : /// end position of message in WAL
416 : pub end_lsn: Lsn,
417 : /// LSN committed by quorum of safekeepers
418 : pub commit_lsn: Lsn,
419 : /// minimal LSN which may be needed by proposer to perform recovery of some safekeeper
420 : pub truncate_lsn: Lsn,
421 : // only for logging/debugging
422 : pub proposer_uuid: PgUuid,
423 : }
424 :
425 : /// Report safekeeper state to proposer
426 3 : #[derive(Debug, Serialize)]
427 : pub struct AppendResponse {
428 : // Current term of the safekeeper; if it is higher than proposer's, the
429 : // compute is out of date.
430 : pub term: Term,
431 : // NOTE: this is physical end of wal on safekeeper; currently it doesn't
432 : // make much sense without taking epoch into account, as history can be
433 : // diverged.
434 : pub flush_lsn: Lsn,
435 : // We report back our awareness about which WAL is committed, as this is
436 : // a criterion for walproposer --sync mode exit
437 : pub commit_lsn: Lsn,
438 : pub hs_feedback: HotStandbyFeedback,
439 : pub pageserver_feedback: PageserverFeedback,
440 : }
441 :
442 : impl AppendResponse {
443 1 : fn term_only(term: Term) -> AppendResponse {
444 1 : AppendResponse {
445 1 : term,
446 1 : flush_lsn: Lsn(0),
447 1 : commit_lsn: Lsn(0),
448 1 : hs_feedback: HotStandbyFeedback::empty(),
449 1 : pageserver_feedback: PageserverFeedback::empty(),
450 1 : }
451 1 : }
452 : }
453 :
454 : /// Proposer -> Acceptor messages
455 UBC 0 : #[derive(Debug)]
456 : pub enum ProposerAcceptorMessage {
457 : Greeting(ProposerGreeting),
458 : VoteRequest(VoteRequest),
459 : Elected(ProposerElected),
460 : AppendRequest(AppendRequest),
461 : NoFlushAppendRequest(AppendRequest),
462 : FlushWAL,
463 : }
464 :
465 : impl ProposerAcceptorMessage {
466 : /// Parse proposer message.
467 CBC 1980158 : pub fn parse(msg_bytes: Bytes) -> Result<ProposerAcceptorMessage> {
468 1980158 : // xxx using Reader is inefficient but easy to work with bincode
469 1980158 : let mut stream = msg_bytes.reader();
470 : // u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is
471 1980158 : let tag = stream.read_u64::<LittleEndian>()? as u8 as char;
472 1980158 : match tag {
473 : 'g' => {
474 1745 : let msg = ProposerGreeting::des_from(&mut stream)?;
475 1745 : Ok(ProposerAcceptorMessage::Greeting(msg))
476 : }
477 : 'v' => {
478 1742 : let msg = VoteRequest::des_from(&mut stream)?;
479 1742 : Ok(ProposerAcceptorMessage::VoteRequest(msg))
480 : }
481 : 'e' => {
482 830 : let mut msg_bytes = stream.into_inner();
483 830 : if msg_bytes.remaining() < 16 {
484 UBC 0 : bail!("ProposerElected message is not complete");
485 CBC 830 : }
486 830 : let term = msg_bytes.get_u64_le();
487 830 : let start_streaming_at = msg_bytes.get_u64_le().into();
488 830 : let term_history = TermHistory::from_bytes(&mut msg_bytes)?;
489 830 : if msg_bytes.remaining() < 8 {
490 UBC 0 : bail!("ProposerElected message is not complete");
491 CBC 830 : }
492 830 : let timeline_start_lsn = msg_bytes.get_u64_le().into();
493 830 : let msg = ProposerElected {
494 830 : term,
495 830 : start_streaming_at,
496 830 : timeline_start_lsn,
497 830 : term_history,
498 830 : };
499 830 : Ok(ProposerAcceptorMessage::Elected(msg))
500 : }
501 : 'a' => {
502 : // read header followed by wal data
503 1975841 : let hdr = AppendRequestHeader::des_from(&mut stream)?;
504 1975841 : let rec_size = hdr
505 1975841 : .end_lsn
506 1975841 : .checked_sub(hdr.begin_lsn)
507 1975841 : .context("begin_lsn > end_lsn in AppendRequest")?
508 : .0 as usize;
509 1975841 : if rec_size > MAX_SEND_SIZE {
510 UBC 0 : bail!(
511 0 : "AppendRequest is longer than MAX_SEND_SIZE ({})",
512 0 : MAX_SEND_SIZE
513 0 : );
514 CBC 1975841 : }
515 1975841 :
516 1975841 : let mut wal_data_vec: Vec<u8> = vec![0; rec_size];
517 1975841 : stream.read_exact(&mut wal_data_vec)?;
518 1975841 : let wal_data = Bytes::from(wal_data_vec);
519 1975841 : let msg = AppendRequest { h: hdr, wal_data };
520 1975841 :
521 1975841 : Ok(ProposerAcceptorMessage::AppendRequest(msg))
522 : }
523 UBC 0 : _ => bail!("unknown proposer-acceptor message tag: {}", tag),
524 : }
525 CBC 1980158 : }
526 : }
527 :
528 : /// Acceptor -> Proposer messages
529 UBC 0 : #[derive(Debug)]
530 : pub enum AcceptorProposerMessage {
531 : Greeting(AcceptorGreeting),
532 : VoteResponse(VoteResponse),
533 : AppendResponse(AppendResponse),
534 : }
535 :
536 : impl AcceptorProposerMessage {
537 : /// Serialize acceptor -> proposer message.
538 CBC 1297710 : pub fn serialize(&self, buf: &mut BytesMut) -> Result<()> {
539 1297710 : match self {
540 1744 : AcceptorProposerMessage::Greeting(msg) => {
541 1744 : buf.put_u64_le('g' as u64);
542 1744 : buf.put_u64_le(msg.term);
543 1744 : buf.put_u64_le(msg.node_id.0);
544 1744 : }
545 1740 : AcceptorProposerMessage::VoteResponse(msg) => {
546 1740 : buf.put_u64_le('v' as u64);
547 1740 : buf.put_u64_le(msg.term);
548 1740 : buf.put_u64_le(msg.vote_given);
549 1740 : buf.put_u64_le(msg.flush_lsn.into());
550 1740 : buf.put_u64_le(msg.truncate_lsn.into());
551 1740 : buf.put_u32_le(msg.term_history.0.len() as u32);
552 5584 : for e in &msg.term_history.0 {
553 3844 : buf.put_u64_le(e.term);
554 3844 : buf.put_u64_le(e.lsn.into());
555 3844 : }
556 1740 : buf.put_u64_le(msg.timeline_start_lsn.into());
557 : }
558 1294226 : AcceptorProposerMessage::AppendResponse(msg) => {
559 1294226 : buf.put_u64_le('a' as u64);
560 1294226 : buf.put_u64_le(msg.term);
561 1294226 : buf.put_u64_le(msg.flush_lsn.into());
562 1294226 : buf.put_u64_le(msg.commit_lsn.into());
563 1294226 : buf.put_i64_le(msg.hs_feedback.ts);
564 1294226 : buf.put_u64_le(msg.hs_feedback.xmin);
565 1294226 : buf.put_u64_le(msg.hs_feedback.catalog_xmin);
566 1294226 :
567 1294226 : msg.pageserver_feedback.serialize(buf);
568 1294226 : }
569 : }
570 :
571 1297710 : Ok(())
572 1297710 : }
573 : }
574 :
575 : /// Safekeeper implements consensus to reliably persist WAL across nodes.
576 : /// It controls all WAL disk writes and updates of control file.
577 : ///
578 : /// Currently safekeeper processes:
579 : /// - messages from compute (proposers) and provides replies
580 : /// - messages from broker peers
581 : pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> {
582 : /// LSN since the proposer safekeeper currently talking to appends WAL;
583 : /// determines epoch switch point.
584 : pub epoch_start_lsn: Lsn,
585 :
586 : pub inmem: SafekeeperMemState, // in memory part
587 : pub state: CTRL, // persistent state storage
588 :
589 : pub wal_store: WAL,
590 :
591 : node_id: NodeId, // safekeeper's node id
592 : }
593 :
594 : impl<CTRL, WAL> SafeKeeper<CTRL, WAL>
595 : where
596 : CTRL: control_file::Storage,
597 : WAL: wal_storage::Storage,
598 : {
599 : /// Accepts a control file storage containing the safekeeper state.
600 : /// State must be initialized, i.e. contain filled `tenant_id`, `timeline_id`
601 : /// and `server` (`wal_seg_size` inside it) fields.
602 585 : pub fn new(state: CTRL, wal_store: WAL, node_id: NodeId) -> Result<SafeKeeper<CTRL, WAL>> {
603 585 : if state.tenant_id == TenantId::from([0u8; 16])
604 585 : || state.timeline_id == TimelineId::from([0u8; 16])
605 : {
606 UBC 0 : bail!(
607 0 : "Calling SafeKeeper::new with empty tenant_id ({}) or timeline_id ({})",
608 0 : state.tenant_id,
609 0 : state.timeline_id
610 0 : );
611 CBC 585 : }
612 585 :
613 585 : Ok(SafeKeeper {
614 585 : epoch_start_lsn: Lsn(0),
615 585 : inmem: SafekeeperMemState {
616 585 : commit_lsn: state.commit_lsn,
617 585 : backup_lsn: state.backup_lsn,
618 585 : peer_horizon_lsn: state.peer_horizon_lsn,
619 585 : proposer_uuid: state.proposer_uuid,
620 585 : },
621 585 : state,
622 585 : wal_store,
623 585 : node_id,
624 585 : })
625 585 : }
626 :
627 : /// Get history of term switches for the available WAL
628 1745 : fn get_term_history(&self) -> TermHistory {
629 1745 : self.state
630 1745 : .acceptor_state
631 1745 : .term_history
632 1745 : .up_to(self.flush_lsn())
633 1745 : }
634 :
635 : /// Get current term.
636 3274526 : pub fn get_term(&self) -> Term {
637 3274526 : self.state.acceptor_state.term
638 3274526 : }
639 :
640 21342 : pub fn get_epoch(&self) -> Term {
641 21342 : self.state.acceptor_state.get_epoch(self.flush_lsn())
642 21342 : }
643 :
644 : /// wal_store wrapper avoiding commit_lsn <= flush_lsn violation when we don't have WAL yet.
645 6587367 : pub fn flush_lsn(&self) -> Lsn {
646 6587367 : max(self.wal_store.flush_lsn(), self.state.timeline_start_lsn)
647 6587367 : }
648 :
649 : /// Process message from proposer and possibly form reply. Concurrent
650 : /// callers must exclude each other.
651 3274399 : pub async fn process_msg(
652 3274399 : &mut self,
653 3274399 : msg: &ProposerAcceptorMessage,
654 3274399 : ) -> Result<Option<AcceptorProposerMessage>> {
655 3274399 : match msg {
656 1745 : ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg).await,
657 5065 : ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg).await,
658 1491227 : ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg).await,
659 5 : ProposerAcceptorMessage::AppendRequest(msg) => {
660 21 : self.handle_append_request(msg, true).await
661 : }
662 1975841 : ProposerAcceptorMessage::NoFlushAppendRequest(msg) => {
663 2487787 : self.handle_append_request(msg, false).await
664 : }
665 1294228 : ProposerAcceptorMessage::FlushWAL => self.handle_flush().await,
666 : }
667 3274398 : }
668 :
669 : /// Handle initial message from proposer: check its sanity and send my
670 : /// current term.
671 1745 : async fn handle_greeting(
672 1745 : &mut self,
673 1745 : msg: &ProposerGreeting,
674 1745 : ) -> Result<Option<AcceptorProposerMessage>> {
675 1745 : // Check protocol compatibility
676 1745 : if msg.protocol_version != SK_PROTOCOL_VERSION {
677 UBC 0 : bail!(
678 0 : "incompatible protocol version {}, expected {}",
679 0 : msg.protocol_version,
680 0 : SK_PROTOCOL_VERSION
681 0 : );
682 CBC 1745 : }
683 1745 : /* Postgres major version mismatch is treated as fatal error
684 1745 : * because safekeepers parse WAL headers and the format
685 1745 : * may change between versions.
686 1745 : */
687 1745 : if msg.pg_version / 10000 != self.state.server.pg_version / 10000
688 UBC 0 : && self.state.server.pg_version != UNKNOWN_SERVER_VERSION
689 : {
690 0 : bail!(
691 0 : "incompatible server version {}, expected {}",
692 0 : msg.pg_version,
693 0 : self.state.server.pg_version
694 0 : );
695 CBC 1745 : }
696 1745 :
697 1745 : if msg.tenant_id != self.state.tenant_id {
698 UBC 0 : bail!(
699 0 : "invalid tenant ID, got {}, expected {}",
700 0 : msg.tenant_id,
701 0 : self.state.tenant_id
702 0 : );
703 CBC 1745 : }
704 1745 : if msg.timeline_id != self.state.timeline_id {
705 UBC 0 : bail!(
706 0 : "invalid timeline ID, got {}, expected {}",
707 0 : msg.timeline_id,
708 0 : self.state.timeline_id
709 0 : );
710 CBC 1745 : }
711 1745 : if self.state.server.wal_seg_size != msg.wal_seg_size {
712 UBC 0 : bail!(
713 0 : "invalid wal_seg_size, got {}, expected {}",
714 0 : msg.wal_seg_size,
715 0 : self.state.server.wal_seg_size
716 0 : );
717 CBC 1745 : }
718 1745 :
719 1745 : // system_id will be updated on mismatch
720 1745 : // sync-safekeepers doesn't know sysid and sends 0, ignore it
721 1745 : if self.state.server.system_id != msg.system_id && msg.system_id != 0 {
722 443 : if self.state.server.system_id != 0 {
723 UBC 0 : warn!(
724 0 : "unexpected system ID arrived, got {}, expected {}",
725 0 : msg.system_id, self.state.server.system_id
726 0 : );
727 CBC 443 : }
728 :
729 443 : let mut state = self.state.clone();
730 443 : state.server.system_id = msg.system_id;
731 443 : if msg.pg_version != UNKNOWN_SERVER_VERSION {
732 443 : state.server.pg_version = msg.pg_version;
733 443 : }
734 1326 : self.state.persist(&state).await?;
735 1302 : }
736 :
737 1745 : info!(
738 1745 : "processed greeting from walproposer {}, sending term {:?}",
739 27920 : msg.proposer_id.map(|b| format!("{:X}", b)).join(""),
740 1745 : self.state.acceptor_state.term
741 1745 : );
742 1745 : Ok(Some(AcceptorProposerMessage::Greeting(AcceptorGreeting {
743 1745 : term: self.state.acceptor_state.term,
744 1745 : node_id: self.node_id,
745 1745 : })))
746 1745 : }
747 :
748 : /// Give vote for the given term, if we haven't done that previously.
749 1745 : async fn handle_vote_request(
750 1745 : &mut self,
751 1745 : msg: &VoteRequest,
752 1745 : ) -> Result<Option<AcceptorProposerMessage>> {
753 1745 : // Once voted, we won't accept data from older proposers; flush
754 1745 : // everything we've already received so that new proposer starts
755 1745 : // streaming at end of our WAL, without overlap. Currently we truncate
756 1745 : // WAL at streaming point, so this avoids truncating already committed
757 1745 : // WAL.
758 1745 : //
759 1745 : // TODO: it would be smoother to not truncate committed piece at
760 1745 : // handle_elected instead. Currently not a big deal, as proposer is the
761 1745 : // only source of WAL; with peer2peer recovery it would be more
762 1745 : // important.
763 1745 : self.wal_store.flush_wal().await?;
764 : // initialize with refusal
765 1745 : let mut resp = VoteResponse {
766 1745 : term: self.state.acceptor_state.term,
767 1745 : vote_given: false as u64,
768 1745 : flush_lsn: self.flush_lsn(),
769 1745 : truncate_lsn: self.inmem.peer_horizon_lsn,
770 1745 : term_history: self.get_term_history(),
771 1745 : timeline_start_lsn: self.state.timeline_start_lsn,
772 1745 : };
773 1745 : if self.state.acceptor_state.term < msg.term {
774 1691 : let mut state = self.state.clone();
775 1691 : state.acceptor_state.term = msg.term;
776 1691 : // persist vote before sending it out
777 5065 : self.state.persist(&state).await?;
778 :
779 1691 : resp.term = self.state.acceptor_state.term;
780 1691 : resp.vote_given = true as u64;
781 54 : }
782 1743 : info!("processed VoteRequest for term {}: {:?}", msg.term, &resp);
783 1745 : Ok(Some(AcceptorProposerMessage::VoteResponse(resp)))
784 1745 : }
785 :
786 : /// Form AppendResponse from current state.
787 1294233 : fn append_response(&self) -> AppendResponse {
788 1294233 : let ar = AppendResponse {
789 1294233 : term: self.state.acceptor_state.term,
790 1294233 : flush_lsn: self.flush_lsn(),
791 1294233 : commit_lsn: self.state.commit_lsn,
792 1294233 : // will be filled by the upper code to avoid bothering safekeeper
793 1294233 : hs_feedback: HotStandbyFeedback::empty(),
794 1294233 : pageserver_feedback: PageserverFeedback::empty(),
795 1294233 : };
796 1294233 : trace!("formed AppendResponse {:?}", ar);
797 1294233 : ar
798 1294233 : }
799 :
800 835 : async fn handle_elected(
801 835 : &mut self,
802 835 : msg: &ProposerElected,
803 835 : ) -> Result<Option<AcceptorProposerMessage>> {
804 834 : info!("received ProposerElected {:?}", msg);
805 835 : if self.state.acceptor_state.term < msg.term {
806 4 : let mut state = self.state.clone();
807 4 : state.acceptor_state.term = msg.term;
808 9 : self.state.persist(&state).await?;
809 831 : }
810 :
811 : // If our term is higher, ignore the message (next feedback will inform the compute)
812 835 : if self.state.acceptor_state.term > msg.term {
813 UBC 0 : return Ok(None);
814 CBC 835 : }
815 835 :
816 835 : // This might happen in a rare race when another (old) connection from
817 835 : // the same walproposer writes + flushes WAL after this connection
818 835 : // already sent flush_lsn in VoteRequest. It is generally safe to
819 835 : // proceed, but to prevent commit_lsn surprisingly going down we should
820 835 : // either refuse the session (simpler) or skip the part we already have
821 835 : // from the stream (can be implemented).
822 835 : if msg.term == self.get_epoch() && self.flush_lsn() > msg.start_streaming_at {
823 UBC 0 : bail!("refusing ProposerElected which is going to overwrite correct WAL: term={}, flush_lsn={}, start_streaming_at={}; restarting the handshake should help",
824 0 : msg.term, self.flush_lsn(), msg.start_streaming_at)
825 CBC 835 : }
826 835 : // Otherwise we must never attempt to truncate committed data.
827 835 : assert!(
828 835 : msg.start_streaming_at >= self.inmem.commit_lsn,
829 UBC 0 : "attempt to truncate committed data: start_streaming_at={}, commit_lsn={}",
830 : msg.start_streaming_at,
831 : self.inmem.commit_lsn
832 : );
833 :
834 : // TODO: cross check divergence point, check if msg.start_streaming_at corresponds to
835 : // intersection of our history and history from msg
836 :
837 : // truncate wal, update the LSNs
838 CBC 1488771 : self.wal_store.truncate_wal(msg.start_streaming_at).await?;
839 :
840 : // and now adopt term history from proposer
841 : {
842 835 : let mut state = self.state.clone();
843 835 :
844 835 : // Here we learn initial LSN for the first time, set fields
845 835 : // interested in that.
846 835 :
847 835 : if state.timeline_start_lsn == Lsn(0) {
848 : // Remember point where WAL begins globally.
849 448 : state.timeline_start_lsn = msg.timeline_start_lsn;
850 447 : info!(
851 447 : "setting timeline_start_lsn to {:?}",
852 447 : state.timeline_start_lsn
853 447 : );
854 387 : }
855 835 : if state.local_start_lsn == Lsn(0) {
856 445 : state.local_start_lsn = msg.start_streaming_at;
857 444 : info!("setting local_start_lsn to {:?}", state.local_start_lsn);
858 390 : }
859 : // Initializing commit_lsn before acking first flushed record is
860 : // important to let find_end_of_wal skip the hole in the beginning
861 : // of the first segment.
862 : //
863 : // NB: on new clusters, this happens at the same time as
864 : // timeline_start_lsn initialization, it is taken outside to provide
865 : // upgrade.
866 835 : self.inmem.commit_lsn = max(self.inmem.commit_lsn, state.timeline_start_lsn);
867 835 :
868 835 : // Initializing backup_lsn is useful to avoid making backup think it should upload 0 segment.
869 835 : self.inmem.backup_lsn = max(self.inmem.backup_lsn, state.timeline_start_lsn);
870 835 :
871 835 : state.acceptor_state.term_history = msg.term_history.clone();
872 2447 : self.persist_control_file(state).await?;
873 : }
874 :
875 834 : info!("start receiving WAL since {:?}", msg.start_streaming_at);
876 :
877 : // Cache LSN where term starts to immediately fsync control file with
878 : // commit_lsn once we reach it -- sync-safekeepers finishes when
879 : // persisted commit_lsn on majority of safekeepers aligns.
880 835 : self.epoch_start_lsn = match msg.term_history.0.last() {
881 UBC 0 : None => bail!("proposer elected with empty term history"),
882 CBC 835 : Some(term_lsn_start) => term_lsn_start.lsn,
883 835 : };
884 835 :
885 835 : Ok(None)
886 835 : }
887 :
888 : /// Advance commit_lsn taking into account what we have locally.
889 : ///
890 : /// Note: it is assumed that 'WAL we have is from the right term' check has
891 : /// already been done outside.
892 1985059 : async fn update_commit_lsn(&mut self, mut candidate: Lsn) -> Result<()> {
893 1985059 : // Both peers and walproposer communicate this value, we might already
894 1985059 : // have a fresher (higher) version.
895 1985059 : candidate = max(candidate, self.inmem.commit_lsn);
896 1985059 : let commit_lsn = min(candidate, self.flush_lsn());
897 1985059 : assert!(
898 1985059 : commit_lsn >= self.inmem.commit_lsn,
899 UBC 0 : "commit_lsn monotonicity violated: old={} new={}",
900 : self.inmem.commit_lsn,
901 : commit_lsn
902 : );
903 :
904 CBC 1985059 : self.inmem.commit_lsn = commit_lsn;
905 1985059 :
906 1985059 : // If new commit_lsn reached epoch switch, force sync of control
907 1985059 : // file: walproposer in sync mode is very interested when this
908 1985059 : // happens. Note: this is for sync-safekeepers mode only, as
909 1985059 : // otherwise commit_lsn might jump over epoch_start_lsn.
910 1985059 : if commit_lsn >= self.epoch_start_lsn && self.state.commit_lsn < self.epoch_start_lsn {
911 88 : self.persist_control_file(self.state.clone()).await?;
912 1985029 : }
913 :
914 1985059 : Ok(())
915 1985059 : }
916 :
917 : /// Persist in-memory state of control file to disk.
918 : //
919 : // TODO: passing inmem_remote_consistent_lsn everywhere is ugly, better
920 : // separate state completely and give Arc to all those who need it.
921 920 : pub async fn persist_inmem(&mut self, inmem_remote_consistent_lsn: Lsn) -> Result<()> {
922 920 : let mut state = self.state.clone();
923 920 : state.remote_consistent_lsn = inmem_remote_consistent_lsn;
924 2851 : self.persist_control_file(state).await
925 920 : }
926 :
927 : /// Persist in-memory state to the disk, taking other data from state.
928 2964 : async fn persist_control_file(&mut self, mut state: SafeKeeperState) -> Result<()> {
929 2964 : state.commit_lsn = self.inmem.commit_lsn;
930 2964 : state.backup_lsn = self.inmem.backup_lsn;
931 2964 : state.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
932 2964 : state.proposer_uuid = self.inmem.proposer_uuid;
933 8840 : self.state.persist(&state).await
934 2964 : }
935 :
936 : /// Persist control file if there is something to save and enough time
937 : /// passed after the last save.
938 1846 : pub async fn maybe_persist_inmem_control_file(
939 1846 : &mut self,
940 1846 : inmem_remote_consistent_lsn: Lsn,
941 1846 : ) -> Result<()> {
942 1846 : const CF_SAVE_INTERVAL: Duration = Duration::from_secs(300);
943 1846 : if self.state.last_persist_at().elapsed() < CF_SAVE_INTERVAL {
944 1842 : return Ok(());
945 4 : }
946 4 : let need_persist = self.inmem.commit_lsn > self.state.commit_lsn
947 2 : || self.inmem.backup_lsn > self.state.backup_lsn
948 2 : || self.inmem.peer_horizon_lsn > self.state.peer_horizon_lsn
949 2 : || inmem_remote_consistent_lsn > self.state.remote_consistent_lsn;
950 4 : if need_persist {
951 6 : self.persist_inmem(inmem_remote_consistent_lsn).await?;
952 UBC 0 : trace!("saved control file: {CF_SAVE_INTERVAL:?} passed");
953 CBC 2 : }
954 4 : Ok(())
955 1846 : }
956 :
957 : /// Handle request to append WAL.
958 : #[allow(clippy::comparison_chain)]
959 1975846 : async fn handle_append_request(
960 1975846 : &mut self,
961 1975846 : msg: &AppendRequest,
962 1975846 : require_flush: bool,
963 1975846 : ) -> Result<Option<AcceptorProposerMessage>> {
964 1975846 : if self.state.acceptor_state.term < msg.h.term {
965 UBC 0 : bail!("got AppendRequest before ProposerElected");
966 CBC 1975846 : }
967 1975846 :
968 1975846 : // If our term is higher, immediately refuse the message.
969 1975846 : if self.state.acceptor_state.term > msg.h.term {
970 1 : let resp = AppendResponse::term_only(self.state.acceptor_state.term);
971 1 : return Ok(Some(AcceptorProposerMessage::AppendResponse(resp)));
972 1975845 : }
973 1975845 :
974 1975845 : // Now we know that we are in the same term as the proposer,
975 1975845 : // processing the message.
976 1975845 :
977 1975845 : self.inmem.proposer_uuid = msg.h.proposer_uuid;
978 1975845 :
979 1975845 : // do the job
980 1975845 : if !msg.wal_data.is_empty() {
981 1216621 : self.wal_store
982 1216621 : .write_wal(msg.h.begin_lsn, &msg.wal_data)
983 2485345 : .await?;
984 759224 : }
985 :
986 : // flush wal to the disk, if required
987 1975844 : if require_flush {
988 5 : self.wal_store.flush_wal().await?;
989 1975839 : }
990 :
991 : // Update commit_lsn.
992 1975844 : if msg.h.commit_lsn != Lsn(0) {
993 1973273 : self.update_commit_lsn(msg.h.commit_lsn).await?;
994 2571 : }
995 : // Value calculated by walproposer can always lag:
996 : // - safekeepers can forget inmem value and send to proposer lower
997 : // persisted one on restart;
998 : // - if we make safekeepers always send persistent value,
999 : // any compute restart would pull it down.
1000 : // Thus, take max before adopting.
1001 1975844 : self.inmem.peer_horizon_lsn = max(self.inmem.peer_horizon_lsn, msg.h.truncate_lsn);
1002 1975844 :
1003 1975844 : // Update truncate and commit LSN in control file.
1004 1975844 : // To avoid negative impact on performance of extra fsync, do it only
1005 1975844 : // when truncate_lsn delta exceeds WAL segment size.
1006 1975844 : if self.state.peer_horizon_lsn + (self.state.server.wal_seg_size as u64)
1007 1975844 : < self.inmem.peer_horizon_lsn
1008 : {
1009 2387 : self.persist_control_file(self.state.clone()).await?;
1010 1975021 : }
1011 :
1012 UBC 0 : trace!(
1013 0 : "processed AppendRequest of len {}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, flushed={:?}",
1014 0 : msg.wal_data.len(),
1015 0 : msg.h.end_lsn,
1016 0 : msg.h.commit_lsn,
1017 0 : msg.h.truncate_lsn,
1018 0 : require_flush,
1019 0 : );
1020 :
1021 : // If flush_lsn hasn't updated, AppendResponse is not very useful.
1022 CBC 1975844 : if !require_flush {
1023 1975839 : return Ok(None);
1024 5 : }
1025 5 :
1026 5 : let resp = self.append_response();
1027 5 : Ok(Some(AcceptorProposerMessage::AppendResponse(resp)))
1028 1975845 : }
1029 :
1030 : /// Flush WAL to disk. Return AppendResponse with latest LSNs.
1031 1294228 : async fn handle_flush(&mut self) -> Result<Option<AcceptorProposerMessage>> {
1032 1294228 : self.wal_store.flush_wal().await?;
1033 1294228 : Ok(Some(AcceptorProposerMessage::AppendResponse(
1034 1294228 : self.append_response(),
1035 1294228 : )))
1036 1294228 : }
1037 :
1038 : /// Update timeline state with peer safekeeper data.
1039 11960 : pub async fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> {
1040 11960 : let mut sync_control_file = false;
1041 11960 :
1042 11960 : if (Lsn(sk_info.commit_lsn) != Lsn::INVALID) && (sk_info.last_log_term != INVALID_TERM) {
1043 : // Note: the check is too restrictive, generally we can update local
1044 : // commit_lsn if our history matches (is part of) history of advanced
1045 : // commit_lsn provider.
1046 11841 : if sk_info.last_log_term == self.get_epoch() {
1047 11786 : self.update_commit_lsn(Lsn(sk_info.commit_lsn)).await?;
1048 55 : }
1049 119 : }
1050 :
1051 11960 : let new_backup_lsn = max(Lsn(sk_info.backup_lsn), self.inmem.backup_lsn);
1052 11960 : sync_control_file |=
1053 11960 : self.state.backup_lsn + (self.state.server.wal_seg_size as u64) < new_backup_lsn;
1054 11960 : self.inmem.backup_lsn = new_backup_lsn;
1055 11960 :
1056 11960 : // value in sk_info should be maximized over our local in memory value.
1057 11960 : let new_remote_consistent_lsn = Lsn(sk_info.remote_consistent_lsn);
1058 11960 : assert!(self.state.remote_consistent_lsn <= new_remote_consistent_lsn);
1059 11960 : sync_control_file |= self.state.remote_consistent_lsn
1060 11960 : + (self.state.server.wal_seg_size as u64)
1061 11960 : < new_remote_consistent_lsn;
1062 11960 :
1063 11960 : let new_peer_horizon_lsn = max(Lsn(sk_info.peer_horizon_lsn), self.inmem.peer_horizon_lsn);
1064 11960 : sync_control_file |= self.state.peer_horizon_lsn + (self.state.server.wal_seg_size as u64)
1065 11960 : < new_peer_horizon_lsn;
1066 11960 : self.inmem.peer_horizon_lsn = new_peer_horizon_lsn;
1067 11960 :
1068 11960 : if sync_control_file {
1069 356 : let mut state = self.state.clone();
1070 356 : state.remote_consistent_lsn = new_remote_consistent_lsn;
1071 1067 : self.persist_control_file(state).await?;
1072 11604 : }
1073 11960 : Ok(())
1074 11960 : }
1075 :
1076 : /// Get oldest segno we still need to keep. We hold WAL till it is consumed
1077 : /// by all of 1) pageserver (remote_consistent_lsn) 2) peers 3) s3
1078 : /// offloading.
1079 : /// While it is safe to use inmem values for determining horizon,
1080 : /// we use persistent to make possible normal states less surprising.
1081 1846 : pub fn get_horizon_segno(&self, wal_backup_enabled: bool) -> XLogSegNo {
1082 1846 : let mut horizon_lsn = min(
1083 1846 : self.state.remote_consistent_lsn,
1084 1846 : self.state.peer_horizon_lsn,
1085 1846 : );
1086 1846 : if wal_backup_enabled {
1087 1846 : horizon_lsn = min(horizon_lsn, self.state.backup_lsn);
1088 1846 : }
1089 1846 : horizon_lsn.segment_number(self.state.server.wal_seg_size as usize)
1090 1846 : }
1091 : }
1092 :
1093 : #[cfg(test)]
1094 : mod tests {
1095 : use futures::future::BoxFuture;
1096 : use postgres_ffi::WAL_SEGMENT_SIZE;
1097 :
1098 : use super::*;
1099 : use crate::wal_storage::Storage;
1100 : use std::{ops::Deref, str::FromStr, time::Instant};
1101 :
1102 : // fake storage for tests
1103 : struct InMemoryState {
1104 : persisted_state: SafeKeeperState,
1105 : }
1106 :
1107 : #[async_trait::async_trait]
1108 : impl control_file::Storage for InMemoryState {
1109 3 : async fn persist(&mut self, s: &SafeKeeperState) -> Result<()> {
1110 3 : self.persisted_state = s.clone();
1111 3 : Ok(())
1112 3 : }
1113 :
1114 UBC 0 : fn last_persist_at(&self) -> Instant {
1115 0 : Instant::now()
1116 0 : }
1117 : }
1118 :
1119 : impl Deref for InMemoryState {
1120 : type Target = SafeKeeperState;
1121 :
1122 CBC 56 : fn deref(&self) -> &Self::Target {
1123 56 : &self.persisted_state
1124 56 : }
1125 : }
1126 :
1127 2 : fn test_sk_state() -> SafeKeeperState {
1128 2 : let mut state = SafeKeeperState::empty();
1129 2 : state.server.wal_seg_size = WAL_SEGMENT_SIZE as u32;
1130 2 : state.tenant_id = TenantId::from([1u8; 16]);
1131 2 : state.timeline_id = TimelineId::from([1u8; 16]);
1132 2 : state
1133 2 : }
1134 :
1135 : struct DummyWalStore {
1136 : lsn: Lsn,
1137 : }
1138 :
1139 : #[async_trait::async_trait]
1140 : impl wal_storage::Storage for DummyWalStore {
1141 9 : fn flush_lsn(&self) -> Lsn {
1142 9 : self.lsn
1143 9 : }
1144 :
1145 2 : async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
1146 2 : self.lsn = startpos + buf.len() as u64;
1147 2 : Ok(())
1148 2 : }
1149 :
1150 2 : async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
1151 2 : self.lsn = end_pos;
1152 2 : Ok(())
1153 2 : }
1154 :
1155 4 : async fn flush_wal(&mut self) -> Result<()> {
1156 4 : Ok(())
1157 4 : }
1158 :
1159 UBC 0 : fn remove_up_to(&self, _segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> {
1160 0 : Box::pin(async { Ok(()) })
1161 0 : }
1162 :
1163 0 : fn get_metrics(&self) -> crate::metrics::WalStorageMetrics {
1164 0 : crate::metrics::WalStorageMetrics::default()
1165 0 : }
1166 : }
1167 :
1168 CBC 1 : #[tokio::test]
1169 1 : async fn test_voting() {
1170 1 : let storage = InMemoryState {
1171 1 : persisted_state: test_sk_state(),
1172 1 : };
1173 1 : let wal_store = DummyWalStore { lsn: Lsn(0) };
1174 1 : let mut sk = SafeKeeper::new(storage, wal_store, NodeId(0)).unwrap();
1175 1 :
1176 1 : // check voting for 1 is ok
1177 1 : let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 });
1178 1 : let mut vote_resp = sk.process_msg(&vote_request).await;
1179 1 : match vote_resp.unwrap() {
1180 1 : Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given != 0),
1181 UBC 0 : r => panic!("unexpected response: {:?}", r),
1182 : }
1183 :
1184 : // reboot...
1185 CBC 1 : let state = sk.state.persisted_state.clone();
1186 1 : let storage = InMemoryState {
1187 1 : persisted_state: state,
1188 1 : };
1189 1 :
1190 1 : sk = SafeKeeper::new(storage, sk.wal_store, NodeId(0)).unwrap();
1191 :
1192 : // and ensure voting second time for 1 is not ok
1193 1 : vote_resp = sk.process_msg(&vote_request).await;
1194 1 : match vote_resp.unwrap() {
1195 1 : Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given == 0),
1196 UBC 0 : r => panic!("unexpected response: {:?}", r),
1197 : }
1198 : }
1199 :
1200 CBC 1 : #[tokio::test]
1201 1 : async fn test_epoch_switch() {
1202 1 : let storage = InMemoryState {
1203 1 : persisted_state: test_sk_state(),
1204 1 : };
1205 1 : let wal_store = DummyWalStore { lsn: Lsn(0) };
1206 1 :
1207 1 : let mut sk = SafeKeeper::new(storage, wal_store, NodeId(0)).unwrap();
1208 1 :
1209 1 : let mut ar_hdr = AppendRequestHeader {
1210 1 : term: 1,
1211 1 : epoch_start_lsn: Lsn(3),
1212 1 : begin_lsn: Lsn(1),
1213 1 : end_lsn: Lsn(2),
1214 1 : commit_lsn: Lsn(0),
1215 1 : truncate_lsn: Lsn(0),
1216 1 : proposer_uuid: [0; 16],
1217 1 : };
1218 1 : let mut append_request = AppendRequest {
1219 1 : h: ar_hdr.clone(),
1220 1 : wal_data: Bytes::from_static(b"b"),
1221 1 : };
1222 1 :
1223 1 : let pem = ProposerElected {
1224 1 : term: 1,
1225 1 : start_streaming_at: Lsn(1),
1226 1 : term_history: TermHistory(vec![TermLsn {
1227 1 : term: 1,
1228 1 : lsn: Lsn(3),
1229 1 : }]),
1230 1 : timeline_start_lsn: Lsn(0),
1231 1 : };
1232 1 : sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
1233 UBC 0 : .await
1234 CBC 1 : .unwrap();
1235 :
1236 : // check that AppendRequest before epochStartLsn doesn't switch epoch
1237 1 : let resp = sk
1238 1 : .process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
1239 UBC 0 : .await;
1240 CBC 1 : assert!(resp.is_ok());
1241 1 : assert_eq!(sk.get_epoch(), 0);
1242 :
1243 : // but record at epochStartLsn does the switch
1244 1 : ar_hdr.begin_lsn = Lsn(2);
1245 1 : ar_hdr.end_lsn = Lsn(3);
1246 1 : append_request = AppendRequest {
1247 1 : h: ar_hdr,
1248 1 : wal_data: Bytes::from_static(b"b"),
1249 1 : };
1250 1 : let resp = sk
1251 1 : .process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
1252 UBC 0 : .await;
1253 CBC 1 : assert!(resp.is_ok());
1254 1 : sk.wal_store.truncate_wal(Lsn(3)).await.unwrap(); // imitate the complete record at 3 %)
1255 1 : assert_eq!(sk.get_epoch(), 1);
1256 : }
1257 :
// Histories that share no term have no common point.
#[test]
fn test_find_highest_common_point_none() {
    let prop_th = TermHistory(vec![TermLsn::from((0, Lsn(1)))]);
    let sk_th = TermHistory(vec![
        TermLsn::from((1, Lsn(1))),
        TermLsn::from((2, Lsn(2))),
    ]);

    let common = TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(3));
    assert_eq!(common, None);
}
1267 :
// Histories diverge after term 2; the safekeeper's next term starts at 30,
// which is where its copy of term 2 ends.
#[test]
fn test_find_highest_common_point_middle() {
    let prop_th = TermHistory(vec![
        TermLsn::from((1, Lsn(10))),
        TermLsn::from((2, Lsn(20))),
        TermLsn::from((4, Lsn(40))),
    ]);
    let sk_th = TermHistory(vec![
        TermLsn::from((1, Lsn(10))),
        TermLsn::from((2, Lsn(20))),
        TermLsn::from((3, Lsn(30))), // sk ends last common term 2 at 30
    ]);

    let common = TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(40));
    assert_eq!(common, Some(TermLsn::from((2, Lsn(30)))));
}
1288 :
// The safekeeper's history stops at the last common term, so that term ends
// at the safekeeper's end LSN.
#[test]
fn test_find_highest_common_point_sk_end() {
    let prop_th = TermHistory(vec![
        TermLsn::from((1, Lsn(10))),
        TermLsn::from((2, Lsn(20))), // last common term 2, sk will end it at 32 sk_end_lsn
        TermLsn::from((4, Lsn(40))),
    ]);
    let sk_th = TermHistory(vec![
        TermLsn::from((1, Lsn(10))),
        TermLsn::from((2, Lsn(20))),
    ]);

    let common = TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(32));
    assert_eq!(common, Some(TermLsn::from((2, Lsn(32)))));
}
1305 :
// Identical histories: the common point is the last term at the supplied
// end LSN.
#[test]
fn test_find_highest_common_point_walprop() {
    let shared_entries = || -> Vec<TermLsn> {
        vec![TermLsn::from((1, Lsn(10))), TermLsn::from((2, Lsn(20)))]
    };
    let prop_th = TermHistory(shared_entries());
    let sk_th = TermHistory(shared_entries());

    let common = TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(32));
    assert_eq!(common, Some(TermLsn::from((2, Lsn(32)))));
}
1318 :
// Pins the exact serialized byte layout of SafeKeeperState and checks that
// deserializing those bytes yields the original value.
#[test]
fn test_sk_state_bincode_serde_roundtrip() {
    use utils::Hex;

    let tenant_id = TenantId::from_str("cf0480929707ee75372337efaa5ecf96").unwrap();
    let timeline_id = TimelineId::from_str("112ded66422aa5e953e5440fa5427ac4").unwrap();

    // proposer_uuid is the timeline id with its bytes reversed.
    let mut reversed_timeline_bytes = timeline_id.as_arr();
    reversed_timeline_bytes.reverse();

    let state = SafeKeeperState {
        tenant_id,
        timeline_id,
        acceptor_state: AcceptorState {
            term: 42,
            term_history: TermHistory(vec![TermLsn {
                term: 41,
                lsn: Lsn(0x1),
            }]),
        },
        server: ServerInfo {
            pg_version: 14,
            system_id: 0x1234567887654321,
            wal_seg_size: 0x12345678,
        },
        proposer_uuid: reversed_timeline_bytes,
        timeline_start_lsn: Lsn(0x12345600),
        local_start_lsn: Lsn(0x12),
        commit_lsn: Lsn(1234567800),
        backup_lsn: Lsn(1234567300),
        peer_horizon_lsn: Lsn(9999999),
        remote_consistent_lsn: Lsn(1234560000),
        peers: PersistedPeers(vec![(
            NodeId(1),
            PersistedPeerInfo {
                backup_lsn: Lsn(1234567000),
                term: 42,
                flush_lsn: Lsn(1234567800 - 8),
                commit_lsn: Lsn(1234567600),
            },
        )]),
    };

    let encoded = state.ser().unwrap();

    #[rustfmt::skip]
    let expected = [
        // tenant_id as length prefixed hex
        0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x63, 0x66, 0x30, 0x34, 0x38, 0x30, 0x39, 0x32, 0x39, 0x37, 0x30, 0x37, 0x65, 0x65, 0x37, 0x35, 0x33, 0x37, 0x32, 0x33, 0x33, 0x37, 0x65, 0x66, 0x61, 0x61, 0x35, 0x65, 0x63, 0x66, 0x39, 0x36,
        // timeline_id as length prefixed hex
        0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x31, 0x31, 0x32, 0x64, 0x65, 0x64, 0x36, 0x36, 0x34, 0x32, 0x32, 0x61, 0x61, 0x35, 0x65, 0x39, 0x35, 0x33, 0x65, 0x35, 0x34, 0x34, 0x30, 0x66, 0x61, 0x35, 0x34, 0x32, 0x37, 0x61, 0x63, 0x34,
        // term
        0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        // length prefix of the term history
        0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        // term history entry: term (41) comes before lsn (0x1) because
        // TermLsn declares `term` first
        0x29, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        // pg_version
        0x0e, 0x00, 0x00, 0x00,
        // systemid
        0x21, 0x43, 0x65, 0x87, 0x78, 0x56, 0x34, 0x12,
        // wal_seg_size
        0x78, 0x56, 0x34, 0x12,
        // pguuid as length prefixed hex
        0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x63, 0x34, 0x37, 0x61, 0x34, 0x32, 0x61, 0x35, 0x30, 0x66, 0x34, 0x34, 0x65, 0x35, 0x35, 0x33, 0x65, 0x39, 0x61, 0x35, 0x32, 0x61, 0x34, 0x32, 0x36, 0x36, 0x65, 0x64, 0x32, 0x64, 0x31, 0x31,

        // timeline_start_lsn
        0x00, 0x56, 0x34, 0x12, 0x00, 0x00, 0x00, 0x00,
        // local_start_lsn
        0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        // commit_lsn
        0x78, 0x02, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
        // backup_lsn
        0x84, 0x00, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
        // peer_horizon_lsn
        0x7f, 0x96, 0x98, 0x00, 0x00, 0x00, 0x00, 0x00,
        // remote_consistent_lsn
        0x00, 0xe4, 0x95, 0x49, 0x00, 0x00, 0x00, 0x00,
        // length prefix for persistentpeers
        0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        // nodeid
        0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        // peer: backup_lsn, term, flush_lsn, commit_lsn
        0x58, 0xff, 0x95, 0x49, 0x00, 0x00, 0x00, 0x00,
        0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x70, 0x02, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
        0xb0, 0x01, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
    ];

    assert_eq!(Hex(&encoded), Hex(&expected));

    let decoded = SafeKeeperState::des(&encoded).unwrap();

    assert_eq!(decoded, state);
}
1412 : }
|