Line data Source code
1 : //!
2 : //! WAL service listens for client connections,
3 : //! receives WAL from wal_proposer and sends it to WAL receivers.
4 : //!
5 : use anyhow::{Context, Result};
6 : use postgres_backend::QueryError;
7 : use std::{future, time::Duration};
8 : use tokio::net::TcpStream;
9 : use tokio_io_timeout::TimeoutReader;
10 : use tracing::*;
11 : use utils::{auth::Scope, measured_stream::MeasuredStream};
12 :
13 : use crate::handler::SafekeeperPostgresHandler;
14 : use crate::metrics::TrafficMetrics;
15 : use crate::SafeKeeperConf;
16 : use postgres_backend::{AuthType, PostgresBackend};
17 :
18 : /// Accept incoming TCP connections and spawn them into a background thread.
19 : /// allowed_auth_scope is either SafekeeperData (wide JWT tokens giving access
20 : /// to any tenant are allowed) or Tenant (only tokens giving access to specific
21 : /// tenant are allowed). Doesn't matter if auth is disabled in conf.
22 1034 : pub async fn task_main(
23 1034 : conf: SafeKeeperConf,
24 1034 : pg_listener: std::net::TcpListener,
25 1034 : allowed_auth_scope: Scope,
26 1034 : ) -> anyhow::Result<()> {
27 : // Tokio's from_std won't do this for us, per its comment.
28 1034 : pg_listener.set_nonblocking(true)?;
29 :
30 1034 : let listener = tokio::net::TcpListener::from_std(pg_listener)?;
31 1034 : let mut connection_count: ConnectionCount = 0;
32 :
33 : loop {
34 4761 : let (socket, peer_addr) = listener.accept().await.context("accept")?;
35 0 : debug!("accepted connection from {}", peer_addr);
36 3727 : let conf = conf.clone();
37 3727 : let conn_id = issue_connection_id(&mut connection_count);
38 3727 :
39 3727 : tokio::spawn(async move {
40 3727 : if let Err(err) = handle_socket(socket, conf, conn_id, allowed_auth_scope)
41 3727 : .instrument(info_span!("", cid = %conn_id))
42 7338733 : .await
43 : {
44 487 : error!("connection handler exited: {}", err);
45 2870 : }
46 3727 : });
47 : }
48 0 : }
49 :
50 : /// This is run by `task_main` above, inside a background thread.
51 : ///
52 3727 : async fn handle_socket(
53 3727 : socket: TcpStream,
54 3727 : conf: SafeKeeperConf,
55 3727 : conn_id: ConnectionId,
56 3727 : allowed_auth_scope: Scope,
57 3727 : ) -> Result<(), QueryError> {
58 3727 : socket.set_nodelay(true)?;
59 3727 : let peer_addr = socket.peer_addr()?;
60 :
61 : // Set timeout on reading from the socket. It prevents hanged up connection
62 : // if client suddenly disappears. Note that TCP_KEEPALIVE is not enabled by
63 : // default, and tokio doesn't provide ability to set it out of the box.
64 3727 : let mut socket = TimeoutReader::new(socket);
65 3727 : let wal_service_timeout = Duration::from_secs(60 * 10);
66 3727 : socket.set_timeout(Some(wal_service_timeout));
67 3727 : // pin! is here because TimeoutReader (due to storing sleep future inside)
68 3727 : // is not Unpin, and all pgbackend/framed/tokio dependencies require stream
69 3727 : // to be Unpin. Which is reasonable, as indeed something like TimeoutReader
70 3727 : // shouldn't be moved.
71 3727 : tokio::pin!(socket);
72 3727 :
73 3727 : let traffic_metrics = TrafficMetrics::new();
74 3727 : if let Some(current_az) = conf.availability_zone.as_deref() {
75 3727 : traffic_metrics.set_sk_az(current_az);
76 3727 : }
77 :
78 3727 : let socket = MeasuredStream::new(
79 3727 : socket,
80 3228651 : |cnt| {
81 3228651 : traffic_metrics.observe_read(cnt);
82 3228651 : },
83 2967391 : |cnt| {
84 2967391 : traffic_metrics.observe_write(cnt);
85 2967391 : },
86 3727 : );
87 :
88 3727 : let auth_key = match allowed_auth_scope {
89 2943 : Scope::Tenant => conf.pg_tenant_only_auth.clone(),
90 784 : _ => conf.pg_auth.clone(),
91 : };
92 3727 : let auth_type = match auth_key {
93 3592 : None => AuthType::Trust,
94 135 : Some(_) => AuthType::NeonJWT,
95 : };
96 3727 : let auth_pair = auth_key.map(|key| (allowed_auth_scope, key));
97 3727 : let mut conn_handler =
98 3727 : SafekeeperPostgresHandler::new(conf, conn_id, Some(traffic_metrics.clone()), auth_pair);
99 3727 : let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
100 : // libpq protocol between safekeeper and walproposer / pageserver
101 : // We don't use shutdown.
102 3727 : pgbackend
103 3727 : .run(&mut conn_handler, future::pending::<()>)
104 7338733 : .await
105 3357 : }
106 :
/// Unique WAL service connection ids are logged in spans for observability.
pub type ConnectionId = u32;
/// Running counter from which connection ids are issued.
pub type ConnectionCount = u32;

/// Advance `count` by one, wrapping around on overflow, and return the new
/// value as the id for the next connection.
pub fn issue_connection_id(count: &mut ConnectionCount) -> ConnectionId {
    let next_id = count.wrapping_add(1);
    *count = next_id;
    next_id
}
|