Line data Source code
1 : //! This module is responsible for locating and loading paths in a local setup.
2 : //!
//! It also provides an `init` method, which acts as a stub for a proper installation
//! script that will use local paths.
5 :
6 : use anyhow::{bail, ensure, Context};
7 :
8 : use postgres_backend::AuthType;
9 : use reqwest::Url;
10 : use serde::{Deserialize, Serialize};
11 : use serde_with::{serde_as, DisplayFromStr};
12 : use std::collections::HashMap;
13 : use std::env;
14 : use std::fs;
15 : use std::net::IpAddr;
16 : use std::net::Ipv4Addr;
17 : use std::net::SocketAddr;
18 : use std::path::{Path, PathBuf};
19 : use std::process::{Command, Stdio};
20 : use utils::{
21 : auth::{encode_from_key_file, Claims},
22 : id::{NodeId, TenantId, TenantTimelineId, TimelineId},
23 : };
24 :
25 : use crate::safekeeper::SafekeeperNode;
26 :
/// Default postgres major version (presumably used when the user does not pick
/// one explicitly — confirm at use sites). Should stay one of the versions that
/// `LocalEnv::pg_distrib_dir` accepts.
pub const DEFAULT_PG_VERSION: u32 = 15;
28 :
//
// This data structure represents the neon_local CLI config.
//
// It is deserialized from the .neon/config file, or from the config file passed
// to the 'neon_local init --config=<path>' option. See control_plane/simple.conf for
// an example.
//
#[serde_as]
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
pub struct LocalEnv {
    // Base directory for all the nodes (the pageserver, safekeepers and
    // compute endpoints).
    //
    // This is not stored in the config file. Rather, this is the path where the
    // config file itself is. It is read from the NEON_REPO_DIR env variable or
    // '.neon' if not given.
    #[serde(skip)]
    pub base_data_dir: PathBuf,

    // Root of the postgres distributions. Each supported major version lives in
    // a `v<N>` subdirectory, which in turn is expected to contain "bin", "lib",
    // etc. of that postgres build (see `pg_distrib_dir()` / `pg_bin_dir()`).
    // When left empty, `parse_config` fills it from POSTGRES_DISTRIB_DIR or
    // `<cwd>/pg_install`.
    #[serde(default)]
    pub pg_distrib_dir: PathBuf,

    // Directory containing the neon binaries (pageserver, safekeeper,
    // storage_broker). When left empty, `parse_config` defaults it to the
    // directory of the running executable.
    #[serde(default)]
    pub neon_distrib_dir: PathBuf,

    // Default tenant ID to use with the 'neon_local' command line utility, when
    // --tenant_id is not explicitly specified.
    #[serde(default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub default_tenant_id: Option<TenantId>,

    // Private key used to issue tokens, e.g. during pg start. A relative path is
    // resolved against `base_data_dir`.
    #[serde(default)]
    pub private_key_path: PathBuf,

    // Storage broker settings.
    pub broker: NeonBroker,

    // Pageserver settings.
    pub pageserver: PageServerConf,

    // Safekeeper nodes; empty when the config has none.
    #[serde(default)]
    pub safekeepers: Vec<SafekeeperConf>,

