Line data Source code
1 : use std::ffi::CString;
2 :
3 : use crate::{
4 : api_bindings::{create_api, take_vec_u8, Level},
5 : bindings::{
6 : NeonWALReadResult, Safekeeper, WalProposer, WalProposerBroadcast, WalProposerConfig,
7 : WalProposerCreate, WalProposerFree, WalProposerPoll, WalProposerStart,
8 : },
9 : };
10 : use postgres_ffi::WAL_SEGMENT_SIZE;
11 : use utils::{id::TenantTimelineId, lsn::Lsn};
12 :
13 : /// Rust high-level wrapper for C walproposer API. Many methods are not required
14 : /// for simple cases, hence todo!() in default implementations.
15 : ///
16 : /// Refer to `pgxn/neon/walproposer.h` for documentation.
17 : pub trait ApiImpl {
18 0 : fn get_shmem_state(&self) -> *mut crate::bindings::WalproposerShmemState {
19 0 : todo!()
20 : }
21 :
22 0 : fn start_streaming(&self, _startpos: u64, _callback: &StreamingCallback) {
23 0 : todo!()
24 : }
25 :
26 0 : fn get_flush_rec_ptr(&self) -> u64 {
27 0 : todo!()
28 : }
29 :
30 0 : fn update_donor(&self, _donor: &mut Safekeeper, _donor_lsn: u64) {
31 0 : todo!()
32 : }
33 :
34 0 : fn get_current_timestamp(&self) -> i64 {
35 0 : todo!()
36 : }
37 :
38 0 : fn conn_error_message(&self, _sk: &mut Safekeeper) -> String {
39 0 : todo!()
40 : }
41 :
42 0 : fn conn_status(&self, _sk: &mut Safekeeper) -> crate::bindings::WalProposerConnStatusType {
43 0 : todo!()
44 : }
45 :
46 0 : fn conn_connect_start(&self, _sk: &mut Safekeeper) {
47 0 : todo!()
48 : }
49 :
50 0 : fn conn_connect_poll(
51 0 : &self,
52 0 : _sk: &mut Safekeeper,
53 0 : ) -> crate::bindings::WalProposerConnectPollStatusType {
54 0 : todo!()
55 : }
56 :
57 0 : fn conn_send_query(&self, _sk: &mut Safekeeper, _query: &str) -> bool {
58 0 : todo!()
59 : }
60 :
61 0 : fn conn_get_query_result(
62 0 : &self,
63 0 : _sk: &mut Safekeeper,
64 0 : ) -> crate::bindings::WalProposerExecStatusType {
65 0 : todo!()
66 : }
67 :
68 0 : fn conn_flush(&self, _sk: &mut Safekeeper) -> i32 {
69 0 : todo!()
70 : }
71 :
72 0 : fn conn_finish(&self, _sk: &mut Safekeeper) {
73 0 : todo!()
74 : }
75 :
76 0 : fn conn_async_read(
77 0 : &self,
78 0 : _sk: &mut Safekeeper,
79 0 : _vec: &mut Vec<u8>,
80 0 : ) -> crate::bindings::PGAsyncReadResult {
81 0 : todo!()
82 : }
83 :
84 0 : fn conn_async_write(
85 0 : &self,
86 0 : _sk: &mut Safekeeper,
87 0 : _buf: &[u8],
88 0 : ) -> crate::bindings::PGAsyncWriteResult {
89 0 : todo!()
90 : }
91 :
92 0 : fn conn_blocking_write(&self, _sk: &mut Safekeeper, _buf: &[u8]) -> bool {
93 0 : todo!()
94 : }
95 :
96 0 : fn recovery_download(&self, _wp: &mut WalProposer, _sk: &mut Safekeeper) -> bool {
97 0 : todo!()
98 : }
99 :
100 0 : fn wal_reader_allocate(&self, _sk: &mut Safekeeper) -> NeonWALReadResult {
101 0 : todo!()
102 : }
103 :
104 0 : fn wal_read(&self, _sk: &mut Safekeeper, _buf: &mut [u8], _startpos: u64) -> NeonWALReadResult {
105 0 : todo!()
106 : }
107 :
108 0 : fn wal_reader_events(&self, _sk: &mut Safekeeper) -> u32 {
109 0 : todo!()
110 : }
111 :
112 0 : fn init_event_set(&self, _wp: &mut WalProposer) {
113 0 : todo!()
114 : }
115 :
116 0 : fn update_event_set(&self, _sk: &mut Safekeeper, _events_mask: u32) {
117 0 : todo!()
118 : }
119 :
120 0 : fn active_state_update_event_set(&self, _sk: &mut Safekeeper) {
121 0 : todo!()
122 : }
123 :
124 0 : fn add_safekeeper_event_set(&self, _sk: &mut Safekeeper, _events_mask: u32) {
125 0 : todo!()
126 : }
127 :
128 0 : fn rm_safekeeper_event_set(&self, _sk: &mut Safekeeper) {
129 0 : todo!()
130 : }
131 :
132 0 : fn wait_event_set(&self, _wp: &mut WalProposer, _timeout_millis: i64) -> WaitResult {
133 0 : todo!()
134 : }
135 :
136 0 : fn strong_random(&self, _buf: &mut [u8]) -> bool {
137 0 : todo!()
138 : }
139 :
140 0 : fn get_redo_start_lsn(&self) -> u64 {
141 0 : todo!()
142 : }
143 :
144 0 : fn finish_sync_safekeepers(&self, _lsn: u64) {
145 0 : todo!()
146 : }
147 :
148 0 : fn process_safekeeper_feedback(&mut self, _wp: &mut WalProposer, _sk: &mut Safekeeper) {
149 0 : todo!()
150 : }
151 :
152 0 : fn log_internal(&self, _wp: &mut WalProposer, _level: Level, _msg: &str) {
153 0 : todo!()
154 : }
155 :
156 0 : fn after_election(&self, _wp: &mut WalProposer) {
157 0 : todo!()
158 : }
159 : }
160 :
161 : #[derive(Debug)]
162 : pub enum WaitResult {
163 : Latch,
164 : Timeout,
165 : Network(*mut Safekeeper, u32),
166 : }
167 :
168 : #[derive(Clone)]
169 : pub struct Config {
170 : /// Tenant and timeline id
171 : pub ttid: TenantTimelineId,
172 : /// List of safekeepers in format `host:port`
173 : pub safekeepers_list: Vec<String>,
174 : /// Safekeeper reconnect timeout in milliseconds
175 : pub safekeeper_reconnect_timeout: i32,
176 : /// Safekeeper connection timeout in milliseconds
177 : pub safekeeper_connection_timeout: i32,
178 : /// walproposer mode, finish when all safekeepers are synced or subscribe
179 : /// to WAL streaming
180 : pub sync_safekeepers: bool,
181 : }
182 :
183 : /// WalProposer main struct. C methods are reexported as Rust functions.
184 : pub struct Wrapper {
185 : wp: *mut WalProposer,
186 : _safekeepers_list_vec: Vec<u8>,
187 : }
188 :
189 : impl Wrapper {
190 72630 : pub fn new(api: Box<dyn ApiImpl>, config: Config) -> Wrapper {
191 72630 : let neon_tenant = CString::new(config.ttid.tenant_id.to_string())
192 72630 : .unwrap()
193 72630 : .into_raw();
194 72630 : let neon_timeline = CString::new(config.ttid.timeline_id.to_string())
195 72630 : .unwrap()
196 72630 : .into_raw();
197 72630 :
198 72630 : let mut safekeepers_list_vec = CString::new(config.safekeepers_list.join(","))
199 72630 : .unwrap()
200 72630 : .into_bytes_with_nul();
201 72630 : assert!(safekeepers_list_vec.len() == safekeepers_list_vec.capacity());
202 72630 : let safekeepers_list = safekeepers_list_vec.as_mut_ptr() as *mut std::ffi::c_char;
203 72630 :
204 72630 : let callback_data = Box::into_raw(Box::new(api)) as *mut ::std::os::raw::c_void;
205 72630 :
206 72630 : let c_config = WalProposerConfig {
207 72630 : neon_tenant,
208 72630 : neon_timeline,
209 72630 : safekeepers_list,
210 72630 : safekeeper_reconnect_timeout: config.safekeeper_reconnect_timeout,
211 72630 : safekeeper_connection_timeout: config.safekeeper_connection_timeout,
212 72630 : wal_segment_size: WAL_SEGMENT_SIZE as i32, // default 16MB
213 72630 : syncSafekeepers: config.sync_safekeepers,
214 72630 : systemId: 0,
215 72630 : pgTimeline: 1,
216 72630 : callback_data,
217 72630 : };
218 72630 : let c_config = Box::into_raw(Box::new(c_config));
219 72630 :
220 72630 : let api = create_api();
221 72630 : let wp = unsafe { WalProposerCreate(c_config, api) };
222 72630 : Wrapper {
223 72630 : wp,
224 72630 : _safekeepers_list_vec: safekeepers_list_vec,
225 72630 : }
226 72630 : }
227 :
228 72630 : pub fn start(&self) {
229 72630 : unsafe { WalProposerStart(self.wp) }
230 72630 : }
231 : }
232 :
233 : impl Drop for Wrapper {
234 72626 : fn drop(&mut self) {
235 72626 : unsafe {
236 72626 : let config = (*self.wp).config;
237 72626 : drop(Box::from_raw(
238 72626 : (*config).callback_data as *mut Box<dyn ApiImpl>,
239 72626 : ));
240 72626 : drop(CString::from_raw((*config).neon_tenant));
241 72626 : drop(CString::from_raw((*config).neon_timeline));
242 72626 : drop(Box::from_raw(config));
243 :
244 217874 : for i in 0..(*self.wp).n_safekeepers {
245 217874 : let sk = &mut (*self.wp).safekeeper[i as usize];
246 217874 : take_vec_u8(&mut sk.inbuf);
247 217874 : }
248 :
249 72626 : WalProposerFree(self.wp);
250 72626 : }
251 72626 : }
252 : }
253 :
254 : pub struct StreamingCallback {
255 : wp: *mut WalProposer,
256 : }
257 :
258 : impl StreamingCallback {
259 1358 : pub fn new(wp: *mut WalProposer) -> StreamingCallback {
260 1358 : StreamingCallback { wp }
261 1358 : }
262 :
263 2942 : pub fn broadcast(&self, startpos: Lsn, endpos: Lsn) {
264 2942 : unsafe { WalProposerBroadcast(self.wp, startpos.0, endpos.0) }
265 2942 : }
266 :
267 2942 : pub fn poll(&self) {
268 2942 : unsafe { WalProposerPoll(self.wp) }
269 2942 : }
270 : }
271 :
#[cfg(test)]
mod tests {
    use core::panic;
    use std::{
        cell::Cell,
        sync::{atomic::AtomicUsize, mpsc::sync_channel},
    };

