Line data Source code
1 : //! Acceptor part of proposer-acceptor consensus algorithm.
2 :
3 : use std::cmp::{max, min};
4 : use std::fmt;
5 : use std::io::Read;
6 : use std::str::FromStr;
7 :
8 : use anyhow::{Context, Result, bail};
9 : use byteorder::{LittleEndian, ReadBytesExt};
10 : use bytes::{Buf, BufMut, Bytes, BytesMut};
11 : use postgres_ffi::{MAX_SEND_SIZE, TimeLineID};
12 : use postgres_versioninfo::{PgMajorVersion, PgVersionId};
13 : use pq_proto::SystemId;
14 : use safekeeper_api::membership::{
15 : INVALID_GENERATION, MemberSet, SafekeeperGeneration as Generation, SafekeeperId,
16 : };
17 : use safekeeper_api::models::HotStandbyFeedback;
18 : use safekeeper_api::{Term, membership};
19 : use serde::{Deserialize, Serialize};
20 : use storage_broker::proto::SafekeeperTimelineInfo;
21 : use tracing::*;
22 : use utils::bin_ser::LeSer;
23 : use utils::id::{NodeId, TenantId, TimelineId};
24 : use utils::lsn::Lsn;
25 : use utils::pageserver_feedback::PageserverFeedback;
26 :
27 : use crate::metrics::{MISC_OPERATION_SECONDS, PROPOSER_ACCEPTOR_MESSAGES_TOTAL};
28 : use crate::state::TimelineState;
29 : use crate::{control_file, wal_storage};
30 :
/// Legacy proposer-acceptor protocol version. In this file its messages are
/// read and written little-endian, with the message tag carried in a `u64`.
pub const SK_PROTO_VERSION_2: u32 = 2;
/// Current proposer-acceptor protocol version. In this file its messages are
/// read and written in network (big endian) order with a one-byte tag.
pub const SK_PROTO_VERSION_3: u32 = 3;
/// Placeholder used when the Postgres server version is not known.
pub const UNKNOWN_SERVER_VERSION: PgVersionId = PgVersionId::UNKNOWN;
34 :
/// A (term, LSN) pair; the element type of [`TermHistory`].
///
/// The derived ordering compares `term` first and `lsn` second, following the
/// field declaration order.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct TermLsn {
    /// Term number.
    pub term: Term,
    /// WAL position paired with the term.
    pub lsn: Lsn,
}
40 :
41 : // Creation from tuple provides less typing (e.g. for unit tests).
42 : impl From<(Term, Lsn)> for TermLsn {
43 1298 : fn from(pair: (Term, Lsn)) -> TermLsn {
44 1298 : TermLsn {
45 1298 : term: pair.0,
46 1298 : lsn: pair.1,
47 1298 : }
48 1298 : }
49 : }
50 :
/// List of [`TermLsn`] entries describing term switches.
///
/// NOTE(review): `up_to` stops at the first entry whose LSN exceeds the bound,
/// so entries are presumably kept in ascending LSN order — confirm against the
/// code that builds histories.
#[derive(Clone, Serialize, Deserialize, PartialEq)]
pub struct TermHistory(pub Vec<TermLsn>);
53 :
54 : impl TermHistory {
55 1471 : pub fn empty() -> TermHistory {
56 1471 : TermHistory(Vec::new())
57 1471 : }
58 :
59 : // Parse TermHistory as n_entries followed by TermLsn pairs in network order.
60 680 : pub fn from_bytes(bytes: &mut Bytes) -> Result<TermHistory> {
61 680 : let n_entries = bytes
62 680 : .get_u32_f()
63 680 : .with_context(|| "TermHistory misses len")?;
64 680 : let mut res = Vec::with_capacity(n_entries as usize);
65 4636 : for i in 0..n_entries {
66 4636 : let term = bytes
67 4636 : .get_u64_f()
68 4636 : .with_context(|| format!("TermHistory pos {i} misses term"))?;
69 4636 : let lsn = bytes
70 4636 : .get_u64_f()
71 4636 : .with_context(|| format!("TermHistory pos {i} misses lsn"))?
72 4636 : .into();
73 4636 : res.push(TermLsn { term, lsn })
74 : }
75 680 : Ok(TermHistory(res))
76 680 : }
77 :
78 : // Parse TermHistory as n_entries followed by TermLsn pairs in LE order.
79 : // TODO remove once v2 protocol is fully dropped.
80 0 : pub fn from_bytes_le(bytes: &mut Bytes) -> Result<TermHistory> {
81 0 : if bytes.remaining() < 4 {
82 0 : bail!("TermHistory misses len");
83 0 : }
84 0 : let n_entries = bytes.get_u32_le();
85 0 : let mut res = Vec::with_capacity(n_entries as usize);
86 0 : for _ in 0..n_entries {
87 0 : if bytes.remaining() < 16 {
88 0 : bail!("TermHistory is incomplete");
89 0 : }
90 0 : res.push(TermLsn {
91 0 : term: bytes.get_u64_le(),
92 0 : lsn: bytes.get_u64_le().into(),
93 0 : })
94 : }
95 0 : Ok(TermHistory(res))
96 0 : }
97 :
98 : /// Return copy of self with switches happening strictly after up_to
99 : /// truncated.
100 4643 : pub fn up_to(&self, up_to: Lsn) -> TermHistory {
101 4643 : let mut res = Vec::with_capacity(self.0.len());
102 17081 : for e in &self.0 {
103 12441 : if e.lsn > up_to {
104 3 : break;
105 12438 : }
106 12438 : res.push(*e);
107 : }
108 4643 : TermHistory(res)
109 4643 : }
110 :
111 : /// Find point of divergence between leader (walproposer) term history and
112 : /// safekeeper. Arguments are not symmetric as proposer history ends at
113 : /// +infinity while safekeeper at flush_lsn.
114 : /// C version is at walproposer SendProposerElected.
115 691 : pub fn find_highest_common_point(
116 691 : prop_th: &TermHistory,
117 691 : sk_th: &TermHistory,
118 691 : sk_wal_end: Lsn,
119 691 : ) -> Option<TermLsn> {
120 691 : let (prop_th, sk_th) = (&prop_th.0, &sk_th.0); // avoid .0 below
121 :
122 691 : if let Some(sk_th_last) = sk_th.last() {
123 552 : assert!(
124 552 : sk_th_last.lsn <= sk_wal_end,
125 0 : "safekeeper term history end {sk_th_last:?} LSN is higher than WAL end {sk_wal_end:?}"
126 : );
127 139 : }
128 :
129 : // find last common term, if any...
130 691 : let mut last_common_idx = None;
131 3914 : for i in 0..min(sk_th.len(), prop_th.len()) {
132 3914 : if prop_th[i].term != sk_th[i].term {
133 4 : break;
134 3910 : }
135 : // If term is the same, LSN must be equal as well.
136 3910 : assert!(
137 3910 : prop_th[i].lsn == sk_th[i].lsn,
138 0 : "same term {} has different start LSNs: prop {}, sk {}",
139 0 : prop_th[i].term,
140 0 : prop_th[i].lsn,
141 0 : sk_th[i].lsn
142 : );
143 3910 : last_common_idx = Some(i);
144 : }
145 691 : let last_common_idx = last_common_idx?;
146 : // Now find where it ends at both prop and sk and take min. End of
147 : // (common) term is the start of the next except it is the last one;
148 : // there it is flush_lsn in case of safekeeper or, in case of proposer
149 : // +infinity, so we just take flush_lsn then.
150 551 : if last_common_idx == prop_th.len() - 1 {
151 46 : Some(TermLsn {
152 46 : term: prop_th[last_common_idx].term,
153 46 : lsn: sk_wal_end,
154 46 : })
155 : } else {
156 505 : let prop_common_term_end = prop_th[last_common_idx + 1].lsn;
157 505 : let sk_common_term_end = if last_common_idx + 1 < sk_th.len() {
158 3 : sk_th[last_common_idx + 1].lsn
159 : } else {
160 502 : sk_wal_end
161 : };
162 505 : Some(TermLsn {
163 505 : term: prop_th[last_common_idx].term,
164 505 : lsn: min(prop_common_term_end, sk_common_term_end),
165 505 : })
166 : }
167 691 : }
168 : }
169 :
170 : /// Display only latest entries for Debug.
171 : impl fmt::Debug for TermHistory {
172 340 : fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
173 340 : let n_printed = 20;
174 340 : write!(
175 340 : fmt,
176 340 : "{}{:?}",
177 340 : if self.0.len() > n_printed { "... " } else { "" },
178 340 : self.0
179 340 : .iter()
180 340 : .rev()
181 340 : .take(n_printed)
182 1257 : .map(|&e| (e.term, e.lsn)) // omit TermSwitchEntry
183 340 : .collect::<Vec<_>>()
184 : )
185 340 : }
186 : }
187 :
188 : /// Unique id of proposer. Not needed for correctness, used for monitoring.
189 : pub type PgUuid = [u8; 16];
190 :
/// Persistent consensus state of the acceptor.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct AcceptorState {
    /// Last term the acceptor voted for (advanced in phase 1).
    pub term: Term,
    /// History of term switches for safekeeper's WAL.
    /// Actually it often goes *beyond* WAL contents as we adopt term history
    /// from the proposer before recovery; use `get_last_log_term` to get the
    /// term of the WAL that is actually present up to a given flush LSN.
    pub term_history: TermHistory,
}
201 :
202 : impl AcceptorState {
203 : /// acceptor's last_log_term is the term of the highest entry in the log
204 1376 : pub fn get_last_log_term(&self, flush_lsn: Lsn) -> Term {
205 1376 : let th = self.term_history.up_to(flush_lsn);
206 1376 : match th.0.last() {
207 1370 : Some(e) => e.term,
208 6 : None => 0,
209 : }
210 1376 : }
211 : }
212 :
213 : // protocol messages
214 :
/// Initial Proposer -> Acceptor message.
///
/// For protocol v3 it is decoded field by field in
/// `ProposerAcceptorMessage::parse`; for v2 it is built from
/// `ProposerGreetingV2` with an empty, invalid-generation `mconf`.
#[derive(Debug, Deserialize)]
pub struct ProposerGreeting {
    /// Tenant the proposer is working for.
    pub tenant_id: TenantId,
    /// Timeline the proposer is working for.
    pub timeline_id: TimelineId,
    /// Membership configuration sent by the proposer.
    pub mconf: membership::Configuration,
    /// Postgres server version
    pub pg_version: PgVersionId,
    /// Postgres system identifier.
    pub system_id: SystemId,
    /// WAL segment size.
    pub wal_seg_size: u32,
}
226 :
/// V2 of the message; exists as a struct because we (de)serialized it as is.
/// Field order therefore defines the wire layout and must not be changed.
#[derive(Debug, Deserialize)]
pub struct ProposerGreetingV2 {
    /// proposer-acceptor protocol version
    pub protocol_version: u32,
    /// Postgres server version
    pub pg_version: PgVersionId,
    /// Unique id of the proposer (see `PgUuid`).
    pub proposer_id: PgUuid,
    /// Postgres system identifier.
    pub system_id: SystemId,
    /// Timeline the proposer is working for.
    pub timeline_id: TimelineId,
    /// Tenant the proposer is working for.
    pub tenant_id: TenantId,
    /// Postgres timeline id; not copied into `ProposerGreeting` by `parse`.
    pub tli: TimeLineID,
    /// WAL segment size.
    pub wal_seg_size: u32,
}
241 :
/// Acceptor -> Proposer initial response: the highest term known to me
/// (acceptor voted for).
#[derive(Debug, Serialize)]
pub struct AcceptorGreeting {
    /// Id of the responding safekeeper node.
    node_id: NodeId,
    /// Membership configuration reported back to the proposer (v3 only; the
    /// v2 encoding in `serialize` omits it).
    mconf: membership::Configuration,
    /// Highest term known to the acceptor.
    term: u64,
}
250 :
/// Vote request sent from proposer to safekeepers
#[derive(Debug)]
pub struct VoteRequest {
    /// Membership conf generation; `INVALID_GENERATION` when the request came
    /// over protocol v2, which does not carry it.
    pub generation: Generation,
    /// Term the vote is requested for.
    pub term: Term,
}
257 :
/// V2 of the message; exists as a struct because we (de)serialized it as is.
#[derive(Debug, Deserialize)]
pub struct VoteRequestV2 {
    /// Term the vote is requested for.
    pub term: Term,
}
263 :
/// Vote itself, sent from safekeeper to proposer
#[derive(Debug, Serialize)]
pub struct VoteResponse {
    /// Membership conf generation.
    generation: Generation,
    /// Safekeeper's current term; if it is higher than proposer's, the
    /// compute is out of date.
    pub term: Term,
    /// Whether the vote was granted.
    vote_given: bool,
    // Safekeeper flush_lsn (end of WAL) + history of term switches allow
    // proposer to choose the most advanced one.
    pub flush_lsn: Lsn,
    /// Truncate LSN reported by the safekeeper.
    truncate_lsn: Lsn,
    /// Safekeeper's history of term switches.
    pub term_history: TermHistory,
}
276 :
/// Proposer -> Acceptor message announcing proposer is elected and
/// communicating term history to it.
#[derive(Debug, Clone)]
pub struct ProposerElected {
    /// Membership conf generation; `INVALID_GENERATION` for protocol v2.
    pub generation: Generation,
    /// Term the proposer was elected in.
    pub term: Term,
    /// LSN from which the proposer will start streaming WAL.
    pub start_streaming_at: Lsn,
    /// Proposer's history of term switches.
    pub term_history: TermHistory,
}
288 :
/// Request with WAL message sent from proposer to safekeeper. Along the way it
/// communicates commit_lsn.
#[derive(Debug)]
pub struct AppendRequest {
    /// Fixed-size header describing the WAL range and consensus positions.
    pub h: AppendRequestHeader,
    /// WAL payload; `parse` reads exactly `h.end_lsn - h.begin_lsn` bytes.
    pub wal_data: Bytes,
}
/// Header of [`AppendRequest`].
#[derive(Debug, Clone, Deserialize)]
pub struct AppendRequestHeader {
    /// Membership conf generation; `INVALID_GENERATION` for protocol v2.
    pub generation: Generation,
    /// Term carried in the proposer's request (read from the wire in `parse`).
    // NOTE(review): the previous comment here described this as the
    // safekeeper's current term, which looks copied from AppendResponse —
    // confirm.
    pub term: Term,
    /// start position of message in WAL
    pub begin_lsn: Lsn,
    /// end position of message in WAL
    pub end_lsn: Lsn,
    /// LSN committed by quorum of safekeepers
    pub commit_lsn: Lsn,
    /// minimal LSN which may be needed by proposer to perform recovery of some safekeeper
    pub truncate_lsn: Lsn,
}
310 :
/// V2 of the message; exists as a struct because we (de)serialized it as is.
/// Field order therefore defines the wire layout and must not be changed.
#[derive(Debug, Clone, Deserialize)]
pub struct AppendRequestHeaderV2 {
    /// Term carried in the proposer's request.
    // NOTE(review): the previous comment here described this as the
    // safekeeper's current term, which looks copied from AppendResponse —
    // confirm.
    pub term: Term,
    // TODO: remove this field from the protocol, it is unused -- LSN of term
    // switch can be taken from ProposerElected (as well as from term history).
    pub term_start_lsn: Lsn,
    /// start position of message in WAL
    pub begin_lsn: Lsn,
    /// end position of message in WAL
    pub end_lsn: Lsn,
    /// LSN committed by quorum of safekeepers
    pub commit_lsn: Lsn,
    /// minimal LSN which may be needed by proposer to perform recovery of some safekeeper
    pub truncate_lsn: Lsn,
    // only for logging/debugging
    pub proposer_uuid: PgUuid,
}
330 :
/// Report safekeeper state to proposer
#[derive(Debug, Serialize, Clone)]
pub struct AppendResponse {
    // Membership conf generation. Not strictly required because on mismatch
    // connection is reset, but let's sanity check it.
    generation: Generation,
    // Current term of the safekeeper; if it is higher than proposer's, the
    // compute is out of date.
    pub term: Term,
    // Flushed end of WAL on safekeeper; one should always be mindful of which
    // term history this value comes from, either checking history directly or
    // observing term being set to one for which WAL truncation is known to
    // have happened.
    pub flush_lsn: Lsn,
    // We report back our awareness about which WAL is committed, as this is
    // a criterion for walproposer --sync mode exit
    pub commit_lsn: Lsn,
    /// Hot standby feedback; its `ts`, `xmin` and `catalog_xmin` fields are
    /// written to the wire by `serialize`.
    pub hs_feedback: HotStandbyFeedback,
    /// Optional pageserver feedback; appended to the message only if present.
    pub pageserver_feedback: Option<PageserverFeedback>,
}
351 :
352 : impl AppendResponse {
353 0 : fn term_only(generation: Generation, term: Term) -> AppendResponse {
354 0 : AppendResponse {
355 0 : generation,
356 0 : term,
357 0 : flush_lsn: Lsn(0),
358 0 : commit_lsn: Lsn(0),
359 0 : hs_feedback: HotStandbyFeedback::empty(),
360 0 : pageserver_feedback: None,
361 0 : }
362 0 : }
363 : }
364 :
/// Proposer -> Acceptor messages
#[derive(Debug)]
pub enum ProposerAcceptorMessage {
    /// Initial greeting (wire tag 'g').
    Greeting(ProposerGreeting),
    /// Vote request (wire tag 'v').
    VoteRequest(VoteRequest),
    /// Announcement that the proposer is elected (wire tag 'e').
    Elected(ProposerElected),
    /// WAL append (wire tag 'a'); handled via `handle_append_request(msg, true)`.
    AppendRequest(AppendRequest),
    /// WAL append handled via `handle_append_request(msg, false)`. Never
    /// produced by `parse`.
    // NOTE(review): presumably the boolean selects whether WAL is flushed
    // before replying — confirm in handle_append_request.
    NoFlushAppendRequest(AppendRequest),
    /// Request handled by `handle_flush`. Never produced by `parse`.
    FlushWAL,
}
375 :
/// Augment Bytes with fallible get_uN methods, where N is the width in bits.
/// All reads are in network (big endian) order.
///
/// Each method returns an error instead of panicking when the buffer holds
/// fewer bytes than the integer needs.
trait BytesF {
    /// Read one byte.
    fn get_u8_f(&mut self) -> Result<u8>;
    /// Read a big endian `u16` (2 bytes).
    fn get_u16_f(&mut self) -> Result<u16>;
    /// Read a big endian `u32` (4 bytes).
    fn get_u32_f(&mut self) -> Result<u32>;
    /// Read a big endian `u64` (8 bytes).
    fn get_u64_f(&mut self) -> Result<u64>;
}
384 :
385 : impl BytesF for Bytes {
386 26464 : fn get_u8_f(&mut self) -> Result<u8> {
387 26464 : if self.is_empty() {
388 0 : bail!("no bytes left, expected 1");
389 26464 : }
390 26464 : Ok(self.get_u8())
391 26464 : }
392 0 : fn get_u16_f(&mut self) -> Result<u16> {
393 0 : if self.remaining() < 2 {
394 0 : bail!("no bytes left, expected 2");
395 0 : }
396 0 : Ok(self.get_u16())
397 0 : }
398 108416 : fn get_u32_f(&mut self) -> Result<u32> {
399 108416 : if self.remaining() < 4 {
400 0 : bail!("only {} bytes left, expected 4", self.remaining());
401 108416 : }
402 108416 : Ok(self.get_u32())
403 108416 : }
404 47968 : fn get_u64_f(&mut self) -> Result<u64> {
405 47968 : if self.remaining() < 8 {
406 0 : bail!("only {} bytes left, expected 8", self.remaining());
407 47968 : }
408 47968 : Ok(self.get_u64())
409 47968 : }
410 : }
411 :
412 : impl ProposerAcceptorMessage {
413 : /// Read cstring from Bytes.
414 40636 : fn get_cstr(buf: &mut Bytes) -> Result<String> {
415 40636 : let pos = buf
416 40636 : .iter()
417 1340988 : .position(|x| *x == 0)
418 40636 : .ok_or_else(|| anyhow::anyhow!("missing cstring terminator"))?;
419 40636 : let result = buf.split_to(pos);
420 40636 : buf.advance(1); // drop the null terminator
421 40636 : match std::str::from_utf8(&result) {
422 40636 : Ok(s) => Ok(s.to_string()),
423 0 : Err(e) => bail!("invalid utf8 in cstring: {}", e),
424 : }
425 40636 : }
426 :
427 : /// Read membership::Configuration from Bytes.
428 20318 : fn get_mconf(buf: &mut Bytes) -> Result<membership::Configuration> {
429 20318 : let generation = Generation::new(buf.get_u32_f().with_context(|| "reading generation")?);
430 20318 : let members_len = buf.get_u32_f().with_context(|| "reading members_len")?;
431 : // Main member set must have at least someone in valid configuration.
432 : // Empty conf is allowed until we fully migrate.
433 20318 : if generation != INVALID_GENERATION && members_len == 0 {
434 0 : bail!("empty members_len");
435 20318 : }
436 20318 : let mut members = MemberSet::empty();
437 20318 : for i in 0..members_len {
438 0 : let id = buf
439 0 : .get_u64_f()
440 0 : .with_context(|| format!("reading member {i} node_id"))?;
441 0 : let host = Self::get_cstr(buf).with_context(|| format!("reading member {i} host"))?;
442 0 : let pg_port = buf
443 0 : .get_u16_f()
444 0 : .with_context(|| format!("reading member {i} port"))?;
445 0 : let sk = SafekeeperId {
446 0 : id: NodeId(id),
447 0 : host,
448 0 : pg_port,
449 0 : };
450 0 : members.add(sk)?;
451 : }
452 20318 : let new_members_len = buf.get_u32_f().with_context(|| "reading new_members_len")?;
453 : // Non joint conf.
454 20318 : if new_members_len == 0 {
455 20318 : Ok(membership::Configuration {
456 20318 : generation,
457 20318 : members,
458 20318 : new_members: None,
459 20318 : })
460 : } else {
461 0 : let mut new_members = MemberSet::empty();
462 0 : for i in 0..new_members_len {
463 0 : let id = buf
464 0 : .get_u64_f()
465 0 : .with_context(|| format!("reading new member {i} node_id"))?;
466 0 : let host =
467 0 : Self::get_cstr(buf).with_context(|| format!("reading new member {i} host"))?;
468 0 : let pg_port = buf
469 0 : .get_u16_f()
470 0 : .with_context(|| format!("reading new member {i} port"))?;
471 0 : let sk = SafekeeperId {
472 0 : id: NodeId(id),
473 0 : host,
474 0 : pg_port,
475 0 : };
476 0 : new_members.add(sk)?;
477 : }
478 0 : Ok(membership::Configuration {
479 0 : generation,
480 0 : members,
481 0 : new_members: Some(new_members),
482 0 : })
483 : }
484 20318 : }
485 :
486 : /// Parse proposer message.
487 26464 : pub fn parse(mut msg_bytes: Bytes, proto_version: u32) -> Result<ProposerAcceptorMessage> {
488 26464 : if proto_version == SK_PROTO_VERSION_3 {
489 26464 : if msg_bytes.is_empty() {
490 0 : bail!("ProposerAcceptorMessage is not complete: missing tag");
491 26464 : }
492 26464 : let tag = msg_bytes.get_u8_f().with_context(|| {
493 0 : "ProposerAcceptorMessage is not complete: missing tag".to_string()
494 0 : })? as char;
495 26464 : match tag {
496 : 'g' => {
497 20318 : let tenant_id_str =
498 20318 : Self::get_cstr(&mut msg_bytes).with_context(|| "reading tenant_id")?;
499 20318 : let tenant_id = TenantId::from_str(&tenant_id_str)?;
500 20318 : let timeline_id_str =
501 20318 : Self::get_cstr(&mut msg_bytes).with_context(|| "reading timeline_id")?;
502 20318 : let timeline_id = TimelineId::from_str(&timeline_id_str)?;
503 20318 : let mconf = Self::get_mconf(&mut msg_bytes)?;
504 20318 : let pg_version = msg_bytes
505 20318 : .get_u32_f()
506 20318 : .with_context(|| "reading pg_version")?;
507 20318 : let system_id = msg_bytes.get_u64_f().with_context(|| "reading system_id")?;
508 20318 : let wal_seg_size = msg_bytes
509 20318 : .get_u32_f()
510 20318 : .with_context(|| "reading wal_seg_size")?;
511 20318 : let g = ProposerGreeting {
512 20318 : tenant_id,
513 20318 : timeline_id,
514 20318 : mconf,
515 20318 : pg_version: PgVersionId::from_full_pg_version(pg_version),
516 20318 : system_id,
517 20318 : wal_seg_size,
518 20318 : };
519 20318 : Ok(ProposerAcceptorMessage::Greeting(g))
520 : }
521 : 'v' => {
522 2578 : let generation = Generation::new(
523 2578 : msg_bytes
524 2578 : .get_u32_f()
525 2578 : .with_context(|| "reading generation")?,
526 : );
527 2578 : let term = msg_bytes.get_u64_f().with_context(|| "reading term")?;
528 2578 : let v = VoteRequest { generation, term };
529 2578 : Ok(ProposerAcceptorMessage::VoteRequest(v))
530 : }
531 : 'e' => {
532 680 : let generation = Generation::new(
533 680 : msg_bytes
534 680 : .get_u32_f()
535 680 : .with_context(|| "reading generation")?,
536 : );
537 680 : let term = msg_bytes.get_u64_f().with_context(|| "reading term")?;
538 680 : let start_streaming_at: Lsn = msg_bytes
539 680 : .get_u64_f()
540 680 : .with_context(|| "reading start_streaming_at")?
541 680 : .into();
542 680 : let term_history = TermHistory::from_bytes(&mut msg_bytes)?;
543 680 : let msg = ProposerElected {
544 680 : generation,
545 680 : term,
546 680 : start_streaming_at,
547 680 : term_history,
548 680 : };
549 680 : Ok(ProposerAcceptorMessage::Elected(msg))
550 : }
551 : 'a' => {
552 2888 : let generation = Generation::new(
553 2888 : msg_bytes
554 2888 : .get_u32_f()
555 2888 : .with_context(|| "reading generation")?,
556 : );
557 2888 : let term = msg_bytes.get_u64_f().with_context(|| "reading term")?;
558 2888 : let begin_lsn: Lsn = msg_bytes
559 2888 : .get_u64_f()
560 2888 : .with_context(|| "reading begin_lsn")?
561 2888 : .into();
562 2888 : let end_lsn: Lsn = msg_bytes
563 2888 : .get_u64_f()
564 2888 : .with_context(|| "reading end_lsn")?
565 2888 : .into();
566 2888 : let commit_lsn: Lsn = msg_bytes
567 2888 : .get_u64_f()
568 2888 : .with_context(|| "reading commit_lsn")?
569 2888 : .into();
570 2888 : let truncate_lsn: Lsn = msg_bytes
571 2888 : .get_u64_f()
572 2888 : .with_context(|| "reading truncate_lsn")?
573 2888 : .into();
574 2888 : let hdr = AppendRequestHeader {
575 2888 : generation,
576 2888 : term,
577 2888 : begin_lsn,
578 2888 : end_lsn,
579 2888 : commit_lsn,
580 2888 : truncate_lsn,
581 2888 : };
582 2888 : let rec_size = hdr
583 2888 : .end_lsn
584 2888 : .checked_sub(hdr.begin_lsn)
585 2888 : .context("begin_lsn > end_lsn in AppendRequest")?
586 : .0 as usize;
587 2888 : if rec_size > MAX_SEND_SIZE {
588 0 : bail!(
589 0 : "AppendRequest is longer than MAX_SEND_SIZE ({})",
590 : MAX_SEND_SIZE
591 : );
592 2888 : }
593 2888 : if msg_bytes.remaining() < rec_size {
594 0 : bail!(
595 0 : "reading WAL: only {} bytes left, wanted {}",
596 0 : msg_bytes.remaining(),
597 : rec_size
598 : );
599 2888 : }
600 2888 : let wal_data = msg_bytes.copy_to_bytes(rec_size);
601 2888 : let msg = AppendRequest { h: hdr, wal_data };
602 :
603 2888 : Ok(ProposerAcceptorMessage::AppendRequest(msg))
604 : }
605 0 : _ => bail!("unknown proposer-acceptor message tag: {}", tag),
606 : }
607 0 : } else if proto_version == SK_PROTO_VERSION_2 {
608 : // xxx using Reader is inefficient but easy to work with bincode
609 0 : let mut stream = msg_bytes.reader();
610 : // u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is
611 0 : let tag = stream.read_u64::<LittleEndian>()? as u8 as char;
612 0 : match tag {
613 : 'g' => {
614 0 : let msgv2 = ProposerGreetingV2::des_from(&mut stream)?;
615 0 : let g = ProposerGreeting {
616 0 : tenant_id: msgv2.tenant_id,
617 0 : timeline_id: msgv2.timeline_id,
618 0 : mconf: membership::Configuration {
619 0 : generation: INVALID_GENERATION,
620 0 : members: MemberSet::empty(),
621 0 : new_members: None,
622 0 : },
623 0 : pg_version: msgv2.pg_version,
624 0 : system_id: msgv2.system_id,
625 0 : wal_seg_size: msgv2.wal_seg_size,
626 0 : };
627 0 : Ok(ProposerAcceptorMessage::Greeting(g))
628 : }
629 : 'v' => {
630 0 : let msg = VoteRequestV2::des_from(&mut stream)?;
631 0 : let v = VoteRequest {
632 0 : generation: INVALID_GENERATION,
633 0 : term: msg.term,
634 0 : };
635 0 : Ok(ProposerAcceptorMessage::VoteRequest(v))
636 : }
637 : 'e' => {
638 0 : let mut msg_bytes = stream.into_inner();
639 0 : if msg_bytes.remaining() < 16 {
640 0 : bail!("ProposerElected message is not complete");
641 0 : }
642 0 : let term = msg_bytes.get_u64_le();
643 0 : let start_streaming_at = msg_bytes.get_u64_le().into();
644 0 : let term_history = TermHistory::from_bytes_le(&mut msg_bytes)?;
645 0 : if msg_bytes.remaining() < 8 {
646 0 : bail!("ProposerElected message is not complete");
647 0 : }
648 0 : let _timeline_start_lsn = msg_bytes.get_u64_le();
649 0 : let msg = ProposerElected {
650 0 : generation: INVALID_GENERATION,
651 0 : term,
652 0 : start_streaming_at,
653 0 : term_history,
654 0 : };
655 0 : Ok(ProposerAcceptorMessage::Elected(msg))
656 : }
657 : 'a' => {
658 : // read header followed by wal data
659 0 : let hdrv2 = AppendRequestHeaderV2::des_from(&mut stream)?;
660 0 : let hdr = AppendRequestHeader {
661 0 : generation: INVALID_GENERATION,
662 0 : term: hdrv2.term,
663 0 : begin_lsn: hdrv2.begin_lsn,
664 0 : end_lsn: hdrv2.end_lsn,
665 0 : commit_lsn: hdrv2.commit_lsn,
666 0 : truncate_lsn: hdrv2.truncate_lsn,
667 0 : };
668 0 : let rec_size = hdr
669 0 : .end_lsn
670 0 : .checked_sub(hdr.begin_lsn)
671 0 : .context("begin_lsn > end_lsn in AppendRequest")?
672 : .0 as usize;
673 0 : if rec_size > MAX_SEND_SIZE {
674 0 : bail!(
675 0 : "AppendRequest is longer than MAX_SEND_SIZE ({})",
676 : MAX_SEND_SIZE
677 : );
678 0 : }
679 :
680 0 : let mut wal_data_vec: Vec<u8> = vec![0; rec_size];
681 0 : stream.read_exact(&mut wal_data_vec)?;
682 0 : let wal_data = Bytes::from(wal_data_vec);
683 :
684 0 : let msg = AppendRequest { h: hdr, wal_data };
685 :
686 0 : Ok(ProposerAcceptorMessage::AppendRequest(msg))
687 : }
688 0 : _ => bail!("unknown proposer-acceptor message tag: {}", tag),
689 : }
690 : } else {
691 0 : bail!("unsupported protocol version {}", proto_version);
692 : }
693 26464 : }
694 :
695 : /// The memory size of the message, including byte slices.
696 620 : pub fn size(&self) -> usize {
697 : const BASE_SIZE: usize = std::mem::size_of::<ProposerAcceptorMessage>();
698 :
699 : // For most types, the size is just the base enum size including the nested structs. Some
700 : // types also contain byte slices; add them.
701 : //
702 : // We explicitly list all fields, to draw attention here when new fields are added.
703 620 : let mut size = BASE_SIZE;
704 620 : size += match self {
705 0 : Self::Greeting(_) => 0,
706 :
707 0 : Self::VoteRequest(_) => 0,
708 :
709 0 : Self::Elected(_) => 0,
710 :
711 : Self::AppendRequest(AppendRequest {
712 : h:
713 : AppendRequestHeader {
714 : generation: _,
715 : term: _,
716 : begin_lsn: _,
717 : end_lsn: _,
718 : commit_lsn: _,
719 : truncate_lsn: _,
720 : },
721 620 : wal_data,
722 620 : }) => wal_data.len(),
723 :
724 : Self::NoFlushAppendRequest(AppendRequest {
725 : h:
726 : AppendRequestHeader {
727 : generation: _,
728 : term: _,
729 : begin_lsn: _,
730 : end_lsn: _,
731 : commit_lsn: _,
732 : truncate_lsn: _,
733 : },
734 0 : wal_data,
735 0 : }) => wal_data.len(),
736 :
737 0 : Self::FlushWAL => 0,
738 : };
739 :
740 620 : size
741 620 : }
742 : }
743 :
/// Acceptor -> Proposer messages
#[derive(Debug)]
pub enum AcceptorProposerMessage {
    /// Reply to the proposer's greeting (wire tag 'g').
    Greeting(AcceptorGreeting),
    /// Reply to a vote request (wire tag 'v').
    VoteResponse(VoteResponse),
    /// Report of safekeeper state (wire tag 'a').
    AppendResponse(AppendResponse),
}
751 :
752 : impl AcceptorProposerMessage {
753 0 : fn put_cstr(buf: &mut BytesMut, s: &str) {
754 0 : buf.put_slice(s.as_bytes());
755 0 : buf.put_u8(0); // null terminator
756 0 : }
757 :
758 : /// Serialize membership::Configuration into buf.
759 20318 : fn serialize_mconf(buf: &mut BytesMut, mconf: &membership::Configuration) {
760 20318 : buf.put_u32(mconf.generation.into_inner());
761 20318 : buf.put_u32(mconf.members.m.len() as u32);
762 20318 : for sk in &mconf.members.m {
763 0 : buf.put_u64(sk.id.0);
764 0 : Self::put_cstr(buf, &sk.host);
765 0 : buf.put_u16(sk.pg_port);
766 0 : }
767 20318 : if let Some(ref new_members) = mconf.new_members {
768 0 : buf.put_u32(new_members.m.len() as u32);
769 0 : for sk in &new_members.m {
770 0 : buf.put_u64(sk.id.0);
771 0 : Self::put_cstr(buf, &sk.host);
772 0 : buf.put_u16(sk.pg_port);
773 0 : }
774 20318 : } else {
775 20318 : buf.put_u32(0);
776 20318 : }
777 20318 : }
778 :
779 : /// Serialize acceptor -> proposer message.
780 25283 : pub fn serialize(&self, buf: &mut BytesMut, proto_version: u32) -> Result<()> {
781 25283 : if proto_version == SK_PROTO_VERSION_3 {
782 25283 : match self {
783 20318 : AcceptorProposerMessage::Greeting(msg) => {
784 20318 : buf.put_u8(b'g');
785 20318 : buf.put_u64(msg.node_id.0);
786 20318 : Self::serialize_mconf(buf, &msg.mconf);
787 20318 : buf.put_u64(msg.term)
788 : }
789 2578 : AcceptorProposerMessage::VoteResponse(msg) => {
790 2578 : buf.put_u8(b'v');
791 2578 : buf.put_u32(msg.generation.into_inner());
792 2578 : buf.put_u64(msg.term);
793 2578 : buf.put_u8(msg.vote_given as u8);
794 2578 : buf.put_u64(msg.flush_lsn.into());
795 2578 : buf.put_u64(msg.truncate_lsn.into());
796 2578 : buf.put_u32(msg.term_history.0.len() as u32);
797 9545 : for e in &msg.term_history.0 {
798 6967 : buf.put_u64(e.term);
799 6967 : buf.put_u64(e.lsn.into());
800 6967 : }
801 : }
802 2387 : AcceptorProposerMessage::AppendResponse(msg) => {
803 2387 : buf.put_u8(b'a');
804 2387 : buf.put_u32(msg.generation.into_inner());
805 2387 : buf.put_u64(msg.term);
806 2387 : buf.put_u64(msg.flush_lsn.into());
807 2387 : buf.put_u64(msg.commit_lsn.into());
808 2387 : buf.put_i64(msg.hs_feedback.ts);
809 2387 : buf.put_u64(msg.hs_feedback.xmin);
810 2387 : buf.put_u64(msg.hs_feedback.catalog_xmin);
811 :
812 : // AsyncReadMessage in walproposer.c will not try to decode pageserver_feedback
813 : // if it is not present.
814 2387 : if let Some(ref msg) = msg.pageserver_feedback {
815 0 : msg.serialize(buf);
816 2387 : }
817 : }
818 : }
819 25283 : Ok(())
820 : // TODO remove 3 after converting all msgs
821 0 : } else if proto_version == SK_PROTO_VERSION_2 {
822 0 : match self {
823 0 : AcceptorProposerMessage::Greeting(msg) => {
824 0 : buf.put_u64_le('g' as u64);
825 0 : // v2 didn't have mconf and fields were reordered
826 0 : buf.put_u64_le(msg.term);
827 0 : buf.put_u64_le(msg.node_id.0);
828 0 : }
829 0 : AcceptorProposerMessage::VoteResponse(msg) => {
830 : // v2 didn't have generation, had u64 vote_given and timeline_start_lsn
831 0 : buf.put_u64_le('v' as u64);
832 0 : buf.put_u64_le(msg.term);
833 0 : buf.put_u64_le(msg.vote_given as u64);
834 0 : buf.put_u64_le(msg.flush_lsn.into());
835 0 : buf.put_u64_le(msg.truncate_lsn.into());
836 0 : buf.put_u32_le(msg.term_history.0.len() as u32);
837 0 : for e in &msg.term_history.0 {
838 0 : buf.put_u64_le(e.term);
839 0 : buf.put_u64_le(e.lsn.into());
840 0 : }
841 : // removed timeline_start_lsn
842 0 : buf.put_u64_le(0);
843 : }
844 0 : AcceptorProposerMessage::AppendResponse(msg) => {
845 : // v2 didn't have generation
846 0 : buf.put_u64_le('a' as u64);
847 0 : buf.put_u64_le(msg.term);
848 0 : buf.put_u64_le(msg.flush_lsn.into());
849 0 : buf.put_u64_le(msg.commit_lsn.into());
850 0 : buf.put_i64_le(msg.hs_feedback.ts);
851 0 : buf.put_u64_le(msg.hs_feedback.xmin);
852 0 : buf.put_u64_le(msg.hs_feedback.catalog_xmin);
853 :
854 : // AsyncReadMessage in walproposer.c will not try to decode pageserver_feedback
855 : // if it is not present.
856 0 : if let Some(ref msg) = msg.pageserver_feedback {
857 0 : msg.serialize(buf);
858 0 : }
859 : }
860 : }
861 0 : Ok(())
862 : } else {
863 0 : bail!("unsupported protocol version {}", proto_version);
864 : }
865 25283 : }
866 : }
867 :
/// Safekeeper implements consensus to reliably persist WAL across nodes.
/// It controls all WAL disk writes and updates of control file.
///
/// Currently safekeeper processes:
/// - messages from compute (proposers) and provides replies
/// - messages from broker peers
pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> {
    /// LSN starting from which the proposer this safekeeper is currently
    /// talking to appends WAL; determines last_log_term switch point.
    /// Initialized to `Lsn(0)` by `new`.
    pub term_start_lsn: Lsn,

    /// Persistent state storage.
    pub state: TimelineState<CTRL>,
    /// WAL storage backend.
    pub wal_store: WAL,

    /// Safekeeper's node id.
    node_id: NodeId,
}
884 :
885 : impl<CTRL, WAL> SafeKeeper<CTRL, WAL>
886 : where
887 : CTRL: control_file::Storage,
888 : WAL: wal_storage::Storage,
889 : {
890 : /// Accepts a control file storage containing the safekeeper state.
891 : /// State must be initialized, i.e. contain filled `tenant_id`, `timeline_id`
892 : /// and `server` (`wal_seg_size` inside it) fields.
893 9112 : pub fn new(
894 9112 : state: TimelineState<CTRL>,
895 9112 : wal_store: WAL,
896 9112 : node_id: NodeId,
897 9112 : ) -> Result<SafeKeeper<CTRL, WAL>> {
898 9112 : if state.tenant_id == TenantId::from([0u8; 16])
899 9112 : || state.timeline_id == TimelineId::from([0u8; 16])
900 : {
901 0 : bail!(
902 0 : "Calling SafeKeeper::new with empty tenant_id ({}) or timeline_id ({})",
903 0 : state.tenant_id,
904 0 : state.timeline_id
905 : );
906 9 : }
907 :
908 9112 : Ok(SafeKeeper {
909 9112 : term_start_lsn: Lsn(0),
910 9112 : state,
911 9112 : wal_store,
912 9112 : node_id,
913 9112 : })
914 9 : }
915 :
916 : /// Get history of term switches for the available WAL
917 3267 : fn get_term_history(&self) -> TermHistory {
918 3267 : self.state
919 3267 : .acceptor_state
920 3267 : .term_history
921 3267 : .up_to(self.flush_lsn())
922 9 : }
923 :
924 43 : pub fn get_last_log_term(&self) -> Term {
925 43 : self.state
926 43 : .acceptor_state
927 43 : .get_last_log_term(self.flush_lsn())
928 2 : }
929 :
930 : /// wal_store wrapper avoiding commit_lsn <= flush_lsn violation when we don't have WAL yet.
931 15483 : pub fn flush_lsn(&self) -> Lsn {
932 15483 : max(self.wal_store.flush_lsn(), self.state.timeline_start_lsn)
933 1883 : }
934 :
935 : /// Process message from proposer and possibly form reply. Concurrent
936 : /// callers must exclude each other.
937 30107 : pub async fn process_msg(
938 30107 : &mut self,
939 30107 : msg: &ProposerAcceptorMessage,
940 30107 : ) -> Result<Option<AcceptorProposerMessage>> {
941 30107 : let res = match msg {
942 20318 : ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg).await,
943 2581 : ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg).await,
944 688 : ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg).await,
945 5 : ProposerAcceptorMessage::AppendRequest(msg) => {
946 5 : self.handle_append_request(msg, true).await
947 : }
948 3508 : ProposerAcceptorMessage::NoFlushAppendRequest(msg) => {
949 3508 : self.handle_append_request(msg, false).await
950 : }
951 3007 : ProposerAcceptorMessage::FlushWAL => self.handle_flush().await,
952 : };
953 :
954 : // BEGIN HADRON
955 30107 : match &res {
956 30103 : Ok(_) => PROPOSER_ACCEPTOR_MESSAGES_TOTAL
957 30103 : .with_label_values(&["success"])
958 30103 : .inc(),
959 4 : Err(_) => PROPOSER_ACCEPTOR_MESSAGES_TOTAL
960 4 : .with_label_values(&["error"])
961 4 : .inc(),
962 : };
963 :
964 30107 : res
965 : // END HADRON
966 1256 : }
967 :
968 : /// Handle initial message from proposer: check its sanity and send my
969 : /// current term.
970 20318 : async fn handle_greeting(
971 20318 : &mut self,
972 20318 : msg: &ProposerGreeting,
973 20318 : ) -> Result<Option<AcceptorProposerMessage>> {
974 : /* Postgres major version mismatch is treated as fatal error
975 : * because safekeepers parse WAL headers and the format
976 : * may change between versions.
977 : */
978 20318 : if PgMajorVersion::try_from(msg.pg_version)?
979 20318 : != PgMajorVersion::try_from(self.state.server.pg_version)?
980 0 : && self.state.server.pg_version != UNKNOWN_SERVER_VERSION
981 : {
982 0 : bail!(
983 0 : "incompatible server version {}, expected {}",
984 : msg.pg_version,
985 0 : self.state.server.pg_version
986 : );
987 0 : }
988 :
989 20318 : if msg.tenant_id != self.state.tenant_id {
990 0 : bail!(
991 0 : "invalid tenant ID, got {}, expected {}",
992 : msg.tenant_id,
993 0 : self.state.tenant_id
994 : );
995 0 : }
996 20318 : if msg.timeline_id != self.state.timeline_id {
997 0 : bail!(
998 0 : "invalid timeline ID, got {}, expected {}",
999 : msg.timeline_id,
1000 0 : self.state.timeline_id
1001 : );
1002 0 : }
1003 20318 : if self.state.server.wal_seg_size != msg.wal_seg_size {
1004 0 : bail!(
1005 0 : "invalid wal_seg_size, got {}, expected {}",
1006 : msg.wal_seg_size,
1007 0 : self.state.server.wal_seg_size
1008 : );
1009 0 : }
1010 :
1011 : // system_id will be updated on mismatch
1012 : // sync-safekeepers doesn't know sysid and sends 0, ignore it
1013 20318 : if self.state.server.system_id != msg.system_id && msg.system_id != 0 {
1014 0 : if self.state.server.system_id != 0 {
1015 0 : warn!(
1016 0 : "unexpected system ID arrived, got {}, expected {}",
1017 0 : msg.system_id, self.state.server.system_id
1018 : );
1019 0 : }
1020 :
1021 0 : let mut state = self.state.start_change();
1022 0 : state.server.system_id = msg.system_id;
1023 0 : if msg.pg_version != UNKNOWN_SERVER_VERSION {
1024 0 : state.server.pg_version = msg.pg_version;
1025 0 : }
1026 0 : self.state.finish_change(&state).await?;
1027 0 : }
1028 :
1029 : // Switch into conf given by proposer conf if it is higher.
1030 20318 : self.state.membership_switch(msg.mconf.clone()).await?;
1031 :
1032 20318 : let apg = AcceptorGreeting {
1033 20318 : node_id: self.node_id,
1034 20318 : mconf: self.state.mconf.clone(),
1035 20318 : term: self.state.acceptor_state.term,
1036 20318 : };
1037 20318 : info!(
1038 0 : "processed greeting {:?} from walproposer, sending {:?}",
1039 : msg, apg
1040 : );
1041 20318 : Ok(Some(AcceptorProposerMessage::Greeting(apg)))
1042 0 : }
1043 :
1044 : /// Give vote for the given term, if we haven't done that previously.
1045 2581 : async fn handle_vote_request(
1046 2581 : &mut self,
1047 2581 : msg: &VoteRequest,
1048 2581 : ) -> Result<Option<AcceptorProposerMessage>> {
1049 2581 : if self.state.mconf.generation != msg.generation {
1050 1 : bail!(
1051 1 : "refusing {:?} due to generation mismatch: sk generation {}",
1052 : msg,
1053 1 : self.state.mconf.generation
1054 : );
1055 2 : }
1056 : // Once voted, we won't accept data from older proposers; flush
1057 : // everything we've already received so that new proposer starts
1058 : // streaming at end of our WAL, without overlap. WAL is truncated at
1059 : // streaming point and commit_lsn may be advanced from peers, so this
1060 : // also avoids possible spurious attempt to truncate committed WAL.
1061 2580 : self.wal_store.flush_wal().await?;
1062 : // initialize with refusal
1063 2580 : let mut resp = VoteResponse {
1064 2580 : generation: self.state.mconf.generation,
1065 2580 : term: self.state.acceptor_state.term,
1066 2580 : vote_given: false,
1067 2580 : flush_lsn: self.flush_lsn(),
1068 2580 : truncate_lsn: self.state.inmem.peer_horizon_lsn,
1069 2580 : term_history: self.get_term_history(),
1070 2580 : };
1071 2580 : if self.state.acceptor_state.term < msg.term {
1072 2472 : let mut state = self.state.start_change();
1073 2472 : state.acceptor_state.term = msg.term;
1074 : // persist vote before sending it out
1075 2472 : self.state.finish_change(&state).await?;
1076 :
1077 2472 : resp.term = self.state.acceptor_state.term;
1078 2472 : resp.vote_given = true;
1079 1 : }
1080 2580 : info!("processed {:?}: sending {:?}", msg, &resp);
1081 2580 : Ok(Some(AcceptorProposerMessage::VoteResponse(resp)))
1082 3 : }
1083 :
1084 : /// Form AppendResponse from current state.
1085 3010 : fn append_response(&self) -> AppendResponse {
1086 3010 : let ar = AppendResponse {
1087 3010 : generation: self.state.mconf.generation,
1088 3010 : term: self.state.acceptor_state.term,
1089 3010 : flush_lsn: self.flush_lsn(),
1090 3010 : commit_lsn: self.state.commit_lsn,
1091 3010 : // will be filled by the upper code to avoid bothering safekeeper
1092 3010 : hs_feedback: HotStandbyFeedback::empty(),
1093 3010 : pageserver_feedback: None,
1094 3010 : };
1095 3010 : trace!("formed AppendResponse {:?}", ar);
1096 3010 : ar
1097 623 : }
1098 :
1099 688 : async fn handle_elected(
1100 688 : &mut self,
1101 688 : msg: &ProposerElected,
1102 688 : ) -> Result<Option<AcceptorProposerMessage>> {
1103 688 : let _timer = MISC_OPERATION_SECONDS
1104 688 : .with_label_values(&["handle_elected"])
1105 688 : .start_timer();
1106 :
1107 688 : info!(
1108 0 : "received ProposerElected {:?}, term={}, last_log_term={}, flush_lsn={}",
1109 : msg,
1110 0 : self.state.acceptor_state.term,
1111 0 : self.get_last_log_term(),
1112 0 : self.flush_lsn()
1113 : );
1114 688 : if self.state.mconf.generation != msg.generation {
1115 1 : bail!(
1116 1 : "refusing {:?} due to generation mismatch: sk generation {}",
1117 : msg,
1118 1 : self.state.mconf.generation
1119 : );
1120 7 : }
1121 687 : if self.state.acceptor_state.term < msg.term {
1122 7 : let mut state = self.state.start_change();
1123 7 : state.acceptor_state.term = msg.term;
1124 7 : self.state.finish_change(&state).await?;
1125 0 : }
1126 :
1127 : // If our term is higher, ignore the message (next feedback will inform the compute)
1128 687 : if self.state.acceptor_state.term > msg.term {
1129 0 : return Ok(None);
1130 7 : }
1131 :
1132 : // Before truncating WAL check-cross the check divergence point received
1133 : // from the walproposer.
1134 687 : let sk_th = self.get_term_history();
1135 687 : let last_common_point = match TermHistory::find_highest_common_point(
1136 687 : &msg.term_history,
1137 687 : &sk_th,
1138 687 : self.flush_lsn(),
1139 687 : ) {
1140 : // No common point. Expect streaming from the beginning of the
1141 : // history like walproposer while we don't have proper init.
1142 139 : None => *msg.term_history.0.first().ok_or(anyhow::anyhow!(
1143 139 : "empty walproposer term history {:?}",
1144 : msg.term_history
1145 0 : ))?,
1146 548 : Some(lcp) => lcp,
1147 : };
1148 : // This is expected to happen in a rare race when another connection
1149 : // from the same walproposer writes + flushes WAL after this connection
1150 : // sent flush_lsn in VoteRequest; for instance, very late
1151 : // ProposerElected message delivery after another connection was
1152 : // established and wrote WAL. In such cases error is transient;
1153 : // reconnection makes safekeeper send newest term history and flush_lsn
1154 : // and walproposer recalculates the streaming point. OTOH repeating
1155 : // error indicates a serious bug.
1156 687 : if last_common_point.lsn != msg.start_streaming_at {
1157 0 : bail!(
1158 0 : "refusing ProposerElected with unexpected truncation point: lcp={:?} start_streaming_at={}, term={}, sk_th={:?} flush_lsn={}, wp_th={:?}",
1159 : last_common_point,
1160 : msg.start_streaming_at,
1161 0 : self.state.acceptor_state.term,
1162 : sk_th,
1163 0 : self.flush_lsn(),
1164 : msg.term_history,
1165 : );
1166 7 : }
1167 :
1168 : // We are also expected to never attempt to truncate committed data.
1169 687 : assert!(
1170 687 : msg.start_streaming_at >= self.state.inmem.commit_lsn,
1171 0 : "attempt to truncate committed data: start_streaming_at={}, commit_lsn={}, term={}, sk_th={:?} flush_lsn={}, wp_th={:?}",
1172 : msg.start_streaming_at,
1173 : self.state.inmem.commit_lsn,
1174 0 : self.state.acceptor_state.term,
1175 : sk_th,
1176 0 : self.flush_lsn(),
1177 : msg.term_history,
1178 : );
1179 :
1180 : // Before first WAL write initialize its segment. It makes first segment
1181 : // pg_waldump'able because stream from compute doesn't include its
1182 : // segment and page headers.
1183 : //
1184 : // If we fail before first WAL write flush this action would be
1185 : // repeated, that's ok because it is idempotent.
1186 687 : if self.wal_store.flush_lsn() == Lsn::INVALID {
1187 139 : self.wal_store
1188 139 : .initialize_first_segment(msg.start_streaming_at)
1189 139 : .await?;
1190 0 : }
1191 :
1192 : // truncate wal, update the LSNs
1193 687 : self.wal_store.truncate_wal(msg.start_streaming_at).await?;
1194 :
1195 : // and now adopt term history from proposer
1196 : {
1197 687 : let mut state = self.state.start_change();
1198 :
1199 : // Here we learn initial LSN for the first time, set fields
1200 : // interested in that.
1201 :
1202 687 : if let Some(start_lsn) = msg.term_history.0.first() {
1203 687 : if state.timeline_start_lsn == Lsn(0) {
1204 : // Remember point where WAL begins globally. In the future it
1205 : // will be intialized immediately on timeline creation.
1206 139 : state.timeline_start_lsn = start_lsn.lsn;
1207 139 : info!(
1208 0 : "setting timeline_start_lsn to {:?}",
1209 : state.timeline_start_lsn
1210 : );
1211 0 : }
1212 0 : }
1213 :
1214 687 : if state.peer_horizon_lsn == Lsn(0) {
1215 139 : // Update peer_horizon_lsn as soon as we know where timeline starts.
1216 139 : // It means that peer_horizon_lsn cannot be zero after we know timeline_start_lsn.
1217 139 : state.peer_horizon_lsn = state.timeline_start_lsn;
1218 139 : }
1219 687 : if state.local_start_lsn == Lsn(0) {
1220 139 : state.local_start_lsn = msg.start_streaming_at;
1221 139 : info!("setting local_start_lsn to {:?}", state.local_start_lsn);
1222 0 : }
1223 : // Initializing commit_lsn before acking first flushed record is
1224 : // important to let find_end_of_wal skip the hole in the beginning
1225 : // of the first segment.
1226 : //
1227 : // NB: on new clusters, this happens at the same time as
1228 : // timeline_start_lsn initialization, it is taken outside to provide
1229 : // upgrade.
1230 687 : state.commit_lsn = max(state.commit_lsn, state.timeline_start_lsn);
1231 :
1232 : // Initializing backup_lsn is useful to avoid making backup think it should upload 0 segment.
1233 687 : state.backup_lsn = max(state.backup_lsn, state.timeline_start_lsn);
1234 : // similar for remote_consistent_lsn
1235 687 : state.remote_consistent_lsn =
1236 687 : max(state.remote_consistent_lsn, state.timeline_start_lsn);
1237 :
1238 687 : state.acceptor_state.term_history = msg.term_history.clone();
1239 687 : self.state.finish_change(&state).await?;
1240 : }
1241 :
1242 687 : info!("start receiving WAL since {:?}", msg.start_streaming_at);
1243 :
1244 : // Cache LSN where term starts to immediately fsync control file with
1245 : // commit_lsn once we reach it -- sync-safekeepers finishes when
1246 : // persisted commit_lsn on majority of safekeepers aligns.
1247 687 : self.term_start_lsn = match msg.term_history.0.last() {
1248 0 : None => bail!("proposer elected with empty term history"),
1249 687 : Some(term_lsn_start) => term_lsn_start.lsn,
1250 : };
1251 :
1252 687 : Ok(None)
1253 8 : }
1254 :
1255 : /// Advance commit_lsn taking into account what we have locally.
1256 : ///
1257 : /// Note: it is assumed that 'WAL we have is from the right term' check has
1258 : /// already been done outside.
1259 2347 : async fn update_commit_lsn(&mut self, mut candidate: Lsn) -> Result<()> {
1260 : // Both peers and walproposer communicate this value, we might already
1261 : // have a fresher (higher) version.
1262 2347 : candidate = max(candidate, self.state.inmem.commit_lsn);
1263 2347 : let commit_lsn = min(candidate, self.flush_lsn());
1264 2347 : assert!(
1265 2347 : commit_lsn >= self.state.inmem.commit_lsn,
1266 0 : "commit_lsn monotonicity violated: old={} new={}",
1267 : self.state.inmem.commit_lsn,
1268 : commit_lsn
1269 : );
1270 :
1271 2347 : self.state.inmem.commit_lsn = commit_lsn;
1272 :
1273 : // If new commit_lsn reached term switch, force sync of control
1274 : // file: walproposer in sync mode is very interested when this
1275 : // happens. Note: this is for sync-safekeepers mode only, as
1276 : // otherwise commit_lsn might jump over term_start_lsn.
1277 2347 : if commit_lsn >= self.term_start_lsn && self.state.commit_lsn < self.term_start_lsn {
1278 85 : self.state.flush().await?;
1279 620 : }
1280 :
1281 2347 : Ok(())
1282 620 : }
1283 :
1284 : /// Handle request to append WAL.
1285 : #[allow(clippy::comparison_chain)]
1286 3513 : async fn handle_append_request(
1287 3513 : &mut self,
1288 3513 : msg: &AppendRequest,
1289 3513 : require_flush: bool,
1290 3513 : ) -> Result<Option<AcceptorProposerMessage>> {
1291 : // Refuse message on generation mismatch. On reconnect wp will get full
1292 : // configuration from greeting.
1293 3513 : if self.state.mconf.generation != msg.h.generation {
1294 1 : bail!(
1295 1 : "refusing append request due to generation mismatch: request {}, sk {}",
1296 : msg.h.generation,
1297 1 : self.state.mconf.generation
1298 : );
1299 624 : }
1300 :
1301 3512 : if self.state.acceptor_state.term < msg.h.term {
1302 0 : bail!("got AppendRequest before ProposerElected");
1303 624 : }
1304 :
1305 : // If our term is higher, immediately refuse the message. Send term only
1306 : // response; elected walproposer can never advance the term, so it will
1307 : // figure out the refusal from it -- which is important as term change
1308 : // should cause not just reconnection but whole walproposer re-election.
1309 3512 : if self.state.acceptor_state.term > msg.h.term {
1310 0 : let resp = AppendResponse::term_only(
1311 0 : self.state.mconf.generation,
1312 0 : self.state.acceptor_state.term,
1313 : );
1314 0 : return Ok(Some(AcceptorProposerMessage::AppendResponse(resp)));
1315 624 : }
1316 :
1317 : // Disallow any non-sequential writes, which can result in gaps or
1318 : // overwrites. If we need to move the pointer, ProposerElected message
1319 : // should have truncated WAL first accordingly. Note that the first
1320 : // condition (WAL rewrite) is quite expected in real world; it happens
1321 : // when walproposer reconnects to safekeeper and writes some more data
1322 : // while first connection still gets some packets later. It might be
1323 : // better to not log this as error! above.
1324 3512 : let write_lsn = self.wal_store.write_lsn();
1325 3512 : let flush_lsn = self.wal_store.flush_lsn();
1326 3512 : if write_lsn > msg.h.begin_lsn {
1327 1 : bail!(
1328 1 : "append request rewrites WAL written before, write_lsn={}, msg lsn={}",
1329 : write_lsn,
1330 : msg.h.begin_lsn
1331 : );
1332 623 : }
1333 3511 : if write_lsn < msg.h.begin_lsn && write_lsn != Lsn(0) {
1334 0 : bail!(
1335 0 : "append request creates gap in written WAL, write_lsn={}, msg lsn={}",
1336 : write_lsn,
1337 : msg.h.begin_lsn,
1338 : );
1339 623 : }
1340 :
1341 : // Now we know that we are in the same term as the proposer, process the
1342 : // message.
1343 :
1344 : // do the job
1345 3511 : if !msg.wal_data.is_empty() {
1346 1356 : self.wal_store
1347 1356 : .write_wal(msg.h.begin_lsn, &msg.wal_data)
1348 1356 : .await?;
1349 0 : }
1350 :
1351 : // flush wal to the disk, if required
1352 3511 : if require_flush {
1353 3 : self.wal_store.flush_wal().await?;
1354 620 : }
1355 :
1356 : // Update commit_lsn. It will be flushed to the control file regularly by the timeline
1357 : // manager, off of the WAL ingest hot path.
1358 3511 : if msg.h.commit_lsn != Lsn(0) {
1359 2347 : self.update_commit_lsn(msg.h.commit_lsn).await?;
1360 3 : }
1361 : // Value calculated by walproposer can always lag:
1362 : // - safekeepers can forget inmem value and send to proposer lower
1363 : // persisted one on restart;
1364 : // - if we make safekeepers always send persistent value,
1365 : // any compute restart would pull it down.
1366 : // Thus, take max before adopting.
1367 3511 : self.state.inmem.peer_horizon_lsn =
1368 3511 : max(self.state.inmem.peer_horizon_lsn, msg.h.truncate_lsn);
1369 :
1370 3511 : trace!(
1371 0 : "processed AppendRequest of len {}, begin_lsn={}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, flushed={:?}",
1372 0 : msg.wal_data.len(),
1373 : msg.h.begin_lsn,
1374 : msg.h.end_lsn,
1375 : msg.h.commit_lsn,
1376 : msg.h.truncate_lsn,
1377 : require_flush,
1378 : );
1379 :
1380 : // If flush_lsn hasn't updated, AppendResponse is not very useful.
1381 : // This is the common case for !require_flush, but a flush can still
1382 : // happen on segment bounds.
1383 3511 : if !require_flush && flush_lsn == self.flush_lsn() {
1384 3508 : return Ok(None);
1385 3 : }
1386 :
1387 3 : let resp = self.append_response();
1388 3 : Ok(Some(AcceptorProposerMessage::AppendResponse(resp)))
1389 625 : }
1390 :
1391 : /// Flush WAL to disk. Return AppendResponse with latest LSNs.
1392 3007 : async fn handle_flush(&mut self) -> Result<Option<AcceptorProposerMessage>> {
1393 3007 : self.wal_store.flush_wal().await?;
1394 3007 : Ok(Some(AcceptorProposerMessage::AppendResponse(
1395 3007 : self.append_response(),
1396 3007 : )))
1397 620 : }
1398 :
1399 : /// Update commit_lsn from peer safekeeper data.
1400 0 : pub async fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> {
1401 0 : if Lsn(sk_info.commit_lsn) != Lsn::INVALID {
1402 : // Note: the check is too restrictive, generally we can update local
1403 : // commit_lsn if our history matches (is part of) history of advanced
1404 : // commit_lsn provider.
1405 0 : if sk_info.last_log_term == self.get_last_log_term() {
1406 0 : self.update_commit_lsn(Lsn(sk_info.commit_lsn)).await?;
1407 0 : }
1408 0 : }
1409 0 : Ok(())
1410 0 : }
1411 : }
1412 :
1413 : #[cfg(test)]
1414 : mod tests {
1415 : use std::ops::Deref;
1416 : use std::str::FromStr;
1417 : use std::time::{Instant, UNIX_EPOCH};
1418 :
1419 : use futures::future::BoxFuture;
1420 : use postgres_ffi::{WAL_SEGMENT_SIZE, XLogSegNo};
1421 : use safekeeper_api::ServerInfo;
1422 : use safekeeper_api::membership::{
1423 : Configuration, MemberSet, SafekeeperGeneration, SafekeeperId,
1424 : };
1425 :
1426 : use super::*;
1427 : use crate::state::{EvictionState, TimelinePersistentState};
1428 :
1429 : // fake storage for tests
1430 : struct InMemoryState {
1431 : persisted_state: TimelinePersistentState,
1432 : }
1433 :
1434 : impl control_file::Storage for InMemoryState {
1435 5 : async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> {
1436 5 : self.persisted_state = s.clone();
1437 5 : Ok(())
1438 5 : }
1439 :
1440 0 : fn last_persist_at(&self) -> Instant {
1441 0 : Instant::now()
1442 0 : }
1443 : }
1444 :
1445 : impl Deref for InMemoryState {
1446 : type Target = TimelinePersistentState;
1447 :
1448 100 : fn deref(&self) -> &Self::Target {
1449 100 : &self.persisted_state
1450 100 : }
1451 : }
1452 :
1453 3 : fn test_sk_state() -> TimelinePersistentState {
1454 3 : let mut state = TimelinePersistentState::empty();
1455 3 : state.server.wal_seg_size = WAL_SEGMENT_SIZE as u32;
1456 3 : state.tenant_id = TenantId::from([1u8; 16]);
1457 3 : state.timeline_id = TimelineId::from([1u8; 16]);
1458 3 : state
1459 3 : }
1460 :
1461 : struct DummyWalStore {
1462 : lsn: Lsn,
1463 : }
1464 :
1465 : impl wal_storage::Storage for DummyWalStore {
1466 4 : fn write_lsn(&self) -> Lsn {
1467 4 : self.lsn
1468 4 : }
1469 :
1470 19 : fn flush_lsn(&self) -> Lsn {
1471 19 : self.lsn
1472 19 : }
1473 :
1474 2 : async fn initialize_first_segment(&mut self, _init_lsn: Lsn) -> Result<()> {
1475 2 : Ok(())
1476 2 : }
1477 :
1478 3 : async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
1479 3 : self.lsn = startpos + buf.len() as u64;
1480 3 : Ok(())
1481 3 : }
1482 :
1483 2 : async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
1484 2 : self.lsn = end_pos;
1485 2 : Ok(())
1486 2 : }
1487 :
1488 5 : async fn flush_wal(&mut self) -> Result<()> {
1489 5 : Ok(())
1490 5 : }
1491 :
1492 0 : fn remove_up_to(&self, _segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> {
1493 0 : Box::pin(async { Ok(()) })
1494 0 : }
1495 :
1496 0 : fn get_metrics(&self) -> crate::metrics::WalStorageMetrics {
1497 0 : crate::metrics::WalStorageMetrics::default()
1498 0 : }
1499 : }
1500 :
1501 : #[tokio::test]
1502 1 : async fn test_voting() {
1503 1 : let storage = InMemoryState {
1504 1 : persisted_state: test_sk_state(),
1505 1 : };
1506 1 : let wal_store = DummyWalStore { lsn: Lsn(0) };
1507 1 : let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
1508 :
1509 : // Vote with generation mismatch should be rejected.
1510 1 : let gen_mismatch_vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest {
1511 1 : generation: SafekeeperGeneration::new(42),
1512 1 : term: 1,
1513 1 : });
1514 1 : assert!(sk.process_msg(&gen_mismatch_vote_request).await.is_err());
1515 :
1516 : // check voting for 1 is ok
1517 1 : let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest {
1518 1 : generation: Generation::new(0),
1519 1 : term: 1,
1520 1 : });
1521 1 : let mut vote_resp = sk.process_msg(&vote_request).await;
1522 1 : match vote_resp.unwrap() {
1523 1 : Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given),
1524 0 : r => panic!("unexpected response: {r:?}"),
1525 : }
1526 :
1527 : // reboot...
1528 1 : let state = sk.state.deref().clone();
1529 1 : let storage = InMemoryState {
1530 1 : persisted_state: state,
1531 1 : };
1532 :
1533 1 : sk = SafeKeeper::new(TimelineState::new(storage), sk.wal_store, NodeId(0)).unwrap();
1534 :
1535 : // and ensure voting second time for 1 is not ok
1536 1 : vote_resp = sk.process_msg(&vote_request).await;
1537 1 : match vote_resp.unwrap() {
1538 1 : Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(!resp.vote_given),
1539 1 : r => panic!("unexpected response: {r:?}"),
1540 1 : }
1541 1 : }
1542 :
1543 : #[tokio::test]
1544 1 : async fn test_last_log_term_switch() {
1545 1 : let storage = InMemoryState {
1546 1 : persisted_state: test_sk_state(),
1547 1 : };
1548 1 : let wal_store = DummyWalStore { lsn: Lsn(0) };
1549 :
1550 1 : let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
1551 :
1552 1 : let mut ar_hdr = AppendRequestHeader {
1553 1 : generation: Generation::new(0),
1554 1 : term: 2,
1555 1 : begin_lsn: Lsn(1),
1556 1 : end_lsn: Lsn(2),
1557 1 : commit_lsn: Lsn(0),
1558 1 : truncate_lsn: Lsn(0),
1559 1 : };
1560 1 : let mut append_request = AppendRequest {
1561 1 : h: ar_hdr.clone(),
1562 1 : wal_data: Bytes::from_static(b"b"),
1563 1 : };
1564 :
1565 1 : let pem = ProposerElected {
1566 1 : generation: Generation::new(0),
1567 1 : term: 2,
1568 1 : start_streaming_at: Lsn(1),
1569 1 : term_history: TermHistory(vec![
1570 1 : TermLsn {
1571 1 : term: 1,
1572 1 : lsn: Lsn(1),
1573 1 : },
1574 1 : TermLsn {
1575 1 : term: 2,
1576 1 : lsn: Lsn(3),
1577 1 : },
1578 1 : ]),
1579 1 : };
1580 :
1581 : // check that elected msg with generation mismatch is rejected
1582 1 : let mut pem_gen_mismatch = pem.clone();
1583 1 : pem_gen_mismatch.generation = SafekeeperGeneration::new(42);
1584 1 : assert!(
1585 1 : sk.process_msg(&ProposerAcceptorMessage::Elected(pem_gen_mismatch))
1586 1 : .await
1587 1 : .is_err()
1588 : );
1589 :
1590 1 : sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
1591 1 : .await
1592 1 : .unwrap();
1593 :
1594 : // check that AppendRequest before term_start_lsn doesn't switch last_log_term.
1595 1 : sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
1596 1 : .await
1597 1 : .unwrap();
1598 1 : assert_eq!(sk.get_last_log_term(), 1);
1599 :
1600 : // but record at term_start_lsn does the switch
1601 1 : ar_hdr.begin_lsn = Lsn(2);
1602 1 : ar_hdr.end_lsn = Lsn(3);
1603 1 : append_request = AppendRequest {
1604 1 : h: ar_hdr,
1605 1 : wal_data: Bytes::from_static(b"b"),
1606 1 : };
1607 1 : sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
1608 1 : .await
1609 1 : .unwrap();
1610 1 : assert_eq!(sk.get_last_log_term(), 2);
1611 1 : }
1612 :
1613 : #[tokio::test]
1614 1 : async fn test_non_consecutive_write() {
1615 1 : let storage = InMemoryState {
1616 1 : persisted_state: test_sk_state(),
1617 1 : };
1618 1 : let wal_store = DummyWalStore { lsn: Lsn(0) };
1619 :
1620 1 : let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
1621 :
1622 1 : let pem = ProposerElected {
1623 1 : generation: Generation::new(0),
1624 1 : term: 1,
1625 1 : start_streaming_at: Lsn(1),
1626 1 : term_history: TermHistory(vec![TermLsn {
1627 1 : term: 1,
1628 1 : lsn: Lsn(1),
1629 1 : }]),
1630 1 : };
1631 1 : sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
1632 1 : .await
1633 1 : .unwrap();
1634 :
1635 1 : let ar_hdr = AppendRequestHeader {
1636 1 : generation: Generation::new(0),
1637 1 : term: 1,
1638 1 : begin_lsn: Lsn(1),
1639 1 : end_lsn: Lsn(2),
1640 1 : commit_lsn: Lsn(0),
1641 1 : truncate_lsn: Lsn(0),
1642 1 : };
1643 1 : let append_request = AppendRequest {
1644 1 : h: ar_hdr.clone(),
1645 1 : wal_data: Bytes::from_static(b"b"),
1646 1 : };
1647 :
1648 : // check that append request with generation mismatch is rejected
1649 1 : let mut ar_hdr_gen_mismatch = ar_hdr.clone();
1650 1 : ar_hdr_gen_mismatch.generation = SafekeeperGeneration::new(42);
1651 1 : let append_request_gen_mismatch = AppendRequest {
1652 1 : h: ar_hdr_gen_mismatch,
1653 1 : wal_data: Bytes::from_static(b"b"),
1654 1 : };
1655 1 : assert!(
1656 1 : sk.process_msg(&ProposerAcceptorMessage::AppendRequest(
1657 1 : append_request_gen_mismatch
1658 1 : ))
1659 1 : .await
1660 1 : .is_err()
1661 : );
1662 :
1663 : // do write ending at 2, it should be ok
1664 1 : sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
1665 1 : .await
1666 1 : .unwrap();
1667 1 : let mut ar_hrd2 = ar_hdr.clone();
1668 1 : ar_hrd2.begin_lsn = Lsn(4);
1669 1 : ar_hrd2.end_lsn = Lsn(5);
1670 1 : let append_request = AppendRequest {
1671 1 : h: ar_hdr,
1672 1 : wal_data: Bytes::from_static(b"b"),
1673 1 : };
1674 : // and now starting at 4, it must fail
1675 1 : sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
1676 1 : .await
1677 1 : .unwrap_err();
1678 1 : }
1679 :
/// Histories which share no term have no common point.
#[test]
fn test_find_highest_common_point_none() {
    let prop_th = TermHistory(vec![TermLsn::from((0, Lsn(1)))]);
    let sk_th = TermHistory(vec![
        TermLsn::from((1, Lsn(1))),
        TermLsn::from((2, Lsn(2))),
    ]);
    let common_point = TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(3));
    assert_eq!(common_point, None);
}
1689 :
/// When histories diverge after term 2, the common point is where the
/// safekeeper ended term 2.
#[test]
fn test_find_highest_common_point_middle() {
    let prop_th = TermHistory(vec![
        TermLsn::from((1, Lsn(10))),
        TermLsn::from((2, Lsn(20))),
        TermLsn::from((4, Lsn(40))),
    ]);
    let sk_th = TermHistory(vec![
        TermLsn::from((1, Lsn(10))),
        TermLsn::from((2, Lsn(20))),
        // sk ends last common term 2 at 30
        TermLsn::from((3, Lsn(30))),
    ]);
    let expected = Some(TermLsn {
        term: 2,
        lsn: Lsn(30),
    });
    let common_point = TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(40));
    assert_eq!(common_point, expected);
}
1710 :
/// When the safekeeper history is a prefix of the proposer's, the common
/// point is the safekeeper's end LSN within the last common term.
#[test]
fn test_find_highest_common_point_sk_end() {
    let prop_th = TermHistory(vec![
        TermLsn::from((1, Lsn(10))),
        // last common term 2, sk will end it at 32 sk_end_lsn
        TermLsn::from((2, Lsn(20))),
        TermLsn::from((4, Lsn(40))),
    ]);
    let sk_th = TermHistory(vec![
        TermLsn::from((1, Lsn(10))),
        TermLsn::from((2, Lsn(20))),
    ]);
    let expected = Some(TermLsn {
        term: 2,
        lsn: Lsn(32),
    });
    let common_point = TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(32));
    assert_eq!(common_point, expected);
}
1727 :
/// With identical histories the common point is the safekeeper's end LSN
/// in the last term.
#[test]
fn test_find_highest_common_point_walprop() {
    let shared_entries = vec![TermLsn::from((1, Lsn(10))), TermLsn::from((2, Lsn(20)))];
    let prop_th = TermHistory(shared_entries.clone());
    let sk_th = TermHistory(shared_entries);
    let expected = Some(TermLsn {
        term: 2,
        lsn: Lsn(32),
    });
    let common_point = TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(32));
    assert_eq!(common_point, expected);
}
1740 :
/// A fully populated persistent state survives a serialize/deserialize
/// roundtrip unchanged.
#[test]
fn test_sk_state_bincode_serde_roundtrip() {
    let tenant_id = TenantId::from_str("cf0480929707ee75372337efaa5ecf96").unwrap();
    let timeline_id = TimelineId::from_str("112ded66422aa5e953e5440fa5427ac4").unwrap();

    // Proposer uuid: the timeline id bytes in reverse order.
    let mut proposer_uuid = timeline_id.as_arr();
    proposer_uuid.reverse();

    let members = MemberSet::new(vec![SafekeeperId {
        id: NodeId(1),
        host: "hehe.org".to_owned(),
        pg_port: 5432,
    }])
    .expect("duplicate member");

    let original = TimelinePersistentState {
        tenant_id,
        timeline_id,
        mconf: Configuration {
            generation: SafekeeperGeneration::new(42),
            members,
            new_members: None,
        },
        acceptor_state: AcceptorState {
            term: 42,
            term_history: TermHistory(vec![TermLsn {
                lsn: Lsn(0x1),
                term: 41,
            }]),
        },
        server: ServerInfo {
            pg_version: PgVersionId::from_full_pg_version(140000),
            system_id: 0x1234567887654321,
            wal_seg_size: 0x12345678,
        },
        proposer_uuid,
        timeline_start_lsn: Lsn(0x12345600),
        local_start_lsn: Lsn(0x12),
        commit_lsn: Lsn(1234567800),
        backup_lsn: Lsn(1234567300),
        peer_horizon_lsn: Lsn(9999999),
        remote_consistent_lsn: Lsn(1234560000),
        partial_backup: crate::wal_backup_partial::State::default(),
        eviction_state: EvictionState::Present,
        creation_ts: UNIX_EPOCH,
    };

    let encoded = original.ser().unwrap();
    let decoded = TimelinePersistentState::des(&encoded).unwrap();

    assert_eq!(decoded, original);
}
1792 : }
|