Line data Source code
1 : use std::pin::Pin;
2 : use std::task::{Context, Poll};
3 :
4 : use bytes::BufMut;
5 : use futures_util::{Stream, ready};
6 : use postgres_protocol2::message::backend::Message;
7 : use postgres_protocol2::message::frontend;
8 : use postgres_types2::Format;
9 :
10 : use crate::client::{CachedTypeInfo, InnerClient, Responses};
11 : use crate::{Error, ReadyForQueryStatus, Row, Statement};
12 :
13 0 : pub async fn query_txt<'a, S, I>(
14 0 : client: &'a mut InnerClient,
15 0 : typecache: &mut CachedTypeInfo,
16 0 : query: &str,
17 0 : params: I,
18 0 : ) -> Result<RowStream<'a>, Error>
19 0 : where
20 0 : S: AsRef<str>,
21 0 : I: IntoIterator<Item = Option<S>>,
22 0 : I::IntoIter: ExactSizeIterator,
23 0 : {
24 0 : let params = params.into_iter();
25 0 : let mut client = client.start()?;
26 :
27 : // Flow:
28 : // 1. Parse the query
29 : // 2. Inspect the row description for OIDs
30 : // 3. If there's any OIDs we don't already know about, perform the typeinfo routine
31 : // 4. Execute the query
32 : // 5. Sync.
33 : //
34 : // The typeinfo routine:
35 : // 1. Parse the typeinfo query
36 : // 2. Execute the query on each OID
37 : // 3. If the result does not match an OID we know, repeat 2.
38 :
39 : // parse the query and get type info
40 0 : let responses = client.send_with_flush(|buf| {
41 0 : frontend::parse(
42 0 : "", // unnamed prepared statement
43 0 : query, // query to parse
44 0 : std::iter::empty(), // give no type info
45 0 : buf,
46 : )
47 0 : .map_err(Error::encode)?;
48 0 : frontend::describe(b'S', "", buf).map_err(Error::encode)?;
49 0 : Ok(())
50 0 : })?;
51 :
52 0 : match responses.next().await? {
53 0 : Message::ParseComplete => {}
54 0 : _ => return Err(Error::unexpected_message()),
55 : }
56 :
57 0 : match responses.next().await? {
58 0 : Message::ParameterDescription(_) => {}
59 0 : _ => return Err(Error::unexpected_message()),
60 : };
61 :
62 0 : let row_description = match responses.next().await? {
63 0 : Message::RowDescription(body) => Some(body),
64 0 : Message::NoData => None,
65 0 : _ => return Err(Error::unexpected_message()),
66 : };
67 :
68 0 : let columns =
69 0 : crate::prepare::parse_row_description(&mut client, typecache, row_description).await?;
70 :
71 0 : let responses = client.send_with_sync(|buf| {
72 : // Bind, pass params as text, retrieve as text
73 0 : match frontend::bind(
74 0 : "", // empty string selects the unnamed portal
75 0 : "", // unnamed prepared statement
76 0 : std::iter::empty(), // all parameters use the default format (text)
77 0 : params,
78 0 : |param, buf| match param {
79 0 : Some(param) => {
80 0 : buf.put_slice(param.as_ref().as_bytes());
81 0 : Ok(postgres_protocol2::IsNull::No)
82 : }
83 0 : None => Ok(postgres_protocol2::IsNull::Yes),
84 0 : },
85 0 : Some(0), // all text
86 0 : buf,
87 : ) {
88 0 : Ok(()) => Ok(()),
89 0 : Err(frontend::BindError::Conversion(e)) => Err(Error::to_sql(e, 0)),
90 0 : Err(frontend::BindError::Serialization(e)) => Err(Error::encode(e)),
91 0 : }?;
92 :
93 : // Execute
94 0 : frontend::execute("", 0, buf).map_err(Error::encode)?;
95 :
96 0 : Ok(())
97 0 : })?;
98 :
99 0 : match responses.next().await? {
100 0 : Message::BindComplete => {}
101 0 : _ => return Err(Error::unexpected_message()),
102 : }
103 :
104 0 : Ok(RowStream {
105 0 : responses,
106 0 : statement: Statement::new("", columns),
107 0 : command_tag: None,
108 0 : status: ReadyForQueryStatus::Unknown,
109 0 : output_format: Format::Text,
110 0 : })
111 0 : }
112 :
113 : /// A stream of table rows.
114 : pub struct RowStream<'a> {
115 : responses: &'a mut Responses,
116 : output_format: Format,
117 : pub statement: Statement,
118 : pub command_tag: Option<String>,
119 : pub status: ReadyForQueryStatus,
120 : }
121 :
122 : impl Stream for RowStream<'_> {
123 : type Item = Result<Row, Error>;
124 :
125 0 : fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
126 0 : let this = self.get_mut();
127 : loop {
128 0 : match ready!(this.responses.poll_next(cx)?) {
129 0 : Message::DataRow(body) => {
130 0 : return Poll::Ready(Some(Ok(Row::new(
131 0 : this.statement.clone(),
132 0 : body,
133 0 : this.output_format,
134 0 : )?)));
135 : }
136 0 : Message::EmptyQueryResponse | Message::PortalSuspended => {}
137 0 : Message::CommandComplete(body) => {
138 0 : if let Ok(tag) = body.tag() {
139 0 : this.command_tag = Some(tag.to_string());
140 0 : }
141 : }
142 0 : Message::ReadyForQuery(status) => {
143 0 : this.status = status.into();
144 0 : return Poll::Ready(None);
145 : }
146 0 : _ => return Poll::Ready(Some(Err(Error::unexpected_message()))),
147 : }
148 : }
149 0 : }
150 : }
|