TLA Line data Source code
//! This module is responsible for locating and loading paths in a local setup.
//!
//! It also provides an `init` method, which acts as a stub for a proper
//! installation script that would use local paths.
5 :
6 : use anyhow::{bail, ensure, Context};
7 :
8 : use postgres_backend::AuthType;
9 : use reqwest::Url;
10 : use serde::{Deserialize, Serialize};
11 : use std::collections::HashMap;
12 : use std::env;
13 : use std::fs;
14 : use std::net::IpAddr;
15 : use std::net::Ipv4Addr;
16 : use std::net::SocketAddr;
17 : use std::path::{Path, PathBuf};
18 : use std::process::{Command, Stdio};
19 : use utils::{
20 : auth::{encode_from_key_file, Claims},
21 : id::{NodeId, TenantId, TenantTimelineId, TimelineId},
22 : };
23 :
24 : use crate::safekeeper::SafekeeperNode;
25 :
26 : pub const DEFAULT_PG_VERSION: u32 = 15;
27 :
//
// This data structure represents the neon_local CLI config.
//
// It is deserialized from the .neon/config file, or from the config file passed
// via the 'neon_local init --config=<path>' option. See control_plane/simple.conf
// for an example.
//
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
pub struct LocalEnv {
    // Base directory for all the nodes (the pageserver, safekeepers and
    // compute endpoints).
    //
    // This is not stored in the config file. Rather, this is the path where the
    // config file itself is. It is read from the NEON_REPO_DIR env variable or
    // '.neon' if not given.
    #[serde(skip)]
    pub base_data_dir: PathBuf,

    // Root of the postgres distributions. Each supported major version lives in
    // a `v<version>` subdirectory (see `pg_distrib_dir()`), which is expected to
    // contain the "bin", "include", "lib" and "share" directories of that
    // postgres distribution. If at some point in time we will be able to run
    // against vanilla postgres we may split that to four separate paths and
    // match OS-specific installation layout.
    // When left empty in the config, `parse_config` fills it from the
    // POSTGRES_DISTRIB_DIR env variable, or `<cwd>/pg_install` otherwise.
    #[serde(default)]
    pub pg_distrib_dir: PathBuf,

    // Directory containing the neon binaries (pageserver, safekeeper,
    // storage_broker, attachment_service). When left empty in the config,
    // `parse_config` sets it to the directory of the running executable.
    #[serde(default)]
    pub neon_distrib_dir: PathBuf,

    // Default tenant ID to use with the 'neon_local' command line utility, when
    // --tenant_id is not explicitly specified.
    #[serde(default)]
    pub default_tenant_id: Option<TenantId>,

    // Private key used to issue tokens, e.g. during pg start. A relative path is
    // resolved against `base_data_dir`.
    #[serde(default)]
    pub private_key_path: PathBuf,

    // Storage broker settings.
    pub broker: NeonBroker,

    /// This Vec must always contain at least one pageserver
    /// (checked by `parse_config`).
    pub pageservers: Vec<PageServerConf>,

    // Safekeeper nodes; may be empty.
    #[serde(default)]
    pub safekeepers: Vec<SafekeeperConf>,

    // Control plane location: if None, we will not run attachment_service. If set, this will
    // be propagated into each pageserver's configuration.
    #[serde(default)]
    pub control_plane_api: Option<Url>,

