LCOV - code coverage report
Current view: top level - compute_tools/src - communicator_socket_client.rs (source / functions) Coverage Total Hit
Test: 4be46b1c0003aa3bbac9ade362c676b419df4c20.info Lines: 0.0 % 32 0
Test Date: 2025-07-22 17:50:06 Functions: 0.0 % 3 0

            Line data    Source code
       1              : //! Client for making requests to a running Postgres server's communicator control socket.
       2              : //!
       3              : //! The storage communicator process that runs inside Postgres exposes an HTTP endpoint on
       4              : //! a Unix domain socket in the Postgres data directory. This module provides access to it.
       5              : 
       6              : use std::path::Path;
       7              : 
       8              : use anyhow::Context;
       9              : use hyper::client::conn::http1::SendRequest;
      10              : use hyper_util::rt::TokioIo;
      11              : 
      12              : /// Name of the socket within the Postgres data directory. This must match the name used in
      13              : /// `pgxn/neon/communicator/src/lib.rs`.
      14              : const NEON_COMMUNICATOR_SOCKET_NAME: &str = "neon-communicator.socket";
      15              : 
      16              : /// Open a connection to the communicator's control socket, prepare to send requests to it
      17              : /// with hyper.
      18            0 : pub async fn connect_communicator_socket<B>(pgdata: &Path) -> anyhow::Result<SendRequest<B>>
      19            0 : where
      20            0 :     B: hyper::body::Body + 'static + Send,
      21            0 :     B::Data: Send,
      22            0 :     B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
      23            0 : {
      24            0 :     let socket_path = pgdata.join(NEON_COMMUNICATOR_SOCKET_NAME);
      25            0 :     let socket_path_len = socket_path.display().to_string().len();
      26              : 
      27              :     // There is a limit of around 100 bytes (108 on Linux?) on the length of the path to a
      28              :     // Unix Domain socket. The limit is on the connect(2) function used to open the
      29              :     // socket, not on the absolute path itself. Postgres changes the current directory to
      30              :     // the data directory and uses a relative path to bind to the socket, and the relative
      31              :     // path "./neon-communicator.socket" is always short, but when compute_ctl needs to
      32              :     // open the socket, we need to use a full path, which can be arbitrarily long.
      33              :     //
      34              :     // There are a few ways we could work around this:
      35              :     //
      36              :     // 1. Change the current directory to the Postgres data directory and use a relative
      37              :     //    path in the connect(2) call. That's problematic because the current directory
      38              :     //    applies to the whole process. We could change the current directory early in
      39              :     //    compute_ctl startup, and that might be a good idea anyway for other reasons too:
      40              :     //    it would be more robust if the data directory is moved around or unlinked for
      41              :     //    some reason, and you would be less likely to accidentally litter other parts of
      42              :     //    the filesystem with e.g. temporary files. However, that's a pretty invasive
      43              :     //    change.
      44              :     //
      45              :     // 2. On Linux, you could open() the data directory, and refer to the the socket
      46              :     //    inside it as "/proc/self/fd/<fd>/neon-communicator.socket". But that's
      47              :     //    Linux-only.
      48              :     //
      49              :     // 3. Create a symbolic link to the socket with a shorter path, and use that.
      50              :     //
      51              :     // We use the symbolic link approach here. Hopefully the paths we use in production
      52              :     // are shorter, so that we can open the socket directly, so that this hack is needed
      53              :     // only in development.
      54            0 :     let connect_result = if socket_path_len < 100 {
      55              :         // We can open the path directly with no hacks.
      56            0 :         tokio::net::UnixStream::connect(socket_path).await
      57              :     } else {
      58              :         // The path to the socket is too long. Create a symlink to it with a shorter path.
      59            0 :         let short_path = std::env::temp_dir().join(format!(
      60            0 :             "compute_ctl.short-socket.{}.{}",
      61            0 :             std::process::id(),
      62            0 :             tokio::task::id()
      63            0 :         ));
      64            0 :         std::os::unix::fs::symlink(&socket_path, &short_path)?;
      65              : 
      66              :         // Delete the symlink as soon as we have connected to it. There's a small chance
      67              :         // of leaking if the process dies before we remove it, so try to keep that window
      68              :         // as small as possible.
      69            0 :         scopeguard::defer! {
      70              :             if let Err(err) = std::fs::remove_file(&short_path) {
      71              :                 tracing::warn!("could not remove symlink \"{}\" created for socket: {}",
      72              :                                short_path.display(), err);
      73              :             }
      74              :         }
      75              : 
      76            0 :         tracing::info!(
      77            0 :             "created symlink \"{}\" for socket \"{}\", opening it now",
      78            0 :             short_path.display(),
      79            0 :             socket_path.display()
      80              :         );
      81              : 
      82            0 :         tokio::net::UnixStream::connect(&short_path).await
      83              :     };
      84              : 
      85            0 :     let stream = connect_result.context("connecting to communicator control socket")?;
      86              : 
      87            0 :     let io = TokioIo::new(stream);
      88            0 :     let (request_sender, connection) = hyper::client::conn::http1::handshake(io).await?;
      89              : 
      90              :     // spawn a task to poll the connection and drive the HTTP state
      91            0 :     tokio::spawn(async move {
      92            0 :         if let Err(err) = connection.await {
      93            0 :             eprintln!("Error in connection: {err}");
      94            0 :         }
      95            0 :     });
      96              : 
      97            0 :     Ok(request_sender)
      98            0 : }
        

Generated by: LCOV version 2.1-beta