    /// Keep human-readable aliases in memory (and persist them to config), to hide ZId hex strings from the user.
    #[serde(default)]
    // A `HashMap<String, HashMap<TenantId, TimelineId>>` would be more appropriate here,
    // but deserialization into a generic toml object as `toml::Value::try_from` fails with an error.
    // https://toml.io/en/v1.0.0 does not contain a concept of "a table inside another table".
    #[serde_as(as = "HashMap<_, Vec<(DisplayFromStr, DisplayFromStr)>>")]
    branch_name_mappings: HashMap<String, Vec<(TenantId, TimelineId)>>,
}
84 :
/// Broker config for cluster internal communication.
///
/// Thanks to the container-level `#[serde(default)]`, a missing field falls
/// back to the value from this type's `Default` impl.
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
#[serde(default)]
pub struct NeonBroker {
    /// Broker listen address for storage nodes coordination, e.g. '127.0.0.1:50051'.
    pub listen_addr: SocketAddr,
}
92 :
93 : // Dummy Default impl to satisfy Deserialize derive.
94 : impl Default for NeonBroker {
95 5410 : fn default() -> Self {
96 5410 : NeonBroker {
97 5410 : listen_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0),
98 5410 : }
99 5410 : }
100 : }
101 :
102 : impl NeonBroker {
103 2043 : pub fn client_url(&self) -> Url {
104 2043 : Url::parse(&format!("http://{}", self.listen_addr)).expect("failed to construct url")
105 2043 : }
106 : }
107 :
/// Pageserver section of the neon_local config.
///
/// Thanks to the container-level `#[serde(default)]`, any field missing from
/// the config file takes its value from this type's `Default` impl.
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
#[serde(default)]
pub struct PageServerConf {
    // node id
    pub id: NodeId,

    // Pageserver connection settings (address strings for the PG and HTTP listeners)
    pub listen_pg_addr: String,
    pub listen_http_addr: String,