    /// Keep human-readable aliases in memory (and persist them to config), to hide ZId hex strings from the user.
    #[serde(default)]
    // Maps a branch name to the (tenant, timeline) pairs it names; a tenant
    // appears at most once per branch name (enforced by `register_branch_mapping`).
    //
    // A `HashMap<String, HashMap<TenantId, TimelineId>>` would be more appropriate here,
    // but deserialization into a generic toml object as `toml::Value::try_from` fails with an error.
    // https://toml.io/en/v1.0.0 does not contain a concept of "a table inside another table".
    branch_name_mappings: HashMap<String, Vec<(TenantId, TimelineId)>>,
}
86 :
/// Broker config for cluster internal communication.
///
/// Because of the container-level `#[serde(default)]`, any field missing from
/// the config is taken from `NeonBroker::default()`.
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
#[serde(default)]
pub struct NeonBroker {
    /// Broker listen address for storage nodes coordination, e.g. '127.0.0.1:50051'.
    pub listen_addr: SocketAddr,
}
94 :
95 : // Dummy Default impl to satisfy Deserialize derive.
96 : impl Default for NeonBroker {
97 5709 : fn default() -> Self {
98 5709 : NeonBroker {
99 5709 : listen_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0),
100 5709 : }
101 5709 : }
102 : }
103 :
104 : impl NeonBroker {
105 1395 : pub fn client_url(&self) -> Url {
106 1395 : Url::parse(&format!("http://{}", self.listen_addr)).expect("failed to construct url")
107 1395 : }
108 : }
109 :
/// Configuration of a single pageserver node.
///
/// Fields missing from the config are taken from `PageServerConf::default()`
/// (container-level `#[serde(default)]`).
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
#[serde(default)]
pub struct PageServerConf {
    // node id
    pub id: NodeId,

    // Pageserver connection settings
    pub listen_pg_addr: String,
    pub listen_http_addr: String,