    use std::cell::UnsafeCell;
    use utils::id::TenantTimelineId;

    use crate::{api_bindings::Level, bindings::NeonWALReadResult, walproposer::Wrapper};

    use super::ApiImpl;

    /// What the next `wait_event_set` call reports: the safekeeper and mask
    /// most recently registered via `update_event_set`/`add_safekeeper_event_set`.
    #[derive(Clone, Copy, Debug)]
    struct WaitEventsData {
        sk: *mut crate::bindings::Safekeeper,
        event_mask: u32,
    }

    /// Scripted `ApiImpl`: checks outgoing walproposer messages against
    /// `expected_messages` and answers reads with `safekeeper_replies`, in order.
    struct MockImpl {
        // data to return from wait_event_set
        wait_events: Cell<WaitEventsData>,
        // walproposer->safekeeper messages
        expected_messages: Vec<Vec<u8>>,
        expected_ptr: AtomicUsize,
        // safekeeper->walproposer messages
        safekeeper_replies: Vec<Vec<u8>>,
        replies_ptr: AtomicUsize,
        // channel to send LSN to the main thread
        sync_channel: std::sync::mpsc::SyncSender<u64>,
        // Shmem state, used for storing donor info
        shmem: UnsafeCell<crate::bindings::WalproposerShmemState>,
    }