    // auth type used for the PG and HTTP ports
    pub pg_auth_type: AuthType,
    pub http_auth_type: AuthType,
}
122 :
123 : impl Default for PageServerConf {
124 5411 : fn default() -> Self {
125 5411 : Self {
126 5411 : id: NodeId(0),
127 5411 : listen_pg_addr: String::new(),
128 5411 : listen_http_addr: String::new(),
129 5411 : pg_auth_type: AuthType::Trust,
130 5411 : http_auth_type: AuthType::Trust,
131 5411 : }
132 5411 : }
133 : }
134 :
/// Configuration of a single safekeeper node.
///
/// Thanks to the container-level `#[serde(default)]`, any field missing from
/// the config file takes its value from this type's `Default` impl.
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
#[serde(default)]
pub struct SafekeeperConf {
    pub id: NodeId,
    pub pg_port: u16,
    // Optional port on which only tenant-scoped tokens are allowed; when set,
    // compute connects here instead of `pg_port` (see `get_compute_port`).
    pub pg_tenant_only_port: Option<u16>,
    pub http_port: u16,
    // NOTE(review): semantics of `sync`, `remote_storage` and `backup_threads`
    // are defined by the safekeeper launcher, not visible here.
    pub sync: bool,
    pub remote_storage: Option<String>,
    pub backup_threads: Option<u32>,
    // Whether this safekeeper requires JWT auth; taken into account when
    // deciding whether auth key generation failures are fatal during init.
    pub auth_enabled: bool,
}
147 :
148 : impl Default for SafekeeperConf {
149 7019 : fn default() -> Self {
150 7019 : Self {
151 7019 : id: NodeId(0),
152 7019 : pg_port: 0,
153 7019 : pg_tenant_only_port: None,
154 7019 : http_port: 0,
155 7019 : sync: true,
156 7019 : remote_storage: None,
157 7019 : backup_threads: None,
158 7019 : auth_enabled: false,
159 7019 : }
160 7019 : }
161 : }
162 :
163 : impl SafekeeperConf {
164 : /// Compute is served by port on which only tenant scoped tokens allowed, if
165 : /// it is configured.
166 1473 : pub fn get_compute_port(&self) -> u16 {
167 1473 : self.pg_tenant_only_port.unwrap_or(self.pg_port)
168 1473 : }
169 : }
170 :
171 : impl LocalEnv {
172 1519 : pub fn pg_distrib_dir_raw(&self) -> PathBuf {
173 1519 : self.pg_distrib_dir.clone()
174 1519 : }
175 :
176 3000 : pub fn pg_distrib_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
177 3000 : let path = self.pg_distrib_dir.clone();
178 3000 :
179 3000 : match pg_version {
180 3000 : 14 => Ok(path.join(format!("v{pg_version}"))),
181 0 : 15 => Ok(path.join(format!("v{pg_version}"))),
182 0 : _ => bail!("Unsupported postgres version: {}", pg_version),
183 : }
184 3000 : }
185 :
186 1688 : pub fn pg_bin_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
187 1688 : match pg_version {
188 1688 : 14 => Ok(self.pg_distrib_dir(pg_version)?.join("bin")),
189 0 : 15 => Ok(self.pg_distrib_dir(pg_version)?.join("bin")),
190 0 : _ => bail!("Unsupported postgres version: {}", pg_version),
191 : }
192 1688 : }
193 1312 : pub fn pg_lib_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
194 1312 : match pg_version {
195 1312 : 14 => Ok(self.pg_distrib_dir(pg_version)?.join("lib")),
196 0 : 15 => Ok(self.pg_distrib_dir(pg_version)?.join("lib")),
197 0 : _ => bail!("Unsupported postgres version: {}", pg_version),
198 : }
199 1312 : }
200 :
201 944 : pub fn pageserver_bin(&self) -> PathBuf {
202 944 : self.neon_distrib_dir.join("pageserver")
203 944 : }
204 :
205 517 : pub fn safekeeper_bin(&self) -> PathBuf {
206 517 : self.neon_distrib_dir.join("safekeeper")
207 517 : }
208 :
209 4 : pub fn storage_broker_bin(&self) -> PathBuf {
210 4 : self.neon_distrib_dir.join("storage_broker")
211 4 : }
212 :
213 10801 : pub fn endpoints_path(&self) -> PathBuf {
214 10801 : self.base_data_dir.join("endpoints")
215 10801 : }
216 :
217 : // TODO: move pageserver files into ./pageserver
218 2096 : pub fn pageserver_data_dir(&self) -> PathBuf {
219 2096 : self.base_data_dir.clone()
220 2096 : }
221 :
222 2528 : pub fn safekeeper_data_dir(&self, data_dir_name: &str) -> PathBuf {
223 2528 : self.base_data_dir.join("safekeepers").join(data_dir_name)
224 2528 : }
225 :
226 888 : pub fn register_branch_mapping(
227 888 : &mut self,
228 888 : branch_name: String,
229 888 : tenant_id: TenantId,
230 888 : timeline_id: TimelineId,
231 888 : ) -> anyhow::Result<()> {
232 888 : let existing_values = self
233 888 : .branch_name_mappings
234 888 : .entry(branch_name.clone())
235 888 : .or_default();
236 888 :
237 888 : let existing_ids = existing_values
238 888 : .iter()
239 888 : .find(|(existing_tenant_id, _)| existing_tenant_id == &tenant_id);
240 :
241 888 : if let Some((_, old_timeline_id)) = existing_ids {
242 1 : if old_timeline_id == &timeline_id {
243 1 : Ok(())
244 : } else {
245 0 : bail!("branch '{branch_name}' is already mapped to timeline {old_timeline_id}, cannot map to another timeline {timeline_id}");
246 : }
247 : } else {
248 887 : existing_values.push((tenant_id, timeline_id));
249 887 : Ok(())
250 : }
251 888 : }
252 :
253 850 : pub fn get_branch_timeline_id(
254 850 : &self,
255 850 : branch_name: &str,
256 850 : tenant_id: TenantId,
257 850 : ) -> Option<TimelineId> {
258 850 : self.branch_name_mappings
259 850 : .get(branch_name)?
260 850 : .iter()
261 971 : .find(|(mapped_tenant_id, _)| mapped_tenant_id == &tenant_id)
262 850 : .map(|&(_, timeline_id)| timeline_id)
263 850 : .map(TimelineId::from)
264 850 : }
265 :
266 15 : pub fn timeline_name_mappings(&self) -> HashMap<TenantTimelineId, String> {
267 15 : self.branch_name_mappings
268 15 : .iter()
269 36 : .flat_map(|(name, tenant_timelines)| {
270 42 : tenant_timelines.iter().map(|&(tenant_id, timeline_id)| {
271 42 : (TenantTimelineId::new(tenant_id, timeline_id), name.clone())
272 42 : })
273 36 : })
274 15 : .collect()
275 15 : }
276 :
277 : /// Create a LocalEnv from a config file.
278 : ///
279 : /// Unlike 'load_config', this function fills in any defaults that are missing
280 : /// from the config file.
281 371 : pub fn parse_config(toml: &str) -> anyhow::Result<Self> {
282 371 : let mut env: LocalEnv = toml::from_str(toml)?;
283 :
284 : // Find postgres binaries.
285 : // Follow POSTGRES_DISTRIB_DIR if set, otherwise look in "pg_install".
286 : // Note that later in the code we assume, that distrib dirs follow the same pattern
287 : // for all postgres versions.
288 370 : if env.pg_distrib_dir == Path::new("") {
289 370 : if let Some(postgres_bin) = env::var_os("POSTGRES_DISTRIB_DIR") {
290 369 : env.pg_distrib_dir = postgres_bin.into();
291 369 : } else {
292 1 : let cwd = env::current_dir()?;
293 1 : env.pg_distrib_dir = cwd.join("pg_install")
294 : }
295 0 : }
296 :
297 : // Find neon binaries.
298 370 : if env.neon_distrib_dir == Path::new("") {
299 370 : env.neon_distrib_dir = env::current_exe()?.parent().unwrap().to_owned();
300 0 : }
301 :
302 370 : env.base_data_dir = base_path();
303 370 :
304 370 : Ok(env)
305 371 : }
306 :
307 : /// Locate and load config
308 5040 : pub fn load_config() -> anyhow::Result<Self> {
309 5040 : let repopath = base_path();
310 5040 :
311 5040 : if !repopath.exists() {
312 0 : bail!(
313 0 : "Neon config is not found in {}. You need to run 'neon_local init' first",
314 0 : repopath.to_str().unwrap()
315 0 : );
316 5040 : }
317 :
318 : // TODO: check that it looks like a neon repository
319 :
320 : // load and parse file
321 5040 : let config = fs::read_to_string(repopath.join("config"))?;
322 5040 : let mut env: LocalEnv = toml::from_str(config.as_str())?;
323 :
324 5040 : env.base_data_dir = repopath;
325 5040 :
326 5040 : Ok(env)
327 5040 : }
328 :
329 1625 : pub fn persist_config(&self, base_path: &Path) -> anyhow::Result<()> {
330 1625 : // Currently, the user first passes a config file with 'neon_local init --config=<path>'
331 1625 : // We read that in, in `create_config`, and fill any missing defaults. Then it's saved
332 1625 : // to .neon/config. TODO: We lose any formatting and comments along the way, which is
333 1625 : // a bit sad.
334 1625 : let mut conf_content = r#"# This file describes a locale deployment of the page server
335 1625 : # and safekeeeper node. It is read by the 'neon_local' command-line
336 1625 : # utility.
337 1625 : "#
338 1625 : .to_string();
339 1625 :
340 1625 : // Convert the LocalEnv to a toml file.
341 1625 : //
342 1625 : // This could be as simple as this:
343 1625 : //
344 1625 : // conf_content += &toml::to_string_pretty(env)?;
345 1625 : //
346 1625 : // But it results in a "values must be emitted before tables". I'm not sure
347 1625 : // why, AFAICS the table, i.e. 'safekeepers: Vec<SafekeeperConf>' is last.
348 1625 : // Maybe rust reorders the fields to squeeze avoid padding or something?
349 1625 : // In any case, converting to toml::Value first, and serializing that, works.
350 1625 : // See https://github.com/alexcrichton/toml-rs/issues/142
351 1625 : conf_content += &toml::to_string_pretty(&toml::Value::try_from(self)?)?;
352 :
353 1625 : let target_config_path = base_path.join("config");
354 1625 : fs::write(&target_config_path, conf_content).with_context(|| {
355 0 : format!(
356 0 : "Failed to write config file into path '{}'",
357 0 : target_config_path.display()
358 0 : )
359 1625 : })
360 1625 : }
361 :
362 : // this function is used only for testing purposes in CLI e g generate tokens during init
363 81 : pub fn generate_auth_token(&self, claims: &Claims) -> anyhow::Result<String> {
364 81 : let private_key_path = if self.private_key_path.is_absolute() {
365 0 : self.private_key_path.to_path_buf()
366 : } else {
367 81 : self.base_data_dir.join(&self.private_key_path)
368 : };
369 :
370 81 : let key_data = fs::read(private_key_path)?;
371 81 : encode_from_key_file(claims, &key_data)
372 81 : }
373 :
374 : //
375 : // Initialize a new Neon repository
376 : //
377 369 : pub fn init(&mut self, pg_version: u32, force: bool) -> anyhow::Result<()> {
378 369 : // check if config already exists
379 369 : let base_path = &self.base_data_dir;
380 369 : ensure!(
381 369 : base_path != Path::new(""),
382 0 : "repository base path is missing"
383 : );
384 :
385 369 : if base_path.exists() {
386 0 : if force {
387 0 : println!("removing all contents of '{}'", base_path.display());
388 : // instead of directly calling `remove_dir_all`, we keep the original dir but removing
389 : // all contents inside. This helps if the developer symbol links another directory (i.e.,
390 : // S3 local SSD) to the `.neon` base directory.
391 0 : for entry in std::fs::read_dir(base_path)? {
392 0 : let entry = entry?;
393 0 : let path = entry.path();
394 0 : if path.is_dir() {
395 0 : fs::remove_dir_all(&path)?;
396 : } else {
397 0 : fs::remove_file(&path)?;
398 : }
399 : }
400 : } else {
401 0 : bail!(
402 0 : "directory '{}' already exists. Perhaps already initialized? (Hint: use --force to remove all contents)",
403 0 : base_path.display()
404 0 : );
405 : }
406 369 : }
407 :
408 369 : if !self.pg_bin_dir(pg_version)?.join("postgres").exists() {
409 0 : bail!(
410 0 : "Can't find postgres binary at {}",
411 0 : self.pg_bin_dir(pg_version)?.display()
412 : );
413 369 : }
414 738 : for binary in ["pageserver", "safekeeper"] {
415 738 : if !self.neon_distrib_dir.join(binary).exists() {
416 0 : bail!(
417 0 : "Can't find binary '{binary}' in neon distrib dir '{}'",
418 0 : self.neon_distrib_dir.display()
419 0 : );
420 738 : }
421 : }
422 :
423 369 : if !base_path.exists() {
424 369 : fs::create_dir(base_path)?;
425 0 : }
426 :
427 : // Generate keypair for JWT.
428 : //
429 : // The keypair is only needed if authentication is enabled in any of the
430 : // components. For convenience, we generate the keypair even if authentication
431 : // is not enabled, so that you can easily enable it after the initialization
432 : // step. However, if the key generation fails, we treat it as non-fatal if
433 : // authentication was not enabled.
434 369 : if self.private_key_path == PathBuf::new() {
435 369 : match generate_auth_keys(
436 369 : base_path.join("auth_private_key.pem").as_path(),
437 369 : base_path.join("auth_public_key.pem").as_path(),
438 369 : ) {
439 369 : Ok(()) => {
440 369 : self.private_key_path = PathBuf::from("auth_private_key.pem");
441 369 : }
442 0 : Err(e) => {
443 0 : if !self.auth_keys_needed() {
444 0 : eprintln!("Could not generate keypair for JWT authentication: {e}");
445 0 : eprintln!("Continuing anyway because authentication was not enabled");
446 0 : self.private_key_path = PathBuf::from("auth_private_key.pem");
447 0 : } else {
448 0 : return Err(e);
449 : }
450 : }
451 : }
452 0 : }
453 :
454 369 : fs::create_dir_all(self.endpoints_path())?;
455 :
456 799 : for safekeeper in &self.safekeepers {
457 430 : fs::create_dir_all(SafekeeperNode::datadir_path_by_id(self, safekeeper.id))?;
458 : }
459 :
460 369 : self.persist_config(base_path)
461 369 : }
462 :
463 0 : fn auth_keys_needed(&self) -> bool {
464 0 : self.pageserver.pg_auth_type == AuthType::NeonJWT
465 0 : || self.pageserver.http_auth_type == AuthType::NeonJWT
466 0 : || self.safekeepers.iter().any(|sk| sk.auth_enabled)
467 0 : }
468 : }
469 :
/// Repository base directory: the value of NEON_REPO_DIR when set, `.neon`
/// (relative to the current directory) otherwise.
fn base_path() -> PathBuf {
    std::env::var_os("NEON_REPO_DIR")
        .map(PathBuf::from)
        .unwrap_or_else(|| PathBuf::from(".neon"))
}
476 :
477 : /// Generate a public/private key pair for JWT authentication
478 369 : fn generate_auth_keys(private_key_path: &Path, public_key_path: &Path) -> anyhow::Result<()> {
479 : // Generate the key pair
480 : //
481 : // openssl genpkey -algorithm ed25519 -out auth_private_key.pem
482 369 : let keygen_output = Command::new("openssl")
483 369 : .arg("genpkey")
484 369 : .args(["-algorithm", "ed25519"])
485 369 : .args(["-out", private_key_path.to_str().unwrap()])
486 369 : .stdout(Stdio::null())
487 369 : .output()
488 369 : .context("failed to generate auth private key")?;
489 369 : if !keygen_output.status.success() {
490 0 : bail!(
491 0 : "openssl failed: '{}'",
492 0 : String::from_utf8_lossy(&keygen_output.stderr)
493 0 : );
494 369 : }
495 : // Extract the public key from the private key file
496 : //
497 : // openssl pkey -in auth_private_key.pem -pubout -out auth_public_key.pem
498 369 : let keygen_output = Command::new("openssl")
499 369 : .arg("pkey")
500 369 : .args(["-in", private_key_path.to_str().unwrap()])
501 369 : .arg("-pubout")
502 369 : .args(["-out", public_key_path.to_str().unwrap()])
503 369 : .output()
504 369 : .context("failed to extract public key from private key")?;
505 369 : if !keygen_output.status.success() {
506 0 : bail!(
507 0 : "openssl failed: '{}'",
508 0 : String::from_utf8_lossy(&keygen_output.stderr)
509 0 : );
510 369 : }
511 369 : Ok(())
512 369 : }
513 :
#[cfg(test)]
mod tests {
    use super::*;

