Line data Source code
1 : //! Acceptor part of proposer-acceptor consensus algorithm.
2 :
3 : use std::cmp::{max, min};
4 : use std::fmt;
5 : use std::io::Read;
6 : use std::str::FromStr;
7 :
8 : use anyhow::{Context, Result, bail};
9 : use byteorder::{LittleEndian, ReadBytesExt};
10 : use bytes::{Buf, BufMut, Bytes, BytesMut};
11 : use postgres_ffi::{MAX_SEND_SIZE, TimeLineID};
12 : use postgres_versioninfo::{PgMajorVersion, PgVersionId};
13 : use pq_proto::SystemId;
14 : use safekeeper_api::membership::{
15 : INVALID_GENERATION, MemberSet, SafekeeperGeneration as Generation, SafekeeperId,
16 : };
17 : use safekeeper_api::models::HotStandbyFeedback;
18 : use safekeeper_api::{Term, membership};
19 : use serde::{Deserialize, Serialize};
20 : use storage_broker::proto::SafekeeperTimelineInfo;
21 : use tracing::*;
22 : use utils::bin_ser::LeSer;
23 : use utils::id::{NodeId, TenantId, TimelineId};
24 : use utils::lsn::Lsn;
25 : use utils::pageserver_feedback::PageserverFeedback;
26 :
27 : use crate::metrics::{MISC_OPERATION_SECONDS, PROPOSER_ACCEPTOR_MESSAGES_TOTAL};
28 : use crate::state::TimelineState;
29 : use crate::{control_file, wal_storage};
30 :
31 : pub const SK_PROTO_VERSION_2: u32 = 2;
32 : pub const SK_PROTO_VERSION_3: u32 = 3;
33 : pub const UNKNOWN_SERVER_VERSION: PgVersionId = PgVersionId::UNKNOWN;
34 :
35 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
36 : pub struct TermLsn {
37 : pub term: Term,
38 : pub lsn: Lsn,
39 : }
40 :
41 : // Creation from tuple provides less typing (e.g. for unit tests).
42 : impl From<(Term, Lsn)> for TermLsn {
43 1277 : fn from(pair: (Term, Lsn)) -> TermLsn {
44 1277 : TermLsn {
45 1277 : term: pair.0,
46 1277 : lsn: pair.1,
47 1277 : }
48 1277 : }
49 : }
50 :
51 0 : #[derive(Clone, Serialize, Deserialize, PartialEq)]
52 : pub struct TermHistory(pub Vec<TermLsn>);
53 :
54 : impl TermHistory {
55 1464 : pub fn empty() -> TermHistory {
56 1464 : TermHistory(Vec::new())
57 1464 : }
58 :
59 : // Parse TermHistory as n_entries followed by TermLsn pairs in network order.
60 1021 : pub fn from_bytes(bytes: &mut Bytes) -> Result<TermHistory> {
61 1021 : let n_entries = bytes
62 1021 : .get_u32_f()
63 1021 : .with_context(|| "TermHistory misses len")?;
64 1021 : let mut res = Vec::with_capacity(n_entries as usize);
65 10761 : for i in 0..n_entries {
66 10761 : let term = bytes
67 10761 : .get_u64_f()
68 10761 : .with_context(|| format!("TermHistory pos {i} misses term"))?;
69 10761 : let lsn = bytes
70 10761 : .get_u64_f()
71 10761 : .with_context(|| format!("TermHistory pos {i} misses lsn"))?
72 10761 : .into();
73 10761 : res.push(TermLsn { term, lsn })
74 : }
75 1021 : Ok(TermHistory(res))
76 1021 : }
77 :
78 : // Parse TermHistory as n_entries followed by TermLsn pairs in LE order.
79 : // TODO remove once v2 protocol is fully dropped.
80 0 : pub fn from_bytes_le(bytes: &mut Bytes) -> Result<TermHistory> {
81 0 : if bytes.remaining() < 4 {
82 0 : bail!("TermHistory misses len");
83 0 : }
84 0 : let n_entries = bytes.get_u32_le();
85 0 : let mut res = Vec::with_capacity(n_entries as usize);
86 0 : for _ in 0..n_entries {
87 0 : if bytes.remaining() < 16 {
88 0 : bail!("TermHistory is incomplete");
89 0 : }
90 0 : res.push(TermLsn {
91 0 : term: bytes.get_u64_le(),
92 0 : lsn: bytes.get_u64_le().into(),
93 0 : })
94 : }
95 0 : Ok(TermHistory(res))
96 0 : }
97 :
98 : /// Return copy of self with switches happening strictly after up_to
99 : /// truncated.
100 5407 : pub fn up_to(&self, up_to: Lsn) -> TermHistory {
101 5407 : let mut res = Vec::with_capacity(self.0.len());
102 32366 : for e in &self.0 {
103 26960 : if e.lsn > up_to {
104 1 : break;
105 26959 : }
106 26959 : res.push(*e);
107 : }
108 5407 : TermHistory(res)
109 5407 : }
110 :
111 : /// Find point of divergence between leader (walproposer) term history and
112 : /// safekeeper. Arguments are not symmetric as proposer history ends at
113 : /// +infinity while safekeeper at flush_lsn.
114 : /// C version is at walproposer SendProposerElected.
115 1032 : pub fn find_highest_common_point(
116 1032 : prop_th: &TermHistory,
117 1032 : sk_th: &TermHistory,
118 1032 : sk_wal_end: Lsn,
119 1032 : ) -> Option<TermLsn> {
120 1032 : let (prop_th, sk_th) = (&prop_th.0, &sk_th.0); // avoid .0 below
121 :
122 1032 : if let Some(sk_th_last) = sk_th.last() {
123 895 : assert!(
124 895 : sk_th_last.lsn <= sk_wal_end,
125 0 : "safekeeper term history end {sk_th_last:?} LSN is higher than WAL end {sk_wal_end:?}"
126 : );
127 137 : }
128 :
129 : // find last common term, if any...
130 1032 : let mut last_common_idx = None;
131 9710 : for i in 0..min(sk_th.len(), prop_th.len()) {
132 9710 : if prop_th[i].term != sk_th[i].term {
133 2 : break;
134 9708 : }
135 : // If term is the same, LSN must be equal as well.
136 9708 : assert!(
137 9708 : prop_th[i].lsn == sk_th[i].lsn,
138 0 : "same term {} has different start LSNs: prop {}, sk {}",
139 0 : prop_th[i].term,
140 0 : prop_th[i].lsn,
141 0 : sk_th[i].lsn
142 : );
143 9708 : last_common_idx = Some(i);
144 : }
145 1032 : let last_common_idx = last_common_idx?;
146 : // Now find where it ends at both prop and sk and take min. End of
147 : // (common) term is the start of the next except it is the last one;
148 : // there it is flush_lsn in case of safekeeper or, in case of proposer
149 : // +infinity, so we just take flush_lsn then.
150 894 : if last_common_idx == prop_th.len() - 1 {
151 79 : Some(TermLsn {
152 79 : term: prop_th[last_common_idx].term,
153 79 : lsn: sk_wal_end,
154 79 : })
155 : } else {
156 815 : let prop_common_term_end = prop_th[last_common_idx + 1].lsn;
157 815 : let sk_common_term_end = if last_common_idx + 1 < sk_th.len() {
158 1 : sk_th[last_common_idx + 1].lsn
159 : } else {
160 814 : sk_wal_end
161 : };
162 815 : Some(TermLsn {
163 815 : term: prop_th[last_common_idx].term,
164 815 : lsn: min(prop_common_term_end, sk_common_term_end),
165 815 : })
166 : }
167 1032 : }
168 : }
169 :
170 : /// Display only latest entries for Debug.
171 : impl fmt::Debug for TermHistory {
172 163 : fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
173 163 : let n_printed = 20;
174 163 : write!(
175 163 : fmt,
176 163 : "{}{:?}",
177 163 : if self.0.len() > n_printed { "... " } else { "" },
178 163 : self.0
179 163 : .iter()
180 163 : .rev()
181 163 : .take(n_printed)
182 163 : .map(|&e| (e.term, e.lsn)) // omit TermSwitchEntry
183 163 : .collect::<Vec<_>>()
184 : )
185 163 : }
186 : }
187 :
188 : /// Unique id of proposer. Not needed for correctness, used for monitoring.
189 : pub type PgUuid = [u8; 16];
190 :
191 : /// Persistent consensus state of the acceptor.
192 0 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
193 : pub struct AcceptorState {
194 : /// acceptor's last term it voted for (advanced in 1 phase)
195 : pub term: Term,
196 : /// History of term switches for safekeeper's WAL.
197 : /// Actually it often goes *beyond* WAL contents as we adopt term history
198 : /// from the proposer before recovery.
199 : pub term_history: TermHistory,
200 : }
201 :
202 : impl AcceptorState {
203 : /// acceptor's last_log_term is the term of the highest entry in the log
204 1292 : pub fn get_last_log_term(&self, flush_lsn: Lsn) -> Term {
205 1292 : let th = self.term_history.up_to(flush_lsn);
206 1292 : match th.0.last() {
207 1292 : Some(e) => e.term,
208 0 : None => 0,
209 : }
210 1292 : }
211 : }
212 :
213 : // protocol messages
214 :
215 : /// Initial Proposer -> Acceptor message
216 0 : #[derive(Debug, Deserialize)]
217 : pub struct ProposerGreeting {
218 : pub tenant_id: TenantId,
219 : pub timeline_id: TimelineId,
220 : pub mconf: membership::Configuration,
221 : /// Postgres server version
222 : pub pg_version: PgVersionId,
223 : pub system_id: SystemId,
224 : pub wal_seg_size: u32,
225 : }
226 :
227 : /// V2 of the message; exists as a struct because we (de)serialized it as is.
228 0 : #[derive(Debug, Deserialize)]
229 : pub struct ProposerGreetingV2 {
230 : /// proposer-acceptor protocol version
231 : pub protocol_version: u32,
232 : /// Postgres server version
233 : pub pg_version: PgVersionId,
234 : pub proposer_id: PgUuid,
235 : pub system_id: SystemId,
236 : pub timeline_id: TimelineId,
237 : pub tenant_id: TenantId,
238 : pub tli: TimeLineID,
239 : pub wal_seg_size: u32,
240 : }
241 :
242 : /// Acceptor -> Proposer initial response: the highest term known to me
243 : /// (acceptor voted for).
244 : #[derive(Debug, Serialize)]
245 : pub struct AcceptorGreeting {
246 : node_id: NodeId,
247 : mconf: membership::Configuration,
248 : term: u64,
249 : }
250 :
251 : /// Vote request sent from proposer to safekeepers
252 : #[derive(Debug)]
253 : pub struct VoteRequest {
254 : pub generation: Generation,
255 : pub term: Term,
256 : }
257 :
258 : /// V2 of the message; exists as a struct because we (de)serialized it as is.
259 0 : #[derive(Debug, Deserialize)]
260 : pub struct VoteRequestV2 {
261 : pub term: Term,
262 : }
263 :
264 : /// Vote itself, sent from safekeeper to proposer
265 : #[derive(Debug, Serialize)]
266 : pub struct VoteResponse {
267 : generation: Generation, // membership conf generation
268 : pub term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
269 : vote_given: bool,
270 : // Safekeeper flush_lsn (end of WAL) + history of term switches allow
271 : // proposer to choose the most advanced one.
272 : pub flush_lsn: Lsn,
273 : truncate_lsn: Lsn,
274 : pub term_history: TermHistory,
275 : }
276 :
277 : /*
278 : * Proposer -> Acceptor message announcing proposer is elected and communicating
279 : * term history to it.
280 : */
281 : #[derive(Debug, Clone)]
282 : pub struct ProposerElected {
283 : pub generation: Generation, // membership conf generation
284 : pub term: Term,
285 : pub start_streaming_at: Lsn,
286 : pub term_history: TermHistory,
287 : }
288 :
289 : /// Request with WAL message sent from proposer to safekeeper. Along the way it
290 : /// communicates commit_lsn.
291 : #[derive(Debug)]
292 : pub struct AppendRequest {
293 : pub h: AppendRequestHeader,
294 : pub wal_data: Bytes,
295 : }
296 0 : #[derive(Debug, Clone, Deserialize)]
297 : pub struct AppendRequestHeader {
298 : pub generation: Generation, // membership conf generation
299 : // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
300 : pub term: Term,
301 : /// start position of message in WAL
302 : pub begin_lsn: Lsn,
303 : /// end position of message in WAL
304 : pub end_lsn: Lsn,
305 : /// LSN committed by quorum of safekeepers
306 : pub commit_lsn: Lsn,
307 : /// minimal LSN which may be needed by proposer to perform recovery of some safekeeper
308 : pub truncate_lsn: Lsn,
309 : }
310 :
311 : /// V2 of the message; exists as a struct because we (de)serialized it as is.
312 0 : #[derive(Debug, Clone, Deserialize)]
313 : pub struct AppendRequestHeaderV2 {
314 : // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
315 : pub term: Term,
316 : // TODO: remove this field from the protocol, it in unused -- LSN of term
317 : // switch can be taken from ProposerElected (as well as from term history).
318 : pub term_start_lsn: Lsn,
319 : /// start position of message in WAL
320 : pub begin_lsn: Lsn,
321 : /// end position of message in WAL
322 : pub end_lsn: Lsn,
323 : /// LSN committed by quorum of safekeepers
324 : pub commit_lsn: Lsn,
325 : /// minimal LSN which may be needed by proposer to perform recovery of some safekeeper
326 : pub truncate_lsn: Lsn,
327 : // only for logging/debugging
328 : pub proposer_uuid: PgUuid,
329 : }
330 :
331 : /// Report safekeeper state to proposer
332 : #[derive(Debug, Serialize, Clone)]
333 : pub struct AppendResponse {
334 : // Membership conf generation. Not strictly required because on mismatch
335 : // connection is reset, but let's sanity check it.
336 : generation: Generation,
337 : // Current term of the safekeeper; if it is higher than proposer's, the
338 : // compute is out of date.
339 : pub term: Term,
340 : // Flushed end of wal on safekeeper; one should be always mindful from what
341 : // term history this value comes, either checking history directly or
342 : // observing term being set to one for which WAL truncation is known to have
343 : // happened.
344 : pub flush_lsn: Lsn,
345 : // We report back our awareness about which WAL is committed, as this is
346 : // a criterion for walproposer --sync mode exit
347 : pub commit_lsn: Lsn,
348 : pub hs_feedback: HotStandbyFeedback,
349 : pub pageserver_feedback: Option<PageserverFeedback>,
350 : }
351 :
352 : impl AppendResponse {
353 0 : fn term_only(generation: Generation, term: Term) -> AppendResponse {
354 0 : AppendResponse {
355 0 : generation,
356 0 : term,
357 0 : flush_lsn: Lsn(0),
358 0 : commit_lsn: Lsn(0),
359 0 : hs_feedback: HotStandbyFeedback::empty(),
360 0 : pageserver_feedback: None,
361 0 : }
362 0 : }
363 : }
364 :
365 : /// Proposer -> Acceptor messages
366 : #[derive(Debug)]
367 : pub enum ProposerAcceptorMessage {
368 : Greeting(ProposerGreeting),
369 : VoteRequest(VoteRequest),
370 : Elected(ProposerElected),
371 : AppendRequest(AppendRequest),
372 : NoFlushAppendRequest(AppendRequest),
373 : FlushWAL,
374 : }
375 :
376 : /// Augment Bytes with fallible get_uN where N is number of bytes methods.
377 : /// All reads are in network (big endian) order.
378 : trait BytesF {
379 : fn get_u8_f(&mut self) -> Result<u8>;
380 : fn get_u16_f(&mut self) -> Result<u16>;
381 : fn get_u32_f(&mut self) -> Result<u32>;
382 : fn get_u64_f(&mut self) -> Result<u64>;
383 : }
384 :
385 : impl BytesF for Bytes {
386 28750 : fn get_u8_f(&mut self) -> Result<u8> {
387 28750 : if self.is_empty() {
388 0 : bail!("no bytes left, expected 1");
389 28750 : }
390 28750 : Ok(self.get_u8())
391 28750 : }
392 0 : fn get_u16_f(&mut self) -> Result<u16> {
393 0 : if self.remaining() < 2 {
394 0 : bail!("no bytes left, expected 2");
395 0 : }
396 0 : Ok(self.get_u16())
397 0 : }
398 110419 : fn get_u32_f(&mut self) -> Result<u32> {
399 110419 : if self.remaining() < 4 {
400 0 : bail!("only {} bytes left, expected 4", self.remaining());
401 110419 : }
402 110419 : Ok(self.get_u32())
403 110419 : }
404 69221 : fn get_u64_f(&mut self) -> Result<u64> {
405 69221 : if self.remaining() < 8 {
406 0 : bail!("only {} bytes left, expected 8", self.remaining());
407 69221 : }
408 69221 : Ok(self.get_u64())
409 69221 : }
410 : }
411 :
412 : impl ProposerAcceptorMessage {
413 : /// Read cstring from Bytes.
414 40324 : fn get_cstr(buf: &mut Bytes) -> Result<String> {
415 40324 : let pos = buf
416 40324 : .iter()
417 1330692 : .position(|x| *x == 0)
418 40324 : .ok_or_else(|| anyhow::anyhow!("missing cstring terminator"))?;
419 40324 : let result = buf.split_to(pos);
420 40324 : buf.advance(1); // drop the null terminator
421 40324 : match std::str::from_utf8(&result) {
422 40324 : Ok(s) => Ok(s.to_string()),
423 0 : Err(e) => bail!("invalid utf8 in cstring: {}", e),
424 : }
425 40324 : }
426 :
427 : /// Read membership::Configuration from Bytes.
428 20162 : fn get_mconf(buf: &mut Bytes) -> Result<membership::Configuration> {
429 20162 : let generation = Generation::new(buf.get_u32_f().with_context(|| "reading generation")?);
430 20162 : let members_len = buf.get_u32_f().with_context(|| "reading members_len")?;
431 : // Main member set must have at least someone in valid configuration.
432 : // Empty conf is allowed until we fully migrate.
433 20162 : if generation != INVALID_GENERATION && members_len == 0 {
434 0 : bail!("empty members_len");
435 20162 : }
436 20162 : let mut members = MemberSet::empty();
437 20162 : for i in 0..members_len {
438 0 : let id = buf
439 0 : .get_u64_f()
440 0 : .with_context(|| format!("reading member {i} node_id"))?;
441 0 : let host = Self::get_cstr(buf).with_context(|| format!("reading member {i} host"))?;
442 0 : let pg_port = buf
443 0 : .get_u16_f()
444 0 : .with_context(|| format!("reading member {i} port"))?;
445 0 : let sk = SafekeeperId {
446 0 : id: NodeId(id),
447 0 : host,
448 0 : pg_port,
449 0 : };
450 0 : members.add(sk)?;
451 : }
452 20162 : let new_members_len = buf.get_u32_f().with_context(|| "reading new_members_len")?;
453 : // Non joint conf.
454 20162 : if new_members_len == 0 {
455 20162 : Ok(membership::Configuration {
456 20162 : generation,
457 20162 : members,
458 20162 : new_members: None,
459 20162 : })
460 : } else {
461 0 : let mut new_members = MemberSet::empty();
462 0 : for i in 0..new_members_len {
463 0 : let id = buf
464 0 : .get_u64_f()
465 0 : .with_context(|| format!("reading new member {i} node_id"))?;
466 0 : let host =
467 0 : Self::get_cstr(buf).with_context(|| format!("reading new member {i} host"))?;
468 0 : let pg_port = buf
469 0 : .get_u16_f()
470 0 : .with_context(|| format!("reading new member {i} port"))?;
471 0 : let sk = SafekeeperId {
472 0 : id: NodeId(id),
473 0 : host,
474 0 : pg_port,
475 0 : };
476 0 : new_members.add(sk)?;
477 : }
478 0 : Ok(membership::Configuration {
479 0 : generation,
480 0 : members,
481 0 : new_members: Some(new_members),
482 0 : })
483 : }
484 20162 : }
485 :
486 : /// Parse proposer message.
487 28750 : pub fn parse(mut msg_bytes: Bytes, proto_version: u32) -> Result<ProposerAcceptorMessage> {
488 28750 : if proto_version == SK_PROTO_VERSION_3 {
489 28750 : if msg_bytes.is_empty() {
490 0 : bail!("ProposerAcceptorMessage is not complete: missing tag");
491 28750 : }
492 28750 : let tag = msg_bytes.get_u8_f().with_context(|| {
493 0 : "ProposerAcceptorMessage is not complete: missing tag".to_string()
494 0 : })? as char;
495 28750 : match tag {
496 : 'g' => {
497 20162 : let tenant_id_str =
498 20162 : Self::get_cstr(&mut msg_bytes).with_context(|| "reading tenant_id")?;
499 20162 : let tenant_id = TenantId::from_str(&tenant_id_str)?;
500 20162 : let timeline_id_str =
501 20162 : Self::get_cstr(&mut msg_bytes).with_context(|| "reading timeline_id")?;
502 20162 : let timeline_id = TimelineId::from_str(&timeline_id_str)?;
503 20162 : let mconf = Self::get_mconf(&mut msg_bytes)?;
504 20162 : let pg_version = msg_bytes
505 20162 : .get_u32_f()
506 20162 : .with_context(|| "reading pg_version")?;
507 20162 : let system_id = msg_bytes.get_u64_f().with_context(|| "reading system_id")?;
508 20162 : let wal_seg_size = msg_bytes
509 20162 : .get_u32_f()
510 20162 : .with_context(|| "reading wal_seg_size")?;
511 20162 : let g = ProposerGreeting {
512 20162 : tenant_id,
513 20162 : timeline_id,
514 20162 : mconf,
515 20162 : pg_version: PgVersionId::from_full_pg_version(pg_version),
516 20162 : system_id,
517 20162 : wal_seg_size,
518 20162 : };
519 20162 : Ok(ProposerAcceptorMessage::Greeting(g))
520 : }
521 : 'v' => {
522 3085 : let generation = Generation::new(
523 3085 : msg_bytes
524 3085 : .get_u32_f()
525 3085 : .with_context(|| "reading generation")?,
526 : );
527 3085 : let term = msg_bytes.get_u64_f().with_context(|| "reading term")?;
528 3085 : let v = VoteRequest { generation, term };
529 3085 : Ok(ProposerAcceptorMessage::VoteRequest(v))
530 : }
531 : 'e' => {
532 1021 : let generation = Generation::new(
533 1021 : msg_bytes
534 1021 : .get_u32_f()
535 1021 : .with_context(|| "reading generation")?,
536 : );
537 1021 : let term = msg_bytes.get_u64_f().with_context(|| "reading term")?;
538 1021 : let start_streaming_at: Lsn = msg_bytes
539 1021 : .get_u64_f()
540 1021 : .with_context(|| "reading start_streaming_at")?
541 1021 : .into();
542 1021 : let term_history = TermHistory::from_bytes(&mut msg_bytes)?;
543 1021 : let msg = ProposerElected {
544 1021 : generation,
545 1021 : term,
546 1021 : start_streaming_at,
547 1021 : term_history,
548 1021 : };
549 1021 : Ok(ProposerAcceptorMessage::Elected(msg))
550 : }
551 : 'a' => {
552 4482 : let generation = Generation::new(
553 4482 : msg_bytes
554 4482 : .get_u32_f()
555 4482 : .with_context(|| "reading generation")?,
556 : );
557 4482 : let term = msg_bytes.get_u64_f().with_context(|| "reading term")?;
558 4482 : let begin_lsn: Lsn = msg_bytes
559 4482 : .get_u64_f()
560 4482 : .with_context(|| "reading begin_lsn")?
561 4482 : .into();
562 4482 : let end_lsn: Lsn = msg_bytes
563 4482 : .get_u64_f()
564 4482 : .with_context(|| "reading end_lsn")?
565 4482 : .into();
566 4482 : let commit_lsn: Lsn = msg_bytes
567 4482 : .get_u64_f()
568 4482 : .with_context(|| "reading commit_lsn")?
569 4482 : .into();
570 4482 : let truncate_lsn: Lsn = msg_bytes
571 4482 : .get_u64_f()
572 4482 : .with_context(|| "reading truncate_lsn")?
573 4482 : .into();
574 4482 : let hdr = AppendRequestHeader {
575 4482 : generation,
576 4482 : term,
577 4482 : begin_lsn,
578 4482 : end_lsn,
579 4482 : commit_lsn,
580 4482 : truncate_lsn,
581 4482 : };
582 4482 : let rec_size = hdr
583 4482 : .end_lsn
584 4482 : .checked_sub(hdr.begin_lsn)
585 4482 : .context("begin_lsn > end_lsn in AppendRequest")?
586 : .0 as usize;
587 4482 : if rec_size > MAX_SEND_SIZE {
588 0 : bail!(
589 0 : "AppendRequest is longer than MAX_SEND_SIZE ({})",
590 : MAX_SEND_SIZE
591 : );
592 4482 : }
593 4482 : if msg_bytes.remaining() < rec_size {
594 0 : bail!(
595 0 : "reading WAL: only {} bytes left, wanted {}",
596 0 : msg_bytes.remaining(),
597 : rec_size
598 : );
599 4482 : }
600 4482 : let wal_data = msg_bytes.copy_to_bytes(rec_size);
601 4482 : let msg = AppendRequest { h: hdr, wal_data };
602 :
603 4482 : Ok(ProposerAcceptorMessage::AppendRequest(msg))
604 : }
605 0 : _ => bail!("unknown proposer-acceptor message tag: {}", tag),
606 : }
607 0 : } else if proto_version == SK_PROTO_VERSION_2 {
608 : // xxx using Reader is inefficient but easy to work with bincode
609 0 : let mut stream = msg_bytes.reader();
610 : // u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is
611 0 : let tag = stream.read_u64::<LittleEndian>()? as u8 as char;
612 0 : match tag {
613 : 'g' => {
614 0 : let msgv2 = ProposerGreetingV2::des_from(&mut stream)?;
615 0 : let g = ProposerGreeting {
616 0 : tenant_id: msgv2.tenant_id,
617 0 : timeline_id: msgv2.timeline_id,
618 0 : mconf: membership::Configuration {
619 0 : generation: INVALID_GENERATION,
620 0 : members: MemberSet::empty(),
621 0 : new_members: None,
622 0 : },
623 0 : pg_version: msgv2.pg_version,
624 0 : system_id: msgv2.system_id,
625 0 : wal_seg_size: msgv2.wal_seg_size,
626 0 : };
627 0 : Ok(ProposerAcceptorMessage::Greeting(g))
628 : }
629 : 'v' => {
630 0 : let msg = VoteRequestV2::des_from(&mut stream)?;
631 0 : let v = VoteRequest {
632 0 : generation: INVALID_GENERATION,
633 0 : term: msg.term,
634 0 : };
635 0 : Ok(ProposerAcceptorMessage::VoteRequest(v))
636 : }
637 : 'e' => {
638 0 : let mut msg_bytes = stream.into_inner();
639 0 : if msg_bytes.remaining() < 16 {
640 0 : bail!("ProposerElected message is not complete");
641 0 : }
642 0 : let term = msg_bytes.get_u64_le();
643 0 : let start_streaming_at = msg_bytes.get_u64_le().into();
644 0 : let term_history = TermHistory::from_bytes_le(&mut msg_bytes)?;
645 0 : if msg_bytes.remaining() < 8 {
646 0 : bail!("ProposerElected message is not complete");
647 0 : }
648 0 : let _timeline_start_lsn = msg_bytes.get_u64_le();
649 0 : let msg = ProposerElected {
650 0 : generation: INVALID_GENERATION,
651 0 : term,
652 0 : start_streaming_at,
653 0 : term_history,
654 0 : };
655 0 : Ok(ProposerAcceptorMessage::Elected(msg))
656 : }
657 : 'a' => {
658 : // read header followed by wal data
659 0 : let hdrv2 = AppendRequestHeaderV2::des_from(&mut stream)?;
660 0 : let hdr = AppendRequestHeader {
661 0 : generation: INVALID_GENERATION,
662 0 : term: hdrv2.term,
663 0 : begin_lsn: hdrv2.begin_lsn,
664 0 : end_lsn: hdrv2.end_lsn,
665 0 : commit_lsn: hdrv2.commit_lsn,
666 0 : truncate_lsn: hdrv2.truncate_lsn,
667 0 : };
668 0 : let rec_size = hdr
669 0 : .end_lsn
670 0 : .checked_sub(hdr.begin_lsn)
671 0 : .context("begin_lsn > end_lsn in AppendRequest")?
672 : .0 as usize;
673 0 : if rec_size > MAX_SEND_SIZE {
674 0 : bail!(
675 0 : "AppendRequest is longer than MAX_SEND_SIZE ({})",
676 : MAX_SEND_SIZE
677 : );
678 0 : }
679 :
680 0 : let mut wal_data_vec: Vec<u8> = vec![0; rec_size];
681 0 : stream.read_exact(&mut wal_data_vec)?;
682 0 : let wal_data = Bytes::from(wal_data_vec);
683 :
684 0 : let msg = AppendRequest { h: hdr, wal_data };
685 :
686 0 : Ok(ProposerAcceptorMessage::AppendRequest(msg))
687 : }
688 0 : _ => bail!("unknown proposer-acceptor message tag: {}", tag),
689 : }
690 : } else {
691 0 : bail!("unsupported protocol version {}", proto_version);
692 : }
693 28750 : }
694 :
695 : /// The memory size of the message, including byte slices.
696 620 : pub fn size(&self) -> usize {
697 : const BASE_SIZE: usize = std::mem::size_of::<ProposerAcceptorMessage>();
698 :
699 : // For most types, the size is just the base enum size including the nested structs. Some
700 : // types also contain byte slices; add them.
701 : //
702 : // We explicitly list all fields, to draw attention here when new fields are added.
703 620 : let mut size = BASE_SIZE;
704 620 : size += match self {
705 0 : Self::Greeting(_) => 0,
706 :
707 0 : Self::VoteRequest(_) => 0,
708 :
709 0 : Self::Elected(_) => 0,
710 :
711 : Self::AppendRequest(AppendRequest {
712 : h:
713 : AppendRequestHeader {
714 : generation: _,
715 : term: _,
716 : begin_lsn: _,
717 : end_lsn: _,
718 : commit_lsn: _,
719 : truncate_lsn: _,
720 : },
721 620 : wal_data,
722 620 : }) => wal_data.len(),
723 :
724 : Self::NoFlushAppendRequest(AppendRequest {
725 : h:
726 : AppendRequestHeader {
727 : generation: _,
728 : term: _,
729 : begin_lsn: _,
730 : end_lsn: _,
731 : commit_lsn: _,
732 : truncate_lsn: _,
733 : },
734 0 : wal_data,
735 0 : }) => wal_data.len(),
736 :
737 0 : Self::FlushWAL => 0,
738 : };
739 :
740 620 : size
741 620 : }
742 : }
743 :
744 : /// Acceptor -> Proposer messages
745 : #[derive(Debug)]
746 : pub enum AcceptorProposerMessage {
747 : Greeting(AcceptorGreeting),
748 : VoteResponse(VoteResponse),
749 : AppendResponse(AppendResponse),
750 : }
751 :
752 : impl AcceptorProposerMessage {
753 0 : fn put_cstr(buf: &mut BytesMut, s: &str) {
754 0 : buf.put_slice(s.as_bytes());
755 0 : buf.put_u8(0); // null terminator
756 0 : }
757 :
758 : /// Serialize membership::Configuration into buf.
759 20162 : fn serialize_mconf(buf: &mut BytesMut, mconf: &membership::Configuration) {
760 20162 : buf.put_u32(mconf.generation.into_inner());
761 20162 : buf.put_u32(mconf.members.m.len() as u32);
762 20162 : for sk in &mconf.members.m {
763 0 : buf.put_u64(sk.id.0);
764 0 : Self::put_cstr(buf, &sk.host);
765 0 : buf.put_u16(sk.pg_port);
766 0 : }
767 20162 : if let Some(ref new_members) = mconf.new_members {
768 0 : buf.put_u32(new_members.m.len() as u32);
769 0 : for sk in &new_members.m {
770 0 : buf.put_u64(sk.id.0);
771 0 : Self::put_cstr(buf, &sk.host);
772 0 : buf.put_u16(sk.pg_port);
773 0 : }
774 20162 : } else {
775 20162 : buf.put_u32(0);
776 20162 : }
777 20162 : }
778 :
779 : /// Serialize acceptor -> proposer message.
780 27059 : pub fn serialize(&self, buf: &mut BytesMut, proto_version: u32) -> Result<()> {
781 27059 : if proto_version == SK_PROTO_VERSION_3 {
782 27059 : match self {
783 20162 : AcceptorProposerMessage::Greeting(msg) => {
784 20162 : buf.put_u8(b'g');
785 20162 : buf.put_u64(msg.node_id.0);
786 20162 : Self::serialize_mconf(buf, &msg.mconf);
787 20162 : buf.put_u64(msg.term)
788 : }
789 3085 : AcceptorProposerMessage::VoteResponse(msg) => {
790 3085 : buf.put_u8(b'v');
791 3085 : buf.put_u32(msg.generation.into_inner());
792 3085 : buf.put_u64(msg.term);
793 3085 : buf.put_u8(msg.vote_given as u8);
794 3085 : buf.put_u64(msg.flush_lsn.into());
795 3085 : buf.put_u64(msg.truncate_lsn.into());
796 3085 : buf.put_u32(msg.term_history.0.len() as u32);
797 19049 : for e in &msg.term_history.0 {
798 15964 : buf.put_u64(e.term);
799 15964 : buf.put_u64(e.lsn.into());
800 15964 : }
801 : }
802 3812 : AcceptorProposerMessage::AppendResponse(msg) => {
803 3812 : buf.put_u8(b'a');
804 3812 : buf.put_u32(msg.generation.into_inner());
805 3812 : buf.put_u64(msg.term);
806 3812 : buf.put_u64(msg.flush_lsn.into());
807 3812 : buf.put_u64(msg.commit_lsn.into());
808 3812 : buf.put_i64(msg.hs_feedback.ts);
809 3812 : buf.put_u64(msg.hs_feedback.xmin);
810 3812 : buf.put_u64(msg.hs_feedback.catalog_xmin);
811 :
812 : // AsyncReadMessage in walproposer.c will not try to decode pageserver_feedback
813 : // if it is not present.
814 3812 : if let Some(ref msg) = msg.pageserver_feedback {
815 0 : msg.serialize(buf);
816 3812 : }
817 : }
818 : }
819 27059 : Ok(())
820 : // TODO remove 3 after converting all msgs
821 0 : } else if proto_version == SK_PROTO_VERSION_2 {
822 0 : match self {
823 0 : AcceptorProposerMessage::Greeting(msg) => {
824 0 : buf.put_u64_le('g' as u64);
825 0 : // v2 didn't have mconf and fields were reordered
826 0 : buf.put_u64_le(msg.term);
827 0 : buf.put_u64_le(msg.node_id.0);
828 0 : }
829 0 : AcceptorProposerMessage::VoteResponse(msg) => {
830 : // v2 didn't have generation, had u64 vote_given and timeline_start_lsn
831 0 : buf.put_u64_le('v' as u64);
832 0 : buf.put_u64_le(msg.term);
833 0 : buf.put_u64_le(msg.vote_given as u64);
834 0 : buf.put_u64_le(msg.flush_lsn.into());
835 0 : buf.put_u64_le(msg.truncate_lsn.into());
836 0 : buf.put_u32_le(msg.term_history.0.len() as u32);
837 0 : for e in &msg.term_history.0 {
838 0 : buf.put_u64_le(e.term);
839 0 : buf.put_u64_le(e.lsn.into());
840 0 : }
841 : // removed timeline_start_lsn
842 0 : buf.put_u64_le(0);
843 : }
844 0 : AcceptorProposerMessage::AppendResponse(msg) => {
845 : // v2 didn't have generation
846 0 : buf.put_u64_le('a' as u64);
847 0 : buf.put_u64_le(msg.term);
848 0 : buf.put_u64_le(msg.flush_lsn.into());
849 0 : buf.put_u64_le(msg.commit_lsn.into());
850 0 : buf.put_i64_le(msg.hs_feedback.ts);
851 0 : buf.put_u64_le(msg.hs_feedback.xmin);
852 0 : buf.put_u64_le(msg.hs_feedback.catalog_xmin);
853 :
854 : // AsyncReadMessage in walproposer.c will not try to decode pageserver_feedback
855 : // if it is not present.
856 0 : if let Some(ref msg) = msg.pageserver_feedback {
857 0 : msg.serialize(buf);
858 0 : }
859 : }
860 : }
861 0 : Ok(())
862 : } else {
863 0 : bail!("unsupported protocol version {}", proto_version);
864 : }
865 27059 : }
866 : }
867 :
868 : /// Safekeeper implements consensus to reliably persist WAL across nodes.
869 : /// It controls all WAL disk writes and updates of control file.
870 : ///
871 : /// Currently safekeeper processes:
872 : /// - messages from compute (proposers) and provides replies
873 : /// - messages from broker peers
874 : pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> {
875 : /// LSN since the proposer safekeeper currently talking to appends WAL;
876 : /// determines last_log_term switch point.
877 : pub term_start_lsn: Lsn,
878 :
879 : pub state: TimelineState<CTRL>, // persistent state storage
880 : pub wal_store: WAL,
881 :
882 : node_id: NodeId, // safekeeper's node id
883 : }
884 :
885 : impl<CTRL, WAL> SafeKeeper<CTRL, WAL>
886 : where
887 : CTRL: control_file::Storage,
888 : WAL: wal_storage::Storage,
889 : {
890 : /// Accepts a control file storage containing the safekeeper state.
891 : /// State must be initialized, i.e. contain filled `tenant_id`, `timeline_id`
892 : /// and `server` (`wal_seg_size` inside it) fields.
893 9149 : pub fn new(
894 9149 : state: TimelineState<CTRL>,
895 9149 : wal_store: WAL,
896 9149 : node_id: NodeId,
897 9149 : ) -> Result<SafeKeeper<CTRL, WAL>> {
898 9149 : if state.tenant_id == TenantId::from([0u8; 16])
899 9149 : || state.timeline_id == TimelineId::from([0u8; 16])
900 : {
901 0 : bail!(
902 0 : "Calling SafeKeeper::new with empty tenant_id ({}) or timeline_id ({})",
903 0 : state.tenant_id,
904 0 : state.timeline_id
905 : );
906 9 : }
907 :
908 9149 : Ok(SafeKeeper {
909 9149 : term_start_lsn: Lsn(0),
910 9149 : state,
911 9149 : wal_store,
912 9149 : node_id,
913 9149 : })
914 9 : }
915 :
916 : /// Get history of term switches for the available WAL
917 4115 : fn get_term_history(&self) -> TermHistory {
918 4115 : self.state
919 4115 : .acceptor_state
920 4115 : .term_history
921 4115 : .up_to(self.flush_lsn())
922 9 : }
923 :
924 2 : pub fn get_last_log_term(&self) -> Term {
925 2 : self.state
926 2 : .acceptor_state
927 2 : .get_last_log_term(self.flush_lsn())
928 2 : }
929 :
930 : /// wal_store wrapper avoiding commit_lsn <= flush_lsn violation when we don't have WAL yet.
931 21266 : pub fn flush_lsn(&self) -> Lsn {
932 21266 : max(self.wal_store.flush_lsn(), self.state.timeline_start_lsn)
933 1883 : }
934 :
935 : /// Process message from proposer and possibly form reply. Concurrent
936 : /// callers must exclude each other.
937 33818 : pub async fn process_msg(
938 33818 : &mut self,
939 33818 : msg: &ProposerAcceptorMessage,
940 33818 : ) -> Result<Option<AcceptorProposerMessage>> {
941 33818 : let res = match msg {
942 20162 : ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg).await,
943 3088 : ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg).await,
944 1029 : ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg).await,
945 5 : ProposerAcceptorMessage::AppendRequest(msg) => {
946 5 : self.handle_append_request(msg, true).await
947 : }
948 5102 : ProposerAcceptorMessage::NoFlushAppendRequest(msg) => {
949 5102 : self.handle_append_request(msg, false).await
950 : }
951 4432 : ProposerAcceptorMessage::FlushWAL => self.handle_flush().await,
952 : };
953 :
954 : // BEGIN HADRON
955 33818 : match &res {
956 33814 : Ok(_) => PROPOSER_ACCEPTOR_MESSAGES_TOTAL
957 33814 : .with_label_values(&["success"])
958 33814 : .inc(),
959 4 : Err(_) => PROPOSER_ACCEPTOR_MESSAGES_TOTAL
960 4 : .with_label_values(&["error"])
961 4 : .inc(),
962 : };
963 :
964 33818 : res
965 : // END HADRON
966 1256 : }
967 :
968 : /// Handle initial message from proposer: check its sanity and send my
969 : /// current term.
970 20162 : async fn handle_greeting(
971 20162 : &mut self,
972 20162 : msg: &ProposerGreeting,
973 20162 : ) -> Result<Option<AcceptorProposerMessage>> {
974 : /* Postgres major version mismatch is treated as fatal error
975 : * because safekeepers parse WAL headers and the format
976 : * may change between versions.
977 : */
978 20162 : if PgMajorVersion::try_from(msg.pg_version)?
979 20162 : != PgMajorVersion::try_from(self.state.server.pg_version)?
980 0 : && self.state.server.pg_version != UNKNOWN_SERVER_VERSION
981 : {
982 0 : bail!(
983 0 : "incompatible server version {}, expected {}",
984 : msg.pg_version,
985 0 : self.state.server.pg_version
986 : );
987 0 : }
988 :
989 20162 : if msg.tenant_id != self.state.tenant_id {
990 0 : bail!(
991 0 : "invalid tenant ID, got {}, expected {}",
992 : msg.tenant_id,
993 0 : self.state.tenant_id
994 : );
995 0 : }
996 20162 : if msg.timeline_id != self.state.timeline_id {
997 0 : bail!(
998 0 : "invalid timeline ID, got {}, expected {}",
999 : msg.timeline_id,
1000 0 : self.state.timeline_id
1001 : );
1002 0 : }
1003 20162 : if self.state.server.wal_seg_size != msg.wal_seg_size {
1004 0 : bail!(
1005 0 : "invalid wal_seg_size, got {}, expected {}",
1006 : msg.wal_seg_size,
1007 0 : self.state.server.wal_seg_size
1008 : );
1009 0 : }
1010 :
1011 : // system_id will be updated on mismatch
1012 : // sync-safekeepers doesn't know sysid and sends 0, ignore it
1013 20162 : if self.state.server.system_id != msg.system_id && msg.system_id != 0 {
1014 0 : if self.state.server.system_id != 0 {
1015 0 : warn!(
1016 0 : "unexpected system ID arrived, got {}, expected {}",
1017 0 : msg.system_id, self.state.server.system_id
1018 : );
1019 0 : }
1020 :
1021 0 : let mut state = self.state.start_change();
1022 0 : state.server.system_id = msg.system_id;
1023 0 : if msg.pg_version != UNKNOWN_SERVER_VERSION {
1024 0 : state.server.pg_version = msg.pg_version;
1025 0 : }
1026 0 : self.state.finish_change(&state).await?;
1027 0 : }
1028 :
1029 20162 : if msg.mconf.generation > self.state.mconf.generation && !msg.mconf.contains(self.node_id) {
1030 0 : bail!(
1031 0 : "refused to switch into {}, node {} is not a member of it",
1032 : msg.mconf,
1033 : self.node_id,
1034 : );
1035 0 : }
1036 : // Switch into conf given by proposer conf if it is higher.
1037 20162 : self.state.membership_switch(msg.mconf.clone()).await?;
1038 :
1039 20162 : let apg = AcceptorGreeting {
1040 20162 : node_id: self.node_id,
1041 20162 : mconf: self.state.mconf.clone(),
1042 20162 : term: self.state.acceptor_state.term,
1043 20162 : };
1044 20162 : info!(
1045 0 : "processed greeting {:?} from walproposer, sending {:?}",
1046 : msg, apg
1047 : );
1048 20162 : Ok(Some(AcceptorProposerMessage::Greeting(apg)))
1049 0 : }
1050 :
1051 : /// Give vote for the given term, if we haven't done that previously.
1052 3088 : async fn handle_vote_request(
1053 3088 : &mut self,
1054 3088 : msg: &VoteRequest,
1055 3088 : ) -> Result<Option<AcceptorProposerMessage>> {
1056 3088 : if self.state.mconf.generation != msg.generation {
1057 1 : bail!(
1058 1 : "refusing {:?} due to generation mismatch: sk generation {}",
1059 : msg,
1060 1 : self.state.mconf.generation
1061 : );
1062 2 : }
1063 : // Once voted, we won't accept data from older proposers; flush
1064 : // everything we've already received so that new proposer starts
1065 : // streaming at end of our WAL, without overlap. WAL is truncated at
1066 : // streaming point and commit_lsn may be advanced from peers, so this
1067 : // also avoids possible spurious attempt to truncate committed WAL.
1068 3087 : self.wal_store.flush_wal().await?;
1069 : // initialize with refusal
1070 3087 : let mut resp = VoteResponse {
1071 3087 : generation: self.state.mconf.generation,
1072 3087 : term: self.state.acceptor_state.term,
1073 3087 : vote_given: false,
1074 3087 : flush_lsn: self.flush_lsn(),
1075 3087 : truncate_lsn: self.state.inmem.peer_horizon_lsn,
1076 3087 : term_history: self.get_term_history(),
1077 3087 : };
1078 3087 : if self.state.acceptor_state.term < msg.term {
1079 2905 : let mut state = self.state.start_change();
1080 2905 : state.acceptor_state.term = msg.term;
1081 : // persist vote before sending it out
1082 2905 : self.state.finish_change(&state).await?;
1083 :
1084 2905 : resp.term = self.state.acceptor_state.term;
1085 2905 : resp.vote_given = true;
1086 1 : }
1087 3087 : info!("processed {:?}: sending {:?}", msg, &resp);
1088 3087 : Ok(Some(AcceptorProposerMessage::VoteResponse(resp)))
1089 3 : }
1090 :
1091 : /// Form AppendResponse from current state.
1092 4435 : fn append_response(&self) -> AppendResponse {
1093 4435 : let ar = AppendResponse {
1094 4435 : generation: self.state.mconf.generation,
1095 4435 : term: self.state.acceptor_state.term,
1096 4435 : flush_lsn: self.flush_lsn(),
1097 4435 : commit_lsn: self.state.commit_lsn,
1098 4435 : // will be filled by the upper code to avoid bothering safekeeper
1099 4435 : hs_feedback: HotStandbyFeedback::empty(),
1100 4435 : pageserver_feedback: None,
1101 4435 : };
1102 4435 : trace!("formed AppendResponse {:?}", ar);
1103 4435 : ar
1104 623 : }
1105 :
1106 1029 : async fn handle_elected(
1107 1029 : &mut self,
1108 1029 : msg: &ProposerElected,
1109 1029 : ) -> Result<Option<AcceptorProposerMessage>> {
1110 1029 : let _timer = MISC_OPERATION_SECONDS
1111 1029 : .with_label_values(&["handle_elected"])
1112 1029 : .start_timer();
1113 :
1114 1029 : info!(
1115 0 : "received ProposerElected {:?}, term={}, last_log_term={}, flush_lsn={}",
1116 : msg,
1117 0 : self.state.acceptor_state.term,
1118 0 : self.get_last_log_term(),
1119 0 : self.flush_lsn()
1120 : );
1121 1029 : if self.state.mconf.generation != msg.generation {
1122 1 : bail!(
1123 1 : "refusing {:?} due to generation mismatch: sk generation {}",
1124 : msg,
1125 1 : self.state.mconf.generation
1126 : );
1127 7 : }
1128 1028 : if self.state.acceptor_state.term < msg.term {
1129 7 : let mut state = self.state.start_change();
1130 7 : state.acceptor_state.term = msg.term;
1131 7 : self.state.finish_change(&state).await?;
1132 0 : }
1133 :
1134 : // If our term is higher, ignore the message (next feedback will inform the compute)
1135 1028 : if self.state.acceptor_state.term > msg.term {
1136 0 : return Ok(None);
1137 7 : }
1138 :
1139 : // Before truncating WAL check-cross the check divergence point received
1140 : // from the walproposer.
1141 1028 : let sk_th = self.get_term_history();
1142 1028 : let last_common_point = match TermHistory::find_highest_common_point(
1143 1028 : &msg.term_history,
1144 1028 : &sk_th,
1145 1028 : self.flush_lsn(),
1146 1028 : ) {
1147 : // No common point. Expect streaming from the beginning of the
1148 : // history like walproposer while we don't have proper init.
1149 137 : None => *msg.term_history.0.first().ok_or(anyhow::anyhow!(
1150 137 : "empty walproposer term history {:?}",
1151 : msg.term_history
1152 0 : ))?,
1153 891 : Some(lcp) => lcp,
1154 : };
1155 : // This is expected to happen in a rare race when another connection
1156 : // from the same walproposer writes + flushes WAL after this connection
1157 : // sent flush_lsn in VoteRequest; for instance, very late
1158 : // ProposerElected message delivery after another connection was
1159 : // established and wrote WAL. In such cases error is transient;
1160 : // reconnection makes safekeeper send newest term history and flush_lsn
1161 : // and walproposer recalculates the streaming point. OTOH repeating
1162 : // error indicates a serious bug.
1163 1028 : if last_common_point.lsn != msg.start_streaming_at {
1164 0 : bail!(
1165 0 : "refusing ProposerElected with unexpected truncation point: lcp={:?} start_streaming_at={}, term={}, sk_th={:?} flush_lsn={}, wp_th={:?}",
1166 : last_common_point,
1167 : msg.start_streaming_at,
1168 0 : self.state.acceptor_state.term,
1169 : sk_th,
1170 0 : self.flush_lsn(),
1171 : msg.term_history,
1172 : );
1173 7 : }
1174 :
1175 : // We are also expected to never attempt to truncate committed data.
1176 1028 : assert!(
1177 1028 : msg.start_streaming_at >= self.state.inmem.commit_lsn,
1178 0 : "attempt to truncate committed data: start_streaming_at={}, commit_lsn={}, term={}, sk_th={:?} flush_lsn={}, wp_th={:?}",
1179 : msg.start_streaming_at,
1180 : self.state.inmem.commit_lsn,
1181 0 : self.state.acceptor_state.term,
1182 : sk_th,
1183 0 : self.flush_lsn(),
1184 : msg.term_history,
1185 : );
1186 :
1187 : // Before first WAL write initialize its segment. It makes first segment
1188 : // pg_waldump'able because stream from compute doesn't include its
1189 : // segment and page headers.
1190 : //
1191 : // If we fail before first WAL write flush this action would be
1192 : // repeated, that's ok because it is idempotent.
1193 1028 : if self.wal_store.flush_lsn() == Lsn::INVALID {
1194 137 : self.wal_store
1195 137 : .initialize_first_segment(msg.start_streaming_at)
1196 137 : .await?;
1197 0 : }
1198 :
1199 : // truncate wal, update the LSNs
1200 1028 : self.wal_store.truncate_wal(msg.start_streaming_at).await?;
1201 :
1202 : // and now adopt term history from proposer
1203 : {
1204 1028 : let mut state = self.state.start_change();
1205 :
1206 : // Here we learn initial LSN for the first time, set fields
1207 : // interested in that.
1208 :
1209 1028 : if let Some(start_lsn) = msg.term_history.0.first() {
1210 1028 : if state.timeline_start_lsn == Lsn(0) {
1211 : // Remember point where WAL begins globally. In the future it
1212 : // will be intialized immediately on timeline creation.
1213 137 : state.timeline_start_lsn = start_lsn.lsn;
1214 137 : info!(
1215 0 : "setting timeline_start_lsn to {:?}",
1216 : state.timeline_start_lsn
1217 : );
1218 0 : }
1219 0 : }
1220 :
1221 1028 : if state.peer_horizon_lsn == Lsn(0) {
1222 137 : // Update peer_horizon_lsn as soon as we know where timeline starts.
1223 137 : // It means that peer_horizon_lsn cannot be zero after we know timeline_start_lsn.
1224 137 : state.peer_horizon_lsn = state.timeline_start_lsn;
1225 137 : }
1226 1028 : if state.local_start_lsn == Lsn(0) {
1227 137 : state.local_start_lsn = msg.start_streaming_at;
1228 137 : info!("setting local_start_lsn to {:?}", state.local_start_lsn);
1229 0 : }
1230 : // Initializing commit_lsn before acking first flushed record is
1231 : // important to let find_end_of_wal skip the hole in the beginning
1232 : // of the first segment.
1233 : //
1234 : // NB: on new clusters, this happens at the same time as
1235 : // timeline_start_lsn initialization, it is taken outside to provide
1236 : // upgrade.
1237 1028 : state.commit_lsn = max(state.commit_lsn, state.timeline_start_lsn);
1238 :
1239 : // Initializing backup_lsn is useful to avoid making backup think it should upload 0 segment.
1240 1028 : state.backup_lsn = max(state.backup_lsn, state.timeline_start_lsn);
1241 : // similar for remote_consistent_lsn
1242 1028 : state.remote_consistent_lsn =
1243 1028 : max(state.remote_consistent_lsn, state.timeline_start_lsn);
1244 :
1245 1028 : state.acceptor_state.term_history = msg.term_history.clone();
1246 1028 : self.state.finish_change(&state).await?;
1247 : }
1248 :
1249 1028 : info!("start receiving WAL since {:?}", msg.start_streaming_at);
1250 :
1251 : // Cache LSN where term starts to immediately fsync control file with
1252 : // commit_lsn once we reach it -- sync-safekeepers finishes when
1253 : // persisted commit_lsn on majority of safekeepers aligns.
1254 1028 : self.term_start_lsn = match msg.term_history.0.last() {
1255 0 : None => bail!("proposer elected with empty term history"),
1256 1028 : Some(term_lsn_start) => term_lsn_start.lsn,
1257 : };
1258 :
1259 1028 : Ok(None)
1260 8 : }
1261 :
1262 : /// Advance commit_lsn taking into account what we have locally.
1263 : ///
1264 : /// Note: it is assumed that 'WAL we have is from the right term' check has
1265 : /// already been done outside.
1266 3497 : async fn update_commit_lsn(&mut self, mut candidate: Lsn) -> Result<()> {
1267 : // Both peers and walproposer communicate this value, we might already
1268 : // have a fresher (higher) version.
1269 3497 : candidate = max(candidate, self.state.inmem.commit_lsn);
1270 3497 : let commit_lsn = min(candidate, self.flush_lsn());
1271 3497 : assert!(
1272 3497 : commit_lsn >= self.state.inmem.commit_lsn,
1273 0 : "commit_lsn monotonicity violated: old={} new={}",
1274 : self.state.inmem.commit_lsn,
1275 : commit_lsn
1276 : );
1277 :
1278 3497 : self.state.inmem.commit_lsn = commit_lsn;
1279 :
1280 : // If new commit_lsn reached term switch, force sync of control
1281 : // file: walproposer in sync mode is very interested when this
1282 : // happens. Note: this is for sync-safekeepers mode only, as
1283 : // otherwise commit_lsn might jump over term_start_lsn.
1284 3497 : if commit_lsn >= self.term_start_lsn && self.state.commit_lsn < self.term_start_lsn {
1285 140 : self.state.flush().await?;
1286 620 : }
1287 :
1288 3497 : Ok(())
1289 620 : }
1290 :
1291 : /// Handle request to append WAL.
1292 : #[allow(clippy::comparison_chain)]
1293 5107 : async fn handle_append_request(
1294 5107 : &mut self,
1295 5107 : msg: &AppendRequest,
1296 5107 : require_flush: bool,
1297 5107 : ) -> Result<Option<AcceptorProposerMessage>> {
1298 : // Refuse message on generation mismatch. On reconnect wp will get full
1299 : // configuration from greeting.
1300 5107 : if self.state.mconf.generation != msg.h.generation {
1301 1 : bail!(
1302 1 : "refusing append request due to generation mismatch: request {}, sk {}",
1303 : msg.h.generation,
1304 1 : self.state.mconf.generation
1305 : );
1306 624 : }
1307 :
1308 5106 : if self.state.acceptor_state.term < msg.h.term {
1309 0 : bail!("got AppendRequest before ProposerElected");
1310 624 : }
1311 :
1312 : // If our term is higher, immediately refuse the message. Send term only
1313 : // response; elected walproposer can never advance the term, so it will
1314 : // figure out the refusal from it -- which is important as term change
1315 : // should cause not just reconnection but whole walproposer re-election.
1316 5106 : if self.state.acceptor_state.term > msg.h.term {
1317 0 : let resp = AppendResponse::term_only(
1318 0 : self.state.mconf.generation,
1319 0 : self.state.acceptor_state.term,
1320 : );
1321 0 : return Ok(Some(AcceptorProposerMessage::AppendResponse(resp)));
1322 624 : }
1323 :
1324 : // Disallow any non-sequential writes, which can result in gaps or
1325 : // overwrites. If we need to move the pointer, ProposerElected message
1326 : // should have truncated WAL first accordingly. Note that the first
1327 : // condition (WAL rewrite) is quite expected in real world; it happens
1328 : // when walproposer reconnects to safekeeper and writes some more data
1329 : // while first connection still gets some packets later. It might be
1330 : // better to not log this as error! above.
1331 5106 : let write_lsn = self.wal_store.write_lsn();
1332 5106 : let flush_lsn = self.wal_store.flush_lsn();
1333 5106 : if write_lsn > msg.h.begin_lsn {
1334 1 : bail!(
1335 1 : "append request rewrites WAL written before, write_lsn={}, msg lsn={}",
1336 : write_lsn,
1337 : msg.h.begin_lsn
1338 : );
1339 623 : }
1340 5105 : if write_lsn < msg.h.begin_lsn && write_lsn != Lsn(0) {
1341 0 : bail!(
1342 0 : "append request creates gap in written WAL, write_lsn={}, msg lsn={}",
1343 : write_lsn,
1344 : msg.h.begin_lsn,
1345 : );
1346 623 : }
1347 :
1348 : // Now we know that we are in the same term as the proposer, process the
1349 : // message.
1350 :
1351 : // do the job
1352 5105 : if !msg.wal_data.is_empty() {
1353 1757 : self.wal_store
1354 1757 : .write_wal(msg.h.begin_lsn, &msg.wal_data)
1355 1757 : .await?;
1356 0 : }
1357 :
1358 : // flush wal to the disk, if required
1359 5105 : if require_flush {
1360 3 : self.wal_store.flush_wal().await?;
1361 620 : }
1362 :
1363 : // Update commit_lsn. It will be flushed to the control file regularly by the timeline
1364 : // manager, off of the WAL ingest hot path.
1365 5105 : if msg.h.commit_lsn != Lsn(0) {
1366 3497 : self.update_commit_lsn(msg.h.commit_lsn).await?;
1367 3 : }
1368 : // Value calculated by walproposer can always lag:
1369 : // - safekeepers can forget inmem value and send to proposer lower
1370 : // persisted one on restart;
1371 : // - if we make safekeepers always send persistent value,
1372 : // any compute restart would pull it down.
1373 : // Thus, take max before adopting.
1374 5105 : self.state.inmem.peer_horizon_lsn =
1375 5105 : max(self.state.inmem.peer_horizon_lsn, msg.h.truncate_lsn);
1376 :
1377 5105 : trace!(
1378 0 : "processed AppendRequest of len {}, begin_lsn={}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, flushed={:?}",
1379 0 : msg.wal_data.len(),
1380 : msg.h.begin_lsn,
1381 : msg.h.end_lsn,
1382 : msg.h.commit_lsn,
1383 : msg.h.truncate_lsn,
1384 : require_flush,
1385 : );
1386 :
1387 : // If flush_lsn hasn't updated, AppendResponse is not very useful.
1388 : // This is the common case for !require_flush, but a flush can still
1389 : // happen on segment bounds.
1390 5105 : if !require_flush && flush_lsn == self.flush_lsn() {
1391 5102 : return Ok(None);
1392 3 : }
1393 :
1394 3 : let resp = self.append_response();
1395 3 : Ok(Some(AcceptorProposerMessage::AppendResponse(resp)))
1396 625 : }
1397 :
1398 : /// Flush WAL to disk. Return AppendResponse with latest LSNs.
1399 4432 : async fn handle_flush(&mut self) -> Result<Option<AcceptorProposerMessage>> {
1400 4432 : self.wal_store.flush_wal().await?;
1401 4432 : Ok(Some(AcceptorProposerMessage::AppendResponse(
1402 4432 : self.append_response(),
1403 4432 : )))
1404 620 : }
1405 :
1406 : /// Update commit_lsn from peer safekeeper data.
1407 0 : pub async fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> {
1408 0 : if Lsn(sk_info.commit_lsn) != Lsn::INVALID {
1409 : // Note: the check is too restrictive, generally we can update local
1410 : // commit_lsn if our history matches (is part of) history of advanced
1411 : // commit_lsn provider.
1412 0 : if sk_info.last_log_term == self.get_last_log_term() {
1413 0 : self.update_commit_lsn(Lsn(sk_info.commit_lsn)).await?;
1414 0 : }
1415 0 : }
1416 0 : Ok(())
1417 0 : }
1418 : }
1419 :
#[cfg(test)]
mod tests {
    use std::ops::Deref;
    use std::str::FromStr;
    use std::time::{Instant, UNIX_EPOCH};

