LCOV - code coverage report
Current view: top level - safekeeper/src - json_ctrl.rs (source / functions) Coverage Total Hit
Test: c639aa5f7ab62b43d647b10f40d15a15686ce8a9.info Lines: 96.8 % 95 92
Test Date: 2024-02-12 20:26:03 Functions: 44.7 % 38 17

            Line data    Source code
       1              : //!
       2              : //! This module implements JSON_CTRL protocol, which allows exchange
       3              : //! JSON messages over psql for testing purposes.
       4              : //!
       5              : //! Currently supports AppendLogicalMessage, which is used for WAL
       6              : //! modifications in tests.
       7              : //!
       8              : 
       9              : use std::sync::Arc;
      10              : 
      11              : use anyhow::Context;
      12              : use bytes::Bytes;
      13              : use postgres_backend::QueryError;
      14              : use serde::{Deserialize, Serialize};
      15              : use tokio::io::{AsyncRead, AsyncWrite};
      16              : use tracing::*;
      17              : use utils::id::TenantTimelineId;
      18              : 
      19              : use crate::handler::SafekeeperPostgresHandler;
      20              : use crate::safekeeper::{AcceptorProposerMessage, AppendResponse, ServerInfo};
      21              : use crate::safekeeper::{
      22              :     AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, ProposerElected,
      23              : };
      24              : use crate::safekeeper::{Term, TermHistory, TermLsn};
      25              : use crate::state::TimelinePersistentState;
      26              : use crate::timeline::Timeline;
      27              : use crate::GlobalTimelines;
      28              : use postgres_backend::PostgresBackend;
      29              : use postgres_ffi::encode_logical_message;
      30              : use postgres_ffi::WAL_SEGMENT_SIZE;
      31              : use pq_proto::{BeMessage, RowDescriptor, TEXT_OID};
      32              : use utils::lsn::Lsn;
      33              : 
      34           66 : #[derive(Serialize, Deserialize, Debug)]
      35              : pub struct AppendLogicalMessage {
      36              :     // prefix and message to build LogicalMessage
      37              :     pub lm_prefix: String,
      38              :     pub lm_message: String,
      39              : 
      40              :     // if true, commit_lsn will match flush_lsn after append
      41              :     pub set_commit_lsn: bool,
      42              : 
      43              :     // if true, ProposerElected will be sent before append
      44              :     pub send_proposer_elected: bool,
      45              : 
      46              :     // fields from AppendRequestHeader
      47              :     pub term: Term,
      48              :     #[serde(with = "utils::lsn::serde_as_u64")]
      49              :     pub epoch_start_lsn: Lsn,
      50              :     #[serde(with = "utils::lsn::serde_as_u64")]
      51              :     pub begin_lsn: Lsn,
      52              :     #[serde(with = "utils::lsn::serde_as_u64")]
      53              :     pub truncate_lsn: Lsn,
      54              :     pub pg_version: u32,
      55              : }
      56              : 
      57            3 : #[derive(Debug, Serialize)]
      58              : struct AppendResult {
      59              :     // safekeeper state after append
      60              :     state: TimelinePersistentState,
      61              :     // info about new record in the WAL
      62              :     inserted_wal: InsertedWAL,
      63              : }
      64              : 
      65              : /// Handles command to craft logical message WAL record with given
      66              : /// content, and then append it with specified term and lsn. This
      67              : /// function is used to test safekeepers in different scenarios.
      68            3 : pub async fn handle_json_ctrl<IO: AsyncRead + AsyncWrite + Unpin>(
      69            3 :     spg: &SafekeeperPostgresHandler,
      70            3 :     pgb: &mut PostgresBackend<IO>,
      71            3 :     append_request: &AppendLogicalMessage,
      72            3 : ) -> Result<(), QueryError> {
      73            3 :     info!("JSON_CTRL request: {append_request:?}");
      74              : 
      75              :     // need to init safekeeper state before AppendRequest
      76           17 :     let tli = prepare_safekeeper(spg.ttid, append_request.pg_version).await?;
      77              : 
      78              :     // if send_proposer_elected is true, we need to update local history
      79            3 :     if append_request.send_proposer_elected {
      80         9744 :         send_proposer_elected(&tli, append_request.term, append_request.epoch_start_lsn).await?;
      81            0 :     }
      82              : 
      83           12 :     let inserted_wal = append_logical_message(&tli, append_request).await?;
      84            3 :     let response = AppendResult {
      85            3 :         state: tli.get_state().await.1,
      86            3 :         inserted_wal,
      87              :     };
      88            3 :     let response_data = serde_json::to_vec(&response)
      89            3 :         .with_context(|| format!("Response {response:?} is not a json array"))?;
      90              : 
      91            3 :     pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor {
      92            3 :         name: b"json",
      93            3 :         typoid: TEXT_OID,
      94            3 :         typlen: -1,
      95            3 :         ..Default::default()
      96            3 :     }]))?
      97            3 :     .write_message_noflush(&BeMessage::DataRow(&[Some(&response_data)]))?
      98            3 :     .write_message_noflush(&BeMessage::CommandComplete(b"JSON_CTRL"))?;
      99            3 :     Ok(())
     100            3 : }
     101              : 
     102              : /// Prepare safekeeper to process append requests without crashes,
     103              : /// by sending ProposerGreeting with default server.wal_seg_size.
     104            3 : async fn prepare_safekeeper(
     105            3 :     ttid: TenantTimelineId,
     106            3 :     pg_version: u32,
     107            3 : ) -> anyhow::Result<Arc<Timeline>> {
     108            3 :     GlobalTimelines::create(
     109            3 :         ttid,
     110            3 :         ServerInfo {
     111            3 :             pg_version,
     112            3 :             wal_seg_size: WAL_SEGMENT_SIZE as u32,
     113            3 :             system_id: 0,
     114            3 :         },
     115            3 :         Lsn::INVALID,
     116            3 :         Lsn::INVALID,
     117            3 :     )
     118           17 :     .await
     119            3 : }
     120              : 
     121            3 : async fn send_proposer_elected(tli: &Arc<Timeline>, term: Term, lsn: Lsn) -> anyhow::Result<()> {
     122              :     // add new term to existing history
     123            3 :     let history = tli.get_state().await.1.acceptor_state.term_history;
     124            3 :     let history = history.up_to(lsn.checked_sub(1u64).unwrap());
     125            3 :     let mut history_entries = history.0;
     126            3 :     history_entries.push(TermLsn { term, lsn });
     127            3 :     let history = TermHistory(history_entries);
     128            3 : 
     129            3 :     let proposer_elected_request = ProposerAcceptorMessage::Elected(ProposerElected {
     130            3 :         term,
     131            3 :         start_streaming_at: lsn,
     132            3 :         term_history: history,
     133            3 :         timeline_start_lsn: lsn,
     134            3 :     });
     135            3 : 
     136         9744 :     tli.process_msg(&proposer_elected_request).await?;
     137            3 :     Ok(())
     138            3 : }
     139              : 
     140            3 : #[derive(Debug, Serialize)]
     141              : pub struct InsertedWAL {
     142              :     begin_lsn: Lsn,
     143              :     pub end_lsn: Lsn,
     144              :     append_response: AppendResponse,
     145              : }
     146              : 
     147              : /// Extend local WAL with new LogicalMessage record. To do that,
     148              : /// create AppendRequest with new WAL and pass it to safekeeper.
     149            3 : pub async fn append_logical_message(
     150            3 :     tli: &Arc<Timeline>,
     151            3 :     msg: &AppendLogicalMessage,
     152            3 : ) -> anyhow::Result<InsertedWAL> {
     153            3 :     let wal_data = encode_logical_message(&msg.lm_prefix, &msg.lm_message);
     154            3 :     let sk_state = tli.get_state().await.1;
     155              : 
     156            3 :     let begin_lsn = msg.begin_lsn;
     157            3 :     let end_lsn = begin_lsn + wal_data.len() as u64;
     158              : 
     159            3 :     let commit_lsn = if msg.set_commit_lsn {
     160            3 :         end_lsn
     161              :     } else {
     162            0 :         sk_state.commit_lsn
     163              :     };
     164              : 
     165            3 :     let append_request = ProposerAcceptorMessage::AppendRequest(AppendRequest {
     166            3 :         h: AppendRequestHeader {
     167            3 :             term: msg.term,
     168            3 :             epoch_start_lsn: begin_lsn,
     169            3 :             begin_lsn,
     170            3 :             end_lsn,
     171            3 :             commit_lsn,
     172            3 :             truncate_lsn: msg.truncate_lsn,
     173            3 :             proposer_uuid: [0u8; 16],
     174            3 :         },
     175            3 :         wal_data: Bytes::from(wal_data),
     176            3 :     });
     177              : 
     178           12 :     let response = tli.process_msg(&append_request).await?;
     179              : 
     180            3 :     let append_response = match response {
     181            3 :         Some(AcceptorProposerMessage::AppendResponse(resp)) => resp,
     182            0 :         _ => anyhow::bail!("not AppendResponse"),
     183              :     };
     184              : 
     185            3 :     Ok(InsertedWAL {
     186            3 :         begin_lsn,
     187            3 :         end_lsn,
     188            3 :         append_response,
     189            3 :     })
     190            3 : }
        

Generated by: LCOV version 2.1-beta