Line data Source code
1 : //! Acceptor part of proposer-acceptor consensus algorithm.
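//!
//! A rough sketch of the message flow, as handled by `process_msg` below
//! (derived from this file; see the individual handlers for details):
//!
//! 1. The proposer sends `ProposerGreeting`; the safekeeper sanity-checks it
//!    and replies with `AcceptorGreeting` carrying its current term.
//! 2. The proposer sends `VoteRequest`; the safekeeper persists the new term
//!    (if higher) and answers with `VoteResponse` containing its vote,
//!    flush_lsn and term history.
//! 3. The elected proposer sends `ProposerElected`; the safekeeper truncates
//!    not-yet-committed WAL and adopts the proposer's term history.
//! 4. The proposer streams `AppendRequest`s; `NoFlushAppendRequest` and
//!    `FlushWAL` let the caller batch fsyncs, and a flush yields an
//!    `AppendResponse` reporting flush_lsn and commit_lsn.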
2 :
3 : use anyhow::{bail, Context, Result};
4 : use byteorder::{LittleEndian, ReadBytesExt};
5 : use bytes::{Buf, BufMut, Bytes, BytesMut};
6 :
7 : use postgres_ffi::{TimeLineID, XLogSegNo, MAX_SEND_SIZE};
8 : use serde::{Deserialize, Serialize};
9 : use std::cmp::max;
10 : use std::cmp::min;
11 : use std::fmt;
12 : use std::io::Read;
13 : use std::time::Duration;
14 : use storage_broker::proto::SafekeeperTimelineInfo;
15 :
16 : use tracing::*;
17 :
18 : use crate::control_file;
19 : use crate::send_wal::HotStandbyFeedback;
20 :
21 : use crate::state::TimelineState;
22 : use crate::wal_storage;
23 : use pq_proto::SystemId;
24 : use utils::pageserver_feedback::PageserverFeedback;
25 : use utils::{
26 : bin_ser::LeSer,
27 : id::{NodeId, TenantId, TimelineId},
28 : lsn::Lsn,
29 : };
30 :
31 : const SK_PROTOCOL_VERSION: u32 = 2;
32 : pub const UNKNOWN_SERVER_VERSION: u32 = 0;
33 :
34 : /// Consensus logical timestamp.
35 : pub type Term = u64;
36 : pub const INVALID_TERM: Term = 0;
37 :
38 8588 : #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
39 : pub struct TermLsn {
40 : pub term: Term,
41 : pub lsn: Lsn,
42 : }
43 :
44 : // Creating from a tuple requires less typing (e.g. in unit tests).
45 : impl From<(Term, Lsn)> for TermLsn {
46 3923883 : fn from(pair: (Term, Lsn)) -> TermLsn {
47 3923883 : TermLsn {
48 3923883 : term: pair.0,
49 3923883 : lsn: pair.1,
50 3923883 : }
51 3923883 : }
52 : }
53 :
54 239504 : #[derive(Clone, Serialize, Deserialize, PartialEq)]
55 : pub struct TermHistory(pub Vec<TermLsn>);
56 :
57 : impl TermHistory {
58 11918 : pub fn empty() -> TermHistory {
59 11918 : TermHistory(Vec::new())
60 11918 : }
61 :
62 : // Parse TermHistory as n_entries followed by TermLsn pairs
63 7243 : pub fn from_bytes(bytes: &mut Bytes) -> Result<TermHistory> {
64 7243 : if bytes.remaining() < 4 {
65 0 : bail!("TermHistory misses len");
66 7243 : }
67 7243 : let n_entries = bytes.get_u32_le();
68 7243 : let mut res = Vec::with_capacity(n_entries as usize);
69 7243 : for _ in 0..n_entries {
70 59647 : if bytes.remaining() < 16 {
71 0 : bail!("TermHistory is incomplete");
72 59647 : }
73 59647 : res.push(TermLsn {
74 59647 : term: bytes.get_u64_le(),
75 59647 : lsn: bytes.get_u64_le().into(),
76 59647 : })
77 : }
78 7243 : Ok(TermHistory(res))
79 7243 : }
80 :
81 : /// Return copy of self with switches happening strictly after up_to
82 : /// truncated.
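///
/// An illustrative sketch with made-up values (marked `ignore` so rustdoc
/// does not compile it):
/// ```ignore
/// let th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
/// // Entries starting at or before LSN 15 are kept; the term-2 switch at 20 is dropped.
/// assert_eq!(th.up_to(Lsn(15)), TermHistory(vec![(1, Lsn(10)).into()]));
/// ```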
83 52529 : pub fn up_to(&self, up_to: Lsn) -> TermHistory {
84 52529 : let mut res = Vec::with_capacity(self.0.len());
85 228327 : for e in &self.0 {
86 175869 : if e.lsn > up_to {
87 71 : break;
88 175798 : }
89 175798 : res.push(*e);
90 : }
91 52529 : TermHistory(res)
92 52529 : }
93 :
94 : /// Find point of divergence between leader (walproposer) term history and
95 : /// safekeeper. Arguments are not symmetric, as the proposer history ends at
96 : /// +infinity while the safekeeper's ends at flush_lsn.
97 : /// The C version is in walproposer's SendProposerElected.
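///
/// For example (cf. the unit tests at the bottom of this file): with proposer
/// history [(1, 10), (2, 20), (4, 40)] and safekeeper history
/// [(1, 10), (2, 20), (3, 30)], the last common term is 2; the proposer ends
/// it at 40, the safekeeper at 30, so the highest common point is (2, 30).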
98 9 : pub fn find_highest_common_point(
99 9 : prop_th: &TermHistory,
100 9 : sk_th: &TermHistory,
101 9 : sk_wal_end: Lsn,
102 9 : ) -> Option<TermLsn> {
103 9 : let (prop_th, sk_th) = (&prop_th.0, &sk_th.0); // avoid .0 below
104 :
105 9 : if let Some(sk_th_last) = sk_th.last() {
106 9 : assert!(
107 9 : sk_th_last.lsn <= sk_wal_end,
108 0 : "safekeeper term history end {:?} LSN is higher than WAL end {:?}",
109 : sk_th_last,
110 : sk_wal_end
111 : );
112 0 : }
113 :
114 : // find last common term, if any...
115 9 : let mut last_common_idx = None;
116 17 : for i in 0..min(sk_th.len(), prop_th.len()) {
117 17 : if prop_th[i].term != sk_th[i].term {
118 4 : break;
119 13 : }
120 13 : // If term is the same, LSN must be equal as well.
121 13 : assert!(
122 13 : prop_th[i].lsn == sk_th[i].lsn,
123 0 : "same term {} has different start LSNs: prop {}, sk {}",
124 0 : prop_th[i].term,
125 0 : prop_th[i].lsn,
126 0 : sk_th[i].lsn
127 : );
128 13 : last_common_idx = Some(i);
129 : }
130 9 : let last_common_idx = match last_common_idx {
131 2 : None => return None, // no common point
132 7 : Some(lci) => lci,
133 7 : };
134 7 : // Now find where the last common term ends on both prop and sk sides and
135 7 : // take the min. The end of a (common) term is the start of the next one,
136 7 : // unless it is the last term in the history; then it is flush_lsn on the
137 7 : // safekeeper side and +infinity on the proposer side, so we just take flush_lsn.
138 7 : if last_common_idx == prop_th.len() - 1 {
139 2 : Some(TermLsn {
140 2 : term: prop_th[last_common_idx].term,
141 2 : lsn: sk_wal_end,
142 2 : })
143 : } else {
144 5 : let prop_common_term_end = prop_th[last_common_idx + 1].lsn;
145 5 : let sk_common_term_end = if last_common_idx + 1 < sk_th.len() {
146 2 : sk_th[last_common_idx + 1].lsn
147 : } else {
148 3 : sk_wal_end
149 : };
150 5 : Some(TermLsn {
151 5 : term: prop_th[last_common_idx].term,
152 5 : lsn: min(prop_common_term_end, sk_common_term_end),
153 5 : })
154 : }
155 9 : }
156 : }
157 :
158 : /// Display only latest entries for Debug.
159 : impl fmt::Debug for TermHistory {
160 3109 : fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
161 3109 : let n_printed = 20;
162 3109 : write!(
163 3109 : fmt,
164 3109 : "{}{:?}",
165 3109 : if self.0.len() > n_printed { "... " } else { "" },
166 3109 : self.0
167 3109 : .iter()
168 3109 : .rev()
169 3109 : .take(n_printed)
170 8776 : .map(|&e| (e.term, e.lsn)) // omit TermSwitchEntry
171 3109 : .collect::<Vec<_>>()
172 3109 : )
173 3109 : }
174 : }
175 :
176 : /// Unique id of proposer. Not needed for correctness, used for monitoring.
177 : pub type PgUuid = [u8; 16];
178 :
179 : /// Persistent consensus state of the acceptor.
180 232255 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
181 : pub struct AcceptorState {
182 : /// acceptor's last term it voted for (advanced in 1 phase)
183 : pub term: Term,
184 : /// History of term switches for safekeeper's WAL.
185 : /// Actually it often goes *beyond* WAL contents as we adopt term history
186 : /// from the proposer before recovery.
187 : pub term_history: TermHistory,
188 : }
189 :
190 : impl AcceptorState {
191 : /// acceptor's epoch is the term of the highest entry in the log
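/// (or 0 if the history truncated to `flush_lsn` is empty).
///
/// An illustrative sketch with made-up values (marked `ignore`):
/// ```ignore
/// let acc = AcceptorState {
///     term: 3,
///     term_history: TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]),
/// };
/// assert_eq!(acc.get_epoch(Lsn(15)), 1); // only the term-1 entry starts at or before 15
/// assert_eq!(acc.get_epoch(Lsn(25)), 2); // term 2 has started by LSN 25
/// ```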
192 26857 : pub fn get_epoch(&self, flush_lsn: Lsn) -> Term {
193 26857 : let th = self.term_history.up_to(flush_lsn);
194 26857 : match th.0.last() {
195 25346 : Some(e) => e.term,
196 1511 : None => 0,
197 : }
198 26857 : }
199 : }
200 :
201 : /// Information about Postgres. Safekeeper gets it once and then verifies
202 : /// all further connections from computes match.
203 232303 : #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
204 : pub struct ServerInfo {
205 : /// Postgres server version
206 : pub pg_version: u32,
207 : pub system_id: SystemId,
208 : pub wal_seg_size: u32,
209 : }
210 :
211 8 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
212 : pub struct PersistedPeerInfo {
213 : /// LSN up to which safekeeper offloaded WAL to s3.
214 : pub backup_lsn: Lsn,
215 : /// Term of the last entry.
216 : pub term: Term,
217 : /// LSN of the last record.
218 : pub flush_lsn: Lsn,
219 : /// Up to which LSN safekeeper regards its WAL as committed.
220 : pub commit_lsn: Lsn,
221 : }
222 :
223 : impl PersistedPeerInfo {
224 0 : pub fn new() -> Self {
225 0 : Self {
226 0 : backup_lsn: Lsn::INVALID,
227 0 : term: INVALID_TERM,
228 0 : flush_lsn: Lsn(0),
229 0 : commit_lsn: Lsn(0),
230 0 : }
231 0 : }
232 : }
233 :
234 : // make clippy happy
235 : impl Default for PersistedPeerInfo {
236 0 : fn default() -> Self {
237 0 : Self::new()
238 0 : }
239 : }
240 :
241 : // protocol messages
242 :
243 : /// Initial Proposer -> Acceptor message
244 160060 : #[derive(Debug, Deserialize)]
245 : pub struct ProposerGreeting {
246 : /// proposer-acceptor protocol version
247 : pub protocol_version: u32,
248 : /// Postgres server version
249 : pub pg_version: u32,
250 : pub proposer_id: PgUuid,
251 : pub system_id: SystemId,
252 : pub timeline_id: TimelineId,
253 : pub tenant_id: TenantId,
254 : pub tli: TimeLineID,
255 : pub wal_seg_size: u32,
256 : }
257 :
258 : /// Acceptor -> Proposer initial response: the highest term known to me
259 : /// (i.e. the highest term the acceptor has voted for).
260 0 : #[derive(Debug, Serialize)]
261 : pub struct AcceptorGreeting {
262 : term: u64,
263 : node_id: NodeId,
264 : }
265 :
266 : /// Vote request sent from proposer to safekeepers
267 25664 : #[derive(Debug, Deserialize)]
268 : pub struct VoteRequest {
269 : pub term: Term,
270 : }
271 :
272 : /// Vote itself, sent from safekeeper to proposer
273 2078 : #[derive(Debug, Serialize)]
274 : pub struct VoteResponse {
275 : pub term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
276 : vote_given: u64, // fixme u64 due to padding
277 : // Safekeeper flush_lsn (end of WAL) + history of term switches allow
278 : // proposer to choose the most advanced one.
279 : pub flush_lsn: Lsn,
280 : truncate_lsn: Lsn,
281 : pub term_history: TermHistory,
282 : timeline_start_lsn: Lsn,
283 : }
284 :
285 : /*
286 : * Proposer -> Acceptor message announcing proposer is elected and communicating
287 : * term history to it.
288 : */
289 1029 : #[derive(Debug)]
290 : pub struct ProposerElected {
291 : pub term: Term,
292 : pub start_streaming_at: Lsn,
293 : pub term_history: TermHistory,
294 : pub timeline_start_lsn: Lsn,
295 : }
296 :
297 : /// Request with WAL message sent from proposer to safekeeper. Along the way it
298 : /// communicates commit_lsn.
299 272 : #[derive(Debug)]
300 : pub struct AppendRequest {
301 : pub h: AppendRequestHeader,
302 : pub wal_data: Bytes,
303 : }
304 2418996 : #[derive(Debug, Clone, Deserialize)]
305 : pub struct AppendRequestHeader {
306 : // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
307 : pub term: Term,
308 : // TODO: remove this field, it is unused -- LSN of term switch can be taken
309 : // from ProposerElected (as well as from term history).
310 : pub epoch_start_lsn: Lsn,
311 : /// start position of message in WAL
312 : pub begin_lsn: Lsn,
313 : /// end position of message in WAL
314 : pub end_lsn: Lsn,
315 : /// LSN committed by quorum of safekeepers
316 : pub commit_lsn: Lsn,
317 : /// minimal LSN which may be needed by proposer to perform recovery of some safekeeper
318 : pub truncate_lsn: Lsn,
319 : // only for logging/debugging
320 : pub proposer_uuid: PgUuid,
321 : }
322 :
323 : /// Report safekeeper state to proposer
324 3 : #[derive(Debug, Serialize)]
325 : pub struct AppendResponse {
326 : // Current term of the safekeeper; if it is higher than proposer's, the
327 : // compute is out of date.
328 : pub term: Term,
329 : // NOTE: this is the physical end of WAL on the safekeeper; currently it doesn't
330 : // make much sense without taking the epoch into account, as the history may
331 : // have diverged.
332 : pub flush_lsn: Lsn,
333 : // We report back our awareness about which WAL is committed, as this is
334 : // a criterion for walproposer --sync mode exit
335 : pub commit_lsn: Lsn,
336 : pub hs_feedback: HotStandbyFeedback,
337 : pub pageserver_feedback: PageserverFeedback,
338 : }
339 :
340 : impl AppendResponse {
341 3 : fn term_only(term: Term) -> AppendResponse {
342 3 : AppendResponse {
343 3 : term,
344 3 : flush_lsn: Lsn(0),
345 3 : commit_lsn: Lsn(0),
346 3 : hs_feedback: HotStandbyFeedback::empty(),
347 3 : pageserver_feedback: PageserverFeedback::empty(),
348 3 : }
349 3 : }
350 : }
351 :
352 : /// Proposer -> Acceptor messages
353 892 : #[derive(Debug)]
354 : pub enum ProposerAcceptorMessage {
355 : Greeting(ProposerGreeting),
356 : VoteRequest(VoteRequest),
357 : Elected(ProposerElected),
358 : AppendRequest(AppendRequest),
359 : NoFlushAppendRequest(AppendRequest),
360 : FlushWAL,
361 : }
362 :
363 : impl ProposerAcceptorMessage {
364 : /// Parse proposer message.
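///
/// Wire format, as consumed below: a little-endian u64 whose low byte is an
/// ASCII tag -- 'g' (greeting), 'v' (vote request), 'e' (elected) or
/// 'a' (append) -- followed by the tag-specific body.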
365 2611963 : pub fn parse(msg_bytes: Bytes) -> Result<ProposerAcceptorMessage> {
366 2611963 : // xxx using Reader is inefficient but easy to work with bincode
367 2611963 : let mut stream = msg_bytes.reader();
368 : // u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is
369 2611963 : let tag = stream.read_u64::<LittleEndian>()? as u8 as char;
370 2611963 : match tag {
371 : 'g' => {
372 160060 : let msg = ProposerGreeting::des_from(&mut stream)?;
373 160060 : Ok(ProposerAcceptorMessage::Greeting(msg))
374 : }
375 : 'v' => {
376 25664 : let msg = VoteRequest::des_from(&mut stream)?;
377 25664 : Ok(ProposerAcceptorMessage::VoteRequest(msg))
378 : }
379 : 'e' => {
380 7243 : let mut msg_bytes = stream.into_inner();
381 7243 : if msg_bytes.remaining() < 16 {
382 0 : bail!("ProposerElected message is not complete");
383 7243 : }
384 7243 : let term = msg_bytes.get_u64_le();
385 7243 : let start_streaming_at = msg_bytes.get_u64_le().into();
386 7243 : let term_history = TermHistory::from_bytes(&mut msg_bytes)?;
387 7243 : if msg_bytes.remaining() < 8 {
388 0 : bail!("ProposerElected message is not complete");
389 7243 : }
390 7243 : let timeline_start_lsn = msg_bytes.get_u64_le().into();
391 7243 : let msg = ProposerElected {
392 7243 : term,
393 7243 : start_streaming_at,
394 7243 : timeline_start_lsn,
395 7243 : term_history,
396 7243 : };
397 7243 : Ok(ProposerAcceptorMessage::Elected(msg))
398 : }
399 : 'a' => {
400 : // read header followed by wal data
401 2418996 : let hdr = AppendRequestHeader::des_from(&mut stream)?;
402 2418996 : let rec_size = hdr
403 2418996 : .end_lsn
404 2418996 : .checked_sub(hdr.begin_lsn)
405 2418996 : .context("begin_lsn > end_lsn in AppendRequest")?
406 : .0 as usize;
407 2418996 : if rec_size > MAX_SEND_SIZE {
408 0 : bail!(
409 0 : "AppendRequest is longer than MAX_SEND_SIZE ({})",
410 0 : MAX_SEND_SIZE
411 0 : );
412 2418996 : }
413 2418996 :
414 2418996 : let mut wal_data_vec: Vec<u8> = vec![0; rec_size];
415 2418996 : stream.read_exact(&mut wal_data_vec)?;
416 2418996 : let wal_data = Bytes::from(wal_data_vec);
417 2418996 : let msg = AppendRequest { h: hdr, wal_data };
418 2418996 :
419 2418996 : Ok(ProposerAcceptorMessage::AppendRequest(msg))
420 : }
421 0 : _ => bail!("unknown proposer-acceptor message tag: {}", tag),
422 : }
423 2611963 : }
424 : }
425 :
426 : /// Acceptor -> Proposer messages
427 0 : #[derive(Debug)]
428 : pub enum AcceptorProposerMessage {
429 : Greeting(AcceptorGreeting),
430 : VoteResponse(VoteResponse),
431 : AppendResponse(AppendResponse),
432 : }
433 :
434 : impl AcceptorProposerMessage {
435 : /// Serialize acceptor -> proposer message.
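///
/// The format mirrors `ProposerAcceptorMessage::parse`: a little-endian u64
/// tag ('g', 'v' or 'a') followed by the fixed-layout fields written below.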
436 1724391 : pub fn serialize(&self, buf: &mut BytesMut) -> Result<()> {
437 1724391 : match self {
438 160060 : AcceptorProposerMessage::Greeting(msg) => {
439 160060 : buf.put_u64_le('g' as u64);
440 160060 : buf.put_u64_le(msg.term);
441 160060 : buf.put_u64_le(msg.node_id.0);
442 160060 : }
443 25662 : AcceptorProposerMessage::VoteResponse(msg) => {
444 25662 : buf.put_u64_le('v' as u64);
445 25662 : buf.put_u64_le(msg.term);
446 25662 : buf.put_u64_le(msg.vote_given);
447 25662 : buf.put_u64_le(msg.flush_lsn.into());
448 25662 : buf.put_u64_le(msg.truncate_lsn.into());
449 25662 : buf.put_u32_le(msg.term_history.0.len() as u32);
450 120433 : for e in &msg.term_history.0 {
451 94771 : buf.put_u64_le(e.term);
452 94771 : buf.put_u64_le(e.lsn.into());
453 94771 : }
454 25662 : buf.put_u64_le(msg.timeline_start_lsn.into());
455 : }
456 1538669 : AcceptorProposerMessage::AppendResponse(msg) => {
457 1538669 : buf.put_u64_le('a' as u64);
458 1538669 : buf.put_u64_le(msg.term);
459 1538669 : buf.put_u64_le(msg.flush_lsn.into());
460 1538669 : buf.put_u64_le(msg.commit_lsn.into());
461 1538669 : buf.put_i64_le(msg.hs_feedback.ts);
462 1538669 : buf.put_u64_le(msg.hs_feedback.xmin);
463 1538669 : buf.put_u64_le(msg.hs_feedback.catalog_xmin);
464 1538669 :
465 1538669 : msg.pageserver_feedback.serialize(buf);
466 1538669 : }
467 : }
468 :
469 1724391 : Ok(())
470 1724391 : }
471 : }
472 :
473 : /// Safekeeper implements consensus to reliably persist WAL across nodes.
474 : /// It controls all WAL disk writes and updates of control file.
475 : ///
476 : /// Currently safekeeper processes:
477 : /// - messages from compute (proposers) and provides replies
478 : /// - messages from broker peers
479 : pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> {
480 : /// LSN since which the proposer this safekeeper is currently talking to
481 : /// appends WAL; determines the epoch switch point.
482 : pub epoch_start_lsn: Lsn,
483 :
484 : pub state: TimelineState<CTRL>, // persistent state storage
485 : pub wal_store: WAL,
486 :
487 : node_id: NodeId, // safekeeper's node id
488 : }
489 :
490 : impl<CTRL, WAL> SafeKeeper<CTRL, WAL>
491 : where
492 : CTRL: control_file::Storage,
493 : WAL: wal_storage::Storage,
494 : {
495 : /// Accepts a control file storage containing the safekeeper state.
496 : /// State must be initialized, i.e. contain filled `tenant_id`, `timeline_id`
497 : /// and `server` (`wal_seg_size` inside it) fields.
498 71044 : pub fn new(state: CTRL, wal_store: WAL, node_id: NodeId) -> Result<SafeKeeper<CTRL, WAL>> {
499 71044 : if state.tenant_id == TenantId::from([0u8; 16])
500 71044 : || state.timeline_id == TimelineId::from([0u8; 16])
501 : {
502 0 : bail!(
503 0 : "Calling SafeKeeper::new with empty tenant_id ({}) or timeline_id ({})",
504 0 : state.tenant_id,
505 0 : state.timeline_id
506 0 : );
507 71044 : }
508 71044 :
509 71044 : Ok(SafeKeeper {
510 71044 : epoch_start_lsn: Lsn(0),
511 71044 : state: TimelineState::new(state),
512 71044 : wal_store,
513 71044 : node_id,
514 71044 : })
515 71044 : }
516 :
517 : /// Get history of term switches for the available WAL
518 25669 : fn get_term_history(&self) -> TermHistory {
519 25669 : self.state
520 25669 : .acceptor_state
521 25669 : .term_history
522 25669 : .up_to(self.flush_lsn())
523 25669 : }
524 :
525 : /// Get current term.
526 3923367 : pub fn get_term(&self) -> Term {
527 3923367 : self.state.acceptor_state.term
528 3923367 : }
529 :
530 26607 : pub fn get_epoch(&self) -> Term {
531 26607 : self.state.acceptor_state.get_epoch(self.flush_lsn())
532 26607 : }
533 :
534 : /// wal_store wrapper avoiding commit_lsn <= flush_lsn violation when we don't have WAL yet.
535 7965321 : pub fn flush_lsn(&self) -> Lsn {
536 7965321 : max(self.wal_store.flush_lsn(), self.state.timeline_start_lsn)
537 7965321 : }
538 :
539 : /// Process message from proposer and possibly form reply. Concurrent
540 : /// callers must exclude each other.
541 4150614 : pub async fn process_msg(
542 4150614 : &mut self,
543 4150614 : msg: &ProposerAcceptorMessage,
544 4150614 : ) -> Result<Option<AcceptorProposerMessage>> {
545 4150614 : match msg {
546 160060 : ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg).await,
547 25669 : ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg).await,
548 1599389 : ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg).await,
549 7 : ProposerAcceptorMessage::AppendRequest(msg) => {
550 12 : self.handle_append_request(msg, true).await
551 : }
552 2418958 : ProposerAcceptorMessage::NoFlushAppendRequest(msg) => {
553 2924193 : self.handle_append_request(msg, false).await
554 : }
555 1538671 : ProposerAcceptorMessage::FlushWAL => self.handle_flush().await,
556 : }
557 4150614 : }
558 :
559 : /// Handle initial message from proposer: check its sanity and send my
560 : /// current term.
561 160060 : async fn handle_greeting(
562 160060 : &mut self,
563 160060 : msg: &ProposerGreeting,
564 160060 : ) -> Result<Option<AcceptorProposerMessage>> {
565 160060 : // Check protocol compatibility
566 160060 : if msg.protocol_version != SK_PROTOCOL_VERSION {
567 0 : bail!(
568 0 : "incompatible protocol version {}, expected {}",
569 0 : msg.protocol_version,
570 0 : SK_PROTOCOL_VERSION
571 0 : );
572 160060 : }
573 160060 : /* Postgres major version mismatch is treated as fatal error
574 160060 : * because safekeepers parse WAL headers and the format
575 160060 : * may change between versions.
576 160060 : */
577 160060 : if msg.pg_version / 10000 != self.state.server.pg_version / 10000
578 0 : && self.state.server.pg_version != UNKNOWN_SERVER_VERSION
579 : {
580 0 : bail!(
581 0 : "incompatible server version {}, expected {}",
582 0 : msg.pg_version,
583 0 : self.state.server.pg_version
584 0 : );
585 160060 : }
586 160060 :
587 160060 : if msg.tenant_id != self.state.tenant_id {
588 0 : bail!(
589 0 : "invalid tenant ID, got {}, expected {}",
590 0 : msg.tenant_id,
591 0 : self.state.tenant_id
592 0 : );
593 160060 : }
594 160060 : if msg.timeline_id != self.state.timeline_id {
595 0 : bail!(
596 0 : "invalid timeline ID, got {}, expected {}",
597 0 : msg.timeline_id,
598 0 : self.state.timeline_id
599 0 : );
600 160060 : }
601 160060 : if self.state.server.wal_seg_size != msg.wal_seg_size {
602 0 : bail!(
603 0 : "invalid wal_seg_size, got {}, expected {}",
604 0 : msg.wal_seg_size,
605 0 : self.state.server.wal_seg_size
606 0 : );
607 160060 : }
608 160060 :
609 160060 : // system_id will be updated on mismatch
610 160060 : // sync-safekeepers doesn't know sysid and sends 0, ignore it
611 160060 : if self.state.server.system_id != msg.system_id && msg.system_id != 0 {
612 474 : if self.state.server.system_id != 0 {
613 0 : warn!(
614 0 : "unexpected system ID arrived, got {}, expected {}",
615 0 : msg.system_id, self.state.server.system_id
616 0 : );
617 474 : }
618 :
619 474 : let mut state = self.state.start_change();
620 474 : state.server.system_id = msg.system_id;
621 474 : if msg.pg_version != UNKNOWN_SERVER_VERSION {
622 474 : state.server.pg_version = msg.pg_version;
623 474 : }
624 1422 : self.state.finish_change(&state).await?;
625 159586 : }
626 :
627 2144 : info!(
628 2144 : "processed greeting from walproposer {}, sending term {:?}",
629 34304 : msg.proposer_id.map(|b| format!("{:X}", b)).join(""),
630 2144 : self.state.acceptor_state.term
631 2144 : );
632 160060 : Ok(Some(AcceptorProposerMessage::Greeting(AcceptorGreeting {
633 160060 : term: self.state.acceptor_state.term,
634 160060 : node_id: self.node_id,
635 160060 : })))
636 160060 : }
637 :
638 : /// Give vote for the given term, if we haven't done that previously.
639 25669 : async fn handle_vote_request(
640 25669 : &mut self,
641 25669 : msg: &VoteRequest,
642 25669 : ) -> Result<Option<AcceptorProposerMessage>> {
643 25669 : // Once voted, we won't accept data from older proposers; flush
644 25669 : // everything we've already received so that new proposer starts
645 25669 : // streaming at end of our WAL, without overlap. Currently we truncate
646 25669 : // WAL at streaming point, so this avoids truncating already committed
647 25669 : // WAL.
648 25669 : //
649 25669 : // TODO: it would be smoother to instead avoid truncating the committed
650 25669 : // piece in handle_elected. Currently not a big deal, as the proposer is the
651 25669 : // only source of WAL; with peer2peer recovery it would be more
652 25669 : // important.
653 25669 : self.wal_store.flush_wal().await?;
654 : // initialize with refusal
655 25669 : let mut resp = VoteResponse {
656 25669 : term: self.state.acceptor_state.term,
657 25669 : vote_given: false as u64,
658 25669 : flush_lsn: self.flush_lsn(),
659 25669 : truncate_lsn: self.state.inmem.peer_horizon_lsn,
660 25669 : term_history: self.get_term_history(),
661 25669 : timeline_start_lsn: self.state.timeline_start_lsn,
662 25669 : };
663 25669 : if self.state.acceptor_state.term < msg.term {
664 24487 : let mut state = self.state.start_change();
665 24487 : state.acceptor_state.term = msg.term;
666 24487 : // persist vote before sending it out
667 24487 : self.state.finish_change(&state).await?;
668 :
669 24487 : resp.term = self.state.acceptor_state.term;
670 24487 : resp.vote_given = true as u64;
671 1182 : }
672 2078 : info!("processed VoteRequest for term {}: {:?}", msg.term, &resp);
673 25669 : Ok(Some(AcceptorProposerMessage::VoteResponse(resp)))
674 25669 : }
675 :
676 : /// Form AppendResponse from current state.
677 1538678 : fn append_response(&self) -> AppendResponse {
678 1538678 : let ar = AppendResponse {
679 1538678 : term: self.state.acceptor_state.term,
680 1538678 : flush_lsn: self.flush_lsn(),
681 1538678 : commit_lsn: self.state.commit_lsn,
682 1538678 : // will be filled by the upper code to avoid bothering safekeeper
683 1538678 : hs_feedback: HotStandbyFeedback::empty(),
684 1538678 : pageserver_feedback: PageserverFeedback::empty(),
685 1538678 : };
686 1538678 : trace!("formed AppendResponse {:?}", ar);
687 1538678 : ar
688 1538678 : }
689 :
690 7249 : async fn handle_elected(
691 7249 : &mut self,
692 7249 : msg: &ProposerElected,
693 7249 : ) -> Result<Option<AcceptorProposerMessage>> {
694 947 : info!("received ProposerElected {:?}", msg);
695 7249 : if self.state.acceptor_state.term < msg.term {
696 5 : let mut state = self.state.start_change();
697 5 : state.acceptor_state.term = msg.term;
698 9 : self.state.finish_change(&state).await?;
699 7244 : }
700 :
701 : // If our term is higher, ignore the message (next feedback will inform the compute)
702 7249 : if self.state.acceptor_state.term > msg.term {
703 0 : return Ok(None);
704 7249 : }
705 7249 :
706 7249 : // This might happen in a rare race when another (old) connection from
707 7249 : // the same walproposer writes + flushes WAL after this connection
708 7249 : // already sent its flush_lsn in the VoteResponse. It is generally safe to
709 7249 : // proceed, but to prevent commit_lsn surprisingly going down we should
710 7249 : // either refuse the session (simpler) or skip the part we already have
711 7249 : // from the stream (can be implemented).
712 7249 : if msg.term == self.get_epoch() && self.flush_lsn() > msg.start_streaming_at {
713 0 : bail!("refusing ProposerElected which is going to overwrite correct WAL: term={}, flush_lsn={}, start_streaming_at={}; restarting the handshake should help",
714 0 : msg.term, self.flush_lsn(), msg.start_streaming_at)
715 7249 : }
716 7249 : // Otherwise we must never attempt to truncate committed data.
717 7249 : assert!(
718 7249 : msg.start_streaming_at >= self.state.inmem.commit_lsn,
719 0 : "attempt to truncate committed data: start_streaming_at={}, commit_lsn={}",
720 : msg.start_streaming_at,
721 : self.state.inmem.commit_lsn
722 : );
723 :
724 : // TODO: cross check divergence point, check if msg.start_streaming_at corresponds to
725 : // intersection of our history and history from msg
726 :
727 : // truncate wal, update the LSNs
728 1596839 : self.wal_store.truncate_wal(msg.start_streaming_at).await?;
729 :
730 : // and now adopt term history from proposer
731 : {
732 7249 : let mut state = self.state.start_change();
733 7249 :
734 7249 : // Here we learn initial LSN for the first time, set fields
735 7249 : // interested in that.
736 7249 :
737 7249 : if state.timeline_start_lsn == Lsn(0) {
738 : // Remember point where WAL begins globally.
739 1469 : state.timeline_start_lsn = msg.timeline_start_lsn;
740 490 : info!(
741 490 : "setting timeline_start_lsn to {:?}",
742 490 : state.timeline_start_lsn
743 490 : );
744 5780 : }
745 7249 : if state.peer_horizon_lsn == Lsn(0) {
746 1466 : // Update peer_horizon_lsn as soon as we know where timeline starts.
747 1466 : // It means that peer_horizon_lsn cannot be zero after we know timeline_start_lsn.
748 1466 : state.peer_horizon_lsn = msg.timeline_start_lsn;
749 5783 : }
750 7249 : if state.local_start_lsn == Lsn(0) {
751 1466 : state.local_start_lsn = msg.start_streaming_at;
752 487 : info!("setting local_start_lsn to {:?}", state.local_start_lsn);
753 5783 : }
754 : // Initializing commit_lsn before acking first flushed record is
755 : // important to let find_end_of_wal skip the hole in the beginning
756 : // of the first segment.
757 : //
758 : // NB: on new clusters, this happens at the same time as
759 : // timeline_start_lsn initialization; it is kept outside that branch so
760 : // that already existing (upgraded) timelines get it set as well.
761 7249 : state.commit_lsn = max(state.commit_lsn, state.timeline_start_lsn);
762 7249 :
763 7249 : // Initializing backup_lsn is useful to avoid making the backup task think it should upload segment 0.
764 7249 : state.backup_lsn = max(state.backup_lsn, state.timeline_start_lsn);
765 7249 :
766 7249 : state.acceptor_state.term_history = msg.term_history.clone();
767 7249 : self.state.finish_change(&state).await?;
768 : }
769 :
770 947 : info!("start receiving WAL since {:?}", msg.start_streaming_at);
771 :
772 : // Cache LSN where term starts to immediately fsync control file with
773 : // commit_lsn once we reach it -- sync-safekeepers finishes when
774 : // persisted commit_lsn on majority of safekeepers aligns.
775 7249 : self.epoch_start_lsn = match msg.term_history.0.last() {
776 0 : None => bail!("proposer elected with empty term history"),
777 7249 : Some(term_lsn_start) => term_lsn_start.lsn,
778 7249 : };
779 7249 :
780 7249 : Ok(None)
781 7249 : }
782 :
783 : /// Advance commit_lsn taking into account what we have locally.
784 : ///
785 : /// Note: it is assumed that 'WAL we have is from the right term' check has
786 : /// already been done outside.
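///
/// In effect: inmem.commit_lsn = min(max(candidate, inmem.commit_lsn), flush_lsn()),
/// i.e. commit_lsn never moves backwards and never exceeds locally flushed WAL.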
787 2416710 : async fn update_commit_lsn(&mut self, mut candidate: Lsn) -> Result<()> {
788 2416710 : // Both peers and walproposer communicate this value, we might already
789 2416710 : // have a fresher (higher) version.
790 2416710 : candidate = max(candidate, self.state.inmem.commit_lsn);
791 2416710 : let commit_lsn = min(candidate, self.flush_lsn());
792 2416710 : assert!(
793 2416710 : commit_lsn >= self.state.inmem.commit_lsn,
794 0 : "commit_lsn monotonicity violated: old={} new={}",
795 : self.state.inmem.commit_lsn,
796 : commit_lsn
797 : );
798 :
799 2416710 : self.state.inmem.commit_lsn = commit_lsn;
800 2416710 :
801 2416710 : // If new commit_lsn reached epoch switch, force sync of control
802 2416710 : // file: walproposer in sync mode is very interested when this
803 2416710 : // happens. Note: this is for sync-safekeepers mode only, as
804 2416710 : // otherwise commit_lsn might jump over epoch_start_lsn.
805 2416710 : if commit_lsn >= self.epoch_start_lsn && self.state.commit_lsn < self.epoch_start_lsn {
806 898 : self.state.flush().await?;
807 2415812 : }
808 :
809 2416710 : Ok(())
810 2416710 : }
811 :
812 : /// Persist control file if there is something to save and enough time
813 : /// passed after the last save.
814 1833 : pub async fn maybe_persist_inmem_control_file(&mut self) -> Result<()> {
815 1833 : const CF_SAVE_INTERVAL: Duration = Duration::from_secs(300);
816 1833 : if self.state.pers.last_persist_at().elapsed() < CF_SAVE_INTERVAL {
817 1833 : return Ok(());
818 0 : }
819 0 : let need_persist = self.state.inmem.commit_lsn > self.state.commit_lsn
820 0 : || self.state.inmem.backup_lsn > self.state.backup_lsn
821 0 : || self.state.inmem.peer_horizon_lsn > self.state.peer_horizon_lsn
822 0 : || self.state.inmem.remote_consistent_lsn > self.state.remote_consistent_lsn;
823 0 : if need_persist {
824 0 : self.state.flush().await?;
825 0 : trace!("saved control file: {CF_SAVE_INTERVAL:?} passed");
826 0 : }
827 0 : Ok(())
828 1833 : }
829 :
830 : /// Handle request to append WAL.
831 : #[allow(clippy::comparison_chain)]
832 2418965 : async fn handle_append_request(
833 2418965 : &mut self,
834 2418965 : msg: &AppendRequest,
835 2418965 : require_flush: bool,
836 2418965 : ) -> Result<Option<AcceptorProposerMessage>> {
837 2418965 : if self.state.acceptor_state.term < msg.h.term {
838 0 : bail!("got AppendRequest before ProposerElected");
839 2418965 : }
840 2418965 :
841 2418965 : // If our term is higher, immediately refuse the message.
842 2418965 : if self.state.acceptor_state.term > msg.h.term {
843 3 : let resp = AppendResponse::term_only(self.state.acceptor_state.term);
844 3 : return Ok(Some(AcceptorProposerMessage::AppendResponse(resp)));
845 2418962 : }
846 2418962 :
847 2418962 : // Now we know that we are in the same term as the proposer,
848 2418962 : // processing the message.
849 2418962 :
850 2418962 : self.state.inmem.proposer_uuid = msg.h.proposer_uuid;
851 2418962 :
852 2418962 : // do the job
853 2418962 : if !msg.wal_data.is_empty() {
854 1516470 : self.wal_store
855 1516470 : .write_wal(msg.h.begin_lsn, &msg.wal_data)
856 2922791 : .await?;
857 902492 : }
858 :
859 : // flush wal to the disk, if required
860 2418962 : if require_flush {
861 7 : self.wal_store.flush_wal().await?;
862 2418955 : }
863 :
864 : // Update commit_lsn.
865 2418962 : if msg.h.commit_lsn != Lsn(0) {
866 2405612 : self.update_commit_lsn(msg.h.commit_lsn).await?;
867 13350 : }
868 : // The value calculated by the walproposer can always lag:
869 : // - safekeepers can forget the inmem value and send the proposer the lower
870 : // persisted one after a restart;
871 : // - if we made safekeepers always send the persistent value,
872 : // any compute restart would pull it down.
873 : // Thus, take max before adopting.
874 2418962 : self.state.inmem.peer_horizon_lsn =
875 2418962 : max(self.state.inmem.peer_horizon_lsn, msg.h.truncate_lsn);
876 2418962 :
877 2418962 : // Update truncate and commit LSN in control file.
878 2418962 : // To avoid the performance impact of an extra fsync, do it only
879 2418962 : // when commit_lsn delta exceeds WAL segment size.
880 2418962 : if self.state.commit_lsn + (self.state.server.wal_seg_size as u64)
881 2418962 : < self.state.inmem.commit_lsn
882 : {
883 1330 : self.state.flush().await?;
884 2418508 : }
885 :
886 0 : trace!(
887 0 : "processed AppendRequest of len {}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, flushed={:?}",
888 0 : msg.wal_data.len(),
889 0 : msg.h.end_lsn,
890 0 : msg.h.commit_lsn,
891 0 : msg.h.truncate_lsn,
892 0 : require_flush,
893 0 : );
894 :
895 : // If flush_lsn hasn't updated, AppendResponse is not very useful.
896 2418962 : if !require_flush {
897 2418955 : return Ok(None);
898 7 : }
899 7 :
900 7 : let resp = self.append_response();
901 7 : Ok(Some(AcceptorProposerMessage::AppendResponse(resp)))
902 2418965 : }
903 :
904 : /// Flush WAL to disk. Return AppendResponse with latest LSNs.
905 1538671 : async fn handle_flush(&mut self) -> Result<Option<AcceptorProposerMessage>> {
906 1538671 : self.wal_store.flush_wal().await?;
907 1538671 : Ok(Some(AcceptorProposerMessage::AppendResponse(
908 1538671 : self.append_response(),
909 1538671 : )))
910 1538671 : }
911 :
912 : /// Update timeline state with peer safekeeper data.
913 11279 : pub async fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> {
914 11279 : let mut sync_control_file = false;
915 11279 :
916 11279 : if (Lsn(sk_info.commit_lsn) != Lsn::INVALID) && (sk_info.last_log_term != INVALID_TERM) {
917 : // Note: the check is too restrictive, generally we can update local
918 : // commit_lsn if our history matches (is a prefix of) the history of the
919 : // node providing the advanced commit_lsn.
920 11166 : if sk_info.last_log_term == self.get_epoch() {
921 11098 : self.update_commit_lsn(Lsn(sk_info.commit_lsn)).await?;
922 68 : }
923 113 : }
924 :
925 11279 : self.state.inmem.backup_lsn = max(Lsn(sk_info.backup_lsn), self.state.inmem.backup_lsn);
926 11279 : sync_control_file |= self.state.backup_lsn + (self.state.server.wal_seg_size as u64)
927 11279 : < self.state.inmem.backup_lsn;
928 11279 :
929 11279 : self.state.inmem.remote_consistent_lsn = max(
930 11279 : Lsn(sk_info.remote_consistent_lsn),
931 11279 : self.state.inmem.remote_consistent_lsn,
932 11279 : );
933 11279 : sync_control_file |= self.state.remote_consistent_lsn
934 11279 : + (self.state.server.wal_seg_size as u64)
935 11279 : < self.state.inmem.remote_consistent_lsn;
936 11279 :
937 11279 : self.state.inmem.peer_horizon_lsn = max(
938 11279 : Lsn(sk_info.peer_horizon_lsn),
939 11279 : self.state.inmem.peer_horizon_lsn,
940 11279 : );
941 11279 : sync_control_file |= self.state.peer_horizon_lsn + (self.state.server.wal_seg_size as u64)
942 11279 : < self.state.inmem.peer_horizon_lsn;
943 11279 :
944 11279 : if sync_control_file {
945 1062 : self.state.flush().await?;
946 10925 : }
947 11279 : Ok(())
948 11279 : }
949 :
950 : /// Get oldest segno we still need to keep. We hold WAL till it is consumed
951 : /// by all of 1) pageserver (remote_consistent_lsn) 2) peers 3) s3
952 : /// offloading.
953 : /// While it is safe to use inmem values for determining the horizon,
954 : /// we use persistent ones to make the possible normal states less surprising.
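///
/// I.e. horizon_lsn = min(remote_consistent_lsn, peer_horizon_lsn) and, if WAL
/// backup is enabled, also backup_lsn; the result is converted to a segment number.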
955 1833 : pub fn get_horizon_segno(&self, wal_backup_enabled: bool) -> XLogSegNo {
956 1833 : let mut horizon_lsn = min(
957 1833 : self.state.remote_consistent_lsn,
958 1833 : self.state.peer_horizon_lsn,
959 1833 : );
960 1833 : if wal_backup_enabled {
961 1833 : horizon_lsn = min(horizon_lsn, self.state.backup_lsn);
962 1833 : }
963 1833 : horizon_lsn.segment_number(self.state.server.wal_seg_size as usize)
964 1833 : }
965 : }
966 :
967 : #[cfg(test)]
968 : mod tests {
969 : use futures::future::BoxFuture;
970 : use postgres_ffi::WAL_SEGMENT_SIZE;
971 :
972 : use super::*;
973 : use crate::{
974 : state::{PersistedPeers, TimelinePersistentState},
975 : wal_storage::Storage,
976 : };
977 : use std::{ops::Deref, str::FromStr, time::Instant};
978 :
979 : // fake storage for tests
980 : struct InMemoryState {
981 : persisted_state: TimelinePersistentState,
982 : }
983 :
984 : #[async_trait::async_trait]
985 : impl control_file::Storage for InMemoryState {
986 6 : async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> {
987 6 : self.persisted_state = s.clone();
988 6 : Ok(())
989 12 : }
990 :
991 0 : fn last_persist_at(&self) -> Instant {
992 0 : Instant::now()
993 0 : }
994 : }
995 :
996 : impl Deref for InMemoryState {
997 : type Target = TimelinePersistentState;
998 :
999 120 : fn deref(&self) -> &Self::Target {
1000 120 : &self.persisted_state
1001 120 : }
1002 : }
1003 :
1004 4 : fn test_sk_state() -> TimelinePersistentState {
1005 4 : let mut state = TimelinePersistentState::empty();
1006 4 : state.server.wal_seg_size = WAL_SEGMENT_SIZE as u32;
1007 4 : state.tenant_id = TenantId::from([1u8; 16]);
1008 4 : state.timeline_id = TimelineId::from([1u8; 16]);
1009 4 : state
1010 4 : }
1011 :
1012 : struct DummyWalStore {
1013 : lsn: Lsn,
1014 : }
1015 :
1016 : #[async_trait::async_trait]
1017 : impl wal_storage::Storage for DummyWalStore {
1018 18 : fn flush_lsn(&self) -> Lsn {
1019 18 : self.lsn
1020 18 : }
1021 :
1022 4 : async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
1023 4 : self.lsn = startpos + buf.len() as u64;
1024 4 : Ok(())
1025 8 : }
1026 :
1027 4 : async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
1028 4 : self.lsn = end_pos;
1029 4 : Ok(())
1030 8 : }
1031 :
1032 8 : async fn flush_wal(&mut self) -> Result<()> {
1033 8 : Ok(())
1034 16 : }
1035 :
1036 0 : fn remove_up_to(&self, _segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> {
1037 0 : Box::pin(async { Ok(()) })
1038 0 : }
1039 :
1040 0 : fn get_metrics(&self) -> crate::metrics::WalStorageMetrics {
1041 0 : crate::metrics::WalStorageMetrics::default()
1042 0 : }
1043 : }
1044 :
1045 2 : #[tokio::test]
1046 2 : async fn test_voting() {
1047 2 : let storage = InMemoryState {
1048 2 : persisted_state: test_sk_state(),
1049 2 : };
1050 2 : let wal_store = DummyWalStore { lsn: Lsn(0) };
1051 2 : let mut sk = SafeKeeper::new(storage, wal_store, NodeId(0)).unwrap();
1052 2 :
1053 2 : // check voting for 1 is ok
1054 2 : let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 });
1055 2 : let mut vote_resp = sk.process_msg(&vote_request).await;
1056 2 : match vote_resp.unwrap() {
1057 2 : Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given != 0),
1058 2 : r => panic!("unexpected response: {:?}", r),
1059 2 : }
1060 2 :
1061 2 : // reboot...
1062 2 : let state = sk.state.deref().clone();
1063 2 : let storage = InMemoryState {
1064 2 : persisted_state: state,
1065 2 : };
1066 2 :
1067 2 : sk = SafeKeeper::new(storage, sk.wal_store, NodeId(0)).unwrap();
1068 2 :
1069 2 : // and ensure voting second time for 1 is not ok
1070 2 : vote_resp = sk.process_msg(&vote_request).await;
1071 2 : match vote_resp.unwrap() {
1072 2 : Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given == 0),
1073 2 : r => panic!("unexpected response: {:?}", r),
1074 2 : }
1075 2 : }
1076 :
1077 2 : #[tokio::test]
1078 2 : async fn test_epoch_switch() {
1079 2 : let storage = InMemoryState {
1080 2 : persisted_state: test_sk_state(),
1081 2 : };
1082 2 : let wal_store = DummyWalStore { lsn: Lsn(0) };
1083 2 :
1084 2 : let mut sk = SafeKeeper::new(storage, wal_store, NodeId(0)).unwrap();
1085 2 :
1086 2 : let mut ar_hdr = AppendRequestHeader {
1087 2 : term: 1,
1088 2 : epoch_start_lsn: Lsn(3),
1089 2 : begin_lsn: Lsn(1),
1090 2 : end_lsn: Lsn(2),
1091 2 : commit_lsn: Lsn(0),
1092 2 : truncate_lsn: Lsn(0),
1093 2 : proposer_uuid: [0; 16],
1094 2 : };
1095 2 : let mut append_request = AppendRequest {
1096 2 : h: ar_hdr.clone(),
1097 2 : wal_data: Bytes::from_static(b"b"),
1098 2 : };
1099 2 :
1100 2 : let pem = ProposerElected {
1101 2 : term: 1,
1102 2 : start_streaming_at: Lsn(1),
1103 2 : term_history: TermHistory(vec![TermLsn {
1104 2 : term: 1,
1105 2 : lsn: Lsn(3),
1106 2 : }]),
1107 2 : timeline_start_lsn: Lsn(0),
1108 2 : };
1109 2 : sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
1110 2 : .await
1111 2 : .unwrap();
1112 2 :
1113 2 : // check that AppendRequest before epochStartLsn doesn't switch epoch
1114 2 : let resp = sk
1115 2 : .process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
1116 2 : .await;
1117 2 : assert!(resp.is_ok());
1118 2 : assert_eq!(sk.get_epoch(), 0);
1119 2 :
1120 2 : // but record at epochStartLsn does the switch
1121 2 : ar_hdr.begin_lsn = Lsn(2);
1122 2 : ar_hdr.end_lsn = Lsn(3);
1123 2 : append_request = AppendRequest {
1124 2 : h: ar_hdr,
1125 2 : wal_data: Bytes::from_static(b"b"),
1126 2 : };
1127 2 : let resp = sk
1128 2 : .process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
1129 2 : .await;
1130 2 : assert!(resp.is_ok());
1131 2 : sk.wal_store.truncate_wal(Lsn(3)).await.unwrap(); // imitate the complete record at 3 %)
1132 2 : assert_eq!(sk.get_epoch(), 1);
1133 2 : }
1134 :
1135 2 : #[test]
1136 2 : fn test_find_highest_common_point_none() {
1137 2 : let prop_th = TermHistory(vec![(0, Lsn(1)).into()]);
1138 2 : let sk_th = TermHistory(vec![(1, Lsn(1)).into(), (2, Lsn(2)).into()]);
1139 2 : assert_eq!(
1140 2 : TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(3),),
1141 2 : None
1142 2 : );
1143 2 : }
1144 :
1145 2 : #[test]
1146 2 : fn test_find_highest_common_point_middle() {
1147 2 : let prop_th = TermHistory(vec![
1148 2 : (1, Lsn(10)).into(),
1149 2 : (2, Lsn(20)).into(),
1150 2 : (4, Lsn(40)).into(),
1151 2 : ]);
1152 2 : let sk_th = TermHistory(vec![
1153 2 : (1, Lsn(10)).into(),
1154 2 : (2, Lsn(20)).into(),
1155 2 : (3, Lsn(30)).into(), // sk ends last common term 2 at 30
1156 2 : ]);
1157 2 : assert_eq!(
1158 2 : TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(40),),
1159 2 : Some(TermLsn {
1160 2 : term: 2,
1161 2 : lsn: Lsn(30),
1162 2 : })
1163 2 : );
1164 2 : }
1165 :
1166 2 : #[test]
1167 2 : fn test_find_highest_common_point_sk_end() {
1168 2 : let prop_th = TermHistory(vec![
1169 2 : (1, Lsn(10)).into(),
1170 2 : (2, Lsn(20)).into(), // last common term 2, sk will end it at 32 sk_end_lsn
1171 2 : (4, Lsn(40)).into(),
1172 2 : ]);
1173 2 : let sk_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
1174 2 : assert_eq!(
1175 2 : TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(32),),
1176 2 : Some(TermLsn {
1177 2 : term: 2,
1178 2 : lsn: Lsn(32),
1179 2 : })
1180 2 : );
1181 2 : }
1182 :
1183 2 : #[test]
1184 2 : fn test_find_highest_common_point_walprop() {
1185 2 : let prop_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
1186 2 : let sk_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
1187 2 : assert_eq!(
1188 2 : TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(32),),
1189 2 : Some(TermLsn {
1190 2 : term: 2,
1191 2 : lsn: Lsn(32),
1192 2 : })
1193 2 : );
1194 2 : }
1195 :
1196 2 : #[test]
1197 2 : fn test_sk_state_bincode_serde_roundtrip() {
1198 2 : use utils::Hex;
1199 2 : let tenant_id = TenantId::from_str("cf0480929707ee75372337efaa5ecf96").unwrap();
1200 2 : let timeline_id = TimelineId::from_str("112ded66422aa5e953e5440fa5427ac4").unwrap();
1201 2 : let state = TimelinePersistentState {
1202 2 : tenant_id,
1203 2 : timeline_id,
1204 2 : acceptor_state: AcceptorState {
1205 2 : term: 42,
1206 2 : term_history: TermHistory(vec![TermLsn {
1207 2 : lsn: Lsn(0x1),
1208 2 : term: 41,
1209 2 : }]),
1210 2 : },
1211 2 : server: ServerInfo {
1212 2 : pg_version: 14,
1213 2 : system_id: 0x1234567887654321,
1214 2 : wal_seg_size: 0x12345678,
1215 2 : },
1216 2 : proposer_uuid: {
1217 2 : let mut arr = timeline_id.as_arr();
1218 2 : arr.reverse();
1219 2 : arr
1220 2 : },
1221 2 : timeline_start_lsn: Lsn(0x12345600),
1222 2 : local_start_lsn: Lsn(0x12),
1223 2 : commit_lsn: Lsn(1234567800),
1224 2 : backup_lsn: Lsn(1234567300),
1225 2 : peer_horizon_lsn: Lsn(9999999),
1226 2 : remote_consistent_lsn: Lsn(1234560000),
1227 2 : peers: PersistedPeers(vec![(
1228 2 : NodeId(1),
1229 2 : PersistedPeerInfo {
1230 2 : backup_lsn: Lsn(1234567000),
1231 2 : term: 42,
1232 2 : flush_lsn: Lsn(1234567800 - 8),
1233 2 : commit_lsn: Lsn(1234567600),
1234 2 : },
1235 2 : )]),
1236 2 : };
1237 2 :
1238 2 : let ser = state.ser().unwrap();
1239 2 :
1240 2 : #[rustfmt::skip]
1241 2 : let expected = [
1242 2 : // tenant_id as length prefixed hex
1243 2 : 0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1244 2 : 0x63, 0x66, 0x30, 0x34, 0x38, 0x30, 0x39, 0x32, 0x39, 0x37, 0x30, 0x37, 0x65, 0x65, 0x37, 0x35, 0x33, 0x37, 0x32, 0x33, 0x33, 0x37, 0x65, 0x66, 0x61, 0x61, 0x35, 0x65, 0x63, 0x66, 0x39, 0x36,
1245 2 : // timeline_id as length prefixed hex
1246 2 : 0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1247 2 : 0x31, 0x31, 0x32, 0x64, 0x65, 0x64, 0x36, 0x36, 0x34, 0x32, 0x32, 0x61, 0x61, 0x35, 0x65, 0x39, 0x35, 0x33, 0x65, 0x35, 0x34, 0x34, 0x30, 0x66, 0x61, 0x35, 0x34, 0x32, 0x37, 0x61, 0x63, 0x34,
1248 2 : // term
1249 2 : 0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1250 2 : // length prefix
1251 2 : 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1252 2 : // unsure why this order is swapped
1253 2 : 0x29, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1254 2 : 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1255 2 : // pg_version
1256 2 : 0x0e, 0x00, 0x00, 0x00,
1257 2 : // systemid
1258 2 : 0x21, 0x43, 0x65, 0x87, 0x78, 0x56, 0x34, 0x12,
1259 2 : // wal_seg_size
1260 2 : 0x78, 0x56, 0x34, 0x12,
1261 2 : // pguuid as length prefixed hex
1262 2 : 0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1263 2 : 0x63, 0x34, 0x37, 0x61, 0x34, 0x32, 0x61, 0x35, 0x30, 0x66, 0x34, 0x34, 0x65, 0x35, 0x35, 0x33, 0x65, 0x39, 0x61, 0x35, 0x32, 0x61, 0x34, 0x32, 0x36, 0x36, 0x65, 0x64, 0x32, 0x64, 0x31, 0x31,
1264 2 :
1265 2 : // timeline_start_lsn
1266 2 : 0x00, 0x56, 0x34, 0x12, 0x00, 0x00, 0x00, 0x00,
1267 2 : 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1268 2 : 0x78, 0x02, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
1269 2 : 0x84, 0x00, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
1270 2 : 0x7f, 0x96, 0x98, 0x00, 0x00, 0x00, 0x00, 0x00,
1271 2 : 0x00, 0xe4, 0x95, 0x49, 0x00, 0x00, 0x00, 0x00,
1272 2 : // length prefix for persistentpeers
1273 2 : 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1274 2 : // nodeid
1275 2 : 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1276 2 : // backuplsn
1277 2 : 0x58, 0xff, 0x95, 0x49, 0x00, 0x00, 0x00, 0x00,
1278 2 : 0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1279 2 : 0x70, 0x02, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
1280 2 : 0xb0, 0x01, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
1281 2 : ];
1282 2 :
1283 2 : assert_eq!(Hex(&ser), Hex(&expected));
1284 :
1285 2 : let deser = TimelinePersistentState::des(&ser).unwrap();
1286 2 :
1287 2 : assert_eq!(deser, state);
1288 2 : }
1289 : }