Line data Source code
1 : //!
2 : //! `neon_local` is an executable that can be used to create a local
3 : //! Neon environment, for testing purposes. The local environment is
4 : //! quite different from the cloud environment with Kubernetes, but it
5 : //! easier to work with locally. The python tests in `test_runner`
6 : //! rely on `neon_local` to set up the environment for each test.
7 : //!
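//!
//! A typical local workflow looks roughly like the following (editor's sketch;
//! exact flags and defaults may differ between versions, see `neon_local --help`):
//!
//! ```text
//! neon_local init                  # materialize the local config on disk
//! neon_local start                 # start broker, storage controller, pageserver(s), safekeeper(s)
//! neon_local tenant create --set-default
//! neon_local endpoint create main  # create a compute endpoint on the default branch
//! neon_local endpoint start main
//! neon_local stop                  # shut everything down again
//! ```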
8 : use anyhow::{anyhow, bail, Context, Result};
9 : use clap::{value_parser, Arg, ArgAction, ArgMatches, Command, ValueEnum};
10 : use compute_api::spec::ComputeMode;
11 : use control_plane::endpoint::ComputeControlPlane;
12 : use control_plane::local_env::{
13 : InitForceMode, LocalEnv, NeonBroker, NeonLocalInitConf, NeonLocalInitPageserverConf,
14 : SafekeeperConf,
15 : };
16 : use control_plane::pageserver::PageServerNode;
17 : use control_plane::safekeeper::SafekeeperNode;
18 : use control_plane::storage_controller::{
19 : NeonStorageControllerStartArgs, NeonStorageControllerStopArgs, StorageController,
20 : };
21 : use control_plane::{broker, local_env};
22 : use pageserver_api::config::{
23 : DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
24 : DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT,
25 : };
26 : use pageserver_api::controller_api::{
27 : NodeAvailabilityWrapper, PlacementPolicy, TenantCreateRequest,
28 : };
29 : use pageserver_api::models::{ShardParameters, TimelineCreateRequest, TimelineInfo};
30 : use pageserver_api::shard::{ShardCount, ShardStripeSize, TenantShardId};
31 : use postgres_backend::AuthType;
32 : use postgres_connection::parse_host_port;
33 : use safekeeper_api::{
34 : DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT,
35 : DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT,
36 : };
37 : use std::borrow::Cow;
38 : use std::collections::{BTreeSet, HashMap};
39 : use std::path::PathBuf;
40 : use std::process::exit;
41 : use std::str::FromStr;
42 : use std::time::Duration;
43 : use storage_broker::DEFAULT_LISTEN_ADDR as DEFAULT_BROKER_ADDR;
44 : use tokio::task::JoinSet;
45 : use url::Host;
46 : use utils::{
47 : auth::{Claims, Scope},
48 : id::{NodeId, TenantId, TenantTimelineId, TimelineId},
49 : lsn::Lsn,
50 : project_git_version,
51 : };
52 :
53 : // Default id of a safekeeper node, if not specified on the command line.
54 : const DEFAULT_SAFEKEEPER_ID: NodeId = NodeId(1);
55 : const DEFAULT_PAGESERVER_ID: NodeId = NodeId(1);
56 : const DEFAULT_BRANCH_NAME: &str = "main";
57 : project_git_version!(GIT_VERSION);
58 :
59 : const DEFAULT_PG_VERSION: &str = "16";
60 :
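// Default "control plane" (upcall) API address that pageservers are pointed at in the
// default local config. In `neon_local` this is expected to be served by the locally
// running storage controller; see `handle_start_all_impl`, which only starts the
// storage controller when `control_plane_api` is set.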
61 : const DEFAULT_PAGESERVER_CONTROL_PLANE_API: &str = "http://127.0.0.1:1234/upcall/v1/";
62 :
63 : ///
64 : /// Timelines tree element used as a value in the HashMap.
65 : ///
66 : struct TimelineTreeEl {
67 : /// `TimelineInfo` received from the `pageserver` via the `timeline_list` http API call.
68 : pub info: TimelineInfo,
69 : /// Name, recovered from neon config mappings
70 : pub name: Option<String>,
71 : /// Holds all direct children of this timeline referenced using `timeline_id`.
72 : pub children: BTreeSet<TimelineId>,
73 : }
74 :
75 : // Main entry point for the 'neon_local' CLI utility
76 : //
77 : // This utility helps to manage a Neon installation. That includes the following:
78 : // * Management of local postgres installations running on top of the
79 : // pageserver.
80 : // * Providing a CLI API to the pageserver
81 : // * TODO: export/import to/from a plain Postgres installation
82 0 : fn main() -> Result<()> {
83 0 : let matches = cli().get_matches();
84 :
85 0 : let (sub_name, sub_args) = match matches.subcommand() {
86 0 : Some(subcommand_data) => subcommand_data,
87 0 : None => bail!("no subcommand provided"),
88 : };
89 :
90 : // Check for the 'neon_local init' command first.
91 0 : let subcommand_result = if sub_name == "init" {
92 0 : handle_init(sub_args).map(|env| Some(Cow::Owned(env)))
93 : } else {
94 : // all other commands need an existing config
95 :
96 0 : let env = LocalEnv::load_config(&local_env::base_path()).context("Error loading config")?;
97 0 : let original_env = env.clone();
98 0 : let env = Box::leak(Box::new(env));
99 0 : let rt = tokio::runtime::Builder::new_current_thread()
100 0 : .enable_all()
101 0 : .build()
102 0 : .unwrap();
103 :
104 0 : let subcommand_result = match sub_name {
105 0 : "tenant" => rt.block_on(handle_tenant(sub_args, env)),
106 0 : "timeline" => rt.block_on(handle_timeline(sub_args, env)),
107 0 : "start" => rt.block_on(handle_start_all(env, get_start_timeout(sub_args))),
108 0 : "stop" => rt.block_on(handle_stop_all(sub_args, env)),
109 0 : "pageserver" => rt.block_on(handle_pageserver(sub_args, env)),
110 0 : "storage_controller" => rt.block_on(handle_storage_controller(sub_args, env)),
111 0 : "storage_broker" => rt.block_on(handle_storage_broker(sub_args, env)),
112 0 : "safekeeper" => rt.block_on(handle_safekeeper(sub_args, env)),
113 0 : "endpoint" => rt.block_on(handle_endpoint(sub_args, env)),
114 0 : "mappings" => handle_mappings(sub_args, env),
115 0 : "pg" => bail!("'pg' subcommand has been renamed to 'endpoint'"),
116 0 : _ => bail!("unexpected subcommand {sub_name}"),
117 : };
118 :
119 0 : if &original_env != env {
120 0 : subcommand_result.map(|()| Some(Cow::Borrowed(env)))
121 : } else {
122 0 : subcommand_result.map(|()| None)
123 : }
124 : };
125 :
126 0 : match subcommand_result {
127 0 : Ok(Some(updated_env)) => updated_env.persist_config()?,
128 0 : Ok(None) => (),
129 0 : Err(e) => {
130 0 : eprintln!("command failed: {e:?}");
131 0 : exit(1);
132 : }
133 : }
134 0 : Ok(())
135 0 : }
136 :
137 : ///
138 : /// Prints timelines list as a tree-like structure.
139 : ///
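/// Illustrative output shape (editor's sketch; real ids are 32-character hex strings):
///
/// ```text
/// main [3b5e…]
/// ┣━ @0/169AD58: child_a [7f02…]
/// ┗━ @0/16B21F0: child_b [a913…]
/// ```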
140 0 : fn print_timelines_tree(
141 0 : timelines: Vec<TimelineInfo>,
142 0 : mut timeline_name_mappings: HashMap<TenantTimelineId, String>,
143 0 : ) -> Result<()> {
144 0 : let mut timelines_hash = timelines
145 0 : .iter()
146 0 : .map(|t| {
147 0 : (
148 0 : t.timeline_id,
149 0 : TimelineTreeEl {
150 0 : info: t.clone(),
151 0 : children: BTreeSet::new(),
152 0 : name: timeline_name_mappings
153 0 : .remove(&TenantTimelineId::new(t.tenant_id.tenant_id, t.timeline_id)),
154 0 : },
155 0 : )
156 0 : })
157 0 : .collect::<HashMap<_, _>>();
158 :
159 : // Memorize all direct children of each timeline.
160 0 : for timeline in timelines.iter() {
161 0 : if let Some(ancestor_timeline_id) = timeline.ancestor_timeline_id {
162 0 : timelines_hash
163 0 : .get_mut(&ancestor_timeline_id)
164 0 : .context("missing timeline info in the HashMap")?
165 : .children
166 0 : .insert(timeline.timeline_id);
167 0 : }
168 : }
169 :
170 0 : for timeline in timelines_hash.values() {
171 : // Start with root local timelines (no ancestors) first.
172 0 : if timeline.info.ancestor_timeline_id.is_none() {
173 0 : print_timeline(0, &Vec::from([true]), timeline, &timelines_hash)?;
174 0 : }
175 : }
176 :
177 0 : Ok(())
178 0 : }
179 :
180 : ///
181 : /// Recursively prints timeline info with all its children.
182 : ///
183 0 : fn print_timeline(
184 0 : nesting_level: usize,
185 0 : is_last: &[bool],
186 0 : timeline: &TimelineTreeEl,
187 0 : timelines: &HashMap<TimelineId, TimelineTreeEl>,
188 0 : ) -> Result<()> {
189 0 : if nesting_level > 0 {
190 0 : let ancestor_lsn = match timeline.info.ancestor_lsn {
191 0 : Some(lsn) => lsn.to_string(),
192 0 : None => "Unknown Lsn".to_string(),
193 : };
194 :
195 0 : let mut br_sym = "┣━";
196 0 :
197 0 : // Draw each nesting level's padding with the proper style,
198 0 : // depending on whether its timeline has ended or not.
199 0 : if nesting_level > 1 {
200 0 : for l in &is_last[1..is_last.len() - 1] {
201 0 : if *l {
202 0 : print!(" ");
203 0 : } else {
204 0 : print!("┃ ");
205 0 : }
206 : }
207 0 : }
208 :
209 : // We are the last in this sub-timeline
210 0 : if *is_last.last().unwrap() {
211 0 : br_sym = "┗━";
212 0 : }
213 :
214 0 : print!("{} @{}: ", br_sym, ancestor_lsn);
215 0 : }
216 :
217 : // Finally, print the timeline name and id, followed by a newline
218 0 : println!(
219 0 : "{} [{}]",
220 0 : timeline.name.as_deref().unwrap_or("_no_name_"),
221 0 : timeline.info.timeline_id
222 0 : );
223 0 :
224 0 : let len = timeline.children.len();
225 0 : let mut i: usize = 0;
226 0 : let mut is_last_new = Vec::from(is_last);
227 0 : is_last_new.push(false);
228 :
229 0 : for child in &timeline.children {
230 0 : i += 1;
231 0 :
232 0 : // Mark that the last padding is the end of the timeline
233 0 : if i == len {
234 0 : if let Some(last) = is_last_new.last_mut() {
235 0 : *last = true;
236 0 : }
237 0 : }
238 :
239 : print_timeline(
240 0 : nesting_level + 1,
241 0 : &is_last_new,
242 0 : timelines
243 0 : .get(child)
244 0 : .context("missing timeline info in the HashMap")?,
245 0 : timelines,
246 0 : )?;
247 : }
248 :
249 0 : Ok(())
250 0 : }
251 :
252 : /// Returns a map of timeline IDs to their `TimelineInfo`.
253 : /// Connects to the pageserver to query this information.
254 0 : async fn get_timeline_infos(
255 0 : env: &local_env::LocalEnv,
256 0 : tenant_shard_id: &TenantShardId,
257 0 : ) -> Result<HashMap<TimelineId, TimelineInfo>> {
258 0 : Ok(get_default_pageserver(env)
259 0 : .timeline_list(tenant_shard_id)
260 0 : .await?
261 0 : .into_iter()
262 0 : .map(|timeline_info| (timeline_info.timeline_id, timeline_info))
263 0 : .collect())
264 0 : }
265 :
266 : // Helper function to parse the --tenant-id option, or get the default from the config file
267 0 : fn get_tenant_id(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow::Result<TenantId> {
268 0 : if let Some(tenant_id_from_arguments) = parse_tenant_id(sub_match).transpose() {
269 0 : tenant_id_from_arguments
270 0 : } else if let Some(default_id) = env.default_tenant_id {
271 0 : Ok(default_id)
272 : } else {
273 0 : anyhow::bail!("No tenant id. Use --tenant-id, or set a default tenant");
274 : }
275 0 : }
276 :
277 : // Helper function to parse the --tenant-id option, for commands that accept a shard suffix
278 0 : fn get_tenant_shard_id(
279 0 : sub_match: &ArgMatches,
280 0 : env: &local_env::LocalEnv,
281 0 : ) -> anyhow::Result<TenantShardId> {
282 0 : if let Some(tenant_id_from_arguments) = parse_tenant_shard_id(sub_match).transpose() {
283 0 : tenant_id_from_arguments
284 0 : } else if let Some(default_id) = env.default_tenant_id {
285 0 : Ok(TenantShardId::unsharded(default_id))
286 : } else {
287 0 : anyhow::bail!("No tenant shard id. Use --tenant-id, or set a default tenant");
288 : }
289 0 : }
290 :
291 0 : fn parse_tenant_id(sub_match: &ArgMatches) -> anyhow::Result<Option<TenantId>> {
292 0 : sub_match
293 0 : .get_one::<String>("tenant-id")
294 0 : .map(|tenant_id| TenantId::from_str(tenant_id))
295 0 : .transpose()
296 0 : .context("Failed to parse tenant id from the argument string")
297 0 : }
298 :
299 0 : fn parse_tenant_shard_id(sub_match: &ArgMatches) -> anyhow::Result<Option<TenantShardId>> {
300 0 : sub_match
301 0 : .get_one::<String>("tenant-id")
302 0 : .map(|id_str| TenantShardId::from_str(id_str))
303 0 : .transpose()
304 0 : .context("Failed to parse tenant shard id from the argument string")
305 0 : }
306 :
307 0 : fn parse_timeline_id(sub_match: &ArgMatches) -> anyhow::Result<Option<TimelineId>> {
308 0 : sub_match
309 0 : .get_one::<String>("timeline-id")
310 0 : .map(|timeline_id| TimelineId::from_str(timeline_id))
311 0 : .transpose()
312 0 : .context("Failed to parse timeline id from the argument string")
313 0 : }
314 :
315 0 : fn handle_init(init_match: &ArgMatches) -> anyhow::Result<LocalEnv> {
316 0 : let num_pageservers = init_match.get_one::<u16>("num-pageservers");
317 0 :
318 0 : let force = init_match.get_one("force").expect("we set a default value");
319 :
320 : // Create the in-memory `LocalEnv` that we'd normally load from disk in `load_config`.
321 0 : let init_conf: NeonLocalInitConf = if let Some(config_path) =
322 0 : init_match.get_one::<PathBuf>("config")
323 : {
324 : // User (likely the Python test suite) provided a description of the environment.
325 0 : if num_pageservers.is_some() {
326 0 : bail!("Cannot specify both --num-pageservers and --config, use key `pageservers` in the --config file instead");
327 0 : }
328 : // load and parse the file
329 0 : let contents = std::fs::read_to_string(config_path).with_context(|| {
330 0 : format!(
331 0 : "Could not read configuration file '{}'",
332 0 : config_path.display()
333 0 : )
334 0 : })?;
335 0 : toml_edit::de::from_str(&contents)?
336 : } else {
337 : // User (likely interactive) did not provide a description of the environment, so give them the default
338 0 : NeonLocalInitConf {
339 0 : control_plane_api: Some(Some(DEFAULT_PAGESERVER_CONTROL_PLANE_API.parse().unwrap())),
340 0 : broker: NeonBroker {
341 0 : listen_addr: DEFAULT_BROKER_ADDR.parse().unwrap(),
342 0 : },
343 0 : safekeepers: vec![SafekeeperConf {
344 0 : id: DEFAULT_SAFEKEEPER_ID,
345 0 : pg_port: DEFAULT_SAFEKEEPER_PG_PORT,
346 0 : http_port: DEFAULT_SAFEKEEPER_HTTP_PORT,
347 0 : ..Default::default()
348 0 : }],
349 0 : pageservers: (0..num_pageservers.copied().unwrap_or(1))
350 0 : .map(|i| {
351 0 : let pageserver_id = NodeId(DEFAULT_PAGESERVER_ID.0 + i as u64);
352 0 : let pg_port = DEFAULT_PAGESERVER_PG_PORT + i;
353 0 : let http_port = DEFAULT_PAGESERVER_HTTP_PORT + i;
354 0 : NeonLocalInitPageserverConf {
355 0 : id: pageserver_id,
356 0 : listen_pg_addr: format!("127.0.0.1:{pg_port}"),
357 0 : listen_http_addr: format!("127.0.0.1:{http_port}"),
358 0 : pg_auth_type: AuthType::Trust,
359 0 : http_auth_type: AuthType::Trust,
360 0 : other: Default::default(),
361 0 : }
362 0 : })
363 0 : .collect(),
364 0 : pg_distrib_dir: None,
365 0 : neon_distrib_dir: None,
366 0 : default_tenant_id: TenantId::from_array(std::array::from_fn(|_| 0)),
367 0 : storage_controller: None,
368 0 : control_plane_compute_hook_api: None,
369 0 : }
370 : };
371 :
372 0 : LocalEnv::init(init_conf, force)
373 0 : .context("materialize initial neon_local environment on disk")?;
374 0 : Ok(LocalEnv::load_config(&local_env::base_path())
375 0 : .expect("freshly written config should be loadable"))
376 0 : }
377 :
378 : /// The default pageserver is the one where CLI tenant/timeline operations are sent by default.
379 : /// For typical interactive use, one would just run with a single pageserver. Scenarios with
380 : /// tenant/timeline placement across multiple pageservers are managed by python test code rather
381 : /// than this CLI.
382 0 : fn get_default_pageserver(env: &local_env::LocalEnv) -> PageServerNode {
383 0 : let ps_conf = env
384 0 : .pageservers
385 0 : .first()
386 0 : .expect("Config is validated to contain at least one pageserver");
387 0 : PageServerNode::from_env(env, ps_conf)
388 0 : }
389 :
390 0 : async fn handle_tenant(
391 0 : tenant_match: &ArgMatches,
392 0 : env: &mut local_env::LocalEnv,
393 0 : ) -> anyhow::Result<()> {
394 0 : let pageserver = get_default_pageserver(env);
395 0 : match tenant_match.subcommand() {
396 0 : Some(("list", _)) => {
397 0 : for t in pageserver.tenant_list().await? {
398 0 : println!("{} {:?}", t.id, t.state);
399 0 : }
400 : }
401 0 : Some(("import", import_match)) => {
402 0 : let tenant_id = parse_tenant_id(import_match)?.unwrap_or_else(TenantId::generate);
403 0 :
404 0 : let storage_controller = StorageController::from_env(env);
405 0 : let create_response = storage_controller.tenant_import(tenant_id).await?;
406 :
407 0 : let shard_zero = create_response
408 0 : .shards
409 0 : .first()
410 0 : .expect("Import response omitted shards");
411 0 :
412 0 : let attached_pageserver_id = shard_zero.node_id;
413 0 : let pageserver =
414 0 : PageServerNode::from_env(env, env.get_pageserver_conf(attached_pageserver_id)?);
415 :
416 0 : println!(
417 0 : "Imported tenant {tenant_id}, attached to pageserver {attached_pageserver_id}"
418 0 : );
419 :
420 0 : let timelines = pageserver
421 0 : .http_client
422 0 : .list_timelines(shard_zero.shard_id)
423 0 : .await?;
424 :
425 : // Pick a 'main' timeline that has no ancestors; the rest will get arbitrary names
426 0 : let main_timeline = timelines
427 0 : .iter()
428 0 : .find(|t| t.ancestor_timeline_id.is_none())
429 0 : .expect("No timelines found")
430 0 : .timeline_id;
431 0 :
432 0 : let mut branch_i = 0;
433 0 : for timeline in timelines.iter() {
434 0 : let branch_name = if timeline.timeline_id == main_timeline {
435 0 : "main".to_string()
436 : } else {
437 0 : branch_i += 1;
438 0 : format!("branch_{branch_i}")
439 : };
440 :
441 0 : println!(
442 0 : "Importing timeline {tenant_id}/{} as branch {branch_name}",
443 0 : timeline.timeline_id
444 0 : );
445 0 :
446 0 : env.register_branch_mapping(branch_name, tenant_id, timeline.timeline_id)?;
447 : }
448 : }
449 0 : Some(("create", create_match)) => {
450 0 : let tenant_conf: HashMap<_, _> = create_match
451 0 : .get_many::<String>("config")
452 0 : .map(|vals: clap::parser::ValuesRef<'_, String>| {
453 0 : vals.flat_map(|c| c.split_once(':')).collect()
454 0 : })
455 0 : .unwrap_or_default();
456 0 :
457 0 : let shard_count: u8 = create_match
458 0 : .get_one::<u8>("shard-count")
459 0 : .cloned()
460 0 : .unwrap_or(0);
461 0 :
462 0 : let shard_stripe_size: Option<u32> =
463 0 : create_match.get_one::<u32>("shard-stripe-size").cloned();
464 :
465 0 : let placement_policy = match create_match.get_one::<String>("placement-policy") {
466 0 : Some(s) if !s.is_empty() => serde_json::from_str::<PlacementPolicy>(s)?,
467 0 : _ => PlacementPolicy::Attached(0),
468 : };
469 :
470 0 : let tenant_conf = PageServerNode::parse_config(tenant_conf)?;
471 :
472 : // If tenant ID was not specified, generate one
473 0 : let tenant_id = parse_tenant_id(create_match)?.unwrap_or_else(TenantId::generate);
474 0 :
475 0 : // We must register the tenant with the storage controller, so
476 0 : // that when the pageserver restarts, it will be re-attached.
477 0 : let storage_controller = StorageController::from_env(env);
478 0 : storage_controller
479 0 : .tenant_create(TenantCreateRequest {
480 0 : // Note that ::unsharded here isn't actually because the tenant is unsharded, its because the
481 0 : // storage controller expecfs a shard-naive tenant_id in this attribute, and the TenantCreateRequest
482 0 : // type is used both in storage controller (for creating tenants) and in pageserver (for creating shards)
483 0 : new_tenant_id: TenantShardId::unsharded(tenant_id),
484 0 : generation: None,
485 0 : shard_parameters: ShardParameters {
486 0 : count: ShardCount::new(shard_count),
487 0 : stripe_size: shard_stripe_size
488 0 : .map(ShardStripeSize)
489 0 : .unwrap_or(ShardParameters::DEFAULT_STRIPE_SIZE),
490 0 : },
491 0 : placement_policy: Some(placement_policy),
492 0 : config: tenant_conf,
493 0 : })
494 0 : .await?;
495 0 : println!("tenant {tenant_id} successfully created on the pageserver");
496 :
497 : // Create an initial timeline for the new tenant
498 0 : let new_timeline_id =
499 0 : parse_timeline_id(create_match)?.unwrap_or(TimelineId::generate());
500 0 : let pg_version = create_match
501 0 : .get_one::<u32>("pg-version")
502 0 : .copied()
503 0 : .context("Failed to parse postgres version from the argument string")?;
504 :
505 : // FIXME: passing None for ancestor_start_lsn is not kosher in a sharded world: we can't have
506 : // different shards picking different start lsns. Maybe we have to teach storage controller
507 : // to let shard 0 branch first and then propagate the chosen LSN to other shards.
508 0 : storage_controller
509 0 : .tenant_timeline_create(
510 0 : tenant_id,
511 0 : TimelineCreateRequest {
512 0 : new_timeline_id,
513 0 : ancestor_timeline_id: None,
514 0 : ancestor_start_lsn: None,
515 0 : existing_initdb_timeline_id: None,
516 0 : pg_version: Some(pg_version),
517 0 : },
518 0 : )
519 0 : .await?;
520 :
521 0 : env.register_branch_mapping(
522 0 : DEFAULT_BRANCH_NAME.to_string(),
523 0 : tenant_id,
524 0 : new_timeline_id,
525 0 : )?;
526 :
527 0 : println!("Created an initial timeline '{new_timeline_id}' for tenant: {tenant_id}",);
528 0 :
529 0 : if create_match.get_flag("set-default") {
530 0 : println!("Setting tenant {tenant_id} as a default one");
531 0 : env.default_tenant_id = Some(tenant_id);
532 0 : }
533 : }
534 0 : Some(("set-default", set_default_match)) => {
535 0 : let tenant_id =
536 0 : parse_tenant_id(set_default_match)?.context("No tenant id specified")?;
537 0 : println!("Setting tenant {tenant_id} as a default one");
538 0 : env.default_tenant_id = Some(tenant_id);
539 : }
540 0 : Some(("config", create_match)) => {
541 0 : let tenant_id = get_tenant_id(create_match, env)?;
542 0 : let tenant_conf: HashMap<_, _> = create_match
543 0 : .get_many::<String>("config")
544 0 : .map(|vals| vals.flat_map(|c| c.split_once(':')).collect())
545 0 : .unwrap_or_default();
546 0 :
547 0 : pageserver
548 0 : .tenant_config(tenant_id, tenant_conf)
549 0 : .await
550 0 : .with_context(|| format!("Tenant config failed for tenant with id {tenant_id}"))?;
551 0 : println!("tenant {tenant_id} successfully configured on the pageserver");
552 : }
553 :
554 0 : Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{}'", sub_name),
555 0 : None => bail!("no tenant subcommand provided"),
556 : }
557 0 : Ok(())
558 0 : }
559 :
560 0 : async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Result<()> {
561 0 : let pageserver = get_default_pageserver(env);
562 0 :
563 0 : match timeline_match.subcommand() {
564 0 : Some(("list", list_match)) => {
565 : // TODO(sharding): this command shouldn't have to specify a shard ID: we should ask the storage controller
566 : // where shard 0 is attached, and query there.
567 0 : let tenant_shard_id = get_tenant_shard_id(list_match, env)?;
568 0 : let timelines = pageserver.timeline_list(&tenant_shard_id).await?;
569 0 : print_timelines_tree(timelines, env.timeline_name_mappings())?;
570 : }
571 0 : Some(("create", create_match)) => {
572 0 : let tenant_id = get_tenant_id(create_match, env)?;
573 0 : let new_branch_name = create_match
574 0 : .get_one::<String>("branch-name")
575 0 : .ok_or_else(|| anyhow!("No branch name provided"))?;
576 :
577 0 : let pg_version = create_match
578 0 : .get_one::<u32>("pg-version")
579 0 : .copied()
580 0 : .context("Failed to parse postgres version from the argument string")?;
581 :
582 0 : let new_timeline_id_opt = parse_timeline_id(create_match)?;
583 0 : let new_timeline_id = new_timeline_id_opt.unwrap_or(TimelineId::generate());
584 0 :
585 0 : let storage_controller = StorageController::from_env(env);
586 0 : let create_req = TimelineCreateRequest {
587 0 : new_timeline_id,
588 0 : ancestor_timeline_id: None,
589 0 : existing_initdb_timeline_id: None,
590 0 : ancestor_start_lsn: None,
591 0 : pg_version: Some(pg_version),
592 0 : };
593 0 : let timeline_info = storage_controller
594 0 : .tenant_timeline_create(tenant_id, create_req)
595 0 : .await?;
596 :
597 0 : let last_record_lsn = timeline_info.last_record_lsn;
598 0 : env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?;
599 :
600 0 : println!(
601 0 : "Created timeline '{}' at Lsn {last_record_lsn} for tenant: {tenant_id}",
602 0 : timeline_info.timeline_id
603 0 : );
604 : }
605 0 : Some(("import", import_match)) => {
606 0 : let tenant_id = get_tenant_id(import_match, env)?;
607 0 : let timeline_id = parse_timeline_id(import_match)?.expect("No timeline id provided");
608 0 : let branch_name = import_match
609 0 : .get_one::<String>("branch-name")
610 0 : .ok_or_else(|| anyhow!("No branch name provided"))?;
611 :
612 : // Parse base inputs
613 0 : let base_tarfile = import_match
614 0 : .get_one::<PathBuf>("base-tarfile")
615 0 : .ok_or_else(|| anyhow!("No base-tarfile provided"))?
616 0 : .to_owned();
617 0 : let base_lsn = Lsn::from_str(
618 0 : import_match
619 0 : .get_one::<String>("base-lsn")
620 0 : .ok_or_else(|| anyhow!("No base-lsn provided"))?,
621 0 : )?;
622 0 : let base = (base_lsn, base_tarfile);
623 0 :
624 0 : // Parse pg_wal inputs
625 0 : let wal_tarfile = import_match.get_one::<PathBuf>("wal-tarfile").cloned();
626 0 : let end_lsn = import_match
627 0 : .get_one::<String>("end-lsn")
628 0 : .map(|s| Lsn::from_str(s).unwrap());
629 0 : // TODO: validate that either both or neither are provided
630 0 : let pg_wal = end_lsn.zip(wal_tarfile);
631 :
632 0 : let pg_version = import_match
633 0 : .get_one::<u32>("pg-version")
634 0 : .copied()
635 0 : .context("Failed to parse postgres version from the argument string")?;
636 :
637 0 : println!("Importing timeline into pageserver ...");
638 0 : pageserver
639 0 : .timeline_import(tenant_id, timeline_id, base, pg_wal, pg_version)
640 0 : .await?;
641 0 : env.register_branch_mapping(branch_name.to_string(), tenant_id, timeline_id)?;
642 0 : println!("Done");
643 : }
644 0 : Some(("branch", branch_match)) => {
645 0 : let tenant_id = get_tenant_id(branch_match, env)?;
646 0 : let new_timeline_id =
647 0 : parse_timeline_id(branch_match)?.unwrap_or(TimelineId::generate());
648 0 : let new_branch_name = branch_match
649 0 : .get_one::<String>("branch-name")
650 0 : .ok_or_else(|| anyhow!("No branch name provided"))?;
651 0 : let ancestor_branch_name = branch_match
652 0 : .get_one::<String>("ancestor-branch-name")
653 0 : .map(|s| s.as_str())
654 0 : .unwrap_or(DEFAULT_BRANCH_NAME);
655 0 : let ancestor_timeline_id = env
656 0 : .get_branch_timeline_id(ancestor_branch_name, tenant_id)
657 0 : .ok_or_else(|| {
658 0 : anyhow!("Found no timeline id for branch name '{ancestor_branch_name}'")
659 0 : })?;
660 :
661 0 : let start_lsn = branch_match
662 0 : .get_one::<String>("ancestor-start-lsn")
663 0 : .map(|lsn_str| Lsn::from_str(lsn_str))
664 0 : .transpose()
665 0 : .context("Failed to parse ancestor start Lsn from the request")?;
666 0 : let storage_controller = StorageController::from_env(env);
667 0 : let create_req = TimelineCreateRequest {
668 0 : new_timeline_id,
669 0 : ancestor_timeline_id: Some(ancestor_timeline_id),
670 0 : existing_initdb_timeline_id: None,
671 0 : ancestor_start_lsn: start_lsn,
672 0 : pg_version: None,
673 0 : };
674 0 : let timeline_info = storage_controller
675 0 : .tenant_timeline_create(tenant_id, create_req)
676 0 : .await?;
677 :
678 0 : let last_record_lsn = timeline_info.last_record_lsn;
679 0 :
680 0 : env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?;
681 :
682 0 : println!(
683 0 : "Created timeline '{}' at Lsn {last_record_lsn} for tenant: {tenant_id}. Ancestor timeline: '{ancestor_branch_name}'",
684 0 : timeline_info.timeline_id
685 0 : );
686 : }
687 0 : Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{sub_name}'"),
688 0 : None => bail!("no tenant subcommand provided"),
689 : }
690 :
691 0 : Ok(())
692 0 : }
693 :
694 0 : async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
695 0 : let (sub_name, sub_args) = match ep_match.subcommand() {
696 0 : Some(ep_subcommand_data) => ep_subcommand_data,
697 0 : None => bail!("no endpoint subcommand provided"),
698 : };
699 0 : let mut cplane = ComputeControlPlane::load(env.clone())?;
700 :
701 0 : match sub_name {
702 0 : "list" => {
703 : // TODO(sharding): this command shouldn't have to specify a shard ID: we should ask the storage controller
704 : // where shard 0 is attached, and query there.
705 0 : let tenant_shard_id = get_tenant_shard_id(sub_args, env)?;
706 0 : let timeline_infos = get_timeline_infos(env, &tenant_shard_id)
707 0 : .await
708 0 : .unwrap_or_else(|e| {
709 0 : eprintln!("Failed to load timeline info: {}", e);
710 0 : HashMap::new()
711 0 : });
712 0 :
713 0 : let timeline_name_mappings = env.timeline_name_mappings();
714 0 :
715 0 : let mut table = comfy_table::Table::new();
716 0 :
717 0 : table.load_preset(comfy_table::presets::NOTHING);
718 0 :
719 0 : table.set_header([
720 0 : "ENDPOINT",
721 0 : "ADDRESS",
722 0 : "TIMELINE",
723 0 : "BRANCH NAME",
724 0 : "LSN",
725 0 : "STATUS",
726 0 : ]);
727 :
728 0 : for (endpoint_id, endpoint) in cplane
729 0 : .endpoints
730 0 : .iter()
731 0 : .filter(|(_, endpoint)| endpoint.tenant_id == tenant_shard_id.tenant_id)
732 : {
733 0 : let lsn_str = match endpoint.mode {
734 0 : ComputeMode::Static(lsn) => {
735 0 : // -> read-only endpoint
736 0 : // Use the node's LSN.
737 0 : lsn.to_string()
738 : }
739 : _ => {
740 : // -> primary endpoint or hot replica
741 : // Use the LSN at the end of the timeline.
742 0 : timeline_infos
743 0 : .get(&endpoint.timeline_id)
744 0 : .map(|bi| bi.last_record_lsn.to_string())
745 0 : .unwrap_or_else(|| "?".to_string())
746 : }
747 : };
748 :
749 0 : let branch_name = timeline_name_mappings
750 0 : .get(&TenantTimelineId::new(
751 0 : tenant_shard_id.tenant_id,
752 0 : endpoint.timeline_id,
753 0 : ))
754 0 : .map(|name| name.as_str())
755 0 : .unwrap_or("?");
756 0 :
757 0 : table.add_row([
758 0 : endpoint_id.as_str(),
759 0 : &endpoint.pg_address.to_string(),
760 0 : &endpoint.timeline_id.to_string(),
761 0 : branch_name,
762 0 : lsn_str.as_str(),
763 0 : &format!("{}", endpoint.status()),
764 0 : ]);
765 0 : }
766 :
767 0 : println!("{table}");
768 : }
769 0 : "create" => {
770 0 : let tenant_id = get_tenant_id(sub_args, env)?;
771 0 : let branch_name = sub_args
772 0 : .get_one::<String>("branch-name")
773 0 : .map(|s| s.as_str())
774 0 : .unwrap_or(DEFAULT_BRANCH_NAME);
775 0 : let endpoint_id = sub_args
776 0 : .get_one::<String>("endpoint_id")
777 0 : .map(String::to_string)
778 0 : .unwrap_or_else(|| format!("ep-{branch_name}"));
779 0 : let update_catalog = sub_args
780 0 : .get_one::<bool>("update-catalog")
781 0 : .cloned()
782 0 : .unwrap_or_default();
783 :
784 0 : let lsn = sub_args
785 0 : .get_one::<String>("lsn")
786 0 : .map(|lsn_str| Lsn::from_str(lsn_str))
787 0 : .transpose()
788 0 : .context("Failed to parse Lsn from the request")?;
789 0 : let timeline_id = env
790 0 : .get_branch_timeline_id(branch_name, tenant_id)
791 0 : .ok_or_else(|| anyhow!("Found no timeline id for branch name '{branch_name}'"))?;
792 :
793 0 : let pg_port: Option<u16> = sub_args.get_one::<u16>("pg-port").copied();
794 0 : let http_port: Option<u16> = sub_args.get_one::<u16>("http-port").copied();
795 0 : let pg_version = sub_args
796 0 : .get_one::<u32>("pg-version")
797 0 : .copied()
798 0 : .context("Failed to parse postgres version from the argument string")?;
799 :
800 0 : let hot_standby = sub_args
801 0 : .get_one::<bool>("hot-standby")
802 0 : .copied()
803 0 : .unwrap_or(false);
804 0 :
805 0 : let allow_multiple = sub_args.get_flag("allow-multiple");
806 :
807 0 : let mode = match (lsn, hot_standby) {
808 0 : (Some(lsn), false) => ComputeMode::Static(lsn),
809 0 : (None, true) => ComputeMode::Replica,
810 0 : (None, false) => ComputeMode::Primary,
811 0 : (Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"),
812 : };
813 :
814 0 : match (mode, hot_standby) {
815 : (ComputeMode::Static(_), true) => {
816 0 : bail!("Cannot start a node in hot standby mode when it is already configured as a static replica")
817 : }
818 : (ComputeMode::Primary, true) => {
819 0 : bail!("Cannot start a node as a hot standby replica, it is already configured as primary node")
820 : }
821 0 : _ => {}
822 0 : }
823 0 :
824 0 : if !allow_multiple {
825 0 : cplane.check_conflicting_endpoints(mode, tenant_id, timeline_id)?;
826 0 : }
827 :
828 0 : cplane.new_endpoint(
829 0 : &endpoint_id,
830 0 : tenant_id,
831 0 : timeline_id,
832 0 : pg_port,
833 0 : http_port,
834 0 : pg_version,
835 0 : mode,
836 0 : !update_catalog,
837 0 : )?;
838 : }
839 0 : "start" => {
840 0 : let endpoint_id = sub_args
841 0 : .get_one::<String>("endpoint_id")
842 0 : .ok_or_else(|| anyhow!("No endpoint ID was provided to start"))?;
843 :
844 0 : let pageserver_id =
845 0 : if let Some(id_str) = sub_args.get_one::<String>("endpoint-pageserver-id") {
846 : Some(NodeId(
847 0 : id_str.parse().context("while parsing pageserver id")?,
848 : ))
849 : } else {
850 0 : None
851 : };
852 :
853 0 : let remote_ext_config = sub_args.get_one::<String>("remote-ext-config");
854 0 :
855 0 : let allow_multiple = sub_args.get_flag("allow-multiple");
856 :
857 : // If --safekeepers argument is given, use only the listed
858 : // safekeeper nodes; otherwise all from the env.
859 0 : let safekeepers = if let Some(safekeepers) = parse_safekeepers(sub_args)? {
860 0 : safekeepers
861 : } else {
862 0 : env.safekeepers.iter().map(|sk| sk.id).collect()
863 : };
864 :
865 0 : let endpoint = cplane
866 0 : .endpoints
867 0 : .get(endpoint_id.as_str())
868 0 : .ok_or_else(|| anyhow::anyhow!("endpoint {endpoint_id} not found"))?;
869 :
870 0 : let create_test_user = sub_args
871 0 : .get_one::<bool>("create-test-user")
872 0 : .cloned()
873 0 : .unwrap_or_default();
874 0 :
875 0 : if !allow_multiple {
876 0 : cplane.check_conflicting_endpoints(
877 0 : endpoint.mode,
878 0 : endpoint.tenant_id,
879 0 : endpoint.timeline_id,
880 0 : )?;
881 0 : }
882 :
883 0 : let (pageservers, stripe_size) = if let Some(pageserver_id) = pageserver_id {
884 0 : let conf = env.get_pageserver_conf(pageserver_id).unwrap();
885 0 : let parsed = parse_host_port(&conf.listen_pg_addr).expect("Bad config");
886 0 : (
887 0 : vec![(parsed.0, parsed.1.unwrap_or(5432))],
888 0 : // If caller is telling us what pageserver to use, this is not a tenant which is
889 0 : // full managed by storage controller, therefore not sharded.
890 0 : ShardParameters::DEFAULT_STRIPE_SIZE,
891 0 : )
892 : } else {
893 : // Look up the currently attached location of the tenant, and its striping metadata,
894 : // to pass these on to postgres.
895 0 : let storage_controller = StorageController::from_env(env);
896 0 : let locate_result = storage_controller.tenant_locate(endpoint.tenant_id).await?;
897 0 : let pageservers = locate_result
898 0 : .shards
899 0 : .into_iter()
900 0 : .map(|shard| {
901 0 : (
902 0 : Host::parse(&shard.listen_pg_addr)
903 0 : .expect("Storage controller reported bad hostname"),
904 0 : shard.listen_pg_port,
905 0 : )
906 0 : })
907 0 : .collect::<Vec<_>>();
908 0 : let stripe_size = locate_result.shard_params.stripe_size;
909 0 :
910 0 : (pageservers, stripe_size)
911 : };
912 0 : assert!(!pageservers.is_empty());
913 :
914 0 : let ps_conf = env.get_pageserver_conf(DEFAULT_PAGESERVER_ID)?;
915 0 : let auth_token = if matches!(ps_conf.pg_auth_type, AuthType::NeonJWT) {
916 0 : let claims = Claims::new(Some(endpoint.tenant_id), Scope::Tenant);
917 0 :
918 0 : Some(env.generate_auth_token(&claims)?)
919 : } else {
920 0 : None
921 : };
922 :
923 0 : println!("Starting existing endpoint {endpoint_id}...");
924 0 : endpoint
925 0 : .start(
926 0 : &auth_token,
927 0 : safekeepers,
928 0 : pageservers,
929 0 : remote_ext_config,
930 0 : stripe_size.0 as usize,
931 0 : create_test_user,
932 0 : )
933 0 : .await?;
934 : }
935 0 : "reconfigure" => {
936 0 : let endpoint_id = sub_args
937 0 : .get_one::<String>("endpoint_id")
938 0 : .ok_or_else(|| anyhow!("No endpoint ID provided to reconfigure"))?;
939 0 : let endpoint = cplane
940 0 : .endpoints
941 0 : .get(endpoint_id.as_str())
942 0 : .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
943 0 : let pageservers =
944 0 : if let Some(id_str) = sub_args.get_one::<String>("endpoint-pageserver-id") {
945 0 : let ps_id = NodeId(id_str.parse().context("while parsing pageserver id")?);
946 0 : let pageserver = PageServerNode::from_env(env, env.get_pageserver_conf(ps_id)?);
947 0 : vec![(
948 0 : pageserver.pg_connection_config.host().clone(),
949 0 : pageserver.pg_connection_config.port(),
950 0 : )]
951 : } else {
952 0 : let storage_controller = StorageController::from_env(env);
953 0 : storage_controller
954 0 : .tenant_locate(endpoint.tenant_id)
955 0 : .await?
956 : .shards
957 0 : .into_iter()
958 0 : .map(|shard| {
959 0 : (
960 0 : Host::parse(&shard.listen_pg_addr)
961 0 : .expect("Storage controller reported malformed host"),
962 0 : shard.listen_pg_port,
963 0 : )
964 0 : })
965 0 : .collect::<Vec<_>>()
966 : };
967 : // If --safekeepers argument is given, use only the listed
968 : // safekeeper nodes; otherwise all from the env.
969 0 : let safekeepers = parse_safekeepers(sub_args)?;
970 0 : endpoint.reconfigure(pageservers, None, safekeepers).await?;
971 : }
972 0 : "stop" => {
973 0 : let endpoint_id = sub_args
974 0 : .get_one::<String>("endpoint_id")
975 0 : .ok_or_else(|| anyhow!("No endpoint ID was provided to stop"))?;
976 0 : let destroy = sub_args.get_flag("destroy");
977 0 : let mode = sub_args.get_one::<String>("mode").expect("has a default");
978 :
979 0 : let endpoint = cplane
980 0 : .endpoints
981 0 : .get(endpoint_id.as_str())
982 0 : .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
983 0 : endpoint.stop(mode, destroy)?;
984 : }
985 :
986 0 : _ => bail!("Unexpected endpoint subcommand '{sub_name}'"),
987 : }
988 :
989 0 : Ok(())
990 0 : }
991 :
992 : /// Parse --safekeepers as list of safekeeper ids.
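/// For example, `--safekeepers 1,2,3` parses to `Some(vec![NodeId(1), NodeId(2), NodeId(3)])`,
/// while omitting the flag yields `None` (meaning "use all safekeepers from the env").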
993 0 : fn parse_safekeepers(sub_args: &ArgMatches) -> Result<Option<Vec<NodeId>>> {
994 0 : if let Some(safekeepers_str) = sub_args.get_one::<String>("safekeepers") {
995 0 : let mut safekeepers: Vec<NodeId> = Vec::new();
996 0 : for sk_id in safekeepers_str.split(',').map(str::trim) {
997 0 : let sk_id = NodeId(
998 0 : u64::from_str(sk_id)
999 0 : .map_err(|_| anyhow!("invalid node ID \"{sk_id}\" in --safekeepers list"))?,
1000 : );
1001 0 : safekeepers.push(sk_id);
1002 : }
1003 0 : Ok(Some(safekeepers))
1004 : } else {
1005 0 : Ok(None)
1006 : }
1007 0 : }
1008 :
1009 0 : fn handle_mappings(sub_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Result<()> {
1010 0 : let (sub_name, sub_args) = match sub_match.subcommand() {
1011 0 : Some(ep_subcommand_data) => ep_subcommand_data,
1012 0 : None => bail!("no mappings subcommand provided"),
1013 : };
1014 :
1015 0 : match sub_name {
1016 0 : "map" => {
1017 0 : let branch_name = sub_args
1018 0 : .get_one::<String>("branch-name")
1019 0 : .expect("branch-name argument missing");
1020 0 :
1021 0 : let tenant_id = sub_args
1022 0 : .get_one::<String>("tenant-id")
1023 0 : .map(|x| TenantId::from_str(x))
1024 0 : .expect("tenant-id argument missing")
1025 0 : .expect("malformed tenant-id arg");
1026 0 :
1027 0 : let timeline_id = sub_args
1028 0 : .get_one::<String>("timeline-id")
1029 0 : .map(|x| TimelineId::from_str(x))
1030 0 : .expect("timeline-id argument missing")
1031 0 : .expect("malformed timeline-id arg");
1032 0 :
1033 0 : env.register_branch_mapping(branch_name.to_owned(), tenant_id, timeline_id)?;
1034 :
1035 0 : Ok(())
1036 : }
1037 0 : other => unimplemented!("mappings subcommand {other}"),
1038 : }
1039 0 : }
1040 :
1041 0 : fn get_pageserver(env: &local_env::LocalEnv, args: &ArgMatches) -> Result<PageServerNode> {
1042 0 : let node_id = if let Some(id_str) = args.get_one::<String>("pageserver-id") {
1043 0 : NodeId(id_str.parse().context("while parsing pageserver id")?)
1044 : } else {
1045 0 : DEFAULT_PAGESERVER_ID
1046 : };
1047 :
1048 : Ok(PageServerNode::from_env(
1049 0 : env,
1050 0 : env.get_pageserver_conf(node_id)?,
1051 : ))
1052 0 : }
1053 :
1054 0 : fn get_start_timeout(args: &ArgMatches) -> &Duration {
1055 0 : let humantime_duration = args
1056 0 : .get_one::<humantime::Duration>("start-timeout")
1057 0 : .expect("invalid value for start-timeout");
1058 0 : humantime_duration.as_ref()
1059 0 : }
1060 :
1061 0 : fn storage_controller_start_args(args: &ArgMatches) -> NeonStorageControllerStartArgs {
1062 0 : let maybe_instance_id = args.get_one::<u8>("instance-id");
1063 0 :
1064 0 : let base_port = args.get_one::<u16>("base-port");
1065 0 :
1066 0 : if maybe_instance_id.is_some() && base_port.is_none() {
1067 0 : panic!("storage-controller start specificied instance-id but did not provide base-port");
1068 0 : }
1069 0 :
1070 0 : let start_timeout = args
1071 0 : .get_one::<humantime::Duration>("start-timeout")
1072 0 : .expect("invalid value for start-timeout");
1073 0 :
1074 0 : NeonStorageControllerStartArgs {
1075 0 : instance_id: maybe_instance_id.copied().unwrap_or(1),
1076 0 : base_port: base_port.copied(),
1077 0 : start_timeout: *start_timeout,
1078 0 : }
1079 0 : }
1080 :
1081 0 : fn storage_controller_stop_args(args: &ArgMatches) -> NeonStorageControllerStopArgs {
1082 0 : let maybe_instance_id = args.get_one::<u8>("instance-id");
1083 0 : let immediate = args.get_one::<String>("stop-mode").map(|s| s.as_str()) == Some("immediate");
1084 0 :
1085 0 : NeonStorageControllerStopArgs {
1086 0 : instance_id: maybe_instance_id.copied().unwrap_or(1),
1087 0 : immediate,
1088 0 : }
1089 0 : }
1090 :
1091 0 : async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
1092 0 : match sub_match.subcommand() {
1093 0 : Some(("start", subcommand_args)) => {
1094 0 : if let Err(e) = get_pageserver(env, subcommand_args)?
1095 0 : .start(get_start_timeout(subcommand_args))
1096 0 : .await
1097 : {
1098 0 : eprintln!("pageserver start failed: {e}");
1099 0 : exit(1);
1100 0 : }
1101 : }
1102 :
1103 0 : Some(("stop", subcommand_args)) => {
1104 0 : let immediate = subcommand_args
1105 0 : .get_one::<String>("stop-mode")
1106 0 : .map(|s| s.as_str())
1107 0 : == Some("immediate");
1108 :
1109 0 : if let Err(e) = get_pageserver(env, subcommand_args)?.stop(immediate) {
1110 0 : eprintln!("pageserver stop failed: {}", e);
1111 0 : exit(1);
1112 0 : }
1113 : }
1114 :
1115 0 : Some(("restart", subcommand_args)) => {
1116 0 : let pageserver = get_pageserver(env, subcommand_args)?;
1117 : // TODO: what shutdown strategy should we use here?
1118 0 : if let Err(e) = pageserver.stop(false) {
1119 0 : eprintln!("pageserver stop failed: {}", e);
1120 0 : exit(1);
1121 0 : }
1122 :
1123 0 : if let Err(e) = pageserver.start(get_start_timeout(sub_match)).await {
1124 0 : eprintln!("pageserver start failed: {e}");
1125 0 : exit(1);
1126 0 : }
1127 : }
1128 :
1129 0 : Some(("status", subcommand_args)) => {
1130 0 : match get_pageserver(env, subcommand_args)?.check_status().await {
1131 0 : Ok(_) => println!("Page server is up and running"),
1132 0 : Err(err) => {
1133 0 : eprintln!("Page server is not available: {}", err);
1134 0 : exit(1);
1135 : }
1136 : }
1137 : }
1138 :
1139 0 : Some((sub_name, _)) => bail!("Unexpected pageserver subcommand '{}'", sub_name),
1140 0 : None => bail!("no pageserver subcommand provided"),
1141 : }
1142 0 : Ok(())
1143 0 : }
1144 :
1145 0 : async fn handle_storage_controller(
1146 0 : sub_match: &ArgMatches,
1147 0 : env: &local_env::LocalEnv,
1148 0 : ) -> Result<()> {
1149 0 : let svc = StorageController::from_env(env);
1150 0 : match sub_match.subcommand() {
1151 0 : Some(("start", start_match)) => {
1152 0 : if let Err(e) = svc.start(storage_controller_start_args(start_match)).await {
1153 0 : eprintln!("start failed: {e}");
1154 0 : exit(1);
1155 0 : }
1156 : }
1157 :
1158 0 : Some(("stop", stop_match)) => {
1159 0 : if let Err(e) = svc.stop(storage_controller_stop_args(stop_match)).await {
1160 0 : eprintln!("stop failed: {}", e);
1161 0 : exit(1);
1162 0 : }
1163 : }
1164 0 : Some((sub_name, _)) => bail!("Unexpected storage_controller subcommand '{}'", sub_name),
1165 0 : None => bail!("no storage_controller subcommand provided"),
1166 : }
1167 0 : Ok(())
1168 0 : }
1169 :
1170 0 : fn get_safekeeper(env: &local_env::LocalEnv, id: NodeId) -> Result<SafekeeperNode> {
1171 0 : if let Some(node) = env.safekeepers.iter().find(|node| node.id == id) {
1172 0 : Ok(SafekeeperNode::from_env(env, node))
1173 : } else {
1174 0 : bail!("could not find safekeeper {id}")
1175 : }
1176 0 : }
1177 :
1178 : // Get list of options to append to safekeeper command invocation.
1179 0 : fn safekeeper_extra_opts(init_match: &ArgMatches) -> Vec<String> {
1180 0 : init_match
1181 0 : .get_many::<String>("safekeeper-extra-opt")
1182 0 : .into_iter()
1183 0 : .flatten()
1184 0 : .map(|s| s.to_owned())
1185 0 : .collect()
1186 0 : }
1187 :
1188 0 : async fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
1189 0 : let (sub_name, sub_args) = match sub_match.subcommand() {
1190 0 : Some(safekeeper_command_data) => safekeeper_command_data,
1191 0 : None => bail!("no safekeeper subcommand provided"),
1192 : };
1193 :
1194 : // All the commands take an optional safekeeper id argument
1195 0 : let sk_id = if let Some(id_str) = sub_args.get_one::<String>("id") {
1196 0 : NodeId(id_str.parse().context("while parsing safekeeper id")?)
1197 : } else {
1198 0 : DEFAULT_SAFEKEEPER_ID
1199 : };
1200 0 : let safekeeper = get_safekeeper(env, sk_id)?;
1201 :
1202 0 : match sub_name {
1203 0 : "start" => {
1204 0 : let extra_opts = safekeeper_extra_opts(sub_args);
1205 :
1206 0 : if let Err(e) = safekeeper
1207 0 : .start(extra_opts, get_start_timeout(sub_args))
1208 0 : .await
1209 : {
1210 0 : eprintln!("safekeeper start failed: {}", e);
1211 0 : exit(1);
1212 0 : }
1213 : }
1214 :
1215 0 : "stop" => {
1216 0 : let immediate =
1217 0 : sub_args.get_one::<String>("stop-mode").map(|s| s.as_str()) == Some("immediate");
1218 :
1219 0 : if let Err(e) = safekeeper.stop(immediate) {
1220 0 : eprintln!("safekeeper stop failed: {}", e);
1221 0 : exit(1);
1222 0 : }
1223 : }
1224 :
1225 0 : "restart" => {
1226 0 : let immediate =
1227 0 : sub_args.get_one::<String>("stop-mode").map(|s| s.as_str()) == Some("immediate");
1228 :
1229 0 : if let Err(e) = safekeeper.stop(immediate) {
1230 0 : eprintln!("safekeeper stop failed: {}", e);
1231 0 : exit(1);
1232 0 : }
1233 0 :
1234 0 : let extra_opts = safekeeper_extra_opts(sub_args);
1235 0 : if let Err(e) = safekeeper
1236 0 : .start(extra_opts, get_start_timeout(sub_args))
1237 0 : .await
1238 : {
1239 0 : eprintln!("safekeeper start failed: {}", e);
1240 0 : exit(1);
1241 0 : }
1242 : }
1243 :
1244 : _ => {
1245 0 : bail!("Unexpected safekeeper subcommand '{}'", sub_name)
1246 : }
1247 : }
1248 0 : Ok(())
1249 0 : }
1250 :
1251 0 : async fn handle_storage_broker(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
1252 0 : let (sub_name, sub_args) = match sub_match.subcommand() {
1253 0 : Some(broker_command_data) => broker_command_data,
1254 0 : None => bail!("no broker subcommand provided"),
1255 : };
1256 :
1257 0 : match sub_name {
1258 0 : "start" => {
1259 0 : if let Err(e) = broker::start_broker_process(env, get_start_timeout(sub_args)).await {
1260 0 : eprintln!("broker start failed: {e}");
1261 0 : exit(1);
1262 0 : }
1263 : }
1264 :
1265 0 : "stop" => {
1266 0 : if let Err(e) = broker::stop_broker_process(env) {
1267 0 : eprintln!("broker stop failed: {e}");
1268 0 : exit(1);
1269 0 : }
1270 : }
1271 :
1272 0 : _ => bail!("Unexpected broker subcommand '{}'", sub_name),
1273 : }
1274 0 : Ok(())
1275 0 : }
1276 :
1277 0 : async fn handle_start_all(
1278 0 : env: &'static local_env::LocalEnv,
1279 0 : retry_timeout: &Duration,
1280 0 : ) -> anyhow::Result<()> {
1281 0 : let Err(errors) = handle_start_all_impl(env, *retry_timeout).await else {
1282 0 : neon_start_status_check(env, retry_timeout)
1283 0 : .await
1284 0 : .context("status check after successful startup of all services")?;
1285 0 : return Ok(());
1286 : };
1287 :
1288 0 : eprintln!("startup failed because one or more services could not be started");
1289 :
1290 0 : for e in errors {
1291 0 : eprintln!("{e}");
1292 0 : let debug_repr = format!("{e:?}");
1293 0 : for line in debug_repr.lines() {
1294 0 : eprintln!(" {line}");
1295 0 : }
1296 : }
1297 :
1298 0 : try_stop_all(env, true).await;
1299 :
1300 0 : exit(2);
1301 0 : }
1302 :
1303 : /// Returns Ok() if and only if all services could be started successfully.
1304 : /// Otherwise, returns the list of errors that occurred during startup.
1305 0 : async fn handle_start_all_impl(
1306 0 : env: &'static local_env::LocalEnv,
1307 0 : retry_timeout: Duration,
1308 0 : ) -> Result<(), Vec<anyhow::Error>> {
1309 0 : // Endpoints are not started automatically
1310 0 :
1311 0 : let mut js = JoinSet::new();
1312 :
1313 : // force infallibility through closure
1314 : #[allow(clippy::redundant_closure_call)]
1315 0 : (|| {
1316 0 : js.spawn(async move {
1317 0 : let retry_timeout = retry_timeout;
1318 0 : broker::start_broker_process(env, &retry_timeout).await
1319 0 : });
1320 0 :
1321 0 : // Only start the storage controller if the pageserver is configured to need it
1322 0 : if env.control_plane_api.is_some() {
1323 0 : js.spawn(async move {
1324 0 : let storage_controller = StorageController::from_env(env);
1325 0 : storage_controller
1326 0 : .start(NeonStorageControllerStartArgs::with_default_instance_id(
1327 0 : retry_timeout.into(),
1328 0 : ))
1329 0 : .await
1330 0 : .map_err(|e| e.context("start storage_controller"))
1331 0 : });
1332 0 : }
1333 :
1334 0 : for ps_conf in &env.pageservers {
1335 0 : js.spawn(async move {
1336 0 : let pageserver = PageServerNode::from_env(env, ps_conf);
1337 0 : pageserver
1338 0 : .start(&retry_timeout)
1339 0 : .await
1340 0 : .map_err(|e| e.context(format!("start pageserver {}", ps_conf.id)))
1341 0 : });
1342 0 : }
1343 :
1344 0 : for node in env.safekeepers.iter() {
1345 0 : js.spawn(async move {
1346 0 : let safekeeper = SafekeeperNode::from_env(env, node);
1347 0 : safekeeper
1348 0 : .start(vec![], &retry_timeout)
1349 0 : .await
1350 0 : .map_err(|e| e.context(format!("start safekeeper {}", safekeeper.id)))
1351 0 : });
1352 0 : }
1353 0 : })();
1354 0 :
1355 0 : let mut errors = Vec::new();
1356 0 : while let Some(result) = js.join_next().await {
1357 0 : let result = result.expect("we don't panic or cancel the tasks");
1358 0 : if let Err(e) = result {
1359 0 : errors.push(e);
1360 0 : }
1361 : }
1362 :
1363 0 : if !errors.is_empty() {
1364 0 : return Err(errors);
1365 0 : }
1366 0 :
1367 0 : Ok(())
1368 0 : }
1369 :
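/// Poll the storage controller until it reports every pageserver from the local config as
/// registered and `Active`, or until `retry_timeout` elapses (checked every 100ms).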
1370 0 : async fn neon_start_status_check(
1371 0 : env: &local_env::LocalEnv,
1372 0 : retry_timeout: &Duration,
1373 0 : ) -> anyhow::Result<()> {
1374 : const RETRY_INTERVAL: Duration = Duration::from_millis(100);
1375 : const NOTICE_AFTER_RETRIES: Duration = Duration::from_secs(5);
1376 :
1377 0 : if env.control_plane_api.is_none() {
1378 0 : return Ok(());
1379 0 : }
1380 0 :
1381 0 : let storcon = StorageController::from_env(env);
1382 0 :
1383 0 : let retries = retry_timeout.as_millis() / RETRY_INTERVAL.as_millis();
1384 0 : let notice_after_retries = retry_timeout.as_millis() / NOTICE_AFTER_RETRIES.as_millis();
1385 0 :
1386 0 : println!("\nRunning neon status check");
1387 :
1388 0 : for retry in 0..retries {
1389 0 : if retry == notice_after_retries {
1390 0 : println!("\nNeon status check has not passed yet, continuing to wait")
1391 0 : }
1392 :
1393 0 : let mut passed = true;
1394 0 : let mut nodes = storcon.node_list().await?;
1395 0 : let mut pageservers = env.pageservers.clone();
1396 0 :
1397 0 : if nodes.len() != pageservers.len() {
1398 0 : continue;
1399 0 : }
1400 0 :
1401 0 : nodes.sort_by_key(|ps| ps.id);
1402 0 : pageservers.sort_by_key(|ps| ps.id);
1403 :
1404 0 : for (idx, pageserver) in pageservers.iter().enumerate() {
1405 0 : let node = &nodes[idx];
1406 0 : if node.id != pageserver.id {
1407 0 : passed = false;
1408 0 : break;
1409 0 : }
1410 :
1411 0 : if !matches!(node.availability, NodeAvailabilityWrapper::Active) {
1412 0 : passed = false;
1413 0 : break;
1414 0 : }
1415 : }
1416 :
1417 0 : if passed {
1418 0 : println!("\nNeon started and passed status check");
1419 0 : return Ok(());
1420 0 : }
1421 0 :
1422 0 : tokio::time::sleep(RETRY_INTERVAL).await;
1423 : }
1424 :
1425 0 : anyhow::bail!("\nNeon failed to pass the status check within the start timeout")
1426 0 : }
1427 :
1428 0 : async fn handle_stop_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
1429 0 : let immediate =
1430 0 : sub_match.get_one::<String>("stop-mode").map(|s| s.as_str()) == Some("immediate");
1431 0 :
1432 0 : try_stop_all(env, immediate).await;
1433 :
1434 0 : Ok(())
1435 0 : }
1436 :
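/// Best-effort shutdown of everything `neon_local` may have started: compute endpoints,
/// pageservers, safekeepers, the storage broker, and any storage controller instances
/// discovered in the data directory. Failures are reported to stderr but not propagated.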
1437 0 : async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
1438 0 : // Stop all endpoints
1439 0 : match ComputeControlPlane::load(env.clone()) {
1440 0 : Ok(cplane) => {
1441 0 : for (_k, node) in cplane.endpoints {
1442 0 : if let Err(e) = node.stop(if immediate { "immediate" } else { "fast" }, false) {
1443 0 : eprintln!("postgres stop failed: {e:#}");
1444 0 : }
1445 : }
1446 : }
1447 0 : Err(e) => {
1448 0 : eprintln!("postgres stop failed, could not restore control plane data from env: {e:#}")
1449 : }
1450 : }
1451 :
1452 0 : for ps_conf in &env.pageservers {
1453 0 : let pageserver = PageServerNode::from_env(env, ps_conf);
1454 0 : if let Err(e) = pageserver.stop(immediate) {
1455 0 : eprintln!("pageserver {} stop failed: {:#}", ps_conf.id, e);
1456 0 : }
1457 : }
1458 :
1459 0 : for node in env.safekeepers.iter() {
1460 0 : let safekeeper = SafekeeperNode::from_env(env, node);
1461 0 : if let Err(e) = safekeeper.stop(immediate) {
1462 0 : eprintln!("safekeeper {} stop failed: {:#}", safekeeper.id, e);
1463 0 : }
1464 : }
1465 :
1466 0 : if let Err(e) = broker::stop_broker_process(env) {
1467 0 : eprintln!("neon broker stop failed: {e:#}");
1468 0 : }
1469 :
1470 : // Stop all storage controller instances. In the most common case there's only one,
1471 : // but iterate through the base data directory in order to discover the instances.
1472 0 : let storcon_instances = env
1473 0 : .storage_controller_instances()
1474 0 : .await
1475 0 : .expect("Must inspect data dir");
1476 0 : for (instance_id, _instance_dir_path) in storcon_instances {
1477 0 : let storage_controller = StorageController::from_env(env);
1478 0 : let stop_args = NeonStorageControllerStopArgs {
1479 0 : instance_id,
1480 0 : immediate,
1481 0 : };
1482 :
1483 0 : if let Err(e) = storage_controller.stop(stop_args).await {
1484 0 : eprintln!("Storage controller instance {instance_id} stop failed: {e:#}");
1485 0 : }
1486 : }
1487 0 : }
1488 :
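/// Build the `clap` command tree for the `neon_local` CLI (subcommands, shared arguments,
/// and their defaults).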
1489 1 : fn cli() -> Command {
1490 1 : let timeout_arg = Arg::new("start-timeout")
1491 1 : .long("start-timeout")
1492 1 : .short('t')
1493 1 : .global(true)
1494 1 : .help("timeout until we fail the command, e.g. 30s")
1495 1 : .value_parser(value_parser!(humantime::Duration))
1496 1 : .default_value("10s")
1497 1 : .required(false);
1498 1 :
1499 1 : let branch_name_arg = Arg::new("branch-name")
1500 1 : .long("branch-name")
1501 1 : .help("Name of the branch to be created or used as an alias for other services")
1502 1 : .required(false);
1503 1 :
1504 1 : let endpoint_id_arg = Arg::new("endpoint_id")
1505 1 : .help("Postgres endpoint id")
1506 1 : .required(false);
1507 1 :
1508 1 : let safekeeper_id_arg = Arg::new("id").help("safekeeper id").required(false);
1509 1 :
1510 1 : // --id, when using a pageserver command
1511 1 : let pageserver_id_arg = Arg::new("pageserver-id")
1512 1 : .long("id")
1513 1 : .global(true)
1514 1 : .help("pageserver id")
1515 1 : .required(false);
1516 1 : // --pageserver-id when using a non-pageserver command
1517 1 : let endpoint_pageserver_id_arg = Arg::new("endpoint-pageserver-id")
1518 1 : .long("pageserver-id")
1519 1 : .required(false);
1520 1 :
1521 1 : let safekeeper_extra_opt_arg = Arg::new("safekeeper-extra-opt")
1522 1 : .short('e')
1523 1 : .long("safekeeper-extra-opt")
1524 1 : .num_args(1)
1525 1 : .action(ArgAction::Append)
1526 1 : .help("Additional safekeeper invocation options, e.g. -e=--http-auth-public-key-path=foo")
1527 1 : .required(false);
1528 1 :
1529 1 : let tenant_id_arg = Arg::new("tenant-id")
1530 1 : .long("tenant-id")
1531 1 : .help("Tenant id. Represented as a hexadecimal string 32 symbols length")
1532 1 : .required(false);
1533 1 :
1534 1 : let timeline_id_arg = Arg::new("timeline-id")
1535 1 : .long("timeline-id")
1536 1 : .help("Timeline id. Represented as a hexadecimal string 32 symbols length")
1537 1 : .required(false);
1538 1 :
1539 1 : let pg_version_arg = Arg::new("pg-version")
1540 1 : .long("pg-version")
1541 1 : .help("Postgres version to use for the initial tenant")
1542 1 : .required(false)
1543 1 : .value_parser(value_parser!(u32))
1544 1 : .default_value(DEFAULT_PG_VERSION);
1545 1 :
1546 1 : let pg_port_arg = Arg::new("pg-port")
1547 1 : .long("pg-port")
1548 1 : .required(false)
1549 1 : .value_parser(value_parser!(u16))
1550 1 : .value_name("pg-port");
1551 1 :
1552 1 : let http_port_arg = Arg::new("http-port")
1553 1 : .long("http-port")
1554 1 : .required(false)
1555 1 : .value_parser(value_parser!(u16))
1556 1 : .value_name("http-port");
1557 1 :
1558 1 : let safekeepers_arg = Arg::new("safekeepers")
1559 1 : .long("safekeepers")
1560 1 : .required(false)
1561 1 : .value_name("safekeepers");
1562 1 :
1563 1 : let stop_mode_arg = Arg::new("stop-mode")
1564 1 : .short('m')
1565 1 : .value_parser(["fast", "immediate"])
1566 1 : .default_value("fast")
1567 1 : .help("If 'immediate', don't flush repository data at shutdown")
1568 1 : .required(false)
1569 1 : .value_name("stop-mode");
1570 1 :
1571 1 : let remote_ext_config_args = Arg::new("remote-ext-config")
1572 1 : .long("remote-ext-config")
1573 1 : .num_args(1)
1574 1 : .help("Configure the remote extensions storage proxy gateway from which to request extensions.")
1575 1 : .required(false);
1576 1 :
1577 1 : let lsn_arg = Arg::new("lsn")
1578 1 : .long("lsn")
1579 1 : .help("Specify the Lsn on the timeline to start from. By default, the end of the timeline is used.")
1580 1 : .required(false);
1581 1 :
1582 1 : let hot_standby_arg = Arg::new("hot-standby")
1583 1 : .value_parser(value_parser!(bool))
1584 1 : .long("hot-standby")
1585 1 : .help("If set, the node will be a hot replica on the specified timeline")
1586 1 : .required(false);
1587 1 :
1588 1 : let force_arg = Arg::new("force")
1589 1 : .value_parser(value_parser!(InitForceMode))
1590 1 : .long("force")
1591 1 : .default_value(
1592 1 : InitForceMode::MustNotExist
1593 1 : .to_possible_value()
1594 1 : .unwrap()
1595 1 : .get_name()
1596 1 : .to_owned(),
1597 1 : )
1598 1 : .help("Force initialization even if the repository is not empty")
1599 1 : .required(false);
1600 1 :
1601 1 : let num_pageservers_arg = Arg::new("num-pageservers")
1602 1 : .value_parser(value_parser!(u16))
1603 1 : .long("num-pageservers")
1604 1 : .help("How many pageservers to create (default 1)");
1605 1 :
1606 1 : let update_catalog = Arg::new("update-catalog")
1607 1 : .value_parser(value_parser!(bool))
1608 1 : .long("update-catalog")
1609 1 : .help("If set, will set up the catalog for neon_superuser")
1610 1 : .required(false);
1611 1 :
1612 1 : let create_test_user = Arg::new("create-test-user")
1613 1 : .value_parser(value_parser!(bool))
1614 1 : .long("create-test-user")
1615 1 : .help("If set, will create test user `user` and `neondb` database. Requires `update-catalog = true`")
1616 1 : .required(false);
1617 1 :
1618 1 : let allow_multiple = Arg::new("allow-multiple")
1619 1 : .help("Allow multiple primary endpoints running on the same branch. Shouldn't be used normally, but useful for tests.")
1620 1 : .long("allow-multiple")
1621 1 : .action(ArgAction::SetTrue)
1622 1 : .required(false);
1623 1 :
1624 1 : let instance_id = Arg::new("instance-id")
1625 1 : .long("instance-id")
1626 1 : .help("Identifier used to distinguish storage controller instances (default 1)")
1627 1 : .value_parser(value_parser!(u8))
1628 1 : .required(false);
1629 1 :
1630 1 : let base_port = Arg::new("base-port")
1631 1 : .long("base-port")
1632 1 : .help("Base port for the storage controller instance identified by instance-id (defaults to the pageserver control plane API port)")
1633 1 : .value_parser(value_parser!(u16))
1634 1 : .required(false);
1635 1 :
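     : // Assemble the full command tree from the shared argument definitions above.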
1636 1 : Command::new("Neon CLI")
1637 1 : .arg_required_else_help(true)
1638 1 : .version(GIT_VERSION)
1639 1 : .subcommand(
1640 1 : Command::new("init")
1641 1 : .about("Initialize a new Neon repository, preparing configs for services to start with")
1642 1 : .arg(num_pageservers_arg.clone())
1643 1 : .arg(
1644 1 : Arg::new("config")
1645 1 : .long("config")
1646 1 : .required(false)
1647 1 : .value_parser(value_parser!(PathBuf))
1648 1 : .value_name("config")
1649 1 : )
1650 1 : .arg(force_arg)
1651 1 : )
1652 1 : .subcommand(
1653 1 : Command::new("timeline")
1654 1 : .about("Manage timelines")
1655 1 : .arg_required_else_help(true)
1656 1 : .subcommand(Command::new("list")
1657 1 : .about("List all timelines available to this pageserver")
1658 1 : .arg(tenant_id_arg.clone()))
1659 1 : .subcommand(Command::new("branch")
1660 1 : .about("Create a new timeline, using another timeline as a base, copying its data")
1661 1 : .arg(tenant_id_arg.clone())
1662 1 : .arg(timeline_id_arg.clone())
1663 1 : .arg(branch_name_arg.clone())
1664 1 : .arg(Arg::new("ancestor-branch-name").long("ancestor-branch-name")
1665 1 : .help("Use the last Lsn of another timeline (and its data) as the base when creating the new timeline. The timeline is resolved by its branch name.").required(false))
1666 1 : .arg(Arg::new("ancestor-start-lsn").long("ancestor-start-lsn")
1667 1 : .help("When using another timeline as base, use a specific Lsn in it instead of the latest one").required(false)))
1668 1 : .subcommand(Command::new("create")
1669 1 : .about("Create a new blank timeline")
1670 1 : .arg(tenant_id_arg.clone())
1671 1 : .arg(timeline_id_arg.clone())
1672 1 : .arg(branch_name_arg.clone())
1673 1 : .arg(pg_version_arg.clone())
1674 1 : )
1675 1 : .subcommand(Command::new("import")
1676 1 : .about("Import timeline from basebackup directory")
1677 1 : .arg(tenant_id_arg.clone())
1678 1 : .arg(timeline_id_arg.clone())
1679 1 : .arg(branch_name_arg.clone())
1680 1 : .arg(Arg::new("base-tarfile")
1681 1 : .long("base-tarfile")
1682 1 : .value_parser(value_parser!(PathBuf))
1683 1 : .help("Basebackup tarfile to import")
1684 1 : )
1685 1 : .arg(Arg::new("base-lsn").long("base-lsn")
1686 1 : .help("Lsn the basebackup starts at"))
1687 1 : .arg(Arg::new("wal-tarfile")
1688 1 : .long("wal-tarfile")
1689 1 : .value_parser(value_parser!(PathBuf))
1690 1 : .help("WAL to add on top of the basebackup")
1691 1 : )
1692 1 : .arg(Arg::new("end-lsn").long("end-lsn")
1693 1 : .help("Lsn the basebackup ends at"))
1694 1 : .arg(pg_version_arg.clone())
1695 1 : )
1696 1 : ).subcommand(
1697 1 : Command::new("tenant")
1698 1 : .arg_required_else_help(true)
1699 1 : .about("Manage tenants")
1700 1 : .subcommand(Command::new("list"))
1701 1 : .subcommand(Command::new("create")
1702 1 : .arg(tenant_id_arg.clone())
1703 1 : .arg(timeline_id_arg.clone().help("Use a specific timeline id when creating a tenant and its initial timeline"))
1704 1 : .arg(Arg::new("config").short('c').num_args(1).action(ArgAction::Append).required(false))
1705 1 : .arg(pg_version_arg.clone())
1706 1 : .arg(Arg::new("set-default").long("set-default").action(ArgAction::SetTrue).required(false)
1707 1 : .help("Use this tenant in future CLI commands where tenant_id is needed, but not specified"))
1708 1 : .arg(Arg::new("shard-count").value_parser(value_parser!(u8)).long("shard-count").action(ArgAction::Set).help("Number of shards in the new tenant (default 1)"))
1709 1 : .arg(Arg::new("shard-stripe-size").value_parser(value_parser!(u32)).long("shard-stripe-size").action(ArgAction::Set).help("Sharding stripe size in pages"))
1710 1 : .arg(Arg::new("placement-policy").value_parser(value_parser!(String)).long("placement-policy").action(ArgAction::Set).help("Placement policy for the shards in this tenant"))
1711 1 : )
1712 1 : .subcommand(Command::new("set-default").arg(tenant_id_arg.clone().required(true))
1713 1 : .about("Set a particular tenant as default in future CLI commands where tenant_id is needed, but not specified"))
1714 1 : .subcommand(Command::new("config")
1715 1 : .arg(tenant_id_arg.clone())
1716 1 : .arg(Arg::new("config").short('c').num_args(1).action(ArgAction::Append).required(false)))
1717 1 : .subcommand(Command::new("import").arg(tenant_id_arg.clone().required(true))
1718 1 : .about("Import a tenant that is present in remote storage, and create branches for its timelines"))
1719 1 : )
1720 1 : .subcommand(
1721 1 : Command::new("pageserver")
1722 1 : .arg_required_else_help(true)
1723 1 : .about("Manage pageserver")
1724 1 : .arg(pageserver_id_arg)
1725 1 : .subcommand(Command::new("status"))
1726 1 : .subcommand(Command::new("start")
1727 1 : .about("Start local pageserver")
1728 1 : .arg(timeout_arg.clone())
1729 1 : )
1730 1 : .subcommand(Command::new("stop")
1731 1 : .about("Stop local pageserver")
1732 1 : .arg(stop_mode_arg.clone())
1733 1 : )
1734 1 : .subcommand(Command::new("restart")
1735 1 : .about("Restart local pageserver")
1736 1 : .arg(timeout_arg.clone())
1737 1 : )
1738 1 : )
1739 1 : .subcommand(
1740 1 : Command::new("storage_controller")
1741 1 : .arg_required_else_help(true)
1742 1 : .about("Manage storage_controller")
1743 1 : .subcommand(Command::new("start").about("Start storage controller")
1744 1 : .arg(timeout_arg.clone())
1745 1 : .arg(instance_id.clone())
1746 1 : .arg(base_port))
1747 1 : .subcommand(Command::new("stop").about("Stop storage controller")
1748 1 : .arg(stop_mode_arg.clone())
1749 1 : .arg(instance_id))
1750 1 : )
1751 1 : .subcommand(
1752 1 : Command::new("storage_broker")
1753 1 : .arg_required_else_help(true)
1754 1 : .about("Manage broker")
1755 1 : .subcommand(Command::new("start")
1756 1 : .about("Start broker")
1757 1 : .arg(timeout_arg.clone())
1758 1 : )
1759 1 : .subcommand(Command::new("stop")
1760 1 : .about("Stop broker")
1761 1 : .arg(stop_mode_arg.clone())
1762 1 : )
1763 1 : )
1764 1 : .subcommand(
1765 1 : Command::new("safekeeper")
1766 1 : .arg_required_else_help(true)
1767 1 : .about("Manage safekeepers")
1768 1 : .subcommand(Command::new("start")
1769 1 : .about("Start local safekeeper")
1770 1 : .arg(safekeeper_id_arg.clone())
1771 1 : .arg(safekeeper_extra_opt_arg.clone())
1772 1 : .arg(timeout_arg.clone())
1773 1 : )
1774 1 : .subcommand(Command::new("stop")
1775 1 : .about("Stop local safekeeper")
1776 1 : .arg(safekeeper_id_arg.clone())
1777 1 : .arg(stop_mode_arg.clone())
1778 1 : )
1779 1 : .subcommand(Command::new("restart")
1780 1 : .about("Restart local safekeeper")
1781 1 : .arg(safekeeper_id_arg)
1782 1 : .arg(stop_mode_arg.clone())
1783 1 : .arg(safekeeper_extra_opt_arg)
1784 1 : .arg(timeout_arg.clone())
1785 1 : )
1786 1 : )
1787 1 : .subcommand(
1788 1 : Command::new("endpoint")
1789 1 : .arg_required_else_help(true)
1790 1 : .about("Manage postgres instances")
1791 1 : .subcommand(Command::new("list").arg(tenant_id_arg.clone()))
1792 1 : .subcommand(Command::new("create")
1793 1 : .about("Create a compute endpoint")
1794 1 : .arg(endpoint_id_arg.clone())
1795 1 : .arg(branch_name_arg.clone())
1796 1 : .arg(tenant_id_arg.clone())
1797 1 : .arg(lsn_arg.clone())
1798 1 : .arg(pg_port_arg.clone())
1799 1 : .arg(http_port_arg.clone())
1800 1 : .arg(endpoint_pageserver_id_arg.clone())
1801 1 : .arg(
1802 1 : Arg::new("config-only")
1803 1 : .help("Don't do a basebackup; create the endpoint directory with only config files")
1804 1 : .long("config-only")
1805 1 : .required(false))
1806 1 : .arg(pg_version_arg.clone())
1807 1 : .arg(hot_standby_arg.clone())
1808 1 : .arg(update_catalog)
1809 1 : .arg(allow_multiple.clone())
1810 1 : )
1811 1 : .subcommand(Command::new("start")
1812 1 : .about("Start postgres.\nIf the endpoint doesn't exist yet, it is created.")
1813 1 : .arg(endpoint_id_arg.clone())
1814 1 : .arg(endpoint_pageserver_id_arg.clone())
1815 1 : .arg(safekeepers_arg.clone())
1816 1 : .arg(remote_ext_config_args)
1817 1 : .arg(create_test_user)
1818 1 : .arg(allow_multiple.clone())
1819 1 : .arg(timeout_arg.clone())
1820 1 : )
1821 1 : .subcommand(Command::new("reconfigure")
1822 1 : .about("Reconfigure the endpoint")
1823 1 : .arg(endpoint_pageserver_id_arg)
1824 1 : .arg(safekeepers_arg)
1825 1 : .arg(endpoint_id_arg.clone())
1826 1 : .arg(tenant_id_arg.clone())
1827 1 : )
1828 1 : .subcommand(
1829 1 : Command::new("stop")
1830 1 : .arg(endpoint_id_arg)
1831 1 : .arg(
1832 1 : Arg::new("destroy")
1833 1 : .help("Also delete the data directory (currently optional; may become the default in the future)")
1834 1 : .long("destroy")
1835 1 : .action(ArgAction::SetTrue)
1836 1 : .required(false)
1837 1 : )
1838 1 : .arg(
1839 1 : Arg::new("mode")
1840 1 : .help("Postgres shutdown mode, passed to \"pg_ctl -m <mode>\"")
1841 1 : .long("mode")
1842 1 : .action(ArgAction::Set)
1843 1 : .required(false)
1844 1 : .value_parser(["smart", "fast", "immediate"])
1845 1 : .default_value("fast")
1846 1 : )
1847 1 : )
1848 1 :
1849 1 : )
1850 1 : .subcommand(
1851 1 : Command::new("mappings")
1852 1 : .arg_required_else_help(true)
1853 1 : .about("Manage neon_local branch name mappings")
1854 1 : .subcommand(
1855 1 : Command::new("map")
1856 1 : .about("Create a new mapping; the mapping must not already exist")
1857 1 : .arg(branch_name_arg.clone())
1858 1 : .arg(tenant_id_arg.clone())
1859 1 : .arg(timeline_id_arg.clone())
1860 1 : )
1861 1 : )
1862 1 : // Obsolete old name for 'endpoint'. We now just print an error if it's used.
1863 1 : .subcommand(
1864 1 : Command::new("pg")
1865 1 : .hide(true)
1866 1 : .arg(Arg::new("ignore-rest").allow_hyphen_values(true).num_args(0..).required(false))
1867 1 : .trailing_var_arg(true)
1868 1 : )
1869 1 : .subcommand(
1870 1 : Command::new("start")
1871 1 : .about("Start pageserver and safekeepers")
1872 1 : .arg(timeout_arg.clone())
1873 1 : )
1874 1 : .subcommand(
1875 1 : Command::new("stop")
1876 1 : .about("Stop pageserver and safekeepers")
1877 1 : .arg(stop_mode_arg)
1878 1 : )
1879 1 : }
1880 :
1881 : #[test]
1882 1 : fn verify_cli() {
1883 1 : cli().debug_assert();
1884 1 : }
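     :
     : // A minimal, test-only sketch showing how one subcommand can be exercised
     : // against the parser built by `cli()` via clap's `try_get_matches_from`.
     : // The command line used here (`tenant create --shard-count 4`) is an
     : // illustrative assumption, not a required invocation.
     : #[test]
     : fn example_parse_tenant_create() {
     :     let matches = cli()
     :         .try_get_matches_from(["neon_local", "tenant", "create", "--shard-count", "4"])
     :         .expect("tenant create arguments should parse");
     :
     :     // `subcommand()` yields the invoked subcommand name and its matches.
     :     let (name, tenant) = matches.subcommand().expect("expected a subcommand");
     :     assert_eq!(name, "tenant");
     :     let (name, create) = tenant.subcommand().expect("expected a nested subcommand");
     :     assert_eq!(name, "create");
     :
     :     // `shard-count` is declared with `value_parser!(u8)`, so it is stored as a u8.
     :     assert_eq!(create.get_one::<u8>("shard-count"), Some(&4));
     : }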