Line data Source code
1 : use crate::client::{InnerClient, Responses};
2 : use crate::codec::FrontendMessage;
3 : use crate::connection::RequestMessages;
4 : use crate::{Error, ReadyForQueryStatus, SimpleQueryMessage, SimpleQueryRow};
5 : use bytes::Bytes;
6 : use fallible_iterator::FallibleIterator;
7 : use futures_util::{ready, Stream};
8 : use log::debug;
9 : use pin_project_lite::pin_project;
10 : use postgres_protocol2::message::backend::Message;
11 : use postgres_protocol2::message::frontend;
12 : use std::marker::PhantomPinned;
13 : use std::pin::Pin;
14 : use std::sync::Arc;
15 : use std::task::{Context, Poll};
16 :
17 : /// Information about a column of a single query row.
18 : #[derive(Debug)]
19 : pub struct SimpleColumn {
20 : name: String,
21 : }
22 :
23 : impl SimpleColumn {
24 0 : pub(crate) fn new(name: String) -> SimpleColumn {
25 0 : SimpleColumn { name }
26 0 : }
27 :
28 : /// Returns the name of the column.
29 0 : pub fn name(&self) -> &str {
30 0 : &self.name
31 0 : }
32 : }
33 :
34 0 : pub async fn simple_query(client: &InnerClient, query: &str) -> Result<SimpleQueryStream, Error> {
35 0 : debug!("executing simple query: {}", query);
36 :
37 0 : let buf = encode(client, query)?;
38 0 : let responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
39 :
40 0 : Ok(SimpleQueryStream {
41 0 : responses,
42 0 : columns: None,
43 0 : status: ReadyForQueryStatus::Unknown,
44 0 : _p: PhantomPinned,
45 0 : })
46 0 : }
47 :
48 0 : pub async fn batch_execute(
49 0 : client: &InnerClient,
50 0 : query: &str,
51 0 : ) -> Result<ReadyForQueryStatus, Error> {
52 0 : debug!("executing statement batch: {}", query);
53 :
54 0 : let buf = encode(client, query)?;
55 0 : let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
56 :
57 : loop {
58 0 : match responses.next().await? {
59 0 : Message::ReadyForQuery(status) => return Ok(status.into()),
60 : Message::CommandComplete(_)
61 : | Message::EmptyQueryResponse
62 : | Message::RowDescription(_)
63 0 : | Message::DataRow(_) => {}
64 0 : _ => return Err(Error::unexpected_message()),
65 : }
66 : }
67 0 : }
68 :
69 0 : pub(crate) fn encode(client: &InnerClient, query: &str) -> Result<Bytes, Error> {
70 0 : client.with_buf(|buf| {
71 0 : frontend::query(query, buf).map_err(Error::encode)?;
72 0 : Ok(buf.split().freeze())
73 0 : })
74 0 : }
75 :
76 : pin_project! {
77 : /// A stream of simple query results.
78 : pub struct SimpleQueryStream {
79 : responses: Responses,
80 : columns: Option<Arc<[SimpleColumn]>>,
81 : status: ReadyForQueryStatus,
82 : #[pin]
83 : _p: PhantomPinned,
84 : }
85 : }
86 :
87 : impl SimpleQueryStream {
88 : /// Returns if the connection is ready for querying, with the status of the connection.
89 : ///
90 : /// This might be available only after the stream has been exhausted.
91 0 : pub fn ready_status(&self) -> ReadyForQueryStatus {
92 0 : self.status
93 0 : }
94 : }
95 :
96 : impl Stream for SimpleQueryStream {
97 : type Item = Result<SimpleQueryMessage, Error>;
98 :
99 0 : fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
100 0 : let this = self.project();
101 : loop {
102 0 : match ready!(this.responses.poll_next(cx)?) {
103 0 : Message::CommandComplete(body) => {
104 0 : let rows = body
105 0 : .tag()
106 0 : .map_err(Error::parse)?
107 0 : .rsplit(' ')
108 0 : .next()
109 0 : .unwrap()
110 0 : .parse()
111 0 : .unwrap_or(0);
112 0 : return Poll::Ready(Some(Ok(SimpleQueryMessage::CommandComplete(rows))));
113 : }
114 : Message::EmptyQueryResponse => {
115 0 : return Poll::Ready(Some(Ok(SimpleQueryMessage::CommandComplete(0))));
116 : }
117 0 : Message::RowDescription(body) => {
118 0 : let columns = body
119 0 : .fields()
120 0 : .map(|f| Ok(SimpleColumn::new(f.name().to_string())))
121 0 : .collect::<Vec<_>>()
122 0 : .map_err(Error::parse)?
123 0 : .into();
124 0 :
125 0 : *this.columns = Some(columns);
126 : }
127 0 : Message::DataRow(body) => {
128 0 : let row = match &this.columns {
129 0 : Some(columns) => SimpleQueryRow::new(columns.clone(), body)?,
130 0 : None => return Poll::Ready(Some(Err(Error::unexpected_message()))),
131 : };
132 0 : return Poll::Ready(Some(Ok(SimpleQueryMessage::Row(row))));
133 : }
134 0 : Message::ReadyForQuery(s) => {
135 0 : *this.status = s.into();
136 0 : return Poll::Ready(None);
137 : }
138 0 : _ => return Poll::Ready(Some(Err(Error::unexpected_message()))),
139 : }
140 : }
141 0 : }
142 : }
|