Line data Source code
1 : //!
2 : //! This module implements the JSON_CTRL protocol, which allows exchanging
3 : //! JSON messages over psql for testing purposes.
4 : //!
5 : //! Currently supports AppendLogicalMessage, which is used for WAL
6 : //! modifications in tests.
7 : //!
8 :
9 : use anyhow::Context;
10 : use bytes::Bytes;
11 : use postgres_backend::QueryError;
12 : use serde::{Deserialize, Serialize};
13 : use tokio::io::{AsyncRead, AsyncWrite};
14 : use tracing::*;
15 : use utils::id::TenantTimelineId;
16 :
17 : use crate::handler::SafekeeperPostgresHandler;
18 : use crate::safekeeper::{AcceptorProposerMessage, AppendResponse, ServerInfo};
19 : use crate::safekeeper::{
20 : AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, ProposerElected,
21 : };
22 : use crate::safekeeper::{Term, TermHistory, TermLsn};
23 : use crate::state::TimelinePersistentState;
24 : use crate::timeline::WalResidentTimeline;
25 : use crate::GlobalTimelines;
26 : use postgres_backend::PostgresBackend;
27 : use postgres_ffi::encode_logical_message;
28 : use postgres_ffi::WAL_SEGMENT_SIZE;
29 : use pq_proto::{BeMessage, RowDescriptor, TEXT_OID};
30 : use utils::lsn::Lsn;
31 :
32 0 : #[derive(Serialize, Deserialize, Debug)]
33 : pub struct AppendLogicalMessage {
34 : // prefix and message to build LogicalMessage
35 : pub lm_prefix: String,
36 : pub lm_message: String,
37 :
38 : // if true, commit_lsn will match flush_lsn after append
39 : pub set_commit_lsn: bool,
40 :
41 : // if true, ProposerElected will be sent before append
42 : pub send_proposer_elected: bool,
43 :
44 : // fields from AppendRequestHeader
45 : pub term: Term,
46 : #[serde(with = "utils::lsn::serde_as_u64")]
47 : pub epoch_start_lsn: Lsn,
48 : #[serde(with = "utils::lsn::serde_as_u64")]
49 : pub begin_lsn: Lsn,
50 : #[serde(with = "utils::lsn::serde_as_u64")]
51 : pub truncate_lsn: Lsn,
52 : pub pg_version: u32,
53 : }
54 :
55 : #[derive(Debug, Serialize)]
56 : struct AppendResult {
57 : // safekeeper state after append
58 : state: TimelinePersistentState,
59 : // info about new record in the WAL
60 : inserted_wal: InsertedWAL,
61 : }
62 :
63 : /// Handles command to craft logical message WAL record with given
64 : /// content, and then append it with specified term and lsn. This
65 : /// function is used to test safekeepers in different scenarios.
66 0 : pub async fn handle_json_ctrl<IO: AsyncRead + AsyncWrite + Unpin>(
67 0 : spg: &SafekeeperPostgresHandler,
68 0 : pgb: &mut PostgresBackend<IO>,
69 0 : append_request: &AppendLogicalMessage,
70 0 : ) -> Result<(), QueryError> {
71 0 : info!("JSON_CTRL request: {append_request:?}");
72 :
73 : // need to init safekeeper state before AppendRequest
74 0 : let tli = prepare_safekeeper(spg.ttid, append_request.pg_version).await?;
75 :
76 : // if send_proposer_elected is true, we need to update local history
77 0 : if append_request.send_proposer_elected {
78 0 : send_proposer_elected(&tli, append_request.term, append_request.epoch_start_lsn).await?;
79 0 : }
80 :
81 0 : let inserted_wal = append_logical_message(&tli, append_request).await?;
82 0 : let response = AppendResult {
83 0 : state: tli.get_state().await.1,
84 0 : inserted_wal,
85 : };
86 0 : let response_data = serde_json::to_vec(&response)
87 0 : .with_context(|| format!("Response {response:?} is not a json array"))?;
88 :
89 0 : pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor {
90 0 : name: b"json",
91 0 : typoid: TEXT_OID,
92 0 : typlen: -1,
93 0 : ..Default::default()
94 0 : }]))?
95 0 : .write_message_noflush(&BeMessage::DataRow(&[Some(&response_data)]))?
96 0 : .write_message_noflush(&BeMessage::CommandComplete(b"JSON_CTRL"))?;
97 0 : Ok(())
98 0 : }
99 :
100 : /// Prepare safekeeper to process append requests without crashes,
101 : /// by sending ProposerGreeting with default server.wal_seg_size.
102 0 : async fn prepare_safekeeper(
103 0 : ttid: TenantTimelineId,
104 0 : pg_version: u32,
105 0 : ) -> anyhow::Result<WalResidentTimeline> {
106 0 : let tli = GlobalTimelines::create(
107 0 : ttid,
108 0 : ServerInfo {
109 0 : pg_version,
110 0 : wal_seg_size: WAL_SEGMENT_SIZE as u32,
111 0 : system_id: 0,
112 0 : },
113 0 : Lsn::INVALID,
114 0 : Lsn::INVALID,
115 0 : )
116 0 : .await?;
117 :
118 0 : tli.wal_residence_guard().await
119 0 : }
120 :
121 0 : async fn send_proposer_elected(
122 0 : tli: &WalResidentTimeline,
123 0 : term: Term,
124 0 : lsn: Lsn,
125 0 : ) -> anyhow::Result<()> {
126 : // add new term to existing history
127 0 : let history = tli.get_state().await.1.acceptor_state.term_history;
128 0 : let history = history.up_to(lsn.checked_sub(1u64).unwrap());
129 0 : let mut history_entries = history.0;
130 0 : history_entries.push(TermLsn { term, lsn });
131 0 : let history = TermHistory(history_entries);
132 0 :
133 0 : let proposer_elected_request = ProposerAcceptorMessage::Elected(ProposerElected {
134 0 : term,
135 0 : start_streaming_at: lsn,
136 0 : term_history: history,
137 0 : timeline_start_lsn: lsn,
138 0 : });
139 0 :
140 0 : tli.process_msg(&proposer_elected_request).await?;
141 0 : Ok(())
142 0 : }
143 :
144 : #[derive(Debug, Serialize)]
145 : pub struct InsertedWAL {
146 : begin_lsn: Lsn,
147 : pub end_lsn: Lsn,
148 : append_response: AppendResponse,
149 : }
150 :
151 : /// Extend local WAL with new LogicalMessage record. To do that,
152 : /// create AppendRequest with new WAL and pass it to safekeeper.
153 0 : pub async fn append_logical_message(
154 0 : tli: &WalResidentTimeline,
155 0 : msg: &AppendLogicalMessage,
156 0 : ) -> anyhow::Result<InsertedWAL> {
157 0 : let wal_data = encode_logical_message(&msg.lm_prefix, &msg.lm_message);
158 0 : let sk_state = tli.get_state().await.1;
159 :
160 0 : let begin_lsn = msg.begin_lsn;
161 0 : let end_lsn = begin_lsn + wal_data.len() as u64;
162 :
163 0 : let commit_lsn = if msg.set_commit_lsn {
164 0 : end_lsn
165 : } else {
166 0 : sk_state.commit_lsn
167 : };
168 :
169 0 : let append_request = ProposerAcceptorMessage::AppendRequest(AppendRequest {
170 0 : h: AppendRequestHeader {
171 0 : term: msg.term,
172 0 : term_start_lsn: begin_lsn,
173 0 : begin_lsn,
174 0 : end_lsn,
175 0 : commit_lsn,
176 0 : truncate_lsn: msg.truncate_lsn,
177 0 : proposer_uuid: [0u8; 16],
178 0 : },
179 0 : wal_data: Bytes::from(wal_data),
180 0 : });
181 :
182 0 : let response = tli.process_msg(&append_request).await?;
183 :
184 0 : let append_response = match response {
185 0 : Some(AcceptorProposerMessage::AppendResponse(resp)) => resp,
186 0 : _ => anyhow::bail!("not AppendResponse"),
187 : };
188 :
189 0 : Ok(InsertedWAL {
190 0 : begin_lsn,
191 0 : end_lsn,
192 0 : append_response,
193 0 : })
194 0 : }
|