    impl MockImpl {
        /// Asserts `msg` equals the next expected walproposer message.
        ///
        /// # Panics
        ///
        /// Panics if there are no expected messages left or `msg` differs.
        fn check_walproposer_msg(&self, msg: &[u8]) {
            let ptr = self
                .expected_ptr
                .fetch_add(1, std::sync::atomic::Ordering::SeqCst);

            let expected_msg = self
                .expected_messages
                .get(ptr)
                .unwrap_or_else(|| panic!("unexpected message from walproposer"));
            assert_eq!(msg, expected_msg.as_slice());
        }

        /// Returns the next scripted safekeeper reply.
        ///
        /// # Panics
        ///
        /// Panics when all scripted replies have been consumed.
        fn next_safekeeper_reply(&self) -> &[u8] {
            let ptr = self
                .replies_ptr
                .fetch_add(1, std::sync::atomic::Ordering::SeqCst);

            self.safekeeper_replies
                .get(ptr)
                .map(Vec::as_slice)
                .unwrap_or_else(|| panic!("no more safekeeper replies"))
        }
    }

    impl ApiImpl for MockImpl {
        fn get_shmem_state(&self) -> *mut crate::bindings::WalproposerShmemState {
            self.shmem.get()
        }

        fn get_current_timestamp(&self) -> i64 {
            println!("get_current_timestamp");
            0
        }

