Line data Source code
1 : use std::io;
2 :
3 : use bytes::BytesMut;
4 : use fallible_iterator::FallibleIterator;
5 : use postgres_protocol2::message::backend;
6 : use tokio::sync::mpsc::UnboundedSender;
7 : use tokio_util::codec::{Decoder, Encoder};
8 :
9 : pub enum FrontendMessage {
10 : Raw(BytesMut),
11 : RecordNotices(RecordNotices),
12 : }
13 :
14 : pub struct RecordNotices {
15 : pub sender: UnboundedSender<Box<str>>,
16 : pub limit: usize,
17 : }
18 :
19 : pub enum BackendMessage {
20 : Normal {
21 : messages: BackendMessages,
22 : ready: bool,
23 : },
24 : Async(backend::Message),
25 : }
26 :
27 : pub struct BackendMessages(BytesMut);
28 :
29 : impl BackendMessages {
30 0 : pub fn empty() -> BackendMessages {
31 0 : BackendMessages(BytesMut::new())
32 0 : }
33 : }
34 :
35 : impl FallibleIterator for BackendMessages {
36 : type Item = backend::Message;
37 : type Error = io::Error;
38 :
39 0 : fn next(&mut self) -> io::Result<Option<backend::Message>> {
40 0 : backend::Message::parse(&mut self.0)
41 0 : }
42 : }
43 :
44 : pub struct PostgresCodec;
45 :
46 : impl Encoder<BytesMut> for PostgresCodec {
47 : type Error = io::Error;
48 :
49 0 : fn encode(&mut self, item: BytesMut, dst: &mut BytesMut) -> io::Result<()> {
50 : // When it comes to request/response workflows, we usually flush the entire write
51 : // buffer in order to wait for the response before we send a new request.
52 : // Therefore we can avoid the copy and just replace the buffer.
53 0 : if dst.is_empty() {
54 0 : *dst = item;
55 0 : } else {
56 0 : dst.extend_from_slice(&item);
57 0 : }
58 0 : Ok(())
59 0 : }
60 : }
61 :
62 : impl Decoder for PostgresCodec {
63 : type Item = BackendMessage;
64 : type Error = io::Error;
65 :
66 0 : fn decode(&mut self, src: &mut BytesMut) -> Result<Option<BackendMessage>, io::Error> {
67 0 : let mut idx = 0;
68 :
69 0 : let mut ready = false;
70 0 : while let Some(header) = backend::Header::parse(&src[idx..])? {
71 0 : let len = header.len() as usize + 1;
72 0 : if src[idx..].len() < len {
73 0 : break;
74 0 : }
75 :
76 0 : match header.tag() {
77 : backend::NOTICE_RESPONSE_TAG
78 : | backend::NOTIFICATION_RESPONSE_TAG
79 : | backend::PARAMETER_STATUS_TAG => {
80 0 : if idx == 0 {
81 0 : let message = backend::Message::parse(src)?.unwrap();
82 0 : return Ok(Some(BackendMessage::Async(message)));
83 : } else {
84 0 : break;
85 : }
86 : }
87 0 : _ => {}
88 : }
89 :
90 0 : idx += len;
91 :
92 0 : if header.tag() == backend::READY_FOR_QUERY_TAG {
93 0 : ready = true;
94 0 : break;
95 0 : }
96 : }
97 :
98 0 : if idx == 0 {
99 0 : Ok(None)
100 : } else {
101 0 : Ok(Some(BackendMessage::Normal {
102 0 : messages: BackendMessages(src.split_to(idx)),
103 0 : ready,
104 0 : }))
105 : }
106 0 : }
107 : }
|