Line data Source code
1 : #![allow(clippy::todo)]
2 :
3 : use std::ffi::CString;
4 :
5 : use crate::{
6 : api_bindings::{create_api, take_vec_u8, Level},
7 : bindings::{
8 : NeonWALReadResult, Safekeeper, WalProposer, WalProposerBroadcast, WalProposerConfig,
9 : WalProposerCreate, WalProposerFree, WalProposerPoll, WalProposerStart,
10 : },
11 : };
12 : use postgres_ffi::WAL_SEGMENT_SIZE;
13 : use utils::{id::TenantTimelineId, lsn::Lsn};
14 :
15 : /// Rust high-level wrapper for C walproposer API. Many methods are not required
16 : /// for simple cases, hence todo!() in default implementations.
17 : ///
18 : /// Refer to `pgxn/neon/walproposer.h` for documentation.
19 : pub trait ApiImpl {
20 0 : fn get_shmem_state(&self) -> *mut crate::bindings::WalproposerShmemState {
21 0 : todo!()
22 : }
23 :
24 0 : fn start_streaming(&self, _startpos: u64, _callback: &StreamingCallback) {
25 0 : todo!()
26 : }
27 :
28 0 : fn get_flush_rec_ptr(&self) -> u64 {
29 0 : todo!()
30 : }
31 :
32 0 : fn update_donor(&self, _donor: &mut Safekeeper, _donor_lsn: u64) {
33 0 : todo!()
34 : }
35 :
36 0 : fn get_current_timestamp(&self) -> i64 {
37 0 : todo!()
38 : }
39 :
40 0 : fn conn_error_message(&self, _sk: &mut Safekeeper) -> String {
41 0 : todo!()
42 : }
43 :
44 0 : fn conn_status(&self, _sk: &mut Safekeeper) -> crate::bindings::WalProposerConnStatusType {
45 0 : todo!()
46 : }
47 :
48 0 : fn conn_connect_start(&self, _sk: &mut Safekeeper) {
49 0 : todo!()
50 : }
51 :
52 0 : fn conn_connect_poll(
53 0 : &self,
54 0 : _sk: &mut Safekeeper,
55 0 : ) -> crate::bindings::WalProposerConnectPollStatusType {
56 0 : todo!()
57 : }
58 :
59 0 : fn conn_send_query(&self, _sk: &mut Safekeeper, _query: &str) -> bool {
60 0 : todo!()
61 : }
62 :
63 0 : fn conn_get_query_result(
64 0 : &self,
65 0 : _sk: &mut Safekeeper,
66 0 : ) -> crate::bindings::WalProposerExecStatusType {
67 0 : todo!()
68 : }
69 :
70 0 : fn conn_flush(&self, _sk: &mut Safekeeper) -> i32 {
71 0 : todo!()
72 : }
73 :
74 0 : fn conn_finish(&self, _sk: &mut Safekeeper) {
75 0 : todo!()
76 : }
77 :
78 0 : fn conn_async_read(
79 0 : &self,
80 0 : _sk: &mut Safekeeper,
81 0 : _vec: &mut Vec<u8>,
82 0 : ) -> crate::bindings::PGAsyncReadResult {
83 0 : todo!()
84 : }
85 :
86 0 : fn conn_async_write(
87 0 : &self,
88 0 : _sk: &mut Safekeeper,
89 0 : _buf: &[u8],
90 0 : ) -> crate::bindings::PGAsyncWriteResult {
91 0 : todo!()
92 : }
93 :
94 0 : fn conn_blocking_write(&self, _sk: &mut Safekeeper, _buf: &[u8]) -> bool {
95 0 : todo!()
96 : }
97 :
98 0 : fn recovery_download(&self, _wp: &mut WalProposer, _sk: &mut Safekeeper) -> bool {
99 0 : todo!()
100 : }
101 :
102 0 : fn wal_reader_allocate(&self, _sk: &mut Safekeeper) -> NeonWALReadResult {
103 0 : todo!()
104 : }
105 :
106 0 : fn wal_read(&self, _sk: &mut Safekeeper, _buf: &mut [u8], _startpos: u64) -> NeonWALReadResult {
107 0 : todo!()
108 : }
109 :
110 0 : fn wal_reader_events(&self, _sk: &mut Safekeeper) -> u32 {
111 0 : todo!()
112 : }
113 :
114 0 : fn init_event_set(&self, _wp: &mut WalProposer) {
115 0 : todo!()
116 : }
117 :
118 0 : fn update_event_set(&self, _sk: &mut Safekeeper, _events_mask: u32) {
119 0 : todo!()
120 : }
121 :
122 0 : fn active_state_update_event_set(&self, _sk: &mut Safekeeper) {
123 0 : todo!()
124 : }
125 :
126 0 : fn add_safekeeper_event_set(&self, _sk: &mut Safekeeper, _events_mask: u32) {
127 0 : todo!()
128 : }
129 :
130 0 : fn rm_safekeeper_event_set(&self, _sk: &mut Safekeeper) {
131 0 : todo!()
132 : }
133 :
134 0 : fn wait_event_set(&self, _wp: &mut WalProposer, _timeout_millis: i64) -> WaitResult {
135 0 : todo!()
136 : }
137 :
138 0 : fn strong_random(&self, _buf: &mut [u8]) -> bool {
139 0 : todo!()
140 : }
141 :
142 0 : fn get_redo_start_lsn(&self) -> u64 {
143 0 : todo!()
144 : }
145 :
146 0 : fn finish_sync_safekeepers(&self, _lsn: u64) {
147 0 : todo!()
148 : }
149 :
150 0 : fn process_safekeeper_feedback(&mut self, _wp: &mut WalProposer, _sk: &mut Safekeeper) {
151 0 : todo!()
152 : }
153 :
154 0 : fn log_internal(&self, _wp: &mut WalProposer, _level: Level, _msg: &str) {
155 0 : todo!()
156 : }
157 :
158 0 : fn after_election(&self, _wp: &mut WalProposer) {
159 0 : todo!()
160 : }
161 : }
162 :
163 : #[derive(Debug)]
164 : pub enum WaitResult {
165 : Latch,
166 : Timeout,
167 : Network(*mut Safekeeper, u32),
168 : }
169 :
170 : #[derive(Clone)]
171 : pub struct Config {
172 : /// Tenant and timeline id
173 : pub ttid: TenantTimelineId,
174 : /// List of safekeepers in format `host:port`
175 : pub safekeepers_list: Vec<String>,
176 : /// Safekeeper reconnect timeout in milliseconds
177 : pub safekeeper_reconnect_timeout: i32,
178 : /// Safekeeper connection timeout in milliseconds
179 : pub safekeeper_connection_timeout: i32,
180 : /// walproposer mode, finish when all safekeepers are synced or subscribe
181 : /// to WAL streaming
182 : pub sync_safekeepers: bool,
183 : }
184 :
185 : /// WalProposer main struct. C methods are reexported as Rust functions.
186 : pub struct Wrapper {
187 : wp: *mut WalProposer,
188 : _safekeepers_list_vec: Vec<u8>,
189 : }
190 :
191 : impl Wrapper {
192 73332 : pub fn new(api: Box<dyn ApiImpl>, config: Config) -> Wrapper {
193 73332 : let neon_tenant = CString::new(config.ttid.tenant_id.to_string())
194 73332 : .unwrap()
195 73332 : .into_raw();
196 73332 : let neon_timeline = CString::new(config.ttid.timeline_id.to_string())
197 73332 : .unwrap()
198 73332 : .into_raw();
199 73332 :
200 73332 : let mut safekeepers_list_vec = CString::new(config.safekeepers_list.join(","))
201 73332 : .unwrap()
202 73332 : .into_bytes_with_nul();
203 73332 : assert!(safekeepers_list_vec.len() == safekeepers_list_vec.capacity());
204 73332 : let safekeepers_list = safekeepers_list_vec.as_mut_ptr() as *mut std::ffi::c_char;
205 73332 :
206 73332 : let callback_data = Box::into_raw(Box::new(api)) as *mut ::std::os::raw::c_void;
207 73332 :
208 73332 : let c_config = WalProposerConfig {
209 73332 : neon_tenant,
210 73332 : neon_timeline,
211 73332 : safekeepers_list,
212 73332 : safekeeper_reconnect_timeout: config.safekeeper_reconnect_timeout,
213 73332 : safekeeper_connection_timeout: config.safekeeper_connection_timeout,
214 73332 : wal_segment_size: WAL_SEGMENT_SIZE as i32, // default 16MB
215 73332 : syncSafekeepers: config.sync_safekeepers,
216 73332 : systemId: 0,
217 73332 : pgTimeline: 1,
218 73332 : callback_data,
219 73332 : };
220 73332 : let c_config = Box::into_raw(Box::new(c_config));
221 73332 :
222 73332 : let api = create_api();
223 73332 : let wp = unsafe { WalProposerCreate(c_config, api) };
224 73332 : Wrapper {
225 73332 : wp,
226 73332 : _safekeepers_list_vec: safekeepers_list_vec,
227 73332 : }
228 73332 : }
229 :
230 73332 : pub fn start(&self) {
231 73332 : unsafe { WalProposerStart(self.wp) }
232 73332 : }
233 : }
234 :
235 : impl Drop for Wrapper {
236 73328 : fn drop(&mut self) {
237 73328 : unsafe {
238 73328 : let config = (*self.wp).config;
239 73328 : drop(Box::from_raw(
240 73328 : (*config).callback_data as *mut Box<dyn ApiImpl>,
241 73328 : ));
242 73328 : drop(CString::from_raw((*config).neon_tenant));
243 73328 : drop(CString::from_raw((*config).neon_timeline));
244 73328 : drop(Box::from_raw(config));
245 :
246 219980 : for i in 0..(*self.wp).n_safekeepers {
247 219980 : let sk = &mut (*self.wp).safekeeper[i as usize];
248 219980 : take_vec_u8(&mut sk.inbuf);
249 219980 : }
250 :
251 73328 : WalProposerFree(self.wp);
252 73328 : }
253 73328 : }
254 : }
255 :
256 : pub struct StreamingCallback {
257 : wp: *mut WalProposer,
258 : }
259 :
260 : impl StreamingCallback {
261 1347 : pub fn new(wp: *mut WalProposer) -> StreamingCallback {
262 1347 : StreamingCallback { wp }
263 1347 : }
264 :
265 3116 : pub fn broadcast(&self, startpos: Lsn, endpos: Lsn) {
266 3116 : unsafe { WalProposerBroadcast(self.wp, startpos.0, endpos.0) }
267 3116 : }
268 :
269 3116 : pub fn poll(&self) {
270 3116 : unsafe { WalProposerPoll(self.wp) }
271 3116 : }
272 : }
273 :
274 : #[cfg(test)]
275 : mod tests {
276 : use core::panic;
277 : use std::{
278 : cell::Cell,
279 : sync::{atomic::AtomicUsize, mpsc::sync_channel},
280 : };
281 :
282 : use std::cell::UnsafeCell;
283 : use utils::id::TenantTimelineId;
284 :
285 : use crate::{api_bindings::Level, bindings::NeonWALReadResult, walproposer::Wrapper};
286 :
287 : use super::ApiImpl;
288 :
289 : #[derive(Clone, Copy, Debug)]
290 : struct WaitEventsData {
291 : sk: *mut crate::bindings::Safekeeper,
292 : event_mask: u32,
293 : }
294 :
295 : struct MockImpl {
296 : // data to return from wait_event_set
297 : wait_events: Cell<WaitEventsData>,
298 : // walproposer->safekeeper messages
299 : expected_messages: Vec<Vec<u8>>,
300 : expected_ptr: AtomicUsize,
301 : // safekeeper->walproposer messages
302 : safekeeper_replies: Vec<Vec<u8>>,
303 : replies_ptr: AtomicUsize,
304 : // channel to send LSN to the main thread
305 : sync_channel: std::sync::mpsc::SyncSender<u64>,
306 : // Shmem state, used for storing donor info
307 : shmem: UnsafeCell<crate::bindings::WalproposerShmemState>,
308 : }
309 :
310 : impl MockImpl {
311 4 : fn check_walproposer_msg(&self, msg: &[u8]) {
312 4 : let ptr = self
313 4 : .expected_ptr
314 4 : .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
315 4 :
316 4 : if ptr >= self.expected_messages.len() {
317 0 : panic!("unexpected message from walproposer");
318 4 : }
319 4 :
320 4 : let expected_msg = &self.expected_messages[ptr];
321 4 : assert_eq!(msg, expected_msg.as_slice());
322 4 : }
323 :
324 4 : fn next_safekeeper_reply(&self) -> &[u8] {
325 4 : let ptr = self
326 4 : .replies_ptr
327 4 : .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
328 4 :
329 4 : if ptr >= self.safekeeper_replies.len() {
330 0 : panic!("no more safekeeper replies");
331 4 : }
332 4 :
333 4 : &self.safekeeper_replies[ptr]
334 4 : }
335 : }
336 :
337 : impl ApiImpl for MockImpl {
338 4 : fn get_shmem_state(&self) -> *mut crate::bindings::WalproposerShmemState {
339 4 : self.shmem.get()
340 4 : }
341 :
342 28 : fn get_current_timestamp(&self) -> i64 {
343 28 : println!("get_current_timestamp");
344 28 : 0
345 28 : }
346 :
347 0 : fn update_donor(&self, donor: &mut crate::bindings::Safekeeper, donor_lsn: u64) {
348 0 : let mut shmem = unsafe { *self.get_shmem_state() };
349 0 : shmem.propEpochStartLsn.value = donor_lsn;
350 0 : shmem.donor_conninfo = donor.conninfo;
351 0 : shmem.donor_lsn = donor_lsn;
352 0 : }
353 :
354 2 : fn conn_status(
355 2 : &self,
356 2 : _: &mut crate::bindings::Safekeeper,
357 2 : ) -> crate::bindings::WalProposerConnStatusType {
358 2 : println!("conn_status");
359 2 : crate::bindings::WalProposerConnStatusType_WP_CONNECTION_OK
360 2 : }
361 :
362 2 : fn conn_connect_start(&self, _: &mut crate::bindings::Safekeeper) {
363 2 : println!("conn_connect_start");
364 2 : }
365 :
366 2 : fn conn_connect_poll(
367 2 : &self,
368 2 : _: &mut crate::bindings::Safekeeper,
369 2 : ) -> crate::bindings::WalProposerConnectPollStatusType {
370 2 : println!("conn_connect_poll");
371 2 : crate::bindings::WalProposerConnectPollStatusType_WP_CONN_POLLING_OK
372 2 : }
373 :
374 2 : fn conn_send_query(&self, _: &mut crate::bindings::Safekeeper, query: &str) -> bool {
375 2 : println!("conn_send_query: {}", query);
376 2 : true
377 2 : }
378 :
379 2 : fn conn_get_query_result(
380 2 : &self,
381 2 : _: &mut crate::bindings::Safekeeper,
382 2 : ) -> crate::bindings::WalProposerExecStatusType {
383 2 : println!("conn_get_query_result");
384 2 : crate::bindings::WalProposerExecStatusType_WP_EXEC_SUCCESS_COPYBOTH
385 2 : }
386 :
387 4 : fn conn_async_read(
388 4 : &self,
389 4 : _: &mut crate::bindings::Safekeeper,
390 4 : vec: &mut Vec<u8>,
391 4 : ) -> crate::bindings::PGAsyncReadResult {
392 4 : println!("conn_async_read");
393 4 : let reply = self.next_safekeeper_reply();
394 4 : println!("conn_async_read result: {:?}", reply);
395 4 : vec.extend_from_slice(reply);
396 4 : crate::bindings::PGAsyncReadResult_PG_ASYNC_READ_SUCCESS
397 4 : }
398 :
399 4 : fn conn_blocking_write(&self, _: &mut crate::bindings::Safekeeper, buf: &[u8]) -> bool {
400 4 : println!("conn_blocking_write: {:?}", buf);
401 4 : self.check_walproposer_msg(buf);
402 4 : true
403 4 : }
404 :
405 2 : fn recovery_download(
406 2 : &self,
407 2 : _wp: &mut crate::bindings::WalProposer,
408 2 : _sk: &mut crate::bindings::Safekeeper,
409 2 : ) -> bool {
410 2 : true
411 2 : }
412 :
413 0 : fn wal_reader_allocate(&self, _: &mut crate::bindings::Safekeeper) -> NeonWALReadResult {
414 0 : println!("wal_reader_allocate");
415 0 : crate::bindings::NeonWALReadResult_NEON_WALREAD_SUCCESS
416 0 : }
417 :
418 2 : fn init_event_set(&self, _: &mut crate::bindings::WalProposer) {
419 2 : println!("init_event_set")
420 2 : }
421 :
422 8 : fn update_event_set(&self, sk: &mut crate::bindings::Safekeeper, event_mask: u32) {
423 8 : println!(
424 8 : "update_event_set, sk={:?}, events_mask={:#b}",
425 8 : sk as *mut crate::bindings::Safekeeper, event_mask
426 8 : );
427 8 : self.wait_events.set(WaitEventsData { sk, event_mask });
428 8 : }
429 :
430 4 : fn add_safekeeper_event_set(&self, sk: &mut crate::bindings::Safekeeper, event_mask: u32) {
431 4 : println!(
432 4 : "add_safekeeper_event_set, sk={:?}, events_mask={:#b}",
433 4 : sk as *mut crate::bindings::Safekeeper, event_mask
434 4 : );
435 4 : self.wait_events.set(WaitEventsData { sk, event_mask });
436 4 : }
437 :
438 2 : fn rm_safekeeper_event_set(&self, sk: &mut crate::bindings::Safekeeper) {
439 2 : println!(
440 2 : "rm_safekeeper_event_set, sk={:?}",
441 2 : sk as *mut crate::bindings::Safekeeper
442 2 : );
443 2 : }
444 :
445 8 : fn wait_event_set(
446 8 : &self,
447 8 : _: &mut crate::bindings::WalProposer,
448 8 : timeout_millis: i64,
449 8 : ) -> super::WaitResult {
450 8 : let data = self.wait_events.get();
451 8 : println!(
452 8 : "wait_event_set, timeout_millis={}, res={:?}",
453 8 : timeout_millis, data
454 8 : );
455 8 : super::WaitResult::Network(data.sk, data.event_mask)
456 8 : }
457 :
458 2 : fn strong_random(&self, buf: &mut [u8]) -> bool {
459 2 : println!("strong_random");
460 2 : buf.fill(0);
461 2 : true
462 2 : }
463 :
464 2 : fn finish_sync_safekeepers(&self, lsn: u64) {
465 2 : self.sync_channel.send(lsn).unwrap();
466 2 : panic!("sync safekeepers finished at lsn={}", lsn);
467 : }
468 :
469 14 : fn log_internal(&self, _wp: &mut crate::bindings::WalProposer, level: Level, msg: &str) {
470 14 : println!("wp_log[{}] {}", level, msg);
471 14 : }
472 :
473 2 : fn after_election(&self, _wp: &mut crate::bindings::WalProposer) {
474 2 : println!("after_election");
475 2 : }
476 : }
477 :
/// Checks that walproposer connects to a (mocked) safekeeper and completes
/// sync_safekeepers. All callbacks are served by `MockImpl`.
///
/// Run this test with valgrind to detect leaks:
/// `valgrind --leak-check=full target/debug/deps/walproposer-<build>`
#[test]
fn test_simple_sync_safekeepers() -> anyhow::Result<()> {
    // The same hex id is used for the tenant and the timeline.
    const ID_HEX: &str = "9e4c8f36063c6c6e93bc20d65a820f3d";
    let ttid = TenantTimelineId::new(ID_HEX.parse()?, ID_HEX.parse()?);

    let (sender, receiver) = sync_channel(1);

    // Messages the walproposer is expected to write, in order.
    let expected_messages: Vec<Vec<u8>> = vec![
        // TODO: When updating Postgres versions, this test will cause
        // problems. Postgres version in message needs updating.
        //
        // Greeting(ProposerGreeting { protocol_version: 2, pg_version: 160003, proposer_id: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], system_id: 0, timeline_id: 9e4c8f36063c6c6e93bc20d65a820f3d, tenant_id: 9e4c8f36063c6c6e93bc20d65a820f3d, tli: 1, wal_seg_size: 16777216 })
        vec![
            103, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 3, 113, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 158, 76, 143, 54, 6, 60, 108, 110,
            147, 188, 32, 214, 90, 130, 15, 61, 158, 76, 143, 54, 6, 60, 108, 110, 147,
            188, 32, 214, 90, 130, 15, 61, 1, 0, 0, 0, 0, 0, 0, 1,
        ],
        // VoteRequest(VoteRequest { term: 3 })
        vec![
            118, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
            0, 0, 0, 0, 0, 0,
        ],
    ];

    // Replies the mocked safekeeper serves, in order.
    let safekeeper_replies: Vec<Vec<u8>> = vec![
        // Greeting(AcceptorGreeting { term: 2, node_id: NodeId(1) })
        vec![
            103, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0,
        ],
        // VoteResponse(VoteResponse { term: 3, vote_given: 1, flush_lsn: 0/539, truncate_lsn: 0/539, term_history: [(2, 0/539)], timeline_start_lsn: 0/539 })
        vec![
            118, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 57,
            5, 0, 0, 0, 0, 0, 0, 57, 5, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0,
            0, 57, 5, 0, 0, 0, 0, 0, 0, 57, 5, 0, 0, 0, 0, 0, 0,
        ],
    ];

    let mock = MockImpl {
        wait_events: Cell::new(WaitEventsData {
            sk: std::ptr::null_mut(),
            event_mask: 0,
        }),
        expected_messages,
        expected_ptr: AtomicUsize::new(0),
        safekeeper_replies,
        replies_ptr: AtomicUsize::new(0),
        sync_channel: sender,
        shmem: UnsafeCell::new(crate::api_bindings::empty_shmem()),
    };
    let my_impl: Box<dyn ApiImpl> = Box::new(mock);

    let config = crate::walproposer::Config {
        ttid,
        safekeepers_list: vec!["localhost:5000".to_string()],
        safekeeper_reconnect_timeout: 1000,
        safekeeper_connection_timeout: 10000,
        sync_safekeepers: true,
    };

    let wp = Wrapper::new(my_impl, config);

    // The mock panics from `finish_sync_safekeepers`, so `start` must unwind.
    std::panic::catch_unwind(|| wp.start()).unwrap_err();

    // The LSN reported through the channel is the one carried by the scripted
    // vote response (0/539 == 1337).
    let reported_lsn = receiver.try_recv();
    assert_eq!(reported_lsn, Ok(1337));

    // `wp` is dropped on return, releasing the walproposer resources.
    Ok(())
}
548 : }
|