LCOV - code coverage report
Current view: top level - libs/proxy/tokio-postgres2/src - query.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 0.0 % 86 0
Test Date: 2025-07-16 12:29:03 Functions: 0.0 % 11 0

            Line data    Source code
       1              : use std::pin::Pin;
       2              : use std::task::{Context, Poll};
       3              : 
       4              : use bytes::BufMut;
       5              : use futures_util::{Stream, ready};
       6              : use postgres_protocol2::message::backend::Message;
       7              : use postgres_protocol2::message::frontend;
       8              : use postgres_types2::Format;
       9              : 
      10              : use crate::client::{CachedTypeInfo, InnerClient, Responses};
      11              : use crate::{Error, ReadyForQueryStatus, Row, Statement};
      12              : 
      13            0 : pub async fn query_txt<'a, S, I>(
      14            0 :     client: &'a mut InnerClient,
      15            0 :     typecache: &mut CachedTypeInfo,
      16            0 :     query: &str,
      17            0 :     params: I,
      18            0 : ) -> Result<RowStream<'a>, Error>
      19            0 : where
      20            0 :     S: AsRef<str>,
      21            0 :     I: IntoIterator<Item = Option<S>>,
      22            0 :     I::IntoIter: ExactSizeIterator,
      23            0 : {
      24            0 :     let params = params.into_iter();
      25            0 :     let mut client = client.start()?;
      26              : 
      27              :     // Flow:
      28              :     // 1. Parse the query
      29              :     // 2. Inspect the row description for OIDs
      30              :     // 3. If there's any OIDs we don't already know about, perform the typeinfo routine
      31              :     // 4. Execute the query
      32              :     // 5. Sync.
      33              :     //
      34              :     // The typeinfo routine:
      35              :     // 1. Parse the typeinfo query
      36              :     // 2. Execute the query on each OID
      37              :     // 3. If the result does not match an OID we know, repeat 2.
      38              : 
      39              :     // parse the query and get type info
      40            0 :     let responses = client.send_with_flush(|buf| {
      41            0 :         frontend::parse(
      42            0 :             "",                 // unnamed prepared statement
      43            0 :             query,              // query to parse
      44            0 :             std::iter::empty(), // give no type info
      45            0 :             buf,
      46              :         )
      47            0 :         .map_err(Error::encode)?;
      48            0 :         frontend::describe(b'S', "", buf).map_err(Error::encode)?;
      49            0 :         Ok(())
      50            0 :     })?;
      51              : 
      52            0 :     match responses.next().await? {
      53            0 :         Message::ParseComplete => {}
      54            0 :         _ => return Err(Error::unexpected_message()),
      55              :     }
      56              : 
      57            0 :     match responses.next().await? {
      58            0 :         Message::ParameterDescription(_) => {}
      59            0 :         _ => return Err(Error::unexpected_message()),
      60              :     };
      61              : 
      62            0 :     let row_description = match responses.next().await? {
      63            0 :         Message::RowDescription(body) => Some(body),
      64            0 :         Message::NoData => None,
      65            0 :         _ => return Err(Error::unexpected_message()),
      66              :     };
      67              : 
      68            0 :     let columns =
      69            0 :         crate::prepare::parse_row_description(&mut client, typecache, row_description).await?;
      70              : 
      71            0 :     let responses = client.send_with_sync(|buf| {
      72              :         // Bind, pass params as text, retrieve as text
      73            0 :         match frontend::bind(
      74            0 :             "",                 // empty string selects the unnamed portal
      75            0 :             "",                 // unnamed prepared statement
      76            0 :             std::iter::empty(), // all parameters use the default format (text)
      77            0 :             params,
      78            0 :             |param, buf| match param {
      79            0 :                 Some(param) => {
      80            0 :                     buf.put_slice(param.as_ref().as_bytes());
      81            0 :                     Ok(postgres_protocol2::IsNull::No)
      82              :                 }
      83            0 :                 None => Ok(postgres_protocol2::IsNull::Yes),
      84            0 :             },
      85            0 :             Some(0), // all text
      86            0 :             buf,
      87              :         ) {
      88            0 :             Ok(()) => Ok(()),
      89            0 :             Err(frontend::BindError::Conversion(e)) => Err(Error::to_sql(e, 0)),
      90            0 :             Err(frontend::BindError::Serialization(e)) => Err(Error::encode(e)),
      91            0 :         }?;
      92              : 
      93              :         // Execute
      94            0 :         frontend::execute("", 0, buf).map_err(Error::encode)?;
      95              : 
      96            0 :         Ok(())
      97            0 :     })?;
      98              : 
      99            0 :     match responses.next().await? {
     100            0 :         Message::BindComplete => {}
     101            0 :         _ => return Err(Error::unexpected_message()),
     102              :     }
     103              : 
     104            0 :     Ok(RowStream {
     105            0 :         responses,
     106            0 :         statement: Statement::new("", columns),
     107            0 :         command_tag: None,
     108            0 :         status: ReadyForQueryStatus::Unknown,
     109            0 :         output_format: Format::Text,
     110            0 :     })
     111            0 : }
     112              : 
     113              : /// A stream of table rows.
     114              : pub struct RowStream<'a> {
     115              :     responses: &'a mut Responses,
     116              :     output_format: Format,
     117              :     pub statement: Statement,
     118              :     pub command_tag: Option<String>,
     119              :     pub status: ReadyForQueryStatus,
     120              : }
     121              : 
     122              : impl Stream for RowStream<'_> {
     123              :     type Item = Result<Row, Error>;
     124              : 
     125            0 :     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
     126            0 :         let this = self.get_mut();
     127              :         loop {
     128            0 :             match ready!(this.responses.poll_next(cx)?) {
     129            0 :                 Message::DataRow(body) => {
     130            0 :                     return Poll::Ready(Some(Ok(Row::new(
     131            0 :                         this.statement.clone(),
     132            0 :                         body,
     133            0 :                         this.output_format,
     134            0 :                     )?)));
     135              :                 }
     136            0 :                 Message::EmptyQueryResponse | Message::PortalSuspended => {}
     137            0 :                 Message::CommandComplete(body) => {
     138            0 :                     if let Ok(tag) = body.tag() {
     139            0 :                         this.command_tag = Some(tag.to_string());
     140            0 :                     }
     141              :                 }
     142            0 :                 Message::ReadyForQuery(status) => {
     143            0 :                     this.status = status.into();
     144            0 :                     return Poll::Ready(None);
     145              :                 }
     146            0 :                 _ => return Poll::Ready(Some(Err(Error::unexpected_message()))),
     147              :             }
     148              :         }
     149            0 :     }
     150              : }
        

Generated by: LCOV version 2.1-beta