Line data Source code
1 : //!
2 : //! This module implements JSON_CTRL protocol, which allows exchange
3 : //! JSON messages over psql for testing purposes.
4 : //!
5 : //! Currently supports AppendLogicalMessage, which is used for WAL
6 : //! modifications in tests.
7 : //!
8 :
9 : use anyhow::Context;
10 : use postgres_backend::QueryError;
11 : use safekeeper_api::membership::Configuration;
12 : use safekeeper_api::{ServerInfo, Term};
13 : use serde::{Deserialize, Serialize};
14 : use tokio::io::{AsyncRead, AsyncWrite};
15 : use tracing::*;
16 :
17 : use crate::handler::SafekeeperPostgresHandler;
18 : use crate::safekeeper::{AcceptorProposerMessage, AppendResponse};
19 : use crate::safekeeper::{
20 : AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, ProposerElected,
21 : };
22 : use crate::safekeeper::{TermHistory, TermLsn};
23 : use crate::state::TimelinePersistentState;
24 : use crate::timeline::WalResidentTimeline;
25 : use postgres_backend::PostgresBackend;
26 : use postgres_ffi::encode_logical_message;
27 : use postgres_ffi::WAL_SEGMENT_SIZE;
28 : use pq_proto::{BeMessage, RowDescriptor, TEXT_OID};
29 : use utils::lsn::Lsn;
30 :
31 0 : #[derive(Serialize, Deserialize, Debug)]
32 : pub struct AppendLogicalMessage {
33 : // prefix and message to build LogicalMessage
34 : pub lm_prefix: String,
35 : pub lm_message: String,
36 :
37 : // if true, commit_lsn will match flush_lsn after append
38 : pub set_commit_lsn: bool,
39 :
40 : // if true, ProposerElected will be sent before append
41 : pub send_proposer_elected: bool,
42 :
43 : // fields from AppendRequestHeader
44 : pub term: Term,
45 : #[serde(with = "utils::lsn::serde_as_u64")]
46 : pub epoch_start_lsn: Lsn,
47 : #[serde(with = "utils::lsn::serde_as_u64")]
48 : pub begin_lsn: Lsn,
49 : #[serde(with = "utils::lsn::serde_as_u64")]
50 : pub truncate_lsn: Lsn,
51 : pub pg_version: u32,
52 : }
53 :
54 : #[derive(Debug, Serialize)]
55 : struct AppendResult {
56 : // safekeeper state after append
57 : state: TimelinePersistentState,
58 : // info about new record in the WAL
59 : inserted_wal: InsertedWAL,
60 : }
61 :
62 : /// Handles command to craft logical message WAL record with given
63 : /// content, and then append it with specified term and lsn. This
64 : /// function is used to test safekeepers in different scenarios.
65 0 : pub async fn handle_json_ctrl<IO: AsyncRead + AsyncWrite + Unpin>(
66 0 : spg: &SafekeeperPostgresHandler,
67 0 : pgb: &mut PostgresBackend<IO>,
68 0 : append_request: &AppendLogicalMessage,
69 0 : ) -> Result<(), QueryError> {
70 0 : info!("JSON_CTRL request: {append_request:?}");
71 :
72 : // need to init safekeeper state before AppendRequest
73 0 : let tli = prepare_safekeeper(spg, append_request.pg_version).await?;
74 :
75 : // if send_proposer_elected is true, we need to update local history
76 0 : if append_request.send_proposer_elected {
77 0 : send_proposer_elected(&tli, append_request.term, append_request.epoch_start_lsn).await?;
78 0 : }
79 :
80 0 : let inserted_wal = append_logical_message(&tli, append_request).await?;
81 0 : let response = AppendResult {
82 0 : state: tli.get_state().await.1,
83 0 : inserted_wal,
84 : };
85 0 : let response_data = serde_json::to_vec(&response)
86 0 : .with_context(|| format!("Response {response:?} is not a json array"))?;
87 :
88 0 : pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor {
89 0 : name: b"json",
90 0 : typoid: TEXT_OID,
91 0 : typlen: -1,
92 0 : ..Default::default()
93 0 : }]))?
94 0 : .write_message_noflush(&BeMessage::DataRow(&[Some(&response_data)]))?
95 0 : .write_message_noflush(&BeMessage::CommandComplete(b"JSON_CTRL"))?;
96 0 : Ok(())
97 0 : }
98 :
99 : /// Prepare safekeeper to process append requests without crashes,
100 : /// by sending ProposerGreeting with default server.wal_seg_size.
101 0 : async fn prepare_safekeeper(
102 0 : spg: &SafekeeperPostgresHandler,
103 0 : pg_version: u32,
104 0 : ) -> anyhow::Result<WalResidentTimeline> {
105 0 : let tli = spg
106 0 : .global_timelines
107 0 : .create(
108 0 : spg.ttid,
109 0 : Configuration::empty(),
110 0 : ServerInfo {
111 0 : pg_version,
112 0 : wal_seg_size: WAL_SEGMENT_SIZE as u32,
113 0 : system_id: 0,
114 0 : },
115 0 : Lsn::INVALID,
116 0 : Lsn::INVALID,
117 0 : )
118 0 : .await?;
119 :
120 0 : tli.wal_residence_guard().await
121 0 : }
122 :
123 0 : async fn send_proposer_elected(
124 0 : tli: &WalResidentTimeline,
125 0 : term: Term,
126 0 : lsn: Lsn,
127 0 : ) -> anyhow::Result<()> {
128 : // add new term to existing history
129 0 : let history = tli.get_state().await.1.acceptor_state.term_history;
130 0 : let history = history.up_to(lsn.checked_sub(1u64).unwrap());
131 0 : let mut history_entries = history.0;
132 0 : history_entries.push(TermLsn { term, lsn });
133 0 : let history = TermHistory(history_entries);
134 0 :
135 0 : let proposer_elected_request = ProposerAcceptorMessage::Elected(ProposerElected {
136 0 : term,
137 0 : start_streaming_at: lsn,
138 0 : term_history: history,
139 0 : timeline_start_lsn: lsn,
140 0 : });
141 0 :
142 0 : tli.process_msg(&proposer_elected_request).await?;
143 0 : Ok(())
144 0 : }
145 :
146 : #[derive(Debug, Serialize)]
147 : pub struct InsertedWAL {
148 : begin_lsn: Lsn,
149 : pub end_lsn: Lsn,
150 : append_response: AppendResponse,
151 : }
152 :
153 : /// Extend local WAL with new LogicalMessage record. To do that,
154 : /// create AppendRequest with new WAL and pass it to safekeeper.
155 0 : pub async fn append_logical_message(
156 0 : tli: &WalResidentTimeline,
157 0 : msg: &AppendLogicalMessage,
158 0 : ) -> anyhow::Result<InsertedWAL> {
159 0 : let wal_data = encode_logical_message(&msg.lm_prefix, &msg.lm_message);
160 0 : let sk_state = tli.get_state().await.1;
161 :
162 0 : let begin_lsn = msg.begin_lsn;
163 0 : let end_lsn = begin_lsn + wal_data.len() as u64;
164 :
165 0 : let commit_lsn = if msg.set_commit_lsn {
166 0 : end_lsn
167 : } else {
168 0 : sk_state.commit_lsn
169 : };
170 :
171 0 : let append_request = ProposerAcceptorMessage::AppendRequest(AppendRequest {
172 0 : h: AppendRequestHeader {
173 0 : term: msg.term,
174 0 : term_start_lsn: begin_lsn,
175 0 : begin_lsn,
176 0 : end_lsn,
177 0 : commit_lsn,
178 0 : truncate_lsn: msg.truncate_lsn,
179 0 : proposer_uuid: [0u8; 16],
180 0 : },
181 0 : wal_data,
182 0 : });
183 :
184 0 : let response = tli.process_msg(&append_request).await?;
185 :
186 0 : let append_response = match response {
187 0 : Some(AcceptorProposerMessage::AppendResponse(resp)) => resp,
188 0 : _ => anyhow::bail!("not AppendResponse"),
189 : };
190 :
191 0 : Ok(InsertedWAL {
192 0 : begin_lsn,
193 0 : end_lsn,
194 0 : append_response,
195 0 : })
196 0 : }
|