Line data Source code
1 : #![allow(clippy::todo)]
2 :
3 : use std::ffi::CString;
4 :
5 : use crate::{
6 : api_bindings::{create_api, take_vec_u8, Level},
7 : bindings::{
8 : NeonWALReadResult, Safekeeper, WalProposer, WalProposerBroadcast, WalProposerConfig,
9 : WalProposerCreate, WalProposerFree, WalProposerPoll, WalProposerStart,
10 : },
11 : };
12 : use postgres_ffi::WAL_SEGMENT_SIZE;
13 : use utils::{id::TenantTimelineId, lsn::Lsn};
14 :
15 : /// Rust high-level wrapper for C walproposer API. Many methods are not required
16 : /// for simple cases, hence todo!() in default implementations.
17 : ///
18 : /// Refer to `pgxn/neon/walproposer.h` for documentation.
19 : pub trait ApiImpl {
20 0 : fn get_shmem_state(&self) -> *mut crate::bindings::WalproposerShmemState {
21 0 : todo!()
22 : }
23 :
24 0 : fn start_streaming(&self, _startpos: u64, _callback: &StreamingCallback) {
25 0 : todo!()
26 : }
27 :
28 0 : fn get_flush_rec_ptr(&self) -> u64 {
29 0 : todo!()
30 : }
31 :
32 0 : fn update_donor(&self, _donor: &mut Safekeeper, _donor_lsn: u64) {
33 0 : todo!()
34 : }
35 :
36 0 : fn get_current_timestamp(&self) -> i64 {
37 0 : todo!()
38 : }
39 :
40 0 : fn conn_error_message(&self, _sk: &mut Safekeeper) -> String {
41 0 : todo!()
42 : }
43 :
44 0 : fn conn_status(&self, _sk: &mut Safekeeper) -> crate::bindings::WalProposerConnStatusType {
45 0 : todo!()
46 : }
47 :
48 0 : fn conn_connect_start(&self, _sk: &mut Safekeeper) {
49 0 : todo!()
50 : }
51 :
52 0 : fn conn_connect_poll(
53 0 : &self,
54 0 : _sk: &mut Safekeeper,
55 0 : ) -> crate::bindings::WalProposerConnectPollStatusType {
56 0 : todo!()
57 : }
58 :
59 0 : fn conn_send_query(&self, _sk: &mut Safekeeper, _query: &str) -> bool {
60 0 : todo!()
61 : }
62 :
63 0 : fn conn_get_query_result(
64 0 : &self,
65 0 : _sk: &mut Safekeeper,
66 0 : ) -> crate::bindings::WalProposerExecStatusType {
67 0 : todo!()
68 : }
69 :
70 0 : fn conn_flush(&self, _sk: &mut Safekeeper) -> i32 {
71 0 : todo!()
72 : }
73 :
74 0 : fn conn_finish(&self, _sk: &mut Safekeeper) {
75 0 : todo!()
76 : }
77 :
78 0 : fn conn_async_read(
79 0 : &self,
80 0 : _sk: &mut Safekeeper,
81 0 : _vec: &mut Vec<u8>,
82 0 : ) -> crate::bindings::PGAsyncReadResult {
83 0 : todo!()
84 : }
85 :
86 0 : fn conn_async_write(
87 0 : &self,
88 0 : _sk: &mut Safekeeper,
89 0 : _buf: &[u8],
90 0 : ) -> crate::bindings::PGAsyncWriteResult {
91 0 : todo!()
92 : }
93 :
94 0 : fn conn_blocking_write(&self, _sk: &mut Safekeeper, _buf: &[u8]) -> bool {
95 0 : todo!()
96 : }
97 :
98 0 : fn recovery_download(&self, _wp: &mut WalProposer, _sk: &mut Safekeeper) -> bool {
99 0 : todo!()
100 : }
101 :
102 0 : fn wal_reader_allocate(&self, _sk: &mut Safekeeper) -> NeonWALReadResult {
103 0 : todo!()
104 : }
105 :
106 0 : fn wal_read(&self, _sk: &mut Safekeeper, _buf: &mut [u8], _startpos: u64) -> NeonWALReadResult {
107 0 : todo!()
108 : }
109 :
110 0 : fn wal_reader_events(&self, _sk: &mut Safekeeper) -> u32 {
111 0 : todo!()
112 : }
113 :
114 0 : fn init_event_set(&self, _wp: &mut WalProposer) {
115 0 : todo!()
116 : }
117 :
118 0 : fn update_event_set(&self, _sk: &mut Safekeeper, _events_mask: u32) {
119 0 : todo!()
120 : }
121 :
122 0 : fn active_state_update_event_set(&self, _sk: &mut Safekeeper) {
123 0 : todo!()
124 : }
125 :
126 0 : fn add_safekeeper_event_set(&self, _sk: &mut Safekeeper, _events_mask: u32) {
127 0 : todo!()
128 : }
129 :
130 0 : fn rm_safekeeper_event_set(&self, _sk: &mut Safekeeper) {
131 0 : todo!()
132 : }
133 :
134 0 : fn wait_event_set(&self, _wp: &mut WalProposer, _timeout_millis: i64) -> WaitResult {
135 0 : todo!()
136 : }
137 :
138 0 : fn strong_random(&self, _buf: &mut [u8]) -> bool {
139 0 : todo!()
140 : }
141 :
142 0 : fn get_redo_start_lsn(&self) -> u64 {
143 0 : todo!()
144 : }
145 :
146 0 : fn finish_sync_safekeepers(&self, _lsn: u64) {
147 0 : todo!()
148 : }
149 :
150 0 : fn process_safekeeper_feedback(&mut self, _wp: &mut WalProposer, _sk: &mut Safekeeper) {
151 0 : todo!()
152 : }
153 :
154 0 : fn log_internal(&self, _wp: &mut WalProposer, _level: Level, _msg: &str) {
155 0 : todo!()
156 : }
157 :
158 0 : fn after_election(&self, _wp: &mut WalProposer) {
159 0 : todo!()
160 : }
161 : }
162 :
163 : #[derive(Debug)]
164 : pub enum WaitResult {
165 : Latch,
166 : Timeout,
167 : Network(*mut Safekeeper, u32),
168 : }
169 :
170 : #[derive(Clone)]
171 : pub struct Config {
172 : /// Tenant and timeline id
173 : pub ttid: TenantTimelineId,
174 : /// List of safekeepers in format `host:port`
175 : pub safekeepers_list: Vec<String>,
176 : /// Safekeeper reconnect timeout in milliseconds
177 : pub safekeeper_reconnect_timeout: i32,
178 : /// Safekeeper connection timeout in milliseconds
179 : pub safekeeper_connection_timeout: i32,
180 : /// walproposer mode, finish when all safekeepers are synced or subscribe
181 : /// to WAL streaming
182 : pub sync_safekeepers: bool,
183 : }
184 :
185 : /// WalProposer main struct. C methods are reexported as Rust functions.
186 : pub struct Wrapper {
187 : wp: *mut WalProposer,
188 : _safekeepers_list_vec: Vec<u8>,
189 : }
190 :
191 : impl Wrapper {
192 36264 : pub fn new(api: Box<dyn ApiImpl>, config: Config) -> Wrapper {
193 36264 : let neon_tenant = CString::new(config.ttid.tenant_id.to_string())
194 36264 : .unwrap()
195 36264 : .into_raw();
196 36264 : let neon_timeline = CString::new(config.ttid.timeline_id.to_string())
197 36264 : .unwrap()
198 36264 : .into_raw();
199 36264 :
200 36264 : let mut safekeepers_list_vec = CString::new(config.safekeepers_list.join(","))
201 36264 : .unwrap()
202 36264 : .into_bytes_with_nul();
203 36264 : assert!(safekeepers_list_vec.len() == safekeepers_list_vec.capacity());
204 36264 : let safekeepers_list = safekeepers_list_vec.as_mut_ptr() as *mut std::ffi::c_char;
205 36264 :
206 36264 : let callback_data = Box::into_raw(Box::new(api)) as *mut ::std::os::raw::c_void;
207 36264 :
208 36264 : let c_config = WalProposerConfig {
209 36264 : neon_tenant,
210 36264 : neon_timeline,
211 36264 : safekeepers_list,
212 36264 : safekeeper_reconnect_timeout: config.safekeeper_reconnect_timeout,
213 36264 : safekeeper_connection_timeout: config.safekeeper_connection_timeout,
214 36264 : wal_segment_size: WAL_SEGMENT_SIZE as i32, // default 16MB
215 36264 : syncSafekeepers: config.sync_safekeepers,
216 36264 : systemId: 0,
217 36264 : pgTimeline: 1,
218 36264 : callback_data,
219 36264 : };
220 36264 : let c_config = Box::into_raw(Box::new(c_config));
221 36264 :
222 36264 : let api = create_api();
223 36264 : let wp = unsafe { WalProposerCreate(c_config, api) };
224 36264 : Wrapper {
225 36264 : wp,
226 36264 : _safekeepers_list_vec: safekeepers_list_vec,
227 36264 : }
228 36264 : }
229 :
230 36264 : pub fn start(&self) {
231 36264 : unsafe { WalProposerStart(self.wp) }
232 36264 : }
233 : }
234 :
235 : impl Drop for Wrapper {
236 36262 : fn drop(&mut self) {
237 36262 : unsafe {
238 36262 : let config = (*self.wp).config;
239 36262 : drop(Box::from_raw(
240 36262 : (*config).callback_data as *mut Box<dyn ApiImpl>,
241 36262 : ));
242 36262 : drop(CString::from_raw((*config).neon_tenant));
243 36262 : drop(CString::from_raw((*config).neon_timeline));
244 36262 : drop(Box::from_raw(config));
245 :
246 108784 : for i in 0..(*self.wp).n_safekeepers {
247 108784 : let sk = &mut (*self.wp).safekeeper[i as usize];
248 108784 : take_vec_u8(&mut sk.inbuf);
249 108784 : }
250 :
251 36262 : WalProposerFree(self.wp);
252 36262 : }
253 36262 : }
254 : }
255 :
256 : pub struct StreamingCallback {
257 : wp: *mut WalProposer,
258 : }
259 :
260 : impl StreamingCallback {
261 686 : pub fn new(wp: *mut WalProposer) -> StreamingCallback {
262 686 : StreamingCallback { wp }
263 686 : }
264 :
265 1533 : pub fn broadcast(&self, startpos: Lsn, endpos: Lsn) {
266 1533 : unsafe { WalProposerBroadcast(self.wp, startpos.0, endpos.0) }
267 1533 : }
268 :
269 1533 : pub fn poll(&self) {
270 1533 : unsafe { WalProposerPoll(self.wp) }
271 1533 : }
272 : }
273 :
274 : #[cfg(test)]
275 : mod tests {
276 : use core::panic;
277 : use std::{
278 : cell::Cell,
279 : sync::{atomic::AtomicUsize, mpsc::sync_channel},
280 : };
281 :
282 : use std::cell::UnsafeCell;
283 : use utils::id::TenantTimelineId;
284 :
285 : use crate::{
286 : api_bindings::Level,
287 : bindings::{NeonWALReadResult, PG_VERSION_NUM},
288 : walproposer::Wrapper,
289 : };
290 :
291 : use super::ApiImpl;
292 :
293 : #[derive(Clone, Copy, Debug)]
294 : struct WaitEventsData {
295 : sk: *mut crate::bindings::Safekeeper,
296 : event_mask: u32,
297 : }
298 :
299 : struct MockImpl {
300 : // data to return from wait_event_set
301 : wait_events: Cell<WaitEventsData>,
302 : // walproposer->safekeeper messages
303 : expected_messages: Vec<Vec<u8>>,
304 : expected_ptr: AtomicUsize,
305 : // safekeeper->walproposer messages
306 : safekeeper_replies: Vec<Vec<u8>>,
307 : replies_ptr: AtomicUsize,
308 : // channel to send LSN to the main thread
309 : sync_channel: std::sync::mpsc::SyncSender<u64>,
310 : // Shmem state, used for storing donor info
311 : shmem: UnsafeCell<crate::bindings::WalproposerShmemState>,
312 : }
313 :
314 : impl MockImpl {
315 2 : fn check_walproposer_msg(&self, msg: &[u8]) {
316 2 : let ptr = self
317 2 : .expected_ptr
318 2 : .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
319 2 :
320 2 : if ptr >= self.expected_messages.len() {
321 0 : panic!("unexpected message from walproposer");
322 2 : }
323 2 :
324 2 : let expected_msg = &self.expected_messages[ptr];
325 2 : assert_eq!(msg, expected_msg.as_slice());
326 2 : }
327 :
328 2 : fn next_safekeeper_reply(&self) -> &[u8] {
329 2 : let ptr = self
330 2 : .replies_ptr
331 2 : .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
332 2 :
333 2 : if ptr >= self.safekeeper_replies.len() {
334 0 : panic!("no more safekeeper replies");
335 2 : }
336 2 :
337 2 : &self.safekeeper_replies[ptr]
338 2 : }
339 : }
340 :
341 : impl ApiImpl for MockImpl {
342 2 : fn get_shmem_state(&self) -> *mut crate::bindings::WalproposerShmemState {
343 2 : self.shmem.get()
344 2 : }
345 :
346 14 : fn get_current_timestamp(&self) -> i64 {
347 14 : println!("get_current_timestamp");
348 14 : 0
349 14 : }
350 :
351 0 : fn update_donor(&self, donor: &mut crate::bindings::Safekeeper, donor_lsn: u64) {
352 0 : let mut shmem = unsafe { *self.get_shmem_state() };
353 0 : shmem.propEpochStartLsn.value = donor_lsn;
354 0 : shmem.donor_conninfo = donor.conninfo;
355 0 : shmem.donor_lsn = donor_lsn;
356 0 : }
357 :
358 1 : fn conn_status(
359 1 : &self,
360 1 : _: &mut crate::bindings::Safekeeper,
361 1 : ) -> crate::bindings::WalProposerConnStatusType {
362 1 : println!("conn_status");
363 1 : crate::bindings::WalProposerConnStatusType_WP_CONNECTION_OK
364 1 : }
365 :
366 1 : fn conn_connect_start(&self, _: &mut crate::bindings::Safekeeper) {
367 1 : println!("conn_connect_start");
368 1 : }
369 :
370 1 : fn conn_connect_poll(
371 1 : &self,
372 1 : _: &mut crate::bindings::Safekeeper,
373 1 : ) -> crate::bindings::WalProposerConnectPollStatusType {
374 1 : println!("conn_connect_poll");
375 1 : crate::bindings::WalProposerConnectPollStatusType_WP_CONN_POLLING_OK
376 1 : }
377 :
378 1 : fn conn_send_query(&self, _: &mut crate::bindings::Safekeeper, query: &str) -> bool {
379 1 : println!("conn_send_query: {}", query);
380 1 : true
381 1 : }
382 :
383 1 : fn conn_get_query_result(
384 1 : &self,
385 1 : _: &mut crate::bindings::Safekeeper,
386 1 : ) -> crate::bindings::WalProposerExecStatusType {
387 1 : println!("conn_get_query_result");
388 1 : crate::bindings::WalProposerExecStatusType_WP_EXEC_SUCCESS_COPYBOTH
389 1 : }
390 :
391 2 : fn conn_async_read(
392 2 : &self,
393 2 : _: &mut crate::bindings::Safekeeper,
394 2 : vec: &mut Vec<u8>,
395 2 : ) -> crate::bindings::PGAsyncReadResult {
396 2 : println!("conn_async_read");
397 2 : let reply = self.next_safekeeper_reply();
398 2 : println!("conn_async_read result: {:?}", reply);
399 2 : vec.extend_from_slice(reply);
400 2 : crate::bindings::PGAsyncReadResult_PG_ASYNC_READ_SUCCESS
401 2 : }
402 :
403 2 : fn conn_blocking_write(&self, _: &mut crate::bindings::Safekeeper, buf: &[u8]) -> bool {
404 2 : println!("conn_blocking_write: {:?}", buf);
405 2 : self.check_walproposer_msg(buf);
406 2 : true
407 2 : }
408 :
409 1 : fn recovery_download(
410 1 : &self,
411 1 : _wp: &mut crate::bindings::WalProposer,
412 1 : _sk: &mut crate::bindings::Safekeeper,
413 1 : ) -> bool {
414 1 : true
415 1 : }
416 :
417 0 : fn wal_reader_allocate(&self, _: &mut crate::bindings::Safekeeper) -> NeonWALReadResult {
418 0 : println!("wal_reader_allocate");
419 0 : crate::bindings::NeonWALReadResult_NEON_WALREAD_SUCCESS
420 0 : }
421 :
422 1 : fn init_event_set(&self, _: &mut crate::bindings::WalProposer) {
423 1 : println!("init_event_set")
424 1 : }
425 :
426 4 : fn update_event_set(&self, sk: &mut crate::bindings::Safekeeper, event_mask: u32) {
427 4 : println!(
428 4 : "update_event_set, sk={:?}, events_mask={:#b}",
429 4 : sk as *mut crate::bindings::Safekeeper, event_mask
430 4 : );
431 4 : self.wait_events.set(WaitEventsData { sk, event_mask });
432 4 : }
433 :
434 2 : fn add_safekeeper_event_set(&self, sk: &mut crate::bindings::Safekeeper, event_mask: u32) {
435 2 : println!(
436 2 : "add_safekeeper_event_set, sk={:?}, events_mask={:#b}",
437 2 : sk as *mut crate::bindings::Safekeeper, event_mask
438 2 : );
439 2 : self.wait_events.set(WaitEventsData { sk, event_mask });
440 2 : }
441 :
442 1 : fn rm_safekeeper_event_set(&self, sk: &mut crate::bindings::Safekeeper) {
443 1 : println!(
444 1 : "rm_safekeeper_event_set, sk={:?}",
445 1 : sk as *mut crate::bindings::Safekeeper
446 1 : );
447 1 : }
448 :
449 4 : fn wait_event_set(
450 4 : &self,
451 4 : _: &mut crate::bindings::WalProposer,
452 4 : timeout_millis: i64,
453 4 : ) -> super::WaitResult {
454 4 : let data = self.wait_events.get();
455 4 : println!(
456 4 : "wait_event_set, timeout_millis={}, res={:?}",
457 4 : timeout_millis, data
458 4 : );
459 4 : super::WaitResult::Network(data.sk, data.event_mask)
460 4 : }
461 :
462 1 : fn strong_random(&self, buf: &mut [u8]) -> bool {
463 1 : println!("strong_random");
464 1 : buf.fill(0);
465 1 : true
466 1 : }
467 :
468 1 : fn finish_sync_safekeepers(&self, lsn: u64) {
469 1 : self.sync_channel.send(lsn).unwrap();
470 1 : panic!("sync safekeepers finished at lsn={}", lsn);
471 : }
472 :
473 7 : fn log_internal(&self, _wp: &mut crate::bindings::WalProposer, level: Level, msg: &str) {
474 7 : println!("wp_log[{}] {}", level, msg);
475 7 : }
476 :
477 1 : fn after_election(&self, _wp: &mut crate::bindings::WalProposer) {
478 1 : println!("after_election");
479 1 : }
480 : }
481 :
482 : /// Test that walproposer can successfully connect to safekeeper and finish
483 : /// sync_safekeepers. API is mocked in MockImpl.
484 : ///
485 : /// Run this test with valgrind to detect leaks:
486 : /// `valgrind --leak-check=full target/debug/deps/walproposer-<build>`
487 : #[test]
488 1 : fn test_simple_sync_safekeepers() -> anyhow::Result<()> {
489 1 : let ttid = TenantTimelineId::new(
490 1 : "9e4c8f36063c6c6e93bc20d65a820f3d".parse()?,
491 1 : "9e4c8f36063c6c6e93bc20d65a820f3d".parse()?,
492 : );
493 :
494 1 : let (sender, receiver) = sync_channel(1);
495 1 :
496 1 : // Messages definitions are at walproposer.h
497 1 : // xxx: it would be better to extract them from safekeeper crate and
498 1 : // use serialization/deserialization here.
499 1 : let greeting_tag = (b'g' as u64).to_ne_bytes();
500 1 : let proto_version = 2_u32.to_ne_bytes();
501 1 : let pg_version: [u8; 4] = PG_VERSION_NUM.to_ne_bytes();
502 1 : let proposer_id = [0; 16];
503 1 : let system_id = 0_u64.to_ne_bytes();
504 1 : let tenant_id = ttid.tenant_id.as_arr();
505 1 : let timeline_id = ttid.timeline_id.as_arr();
506 1 : let pg_tli = 1_u32.to_ne_bytes();
507 1 : let wal_seg_size = 16777216_u32.to_ne_bytes();
508 1 : let proposer_greeting = [
509 1 : greeting_tag.as_slice(),
510 1 : proto_version.as_slice(),
511 1 : pg_version.as_slice(),
512 1 : proposer_id.as_slice(),
513 1 : system_id.as_slice(),
514 1 : tenant_id.as_slice(),
515 1 : timeline_id.as_slice(),
516 1 : pg_tli.as_slice(),
517 1 : wal_seg_size.as_slice(),
518 1 : ]
519 1 : .concat();
520 1 :
521 1 : let voting_tag = (b'v' as u64).to_ne_bytes();
522 1 : let vote_request_term = 3_u64.to_ne_bytes();
523 1 : let proposer_id = [0; 16];
524 1 : let vote_request = [
525 1 : voting_tag.as_slice(),
526 1 : vote_request_term.as_slice(),
527 1 : proposer_id.as_slice(),
528 1 : ]
529 1 : .concat();
530 1 :
531 1 : let acceptor_greeting_term = 2_u64.to_ne_bytes();
532 1 : let acceptor_greeting_node_id = 1_u64.to_ne_bytes();
533 1 : let acceptor_greeting = [
534 1 : greeting_tag.as_slice(),
535 1 : acceptor_greeting_term.as_slice(),
536 1 : acceptor_greeting_node_id.as_slice(),
537 1 : ]
538 1 : .concat();
539 1 :
540 1 : let vote_response_term = 3_u64.to_ne_bytes();
541 1 : let vote_given = 1_u64.to_ne_bytes();
542 1 : let flush_lsn = 0x539_u64.to_ne_bytes();
543 1 : let truncate_lsn = 0x539_u64.to_ne_bytes();
544 1 : let th_len = 1_u32.to_ne_bytes();
545 1 : let th_term = 2_u64.to_ne_bytes();
546 1 : let th_lsn = 0x539_u64.to_ne_bytes();
547 1 : let timeline_start_lsn = 0x539_u64.to_ne_bytes();
548 1 : let vote_response = [
549 1 : voting_tag.as_slice(),
550 1 : vote_response_term.as_slice(),
551 1 : vote_given.as_slice(),
552 1 : flush_lsn.as_slice(),
553 1 : truncate_lsn.as_slice(),
554 1 : th_len.as_slice(),
555 1 : th_term.as_slice(),
556 1 : th_lsn.as_slice(),
557 1 : timeline_start_lsn.as_slice(),
558 1 : ]
559 1 : .concat();
560 1 :
561 1 : let my_impl: Box<dyn ApiImpl> = Box::new(MockImpl {
562 1 : wait_events: Cell::new(WaitEventsData {
563 1 : sk: std::ptr::null_mut(),
564 1 : event_mask: 0,
565 1 : }),
566 1 : expected_messages: vec![proposer_greeting, vote_request],
567 1 : expected_ptr: AtomicUsize::new(0),
568 1 : safekeeper_replies: vec![acceptor_greeting, vote_response],
569 1 : replies_ptr: AtomicUsize::new(0),
570 1 : sync_channel: sender,
571 1 : shmem: UnsafeCell::new(crate::api_bindings::empty_shmem()),
572 1 : });
573 1 : let config = crate::walproposer::Config {
574 1 : ttid,
575 1 : safekeepers_list: vec!["localhost:5000".to_string()],
576 1 : safekeeper_reconnect_timeout: 1000,
577 1 : safekeeper_connection_timeout: 10000,
578 1 : sync_safekeepers: true,
579 1 : };
580 1 :
581 1 : let wp = Wrapper::new(my_impl, config);
582 1 :
583 1 : // walproposer will panic when it finishes sync_safekeepers
584 1 : std::panic::catch_unwind(|| wp.start()).unwrap_err();
585 1 : // validate the resulting LSN
586 1 : assert_eq!(receiver.try_recv(), Ok(1337));
587 1 : Ok(())
588 : // drop() will free up resources here
589 1 : }
590 : }
|