Line data Source code
1 : use std::io;
2 :
3 : use bytes::{Bytes, BytesMut};
4 : use fallible_iterator::FallibleIterator;
5 : use postgres_protocol2::message::backend;
6 : use tokio_util::codec::{Decoder, Encoder};
7 :
8 : pub enum FrontendMessage {
9 : Raw(Bytes),
10 : }
11 :
12 : pub enum BackendMessage {
13 : Normal { messages: BackendMessages },
14 : Async(backend::Message),
15 : }
16 :
17 : pub struct BackendMessages(BytesMut);
18 :
19 : impl BackendMessages {
20 15 : pub fn empty() -> BackendMessages {
21 15 : BackendMessages(BytesMut::new())
22 15 : }
23 : }
24 :
25 : impl FallibleIterator for BackendMessages {
26 : type Item = backend::Message;
27 : type Error = io::Error;
28 :
29 129 : fn next(&mut self) -> io::Result<Option<backend::Message>> {
30 129 : backend::Message::parse(&mut self.0)
31 129 : }
32 : }
33 :
34 : pub struct PostgresCodec;
35 :
36 : impl Encoder<FrontendMessage> for PostgresCodec {
37 : type Error = io::Error;
38 :
39 36 : fn encode(&mut self, item: FrontendMessage, dst: &mut BytesMut) -> io::Result<()> {
40 36 : match item {
41 36 : FrontendMessage::Raw(buf) => dst.extend_from_slice(&buf),
42 : }
43 :
44 36 : Ok(())
45 36 : }
46 : }
47 :
48 : impl Decoder for PostgresCodec {
49 : type Item = BackendMessage;
50 : type Error = io::Error;
51 :
52 69 : fn decode(&mut self, src: &mut BytesMut) -> Result<Option<BackendMessage>, io::Error> {
53 69 : let mut idx = 0;
54 :
55 104 : while let Some(header) = backend::Header::parse(&src[idx..])? {
56 55 : let len = header.len() as usize + 1;
57 55 : if src[idx..].len() < len {
58 0 : break;
59 55 : }
60 :
61 55 : match header.tag() {
62 : backend::NOTICE_RESPONSE_TAG
63 : | backend::NOTIFICATION_RESPONSE_TAG
64 : | backend::PARAMETER_STATUS_TAG => {
65 13 : if idx == 0 {
66 7 : let message = backend::Message::parse(src)?.unwrap();
67 7 : return Ok(Some(BackendMessage::Async(message)));
68 : } else {
69 6 : break;
70 : }
71 : }
72 42 : _ => {}
73 : }
74 :
75 42 : idx += len;
76 :
77 42 : if header.tag() == backend::READY_FOR_QUERY_TAG {
78 7 : break;
79 35 : }
80 : }
81 :
82 62 : if idx == 0 {
83 24 : Ok(None)
84 : } else {
85 38 : Ok(Some(BackendMessage::Normal {
86 38 : messages: BackendMessages(src.split_to(idx)),
87 38 : }))
88 : }
89 69 : }
90 : }
|