Line data Source code
1 : //! This module is responsible for locating and loading paths in a local setup.
2 : //!
3 : //! Now it also provides init method which acts like a stub for proper installation
4 : //! script which will use local paths.
5 :
6 : use std::collections::HashMap;
7 : use std::net::{IpAddr, Ipv4Addr, SocketAddr};
8 : use std::path::{Path, PathBuf};
9 : use std::process::{Command, Stdio};
10 : use std::time::Duration;
11 : use std::{env, fs};
12 :
13 : use anyhow::{Context, bail};
14 : use clap::ValueEnum;
15 : use postgres_backend::AuthType;
16 : use reqwest::Url;
17 : use serde::{Deserialize, Serialize};
18 : use utils::auth::encode_from_key_file;
19 : use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
20 :
21 : use crate::object_storage::{OBJECT_STORAGE_REMOTE_STORAGE_DIR, ObjectStorage};
22 : use crate::pageserver::{PAGESERVER_REMOTE_STORAGE_DIR, PageServerNode};
23 : use crate::safekeeper::SafekeeperNode;
24 :
25 : pub const DEFAULT_PG_VERSION: u32 = 16;
26 :
27 : //
28 : // This data structures represents neon_local CLI config
29 : //
30 : // It is deserialized from the .neon/config file, or the config file passed
31 : // to 'neon_local init --config=<path>' option. See control_plane/simple.conf for
32 : // an example.
33 : //
34 : #[derive(PartialEq, Eq, Clone, Debug)]
35 : pub struct LocalEnv {
36 : // Base directory for all the nodes (the pageserver, safekeepers and
37 : // compute endpoints).
38 : //
39 : // This is not stored in the config file. Rather, this is the path where the
40 : // config file itself is. It is read from the NEON_REPO_DIR env variable which
41 : // must be an absolute path. If the env var is not set, $PWD/.neon is used.
42 : pub base_data_dir: PathBuf,
43 :
44 : // Path to postgres distribution. It's expected that "bin", "include",
45 : // "lib", "share" from postgres distribution are there. If at some point
46 : // in time we will be able to run against vanilla postgres we may split that
47 : // to four separate paths and match OS-specific installation layout.
48 : pub pg_distrib_dir: PathBuf,
49 :
50 : // Path to pageserver binary.
51 : pub neon_distrib_dir: PathBuf,
52 :
53 : // Default tenant ID to use with the 'neon_local' command line utility, when
54 : // --tenant_id is not explicitly specified.
55 : pub default_tenant_id: Option<TenantId>,
56 :
57 : // used to issue tokens during e.g pg start
58 : pub private_key_path: PathBuf,
59 : pub public_key_path: PathBuf,
60 :
61 : pub broker: NeonBroker,
62 :
63 : // Configuration for the storage controller (1 per neon_local environment)
64 : pub storage_controller: NeonStorageControllerConf,
65 :
66 : /// This Vec must always contain at least one pageserver
67 : /// Populdated by [`Self::load_config`] from the individual `pageserver.toml`s.
68 : /// NB: not used anymore except for informing users that they need to change their `.neon/config`.
69 : pub pageservers: Vec<PageServerConf>,
70 :
71 : pub safekeepers: Vec<SafekeeperConf>,
72 :
73 : pub object_storage: ObjectStorageConf,
74 :
75 : // Control plane upcall API for pageserver: if None, we will not run storage_controller If set, this will
76 : // be propagated into each pageserver's configuration.
77 : pub control_plane_api: Url,
78 :
79 : // Control plane upcall APIs for storage controller. If set, this will be propagated into the
80 : // storage controller's configuration.
81 : pub control_plane_hooks_api: Option<Url>,
82 :
83 : /// Keep human-readable aliases in memory (and persist them to config), to hide ZId hex strings from the user.
84 : // A `HashMap<String, HashMap<TenantId, TimelineId>>` would be more appropriate here,
85 : // but deserialization into a generic toml object as `toml::Value::try_from` fails with an error.
86 : // https://toml.io/en/v1.0.0 does not contain a concept of "a table inside another table".
87 : pub branch_name_mappings: HashMap<String, Vec<(TenantId, TimelineId)>>,
88 :
89 : /// Flag to generate SSL certificates for components that need it.
90 : /// Also generates root CA certificate that is used to sign all other certificates.
91 : pub generate_local_ssl_certs: bool,
92 : }
93 :
94 : /// On-disk state stored in `.neon/config`.
95 0 : #[derive(PartialEq, Eq, Clone, Debug, Default, Serialize, Deserialize)]
96 : #[serde(default, deny_unknown_fields)]
97 : pub struct OnDiskConfig {
98 : pub pg_distrib_dir: PathBuf,
99 : pub neon_distrib_dir: PathBuf,
100 : pub default_tenant_id: Option<TenantId>,
101 : pub private_key_path: PathBuf,
102 : pub public_key_path: PathBuf,
103 : pub broker: NeonBroker,
104 : pub storage_controller: NeonStorageControllerConf,
105 : #[serde(
106 : skip_serializing,
107 : deserialize_with = "fail_if_pageservers_field_specified"
108 : )]
109 : pub pageservers: Vec<PageServerConf>,
110 : pub safekeepers: Vec<SafekeeperConf>,
111 : pub object_storage: ObjectStorageConf,
112 : pub control_plane_api: Option<Url>,
113 : pub control_plane_hooks_api: Option<Url>,
114 : pub control_plane_compute_hook_api: Option<Url>,
115 : branch_name_mappings: HashMap<String, Vec<(TenantId, TimelineId)>>,
116 : // Note: skip serializing because in compat tests old storage controller fails
117 : // to load new config file. May be removed after this field is in release branch.
118 : #[serde(skip_serializing_if = "std::ops::Not::not")]
119 : pub generate_local_ssl_certs: bool,
120 : }
121 :
122 0 : fn fail_if_pageservers_field_specified<'de, D>(_: D) -> Result<Vec<PageServerConf>, D::Error>
123 0 : where
124 0 : D: serde::Deserializer<'de>,
125 0 : {
126 0 : Err(serde::de::Error::custom(
127 0 : "The 'pageservers' field is no longer used; pageserver.toml is now authoritative; \
128 0 : Please remove the `pageservers` from your .neon/config.",
129 0 : ))
130 0 : }
131 :
132 : /// The description of the neon_local env to be initialized by `neon_local init --config`.
133 0 : #[derive(Clone, Debug, Deserialize)]
134 : #[serde(deny_unknown_fields)]
135 : pub struct NeonLocalInitConf {
136 : // TODO: do we need this? Seems unused
137 : pub pg_distrib_dir: Option<PathBuf>,
138 : // TODO: do we need this? Seems unused
139 : pub neon_distrib_dir: Option<PathBuf>,
140 : pub default_tenant_id: TenantId,
141 : pub broker: NeonBroker,
142 : pub storage_controller: Option<NeonStorageControllerConf>,
143 : pub pageservers: Vec<NeonLocalInitPageserverConf>,
144 : pub safekeepers: Vec<SafekeeperConf>,
145 : pub object_storage: ObjectStorageConf,
146 : pub control_plane_api: Option<Url>,
147 : pub control_plane_hooks_api: Option<Url>,
148 : pub generate_local_ssl_certs: bool,
149 : }
150 :
151 0 : #[derive(Serialize, Default, Deserialize, PartialEq, Eq, Clone, Debug)]
152 : #[serde(default)]
153 : pub struct ObjectStorageConf {
154 : pub port: u16,
155 : }
156 :
157 : /// Broker config for cluster internal communication.
158 0 : #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
159 : #[serde(default)]
160 : pub struct NeonBroker {
161 : /// Broker listen address for storage nodes coordination, e.g. '127.0.0.1:50051'.
162 : pub listen_addr: SocketAddr,
163 : }
164 :
165 : /// A part of storage controller's config the neon_local knows about.
166 0 : #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
167 : #[serde(default)]
168 : pub struct NeonStorageControllerConf {
169 : /// Heartbeat timeout before marking a node offline
170 : #[serde(with = "humantime_serde")]
171 : pub max_offline: Duration,
172 :
173 : #[serde(with = "humantime_serde")]
174 : pub max_warming_up: Duration,
175 :
176 : pub start_as_candidate: bool,
177 :
178 : /// Database url used when running multiple storage controller instances
179 : pub database_url: Option<SocketAddr>,
180 :
181 : /// Thresholds for auto-splitting a tenant into shards.
182 : pub split_threshold: Option<u64>,
183 : pub max_split_shards: Option<u8>,
184 : pub initial_split_threshold: Option<u64>,
185 : pub initial_split_shards: Option<u8>,
186 :
187 : pub max_secondary_lag_bytes: Option<u64>,
188 :
189 : #[serde(with = "humantime_serde")]
190 : pub heartbeat_interval: Duration,
191 :
192 : #[serde(with = "humantime_serde")]
193 : pub long_reconcile_threshold: Option<Duration>,
194 :
195 : pub use_https_pageserver_api: bool,
196 :
197 : pub timelines_onto_safekeepers: bool,
198 :
199 : pub use_https_safekeeper_api: bool,
200 :
201 : pub use_local_compute_notifications: bool,
202 : }
203 :
204 : impl NeonStorageControllerConf {
205 : // Use a shorter pageserver unavailability interval than the default to speed up tests.
206 : const DEFAULT_MAX_OFFLINE_INTERVAL: std::time::Duration = std::time::Duration::from_secs(10);
207 :
208 : const DEFAULT_MAX_WARMING_UP_INTERVAL: std::time::Duration = std::time::Duration::from_secs(30);
209 :
210 : // Very tight heartbeat interval to speed up tests
211 : const DEFAULT_HEARTBEAT_INTERVAL: std::time::Duration = std::time::Duration::from_millis(1000);
212 : }
213 :
214 : impl Default for NeonStorageControllerConf {
215 0 : fn default() -> Self {
216 0 : Self {
217 0 : max_offline: Self::DEFAULT_MAX_OFFLINE_INTERVAL,
218 0 : max_warming_up: Self::DEFAULT_MAX_WARMING_UP_INTERVAL,
219 0 : start_as_candidate: false,
220 0 : database_url: None,
221 0 : split_threshold: None,
222 0 : max_split_shards: None,
223 0 : initial_split_threshold: None,
224 0 : initial_split_shards: None,
225 0 : max_secondary_lag_bytes: None,
226 0 : heartbeat_interval: Self::DEFAULT_HEARTBEAT_INTERVAL,
227 0 : long_reconcile_threshold: None,
228 0 : use_https_pageserver_api: false,
229 0 : timelines_onto_safekeepers: false,
230 0 : use_https_safekeeper_api: false,
231 0 : use_local_compute_notifications: true,
232 0 : }
233 0 : }
234 : }
235 :
236 : // Dummy Default impl to satisfy Deserialize derive.
237 : impl Default for NeonBroker {
238 0 : fn default() -> Self {
239 0 : NeonBroker {
240 0 : listen_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0),
241 0 : }
242 0 : }
243 : }
244 :
245 : impl NeonBroker {
246 0 : pub fn client_url(&self) -> Url {
247 0 : Url::parse(&format!("http://{}", self.listen_addr)).expect("failed to construct url")
248 0 : }
249 : }
250 :
251 : // neon_local needs to know this subset of pageserver configuration.
252 : // For legacy reasons, this information is duplicated from `pageserver.toml` into `.neon/config`.
253 : // It can get stale if `pageserver.toml` is changed.
254 : // TODO(christian): don't store this at all in `.neon/config`, always load it from `pageserver.toml`
255 0 : #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
256 : #[serde(default, deny_unknown_fields)]
257 : pub struct PageServerConf {
258 : pub id: NodeId,
259 : pub listen_pg_addr: String,
260 : pub listen_http_addr: String,
261 : pub listen_https_addr: Option<String>,
262 : pub pg_auth_type: AuthType,
263 : pub http_auth_type: AuthType,
264 : pub no_sync: bool,
265 : }
266 :
267 : impl Default for PageServerConf {
268 0 : fn default() -> Self {
269 0 : Self {
270 0 : id: NodeId(0),
271 0 : listen_pg_addr: String::new(),
272 0 : listen_http_addr: String::new(),
273 0 : listen_https_addr: None,
274 0 : pg_auth_type: AuthType::Trust,
275 0 : http_auth_type: AuthType::Trust,
276 0 : no_sync: false,
277 0 : }
278 0 : }
279 : }
280 :
281 : /// The toml that can be passed to `neon_local init --config`.
282 : /// This is a subset of the `pageserver.toml` configuration.
283 : // TODO(christian): use pageserver_api::config::ConfigToml (PR #7656)
284 0 : #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
285 : pub struct NeonLocalInitPageserverConf {
286 : pub id: NodeId,
287 : pub listen_pg_addr: String,
288 : pub listen_http_addr: String,
289 : pub listen_https_addr: Option<String>,
290 : pub pg_auth_type: AuthType,
291 : pub http_auth_type: AuthType,
292 : #[serde(default, skip_serializing_if = "std::ops::Not::not")]
293 : pub no_sync: bool,
294 : #[serde(flatten)]
295 : pub other: HashMap<String, toml::Value>,
296 : }
297 :
298 : impl From<&NeonLocalInitPageserverConf> for PageServerConf {
299 0 : fn from(conf: &NeonLocalInitPageserverConf) -> Self {
300 0 : let NeonLocalInitPageserverConf {
301 0 : id,
302 0 : listen_pg_addr,
303 0 : listen_http_addr,
304 0 : listen_https_addr,
305 0 : pg_auth_type,
306 0 : http_auth_type,
307 0 : no_sync,
308 0 : other: _,
309 0 : } = conf;
310 0 : Self {
311 0 : id: *id,
312 0 : listen_pg_addr: listen_pg_addr.clone(),
313 0 : listen_http_addr: listen_http_addr.clone(),
314 0 : listen_https_addr: listen_https_addr.clone(),
315 0 : pg_auth_type: *pg_auth_type,
316 0 : http_auth_type: *http_auth_type,
317 0 : no_sync: *no_sync,
318 0 : }
319 0 : }
320 : }
321 :
322 0 : #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
323 : #[serde(default)]
324 : pub struct SafekeeperConf {
325 : pub id: NodeId,
326 : pub pg_port: u16,
327 : pub pg_tenant_only_port: Option<u16>,
328 : pub http_port: u16,
329 : pub https_port: Option<u16>,
330 : pub sync: bool,
331 : pub remote_storage: Option<String>,
332 : pub backup_threads: Option<u32>,
333 : pub auth_enabled: bool,
334 : pub listen_addr: Option<String>,
335 : }
336 :
337 : impl Default for SafekeeperConf {
338 0 : fn default() -> Self {
339 0 : Self {
340 0 : id: NodeId(0),
341 0 : pg_port: 0,
342 0 : pg_tenant_only_port: None,
343 0 : http_port: 0,
344 0 : https_port: None,
345 0 : sync: true,
346 0 : remote_storage: None,
347 0 : backup_threads: None,
348 0 : auth_enabled: false,
349 0 : listen_addr: None,
350 0 : }
351 0 : }
352 : }
353 :
354 : #[derive(Clone, Copy)]
355 : pub enum InitForceMode {
356 : MustNotExist,
357 : EmptyDirOk,
358 : RemoveAllContents,
359 : }
360 :
361 : impl ValueEnum for InitForceMode {
362 0 : fn value_variants<'a>() -> &'a [Self] {
363 0 : &[
364 0 : Self::MustNotExist,
365 0 : Self::EmptyDirOk,
366 0 : Self::RemoveAllContents,
367 0 : ]
368 0 : }
369 :
370 0 : fn to_possible_value(&self) -> Option<clap::builder::PossibleValue> {
371 0 : Some(clap::builder::PossibleValue::new(match self {
372 0 : InitForceMode::MustNotExist => "must-not-exist",
373 0 : InitForceMode::EmptyDirOk => "empty-dir-ok",
374 0 : InitForceMode::RemoveAllContents => "remove-all-contents",
375 : }))
376 0 : }
377 : }
378 :
379 : impl SafekeeperConf {
380 : /// Compute is served by port on which only tenant scoped tokens allowed, if
381 : /// it is configured.
382 0 : pub fn get_compute_port(&self) -> u16 {
383 0 : self.pg_tenant_only_port.unwrap_or(self.pg_port)
384 0 : }
385 : }
386 :
387 : impl LocalEnv {
388 0 : pub fn pg_distrib_dir_raw(&self) -> PathBuf {
389 0 : self.pg_distrib_dir.clone()
390 0 : }
391 :
392 0 : pub fn pg_distrib_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
393 0 : let path = self.pg_distrib_dir.clone();
394 0 :
395 0 : #[allow(clippy::manual_range_patterns)]
396 0 : match pg_version {
397 0 : 14 | 15 | 16 | 17 => Ok(path.join(format!("v{pg_version}"))),
398 0 : _ => bail!("Unsupported postgres version: {}", pg_version),
399 : }
400 0 : }
401 :
402 0 : pub fn pg_dir(&self, pg_version: u32, dir_name: &str) -> anyhow::Result<PathBuf> {
403 0 : Ok(self.pg_distrib_dir(pg_version)?.join(dir_name))
404 0 : }
405 :
406 0 : pub fn pg_bin_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
407 0 : self.pg_dir(pg_version, "bin")
408 0 : }
409 :
410 0 : pub fn pg_lib_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
411 0 : self.pg_dir(pg_version, "lib")
412 0 : }
413 :
414 0 : pub fn object_storage_bin(&self) -> PathBuf {
415 0 : self.neon_distrib_dir.join("object_storage")
416 0 : }
417 :
418 0 : pub fn pageserver_bin(&self) -> PathBuf {
419 0 : self.neon_distrib_dir.join("pageserver")
420 0 : }
421 :
422 0 : pub fn storage_controller_bin(&self) -> PathBuf {
423 0 : // Irrespective of configuration, storage controller binary is always
424 0 : // run from the same location as neon_local. This means that for compatibility
425 0 : // tests that run old pageserver/safekeeper, they still run latest storage controller.
426 0 : let neon_local_bin_dir = env::current_exe().unwrap().parent().unwrap().to_owned();
427 0 : neon_local_bin_dir.join("storage_controller")
428 0 : }
429 :
430 0 : pub fn safekeeper_bin(&self) -> PathBuf {
431 0 : self.neon_distrib_dir.join("safekeeper")
432 0 : }
433 :
434 0 : pub fn storage_broker_bin(&self) -> PathBuf {
435 0 : self.neon_distrib_dir.join("storage_broker")
436 0 : }
437 :
438 0 : pub fn endpoints_path(&self) -> PathBuf {
439 0 : self.base_data_dir.join("endpoints")
440 0 : }
441 :
442 0 : pub fn pageserver_data_dir(&self, pageserver_id: NodeId) -> PathBuf {
443 0 : self.base_data_dir
444 0 : .join(format!("pageserver_{pageserver_id}"))
445 0 : }
446 :
447 0 : pub fn safekeeper_data_dir(&self, data_dir_name: &str) -> PathBuf {
448 0 : self.base_data_dir.join("safekeepers").join(data_dir_name)
449 0 : }
450 :
451 0 : pub fn object_storage_data_dir(&self) -> PathBuf {
452 0 : self.base_data_dir.join("object_storage")
453 0 : }
454 :
455 0 : pub fn get_pageserver_conf(&self, id: NodeId) -> anyhow::Result<&PageServerConf> {
456 0 : if let Some(conf) = self.pageservers.iter().find(|node| node.id == id) {
457 0 : Ok(conf)
458 : } else {
459 0 : let have_ids = self
460 0 : .pageservers
461 0 : .iter()
462 0 : .map(|node| format!("{}:{}", node.id, node.listen_http_addr))
463 0 : .collect::<Vec<_>>();
464 0 : let joined = have_ids.join(",");
465 0 : bail!("could not find pageserver {id}, have ids {joined}")
466 : }
467 0 : }
468 :
469 0 : pub fn ssl_ca_cert_path(&self) -> Option<PathBuf> {
470 0 : if self.generate_local_ssl_certs {
471 0 : Some(self.base_data_dir.join("rootCA.crt"))
472 : } else {
473 0 : None
474 : }
475 0 : }
476 :
477 0 : pub fn ssl_ca_key_path(&self) -> Option<PathBuf> {
478 0 : if self.generate_local_ssl_certs {
479 0 : Some(self.base_data_dir.join("rootCA.key"))
480 : } else {
481 0 : None
482 : }
483 0 : }
484 :
485 0 : pub fn generate_ssl_ca_cert(&self) -> anyhow::Result<()> {
486 0 : let cert_path = self.ssl_ca_cert_path().unwrap();
487 0 : let key_path = self.ssl_ca_key_path().unwrap();
488 0 : if !fs::exists(cert_path.as_path())? {
489 0 : generate_ssl_ca_cert(cert_path.as_path(), key_path.as_path())?;
490 0 : }
491 0 : Ok(())
492 0 : }
493 :
494 0 : pub fn generate_ssl_cert(&self, cert_path: &Path, key_path: &Path) -> anyhow::Result<()> {
495 0 : self.generate_ssl_ca_cert()?;
496 0 : generate_ssl_cert(
497 0 : cert_path,
498 0 : key_path,
499 0 : self.ssl_ca_cert_path().unwrap().as_path(),
500 0 : self.ssl_ca_key_path().unwrap().as_path(),
501 0 : )
502 0 : }
503 :
504 : /// Inspect the base data directory and extract the instance id and instance directory path
505 : /// for all storage controller instances
506 0 : pub async fn storage_controller_instances(&self) -> std::io::Result<Vec<(u8, PathBuf)>> {
507 0 : let mut instances = Vec::default();
508 :
509 0 : let dir = std::fs::read_dir(self.base_data_dir.clone())?;
510 0 : for dentry in dir {
511 0 : let dentry = dentry?;
512 0 : let is_dir = dentry.metadata()?.is_dir();
513 0 : let filename = dentry.file_name().into_string().unwrap();
514 0 : let parsed_instance_id = match filename.strip_prefix("storage_controller_") {
515 0 : Some(suffix) => suffix.parse::<u8>().ok(),
516 0 : None => None,
517 : };
518 :
519 0 : let is_instance_dir = is_dir && parsed_instance_id.is_some();
520 :
521 0 : if !is_instance_dir {
522 0 : continue;
523 0 : }
524 0 :
525 0 : instances.push((
526 0 : parsed_instance_id.expect("Checked previously"),
527 0 : dentry.path(),
528 0 : ));
529 : }
530 :
531 0 : Ok(instances)
532 0 : }
533 :
534 0 : pub fn register_branch_mapping(
535 0 : &mut self,
536 0 : branch_name: String,
537 0 : tenant_id: TenantId,
538 0 : timeline_id: TimelineId,
539 0 : ) -> anyhow::Result<()> {
540 0 : let existing_values = self
541 0 : .branch_name_mappings
542 0 : .entry(branch_name.clone())
543 0 : .or_default();
544 0 :
545 0 : let existing_ids = existing_values
546 0 : .iter()
547 0 : .find(|(existing_tenant_id, _)| existing_tenant_id == &tenant_id);
548 :
549 0 : if let Some((_, old_timeline_id)) = existing_ids {
550 0 : if old_timeline_id == &timeline_id {
551 0 : Ok(())
552 : } else {
553 0 : bail!(
554 0 : "branch '{branch_name}' is already mapped to timeline {old_timeline_id}, cannot map to another timeline {timeline_id}"
555 0 : );
556 : }
557 : } else {
558 0 : existing_values.push((tenant_id, timeline_id));
559 0 : Ok(())
560 : }
561 0 : }
562 :
563 0 : pub fn get_branch_timeline_id(
564 0 : &self,
565 0 : branch_name: &str,
566 0 : tenant_id: TenantId,
567 0 : ) -> Option<TimelineId> {
568 0 : self.branch_name_mappings
569 0 : .get(branch_name)?
570 0 : .iter()
571 0 : .find(|(mapped_tenant_id, _)| mapped_tenant_id == &tenant_id)
572 0 : .map(|&(_, timeline_id)| timeline_id)
573 0 : }
574 :
575 0 : pub fn timeline_name_mappings(&self) -> HashMap<TenantTimelineId, String> {
576 0 : self.branch_name_mappings
577 0 : .iter()
578 0 : .flat_map(|(name, tenant_timelines)| {
579 0 : tenant_timelines.iter().map(|&(tenant_id, timeline_id)| {
580 0 : (TenantTimelineId::new(tenant_id, timeline_id), name.clone())
581 0 : })
582 0 : })
583 0 : .collect()
584 0 : }
585 :
586 : /// Construct `Self` from on-disk state.
587 0 : pub fn load_config(repopath: &Path) -> anyhow::Result<Self> {
588 0 : if !repopath.exists() {
589 0 : bail!(
590 0 : "Neon config is not found in {}. You need to run 'neon_local init' first",
591 0 : repopath.to_str().unwrap()
592 0 : );
593 0 : }
594 :
595 : // TODO: check that it looks like a neon repository
596 :
597 : // load and parse file
598 0 : let config_file_contents = fs::read_to_string(repopath.join("config"))?;
599 0 : let on_disk_config: OnDiskConfig = toml::from_str(config_file_contents.as_str())?;
600 0 : let mut env = {
601 0 : let OnDiskConfig {
602 0 : pg_distrib_dir,
603 0 : neon_distrib_dir,
604 0 : default_tenant_id,
605 0 : private_key_path,
606 0 : public_key_path,
607 0 : broker,
608 0 : storage_controller,
609 0 : pageservers,
610 0 : safekeepers,
611 0 : control_plane_api,
612 0 : control_plane_hooks_api,
613 0 : control_plane_compute_hook_api: _,
614 0 : branch_name_mappings,
615 0 : generate_local_ssl_certs,
616 0 : object_storage,
617 0 : } = on_disk_config;
618 0 : LocalEnv {
619 0 : base_data_dir: repopath.to_owned(),
620 0 : pg_distrib_dir,
621 0 : neon_distrib_dir,
622 0 : default_tenant_id,
623 0 : private_key_path,
624 0 : public_key_path,
625 0 : broker,
626 0 : storage_controller,
627 0 : pageservers,
628 0 : safekeepers,
629 0 : control_plane_api: control_plane_api.unwrap(),
630 0 : control_plane_hooks_api,
631 0 : branch_name_mappings,
632 0 : generate_local_ssl_certs,
633 0 : object_storage,
634 0 : }
635 0 : };
636 0 :
637 0 : // The source of truth for pageserver configuration is the pageserver.toml.
638 0 : assert!(
639 0 : env.pageservers.is_empty(),
640 0 : "we ensure this during deserialization"
641 : );
642 0 : env.pageservers = {
643 0 : let iter = std::fs::read_dir(repopath).context("open dir")?;
644 0 : let mut pageservers = Vec::new();
645 0 : for res in iter {
646 0 : let dentry = res?;
647 : const PREFIX: &str = "pageserver_";
648 0 : let dentry_name = dentry
649 0 : .file_name()
650 0 : .into_string()
651 0 : .ok()
652 0 : .with_context(|| format!("non-utf8 dentry: {:?}", dentry.path()))
653 0 : .unwrap();
654 0 : if !dentry_name.starts_with(PREFIX) {
655 0 : continue;
656 0 : }
657 0 : if !dentry.file_type().context("determine file type")?.is_dir() {
658 0 : anyhow::bail!("expected a directory, got {:?}", dentry.path());
659 0 : }
660 0 : let id = dentry_name[PREFIX.len()..]
661 0 : .parse::<NodeId>()
662 0 : .with_context(|| format!("parse id from {:?}", dentry.path()))?;
663 : // TODO(christian): use pageserver_api::config::ConfigToml (PR #7656)
664 0 : #[derive(serde::Serialize, serde::Deserialize)]
665 : // (allow unknown fields, unlike PageServerConf)
666 : struct PageserverConfigTomlSubset {
667 : listen_pg_addr: String,
668 : listen_http_addr: String,
669 : listen_https_addr: Option<String>,
670 : pg_auth_type: AuthType,
671 : http_auth_type: AuthType,
672 : #[serde(default)]
673 : no_sync: bool,
674 : }
675 0 : let config_toml_path = dentry.path().join("pageserver.toml");
676 0 : let config_toml: PageserverConfigTomlSubset = toml_edit::de::from_str(
677 0 : &std::fs::read_to_string(&config_toml_path)
678 0 : .with_context(|| format!("read {:?}", config_toml_path))?,
679 : )
680 0 : .context("parse pageserver.toml")?;
681 0 : let identity_toml_path = dentry.path().join("identity.toml");
682 0 : #[derive(serde::Serialize, serde::Deserialize)]
683 : struct IdentityTomlSubset {
684 : id: NodeId,
685 : }
686 0 : let identity_toml: IdentityTomlSubset = toml_edit::de::from_str(
687 0 : &std::fs::read_to_string(&identity_toml_path)
688 0 : .with_context(|| format!("read {:?}", identity_toml_path))?,
689 : )
690 0 : .context("parse identity.toml")?;
691 : let PageserverConfigTomlSubset {
692 0 : listen_pg_addr,
693 0 : listen_http_addr,
694 0 : listen_https_addr,
695 0 : pg_auth_type,
696 0 : http_auth_type,
697 0 : no_sync,
698 0 : } = config_toml;
699 0 : let IdentityTomlSubset {
700 0 : id: identity_toml_id,
701 0 : } = identity_toml;
702 0 : let conf = PageServerConf {
703 : id: {
704 0 : anyhow::ensure!(
705 0 : identity_toml_id == id,
706 0 : "id mismatch: identity.toml:id={identity_toml_id} pageserver_(.*) id={id}",
707 : );
708 0 : id
709 0 : },
710 0 : listen_pg_addr,
711 0 : listen_http_addr,
712 0 : listen_https_addr,
713 0 : pg_auth_type,
714 0 : http_auth_type,
715 0 : no_sync,
716 0 : };
717 0 : pageservers.push(conf);
718 : }
719 0 : pageservers
720 0 : };
721 0 :
722 0 : Ok(env)
723 0 : }
724 :
725 0 : pub fn persist_config(&self) -> anyhow::Result<()> {
726 0 : Self::persist_config_impl(
727 0 : &self.base_data_dir,
728 0 : &OnDiskConfig {
729 0 : pg_distrib_dir: self.pg_distrib_dir.clone(),
730 0 : neon_distrib_dir: self.neon_distrib_dir.clone(),
731 0 : default_tenant_id: self.default_tenant_id,
732 0 : private_key_path: self.private_key_path.clone(),
733 0 : public_key_path: self.public_key_path.clone(),
734 0 : broker: self.broker.clone(),
735 0 : storage_controller: self.storage_controller.clone(),
736 0 : pageservers: vec![], // it's skip_serializing anyway
737 0 : safekeepers: self.safekeepers.clone(),
738 0 : control_plane_api: Some(self.control_plane_api.clone()),
739 0 : control_plane_hooks_api: self.control_plane_hooks_api.clone(),
740 0 : control_plane_compute_hook_api: None,
741 0 : branch_name_mappings: self.branch_name_mappings.clone(),
742 0 : generate_local_ssl_certs: self.generate_local_ssl_certs,
743 0 : object_storage: self.object_storage.clone(),
744 0 : },
745 0 : )
746 0 : }
747 :
748 0 : pub fn persist_config_impl(base_path: &Path, config: &OnDiskConfig) -> anyhow::Result<()> {
749 0 : let conf_content = &toml::to_string_pretty(config)?;
750 0 : let target_config_path = base_path.join("config");
751 0 : fs::write(&target_config_path, conf_content).with_context(|| {
752 0 : format!(
753 0 : "Failed to write config file into path '{}'",
754 0 : target_config_path.display()
755 0 : )
756 0 : })
757 0 : }
758 :
759 : // this function is used only for testing purposes in CLI e g generate tokens during init
760 0 : pub fn generate_auth_token<S: Serialize>(&self, claims: &S) -> anyhow::Result<String> {
761 0 : let private_key_path = self.get_private_key_path();
762 0 : let key_data = fs::read(private_key_path)?;
763 0 : encode_from_key_file(claims, &key_data)
764 0 : }
765 :
766 0 : pub fn get_private_key_path(&self) -> PathBuf {
767 0 : if self.private_key_path.is_absolute() {
768 0 : self.private_key_path.to_path_buf()
769 : } else {
770 0 : self.base_data_dir.join(&self.private_key_path)
771 : }
772 0 : }
773 :
774 : /// Materialize the [`NeonLocalInitConf`] to disk. Called during [`neon_local init`].
775 0 : pub fn init(conf: NeonLocalInitConf, force: &InitForceMode) -> anyhow::Result<()> {
776 0 : let base_path = base_path();
777 0 : assert_ne!(base_path, Path::new(""));
778 0 : let base_path = &base_path;
779 0 :
780 0 : // create base_path dir
781 0 : if base_path.exists() {
782 0 : match force {
783 : InitForceMode::MustNotExist => {
784 0 : bail!(
785 0 : "directory '{}' already exists. Perhaps already initialized?",
786 0 : base_path.display()
787 0 : );
788 : }
789 : InitForceMode::EmptyDirOk => {
790 0 : if let Some(res) = std::fs::read_dir(base_path)?.next() {
791 0 : res.context("check if directory is empty")?;
792 0 : anyhow::bail!("directory not empty: {base_path:?}");
793 0 : }
794 : }
795 : InitForceMode::RemoveAllContents => {
796 0 : println!("removing all contents of '{}'", base_path.display());
797 : // instead of directly calling `remove_dir_all`, we keep the original dir but removing
798 : // all contents inside. This helps if the developer symbol links another directory (i.e.,
799 : // S3 local SSD) to the `.neon` base directory.
800 0 : for entry in std::fs::read_dir(base_path)? {
801 0 : let entry = entry?;
802 0 : let path = entry.path();
803 0 : if path.is_dir() {
804 0 : fs::remove_dir_all(&path)?;
805 : } else {
806 0 : fs::remove_file(&path)?;
807 : }
808 : }
809 : }
810 : }
811 0 : }
812 0 : if !base_path.exists() {
813 0 : fs::create_dir(base_path)?;
814 0 : }
815 :
816 : let NeonLocalInitConf {
817 0 : pg_distrib_dir,
818 0 : neon_distrib_dir,
819 0 : default_tenant_id,
820 0 : broker,
821 0 : storage_controller,
822 0 : pageservers,
823 0 : safekeepers,
824 0 : control_plane_api,
825 0 : generate_local_ssl_certs,
826 0 : control_plane_hooks_api,
827 0 : object_storage,
828 0 : } = conf;
829 0 :
830 0 : // Find postgres binaries.
831 0 : // Follow POSTGRES_DISTRIB_DIR if set, otherwise look in "pg_install".
832 0 : // Note that later in the code we assume, that distrib dirs follow the same pattern
833 0 : // for all postgres versions.
834 0 : let pg_distrib_dir = pg_distrib_dir.unwrap_or_else(|| {
835 0 : if let Some(postgres_bin) = env::var_os("POSTGRES_DISTRIB_DIR") {
836 0 : postgres_bin.into()
837 : } else {
838 0 : let cwd = env::current_dir().unwrap();
839 0 : cwd.join("pg_install")
840 : }
841 0 : });
842 0 :
843 0 : // Find neon binaries.
844 0 : let neon_distrib_dir = neon_distrib_dir
845 0 : .unwrap_or_else(|| env::current_exe().unwrap().parent().unwrap().to_owned());
846 0 :
847 0 : // Generate keypair for JWT.
848 0 : //
849 0 : // The keypair is only needed if authentication is enabled in any of the
850 0 : // components. For convenience, we generate the keypair even if authentication
851 0 : // is not enabled, so that you can easily enable it after the initialization
852 0 : // step.
853 0 : generate_auth_keys(
854 0 : base_path.join("auth_private_key.pem").as_path(),
855 0 : base_path.join("auth_public_key.pem").as_path(),
856 0 : )
857 0 : .context("generate auth keys")?;
858 0 : let private_key_path = PathBuf::from("auth_private_key.pem");
859 0 : let public_key_path = PathBuf::from("auth_public_key.pem");
860 0 :
861 0 : // create the runtime type because the remaining initialization code below needs
862 0 : // a LocalEnv instance op operation
863 0 : // TODO: refactor to avoid this, LocalEnv should only be constructed from on-disk state
864 0 : let env = LocalEnv {
865 0 : base_data_dir: base_path.clone(),
866 0 : pg_distrib_dir,
867 0 : neon_distrib_dir,
868 0 : default_tenant_id: Some(default_tenant_id),
869 0 : private_key_path,
870 0 : public_key_path,
871 0 : broker,
872 0 : storage_controller: storage_controller.unwrap_or_default(),
873 0 : pageservers: pageservers.iter().map(Into::into).collect(),
874 0 : safekeepers,
875 0 : control_plane_api: control_plane_api.unwrap(),
876 0 : control_plane_hooks_api,
877 0 : branch_name_mappings: Default::default(),
878 0 : generate_local_ssl_certs,
879 0 : object_storage,
880 0 : };
881 0 :
882 0 : if generate_local_ssl_certs {
883 0 : env.generate_ssl_ca_cert()?;
884 0 : }
885 :
886 : // create endpoints dir
887 0 : fs::create_dir_all(env.endpoints_path())?;
888 :
889 : // create safekeeper dirs
890 0 : for safekeeper in &env.safekeepers {
891 0 : fs::create_dir_all(SafekeeperNode::datadir_path_by_id(&env, safekeeper.id))?;
892 0 : SafekeeperNode::from_env(&env, safekeeper)
893 0 : .initialize()
894 0 : .context("safekeeper init failed")?;
895 : }
896 :
897 : // initialize pageserver state
898 0 : for (i, ps) in pageservers.into_iter().enumerate() {
899 0 : let runtime_ps = &env.pageservers[i];
900 0 : assert_eq!(&PageServerConf::from(&ps), runtime_ps);
901 0 : fs::create_dir(env.pageserver_data_dir(ps.id))?;
902 0 : PageServerNode::from_env(&env, runtime_ps)
903 0 : .initialize(ps)
904 0 : .context("pageserver init failed")?;
905 : }
906 :
907 0 : ObjectStorage::from_env(&env)
908 0 : .init()
909 0 : .context("object storage init failed")?;
910 :
911 : // setup remote remote location for default LocalFs remote storage
912 0 : std::fs::create_dir_all(env.base_data_dir.join(PAGESERVER_REMOTE_STORAGE_DIR))?;
913 0 : std::fs::create_dir_all(env.base_data_dir.join(OBJECT_STORAGE_REMOTE_STORAGE_DIR))?;
914 :
915 0 : env.persist_config()
916 0 : }
917 : }
918 :
919 0 : pub fn base_path() -> PathBuf {
920 0 : let path = match std::env::var_os("NEON_REPO_DIR") {
921 0 : Some(val) => {
922 0 : let path = PathBuf::from(val);
923 0 : if !path.is_absolute() {
924 : // repeat the env var in the error because our default is always absolute
925 0 : panic!("NEON_REPO_DIR must be an absolute path, got {path:?}");
926 0 : }
927 0 : path
928 : }
929 : None => {
930 0 : let pwd = std::env::current_dir()
931 0 : // technically this can fail but it's quite unlikeley
932 0 : .expect("determine current directory");
933 0 : let pwd_abs = pwd.canonicalize().expect("canonicalize current directory");
934 0 : pwd_abs.join(".neon")
935 : }
936 : };
937 0 : assert!(path.is_absolute());
938 0 : path
939 0 : }
940 :
941 : /// Generate a public/private key pair for JWT authentication
942 0 : fn generate_auth_keys(private_key_path: &Path, public_key_path: &Path) -> anyhow::Result<()> {
943 : // Generate the key pair
944 : //
945 : // openssl genpkey -algorithm ed25519 -out auth_private_key.pem
946 0 : let keygen_output = Command::new("openssl")
947 0 : .arg("genpkey")
948 0 : .args(["-algorithm", "ed25519"])
949 0 : .args(["-out", private_key_path.to_str().unwrap()])
950 0 : .stdout(Stdio::null())
951 0 : .output()
952 0 : .context("failed to generate auth private key")?;
953 0 : if !keygen_output.status.success() {
954 0 : bail!(
955 0 : "openssl failed: '{}'",
956 0 : String::from_utf8_lossy(&keygen_output.stderr)
957 0 : );
958 0 : }
959 : // Extract the public key from the private key file
960 : //
961 : // openssl pkey -in auth_private_key.pem -pubout -out auth_public_key.pem
962 0 : let keygen_output = Command::new("openssl")
963 0 : .arg("pkey")
964 0 : .args(["-in", private_key_path.to_str().unwrap()])
965 0 : .arg("-pubout")
966 0 : .args(["-out", public_key_path.to_str().unwrap()])
967 0 : .output()
968 0 : .context("failed to extract public key from private key")?;
969 0 : if !keygen_output.status.success() {
970 0 : bail!(
971 0 : "openssl failed: '{}'",
972 0 : String::from_utf8_lossy(&keygen_output.stderr)
973 0 : );
974 0 : }
975 0 : Ok(())
976 0 : }
977 :
978 0 : fn generate_ssl_ca_cert(cert_path: &Path, key_path: &Path) -> anyhow::Result<()> {
979 : // openssl req -x509 -newkey rsa:2048 -nodes -subj "/CN=Neon Local CA" -days 36500 \
980 : // -out rootCA.crt -keyout rootCA.key
981 0 : let keygen_output = Command::new("openssl")
982 0 : .args([
983 0 : "req", "-x509", "-newkey", "rsa:2048", "-nodes", "-days", "36500",
984 0 : ])
985 0 : .args(["-subj", "/CN=Neon Local CA"])
986 0 : .args(["-out", cert_path.to_str().unwrap()])
987 0 : .args(["-keyout", key_path.to_str().unwrap()])
988 0 : .output()
989 0 : .context("failed to generate CA certificate")?;
990 0 : if !keygen_output.status.success() {
991 0 : bail!(
992 0 : "openssl failed: '{}'",
993 0 : String::from_utf8_lossy(&keygen_output.stderr)
994 0 : );
995 0 : }
996 0 : Ok(())
997 0 : }
998 :
999 0 : fn generate_ssl_cert(
1000 0 : cert_path: &Path,
1001 0 : key_path: &Path,
1002 0 : ca_cert_path: &Path,
1003 0 : ca_key_path: &Path,
1004 0 : ) -> anyhow::Result<()> {
1005 0 : // Generate Certificate Signing Request (CSR).
1006 0 : let mut csr_path = cert_path.to_path_buf();
1007 0 : csr_path.set_extension(".csr");
1008 :
1009 : // openssl req -new -nodes -newkey rsa:2048 -keyout server.key -out server.csr \
1010 : // -subj "/CN=localhost" -addext "subjectAltName=DNS:localhost,IP:127.0.0.1"
1011 0 : let keygen_output = Command::new("openssl")
1012 0 : .args(["req", "-new", "-nodes"])
1013 0 : .args(["-newkey", "rsa:2048"])
1014 0 : .args(["-subj", "/CN=localhost"])
1015 0 : .args(["-addext", "subjectAltName=DNS:localhost,IP:127.0.0.1"])
1016 0 : .args(["-keyout", key_path.to_str().unwrap()])
1017 0 : .args(["-out", csr_path.to_str().unwrap()])
1018 0 : .output()
1019 0 : .context("failed to generate CSR")?;
1020 0 : if !keygen_output.status.success() {
1021 0 : bail!(
1022 0 : "openssl failed: '{}'",
1023 0 : String::from_utf8_lossy(&keygen_output.stderr)
1024 0 : );
1025 0 : }
1026 :
1027 : // Sign CSR with CA key.
1028 : //
1029 : // openssl x509 -req -in server.csr -CA rootCA.crt -CAkey rootCA.key -CAcreateserial \
1030 : // -out server.crt -days 36500 -copy_extensions copyall
1031 0 : let keygen_output = Command::new("openssl")
1032 0 : .args(["x509", "-req"])
1033 0 : .args(["-in", csr_path.to_str().unwrap()])
1034 0 : .args(["-CA", ca_cert_path.to_str().unwrap()])
1035 0 : .args(["-CAkey", ca_key_path.to_str().unwrap()])
1036 0 : .arg("-CAcreateserial")
1037 0 : .args(["-out", cert_path.to_str().unwrap()])
1038 0 : .args(["-days", "36500"])
1039 0 : .args(["-copy_extensions", "copyall"])
1040 0 : .output()
1041 0 : .context("failed to sign CSR")?;
1042 0 : if !keygen_output.status.success() {
1043 0 : bail!(
1044 0 : "openssl failed: '{}'",
1045 0 : String::from_utf8_lossy(&keygen_output.stderr)
1046 0 : );
1047 0 : }
1048 0 :
1049 0 : // Remove CSR file as it's not needed anymore.
1050 0 : fs::remove_file(csr_path)?;
1051 :
1052 0 : Ok(())
1053 0 : }
|