Line data Source code
1 : #![allow(clippy::todo)]
2 :
3 : use std::ffi::CString;
4 :
5 : use crate::{
6 : api_bindings::{create_api, take_vec_u8, Level},
7 : bindings::{
8 : NeonWALReadResult, Safekeeper, WalProposer, WalProposerBroadcast, WalProposerConfig,
9 : WalProposerCreate, WalProposerFree, WalProposerPoll, WalProposerStart,
10 : },
11 : };
12 : use postgres_ffi::WAL_SEGMENT_SIZE;
13 : use utils::{id::TenantTimelineId, lsn::Lsn};
14 :
15 : /// Rust high-level wrapper for C walproposer API. Many methods are not required
16 : /// for simple cases, hence todo!() in default implementations.
17 : ///
18 : /// Refer to `pgxn/neon/walproposer.h` for documentation.
19 : pub trait ApiImpl {
20 0 : fn get_shmem_state(&self) -> *mut crate::bindings::WalproposerShmemState {
21 0 : todo!()
22 : }
23 :
24 0 : fn start_streaming(&self, _startpos: u64, _callback: &StreamingCallback) {
25 0 : todo!()
26 : }
27 :
28 0 : fn get_flush_rec_ptr(&self) -> u64 {
29 0 : todo!()
30 : }
31 :
32 0 : fn update_donor(&self, _donor: &mut Safekeeper, _donor_lsn: u64) {
33 0 : todo!()
34 : }
35 :
36 0 : fn get_current_timestamp(&self) -> i64 {
37 0 : todo!()
38 : }
39 :
40 0 : fn conn_error_message(&self, _sk: &mut Safekeeper) -> String {
41 0 : todo!()
42 : }
43 :
44 0 : fn conn_status(&self, _sk: &mut Safekeeper) -> crate::bindings::WalProposerConnStatusType {
45 0 : todo!()
46 : }
47 :
48 0 : fn conn_connect_start(&self, _sk: &mut Safekeeper) {
49 0 : todo!()
50 : }
51 :
52 0 : fn conn_connect_poll(
53 0 : &self,
54 0 : _sk: &mut Safekeeper,
55 0 : ) -> crate::bindings::WalProposerConnectPollStatusType {
56 0 : todo!()
57 : }
58 :
59 0 : fn conn_send_query(&self, _sk: &mut Safekeeper, _query: &str) -> bool {
60 0 : todo!()
61 : }
62 :
63 0 : fn conn_get_query_result(
64 0 : &self,
65 0 : _sk: &mut Safekeeper,
66 0 : ) -> crate::bindings::WalProposerExecStatusType {
67 0 : todo!()
68 : }
69 :
70 0 : fn conn_flush(&self, _sk: &mut Safekeeper) -> i32 {
71 0 : todo!()
72 : }
73 :
74 0 : fn conn_finish(&self, _sk: &mut Safekeeper) {
75 0 : todo!()
76 : }
77 :
78 0 : fn conn_async_read(
79 0 : &self,
80 0 : _sk: &mut Safekeeper,
81 0 : _vec: &mut Vec<u8>,
82 0 : ) -> crate::bindings::PGAsyncReadResult {
83 0 : todo!()
84 : }
85 :
86 0 : fn conn_async_write(
87 0 : &self,
88 0 : _sk: &mut Safekeeper,
89 0 : _buf: &[u8],
90 0 : ) -> crate::bindings::PGAsyncWriteResult {
91 0 : todo!()
92 : }
93 :
94 0 : fn conn_blocking_write(&self, _sk: &mut Safekeeper, _buf: &[u8]) -> bool {
95 0 : todo!()
96 : }
97 :
98 0 : fn recovery_download(&self, _wp: &mut WalProposer, _sk: &mut Safekeeper) -> bool {
99 0 : todo!()
100 : }
101 :
102 0 : fn wal_reader_allocate(&self, _sk: &mut Safekeeper) -> NeonWALReadResult {
103 0 : todo!()
104 : }
105 :
106 0 : fn wal_read(&self, _sk: &mut Safekeeper, _buf: &mut [u8], _startpos: u64) -> NeonWALReadResult {
107 0 : todo!()
108 : }
109 :
110 0 : fn wal_reader_events(&self, _sk: &mut Safekeeper) -> u32 {
111 0 : todo!()
112 : }
113 :
114 0 : fn init_event_set(&self, _wp: &mut WalProposer) {
115 0 : todo!()
116 : }
117 :
118 0 : fn update_event_set(&self, _sk: &mut Safekeeper, _events_mask: u32) {
119 0 : todo!()
120 : }
121 :
122 0 : fn active_state_update_event_set(&self, _sk: &mut Safekeeper) {
123 0 : todo!()
124 : }
125 :
126 0 : fn add_safekeeper_event_set(&self, _sk: &mut Safekeeper, _events_mask: u32) {
127 0 : todo!()
128 : }
129 :
130 0 : fn rm_safekeeper_event_set(&self, _sk: &mut Safekeeper) {
131 0 : todo!()
132 : }
133 :
134 0 : fn wait_event_set(&self, _wp: &mut WalProposer, _timeout_millis: i64) -> WaitResult {
135 0 : todo!()
136 : }
137 :
138 0 : fn strong_random(&self, _buf: &mut [u8]) -> bool {
139 0 : todo!()
140 : }
141 :
142 0 : fn get_redo_start_lsn(&self) -> u64 {
143 0 : todo!()
144 : }
145 :
146 0 : fn finish_sync_safekeepers(&self, _lsn: u64) {
147 0 : todo!()
148 : }
149 :
150 0 : fn process_safekeeper_feedback(&mut self, _wp: &mut WalProposer, _sk: &mut Safekeeper) {
151 0 : todo!()
152 : }
153 :
154 0 : fn log_internal(&self, _wp: &mut WalProposer, _level: Level, _msg: &str) {
155 0 : todo!()
156 : }
157 :
158 0 : fn after_election(&self, _wp: &mut WalProposer) {
159 0 : todo!()
160 : }
161 : }
162 :
163 : #[derive(Debug)]
164 : pub enum WaitResult {
165 : Latch,
166 : Timeout,
167 : Network(*mut Safekeeper, u32),
168 : }
169 :
170 : #[derive(Clone)]
171 : pub struct Config {
172 : /// Tenant and timeline id
173 : pub ttid: TenantTimelineId,
174 : /// List of safekeepers in format `host:port`
175 : pub safekeepers_list: Vec<String>,
176 : /// Safekeeper reconnect timeout in milliseconds
177 : pub safekeeper_reconnect_timeout: i32,
178 : /// Safekeeper connection timeout in milliseconds
179 : pub safekeeper_connection_timeout: i32,
180 : /// walproposer mode, finish when all safekeepers are synced or subscribe
181 : /// to WAL streaming
182 : pub sync_safekeepers: bool,
183 : }
184 :
185 : /// WalProposer main struct. C methods are reexported as Rust functions.
186 : pub struct Wrapper {
187 : wp: *mut WalProposer,
188 : _safekeepers_list_vec: Vec<u8>,
189 : }
190 :
191 : impl Wrapper {
192 216624 : pub fn new(api: Box<dyn ApiImpl>, config: Config) -> Wrapper {
193 216624 : let neon_tenant = CString::new(config.ttid.tenant_id.to_string())
194 216624 : .unwrap()
195 216624 : .into_raw();
196 216624 : let neon_timeline = CString::new(config.ttid.timeline_id.to_string())
197 216624 : .unwrap()
198 216624 : .into_raw();
199 216624 :
200 216624 : let mut safekeepers_list_vec = CString::new(config.safekeepers_list.join(","))
201 216624 : .unwrap()
202 216624 : .into_bytes_with_nul();
203 216624 : assert!(safekeepers_list_vec.len() == safekeepers_list_vec.capacity());
204 216624 : let safekeepers_list = safekeepers_list_vec.as_mut_ptr() as *mut std::ffi::c_char;
205 216624 :
206 216624 : let callback_data = Box::into_raw(Box::new(api)) as *mut ::std::os::raw::c_void;
207 216624 :
208 216624 : let c_config = WalProposerConfig {
209 216624 : neon_tenant,
210 216624 : neon_timeline,
211 216624 : safekeepers_list,
212 216624 : safekeeper_reconnect_timeout: config.safekeeper_reconnect_timeout,
213 216624 : safekeeper_connection_timeout: config.safekeeper_connection_timeout,
214 216624 : wal_segment_size: WAL_SEGMENT_SIZE as i32, // default 16MB
215 216624 : syncSafekeepers: config.sync_safekeepers,
216 216624 : systemId: 0,
217 216624 : pgTimeline: 1,
218 216624 : callback_data,
219 216624 : };
220 216624 : let c_config = Box::into_raw(Box::new(c_config));
221 216624 :
222 216624 : let api = create_api();
223 216624 : let wp = unsafe { WalProposerCreate(c_config, api) };
224 216624 : Wrapper {
225 216624 : wp,
226 216624 : _safekeepers_list_vec: safekeepers_list_vec,
227 216624 : }
228 216624 : }
229 :
230 216624 : pub fn start(&self) {
231 216624 : unsafe { WalProposerStart(self.wp) }
232 216624 : }
233 : }
234 :
235 : impl Drop for Wrapper {
236 216612 : fn drop(&mut self) {
237 216612 : unsafe {
238 216612 : let config = (*self.wp).config;
239 216612 : drop(Box::from_raw(
240 216612 : (*config).callback_data as *mut Box<dyn ApiImpl>,
241 216612 : ));
242 216612 : drop(CString::from_raw((*config).neon_tenant));
243 216612 : drop(CString::from_raw((*config).neon_timeline));
244 216612 : drop(Box::from_raw(config));
245 :
246 649824 : for i in 0..(*self.wp).n_safekeepers {
247 649824 : let sk = &mut (*self.wp).safekeeper[i as usize];
248 649824 : take_vec_u8(&mut sk.inbuf);
249 649824 : }
250 :
251 216612 : WalProposerFree(self.wp);
252 216612 : }
253 216612 : }
254 : }
255 :
256 : pub struct StreamingCallback {
257 : wp: *mut WalProposer,
258 : }
259 :
260 : impl StreamingCallback {
261 3838 : pub fn new(wp: *mut WalProposer) -> StreamingCallback {
262 3838 : StreamingCallback { wp }
263 3838 : }
264 :
265 8816 : pub fn broadcast(&self, startpos: Lsn, endpos: Lsn) {
266 8816 : unsafe { WalProposerBroadcast(self.wp, startpos.0, endpos.0) }
267 8816 : }
268 :
269 8816 : pub fn poll(&self) {
270 8816 : unsafe { WalProposerPoll(self.wp) }
271 8816 : }
272 : }
273 :
274 : #[cfg(test)]
275 : mod tests {
276 : use core::panic;
277 : use std::{
278 : cell::Cell,
279 : sync::{atomic::AtomicUsize, mpsc::sync_channel},
280 : };
281 :
282 : use std::cell::UnsafeCell;
283 : use utils::id::TenantTimelineId;
284 :
285 : use crate::{
286 : api_bindings::Level,
287 : bindings::{NeonWALReadResult, PG_VERSION_NUM},
288 : walproposer::Wrapper,
289 : };
290 :
291 : use super::ApiImpl;
292 :
293 : #[derive(Clone, Copy, Debug)]
294 : struct WaitEventsData {
295 : sk: *mut crate::bindings::Safekeeper,
296 : event_mask: u32,
297 : }
298 :
299 : struct MockImpl {
300 : // data to return from wait_event_set
301 : wait_events: Cell<WaitEventsData>,
302 : // walproposer->safekeeper messages
303 : expected_messages: Vec<Vec<u8>>,
304 : expected_ptr: AtomicUsize,
305 : // safekeeper->walproposer messages
306 : safekeeper_replies: Vec<Vec<u8>>,
307 : replies_ptr: AtomicUsize,
308 : // channel to send LSN to the main thread
309 : sync_channel: std::sync::mpsc::SyncSender<u64>,
310 : // Shmem state, used for storing donor info
311 : shmem: UnsafeCell<crate::bindings::WalproposerShmemState>,
312 : }
313 :
314 : impl MockImpl {
315 12 : fn check_walproposer_msg(&self, msg: &[u8]) {
316 12 : let ptr = self
317 12 : .expected_ptr
318 12 : .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
319 12 :
320 12 : if ptr >= self.expected_messages.len() {
321 0 : panic!("unexpected message from walproposer");
322 12 : }
323 12 :
324 12 : let expected_msg = &self.expected_messages[ptr];
325 12 : assert_eq!(msg, expected_msg.as_slice());
326 12 : }
327 :
328 12 : fn next_safekeeper_reply(&self) -> &[u8] {
329 12 : let ptr = self
330 12 : .replies_ptr
331 12 : .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
332 12 :
333 12 : if ptr >= self.safekeeper_replies.len() {
334 0 : panic!("no more safekeeper replies");
335 12 : }
336 12 :
337 12 : &self.safekeeper_replies[ptr]
338 12 : }
339 : }
340 :
341 : impl ApiImpl for MockImpl {
342 12 : fn get_shmem_state(&self) -> *mut crate::bindings::WalproposerShmemState {
343 12 : self.shmem.get()
344 12 : }
345 :
346 84 : fn get_current_timestamp(&self) -> i64 {
347 84 : println!("get_current_timestamp");
348 84 : 0
349 84 : }
350 :
351 0 : fn update_donor(&self, donor: &mut crate::bindings::Safekeeper, donor_lsn: u64) {
352 0 : let mut shmem = unsafe { *self.get_shmem_state() };
353 0 : shmem.propEpochStartLsn.value = donor_lsn;
354 0 : shmem.donor_conninfo = donor.conninfo;
355 0 : shmem.donor_lsn = donor_lsn;
356 0 : }
357 :
358 6 : fn conn_status(
359 6 : &self,
360 6 : _: &mut crate::bindings::Safekeeper,
361 6 : ) -> crate::bindings::WalProposerConnStatusType {
362 6 : println!("conn_status");
363 6 : crate::bindings::WalProposerConnStatusType_WP_CONNECTION_OK
364 6 : }
365 :
366 6 : fn conn_connect_start(&self, _: &mut crate::bindings::Safekeeper) {
367 6 : println!("conn_connect_start");
368 6 : }
369 :
370 6 : fn conn_connect_poll(
371 6 : &self,
372 6 : _: &mut crate::bindings::Safekeeper,
373 6 : ) -> crate::bindings::WalProposerConnectPollStatusType {
374 6 : println!("conn_connect_poll");
375 6 : crate::bindings::WalProposerConnectPollStatusType_WP_CONN_POLLING_OK
376 6 : }
377 :
378 6 : fn conn_send_query(&self, _: &mut crate::bindings::Safekeeper, query: &str) -> bool {
379 6 : println!("conn_send_query: {}", query);
380 6 : true
381 6 : }
382 :
383 6 : fn conn_get_query_result(
384 6 : &self,
385 6 : _: &mut crate::bindings::Safekeeper,
386 6 : ) -> crate::bindings::WalProposerExecStatusType {
387 6 : println!("conn_get_query_result");
388 6 : crate::bindings::WalProposerExecStatusType_WP_EXEC_SUCCESS_COPYBOTH
389 6 : }
390 :
391 12 : fn conn_async_read(
392 12 : &self,
393 12 : _: &mut crate::bindings::Safekeeper,
394 12 : vec: &mut Vec<u8>,
395 12 : ) -> crate::bindings::PGAsyncReadResult {
396 12 : println!("conn_async_read");
397 12 : let reply = self.next_safekeeper_reply();
398 12 : println!("conn_async_read result: {:?}", reply);
399 12 : vec.extend_from_slice(reply);
400 12 : crate::bindings::PGAsyncReadResult_PG_ASYNC_READ_SUCCESS
401 12 : }
402 :
403 12 : fn conn_blocking_write(&self, _: &mut crate::bindings::Safekeeper, buf: &[u8]) -> bool {
404 12 : println!("conn_blocking_write: {:?}", buf);
405 12 : self.check_walproposer_msg(buf);
406 12 : true
407 12 : }
408 :
409 6 : fn recovery_download(
410 6 : &self,
411 6 : _wp: &mut crate::bindings::WalProposer,
412 6 : _sk: &mut crate::bindings::Safekeeper,
413 6 : ) -> bool {
414 6 : true
415 6 : }
416 :
417 0 : fn wal_reader_allocate(&self, _: &mut crate::bindings::Safekeeper) -> NeonWALReadResult {
418 0 : println!("wal_reader_allocate");
419 0 : crate::bindings::NeonWALReadResult_NEON_WALREAD_SUCCESS
420 0 : }
421 :
422 6 : fn init_event_set(&self, _: &mut crate::bindings::WalProposer) {
423 6 : println!("init_event_set")
424 6 : }
425 :
426 24 : fn update_event_set(&self, sk: &mut crate::bindings::Safekeeper, event_mask: u32) {
427 24 : println!(
428 24 : "update_event_set, sk={:?}, events_mask={:#b}",
429 24 : sk as *mut crate::bindings::Safekeeper, event_mask
430 24 : );
431 24 : self.wait_events.set(WaitEventsData { sk, event_mask });
432 24 : }
433 :
434 12 : fn add_safekeeper_event_set(&self, sk: &mut crate::bindings::Safekeeper, event_mask: u32) {
435 12 : println!(
436 12 : "add_safekeeper_event_set, sk={:?}, events_mask={:#b}",
437 12 : sk as *mut crate::bindings::Safekeeper, event_mask
438 12 : );
439 12 : self.wait_events.set(WaitEventsData { sk, event_mask });
440 12 : }
441 :
442 6 : fn rm_safekeeper_event_set(&self, sk: &mut crate::bindings::Safekeeper) {
443 6 : println!(
444 6 : "rm_safekeeper_event_set, sk={:?}",
445 6 : sk as *mut crate::bindings::Safekeeper
446 6 : );
447 6 : }
448 :
449 24 : fn wait_event_set(
450 24 : &self,
451 24 : _: &mut crate::bindings::WalProposer,
452 24 : timeout_millis: i64,
453 24 : ) -> super::WaitResult {
454 24 : let data = self.wait_events.get();
455 24 : println!(
456 24 : "wait_event_set, timeout_millis={}, res={:?}",
457 24 : timeout_millis, data
458 24 : );
459 24 : super::WaitResult::Network(data.sk, data.event_mask)
460 24 : }
461 :
462 6 : fn strong_random(&self, buf: &mut [u8]) -> bool {
463 6 : println!("strong_random");
464 6 : buf.fill(0);
465 6 : true
466 6 : }
467 :
468 6 : fn finish_sync_safekeepers(&self, lsn: u64) {
469 6 : self.sync_channel.send(lsn).unwrap();
470 6 : panic!("sync safekeepers finished at lsn={}", lsn);
471 : }
472 :
473 42 : fn log_internal(&self, _wp: &mut crate::bindings::WalProposer, level: Level, msg: &str) {
474 42 : println!("wp_log[{}] {}", level, msg);
475 42 : }
476 :
477 6 : fn after_election(&self, _wp: &mut crate::bindings::WalProposer) {
478 6 : println!("after_election");
479 6 : }
480 : }
481 :
/// Checks that walproposer connects to a single safekeeper and completes
/// sync_safekeepers, with the whole API mocked by `MockImpl`.
///
/// To look for leaks, run the test binary under valgrind:
/// `valgrind --leak-check=full target/debug/deps/walproposer-<build>`
#[test]
fn test_simple_sync_safekeepers() -> anyhow::Result<()> {
    let ttid = TenantTimelineId::new(
        "9e4c8f36063c6c6e93bc20d65a820f3d".parse()?,
        "9e4c8f36063c6c6e93bc20d65a820f3d".parse()?,
    );

    let (lsn_tx, lsn_rx) = sync_channel(1);

    // Message layouts are defined in walproposer.h
    // xxx: it would be better to extract them from safekeeper crate and
    // use serialization/deserialization here.

    // Fields shared by several messages.
    let greeting_tag = (b'g' as u64).to_ne_bytes();
    let voting_tag = (b'v' as u64).to_ne_bytes();
    let proposer_id = [0_u8; 16];

    // walproposer -> safekeeper: greeting
    let proto_version = 2_u32.to_ne_bytes();
    let pg_version: [u8; 4] = PG_VERSION_NUM.to_ne_bytes();
    let system_id = 0_u64.to_ne_bytes();
    let tenant_id = ttid.tenant_id.as_arr();
    let timeline_id = ttid.timeline_id.as_arr();
    let pg_tli = 1_u32.to_ne_bytes();
    let wal_seg_size = 16777216_u32.to_ne_bytes();
    let proposer_greeting = [
        &greeting_tag[..],
        &proto_version[..],
        &pg_version[..],
        &proposer_id[..],
        &system_id[..],
        &tenant_id[..],
        &timeline_id[..],
        &pg_tli[..],
        &wal_seg_size[..],
    ]
    .concat();

    // walproposer -> safekeeper: vote request
    let vote_request_term = 3_u64.to_ne_bytes();
    let vote_request = [&voting_tag[..], &vote_request_term[..], &proposer_id[..]].concat();

    // safekeeper -> walproposer: greeting
    let acceptor_greeting_term = 2_u64.to_ne_bytes();
    let acceptor_greeting_node_id = 1_u64.to_ne_bytes();
    let acceptor_greeting = [
        &greeting_tag[..],
        &acceptor_greeting_term[..],
        &acceptor_greeting_node_id[..],
    ]
    .concat();

    // safekeeper -> walproposer: vote response
    let vote_response_term = 3_u64.to_ne_bytes();
    let vote_given = 1_u64.to_ne_bytes();
    let flush_lsn = 0x539_u64.to_ne_bytes();
    let truncate_lsn = 0x539_u64.to_ne_bytes();
    let th_len = 1_u32.to_ne_bytes();
    let th_term = 2_u64.to_ne_bytes();
    let th_lsn = 0x539_u64.to_ne_bytes();
    let timeline_start_lsn = 0x539_u64.to_ne_bytes();
    let vote_response = [
        &voting_tag[..],
        &vote_response_term[..],
        &vote_given[..],
        &flush_lsn[..],
        &truncate_lsn[..],
        &th_len[..],
        &th_term[..],
        &th_lsn[..],
        &timeline_start_lsn[..],
    ]
    .concat();

    let mock_api: Box<dyn ApiImpl> = Box::new(MockImpl {
        wait_events: Cell::new(WaitEventsData {
            sk: std::ptr::null_mut(),
            event_mask: 0,
        }),
        expected_messages: vec![proposer_greeting, vote_request],
        expected_ptr: AtomicUsize::new(0),
        safekeeper_replies: vec![acceptor_greeting, vote_response],
        replies_ptr: AtomicUsize::new(0),
        sync_channel: lsn_tx,
        shmem: UnsafeCell::new(crate::api_bindings::empty_shmem()),
    });
    let config = crate::walproposer::Config {
        ttid,
        safekeepers_list: vec!["localhost:5000".to_string()],
        safekeeper_reconnect_timeout: 1000,
        safekeeper_connection_timeout: 10000,
        sync_safekeepers: true,
    };

    let wp = Wrapper::new(mock_api, config);

    // The mock panics once sync_safekeepers is finished, so a panic is the
    // expected way for `start` to return.
    std::panic::catch_unwind(|| wp.start()).unwrap_err();
    // 0x539 == 1337: the LSN reported in the vote response.
    assert_eq!(lsn_rx.try_recv(), Ok(1337));
    Ok(())
    // `wp` is dropped here, which releases the C-side resources
}
590 : }
|