Line data Source code
1 : //! Acceptor part of proposer-acceptor consensus algorithm.
2 :
3 : use anyhow::{bail, Context, Result};
4 : use byteorder::{LittleEndian, ReadBytesExt};
5 : use bytes::{Buf, BufMut, Bytes, BytesMut};
6 :
7 : use postgres_ffi::{TimeLineID, MAX_SEND_SIZE};
8 : use serde::{Deserialize, Serialize};
9 : use std::cmp::max;
10 : use std::cmp::min;
11 : use std::fmt;
12 : use std::io::Read;
13 : use storage_broker::proto::SafekeeperTimelineInfo;
14 :
15 : use tracing::*;
16 :
17 : use crate::control_file;
18 : use crate::metrics::MISC_OPERATION_SECONDS;
19 : use crate::send_wal::HotStandbyFeedback;
20 :
21 : use crate::state::TimelineState;
22 : use crate::wal_storage;
23 : use pq_proto::SystemId;
24 : use utils::pageserver_feedback::PageserverFeedback;
25 : use utils::{
26 : bin_ser::LeSer,
27 : id::{NodeId, TenantId, TimelineId},
28 : lsn::Lsn,
29 : };
30 :
31 : const SK_PROTOCOL_VERSION: u32 = 2;
32 : pub const UNKNOWN_SERVER_VERSION: u32 = 0;
33 :
34 : /// Consensus logical timestamp.
35 : pub type Term = u64;
36 : pub const INVALID_TERM: Term = 0;
37 :
38 4 : #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
39 : pub struct TermLsn {
40 : pub term: Term,
41 : pub lsn: Lsn,
42 : }
43 :
44 : // Creation from tuple provides less typing (e.g. for unit tests).
45 : impl From<(Term, Lsn)> for TermLsn {
46 18 : fn from(pair: (Term, Lsn)) -> TermLsn {
47 18 : TermLsn {
48 18 : term: pair.0,
49 18 : lsn: pair.1,
50 18 : }
51 18 : }
52 : }
53 :
54 5 : #[derive(Clone, Serialize, Deserialize, PartialEq)]
55 : pub struct TermHistory(pub Vec<TermLsn>);
56 :
57 : impl TermHistory {
58 1462 : pub fn empty() -> TermHistory {
59 1462 : TermHistory(Vec::new())
60 1462 : }
61 :
62 : // Parse TermHistory as n_entries followed by TermLsn pairs
63 726 : pub fn from_bytes(bytes: &mut Bytes) -> Result<TermHistory> {
64 726 : if bytes.remaining() < 4 {
65 0 : bail!("TermHistory misses len");
66 726 : }
67 726 : let n_entries = bytes.get_u32_le();
68 726 : let mut res = Vec::with_capacity(n_entries as usize);
69 726 : for _ in 0..n_entries {
70 5916 : if bytes.remaining() < 16 {
71 0 : bail!("TermHistory is incomplete");
72 5916 : }
73 5916 : res.push(TermLsn {
74 5916 : term: bytes.get_u64_le(),
75 5916 : lsn: bytes.get_u64_le().into(),
76 5916 : })
77 : }
78 726 : Ok(TermHistory(res))
79 726 : }
80 :
81 : /// Return copy of self with switches happening strictly after up_to
82 : /// truncated.
83 3548 : pub fn up_to(&self, up_to: Lsn) -> TermHistory {
84 3548 : let mut res = Vec::with_capacity(self.0.len());
85 17882 : for e in &self.0 {
86 14346 : if e.lsn > up_to {
87 12 : break;
88 14334 : }
89 14334 : res.push(*e);
90 : }
91 3548 : TermHistory(res)
92 3548 : }
93 :
94 : /// Find point of divergence between leader (walproposer) term history and
95 : /// safekeeper. Arguments are not symmetric as proposer history ends at
96 : /// +infinity while safekeeper at flush_lsn.
97 : /// C version is at walproposer SendProposerElected.
98 732 : pub fn find_highest_common_point(
99 732 : prop_th: &TermHistory,
100 732 : sk_th: &TermHistory,
101 732 : sk_wal_end: Lsn,
102 732 : ) -> Option<TermLsn> {
103 732 : let (prop_th, sk_th) = (&prop_th.0, &sk_th.0); // avoid .0 below
104 :
105 732 : if let Some(sk_th_last) = sk_th.last() {
106 591 : assert!(
107 591 : sk_th_last.lsn <= sk_wal_end,
108 0 : "safekeeper term history end {:?} LSN is higher than WAL end {:?}",
109 : sk_th_last,
110 : sk_wal_end
111 : );
112 141 : }
113 :
114 : // find last common term, if any...
115 732 : let mut last_common_idx = None;
116 5128 : for i in 0..min(sk_th.len(), prop_th.len()) {
117 5128 : if prop_th[i].term != sk_th[i].term {
118 5 : break;
119 5123 : }
120 5123 : // If term is the same, LSN must be equal as well.
121 5123 : assert!(
122 5123 : prop_th[i].lsn == sk_th[i].lsn,
123 0 : "same term {} has different start LSNs: prop {}, sk {}",
124 0 : prop_th[i].term,
125 0 : prop_th[i].lsn,
126 0 : sk_th[i].lsn
127 : );
128 5123 : last_common_idx = Some(i);
129 : }
130 732 : let last_common_idx = match last_common_idx {
131 142 : None => return None, // no common point
132 590 : Some(lci) => lci,
133 590 : };
134 590 : // Now find where it ends at both prop and sk and take min. End of
135 590 : // (common) term is the start of the next except it is the last one;
136 590 : // there it is flush_lsn in case of safekeeper or, in case of proposer
137 590 : // +infinity, so we just take flush_lsn then.
138 590 : if last_common_idx == prop_th.len() - 1 {
139 52 : Some(TermLsn {
140 52 : term: prop_th[last_common_idx].term,
141 52 : lsn: sk_wal_end,
142 52 : })
143 : } else {
144 538 : let prop_common_term_end = prop_th[last_common_idx + 1].lsn;
145 538 : let sk_common_term_end = if last_common_idx + 1 < sk_th.len() {
146 4 : sk_th[last_common_idx + 1].lsn
147 : } else {
148 534 : sk_wal_end
149 : };
150 538 : Some(TermLsn {
151 538 : term: prop_th[last_common_idx].term,
152 538 : lsn: min(prop_common_term_end, sk_common_term_end),
153 538 : })
154 : }
155 732 : }
156 : }
157 :
158 : /// Display only latest entries for Debug.
159 : impl fmt::Debug for TermHistory {
160 341 : fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
161 341 : let n_printed = 20;
162 341 : write!(
163 341 : fmt,
164 341 : "{}{:?}",
165 341 : if self.0.len() > n_printed { "... " } else { "" },
166 341 : self.0
167 341 : .iter()
168 341 : .rev()
169 341 : .take(n_printed)
170 1280 : .map(|&e| (e.term, e.lsn)) // omit TermSwitchEntry
171 341 : .collect::<Vec<_>>()
172 341 : )
173 341 : }
174 : }
175 :
176 : /// Unique id of proposer. Not needed for correctness, used for monitoring.
177 : pub type PgUuid = [u8; 16];
178 :
179 : /// Persistent consensus state of the acceptor.
180 5 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
181 : pub struct AcceptorState {
182 : /// acceptor's last term it voted for (advanced in 1 phase)
183 : pub term: Term,
184 : /// History of term switches for safekeeper's WAL.
185 : /// Actually it often goes *beyond* WAL contents as we adopt term history
186 : /// from the proposer before recovery.
187 : pub term_history: TermHistory,
188 : }
189 :
190 : impl AcceptorState {
191 : /// acceptor's last_log_term is the term of the highest entry in the log
192 43 : pub fn get_last_log_term(&self, flush_lsn: Lsn) -> Term {
193 43 : let th = self.term_history.up_to(flush_lsn);
194 43 : match th.0.last() {
195 37 : Some(e) => e.term,
196 6 : None => 0,
197 : }
198 43 : }
199 : }
200 :
201 : /// Information about Postgres. Safekeeper gets it once and then verifies
202 : /// all further connections from computes match.
203 3 : #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
204 : pub struct ServerInfo {
205 : /// Postgres server version
206 : pub pg_version: u32,
207 : pub system_id: SystemId,
208 : pub wal_seg_size: u32,
209 : }
210 :
211 2 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
212 : pub struct PersistedPeerInfo {
213 : /// LSN up to which safekeeper offloaded WAL to s3.
214 : pub backup_lsn: Lsn,
215 : /// Term of the last entry.
216 : pub term: Term,
217 : /// LSN of the last record.
218 : pub flush_lsn: Lsn,
219 : /// Up to which LSN safekeeper regards its WAL as committed.
220 : pub commit_lsn: Lsn,
221 : }
222 :
223 : impl PersistedPeerInfo {
224 0 : pub fn new() -> Self {
225 0 : Self {
226 0 : backup_lsn: Lsn::INVALID,
227 0 : term: INVALID_TERM,
228 0 : flush_lsn: Lsn(0),
229 0 : commit_lsn: Lsn(0),
230 0 : }
231 0 : }
232 : }
233 :
234 : // make clippy happy
235 : impl Default for PersistedPeerInfo {
236 0 : fn default() -> Self {
237 0 : Self::new()
238 0 : }
239 : }
240 :
241 : // protocol messages
242 :
243 : /// Initial Proposer -> Acceptor message
244 19976 : #[derive(Debug, Deserialize)]
245 : pub struct ProposerGreeting {
246 : /// proposer-acceptor protocol version
247 : pub protocol_version: u32,
248 : /// Postgres server version
249 : pub pg_version: u32,
250 : pub proposer_id: PgUuid,
251 : pub system_id: SystemId,
252 : pub timeline_id: TimelineId,
253 : pub tenant_id: TenantId,
254 : pub tli: TimeLineID,
255 : pub wal_seg_size: u32,
256 : }
257 :
258 : /// Acceptor -> Proposer initial response: the highest term known to me
259 : /// (acceptor voted for).
260 : #[derive(Debug, Serialize)]
261 : pub struct AcceptorGreeting {
262 : term: u64,
263 : node_id: NodeId,
264 : }
265 :
266 : /// Vote request sent from proposer to safekeepers
267 2775 : #[derive(Debug, Deserialize)]
268 : pub struct VoteRequest {
269 : pub term: Term,
270 : }
271 :
272 : /// Vote itself, sent from safekeeper to proposer
273 : #[derive(Debug, Serialize)]
274 : pub struct VoteResponse {
275 : pub term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
276 : vote_given: u64, // fixme u64 due to padding
277 : // Safekeeper flush_lsn (end of WAL) + history of term switches allow
278 : // proposer to choose the most advanced one.
279 : pub flush_lsn: Lsn,
280 : truncate_lsn: Lsn,
281 : pub term_history: TermHistory,
282 : timeline_start_lsn: Lsn,
283 : }
284 :
285 : /*
286 : * Proposer -> Acceptor message announcing proposer is elected and communicating
287 : * term history to it.
288 : */
289 : #[derive(Debug)]
290 : pub struct ProposerElected {
291 : pub term: Term,
292 : pub start_streaming_at: Lsn,
293 : pub term_history: TermHistory,
294 : pub timeline_start_lsn: Lsn,
295 : }
296 :
297 : /// Request with WAL message sent from proposer to safekeeper. Along the way it
298 : /// communicates commit_lsn.
299 : #[derive(Debug)]
300 : pub struct AppendRequest {
301 : pub h: AppendRequestHeader,
302 : pub wal_data: Bytes,
303 : }
304 3049 : #[derive(Debug, Clone, Deserialize)]
305 : pub struct AppendRequestHeader {
306 : // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
307 : pub term: Term,
308 : // TODO: remove this field from the protocol, it in unused -- LSN of term
309 : // switch can be taken from ProposerElected (as well as from term history).
310 : pub term_start_lsn: Lsn,
311 : /// start position of message in WAL
312 : pub begin_lsn: Lsn,
313 : /// end position of message in WAL
314 : pub end_lsn: Lsn,
315 : /// LSN committed by quorum of safekeepers
316 : pub commit_lsn: Lsn,
317 : /// minimal LSN which may be needed by proposer to perform recovery of some safekeeper
318 : pub truncate_lsn: Lsn,
319 : // only for logging/debugging
320 : pub proposer_uuid: PgUuid,
321 : }
322 :
323 : /// Report safekeeper state to proposer
324 : #[derive(Debug, Serialize, Clone)]
325 : pub struct AppendResponse {
326 : // Current term of the safekeeper; if it is higher than proposer's, the
327 : // compute is out of date.
328 : pub term: Term,
329 : // Flushed end of wal on safekeeper; one should be always mindful from what
330 : // term history this value comes, either checking history directly or
331 : // observing term being set to one for which WAL truncation is known to have
332 : // happened.
333 : pub flush_lsn: Lsn,
334 : // We report back our awareness about which WAL is committed, as this is
335 : // a criterion for walproposer --sync mode exit
336 : pub commit_lsn: Lsn,
337 : pub hs_feedback: HotStandbyFeedback,
338 : pub pageserver_feedback: Option<PageserverFeedback>,
339 : }
340 :
341 : impl AppendResponse {
342 0 : fn term_only(term: Term) -> AppendResponse {
343 0 : AppendResponse {
344 0 : term,
345 0 : flush_lsn: Lsn(0),
346 0 : commit_lsn: Lsn(0),
347 0 : hs_feedback: HotStandbyFeedback::empty(),
348 0 : pageserver_feedback: None,
349 0 : }
350 0 : }
351 : }
352 :
353 : /// Proposer -> Acceptor messages
354 : #[derive(Debug)]
355 : pub enum ProposerAcceptorMessage {
356 : Greeting(ProposerGreeting),
357 : VoteRequest(VoteRequest),
358 : Elected(ProposerElected),
359 : AppendRequest(AppendRequest),
360 : NoFlushAppendRequest(AppendRequest),
361 : FlushWAL,
362 : }
363 :
364 : impl ProposerAcceptorMessage {
365 : /// Parse proposer message.
366 26526 : pub fn parse(msg_bytes: Bytes) -> Result<ProposerAcceptorMessage> {
367 26526 : // xxx using Reader is inefficient but easy to work with bincode
368 26526 : let mut stream = msg_bytes.reader();
369 : // u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is
370 26526 : let tag = stream.read_u64::<LittleEndian>()? as u8 as char;
371 26526 : match tag {
372 : 'g' => {
373 19976 : let msg = ProposerGreeting::des_from(&mut stream)?;
374 19976 : Ok(ProposerAcceptorMessage::Greeting(msg))
375 : }
376 : 'v' => {
377 2775 : let msg = VoteRequest::des_from(&mut stream)?;
378 2775 : Ok(ProposerAcceptorMessage::VoteRequest(msg))
379 : }
380 : 'e' => {
381 726 : let mut msg_bytes = stream.into_inner();
382 726 : if msg_bytes.remaining() < 16 {
383 0 : bail!("ProposerElected message is not complete");
384 726 : }
385 726 : let term = msg_bytes.get_u64_le();
386 726 : let start_streaming_at = msg_bytes.get_u64_le().into();
387 726 : let term_history = TermHistory::from_bytes(&mut msg_bytes)?;
388 726 : if msg_bytes.remaining() < 8 {
389 0 : bail!("ProposerElected message is not complete");
390 726 : }
391 726 : let timeline_start_lsn = msg_bytes.get_u64_le().into();
392 726 : let msg = ProposerElected {
393 726 : term,
394 726 : start_streaming_at,
395 726 : timeline_start_lsn,
396 726 : term_history,
397 726 : };
398 726 : Ok(ProposerAcceptorMessage::Elected(msg))
399 : }
400 : 'a' => {
401 : // read header followed by wal data
402 3049 : let hdr = AppendRequestHeader::des_from(&mut stream)?;
403 3049 : let rec_size = hdr
404 3049 : .end_lsn
405 3049 : .checked_sub(hdr.begin_lsn)
406 3049 : .context("begin_lsn > end_lsn in AppendRequest")?
407 : .0 as usize;
408 3049 : if rec_size > MAX_SEND_SIZE {
409 0 : bail!(
410 0 : "AppendRequest is longer than MAX_SEND_SIZE ({})",
411 0 : MAX_SEND_SIZE
412 0 : );
413 3049 : }
414 3049 :
415 3049 : let mut wal_data_vec: Vec<u8> = vec![0; rec_size];
416 3049 : stream.read_exact(&mut wal_data_vec)?;
417 3049 : let wal_data = Bytes::from(wal_data_vec);
418 3049 : let msg = AppendRequest { h: hdr, wal_data };
419 3049 :
420 3049 : Ok(ProposerAcceptorMessage::AppendRequest(msg))
421 : }
422 0 : _ => bail!("unknown proposer-acceptor message tag: {}", tag),
423 : }
424 26526 : }
425 :
426 : /// The memory size of the message, including byte slices.
427 0 : pub fn size(&self) -> usize {
428 : const BASE_SIZE: usize = std::mem::size_of::<ProposerAcceptorMessage>();
429 :
430 : // For most types, the size is just the base enum size including the nested structs. Some
431 : // types also contain byte slices; add them.
432 : //
433 : // We explicitly list all fields, to draw attention here when new fields are added.
434 0 : let mut size = BASE_SIZE;
435 0 : size += match self {
436 : Self::Greeting(ProposerGreeting {
437 : protocol_version: _,
438 : pg_version: _,
439 : proposer_id: _,
440 : system_id: _,
441 : timeline_id: _,
442 : tenant_id: _,
443 : tli: _,
444 : wal_seg_size: _,
445 0 : }) => 0,
446 :
447 0 : Self::VoteRequest(VoteRequest { term: _ }) => 0,
448 :
449 : Self::Elected(ProposerElected {
450 : term: _,
451 : start_streaming_at: _,
452 : term_history: _,
453 : timeline_start_lsn: _,
454 0 : }) => 0,
455 :
456 : Self::AppendRequest(AppendRequest {
457 : h:
458 : AppendRequestHeader {
459 : term: _,
460 : term_start_lsn: _,
461 : begin_lsn: _,
462 : end_lsn: _,
463 : commit_lsn: _,
464 : truncate_lsn: _,
465 : proposer_uuid: _,
466 : },
467 0 : wal_data,
468 0 : }) => wal_data.len(),
469 :
470 : Self::NoFlushAppendRequest(AppendRequest {
471 : h:
472 : AppendRequestHeader {
473 : term: _,
474 : term_start_lsn: _,
475 : begin_lsn: _,
476 : end_lsn: _,
477 : commit_lsn: _,
478 : truncate_lsn: _,
479 : proposer_uuid: _,
480 : },
481 0 : wal_data,
482 0 : }) => wal_data.len(),
483 :
484 0 : Self::FlushWAL => 0,
485 : };
486 :
487 0 : size
488 0 : }
489 : }
490 :
491 : /// Acceptor -> Proposer messages
492 : #[derive(Debug)]
493 : pub enum AcceptorProposerMessage {
494 : Greeting(AcceptorGreeting),
495 : VoteResponse(VoteResponse),
496 : AppendResponse(AppendResponse),
497 : }
498 :
499 : impl AcceptorProposerMessage {
500 : /// Serialize acceptor -> proposer message.
501 25279 : pub fn serialize(&self, buf: &mut BytesMut) -> Result<()> {
502 25279 : match self {
503 19976 : AcceptorProposerMessage::Greeting(msg) => {
504 19976 : buf.put_u64_le('g' as u64);
505 19976 : buf.put_u64_le(msg.term);
506 19976 : buf.put_u64_le(msg.node_id.0);
507 19976 : }
508 2775 : AcceptorProposerMessage::VoteResponse(msg) => {
509 2775 : buf.put_u64_le('v' as u64);
510 2775 : buf.put_u64_le(msg.term);
511 2775 : buf.put_u64_le(msg.vote_given);
512 2775 : buf.put_u64_le(msg.flush_lsn.into());
513 2775 : buf.put_u64_le(msg.truncate_lsn.into());
514 2775 : buf.put_u32_le(msg.term_history.0.len() as u32);
515 11755 : for e in &msg.term_history.0 {
516 8980 : buf.put_u64_le(e.term);
517 8980 : buf.put_u64_le(e.lsn.into());
518 8980 : }
519 2775 : buf.put_u64_le(msg.timeline_start_lsn.into());
520 : }
521 2528 : AcceptorProposerMessage::AppendResponse(msg) => {
522 2528 : buf.put_u64_le('a' as u64);
523 2528 : buf.put_u64_le(msg.term);
524 2528 : buf.put_u64_le(msg.flush_lsn.into());
525 2528 : buf.put_u64_le(msg.commit_lsn.into());
526 2528 : buf.put_i64_le(msg.hs_feedback.ts);
527 2528 : buf.put_u64_le(msg.hs_feedback.xmin);
528 2528 : buf.put_u64_le(msg.hs_feedback.catalog_xmin);
529 :
530 : // AsyncReadMessage in walproposer.c will not try to decode pageserver_feedback
531 : // if it is not present.
532 2528 : if let Some(ref msg) = msg.pageserver_feedback {
533 0 : msg.serialize(buf);
534 2528 : }
535 : }
536 : }
537 :
538 25279 : Ok(())
539 25279 : }
540 : }
541 :
542 : /// Safekeeper implements consensus to reliably persist WAL across nodes.
543 : /// It controls all WAL disk writes and updates of control file.
544 : ///
545 : /// Currently safekeeper processes:
546 : /// - messages from compute (proposers) and provides replies
547 : /// - messages from broker peers
548 : pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> {
549 : /// LSN since the proposer safekeeper currently talking to appends WAL;
550 : /// determines last_log_term switch point.
551 : pub term_start_lsn: Lsn,
552 :
553 : pub state: TimelineState<CTRL>, // persistent state storage
554 : pub wal_store: WAL,
555 :
556 : node_id: NodeId, // safekeeper's node id
557 : }
558 :
559 : impl<CTRL, WAL> SafeKeeper<CTRL, WAL>
560 : where
561 : CTRL: control_file::Storage,
562 : WAL: wal_storage::Storage,
563 : {
564 : /// Accepts a control file storage containing the safekeeper state.
565 : /// State must be initialized, i.e. contain filled `tenant_id`, `timeline_id`
566 : /// and `server` (`wal_seg_size` inside it) fields.
567 8896 : pub fn new(
568 8896 : state: TimelineState<CTRL>,
569 8896 : wal_store: WAL,
570 8896 : node_id: NodeId,
571 8896 : ) -> Result<SafeKeeper<CTRL, WAL>> {
572 8896 : if state.tenant_id == TenantId::from([0u8; 16])
573 8896 : || state.timeline_id == TimelineId::from([0u8; 16])
574 : {
575 0 : bail!(
576 0 : "Calling SafeKeeper::new with empty tenant_id ({}) or timeline_id ({})",
577 0 : state.tenant_id,
578 0 : state.timeline_id
579 0 : );
580 8896 : }
581 8896 :
582 8896 : Ok(SafeKeeper {
583 8896 : term_start_lsn: Lsn(0),
584 8896 : state,
585 8896 : wal_store,
586 8896 : node_id,
587 8896 : })
588 8896 : }
589 :
590 : /// Get history of term switches for the available WAL
591 3505 : fn get_term_history(&self) -> TermHistory {
592 3505 : self.state
593 3505 : .acceptor_state
594 3505 : .term_history
595 3505 : .up_to(self.flush_lsn())
596 3505 : }
597 :
598 43 : pub fn get_last_log_term(&self) -> Term {
599 43 : self.state
600 43 : .acceptor_state
601 43 : .get_last_log_term(self.flush_lsn())
602 43 : }
603 :
604 : /// wal_store wrapper avoiding commit_lsn <= flush_lsn violation when we don't have WAL yet.
605 14511 : pub fn flush_lsn(&self) -> Lsn {
606 14511 : max(self.wal_store.flush_lsn(), self.state.timeline_start_lsn)
607 14511 : }
608 :
609 : /// Process message from proposer and possibly form reply. Concurrent
610 : /// callers must exclude each other.
611 29062 : pub async fn process_msg(
612 29062 : &mut self,
613 29062 : msg: &ProposerAcceptorMessage,
614 29062 : ) -> Result<Option<AcceptorProposerMessage>> {
615 29062 : match msg {
616 19976 : ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg).await,
617 2777 : ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg).await,
618 728 : ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg).await,
619 4 : ProposerAcceptorMessage::AppendRequest(msg) => {
620 4 : self.handle_append_request(msg, true).await
621 : }
622 3049 : ProposerAcceptorMessage::NoFlushAppendRequest(msg) => {
623 3049 : self.handle_append_request(msg, false).await
624 : }
625 2528 : ProposerAcceptorMessage::FlushWAL => self.handle_flush().await,
626 : }
627 29062 : }
628 :
629 : /// Handle initial message from proposer: check its sanity and send my
630 : /// current term.
631 19976 : async fn handle_greeting(
632 19976 : &mut self,
633 19976 : msg: &ProposerGreeting,
634 19976 : ) -> Result<Option<AcceptorProposerMessage>> {
635 19976 : // Check protocol compatibility
636 19976 : if msg.protocol_version != SK_PROTOCOL_VERSION {
637 0 : bail!(
638 0 : "incompatible protocol version {}, expected {}",
639 0 : msg.protocol_version,
640 0 : SK_PROTOCOL_VERSION
641 0 : );
642 19976 : }
643 19976 : /* Postgres major version mismatch is treated as fatal error
644 19976 : * because safekeepers parse WAL headers and the format
645 19976 : * may change between versions.
646 19976 : */
647 19976 : if msg.pg_version / 10000 != self.state.server.pg_version / 10000
648 0 : && self.state.server.pg_version != UNKNOWN_SERVER_VERSION
649 : {
650 0 : bail!(
651 0 : "incompatible server version {}, expected {}",
652 0 : msg.pg_version,
653 0 : self.state.server.pg_version
654 0 : );
655 19976 : }
656 19976 :
657 19976 : if msg.tenant_id != self.state.tenant_id {
658 0 : bail!(
659 0 : "invalid tenant ID, got {}, expected {}",
660 0 : msg.tenant_id,
661 0 : self.state.tenant_id
662 0 : );
663 19976 : }
664 19976 : if msg.timeline_id != self.state.timeline_id {
665 0 : bail!(
666 0 : "invalid timeline ID, got {}, expected {}",
667 0 : msg.timeline_id,
668 0 : self.state.timeline_id
669 0 : );
670 19976 : }
671 19976 : if self.state.server.wal_seg_size != msg.wal_seg_size {
672 0 : bail!(
673 0 : "invalid wal_seg_size, got {}, expected {}",
674 0 : msg.wal_seg_size,
675 0 : self.state.server.wal_seg_size
676 0 : );
677 19976 : }
678 19976 :
679 19976 : // system_id will be updated on mismatch
680 19976 : // sync-safekeepers doesn't know sysid and sends 0, ignore it
681 19976 : if self.state.server.system_id != msg.system_id && msg.system_id != 0 {
682 0 : if self.state.server.system_id != 0 {
683 0 : warn!(
684 0 : "unexpected system ID arrived, got {}, expected {}",
685 0 : msg.system_id, self.state.server.system_id
686 : );
687 0 : }
688 :
689 0 : let mut state = self.state.start_change();
690 0 : state.server.system_id = msg.system_id;
691 0 : if msg.pg_version != UNKNOWN_SERVER_VERSION {
692 0 : state.server.pg_version = msg.pg_version;
693 0 : }
694 0 : self.state.finish_change(&state).await?;
695 19976 : }
696 :
697 19976 : info!(
698 0 : "processed greeting from walproposer {}, sending term {:?}",
699 2416 : msg.proposer_id.map(|b| format!("{:X}", b)).join(""),
700 0 : self.state.acceptor_state.term
701 : );
702 19976 : Ok(Some(AcceptorProposerMessage::Greeting(AcceptorGreeting {
703 19976 : term: self.state.acceptor_state.term,
704 19976 : node_id: self.node_id,
705 19976 : })))
706 19976 : }
707 :
708 : /// Give vote for the given term, if we haven't done that previously.
709 2777 : async fn handle_vote_request(
710 2777 : &mut self,
711 2777 : msg: &VoteRequest,
712 2777 : ) -> Result<Option<AcceptorProposerMessage>> {
713 2777 : // Once voted, we won't accept data from older proposers; flush
714 2777 : // everything we've already received so that new proposer starts
715 2777 : // streaming at end of our WAL, without overlap. Currently we truncate
716 2777 : // WAL at streaming point, so this avoids truncating already committed
717 2777 : // WAL.
718 2777 : //
719 2777 : // TODO: it would be smoother to not truncate committed piece at
720 2777 : // handle_elected instead. Currently not a big deal, as proposer is the
721 2777 : // only source of WAL; with peer2peer recovery it would be more
722 2777 : // important.
723 2777 : self.wal_store.flush_wal().await?;
724 : // initialize with refusal
725 2777 : let mut resp = VoteResponse {
726 2777 : term: self.state.acceptor_state.term,
727 2777 : vote_given: false as u64,
728 2777 : flush_lsn: self.flush_lsn(),
729 2777 : truncate_lsn: self.state.inmem.peer_horizon_lsn,
730 2777 : term_history: self.get_term_history(),
731 2777 : timeline_start_lsn: self.state.timeline_start_lsn,
732 2777 : };
733 2777 : if self.state.acceptor_state.term < msg.term {
734 2637 : let mut state = self.state.start_change();
735 2637 : state.acceptor_state.term = msg.term;
736 2637 : // persist vote before sending it out
737 2637 : self.state.finish_change(&state).await?;
738 :
739 2637 : resp.term = self.state.acceptor_state.term;
740 2637 : resp.vote_given = true as u64;
741 140 : }
742 2777 : info!("processed VoteRequest for term {}: {:?}", msg.term, &resp);
743 2777 : Ok(Some(AcceptorProposerMessage::VoteResponse(resp)))
744 2777 : }
745 :
746 : /// Form AppendResponse from current state.
747 2531 : fn append_response(&self) -> AppendResponse {
748 2531 : let ar = AppendResponse {
749 2531 : term: self.state.acceptor_state.term,
750 2531 : flush_lsn: self.flush_lsn(),
751 2531 : commit_lsn: self.state.commit_lsn,
752 2531 : // will be filled by the upper code to avoid bothering safekeeper
753 2531 : hs_feedback: HotStandbyFeedback::empty(),
754 2531 : pageserver_feedback: None,
755 2531 : };
756 2531 : trace!("formed AppendResponse {:?}", ar);
757 2531 : ar
758 2531 : }
759 :
760 728 : async fn handle_elected(
761 728 : &mut self,
762 728 : msg: &ProposerElected,
763 728 : ) -> Result<Option<AcceptorProposerMessage>> {
764 728 : let _timer = MISC_OPERATION_SECONDS
765 728 : .with_label_values(&["handle_elected"])
766 728 : .start_timer();
767 728 :
768 728 : info!(
769 0 : "received ProposerElected {:?}, term={}, last_log_term={}, flush_lsn={}",
770 0 : msg,
771 0 : self.state.acceptor_state.term,
772 0 : self.get_last_log_term(),
773 0 : self.flush_lsn()
774 : );
775 728 : if self.state.acceptor_state.term < msg.term {
776 2 : let mut state = self.state.start_change();
777 2 : state.acceptor_state.term = msg.term;
778 2 : self.state.finish_change(&state).await?;
779 726 : }
780 :
781 : // If our term is higher, ignore the message (next feedback will inform the compute)
782 728 : if self.state.acceptor_state.term > msg.term {
783 0 : return Ok(None);
784 728 : }
785 728 :
786 728 : // Before truncating WAL check-cross the check divergence point received
787 728 : // from the walproposer.
788 728 : let sk_th = self.get_term_history();
789 728 : let last_common_point = match TermHistory::find_highest_common_point(
790 728 : &msg.term_history,
791 728 : &sk_th,
792 728 : self.flush_lsn(),
793 728 : ) {
794 : // No common point. Expect streaming from the beginning of the
795 : // history like walproposer while we don't have proper init.
796 141 : None => *msg.term_history.0.first().ok_or(anyhow::anyhow!(
797 141 : "empty walproposer term history {:?}",
798 141 : msg.term_history
799 141 : ))?,
800 587 : Some(lcp) => lcp,
801 : };
802 : // This is expected to happen in a rare race when another connection
803 : // from the same walproposer writes + flushes WAL after this connection
804 : // sent flush_lsn in VoteRequest; for instance, very late
805 : // ProposerElected message delivery after another connection was
806 : // established and wrote WAL. In such cases error is transient;
807 : // reconnection makes safekeeper send newest term history and flush_lsn
808 : // and walproposer recalculates the streaming point. OTOH repeating
809 : // error indicates a serious bug.
810 728 : if last_common_point.lsn != msg.start_streaming_at {
811 0 : bail!("refusing ProposerElected with unexpected truncation point: lcp={:?} start_streaming_at={}, term={}, sk_th={:?} flush_lsn={}, wp_th={:?}",
812 0 : last_common_point, msg.start_streaming_at,
813 0 : self.state.acceptor_state.term, sk_th, self.flush_lsn(), msg.term_history,
814 0 : );
815 728 : }
816 728 :
817 728 : // We are also expected to never attempt to truncate committed data.
818 728 : assert!(
819 728 : msg.start_streaming_at >= self.state.inmem.commit_lsn,
820 0 : "attempt to truncate committed data: start_streaming_at={}, commit_lsn={}, term={}, sk_th={:?} flush_lsn={}, wp_th={:?}",
821 0 : msg.start_streaming_at, self.state.inmem.commit_lsn,
822 0 : self.state.acceptor_state.term, sk_th, self.flush_lsn(), msg.term_history,
823 : );
824 :
825 : // Before first WAL write initialize its segment. It makes first segment
826 : // pg_waldump'able because stream from compute doesn't include its
827 : // segment and page headers.
828 : //
829 : // If we fail before first WAL write flush this action would be
830 : // repeated, that's ok because it is idempotent.
831 728 : if self.wal_store.flush_lsn() == Lsn::INVALID {
832 141 : self.wal_store
833 141 : .initialize_first_segment(msg.start_streaming_at)
834 0 : .await?;
835 587 : }
836 :
837 : // truncate wal, update the LSNs
838 728 : self.wal_store.truncate_wal(msg.start_streaming_at).await?;
839 :
840 : // and now adopt term history from proposer
841 : {
842 728 : let mut state = self.state.start_change();
843 728 :
844 728 : // Here we learn initial LSN for the first time, set fields
845 728 : // interested in that.
846 728 :
847 728 : if state.timeline_start_lsn == Lsn(0) {
848 : // Remember point where WAL begins globally.
849 141 : state.timeline_start_lsn = msg.timeline_start_lsn;
850 141 : info!(
851 0 : "setting timeline_start_lsn to {:?}",
852 : state.timeline_start_lsn
853 : );
854 587 : }
855 728 : if state.peer_horizon_lsn == Lsn(0) {
856 141 : // Update peer_horizon_lsn as soon as we know where timeline starts.
857 141 : // It means that peer_horizon_lsn cannot be zero after we know timeline_start_lsn.
858 141 : state.peer_horizon_lsn = msg.timeline_start_lsn;
859 587 : }
860 728 : if state.local_start_lsn == Lsn(0) {
861 141 : state.local_start_lsn = msg.start_streaming_at;
862 141 : info!("setting local_start_lsn to {:?}", state.local_start_lsn);
863 587 : }
864 : // Initializing commit_lsn before acking first flushed record is
865 : // important to let find_end_of_wal skip the hole in the beginning
866 : // of the first segment.
867 : //
868 : // NB: on new clusters, this happens at the same time as
869 : // timeline_start_lsn initialization, it is taken outside to provide
870 : // upgrade.
871 728 : state.commit_lsn = max(state.commit_lsn, state.timeline_start_lsn);
872 728 :
873 728 : // Initializing backup_lsn is useful to avoid making backup think it should upload 0 segment.
874 728 : state.backup_lsn = max(state.backup_lsn, state.timeline_start_lsn);
875 728 : // similar for remote_consistent_lsn
876 728 : state.remote_consistent_lsn =
877 728 : max(state.remote_consistent_lsn, state.timeline_start_lsn);
878 728 :
879 728 : state.acceptor_state.term_history = msg.term_history.clone();
880 728 : self.state.finish_change(&state).await?;
881 : }
882 :
883 728 : info!("start receiving WAL since {:?}", msg.start_streaming_at);
884 :
885 : // Cache LSN where term starts to immediately fsync control file with
886 : // commit_lsn once we reach it -- sync-safekeepers finishes when
887 : // persisted commit_lsn on majority of safekeepers aligns.
888 728 : self.term_start_lsn = match msg.term_history.0.last() {
889 0 : None => bail!("proposer elected with empty term history"),
890 728 : Some(term_lsn_start) => term_lsn_start.lsn,
891 728 : };
892 728 :
893 728 : Ok(None)
894 728 : }
895 :
896 : /// Advance commit_lsn taking into account what we have locally.
897 : ///
898 : /// Note: it is assumed that 'WAL we have is from the right term' check has
899 : /// already been done outside.
900 1837 : async fn update_commit_lsn(&mut self, mut candidate: Lsn) -> Result<()> {
901 1837 : // Both peers and walproposer communicate this value, we might already
902 1837 : // have a fresher (higher) version.
903 1837 : candidate = max(candidate, self.state.inmem.commit_lsn);
904 1837 : let commit_lsn = min(candidate, self.flush_lsn());
905 1837 : assert!(
906 1837 : commit_lsn >= self.state.inmem.commit_lsn,
907 0 : "commit_lsn monotonicity violated: old={} new={}",
908 : self.state.inmem.commit_lsn,
909 : commit_lsn
910 : );
911 :
912 1837 : self.state.inmem.commit_lsn = commit_lsn;
913 1837 :
914 1837 : // If new commit_lsn reached term switch, force sync of control
915 1837 : // file: walproposer in sync mode is very interested when this
916 1837 : // happens. Note: this is for sync-safekeepers mode only, as
917 1837 : // otherwise commit_lsn might jump over term_start_lsn.
918 1837 : if commit_lsn >= self.term_start_lsn && self.state.commit_lsn < self.term_start_lsn {
919 91 : self.state.flush().await?;
920 1746 : }
921 :
922 1837 : Ok(())
923 1837 : }
924 :
925 : /// Handle request to append WAL.
926 : #[allow(clippy::comparison_chain)]
927 3053 : async fn handle_append_request(
928 3053 : &mut self,
929 3053 : msg: &AppendRequest,
930 3053 : require_flush: bool,
931 3053 : ) -> Result<Option<AcceptorProposerMessage>> {
932 3053 : if self.state.acceptor_state.term < msg.h.term {
933 0 : bail!("got AppendRequest before ProposerElected");
934 3053 : }
935 3053 :
936 3053 : // If our term is higher, immediately refuse the message.
937 3053 : if self.state.acceptor_state.term > msg.h.term {
938 0 : let resp = AppendResponse::term_only(self.state.acceptor_state.term);
939 0 : return Ok(Some(AcceptorProposerMessage::AppendResponse(resp)));
940 3053 : }
941 3053 :
942 3053 : // Disallow any non-sequential writes, which can result in gaps or
943 3053 : // overwrites. If we need to move the pointer, ProposerElected message
944 3053 : // should have truncated WAL first accordingly. Note that the first
945 3053 : // condition (WAL rewrite) is quite expected in real world; it happens
946 3053 : // when walproposer reconnects to safekeeper and writes some more data
947 3053 : // while first connection still gets some packets later. It might be
948 3053 : // better to not log this as error! above.
949 3053 : let write_lsn = self.wal_store.write_lsn();
950 3053 : let flush_lsn = self.wal_store.flush_lsn();
951 3053 : if write_lsn > msg.h.begin_lsn {
952 1 : bail!(
953 1 : "append request rewrites WAL written before, write_lsn={}, msg lsn={}",
954 1 : write_lsn,
955 1 : msg.h.begin_lsn
956 1 : );
957 3052 : }
958 3052 : if write_lsn < msg.h.begin_lsn && write_lsn != Lsn(0) {
959 0 : bail!(
960 0 : "append request creates gap in written WAL, write_lsn={}, msg lsn={}",
961 0 : write_lsn,
962 0 : msg.h.begin_lsn,
963 0 : );
964 3052 : }
965 3052 :
966 3052 : // Now we know that we are in the same term as the proposer,
967 3052 : // processing the message.
968 3052 :
969 3052 : self.state.inmem.proposer_uuid = msg.h.proposer_uuid;
970 3052 :
971 3052 : // do the job
972 3052 : if !msg.wal_data.is_empty() {
973 795 : self.wal_store
974 795 : .write_wal(msg.h.begin_lsn, &msg.wal_data)
975 0 : .await?;
976 2257 : }
977 :
978 : // flush wal to the disk, if required
979 3052 : if require_flush {
980 3 : self.wal_store.flush_wal().await?;
981 3049 : }
982 :
983 : // Update commit_lsn. It will be flushed to the control file regularly by the timeline
984 : // manager, off of the WAL ingest hot path.
985 3052 : if msg.h.commit_lsn != Lsn(0) {
986 1837 : self.update_commit_lsn(msg.h.commit_lsn).await?;
987 1215 : }
988 : // Value calculated by walproposer can always lag:
989 : // - safekeepers can forget inmem value and send to proposer lower
990 : // persisted one on restart;
991 : // - if we make safekeepers always send persistent value,
992 : // any compute restart would pull it down.
993 : // Thus, take max before adopting.
994 3052 : self.state.inmem.peer_horizon_lsn =
995 3052 : max(self.state.inmem.peer_horizon_lsn, msg.h.truncate_lsn);
996 3052 :
997 3052 : trace!(
998 0 : "processed AppendRequest of len {}, begin_lsn={}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, flushed={:?}",
999 0 : msg.wal_data.len(),
1000 : msg.h.begin_lsn,
1001 : msg.h.end_lsn,
1002 : msg.h.commit_lsn,
1003 : msg.h.truncate_lsn,
1004 : require_flush,
1005 : );
1006 :
1007 : // If flush_lsn hasn't updated, AppendResponse is not very useful.
1008 : // This is the common case for !require_flush, but a flush can still
1009 : // happen on segment bounds.
1010 3052 : if !require_flush && flush_lsn == self.flush_lsn() {
1011 3049 : return Ok(None);
1012 3 : }
1013 3 :
1014 3 : let resp = self.append_response();
1015 3 : Ok(Some(AcceptorProposerMessage::AppendResponse(resp)))
1016 3053 : }
1017 :
1018 : /// Flush WAL to disk. Return AppendResponse with latest LSNs.
1019 2528 : async fn handle_flush(&mut self) -> Result<Option<AcceptorProposerMessage>> {
1020 2528 : self.wal_store.flush_wal().await?;
1021 2528 : Ok(Some(AcceptorProposerMessage::AppendResponse(
1022 2528 : self.append_response(),
1023 2528 : )))
1024 2528 : }
1025 :
1026 : /// Update commit_lsn from peer safekeeper data.
1027 0 : pub async fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> {
1028 0 : if (Lsn(sk_info.commit_lsn) != Lsn::INVALID) && (sk_info.last_log_term != INVALID_TERM) {
1029 : // Note: the check is too restrictive, generally we can update local
1030 : // commit_lsn if our history matches (is part of) history of advanced
1031 : // commit_lsn provider.
1032 0 : if sk_info.last_log_term == self.get_last_log_term() {
1033 0 : self.update_commit_lsn(Lsn(sk_info.commit_lsn)).await?;
1034 0 : }
1035 0 : }
1036 0 : Ok(())
1037 0 : }
1038 : }
1039 :
1040 : #[cfg(test)]
1041 : mod tests {
1042 : use futures::future::BoxFuture;
1043 : use postgres_ffi::{XLogSegNo, WAL_SEGMENT_SIZE};
1044 :
1045 : use super::*;
1046 : use crate::state::{EvictionState, PersistedPeers, TimelinePersistentState};
1047 : use std::{ops::Deref, str::FromStr, time::Instant};
1048 :
1049 : // fake storage for tests
1050 : struct InMemoryState {
1051 : persisted_state: TimelinePersistentState,
1052 : }
1053 :
1054 : impl control_file::Storage for InMemoryState {
1055 5 : async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> {
1056 5 : self.persisted_state = s.clone();
1057 5 : Ok(())
1058 5 : }
1059 :
1060 0 : fn last_persist_at(&self) -> Instant {
1061 0 : Instant::now()
1062 0 : }
1063 : }
1064 :
1065 : impl Deref for InMemoryState {
1066 : type Target = TimelinePersistentState;
1067 :
1068 83 : fn deref(&self) -> &Self::Target {
1069 83 : &self.persisted_state
1070 83 : }
1071 : }
1072 :
1073 3 : fn test_sk_state() -> TimelinePersistentState {
1074 3 : let mut state = TimelinePersistentState::empty();
1075 3 : state.server.wal_seg_size = WAL_SEGMENT_SIZE as u32;
1076 3 : state.tenant_id = TenantId::from([1u8; 16]);
1077 3 : state.timeline_id = TimelineId::from([1u8; 16]);
1078 3 : state
1079 3 : }
1080 :
1081 : struct DummyWalStore {
1082 : lsn: Lsn,
1083 : }
1084 :
1085 : impl wal_storage::Storage for DummyWalStore {
1086 4 : fn write_lsn(&self) -> Lsn {
1087 4 : self.lsn
1088 4 : }
1089 :
1090 19 : fn flush_lsn(&self) -> Lsn {
1091 19 : self.lsn
1092 19 : }
1093 :
1094 2 : async fn initialize_first_segment(&mut self, _init_lsn: Lsn) -> Result<()> {
1095 2 : Ok(())
1096 2 : }
1097 :
1098 3 : async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
1099 3 : self.lsn = startpos + buf.len() as u64;
1100 3 : Ok(())
1101 3 : }
1102 :
1103 2 : async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
1104 2 : self.lsn = end_pos;
1105 2 : Ok(())
1106 2 : }
1107 :
1108 5 : async fn flush_wal(&mut self) -> Result<()> {
1109 5 : Ok(())
1110 5 : }
1111 :
1112 0 : fn remove_up_to(&self, _segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> {
1113 0 : Box::pin(async { Ok(()) })
1114 0 : }
1115 :
1116 0 : fn get_metrics(&self) -> crate::metrics::WalStorageMetrics {
1117 0 : crate::metrics::WalStorageMetrics::default()
1118 0 : }
1119 : }
1120 :
1121 : #[tokio::test]
1122 1 : async fn test_voting() {
1123 1 : let storage = InMemoryState {
1124 1 : persisted_state: test_sk_state(),
1125 1 : };
1126 1 : let wal_store = DummyWalStore { lsn: Lsn(0) };
1127 1 : let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
1128 1 :
1129 1 : // check voting for 1 is ok
1130 1 : let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 });
1131 1 : let mut vote_resp = sk.process_msg(&vote_request).await;
1132 1 : match vote_resp.unwrap() {
1133 1 : Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given != 0),
1134 1 : r => panic!("unexpected response: {:?}", r),
1135 1 : }
1136 1 :
1137 1 : // reboot...
1138 1 : let state = sk.state.deref().clone();
1139 1 : let storage = InMemoryState {
1140 1 : persisted_state: state,
1141 1 : };
1142 1 :
1143 1 : sk = SafeKeeper::new(TimelineState::new(storage), sk.wal_store, NodeId(0)).unwrap();
1144 1 :
1145 1 : // and ensure voting second time for 1 is not ok
1146 1 : vote_resp = sk.process_msg(&vote_request).await;
1147 1 : match vote_resp.unwrap() {
1148 1 : Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given == 0),
1149 1 : r => panic!("unexpected response: {:?}", r),
1150 1 : }
1151 1 : }
1152 :
1153 : #[tokio::test]
1154 1 : async fn test_last_log_term_switch() {
1155 1 : let storage = InMemoryState {
1156 1 : persisted_state: test_sk_state(),
1157 1 : };
1158 1 : let wal_store = DummyWalStore { lsn: Lsn(0) };
1159 1 :
1160 1 : let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
1161 1 :
1162 1 : let mut ar_hdr = AppendRequestHeader {
1163 1 : term: 2,
1164 1 : term_start_lsn: Lsn(3),
1165 1 : begin_lsn: Lsn(1),
1166 1 : end_lsn: Lsn(2),
1167 1 : commit_lsn: Lsn(0),
1168 1 : truncate_lsn: Lsn(0),
1169 1 : proposer_uuid: [0; 16],
1170 1 : };
1171 1 : let mut append_request = AppendRequest {
1172 1 : h: ar_hdr.clone(),
1173 1 : wal_data: Bytes::from_static(b"b"),
1174 1 : };
1175 1 :
1176 1 : let pem = ProposerElected {
1177 1 : term: 2,
1178 1 : start_streaming_at: Lsn(1),
1179 1 : term_history: TermHistory(vec![
1180 1 : TermLsn {
1181 1 : term: 1,
1182 1 : lsn: Lsn(1),
1183 1 : },
1184 1 : TermLsn {
1185 1 : term: 2,
1186 1 : lsn: Lsn(3),
1187 1 : },
1188 1 : ]),
1189 1 : timeline_start_lsn: Lsn(1),
1190 1 : };
1191 1 : sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
1192 1 : .await
1193 1 : .unwrap();
1194 1 :
1195 1 : // check that AppendRequest before term_start_lsn doesn't switch last_log_term.
1196 1 : sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
1197 1 : .await
1198 1 : .unwrap();
1199 1 : assert_eq!(sk.get_last_log_term(), 1);
1200 1 :
1201 1 : // but record at term_start_lsn does the switch
1202 1 : ar_hdr.begin_lsn = Lsn(2);
1203 1 : ar_hdr.end_lsn = Lsn(3);
1204 1 : append_request = AppendRequest {
1205 1 : h: ar_hdr,
1206 1 : wal_data: Bytes::from_static(b"b"),
1207 1 : };
1208 1 : sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
1209 1 : .await
1210 1 : .unwrap();
1211 1 : assert_eq!(sk.get_last_log_term(), 2);
1212 1 : }
1213 :
1214 : #[tokio::test]
1215 1 : async fn test_non_consecutive_write() {
1216 1 : let storage = InMemoryState {
1217 1 : persisted_state: test_sk_state(),
1218 1 : };
1219 1 : let wal_store = DummyWalStore { lsn: Lsn(0) };
1220 1 :
1221 1 : let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
1222 1 :
1223 1 : let pem = ProposerElected {
1224 1 : term: 1,
1225 1 : start_streaming_at: Lsn(1),
1226 1 : term_history: TermHistory(vec![TermLsn {
1227 1 : term: 1,
1228 1 : lsn: Lsn(1),
1229 1 : }]),
1230 1 : timeline_start_lsn: Lsn(1),
1231 1 : };
1232 1 : sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
1233 1 : .await
1234 1 : .unwrap();
1235 1 :
1236 1 : let ar_hdr = AppendRequestHeader {
1237 1 : term: 1,
1238 1 : term_start_lsn: Lsn(3),
1239 1 : begin_lsn: Lsn(1),
1240 1 : end_lsn: Lsn(2),
1241 1 : commit_lsn: Lsn(0),
1242 1 : truncate_lsn: Lsn(0),
1243 1 : proposer_uuid: [0; 16],
1244 1 : };
1245 1 : let append_request = AppendRequest {
1246 1 : h: ar_hdr.clone(),
1247 1 : wal_data: Bytes::from_static(b"b"),
1248 1 : };
1249 1 :
1250 1 : // do write ending at 2, it should be ok
1251 1 : sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
1252 1 : .await
1253 1 : .unwrap();
1254 1 : let mut ar_hrd2 = ar_hdr.clone();
1255 1 : ar_hrd2.begin_lsn = Lsn(4);
1256 1 : ar_hrd2.end_lsn = Lsn(5);
1257 1 : let append_request = AppendRequest {
1258 1 : h: ar_hdr,
1259 1 : wal_data: Bytes::from_static(b"b"),
1260 1 : };
1261 1 : // and now starting at 4, it must fail
1262 1 : sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
1263 1 : .await
1264 1 : .unwrap_err();
1265 1 : }
1266 :
1267 : #[test]
1268 1 : fn test_find_highest_common_point_none() {
1269 1 : let prop_th = TermHistory(vec![(0, Lsn(1)).into()]);
1270 1 : let sk_th = TermHistory(vec![(1, Lsn(1)).into(), (2, Lsn(2)).into()]);
1271 1 : assert_eq!(
1272 1 : TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(3),),
1273 1 : None
1274 1 : );
1275 1 : }
1276 :
1277 : #[test]
1278 1 : fn test_find_highest_common_point_middle() {
1279 1 : let prop_th = TermHistory(vec![
1280 1 : (1, Lsn(10)).into(),
1281 1 : (2, Lsn(20)).into(),
1282 1 : (4, Lsn(40)).into(),
1283 1 : ]);
1284 1 : let sk_th = TermHistory(vec![
1285 1 : (1, Lsn(10)).into(),
1286 1 : (2, Lsn(20)).into(),
1287 1 : (3, Lsn(30)).into(), // sk ends last common term 2 at 30
1288 1 : ]);
1289 1 : assert_eq!(
1290 1 : TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(40),),
1291 1 : Some(TermLsn {
1292 1 : term: 2,
1293 1 : lsn: Lsn(30),
1294 1 : })
1295 1 : );
1296 1 : }
1297 :
1298 : #[test]
1299 1 : fn test_find_highest_common_point_sk_end() {
1300 1 : let prop_th = TermHistory(vec![
1301 1 : (1, Lsn(10)).into(),
1302 1 : (2, Lsn(20)).into(), // last common term 2, sk will end it at 32 sk_end_lsn
1303 1 : (4, Lsn(40)).into(),
1304 1 : ]);
1305 1 : let sk_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
1306 1 : assert_eq!(
1307 1 : TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(32),),
1308 1 : Some(TermLsn {
1309 1 : term: 2,
1310 1 : lsn: Lsn(32),
1311 1 : })
1312 1 : );
1313 1 : }
1314 :
1315 : #[test]
1316 1 : fn test_find_highest_common_point_walprop() {
1317 1 : let prop_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
1318 1 : let sk_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
1319 1 : assert_eq!(
1320 1 : TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(32),),
1321 1 : Some(TermLsn {
1322 1 : term: 2,
1323 1 : lsn: Lsn(32),
1324 1 : })
1325 1 : );
1326 1 : }
1327 :
1328 : #[test]
1329 1 : fn test_sk_state_bincode_serde_roundtrip() {
1330 : use utils::Hex;
1331 1 : let tenant_id = TenantId::from_str("cf0480929707ee75372337efaa5ecf96").unwrap();
1332 1 : let timeline_id = TimelineId::from_str("112ded66422aa5e953e5440fa5427ac4").unwrap();
1333 1 : let state = TimelinePersistentState {
1334 1 : tenant_id,
1335 1 : timeline_id,
1336 1 : acceptor_state: AcceptorState {
1337 1 : term: 42,
1338 1 : term_history: TermHistory(vec![TermLsn {
1339 1 : lsn: Lsn(0x1),
1340 1 : term: 41,
1341 1 : }]),
1342 1 : },
1343 1 : server: ServerInfo {
1344 1 : pg_version: 14,
1345 1 : system_id: 0x1234567887654321,
1346 1 : wal_seg_size: 0x12345678,
1347 1 : },
1348 1 : proposer_uuid: {
1349 1 : let mut arr = timeline_id.as_arr();
1350 1 : arr.reverse();
1351 1 : arr
1352 1 : },
1353 1 : timeline_start_lsn: Lsn(0x12345600),
1354 1 : local_start_lsn: Lsn(0x12),
1355 1 : commit_lsn: Lsn(1234567800),
1356 1 : backup_lsn: Lsn(1234567300),
1357 1 : peer_horizon_lsn: Lsn(9999999),
1358 1 : remote_consistent_lsn: Lsn(1234560000),
1359 1 : peers: PersistedPeers(vec![(
1360 1 : NodeId(1),
1361 1 : PersistedPeerInfo {
1362 1 : backup_lsn: Lsn(1234567000),
1363 1 : term: 42,
1364 1 : flush_lsn: Lsn(1234567800 - 8),
1365 1 : commit_lsn: Lsn(1234567600),
1366 1 : },
1367 1 : )]),
1368 1 : partial_backup: crate::wal_backup_partial::State::default(),
1369 1 : eviction_state: EvictionState::Present,
1370 1 : };
1371 1 :
1372 1 : let ser = state.ser().unwrap();
1373 1 :
1374 1 : #[rustfmt::skip]
1375 1 : let expected = [
1376 1 : // tenant_id as length prefixed hex
1377 1 : 0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1378 1 : 0x63, 0x66, 0x30, 0x34, 0x38, 0x30, 0x39, 0x32, 0x39, 0x37, 0x30, 0x37, 0x65, 0x65, 0x37, 0x35, 0x33, 0x37, 0x32, 0x33, 0x33, 0x37, 0x65, 0x66, 0x61, 0x61, 0x35, 0x65, 0x63, 0x66, 0x39, 0x36,
1379 1 : // timeline_id as length prefixed hex
1380 1 : 0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1381 1 : 0x31, 0x31, 0x32, 0x64, 0x65, 0x64, 0x36, 0x36, 0x34, 0x32, 0x32, 0x61, 0x61, 0x35, 0x65, 0x39, 0x35, 0x33, 0x65, 0x35, 0x34, 0x34, 0x30, 0x66, 0x61, 0x35, 0x34, 0x32, 0x37, 0x61, 0x63, 0x34,
1382 1 : // term
1383 1 : 0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1384 1 : // length prefix
1385 1 : 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1386 1 : // unsure why this order is swapped
1387 1 : 0x29, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1388 1 : 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1389 1 : // pg_version
1390 1 : 0x0e, 0x00, 0x00, 0x00,
1391 1 : // systemid
1392 1 : 0x21, 0x43, 0x65, 0x87, 0x78, 0x56, 0x34, 0x12,
1393 1 : // wal_seg_size
1394 1 : 0x78, 0x56, 0x34, 0x12,
1395 1 : // pguuid as length prefixed hex
1396 1 : 0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1397 1 : 0x63, 0x34, 0x37, 0x61, 0x34, 0x32, 0x61, 0x35, 0x30, 0x66, 0x34, 0x34, 0x65, 0x35, 0x35, 0x33, 0x65, 0x39, 0x61, 0x35, 0x32, 0x61, 0x34, 0x32, 0x36, 0x36, 0x65, 0x64, 0x32, 0x64, 0x31, 0x31,
1398 1 :
1399 1 : // timeline_start_lsn
1400 1 : 0x00, 0x56, 0x34, 0x12, 0x00, 0x00, 0x00, 0x00,
1401 1 : 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1402 1 : 0x78, 0x02, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
1403 1 : 0x84, 0x00, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
1404 1 : 0x7f, 0x96, 0x98, 0x00, 0x00, 0x00, 0x00, 0x00,
1405 1 : 0x00, 0xe4, 0x95, 0x49, 0x00, 0x00, 0x00, 0x00,
1406 1 : // length prefix for persistentpeers
1407 1 : 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1408 1 : // nodeid
1409 1 : 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1410 1 : // backuplsn
1411 1 : 0x58, 0xff, 0x95, 0x49, 0x00, 0x00, 0x00, 0x00,
1412 1 : 0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1413 1 : 0x70, 0x02, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
1414 1 : 0xb0, 0x01, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
1415 1 : // partial_backup
1416 1 : 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1417 1 : // eviction_state
1418 1 : 0x00, 0x00, 0x00, 0x00,
1419 1 : ];
1420 1 :
1421 1 : assert_eq!(Hex(&ser), Hex(&expected));
1422 :
1423 1 : let deser = TimelinePersistentState::des(&ser).unwrap();
1424 1 :
1425 1 : assert_eq!(deser, state);
1426 1 : }
1427 : }
|