Line data Source code
1 : //! A C-Rust shim: defines implementation of C walproposer API, assuming wp
2 : //! callback_data stores Box to some Rust implementation.
3 :
4 : #![allow(dead_code)]
5 :
6 : use std::ffi::CStr;
7 : use std::ffi::CString;
8 :
9 : use crate::bindings::uint32;
10 : use crate::bindings::walproposer_api;
11 : use crate::bindings::NeonWALReadResult;
12 : use crate::bindings::PGAsyncReadResult;
13 : use crate::bindings::PGAsyncWriteResult;
14 : use crate::bindings::Safekeeper;
15 : use crate::bindings::Size;
16 : use crate::bindings::StringInfoData;
17 : use crate::bindings::TimestampTz;
18 : use crate::bindings::WalProposer;
19 : use crate::bindings::WalProposerConnStatusType;
20 : use crate::bindings::WalProposerConnectPollStatusType;
21 : use crate::bindings::WalProposerExecStatusType;
22 : use crate::bindings::WalproposerShmemState;
23 : use crate::bindings::XLogRecPtr;
24 : use crate::walproposer::ApiImpl;
25 : use crate::walproposer::WaitResult;
26 :
27 0 : extern "C" fn get_shmem_state(wp: *mut WalProposer) -> *mut WalproposerShmemState {
28 0 : unsafe {
29 0 : let callback_data = (*(*wp).config).callback_data;
30 0 : let api = callback_data as *mut Box<dyn ApiImpl>;
31 0 : (*api).get_shmem_state()
32 0 : }
33 0 : }
34 :
35 0 : extern "C" fn start_streaming(wp: *mut WalProposer, startpos: XLogRecPtr) {
36 0 : unsafe {
37 0 : let callback_data = (*(*wp).config).callback_data;
38 0 : let api = callback_data as *mut Box<dyn ApiImpl>;
39 0 : (*api).start_streaming(startpos)
40 0 : }
41 0 : }
42 :
43 0 : extern "C" fn get_flush_rec_ptr(wp: *mut WalProposer) -> XLogRecPtr {
44 0 : unsafe {
45 0 : let callback_data = (*(*wp).config).callback_data;
46 0 : let api = callback_data as *mut Box<dyn ApiImpl>;
47 0 : (*api).get_flush_rec_ptr()
48 0 : }
49 0 : }
50 :
51 28 : extern "C" fn get_current_timestamp(wp: *mut WalProposer) -> TimestampTz {
52 28 : unsafe {
53 28 : let callback_data = (*(*wp).config).callback_data;
54 28 : let api = callback_data as *mut Box<dyn ApiImpl>;
55 28 : (*api).get_current_timestamp()
56 28 : }
57 28 : }
58 :
59 0 : extern "C" fn conn_error_message(sk: *mut Safekeeper) -> *mut ::std::os::raw::c_char {
60 0 : unsafe {
61 0 : let callback_data = (*(*(*sk).wp).config).callback_data;
62 0 : let api = callback_data as *mut Box<dyn ApiImpl>;
63 0 : let msg = (*api).conn_error_message(&mut (*sk));
64 0 : let msg = CString::new(msg).unwrap();
65 0 : // TODO: fix leaking error message
66 0 : msg.into_raw()
67 0 : }
68 0 : }
69 :
70 2 : extern "C" fn conn_status(sk: *mut Safekeeper) -> WalProposerConnStatusType {
71 2 : unsafe {
72 2 : let callback_data = (*(*(*sk).wp).config).callback_data;
73 2 : let api = callback_data as *mut Box<dyn ApiImpl>;
74 2 : (*api).conn_status(&mut (*sk))
75 2 : }
76 2 : }
77 :
78 2 : extern "C" fn conn_connect_start(sk: *mut Safekeeper) {
79 2 : unsafe {
80 2 : let callback_data = (*(*(*sk).wp).config).callback_data;
81 2 : let api = callback_data as *mut Box<dyn ApiImpl>;
82 2 : (*api).conn_connect_start(&mut (*sk))
83 2 : }
84 2 : }
85 :
86 2 : extern "C" fn conn_connect_poll(sk: *mut Safekeeper) -> WalProposerConnectPollStatusType {
87 2 : unsafe {
88 2 : let callback_data = (*(*(*sk).wp).config).callback_data;
89 2 : let api = callback_data as *mut Box<dyn ApiImpl>;
90 2 : (*api).conn_connect_poll(&mut (*sk))
91 2 : }
92 2 : }
93 :
94 2 : extern "C" fn conn_send_query(sk: *mut Safekeeper, query: *mut ::std::os::raw::c_char) -> bool {
95 2 : let query = unsafe { CStr::from_ptr(query) };
96 2 : let query = query.to_str().unwrap();
97 2 :
98 2 : unsafe {
99 2 : let callback_data = (*(*(*sk).wp).config).callback_data;
100 2 : let api = callback_data as *mut Box<dyn ApiImpl>;
101 2 : (*api).conn_send_query(&mut (*sk), query)
102 2 : }
103 2 : }
104 :
105 2 : extern "C" fn conn_get_query_result(sk: *mut Safekeeper) -> WalProposerExecStatusType {
106 2 : unsafe {
107 2 : let callback_data = (*(*(*sk).wp).config).callback_data;
108 2 : let api = callback_data as *mut Box<dyn ApiImpl>;
109 2 : (*api).conn_get_query_result(&mut (*sk))
110 2 : }
111 2 : }
112 :
113 0 : extern "C" fn conn_flush(sk: *mut Safekeeper) -> ::std::os::raw::c_int {
114 0 : unsafe {
115 0 : let callback_data = (*(*(*sk).wp).config).callback_data;
116 0 : let api = callback_data as *mut Box<dyn ApiImpl>;
117 0 : (*api).conn_flush(&mut (*sk))
118 0 : }
119 0 : }
120 :
121 0 : extern "C" fn conn_finish(sk: *mut Safekeeper) {
122 0 : unsafe {
123 0 : let callback_data = (*(*(*sk).wp).config).callback_data;
124 0 : let api = callback_data as *mut Box<dyn ApiImpl>;
125 0 : (*api).conn_finish(&mut (*sk))
126 0 : }
127 0 : }
128 :
129 4 : extern "C" fn conn_async_read(
130 4 : sk: *mut Safekeeper,
131 4 : buf: *mut *mut ::std::os::raw::c_char,
132 4 : amount: *mut ::std::os::raw::c_int,
133 4 : ) -> PGAsyncReadResult {
134 4 : unsafe {
135 4 : let callback_data = (*(*(*sk).wp).config).callback_data;
136 4 : let api = callback_data as *mut Box<dyn ApiImpl>;
137 4 : let (res, result) = (*api).conn_async_read(&mut (*sk));
138 4 :
139 4 : // This function has guarantee that returned buf will be valid until
140 4 : // the next call. So we can store a Vec in each Safekeeper and reuse
141 4 : // it on the next call.
142 4 : let mut inbuf = take_vec_u8(&mut (*sk).inbuf).unwrap_or_default();
143 4 :
144 4 : inbuf.clear();
145 4 : inbuf.extend_from_slice(res);
146 4 :
147 4 : // Put a Vec back to sk->inbuf and return data ptr.
148 4 : *buf = store_vec_u8(&mut (*sk).inbuf, inbuf);
149 4 : *amount = res.len() as i32;
150 4 :
151 4 : result
152 4 : }
153 4 : }
154 :
155 0 : extern "C" fn conn_async_write(
156 0 : sk: *mut Safekeeper,
157 0 : buf: *const ::std::os::raw::c_void,
158 0 : size: usize,
159 0 : ) -> PGAsyncWriteResult {
160 0 : unsafe {
161 0 : let buf = std::slice::from_raw_parts(buf as *const u8, size);
162 0 : let callback_data = (*(*(*sk).wp).config).callback_data;
163 0 : let api = callback_data as *mut Box<dyn ApiImpl>;
164 0 : (*api).conn_async_write(&mut (*sk), buf)
165 0 : }
166 0 : }
167 :
168 4 : extern "C" fn conn_blocking_write(
169 4 : sk: *mut Safekeeper,
170 4 : buf: *const ::std::os::raw::c_void,
171 4 : size: usize,
172 4 : ) -> bool {
173 4 : unsafe {
174 4 : let buf = std::slice::from_raw_parts(buf as *const u8, size);
175 4 : let callback_data = (*(*(*sk).wp).config).callback_data;
176 4 : let api = callback_data as *mut Box<dyn ApiImpl>;
177 4 : (*api).conn_blocking_write(&mut (*sk), buf)
178 4 : }
179 4 : }
180 :
181 2 : extern "C" fn recovery_download(wp: *mut WalProposer, sk: *mut Safekeeper) -> bool {
182 2 : unsafe {
183 2 : let callback_data = (*(*(*sk).wp).config).callback_data;
184 2 : let api = callback_data as *mut Box<dyn ApiImpl>;
185 2 : (*api).recovery_download(&mut (*wp), &mut (*sk))
186 2 : }
187 2 : }
188 :
189 0 : extern "C" fn wal_reader_allocate(sk: *mut Safekeeper) {
190 0 : unsafe {
191 0 : let callback_data = (*(*(*sk).wp).config).callback_data;
192 0 : let api = callback_data as *mut Box<dyn ApiImpl>;
193 0 : (*api).wal_reader_allocate(&mut (*sk));
194 0 : }
195 0 : }
196 :
197 : #[allow(clippy::unnecessary_cast)]
198 0 : extern "C" fn wal_read(
199 0 : sk: *mut Safekeeper,
200 0 : buf: *mut ::std::os::raw::c_char,
201 0 : startptr: XLogRecPtr,
202 0 : count: Size,
203 0 : _errmsg: *mut *mut ::std::os::raw::c_char,
204 0 : ) -> NeonWALReadResult {
205 0 : unsafe {
206 0 : let buf = std::slice::from_raw_parts_mut(buf as *mut u8, count);
207 0 : let callback_data = (*(*(*sk).wp).config).callback_data;
208 0 : let api = callback_data as *mut Box<dyn ApiImpl>;
209 0 : // TODO: errmsg is not forwarded
210 0 : (*api).wal_read(&mut (*sk), buf, startptr)
211 0 : }
212 0 : }
213 :
214 0 : extern "C" fn wal_reader_events(sk: *mut Safekeeper) -> uint32 {
215 0 : unsafe {
216 0 : let callback_data = (*(*(*sk).wp).config).callback_data;
217 0 : let api = callback_data as *mut Box<dyn ApiImpl>;
218 0 : (*api).wal_reader_events(&mut (*sk))
219 0 : }
220 0 : }
221 :
222 2 : extern "C" fn init_event_set(wp: *mut WalProposer) {
223 2 : unsafe {
224 2 : let callback_data = (*(*wp).config).callback_data;
225 2 : let api = callback_data as *mut Box<dyn ApiImpl>;
226 2 : (*api).init_event_set(&mut (*wp));
227 2 : }
228 2 : }
229 :
230 8 : extern "C" fn update_event_set(sk: *mut Safekeeper, events: uint32) {
231 8 : unsafe {
232 8 : let callback_data = (*(*(*sk).wp).config).callback_data;
233 8 : let api = callback_data as *mut Box<dyn ApiImpl>;
234 8 : (*api).update_event_set(&mut (*sk), events);
235 8 : }
236 8 : }
237 :
238 0 : extern "C" fn active_state_update_event_set(sk: *mut Safekeeper) {
239 0 : unsafe {
240 0 : let callback_data = (*(*(*sk).wp).config).callback_data;
241 0 : let api = callback_data as *mut Box<dyn ApiImpl>;
242 0 : (*api).active_state_update_event_set(&mut (*sk));
243 0 : }
244 0 : }
245 :
246 4 : extern "C" fn add_safekeeper_event_set(sk: *mut Safekeeper, events: uint32) {
247 4 : unsafe {
248 4 : let callback_data = (*(*(*sk).wp).config).callback_data;
249 4 : let api = callback_data as *mut Box<dyn ApiImpl>;
250 4 : (*api).add_safekeeper_event_set(&mut (*sk), events);
251 4 : }
252 4 : }
253 :
254 2 : extern "C" fn rm_safekeeper_event_set(sk: *mut Safekeeper) {
255 2 : unsafe {
256 2 : let callback_data = (*(*(*sk).wp).config).callback_data;
257 2 : let api = callback_data as *mut Box<dyn ApiImpl>;
258 2 : (*api).rm_safekeeper_event_set(&mut (*sk));
259 2 : }
260 2 : }
261 :
262 8 : extern "C" fn wait_event_set(
263 8 : wp: *mut WalProposer,
264 8 : timeout: ::std::os::raw::c_long,
265 8 : event_sk: *mut *mut Safekeeper,
266 8 : events: *mut uint32,
267 8 : ) -> ::std::os::raw::c_int {
268 8 : unsafe {
269 8 : let callback_data = (*(*wp).config).callback_data;
270 8 : let api = callback_data as *mut Box<dyn ApiImpl>;
271 8 : let result = (*api).wait_event_set(&mut (*wp), timeout);
272 8 : match result {
273 : WaitResult::Latch => {
274 0 : *event_sk = std::ptr::null_mut();
275 0 : *events = crate::bindings::WL_LATCH_SET;
276 0 : 1
277 : }
278 : WaitResult::Timeout => {
279 0 : *event_sk = std::ptr::null_mut();
280 0 : *events = crate::bindings::WL_TIMEOUT;
281 0 : 0
282 : }
283 8 : WaitResult::Network(sk, event_mask) => {
284 8 : *event_sk = sk;
285 8 : *events = event_mask;
286 8 : 1
287 : }
288 : }
289 : }
290 8 : }
291 :
292 2 : extern "C" fn strong_random(
293 2 : wp: *mut WalProposer,
294 2 : buf: *mut ::std::os::raw::c_void,
295 2 : len: usize,
296 2 : ) -> bool {
297 2 : unsafe {
298 2 : let buf = std::slice::from_raw_parts_mut(buf as *mut u8, len);
299 2 : let callback_data = (*(*wp).config).callback_data;
300 2 : let api = callback_data as *mut Box<dyn ApiImpl>;
301 2 : (*api).strong_random(buf)
302 2 : }
303 2 : }
304 :
305 0 : extern "C" fn get_redo_start_lsn(wp: *mut WalProposer) -> XLogRecPtr {
306 0 : unsafe {
307 0 : let callback_data = (*(*wp).config).callback_data;
308 0 : let api = callback_data as *mut Box<dyn ApiImpl>;
309 0 : (*api).get_redo_start_lsn()
310 0 : }
311 0 : }
312 :
313 2 : extern "C" fn finish_sync_safekeepers(wp: *mut WalProposer, lsn: XLogRecPtr) {
314 2 : unsafe {
315 2 : let callback_data = (*(*wp).config).callback_data;
316 2 : let api = callback_data as *mut Box<dyn ApiImpl>;
317 2 : (*api).finish_sync_safekeepers(lsn)
318 2 : }
319 2 : }
320 :
321 0 : extern "C" fn process_safekeeper_feedback(wp: *mut WalProposer, commit_lsn: XLogRecPtr) {
322 0 : unsafe {
323 0 : let callback_data = (*(*wp).config).callback_data;
324 0 : let api = callback_data as *mut Box<dyn ApiImpl>;
325 0 : (*api).process_safekeeper_feedback(&mut (*wp), commit_lsn)
326 0 : }
327 0 : }
328 :
329 14 : extern "C" fn log_internal(
330 14 : wp: *mut WalProposer,
331 14 : level: ::std::os::raw::c_int,
332 14 : line: *const ::std::os::raw::c_char,
333 14 : ) {
334 14 : unsafe {
335 14 : let callback_data = (*(*wp).config).callback_data;
336 14 : let api = callback_data as *mut Box<dyn ApiImpl>;
337 14 : let line = CStr::from_ptr(line);
338 14 : let line = line.to_str().unwrap();
339 14 : (*api).log_internal(&mut (*wp), Level::from(level as u32), line)
340 14 : }
341 14 : }
342 :
/// Log severity handed to `ApiImpl::log_internal`.
///
/// Each variant corresponds to the like-named numeric level constant from
/// `crate::bindings` that [`Level::from`] matches on.
#[derive(Debug)]
pub enum Level {
    Debug5,
    Debug4,
    Debug3,
    Debug2,
    Debug1,
    Log,
    Info,
    Notice,
    Warning,
    Error,
    Fatal,
    Panic,
    /// Maps from the `WPEVENT` constant.
    WPEvent,
}
359 :
360 : impl Level {
361 14 : pub fn from(elevel: u32) -> Level {
362 14 : use crate::bindings::*;
363 14 :
364 14 : match elevel {
365 0 : DEBUG5 => Level::Debug5,
366 0 : DEBUG4 => Level::Debug4,
367 0 : DEBUG3 => Level::Debug3,
368 0 : DEBUG2 => Level::Debug2,
369 0 : DEBUG1 => Level::Debug1,
370 14 : LOG => Level::Log,
371 0 : INFO => Level::Info,
372 0 : NOTICE => Level::Notice,
373 0 : WARNING => Level::Warning,
374 0 : ERROR => Level::Error,
375 0 : FATAL => Level::Fatal,
376 0 : PANIC => Level::Panic,
377 0 : WPEVENT => Level::WPEvent,
378 0 : _ => panic!("unknown log level {}", elevel),
379 : }
380 14 : }
381 : }
382 :
383 2 : pub(crate) fn create_api() -> walproposer_api {
384 2 : walproposer_api {
385 2 : get_shmem_state: Some(get_shmem_state),
386 2 : start_streaming: Some(start_streaming),
387 2 : get_flush_rec_ptr: Some(get_flush_rec_ptr),
388 2 : get_current_timestamp: Some(get_current_timestamp),
389 2 : conn_error_message: Some(conn_error_message),
390 2 : conn_status: Some(conn_status),
391 2 : conn_connect_start: Some(conn_connect_start),
392 2 : conn_connect_poll: Some(conn_connect_poll),
393 2 : conn_send_query: Some(conn_send_query),
394 2 : conn_get_query_result: Some(conn_get_query_result),
395 2 : conn_flush: Some(conn_flush),
396 2 : conn_finish: Some(conn_finish),
397 2 : conn_async_read: Some(conn_async_read),
398 2 : conn_async_write: Some(conn_async_write),
399 2 : conn_blocking_write: Some(conn_blocking_write),
400 2 : recovery_download: Some(recovery_download),
401 2 : wal_reader_allocate: Some(wal_reader_allocate),
402 2 : wal_read: Some(wal_read),
403 2 : wal_reader_events: Some(wal_reader_events),
404 2 : init_event_set: Some(init_event_set),
405 2 : update_event_set: Some(update_event_set),
406 2 : active_state_update_event_set: Some(active_state_update_event_set),
407 2 : add_safekeeper_event_set: Some(add_safekeeper_event_set),
408 2 : rm_safekeeper_event_set: Some(rm_safekeeper_event_set),
409 2 : wait_event_set: Some(wait_event_set),
410 2 : strong_random: Some(strong_random),
411 2 : get_redo_start_lsn: Some(get_redo_start_lsn),
412 2 : finish_sync_safekeepers: Some(finish_sync_safekeepers),
413 2 : process_safekeeper_feedback: Some(process_safekeeper_feedback),
414 2 : log_internal: Some(log_internal),
415 2 : }
416 2 : }
417 :
418 : impl std::fmt::Display for Level {
419 14 : fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
420 14 : write!(f, "{:?}", self)
421 14 : }
422 : }
423 :
424 : /// Take ownership of `Vec<u8>` from StringInfoData.
425 : #[allow(clippy::unnecessary_cast)]
426 6 : pub(crate) fn take_vec_u8(pg: &mut StringInfoData) -> Option<Vec<u8>> {
427 6 : if pg.data.is_null() {
428 2 : return None;
429 4 : }
430 4 :
431 4 : let ptr = pg.data as *mut u8;
432 4 : let length = pg.len as usize;
433 4 : let capacity = pg.maxlen as usize;
434 4 :
435 4 : pg.data = std::ptr::null_mut();
436 4 : pg.len = 0;
437 4 : pg.maxlen = 0;
438 4 :
439 4 : unsafe { Some(Vec::from_raw_parts(ptr, length, capacity)) }
440 6 : }
441 :
442 : /// Store `Vec<u8>` in StringInfoData.
443 4 : fn store_vec_u8(pg: &mut StringInfoData, vec: Vec<u8>) -> *mut ::std::os::raw::c_char {
444 4 : let ptr = vec.as_ptr() as *mut ::std::os::raw::c_char;
445 4 : let length = vec.len();
446 4 : let capacity = vec.capacity();
447 :
448 4 : assert!(pg.data.is_null());
449 :
450 4 : pg.data = ptr;
451 4 : pg.len = length as i32;
452 4 : pg.maxlen = capacity as i32;
453 4 :
454 4 : std::mem::forget(vec);
455 4 :
456 4 : ptr
457 4 : }
|