TLA Line data Source code
//! This module is responsible for locating and loading paths in a local setup.
//!
//! It also provides an `init` method, which acts as a stub for a proper
//! installation script that would use local paths.
5 :
6 : use anyhow::{bail, ensure, Context};
7 :
8 : use postgres_backend::AuthType;
9 : use reqwest::Url;
10 : use serde::{Deserialize, Serialize};
11 : use serde_with::{serde_as, DisplayFromStr};
12 : use std::collections::HashMap;
13 : use std::env;
14 : use std::fs;
15 : use std::net::IpAddr;
16 : use std::net::Ipv4Addr;
17 : use std::net::SocketAddr;
18 : use std::path::{Path, PathBuf};
19 : use std::process::{Command, Stdio};
20 : use utils::{
21 : auth::{encode_from_key_file, Claims},
22 : id::{NodeId, TenantId, TenantTimelineId, TimelineId},
23 : };
24 :
25 : use crate::safekeeper::SafekeeperNode;
26 :
/// Default postgres major version.
///
/// NOTE(review): presumably used by callers when no version is given explicitly —
/// confirm at the call sites. For such use it must be one of the versions accepted
/// by `LocalEnv::pg_distrib_dir` (14, 15 or 16).
pub const DEFAULT_PG_VERSION: u32 = 15;
28 :
/// This data structure represents the neon_local CLI config.
///
/// It is deserialized from the .neon/config file, or from the config file passed
/// to the 'neon_local init --config=<path>' option. See control_plane/simple.conf
/// for an example.
#[serde_as]
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
pub struct LocalEnv {
    /// Base directory for all the nodes (the pageserver, safekeepers and
    /// compute endpoints).
    ///
    /// This is not stored in the config file. Rather, this is the path where the
    /// config file itself is. It is read from the NEON_REPO_DIR env variable, or
    /// defaults to '.neon' if that variable is not set.
    #[serde(skip)]
    pub base_data_dir: PathBuf,

    /// Root of the postgres distributions. Each supported major version lives in
    /// a `v<version>` subdirectory (e.g. `v15`), which is expected to contain
    /// "bin", "include", "lib" and "share". If at some point we are able to run
    /// against vanilla postgres, we may split this into separate paths that
    /// match the OS-specific installation layout.
    ///
    /// When left empty, `parse_config` fills it in from the POSTGRES_DISTRIB_DIR
    /// env variable, or falls back to `<current dir>/pg_install`.
    #[serde(default)]
    pub pg_distrib_dir: PathBuf,

    /// Directory containing the neon binaries (pageserver, safekeeper,
    /// storage_broker, attachment_service). When left empty, `parse_config`
    /// uses the directory of the currently running executable.
    #[serde(default)]
    pub neon_distrib_dir: PathBuf,

    /// Default tenant ID to use with the 'neon_local' command line utility, when
    /// --tenant_id is not explicitly specified.
    #[serde(default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub default_tenant_id: Option<TenantId>,

    /// Private key used to issue tokens, e.g. during pg start. A relative path is
    /// resolved against `base_data_dir`. `init` generates a keypair and sets this
    /// when it is empty.
    #[serde(default)]
    pub private_key_path: PathBuf,

    /// Storage broker configuration.
    pub broker: NeonBroker,

    /// This Vec must always contain at least one pageserver. `parse_config`
    /// enforces this; `load_config` does not re-check it.
    pub pageservers: Vec<PageServerConf>,

    /// Safekeepers of this setup; may be empty.
    #[serde(default)]
    pub safekeepers: Vec<SafekeeperConf>,

    /// Control plane location: if None, we will not run attachment_service. If
    /// set, this will be propagated into each pageserver's configuration.
    #[serde(default)]
    pub control_plane_api: Option<Url>,

