//! Acceptor part of proposer-acceptor consensus algorithm.

use anyhow::{bail, Context, Result};
use byteorder::{LittleEndian, ReadBytesExt};
use bytes::{Buf, BufMut, Bytes, BytesMut};

use postgres_ffi::{TimeLineID, XLogSegNo, MAX_SEND_SIZE};
use serde::{Deserialize, Serialize};
use std::cmp::max;
use std::cmp::min;
use std::fmt;
use std::io::Read;
use std::time::Duration;
use storage_broker::proto::SafekeeperTimelineInfo;

use tracing::*;

use crate::control_file;
use crate::send_wal::HotStandbyFeedback;

use crate::wal_storage;
use pq_proto::SystemId;
use utils::pageserver_feedback::PageserverFeedback;
use utils::{
    bin_ser::LeSer,
    id::{NodeId, TenantId, TenantTimelineId, TimelineId},
    lsn::Lsn,
};

pub const SK_MAGIC: u32 = 0xcafeceefu32;
pub const SK_FORMAT_VERSION: u32 = 7;
const SK_PROTOCOL_VERSION: u32 = 2;
pub const UNKNOWN_SERVER_VERSION: u32 = 0;

/// Consensus logical timestamp.
pub type Term = u64;
pub const INVALID_TERM: Term = 0;

#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct TermLsn {
    pub term: Term,
    pub lsn: Lsn,
}

// Creation from tuple provides less typing (e.g. for unit tests).
impl From<(Term, Lsn)> for TermLsn {
    fn from(pair: (Term, Lsn)) -> TermLsn {
        TermLsn {
            term: pair.0,
            lsn: pair.1,
        }
    }
}

#[derive(Clone, Serialize, Deserialize)]
pub struct TermHistory(pub Vec<TermLsn>);

impl TermHistory {
    pub fn empty() -> TermHistory {
        TermHistory(Vec::new())
    }

    // Parse TermHistory as n_entries followed by TermLsn pairs
    pub fn from_bytes(bytes: &mut Bytes) -> Result<TermHistory> {
        if bytes.remaining() < 4 {
            bail!("TermHistory is missing length");
        }
        let n_entries = bytes.get_u32_le();
        let mut res = Vec::with_capacity(n_entries as usize);
        for _ in 0..n_entries {
            if bytes.remaining() < 16 {
                bail!("TermHistory is incomplete");
            }
            res.push(TermLsn {
                term: bytes.get_u64_le(),
                lsn: bytes.get_u64_le().into(),
            })
        }
        Ok(TermHistory(res))
    }

    /// Return a copy of self with switches happening strictly after `up_to`
    /// truncated.
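    ///
    /// A minimal sketch of the semantics (hypothetical values):
    /// ```ignore
    /// let h = TermHistory(vec![(1, Lsn(0x10)).into(), (2, Lsn(0x20)).into()]);
    /// assert_eq!(h.up_to(Lsn(0x15)).0.len(), 1); // the switch at 0x20 is dropped
    /// ```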
    pub fn up_to(&self, up_to: Lsn) -> TermHistory {
        let mut res = Vec::with_capacity(self.0.len());
        for e in &self.0 {
            if e.lsn > up_to {
                break;
            }
            res.push(*e);
        }
        TermHistory(res)
    }
}

/// Display only latest entries for Debug.
impl fmt::Debug for TermHistory {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        let n_printed = 20;
        write!(
            fmt,
            "{}{:?}",
            if self.0.len() > n_printed { "... " } else { "" },
            self.0
                .iter()
                .rev()
                .take(n_printed)
                .map(|&e| (e.term, e.lsn)) // omit TermSwitchEntry
                .collect::<Vec<_>>()
        )
    }
}

/// Unique id of proposer. Not needed for correctness, used for monitoring.
pub type PgUuid = [u8; 16];

/// Persistent consensus state of the acceptor.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AcceptorState {
    /// Acceptor's last term it voted for (advanced in phase 1 of the protocol).
    pub term: Term,
    /// History of term switches for safekeeper's WAL.
    /// Actually it often goes *beyond* WAL contents as we adopt term history
    /// from the proposer before recovery.
    pub term_history: TermHistory,
}

impl AcceptorState {
    /// Acceptor's epoch is the term of the highest entry in the log.
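    ///
    /// Worked sketch (made-up values): with term_history [(1, 0x10), (2, 0x20)]
    /// and flush_lsn = 0x18, the switch at 0x20 is beyond local WAL, so the
    /// epoch is 1.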
    pub fn get_epoch(&self, flush_lsn: Lsn) -> Term {
        let th = self.term_history.up_to(flush_lsn);
        match th.0.last() {
            Some(e) => e.term,
            None => 0,
        }
    }
}

/// Information about Postgres. Safekeeper gets it once and then verifies that
/// all further connections from computes match.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ServerInfo {
    /// Postgres server version
    pub pg_version: u32,
    pub system_id: SystemId,
    pub wal_seg_size: u32,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersistedPeerInfo {
    /// LSN up to which safekeeper offloaded WAL to s3.
    backup_lsn: Lsn,
    /// Term of the last entry.
    term: Term,
    /// LSN of the last record.
    flush_lsn: Lsn,
    /// Up to which LSN safekeeper regards its WAL as committed.
    commit_lsn: Lsn,
}

impl PersistedPeerInfo {
    fn new() -> Self {
        Self {
            backup_lsn: Lsn::INVALID,
            term: INVALID_TERM,
            flush_lsn: Lsn(0),
            commit_lsn: Lsn(0),
        }
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersistedPeers(pub Vec<(NodeId, PersistedPeerInfo)>);

/// Persistent information stored on safekeeper node.
/// On-disk data is prefixed by magic and format version, and followed by checksum.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SafeKeeperState {
    #[serde(with = "hex")]
    pub tenant_id: TenantId,
    #[serde(with = "hex")]
    pub timeline_id: TimelineId,
    /// persistent acceptor state
    pub acceptor_state: AcceptorState,
    /// information about server
    pub server: ServerInfo,
    /// Unique id of the last *elected* proposer we dealt with. Not needed
    /// for correctness, exists for monitoring purposes.
    #[serde(with = "hex")]
    pub proposer_uuid: PgUuid,
    /// LSN at which this timeline generally starts. This safekeeper might
    /// have joined later.
    pub timeline_start_lsn: Lsn,
    /// LSN since which this safekeeper has (had) WAL for this timeline.
    /// All WAL segments after the one containing local_start_lsn are
    /// fully filled with data.
    pub local_start_lsn: Lsn,
    /// Part of WAL acknowledged by quorum *and available locally*. Always points
    /// to record boundary.
    pub commit_lsn: Lsn,
    /// LSN that points to the end of the last backed up segment. Useful to
    /// persist to avoid finding out offloading progress on boot.
    pub backup_lsn: Lsn,
    /// Minimal LSN which may be needed for recovery of some safekeeper (end_lsn
    /// of the last record streamed to everyone). Persisting it helps to skip
    /// recovery in walproposer; generally we compute it from peers. In the
    /// walproposer protocol it is called 'truncate_lsn'. Updates are currently
    /// driven only by walproposer.
    pub peer_horizon_lsn: Lsn,
    /// LSN of the oldest known checkpoint made by pageserver and successfully
    /// pushed to s3. We don't remove WAL beyond it. Persisted only for
    /// informational purposes, we receive it from pageserver (or broker).
    pub remote_consistent_lsn: Lsn,
    // Peers and their state as we remember it. Knowing the peers themselves is
    // fundamental; their state is saved here only for informational purposes and
    // obviously can be stale. (Currently it is not saved at all, but let's
    // reserve the place to avoid extra file format version bumps.)
    pub peers: PersistedPeers,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
// In-memory safekeeper state. Fields mirror the ones in `SafeKeeperState`;
// values here are not flushed yet.
pub struct SafekeeperMemState {
    pub commit_lsn: Lsn,
    pub backup_lsn: Lsn,
    pub peer_horizon_lsn: Lsn,
    #[serde(with = "hex")]
    pub proposer_uuid: PgUuid,
}

impl SafeKeeperState {
    pub fn new(
        ttid: &TenantTimelineId,
        server_info: ServerInfo,
        peers: Vec<NodeId>,
        commit_lsn: Lsn,
        local_start_lsn: Lsn,
    ) -> SafeKeeperState {
        SafeKeeperState {
            tenant_id: ttid.tenant_id,
            timeline_id: ttid.timeline_id,
            acceptor_state: AcceptorState {
                term: 0,
                term_history: TermHistory::empty(),
            },
            server: server_info,
            proposer_uuid: [0; 16],
            timeline_start_lsn: Lsn(0),
            local_start_lsn,
            commit_lsn,
            backup_lsn: local_start_lsn,
            peer_horizon_lsn: local_start_lsn,
            remote_consistent_lsn: Lsn(0),
            peers: PersistedPeers(
                peers
                    .iter()
                    .map(|p| (*p, PersistedPeerInfo::new()))
                    .collect(),
            ),
        }
    }

    #[cfg(test)]
    pub fn empty() -> Self {
        SafeKeeperState::new(
            &TenantTimelineId::empty(),
            ServerInfo {
                pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */
                system_id: 0,                       /* Postgres system identifier */
                wal_seg_size: 0,
            },
            vec![],
            Lsn::INVALID,
            Lsn::INVALID,
        )
    }
}

// protocol messages

/// Initial Proposer -> Acceptor message
#[derive(Debug, Deserialize)]
pub struct ProposerGreeting {
    /// proposer-acceptor protocol version
    pub protocol_version: u32,
    /// Postgres server version
    pub pg_version: u32,
    pub proposer_id: PgUuid,
    pub system_id: SystemId,
    pub timeline_id: TimelineId,
    pub tenant_id: TenantId,
    pub tli: TimeLineID,
    pub wal_seg_size: u32,
}

/// Acceptor -> Proposer initial response: the highest term known to me
/// (acceptor voted for).
#[derive(Debug, Serialize)]
pub struct AcceptorGreeting {
    term: u64,
    node_id: NodeId,
}

/// Vote request sent from proposer to safekeepers
#[derive(Debug, Deserialize)]
pub struct VoteRequest {
    term: Term,
}

/// Vote itself, sent from safekeeper to proposer
#[derive(Debug, Serialize)]
pub struct VoteResponse {
    term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
    vote_given: u64, // fixme u64 due to padding
    // Safekeeper flush_lsn (end of WAL) + history of term switches allow
    // proposer to choose the most advanced one.
    flush_lsn: Lsn,
    truncate_lsn: Lsn,
    term_history: TermHistory,
    timeline_start_lsn: Lsn,
}

/*
 * Proposer -> Acceptor message announcing proposer is elected and communicating
 * term history to it.
 */
#[derive(Debug)]
pub struct ProposerElected {
    pub term: Term,
    pub start_streaming_at: Lsn,
    pub term_history: TermHistory,
    pub timeline_start_lsn: Lsn,
}

/// Request with WAL message sent from proposer to safekeeper. Along the way it
/// communicates commit_lsn.
#[derive(Debug)]
pub struct AppendRequest {
    pub h: AppendRequestHeader,
    pub wal_data: Bytes,
}
#[derive(Debug, Clone, Deserialize)]
pub struct AppendRequestHeader {
    // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
    pub term: Term,
    // LSN since which the proposer appends WAL; determines epoch switch point.
    pub epoch_start_lsn: Lsn,
    /// start position of message in WAL
    pub begin_lsn: Lsn,
    /// end position of message in WAL
    pub end_lsn: Lsn,
    /// LSN committed by quorum of safekeepers
    pub commit_lsn: Lsn,
    /// minimal LSN which may be needed by proposer to perform recovery of some safekeeper
    pub truncate_lsn: Lsn,
    // only for logging/debugging
    pub proposer_uuid: PgUuid,
}

/// Report safekeeper state to proposer
#[derive(Debug, Serialize)]
pub struct AppendResponse {
    // Current term of the safekeeper; if it is higher than proposer's, the
    // compute is out of date.
    pub term: Term,
    // NOTE: this is physical end of wal on safekeeper; currently it doesn't
    // make much sense without taking epoch into account, as history can be
    // diverged.
    pub flush_lsn: Lsn,
    // We report back our awareness about which WAL is committed, as this is
    // a criterion for walproposer --sync mode exit
    pub commit_lsn: Lsn,
    pub hs_feedback: HotStandbyFeedback,
    pub pageserver_feedback: PageserverFeedback,
}

impl AppendResponse {
    fn term_only(term: Term) -> AppendResponse {
        AppendResponse {
            term,
            flush_lsn: Lsn(0),
            commit_lsn: Lsn(0),
            hs_feedback: HotStandbyFeedback::empty(),
            pageserver_feedback: PageserverFeedback::empty(),
        }
    }
}

/// Proposer -> Acceptor messages
#[derive(Debug)]
pub enum ProposerAcceptorMessage {
    Greeting(ProposerGreeting),
    VoteRequest(VoteRequest),
    Elected(ProposerElected),
    AppendRequest(AppendRequest),
    NoFlushAppendRequest(AppendRequest),
    FlushWAL,
}

impl ProposerAcceptorMessage {
    /// Parse proposer message.
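    ///
    /// The wire format is a little-endian u64 tag (ASCII 'g', 'v', 'e' or 'a')
    /// followed by the message body. A minimal sketch of building a VoteRequest
    /// message, e.g. for tests:
    /// ```ignore
    /// let mut buf = BytesMut::new();
    /// buf.put_u64_le('v' as u64); // tag
    /// buf.put_u64_le(42);         // VoteRequest::term
    /// let msg = ProposerAcceptorMessage::parse(buf.freeze())?;
    /// ```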
    pub fn parse(msg_bytes: Bytes) -> Result<ProposerAcceptorMessage> {
        // xxx using Reader is inefficient but easy to work with bincode
        let mut stream = msg_bytes.reader();
        // u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is
        let tag = stream.read_u64::<LittleEndian>()? as u8 as char;
        match tag {
            'g' => {
                let msg = ProposerGreeting::des_from(&mut stream)?;
                Ok(ProposerAcceptorMessage::Greeting(msg))
            }
            'v' => {
                let msg = VoteRequest::des_from(&mut stream)?;
                Ok(ProposerAcceptorMessage::VoteRequest(msg))
            }
            'e' => {
                let mut msg_bytes = stream.into_inner();
                if msg_bytes.remaining() < 16 {
                    bail!("ProposerElected message is not complete");
                }
                let term = msg_bytes.get_u64_le();
                let start_streaming_at = msg_bytes.get_u64_le().into();
                let term_history = TermHistory::from_bytes(&mut msg_bytes)?;
                if msg_bytes.remaining() < 8 {
                    bail!("ProposerElected message is not complete");
                }
                let timeline_start_lsn = msg_bytes.get_u64_le().into();
                let msg = ProposerElected {
                    term,
                    start_streaming_at,
                    timeline_start_lsn,
                    term_history,
                };
                Ok(ProposerAcceptorMessage::Elected(msg))
            }
            'a' => {
                // read header followed by wal data
                let hdr = AppendRequestHeader::des_from(&mut stream)?;
                let rec_size = hdr
                    .end_lsn
                    .checked_sub(hdr.begin_lsn)
                    .context("begin_lsn > end_lsn in AppendRequest")?
                    .0 as usize;
                if rec_size > MAX_SEND_SIZE {
                    bail!(
                        "AppendRequest is longer than MAX_SEND_SIZE ({})",
                        MAX_SEND_SIZE
                    );
                }

                let mut wal_data_vec: Vec<u8> = vec![0; rec_size];
                stream.read_exact(&mut wal_data_vec)?;
                let wal_data = Bytes::from(wal_data_vec);
                let msg = AppendRequest { h: hdr, wal_data };

                Ok(ProposerAcceptorMessage::AppendRequest(msg))
            }
            _ => bail!("unknown proposer-acceptor message tag: {}", tag),
        }
    }
}

/// Acceptor -> Proposer messages
#[derive(Debug)]
pub enum AcceptorProposerMessage {
    Greeting(AcceptorGreeting),
    VoteResponse(VoteResponse),
    AppendResponse(AppendResponse),
}

impl AcceptorProposerMessage {
    /// Serialize acceptor -> proposer message.
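    ///
    /// A minimal sketch (the message fields are crate-private, so this is
    /// illustration only):
    /// ```ignore
    /// let mut buf = BytesMut::new();
    /// let msg = AcceptorProposerMessage::Greeting(AcceptorGreeting {
    ///     term: 1,
    ///     node_id: NodeId(1),
    /// });
    /// msg.serialize(&mut buf)?; // 'g' tag + term + node id, 24 bytes total
    /// ```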
    pub fn serialize(&self, buf: &mut BytesMut) -> Result<()> {
        match self {
            AcceptorProposerMessage::Greeting(msg) => {
                buf.put_u64_le('g' as u64);
                buf.put_u64_le(msg.term);
                buf.put_u64_le(msg.node_id.0);
            }
            AcceptorProposerMessage::VoteResponse(msg) => {
                buf.put_u64_le('v' as u64);
                buf.put_u64_le(msg.term);
                buf.put_u64_le(msg.vote_given);
                buf.put_u64_le(msg.flush_lsn.into());
                buf.put_u64_le(msg.truncate_lsn.into());
                buf.put_u32_le(msg.term_history.0.len() as u32);
                for e in &msg.term_history.0 {
                    buf.put_u64_le(e.term);
                    buf.put_u64_le(e.lsn.into());
                }
                buf.put_u64_le(msg.timeline_start_lsn.into());
            }
            AcceptorProposerMessage::AppendResponse(msg) => {
                buf.put_u64_le('a' as u64);
                buf.put_u64_le(msg.term);
                buf.put_u64_le(msg.flush_lsn.into());
                buf.put_u64_le(msg.commit_lsn.into());
                buf.put_i64_le(msg.hs_feedback.ts);
                buf.put_u64_le(msg.hs_feedback.xmin);
                buf.put_u64_le(msg.hs_feedback.catalog_xmin);

                msg.pageserver_feedback.serialize(buf);
            }
        }

        Ok(())
    }
}

/// Safekeeper implements consensus to reliably persist WAL across nodes.
/// It controls all WAL disk writes and updates of control file.
///
/// Currently safekeeper processes:
/// - messages from compute (proposers) and provides replies
/// - messages from broker peers
pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> {
    /// LSN since which the proposer currently talking to this safekeeper
    /// appends WAL; determines epoch switch point.
    pub epoch_start_lsn: Lsn,

    pub inmem: SafekeeperMemState, // in memory part
    pub state: CTRL,               // persistent state storage

    pub wal_store: WAL,

    node_id: NodeId, // safekeeper's node id
}

impl<CTRL, WAL> SafeKeeper<CTRL, WAL>
where
    CTRL: control_file::Storage,
    WAL: wal_storage::Storage,
{
    /// Accepts a control file storage containing the safekeeper state.
    /// State must be initialized, i.e. contain filled `tenant_id`, `timeline_id`
    /// and `server` (`wal_seg_size` inside it) fields.
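    ///
    /// A usage sketch (the test module below provides concrete in-memory
    /// `control_file::Storage` and `wal_storage::Storage` implementations):
    /// ```ignore
    /// let sk = SafeKeeper::new(control_store, wal_store, NodeId(0))?;
    /// ```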
    pub fn new(state: CTRL, wal_store: WAL, node_id: NodeId) -> Result<SafeKeeper<CTRL, WAL>> {
        if state.tenant_id == TenantId::from([0u8; 16])
            || state.timeline_id == TimelineId::from([0u8; 16])
        {
            bail!(
                "Calling SafeKeeper::new with empty tenant_id ({}) or timeline_id ({})",
                state.tenant_id,
                state.timeline_id
            );
        }

        Ok(SafeKeeper {
            epoch_start_lsn: Lsn(0),
            inmem: SafekeeperMemState {
                commit_lsn: state.commit_lsn,
                backup_lsn: state.backup_lsn,
                peer_horizon_lsn: state.peer_horizon_lsn,
                proposer_uuid: state.proposer_uuid,
            },
            state,
            wal_store,
            node_id,
        })
    }

    /// Get history of term switches for the available WAL
    fn get_term_history(&self) -> TermHistory {
        self.state
            .acceptor_state
            .term_history
            .up_to(self.flush_lsn())
    }

    /// Get current term.
    pub fn get_term(&self) -> Term {
        self.state.acceptor_state.term
    }

    pub fn get_epoch(&self) -> Term {
        self.state.acceptor_state.get_epoch(self.flush_lsn())
    }

    /// wal_store wrapper avoiding commit_lsn <= flush_lsn violation when we don't have WAL yet.
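    /// (E.g., right after timeline creation `wal_store.flush_lsn()` may still
    /// be `Lsn(0)` while `timeline_start_lsn` is already set; we report the
    /// latter.)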
    pub fn flush_lsn(&self) -> Lsn {
        max(self.wal_store.flush_lsn(), self.state.timeline_start_lsn)
    }

    /// Process message from proposer and possibly form reply. Concurrent
    /// callers must exclude each other.
    pub async fn process_msg(
        &mut self,
        msg: &ProposerAcceptorMessage,
    ) -> Result<Option<AcceptorProposerMessage>> {
        match msg {
            ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg).await,
            ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg).await,
            ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg).await,
            ProposerAcceptorMessage::AppendRequest(msg) => {
                self.handle_append_request(msg, true).await
            }
            ProposerAcceptorMessage::NoFlushAppendRequest(msg) => {
                self.handle_append_request(msg, false).await
            }
            ProposerAcceptorMessage::FlushWAL => self.handle_flush().await,
        }
    }

    /// Handle initial message from proposer: check its sanity and send my
    /// current term.
    async fn handle_greeting(
        &mut self,
        msg: &ProposerGreeting,
    ) -> Result<Option<AcceptorProposerMessage>> {
        // Check protocol compatibility
        if msg.protocol_version != SK_PROTOCOL_VERSION {
            bail!(
                "incompatible protocol version {}, expected {}",
                msg.protocol_version,
                SK_PROTOCOL_VERSION
            );
        }
        /* Postgres major version mismatch is treated as fatal error
         * because safekeepers parse WAL headers and the format
         * may change between versions.
         */
        if msg.pg_version / 10000 != self.state.server.pg_version / 10000
            && self.state.server.pg_version != UNKNOWN_SERVER_VERSION
        {
            bail!(
                "incompatible server version {}, expected {}",
                msg.pg_version,
                self.state.server.pg_version
            );
        }

        if msg.tenant_id != self.state.tenant_id {
            bail!(
                "invalid tenant ID, got {}, expected {}",
                msg.tenant_id,
                self.state.tenant_id
            );
        }
        if msg.timeline_id != self.state.timeline_id {
            bail!(
                "invalid timeline ID, got {}, expected {}",
                msg.timeline_id,
                self.state.timeline_id
            );
        }
        if self.state.server.wal_seg_size != msg.wal_seg_size {
            bail!(
                "invalid wal_seg_size, got {}, expected {}",
                msg.wal_seg_size,
                self.state.server.wal_seg_size
            );
        }

        // system_id will be updated on mismatch
        // sync-safekeepers doesn't know sysid and sends 0, ignore it
        if self.state.server.system_id != msg.system_id && msg.system_id != 0 {
            if self.state.server.system_id != 0 {
                warn!(
                    "unexpected system ID arrived, got {}, expected {}",
                    msg.system_id, self.state.server.system_id
                );
            }

            let mut state = self.state.clone();
            state.server.system_id = msg.system_id;
            if msg.pg_version != UNKNOWN_SERVER_VERSION {
                state.server.pg_version = msg.pg_version;
            }
            self.state.persist(&state).await?;
        }

        info!(
            "processed greeting from walproposer {}, sending term {:?}",
            msg.proposer_id.map(|b| format!("{:X}", b)).join(""),
            self.state.acceptor_state.term
        );
        Ok(Some(AcceptorProposerMessage::Greeting(AcceptorGreeting {
            term: self.state.acceptor_state.term,
            node_id: self.node_id,
        })))
    }

    /// Give vote for the given term, if we haven't done that previously.
    async fn handle_vote_request(
        &mut self,
        msg: &VoteRequest,
    ) -> Result<Option<AcceptorProposerMessage>> {
        // Once voted, we won't accept data from older proposers; flush
        // everything we've already received so that new proposer starts
        // streaming at end of our WAL, without overlap. Currently we truncate
        // WAL at streaming point, so this avoids truncating already committed
        // WAL.
        //
        // TODO: it would be smoother to not truncate committed piece at
        // handle_elected instead. Currently not a big deal, as proposer is the
        // only source of WAL; with peer2peer recovery it would be more
        // important.
        self.wal_store.flush_wal().await?;
        // initialize with refusal
        let mut resp = VoteResponse {
            term: self.state.acceptor_state.term,
            vote_given: false as u64,
            flush_lsn: self.flush_lsn(),
            truncate_lsn: self.inmem.peer_horizon_lsn,
            term_history: self.get_term_history(),
            timeline_start_lsn: self.state.timeline_start_lsn,
        };
        if self.state.acceptor_state.term < msg.term {
            let mut state = self.state.clone();
            state.acceptor_state.term = msg.term;
            // persist vote before sending it out
            self.state.persist(&state).await?;

            resp.term = self.state.acceptor_state.term;
            resp.vote_given = true as u64;
        }
        info!("processed VoteRequest for term {}: {:?}", msg.term, &resp);
        Ok(Some(AcceptorProposerMessage::VoteResponse(resp)))
    }

    /// Form AppendResponse from current state.
    fn append_response(&self) -> AppendResponse {
        let ar = AppendResponse {
            term: self.state.acceptor_state.term,
            flush_lsn: self.flush_lsn(),
            commit_lsn: self.state.commit_lsn,
            // will be filled by the upper code to avoid bothering safekeeper
            hs_feedback: HotStandbyFeedback::empty(),
            pageserver_feedback: PageserverFeedback::empty(),
        };
        trace!("formed AppendResponse {:?}", ar);
        ar
    }
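
    /// Handle ProposerElected message: if the proposer's term is ahead of
    /// ours, bump ours; truncate (possibly diverged) WAL down to
    /// `start_streaming_at`; adopt the term history sent by the proposer.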
    async fn handle_elected(
        &mut self,
        msg: &ProposerElected,
    ) -> Result<Option<AcceptorProposerMessage>> {
        info!("received ProposerElected {:?}", msg);
        if self.state.acceptor_state.term < msg.term {
            let mut state = self.state.clone();
            state.acceptor_state.term = msg.term;
            self.state.persist(&state).await?;
        }

        // If our term is higher, ignore the message (next feedback will inform the compute)
        if self.state.acceptor_state.term > msg.term {
            return Ok(None);
        }

        // This might happen in a rare race when another (old) connection from
        // the same walproposer writes + flushes WAL after this connection
        // already sent flush_lsn in VoteRequest. It is generally safe to
        // proceed, but to prevent commit_lsn surprisingly going down we should
        // either refuse the session (simpler) or skip the part we already have
        // from the stream (can be implemented).
        if msg.term == self.get_epoch() && self.flush_lsn() > msg.start_streaming_at {
            bail!("refusing ProposerElected which is going to overwrite correct WAL: term={}, flush_lsn={}, start_streaming_at={}; restarting the handshake should help",
                  msg.term, self.flush_lsn(), msg.start_streaming_at)
        }
        // Otherwise this shouldn't happen.
        assert!(
            msg.start_streaming_at >= self.inmem.commit_lsn,
            "attempt to truncate committed data: start_streaming_at={}, commit_lsn={}",
            msg.start_streaming_at,
            self.inmem.commit_lsn
        );

        // TODO: cross check divergence point, check if msg.start_streaming_at corresponds to
        // intersection of our history and history from msg

        // truncate wal, update the LSNs
        self.wal_store.truncate_wal(msg.start_streaming_at).await?;

        // and now adopt term history from proposer
        {
            let mut state = self.state.clone();

            // Here we learn initial LSN for the first time, set fields
            // interested in that.

            if state.timeline_start_lsn == Lsn(0) {
                // Remember point where WAL begins globally.
                state.timeline_start_lsn = msg.timeline_start_lsn;
                info!(
                    "setting timeline_start_lsn to {:?}",
                    state.timeline_start_lsn
                );
            }
            if state.local_start_lsn == Lsn(0) {
                state.local_start_lsn = msg.start_streaming_at;
                info!("setting local_start_lsn to {:?}", state.local_start_lsn);
            }
            // Initializing commit_lsn before acking first flushed record is
            // important to let find_end_of_wal skip the hole in the beginning
            // of the first segment.
            //
            // NB: on new clusters, this happens at the same time as
            // timeline_start_lsn initialization, it is taken outside to provide
            // upgrade.
            self.inmem.commit_lsn = max(self.inmem.commit_lsn, state.timeline_start_lsn);

            // Initializing backup_lsn is useful to avoid making backup think it should upload 0 segment.
            self.inmem.backup_lsn = max(self.inmem.backup_lsn, state.timeline_start_lsn);

            state.acceptor_state.term_history = msg.term_history.clone();
            self.persist_control_file(state).await?;
        }

        info!("start receiving WAL since {:?}", msg.start_streaming_at);

        Ok(None)
    }

    /// Advance commit_lsn taking into account what we have locally.
    ///
    /// Note: it is assumed that 'WAL we have is from the right term' check has
    /// already been done outside.
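    ///
    /// In effect the candidate is clamped, i.e. the result is
    /// `min(max(candidate, inmem.commit_lsn), flush_lsn())`.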
    async fn update_commit_lsn(&mut self, mut candidate: Lsn) -> Result<()> {
        // Both peers and walproposer communicate this value, we might already
        // have a fresher (higher) version.
        candidate = max(candidate, self.inmem.commit_lsn);
        let commit_lsn = min(candidate, self.flush_lsn());
        assert!(
            commit_lsn >= self.inmem.commit_lsn,
            "commit_lsn monotonicity violated: old={} new={}",
            self.inmem.commit_lsn,
            commit_lsn
        );

        self.inmem.commit_lsn = commit_lsn;

        // If new commit_lsn reached epoch switch, force sync of control
        // file: walproposer in sync mode is very interested when this
        // happens. Note: this is for sync-safekeepers mode only, as
        // otherwise commit_lsn might jump over epoch_start_lsn.
        // Also note that commit_lsn can reach epoch_start_lsn earlier
        // than we receive the new epoch_start_lsn, and we still need to sync
        // the control file in this case.
        if commit_lsn == self.epoch_start_lsn && self.state.commit_lsn != commit_lsn {
            self.persist_control_file(self.state.clone()).await?;
        }

        Ok(())
    }

    /// Persist control file to disk, called only after timeline creation (bootstrap).
    pub async fn persist(&mut self) -> Result<()> {
        self.persist_control_file(self.state.clone()).await
    }

    /// Persist in-memory state to the disk, taking other data from state.
    async fn persist_control_file(&mut self, mut state: SafeKeeperState) -> Result<()> {
        state.commit_lsn = self.inmem.commit_lsn;
        state.backup_lsn = self.inmem.backup_lsn;
        state.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
        state.proposer_uuid = self.inmem.proposer_uuid;
        self.state.persist(&state).await
    }

    /// Persist control file if there is something to save and enough time
    /// passed after the last save.
    pub async fn maybe_persist_control_file(
        &mut self,
        inmem_remote_consistent_lsn: Lsn,
    ) -> Result<()> {
        const CF_SAVE_INTERVAL: Duration = Duration::from_secs(300);
        if self.state.last_persist_at().elapsed() < CF_SAVE_INTERVAL {
            return Ok(());
        }
        let need_persist = self.inmem.commit_lsn > self.state.commit_lsn
            || self.inmem.backup_lsn > self.state.backup_lsn
            || self.inmem.peer_horizon_lsn > self.state.peer_horizon_lsn
            || inmem_remote_consistent_lsn > self.state.remote_consistent_lsn;
        if need_persist {
            let mut state = self.state.clone();
            state.remote_consistent_lsn = inmem_remote_consistent_lsn;
            self.persist_control_file(state).await?;
            trace!("saved control file: {CF_SAVE_INTERVAL:?} passed");
        }
        Ok(())
    }

    /// Handle request to append WAL.
    #[allow(clippy::comparison_chain)]
    async fn handle_append_request(
        &mut self,
        msg: &AppendRequest,
        require_flush: bool,
    ) -> Result<Option<AcceptorProposerMessage>> {
        if self.state.acceptor_state.term < msg.h.term {
            bail!("got AppendRequest before ProposerElected");
        }

        // If our term is higher, immediately refuse the message.
        if self.state.acceptor_state.term > msg.h.term {
            let resp = AppendResponse::term_only(self.state.acceptor_state.term);
            return Ok(Some(AcceptorProposerMessage::AppendResponse(resp)));
        }

        // Now we know that we are in the same term as the proposer,
        // processing the message.

        self.epoch_start_lsn = msg.h.epoch_start_lsn;
        self.inmem.proposer_uuid = msg.h.proposer_uuid;

        // do the job
        if !msg.wal_data.is_empty() {
            self.wal_store
                .write_wal(msg.h.begin_lsn, &msg.wal_data)
                .await?;
        }

        // flush wal to the disk, if required
        if require_flush {
            self.wal_store.flush_wal().await?;
        }

        // Update commit_lsn.
        if msg.h.commit_lsn != Lsn(0) {
            self.update_commit_lsn(msg.h.commit_lsn).await?;
        }
        // The value calculated by walproposer can always lag:
        // - safekeepers can forget the in-memory value and report the lower
        //   persisted one to the proposer after a restart;
        // - if we made safekeepers always send the persistent value,
        //   any compute restart would pull it down.
        // Thus, take the max before adopting.
        self.inmem.peer_horizon_lsn = max(self.inmem.peer_horizon_lsn, msg.h.truncate_lsn);

        // Update truncate and commit LSN in control file.
        // To avoid negative impact on performance of extra fsync, do it only
        // when truncate_lsn delta exceeds WAL segment size.
        if self.state.peer_horizon_lsn + (self.state.server.wal_seg_size as u64)
            < self.inmem.peer_horizon_lsn
        {
            self.persist_control_file(self.state.clone()).await?;
        }

        trace!(
            "processed AppendRequest of len {}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, flushed={:?}",
            msg.wal_data.len(),
            msg.h.end_lsn,
            msg.h.commit_lsn,
            msg.h.truncate_lsn,
            require_flush,
        );

        // If flush_lsn hasn't been updated, AppendResponse is not very useful.
        if !require_flush {
            return Ok(None);
        }

        let resp = self.append_response();
        Ok(Some(AcceptorProposerMessage::AppendResponse(resp)))
    }

    /// Flush WAL to disk. Return AppendResponse with latest LSNs.
    async fn handle_flush(&mut self) -> Result<Option<AcceptorProposerMessage>> {
        self.wal_store.flush_wal().await?;
        Ok(Some(AcceptorProposerMessage::AppendResponse(
            self.append_response(),
        )))
    }

    /// Update timeline state with peer safekeeper data.
    pub async fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> {
        let mut sync_control_file = false;

        if (Lsn(sk_info.commit_lsn) != Lsn::INVALID) && (sk_info.last_log_term != INVALID_TERM) {
            // Note: the check is too restrictive, generally we can update local
            // commit_lsn if our history matches (is part of) history of advanced
            // commit_lsn provider.
            if sk_info.last_log_term == self.get_epoch() {
                self.update_commit_lsn(Lsn(sk_info.commit_lsn)).await?;
            }
        }

        let new_backup_lsn = max(Lsn(sk_info.backup_lsn), self.inmem.backup_lsn);
        sync_control_file |=
            self.state.backup_lsn + (self.state.server.wal_seg_size as u64) < new_backup_lsn;
        self.inmem.backup_lsn = new_backup_lsn;

        // The value in sk_info should be maximized over our local in-memory value.
        let new_remote_consistent_lsn = Lsn(sk_info.remote_consistent_lsn);
        assert!(self.state.remote_consistent_lsn <= new_remote_consistent_lsn);
        sync_control_file |= self.state.remote_consistent_lsn
            + (self.state.server.wal_seg_size as u64)
            < new_remote_consistent_lsn;

        let new_peer_horizon_lsn = max(Lsn(sk_info.peer_horizon_lsn), self.inmem.peer_horizon_lsn);
        sync_control_file |= self.state.peer_horizon_lsn + (self.state.server.wal_seg_size as u64)
            < new_peer_horizon_lsn;
        self.inmem.peer_horizon_lsn = new_peer_horizon_lsn;

        if sync_control_file {
            let mut state = self.state.clone();
            // Note: we could make remote_consistent_lsn update in cf common by
            // storing Arc to walsenders in Safekeeper.
            state.remote_consistent_lsn = new_remote_consistent_lsn;
            self.persist_control_file(state).await?;
        }
        Ok(())
    }

    /// Get the oldest segno we still need to keep. We hold WAL till it is
    /// consumed by all of 1) pageserver (remote_consistent_lsn), 2) peers,
    /// 3) s3 offloading.
    /// While it is safe to use in-memory values to determine the horizon,
    /// we use persistent ones to make normal states less surprising.
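    ///
    /// A sketch of the arithmetic, assuming 16 MiB segments: a horizon LSN of
    /// 0x2000000 (= 2 * 16 MiB) maps to segment number 2, so segments 0 and 1
    /// may be removed.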
    pub fn get_horizon_segno(&self, wal_backup_enabled: bool) -> XLogSegNo {
        let mut horizon_lsn = min(
            self.state.remote_consistent_lsn,
            self.state.peer_horizon_lsn,
        );
        if wal_backup_enabled {
            horizon_lsn = min(horizon_lsn, self.state.backup_lsn);
        }
        horizon_lsn.segment_number(self.state.server.wal_seg_size as usize)
    }
}

#[cfg(test)]
mod tests {
    use futures::future::BoxFuture;
    use postgres_ffi::WAL_SEGMENT_SIZE;

    use super::*;
    use crate::wal_storage::Storage;
    use std::{ops::Deref, time::Instant};

    // fake storage for tests
    struct InMemoryState {
        persisted_state: SafeKeeperState,
    }

    #[async_trait::async_trait]
    impl control_file::Storage for InMemoryState {
        async fn persist(&mut self, s: &SafeKeeperState) -> Result<()> {
            self.persisted_state = s.clone();
            Ok(())
        }

        fn last_persist_at(&self) -> Instant {
            Instant::now()
        }
    }

    impl Deref for InMemoryState {
        type Target = SafeKeeperState;

        fn deref(&self) -> &Self::Target {
            &self.persisted_state
        }
    }

    fn test_sk_state() -> SafeKeeperState {
        let mut state = SafeKeeperState::empty();
        state.server.wal_seg_size = WAL_SEGMENT_SIZE as u32;
        state.tenant_id = TenantId::from([1u8; 16]);
        state.timeline_id = TimelineId::from([1u8; 16]);
        state
    }

    struct DummyWalStore {
        lsn: Lsn,
    }

    #[async_trait::async_trait]
    impl wal_storage::Storage for DummyWalStore {
        fn flush_lsn(&self) -> Lsn {
            self.lsn
        }

        async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
            self.lsn = startpos + buf.len() as u64;
            Ok(())
        }

        async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
            self.lsn = end_pos;
            Ok(())
        }

        async fn flush_wal(&mut self) -> Result<()> {
            Ok(())
        }

        fn remove_up_to(&self, _segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> {
            Box::pin(async { Ok(()) })
        }

        fn get_metrics(&self) -> crate::metrics::WalStorageMetrics {
            crate::metrics::WalStorageMetrics::default()
        }
    }

    #[tokio::test]
    async fn test_voting() {
        let storage = InMemoryState {
            persisted_state: test_sk_state(),
        };
        let wal_store = DummyWalStore { lsn: Lsn(0) };
        let mut sk = SafeKeeper::new(storage, wal_store, NodeId(0)).unwrap();

        // check voting for 1 is ok
        let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 });
        let mut vote_resp = sk.process_msg(&vote_request).await;
        match vote_resp.unwrap() {
            Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given != 0),
            r => panic!("unexpected response: {:?}", r),
        }

        // reboot...
        let state = sk.state.persisted_state.clone();
        let storage = InMemoryState {
            persisted_state: state,
        };

        sk = SafeKeeper::new(storage, sk.wal_store, NodeId(0)).unwrap();

        // and ensure voting second time for 1 is not ok
        vote_resp = sk.process_msg(&vote_request).await;
        match vote_resp.unwrap() {
            Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given == 0),
            r => panic!("unexpected response: {:?}", r),
        }
    }
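
    // A small additional sanity check for TermHistory::up_to; a sketch with
    // arbitrary LSN values, mirroring the semantics documented on the method.
    #[test]
    fn test_term_history_up_to() {
        let h = TermHistory(vec![(1, Lsn(0x10)).into(), (2, Lsn(0x20)).into()]);
        // Switches strictly after up_to are truncated...
        assert_eq!(h.up_to(Lsn(0x15)).0.len(), 1);
        // ...while switches at or before up_to are kept.
        assert_eq!(h.up_to(Lsn(0x20)).0.len(), 2);
    }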

    #[tokio::test]
    async fn test_epoch_switch() {
        let storage = InMemoryState {
            persisted_state: test_sk_state(),
        };
        let wal_store = DummyWalStore { lsn: Lsn(0) };

        let mut sk = SafeKeeper::new(storage, wal_store, NodeId(0)).unwrap();

        let mut ar_hdr = AppendRequestHeader {
            term: 1,
            epoch_start_lsn: Lsn(3),
            begin_lsn: Lsn(1),
            end_lsn: Lsn(2),
            commit_lsn: Lsn(0),
            truncate_lsn: Lsn(0),
            proposer_uuid: [0; 16],
        };
        let mut append_request = AppendRequest {
            h: ar_hdr.clone(),
            wal_data: Bytes::from_static(b"b"),
        };

        let pem = ProposerElected {
            term: 1,
            start_streaming_at: Lsn(1),
            term_history: TermHistory(vec![TermLsn {
                term: 1,
                lsn: Lsn(3),
            }]),
            timeline_start_lsn: Lsn(0),
        };
        sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
            .await
            .unwrap();

        // check that AppendRequest before epochStartLsn doesn't switch epoch
        let resp = sk
            .process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
            .await;
        assert!(resp.is_ok());
        assert_eq!(sk.get_epoch(), 0);

        // but record at epochStartLsn does the switch
        ar_hdr.begin_lsn = Lsn(2);
        ar_hdr.end_lsn = Lsn(3);
        append_request = AppendRequest {
            h: ar_hdr,
            wal_data: Bytes::from_static(b"b"),
        };
        let resp = sk
            .process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
            .await;
        assert!(resp.is_ok());
        sk.wal_store.truncate_wal(Lsn(3)).await.unwrap(); // imitate the complete record at 3 %)
        assert_eq!(sk.get_epoch(), 1);
    }
}