TLA | Line data | Source code
1 : //! Acceptor part of proposer-acceptor consensus algorithm.
2 :
3 : use anyhow::{bail, Context, Result};
4 : use byteorder::{LittleEndian, ReadBytesExt};
5 : use bytes::{Buf, BufMut, Bytes, BytesMut};
6 :
7 : use postgres_ffi::{TimeLineID, XLogSegNo, MAX_SEND_SIZE};
8 : use serde::{Deserialize, Serialize};
9 : use std::cmp::max;
10 : use std::cmp::min;
11 : use std::fmt;
12 : use std::io::Read;
13 : use std::time::Duration;
14 : use storage_broker::proto::SafekeeperTimelineInfo;
15 :
16 : use tracing::*;
17 :
18 : use crate::control_file;
19 : use crate::send_wal::HotStandbyFeedback;
20 :
21 : use crate::wal_storage;
22 : use pq_proto::SystemId;
23 : use utils::pageserver_feedback::PageserverFeedback;
24 : use utils::{
25 : bin_ser::LeSer,
26 : id::{NodeId, TenantId, TenantTimelineId, TimelineId},
27 : lsn::Lsn,
28 : };
29 :
30 : pub const SK_MAGIC: u32 = 0xcafeceefu32;
31 : pub const SK_FORMAT_VERSION: u32 = 7;
32 : const SK_PROTOCOL_VERSION: u32 = 2;
33 : pub const UNKNOWN_SERVER_VERSION: u32 = 0;
34 :
35 : /// Consensus logical timestamp.
36 : pub type Term = u64;
37 : pub const INVALID_TERM: Term = 0;
38 :
39 CBC 16470 : #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
40 : pub struct TermLsn {
41 : pub term: Term,
42 : pub lsn: Lsn,
43 : }
44 :
45 : // Creation from tuple provides less typing (e.g. for unit tests).
46 : impl From<(Term, Lsn)> for TermLsn {
47 5378031 : fn from(pair: (Term, Lsn)) -> TermLsn {
48 5378031 : TermLsn {
49 5378031 : term: pair.0,
50 5378031 : lsn: pair.1,
51 5378031 : }
52 5378031 : }
53 : }
54 :
55 14592 : #[derive(Clone, Serialize, Deserialize)]
56 : pub struct TermHistory(pub Vec<TermLsn>);
57 :
58 : impl TermHistory {
59 485 : pub fn empty() -> TermHistory {
60 485 : TermHistory(Vec::new())
61 485 : }
62 :
63 : // Parse TermHistory as n_entries followed by TermLsn pairs
64 1016 : pub fn from_bytes(bytes: &mut Bytes) -> Result<TermHistory> {
65 1016 : if bytes.remaining() < 4 {
66 UBC 0 : bail!("TermHistory misses len");
67 CBC 1016 : }
68 1016 : let n_entries = bytes.get_u32_le();
69 1016 : let mut res = Vec::with_capacity(n_entries as usize);
70 1016 : for _ in 0..n_entries {
71 7108 : if bytes.remaining() < 16 {
72 UBC 0 : bail!("TermHistory is incomplete");
73 CBC 7108 : }
74 7108 : res.push(TermLsn {
75 7108 : term: bytes.get_u64_le(),
76 7108 : lsn: bytes.get_u64_le().into(),
77 7108 : })
78 : }
79 1016 : Ok(TermHistory(res))
80 1016 : }
81 :
82 : /// Return copy of self with switches happening strictly after up_to
83 : /// truncated.
84 19491 : pub fn up_to(&self, up_to: Lsn) -> TermHistory {
85 19491 : let mut res = Vec::with_capacity(self.0.len());
86 82324 : for e in &self.0 {
87 62837 : if e.lsn > up_to {
88 4 : break;
89 62833 : }
90 62833 : res.push(*e);
91 : }
92 19491 : TermHistory(res)
93 19491 : }
94 : }
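// A minimal sketch (test-style, not part of the original suite) of the wire
// format handled above: a little-endian u32 entry count, then (term, lsn)
// pairs of u64 each; `up_to` then drops switches strictly after the LSN.
#[cfg(test)]
mod term_history_example {
    use super::*;

    #[test]
    fn parse_then_truncate() {
        let mut buf = BytesMut::new();
        buf.put_u32_le(2); // n_entries
        buf.put_u64_le(1); // term 1 ...
        buf.put_u64_le(0x10); // ... whose WAL starts at LSN 0x10
        buf.put_u64_le(2); // term 2 ...
        buf.put_u64_le(0x20); // ... whose WAL starts at LSN 0x20
        let mut bytes = buf.freeze();
        let th = TermHistory::from_bytes(&mut bytes).unwrap();
        assert_eq!(th.0.len(), 2);
        // Truncating at 0x10 drops the switch to term 2 at 0x20.
        assert_eq!(th.up_to(Lsn(0x10)).0, vec![TermLsn { term: 1, lsn: Lsn(0x10) }]);
    }
}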
95 :
96 : /// Display only latest entries for Debug.
97 : impl fmt::Debug for TermHistory {
98 3028 : fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
99 3028 : let n_printed = 20;
100 3028 : write!(
101 3028 : fmt,
102 3028 : "{}{:?}",
103 3028 : if self.0.len() > n_printed { "... " } else { "" },
104 3028 : self.0
105 3028 : .iter()
106 3028 : .rev()
107 3028 : .take(n_printed)
108 12246 : .map(|&e| (e.term, e.lsn)) // omit TermSwitchEntry
109 3028 : .collect::<Vec<_>>()
110 3028 : )
111 3028 : }
112 : }
113 :
114 : /// Unique id of proposer. Not needed for correctness, used for monitoring.
115 : pub type PgUuid = [u8; 16];
116 :
117 : /// Persistent consensus state of the acceptor.
118 13572 : #[derive(Debug, Clone, Serialize, Deserialize)]
119 : pub struct AcceptorState {
120 : /// acceptor's last term it voted for (advanced in 1 phase)
121 : pub term: Term,
122 : /// History of term switches for safekeeper's WAL.
123 : /// Actually it often goes *beyond* WAL contents as we adopt term history
124 : /// from the proposer before recovery.
125 : pub term_history: TermHistory,
126 : }
127 :
128 : impl AcceptorState {
129 : /// acceptor's epoch is the term of the highest entry in the log
130 17477 : pub fn get_epoch(&self, flush_lsn: Lsn) -> Term {
131 17477 : let th = self.term_history.up_to(flush_lsn);
132 17477 : match th.0.last() {
133 16941 : Some(e) => e.term,
134 536 : None => 0,
135 : }
136 17477 : }
137 : }
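// A minimal sketch (test-style, not part of the original suite) of the epoch
// rule above: the epoch is the term of the last switch at or before flush_lsn.
#[cfg(test)]
mod epoch_example {
    use super::*;

    #[test]
    fn epoch_follows_flush_lsn() {
        let acc = AcceptorState {
            term: 2,
            term_history: TermHistory(vec![(1, Lsn(0x10)).into(), (2, Lsn(0x20)).into()]),
        };
        assert_eq!(acc.get_epoch(Lsn(0x10)), 1); // term 2 WAL not flushed yet
        assert_eq!(acc.get_epoch(Lsn(0x20)), 2); // now it is
        assert_eq!(acc.get_epoch(Lsn(0)), 0); // empty prefix -> epoch 0
    }
}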
138 :
139 : /// Information about Postgres. Safekeeper gets it once and then verifies
140 : /// all further connections from computes match.
141 13572 : #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
142 : pub struct ServerInfo {
143 : /// Postgres server version
144 : pub pg_version: u32,
145 : pub system_id: SystemId,
146 : pub wal_seg_size: u32,
147 : }
148 :
149 UBC 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
150 : pub struct PersistedPeerInfo {
151 : /// LSN up to which safekeeper offloaded WAL to s3.
152 : backup_lsn: Lsn,
153 : /// Term of the last entry.
154 : term: Term,
155 : /// LSN of the last record.
156 : flush_lsn: Lsn,
157 : /// Up to which LSN safekeeper regards its WAL as committed.
158 : commit_lsn: Lsn,
159 : }
160 :
161 : impl PersistedPeerInfo {
162 0 : fn new() -> Self {
163 0 : Self {
164 0 : backup_lsn: Lsn::INVALID,
165 0 : term: INVALID_TERM,
166 0 : flush_lsn: Lsn(0),
167 0 : commit_lsn: Lsn(0),
168 0 : }
169 0 : }
170 : }
171 :
172 CBC 13572 : #[derive(Debug, Clone, Serialize, Deserialize)]
173 : pub struct PersistedPeers(pub Vec<(NodeId, PersistedPeerInfo)>);
174 :
175 : /// Persistent information stored on a safekeeper node.
176 : /// On-disk data is prefixed by magic and format version and followed by a checksum.
177 13572 : #[derive(Debug, Clone, Serialize, Deserialize)]
178 : pub struct SafeKeeperState {
179 : #[serde(with = "hex")]
180 : pub tenant_id: TenantId,
181 : #[serde(with = "hex")]
182 : pub timeline_id: TimelineId,
183 : /// persistent acceptor state
184 : pub acceptor_state: AcceptorState,
185 : /// information about server
186 : pub server: ServerInfo,
187 : /// Unique id of the last *elected* proposer we dealt with. Not needed
188 : /// for correctness, exists for monitoring purposes.
189 : #[serde(with = "hex")]
190 : pub proposer_uuid: PgUuid,
191 : /// Since which LSN this timeline generally starts. Safekeeper might have
192 : /// joined later.
193 : pub timeline_start_lsn: Lsn,
194 : /// Since which LSN safekeeper has (had) WAL for this timeline.
195 : /// All WAL segments next to one containing local_start_lsn are
196 : /// filled with data from the beginning.
197 : pub local_start_lsn: Lsn,
198 : /// Part of WAL acknowledged by quorum *and available locally*. Always points
199 : /// to record boundary.
200 : pub commit_lsn: Lsn,
201 : /// LSN that points to the end of the last backed up segment. Useful to
202 : /// persist to avoid finding out offloading progress on boot.
203 : pub backup_lsn: Lsn,
204 : /// Minimal LSN which may be needed for recovery of some safekeeper (end_lsn
205 : /// of last record streamed to everyone). Persisting it helps skipping
206 : /// recovery in walproposer, generally we compute it from peers. In
207 : /// walproposer proto called 'truncate_lsn'. Updates are currently driven
208 : /// only by walproposer.
209 : pub peer_horizon_lsn: Lsn,
210 : /// LSN of the oldest known checkpoint made by pageserver and successfully
211 : /// pushed to s3. We don't remove WAL beyond it. Persisted only for
212 : /// informational purposes, we receive it from pageserver (or broker).
213 : pub remote_consistent_lsn: Lsn,
214 : // Peers and their state as we remember it. Knowing peers themselves is
215 : // fundamental; but state is saved here only for informational purposes and
216 : // obviously can be stale. (Currently not saved at all, but let's reserve
217 : // the space now to avoid a file format version bump later.)
218 : pub peers: PersistedPeers,
219 : }
220 :
221 2734 : #[derive(Debug, Clone, Serialize, Deserialize)]
222 : // In memory safekeeper state. Fields mirror ones in `SafeKeeperState`; values
223 : // are not flushed yet.
224 : pub struct SafekeeperMemState {
225 : pub commit_lsn: Lsn,
226 : pub backup_lsn: Lsn,
227 : pub peer_horizon_lsn: Lsn,
228 : #[serde(with = "hex")]
229 : pub proposer_uuid: PgUuid,
230 : }
231 :
232 : impl SafeKeeperState {
233 485 : pub fn new(
234 485 : ttid: &TenantTimelineId,
235 485 : server_info: ServerInfo,
236 485 : peers: Vec<NodeId>,
237 485 : commit_lsn: Lsn,
238 485 : local_start_lsn: Lsn,
239 485 : ) -> SafeKeeperState {
240 485 : SafeKeeperState {
241 485 : tenant_id: ttid.tenant_id,
242 485 : timeline_id: ttid.timeline_id,
243 485 : acceptor_state: AcceptorState {
244 485 : term: 0,
245 485 : term_history: TermHistory::empty(),
246 485 : },
247 485 : server: server_info,
248 485 : proposer_uuid: [0; 16],
249 485 : timeline_start_lsn: Lsn(0),
250 485 : local_start_lsn,
251 485 : commit_lsn,
252 485 : backup_lsn: local_start_lsn,
253 485 : peer_horizon_lsn: local_start_lsn,
254 485 : remote_consistent_lsn: Lsn(0),
255 485 : peers: PersistedPeers(
256 485 : peers
257 485 : .iter()
258 485 : .map(|p| (*p, PersistedPeerInfo::new()))
259 485 : .collect(),
260 485 : ),
261 485 : }
262 485 : }
263 :
264 : #[cfg(test)]
265 4 : pub fn empty() -> Self {
266 4 : SafeKeeperState::new(
267 4 : &TenantTimelineId::empty(),
268 4 : ServerInfo {
269 4 : pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */
270 4 : system_id: 0, /* Postgres system identifier */
271 4 : wal_seg_size: 0,
272 4 : },
273 4 : vec![],
274 4 : Lsn::INVALID,
275 4 : Lsn::INVALID,
276 4 : )
277 4 : }
278 : }
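// A minimal sketch (test-style, with arbitrary ids and sizes) of bootstrapping
// persistent state with `new`; note how backup_lsn and peer_horizon_lsn start
// at local_start_lsn.
#[cfg(test)]
mod state_example {
    use super::*;

    #[test]
    fn new_initializes_lsns() {
        let ttid = TenantTimelineId {
            tenant_id: TenantId::from([1u8; 16]),
            timeline_id: TimelineId::from([2u8; 16]),
        };
        let server = ServerInfo {
            pg_version: 150000, // hypothetical PG 15 version number
            system_id: 0,
            wal_seg_size: 16 * 1024 * 1024,
        };
        let state = SafeKeeperState::new(&ttid, server, vec![NodeId(1)], Lsn(0x100), Lsn(0x100));
        assert_eq!(state.backup_lsn, Lsn(0x100));
        assert_eq!(state.peer_horizon_lsn, Lsn(0x100));
        assert_eq!(state.acceptor_state.term, 0);
    }
}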
279 :
280 : // protocol messages
281 :
282 : /// Initial Proposer -> Acceptor message
283 2012 : #[derive(Debug, Deserialize)]
284 : pub struct ProposerGreeting {
285 : /// proposer-acceptor protocol version
286 : pub protocol_version: u32,
287 : /// Postgres server version
288 : pub pg_version: u32,
289 : pub proposer_id: PgUuid,
290 : pub system_id: SystemId,
291 : pub timeline_id: TimelineId,
292 : pub tenant_id: TenantId,
293 : pub tli: TimeLineID,
294 : pub wal_seg_size: u32,
295 : }
296 :
297 : /// Acceptor -> Proposer initial response: the highest term known to me
298 : /// (acceptor voted for).
299 UBC 0 : #[derive(Debug, Serialize)]
300 : pub struct AcceptorGreeting {
301 : term: u64,
302 : node_id: NodeId,
303 : }
304 :
305 : /// Vote request sent from proposer to safekeepers
306 CBC 2009 : #[derive(Debug, Deserialize)]
307 : pub struct VoteRequest {
308 : term: Term,
309 : }
310 :
311 : /// Vote itself, sent from safekeeper to proposer
312 2009 : #[derive(Debug, Serialize)]
313 : pub struct VoteResponse {
314 : term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
315 : vote_given: u64, // fixme u64 due to padding
316 : // Safekeeper flush_lsn (end of WAL) + history of term switches allow
317 : // proposer to choose the most advanced one.
318 : flush_lsn: Lsn,
319 : truncate_lsn: Lsn,
320 : term_history: TermHistory,
321 : timeline_start_lsn: Lsn,
322 : }
323 :
324 : /*
325 : * Proposer -> Acceptor message announcing proposer is elected and communicating
326 : * term history to it.
327 : */
328 1019 : #[derive(Debug)]
329 : pub struct ProposerElected {
330 : pub term: Term,
331 : pub start_streaming_at: Lsn,
332 : pub term_history: TermHistory,
333 : pub timeline_start_lsn: Lsn,
334 : }
335 :
336 : /// Request with WAL message sent from proposer to safekeeper. Along the way it
337 : /// communicates commit_lsn.
338 UBC 0 : #[derive(Debug)]
339 : pub struct AppendRequest {
340 : pub h: AppendRequestHeader,
341 : pub wal_data: Bytes,
342 : }
343 CBC 2912723 : #[derive(Debug, Clone, Deserialize)]
344 : pub struct AppendRequestHeader {
345 : // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
346 : pub term: Term,
347 : // LSN since the proposer appends WAL; determines epoch switch point.
348 : pub epoch_start_lsn: Lsn,
349 : /// start position of message in WAL
350 : pub begin_lsn: Lsn,
351 : /// end position of message in WAL
352 : pub end_lsn: Lsn,
353 : /// LSN committed by quorum of safekeepers
354 : pub commit_lsn: Lsn,
355 : /// minimal LSN which may be needed by proposer to perform recovery of some safekeeper
356 : pub truncate_lsn: Lsn,
357 : // only for logging/debugging
358 : pub proposer_uuid: PgUuid,
359 : }
360 :
361 : /// Report safekeeper state to proposer
362 3 : #[derive(Debug, Serialize)]
363 : pub struct AppendResponse {
364 : // Current term of the safekeeper; if it is higher than proposer's, the
365 : // compute is out of date.
366 : pub term: Term,
367 : // NOTE: this is physical end of wal on safekeeper; currently it doesn't
368 : // make much sense without taking epoch into account, as history can be
369 : // diverged.
370 : pub flush_lsn: Lsn,
371 : // We report back our awareness about which WAL is committed, as this is
372 : // a criterion for walproposer --sync mode exit
373 : pub commit_lsn: Lsn,
374 : pub hs_feedback: HotStandbyFeedback,
375 : pub pageserver_feedback: PageserverFeedback,
376 : }
377 :
378 : impl AppendResponse {
379 57 : fn term_only(term: Term) -> AppendResponse {
380 57 : AppendResponse {
381 57 : term,
382 57 : flush_lsn: Lsn(0),
383 57 : commit_lsn: Lsn(0),
384 57 : hs_feedback: HotStandbyFeedback::empty(),
385 57 : pageserver_feedback: PageserverFeedback::empty(),
386 57 : }
387 57 : }
388 : }
389 :
390 : /// Proposer -> Acceptor messages
391 UBC 0 : #[derive(Debug)]
392 : pub enum ProposerAcceptorMessage {
393 : Greeting(ProposerGreeting),
394 : VoteRequest(VoteRequest),
395 : Elected(ProposerElected),
396 : AppendRequest(AppendRequest),
397 : NoFlushAppendRequest(AppendRequest),
398 : FlushWAL,
399 : }
400 :
401 : impl ProposerAcceptorMessage {
402 : /// Parse proposer message.
403 CBC 2917760 : pub fn parse(msg_bytes: Bytes) -> Result<ProposerAcceptorMessage> {
404 2917760 : // XXX: using Reader is inefficient but easy to use with bincode
405 2917760 : let mut stream = msg_bytes.reader();
406 : // u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is
407 2917760 : let tag = stream.read_u64::<LittleEndian>()? as u8 as char;
408 2917760 : match tag {
409 : 'g' => {
410 2012 : let msg = ProposerGreeting::des_from(&mut stream)?;
411 2012 : Ok(ProposerAcceptorMessage::Greeting(msg))
412 : }
413 : 'v' => {
414 2009 : let msg = VoteRequest::des_from(&mut stream)?;
415 2009 : Ok(ProposerAcceptorMessage::VoteRequest(msg))
416 : }
417 : 'e' => {
418 1016 : let mut msg_bytes = stream.into_inner();
419 1016 : if msg_bytes.remaining() < 16 {
420 UBC 0 : bail!("ProposerElected message is not complete");
421 CBC 1016 : }
422 1016 : let term = msg_bytes.get_u64_le();
423 1016 : let start_streaming_at = msg_bytes.get_u64_le().into();
424 1016 : let term_history = TermHistory::from_bytes(&mut msg_bytes)?;
425 1016 : if msg_bytes.remaining() < 8 {
426 UBC 0 : bail!("ProposerElected message is not complete");
427 CBC 1016 : }
428 1016 : let timeline_start_lsn = msg_bytes.get_u64_le().into();
429 1016 : let msg = ProposerElected {
430 1016 : term,
431 1016 : start_streaming_at,
432 1016 : timeline_start_lsn,
433 1016 : term_history,
434 1016 : };
435 1016 : Ok(ProposerAcceptorMessage::Elected(msg))
436 : }
437 : 'a' => {
438 : // read header followed by wal data
439 2912723 : let hdr = AppendRequestHeader::des_from(&mut stream)?;
440 2912723 : let rec_size = hdr
441 2912723 : .end_lsn
442 2912723 : .checked_sub(hdr.begin_lsn)
443 2912723 : .context("begin_lsn > end_lsn in AppendRequest")?
444 : .0 as usize;
445 2912723 : if rec_size > MAX_SEND_SIZE {
446 UBC 0 : bail!(
447 0 : "AppendRequest is longer than MAX_SEND_SIZE ({})",
448 0 : MAX_SEND_SIZE
449 0 : );
450 CBC 2912723 : }
451 2912723 :
452 2912723 : let mut wal_data_vec: Vec<u8> = vec![0; rec_size];
453 2912723 : stream.read_exact(&mut wal_data_vec)?;
454 2912723 : let wal_data = Bytes::from(wal_data_vec);
455 2912723 : let msg = AppendRequest { h: hdr, wal_data };
456 2912723 :
457 2912723 : Ok(ProposerAcceptorMessage::AppendRequest(msg))
458 : }
459 UBC 0 : _ => bail!("unknown proposer-acceptor message tag: {}", tag),
460 : }
461 CBC 2917760 : }
462 : }
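// A minimal sketch (test-style, not part of the original suite) of the framing
// `parse` expects: an 8-byte little-endian tag (a char widened to u64), then
// the bincode-encoded body, here a VoteRequest with term 42.
#[cfg(test)]
mod parse_example {
    use super::*;

    #[test]
    fn parse_vote_request() {
        let mut buf = BytesMut::new();
        buf.put_u64_le('v' as u64); // tag, widened to u64 to avoid padding
        buf.put_u64_le(42); // VoteRequest::term
        match ProposerAcceptorMessage::parse(buf.freeze()).unwrap() {
            ProposerAcceptorMessage::VoteRequest(v) => assert_eq!(v.term, 42),
            other => panic!("unexpected message: {:?}", other),
        }
    }
}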
463 :
464 : /// Acceptor -> Proposer messages
465 UBC 0 : #[derive(Debug)]
466 : pub enum AcceptorProposerMessage {
467 : Greeting(AcceptorGreeting),
468 : VoteResponse(VoteResponse),
469 : AppendResponse(AppendResponse),
470 : }
471 :
472 : impl AcceptorProposerMessage {
473 : /// Serialize acceptor -> proposer message.
474 CBC 2463777 : pub fn serialize(&self, buf: &mut BytesMut) -> Result<()> {
475 2463777 : match self {
476 2012 : AcceptorProposerMessage::Greeting(msg) => {
477 2012 : buf.put_u64_le('g' as u64);
478 2012 : buf.put_u64_le(msg.term);
479 2012 : buf.put_u64_le(msg.node_id.0);
480 2012 : }
481 2008 : AcceptorProposerMessage::VoteResponse(msg) => {
482 2008 : buf.put_u64_le('v' as u64);
483 2008 : buf.put_u64_le(msg.term);
484 2008 : buf.put_u64_le(msg.vote_given);
485 2008 : buf.put_u64_le(msg.flush_lsn.into());
486 2008 : buf.put_u64_le(msg.truncate_lsn.into());
487 2008 : buf.put_u32_le(msg.term_history.0.len() as u32);
488 9713 : for e in &msg.term_history.0 {
489 7705 : buf.put_u64_le(e.term);
490 7705 : buf.put_u64_le(e.lsn.into());
491 7705 : }
492 2008 : buf.put_u64_le(msg.timeline_start_lsn.into());
493 : }
494 2459757 : AcceptorProposerMessage::AppendResponse(msg) => {
495 2459757 : buf.put_u64_le('a' as u64);
496 2459757 : buf.put_u64_le(msg.term);
497 2459757 : buf.put_u64_le(msg.flush_lsn.into());
498 2459757 : buf.put_u64_le(msg.commit_lsn.into());
499 2459757 : buf.put_i64_le(msg.hs_feedback.ts);
500 2459757 : buf.put_u64_le(msg.hs_feedback.xmin);
501 2459757 : buf.put_u64_le(msg.hs_feedback.catalog_xmin);
502 2459757 :
503 2459757 : msg.pageserver_feedback.serialize(buf);
504 2459757 : }
505 : }
506 :
507 2463777 : Ok(())
508 2463777 : }
509 : }
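// A minimal sketch (test-style, not part of the original suite) of the reply
// framing built above: a greeting serializes to three little-endian u64s --
// tag 'g', term, node id.
#[cfg(test)]
mod serialize_example {
    use super::*;

    #[test]
    fn greeting_layout() {
        let msg = AcceptorProposerMessage::Greeting(AcceptorGreeting {
            term: 7,
            node_id: NodeId(1),
        });
        let mut buf = BytesMut::new();
        msg.serialize(&mut buf).unwrap();
        assert_eq!(buf.len(), 24); // 3 x u64
        assert_eq!(buf[0], b'g'); // low byte of the tag
        assert_eq!(buf[8], 7); // low byte of the term
    }
}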
510 :
511 : /// Safekeeper implements consensus to reliably persist WAL across nodes.
512 : /// It controls all WAL disk writes and updates of control file.
513 : ///
514 : /// Currently safekeeper processes:
515 : /// - messages from compute (proposers) and provides replies
516 : /// - messages from broker peers
517 : pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> {
518 : /// LSN since which the proposer this safekeeper is currently talking to
519 : /// appends WAL; determines the epoch switch point.
520 : pub epoch_start_lsn: Lsn,
521 :
522 : pub inmem: SafekeeperMemState, // in memory part
523 : pub state: CTRL, // persistent state storage
524 :
525 : pub wal_store: WAL,
526 :
527 : node_id: NodeId, // safekeeper's node id
528 : }
529 :
530 : impl<CTRL, WAL> SafeKeeper<CTRL, WAL>
531 : where
532 : CTRL: control_file::Storage,
533 : WAL: wal_storage::Storage,
534 : {
535 : /// Accepts a control file storage containing the safekeeper state.
536 : /// State must be initialized, i.e. contain filled `tenant_id`, `timeline_id`
537 : /// and `server` (`wal_seg_size` inside it) fields.
538 568 : pub fn new(state: CTRL, wal_store: WAL, node_id: NodeId) -> Result<SafeKeeper<CTRL, WAL>> {
539 568 : if state.tenant_id == TenantId::from([0u8; 16])
540 568 : || state.timeline_id == TimelineId::from([0u8; 16])
541 : {
542 UBC 0 : bail!(
543 0 : "Calling SafeKeeper::new with empty tenant_id ({}) or timeline_id ({})",
544 0 : state.tenant_id,
545 0 : state.timeline_id
546 0 : );
547 CBC 568 : }
548 568 :
549 568 : Ok(SafeKeeper {
550 568 : epoch_start_lsn: Lsn(0),
551 568 : inmem: SafekeeperMemState {
552 568 : commit_lsn: state.commit_lsn,
553 568 : backup_lsn: state.backup_lsn,
554 568 : peer_horizon_lsn: state.peer_horizon_lsn,
555 568 : proposer_uuid: state.proposer_uuid,
556 568 : },
557 568 : state,
558 568 : wal_store,
559 568 : node_id,
560 568 : })
561 568 : }
562 :
563 : /// Get history of term switches for the available WAL
564 2011 : fn get_term_history(&self) -> TermHistory {
565 2011 : self.state
566 2011 : .acceptor_state
567 2011 : .term_history
568 2011 : .up_to(self.flush_lsn())
569 2011 : }
570 :
571 : /// Get current term.
572 5377550 : pub fn get_term(&self) -> Term {
573 5377550 : self.state.acceptor_state.term
574 5377550 : }
575 :
576 17265 : pub fn get_epoch(&self) -> Term {
577 17265 : self.state.acceptor_state.get_epoch(self.flush_lsn())
578 17265 : }
579 :
580 : /// wal_store wrapper avoiding commit_lsn <= flush_lsn violation when we don't have WAL yet.
581 10784999 : pub fn flush_lsn(&self) -> Lsn {
582 10784999 : max(self.wal_store.flush_lsn(), self.state.timeline_start_lsn)
583 10784999 : }
584 :
585 : /// Process message from proposer and possibly form reply. Concurrent
586 : /// callers must exclude each other.
587 5377471 : pub async fn process_msg(
588 5377471 : &mut self,
589 5377471 : msg: &ProposerAcceptorMessage,
590 5377471 : ) -> Result<Option<AcceptorProposerMessage>> {
591 5377471 : match msg {
592 2012 : ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg).await,
593 5846 : ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg).await,
594 1624569 : ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg).await,
595 5 : ProposerAcceptorMessage::AppendRequest(msg) => {
596 18 : self.handle_append_request(msg, true).await
597 : }
598 2912723 : ProposerAcceptorMessage::NoFlushAppendRequest(msg) => {
599 3134402 : self.handle_append_request(msg, false).await
600 : }
601 2459700 : ProposerAcceptorMessage::FlushWAL => self.handle_flush().await,
602 : }
603 5377471 : }
604 :
605 : /// Handle initial message from proposer: check its sanity and send my
606 : /// current term.
607 2012 : async fn handle_greeting(
608 2012 : &mut self,
609 2012 : msg: &ProposerGreeting,
610 2012 : ) -> Result<Option<AcceptorProposerMessage>> {
611 2012 : // Check protocol compatibility
612 2012 : if msg.protocol_version != SK_PROTOCOL_VERSION {
613 UBC 0 : bail!(
614 0 : "incompatible protocol version {}, expected {}",
615 0 : msg.protocol_version,
616 0 : SK_PROTOCOL_VERSION
617 0 : );
618 CBC 2012 : }
619 2012 : /* Postgres major version mismatch is treated as fatal error
620 2012 : * because safekeepers parse WAL headers and the format
621 2012 : * may change between versions.
622 2012 : */
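// E.g. pg_version 150002 (15.2) vs 150001 (15.1): both have major 15, so they are compatible.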
623 2012 : if msg.pg_version / 10000 != self.state.server.pg_version / 10000
624 UBC 0 : && self.state.server.pg_version != UNKNOWN_SERVER_VERSION
625 : {
626 0 : bail!(
627 0 : "incompatible server version {}, expected {}",
628 0 : msg.pg_version,
629 0 : self.state.server.pg_version
630 0 : );
631 CBC 2012 : }
632 2012 :
633 2012 : if msg.tenant_id != self.state.tenant_id {
634 UBC 0 : bail!(
635 0 : "invalid tenant ID, got {}, expected {}",
636 0 : msg.tenant_id,
637 0 : self.state.tenant_id
638 0 : );
639 CBC 2012 : }
640 2012 : if msg.timeline_id != self.state.timeline_id {
641 UBC 0 : bail!(
642 0 : "invalid timeline ID, got {}, expected {}",
643 0 : msg.timeline_id,
644 0 : self.state.timeline_id
645 0 : );
646 CBC 2012 : }
647 2012 : if self.state.server.wal_seg_size != msg.wal_seg_size {
648 UBC 0 : bail!(
649 0 : "invalid wal_seg_size, got {}, expected {}",
650 0 : msg.wal_seg_size,
651 0 : self.state.server.wal_seg_size
652 0 : );
653 CBC 2012 : }
654 2012 :
655 2012 : // system_id will be updated on mismatch
656 2012 : // sync-safekeepers doesn't know sysid and sends 0, ignore it
657 2012 : if self.state.server.system_id != msg.system_id && msg.system_id != 0 {
658 475 : if self.state.server.system_id != 0 {
659 UBC 0 : warn!(
660 0 : "unexpected system ID arrived, got {}, expected {}",
661 0 : msg.system_id, self.state.server.system_id
662 0 : );
663 CBC 475 : }
664 :
665 475 : let mut state = self.state.clone();
666 475 : state.server.system_id = msg.system_id;
667 475 : if msg.pg_version != UNKNOWN_SERVER_VERSION {
668 475 : state.server.pg_version = msg.pg_version;
669 475 : }
670 1425 : self.state.persist(&state).await?;
671 1537 : }
672 :
673 2012 : info!(
674 2012 : "processed greeting from walproposer {}, sending term {:?}",
675 32192 : msg.proposer_id.map(|b| format!("{:X}", b)).join(""),
676 2012 : self.state.acceptor_state.term
677 2012 : );
678 2012 : Ok(Some(AcceptorProposerMessage::Greeting(AcceptorGreeting {
679 2012 : term: self.state.acceptor_state.term,
680 2012 : node_id: self.node_id,
681 2012 : })))
682 2012 : }
683 :
684 : /// Give vote for the given term, if we haven't done that previously.
685 2011 : async fn handle_vote_request(
686 2011 : &mut self,
687 2011 : msg: &VoteRequest,
688 2011 : ) -> Result<Option<AcceptorProposerMessage>> {
689 : // Once voted, we won't accept data from older proposers; flush
690 : // everything we've already received so that new proposer starts
691 : // streaming at end of our WAL, without overlap. Currently we truncate
692 : // WAL at streaming point, so this avoids truncating already committed
693 : // WAL.
694 : //
695 : // TODO: it would be smoother to not truncate committed piece at
696 : // handle_elected instead. Currently not a big deal, as proposer is the
697 : // only source of WAL; with peer2peer recovery it would be more
698 : // important.
699 2011 : self.wal_store.flush_wal().await?;
700 : // initialize with refusal
701 2011 : let mut resp = VoteResponse {
702 2011 : term: self.state.acceptor_state.term,
703 2011 : vote_given: false as u64,
704 2011 : flush_lsn: self.flush_lsn(),
705 2011 : truncate_lsn: self.inmem.peer_horizon_lsn,
706 2011 : term_history: self.get_term_history(),
707 2011 : timeline_start_lsn: self.state.timeline_start_lsn,
708 2011 : };
709 2011 : if self.state.acceptor_state.term < msg.term {
710 1950 : let mut state = self.state.clone();
711 1950 : state.acceptor_state.term = msg.term;
712 1950 : // persist vote before sending it out
713 5846 : self.state.persist(&state).await?;
714 :
715 1950 : resp.term = self.state.acceptor_state.term;
716 1950 : resp.vote_given = true as u64;
717 61 : }
718 2009 : info!("processed VoteRequest for term {}: {:?}", msg.term, &resp);
719 2011 : Ok(Some(AcceptorProposerMessage::VoteResponse(resp)))
720 2011 : }
721 :
722 : /// Form AppendResponse from current state.
723 2459705 : fn append_response(&self) -> AppendResponse {
724 2459705 : let ar = AppendResponse {
725 2459705 : term: self.state.acceptor_state.term,
726 2459705 : flush_lsn: self.flush_lsn(),
727 2459705 : commit_lsn: self.state.commit_lsn,
728 2459705 : // will be filled by the upper code to avoid bothering safekeeper
729 2459705 : hs_feedback: HotStandbyFeedback::empty(),
730 2459705 : pageserver_feedback: PageserverFeedback::empty(),
731 2459705 : };
732 2459705 : trace!("formed AppendResponse {:?}", ar);
733 2459705 : ar
734 2459705 : }
735 :
736 1020 : async fn handle_elected(
737 1020 : &mut self,
738 1020 : msg: &ProposerElected,
739 1020 : ) -> Result<Option<AcceptorProposerMessage>> {
740 1019 : info!("received ProposerElected {:?}", msg);
741 1020 : if self.state.acceptor_state.term < msg.term {
742 4 : let mut state = self.state.clone();
743 4 : state.acceptor_state.term = msg.term;
744 10 : self.state.persist(&state).await?;
745 1016 : }
746 :
747 : // If our term is higher, ignore the message (next feedback will inform the compute)
748 1020 : if self.state.acceptor_state.term > msg.term {
749 UBC 0 : return Ok(None);
750 CBC 1020 : }
751 1020 :
752 1020 : // This might happen in a rare race when another (old) connection from
753 1020 : // the same walproposer writes + flushes WAL after this connection
754 1020 : // already sent flush_lsn in VoteRequest. It is generally safe to
755 1020 : // proceed, but to prevent commit_lsn surprisingly going down we should
756 1020 : // either refuse the session (simpler) or skip the part we already have
757 1020 : // from the stream (can be implemented).
758 1020 : if msg.term == self.get_epoch() && self.flush_lsn() > msg.start_streaming_at {
759 UBC 0 : bail!("refusing ProposerElected which is going to overwrite correct WAL: term={}, flush_lsn={}, start_streaming_at={}; restarting the handshake should help",
760 0 : msg.term, self.flush_lsn(), msg.start_streaming_at)
761 CBC 1020 : }
762 1020 : // Otherwise this shouldn't happen.
763 1020 : assert!(
764 1020 : msg.start_streaming_at >= self.inmem.commit_lsn,
765 UBC 0 : "attempt to truncate committed data: start_streaming_at={}, commit_lsn={}",
766 : msg.start_streaming_at,
767 : self.inmem.commit_lsn
768 : );
769 :
770 : // TODO: cross check divergence point, check if msg.start_streaming_at corresponds to
771 : // intersection of our history and history from msg
772 :
773 : // truncate wal, update the LSNs
774 CBC 1621532 : self.wal_store.truncate_wal(msg.start_streaming_at).await?;
775 :
776 : // and now adopt term history from proposer
777 : {
778 1020 : let mut state = self.state.clone();
779 1020 :
780 1020 : // Here we learn initial LSN for the first time, set fields
781 1020 : // interested in that.
782 1020 :
783 1020 : if state.timeline_start_lsn == Lsn(0) {
784 : // Remember point where WAL begins globally.
785 480 : state.timeline_start_lsn = msg.timeline_start_lsn;
786 479 : info!(
787 479 : "setting timeline_start_lsn to {:?}",
788 479 : state.timeline_start_lsn
789 479 : );
790 540 : }
791 1020 : if state.local_start_lsn == Lsn(0) {
792 471 : state.local_start_lsn = msg.start_streaming_at;
793 470 : info!("setting local_start_lsn to {:?}", state.local_start_lsn);
794 549 : }
795 : // Initializing commit_lsn before acking first flushed record is
796 : // important to let find_end_of_wal skip the hole in the beginning
797 : // of the first segment.
798 : //
799 : // NB: on new clusters, this happens at the same time as
800 : // timeline_start_lsn initialization, it is taken outside to provide
801 : // upgrade.
802 1020 : self.inmem.commit_lsn = max(self.inmem.commit_lsn, state.timeline_start_lsn);
803 1020 :
804 1020 : // Initializing backup_lsn is useful to avoid making backup think it should upload 0 segment.
805 1020 : self.inmem.backup_lsn = max(self.inmem.backup_lsn, state.timeline_start_lsn);
806 1020 :
807 1020 : state.acceptor_state.term_history = msg.term_history.clone();
808 3027 : self.persist_control_file(state).await?;
809 : }
810 :
811 1019 : info!("start receiving WAL since {:?}", msg.start_streaming_at);
812 :
813 1020 : Ok(None)
814 1020 : }
815 :
816 : /// Advance commit_lsn taking into account what we have locally.
817 : ///
818 : /// Note: it is assumed that 'WAL we have is from the right term' check has
819 : /// already been done outside.
820 2919732 : async fn update_commit_lsn(&mut self, mut candidate: Lsn) -> Result<()> {
821 2919732 : // Both peers and walproposer communicate this value, we might already
822 2919732 : // have a fresher (higher) version.
823 2919732 : candidate = max(candidate, self.inmem.commit_lsn);
824 2919732 : let commit_lsn = min(candidate, self.flush_lsn());
825 2919732 : assert!(
826 2919732 : commit_lsn >= self.inmem.commit_lsn,
827 UBC 0 : "commit_lsn monotonicity violated: old={} new={}",
828 : self.inmem.commit_lsn,
829 : commit_lsn
830 : );
831 :
832 CBC 2919732 : self.inmem.commit_lsn = commit_lsn;
833 2919732 :
834 2919732 : // If new commit_lsn reached epoch switch, force sync of control
835 2919732 : // file: walproposer in sync mode is very interested when this
836 2919732 : // happens. Note: this is for sync-safekeepers mode only, as
837 2919732 : // otherwise commit_lsn might jump over epoch_start_lsn.
838 2919732 : // Also note that commit_lsn can reach epoch_start_lsn earlier
839 2919732 : // that we receive new epoch_start_lsn, and we still need to sync
840 2919732 : // control file in this case.
841 2919732 : if commit_lsn == self.epoch_start_lsn && self.state.commit_lsn != commit_lsn {
842 75 : self.persist_control_file(self.state.clone()).await?;
843 2919707 : }
844 :
845 2919732 : Ok(())
846 2919732 : }
847 :
848 : /// Persist control file to disk, called only after timeline creation (bootstrap).
849 481 : pub async fn persist(&mut self) -> Result<()> {
850 1559 : self.persist_control_file(self.state.clone()).await
851 481 : }
852 :
853 : /// Persist in-memory state to the disk, taking other data from state.
854 2990 : async fn persist_control_file(&mut self, mut state: SafeKeeperState) -> Result<()> {
855 2990 : state.commit_lsn = self.inmem.commit_lsn;
856 2990 : state.backup_lsn = self.inmem.backup_lsn;
857 2990 : state.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
858 2990 : state.proposer_uuid = self.inmem.proposer_uuid;
859 9008 : self.state.persist(&state).await
860 2990 : }
861 :
862 : /// Persist control file if there is something to save and enough time
863 : /// has passed since the last save.
864 1363 : pub async fn maybe_persist_control_file(
865 1363 : &mut self,
866 1363 : inmem_remote_consistent_lsn: Lsn,
867 1363 : ) -> Result<()> {
868 1363 : const CF_SAVE_INTERVAL: Duration = Duration::from_secs(300);
869 1363 : if self.state.last_persist_at().elapsed() < CF_SAVE_INTERVAL {
870 1325 : return Ok(());
871 38 : }
872 38 : let need_persist = self.inmem.commit_lsn > self.state.commit_lsn
873 12 : || self.inmem.backup_lsn > self.state.backup_lsn
874 12 : || self.inmem.peer_horizon_lsn > self.state.peer_horizon_lsn
875 12 : || inmem_remote_consistent_lsn > self.state.remote_consistent_lsn;
876 38 : if need_persist {
877 27 : let mut state = self.state.clone();
878 27 : state.remote_consistent_lsn = inmem_remote_consistent_lsn;
879 81 : self.persist_control_file(state).await?;
880 UBC 0 : trace!("saved control file: {CF_SAVE_INTERVAL:?} passed");
881 CBC 11 : }
882 38 : Ok(())
883 1363 : }
884 :
885 : /// Handle request to append WAL.
886 : #[allow(clippy::comparison_chain)]
887 2912728 : async fn handle_append_request(
888 2912728 : &mut self,
889 2912728 : msg: &AppendRequest,
890 2912728 : require_flush: bool,
891 2912728 : ) -> Result<Option<AcceptorProposerMessage>> {
892 2912728 : if self.state.acceptor_state.term < msg.h.term {
893 UBC 0 : bail!("got AppendRequest before ProposerElected");
894 CBC 2912728 : }
895 2912728 :
896 2912728 : // If our term is higher, immediately refuse the message.
897 2912728 : if self.state.acceptor_state.term > msg.h.term {
898 57 : let resp = AppendResponse::term_only(self.state.acceptor_state.term);
899 57 : return Ok(Some(AcceptorProposerMessage::AppendResponse(resp)));
900 2912671 : }
901 2912671 :
902 2912671 : // Now we know that we are in the same term as the proposer,
903 2912671 : // processing the message.
904 2912671 :
905 2912671 : self.epoch_start_lsn = msg.h.epoch_start_lsn;
906 2912671 : self.inmem.proposer_uuid = msg.h.proposer_uuid;
907 2912671 :
908 2912671 : // do the job
909 2912671 : if !msg.wal_data.is_empty() {
910 1597354 : self.wal_store
911 1597354 : .write_wal(msg.h.begin_lsn, &msg.wal_data)
912 3131744 : .await?;
913 1315317 : }
914 :
915 : // flush wal to the disk, if required
916 2912671 : if require_flush {
917 5 : self.wal_store.flush_wal().await?;
918 2912666 : }
919 :
920 : // Update commit_lsn.
921 2912671 : if msg.h.commit_lsn != Lsn(0) {
922 2910168 : self.update_commit_lsn(msg.h.commit_lsn).await?;
923 2503 : }
924 : // Value calculated by walproposer can always lag:
925 : // - safekeepers can forget inmem value and send to proposer lower
926 : // persisted one on restart;
927 : // - if we make safekeepers always send persistent value,
928 : // any compute restart would pull it down.
929 : // Thus, take max before adopting.
930 2912671 : self.inmem.peer_horizon_lsn = max(self.inmem.peer_horizon_lsn, msg.h.truncate_lsn);
931 2912671 :
932 2912671 : // Update truncate and commit LSN in control file.
933 2912671 : // To avoid the performance impact of extra fsyncs, do it only
934 2912671 : // when truncate_lsn delta exceeds WAL segment size.
935 2912671 : if self.state.peer_horizon_lsn + (self.state.server.wal_seg_size as u64)
936 2912671 : < self.inmem.peer_horizon_lsn
937 : {
938 2610 : self.persist_control_file(self.state.clone()).await?;
939 2911786 : }
940 :
941 UBC 0 : trace!(
942 0 : "processed AppendRequest of len {}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, flushed={:?}",
943 0 : msg.wal_data.len(),
944 0 : msg.h.end_lsn,
945 0 : msg.h.commit_lsn,
946 0 : msg.h.truncate_lsn,
947 0 : require_flush,
948 0 : );
949 :
950 : // If flush_lsn hasn't updated, AppendResponse is not very useful.
951 CBC 2912671 : if !require_flush {
952 2912666 : return Ok(None);
953 5 : }
954 5 :
955 5 : let resp = self.append_response();
956 5 : Ok(Some(AcceptorProposerMessage::AppendResponse(resp)))
957 2912728 : }
958 :
959 : /// Flush WAL to disk. Return AppendResponse with latest LSNs.
960 2459700 : async fn handle_flush(&mut self) -> Result<Option<AcceptorProposerMessage>> {
961 2459700 : self.wal_store.flush_wal().await?;
962 2459700 : Ok(Some(AcceptorProposerMessage::AppendResponse(
963 2459700 : self.append_response(),
964 2459700 : )))
965 2459700 : }
966 :
967 : /// Update timeline state with peer safekeeper data.
968 9742 : pub async fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> {
969 9742 : let mut sync_control_file = false;
970 9742 :
971 9742 : if (Lsn(sk_info.commit_lsn) != Lsn::INVALID) && (sk_info.last_log_term != INVALID_TERM) {
972 : // Note: the check is too restrictive, generally we can update local
973 : // commit_lsn if our history matches (is part of) history of advanced
974 : // commit_lsn provider.
975 9578 : if sk_info.last_log_term == self.get_epoch() {
976 9564 : self.update_commit_lsn(Lsn(sk_info.commit_lsn)).await?;
977 14 : }
978 164 : }
979 :
980 9742 : let new_backup_lsn = max(Lsn(sk_info.backup_lsn), self.inmem.backup_lsn);
981 9742 : sync_control_file |=
982 9742 : self.state.backup_lsn + (self.state.server.wal_seg_size as u64) < new_backup_lsn;
983 9742 : self.inmem.backup_lsn = new_backup_lsn;
984 9742 :
985 9742 : // value in sk_info should be maximized over our local in memory value.
986 9742 : let new_remote_consistent_lsn = Lsn(sk_info.remote_consistent_lsn);
987 9742 : assert!(self.state.remote_consistent_lsn <= new_remote_consistent_lsn);
988 9742 : sync_control_file |= self.state.remote_consistent_lsn
989 9742 : + (self.state.server.wal_seg_size as u64)
990 9742 : < new_remote_consistent_lsn;
991 9742 :
992 9742 : let new_peer_horizon_lsn = max(Lsn(sk_info.peer_horizon_lsn), self.inmem.peer_horizon_lsn);
993 9742 : sync_control_file |= self.state.peer_horizon_lsn + (self.state.server.wal_seg_size as u64)
994 9742 : < new_peer_horizon_lsn;
995 9742 : self.inmem.peer_horizon_lsn = new_peer_horizon_lsn;
996 9742 :
997 9742 : if sync_control_file {
998 552 : let mut state = self.state.clone();
999 552 : // Note: we could make remote_consistent_lsn update in cf common by
1000 552 : // storing Arc to walsenders in Safekeeper.
1001 552 : state.remote_consistent_lsn = new_remote_consistent_lsn;
1002 1656 : self.persist_control_file(state).await?;
1003 9190 : }
1004 9742 : Ok(())
1005 9742 : }
1006 :
1007 : /// Get oldest segno we still need to keep. We hold WAL till it is consumed
1008 : /// by all of 1) pageserver (remote_consistent_lsn) 2) peers 3) s3
1009 : /// offloading.
1010 : /// While it is safe to use inmem values for determining horizon,
1011 : /// we use the persistent ones to make normal states less surprising.
1012 1363 : pub fn get_horizon_segno(&self, wal_backup_enabled: bool) -> XLogSegNo {
1013 1363 : let mut horizon_lsn = min(
1014 1363 : self.state.remote_consistent_lsn,
1015 1363 : self.state.peer_horizon_lsn,
1016 1363 : );
1017 1363 : if wal_backup_enabled {
1018 1363 : horizon_lsn = min(horizon_lsn, self.state.backup_lsn);
1019 1363 : }
1020 1363 : horizon_lsn.segment_number(self.state.server.wal_seg_size as usize)
1021 1363 : }
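// Worked example (hypothetical numbers): with the default 16 MiB segments,
// horizon_lsn = Lsn(0x5000000) = 5 * 16 MiB lies in segment 5, so segments
// below 5 are no longer needed and may be removed.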
1022 : }
1023 :
1024 : #[cfg(test)]
1025 : mod tests {
1026 : use futures::future::BoxFuture;
1027 : use postgres_ffi::WAL_SEGMENT_SIZE;
1028 :
1029 : use super::*;
1030 : use crate::wal_storage::Storage;
1031 : use std::{ops::Deref, time::Instant};
1032 :
1033 : // fake storage for tests
1034 : struct InMemoryState {
1035 : persisted_state: SafeKeeperState,
1036 : }
1037 :
1038 : #[async_trait::async_trait]
1039 : impl control_file::Storage for InMemoryState {
1040 3 : async fn persist(&mut self, s: &SafeKeeperState) -> Result<()> {
1041 3 : self.persisted_state = s.clone();
1042 3 : Ok(())
1043 3 : }
1044 :
1045 UBC 0 : fn last_persist_at(&self) -> Instant {
1046 0 : Instant::now()
1047 0 : }
1048 : }
1049 :
1050 : impl Deref for InMemoryState {
1051 : type Target = SafeKeeperState;
1052 :
1053 CBC 56 : fn deref(&self) -> &Self::Target {
1054 56 : &self.persisted_state
1055 56 : }
1056 : }
1057 :
1058 2 : fn test_sk_state() -> SafeKeeperState {
1059 2 : let mut state = SafeKeeperState::empty();
1060 2 : state.server.wal_seg_size = WAL_SEGMENT_SIZE as u32;
1061 2 : state.tenant_id = TenantId::from([1u8; 16]);
1062 2 : state.timeline_id = TimelineId::from([1u8; 16]);
1063 2 : state
1064 2 : }
1065 :
1066 : struct DummyWalStore {
1067 : lsn: Lsn,
1068 : }
1069 :
1070 : #[async_trait::async_trait]
1071 : impl wal_storage::Storage for DummyWalStore {
1072 9 : fn flush_lsn(&self) -> Lsn {
1073 9 : self.lsn
1074 9 : }
1075 :
1076 2 : async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
1077 2 : self.lsn = startpos + buf.len() as u64;
1078 2 : Ok(())
1079 2 : }
1080 :
1081 2 : async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
1082 2 : self.lsn = end_pos;
1083 2 : Ok(())
1084 2 : }
1085 :
1086 4 : async fn flush_wal(&mut self) -> Result<()> {
1087 4 : Ok(())
1088 4 : }
1089 :
1090 UBC 0 : fn remove_up_to(&self, _segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> {
1091 0 : Box::pin(async { Ok(()) })
1092 0 : }
1093 :
1094 0 : fn get_metrics(&self) -> crate::metrics::WalStorageMetrics {
1095 0 : crate::metrics::WalStorageMetrics::default()
1096 0 : }
1097 : }
1098 :
1099 CBC 1 : #[tokio::test]
1100 1 : async fn test_voting() {
1101 1 : let storage = InMemoryState {
1102 1 : persisted_state: test_sk_state(),
1103 1 : };
1104 1 : let wal_store = DummyWalStore { lsn: Lsn(0) };
1105 1 : let mut sk = SafeKeeper::new(storage, wal_store, NodeId(0)).unwrap();
1106 1 :
1107 1 : // check voting for 1 is ok
1108 1 : let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 });
1109 1 : let mut vote_resp = sk.process_msg(&vote_request).await;
1110 1 : match vote_resp.unwrap() {
1111 1 : Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given != 0),
1112 UBC 0 : r => panic!("unexpected response: {:?}", r),
1113 : }
1114 :
1115 : // reboot...
1116 CBC 1 : let state = sk.state.persisted_state.clone();
1117 1 : let storage = InMemoryState {
1118 1 : persisted_state: state,
1119 1 : };
1120 1 :
1121 1 : sk = SafeKeeper::new(storage, sk.wal_store, NodeId(0)).unwrap();
1122 :
1123 : // and ensure voting second time for 1 is not ok
1124 1 : vote_resp = sk.process_msg(&vote_request).await;
1125 1 : match vote_resp.unwrap() {
1126 1 : Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given == 0),
1127 UBC 0 : r => panic!("unexpected response: {:?}", r),
1128 : }
1129 : }
1130 :
1131 CBC 1 : #[tokio::test]
1132 1 : async fn test_epoch_switch() {
1133 1 : let storage = InMemoryState {
1134 1 : persisted_state: test_sk_state(),
1135 1 : };
1136 1 : let wal_store = DummyWalStore { lsn: Lsn(0) };
1137 1 :
1138 1 : let mut sk = SafeKeeper::new(storage, wal_store, NodeId(0)).unwrap();
1139 1 :
1140 1 : let mut ar_hdr = AppendRequestHeader {
1141 1 : term: 1,
1142 1 : epoch_start_lsn: Lsn(3),
1143 1 : begin_lsn: Lsn(1),
1144 1 : end_lsn: Lsn(2),
1145 1 : commit_lsn: Lsn(0),
1146 1 : truncate_lsn: Lsn(0),
1147 1 : proposer_uuid: [0; 16],
1148 1 : };
1149 1 : let mut append_request = AppendRequest {
1150 1 : h: ar_hdr.clone(),
1151 1 : wal_data: Bytes::from_static(b"b"),
1152 1 : };
1153 1 :
1154 1 : let pem = ProposerElected {
1155 1 : term: 1,
1156 1 : start_streaming_at: Lsn(1),
1157 1 : term_history: TermHistory(vec![TermLsn {
1158 1 : term: 1,
1159 1 : lsn: Lsn(3),
1160 1 : }]),
1161 1 : timeline_start_lsn: Lsn(0),
1162 1 : };
1163 1 : sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
1164 UBC 0 : .await
1165 CBC 1 : .unwrap();
1166 :
1167 : // check that AppendRequest before epochStartLsn doesn't switch epoch
1168 1 : let resp = sk
1169 1 : .process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
1170 UBC 0 : .await;
1171 CBC 1 : assert!(resp.is_ok());
1172 1 : assert_eq!(sk.get_epoch(), 0);
1173 :
1174 : // but record at epochStartLsn does the switch
1175 1 : ar_hdr.begin_lsn = Lsn(2);
1176 1 : ar_hdr.end_lsn = Lsn(3);
1177 1 : append_request = AppendRequest {
1178 1 : h: ar_hdr,
1179 1 : wal_data: Bytes::from_static(b"b"),
1180 1 : };
1181 1 : let resp = sk
1182 1 : .process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
1183 UBC 0 : .await;
1184 CBC 1 : assert!(resp.is_ok());
1185 1 : sk.wal_store.truncate_wal(Lsn(3)).await.unwrap(); // imitate the complete record at 3 %)
1186 1 : assert_eq!(sk.get_epoch(), 1);
1187 : }
1188 : }