    /// Keep human-readable aliases in memory (and persist them to config), to hide ZId hex strings from the user.
    ///
    /// Maps a branch name to the (tenant, timeline) pairs it names; within one
    /// branch name each tenant maps to at most one timeline (see
    /// `register_branch_mapping`).
    #[serde(default)]
    // A `HashMap<String, HashMap<TenantId, TimelineId>>` would be more appropriate here,
    // but deserialization into a generic toml object as `toml::Value::try_from` fails with an error.
    // https://toml.io/en/v1.0.0 does not contain a concept of "a table inside another table".
    #[serde_as(as = "HashMap<_, Vec<(DisplayFromStr, DisplayFromStr)>>")]
    branch_name_mappings: HashMap<String, Vec<(TenantId, TimelineId)>>,
}
90 :
/// Broker config for cluster internal communication.
///
/// Because of the container-level `#[serde(default)]`, fields missing from the
/// config file take their value from `NeonBroker::default()`.
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
#[serde(default)]
pub struct NeonBroker {
    /// Broker listen address for storage nodes coordination, e.g. '127.0.0.1:50051'.
    pub listen_addr: SocketAddr,
}
98 :
99 : // Dummy Default impl to satisfy Deserialize derive.
100 : impl Default for NeonBroker {
101 5188 : fn default() -> Self {
102 5188 : NeonBroker {
103 5188 : listen_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0),
104 5188 : }
105 5188 : }
106 : }
107 :
108 : impl NeonBroker {
109 1977 : pub fn client_url(&self) -> Url {
110 1977 : Url::parse(&format!("http://{}", self.listen_addr)).expect("failed to construct url")
111 1977 : }
112 : }
113 :
/// Configuration of one pageserver in a neon_local setup.
///
/// Because of the container-level `#[serde(default)]`, fields missing from the
/// config file take their value from `PageServerConf::default()`.
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
#[serde(default)]
pub struct PageServerConf {
    /// Node id; `LocalEnv::get_pageserver_conf` looks pageservers up by it.
    pub id: NodeId,

    /// Pageserver connection settings: listen address of the PG port.
    pub listen_pg_addr: String,
    /// Pageserver connection settings: listen address of the HTTP port.
    pub listen_http_addr: String,

