TLA Line data Source code
1 : //!
//! This module implements the JSON_CTRL protocol, which allows exchanging
//! JSON messages over psql for testing purposes.
4 : //!
5 : //! Currently supports AppendLogicalMessage, which is used for WAL
6 : //! modifications in tests.
7 : //!
8 :
9 : use std::sync::Arc;
10 :
11 : use anyhow::Context;
12 : use bytes::Bytes;
13 : use postgres_backend::QueryError;
14 : use serde::{Deserialize, Serialize};
15 : use tokio::io::{AsyncRead, AsyncWrite};
16 : use tracing::*;
17 : use utils::id::TenantTimelineId;
18 :
19 : use crate::handler::SafekeeperPostgresHandler;
20 : use crate::safekeeper::{AcceptorProposerMessage, AppendResponse, ServerInfo};
21 : use crate::safekeeper::{
22 : AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, ProposerElected,
23 : };
24 : use crate::safekeeper::{SafeKeeperState, Term, TermHistory, TermLsn};
25 : use crate::timeline::Timeline;
26 : use crate::GlobalTimelines;
27 : use postgres_backend::PostgresBackend;
28 : use postgres_ffi::encode_logical_message;
29 : use postgres_ffi::WAL_SEGMENT_SIZE;
30 : use pq_proto::{BeMessage, RowDescriptor, TEXT_OID};
31 : use utils::lsn::Lsn;
32 :
33 CBC 57 : #[derive(Serialize, Deserialize, Debug)]
34 : pub struct AppendLogicalMessage {
35 : // prefix and message to build LogicalMessage
36 : pub lm_prefix: String,
37 : pub lm_message: String,
38 :
39 : // if true, commit_lsn will match flush_lsn after append
40 : pub set_commit_lsn: bool,
41 :
42 : // if true, ProposerElected will be sent before append
43 : pub send_proposer_elected: bool,
44 :
45 : // fields from AppendRequestHeader
46 : pub term: Term,
47 : pub epoch_start_lsn: Lsn,
48 : pub begin_lsn: Lsn,
49 : pub truncate_lsn: Lsn,
50 : pub pg_version: u32,
51 : }
52 :
53 3 : #[derive(Debug, Serialize)]
54 : struct AppendResult {
55 : // safekeeper state after append
56 : state: SafeKeeperState,
57 : // info about new record in the WAL
58 : inserted_wal: InsertedWAL,
59 : }
60 :
61 : /// Handles command to craft logical message WAL record with given
62 : /// content, and then append it with specified term and lsn. This
63 : /// function is used to test safekeepers in different scenarios.
64 3 : pub async fn handle_json_ctrl<IO: AsyncRead + AsyncWrite + Unpin>(
65 3 : spg: &SafekeeperPostgresHandler,
66 3 : pgb: &mut PostgresBackend<IO>,
67 3 : append_request: &AppendLogicalMessage,
68 3 : ) -> Result<(), QueryError> {
69 3 : info!("JSON_CTRL request: {append_request:?}");
70 :
71 : // need to init safekeeper state before AppendRequest
72 17 : let tli = prepare_safekeeper(spg.ttid, append_request.pg_version).await?;
73 :
74 : // if send_proposer_elected is true, we need to update local history
75 3 : if append_request.send_proposer_elected {
76 9678 : send_proposer_elected(&tli, append_request.term, append_request.epoch_start_lsn).await?;
77 UBC 0 : }
78 :
79 CBC 18 : let inserted_wal = append_logical_message(&tli, append_request).await?;
80 3 : let response = AppendResult {
81 3 : state: tli.get_state().await.1,
82 3 : inserted_wal,
83 : };
84 3 : let response_data = serde_json::to_vec(&response)
85 3 : .with_context(|| format!("Response {response:?} is not a json array"))?;
86 :
87 3 : pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor {
88 3 : name: b"json",
89 3 : typoid: TEXT_OID,
90 3 : typlen: -1,
91 3 : ..Default::default()
92 3 : }]))?
93 3 : .write_message_noflush(&BeMessage::DataRow(&[Some(&response_data)]))?
94 3 : .write_message_noflush(&BeMessage::CommandComplete(b"JSON_CTRL"))?;
95 3 : Ok(())
96 3 : }
97 :
98 : /// Prepare safekeeper to process append requests without crashes,
99 : /// by sending ProposerGreeting with default server.wal_seg_size.
100 3 : async fn prepare_safekeeper(
101 3 : ttid: TenantTimelineId,
102 3 : pg_version: u32,
103 3 : ) -> anyhow::Result<Arc<Timeline>> {
104 3 : GlobalTimelines::create(
105 3 : ttid,
106 3 : ServerInfo {
107 3 : pg_version,
108 3 : wal_seg_size: WAL_SEGMENT_SIZE as u32,
109 3 : system_id: 0,
110 3 : },
111 3 : Lsn::INVALID,
112 3 : Lsn::INVALID,
113 3 : )
114 17 : .await
115 3 : }
116 :
117 3 : async fn send_proposer_elected(tli: &Arc<Timeline>, term: Term, lsn: Lsn) -> anyhow::Result<()> {
118 : // add new term to existing history
119 3 : let history = tli.get_state().await.1.acceptor_state.term_history;
120 3 : let history = history.up_to(lsn.checked_sub(1u64).unwrap());
121 3 : let mut history_entries = history.0;
122 3 : history_entries.push(TermLsn { term, lsn });
123 3 : let history = TermHistory(history_entries);
124 3 :
125 3 : let proposer_elected_request = ProposerAcceptorMessage::Elected(ProposerElected {
126 3 : term,
127 3 : start_streaming_at: lsn,
128 3 : term_history: history,
129 3 : timeline_start_lsn: lsn,
130 3 : });
131 3 :
132 9678 : tli.process_msg(&proposer_elected_request).await?;
133 3 : Ok(())
134 3 : }
135 :
136 3 : #[derive(Debug, Serialize)]
137 : pub struct InsertedWAL {
138 : begin_lsn: Lsn,
139 : pub end_lsn: Lsn,
140 : append_response: AppendResponse,
141 : }
142 :
143 : /// Extend local WAL with new LogicalMessage record. To do that,
144 : /// create AppendRequest with new WAL and pass it to safekeeper.
145 3 : pub async fn append_logical_message(
146 3 : tli: &Arc<Timeline>,
147 3 : msg: &AppendLogicalMessage,
148 3 : ) -> anyhow::Result<InsertedWAL> {
149 3 : let wal_data = encode_logical_message(&msg.lm_prefix, &msg.lm_message);
150 3 : let sk_state = tli.get_state().await.1;
151 :
152 3 : let begin_lsn = msg.begin_lsn;
153 3 : let end_lsn = begin_lsn + wal_data.len() as u64;
154 :
155 3 : let commit_lsn = if msg.set_commit_lsn {
156 3 : end_lsn
157 : } else {
158 UBC 0 : sk_state.commit_lsn
159 : };
160 :
161 CBC 3 : let append_request = ProposerAcceptorMessage::AppendRequest(AppendRequest {
162 3 : h: AppendRequestHeader {
163 3 : term: msg.term,
164 3 : epoch_start_lsn: begin_lsn,
165 3 : begin_lsn,
166 3 : end_lsn,
167 3 : commit_lsn,
168 3 : truncate_lsn: msg.truncate_lsn,
169 3 : proposer_uuid: [0u8; 16],
170 3 : },
171 3 : wal_data: Bytes::from(wal_data),
172 3 : });
173 :
174 18 : let response = tli.process_msg(&append_request).await?;
175 :
176 3 : let append_response = match response {
177 3 : Some(AcceptorProposerMessage::AppendResponse(resp)) => resp,
178 UBC 0 : _ => anyhow::bail!("not AppendResponse"),
179 : };
180 :
181 CBC 3 : Ok(InsertedWAL {
182 3 : begin_lsn,
183 3 : end_lsn,
184 3 : append_response,
185 3 : })
186 3 : }
|