Line data Source code
1 : //!
2 : //! This module implements JSON_CTRL protocol, which allows exchange
3 : //! JSON messages over psql for testing purposes.
4 : //!
5 : //! Currently supports AppendLogicalMessage, which is used for WAL
6 : //! modifications in tests.
7 : //!
8 :
9 : use std::sync::Arc;
10 :
11 : use anyhow::Context;
12 : use bytes::Bytes;
13 : use postgres_backend::QueryError;
14 : use serde::{Deserialize, Serialize};
15 : use tokio::io::{AsyncRead, AsyncWrite};
16 : use tracing::*;
17 : use utils::id::TenantTimelineId;
18 :
19 : use crate::handler::SafekeeperPostgresHandler;
20 : use crate::safekeeper::{AcceptorProposerMessage, AppendResponse, ServerInfo};
21 : use crate::safekeeper::{
22 : AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, ProposerElected,
23 : };
24 : use crate::safekeeper::{Term, TermHistory, TermLsn};
25 : use crate::state::TimelinePersistentState;
26 : use crate::timeline::Timeline;
27 : use crate::GlobalTimelines;
28 : use postgres_backend::PostgresBackend;
29 : use postgres_ffi::encode_logical_message;
30 : use postgres_ffi::WAL_SEGMENT_SIZE;
31 : use pq_proto::{BeMessage, RowDescriptor, TEXT_OID};
32 : use utils::lsn::Lsn;
33 :
34 66 : #[derive(Serialize, Deserialize, Debug)]
35 : pub struct AppendLogicalMessage {
36 : // prefix and message to build LogicalMessage
37 : pub lm_prefix: String,
38 : pub lm_message: String,
39 :
40 : // if true, commit_lsn will match flush_lsn after append
41 : pub set_commit_lsn: bool,
42 :
43 : // if true, ProposerElected will be sent before append
44 : pub send_proposer_elected: bool,
45 :
46 : // fields from AppendRequestHeader
47 : pub term: Term,
48 : #[serde(with = "utils::lsn::serde_as_u64")]
49 : pub epoch_start_lsn: Lsn,
50 : #[serde(with = "utils::lsn::serde_as_u64")]
51 : pub begin_lsn: Lsn,
52 : #[serde(with = "utils::lsn::serde_as_u64")]
53 : pub truncate_lsn: Lsn,
54 : pub pg_version: u32,
55 : }
56 :
57 3 : #[derive(Debug, Serialize)]
58 : struct AppendResult {
59 : // safekeeper state after append
60 : state: TimelinePersistentState,
61 : // info about new record in the WAL
62 : inserted_wal: InsertedWAL,
63 : }
64 :
65 : /// Handles command to craft logical message WAL record with given
66 : /// content, and then append it with specified term and lsn. This
67 : /// function is used to test safekeepers in different scenarios.
68 3 : pub async fn handle_json_ctrl<IO: AsyncRead + AsyncWrite + Unpin>(
69 3 : spg: &SafekeeperPostgresHandler,
70 3 : pgb: &mut PostgresBackend<IO>,
71 3 : append_request: &AppendLogicalMessage,
72 3 : ) -> Result<(), QueryError> {
73 3 : info!("JSON_CTRL request: {append_request:?}");
74 :
75 : // need to init safekeeper state before AppendRequest
76 17 : let tli = prepare_safekeeper(spg.ttid, append_request.pg_version).await?;
77 :
78 : // if send_proposer_elected is true, we need to update local history
79 3 : if append_request.send_proposer_elected {
80 9744 : send_proposer_elected(&tli, append_request.term, append_request.epoch_start_lsn).await?;
81 0 : }
82 :
83 12 : let inserted_wal = append_logical_message(&tli, append_request).await?;
84 3 : let response = AppendResult {
85 3 : state: tli.get_state().await.1,
86 3 : inserted_wal,
87 : };
88 3 : let response_data = serde_json::to_vec(&response)
89 3 : .with_context(|| format!("Response {response:?} is not a json array"))?;
90 :
91 3 : pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor {
92 3 : name: b"json",
93 3 : typoid: TEXT_OID,
94 3 : typlen: -1,
95 3 : ..Default::default()
96 3 : }]))?
97 3 : .write_message_noflush(&BeMessage::DataRow(&[Some(&response_data)]))?
98 3 : .write_message_noflush(&BeMessage::CommandComplete(b"JSON_CTRL"))?;
99 3 : Ok(())
100 3 : }
101 :
102 : /// Prepare safekeeper to process append requests without crashes,
103 : /// by sending ProposerGreeting with default server.wal_seg_size.
104 3 : async fn prepare_safekeeper(
105 3 : ttid: TenantTimelineId,
106 3 : pg_version: u32,
107 3 : ) -> anyhow::Result<Arc<Timeline>> {
108 3 : GlobalTimelines::create(
109 3 : ttid,
110 3 : ServerInfo {
111 3 : pg_version,
112 3 : wal_seg_size: WAL_SEGMENT_SIZE as u32,
113 3 : system_id: 0,
114 3 : },
115 3 : Lsn::INVALID,
116 3 : Lsn::INVALID,
117 3 : )
118 17 : .await
119 3 : }
120 :
121 3 : async fn send_proposer_elected(tli: &Arc<Timeline>, term: Term, lsn: Lsn) -> anyhow::Result<()> {
122 : // add new term to existing history
123 3 : let history = tli.get_state().await.1.acceptor_state.term_history;
124 3 : let history = history.up_to(lsn.checked_sub(1u64).unwrap());
125 3 : let mut history_entries = history.0;
126 3 : history_entries.push(TermLsn { term, lsn });
127 3 : let history = TermHistory(history_entries);
128 3 :
129 3 : let proposer_elected_request = ProposerAcceptorMessage::Elected(ProposerElected {
130 3 : term,
131 3 : start_streaming_at: lsn,
132 3 : term_history: history,
133 3 : timeline_start_lsn: lsn,
134 3 : });
135 3 :
136 9744 : tli.process_msg(&proposer_elected_request).await?;
137 3 : Ok(())
138 3 : }
139 :
140 3 : #[derive(Debug, Serialize)]
141 : pub struct InsertedWAL {
142 : begin_lsn: Lsn,
143 : pub end_lsn: Lsn,
144 : append_response: AppendResponse,
145 : }
146 :
147 : /// Extend local WAL with new LogicalMessage record. To do that,
148 : /// create AppendRequest with new WAL and pass it to safekeeper.
149 3 : pub async fn append_logical_message(
150 3 : tli: &Arc<Timeline>,
151 3 : msg: &AppendLogicalMessage,
152 3 : ) -> anyhow::Result<InsertedWAL> {
153 3 : let wal_data = encode_logical_message(&msg.lm_prefix, &msg.lm_message);
154 3 : let sk_state = tli.get_state().await.1;
155 :
156 3 : let begin_lsn = msg.begin_lsn;
157 3 : let end_lsn = begin_lsn + wal_data.len() as u64;
158 :
159 3 : let commit_lsn = if msg.set_commit_lsn {
160 3 : end_lsn
161 : } else {
162 0 : sk_state.commit_lsn
163 : };
164 :
165 3 : let append_request = ProposerAcceptorMessage::AppendRequest(AppendRequest {
166 3 : h: AppendRequestHeader {
167 3 : term: msg.term,
168 3 : epoch_start_lsn: begin_lsn,
169 3 : begin_lsn,
170 3 : end_lsn,
171 3 : commit_lsn,
172 3 : truncate_lsn: msg.truncate_lsn,
173 3 : proposer_uuid: [0u8; 16],
174 3 : },
175 3 : wal_data: Bytes::from(wal_data),
176 3 : });
177 :
178 12 : let response = tli.process_msg(&append_request).await?;
179 :
180 3 : let append_response = match response {
181 3 : Some(AcceptorProposerMessage::AppendResponse(resp)) => resp,
182 0 : _ => anyhow::bail!("not AppendResponse"),
183 : };
184 :
185 3 : Ok(InsertedWAL {
186 3 : begin_lsn,
187 3 : end_lsn,
188 3 : append_response,
189 3 : })
190 3 : }
|