LCOV - code coverage report
Current view: top level - libs/proxy/tokio-postgres2/src - simple_query.rs (source / functions) Coverage Total Hit
Test: 07bee600374ccd486c69370d0972d9035964fe68.info Lines: 0.0 % 71 0
Test Date: 2025-02-20 13:11:02 Functions: 0.0 % 11 0

            Line data    Source code
       1              : use crate::client::{InnerClient, Responses};
       2              : use crate::codec::FrontendMessage;
       3              : use crate::connection::RequestMessages;
       4              : use crate::{Error, ReadyForQueryStatus, SimpleQueryMessage, SimpleQueryRow};
       5              : use bytes::Bytes;
       6              : use fallible_iterator::FallibleIterator;
       7              : use futures_util::{ready, Stream};
       8              : use log::debug;
       9              : use pin_project_lite::pin_project;
      10              : use postgres_protocol2::message::backend::Message;
      11              : use postgres_protocol2::message::frontend;
      12              : use std::marker::PhantomPinned;
      13              : use std::pin::Pin;
      14              : use std::sync::Arc;
      15              : use std::task::{Context, Poll};
      16              : 
      17              : /// Information about a column of a single query row.
      18              : #[derive(Debug)]
      19              : pub struct SimpleColumn {
      20              :     name: String,
      21              : }
      22              : 
      23              : impl SimpleColumn {
      24            0 :     pub(crate) fn new(name: String) -> SimpleColumn {
      25            0 :         SimpleColumn { name }
      26            0 :     }
      27              : 
      28              :     /// Returns the name of the column.
      29            0 :     pub fn name(&self) -> &str {
      30            0 :         &self.name
      31            0 :     }
      32              : }
      33              : 
      34            0 : pub async fn simple_query(client: &InnerClient, query: &str) -> Result<SimpleQueryStream, Error> {
      35            0 :     debug!("executing simple query: {}", query);
      36              : 
      37            0 :     let buf = encode(client, query)?;
      38            0 :     let responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
      39              : 
      40            0 :     Ok(SimpleQueryStream {
      41            0 :         responses,
      42            0 :         columns: None,
      43            0 :         status: ReadyForQueryStatus::Unknown,
      44            0 :         _p: PhantomPinned,
      45            0 :     })
      46            0 : }
      47              : 
      48            0 : pub async fn batch_execute(
      49            0 :     client: &InnerClient,
      50            0 :     query: &str,
      51            0 : ) -> Result<ReadyForQueryStatus, Error> {
      52            0 :     debug!("executing statement batch: {}", query);
      53              : 
      54            0 :     let buf = encode(client, query)?;
      55            0 :     let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
      56              : 
      57              :     loop {
      58            0 :         match responses.next().await? {
      59            0 :             Message::ReadyForQuery(status) => return Ok(status.into()),
      60              :             Message::CommandComplete(_)
      61              :             | Message::EmptyQueryResponse
      62              :             | Message::RowDescription(_)
      63            0 :             | Message::DataRow(_) => {}
      64            0 :             _ => return Err(Error::unexpected_message()),
      65              :         }
      66              :     }
      67            0 : }
      68              : 
      69            0 : pub(crate) fn encode(client: &InnerClient, query: &str) -> Result<Bytes, Error> {
      70            0 :     client.with_buf(|buf| {
      71            0 :         frontend::query(query, buf).map_err(Error::encode)?;
      72            0 :         Ok(buf.split().freeze())
      73            0 :     })
      74            0 : }
      75              : 
      76              : pin_project! {
      77              :     /// A stream of simple query results.
      78              :     pub struct SimpleQueryStream {
      79              :         responses: Responses,
      80              :         columns: Option<Arc<[SimpleColumn]>>,
      81              :         status: ReadyForQueryStatus,
      82              :         #[pin]
      83              :         _p: PhantomPinned,
      84              :     }
      85              : }
      86              : 
      87              : impl SimpleQueryStream {
      88              :     /// Returns if the connection is ready for querying, with the status of the connection.
      89              :     ///
      90              :     /// This might be available only after the stream has been exhausted.
      91            0 :     pub fn ready_status(&self) -> ReadyForQueryStatus {
      92            0 :         self.status
      93            0 :     }
      94              : }
      95              : 
      96              : impl Stream for SimpleQueryStream {
      97              :     type Item = Result<SimpleQueryMessage, Error>;
      98              : 
      99            0 :     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
     100            0 :         let this = self.project();
     101              :         loop {
     102            0 :             match ready!(this.responses.poll_next(cx)?) {
     103            0 :                 Message::CommandComplete(body) => {
     104            0 :                     let rows = body
     105            0 :                         .tag()
     106            0 :                         .map_err(Error::parse)?
     107            0 :                         .rsplit(' ')
     108            0 :                         .next()
     109            0 :                         .unwrap()
     110            0 :                         .parse()
     111            0 :                         .unwrap_or(0);
     112            0 :                     return Poll::Ready(Some(Ok(SimpleQueryMessage::CommandComplete(rows))));
     113              :                 }
     114              :                 Message::EmptyQueryResponse => {
     115            0 :                     return Poll::Ready(Some(Ok(SimpleQueryMessage::CommandComplete(0))));
     116              :                 }
     117            0 :                 Message::RowDescription(body) => {
     118            0 :                     let columns = body
     119            0 :                         .fields()
     120            0 :                         .map(|f| Ok(SimpleColumn::new(f.name().to_string())))
     121            0 :                         .collect::<Vec<_>>()
     122            0 :                         .map_err(Error::parse)?
     123            0 :                         .into();
     124            0 : 
     125            0 :                     *this.columns = Some(columns);
     126              :                 }
     127            0 :                 Message::DataRow(body) => {
     128            0 :                     let row = match &this.columns {
     129            0 :                         Some(columns) => SimpleQueryRow::new(columns.clone(), body)?,
     130            0 :                         None => return Poll::Ready(Some(Err(Error::unexpected_message()))),
     131              :                     };
     132            0 :                     return Poll::Ready(Some(Ok(SimpleQueryMessage::Row(row))));
     133              :                 }
     134            0 :                 Message::ReadyForQuery(s) => {
     135            0 :                     *this.status = s.into();
     136            0 :                     return Poll::Ready(None);
     137              :                 }
     138            0 :                 _ => return Poll::Ready(Some(Err(Error::unexpected_message()))),
     139              :             }
     140              :         }
     141            0 :     }
     142              : }
        

Generated by: LCOV version 2.1-beta