LCOV - code coverage report
Current view: top level - libs/proxy/tokio-postgres2/src - simple_query.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 0.0 % 64 0
Test Date: 2025-07-16 12:29:03 Functions: 0.0 % 9 0

            Line data    Source code
       1              : use std::pin::Pin;
       2              : use std::sync::Arc;
       3              : use std::task::{Context, Poll};
       4              : 
       5              : use fallible_iterator::FallibleIterator;
       6              : use futures_util::{Stream, ready};
       7              : use pin_project_lite::pin_project;
       8              : use postgres_protocol2::message::backend::Message;
       9              : use tracing::debug;
      10              : 
      11              : use crate::client::{InnerClient, Responses};
      12              : use crate::{Error, ReadyForQueryStatus, SimpleQueryMessage, SimpleQueryRow};
      13              : 
      14              : /// Information about a column of a single query row.
      15              : #[derive(Debug)]
      16              : pub struct SimpleColumn {
      17              :     name: String,
      18              : }
      19              : 
      20              : impl SimpleColumn {
      21            0 :     pub(crate) fn new(name: String) -> SimpleColumn {
      22            0 :         SimpleColumn { name }
      23            0 :     }
      24              : 
      25              :     /// Returns the name of the column.
      26            0 :     pub fn name(&self) -> &str {
      27            0 :         &self.name
      28            0 :     }
      29              : }
      30              : 
      31            0 : pub async fn simple_query<'a>(
      32            0 :     client: &'a mut InnerClient,
      33            0 :     query: &str,
      34            0 : ) -> Result<SimpleQueryStream<'a>, Error> {
      35            0 :     debug!("executing simple query: {}", query);
      36              : 
      37            0 :     let responses = client.send_simple_query(query)?;
      38              : 
      39            0 :     Ok(SimpleQueryStream {
      40            0 :         responses,
      41            0 :         columns: None,
      42            0 :         status: ReadyForQueryStatus::Unknown,
      43            0 :     })
      44            0 : }
      45              : 
      46            0 : pub async fn batch_execute(
      47            0 :     client: &mut InnerClient,
      48            0 :     query: &str,
      49            0 : ) -> Result<ReadyForQueryStatus, Error> {
      50            0 :     debug!("executing statement batch: {}", query);
      51              : 
      52            0 :     let responses = client.send_simple_query(query)?;
      53              : 
      54              :     loop {
      55            0 :         match responses.next().await? {
      56            0 :             Message::ReadyForQuery(status) => return Ok(status.into()),
      57              :             Message::CommandComplete(_)
      58              :             | Message::EmptyQueryResponse
      59              :             | Message::RowDescription(_)
      60            0 :             | Message::DataRow(_) => {}
      61            0 :             _ => return Err(Error::unexpected_message()),
      62              :         }
      63              :     }
      64            0 : }
      65              : 
      66              : pin_project! {
      67              :     /// A stream of simple query results.
      68              :     pub struct SimpleQueryStream<'a> {
      69              :         responses: &'a mut Responses,
      70              :         columns: Option<Arc<[SimpleColumn]>>,
      71              :         status: ReadyForQueryStatus,
      72              :     }
      73              : }
      74              : 
      75              : impl SimpleQueryStream<'_> {
      76              :     /// Returns if the connection is ready for querying, with the status of the connection.
      77              :     ///
      78              :     /// This might be available only after the stream has been exhausted.
      79            0 :     pub fn ready_status(&self) -> ReadyForQueryStatus {
      80            0 :         self.status
      81            0 :     }
      82              : }
      83              : 
      84              : impl Stream for SimpleQueryStream<'_> {
      85              :     type Item = Result<SimpleQueryMessage, Error>;
      86              : 
      87            0 :     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
      88            0 :         let this = self.project();
      89              :         loop {
      90            0 :             match ready!(this.responses.poll_next(cx)?) {
      91            0 :                 Message::CommandComplete(body) => {
      92            0 :                     let rows = body
      93            0 :                         .tag()
      94            0 :                         .map_err(Error::parse)?
      95            0 :                         .rsplit(' ')
      96            0 :                         .next()
      97            0 :                         .unwrap()
      98            0 :                         .parse()
      99            0 :                         .unwrap_or(0);
     100            0 :                     return Poll::Ready(Some(Ok(SimpleQueryMessage::CommandComplete(rows))));
     101              :                 }
     102              :                 Message::EmptyQueryResponse => {
     103            0 :                     return Poll::Ready(Some(Ok(SimpleQueryMessage::CommandComplete(0))));
     104              :                 }
     105            0 :                 Message::RowDescription(body) => {
     106            0 :                     let columns = body
     107            0 :                         .fields()
     108            0 :                         .map(|f| Ok(SimpleColumn::new(f.name().to_string())))
     109            0 :                         .collect::<Vec<_>>()
     110            0 :                         .map_err(Error::parse)?
     111            0 :                         .into();
     112              : 
     113            0 :                     *this.columns = Some(columns);
     114              :                 }
     115            0 :                 Message::DataRow(body) => {
     116            0 :                     let row = match &this.columns {
     117            0 :                         Some(columns) => SimpleQueryRow::new(columns.clone(), body)?,
     118            0 :                         None => return Poll::Ready(Some(Err(Error::unexpected_message()))),
     119              :                     };
     120            0 :                     return Poll::Ready(Some(Ok(SimpleQueryMessage::Row(row))));
     121              :                 }
     122            0 :                 Message::ReadyForQuery(s) => {
     123            0 :                     *this.status = s.into();
     124            0 :                     return Poll::Ready(None);
     125              :                 }
     126            0 :                 _ => return Poll::Ready(Some(Err(Error::unexpected_message()))),
     127              :             }
     128              :         }
     129            0 :     }
     130              : }
        

Generated by: LCOV version 2.1-beta