    // auth type used for the PG and HTTP ports; if either is `NeonJWT`,
    // `LocalEnv::init` treats JWT key generation failure as fatal.
    pub pg_auth_type: AuthType,
    pub http_auth_type: AuthType,
}
124 :
125 : impl Default for PageServerConf {
126 6538 : fn default() -> Self {
127 6538 : Self {
128 6538 : id: NodeId(0),
129 6538 : listen_pg_addr: String::new(),
130 6538 : listen_http_addr: String::new(),
131 6538 : pg_auth_type: AuthType::Trust,
132 6538 : http_auth_type: AuthType::Trust,
133 6538 : }
134 6538 : }
135 : }
136 :
/// Configuration of a single safekeeper node.
///
/// Fields missing from the config are taken from `SafekeeperConf::default()`
/// (container-level `#[serde(default)]`).
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
#[serde(default)]
pub struct SafekeeperConf {
    // node id
    pub id: NodeId,
    // Postgres-protocol port.
    pub pg_port: u16,
    // Optional port that only accepts tenant-scoped tokens; when set, computes
    // connect here instead of `pg_port` (see `get_compute_port`).
    pub pg_tenant_only_port: Option<u16>,
    // HTTP API port.
    pub http_port: u16,
    // NOTE(review): semantics of `sync`, `remote_storage` and `backup_threads`
    // are defined by the safekeeper itself, not visible here.
    pub sync: bool,
    pub remote_storage: Option<String>,
    pub backup_threads: Option<u32>,
    // If any safekeeper has auth enabled, `LocalEnv::init` treats JWT key
    // generation failure as fatal.
    pub auth_enabled: bool,
}
149 :
150 : impl Default for SafekeeperConf {
151 7380 : fn default() -> Self {
152 7380 : Self {
153 7380 : id: NodeId(0),
154 7380 : pg_port: 0,
155 7380 : pg_tenant_only_port: None,
156 7380 : http_port: 0,
157 7380 : sync: true,
158 7380 : remote_storage: None,
159 7380 : backup_threads: None,
160 7380 : auth_enabled: false,
161 7380 : }
162 7380 : }
163 : }
164 :
165 : impl SafekeeperConf {
166 : /// Compute is served by port on which only tenant scoped tokens allowed, if
167 : /// it is configured.
168 1209 : pub fn get_compute_port(&self) -> u16 {
169 1209 : self.pg_tenant_only_port.unwrap_or(self.pg_port)
170 1209 : }
171 : }
172 :
173 : impl LocalEnv {
174 906 : pub fn pg_distrib_dir_raw(&self) -> PathBuf {
175 906 : self.pg_distrib_dir.clone()
176 906 : }
177 :
178 2486 : pub fn pg_distrib_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
179 2486 : let path = self.pg_distrib_dir.clone();
180 2486 :
181 2486 : #[allow(clippy::manual_range_patterns)]
182 2486 : match pg_version {
183 2486 : 14 | 15 | 16 => Ok(path.join(format!("v{pg_version}"))),
184 UBC 0 : _ => bail!("Unsupported postgres version: {}", pg_version),
185 : }
186 CBC 2486 : }
187 :
188 1414 : pub fn pg_bin_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
189 1414 : Ok(self.pg_distrib_dir(pg_version)?.join("bin"))
190 1414 : }
191 1072 : pub fn pg_lib_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
192 1072 : Ok(self.pg_distrib_dir(pg_version)?.join("lib"))
193 1072 : }
194 :
195 906 : pub fn pageserver_bin(&self) -> PathBuf {
196 906 : self.neon_distrib_dir.join("pageserver")
197 906 : }
198 :
199 336 : pub fn attachment_service_bin(&self) -> PathBuf {
200 336 : self.neon_distrib_dir.join("attachment_service")
201 336 : }
202 :
203 485 : pub fn safekeeper_bin(&self) -> PathBuf {
204 485 : self.neon_distrib_dir.join("safekeeper")
205 485 : }
206 :
207 3 : pub fn storage_broker_bin(&self) -> PathBuf {
208 3 : self.neon_distrib_dir.join("storage_broker")
209 3 : }
210 :
211 11243 : pub fn endpoints_path(&self) -> PathBuf {
212 11243 : self.base_data_dir.join("endpoints")
213 11243 : }
214 :
215 2020 : pub fn pageserver_data_dir(&self, pageserver_id: NodeId) -> PathBuf {
216 2020 : self.base_data_dir
217 2020 : .join(format!("pageserver_{pageserver_id}"))
218 2020 : }
219 :
220 2374 : pub fn safekeeper_data_dir(&self, data_dir_name: &str) -> PathBuf {
221 2374 : self.base_data_dir.join("safekeepers").join(data_dir_name)
222 2374 : }
223 :
224 6007 : pub fn get_pageserver_conf(&self, id: NodeId) -> anyhow::Result<&PageServerConf> {
225 6514 : if let Some(conf) = self.pageservers.iter().find(|node| node.id == id) {
226 6006 : Ok(conf)
227 : } else {
228 1 : bail!("could not find pageserver {id}")
229 : }
230 6007 : }
231 :
232 746 : pub fn register_branch_mapping(
233 746 : &mut self,
234 746 : branch_name: String,
235 746 : tenant_id: TenantId,
236 746 : timeline_id: TimelineId,
237 746 : ) -> anyhow::Result<()> {
238 746 : let existing_values = self
239 746 : .branch_name_mappings
240 746 : .entry(branch_name.clone())
241 746 : .or_default();
242 746 :
243 746 : let existing_ids = existing_values
244 746 : .iter()
245 746 : .find(|(existing_tenant_id, _)| existing_tenant_id == &tenant_id);
246 :
247 746 : if let Some((_, old_timeline_id)) = existing_ids {
248 1 : if old_timeline_id == &timeline_id {
249 1 : Ok(())
250 : } else {
251 UBC 0 : bail!("branch '{branch_name}' is already mapped to timeline {old_timeline_id}, cannot map to another timeline {timeline_id}");
252 : }
253 : } else {
254 CBC 745 : existing_values.push((tenant_id, timeline_id));
255 745 : Ok(())
256 : }
257 746 : }
258 :
259 744 : pub fn get_branch_timeline_id(
260 744 : &self,
261 744 : branch_name: &str,
262 744 : tenant_id: TenantId,
263 744 : ) -> Option<TimelineId> {
264 744 : self.branch_name_mappings
265 744 : .get(branch_name)?
266 744 : .iter()
267 837 : .find(|(mapped_tenant_id, _)| mapped_tenant_id == &tenant_id)
268 744 : .map(|&(_, timeline_id)| timeline_id)
269 744 : .map(TimelineId::from)
270 744 : }
271 :
272 19 : pub fn timeline_name_mappings(&self) -> HashMap<TenantTimelineId, String> {
273 19 : self.branch_name_mappings
274 19 : .iter()
275 32 : .flat_map(|(name, tenant_timelines)| {
276 34 : tenant_timelines.iter().map(|&(tenant_id, timeline_id)| {
277 34 : (TenantTimelineId::new(tenant_id, timeline_id), name.clone())
278 34 : })
279 32 : })
280 19 : .collect()
281 19 : }
282 :
283 : /// Create a LocalEnv from a config file.
284 : ///
285 : /// Unlike 'load_config', this function fills in any defaults that are missing
286 : /// from the config file.
287 335 : pub fn parse_config(toml: &str) -> anyhow::Result<Self> {
288 335 : let mut env: LocalEnv = toml::from_str(toml)?;
289 :
290 : // Find postgres binaries.
291 : // Follow POSTGRES_DISTRIB_DIR if set, otherwise look in "pg_install".
292 : // Note that later in the code we assume, that distrib dirs follow the same pattern
293 : // for all postgres versions.
294 334 : if env.pg_distrib_dir == Path::new("") {
295 334 : if let Some(postgres_bin) = env::var_os("POSTGRES_DISTRIB_DIR") {
296 333 : env.pg_distrib_dir = postgres_bin.into();
297 333 : } else {
298 1 : let cwd = env::current_dir()?;
299 1 : env.pg_distrib_dir = cwd.join("pg_install")
300 : }
301 UBC 0 : }
302 :
303 : // Find neon binaries.
304 CBC 334 : if env.neon_distrib_dir == Path::new("") {
305 334 : env.neon_distrib_dir = env::current_exe()?.parent().unwrap().to_owned();
306 UBC 0 : }
307 :
308 CBC 334 : if env.pageservers.is_empty() {
309 UBC 0 : anyhow::bail!("Configuration must contain at least one pageserver");
310 CBC 334 : }
311 334 :
312 334 : env.base_data_dir = base_path();
313 334 :
314 334 : Ok(env)
315 335 : }
316 :
317 : /// Locate and load config
318 5375 : pub fn load_config() -> anyhow::Result<Self> {
319 5375 : let repopath = base_path();
320 5375 :
321 5375 : if !repopath.exists() {
322 UBC 0 : bail!(
323 0 : "Neon config is not found in {}. You need to run 'neon_local init' first",
324 0 : repopath.to_str().unwrap()
325 0 : );
326 CBC 5375 : }
327 :
328 : // TODO: check that it looks like a neon repository
329 :
330 : // load and parse file
331 5375 : let config = fs::read_to_string(repopath.join("config"))?;
332 5375 : let mut env: LocalEnv = toml::from_str(config.as_str())?;
333 :
334 5375 : env.base_data_dir = repopath;
335 5375 :
336 5375 : Ok(env)
337 5375 : }
338 :
339 1411 : pub fn persist_config(&self, base_path: &Path) -> anyhow::Result<()> {
340 1411 : // Currently, the user first passes a config file with 'neon_local init --config=<path>'
341 1411 : // We read that in, in `create_config`, and fill any missing defaults. Then it's saved
342 1411 : // to .neon/config. TODO: We lose any formatting and comments along the way, which is
343 1411 : // a bit sad.
344 1411 : let mut conf_content = r#"# This file describes a local deployment of the page server
345 1411 : # and safekeeeper node. It is read by the 'neon_local' command-line
346 1411 : # utility.
347 1411 : "#
348 1411 : .to_string();
349 1411 :
350 1411 : // Convert the LocalEnv to a toml file.
351 1411 : //
352 1411 : // This could be as simple as this:
353 1411 : //
354 1411 : // conf_content += &toml::to_string_pretty(env)?;
355 1411 : //
356 1411 : // But it results in a "values must be emitted before tables". I'm not sure
357 1411 : // why, AFAICS the table, i.e. 'safekeepers: Vec<SafekeeperConf>' is last.
358 1411 : // Maybe rust reorders the fields to squeeze avoid padding or something?
359 1411 : // In any case, converting to toml::Value first, and serializing that, works.
360 1411 : // See https://github.com/alexcrichton/toml-rs/issues/142
361 1411 : conf_content += &toml::to_string_pretty(&toml::Value::try_from(self)?)?;
362 :
363 1411 : let target_config_path = base_path.join("config");
364 1411 : fs::write(&target_config_path, conf_content).with_context(|| {
365 UBC 0 : format!(
366 0 : "Failed to write config file into path '{}'",
367 0 : target_config_path.display()
368 0 : )
369 CBC 1411 : })
370 1411 : }
371 :
372 : // this function is used only for testing purposes in CLI e g generate tokens during init
373 163 : pub fn generate_auth_token(&self, claims: &Claims) -> anyhow::Result<String> {
374 163 : let private_key_path = if self.private_key_path.is_absolute() {
375 UBC 0 : self.private_key_path.to_path_buf()
376 : } else {
377 CBC 163 : self.base_data_dir.join(&self.private_key_path)
378 : };
379 :
380 163 : let key_data = fs::read(private_key_path)?;
381 163 : encode_from_key_file(claims, &key_data)
382 163 : }
383 :
384 : //
385 : // Initialize a new Neon repository
386 : //
387 333 : pub fn init(&mut self, pg_version: u32, force: bool) -> anyhow::Result<()> {
388 333 : // check if config already exists
389 333 : let base_path = &self.base_data_dir;
390 333 : ensure!(
391 333 : base_path != Path::new(""),
392 UBC 0 : "repository base path is missing"
393 : );
394 :
395 CBC 333 : if base_path.exists() {
396 UBC 0 : if force {
397 0 : println!("removing all contents of '{}'", base_path.display());
398 : // instead of directly calling `remove_dir_all`, we keep the original dir but removing
399 : // all contents inside. This helps if the developer symbol links another directory (i.e.,
400 : // S3 local SSD) to the `.neon` base directory.
401 0 : for entry in std::fs::read_dir(base_path)? {
402 0 : let entry = entry?;
403 0 : let path = entry.path();
404 0 : if path.is_dir() {
405 0 : fs::remove_dir_all(&path)?;
406 : } else {
407 0 : fs::remove_file(&path)?;
408 : }
409 : }
410 : } else {
411 0 : bail!(
412 0 : "directory '{}' already exists. Perhaps already initialized? (Hint: use --force to remove all contents)",
413 0 : base_path.display()
414 0 : );
415 : }
416 CBC 333 : }
417 :
418 333 : if !self.pg_bin_dir(pg_version)?.join("postgres").exists() {
419 UBC 0 : bail!(
420 0 : "Can't find postgres binary at {}",
421 0 : self.pg_bin_dir(pg_version)?.display()
422 : );
423 CBC 333 : }
424 666 : for binary in ["pageserver", "safekeeper"] {
425 666 : if !self.neon_distrib_dir.join(binary).exists() {
426 UBC 0 : bail!(
427 0 : "Can't find binary '{binary}' in neon distrib dir '{}'",
428 0 : self.neon_distrib_dir.display()
429 0 : );
430 CBC 666 : }
431 : }
432 :
433 333 : if !base_path.exists() {
434 333 : fs::create_dir(base_path)?;
435 UBC 0 : }
436 :
437 : // Generate keypair for JWT.
438 : //
439 : // The keypair is only needed if authentication is enabled in any of the
440 : // components. For convenience, we generate the keypair even if authentication
441 : // is not enabled, so that you can easily enable it after the initialization
442 : // step. However, if the key generation fails, we treat it as non-fatal if
443 : // authentication was not enabled.
444 CBC 333 : if self.private_key_path == PathBuf::new() {
445 333 : match generate_auth_keys(
446 333 : base_path.join("auth_private_key.pem").as_path(),
447 333 : base_path.join("auth_public_key.pem").as_path(),
448 333 : ) {
449 333 : Ok(()) => {
450 333 : self.private_key_path = PathBuf::from("auth_private_key.pem");
451 333 : }
452 UBC 0 : Err(e) => {
453 0 : if !self.auth_keys_needed() {
454 0 : eprintln!("Could not generate keypair for JWT authentication: {e}");
455 0 : eprintln!("Continuing anyway because authentication was not enabled");
456 0 : self.private_key_path = PathBuf::from("auth_private_key.pem");
457 0 : } else {
458 0 : return Err(e);
459 : }
460 : }
461 : }
462 0 : }
463 :
464 CBC 333 : fs::create_dir_all(self.endpoints_path())?;
465 :
466 732 : for safekeeper in &self.safekeepers {
467 399 : fs::create_dir_all(SafekeeperNode::datadir_path_by_id(self, safekeeper.id))?;
468 : }
469 :
470 333 : self.persist_config(base_path)
471 333 : }
472 :
473 UBC 0 : fn auth_keys_needed(&self) -> bool {
474 0 : self.pageservers.iter().any(|ps| {
475 0 : ps.pg_auth_type == AuthType::NeonJWT || ps.http_auth_type == AuthType::NeonJWT
476 0 : }) || self.safekeepers.iter().any(|sk| sk.auth_enabled)
477 0 : }
478 : }
479 :
/// Returns the repository base directory: the value of the `NEON_REPO_DIR`
/// environment variable when it is set, otherwise the relative path `.neon`.
fn base_path() -> PathBuf {
    std::env::var_os("NEON_REPO_DIR")
        .map(PathBuf::from)
        .unwrap_or_else(|| PathBuf::from(".neon"))
}
486 :
487 : /// Generate a public/private key pair for JWT authentication
488 333 : fn generate_auth_keys(private_key_path: &Path, public_key_path: &Path) -> anyhow::Result<()> {
489 : // Generate the key pair
490 : //
491 : // openssl genpkey -algorithm ed25519 -out auth_private_key.pem
492 333 : let keygen_output = Command::new("openssl")
493 333 : .arg("genpkey")
494 333 : .args(["-algorithm", "ed25519"])
495 333 : .args(["-out", private_key_path.to_str().unwrap()])
496 333 : .stdout(Stdio::null())
497 333 : .output()
498 333 : .context("failed to generate auth private key")?;
499 333 : if !keygen_output.status.success() {
500 UBC 0 : bail!(
501 0 : "openssl failed: '{}'",
502 0 : String::from_utf8_lossy(&keygen_output.stderr)
503 0 : );
504 CBC 333 : }
505 : // Extract the public key from the private key file
506 : //
507 : // openssl pkey -in auth_private_key.pem -pubout -out auth_public_key.pem
508 333 : let keygen_output = Command::new("openssl")
509 333 : .arg("pkey")
510 333 : .args(["-in", private_key_path.to_str().unwrap()])
511 333 : .arg("-pubout")
512 333 : .args(["-out", public_key_path.to_str().unwrap()])
513 333 : .output()
514 333 : .context("failed to extract public key from private key")?;
515 333 : if !keygen_output.status.success() {
516 UBC 0 : bail!(
517 0 : "openssl failed: '{}'",
518 0 : String::from_utf8_lossy(&keygen_output.stderr)
519 0 : );
520 CBC 333 : }
521 333 : Ok(())
522 333 : }
523 :
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks that `control_plane/simple.conf` parses via `LocalEnv::parse_config`,
    /// and that replacing the broker's `listen_addr` with a string that is not a
    /// valid socket address makes parsing fail.
    #[test]
    fn simple_conf_parsing() {
        let simple_conf_toml = include_str!("../simple.conf");
        let simple_conf_parse_result = LocalEnv::parse_config(simple_conf_toml);
        assert!(
            simple_conf_parse_result.is_ok(),
            "failed to parse simple config {simple_conf_toml}, reason: {simple_conf_parse_result:?}"
        );

        // Corrupt the broker address; the replacement must actually happen for
        // the negative check below to be meaningful.
        let string_to_replace = "listen_addr = '127.0.0.1:50051'";
        let spoiled_url_str = "listen_addr = '!@$XOXO%^&'";
        let spoiled_url_toml = simple_conf_toml.replace(string_to_replace, spoiled_url_str);
        assert!(
            spoiled_url_toml.contains(spoiled_url_str),
            "Failed to replace string {string_to_replace} in the toml file {simple_conf_toml}"
        );
        let spoiled_url_parse_result = LocalEnv::parse_config(&spoiled_url_toml);
        assert!(
            spoiled_url_parse_result.is_err(),
            "expected toml with invalid Url {spoiled_url_toml} to fail the parsing, but got {spoiled_url_parse_result:?}"
        );
    }
}
|