LCOV - differential code coverage report
Current view: top level - safekeeper/src - json_ctrl.rs (source / functions) Coverage Total Hit UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 96.8 % 95 92 3 92
Current Date: 2023-10-19 02:04:12 Functions: 46.9 % 32 15 17 15
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : //!
       2                 : //! This module implements JSON_CTRL protocol, which allows exchange
       3                 : //! JSON messages over psql for testing purposes.
       4                 : //!
       5                 : //! Currently supports AppendLogicalMessage, which is used for WAL
       6                 : //! modifications in tests.
       7                 : //!
       8                 : 
       9                 : use std::sync::Arc;
      10                 : 
      11                 : use anyhow::Context;
      12                 : use bytes::Bytes;
      13                 : use postgres_backend::QueryError;
      14                 : use serde::{Deserialize, Serialize};
      15                 : use tokio::io::{AsyncRead, AsyncWrite};
      16                 : use tracing::*;
      17                 : use utils::id::TenantTimelineId;
      18                 : 
      19                 : use crate::handler::SafekeeperPostgresHandler;
      20                 : use crate::safekeeper::{AcceptorProposerMessage, AppendResponse, ServerInfo};
      21                 : use crate::safekeeper::{
      22                 :     AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, ProposerElected,
      23                 : };
      24                 : use crate::safekeeper::{SafeKeeperState, Term, TermHistory, TermLsn};
      25                 : use crate::timeline::Timeline;
      26                 : use crate::GlobalTimelines;
      27                 : use postgres_backend::PostgresBackend;
      28                 : use postgres_ffi::encode_logical_message;
      29                 : use postgres_ffi::WAL_SEGMENT_SIZE;
      30                 : use pq_proto::{BeMessage, RowDescriptor, TEXT_OID};
      31                 : use utils::lsn::Lsn;
      32                 : 
      33 CBC          57 : #[derive(Serialize, Deserialize, Debug)]
      34                 : pub struct AppendLogicalMessage {
      35                 :     // prefix and message to build LogicalMessage
      36                 :     pub lm_prefix: String,
      37                 :     pub lm_message: String,
      38                 : 
      39                 :     // if true, commit_lsn will match flush_lsn after append
      40                 :     pub set_commit_lsn: bool,
      41                 : 
      42                 :     // if true, ProposerElected will be sent before append
      43                 :     pub send_proposer_elected: bool,
      44                 : 
      45                 :     // fields from AppendRequestHeader
      46                 :     pub term: Term,
      47                 :     pub epoch_start_lsn: Lsn,
      48                 :     pub begin_lsn: Lsn,
      49                 :     pub truncate_lsn: Lsn,
      50                 :     pub pg_version: u32,
      51                 : }
      52                 : 
      53               3 : #[derive(Debug, Serialize)]
      54                 : struct AppendResult {
      55                 :     // safekeeper state after append
      56                 :     state: SafeKeeperState,
      57                 :     // info about new record in the WAL
      58                 :     inserted_wal: InsertedWAL,
      59                 : }
      60                 : 
      61                 : /// Handles command to craft logical message WAL record with given
      62                 : /// content, and then append it with specified term and lsn. This
      63                 : /// function is used to test safekeepers in different scenarios.
      64               3 : pub async fn handle_json_ctrl<IO: AsyncRead + AsyncWrite + Unpin>(
      65               3 :     spg: &SafekeeperPostgresHandler,
      66               3 :     pgb: &mut PostgresBackend<IO>,
      67               3 :     append_request: &AppendLogicalMessage,
      68               3 : ) -> Result<(), QueryError> {
      69               3 :     info!("JSON_CTRL request: {append_request:?}");
      70                 : 
      71                 :     // need to init safekeeper state before AppendRequest
      72              17 :     let tli = prepare_safekeeper(spg.ttid, append_request.pg_version).await?;
      73                 : 
      74                 :     // if send_proposer_elected is true, we need to update local history
      75               3 :     if append_request.send_proposer_elected {
      76            9678 :         send_proposer_elected(&tli, append_request.term, append_request.epoch_start_lsn).await?;
      77 UBC           0 :     }
      78                 : 
      79 CBC          18 :     let inserted_wal = append_logical_message(&tli, append_request).await?;
      80               3 :     let response = AppendResult {
      81               3 :         state: tli.get_state().await.1,
      82               3 :         inserted_wal,
      83                 :     };
      84               3 :     let response_data = serde_json::to_vec(&response)
      85               3 :         .with_context(|| format!("Response {response:?} is not a json array"))?;
      86                 : 
      87               3 :     pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor {
      88               3 :         name: b"json",
      89               3 :         typoid: TEXT_OID,
      90               3 :         typlen: -1,
      91               3 :         ..Default::default()
      92               3 :     }]))?
      93               3 :     .write_message_noflush(&BeMessage::DataRow(&[Some(&response_data)]))?
      94               3 :     .write_message_noflush(&BeMessage::CommandComplete(b"JSON_CTRL"))?;
      95               3 :     Ok(())
      96               3 : }
      97                 : 
      98                 : /// Prepare safekeeper to process append requests without crashes,
      99                 : /// by sending ProposerGreeting with default server.wal_seg_size.
     100               3 : async fn prepare_safekeeper(
     101               3 :     ttid: TenantTimelineId,
     102               3 :     pg_version: u32,
     103               3 : ) -> anyhow::Result<Arc<Timeline>> {
     104               3 :     GlobalTimelines::create(
     105               3 :         ttid,
     106               3 :         ServerInfo {
     107               3 :             pg_version,
     108               3 :             wal_seg_size: WAL_SEGMENT_SIZE as u32,
     109               3 :             system_id: 0,
     110               3 :         },
     111               3 :         Lsn::INVALID,
     112               3 :         Lsn::INVALID,
     113               3 :     )
     114              17 :     .await
     115               3 : }
     116                 : 
     117               3 : async fn send_proposer_elected(tli: &Arc<Timeline>, term: Term, lsn: Lsn) -> anyhow::Result<()> {
     118                 :     // add new term to existing history
     119               3 :     let history = tli.get_state().await.1.acceptor_state.term_history;
     120               3 :     let history = history.up_to(lsn.checked_sub(1u64).unwrap());
     121               3 :     let mut history_entries = history.0;
     122               3 :     history_entries.push(TermLsn { term, lsn });
     123               3 :     let history = TermHistory(history_entries);
     124               3 : 
     125               3 :     let proposer_elected_request = ProposerAcceptorMessage::Elected(ProposerElected {
     126               3 :         term,
     127               3 :         start_streaming_at: lsn,
     128               3 :         term_history: history,
     129               3 :         timeline_start_lsn: lsn,
     130               3 :     });
     131               3 : 
     132            9678 :     tli.process_msg(&proposer_elected_request).await?;
     133               3 :     Ok(())
     134               3 : }
     135                 : 
     136               3 : #[derive(Debug, Serialize)]
     137                 : pub struct InsertedWAL {
     138                 :     begin_lsn: Lsn,
     139                 :     pub end_lsn: Lsn,
     140                 :     append_response: AppendResponse,
     141                 : }
     142                 : 
     143                 : /// Extend local WAL with new LogicalMessage record. To do that,
     144                 : /// create AppendRequest with new WAL and pass it to safekeeper.
     145               3 : pub async fn append_logical_message(
     146               3 :     tli: &Arc<Timeline>,
     147               3 :     msg: &AppendLogicalMessage,
     148               3 : ) -> anyhow::Result<InsertedWAL> {
     149               3 :     let wal_data = encode_logical_message(&msg.lm_prefix, &msg.lm_message);
     150               3 :     let sk_state = tli.get_state().await.1;
     151                 : 
     152               3 :     let begin_lsn = msg.begin_lsn;
     153               3 :     let end_lsn = begin_lsn + wal_data.len() as u64;
     154                 : 
     155               3 :     let commit_lsn = if msg.set_commit_lsn {
     156               3 :         end_lsn
     157                 :     } else {
     158 UBC           0 :         sk_state.commit_lsn
     159                 :     };
     160                 : 
     161 CBC           3 :     let append_request = ProposerAcceptorMessage::AppendRequest(AppendRequest {
     162               3 :         h: AppendRequestHeader {
     163               3 :             term: msg.term,
     164               3 :             epoch_start_lsn: begin_lsn,
     165               3 :             begin_lsn,
     166               3 :             end_lsn,
     167               3 :             commit_lsn,
     168               3 :             truncate_lsn: msg.truncate_lsn,
     169               3 :             proposer_uuid: [0u8; 16],
     170               3 :         },
     171               3 :         wal_data: Bytes::from(wal_data),
     172               3 :     });
     173                 : 
     174              18 :     let response = tli.process_msg(&append_request).await?;
     175                 : 
     176               3 :     let append_response = match response {
     177               3 :         Some(AcceptorProposerMessage::AppendResponse(resp)) => resp,
     178 UBC           0 :         _ => anyhow::bail!("not AppendResponse"),
     179                 :     };
     180                 : 
     181 CBC           3 :     Ok(InsertedWAL {
     182               3 :         begin_lsn,
     183               3 :         end_lsn,
     184               3 :         append_response,
     185               3 :     })
     186               3 : }
        

Generated by: LCOV version 2.1-beta