        fn update_donor(&self, donor: &mut crate::bindings::Safekeeper, donor_lsn: u64) {
            // Write through the pointer. Dereferencing it by value (`*ptr`)
            // would copy the struct, and these updates would be silently lost.
            // SAFETY: the pointer refers to `self.shmem`, which lives as long as
            // `self`; `MockImpl` holds a `Cell`, so it is not shared across threads,
            // and no other reference to the shmem state is live here.
            let shmem = unsafe { &mut *self.get_shmem_state() };
            shmem.propEpochStartLsn.value = donor_lsn;
            shmem.donor_conninfo = donor.conninfo;
            shmem.donor_lsn = donor_lsn;
        }

        fn conn_status(
            &self,
            _: &mut crate::bindings::Safekeeper,
        ) -> crate::bindings::WalProposerConnStatusType {
            println!("conn_status");
            crate::bindings::WalProposerConnStatusType_WP_CONNECTION_OK
        }

        fn conn_connect_start(&self, _: &mut crate::bindings::Safekeeper) {
            println!("conn_connect_start");
        }

        fn conn_connect_poll(
            &self,
            _: &mut crate::bindings::Safekeeper,
        ) -> crate::bindings::WalProposerConnectPollStatusType {
            println!("conn_connect_poll");
            crate::bindings::WalProposerConnectPollStatusType_WP_CONN_POLLING_OK
        }

        fn conn_send_query(&self, _: &mut crate::bindings::Safekeeper, query: &str) -> bool {
            println!("conn_send_query: {}", query);
            true
        }

        fn conn_get_query_result(
            &self,
            _: &mut crate::bindings::Safekeeper,
        ) -> crate::bindings::WalProposerExecStatusType {
            println!("conn_get_query_result");
            crate::bindings::WalProposerExecStatusType_WP_EXEC_SUCCESS_COPYBOTH
        }

        fn conn_async_read(
            &self,
            _: &mut crate::bindings::Safekeeper,
            vec: &mut Vec<u8>,
        ) -> crate::bindings::PGAsyncReadResult {
            println!("conn_async_read");
            let reply = self.next_safekeeper_reply();
            println!("conn_async_read result: {:?}", reply);
            vec.extend_from_slice(reply);
            crate::bindings::PGAsyncReadResult_PG_ASYNC_READ_SUCCESS
        }

        fn conn_blocking_write(&self, _: &mut crate::bindings::Safekeeper, buf: &[u8]) -> bool {
            println!("conn_blocking_write: {:?}", buf);
            self.check_walproposer_msg(buf);
            true
        }

        fn recovery_download(
            &self,
            _wp: &mut crate::bindings::WalProposer,
            _sk: &mut crate::bindings::Safekeeper,
        ) -> bool {
            true
        }

        fn wal_reader_allocate(&self, _: &mut crate::bindings::Safekeeper) -> NeonWALReadResult {
            println!("wal_reader_allocate");
            crate::bindings::NeonWALReadResult_NEON_WALREAD_SUCCESS
        }

        fn init_event_set(&self, _: &mut crate::bindings::WalProposer) {
            println!("init_event_set")
        }

        fn update_event_set(&self, sk: &mut crate::bindings::Safekeeper, event_mask: u32) {
            println!(
                "update_event_set, sk={:?}, events_mask={:#b}",
                sk as *mut crate::bindings::Safekeeper, event_mask
            );
            self.wait_events.set(WaitEventsData { sk, event_mask });
        }

        fn add_safekeeper_event_set(&self, sk: &mut crate::bindings::Safekeeper, event_mask: u32) {
            println!(
                "add_safekeeper_event_set, sk={:?}, events_mask={:#b}",
                sk as *mut crate::bindings::Safekeeper, event_mask
            );
            self.wait_events.set(WaitEventsData { sk, event_mask });
        }

        fn rm_safekeeper_event_set(&self, sk: &mut crate::bindings::Safekeeper) {
            println!(
                "rm_safekeeper_event_set, sk={:?}",
                sk as *mut crate::bindings::Safekeeper
            );
        }

