Line data Source code
1 : use crate::client::SocketConfig;
2 : use crate::codec::BackendMessage;
3 : use crate::config::Host;
4 : use crate::connect_raw::connect_raw;
5 : use crate::connect_socket::connect_socket;
6 : use crate::tls::{MakeTlsConnect, TlsConnect};
7 : use crate::{Client, Config, Connection, Error, RawConnection};
8 : use postgres_protocol2::message::backend::Message;
9 : use tokio::net::TcpStream;
10 : use tokio::sync::mpsc;
11 :
12 0 : pub async fn connect<T>(
13 0 : mut tls: T,
14 0 : config: &Config,
15 0 : ) -> Result<(Client, Connection<TcpStream, T::Stream>), Error>
16 0 : where
17 0 : T: MakeTlsConnect<TcpStream>,
18 0 : {
19 0 : let hostname = match &config.host {
20 0 : Host::Tcp(host) => host.as_str(),
21 : };
22 :
23 0 : let tls = tls
24 0 : .make_tls_connect(hostname)
25 0 : .map_err(|e| Error::tls(e.into()))?;
26 :
27 0 : match connect_once(&config.host, config.port, tls, config).await {
28 0 : Ok((client, connection)) => Ok((client, connection)),
29 0 : Err(e) => Err(e),
30 : }
31 0 : }
32 :
33 0 : async fn connect_once<T>(
34 0 : host: &Host,
35 0 : port: u16,
36 0 : tls: T,
37 0 : config: &Config,
38 0 : ) -> Result<(Client, Connection<TcpStream, T::Stream>), Error>
39 0 : where
40 0 : T: TlsConnect<TcpStream>,
41 0 : {
42 0 : let socket = connect_socket(host, port, config.connect_timeout).await?;
43 : let RawConnection {
44 0 : stream,
45 0 : parameters,
46 0 : delayed_notice,
47 0 : process_id,
48 0 : secret_key,
49 0 : } = connect_raw(socket, tls, config).await?;
50 :
51 0 : let socket_config = SocketConfig {
52 0 : host: host.clone(),
53 0 : port,
54 0 : connect_timeout: config.connect_timeout,
55 0 : };
56 0 :
57 0 : let (sender, receiver) = mpsc::unbounded_channel();
58 0 : let client = Client::new(
59 0 : sender,
60 0 : socket_config,
61 0 : config.ssl_mode,
62 0 : process_id,
63 0 : secret_key,
64 0 : );
65 0 :
66 0 : // delayed notices are always sent as "Async" messages.
67 0 : let delayed = delayed_notice
68 0 : .into_iter()
69 0 : .map(|m| BackendMessage::Async(Message::NoticeResponse(m)))
70 0 : .collect();
71 0 :
72 0 : let connection = Connection::new(stream, delayed, parameters, receiver);
73 0 :
74 0 : Ok((client, connection))
75 0 : }
|