TLA Line data Source code
1 : //!
2 : //! `neon_local` is an executable that can be used to create a local
3 : //! Neon environment, for testing purposes. The local environment is
//! quite different from the cloud environment with Kubernetes, but it is
//! easier to work with locally. The python tests in `test_runner`
6 : //! rely on `neon_local` to set up the environment for each test.
7 : //!
8 : use anyhow::{anyhow, bail, Context, Result};
9 : use clap::{value_parser, Arg, ArgAction, ArgMatches, Command};
10 : use compute_api::spec::ComputeMode;
11 : use control_plane::attachment_service::AttachmentService;
12 : use control_plane::endpoint::ComputeControlPlane;
13 : use control_plane::local_env::LocalEnv;
14 : use control_plane::pageserver::{PageServerNode, PAGESERVER_REMOTE_STORAGE_DIR};
15 : use control_plane::safekeeper::SafekeeperNode;
16 : use control_plane::tenant_migration::migrate_tenant;
17 : use control_plane::{broker, local_env};
18 : use pageserver_api::models::TimelineInfo;
19 : use pageserver_api::{
20 : DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
21 : DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT,
22 : };
23 : use postgres_backend::AuthType;
24 : use safekeeper_api::{
25 : DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT,
26 : DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT,
27 : };
28 : use std::collections::{BTreeSet, HashMap};
29 : use std::path::PathBuf;
30 : use std::process::exit;
31 : use std::str::FromStr;
32 : use storage_broker::DEFAULT_LISTEN_ADDR as DEFAULT_BROKER_ADDR;
33 : use utils::{
34 : auth::{Claims, Scope},
35 : id::{NodeId, TenantId, TenantTimelineId, TimelineId},
36 : lsn::Lsn,
37 : project_git_version,
38 : };
39 :
// Default id of a safekeeper node, if not specified on the command line.
// Also used as the id of the single safekeeper in the built-in default config (`default_conf`).
const DEFAULT_SAFEKEEPER_ID: NodeId = NodeId(1);
// Pageserver id used when no pageserver id is given on the command line
// (`get_pageserver`, `endpoint create`/`start`); `default_conf` numbers its
// pageservers consecutively starting from this id.
const DEFAULT_PAGESERVER_ID: NodeId = NodeId(1);
// Branch name registered for a new tenant's initial timeline, and the fallback
// branch for `timeline branch` (ancestor) and `endpoint create`.
const DEFAULT_BRANCH_NAME: &str = "main";
// Project macro; presumably defines a `GIT_VERSION` item with the build's git version — confirm.
project_git_version!(GIT_VERSION);

// Default Postgres major version, as a string.
// NOTE(review): not referenced in the visible part of this file; presumably a clap
// default in `cli()` — confirm.
const DEFAULT_PG_VERSION: &str = "15";

