TLA Line data Source code
1 : //! A C-Rust shim: defines implementation of C walproposer API, assuming wp
2 : //! callback_data stores Box to some Rust implementation.
3 :
4 : #![allow(dead_code)]
5 :
6 : use std::ffi::CStr;
7 : use std::ffi::CString;
8 :
9 : use crate::bindings::uint32;
10 : use crate::bindings::walproposer_api;
11 : use crate::bindings::NeonWALReadResult;
12 : use crate::bindings::PGAsyncReadResult;
13 : use crate::bindings::PGAsyncWriteResult;
14 : use crate::bindings::Safekeeper;
15 : use crate::bindings::Size;
16 : use crate::bindings::StringInfoData;
17 : use crate::bindings::TimestampTz;
18 : use crate::bindings::WalProposer;
19 : use crate::bindings::WalProposerConnStatusType;
20 : use crate::bindings::WalProposerConnectPollStatusType;
21 : use crate::bindings::WalProposerExecStatusType;
22 : use crate::bindings::WalproposerShmemState;
23 : use crate::bindings::XLogRecPtr;
24 : use crate::walproposer::ApiImpl;
25 : use crate::walproposer::WaitResult;
26 :
27 UBC 0 : extern "C" fn get_shmem_state(wp: *mut WalProposer) -> *mut WalproposerShmemState {
28 0 : unsafe {
29 0 : let callback_data = (*(*wp).config).callback_data;
30 0 : let api = callback_data as *mut Box<dyn ApiImpl>;
31 0 : (*api).get_shmem_state()
32 0 : }
33 0 : }
34 :
35 0 : extern "C" fn start_streaming(wp: *mut WalProposer, startpos: XLogRecPtr) {
36 0 : unsafe {
37 0 : let callback_data = (*(*wp).config).callback_data;
38 0 : let api = callback_data as *mut Box<dyn ApiImpl>;
39 0 : (*api).start_streaming(startpos)
40 0 : }
41 0 : }
42 :
43 0 : extern "C" fn get_flush_rec_ptr(wp: *mut WalProposer) -> XLogRecPtr {
44 0 : unsafe {
45 0 : let callback_data = (*(*wp).config).callback_data;
46 0 : let api = callback_data as *mut Box<dyn ApiImpl>;
47 0 : (*api).get_flush_rec_ptr()
48 0 : }
49 0 : }
50 :
51 CBC 14 : extern "C" fn get_current_timestamp(wp: *mut WalProposer) -> TimestampTz {
52 14 : unsafe {
53 14 : let callback_data = (*(*wp).config).callback_data;
54 14 : let api = callback_data as *mut Box<dyn ApiImpl>;
55 14 : (*api).get_current_timestamp()
56 14 : }
57 14 : }
58 :
59 UBC 0 : extern "C" fn conn_error_message(sk: *mut Safekeeper) -> *mut ::std::os::raw::c_char {
60 0 : unsafe {
61 0 : let callback_data = (*(*(*sk).wp).config).callback_data;
62 0 : let api = callback_data as *mut Box<dyn ApiImpl>;
63 0 : let msg = (*api).conn_error_message(&mut (*sk));
64 0 : let msg = CString::new(msg).unwrap();
65 0 : // TODO: fix leaking error message
66 0 : msg.into_raw()
67 0 : }
68 0 : }
69 :
70 CBC 1 : extern "C" fn conn_status(sk: *mut Safekeeper) -> WalProposerConnStatusType {
71 1 : unsafe {
72 1 : let callback_data = (*(*(*sk).wp).config).callback_data;
73 1 : let api = callback_data as *mut Box<dyn ApiImpl>;
74 1 : (*api).conn_status(&mut (*sk))
75 1 : }
76 1 : }
77 :
78 1 : extern "C" fn conn_connect_start(sk: *mut Safekeeper) {
79 1 : unsafe {
80 1 : let callback_data = (*(*(*sk).wp).config).callback_data;
81 1 : let api = callback_data as *mut Box<dyn ApiImpl>;
82 1 : (*api).conn_connect_start(&mut (*sk))
83 1 : }
84 1 : }
85 :
86 1 : extern "C" fn conn_connect_poll(sk: *mut Safekeeper) -> WalProposerConnectPollStatusType {
87 1 : unsafe {
88 1 : let callback_data = (*(*(*sk).wp).config).callback_data;
89 1 : let api = callback_data as *mut Box<dyn ApiImpl>;
90 1 : (*api).conn_connect_poll(&mut (*sk))
91 1 : }
92 1 : }
93 :
94 1 : extern "C" fn conn_send_query(sk: *mut Safekeeper, query: *mut ::std::os::raw::c_char) -> bool {
95 1 : let query = unsafe { CStr::from_ptr(query) };
96 1 : let query = query.to_str().unwrap();
97 1 :
98 1 : unsafe {
99 1 : let callback_data = (*(*(*sk).wp).config).callback_data;
100 1 : let api = callback_data as *mut Box<dyn ApiImpl>;
101 1 : (*api).conn_send_query(&mut (*sk), query)
102 1 : }
103 1 : }
104 :
105 1 : extern "C" fn conn_get_query_result(sk: *mut Safekeeper) -> WalProposerExecStatusType {
106 1 : unsafe {
107 1 : let callback_data = (*(*(*sk).wp).config).callback_data;
108 1 : let api = callback_data as *mut Box<dyn ApiImpl>;
109 1 : (*api).conn_get_query_result(&mut (*sk))
110 1 : }
111 1 : }
112 :
113 UBC 0 : extern "C" fn conn_flush(sk: *mut Safekeeper) -> ::std::os::raw::c_int {
114 0 : unsafe {
115 0 : let callback_data = (*(*(*sk).wp).config).callback_data;
116 0 : let api = callback_data as *mut Box<dyn ApiImpl>;
117 0 : (*api).conn_flush(&mut (*sk))
118 0 : }
119 0 : }
120 :
121 0 : extern "C" fn conn_finish(sk: *mut Safekeeper) {
122 0 : unsafe {
123 0 : let callback_data = (*(*(*sk).wp).config).callback_data;
124 0 : let api = callback_data as *mut Box<dyn ApiImpl>;
125 0 : (*api).conn_finish(&mut (*sk))
126 0 : }
127 0 : }
128 :
129 CBC 2 : extern "C" fn conn_async_read(
130 2 : sk: *mut Safekeeper,
131 2 : buf: *mut *mut ::std::os::raw::c_char,
132 2 : amount: *mut ::std::os::raw::c_int,
133 2 : ) -> PGAsyncReadResult {
134 2 : unsafe {
135 2 : let callback_data = (*(*(*sk).wp).config).callback_data;
136 2 : let api = callback_data as *mut Box<dyn ApiImpl>;
137 2 : let (res, result) = (*api).conn_async_read(&mut (*sk));
138 2 :
139 2 : // This function has guarantee that returned buf will be valid until
140 2 : // the next call. So we can store a Vec in each Safekeeper and reuse
141 2 : // it on the next call.
142 2 : let mut inbuf = take_vec_u8(&mut (*sk).inbuf).unwrap_or_default();
143 2 :
144 2 : inbuf.clear();
145 2 : inbuf.extend_from_slice(res);
146 2 :
147 2 : // Put a Vec back to sk->inbuf and return data ptr.
148 2 : *buf = store_vec_u8(&mut (*sk).inbuf, inbuf);
149 2 : *amount = res.len() as i32;
150 2 :
151 2 : result
152 2 : }
153 2 : }
154 :
155 UBC 0 : extern "C" fn conn_async_write(
156 0 : sk: *mut Safekeeper,
157 0 : buf: *const ::std::os::raw::c_void,
158 0 : size: usize,
159 0 : ) -> PGAsyncWriteResult {
160 0 : unsafe {
161 0 : let buf = std::slice::from_raw_parts(buf as *const u8, size);
162 0 : let callback_data = (*(*(*sk).wp).config).callback_data;
163 0 : let api = callback_data as *mut Box<dyn ApiImpl>;
164 0 : (*api).conn_async_write(&mut (*sk), buf)
165 0 : }
166 0 : }
167 :
168 CBC 2 : extern "C" fn conn_blocking_write(
169 2 : sk: *mut Safekeeper,
170 2 : buf: *const ::std::os::raw::c_void,
171 2 : size: usize,
172 2 : ) -> bool {
173 2 : unsafe {
174 2 : let buf = std::slice::from_raw_parts(buf as *const u8, size);
175 2 : let callback_data = (*(*(*sk).wp).config).callback_data;
176 2 : let api = callback_data as *mut Box<dyn ApiImpl>;
177 2 : (*api).conn_blocking_write(&mut (*sk), buf)
178 2 : }
179 2 : }
180 :
181 1 : extern "C" fn recovery_download(wp: *mut WalProposer, sk: *mut Safekeeper) -> bool {
182 1 : unsafe {
183 1 : let callback_data = (*(*(*sk).wp).config).callback_data;
184 1 : let api = callback_data as *mut Box<dyn ApiImpl>;
185 1 : (*api).recovery_download(&mut (*wp), &mut (*sk))
186 1 : }
187 1 : }
188 :
189 UBC 0 : extern "C" fn wal_reader_allocate(sk: *mut Safekeeper) {
190 0 : unsafe {
191 0 : let callback_data = (*(*(*sk).wp).config).callback_data;
192 0 : let api = callback_data as *mut Box<dyn ApiImpl>;
193 0 : (*api).wal_reader_allocate(&mut (*sk));
194 0 : }
195 0 : }
196 :
197 : #[allow(clippy::unnecessary_cast)]
198 0 : extern "C" fn wal_read(
199 0 : sk: *mut Safekeeper,
200 0 : buf: *mut ::std::os::raw::c_char,
201 0 : startptr: XLogRecPtr,
202 0 : count: Size,
203 0 : _errmsg: *mut *mut ::std::os::raw::c_char,
204 0 : ) -> NeonWALReadResult {
205 0 : unsafe {
206 0 : let buf = std::slice::from_raw_parts_mut(buf as *mut u8, count);
207 0 : let callback_data = (*(*(*sk).wp).config).callback_data;
208 0 : let api = callback_data as *mut Box<dyn ApiImpl>;
209 0 : // TODO: errmsg is not forwarded
210 0 : (*api).wal_read(&mut (*sk), buf, startptr)
211 0 : }
212 0 : }
213 :
214 0 : extern "C" fn wal_reader_events(sk: *mut Safekeeper) -> uint32 {
215 0 : unsafe {
216 0 : let callback_data = (*(*(*sk).wp).config).callback_data;
217 0 : let api = callback_data as *mut Box<dyn ApiImpl>;
218 0 : (*api).wal_reader_events(&mut (*sk))
219 0 : }
220 0 : }
221 :
222 CBC 1 : extern "C" fn init_event_set(wp: *mut WalProposer) {
223 1 : unsafe {
224 1 : let callback_data = (*(*wp).config).callback_data;
225 1 : let api = callback_data as *mut Box<dyn ApiImpl>;
226 1 : (*api).init_event_set(&mut (*wp));
227 1 : }
228 1 : }
229 :
230 4 : extern "C" fn update_event_set(sk: *mut Safekeeper, events: uint32) {
231 4 : unsafe {
232 4 : let callback_data = (*(*(*sk).wp).config).callback_data;
233 4 : let api = callback_data as *mut Box<dyn ApiImpl>;
234 4 : (*api).update_event_set(&mut (*sk), events);
235 4 : }
236 4 : }
237 :
238 UBC 0 : extern "C" fn active_state_update_event_set(sk: *mut Safekeeper) {
239 0 : unsafe {
240 0 : let callback_data = (*(*(*sk).wp).config).callback_data;
241 0 : let api = callback_data as *mut Box<dyn ApiImpl>;
242 0 : (*api).active_state_update_event_set(&mut (*sk));
243 0 : }
244 0 : }
245 :
246 CBC 2 : extern "C" fn add_safekeeper_event_set(sk: *mut Safekeeper, events: uint32) {
247 2 : unsafe {
248 2 : let callback_data = (*(*(*sk).wp).config).callback_data;
249 2 : let api = callback_data as *mut Box<dyn ApiImpl>;
250 2 : (*api).add_safekeeper_event_set(&mut (*sk), events);
251 2 : }
252 2 : }
253 :
254 1 : extern "C" fn rm_safekeeper_event_set(sk: *mut Safekeeper) {
255 1 : unsafe {
256 1 : let callback_data = (*(*(*sk).wp).config).callback_data;
257 1 : let api = callback_data as *mut Box<dyn ApiImpl>;
258 1 : (*api).rm_safekeeper_event_set(&mut (*sk));
259 1 : }
260 1 : }
261 :
262 4 : extern "C" fn wait_event_set(
263 4 : wp: *mut WalProposer,
264 4 : timeout: ::std::os::raw::c_long,
265 4 : event_sk: *mut *mut Safekeeper,
266 4 : events: *mut uint32,
267 4 : ) -> ::std::os::raw::c_int {
268 4 : unsafe {
269 4 : let callback_data = (*(*wp).config).callback_data;
270 4 : let api = callback_data as *mut Box<dyn ApiImpl>;
271 4 : let result = (*api).wait_event_set(&mut (*wp), timeout);
272 4 : match result {
273 : WaitResult::Latch => {
274 UBC 0 : *event_sk = std::ptr::null_mut();
275 0 : *events = crate::bindings::WL_LATCH_SET;
276 0 : 1
277 : }
278 : WaitResult::Timeout => {
279 0 : *event_sk = std::ptr::null_mut();
280 0 : *events = crate::bindings::WL_TIMEOUT;
281 0 : 0
282 : }
283 CBC 4 : WaitResult::Network(sk, event_mask) => {
284 4 : *event_sk = sk;
285 4 : *events = event_mask;
286 4 : 1
287 : }
288 : }
289 : }
290 4 : }
291 :
292 1 : extern "C" fn strong_random(
293 1 : wp: *mut WalProposer,
294 1 : buf: *mut ::std::os::raw::c_void,
295 1 : len: usize,
296 1 : ) -> bool {
297 1 : unsafe {
298 1 : let buf = std::slice::from_raw_parts_mut(buf as *mut u8, len);
299 1 : let callback_data = (*(*wp).config).callback_data;
300 1 : let api = callback_data as *mut Box<dyn ApiImpl>;
301 1 : (*api).strong_random(buf)
302 1 : }
303 1 : }
304 :
305 UBC 0 : extern "C" fn get_redo_start_lsn(wp: *mut WalProposer) -> XLogRecPtr {
306 0 : unsafe {
307 0 : let callback_data = (*(*wp).config).callback_data;
308 0 : let api = callback_data as *mut Box<dyn ApiImpl>;
309 0 : (*api).get_redo_start_lsn()
310 0 : }
311 0 : }
312 :
313 CBC 1 : extern "C" fn finish_sync_safekeepers(wp: *mut WalProposer, lsn: XLogRecPtr) {
314 1 : unsafe {
315 1 : let callback_data = (*(*wp).config).callback_data;
316 1 : let api = callback_data as *mut Box<dyn ApiImpl>;
317 1 : (*api).finish_sync_safekeepers(lsn)
318 1 : }
319 1 : }
320 :
321 UBC 0 : extern "C" fn process_safekeeper_feedback(wp: *mut WalProposer, commit_lsn: XLogRecPtr) {
322 0 : unsafe {
323 0 : let callback_data = (*(*wp).config).callback_data;
324 0 : let api = callback_data as *mut Box<dyn ApiImpl>;
325 0 : (*api).process_safekeeper_feedback(&mut (*wp), commit_lsn)
326 0 : }
327 0 : }
328 :
329 CBC 7 : extern "C" fn log_internal(
330 7 : wp: *mut WalProposer,
331 7 : level: ::std::os::raw::c_int,
332 7 : line: *const ::std::os::raw::c_char,
333 7 : ) {
334 7 : unsafe {
335 7 : let callback_data = (*(*wp).config).callback_data;
336 7 : let api = callback_data as *mut Box<dyn ApiImpl>;
337 7 : let line = CStr::from_ptr(line);
338 7 : let line = line.to_str().unwrap();
339 7 : (*api).log_internal(&mut (*wp), Level::from(level as u32), line)
340 7 : }
341 7 : }
342 :
/// Log severity handed to `ApiImpl::log_internal`.
///
/// Each variant corresponds to the like-named numeric constant in `crate::bindings`
/// (`DEBUG5` .. `PANIC`, plus `WPEVENT`); see `Level::from` for the mapping.
#[derive(Debug)]
pub enum Level {
    /// Mapped from `DEBUG5`.
    Debug5,
    /// Mapped from `DEBUG4`.
    Debug4,
    /// Mapped from `DEBUG3`.
    Debug3,
    /// Mapped from `DEBUG2`.
    Debug2,
    /// Mapped from `DEBUG1`.
    Debug1,
    /// Mapped from `LOG`.
    Log,
    /// Mapped from `INFO`.
    Info,
    /// Mapped from `NOTICE`.
    Notice,
    /// Mapped from `WARNING`.
    Warning,
    /// Mapped from `ERROR`.
    Error,
    /// Mapped from `FATAL`.
    Fatal,
    /// Mapped from `PANIC`.
    Panic,
    /// Mapped from `WPEVENT`.
    WPEvent,
}
359 :
360 : impl Level {
361 7 : pub fn from(elevel: u32) -> Level {
362 7 : use crate::bindings::*;
363 7 :
364 7 : match elevel {
365 UBC 0 : DEBUG5 => Level::Debug5,
366 0 : DEBUG4 => Level::Debug4,
367 0 : DEBUG3 => Level::Debug3,
368 0 : DEBUG2 => Level::Debug2,
369 0 : DEBUG1 => Level::Debug1,
370 CBC 7 : LOG => Level::Log,
371 UBC 0 : INFO => Level::Info,
372 0 : NOTICE => Level::Notice,
373 0 : WARNING => Level::Warning,
374 0 : ERROR => Level::Error,
375 0 : FATAL => Level::Fatal,
376 0 : PANIC => Level::Panic,
377 0 : WPEVENT => Level::WPEvent,
378 0 : _ => panic!("unknown log level {}", elevel),
379 : }
380 CBC 7 : }
381 : }
382 :
383 1 : pub(crate) fn create_api() -> walproposer_api {
384 1 : walproposer_api {
385 1 : get_shmem_state: Some(get_shmem_state),
386 1 : start_streaming: Some(start_streaming),
387 1 : get_flush_rec_ptr: Some(get_flush_rec_ptr),
388 1 : get_current_timestamp: Some(get_current_timestamp),
389 1 : conn_error_message: Some(conn_error_message),
390 1 : conn_status: Some(conn_status),
391 1 : conn_connect_start: Some(conn_connect_start),
392 1 : conn_connect_poll: Some(conn_connect_poll),
393 1 : conn_send_query: Some(conn_send_query),
394 1 : conn_get_query_result: Some(conn_get_query_result),
395 1 : conn_flush: Some(conn_flush),
396 1 : conn_finish: Some(conn_finish),
397 1 : conn_async_read: Some(conn_async_read),
398 1 : conn_async_write: Some(conn_async_write),
399 1 : conn_blocking_write: Some(conn_blocking_write),
400 1 : recovery_download: Some(recovery_download),
401 1 : wal_reader_allocate: Some(wal_reader_allocate),
402 1 : wal_read: Some(wal_read),
403 1 : wal_reader_events: Some(wal_reader_events),
404 1 : init_event_set: Some(init_event_set),
405 1 : update_event_set: Some(update_event_set),
406 1 : active_state_update_event_set: Some(active_state_update_event_set),
407 1 : add_safekeeper_event_set: Some(add_safekeeper_event_set),
408 1 : rm_safekeeper_event_set: Some(rm_safekeeper_event_set),
409 1 : wait_event_set: Some(wait_event_set),
410 1 : strong_random: Some(strong_random),
411 1 : get_redo_start_lsn: Some(get_redo_start_lsn),
412 1 : finish_sync_safekeepers: Some(finish_sync_safekeepers),
413 1 : process_safekeeper_feedback: Some(process_safekeeper_feedback),
414 1 : log_internal: Some(log_internal),
415 1 : }
416 1 : }
417 :
418 : impl std::fmt::Display for Level {
419 7 : fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
420 7 : write!(f, "{:?}", self)
421 7 : }
422 : }
423 :
424 : /// Take ownership of `Vec<u8>` from StringInfoData.
425 : #[allow(clippy::unnecessary_cast)]
426 3 : pub(crate) fn take_vec_u8(pg: &mut StringInfoData) -> Option<Vec<u8>> {
427 3 : if pg.data.is_null() {
428 1 : return None;
429 2 : }
430 2 :
431 2 : let ptr = pg.data as *mut u8;
432 2 : let length = pg.len as usize;
433 2 : let capacity = pg.maxlen as usize;
434 2 :
435 2 : pg.data = std::ptr::null_mut();
436 2 : pg.len = 0;
437 2 : pg.maxlen = 0;
438 2 :
439 2 : unsafe { Some(Vec::from_raw_parts(ptr, length, capacity)) }
440 3 : }
441 :
442 : /// Store `Vec<u8>` in StringInfoData.
443 2 : fn store_vec_u8(pg: &mut StringInfoData, vec: Vec<u8>) -> *mut ::std::os::raw::c_char {
444 2 : let ptr = vec.as_ptr() as *mut ::std::os::raw::c_char;
445 2 : let length = vec.len();
446 2 : let capacity = vec.capacity();
447 :
448 2 : assert!(pg.data.is_null());
449 :
450 2 : pg.data = ptr;
451 2 : pg.len = length as i32;
452 2 : pg.maxlen = capacity as i32;
453 2 :
454 2 : std::mem::forget(vec);
455 2 :
456 2 : ptr
457 2 : }
|