TLA Line data Source code
1 : //!
2 : //! This module implements JSON_CTRL protocol, which allows exchange
3 : //! JSON messages over psql for testing purposes.
4 : //!
5 : //! Currently supports AppendLogicalMessage, which is used for WAL
6 : //! modifications in tests.
7 : //!
8 :
9 : use std::sync::Arc;
10 :
11 : use anyhow::Context;
12 : use bytes::Bytes;
13 : use postgres_backend::QueryError;
14 : use serde::{Deserialize, Serialize};
15 : use tokio::io::{AsyncRead, AsyncWrite};
16 : use tracing::*;
17 : use utils::id::TenantTimelineId;
18 :
19 : use crate::handler::SafekeeperPostgresHandler;
20 : use crate::safekeeper::{AcceptorProposerMessage, AppendResponse, ServerInfo};
21 : use crate::safekeeper::{
22 : AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, ProposerElected,
23 : };
24 : use crate::safekeeper::{SafeKeeperState, Term, TermHistory, TermLsn};
25 : use crate::timeline::Timeline;
26 : use crate::GlobalTimelines;
27 : use postgres_backend::PostgresBackend;
28 : use postgres_ffi::encode_logical_message;
29 : use postgres_ffi::WAL_SEGMENT_SIZE;
30 : use pq_proto::{BeMessage, RowDescriptor, TEXT_OID};
31 : use utils::lsn::Lsn;
32 :
33 CBC 66 : #[derive(Serialize, Deserialize, Debug)]
34 : pub struct AppendLogicalMessage {
35 : // prefix and message to build LogicalMessage
36 : pub lm_prefix: String,
37 : pub lm_message: String,
38 :
39 : // if true, commit_lsn will match flush_lsn after append
40 : pub set_commit_lsn: bool,
41 :
42 : // if true, ProposerElected will be sent before append
43 : pub send_proposer_elected: bool,
44 :
45 : // fields from AppendRequestHeader
46 : pub term: Term,
47 : #[serde(with = "utils::lsn::serde_as_u64")]
48 : pub epoch_start_lsn: Lsn,
49 : #[serde(with = "utils::lsn::serde_as_u64")]
50 : pub begin_lsn: Lsn,
51 : #[serde(with = "utils::lsn::serde_as_u64")]
52 : pub truncate_lsn: Lsn,
53 : pub pg_version: u32,
54 : }
55 :
56 3 : #[derive(Debug, Serialize)]
57 : struct AppendResult {
58 : // safekeeper state after append
59 : state: SafeKeeperState,
60 : // info about new record in the WAL
61 : inserted_wal: InsertedWAL,
62 : }
63 :
64 : /// Handles command to craft logical message WAL record with given
65 : /// content, and then append it with specified term and lsn. This
66 : /// function is used to test safekeepers in different scenarios.
67 3 : pub async fn handle_json_ctrl<IO: AsyncRead + AsyncWrite + Unpin>(
68 3 : spg: &SafekeeperPostgresHandler,
69 3 : pgb: &mut PostgresBackend<IO>,
70 3 : append_request: &AppendLogicalMessage,
71 3 : ) -> Result<(), QueryError> {
72 3 : info!("JSON_CTRL request: {append_request:?}");
73 :
74 : // need to init safekeeper state before AppendRequest
75 17 : let tli = prepare_safekeeper(spg.ttid, append_request.pg_version).await?;
76 :
77 : // if send_proposer_elected is true, we need to update local history
78 3 : if append_request.send_proposer_elected {
79 9682 : send_proposer_elected(&tli, append_request.term, append_request.epoch_start_lsn).await?;
80 UBC 0 : }
81 :
82 CBC 21 : let inserted_wal = append_logical_message(&tli, append_request).await?;
83 3 : let response = AppendResult {
84 3 : state: tli.get_state().await.1,
85 3 : inserted_wal,
86 : };
87 3 : let response_data = serde_json::to_vec(&response)
88 3 : .with_context(|| format!("Response {response:?} is not a json array"))?;
89 :
90 3 : pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor {
91 3 : name: b"json",
92 3 : typoid: TEXT_OID,
93 3 : typlen: -1,
94 3 : ..Default::default()
95 3 : }]))?
96 3 : .write_message_noflush(&BeMessage::DataRow(&[Some(&response_data)]))?
97 3 : .write_message_noflush(&BeMessage::CommandComplete(b"JSON_CTRL"))?;
98 3 : Ok(())
99 3 : }
100 :
101 : /// Prepare safekeeper to process append requests without crashes,
102 : /// by sending ProposerGreeting with default server.wal_seg_size.
103 3 : async fn prepare_safekeeper(
104 3 : ttid: TenantTimelineId,
105 3 : pg_version: u32,
106 3 : ) -> anyhow::Result<Arc<Timeline>> {
107 3 : GlobalTimelines::create(
108 3 : ttid,
109 3 : ServerInfo {
110 3 : pg_version,
111 3 : wal_seg_size: WAL_SEGMENT_SIZE as u32,
112 3 : system_id: 0,
113 3 : },
114 3 : Lsn::INVALID,
115 3 : Lsn::INVALID,
116 3 : )
117 17 : .await
118 3 : }
119 :
120 3 : async fn send_proposer_elected(tli: &Arc<Timeline>, term: Term, lsn: Lsn) -> anyhow::Result<()> {
121 : // add new term to existing history
122 3 : let history = tli.get_state().await.1.acceptor_state.term_history;
123 3 : let history = history.up_to(lsn.checked_sub(1u64).unwrap());
124 3 : let mut history_entries = history.0;
125 3 : history_entries.push(TermLsn { term, lsn });
126 3 : let history = TermHistory(history_entries);
127 3 :
128 3 : let proposer_elected_request = ProposerAcceptorMessage::Elected(ProposerElected {
129 3 : term,
130 3 : start_streaming_at: lsn,
131 3 : term_history: history,
132 3 : timeline_start_lsn: lsn,
133 3 : });
134 3 :
135 9682 : tli.process_msg(&proposer_elected_request).await?;
136 3 : Ok(())
137 3 : }
138 :
139 3 : #[derive(Debug, Serialize)]
140 : pub struct InsertedWAL {
141 : begin_lsn: Lsn,
142 : pub end_lsn: Lsn,
143 : append_response: AppendResponse,
144 : }
145 :
146 : /// Extend local WAL with new LogicalMessage record. To do that,
147 : /// create AppendRequest with new WAL and pass it to safekeeper.
148 3 : pub async fn append_logical_message(
149 3 : tli: &Arc<Timeline>,
150 3 : msg: &AppendLogicalMessage,
151 3 : ) -> anyhow::Result<InsertedWAL> {
152 3 : let wal_data = encode_logical_message(&msg.lm_prefix, &msg.lm_message);
153 3 : let sk_state = tli.get_state().await.1;
154 :
155 3 : let begin_lsn = msg.begin_lsn;
156 3 : let end_lsn = begin_lsn + wal_data.len() as u64;
157 :
158 3 : let commit_lsn = if msg.set_commit_lsn {
159 3 : end_lsn
160 : } else {
161 UBC 0 : sk_state.commit_lsn
162 : };
163 :
164 CBC 3 : let append_request = ProposerAcceptorMessage::AppendRequest(AppendRequest {
165 3 : h: AppendRequestHeader {
166 3 : term: msg.term,
167 3 : epoch_start_lsn: begin_lsn,
168 3 : begin_lsn,
169 3 : end_lsn,
170 3 : commit_lsn,
171 3 : truncate_lsn: msg.truncate_lsn,
172 3 : proposer_uuid: [0u8; 16],
173 3 : },
174 3 : wal_data: Bytes::from(wal_data),
175 3 : });
176 :
177 21 : let response = tli.process_msg(&append_request).await?;
178 :
179 3 : let append_response = match response {
180 3 : Some(AcceptorProposerMessage::AppendResponse(resp)) => resp,
181 UBC 0 : _ => anyhow::bail!("not AppendResponse"),
182 : };
183 :
184 CBC 3 : Ok(InsertedWAL {
185 3 : begin_lsn,
186 3 : end_lsn,
187 3 : append_response,
188 3 : })
189 3 : }
|