Line data Source code
//! This module is responsible for locating and loading paths in a local setup.
//!
//! It also provides an `init` method, which acts as a stub for a proper installation
//! script that will use local paths.
5 :
6 : use std::collections::HashMap;
7 : use std::net::{IpAddr, Ipv4Addr, SocketAddr};
8 : use std::path::{Path, PathBuf};
9 : use std::process::{Command, Stdio};
10 : use std::time::Duration;
11 : use std::{env, fs};
12 :
13 : use anyhow::{Context, bail};
14 : use clap::ValueEnum;
15 : use postgres_backend::AuthType;
16 : use reqwest::Url;
17 : use serde::{Deserialize, Serialize};
18 : use utils::auth::{Claims, encode_from_key_file};
19 : use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
20 :
21 : use crate::pageserver::{PAGESERVER_REMOTE_STORAGE_DIR, PageServerNode};
22 : use crate::safekeeper::SafekeeperNode;
23 :
/// Postgres major version used by default.
pub const DEFAULT_PG_VERSION: u32 = 16;
25 :
26 : //
27 : // This data structures represents neon_local CLI config
28 : //
29 : // It is deserialized from the .neon/config file, or the config file passed
30 : // to 'neon_local init --config=<path>' option. See control_plane/simple.conf for
31 : // an example.
32 : //
33 : #[derive(PartialEq, Eq, Clone, Debug)]
34 : pub struct LocalEnv {
35 : // Base directory for all the nodes (the pageserver, safekeepers and
36 : // compute endpoints).
37 : //
38 : // This is not stored in the config file. Rather, this is the path where the
39 : // config file itself is. It is read from the NEON_REPO_DIR env variable which
40 : // must be an absolute path. If the env var is not set, $PWD/.neon is used.
41 : pub base_data_dir: PathBuf,
42 :
43 : // Path to postgres distribution. It's expected that "bin", "include",
44 : // "lib", "share" from postgres distribution are there. If at some point
45 : // in time we will be able to run against vanilla postgres we may split that
46 : // to four separate paths and match OS-specific installation layout.
47 : pub pg_distrib_dir: PathBuf,
48 :
49 : // Path to pageserver binary.
50 : pub neon_distrib_dir: PathBuf,
51 :
52 : // Default tenant ID to use with the 'neon_local' command line utility, when
53 : // --tenant_id is not explicitly specified.
54 : pub default_tenant_id: Option<TenantId>,
55 :
56 : // used to issue tokens during e.g pg start
57 : pub private_key_path: PathBuf,
58 :
59 : pub broker: NeonBroker,
60 :
61 : // Configuration for the storage controller (1 per neon_local environment)
62 : pub storage_controller: NeonStorageControllerConf,
63 :
64 : /// This Vec must always contain at least one pageserver
65 : /// Populdated by [`Self::load_config`] from the individual `pageserver.toml`s.
66 : /// NB: not used anymore except for informing users that they need to change their `.neon/config`.
67 : pub pageservers: Vec<PageServerConf>,
68 :
69 : pub safekeepers: Vec<SafekeeperConf>,
70 :
71 : // Control plane upcall API for pageserver: if None, we will not run storage_controller If set, this will
72 : // be propagated into each pageserver's configuration.
73 : pub control_plane_api: Url,
74 :
75 : // Control plane upcall API for storage controller. If set, this will be propagated into the
76 : // storage controller's configuration.
77 : pub control_plane_compute_hook_api: Option<Url>,
78 :
79 : /// Keep human-readable aliases in memory (and persist them to config), to hide ZId hex strings from the user.
80 : // A `HashMap<String, HashMap<TenantId, TimelineId>>` would be more appropriate here,
81 : // but deserialization into a generic toml object as `toml::Value::try_from` fails with an error.
82 : // https://toml.io/en/v1.0.0 does not contain a concept of "a table inside another table".
83 : pub branch_name_mappings: HashMap<String, Vec<(TenantId, TimelineId)>>,
84 : }
85 :
86 : /// On-disk state stored in `.neon/config`.
87 0 : #[derive(PartialEq, Eq, Clone, Debug, Default, Serialize, Deserialize)]
88 : #[serde(default, deny_unknown_fields)]
89 : pub struct OnDiskConfig {
90 : pub pg_distrib_dir: PathBuf,
91 : pub neon_distrib_dir: PathBuf,
92 : pub default_tenant_id: Option<TenantId>,
93 : pub private_key_path: PathBuf,
94 : pub broker: NeonBroker,
95 : pub storage_controller: NeonStorageControllerConf,
96 : #[serde(
97 : skip_serializing,
98 : deserialize_with = "fail_if_pageservers_field_specified"
99 : )]
100 : pub pageservers: Vec<PageServerConf>,
101 : pub safekeepers: Vec<SafekeeperConf>,
102 : pub control_plane_api: Option<Url>,
103 : pub control_plane_compute_hook_api: Option<Url>,
104 : branch_name_mappings: HashMap<String, Vec<(TenantId, TimelineId)>>,
105 : }
106 :
107 0 : fn fail_if_pageservers_field_specified<'de, D>(_: D) -> Result<Vec<PageServerConf>, D::Error>
108 0 : where
109 0 : D: serde::Deserializer<'de>,
110 0 : {
111 0 : Err(serde::de::Error::custom(
112 0 : "The 'pageservers' field is no longer used; pageserver.toml is now authoritative; \
113 0 : Please remove the `pageservers` from your .neon/config.",
114 0 : ))
115 0 : }
116 :
117 : /// The description of the neon_local env to be initialized by `neon_local init --config`.
118 0 : #[derive(Clone, Debug, Deserialize)]
119 : #[serde(deny_unknown_fields)]
120 : pub struct NeonLocalInitConf {
121 : // TODO: do we need this? Seems unused
122 : pub pg_distrib_dir: Option<PathBuf>,
123 : // TODO: do we need this? Seems unused
124 : pub neon_distrib_dir: Option<PathBuf>,
125 : pub default_tenant_id: TenantId,
126 : pub broker: NeonBroker,
127 : pub storage_controller: Option<NeonStorageControllerConf>,
128 : pub pageservers: Vec<NeonLocalInitPageserverConf>,
129 : pub safekeepers: Vec<SafekeeperConf>,
130 : pub control_plane_api: Option<Url>,
131 : pub control_plane_compute_hook_api: Option<Option<Url>>,
132 : }
133 :
134 : /// Broker config for cluster internal communication.
135 0 : #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
136 : #[serde(default)]
137 : pub struct NeonBroker {
138 : /// Broker listen address for storage nodes coordination, e.g. '127.0.0.1:50051'.
139 : pub listen_addr: SocketAddr,
140 : }
141 :
142 : /// Broker config for cluster internal communication.
143 0 : #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
144 : #[serde(default)]
145 : pub struct NeonStorageControllerConf {
146 : /// Heartbeat timeout before marking a node offline
147 : #[serde(with = "humantime_serde")]
148 : pub max_offline: Duration,
149 :
150 : #[serde(with = "humantime_serde")]
151 : pub max_warming_up: Duration,
152 :
153 : pub start_as_candidate: bool,
154 :
155 : /// Database url used when running multiple storage controller instances
156 : pub database_url: Option<SocketAddr>,
157 :
158 : /// Threshold for auto-splitting a tenant into shards
159 : pub split_threshold: Option<u64>,
160 :
161 : pub max_secondary_lag_bytes: Option<u64>,
162 :
163 : #[serde(with = "humantime_serde")]
164 : pub heartbeat_interval: Duration,
165 :
166 : #[serde(with = "humantime_serde")]
167 : pub long_reconcile_threshold: Option<Duration>,
168 :
169 : pub load_safekeepers: bool,
170 : }
171 :
172 : impl NeonStorageControllerConf {
173 : // Use a shorter pageserver unavailability interval than the default to speed up tests.
174 : const DEFAULT_MAX_OFFLINE_INTERVAL: std::time::Duration = std::time::Duration::from_secs(10);
175 :
176 : const DEFAULT_MAX_WARMING_UP_INTERVAL: std::time::Duration = std::time::Duration::from_secs(30);
177 :
178 : // Very tight heartbeat interval to speed up tests
179 : const DEFAULT_HEARTBEAT_INTERVAL: std::time::Duration = std::time::Duration::from_millis(1000);
180 : }
181 :
182 : impl Default for NeonStorageControllerConf {
183 0 : fn default() -> Self {
184 0 : Self {
185 0 : max_offline: Self::DEFAULT_MAX_OFFLINE_INTERVAL,
186 0 : max_warming_up: Self::DEFAULT_MAX_WARMING_UP_INTERVAL,
187 0 : start_as_candidate: false,
188 0 : database_url: None,
189 0 : split_threshold: None,
190 0 : max_secondary_lag_bytes: None,
191 0 : heartbeat_interval: Self::DEFAULT_HEARTBEAT_INTERVAL,
192 0 : long_reconcile_threshold: None,
193 0 : load_safekeepers: true,
194 0 : }
195 0 : }
196 : }
197 :
198 : // Dummy Default impl to satisfy Deserialize derive.
199 : impl Default for NeonBroker {
200 0 : fn default() -> Self {
201 0 : NeonBroker {
202 0 : listen_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0),
203 0 : }
204 0 : }
205 : }
206 :
207 : impl NeonBroker {
208 0 : pub fn client_url(&self) -> Url {
209 0 : Url::parse(&format!("http://{}", self.listen_addr)).expect("failed to construct url")
210 0 : }
211 : }
212 :
213 : // neon_local needs to know this subset of pageserver configuration.
214 : // For legacy reasons, this information is duplicated from `pageserver.toml` into `.neon/config`.
215 : // It can get stale if `pageserver.toml` is changed.
216 : // TODO(christian): don't store this at all in `.neon/config`, always load it from `pageserver.toml`
217 0 : #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
218 : #[serde(default, deny_unknown_fields)]
219 : pub struct PageServerConf {
220 : pub id: NodeId,
221 : pub listen_pg_addr: String,
222 : pub listen_http_addr: String,
223 : pub pg_auth_type: AuthType,
224 : pub http_auth_type: AuthType,
225 : pub no_sync: bool,
226 : }
227 :
228 : impl Default for PageServerConf {
229 0 : fn default() -> Self {
230 0 : Self {
231 0 : id: NodeId(0),
232 0 : listen_pg_addr: String::new(),
233 0 : listen_http_addr: String::new(),
234 0 : pg_auth_type: AuthType::Trust,
235 0 : http_auth_type: AuthType::Trust,
236 0 : no_sync: false,
237 0 : }
238 0 : }
239 : }
240 :
241 : /// The toml that can be passed to `neon_local init --config`.
242 : /// This is a subset of the `pageserver.toml` configuration.
243 : // TODO(christian): use pageserver_api::config::ConfigToml (PR #7656)
244 0 : #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
245 : pub struct NeonLocalInitPageserverConf {
246 : pub id: NodeId,
247 : pub listen_pg_addr: String,
248 : pub listen_http_addr: String,
249 : pub pg_auth_type: AuthType,
250 : pub http_auth_type: AuthType,
251 : #[serde(default, skip_serializing_if = "std::ops::Not::not")]
252 : pub no_sync: bool,
253 : #[serde(flatten)]
254 : pub other: HashMap<String, toml::Value>,
255 : }
256 :
257 : impl From<&NeonLocalInitPageserverConf> for PageServerConf {
258 0 : fn from(conf: &NeonLocalInitPageserverConf) -> Self {
259 0 : let NeonLocalInitPageserverConf {
260 0 : id,
261 0 : listen_pg_addr,
262 0 : listen_http_addr,
263 0 : pg_auth_type,
264 0 : http_auth_type,
265 0 : no_sync,
266 0 : other: _,
267 0 : } = conf;
268 0 : Self {
269 0 : id: *id,
270 0 : listen_pg_addr: listen_pg_addr.clone(),
271 0 : listen_http_addr: listen_http_addr.clone(),
272 0 : pg_auth_type: *pg_auth_type,
273 0 : http_auth_type: *http_auth_type,
274 0 : no_sync: *no_sync,
275 0 : }
276 0 : }
277 : }
278 :
279 0 : #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
280 : #[serde(default)]
281 : pub struct SafekeeperConf {
282 : pub id: NodeId,
283 : pub pg_port: u16,
284 : pub pg_tenant_only_port: Option<u16>,
285 : pub http_port: u16,
286 : pub sync: bool,
287 : pub remote_storage: Option<String>,
288 : pub backup_threads: Option<u32>,
289 : pub auth_enabled: bool,
290 : pub listen_addr: Option<String>,
291 : }
292 :
293 : impl Default for SafekeeperConf {
294 0 : fn default() -> Self {
295 0 : Self {
296 0 : id: NodeId(0),
297 0 : pg_port: 0,
298 0 : pg_tenant_only_port: None,
299 0 : http_port: 0,
300 0 : sync: true,
301 0 : remote_storage: None,
302 0 : backup_threads: None,
303 0 : auth_enabled: false,
304 0 : listen_addr: None,
305 0 : }
306 0 : }
307 : }
308 :
/// How `LocalEnv::init` treats a base directory that already exists.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum InitForceMode {
    /// Fail if the directory exists.
    MustNotExist,
    /// Accept an existing directory as long as it is empty.
    EmptyDirOk,
    /// Keep the directory itself but delete everything inside it.
    RemoveAllContents,
}
315 :
316 : impl ValueEnum for InitForceMode {
317 0 : fn value_variants<'a>() -> &'a [Self] {
318 0 : &[
319 0 : Self::MustNotExist,
320 0 : Self::EmptyDirOk,
321 0 : Self::RemoveAllContents,
322 0 : ]
323 0 : }
324 :
325 0 : fn to_possible_value(&self) -> Option<clap::builder::PossibleValue> {
326 0 : Some(clap::builder::PossibleValue::new(match self {
327 0 : InitForceMode::MustNotExist => "must-not-exist",
328 0 : InitForceMode::EmptyDirOk => "empty-dir-ok",
329 0 : InitForceMode::RemoveAllContents => "remove-all-contents",
330 : }))
331 0 : }
332 : }
333 :
334 : impl SafekeeperConf {
335 : /// Compute is served by port on which only tenant scoped tokens allowed, if
336 : /// it is configured.
337 0 : pub fn get_compute_port(&self) -> u16 {
338 0 : self.pg_tenant_only_port.unwrap_or(self.pg_port)
339 0 : }
340 : }
341 :
342 : impl LocalEnv {
343 0 : pub fn pg_distrib_dir_raw(&self) -> PathBuf {
344 0 : self.pg_distrib_dir.clone()
345 0 : }
346 :
347 0 : pub fn pg_distrib_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
348 0 : let path = self.pg_distrib_dir.clone();
349 0 :
350 0 : #[allow(clippy::manual_range_patterns)]
351 0 : match pg_version {
352 0 : 14 | 15 | 16 | 17 => Ok(path.join(format!("v{pg_version}"))),
353 0 : _ => bail!("Unsupported postgres version: {}", pg_version),
354 : }
355 0 : }
356 :
357 0 : pub fn pg_dir(&self, pg_version: u32, dir_name: &str) -> anyhow::Result<PathBuf> {
358 0 : Ok(self.pg_distrib_dir(pg_version)?.join(dir_name))
359 0 : }
360 :
361 0 : pub fn pg_bin_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
362 0 : self.pg_dir(pg_version, "bin")
363 0 : }
364 :
365 0 : pub fn pg_lib_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
366 0 : self.pg_dir(pg_version, "lib")
367 0 : }
368 :
369 0 : pub fn pageserver_bin(&self) -> PathBuf {
370 0 : self.neon_distrib_dir.join("pageserver")
371 0 : }
372 :
373 0 : pub fn storage_controller_bin(&self) -> PathBuf {
374 0 : // Irrespective of configuration, storage controller binary is always
375 0 : // run from the same location as neon_local. This means that for compatibility
376 0 : // tests that run old pageserver/safekeeper, they still run latest storage controller.
377 0 : let neon_local_bin_dir = env::current_exe().unwrap().parent().unwrap().to_owned();
378 0 : neon_local_bin_dir.join("storage_controller")
379 0 : }
380 :
381 0 : pub fn safekeeper_bin(&self) -> PathBuf {
382 0 : self.neon_distrib_dir.join("safekeeper")
383 0 : }
384 :
385 0 : pub fn storage_broker_bin(&self) -> PathBuf {
386 0 : self.neon_distrib_dir.join("storage_broker")
387 0 : }
388 :
389 0 : pub fn endpoints_path(&self) -> PathBuf {
390 0 : self.base_data_dir.join("endpoints")
391 0 : }
392 :
393 0 : pub fn pageserver_data_dir(&self, pageserver_id: NodeId) -> PathBuf {
394 0 : self.base_data_dir
395 0 : .join(format!("pageserver_{pageserver_id}"))
396 0 : }
397 :
398 0 : pub fn safekeeper_data_dir(&self, data_dir_name: &str) -> PathBuf {
399 0 : self.base_data_dir.join("safekeepers").join(data_dir_name)
400 0 : }
401 :
402 0 : pub fn get_pageserver_conf(&self, id: NodeId) -> anyhow::Result<&PageServerConf> {
403 0 : if let Some(conf) = self.pageservers.iter().find(|node| node.id == id) {
404 0 : Ok(conf)
405 : } else {
406 0 : let have_ids = self
407 0 : .pageservers
408 0 : .iter()
409 0 : .map(|node| format!("{}:{}", node.id, node.listen_http_addr))
410 0 : .collect::<Vec<_>>();
411 0 : let joined = have_ids.join(",");
412 0 : bail!("could not find pageserver {id}, have ids {joined}")
413 : }
414 0 : }
415 :
416 : /// Inspect the base data directory and extract the instance id and instance directory path
417 : /// for all storage controller instances
418 0 : pub async fn storage_controller_instances(&self) -> std::io::Result<Vec<(u8, PathBuf)>> {
419 0 : let mut instances = Vec::default();
420 :
421 0 : let dir = std::fs::read_dir(self.base_data_dir.clone())?;
422 0 : for dentry in dir {
423 0 : let dentry = dentry?;
424 0 : let is_dir = dentry.metadata()?.is_dir();
425 0 : let filename = dentry.file_name().into_string().unwrap();
426 0 : let parsed_instance_id = match filename.strip_prefix("storage_controller_") {
427 0 : Some(suffix) => suffix.parse::<u8>().ok(),
428 0 : None => None,
429 : };
430 :
431 0 : let is_instance_dir = is_dir && parsed_instance_id.is_some();
432 :
433 0 : if !is_instance_dir {
434 0 : continue;
435 0 : }
436 0 :
437 0 : instances.push((
438 0 : parsed_instance_id.expect("Checked previously"),
439 0 : dentry.path(),
440 0 : ));
441 : }
442 :
443 0 : Ok(instances)
444 0 : }
445 :
446 0 : pub fn register_branch_mapping(
447 0 : &mut self,
448 0 : branch_name: String,
449 0 : tenant_id: TenantId,
450 0 : timeline_id: TimelineId,
451 0 : ) -> anyhow::Result<()> {
452 0 : let existing_values = self
453 0 : .branch_name_mappings
454 0 : .entry(branch_name.clone())
455 0 : .or_default();
456 0 :
457 0 : let existing_ids = existing_values
458 0 : .iter()
459 0 : .find(|(existing_tenant_id, _)| existing_tenant_id == &tenant_id);
460 :
461 0 : if let Some((_, old_timeline_id)) = existing_ids {
462 0 : if old_timeline_id == &timeline_id {
463 0 : Ok(())
464 : } else {
465 0 : bail!(
466 0 : "branch '{branch_name}' is already mapped to timeline {old_timeline_id}, cannot map to another timeline {timeline_id}"
467 0 : );
468 : }
469 : } else {
470 0 : existing_values.push((tenant_id, timeline_id));
471 0 : Ok(())
472 : }
473 0 : }
474 :
475 0 : pub fn get_branch_timeline_id(
476 0 : &self,
477 0 : branch_name: &str,
478 0 : tenant_id: TenantId,
479 0 : ) -> Option<TimelineId> {
480 0 : self.branch_name_mappings
481 0 : .get(branch_name)?
482 0 : .iter()
483 0 : .find(|(mapped_tenant_id, _)| mapped_tenant_id == &tenant_id)
484 0 : .map(|&(_, timeline_id)| timeline_id)
485 0 : }
486 :
487 0 : pub fn timeline_name_mappings(&self) -> HashMap<TenantTimelineId, String> {
488 0 : self.branch_name_mappings
489 0 : .iter()
490 0 : .flat_map(|(name, tenant_timelines)| {
491 0 : tenant_timelines.iter().map(|&(tenant_id, timeline_id)| {
492 0 : (TenantTimelineId::new(tenant_id, timeline_id), name.clone())
493 0 : })
494 0 : })
495 0 : .collect()
496 0 : }
497 :
498 : /// Construct `Self` from on-disk state.
499 0 : pub fn load_config(repopath: &Path) -> anyhow::Result<Self> {
500 0 : if !repopath.exists() {
501 0 : bail!(
502 0 : "Neon config is not found in {}. You need to run 'neon_local init' first",
503 0 : repopath.to_str().unwrap()
504 0 : );
505 0 : }
506 :
507 : // TODO: check that it looks like a neon repository
508 :
509 : // load and parse file
510 0 : let config_file_contents = fs::read_to_string(repopath.join("config"))?;
511 0 : let on_disk_config: OnDiskConfig = toml::from_str(config_file_contents.as_str())?;
512 0 : let mut env = {
513 0 : let OnDiskConfig {
514 0 : pg_distrib_dir,
515 0 : neon_distrib_dir,
516 0 : default_tenant_id,
517 0 : private_key_path,
518 0 : broker,
519 0 : storage_controller,
520 0 : pageservers,
521 0 : safekeepers,
522 0 : control_plane_api,
523 0 : control_plane_compute_hook_api,
524 0 : branch_name_mappings,
525 0 : } = on_disk_config;
526 0 : LocalEnv {
527 0 : base_data_dir: repopath.to_owned(),
528 0 : pg_distrib_dir,
529 0 : neon_distrib_dir,
530 0 : default_tenant_id,
531 0 : private_key_path,
532 0 : broker,
533 0 : storage_controller,
534 0 : pageservers,
535 0 : safekeepers,
536 0 : control_plane_api: control_plane_api.unwrap(),
537 0 : control_plane_compute_hook_api,
538 0 : branch_name_mappings,
539 0 : }
540 0 : };
541 0 :
542 0 : // The source of truth for pageserver configuration is the pageserver.toml.
543 0 : assert!(
544 0 : env.pageservers.is_empty(),
545 0 : "we ensure this during deserialization"
546 : );
547 0 : env.pageservers = {
548 0 : let iter = std::fs::read_dir(repopath).context("open dir")?;
549 0 : let mut pageservers = Vec::new();
550 0 : for res in iter {
551 0 : let dentry = res?;
552 : const PREFIX: &str = "pageserver_";
553 0 : let dentry_name = dentry
554 0 : .file_name()
555 0 : .into_string()
556 0 : .ok()
557 0 : .with_context(|| format!("non-utf8 dentry: {:?}", dentry.path()))
558 0 : .unwrap();
559 0 : if !dentry_name.starts_with(PREFIX) {
560 0 : continue;
561 0 : }
562 0 : if !dentry.file_type().context("determine file type")?.is_dir() {
563 0 : anyhow::bail!("expected a directory, got {:?}", dentry.path());
564 0 : }
565 0 : let id = dentry_name[PREFIX.len()..]
566 0 : .parse::<NodeId>()
567 0 : .with_context(|| format!("parse id from {:?}", dentry.path()))?;
568 : // TODO(christian): use pageserver_api::config::ConfigToml (PR #7656)
569 0 : #[derive(serde::Serialize, serde::Deserialize)]
570 : // (allow unknown fields, unlike PageServerConf)
571 : struct PageserverConfigTomlSubset {
572 : listen_pg_addr: String,
573 : listen_http_addr: String,
574 : pg_auth_type: AuthType,
575 : http_auth_type: AuthType,
576 : #[serde(default)]
577 : no_sync: bool,
578 : }
579 0 : let config_toml_path = dentry.path().join("pageserver.toml");
580 0 : let config_toml: PageserverConfigTomlSubset = toml_edit::de::from_str(
581 0 : &std::fs::read_to_string(&config_toml_path)
582 0 : .with_context(|| format!("read {:?}", config_toml_path))?,
583 : )
584 0 : .context("parse pageserver.toml")?;
585 0 : let identity_toml_path = dentry.path().join("identity.toml");
586 0 : #[derive(serde::Serialize, serde::Deserialize)]
587 : struct IdentityTomlSubset {
588 : id: NodeId,
589 : }
590 0 : let identity_toml: IdentityTomlSubset = toml_edit::de::from_str(
591 0 : &std::fs::read_to_string(&identity_toml_path)
592 0 : .with_context(|| format!("read {:?}", identity_toml_path))?,
593 : )
594 0 : .context("parse identity.toml")?;
595 : let PageserverConfigTomlSubset {
596 0 : listen_pg_addr,
597 0 : listen_http_addr,
598 0 : pg_auth_type,
599 0 : http_auth_type,
600 0 : no_sync,
601 0 : } = config_toml;
602 0 : let IdentityTomlSubset {
603 0 : id: identity_toml_id,
604 0 : } = identity_toml;
605 0 : let conf = PageServerConf {
606 : id: {
607 0 : anyhow::ensure!(
608 0 : identity_toml_id == id,
609 0 : "id mismatch: identity.toml:id={identity_toml_id} pageserver_(.*) id={id}",
610 : );
611 0 : id
612 0 : },
613 0 : listen_pg_addr,
614 0 : listen_http_addr,
615 0 : pg_auth_type,
616 0 : http_auth_type,
617 0 : no_sync,
618 0 : };
619 0 : pageservers.push(conf);
620 : }
621 0 : pageservers
622 0 : };
623 0 :
624 0 : Ok(env)
625 0 : }
626 :
627 0 : pub fn persist_config(&self) -> anyhow::Result<()> {
628 0 : Self::persist_config_impl(
629 0 : &self.base_data_dir,
630 0 : &OnDiskConfig {
631 0 : pg_distrib_dir: self.pg_distrib_dir.clone(),
632 0 : neon_distrib_dir: self.neon_distrib_dir.clone(),
633 0 : default_tenant_id: self.default_tenant_id,
634 0 : private_key_path: self.private_key_path.clone(),
635 0 : broker: self.broker.clone(),
636 0 : storage_controller: self.storage_controller.clone(),
637 0 : pageservers: vec![], // it's skip_serializing anyway
638 0 : safekeepers: self.safekeepers.clone(),
639 0 : control_plane_api: Some(self.control_plane_api.clone()),
640 0 : control_plane_compute_hook_api: self.control_plane_compute_hook_api.clone(),
641 0 : branch_name_mappings: self.branch_name_mappings.clone(),
642 0 : },
643 0 : )
644 0 : }
645 :
646 0 : pub fn persist_config_impl(base_path: &Path, config: &OnDiskConfig) -> anyhow::Result<()> {
647 0 : let conf_content = &toml::to_string_pretty(config)?;
648 0 : let target_config_path = base_path.join("config");
649 0 : fs::write(&target_config_path, conf_content).with_context(|| {
650 0 : format!(
651 0 : "Failed to write config file into path '{}'",
652 0 : target_config_path.display()
653 0 : )
654 0 : })
655 0 : }
656 :
657 : // this function is used only for testing purposes in CLI e g generate tokens during init
658 0 : pub fn generate_auth_token(&self, claims: &Claims) -> anyhow::Result<String> {
659 0 : let private_key_path = self.get_private_key_path();
660 0 : let key_data = fs::read(private_key_path)?;
661 0 : encode_from_key_file(claims, &key_data)
662 0 : }
663 :
664 0 : pub fn get_private_key_path(&self) -> PathBuf {
665 0 : if self.private_key_path.is_absolute() {
666 0 : self.private_key_path.to_path_buf()
667 : } else {
668 0 : self.base_data_dir.join(&self.private_key_path)
669 : }
670 0 : }
671 :
672 : /// Materialize the [`NeonLocalInitConf`] to disk. Called during [`neon_local init`].
673 0 : pub fn init(conf: NeonLocalInitConf, force: &InitForceMode) -> anyhow::Result<()> {
674 0 : let base_path = base_path();
675 0 : assert_ne!(base_path, Path::new(""));
676 0 : let base_path = &base_path;
677 0 :
678 0 : // create base_path dir
679 0 : if base_path.exists() {
680 0 : match force {
681 : InitForceMode::MustNotExist => {
682 0 : bail!(
683 0 : "directory '{}' already exists. Perhaps already initialized?",
684 0 : base_path.display()
685 0 : );
686 : }
687 : InitForceMode::EmptyDirOk => {
688 0 : if let Some(res) = std::fs::read_dir(base_path)?.next() {
689 0 : res.context("check if directory is empty")?;
690 0 : anyhow::bail!("directory not empty: {base_path:?}");
691 0 : }
692 : }
693 : InitForceMode::RemoveAllContents => {
694 0 : println!("removing all contents of '{}'", base_path.display());
695 : // instead of directly calling `remove_dir_all`, we keep the original dir but removing
696 : // all contents inside. This helps if the developer symbol links another directory (i.e.,
697 : // S3 local SSD) to the `.neon` base directory.
698 0 : for entry in std::fs::read_dir(base_path)? {
699 0 : let entry = entry?;
700 0 : let path = entry.path();
701 0 : if path.is_dir() {
702 0 : fs::remove_dir_all(&path)?;
703 : } else {
704 0 : fs::remove_file(&path)?;
705 : }
706 : }
707 : }
708 : }
709 0 : }
710 0 : if !base_path.exists() {
711 0 : fs::create_dir(base_path)?;
712 0 : }
713 :
714 : let NeonLocalInitConf {
715 0 : pg_distrib_dir,
716 0 : neon_distrib_dir,
717 0 : default_tenant_id,
718 0 : broker,
719 0 : storage_controller,
720 0 : pageservers,
721 0 : safekeepers,
722 0 : control_plane_api,
723 0 : control_plane_compute_hook_api,
724 0 : } = conf;
725 0 :
726 0 : // Find postgres binaries.
727 0 : // Follow POSTGRES_DISTRIB_DIR if set, otherwise look in "pg_install".
728 0 : // Note that later in the code we assume, that distrib dirs follow the same pattern
729 0 : // for all postgres versions.
730 0 : let pg_distrib_dir = pg_distrib_dir.unwrap_or_else(|| {
731 0 : if let Some(postgres_bin) = env::var_os("POSTGRES_DISTRIB_DIR") {
732 0 : postgres_bin.into()
733 : } else {
734 0 : let cwd = env::current_dir().unwrap();
735 0 : cwd.join("pg_install")
736 : }
737 0 : });
738 0 :
739 0 : // Find neon binaries.
740 0 : let neon_distrib_dir = neon_distrib_dir
741 0 : .unwrap_or_else(|| env::current_exe().unwrap().parent().unwrap().to_owned());
742 0 :
743 0 : // Generate keypair for JWT.
744 0 : //
745 0 : // The keypair is only needed if authentication is enabled in any of the
746 0 : // components. For convenience, we generate the keypair even if authentication
747 0 : // is not enabled, so that you can easily enable it after the initialization
748 0 : // step.
749 0 : generate_auth_keys(
750 0 : base_path.join("auth_private_key.pem").as_path(),
751 0 : base_path.join("auth_public_key.pem").as_path(),
752 0 : )
753 0 : .context("generate auth keys")?;
754 0 : let private_key_path = PathBuf::from("auth_private_key.pem");
755 0 :
756 0 : // create the runtime type because the remaining initialization code below needs
757 0 : // a LocalEnv instance op operation
758 0 : // TODO: refactor to avoid this, LocalEnv should only be constructed from on-disk state
759 0 : let env = LocalEnv {
760 0 : base_data_dir: base_path.clone(),
761 0 : pg_distrib_dir,
762 0 : neon_distrib_dir,
763 0 : default_tenant_id: Some(default_tenant_id),
764 0 : private_key_path,
765 0 : broker,
766 0 : storage_controller: storage_controller.unwrap_or_default(),
767 0 : pageservers: pageservers.iter().map(Into::into).collect(),
768 0 : safekeepers,
769 0 : control_plane_api: control_plane_api.unwrap(),
770 0 : control_plane_compute_hook_api: control_plane_compute_hook_api.unwrap_or_default(),
771 0 : branch_name_mappings: Default::default(),
772 0 : };
773 0 :
774 0 : // create endpoints dir
775 0 : fs::create_dir_all(env.endpoints_path())?;
776 :
777 : // create safekeeper dirs
778 0 : for safekeeper in &env.safekeepers {
779 0 : fs::create_dir_all(SafekeeperNode::datadir_path_by_id(&env, safekeeper.id))?;
780 : }
781 :
782 : // initialize pageserver state
783 0 : for (i, ps) in pageservers.into_iter().enumerate() {
784 0 : let runtime_ps = &env.pageservers[i];
785 0 : assert_eq!(&PageServerConf::from(&ps), runtime_ps);
786 0 : fs::create_dir(env.pageserver_data_dir(ps.id))?;
787 0 : PageServerNode::from_env(&env, runtime_ps)
788 0 : .initialize(ps)
789 0 : .context("pageserver init failed")?;
790 : }
791 :
792 : // setup remote remote location for default LocalFs remote storage
793 0 : std::fs::create_dir_all(env.base_data_dir.join(PAGESERVER_REMOTE_STORAGE_DIR))?;
794 :
795 0 : env.persist_config()
796 0 : }
797 : }
798 :
/// Base directory of the neon_local environment.
///
/// Taken from the `NEON_REPO_DIR` environment variable when it is set;
/// otherwise `.neon` inside the canonicalized current directory.
///
/// # Panics
///
/// Panics if `NEON_REPO_DIR` is set to a non-absolute path, or if the current
/// directory cannot be determined or canonicalized.
pub fn base_path() -> PathBuf {
    let path = if let Some(val) = std::env::var_os("NEON_REPO_DIR") {
        let path = PathBuf::from(val);
        if !path.is_absolute() {
            // repeat the env var in the error because our default is always absolute
            panic!("NEON_REPO_DIR must be an absolute path, got {path:?}");
        }
        path
    } else {
        // technically determining the cwd can fail, but it's quite unlikely
        std::env::current_dir()
            .expect("determine current directory")
            .canonicalize()
            .expect("canonicalize current directory")
            .join(".neon")
    };
    assert!(path.is_absolute());
    path
}
820 :
821 : /// Generate a public/private key pair for JWT authentication
822 0 : fn generate_auth_keys(private_key_path: &Path, public_key_path: &Path) -> anyhow::Result<()> {
823 : // Generate the key pair
824 : //
825 : // openssl genpkey -algorithm ed25519 -out auth_private_key.pem
826 0 : let keygen_output = Command::new("openssl")
827 0 : .arg("genpkey")
828 0 : .args(["-algorithm", "ed25519"])
829 0 : .args(["-out", private_key_path.to_str().unwrap()])
830 0 : .stdout(Stdio::null())
831 0 : .output()
832 0 : .context("failed to generate auth private key")?;
833 0 : if !keygen_output.status.success() {
834 0 : bail!(
835 0 : "openssl failed: '{}'",
836 0 : String::from_utf8_lossy(&keygen_output.stderr)
837 0 : );
838 0 : }
839 : // Extract the public key from the private key file
840 : //
841 : // openssl pkey -in auth_private_key.pem -pubout -out auth_public_key.pem
842 0 : let keygen_output = Command::new("openssl")
843 0 : .arg("pkey")
844 0 : .args(["-in", private_key_path.to_str().unwrap()])
845 0 : .arg("-pubout")
846 0 : .args(["-out", public_key_path.to_str().unwrap()])
847 0 : .output()
848 0 : .context("failed to extract public key from private key")?;
849 0 : if !keygen_output.status.success() {
850 0 : bail!(
851 0 : "openssl failed: '{}'",
852 0 : String::from_utf8_lossy(&keygen_output.stderr)
853 0 : );
854 0 : }
855 0 : Ok(())
856 0 : }
|