LCOV - code coverage report
Current view: top level - libs/proxy/tokio-postgres2/src - simple_query.rs (source / functions) Coverage Total Hit
Test: 5fe7fa8d483b39476409aee736d6d5e32728bfac.info Lines: 0.0 % 71 0
Test Date: 2025-03-12 16:10:49 Functions: 0.0 % 11 0

            Line data    Source code
       1              : use std::marker::PhantomPinned;
       2              : use std::pin::Pin;
       3              : use std::sync::Arc;
       4              : use std::task::{Context, Poll};
       5              : 
       6              : use bytes::Bytes;
       7              : use fallible_iterator::FallibleIterator;
       8              : use futures_util::{Stream, ready};
       9              : use log::debug;
      10              : use pin_project_lite::pin_project;
      11              : use postgres_protocol2::message::backend::Message;
      12              : use postgres_protocol2::message::frontend;
      13              : 
      14              : use crate::client::{InnerClient, Responses};
      15              : use crate::codec::FrontendMessage;
      16              : use crate::connection::RequestMessages;
      17              : use crate::{Error, ReadyForQueryStatus, SimpleQueryMessage, SimpleQueryRow};
      18              : 
      19              : /// Information about a column of a single query row.
      20              : #[derive(Debug)]
      21              : pub struct SimpleColumn {
      22              :     name: String,
      23              : }
      24              : 
      25              : impl SimpleColumn {
      26            0 :     pub(crate) fn new(name: String) -> SimpleColumn {
      27            0 :         SimpleColumn { name }
      28            0 :     }
      29              : 
      30              :     /// Returns the name of the column.
      31            0 :     pub fn name(&self) -> &str {
      32            0 :         &self.name
      33            0 :     }
      34              : }
      35              : 
      36            0 : pub async fn simple_query(client: &InnerClient, query: &str) -> Result<SimpleQueryStream, Error> {
      37            0 :     debug!("executing simple query: {}", query);
      38              : 
      39            0 :     let buf = encode(client, query)?;
      40            0 :     let responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
      41              : 
      42            0 :     Ok(SimpleQueryStream {
      43            0 :         responses,
      44            0 :         columns: None,
      45            0 :         status: ReadyForQueryStatus::Unknown,
      46            0 :         _p: PhantomPinned,
      47            0 :     })
      48            0 : }
      49              : 
      50            0 : pub async fn batch_execute(
      51            0 :     client: &InnerClient,
      52            0 :     query: &str,
      53            0 : ) -> Result<ReadyForQueryStatus, Error> {
      54            0 :     debug!("executing statement batch: {}", query);
      55              : 
      56            0 :     let buf = encode(client, query)?;
      57            0 :     let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
      58              : 
      59              :     loop {
      60            0 :         match responses.next().await? {
      61            0 :             Message::ReadyForQuery(status) => return Ok(status.into()),
      62              :             Message::CommandComplete(_)
      63              :             | Message::EmptyQueryResponse
      64              :             | Message::RowDescription(_)
      65            0 :             | Message::DataRow(_) => {}
      66            0 :             _ => return Err(Error::unexpected_message()),
      67              :         }
      68              :     }
      69            0 : }
      70              : 
      71            0 : pub(crate) fn encode(client: &InnerClient, query: &str) -> Result<Bytes, Error> {
      72            0 :     client.with_buf(|buf| {
      73            0 :         frontend::query(query, buf).map_err(Error::encode)?;
      74            0 :         Ok(buf.split().freeze())
      75            0 :     })
      76            0 : }
      77              : 
      78              : pin_project! {
      79              :     /// A stream of simple query results.
      80              :     pub struct SimpleQueryStream {
      81              :         responses: Responses,
      82              :         columns: Option<Arc<[SimpleColumn]>>,
      83              :         status: ReadyForQueryStatus,
      84              :         #[pin]
      85              :         _p: PhantomPinned,
      86              :     }
      87              : }
      88              : 
      89              : impl SimpleQueryStream {
      90              :     /// Returns if the connection is ready for querying, with the status of the connection.
      91              :     ///
      92              :     /// This might be available only after the stream has been exhausted.
      93            0 :     pub fn ready_status(&self) -> ReadyForQueryStatus {
      94            0 :         self.status
      95            0 :     }
      96              : }
      97              : 
      98              : impl Stream for SimpleQueryStream {
      99              :     type Item = Result<SimpleQueryMessage, Error>;
     100              : 
     101            0 :     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
     102            0 :         let this = self.project();
     103              :         loop {
     104            0 :             match ready!(this.responses.poll_next(cx)?) {
     105            0 :                 Message::CommandComplete(body) => {
     106            0 :                     let rows = body
     107            0 :                         .tag()
     108            0 :                         .map_err(Error::parse)?
     109            0 :                         .rsplit(' ')
     110            0 :                         .next()
     111            0 :                         .unwrap()
     112            0 :                         .parse()
     113            0 :                         .unwrap_or(0);
     114            0 :                     return Poll::Ready(Some(Ok(SimpleQueryMessage::CommandComplete(rows))));
     115              :                 }
     116              :                 Message::EmptyQueryResponse => {
     117            0 :                     return Poll::Ready(Some(Ok(SimpleQueryMessage::CommandComplete(0))));
     118              :                 }
     119            0 :                 Message::RowDescription(body) => {
     120            0 :                     let columns = body
     121            0 :                         .fields()
     122            0 :                         .map(|f| Ok(SimpleColumn::new(f.name().to_string())))
     123            0 :                         .collect::<Vec<_>>()
     124            0 :                         .map_err(Error::parse)?
     125            0 :                         .into();
     126            0 : 
     127            0 :                     *this.columns = Some(columns);
     128              :                 }
     129            0 :                 Message::DataRow(body) => {
     130            0 :                     let row = match &this.columns {
     131            0 :                         Some(columns) => SimpleQueryRow::new(columns.clone(), body)?,
     132            0 :                         None => return Poll::Ready(Some(Err(Error::unexpected_message()))),
     133              :                     };
     134            0 :                     return Poll::Ready(Some(Ok(SimpleQueryMessage::Row(row))));
     135              :                 }
     136            0 :                 Message::ReadyForQuery(s) => {
     137            0 :                     *this.status = s.into();
     138            0 :                     return Poll::Ready(None);
     139              :                 }
     140            0 :                 _ => return Poll::Ready(Some(Err(Error::unexpected_message()))),
     141              :             }
     142              :         }
     143            0 :     }
     144              : }
        

Generated by: LCOV version 2.1-beta