Line data Source code
1 : use std::marker::PhantomPinned;
2 : use std::pin::Pin;
3 : use std::sync::Arc;
4 : use std::task::{Context, Poll};
5 :
6 : use bytes::Bytes;
7 : use fallible_iterator::FallibleIterator;
8 : use futures_util::{Stream, ready};
9 : use log::debug;
10 : use pin_project_lite::pin_project;
11 : use postgres_protocol2::message::backend::Message;
12 : use postgres_protocol2::message::frontend;
13 :
14 : use crate::client::{InnerClient, Responses};
15 : use crate::codec::FrontendMessage;
16 : use crate::connection::RequestMessages;
17 : use crate::{Error, ReadyForQueryStatus, SimpleQueryMessage, SimpleQueryRow};
18 :
19 : /// Information about a column of a single query row.
20 : #[derive(Debug)]
21 : pub struct SimpleColumn {
22 : name: String,
23 : }
24 :
25 : impl SimpleColumn {
26 0 : pub(crate) fn new(name: String) -> SimpleColumn {
27 0 : SimpleColumn { name }
28 0 : }
29 :
30 : /// Returns the name of the column.
31 0 : pub fn name(&self) -> &str {
32 0 : &self.name
33 0 : }
34 : }
35 :
36 0 : pub async fn simple_query(client: &InnerClient, query: &str) -> Result<SimpleQueryStream, Error> {
37 0 : debug!("executing simple query: {}", query);
38 :
39 0 : let buf = encode(client, query)?;
40 0 : let responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
41 :
42 0 : Ok(SimpleQueryStream {
43 0 : responses,
44 0 : columns: None,
45 0 : status: ReadyForQueryStatus::Unknown,
46 0 : _p: PhantomPinned,
47 0 : })
48 0 : }
49 :
50 0 : pub async fn batch_execute(
51 0 : client: &InnerClient,
52 0 : query: &str,
53 0 : ) -> Result<ReadyForQueryStatus, Error> {
54 0 : debug!("executing statement batch: {}", query);
55 :
56 0 : let buf = encode(client, query)?;
57 0 : let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
58 :
59 : loop {
60 0 : match responses.next().await? {
61 0 : Message::ReadyForQuery(status) => return Ok(status.into()),
62 : Message::CommandComplete(_)
63 : | Message::EmptyQueryResponse
64 : | Message::RowDescription(_)
65 0 : | Message::DataRow(_) => {}
66 0 : _ => return Err(Error::unexpected_message()),
67 : }
68 : }
69 0 : }
70 :
71 0 : pub(crate) fn encode(client: &InnerClient, query: &str) -> Result<Bytes, Error> {
72 0 : client.with_buf(|buf| {
73 0 : frontend::query(query, buf).map_err(Error::encode)?;
74 0 : Ok(buf.split().freeze())
75 0 : })
76 0 : }
77 :
78 : pin_project! {
79 : /// A stream of simple query results.
80 : pub struct SimpleQueryStream {
81 : responses: Responses,
82 : columns: Option<Arc<[SimpleColumn]>>,
83 : status: ReadyForQueryStatus,
84 : #[pin]
85 : _p: PhantomPinned,
86 : }
87 : }
88 :
89 : impl SimpleQueryStream {
90 : /// Returns if the connection is ready for querying, with the status of the connection.
91 : ///
92 : /// This might be available only after the stream has been exhausted.
93 0 : pub fn ready_status(&self) -> ReadyForQueryStatus {
94 0 : self.status
95 0 : }
96 : }
97 :
98 : impl Stream for SimpleQueryStream {
99 : type Item = Result<SimpleQueryMessage, Error>;
100 :
101 0 : fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
102 0 : let this = self.project();
103 : loop {
104 0 : match ready!(this.responses.poll_next(cx)?) {
105 0 : Message::CommandComplete(body) => {
106 0 : let rows = body
107 0 : .tag()
108 0 : .map_err(Error::parse)?
109 0 : .rsplit(' ')
110 0 : .next()
111 0 : .unwrap()
112 0 : .parse()
113 0 : .unwrap_or(0);
114 0 : return Poll::Ready(Some(Ok(SimpleQueryMessage::CommandComplete(rows))));
115 : }
116 : Message::EmptyQueryResponse => {
117 0 : return Poll::Ready(Some(Ok(SimpleQueryMessage::CommandComplete(0))));
118 : }
119 0 : Message::RowDescription(body) => {
120 0 : let columns = body
121 0 : .fields()
122 0 : .map(|f| Ok(SimpleColumn::new(f.name().to_string())))
123 0 : .collect::<Vec<_>>()
124 0 : .map_err(Error::parse)?
125 0 : .into();
126 0 :
127 0 : *this.columns = Some(columns);
128 : }
129 0 : Message::DataRow(body) => {
130 0 : let row = match &this.columns {
131 0 : Some(columns) => SimpleQueryRow::new(columns.clone(), body)?,
132 0 : None => return Poll::Ready(Some(Err(Error::unexpected_message()))),
133 : };
134 0 : return Poll::Ready(Some(Ok(SimpleQueryMessage::Row(row))));
135 : }
136 0 : Message::ReadyForQuery(s) => {
137 0 : *this.status = s.into();
138 0 : return Poll::Ready(None);
139 : }
140 0 : _ => return Poll::Ready(Some(Err(Error::unexpected_message()))),
141 : }
142 : }
143 0 : }
144 : }
|