Line data Source code
1 : use std::io;
2 :
3 : use bytes::BytesMut;
4 : use fallible_iterator::FallibleIterator;
5 : use postgres_protocol2::message::backend;
6 : use tokio::sync::mpsc::UnboundedSender;
7 : use tokio_util::codec::{Decoder, Encoder};
8 :
9 : pub enum FrontendMessage {
10 : Raw(BytesMut),
11 : RecordNotices(RecordNotices),
12 : }
13 :
14 : pub struct RecordNotices {
15 : pub sender: UnboundedSender<Box<str>>,
16 : pub limit: usize,
17 : }
18 :
19 : pub enum BackendMessage {
20 : Normal {
21 : messages: BackendMessages,
22 : ready: bool,
23 : },
24 : Async(backend::Message),
25 : }
26 :
27 : pub struct BackendMessages(BytesMut);
28 :
29 : impl BackendMessages {
30 0 : pub fn empty() -> BackendMessages {
31 0 : BackendMessages(BytesMut::new())
32 0 : }
33 : }
34 :
35 : impl FallibleIterator for BackendMessages {
36 : type Item = backend::Message;
37 : type Error = io::Error;
38 :
39 0 : fn next(&mut self) -> io::Result<Option<backend::Message>> {
40 0 : backend::Message::parse(&mut self.0)
41 0 : }
42 : }
43 :
44 : pub struct PostgresCodec;
45 :
46 : impl Encoder<BytesMut> for PostgresCodec {
47 : type Error = io::Error;
48 :
49 0 : fn encode(&mut self, item: BytesMut, dst: &mut BytesMut) -> io::Result<()> {
50 0 : dst.unsplit(item);
51 0 : Ok(())
52 0 : }
53 : }
54 :
55 : impl Decoder for PostgresCodec {
56 : type Item = BackendMessage;
57 : type Error = io::Error;
58 :
59 0 : fn decode(&mut self, src: &mut BytesMut) -> Result<Option<BackendMessage>, io::Error> {
60 0 : let mut idx = 0;
61 :
62 0 : let mut ready = false;
63 0 : while let Some(header) = backend::Header::parse(&src[idx..])? {
64 0 : let len = header.len() as usize + 1;
65 0 : if src[idx..].len() < len {
66 0 : break;
67 0 : }
68 :
69 0 : match header.tag() {
70 : backend::NOTICE_RESPONSE_TAG
71 : | backend::NOTIFICATION_RESPONSE_TAG
72 : | backend::PARAMETER_STATUS_TAG => {
73 0 : if idx == 0 {
74 0 : let message = backend::Message::parse(src)?.unwrap();
75 0 : return Ok(Some(BackendMessage::Async(message)));
76 : } else {
77 0 : break;
78 : }
79 : }
80 0 : _ => {}
81 : }
82 :
83 0 : idx += len;
84 :
85 0 : if header.tag() == backend::READY_FOR_QUERY_TAG {
86 0 : ready = true;
87 0 : break;
88 0 : }
89 : }
90 :
91 0 : if idx == 0 {
92 0 : Ok(None)
93 : } else {
94 0 : Ok(Some(BackendMessage::Normal {
95 0 : messages: BackendMessages(src.split_to(idx)),
96 0 : ready,
97 0 : }))
98 : }
99 0 : }
100 : }
|