Line data Source code
1 : //! Glue code to hook up Rust logging with the `tracing` crate to the PostgreSQL log
2 : //!
3 : //! In the Rust threads, the log messages are written to a mpsc Channel, and the Postgres
4 : //! process latch is raised. That wakes up the loop in the main thread, see
5 : //! `communicator_new_bgworker_main()`. It reads the message from the channel and
6 : //! ereport()s it. This ensures that only one thread, the main thread, calls the
7 : //! PostgreSQL logging routines at any time.
8 :
9 : use std::ffi::c_char;
10 : use std::sync::atomic::{AtomicU64, Ordering};
11 : use std::sync::mpsc::sync_channel;
12 : use std::sync::mpsc::{Receiver, SyncSender};
13 : use std::sync::mpsc::{TryRecvError, TrySendError};
14 :
15 : use tracing::info;
16 : use tracing::{Event, Level, Metadata, Subscriber};
17 : use tracing_subscriber::filter::LevelFilter;
18 : use tracing_subscriber::fmt::format::Writer;
19 : use tracing_subscriber::fmt::{FmtContext, FormatEvent, FormatFields, FormattedFields, MakeWriter};
20 : use tracing_subscriber::registry::LookupSpan;
21 :
22 : use crate::worker_process::callbacks::callback_set_my_latch;
23 :
24 : /// This handle is passed to the C code, and used by [`communicator_worker_poll_logging`]
25 : pub struct LoggingReceiver {
26 : receiver: Receiver<FormattedEventWithMeta>,
27 : }
28 :
29 : /// This is passed to `tracing`
30 : struct LoggingSender {
31 : sender: SyncSender<FormattedEventWithMeta>,
32 : }
33 :
/// Running total of log events that were discarded because the queue was full.
static DROPPED_EVENT_COUNT: AtomicU64 = AtomicU64::new(0);
35 :
36 : /// Called once, at worker process startup. The returned LoggingState is passed back
37 : /// in the subsequent calls to `pump_logging`. It is opaque to the C code.
38 : #[unsafe(no_mangle)]
39 0 : pub extern "C" fn communicator_worker_configure_logging() -> Box<LoggingReceiver> {
40 0 : let (sender, receiver) = sync_channel(1000);
41 :
42 0 : let receiver = LoggingReceiver { receiver };
43 0 : let sender = LoggingSender { sender };
44 :
45 : use tracing_subscriber::prelude::*;
46 0 : let r = tracing_subscriber::registry();
47 :
48 0 : let r = r.with(
49 0 : tracing_subscriber::fmt::layer()
50 0 : .with_ansi(false)
51 0 : .event_format(SimpleFormatter)
52 0 : .with_writer(sender)
53 : // TODO: derive this from log_min_messages? Currently the code in
54 : // communicator_process.c forces log_min_messages='INFO'.
55 0 : .with_filter(LevelFilter::from_level(Level::INFO)),
56 : );
57 0 : r.init();
58 :
59 0 : info!("communicator process logging started");
60 :
61 0 : Box::new(receiver)
62 0 : }
63 :
64 : /// Read one message from the logging queue. This is essentially a wrapper to Receiver,
65 : /// with a C-friendly signature.
66 : ///
67 : /// The message is copied into *errbuf, which is a caller-supplied buffer of size
68 : /// `errbuf_len`. If the message doesn't fit in the buffer, it is truncated. It is always
69 : /// NULL-terminated.
70 : ///
71 : /// The error level is returned *elevel_p. It's one of the PostgreSQL error levels, see
72 : /// elog.h
73 : ///
74 : /// If there was a message, *dropped_event_count_p is also updated with a counter of how
75 : /// many log messages in total has been dropped. By comparing that with the value from
76 : /// previous call, you can tell how many were dropped since last call.
77 : ///
78 : /// Returns:
79 : ///
80 : /// 0 if there were no messages
81 : /// 1 if there was a message. The message and its level are returned in
82 : /// *errbuf and *elevel_p. *dropped_event_count_p is also updated.
83 : /// -1 on error, i.e the other end of the queue was disconnected
84 : #[unsafe(no_mangle)]
85 0 : pub extern "C" fn communicator_worker_poll_logging(
86 0 : state: &mut LoggingReceiver,
87 0 : errbuf: *mut c_char,
88 0 : errbuf_len: u32,
89 0 : elevel_p: &mut i32,
90 0 : dropped_event_count_p: &mut u64,
91 0 : ) -> i32 {
92 0 : let msg = match state.receiver.try_recv() {
93 0 : Err(TryRecvError::Empty) => return 0,
94 0 : Err(TryRecvError::Disconnected) => return -1,
95 0 : Ok(msg) => msg,
96 : };
97 :
98 0 : let src: &[u8] = &msg.message;
99 0 : let dst: *mut u8 = errbuf.cast();
100 0 : let len = std::cmp::min(src.len(), errbuf_len as usize - 1);
101 0 : unsafe {
102 0 : std::ptr::copy_nonoverlapping(src.as_ptr(), dst, len);
103 0 : *(dst.add(len)) = b'\0'; // NULL terminator
104 0 : }
105 :
106 : // Map the tracing Level to PostgreSQL elevel.
107 : //
108 : // XXX: These levels are copied from PostgreSQL's elog.h. Introduce another enum to
109 : // hide these?
110 0 : *elevel_p = match msg.level {
111 0 : Level::TRACE => 10, // DEBUG5
112 0 : Level::DEBUG => 14, // DEBUG1
113 0 : Level::INFO => 17, // INFO
114 0 : Level::WARN => 19, // WARNING
115 0 : Level::ERROR => 21, // ERROR
116 : };
117 :
118 0 : *dropped_event_count_p = DROPPED_EVENT_COUNT.load(Ordering::Relaxed);
119 :
120 0 : 1
121 0 : }
122 :
123 : //---- The following functions can be called from any thread ----
124 :
125 : #[derive(Clone)]
126 : struct FormattedEventWithMeta {
127 : message: Vec<u8>,
128 : level: tracing::Level,
129 : }
130 :
131 : impl Default for FormattedEventWithMeta {
132 0 : fn default() -> Self {
133 0 : FormattedEventWithMeta {
134 0 : message: Vec::new(),
135 0 : level: tracing::Level::DEBUG,
136 0 : }
137 0 : }
138 : }
139 :
140 : struct EventBuilder<'a> {
141 : event: FormattedEventWithMeta,
142 :
143 : sender: &'a LoggingSender,
144 : }
145 :
146 : impl std::io::Write for EventBuilder<'_> {
147 0 : fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
148 0 : self.event.message.write(buf)
149 0 : }
150 0 : fn flush(&mut self) -> std::io::Result<()> {
151 0 : self.sender.send_event(self.event.clone());
152 0 : Ok(())
153 0 : }
154 : }
155 :
156 : impl Drop for EventBuilder<'_> {
157 0 : fn drop(&mut self) {
158 0 : let sender = self.sender;
159 0 : let event = std::mem::take(&mut self.event);
160 :
161 0 : sender.send_event(event);
162 0 : }
163 : }
164 :
165 : impl<'a> MakeWriter<'a> for LoggingSender {
166 : type Writer = EventBuilder<'a>;
167 :
168 0 : fn make_writer(&'a self) -> Self::Writer {
169 0 : panic!("not expected to be called when make_writer_for is implemented");
170 : }
171 :
172 0 : fn make_writer_for(&'a self, meta: &Metadata<'_>) -> Self::Writer {
173 0 : EventBuilder {
174 0 : event: FormattedEventWithMeta {
175 0 : message: Vec::new(),
176 0 : level: *meta.level(),
177 0 : },
178 0 : sender: self,
179 0 : }
180 0 : }
181 : }
182 :
183 : impl LoggingSender {
184 0 : fn send_event(&self, e: FormattedEventWithMeta) {
185 0 : match self.sender.try_send(e) {
186 0 : Ok(()) => {
187 0 : // notify the main thread
188 0 : callback_set_my_latch();
189 0 : }
190 0 : Err(TrySendError::Disconnected(_)) => {}
191 0 : Err(TrySendError::Full(_)) => {
192 0 : // The queue is full, cannot send any more. To avoid blocking the tokio
193 0 : // thread, simply drop the message. Better to lose some logs than get
194 0 : // stuck if there's a problem with the logging.
195 0 : //
196 0 : // Record the fact that was a message was dropped by incrementing the
197 0 : // counter.
198 0 : DROPPED_EVENT_COUNT.fetch_add(1, Ordering::Relaxed);
199 0 : }
200 : }
201 0 : }
202 : }
203 :
/// Simple formatter implementation for tracing_subscriber, which prints the log spans
/// and the message part like the default formatter, but no timestamp or error level.
///
/// The error level is captured separately in `FormattedEventWithMeta`. The timestamp is
/// added when the main thread prints the message with PostgreSQL's ereport(). (The
/// timestamp printed will therefore lag behind the time of the event here, if the main
/// thread doesn't process the log message promptly.)
struct SimpleFormatter;
211 :
212 : impl<S, N> FormatEvent<S, N> for SimpleFormatter
213 : where
214 : S: Subscriber + for<'a> LookupSpan<'a>,
215 : N: for<'a> FormatFields<'a> + 'static,
216 : {
217 0 : fn format_event(
218 0 : &self,
219 0 : ctx: &FmtContext<'_, S, N>,
220 0 : mut writer: Writer<'_>,
221 0 : event: &Event<'_>,
222 0 : ) -> std::fmt::Result {
223 : // Format all the spans in the event's span context.
224 0 : if let Some(scope) = ctx.event_scope() {
225 0 : for span in scope.from_root() {
226 0 : write!(writer, "{}", span.name())?;
227 :
228 : // `FormattedFields` is a formatted representation of the span's fields,
229 : // which is stored in its extensions by the `fmt` layer's `new_span`
230 : // method. The fields will have been formatted by the same field formatter
231 : // that's provided to the event formatter in the `FmtContext`.
232 0 : let ext = span.extensions();
233 0 : let fields = &ext
234 0 : .get::<FormattedFields<N>>()
235 0 : .expect("will never be `None`");
236 :
237 : // Skip formatting the fields if the span had no fields.
238 0 : if !fields.is_empty() {
239 0 : write!(writer, "{{{fields}}}")?;
240 0 : }
241 0 : write!(writer, ": ")?;
242 : }
243 0 : }
244 :
245 : // Write fields on the event
246 0 : ctx.field_format().format_fields(writer.by_ref(), event)?;
247 :
248 0 : Ok(())
249 0 : }
250 : }
|