Line data Source code
1 : //! Acceptor part of proposer-acceptor consensus algorithm.
2 :
3 : use anyhow::{bail, Context, Result};
4 : use byteorder::{LittleEndian, ReadBytesExt};
5 : use bytes::{Buf, BufMut, Bytes, BytesMut};
6 :
7 : use postgres_ffi::{TimeLineID, MAX_SEND_SIZE};
8 : use serde::{Deserialize, Serialize};
9 : use std::cmp::max;
10 : use std::cmp::min;
11 : use std::fmt;
12 : use std::io::Read;
13 : use storage_broker::proto::SafekeeperTimelineInfo;
14 :
15 : use tracing::*;
16 :
17 : use crate::control_file;
18 : use crate::metrics::MISC_OPERATION_SECONDS;
19 : use crate::send_wal::HotStandbyFeedback;
20 :
21 : use crate::state::TimelineState;
22 : use crate::wal_storage;
23 : use pq_proto::SystemId;
24 : use utils::pageserver_feedback::PageserverFeedback;
25 : use utils::{
26 : bin_ser::LeSer,
27 : id::{NodeId, TenantId, TimelineId},
28 : lsn::Lsn,
29 : };
30 :
/// Version of the safekeeper <-> walproposer protocol; greetings announcing
/// any other version are rejected.
const SK_PROTOCOL_VERSION: u32 = 2;
/// `pg_version` placeholder meaning "Postgres server version not known".
pub const UNKNOWN_SERVER_VERSION: u32 = 0;

