LCOV - differential code coverage report
Current view: top level - safekeeper/src - control_file_upgrade.rs (source / functions) Coverage Total Hit UBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 0.0 % 131 0 131
Current Date: 2023-10-19 02:04:12 Functions: 0.0 % 110 0 110
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : //! Code to deal with safekeeper control file upgrades
       2                 : use crate::safekeeper::{
       3                 :     AcceptorState, PersistedPeers, PgUuid, SafeKeeperState, ServerInfo, Term, TermHistory, TermLsn,
       4                 : };
       5                 : use anyhow::{bail, Result};
       6                 : use pq_proto::SystemId;
       7                 : use serde::{Deserialize, Serialize};
       8                 : use tracing::*;
       9                 : use utils::{
      10                 :     bin_ser::LeSer,
      11                 :     id::{TenantId, TimelineId},
      12                 :     lsn::Lsn,
      13                 : };
      14                 : 
      15                 : /// Persistent consensus state of the acceptor.
      16 UBC           0 : #[derive(Debug, Clone, Serialize, Deserialize)]
      17                 : struct AcceptorStateV1 {
      18                 :     /// acceptor's last term it voted for (advanced in 1 phase)
      19                 :     term: Term,
      20                 :     /// acceptor's epoch (advanced, i.e. bumped to 'term' when VCL is reached).
      21                 :     epoch: Term,
      22                 : }
      23                 : 
      24               0 : #[derive(Debug, Clone, Serialize, Deserialize)]
      25                 : struct SafeKeeperStateV1 {
      26                 :     /// persistent acceptor state
      27                 :     acceptor_state: AcceptorStateV1,
      28                 :     /// information about server
      29                 :     server: ServerInfoV2,
      30                 :     /// Unique id of the last *elected* proposer we dealt with. Not needed
      31                 :     /// for correctness, exists for monitoring purposes.
      32                 :     proposer_uuid: PgUuid,
      33                 :     /// part of WAL acknowledged by quorum and available locally
      34                 :     commit_lsn: Lsn,
      35                 :     /// minimal LSN which may be needed for recovery of some safekeeper (end_lsn
      36                 :     /// of last record streamed to everyone)
      37                 :     truncate_lsn: Lsn,
      38                 :     // Safekeeper starts receiving WAL from this LSN, zeros before it ought to
      39                 :     // be skipped during decoding.
      40                 :     wal_start_lsn: Lsn,
      41                 : }
      42                 : 
      43               0 : #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
      44                 : pub struct ServerInfoV2 {
      45                 :     /// Postgres server version
      46                 :     pub pg_version: u32,
      47                 :     pub system_id: SystemId,
      48                 :     pub tenant_id: TenantId,
      49                 :     pub timeline_id: TimelineId,
      50                 :     pub wal_seg_size: u32,
      51                 : }
      52                 : 
      53               0 : #[derive(Debug, Clone, Serialize, Deserialize)]
      54                 : pub struct SafeKeeperStateV2 {
      55                 :     /// persistent acceptor state
      56                 :     pub acceptor_state: AcceptorState,
      57                 :     /// information about server
      58                 :     pub server: ServerInfoV2,
      59                 :     /// Unique id of the last *elected* proposer we dealt with. Not needed
      60                 :     /// for correctness, exists for monitoring purposes.
      61                 :     pub proposer_uuid: PgUuid,
      62                 :     /// part of WAL acknowledged by quorum and available locally
      63                 :     pub commit_lsn: Lsn,
      64                 :     /// minimal LSN which may be needed for recovery of some safekeeper (end_lsn
      65                 :     /// of last record streamed to everyone)
      66                 :     pub truncate_lsn: Lsn,
      67                 :     // Safekeeper starts receiving WAL from this LSN, zeros before it ought to
      68                 :     // be skipped during decoding.
      69                 :     pub wal_start_lsn: Lsn,
      70                 : }
      71                 : 
      72               0 : #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
      73                 : pub struct ServerInfoV3 {
      74                 :     /// Postgres server version
      75                 :     pub pg_version: u32,
      76                 :     pub system_id: SystemId,
      77                 :     #[serde(with = "hex")]
      78                 :     pub tenant_id: TenantId,
      79                 :     #[serde(with = "hex")]
      80                 :     pub timeline_id: TimelineId,
      81                 :     pub wal_seg_size: u32,
      82                 : }
      83                 : 
      84               0 : #[derive(Debug, Clone, Serialize, Deserialize)]
      85                 : pub struct SafeKeeperStateV3 {
      86                 :     /// persistent acceptor state
      87                 :     pub acceptor_state: AcceptorState,
      88                 :     /// information about server
      89                 :     pub server: ServerInfoV3,
      90                 :     /// Unique id of the last *elected* proposer we dealt with. Not needed
      91                 :     /// for correctness, exists for monitoring purposes.
      92                 :     #[serde(with = "hex")]
      93                 :     pub proposer_uuid: PgUuid,
      94                 :     /// part of WAL acknowledged by quorum and available locally
      95                 :     pub commit_lsn: Lsn,
      96                 :     /// minimal LSN which may be needed for recovery of some safekeeper (end_lsn
      97                 :     /// of last record streamed to everyone)
      98                 :     pub truncate_lsn: Lsn,
      99                 :     // Safekeeper starts receiving WAL from this LSN, zeros before it ought to
     100                 :     // be skipped during decoding.
     101                 :     pub wal_start_lsn: Lsn,
     102                 : }
     103                 : 
     104               0 : #[derive(Debug, Clone, Serialize, Deserialize)]
     105                 : pub struct SafeKeeperStateV4 {
     106                 :     #[serde(with = "hex")]
     107                 :     pub tenant_id: TenantId,
     108                 :     #[serde(with = "hex")]
     109                 :     pub timeline_id: TimelineId,
     110                 :     /// persistent acceptor state
     111                 :     pub acceptor_state: AcceptorState,
     112                 :     /// information about server
     113                 :     pub server: ServerInfo,
     114                 :     /// Unique id of the last *elected* proposer we dealt with. Not needed
     115                 :     /// for correctness, exists for monitoring purposes.
     116                 :     #[serde(with = "hex")]
     117                 :     pub proposer_uuid: PgUuid,
     118                 :     /// Part of WAL acknowledged by quorum and available locally. Always points
     119                 :     /// to record boundary.
     120                 :     pub commit_lsn: Lsn,
     121                 :     /// First LSN not yet offloaded to s3. Useful to persist to avoid finding
     122                 :     /// out offloading progress on boot.
     123                 :     pub s3_wal_lsn: Lsn,
     124                 :     /// Minimal LSN which may be needed for recovery of some safekeeper (end_lsn
     125                 :     /// of last record streamed to everyone). Persisting it helps skipping
     126                 :     /// recovery in walproposer, generally we compute it from peers. In
     127                 :     /// walproposer proto called 'truncate_lsn'.
     128                 :     pub peer_horizon_lsn: Lsn,
     129                 :     /// LSN of the oldest known checkpoint made by pageserver and successfully
     130                 :     /// pushed to s3. We don't remove WAL beyond it. Persisted only for
     131                 :     /// informational purposes, we receive it from pageserver (or broker).
     132                 :     pub remote_consistent_lsn: Lsn,
     133                 :     // Peers and their state as we remember it. Knowing peers themselves is
     134                 :     // fundamental; but state is saved here only for informational purposes and
     135                 :     // obviously can be stale. (Currently not saved at all, but let's provision
     136                 :     // place to have less file version upgrades).
     137                 :     pub peers: PersistedPeers,
     138                 : }
     139                 : 
     140               0 : pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState> {
     141               0 :     // migrate to storing full term history
     142               0 :     if version == 1 {
     143               0 :         info!("reading safekeeper control file version {}", version);
     144               0 :         let oldstate = SafeKeeperStateV1::des(&buf[..buf.len()])?;
     145               0 :         let ac = AcceptorState {
     146               0 :             term: oldstate.acceptor_state.term,
     147               0 :             term_history: TermHistory(vec![TermLsn {
     148               0 :                 term: oldstate.acceptor_state.epoch,
     149               0 :                 lsn: Lsn(0),
     150               0 :             }]),
     151               0 :         };
     152               0 :         return Ok(SafeKeeperState {
     153               0 :             tenant_id: oldstate.server.tenant_id,
     154               0 :             timeline_id: oldstate.server.timeline_id,
     155               0 :             acceptor_state: ac,
     156               0 :             server: ServerInfo {
     157               0 :                 pg_version: oldstate.server.pg_version,
     158               0 :                 system_id: oldstate.server.system_id,
     159               0 :                 wal_seg_size: oldstate.server.wal_seg_size,
     160               0 :             },
     161               0 :             proposer_uuid: oldstate.proposer_uuid,
     162               0 :             timeline_start_lsn: Lsn(0),
     163               0 :             local_start_lsn: Lsn(0),
     164               0 :             commit_lsn: oldstate.commit_lsn,
     165               0 :             backup_lsn: Lsn(0),
     166               0 :             peer_horizon_lsn: oldstate.truncate_lsn,
     167               0 :             remote_consistent_lsn: Lsn(0),
     168               0 :             peers: PersistedPeers(vec![]),
     169               0 :         });
     170                 :     // migrate to hexing some ids
     171               0 :     } else if version == 2 {
     172               0 :         info!("reading safekeeper control file version {}", version);
     173               0 :         let oldstate = SafeKeeperStateV2::des(&buf[..buf.len()])?;
     174               0 :         let server = ServerInfo {
     175               0 :             pg_version: oldstate.server.pg_version,
     176               0 :             system_id: oldstate.server.system_id,
     177               0 :             wal_seg_size: oldstate.server.wal_seg_size,
     178               0 :         };
     179               0 :         return Ok(SafeKeeperState {
     180               0 :             tenant_id: oldstate.server.tenant_id,
     181               0 :             timeline_id: oldstate.server.timeline_id,
     182               0 :             acceptor_state: oldstate.acceptor_state,
     183               0 :             server,
     184               0 :             proposer_uuid: oldstate.proposer_uuid,
     185               0 :             timeline_start_lsn: Lsn(0),
     186               0 :             local_start_lsn: Lsn(0),
     187               0 :             commit_lsn: oldstate.commit_lsn,
     188               0 :             backup_lsn: Lsn(0),
     189               0 :             peer_horizon_lsn: oldstate.truncate_lsn,
     190               0 :             remote_consistent_lsn: Lsn(0),
     191               0 :             peers: PersistedPeers(vec![]),
     192               0 :         });
     193                 :     // migrate to moving tenant_id/timeline_id to the top and adding some lsns
     194               0 :     } else if version == 3 {
     195               0 :         info!("reading safekeeper control file version {version}");
     196               0 :         let oldstate = SafeKeeperStateV3::des(&buf[..buf.len()])?;
     197               0 :         let server = ServerInfo {
     198               0 :             pg_version: oldstate.server.pg_version,
     199               0 :             system_id: oldstate.server.system_id,
     200               0 :             wal_seg_size: oldstate.server.wal_seg_size,
     201               0 :         };
     202               0 :         return Ok(SafeKeeperState {
     203               0 :             tenant_id: oldstate.server.tenant_id,
     204               0 :             timeline_id: oldstate.server.timeline_id,
     205               0 :             acceptor_state: oldstate.acceptor_state,
     206               0 :             server,
     207               0 :             proposer_uuid: oldstate.proposer_uuid,
     208               0 :             timeline_start_lsn: Lsn(0),
     209               0 :             local_start_lsn: Lsn(0),
     210               0 :             commit_lsn: oldstate.commit_lsn,
     211               0 :             backup_lsn: Lsn(0),
     212               0 :             peer_horizon_lsn: oldstate.truncate_lsn,
     213               0 :             remote_consistent_lsn: Lsn(0),
     214               0 :             peers: PersistedPeers(vec![]),
     215               0 :         });
     216                 :     // migrate to having timeline_start_lsn
     217               0 :     } else if version == 4 {
     218               0 :         info!("reading safekeeper control file version {}", version);
     219               0 :         let oldstate = SafeKeeperStateV4::des(&buf[..buf.len()])?;
     220               0 :         let server = ServerInfo {
     221               0 :             pg_version: oldstate.server.pg_version,
     222               0 :             system_id: oldstate.server.system_id,
     223               0 :             wal_seg_size: oldstate.server.wal_seg_size,
     224               0 :         };
     225               0 :         return Ok(SafeKeeperState {
     226               0 :             tenant_id: oldstate.tenant_id,
     227               0 :             timeline_id: oldstate.timeline_id,
     228               0 :             acceptor_state: oldstate.acceptor_state,
     229               0 :             server,
     230               0 :             proposer_uuid: oldstate.proposer_uuid,
     231               0 :             timeline_start_lsn: Lsn(0),
     232               0 :             local_start_lsn: Lsn(0),
     233               0 :             commit_lsn: oldstate.commit_lsn,
     234               0 :             backup_lsn: Lsn::INVALID,
     235               0 :             peer_horizon_lsn: oldstate.peer_horizon_lsn,
     236               0 :             remote_consistent_lsn: Lsn(0),
     237               0 :             peers: PersistedPeers(vec![]),
     238               0 :         });
     239               0 :     } else if version == 5 {
     240               0 :         info!("reading safekeeper control file version {}", version);
     241               0 :         let mut oldstate = SafeKeeperState::des(&buf[..buf.len()])?;
     242               0 :         if oldstate.timeline_start_lsn != Lsn(0) {
     243               0 :             return Ok(oldstate);
     244               0 :         }
     245               0 : 
     246               0 :         // set special timeline_start_lsn because we don't know the real one
     247               0 :         info!("setting timeline_start_lsn and local_start_lsn to Lsn(1)");
     248               0 :         oldstate.timeline_start_lsn = Lsn(1);
     249               0 :         oldstate.local_start_lsn = Lsn(1);
     250               0 : 
     251               0 :         return Ok(oldstate);
     252               0 :     } else if version == 6 {
     253               0 :         info!("reading safekeeper control file version {}", version);
     254               0 :         let mut oldstate = SafeKeeperState::des(&buf[..buf.len()])?;
     255               0 :         if oldstate.server.pg_version != 0 {
     256               0 :             return Ok(oldstate);
     257               0 :         }
     258               0 : 
     259               0 :         // set pg_version to the default v14
     260               0 :         info!("setting pg_version to 140005");
     261               0 :         oldstate.server.pg_version = 140005;
     262               0 : 
     263               0 :         return Ok(oldstate);
     264               0 :     }
     265               0 :     bail!("unsupported safekeeper control file version {}", version)
     266               0 : }
        

Generated by: LCOV version 2.1-beta