Line data Source code
1 : //! Acceptor part of proposer-acceptor consensus algorithm.
2 :
3 : use std::cmp::{max, min};
4 : use std::fmt;
5 : use std::io::Read;
6 : use std::str::FromStr;
7 :
8 : use anyhow::{Context, Result, bail};
9 : use byteorder::{LittleEndian, ReadBytesExt};
10 : use bytes::{Buf, BufMut, Bytes, BytesMut};
11 : use postgres_ffi::{MAX_SEND_SIZE, TimeLineID};
12 : use pq_proto::SystemId;
13 : use safekeeper_api::membership::{
14 : INVALID_GENERATION, MemberSet, SafekeeperGeneration as Generation, SafekeeperId,
15 : };
16 : use safekeeper_api::models::HotStandbyFeedback;
17 : use safekeeper_api::{Term, membership};
18 : use serde::{Deserialize, Serialize};
19 : use storage_broker::proto::SafekeeperTimelineInfo;
20 : use tracing::*;
21 : use utils::bin_ser::LeSer;
22 : use utils::id::{NodeId, TenantId, TimelineId};
23 : use utils::lsn::Lsn;
24 : use utils::pageserver_feedback::PageserverFeedback;
25 :
26 : use crate::metrics::MISC_OPERATION_SECONDS;
27 : use crate::state::TimelineState;
28 : use crate::{control_file, wal_storage};
29 :
/// Version 2 of the proposer-acceptor wire protocol (little endian; slated
/// for removal).
pub const SK_PROTO_VERSION_2: u32 = 2;
/// Version 3 of the proposer-acceptor wire protocol (network byte order).
pub const SK_PROTO_VERSION_3: u32 = 3;
/// `pg_version` value meaning the Postgres server version is not known.
pub const UNKNOWN_SERVER_VERSION: u32 = 0;
33 :
34 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
35 : pub struct TermLsn {
36 : pub term: Term,
37 : pub lsn: Lsn,
38 : }
39 :
40 : // Creation from tuple provides less typing (e.g. for unit tests).
41 : impl From<(Term, Lsn)> for TermLsn {
42 1283 : fn from(pair: (Term, Lsn)) -> TermLsn {
43 1283 : TermLsn {
44 1283 : term: pair.0,
45 1283 : lsn: pair.1,
46 1283 : }
47 1283 : }
48 : }
49 :
50 0 : #[derive(Clone, Serialize, Deserialize, PartialEq)]
51 : pub struct TermHistory(pub Vec<TermLsn>);
52 :
53 : impl TermHistory {
54 1435 : pub fn empty() -> TermHistory {
55 1435 : TermHistory(Vec::new())
56 1435 : }
57 :
58 : // Parse TermHistory as n_entries followed by TermLsn pairs in network order.
59 613 : pub fn from_bytes(bytes: &mut Bytes) -> Result<TermHistory> {
60 613 : let n_entries = bytes
61 613 : .get_u32_f()
62 613 : .with_context(|| "TermHistory misses len")?;
63 613 : let mut res = Vec::with_capacity(n_entries as usize);
64 4962 : for i in 0..n_entries {
65 4962 : let term = bytes
66 4962 : .get_u64_f()
67 4962 : .with_context(|| format!("TermHistory pos {} misses term", i))?;
68 4962 : let lsn = bytes
69 4962 : .get_u64_f()
70 4962 : .with_context(|| format!("TermHistory pos {} misses lsn", i))?
71 4962 : .into();
72 4962 : res.push(TermLsn { term, lsn })
73 : }
74 613 : Ok(TermHistory(res))
75 613 : }
76 :
77 : // Parse TermHistory as n_entries followed by TermLsn pairs in LE order.
78 : // TODO remove once v2 protocol is fully dropped.
79 0 : pub fn from_bytes_le(bytes: &mut Bytes) -> Result<TermHistory> {
80 0 : if bytes.remaining() < 4 {
81 0 : bail!("TermHistory misses len");
82 0 : }
83 0 : let n_entries = bytes.get_u32_le();
84 0 : let mut res = Vec::with_capacity(n_entries as usize);
85 0 : for _ in 0..n_entries {
86 0 : if bytes.remaining() < 16 {
87 0 : bail!("TermHistory is incomplete");
88 0 : }
89 0 : res.push(TermLsn {
90 0 : term: bytes.get_u64_le(),
91 0 : lsn: bytes.get_u64_le().into(),
92 0 : })
93 : }
94 0 : Ok(TermHistory(res))
95 0 : }
96 :
97 : /// Return copy of self with switches happening strictly after up_to
98 : /// truncated.
99 4597 : pub fn up_to(&self, up_to: Lsn) -> TermHistory {
100 4597 : let mut res = Vec::with_capacity(self.0.len());
101 17951 : for e in &self.0 {
102 13359 : if e.lsn > up_to {
103 5 : break;
104 13354 : }
105 13354 : res.push(*e);
106 : }
107 4597 : TermHistory(res)
108 4597 : }
109 :
110 : /// Find point of divergence between leader (walproposer) term history and
111 : /// safekeeper. Arguments are not symmetric as proposer history ends at
112 : /// +infinity while safekeeper at flush_lsn.
113 : /// C version is at walproposer SendProposerElected.
114 624 : pub fn find_highest_common_point(
115 624 : prop_th: &TermHistory,
116 624 : sk_th: &TermHistory,
117 624 : sk_wal_end: Lsn,
118 624 : ) -> Option<TermLsn> {
119 624 : let (prop_th, sk_th) = (&prop_th.0, &sk_th.0); // avoid .0 below
120 :
121 624 : if let Some(sk_th_last) = sk_th.last() {
122 489 : assert!(
123 489 : sk_th_last.lsn <= sk_wal_end,
124 0 : "safekeeper term history end {:?} LSN is higher than WAL end {:?}",
125 : sk_th_last,
126 : sk_wal_end
127 : );
128 135 : }
129 :
130 : // find last common term, if any...
131 624 : let mut last_common_idx = None;
132 4321 : for i in 0..min(sk_th.len(), prop_th.len()) {
133 4321 : if prop_th[i].term != sk_th[i].term {
134 5 : break;
135 4316 : }
136 4316 : // If term is the same, LSN must be equal as well.
137 4316 : assert!(
138 4316 : prop_th[i].lsn == sk_th[i].lsn,
139 0 : "same term {} has different start LSNs: prop {}, sk {}",
140 0 : prop_th[i].term,
141 0 : prop_th[i].lsn,
142 0 : sk_th[i].lsn
143 : );
144 4316 : last_common_idx = Some(i);
145 : }
146 624 : let last_common_idx = last_common_idx?;
147 : // Now find where it ends at both prop and sk and take min. End of
148 : // (common) term is the start of the next except it is the last one;
149 : // there it is flush_lsn in case of safekeeper or, in case of proposer
150 : // +infinity, so we just take flush_lsn then.
151 488 : if last_common_idx == prop_th.len() - 1 {
152 45 : Some(TermLsn {
153 45 : term: prop_th[last_common_idx].term,
154 45 : lsn: sk_wal_end,
155 45 : })
156 : } else {
157 443 : let prop_common_term_end = prop_th[last_common_idx + 1].lsn;
158 443 : let sk_common_term_end = if last_common_idx + 1 < sk_th.len() {
159 4 : sk_th[last_common_idx + 1].lsn
160 : } else {
161 439 : sk_wal_end
162 : };
163 443 : Some(TermLsn {
164 443 : term: prop_th[last_common_idx].term,
165 443 : lsn: min(prop_common_term_end, sk_common_term_end),
166 443 : })
167 : }
168 624 : }
169 : }
170 :
171 : /// Display only latest entries for Debug.
172 : impl fmt::Debug for TermHistory {
173 336 : fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
174 336 : let n_printed = 20;
175 336 : write!(
176 336 : fmt,
177 336 : "{}{:?}",
178 336 : if self.0.len() > n_printed { "... " } else { "" },
179 336 : self.0
180 336 : .iter()
181 336 : .rev()
182 336 : .take(n_printed)
183 1256 : .map(|&e| (e.term, e.lsn)) // omit TermSwitchEntry
184 336 : .collect::<Vec<_>>()
185 336 : )
186 336 : }
187 : }
188 :
/// 16 byte unique id of a proposer. Used for monitoring only; correctness
/// does not depend on it.
pub type PgUuid = [u8; 16];
191 :
192 : /// Persistent consensus state of the acceptor.
193 0 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
194 : pub struct AcceptorState {
195 : /// acceptor's last term it voted for (advanced in 1 phase)
196 : pub term: Term,
197 : /// History of term switches for safekeeper's WAL.
198 : /// Actually it often goes *beyond* WAL contents as we adopt term history
199 : /// from the proposer before recovery.
200 : pub term_history: TermHistory,
201 : }
202 :
203 : impl AcceptorState {
204 : /// acceptor's last_log_term is the term of the highest entry in the log
205 1343 : pub fn get_last_log_term(&self, flush_lsn: Lsn) -> Term {
206 1343 : let th = self.term_history.up_to(flush_lsn);
207 1343 : match th.0.last() {
208 1337 : Some(e) => e.term,
209 6 : None => 0,
210 : }
211 1343 : }
212 : }
213 :
214 : // protocol messages
215 :
216 : /// Initial Proposer -> Acceptor message
217 0 : #[derive(Debug, Deserialize)]
218 : pub struct ProposerGreeting {
219 : pub tenant_id: TenantId,
220 : pub timeline_id: TimelineId,
221 : pub mconf: membership::Configuration,
222 : /// Postgres server version
223 : pub pg_version: u32,
224 : pub system_id: SystemId,
225 : pub wal_seg_size: u32,
226 : }
227 :
228 : /// V2 of the message; exists as a struct because we (de)serialized it as is.
229 0 : #[derive(Debug, Deserialize)]
230 : pub struct ProposerGreetingV2 {
231 : /// proposer-acceptor protocol version
232 : pub protocol_version: u32,
233 : /// Postgres server version
234 : pub pg_version: u32,
235 : pub proposer_id: PgUuid,
236 : pub system_id: SystemId,
237 : pub timeline_id: TimelineId,
238 : pub tenant_id: TenantId,
239 : pub tli: TimeLineID,
240 : pub wal_seg_size: u32,
241 : }
242 :
243 : /// Acceptor -> Proposer initial response: the highest term known to me
244 : /// (acceptor voted for).
245 : #[derive(Debug, Serialize)]
246 : pub struct AcceptorGreeting {
247 : node_id: NodeId,
248 : mconf: membership::Configuration,
249 : term: u64,
250 : }
251 :
252 : /// Vote request sent from proposer to safekeepers
253 : #[derive(Debug)]
254 : pub struct VoteRequest {
255 : pub generation: Generation,
256 : pub term: Term,
257 : }
258 :
259 : /// V2 of the message; exists as a struct because we (de)serialized it as is.
260 0 : #[derive(Debug, Deserialize)]
261 : pub struct VoteRequestV2 {
262 : pub term: Term,
263 : }
264 :
265 : /// Vote itself, sent from safekeeper to proposer
266 : #[derive(Debug, Serialize)]
267 : pub struct VoteResponse {
268 : generation: Generation, // membership conf generation
269 : pub term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
270 : vote_given: bool,
271 : // Safekeeper flush_lsn (end of WAL) + history of term switches allow
272 : // proposer to choose the most advanced one.
273 : pub flush_lsn: Lsn,
274 : truncate_lsn: Lsn,
275 : pub term_history: TermHistory,
276 : }
277 :
278 : /*
279 : * Proposer -> Acceptor message announcing proposer is elected and communicating
280 : * term history to it.
281 : */
282 : #[derive(Debug, Clone)]
283 : pub struct ProposerElected {
284 : pub generation: Generation, // membership conf generation
285 : pub term: Term,
286 : pub start_streaming_at: Lsn,
287 : pub term_history: TermHistory,
288 : }
289 :
290 : /// Request with WAL message sent from proposer to safekeeper. Along the way it
291 : /// communicates commit_lsn.
292 : #[derive(Debug)]
293 : pub struct AppendRequest {
294 : pub h: AppendRequestHeader,
295 : pub wal_data: Bytes,
296 : }
297 0 : #[derive(Debug, Clone, Deserialize)]
298 : pub struct AppendRequestHeader {
299 : pub generation: Generation, // membership conf generation
300 : // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
301 : pub term: Term,
302 : /// start position of message in WAL
303 : pub begin_lsn: Lsn,
304 : /// end position of message in WAL
305 : pub end_lsn: Lsn,
306 : /// LSN committed by quorum of safekeepers
307 : pub commit_lsn: Lsn,
308 : /// minimal LSN which may be needed by proposer to perform recovery of some safekeeper
309 : pub truncate_lsn: Lsn,
310 : }
311 :
312 : /// V2 of the message; exists as a struct because we (de)serialized it as is.
313 0 : #[derive(Debug, Clone, Deserialize)]
314 : pub struct AppendRequestHeaderV2 {
315 : // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
316 : pub term: Term,
317 : // TODO: remove this field from the protocol, it in unused -- LSN of term
318 : // switch can be taken from ProposerElected (as well as from term history).
319 : pub term_start_lsn: Lsn,
320 : /// start position of message in WAL
321 : pub begin_lsn: Lsn,
322 : /// end position of message in WAL
323 : pub end_lsn: Lsn,
324 : /// LSN committed by quorum of safekeepers
325 : pub commit_lsn: Lsn,
326 : /// minimal LSN which may be needed by proposer to perform recovery of some safekeeper
327 : pub truncate_lsn: Lsn,
328 : // only for logging/debugging
329 : pub proposer_uuid: PgUuid,
330 : }
331 :
332 : /// Report safekeeper state to proposer
333 : #[derive(Debug, Serialize, Clone)]
334 : pub struct AppendResponse {
335 : // Membership conf generation. Not strictly required because on mismatch
336 : // connection is reset, but let's sanity check it.
337 : generation: Generation,
338 : // Current term of the safekeeper; if it is higher than proposer's, the
339 : // compute is out of date.
340 : pub term: Term,
341 : // Flushed end of wal on safekeeper; one should be always mindful from what
342 : // term history this value comes, either checking history directly or
343 : // observing term being set to one for which WAL truncation is known to have
344 : // happened.
345 : pub flush_lsn: Lsn,
346 : // We report back our awareness about which WAL is committed, as this is
347 : // a criterion for walproposer --sync mode exit
348 : pub commit_lsn: Lsn,
349 : pub hs_feedback: HotStandbyFeedback,
350 : pub pageserver_feedback: Option<PageserverFeedback>,
351 : }
352 :
353 : impl AppendResponse {
354 0 : fn term_only(generation: Generation, term: Term) -> AppendResponse {
355 0 : AppendResponse {
356 0 : generation,
357 0 : term,
358 0 : flush_lsn: Lsn(0),
359 0 : commit_lsn: Lsn(0),
360 0 : hs_feedback: HotStandbyFeedback::empty(),
361 0 : pageserver_feedback: None,
362 0 : }
363 0 : }
364 : }
365 :
366 : /// Proposer -> Acceptor messages
367 : #[derive(Debug)]
368 : pub enum ProposerAcceptorMessage {
369 : Greeting(ProposerGreeting),
370 : VoteRequest(VoteRequest),
371 : Elected(ProposerElected),
372 : AppendRequest(AppendRequest),
373 : NoFlushAppendRequest(AppendRequest),
374 : FlushWAL,
375 : }
376 :
377 : /// Augment Bytes with fallible get_uN where N is number of bytes methods.
378 : /// All reads are in network (big endian) order.
379 : trait BytesF {
380 : fn get_u8_f(&mut self) -> Result<u8>;
381 : fn get_u16_f(&mut self) -> Result<u16>;
382 : fn get_u32_f(&mut self) -> Result<u32>;
383 : fn get_u64_f(&mut self) -> Result<u64>;
384 : }
385 :
386 : impl BytesF for Bytes {
387 25311 : fn get_u8_f(&mut self) -> Result<u8> {
388 25311 : if self.is_empty() {
389 0 : bail!("no bytes left, expected 1");
390 25311 : }
391 25311 : Ok(self.get_u8())
392 25311 : }
393 0 : fn get_u16_f(&mut self) -> Result<u16> {
394 0 : if self.remaining() < 2 {
395 0 : bail!("no bytes left, expected 2");
396 0 : }
397 0 : Ok(self.get_u16())
398 0 : }
399 103192 : fn get_u32_f(&mut self) -> Result<u32> {
400 103192 : if self.remaining() < 4 {
401 0 : bail!("only {} bytes left, expected 4", self.remaining());
402 103192 : }
403 103192 : Ok(self.get_u32())
404 103192 : }
405 46844 : fn get_u64_f(&mut self) -> Result<u64> {
406 46844 : if self.remaining() < 8 {
407 0 : bail!("only {} bytes left, expected 8", self.remaining());
408 46844 : }
409 46844 : Ok(self.get_u64())
410 46844 : }
411 : }
412 :
413 : impl ProposerAcceptorMessage {
414 : /// Read cstring from Bytes.
415 38634 : fn get_cstr(buf: &mut Bytes) -> Result<String> {
416 38634 : let pos = buf
417 38634 : .iter()
418 1274922 : .position(|x| *x == 0)
419 38634 : .ok_or_else(|| anyhow::anyhow!("missing cstring terminator"))?;
420 38634 : let result = buf.split_to(pos);
421 38634 : buf.advance(1); // drop the null terminator
422 38634 : match std::str::from_utf8(&result) {
423 38634 : Ok(s) => Ok(s.to_string()),
424 0 : Err(e) => bail!("invalid utf8 in cstring: {}", e),
425 : }
426 38634 : }
427 :
428 : /// Read membership::Configuration from Bytes.
429 19317 : fn get_mconf(buf: &mut Bytes) -> Result<membership::Configuration> {
430 19317 : let generation = Generation::new(buf.get_u32_f().with_context(|| "reading generation")?);
431 19317 : let members_len = buf.get_u32_f().with_context(|| "reading members_len")?;
432 : // Main member set must have at least someone in valid configuration.
433 : // Empty conf is allowed until we fully migrate.
434 19317 : if generation != INVALID_GENERATION && members_len == 0 {
435 0 : bail!("empty members_len");
436 19317 : }
437 19317 : let mut members = MemberSet::empty();
438 19317 : for i in 0..members_len {
439 0 : let id = buf
440 0 : .get_u64_f()
441 0 : .with_context(|| format!("reading member {} node_id", i))?;
442 0 : let host = Self::get_cstr(buf).with_context(|| format!("reading member {} host", i))?;
443 0 : let pg_port = buf
444 0 : .get_u16_f()
445 0 : .with_context(|| format!("reading member {} port", i))?;
446 0 : let sk = SafekeeperId {
447 0 : id: NodeId(id),
448 0 : host,
449 0 : pg_port,
450 0 : };
451 0 : members.add(sk)?;
452 : }
453 19317 : let new_members_len = buf.get_u32_f().with_context(|| "reading new_members_len")?;
454 : // Non joint conf.
455 19317 : if new_members_len == 0 {
456 19317 : Ok(membership::Configuration {
457 19317 : generation,
458 19317 : members,
459 19317 : new_members: None,
460 19317 : })
461 : } else {
462 0 : let mut new_members = MemberSet::empty();
463 0 : for i in 0..new_members_len {
464 0 : let id = buf
465 0 : .get_u64_f()
466 0 : .with_context(|| format!("reading new member {} node_id", i))?;
467 0 : let host = Self::get_cstr(buf)
468 0 : .with_context(|| format!("reading new member {} host", i))?;
469 0 : let pg_port = buf
470 0 : .get_u16_f()
471 0 : .with_context(|| format!("reading new member {} port", i))?;
472 0 : let sk = SafekeeperId {
473 0 : id: NodeId(id),
474 0 : host,
475 0 : pg_port,
476 0 : };
477 0 : new_members.add(sk)?;
478 : }
479 0 : Ok(membership::Configuration {
480 0 : generation,
481 0 : members,
482 0 : new_members: Some(new_members),
483 0 : })
484 : }
485 19317 : }
486 :
487 : /// Parse proposer message.
488 25311 : pub fn parse(mut msg_bytes: Bytes, proto_version: u32) -> Result<ProposerAcceptorMessage> {
489 25311 : if proto_version == SK_PROTO_VERSION_3 {
490 25311 : if msg_bytes.is_empty() {
491 0 : bail!("ProposerAcceptorMessage is not complete: missing tag");
492 25311 : }
493 25311 : let tag = msg_bytes.get_u8_f().with_context(|| {
494 0 : "ProposerAcceptorMessage is not complete: missing tag".to_string()
495 25311 : })? as char;
496 25311 : match tag {
497 : 'g' => {
498 19317 : let tenant_id_str =
499 19317 : Self::get_cstr(&mut msg_bytes).with_context(|| "reading tenant_id")?;
500 19317 : let tenant_id = TenantId::from_str(&tenant_id_str)?;
501 19317 : let timeline_id_str =
502 19317 : Self::get_cstr(&mut msg_bytes).with_context(|| "reading timeline_id")?;
503 19317 : let timeline_id = TimelineId::from_str(&timeline_id_str)?;
504 19317 : let mconf = Self::get_mconf(&mut msg_bytes)?;
505 19317 : let pg_version = msg_bytes
506 19317 : .get_u32_f()
507 19317 : .with_context(|| "reading pg_version")?;
508 19317 : let system_id = msg_bytes.get_u64_f().with_context(|| "reading system_id")?;
509 19317 : let wal_seg_size = msg_bytes
510 19317 : .get_u32_f()
511 19317 : .with_context(|| "reading wal_seg_size")?;
512 19317 : let g = ProposerGreeting {
513 19317 : tenant_id,
514 19317 : timeline_id,
515 19317 : mconf,
516 19317 : pg_version,
517 19317 : system_id,
518 19317 : wal_seg_size,
519 19317 : };
520 19317 : Ok(ProposerAcceptorMessage::Greeting(g))
521 : }
522 : 'v' => {
523 2632 : let generation = Generation::new(
524 2632 : msg_bytes
525 2632 : .get_u32_f()
526 2632 : .with_context(|| "reading generation")?,
527 : );
528 2632 : let term = msg_bytes.get_u64_f().with_context(|| "reading term")?;
529 2632 : let v = VoteRequest { generation, term };
530 2632 : Ok(ProposerAcceptorMessage::VoteRequest(v))
531 : }
532 : 'e' => {
533 613 : let generation = Generation::new(
534 613 : msg_bytes
535 613 : .get_u32_f()
536 613 : .with_context(|| "reading generation")?,
537 : );
538 613 : let term = msg_bytes.get_u64_f().with_context(|| "reading term")?;
539 613 : let start_streaming_at: Lsn = msg_bytes
540 613 : .get_u64_f()
541 613 : .with_context(|| "reading start_streaming_at")?
542 613 : .into();
543 613 : let term_history = TermHistory::from_bytes(&mut msg_bytes)?;
544 613 : let msg = ProposerElected {
545 613 : generation,
546 613 : term,
547 613 : start_streaming_at,
548 613 : term_history,
549 613 : };
550 613 : Ok(ProposerAcceptorMessage::Elected(msg))
551 : }
552 : 'a' => {
553 2749 : let generation = Generation::new(
554 2749 : msg_bytes
555 2749 : .get_u32_f()
556 2749 : .with_context(|| "reading generation")?,
557 : );
558 2749 : let term = msg_bytes.get_u64_f().with_context(|| "reading term")?;
559 2749 : let begin_lsn: Lsn = msg_bytes
560 2749 : .get_u64_f()
561 2749 : .with_context(|| "reading begin_lsn")?
562 2749 : .into();
563 2749 : let end_lsn: Lsn = msg_bytes
564 2749 : .get_u64_f()
565 2749 : .with_context(|| "reading end_lsn")?
566 2749 : .into();
567 2749 : let commit_lsn: Lsn = msg_bytes
568 2749 : .get_u64_f()
569 2749 : .with_context(|| "reading commit_lsn")?
570 2749 : .into();
571 2749 : let truncate_lsn: Lsn = msg_bytes
572 2749 : .get_u64_f()
573 2749 : .with_context(|| "reading truncate_lsn")?
574 2749 : .into();
575 2749 : let hdr = AppendRequestHeader {
576 2749 : generation,
577 2749 : term,
578 2749 : begin_lsn,
579 2749 : end_lsn,
580 2749 : commit_lsn,
581 2749 : truncate_lsn,
582 2749 : };
583 2749 : let rec_size = hdr
584 2749 : .end_lsn
585 2749 : .checked_sub(hdr.begin_lsn)
586 2749 : .context("begin_lsn > end_lsn in AppendRequest")?
587 : .0 as usize;
588 2749 : if rec_size > MAX_SEND_SIZE {
589 0 : bail!(
590 0 : "AppendRequest is longer than MAX_SEND_SIZE ({})",
591 0 : MAX_SEND_SIZE
592 0 : );
593 2749 : }
594 2749 : if msg_bytes.remaining() < rec_size {
595 0 : bail!(
596 0 : "reading WAL: only {} bytes left, wanted {}",
597 0 : msg_bytes.remaining(),
598 0 : rec_size
599 0 : );
600 2749 : }
601 2749 : let wal_data = msg_bytes.copy_to_bytes(rec_size);
602 2749 : let msg = AppendRequest { h: hdr, wal_data };
603 2749 :
604 2749 : Ok(ProposerAcceptorMessage::AppendRequest(msg))
605 : }
606 0 : _ => bail!("unknown proposer-acceptor message tag: {}", tag),
607 : }
608 0 : } else if proto_version == SK_PROTO_VERSION_2 {
609 : // xxx using Reader is inefficient but easy to work with bincode
610 0 : let mut stream = msg_bytes.reader();
611 : // u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is
612 0 : let tag = stream.read_u64::<LittleEndian>()? as u8 as char;
613 0 : match tag {
614 : 'g' => {
615 0 : let msgv2 = ProposerGreetingV2::des_from(&mut stream)?;
616 0 : let g = ProposerGreeting {
617 0 : tenant_id: msgv2.tenant_id,
618 0 : timeline_id: msgv2.timeline_id,
619 0 : mconf: membership::Configuration {
620 0 : generation: INVALID_GENERATION,
621 0 : members: MemberSet::empty(),
622 0 : new_members: None,
623 0 : },
624 0 : pg_version: msgv2.pg_version,
625 0 : system_id: msgv2.system_id,
626 0 : wal_seg_size: msgv2.wal_seg_size,
627 0 : };
628 0 : Ok(ProposerAcceptorMessage::Greeting(g))
629 : }
630 : 'v' => {
631 0 : let msg = VoteRequestV2::des_from(&mut stream)?;
632 0 : let v = VoteRequest {
633 0 : generation: INVALID_GENERATION,
634 0 : term: msg.term,
635 0 : };
636 0 : Ok(ProposerAcceptorMessage::VoteRequest(v))
637 : }
638 : 'e' => {
639 0 : let mut msg_bytes = stream.into_inner();
640 0 : if msg_bytes.remaining() < 16 {
641 0 : bail!("ProposerElected message is not complete");
642 0 : }
643 0 : let term = msg_bytes.get_u64_le();
644 0 : let start_streaming_at = msg_bytes.get_u64_le().into();
645 0 : let term_history = TermHistory::from_bytes_le(&mut msg_bytes)?;
646 0 : if msg_bytes.remaining() < 8 {
647 0 : bail!("ProposerElected message is not complete");
648 0 : }
649 0 : let _timeline_start_lsn = msg_bytes.get_u64_le();
650 0 : let msg = ProposerElected {
651 0 : generation: INVALID_GENERATION,
652 0 : term,
653 0 : start_streaming_at,
654 0 : term_history,
655 0 : };
656 0 : Ok(ProposerAcceptorMessage::Elected(msg))
657 : }
658 : 'a' => {
659 : // read header followed by wal data
660 0 : let hdrv2 = AppendRequestHeaderV2::des_from(&mut stream)?;
661 0 : let hdr = AppendRequestHeader {
662 0 : generation: INVALID_GENERATION,
663 0 : term: hdrv2.term,
664 0 : begin_lsn: hdrv2.begin_lsn,
665 0 : end_lsn: hdrv2.end_lsn,
666 0 : commit_lsn: hdrv2.commit_lsn,
667 0 : truncate_lsn: hdrv2.truncate_lsn,
668 0 : };
669 0 : let rec_size = hdr
670 0 : .end_lsn
671 0 : .checked_sub(hdr.begin_lsn)
672 0 : .context("begin_lsn > end_lsn in AppendRequest")?
673 : .0 as usize;
674 0 : if rec_size > MAX_SEND_SIZE {
675 0 : bail!(
676 0 : "AppendRequest is longer than MAX_SEND_SIZE ({})",
677 0 : MAX_SEND_SIZE
678 0 : );
679 0 : }
680 0 :
681 0 : let mut wal_data_vec: Vec<u8> = vec![0; rec_size];
682 0 : stream.read_exact(&mut wal_data_vec)?;
683 0 : let wal_data = Bytes::from(wal_data_vec);
684 0 :
685 0 : let msg = AppendRequest { h: hdr, wal_data };
686 0 :
687 0 : Ok(ProposerAcceptorMessage::AppendRequest(msg))
688 : }
689 0 : _ => bail!("unknown proposer-acceptor message tag: {}", tag),
690 : }
691 : } else {
692 0 : bail!("unsupported protocol version {}", proto_version);
693 : }
694 25311 : }
695 :
696 : /// The memory size of the message, including byte slices.
697 620 : pub fn size(&self) -> usize {
698 : const BASE_SIZE: usize = std::mem::size_of::<ProposerAcceptorMessage>();
699 :
700 : // For most types, the size is just the base enum size including the nested structs. Some
701 : // types also contain byte slices; add them.
702 : //
703 : // We explicitly list all fields, to draw attention here when new fields are added.
704 620 : let mut size = BASE_SIZE;
705 620 : size += match self {
706 0 : Self::Greeting(_) => 0,
707 :
708 0 : Self::VoteRequest(_) => 0,
709 :
710 0 : Self::Elected(_) => 0,
711 :
712 : Self::AppendRequest(AppendRequest {
713 : h:
714 : AppendRequestHeader {
715 : generation: _,
716 : term: _,
717 : begin_lsn: _,
718 : end_lsn: _,
719 : commit_lsn: _,
720 : truncate_lsn: _,
721 : },
722 620 : wal_data,
723 620 : }) => wal_data.len(),
724 :
725 : Self::NoFlushAppendRequest(AppendRequest {
726 : h:
727 : AppendRequestHeader {
728 : generation: _,
729 : term: _,
730 : begin_lsn: _,
731 : end_lsn: _,
732 : commit_lsn: _,
733 : truncate_lsn: _,
734 : },
735 0 : wal_data,
736 0 : }) => wal_data.len(),
737 :
738 0 : Self::FlushWAL => 0,
739 : };
740 :
741 620 : size
742 620 : }
743 : }
744 :
745 : /// Acceptor -> Proposer messages
746 : #[derive(Debug)]
747 : pub enum AcceptorProposerMessage {
748 : Greeting(AcceptorGreeting),
749 : VoteResponse(VoteResponse),
750 : AppendResponse(AppendResponse),
751 : }
752 :
753 : impl AcceptorProposerMessage {
754 0 : fn put_cstr(buf: &mut BytesMut, s: &str) {
755 0 : buf.put_slice(s.as_bytes());
756 0 : buf.put_u8(0); // null terminator
757 0 : }
758 :
759 : /// Serialize membership::Configuration into buf.
760 19317 : fn serialize_mconf(buf: &mut BytesMut, mconf: &membership::Configuration) {
761 19317 : buf.put_u32(mconf.generation.into_inner());
762 19317 : buf.put_u32(mconf.members.m.len() as u32);
763 19317 : for sk in &mconf.members.m {
764 0 : buf.put_u64(sk.id.0);
765 0 : Self::put_cstr(buf, &sk.host);
766 0 : buf.put_u16(sk.pg_port);
767 0 : }
768 19317 : if let Some(ref new_members) = mconf.new_members {
769 0 : buf.put_u32(new_members.m.len() as u32);
770 0 : for sk in &new_members.m {
771 0 : buf.put_u64(sk.id.0);
772 0 : Self::put_cstr(buf, &sk.host);
773 0 : buf.put_u16(sk.pg_port);
774 0 : }
775 19317 : } else {
776 19317 : buf.put_u32(0);
777 19317 : }
778 19317 : }
779 :
780 : /// Serialize acceptor -> proposer message.
781 24279 : pub fn serialize(&self, buf: &mut BytesMut, proto_version: u32) -> Result<()> {
782 24279 : if proto_version == SK_PROTO_VERSION_3 {
783 24279 : match self {
784 19317 : AcceptorProposerMessage::Greeting(msg) => {
785 19317 : buf.put_u8(b'g');
786 19317 : buf.put_u64(msg.node_id.0);
787 19317 : Self::serialize_mconf(buf, &msg.mconf);
788 19317 : buf.put_u64(msg.term)
789 : }
790 2632 : AcceptorProposerMessage::VoteResponse(msg) => {
791 2632 : buf.put_u8(b'v');
792 2632 : buf.put_u32(msg.generation.into_inner());
793 2632 : buf.put_u64(msg.term);
794 2632 : buf.put_u8(msg.vote_given as u8);
795 2632 : buf.put_u64(msg.flush_lsn.into());
796 2632 : buf.put_u64(msg.truncate_lsn.into());
797 2632 : buf.put_u32(msg.term_history.0.len() as u32);
798 10141 : for e in &msg.term_history.0 {
799 7509 : buf.put_u64(e.term);
800 7509 : buf.put_u64(e.lsn.into());
801 7509 : }
802 : }
803 2330 : AcceptorProposerMessage::AppendResponse(msg) => {
804 2330 : buf.put_u8(b'a');
805 2330 : buf.put_u32(msg.generation.into_inner());
806 2330 : buf.put_u64(msg.term);
807 2330 : buf.put_u64(msg.flush_lsn.into());
808 2330 : buf.put_u64(msg.commit_lsn.into());
809 2330 : buf.put_i64(msg.hs_feedback.ts);
810 2330 : buf.put_u64(msg.hs_feedback.xmin);
811 2330 : buf.put_u64(msg.hs_feedback.catalog_xmin);
812 :
813 : // AsyncReadMessage in walproposer.c will not try to decode pageserver_feedback
814 : // if it is not present.
815 2330 : if let Some(ref msg) = msg.pageserver_feedback {
816 0 : msg.serialize(buf);
817 2330 : }
818 : }
819 : }
820 24279 : Ok(())
821 : // TODO remove 3 after converting all msgs
822 0 : } else if proto_version == SK_PROTO_VERSION_2 {
823 0 : match self {
824 0 : AcceptorProposerMessage::Greeting(msg) => {
825 0 : buf.put_u64_le('g' as u64);
826 0 : // v2 didn't have mconf and fields were reordered
827 0 : buf.put_u64_le(msg.term);
828 0 : buf.put_u64_le(msg.node_id.0);
829 0 : }
830 0 : AcceptorProposerMessage::VoteResponse(msg) => {
831 0 : // v2 didn't have generation, had u64 vote_given and timeline_start_lsn
832 0 : buf.put_u64_le('v' as u64);
833 0 : buf.put_u64_le(msg.term);
834 0 : buf.put_u64_le(msg.vote_given as u64);
835 0 : buf.put_u64_le(msg.flush_lsn.into());
836 0 : buf.put_u64_le(msg.truncate_lsn.into());
837 0 : buf.put_u32_le(msg.term_history.0.len() as u32);
838 0 : for e in &msg.term_history.0 {
839 0 : buf.put_u64_le(e.term);
840 0 : buf.put_u64_le(e.lsn.into());
841 0 : }
842 : // removed timeline_start_lsn
843 0 : buf.put_u64_le(0);
844 : }
845 0 : AcceptorProposerMessage::AppendResponse(msg) => {
846 0 : // v2 didn't have generation
847 0 : buf.put_u64_le('a' as u64);
848 0 : buf.put_u64_le(msg.term);
849 0 : buf.put_u64_le(msg.flush_lsn.into());
850 0 : buf.put_u64_le(msg.commit_lsn.into());
851 0 : buf.put_i64_le(msg.hs_feedback.ts);
852 0 : buf.put_u64_le(msg.hs_feedback.xmin);
853 0 : buf.put_u64_le(msg.hs_feedback.catalog_xmin);
854 :
855 : // AsyncReadMessage in walproposer.c will not try to decode pageserver_feedback
856 : // if it is not present.
857 0 : if let Some(ref msg) = msg.pageserver_feedback {
858 0 : msg.serialize(buf);
859 0 : }
860 : }
861 : }
862 0 : Ok(())
863 : } else {
864 0 : bail!("unsupported protocol version {}", proto_version);
865 : }
866 24279 : }
867 : }
868 :
869 : /// Safekeeper implements consensus to reliably persist WAL across nodes.
870 : /// It controls all WAL disk writes and updates of control file.
871 : ///
872 : /// Currently safekeeper processes:
873 : /// - messages from compute (proposers) and provides replies
874 : /// - messages from broker peers
875 : pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> {
876 : /// LSN since the proposer safekeeper currently talking to appends WAL;
877 : /// determines last_log_term switch point.
878 : pub term_start_lsn: Lsn,
879 :
880 : pub state: TimelineState<CTRL>, // persistent state storage
881 : pub wal_store: WAL,
882 :
883 : node_id: NodeId, // safekeeper's node id
884 : }
885 :
886 : impl<CTRL, WAL> SafeKeeper<CTRL, WAL>
887 : where
888 : CTRL: control_file::Storage,
889 : WAL: wal_storage::Storage,
890 : {
891 : /// Accepts a control file storage containing the safekeeper state.
892 : /// State must be initialized, i.e. contain filled `tenant_id`, `timeline_id`
893 : /// and `server` (`wal_seg_size` inside it) fields.
894 8602 : pub fn new(
895 8602 : state: TimelineState<CTRL>,
896 8602 : wal_store: WAL,
897 8602 : node_id: NodeId,
898 8602 : ) -> Result<SafeKeeper<CTRL, WAL>> {
899 8602 : if state.tenant_id == TenantId::from([0u8; 16])
900 8602 : || state.timeline_id == TimelineId::from([0u8; 16])
901 : {
902 0 : bail!(
903 0 : "Calling SafeKeeper::new with empty tenant_id ({}) or timeline_id ({})",
904 0 : state.tenant_id,
905 0 : state.timeline_id
906 0 : );
907 8602 : }
908 8602 :
909 8602 : Ok(SafeKeeper {
910 8602 : term_start_lsn: Lsn(0),
911 8602 : state,
912 8602 : wal_store,
913 8602 : node_id,
914 8602 : })
915 9 : }
916 :
917 : /// Get history of term switches for the available WAL
918 3254 : fn get_term_history(&self) -> TermHistory {
919 3254 : self.state
920 3254 : .acceptor_state
921 3254 : .term_history
922 3254 : .up_to(self.flush_lsn())
923 3254 : }
924 :
925 43 : pub fn get_last_log_term(&self) -> Term {
926 43 : self.state
927 43 : .acceptor_state
928 43 : .get_last_log_term(self.flush_lsn())
929 43 : }
930 :
931 : /// wal_store wrapper avoiding commit_lsn <= flush_lsn violation when we don't have WAL yet.
932 15282 : pub fn flush_lsn(&self) -> Lsn {
933 15282 : max(self.wal_store.flush_lsn(), self.state.timeline_start_lsn)
934 15282 : }
935 :
936 : /// Process message from proposer and possibly form reply. Concurrent
937 : /// callers must exclude each other.
938 28897 : pub async fn process_msg(
939 28897 : &mut self,
940 28897 : msg: &ProposerAcceptorMessage,
941 28897 : ) -> Result<Option<AcceptorProposerMessage>> {
942 28897 : match msg {
943 19317 : ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg).await,
944 2635 : ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg).await,
945 621 : ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg).await,
946 5 : ProposerAcceptorMessage::AppendRequest(msg) => {
947 5 : self.handle_append_request(msg, true).await
948 : }
949 3369 : ProposerAcceptorMessage::NoFlushAppendRequest(msg) => {
950 3369 : self.handle_append_request(msg, false).await
951 : }
952 2950 : ProposerAcceptorMessage::FlushWAL => self.handle_flush().await,
953 : }
954 1256 : }
955 :
956 : /// Handle initial message from proposer: check its sanity and send my
957 : /// current term.
958 19317 : async fn handle_greeting(
959 19317 : &mut self,
960 19317 : msg: &ProposerGreeting,
961 19317 : ) -> Result<Option<AcceptorProposerMessage>> {
962 19317 : /* Postgres major version mismatch is treated as fatal error
963 19317 : * because safekeepers parse WAL headers and the format
964 19317 : * may change between versions.
965 19317 : */
966 19317 : if msg.pg_version / 10000 != self.state.server.pg_version / 10000
967 0 : && self.state.server.pg_version != UNKNOWN_SERVER_VERSION
968 : {
969 0 : bail!(
970 0 : "incompatible server version {}, expected {}",
971 0 : msg.pg_version,
972 0 : self.state.server.pg_version
973 0 : );
974 19317 : }
975 19317 :
976 19317 : if msg.tenant_id != self.state.tenant_id {
977 0 : bail!(
978 0 : "invalid tenant ID, got {}, expected {}",
979 0 : msg.tenant_id,
980 0 : self.state.tenant_id
981 0 : );
982 19317 : }
983 19317 : if msg.timeline_id != self.state.timeline_id {
984 0 : bail!(
985 0 : "invalid timeline ID, got {}, expected {}",
986 0 : msg.timeline_id,
987 0 : self.state.timeline_id
988 0 : );
989 19317 : }
990 19317 : if self.state.server.wal_seg_size != msg.wal_seg_size {
991 0 : bail!(
992 0 : "invalid wal_seg_size, got {}, expected {}",
993 0 : msg.wal_seg_size,
994 0 : self.state.server.wal_seg_size
995 0 : );
996 19317 : }
997 19317 :
998 19317 : // system_id will be updated on mismatch
999 19317 : // sync-safekeepers doesn't know sysid and sends 0, ignore it
1000 19317 : if self.state.server.system_id != msg.system_id && msg.system_id != 0 {
1001 0 : if self.state.server.system_id != 0 {
1002 0 : warn!(
1003 0 : "unexpected system ID arrived, got {}, expected {}",
1004 0 : msg.system_id, self.state.server.system_id
1005 : );
1006 0 : }
1007 :
1008 0 : let mut state = self.state.start_change();
1009 0 : state.server.system_id = msg.system_id;
1010 0 : if msg.pg_version != UNKNOWN_SERVER_VERSION {
1011 0 : state.server.pg_version = msg.pg_version;
1012 0 : }
1013 0 : self.state.finish_change(&state).await?;
1014 0 : }
1015 :
1016 : // Switch into conf given by proposer conf if it is higher.
1017 19317 : self.state.membership_switch(msg.mconf.clone()).await?;
1018 :
1019 19317 : let apg = AcceptorGreeting {
1020 19317 : node_id: self.node_id,
1021 19317 : mconf: self.state.mconf.clone(),
1022 19317 : term: self.state.acceptor_state.term,
1023 19317 : };
1024 19317 : info!(
1025 0 : "processed greeting {:?} from walproposer, sending {:?}",
1026 : msg, apg
1027 : );
1028 19317 : Ok(Some(AcceptorProposerMessage::Greeting(apg)))
1029 0 : }
1030 :
1031 : /// Give vote for the given term, if we haven't done that previously.
1032 2635 : async fn handle_vote_request(
1033 2635 : &mut self,
1034 2635 : msg: &VoteRequest,
1035 2635 : ) -> Result<Option<AcceptorProposerMessage>> {
1036 2635 : if self.state.mconf.generation != msg.generation {
1037 1 : bail!(
1038 1 : "refusing {:?} due to generation mismatch: sk generation {}",
1039 1 : msg,
1040 1 : self.state.mconf.generation
1041 1 : );
1042 2634 : }
1043 2634 : // Once voted, we won't accept data from older proposers; flush
1044 2634 : // everything we've already received so that new proposer starts
1045 2634 : // streaming at end of our WAL, without overlap. WAL is truncated at
1046 2634 : // streaming point and commit_lsn may be advanced from peers, so this
1047 2634 : // also avoids possible spurious attempt to truncate committed WAL.
1048 2634 : self.wal_store.flush_wal().await?;
1049 : // initialize with refusal
1050 2634 : let mut resp = VoteResponse {
1051 2634 : generation: self.state.mconf.generation,
1052 2634 : term: self.state.acceptor_state.term,
1053 2634 : vote_given: false,
1054 2634 : flush_lsn: self.flush_lsn(),
1055 2634 : truncate_lsn: self.state.inmem.peer_horizon_lsn,
1056 2634 : term_history: self.get_term_history(),
1057 2634 : };
1058 2634 : if self.state.acceptor_state.term < msg.term {
1059 2500 : let mut state = self.state.start_change();
1060 2500 : state.acceptor_state.term = msg.term;
1061 2500 : // persist vote before sending it out
1062 2500 : self.state.finish_change(&state).await?;
1063 :
1064 2500 : resp.term = self.state.acceptor_state.term;
1065 2500 : resp.vote_given = true;
1066 1 : }
1067 2634 : info!("processed {:?}: sending {:?}", msg, &resp);
1068 2634 : Ok(Some(AcceptorProposerMessage::VoteResponse(resp)))
1069 3 : }
1070 :
1071 : /// Form AppendResponse from current state.
1072 2953 : fn append_response(&self) -> AppendResponse {
1073 2953 : let ar = AppendResponse {
1074 2953 : generation: self.state.mconf.generation,
1075 2953 : term: self.state.acceptor_state.term,
1076 2953 : flush_lsn: self.flush_lsn(),
1077 2953 : commit_lsn: self.state.commit_lsn,
1078 2953 : // will be filled by the upper code to avoid bothering safekeeper
1079 2953 : hs_feedback: HotStandbyFeedback::empty(),
1080 2953 : pageserver_feedback: None,
1081 2953 : };
1082 2953 : trace!("formed AppendResponse {:?}", ar);
1083 2953 : ar
1084 2953 : }
1085 :
1086 621 : async fn handle_elected(
1087 621 : &mut self,
1088 621 : msg: &ProposerElected,
1089 621 : ) -> Result<Option<AcceptorProposerMessage>> {
1090 621 : let _timer = MISC_OPERATION_SECONDS
1091 621 : .with_label_values(&["handle_elected"])
1092 621 : .start_timer();
1093 621 :
1094 621 : info!(
1095 0 : "received ProposerElected {:?}, term={}, last_log_term={}, flush_lsn={}",
1096 0 : msg,
1097 0 : self.state.acceptor_state.term,
1098 0 : self.get_last_log_term(),
1099 0 : self.flush_lsn()
1100 : );
1101 621 : if self.state.mconf.generation != msg.generation {
1102 1 : bail!(
1103 1 : "refusing {:?} due to generation mismatch: sk generation {}",
1104 1 : msg,
1105 1 : self.state.mconf.generation
1106 1 : );
1107 620 : }
1108 620 : if self.state.acceptor_state.term < msg.term {
1109 7 : let mut state = self.state.start_change();
1110 7 : state.acceptor_state.term = msg.term;
1111 7 : self.state.finish_change(&state).await?;
1112 0 : }
1113 :
1114 : // If our term is higher, ignore the message (next feedback will inform the compute)
1115 620 : if self.state.acceptor_state.term > msg.term {
1116 0 : return Ok(None);
1117 620 : }
1118 620 :
1119 620 : // Before truncating WAL check-cross the check divergence point received
1120 620 : // from the walproposer.
1121 620 : let sk_th = self.get_term_history();
1122 620 : let last_common_point = match TermHistory::find_highest_common_point(
1123 620 : &msg.term_history,
1124 620 : &sk_th,
1125 620 : self.flush_lsn(),
1126 620 : ) {
1127 : // No common point. Expect streaming from the beginning of the
1128 : // history like walproposer while we don't have proper init.
1129 135 : None => *msg.term_history.0.first().ok_or(anyhow::anyhow!(
1130 135 : "empty walproposer term history {:?}",
1131 135 : msg.term_history
1132 135 : ))?,
1133 485 : Some(lcp) => lcp,
1134 : };
1135 : // This is expected to happen in a rare race when another connection
1136 : // from the same walproposer writes + flushes WAL after this connection
1137 : // sent flush_lsn in VoteRequest; for instance, very late
1138 : // ProposerElected message delivery after another connection was
1139 : // established and wrote WAL. In such cases error is transient;
1140 : // reconnection makes safekeeper send newest term history and flush_lsn
1141 : // and walproposer recalculates the streaming point. OTOH repeating
1142 : // error indicates a serious bug.
1143 620 : if last_common_point.lsn != msg.start_streaming_at {
1144 0 : bail!(
1145 0 : "refusing ProposerElected with unexpected truncation point: lcp={:?} start_streaming_at={}, term={}, sk_th={:?} flush_lsn={}, wp_th={:?}",
1146 0 : last_common_point,
1147 0 : msg.start_streaming_at,
1148 0 : self.state.acceptor_state.term,
1149 0 : sk_th,
1150 0 : self.flush_lsn(),
1151 0 : msg.term_history,
1152 0 : );
1153 620 : }
1154 620 :
1155 620 : // We are also expected to never attempt to truncate committed data.
1156 620 : assert!(
1157 620 : msg.start_streaming_at >= self.state.inmem.commit_lsn,
1158 0 : "attempt to truncate committed data: start_streaming_at={}, commit_lsn={}, term={}, sk_th={:?} flush_lsn={}, wp_th={:?}",
1159 0 : msg.start_streaming_at,
1160 0 : self.state.inmem.commit_lsn,
1161 0 : self.state.acceptor_state.term,
1162 0 : sk_th,
1163 0 : self.flush_lsn(),
1164 : msg.term_history,
1165 : );
1166 :
1167 : // Before first WAL write initialize its segment. It makes first segment
1168 : // pg_waldump'able because stream from compute doesn't include its
1169 : // segment and page headers.
1170 : //
1171 : // If we fail before first WAL write flush this action would be
1172 : // repeated, that's ok because it is idempotent.
1173 620 : if self.wal_store.flush_lsn() == Lsn::INVALID {
1174 135 : self.wal_store
1175 135 : .initialize_first_segment(msg.start_streaming_at)
1176 135 : .await?;
1177 0 : }
1178 :
1179 : // truncate wal, update the LSNs
1180 620 : self.wal_store.truncate_wal(msg.start_streaming_at).await?;
1181 :
1182 : // and now adopt term history from proposer
1183 : {
1184 620 : let mut state = self.state.start_change();
1185 :
1186 : // Here we learn initial LSN for the first time, set fields
1187 : // interested in that.
1188 :
1189 620 : if let Some(start_lsn) = msg.term_history.0.first() {
1190 620 : if state.timeline_start_lsn == Lsn(0) {
1191 : // Remember point where WAL begins globally. In the future it
1192 : // will be intialized immediately on timeline creation.
1193 135 : state.timeline_start_lsn = start_lsn.lsn;
1194 135 : info!(
1195 0 : "setting timeline_start_lsn to {:?}",
1196 : state.timeline_start_lsn
1197 : );
1198 0 : }
1199 0 : }
1200 :
1201 620 : if state.peer_horizon_lsn == Lsn(0) {
1202 135 : // Update peer_horizon_lsn as soon as we know where timeline starts.
1203 135 : // It means that peer_horizon_lsn cannot be zero after we know timeline_start_lsn.
1204 135 : state.peer_horizon_lsn = state.timeline_start_lsn;
1205 135 : }
1206 620 : if state.local_start_lsn == Lsn(0) {
1207 135 : state.local_start_lsn = msg.start_streaming_at;
1208 135 : info!("setting local_start_lsn to {:?}", state.local_start_lsn);
1209 0 : }
1210 : // Initializing commit_lsn before acking first flushed record is
1211 : // important to let find_end_of_wal skip the hole in the beginning
1212 : // of the first segment.
1213 : //
1214 : // NB: on new clusters, this happens at the same time as
1215 : // timeline_start_lsn initialization, it is taken outside to provide
1216 : // upgrade.
1217 620 : state.commit_lsn = max(state.commit_lsn, state.timeline_start_lsn);
1218 620 :
1219 620 : // Initializing backup_lsn is useful to avoid making backup think it should upload 0 segment.
1220 620 : state.backup_lsn = max(state.backup_lsn, state.timeline_start_lsn);
1221 620 : // similar for remote_consistent_lsn
1222 620 : state.remote_consistent_lsn =
1223 620 : max(state.remote_consistent_lsn, state.timeline_start_lsn);
1224 620 :
1225 620 : state.acceptor_state.term_history = msg.term_history.clone();
1226 620 : self.state.finish_change(&state).await?;
1227 : }
1228 :
1229 620 : info!("start receiving WAL since {:?}", msg.start_streaming_at);
1230 :
1231 : // Cache LSN where term starts to immediately fsync control file with
1232 : // commit_lsn once we reach it -- sync-safekeepers finishes when
1233 : // persisted commit_lsn on majority of safekeepers aligns.
1234 620 : self.term_start_lsn = match msg.term_history.0.last() {
1235 0 : None => bail!("proposer elected with empty term history"),
1236 620 : Some(term_lsn_start) => term_lsn_start.lsn,
1237 620 : };
1238 620 :
1239 620 : Ok(None)
1240 8 : }
1241 :
1242 : /// Advance commit_lsn taking into account what we have locally.
1243 : ///
1244 : /// Note: it is assumed that 'WAL we have is from the right term' check has
1245 : /// already been done outside.
1246 2368 : async fn update_commit_lsn(&mut self, mut candidate: Lsn) -> Result<()> {
1247 2368 : // Both peers and walproposer communicate this value, we might already
1248 2368 : // have a fresher (higher) version.
1249 2368 : candidate = max(candidate, self.state.inmem.commit_lsn);
1250 2368 : let commit_lsn = min(candidate, self.flush_lsn());
1251 2368 : assert!(
1252 2368 : commit_lsn >= self.state.inmem.commit_lsn,
1253 0 : "commit_lsn monotonicity violated: old={} new={}",
1254 : self.state.inmem.commit_lsn,
1255 : commit_lsn
1256 : );
1257 :
1258 2368 : self.state.inmem.commit_lsn = commit_lsn;
1259 2368 :
1260 2368 : // If new commit_lsn reached term switch, force sync of control
1261 2368 : // file: walproposer in sync mode is very interested when this
1262 2368 : // happens. Note: this is for sync-safekeepers mode only, as
1263 2368 : // otherwise commit_lsn might jump over term_start_lsn.
1264 2368 : if commit_lsn >= self.term_start_lsn && self.state.commit_lsn < self.term_start_lsn {
1265 93 : self.state.flush().await?;
1266 620 : }
1267 :
1268 2368 : Ok(())
1269 620 : }
1270 :
1271 : /// Handle request to append WAL.
1272 : #[allow(clippy::comparison_chain)]
1273 3374 : async fn handle_append_request(
1274 3374 : &mut self,
1275 3374 : msg: &AppendRequest,
1276 3374 : require_flush: bool,
1277 3374 : ) -> Result<Option<AcceptorProposerMessage>> {
1278 3374 : // Refuse message on generation mismatch. On reconnect wp will get full
1279 3374 : // configuration from greeting.
1280 3374 : if self.state.mconf.generation != msg.h.generation {
1281 1 : bail!(
1282 1 : "refusing append request due to generation mismatch: request {}, sk {}",
1283 1 : msg.h.generation,
1284 1 : self.state.mconf.generation
1285 1 : );
1286 3373 : }
1287 3373 :
1288 3373 : if self.state.acceptor_state.term < msg.h.term {
1289 0 : bail!("got AppendRequest before ProposerElected");
1290 3373 : }
1291 3373 :
1292 3373 : // If our term is higher, immediately refuse the message. Send term only
1293 3373 : // response; elected walproposer can never advance the term, so it will
1294 3373 : // figure out the refusal from it -- which is important as term change
1295 3373 : // should cause not just reconnection but whole walproposer re-election.
1296 3373 : if self.state.acceptor_state.term > msg.h.term {
1297 0 : let resp = AppendResponse::term_only(
1298 0 : self.state.mconf.generation,
1299 0 : self.state.acceptor_state.term,
1300 0 : );
1301 0 : return Ok(Some(AcceptorProposerMessage::AppendResponse(resp)));
1302 3373 : }
1303 3373 :
1304 3373 : // Disallow any non-sequential writes, which can result in gaps or
1305 3373 : // overwrites. If we need to move the pointer, ProposerElected message
1306 3373 : // should have truncated WAL first accordingly. Note that the first
1307 3373 : // condition (WAL rewrite) is quite expected in real world; it happens
1308 3373 : // when walproposer reconnects to safekeeper and writes some more data
1309 3373 : // while first connection still gets some packets later. It might be
1310 3373 : // better to not log this as error! above.
1311 3373 : let write_lsn = self.wal_store.write_lsn();
1312 3373 : let flush_lsn = self.wal_store.flush_lsn();
1313 3373 : if write_lsn > msg.h.begin_lsn {
1314 1 : bail!(
1315 1 : "append request rewrites WAL written before, write_lsn={}, msg lsn={}",
1316 1 : write_lsn,
1317 1 : msg.h.begin_lsn
1318 1 : );
1319 3372 : }
1320 3372 : if write_lsn < msg.h.begin_lsn && write_lsn != Lsn(0) {
1321 0 : bail!(
1322 0 : "append request creates gap in written WAL, write_lsn={}, msg lsn={}",
1323 0 : write_lsn,
1324 0 : msg.h.begin_lsn,
1325 0 : );
1326 3372 : }
1327 3372 :
1328 3372 : // Now we know that we are in the same term as the proposer, process the
1329 3372 : // message.
1330 3372 :
1331 3372 : // do the job
1332 3372 : if !msg.wal_data.is_empty() {
1333 1367 : self.wal_store
1334 1367 : .write_wal(msg.h.begin_lsn, &msg.wal_data)
1335 1367 : .await?;
1336 0 : }
1337 :
1338 : // flush wal to the disk, if required
1339 3372 : if require_flush {
1340 3 : self.wal_store.flush_wal().await?;
1341 620 : }
1342 :
1343 : // Update commit_lsn. It will be flushed to the control file regularly by the timeline
1344 : // manager, off of the WAL ingest hot path.
1345 3372 : if msg.h.commit_lsn != Lsn(0) {
1346 2368 : self.update_commit_lsn(msg.h.commit_lsn).await?;
1347 3 : }
1348 : // Value calculated by walproposer can always lag:
1349 : // - safekeepers can forget inmem value and send to proposer lower
1350 : // persisted one on restart;
1351 : // - if we make safekeepers always send persistent value,
1352 : // any compute restart would pull it down.
1353 : // Thus, take max before adopting.
1354 3372 : self.state.inmem.peer_horizon_lsn =
1355 3372 : max(self.state.inmem.peer_horizon_lsn, msg.h.truncate_lsn);
1356 3372 :
1357 3372 : trace!(
1358 0 : "processed AppendRequest of len {}, begin_lsn={}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, flushed={:?}",
1359 0 : msg.wal_data.len(),
1360 : msg.h.begin_lsn,
1361 : msg.h.end_lsn,
1362 : msg.h.commit_lsn,
1363 : msg.h.truncate_lsn,
1364 : require_flush,
1365 : );
1366 :
1367 : // If flush_lsn hasn't updated, AppendResponse is not very useful.
1368 : // This is the common case for !require_flush, but a flush can still
1369 : // happen on segment bounds.
1370 3372 : if !require_flush && flush_lsn == self.flush_lsn() {
1371 3369 : return Ok(None);
1372 3 : }
1373 3 :
1374 3 : let resp = self.append_response();
1375 3 : Ok(Some(AcceptorProposerMessage::AppendResponse(resp)))
1376 625 : }
1377 :
1378 : /// Flush WAL to disk. Return AppendResponse with latest LSNs.
1379 2950 : async fn handle_flush(&mut self) -> Result<Option<AcceptorProposerMessage>> {
1380 2950 : self.wal_store.flush_wal().await?;
1381 2950 : Ok(Some(AcceptorProposerMessage::AppendResponse(
1382 2950 : self.append_response(),
1383 2950 : )))
1384 620 : }
1385 :
1386 : /// Update commit_lsn from peer safekeeper data.
1387 0 : pub async fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> {
1388 0 : if Lsn(sk_info.commit_lsn) != Lsn::INVALID {
1389 : // Note: the check is too restrictive, generally we can update local
1390 : // commit_lsn if our history matches (is part of) history of advanced
1391 : // commit_lsn provider.
1392 0 : if sk_info.last_log_term == self.get_last_log_term() {
1393 0 : self.update_commit_lsn(Lsn(sk_info.commit_lsn)).await?;
1394 0 : }
1395 0 : }
1396 0 : Ok(())
1397 0 : }
1398 : }
1399 :
1400 : #[cfg(test)]
1401 : mod tests {
1402 : use std::ops::Deref;
1403 : use std::str::FromStr;
1404 : use std::time::{Instant, UNIX_EPOCH};
1405 :
1406 : use futures::future::BoxFuture;
1407 : use postgres_ffi::{WAL_SEGMENT_SIZE, XLogSegNo};
1408 : use safekeeper_api::ServerInfo;
1409 : use safekeeper_api::membership::{
1410 : Configuration, MemberSet, SafekeeperGeneration, SafekeeperId,
1411 : };
1412 :
1413 : use super::*;
1414 : use crate::state::{EvictionState, TimelinePersistentState};
1415 :
1416 : // fake storage for tests
1417 : struct InMemoryState {
1418 : persisted_state: TimelinePersistentState,
1419 : }
1420 :
1421 : impl control_file::Storage for InMemoryState {
1422 5 : async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> {
1423 5 : self.persisted_state = s.clone();
1424 5 : Ok(())
1425 5 : }
1426 :
1427 0 : fn last_persist_at(&self) -> Instant {
1428 0 : Instant::now()
1429 0 : }
1430 : }
1431 :
1432 : impl Deref for InMemoryState {
1433 : type Target = TimelinePersistentState;
1434 :
1435 100 : fn deref(&self) -> &Self::Target {
1436 100 : &self.persisted_state
1437 100 : }
1438 : }
1439 :
1440 3 : fn test_sk_state() -> TimelinePersistentState {
1441 3 : let mut state = TimelinePersistentState::empty();
1442 3 : state.server.wal_seg_size = WAL_SEGMENT_SIZE as u32;
1443 3 : state.tenant_id = TenantId::from([1u8; 16]);
1444 3 : state.timeline_id = TimelineId::from([1u8; 16]);
1445 3 : state
1446 3 : }
1447 :
1448 : struct DummyWalStore {
1449 : lsn: Lsn,
1450 : }
1451 :
1452 : impl wal_storage::Storage for DummyWalStore {
1453 4 : fn write_lsn(&self) -> Lsn {
1454 4 : self.lsn
1455 4 : }
1456 :
1457 19 : fn flush_lsn(&self) -> Lsn {
1458 19 : self.lsn
1459 19 : }
1460 :
1461 2 : async fn initialize_first_segment(&mut self, _init_lsn: Lsn) -> Result<()> {
1462 2 : Ok(())
1463 2 : }
1464 :
1465 3 : async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
1466 3 : self.lsn = startpos + buf.len() as u64;
1467 3 : Ok(())
1468 3 : }
1469 :
1470 2 : async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
1471 2 : self.lsn = end_pos;
1472 2 : Ok(())
1473 2 : }
1474 :
1475 5 : async fn flush_wal(&mut self) -> Result<()> {
1476 5 : Ok(())
1477 5 : }
1478 :
1479 0 : fn remove_up_to(&self, _segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> {
1480 0 : Box::pin(async { Ok(()) })
1481 0 : }
1482 :
1483 0 : fn get_metrics(&self) -> crate::metrics::WalStorageMetrics {
1484 0 : crate::metrics::WalStorageMetrics::default()
1485 0 : }
1486 : }
1487 :
1488 : #[tokio::test]
1489 1 : async fn test_voting() {
1490 1 : let storage = InMemoryState {
1491 1 : persisted_state: test_sk_state(),
1492 1 : };
1493 1 : let wal_store = DummyWalStore { lsn: Lsn(0) };
1494 1 : let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
1495 1 :
1496 1 : // Vote with generation mismatch should be rejected.
1497 1 : let gen_mismatch_vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest {
1498 1 : generation: SafekeeperGeneration::new(42),
1499 1 : term: 1,
1500 1 : });
1501 1 : assert!(sk.process_msg(&gen_mismatch_vote_request).await.is_err());
1502 1 :
1503 1 : // check voting for 1 is ok
1504 1 : let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest {
1505 1 : generation: Generation::new(0),
1506 1 : term: 1,
1507 1 : });
1508 1 : let mut vote_resp = sk.process_msg(&vote_request).await;
1509 1 : match vote_resp.unwrap() {
1510 1 : Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given),
1511 1 : r => panic!("unexpected response: {:?}", r),
1512 1 : }
1513 1 :
1514 1 : // reboot...
1515 1 : let state = sk.state.deref().clone();
1516 1 : let storage = InMemoryState {
1517 1 : persisted_state: state,
1518 1 : };
1519 1 :
1520 1 : sk = SafeKeeper::new(TimelineState::new(storage), sk.wal_store, NodeId(0)).unwrap();
1521 1 :
1522 1 : // and ensure voting second time for 1 is not ok
1523 1 : vote_resp = sk.process_msg(&vote_request).await;
1524 1 : match vote_resp.unwrap() {
1525 1 : Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(!resp.vote_given),
1526 1 : r => panic!("unexpected response: {:?}", r),
1527 1 : }
1528 1 : }
1529 :
1530 : #[tokio::test]
1531 1 : async fn test_last_log_term_switch() {
1532 1 : let storage = InMemoryState {
1533 1 : persisted_state: test_sk_state(),
1534 1 : };
1535 1 : let wal_store = DummyWalStore { lsn: Lsn(0) };
1536 1 :
1537 1 : let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
1538 1 :
1539 1 : let mut ar_hdr = AppendRequestHeader {
1540 1 : generation: Generation::new(0),
1541 1 : term: 2,
1542 1 : begin_lsn: Lsn(1),
1543 1 : end_lsn: Lsn(2),
1544 1 : commit_lsn: Lsn(0),
1545 1 : truncate_lsn: Lsn(0),
1546 1 : };
1547 1 : let mut append_request = AppendRequest {
1548 1 : h: ar_hdr.clone(),
1549 1 : wal_data: Bytes::from_static(b"b"),
1550 1 : };
1551 1 :
1552 1 : let pem = ProposerElected {
1553 1 : generation: Generation::new(0),
1554 1 : term: 2,
1555 1 : start_streaming_at: Lsn(1),
1556 1 : term_history: TermHistory(vec![
1557 1 : TermLsn {
1558 1 : term: 1,
1559 1 : lsn: Lsn(1),
1560 1 : },
1561 1 : TermLsn {
1562 1 : term: 2,
1563 1 : lsn: Lsn(3),
1564 1 : },
1565 1 : ]),
1566 1 : };
1567 1 :
1568 1 : // check that elected msg with generation mismatch is rejected
1569 1 : let mut pem_gen_mismatch = pem.clone();
1570 1 : pem_gen_mismatch.generation = SafekeeperGeneration::new(42);
1571 1 : assert!(
1572 1 : sk.process_msg(&ProposerAcceptorMessage::Elected(pem_gen_mismatch))
1573 1 : .await
1574 1 : .is_err()
1575 1 : );
1576 1 :
1577 1 : sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
1578 1 : .await
1579 1 : .unwrap();
1580 1 :
1581 1 : // check that AppendRequest before term_start_lsn doesn't switch last_log_term.
1582 1 : sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
1583 1 : .await
1584 1 : .unwrap();
1585 1 : assert_eq!(sk.get_last_log_term(), 1);
1586 1 :
1587 1 : // but record at term_start_lsn does the switch
1588 1 : ar_hdr.begin_lsn = Lsn(2);
1589 1 : ar_hdr.end_lsn = Lsn(3);
1590 1 : append_request = AppendRequest {
1591 1 : h: ar_hdr,
1592 1 : wal_data: Bytes::from_static(b"b"),
1593 1 : };
1594 1 : sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
1595 1 : .await
1596 1 : .unwrap();
1597 1 : assert_eq!(sk.get_last_log_term(), 2);
1598 1 : }
1599 :
1600 : #[tokio::test]
1601 1 : async fn test_non_consecutive_write() {
1602 1 : let storage = InMemoryState {
1603 1 : persisted_state: test_sk_state(),
1604 1 : };
1605 1 : let wal_store = DummyWalStore { lsn: Lsn(0) };
1606 1 :
1607 1 : let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
1608 1 :
1609 1 : let pem = ProposerElected {
1610 1 : generation: Generation::new(0),
1611 1 : term: 1,
1612 1 : start_streaming_at: Lsn(1),
1613 1 : term_history: TermHistory(vec![TermLsn {
1614 1 : term: 1,
1615 1 : lsn: Lsn(1),
1616 1 : }]),
1617 1 : };
1618 1 : sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
1619 1 : .await
1620 1 : .unwrap();
1621 1 :
1622 1 : let ar_hdr = AppendRequestHeader {
1623 1 : generation: Generation::new(0),
1624 1 : term: 1,
1625 1 : begin_lsn: Lsn(1),
1626 1 : end_lsn: Lsn(2),
1627 1 : commit_lsn: Lsn(0),
1628 1 : truncate_lsn: Lsn(0),
1629 1 : };
1630 1 : let append_request = AppendRequest {
1631 1 : h: ar_hdr.clone(),
1632 1 : wal_data: Bytes::from_static(b"b"),
1633 1 : };
1634 1 :
1635 1 : // check that append request with generation mismatch is rejected
1636 1 : let mut ar_hdr_gen_mismatch = ar_hdr.clone();
1637 1 : ar_hdr_gen_mismatch.generation = SafekeeperGeneration::new(42);
1638 1 : let append_request_gen_mismatch = AppendRequest {
1639 1 : h: ar_hdr_gen_mismatch,
1640 1 : wal_data: Bytes::from_static(b"b"),
1641 1 : };
1642 1 : assert!(
1643 1 : sk.process_msg(&ProposerAcceptorMessage::AppendRequest(
1644 1 : append_request_gen_mismatch
1645 1 : ))
1646 1 : .await
1647 1 : .is_err()
1648 1 : );
1649 1 :
1650 1 : // do write ending at 2, it should be ok
1651 1 : sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
1652 1 : .await
1653 1 : .unwrap();
1654 1 : let mut ar_hrd2 = ar_hdr.clone();
1655 1 : ar_hrd2.begin_lsn = Lsn(4);
1656 1 : ar_hrd2.end_lsn = Lsn(5);
1657 1 : let append_request = AppendRequest {
1658 1 : h: ar_hdr,
1659 1 : wal_data: Bytes::from_static(b"b"),
1660 1 : };
1661 1 : // and now starting at 4, it must fail
1662 1 : sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
1663 1 : .await
1664 1 : .unwrap_err();
1665 1 : }
1666 :
// Histories that share no term have no common point.
#[test]
fn test_find_highest_common_point_none() {
    let prop_th = TermHistory(vec![TermLsn::from((0, Lsn(1)))]);
    let sk_th = TermHistory(vec![
        TermLsn::from((1, Lsn(1))),
        TermLsn::from((2, Lsn(2))),
    ]);

    let common_point = TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(3));
    assert_eq!(common_point, None);
}
1676 :
// Histories diverge after term 2: the common point is where the
// safekeeper ended that term, i.e. the start of its next term.
#[test]
fn test_find_highest_common_point_middle() {
    let prop_th = TermHistory(vec![
        TermLsn::from((1, Lsn(10))),
        TermLsn::from((2, Lsn(20))),
        TermLsn::from((4, Lsn(40))),
    ]);
    let sk_th = TermHistory(vec![
        TermLsn::from((1, Lsn(10))),
        TermLsn::from((2, Lsn(20))),
        TermLsn::from((3, Lsn(30))), // sk ends last common term 2 at 30
    ]);

    let common_point = TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(40));
    let expected = TermLsn::from((2, Lsn(30)));
    assert_eq!(common_point, Some(expected));
}
1697 :
// The safekeeper history stops at the last common term: the common point
// is then the end of the safekeeper's WAL.
#[test]
fn test_find_highest_common_point_sk_end() {
    let prop_th = TermHistory(vec![
        TermLsn::from((1, Lsn(10))),
        TermLsn::from((2, Lsn(20))), // last common term 2, sk will end it at 32 sk_end_lsn
        TermLsn::from((4, Lsn(40))),
    ]);
    let sk_th = TermHistory(vec![
        TermLsn::from((1, Lsn(10))),
        TermLsn::from((2, Lsn(20))),
    ]);

    let common_point = TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(32));
    let expected = TermLsn::from((2, Lsn(32)));
    assert_eq!(common_point, Some(expected));
}
1714 :
// Identical histories: the common point is the last term at the end of
// the safekeeper's WAL.
#[test]
fn test_find_highest_common_point_walprop() {
    let prop_th = TermHistory(vec![
        TermLsn::from((1, Lsn(10))),
        TermLsn::from((2, Lsn(20))),
    ]);
    let sk_th = TermHistory(vec![
        TermLsn::from((1, Lsn(10))),
        TermLsn::from((2, Lsn(20))),
    ]);

    let common_point = TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(32));
    let expected = TermLsn::from((2, Lsn(32)));
    assert_eq!(common_point, Some(expected));
}
1727 :
// A fully populated persistent state must survive a `ser()` / `des()`
// round trip unchanged.
#[test]
fn test_sk_state_bincode_serde_roundtrip() {
    let tenant_id = TenantId::from_str("cf0480929707ee75372337efaa5ecf96").unwrap();
    let timeline_id = TimelineId::from_str("112ded66422aa5e953e5440fa5427ac4").unwrap();

    // Any distinctive byte pattern will do: the reversed timeline id.
    let mut proposer_uuid = timeline_id.as_arr();
    proposer_uuid.reverse();

    let members = MemberSet::new(vec![SafekeeperId {
        id: NodeId(1),
        host: "hehe.org".to_owned(),
        pg_port: 5432,
    }])
    .expect("duplicate member");
    let mconf = Configuration {
        generation: SafekeeperGeneration::new(42),
        members,
        new_members: None,
    };

    let acceptor_state = AcceptorState {
        term: 42,
        term_history: TermHistory(vec![TermLsn {
            lsn: Lsn(0x1),
            term: 41,
        }]),
    };

    let server = ServerInfo {
        pg_version: 14,
        system_id: 0x1234567887654321,
        wal_seg_size: 0x12345678,
    };

    let original = TimelinePersistentState {
        tenant_id,
        timeline_id,
        mconf,
        acceptor_state,
        server,
        proposer_uuid,
        timeline_start_lsn: Lsn(0x12345600),
        local_start_lsn: Lsn(0x12),
        commit_lsn: Lsn(1234567800),
        backup_lsn: Lsn(1234567300),
        peer_horizon_lsn: Lsn(9999999),
        remote_consistent_lsn: Lsn(1234560000),
        partial_backup: crate::wal_backup_partial::State::default(),
        eviction_state: EvictionState::Present,
        creation_ts: UNIX_EPOCH,
    };

    let encoded = original.ser().unwrap();
    let decoded = TimelinePersistentState::des(&encoded).unwrap();

    assert_eq!(decoded, original);
}
1779 : }
|