    /// The shipped `simple.conf` must parse, and the same config with a garbage
    /// broker `listen_addr` must be rejected.
    #[test]
    fn simple_conf_parsing() {
        let simple_conf_toml = include_str!("../simple.conf");

        // The example config parses cleanly.
        let simple_conf_parse_result = LocalEnv::parse_config(simple_conf_toml);
        assert!(
            simple_conf_parse_result.is_ok(),
            "failed to parse simple config {simple_conf_toml}, reason: {simple_conf_parse_result:?}"
        );

        // Corrupt the broker address; first make sure the substitution happened.
        let (string_to_replace, spoiled_url_str) = (
            "listen_addr = '127.0.0.1:50051'",
            "listen_addr = '!@$XOXO%^&'",
        );
        let spoiled_url_toml = simple_conf_toml.replace(string_to_replace, spoiled_url_str);
        assert!(
            spoiled_url_toml.contains(spoiled_url_str),
            "Failed to replace string {string_to_replace} in the toml file {simple_conf_toml}"
        );

        // The corrupted config must be rejected.
        let spoiled_url_parse_result = LocalEnv::parse_config(&spoiled_url_toml);
        assert!(
            spoiled_url_parse_result.is_err(),
            "expected toml with invalid Url {spoiled_url_toml} to fail the parsing, but got {spoiled_url_parse_result:?}"
        );
    }
}
|