    /// Auth type used for the PG port. `NeonJWT` here (or on the HTTP port)
    /// makes JWT keypair generation mandatory in `LocalEnv::init`.
    pub pg_auth_type: AuthType,
    /// Auth type used for the HTTP port.
    pub http_auth_type: AuthType,
}
128 :
129 : impl Default for PageServerConf {
130 5206 : fn default() -> Self {
131 5206 : Self {
132 5206 : id: NodeId(0),
133 5206 : listen_pg_addr: String::new(),
134 5206 : listen_http_addr: String::new(),
135 5206 : pg_auth_type: AuthType::Trust,
136 5206 : http_auth_type: AuthType::Trust,
137 5206 : }
138 5206 : }
139 : }
140 :
/// Configuration of one safekeeper in a neon_local setup.
///
/// Because of the container-level `#[serde(default)]`, fields missing from the
/// config file take their value from `SafekeeperConf::default()`.
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
#[serde(default)]
pub struct SafekeeperConf {
    /// Node id; `LocalEnv::init` creates one data directory per safekeeper id.
    pub id: NodeId,
    /// Postgres-protocol port; used by compute unless `pg_tenant_only_port` is set.
    pub pg_port: u16,
    /// Optional port on which only tenant-scoped tokens are allowed. When set,
    /// compute connects here instead of `pg_port` (see `get_compute_port`).
    pub pg_tenant_only_port: Option<u16>,
    /// HTTP port.
    pub http_port: u16,
    // NOTE(review): `sync`, `remote_storage` and `backup_threads` are not read in
    // this module; presumably forwarded to the safekeeper process — confirm.
    pub sync: bool,
    pub remote_storage: Option<String>,
    pub backup_threads: Option<u32>,
    /// Whether auth is enabled for this safekeeper. If any safekeeper sets it,
    /// `LocalEnv::init` treats JWT keypair generation failure as fatal.
    pub auth_enabled: bool,
}
153 :
154 : impl Default for SafekeeperConf {
155 6770 : fn default() -> Self {
156 6770 : Self {
157 6770 : id: NodeId(0),
158 6770 : pg_port: 0,
159 6770 : pg_tenant_only_port: None,
160 6770 : http_port: 0,
161 6770 : sync: true,
162 6770 : remote_storage: None,
163 6770 : backup_threads: None,
164 6770 : auth_enabled: false,
165 6770 : }
166 6770 : }
167 : }
168 :
169 : impl SafekeeperConf {
170 : /// Compute is served by port on which only tenant scoped tokens allowed, if
171 : /// it is configured.
172 1400 : pub fn get_compute_port(&self) -> u16 {
173 1400 : self.pg_tenant_only_port.unwrap_or(self.pg_port)
174 1400 : }
175 : }
176 :
177 : impl LocalEnv {
178 1469 : pub fn pg_distrib_dir_raw(&self) -> PathBuf {
179 1469 : self.pg_distrib_dir.clone()
180 1469 : }
181 :
182 2891 : pub fn pg_distrib_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
183 2891 : let path = self.pg_distrib_dir.clone();
184 2891 :
185 2891 : #[allow(clippy::manual_range_patterns)]
186 2891 : match pg_version {
187 2891 : 14 | 15 | 16 => Ok(path.join(format!("v{pg_version}"))),
188 UBC 0 : _ => bail!("Unsupported postgres version: {}", pg_version),
189 : }
190 CBC 2891 : }
191 :
192 1623 : pub fn pg_bin_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
193 1623 : Ok(self.pg_distrib_dir(pg_version)?.join("bin"))
194 1623 : }
195 1268 : pub fn pg_lib_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
196 1268 : Ok(self.pg_distrib_dir(pg_version)?.join("lib"))
197 1268 : }
198 :
199 909 : pub fn pageserver_bin(&self) -> PathBuf {
200 909 : self.neon_distrib_dir.join("pageserver")
201 909 : }
202 :
203 13 : pub fn attachment_service_bin(&self) -> PathBuf {
204 13 : self.neon_distrib_dir.join("attachment_service")
205 13 : }
206 :
207 500 : pub fn safekeeper_bin(&self) -> PathBuf {
208 500 : self.neon_distrib_dir.join("safekeeper")
209 500 : }
210 :
211 5 : pub fn storage_broker_bin(&self) -> PathBuf {
212 5 : self.neon_distrib_dir.join("storage_broker")
213 5 : }
214 :
215 10407 : pub fn endpoints_path(&self) -> PathBuf {
216 10407 : self.base_data_dir.join("endpoints")
217 10407 : }
218 :
219 2032 : pub fn pageserver_data_dir(&self, pageserver_id: NodeId) -> PathBuf {
220 2032 : self.base_data_dir
221 2032 : .join(format!("pageserver_{pageserver_id}"))
222 2032 : }
223 :
224 2439 : pub fn safekeeper_data_dir(&self, data_dir_name: &str) -> PathBuf {
225 2439 : self.base_data_dir.join("safekeepers").join(data_dir_name)
226 2439 : }
227 :
228 : pub fn get_pageserver_conf(&self, id: NodeId) -> anyhow::Result<&PageServerConf> {
229 8086 : if let Some(conf) = self.pageservers.iter().find(|node| node.id == id) {
230 8081 : Ok(conf)
231 : } else {
232 1 : bail!("could not find pageserver {id}")
233 : }
234 8082 : }
235 :
236 803 : pub fn register_branch_mapping(
237 803 : &mut self,
238 803 : branch_name: String,
239 803 : tenant_id: TenantId,
240 803 : timeline_id: TimelineId,
241 803 : ) -> anyhow::Result<()> {
242 803 : let existing_values = self
243 803 : .branch_name_mappings
244 803 : .entry(branch_name.clone())
245 803 : .or_default();
246 803 :
247 803 : let existing_ids = existing_values
248 803 : .iter()
249 803 : .find(|(existing_tenant_id, _)| existing_tenant_id == &tenant_id);
250 :
251 803 : if let Some((_, old_timeline_id)) = existing_ids {
252 1 : if old_timeline_id == &timeline_id {
253 1 : Ok(())
254 : } else {
255 UBC 0 : bail!("branch '{branch_name}' is already mapped to timeline {old_timeline_id}, cannot map to another timeline {timeline_id}");
256 : }
257 : } else {
258 CBC 802 : existing_values.push((tenant_id, timeline_id));
259 802 : Ok(())
260 : }
261 803 : }
262 :
263 808 : pub fn get_branch_timeline_id(
264 808 : &self,
265 808 : branch_name: &str,
266 808 : tenant_id: TenantId,
267 808 : ) -> Option<TimelineId> {
268 808 : self.branch_name_mappings
269 808 : .get(branch_name)?
270 808 : .iter()
271 913 : .find(|(mapped_tenant_id, _)| mapped_tenant_id == &tenant_id)
272 808 : .map(|&(_, timeline_id)| timeline_id)
273 808 : .map(TimelineId::from)
274 808 : }
275 :
276 15 : pub fn timeline_name_mappings(&self) -> HashMap<TenantTimelineId, String> {
277 15 : self.branch_name_mappings
278 15 : .iter()
279 32 : .flat_map(|(name, tenant_timelines)| {
280 34 : tenant_timelines.iter().map(|&(tenant_id, timeline_id)| {
281 34 : (TenantTimelineId::new(tenant_id, timeline_id), name.clone())
282 34 : })
283 32 : })
284 15 : .collect()
285 15 : }
286 :
287 : /// Create a LocalEnv from a config file.
288 : ///
289 : /// Unlike 'load_config', this function fills in any defaults that are missing
290 : /// from the config file.
291 350 : pub fn parse_config(toml: &str) -> anyhow::Result<Self> {
292 350 : let mut env: LocalEnv = toml::from_str(toml)?;
293 :
294 : // Find postgres binaries.
295 : // Follow POSTGRES_DISTRIB_DIR if set, otherwise look in "pg_install".
296 : // Note that later in the code we assume, that distrib dirs follow the same pattern
297 : // for all postgres versions.
298 349 : if env.pg_distrib_dir == Path::new("") {
299 349 : if let Some(postgres_bin) = env::var_os("POSTGRES_DISTRIB_DIR") {
300 348 : env.pg_distrib_dir = postgres_bin.into();
301 348 : } else {
302 1 : let cwd = env::current_dir()?;
303 1 : env.pg_distrib_dir = cwd.join("pg_install")
304 : }
305 UBC 0 : }
306 :
307 : // Find neon binaries.
308 CBC 349 : if env.neon_distrib_dir == Path::new("") {
309 349 : env.neon_distrib_dir = env::current_exe()?.parent().unwrap().to_owned();
310 UBC 0 : }
311 :
312 CBC 349 : if env.pageservers.is_empty() {
313 UBC 0 : anyhow::bail!("Configuration must contain at least one pageserver");
314 CBC 349 : }
315 349 :
316 349 : env.base_data_dir = base_path();
317 349 :
318 349 : Ok(env)
319 350 : }
320 :
321 : /// Locate and load config
322 4839 : pub fn load_config() -> anyhow::Result<Self> {
323 4839 : let repopath = base_path();
324 4839 :
325 4839 : if !repopath.exists() {
326 UBC 0 : bail!(
327 0 : "Neon config is not found in {}. You need to run 'neon_local init' first",
328 0 : repopath.to_str().unwrap()
329 0 : );
330 CBC 4839 : }
331 :
332 : // TODO: check that it looks like a neon repository
333 :
334 : // load and parse file
335 4839 : let config = fs::read_to_string(repopath.join("config"))?;
336 4839 : let mut env: LocalEnv = toml::from_str(config.as_str())?;
337 :
338 4839 : env.base_data_dir = repopath;
339 4839 :
340 4839 : Ok(env)
341 4839 : }
342 :
343 1498 : pub fn persist_config(&self, base_path: &Path) -> anyhow::Result<()> {
344 1498 : // Currently, the user first passes a config file with 'neon_local init --config=<path>'
345 1498 : // We read that in, in `create_config`, and fill any missing defaults. Then it's saved
346 1498 : // to .neon/config. TODO: We lose any formatting and comments along the way, which is
347 1498 : // a bit sad.
348 1498 : let mut conf_content = r#"# This file describes a local deployment of the page server
349 1498 : # and safekeeeper node. It is read by the 'neon_local' command-line
350 1498 : # utility.
351 1498 : "#
352 1498 : .to_string();
353 1498 :
354 1498 : // Convert the LocalEnv to a toml file.
355 1498 : //
356 1498 : // This could be as simple as this:
357 1498 : //
358 1498 : // conf_content += &toml::to_string_pretty(env)?;
359 1498 : //
360 1498 : // But it results in a "values must be emitted before tables". I'm not sure
361 1498 : // why, AFAICS the table, i.e. 'safekeepers: Vec<SafekeeperConf>' is last.
362 1498 : // Maybe rust reorders the fields to squeeze avoid padding or something?
363 1498 : // In any case, converting to toml::Value first, and serializing that, works.
364 1498 : // See https://github.com/alexcrichton/toml-rs/issues/142
365 1498 : conf_content += &toml::to_string_pretty(&toml::Value::try_from(self)?)?;
366 :
367 1498 : let target_config_path = base_path.join("config");
368 1498 : fs::write(&target_config_path, conf_content).with_context(|| {
369 UBC 0 : format!(
370 0 : "Failed to write config file into path '{}'",
371 0 : target_config_path.display()
372 0 : )
373 CBC 1498 : })
374 1498 : }
375 :
376 : // this function is used only for testing purposes in CLI e g generate tokens during init
377 81 : pub fn generate_auth_token(&self, claims: &Claims) -> anyhow::Result<String> {
378 81 : let private_key_path = if self.private_key_path.is_absolute() {
379 UBC 0 : self.private_key_path.to_path_buf()
380 : } else {
381 CBC 81 : self.base_data_dir.join(&self.private_key_path)
382 : };
383 :
384 81 : let key_data = fs::read(private_key_path)?;
385 81 : encode_from_key_file(claims, &key_data)
386 81 : }
387 :
388 : //
389 : // Initialize a new Neon repository
390 : //
391 348 : pub fn init(&mut self, pg_version: u32, force: bool) -> anyhow::Result<()> {
392 348 : // check if config already exists
393 348 : let base_path = &self.base_data_dir;
394 348 : ensure!(
395 348 : base_path != Path::new(""),
396 UBC 0 : "repository base path is missing"
397 : );
398 :
399 CBC 348 : if base_path.exists() {
400 UBC 0 : if force {
401 0 : println!("removing all contents of '{}'", base_path.display());
402 : // instead of directly calling `remove_dir_all`, we keep the original dir but removing
403 : // all contents inside. This helps if the developer symbol links another directory (i.e.,
404 : // S3 local SSD) to the `.neon` base directory.
405 0 : for entry in std::fs::read_dir(base_path)? {
406 0 : let entry = entry?;
407 0 : let path = entry.path();
408 0 : if path.is_dir() {
409 0 : fs::remove_dir_all(&path)?;
410 : } else {
411 0 : fs::remove_file(&path)?;
412 : }
413 : }
414 : } else {
415 0 : bail!(
416 0 : "directory '{}' already exists. Perhaps already initialized? (Hint: use --force to remove all contents)",
417 0 : base_path.display()
418 0 : );
419 : }
420 CBC 348 : }
421 :
422 348 : if !self.pg_bin_dir(pg_version)?.join("postgres").exists() {
423 UBC 0 : bail!(
424 0 : "Can't find postgres binary at {}",
425 0 : self.pg_bin_dir(pg_version)?.display()
426 : );
427 CBC 348 : }
428 696 : for binary in ["pageserver", "safekeeper"] {
429 696 : if !self.neon_distrib_dir.join(binary).exists() {
430 UBC 0 : bail!(
431 0 : "Can't find binary '{binary}' in neon distrib dir '{}'",
432 0 : self.neon_distrib_dir.display()
433 0 : );
434 CBC 696 : }
435 : }
436 :
437 348 : if !base_path.exists() {
438 348 : fs::create_dir(base_path)?;
439 UBC 0 : }
440 :
441 : // Generate keypair for JWT.
442 : //
443 : // The keypair is only needed if authentication is enabled in any of the
444 : // components. For convenience, we generate the keypair even if authentication
445 : // is not enabled, so that you can easily enable it after the initialization
446 : // step. However, if the key generation fails, we treat it as non-fatal if
447 : // authentication was not enabled.
448 CBC 348 : if self.private_key_path == PathBuf::new() {
449 348 : match generate_auth_keys(
450 348 : base_path.join("auth_private_key.pem").as_path(),
451 348 : base_path.join("auth_public_key.pem").as_path(),
452 348 : ) {
453 348 : Ok(()) => {
454 348 : self.private_key_path = PathBuf::from("auth_private_key.pem");
455 348 : }
456 UBC 0 : Err(e) => {
457 0 : if !self.auth_keys_needed() {
458 0 : eprintln!("Could not generate keypair for JWT authentication: {e}");
459 0 : eprintln!("Continuing anyway because authentication was not enabled");
460 0 : self.private_key_path = PathBuf::from("auth_private_key.pem");
461 0 : } else {
462 0 : return Err(e);
463 : }
464 : }
465 : }
466 0 : }
467 :
468 CBC 348 : fs::create_dir_all(self.endpoints_path())?;
469 :
470 756 : for safekeeper in &self.safekeepers {
471 408 : fs::create_dir_all(SafekeeperNode::datadir_path_by_id(self, safekeeper.id))?;
472 : }
473 :
474 348 : self.persist_config(base_path)
475 348 : }
476 :
477 UBC 0 : fn auth_keys_needed(&self) -> bool {
478 0 : self.pageservers.iter().any(|ps| {
479 0 : ps.pg_auth_type == AuthType::NeonJWT || ps.http_auth_type == AuthType::NeonJWT
480 0 : }) || self.safekeepers.iter().any(|sk| sk.auth_enabled)
481 0 : }
482 : }
483 :
/// Returns the neon_local repository directory. This is the value of the
/// `NEON_REPO_DIR` environment variable when it is set, otherwise the relative
/// path `.neon`.
fn base_path() -> PathBuf {
    std::env::var_os("NEON_REPO_DIR")
        .map(PathBuf::from)
        .unwrap_or_else(|| PathBuf::from(".neon"))
}
490 :
491 : /// Generate a public/private key pair for JWT authentication
492 348 : fn generate_auth_keys(private_key_path: &Path, public_key_path: &Path) -> anyhow::Result<()> {
493 : // Generate the key pair
494 : //
495 : // openssl genpkey -algorithm ed25519 -out auth_private_key.pem
496 348 : let keygen_output = Command::new("openssl")
497 348 : .arg("genpkey")
498 348 : .args(["-algorithm", "ed25519"])
499 348 : .args(["-out", private_key_path.to_str().unwrap()])
500 348 : .stdout(Stdio::null())
501 348 : .output()
502 348 : .context("failed to generate auth private key")?;
503 348 : if !keygen_output.status.success() {
504 UBC 0 : bail!(
505 0 : "openssl failed: '{}'",
506 0 : String::from_utf8_lossy(&keygen_output.stderr)
507 0 : );
508 CBC 348 : }
509 : // Extract the public key from the private key file
510 : //
511 : // openssl pkey -in auth_private_key.pem -pubout -out auth_public_key.pem
512 348 : let keygen_output = Command::new("openssl")
513 348 : .arg("pkey")
514 348 : .args(["-in", private_key_path.to_str().unwrap()])
515 348 : .arg("-pubout")
516 348 : .args(["-out", public_key_path.to_str().unwrap()])
517 348 : .output()
518 348 : .context("failed to extract public key from private key")?;
519 348 : if !keygen_output.status.success() {
520 UBC 0 : bail!(
521 0 : "openssl failed: '{}'",
522 0 : String::from_utf8_lossy(&keygen_output.stderr)
523 0 : );
524 CBC 348 : }
525 348 : Ok(())
526 348 : }
527 :
#[cfg(test)]
mod tests {
    use super::*;