    use futures::future::BoxFuture;
    use postgres_ffi::{WAL_SEGMENT_SIZE, XLogSegNo};
    use safekeeper_api::ServerInfo;
    use safekeeper_api::membership::{
        Configuration, MemberSet, SafekeeperGeneration, SafekeeperId,
    };

    use super::*;
    use crate::state::{EvictionState, TimelinePersistentState};

    /// Fake control file storage for tests: "persisting" keeps a copy in memory.
    struct InMemoryState {
        persisted_state: TimelinePersistentState,
    }

    impl control_file::Storage for InMemoryState {
        async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> {
            self.persisted_state = s.clone();
            Ok(())
        }

        fn last_persist_at(&self) -> Instant {
            Instant::now()
        }
    }

    impl Deref for InMemoryState {
        type Target = TimelinePersistentState;

        fn deref(&self) -> &Self::Target {
            &self.persisted_state
        }
    }

    /// Minimal valid persistent state: non-zero tenant/timeline ids and the
    /// default WAL segment size.
    fn test_sk_state() -> TimelinePersistentState {
        let mut state = TimelinePersistentState::empty();
        state.server.wal_seg_size = WAL_SEGMENT_SIZE as u32;
        state.tenant_id = TenantId::from([1u8; 16]);
        state.timeline_id = TimelineId::from([1u8; 16]);
        state
    }