// `control_plane_api` value written into the built-in default config (see `default_conf`).
const DEFAULT_PAGESERVER_CONTROL_PLANE_API: &str = "http://127.0.0.1:1234/";
49 :
50 UBC 0 : fn default_conf(num_pageservers: u16) -> String {
51 0 : let mut template = format!(
52 0 : r#"
53 0 : # Default built-in configuration, defined in main.rs
54 0 : control_plane_api = '{DEFAULT_PAGESERVER_CONTROL_PLANE_API}'
55 0 :
56 0 : [broker]
57 0 : listen_addr = '{DEFAULT_BROKER_ADDR}'
58 0 :
59 0 : [[safekeepers]]
60 0 : id = {DEFAULT_SAFEKEEPER_ID}
61 0 : pg_port = {DEFAULT_SAFEKEEPER_PG_PORT}
62 0 : http_port = {DEFAULT_SAFEKEEPER_HTTP_PORT}
63 0 :
64 0 : "#,
65 0 : );
66 :
67 0 : for i in 0..num_pageservers {
68 0 : let pageserver_id = NodeId(DEFAULT_PAGESERVER_ID.0 + i as u64);
69 0 : let pg_port = DEFAULT_PAGESERVER_PG_PORT + i;
70 0 : let http_port = DEFAULT_PAGESERVER_HTTP_PORT + i;
71 0 :
72 0 : template += &format!(
73 0 : r#"
74 0 : [[pageservers]]
75 0 : id = {pageserver_id}
76 0 : listen_pg_addr = '127.0.0.1:{pg_port}'
77 0 : listen_http_addr = '127.0.0.1:{http_port}'
78 0 : pg_auth_type = '{trust_auth}'
79 0 : http_auth_type = '{trust_auth}'
80 0 : "#,
81 0 : trust_auth = AuthType::Trust,
82 0 : )
83 : }
84 :
85 0 : template
86 0 : }
87 :
///
/// Timelines tree element used as a value in the HashMap.
///
/// `print_timelines_tree` builds one element per timeline (keyed by `TimelineId`),
/// and `print_timeline` walks them recursively through `children`.
///
struct TimelineTreeEl {
    /// `TimelineInfo` received from the `pageserver` via the `timeline_list` http API call.
    pub info: TimelineInfo,
    /// Name, recovered from neon config mappings
    /// (`None` when no branch name is mapped to this tenant/timeline pair).
    pub name: Option<String>,
    /// Holds all direct children of this timeline referenced using `timeline_id`.
    /// Being a `BTreeSet`, children are visited in `TimelineId` order when printing.
    pub children: BTreeSet<TimelineId>,
}
99 :
100 : // Main entry point for the 'neon_local' CLI utility
101 : //
102 : // This utility helps to manage neon installation. That includes following:
103 : // * Management of local postgres installations running on top of the
104 : // pageserver.
105 : // * Providing CLI api to the pageserver
106 : // * TODO: export/import to/from usual postgres
107 CBC 5708 : fn main() -> Result<()> {
108 5708 : let matches = cli().get_matches();
109 :
110 5708 : let (sub_name, sub_args) = match matches.subcommand() {
111 5708 : Some(subcommand_data) => subcommand_data,
112 UBC 0 : None => bail!("no subcommand provided"),
113 : };
114 :
115 : // Check for 'neon init' command first.
116 CBC 5708 : let subcommand_result = if sub_name == "init" {
117 333 : handle_init(sub_args).map(Some)
118 : } else {
119 : // all other commands need an existing config
120 5375 : let mut env = LocalEnv::load_config().context("Error loading config")?;
121 5375 : let original_env = env.clone();
122 5375 :
123 5375 : let rt = tokio::runtime::Builder::new_current_thread()
124 5375 : .enable_all()
125 5375 : .build()
126 5375 : .unwrap();
127 :
128 5375 : let subcommand_result = match sub_name {
129 5375 : "tenant" => rt.block_on(handle_tenant(sub_args, &mut env)),
130 4946 : "timeline" => rt.block_on(handle_timeline(sub_args, &mut env)),
131 4581 : "start" => rt.block_on(handle_start_all(sub_args, &env)),
132 4578 : "stop" => handle_stop_all(sub_args, &env),
133 4575 : "pageserver" => rt.block_on(handle_pageserver(sub_args, &env)),
134 3468 : "attachment_service" => rt.block_on(handle_attachment_service(sub_args, &env)),
135 2800 : "safekeeper" => rt.block_on(handle_safekeeper(sub_args, &env)),
136 1803 : "endpoint" => rt.block_on(handle_endpoint(sub_args, &env)),
137 1 : "mappings" => handle_mappings(sub_args, &mut env),
138 UBC 0 : "pg" => bail!("'pg' subcommand has been renamed to 'endpoint'"),
139 0 : _ => bail!("unexpected subcommand {sub_name}"),
140 : };
141 :
142 CBC 5375 : if original_env != env {
143 745 : subcommand_result.map(|()| Some(env))
144 : } else {
145 4630 : subcommand_result.map(|()| None)
146 : }
147 : };
148 :
149 5664 : match subcommand_result {
150 1078 : Ok(Some(updated_env)) => updated_env.persist_config(&updated_env.base_data_dir)?,
151 4586 : Ok(None) => (),
152 44 : Err(e) => {
153 44 : eprintln!("command failed: {e:?}");
154 44 : exit(1);
155 : }
156 : }
157 5664 : Ok(())
158 5664 : }
159 :
160 : ///
161 : /// Prints timelines list as a tree-like structure.
162 : ///
163 19 : fn print_timelines_tree(
164 19 : timelines: Vec<TimelineInfo>,
165 19 : mut timeline_name_mappings: HashMap<TenantTimelineId, String>,
166 19 : ) -> Result<()> {
167 19 : let mut timelines_hash = timelines
168 19 : .iter()
169 30 : .map(|t| {
170 30 : (
171 30 : t.timeline_id,
172 30 : TimelineTreeEl {
173 30 : info: t.clone(),
174 30 : children: BTreeSet::new(),
175 30 : name: timeline_name_mappings
176 30 : .remove(&TenantTimelineId::new(t.tenant_id.tenant_id, t.timeline_id)),
177 30 : },
178 30 : )
179 30 : })
180 19 : .collect::<HashMap<_, _>>();
181 :
182 : // Memorize all direct children of each timeline.
183 30 : for timeline in timelines.iter() {
184 30 : if let Some(ancestor_timeline_id) = timeline.ancestor_timeline_id {
185 15 : timelines_hash
186 15 : .get_mut(&ancestor_timeline_id)
187 15 : .context("missing timeline info in the HashMap")?
188 : .children
189 15 : .insert(timeline.timeline_id);
190 15 : }
191 : }
192 :
193 30 : for timeline in timelines_hash.values() {
194 : // Start with root local timelines (no ancestors) first.
195 30 : if timeline.info.ancestor_timeline_id.is_none() {
196 15 : print_timeline(0, &Vec::from([true]), timeline, &timelines_hash)?;
197 15 : }
198 : }
199 :
200 19 : Ok(())
201 19 : }
202 :
203 : ///
204 : /// Recursively prints timeline info with all its children.
205 : ///
206 30 : fn print_timeline(
207 30 : nesting_level: usize,
208 30 : is_last: &[bool],
209 30 : timeline: &TimelineTreeEl,
210 30 : timelines: &HashMap<TimelineId, TimelineTreeEl>,
211 30 : ) -> Result<()> {
212 30 : if nesting_level > 0 {
213 15 : let ancestor_lsn = match timeline.info.ancestor_lsn {
214 15 : Some(lsn) => lsn.to_string(),
215 UBC 0 : None => "Unknown Lsn".to_string(),
216 : };
217 :
218 CBC 15 : let mut br_sym = "┣━";
219 15 :
220 15 : // Draw each nesting padding with proper style
221 15 : // depending on whether its timeline ended or not.
222 15 : if nesting_level > 1 {
223 3 : for l in &is_last[1..is_last.len() - 1] {
224 3 : if *l {
225 UBC 0 : print!(" ");
226 CBC 3 : } else {
227 3 : print!("┃ ");
228 3 : }
229 : }
230 12 : }
231 :
232 : // We are the last in this sub-timeline
233 15 : if *is_last.last().unwrap() {
234 10 : br_sym = "┗━";
235 10 : }
236 :
237 15 : print!("{} @{}: ", br_sym, ancestor_lsn);
238 15 : }
239 :
240 : // Finally print a timeline id and name with new line
241 30 : println!(
242 30 : "{} [{}]",
243 30 : timeline.name.as_deref().unwrap_or("_no_name_"),
244 30 : timeline.info.timeline_id
245 30 : );
246 30 :
247 30 : let len = timeline.children.len();
248 30 : let mut i: usize = 0;
249 30 : let mut is_last_new = Vec::from(is_last);
250 30 : is_last_new.push(false);
251 :
252 45 : for child in &timeline.children {
253 15 : i += 1;
254 15 :
255 15 : // Mark that the last padding is the end of the timeline
256 15 : if i == len {
257 10 : if let Some(last) = is_last_new.last_mut() {
258 10 : *last = true;
259 10 : }
260 5 : }
261 :
262 : print_timeline(
263 15 : nesting_level + 1,
264 15 : &is_last_new,
265 15 : timelines
266 15 : .get(child)
267 15 : .context("missing timeline info in the HashMap")?,
268 15 : timelines,
269 UBC 0 : )?;
270 : }
271 :
272 CBC 30 : Ok(())
273 30 : }
274 :
275 : /// Returns a map of timeline IDs to timeline_id@lsn strings.
276 : /// Connects to the pageserver to query this information.
277 UBC 0 : async fn get_timeline_infos(
278 0 : env: &local_env::LocalEnv,
279 0 : tenant_id: &TenantId,
280 0 : ) -> Result<HashMap<TimelineId, TimelineInfo>> {
281 0 : Ok(get_default_pageserver(env)
282 0 : .timeline_list(tenant_id)
283 0 : .await?
284 0 : .into_iter()
285 0 : .map(|timeline_info| (timeline_info.timeline_id, timeline_info))
286 0 : .collect())
287 0 : }
288 :
289 : // Helper function to parse --tenant_id option, or get the default from config file
290 CBC 884 : fn get_tenant_id(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow::Result<TenantId> {
291 884 : if let Some(tenant_id_from_arguments) = parse_tenant_id(sub_match).transpose() {
292 884 : tenant_id_from_arguments
293 UBC 0 : } else if let Some(default_id) = env.default_tenant_id {
294 0 : Ok(default_id)
295 : } else {
296 0 : anyhow::bail!("No tenant id. Use --tenant-id, or set a default tenant");
297 : }
298 CBC 884 : }
299 :
300 1293 : fn parse_tenant_id(sub_match: &ArgMatches) -> anyhow::Result<Option<TenantId>> {
301 1293 : sub_match
302 1293 : .get_one::<String>("tenant-id")
303 1293 : .map(|tenant_id| TenantId::from_str(tenant_id))
304 1293 : .transpose()
305 1293 : .context("Failed to parse tenant id from the argument string")
306 1293 : }
307 :
308 514 : fn parse_timeline_id(sub_match: &ArgMatches) -> anyhow::Result<Option<TimelineId>> {
309 514 : sub_match
310 514 : .get_one::<String>("timeline-id")
311 514 : .map(|timeline_id| TimelineId::from_str(timeline_id))
312 514 : .transpose()
313 514 : .context("Failed to parse timeline id from the argument string")
314 514 : }
315 :
316 333 : fn handle_init(init_match: &ArgMatches) -> anyhow::Result<LocalEnv> {
317 333 : let num_pageservers = init_match
318 333 : .get_one::<u16>("num-pageservers")
319 333 : .expect("num-pageservers arg has a default");
320 : // Create config file
321 333 : let toml_file: String = if let Some(config_path) = init_match.get_one::<PathBuf>("config") {
322 : // load and parse the file
323 333 : std::fs::read_to_string(config_path).with_context(|| {
324 UBC 0 : format!(
325 0 : "Could not read configuration file '{}'",
326 0 : config_path.display()
327 0 : )
328 CBC 333 : })?
329 : } else {
330 : // Built-in default config
331 UBC 0 : default_conf(*num_pageservers)
332 : };
333 :
334 CBC 333 : let pg_version = init_match
335 333 : .get_one::<u32>("pg-version")
336 333 : .copied()
337 333 : .context("Failed to parse postgres version from the argument string")?;
338 :
339 333 : let mut env =
340 333 : LocalEnv::parse_config(&toml_file).context("Failed to create neon configuration")?;
341 333 : let force = init_match.get_flag("force");
342 333 : env.init(pg_version, force)
343 333 : .context("Failed to initialize neon repository")?;
344 :
345 : // Create remote storage location for default LocalFs remote storage
346 333 : std::fs::create_dir_all(env.base_data_dir.join(PAGESERVER_REMOTE_STORAGE_DIR))?;
347 :
348 : // Initialize pageserver, create initial tenant and timeline.
349 682 : for ps_conf in &env.pageservers {
350 349 : PageServerNode::from_env(&env, ps_conf)
351 349 : .initialize(&pageserver_config_overrides(init_match))
352 349 : .unwrap_or_else(|e| {
353 UBC 0 : eprintln!("pageserver init failed: {e:?}");
354 0 : exit(1);
355 CBC 349 : });
356 349 : }
357 :
358 333 : Ok(env)
359 333 : }
360 :
361 : /// The default pageserver is the one where CLI tenant/timeline operations are sent by default.
362 : /// For typical interactive use, one would just run with a single pageserver. Scenarios with
363 : /// tenant/timeline placement across multiple pageservers are managed by python test code rather
364 : /// than this CLI.
365 794 : fn get_default_pageserver(env: &local_env::LocalEnv) -> PageServerNode {
366 794 : let ps_conf = env
367 794 : .pageservers
368 794 : .first()
369 794 : .expect("Config is validated to contain at least one pageserver");
370 794 : PageServerNode::from_env(env, ps_conf)
371 794 : }
372 :
373 906 : fn pageserver_config_overrides(init_match: &ArgMatches) -> Vec<&str> {
374 906 : init_match
375 906 : .get_many::<String>("pageserver-config-override")
376 906 : .into_iter()
377 906 : .flatten()
378 906 : .map(String::as_str)
379 906 : .collect()
380 906 : }
381 :
382 429 : async fn handle_tenant(
383 429 : tenant_match: &ArgMatches,
384 429 : env: &mut local_env::LocalEnv,
385 429 : ) -> anyhow::Result<()> {
386 429 : let pageserver = get_default_pageserver(env);
387 429 : match tenant_match.subcommand() {
388 429 : Some(("list", _)) => {
389 24 : for t in pageserver.tenant_list().await? {
390 11 : println!("{} {:?}", t.id, t.state);
391 11 : }
392 : }
393 423 : Some(("create", create_match)) => {
394 409 : let tenant_conf: HashMap<_, _> = create_match
395 409 : .get_many::<String>("config")
396 656 : .map(|vals| vals.flat_map(|c| c.split_once(':')).collect())
397 409 : .unwrap_or_default();
398 :
399 : // If tenant ID was not specified, generate one
400 409 : let tenant_id = parse_tenant_id(create_match)?.unwrap_or_else(TenantId::generate);
401 :
402 409 : let generation = if env.control_plane_api.is_some() {
403 : // We must register the tenant with the attachment service, so
404 : // that when the pageserver restarts, it will be re-attached.
405 409 : let attachment_service = AttachmentService::from_env(env);
406 409 : attachment_service
407 409 : .attach_hook(tenant_id, pageserver.conf.id)
408 1227 : .await?
409 : } else {
410 UBC 0 : None
411 : };
412 :
413 CBC 409 : pageserver
414 409 : .tenant_create(tenant_id, generation, tenant_conf)
415 1632 : .await?;
416 407 : println!("tenant {tenant_id} successfully created on the pageserver");
417 :
418 : // Create an initial timeline for the new tenant
419 407 : let new_timeline_id = parse_timeline_id(create_match)?;
420 407 : let pg_version = create_match
421 407 : .get_one::<u32>("pg-version")
422 407 : .copied()
423 407 : .context("Failed to parse postgres version from the argument string")?;
424 :
425 407 : let timeline_info = pageserver
426 407 : .timeline_create(
427 407 : tenant_id,
428 407 : new_timeline_id,
429 407 : None,
430 407 : None,
431 407 : Some(pg_version),
432 407 : None,
433 407 : )
434 407 : .await?;
435 407 : let new_timeline_id = timeline_info.timeline_id;
436 407 : let last_record_lsn = timeline_info.last_record_lsn;
437 407 :
438 407 : env.register_branch_mapping(
439 407 : DEFAULT_BRANCH_NAME.to_string(),
440 407 : tenant_id,
441 407 : new_timeline_id,
442 407 : )?;
443 :
444 407 : println!(
445 407 : "Created an initial timeline '{new_timeline_id}' at Lsn {last_record_lsn} for tenant: {tenant_id}",
446 407 : );
447 407 :
448 407 : if create_match.get_flag("set-default") {
449 1 : println!("Setting tenant {tenant_id} as a default one");
450 1 : env.default_tenant_id = Some(tenant_id);
451 406 : }
452 : }
453 14 : Some(("set-default", set_default_match)) => {
454 UBC 0 : let tenant_id =
455 0 : parse_tenant_id(set_default_match)?.context("No tenant id specified")?;
456 0 : println!("Setting tenant {tenant_id} as a default one");
457 0 : env.default_tenant_id = Some(tenant_id);
458 : }
459 CBC 14 : Some(("config", create_match)) => {
460 14 : let tenant_id = get_tenant_id(create_match, env)?;
461 14 : let tenant_conf: HashMap<_, _> = create_match
462 14 : .get_many::<String>("config")
463 47 : .map(|vals| vals.flat_map(|c| c.split_once(':')).collect())
464 14 : .unwrap_or_default();
465 14 :
466 14 : pageserver
467 14 : .tenant_config(tenant_id, tenant_conf)
468 56 : .await
469 14 : .with_context(|| format!("Tenant config failed for tenant with id {tenant_id}"))?;
470 14 : println!("tenant {tenant_id} successfully configured on the pageserver");
471 : }
472 UBC 0 : Some(("migrate", matches)) => {
473 0 : let tenant_id = get_tenant_id(matches, env)?;
474 0 : let new_pageserver = get_pageserver(env, matches)?;
475 0 : let new_pageserver_id = new_pageserver.conf.id;
476 0 :
477 0 : migrate_tenant(env, tenant_id, new_pageserver).await?;
478 0 : println!("tenant {tenant_id} migrated to {}", new_pageserver_id);
479 : }
480 :
481 0 : Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{}'", sub_name),
482 0 : None => bail!("no tenant subcommand provided"),
483 : }
484 CBC 427 : Ok(())
485 429 : }
486 :
487 365 : async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Result<()> {
488 365 : let pageserver = get_default_pageserver(env);
489 365 :
490 365 : match timeline_match.subcommand() {
491 365 : Some(("list", list_match)) => {
492 19 : let tenant_id = get_tenant_id(list_match, env)?;
493 76 : let timelines = pageserver.timeline_list(&tenant_id).await?;
494 19 : print_timelines_tree(timelines, env.timeline_name_mappings())?;
495 : }
496 346 : Some(("create", create_match)) => {
497 102 : let tenant_id = get_tenant_id(create_match, env)?;
498 102 : let new_branch_name = create_match
499 102 : .get_one::<String>("branch-name")
500 102 : .ok_or_else(|| anyhow!("No branch name provided"))?;
501 :
502 102 : let pg_version = create_match
503 102 : .get_one::<u32>("pg-version")
504 102 : .copied()
505 102 : .context("Failed to parse postgres version from the argument string")?;
506 :
507 102 : let new_timeline_id_opt = parse_timeline_id(create_match)?;
508 :
509 102 : let timeline_info = pageserver
510 102 : .timeline_create(
511 102 : tenant_id,
512 102 : new_timeline_id_opt,
513 102 : None,
514 102 : None,
515 102 : Some(pg_version),
516 102 : None,
517 102 : )
518 408 : .await?;
519 100 : let new_timeline_id = timeline_info.timeline_id;
520 100 :
521 100 : let last_record_lsn = timeline_info.last_record_lsn;
522 100 : env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?;
523 :
524 100 : println!(
525 100 : "Created timeline '{}' at Lsn {last_record_lsn} for tenant: {tenant_id}",
526 100 : timeline_info.timeline_id
527 100 : );
528 : }
529 244 : Some(("import", import_match)) => {
530 5 : let tenant_id = get_tenant_id(import_match, env)?;
531 5 : let timeline_id = parse_timeline_id(import_match)?.expect("No timeline id provided");
532 5 : let name = import_match
533 5 : .get_one::<String>("node-name")
534 5 : .ok_or_else(|| anyhow!("No node name provided"))?;
535 :
536 : // Parse base inputs
537 5 : let base_tarfile = import_match
538 5 : .get_one::<PathBuf>("base-tarfile")
539 5 : .ok_or_else(|| anyhow!("No base-tarfile provided"))?
540 5 : .to_owned();
541 5 : let base_lsn = Lsn::from_str(
542 5 : import_match
543 5 : .get_one::<String>("base-lsn")
544 5 : .ok_or_else(|| anyhow!("No base-lsn provided"))?,
545 UBC 0 : )?;
546 CBC 5 : let base = (base_lsn, base_tarfile);
547 5 :
548 5 : // Parse pg_wal inputs
549 5 : let wal_tarfile = import_match.get_one::<PathBuf>("wal-tarfile").cloned();
550 5 : let end_lsn = import_match
551 5 : .get_one::<String>("end-lsn")
552 5 : .map(|s| Lsn::from_str(s).unwrap());
553 5 : // TODO validate both or none are provided
554 5 : let pg_wal = end_lsn.zip(wal_tarfile);
555 :
556 5 : let pg_version = import_match
557 5 : .get_one::<u32>("pg-version")
558 5 : .copied()
559 5 : .context("Failed to parse postgres version from the argument string")?;
560 :
561 5 : let mut cplane = ComputeControlPlane::load(env.clone())?;
562 5 : println!("Importing timeline into pageserver ...");
563 5 : pageserver
564 5 : .timeline_import(tenant_id, timeline_id, base, pg_wal, pg_version)
565 52050 : .await?;
566 3 : env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?;
567 :
568 3 : println!("Creating endpoint for imported timeline ...");
569 3 : cplane.new_endpoint(
570 3 : name,
571 3 : tenant_id,
572 3 : timeline_id,
573 3 : None,
574 3 : None,
575 3 : pg_version,
576 3 : ComputeMode::Primary,
577 3 : DEFAULT_PAGESERVER_ID,
578 3 : )?;
579 3 : println!("Done");
580 : }
581 239 : Some(("branch", branch_match)) => {
582 239 : let tenant_id = get_tenant_id(branch_match, env)?;
583 239 : let new_branch_name = branch_match
584 239 : .get_one::<String>("branch-name")
585 239 : .ok_or_else(|| anyhow!("No branch name provided"))?;
586 239 : let ancestor_branch_name = branch_match
587 239 : .get_one::<String>("ancestor-branch-name")
588 239 : .map(|s| s.as_str())
589 239 : .unwrap_or(DEFAULT_BRANCH_NAME);
590 239 : let ancestor_timeline_id = env
591 239 : .get_branch_timeline_id(ancestor_branch_name, tenant_id)
592 239 : .ok_or_else(|| {
593 UBC 0 : anyhow!("Found no timeline id for branch name '{ancestor_branch_name}'")
594 CBC 239 : })?;
595 :
596 239 : let start_lsn = branch_match
597 239 : .get_one::<String>("ancestor-start-lsn")
598 239 : .map(|lsn_str| Lsn::from_str(lsn_str))
599 239 : .transpose()
600 239 : .context("Failed to parse ancestor start Lsn from the request")?;
601 239 : let timeline_info = pageserver
602 239 : .timeline_create(
603 239 : tenant_id,
604 239 : None,
605 239 : start_lsn,
606 239 : Some(ancestor_timeline_id),
607 239 : None,
608 239 : None,
609 239 : )
610 956 : .await?;
611 235 : let new_timeline_id = timeline_info.timeline_id;
612 235 :
613 235 : let last_record_lsn = timeline_info.last_record_lsn;
614 235 :
615 235 : env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?;
616 :
617 235 : println!(
618 235 : "Created timeline '{}' at Lsn {last_record_lsn} for tenant: {tenant_id}. Ancestor timeline: '{ancestor_branch_name}'",
619 235 : timeline_info.timeline_id
620 235 : );
621 : }
622 UBC 0 : Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{sub_name}'"),
623 0 : None => bail!("no tenant subcommand provided"),
624 : }
625 :
626 CBC 357 : Ok(())
627 365 : }
628 :
629 1802 : async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
630 1802 : let (sub_name, sub_args) = match ep_match.subcommand() {
631 1802 : Some(ep_subcommand_data) => ep_subcommand_data,
632 UBC 0 : None => bail!("no endpoint subcommand provided"),
633 : };
634 CBC 1802 : let mut cplane = ComputeControlPlane::load(env.clone())?;
635 :
636 1802 : match sub_name {
637 1802 : "list" => {
638 UBC 0 : let tenant_id = get_tenant_id(sub_args, env)?;
639 0 : let timeline_infos = get_timeline_infos(env, &tenant_id)
640 0 : .await
641 0 : .unwrap_or_else(|e| {
642 0 : eprintln!("Failed to load timeline info: {}", e);
643 0 : HashMap::new()
644 0 : });
645 0 :
646 0 : let timeline_name_mappings = env.timeline_name_mappings();
647 0 :
648 0 : let mut table = comfy_table::Table::new();
649 0 :
650 0 : table.load_preset(comfy_table::presets::NOTHING);
651 0 :
652 0 : table.set_header([
653 0 : "ENDPOINT",
654 0 : "ADDRESS",
655 0 : "TIMELINE",
656 0 : "BRANCH NAME",
657 0 : "LSN",
658 0 : "STATUS",
659 0 : ]);
660 :
661 0 : for (endpoint_id, endpoint) in cplane
662 0 : .endpoints
663 0 : .iter()
664 0 : .filter(|(_, endpoint)| endpoint.tenant_id == tenant_id)
665 0 : {
666 0 : let lsn_str = match endpoint.mode {
667 0 : ComputeMode::Static(lsn) => {
668 0 : // -> read-only endpoint
669 0 : // Use the node's LSN.
670 0 : lsn.to_string()
671 : }
672 : _ => {
673 : // -> primary endpoint or hot replica
674 : // Use the LSN at the end of the timeline.
675 0 : timeline_infos
676 0 : .get(&endpoint.timeline_id)
677 0 : .map(|bi| bi.last_record_lsn.to_string())
678 0 : .unwrap_or_else(|| "?".to_string())
679 : }
680 : };
681 :
682 0 : let branch_name = timeline_name_mappings
683 0 : .get(&TenantTimelineId::new(tenant_id, endpoint.timeline_id))
684 0 : .map(|name| name.as_str())
685 0 : .unwrap_or("?");
686 0 :
687 0 : table.add_row([
688 0 : endpoint_id.as_str(),
689 0 : &endpoint.pg_address.to_string(),
690 0 : &endpoint.timeline_id.to_string(),
691 0 : branch_name,
692 0 : lsn_str.as_str(),
693 0 : endpoint.status(),
694 0 : ]);
695 : }
696 :
697 0 : println!("{table}");
698 : }
699 CBC 1802 : "create" => {
700 505 : let tenant_id = get_tenant_id(sub_args, env)?;
701 505 : let branch_name = sub_args
702 505 : .get_one::<String>("branch-name")
703 505 : .map(|s| s.as_str())
704 505 : .unwrap_or(DEFAULT_BRANCH_NAME);
705 505 : let endpoint_id = sub_args
706 505 : .get_one::<String>("endpoint_id")
707 505 : .map(String::to_string)
708 505 : .unwrap_or_else(|| format!("ep-{branch_name}"));
709 :
710 505 : let lsn = sub_args
711 505 : .get_one::<String>("lsn")
712 505 : .map(|lsn_str| Lsn::from_str(lsn_str))
713 505 : .transpose()
714 505 : .context("Failed to parse Lsn from the request")?;
715 505 : let timeline_id = env
716 505 : .get_branch_timeline_id(branch_name, tenant_id)
717 505 : .ok_or_else(|| anyhow!("Found no timeline id for branch name '{branch_name}'"))?;
718 :
719 505 : let pg_port: Option<u16> = sub_args.get_one::<u16>("pg-port").copied();
720 505 : let http_port: Option<u16> = sub_args.get_one::<u16>("http-port").copied();
721 505 : let pg_version = sub_args
722 505 : .get_one::<u32>("pg-version")
723 505 : .copied()
724 505 : .context("Failed to parse postgres version from the argument string")?;
725 :
726 505 : let hot_standby = sub_args
727 505 : .get_one::<bool>("hot-standby")
728 505 : .copied()
729 505 : .unwrap_or(false);
730 :
731 505 : let pageserver_id =
732 505 : if let Some(id_str) = sub_args.get_one::<String>("endpoint-pageserver-id") {
733 7 : NodeId(id_str.parse().context("while parsing pageserver id")?)
734 : } else {
735 498 : DEFAULT_PAGESERVER_ID
736 : };
737 :
738 505 : let mode = match (lsn, hot_standby) {
739 49 : (Some(lsn), false) => ComputeMode::Static(lsn),
740 2 : (None, true) => ComputeMode::Replica,
741 454 : (None, false) => ComputeMode::Primary,
742 UBC 0 : (Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"),
743 : };
744 :
745 CBC 505 : match (mode, hot_standby) {
746 : (ComputeMode::Static(_), true) => {
747 UBC 0 : bail!("Cannot start a node in hot standby mode when it is already configured as a static replica")
748 : }
749 : (ComputeMode::Primary, true) => {
750 0 : bail!("Cannot start a node as a hot standby replica, it is already configured as primary node")
751 : }
752 CBC 505 : _ => {}
753 505 : }
754 505 :
755 505 : cplane.check_conflicting_endpoints(mode, tenant_id, timeline_id)?;
756 :
757 481 : cplane.new_endpoint(
758 481 : &endpoint_id,
759 481 : tenant_id,
760 481 : timeline_id,
761 481 : pg_port,
762 481 : http_port,
763 481 : pg_version,
764 481 : mode,
765 481 : pageserver_id,
766 481 : )?;
767 : }
768 1297 : "start" => {
769 546 : let endpoint_id = sub_args
770 546 : .get_one::<String>("endpoint_id")
771 546 : .ok_or_else(|| anyhow!("No endpoint ID was provided to start"))?;
772 :
773 546 : let pageserver_id =
774 546 : if let Some(id_str) = sub_args.get_one::<String>("endpoint-pageserver-id") {
775 7 : NodeId(id_str.parse().context("while parsing pageserver id")?)
776 : } else {
777 539 : DEFAULT_PAGESERVER_ID
778 : };
779 :
780 546 : let remote_ext_config = sub_args.get_one::<String>("remote-ext-config");
781 :
782 : // If --safekeepers argument is given, use only the listed safekeeper nodes.
783 546 : let safekeepers =
784 546 : if let Some(safekeepers_str) = sub_args.get_one::<String>("safekeepers") {
785 541 : let mut safekeepers: Vec<NodeId> = Vec::new();
786 686 : for sk_id in safekeepers_str.split(',').map(str::trim) {
787 686 : let sk_id = NodeId(u64::from_str(sk_id).map_err(|_| {
788 UBC 0 : anyhow!("invalid node ID \"{sk_id}\" in --safekeepers list")
789 CBC 686 : })?);
790 686 : safekeepers.push(sk_id);
791 : }
792 541 : safekeepers
793 : } else {
794 5 : env.safekeepers.iter().map(|sk| sk.id).collect()
795 : };
796 :
797 546 : let endpoint = cplane
798 546 : .endpoints
799 546 : .get(endpoint_id.as_str())
800 546 : .ok_or_else(|| anyhow::anyhow!("endpoint {endpoint_id} not found"))?;
801 :
802 546 : cplane.check_conflicting_endpoints(
803 546 : endpoint.mode,
804 546 : endpoint.tenant_id,
805 546 : endpoint.timeline_id,
806 546 : )?;
807 :
808 545 : let ps_conf = env.get_pageserver_conf(pageserver_id)?;
809 545 : let auth_token = if matches!(ps_conf.pg_auth_type, AuthType::NeonJWT) {
810 15 : let claims = Claims::new(Some(endpoint.tenant_id), Scope::Tenant);
811 15 :
812 15 : Some(env.generate_auth_token(&claims)?)
813 : } else {
814 530 : None
815 : };
816 :
817 545 : println!("Starting existing endpoint {endpoint_id}...");
818 545 : endpoint
819 545 : .start(&auth_token, safekeepers, remote_ext_config)
820 6170 : .await?;
821 : }
822 751 : "reconfigure" => {
823 217 : let endpoint_id = sub_args
824 217 : .get_one::<String>("endpoint_id")
825 217 : .ok_or_else(|| anyhow!("No endpoint ID provided to reconfigure"))?;
826 217 : let endpoint = cplane
827 217 : .endpoints
828 217 : .get(endpoint_id.as_str())
829 217 : .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
830 217 : let pageserver_id =
831 217 : if let Some(id_str) = sub_args.get_one::<String>("endpoint-pageserver-id") {
832 : Some(NodeId(
833 217 : id_str.parse().context("while parsing pageserver id")?,
834 : ))
835 : } else {
836 UBC 0 : None
837 : };
838 CBC 651 : endpoint.reconfigure(pageserver_id).await?;
839 : }
840 534 : "stop" => {
841 534 : let endpoint_id = sub_args
842 534 : .get_one::<String>("endpoint_id")
843 534 : .ok_or_else(|| anyhow!("No endpoint ID was provided to stop"))?;
844 534 : let destroy = sub_args.get_flag("destroy");
845 :
846 534 : let endpoint = cplane
847 534 : .endpoints
848 534 : .get(endpoint_id.as_str())
849 534 : .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
850 534 : endpoint.stop(destroy)?;
851 : }
852 :
853 UBC 0 : _ => bail!("Unexpected endpoint subcommand '{sub_name}'"),
854 : }
855 :
856 CBC 1769 : Ok(())
857 1802 : }
858 :
859 1 : fn handle_mappings(sub_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Result<()> {
860 1 : let (sub_name, sub_args) = match sub_match.subcommand() {
861 1 : Some(ep_subcommand_data) => ep_subcommand_data,
862 UBC 0 : None => bail!("no mappings subcommand provided"),
863 : };
864 :
865 CBC 1 : match sub_name {
866 1 : "map" => {
867 1 : let branch_name = sub_args
868 1 : .get_one::<String>("branch-name")
869 1 : .expect("branch-name argument missing");
870 1 :
871 1 : let tenant_id = sub_args
872 1 : .get_one::<String>("tenant-id")
873 1 : .map(|x| TenantId::from_str(x))
874 1 : .expect("tenant-id argument missing")
875 1 : .expect("malformed tenant-id arg");
876 1 :
877 1 : let timeline_id = sub_args
878 1 : .get_one::<String>("timeline-id")
879 1 : .map(|x| TimelineId::from_str(x))
880 1 : .expect("timeline-id argument missing")
881 1 : .expect("malformed timeline-id arg");
882 1 :
883 1 : env.register_branch_mapping(branch_name.to_owned(), tenant_id, timeline_id)?;
884 :
885 1 : Ok(())
886 : }
887 UBC 0 : other => unimplemented!("mappings subcommand {other}"),
888 : }
889 CBC 1 : }
890 :
891 1107 : fn get_pageserver(env: &local_env::LocalEnv, args: &ArgMatches) -> Result<PageServerNode> {
892 1107 : let node_id = if let Some(id_str) = args.get_one::<String>("pageserver-id") {
893 1107 : NodeId(id_str.parse().context("while parsing pageserver id")?)
894 : } else {
895 UBC 0 : DEFAULT_PAGESERVER_ID
896 : };
897 :
898 : Ok(PageServerNode::from_env(
899 CBC 1107 : env,
900 1107 : env.get_pageserver_conf(node_id)?,
901 : ))
902 1107 : }
903 :
904 1107 : async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
905 1107 : match sub_match.subcommand() {
906 1107 : Some(("start", subcommand_args)) => {
907 553 : if let Err(e) = get_pageserver(env, subcommand_args)?
908 553 : .start(&pageserver_config_overrides(subcommand_args))
909 3332 : .await
910 : {
911 UBC 0 : eprintln!("pageserver start failed: {e}");
912 0 : exit(1);
913 CBC 553 : }
914 : }
915 :
916 554 : Some(("stop", subcommand_args)) => {
917 554 : let immediate = subcommand_args
918 554 : .get_one::<String>("stop-mode")
919 554 : .map(|s| s.as_str())
920 554 : == Some("immediate");
921 :
922 554 : if let Err(e) = get_pageserver(env, subcommand_args)?.stop(immediate) {
923 UBC 0 : eprintln!("pageserver stop failed: {}", e);
924 0 : exit(1);
925 CBC 553 : }
926 : }
927 :
928 UBC 0 : Some(("restart", subcommand_args)) => {
929 0 : let pageserver = get_pageserver(env, subcommand_args)?;
930 : //TODO what shutdown strategy should we use here?
931 0 : if let Err(e) = pageserver.stop(false) {
932 0 : eprintln!("pageserver stop failed: {}", e);
933 0 : exit(1);
934 0 : }
935 :
936 0 : if let Err(e) = pageserver
937 0 : .start(&pageserver_config_overrides(subcommand_args))
938 0 : .await
939 : {
940 0 : eprintln!("pageserver start failed: {e}");
941 0 : exit(1);
942 0 : }
943 : }
944 :
945 0 : Some(("migrate", subcommand_args)) => {
946 0 : let pageserver = get_pageserver(env, subcommand_args)?;
947 : //TODO what shutdown strategy should we use here?
948 0 : if let Err(e) = pageserver.stop(false) {
949 0 : eprintln!("pageserver stop failed: {}", e);
950 0 : exit(1);
951 0 : }
952 :
953 0 : if let Err(e) = pageserver
954 0 : .start(&pageserver_config_overrides(subcommand_args))
955 0 : .await
956 : {
957 0 : eprintln!("pageserver start failed: {e}");
958 0 : exit(1);
959 0 : }
960 : }
961 :
962 0 : Some(("status", subcommand_args)) => {
963 0 : match get_pageserver(env, subcommand_args)?.check_status().await {
964 0 : Ok(_) => println!("Page server is up and running"),
965 0 : Err(err) => {
966 0 : eprintln!("Page server is not available: {}", err);
967 0 : exit(1);
968 : }
969 : }
970 : }
971 :
972 0 : Some((sub_name, _)) => bail!("Unexpected pageserver subcommand '{}'", sub_name),
973 0 : None => bail!("no pageserver subcommand provided"),
974 : }
975 CBC 1106 : Ok(())
976 1107 : }
977 :
978 668 : async fn handle_attachment_service(
979 668 : sub_match: &ArgMatches,
980 668 : env: &local_env::LocalEnv,
981 668 : ) -> Result<()> {
982 668 : let svc = AttachmentService::from_env(env);
983 668 : match sub_match.subcommand() {
984 668 : Some(("start", _start_match)) => {
985 333 : if let Err(e) = svc.start().await {
986 UBC 0 : eprintln!("start failed: {e}");
987 0 : exit(1);
988 CBC 333 : }
989 : }
990 :
991 335 : Some(("stop", stop_match)) => {
992 335 : let immediate = stop_match
993 335 : .get_one::<String>("stop-mode")
994 335 : .map(|s| s.as_str())
995 335 : == Some("immediate");
996 :
997 335 : if let Err(e) = svc.stop(immediate) {
998 UBC 0 : eprintln!("stop failed: {}", e);
999 0 : exit(1);
1000 CBC 335 : }
1001 : }
1002 UBC 0 : Some((sub_name, _)) => bail!("Unexpected attachment_service subcommand '{}'", sub_name),
1003 0 : None => bail!("no attachment_service subcommand provided"),
1004 : }
1005 CBC 668 : Ok(())
1006 668 : }
1007 :
1008 997 : fn get_safekeeper(env: &local_env::LocalEnv, id: NodeId) -> Result<SafekeeperNode> {
1009 1326 : if let Some(node) = env.safekeepers.iter().find(|node| node.id == id) {
1010 997 : Ok(SafekeeperNode::from_env(env, node))
1011 : } else {
1012 UBC 0 : bail!("could not find safekeeper {id}")
1013 : }
1014 CBC 997 : }
1015 :
1016 : // Get list of options to append to safekeeper command invocation.
1017 481 : fn safekeeper_extra_opts(init_match: &ArgMatches) -> Vec<String> {
1018 481 : init_match
1019 481 : .get_many::<String>("safekeeper-extra-opt")
1020 481 : .into_iter()
1021 481 : .flatten()
1022 481 : .map(|s| s.to_owned())
1023 481 : .collect()
1024 481 : }
1025 :
1026 997 : async fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
1027 997 : let (sub_name, sub_args) = match sub_match.subcommand() {
1028 997 : Some(safekeeper_command_data) => safekeeper_command_data,
1029 UBC 0 : None => bail!("no safekeeper subcommand provided"),
1030 : };
1031 :
1032 : // All the commands take an optional safekeeper name argument
1033 CBC 997 : let sk_id = if let Some(id_str) = sub_args.get_one::<String>("id") {
1034 983 : NodeId(id_str.parse().context("while parsing safekeeper id")?)
1035 : } else {
1036 14 : DEFAULT_SAFEKEEPER_ID
1037 : };
1038 997 : let safekeeper = get_safekeeper(env, sk_id)?;
1039 :
1040 997 : match sub_name {
1041 997 : "start" => {
1042 481 : let extra_opts = safekeeper_extra_opts(sub_args);
1043 :
1044 1935 : if let Err(e) = safekeeper.start(extra_opts).await {
1045 UBC 0 : eprintln!("safekeeper start failed: {}", e);
1046 0 : exit(1);
1047 CBC 481 : }
1048 : }
1049 :
1050 516 : "stop" => {
1051 516 : let immediate =
1052 516 : sub_args.get_one::<String>("stop-mode").map(|s| s.as_str()) == Some("immediate");
1053 :
1054 516 : if let Err(e) = safekeeper.stop(immediate) {
1055 UBC 0 : eprintln!("safekeeper stop failed: {}", e);
1056 0 : exit(1);
1057 CBC 516 : }
1058 : }
1059 :
1060 UBC 0 : "restart" => {
1061 0 : let immediate =
1062 0 : sub_args.get_one::<String>("stop-mode").map(|s| s.as_str()) == Some("immediate");
1063 :
1064 0 : if let Err(e) = safekeeper.stop(immediate) {
1065 0 : eprintln!("safekeeper stop failed: {}", e);
1066 0 : exit(1);
1067 0 : }
1068 0 :
1069 0 : let extra_opts = safekeeper_extra_opts(sub_args);
1070 0 : if let Err(e) = safekeeper.start(extra_opts).await {
1071 0 : eprintln!("safekeeper start failed: {}", e);
1072 0 : exit(1);
1073 0 : }
1074 : }
1075 :
1076 : _ => {
1077 0 : bail!("Unexpected safekeeper subcommand '{}'", sub_name)
1078 : }
1079 : }
1080 CBC 997 : Ok(())
1081 997 : }
1082 :
1083 3 : async fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow::Result<()> {
1084 3 : // Endpoints are not started automatically
1085 3 :
1086 10 : broker::start_broker_process(env).await?;
1087 :
1088 : // Only start the attachment service if the pageserver is configured to need it
1089 3 : if env.control_plane_api.is_some() {
1090 3 : let attachment_service = AttachmentService::from_env(env);
1091 3 : if let Err(e) = attachment_service.start().await {
1092 UBC 0 : eprintln!("attachment_service start failed: {:#}", e);
1093 0 : try_stop_all(env, true);
1094 0 : exit(1);
1095 CBC 3 : }
1096 UBC 0 : }
1097 :
1098 CBC 7 : for ps_conf in &env.pageservers {
1099 4 : let pageserver = PageServerNode::from_env(env, ps_conf);
1100 4 : if let Err(e) = pageserver
1101 4 : .start(&pageserver_config_overrides(sub_match))
1102 24 : .await
1103 : {
1104 UBC 0 : eprintln!("pageserver {} start failed: {:#}", ps_conf.id, e);
1105 0 : try_stop_all(env, true);
1106 0 : exit(1);
1107 CBC 4 : }
1108 : }
1109 :
1110 4 : for node in env.safekeepers.iter() {
1111 4 : let safekeeper = SafekeeperNode::from_env(env, node);
1112 16 : if let Err(e) = safekeeper.start(vec![]).await {
1113 UBC 0 : eprintln!("safekeeper {} start failed: {:#}", safekeeper.id, e);
1114 0 : try_stop_all(env, false);
1115 0 : exit(1);
1116 CBC 4 : }
1117 : }
1118 3 : Ok(())
1119 3 : }
1120 :
1121 3 : fn handle_stop_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
1122 3 : let immediate =
1123 3 : sub_match.get_one::<String>("stop-mode").map(|s| s.as_str()) == Some("immediate");
1124 3 :
1125 3 : try_stop_all(env, immediate);
1126 3 :
1127 3 : Ok(())
1128 3 : }
1129 :
1130 3 : fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
1131 3 : // Stop all endpoints
1132 3 : match ComputeControlPlane::load(env.clone()) {
1133 3 : Ok(cplane) => {
1134 5 : for (_k, node) in cplane.endpoints {
1135 2 : if let Err(e) = node.stop(false) {
1136 UBC 0 : eprintln!("postgres stop failed: {e:#}");
1137 CBC 2 : }
1138 : }
1139 : }
1140 UBC 0 : Err(e) => {
1141 0 : eprintln!("postgres stop failed, could not restore control plane data from env: {e:#}")
1142 : }
1143 : }
1144 :
1145 CBC 7 : for ps_conf in &env.pageservers {
1146 4 : let pageserver = PageServerNode::from_env(env, ps_conf);
1147 4 : if let Err(e) = pageserver.stop(immediate) {
1148 UBC 0 : eprintln!("pageserver {} stop failed: {:#}", ps_conf.id, e);
1149 CBC 4 : }
1150 : }
1151 :
1152 4 : for node in env.safekeepers.iter() {
1153 4 : let safekeeper = SafekeeperNode::from_env(env, node);
1154 4 : if let Err(e) = safekeeper.stop(immediate) {
1155 UBC 0 : eprintln!("safekeeper {} stop failed: {:#}", safekeeper.id, e);
1156 CBC 4 : }
1157 : }
1158 :
1159 3 : if let Err(e) = broker::stop_broker_process(env) {
1160 UBC 0 : eprintln!("neon broker stop failed: {e:#}");
1161 CBC 3 : }
1162 :
1163 3 : if env.control_plane_api.is_some() {
1164 3 : let attachment_service = AttachmentService::from_env(env);
1165 3 : if let Err(e) = attachment_service.stop(immediate) {
1166 UBC 0 : eprintln!("attachment service stop failed: {e:#}");
1167 CBC 3 : }
1168 UBC 0 : }
1169 CBC 3 : }
1170 :
1171 5709 : fn cli() -> Command {
1172 5709 : let branch_name_arg = Arg::new("branch-name")
1173 5709 : .long("branch-name")
1174 5709 : .help("Name of the branch to be created or used as an alias for other services")
1175 5709 : .required(false);
1176 5709 :
1177 5709 : let endpoint_id_arg = Arg::new("endpoint_id")
1178 5709 : .help("Postgres endpoint id")
1179 5709 : .required(false);
1180 5709 :
1181 5709 : let safekeeper_id_arg = Arg::new("id").help("safekeeper id").required(false);
1182 5709 :
1183 5709 : // --id, when using a pageserver command
1184 5709 : let pageserver_id_arg = Arg::new("pageserver-id")
1185 5709 : .long("id")
1186 5709 : .global(true)
1187 5709 : .help("pageserver id")
1188 5709 : .required(false);
1189 5709 : // --pageserver-id when using a non-pageserver command
1190 5709 : let endpoint_pageserver_id_arg = Arg::new("endpoint-pageserver-id")
1191 5709 : .long("pageserver-id")
1192 5709 : .required(false);
1193 5709 :
1194 5709 : let safekeeper_extra_opt_arg = Arg::new("safekeeper-extra-opt")
1195 5709 : .short('e')
1196 5709 : .long("safekeeper-extra-opt")
1197 5709 : .num_args(1)
1198 5709 : .action(ArgAction::Append)
1199 5709 : .help("Additional safekeeper invocation options, e.g. -e=--http-auth-public-key-path=foo")
1200 5709 : .required(false);
1201 5709 :
1202 5709 : let tenant_id_arg = Arg::new("tenant-id")
1203 5709 : .long("tenant-id")
1204 5709 : .help("Tenant id. Represented as a hexadecimal string 32 symbols length")
1205 5709 : .required(false);
1206 5709 :
1207 5709 : let timeline_id_arg = Arg::new("timeline-id")
1208 5709 : .long("timeline-id")
1209 5709 : .help("Timeline id. Represented as a hexadecimal string 32 symbols length")
1210 5709 : .required(false);
1211 5709 :
1212 5709 : let pg_version_arg = Arg::new("pg-version")
1213 5709 : .long("pg-version")
1214 5709 : .help("Postgres version to use for the initial tenant")
1215 5709 : .required(false)
1216 5709 : .value_parser(value_parser!(u32))
1217 5709 : .default_value(DEFAULT_PG_VERSION);
1218 5709 :
1219 5709 : let pg_port_arg = Arg::new("pg-port")
1220 5709 : .long("pg-port")
1221 5709 : .required(false)
1222 5709 : .value_parser(value_parser!(u16))
1223 5709 : .value_name("pg-port");
1224 5709 :
1225 5709 : let http_port_arg = Arg::new("http-port")
1226 5709 : .long("http-port")
1227 5709 : .required(false)
1228 5709 : .value_parser(value_parser!(u16))
1229 5709 : .value_name("http-port");
1230 5709 :
1231 5709 : let safekeepers_arg = Arg::new("safekeepers")
1232 5709 : .long("safekeepers")
1233 5709 : .required(false)
1234 5709 : .value_name("safekeepers");
1235 5709 :
1236 5709 : let stop_mode_arg = Arg::new("stop-mode")
1237 5709 : .short('m')
1238 5709 : .value_parser(["fast", "immediate"])
1239 5709 : .default_value("fast")
1240 5709 : .help("If 'immediate', don't flush repository data at shutdown")
1241 5709 : .required(false)
1242 5709 : .value_name("stop-mode");
1243 5709 :
1244 5709 : let pageserver_config_args = Arg::new("pageserver-config-override")
1245 5709 : .long("pageserver-config-override")
1246 5709 : .num_args(1)
1247 5709 : .action(ArgAction::Append)
1248 5709 : .help("Additional pageserver's configuration options or overrides, refer to pageserver's 'config-override' CLI parameter docs for more")
1249 5709 : .required(false);
1250 5709 :
1251 5709 : let remote_ext_config_args = Arg::new("remote-ext-config")
1252 5709 : .long("remote-ext-config")
1253 5709 : .num_args(1)
1254 5709 : .help("Configure the remote extensions storage proxy gateway to request for extensions.")
1255 5709 : .required(false);
1256 5709 :
1257 5709 : let lsn_arg = Arg::new("lsn")
1258 5709 : .long("lsn")
1259 5709 : .help("Specify Lsn on the timeline to start from. By default, end of the timeline would be used.")
1260 5709 : .required(false);
1261 5709 :
1262 5709 : let hot_standby_arg = Arg::new("hot-standby")
1263 5709 : .value_parser(value_parser!(bool))
1264 5709 : .long("hot-standby")
1265 5709 : .help("If set, the node will be a hot replica on the specified timeline")
1266 5709 : .required(false);
1267 5709 :
1268 5709 : let force_arg = Arg::new("force")
1269 5709 : .value_parser(value_parser!(bool))
1270 5709 : .long("force")
1271 5709 : .action(ArgAction::SetTrue)
1272 5709 : .help("Force initialization even if the repository is not empty")
1273 5709 : .required(false);
1274 5709 :
1275 5709 : let num_pageservers_arg = Arg::new("num-pageservers")
1276 5709 : .value_parser(value_parser!(u16))
1277 5709 : .long("num-pageservers")
1278 5709 : .help("How many pageservers to create (default 1)")
1279 5709 : .required(false)
1280 5709 : .default_value("1");
1281 5709 :
1282 5709 : Command::new("Neon CLI")
1283 5709 : .arg_required_else_help(true)
1284 5709 : .version(GIT_VERSION)
1285 5709 : .subcommand(
1286 5709 : Command::new("init")
1287 5709 : .about("Initialize a new Neon repository, preparing configs for services to start with")
1288 5709 : .arg(pageserver_config_args.clone())
1289 5709 : .arg(num_pageservers_arg.clone())
1290 5709 : .arg(
1291 5709 : Arg::new("config")
1292 5709 : .long("config")
1293 5709 : .required(false)
1294 5709 : .value_parser(value_parser!(PathBuf))
1295 5709 : .value_name("config"),
1296 5709 : )
1297 5709 : .arg(pg_version_arg.clone())
1298 5709 : .arg(force_arg)
1299 5709 : )
1300 5709 : .subcommand(
1301 5709 : Command::new("timeline")
1302 5709 : .about("Manage timelines")
1303 5709 : .subcommand(Command::new("list")
1304 5709 : .about("List all timelines, available to this pageserver")
1305 5709 : .arg(tenant_id_arg.clone()))
1306 5709 : .subcommand(Command::new("branch")
1307 5709 : .about("Create a new timeline, using another timeline as a base, copying its data")
1308 5709 : .arg(tenant_id_arg.clone())
1309 5709 : .arg(branch_name_arg.clone())
1310 5709 : .arg(Arg::new("ancestor-branch-name").long("ancestor-branch-name")
1311 5709 : .help("Use last Lsn of another timeline (and its data) as base when creating the new timeline. The timeline gets resolved by its branch name.").required(false))
1312 5709 : .arg(Arg::new("ancestor-start-lsn").long("ancestor-start-lsn")
1313 5709 : .help("When using another timeline as base, use a specific Lsn in it instead of the latest one").required(false)))
1314 5709 : .subcommand(Command::new("create")
1315 5709 : .about("Create a new blank timeline")
1316 5709 : .arg(tenant_id_arg.clone())
1317 5709 : .arg(timeline_id_arg.clone())
1318 5709 : .arg(branch_name_arg.clone())
1319 5709 : .arg(pg_version_arg.clone())
1320 5709 : )
1321 5709 : .subcommand(Command::new("import")
1322 5709 : .about("Import timeline from basebackup directory")
1323 5709 : .arg(tenant_id_arg.clone())
1324 5709 : .arg(timeline_id_arg.clone())
1325 5709 : .arg(Arg::new("node-name").long("node-name")
1326 5709 : .help("Name to assign to the imported timeline"))
1327 5709 : .arg(Arg::new("base-tarfile")
1328 5709 : .long("base-tarfile")
1329 5709 : .value_parser(value_parser!(PathBuf))
1330 5709 : .help("Basebackup tarfile to import")
1331 5709 : )
1332 5709 : .arg(Arg::new("base-lsn").long("base-lsn")
1333 5709 : .help("Lsn the basebackup starts at"))
1334 5709 : .arg(Arg::new("wal-tarfile")
1335 5709 : .long("wal-tarfile")
1336 5709 : .value_parser(value_parser!(PathBuf))
1337 5709 : .help("Wal to add after base")
1338 5709 : )
1339 5709 : .arg(Arg::new("end-lsn").long("end-lsn")
1340 5709 : .help("Lsn the basebackup ends at"))
1341 5709 : .arg(pg_version_arg.clone())
1342 5709 : )
1343 5709 : ).subcommand(
1344 5709 : Command::new("tenant")
1345 5709 : .arg_required_else_help(true)
1346 5709 : .about("Manage tenants")
1347 5709 : .subcommand(Command::new("list"))
1348 5709 : .subcommand(Command::new("create")
1349 5709 : .arg(tenant_id_arg.clone())
1350 5709 : .arg(timeline_id_arg.clone().help("Use a specific timeline id when creating a tenant and its initial timeline"))
1351 5709 : .arg(Arg::new("config").short('c').num_args(1).action(ArgAction::Append).required(false))
1352 5709 : .arg(pg_version_arg.clone())
1353 5709 : .arg(Arg::new("set-default").long("set-default").action(ArgAction::SetTrue).required(false)
1354 5709 : .help("Use this tenant in future CLI commands where tenant_id is needed, but not specified"))
1355 5709 : )
1356 5709 : .subcommand(Command::new("set-default").arg(tenant_id_arg.clone().required(true))
1357 5709 : .about("Set a particular tenant as default in future CLI commands where tenant_id is needed, but not specified"))
1358 5709 : .subcommand(Command::new("config")
1359 5709 : .arg(tenant_id_arg.clone())
1360 5709 : .arg(Arg::new("config").short('c').num_args(1).action(ArgAction::Append).required(false)))
1361 5709 : .subcommand(Command::new("migrate")
1362 5709 : .about("Migrate a tenant from one pageserver to another")
1363 5709 : .arg(tenant_id_arg.clone())
1364 5709 : .arg(pageserver_id_arg.clone()))
1365 5709 : )
1366 5709 : .subcommand(
1367 5709 : Command::new("pageserver")
1368 5709 : .arg_required_else_help(true)
1369 5709 : .about("Manage pageserver")
1370 5709 : .arg(pageserver_id_arg)
1371 5709 : .subcommand(Command::new("status"))
1372 5709 : .subcommand(Command::new("start")
1373 5709 : .about("Start local pageserver")
1374 5709 : .arg(pageserver_config_args.clone())
1375 5709 : )
1376 5709 : .subcommand(Command::new("stop")
1377 5709 : .about("Stop local pageserver")
1378 5709 : .arg(stop_mode_arg.clone())
1379 5709 : )
1380 5709 : .subcommand(Command::new("restart")
1381 5709 : .about("Restart local pageserver")
1382 5709 : .arg(pageserver_config_args.clone())
1383 5709 : )
1384 5709 : )
1385 5709 : .subcommand(
1386 5709 : Command::new("attachment_service")
1387 5709 : .arg_required_else_help(true)
1388 5709 : .about("Manage attachment_service")
1389 5709 : .subcommand(Command::new("start").about("Start local pageserver").arg(pageserver_config_args.clone()))
1390 5709 : .subcommand(Command::new("stop").about("Stop local pageserver")
1391 5709 : .arg(stop_mode_arg.clone()))
1392 5709 : )
1393 5709 : .subcommand(
1394 5709 : Command::new("safekeeper")
1395 5709 : .arg_required_else_help(true)
1396 5709 : .about("Manage safekeepers")
1397 5709 : .subcommand(Command::new("start")
1398 5709 : .about("Start local safekeeper")
1399 5709 : .arg(safekeeper_id_arg.clone())
1400 5709 : .arg(safekeeper_extra_opt_arg.clone())
1401 5709 : )
1402 5709 : .subcommand(Command::new("stop")
1403 5709 : .about("Stop local safekeeper")
1404 5709 : .arg(safekeeper_id_arg.clone())
1405 5709 : .arg(stop_mode_arg.clone())
1406 5709 : )
1407 5709 : .subcommand(Command::new("restart")
1408 5709 : .about("Restart local safekeeper")
1409 5709 : .arg(safekeeper_id_arg)
1410 5709 : .arg(stop_mode_arg.clone())
1411 5709 : .arg(safekeeper_extra_opt_arg)
1412 5709 : )
1413 5709 : )
1414 5709 : .subcommand(
1415 5709 : Command::new("endpoint")
1416 5709 : .arg_required_else_help(true)
1417 5709 : .about("Manage postgres instances")
1418 5709 : .subcommand(Command::new("list").arg(tenant_id_arg.clone()))
1419 5709 : .subcommand(Command::new("create")
1420 5709 : .about("Create a compute endpoint")
1421 5709 : .arg(endpoint_id_arg.clone())
1422 5709 : .arg(branch_name_arg.clone())
1423 5709 : .arg(tenant_id_arg.clone())
1424 5709 : .arg(lsn_arg.clone())
1425 5709 : .arg(pg_port_arg.clone())
1426 5709 : .arg(http_port_arg.clone())
1427 5709 : .arg(endpoint_pageserver_id_arg.clone())
1428 5709 : .arg(
1429 5709 : Arg::new("config-only")
1430 5709 : .help("Don't do basebackup, create endpoint directory with only config files")
1431 5709 : .long("config-only")
1432 5709 : .required(false))
1433 5709 : .arg(pg_version_arg.clone())
1434 5709 : .arg(hot_standby_arg.clone())
1435 5709 : )
1436 5709 : .subcommand(Command::new("start")
1437 5709 : .about("Start postgres.\n If the endpoint doesn't exist yet, it is created.")
1438 5709 : .arg(endpoint_id_arg.clone())
1439 5709 : .arg(endpoint_pageserver_id_arg.clone())
1440 5709 : .arg(safekeepers_arg)
1441 5709 : .arg(remote_ext_config_args)
1442 5709 : )
1443 5709 : .subcommand(Command::new("reconfigure")
1444 5709 : .about("Reconfigure the endpoint")
1445 5709 : .arg(endpoint_pageserver_id_arg)
1446 5709 : .arg(endpoint_id_arg.clone())
1447 5709 : .arg(tenant_id_arg.clone())
1448 5709 : )
1449 5709 : .subcommand(
1450 5709 : Command::new("stop")
1451 5709 : .arg(endpoint_id_arg)
1452 5709 : .arg(
1453 5709 : Arg::new("destroy")
1454 5709 : .help("Also delete data directory (now optional, should be default in future)")
1455 5709 : .long("destroy")
1456 5709 : .action(ArgAction::SetTrue)
1457 5709 : .required(false)
1458 5709 : )
1459 5709 : )
1460 5709 :
1461 5709 : )
1462 5709 : .subcommand(
1463 5709 : Command::new("mappings")
1464 5709 : .arg_required_else_help(true)
1465 5709 : .about("Manage neon_local branch name mappings")
1466 5709 : .subcommand(
1467 5709 : Command::new("map")
1468 5709 : .about("Create new mapping which cannot exist already")
1469 5709 : .arg(branch_name_arg.clone())
1470 5709 : .arg(tenant_id_arg.clone())
1471 5709 : .arg(timeline_id_arg.clone())
1472 5709 : )
1473 5709 : )
1474 5709 : // Obsolete old name for 'endpoint'. We now just print an error if it's used.
1475 5709 : .subcommand(
1476 5709 : Command::new("pg")
1477 5709 : .hide(true)
1478 5709 : .arg(Arg::new("ignore-rest").allow_hyphen_values(true).num_args(0..).required(false))
1479 5709 : .trailing_var_arg(true)
1480 5709 : )
1481 5709 : .subcommand(
1482 5709 : Command::new("start")
1483 5709 : .about("Start page server and safekeepers")
1484 5709 : .arg(pageserver_config_args)
1485 5709 : )
1486 5709 : .subcommand(
1487 5709 : Command::new("stop")
1488 5709 : .about("Stop page server and safekeepers")
1489 5709 : .arg(stop_mode_arg)
1490 5709 : )
1491 5709 : }
1492 :
1493 1 : #[test]
1494 1 : fn verify_cli() {
1495 1 : cli().debug_assert();
1496 1 : }
|