Line data Source code
1 : #![allow(clippy::todo)]
2 :
3 : use std::ffi::CString;
4 : use std::str::FromStr;
5 :
6 : use postgres_ffi::WAL_SEGMENT_SIZE;
7 : use utils::id::TenantTimelineId;
8 : use utils::lsn::Lsn;
9 :
10 : use crate::api_bindings::{Level, create_api, take_vec_u8};
11 : use crate::bindings::{
12 : NeonWALReadResult, Safekeeper, WalProposer, WalProposerBroadcast, WalProposerConfig,
13 : WalProposerCreate, WalProposerFree, WalProposerPoll, WalProposerStart,
14 : };
15 :
16 : /// Rust high-level wrapper for C walproposer API. Many methods are not required
17 : /// for simple cases, hence todo!() in default implementations.
18 : ///
19 : /// Refer to `pgxn/neon/walproposer.h` for documentation.
20 : pub trait ApiImpl {
21 0 : fn get_shmem_state(&self) -> *mut crate::bindings::WalproposerShmemState {
22 0 : todo!()
23 : }
24 :
25 0 : fn start_streaming(&self, _startpos: u64, _callback: &StreamingCallback) {
26 0 : todo!()
27 : }
28 :
29 0 : fn get_flush_rec_ptr(&self) -> u64 {
30 0 : todo!()
31 : }
32 :
33 0 : fn update_donor(&self, _donor: &mut Safekeeper, _donor_lsn: u64) {
34 0 : todo!()
35 : }
36 :
37 0 : fn get_current_timestamp(&self) -> i64 {
38 0 : todo!()
39 : }
40 :
41 0 : fn conn_error_message(&self, _sk: &mut Safekeeper) -> String {
42 0 : todo!()
43 : }
44 :
45 0 : fn conn_status(&self, _sk: &mut Safekeeper) -> crate::bindings::WalProposerConnStatusType {
46 0 : todo!()
47 : }
48 :
49 0 : fn conn_connect_start(&self, _sk: &mut Safekeeper) {
50 0 : todo!()
51 : }
52 :
53 0 : fn conn_connect_poll(
54 0 : &self,
55 0 : _sk: &mut Safekeeper,
56 0 : ) -> crate::bindings::WalProposerConnectPollStatusType {
57 0 : todo!()
58 : }
59 :
60 0 : fn conn_send_query(&self, _sk: &mut Safekeeper, _query: &str) -> bool {
61 0 : todo!()
62 : }
63 :
64 0 : fn conn_get_query_result(
65 0 : &self,
66 0 : _sk: &mut Safekeeper,
67 0 : ) -> crate::bindings::WalProposerExecStatusType {
68 0 : todo!()
69 : }
70 :
71 0 : fn conn_flush(&self, _sk: &mut Safekeeper) -> i32 {
72 0 : todo!()
73 : }
74 :
75 0 : fn conn_finish(&self, _sk: &mut Safekeeper) {
76 0 : todo!()
77 : }
78 :
79 0 : fn conn_async_read(
80 0 : &self,
81 0 : _sk: &mut Safekeeper,
82 0 : _vec: &mut Vec<u8>,
83 0 : ) -> crate::bindings::PGAsyncReadResult {
84 0 : todo!()
85 : }
86 :
87 0 : fn conn_async_write(
88 0 : &self,
89 0 : _sk: &mut Safekeeper,
90 0 : _buf: &[u8],
91 0 : ) -> crate::bindings::PGAsyncWriteResult {
92 0 : todo!()
93 : }
94 :
95 0 : fn conn_blocking_write(&self, _sk: &mut Safekeeper, _buf: &[u8]) -> bool {
96 0 : todo!()
97 : }
98 :
99 0 : fn recovery_download(&self, _wp: &mut WalProposer, _sk: &mut Safekeeper) -> bool {
100 0 : todo!()
101 : }
102 :
103 0 : fn wal_reader_allocate(&self, _sk: &mut Safekeeper) -> NeonWALReadResult {
104 0 : todo!()
105 : }
106 :
107 0 : fn wal_read(&self, _sk: &mut Safekeeper, _buf: &mut [u8], _startpos: u64) -> NeonWALReadResult {
108 0 : todo!()
109 : }
110 :
111 0 : fn wal_reader_events(&self, _sk: &mut Safekeeper) -> u32 {
112 0 : todo!()
113 : }
114 :
115 0 : fn init_event_set(&self, _wp: &mut WalProposer) {
116 0 : todo!()
117 : }
118 :
119 0 : fn update_event_set(&self, _sk: &mut Safekeeper, _events_mask: u32) {
120 0 : todo!()
121 : }
122 :
123 0 : fn active_state_update_event_set(&self, _sk: &mut Safekeeper) {
124 0 : todo!()
125 : }
126 :
127 0 : fn add_safekeeper_event_set(&self, _sk: &mut Safekeeper, _events_mask: u32) {
128 0 : todo!()
129 : }
130 :
131 0 : fn rm_safekeeper_event_set(&self, _sk: &mut Safekeeper) {
132 0 : todo!()
133 : }
134 :
135 0 : fn wait_event_set(&self, _wp: &mut WalProposer, _timeout_millis: i64) -> WaitResult {
136 0 : todo!()
137 : }
138 :
139 0 : fn strong_random(&self, _buf: &mut [u8]) -> bool {
140 0 : todo!()
141 : }
142 :
143 0 : fn get_redo_start_lsn(&self) -> u64 {
144 0 : todo!()
145 : }
146 :
147 0 : fn finish_sync_safekeepers(&self, _lsn: u64) -> ! {
148 0 : todo!()
149 : }
150 :
151 0 : fn process_safekeeper_feedback(&mut self, _wp: &mut WalProposer, _sk: &mut Safekeeper) {
152 0 : todo!()
153 : }
154 :
155 0 : fn log_internal(&self, _wp: &mut WalProposer, _level: Level, _msg: &str) {
156 0 : todo!()
157 : }
158 :
159 0 : fn after_election(&self, _wp: &mut WalProposer) {
160 0 : todo!()
161 : }
162 :
163 : /* BEGIN_HADRON */
164 9028 : fn reset_safekeeper_statuses_for_metrics(&self, _wp: &mut WalProposer, _num_safekeepers: u32) {
165 : // Do nothing for testing purposes.
166 1 : }
167 :
168 9500 : fn update_safekeeper_status_for_metrics(
169 9500 : &self,
170 9500 : _wp: &mut WalProposer,
171 9500 : _sk_index: u32,
172 9500 : _status: u8,
173 9500 : ) {
174 : // Do nothing for testing purposes.
175 0 : }
176 : /* END_HADRON */
177 : }
178 :
179 : #[derive(Debug)]
180 : pub enum WaitResult {
181 : Latch,
182 : Timeout,
183 : Network(*mut Safekeeper, u32),
184 : }
185 :
186 : #[derive(Clone)]
187 : pub struct Config {
188 : /// Tenant and timeline id
189 : pub ttid: TenantTimelineId,
190 : /// List of safekeepers in format `host:port`
191 : pub safekeepers_list: Vec<String>,
192 : /// libpq connection info options
193 : pub safekeeper_conninfo_options: String,
194 : /// Safekeeper reconnect timeout in milliseconds
195 : pub safekeeper_reconnect_timeout: i32,
196 : /// Safekeeper connection timeout in milliseconds
197 : pub safekeeper_connection_timeout: i32,
198 : /// walproposer mode, finish when all safekeepers are synced or subscribe
199 : /// to WAL streaming
200 : pub sync_safekeepers: bool,
201 : }
202 :
203 : /// WalProposer main struct. C methods are reexported as Rust functions.
204 : pub struct Wrapper {
205 : wp: *mut WalProposer,
206 : _safekeepers_list_vec: Vec<u8>,
207 : }
208 :
209 : impl Wrapper {
210 9028 : pub fn new(api: Box<dyn ApiImpl>, config: Config) -> Wrapper {
211 9028 : let neon_tenant = CString::new(config.ttid.tenant_id.to_string())
212 9028 : .unwrap()
213 9028 : .into_raw();
214 9028 : let neon_timeline = CString::new(config.ttid.timeline_id.to_string())
215 9028 : .unwrap()
216 9028 : .into_raw();
217 :
218 9028 : let mut safekeepers_list_vec = CString::new(config.safekeepers_list.join(","))
219 9028 : .unwrap()
220 9028 : .into_bytes_with_nul();
221 9028 : assert!(safekeepers_list_vec.len() == safekeepers_list_vec.capacity());
222 9028 : let safekeepers_list = safekeepers_list_vec.as_mut_ptr() as *mut std::ffi::c_char;
223 9028 : let safekeeper_conninfo_options = CString::from_str(&config.safekeeper_conninfo_options)
224 9028 : .unwrap()
225 9028 : .into_raw();
226 :
227 9028 : let callback_data = Box::into_raw(Box::new(api)) as *mut ::std::os::raw::c_void;
228 :
229 9028 : let c_config = WalProposerConfig {
230 9028 : neon_tenant,
231 9028 : neon_timeline,
232 9028 : safekeepers_list,
233 9028 : safekeeper_conninfo_options,
234 9028 : safekeeper_reconnect_timeout: config.safekeeper_reconnect_timeout,
235 9028 : safekeeper_connection_timeout: config.safekeeper_connection_timeout,
236 9028 : wal_segment_size: WAL_SEGMENT_SIZE as i32, // default 16MB
237 9028 : syncSafekeepers: config.sync_safekeepers,
238 9028 : systemId: 0,
239 9028 : pgTimeline: 1,
240 9028 : proto_version: 3,
241 9028 : callback_data,
242 9028 : };
243 9028 : let c_config = Box::into_raw(Box::new(c_config));
244 :
245 9028 : let api = create_api();
246 9028 : let wp = unsafe { WalProposerCreate(c_config, api) };
247 9028 : Wrapper {
248 9028 : wp,
249 9028 : _safekeepers_list_vec: safekeepers_list_vec,
250 9028 : }
251 9028 : }
252 :
253 9028 : pub fn start(&self) {
254 9028 : unsafe { WalProposerStart(self.wp) }
255 9028 : }
256 : }
257 :
258 : impl Drop for Wrapper {
259 9026 : fn drop(&mut self) {
260 : unsafe {
261 9026 : let config = (*self.wp).config;
262 9026 : drop(Box::from_raw(
263 9026 : (*config).callback_data as *mut Box<dyn ApiImpl>,
264 : ));
265 9026 : drop(CString::from_raw((*config).neon_tenant));
266 9026 : drop(CString::from_raw((*config).neon_timeline));
267 9026 : drop(Box::from_raw(config));
268 :
269 27076 : for i in 0..(*self.wp).n_safekeepers {
270 27076 : let sk = &mut (*self.wp).safekeeper[i as usize];
271 27076 : take_vec_u8(&mut sk.inbuf);
272 27076 : }
273 :
274 9026 : WalProposerFree(self.wp);
275 : }
276 9026 : }
277 : }
278 :
279 : pub struct StreamingCallback {
280 : wp: *mut WalProposer,
281 : }
282 :
283 : impl StreamingCallback {
284 152 : pub fn new(wp: *mut WalProposer) -> StreamingCallback {
285 152 : StreamingCallback { wp }
286 152 : }
287 :
288 498 : pub fn broadcast(&self, startpos: Lsn, endpos: Lsn) {
289 498 : unsafe { WalProposerBroadcast(self.wp, startpos.0, endpos.0) }
290 498 : }
291 :
292 498 : pub fn poll(&self) {
293 498 : unsafe { WalProposerPoll(self.wp) }
294 498 : }
295 : }
296 :
297 : #[cfg(test)]
298 : mod tests {
299 : use core::panic;
300 : use std::cell::{Cell, UnsafeCell};
301 : use std::ffi::CString;
302 : use std::sync::atomic::AtomicUsize;
303 : use std::sync::mpsc::sync_channel;
304 :
305 : use utils::id::TenantTimelineId;
306 :
307 : use super::ApiImpl;
308 : use crate::api_bindings::Level;
309 : use crate::bindings::{NeonWALReadResult, PG_VERSION_NUM};
310 : use crate::walproposer::Wrapper;
311 :
312 : #[derive(Clone, Copy, Debug)]
313 : struct WaitEventsData {
314 : sk: *mut crate::bindings::Safekeeper,
315 : event_mask: u32,
316 : }
317 :
318 : struct MockImpl {
319 : // data to return from wait_event_set
320 : wait_events: Cell<WaitEventsData>,
321 : // walproposer->safekeeper messages
322 : expected_messages: Vec<Vec<u8>>,
323 : expected_ptr: AtomicUsize,
324 : // safekeeper->walproposer messages
325 : safekeeper_replies: Vec<Vec<u8>>,
326 : replies_ptr: AtomicUsize,
327 : // channel to send LSN to the main thread
328 : sync_channel: std::sync::mpsc::SyncSender<u64>,
329 : // Shmem state, used for storing donor info
330 : shmem: UnsafeCell<crate::bindings::WalproposerShmemState>,
331 : }
332 :
333 : impl MockImpl {
334 2 : fn check_walproposer_msg(&self, msg: &[u8]) {
335 2 : let ptr = self
336 2 : .expected_ptr
337 2 : .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
338 :
339 2 : if ptr >= self.expected_messages.len() {
340 0 : panic!("unexpected message from walproposer");
341 2 : }
342 :
343 2 : let expected_msg = &self.expected_messages[ptr];
344 2 : assert_eq!(msg, expected_msg.as_slice());
345 2 : }
346 :
347 2 : fn next_safekeeper_reply(&self) -> &[u8] {
348 2 : let ptr = self
349 2 : .replies_ptr
350 2 : .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
351 :
352 2 : if ptr >= self.safekeeper_replies.len() {
353 0 : panic!("no more safekeeper replies");
354 2 : }
355 :
356 2 : &self.safekeeper_replies[ptr]
357 2 : }
358 : }
359 :
360 : impl ApiImpl for MockImpl {
361 2 : fn get_shmem_state(&self) -> *mut crate::bindings::WalproposerShmemState {
362 2 : self.shmem.get()
363 2 : }
364 :
365 14 : fn get_current_timestamp(&self) -> i64 {
366 14 : println!("get_current_timestamp");
367 14 : 0
368 14 : }
369 :
370 0 : fn update_donor(&self, donor: &mut crate::bindings::Safekeeper, donor_lsn: u64) {
371 0 : let mut shmem = unsafe { *self.get_shmem_state() };
372 0 : shmem.propEpochStartLsn.value = donor_lsn;
373 0 : shmem.donor_conninfo = donor.conninfo;
374 0 : shmem.donor_lsn = donor_lsn;
375 0 : }
376 :
377 1 : fn conn_status(
378 1 : &self,
379 1 : _: &mut crate::bindings::Safekeeper,
380 1 : ) -> crate::bindings::WalProposerConnStatusType {
381 1 : println!("conn_status");
382 1 : crate::bindings::WalProposerConnStatusType_WP_CONNECTION_OK
383 1 : }
384 :
385 1 : fn conn_connect_start(&self, _: &mut crate::bindings::Safekeeper) {
386 1 : println!("conn_connect_start");
387 1 : }
388 :
389 1 : fn conn_connect_poll(
390 1 : &self,
391 1 : _: &mut crate::bindings::Safekeeper,
392 1 : ) -> crate::bindings::WalProposerConnectPollStatusType {
393 1 : println!("conn_connect_poll");
394 1 : crate::bindings::WalProposerConnectPollStatusType_WP_CONN_POLLING_OK
395 1 : }
396 :
397 1 : fn conn_send_query(&self, _: &mut crate::bindings::Safekeeper, query: &str) -> bool {
398 1 : println!("conn_send_query: {query}");
399 1 : true
400 1 : }
401 :
402 1 : fn conn_get_query_result(
403 1 : &self,
404 1 : _: &mut crate::bindings::Safekeeper,
405 1 : ) -> crate::bindings::WalProposerExecStatusType {
406 1 : println!("conn_get_query_result");
407 1 : crate::bindings::WalProposerExecStatusType_WP_EXEC_SUCCESS_COPYBOTH
408 1 : }
409 :
410 2 : fn conn_async_read(
411 2 : &self,
412 2 : _: &mut crate::bindings::Safekeeper,
413 2 : vec: &mut Vec<u8>,
414 2 : ) -> crate::bindings::PGAsyncReadResult {
415 2 : println!("conn_async_read");
416 2 : let reply = self.next_safekeeper_reply();
417 2 : println!("conn_async_read result: {reply:?}");
418 2 : vec.extend_from_slice(reply);
419 2 : crate::bindings::PGAsyncReadResult_PG_ASYNC_READ_SUCCESS
420 2 : }
421 :
422 2 : fn conn_blocking_write(&self, _: &mut crate::bindings::Safekeeper, buf: &[u8]) -> bool {
423 2 : println!("conn_blocking_write: {buf:?}");
424 2 : self.check_walproposer_msg(buf);
425 2 : true
426 2 : }
427 :
428 1 : fn recovery_download(
429 1 : &self,
430 1 : _wp: &mut crate::bindings::WalProposer,
431 1 : _sk: &mut crate::bindings::Safekeeper,
432 1 : ) -> bool {
433 1 : true
434 1 : }
435 :
436 0 : fn wal_reader_allocate(&self, _: &mut crate::bindings::Safekeeper) -> NeonWALReadResult {
437 0 : println!("wal_reader_allocate");
438 0 : crate::bindings::NeonWALReadResult_NEON_WALREAD_SUCCESS
439 0 : }
440 :
441 1 : fn init_event_set(&self, _: &mut crate::bindings::WalProposer) {
442 1 : println!("init_event_set")
443 1 : }
444 :
445 3 : fn update_event_set(&self, sk: &mut crate::bindings::Safekeeper, event_mask: u32) {
446 3 : println!(
447 3 : "update_event_set, sk={:?}, events_mask={:#b}",
448 3 : sk as *mut crate::bindings::Safekeeper, event_mask
449 : );
450 3 : self.wait_events.set(WaitEventsData { sk, event_mask });
451 3 : }
452 :
453 2 : fn add_safekeeper_event_set(&self, sk: &mut crate::bindings::Safekeeper, event_mask: u32) {
454 2 : println!(
455 2 : "add_safekeeper_event_set, sk={:?}, events_mask={:#b}",
456 2 : sk as *mut crate::bindings::Safekeeper, event_mask
457 : );
458 2 : self.wait_events.set(WaitEventsData { sk, event_mask });
459 2 : }
460 :
461 1 : fn rm_safekeeper_event_set(&self, sk: &mut crate::bindings::Safekeeper) {
462 1 : println!(
463 1 : "rm_safekeeper_event_set, sk={:?}",
464 1 : sk as *mut crate::bindings::Safekeeper
465 : );
466 1 : }
467 :
468 4 : fn wait_event_set(
469 4 : &self,
470 4 : _: &mut crate::bindings::WalProposer,
471 4 : timeout_millis: i64,
472 4 : ) -> super::WaitResult {
473 4 : let data = self.wait_events.get();
474 4 : println!("wait_event_set, timeout_millis={timeout_millis}, res={data:?}");
475 4 : super::WaitResult::Network(data.sk, data.event_mask)
476 4 : }
477 :
478 0 : fn strong_random(&self, buf: &mut [u8]) -> bool {
479 0 : println!("strong_random");
480 0 : buf.fill(0);
481 0 : true
482 0 : }
483 :
484 1 : fn finish_sync_safekeepers(&self, lsn: u64) -> ! {
485 1 : self.sync_channel.send(lsn).unwrap();
486 1 : panic!("sync safekeepers finished at lsn={}", lsn);
487 : }
488 :
489 12 : fn log_internal(&self, _wp: &mut crate::bindings::WalProposer, level: Level, msg: &str) {
490 12 : println!("wp_log[{level}] {msg}");
491 12 : }
492 :
493 1 : fn after_election(&self, _wp: &mut crate::bindings::WalProposer) {
494 1 : println!("after_election");
495 1 : }
496 : }
497 :
/// Test that walproposer can successfully connect to safekeeper and finish
/// sync_safekeepers. API is mocked in MockImpl.
///
/// Run this test with valgrind to detect leaks:
/// `valgrind --leak-check=full target/debug/deps/walproposer-<build>`
#[test]
fn test_simple_sync_safekeepers() -> anyhow::Result<()> {
    let ttid = TenantTimelineId::new(
        "9e4c8f36063c6c6e93bc20d65a820f3d".parse()?,
        "9e4c8f36063c6c6e93bc20d65a820f3d".parse()?,
    );

    let (lsn_tx, lsn_rx) = sync_channel(1);

    // Messages definitions are at walproposer.h
    // xxx: it would be better to extract them from safekeeper crate and
    // use serialization/deserialization here.

    // Pieces shared by several messages.
    let tag_greeting = [b'g'];
    let tag_voting = [b'v'];
    let mconf_gen = 0_u32.to_be_bytes();
    let mconf_members_len = 0_u32.to_be_bytes();
    let mconf_members_new_len = 0_u32.to_be_bytes();

    // walproposer -> safekeeper: greeting
    let tenant_bytes = CString::new(ttid.tenant_id.to_string())
        .unwrap()
        .into_bytes_with_nul();
    let timeline_bytes = CString::new(ttid.timeline_id.to_string())
        .unwrap()
        .into_bytes_with_nul();
    let pg_version: [u8; 4] = PG_VERSION_NUM.to_be_bytes();
    let system_id = 0_u64.to_be_bytes();
    let wal_seg_size = 16_777_216_u32.to_be_bytes();
    let proposer_greeting = [
        &tag_greeting[..],
        &tenant_bytes[..],
        &timeline_bytes[..],
        &mconf_gen[..],
        &mconf_members_len[..],
        &mconf_members_new_len[..],
        &pg_version[..],
        &system_id[..],
        &wal_seg_size[..],
    ]
    .concat();

    // walproposer -> safekeeper: vote request
    let vote_request_term = 3_u64.to_be_bytes();
    let vote_request = [&tag_voting[..], &mconf_gen[..], &vote_request_term[..]].concat();

    // safekeeper -> walproposer: greeting
    let acceptor_term = 2_u64.to_be_bytes();
    let acceptor_node_id = 1_u64.to_be_bytes();
    let acceptor_greeting = [
        &tag_greeting[..],
        &acceptor_node_id[..],
        &mconf_gen[..],
        &mconf_members_len[..],
        &mconf_members_new_len[..],
        &acceptor_term[..],
    ]
    .concat();

    // safekeeper -> walproposer: vote response (0x539 == 1337)
    let vote_response_term = 3_u64.to_be_bytes();
    let vote_given = 1_u8.to_be_bytes();
    let flush_lsn = 0x539_u64.to_be_bytes();
    let truncate_lsn = 0x539_u64.to_be_bytes();
    let th_len = 1_u32.to_be_bytes();
    let th_term = 2_u64.to_be_bytes();
    let th_lsn = 0x539_u64.to_be_bytes();
    let vote_response = [
        &tag_voting[..],
        &mconf_gen[..],
        &vote_response_term[..],
        &vote_given[..],
        &flush_lsn[..],
        &truncate_lsn[..],
        &th_len[..],
        &th_term[..],
        &th_lsn[..],
    ]
    .concat();

    let initial_wait_events = WaitEventsData {
        sk: std::ptr::null_mut(),
        event_mask: 0,
    };
    let mock: Box<dyn ApiImpl> = Box::new(MockImpl {
        wait_events: Cell::new(initial_wait_events),
        expected_messages: vec![proposer_greeting, vote_request],
        expected_ptr: AtomicUsize::new(0),
        safekeeper_replies: vec![acceptor_greeting, vote_response],
        replies_ptr: AtomicUsize::new(0),
        sync_channel: lsn_tx,
        shmem: UnsafeCell::new(crate::api_bindings::empty_shmem()),
    });
    let config = crate::walproposer::Config {
        ttid,
        safekeepers_list: vec!["localhost:5000".to_string()],
        safekeeper_conninfo_options: String::new(),
        safekeeper_reconnect_timeout: 1000,
        safekeeper_connection_timeout: 10000,
        sync_safekeepers: true,
    };

    let wp = Wrapper::new(mock, config);

    // walproposer will panic when it finishes sync_safekeepers
    std::panic::catch_unwind(|| wp.start()).unwrap_err();
    // validate the resulting LSN
    assert_eq!(lsn_rx.try_recv(), Ok(1337));
    Ok(())
    // drop() will free up resources here
}
613 : }
|