Line data Source code
1 : //! Acceptor part of proposer-acceptor consensus algorithm.
2 :
3 : use anyhow::{bail, Context, Result};
4 : use byteorder::{LittleEndian, ReadBytesExt};
5 : use bytes::{Buf, BufMut, Bytes, BytesMut};
6 :
7 : use postgres_ffi::{TimeLineID, MAX_SEND_SIZE};
8 : use serde::{Deserialize, Serialize};
9 : use std::cmp::max;
10 : use std::cmp::min;
11 : use std::fmt;
12 : use std::io::Read;
13 : use storage_broker::proto::SafekeeperTimelineInfo;
14 :
15 : use tracing::*;
16 :
17 : use crate::control_file;
18 : use crate::send_wal::HotStandbyFeedback;
19 :
20 : use crate::state::TimelineState;
21 : use crate::wal_storage;
22 : use pq_proto::SystemId;
23 : use utils::pageserver_feedback::PageserverFeedback;
24 : use utils::{
25 : bin_ser::LeSer,
26 : id::{NodeId, TenantId, TimelineId},
27 : lsn::Lsn,
28 : };
29 :
/// Version of the proposer-acceptor protocol spoken by this safekeeper;
/// greetings announcing any other `protocol_version` are rejected.
const SK_PROTOCOL_VERSION: u32 = 2;
/// Placeholder Postgres version meaning "not known yet". While the stored
/// server version equals it, the major-version compatibility check in the
/// greeting handler is skipped, and a proposer sending it does not overwrite
/// the stored version.
pub const UNKNOWN_SERVER_VERSION: u32 = 0;

