Line data Source code
1 : //! Acceptor part of proposer-acceptor consensus algorithm.
2 :
3 : use anyhow::{bail, Context, Result};
4 : use byteorder::{LittleEndian, ReadBytesExt};
5 : use bytes::{Buf, BufMut, Bytes, BytesMut};
6 :
7 : use postgres_ffi::{TimeLineID, MAX_SEND_SIZE};
8 : use safekeeper_api::models::HotStandbyFeedback;
9 : use safekeeper_api::Term;
10 : use serde::{Deserialize, Serialize};
11 : use std::cmp::max;
12 : use std::cmp::min;
13 : use std::fmt;
14 : use std::io::Read;
15 : use storage_broker::proto::SafekeeperTimelineInfo;
16 :
17 : use tracing::*;
18 :
19 : use crate::control_file;
20 : use crate::metrics::MISC_OPERATION_SECONDS;
21 :
22 : use crate::state::TimelineState;
23 : use crate::wal_storage;
24 : use pq_proto::SystemId;
25 : use utils::pageserver_feedback::PageserverFeedback;
26 : use utils::{
27 : bin_ser::LeSer,
28 : id::{NodeId, TenantId, TimelineId},
29 : lsn::Lsn,
30 : };
31 :
32 : pub const SK_PROTOCOL_VERSION: u32 = 2;
33 : pub const UNKNOWN_SERVER_VERSION: u32 = 0;
34 :
/// A (term, LSN) pair; `TermHistory` stores these as its entries.
///
/// The derived ordering compares `term` first and `lsn` second (field
/// declaration order).
35 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
36 : pub struct TermLsn {
// Consensus term.
37 : pub term: Term,
// WAL position associated with `term`.
38 : pub lsn: Lsn,
39 : }
40 :
41 : // Creation from tuple provides less typing (e.g. for unit tests).
42 : impl From<(Term, Lsn)> for TermLsn {
43 1230 : fn from(pair: (Term, Lsn)) -> TermLsn {
44 1230 : TermLsn {
45 1230 : term: pair.0,
46 1230 : lsn: pair.1,
47 1230 : }
48 1230 : }
49 : }
50 :
/// Sequence of term switches, each recorded as the term together with the LSN
/// at which it starts. `Debug` is implemented by hand further down so that
/// only the latest entries are printed.
51 0 : #[derive(Clone, Serialize, Deserialize, PartialEq)]
52 : pub struct TermHistory(pub Vec<TermLsn>);
53 :
54 : impl TermHistory {
55 1440 : pub fn empty() -> TermHistory {
56 1440 : TermHistory(Vec::new())
57 1440 : }
58 :
59 : // Parse TermHistory as n_entries followed by TermLsn pairs
60 832 : pub fn from_bytes(bytes: &mut Bytes) -> Result<TermHistory> {
61 832 : if bytes.remaining() < 4 {
62 0 : bail!("TermHistory misses len");
63 832 : }
64 832 : let n_entries = bytes.get_u32_le();
65 832 : let mut res = Vec::with_capacity(n_entries as usize);
66 832 : for _ in 0..n_entries {
67 5704 : if bytes.remaining() < 16 {
68 0 : bail!("TermHistory is incomplete");
69 5704 : }
70 5704 : res.push(TermLsn {
71 5704 : term: bytes.get_u64_le(),
72 5704 : lsn: bytes.get_u64_le().into(),
73 5704 : })
74 : }
75 832 : Ok(TermHistory(res))
76 832 : }
77 :
78 : /// Return copy of self with switches happening strictly after up_to
79 : /// truncated.
80 5420 : pub fn up_to(&self, up_to: Lsn) -> TermHistory {
81 5420 : let mut res = Vec::with_capacity(self.0.len());
82 21727 : for e in &self.0 {
83 16315 : if e.lsn > up_to {
84 8 : break;
85 16307 : }
86 16307 : res.push(*e);
87 : }
88 5420 : TermHistory(res)
89 5420 : }
90 :
91 : /// Find point of divergence between leader (walproposer) term history and
92 : /// safekeeper. Arguments are not symmetric as proposer history ends at
93 : /// +infinity while safekeeper at flush_lsn.
94 : /// C version is at walproposer SendProposerElected.
95 841 : pub fn find_highest_common_point(
96 841 : prop_th: &TermHistory,
97 841 : sk_th: &TermHistory,
98 841 : sk_wal_end: Lsn,
99 841 : ) -> Option<TermLsn> {
100 841 : let (prop_th, sk_th) = (&prop_th.0, &sk_th.0); // avoid .0 below
101 :
102 841 : if let Some(sk_th_last) = sk_th.last() {
103 671 : assert!(
104 671 : sk_th_last.lsn <= sk_wal_end,
105 0 : "safekeeper term history end {:?} LSN is higher than WAL end {:?}",
106 : sk_th_last,
107 : sk_wal_end
108 : );
109 170 : }
110 :
111 : // find last common term, if any...
112 841 : let mut last_common_idx = None;
113 4814 : for i in 0..min(sk_th.len(), prop_th.len()) {
114 4814 : if prop_th[i].term != sk_th[i].term {
115 10 : break;
116 4804 : }
117 4804 : // If term is the same, LSN must be equal as well.
118 4804 : assert!(
119 4804 : prop_th[i].lsn == sk_th[i].lsn,
120 0 : "same term {} has different start LSNs: prop {}, sk {}",
121 0 : prop_th[i].term,
122 0 : prop_th[i].lsn,
123 0 : sk_th[i].lsn
124 : );
125 4804 : last_common_idx = Some(i);
126 : }
127 841 : let last_common_idx = last_common_idx?;
128 : // Now find where it ends at both prop and sk and take min. End of
129 : // (common) term is the start of the next except it is the last one;
130 : // there it is flush_lsn in case of safekeeper or, in case of proposer
131 : // +infinity, so we just take flush_lsn then.
132 669 : if last_common_idx == prop_th.len() - 1 {
133 63 : Some(TermLsn {
134 63 : term: prop_th[last_common_idx].term,
135 63 : lsn: sk_wal_end,
136 63 : })
137 : } else {
138 606 : let prop_common_term_end = prop_th[last_common_idx + 1].lsn;
139 606 : let sk_common_term_end = if last_common_idx + 1 < sk_th.len() {
140 8 : sk_th[last_common_idx + 1].lsn
141 : } else {
142 598 : sk_wal_end
143 : };
144 606 : Some(TermLsn {
145 606 : term: prop_th[last_common_idx].term,
146 606 : lsn: min(prop_common_term_end, sk_common_term_end),
147 606 : })
148 : }
149 841 : }
150 : }
151 :
152 : /// Display only latest entries for Debug.
153 : impl fmt::Debug for TermHistory {
154 371 : fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
155 371 : let n_printed = 20;
156 371 : write!(
157 371 : fmt,
158 371 : "{}{:?}",
159 371 : if self.0.len() > n_printed { "... " } else { "" },
160 371 : self.0
161 371 : .iter()
162 371 : .rev()
163 371 : .take(n_printed)
164 1321 : .map(|&e| (e.term, e.lsn)) // omit TermSwitchEntry
165 371 : .collect::<Vec<_>>()
166 371 : )
167 371 : }
168 : }
169 :
170 : /// Unique id of proposer. Not needed for correctness, used for monitoring.
171 : pub type PgUuid = [u8; 16];
172 :
173 : /// Persistent consensus state of the acceptor.
174 0 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
175 : pub struct AcceptorState {
176 : /// Highest term this acceptor has voted for or adopted: raised when a vote request or a ProposerElected message carries a higher term.
177 : pub term: Term,
178 : /// History of term switches for safekeeper's WAL.
179 : /// Actually it often goes *beyond* WAL contents as we adopt term history
180 : /// from the proposer before recovery; use `TermHistory::up_to` to cut it at the WAL end.
181 : pub term_history: TermHistory,
182 : }
183 :
184 : impl AcceptorState {
185 : /// acceptor's last_log_term is the term of the highest entry in the log
186 1275 : pub fn get_last_log_term(&self, flush_lsn: Lsn) -> Term {
187 1275 : let th = self.term_history.up_to(flush_lsn);
188 1275 : match th.0.last() {
189 1269 : Some(e) => e.term,
190 6 : None => 0,
191 : }
192 1275 : }
193 : }
194 :
195 : // protocol messages
196 :
197 : /// Initial Proposer -> Acceptor message
///
/// The safekeeper validates most of these fields against its stored state in
/// `handle_greeting` and rejects the connection on mismatch.
198 0 : #[derive(Debug, Deserialize)]
199 : pub struct ProposerGreeting {
200 : /// proposer-acceptor protocol version
201 : pub protocol_version: u32,
202 : /// Postgres server version
203 : pub pg_version: u32,
// Proposer identity; only logged, see `PgUuid`.
204 : pub proposer_id: PgUuid,
// Postgres system identifier; 0 is treated as "unknown" and ignored.
205 : pub system_id: SystemId,
206 : pub timeline_id: TimelineId,
207 : pub tenant_id: TenantId,
// NOTE(review): presumably the Postgres timeline id; not read in this part of the file.
208 : pub tli: TimeLineID,
// WAL segment size; must equal the one stored on the safekeeper.
209 : pub wal_seg_size: u32,
210 : }
211 :
212 : /// Acceptor -> Proposer initial response: the highest term known to me
213 : /// (acceptor voted for).
214 : #[derive(Debug, Serialize)]
215 : pub struct AcceptorGreeting {
// Safekeeper's current term at the time of the greeting.
216 : term: u64,
// Id of the responding safekeeper node.
217 : node_id: NodeId,
218 : }
219 :
220 : /// Vote request sent from proposer to safekeepers
221 0 : #[derive(Debug, Deserialize)]
222 : pub struct VoteRequest {
// Term the proposer asks to be elected for; the vote is given only if it
// is strictly higher than the safekeeper's current term.
223 : pub term: Term,
224 : }
225 :
226 : /// Vote itself, sent from safekeeper to proposer
227 : #[derive(Debug, Serialize)]
228 : pub struct VoteResponse {
229 : pub term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
230 : vote_given: u64, // boolean (0 = refused, 1 = granted); fixme u64 due to padding
231 : // Safekeeper flush_lsn (end of WAL) + history of term switches allow
232 : // proposer to choose the most advanced one.
233 : pub flush_lsn: Lsn,
// Filled from the safekeeper's in-memory peer_horizon_lsn.
234 : truncate_lsn: Lsn,
// Term history cut at flush_lsn.
235 : pub term_history: TermHistory,
// LSN where the timeline's WAL begins, as known to this safekeeper.
236 : timeline_start_lsn: Lsn,
237 : }
238 :
239 : /*
240 : * Proposer -> Acceptor message announcing proposer is elected and communicating
241 : * term history to it.
242 : */
243 : #[derive(Debug)]
244 : pub struct ProposerElected {
// Term the proposer was elected for.
245 : pub term: Term,
// Position from which the proposer will stream WAL; the safekeeper
// truncates its WAL to this point.
246 : pub start_streaming_at: Lsn,
// Proposer's term history, adopted by the safekeeper on acceptance.
247 : pub term_history: TermHistory,
// LSN where the timeline's WAL begins globally.
248 : pub timeline_start_lsn: Lsn,
249 : }
250 :
251 : /// Request with WAL message sent from proposer to safekeeper. Along the way it
252 : /// communicates commit_lsn.
253 : #[derive(Debug)]
254 : pub struct AppendRequest {
// Fixed-size header describing the WAL range and proposer state.
255 : pub h: AppendRequestHeader,
// Raw WAL bytes; when parsed from the wire the length is end_lsn - begin_lsn.
256 : pub wal_data: Bytes,
257 : }
/// Header of `AppendRequest`, deserialized from the wire ahead of the WAL payload.
258 0 : #[derive(Debug, Clone, Deserialize)]
259 : pub struct AppendRequestHeader {
260 : // term of the sending proposer; the safekeeper compares it with its own term and refuses the request if its own is higher.
261 : pub term: Term,
262 : // TODO: remove this field from the protocol, it is unused -- LSN of term
263 : // switch can be taken from ProposerElected (as well as from term history).
264 : pub term_start_lsn: Lsn,
265 : /// start position of message in WAL
266 : pub begin_lsn: Lsn,
267 : /// end position of message in WAL
268 : pub end_lsn: Lsn,
269 : /// LSN committed by quorum of safekeepers
270 : pub commit_lsn: Lsn,
271 : /// minimal LSN which may be needed by proposer to perform recovery of some safekeeper
272 : pub truncate_lsn: Lsn,
273 : // only for logging/debugging
274 : pub proposer_uuid: PgUuid,
275 : }
276 :
277 : /// Report safekeeper state to proposer
278 : #[derive(Debug, Serialize, Clone)]
279 : pub struct AppendResponse {
280 : // Current term of the safekeeper; if it is higher than proposer's, the
281 : // compute is out of date.
282 : pub term: Term,
283 : // Flushed end of wal on safekeeper; one should be always mindful from what
284 : // term history this value comes, either checking history directly or
285 : // observing term being set to one for which WAL truncation is known to have
286 : // happened.
287 : pub flush_lsn: Lsn,
288 : // We report back our awareness about which WAL is committed, as this is
289 : // a criterion for walproposer --sync mode exit
290 : pub commit_lsn: Lsn,
// Left empty by the safekeeper itself; expected to be filled by upper layers.
291 : pub hs_feedback: HotStandbyFeedback,
// Serialized after the fixed fields only when present.
292 : pub pageserver_feedback: Option<PageserverFeedback>,
293 : }
294 :
295 : impl AppendResponse {
296 0 : fn term_only(term: Term) -> AppendResponse {
297 0 : AppendResponse {
298 0 : term,
299 0 : flush_lsn: Lsn(0),
300 0 : commit_lsn: Lsn(0),
301 0 : hs_feedback: HotStandbyFeedback::empty(),
302 0 : pageserver_feedback: None,
303 0 : }
304 0 : }
305 : }
306 :
307 : /// Proposer -> Acceptor messages
308 : #[derive(Debug)]
309 : pub enum ProposerAcceptorMessage {
// Wire tag 'g'.
310 : Greeting(ProposerGreeting),
// Wire tag 'v'.
311 : VoteRequest(VoteRequest),
// Wire tag 'e'.
312 : Elected(ProposerElected),
// Wire tag 'a'; handled with require_flush = true.
313 : AppendRequest(AppendRequest),
// Never produced by `parse`; handled with require_flush = false.
314 : NoFlushAppendRequest(AppendRequest),
// Never produced by `parse`; dispatched to handle_flush.
315 : FlushWAL,
316 : }
317 :
318 : impl ProposerAcceptorMessage {
319 : /// Parse proposer message.
320 26580 : pub fn parse(msg_bytes: Bytes, proto_version: u32) -> Result<ProposerAcceptorMessage> {
321 26580 : if proto_version != SK_PROTOCOL_VERSION {
322 0 : bail!(
323 0 : "incompatible protocol version {}, expected {}",
324 0 : proto_version,
325 0 : SK_PROTOCOL_VERSION
326 0 : );
327 26580 : }
328 26580 : // xxx using Reader is inefficient but easy to work with bincode
329 26580 : let mut stream = msg_bytes.reader();
330 : // u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is
331 26580 : let tag = stream.read_u64::<LittleEndian>()? as u8 as char;
332 26580 : match tag {
333 : 'g' => {
334 19090 : let msg = ProposerGreeting::des_from(&mut stream)?;
335 19090 : Ok(ProposerAcceptorMessage::Greeting(msg))
336 : }
337 : 'v' => {
338 3306 : let msg = VoteRequest::des_from(&mut stream)?;
339 3306 : Ok(ProposerAcceptorMessage::VoteRequest(msg))
340 : }
341 : 'e' => {
342 832 : let mut msg_bytes = stream.into_inner();
343 832 : if msg_bytes.remaining() < 16 {
344 0 : bail!("ProposerElected message is not complete");
345 832 : }
346 832 : let term = msg_bytes.get_u64_le();
347 832 : let start_streaming_at = msg_bytes.get_u64_le().into();
348 832 : let term_history = TermHistory::from_bytes(&mut msg_bytes)?;
349 832 : if msg_bytes.remaining() < 8 {
350 0 : bail!("ProposerElected message is not complete");
351 832 : }
352 832 : let timeline_start_lsn = msg_bytes.get_u64_le().into();
353 832 : let msg = ProposerElected {
354 832 : term,
355 832 : start_streaming_at,
356 832 : timeline_start_lsn,
357 832 : term_history,
358 832 : };
359 832 : Ok(ProposerAcceptorMessage::Elected(msg))
360 : }
361 : 'a' => {
362 : // read header followed by wal data
363 3352 : let hdr = AppendRequestHeader::des_from(&mut stream)?;
364 3352 : let rec_size = hdr
365 3352 : .end_lsn
366 3352 : .checked_sub(hdr.begin_lsn)
367 3352 : .context("begin_lsn > end_lsn in AppendRequest")?
368 : .0 as usize;
369 3352 : if rec_size > MAX_SEND_SIZE {
370 0 : bail!(
371 0 : "AppendRequest is longer than MAX_SEND_SIZE ({})",
372 0 : MAX_SEND_SIZE
373 0 : );
374 3352 : }
375 3352 :
376 3352 : let mut wal_data_vec: Vec<u8> = vec![0; rec_size];
377 3352 : stream.read_exact(&mut wal_data_vec)?;
378 3352 : let wal_data = Bytes::from(wal_data_vec);
379 3352 : let msg = AppendRequest { h: hdr, wal_data };
380 3352 :
381 3352 : Ok(ProposerAcceptorMessage::AppendRequest(msg))
382 : }
383 0 : _ => bail!("unknown proposer-acceptor message tag: {}", tag),
384 : }
385 26580 : }
386 :
387 : /// The memory size of the message, including byte slices.
388 600 : pub fn size(&self) -> usize {
389 : const BASE_SIZE: usize = std::mem::size_of::<ProposerAcceptorMessage>();
390 :
391 : // For most types, the size is just the base enum size including the nested structs. Some
392 : // types also contain byte slices; add them.
393 : //
394 : // We explicitly list all fields, to draw attention here when new fields are added.
395 600 : let mut size = BASE_SIZE;
396 600 : size += match self {
397 : Self::Greeting(ProposerGreeting {
398 : protocol_version: _,
399 : pg_version: _,
400 : proposer_id: _,
401 : system_id: _,
402 : timeline_id: _,
403 : tenant_id: _,
404 : tli: _,
405 : wal_seg_size: _,
406 0 : }) => 0,
407 :
408 0 : Self::VoteRequest(VoteRequest { term: _ }) => 0,
409 :
410 : Self::Elected(ProposerElected {
411 : term: _,
412 : start_streaming_at: _,
413 : term_history: _,
414 : timeline_start_lsn: _,
415 0 : }) => 0,
416 :
417 : Self::AppendRequest(AppendRequest {
418 : h:
419 : AppendRequestHeader {
420 : term: _,
421 : term_start_lsn: _,
422 : begin_lsn: _,
423 : end_lsn: _,
424 : commit_lsn: _,
425 : truncate_lsn: _,
426 : proposer_uuid: _,
427 : },
428 600 : wal_data,
429 600 : }) => wal_data.len(),
430 :
431 : Self::NoFlushAppendRequest(AppendRequest {
432 : h:
433 : AppendRequestHeader {
434 : term: _,
435 : term_start_lsn: _,
436 : begin_lsn: _,
437 : end_lsn: _,
438 : commit_lsn: _,
439 : truncate_lsn: _,
440 : proposer_uuid: _,
441 : },
442 0 : wal_data,
443 0 : }) => wal_data.len(),
444 :
445 0 : Self::FlushWAL => 0,
446 : };
447 :
448 600 : size
449 600 : }
450 : }
451 :
452 : /// Acceptor -> Proposer messages
453 : #[derive(Debug)]
454 : pub enum AcceptorProposerMessage {
// Serialized with tag 'g'.
455 : Greeting(AcceptorGreeting),
// Serialized with tag 'v'.
456 : VoteResponse(VoteResponse),
// Serialized with tag 'a'.
457 : AppendResponse(AppendResponse),
458 : }
459 :
460 : impl AcceptorProposerMessage {
461 : /// Serialize acceptor -> proposer message.
462 25192 : pub fn serialize(&self, buf: &mut BytesMut) -> Result<()> {
463 25192 : match self {
464 19090 : AcceptorProposerMessage::Greeting(msg) => {
465 19090 : buf.put_u64_le('g' as u64);
466 19090 : buf.put_u64_le(msg.term);
467 19090 : buf.put_u64_le(msg.node_id.0);
468 19090 : }
469 3306 : AcceptorProposerMessage::VoteResponse(msg) => {
470 3306 : buf.put_u64_le('v' as u64);
471 3306 : buf.put_u64_le(msg.term);
472 3306 : buf.put_u64_le(msg.vote_given);
473 3306 : buf.put_u64_le(msg.flush_lsn.into());
474 3306 : buf.put_u64_le(msg.truncate_lsn.into());
475 3306 : buf.put_u32_le(msg.term_history.0.len() as u32);
476 13341 : for e in &msg.term_history.0 {
477 10035 : buf.put_u64_le(e.term);
478 10035 : buf.put_u64_le(e.lsn.into());
479 10035 : }
480 3306 : buf.put_u64_le(msg.timeline_start_lsn.into());
481 : }
482 2796 : AcceptorProposerMessage::AppendResponse(msg) => {
483 2796 : buf.put_u64_le('a' as u64);
484 2796 : buf.put_u64_le(msg.term);
485 2796 : buf.put_u64_le(msg.flush_lsn.into());
486 2796 : buf.put_u64_le(msg.commit_lsn.into());
487 2796 : buf.put_i64_le(msg.hs_feedback.ts);
488 2796 : buf.put_u64_le(msg.hs_feedback.xmin);
489 2796 : buf.put_u64_le(msg.hs_feedback.catalog_xmin);
490 :
491 : // AsyncReadMessage in walproposer.c will not try to decode pageserver_feedback
492 : // if it is not present.
493 2796 : if let Some(ref msg) = msg.pageserver_feedback {
494 0 : msg.serialize(buf);
495 2796 : }
496 : }
497 : }
498 :
499 25192 : Ok(())
500 25192 : }
501 : }
502 :
503 : /// Safekeeper implements consensus to reliably persist WAL across nodes.
504 : /// It controls all WAL disk writes and updates of control file.
505 : ///
506 : /// Currently safekeeper processes:
507 : /// - messages from compute (proposers) and provides replies
508 : /// - messages from broker peers
509 : pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> {
510 : /// LSN from which the proposer the safekeeper is currently talking to appends WAL;
511 : /// determines last_log_term switch point. Starts as Lsn(0) and is set
/// from the last term history entry when ProposerElected is accepted.
512 : pub term_start_lsn: Lsn,
513 :
514 : pub state: TimelineState<CTRL>, // persistent state storage
// WAL storage backend; all WAL writes, flushes and truncations go through it.
515 : pub wal_store: WAL,
516 :
517 : node_id: NodeId, // safekeeper's node id
518 : }
519 :
520 : impl<CTRL, WAL> SafeKeeper<CTRL, WAL>
521 : where
522 : CTRL: control_file::Storage,
523 : WAL: wal_storage::Storage,
524 : {
525 : /// Accepts a control file storage containing the safekeeper state.
526 : /// State must be initialized, i.e. contain filled `tenant_id`, `timeline_id`
527 : /// and `server` (`wal_seg_size` inside it) fields.
528 8439 : pub fn new(
529 8439 : state: TimelineState<CTRL>,
530 8439 : wal_store: WAL,
531 8439 : node_id: NodeId,
532 8439 : ) -> Result<SafeKeeper<CTRL, WAL>> {
533 8439 : if state.tenant_id == TenantId::from([0u8; 16])
534 8439 : || state.timeline_id == TimelineId::from([0u8; 16])
535 : {
536 0 : bail!(
537 0 : "Calling SafeKeeper::new with empty tenant_id ({}) or timeline_id ({})",
538 0 : state.tenant_id,
539 0 : state.timeline_id
540 0 : );
541 8439 : }
542 8439 :
543 8439 : Ok(SafeKeeper {
544 8439 : term_start_lsn: Lsn(0),
545 8439 : state,
546 8439 : wal_store,
547 8439 : node_id,
548 8439 : })
549 8439 : }
550 :
551 : /// Get history of term switches for the available WAL
552 4145 : fn get_term_history(&self) -> TermHistory {
553 4145 : self.state
554 4145 : .acceptor_state
555 4145 : .term_history
556 4145 : .up_to(self.flush_lsn())
557 4145 : }
558 :
559 43 : pub fn get_last_log_term(&self) -> Term {
560 43 : self.state
561 43 : .acceptor_state
562 43 : .get_last_log_term(self.flush_lsn())
563 43 : }
564 :
565 : /// wal_store wrapper avoiding commit_lsn <= flush_lsn violation when we don't have WAL yet.
566 18334 : pub fn flush_lsn(&self) -> Lsn {
567 18334 : max(self.wal_store.flush_lsn(), self.state.timeline_start_lsn)
568 18334 : }
569 :
570 : /// Process message from proposer and possibly form reply. Concurrent
571 : /// callers must exclude each other.
572 30587 : pub async fn process_msg(
573 30587 : &mut self,
574 30587 : msg: &ProposerAcceptorMessage,
575 30587 : ) -> Result<Option<AcceptorProposerMessage>> {
576 30587 : match msg {
577 19090 : ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg).await,
578 3308 : ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg).await,
579 837 : ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg).await,
580 4 : ProposerAcceptorMessage::AppendRequest(msg) => {
581 4 : self.handle_append_request(msg, true).await
582 : }
583 3952 : ProposerAcceptorMessage::NoFlushAppendRequest(msg) => {
584 3952 : self.handle_append_request(msg, false).await
585 : }
586 3396 : ProposerAcceptorMessage::FlushWAL => self.handle_flush().await,
587 : }
588 30587 : }
589 :
590 : /// Handle initial message from proposer: check its sanity and send my
591 : /// current term.
592 19090 : async fn handle_greeting(
593 19090 : &mut self,
594 19090 : msg: &ProposerGreeting,
595 19090 : ) -> Result<Option<AcceptorProposerMessage>> {
596 19090 : // Check protocol compatibility
597 19090 : if msg.protocol_version != SK_PROTOCOL_VERSION {
598 0 : bail!(
599 0 : "incompatible protocol version {}, expected {}",
600 0 : msg.protocol_version,
601 0 : SK_PROTOCOL_VERSION
602 0 : );
603 19090 : }
604 19090 : /* Postgres major version mismatch is treated as fatal error
605 19090 : * because safekeepers parse WAL headers and the format
606 19090 : * may change between versions.
607 19090 : */
608 19090 : if msg.pg_version / 10000 != self.state.server.pg_version / 10000
609 0 : && self.state.server.pg_version != UNKNOWN_SERVER_VERSION
610 : {
611 0 : bail!(
612 0 : "incompatible server version {}, expected {}",
613 0 : msg.pg_version,
614 0 : self.state.server.pg_version
615 0 : );
616 19090 : }
617 19090 :
618 19090 : if msg.tenant_id != self.state.tenant_id {
619 0 : bail!(
620 0 : "invalid tenant ID, got {}, expected {}",
621 0 : msg.tenant_id,
622 0 : self.state.tenant_id
623 0 : );
624 19090 : }
625 19090 : if msg.timeline_id != self.state.timeline_id {
626 0 : bail!(
627 0 : "invalid timeline ID, got {}, expected {}",
628 0 : msg.timeline_id,
629 0 : self.state.timeline_id
630 0 : );
631 19090 : }
632 19090 : if self.state.server.wal_seg_size != msg.wal_seg_size {
633 0 : bail!(
634 0 : "invalid wal_seg_size, got {}, expected {}",
635 0 : msg.wal_seg_size,
636 0 : self.state.server.wal_seg_size
637 0 : );
638 19090 : }
639 19090 :
640 19090 : // system_id will be updated on mismatch
641 19090 : // sync-safekeepers doesn't know sysid and sends 0, ignore it
642 19090 : if self.state.server.system_id != msg.system_id && msg.system_id != 0 {
643 0 : if self.state.server.system_id != 0 {
644 0 : warn!(
645 0 : "unexpected system ID arrived, got {}, expected {}",
646 0 : msg.system_id, self.state.server.system_id
647 : );
648 0 : }
649 :
650 0 : let mut state = self.state.start_change();
651 0 : state.server.system_id = msg.system_id;
652 0 : if msg.pg_version != UNKNOWN_SERVER_VERSION {
653 0 : state.server.pg_version = msg.pg_version;
654 0 : }
655 0 : self.state.finish_change(&state).await?;
656 19090 : }
657 :
658 19090 : info!(
659 0 : "processed greeting from walproposer {}, sending term {:?}",
660 2416 : msg.proposer_id.map(|b| format!("{:X}", b)).join(""),
661 0 : self.state.acceptor_state.term
662 : );
663 19090 : Ok(Some(AcceptorProposerMessage::Greeting(AcceptorGreeting {
664 19090 : term: self.state.acceptor_state.term,
665 19090 : node_id: self.node_id,
666 19090 : })))
667 19090 : }
668 :
669 : /// Give vote for the given term, if we haven't done that previously.
670 3308 : async fn handle_vote_request(
671 3308 : &mut self,
672 3308 : msg: &VoteRequest,
673 3308 : ) -> Result<Option<AcceptorProposerMessage>> {
674 3308 : // Once voted, we won't accept data from older proposers; flush
675 3308 : // everything we've already received so that new proposer starts
676 3308 : // streaming at end of our WAL, without overlap. Currently we truncate
677 3308 : // WAL at streaming point, so this avoids truncating already committed
678 3308 : // WAL.
679 3308 : //
680 3308 : // TODO: it would be smoother to not truncate committed piece at
681 3308 : // handle_elected instead. Currently not a big deal, as proposer is the
682 3308 : // only source of WAL; with peer2peer recovery it would be more
683 3308 : // important.
684 3308 : self.wal_store.flush_wal().await?;
685 : // initialize with refusal
686 3308 : let mut resp = VoteResponse {
687 3308 : term: self.state.acceptor_state.term,
688 3308 : vote_given: false as u64,
689 3308 : flush_lsn: self.flush_lsn(),
690 3308 : truncate_lsn: self.state.inmem.peer_horizon_lsn,
691 3308 : term_history: self.get_term_history(),
692 3308 : timeline_start_lsn: self.state.timeline_start_lsn,
693 3308 : };
694 3308 : if self.state.acceptor_state.term < msg.term {
695 3132 : let mut state = self.state.start_change();
696 3132 : state.acceptor_state.term = msg.term;
697 3132 : // persist vote before sending it out
698 3132 : self.state.finish_change(&state).await?;
699 :
700 3132 : resp.term = self.state.acceptor_state.term;
701 3132 : resp.vote_given = true as u64;
702 176 : }
703 3308 : info!("processed VoteRequest for term {}: {:?}", msg.term, &resp);
704 3308 : Ok(Some(AcceptorProposerMessage::VoteResponse(resp)))
705 3308 : }
706 :
707 : /// Form AppendResponse from current state.
708 3399 : fn append_response(&self) -> AppendResponse {
709 3399 : let ar = AppendResponse {
710 3399 : term: self.state.acceptor_state.term,
711 3399 : flush_lsn: self.flush_lsn(),
712 3399 : commit_lsn: self.state.commit_lsn,
713 3399 : // will be filled by the upper code to avoid bothering safekeeper
714 3399 : hs_feedback: HotStandbyFeedback::empty(),
715 3399 : pageserver_feedback: None,
716 3399 : };
717 3399 : trace!("formed AppendResponse {:?}", ar);
718 3399 : ar
719 3399 : }
720 :
/// Handle the ProposerElected message: adopt the proposer's term, verify the
/// streaming start point against our own term history, truncate WAL to that
/// point and adopt the proposer's term history.
///
/// Returns `Ok(None)` in all non-error cases, i.e. no reply is sent,
/// including when our term is already higher than the proposer's.
///
/// Errors if the proposer's term history is empty or if `start_streaming_at`
/// differs from the divergence point we compute locally. Panics if the
/// proposer would make us truncate below the in-memory commit_lsn.
721 837 : async fn handle_elected(
722 837 : &mut self,
723 837 : msg: &ProposerElected,
724 837 : ) -> Result<Option<AcceptorProposerMessage>> {
725 837 : let _timer = MISC_OPERATION_SECONDS
726 837 : .with_label_values(&["handle_elected"])
727 837 : .start_timer();
728 837 :
729 837 : info!(
730 0 : "received ProposerElected {:?}, term={}, last_log_term={}, flush_lsn={}",
731 0 : msg,
732 0 : self.state.acceptor_state.term,
733 0 : self.get_last_log_term(),
734 0 : self.flush_lsn()
735 : );
// Adopt (and persist) the proposer's term if ours is behind.
736 837 : if self.state.acceptor_state.term < msg.term {
737 5 : let mut state = self.state.start_change();
738 5 : state.acceptor_state.term = msg.term;
739 5 : self.state.finish_change(&state).await?;
740 832 : }
741 :
742 : // If our term is higher, ignore the message (next feedback will inform the compute)
743 837 : if self.state.acceptor_state.term > msg.term {
744 0 : return Ok(None);
745 837 : }
746 837 :
747 837 : // Before truncating WAL, cross-check the divergence point received
748 837 : // from the walproposer against the one computed from our own history.
749 837 : let sk_th = self.get_term_history();
750 837 : let last_common_point = match TermHistory::find_highest_common_point(
751 837 : &msg.term_history,
752 837 : &sk_th,
753 837 : self.flush_lsn(),
754 837 : ) {
755 : // No common point. Expect streaming from the beginning of the
756 : // history like walproposer while we don't have proper init.
757 171 : None => *msg.term_history.0.first().ok_or(anyhow::anyhow!(
758 171 : "empty walproposer term history {:?}",
759 171 : msg.term_history
760 171 : ))?,
761 666 : Some(lcp) => lcp,
762 : };
763 : // This is expected to happen in a rare race when another connection
764 : // from the same walproposer writes + flushes WAL after this connection
765 : // sent flush_lsn in VoteRequest; for instance, very late
766 : // ProposerElected message delivery after another connection was
767 : // established and wrote WAL. In such cases error is transient;
768 : // reconnection makes safekeeper send newest term history and flush_lsn
769 : // and walproposer recalculates the streaming point. OTOH repeating
770 : // error indicates a serious bug.
771 837 : if last_common_point.lsn != msg.start_streaming_at {
772 0 : bail!("refusing ProposerElected with unexpected truncation point: lcp={:?} start_streaming_at={}, term={}, sk_th={:?} flush_lsn={}, wp_th={:?}",
773 0 : last_common_point, msg.start_streaming_at,
774 0 : self.state.acceptor_state.term, sk_th, self.flush_lsn(), msg.term_history,
775 0 : );
776 837 : }
777 837 :
778 837 : // We are also expected to never attempt to truncate committed data.
779 837 : assert!(
780 837 : msg.start_streaming_at >= self.state.inmem.commit_lsn,
781 0 : "attempt to truncate committed data: start_streaming_at={}, commit_lsn={}, term={}, sk_th={:?} flush_lsn={}, wp_th={:?}",
782 0 : msg.start_streaming_at, self.state.inmem.commit_lsn,
783 0 : self.state.acceptor_state.term, sk_th, self.flush_lsn(), msg.term_history,
784 : );
785 :
786 : // Before first WAL write initialize its segment. It makes first segment
787 : // pg_waldump'able because stream from compute doesn't include its
788 : // segment and page headers.
789 : //
790 : // If we fail before first WAL write flush this action would be
791 : // repeated, that's ok because it is idempotent.
792 837 : if self.wal_store.flush_lsn() == Lsn::INVALID {
793 170 : self.wal_store
794 170 : .initialize_first_segment(msg.start_streaming_at)
795 170 : .await?;
796 667 : }
797 :
798 : // truncate wal, update the LSNs
799 837 : self.wal_store.truncate_wal(msg.start_streaming_at).await?;
800 :
801 : // and now adopt term history from proposer
802 : {
803 837 : let mut state = self.state.start_change();
804 837 :
805 837 : // Here we learn initial LSN for the first time, set fields
806 837 : // interested in that.
807 837 :
808 837 : if state.timeline_start_lsn == Lsn(0) {
809 : // Remember point where WAL begins globally.
810 170 : state.timeline_start_lsn = msg.timeline_start_lsn;
811 170 : info!(
812 0 : "setting timeline_start_lsn to {:?}",
813 : state.timeline_start_lsn
814 : );
815 667 : }
816 837 : if state.peer_horizon_lsn == Lsn(0) {
817 170 : // Update peer_horizon_lsn as soon as we know where timeline starts.
818 170 : // It means that peer_horizon_lsn cannot be zero after we know timeline_start_lsn.
819 170 : state.peer_horizon_lsn = msg.timeline_start_lsn;
820 667 : }
821 837 : if state.local_start_lsn == Lsn(0) {
822 170 : state.local_start_lsn = msg.start_streaming_at;
823 170 : info!("setting local_start_lsn to {:?}", state.local_start_lsn);
824 667 : }
825 : // Initializing commit_lsn before acking first flushed record is
826 : // important to let find_end_of_wal skip the hole in the beginning
827 : // of the first segment.
828 : //
829 : // NB: on new clusters, this happens at the same time as
830 : // timeline_start_lsn initialization, it is taken outside to provide
831 : // upgrade.
832 837 : state.commit_lsn = max(state.commit_lsn, state.timeline_start_lsn);
833 837 :
834 837 : // Initializing backup_lsn is useful to avoid making backup think it should upload 0 segment.
835 837 : state.backup_lsn = max(state.backup_lsn, state.timeline_start_lsn);
836 837 : // similar for remote_consistent_lsn
837 837 : state.remote_consistent_lsn =
838 837 : max(state.remote_consistent_lsn, state.timeline_start_lsn);
839 837 :
840 837 : state.acceptor_state.term_history = msg.term_history.clone();
841 837 : self.state.finish_change(&state).await?;
842 : }
843 :
844 837 : info!("start receiving WAL since {:?}", msg.start_streaming_at);
845 :
846 : // Cache LSN where term starts to immediately fsync control file with
847 : // commit_lsn once we reach it -- sync-safekeepers finishes when
848 : // persisted commit_lsn on majority of safekeepers aligns.
849 837 : self.term_start_lsn = match msg.term_history.0.last() {
850 0 : None => bail!("proposer elected with empty term history"),
851 837 : Some(term_lsn_start) => term_lsn_start.lsn,
852 837 : };
853 837 :
854 837 : Ok(None)
855 837 : }
856 :
857 : /// Advance commit_lsn taking into account what we have locally.
858 : ///
859 : /// Note: it is assumed that 'WAL we have is from the right term' check has
860 : /// already been done outside.
861 2609 : async fn update_commit_lsn(&mut self, mut candidate: Lsn) -> Result<()> {
862 2609 : // Both peers and walproposer communicate this value, we might already
863 2609 : // have a fresher (higher) version.
864 2609 : candidate = max(candidate, self.state.inmem.commit_lsn);
865 2609 : let commit_lsn = min(candidate, self.flush_lsn());
866 2609 : assert!(
867 2609 : commit_lsn >= self.state.inmem.commit_lsn,
868 0 : "commit_lsn monotonicity violated: old={} new={}",
869 : self.state.inmem.commit_lsn,
870 : commit_lsn
871 : );
872 :
873 2609 : self.state.inmem.commit_lsn = commit_lsn;
874 2609 :
875 2609 : // If new commit_lsn reached term switch, force sync of control
876 2609 : // file: walproposer in sync mode is very interested when this
877 2609 : // happens. Note: this is for sync-safekeepers mode only, as
878 2609 : // otherwise commit_lsn might jump over term_start_lsn.
879 2609 : if commit_lsn >= self.term_start_lsn && self.state.commit_lsn < self.term_start_lsn {
880 124 : self.state.flush().await?;
881 2485 : }
882 :
883 2609 : Ok(())
884 2609 : }
885 :
886 : /// Handle request to append WAL.
887 : #[allow(clippy::comparison_chain)]
888 3956 : async fn handle_append_request(
889 3956 : &mut self,
890 3956 : msg: &AppendRequest,
891 3956 : require_flush: bool,
892 3956 : ) -> Result<Option<AcceptorProposerMessage>> {
893 3956 : if self.state.acceptor_state.term < msg.h.term {
894 0 : bail!("got AppendRequest before ProposerElected");
895 3956 : }
896 3956 :
897 3956 : // If our term is higher, immediately refuse the message.
898 3956 : if self.state.acceptor_state.term > msg.h.term {
899 0 : let resp = AppendResponse::term_only(self.state.acceptor_state.term);
900 0 : return Ok(Some(AcceptorProposerMessage::AppendResponse(resp)));
901 3956 : }
902 3956 :
903 3956 : // Disallow any non-sequential writes, which can result in gaps or
904 3956 : // overwrites. If we need to move the pointer, ProposerElected message
905 3956 : // should have truncated WAL first accordingly. Note that the first
906 3956 : // condition (WAL rewrite) is quite expected in real world; it happens
907 3956 : // when walproposer reconnects to safekeeper and writes some more data
908 3956 : // while first connection still gets some packets later. It might be
909 3956 : // better to not log this as error! above.
910 3956 : let write_lsn = self.wal_store.write_lsn();
911 3956 : let flush_lsn = self.wal_store.flush_lsn();
912 3956 : if write_lsn > msg.h.begin_lsn {
913 1 : bail!(
914 1 : "append request rewrites WAL written before, write_lsn={}, msg lsn={}",
915 1 : write_lsn,
916 1 : msg.h.begin_lsn
917 1 : );
918 3955 : }
919 3955 : if write_lsn < msg.h.begin_lsn && write_lsn != Lsn(0) {
920 0 : bail!(
921 0 : "append request creates gap in written WAL, write_lsn={}, msg lsn={}",
922 0 : write_lsn,
923 0 : msg.h.begin_lsn,
924 0 : );
925 3955 : }
926 3955 :
927 3955 : // Now we know that we are in the same term as the proposer,
928 3955 : // processing the message.
929 3955 :
930 3955 : self.state.inmem.proposer_uuid = msg.h.proposer_uuid;
931 3955 :
932 3955 : // do the job
933 3955 : if !msg.wal_data.is_empty() {
934 1428 : self.wal_store
935 1428 : .write_wal(msg.h.begin_lsn, &msg.wal_data)
936 1428 : .await?;
937 2527 : }
938 :
939 : // flush wal to the disk, if required
940 3955 : if require_flush {
941 3 : self.wal_store.flush_wal().await?;
942 3952 : }
943 :
944 : // Update commit_lsn. It will be flushed to the control file regularly by the timeline
945 : // manager, off of the WAL ingest hot path.
946 3955 : if msg.h.commit_lsn != Lsn(0) {
947 2609 : self.update_commit_lsn(msg.h.commit_lsn).await?;
948 1346 : }
949 : // Value calculated by walproposer can always lag:
950 : // - safekeepers can forget inmem value and send to proposer lower
951 : // persisted one on restart;
952 : // - if we make safekeepers always send persistent value,
953 : // any compute restart would pull it down.
954 : // Thus, take max before adopting.
955 3955 : self.state.inmem.peer_horizon_lsn =
956 3955 : max(self.state.inmem.peer_horizon_lsn, msg.h.truncate_lsn);
957 3955 :
958 3955 : trace!(
959 0 : "processed AppendRequest of len {}, begin_lsn={}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, flushed={:?}",
960 0 : msg.wal_data.len(),
961 : msg.h.begin_lsn,
962 : msg.h.end_lsn,
963 : msg.h.commit_lsn,
964 : msg.h.truncate_lsn,
965 : require_flush,
966 : );
967 :
968 : // If flush_lsn hasn't updated, AppendResponse is not very useful.
969 : // This is the common case for !require_flush, but a flush can still
970 : // happen on segment bounds.
971 3955 : if !require_flush && flush_lsn == self.flush_lsn() {
972 3952 : return Ok(None);
973 3 : }
974 3 :
975 3 : let resp = self.append_response();
976 3 : Ok(Some(AcceptorProposerMessage::AppendResponse(resp)))
977 3956 : }
978 :
979 : /// Flush WAL to disk. Return AppendResponse with latest LSNs.
980 3396 : async fn handle_flush(&mut self) -> Result<Option<AcceptorProposerMessage>> {
981 3396 : self.wal_store.flush_wal().await?;
982 3396 : Ok(Some(AcceptorProposerMessage::AppendResponse(
983 3396 : self.append_response(),
984 3396 : )))
985 3396 : }
986 :
987 : /// Update commit_lsn from peer safekeeper data.
988 0 : pub async fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> {
989 0 : if Lsn(sk_info.commit_lsn) != Lsn::INVALID {
990 : // Note: the check is too restrictive, generally we can update local
991 : // commit_lsn if our history matches (is part of) history of advanced
992 : // commit_lsn provider.
993 0 : if sk_info.last_log_term == self.get_last_log_term() {
994 0 : self.update_commit_lsn(Lsn(sk_info.commit_lsn)).await?;
995 0 : }
996 0 : }
997 0 : Ok(())
998 0 : }
999 : }
1000 :
1001 : #[cfg(test)]
1002 : mod tests {
1003 : use futures::future::BoxFuture;
1004 :
1005 : use postgres_ffi::{XLogSegNo, WAL_SEGMENT_SIZE};
1006 : use safekeeper_api::{
1007 : membership::{Configuration, MemberSet, SafekeeperId},
1008 : ServerInfo,
1009 : };
1010 :
1011 : use super::*;
1012 : use crate::state::{EvictionState, TimelinePersistentState};
1013 : use std::{
1014 : ops::Deref,
1015 : str::FromStr,
1016 : time::{Instant, UNIX_EPOCH},
1017 : };
1018 :
1019 : // fake storage for tests
1020 : struct InMemoryState {
1021 : persisted_state: TimelinePersistentState,
1022 : }
1023 :
1024 : impl control_file::Storage for InMemoryState {
1025 5 : async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> {
1026 5 : self.persisted_state = s.clone();
1027 5 : Ok(())
1028 5 : }
1029 :
1030 0 : fn last_persist_at(&self) -> Instant {
1031 0 : Instant::now()
1032 0 : }
1033 : }
1034 :
1035 : impl Deref for InMemoryState {
1036 : type Target = TimelinePersistentState;
1037 :
1038 83 : fn deref(&self) -> &Self::Target {
1039 83 : &self.persisted_state
1040 83 : }
1041 : }
1042 :
1043 3 : fn test_sk_state() -> TimelinePersistentState {
1044 3 : let mut state = TimelinePersistentState::empty();
1045 3 : state.server.wal_seg_size = WAL_SEGMENT_SIZE as u32;
1046 3 : state.tenant_id = TenantId::from([1u8; 16]);
1047 3 : state.timeline_id = TimelineId::from([1u8; 16]);
1048 3 : state
1049 3 : }
1050 :
1051 : struct DummyWalStore {
1052 : lsn: Lsn,
1053 : }
1054 :
1055 : impl wal_storage::Storage for DummyWalStore {
1056 4 : fn write_lsn(&self) -> Lsn {
1057 4 : self.lsn
1058 4 : }
1059 :
1060 19 : fn flush_lsn(&self) -> Lsn {
1061 19 : self.lsn
1062 19 : }
1063 :
1064 2 : async fn initialize_first_segment(&mut self, _init_lsn: Lsn) -> Result<()> {
1065 2 : Ok(())
1066 2 : }
1067 :
1068 3 : async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
1069 3 : self.lsn = startpos + buf.len() as u64;
1070 3 : Ok(())
1071 3 : }
1072 :
1073 2 : async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
1074 2 : self.lsn = end_pos;
1075 2 : Ok(())
1076 2 : }
1077 :
1078 5 : async fn flush_wal(&mut self) -> Result<()> {
1079 5 : Ok(())
1080 5 : }
1081 :
1082 0 : fn remove_up_to(&self, _segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> {
1083 0 : Box::pin(async { Ok(()) })
1084 0 : }
1085 :
1086 0 : fn get_metrics(&self) -> crate::metrics::WalStorageMetrics {
1087 0 : crate::metrics::WalStorageMetrics::default()
1088 0 : }
1089 : }
1090 :
1091 : #[tokio::test]
1092 1 : async fn test_voting() {
1093 1 : let storage = InMemoryState {
1094 1 : persisted_state: test_sk_state(),
1095 1 : };
1096 1 : let wal_store = DummyWalStore { lsn: Lsn(0) };
1097 1 : let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
1098 1 :
1099 1 : // check voting for 1 is ok
1100 1 : let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 });
1101 1 : let mut vote_resp = sk.process_msg(&vote_request).await;
1102 1 : match vote_resp.unwrap() {
1103 1 : Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given != 0),
1104 1 : r => panic!("unexpected response: {:?}", r),
1105 1 : }
1106 1 :
1107 1 : // reboot...
1108 1 : let state = sk.state.deref().clone();
1109 1 : let storage = InMemoryState {
1110 1 : persisted_state: state,
1111 1 : };
1112 1 :
1113 1 : sk = SafeKeeper::new(TimelineState::new(storage), sk.wal_store, NodeId(0)).unwrap();
1114 1 :
1115 1 : // and ensure voting second time for 1 is not ok
1116 1 : vote_resp = sk.process_msg(&vote_request).await;
1117 1 : match vote_resp.unwrap() {
1118 1 : Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given == 0),
1119 1 : r => panic!("unexpected response: {:?}", r),
1120 1 : }
1121 1 : }
1122 :
1123 : #[tokio::test]
1124 1 : async fn test_last_log_term_switch() {
1125 1 : let storage = InMemoryState {
1126 1 : persisted_state: test_sk_state(),
1127 1 : };
1128 1 : let wal_store = DummyWalStore { lsn: Lsn(0) };
1129 1 :
1130 1 : let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
1131 1 :
1132 1 : let mut ar_hdr = AppendRequestHeader {
1133 1 : term: 2,
1134 1 : term_start_lsn: Lsn(3),
1135 1 : begin_lsn: Lsn(1),
1136 1 : end_lsn: Lsn(2),
1137 1 : commit_lsn: Lsn(0),
1138 1 : truncate_lsn: Lsn(0),
1139 1 : proposer_uuid: [0; 16],
1140 1 : };
1141 1 : let mut append_request = AppendRequest {
1142 1 : h: ar_hdr.clone(),
1143 1 : wal_data: Bytes::from_static(b"b"),
1144 1 : };
1145 1 :
1146 1 : let pem = ProposerElected {
1147 1 : term: 2,
1148 1 : start_streaming_at: Lsn(1),
1149 1 : term_history: TermHistory(vec![
1150 1 : TermLsn {
1151 1 : term: 1,
1152 1 : lsn: Lsn(1),
1153 1 : },
1154 1 : TermLsn {
1155 1 : term: 2,
1156 1 : lsn: Lsn(3),
1157 1 : },
1158 1 : ]),
1159 1 : timeline_start_lsn: Lsn(1),
1160 1 : };
1161 1 : sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
1162 1 : .await
1163 1 : .unwrap();
1164 1 :
1165 1 : // check that AppendRequest before term_start_lsn doesn't switch last_log_term.
1166 1 : sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
1167 1 : .await
1168 1 : .unwrap();
1169 1 : assert_eq!(sk.get_last_log_term(), 1);
1170 1 :
1171 1 : // but record at term_start_lsn does the switch
1172 1 : ar_hdr.begin_lsn = Lsn(2);
1173 1 : ar_hdr.end_lsn = Lsn(3);
1174 1 : append_request = AppendRequest {
1175 1 : h: ar_hdr,
1176 1 : wal_data: Bytes::from_static(b"b"),
1177 1 : };
1178 1 : sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
1179 1 : .await
1180 1 : .unwrap();
1181 1 : assert_eq!(sk.get_last_log_term(), 2);
1182 1 : }
1183 :
1184 : #[tokio::test]
1185 1 : async fn test_non_consecutive_write() {
1186 1 : let storage = InMemoryState {
1187 1 : persisted_state: test_sk_state(),
1188 1 : };
1189 1 : let wal_store = DummyWalStore { lsn: Lsn(0) };
1190 1 :
1191 1 : let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
1192 1 :
1193 1 : let pem = ProposerElected {
1194 1 : term: 1,
1195 1 : start_streaming_at: Lsn(1),
1196 1 : term_history: TermHistory(vec![TermLsn {
1197 1 : term: 1,
1198 1 : lsn: Lsn(1),
1199 1 : }]),
1200 1 : timeline_start_lsn: Lsn(1),
1201 1 : };
1202 1 : sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
1203 1 : .await
1204 1 : .unwrap();
1205 1 :
1206 1 : let ar_hdr = AppendRequestHeader {
1207 1 : term: 1,
1208 1 : term_start_lsn: Lsn(3),
1209 1 : begin_lsn: Lsn(1),
1210 1 : end_lsn: Lsn(2),
1211 1 : commit_lsn: Lsn(0),
1212 1 : truncate_lsn: Lsn(0),
1213 1 : proposer_uuid: [0; 16],
1214 1 : };
1215 1 : let append_request = AppendRequest {
1216 1 : h: ar_hdr.clone(),
1217 1 : wal_data: Bytes::from_static(b"b"),
1218 1 : };
1219 1 :
1220 1 : // do write ending at 2, it should be ok
1221 1 : sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
1222 1 : .await
1223 1 : .unwrap();
1224 1 : let mut ar_hrd2 = ar_hdr.clone();
1225 1 : ar_hrd2.begin_lsn = Lsn(4);
1226 1 : ar_hrd2.end_lsn = Lsn(5);
1227 1 : let append_request = AppendRequest {
1228 1 : h: ar_hdr,
1229 1 : wal_data: Bytes::from_static(b"b"),
1230 1 : };
1231 1 : // and now starting at 4, it must fail
1232 1 : sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
1233 1 : .await
1234 1 : .unwrap_err();
1235 1 : }
1236 :
// Histories that share no term have no common point.
#[test]
fn test_find_highest_common_point_none() {
    let prop_th = TermHistory(vec![TermLsn::from((0, Lsn(1)))]);
    let sk_th = TermHistory(vec![
        TermLsn::from((1, Lsn(1))),
        TermLsn::from((2, Lsn(2))),
    ]);

    let common = TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(3));
    assert_eq!(common, None);
}
1246 :
// Histories diverge after term 2; on the safekeeper that term ends where its
// term 3 begins.
#[test]
fn test_find_highest_common_point_middle() {
    let shared_prefix = [TermLsn::from((1, Lsn(10))), TermLsn::from((2, Lsn(20)))];

    let mut prop_entries = shared_prefix.to_vec();
    prop_entries.push(TermLsn::from((4, Lsn(40))));
    let prop_th = TermHistory(prop_entries);

    let mut sk_entries = shared_prefix.to_vec();
    // sk ends last common term 2 at 30
    sk_entries.push(TermLsn::from((3, Lsn(30))));
    let sk_th = TermHistory(sk_entries);

    let expected = TermLsn::from((2, Lsn(30)));
    let common = TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(40));
    assert_eq!(common, Some(expected));
}
1267 :
// The safekeeper history stops at the last common term, so that term ends at
// the safekeeper's end LSN.
#[test]
fn test_find_highest_common_point_sk_end() {
    let sk_end_lsn = Lsn(32);

    let prop_th = TermHistory(vec![
        TermLsn::from((1, Lsn(10))),
        // last common term 2, sk will end it at sk_end_lsn
        TermLsn::from((2, Lsn(20))),
        TermLsn::from((4, Lsn(40))),
    ]);
    let sk_th = TermHistory(vec![
        TermLsn::from((1, Lsn(10))),
        TermLsn::from((2, Lsn(20))),
    ]);

    let expected = TermLsn::from((2, Lsn(32)));
    let common = TermHistory::find_highest_common_point(&prop_th, &sk_th, sk_end_lsn);
    assert_eq!(common, Some(expected));
}
1284 :
// Identical histories: the common point is the last term at the safekeeper's
// end LSN.
#[test]
fn test_find_highest_common_point_walprop() {
    let entries = vec![TermLsn::from((1, Lsn(10))), TermLsn::from((2, Lsn(20)))];
    let prop_th = TermHistory(entries.clone());
    let sk_th = TermHistory(entries);

    let expected = TermLsn::from((2, Lsn(32)));
    let common = TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(32));
    assert_eq!(common, Some(expected));
}
1297 :
// A fully populated persistent state must survive serialization followed by
// deserialization unchanged.
#[test]
fn test_sk_state_bincode_serde_roundtrip() {
    let tenant_id = TenantId::from_str("cf0480929707ee75372337efaa5ecf96").unwrap();
    let timeline_id = TimelineId::from_str("112ded66422aa5e953e5440fa5427ac4").unwrap();

    // Proposer uuid is the timeline id bytes in reverse order.
    let mut proposer_uuid = timeline_id.as_arr();
    proposer_uuid.reverse();

    let members = MemberSet::new(vec![SafekeeperId {
        id: NodeId(1),
        host: "hehe.org".to_owned(),
        pg_port: 5432,
    }])
    .expect("duplicate member");
    let mconf = Configuration {
        generation: 42,
        members,
        new_members: None,
    };

    let acceptor_state = AcceptorState {
        term: 42,
        term_history: TermHistory(vec![TermLsn {
            lsn: Lsn(0x1),
            term: 41,
        }]),
    };

    let server = ServerInfo {
        pg_version: 14,
        system_id: 0x1234567887654321,
        wal_seg_size: 0x12345678,
    };

    let original = TimelinePersistentState {
        tenant_id,
        timeline_id,
        mconf,
        acceptor_state,
        server,
        proposer_uuid,
        timeline_start_lsn: Lsn(0x12345600),
        local_start_lsn: Lsn(0x12),
        commit_lsn: Lsn(1234567800),
        backup_lsn: Lsn(1234567300),
        peer_horizon_lsn: Lsn(9999999),
        remote_consistent_lsn: Lsn(1234560000),
        partial_backup: crate::wal_backup_partial::State::default(),
        eviction_state: EvictionState::Present,
        creation_ts: UNIX_EPOCH,
    };

    let encoded = original.ser().unwrap();
    let decoded = TimelinePersistentState::des(&encoded).unwrap();

    assert_eq!(decoded, original);
}
1349 : }
|