Line data Source code
1 : use std::pin::Pin;
2 : use std::sync::Arc;
3 : use std::task::{Context, Poll};
4 :
5 : use fallible_iterator::FallibleIterator;
6 : use futures_util::{Stream, ready};
7 : use pin_project_lite::pin_project;
8 : use postgres_protocol2::message::backend::Message;
9 : use tracing::debug;
10 :
11 : use crate::client::{InnerClient, Responses};
12 : use crate::{Error, ReadyForQueryStatus, SimpleQueryMessage, SimpleQueryRow};
13 :
14 : /// Information about a column of a single query row.
15 : #[derive(Debug)]
16 : pub struct SimpleColumn {
17 : name: String,
18 : }
19 :
20 : impl SimpleColumn {
21 0 : pub(crate) fn new(name: String) -> SimpleColumn {
22 0 : SimpleColumn { name }
23 0 : }
24 :
25 : /// Returns the name of the column.
26 0 : pub fn name(&self) -> &str {
27 0 : &self.name
28 0 : }
29 : }
30 :
31 0 : pub async fn simple_query<'a>(
32 0 : client: &'a mut InnerClient,
33 0 : query: &str,
34 0 : ) -> Result<SimpleQueryStream<'a>, Error> {
35 0 : debug!("executing simple query: {}", query);
36 :
37 0 : let responses = client.send_simple_query(query)?;
38 :
39 0 : Ok(SimpleQueryStream {
40 0 : responses,
41 0 : columns: None,
42 0 : status: ReadyForQueryStatus::Unknown,
43 0 : })
44 0 : }
45 :
46 0 : pub async fn batch_execute(
47 0 : client: &mut InnerClient,
48 0 : query: &str,
49 0 : ) -> Result<ReadyForQueryStatus, Error> {
50 0 : debug!("executing statement batch: {}", query);
51 :
52 0 : let responses = client.send_simple_query(query)?;
53 :
54 : loop {
55 0 : match responses.next().await? {
56 0 : Message::ReadyForQuery(status) => return Ok(status.into()),
57 : Message::CommandComplete(_)
58 : | Message::EmptyQueryResponse
59 : | Message::RowDescription(_)
60 0 : | Message::DataRow(_) => {}
61 0 : _ => return Err(Error::unexpected_message()),
62 : }
63 : }
64 0 : }
65 :
66 : pin_project! {
67 : /// A stream of simple query results.
68 : pub struct SimpleQueryStream<'a> {
69 : responses: &'a mut Responses,
70 : columns: Option<Arc<[SimpleColumn]>>,
71 : status: ReadyForQueryStatus,
72 : }
73 : }
74 :
75 : impl SimpleQueryStream<'_> {
76 : /// Returns if the connection is ready for querying, with the status of the connection.
77 : ///
78 : /// This might be available only after the stream has been exhausted.
79 0 : pub fn ready_status(&self) -> ReadyForQueryStatus {
80 0 : self.status
81 0 : }
82 : }
83 :
84 : impl Stream for SimpleQueryStream<'_> {
85 : type Item = Result<SimpleQueryMessage, Error>;
86 :
87 0 : fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
88 0 : let this = self.project();
89 : loop {
90 0 : match ready!(this.responses.poll_next(cx)?) {
91 0 : Message::CommandComplete(body) => {
92 0 : let rows = body
93 0 : .tag()
94 0 : .map_err(Error::parse)?
95 0 : .rsplit(' ')
96 0 : .next()
97 0 : .unwrap()
98 0 : .parse()
99 0 : .unwrap_or(0);
100 0 : return Poll::Ready(Some(Ok(SimpleQueryMessage::CommandComplete(rows))));
101 : }
102 : Message::EmptyQueryResponse => {
103 0 : return Poll::Ready(Some(Ok(SimpleQueryMessage::CommandComplete(0))));
104 : }
105 0 : Message::RowDescription(body) => {
106 0 : let columns = body
107 0 : .fields()
108 0 : .map(|f| Ok(SimpleColumn::new(f.name().to_string())))
109 0 : .collect::<Vec<_>>()
110 0 : .map_err(Error::parse)?
111 0 : .into();
112 :
113 0 : *this.columns = Some(columns);
114 : }
115 0 : Message::DataRow(body) => {
116 0 : let row = match &this.columns {
117 0 : Some(columns) => SimpleQueryRow::new(columns.clone(), body)?,
118 0 : None => return Poll::Ready(Some(Err(Error::unexpected_message()))),
119 : };
120 0 : return Poll::Ready(Some(Ok(SimpleQueryMessage::Row(row))));
121 : }
122 0 : Message::ReadyForQuery(s) => {
123 0 : *this.status = s.into();
124 0 : return Poll::Ready(None);
125 : }
126 0 : _ => return Poll::Ready(Some(Err(Error::unexpected_message()))),
127 : }
128 : }
129 0 : }
130 : }
|