Line data Source code
1 : //! Client for making requests to a running Postgres server's communicator control socket.
2 : //!
3 : //! The storage communicator process that runs inside Postgres exposes an HTTP endpoint on
4 : //! a Unix domain socket in the Postgres data directory. This module provides access to it.
5 :
6 : use std::path::Path;
7 :
8 : use anyhow::Context;
9 : use hyper::client::conn::http1::SendRequest;
10 : use hyper_util::rt::TokioIo;
11 :
12 : /// File name of the communicator control socket within the Postgres data directory.
13 : /// This must be kept in sync with the name used in `pgxn/neon/communicator/src/lib.rs`.
14 : const NEON_COMMUNICATOR_SOCKET_NAME: &str = "neon-communicator.socket";
15 :
16 : /// Open a connection to the communicator's control socket, prepare to send requests to it
17 : /// with hyper.
18 0 : pub async fn connect_communicator_socket<B>(pgdata: &Path) -> anyhow::Result<SendRequest<B>>
19 0 : where
20 0 : B: hyper::body::Body + 'static + Send,
21 0 : B::Data: Send,
22 0 : B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
23 0 : {
24 0 : let socket_path = pgdata.join(NEON_COMMUNICATOR_SOCKET_NAME);
25 0 : let socket_path_len = socket_path.display().to_string().len();
26 :
27 : // There is a limit of around 100 bytes (108 on Linux?) on the length of the path to a
28 : // Unix Domain socket. The limit is on the connect(2) function used to open the
29 : // socket, not on the absolute path itself. Postgres changes the current directory to
30 : // the data directory and uses a relative path to bind to the socket, and the relative
31 : // path "./neon-communicator.socket" is always short, but when compute_ctl needs to
32 : // open the socket, we need to use a full path, which can be arbitrarily long.
33 : //
34 : // There are a few ways we could work around this:
35 : //
36 : // 1. Change the current directory to the Postgres data directory and use a relative
37 : // path in the connect(2) call. That's problematic because the current directory
38 : // applies to the whole process. We could change the current directory early in
39 : // compute_ctl startup, and that might be a good idea anyway for other reasons too:
40 : // it would be more robust if the data directory is moved around or unlinked for
41 : // some reason, and you would be less likely to accidentally litter other parts of
42 : // the filesystem with e.g. temporary files. However, that's a pretty invasive
43 : // change.
44 : //
45 : // 2. On Linux, you could open() the data directory, and refer to the the socket
46 : // inside it as "/proc/self/fd/<fd>/neon-communicator.socket". But that's
47 : // Linux-only.
48 : //
49 : // 3. Create a symbolic link to the socket with a shorter path, and use that.
50 : //
51 : // We use the symbolic link approach here. Hopefully the paths we use in production
52 : // are shorter, so that we can open the socket directly, so that this hack is needed
53 : // only in development.
54 0 : let connect_result = if socket_path_len < 100 {
55 : // We can open the path directly with no hacks.
56 0 : tokio::net::UnixStream::connect(socket_path).await
57 : } else {
58 : // The path to the socket is too long. Create a symlink to it with a shorter path.
59 0 : let short_path = std::env::temp_dir().join(format!(
60 0 : "compute_ctl.short-socket.{}.{}",
61 0 : std::process::id(),
62 0 : tokio::task::id()
63 0 : ));
64 0 : std::os::unix::fs::symlink(&socket_path, &short_path)?;
65 :
66 : // Delete the symlink as soon as we have connected to it. There's a small chance
67 : // of leaking if the process dies before we remove it, so try to keep that window
68 : // as small as possible.
69 0 : scopeguard::defer! {
70 : if let Err(err) = std::fs::remove_file(&short_path) {
71 : tracing::warn!("could not remove symlink \"{}\" created for socket: {}",
72 : short_path.display(), err);
73 : }
74 : }
75 :
76 0 : tracing::info!(
77 0 : "created symlink \"{}\" for socket \"{}\", opening it now",
78 0 : short_path.display(),
79 0 : socket_path.display()
80 : );
81 :
82 0 : tokio::net::UnixStream::connect(&short_path).await
83 : };
84 :
85 0 : let stream = connect_result.context("connecting to communicator control socket")?;
86 :
87 0 : let io = TokioIo::new(stream);
88 0 : let (request_sender, connection) = hyper::client::conn::http1::handshake(io).await?;
89 :
90 : // spawn a task to poll the connection and drive the HTTP state
91 0 : tokio::spawn(async move {
92 0 : if let Err(err) = connection.await {
93 0 : eprintln!("Error in connection: {err}");
94 0 : }
95 0 : });
96 :
97 0 : Ok(request_sender)
98 0 : }
|