/// Consensus logical timestamp.
pub type Term = u64;
/// Invalid/unset term value (0); e.g. the initial term of `PersistedPeerInfo::new`.
pub const INVALID_TERM: Term = 0;
36 :
37 8 : #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
38 : pub struct TermLsn {
39 : pub term: Term,
40 : pub lsn: Lsn,
41 : }
42 :
43 : // Creation from tuple provides less typing (e.g. for unit tests).
44 : impl From<(Term, Lsn)> for TermLsn {
45 36 : fn from(pair: (Term, Lsn)) -> TermLsn {
46 36 : TermLsn {
47 36 : term: pair.0,
48 36 : lsn: pair.1,
49 36 : }
50 36 : }
51 : }
52 :
/// History of term switches: a sequence of [`TermLsn`] entries, each giving
/// the LSN at which its term begins. Entries are expected in ascending LSN
/// order (`up_to` stops at the first entry past its cutoff).
#[derive(Clone, Serialize, Deserialize, PartialEq)]
pub struct TermHistory(pub Vec<TermLsn>);
55 :
56 : impl TermHistory {
57 11451 : pub fn empty() -> TermHistory {
58 11451 : TermHistory(Vec::new())
59 11451 : }
60 :
61 : // Parse TermHistory as n_entries followed by TermLsn pairs
62 6261 : pub fn from_bytes(bytes: &mut Bytes) -> Result<TermHistory> {
63 6261 : if bytes.remaining() < 4 {
64 0 : bail!("TermHistory misses len");
65 6261 : }
66 6261 : let n_entries = bytes.get_u32_le();
67 6261 : let mut res = Vec::with_capacity(n_entries as usize);
68 6261 : for _ in 0..n_entries {
69 49080 : if bytes.remaining() < 16 {
70 0 : bail!("TermHistory is incomplete");
71 49080 : }
72 49080 : res.push(TermLsn {
73 49080 : term: bytes.get_u64_le(),
74 49080 : lsn: bytes.get_u64_le().into(),
75 49080 : })
76 : }
77 6261 : Ok(TermHistory(res))
78 6261 : }
79 :
80 : /// Return copy of self with switches happening strictly after up_to
81 : /// truncated.
82 30608 : pub fn up_to(&self, up_to: Lsn) -> TermHistory {
83 30608 : let mut res = Vec::with_capacity(self.0.len());
84 153492 : for e in &self.0 {
85 122925 : if e.lsn > up_to {
86 41 : break;
87 122884 : }
88 122884 : res.push(*e);
89 : }
90 30608 : TermHistory(res)
91 30608 : }
92 :
93 : /// Find point of divergence between leader (walproposer) term history and
94 : /// safekeeper. Arguments are not symmetrics as proposer history ends at
95 : /// +infinity while safekeeper at flush_lsn.
96 : /// C version is at walproposer SendProposerElected.
97 8 : pub fn find_highest_common_point(
98 8 : prop_th: &TermHistory,
99 8 : sk_th: &TermHistory,
100 8 : sk_wal_end: Lsn,
101 8 : ) -> Option<TermLsn> {
102 8 : let (prop_th, sk_th) = (&prop_th.0, &sk_th.0); // avoid .0 below
103 :
104 8 : if let Some(sk_th_last) = sk_th.last() {
105 8 : assert!(
106 8 : sk_th_last.lsn <= sk_wal_end,
107 0 : "safekeeper term history end {:?} LSN is higher than WAL end {:?}",
108 : sk_th_last,
109 : sk_wal_end
110 : );
111 0 : }
112 :
113 : // find last common term, if any...
114 8 : let mut last_common_idx = None;
115 16 : for i in 0..min(sk_th.len(), prop_th.len()) {
116 16 : if prop_th[i].term != sk_th[i].term {
117 4 : break;
118 12 : }
119 12 : // If term is the same, LSN must be equal as well.
120 12 : assert!(
121 12 : prop_th[i].lsn == sk_th[i].lsn,
122 0 : "same term {} has different start LSNs: prop {}, sk {}",
123 0 : prop_th[i].term,
124 0 : prop_th[i].lsn,
125 0 : sk_th[i].lsn
126 : );
127 12 : last_common_idx = Some(i);
128 : }
129 8 : let last_common_idx = match last_common_idx {
130 2 : None => return None, // no common point
131 6 : Some(lci) => lci,
132 6 : };
133 6 : // Now find where it ends at both prop and sk and take min. End of
134 6 : // (common) term is the start of the next except it is the last one;
135 6 : // there it is flush_lsn in case of safekeeper or, in case of proposer
136 6 : // +infinity, so we just take flush_lsn then.
137 6 : if last_common_idx == prop_th.len() - 1 {
138 2 : Some(TermLsn {
139 2 : term: prop_th[last_common_idx].term,
140 2 : lsn: sk_wal_end,
141 2 : })
142 : } else {
143 4 : let prop_common_term_end = prop_th[last_common_idx + 1].lsn;
144 4 : let sk_common_term_end = if last_common_idx + 1 < sk_th.len() {
145 2 : sk_th[last_common_idx + 1].lsn
146 : } else {
147 2 : sk_wal_end
148 : };
149 4 : Some(TermLsn {
150 4 : term: prop_th[last_common_idx].term,
151 4 : lsn: min(prop_common_term_end, sk_common_term_end),
152 4 : })
153 : }
154 8 : }
155 : }
156 :
157 : /// Display only latest entries for Debug.
158 : impl fmt::Debug for TermHistory {
159 400 : fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
160 400 : let n_printed = 20;
161 400 : write!(
162 400 : fmt,
163 400 : "{}{:?}",
164 400 : if self.0.len() > n_printed { "... " } else { "" },
165 400 : self.0
166 400 : .iter()
167 400 : .rev()
168 400 : .take(n_printed)
169 2216 : .map(|&e| (e.term, e.lsn)) // omit TermSwitchEntry
170 400 : .collect::<Vec<_>>()
171 400 : )
172 400 : }
173 : }
174 :
/// Unique id of proposer. Not needed for correctness, used for monitoring.
/// Carried as 16 raw bytes in `ProposerGreeting` and `AppendRequestHeader`.
pub type PgUuid = [u8; 16];
177 :
178 : /// Persistent consensus state of the acceptor.
179 12 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
180 : pub struct AcceptorState {
181 : /// acceptor's last term it voted for (advanced in 1 phase)
182 : pub term: Term,
183 : /// History of term switches for safekeeper's WAL.
184 : /// Actually it often goes *beyond* WAL contents as we adopt term history
185 : /// from the proposer before recovery.
186 : pub term_history: TermHistory,
187 : }
188 :
189 : impl AcceptorState {
190 : /// acceptor's last_log_term is the term of the highest entry in the log
191 6267 : pub fn get_last_log_term(&self, flush_lsn: Lsn) -> Term {
192 6267 : let th = self.term_history.up_to(flush_lsn);
193 6267 : match th.0.last() {
194 5165 : Some(e) => e.term,
195 1102 : None => 0,
196 : }
197 6267 : }
198 : }
199 :
/// Information about Postgres. Safekeeper gets it once and then verifies
/// all further connections from computes match.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ServerInfo {
    /// Postgres server version; greetings with a different major version
    /// (`pg_version / 10000`) are rejected unless this is still unknown.
    pub pg_version: u32,
    /// System identifier; the greeting handler adopts a differing non-zero
    /// value sent by the proposer.
    pub system_id: SystemId,
    /// WAL segment size; greetings announcing a different value are rejected.
    pub wal_seg_size: u32,
}
209 :
210 4 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
211 : pub struct PersistedPeerInfo {
212 : /// LSN up to which safekeeper offloaded WAL to s3.
213 : pub backup_lsn: Lsn,
214 : /// Term of the last entry.
215 : pub term: Term,
216 : /// LSN of the last record.
217 : pub flush_lsn: Lsn,
218 : /// Up to which LSN safekeeper regards its WAL as committed.
219 : pub commit_lsn: Lsn,
220 : }
221 :
222 : impl PersistedPeerInfo {
223 0 : pub fn new() -> Self {
224 0 : Self {
225 0 : backup_lsn: Lsn::INVALID,
226 0 : term: INVALID_TERM,
227 0 : flush_lsn: Lsn(0),
228 0 : commit_lsn: Lsn(0),
229 0 : }
230 0 : }
231 : }
232 :
233 : // make clippy happy
234 : impl Default for PersistedPeerInfo {
235 0 : fn default() -> Self {
236 0 : Self::new()
237 0 : }
238 : }
239 :
240 : // protocol messages
241 :
/// Initial Proposer -> Acceptor message
///
/// Decoded with `LeSer::des_from` in declaration order, so the field order
/// must match the wire layout.
#[derive(Debug, Deserialize)]
pub struct ProposerGreeting {
    /// proposer-acceptor protocol version
    pub protocol_version: u32,
    /// Postgres server version
    pub pg_version: u32,
    /// Proposer's unique id; in this file it is only logged.
    pub proposer_id: PgUuid,
    /// Postgres system id; sync-safekeepers doesn't know it and sends 0.
    pub system_id: SystemId,
    /// Must match the safekeeper's timeline id.
    pub timeline_id: TimelineId,
    /// Must match the safekeeper's tenant id.
    pub tenant_id: TenantId,
    // NOTE(review): not checked by the greeting handler in this file.
    pub tli: TimeLineID,
    /// Must match the safekeeper's configured WAL segment size.
    pub wal_seg_size: u32,
}
256 :
/// Acceptor -> Proposer initial response: the highest term known to me
/// (acceptor voted for).
#[derive(Debug, Serialize)]
pub struct AcceptorGreeting {
    /// Acceptor's current term.
    term: u64,
    /// Id of the replying safekeeper node.
    node_id: NodeId,
}
264 :
/// Vote request sent from proposer to safekeepers
#[derive(Debug, Deserialize)]
pub struct VoteRequest {
    /// Term the proposer asks a vote for; the vote is granted only if it is
    /// strictly higher than the acceptor's current term.
    pub term: Term,
}
270 :
/// Vote itself, sent from safekeeper to proposer
#[derive(Debug, Serialize)]
pub struct VoteResponse {
    pub term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
    vote_given: u64, // fixme u64 due to padding; boolean encoded as 0/1
    // Safekeeper flush_lsn (end of WAL) + history of term switches allow
    // proposer to choose the most advanced one.
    pub flush_lsn: Lsn,
    // Filled from the safekeeper's in-memory peer_horizon_lsn.
    truncate_lsn: Lsn,
    pub term_history: TermHistory,
    // Point where WAL of the timeline begins globally.
    timeline_start_lsn: Lsn,
}
283 :
/// Proposer -> Acceptor message announcing proposer is elected and communicating
/// term history to it.
///
/// Parsed by hand in `ProposerAcceptorMessage::parse`: term, start_streaming_at,
/// the term history, then timeline_start_lsn.
#[derive(Debug)]
pub struct ProposerElected {
    /// Term in which the proposer got elected.
    pub term: Term,
    /// LSN the proposer will stream WAL from; the acceptor truncates its WAL here.
    pub start_streaming_at: Lsn,
    /// Proposer's term history, adopted by the acceptor; must not be empty.
    pub term_history: TermHistory,
    /// Point where WAL begins globally; recorded by the acceptor if not yet known.
    pub timeline_start_lsn: Lsn,
}
295 :
/// Request with WAL message sent from proposer to safekeeper. Along the way it
/// communicates commit_lsn.
#[derive(Debug)]
pub struct AppendRequest {
    /// Fixed-size header preceding the WAL bytes.
    pub h: AppendRequestHeader,
    /// WAL bytes covering `[h.begin_lsn, h.end_lsn)`.
    pub wal_data: Bytes,
}
/// Header of an [`AppendRequest`]. Decoded with `LeSer::des_from` in
/// declaration order, so the field order must match the wire layout.
#[derive(Debug, Clone, Deserialize)]
pub struct AppendRequestHeader {
    // Proposer's term. The acceptor refuses the request if its own term is
    // higher, and errors out if this one is higher (ProposerElected not seen yet).
    pub term: Term,
    // TODO: remove this field from the protocol, it is unused -- LSN of term
    // switch can be taken from ProposerElected (as well as from term history).
    pub term_start_lsn: Lsn,
    /// start position of message in WAL
    pub begin_lsn: Lsn,
    /// end position of message in WAL
    pub end_lsn: Lsn,
    /// LSN committed by quorum of safekeepers
    pub commit_lsn: Lsn,
    /// minimal LSN which may be needed by proposer to perform recovery of some safekeeper
    pub truncate_lsn: Lsn,
    // only for logging/debugging
    pub proposer_uuid: PgUuid,
}
321 :
/// Report safekeeper state to proposer
#[derive(Debug, Serialize, Clone)]
pub struct AppendResponse {
    // Current term of the safekeeper; if it is higher than proposer's, the
    // compute is out of date.
    pub term: Term,
    // Flushed end of wal on safekeeper; one should be always mindful from what
    // term history this value comes, either checking history directly or
    // observing term being set to one for which WAL truncation is known to have
    // happened.
    pub flush_lsn: Lsn,
    // We report back our awareness about which WAL is committed, as this is
    // a criterion for walproposer --sync mode exit
    pub commit_lsn: Lsn,
    // Hot standby feedback; the safekeeper itself leaves it empty and the
    // upper code fills it in.
    pub hs_feedback: HotStandbyFeedback,
    // Serialized only when present.
    pub pageserver_feedback: Option<PageserverFeedback>,
}
339 :
340 : impl AppendResponse {
341 0 : fn term_only(term: Term) -> AppendResponse {
342 0 : AppendResponse {
343 0 : term,
344 0 : flush_lsn: Lsn(0),
345 0 : commit_lsn: Lsn(0),
346 0 : hs_feedback: HotStandbyFeedback::empty(),
347 0 : pageserver_feedback: None,
348 0 : }
349 0 : }
350 : }
351 :
/// Proposer -> Acceptor messages
#[derive(Debug)]
pub enum ProposerAcceptorMessage {
    /// Wire tag 'g'.
    Greeting(ProposerGreeting),
    /// Wire tag 'v'.
    VoteRequest(VoteRequest),
    /// Wire tag 'e'.
    Elected(ProposerElected),
    /// Wire tag 'a'; WAL is flushed and an AppendResponse is returned.
    AppendRequest(AppendRequest),
    /// WAL is written but not flushed, and no reply is produced.
    /// NOTE(review): not produced by `parse` here; presumably built by callers.
    NoFlushAppendRequest(AppendRequest),
    /// Flush WAL and reply with an AppendResponse carrying current LSNs.
    /// NOTE(review): not produced by `parse` here; presumably built by callers.
    FlushWAL,
}
362 :
363 : impl ProposerAcceptorMessage {
364 : /// Parse proposer message.
365 206781 : pub fn parse(msg_bytes: Bytes) -> Result<ProposerAcceptorMessage> {
366 206781 : // xxx using Reader is inefficient but easy to work with bincode
367 206781 : let mut stream = msg_bytes.reader();
368 : // u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is
369 206781 : let tag = stream.read_u64::<LittleEndian>()? as u8 as char;
370 206781 : match tag {
371 : 'g' => {
372 155678 : let msg = ProposerGreeting::des_from(&mut stream)?;
373 155678 : Ok(ProposerAcceptorMessage::Greeting(msg))
374 : }
375 : 'v' => {
376 24337 : let msg = VoteRequest::des_from(&mut stream)?;
377 24337 : Ok(ProposerAcceptorMessage::VoteRequest(msg))
378 : }
379 : 'e' => {
380 6261 : let mut msg_bytes = stream.into_inner();
381 6261 : if msg_bytes.remaining() < 16 {
382 0 : bail!("ProposerElected message is not complete");
383 6261 : }
384 6261 : let term = msg_bytes.get_u64_le();
385 6261 : let start_streaming_at = msg_bytes.get_u64_le().into();
386 6261 : let term_history = TermHistory::from_bytes(&mut msg_bytes)?;
387 6261 : if msg_bytes.remaining() < 8 {
388 0 : bail!("ProposerElected message is not complete");
389 6261 : }
390 6261 : let timeline_start_lsn = msg_bytes.get_u64_le().into();
391 6261 : let msg = ProposerElected {
392 6261 : term,
393 6261 : start_streaming_at,
394 6261 : timeline_start_lsn,
395 6261 : term_history,
396 6261 : };
397 6261 : Ok(ProposerAcceptorMessage::Elected(msg))
398 : }
399 : 'a' => {
400 : // read header followed by wal data
401 20505 : let hdr = AppendRequestHeader::des_from(&mut stream)?;
402 20505 : let rec_size = hdr
403 20505 : .end_lsn
404 20505 : .checked_sub(hdr.begin_lsn)
405 20505 : .context("begin_lsn > end_lsn in AppendRequest")?
406 : .0 as usize;
407 20505 : if rec_size > MAX_SEND_SIZE {
408 0 : bail!(
409 0 : "AppendRequest is longer than MAX_SEND_SIZE ({})",
410 0 : MAX_SEND_SIZE
411 0 : );
412 20505 : }
413 20505 :
414 20505 : let mut wal_data_vec: Vec<u8> = vec![0; rec_size];
415 20505 : stream.read_exact(&mut wal_data_vec)?;
416 20505 : let wal_data = Bytes::from(wal_data_vec);
417 20505 : let msg = AppendRequest { h: hdr, wal_data };
418 20505 :
419 20505 : Ok(ProposerAcceptorMessage::AppendRequest(msg))
420 : }
421 0 : _ => bail!("unknown proposer-acceptor message tag: {}", tag),
422 : }
423 206781 : }
424 : }
425 :
/// Acceptor -> Proposer messages
#[derive(Debug)]
pub enum AcceptorProposerMessage {
    /// Reply to a greeting; serialized with tag 'g'.
    Greeting(AcceptorGreeting),
    /// Reply to a vote request; serialized with tag 'v'.
    VoteResponse(VoteResponse),
    /// Reply to appends/flushes (or a refusal); serialized with tag 'a'.
    AppendResponse(AppendResponse),
}
433 :
434 : impl AcceptorProposerMessage {
435 : /// Serialize acceptor -> proposer message.
436 196890 : pub fn serialize(&self, buf: &mut BytesMut) -> Result<()> {
437 196890 : match self {
438 155678 : AcceptorProposerMessage::Greeting(msg) => {
439 155678 : buf.put_u64_le('g' as u64);
440 155678 : buf.put_u64_le(msg.term);
441 155678 : buf.put_u64_le(msg.node_id.0);
442 155678 : }
443 24337 : AcceptorProposerMessage::VoteResponse(msg) => {
444 24337 : buf.put_u64_le('v' as u64);
445 24337 : buf.put_u64_le(msg.term);
446 24337 : buf.put_u64_le(msg.vote_given);
447 24337 : buf.put_u64_le(msg.flush_lsn.into());
448 24337 : buf.put_u64_le(msg.truncate_lsn.into());
449 24337 : buf.put_u32_le(msg.term_history.0.len() as u32);
450 105024 : for e in &msg.term_history.0 {
451 80687 : buf.put_u64_le(e.term);
452 80687 : buf.put_u64_le(e.lsn.into());
453 80687 : }
454 24337 : buf.put_u64_le(msg.timeline_start_lsn.into());
455 : }
456 16875 : AcceptorProposerMessage::AppendResponse(msg) => {
457 16875 : buf.put_u64_le('a' as u64);
458 16875 : buf.put_u64_le(msg.term);
459 16875 : buf.put_u64_le(msg.flush_lsn.into());
460 16875 : buf.put_u64_le(msg.commit_lsn.into());
461 16875 : buf.put_i64_le(msg.hs_feedback.ts);
462 16875 : buf.put_u64_le(msg.hs_feedback.xmin);
463 16875 : buf.put_u64_le(msg.hs_feedback.catalog_xmin);
464 :
465 : // AsyncReadMessage in walproposer.c will not try to decode pageserver_feedback
466 : // if it is not present.
467 16875 : if let Some(ref msg) = msg.pageserver_feedback {
468 0 : msg.serialize(buf);
469 16875 : }
470 : }
471 : }
472 :
473 196890 : Ok(())
474 196890 : }
475 : }
476 :
/// Safekeeper implements consensus to reliably persist WAL across nodes.
/// It controls all WAL disk writes and updates of control file.
///
/// Currently safekeeper processes:
/// - messages from compute (proposers) and provides replies
/// - messages from broker peers
pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> {
    /// LSN from which the proposer this safekeeper is currently talking to
    /// appends WAL (the start of the last entry of its term history);
    /// determines last_log_term switch point. Reaching it with commit_lsn
    /// forces a control file flush.
    pub term_start_lsn: Lsn,

    pub state: TimelineState<CTRL>, // persistent state storage
    /// WAL storage; WAL writes, flushes and truncation go through it.
    pub wal_store: WAL,

    node_id: NodeId, // safekeeper's node id, reported in AcceptorGreeting
}
493 :
494 : impl<CTRL, WAL> SafeKeeper<CTRL, WAL>
495 : where
496 : CTRL: control_file::Storage,
497 : WAL: wal_storage::Storage,
498 : {
499 : /// Accepts a control file storage containing the safekeeper state.
500 : /// State must be initialized, i.e. contain filled `tenant_id`, `timeline_id`
501 : /// and `server` (`wal_seg_size` inside it) fields.
502 69776 : pub fn new(
503 69776 : state: TimelineState<CTRL>,
504 69776 : wal_store: WAL,
505 69776 : node_id: NodeId,
506 69776 : ) -> Result<SafeKeeper<CTRL, WAL>> {
507 69776 : if state.tenant_id == TenantId::from([0u8; 16])
508 69776 : || state.timeline_id == TimelineId::from([0u8; 16])
509 : {
510 0 : bail!(
511 0 : "Calling SafeKeeper::new with empty tenant_id ({}) or timeline_id ({})",
512 0 : state.tenant_id,
513 0 : state.timeline_id
514 0 : );
515 69776 : }
516 69776 :
517 69776 : Ok(SafeKeeper {
518 69776 : term_start_lsn: Lsn(0),
519 69776 : state,
520 69776 : wal_store,
521 69776 : node_id,
522 69776 : })
523 69776 : }
524 :
525 : /// Get history of term switches for the available WAL
526 24341 : fn get_term_history(&self) -> TermHistory {
527 24341 : self.state
528 24341 : .acceptor_state
529 24341 : .term_history
530 24341 : .up_to(self.flush_lsn())
531 24341 : }
532 :
533 6267 : pub fn get_last_log_term(&self) -> Term {
534 6267 : self.state
535 6267 : .acceptor_state
536 6267 : .get_last_log_term(self.flush_lsn())
537 6267 : }
538 :
539 : /// wal_store wrapper avoiding commit_lsn <= flush_lsn violation when we don't have WAL yet.
540 82355 : pub fn flush_lsn(&self) -> Lsn {
541 82355 : max(self.wal_store.flush_lsn(), self.state.timeline_start_lsn)
542 82355 : }
543 :
544 : /// Process message from proposer and possibly form reply. Concurrent
545 : /// callers must exclude each other.
546 223666 : pub async fn process_msg(
547 223666 : &mut self,
548 223666 : msg: &ProposerAcceptorMessage,
549 223666 : ) -> Result<Option<AcceptorProposerMessage>> {
550 223666 : match msg {
551 155678 : ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg).await,
552 24341 : ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg).await,
553 6263 : ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg).await,
554 4 : ProposerAcceptorMessage::AppendRequest(msg) => {
555 4 : self.handle_append_request(msg, true).await
556 : }
557 20505 : ProposerAcceptorMessage::NoFlushAppendRequest(msg) => {
558 20505 : self.handle_append_request(msg, false).await
559 : }
560 16875 : ProposerAcceptorMessage::FlushWAL => self.handle_flush().await,
561 : }
562 223666 : }
563 :
564 : /// Handle initial message from proposer: check its sanity and send my
565 : /// current term.
566 155678 : async fn handle_greeting(
567 155678 : &mut self,
568 155678 : msg: &ProposerGreeting,
569 155678 : ) -> Result<Option<AcceptorProposerMessage>> {
570 155678 : // Check protocol compatibility
571 155678 : if msg.protocol_version != SK_PROTOCOL_VERSION {
572 0 : bail!(
573 0 : "incompatible protocol version {}, expected {}",
574 0 : msg.protocol_version,
575 0 : SK_PROTOCOL_VERSION
576 0 : );
577 155678 : }
578 155678 : /* Postgres major version mismatch is treated as fatal error
579 155678 : * because safekeepers parse WAL headers and the format
580 155678 : * may change between versions.
581 155678 : */
582 155678 : if msg.pg_version / 10000 != self.state.server.pg_version / 10000
583 0 : && self.state.server.pg_version != UNKNOWN_SERVER_VERSION
584 : {
585 0 : bail!(
586 0 : "incompatible server version {}, expected {}",
587 0 : msg.pg_version,
588 0 : self.state.server.pg_version
589 0 : );
590 155678 : }
591 155678 :
592 155678 : if msg.tenant_id != self.state.tenant_id {
593 0 : bail!(
594 0 : "invalid tenant ID, got {}, expected {}",
595 0 : msg.tenant_id,
596 0 : self.state.tenant_id
597 0 : );
598 155678 : }
599 155678 : if msg.timeline_id != self.state.timeline_id {
600 0 : bail!(
601 0 : "invalid timeline ID, got {}, expected {}",
602 0 : msg.timeline_id,
603 0 : self.state.timeline_id
604 0 : );
605 155678 : }
606 155678 : if self.state.server.wal_seg_size != msg.wal_seg_size {
607 0 : bail!(
608 0 : "invalid wal_seg_size, got {}, expected {}",
609 0 : msg.wal_seg_size,
610 0 : self.state.server.wal_seg_size
611 0 : );
612 155678 : }
613 155678 :
614 155678 : // system_id will be updated on mismatch
615 155678 : // sync-safekeepers doesn't know sysid and sends 0, ignore it
616 155678 : if self.state.server.system_id != msg.system_id && msg.system_id != 0 {
617 0 : if self.state.server.system_id != 0 {
618 0 : warn!(
619 0 : "unexpected system ID arrived, got {}, expected {}",
620 0 : msg.system_id, self.state.server.system_id
621 : );
622 0 : }
623 :
624 0 : let mut state = self.state.start_change();
625 0 : state.server.system_id = msg.system_id;
626 0 : if msg.pg_version != UNKNOWN_SERVER_VERSION {
627 0 : state.server.pg_version = msg.pg_version;
628 0 : }
629 0 : self.state.finish_change(&state).await?;
630 155678 : }
631 :
632 155678 : info!(
633 0 : "processed greeting from walproposer {}, sending term {:?}",
634 4832 : msg.proposer_id.map(|b| format!("{:X}", b)).join(""),
635 0 : self.state.acceptor_state.term
636 : );
637 155678 : Ok(Some(AcceptorProposerMessage::Greeting(AcceptorGreeting {
638 155678 : term: self.state.acceptor_state.term,
639 155678 : node_id: self.node_id,
640 155678 : })))
641 155678 : }
642 :
643 : /// Give vote for the given term, if we haven't done that previously.
644 24341 : async fn handle_vote_request(
645 24341 : &mut self,
646 24341 : msg: &VoteRequest,
647 24341 : ) -> Result<Option<AcceptorProposerMessage>> {
648 24341 : // Once voted, we won't accept data from older proposers; flush
649 24341 : // everything we've already received so that new proposer starts
650 24341 : // streaming at end of our WAL, without overlap. Currently we truncate
651 24341 : // WAL at streaming point, so this avoids truncating already committed
652 24341 : // WAL.
653 24341 : //
654 24341 : // TODO: it would be smoother to not truncate committed piece at
655 24341 : // handle_elected instead. Currently not a big deal, as proposer is the
656 24341 : // only source of WAL; with peer2peer recovery it would be more
657 24341 : // important.
658 24341 : self.wal_store.flush_wal().await?;
659 : // initialize with refusal
660 24341 : let mut resp = VoteResponse {
661 24341 : term: self.state.acceptor_state.term,
662 24341 : vote_given: false as u64,
663 24341 : flush_lsn: self.flush_lsn(),
664 24341 : truncate_lsn: self.state.inmem.peer_horizon_lsn,
665 24341 : term_history: self.get_term_history(),
666 24341 : timeline_start_lsn: self.state.timeline_start_lsn,
667 24341 : };
668 24341 : if self.state.acceptor_state.term < msg.term {
669 23106 : let mut state = self.state.start_change();
670 23106 : state.acceptor_state.term = msg.term;
671 23106 : // persist vote before sending it out
672 23106 : self.state.finish_change(&state).await?;
673 :
674 23106 : resp.term = self.state.acceptor_state.term;
675 23106 : resp.vote_given = true as u64;
676 1235 : }
677 24341 : info!("processed VoteRequest for term {}: {:?}", msg.term, &resp);
678 24341 : Ok(Some(AcceptorProposerMessage::VoteResponse(resp)))
679 24341 : }
680 :
681 : /// Form AppendResponse from current state.
682 16879 : fn append_response(&self) -> AppendResponse {
683 16879 : let ar = AppendResponse {
684 16879 : term: self.state.acceptor_state.term,
685 16879 : flush_lsn: self.flush_lsn(),
686 16879 : commit_lsn: self.state.commit_lsn,
687 16879 : // will be filled by the upper code to avoid bothering safekeeper
688 16879 : hs_feedback: HotStandbyFeedback::empty(),
689 16879 : pageserver_feedback: None,
690 16879 : };
691 16879 : trace!("formed AppendResponse {:?}", ar);
692 16879 : ar
693 16879 : }
694 :
695 6263 : async fn handle_elected(
696 6263 : &mut self,
697 6263 : msg: &ProposerElected,
698 6263 : ) -> Result<Option<AcceptorProposerMessage>> {
699 6263 : info!("received ProposerElected {:?}", msg);
700 6263 : if self.state.acceptor_state.term < msg.term {
701 2 : let mut state = self.state.start_change();
702 2 : state.acceptor_state.term = msg.term;
703 2 : self.state.finish_change(&state).await?;
704 6261 : }
705 :
706 : // If our term is higher, ignore the message (next feedback will inform the compute)
707 6263 : if self.state.acceptor_state.term > msg.term {
708 0 : return Ok(None);
709 6263 : }
710 6263 :
711 6263 : // This might happen in a rare race when another (old) connection from
712 6263 : // the same walproposer writes + flushes WAL after this connection
713 6263 : // already sent flush_lsn in VoteRequest. It is generally safe to
714 6263 : // proceed, but to prevent commit_lsn surprisingly going down we should
715 6263 : // either refuse the session (simpler) or skip the part we already have
716 6263 : // from the stream (can be implemented).
717 6263 : if msg.term == self.get_last_log_term() && self.flush_lsn() > msg.start_streaming_at {
718 0 : bail!("refusing ProposerElected which is going to overwrite correct WAL: term={}, flush_lsn={}, start_streaming_at={}; restarting the handshake should help",
719 0 : msg.term, self.flush_lsn(), msg.start_streaming_at)
720 6263 : }
721 6263 : // Otherwise we must never attempt to truncate committed data.
722 6263 : assert!(
723 6263 : msg.start_streaming_at >= self.state.inmem.commit_lsn,
724 0 : "attempt to truncate committed data: start_streaming_at={}, commit_lsn={}",
725 : msg.start_streaming_at,
726 : self.state.inmem.commit_lsn
727 : );
728 :
729 : // Before first WAL write initialize its segment. It makes first segment
730 : // pg_waldump'able because stream from compute doesn't include its
731 : // segment and page headers.
732 : //
733 : // If we fail before first WAL write flush this action would be
734 : // repeated, that's ok because it is idempotent.
735 6263 : if self.wal_store.flush_lsn() == Lsn::INVALID {
736 1100 : self.wal_store
737 1100 : .initialize_first_segment(msg.start_streaming_at)
738 0 : .await?;
739 5163 : }
740 :
741 : // TODO: cross check divergence point, check if msg.start_streaming_at corresponds to
742 : // intersection of our history and history from msg
743 :
744 : // truncate wal, update the LSNs
745 6263 : self.wal_store.truncate_wal(msg.start_streaming_at).await?;
746 :
747 : // and now adopt term history from proposer
748 : {
749 6263 : let mut state = self.state.start_change();
750 6263 :
751 6263 : // Here we learn initial LSN for the first time, set fields
752 6263 : // interested in that.
753 6263 :
754 6263 : if state.timeline_start_lsn == Lsn(0) {
755 : // Remember point where WAL begins globally.
756 1100 : state.timeline_start_lsn = msg.timeline_start_lsn;
757 1100 : info!(
758 0 : "setting timeline_start_lsn to {:?}",
759 : state.timeline_start_lsn
760 : );
761 5163 : }
762 6263 : if state.peer_horizon_lsn == Lsn(0) {
763 1100 : // Update peer_horizon_lsn as soon as we know where timeline starts.
764 1100 : // It means that peer_horizon_lsn cannot be zero after we know timeline_start_lsn.
765 1100 : state.peer_horizon_lsn = msg.timeline_start_lsn;
766 5163 : }
767 6263 : if state.local_start_lsn == Lsn(0) {
768 1100 : state.local_start_lsn = msg.start_streaming_at;
769 1100 : info!("setting local_start_lsn to {:?}", state.local_start_lsn);
770 5163 : }
771 : // Initializing commit_lsn before acking first flushed record is
772 : // important to let find_end_of_wal skip the hole in the beginning
773 : // of the first segment.
774 : //
775 : // NB: on new clusters, this happens at the same time as
776 : // timeline_start_lsn initialization, it is taken outside to provide
777 : // upgrade.
778 6263 : state.commit_lsn = max(state.commit_lsn, state.timeline_start_lsn);
779 6263 :
780 6263 : // Initializing backup_lsn is useful to avoid making backup think it should upload 0 segment.
781 6263 : state.backup_lsn = max(state.backup_lsn, state.timeline_start_lsn);
782 6263 : // similar for remote_consistent_lsn
783 6263 : state.remote_consistent_lsn =
784 6263 : max(state.remote_consistent_lsn, state.timeline_start_lsn);
785 6263 :
786 6263 : state.acceptor_state.term_history = msg.term_history.clone();
787 6263 : self.state.finish_change(&state).await?;
788 : }
789 :
790 6263 : info!("start receiving WAL since {:?}", msg.start_streaming_at);
791 :
792 : // Cache LSN where term starts to immediately fsync control file with
793 : // commit_lsn once we reach it -- sync-safekeepers finishes when
794 : // persisted commit_lsn on majority of safekeepers aligns.
795 6263 : self.term_start_lsn = match msg.term_history.0.last() {
796 0 : None => bail!("proposer elected with empty term history"),
797 6263 : Some(term_lsn_start) => term_lsn_start.lsn,
798 6263 : };
799 6263 :
800 6263 : Ok(None)
801 6263 : }
802 :
803 : /// Advance commit_lsn taking into account what we have locally.
804 : ///
805 : /// Note: it is assumed that 'WAL we have is from the right term' check has
806 : /// already been done outside.
807 10145 : async fn update_commit_lsn(&mut self, mut candidate: Lsn) -> Result<()> {
808 10145 : // Both peers and walproposer communicate this value, we might already
809 10145 : // have a fresher (higher) version.
810 10145 : candidate = max(candidate, self.state.inmem.commit_lsn);
811 10145 : let commit_lsn = min(candidate, self.flush_lsn());
812 10145 : assert!(
813 10145 : commit_lsn >= self.state.inmem.commit_lsn,
814 0 : "commit_lsn monotonicity violated: old={} new={}",
815 : self.state.inmem.commit_lsn,
816 : commit_lsn
817 : );
818 :
819 10145 : self.state.inmem.commit_lsn = commit_lsn;
820 10145 :
821 10145 : // If new commit_lsn reached term switch, force sync of control
822 10145 : // file: walproposer in sync mode is very interested when this
823 10145 : // happens. Note: this is for sync-safekeepers mode only, as
824 10145 : // otherwise commit_lsn might jump over term_start_lsn.
825 10145 : if commit_lsn >= self.term_start_lsn && self.state.commit_lsn < self.term_start_lsn {
826 798 : self.state.flush().await?;
827 9347 : }
828 :
829 10145 : Ok(())
830 10145 : }
831 :
832 : /// Handle request to append WAL.
833 : #[allow(clippy::comparison_chain)]
834 20509 : async fn handle_append_request(
835 20509 : &mut self,
836 20509 : msg: &AppendRequest,
837 20509 : require_flush: bool,
838 20509 : ) -> Result<Option<AcceptorProposerMessage>> {
839 20509 : if self.state.acceptor_state.term < msg.h.term {
840 0 : bail!("got AppendRequest before ProposerElected");
841 20509 : }
842 20509 :
843 20509 : // If our term is higher, immediately refuse the message.
844 20509 : if self.state.acceptor_state.term > msg.h.term {
845 0 : let resp = AppendResponse::term_only(self.state.acceptor_state.term);
846 0 : return Ok(Some(AcceptorProposerMessage::AppendResponse(resp)));
847 20509 : }
848 20509 :
849 20509 : // Now we know that we are in the same term as the proposer,
850 20509 : // processing the message.
851 20509 :
852 20509 : self.state.inmem.proposer_uuid = msg.h.proposer_uuid;
853 20509 :
854 20509 : // do the job
855 20509 : if !msg.wal_data.is_empty() {
856 3837 : self.wal_store
857 3837 : .write_wal(msg.h.begin_lsn, &msg.wal_data)
858 0 : .await?;
859 16672 : }
860 :
861 : // flush wal to the disk, if required
862 20509 : if require_flush {
863 4 : self.wal_store.flush_wal().await?;
864 20505 : }
865 :
866 : // Update commit_lsn.
867 20509 : if msg.h.commit_lsn != Lsn(0) {
868 10145 : self.update_commit_lsn(msg.h.commit_lsn).await?;
869 10364 : }
870 : // Value calculated by walproposer can always lag:
871 : // - safekeepers can forget inmem value and send to proposer lower
872 : // persisted one on restart;
873 : // - if we make safekeepers always send persistent value,
874 : // any compute restart would pull it down.
875 : // Thus, take max before adopting.
876 20509 : self.state.inmem.peer_horizon_lsn =
877 20509 : max(self.state.inmem.peer_horizon_lsn, msg.h.truncate_lsn);
878 20509 :
879 20509 : // Update truncate and commit LSN in control file.
880 20509 : // To avoid negative impact on performance of extra fsync, do it only
881 20509 : // when commit_lsn delta exceeds WAL segment size.
882 20509 : if self.state.commit_lsn + (self.state.server.wal_seg_size as u64)
883 20509 : < self.state.inmem.commit_lsn
884 : {
885 0 : self.state.flush().await?;
886 20509 : }
887 :
888 20509 : trace!(
889 0 : "processed AppendRequest of len {}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, flushed={:?}",
890 0 : msg.wal_data.len(),
891 : msg.h.end_lsn,
892 : msg.h.commit_lsn,
893 : msg.h.truncate_lsn,
894 : require_flush,
895 : );
896 :
897 : // If flush_lsn hasn't updated, AppendResponse is not very useful.
898 20509 : if !require_flush {
899 20505 : return Ok(None);
900 4 : }
901 4 :
902 4 : let resp = self.append_response();
903 4 : Ok(Some(AcceptorProposerMessage::AppendResponse(resp)))
904 20509 : }
905 :
906 : /// Flush WAL to disk. Return AppendResponse with latest LSNs.
907 16875 : async fn handle_flush(&mut self) -> Result<Option<AcceptorProposerMessage>> {
908 16875 : self.wal_store.flush_wal().await?;
909 16875 : Ok(Some(AcceptorProposerMessage::AppendResponse(
910 16875 : self.append_response(),
911 16875 : )))
912 16875 : }
913 :
914 : /// Update commit_lsn from peer safekeeper data.
915 0 : pub async fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> {
916 0 : if (Lsn(sk_info.commit_lsn) != Lsn::INVALID) && (sk_info.last_log_term != INVALID_TERM) {
917 : // Note: the check is too restrictive, generally we can update local
918 : // commit_lsn if our history matches (is part of) history of advanced
919 : // commit_lsn provider.
920 0 : if sk_info.last_log_term == self.get_last_log_term() {
921 0 : self.update_commit_lsn(Lsn(sk_info.commit_lsn)).await?;
922 0 : }
923 0 : }
924 0 : Ok(())
925 0 : }
926 : }
927 :
928 : #[cfg(test)]
929 : mod tests {
930 : use futures::future::BoxFuture;
931 : use postgres_ffi::{XLogSegNo, WAL_SEGMENT_SIZE};
932 :
933 : use super::*;
934 : use crate::{
935 : state::{EvictionState, PersistedPeers, TimelinePersistentState},
936 : wal_storage::Storage,
937 : };
938 : use std::{ops::Deref, str::FromStr, time::Instant};
939 :
940 : // fake storage for tests
941 : struct InMemoryState {
942 : persisted_state: TimelinePersistentState,
943 : }
944 :
945 : #[async_trait::async_trait]
946 : impl control_file::Storage for InMemoryState {
947 6 : async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> {
948 6 : self.persisted_state = s.clone();
949 6 : Ok(())
950 6 : }
951 :
952 0 : fn last_persist_at(&self) -> Instant {
953 0 : Instant::now()
954 0 : }
955 : }
956 :
957 : impl Deref for InMemoryState {
958 : type Target = TimelinePersistentState;
959 :
960 120 : fn deref(&self) -> &Self::Target {
961 120 : &self.persisted_state
962 120 : }
963 : }
964 :
965 4 : fn test_sk_state() -> TimelinePersistentState {
966 4 : let mut state = TimelinePersistentState::empty();
967 4 : state.server.wal_seg_size = WAL_SEGMENT_SIZE as u32;
968 4 : state.tenant_id = TenantId::from([1u8; 16]);
969 4 : state.timeline_id = TimelineId::from([1u8; 16]);
970 4 : state
971 4 : }
972 :
973 : struct DummyWalStore {
974 : lsn: Lsn,
975 : }
976 :
977 : #[async_trait::async_trait]
978 : impl wal_storage::Storage for DummyWalStore {
979 20 : fn flush_lsn(&self) -> Lsn {
980 20 : self.lsn
981 20 : }
982 :
983 2 : async fn initialize_first_segment(&mut self, _init_lsn: Lsn) -> Result<()> {
984 2 : Ok(())
985 2 : }
986 :
987 4 : async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
988 4 : self.lsn = startpos + buf.len() as u64;
989 4 : Ok(())
990 4 : }
991 :
992 4 : async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
993 4 : self.lsn = end_pos;
994 4 : Ok(())
995 4 : }
996 :
997 8 : async fn flush_wal(&mut self) -> Result<()> {
998 8 : Ok(())
999 8 : }
1000 :
1001 0 : fn remove_up_to(&self, _segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> {
1002 0 : Box::pin(async { Ok(()) })
1003 0 : }
1004 :
1005 0 : fn get_metrics(&self) -> crate::metrics::WalStorageMetrics {
1006 0 : crate::metrics::WalStorageMetrics::default()
1007 0 : }
1008 : }
1009 :
1010 : #[tokio::test]
1011 2 : async fn test_voting() {
1012 2 : let storage = InMemoryState {
1013 2 : persisted_state: test_sk_state(),
1014 2 : };
1015 2 : let wal_store = DummyWalStore { lsn: Lsn(0) };
1016 2 : let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
1017 2 :
1018 2 : // check voting for 1 is ok
1019 2 : let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 });
1020 2 : let mut vote_resp = sk.process_msg(&vote_request).await;
1021 2 : match vote_resp.unwrap() {
1022 2 : Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given != 0),
1023 2 : r => panic!("unexpected response: {:?}", r),
1024 2 : }
1025 2 :
1026 2 : // reboot...
1027 2 : let state = sk.state.deref().clone();
1028 2 : let storage = InMemoryState {
1029 2 : persisted_state: state,
1030 2 : };
1031 2 :
1032 2 : sk = SafeKeeper::new(TimelineState::new(storage), sk.wal_store, NodeId(0)).unwrap();
1033 2 :
1034 2 : // and ensure voting second time for 1 is not ok
1035 2 : vote_resp = sk.process_msg(&vote_request).await;
1036 2 : match vote_resp.unwrap() {
1037 2 : Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given == 0),
1038 2 : r => panic!("unexpected response: {:?}", r),
1039 2 : }
1040 2 : }
1041 :
1042 : #[tokio::test]
1043 2 : async fn test_last_log_term_switch() {
1044 2 : let storage = InMemoryState {
1045 2 : persisted_state: test_sk_state(),
1046 2 : };
1047 2 : let wal_store = DummyWalStore { lsn: Lsn(0) };
1048 2 :
1049 2 : let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
1050 2 :
1051 2 : let mut ar_hdr = AppendRequestHeader {
1052 2 : term: 1,
1053 2 : term_start_lsn: Lsn(3),
1054 2 : begin_lsn: Lsn(1),
1055 2 : end_lsn: Lsn(2),
1056 2 : commit_lsn: Lsn(0),
1057 2 : truncate_lsn: Lsn(0),
1058 2 : proposer_uuid: [0; 16],
1059 2 : };
1060 2 : let mut append_request = AppendRequest {
1061 2 : h: ar_hdr.clone(),
1062 2 : wal_data: Bytes::from_static(b"b"),
1063 2 : };
1064 2 :
1065 2 : let pem = ProposerElected {
1066 2 : term: 1,
1067 2 : start_streaming_at: Lsn(1),
1068 2 : term_history: TermHistory(vec![TermLsn {
1069 2 : term: 1,
1070 2 : lsn: Lsn(3),
1071 2 : }]),
1072 2 : timeline_start_lsn: Lsn(0),
1073 2 : };
1074 2 : sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
1075 2 : .await
1076 2 : .unwrap();
1077 2 :
1078 2 : // check that AppendRequest before term_start_lsn doesn't switch last_log_term.
1079 2 : let resp = sk
1080 2 : .process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
1081 2 : .await;
1082 2 : assert!(resp.is_ok());
1083 2 : assert_eq!(sk.get_last_log_term(), 0);
1084 2 :
1085 2 : // but record at term_start_lsn does the switch
1086 2 : ar_hdr.begin_lsn = Lsn(2);
1087 2 : ar_hdr.end_lsn = Lsn(3);
1088 2 : append_request = AppendRequest {
1089 2 : h: ar_hdr,
1090 2 : wal_data: Bytes::from_static(b"b"),
1091 2 : };
1092 2 : let resp = sk
1093 2 : .process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
1094 2 : .await;
1095 2 : assert!(resp.is_ok());
1096 2 : sk.wal_store.truncate_wal(Lsn(3)).await.unwrap(); // imitate the complete record at 3 %)
1097 2 : assert_eq!(sk.get_last_log_term(), 1);
1098 2 : }
1099 :
1100 : #[test]
1101 2 : fn test_find_highest_common_point_none() {
1102 2 : let prop_th = TermHistory(vec![(0, Lsn(1)).into()]);
1103 2 : let sk_th = TermHistory(vec![(1, Lsn(1)).into(), (2, Lsn(2)).into()]);
1104 2 : assert_eq!(
1105 2 : TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(3),),
1106 2 : None
1107 2 : );
1108 2 : }
1109 :
1110 : #[test]
1111 2 : fn test_find_highest_common_point_middle() {
1112 2 : let prop_th = TermHistory(vec![
1113 2 : (1, Lsn(10)).into(),
1114 2 : (2, Lsn(20)).into(),
1115 2 : (4, Lsn(40)).into(),
1116 2 : ]);
1117 2 : let sk_th = TermHistory(vec![
1118 2 : (1, Lsn(10)).into(),
1119 2 : (2, Lsn(20)).into(),
1120 2 : (3, Lsn(30)).into(), // sk ends last common term 2 at 30
1121 2 : ]);
1122 2 : assert_eq!(
1123 2 : TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(40),),
1124 2 : Some(TermLsn {
1125 2 : term: 2,
1126 2 : lsn: Lsn(30),
1127 2 : })
1128 2 : );
1129 2 : }
1130 :
1131 : #[test]
1132 2 : fn test_find_highest_common_point_sk_end() {
1133 2 : let prop_th = TermHistory(vec![
1134 2 : (1, Lsn(10)).into(),
1135 2 : (2, Lsn(20)).into(), // last common term 2, sk will end it at 32 sk_end_lsn
1136 2 : (4, Lsn(40)).into(),
1137 2 : ]);
1138 2 : let sk_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
1139 2 : assert_eq!(
1140 2 : TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(32),),
1141 2 : Some(TermLsn {
1142 2 : term: 2,
1143 2 : lsn: Lsn(32),
1144 2 : })
1145 2 : );
1146 2 : }
1147 :
1148 : #[test]
1149 2 : fn test_find_highest_common_point_walprop() {
1150 2 : let prop_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
1151 2 : let sk_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
1152 2 : assert_eq!(
1153 2 : TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(32),),
1154 2 : Some(TermLsn {
1155 2 : term: 2,
1156 2 : lsn: Lsn(32),
1157 2 : })
1158 2 : );
1159 2 : }
1160 :
1161 : #[test]
1162 2 : fn test_sk_state_bincode_serde_roundtrip() {
1163 2 : use utils::Hex;
1164 2 : let tenant_id = TenantId::from_str("cf0480929707ee75372337efaa5ecf96").unwrap();
1165 2 : let timeline_id = TimelineId::from_str("112ded66422aa5e953e5440fa5427ac4").unwrap();
1166 2 : let state = TimelinePersistentState {
1167 2 : tenant_id,
1168 2 : timeline_id,
1169 2 : acceptor_state: AcceptorState {
1170 2 : term: 42,
1171 2 : term_history: TermHistory(vec![TermLsn {
1172 2 : lsn: Lsn(0x1),
1173 2 : term: 41,
1174 2 : }]),
1175 2 : },
1176 2 : server: ServerInfo {
1177 2 : pg_version: 14,
1178 2 : system_id: 0x1234567887654321,
1179 2 : wal_seg_size: 0x12345678,
1180 2 : },
1181 2 : proposer_uuid: {
1182 2 : let mut arr = timeline_id.as_arr();
1183 2 : arr.reverse();
1184 2 : arr
1185 2 : },
1186 2 : timeline_start_lsn: Lsn(0x12345600),
1187 2 : local_start_lsn: Lsn(0x12),
1188 2 : commit_lsn: Lsn(1234567800),
1189 2 : backup_lsn: Lsn(1234567300),
1190 2 : peer_horizon_lsn: Lsn(9999999),
1191 2 : remote_consistent_lsn: Lsn(1234560000),
1192 2 : peers: PersistedPeers(vec![(
1193 2 : NodeId(1),
1194 2 : PersistedPeerInfo {
1195 2 : backup_lsn: Lsn(1234567000),
1196 2 : term: 42,
1197 2 : flush_lsn: Lsn(1234567800 - 8),
1198 2 : commit_lsn: Lsn(1234567600),
1199 2 : },
1200 2 : )]),
1201 2 : partial_backup: crate::wal_backup_partial::State::default(),
1202 2 : eviction_state: EvictionState::Present,
1203 2 : };
1204 2 :
1205 2 : let ser = state.ser().unwrap();
1206 2 :
1207 2 : #[rustfmt::skip]
1208 2 : let expected = [
1209 2 : // tenant_id as length prefixed hex
1210 2 : 0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1211 2 : 0x63, 0x66, 0x30, 0x34, 0x38, 0x30, 0x39, 0x32, 0x39, 0x37, 0x30, 0x37, 0x65, 0x65, 0x37, 0x35, 0x33, 0x37, 0x32, 0x33, 0x33, 0x37, 0x65, 0x66, 0x61, 0x61, 0x35, 0x65, 0x63, 0x66, 0x39, 0x36,
1212 2 : // timeline_id as length prefixed hex
1213 2 : 0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1214 2 : 0x31, 0x31, 0x32, 0x64, 0x65, 0x64, 0x36, 0x36, 0x34, 0x32, 0x32, 0x61, 0x61, 0x35, 0x65, 0x39, 0x35, 0x33, 0x65, 0x35, 0x34, 0x34, 0x30, 0x66, 0x61, 0x35, 0x34, 0x32, 0x37, 0x61, 0x63, 0x34,
1215 2 : // term
1216 2 : 0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1217 2 : // length prefix
1218 2 : 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1219 2 : // unsure why this order is swapped
1220 2 : 0x29, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1221 2 : 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1222 2 : // pg_version
1223 2 : 0x0e, 0x00, 0x00, 0x00,
1224 2 : // systemid
1225 2 : 0x21, 0x43, 0x65, 0x87, 0x78, 0x56, 0x34, 0x12,
1226 2 : // wal_seg_size
1227 2 : 0x78, 0x56, 0x34, 0x12,
1228 2 : // pguuid as length prefixed hex
1229 2 : 0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1230 2 : 0x63, 0x34, 0x37, 0x61, 0x34, 0x32, 0x61, 0x35, 0x30, 0x66, 0x34, 0x34, 0x65, 0x35, 0x35, 0x33, 0x65, 0x39, 0x61, 0x35, 0x32, 0x61, 0x34, 0x32, 0x36, 0x36, 0x65, 0x64, 0x32, 0x64, 0x31, 0x31,
1231 2 :
1232 2 : // timeline_start_lsn
1233 2 : 0x00, 0x56, 0x34, 0x12, 0x00, 0x00, 0x00, 0x00,
1234 2 : 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1235 2 : 0x78, 0x02, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
1236 2 : 0x84, 0x00, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
1237 2 : 0x7f, 0x96, 0x98, 0x00, 0x00, 0x00, 0x00, 0x00,
1238 2 : 0x00, 0xe4, 0x95, 0x49, 0x00, 0x00, 0x00, 0x00,
1239 2 : // length prefix for persistentpeers
1240 2 : 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1241 2 : // nodeid
1242 2 : 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1243 2 : // backuplsn
1244 2 : 0x58, 0xff, 0x95, 0x49, 0x00, 0x00, 0x00, 0x00,
1245 2 : 0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1246 2 : 0x70, 0x02, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
1247 2 : 0xb0, 0x01, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
1248 2 : // partial_backup
1249 2 : 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1250 2 : // eviction_state
1251 2 : 0x00, 0x00, 0x00, 0x00,
1252 2 : ];
1253 2 :
1254 2 : assert_eq!(Hex(&ser), Hex(&expected));
1255 :
1256 2 : let deser = TimelinePersistentState::des(&ser).unwrap();
1257 2 :
1258 2 : assert_eq!(deser, state);
1259 2 : }
1260 : }
|