/// Consensus logical timestamp.
pub type Term = u64;
/// Term 0, used as the "no term" placeholder.
pub const INVALID_TERM: Term = 0;
37 :
38 24 : #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
39 : pub struct TermLsn {
40 : pub term: Term,
41 : pub lsn: Lsn,
42 : }
43 :
44 : // Creation from tuple provides less typing (e.g. for unit tests).
45 : impl From<(Term, Lsn)> for TermLsn {
46 108 : fn from(pair: (Term, Lsn)) -> TermLsn {
47 108 : TermLsn {
48 108 : term: pair.0,
49 108 : lsn: pair.1,
50 108 : }
51 108 : }
52 : }
53 :
54 36 : #[derive(Clone, Serialize, Deserialize, PartialEq)]
55 : pub struct TermHistory(pub Vec<TermLsn>);
56 :
57 : impl TermHistory {
58 34154 : pub fn empty() -> TermHistory {
59 34154 : TermHistory(Vec::new())
60 34154 : }
61 :
62 : // Parse TermHistory as n_entries followed by TermLsn pairs
63 19991 : pub fn from_bytes(bytes: &mut Bytes) -> Result<TermHistory> {
64 19991 : if bytes.remaining() < 4 {
65 0 : bail!("TermHistory misses len");
66 19991 : }
67 19991 : let n_entries = bytes.get_u32_le();
68 19991 : let mut res = Vec::with_capacity(n_entries as usize);
69 19991 : for _ in 0..n_entries {
70 174652 : if bytes.remaining() < 16 {
71 0 : bail!("TermHistory is incomplete");
72 174652 : }
73 174652 : res.push(TermLsn {
74 174652 : term: bytes.get_u64_le(),
75 174652 : lsn: bytes.get_u64_le().into(),
76 174652 : })
77 : }
78 19991 : Ok(TermHistory(res))
79 19991 : }
80 :
81 : /// Return copy of self with switches happening strictly after up_to
82 : /// truncated.
83 93821 : pub fn up_to(&self, up_to: Lsn) -> TermHistory {
84 93821 : let mut res = Vec::with_capacity(self.0.len());
85 528832 : for e in &self.0 {
86 435158 : if e.lsn > up_to {
87 147 : break;
88 435011 : }
89 435011 : res.push(*e);
90 : }
91 93821 : TermHistory(res)
92 93821 : }
93 :
94 : /// Find point of divergence between leader (walproposer) term history and
95 : /// safekeeper. Arguments are not symmetric as proposer history ends at
96 : /// +infinity while safekeeper at flush_lsn.
97 : /// C version is at walproposer SendProposerElected.
98 20021 : pub fn find_highest_common_point(
99 20021 : prop_th: &TermHistory,
100 20021 : sk_th: &TermHistory,
101 20021 : sk_wal_end: Lsn,
102 20021 : ) -> Option<TermLsn> {
103 20021 : let (prop_th, sk_th) = (&prop_th.0, &sk_th.0); // avoid .0 below
104 :
105 20021 : if let Some(sk_th_last) = sk_th.last() {
106 16855 : assert!(
107 16855 : sk_th_last.lsn <= sk_wal_end,
108 0 : "safekeeper term history end {:?} LSN is higher than WAL end {:?}",
109 : sk_th_last,
110 : sk_wal_end
111 : );
112 3166 : }
113 :
114 : // find last common term, if any...
115 20021 : let mut last_common_idx = None;
116 152934 : for i in 0..min(sk_th.len(), prop_th.len()) {
117 152934 : if prop_th[i].term != sk_th[i].term {
118 138 : break;
119 152796 : }
120 152796 : // If term is the same, LSN must be equal as well.
121 152796 : assert!(
122 152796 : prop_th[i].lsn == sk_th[i].lsn,
123 0 : "same term {} has different start LSNs: prop {}, sk {}",
124 0 : prop_th[i].term,
125 0 : prop_th[i].lsn,
126 0 : sk_th[i].lsn
127 : );
128 152796 : last_common_idx = Some(i);
129 : }
130 20021 : let last_common_idx = match last_common_idx {
131 3184 : None => return None, // no common point
132 16837 : Some(lci) => lci,
133 16837 : };
134 16837 : // Now find where it ends at both prop and sk and take min. End of
135 16837 : // (common) term is the start of the next except it is the last one;
136 16837 : // there it is flush_lsn in case of safekeeper or, in case of proposer
137 16837 : // +infinity, so we just take flush_lsn then.
138 16837 : if last_common_idx == prop_th.len() - 1 {
139 1261 : Some(TermLsn {
140 1261 : term: prop_th[last_common_idx].term,
141 1261 : lsn: sk_wal_end,
142 1261 : })
143 : } else {
144 15576 : let prop_common_term_end = prop_th[last_common_idx + 1].lsn;
145 15576 : let sk_common_term_end = if last_common_idx + 1 < sk_th.len() {
146 120 : sk_th[last_common_idx + 1].lsn
147 : } else {
148 15456 : sk_wal_end
149 : };
150 15576 : Some(TermLsn {
151 15576 : term: prop_th[last_common_idx].term,
152 15576 : lsn: min(prop_common_term_end, sk_common_term_end),
153 15576 : })
154 : }
155 20021 : }
156 : }
157 :
158 : /// Display only latest entries for Debug.
159 : impl fmt::Debug for TermHistory {
160 4378 : fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
161 4378 : let n_printed = 20;
162 4378 : write!(
163 4378 : fmt,
164 4378 : "{}{:?}",
165 4378 : if self.0.len() > n_printed { "... " } else { "" },
166 4378 : self.0
167 4378 : .iter()
168 4378 : .rev()
169 4378 : .take(n_printed)
170 10415 : .map(|&e| (e.term, e.lsn)) // omit TermSwitchEntry
171 4378 : .collect::<Vec<_>>()
172 4378 : )
173 4378 : }
174 : }
175 :
/// Unique id of a proposer as 16 raw bytes. Not needed for correctness; only
/// used for monitoring.
pub type PgUuid = [u8; 16];
178 :
179 : /// Persistent consensus state of the acceptor.
180 36 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
181 : pub struct AcceptorState {
182 : /// acceptor's last term it voted for (advanced in 1 phase)
183 : pub term: Term,
184 : /// History of term switches for safekeeper's WAL.
185 : /// Actually it often goes *beyond* WAL contents as we adopt term history
186 : /// from the proposer before recovery.
187 : pub term_history: TermHistory,
188 : }
189 :
190 : impl AcceptorState {
191 : /// acceptor's last_log_term is the term of the highest entry in the log
192 258 : pub fn get_last_log_term(&self, flush_lsn: Lsn) -> Term {
193 258 : let th = self.term_history.up_to(flush_lsn);
194 258 : match th.0.last() {
195 216 : Some(e) => e.term,
196 42 : None => 0,
197 : }
198 258 : }
199 : }
200 :
201 : /// Information about Postgres. Safekeeper gets it once and then verifies
202 : /// all further connections from computes match.
203 24 : #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
204 : pub struct ServerInfo {
205 : /// Postgres server version
206 : pub pg_version: u32,
207 : pub system_id: SystemId,
208 : pub wal_seg_size: u32,
209 : }
210 :
211 12 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
212 : pub struct PersistedPeerInfo {
213 : /// LSN up to which safekeeper offloaded WAL to s3.
214 : pub backup_lsn: Lsn,
215 : /// Term of the last entry.
216 : pub term: Term,
217 : /// LSN of the last record.
218 : pub flush_lsn: Lsn,
219 : /// Up to which LSN safekeeper regards its WAL as committed.
220 : pub commit_lsn: Lsn,
221 : }
222 :
223 : impl PersistedPeerInfo {
224 0 : pub fn new() -> Self {
225 0 : Self {
226 0 : backup_lsn: Lsn::INVALID,
227 0 : term: INVALID_TERM,
228 0 : flush_lsn: Lsn(0),
229 0 : commit_lsn: Lsn(0),
230 0 : }
231 0 : }
232 : }
233 :
234 : // make clippy happy
235 : impl Default for PersistedPeerInfo {
236 0 : fn default() -> Self {
237 0 : Self::new()
238 0 : }
239 : }
240 :
241 : // protocol messages
242 :
243 : /// Initial Proposer -> Acceptor message
244 463422 : #[derive(Debug, Deserialize)]
245 : pub struct ProposerGreeting {
246 : /// proposer-acceptor protocol version
247 : pub protocol_version: u32,
248 : /// Postgres server version
249 : pub pg_version: u32,
250 : pub proposer_id: PgUuid,
251 : pub system_id: SystemId,
252 : pub timeline_id: TimelineId,
253 : pub tenant_id: TenantId,
254 : pub tli: TimeLineID,
255 : pub wal_seg_size: u32,
256 : }
257 :
258 : /// Acceptor -> Proposer initial response: the highest term known to me
259 : /// (acceptor voted for).
260 : #[derive(Debug, Serialize)]
261 : pub struct AcceptorGreeting {
262 : term: u64,
263 : node_id: NodeId,
264 : }
265 :
266 : /// Vote request sent from proposer to safekeepers
267 73554 : #[derive(Debug, Deserialize)]
268 : pub struct VoteRequest {
269 : pub term: Term,
270 : }
271 :
272 : /// Vote itself, sent from safekeeper to proposer
273 : #[derive(Debug, Serialize)]
274 : pub struct VoteResponse {
275 : pub term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
276 : vote_given: u64, // fixme u64 due to padding
277 : // Safekeeper flush_lsn (end of WAL) + history of term switches allow
278 : // proposer to choose the most advanced one.
279 : pub flush_lsn: Lsn,
280 : truncate_lsn: Lsn,
281 : pub term_history: TermHistory,
282 : timeline_start_lsn: Lsn,
283 : }
284 :
285 : /*
286 : * Proposer -> Acceptor message announcing proposer is elected and communicating
287 : * term history to it.
288 : */
289 : #[derive(Debug)]
290 : pub struct ProposerElected {
291 : pub term: Term,
292 : pub start_streaming_at: Lsn,
293 : pub term_history: TermHistory,
294 : pub timeline_start_lsn: Lsn,
295 : }
296 :
297 : /// Request with WAL message sent from proposer to safekeeper. Along the way it
298 : /// communicates commit_lsn.
299 : #[derive(Debug)]
300 : pub struct AppendRequest {
301 : pub h: AppendRequestHeader,
302 : pub wal_data: Bytes,
303 : }
304 66070 : #[derive(Debug, Clone, Deserialize)]
305 : pub struct AppendRequestHeader {
306 : // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
307 : pub term: Term,
308 : // TODO: remove this field from the protocol, it in unused -- LSN of term
309 : // switch can be taken from ProposerElected (as well as from term history).
310 : pub term_start_lsn: Lsn,
311 : /// start position of message in WAL
312 : pub begin_lsn: Lsn,
313 : /// end position of message in WAL
314 : pub end_lsn: Lsn,
315 : /// LSN committed by quorum of safekeepers
316 : pub commit_lsn: Lsn,
317 : /// minimal LSN which may be needed by proposer to perform recovery of some safekeeper
318 : pub truncate_lsn: Lsn,
319 : // only for logging/debugging
320 : pub proposer_uuid: PgUuid,
321 : }
322 :
323 : /// Report safekeeper state to proposer
324 : #[derive(Debug, Serialize, Clone)]
325 : pub struct AppendResponse {
326 : // Current term of the safekeeper; if it is higher than proposer's, the
327 : // compute is out of date.
328 : pub term: Term,
329 : // Flushed end of wal on safekeeper; one should be always mindful from what
330 : // term history this value comes, either checking history directly or
331 : // observing term being set to one for which WAL truncation is known to have
332 : // happened.
333 : pub flush_lsn: Lsn,
334 : // We report back our awareness about which WAL is committed, as this is
335 : // a criterion for walproposer --sync mode exit
336 : pub commit_lsn: Lsn,
337 : pub hs_feedback: HotStandbyFeedback,
338 : pub pageserver_feedback: Option<PageserverFeedback>,
339 : }
340 :
341 : impl AppendResponse {
342 0 : fn term_only(term: Term) -> AppendResponse {
343 0 : AppendResponse {
344 0 : term,
345 0 : flush_lsn: Lsn(0),
346 0 : commit_lsn: Lsn(0),
347 0 : hs_feedback: HotStandbyFeedback::empty(),
348 0 : pageserver_feedback: None,
349 0 : }
350 0 : }
351 : }
352 :
353 : /// Proposer -> Acceptor messages
354 : #[derive(Debug)]
355 : pub enum ProposerAcceptorMessage {
356 : Greeting(ProposerGreeting),
357 : VoteRequest(VoteRequest),
358 : Elected(ProposerElected),
359 : AppendRequest(AppendRequest),
360 : NoFlushAppendRequest(AppendRequest),
361 : FlushWAL,
362 : }
363 :
364 : impl ProposerAcceptorMessage {
365 : /// Parse proposer message.
366 623037 : pub fn parse(msg_bytes: Bytes) -> Result<ProposerAcceptorMessage> {
367 623037 : // xxx using Reader is inefficient but easy to work with bincode
368 623037 : let mut stream = msg_bytes.reader();
369 : // u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is
370 623037 : let tag = stream.read_u64::<LittleEndian>()? as u8 as char;
371 623037 : match tag {
372 : 'g' => {
373 463422 : let msg = ProposerGreeting::des_from(&mut stream)?;
374 463422 : Ok(ProposerAcceptorMessage::Greeting(msg))
375 : }
376 : 'v' => {
377 73554 : let msg = VoteRequest::des_from(&mut stream)?;
378 73554 : Ok(ProposerAcceptorMessage::VoteRequest(msg))
379 : }
380 : 'e' => {
381 19991 : let mut msg_bytes = stream.into_inner();
382 19991 : if msg_bytes.remaining() < 16 {
383 0 : bail!("ProposerElected message is not complete");
384 19991 : }
385 19991 : let term = msg_bytes.get_u64_le();
386 19991 : let start_streaming_at = msg_bytes.get_u64_le().into();
387 19991 : let term_history = TermHistory::from_bytes(&mut msg_bytes)?;
388 19991 : if msg_bytes.remaining() < 8 {
389 0 : bail!("ProposerElected message is not complete");
390 19991 : }
391 19991 : let timeline_start_lsn = msg_bytes.get_u64_le().into();
392 19991 : let msg = ProposerElected {
393 19991 : term,
394 19991 : start_streaming_at,
395 19991 : timeline_start_lsn,
396 19991 : term_history,
397 19991 : };
398 19991 : Ok(ProposerAcceptorMessage::Elected(msg))
399 : }
400 : 'a' => {
401 : // read header followed by wal data
402 66070 : let hdr = AppendRequestHeader::des_from(&mut stream)?;
403 66070 : let rec_size = hdr
404 66070 : .end_lsn
405 66070 : .checked_sub(hdr.begin_lsn)
406 66070 : .context("begin_lsn > end_lsn in AppendRequest")?
407 : .0 as usize;
408 66070 : if rec_size > MAX_SEND_SIZE {
409 0 : bail!(
410 0 : "AppendRequest is longer than MAX_SEND_SIZE ({})",
411 0 : MAX_SEND_SIZE
412 0 : );
413 66070 : }
414 66070 :
415 66070 : let mut wal_data_vec: Vec<u8> = vec![0; rec_size];
416 66070 : stream.read_exact(&mut wal_data_vec)?;
417 66070 : let wal_data = Bytes::from(wal_data_vec);
418 66070 : let msg = AppendRequest { h: hdr, wal_data };
419 66070 :
420 66070 : Ok(ProposerAcceptorMessage::AppendRequest(msg))
421 : }
422 0 : _ => bail!("unknown proposer-acceptor message tag: {}", tag),
423 : }
424 623037 : }
425 : }
426 :
427 : /// Acceptor -> Proposer messages
428 : #[derive(Debug)]
429 : pub enum AcceptorProposerMessage {
430 : Greeting(AcceptorGreeting),
431 : VoteResponse(VoteResponse),
432 : AppendResponse(AppendResponse),
433 : }
434 :
435 : impl AcceptorProposerMessage {
436 : /// Serialize acceptor -> proposer message.
437 591711 : pub fn serialize(&self, buf: &mut BytesMut) -> Result<()> {
438 591711 : match self {
439 463422 : AcceptorProposerMessage::Greeting(msg) => {
440 463422 : buf.put_u64_le('g' as u64);
441 463422 : buf.put_u64_le(msg.term);
442 463422 : buf.put_u64_le(msg.node_id.0);
443 463422 : }
444 73554 : AcceptorProposerMessage::VoteResponse(msg) => {
445 73554 : buf.put_u64_le('v' as u64);
446 73554 : buf.put_u64_le(msg.term);
447 73554 : buf.put_u64_le(msg.vote_given);
448 73554 : buf.put_u64_le(msg.flush_lsn.into());
449 73554 : buf.put_u64_le(msg.truncate_lsn.into());
450 73554 : buf.put_u32_le(msg.term_history.0.len() as u32);
451 354283 : for e in &msg.term_history.0 {
452 280729 : buf.put_u64_le(e.term);
453 280729 : buf.put_u64_le(e.lsn.into());
454 280729 : }
455 73554 : buf.put_u64_le(msg.timeline_start_lsn.into());
456 : }
457 54735 : AcceptorProposerMessage::AppendResponse(msg) => {
458 54735 : buf.put_u64_le('a' as u64);
459 54735 : buf.put_u64_le(msg.term);
460 54735 : buf.put_u64_le(msg.flush_lsn.into());
461 54735 : buf.put_u64_le(msg.commit_lsn.into());
462 54735 : buf.put_i64_le(msg.hs_feedback.ts);
463 54735 : buf.put_u64_le(msg.hs_feedback.xmin);
464 54735 : buf.put_u64_le(msg.hs_feedback.catalog_xmin);
465 :
466 : // AsyncReadMessage in walproposer.c will not try to decode pageserver_feedback
467 : // if it is not present.
468 54735 : if let Some(ref msg) = msg.pageserver_feedback {
469 0 : msg.serialize(buf);
470 54735 : }
471 : }
472 : }
473 :
474 591711 : Ok(())
475 591711 : }
476 : }
477 :
478 : /// Safekeeper implements consensus to reliably persist WAL across nodes.
479 : /// It controls all WAL disk writes and updates of control file.
480 : ///
481 : /// Currently safekeeper processes:
482 : /// - messages from compute (proposers) and provides replies
483 : /// - messages from broker peers
484 : pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> {
485 : /// LSN since the proposer safekeeper currently talking to appends WAL;
486 : /// determines last_log_term switch point.
487 : pub term_start_lsn: Lsn,
488 :
489 : pub state: TimelineState<CTRL>, // persistent state storage
490 : pub wal_store: WAL,
491 :
492 : node_id: NodeId, // safekeeper's node id
493 : }
494 :
495 : impl<CTRL, WAL> SafeKeeper<CTRL, WAL>
496 : where
497 : CTRL: control_file::Storage,
498 : WAL: wal_storage::Storage,
499 : {
500 : /// Accepts a control file storage containing the safekeeper state.
501 : /// State must be initialized, i.e. contain filled `tenant_id`, `timeline_id`
502 : /// and `server` (`wal_seg_size` inside it) fields.
503 207266 : pub fn new(
504 207266 : state: TimelineState<CTRL>,
505 207266 : wal_store: WAL,
506 207266 : node_id: NodeId,
507 207266 : ) -> Result<SafeKeeper<CTRL, WAL>> {
508 207266 : if state.tenant_id == TenantId::from([0u8; 16])
509 207266 : || state.timeline_id == TimelineId::from([0u8; 16])
510 : {
511 0 : bail!(
512 0 : "Calling SafeKeeper::new with empty tenant_id ({}) or timeline_id ({})",
513 0 : state.tenant_id,
514 0 : state.timeline_id
515 0 : );
516 207266 : }
517 207266 :
518 207266 : Ok(SafeKeeper {
519 207266 : term_start_lsn: Lsn(0),
520 207266 : state,
521 207266 : wal_store,
522 207266 : node_id,
523 207266 : })
524 207266 : }
525 :
526 : /// Get history of term switches for the available WAL
527 93563 : fn get_term_history(&self) -> TermHistory {
528 93563 : self.state
529 93563 : .acceptor_state
530 93563 : .term_history
531 93563 : .up_to(self.flush_lsn())
532 93563 : }
533 :
534 258 : pub fn get_last_log_term(&self) -> Term {
535 258 : self.state
536 258 : .acceptor_state
537 258 : .get_last_log_term(self.flush_lsn())
538 258 : }
539 :
540 : /// wal_store wrapper avoiding commit_lsn <= flush_lsn violation when we don't have WAL yet.
541 275454 : pub fn flush_lsn(&self) -> Lsn {
542 275454 : max(self.wal_store.flush_lsn(), self.state.timeline_start_lsn)
543 275454 : }
544 :
545 : /// Process message from proposer and possibly form reply. Concurrent
546 : /// callers must exclude each other.
547 677802 : pub async fn process_msg(
548 677802 : &mut self,
549 677802 : msg: &ProposerAcceptorMessage,
550 677802 : ) -> Result<Option<AcceptorProposerMessage>> {
551 677802 : match msg {
552 463422 : ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg).await,
553 73566 : ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg).await,
554 19997 : ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg).await,
555 12 : ProposerAcceptorMessage::AppendRequest(msg) => {
556 12 : self.handle_append_request(msg, true).await
557 : }
558 66070 : ProposerAcceptorMessage::NoFlushAppendRequest(msg) => {
559 66070 : self.handle_append_request(msg, false).await
560 : }
561 54735 : ProposerAcceptorMessage::FlushWAL => self.handle_flush().await,
562 : }
563 677802 : }
564 :
565 : /// Handle initial message from proposer: check its sanity and send my
566 : /// current term.
567 463422 : async fn handle_greeting(
568 463422 : &mut self,
569 463422 : msg: &ProposerGreeting,
570 463422 : ) -> Result<Option<AcceptorProposerMessage>> {
571 463422 : // Check protocol compatibility
572 463422 : if msg.protocol_version != SK_PROTOCOL_VERSION {
573 0 : bail!(
574 0 : "incompatible protocol version {}, expected {}",
575 0 : msg.protocol_version,
576 0 : SK_PROTOCOL_VERSION
577 0 : );
578 463422 : }
579 463422 : /* Postgres major version mismatch is treated as fatal error
580 463422 : * because safekeepers parse WAL headers and the format
581 463422 : * may change between versions.
582 463422 : */
583 463422 : if msg.pg_version / 10000 != self.state.server.pg_version / 10000
584 0 : && self.state.server.pg_version != UNKNOWN_SERVER_VERSION
585 : {
586 0 : bail!(
587 0 : "incompatible server version {}, expected {}",
588 0 : msg.pg_version,
589 0 : self.state.server.pg_version
590 0 : );
591 463422 : }
592 463422 :
593 463422 : if msg.tenant_id != self.state.tenant_id {
594 0 : bail!(
595 0 : "invalid tenant ID, got {}, expected {}",
596 0 : msg.tenant_id,
597 0 : self.state.tenant_id
598 0 : );
599 463422 : }
600 463422 : if msg.timeline_id != self.state.timeline_id {
601 0 : bail!(
602 0 : "invalid timeline ID, got {}, expected {}",
603 0 : msg.timeline_id,
604 0 : self.state.timeline_id
605 0 : );
606 463422 : }
607 463422 : if self.state.server.wal_seg_size != msg.wal_seg_size {
608 0 : bail!(
609 0 : "invalid wal_seg_size, got {}, expected {}",
610 0 : msg.wal_seg_size,
611 0 : self.state.server.wal_seg_size
612 0 : );
613 463422 : }
614 463422 :
615 463422 : // system_id will be updated on mismatch
616 463422 : // sync-safekeepers doesn't know sysid and sends 0, ignore it
617 463422 : if self.state.server.system_id != msg.system_id && msg.system_id != 0 {
618 0 : if self.state.server.system_id != 0 {
619 0 : warn!(
620 0 : "unexpected system ID arrived, got {}, expected {}",
621 0 : msg.system_id, self.state.server.system_id
622 : );
623 0 : }
624 :
625 0 : let mut state = self.state.start_change();
626 0 : state.server.system_id = msg.system_id;
627 0 : if msg.pg_version != UNKNOWN_SERVER_VERSION {
628 0 : state.server.pg_version = msg.pg_version;
629 0 : }
630 0 : self.state.finish_change(&state).await?;
631 463422 : }
632 :
633 463422 : info!(
634 0 : "processed greeting from walproposer {}, sending term {:?}",
635 14496 : msg.proposer_id.map(|b| format!("{:X}", b)).join(""),
636 0 : self.state.acceptor_state.term
637 : );
638 463422 : Ok(Some(AcceptorProposerMessage::Greeting(AcceptorGreeting {
639 463422 : term: self.state.acceptor_state.term,
640 463422 : node_id: self.node_id,
641 463422 : })))
642 463422 : }
643 :
644 : /// Give vote for the given term, if we haven't done that previously.
645 73566 : async fn handle_vote_request(
646 73566 : &mut self,
647 73566 : msg: &VoteRequest,
648 73566 : ) -> Result<Option<AcceptorProposerMessage>> {
649 73566 : // Once voted, we won't accept data from older proposers; flush
650 73566 : // everything we've already received so that new proposer starts
651 73566 : // streaming at end of our WAL, without overlap. Currently we truncate
652 73566 : // WAL at streaming point, so this avoids truncating already committed
653 73566 : // WAL.
654 73566 : //
655 73566 : // TODO: it would be smoother to not truncate committed piece at
656 73566 : // handle_elected instead. Currently not a big deal, as proposer is the
657 73566 : // only source of WAL; with peer2peer recovery it would be more
658 73566 : // important.
659 73566 : self.wal_store.flush_wal().await?;
660 : // initialize with refusal
661 73566 : let mut resp = VoteResponse {
662 73566 : term: self.state.acceptor_state.term,
663 73566 : vote_given: false as u64,
664 73566 : flush_lsn: self.flush_lsn(),
665 73566 : truncate_lsn: self.state.inmem.peer_horizon_lsn,
666 73566 : term_history: self.get_term_history(),
667 73566 : timeline_start_lsn: self.state.timeline_start_lsn,
668 73566 : };
669 73566 : if self.state.acceptor_state.term < msg.term {
670 69810 : let mut state = self.state.start_change();
671 69810 : state.acceptor_state.term = msg.term;
672 69810 : // persist vote before sending it out
673 69810 : self.state.finish_change(&state).await?;
674 :
675 69810 : resp.term = self.state.acceptor_state.term;
676 69810 : resp.vote_given = true as u64;
677 3756 : }
678 73566 : info!("processed VoteRequest for term {}: {:?}", msg.term, &resp);
679 73566 : Ok(Some(AcceptorProposerMessage::VoteResponse(resp)))
680 73566 : }
681 :
682 : /// Form AppendResponse from current state.
683 54747 : fn append_response(&self) -> AppendResponse {
684 54747 : let ar = AppendResponse {
685 54747 : term: self.state.acceptor_state.term,
686 54747 : flush_lsn: self.flush_lsn(),
687 54747 : commit_lsn: self.state.commit_lsn,
688 54747 : // will be filled by the upper code to avoid bothering safekeeper
689 54747 : hs_feedback: HotStandbyFeedback::empty(),
690 54747 : pageserver_feedback: None,
691 54747 : };
692 54747 : trace!("formed AppendResponse {:?}", ar);
693 54747 : ar
694 54747 : }
695 :
696 19997 : async fn handle_elected(
697 19997 : &mut self,
698 19997 : msg: &ProposerElected,
699 19997 : ) -> Result<Option<AcceptorProposerMessage>> {
700 19997 : let _timer = MISC_OPERATION_SECONDS
701 19997 : .with_label_values(&["handle_elected"])
702 19997 : .start_timer();
703 19997 :
704 19997 : info!(
705 0 : "received ProposerElected {:?}, term={}, last_log_term={}, flush_lsn={}",
706 0 : msg,
707 0 : self.state.acceptor_state.term,
708 0 : self.get_last_log_term(),
709 0 : self.flush_lsn()
710 : );
711 19997 : if self.state.acceptor_state.term < msg.term {
712 6 : let mut state = self.state.start_change();
713 6 : state.acceptor_state.term = msg.term;
714 6 : self.state.finish_change(&state).await?;
715 19991 : }
716 :
717 : // If our term is higher, ignore the message (next feedback will inform the compute)
718 19997 : if self.state.acceptor_state.term > msg.term {
719 0 : return Ok(None);
720 19997 : }
721 19997 :
722 19997 : // Before truncating WAL check-cross the check divergence point received
723 19997 : // from the walproposer.
724 19997 : let sk_th = self.get_term_history();
725 19997 : let last_common_point = match TermHistory::find_highest_common_point(
726 19997 : &msg.term_history,
727 19997 : &sk_th,
728 19997 : self.flush_lsn(),
729 19997 : ) {
730 : // No common point. Expect streaming from the beginning of the
731 : // history like walproposer while we don't have proper init.
732 3178 : None => *msg.term_history.0.first().ok_or(anyhow::anyhow!(
733 3178 : "empty walproposer term history {:?}",
734 3178 : msg.term_history
735 3178 : ))?,
736 16819 : Some(lcp) => lcp,
737 : };
738 : // This is expected to happen in a rare race when another connection
739 : // from the same walproposer writes + flushes WAL after this connection
740 : // sent flush_lsn in VoteRequest; for instance, very late
741 : // ProposerElected message delivery after another connection was
742 : // established and wrote WAL. In such cases error is transient;
743 : // reconnection makes safekeeper send newest term history and flush_lsn
744 : // and walproposer recalculates the streaming point. OTOH repeating
745 : // error indicates a serious bug.
746 19997 : if last_common_point.lsn != msg.start_streaming_at {
747 0 : bail!("refusing ProposerElected with unexpected truncation point: lcp={:?} start_streaming_at={}, term={}, sk_th={:?} flush_lsn={}, wp_th={:?}",
748 0 : last_common_point, msg.start_streaming_at,
749 0 : self.state.acceptor_state.term, sk_th, self.flush_lsn(), msg.term_history,
750 0 : );
751 19997 : }
752 19997 :
753 19997 : // We are also expected to never attempt to truncate committed data.
754 19997 : assert!(
755 19997 : msg.start_streaming_at >= self.state.inmem.commit_lsn,
756 0 : "attempt to truncate committed data: start_streaming_at={}, commit_lsn={}, term={}, sk_th={:?} flush_lsn={}, wp_th={:?}",
757 0 : msg.start_streaming_at, self.state.inmem.commit_lsn,
758 0 : self.state.acceptor_state.term, sk_th, self.flush_lsn(), msg.term_history,
759 : );
760 :
761 : // Before first WAL write initialize its segment. It makes first segment
762 : // pg_waldump'able because stream from compute doesn't include its
763 : // segment and page headers.
764 : //
765 : // If we fail before first WAL write flush this action would be
766 : // repeated, that's ok because it is idempotent.
767 19997 : if self.wal_store.flush_lsn() == Lsn::INVALID {
768 3166 : self.wal_store
769 3166 : .initialize_first_segment(msg.start_streaming_at)
770 0 : .await?;
771 16831 : }
772 :
773 : // truncate wal, update the LSNs
774 19997 : self.wal_store.truncate_wal(msg.start_streaming_at).await?;
775 :
776 : // and now adopt term history from proposer
777 : {
778 19997 : let mut state = self.state.start_change();
779 19997 :
780 19997 : // Here we learn initial LSN for the first time, set fields
781 19997 : // interested in that.
782 19997 :
783 19997 : if state.timeline_start_lsn == Lsn(0) {
784 : // Remember point where WAL begins globally.
785 3166 : state.timeline_start_lsn = msg.timeline_start_lsn;
786 3166 : info!(
787 0 : "setting timeline_start_lsn to {:?}",
788 : state.timeline_start_lsn
789 : );
790 16831 : }
791 19997 : if state.peer_horizon_lsn == Lsn(0) {
792 3166 : // Update peer_horizon_lsn as soon as we know where timeline starts.
793 3166 : // It means that peer_horizon_lsn cannot be zero after we know timeline_start_lsn.
794 3166 : state.peer_horizon_lsn = msg.timeline_start_lsn;
795 16831 : }
796 19997 : if state.local_start_lsn == Lsn(0) {
797 3166 : state.local_start_lsn = msg.start_streaming_at;
798 3166 : info!("setting local_start_lsn to {:?}", state.local_start_lsn);
799 16831 : }
800 : // Initializing commit_lsn before acking first flushed record is
801 : // important to let find_end_of_wal skip the hole in the beginning
802 : // of the first segment.
803 : //
804 : // NB: on new clusters, this happens at the same time as
805 : // timeline_start_lsn initialization, it is taken outside to provide
806 : // upgrade.
807 19997 : state.commit_lsn = max(state.commit_lsn, state.timeline_start_lsn);
808 19997 :
809 19997 : // Initializing backup_lsn is useful to avoid making backup think it should upload 0 segment.
810 19997 : state.backup_lsn = max(state.backup_lsn, state.timeline_start_lsn);
811 19997 : // similar for remote_consistent_lsn
812 19997 : state.remote_consistent_lsn =
813 19997 : max(state.remote_consistent_lsn, state.timeline_start_lsn);
814 19997 :
815 19997 : state.acceptor_state.term_history = msg.term_history.clone();
816 19997 : self.state.finish_change(&state).await?;
817 : }
818 :
819 19997 : info!("start receiving WAL since {:?}", msg.start_streaming_at);
820 :
821 : // Cache LSN where term starts to immediately fsync control file with
822 : // commit_lsn once we reach it -- sync-safekeepers finishes when
823 : // persisted commit_lsn on majority of safekeepers aligns.
824 19997 : self.term_start_lsn = match msg.term_history.0.last() {
825 0 : None => bail!("proposer elected with empty term history"),
826 19997 : Some(term_lsn_start) => term_lsn_start.lsn,
827 19997 : };
828 19997 :
829 19997 : Ok(None)
830 19997 : }
831 :
832 : /// Advance commit_lsn taking into account what we have locally.
833 : ///
834 : /// Note: it is assumed that 'WAL we have is from the right term' check has
835 : /// already been done outside.
836 33077 : async fn update_commit_lsn(&mut self, mut candidate: Lsn) -> Result<()> {
837 33077 : // Both peers and walproposer communicate this value, we might already
838 33077 : // have a fresher (higher) version.
839 33077 : candidate = max(candidate, self.state.inmem.commit_lsn);
840 33077 : let commit_lsn = min(candidate, self.flush_lsn());
841 33077 : assert!(
842 33077 : commit_lsn >= self.state.inmem.commit_lsn,
843 0 : "commit_lsn monotonicity violated: old={} new={}",
844 : self.state.inmem.commit_lsn,
845 : commit_lsn
846 : );
847 :
848 33077 : self.state.inmem.commit_lsn = commit_lsn;
849 33077 :
850 33077 : // If new commit_lsn reached term switch, force sync of control
851 33077 : // file: walproposer in sync mode is very interested when this
852 33077 : // happens. Note: this is for sync-safekeepers mode only, as
853 33077 : // otherwise commit_lsn might jump over term_start_lsn.
854 33077 : if commit_lsn >= self.term_start_lsn && self.state.commit_lsn < self.term_start_lsn {
855 2532 : self.state.flush().await?;
856 30545 : }
857 :
858 33077 : Ok(())
859 33077 : }
860 :
861 : /// Handle request to append WAL.
862 : #[allow(clippy::comparison_chain)]
863 66082 : async fn handle_append_request(
864 66082 : &mut self,
865 66082 : msg: &AppendRequest,
866 66082 : require_flush: bool,
867 66082 : ) -> Result<Option<AcceptorProposerMessage>> {
868 66082 : if self.state.acceptor_state.term < msg.h.term {
869 0 : bail!("got AppendRequest before ProposerElected");
870 66082 : }
871 66082 :
872 66082 : // If our term is higher, immediately refuse the message.
873 66082 : if self.state.acceptor_state.term > msg.h.term {
874 0 : let resp = AppendResponse::term_only(self.state.acceptor_state.term);
875 0 : return Ok(Some(AcceptorProposerMessage::AppendResponse(resp)));
876 66082 : }
877 66082 :
878 66082 : // Now we know that we are in the same term as the proposer,
879 66082 : // processing the message.
880 66082 :
881 66082 : self.state.inmem.proposer_uuid = msg.h.proposer_uuid;
882 66082 :
883 66082 : // do the job
884 66082 : if !msg.wal_data.is_empty() {
885 12014 : self.wal_store
886 12014 : .write_wal(msg.h.begin_lsn, &msg.wal_data)
887 0 : .await?;
888 54068 : }
889 :
890 : // flush wal to the disk, if required
891 66082 : if require_flush {
892 12 : self.wal_store.flush_wal().await?;
893 66070 : }
894 :
895 : // Update commit_lsn.
896 66082 : if msg.h.commit_lsn != Lsn(0) {
897 33077 : self.update_commit_lsn(msg.h.commit_lsn).await?;
898 33005 : }
899 : // Value calculated by walproposer can always lag:
900 : // - safekeepers can forget inmem value and send to proposer lower
901 : // persisted one on restart;
902 : // - if we make safekeepers always send persistent value,
903 : // any compute restart would pull it down.
904 : // Thus, take max before adopting.
905 66082 : self.state.inmem.peer_horizon_lsn =
906 66082 : max(self.state.inmem.peer_horizon_lsn, msg.h.truncate_lsn);
907 66082 :
908 66082 : // Update truncate and commit LSN in control file.
909 66082 : // To avoid negative impact on performance of extra fsync, do it only
910 66082 : // when commit_lsn delta exceeds WAL segment size.
911 66082 : if self.state.commit_lsn + (self.state.server.wal_seg_size as u64)
912 66082 : < self.state.inmem.commit_lsn
913 : {
914 0 : self.state.flush().await?;
915 66082 : }
916 :
917 66082 : trace!(
918 0 : "processed AppendRequest of len {}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, flushed={:?}",
919 0 : msg.wal_data.len(),
920 : msg.h.end_lsn,
921 : msg.h.commit_lsn,
922 : msg.h.truncate_lsn,
923 : require_flush,
924 : );
925 :
926 : // If flush_lsn hasn't updated, AppendResponse is not very useful.
927 66082 : if !require_flush {
928 66070 : return Ok(None);
929 12 : }
930 12 :
931 12 : let resp = self.append_response();
932 12 : Ok(Some(AcceptorProposerMessage::AppendResponse(resp)))
933 66082 : }
934 :
935 : /// Flush WAL to disk. Return AppendResponse with latest LSNs.
936 54735 : async fn handle_flush(&mut self) -> Result<Option<AcceptorProposerMessage>> {
937 54735 : self.wal_store.flush_wal().await?;
938 54735 : Ok(Some(AcceptorProposerMessage::AppendResponse(
939 54735 : self.append_response(),
940 54735 : )))
941 54735 : }
942 :
943 : /// Update commit_lsn from peer safekeeper data.
944 0 : pub async fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> {
945 0 : if (Lsn(sk_info.commit_lsn) != Lsn::INVALID) && (sk_info.last_log_term != INVALID_TERM) {
946 : // Note: the check is too restrictive, generally we can update local
947 : // commit_lsn if our history matches (is part of) history of advanced
948 : // commit_lsn provider.
949 0 : if sk_info.last_log_term == self.get_last_log_term() {
950 0 : self.update_commit_lsn(Lsn(sk_info.commit_lsn)).await?;
951 0 : }
952 0 : }
953 0 : Ok(())
954 0 : }
955 : }
956 :
957 : #[cfg(test)]
958 : mod tests {
959 : use futures::future::BoxFuture;
960 : use postgres_ffi::{XLogSegNo, WAL_SEGMENT_SIZE};
961 :
962 : use super::*;
963 : use crate::{
964 : state::{EvictionState, PersistedPeers, TimelinePersistentState},
965 : wal_storage::Storage,
966 : };
967 : use std::{ops::Deref, str::FromStr, time::Instant};
968 :
969 : // fake storage for tests
970 : struct InMemoryState {
971 : persisted_state: TimelinePersistentState,
972 : }
973 :
974 : #[async_trait::async_trait]
975 : impl control_file::Storage for InMemoryState {
976 18 : async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> {
977 18 : self.persisted_state = s.clone();
978 18 : Ok(())
979 18 : }
980 :
981 0 : fn last_persist_at(&self) -> Instant {
982 0 : Instant::now()
983 0 : }
984 : }
985 :
986 : impl Deref for InMemoryState {
987 : type Target = TimelinePersistentState;
988 :
989 384 : fn deref(&self) -> &Self::Target {
990 384 : &self.persisted_state
991 384 : }
992 : }
993 :
994 12 : fn test_sk_state() -> TimelinePersistentState {
995 12 : let mut state = TimelinePersistentState::empty();
996 12 : state.server.wal_seg_size = WAL_SEGMENT_SIZE as u32;
997 12 : state.tenant_id = TenantId::from([1u8; 16]);
998 12 : state.timeline_id = TimelineId::from([1u8; 16]);
999 12 : state
1000 12 : }
1001 :
1002 : struct DummyWalStore {
1003 : lsn: Lsn,
1004 : }
1005 :
1006 : #[async_trait::async_trait]
1007 : impl wal_storage::Storage for DummyWalStore {
1008 66 : fn flush_lsn(&self) -> Lsn {
1009 66 : self.lsn
1010 66 : }
1011 :
1012 6 : async fn initialize_first_segment(&mut self, _init_lsn: Lsn) -> Result<()> {
1013 6 : Ok(())
1014 6 : }
1015 :
1016 12 : async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
1017 12 : self.lsn = startpos + buf.len() as u64;
1018 12 : Ok(())
1019 12 : }
1020 :
1021 12 : async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
1022 12 : self.lsn = end_pos;
1023 12 : Ok(())
1024 12 : }
1025 :
1026 24 : async fn flush_wal(&mut self) -> Result<()> {
1027 24 : Ok(())
1028 24 : }
1029 :
1030 0 : fn remove_up_to(&self, _segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> {
1031 0 : Box::pin(async { Ok(()) })
1032 0 : }
1033 :
1034 0 : fn get_metrics(&self) -> crate::metrics::WalStorageMetrics {
1035 0 : crate::metrics::WalStorageMetrics::default()
1036 0 : }
1037 : }
1038 :
1039 : #[tokio::test]
1040 6 : async fn test_voting() {
1041 6 : let storage = InMemoryState {
1042 6 : persisted_state: test_sk_state(),
1043 6 : };
1044 6 : let wal_store = DummyWalStore { lsn: Lsn(0) };
1045 6 : let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
1046 6 :
1047 6 : // check voting for 1 is ok
1048 6 : let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 });
1049 6 : let mut vote_resp = sk.process_msg(&vote_request).await;
1050 6 : match vote_resp.unwrap() {
1051 6 : Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given != 0),
1052 6 : r => panic!("unexpected response: {:?}", r),
1053 6 : }
1054 6 :
1055 6 : // reboot...
1056 6 : let state = sk.state.deref().clone();
1057 6 : let storage = InMemoryState {
1058 6 : persisted_state: state,
1059 6 : };
1060 6 :
1061 6 : sk = SafeKeeper::new(TimelineState::new(storage), sk.wal_store, NodeId(0)).unwrap();
1062 6 :
1063 6 : // and ensure voting second time for 1 is not ok
1064 6 : vote_resp = sk.process_msg(&vote_request).await;
1065 6 : match vote_resp.unwrap() {
1066 6 : Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given == 0),
1067 6 : r => panic!("unexpected response: {:?}", r),
1068 6 : }
1069 6 : }
1070 :
1071 : #[tokio::test]
1072 6 : async fn test_last_log_term_switch() {
1073 6 : let storage = InMemoryState {
1074 6 : persisted_state: test_sk_state(),
1075 6 : };
1076 6 : let wal_store = DummyWalStore { lsn: Lsn(0) };
1077 6 :
1078 6 : let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
1079 6 :
1080 6 : let mut ar_hdr = AppendRequestHeader {
1081 6 : term: 1,
1082 6 : term_start_lsn: Lsn(3),
1083 6 : begin_lsn: Lsn(1),
1084 6 : end_lsn: Lsn(2),
1085 6 : commit_lsn: Lsn(0),
1086 6 : truncate_lsn: Lsn(0),
1087 6 : proposer_uuid: [0; 16],
1088 6 : };
1089 6 : let mut append_request = AppendRequest {
1090 6 : h: ar_hdr.clone(),
1091 6 : wal_data: Bytes::from_static(b"b"),
1092 6 : };
1093 6 :
1094 6 : let pem = ProposerElected {
1095 6 : term: 1,
1096 6 : start_streaming_at: Lsn(3),
1097 6 : term_history: TermHistory(vec![TermLsn {
1098 6 : term: 1,
1099 6 : lsn: Lsn(3),
1100 6 : }]),
1101 6 : timeline_start_lsn: Lsn(0),
1102 6 : };
1103 6 : sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
1104 6 : .await
1105 6 : .unwrap();
1106 6 :
1107 6 : // check that AppendRequest before term_start_lsn doesn't switch last_log_term.
1108 6 : let resp = sk
1109 6 : .process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
1110 6 : .await;
1111 6 : assert!(resp.is_ok());
1112 6 : assert_eq!(sk.get_last_log_term(), 0);
1113 6 :
1114 6 : // but record at term_start_lsn does the switch
1115 6 : ar_hdr.begin_lsn = Lsn(2);
1116 6 : ar_hdr.end_lsn = Lsn(3);
1117 6 : append_request = AppendRequest {
1118 6 : h: ar_hdr,
1119 6 : wal_data: Bytes::from_static(b"b"),
1120 6 : };
1121 6 : let resp = sk
1122 6 : .process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
1123 6 : .await;
1124 6 : assert!(resp.is_ok());
1125 6 : sk.wal_store.truncate_wal(Lsn(3)).await.unwrap(); // imitate the complete record at 3 %)
1126 6 : assert_eq!(sk.get_last_log_term(), 1);
1127 6 : }
1128 :
1129 : #[test]
1130 6 : fn test_find_highest_common_point_none() {
1131 6 : let prop_th = TermHistory(vec![(0, Lsn(1)).into()]);
1132 6 : let sk_th = TermHistory(vec![(1, Lsn(1)).into(), (2, Lsn(2)).into()]);
1133 6 : assert_eq!(
1134 6 : TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(3),),
1135 6 : None
1136 6 : );
1137 6 : }
1138 :
1139 : #[test]
1140 6 : fn test_find_highest_common_point_middle() {
1141 6 : let prop_th = TermHistory(vec![
1142 6 : (1, Lsn(10)).into(),
1143 6 : (2, Lsn(20)).into(),
1144 6 : (4, Lsn(40)).into(),
1145 6 : ]);
1146 6 : let sk_th = TermHistory(vec![
1147 6 : (1, Lsn(10)).into(),
1148 6 : (2, Lsn(20)).into(),
1149 6 : (3, Lsn(30)).into(), // sk ends last common term 2 at 30
1150 6 : ]);
1151 6 : assert_eq!(
1152 6 : TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(40),),
1153 6 : Some(TermLsn {
1154 6 : term: 2,
1155 6 : lsn: Lsn(30),
1156 6 : })
1157 6 : );
1158 6 : }
1159 :
1160 : #[test]
1161 6 : fn test_find_highest_common_point_sk_end() {
1162 6 : let prop_th = TermHistory(vec![
1163 6 : (1, Lsn(10)).into(),
1164 6 : (2, Lsn(20)).into(), // last common term 2, sk will end it at 32 sk_end_lsn
1165 6 : (4, Lsn(40)).into(),
1166 6 : ]);
1167 6 : let sk_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
1168 6 : assert_eq!(
1169 6 : TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(32),),
1170 6 : Some(TermLsn {
1171 6 : term: 2,
1172 6 : lsn: Lsn(32),
1173 6 : })
1174 6 : );
1175 6 : }
1176 :
1177 : #[test]
1178 6 : fn test_find_highest_common_point_walprop() {
1179 6 : let prop_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
1180 6 : let sk_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
1181 6 : assert_eq!(
1182 6 : TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(32),),
1183 6 : Some(TermLsn {
1184 6 : term: 2,
1185 6 : lsn: Lsn(32),
1186 6 : })
1187 6 : );
1188 6 : }
1189 :
1190 : #[test]
1191 6 : fn test_sk_state_bincode_serde_roundtrip() {
1192 6 : use utils::Hex;
1193 6 : let tenant_id = TenantId::from_str("cf0480929707ee75372337efaa5ecf96").unwrap();
1194 6 : let timeline_id = TimelineId::from_str("112ded66422aa5e953e5440fa5427ac4").unwrap();
1195 6 : let state = TimelinePersistentState {
1196 6 : tenant_id,
1197 6 : timeline_id,
1198 6 : acceptor_state: AcceptorState {
1199 6 : term: 42,
1200 6 : term_history: TermHistory(vec![TermLsn {
1201 6 : lsn: Lsn(0x1),
1202 6 : term: 41,
1203 6 : }]),
1204 6 : },
1205 6 : server: ServerInfo {
1206 6 : pg_version: 14,
1207 6 : system_id: 0x1234567887654321,
1208 6 : wal_seg_size: 0x12345678,
1209 6 : },
1210 6 : proposer_uuid: {
1211 6 : let mut arr = timeline_id.as_arr();
1212 6 : arr.reverse();
1213 6 : arr
1214 6 : },
1215 6 : timeline_start_lsn: Lsn(0x12345600),
1216 6 : local_start_lsn: Lsn(0x12),
1217 6 : commit_lsn: Lsn(1234567800),
1218 6 : backup_lsn: Lsn(1234567300),
1219 6 : peer_horizon_lsn: Lsn(9999999),
1220 6 : remote_consistent_lsn: Lsn(1234560000),
1221 6 : peers: PersistedPeers(vec![(
1222 6 : NodeId(1),
1223 6 : PersistedPeerInfo {
1224 6 : backup_lsn: Lsn(1234567000),
1225 6 : term: 42,
1226 6 : flush_lsn: Lsn(1234567800 - 8),
1227 6 : commit_lsn: Lsn(1234567600),
1228 6 : },
1229 6 : )]),
1230 6 : partial_backup: crate::wal_backup_partial::State::default(),
1231 6 : eviction_state: EvictionState::Present,
1232 6 : };
1233 6 :
1234 6 : let ser = state.ser().unwrap();
1235 6 :
1236 6 : #[rustfmt::skip]
1237 6 : let expected = [
1238 6 : // tenant_id as length prefixed hex
1239 6 : 0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1240 6 : 0x63, 0x66, 0x30, 0x34, 0x38, 0x30, 0x39, 0x32, 0x39, 0x37, 0x30, 0x37, 0x65, 0x65, 0x37, 0x35, 0x33, 0x37, 0x32, 0x33, 0x33, 0x37, 0x65, 0x66, 0x61, 0x61, 0x35, 0x65, 0x63, 0x66, 0x39, 0x36,
1241 6 : // timeline_id as length prefixed hex
1242 6 : 0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1243 6 : 0x31, 0x31, 0x32, 0x64, 0x65, 0x64, 0x36, 0x36, 0x34, 0x32, 0x32, 0x61, 0x61, 0x35, 0x65, 0x39, 0x35, 0x33, 0x65, 0x35, 0x34, 0x34, 0x30, 0x66, 0x61, 0x35, 0x34, 0x32, 0x37, 0x61, 0x63, 0x34,
1244 6 : // term
1245 6 : 0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1246 6 : // length prefix
1247 6 : 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1248 6 : // unsure why this order is swapped
1249 6 : 0x29, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1250 6 : 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1251 6 : // pg_version
1252 6 : 0x0e, 0x00, 0x00, 0x00,
1253 6 : // systemid
1254 6 : 0x21, 0x43, 0x65, 0x87, 0x78, 0x56, 0x34, 0x12,
1255 6 : // wal_seg_size
1256 6 : 0x78, 0x56, 0x34, 0x12,
1257 6 : // pguuid as length prefixed hex
1258 6 : 0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1259 6 : 0x63, 0x34, 0x37, 0x61, 0x34, 0x32, 0x61, 0x35, 0x30, 0x66, 0x34, 0x34, 0x65, 0x35, 0x35, 0x33, 0x65, 0x39, 0x61, 0x35, 0x32, 0x61, 0x34, 0x32, 0x36, 0x36, 0x65, 0x64, 0x32, 0x64, 0x31, 0x31,
1260 6 :
1261 6 : // timeline_start_lsn
1262 6 : 0x00, 0x56, 0x34, 0x12, 0x00, 0x00, 0x00, 0x00,
1263 6 : 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1264 6 : 0x78, 0x02, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
1265 6 : 0x84, 0x00, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
1266 6 : 0x7f, 0x96, 0x98, 0x00, 0x00, 0x00, 0x00, 0x00,
1267 6 : 0x00, 0xe4, 0x95, 0x49, 0x00, 0x00, 0x00, 0x00,
1268 6 : // length prefix for persistentpeers
1269 6 : 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1270 6 : // nodeid
1271 6 : 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1272 6 : // backuplsn
1273 6 : 0x58, 0xff, 0x95, 0x49, 0x00, 0x00, 0x00, 0x00,
1274 6 : 0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1275 6 : 0x70, 0x02, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
1276 6 : 0xb0, 0x01, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
1277 6 : // partial_backup
1278 6 : 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1279 6 : // eviction_state
1280 6 : 0x00, 0x00, 0x00, 0x00,
1281 6 : ];
1282 6 :
1283 6 : assert_eq!(Hex(&ser), Hex(&expected));
1284 :
1285 6 : let deser = TimelinePersistentState::des(&ser).unwrap();
1286 6 :
1287 6 : assert_eq!(deser, state);
1288 6 : }
1289 : }
|