Line data Source code
1 : //!
2 : //! This module implements JSON_CTRL protocol, which allows exchange
3 : //! JSON messages over psql for testing purposes.
4 : //!
5 : //! Currently supports AppendLogicalMessage, which is used for WAL
6 : //! modifications in tests.
7 : //!
8 :
9 : use anyhow::Context;
10 : use postgres_backend::QueryError;
11 : use serde::{Deserialize, Serialize};
12 : use tokio::io::{AsyncRead, AsyncWrite};
13 : use tracing::*;
14 : use utils::id::TenantTimelineId;
15 :
16 : use crate::handler::SafekeeperPostgresHandler;
17 : use crate::safekeeper::{AcceptorProposerMessage, AppendResponse, ServerInfo};
18 : use crate::safekeeper::{
19 : AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, ProposerElected,
20 : };
21 : use crate::safekeeper::{Term, TermHistory, TermLsn};
22 : use crate::state::TimelinePersistentState;
23 : use crate::timeline::WalResidentTimeline;
24 : use crate::GlobalTimelines;
25 : use postgres_backend::PostgresBackend;
26 : use postgres_ffi::encode_logical_message;
27 : use postgres_ffi::WAL_SEGMENT_SIZE;
28 : use pq_proto::{BeMessage, RowDescriptor, TEXT_OID};
29 : use utils::lsn::Lsn;
30 :
31 0 : #[derive(Serialize, Deserialize, Debug)]
32 : pub struct AppendLogicalMessage {
33 : // prefix and message to build LogicalMessage
34 : pub lm_prefix: String,
35 : pub lm_message: String,
36 :
37 : // if true, commit_lsn will match flush_lsn after append
38 : pub set_commit_lsn: bool,
39 :
40 : // if true, ProposerElected will be sent before append
41 : pub send_proposer_elected: bool,
42 :
43 : // fields from AppendRequestHeader
44 : pub term: Term,
45 : #[serde(with = "utils::lsn::serde_as_u64")]
46 : pub epoch_start_lsn: Lsn,
47 : #[serde(with = "utils::lsn::serde_as_u64")]
48 : pub begin_lsn: Lsn,
49 : #[serde(with = "utils::lsn::serde_as_u64")]
50 : pub truncate_lsn: Lsn,
51 : pub pg_version: u32,
52 : }
53 :
54 : #[derive(Debug, Serialize)]
55 : struct AppendResult {
56 : // safekeeper state after append
57 : state: TimelinePersistentState,
58 : // info about new record in the WAL
59 : inserted_wal: InsertedWAL,
60 : }
61 :
62 : /// Handles command to craft logical message WAL record with given
63 : /// content, and then append it with specified term and lsn. This
64 : /// function is used to test safekeepers in different scenarios.
65 0 : pub async fn handle_json_ctrl<IO: AsyncRead + AsyncWrite + Unpin>(
66 0 : spg: &SafekeeperPostgresHandler,
67 0 : pgb: &mut PostgresBackend<IO>,
68 0 : append_request: &AppendLogicalMessage,
69 0 : ) -> Result<(), QueryError> {
70 0 : info!("JSON_CTRL request: {append_request:?}");
71 :
72 : // need to init safekeeper state before AppendRequest
73 0 : let tli = prepare_safekeeper(spg.ttid, append_request.pg_version).await?;
74 :
75 : // if send_proposer_elected is true, we need to update local history
76 0 : if append_request.send_proposer_elected {
77 0 : send_proposer_elected(&tli, append_request.term, append_request.epoch_start_lsn).await?;
78 0 : }
79 :
80 0 : let inserted_wal = append_logical_message(&tli, append_request).await?;
81 0 : let response = AppendResult {
82 0 : state: tli.get_state().await.1,
83 0 : inserted_wal,
84 : };
85 0 : let response_data = serde_json::to_vec(&response)
86 0 : .with_context(|| format!("Response {response:?} is not a json array"))?;
87 :
88 0 : pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor {
89 0 : name: b"json",
90 0 : typoid: TEXT_OID,
91 0 : typlen: -1,
92 0 : ..Default::default()
93 0 : }]))?
94 0 : .write_message_noflush(&BeMessage::DataRow(&[Some(&response_data)]))?
95 0 : .write_message_noflush(&BeMessage::CommandComplete(b"JSON_CTRL"))?;
96 0 : Ok(())
97 0 : }
98 :
99 : /// Prepare safekeeper to process append requests without crashes,
100 : /// by sending ProposerGreeting with default server.wal_seg_size.
101 0 : async fn prepare_safekeeper(
102 0 : ttid: TenantTimelineId,
103 0 : pg_version: u32,
104 0 : ) -> anyhow::Result<WalResidentTimeline> {
105 0 : let tli = GlobalTimelines::create(
106 0 : ttid,
107 0 : ServerInfo {
108 0 : pg_version,
109 0 : wal_seg_size: WAL_SEGMENT_SIZE as u32,
110 0 : system_id: 0,
111 0 : },
112 0 : Lsn::INVALID,
113 0 : Lsn::INVALID,
114 0 : )
115 0 : .await?;
116 :
117 0 : tli.wal_residence_guard().await
118 0 : }
119 :
120 0 : async fn send_proposer_elected(
121 0 : tli: &WalResidentTimeline,
122 0 : term: Term,
123 0 : lsn: Lsn,
124 0 : ) -> anyhow::Result<()> {
125 : // add new term to existing history
126 0 : let history = tli.get_state().await.1.acceptor_state.term_history;
127 0 : let history = history.up_to(lsn.checked_sub(1u64).unwrap());
128 0 : let mut history_entries = history.0;
129 0 : history_entries.push(TermLsn { term, lsn });
130 0 : let history = TermHistory(history_entries);
131 0 :
132 0 : let proposer_elected_request = ProposerAcceptorMessage::Elected(ProposerElected {
133 0 : term,
134 0 : start_streaming_at: lsn,
135 0 : term_history: history,
136 0 : timeline_start_lsn: lsn,
137 0 : });
138 0 :
139 0 : tli.process_msg(&proposer_elected_request).await?;
140 0 : Ok(())
141 0 : }
142 :
143 : #[derive(Debug, Serialize)]
144 : pub struct InsertedWAL {
145 : begin_lsn: Lsn,
146 : pub end_lsn: Lsn,
147 : append_response: AppendResponse,
148 : }
149 :
150 : /// Extend local WAL with new LogicalMessage record. To do that,
151 : /// create AppendRequest with new WAL and pass it to safekeeper.
152 0 : pub async fn append_logical_message(
153 0 : tli: &WalResidentTimeline,
154 0 : msg: &AppendLogicalMessage,
155 0 : ) -> anyhow::Result<InsertedWAL> {
156 0 : let wal_data = encode_logical_message(&msg.lm_prefix, &msg.lm_message);
157 0 : let sk_state = tli.get_state().await.1;
158 :
159 0 : let begin_lsn = msg.begin_lsn;
160 0 : let end_lsn = begin_lsn + wal_data.len() as u64;
161 :
162 0 : let commit_lsn = if msg.set_commit_lsn {
163 0 : end_lsn
164 : } else {
165 0 : sk_state.commit_lsn
166 : };
167 :
168 0 : let append_request = ProposerAcceptorMessage::AppendRequest(AppendRequest {
169 0 : h: AppendRequestHeader {
170 0 : term: msg.term,
171 0 : term_start_lsn: begin_lsn,
172 0 : begin_lsn,
173 0 : end_lsn,
174 0 : commit_lsn,
175 0 : truncate_lsn: msg.truncate_lsn,
176 0 : proposer_uuid: [0u8; 16],
177 0 : },
178 0 : wal_data,
179 0 : });
180 :
181 0 : let response = tli.process_msg(&append_request).await?;
182 :
183 0 : let append_response = match response {
184 0 : Some(AcceptorProposerMessage::AppendResponse(resp)) => resp,
185 0 : _ => anyhow::bail!("not AppendResponse"),
186 : };
187 :
188 0 : Ok(InsertedWAL {
189 0 : begin_lsn,
190 0 : end_lsn,
191 0 : append_response,
192 0 : })
193 0 : }
|