Line data Source code
1 : //!
2 : //! This module implements JSON_CTRL protocol, which allows exchange
3 : //! JSON messages over psql for testing purposes.
4 : //!
5 : //! Currently supports AppendLogicalMessage, which is used for WAL
6 : //! modifications in tests.
7 : //!
8 :
9 : use anyhow::Context;
10 : use postgres_backend::QueryError;
11 : use serde::{Deserialize, Serialize};
12 : use tokio::io::{AsyncRead, AsyncWrite};
13 : use tracing::*;
14 :
15 : use crate::handler::SafekeeperPostgresHandler;
16 : use crate::safekeeper::{AcceptorProposerMessage, AppendResponse, ServerInfo};
17 : use crate::safekeeper::{
18 : AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, ProposerElected,
19 : };
20 : use crate::safekeeper::{Term, TermHistory, TermLsn};
21 : use crate::state::TimelinePersistentState;
22 : use crate::timeline::WalResidentTimeline;
23 : use postgres_backend::PostgresBackend;
24 : use postgres_ffi::encode_logical_message;
25 : use postgres_ffi::WAL_SEGMENT_SIZE;
26 : use pq_proto::{BeMessage, RowDescriptor, TEXT_OID};
27 : use utils::lsn::Lsn;
28 :
29 0 : #[derive(Serialize, Deserialize, Debug)]
30 : pub struct AppendLogicalMessage {
31 : // prefix and message to build LogicalMessage
32 : pub lm_prefix: String,
33 : pub lm_message: String,
34 :
35 : // if true, commit_lsn will match flush_lsn after append
36 : pub set_commit_lsn: bool,
37 :
38 : // if true, ProposerElected will be sent before append
39 : pub send_proposer_elected: bool,
40 :
41 : // fields from AppendRequestHeader
42 : pub term: Term,
43 : #[serde(with = "utils::lsn::serde_as_u64")]
44 : pub epoch_start_lsn: Lsn,
45 : #[serde(with = "utils::lsn::serde_as_u64")]
46 : pub begin_lsn: Lsn,
47 : #[serde(with = "utils::lsn::serde_as_u64")]
48 : pub truncate_lsn: Lsn,
49 : pub pg_version: u32,
50 : }
51 :
52 : #[derive(Debug, Serialize)]
53 : struct AppendResult {
54 : // safekeeper state after append
55 : state: TimelinePersistentState,
56 : // info about new record in the WAL
57 : inserted_wal: InsertedWAL,
58 : }
59 :
60 : /// Handles command to craft logical message WAL record with given
61 : /// content, and then append it with specified term and lsn. This
62 : /// function is used to test safekeepers in different scenarios.
63 0 : pub async fn handle_json_ctrl<IO: AsyncRead + AsyncWrite + Unpin>(
64 0 : spg: &SafekeeperPostgresHandler,
65 0 : pgb: &mut PostgresBackend<IO>,
66 0 : append_request: &AppendLogicalMessage,
67 0 : ) -> Result<(), QueryError> {
68 0 : info!("JSON_CTRL request: {append_request:?}");
69 :
70 : // need to init safekeeper state before AppendRequest
71 0 : let tli = prepare_safekeeper(spg, append_request.pg_version).await?;
72 :
73 : // if send_proposer_elected is true, we need to update local history
74 0 : if append_request.send_proposer_elected {
75 0 : send_proposer_elected(&tli, append_request.term, append_request.epoch_start_lsn).await?;
76 0 : }
77 :
78 0 : let inserted_wal = append_logical_message(&tli, append_request).await?;
79 0 : let response = AppendResult {
80 0 : state: tli.get_state().await.1,
81 0 : inserted_wal,
82 : };
83 0 : let response_data = serde_json::to_vec(&response)
84 0 : .with_context(|| format!("Response {response:?} is not a json array"))?;
85 :
86 0 : pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor {
87 0 : name: b"json",
88 0 : typoid: TEXT_OID,
89 0 : typlen: -1,
90 0 : ..Default::default()
91 0 : }]))?
92 0 : .write_message_noflush(&BeMessage::DataRow(&[Some(&response_data)]))?
93 0 : .write_message_noflush(&BeMessage::CommandComplete(b"JSON_CTRL"))?;
94 0 : Ok(())
95 0 : }
96 :
97 : /// Prepare safekeeper to process append requests without crashes,
98 : /// by sending ProposerGreeting with default server.wal_seg_size.
99 0 : async fn prepare_safekeeper(
100 0 : spg: &SafekeeperPostgresHandler,
101 0 : pg_version: u32,
102 0 : ) -> anyhow::Result<WalResidentTimeline> {
103 0 : let tli = spg
104 0 : .global_timelines
105 0 : .create(
106 0 : spg.ttid,
107 0 : ServerInfo {
108 0 : pg_version,
109 0 : wal_seg_size: WAL_SEGMENT_SIZE as u32,
110 0 : system_id: 0,
111 0 : },
112 0 : Lsn::INVALID,
113 0 : Lsn::INVALID,
114 0 : )
115 0 : .await?;
116 :
117 0 : tli.wal_residence_guard().await
118 0 : }
119 :
120 0 : async fn send_proposer_elected(
121 0 : tli: &WalResidentTimeline,
122 0 : term: Term,
123 0 : lsn: Lsn,
124 0 : ) -> anyhow::Result<()> {
125 : // add new term to existing history
126 0 : let history = tli.get_state().await.1.acceptor_state.term_history;
127 0 : let history = history.up_to(lsn.checked_sub(1u64).unwrap());
128 0 : let mut history_entries = history.0;
129 0 : history_entries.push(TermLsn { term, lsn });
130 0 : let history = TermHistory(history_entries);
131 0 :
132 0 : let proposer_elected_request = ProposerAcceptorMessage::Elected(ProposerElected {
133 0 : term,
134 0 : start_streaming_at: lsn,
135 0 : term_history: history,
136 0 : timeline_start_lsn: lsn,
137 0 : });
138 0 :
139 0 : tli.process_msg(&proposer_elected_request).await?;
140 0 : Ok(())
141 0 : }
142 :
143 : #[derive(Debug, Serialize)]
144 : pub struct InsertedWAL {
145 : begin_lsn: Lsn,
146 : pub end_lsn: Lsn,
147 : append_response: AppendResponse,
148 : }
149 :
150 : /// Extend local WAL with new LogicalMessage record. To do that,
151 : /// create AppendRequest with new WAL and pass it to safekeeper.
152 0 : pub async fn append_logical_message(
153 0 : tli: &WalResidentTimeline,
154 0 : msg: &AppendLogicalMessage,
155 0 : ) -> anyhow::Result<InsertedWAL> {
156 0 : let wal_data = encode_logical_message(&msg.lm_prefix, &msg.lm_message);
157 0 : let sk_state = tli.get_state().await.1;
158 :
159 0 : let begin_lsn = msg.begin_lsn;
160 0 : let end_lsn = begin_lsn + wal_data.len() as u64;
161 :
162 0 : let commit_lsn = if msg.set_commit_lsn {
163 0 : end_lsn
164 : } else {
165 0 : sk_state.commit_lsn
166 : };
167 :
168 0 : let append_request = ProposerAcceptorMessage::AppendRequest(AppendRequest {
169 0 : h: AppendRequestHeader {
170 0 : term: msg.term,
171 0 : term_start_lsn: begin_lsn,
172 0 : begin_lsn,
173 0 : end_lsn,
174 0 : commit_lsn,
175 0 : truncate_lsn: msg.truncate_lsn,
176 0 : proposer_uuid: [0u8; 16],
177 0 : },
178 0 : wal_data,
179 0 : });
180 :
181 0 : let response = tli.process_msg(&append_request).await?;
182 :
183 0 : let append_response = match response {
184 0 : Some(AcceptorProposerMessage::AppendResponse(resp)) => resp,
185 0 : _ => anyhow::bail!("not AppendResponse"),
186 : };
187 :
188 0 : Ok(InsertedWAL {
189 0 : begin_lsn,
190 0 : end_lsn,
191 0 : append_response,
192 0 : })
193 0 : }
|