LCOV - code coverage report
Current view: top level - safekeeper/src - control_file_upgrade.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 0.0 % 131 0
Test Date: 2023-09-06 10:18:01 Functions: 0.0 % 110 0

            Line data    Source code
       1              : //! Code to deal with safekeeper control file upgrades
       2              : use crate::safekeeper::{
       3              :     AcceptorState, PersistedPeers, PgUuid, SafeKeeperState, ServerInfo, Term, TermHistory, TermLsn,
       4              : };
       5              : use anyhow::{bail, Result};
       6              : use pq_proto::SystemId;
       7              : use serde::{Deserialize, Serialize};
       8              : use tracing::*;
       9              : use utils::{
      10              :     bin_ser::LeSer,
      11              :     id::{TenantId, TimelineId},
      12              :     lsn::Lsn,
      13              : };
      14              : 
      15              : /// Persistent consensus state of the acceptor.
      16            0 : #[derive(Debug, Clone, Serialize, Deserialize)]
      17              : struct AcceptorStateV1 {
      18              :     /// acceptor's last term it voted for (advanced in 1 phase)
      19              :     term: Term,
      20              :     /// acceptor's epoch (advanced, i.e. bumped to 'term' when VCL is reached).
      21              :     epoch: Term,
      22              : }
      23              : 
      24            0 : #[derive(Debug, Clone, Serialize, Deserialize)]
      25              : struct SafeKeeperStateV1 {
      26              :     /// persistent acceptor state
      27              :     acceptor_state: AcceptorStateV1,
      28              :     /// information about server
      29              :     server: ServerInfoV2,
      30              :     /// Unique id of the last *elected* proposer we dealt with. Not needed
      31              :     /// for correctness, exists for monitoring purposes.
      32              :     proposer_uuid: PgUuid,
      33              :     /// part of WAL acknowledged by quorum and available locally
      34              :     commit_lsn: Lsn,
      35              :     /// minimal LSN which may be needed for recovery of some safekeeper (end_lsn
      36              :     /// of last record streamed to everyone)
      37              :     truncate_lsn: Lsn,
      38              :     // Safekeeper starts receiving WAL from this LSN, zeros before it ought to
      39              :     // be skipped during decoding.
      40              :     wal_start_lsn: Lsn,
      41              : }
      42              : 
      43            0 : #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
      44              : pub struct ServerInfoV2 {
      45              :     /// Postgres server version
      46              :     pub pg_version: u32,
      47              :     pub system_id: SystemId,
      48              :     pub tenant_id: TenantId,
      49              :     pub timeline_id: TimelineId,
      50              :     pub wal_seg_size: u32,
      51              : }
      52              : 
      53            0 : #[derive(Debug, Clone, Serialize, Deserialize)]
      54              : pub struct SafeKeeperStateV2 {
      55              :     /// persistent acceptor state
      56              :     pub acceptor_state: AcceptorState,
      57              :     /// information about server
      58              :     pub server: ServerInfoV2,
      59              :     /// Unique id of the last *elected* proposer we dealt with. Not needed
      60              :     /// for correctness, exists for monitoring purposes.
      61              :     pub proposer_uuid: PgUuid,
      62              :     /// part of WAL acknowledged by quorum and available locally
      63              :     pub commit_lsn: Lsn,
      64              :     /// minimal LSN which may be needed for recovery of some safekeeper (end_lsn
      65              :     /// of last record streamed to everyone)
      66              :     pub truncate_lsn: Lsn,
      67              :     // Safekeeper starts receiving WAL from this LSN, zeros before it ought to
      68              :     // be skipped during decoding.
      69              :     pub wal_start_lsn: Lsn,
      70              : }
      71              : 
      72            0 : #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
      73              : pub struct ServerInfoV3 {
      74              :     /// Postgres server version
      75              :     pub pg_version: u32,
      76              :     pub system_id: SystemId,
      77              :     #[serde(with = "hex")]
      78              :     pub tenant_id: TenantId,
      79              :     #[serde(with = "hex")]
      80              :     pub timeline_id: TimelineId,
      81              :     pub wal_seg_size: u32,
      82              : }
      83              : 
      84            0 : #[derive(Debug, Clone, Serialize, Deserialize)]
      85              : pub struct SafeKeeperStateV3 {
      86              :     /// persistent acceptor state
      87              :     pub acceptor_state: AcceptorState,
      88              :     /// information about server
      89              :     pub server: ServerInfoV3,
      90              :     /// Unique id of the last *elected* proposer we dealt with. Not needed
      91              :     /// for correctness, exists for monitoring purposes.
      92              :     #[serde(with = "hex")]
      93              :     pub proposer_uuid: PgUuid,
      94              :     /// part of WAL acknowledged by quorum and available locally
      95              :     pub commit_lsn: Lsn,
      96              :     /// minimal LSN which may be needed for recovery of some safekeeper (end_lsn
      97              :     /// of last record streamed to everyone)
      98              :     pub truncate_lsn: Lsn,
      99              :     // Safekeeper starts receiving WAL from this LSN, zeros before it ought to
     100              :     // be skipped during decoding.
     101              :     pub wal_start_lsn: Lsn,
     102              : }
     103              : 
     104            0 : #[derive(Debug, Clone, Serialize, Deserialize)]
     105              : pub struct SafeKeeperStateV4 {
     106              :     #[serde(with = "hex")]
     107              :     pub tenant_id: TenantId,
     108              :     #[serde(with = "hex")]
     109              :     pub timeline_id: TimelineId,
     110              :     /// persistent acceptor state
     111              :     pub acceptor_state: AcceptorState,
     112              :     /// information about server
     113              :     pub server: ServerInfo,
     114              :     /// Unique id of the last *elected* proposer we dealt with. Not needed
     115              :     /// for correctness, exists for monitoring purposes.
     116              :     #[serde(with = "hex")]
     117              :     pub proposer_uuid: PgUuid,
     118              :     /// Part of WAL acknowledged by quorum and available locally. Always points
     119              :     /// to record boundary.
     120              :     pub commit_lsn: Lsn,
     121              :     /// First LSN not yet offloaded to s3. Useful to persist to avoid finding
     122              :     /// out offloading progress on boot.
     123              :     pub s3_wal_lsn: Lsn,
     124              :     /// Minimal LSN which may be needed for recovery of some safekeeper (end_lsn
     125              :     /// of last record streamed to everyone). Persisting it helps skipping
     126              :     /// recovery in walproposer, generally we compute it from peers. In
     127              :     /// walproposer proto called 'truncate_lsn'.
     128              :     pub peer_horizon_lsn: Lsn,
     129              :     /// LSN of the oldest known checkpoint made by pageserver and successfully
     130              :     /// pushed to s3. We don't remove WAL beyond it. Persisted only for
     131              :     /// informational purposes, we receive it from pageserver (or broker).
     132              :     pub remote_consistent_lsn: Lsn,
     133              :     // Peers and their state as we remember it. Knowing peers themselves is
     134              :     // fundamental; but state is saved here only for informational purposes and
     135              :     // obviously can be stale. (Currently not saved at all, but let's provision
     136              :     // place to have less file version upgrades).
     137              :     pub peers: PersistedPeers,
     138              : }
     139              : 
     140            0 : pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState> {
     141            0 :     // migrate to storing full term history
     142            0 :     if version == 1 {
     143            0 :         info!("reading safekeeper control file version {}", version);
     144            0 :         let oldstate = SafeKeeperStateV1::des(&buf[..buf.len()])?;
     145            0 :         let ac = AcceptorState {
     146            0 :             term: oldstate.acceptor_state.term,
     147            0 :             term_history: TermHistory(vec![TermLsn {
     148            0 :                 term: oldstate.acceptor_state.epoch,
     149            0 :                 lsn: Lsn(0),
     150            0 :             }]),
     151            0 :         };
     152            0 :         return Ok(SafeKeeperState {
     153            0 :             tenant_id: oldstate.server.tenant_id,
     154            0 :             timeline_id: oldstate.server.timeline_id,
     155            0 :             acceptor_state: ac,
     156            0 :             server: ServerInfo {
     157            0 :                 pg_version: oldstate.server.pg_version,
     158            0 :                 system_id: oldstate.server.system_id,
     159            0 :                 wal_seg_size: oldstate.server.wal_seg_size,
     160            0 :             },
     161            0 :             proposer_uuid: oldstate.proposer_uuid,
     162            0 :             timeline_start_lsn: Lsn(0),
     163            0 :             local_start_lsn: Lsn(0),
     164            0 :             commit_lsn: oldstate.commit_lsn,
     165            0 :             backup_lsn: Lsn(0),
     166            0 :             peer_horizon_lsn: oldstate.truncate_lsn,
     167            0 :             remote_consistent_lsn: Lsn(0),
     168            0 :             peers: PersistedPeers(vec![]),
     169            0 :         });
     170              :     // migrate to hexing some ids
     171            0 :     } else if version == 2 {
     172            0 :         info!("reading safekeeper control file version {}", version);
     173            0 :         let oldstate = SafeKeeperStateV2::des(&buf[..buf.len()])?;
     174            0 :         let server = ServerInfo {
     175            0 :             pg_version: oldstate.server.pg_version,
     176            0 :             system_id: oldstate.server.system_id,
     177            0 :             wal_seg_size: oldstate.server.wal_seg_size,
     178            0 :         };
     179            0 :         return Ok(SafeKeeperState {
     180            0 :             tenant_id: oldstate.server.tenant_id,
     181            0 :             timeline_id: oldstate.server.timeline_id,
     182            0 :             acceptor_state: oldstate.acceptor_state,
     183            0 :             server,
     184            0 :             proposer_uuid: oldstate.proposer_uuid,
     185            0 :             timeline_start_lsn: Lsn(0),
     186            0 :             local_start_lsn: Lsn(0),
     187            0 :             commit_lsn: oldstate.commit_lsn,
     188            0 :             backup_lsn: Lsn(0),
     189            0 :             peer_horizon_lsn: oldstate.truncate_lsn,
     190            0 :             remote_consistent_lsn: Lsn(0),
     191            0 :             peers: PersistedPeers(vec![]),
     192            0 :         });
     193              :     // migrate to moving tenant_id/timeline_id to the top and adding some lsns
     194            0 :     } else if version == 3 {
     195            0 :         info!("reading safekeeper control file version {version}");
     196            0 :         let oldstate = SafeKeeperStateV3::des(&buf[..buf.len()])?;
     197            0 :         let server = ServerInfo {
     198            0 :             pg_version: oldstate.server.pg_version,
     199            0 :             system_id: oldstate.server.system_id,
     200            0 :             wal_seg_size: oldstate.server.wal_seg_size,
     201            0 :         };
     202            0 :         return Ok(SafeKeeperState {
     203            0 :             tenant_id: oldstate.server.tenant_id,
     204            0 :             timeline_id: oldstate.server.timeline_id,
     205            0 :             acceptor_state: oldstate.acceptor_state,
     206            0 :             server,
     207            0 :             proposer_uuid: oldstate.proposer_uuid,
     208            0 :             timeline_start_lsn: Lsn(0),
     209            0 :             local_start_lsn: Lsn(0),
     210            0 :             commit_lsn: oldstate.commit_lsn,
     211            0 :             backup_lsn: Lsn(0),
     212            0 :             peer_horizon_lsn: oldstate.truncate_lsn,
     213            0 :             remote_consistent_lsn: Lsn(0),
     214            0 :             peers: PersistedPeers(vec![]),
     215            0 :         });
     216              :     // migrate to having timeline_start_lsn
     217            0 :     } else if version == 4 {
     218            0 :         info!("reading safekeeper control file version {}", version);
     219            0 :         let oldstate = SafeKeeperStateV4::des(&buf[..buf.len()])?;
     220            0 :         let server = ServerInfo {
     221            0 :             pg_version: oldstate.server.pg_version,
     222            0 :             system_id: oldstate.server.system_id,
     223            0 :             wal_seg_size: oldstate.server.wal_seg_size,
     224            0 :         };
     225            0 :         return Ok(SafeKeeperState {
     226            0 :             tenant_id: oldstate.tenant_id,
     227            0 :             timeline_id: oldstate.timeline_id,
     228            0 :             acceptor_state: oldstate.acceptor_state,
     229            0 :             server,
     230            0 :             proposer_uuid: oldstate.proposer_uuid,
     231            0 :             timeline_start_lsn: Lsn(0),
     232            0 :             local_start_lsn: Lsn(0),
     233            0 :             commit_lsn: oldstate.commit_lsn,
     234            0 :             backup_lsn: Lsn::INVALID,
     235            0 :             peer_horizon_lsn: oldstate.peer_horizon_lsn,
     236            0 :             remote_consistent_lsn: Lsn(0),
     237            0 :             peers: PersistedPeers(vec![]),
     238            0 :         });
     239            0 :     } else if version == 5 {
     240            0 :         info!("reading safekeeper control file version {}", version);
     241            0 :         let mut oldstate = SafeKeeperState::des(&buf[..buf.len()])?;
     242            0 :         if oldstate.timeline_start_lsn != Lsn(0) {
     243            0 :             return Ok(oldstate);
     244            0 :         }
     245            0 : 
     246            0 :         // set special timeline_start_lsn because we don't know the real one
     247            0 :         info!("setting timeline_start_lsn and local_start_lsn to Lsn(1)");
     248            0 :         oldstate.timeline_start_lsn = Lsn(1);
     249            0 :         oldstate.local_start_lsn = Lsn(1);
     250            0 : 
     251            0 :         return Ok(oldstate);
     252            0 :     } else if version == 6 {
     253            0 :         info!("reading safekeeper control file version {}", version);
     254            0 :         let mut oldstate = SafeKeeperState::des(&buf[..buf.len()])?;
     255            0 :         if oldstate.server.pg_version != 0 {
     256            0 :             return Ok(oldstate);
     257            0 :         }
     258            0 : 
     259            0 :         // set pg_version to the default v14
     260            0 :         info!("setting pg_version to 140005");
     261            0 :         oldstate.server.pg_version = 140005;
     262            0 : 
     263            0 :         return Ok(oldstate);
     264            0 :     }
     265            0 :     bail!("unsupported safekeeper control file version {}", version)
     266            0 : }
        

Generated by: LCOV version 2.1-beta