Line data Source code
1 : //! Acceptor part of proposer-acceptor consensus algorithm.
2 :
3 : use anyhow::{bail, Context, Result};
4 : use byteorder::{LittleEndian, ReadBytesExt};
5 : use bytes::{Buf, BufMut, Bytes, BytesMut};
6 :
7 : use postgres_ffi::{TimeLineID, MAX_SEND_SIZE};
8 : use safekeeper_api::models::HotStandbyFeedback;
9 : use safekeeper_api::Term;
10 : use safekeeper_api::INVALID_TERM;
11 : use serde::{Deserialize, Serialize};
12 : use std::cmp::max;
13 : use std::cmp::min;
14 : use std::fmt;
15 : use std::io::Read;
16 : use storage_broker::proto::SafekeeperTimelineInfo;
17 :
18 : use tracing::*;
19 :
20 : use crate::control_file;
21 : use crate::metrics::MISC_OPERATION_SECONDS;
22 :
23 : use crate::state::TimelineState;
24 : use crate::wal_storage;
25 : use pq_proto::SystemId;
26 : use utils::pageserver_feedback::PageserverFeedback;
27 : use utils::{
28 : bin_ser::LeSer,
29 : id::{NodeId, TenantId, TimelineId},
30 : lsn::Lsn,
31 : };
32 :
33 : const SK_PROTOCOL_VERSION: u32 = 2;
34 : pub const UNKNOWN_SERVER_VERSION: u32 = 0;
35 :
36 4 : #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
37 : pub struct TermLsn {
38 : pub term: Term,
39 : pub lsn: Lsn,
40 : }
41 :
42 : // Creation from a tuple saves some typing (e.g. in unit tests).
43 : impl From<(Term, Lsn)> for TermLsn {
44 18 : fn from(pair: (Term, Lsn)) -> TermLsn {
45 18 : TermLsn {
46 18 : term: pair.0,
47 18 : lsn: pair.1,
48 18 : }
49 18 : }
50 : }
51 :
52 5 : #[derive(Clone, Serialize, Deserialize, PartialEq)]
53 : pub struct TermHistory(pub Vec<TermLsn>);
54 :
55 : impl TermHistory {
56 1461 : pub fn empty() -> TermHistory {
57 1461 : TermHistory(Vec::new())
58 1461 : }
59 :
60 : // Parse TermHistory as n_entries followed by TermLsn pairs
61 624 : pub fn from_bytes(bytes: &mut Bytes) -> Result<TermHistory> {
62 624 : if bytes.remaining() < 4 {
63 0 : bail!("TermHistory misses len");
64 624 : }
65 624 : let n_entries = bytes.get_u32_le();
66 624 : let mut res = Vec::with_capacity(n_entries as usize);
67 624 : for _ in 0..n_entries {
68 3328 : if bytes.remaining() < 16 {
69 0 : bail!("TermHistory is incomplete");
70 3328 : }
71 3328 : res.push(TermLsn {
72 3328 : term: bytes.get_u64_le(),
73 3328 : lsn: bytes.get_u64_le().into(),
74 3328 : })
75 : }
76 624 : Ok(TermHistory(res))
77 624 : }
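// For illustration only (not in the original source): the byte layout that
// `from_bytes` above expects, with every integer little-endian, is
//
//     u32 n_entries
//     n_entries x { u64 term, u64 lsn }
//
// so a history with a single entry (term 2 starting at LSN 0x20) occupies
// 4 + 16 = 20 bytes.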
78 :
79 : /// Return a copy of self with term switches happening strictly after `up_to`
80 : /// truncated away.
81 3510 : pub fn up_to(&self, up_to: Lsn) -> TermHistory {
82 3510 : let mut res = Vec::with_capacity(self.0.len());
83 12696 : for e in &self.0 {
84 9198 : if e.lsn > up_to {
85 12 : break;
86 9186 : }
87 9186 : res.push(*e);
88 : }
89 3510 : TermHistory(res)
90 3510 : }
91 :
92 : /// Find the point of divergence between the leader (walproposer) term history
93 : /// and the safekeeper's. The arguments are not symmetric: the proposer history
94 : /// ends at +infinity, while the safekeeper's ends at flush_lsn.
95 : /// The C version of this logic lives in walproposer's SendProposerElected.
96 630 : pub fn find_highest_common_point(
97 630 : prop_th: &TermHistory,
98 630 : sk_th: &TermHistory,
99 630 : sk_wal_end: Lsn,
100 630 : ) -> Option<TermLsn> {
101 630 : let (prop_th, sk_th) = (&prop_th.0, &sk_th.0); // avoid .0 below
102 :
103 630 : if let Some(sk_th_last) = sk_th.last() {
104 480 : assert!(
105 480 : sk_th_last.lsn <= sk_wal_end,
106 0 : "safekeeper term history end {:?} LSN is higher than WAL end {:?}",
107 : sk_th_last,
108 : sk_wal_end
109 : );
110 150 : }
111 :
112 : // find last common term, if any...
113 630 : let mut last_common_idx = None;
114 2629 : for i in 0..min(sk_th.len(), prop_th.len()) {
115 2629 : if prop_th[i].term != sk_th[i].term {
116 11 : break;
117 2618 : }
118 2618 : // If term is the same, LSN must be equal as well.
119 2618 : assert!(
120 2618 : prop_th[i].lsn == sk_th[i].lsn,
121 0 : "same term {} has different start LSNs: prop {}, sk {}",
122 0 : prop_th[i].term,
123 0 : prop_th[i].lsn,
124 0 : sk_th[i].lsn
125 : );
126 2618 : last_common_idx = Some(i);
127 : }
128 630 : let last_common_idx = last_common_idx?;
129 : // Now find where the last common term ends on the prop and sk sides and take
130 : // the min. The end of a (common) term is the start of the next one, unless it
131 : // is the last entry: then it is flush_lsn on the safekeeper side and +infinity
132 : // on the proposer side, so we just take flush_lsn.
133 478 : if last_common_idx == prop_th.len() - 1 {
134 37 : Some(TermLsn {
135 37 : term: prop_th[last_common_idx].term,
136 37 : lsn: sk_wal_end,
137 37 : })
138 : } else {
139 441 : let prop_common_term_end = prop_th[last_common_idx + 1].lsn;
140 441 : let sk_common_term_end = if last_common_idx + 1 < sk_th.len() {
141 9 : sk_th[last_common_idx + 1].lsn
142 : } else {
143 432 : sk_wal_end
144 : };
145 441 : Some(TermLsn {
146 441 : term: prop_th[last_common_idx].term,
147 441 : lsn: min(prop_common_term_end, sk_common_term_end),
148 441 : })
149 : }
150 630 : }
151 : }
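// Worked example, added for illustration only (it mirrors the unit tests at the
// bottom of this file): with the proposer history [(1, 10), (2, 20), (4, 40)],
// the safekeeper history [(1, 10), (2, 20), (3, 30)] and sk_wal_end = 40, the
// last common term is 2; the proposer ends it at 40 (start of term 4) while the
// safekeeper ends it at 30 (start of term 3), so find_highest_common_point
// returns (term 2, LSN 30) and the safekeeper WAL past 30 has to be truncated
// before streaming resumes.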
152 :
153 : /// Display only latest entries for Debug.
154 : impl fmt::Debug for TermHistory {
155 351 : fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
156 351 : let n_printed = 20;
157 351 : write!(
158 351 : fmt,
159 351 : "{}{:?}",
160 351 : if self.0.len() > n_printed { "... " } else { "" },
161 351 : self.0
162 351 : .iter()
163 351 : .rev()
164 351 : .take(n_printed)
165 1292 : .map(|&e| (e.term, e.lsn)) // omit TermSwitchEntry
166 351 : .collect::<Vec<_>>()
167 351 : )
168 351 : }
169 : }
170 :
171 : /// Unique id of proposer. Not needed for correctness, used for monitoring.
172 : pub type PgUuid = [u8; 16];
173 :
174 : /// Persistent consensus state of the acceptor.
175 5 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
176 : pub struct AcceptorState {
177 : /// The last term the acceptor voted for (advanced in phase 1 of the protocol).
178 : pub term: Term,
179 : /// History of term switches for safekeeper's WAL.
180 : /// Actually it often goes *beyond* WAL contents as we adopt term history
181 : /// from the proposer before recovery.
182 : pub term_history: TermHistory,
183 : }
184 :
185 : impl AcceptorState {
186 : /// acceptor's last_log_term is the term of the highest entry in the log
187 43 : pub fn get_last_log_term(&self, flush_lsn: Lsn) -> Term {
188 43 : let th = self.term_history.up_to(flush_lsn);
189 43 : match th.0.last() {
190 37 : Some(e) => e.term,
191 6 : None => 0,
192 : }
193 43 : }
194 : }
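// For illustration only (not in the original source): with a term_history of
// [(1, Lsn(0x10)), (2, Lsn(0x20))], get_last_log_term(Lsn(0x15)) returns 1,
// because the switch to term 2 at 0x20 lies beyond the flushed WAL and is
// dropped by up_to(); once flush_lsn reaches 0x20 the result becomes 2.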
195 :
196 2 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
197 : pub struct PersistedPeerInfo {
198 : /// LSN up to which safekeeper offloaded WAL to s3.
199 : pub backup_lsn: Lsn,
200 : /// Term of the last entry.
201 : pub term: Term,
202 : /// LSN of the last record.
203 : pub flush_lsn: Lsn,
204 : /// Up to which LSN safekeeper regards its WAL as committed.
205 : pub commit_lsn: Lsn,
206 : }
207 :
208 : impl PersistedPeerInfo {
209 0 : pub fn new() -> Self {
210 0 : Self {
211 0 : backup_lsn: Lsn::INVALID,
212 0 : term: INVALID_TERM,
213 0 : flush_lsn: Lsn(0),
214 0 : commit_lsn: Lsn(0),
215 0 : }
216 0 : }
217 : }
218 :
219 : // make clippy happy
220 : impl Default for PersistedPeerInfo {
221 0 : fn default() -> Self {
222 0 : Self::new()
223 0 : }
224 : }
225 :
226 : // protocol messages
227 :
228 : /// Initial Proposer -> Acceptor message
229 19721 : #[derive(Debug, Deserialize)]
230 : pub struct ProposerGreeting {
231 : /// proposer-acceptor protocol version
232 : pub protocol_version: u32,
233 : /// Postgres server version
234 : pub pg_version: u32,
235 : pub proposer_id: PgUuid,
236 : pub system_id: SystemId,
237 : pub timeline_id: TimelineId,
238 : pub tenant_id: TenantId,
239 : pub tli: TimeLineID,
240 : pub wal_seg_size: u32,
241 : }
242 :
243 : /// Acceptor -> Proposer initial response: the highest term known to me
244 : /// (acceptor voted for).
245 : #[derive(Debug, Serialize)]
246 : pub struct AcceptorGreeting {
247 : term: u64,
248 : node_id: NodeId,
249 : }
250 :
251 : /// Vote request sent from proposer to safekeepers
252 2839 : #[derive(Debug, Deserialize)]
253 : pub struct VoteRequest {
254 : pub term: Term,
255 : }
256 :
257 : /// Vote itself, sent from safekeeper to proposer
258 : #[derive(Debug, Serialize)]
259 : pub struct VoteResponse {
260 : pub term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
261 : vote_given: u64, // fixme u64 due to padding
262 : // Safekeeper flush_lsn (end of WAL) + history of term switches allow
263 : // proposer to choose the most advanced one.
264 : pub flush_lsn: Lsn,
265 : truncate_lsn: Lsn,
266 : pub term_history: TermHistory,
267 : timeline_start_lsn: Lsn,
268 : }
269 :
270 : /*
271 : * Proposer -> Acceptor message announcing proposer is elected and communicating
272 : * term history to it.
273 : */
274 : #[derive(Debug)]
275 : pub struct ProposerElected {
276 : pub term: Term,
277 : pub start_streaming_at: Lsn,
278 : pub term_history: TermHistory,
279 : pub timeline_start_lsn: Lsn,
280 : }
281 :
282 : /// Request with WAL message sent from proposer to safekeeper. Along the way it
283 : /// communicates commit_lsn.
284 : #[derive(Debug)]
285 : pub struct AppendRequest {
286 : pub h: AppendRequestHeader,
287 : pub wal_data: Bytes,
288 : }
289 2616 : #[derive(Debug, Clone, Deserialize)]
290 : pub struct AppendRequestHeader {
291 : // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
292 : pub term: Term,
293 : // TODO: remove this field from the protocol, it is unused -- LSN of term
294 : // switch can be taken from ProposerElected (as well as from term history).
295 : pub term_start_lsn: Lsn,
296 : /// start position of message in WAL
297 : pub begin_lsn: Lsn,
298 : /// end position of message in WAL
299 : pub end_lsn: Lsn,
300 : /// LSN committed by quorum of safekeepers
301 : pub commit_lsn: Lsn,
302 : /// minimal LSN which may be needed by proposer to perform recovery of some safekeeper
303 : pub truncate_lsn: Lsn,
304 : // only for logging/debugging
305 : pub proposer_uuid: PgUuid,
306 : }
307 :
308 : /// Report safekeeper state to proposer
309 : #[derive(Debug, Serialize, Clone)]
310 : pub struct AppendResponse {
311 : // Current term of the safekeeper; if it is higher than proposer's, the
312 : // compute is out of date.
313 : pub term: Term,
314 : // Flushed end of WAL on the safekeeper; one should always be mindful of which
315 : // term history this value comes from, either by checking the history directly
316 : // or by observing that the term is set to one for which WAL truncation is
317 : // known to have happened.
318 : pub flush_lsn: Lsn,
319 : // We report back our awareness about which WAL is committed, as this is
320 : // a criterion for walproposer --sync mode exit
321 : pub commit_lsn: Lsn,
322 : pub hs_feedback: HotStandbyFeedback,
323 : pub pageserver_feedback: Option<PageserverFeedback>,
324 : }
325 :
326 : impl AppendResponse {
327 0 : fn term_only(term: Term) -> AppendResponse {
328 0 : AppendResponse {
329 0 : term,
330 0 : flush_lsn: Lsn(0),
331 0 : commit_lsn: Lsn(0),
332 0 : hs_feedback: HotStandbyFeedback::empty(),
333 0 : pageserver_feedback: None,
334 0 : }
335 0 : }
336 : }
337 :
338 : /// Proposer -> Acceptor messages
339 : #[derive(Debug)]
340 : pub enum ProposerAcceptorMessage {
341 : Greeting(ProposerGreeting),
342 : VoteRequest(VoteRequest),
343 : Elected(ProposerElected),
344 : AppendRequest(AppendRequest),
345 : NoFlushAppendRequest(AppendRequest),
346 : FlushWAL,
347 : }
348 :
349 : impl ProposerAcceptorMessage {
350 : /// Parse proposer message.
351 25800 : pub fn parse(msg_bytes: Bytes) -> Result<ProposerAcceptorMessage> {
352 25800 : // XXX: using a Reader is inefficient, but it is easy to combine with bincode
353 25800 : let mut stream = msg_bytes.reader();
354 : // u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is
355 25800 : let tag = stream.read_u64::<LittleEndian>()? as u8 as char;
356 25800 : match tag {
357 : 'g' => {
358 19721 : let msg = ProposerGreeting::des_from(&mut stream)?;
359 19721 : Ok(ProposerAcceptorMessage::Greeting(msg))
360 : }
361 : 'v' => {
362 2839 : let msg = VoteRequest::des_from(&mut stream)?;
363 2839 : Ok(ProposerAcceptorMessage::VoteRequest(msg))
364 : }
365 : 'e' => {
366 624 : let mut msg_bytes = stream.into_inner();
367 624 : if msg_bytes.remaining() < 16 {
368 0 : bail!("ProposerElected message is not complete");
369 624 : }
370 624 : let term = msg_bytes.get_u64_le();
371 624 : let start_streaming_at = msg_bytes.get_u64_le().into();
372 624 : let term_history = TermHistory::from_bytes(&mut msg_bytes)?;
373 624 : if msg_bytes.remaining() < 8 {
374 0 : bail!("ProposerElected message is not complete");
375 624 : }
376 624 : let timeline_start_lsn = msg_bytes.get_u64_le().into();
377 624 : let msg = ProposerElected {
378 624 : term,
379 624 : start_streaming_at,
380 624 : timeline_start_lsn,
381 624 : term_history,
382 624 : };
383 624 : Ok(ProposerAcceptorMessage::Elected(msg))
384 : }
385 : 'a' => {
386 : // read header followed by wal data
387 2616 : let hdr = AppendRequestHeader::des_from(&mut stream)?;
388 2616 : let rec_size = hdr
389 2616 : .end_lsn
390 2616 : .checked_sub(hdr.begin_lsn)
391 2616 : .context("begin_lsn > end_lsn in AppendRequest")?
392 : .0 as usize;
393 2616 : if rec_size > MAX_SEND_SIZE {
394 0 : bail!(
395 0 : "AppendRequest is longer than MAX_SEND_SIZE ({})",
396 0 : MAX_SEND_SIZE
397 0 : );
398 2616 : }
399 2616 :
400 2616 : let mut wal_data_vec: Vec<u8> = vec![0; rec_size];
401 2616 : stream.read_exact(&mut wal_data_vec)?;
402 2616 : let wal_data = Bytes::from(wal_data_vec);
403 2616 : let msg = AppendRequest { h: hdr, wal_data };
404 2616 :
405 2616 : Ok(ProposerAcceptorMessage::AppendRequest(msg))
406 : }
407 0 : _ => bail!("unknown proposer-acceptor message tag: {}", tag),
408 : }
409 25800 : }
410 :
411 : /// The memory size of the message, including byte slices.
412 0 : pub fn size(&self) -> usize {
413 : const BASE_SIZE: usize = std::mem::size_of::<ProposerAcceptorMessage>();
414 :
415 : // For most types, the size is just the base enum size including the nested structs. Some
416 : // types also contain byte slices; add them.
417 : //
418 : // We explicitly list all fields, to draw attention here when new fields are added.
419 0 : let mut size = BASE_SIZE;
420 0 : size += match self {
421 : Self::Greeting(ProposerGreeting {
422 : protocol_version: _,
423 : pg_version: _,
424 : proposer_id: _,
425 : system_id: _,
426 : timeline_id: _,
427 : tenant_id: _,
428 : tli: _,
429 : wal_seg_size: _,
430 0 : }) => 0,
431 :
432 0 : Self::VoteRequest(VoteRequest { term: _ }) => 0,
433 :
434 : Self::Elected(ProposerElected {
435 : term: _,
436 : start_streaming_at: _,
437 : term_history: _,
438 : timeline_start_lsn: _,
439 0 : }) => 0,
440 :
441 : Self::AppendRequest(AppendRequest {
442 : h:
443 : AppendRequestHeader {
444 : term: _,
445 : term_start_lsn: _,
446 : begin_lsn: _,
447 : end_lsn: _,
448 : commit_lsn: _,
449 : truncate_lsn: _,
450 : proposer_uuid: _,
451 : },
452 0 : wal_data,
453 0 : }) => wal_data.len(),
454 :
455 : Self::NoFlushAppendRequest(AppendRequest {
456 : h:
457 : AppendRequestHeader {
458 : term: _,
459 : term_start_lsn: _,
460 : begin_lsn: _,
461 : end_lsn: _,
462 : commit_lsn: _,
463 : truncate_lsn: _,
464 : proposer_uuid: _,
465 : },
466 0 : wal_data,
467 0 : }) => wal_data.len(),
468 :
469 0 : Self::FlushWAL => 0,
470 : };
471 :
472 0 : size
473 0 : }
474 : }
475 :
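// A minimal test-style sketch, added for illustration only: it shows the framing
// that `parse` expects, i.e. an 8-byte little-endian tag character followed by
// the LeSer/bincode-encoded body. The module and test names are hypothetical and
// not part of the original source.
#[cfg(test)]
mod wire_framing_sketch {
    use super::*;

    #[test]
    fn vote_request_framing() {
        let mut buf = BytesMut::new();
        buf.put_u64_le('v' as u64); // message tag, padded to u64
        buf.put_u64_le(42); // VoteRequest body: just the term
        match ProposerAcceptorMessage::parse(buf.freeze()).unwrap() {
            ProposerAcceptorMessage::VoteRequest(req) => assert_eq!(req.term, 42),
            other => panic!("unexpected message: {:?}", other),
        }
    }
}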
476 : /// Acceptor -> Proposer messages
477 : #[derive(Debug)]
478 : pub enum AcceptorProposerMessage {
479 : Greeting(AcceptorGreeting),
480 : VoteResponse(VoteResponse),
481 : AppendResponse(AppendResponse),
482 : }
483 :
484 : impl AcceptorProposerMessage {
485 : /// Serialize acceptor -> proposer message.
486 24736 : pub fn serialize(&self, buf: &mut BytesMut) -> Result<()> {
487 24736 : match self {
488 19721 : AcceptorProposerMessage::Greeting(msg) => {
489 19721 : buf.put_u64_le('g' as u64);
490 19721 : buf.put_u64_le(msg.term);
491 19721 : buf.put_u64_le(msg.node_id.0);
492 19721 : }
493 2839 : AcceptorProposerMessage::VoteResponse(msg) => {
494 2839 : buf.put_u64_le('v' as u64);
495 2839 : buf.put_u64_le(msg.term);
496 2839 : buf.put_u64_le(msg.vote_given);
497 2839 : buf.put_u64_le(msg.flush_lsn.into());
498 2839 : buf.put_u64_le(msg.truncate_lsn.into());
499 2839 : buf.put_u32_le(msg.term_history.0.len() as u32);
500 9170 : for e in &msg.term_history.0 {
501 6331 : buf.put_u64_le(e.term);
502 6331 : buf.put_u64_le(e.lsn.into());
503 6331 : }
504 2839 : buf.put_u64_le(msg.timeline_start_lsn.into());
505 : }
506 2176 : AcceptorProposerMessage::AppendResponse(msg) => {
507 2176 : buf.put_u64_le('a' as u64);
508 2176 : buf.put_u64_le(msg.term);
509 2176 : buf.put_u64_le(msg.flush_lsn.into());
510 2176 : buf.put_u64_le(msg.commit_lsn.into());
511 2176 : buf.put_i64_le(msg.hs_feedback.ts);
512 2176 : buf.put_u64_le(msg.hs_feedback.xmin);
513 2176 : buf.put_u64_le(msg.hs_feedback.catalog_xmin);
514 :
515 : // AsyncReadMessage in walproposer.c will not try to decode pageserver_feedback
516 : // if it is not present.
517 2176 : if let Some(ref msg) = msg.pageserver_feedback {
518 0 : msg.serialize(buf);
519 2176 : }
520 : }
521 : }
522 :
523 24736 : Ok(())
524 24736 : }
525 : }
526 :
527 : /// Safekeeper implements consensus to reliably persist WAL across nodes.
528 : /// It controls all WAL disk writes and updates of control file.
529 : ///
530 : /// Currently safekeeper processes:
531 : /// - messages from compute (proposers) and provides replies
532 : /// - messages from broker peers
533 : pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> {
534 : /// LSN since which the proposer currently talking to this safekeeper appends WAL;
535 : /// it determines the last_log_term switch point.
536 : pub term_start_lsn: Lsn,
537 :
538 : pub state: TimelineState<CTRL>, // persistent state storage
539 : pub wal_store: WAL,
540 :
541 : node_id: NodeId, // safekeeper's node id
542 : }
543 :
544 : impl<CTRL, WAL> SafeKeeper<CTRL, WAL>
545 : where
546 : CTRL: control_file::Storage,
547 : WAL: wal_storage::Storage,
548 : {
549 : /// Accepts a control file storage containing the safekeeper state.
550 : /// State must be initialized, i.e. contain filled `tenant_id`, `timeline_id`
551 : /// and `server` (`wal_seg_size` inside it) fields.
552 8922 : pub fn new(
553 8922 : state: TimelineState<CTRL>,
554 8922 : wal_store: WAL,
555 8922 : node_id: NodeId,
556 8922 : ) -> Result<SafeKeeper<CTRL, WAL>> {
557 8922 : if state.tenant_id == TenantId::from([0u8; 16])
558 8922 : || state.timeline_id == TimelineId::from([0u8; 16])
559 : {
560 0 : bail!(
561 0 : "Calling SafeKeeper::new with empty tenant_id ({}) or timeline_id ({})",
562 0 : state.tenant_id,
563 0 : state.timeline_id
564 0 : );
565 8922 : }
566 8922 :
567 8922 : Ok(SafeKeeper {
568 8922 : term_start_lsn: Lsn(0),
569 8922 : state,
570 8922 : wal_store,
571 8922 : node_id,
572 8922 : })
573 8922 : }
574 :
575 : /// Get history of term switches for the available WAL
576 3467 : fn get_term_history(&self) -> TermHistory {
577 3467 : self.state
578 3467 : .acceptor_state
579 3467 : .term_history
580 3467 : .up_to(self.flush_lsn())
581 3467 : }
582 :
583 43 : pub fn get_last_log_term(&self) -> Term {
584 43 : self.state
585 43 : .acceptor_state
586 43 : .get_last_log_term(self.flush_lsn())
587 43 : }
588 :
589 : /// wal_store wrapper avoiding commit_lsn <= flush_lsn violation when we don't have WAL yet.
590 13380 : pub fn flush_lsn(&self) -> Lsn {
591 13380 : max(self.wal_store.flush_lsn(), self.state.timeline_start_lsn)
592 13380 : }
593 :
594 : /// Process message from proposer and possibly form reply. Concurrent
595 : /// callers must exclude each other.
596 27984 : pub async fn process_msg(
597 27984 : &mut self,
598 27984 : msg: &ProposerAcceptorMessage,
599 27984 : ) -> Result<Option<AcceptorProposerMessage>> {
600 27984 : match msg {
601 19721 : ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg).await,
602 2841 : ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg).await,
603 626 : ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg).await,
604 4 : ProposerAcceptorMessage::AppendRequest(msg) => {
605 4 : self.handle_append_request(msg, true).await
606 : }
607 2616 : ProposerAcceptorMessage::NoFlushAppendRequest(msg) => {
608 2616 : self.handle_append_request(msg, false).await
609 : }
610 2176 : ProposerAcceptorMessage::FlushWAL => self.handle_flush().await,
611 : }
612 27984 : }
613 :
614 : /// Handle initial message from proposer: check its sanity and send my
615 : /// current term.
616 19721 : async fn handle_greeting(
617 19721 : &mut self,
618 19721 : msg: &ProposerGreeting,
619 19721 : ) -> Result<Option<AcceptorProposerMessage>> {
620 19721 : // Check protocol compatibility
621 19721 : if msg.protocol_version != SK_PROTOCOL_VERSION {
622 0 : bail!(
623 0 : "incompatible protocol version {}, expected {}",
624 0 : msg.protocol_version,
625 0 : SK_PROTOCOL_VERSION
626 0 : );
627 19721 : }
628 19721 : /* A Postgres major version mismatch is treated as a fatal error
629 19721 : * because safekeepers parse WAL headers and the format
630 19721 : * may change between versions.
631 19721 : */
632 19721 : if msg.pg_version / 10000 != self.state.server.pg_version / 10000
633 0 : && self.state.server.pg_version != UNKNOWN_SERVER_VERSION
634 : {
635 0 : bail!(
636 0 : "incompatible server version {}, expected {}",
637 0 : msg.pg_version,
638 0 : self.state.server.pg_version
639 0 : );
640 19721 : }
641 19721 :
642 19721 : if msg.tenant_id != self.state.tenant_id {
643 0 : bail!(
644 0 : "invalid tenant ID, got {}, expected {}",
645 0 : msg.tenant_id,
646 0 : self.state.tenant_id
647 0 : );
648 19721 : }
649 19721 : if msg.timeline_id != self.state.timeline_id {
650 0 : bail!(
651 0 : "invalid timeline ID, got {}, expected {}",
652 0 : msg.timeline_id,
653 0 : self.state.timeline_id
654 0 : );
655 19721 : }
656 19721 : if self.state.server.wal_seg_size != msg.wal_seg_size {
657 0 : bail!(
658 0 : "invalid wal_seg_size, got {}, expected {}",
659 0 : msg.wal_seg_size,
660 0 : self.state.server.wal_seg_size
661 0 : );
662 19721 : }
663 19721 :
664 19721 : // system_id will be updated on mismatch
665 19721 : // sync-safekeepers doesn't know sysid and sends 0, ignore it
666 19721 : if self.state.server.system_id != msg.system_id && msg.system_id != 0 {
667 0 : if self.state.server.system_id != 0 {
668 0 : warn!(
669 0 : "unexpected system ID arrived, got {}, expected {}",
670 0 : msg.system_id, self.state.server.system_id
671 : );
672 0 : }
673 :
674 0 : let mut state = self.state.start_change();
675 0 : state.server.system_id = msg.system_id;
676 0 : if msg.pg_version != UNKNOWN_SERVER_VERSION {
677 0 : state.server.pg_version = msg.pg_version;
678 0 : }
679 0 : self.state.finish_change(&state).await?;
680 19721 : }
681 :
682 19721 : info!(
683 0 : "processed greeting from walproposer {}, sending term {:?}",
684 2416 : msg.proposer_id.map(|b| format!("{:X}", b)).join(""),
685 0 : self.state.acceptor_state.term
686 : );
687 19721 : Ok(Some(AcceptorProposerMessage::Greeting(AcceptorGreeting {
688 19721 : term: self.state.acceptor_state.term,
689 19721 : node_id: self.node_id,
690 19721 : })))
691 19721 : }
692 :
693 : /// Give vote for the given term, if we haven't done that previously.
694 2841 : async fn handle_vote_request(
695 2841 : &mut self,
696 2841 : msg: &VoteRequest,
697 2841 : ) -> Result<Option<AcceptorProposerMessage>> {
698 2841 : // Once voted, we won't accept data from older proposers; flush
699 2841 : // everything we've already received so that new proposer starts
700 2841 : // streaming at end of our WAL, without overlap. Currently we truncate
701 2841 : // WAL at streaming point, so this avoids truncating already committed
702 2841 : // WAL.
703 2841 : //
704 2841 : // TODO: it would be smoother to not truncate committed piece at
705 2841 : // handle_elected instead. Currently not a big deal, as proposer is the
706 2841 : // only source of WAL; with peer2peer recovery it would be more
707 2841 : // important.
708 2841 : self.wal_store.flush_wal().await?;
709 : // initialize with refusal
710 2841 : let mut resp = VoteResponse {
711 2841 : term: self.state.acceptor_state.term,
712 2841 : vote_given: false as u64,
713 2841 : flush_lsn: self.flush_lsn(),
714 2841 : truncate_lsn: self.state.inmem.peer_horizon_lsn,
715 2841 : term_history: self.get_term_history(),
716 2841 : timeline_start_lsn: self.state.timeline_start_lsn,
717 2841 : };
718 2841 : if self.state.acceptor_state.term < msg.term {
719 2696 : let mut state = self.state.start_change();
720 2696 : state.acceptor_state.term = msg.term;
721 2696 : // persist vote before sending it out
722 2696 : self.state.finish_change(&state).await?;
723 :
724 2696 : resp.term = self.state.acceptor_state.term;
725 2696 : resp.vote_given = true as u64;
726 145 : }
727 2841 : info!("processed VoteRequest for term {}: {:?}", msg.term, &resp);
728 2841 : Ok(Some(AcceptorProposerMessage::VoteResponse(resp)))
729 2841 : }
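// For illustration only (not in the original source): with a persisted term of 5,
// a VoteRequest { term: 6 } is granted and term 6 is persisted before the
// response is sent out, while a second VoteRequest { term: 6 } (e.g. after a
// restart) is refused because the stored term is no longer smaller; see
// test_voting below.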
730 :
731 : /// Form AppendResponse from current state.
732 2179 : fn append_response(&self) -> AppendResponse {
733 2179 : let ar = AppendResponse {
734 2179 : term: self.state.acceptor_state.term,
735 2179 : flush_lsn: self.flush_lsn(),
736 2179 : commit_lsn: self.state.commit_lsn,
737 2179 : // will be filled by the upper code to avoid bothering safekeeper
738 2179 : hs_feedback: HotStandbyFeedback::empty(),
739 2179 : pageserver_feedback: None,
740 2179 : };
741 2179 : trace!("formed AppendResponse {:?}", ar);
742 2179 : ar
743 2179 : }
744 :
745 626 : async fn handle_elected(
746 626 : &mut self,
747 626 : msg: &ProposerElected,
748 626 : ) -> Result<Option<AcceptorProposerMessage>> {
749 626 : let _timer = MISC_OPERATION_SECONDS
750 626 : .with_label_values(&["handle_elected"])
751 626 : .start_timer();
752 626 :
753 626 : info!(
754 0 : "received ProposerElected {:?}, term={}, last_log_term={}, flush_lsn={}",
755 0 : msg,
756 0 : self.state.acceptor_state.term,
757 0 : self.get_last_log_term(),
758 0 : self.flush_lsn()
759 : );
760 626 : if self.state.acceptor_state.term < msg.term {
761 2 : let mut state = self.state.start_change();
762 2 : state.acceptor_state.term = msg.term;
763 2 : self.state.finish_change(&state).await?;
764 624 : }
765 :
766 : // If our term is higher, ignore the message (next feedback will inform the compute)
767 626 : if self.state.acceptor_state.term > msg.term {
768 0 : return Ok(None);
769 626 : }
770 626 :
771 626 : // Before truncating WAL, cross-check the divergence point received
772 626 : // from the walproposer.
773 626 : let sk_th = self.get_term_history();
774 626 : let last_common_point = match TermHistory::find_highest_common_point(
775 626 : &msg.term_history,
776 626 : &sk_th,
777 626 : self.flush_lsn(),
778 626 : ) {
779 : // No common point. Expect streaming from the beginning of the
780 : // history like walproposer while we don't have proper init.
781 151 : None => *msg.term_history.0.first().ok_or(anyhow::anyhow!(
782 151 : "empty walproposer term history {:?}",
783 151 : msg.term_history
784 151 : ))?,
785 475 : Some(lcp) => lcp,
786 : };
787 : // This is expected to happen in a rare race when another connection
788 : // from the same walproposer writes + flushes WAL after this connection
789 : // sent flush_lsn in VoteRequest; for instance, very late
790 : // ProposerElected message delivery after another connection was
791 : // established and wrote WAL. In such cases the error is transient;
792 : // reconnection makes the safekeeper send its newest term history and flush_lsn,
793 : // and the walproposer recalculates the streaming point. OTOH, a repeating
794 : // error indicates a serious bug.
795 626 : if last_common_point.lsn != msg.start_streaming_at {
796 0 : bail!("refusing ProposerElected with unexpected truncation point: lcp={:?} start_streaming_at={}, term={}, sk_th={:?} flush_lsn={}, wp_th={:?}",
797 0 : last_common_point, msg.start_streaming_at,
798 0 : self.state.acceptor_state.term, sk_th, self.flush_lsn(), msg.term_history,
799 0 : );
800 626 : }
801 626 :
802 626 : // We are also expected to never attempt to truncate committed data.
803 626 : assert!(
804 626 : msg.start_streaming_at >= self.state.inmem.commit_lsn,
805 0 : "attempt to truncate committed data: start_streaming_at={}, commit_lsn={}, term={}, sk_th={:?} flush_lsn={}, wp_th={:?}",
806 0 : msg.start_streaming_at, self.state.inmem.commit_lsn,
807 0 : self.state.acceptor_state.term, sk_th, self.flush_lsn(), msg.term_history,
808 : );
809 :
810 : // Before the first WAL write, initialize its segment. This makes the first
811 : // segment pg_waldump'able, because the stream from compute doesn't include
812 : // its segment and page headers.
813 : //
814 : // If we fail before the first WAL write is flushed, this action will be
815 : // repeated; that's ok because it is idempotent.
816 626 : if self.wal_store.flush_lsn() == Lsn::INVALID {
817 150 : self.wal_store
818 150 : .initialize_first_segment(msg.start_streaming_at)
819 150 : .await?;
820 476 : }
821 :
822 : // truncate wal, update the LSNs
823 626 : self.wal_store.truncate_wal(msg.start_streaming_at).await?;
824 :
825 : // and now adopt term history from proposer
826 : {
827 626 : let mut state = self.state.start_change();
828 626 :
829 626 : // Here we learn initial LSN for the first time, set fields
830 626 : // interested in that.
831 626 :
832 626 : if state.timeline_start_lsn == Lsn(0) {
833 : // Remember point where WAL begins globally.
834 150 : state.timeline_start_lsn = msg.timeline_start_lsn;
835 150 : info!(
836 0 : "setting timeline_start_lsn to {:?}",
837 : state.timeline_start_lsn
838 : );
839 476 : }
840 626 : if state.peer_horizon_lsn == Lsn(0) {
841 150 : // Update peer_horizon_lsn as soon as we know where timeline starts.
842 150 : // It means that peer_horizon_lsn cannot be zero after we know timeline_start_lsn.
843 150 : state.peer_horizon_lsn = msg.timeline_start_lsn;
844 476 : }
845 626 : if state.local_start_lsn == Lsn(0) {
846 150 : state.local_start_lsn = msg.start_streaming_at;
847 150 : info!("setting local_start_lsn to {:?}", state.local_start_lsn);
848 476 : }
849 : // Initializing commit_lsn before acking first flushed record is
850 : // important to let find_end_of_wal skip the hole in the beginning
851 : // of the first segment.
852 : //
853 : // NB: on new clusters, this happens at the same time as
854 : // timeline_start_lsn initialization, it is taken outside to provide
855 : // upgrade.
856 626 : state.commit_lsn = max(state.commit_lsn, state.timeline_start_lsn);
857 626 :
858 626 : // Initializing backup_lsn is useful to avoid making backup think it should upload 0 segment.
859 626 : state.backup_lsn = max(state.backup_lsn, state.timeline_start_lsn);
860 626 : // similar for remote_consistent_lsn
861 626 : state.remote_consistent_lsn =
862 626 : max(state.remote_consistent_lsn, state.timeline_start_lsn);
863 626 :
864 626 : state.acceptor_state.term_history = msg.term_history.clone();
865 626 : self.state.finish_change(&state).await?;
866 : }
867 :
868 626 : info!("start receiving WAL since {:?}", msg.start_streaming_at);
869 :
870 : // Cache LSN where term starts to immediately fsync control file with
871 : // commit_lsn once we reach it -- sync-safekeepers finishes when
872 : // persisted commit_lsn on majority of safekeepers aligns.
873 626 : self.term_start_lsn = match msg.term_history.0.last() {
874 0 : None => bail!("proposer elected with empty term history"),
875 626 : Some(term_lsn_start) => term_lsn_start.lsn,
876 626 : };
877 626 :
878 626 : Ok(None)
879 626 : }
880 :
881 : /// Advance commit_lsn taking into account what we have locally.
882 : ///
883 : /// Note: it is assumed that 'WAL we have is from the right term' check has
884 : /// already been done outside.
885 1567 : async fn update_commit_lsn(&mut self, mut candidate: Lsn) -> Result<()> {
886 1567 : // Both peers and walproposer communicate this value, we might already
887 1567 : // have a fresher (higher) version.
888 1567 : candidate = max(candidate, self.state.inmem.commit_lsn);
889 1567 : let commit_lsn = min(candidate, self.flush_lsn());
890 1567 : assert!(
891 1567 : commit_lsn >= self.state.inmem.commit_lsn,
892 0 : "commit_lsn monotonicity violated: old={} new={}",
893 : self.state.inmem.commit_lsn,
894 : commit_lsn
895 : );
896 :
897 1567 : self.state.inmem.commit_lsn = commit_lsn;
898 1567 :
899 1567 : // If new commit_lsn reached term switch, force sync of control
900 1567 : // file: walproposer in sync mode is very interested when this
901 1567 : // happens. Note: this is for sync-safekeepers mode only, as
902 1567 : // otherwise commit_lsn might jump over term_start_lsn.
903 1567 : if commit_lsn >= self.term_start_lsn && self.state.commit_lsn < self.term_start_lsn {
904 83 : self.state.flush().await?;
905 1484 : }
906 :
907 1567 : Ok(())
908 1567 : }
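// For illustration only (not in the original source): with inmem.commit_lsn =
// 0x10, candidate = 0x30 and flush_lsn() = 0x20, the new commit_lsn is
// min(max(0x30, 0x10), 0x20) = 0x20, i.e. a quorum-advertised commit position
// is never advanced past what this safekeeper has actually flushed.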
909 :
910 : /// Handle request to append WAL.
911 : #[allow(clippy::comparison_chain)]
912 2620 : async fn handle_append_request(
913 2620 : &mut self,
914 2620 : msg: &AppendRequest,
915 2620 : require_flush: bool,
916 2620 : ) -> Result<Option<AcceptorProposerMessage>> {
917 2620 : if self.state.acceptor_state.term < msg.h.term {
918 0 : bail!("got AppendRequest before ProposerElected");
919 2620 : }
920 2620 :
921 2620 : // If our term is higher, immediately refuse the message.
922 2620 : if self.state.acceptor_state.term > msg.h.term {
923 0 : let resp = AppendResponse::term_only(self.state.acceptor_state.term);
924 0 : return Ok(Some(AcceptorProposerMessage::AppendResponse(resp)));
925 2620 : }
926 2620 :
927 2620 : // Disallow any non-sequential writes, which can result in gaps or
928 2620 : // overwrites. If we need to move the pointer, ProposerElected message
929 2620 : // should have truncated WAL first accordingly. Note that the first
930 2620 : // condition (WAL rewrite) is quite expected in the real world; it happens
931 2620 : // when the walproposer reconnects to the safekeeper and writes some more data
932 2620 : // while the first connection still delivers some packets later. It might be
933 2620 : // better not to log this case as an error.
934 2620 : let write_lsn = self.wal_store.write_lsn();
935 2620 : let flush_lsn = self.wal_store.flush_lsn();
936 2620 : if write_lsn > msg.h.begin_lsn {
937 1 : bail!(
938 1 : "append request rewrites WAL written before, write_lsn={}, msg lsn={}",
939 1 : write_lsn,
940 1 : msg.h.begin_lsn
941 1 : );
942 2619 : }
943 2619 : if write_lsn < msg.h.begin_lsn && write_lsn != Lsn(0) {
944 0 : bail!(
945 0 : "append request creates gap in written WAL, write_lsn={}, msg lsn={}",
946 0 : write_lsn,
947 0 : msg.h.begin_lsn,
948 0 : );
949 2619 : }
950 2619 :
951 2619 : // Now we know that we are in the same term as the proposer,
952 2619 : // processing the message.
953 2619 :
954 2619 : self.state.inmem.proposer_uuid = msg.h.proposer_uuid;
955 2619 :
956 2619 : // do the job
957 2619 : if !msg.wal_data.is_empty() {
958 734 : self.wal_store
959 734 : .write_wal(msg.h.begin_lsn, &msg.wal_data)
960 734 : .await?;
961 1885 : }
962 :
963 : // flush wal to the disk, if required
964 2619 : if require_flush {
965 3 : self.wal_store.flush_wal().await?;
966 2616 : }
967 :
968 : // Update commit_lsn. It will be flushed to the control file regularly by the timeline
969 : // manager, off of the WAL ingest hot path.
970 2619 : if msg.h.commit_lsn != Lsn(0) {
971 1567 : self.update_commit_lsn(msg.h.commit_lsn).await?;
972 1052 : }
973 : // Value calculated by walproposer can always lag:
974 : // - safekeepers can forget inmem value and send to proposer lower
975 : // persisted one on restart;
976 : // - if we make safekeepers always send persistent value,
977 : // any compute restart would pull it down.
978 : // Thus, take max before adopting.
979 2619 : self.state.inmem.peer_horizon_lsn =
980 2619 : max(self.state.inmem.peer_horizon_lsn, msg.h.truncate_lsn);
981 2619 :
982 2619 : trace!(
983 0 : "processed AppendRequest of len {}, begin_lsn={}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, flushed={:?}",
984 0 : msg.wal_data.len(),
985 : msg.h.begin_lsn,
986 : msg.h.end_lsn,
987 : msg.h.commit_lsn,
988 : msg.h.truncate_lsn,
989 : require_flush,
990 : );
991 :
992 : // If flush_lsn hasn't updated, AppendResponse is not very useful.
993 : // This is the common case for !require_flush, but a flush can still
994 : // happen on segment bounds.
995 2619 : if !require_flush && flush_lsn == self.flush_lsn() {
996 2616 : return Ok(None);
997 3 : }
998 3 :
999 3 : let resp = self.append_response();
1000 3 : Ok(Some(AcceptorProposerMessage::AppendResponse(resp)))
1001 2620 : }
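// For illustration only (not in the original source): if write_lsn is currently
// 0x2000, an AppendRequest with begin_lsn = 0x1500 is rejected as a rewrite and
// one with begin_lsn = 0x3000 is rejected as a gap; only begin_lsn = 0x2000 is
// accepted, except that any begin_lsn is allowed while write_lsn is still Lsn(0).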
1002 :
1003 : /// Flush WAL to disk. Return AppendResponse with latest LSNs.
1004 2176 : async fn handle_flush(&mut self) -> Result<Option<AcceptorProposerMessage>> {
1005 2176 : self.wal_store.flush_wal().await?;
1006 2176 : Ok(Some(AcceptorProposerMessage::AppendResponse(
1007 2176 : self.append_response(),
1008 2176 : )))
1009 2176 : }
1010 :
1011 : /// Update commit_lsn from peer safekeeper data.
1012 0 : pub async fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> {
1013 0 : if (Lsn(sk_info.commit_lsn) != Lsn::INVALID) && (sk_info.last_log_term != INVALID_TERM) {
1014 : // Note: the check is too restrictive, generally we can update local
1015 : // commit_lsn if our history matches (is part of) history of advanced
1016 : // commit_lsn provider.
1017 0 : if sk_info.last_log_term == self.get_last_log_term() {
1018 0 : self.update_commit_lsn(Lsn(sk_info.commit_lsn)).await?;
1019 0 : }
1020 0 : }
1021 0 : Ok(())
1022 0 : }
1023 : }
1024 :
1025 : #[cfg(test)]
1026 : mod tests {
1027 : use futures::future::BoxFuture;
1028 : use postgres_ffi::{XLogSegNo, WAL_SEGMENT_SIZE};
1029 : use safekeeper_api::ServerInfo;
1030 :
1031 : use super::*;
1032 : use crate::state::{EvictionState, PersistedPeers, TimelinePersistentState};
1033 : use std::{ops::Deref, str::FromStr, time::Instant};
1034 :
1035 : // fake storage for tests
1036 : struct InMemoryState {
1037 : persisted_state: TimelinePersistentState,
1038 : }
1039 :
1040 : impl control_file::Storage for InMemoryState {
1041 5 : async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> {
1042 5 : self.persisted_state = s.clone();
1043 5 : Ok(())
1044 5 : }
1045 :
1046 0 : fn last_persist_at(&self) -> Instant {
1047 0 : Instant::now()
1048 0 : }
1049 : }
1050 :
1051 : impl Deref for InMemoryState {
1052 : type Target = TimelinePersistentState;
1053 :
1054 83 : fn deref(&self) -> &Self::Target {
1055 83 : &self.persisted_state
1056 83 : }
1057 : }
1058 :
1059 3 : fn test_sk_state() -> TimelinePersistentState {
1060 3 : let mut state = TimelinePersistentState::empty();
1061 3 : state.server.wal_seg_size = WAL_SEGMENT_SIZE as u32;
1062 3 : state.tenant_id = TenantId::from([1u8; 16]);
1063 3 : state.timeline_id = TimelineId::from([1u8; 16]);
1064 3 : state
1065 3 : }
1066 :
1067 : struct DummyWalStore {
1068 : lsn: Lsn,
1069 : }
1070 :
1071 : impl wal_storage::Storage for DummyWalStore {
1072 4 : fn write_lsn(&self) -> Lsn {
1073 4 : self.lsn
1074 4 : }
1075 :
1076 19 : fn flush_lsn(&self) -> Lsn {
1077 19 : self.lsn
1078 19 : }
1079 :
1080 2 : async fn initialize_first_segment(&mut self, _init_lsn: Lsn) -> Result<()> {
1081 2 : Ok(())
1082 2 : }
1083 :
1084 3 : async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
1085 3 : self.lsn = startpos + buf.len() as u64;
1086 3 : Ok(())
1087 3 : }
1088 :
1089 2 : async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
1090 2 : self.lsn = end_pos;
1091 2 : Ok(())
1092 2 : }
1093 :
1094 5 : async fn flush_wal(&mut self) -> Result<()> {
1095 5 : Ok(())
1096 5 : }
1097 :
1098 0 : fn remove_up_to(&self, _segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> {
1099 0 : Box::pin(async { Ok(()) })
1100 0 : }
1101 :
1102 0 : fn get_metrics(&self) -> crate::metrics::WalStorageMetrics {
1103 0 : crate::metrics::WalStorageMetrics::default()
1104 0 : }
1105 : }
1106 :
1107 : #[tokio::test]
1108 1 : async fn test_voting() {
1109 1 : let storage = InMemoryState {
1110 1 : persisted_state: test_sk_state(),
1111 1 : };
1112 1 : let wal_store = DummyWalStore { lsn: Lsn(0) };
1113 1 : let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
1114 1 :
1115 1 : // check voting for 1 is ok
1116 1 : let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 });
1117 1 : let mut vote_resp = sk.process_msg(&vote_request).await;
1118 1 : match vote_resp.unwrap() {
1119 1 : Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given != 0),
1120 1 : r => panic!("unexpected response: {:?}", r),
1121 1 : }
1122 1 :
1123 1 : // reboot...
1124 1 : let state = sk.state.deref().clone();
1125 1 : let storage = InMemoryState {
1126 1 : persisted_state: state,
1127 1 : };
1128 1 :
1129 1 : sk = SafeKeeper::new(TimelineState::new(storage), sk.wal_store, NodeId(0)).unwrap();
1130 1 :
1131 1 : // and ensure voting second time for 1 is not ok
1132 1 : vote_resp = sk.process_msg(&vote_request).await;
1133 1 : match vote_resp.unwrap() {
1134 1 : Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given == 0),
1135 1 : r => panic!("unexpected response: {:?}", r),
1136 1 : }
1137 1 : }
1138 :
1139 : #[tokio::test]
1140 1 : async fn test_last_log_term_switch() {
1141 1 : let storage = InMemoryState {
1142 1 : persisted_state: test_sk_state(),
1143 1 : };
1144 1 : let wal_store = DummyWalStore { lsn: Lsn(0) };
1145 1 :
1146 1 : let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
1147 1 :
1148 1 : let mut ar_hdr = AppendRequestHeader {
1149 1 : term: 2,
1150 1 : term_start_lsn: Lsn(3),
1151 1 : begin_lsn: Lsn(1),
1152 1 : end_lsn: Lsn(2),
1153 1 : commit_lsn: Lsn(0),
1154 1 : truncate_lsn: Lsn(0),
1155 1 : proposer_uuid: [0; 16],
1156 1 : };
1157 1 : let mut append_request = AppendRequest {
1158 1 : h: ar_hdr.clone(),
1159 1 : wal_data: Bytes::from_static(b"b"),
1160 1 : };
1161 1 :
1162 1 : let pem = ProposerElected {
1163 1 : term: 2,
1164 1 : start_streaming_at: Lsn(1),
1165 1 : term_history: TermHistory(vec![
1166 1 : TermLsn {
1167 1 : term: 1,
1168 1 : lsn: Lsn(1),
1169 1 : },
1170 1 : TermLsn {
1171 1 : term: 2,
1172 1 : lsn: Lsn(3),
1173 1 : },
1174 1 : ]),
1175 1 : timeline_start_lsn: Lsn(1),
1176 1 : };
1177 1 : sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
1178 1 : .await
1179 1 : .unwrap();
1180 1 :
1181 1 : // check that AppendRequest before term_start_lsn doesn't switch last_log_term.
1182 1 : sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
1183 1 : .await
1184 1 : .unwrap();
1185 1 : assert_eq!(sk.get_last_log_term(), 1);
1186 1 :
1187 1 : // but record at term_start_lsn does the switch
1188 1 : ar_hdr.begin_lsn = Lsn(2);
1189 1 : ar_hdr.end_lsn = Lsn(3);
1190 1 : append_request = AppendRequest {
1191 1 : h: ar_hdr,
1192 1 : wal_data: Bytes::from_static(b"b"),
1193 1 : };
1194 1 : sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
1195 1 : .await
1196 1 : .unwrap();
1197 1 : assert_eq!(sk.get_last_log_term(), 2);
1198 1 : }
1199 :
1200 : #[tokio::test]
1201 1 : async fn test_non_consecutive_write() {
1202 1 : let storage = InMemoryState {
1203 1 : persisted_state: test_sk_state(),
1204 1 : };
1205 1 : let wal_store = DummyWalStore { lsn: Lsn(0) };
1206 1 :
1207 1 : let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
1208 1 :
1209 1 : let pem = ProposerElected {
1210 1 : term: 1,
1211 1 : start_streaming_at: Lsn(1),
1212 1 : term_history: TermHistory(vec![TermLsn {
1213 1 : term: 1,
1214 1 : lsn: Lsn(1),
1215 1 : }]),
1216 1 : timeline_start_lsn: Lsn(1),
1217 1 : };
1218 1 : sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
1219 1 : .await
1220 1 : .unwrap();
1221 1 :
1222 1 : let ar_hdr = AppendRequestHeader {
1223 1 : term: 1,
1224 1 : term_start_lsn: Lsn(3),
1225 1 : begin_lsn: Lsn(1),
1226 1 : end_lsn: Lsn(2),
1227 1 : commit_lsn: Lsn(0),
1228 1 : truncate_lsn: Lsn(0),
1229 1 : proposer_uuid: [0; 16],
1230 1 : };
1231 1 : let append_request = AppendRequest {
1232 1 : h: ar_hdr.clone(),
1233 1 : wal_data: Bytes::from_static(b"b"),
1234 1 : };
1235 1 :
1236 1 : // do write ending at 2, it should be ok
1237 1 : sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
1238 1 : .await
1239 1 : .unwrap();
1240 1 : let mut ar_hrd2 = ar_hdr.clone();
1241 1 : ar_hrd2.begin_lsn = Lsn(4);
1242 1 : ar_hrd2.end_lsn = Lsn(5);
1243 1 : let append_request = AppendRequest {
1244 1 : h: ar_hrd2,
1245 1 : wal_data: Bytes::from_static(b"b"),
1246 1 : };
1247 1 : // and now starting at 4, it must fail
1248 1 : sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
1249 1 : .await
1250 1 : .unwrap_err();
1251 1 : }
1252 :
1253 : #[test]
1254 1 : fn test_find_highest_common_point_none() {
1255 1 : let prop_th = TermHistory(vec![(0, Lsn(1)).into()]);
1256 1 : let sk_th = TermHistory(vec![(1, Lsn(1)).into(), (2, Lsn(2)).into()]);
1257 1 : assert_eq!(
1258 1 : TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(3),),
1259 1 : None
1260 1 : );
1261 1 : }
1262 :
1263 : #[test]
1264 1 : fn test_find_highest_common_point_middle() {
1265 1 : let prop_th = TermHistory(vec![
1266 1 : (1, Lsn(10)).into(),
1267 1 : (2, Lsn(20)).into(),
1268 1 : (4, Lsn(40)).into(),
1269 1 : ]);
1270 1 : let sk_th = TermHistory(vec![
1271 1 : (1, Lsn(10)).into(),
1272 1 : (2, Lsn(20)).into(),
1273 1 : (3, Lsn(30)).into(), // sk ends last common term 2 at 30
1274 1 : ]);
1275 1 : assert_eq!(
1276 1 : TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(40),),
1277 1 : Some(TermLsn {
1278 1 : term: 2,
1279 1 : lsn: Lsn(30),
1280 1 : })
1281 1 : );
1282 1 : }
1283 :
1284 : #[test]
1285 1 : fn test_find_highest_common_point_sk_end() {
1286 1 : let prop_th = TermHistory(vec![
1287 1 : (1, Lsn(10)).into(),
1288 1 : (2, Lsn(20)).into(), // last common term 2, sk will end it at 32 sk_end_lsn
1289 1 : (4, Lsn(40)).into(),
1290 1 : ]);
1291 1 : let sk_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
1292 1 : assert_eq!(
1293 1 : TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(32),),
1294 1 : Some(TermLsn {
1295 1 : term: 2,
1296 1 : lsn: Lsn(32),
1297 1 : })
1298 1 : );
1299 1 : }
1300 :
1301 : #[test]
1302 1 : fn test_find_highest_common_point_walprop() {
1303 1 : let prop_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
1304 1 : let sk_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
1305 1 : assert_eq!(
1306 1 : TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(32),),
1307 1 : Some(TermLsn {
1308 1 : term: 2,
1309 1 : lsn: Lsn(32),
1310 1 : })
1311 1 : );
1312 1 : }
1313 :
1314 : #[test]
1315 1 : fn test_sk_state_bincode_serde_roundtrip() {
1316 : use utils::Hex;
1317 1 : let tenant_id = TenantId::from_str("cf0480929707ee75372337efaa5ecf96").unwrap();
1318 1 : let timeline_id = TimelineId::from_str("112ded66422aa5e953e5440fa5427ac4").unwrap();
1319 1 : let state = TimelinePersistentState {
1320 1 : tenant_id,
1321 1 : timeline_id,
1322 1 : acceptor_state: AcceptorState {
1323 1 : term: 42,
1324 1 : term_history: TermHistory(vec![TermLsn {
1325 1 : lsn: Lsn(0x1),
1326 1 : term: 41,
1327 1 : }]),
1328 1 : },
1329 1 : server: ServerInfo {
1330 1 : pg_version: 14,
1331 1 : system_id: 0x1234567887654321,
1332 1 : wal_seg_size: 0x12345678,
1333 1 : },
1334 1 : proposer_uuid: {
1335 1 : let mut arr = timeline_id.as_arr();
1336 1 : arr.reverse();
1337 1 : arr
1338 1 : },
1339 1 : timeline_start_lsn: Lsn(0x12345600),
1340 1 : local_start_lsn: Lsn(0x12),
1341 1 : commit_lsn: Lsn(1234567800),
1342 1 : backup_lsn: Lsn(1234567300),
1343 1 : peer_horizon_lsn: Lsn(9999999),
1344 1 : remote_consistent_lsn: Lsn(1234560000),
1345 1 : peers: PersistedPeers(vec![(
1346 1 : NodeId(1),
1347 1 : PersistedPeerInfo {
1348 1 : backup_lsn: Lsn(1234567000),
1349 1 : term: 42,
1350 1 : flush_lsn: Lsn(1234567800 - 8),
1351 1 : commit_lsn: Lsn(1234567600),
1352 1 : },
1353 1 : )]),
1354 1 : partial_backup: crate::wal_backup_partial::State::default(),
1355 1 : eviction_state: EvictionState::Present,
1356 1 : };
1357 1 :
1358 1 : let ser = state.ser().unwrap();
1359 1 :
1360 1 : #[rustfmt::skip]
1361 1 : let expected = [
1362 1 : // tenant_id as length prefixed hex
1363 1 : 0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1364 1 : 0x63, 0x66, 0x30, 0x34, 0x38, 0x30, 0x39, 0x32, 0x39, 0x37, 0x30, 0x37, 0x65, 0x65, 0x37, 0x35, 0x33, 0x37, 0x32, 0x33, 0x33, 0x37, 0x65, 0x66, 0x61, 0x61, 0x35, 0x65, 0x63, 0x66, 0x39, 0x36,
1365 1 : // timeline_id as length prefixed hex
1366 1 : 0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1367 1 : 0x31, 0x31, 0x32, 0x64, 0x65, 0x64, 0x36, 0x36, 0x34, 0x32, 0x32, 0x61, 0x61, 0x35, 0x65, 0x39, 0x35, 0x33, 0x65, 0x35, 0x34, 0x34, 0x30, 0x66, 0x61, 0x35, 0x34, 0x32, 0x37, 0x61, 0x63, 0x34,
1368 1 : // term
1369 1 : 0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1370 1 : // length prefix
1371 1 : 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1372 1 : // unsure why this order is swapped
1373 1 : 0x29, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1374 1 : 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1375 1 : // pg_version
1376 1 : 0x0e, 0x00, 0x00, 0x00,
1377 1 : // systemid
1378 1 : 0x21, 0x43, 0x65, 0x87, 0x78, 0x56, 0x34, 0x12,
1379 1 : // wal_seg_size
1380 1 : 0x78, 0x56, 0x34, 0x12,
1381 1 : // pguuid as length prefixed hex
1382 1 : 0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1383 1 : 0x63, 0x34, 0x37, 0x61, 0x34, 0x32, 0x61, 0x35, 0x30, 0x66, 0x34, 0x34, 0x65, 0x35, 0x35, 0x33, 0x65, 0x39, 0x61, 0x35, 0x32, 0x61, 0x34, 0x32, 0x36, 0x36, 0x65, 0x64, 0x32, 0x64, 0x31, 0x31,
1384 1 :
1385 1 : // timeline_start_lsn
1386 1 : 0x00, 0x56, 0x34, 0x12, 0x00, 0x00, 0x00, 0x00,
1387 1 : 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1388 1 : 0x78, 0x02, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
1389 1 : 0x84, 0x00, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
1390 1 : 0x7f, 0x96, 0x98, 0x00, 0x00, 0x00, 0x00, 0x00,
1391 1 : 0x00, 0xe4, 0x95, 0x49, 0x00, 0x00, 0x00, 0x00,
1392 1 : // length prefix for persistentpeers
1393 1 : 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1394 1 : // nodeid
1395 1 : 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1396 1 : // backuplsn
1397 1 : 0x58, 0xff, 0x95, 0x49, 0x00, 0x00, 0x00, 0x00,
1398 1 : 0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1399 1 : 0x70, 0x02, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
1400 1 : 0xb0, 0x01, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
1401 1 : // partial_backup
1402 1 : 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1403 1 : // eviction_state
1404 1 : 0x00, 0x00, 0x00, 0x00,
1405 1 : ];
1406 1 :
1407 1 : assert_eq!(Hex(&ser), Hex(&expected));
1408 :
1409 1 : let deser = TimelinePersistentState::des(&ser).unwrap();
1410 1 :
1411 1 : assert_eq!(deser, state);
1412 1 : }
1413 : }