    /// Fake WAL storage that only tracks a single LSN, used as both the write
    /// and the flush position.
    struct DummyWalStore {
        lsn: Lsn,
    }

    impl wal_storage::Storage for DummyWalStore {
        fn write_lsn(&self) -> Lsn {
            self.lsn
        }

        fn flush_lsn(&self) -> Lsn {
            self.lsn
        }

        async fn initialize_first_segment(&mut self, _init_lsn: Lsn) -> Result<()> {
            Ok(())
        }

        async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
            self.lsn = startpos + buf.len() as u64;
            Ok(())
        }

        async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
            self.lsn = end_pos;
            Ok(())
        }

        async fn flush_wal(&mut self) -> Result<()> {
            Ok(())
        }

        fn remove_up_to(&self, _segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> {
            Box::pin(async { Ok(()) })
        }

        fn get_metrics(&self) -> crate::metrics::WalStorageMetrics {
            crate::metrics::WalStorageMetrics::default()
        }
    }

    /// Fresh safekeeper over in-memory control file and empty WAL store.
    fn new_test_sk() -> SafeKeeper<InMemoryState, DummyWalStore> {
        let storage = InMemoryState {
            persisted_state: test_sk_state(),
        };
        let wal_store = DummyWalStore { lsn: Lsn(0) };
        SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap()
    }

    #[tokio::test]
    async fn test_voting() {
        let mut sk = new_test_sk();

        // Vote with generation mismatch should be rejected.
        let gen_mismatch_vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest {
            generation: SafekeeperGeneration::new(42),
            term: 1,
        });
        assert!(sk.process_msg(&gen_mismatch_vote_request).await.is_err());

        // check voting for 1 is ok
        let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest {
            generation: Generation::new(0),
            term: 1,
        });
        let mut vote_resp = sk.process_msg(&vote_request).await;
        match vote_resp.unwrap() {
            Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given),
            r => panic!("unexpected response: {r:?}"),
        }

        // reboot...
        let state = sk.state.deref().clone();
        let storage = InMemoryState {
            persisted_state: state,
        };

        sk = SafeKeeper::new(TimelineState::new(storage), sk.wal_store, NodeId(0)).unwrap();

        // and ensure voting second time for 1 is not ok
        vote_resp = sk.process_msg(&vote_request).await;
        match vote_resp.unwrap() {
            Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(!resp.vote_given),
            r => panic!("unexpected response: {r:?}"),
        }
    }

    #[tokio::test]
    async fn test_last_log_term_switch() {
        let mut sk = new_test_sk();

        let mut ar_hdr = AppendRequestHeader {
            generation: Generation::new(0),
            term: 2,
            begin_lsn: Lsn(1),
            end_lsn: Lsn(2),
            commit_lsn: Lsn(0),
            truncate_lsn: Lsn(0),
        };
        let mut append_request = AppendRequest {
            h: ar_hdr.clone(),
            wal_data: Bytes::from_static(b"b"),
        };

        let pem = ProposerElected {
            generation: Generation::new(0),
            term: 2,
            start_streaming_at: Lsn(1),
            term_history: TermHistory(vec![
                TermLsn {
                    term: 1,
                    lsn: Lsn(1),
                },
                TermLsn {
                    term: 2,
                    lsn: Lsn(3),
                },
            ]),
        };

        // check that elected msg with generation mismatch is rejected
        let mut pem_gen_mismatch = pem.clone();
        pem_gen_mismatch.generation = SafekeeperGeneration::new(42);
        assert!(
            sk.process_msg(&ProposerAcceptorMessage::Elected(pem_gen_mismatch))
                .await
                .is_err()
        );

        sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
            .await
            .unwrap();

        // check that AppendRequest before term_start_lsn doesn't switch last_log_term.
        sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
            .await
            .unwrap();
        assert_eq!(sk.get_last_log_term(), 1);

        // but record at term_start_lsn does the switch
        ar_hdr.begin_lsn = Lsn(2);
        ar_hdr.end_lsn = Lsn(3);
        append_request = AppendRequest {
            h: ar_hdr,
            wal_data: Bytes::from_static(b"b"),
        };
        sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
            .await
            .unwrap();
        assert_eq!(sk.get_last_log_term(), 2);
    }

    #[tokio::test]
    async fn test_non_consecutive_write() {
        let mut sk = new_test_sk();

        let pem = ProposerElected {
            generation: Generation::new(0),
            term: 1,
            start_streaming_at: Lsn(1),
            term_history: TermHistory(vec![TermLsn {
                term: 1,
                lsn: Lsn(1),
            }]),
        };
        sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
            .await
            .unwrap();

        let ar_hdr = AppendRequestHeader {
            generation: Generation::new(0),
            term: 1,
            begin_lsn: Lsn(1),
            end_lsn: Lsn(2),
            commit_lsn: Lsn(0),
            truncate_lsn: Lsn(0),
        };
        let append_request = AppendRequest {
            h: ar_hdr.clone(),
            wal_data: Bytes::from_static(b"b"),
        };

        // check that append request with generation mismatch is rejected
        let mut ar_hdr_gen_mismatch = ar_hdr.clone();
        ar_hdr_gen_mismatch.generation = SafekeeperGeneration::new(42);
        let append_request_gen_mismatch = AppendRequest {
            h: ar_hdr_gen_mismatch,
            wal_data: Bytes::from_static(b"b"),
        };
        assert!(
            sk.process_msg(&ProposerAcceptorMessage::AppendRequest(
                append_request_gen_mismatch
            ))
            .await
            .is_err()
        );

        // do write ending at 2, it should be ok
        sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
            .await
            .unwrap();
        let mut ar_hdr2 = ar_hdr;
        ar_hdr2.begin_lsn = Lsn(4);
        ar_hdr2.end_lsn = Lsn(5);
        let append_request = AppendRequest {
            h: ar_hdr2,
            wal_data: Bytes::from_static(b"b"),
        };
        // and now starting at 4, it must fail: WAL ends at 2, so this write
        // would leave a gap (not a rewrite).
        let err = sk
            .process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
            .await
            .unwrap_err();
        assert!(
            err.to_string().contains("creates gap"),
            "unexpected error: {err}"
        );
    }

    #[test]
    fn test_find_highest_common_point_none() {
        let prop_th = TermHistory(vec![(0, Lsn(1)).into()]);
        let sk_th = TermHistory(vec![(1, Lsn(1)).into(), (2, Lsn(2)).into()]);
        assert_eq!(
            TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(3),),
            None
        );
    }

    #[test]
    fn test_find_highest_common_point_middle() {
        let prop_th = TermHistory(vec![
            (1, Lsn(10)).into(),
            (2, Lsn(20)).into(),
            (4, Lsn(40)).into(),
        ]);
        let sk_th = TermHistory(vec![
            (1, Lsn(10)).into(),
            (2, Lsn(20)).into(),
            (3, Lsn(30)).into(), // sk ends last common term 2 at 30
        ]);
        assert_eq!(
            TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(40),),
            Some(TermLsn {
                term: 2,
                lsn: Lsn(30),
            })
        );
    }

    #[test]
    fn test_find_highest_common_point_sk_end() {
        let prop_th = TermHistory(vec![
            (1, Lsn(10)).into(),
            (2, Lsn(20)).into(), // last common term 2, sk will end it at 32 sk_end_lsn
            (4, Lsn(40)).into(),
        ]);
        let sk_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
        assert_eq!(
            TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(32),),
            Some(TermLsn {
                term: 2,
                lsn: Lsn(32),
            })
        );
    }

    #[test]
    fn test_find_highest_common_point_walprop() {
        let prop_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
        let sk_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
        assert_eq!(
            TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(32),),
            Some(TermLsn {
                term: 2,
                lsn: Lsn(32),
            })
        );
    }

    #[test]
    fn test_sk_state_bincode_serde_roundtrip() {
        let tenant_id = TenantId::from_str("cf0480929707ee75372337efaa5ecf96").unwrap();
        let timeline_id = TimelineId::from_str("112ded66422aa5e953e5440fa5427ac4").unwrap();
        let state = TimelinePersistentState {
            tenant_id,
            timeline_id,
            mconf: Configuration {
                generation: SafekeeperGeneration::new(42),
                members: MemberSet::new(vec![SafekeeperId {
                    id: NodeId(1),
                    host: "hehe.org".to_owned(),
                    pg_port: 5432,
                }])
                .expect("duplicate member"),
                new_members: None,
            },
            acceptor_state: AcceptorState {
                term: 42,
                term_history: TermHistory(vec![TermLsn {
                    lsn: Lsn(0x1),
                    term: 41,
                }]),
            },
            server: ServerInfo {
                pg_version: PgVersionId::from_full_pg_version(140000),
                system_id: 0x1234567887654321,
                wal_seg_size: 0x12345678,
            },
            proposer_uuid: {
                let mut arr = timeline_id.as_arr();
                arr.reverse();
                arr
            },
            timeline_start_lsn: Lsn(0x12345600),
            local_start_lsn: Lsn(0x12),
            commit_lsn: Lsn(1234567800),
            backup_lsn: Lsn(1234567300),
            peer_horizon_lsn: Lsn(9999999),
            remote_consistent_lsn: Lsn(1234560000),
            partial_backup: crate::wal_backup_partial::State::default(),
            eviction_state: EvictionState::Present,
            creation_ts: UNIX_EPOCH,
        };

        let ser = state.ser().unwrap();

        let deser = TimelinePersistentState::des(&ser).unwrap();

        assert_eq!(deser, state);
    }
}
|