Line data Source code
1 : use std::ffi::CString;
2 :
3 : use postgres_ffi::WAL_SEGMENT_SIZE;
4 : use utils::{id::TenantTimelineId, lsn::Lsn};
5 :
6 : use crate::{
7 : api_bindings::{create_api, take_vec_u8, Level},
8 : bindings::{
9 : NeonWALReadResult, Safekeeper, WalProposer, WalProposerBroadcast, WalProposerConfig,
10 : WalProposerCreate, WalProposerFree, WalProposerPoll, WalProposerStart,
11 : },
12 : };
13 :
14 : /// Rust high-level wrapper for C walproposer API. Many methods are not required
15 : /// for simple cases, hence todo!() in default implementations.
16 : ///
17 : /// Refer to `pgxn/neon/walproposer.h` for documentation.
18 : pub trait ApiImpl {
19 0 : fn get_shmem_state(&self) -> *mut crate::bindings::WalproposerShmemState {
20 0 : todo!()
21 : }
22 :
23 0 : fn start_streaming(&self, _startpos: u64, _callback: &StreamingCallback) {
24 0 : todo!()
25 : }
26 :
27 0 : fn get_flush_rec_ptr(&self) -> u64 {
28 0 : todo!()
29 : }
30 :
31 0 : fn get_current_timestamp(&self) -> i64 {
32 0 : todo!()
33 : }
34 :
35 0 : fn conn_error_message(&self, _sk: &mut Safekeeper) -> String {
36 0 : todo!()
37 : }
38 :
39 0 : fn conn_status(&self, _sk: &mut Safekeeper) -> crate::bindings::WalProposerConnStatusType {
40 0 : todo!()
41 : }
42 :
43 0 : fn conn_connect_start(&self, _sk: &mut Safekeeper) {
44 0 : todo!()
45 : }
46 :
47 0 : fn conn_connect_poll(
48 0 : &self,
49 0 : _sk: &mut Safekeeper,
50 0 : ) -> crate::bindings::WalProposerConnectPollStatusType {
51 0 : todo!()
52 : }
53 :
54 0 : fn conn_send_query(&self, _sk: &mut Safekeeper, _query: &str) -> bool {
55 0 : todo!()
56 : }
57 :
58 0 : fn conn_get_query_result(
59 0 : &self,
60 0 : _sk: &mut Safekeeper,
61 0 : ) -> crate::bindings::WalProposerExecStatusType {
62 0 : todo!()
63 : }
64 :
65 0 : fn conn_flush(&self, _sk: &mut Safekeeper) -> i32 {
66 0 : todo!()
67 : }
68 :
69 0 : fn conn_finish(&self, _sk: &mut Safekeeper) {
70 0 : todo!()
71 : }
72 :
73 0 : fn conn_async_read(
74 0 : &self,
75 0 : _sk: &mut Safekeeper,
76 0 : _vec: &mut Vec<u8>,
77 0 : ) -> crate::bindings::PGAsyncReadResult {
78 0 : todo!()
79 : }
80 :
81 0 : fn conn_async_write(
82 0 : &self,
83 0 : _sk: &mut Safekeeper,
84 0 : _buf: &[u8],
85 0 : ) -> crate::bindings::PGAsyncWriteResult {
86 0 : todo!()
87 : }
88 :
89 0 : fn conn_blocking_write(&self, _sk: &mut Safekeeper, _buf: &[u8]) -> bool {
90 0 : todo!()
91 : }
92 :
93 0 : fn recovery_download(&self, _wp: &mut WalProposer, _sk: &mut Safekeeper) -> bool {
94 0 : todo!()
95 : }
96 :
97 0 : fn wal_reader_allocate(&self, _sk: &mut Safekeeper) -> NeonWALReadResult {
98 0 : todo!()
99 : }
100 :
101 0 : fn wal_read(&self, _sk: &mut Safekeeper, _buf: &mut [u8], _startpos: u64) -> NeonWALReadResult {
102 0 : todo!()
103 : }
104 :
105 0 : fn wal_reader_events(&self, _sk: &mut Safekeeper) -> u32 {
106 0 : todo!()
107 : }
108 :
109 0 : fn init_event_set(&self, _wp: &mut WalProposer) {
110 0 : todo!()
111 : }
112 :
113 0 : fn update_event_set(&self, _sk: &mut Safekeeper, _events_mask: u32) {
114 0 : todo!()
115 : }
116 :
117 0 : fn active_state_update_event_set(&self, _sk: &mut Safekeeper) {
118 0 : todo!()
119 : }
120 :
121 0 : fn add_safekeeper_event_set(&self, _sk: &mut Safekeeper, _events_mask: u32) {
122 0 : todo!()
123 : }
124 :
125 0 : fn rm_safekeeper_event_set(&self, _sk: &mut Safekeeper) {
126 0 : todo!()
127 : }
128 :
129 0 : fn wait_event_set(&self, _wp: &mut WalProposer, _timeout_millis: i64) -> WaitResult {
130 0 : todo!()
131 : }
132 :
133 0 : fn strong_random(&self, _buf: &mut [u8]) -> bool {
134 0 : todo!()
135 : }
136 :
137 0 : fn get_redo_start_lsn(&self) -> u64 {
138 0 : todo!()
139 : }
140 :
141 0 : fn finish_sync_safekeepers(&self, _lsn: u64) {
142 0 : todo!()
143 : }
144 :
145 0 : fn process_safekeeper_feedback(&self, _wp: &mut WalProposer, _commit_lsn: u64) {
146 0 : todo!()
147 : }
148 :
149 0 : fn log_internal(&self, _wp: &mut WalProposer, _level: Level, _msg: &str) {
150 0 : todo!()
151 : }
152 :
153 0 : fn after_election(&self, _wp: &mut WalProposer) {
154 0 : todo!()
155 : }
156 : }
157 :
158 690 : #[derive(Debug)]
159 : pub enum WaitResult {
160 : Latch,
161 : Timeout,
162 : Network(*mut Safekeeper, u32),
163 : }
164 :
165 72092 : #[derive(Clone)]
166 : pub struct Config {
167 : /// Tenant and timeline id
168 : pub ttid: TenantTimelineId,
169 : /// List of safekeepers in format `host:port`
170 : pub safekeepers_list: Vec<String>,
171 : /// Safekeeper reconnect timeout in milliseconds
172 : pub safekeeper_reconnect_timeout: i32,
173 : /// Safekeeper connection timeout in milliseconds
174 : pub safekeeper_connection_timeout: i32,
175 : /// walproposer mode, finish when all safekeepers are synced or subscribe
176 : /// to WAL streaming
177 : pub sync_safekeepers: bool,
178 : }
179 :
180 : /// WalProposer main struct. C methods are reexported as Rust functions.
181 : pub struct Wrapper {
182 : wp: *mut WalProposer,
183 : _safekeepers_list_vec: Vec<u8>,
184 : }
185 :
186 : impl Wrapper {
187 72094 : pub fn new(api: Box<dyn ApiImpl>, config: Config) -> Wrapper {
188 72094 : let neon_tenant = CString::new(config.ttid.tenant_id.to_string())
189 72094 : .unwrap()
190 72094 : .into_raw();
191 72094 : let neon_timeline = CString::new(config.ttid.timeline_id.to_string())
192 72094 : .unwrap()
193 72094 : .into_raw();
194 72094 :
195 72094 : let mut safekeepers_list_vec = CString::new(config.safekeepers_list.join(","))
196 72094 : .unwrap()
197 72094 : .into_bytes_with_nul();
198 72094 : assert!(safekeepers_list_vec.len() == safekeepers_list_vec.capacity());
199 72094 : let safekeepers_list = safekeepers_list_vec.as_mut_ptr() as *mut std::ffi::c_char;
200 72094 :
201 72094 : let callback_data = Box::into_raw(Box::new(api)) as *mut ::std::os::raw::c_void;
202 72094 :
203 72094 : let c_config = WalProposerConfig {
204 72094 : neon_tenant,
205 72094 : neon_timeline,
206 72094 : safekeepers_list,
207 72094 : safekeeper_reconnect_timeout: config.safekeeper_reconnect_timeout,
208 72094 : safekeeper_connection_timeout: config.safekeeper_connection_timeout,
209 72094 : wal_segment_size: WAL_SEGMENT_SIZE as i32, // default 16MB
210 72094 : syncSafekeepers: config.sync_safekeepers,
211 72094 : systemId: 0,
212 72094 : pgTimeline: 1,
213 72094 : callback_data,
214 72094 : };
215 72094 : let c_config = Box::into_raw(Box::new(c_config));
216 72094 :
217 72094 : let api = create_api();
218 72094 : let wp = unsafe { WalProposerCreate(c_config, api) };
219 72094 : Wrapper {
220 72094 : wp,
221 72094 : _safekeepers_list_vec: safekeepers_list_vec,
222 72094 : }
223 72094 : }
224 :
225 72094 : pub fn start(&self) {
226 72094 : unsafe { WalProposerStart(self.wp) }
227 72094 : }
228 : }
229 :
230 : impl Drop for Wrapper {
231 72090 : fn drop(&mut self) {
232 72090 : unsafe {
233 72090 : let config = (*self.wp).config;
234 72090 : drop(Box::from_raw(
235 72090 : (*config).callback_data as *mut Box<dyn ApiImpl>,
236 72090 : ));
237 72090 : drop(CString::from_raw((*config).neon_tenant));
238 72090 : drop(CString::from_raw((*config).neon_timeline));
239 72090 : drop(Box::from_raw(config));
240 :
241 216266 : for i in 0..(*self.wp).n_safekeepers {
242 216266 : let sk = &mut (*self.wp).safekeeper[i as usize];
243 216266 : take_vec_u8(&mut sk.inbuf);
244 216266 : }
245 :
246 72090 : WalProposerFree(self.wp);
247 72090 : }
248 72090 : }
249 : }
250 :
251 : pub struct StreamingCallback {
252 : wp: *mut WalProposer,
253 : }
254 :
255 : impl StreamingCallback {
256 1259 : pub fn new(wp: *mut WalProposer) -> StreamingCallback {
257 1259 : StreamingCallback { wp }
258 1259 : }
259 :
260 2847 : pub fn broadcast(&self, startpos: Lsn, endpos: Lsn) {
261 2847 : unsafe { WalProposerBroadcast(self.wp, startpos.0, endpos.0) }
262 2847 : }
263 :
264 2847 : pub fn poll(&self) {
265 2847 : unsafe { WalProposerPoll(self.wp) }
266 2847 : }
267 : }
268 :
269 : #[cfg(test)]
270 : mod tests {
271 : use core::panic;
272 : use std::{
273 : cell::Cell,
274 : sync::{atomic::AtomicUsize, mpsc::sync_channel},
275 : };
276 :
277 : use utils::id::TenantTimelineId;
278 :
279 : use crate::{api_bindings::Level, bindings::NeonWALReadResult, walproposer::Wrapper};
280 :
281 : use super::ApiImpl;
282 :
283 8 : #[derive(Clone, Copy, Debug)]
284 : struct WaitEventsData {
285 : sk: *mut crate::bindings::Safekeeper,
286 : event_mask: u32,
287 : }
288 :
289 : struct MockImpl {
290 : // data to return from wait_event_set
291 : wait_events: Cell<WaitEventsData>,
292 : // walproposer->safekeeper messages
293 : expected_messages: Vec<Vec<u8>>,
294 : expected_ptr: AtomicUsize,
295 : // safekeeper->walproposer messages
296 : safekeeper_replies: Vec<Vec<u8>>,
297 : replies_ptr: AtomicUsize,
298 : // channel to send LSN to the main thread
299 : sync_channel: std::sync::mpsc::SyncSender<u64>,
300 : }
301 :
302 : impl MockImpl {
303 4 : fn check_walproposer_msg(&self, msg: &[u8]) {
304 4 : let ptr = self
305 4 : .expected_ptr
306 4 : .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
307 4 :
308 4 : if ptr >= self.expected_messages.len() {
309 0 : panic!("unexpected message from walproposer");
310 4 : }
311 4 :
312 4 : let expected_msg = &self.expected_messages[ptr];
313 4 : assert_eq!(msg, expected_msg.as_slice());
314 4 : }
315 :
316 4 : fn next_safekeeper_reply(&self) -> &[u8] {
317 4 : let ptr = self
318 4 : .replies_ptr
319 4 : .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
320 4 :
321 4 : if ptr >= self.safekeeper_replies.len() {
322 0 : panic!("no more safekeeper replies");
323 4 : }
324 4 :
325 4 : &self.safekeeper_replies[ptr]
326 4 : }
327 : }
328 :
329 : impl ApiImpl for MockImpl {
330 28 : fn get_current_timestamp(&self) -> i64 {
331 28 : println!("get_current_timestamp");
332 28 : 0
333 28 : }
334 :
335 2 : fn conn_status(
336 2 : &self,
337 2 : _: &mut crate::bindings::Safekeeper,
338 2 : ) -> crate::bindings::WalProposerConnStatusType {
339 2 : println!("conn_status");
340 2 : crate::bindings::WalProposerConnStatusType_WP_CONNECTION_OK
341 2 : }
342 :
343 2 : fn conn_connect_start(&self, _: &mut crate::bindings::Safekeeper) {
344 2 : println!("conn_connect_start");
345 2 : }
346 :
347 2 : fn conn_connect_poll(
348 2 : &self,
349 2 : _: &mut crate::bindings::Safekeeper,
350 2 : ) -> crate::bindings::WalProposerConnectPollStatusType {
351 2 : println!("conn_connect_poll");
352 2 : crate::bindings::WalProposerConnectPollStatusType_WP_CONN_POLLING_OK
353 2 : }
354 :
355 2 : fn conn_send_query(&self, _: &mut crate::bindings::Safekeeper, query: &str) -> bool {
356 2 : println!("conn_send_query: {}", query);
357 2 : true
358 2 : }
359 :
360 2 : fn conn_get_query_result(
361 2 : &self,
362 2 : _: &mut crate::bindings::Safekeeper,
363 2 : ) -> crate::bindings::WalProposerExecStatusType {
364 2 : println!("conn_get_query_result");
365 2 : crate::bindings::WalProposerExecStatusType_WP_EXEC_SUCCESS_COPYBOTH
366 2 : }
367 :
368 4 : fn conn_async_read(
369 4 : &self,
370 4 : _: &mut crate::bindings::Safekeeper,
371 4 : vec: &mut Vec<u8>,
372 4 : ) -> crate::bindings::PGAsyncReadResult {
373 4 : println!("conn_async_read");
374 4 : let reply = self.next_safekeeper_reply();
375 4 : println!("conn_async_read result: {:?}", reply);
376 4 : vec.extend_from_slice(reply);
377 4 : crate::bindings::PGAsyncReadResult_PG_ASYNC_READ_SUCCESS
378 4 : }
379 :
380 4 : fn conn_blocking_write(&self, _: &mut crate::bindings::Safekeeper, buf: &[u8]) -> bool {
381 4 : println!("conn_blocking_write: {:?}", buf);
382 4 : self.check_walproposer_msg(buf);
383 4 : true
384 4 : }
385 :
386 2 : fn recovery_download(
387 2 : &self,
388 2 : _wp: &mut crate::bindings::WalProposer,
389 2 : _sk: &mut crate::bindings::Safekeeper,
390 2 : ) -> bool {
391 2 : true
392 2 : }
393 :
394 0 : fn wal_reader_allocate(&self, _: &mut crate::bindings::Safekeeper) -> NeonWALReadResult {
395 0 : println!("wal_reader_allocate");
396 0 : crate::bindings::NeonWALReadResult_NEON_WALREAD_SUCCESS
397 0 : }
398 :
399 2 : fn init_event_set(&self, _: &mut crate::bindings::WalProposer) {
400 2 : println!("init_event_set")
401 2 : }
402 :
403 8 : fn update_event_set(&self, sk: &mut crate::bindings::Safekeeper, event_mask: u32) {
404 8 : println!(
405 8 : "update_event_set, sk={:?}, events_mask={:#b}",
406 8 : sk as *mut crate::bindings::Safekeeper, event_mask
407 8 : );
408 8 : self.wait_events.set(WaitEventsData { sk, event_mask });
409 8 : }
410 :
411 4 : fn add_safekeeper_event_set(&self, sk: &mut crate::bindings::Safekeeper, event_mask: u32) {
412 4 : println!(
413 4 : "add_safekeeper_event_set, sk={:?}, events_mask={:#b}",
414 4 : sk as *mut crate::bindings::Safekeeper, event_mask
415 4 : );
416 4 : self.wait_events.set(WaitEventsData { sk, event_mask });
417 4 : }
418 :
419 2 : fn rm_safekeeper_event_set(&self, sk: &mut crate::bindings::Safekeeper) {
420 2 : println!(
421 2 : "rm_safekeeper_event_set, sk={:?}",
422 2 : sk as *mut crate::bindings::Safekeeper
423 2 : );
424 2 : }
425 :
426 8 : fn wait_event_set(
427 8 : &self,
428 8 : _: &mut crate::bindings::WalProposer,
429 8 : timeout_millis: i64,
430 8 : ) -> super::WaitResult {
431 8 : let data = self.wait_events.get();
432 8 : println!(
433 8 : "wait_event_set, timeout_millis={}, res={:?}",
434 8 : timeout_millis, data
435 8 : );
436 8 : super::WaitResult::Network(data.sk, data.event_mask)
437 8 : }
438 :
439 2 : fn strong_random(&self, buf: &mut [u8]) -> bool {
440 2 : println!("strong_random");
441 2 : buf.fill(0);
442 2 : true
443 2 : }
444 :
445 2 : fn finish_sync_safekeepers(&self, lsn: u64) {
446 2 : self.sync_channel.send(lsn).unwrap();
447 2 : panic!("sync safekeepers finished at lsn={}", lsn);
448 : }
449 :
450 14 : fn log_internal(&self, _wp: &mut crate::bindings::WalProposer, level: Level, msg: &str) {
451 14 : println!("wp_log[{}] {}", level, msg);
452 14 : }
453 :
454 2 : fn after_election(&self, _wp: &mut crate::bindings::WalProposer) {
455 2 : println!("after_election");
456 2 : }
457 : }
458 :
/// Test that walproposer can successfully connect to safekeeper and finish
/// sync_safekeepers. API is mocked in MockImpl.
///
/// Run this test with valgrind to detect leaks:
/// `valgrind --leak-check=full target/debug/deps/walproposer-<build>`
#[test]
fn test_simple_sync_safekeepers() -> anyhow::Result<()> {
    // The same hex id is used for both the tenant and the timeline.
    const ID_HEX: &str = "9e4c8f36063c6c6e93bc20d65a820f3d";
    let ttid = TenantTimelineId::new(ID_HEX.parse()?, ID_HEX.parse()?);

    let (sender, receiver) = sync_channel(1);

    // TODO: When updating Postgres versions, this test will cause
    // problems. Postgres version in message needs updating.
    //
    // Greeting(ProposerGreeting { protocol_version: 2, pg_version: 160002, proposer_id: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], system_id: 0, timeline_id: 9e4c8f36063c6c6e93bc20d65a820f3d, tenant_id: 9e4c8f36063c6c6e93bc20d65a820f3d, tli: 1, wal_seg_size: 16777216 })
    let proposer_greeting: Vec<u8> = vec![
        103, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 2, 113, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 158, 76, 143, 54, 6, 60, 108, 110,
        147, 188, 32, 214, 90, 130, 15, 61, 158, 76, 143, 54, 6, 60, 108, 110, 147,
        188, 32, 214, 90, 130, 15, 61, 1, 0, 0, 0, 0, 0, 0, 1,
    ];
    // VoteRequest(VoteRequest { term: 3 })
    let vote_request: Vec<u8> = vec![
        118, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0,
    ];

    // Greeting(AcceptorGreeting { term: 2, node_id: NodeId(1) })
    let acceptor_greeting: Vec<u8> = vec![
        103, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0,
    ];
    // VoteResponse(VoteResponse { term: 3, vote_given: 1, flush_lsn: 0/539, truncate_lsn: 0/539, term_history: [(2, 0/539)], timeline_start_lsn: 0/539 })
    let vote_response: Vec<u8> = vec![
        118, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 57,
        5, 0, 0, 0, 0, 0, 0, 57, 5, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0,
        0, 57, 5, 0, 0, 0, 0, 0, 0, 57, 5, 0, 0, 0, 0, 0, 0,
    ];

    // No safekeeper has been registered yet, so the wait data starts empty.
    let initial_wait = WaitEventsData {
        sk: std::ptr::null_mut(),
        event_mask: 0,
    };

    let my_impl: Box<dyn ApiImpl> = Box::new(MockImpl {
        wait_events: Cell::new(initial_wait),
        expected_messages: vec![proposer_greeting, vote_request],
        expected_ptr: AtomicUsize::new(0),
        safekeeper_replies: vec![acceptor_greeting, vote_response],
        replies_ptr: AtomicUsize::new(0),
        sync_channel: sender,
    });

    let config = crate::walproposer::Config {
        ttid,
        safekeepers_list: vec!["localhost:5000".to_string()],
        safekeeper_reconnect_timeout: 1000,
        safekeeper_connection_timeout: 10000,
        sync_safekeepers: true,
    };

    let wp = Wrapper::new(my_impl, config);

    // walproposer will panic when it finishes sync_safekeepers
    let start_outcome = std::panic::catch_unwind(|| wp.start());
    start_outcome.unwrap_err();

    // validate the resulting LSN
    let synced_lsn = receiver.try_recv();
    assert_eq!(synced_lsn, Ok(1337));

    Ok(())
    // drop() will free up resources here
}
528 : }
|