Line data Source code
1 : //!
2 : //! `neon_local` is an executable that can be used to create a local
3 : //! Neon environment, for testing purposes. The local environment is
4 : //! quite different from the cloud environment with Kubernetes, but it is
5 : //! easier to work with locally. The python tests in `test_runner`
6 : //! rely on `neon_local` to set up the environment for each test.
7 : //!
8 : use anyhow::{anyhow, bail, Context, Result};
9 : use clap::{value_parser, Arg, ArgAction, ArgMatches, Command, ValueEnum};
10 : use compute_api::spec::ComputeMode;
11 : use control_plane::attachment_service::AttachmentService;
12 : use control_plane::endpoint::ComputeControlPlane;
13 : use control_plane::local_env::{InitForceMode, LocalEnv};
14 : use control_plane::pageserver::{PageServerNode, PAGESERVER_REMOTE_STORAGE_DIR};
15 : use control_plane::safekeeper::SafekeeperNode;
16 : use control_plane::{broker, local_env};
17 : use pageserver_api::controller_api::{
18 : NodeAvailability, NodeConfigureRequest, NodeSchedulingPolicy,
19 : };
20 : use pageserver_api::models::{
21 : ShardParameters, TenantCreateRequest, TimelineCreateRequest, TimelineInfo,
22 : };
23 : use pageserver_api::shard::{ShardCount, ShardStripeSize, TenantShardId};
24 : use pageserver_api::{
25 : DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
26 : DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT,
27 : };
28 : use postgres_backend::AuthType;
29 : use postgres_connection::parse_host_port;
30 : use safekeeper_api::{
31 : DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT,
32 : DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT,
33 : };
34 : use std::collections::{BTreeSet, HashMap};
35 : use std::path::PathBuf;
36 : use std::process::exit;
37 : use std::str::FromStr;
38 : use storage_broker::DEFAULT_LISTEN_ADDR as DEFAULT_BROKER_ADDR;
39 : use url::Host;
40 : use utils::{
41 : auth::{Claims, Scope},
42 : id::{NodeId, TenantId, TenantTimelineId, TimelineId},
43 : lsn::Lsn,
44 : project_git_version,
45 : };
46 :
47 : // Default id of a safekeeper node, if not specified on the command line.
48 : const DEFAULT_SAFEKEEPER_ID: NodeId = NodeId(1);
49 : const DEFAULT_PAGESERVER_ID: NodeId = NodeId(1);
50 : const DEFAULT_BRANCH_NAME: &str = "main";
51 : project_git_version!(GIT_VERSION);
52 :
53 : const DEFAULT_PG_VERSION: &str = "15";
54 :
55 : const DEFAULT_PAGESERVER_CONTROL_PLANE_API: &str = "http://127.0.0.1:1234/upcall/v1/";
56 :
57 0 : fn default_conf(num_pageservers: u16) -> String {
58 0 : let mut template = format!(
59 0 : r#"
60 0 : # Default built-in configuration, defined in main.rs
61 0 : control_plane_api = '{DEFAULT_PAGESERVER_CONTROL_PLANE_API}'
62 0 :
63 0 : [broker]
64 0 : listen_addr = '{DEFAULT_BROKER_ADDR}'
65 0 :
66 0 : [[safekeepers]]
67 0 : id = {DEFAULT_SAFEKEEPER_ID}
68 0 : pg_port = {DEFAULT_SAFEKEEPER_PG_PORT}
69 0 : http_port = {DEFAULT_SAFEKEEPER_HTTP_PORT}
70 0 :
71 0 : "#,
72 0 : );
73 :
74 0 : for i in 0..num_pageservers {
75 0 : let pageserver_id = NodeId(DEFAULT_PAGESERVER_ID.0 + i as u64);
76 0 : let pg_port = DEFAULT_PAGESERVER_PG_PORT + i;
77 0 : let http_port = DEFAULT_PAGESERVER_HTTP_PORT + i;
78 0 :
79 0 : template += &format!(
80 0 : r#"
81 0 : [[pageservers]]
82 0 : id = {pageserver_id}
83 0 : listen_pg_addr = '127.0.0.1:{pg_port}'
84 0 : listen_http_addr = '127.0.0.1:{http_port}'
85 0 : pg_auth_type = '{trust_auth}'
86 0 : http_auth_type = '{trust_auth}'
87 0 : "#,
88 0 : trust_auth = AuthType::Trust,
89 0 : )
90 : }
91 :
92 0 : template
93 0 : }
94 :
95 : ///
96 : /// Timelines tree element used as a value in the HashMap.
97 : ///
98 : struct TimelineTreeEl {
99 : /// `TimelineInfo` received from the `pageserver` via the `timeline_list` http API call.
100 : pub info: TimelineInfo,
101 : /// Name, recovered from neon config mappings
102 : pub name: Option<String>,
103 : /// Holds all direct children of this timeline referenced using `timeline_id`.
104 : pub children: BTreeSet<TimelineId>,
105 : }
106 :
107 : // Main entry point for the 'neon_local' CLI utility
108 : //
109 : // This utility helps to manage neon installation. That includes following:
110 : // * Management of local postgres installations running on top of the
111 : // pageserver.
112 : // * Providing CLI api to the pageserver
113 : // * TODO: export/import to/from usual postgres
114 0 : fn main() -> Result<()> {
115 0 : let matches = cli().get_matches();
116 :
117 0 : let (sub_name, sub_args) = match matches.subcommand() {
118 0 : Some(subcommand_data) => subcommand_data,
119 0 : None => bail!("no subcommand provided"),
120 : };
121 :
122 : // Check for 'neon init' command first.
123 0 : let subcommand_result = if sub_name == "init" {
124 0 : handle_init(sub_args).map(Some)
125 : } else {
126 : // all other commands need an existing config
127 0 : let mut env = LocalEnv::load_config().context("Error loading config")?;
128 0 : let original_env = env.clone();
129 0 :
130 0 : let rt = tokio::runtime::Builder::new_current_thread()
131 0 : .enable_all()
132 0 : .build()
133 0 : .unwrap();
134 :
135 0 : let subcommand_result = match sub_name {
136 0 : "tenant" => rt.block_on(handle_tenant(sub_args, &mut env)),
137 0 : "timeline" => rt.block_on(handle_timeline(sub_args, &mut env)),
138 0 : "start" => rt.block_on(handle_start_all(sub_args, &env)),
139 0 : "stop" => rt.block_on(handle_stop_all(sub_args, &env)),
140 0 : "pageserver" => rt.block_on(handle_pageserver(sub_args, &env)),
141 0 : "attachment_service" => rt.block_on(handle_attachment_service(sub_args, &env)),
142 0 : "safekeeper" => rt.block_on(handle_safekeeper(sub_args, &env)),
143 0 : "endpoint" => rt.block_on(handle_endpoint(sub_args, &env)),
144 0 : "mappings" => handle_mappings(sub_args, &mut env),
145 0 : "pg" => bail!("'pg' subcommand has been renamed to 'endpoint'"),
146 0 : _ => bail!("unexpected subcommand {sub_name}"),
147 : };
148 :
149 0 : if original_env != env {
150 0 : subcommand_result.map(|()| Some(env))
151 : } else {
152 0 : subcommand_result.map(|()| None)
153 : }
154 : };
155 :
156 0 : match subcommand_result {
157 0 : Ok(Some(updated_env)) => updated_env.persist_config(&updated_env.base_data_dir)?,
158 0 : Ok(None) => (),
159 0 : Err(e) => {
160 0 : eprintln!("command failed: {e:?}");
161 0 : exit(1);
162 : }
163 : }
164 0 : Ok(())
165 0 : }
166 :
167 : ///
168 : /// Prints timelines list as a tree-like structure.
169 : ///
170 0 : fn print_timelines_tree(
171 0 : timelines: Vec<TimelineInfo>,
172 0 : mut timeline_name_mappings: HashMap<TenantTimelineId, String>,
173 0 : ) -> Result<()> {
174 0 : let mut timelines_hash = timelines
175 0 : .iter()
176 0 : .map(|t| {
177 0 : (
178 0 : t.timeline_id,
179 0 : TimelineTreeEl {
180 0 : info: t.clone(),
181 0 : children: BTreeSet::new(),
182 0 : name: timeline_name_mappings
183 0 : .remove(&TenantTimelineId::new(t.tenant_id.tenant_id, t.timeline_id)),
184 0 : },
185 0 : )
186 0 : })
187 0 : .collect::<HashMap<_, _>>();
188 :
189 : // Memorize all direct children of each timeline.
190 0 : for timeline in timelines.iter() {
191 0 : if let Some(ancestor_timeline_id) = timeline.ancestor_timeline_id {
192 0 : timelines_hash
193 0 : .get_mut(&ancestor_timeline_id)
194 0 : .context("missing timeline info in the HashMap")?
195 : .children
196 0 : .insert(timeline.timeline_id);
197 0 : }
198 : }
199 :
200 0 : for timeline in timelines_hash.values() {
201 : // Start with root local timelines (no ancestors) first.
202 0 : if timeline.info.ancestor_timeline_id.is_none() {
203 0 : print_timeline(0, &Vec::from([true]), timeline, &timelines_hash)?;
204 0 : }
205 : }
206 :
207 0 : Ok(())
208 0 : }
209 :
210 : ///
211 : /// Recursively prints timeline info with all its children.
212 : ///
213 0 : fn print_timeline(
214 0 : nesting_level: usize,
215 0 : is_last: &[bool],
216 0 : timeline: &TimelineTreeEl,
217 0 : timelines: &HashMap<TimelineId, TimelineTreeEl>,
218 0 : ) -> Result<()> {
219 0 : if nesting_level > 0 {
220 0 : let ancestor_lsn = match timeline.info.ancestor_lsn {
221 0 : Some(lsn) => lsn.to_string(),
222 0 : None => "Unknown Lsn".to_string(),
223 : };
224 :
225 0 : let mut br_sym = "┣━";
226 0 :
227 0 : // Draw each nesting padding with proper style
228 0 : // depending on whether its timeline ended or not.
229 0 : if nesting_level > 1 {
230 0 : for l in &is_last[1..is_last.len() - 1] {
231 0 : if *l {
232 0 : print!(" ");
233 0 : } else {
234 0 : print!("┃ ");
235 0 : }
236 : }
237 0 : }
238 :
239 : // We are the last in this sub-timeline
240 0 : if *is_last.last().unwrap() {
241 0 : br_sym = "┗━";
242 0 : }
243 :
244 0 : print!("{} @{}: ", br_sym, ancestor_lsn);
245 0 : }
246 :
247 : // Finally print a timeline id and name with new line
248 0 : println!(
249 0 : "{} [{}]",
250 0 : timeline.name.as_deref().unwrap_or("_no_name_"),
251 0 : timeline.info.timeline_id
252 0 : );
253 0 :
254 0 : let len = timeline.children.len();
255 0 : let mut i: usize = 0;
256 0 : let mut is_last_new = Vec::from(is_last);
257 0 : is_last_new.push(false);
258 :
259 0 : for child in &timeline.children {
260 0 : i += 1;
261 0 :
262 0 : // Mark that the last padding is the end of the timeline
263 0 : if i == len {
264 0 : if let Some(last) = is_last_new.last_mut() {
265 0 : *last = true;
266 0 : }
267 0 : }
268 :
269 : print_timeline(
270 0 : nesting_level + 1,
271 0 : &is_last_new,
272 0 : timelines
273 0 : .get(child)
274 0 : .context("missing timeline info in the HashMap")?,
275 0 : timelines,
276 0 : )?;
277 : }
278 :
279 0 : Ok(())
280 0 : }
281 :
282 : /// Returns a map of timeline IDs to timeline_id@lsn strings.
283 : /// Connects to the pageserver to query this information.
284 0 : async fn get_timeline_infos(
285 0 : env: &local_env::LocalEnv,
286 0 : tenant_shard_id: &TenantShardId,
287 0 : ) -> Result<HashMap<TimelineId, TimelineInfo>> {
288 0 : Ok(get_default_pageserver(env)
289 0 : .timeline_list(tenant_shard_id)
290 0 : .await?
291 0 : .into_iter()
292 0 : .map(|timeline_info| (timeline_info.timeline_id, timeline_info))
293 0 : .collect())
294 0 : }
295 :
296 : // Helper function to parse --tenant_id option, or get the default from config file
297 0 : fn get_tenant_id(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow::Result<TenantId> {
298 0 : if let Some(tenant_id_from_arguments) = parse_tenant_id(sub_match).transpose() {
299 0 : tenant_id_from_arguments
300 0 : } else if let Some(default_id) = env.default_tenant_id {
301 0 : Ok(default_id)
302 : } else {
303 0 : anyhow::bail!("No tenant id. Use --tenant-id, or set a default tenant");
304 : }
305 0 : }
306 :
307 : // Helper function to parse --tenant_id option, for commands that accept a shard suffix
308 0 : fn get_tenant_shard_id(
309 0 : sub_match: &ArgMatches,
310 0 : env: &local_env::LocalEnv,
311 0 : ) -> anyhow::Result<TenantShardId> {
312 0 : if let Some(tenant_id_from_arguments) = parse_tenant_shard_id(sub_match).transpose() {
313 0 : tenant_id_from_arguments
314 0 : } else if let Some(default_id) = env.default_tenant_id {
315 0 : Ok(TenantShardId::unsharded(default_id))
316 : } else {
317 0 : anyhow::bail!("No tenant shard id. Use --tenant-id, or set a default tenant");
318 : }
319 0 : }
320 :
321 0 : fn parse_tenant_id(sub_match: &ArgMatches) -> anyhow::Result<Option<TenantId>> {
322 0 : sub_match
323 0 : .get_one::<String>("tenant-id")
324 0 : .map(|tenant_id| TenantId::from_str(tenant_id))
325 0 : .transpose()
326 0 : .context("Failed to parse tenant id from the argument string")
327 0 : }
328 :
329 0 : fn parse_tenant_shard_id(sub_match: &ArgMatches) -> anyhow::Result<Option<TenantShardId>> {
330 0 : sub_match
331 0 : .get_one::<String>("tenant-id")
332 0 : .map(|id_str| TenantShardId::from_str(id_str))
333 0 : .transpose()
334 0 : .context("Failed to parse tenant shard id from the argument string")
335 0 : }
336 :
337 0 : fn parse_timeline_id(sub_match: &ArgMatches) -> anyhow::Result<Option<TimelineId>> {
338 0 : sub_match
339 0 : .get_one::<String>("timeline-id")
340 0 : .map(|timeline_id| TimelineId::from_str(timeline_id))
341 0 : .transpose()
342 0 : .context("Failed to parse timeline id from the argument string")
343 0 : }
344 :
345 0 : fn handle_init(init_match: &ArgMatches) -> anyhow::Result<LocalEnv> {
346 0 : let num_pageservers = init_match
347 0 : .get_one::<u16>("num-pageservers")
348 0 : .expect("num-pageservers arg has a default");
349 : // Create config file
350 0 : let toml_file: String = if let Some(config_path) = init_match.get_one::<PathBuf>("config") {
351 : // load and parse the file
352 0 : std::fs::read_to_string(config_path).with_context(|| {
353 0 : format!(
354 0 : "Could not read configuration file '{}'",
355 0 : config_path.display()
356 0 : )
357 0 : })?
358 : } else {
359 : // Built-in default config
360 0 : default_conf(*num_pageservers)
361 : };
362 :
363 0 : let pg_version = init_match
364 0 : .get_one::<u32>("pg-version")
365 0 : .copied()
366 0 : .context("Failed to parse postgres version from the argument string")?;
367 :
368 0 : let mut env =
369 0 : LocalEnv::parse_config(&toml_file).context("Failed to create neon configuration")?;
370 0 : let force = init_match.get_one("force").expect("we set a default value");
371 0 : env.init(pg_version, force)
372 0 : .context("Failed to initialize neon repository")?;
373 :
374 : // Create remote storage location for default LocalFs remote storage
375 0 : std::fs::create_dir_all(env.base_data_dir.join(PAGESERVER_REMOTE_STORAGE_DIR))?;
376 :
377 : // Initialize pageserver, create initial tenant and timeline.
378 0 : for ps_conf in &env.pageservers {
379 0 : PageServerNode::from_env(&env, ps_conf)
380 0 : .initialize(&pageserver_config_overrides(init_match))
381 0 : .unwrap_or_else(|e| {
382 0 : eprintln!("pageserver init failed: {e:?}");
383 0 : exit(1);
384 0 : });
385 0 : }
386 :
387 0 : Ok(env)
388 0 : }
389 :
390 : /// The default pageserver is the one where CLI tenant/timeline operations are sent by default.
391 : /// For typical interactive use, one would just run with a single pageserver. Scenarios with
392 : /// tenant/timeline placement across multiple pageservers are managed by python test code rather
393 : /// than this CLI.
394 0 : fn get_default_pageserver(env: &local_env::LocalEnv) -> PageServerNode {
395 0 : let ps_conf = env
396 0 : .pageservers
397 0 : .first()
398 0 : .expect("Config is validated to contain at least one pageserver");
399 0 : PageServerNode::from_env(env, ps_conf)
400 0 : }
401 :
402 0 : fn pageserver_config_overrides(init_match: &ArgMatches) -> Vec<&str> {
403 0 : init_match
404 0 : .get_many::<String>("pageserver-config-override")
405 0 : .into_iter()
406 0 : .flatten()
407 0 : .map(String::as_str)
408 0 : .collect()
409 0 : }
410 :
411 0 : async fn handle_tenant(
412 0 : tenant_match: &ArgMatches,
413 0 : env: &mut local_env::LocalEnv,
414 0 : ) -> anyhow::Result<()> {
415 0 : let pageserver = get_default_pageserver(env);
416 0 : match tenant_match.subcommand() {
417 0 : Some(("list", _)) => {
418 0 : for t in pageserver.tenant_list().await? {
419 0 : println!("{} {:?}", t.id, t.state);
420 0 : }
421 : }
422 0 : Some(("create", create_match)) => {
423 0 : let tenant_conf: HashMap<_, _> = create_match
424 0 : .get_many::<String>("config")
425 0 : .map(|vals: clap::parser::ValuesRef<'_, String>| {
426 0 : vals.flat_map(|c| c.split_once(':')).collect()
427 0 : })
428 0 : .unwrap_or_default();
429 0 :
430 0 : let shard_count: u8 = create_match
431 0 : .get_one::<u8>("shard-count")
432 0 : .cloned()
433 0 : .unwrap_or(0);
434 0 :
435 0 : let shard_stripe_size: Option<u32> =
436 0 : create_match.get_one::<u32>("shard-stripe-size").cloned();
437 :
438 0 : let tenant_conf = PageServerNode::parse_config(tenant_conf)?;
439 :
440 : // If tenant ID was not specified, generate one
441 0 : let tenant_id = parse_tenant_id(create_match)?.unwrap_or_else(TenantId::generate);
442 0 :
443 0 : // We must register the tenant with the attachment service, so
444 0 : // that when the pageserver restarts, it will be re-attached.
445 0 : let attachment_service = AttachmentService::from_env(env);
446 0 : attachment_service
447 0 : .tenant_create(TenantCreateRequest {
448 0 : // Note that ::unsharded here isn't actually because the tenant is unsharded, its because the
449 0 : // attachment service expecfs a shard-naive tenant_id in this attribute, and the TenantCreateRequest
450 0 : // type is used both in attachment service (for creating tenants) and in pageserver (for creating shards)
451 0 : new_tenant_id: TenantShardId::unsharded(tenant_id),
452 0 : generation: None,
453 0 : shard_parameters: ShardParameters {
454 0 : count: ShardCount::new(shard_count),
455 0 : stripe_size: shard_stripe_size
456 0 : .map(ShardStripeSize)
457 0 : .unwrap_or(ShardParameters::DEFAULT_STRIPE_SIZE),
458 0 : },
459 0 : config: tenant_conf,
460 0 : })
461 0 : .await?;
462 0 : println!("tenant {tenant_id} successfully created on the pageserver");
463 :
464 : // Create an initial timeline for the new tenant
465 0 : let new_timeline_id =
466 0 : parse_timeline_id(create_match)?.unwrap_or(TimelineId::generate());
467 0 : let pg_version = create_match
468 0 : .get_one::<u32>("pg-version")
469 0 : .copied()
470 0 : .context("Failed to parse postgres version from the argument string")?;
471 :
472 : // FIXME: passing None for ancestor_start_lsn is not kosher in a sharded world: we can't have
473 : // different shards picking different start lsns. Maybe we have to teach attachment service
474 : // to let shard 0 branch first and then propagate the chosen LSN to other shards.
475 0 : attachment_service
476 0 : .tenant_timeline_create(
477 0 : tenant_id,
478 0 : TimelineCreateRequest {
479 0 : new_timeline_id,
480 0 : ancestor_timeline_id: None,
481 0 : ancestor_start_lsn: None,
482 0 : existing_initdb_timeline_id: None,
483 0 : pg_version: Some(pg_version),
484 0 : },
485 0 : )
486 0 : .await?;
487 :
488 0 : env.register_branch_mapping(
489 0 : DEFAULT_BRANCH_NAME.to_string(),
490 0 : tenant_id,
491 0 : new_timeline_id,
492 0 : )?;
493 :
494 0 : println!("Created an initial timeline '{new_timeline_id}' for tenant: {tenant_id}",);
495 0 :
496 0 : if create_match.get_flag("set-default") {
497 0 : println!("Setting tenant {tenant_id} as a default one");
498 0 : env.default_tenant_id = Some(tenant_id);
499 0 : }
500 : }
501 0 : Some(("set-default", set_default_match)) => {
502 0 : let tenant_id =
503 0 : parse_tenant_id(set_default_match)?.context("No tenant id specified")?;
504 0 : println!("Setting tenant {tenant_id} as a default one");
505 0 : env.default_tenant_id = Some(tenant_id);
506 : }
507 0 : Some(("config", create_match)) => {
508 0 : let tenant_id = get_tenant_id(create_match, env)?;
509 0 : let tenant_conf: HashMap<_, _> = create_match
510 0 : .get_many::<String>("config")
511 0 : .map(|vals| vals.flat_map(|c| c.split_once(':')).collect())
512 0 : .unwrap_or_default();
513 0 :
514 0 : pageserver
515 0 : .tenant_config(tenant_id, tenant_conf)
516 0 : .await
517 0 : .with_context(|| format!("Tenant config failed for tenant with id {tenant_id}"))?;
518 0 : println!("tenant {tenant_id} successfully configured on the pageserver");
519 : }
520 0 : Some(("migrate", matches)) => {
521 0 : let tenant_shard_id = get_tenant_shard_id(matches, env)?;
522 0 : let new_pageserver = get_pageserver(env, matches)?;
523 0 : let new_pageserver_id = new_pageserver.conf.id;
524 0 :
525 0 : let attachment_service = AttachmentService::from_env(env);
526 0 : attachment_service
527 0 : .tenant_migrate(tenant_shard_id, new_pageserver_id)
528 0 : .await?;
529 :
530 0 : println!("tenant {tenant_shard_id} migrated to {}", new_pageserver_id);
531 : }
532 0 : Some(("status", matches)) => {
533 0 : let tenant_id = get_tenant_id(matches, env)?;
534 :
535 0 : let mut shard_table = comfy_table::Table::new();
536 0 : shard_table.set_header(["Shard", "Pageserver", "Physical Size"]);
537 0 :
538 0 : let mut tenant_synthetic_size = None;
539 0 :
540 0 : let attachment_service = AttachmentService::from_env(env);
541 0 : for shard in attachment_service.tenant_locate(tenant_id).await?.shards {
542 0 : let pageserver =
543 0 : PageServerNode::from_env(env, env.get_pageserver_conf(shard.node_id)?);
544 :
545 0 : let size = pageserver
546 0 : .http_client
547 0 : .tenant_details(shard.shard_id)
548 0 : .await?
549 : .tenant_info
550 : .current_physical_size
551 0 : .unwrap();
552 0 :
553 0 : shard_table.add_row([
554 0 : format!("{}", shard.shard_id.shard_slug()),
555 0 : format!("{}", shard.node_id.0),
556 0 : format!("{} MiB", size / (1024 * 1024)),
557 0 : ]);
558 0 :
559 0 : if shard.shard_id.is_zero() {
560 : tenant_synthetic_size =
561 0 : Some(pageserver.tenant_synthetic_size(shard.shard_id).await?);
562 0 : }
563 : }
564 :
565 0 : let Some(synthetic_size) = tenant_synthetic_size else {
566 0 : bail!("Shard 0 not found")
567 : };
568 :
569 0 : let mut tenant_table = comfy_table::Table::new();
570 0 : tenant_table.add_row(["Tenant ID".to_string(), tenant_id.to_string()]);
571 0 : tenant_table.add_row([
572 0 : "Synthetic size".to_string(),
573 0 : format!("{} MiB", synthetic_size.size.unwrap_or(0) / (1024 * 1024)),
574 0 : ]);
575 0 :
576 0 : println!("{tenant_table}");
577 0 : println!("{shard_table}");
578 : }
579 0 : Some(("shard-split", matches)) => {
580 0 : let tenant_id = get_tenant_id(matches, env)?;
581 0 : let shard_count: u8 = matches.get_one::<u8>("shard-count").cloned().unwrap_or(0);
582 0 :
583 0 : let attachment_service = AttachmentService::from_env(env);
584 0 : let result = attachment_service
585 0 : .tenant_split(tenant_id, shard_count)
586 0 : .await?;
587 0 : println!(
588 0 : "Split tenant {} into shards {}",
589 0 : tenant_id,
590 0 : result
591 0 : .new_shards
592 0 : .iter()
593 0 : .map(|s| format!("{:?}", s))
594 0 : .collect::<Vec<_>>()
595 0 : .join(",")
596 0 : );
597 : }
598 :
599 0 : Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{}'", sub_name),
600 0 : None => bail!("no tenant subcommand provided"),
601 : }
602 0 : Ok(())
603 0 : }
604 :
605 0 : async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Result<()> {
606 0 : let pageserver = get_default_pageserver(env);
607 0 :
608 0 : match timeline_match.subcommand() {
609 0 : Some(("list", list_match)) => {
610 : // TODO(sharding): this command shouldn't have to specify a shard ID: we should ask the attachment service
611 : // where shard 0 is attached, and query there.
612 0 : let tenant_shard_id = get_tenant_shard_id(list_match, env)?;
613 0 : let timelines = pageserver.timeline_list(&tenant_shard_id).await?;
614 0 : print_timelines_tree(timelines, env.timeline_name_mappings())?;
615 : }
616 0 : Some(("create", create_match)) => {
617 0 : let tenant_id = get_tenant_id(create_match, env)?;
618 0 : let new_branch_name = create_match
619 0 : .get_one::<String>("branch-name")
620 0 : .ok_or_else(|| anyhow!("No branch name provided"))?;
621 :
622 0 : let pg_version = create_match
623 0 : .get_one::<u32>("pg-version")
624 0 : .copied()
625 0 : .context("Failed to parse postgres version from the argument string")?;
626 :
627 0 : let new_timeline_id_opt = parse_timeline_id(create_match)?;
628 0 : let new_timeline_id = new_timeline_id_opt.unwrap_or(TimelineId::generate());
629 0 :
630 0 : let attachment_service = AttachmentService::from_env(env);
631 0 : let create_req = TimelineCreateRequest {
632 0 : new_timeline_id,
633 0 : ancestor_timeline_id: None,
634 0 : existing_initdb_timeline_id: None,
635 0 : ancestor_start_lsn: None,
636 0 : pg_version: Some(pg_version),
637 0 : };
638 0 : let timeline_info = attachment_service
639 0 : .tenant_timeline_create(tenant_id, create_req)
640 0 : .await?;
641 :
642 0 : let last_record_lsn = timeline_info.last_record_lsn;
643 0 : env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?;
644 :
645 0 : println!(
646 0 : "Created timeline '{}' at Lsn {last_record_lsn} for tenant: {tenant_id}",
647 0 : timeline_info.timeline_id
648 0 : );
649 : }
650 0 : Some(("import", import_match)) => {
651 0 : let tenant_id = get_tenant_id(import_match, env)?;
652 0 : let timeline_id = parse_timeline_id(import_match)?.expect("No timeline id provided");
653 0 : let name = import_match
654 0 : .get_one::<String>("node-name")
655 0 : .ok_or_else(|| anyhow!("No node name provided"))?;
656 0 : let update_catalog = import_match
657 0 : .get_one::<bool>("update-catalog")
658 0 : .cloned()
659 0 : .unwrap_or_default();
660 :
661 : // Parse base inputs
662 0 : let base_tarfile = import_match
663 0 : .get_one::<PathBuf>("base-tarfile")
664 0 : .ok_or_else(|| anyhow!("No base-tarfile provided"))?
665 0 : .to_owned();
666 0 : let base_lsn = Lsn::from_str(
667 0 : import_match
668 0 : .get_one::<String>("base-lsn")
669 0 : .ok_or_else(|| anyhow!("No base-lsn provided"))?,
670 0 : )?;
671 0 : let base = (base_lsn, base_tarfile);
672 0 :
673 0 : // Parse pg_wal inputs
674 0 : let wal_tarfile = import_match.get_one::<PathBuf>("wal-tarfile").cloned();
675 0 : let end_lsn = import_match
676 0 : .get_one::<String>("end-lsn")
677 0 : .map(|s| Lsn::from_str(s).unwrap());
678 0 : // TODO validate both or none are provided
679 0 : let pg_wal = end_lsn.zip(wal_tarfile);
680 :
681 0 : let pg_version = import_match
682 0 : .get_one::<u32>("pg-version")
683 0 : .copied()
684 0 : .context("Failed to parse postgres version from the argument string")?;
685 :
686 0 : let mut cplane = ComputeControlPlane::load(env.clone())?;
687 0 : println!("Importing timeline into pageserver ...");
688 0 : pageserver
689 0 : .timeline_import(tenant_id, timeline_id, base, pg_wal, pg_version)
690 0 : .await?;
691 0 : env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?;
692 :
693 0 : println!("Creating endpoint for imported timeline ...");
694 0 : cplane.new_endpoint(
695 0 : name,
696 0 : tenant_id,
697 0 : timeline_id,
698 0 : None,
699 0 : None,
700 0 : pg_version,
701 0 : ComputeMode::Primary,
702 0 : !update_catalog,
703 0 : )?;
704 0 : println!("Done");
705 : }
706 0 : Some(("branch", branch_match)) => {
707 0 : let tenant_id = get_tenant_id(branch_match, env)?;
708 0 : let new_branch_name = branch_match
709 0 : .get_one::<String>("branch-name")
710 0 : .ok_or_else(|| anyhow!("No branch name provided"))?;
711 0 : let ancestor_branch_name = branch_match
712 0 : .get_one::<String>("ancestor-branch-name")
713 0 : .map(|s| s.as_str())
714 0 : .unwrap_or(DEFAULT_BRANCH_NAME);
715 0 : let ancestor_timeline_id = env
716 0 : .get_branch_timeline_id(ancestor_branch_name, tenant_id)
717 0 : .ok_or_else(|| {
718 0 : anyhow!("Found no timeline id for branch name '{ancestor_branch_name}'")
719 0 : })?;
720 :
721 0 : let start_lsn = branch_match
722 0 : .get_one::<String>("ancestor-start-lsn")
723 0 : .map(|lsn_str| Lsn::from_str(lsn_str))
724 0 : .transpose()
725 0 : .context("Failed to parse ancestor start Lsn from the request")?;
726 0 : let new_timeline_id = TimelineId::generate();
727 0 : let attachment_service = AttachmentService::from_env(env);
728 0 : let create_req = TimelineCreateRequest {
729 0 : new_timeline_id,
730 0 : ancestor_timeline_id: Some(ancestor_timeline_id),
731 0 : existing_initdb_timeline_id: None,
732 0 : ancestor_start_lsn: start_lsn,
733 0 : pg_version: None,
734 0 : };
735 0 : let timeline_info = attachment_service
736 0 : .tenant_timeline_create(tenant_id, create_req)
737 0 : .await?;
738 :
739 0 : let last_record_lsn = timeline_info.last_record_lsn;
740 0 :
741 0 : env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?;
742 :
743 0 : println!(
744 0 : "Created timeline '{}' at Lsn {last_record_lsn} for tenant: {tenant_id}. Ancestor timeline: '{ancestor_branch_name}'",
745 0 : timeline_info.timeline_id
746 0 : );
747 : }
748 0 : Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{sub_name}'"),
749 0 : None => bail!("no tenant subcommand provided"),
750 : }
751 :
752 0 : Ok(())
753 0 : }
754 :
755 0 : async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
756 0 : let (sub_name, sub_args) = match ep_match.subcommand() {
757 0 : Some(ep_subcommand_data) => ep_subcommand_data,
758 0 : None => bail!("no endpoint subcommand provided"),
759 : };
760 0 : let mut cplane = ComputeControlPlane::load(env.clone())?;
761 :
762 0 : match sub_name {
763 0 : "list" => {
764 : // TODO(sharding): this command shouldn't have to specify a shard ID: we should ask the attachment service
765 : // where shard 0 is attached, and query there.
766 0 : let tenant_shard_id = get_tenant_shard_id(sub_args, env)?;
767 0 : let timeline_infos = get_timeline_infos(env, &tenant_shard_id)
768 0 : .await
769 0 : .unwrap_or_else(|e| {
770 0 : eprintln!("Failed to load timeline info: {}", e);
771 0 : HashMap::new()
772 0 : });
773 0 :
774 0 : let timeline_name_mappings = env.timeline_name_mappings();
775 0 :
776 0 : let mut table = comfy_table::Table::new();
777 0 :
778 0 : table.load_preset(comfy_table::presets::NOTHING);
779 0 :
780 0 : table.set_header([
781 0 : "ENDPOINT",
782 0 : "ADDRESS",
783 0 : "TIMELINE",
784 0 : "BRANCH NAME",
785 0 : "LSN",
786 0 : "STATUS",
787 0 : ]);
788 :
789 0 : for (endpoint_id, endpoint) in cplane
790 0 : .endpoints
791 0 : .iter()
792 0 : .filter(|(_, endpoint)| endpoint.tenant_id == tenant_shard_id.tenant_id)
793 0 : {
794 0 : let lsn_str = match endpoint.mode {
795 0 : ComputeMode::Static(lsn) => {
796 0 : // -> read-only endpoint
797 0 : // Use the node's LSN.
798 0 : lsn.to_string()
799 : }
800 : _ => {
801 : // -> primary endpoint or hot replica
802 : // Use the LSN at the end of the timeline.
803 0 : timeline_infos
804 0 : .get(&endpoint.timeline_id)
805 0 : .map(|bi| bi.last_record_lsn.to_string())
806 0 : .unwrap_or_else(|| "?".to_string())
807 : }
808 : };
809 :
810 0 : let branch_name = timeline_name_mappings
811 0 : .get(&TenantTimelineId::new(
812 0 : tenant_shard_id.tenant_id,
813 0 : endpoint.timeline_id,
814 0 : ))
815 0 : .map(|name| name.as_str())
816 0 : .unwrap_or("?");
817 0 :
818 0 : table.add_row([
819 0 : endpoint_id.as_str(),
820 0 : &endpoint.pg_address.to_string(),
821 0 : &endpoint.timeline_id.to_string(),
822 0 : branch_name,
823 0 : lsn_str.as_str(),
824 0 : &format!("{}", endpoint.status()),
825 0 : ]);
826 : }
827 :
828 0 : println!("{table}");
829 : }
830 0 : "create" => {
831 0 : let tenant_id = get_tenant_id(sub_args, env)?;
832 0 : let branch_name = sub_args
833 0 : .get_one::<String>("branch-name")
834 0 : .map(|s| s.as_str())
835 0 : .unwrap_or(DEFAULT_BRANCH_NAME);
836 0 : let endpoint_id = sub_args
837 0 : .get_one::<String>("endpoint_id")
838 0 : .map(String::to_string)
839 0 : .unwrap_or_else(|| format!("ep-{branch_name}"));
840 0 : let update_catalog = sub_args
841 0 : .get_one::<bool>("update-catalog")
842 0 : .cloned()
843 0 : .unwrap_or_default();
844 :
845 0 : let lsn = sub_args
846 0 : .get_one::<String>("lsn")
847 0 : .map(|lsn_str| Lsn::from_str(lsn_str))
848 0 : .transpose()
849 0 : .context("Failed to parse Lsn from the request")?;
850 0 : let timeline_id = env
851 0 : .get_branch_timeline_id(branch_name, tenant_id)
852 0 : .ok_or_else(|| anyhow!("Found no timeline id for branch name '{branch_name}'"))?;
853 :
854 0 : let pg_port: Option<u16> = sub_args.get_one::<u16>("pg-port").copied();
855 0 : let http_port: Option<u16> = sub_args.get_one::<u16>("http-port").copied();
856 0 : let pg_version = sub_args
857 0 : .get_one::<u32>("pg-version")
858 0 : .copied()
859 0 : .context("Failed to parse postgres version from the argument string")?;
860 :
861 0 : let hot_standby = sub_args
862 0 : .get_one::<bool>("hot-standby")
863 0 : .copied()
864 0 : .unwrap_or(false);
865 :
866 0 : let mode = match (lsn, hot_standby) {
867 0 : (Some(lsn), false) => ComputeMode::Static(lsn),
868 0 : (None, true) => ComputeMode::Replica,
869 0 : (None, false) => ComputeMode::Primary,
870 0 : (Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"),
871 : };
872 :
873 0 : match (mode, hot_standby) {
874 : (ComputeMode::Static(_), true) => {
875 0 : bail!("Cannot start a node in hot standby mode when it is already configured as a static replica")
876 : }
877 : (ComputeMode::Primary, true) => {
878 0 : bail!("Cannot start a node as a hot standby replica, it is already configured as primary node")
879 : }
880 0 : _ => {}
881 0 : }
882 0 :
883 0 : cplane.check_conflicting_endpoints(mode, tenant_id, timeline_id)?;
884 :
885 0 : cplane.new_endpoint(
886 0 : &endpoint_id,
887 0 : tenant_id,
888 0 : timeline_id,
889 0 : pg_port,
890 0 : http_port,
891 0 : pg_version,
892 0 : mode,
893 0 : !update_catalog,
894 0 : )?;
895 : }
896 0 : "start" => {
897 0 : let endpoint_id = sub_args
898 0 : .get_one::<String>("endpoint_id")
899 0 : .ok_or_else(|| anyhow!("No endpoint ID was provided to start"))?;
900 :
901 0 : let pageserver_id =
902 0 : if let Some(id_str) = sub_args.get_one::<String>("endpoint-pageserver-id") {
903 : Some(NodeId(
904 0 : id_str.parse().context("while parsing pageserver id")?,
905 : ))
906 : } else {
907 0 : None
908 : };
909 :
910 0 : let remote_ext_config = sub_args.get_one::<String>("remote-ext-config");
911 :
912 : // If --safekeepers argument is given, use only the listed safekeeper nodes.
913 0 : let safekeepers =
914 0 : if let Some(safekeepers_str) = sub_args.get_one::<String>("safekeepers") {
915 0 : let mut safekeepers: Vec<NodeId> = Vec::new();
916 0 : for sk_id in safekeepers_str.split(',').map(str::trim) {
917 0 : let sk_id = NodeId(u64::from_str(sk_id).map_err(|_| {
918 0 : anyhow!("invalid node ID \"{sk_id}\" in --safekeepers list")
919 0 : })?);
920 0 : safekeepers.push(sk_id);
921 : }
922 0 : safekeepers
923 : } else {
924 0 : env.safekeepers.iter().map(|sk| sk.id).collect()
925 : };
926 :
927 0 : let endpoint = cplane
928 0 : .endpoints
929 0 : .get(endpoint_id.as_str())
930 0 : .ok_or_else(|| anyhow::anyhow!("endpoint {endpoint_id} not found"))?;
931 :
932 0 : let create_test_user = sub_args
933 0 : .get_one::<bool>("create-test-user")
934 0 : .cloned()
935 0 : .unwrap_or_default();
936 0 :
937 0 : cplane.check_conflicting_endpoints(
938 0 : endpoint.mode,
939 0 : endpoint.tenant_id,
940 0 : endpoint.timeline_id,
941 0 : )?;
942 :
943 0 : let (pageservers, stripe_size) = if let Some(pageserver_id) = pageserver_id {
944 0 : let conf = env.get_pageserver_conf(pageserver_id).unwrap();
945 0 : let parsed = parse_host_port(&conf.listen_pg_addr).expect("Bad config");
946 0 : (
947 0 : vec![(parsed.0, parsed.1.unwrap_or(5432))],
948 0 : // If caller is telling us what pageserver to use, this is not a tenant which is
949 0 : // full managed by attachment service, therefore not sharded.
950 0 : ShardParameters::DEFAULT_STRIPE_SIZE,
951 0 : )
952 : } else {
953 : // Look up the currently attached location of the tenant, and its striping metadata,
954 : // to pass these on to postgres.
955 0 : let attachment_service = AttachmentService::from_env(env);
956 0 : let locate_result = attachment_service.tenant_locate(endpoint.tenant_id).await?;
957 0 : let pageservers = locate_result
958 0 : .shards
959 0 : .into_iter()
960 0 : .map(|shard| {
961 0 : (
962 0 : Host::parse(&shard.listen_pg_addr)
963 0 : .expect("Attachment service reported bad hostname"),
964 0 : shard.listen_pg_port,
965 0 : )
966 0 : })
967 0 : .collect::<Vec<_>>();
968 0 : let stripe_size = locate_result.shard_params.stripe_size;
969 0 :
970 0 : (pageservers, stripe_size)
971 : };
972 0 : assert!(!pageservers.is_empty());
973 :
974 0 : let ps_conf = env.get_pageserver_conf(DEFAULT_PAGESERVER_ID)?;
975 0 : let auth_token = if matches!(ps_conf.pg_auth_type, AuthType::NeonJWT) {
976 0 : let claims = Claims::new(Some(endpoint.tenant_id), Scope::Tenant);
977 0 :
978 0 : Some(env.generate_auth_token(&claims)?)
979 : } else {
980 0 : None
981 : };
982 :
983 0 : println!("Starting existing endpoint {endpoint_id}...");
984 0 : endpoint
985 0 : .start(
986 0 : &auth_token,
987 0 : safekeepers,
988 0 : pageservers,
989 0 : remote_ext_config,
990 0 : stripe_size.0 as usize,
991 0 : create_test_user,
992 0 : )
993 0 : .await?;
994 : }
995 0 : "reconfigure" => {
996 0 : let endpoint_id = sub_args
997 0 : .get_one::<String>("endpoint_id")
998 0 : .ok_or_else(|| anyhow!("No endpoint ID provided to reconfigure"))?;
999 0 : let endpoint = cplane
1000 0 : .endpoints
1001 0 : .get(endpoint_id.as_str())
1002 0 : .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
1003 0 : let pageservers =
1004 0 : if let Some(id_str) = sub_args.get_one::<String>("endpoint-pageserver-id") {
1005 0 : let ps_id = NodeId(id_str.parse().context("while parsing pageserver id")?);
1006 0 : let pageserver = PageServerNode::from_env(env, env.get_pageserver_conf(ps_id)?);
1007 0 : vec![(
1008 0 : pageserver.pg_connection_config.host().clone(),
1009 0 : pageserver.pg_connection_config.port(),
1010 0 : )]
1011 : } else {
1012 0 : let attachment_service = AttachmentService::from_env(env);
1013 0 : attachment_service
1014 0 : .tenant_locate(endpoint.tenant_id)
1015 0 : .await?
1016 : .shards
1017 0 : .into_iter()
1018 0 : .map(|shard| {
1019 0 : (
1020 0 : Host::parse(&shard.listen_pg_addr)
1021 0 : .expect("Attachment service reported malformed host"),
1022 0 : shard.listen_pg_port,
1023 0 : )
1024 0 : })
1025 0 : .collect::<Vec<_>>()
1026 : };
1027 0 : endpoint.reconfigure(pageservers).await?;
1028 : }
1029 0 : "stop" => {
1030 0 : let endpoint_id = sub_args
1031 0 : .get_one::<String>("endpoint_id")
1032 0 : .ok_or_else(|| anyhow!("No endpoint ID was provided to stop"))?;
1033 0 : let destroy = sub_args.get_flag("destroy");
1034 0 : let mode = sub_args.get_one::<String>("mode").expect("has a default");
1035 :
1036 0 : let endpoint = cplane
1037 0 : .endpoints
1038 0 : .get(endpoint_id.as_str())
1039 0 : .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
1040 0 : endpoint.stop(mode, destroy)?;
1041 : }
1042 :
1043 0 : _ => bail!("Unexpected endpoint subcommand '{sub_name}'"),
1044 : }
1045 :
1046 0 : Ok(())
1047 0 : }
1048 :
1049 0 : fn handle_mappings(sub_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Result<()> {
1050 0 : let (sub_name, sub_args) = match sub_match.subcommand() {
1051 0 : Some(ep_subcommand_data) => ep_subcommand_data,
1052 0 : None => bail!("no mappings subcommand provided"),
1053 : };
1054 :
1055 0 : match sub_name {
1056 0 : "map" => {
1057 0 : let branch_name = sub_args
1058 0 : .get_one::<String>("branch-name")
1059 0 : .expect("branch-name argument missing");
1060 0 :
1061 0 : let tenant_id = sub_args
1062 0 : .get_one::<String>("tenant-id")
1063 0 : .map(|x| TenantId::from_str(x))
1064 0 : .expect("tenant-id argument missing")
1065 0 : .expect("malformed tenant-id arg");
1066 0 :
1067 0 : let timeline_id = sub_args
1068 0 : .get_one::<String>("timeline-id")
1069 0 : .map(|x| TimelineId::from_str(x))
1070 0 : .expect("timeline-id argument missing")
1071 0 : .expect("malformed timeline-id arg");
1072 0 :
1073 0 : env.register_branch_mapping(branch_name.to_owned(), tenant_id, timeline_id)?;
1074 :
1075 0 : Ok(())
1076 : }
1077 0 : other => unimplemented!("mappings subcommand {other}"),
1078 : }
1079 0 : }
1080 :
1081 0 : fn get_pageserver(env: &local_env::LocalEnv, args: &ArgMatches) -> Result<PageServerNode> {
1082 0 : let node_id = if let Some(id_str) = args.get_one::<String>("pageserver-id") {
1083 0 : NodeId(id_str.parse().context("while parsing pageserver id")?)
1084 : } else {
1085 0 : DEFAULT_PAGESERVER_ID
1086 : };
1087 :
1088 : Ok(PageServerNode::from_env(
1089 0 : env,
1090 0 : env.get_pageserver_conf(node_id)?,
1091 : ))
1092 0 : }
1093 :
1094 0 : async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
1095 0 : match sub_match.subcommand() {
1096 0 : Some(("start", subcommand_args)) => {
1097 0 : let register = subcommand_args.get_one::<bool>("register").unwrap_or(&true);
1098 0 : if let Err(e) = get_pageserver(env, subcommand_args)?
1099 0 : .start(&pageserver_config_overrides(subcommand_args), *register)
1100 0 : .await
1101 : {
1102 0 : eprintln!("pageserver start failed: {e}");
1103 0 : exit(1);
1104 0 : }
1105 : }
1106 :
1107 0 : Some(("stop", subcommand_args)) => {
1108 0 : let immediate = subcommand_args
1109 0 : .get_one::<String>("stop-mode")
1110 0 : .map(|s| s.as_str())
1111 0 : == Some("immediate");
1112 :
1113 0 : if let Err(e) = get_pageserver(env, subcommand_args)?.stop(immediate) {
1114 0 : eprintln!("pageserver stop failed: {}", e);
1115 0 : exit(1);
1116 0 : }
1117 : }
1118 :
1119 0 : Some(("restart", subcommand_args)) => {
1120 0 : let pageserver = get_pageserver(env, subcommand_args)?;
1121 : //TODO what shutdown strategy should we use here?
1122 0 : if let Err(e) = pageserver.stop(false) {
1123 0 : eprintln!("pageserver stop failed: {}", e);
1124 0 : exit(1);
1125 0 : }
1126 :
1127 0 : if let Err(e) = pageserver
1128 0 : .start(&pageserver_config_overrides(subcommand_args), false)
1129 0 : .await
1130 : {
1131 0 : eprintln!("pageserver start failed: {e}");
1132 0 : exit(1);
1133 0 : }
1134 : }
1135 :
1136 0 : Some(("set-state", subcommand_args)) => {
1137 0 : let pageserver = get_pageserver(env, subcommand_args)?;
1138 0 : let scheduling = subcommand_args.get_one("scheduling");
1139 0 : let availability = subcommand_args.get_one("availability");
1140 0 :
1141 0 : let attachment_service = AttachmentService::from_env(env);
1142 0 : attachment_service
1143 0 : .node_configure(NodeConfigureRequest {
1144 0 : node_id: pageserver.conf.id,
1145 0 : scheduling: scheduling.cloned(),
1146 0 : availability: availability.cloned(),
1147 0 : })
1148 0 : .await?;
1149 : }
1150 :
1151 0 : Some(("status", subcommand_args)) => {
1152 0 : match get_pageserver(env, subcommand_args)?.check_status().await {
1153 0 : Ok(_) => println!("Page server is up and running"),
1154 0 : Err(err) => {
1155 0 : eprintln!("Page server is not available: {}", err);
1156 0 : exit(1);
1157 : }
1158 : }
1159 : }
1160 :
1161 0 : Some((sub_name, _)) => bail!("Unexpected pageserver subcommand '{}'", sub_name),
1162 0 : None => bail!("no pageserver subcommand provided"),
1163 : }
1164 0 : Ok(())
1165 0 : }
1166 :
1167 0 : async fn handle_attachment_service(
1168 0 : sub_match: &ArgMatches,
1169 0 : env: &local_env::LocalEnv,
1170 0 : ) -> Result<()> {
1171 0 : let svc = AttachmentService::from_env(env);
1172 0 : match sub_match.subcommand() {
1173 0 : Some(("start", _start_match)) => {
1174 0 : if let Err(e) = svc.start().await {
1175 0 : eprintln!("start failed: {e}");
1176 0 : exit(1);
1177 0 : }
1178 : }
1179 :
1180 0 : Some(("stop", stop_match)) => {
1181 0 : let immediate = stop_match
1182 0 : .get_one::<String>("stop-mode")
1183 0 : .map(|s| s.as_str())
1184 0 : == Some("immediate");
1185 :
1186 0 : if let Err(e) = svc.stop(immediate).await {
1187 0 : eprintln!("stop failed: {}", e);
1188 0 : exit(1);
1189 0 : }
1190 : }
1191 0 : Some((sub_name, _)) => bail!("Unexpected attachment_service subcommand '{}'", sub_name),
1192 0 : None => bail!("no attachment_service subcommand provided"),
1193 : }
1194 0 : Ok(())
1195 0 : }
1196 :
1197 0 : fn get_safekeeper(env: &local_env::LocalEnv, id: NodeId) -> Result<SafekeeperNode> {
1198 0 : if let Some(node) = env.safekeepers.iter().find(|node| node.id == id) {
1199 0 : Ok(SafekeeperNode::from_env(env, node))
1200 : } else {
1201 0 : bail!("could not find safekeeper {id}")
1202 : }
1203 0 : }
1204 :
1205 : // Get list of options to append to safekeeper command invocation.
1206 0 : fn safekeeper_extra_opts(init_match: &ArgMatches) -> Vec<String> {
1207 0 : init_match
1208 0 : .get_many::<String>("safekeeper-extra-opt")
1209 0 : .into_iter()
1210 0 : .flatten()
1211 0 : .map(|s| s.to_owned())
1212 0 : .collect()
1213 0 : }
1214 :
1215 0 : async fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
1216 0 : let (sub_name, sub_args) = match sub_match.subcommand() {
1217 0 : Some(safekeeper_command_data) => safekeeper_command_data,
1218 0 : None => bail!("no safekeeper subcommand provided"),
1219 : };
1220 :
1221 : // All the commands take an optional safekeeper name argument
1222 0 : let sk_id = if let Some(id_str) = sub_args.get_one::<String>("id") {
1223 0 : NodeId(id_str.parse().context("while parsing safekeeper id")?)
1224 : } else {
1225 0 : DEFAULT_SAFEKEEPER_ID
1226 : };
1227 0 : let safekeeper = get_safekeeper(env, sk_id)?;
1228 :
1229 0 : match sub_name {
1230 0 : "start" => {
1231 0 : let extra_opts = safekeeper_extra_opts(sub_args);
1232 :
1233 0 : if let Err(e) = safekeeper.start(extra_opts).await {
1234 0 : eprintln!("safekeeper start failed: {}", e);
1235 0 : exit(1);
1236 0 : }
1237 : }
1238 :
1239 0 : "stop" => {
1240 0 : let immediate =
1241 0 : sub_args.get_one::<String>("stop-mode").map(|s| s.as_str()) == Some("immediate");
1242 :
1243 0 : if let Err(e) = safekeeper.stop(immediate) {
1244 0 : eprintln!("safekeeper stop failed: {}", e);
1245 0 : exit(1);
1246 0 : }
1247 : }
1248 :
1249 0 : "restart" => {
1250 0 : let immediate =
1251 0 : sub_args.get_one::<String>("stop-mode").map(|s| s.as_str()) == Some("immediate");
1252 :
1253 0 : if let Err(e) = safekeeper.stop(immediate) {
1254 0 : eprintln!("safekeeper stop failed: {}", e);
1255 0 : exit(1);
1256 0 : }
1257 0 :
1258 0 : let extra_opts = safekeeper_extra_opts(sub_args);
1259 0 : if let Err(e) = safekeeper.start(extra_opts).await {
1260 0 : eprintln!("safekeeper start failed: {}", e);
1261 0 : exit(1);
1262 0 : }
1263 : }
1264 :
1265 : _ => {
1266 0 : bail!("Unexpected safekeeper subcommand '{}'", sub_name)
1267 : }
1268 : }
1269 0 : Ok(())
1270 0 : }
1271 :
1272 0 : async fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow::Result<()> {
1273 0 : // Endpoints are not started automatically
1274 0 :
1275 0 : broker::start_broker_process(env).await?;
1276 :
1277 : // Only start the attachment service if the pageserver is configured to need it
1278 0 : if env.control_plane_api.is_some() {
1279 0 : let attachment_service = AttachmentService::from_env(env);
1280 0 : if let Err(e) = attachment_service.start().await {
1281 0 : eprintln!("attachment_service start failed: {:#}", e);
1282 0 : try_stop_all(env, true).await;
1283 0 : exit(1);
1284 0 : }
1285 0 : }
1286 :
1287 0 : for ps_conf in &env.pageservers {
1288 0 : let pageserver = PageServerNode::from_env(env, ps_conf);
1289 0 : if let Err(e) = pageserver
1290 0 : .start(&pageserver_config_overrides(sub_match), true)
1291 0 : .await
1292 : {
1293 0 : eprintln!("pageserver {} start failed: {:#}", ps_conf.id, e);
1294 0 : try_stop_all(env, true).await;
1295 0 : exit(1);
1296 0 : }
1297 : }
1298 :
1299 0 : for node in env.safekeepers.iter() {
1300 0 : let safekeeper = SafekeeperNode::from_env(env, node);
1301 0 : if let Err(e) = safekeeper.start(vec![]).await {
1302 0 : eprintln!("safekeeper {} start failed: {:#}", safekeeper.id, e);
1303 0 : try_stop_all(env, false).await;
1304 0 : exit(1);
1305 0 : }
1306 : }
1307 0 : Ok(())
1308 0 : }
1309 :
1310 0 : async fn handle_stop_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
1311 0 : let immediate =
1312 0 : sub_match.get_one::<String>("stop-mode").map(|s| s.as_str()) == Some("immediate");
1313 0 :
1314 0 : try_stop_all(env, immediate).await;
1315 :
1316 0 : Ok(())
1317 0 : }
1318 :
1319 0 : async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
1320 0 : // Stop all endpoints
1321 0 : match ComputeControlPlane::load(env.clone()) {
1322 0 : Ok(cplane) => {
1323 0 : for (_k, node) in cplane.endpoints {
1324 0 : if let Err(e) = node.stop(if immediate { "immediate" } else { "fast " }, false) {
1325 0 : eprintln!("postgres stop failed: {e:#}");
1326 0 : }
1327 : }
1328 : }
1329 0 : Err(e) => {
1330 0 : eprintln!("postgres stop failed, could not restore control plane data from env: {e:#}")
1331 : }
1332 : }
1333 :
1334 0 : for ps_conf in &env.pageservers {
1335 0 : let pageserver = PageServerNode::from_env(env, ps_conf);
1336 0 : if let Err(e) = pageserver.stop(immediate) {
1337 0 : eprintln!("pageserver {} stop failed: {:#}", ps_conf.id, e);
1338 0 : }
1339 : }
1340 :
1341 0 : for node in env.safekeepers.iter() {
1342 0 : let safekeeper = SafekeeperNode::from_env(env, node);
1343 0 : if let Err(e) = safekeeper.stop(immediate) {
1344 0 : eprintln!("safekeeper {} stop failed: {:#}", safekeeper.id, e);
1345 0 : }
1346 : }
1347 :
1348 0 : if let Err(e) = broker::stop_broker_process(env) {
1349 0 : eprintln!("neon broker stop failed: {e:#}");
1350 0 : }
1351 :
1352 0 : if env.control_plane_api.is_some() {
1353 0 : let attachment_service = AttachmentService::from_env(env);
1354 0 : if let Err(e) = attachment_service.stop(immediate).await {
1355 0 : eprintln!("attachment service stop failed: {e:#}");
1356 0 : }
1357 0 : }
1358 0 : }
1359 :
1360 2 : fn cli() -> Command {
1361 2 : let branch_name_arg = Arg::new("branch-name")
1362 2 : .long("branch-name")
1363 2 : .help("Name of the branch to be created or used as an alias for other services")
1364 2 : .required(false);
1365 2 :
1366 2 : let endpoint_id_arg = Arg::new("endpoint_id")
1367 2 : .help("Postgres endpoint id")
1368 2 : .required(false);
1369 2 :
1370 2 : let safekeeper_id_arg = Arg::new("id").help("safekeeper id").required(false);
1371 2 :
1372 2 : // --id, when using a pageserver command
1373 2 : let pageserver_id_arg = Arg::new("pageserver-id")
1374 2 : .long("id")
1375 2 : .global(true)
1376 2 : .help("pageserver id")
1377 2 : .required(false);
1378 2 : // --pageserver-id when using a non-pageserver command
1379 2 : let endpoint_pageserver_id_arg = Arg::new("endpoint-pageserver-id")
1380 2 : .long("pageserver-id")
1381 2 : .required(false);
1382 2 :
1383 2 : let safekeeper_extra_opt_arg = Arg::new("safekeeper-extra-opt")
1384 2 : .short('e')
1385 2 : .long("safekeeper-extra-opt")
1386 2 : .num_args(1)
1387 2 : .action(ArgAction::Append)
1388 2 : .help("Additional safekeeper invocation options, e.g. -e=--http-auth-public-key-path=foo")
1389 2 : .required(false);
1390 2 :
1391 2 : let tenant_id_arg = Arg::new("tenant-id")
1392 2 : .long("tenant-id")
1393 2 : .help("Tenant id. Represented as a hexadecimal string 32 symbols length")
1394 2 : .required(false);
1395 2 :
1396 2 : let timeline_id_arg = Arg::new("timeline-id")
1397 2 : .long("timeline-id")
1398 2 : .help("Timeline id. Represented as a hexadecimal string 32 symbols length")
1399 2 : .required(false);
1400 2 :
1401 2 : let pg_version_arg = Arg::new("pg-version")
1402 2 : .long("pg-version")
1403 2 : .help("Postgres version to use for the initial tenant")
1404 2 : .required(false)
1405 2 : .value_parser(value_parser!(u32))
1406 2 : .default_value(DEFAULT_PG_VERSION);
1407 2 :
1408 2 : let pg_port_arg = Arg::new("pg-port")
1409 2 : .long("pg-port")
1410 2 : .required(false)
1411 2 : .value_parser(value_parser!(u16))
1412 2 : .value_name("pg-port");
1413 2 :
1414 2 : let http_port_arg = Arg::new("http-port")
1415 2 : .long("http-port")
1416 2 : .required(false)
1417 2 : .value_parser(value_parser!(u16))
1418 2 : .value_name("http-port");
1419 2 :
1420 2 : let safekeepers_arg = Arg::new("safekeepers")
1421 2 : .long("safekeepers")
1422 2 : .required(false)
1423 2 : .value_name("safekeepers");
1424 2 :
1425 2 : let stop_mode_arg = Arg::new("stop-mode")
1426 2 : .short('m')
1427 2 : .value_parser(["fast", "immediate"])
1428 2 : .default_value("fast")
1429 2 : .help("If 'immediate', don't flush repository data at shutdown")
1430 2 : .required(false)
1431 2 : .value_name("stop-mode");
1432 2 :
1433 2 : let pageserver_config_args = Arg::new("pageserver-config-override")
1434 2 : .long("pageserver-config-override")
1435 2 : .num_args(1)
1436 2 : .action(ArgAction::Append)
1437 2 : .help("Additional pageserver's configuration options or overrides, refer to pageserver's 'config-override' CLI parameter docs for more")
1438 2 : .required(false);
1439 2 :
1440 2 : let remote_ext_config_args = Arg::new("remote-ext-config")
1441 2 : .long("remote-ext-config")
1442 2 : .num_args(1)
1443 2 : .help("Configure the remote extensions storage proxy gateway to request for extensions.")
1444 2 : .required(false);
1445 2 :
1446 2 : let lsn_arg = Arg::new("lsn")
1447 2 : .long("lsn")
1448 2 : .help("Specify Lsn on the timeline to start from. By default, end of the timeline would be used.")
1449 2 : .required(false);
1450 2 :
1451 2 : let hot_standby_arg = Arg::new("hot-standby")
1452 2 : .value_parser(value_parser!(bool))
1453 2 : .long("hot-standby")
1454 2 : .help("If set, the node will be a hot replica on the specified timeline")
1455 2 : .required(false);
1456 2 :
1457 2 : let force_arg = Arg::new("force")
1458 2 : .value_parser(value_parser!(InitForceMode))
1459 2 : .long("force")
1460 2 : .default_value(
1461 2 : InitForceMode::MustNotExist
1462 2 : .to_possible_value()
1463 2 : .unwrap()
1464 2 : .get_name()
1465 2 : .to_owned(),
1466 2 : )
1467 2 : .help("Force initialization even if the repository is not empty")
1468 2 : .required(false);
1469 2 :
1470 2 : let num_pageservers_arg = Arg::new("num-pageservers")
1471 2 : .value_parser(value_parser!(u16))
1472 2 : .long("num-pageservers")
1473 2 : .help("How many pageservers to create (default 1)")
1474 2 : .required(false)
1475 2 : .default_value("1");
1476 2 :
1477 2 : let update_catalog = Arg::new("update-catalog")
1478 2 : .value_parser(value_parser!(bool))
1479 2 : .long("update-catalog")
1480 2 : .help("If set, will set up the catalog for neon_superuser")
1481 2 : .required(false);
1482 2 :
1483 2 : let create_test_user = Arg::new("create-test-user")
1484 2 : .value_parser(value_parser!(bool))
1485 2 : .long("create-test-user")
1486 2 : .help("If set, will create test user `user` and `neondb` database. Requires `update-catalog = true`")
1487 2 : .required(false);
1488 2 :
1489 2 : Command::new("Neon CLI")
1490 2 : .arg_required_else_help(true)
1491 2 : .version(GIT_VERSION)
1492 2 : .subcommand(
1493 2 : Command::new("init")
1494 2 : .about("Initialize a new Neon repository, preparing configs for services to start with")
1495 2 : .arg(pageserver_config_args.clone())
1496 2 : .arg(num_pageservers_arg.clone())
1497 2 : .arg(
1498 2 : Arg::new("config")
1499 2 : .long("config")
1500 2 : .required(false)
1501 2 : .value_parser(value_parser!(PathBuf))
1502 2 : .value_name("config"),
1503 2 : )
1504 2 : .arg(pg_version_arg.clone())
1505 2 : .arg(force_arg)
1506 2 : )
1507 2 : .subcommand(
1508 2 : Command::new("timeline")
1509 2 : .about("Manage timelines")
1510 2 : .subcommand(Command::new("list")
1511 2 : .about("List all timelines, available to this pageserver")
1512 2 : .arg(tenant_id_arg.clone()))
1513 2 : .subcommand(Command::new("branch")
1514 2 : .about("Create a new timeline, using another timeline as a base, copying its data")
1515 2 : .arg(tenant_id_arg.clone())
1516 2 : .arg(branch_name_arg.clone())
1517 2 : .arg(Arg::new("ancestor-branch-name").long("ancestor-branch-name")
1518 2 : .help("Use last Lsn of another timeline (and its data) as base when creating the new timeline. The timeline gets resolved by its branch name.").required(false))
1519 2 : .arg(Arg::new("ancestor-start-lsn").long("ancestor-start-lsn")
1520 2 : .help("When using another timeline as base, use a specific Lsn in it instead of the latest one").required(false)))
1521 2 : .subcommand(Command::new("create")
1522 2 : .about("Create a new blank timeline")
1523 2 : .arg(tenant_id_arg.clone())
1524 2 : .arg(timeline_id_arg.clone())
1525 2 : .arg(branch_name_arg.clone())
1526 2 : .arg(pg_version_arg.clone())
1527 2 : )
1528 2 : .subcommand(Command::new("import")
1529 2 : .about("Import timeline from basebackup directory")
1530 2 : .arg(tenant_id_arg.clone())
1531 2 : .arg(timeline_id_arg.clone())
1532 2 : .arg(Arg::new("node-name").long("node-name")
1533 2 : .help("Name to assign to the imported timeline"))
1534 2 : .arg(Arg::new("base-tarfile")
1535 2 : .long("base-tarfile")
1536 2 : .value_parser(value_parser!(PathBuf))
1537 2 : .help("Basebackup tarfile to import")
1538 2 : )
1539 2 : .arg(Arg::new("base-lsn").long("base-lsn")
1540 2 : .help("Lsn the basebackup starts at"))
1541 2 : .arg(Arg::new("wal-tarfile")
1542 2 : .long("wal-tarfile")
1543 2 : .value_parser(value_parser!(PathBuf))
1544 2 : .help("Wal to add after base")
1545 2 : )
1546 2 : .arg(Arg::new("end-lsn").long("end-lsn")
1547 2 : .help("Lsn the basebackup ends at"))
1548 2 : .arg(pg_version_arg.clone())
1549 2 : .arg(update_catalog.clone())
1550 2 : )
1551 2 : ).subcommand(
1552 2 : Command::new("tenant")
1553 2 : .arg_required_else_help(true)
1554 2 : .about("Manage tenants")
1555 2 : .subcommand(Command::new("list"))
1556 2 : .subcommand(Command::new("create")
1557 2 : .arg(tenant_id_arg.clone())
1558 2 : .arg(timeline_id_arg.clone().help("Use a specific timeline id when creating a tenant and its initial timeline"))
1559 2 : .arg(Arg::new("config").short('c').num_args(1).action(ArgAction::Append).required(false))
1560 2 : .arg(pg_version_arg.clone())
1561 2 : .arg(Arg::new("set-default").long("set-default").action(ArgAction::SetTrue).required(false)
1562 2 : .help("Use this tenant in future CLI commands where tenant_id is needed, but not specified"))
1563 2 : .arg(Arg::new("shard-count").value_parser(value_parser!(u8)).long("shard-count").action(ArgAction::Set).help("Number of shards in the new tenant (default 1)"))
1564 2 : .arg(Arg::new("shard-stripe-size").value_parser(value_parser!(u32)).long("shard-stripe-size").action(ArgAction::Set).help("Sharding stripe size in pages"))
1565 2 : )
1566 2 : .subcommand(Command::new("set-default").arg(tenant_id_arg.clone().required(true))
1567 2 : .about("Set a particular tenant as default in future CLI commands where tenant_id is needed, but not specified"))
1568 2 : .subcommand(Command::new("config")
1569 2 : .arg(tenant_id_arg.clone())
1570 2 : .arg(Arg::new("config").short('c').num_args(1).action(ArgAction::Append).required(false)))
1571 2 : .subcommand(Command::new("migrate")
1572 2 : .about("Migrate a tenant from one pageserver to another")
1573 2 : .arg(tenant_id_arg.clone())
1574 2 : .arg(pageserver_id_arg.clone()))
1575 2 : .subcommand(Command::new("status")
1576 2 : .about("Human readable summary of the tenant's shards and attachment locations")
1577 2 : .arg(tenant_id_arg.clone()))
1578 2 : .subcommand(Command::new("shard-split")
1579 2 : .about("Increase the number of shards in the tenant")
1580 2 : .arg(tenant_id_arg.clone())
1581 2 : .arg(Arg::new("shard-count").value_parser(value_parser!(u8)).long("shard-count").action(ArgAction::Set).help("Number of shards in the new tenant (default 1)"))
1582 2 : )
1583 2 : )
1584 2 : .subcommand(
1585 2 : Command::new("pageserver")
1586 2 : .arg_required_else_help(true)
1587 2 : .about("Manage pageserver")
1588 2 : .arg(pageserver_id_arg)
1589 2 : .subcommand(Command::new("status"))
1590 2 : .subcommand(Command::new("start")
1591 2 : .about("Start local pageserver")
1592 2 : .arg(pageserver_config_args.clone()).arg(Arg::new("register")
1593 2 : .long("register")
1594 2 : .default_value("true").required(false)
1595 2 : .value_parser(value_parser!(bool))
1596 2 : .value_name("register"))
1597 2 : )
1598 2 : .subcommand(Command::new("stop")
1599 2 : .about("Stop local pageserver")
1600 2 : .arg(stop_mode_arg.clone())
1601 2 : )
1602 2 : .subcommand(Command::new("restart")
1603 2 : .about("Restart local pageserver")
1604 2 : .arg(pageserver_config_args.clone())
1605 2 : )
1606 2 : .subcommand(Command::new("set-state")
1607 2 : .arg(Arg::new("availability").value_parser(value_parser!(NodeAvailability)).long("availability").action(ArgAction::Set).help("Availability state: offline,active"))
1608 2 : .arg(Arg::new("scheduling").value_parser(value_parser!(NodeSchedulingPolicy)).long("scheduling").action(ArgAction::Set).help("Scheduling state: draining,pause,filling,active"))
1609 2 : .about("Set scheduling or availability state of pageserver node")
1610 2 : .arg(pageserver_config_args.clone())
1611 2 : )
1612 2 : )
1613 2 : .subcommand(
1614 2 : Command::new("attachment_service")
1615 2 : .arg_required_else_help(true)
1616 2 : .about("Manage attachment_service")
1617 2 : .subcommand(Command::new("start").about("Start local pageserver").arg(pageserver_config_args.clone()))
1618 2 : .subcommand(Command::new("stop").about("Stop local pageserver")
1619 2 : .arg(stop_mode_arg.clone()))
1620 2 : )
1621 2 : .subcommand(
1622 2 : Command::new("safekeeper")
1623 2 : .arg_required_else_help(true)
1624 2 : .about("Manage safekeepers")
1625 2 : .subcommand(Command::new("start")
1626 2 : .about("Start local safekeeper")
1627 2 : .arg(safekeeper_id_arg.clone())
1628 2 : .arg(safekeeper_extra_opt_arg.clone())
1629 2 : )
1630 2 : .subcommand(Command::new("stop")
1631 2 : .about("Stop local safekeeper")
1632 2 : .arg(safekeeper_id_arg.clone())
1633 2 : .arg(stop_mode_arg.clone())
1634 2 : )
1635 2 : .subcommand(Command::new("restart")
1636 2 : .about("Restart local safekeeper")
1637 2 : .arg(safekeeper_id_arg)
1638 2 : .arg(stop_mode_arg.clone())
1639 2 : .arg(safekeeper_extra_opt_arg)
1640 2 : )
1641 2 : )
1642 2 : .subcommand(
1643 2 : Command::new("endpoint")
1644 2 : .arg_required_else_help(true)
1645 2 : .about("Manage postgres instances")
1646 2 : .subcommand(Command::new("list").arg(tenant_id_arg.clone()))
1647 2 : .subcommand(Command::new("create")
1648 2 : .about("Create a compute endpoint")
1649 2 : .arg(endpoint_id_arg.clone())
1650 2 : .arg(branch_name_arg.clone())
1651 2 : .arg(tenant_id_arg.clone())
1652 2 : .arg(lsn_arg.clone())
1653 2 : .arg(pg_port_arg.clone())
1654 2 : .arg(http_port_arg.clone())
1655 2 : .arg(endpoint_pageserver_id_arg.clone())
1656 2 : .arg(
1657 2 : Arg::new("config-only")
1658 2 : .help("Don't do basebackup, create endpoint directory with only config files")
1659 2 : .long("config-only")
1660 2 : .required(false))
1661 2 : .arg(pg_version_arg.clone())
1662 2 : .arg(hot_standby_arg.clone())
1663 2 : .arg(update_catalog)
1664 2 : )
1665 2 : .subcommand(Command::new("start")
1666 2 : .about("Start postgres.\n If the endpoint doesn't exist yet, it is created.")
1667 2 : .arg(endpoint_id_arg.clone())
1668 2 : .arg(endpoint_pageserver_id_arg.clone())
1669 2 : .arg(safekeepers_arg)
1670 2 : .arg(remote_ext_config_args)
1671 2 : .arg(create_test_user)
1672 2 : )
1673 2 : .subcommand(Command::new("reconfigure")
1674 2 : .about("Reconfigure the endpoint")
1675 2 : .arg(endpoint_pageserver_id_arg)
1676 2 : .arg(endpoint_id_arg.clone())
1677 2 : .arg(tenant_id_arg.clone())
1678 2 : )
1679 2 : .subcommand(
1680 2 : Command::new("stop")
1681 2 : .arg(endpoint_id_arg)
1682 2 : .arg(
1683 2 : Arg::new("destroy")
1684 2 : .help("Also delete data directory (now optional, should be default in future)")
1685 2 : .long("destroy")
1686 2 : .action(ArgAction::SetTrue)
1687 2 : .required(false)
1688 2 : )
1689 2 : .arg(
1690 2 : Arg::new("mode")
1691 2 : .help("Postgres shutdown mode, passed to \"pg_ctl -m <mode>\"")
1692 2 : .long("mode")
1693 2 : .action(ArgAction::Set)
1694 2 : .required(false)
1695 2 : .value_parser(["smart", "fast", "immediate"])
1696 2 : .default_value("fast")
1697 2 : )
1698 2 : )
1699 2 :
1700 2 : )
1701 2 : .subcommand(
1702 2 : Command::new("mappings")
1703 2 : .arg_required_else_help(true)
1704 2 : .about("Manage neon_local branch name mappings")
1705 2 : .subcommand(
1706 2 : Command::new("map")
1707 2 : .about("Create new mapping which cannot exist already")
1708 2 : .arg(branch_name_arg.clone())
1709 2 : .arg(tenant_id_arg.clone())
1710 2 : .arg(timeline_id_arg.clone())
1711 2 : )
1712 2 : )
1713 2 : // Obsolete old name for 'endpoint'. We now just print an error if it's used.
1714 2 : .subcommand(
1715 2 : Command::new("pg")
1716 2 : .hide(true)
1717 2 : .arg(Arg::new("ignore-rest").allow_hyphen_values(true).num_args(0..).required(false))
1718 2 : .trailing_var_arg(true)
1719 2 : )
1720 2 : .subcommand(
1721 2 : Command::new("start")
1722 2 : .about("Start page server and safekeepers")
1723 2 : .arg(pageserver_config_args)
1724 2 : )
1725 2 : .subcommand(
1726 2 : Command::new("stop")
1727 2 : .about("Stop page server and safekeepers")
1728 2 : .arg(stop_mode_arg)
1729 2 : )
1730 2 : }
1731 :
/// Runs clap's `debug_assert` over the command tree built by `cli()`.
#[test]
fn verify_cli() {
    let command = cli();
    command.debug_assert();
}
|