LCOV - code coverage report
Current view: top level - safekeeper/src - json_ctrl.rs (source / functions) Coverage Total Hit
Test: 07bee600374ccd486c69370d0972d9035964fe68.info Lines: 0.0 % 101 0
Test Date: 2025-02-20 13:11:02 Functions: 0.0 % 19 0

            Line data    Source code
       1              : //!
       2              : //! This module implements JSON_CTRL protocol, which allows exchange
       3              : //! JSON messages over psql for testing purposes.
       4              : //!
       5              : //! Currently supports AppendLogicalMessage, which is used for WAL
       6              : //! modifications in tests.
       7              : //!
       8              : 
       9              : use anyhow::Context;
      10              : use postgres_backend::QueryError;
      11              : use safekeeper_api::membership::Configuration;
      12              : use safekeeper_api::{ServerInfo, Term};
      13              : use serde::{Deserialize, Serialize};
      14              : use tokio::io::{AsyncRead, AsyncWrite};
      15              : use tracing::*;
      16              : 
      17              : use crate::handler::SafekeeperPostgresHandler;
      18              : use crate::safekeeper::{AcceptorProposerMessage, AppendResponse};
      19              : use crate::safekeeper::{
      20              :     AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, ProposerElected,
      21              : };
      22              : use crate::safekeeper::{TermHistory, TermLsn};
      23              : use crate::state::TimelinePersistentState;
      24              : use crate::timeline::WalResidentTimeline;
      25              : use postgres_backend::PostgresBackend;
      26              : use postgres_ffi::encode_logical_message;
      27              : use postgres_ffi::WAL_SEGMENT_SIZE;
      28              : use pq_proto::{BeMessage, RowDescriptor, TEXT_OID};
      29              : use utils::lsn::Lsn;
      30              : 
      31            0 : #[derive(Serialize, Deserialize, Debug)]
      32              : pub struct AppendLogicalMessage {
      33              :     // prefix and message to build LogicalMessage
      34              :     pub lm_prefix: String,
      35              :     pub lm_message: String,
      36              : 
      37              :     // if true, commit_lsn will match flush_lsn after append
      38              :     pub set_commit_lsn: bool,
      39              : 
      40              :     // if true, ProposerElected will be sent before append
      41              :     pub send_proposer_elected: bool,
      42              : 
      43              :     // fields from AppendRequestHeader
      44              :     pub term: Term,
      45              :     #[serde(with = "utils::lsn::serde_as_u64")]
      46              :     pub epoch_start_lsn: Lsn,
      47              :     #[serde(with = "utils::lsn::serde_as_u64")]
      48              :     pub begin_lsn: Lsn,
      49              :     #[serde(with = "utils::lsn::serde_as_u64")]
      50              :     pub truncate_lsn: Lsn,
      51              :     pub pg_version: u32,
      52              : }
      53              : 
      54              : #[derive(Debug, Serialize)]
      55              : struct AppendResult {
      56              :     // safekeeper state after append
      57              :     state: TimelinePersistentState,
      58              :     // info about new record in the WAL
      59              :     inserted_wal: InsertedWAL,
      60              : }
      61              : 
      62              : /// Handles command to craft logical message WAL record with given
      63              : /// content, and then append it with specified term and lsn. This
      64              : /// function is used to test safekeepers in different scenarios.
      65            0 : pub async fn handle_json_ctrl<IO: AsyncRead + AsyncWrite + Unpin>(
      66            0 :     spg: &SafekeeperPostgresHandler,
      67            0 :     pgb: &mut PostgresBackend<IO>,
      68            0 :     append_request: &AppendLogicalMessage,
      69            0 : ) -> Result<(), QueryError> {
      70            0 :     info!("JSON_CTRL request: {append_request:?}");
      71              : 
      72              :     // need to init safekeeper state before AppendRequest
      73            0 :     let tli = prepare_safekeeper(spg, append_request.pg_version).await?;
      74              : 
      75              :     // if send_proposer_elected is true, we need to update local history
      76            0 :     if append_request.send_proposer_elected {
      77            0 :         send_proposer_elected(&tli, append_request.term, append_request.epoch_start_lsn).await?;
      78            0 :     }
      79              : 
      80            0 :     let inserted_wal = append_logical_message(&tli, append_request).await?;
      81            0 :     let response = AppendResult {
      82            0 :         state: tli.get_state().await.1,
      83            0 :         inserted_wal,
      84              :     };
      85            0 :     let response_data = serde_json::to_vec(&response)
      86            0 :         .with_context(|| format!("Response {response:?} is not a json array"))?;
      87              : 
      88            0 :     pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor {
      89            0 :         name: b"json",
      90            0 :         typoid: TEXT_OID,
      91            0 :         typlen: -1,
      92            0 :         ..Default::default()
      93            0 :     }]))?
      94            0 :     .write_message_noflush(&BeMessage::DataRow(&[Some(&response_data)]))?
      95            0 :     .write_message_noflush(&BeMessage::CommandComplete(b"JSON_CTRL"))?;
      96            0 :     Ok(())
      97            0 : }
      98              : 
      99              : /// Prepare safekeeper to process append requests without crashes,
     100              : /// by sending ProposerGreeting with default server.wal_seg_size.
     101            0 : async fn prepare_safekeeper(
     102            0 :     spg: &SafekeeperPostgresHandler,
     103            0 :     pg_version: u32,
     104            0 : ) -> anyhow::Result<WalResidentTimeline> {
     105            0 :     let tli = spg
     106            0 :         .global_timelines
     107            0 :         .create(
     108            0 :             spg.ttid,
     109            0 :             Configuration::empty(),
     110            0 :             ServerInfo {
     111            0 :                 pg_version,
     112            0 :                 wal_seg_size: WAL_SEGMENT_SIZE as u32,
     113            0 :                 system_id: 0,
     114            0 :             },
     115            0 :             Lsn::INVALID,
     116            0 :             Lsn::INVALID,
     117            0 :         )
     118            0 :         .await?;
     119              : 
     120            0 :     tli.wal_residence_guard().await
     121            0 : }
     122              : 
     123            0 : async fn send_proposer_elected(
     124            0 :     tli: &WalResidentTimeline,
     125            0 :     term: Term,
     126            0 :     lsn: Lsn,
     127            0 : ) -> anyhow::Result<()> {
     128              :     // add new term to existing history
     129            0 :     let history = tli.get_state().await.1.acceptor_state.term_history;
     130            0 :     let history = history.up_to(lsn.checked_sub(1u64).unwrap());
     131            0 :     let mut history_entries = history.0;
     132            0 :     history_entries.push(TermLsn { term, lsn });
     133            0 :     let history = TermHistory(history_entries);
     134            0 : 
     135            0 :     let proposer_elected_request = ProposerAcceptorMessage::Elected(ProposerElected {
     136            0 :         term,
     137            0 :         start_streaming_at: lsn,
     138            0 :         term_history: history,
     139            0 :         timeline_start_lsn: lsn,
     140            0 :     });
     141            0 : 
     142            0 :     tli.process_msg(&proposer_elected_request).await?;
     143            0 :     Ok(())
     144            0 : }
     145              : 
     146              : #[derive(Debug, Serialize)]
     147              : pub struct InsertedWAL {
     148              :     begin_lsn: Lsn,
     149              :     pub end_lsn: Lsn,
     150              :     append_response: AppendResponse,
     151              : }
     152              : 
     153              : /// Extend local WAL with new LogicalMessage record. To do that,
     154              : /// create AppendRequest with new WAL and pass it to safekeeper.
     155            0 : pub async fn append_logical_message(
     156            0 :     tli: &WalResidentTimeline,
     157            0 :     msg: &AppendLogicalMessage,
     158            0 : ) -> anyhow::Result<InsertedWAL> {
     159            0 :     let wal_data = encode_logical_message(&msg.lm_prefix, &msg.lm_message);
     160            0 :     let sk_state = tli.get_state().await.1;
     161              : 
     162            0 :     let begin_lsn = msg.begin_lsn;
     163            0 :     let end_lsn = begin_lsn + wal_data.len() as u64;
     164              : 
     165            0 :     let commit_lsn = if msg.set_commit_lsn {
     166            0 :         end_lsn
     167              :     } else {
     168            0 :         sk_state.commit_lsn
     169              :     };
     170              : 
     171            0 :     let append_request = ProposerAcceptorMessage::AppendRequest(AppendRequest {
     172            0 :         h: AppendRequestHeader {
     173            0 :             term: msg.term,
     174            0 :             term_start_lsn: begin_lsn,
     175            0 :             begin_lsn,
     176            0 :             end_lsn,
     177            0 :             commit_lsn,
     178            0 :             truncate_lsn: msg.truncate_lsn,
     179            0 :             proposer_uuid: [0u8; 16],
     180            0 :         },
     181            0 :         wal_data,
     182            0 :     });
     183              : 
     184            0 :     let response = tli.process_msg(&append_request).await?;
     185              : 
     186            0 :     let append_response = match response {
     187            0 :         Some(AcceptorProposerMessage::AppendResponse(resp)) => resp,
     188            0 :         _ => anyhow::bail!("not AppendResponse"),
     189              :     };
     190              : 
     191            0 :     Ok(InsertedWAL {
     192            0 :         begin_lsn,
     193            0 :         end_lsn,
     194            0 :         append_response,
     195            0 :     })
     196            0 : }
        

Generated by: LCOV version 2.1-beta