LCOV - code coverage report
Current view: top level - pgxn/neon/communicator/src/worker_process - logging.rs (source / functions) Coverage Total Hit
Test: 4be46b1c0003aa3bbac9ade362c676b419df4c20.info Lines: 0.0 % 109 0
Test Date: 2025-07-22 17:50:06 Functions: 0.0 % 10 0

            Line data    Source code
       1              : //! Glue code to hook up Rust logging with the `tracing` crate to the PostgreSQL log
       2              : //!
       3              : //! In the Rust threads, the log messages are written to an mpsc channel, and the Postgres
       4              : //! process latch is raised. That wakes up the loop in the main thread, see
       5              : //! `communicator_new_bgworker_main()`. It reads the message from the channel and
       6              : //! ereport()s it. This ensures that only one thread, the main thread, calls the
       7              : //! PostgreSQL logging routines at any time.
       8              : 
       9              : use std::ffi::c_char;
      10              : use std::sync::atomic::{AtomicU64, Ordering};
      11              : use std::sync::mpsc::sync_channel;
      12              : use std::sync::mpsc::{Receiver, SyncSender};
      13              : use std::sync::mpsc::{TryRecvError, TrySendError};
      14              : 
      15              : use tracing::info;
      16              : use tracing::{Event, Level, Metadata, Subscriber};
      17              : use tracing_subscriber::filter::LevelFilter;
      18              : use tracing_subscriber::fmt::format::Writer;
      19              : use tracing_subscriber::fmt::{FmtContext, FormatEvent, FormatFields, FormattedFields, MakeWriter};
      20              : use tracing_subscriber::registry::LookupSpan;
      21              : 
      22              : use crate::worker_process::callbacks::callback_set_my_latch;
      23              : 
      24              : /// This handle is passed to the C code, and used by [`communicator_worker_poll_logging`]
      25              : pub struct LoggingReceiver {
      26              :     receiver: Receiver<FormattedEventWithMeta>,
      27              : }
      28              : 
      29              : /// This is passed to `tracing`
      30              : struct LoggingSender {
      31              :     sender: SyncSender<FormattedEventWithMeta>,
      32              : }
      33              : 
      34              : static DROPPED_EVENT_COUNT: AtomicU64 = AtomicU64::new(0);
      35              : 
      36              : /// Called once, at worker process startup. The returned LoggingState is passed back
      37              : /// in the subsequent calls to `pump_logging`. It is opaque to the C code.
      38              : #[unsafe(no_mangle)]
      39            0 : pub extern "C" fn communicator_worker_configure_logging() -> Box<LoggingReceiver> {
      40            0 :     let (sender, receiver) = sync_channel(1000);
      41              : 
      42            0 :     let receiver = LoggingReceiver { receiver };
      43            0 :     let sender = LoggingSender { sender };
      44              : 
      45              :     use tracing_subscriber::prelude::*;
      46            0 :     let r = tracing_subscriber::registry();
      47              : 
      48            0 :     let r = r.with(
      49            0 :         tracing_subscriber::fmt::layer()
      50            0 :             .with_ansi(false)
      51            0 :             .event_format(SimpleFormatter)
      52            0 :             .with_writer(sender)
      53              :             // TODO: derive this from log_min_messages? Currently the code in
      54              :             // communicator_process.c forces log_min_messages='INFO'.
      55            0 :             .with_filter(LevelFilter::from_level(Level::INFO)),
      56              :     );
      57            0 :     r.init();
      58              : 
      59            0 :     info!("communicator process logging started");
      60              : 
      61            0 :     Box::new(receiver)
      62            0 : }
      63              : 
      64              : /// Read one message from the logging queue. This is essentially a wrapper to Receiver,
      65              : /// with a C-friendly signature.
      66              : ///
      67              : /// The message is copied into *errbuf, which is a caller-supplied buffer of size
      68              : /// `errbuf_len`.  If the message doesn't fit in the buffer, it is truncated. It is always
      69              : /// NULL-terminated.
      70              : ///
      71              : /// The error level is returned *elevel_p. It's one of the PostgreSQL error levels, see
      72              : /// elog.h
      73              : ///
      74              : /// If there was a message, *dropped_event_count_p is also updated with a counter of how
      75              : /// many log messages in total has been dropped. By comparing that with the value from
      76              : /// previous call, you can tell how many were dropped since last call.
      77              : ///
      78              : /// Returns:
      79              : ///
      80              : ///   0 if there were no messages
      81              : ///   1 if there was a message. The message and its level are returned in
      82              : ///     *errbuf and *elevel_p. *dropped_event_count_p is also updated.
      83              : ///  -1 on error, i.e the other end of the queue was disconnected
      84              : #[unsafe(no_mangle)]
      85            0 : pub extern "C" fn communicator_worker_poll_logging(
      86            0 :     state: &mut LoggingReceiver,
      87            0 :     errbuf: *mut c_char,
      88            0 :     errbuf_len: u32,
      89            0 :     elevel_p: &mut i32,
      90            0 :     dropped_event_count_p: &mut u64,
      91            0 : ) -> i32 {
      92            0 :     let msg = match state.receiver.try_recv() {
      93            0 :         Err(TryRecvError::Empty) => return 0,
      94            0 :         Err(TryRecvError::Disconnected) => return -1,
      95            0 :         Ok(msg) => msg,
      96              :     };
      97              : 
      98            0 :     let src: &[u8] = &msg.message;
      99            0 :     let dst: *mut u8 = errbuf.cast();
     100            0 :     let len = std::cmp::min(src.len(), errbuf_len as usize - 1);
     101            0 :     unsafe {
     102            0 :         std::ptr::copy_nonoverlapping(src.as_ptr(), dst, len);
     103            0 :         *(dst.add(len)) = b'\0'; // NULL terminator
     104            0 :     }
     105              : 
     106              :     // Map the tracing Level to PostgreSQL elevel.
     107              :     //
     108              :     // XXX: These levels are copied from PostgreSQL's elog.h. Introduce another enum to
     109              :     // hide these?
     110            0 :     *elevel_p = match msg.level {
     111            0 :         Level::TRACE => 10, // DEBUG5
     112            0 :         Level::DEBUG => 14, // DEBUG1
     113            0 :         Level::INFO => 17,  // INFO
     114            0 :         Level::WARN => 19,  // WARNING
     115            0 :         Level::ERROR => 21, // ERROR
     116              :     };
     117              : 
     118            0 :     *dropped_event_count_p = DROPPED_EVENT_COUNT.load(Ordering::Relaxed);
     119              : 
     120            0 :     1
     121            0 : }
     122              : 
     123              : //---- The following functions can be called from any thread ----
     124              : 
     125              : #[derive(Clone)]
     126              : struct FormattedEventWithMeta {
     127              :     message: Vec<u8>,
     128              :     level: tracing::Level,
     129              : }
     130              : 
     131              : impl Default for FormattedEventWithMeta {
     132            0 :     fn default() -> Self {
     133            0 :         FormattedEventWithMeta {
     134            0 :             message: Vec::new(),
     135            0 :             level: tracing::Level::DEBUG,
     136            0 :         }
     137            0 :     }
     138              : }
     139              : 
     140              : struct EventBuilder<'a> {
     141              :     event: FormattedEventWithMeta,
     142              : 
     143              :     sender: &'a LoggingSender,
     144              : }
     145              : 
     146              : impl std::io::Write for EventBuilder<'_> {
     147            0 :     fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
     148            0 :         self.event.message.write(buf)
     149            0 :     }
     150            0 :     fn flush(&mut self) -> std::io::Result<()> {
     151            0 :         self.sender.send_event(self.event.clone());
     152            0 :         Ok(())
     153            0 :     }
     154              : }
     155              : 
     156              : impl Drop for EventBuilder<'_> {
     157            0 :     fn drop(&mut self) {
     158            0 :         let sender = self.sender;
     159            0 :         let event = std::mem::take(&mut self.event);
     160              : 
     161            0 :         sender.send_event(event);
     162            0 :     }
     163              : }
     164              : 
     165              : impl<'a> MakeWriter<'a> for LoggingSender {
     166              :     type Writer = EventBuilder<'a>;
     167              : 
     168            0 :     fn make_writer(&'a self) -> Self::Writer {
     169            0 :         panic!("not expected to be called when make_writer_for is implemented");
     170              :     }
     171              : 
     172            0 :     fn make_writer_for(&'a self, meta: &Metadata<'_>) -> Self::Writer {
     173            0 :         EventBuilder {
     174            0 :             event: FormattedEventWithMeta {
     175            0 :                 message: Vec::new(),
     176            0 :                 level: *meta.level(),
     177            0 :             },
     178            0 :             sender: self,
     179            0 :         }
     180            0 :     }
     181              : }
     182              : 
     183              : impl LoggingSender {
     184            0 :     fn send_event(&self, e: FormattedEventWithMeta) {
     185            0 :         match self.sender.try_send(e) {
     186            0 :             Ok(()) => {
     187            0 :                 // notify the main thread
     188            0 :                 callback_set_my_latch();
     189            0 :             }
     190            0 :             Err(TrySendError::Disconnected(_)) => {}
     191            0 :             Err(TrySendError::Full(_)) => {
     192            0 :                 // The queue is full, cannot send any more. To avoid blocking the tokio
     193            0 :                 // thread, simply drop the message. Better to lose some logs than get
     194            0 :                 // stuck if there's a problem with the logging.
     195            0 :                 //
     196            0 :                 // Record the fact that was a message was dropped by incrementing the
     197            0 :                 // counter.
     198            0 :                 DROPPED_EVENT_COUNT.fetch_add(1, Ordering::Relaxed);
     199            0 :             }
     200              :         }
     201            0 :     }
     202              : }
     203              : 
     204              : /// Simple formatter implementation for tracing_subscriber, which prints the log spans and
     205              : /// message part like the default formatter, but no timestamp or error level. The error
     206              : /// level is captured separately by `FormattedEventWithMeta', and when the error is
     207              : /// printed by the main thread, with PostgreSQL ereport(), it gets a timestamp at that
     208              : /// point. (The timestamp printed will therefore lag behind the timestamp on the event
     209              : /// here, if the main thread doesn't process the log message promptly)
     210              : struct SimpleFormatter;
     211              : 
     212              : impl<S, N> FormatEvent<S, N> for SimpleFormatter
     213              : where
     214              :     S: Subscriber + for<'a> LookupSpan<'a>,
     215              :     N: for<'a> FormatFields<'a> + 'static,
     216              : {
     217            0 :     fn format_event(
     218            0 :         &self,
     219            0 :         ctx: &FmtContext<'_, S, N>,
     220            0 :         mut writer: Writer<'_>,
     221            0 :         event: &Event<'_>,
     222            0 :     ) -> std::fmt::Result {
     223              :         // Format all the spans in the event's span context.
     224            0 :         if let Some(scope) = ctx.event_scope() {
     225            0 :             for span in scope.from_root() {
     226            0 :                 write!(writer, "{}", span.name())?;
     227              : 
     228              :                 // `FormattedFields` is a formatted representation of the span's fields,
     229              :                 // which is stored in its extensions by the `fmt` layer's `new_span`
     230              :                 // method. The fields will have been formatted by the same field formatter
     231              :                 // that's provided to the event formatter in the `FmtContext`.
     232            0 :                 let ext = span.extensions();
     233            0 :                 let fields = &ext
     234            0 :                     .get::<FormattedFields<N>>()
     235            0 :                     .expect("will never be `None`");
     236              : 
     237              :                 // Skip formatting the fields if the span had no fields.
     238            0 :                 if !fields.is_empty() {
     239            0 :                     write!(writer, "{{{fields}}}")?;
     240            0 :                 }
     241            0 :                 write!(writer, ": ")?;
     242              :             }
     243            0 :         }
     244              : 
     245              :         // Write fields on the event
     246            0 :         ctx.field_format().format_fields(writer.by_ref(), event)?;
     247              : 
     248            0 :         Ok(())
     249            0 :     }
     250              : }
        

Generated by: LCOV version 2.1-beta