LCOV - code coverage report
Current view: top level - libs/proxy/tokio-postgres2/src - codec.rs (source / functions) Coverage Total Hit
Test: 1bc5ae03d954f0bf54b121f37b7d3bce4c35c3a4.info Lines: 0.0 % 40 0
Test Date: 2025-07-24 14:52:13 Functions: 0.0 % 4 0

            Line data    Source code
       1              : use std::io;
       2              : 
       3              : use bytes::BytesMut;
       4              : use fallible_iterator::FallibleIterator;
       5              : use postgres_protocol2::message::backend;
       6              : use tokio::sync::mpsc::UnboundedSender;
       7              : use tokio_util::codec::{Decoder, Encoder};
       8              : 
       9              : pub enum FrontendMessage {
                            // Frontend-side item with two shapes: a raw byte buffer or a
                            // `RecordNotices` request.
                            // NOTE(review): how each variant is consumed is not visible in this
                            // file; confirm against the connection code.

                            /// Holds a `BytesMut` buffer.
                            /// NOTE(review): presumably already-encoded protocol bytes; confirm at call sites.
      10              :     Raw(BytesMut),
                            /// Holds a `RecordNotices` value (a channel sender plus a limit).
      11              :     RecordNotices(RecordNotices),
      12              : }
      13              : 
      14              : pub struct RecordNotices {
                            // Payload of `FrontendMessage::RecordNotices`: an unbounded channel
                            // sender of boxed strings paired with a numeric limit.
                            // NOTE(review): the name suggests this configures forwarding of server
                            // notices; the consumer is not visible in this file.

                            /// Sending half of an unbounded tokio mpsc channel carrying `Box<str>` values.
      15              :     pub sender: UnboundedSender<Box<str>>,
                            /// Numeric limit.
                            /// NOTE(review): presumably the maximum number of notices to record; confirm with the consumer.
      16              :     pub limit: usize,
      17              : }
      18              : 
      19              : pub enum BackendMessage {
                            // Item type produced by the `Decoder` impl for `PostgresCodec`.

                            /// A run of one or more complete messages split off the read buffer as one chunk.
      20              :     Normal {
                                /// The still-unparsed bytes of the run; iterate to parse individual messages.
      21              :         messages: BackendMessages,
                                /// True when the run ends with a message tagged `READY_FOR_QUERY_TAG`.
      22              :         ready: bool,
      23              :     },
                            /// A single parsed message whose tag was `NOTICE_RESPONSE_TAG`,
                            /// `NOTIFICATION_RESPONSE_TAG` or `PARAMETER_STATUS_TAG`.
      24              :     Async(backend::Message),
      25              : }
      26              : 
      27              : pub struct BackendMessages(BytesMut); // Buffer of unparsed backend messages; parsed one at a time via `FallibleIterator`.
      28              : 
      29              : impl BackendMessages {
                            /// Creates a `BackendMessages` wrapping a new, empty `BytesMut`.
      30            0 :     pub fn empty() -> BackendMessages {
      31            0 :         BackendMessages(BytesMut::new())
      32            0 :     }
      33              : }
      34              : 
      35              : impl FallibleIterator for BackendMessages {
                            // Iteration yields `backend::Message` values parsed from the wrapped buffer;
                            // failures surface as `io::Error`.
      36              :     type Item = backend::Message;
      37              :     type Error = io::Error;
      38              : 
                            /// Delegates to `backend::Message::parse` on the wrapped buffer and returns
                            /// its result unchanged.
                            ///
                            /// NOTE(review): `parse` presumably consumes the bytes of the message it
                            /// returns and yields `Ok(None)` once no complete message remains; confirm
                            /// in postgres_protocol2.
      39            0 :     fn next(&mut self) -> io::Result<Option<backend::Message>> {
      40            0 :         backend::Message::parse(&mut self.0)
      41            0 :     }
      42              : }
      43              : 
      44              : pub struct PostgresCodec; // Field-less codec type; implements `Encoder<BytesMut>` and `Decoder`.
      45              : 
      46              : impl Encoder<BytesMut> for PostgresCodec {
                            // Encoding performs no serialization: the item is already a byte buffer and
                            // is only moved or copied into the write buffer.
      47              :     type Error = io::Error;
      48              : 
                            /// Places the bytes of `item` into `dst` and always returns `Ok(())`.
                            ///
                            /// If `dst` is empty it is replaced by `item` without copying bytes;
                            /// otherwise the contents of `item` are appended to `dst`.
      49            0 :     fn encode(&mut self, item: BytesMut, dst: &mut BytesMut) -> io::Result<()> {
      50              :         // When it comes to request/response workflows, we usually flush the entire write
      51              :         // buffer in order to wait for the response before we send a new request.
      52              :         // Therefore we can avoid the copy and just replace the buffer.
      53            0 :         if dst.is_empty() {
                                    // The previous (empty) buffer value of `dst` is dropped by this assignment.
      54            0 :             *dst = item;
      55            0 :         } else {
      56            0 :             dst.extend_from_slice(&item);
      57            0 :         }
      58            0 :         Ok(())
      59            0 :     }
      60              : }
      61              : 
      62              : impl Decoder for PostgresCodec {
                            // Decoding groups complete messages into runs and returns notice,
                            // notification and parameter-status messages individually.
      63              :     type Item = BackendMessage;
      64              :     type Error = io::Error;
      65              : 
                            /// Scans `src` for complete messages and returns the next item, if any.
                            ///
                            /// Returns:
                            /// - `Ok(None)` when no complete message is available at the front of `src`.
                            /// - `Ok(Some(BackendMessage::Async(..)))` when the first complete message
                            ///   is a notice, notification or parameter-status message.
                            /// - `Ok(Some(BackendMessage::Normal { .. }))` otherwise: the leading run of
                            ///   complete messages, split off `src`. The run stops after a message
                            ///   tagged `READY_FOR_QUERY_TAG` (then `ready` is true), before the next
                            ///   async-tagged message, or before an incomplete message.
                            ///
                            /// # Errors
                            /// Propagates any error from `backend::Header::parse` or
                            /// `backend::Message::parse`.
                            ///
                            /// # Panics
                            /// Panics if `backend::Message::parse` returns `Ok(None)` for a message the
                            /// length check considered complete.
      66            0 :     fn decode(&mut self, src: &mut BytesMut) -> Result<Option<BackendMessage>, io::Error> {
                                // Byte offset just past the complete messages accepted into the run so far.
      67            0 :         let mut idx = 0;
      68              : 
      69            0 :         let mut ready = false;
                                // The loop also ends when `Header::parse` yields `None`.
                                // NOTE(review): presumably that means too few bytes for a header; confirm.
      70            0 :         while let Some(header) = backend::Header::parse(&src[idx..])? {
                                    // NOTE(review): the `+ 1` presumably accounts for the one-byte tag not
                                    // being counted in the header length; confirm in postgres_protocol2.
      71            0 :             let len = header.len() as usize + 1;
                                    // Stop at a message whose bytes have not fully arrived yet.
      72            0 :             if src[idx..].len() < len {
      73            0 :                 break;
      74            0 :             }
      75              : 
      76            0 :             match header.tag() {
      77              :                 backend::NOTICE_RESPONSE_TAG
      78              :                 | backend::NOTIFICATION_RESPONSE_TAG
      79              :                 | backend::PARAMETER_STATUS_TAG => {
                                            // An async-tagged message is only returned when it is first in the
                                            // buffer; otherwise the run before it is returned and this message
                                            // stays in `src` for a later call.
      80            0 :                     if idx == 0 {
                                                // NOTE(review): `parse` presumably removes the message bytes
                                                // from `src`; the length check above is what justifies `unwrap`.
      81            0 :                         let message = backend::Message::parse(src)?.unwrap();
      82            0 :                         return Ok(Some(BackendMessage::Async(message)));
      83              :                     } else {
      84            0 :                         break;
      85              :                     }
      86              :                 }
      87            0 :                 _ => {}
      88              :             }
      89              : 
                                    // Accept this message into the run.
      90            0 :             idx += len;
      91              : 
                                    // A ready-for-query message is included in the run and ends it.
      92            0 :             if header.tag() == backend::READY_FOR_QUERY_TAG {
      93            0 :                 ready = true;
      94            0 :                 break;
      95            0 :             }
      96              :         }
      97              : 
      98            0 :         if idx == 0 {
      99            0 :             Ok(None)
     100              :         } else {
                                    // Hand the first `idx` bytes to the caller; the remainder stays in `src`.
     101            0 :             Ok(Some(BackendMessage::Normal {
     102            0 :                 messages: BackendMessages(src.split_to(idx)),
     103            0 :                 ready,
     104            0 :             }))
     105              :         }
     106            0 :     }
     107              : }
        

Generated by: LCOV version 2.1-beta