Line data Source code
1 : //! Acceptor part of proposer-acceptor consensus algorithm.
2 :
3 : use std::cmp::{max, min};
4 : use std::fmt;
5 : use std::io::Read;
6 : use std::str::FromStr;
7 :
8 : use anyhow::{Context, Result, bail};
9 : use byteorder::{LittleEndian, ReadBytesExt};
10 : use bytes::{Buf, BufMut, Bytes, BytesMut};
11 : use postgres_ffi::{MAX_SEND_SIZE, TimeLineID};
12 : use postgres_versioninfo::{PgMajorVersion, PgVersionId};
13 : use pq_proto::SystemId;
14 : use safekeeper_api::membership::{
15 : INVALID_GENERATION, MemberSet, SafekeeperGeneration as Generation, SafekeeperId,
16 : };
17 : use safekeeper_api::models::HotStandbyFeedback;
18 : use safekeeper_api::{Term, membership};
19 : use serde::{Deserialize, Serialize};
20 : use storage_broker::proto::SafekeeperTimelineInfo;
21 : use tracing::*;
22 : use utils::bin_ser::LeSer;
23 : use utils::id::{NodeId, TenantId, TimelineId};
24 : use utils::lsn::Lsn;
25 : use utils::pageserver_feedback::PageserverFeedback;
26 :
27 : use crate::metrics::MISC_OPERATION_SECONDS;
28 : use crate::state::TimelineState;
29 : use crate::{control_file, wal_storage};
30 :
31 : pub const SK_PROTO_VERSION_2: u32 = 2;
32 : pub const SK_PROTO_VERSION_3: u32 = 3;
33 : pub const UNKNOWN_SERVER_VERSION: PgVersionId = PgVersionId::UNKNOWN;
34 :
35 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
36 : pub struct TermLsn {
37 : pub term: Term,
38 : pub lsn: Lsn,
39 : }
40 :
41 : // Creation from tuple provides less typing (e.g. for unit tests).
42 : impl From<(Term, Lsn)> for TermLsn {
43 1273 : fn from(pair: (Term, Lsn)) -> TermLsn {
44 1273 : TermLsn {
45 1273 : term: pair.0,
46 1273 : lsn: pair.1,
47 1273 : }
48 1273 : }
49 : }
50 :
51 0 : #[derive(Clone, Serialize, Deserialize, PartialEq)]
52 : pub struct TermHistory(pub Vec<TermLsn>);
53 :
54 : impl TermHistory {
55 1457 : pub fn empty() -> TermHistory {
56 1457 : TermHistory(Vec::new())
57 1457 : }
58 :
59 : // Parse TermHistory as n_entries followed by TermLsn pairs in network order.
60 810 : pub fn from_bytes(bytes: &mut Bytes) -> Result<TermHistory> {
61 810 : let n_entries = bytes
62 810 : .get_u32_f()
63 810 : .with_context(|| "TermHistory misses len")?;
64 810 : let mut res = Vec::with_capacity(n_entries as usize);
65 7517 : for i in 0..n_entries {
66 7517 : let term = bytes
67 7517 : .get_u64_f()
68 7517 : .with_context(|| format!("TermHistory pos {i} misses term"))?;
69 7517 : let lsn = bytes
70 7517 : .get_u64_f()
71 7517 : .with_context(|| format!("TermHistory pos {i} misses lsn"))?
72 7517 : .into();
73 7517 : res.push(TermLsn { term, lsn })
74 : }
75 810 : Ok(TermHistory(res))
76 810 : }
77 :
78 : // Parse TermHistory as n_entries followed by TermLsn pairs in LE order.
79 : // TODO remove once v2 protocol is fully dropped.
80 0 : pub fn from_bytes_le(bytes: &mut Bytes) -> Result<TermHistory> {
81 0 : if bytes.remaining() < 4 {
82 0 : bail!("TermHistory misses len");
83 0 : }
84 0 : let n_entries = bytes.get_u32_le();
85 0 : let mut res = Vec::with_capacity(n_entries as usize);
86 0 : for _ in 0..n_entries {
87 0 : if bytes.remaining() < 16 {
88 0 : bail!("TermHistory is incomplete");
89 0 : }
90 0 : res.push(TermLsn {
91 0 : term: bytes.get_u64_le(),
92 0 : lsn: bytes.get_u64_le().into(),
93 0 : })
94 : }
95 0 : Ok(TermHistory(res))
96 0 : }
97 :
98 : /// Return copy of self with switches happening strictly after up_to
99 : /// truncated.
100 5074 : pub fn up_to(&self, up_to: Lsn) -> TermHistory {
101 5074 : let mut res = Vec::with_capacity(self.0.len());
102 24945 : for e in &self.0 {
103 19885 : if e.lsn > up_to {
104 14 : break;
105 19871 : }
106 19871 : res.push(*e);
107 : }
108 5074 : TermHistory(res)
109 5074 : }
110 :
111 : /// Find point of divergence between leader (walproposer) term history and
112 : /// safekeeper. Arguments are not symmetric as proposer history ends at
113 : /// +infinity while safekeeper at flush_lsn.
114 : /// C version is at walproposer SendProposerElected.
115 821 : pub fn find_highest_common_point(
116 821 : prop_th: &TermHistory,
117 821 : sk_th: &TermHistory,
118 821 : sk_wal_end: Lsn,
119 821 : ) -> Option<TermLsn> {
120 821 : let (prop_th, sk_th) = (&prop_th.0, &sk_th.0); // avoid .0 below
121 :
122 821 : if let Some(sk_th_last) = sk_th.last() {
123 685 : assert!(
124 685 : sk_th_last.lsn <= sk_wal_end,
125 0 : "safekeeper term history end {sk_th_last:?} LSN is higher than WAL end {sk_wal_end:?}"
126 : );
127 136 : }
128 :
129 : // find last common term, if any...
130 821 : let mut last_common_idx = None;
131 6620 : for i in 0..min(sk_th.len(), prop_th.len()) {
132 6620 : if prop_th[i].term != sk_th[i].term {
133 2 : break;
134 6618 : }
135 : // If term is the same, LSN must be equal as well.
136 6618 : assert!(
137 6618 : prop_th[i].lsn == sk_th[i].lsn,
138 0 : "same term {} has different start LSNs: prop {}, sk {}",
139 0 : prop_th[i].term,
140 0 : prop_th[i].lsn,
141 0 : sk_th[i].lsn
142 : );
143 6618 : last_common_idx = Some(i);
144 : }
145 821 : let last_common_idx = last_common_idx?;
146 : // Now find where it ends at both prop and sk and take min. End of
147 : // (common) term is the start of the next except it is the last one;
148 : // there it is flush_lsn in case of safekeeper or, in case of proposer
149 : // +infinity, so we just take flush_lsn then.
150 684 : if last_common_idx == prop_th.len() - 1 {
151 40 : Some(TermLsn {
152 40 : term: prop_th[last_common_idx].term,
153 40 : lsn: sk_wal_end,
154 40 : })
155 : } else {
156 644 : let prop_common_term_end = prop_th[last_common_idx + 1].lsn;
157 644 : let sk_common_term_end = if last_common_idx + 1 < sk_th.len() {
158 1 : sk_th[last_common_idx + 1].lsn
159 : } else {
160 643 : sk_wal_end
161 : };
162 644 : Some(TermLsn {
163 644 : term: prop_th[last_common_idx].term,
164 644 : lsn: min(prop_common_term_end, sk_common_term_end),
165 644 : })
166 : }
167 821 : }
168 : }
169 :
170 : /// Display only latest entries for Debug.
171 : impl fmt::Debug for TermHistory {
172 337 : fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
173 337 : let n_printed = 20;
174 337 : write!(
175 337 : fmt,
176 337 : "{}{:?}",
177 337 : if self.0.len() > n_printed { "... " } else { "" },
178 337 : self.0
179 337 : .iter()
180 337 : .rev()
181 337 : .take(n_printed)
182 1260 : .map(|&e| (e.term, e.lsn)) // omit TermSwitchEntry
183 337 : .collect::<Vec<_>>()
184 : )
185 337 : }
186 : }
187 :
/// 16-byte unique identifier of a proposer. It plays no role in correctness
/// and exists for monitoring only.
pub type PgUuid = [u8; 16];
190 :
191 : /// Persistent consensus state of the acceptor.
192 0 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
193 : pub struct AcceptorState {
194 : /// acceptor's last term it voted for (advanced in 1 phase)
195 : pub term: Term,
196 : /// History of term switches for safekeeper's WAL.
197 : /// Actually it often goes *beyond* WAL contents as we adopt term history
198 : /// from the proposer before recovery.
199 : pub term_history: TermHistory,
200 : }
201 :
202 : impl AcceptorState {
203 : /// acceptor's last_log_term is the term of the highest entry in the log
204 1332 : pub fn get_last_log_term(&self, flush_lsn: Lsn) -> Term {
205 1332 : let th = self.term_history.up_to(flush_lsn);
206 1332 : match th.0.last() {
207 1326 : Some(e) => e.term,
208 6 : None => 0,
209 : }
210 1332 : }
211 : }
212 :
213 : // protocol messages
214 :
215 : /// Initial Proposer -> Acceptor message
216 0 : #[derive(Debug, Deserialize)]
217 : pub struct ProposerGreeting {
218 : pub tenant_id: TenantId,
219 : pub timeline_id: TimelineId,
220 : pub mconf: membership::Configuration,
221 : /// Postgres server version
222 : pub pg_version: PgVersionId,
223 : pub system_id: SystemId,
224 : pub wal_seg_size: u32,
225 : }
226 :
227 : /// V2 of the message; exists as a struct because we (de)serialized it as is.
228 0 : #[derive(Debug, Deserialize)]
229 : pub struct ProposerGreetingV2 {
230 : /// proposer-acceptor protocol version
231 : pub protocol_version: u32,
232 : /// Postgres server version
233 : pub pg_version: PgVersionId,
234 : pub proposer_id: PgUuid,
235 : pub system_id: SystemId,
236 : pub timeline_id: TimelineId,
237 : pub tenant_id: TenantId,
238 : pub tli: TimeLineID,
239 : pub wal_seg_size: u32,
240 : }
241 :
242 : /// Acceptor -> Proposer initial response: the highest term known to me
243 : /// (acceptor voted for).
244 : #[derive(Debug, Serialize)]
245 : pub struct AcceptorGreeting {
246 : node_id: NodeId,
247 : mconf: membership::Configuration,
248 : term: u64,
249 : }
250 :
251 : /// Vote request sent from proposer to safekeepers
252 : #[derive(Debug)]
253 : pub struct VoteRequest {
254 : pub generation: Generation,
255 : pub term: Term,
256 : }
257 :
258 : /// V2 of the message; exists as a struct because we (de)serialized it as is.
259 0 : #[derive(Debug, Deserialize)]
260 : pub struct VoteRequestV2 {
261 : pub term: Term,
262 : }
263 :
264 : /// Vote itself, sent from safekeeper to proposer
265 : #[derive(Debug, Serialize)]
266 : pub struct VoteResponse {
267 : generation: Generation, // membership conf generation
268 : pub term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
269 : vote_given: bool,
270 : // Safekeeper flush_lsn (end of WAL) + history of term switches allow
271 : // proposer to choose the most advanced one.
272 : pub flush_lsn: Lsn,
273 : truncate_lsn: Lsn,
274 : pub term_history: TermHistory,
275 : }
276 :
277 : /*
278 : * Proposer -> Acceptor message announcing proposer is elected and communicating
279 : * term history to it.
280 : */
281 : #[derive(Debug, Clone)]
282 : pub struct ProposerElected {
283 : pub generation: Generation, // membership conf generation
284 : pub term: Term,
285 : pub start_streaming_at: Lsn,
286 : pub term_history: TermHistory,
287 : }
288 :
289 : /// Request with WAL message sent from proposer to safekeeper. Along the way it
290 : /// communicates commit_lsn.
291 : #[derive(Debug)]
292 : pub struct AppendRequest {
293 : pub h: AppendRequestHeader,
294 : pub wal_data: Bytes,
295 : }
296 0 : #[derive(Debug, Clone, Deserialize)]
297 : pub struct AppendRequestHeader {
298 : pub generation: Generation, // membership conf generation
299 : // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
300 : pub term: Term,
301 : /// start position of message in WAL
302 : pub begin_lsn: Lsn,
303 : /// end position of message in WAL
304 : pub end_lsn: Lsn,
305 : /// LSN committed by quorum of safekeepers
306 : pub commit_lsn: Lsn,
307 : /// minimal LSN which may be needed by proposer to perform recovery of some safekeeper
308 : pub truncate_lsn: Lsn,
309 : }
310 :
311 : /// V2 of the message; exists as a struct because we (de)serialized it as is.
312 0 : #[derive(Debug, Clone, Deserialize)]
313 : pub struct AppendRequestHeaderV2 {
314 : // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
315 : pub term: Term,
316 : // TODO: remove this field from the protocol, it in unused -- LSN of term
317 : // switch can be taken from ProposerElected (as well as from term history).
318 : pub term_start_lsn: Lsn,
319 : /// start position of message in WAL
320 : pub begin_lsn: Lsn,
321 : /// end position of message in WAL
322 : pub end_lsn: Lsn,
323 : /// LSN committed by quorum of safekeepers
324 : pub commit_lsn: Lsn,
325 : /// minimal LSN which may be needed by proposer to perform recovery of some safekeeper
326 : pub truncate_lsn: Lsn,
327 : // only for logging/debugging
328 : pub proposer_uuid: PgUuid,
329 : }
330 :
331 : /// Report safekeeper state to proposer
332 : #[derive(Debug, Serialize, Clone)]
333 : pub struct AppendResponse {
334 : // Membership conf generation. Not strictly required because on mismatch
335 : // connection is reset, but let's sanity check it.
336 : generation: Generation,
337 : // Current term of the safekeeper; if it is higher than proposer's, the
338 : // compute is out of date.
339 : pub term: Term,
340 : // Flushed end of wal on safekeeper; one should be always mindful from what
341 : // term history this value comes, either checking history directly or
342 : // observing term being set to one for which WAL truncation is known to have
343 : // happened.
344 : pub flush_lsn: Lsn,
345 : // We report back our awareness about which WAL is committed, as this is
346 : // a criterion for walproposer --sync mode exit
347 : pub commit_lsn: Lsn,
348 : pub hs_feedback: HotStandbyFeedback,
349 : pub pageserver_feedback: Option<PageserverFeedback>,
350 : }
351 :
352 : impl AppendResponse {
353 0 : fn term_only(generation: Generation, term: Term) -> AppendResponse {
354 0 : AppendResponse {
355 0 : generation,
356 0 : term,
357 0 : flush_lsn: Lsn(0),
358 0 : commit_lsn: Lsn(0),
359 0 : hs_feedback: HotStandbyFeedback::empty(),
360 0 : pageserver_feedback: None,
361 0 : }
362 0 : }
363 : }
364 :
365 : /// Proposer -> Acceptor messages
366 : #[derive(Debug)]
367 : pub enum ProposerAcceptorMessage {
368 : Greeting(ProposerGreeting),
369 : VoteRequest(VoteRequest),
370 : Elected(ProposerElected),
371 : AppendRequest(AppendRequest),
372 : NoFlushAppendRequest(AppendRequest),
373 : FlushWAL,
374 : }
375 :
376 : /// Augment Bytes with fallible get_uN where N is number of bytes methods.
377 : /// All reads are in network (big endian) order.
378 : trait BytesF {
379 : fn get_u8_f(&mut self) -> Result<u8>;
380 : fn get_u16_f(&mut self) -> Result<u16>;
381 : fn get_u32_f(&mut self) -> Result<u32>;
382 : fn get_u64_f(&mut self) -> Result<u64>;
383 : }
384 :
385 : impl BytesF for Bytes {
386 27773 : fn get_u8_f(&mut self) -> Result<u8> {
387 27773 : if self.is_empty() {
388 0 : bail!("no bytes left, expected 1");
389 27773 : }
390 27773 : Ok(self.get_u8())
391 27773 : }
392 0 : fn get_u16_f(&mut self) -> Result<u16> {
393 0 : if self.remaining() < 2 {
394 0 : bail!("no bytes left, expected 2");
395 0 : }
396 0 : Ok(self.get_u16())
397 0 : }
398 110999 : fn get_u32_f(&mut self) -> Result<u32> {
399 110999 : if self.remaining() < 4 {
400 0 : bail!("only {} bytes left, expected 4", self.remaining());
401 110999 : }
402 110999 : Ok(self.get_u32())
403 110999 : }
404 57361 : fn get_u64_f(&mut self) -> Result<u64> {
405 57361 : if self.remaining() < 8 {
406 0 : bail!("only {} bytes left, expected 8", self.remaining());
407 57361 : }
408 57361 : Ok(self.get_u64())
409 57361 : }
410 : }
411 :
412 : impl ProposerAcceptorMessage {
413 : /// Read cstring from Bytes.
414 41208 : fn get_cstr(buf: &mut Bytes) -> Result<String> {
415 41208 : let pos = buf
416 41208 : .iter()
417 1359864 : .position(|x| *x == 0)
418 41208 : .ok_or_else(|| anyhow::anyhow!("missing cstring terminator"))?;
419 41208 : let result = buf.split_to(pos);
420 41208 : buf.advance(1); // drop the null terminator
421 41208 : match std::str::from_utf8(&result) {
422 41208 : Ok(s) => Ok(s.to_string()),
423 0 : Err(e) => bail!("invalid utf8 in cstring: {}", e),
424 : }
425 41208 : }
426 :
427 : /// Read membership::Configuration from Bytes.
428 20604 : fn get_mconf(buf: &mut Bytes) -> Result<membership::Configuration> {
429 20604 : let generation = Generation::new(buf.get_u32_f().with_context(|| "reading generation")?);
430 20604 : let members_len = buf.get_u32_f().with_context(|| "reading members_len")?;
431 : // Main member set must have at least someone in valid configuration.
432 : // Empty conf is allowed until we fully migrate.
433 20604 : if generation != INVALID_GENERATION && members_len == 0 {
434 0 : bail!("empty members_len");
435 20604 : }
436 20604 : let mut members = MemberSet::empty();
437 20604 : for i in 0..members_len {
438 0 : let id = buf
439 0 : .get_u64_f()
440 0 : .with_context(|| format!("reading member {i} node_id"))?;
441 0 : let host = Self::get_cstr(buf).with_context(|| format!("reading member {i} host"))?;
442 0 : let pg_port = buf
443 0 : .get_u16_f()
444 0 : .with_context(|| format!("reading member {i} port"))?;
445 0 : let sk = SafekeeperId {
446 0 : id: NodeId(id),
447 0 : host,
448 0 : pg_port,
449 0 : };
450 0 : members.add(sk)?;
451 : }
452 20604 : let new_members_len = buf.get_u32_f().with_context(|| "reading new_members_len")?;
453 : // Non joint conf.
454 20604 : if new_members_len == 0 {
455 20604 : Ok(membership::Configuration {
456 20604 : generation,
457 20604 : members,
458 20604 : new_members: None,
459 20604 : })
460 : } else {
461 0 : let mut new_members = MemberSet::empty();
462 0 : for i in 0..new_members_len {
463 0 : let id = buf
464 0 : .get_u64_f()
465 0 : .with_context(|| format!("reading new member {i} node_id"))?;
466 0 : let host =
467 0 : Self::get_cstr(buf).with_context(|| format!("reading new member {i} host"))?;
468 0 : let pg_port = buf
469 0 : .get_u16_f()
470 0 : .with_context(|| format!("reading new member {i} port"))?;
471 0 : let sk = SafekeeperId {
472 0 : id: NodeId(id),
473 0 : host,
474 0 : pg_port,
475 0 : };
476 0 : new_members.add(sk)?;
477 : }
478 0 : Ok(membership::Configuration {
479 0 : generation,
480 0 : members,
481 0 : new_members: Some(new_members),
482 0 : })
483 : }
484 20604 : }
485 :
486 : /// Parse proposer message.
487 27773 : pub fn parse(mut msg_bytes: Bytes, proto_version: u32) -> Result<ProposerAcceptorMessage> {
488 27773 : if proto_version == SK_PROTO_VERSION_3 {
489 27773 : if msg_bytes.is_empty() {
490 0 : bail!("ProposerAcceptorMessage is not complete: missing tag");
491 27773 : }
492 27773 : let tag = msg_bytes.get_u8_f().with_context(|| {
493 0 : "ProposerAcceptorMessage is not complete: missing tag".to_string()
494 0 : })? as char;
495 27773 : match tag {
496 : 'g' => {
497 20604 : let tenant_id_str =
498 20604 : Self::get_cstr(&mut msg_bytes).with_context(|| "reading tenant_id")?;
499 20604 : let tenant_id = TenantId::from_str(&tenant_id_str)?;
500 20604 : let timeline_id_str =
501 20604 : Self::get_cstr(&mut msg_bytes).with_context(|| "reading timeline_id")?;
502 20604 : let timeline_id = TimelineId::from_str(&timeline_id_str)?;
503 20604 : let mconf = Self::get_mconf(&mut msg_bytes)?;
504 20604 : let pg_version = msg_bytes
505 20604 : .get_u32_f()
506 20604 : .with_context(|| "reading pg_version")?;
507 20604 : let system_id = msg_bytes.get_u64_f().with_context(|| "reading system_id")?;
508 20604 : let wal_seg_size = msg_bytes
509 20604 : .get_u32_f()
510 20604 : .with_context(|| "reading wal_seg_size")?;
511 20604 : let g = ProposerGreeting {
512 20604 : tenant_id,
513 20604 : timeline_id,
514 20604 : mconf,
515 20604 : pg_version: PgVersionId::from_full_pg_version(pg_version),
516 20604 : system_id,
517 20604 : wal_seg_size,
518 20604 : };
519 20604 : Ok(ProposerAcceptorMessage::Greeting(g))
520 : }
521 : 'v' => {
522 2923 : let generation = Generation::new(
523 2923 : msg_bytes
524 2923 : .get_u32_f()
525 2923 : .with_context(|| "reading generation")?,
526 : );
527 2923 : let term = msg_bytes.get_u64_f().with_context(|| "reading term")?;
528 2923 : let v = VoteRequest { generation, term };
529 2923 : Ok(ProposerAcceptorMessage::VoteRequest(v))
530 : }
531 : 'e' => {
532 810 : let generation = Generation::new(
533 810 : msg_bytes
534 810 : .get_u32_f()
535 810 : .with_context(|| "reading generation")?,
536 : );
537 810 : let term = msg_bytes.get_u64_f().with_context(|| "reading term")?;
538 810 : let start_streaming_at: Lsn = msg_bytes
539 810 : .get_u64_f()
540 810 : .with_context(|| "reading start_streaming_at")?
541 810 : .into();
542 810 : let term_history = TermHistory::from_bytes(&mut msg_bytes)?;
543 810 : let msg = ProposerElected {
544 810 : generation,
545 810 : term,
546 810 : start_streaming_at,
547 810 : term_history,
548 810 : };
549 810 : Ok(ProposerAcceptorMessage::Elected(msg))
550 : }
551 : 'a' => {
552 3436 : let generation = Generation::new(
553 3436 : msg_bytes
554 3436 : .get_u32_f()
555 3436 : .with_context(|| "reading generation")?,
556 : );
557 3436 : let term = msg_bytes.get_u64_f().with_context(|| "reading term")?;
558 3436 : let begin_lsn: Lsn = msg_bytes
559 3436 : .get_u64_f()
560 3436 : .with_context(|| "reading begin_lsn")?
561 3436 : .into();
562 3436 : let end_lsn: Lsn = msg_bytes
563 3436 : .get_u64_f()
564 3436 : .with_context(|| "reading end_lsn")?
565 3436 : .into();
566 3436 : let commit_lsn: Lsn = msg_bytes
567 3436 : .get_u64_f()
568 3436 : .with_context(|| "reading commit_lsn")?
569 3436 : .into();
570 3436 : let truncate_lsn: Lsn = msg_bytes
571 3436 : .get_u64_f()
572 3436 : .with_context(|| "reading truncate_lsn")?
573 3436 : .into();
574 3436 : let hdr = AppendRequestHeader {
575 3436 : generation,
576 3436 : term,
577 3436 : begin_lsn,
578 3436 : end_lsn,
579 3436 : commit_lsn,
580 3436 : truncate_lsn,
581 3436 : };
582 3436 : let rec_size = hdr
583 3436 : .end_lsn
584 3436 : .checked_sub(hdr.begin_lsn)
585 3436 : .context("begin_lsn > end_lsn in AppendRequest")?
586 : .0 as usize;
587 3436 : if rec_size > MAX_SEND_SIZE {
588 0 : bail!(
589 0 : "AppendRequest is longer than MAX_SEND_SIZE ({})",
590 : MAX_SEND_SIZE
591 : );
592 3436 : }
593 3436 : if msg_bytes.remaining() < rec_size {
594 0 : bail!(
595 0 : "reading WAL: only {} bytes left, wanted {}",
596 0 : msg_bytes.remaining(),
597 : rec_size
598 : );
599 3436 : }
600 3436 : let wal_data = msg_bytes.copy_to_bytes(rec_size);
601 3436 : let msg = AppendRequest { h: hdr, wal_data };
602 :
603 3436 : Ok(ProposerAcceptorMessage::AppendRequest(msg))
604 : }
605 0 : _ => bail!("unknown proposer-acceptor message tag: {}", tag),
606 : }
607 0 : } else if proto_version == SK_PROTO_VERSION_2 {
608 : // xxx using Reader is inefficient but easy to work with bincode
609 0 : let mut stream = msg_bytes.reader();
610 : // u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is
611 0 : let tag = stream.read_u64::<LittleEndian>()? as u8 as char;
612 0 : match tag {
613 : 'g' => {
614 0 : let msgv2 = ProposerGreetingV2::des_from(&mut stream)?;
615 0 : let g = ProposerGreeting {
616 0 : tenant_id: msgv2.tenant_id,
617 0 : timeline_id: msgv2.timeline_id,
618 0 : mconf: membership::Configuration {
619 0 : generation: INVALID_GENERATION,
620 0 : members: MemberSet::empty(),
621 0 : new_members: None,
622 0 : },
623 0 : pg_version: msgv2.pg_version,
624 0 : system_id: msgv2.system_id,
625 0 : wal_seg_size: msgv2.wal_seg_size,
626 0 : };
627 0 : Ok(ProposerAcceptorMessage::Greeting(g))
628 : }
629 : 'v' => {
630 0 : let msg = VoteRequestV2::des_from(&mut stream)?;
631 0 : let v = VoteRequest {
632 0 : generation: INVALID_GENERATION,
633 0 : term: msg.term,
634 0 : };
635 0 : Ok(ProposerAcceptorMessage::VoteRequest(v))
636 : }
637 : 'e' => {
638 0 : let mut msg_bytes = stream.into_inner();
639 0 : if msg_bytes.remaining() < 16 {
640 0 : bail!("ProposerElected message is not complete");
641 0 : }
642 0 : let term = msg_bytes.get_u64_le();
643 0 : let start_streaming_at = msg_bytes.get_u64_le().into();
644 0 : let term_history = TermHistory::from_bytes_le(&mut msg_bytes)?;
645 0 : if msg_bytes.remaining() < 8 {
646 0 : bail!("ProposerElected message is not complete");
647 0 : }
648 0 : let _timeline_start_lsn = msg_bytes.get_u64_le();
649 0 : let msg = ProposerElected {
650 0 : generation: INVALID_GENERATION,
651 0 : term,
652 0 : start_streaming_at,
653 0 : term_history,
654 0 : };
655 0 : Ok(ProposerAcceptorMessage::Elected(msg))
656 : }
657 : 'a' => {
658 : // read header followed by wal data
659 0 : let hdrv2 = AppendRequestHeaderV2::des_from(&mut stream)?;
660 0 : let hdr = AppendRequestHeader {
661 0 : generation: INVALID_GENERATION,
662 0 : term: hdrv2.term,
663 0 : begin_lsn: hdrv2.begin_lsn,
664 0 : end_lsn: hdrv2.end_lsn,
665 0 : commit_lsn: hdrv2.commit_lsn,
666 0 : truncate_lsn: hdrv2.truncate_lsn,
667 0 : };
668 0 : let rec_size = hdr
669 0 : .end_lsn
670 0 : .checked_sub(hdr.begin_lsn)
671 0 : .context("begin_lsn > end_lsn in AppendRequest")?
672 : .0 as usize;
673 0 : if rec_size > MAX_SEND_SIZE {
674 0 : bail!(
675 0 : "AppendRequest is longer than MAX_SEND_SIZE ({})",
676 : MAX_SEND_SIZE
677 : );
678 0 : }
679 :
680 0 : let mut wal_data_vec: Vec<u8> = vec![0; rec_size];
681 0 : stream.read_exact(&mut wal_data_vec)?;
682 0 : let wal_data = Bytes::from(wal_data_vec);
683 :
684 0 : let msg = AppendRequest { h: hdr, wal_data };
685 :
686 0 : Ok(ProposerAcceptorMessage::AppendRequest(msg))
687 : }
688 0 : _ => bail!("unknown proposer-acceptor message tag: {}", tag),
689 : }
690 : } else {
691 0 : bail!("unsupported protocol version {}", proto_version);
692 : }
693 27773 : }
694 :
695 : /// The memory size of the message, including byte slices.
696 620 : pub fn size(&self) -> usize {
697 : const BASE_SIZE: usize = std::mem::size_of::<ProposerAcceptorMessage>();
698 :
699 : // For most types, the size is just the base enum size including the nested structs. Some
700 : // types also contain byte slices; add them.
701 : //
702 : // We explicitly list all fields, to draw attention here when new fields are added.
703 620 : let mut size = BASE_SIZE;
704 620 : size += match self {
705 0 : Self::Greeting(_) => 0,
706 :
707 0 : Self::VoteRequest(_) => 0,
708 :
709 0 : Self::Elected(_) => 0,
710 :
711 : Self::AppendRequest(AppendRequest {
712 : h:
713 : AppendRequestHeader {
714 : generation: _,
715 : term: _,
716 : begin_lsn: _,
717 : end_lsn: _,
718 : commit_lsn: _,
719 : truncate_lsn: _,
720 : },
721 620 : wal_data,
722 620 : }) => wal_data.len(),
723 :
724 : Self::NoFlushAppendRequest(AppendRequest {
725 : h:
726 : AppendRequestHeader {
727 : generation: _,
728 : term: _,
729 : begin_lsn: _,
730 : end_lsn: _,
731 : commit_lsn: _,
732 : truncate_lsn: _,
733 : },
734 0 : wal_data,
735 0 : }) => wal_data.len(),
736 :
737 0 : Self::FlushWAL => 0,
738 : };
739 :
740 620 : size
741 620 : }
742 : }
743 :
744 : /// Acceptor -> Proposer messages
745 : #[derive(Debug)]
746 : pub enum AcceptorProposerMessage {
747 : Greeting(AcceptorGreeting),
748 : VoteResponse(VoteResponse),
749 : AppendResponse(AppendResponse),
750 : }
751 :
752 : impl AcceptorProposerMessage {
753 0 : fn put_cstr(buf: &mut BytesMut, s: &str) {
754 0 : buf.put_slice(s.as_bytes());
755 0 : buf.put_u8(0); // null terminator
756 0 : }
757 :
758 : /// Serialize membership::Configuration into buf.
759 20604 : fn serialize_mconf(buf: &mut BytesMut, mconf: &membership::Configuration) {
760 20604 : buf.put_u32(mconf.generation.into_inner());
761 20604 : buf.put_u32(mconf.members.m.len() as u32);
762 20604 : for sk in &mconf.members.m {
763 0 : buf.put_u64(sk.id.0);
764 0 : Self::put_cstr(buf, &sk.host);
765 0 : buf.put_u16(sk.pg_port);
766 0 : }
767 20604 : if let Some(ref new_members) = mconf.new_members {
768 0 : buf.put_u32(new_members.m.len() as u32);
769 0 : for sk in &new_members.m {
770 0 : buf.put_u64(sk.id.0);
771 0 : Self::put_cstr(buf, &sk.host);
772 0 : buf.put_u16(sk.pg_port);
773 0 : }
774 20604 : } else {
775 20604 : buf.put_u32(0);
776 20604 : }
777 20604 : }
778 :
779 : /// Serialize acceptor -> proposer message.
780 26397 : pub fn serialize(&self, buf: &mut BytesMut, proto_version: u32) -> Result<()> {
781 26397 : if proto_version == SK_PROTO_VERSION_3 {
782 26397 : match self {
783 20604 : AcceptorProposerMessage::Greeting(msg) => {
784 20604 : buf.put_u8(b'g');
785 20604 : buf.put_u64(msg.node_id.0);
786 20604 : Self::serialize_mconf(buf, &msg.mconf);
787 20604 : buf.put_u64(msg.term)
788 : }
789 2923 : AcceptorProposerMessage::VoteResponse(msg) => {
790 2923 : buf.put_u8(b'v');
791 2923 : buf.put_u32(msg.generation.into_inner());
792 2923 : buf.put_u64(msg.term);
793 2923 : buf.put_u8(msg.vote_given as u8);
794 2923 : buf.put_u64(msg.flush_lsn.into());
795 2923 : buf.put_u64(msg.truncate_lsn.into());
796 2923 : buf.put_u32(msg.term_history.0.len() as u32);
797 14661 : for e in &msg.term_history.0 {
798 11738 : buf.put_u64(e.term);
799 11738 : buf.put_u64(e.lsn.into());
800 11738 : }
801 : }
802 2870 : AcceptorProposerMessage::AppendResponse(msg) => {
803 2870 : buf.put_u8(b'a');
804 2870 : buf.put_u32(msg.generation.into_inner());
805 2870 : buf.put_u64(msg.term);
806 2870 : buf.put_u64(msg.flush_lsn.into());
807 2870 : buf.put_u64(msg.commit_lsn.into());
808 2870 : buf.put_i64(msg.hs_feedback.ts);
809 2870 : buf.put_u64(msg.hs_feedback.xmin);
810 2870 : buf.put_u64(msg.hs_feedback.catalog_xmin);
811 :
812 : // AsyncReadMessage in walproposer.c will not try to decode pageserver_feedback
813 : // if it is not present.
814 2870 : if let Some(ref msg) = msg.pageserver_feedback {
815 0 : msg.serialize(buf);
816 2870 : }
817 : }
818 : }
819 26397 : Ok(())
820 : // TODO remove 3 after converting all msgs
821 0 : } else if proto_version == SK_PROTO_VERSION_2 {
822 0 : match self {
823 0 : AcceptorProposerMessage::Greeting(msg) => {
824 0 : buf.put_u64_le('g' as u64);
825 0 : // v2 didn't have mconf and fields were reordered
826 0 : buf.put_u64_le(msg.term);
827 0 : buf.put_u64_le(msg.node_id.0);
828 0 : }
829 0 : AcceptorProposerMessage::VoteResponse(msg) => {
830 : // v2 didn't have generation, had u64 vote_given and timeline_start_lsn
831 0 : buf.put_u64_le('v' as u64);
832 0 : buf.put_u64_le(msg.term);
833 0 : buf.put_u64_le(msg.vote_given as u64);
834 0 : buf.put_u64_le(msg.flush_lsn.into());
835 0 : buf.put_u64_le(msg.truncate_lsn.into());
836 0 : buf.put_u32_le(msg.term_history.0.len() as u32);
837 0 : for e in &msg.term_history.0 {
838 0 : buf.put_u64_le(e.term);
839 0 : buf.put_u64_le(e.lsn.into());
840 0 : }
841 : // removed timeline_start_lsn
842 0 : buf.put_u64_le(0);
843 : }
844 0 : AcceptorProposerMessage::AppendResponse(msg) => {
845 : // v2 didn't have generation
846 0 : buf.put_u64_le('a' as u64);
847 0 : buf.put_u64_le(msg.term);
848 0 : buf.put_u64_le(msg.flush_lsn.into());
849 0 : buf.put_u64_le(msg.commit_lsn.into());
850 0 : buf.put_i64_le(msg.hs_feedback.ts);
851 0 : buf.put_u64_le(msg.hs_feedback.xmin);
852 0 : buf.put_u64_le(msg.hs_feedback.catalog_xmin);
853 :
854 : // AsyncReadMessage in walproposer.c will not try to decode pageserver_feedback
855 : // if it is not present.
856 0 : if let Some(ref msg) = msg.pageserver_feedback {
857 0 : msg.serialize(buf);
858 0 : }
859 : }
860 : }
861 0 : Ok(())
862 : } else {
863 0 : bail!("unsupported protocol version {}", proto_version);
864 : }
865 26397 : }
866 : }
867 :
868 : /// Safekeeper implements consensus to reliably persist WAL across nodes.
869 : /// It controls all WAL disk writes and updates of control file.
870 : ///
871 : /// Currently safekeeper processes:
872 : /// - messages from compute (proposers) and provides replies
873 : /// - messages from broker peers
874 : pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> {
875 : /// LSN since the proposer safekeeper currently talking to appends WAL;
876 : /// determines last_log_term switch point.
877 : pub term_start_lsn: Lsn,
878 :
879 : pub state: TimelineState<CTRL>, // persistent state storage
880 : pub wal_store: WAL,
881 :
882 : node_id: NodeId, // safekeeper's node id
883 : }
884 :
885 : impl<CTRL, WAL> SafeKeeper<CTRL, WAL>
886 : where
887 : CTRL: control_file::Storage,
888 : WAL: wal_storage::Storage,
889 : {
890 : /// Accepts a control file storage containing the safekeeper state.
891 : /// State must be initialized, i.e. contain filled `tenant_id`, `timeline_id`
892 : /// and `server` (`wal_seg_size` inside it) fields.
893 9129 : pub fn new(
894 9129 : state: TimelineState<CTRL>,
895 9129 : wal_store: WAL,
896 9129 : node_id: NodeId,
897 9129 : ) -> Result<SafeKeeper<CTRL, WAL>> {
898 9129 : if state.tenant_id == TenantId::from([0u8; 16])
899 9129 : || state.timeline_id == TimelineId::from([0u8; 16])
900 : {
901 0 : bail!(
902 0 : "Calling SafeKeeper::new with empty tenant_id ({}) or timeline_id ({})",
903 0 : state.tenant_id,
904 0 : state.timeline_id
905 : );
906 9 : }
907 :
908 9129 : Ok(SafeKeeper {
909 9129 : term_start_lsn: Lsn(0),
910 9129 : state,
911 9129 : wal_store,
912 9129 : node_id,
913 9129 : })
914 9 : }
915 :
916 : /// Get history of term switches for the available WAL
917 3742 : fn get_term_history(&self) -> TermHistory {
918 3742 : self.state
919 3742 : .acceptor_state
920 3742 : .term_history
921 3742 : .up_to(self.flush_lsn())
922 9 : }
923 :
924 43 : pub fn get_last_log_term(&self) -> Term {
925 43 : self.state
926 43 : .acceptor_state
927 43 : .get_last_log_term(self.flush_lsn())
928 2 : }
929 :
930 : /// wal_store wrapper avoiding commit_lsn <= flush_lsn violation when we don't have WAL yet.
931 17799 : pub fn flush_lsn(&self) -> Lsn {
932 17799 : max(self.wal_store.flush_lsn(), self.state.timeline_start_lsn)
933 1883 : }
934 :
935 : /// Process message from proposer and possibly form reply. Concurrent
936 : /// callers must exclude each other.
937 31899 : pub async fn process_msg(
938 31899 : &mut self,
939 31899 : msg: &ProposerAcceptorMessage,
940 31899 : ) -> Result<Option<AcceptorProposerMessage>> {
941 31899 : match msg {
942 20604 : ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg).await,
943 2926 : ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg).await,
944 818 : ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg).await,
945 5 : ProposerAcceptorMessage::AppendRequest(msg) => {
946 5 : self.handle_append_request(msg, true).await
947 : }
948 4056 : ProposerAcceptorMessage::NoFlushAppendRequest(msg) => {
949 4056 : self.handle_append_request(msg, false).await
950 : }
951 3490 : ProposerAcceptorMessage::FlushWAL => self.handle_flush().await,
952 : }
953 1256 : }
954 :
955 : /// Handle initial message from proposer: check its sanity and send my
956 : /// current term.
957 20604 : async fn handle_greeting(
958 20604 : &mut self,
959 20604 : msg: &ProposerGreeting,
960 20604 : ) -> Result<Option<AcceptorProposerMessage>> {
961 : /* Postgres major version mismatch is treated as fatal error
962 : * because safekeepers parse WAL headers and the format
963 : * may change between versions.
964 : */
965 20604 : if PgMajorVersion::try_from(msg.pg_version)?
966 20604 : != PgMajorVersion::try_from(self.state.server.pg_version)?
967 0 : && self.state.server.pg_version != UNKNOWN_SERVER_VERSION
968 : {
969 0 : bail!(
970 0 : "incompatible server version {}, expected {}",
971 : msg.pg_version,
972 0 : self.state.server.pg_version
973 : );
974 0 : }
975 :
976 20604 : if msg.tenant_id != self.state.tenant_id {
977 0 : bail!(
978 0 : "invalid tenant ID, got {}, expected {}",
979 : msg.tenant_id,
980 0 : self.state.tenant_id
981 : );
982 0 : }
983 20604 : if msg.timeline_id != self.state.timeline_id {
984 0 : bail!(
985 0 : "invalid timeline ID, got {}, expected {}",
986 : msg.timeline_id,
987 0 : self.state.timeline_id
988 : );
989 0 : }
990 20604 : if self.state.server.wal_seg_size != msg.wal_seg_size {
991 0 : bail!(
992 0 : "invalid wal_seg_size, got {}, expected {}",
993 : msg.wal_seg_size,
994 0 : self.state.server.wal_seg_size
995 : );
996 0 : }
997 :
998 : // system_id will be updated on mismatch
999 : // sync-safekeepers doesn't know sysid and sends 0, ignore it
1000 20604 : if self.state.server.system_id != msg.system_id && msg.system_id != 0 {
1001 0 : if self.state.server.system_id != 0 {
1002 0 : warn!(
1003 0 : "unexpected system ID arrived, got {}, expected {}",
1004 0 : msg.system_id, self.state.server.system_id
1005 : );
1006 0 : }
1007 :
1008 0 : let mut state = self.state.start_change();
1009 0 : state.server.system_id = msg.system_id;
1010 0 : if msg.pg_version != UNKNOWN_SERVER_VERSION {
1011 0 : state.server.pg_version = msg.pg_version;
1012 0 : }
1013 0 : self.state.finish_change(&state).await?;
1014 0 : }
1015 :
1016 : // Switch into conf given by proposer conf if it is higher.
1017 20604 : self.state.membership_switch(msg.mconf.clone()).await?;
1018 :
1019 20604 : let apg = AcceptorGreeting {
1020 20604 : node_id: self.node_id,
1021 20604 : mconf: self.state.mconf.clone(),
1022 20604 : term: self.state.acceptor_state.term,
1023 20604 : };
1024 20604 : info!(
1025 0 : "processed greeting {:?} from walproposer, sending {:?}",
1026 : msg, apg
1027 : );
1028 20604 : Ok(Some(AcceptorProposerMessage::Greeting(apg)))
1029 0 : }
1030 :
1031 : /// Give vote for the given term, if we haven't done that previously.
1032 2926 : async fn handle_vote_request(
1033 2926 : &mut self,
1034 2926 : msg: &VoteRequest,
1035 2926 : ) -> Result<Option<AcceptorProposerMessage>> {
1036 2926 : if self.state.mconf.generation != msg.generation {
1037 1 : bail!(
1038 1 : "refusing {:?} due to generation mismatch: sk generation {}",
1039 : msg,
1040 1 : self.state.mconf.generation
1041 : );
1042 2 : }
1043 : // Once voted, we won't accept data from older proposers; flush
1044 : // everything we've already received so that new proposer starts
1045 : // streaming at end of our WAL, without overlap. WAL is truncated at
1046 : // streaming point and commit_lsn may be advanced from peers, so this
1047 : // also avoids possible spurious attempt to truncate committed WAL.
1048 2925 : self.wal_store.flush_wal().await?;
1049 : // initialize with refusal
1050 2925 : let mut resp = VoteResponse {
1051 2925 : generation: self.state.mconf.generation,
1052 2925 : term: self.state.acceptor_state.term,
1053 2925 : vote_given: false,
1054 2925 : flush_lsn: self.flush_lsn(),
1055 2925 : truncate_lsn: self.state.inmem.peer_horizon_lsn,
1056 2925 : term_history: self.get_term_history(),
1057 2925 : };
1058 2925 : if self.state.acceptor_state.term < msg.term {
1059 2813 : let mut state = self.state.start_change();
1060 2813 : state.acceptor_state.term = msg.term;
1061 : // persist vote before sending it out
1062 2813 : self.state.finish_change(&state).await?;
1063 :
1064 2813 : resp.term = self.state.acceptor_state.term;
1065 2813 : resp.vote_given = true;
1066 1 : }
1067 2925 : info!("processed {:?}: sending {:?}", msg, &resp);
1068 2925 : Ok(Some(AcceptorProposerMessage::VoteResponse(resp)))
1069 3 : }
1070 :
1071 : /// Form AppendResponse from current state.
1072 3493 : fn append_response(&self) -> AppendResponse {
1073 3493 : let ar = AppendResponse {
1074 3493 : generation: self.state.mconf.generation,
1075 3493 : term: self.state.acceptor_state.term,
1076 3493 : flush_lsn: self.flush_lsn(),
1077 3493 : commit_lsn: self.state.commit_lsn,
1078 3493 : // will be filled by the upper code to avoid bothering safekeeper
1079 3493 : hs_feedback: HotStandbyFeedback::empty(),
1080 3493 : pageserver_feedback: None,
1081 3493 : };
1082 3493 : trace!("formed AppendResponse {:?}", ar);
1083 3493 : ar
1084 623 : }
1085 :
1086 818 : async fn handle_elected(
1087 818 : &mut self,
1088 818 : msg: &ProposerElected,
1089 818 : ) -> Result<Option<AcceptorProposerMessage>> {
1090 818 : let _timer = MISC_OPERATION_SECONDS
1091 818 : .with_label_values(&["handle_elected"])
1092 818 : .start_timer();
1093 :
1094 818 : info!(
1095 0 : "received ProposerElected {:?}, term={}, last_log_term={}, flush_lsn={}",
1096 : msg,
1097 0 : self.state.acceptor_state.term,
1098 0 : self.get_last_log_term(),
1099 0 : self.flush_lsn()
1100 : );
1101 818 : if self.state.mconf.generation != msg.generation {
1102 1 : bail!(
1103 1 : "refusing {:?} due to generation mismatch: sk generation {}",
1104 : msg,
1105 1 : self.state.mconf.generation
1106 : );
1107 7 : }
1108 817 : if self.state.acceptor_state.term < msg.term {
1109 7 : let mut state = self.state.start_change();
1110 7 : state.acceptor_state.term = msg.term;
1111 7 : self.state.finish_change(&state).await?;
1112 0 : }
1113 :
1114 : // If our term is higher, ignore the message (next feedback will inform the compute)
1115 817 : if self.state.acceptor_state.term > msg.term {
1116 0 : return Ok(None);
1117 7 : }
1118 :
1119 : // Before truncating WAL check-cross the check divergence point received
1120 : // from the walproposer.
1121 817 : let sk_th = self.get_term_history();
1122 817 : let last_common_point = match TermHistory::find_highest_common_point(
1123 817 : &msg.term_history,
1124 817 : &sk_th,
1125 817 : self.flush_lsn(),
1126 817 : ) {
1127 : // No common point. Expect streaming from the beginning of the
1128 : // history like walproposer while we don't have proper init.
1129 136 : None => *msg.term_history.0.first().ok_or(anyhow::anyhow!(
1130 136 : "empty walproposer term history {:?}",
1131 : msg.term_history
1132 0 : ))?,
1133 681 : Some(lcp) => lcp,
1134 : };
1135 : // This is expected to happen in a rare race when another connection
1136 : // from the same walproposer writes + flushes WAL after this connection
1137 : // sent flush_lsn in VoteRequest; for instance, very late
1138 : // ProposerElected message delivery after another connection was
1139 : // established and wrote WAL. In such cases error is transient;
1140 : // reconnection makes safekeeper send newest term history and flush_lsn
1141 : // and walproposer recalculates the streaming point. OTOH repeating
1142 : // error indicates a serious bug.
1143 817 : if last_common_point.lsn != msg.start_streaming_at {
1144 0 : bail!(
1145 0 : "refusing ProposerElected with unexpected truncation point: lcp={:?} start_streaming_at={}, term={}, sk_th={:?} flush_lsn={}, wp_th={:?}",
1146 : last_common_point,
1147 : msg.start_streaming_at,
1148 0 : self.state.acceptor_state.term,
1149 : sk_th,
1150 0 : self.flush_lsn(),
1151 : msg.term_history,
1152 : );
1153 7 : }
1154 :
1155 : // We are also expected to never attempt to truncate committed data.
1156 817 : assert!(
1157 817 : msg.start_streaming_at >= self.state.inmem.commit_lsn,
1158 0 : "attempt to truncate committed data: start_streaming_at={}, commit_lsn={}, term={}, sk_th={:?} flush_lsn={}, wp_th={:?}",
1159 : msg.start_streaming_at,
1160 : self.state.inmem.commit_lsn,
1161 0 : self.state.acceptor_state.term,
1162 : sk_th,
1163 0 : self.flush_lsn(),
1164 : msg.term_history,
1165 : );
1166 :
1167 : // Before first WAL write initialize its segment. It makes first segment
1168 : // pg_waldump'able because stream from compute doesn't include its
1169 : // segment and page headers.
1170 : //
1171 : // If we fail before first WAL write flush this action would be
1172 : // repeated, that's ok because it is idempotent.
1173 817 : if self.wal_store.flush_lsn() == Lsn::INVALID {
1174 136 : self.wal_store
1175 136 : .initialize_first_segment(msg.start_streaming_at)
1176 136 : .await?;
1177 0 : }
1178 :
1179 : // truncate wal, update the LSNs
1180 817 : self.wal_store.truncate_wal(msg.start_streaming_at).await?;
1181 :
1182 : // and now adopt term history from proposer
1183 : {
1184 817 : let mut state = self.state.start_change();
1185 :
1186 : // Here we learn initial LSN for the first time, set fields
1187 : // interested in that.
1188 :
1189 817 : if let Some(start_lsn) = msg.term_history.0.first() {
1190 817 : if state.timeline_start_lsn == Lsn(0) {
1191 : // Remember point where WAL begins globally. In the future it
1192 : // will be intialized immediately on timeline creation.
1193 136 : state.timeline_start_lsn = start_lsn.lsn;
1194 136 : info!(
1195 0 : "setting timeline_start_lsn to {:?}",
1196 : state.timeline_start_lsn
1197 : );
1198 0 : }
1199 0 : }
1200 :
1201 817 : if state.peer_horizon_lsn == Lsn(0) {
1202 136 : // Update peer_horizon_lsn as soon as we know where timeline starts.
1203 136 : // It means that peer_horizon_lsn cannot be zero after we know timeline_start_lsn.
1204 136 : state.peer_horizon_lsn = state.timeline_start_lsn;
1205 136 : }
1206 817 : if state.local_start_lsn == Lsn(0) {
1207 136 : state.local_start_lsn = msg.start_streaming_at;
1208 136 : info!("setting local_start_lsn to {:?}", state.local_start_lsn);
1209 0 : }
1210 : // Initializing commit_lsn before acking first flushed record is
1211 : // important to let find_end_of_wal skip the hole in the beginning
1212 : // of the first segment.
1213 : //
1214 : // NB: on new clusters, this happens at the same time as
1215 : // timeline_start_lsn initialization, it is taken outside to provide
1216 : // upgrade.
1217 817 : state.commit_lsn = max(state.commit_lsn, state.timeline_start_lsn);
1218 :
1219 : // Initializing backup_lsn is useful to avoid making backup think it should upload 0 segment.
1220 817 : state.backup_lsn = max(state.backup_lsn, state.timeline_start_lsn);
1221 : // similar for remote_consistent_lsn
1222 817 : state.remote_consistent_lsn =
1223 817 : max(state.remote_consistent_lsn, state.timeline_start_lsn);
1224 :
1225 817 : state.acceptor_state.term_history = msg.term_history.clone();
1226 817 : self.state.finish_change(&state).await?;
1227 : }
1228 :
1229 817 : info!("start receiving WAL since {:?}", msg.start_streaming_at);
1230 :
1231 : // Cache LSN where term starts to immediately fsync control file with
1232 : // commit_lsn once we reach it -- sync-safekeepers finishes when
1233 : // persisted commit_lsn on majority of safekeepers aligns.
1234 817 : self.term_start_lsn = match msg.term_history.0.last() {
1235 0 : None => bail!("proposer elected with empty term history"),
1236 817 : Some(term_lsn_start) => term_lsn_start.lsn,
1237 : };
1238 :
1239 817 : Ok(None)
1240 8 : }
1241 :
1242 : /// Advance commit_lsn taking into account what we have locally.
1243 : ///
1244 : /// Note: it is assumed that 'WAL we have is from the right term' check has
1245 : /// already been done outside.
1246 2682 : async fn update_commit_lsn(&mut self, mut candidate: Lsn) -> Result<()> {
1247 : // Both peers and walproposer communicate this value, we might already
1248 : // have a fresher (higher) version.
1249 2682 : candidate = max(candidate, self.state.inmem.commit_lsn);
1250 2682 : let commit_lsn = min(candidate, self.flush_lsn());
1251 2682 : assert!(
1252 2682 : commit_lsn >= self.state.inmem.commit_lsn,
1253 0 : "commit_lsn monotonicity violated: old={} new={}",
1254 : self.state.inmem.commit_lsn,
1255 : commit_lsn
1256 : );
1257 :
1258 2682 : self.state.inmem.commit_lsn = commit_lsn;
1259 :
1260 : // If new commit_lsn reached term switch, force sync of control
1261 : // file: walproposer in sync mode is very interested when this
1262 : // happens. Note: this is for sync-safekeepers mode only, as
1263 : // otherwise commit_lsn might jump over term_start_lsn.
1264 2682 : if commit_lsn >= self.term_start_lsn && self.state.commit_lsn < self.term_start_lsn {
1265 118 : self.state.flush().await?;
1266 620 : }
1267 :
1268 2682 : Ok(())
1269 620 : }
1270 :
1271 : /// Handle request to append WAL.
1272 : #[allow(clippy::comparison_chain)]
1273 4061 : async fn handle_append_request(
1274 4061 : &mut self,
1275 4061 : msg: &AppendRequest,
1276 4061 : require_flush: bool,
1277 4061 : ) -> Result<Option<AcceptorProposerMessage>> {
1278 : // Refuse message on generation mismatch. On reconnect wp will get full
1279 : // configuration from greeting.
1280 4061 : if self.state.mconf.generation != msg.h.generation {
1281 1 : bail!(
1282 1 : "refusing append request due to generation mismatch: request {}, sk {}",
1283 : msg.h.generation,
1284 1 : self.state.mconf.generation
1285 : );
1286 624 : }
1287 :
1288 4060 : if self.state.acceptor_state.term < msg.h.term {
1289 0 : bail!("got AppendRequest before ProposerElected");
1290 624 : }
1291 :
1292 : // If our term is higher, immediately refuse the message. Send term only
1293 : // response; elected walproposer can never advance the term, so it will
1294 : // figure out the refusal from it -- which is important as term change
1295 : // should cause not just reconnection but whole walproposer re-election.
1296 4060 : if self.state.acceptor_state.term > msg.h.term {
1297 0 : let resp = AppendResponse::term_only(
1298 0 : self.state.mconf.generation,
1299 0 : self.state.acceptor_state.term,
1300 : );
1301 0 : return Ok(Some(AcceptorProposerMessage::AppendResponse(resp)));
1302 624 : }
1303 :
1304 : // Disallow any non-sequential writes, which can result in gaps or
1305 : // overwrites. If we need to move the pointer, ProposerElected message
1306 : // should have truncated WAL first accordingly. Note that the first
1307 : // condition (WAL rewrite) is quite expected in real world; it happens
1308 : // when walproposer reconnects to safekeeper and writes some more data
1309 : // while first connection still gets some packets later. It might be
1310 : // better to not log this as error! above.
1311 4060 : let write_lsn = self.wal_store.write_lsn();
1312 4060 : let flush_lsn = self.wal_store.flush_lsn();
1313 4060 : if write_lsn > msg.h.begin_lsn {
1314 1 : bail!(
1315 1 : "append request rewrites WAL written before, write_lsn={}, msg lsn={}",
1316 : write_lsn,
1317 : msg.h.begin_lsn
1318 : );
1319 623 : }
1320 4059 : if write_lsn < msg.h.begin_lsn && write_lsn != Lsn(0) {
1321 0 : bail!(
1322 0 : "append request creates gap in written WAL, write_lsn={}, msg lsn={}",
1323 : write_lsn,
1324 : msg.h.begin_lsn,
1325 : );
1326 623 : }
1327 :
1328 : // Now we know that we are in the same term as the proposer, process the
1329 : // message.
1330 :
1331 : // do the job
1332 4059 : if !msg.wal_data.is_empty() {
1333 1440 : self.wal_store
1334 1440 : .write_wal(msg.h.begin_lsn, &msg.wal_data)
1335 1440 : .await?;
1336 0 : }
1337 :
1338 : // flush wal to the disk, if required
1339 4059 : if require_flush {
1340 3 : self.wal_store.flush_wal().await?;
1341 620 : }
1342 :
1343 : // Update commit_lsn. It will be flushed to the control file regularly by the timeline
1344 : // manager, off of the WAL ingest hot path.
1345 4059 : if msg.h.commit_lsn != Lsn(0) {
1346 2682 : self.update_commit_lsn(msg.h.commit_lsn).await?;
1347 3 : }
1348 : // Value calculated by walproposer can always lag:
1349 : // - safekeepers can forget inmem value and send to proposer lower
1350 : // persisted one on restart;
1351 : // - if we make safekeepers always send persistent value,
1352 : // any compute restart would pull it down.
1353 : // Thus, take max before adopting.
1354 4059 : self.state.inmem.peer_horizon_lsn =
1355 4059 : max(self.state.inmem.peer_horizon_lsn, msg.h.truncate_lsn);
1356 :
1357 4059 : trace!(
1358 0 : "processed AppendRequest of len {}, begin_lsn={}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, flushed={:?}",
1359 0 : msg.wal_data.len(),
1360 : msg.h.begin_lsn,
1361 : msg.h.end_lsn,
1362 : msg.h.commit_lsn,
1363 : msg.h.truncate_lsn,
1364 : require_flush,
1365 : );
1366 :
1367 : // If flush_lsn hasn't updated, AppendResponse is not very useful.
1368 : // This is the common case for !require_flush, but a flush can still
1369 : // happen on segment bounds.
1370 4059 : if !require_flush && flush_lsn == self.flush_lsn() {
1371 4056 : return Ok(None);
1372 3 : }
1373 :
1374 3 : let resp = self.append_response();
1375 3 : Ok(Some(AcceptorProposerMessage::AppendResponse(resp)))
1376 625 : }
1377 :
1378 : /// Flush WAL to disk. Return AppendResponse with latest LSNs.
1379 3490 : async fn handle_flush(&mut self) -> Result<Option<AcceptorProposerMessage>> {
1380 3490 : self.wal_store.flush_wal().await?;
1381 3490 : Ok(Some(AcceptorProposerMessage::AppendResponse(
1382 3490 : self.append_response(),
1383 3490 : )))
1384 620 : }
1385 :
1386 : /// Update commit_lsn from peer safekeeper data.
1387 0 : pub async fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> {
1388 0 : if Lsn(sk_info.commit_lsn) != Lsn::INVALID {
1389 : // Note: the check is too restrictive, generally we can update local
1390 : // commit_lsn if our history matches (is part of) history of advanced
1391 : // commit_lsn provider.
1392 0 : if sk_info.last_log_term == self.get_last_log_term() {
1393 0 : self.update_commit_lsn(Lsn(sk_info.commit_lsn)).await?;
1394 0 : }
1395 0 : }
1396 0 : Ok(())
1397 0 : }
1398 : }
1399 :
1400 : #[cfg(test)]
1401 : mod tests {
1402 : use std::ops::Deref;
1403 : use std::str::FromStr;
1404 : use std::time::{Instant, UNIX_EPOCH};
1405 :
1406 : use futures::future::BoxFuture;
1407 : use postgres_ffi::{WAL_SEGMENT_SIZE, XLogSegNo};
1408 : use safekeeper_api::ServerInfo;
1409 : use safekeeper_api::membership::{
1410 : Configuration, MemberSet, SafekeeperGeneration, SafekeeperId,
1411 : };
1412 :
1413 : use super::*;
1414 : use crate::state::{EvictionState, TimelinePersistentState};
1415 :
1416 : // fake storage for tests
1417 : struct InMemoryState {
1418 : persisted_state: TimelinePersistentState,
1419 : }
1420 :
1421 : impl control_file::Storage for InMemoryState {
1422 5 : async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> {
1423 5 : self.persisted_state = s.clone();
1424 5 : Ok(())
1425 5 : }
1426 :
1427 0 : fn last_persist_at(&self) -> Instant {
1428 0 : Instant::now()
1429 0 : }
1430 : }
1431 :
1432 : impl Deref for InMemoryState {
1433 : type Target = TimelinePersistentState;
1434 :
1435 100 : fn deref(&self) -> &Self::Target {
1436 100 : &self.persisted_state
1437 100 : }
1438 : }
1439 :
1440 3 : fn test_sk_state() -> TimelinePersistentState {
1441 3 : let mut state = TimelinePersistentState::empty();
1442 3 : state.server.wal_seg_size = WAL_SEGMENT_SIZE as u32;
1443 3 : state.tenant_id = TenantId::from([1u8; 16]);
1444 3 : state.timeline_id = TimelineId::from([1u8; 16]);
1445 3 : state
1446 3 : }
1447 :
1448 : struct DummyWalStore {
1449 : lsn: Lsn,
1450 : }
1451 :
1452 : impl wal_storage::Storage for DummyWalStore {
1453 4 : fn write_lsn(&self) -> Lsn {
1454 4 : self.lsn
1455 4 : }
1456 :
1457 19 : fn flush_lsn(&self) -> Lsn {
1458 19 : self.lsn
1459 19 : }
1460 :
1461 2 : async fn initialize_first_segment(&mut self, _init_lsn: Lsn) -> Result<()> {
1462 2 : Ok(())
1463 2 : }
1464 :
1465 3 : async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
1466 3 : self.lsn = startpos + buf.len() as u64;
1467 3 : Ok(())
1468 3 : }
1469 :
1470 2 : async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
1471 2 : self.lsn = end_pos;
1472 2 : Ok(())
1473 2 : }
1474 :
1475 5 : async fn flush_wal(&mut self) -> Result<()> {
1476 5 : Ok(())
1477 5 : }
1478 :
1479 0 : fn remove_up_to(&self, _segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> {
1480 0 : Box::pin(async { Ok(()) })
1481 0 : }
1482 :
1483 0 : fn get_metrics(&self) -> crate::metrics::WalStorageMetrics {
1484 0 : crate::metrics::WalStorageMetrics::default()
1485 0 : }
1486 : }
1487 :
1488 : #[tokio::test]
1489 1 : async fn test_voting() {
1490 1 : let storage = InMemoryState {
1491 1 : persisted_state: test_sk_state(),
1492 1 : };
1493 1 : let wal_store = DummyWalStore { lsn: Lsn(0) };
1494 1 : let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
1495 :
1496 : // Vote with generation mismatch should be rejected.
1497 1 : let gen_mismatch_vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest {
1498 1 : generation: SafekeeperGeneration::new(42),
1499 1 : term: 1,
1500 1 : });
1501 1 : assert!(sk.process_msg(&gen_mismatch_vote_request).await.is_err());
1502 :
1503 : // check voting for 1 is ok
1504 1 : let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest {
1505 1 : generation: Generation::new(0),
1506 1 : term: 1,
1507 1 : });
1508 1 : let mut vote_resp = sk.process_msg(&vote_request).await;
1509 1 : match vote_resp.unwrap() {
1510 1 : Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given),
1511 0 : r => panic!("unexpected response: {r:?}"),
1512 : }
1513 :
1514 : // reboot...
1515 1 : let state = sk.state.deref().clone();
1516 1 : let storage = InMemoryState {
1517 1 : persisted_state: state,
1518 1 : };
1519 :
1520 1 : sk = SafeKeeper::new(TimelineState::new(storage), sk.wal_store, NodeId(0)).unwrap();
1521 :
1522 : // and ensure voting second time for 1 is not ok
1523 1 : vote_resp = sk.process_msg(&vote_request).await;
1524 1 : match vote_resp.unwrap() {
1525 1 : Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(!resp.vote_given),
1526 1 : r => panic!("unexpected response: {r:?}"),
1527 1 : }
1528 1 : }
1529 :
1530 : #[tokio::test]
1531 1 : async fn test_last_log_term_switch() {
1532 1 : let storage = InMemoryState {
1533 1 : persisted_state: test_sk_state(),
1534 1 : };
1535 1 : let wal_store = DummyWalStore { lsn: Lsn(0) };
1536 :
1537 1 : let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
1538 :
1539 1 : let mut ar_hdr = AppendRequestHeader {
1540 1 : generation: Generation::new(0),
1541 1 : term: 2,
1542 1 : begin_lsn: Lsn(1),
1543 1 : end_lsn: Lsn(2),
1544 1 : commit_lsn: Lsn(0),
1545 1 : truncate_lsn: Lsn(0),
1546 1 : };
1547 1 : let mut append_request = AppendRequest {
1548 1 : h: ar_hdr.clone(),
1549 1 : wal_data: Bytes::from_static(b"b"),
1550 1 : };
1551 :
1552 1 : let pem = ProposerElected {
1553 1 : generation: Generation::new(0),
1554 1 : term: 2,
1555 1 : start_streaming_at: Lsn(1),
1556 1 : term_history: TermHistory(vec![
1557 1 : TermLsn {
1558 1 : term: 1,
1559 1 : lsn: Lsn(1),
1560 1 : },
1561 1 : TermLsn {
1562 1 : term: 2,
1563 1 : lsn: Lsn(3),
1564 1 : },
1565 1 : ]),
1566 1 : };
1567 :
1568 : // check that elected msg with generation mismatch is rejected
1569 1 : let mut pem_gen_mismatch = pem.clone();
1570 1 : pem_gen_mismatch.generation = SafekeeperGeneration::new(42);
1571 1 : assert!(
1572 1 : sk.process_msg(&ProposerAcceptorMessage::Elected(pem_gen_mismatch))
1573 1 : .await
1574 1 : .is_err()
1575 : );
1576 :
1577 1 : sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
1578 1 : .await
1579 1 : .unwrap();
1580 :
1581 : // check that AppendRequest before term_start_lsn doesn't switch last_log_term.
1582 1 : sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
1583 1 : .await
1584 1 : .unwrap();
1585 1 : assert_eq!(sk.get_last_log_term(), 1);
1586 :
1587 : // but record at term_start_lsn does the switch
1588 1 : ar_hdr.begin_lsn = Lsn(2);
1589 1 : ar_hdr.end_lsn = Lsn(3);
1590 1 : append_request = AppendRequest {
1591 1 : h: ar_hdr,
1592 1 : wal_data: Bytes::from_static(b"b"),
1593 1 : };
1594 1 : sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
1595 1 : .await
1596 1 : .unwrap();
1597 1 : assert_eq!(sk.get_last_log_term(), 2);
1598 1 : }
1599 :
1600 : #[tokio::test]
1601 1 : async fn test_non_consecutive_write() {
1602 1 : let storage = InMemoryState {
1603 1 : persisted_state: test_sk_state(),
1604 1 : };
1605 1 : let wal_store = DummyWalStore { lsn: Lsn(0) };
1606 :
1607 1 : let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
1608 :
1609 1 : let pem = ProposerElected {
1610 1 : generation: Generation::new(0),
1611 1 : term: 1,
1612 1 : start_streaming_at: Lsn(1),
1613 1 : term_history: TermHistory(vec![TermLsn {
1614 1 : term: 1,
1615 1 : lsn: Lsn(1),
1616 1 : }]),
1617 1 : };
1618 1 : sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
1619 1 : .await
1620 1 : .unwrap();
1621 :
1622 1 : let ar_hdr = AppendRequestHeader {
1623 1 : generation: Generation::new(0),
1624 1 : term: 1,
1625 1 : begin_lsn: Lsn(1),
1626 1 : end_lsn: Lsn(2),
1627 1 : commit_lsn: Lsn(0),
1628 1 : truncate_lsn: Lsn(0),
1629 1 : };
1630 1 : let append_request = AppendRequest {
1631 1 : h: ar_hdr.clone(),
1632 1 : wal_data: Bytes::from_static(b"b"),
1633 1 : };
1634 :
1635 : // check that append request with generation mismatch is rejected
1636 1 : let mut ar_hdr_gen_mismatch = ar_hdr.clone();
1637 1 : ar_hdr_gen_mismatch.generation = SafekeeperGeneration::new(42);
1638 1 : let append_request_gen_mismatch = AppendRequest {
1639 1 : h: ar_hdr_gen_mismatch,
1640 1 : wal_data: Bytes::from_static(b"b"),
1641 1 : };
1642 1 : assert!(
1643 1 : sk.process_msg(&ProposerAcceptorMessage::AppendRequest(
1644 1 : append_request_gen_mismatch
1645 1 : ))
1646 1 : .await
1647 1 : .is_err()
1648 : );
1649 :
1650 : // do write ending at 2, it should be ok
1651 1 : sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
1652 1 : .await
1653 1 : .unwrap();
1654 1 : let mut ar_hrd2 = ar_hdr.clone();
1655 1 : ar_hrd2.begin_lsn = Lsn(4);
1656 1 : ar_hrd2.end_lsn = Lsn(5);
1657 1 : let append_request = AppendRequest {
1658 1 : h: ar_hdr,
1659 1 : wal_data: Bytes::from_static(b"b"),
1660 1 : };
1661 : // and now starting at 4, it must fail
1662 1 : sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
1663 1 : .await
1664 1 : .unwrap_err();
1665 1 : }
1666 :
// Histories sharing no term have no common point.
#[test]
fn test_find_highest_common_point_none() {
    let proposer_history = TermHistory(vec![TermLsn::from((0, Lsn(1)))]);
    let safekeeper_history = TermHistory(vec![
        TermLsn::from((1, Lsn(1))),
        TermLsn::from((2, Lsn(2))),
    ]);
    let found =
        TermHistory::find_highest_common_point(&proposer_history, &safekeeper_history, Lsn(3));
    assert_eq!(found, None);
}
1676 :
// Histories diverge after term 2; safekeeper's next term starts at 30, so
// the common point is the end of term 2 on the safekeeper, i.e. LSN 30.
#[test]
fn test_find_highest_common_point_middle() {
    let proposer_history = TermHistory(vec![
        TermLsn::from((1, Lsn(10))),
        TermLsn::from((2, Lsn(20))),
        TermLsn::from((4, Lsn(40))),
    ]);
    let safekeeper_history = TermHistory(vec![
        TermLsn::from((1, Lsn(10))),
        TermLsn::from((2, Lsn(20))),
        // sk ends last common term 2 at 30
        TermLsn::from((3, Lsn(30))),
    ]);
    let expected = TermLsn {
        term: 2,
        lsn: Lsn(30),
    };
    let found =
        TermHistory::find_highest_common_point(&proposer_history, &safekeeper_history, Lsn(40));
    assert_eq!(found, Some(expected));
}
1697 :
// Safekeeper's history ends within the last common term; the common point
// is then the safekeeper's end of WAL.
#[test]
fn test_find_highest_common_point_sk_end() {
    let sk_end_lsn = Lsn(32);
    let proposer_history = TermHistory(vec![
        TermLsn::from((1, Lsn(10))),
        // last common term 2, sk will end it at 32 sk_end_lsn
        TermLsn::from((2, Lsn(20))),
        TermLsn::from((4, Lsn(40))),
    ]);
    let safekeeper_history = TermHistory(vec![
        TermLsn::from((1, Lsn(10))),
        TermLsn::from((2, Lsn(20))),
    ]);
    let expected = TermLsn {
        term: 2,
        lsn: Lsn(32),
    };
    let found = TermHistory::find_highest_common_point(
        &proposer_history,
        &safekeeper_history,
        sk_end_lsn,
    );
    assert_eq!(found, Some(expected));
}
1714 :
// Identical histories: the common point is in the last term, at the
// safekeeper's end of WAL.
#[test]
fn test_find_highest_common_point_walprop() {
    let shared_entries = vec![TermLsn::from((1, Lsn(10))), TermLsn::from((2, Lsn(20)))];
    let proposer_history = TermHistory(shared_entries.clone());
    let safekeeper_history = TermHistory(shared_entries);
    let expected = TermLsn {
        term: 2,
        lsn: Lsn(32),
    };
    let found =
        TermHistory::find_highest_common_point(&proposer_history, &safekeeper_history, Lsn(32));
    assert_eq!(found, Some(expected));
}
1727 :
// Serializing a fully populated persistent state and deserializing it back
// must yield an equal value.
#[test]
fn test_sk_state_bincode_serde_roundtrip() {
    let tenant_id = TenantId::from_str("cf0480929707ee75372337efaa5ecf96").unwrap();
    let timeline_id = TimelineId::from_str("112ded66422aa5e953e5440fa5427ac4").unwrap();

    let members = MemberSet::new(vec![SafekeeperId {
        id: NodeId(1),
        host: "hehe.org".to_owned(),
        pg_port: 5432,
    }])
    .expect("duplicate member");
    let mconf = Configuration {
        generation: SafekeeperGeneration::new(42),
        members,
        new_members: None,
    };

    let acceptor_state = AcceptorState {
        term: 42,
        term_history: TermHistory(vec![TermLsn {
            lsn: Lsn(0x1),
            term: 41,
        }]),
    };

    let server = ServerInfo {
        pg_version: PgVersionId::from_full_pg_version(140000),
        system_id: 0x1234567887654321,
        wal_seg_size: 0x12345678,
    };

    // Any recognizable bytes would do; use the timeline id backwards.
    let mut proposer_uuid = timeline_id.as_arr();
    proposer_uuid.reverse();

    let state = TimelinePersistentState {
        tenant_id,
        timeline_id,
        mconf,
        acceptor_state,
        server,
        proposer_uuid,
        timeline_start_lsn: Lsn(0x12345600),
        local_start_lsn: Lsn(0x12),
        commit_lsn: Lsn(1234567800),
        backup_lsn: Lsn(1234567300),
        peer_horizon_lsn: Lsn(9999999),
        remote_consistent_lsn: Lsn(1234560000),
        partial_backup: crate::wal_backup_partial::State::default(),
        eviction_state: EvictionState::Present,
        creation_ts: UNIX_EPOCH,
    };

    let serialized = state.ser().unwrap();
    let restored = TimelinePersistentState::des(&serialized).unwrap();

    assert_eq!(restored, state);
}
1779 : }
|