        fn wait_event_set(
            &self,
            _: &mut crate::bindings::WalProposer,
            timeout_millis: i64,
        ) -> super::WaitResult {
            let data = self.wait_events.get();
            println!(
                "wait_event_set, timeout_millis={}, res={:?}",
                timeout_millis, data
            );
            super::WaitResult::Network(data.sk, data.event_mask)
        }

        fn strong_random(&self, buf: &mut [u8]) -> bool {
            println!("strong_random");
            buf.fill(0);
            true
        }

        fn finish_sync_safekeepers(&self, lsn: u64) {
            self.sync_channel.send(lsn).unwrap();
            // Unwinding is how the test stops the walproposer; it is caught by
            // `catch_unwind` in the test body.
            panic!("sync safekeepers finished at lsn={}", lsn);
        }

        fn log_internal(&self, _wp: &mut crate::bindings::WalProposer, level: Level, msg: &str) {
            println!("wp_log[{}] {}", level, msg);
        }

        fn after_election(&self, _wp: &mut crate::bindings::WalProposer) {
            println!("after_election");
        }
    }

    /// Test that walproposer can successfully connect to safekeeper and finish
    /// sync_safekeepers. API is mocked in MockImpl.
    ///
    /// Run this test with valgrind to detect leaks:
    /// `valgrind --leak-check=full target/debug/deps/walproposer-<build>`
    #[test]
    fn test_simple_sync_safekeepers() -> anyhow::Result<()> {
        let ttid = TenantTimelineId::new(
            "9e4c8f36063c6c6e93bc20d65a820f3d".parse()?,
            "9e4c8f36063c6c6e93bc20d65a820f3d".parse()?,
        );

        let (sender, receiver) = sync_channel(1);

        let my_impl: Box<dyn ApiImpl> = Box::new(MockImpl {
            wait_events: Cell::new(WaitEventsData {
                sk: std::ptr::null_mut(),
                event_mask: 0,
            }),
            expected_messages: vec![
                // TODO: When updating Postgres versions, this test will cause
                // problems. Postgres version in message needs updating.
                //
                // Greeting(ProposerGreeting { protocol_version: 2, pg_version: 160003, proposer_id: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], system_id: 0, timeline_id: 9e4c8f36063c6c6e93bc20d65a820f3d, tenant_id: 9e4c8f36063c6c6e93bc20d65a820f3d, tli: 1, wal_seg_size: 16777216 })
                vec![
                    103, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 3, 113, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
                    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 158, 76, 143, 54, 6, 60, 108, 110,
                    147, 188, 32, 214, 90, 130, 15, 61, 158, 76, 143, 54, 6, 60, 108, 110, 147,
                    188, 32, 214, 90, 130, 15, 61, 1, 0, 0, 0, 0, 0, 0, 1,
                ],
                // VoteRequest(VoteRequest { term: 3 })
                vec![
                    118, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
                    0, 0, 0, 0, 0, 0,
                ],
            ],
            expected_ptr: AtomicUsize::new(0),
            safekeeper_replies: vec![
                // Greeting(AcceptorGreeting { term: 2, node_id: NodeId(1) })
                vec![
                    103, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0,
                ],
                // VoteResponse(VoteResponse { term: 3, vote_given: 1, flush_lsn: 0/539, truncate_lsn: 0/539, term_history: [(2, 0/539)], timeline_start_lsn: 0/539 })
                vec![
                    118, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 57,
                    5, 0, 0, 0, 0, 0, 0, 57, 5, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0,
                    0, 57, 5, 0, 0, 0, 0, 0, 0, 57, 5, 0, 0, 0, 0, 0, 0,
                ],
            ],
            replies_ptr: AtomicUsize::new(0),
            sync_channel: sender,
            shmem: UnsafeCell::new(crate::api_bindings::empty_shmem()),
        });
        let config = crate::walproposer::Config {
            ttid,
            safekeepers_list: vec!["localhost:5000".to_string()],
            safekeeper_reconnect_timeout: 1000,
            safekeeper_connection_timeout: 10000,
            sync_safekeepers: true,
        };

        let wp = Wrapper::new(my_impl, config);

        // walproposer will panic when it finishes sync_safekeepers
        std::panic::catch_unwind(|| wp.start()).unwrap_err();
        // validate the resulting LSN
        assert_eq!(receiver.try_recv(), Ok(1337));
        Ok(())
        // drop() will free up resources here
    }
}
|