Line data Source code
1 : use std::ffi::CString;
2 :
3 : use postgres_ffi::WAL_SEGMENT_SIZE;
4 : use utils::id::TenantTimelineId;
5 :
6 : use crate::{
7 : api_bindings::{create_api, take_vec_u8, Level},
8 : bindings::{
9 : NeonWALReadResult, Safekeeper, WalProposer, WalProposerConfig, WalProposerCreate,
10 : WalProposerFree, WalProposerStart,
11 : },
12 : };
13 :
14 : /// Rust high-level wrapper for C walproposer API. Many methods are not required
15 : /// for simple cases, hence todo!() in default implementations.
16 : ///
17 : /// Refer to `pgxn/neon/walproposer.h` for documentation.
18 : pub trait ApiImpl {
19 0 : fn get_shmem_state(&self) -> &mut crate::bindings::WalproposerShmemState {
20 0 : todo!()
21 0 : }
22 :
23 0 : fn start_streaming(&self, _startpos: u64) {
24 0 : todo!()
25 0 : }
26 :
27 0 : fn get_flush_rec_ptr(&self) -> u64 {
28 0 : todo!()
29 0 : }
30 :
31 0 : fn get_current_timestamp(&self) -> i64 {
32 0 : todo!()
33 0 : }
34 :
35 0 : fn conn_error_message(&self, _sk: &mut Safekeeper) -> String {
36 0 : todo!()
37 0 : }
38 :
39 0 : fn conn_status(&self, _sk: &mut Safekeeper) -> crate::bindings::WalProposerConnStatusType {
40 0 : todo!()
41 0 : }
42 :
43 0 : fn conn_connect_start(&self, _sk: &mut Safekeeper) {
44 0 : todo!()
45 0 : }
46 :
47 0 : fn conn_connect_poll(
48 0 : &self,
49 0 : _sk: &mut Safekeeper,
50 0 : ) -> crate::bindings::WalProposerConnectPollStatusType {
51 0 : todo!()
52 0 : }
53 :
54 0 : fn conn_send_query(&self, _sk: &mut Safekeeper, _query: &str) -> bool {
55 0 : todo!()
56 0 : }
57 :
58 0 : fn conn_get_query_result(
59 0 : &self,
60 0 : _sk: &mut Safekeeper,
61 0 : ) -> crate::bindings::WalProposerExecStatusType {
62 0 : todo!()
63 0 : }
64 :
65 0 : fn conn_flush(&self, _sk: &mut Safekeeper) -> i32 {
66 0 : todo!()
67 0 : }
68 :
69 0 : fn conn_finish(&self, _sk: &mut Safekeeper) {
70 0 : todo!()
71 0 : }
72 :
73 0 : fn conn_async_read(&self, _sk: &mut Safekeeper) -> (&[u8], crate::bindings::PGAsyncReadResult) {
74 0 : todo!()
75 0 : }
76 :
77 0 : fn conn_async_write(
78 0 : &self,
79 0 : _sk: &mut Safekeeper,
80 0 : _buf: &[u8],
81 0 : ) -> crate::bindings::PGAsyncWriteResult {
82 0 : todo!()
83 0 : }
84 :
85 0 : fn conn_blocking_write(&self, _sk: &mut Safekeeper, _buf: &[u8]) -> bool {
86 0 : todo!()
87 0 : }
88 :
89 0 : fn recovery_download(&self, _wp: &mut WalProposer, _sk: &mut Safekeeper) -> bool {
90 0 : todo!()
91 0 : }
92 :
93 0 : fn wal_reader_allocate(&self, _sk: &mut Safekeeper) -> NeonWALReadResult {
94 0 : todo!()
95 0 : }
96 :
97 0 : fn wal_read(&self, _sk: &mut Safekeeper, _buf: &mut [u8], _startpos: u64) -> NeonWALReadResult {
98 0 : todo!()
99 0 : }
100 :
101 0 : fn wal_reader_events(&self, _sk: &mut Safekeeper) -> u32 {
102 0 : todo!()
103 0 : }
104 :
105 0 : fn init_event_set(&self, _wp: &mut WalProposer) {
106 0 : todo!()
107 0 : }
108 :
109 0 : fn update_event_set(&self, _sk: &mut Safekeeper, _events_mask: u32) {
110 0 : todo!()
111 0 : }
112 :
113 0 : fn active_state_update_event_set(&self, _sk: &mut Safekeeper) {
114 0 : todo!()
115 0 : }
116 :
117 0 : fn add_safekeeper_event_set(&self, _sk: &mut Safekeeper, _events_mask: u32) {
118 0 : todo!()
119 0 : }
120 :
121 0 : fn rm_safekeeper_event_set(&self, _sk: &mut Safekeeper) {
122 0 : todo!()
123 0 : }
124 :
125 0 : fn wait_event_set(&self, _wp: &mut WalProposer, _timeout_millis: i64) -> WaitResult {
126 0 : todo!()
127 0 : }
128 :
129 0 : fn strong_random(&self, _buf: &mut [u8]) -> bool {
130 0 : todo!()
131 0 : }
132 :
133 0 : fn get_redo_start_lsn(&self) -> u64 {
134 0 : todo!()
135 0 : }
136 :
137 0 : fn finish_sync_safekeepers(&self, _lsn: u64) {
138 0 : todo!()
139 0 : }
140 :
141 0 : fn process_safekeeper_feedback(&self, _wp: &mut WalProposer, _commit_lsn: u64) {
142 0 : todo!()
143 0 : }
144 :
145 0 : fn log_internal(&self, _wp: &mut WalProposer, _level: Level, _msg: &str) {
146 0 : todo!()
147 0 : }
148 :
149 0 : fn after_election(&self, _wp: &mut WalProposer) {
150 0 : todo!()
151 0 : }
152 : }
153 :
154 : pub enum WaitResult {
155 : Latch,
156 : Timeout,
157 : Network(*mut Safekeeper, u32),
158 : }
159 :
160 : pub struct Config {
161 : /// Tenant and timeline id
162 : pub ttid: TenantTimelineId,
163 : /// List of safekeepers in format `host:port`
164 : pub safekeepers_list: Vec<String>,
165 : /// Safekeeper reconnect timeout in milliseconds
166 : pub safekeeper_reconnect_timeout: i32,
167 : /// Safekeeper connection timeout in milliseconds
168 : pub safekeeper_connection_timeout: i32,
169 : /// walproposer mode, finish when all safekeepers are synced or subscribe
170 : /// to WAL streaming
171 : pub sync_safekeepers: bool,
172 : }
173 :
174 : /// WalProposer main struct. C methods are reexported as Rust functions.
175 : pub struct Wrapper {
176 : wp: *mut WalProposer,
177 : _safekeepers_list_vec: Vec<u8>,
178 : }
179 :
180 : impl Wrapper {
181 2 : pub fn new(api: Box<dyn ApiImpl>, config: Config) -> Wrapper {
182 2 : let neon_tenant = CString::new(config.ttid.tenant_id.to_string())
183 2 : .unwrap()
184 2 : .into_raw();
185 2 : let neon_timeline = CString::new(config.ttid.timeline_id.to_string())
186 2 : .unwrap()
187 2 : .into_raw();
188 2 :
189 2 : let mut safekeepers_list_vec = CString::new(config.safekeepers_list.join(","))
190 2 : .unwrap()
191 2 : .into_bytes_with_nul();
192 2 : assert!(safekeepers_list_vec.len() == safekeepers_list_vec.capacity());
193 2 : let safekeepers_list = safekeepers_list_vec.as_mut_ptr() as *mut std::ffi::c_char;
194 2 :
195 2 : let callback_data = Box::into_raw(Box::new(api)) as *mut ::std::os::raw::c_void;
196 2 :
197 2 : let c_config = WalProposerConfig {
198 2 : neon_tenant,
199 2 : neon_timeline,
200 2 : safekeepers_list,
201 2 : safekeeper_reconnect_timeout: config.safekeeper_reconnect_timeout,
202 2 : safekeeper_connection_timeout: config.safekeeper_connection_timeout,
203 2 : wal_segment_size: WAL_SEGMENT_SIZE as i32, // default 16MB
204 2 : syncSafekeepers: config.sync_safekeepers,
205 2 : systemId: 0,
206 2 : pgTimeline: 1,
207 2 : callback_data,
208 2 : };
209 2 : let c_config = Box::into_raw(Box::new(c_config));
210 2 :
211 2 : let api = create_api();
212 2 : let wp = unsafe { WalProposerCreate(c_config, api) };
213 2 : Wrapper {
214 2 : wp,
215 2 : _safekeepers_list_vec: safekeepers_list_vec,
216 2 : }
217 2 : }
218 :
219 2 : pub fn start(&self) {
220 2 : unsafe { WalProposerStart(self.wp) }
221 2 : }
222 : }
223 :
224 : impl Drop for Wrapper {
225 2 : fn drop(&mut self) {
226 2 : unsafe {
227 2 : let config = (*self.wp).config;
228 2 : drop(Box::from_raw(
229 2 : (*config).callback_data as *mut Box<dyn ApiImpl>,
230 2 : ));
231 2 : drop(CString::from_raw((*config).neon_tenant));
232 2 : drop(CString::from_raw((*config).neon_timeline));
233 2 : drop(Box::from_raw(config));
234 :
235 2 : for i in 0..(*self.wp).n_safekeepers {
236 2 : let sk = &mut (*self.wp).safekeeper[i as usize];
237 2 : take_vec_u8(&mut sk.inbuf);
238 2 : }
239 :
240 2 : WalProposerFree(self.wp);
241 2 : }
242 2 : }
243 : }
244 :
245 : #[cfg(test)]
246 : mod tests {
247 : use core::panic;
248 : use std::{
249 : cell::Cell,
250 : sync::{atomic::AtomicUsize, mpsc::sync_channel},
251 : };
252 :
253 : use utils::id::TenantTimelineId;
254 :
255 : use crate::{api_bindings::Level, bindings::NeonWALReadResult, walproposer::Wrapper};
256 :
257 : use super::ApiImpl;
258 :
259 8 : #[derive(Clone, Copy, Debug)]
260 : struct WaitEventsData {
261 : sk: *mut crate::bindings::Safekeeper,
262 : event_mask: u32,
263 : }
264 :
265 : struct MockImpl {
266 : // data to return from wait_event_set
267 : wait_events: Cell<WaitEventsData>,
268 : // walproposer->safekeeper messages
269 : expected_messages: Vec<Vec<u8>>,
270 : expected_ptr: AtomicUsize,
271 : // safekeeper->walproposer messages
272 : safekeeper_replies: Vec<Vec<u8>>,
273 : replies_ptr: AtomicUsize,
274 : // channel to send LSN to the main thread
275 : sync_channel: std::sync::mpsc::SyncSender<u64>,
276 : }
277 :
278 : impl MockImpl {
279 4 : fn check_walproposer_msg(&self, msg: &[u8]) {
280 4 : let ptr = self
281 4 : .expected_ptr
282 4 : .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
283 4 :
284 4 : if ptr >= self.expected_messages.len() {
285 0 : panic!("unexpected message from walproposer");
286 4 : }
287 4 :
288 4 : let expected_msg = &self.expected_messages[ptr];
289 4 : assert_eq!(msg, expected_msg.as_slice());
290 4 : }
291 :
292 4 : fn next_safekeeper_reply(&self) -> &[u8] {
293 4 : let ptr = self
294 4 : .replies_ptr
295 4 : .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
296 4 :
297 4 : if ptr >= self.safekeeper_replies.len() {
298 0 : panic!("no more safekeeper replies");
299 4 : }
300 4 :
301 4 : &self.safekeeper_replies[ptr]
302 4 : }
303 : }
304 :
305 : impl ApiImpl for MockImpl {
306 28 : fn get_current_timestamp(&self) -> i64 {
307 28 : println!("get_current_timestamp");
308 28 : 0
309 28 : }
310 :
311 2 : fn conn_status(
312 2 : &self,
313 2 : _: &mut crate::bindings::Safekeeper,
314 2 : ) -> crate::bindings::WalProposerConnStatusType {
315 2 : println!("conn_status");
316 2 : crate::bindings::WalProposerConnStatusType_WP_CONNECTION_OK
317 2 : }
318 :
319 2 : fn conn_connect_start(&self, _: &mut crate::bindings::Safekeeper) {
320 2 : println!("conn_connect_start");
321 2 : }
322 :
323 2 : fn conn_connect_poll(
324 2 : &self,
325 2 : _: &mut crate::bindings::Safekeeper,
326 2 : ) -> crate::bindings::WalProposerConnectPollStatusType {
327 2 : println!("conn_connect_poll");
328 2 : crate::bindings::WalProposerConnectPollStatusType_WP_CONN_POLLING_OK
329 2 : }
330 :
331 2 : fn conn_send_query(&self, _: &mut crate::bindings::Safekeeper, query: &str) -> bool {
332 2 : println!("conn_send_query: {}", query);
333 2 : true
334 2 : }
335 :
336 2 : fn conn_get_query_result(
337 2 : &self,
338 2 : _: &mut crate::bindings::Safekeeper,
339 2 : ) -> crate::bindings::WalProposerExecStatusType {
340 2 : println!("conn_get_query_result");
341 2 : crate::bindings::WalProposerExecStatusType_WP_EXEC_SUCCESS_COPYBOTH
342 2 : }
343 :
344 4 : fn conn_async_read(
345 4 : &self,
346 4 : _: &mut crate::bindings::Safekeeper,
347 4 : ) -> (&[u8], crate::bindings::PGAsyncReadResult) {
348 4 : println!("conn_async_read");
349 4 : let reply = self.next_safekeeper_reply();
350 4 : println!("conn_async_read result: {:?}", reply);
351 4 : (
352 4 : reply,
353 4 : crate::bindings::PGAsyncReadResult_PG_ASYNC_READ_SUCCESS,
354 4 : )
355 4 : }
356 :
357 4 : fn conn_blocking_write(&self, _: &mut crate::bindings::Safekeeper, buf: &[u8]) -> bool {
358 4 : println!("conn_blocking_write: {:?}", buf);
359 4 : self.check_walproposer_msg(buf);
360 4 : true
361 4 : }
362 :
363 2 : fn recovery_download(
364 2 : &self,
365 2 : _wp: &mut crate::bindings::WalProposer,
366 2 : _sk: &mut crate::bindings::Safekeeper,
367 2 : ) -> bool {
368 2 : true
369 2 : }
370 :
371 0 : fn wal_reader_allocate(&self, _: &mut crate::bindings::Safekeeper) -> NeonWALReadResult {
372 0 : println!("wal_reader_allocate");
373 0 : crate::bindings::NeonWALReadResult_NEON_WALREAD_SUCCESS
374 0 : }
375 :
376 2 : fn init_event_set(&self, _: &mut crate::bindings::WalProposer) {
377 2 : println!("init_event_set")
378 2 : }
379 :
380 8 : fn update_event_set(&self, sk: &mut crate::bindings::Safekeeper, event_mask: u32) {
381 8 : println!(
382 8 : "update_event_set, sk={:?}, events_mask={:#b}",
383 8 : sk as *mut crate::bindings::Safekeeper, event_mask
384 8 : );
385 8 : self.wait_events.set(WaitEventsData { sk, event_mask });
386 8 : }
387 :
388 4 : fn add_safekeeper_event_set(&self, sk: &mut crate::bindings::Safekeeper, event_mask: u32) {
389 4 : println!(
390 4 : "add_safekeeper_event_set, sk={:?}, events_mask={:#b}",
391 4 : sk as *mut crate::bindings::Safekeeper, event_mask
392 4 : );
393 4 : self.wait_events.set(WaitEventsData { sk, event_mask });
394 4 : }
395 :
396 2 : fn rm_safekeeper_event_set(&self, sk: &mut crate::bindings::Safekeeper) {
397 2 : println!(
398 2 : "rm_safekeeper_event_set, sk={:?}",
399 2 : sk as *mut crate::bindings::Safekeeper
400 2 : );
401 2 : }
402 :
403 8 : fn wait_event_set(
404 8 : &self,
405 8 : _: &mut crate::bindings::WalProposer,
406 8 : timeout_millis: i64,
407 8 : ) -> super::WaitResult {
408 8 : let data = self.wait_events.get();
409 8 : println!(
410 8 : "wait_event_set, timeout_millis={}, res={:?}",
411 8 : timeout_millis, data
412 8 : );
413 8 : super::WaitResult::Network(data.sk, data.event_mask)
414 8 : }
415 :
416 2 : fn strong_random(&self, buf: &mut [u8]) -> bool {
417 2 : println!("strong_random");
418 2 : buf.fill(0);
419 2 : true
420 2 : }
421 :
422 2 : fn finish_sync_safekeepers(&self, lsn: u64) {
423 2 : self.sync_channel.send(lsn).unwrap();
424 2 : panic!("sync safekeepers finished at lsn={}", lsn);
425 : }
426 :
427 14 : fn log_internal(&self, _wp: &mut crate::bindings::WalProposer, level: Level, msg: &str) {
428 14 : println!("wp_log[{}] {}", level, msg);
429 14 : }
430 :
431 0 : fn after_election(&self, _wp: &mut crate::bindings::WalProposer) {
432 0 : println!("after_election");
433 0 : }
434 : }
435 :
/// Checks that walproposer can connect to a (mocked) safekeeper and complete
/// sync_safekeepers. All API hooks are provided by `MockImpl`.
///
/// Run this test with valgrind to detect leaks:
/// `valgrind --leak-check=full target/debug/deps/walproposer-<build>`
#[test]
fn test_simple_sync_safekeepers() -> anyhow::Result<()> {
    let tenant_id = "9e4c8f36063c6c6e93bc20d65a820f3d".parse()?;
    let timeline_id = "9e4c8f36063c6c6e93bc20d65a820f3d".parse()?;
    let ttid = TenantTimelineId::new(tenant_id, timeline_id);

    let (lsn_tx, lsn_rx) = sync_channel(1);

    // ---- walproposer -> safekeeper messages ----

    // Greeting(ProposerGreeting { protocol_version: 2, pg_version: 160001, proposer_id: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], system_id: 0, timeline_id: 9e4c8f36063c6c6e93bc20d65a820f3d, tenant_id: 9e4c8f36063c6c6e93bc20d65a820f3d, tli: 1, wal_seg_size: 16777216 })
    let proposer_greeting: Vec<u8> = vec![
        103, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 1, 113, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 158, 76, 143, 54, 6, 60, 108, 110,
        147, 188, 32, 214, 90, 130, 15, 61, 158, 76, 143, 54, 6, 60, 108, 110, 147,
        188, 32, 214, 90, 130, 15, 61, 1, 0, 0, 0, 0, 0, 0, 1,
    ];
    // VoteRequest(VoteRequest { term: 3 })
    let vote_request: Vec<u8> = vec![
        118, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0,
    ];

    // ---- safekeeper -> walproposer messages ----

    // Greeting(AcceptorGreeting { term: 2, node_id: NodeId(1) })
    let acceptor_greeting: Vec<u8> = vec![
        103, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0,
    ];
    // VoteResponse(VoteResponse { term: 3, vote_given: 1, flush_lsn: 0/539, truncate_lsn: 0/539, term_history: [(2, 0/539)], timeline_start_lsn: 0/539 })
    let vote_response: Vec<u8> = vec![
        118, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 57,
        5, 0, 0, 0, 0, 0, 0, 57, 5, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0,
        0, 57, 5, 0, 0, 0, 0, 0, 0, 57, 5, 0, 0, 0, 0, 0, 0,
    ];

    let initial_wait = WaitEventsData {
        sk: std::ptr::null_mut(),
        event_mask: 0,
    };
    let mock: Box<dyn ApiImpl> = Box::new(MockImpl {
        wait_events: Cell::new(initial_wait),
        expected_messages: vec![proposer_greeting, vote_request],
        expected_ptr: AtomicUsize::new(0),
        safekeeper_replies: vec![acceptor_greeting, vote_response],
        replies_ptr: AtomicUsize::new(0),
        sync_channel: lsn_tx,
    });

    let config = crate::walproposer::Config {
        ttid,
        safekeepers_list: vec!["localhost:5000".to_string()],
        safekeeper_reconnect_timeout: 1000,
        safekeeper_connection_timeout: 10000,
        sync_safekeepers: true,
    };

    let wp = Wrapper::new(mock, config);

    // The mock panics once sync_safekeepers finishes, so `start` must unwind.
    let outcome = std::panic::catch_unwind(|| wp.start());
    outcome.unwrap_err();

    // The LSN reported by the mock must match the one in the vote response.
    assert_eq!(lsn_rx.try_recv(), Ok(1337));
    Ok(())
    // drop() will free up resources here
}
502 : }
|