    /// The bundled `simple.conf` must parse, and replacing its broker
    /// `listen_addr` with garbage must make parsing fail.
    #[test]
    fn simple_conf_parsing() {
        let simple_conf_toml = include_str!("../simple.conf");

        let simple_conf_parse_result = LocalEnv::parse_config(simple_conf_toml);
        assert!(
            matches!(simple_conf_parse_result, Ok(_)),
            "failed to parse simple config {simple_conf_toml}, reason: {simple_conf_parse_result:?}"
        );

        let (string_to_replace, spoiled_url_str) = (
            "listen_addr = '127.0.0.1:50051'",
            "listen_addr = '!@$XOXO%^&'",
        );
        let spoiled_url_toml = simple_conf_toml.replace(string_to_replace, spoiled_url_str);
        let replacement_happened = spoiled_url_toml.contains(spoiled_url_str);
        assert!(
            replacement_happened,
            "Failed to replace string {string_to_replace} in the toml file {simple_conf_toml}"
        );

        let spoiled_url_parse_result = LocalEnv::parse_config(&spoiled_url_toml);
        assert!(
            matches!(spoiled_url_parse_result, Err(_)),
            "expected toml with invalid Url {spoiled_url_toml} to fail the parsing, but got {spoiled_url_parse_result:?}"
        );
    }
}
|