LCOV - differential code coverage report
Current view: top level - safekeeper/src - wal_service.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 97.1 % 69 67 2 67
Current Date: 2024-01-09 02:06:09 Functions: 90.9 % 11 10 1 10
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : //!
       2                 : //!   WAL service listens for client connections,
       3                 : //!   receives WAL from wal_proposer and sends it to WAL receivers
       4                 : //!
       5                 : use anyhow::{Context, Result};
       6                 : use postgres_backend::QueryError;
       7                 : use std::{future, time::Duration};
       8                 : use tokio::net::TcpStream;
       9                 : use tokio_io_timeout::TimeoutReader;
      10                 : use tracing::*;
      11                 : use utils::{auth::Scope, measured_stream::MeasuredStream};
      12                 : 
      13                 : use crate::handler::SafekeeperPostgresHandler;
      14                 : use crate::metrics::TrafficMetrics;
      15                 : use crate::SafeKeeperConf;
      16                 : use postgres_backend::{AuthType, PostgresBackend};
      17                 : 
      18                 : /// Accept incoming TCP connections and spawn them into a background thread.
      19                 : /// allowed_auth_scope is either SafekeeperData (wide JWT tokens giving access
      20                 : /// to any tenant are allowed) or Tenant (only tokens giving access to specific
      21                 : /// tenant are allowed). Doesn't matter if auth is disabled in conf.
      22 CBC         970 : pub async fn task_main(
      23             970 :     conf: SafeKeeperConf,
      24             970 :     pg_listener: std::net::TcpListener,
      25             970 :     allowed_auth_scope: Scope,
      26             970 : ) -> anyhow::Result<()> {
      27             970 :     // Tokio's from_std won't do this for us, per its comment.
      28             970 :     pg_listener.set_nonblocking(true)?;
      29                 : 
      30             970 :     let listener = tokio::net::TcpListener::from_std(pg_listener)?;
      31             970 :     let mut connection_count: ConnectionCount = 0;
      32                 : 
      33                 :     loop {
      34            4093 :         let (socket, peer_addr) = listener.accept().await.context("accept")?;
      35 UBC           0 :         debug!("accepted connection from {}", peer_addr);
      36 CBC        3123 :         let conf = conf.clone();
      37            3123 :         let conn_id = issue_connection_id(&mut connection_count);
      38                 : 
      39            3123 :         tokio::spawn(
      40            3123 :             async move {
      41         4841986 :                 if let Err(err) = handle_socket(socket, conf, conn_id, allowed_auth_scope).await {
      42              50 :                     error!("connection handler exited: {}", err);
      43            2682 :                 }
      44            2732 :             }
      45            3123 :             .instrument(info_span!("", cid = %conn_id, ttid = field::Empty)),
      46                 :         );
      47                 :     }
      48 UBC           0 : }
      49                 : 
      50                 : /// This is run by `task_main` above, inside a background thread.
      51                 : ///
      52 CBC        3123 : async fn handle_socket(
      53            3123 :     socket: TcpStream,
      54            3123 :     conf: SafeKeeperConf,
      55            3123 :     conn_id: ConnectionId,
      56            3123 :     allowed_auth_scope: Scope,
      57            3123 : ) -> Result<(), QueryError> {
      58            3123 :     socket.set_nodelay(true)?;
      59            3123 :     let peer_addr = socket.peer_addr()?;
      60                 : 
      61                 :     // Set timeout on reading from the socket. It prevents hanged up connection
      62                 :     // if client suddenly disappears. Note that TCP_KEEPALIVE is not enabled by
      63                 :     // default, and tokio doesn't provide ability to set it out of the box.
      64            3123 :     let mut socket = TimeoutReader::new(socket);
      65            3123 :     let wal_service_timeout = Duration::from_secs(60 * 10);
      66            3123 :     socket.set_timeout(Some(wal_service_timeout));
      67            3123 :     // pin! is here because TimeoutReader (due to storing sleep future inside)
      68            3123 :     // is not Unpin, and all pgbackend/framed/tokio dependencies require stream
      69            3123 :     // to be Unpin. Which is reasonable, as indeed something like TimeoutReader
      70            3123 :     // shouldn't be moved.
      71            3123 :     tokio::pin!(socket);
      72            3123 : 
      73            3123 :     let traffic_metrics = TrafficMetrics::new();
      74            3123 :     if let Some(current_az) = conf.availability_zone.as_deref() {
      75            3123 :         traffic_metrics.set_sk_az(current_az);
      76            3123 :     }
      77                 : 
      78            3123 :     let socket = MeasuredStream::new(
      79            3123 :         socket,
      80         2177218 :         |cnt| {
      81         2177218 :             traffic_metrics.observe_read(cnt);
      82         2177218 :         },
      83         1883400 :         |cnt| {
      84         1883400 :             traffic_metrics.observe_write(cnt);
      85         1883400 :         },
      86            3123 :     );
      87                 : 
      88            3123 :     let auth_key = match allowed_auth_scope {
      89            2389 :         Scope::Tenant => conf.pg_tenant_only_auth.clone(),
      90             734 :         _ => conf.pg_auth.clone(),
      91                 :     };
      92            3123 :     let auth_type = match auth_key {
      93            2991 :         None => AuthType::Trust,
      94             132 :         Some(_) => AuthType::NeonJWT,
      95                 :     };
      96            3123 :     let auth_pair = auth_key.map(|key| (allowed_auth_scope, key));
      97            3123 :     let mut conn_handler =
      98            3123 :         SafekeeperPostgresHandler::new(conf, conn_id, Some(traffic_metrics.clone()), auth_pair);
      99            3123 :     let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
     100                 :     // libpq protocol between safekeeper and walproposer / pageserver
     101                 :     // We don't use shutdown.
     102            3123 :     pgbackend
     103            3123 :         .run(&mut conn_handler, future::pending::<()>)
     104         4841986 :         .await
     105            2732 : }
     106                 : 
     107                 : /// Unique WAL service connection ids are logged in spans for observability.
     108                 : pub type ConnectionId = u32;
     109                 : pub type ConnectionCount = u32;
     110                 : 
     111            3123 : pub fn issue_connection_id(count: &mut ConnectionCount) -> ConnectionId {
     112            3123 :     *count = count.wrapping_add(1);
     113            3123 :     *count
     114            3123 : }
        

Generated by: LCOV version 2.1-beta