LCOV - code coverage report
Current view: top level - control_plane/src/bin - neon_local.rs (source / functions) Coverage Total Hit
Test: f8d8f5b90fa487a9e82c42da223f012f5d4fece7.info Lines: 26.4 % 1492 394
Test Date: 2024-09-19 20:36:02 Functions: 1.8 % 110 2

            Line data    Source code
       1              : //!
       2              : //! `neon_local` is an executable that can be used to create a local
       3              : //! Neon environment, for testing purposes. The local environment is
       4              : //! quite different from the cloud environment with Kubernetes, but it
       5              : //! easier to work with locally. The python tests in `test_runner`
       6              : //! rely on `neon_local` to set up the environment for each test.
       7              : //!
       8              : use anyhow::{anyhow, bail, Context, Result};
       9              : use clap::{value_parser, Arg, ArgAction, ArgMatches, Command, ValueEnum};
      10              : use compute_api::spec::ComputeMode;
      11              : use control_plane::endpoint::ComputeControlPlane;
      12              : use control_plane::local_env::{
      13              :     InitForceMode, LocalEnv, NeonBroker, NeonLocalInitConf, NeonLocalInitPageserverConf,
      14              :     SafekeeperConf,
      15              : };
      16              : use control_plane::pageserver::PageServerNode;
      17              : use control_plane::safekeeper::SafekeeperNode;
      18              : use control_plane::storage_controller::{
      19              :     NeonStorageControllerStartArgs, NeonStorageControllerStopArgs, StorageController,
      20              : };
      21              : use control_plane::{broker, local_env};
      22              : use pageserver_api::config::{
      23              :     DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
      24              :     DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT,
      25              : };
      26              : use pageserver_api::controller_api::{
      27              :     NodeAvailabilityWrapper, PlacementPolicy, TenantCreateRequest,
      28              : };
      29              : use pageserver_api::models::{ShardParameters, TimelineCreateRequest, TimelineInfo};
      30              : use pageserver_api::shard::{ShardCount, ShardStripeSize, TenantShardId};
      31              : use postgres_backend::AuthType;
      32              : use postgres_connection::parse_host_port;
      33              : use safekeeper_api::{
      34              :     DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT,
      35              :     DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT,
      36              : };
      37              : use std::borrow::Cow;
      38              : use std::collections::{BTreeSet, HashMap};
      39              : use std::path::PathBuf;
      40              : use std::process::exit;
      41              : use std::str::FromStr;
      42              : use std::time::Duration;
      43              : use storage_broker::DEFAULT_LISTEN_ADDR as DEFAULT_BROKER_ADDR;
      44              : use tokio::task::JoinSet;
      45              : use url::Host;
      46              : use utils::{
      47              :     auth::{Claims, Scope},
      48              :     id::{NodeId, TenantId, TenantTimelineId, TimelineId},
      49              :     lsn::Lsn,
      50              :     project_git_version,
      51              : };
      52              : 
      53              : // Default id of a safekeeper node, if not specified on the command line.
      54              : const DEFAULT_SAFEKEEPER_ID: NodeId = NodeId(1);
      55              : const DEFAULT_PAGESERVER_ID: NodeId = NodeId(1);
      56              : const DEFAULT_BRANCH_NAME: &str = "main";
      57              : project_git_version!(GIT_VERSION);
      58              : 
      59              : const DEFAULT_PG_VERSION: &str = "16";
      60              : 
      61              : const DEFAULT_PAGESERVER_CONTROL_PLANE_API: &str = "http://127.0.0.1:1234/upcall/v1/";
      62              : 
      63              : ///
      64              : /// Timelines tree element used as a value in the HashMap.
      65              : ///
      66              : struct TimelineTreeEl {
      67              :     /// `TimelineInfo` received from the `pageserver` via the `timeline_list` http API call.
      68              :     pub info: TimelineInfo,
      69              :     /// Name, recovered from neon config mappings
      70              :     pub name: Option<String>,
      71              :     /// Holds all direct children of this timeline referenced using `timeline_id`.
      72              :     pub children: BTreeSet<TimelineId>,
      73              : }
      74              : 
      75              : // Main entry point for the 'neon_local' CLI utility
      76              : //
      77              : // This utility helps to manage neon installation. That includes following:
      78              : //   * Management of local postgres installations running on top of the
      79              : //     pageserver.
      80              : //   * Providing CLI api to the pageserver
      81              : //   * TODO: export/import to/from usual postgres
      82            0 : fn main() -> Result<()> {
      83            0 :     let matches = cli().get_matches();
      84              : 
      85            0 :     let (sub_name, sub_args) = match matches.subcommand() {
      86            0 :         Some(subcommand_data) => subcommand_data,
      87            0 :         None => bail!("no subcommand provided"),
      88              :     };
      89              : 
      90              :     // Check for 'neon init' command first.
      91            0 :     let subcommand_result = if sub_name == "init" {
      92            0 :         handle_init(sub_args).map(|env| Some(Cow::Owned(env)))
      93              :     } else {
      94              :         // all other commands need an existing config
      95              : 
      96            0 :         let env = LocalEnv::load_config(&local_env::base_path()).context("Error loading config")?;
      97            0 :         let original_env = env.clone();
      98            0 :         let env = Box::leak(Box::new(env));
      99            0 :         let rt = tokio::runtime::Builder::new_current_thread()
     100            0 :             .enable_all()
     101            0 :             .build()
     102            0 :             .unwrap();
     103              : 
     104            0 :         let subcommand_result = match sub_name {
     105            0 :             "tenant" => rt.block_on(handle_tenant(sub_args, env)),
     106            0 :             "timeline" => rt.block_on(handle_timeline(sub_args, env)),
     107            0 :             "start" => rt.block_on(handle_start_all(env, get_start_timeout(sub_args))),
     108            0 :             "stop" => rt.block_on(handle_stop_all(sub_args, env)),
     109            0 :             "pageserver" => rt.block_on(handle_pageserver(sub_args, env)),
     110            0 :             "storage_controller" => rt.block_on(handle_storage_controller(sub_args, env)),
     111            0 :             "storage_broker" => rt.block_on(handle_storage_broker(sub_args, env)),
     112            0 :             "safekeeper" => rt.block_on(handle_safekeeper(sub_args, env)),
     113            0 :             "endpoint" => rt.block_on(handle_endpoint(sub_args, env)),
     114            0 :             "mappings" => handle_mappings(sub_args, env),
     115            0 :             "pg" => bail!("'pg' subcommand has been renamed to 'endpoint'"),
     116            0 :             _ => bail!("unexpected subcommand {sub_name}"),
     117              :         };
     118              : 
     119            0 :         if &original_env != env {
     120            0 :             subcommand_result.map(|()| Some(Cow::Borrowed(env)))
     121              :         } else {
     122            0 :             subcommand_result.map(|()| None)
     123              :         }
     124              :     };
     125              : 
     126            0 :     match subcommand_result {
     127            0 :         Ok(Some(updated_env)) => updated_env.persist_config()?,
     128            0 :         Ok(None) => (),
     129            0 :         Err(e) => {
     130            0 :             eprintln!("command failed: {e:?}");
     131            0 :             exit(1);
     132              :         }
     133              :     }
     134            0 :     Ok(())
     135            0 : }
     136              : 
     137              : ///
     138              : /// Prints timelines list as a tree-like structure.
     139              : ///
     140            0 : fn print_timelines_tree(
     141            0 :     timelines: Vec<TimelineInfo>,
     142            0 :     mut timeline_name_mappings: HashMap<TenantTimelineId, String>,
     143            0 : ) -> Result<()> {
     144            0 :     let mut timelines_hash = timelines
     145            0 :         .iter()
     146            0 :         .map(|t| {
     147            0 :             (
     148            0 :                 t.timeline_id,
     149            0 :                 TimelineTreeEl {
     150            0 :                     info: t.clone(),
     151            0 :                     children: BTreeSet::new(),
     152            0 :                     name: timeline_name_mappings
     153            0 :                         .remove(&TenantTimelineId::new(t.tenant_id.tenant_id, t.timeline_id)),
     154            0 :                 },
     155            0 :             )
     156            0 :         })
     157            0 :         .collect::<HashMap<_, _>>();
     158              : 
     159              :     // Memorize all direct children of each timeline.
     160            0 :     for timeline in timelines.iter() {
     161            0 :         if let Some(ancestor_timeline_id) = timeline.ancestor_timeline_id {
     162            0 :             timelines_hash
     163            0 :                 .get_mut(&ancestor_timeline_id)
     164            0 :                 .context("missing timeline info in the HashMap")?
     165              :                 .children
     166            0 :                 .insert(timeline.timeline_id);
     167            0 :         }
     168              :     }
     169              : 
     170            0 :     for timeline in timelines_hash.values() {
     171              :         // Start with root local timelines (no ancestors) first.
     172            0 :         if timeline.info.ancestor_timeline_id.is_none() {
     173            0 :             print_timeline(0, &Vec::from([true]), timeline, &timelines_hash)?;
     174            0 :         }
     175              :     }
     176              : 
     177            0 :     Ok(())
     178            0 : }
     179              : 
     180              : ///
     181              : /// Recursively prints timeline info with all its children.
     182              : ///
     183            0 : fn print_timeline(
     184            0 :     nesting_level: usize,
     185            0 :     is_last: &[bool],
     186            0 :     timeline: &TimelineTreeEl,
     187            0 :     timelines: &HashMap<TimelineId, TimelineTreeEl>,
     188            0 : ) -> Result<()> {
     189            0 :     if nesting_level > 0 {
     190            0 :         let ancestor_lsn = match timeline.info.ancestor_lsn {
     191            0 :             Some(lsn) => lsn.to_string(),
     192            0 :             None => "Unknown Lsn".to_string(),
     193              :         };
     194              : 
     195            0 :         let mut br_sym = "┣━";
     196            0 : 
     197            0 :         // Draw each nesting padding with proper style
     198            0 :         // depending on whether its timeline ended or not.
     199            0 :         if nesting_level > 1 {
     200            0 :             for l in &is_last[1..is_last.len() - 1] {
     201            0 :                 if *l {
     202            0 :                     print!("   ");
     203            0 :                 } else {
     204            0 :                     print!("┃  ");
     205            0 :                 }
     206              :             }
     207            0 :         }
     208              : 
     209              :         // We are the last in this sub-timeline
     210            0 :         if *is_last.last().unwrap() {
     211            0 :             br_sym = "┗━";
     212            0 :         }
     213              : 
     214            0 :         print!("{} @{}: ", br_sym, ancestor_lsn);
     215            0 :     }
     216              : 
     217              :     // Finally print a timeline id and name with new line
     218            0 :     println!(
     219            0 :         "{} [{}]",
     220            0 :         timeline.name.as_deref().unwrap_or("_no_name_"),
     221            0 :         timeline.info.timeline_id
     222            0 :     );
     223            0 : 
     224            0 :     let len = timeline.children.len();
     225            0 :     let mut i: usize = 0;
     226            0 :     let mut is_last_new = Vec::from(is_last);
     227            0 :     is_last_new.push(false);
     228              : 
     229            0 :     for child in &timeline.children {
     230            0 :         i += 1;
     231            0 : 
     232            0 :         // Mark that the last padding is the end of the timeline
     233            0 :         if i == len {
     234            0 :             if let Some(last) = is_last_new.last_mut() {
     235            0 :                 *last = true;
     236            0 :             }
     237            0 :         }
     238              : 
     239              :         print_timeline(
     240            0 :             nesting_level + 1,
     241            0 :             &is_last_new,
     242            0 :             timelines
     243            0 :                 .get(child)
     244            0 :                 .context("missing timeline info in the HashMap")?,
     245            0 :             timelines,
     246            0 :         )?;
     247              :     }
     248              : 
     249            0 :     Ok(())
     250            0 : }
     251              : 
     252              : /// Returns a map of timeline IDs to timeline_id@lsn strings.
     253              : /// Connects to the pageserver to query this information.
     254            0 : async fn get_timeline_infos(
     255            0 :     env: &local_env::LocalEnv,
     256            0 :     tenant_shard_id: &TenantShardId,
     257            0 : ) -> Result<HashMap<TimelineId, TimelineInfo>> {
     258            0 :     Ok(get_default_pageserver(env)
     259            0 :         .timeline_list(tenant_shard_id)
     260            0 :         .await?
     261            0 :         .into_iter()
     262            0 :         .map(|timeline_info| (timeline_info.timeline_id, timeline_info))
     263            0 :         .collect())
     264            0 : }
     265              : 
     266              : // Helper function to parse --tenant_id option, or get the default from config file
     267            0 : fn get_tenant_id(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow::Result<TenantId> {
     268            0 :     if let Some(tenant_id_from_arguments) = parse_tenant_id(sub_match).transpose() {
     269            0 :         tenant_id_from_arguments
     270            0 :     } else if let Some(default_id) = env.default_tenant_id {
     271            0 :         Ok(default_id)
     272              :     } else {
     273            0 :         anyhow::bail!("No tenant id. Use --tenant-id, or set a default tenant");
     274              :     }
     275            0 : }
     276              : 
     277              : // Helper function to parse --tenant_id option, for commands that accept a shard suffix
     278            0 : fn get_tenant_shard_id(
     279            0 :     sub_match: &ArgMatches,
     280            0 :     env: &local_env::LocalEnv,
     281            0 : ) -> anyhow::Result<TenantShardId> {
     282            0 :     if let Some(tenant_id_from_arguments) = parse_tenant_shard_id(sub_match).transpose() {
     283            0 :         tenant_id_from_arguments
     284            0 :     } else if let Some(default_id) = env.default_tenant_id {
     285            0 :         Ok(TenantShardId::unsharded(default_id))
     286              :     } else {
     287            0 :         anyhow::bail!("No tenant shard id. Use --tenant-id, or set a default tenant");
     288              :     }
     289            0 : }
     290              : 
     291            0 : fn parse_tenant_id(sub_match: &ArgMatches) -> anyhow::Result<Option<TenantId>> {
     292            0 :     sub_match
     293            0 :         .get_one::<String>("tenant-id")
     294            0 :         .map(|tenant_id| TenantId::from_str(tenant_id))
     295            0 :         .transpose()
     296            0 :         .context("Failed to parse tenant id from the argument string")
     297            0 : }
     298              : 
     299            0 : fn parse_tenant_shard_id(sub_match: &ArgMatches) -> anyhow::Result<Option<TenantShardId>> {
     300            0 :     sub_match
     301            0 :         .get_one::<String>("tenant-id")
     302            0 :         .map(|id_str| TenantShardId::from_str(id_str))
     303            0 :         .transpose()
     304            0 :         .context("Failed to parse tenant shard id from the argument string")
     305            0 : }
     306              : 
     307            0 : fn parse_timeline_id(sub_match: &ArgMatches) -> anyhow::Result<Option<TimelineId>> {
     308            0 :     sub_match
     309            0 :         .get_one::<String>("timeline-id")
     310            0 :         .map(|timeline_id| TimelineId::from_str(timeline_id))
     311            0 :         .transpose()
     312            0 :         .context("Failed to parse timeline id from the argument string")
     313            0 : }
     314              : 
     315            0 : fn handle_init(init_match: &ArgMatches) -> anyhow::Result<LocalEnv> {
     316            0 :     let num_pageservers = init_match.get_one::<u16>("num-pageservers");
     317            0 : 
     318            0 :     let force = init_match.get_one("force").expect("we set a default value");
     319              : 
     320              :     // Create the in-memory `LocalEnv` that we'd normally load from disk in `load_config`.
     321            0 :     let init_conf: NeonLocalInitConf = if let Some(config_path) =
     322            0 :         init_match.get_one::<PathBuf>("config")
     323              :     {
     324              :         // User (likely the Python test suite) provided a description of the environment.
     325            0 :         if num_pageservers.is_some() {
     326            0 :             bail!("Cannot specify both --num-pageservers and --config, use key `pageservers` in the --config file instead");
     327            0 :         }
     328              :         // load and parse the file
     329            0 :         let contents = std::fs::read_to_string(config_path).with_context(|| {
     330            0 :             format!(
     331            0 :                 "Could not read configuration file '{}'",
     332            0 :                 config_path.display()
     333            0 :             )
     334            0 :         })?;
     335            0 :         toml_edit::de::from_str(&contents)?
     336              :     } else {
     337              :         // User (likely interactive) did not provide a description of the environment, give them the default
     338            0 :         NeonLocalInitConf {
     339            0 :             control_plane_api: Some(Some(DEFAULT_PAGESERVER_CONTROL_PLANE_API.parse().unwrap())),
     340            0 :             broker: NeonBroker {
     341            0 :                 listen_addr: DEFAULT_BROKER_ADDR.parse().unwrap(),
     342            0 :             },
     343            0 :             safekeepers: vec![SafekeeperConf {
     344            0 :                 id: DEFAULT_SAFEKEEPER_ID,
     345            0 :                 pg_port: DEFAULT_SAFEKEEPER_PG_PORT,
     346            0 :                 http_port: DEFAULT_SAFEKEEPER_HTTP_PORT,
     347            0 :                 ..Default::default()
     348            0 :             }],
     349            0 :             pageservers: (0..num_pageservers.copied().unwrap_or(1))
     350            0 :                 .map(|i| {
     351            0 :                     let pageserver_id = NodeId(DEFAULT_PAGESERVER_ID.0 + i as u64);
     352            0 :                     let pg_port = DEFAULT_PAGESERVER_PG_PORT + i;
     353            0 :                     let http_port = DEFAULT_PAGESERVER_HTTP_PORT + i;
     354            0 :                     NeonLocalInitPageserverConf {
     355            0 :                         id: pageserver_id,
     356            0 :                         listen_pg_addr: format!("127.0.0.1:{pg_port}"),
     357            0 :                         listen_http_addr: format!("127.0.0.1:{http_port}"),
     358            0 :                         pg_auth_type: AuthType::Trust,
     359            0 :                         http_auth_type: AuthType::Trust,
     360            0 :                         other: Default::default(),
     361            0 :                     }
     362            0 :                 })
     363            0 :                 .collect(),
     364            0 :             pg_distrib_dir: None,
     365            0 :             neon_distrib_dir: None,
     366            0 :             default_tenant_id: TenantId::from_array(std::array::from_fn(|_| 0)),
     367            0 :             storage_controller: None,
     368            0 :             control_plane_compute_hook_api: None,
     369            0 :         }
     370              :     };
     371              : 
     372            0 :     LocalEnv::init(init_conf, force)
     373            0 :         .context("materialize initial neon_local environment on disk")?;
     374            0 :     Ok(LocalEnv::load_config(&local_env::base_path())
     375            0 :         .expect("freshly written config should be loadable"))
     376            0 : }
     377              : 
     378              : /// The default pageserver is the one where CLI tenant/timeline operations are sent by default.
     379              : /// For typical interactive use, one would just run with a single pageserver.  Scenarios with
     380              : /// tenant/timeline placement across multiple pageservers are managed by python test code rather
     381              : /// than this CLI.
     382            0 : fn get_default_pageserver(env: &local_env::LocalEnv) -> PageServerNode {
     383            0 :     let ps_conf = env
     384            0 :         .pageservers
     385            0 :         .first()
     386            0 :         .expect("Config is validated to contain at least one pageserver");
     387            0 :     PageServerNode::from_env(env, ps_conf)
     388            0 : }
     389              : 
     390            0 : async fn handle_tenant(
     391            0 :     tenant_match: &ArgMatches,
     392            0 :     env: &mut local_env::LocalEnv,
     393            0 : ) -> anyhow::Result<()> {
     394            0 :     let pageserver = get_default_pageserver(env);
     395            0 :     match tenant_match.subcommand() {
     396            0 :         Some(("list", _)) => {
     397            0 :             for t in pageserver.tenant_list().await? {
     398            0 :                 println!("{} {:?}", t.id, t.state);
     399            0 :             }
     400              :         }
     401            0 :         Some(("import", import_match)) => {
     402            0 :             let tenant_id = parse_tenant_id(import_match)?.unwrap_or_else(TenantId::generate);
     403            0 : 
     404            0 :             let storage_controller = StorageController::from_env(env);
     405            0 :             let create_response = storage_controller.tenant_import(tenant_id).await?;
     406              : 
     407            0 :             let shard_zero = create_response
     408            0 :                 .shards
     409            0 :                 .first()
     410            0 :                 .expect("Import response omitted shards");
     411            0 : 
     412            0 :             let attached_pageserver_id = shard_zero.node_id;
     413            0 :             let pageserver =
     414            0 :                 PageServerNode::from_env(env, env.get_pageserver_conf(attached_pageserver_id)?);
     415              : 
     416            0 :             println!(
     417            0 :                 "Imported tenant {tenant_id}, attached to pageserver {attached_pageserver_id}"
     418            0 :             );
     419              : 
     420            0 :             let timelines = pageserver
     421            0 :                 .http_client
     422            0 :                 .list_timelines(shard_zero.shard_id)
     423            0 :                 .await?;
     424              : 
     425              :             // Pick a 'main' timeline that has no ancestors, the rest will get arbitrary names
     426            0 :             let main_timeline = timelines
     427            0 :                 .iter()
     428            0 :                 .find(|t| t.ancestor_timeline_id.is_none())
     429            0 :                 .expect("No timelines found")
     430            0 :                 .timeline_id;
     431            0 : 
     432            0 :             let mut branch_i = 0;
     433            0 :             for timeline in timelines.iter() {
     434            0 :                 let branch_name = if timeline.timeline_id == main_timeline {
     435            0 :                     "main".to_string()
     436              :                 } else {
     437            0 :                     branch_i += 1;
     438            0 :                     format!("branch_{branch_i}")
     439              :                 };
     440              : 
     441            0 :                 println!(
     442            0 :                     "Importing timeline {tenant_id}/{} as branch {branch_name}",
     443            0 :                     timeline.timeline_id
     444            0 :                 );
     445            0 : 
     446            0 :                 env.register_branch_mapping(branch_name, tenant_id, timeline.timeline_id)?;
     447              :             }
     448              :         }
     449            0 :         Some(("create", create_match)) => {
     450            0 :             let tenant_conf: HashMap<_, _> = create_match
     451            0 :                 .get_many::<String>("config")
     452            0 :                 .map(|vals: clap::parser::ValuesRef<'_, String>| {
     453            0 :                     vals.flat_map(|c| c.split_once(':')).collect()
     454            0 :                 })
     455            0 :                 .unwrap_or_default();
     456            0 : 
     457            0 :             let shard_count: u8 = create_match
     458            0 :                 .get_one::<u8>("shard-count")
     459            0 :                 .cloned()
     460            0 :                 .unwrap_or(0);
     461            0 : 
     462            0 :             let shard_stripe_size: Option<u32> =
     463            0 :                 create_match.get_one::<u32>("shard-stripe-size").cloned();
     464              : 
     465            0 :             let placement_policy = match create_match.get_one::<String>("placement-policy") {
     466            0 :                 Some(s) if !s.is_empty() => serde_json::from_str::<PlacementPolicy>(s)?,
     467            0 :                 _ => PlacementPolicy::Attached(0),
     468              :             };
     469              : 
     470            0 :             let tenant_conf = PageServerNode::parse_config(tenant_conf)?;
     471              : 
     472              :             // If tenant ID was not specified, generate one
     473            0 :             let tenant_id = parse_tenant_id(create_match)?.unwrap_or_else(TenantId::generate);
     474            0 : 
     475            0 :             // We must register the tenant with the storage controller, so
     476            0 :             // that when the pageserver restarts, it will be re-attached.
     477            0 :             let storage_controller = StorageController::from_env(env);
     478            0 :             storage_controller
     479            0 :                 .tenant_create(TenantCreateRequest {
     480            0 :                     // Note that ::unsharded here isn't actually because the tenant is unsharded, its because the
     481            0 :                     // storage controller expecfs a shard-naive tenant_id in this attribute, and the TenantCreateRequest
     482            0 :                     // type is used both in storage controller (for creating tenants) and in pageserver (for creating shards)
     483            0 :                     new_tenant_id: TenantShardId::unsharded(tenant_id),
     484            0 :                     generation: None,
     485            0 :                     shard_parameters: ShardParameters {
     486            0 :                         count: ShardCount::new(shard_count),
     487            0 :                         stripe_size: shard_stripe_size
     488            0 :                             .map(ShardStripeSize)
     489            0 :                             .unwrap_or(ShardParameters::DEFAULT_STRIPE_SIZE),
     490            0 :                     },
     491            0 :                     placement_policy: Some(placement_policy),
     492            0 :                     config: tenant_conf,
     493            0 :                 })
     494            0 :                 .await?;
     495            0 :             println!("tenant {tenant_id} successfully created on the pageserver");
     496              : 
     497              :             // Create an initial timeline for the new tenant
     498            0 :             let new_timeline_id =
     499            0 :                 parse_timeline_id(create_match)?.unwrap_or(TimelineId::generate());
     500            0 :             let pg_version = create_match
     501            0 :                 .get_one::<u32>("pg-version")
     502            0 :                 .copied()
     503            0 :                 .context("Failed to parse postgres version from the argument string")?;
     504              : 
     505              :             // FIXME: passing None for ancestor_start_lsn is not kosher in a sharded world: we can't have
     506              :             // different shards picking different start lsns.  Maybe we have to teach storage controller
     507              :             // to let shard 0 branch first and then propagate the chosen LSN to other shards.
     508            0 :             storage_controller
     509            0 :                 .tenant_timeline_create(
     510            0 :                     tenant_id,
     511            0 :                     TimelineCreateRequest {
     512            0 :                         new_timeline_id,
     513            0 :                         ancestor_timeline_id: None,
     514            0 :                         ancestor_start_lsn: None,
     515            0 :                         existing_initdb_timeline_id: None,
     516            0 :                         pg_version: Some(pg_version),
     517            0 :                     },
     518            0 :                 )
     519            0 :                 .await?;
     520              : 
     521            0 :             env.register_branch_mapping(
     522            0 :                 DEFAULT_BRANCH_NAME.to_string(),
     523            0 :                 tenant_id,
     524            0 :                 new_timeline_id,
     525            0 :             )?;
     526              : 
     527            0 :             println!("Created an initial timeline '{new_timeline_id}' for tenant: {tenant_id}",);
     528            0 : 
     529            0 :             if create_match.get_flag("set-default") {
     530            0 :                 println!("Setting tenant {tenant_id} as a default one");
     531            0 :                 env.default_tenant_id = Some(tenant_id);
     532            0 :             }
     533              :         }
     534            0 :         Some(("set-default", set_default_match)) => {
     535            0 :             let tenant_id =
     536            0 :                 parse_tenant_id(set_default_match)?.context("No tenant id specified")?;
     537            0 :             println!("Setting tenant {tenant_id} as a default one");
     538            0 :             env.default_tenant_id = Some(tenant_id);
     539              :         }
     540            0 :         Some(("config", create_match)) => {
     541            0 :             let tenant_id = get_tenant_id(create_match, env)?;
     542            0 :             let tenant_conf: HashMap<_, _> = create_match
     543            0 :                 .get_many::<String>("config")
     544            0 :                 .map(|vals| vals.flat_map(|c| c.split_once(':')).collect())
     545            0 :                 .unwrap_or_default();
     546            0 : 
     547            0 :             pageserver
     548            0 :                 .tenant_config(tenant_id, tenant_conf)
     549            0 :                 .await
     550            0 :                 .with_context(|| format!("Tenant config failed for tenant with id {tenant_id}"))?;
     551            0 :             println!("tenant {tenant_id} successfully configured on the pageserver");
     552              :         }
     553              : 
     554            0 :         Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{}'", sub_name),
     555            0 :         None => bail!("no tenant subcommand provided"),
     556              :     }
     557            0 :     Ok(())
     558            0 : }
     559              : 
     560            0 : async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Result<()> {
     561            0 :     let pageserver = get_default_pageserver(env);
     562            0 : 
     563            0 :     match timeline_match.subcommand() {
     564            0 :         Some(("list", list_match)) => {
     565              :             // TODO(sharding): this command shouldn't have to specify a shard ID: we should ask the storage controller
     566              :             // where shard 0 is attached, and query there.
     567            0 :             let tenant_shard_id = get_tenant_shard_id(list_match, env)?;
     568            0 :             let timelines = pageserver.timeline_list(&tenant_shard_id).await?;
     569            0 :             print_timelines_tree(timelines, env.timeline_name_mappings())?;
     570              :         }
     571            0 :         Some(("create", create_match)) => {
     572            0 :             let tenant_id = get_tenant_id(create_match, env)?;
     573            0 :             let new_branch_name = create_match
     574            0 :                 .get_one::<String>("branch-name")
     575            0 :                 .ok_or_else(|| anyhow!("No branch name provided"))?;
     576              : 
     577            0 :             let pg_version = create_match
     578            0 :                 .get_one::<u32>("pg-version")
     579            0 :                 .copied()
     580            0 :                 .context("Failed to parse postgres version from the argument string")?;
     581              : 
     582            0 :             let new_timeline_id_opt = parse_timeline_id(create_match)?;
     583            0 :             let new_timeline_id = new_timeline_id_opt.unwrap_or(TimelineId::generate());
     584            0 : 
     585            0 :             let storage_controller = StorageController::from_env(env);
     586            0 :             let create_req = TimelineCreateRequest {
     587            0 :                 new_timeline_id,
     588            0 :                 ancestor_timeline_id: None,
     589            0 :                 existing_initdb_timeline_id: None,
     590            0 :                 ancestor_start_lsn: None,
     591            0 :                 pg_version: Some(pg_version),
     592            0 :             };
     593            0 :             let timeline_info = storage_controller
     594            0 :                 .tenant_timeline_create(tenant_id, create_req)
     595            0 :                 .await?;
     596              : 
     597            0 :             let last_record_lsn = timeline_info.last_record_lsn;
     598            0 :             env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?;
     599              : 
     600            0 :             println!(
     601            0 :                 "Created timeline '{}' at Lsn {last_record_lsn} for tenant: {tenant_id}",
     602            0 :                 timeline_info.timeline_id
     603            0 :             );
     604              :         }
     605            0 :         Some(("import", import_match)) => {
     606            0 :             let tenant_id = get_tenant_id(import_match, env)?;
     607            0 :             let timeline_id = parse_timeline_id(import_match)?.expect("No timeline id provided");
     608            0 :             let branch_name = import_match
     609            0 :                 .get_one::<String>("branch-name")
     610            0 :                 .ok_or_else(|| anyhow!("No branch name provided"))?;
     611              : 
     612              :             // Parse base inputs
     613            0 :             let base_tarfile = import_match
     614            0 :                 .get_one::<PathBuf>("base-tarfile")
     615            0 :                 .ok_or_else(|| anyhow!("No base-tarfile provided"))?
     616            0 :                 .to_owned();
     617            0 :             let base_lsn = Lsn::from_str(
     618            0 :                 import_match
     619            0 :                     .get_one::<String>("base-lsn")
     620            0 :                     .ok_or_else(|| anyhow!("No base-lsn provided"))?,
     621            0 :             )?;
     622            0 :             let base = (base_lsn, base_tarfile);
     623            0 : 
     624            0 :             // Parse pg_wal inputs
     625            0 :             let wal_tarfile = import_match.get_one::<PathBuf>("wal-tarfile").cloned();
     626            0 :             let end_lsn = import_match
     627            0 :                 .get_one::<String>("end-lsn")
     628            0 :                 .map(|s| Lsn::from_str(s).unwrap());
     629            0 :             // TODO validate both or none are provided
     630            0 :             let pg_wal = end_lsn.zip(wal_tarfile);
     631              : 
     632            0 :             let pg_version = import_match
     633            0 :                 .get_one::<u32>("pg-version")
     634            0 :                 .copied()
     635            0 :                 .context("Failed to parse postgres version from the argument string")?;
     636              : 
     637            0 :             println!("Importing timeline into pageserver ...");
     638            0 :             pageserver
     639            0 :                 .timeline_import(tenant_id, timeline_id, base, pg_wal, pg_version)
     640            0 :                 .await?;
     641            0 :             env.register_branch_mapping(branch_name.to_string(), tenant_id, timeline_id)?;
     642            0 :             println!("Done");
     643              :         }
     644            0 :         Some(("branch", branch_match)) => {
     645            0 :             let tenant_id = get_tenant_id(branch_match, env)?;
     646            0 :             let new_timeline_id =
     647            0 :                 parse_timeline_id(branch_match)?.unwrap_or(TimelineId::generate());
     648            0 :             let new_branch_name = branch_match
     649            0 :                 .get_one::<String>("branch-name")
     650            0 :                 .ok_or_else(|| anyhow!("No branch name provided"))?;
     651            0 :             let ancestor_branch_name = branch_match
     652            0 :                 .get_one::<String>("ancestor-branch-name")
     653            0 :                 .map(|s| s.as_str())
     654            0 :                 .unwrap_or(DEFAULT_BRANCH_NAME);
     655            0 :             let ancestor_timeline_id = env
     656            0 :                 .get_branch_timeline_id(ancestor_branch_name, tenant_id)
     657            0 :                 .ok_or_else(|| {
     658            0 :                     anyhow!("Found no timeline id for branch name '{ancestor_branch_name}'")
     659            0 :                 })?;
     660              : 
     661            0 :             let start_lsn = branch_match
     662            0 :                 .get_one::<String>("ancestor-start-lsn")
     663            0 :                 .map(|lsn_str| Lsn::from_str(lsn_str))
     664            0 :                 .transpose()
     665            0 :                 .context("Failed to parse ancestor start Lsn from the request")?;
     666            0 :             let storage_controller = StorageController::from_env(env);
     667            0 :             let create_req = TimelineCreateRequest {
     668            0 :                 new_timeline_id,
     669            0 :                 ancestor_timeline_id: Some(ancestor_timeline_id),
     670            0 :                 existing_initdb_timeline_id: None,
     671            0 :                 ancestor_start_lsn: start_lsn,
     672            0 :                 pg_version: None,
     673            0 :             };
     674            0 :             let timeline_info = storage_controller
     675            0 :                 .tenant_timeline_create(tenant_id, create_req)
     676            0 :                 .await?;
     677              : 
     678            0 :             let last_record_lsn = timeline_info.last_record_lsn;
     679            0 : 
     680            0 :             env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?;
     681              : 
     682            0 :             println!(
     683            0 :                 "Created timeline '{}' at Lsn {last_record_lsn} for tenant: {tenant_id}. Ancestor timeline: '{ancestor_branch_name}'",
     684            0 :                 timeline_info.timeline_id
     685            0 :             );
     686              :         }
     687            0 :         Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{sub_name}'"),
     688            0 :         None => bail!("no tenant subcommand provided"),
     689              :     }
     690              : 
     691            0 :     Ok(())
     692            0 : }
     693              : 
     694            0 : async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
     695            0 :     let (sub_name, sub_args) = match ep_match.subcommand() {
     696            0 :         Some(ep_subcommand_data) => ep_subcommand_data,
     697            0 :         None => bail!("no endpoint subcommand provided"),
     698              :     };
     699            0 :     let mut cplane = ComputeControlPlane::load(env.clone())?;
     700              : 
     701            0 :     match sub_name {
     702            0 :         "list" => {
     703              :             // TODO(sharding): this command shouldn't have to specify a shard ID: we should ask the storage controller
     704              :             // where shard 0 is attached, and query there.
     705            0 :             let tenant_shard_id = get_tenant_shard_id(sub_args, env)?;
     706            0 :             let timeline_infos = get_timeline_infos(env, &tenant_shard_id)
     707            0 :                 .await
     708            0 :                 .unwrap_or_else(|e| {
     709            0 :                     eprintln!("Failed to load timeline info: {}", e);
     710            0 :                     HashMap::new()
     711            0 :                 });
     712            0 : 
     713            0 :             let timeline_name_mappings = env.timeline_name_mappings();
     714            0 : 
     715            0 :             let mut table = comfy_table::Table::new();
     716            0 : 
     717            0 :             table.load_preset(comfy_table::presets::NOTHING);
     718            0 : 
     719            0 :             table.set_header([
     720            0 :                 "ENDPOINT",
     721            0 :                 "ADDRESS",
     722            0 :                 "TIMELINE",
     723            0 :                 "BRANCH NAME",
     724            0 :                 "LSN",
     725            0 :                 "STATUS",
     726            0 :             ]);
     727              : 
     728            0 :             for (endpoint_id, endpoint) in cplane
     729            0 :                 .endpoints
     730            0 :                 .iter()
     731            0 :                 .filter(|(_, endpoint)| endpoint.tenant_id == tenant_shard_id.tenant_id)
     732              :             {
     733            0 :                 let lsn_str = match endpoint.mode {
     734            0 :                     ComputeMode::Static(lsn) => {
     735            0 :                         // -> read-only endpoint
     736            0 :                         // Use the node's LSN.
     737            0 :                         lsn.to_string()
     738              :                     }
     739              :                     _ => {
     740              :                         // -> primary endpoint or hot replica
     741              :                         // Use the LSN at the end of the timeline.
     742            0 :                         timeline_infos
     743            0 :                             .get(&endpoint.timeline_id)
     744            0 :                             .map(|bi| bi.last_record_lsn.to_string())
     745            0 :                             .unwrap_or_else(|| "?".to_string())
     746              :                     }
     747              :                 };
     748              : 
     749            0 :                 let branch_name = timeline_name_mappings
     750            0 :                     .get(&TenantTimelineId::new(
     751            0 :                         tenant_shard_id.tenant_id,
     752            0 :                         endpoint.timeline_id,
     753            0 :                     ))
     754            0 :                     .map(|name| name.as_str())
     755            0 :                     .unwrap_or("?");
     756            0 : 
     757            0 :                 table.add_row([
     758            0 :                     endpoint_id.as_str(),
     759            0 :                     &endpoint.pg_address.to_string(),
     760            0 :                     &endpoint.timeline_id.to_string(),
     761            0 :                     branch_name,
     762            0 :                     lsn_str.as_str(),
     763            0 :                     &format!("{}", endpoint.status()),
     764            0 :                 ]);
     765            0 :             }
     766              : 
     767            0 :             println!("{table}");
     768              :         }
     769            0 :         "create" => {
     770            0 :             let tenant_id = get_tenant_id(sub_args, env)?;
     771            0 :             let branch_name = sub_args
     772            0 :                 .get_one::<String>("branch-name")
     773            0 :                 .map(|s| s.as_str())
     774            0 :                 .unwrap_or(DEFAULT_BRANCH_NAME);
     775            0 :             let endpoint_id = sub_args
     776            0 :                 .get_one::<String>("endpoint_id")
     777            0 :                 .map(String::to_string)
     778            0 :                 .unwrap_or_else(|| format!("ep-{branch_name}"));
     779            0 :             let update_catalog = sub_args
     780            0 :                 .get_one::<bool>("update-catalog")
     781            0 :                 .cloned()
     782            0 :                 .unwrap_or_default();
     783              : 
     784            0 :             let lsn = sub_args
     785            0 :                 .get_one::<String>("lsn")
     786            0 :                 .map(|lsn_str| Lsn::from_str(lsn_str))
     787            0 :                 .transpose()
     788            0 :                 .context("Failed to parse Lsn from the request")?;
     789            0 :             let timeline_id = env
     790            0 :                 .get_branch_timeline_id(branch_name, tenant_id)
     791            0 :                 .ok_or_else(|| anyhow!("Found no timeline id for branch name '{branch_name}'"))?;
     792              : 
     793            0 :             let pg_port: Option<u16> = sub_args.get_one::<u16>("pg-port").copied();
     794            0 :             let http_port: Option<u16> = sub_args.get_one::<u16>("http-port").copied();
     795            0 :             let pg_version = sub_args
     796            0 :                 .get_one::<u32>("pg-version")
     797            0 :                 .copied()
     798            0 :                 .context("Failed to parse postgres version from the argument string")?;
     799              : 
     800            0 :             let hot_standby = sub_args
     801            0 :                 .get_one::<bool>("hot-standby")
     802            0 :                 .copied()
     803            0 :                 .unwrap_or(false);
     804            0 : 
     805            0 :             let allow_multiple = sub_args.get_flag("allow-multiple");
     806              : 
     807            0 :             let mode = match (lsn, hot_standby) {
     808            0 :                 (Some(lsn), false) => ComputeMode::Static(lsn),
     809            0 :                 (None, true) => ComputeMode::Replica,
     810            0 :                 (None, false) => ComputeMode::Primary,
     811            0 :                 (Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"),
     812              :             };
     813              : 
     814            0 :             match (mode, hot_standby) {
     815              :                 (ComputeMode::Static(_), true) => {
     816            0 :                     bail!("Cannot start a node in hot standby mode when it is already configured as a static replica")
     817              :                 }
     818              :                 (ComputeMode::Primary, true) => {
     819            0 :                     bail!("Cannot start a node as a hot standby replica, it is already configured as primary node")
     820              :                 }
     821            0 :                 _ => {}
     822            0 :             }
     823            0 : 
     824            0 :             if !allow_multiple {
     825            0 :                 cplane.check_conflicting_endpoints(mode, tenant_id, timeline_id)?;
     826            0 :             }
     827              : 
     828            0 :             cplane.new_endpoint(
     829            0 :                 &endpoint_id,
     830            0 :                 tenant_id,
     831            0 :                 timeline_id,
     832            0 :                 pg_port,
     833            0 :                 http_port,
     834            0 :                 pg_version,
     835            0 :                 mode,
     836            0 :                 !update_catalog,
     837            0 :             )?;
     838              :         }
     839            0 :         "start" => {
     840            0 :             let endpoint_id = sub_args
     841            0 :                 .get_one::<String>("endpoint_id")
     842            0 :                 .ok_or_else(|| anyhow!("No endpoint ID was provided to start"))?;
     843              : 
     844            0 :             let pageserver_id =
     845            0 :                 if let Some(id_str) = sub_args.get_one::<String>("endpoint-pageserver-id") {
     846              :                     Some(NodeId(
     847            0 :                         id_str.parse().context("while parsing pageserver id")?,
     848              :                     ))
     849              :                 } else {
     850            0 :                     None
     851              :                 };
     852              : 
     853            0 :             let remote_ext_config = sub_args.get_one::<String>("remote-ext-config");
     854            0 : 
     855            0 :             let allow_multiple = sub_args.get_flag("allow-multiple");
     856              : 
     857              :             // If --safekeepers argument is given, use only the listed
     858              :             // safekeeper nodes; otherwise all from the env.
     859            0 :             let safekeepers = if let Some(safekeepers) = parse_safekeepers(sub_args)? {
     860            0 :                 safekeepers
     861              :             } else {
     862            0 :                 env.safekeepers.iter().map(|sk| sk.id).collect()
     863              :             };
     864              : 
     865            0 :             let endpoint = cplane
     866            0 :                 .endpoints
     867            0 :                 .get(endpoint_id.as_str())
     868            0 :                 .ok_or_else(|| anyhow::anyhow!("endpoint {endpoint_id} not found"))?;
     869              : 
     870            0 :             let create_test_user = sub_args
     871            0 :                 .get_one::<bool>("create-test-user")
     872            0 :                 .cloned()
     873            0 :                 .unwrap_or_default();
     874            0 : 
     875            0 :             if !allow_multiple {
     876            0 :                 cplane.check_conflicting_endpoints(
     877            0 :                     endpoint.mode,
     878            0 :                     endpoint.tenant_id,
     879            0 :                     endpoint.timeline_id,
     880            0 :                 )?;
     881            0 :             }
     882              : 
     883            0 :             let (pageservers, stripe_size) = if let Some(pageserver_id) = pageserver_id {
     884            0 :                 let conf = env.get_pageserver_conf(pageserver_id).unwrap();
     885            0 :                 let parsed = parse_host_port(&conf.listen_pg_addr).expect("Bad config");
     886            0 :                 (
     887            0 :                     vec![(parsed.0, parsed.1.unwrap_or(5432))],
     888            0 :                     // If caller is telling us what pageserver to use, this is not a tenant which is
     889            0 :                     // full managed by storage controller, therefore not sharded.
     890            0 :                     ShardParameters::DEFAULT_STRIPE_SIZE,
     891            0 :                 )
     892              :             } else {
     893              :                 // Look up the currently attached location of the tenant, and its striping metadata,
     894              :                 // to pass these on to postgres.
     895            0 :                 let storage_controller = StorageController::from_env(env);
     896            0 :                 let locate_result = storage_controller.tenant_locate(endpoint.tenant_id).await?;
     897            0 :                 let pageservers = locate_result
     898            0 :                     .shards
     899            0 :                     .into_iter()
     900            0 :                     .map(|shard| {
     901            0 :                         (
     902            0 :                             Host::parse(&shard.listen_pg_addr)
     903            0 :                                 .expect("Storage controller reported bad hostname"),
     904            0 :                             shard.listen_pg_port,
     905            0 :                         )
     906            0 :                     })
     907            0 :                     .collect::<Vec<_>>();
     908            0 :                 let stripe_size = locate_result.shard_params.stripe_size;
     909            0 : 
     910            0 :                 (pageservers, stripe_size)
     911              :             };
     912            0 :             assert!(!pageservers.is_empty());
     913              : 
     914            0 :             let ps_conf = env.get_pageserver_conf(DEFAULT_PAGESERVER_ID)?;
     915            0 :             let auth_token = if matches!(ps_conf.pg_auth_type, AuthType::NeonJWT) {
     916            0 :                 let claims = Claims::new(Some(endpoint.tenant_id), Scope::Tenant);
     917            0 : 
     918            0 :                 Some(env.generate_auth_token(&claims)?)
     919              :             } else {
     920            0 :                 None
     921              :             };
     922              : 
     923            0 :             println!("Starting existing endpoint {endpoint_id}...");
     924            0 :             endpoint
     925            0 :                 .start(
     926            0 :                     &auth_token,
     927            0 :                     safekeepers,
     928            0 :                     pageservers,
     929            0 :                     remote_ext_config,
     930            0 :                     stripe_size.0 as usize,
     931            0 :                     create_test_user,
     932            0 :                 )
     933            0 :                 .await?;
     934              :         }
     935            0 :         "reconfigure" => {
     936            0 :             let endpoint_id = sub_args
     937            0 :                 .get_one::<String>("endpoint_id")
     938            0 :                 .ok_or_else(|| anyhow!("No endpoint ID provided to reconfigure"))?;
     939            0 :             let endpoint = cplane
     940            0 :                 .endpoints
     941            0 :                 .get(endpoint_id.as_str())
     942            0 :                 .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
     943            0 :             let pageservers =
     944            0 :                 if let Some(id_str) = sub_args.get_one::<String>("endpoint-pageserver-id") {
     945            0 :                     let ps_id = NodeId(id_str.parse().context("while parsing pageserver id")?);
     946            0 :                     let pageserver = PageServerNode::from_env(env, env.get_pageserver_conf(ps_id)?);
     947            0 :                     vec![(
     948            0 :                         pageserver.pg_connection_config.host().clone(),
     949            0 :                         pageserver.pg_connection_config.port(),
     950            0 :                     )]
     951              :                 } else {
     952            0 :                     let storage_controller = StorageController::from_env(env);
     953            0 :                     storage_controller
     954            0 :                         .tenant_locate(endpoint.tenant_id)
     955            0 :                         .await?
     956              :                         .shards
     957            0 :                         .into_iter()
     958            0 :                         .map(|shard| {
     959            0 :                             (
     960            0 :                                 Host::parse(&shard.listen_pg_addr)
     961            0 :                                     .expect("Storage controller reported malformed host"),
     962            0 :                                 shard.listen_pg_port,
     963            0 :                             )
     964            0 :                         })
     965            0 :                         .collect::<Vec<_>>()
     966              :                 };
     967              :             // If --safekeepers argument is given, use only the listed
     968              :             // safekeeper nodes; otherwise all from the env.
     969            0 :             let safekeepers = parse_safekeepers(sub_args)?;
     970            0 :             endpoint.reconfigure(pageservers, None, safekeepers).await?;
     971              :         }
     972            0 :         "stop" => {
     973            0 :             let endpoint_id = sub_args
     974            0 :                 .get_one::<String>("endpoint_id")
     975            0 :                 .ok_or_else(|| anyhow!("No endpoint ID was provided to stop"))?;
     976            0 :             let destroy = sub_args.get_flag("destroy");
     977            0 :             let mode = sub_args.get_one::<String>("mode").expect("has a default");
     978              : 
     979            0 :             let endpoint = cplane
     980            0 :                 .endpoints
     981            0 :                 .get(endpoint_id.as_str())
     982            0 :                 .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
     983            0 :             endpoint.stop(mode, destroy)?;
     984              :         }
     985              : 
     986            0 :         _ => bail!("Unexpected endpoint subcommand '{sub_name}'"),
     987              :     }
     988              : 
     989            0 :     Ok(())
     990            0 : }
     991              : 
     992              : /// Parse --safekeepers as list of safekeeper ids.
     993            0 : fn parse_safekeepers(sub_args: &ArgMatches) -> Result<Option<Vec<NodeId>>> {
     994            0 :     if let Some(safekeepers_str) = sub_args.get_one::<String>("safekeepers") {
     995            0 :         let mut safekeepers: Vec<NodeId> = Vec::new();
     996            0 :         for sk_id in safekeepers_str.split(',').map(str::trim) {
     997            0 :             let sk_id = NodeId(
     998            0 :                 u64::from_str(sk_id)
     999            0 :                     .map_err(|_| anyhow!("invalid node ID \"{sk_id}\" in --safekeepers list"))?,
    1000              :             );
    1001            0 :             safekeepers.push(sk_id);
    1002              :         }
    1003            0 :         Ok(Some(safekeepers))
    1004              :     } else {
    1005            0 :         Ok(None)
    1006              :     }
    1007            0 : }
    1008              : 
    1009            0 : fn handle_mappings(sub_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Result<()> {
    1010            0 :     let (sub_name, sub_args) = match sub_match.subcommand() {
    1011            0 :         Some(ep_subcommand_data) => ep_subcommand_data,
    1012            0 :         None => bail!("no mappings subcommand provided"),
    1013              :     };
    1014              : 
    1015            0 :     match sub_name {
    1016            0 :         "map" => {
    1017            0 :             let branch_name = sub_args
    1018            0 :                 .get_one::<String>("branch-name")
    1019            0 :                 .expect("branch-name argument missing");
    1020            0 : 
    1021            0 :             let tenant_id = sub_args
    1022            0 :                 .get_one::<String>("tenant-id")
    1023            0 :                 .map(|x| TenantId::from_str(x))
    1024            0 :                 .expect("tenant-id argument missing")
    1025            0 :                 .expect("malformed tenant-id arg");
    1026            0 : 
    1027            0 :             let timeline_id = sub_args
    1028            0 :                 .get_one::<String>("timeline-id")
    1029            0 :                 .map(|x| TimelineId::from_str(x))
    1030            0 :                 .expect("timeline-id argument missing")
    1031            0 :                 .expect("malformed timeline-id arg");
    1032            0 : 
    1033            0 :             env.register_branch_mapping(branch_name.to_owned(), tenant_id, timeline_id)?;
    1034              : 
    1035            0 :             Ok(())
    1036              :         }
    1037            0 :         other => unimplemented!("mappings subcommand {other}"),
    1038              :     }
    1039            0 : }
    1040              : 
    1041            0 : fn get_pageserver(env: &local_env::LocalEnv, args: &ArgMatches) -> Result<PageServerNode> {
    1042            0 :     let node_id = if let Some(id_str) = args.get_one::<String>("pageserver-id") {
    1043            0 :         NodeId(id_str.parse().context("while parsing pageserver id")?)
    1044              :     } else {
    1045            0 :         DEFAULT_PAGESERVER_ID
    1046              :     };
    1047              : 
    1048              :     Ok(PageServerNode::from_env(
    1049            0 :         env,
    1050            0 :         env.get_pageserver_conf(node_id)?,
    1051              :     ))
    1052            0 : }
    1053              : 
    1054            0 : fn get_start_timeout(args: &ArgMatches) -> &Duration {
    1055            0 :     let humantime_duration = args
    1056            0 :         .get_one::<humantime::Duration>("start-timeout")
    1057            0 :         .expect("invalid value for start-timeout");
    1058            0 :     humantime_duration.as_ref()
    1059            0 : }
    1060              : 
    1061            0 : fn storage_controller_start_args(args: &ArgMatches) -> NeonStorageControllerStartArgs {
    1062            0 :     let maybe_instance_id = args.get_one::<u8>("instance-id");
    1063            0 : 
    1064            0 :     let base_port = args.get_one::<u16>("base-port");
    1065            0 : 
    1066            0 :     if maybe_instance_id.is_some() && base_port.is_none() {
    1067            0 :         panic!("storage-controller start specificied instance-id but did not provide base-port");
    1068            0 :     }
    1069            0 : 
    1070            0 :     let start_timeout = args
    1071            0 :         .get_one::<humantime::Duration>("start-timeout")
    1072            0 :         .expect("invalid value for start-timeout");
    1073            0 : 
    1074            0 :     NeonStorageControllerStartArgs {
    1075            0 :         instance_id: maybe_instance_id.copied().unwrap_or(1),
    1076            0 :         base_port: base_port.copied(),
    1077            0 :         start_timeout: *start_timeout,
    1078            0 :     }
    1079            0 : }
    1080              : 
    1081            0 : fn storage_controller_stop_args(args: &ArgMatches) -> NeonStorageControllerStopArgs {
    1082            0 :     let maybe_instance_id = args.get_one::<u8>("instance-id");
    1083            0 :     let immediate = args.get_one::<String>("stop-mode").map(|s| s.as_str()) == Some("immediate");
    1084            0 : 
    1085            0 :     NeonStorageControllerStopArgs {
    1086            0 :         instance_id: maybe_instance_id.copied().unwrap_or(1),
    1087            0 :         immediate,
    1088            0 :     }
    1089            0 : }
    1090              : 
    1091            0 : async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
    1092            0 :     match sub_match.subcommand() {
    1093            0 :         Some(("start", subcommand_args)) => {
    1094            0 :             if let Err(e) = get_pageserver(env, subcommand_args)?
    1095            0 :                 .start(get_start_timeout(subcommand_args))
    1096            0 :                 .await
    1097              :             {
    1098            0 :                 eprintln!("pageserver start failed: {e}");
    1099            0 :                 exit(1);
    1100            0 :             }
    1101              :         }
    1102              : 
    1103            0 :         Some(("stop", subcommand_args)) => {
    1104            0 :             let immediate = subcommand_args
    1105            0 :                 .get_one::<String>("stop-mode")
    1106            0 :                 .map(|s| s.as_str())
    1107            0 :                 == Some("immediate");
    1108              : 
    1109            0 :             if let Err(e) = get_pageserver(env, subcommand_args)?.stop(immediate) {
    1110            0 :                 eprintln!("pageserver stop failed: {}", e);
    1111            0 :                 exit(1);
    1112            0 :             }
    1113              :         }
    1114              : 
    1115            0 :         Some(("restart", subcommand_args)) => {
    1116            0 :             let pageserver = get_pageserver(env, subcommand_args)?;
    1117              :             //TODO what shutdown strategy should we use here?
    1118            0 :             if let Err(e) = pageserver.stop(false) {
    1119            0 :                 eprintln!("pageserver stop failed: {}", e);
    1120            0 :                 exit(1);
    1121            0 :             }
    1122              : 
    1123            0 :             if let Err(e) = pageserver.start(get_start_timeout(sub_match)).await {
    1124            0 :                 eprintln!("pageserver start failed: {e}");
    1125            0 :                 exit(1);
    1126            0 :             }
    1127              :         }
    1128              : 
    1129            0 :         Some(("status", subcommand_args)) => {
    1130            0 :             match get_pageserver(env, subcommand_args)?.check_status().await {
    1131            0 :                 Ok(_) => println!("Page server is up and running"),
    1132            0 :                 Err(err) => {
    1133            0 :                     eprintln!("Page server is not available: {}", err);
    1134            0 :                     exit(1);
    1135              :                 }
    1136              :             }
    1137              :         }
    1138              : 
    1139            0 :         Some((sub_name, _)) => bail!("Unexpected pageserver subcommand '{}'", sub_name),
    1140            0 :         None => bail!("no pageserver subcommand provided"),
    1141              :     }
    1142            0 :     Ok(())
    1143            0 : }
    1144              : 
    1145            0 : async fn handle_storage_controller(
    1146            0 :     sub_match: &ArgMatches,
    1147            0 :     env: &local_env::LocalEnv,
    1148            0 : ) -> Result<()> {
    1149            0 :     let svc = StorageController::from_env(env);
    1150            0 :     match sub_match.subcommand() {
    1151            0 :         Some(("start", start_match)) => {
    1152            0 :             if let Err(e) = svc.start(storage_controller_start_args(start_match)).await {
    1153            0 :                 eprintln!("start failed: {e}");
    1154            0 :                 exit(1);
    1155            0 :             }
    1156              :         }
    1157              : 
    1158            0 :         Some(("stop", stop_match)) => {
    1159            0 :             if let Err(e) = svc.stop(storage_controller_stop_args(stop_match)).await {
    1160            0 :                 eprintln!("stop failed: {}", e);
    1161            0 :                 exit(1);
    1162            0 :             }
    1163              :         }
    1164            0 :         Some((sub_name, _)) => bail!("Unexpected storage_controller subcommand '{}'", sub_name),
    1165            0 :         None => bail!("no storage_controller subcommand provided"),
    1166              :     }
    1167            0 :     Ok(())
    1168            0 : }
    1169              : 
    1170            0 : fn get_safekeeper(env: &local_env::LocalEnv, id: NodeId) -> Result<SafekeeperNode> {
    1171            0 :     if let Some(node) = env.safekeepers.iter().find(|node| node.id == id) {
    1172            0 :         Ok(SafekeeperNode::from_env(env, node))
    1173              :     } else {
    1174            0 :         bail!("could not find safekeeper {id}")
    1175              :     }
    1176            0 : }
    1177              : 
    1178              : // Get list of options to append to safekeeper command invocation.
    1179            0 : fn safekeeper_extra_opts(init_match: &ArgMatches) -> Vec<String> {
    1180            0 :     init_match
    1181            0 :         .get_many::<String>("safekeeper-extra-opt")
    1182            0 :         .into_iter()
    1183            0 :         .flatten()
    1184            0 :         .map(|s| s.to_owned())
    1185            0 :         .collect()
    1186            0 : }
    1187              : 
    1188            0 : async fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
    1189            0 :     let (sub_name, sub_args) = match sub_match.subcommand() {
    1190            0 :         Some(safekeeper_command_data) => safekeeper_command_data,
    1191            0 :         None => bail!("no safekeeper subcommand provided"),
    1192              :     };
    1193              : 
    1194              :     // All the commands take an optional safekeeper name argument
    1195            0 :     let sk_id = if let Some(id_str) = sub_args.get_one::<String>("id") {
    1196            0 :         NodeId(id_str.parse().context("while parsing safekeeper id")?)
    1197              :     } else {
    1198            0 :         DEFAULT_SAFEKEEPER_ID
    1199              :     };
    1200            0 :     let safekeeper = get_safekeeper(env, sk_id)?;
    1201              : 
    1202            0 :     match sub_name {
    1203            0 :         "start" => {
    1204            0 :             let extra_opts = safekeeper_extra_opts(sub_args);
    1205              : 
    1206            0 :             if let Err(e) = safekeeper
    1207            0 :                 .start(extra_opts, get_start_timeout(sub_args))
    1208            0 :                 .await
    1209              :             {
    1210            0 :                 eprintln!("safekeeper start failed: {}", e);
    1211            0 :                 exit(1);
    1212            0 :             }
    1213              :         }
    1214              : 
    1215            0 :         "stop" => {
    1216            0 :             let immediate =
    1217            0 :                 sub_args.get_one::<String>("stop-mode").map(|s| s.as_str()) == Some("immediate");
    1218              : 
    1219            0 :             if let Err(e) = safekeeper.stop(immediate) {
    1220            0 :                 eprintln!("safekeeper stop failed: {}", e);
    1221            0 :                 exit(1);
    1222            0 :             }
    1223              :         }
    1224              : 
    1225            0 :         "restart" => {
    1226            0 :             let immediate =
    1227            0 :                 sub_args.get_one::<String>("stop-mode").map(|s| s.as_str()) == Some("immediate");
    1228              : 
    1229            0 :             if let Err(e) = safekeeper.stop(immediate) {
    1230            0 :                 eprintln!("safekeeper stop failed: {}", e);
    1231            0 :                 exit(1);
    1232            0 :             }
    1233            0 : 
    1234            0 :             let extra_opts = safekeeper_extra_opts(sub_args);
    1235            0 :             if let Err(e) = safekeeper
    1236            0 :                 .start(extra_opts, get_start_timeout(sub_args))
    1237            0 :                 .await
    1238              :             {
    1239            0 :                 eprintln!("safekeeper start failed: {}", e);
    1240            0 :                 exit(1);
    1241            0 :             }
    1242              :         }
    1243              : 
    1244              :         _ => {
    1245            0 :             bail!("Unexpected safekeeper subcommand '{}'", sub_name)
    1246              :         }
    1247              :     }
    1248            0 :     Ok(())
    1249            0 : }
    1250              : 
    1251            0 : async fn handle_storage_broker(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
    1252            0 :     let (sub_name, sub_args) = match sub_match.subcommand() {
    1253            0 :         Some(broker_command_data) => broker_command_data,
    1254            0 :         None => bail!("no broker subcommand provided"),
    1255              :     };
    1256              : 
    1257            0 :     match sub_name {
    1258            0 :         "start" => {
    1259            0 :             if let Err(e) = broker::start_broker_process(env, get_start_timeout(sub_args)).await {
    1260            0 :                 eprintln!("broker start failed: {e}");
    1261            0 :                 exit(1);
    1262            0 :             }
    1263              :         }
    1264              : 
    1265            0 :         "stop" => {
    1266            0 :             if let Err(e) = broker::stop_broker_process(env) {
    1267            0 :                 eprintln!("broker stop failed: {e}");
    1268            0 :                 exit(1);
    1269            0 :             }
    1270              :         }
    1271              : 
    1272            0 :         _ => bail!("Unexpected broker subcommand '{}'", sub_name),
    1273              :     }
    1274            0 :     Ok(())
    1275            0 : }
    1276              : 
    1277            0 : async fn handle_start_all(
    1278            0 :     env: &'static local_env::LocalEnv,
    1279            0 :     retry_timeout: &Duration,
    1280            0 : ) -> anyhow::Result<()> {
    1281            0 :     let Err(errors) = handle_start_all_impl(env, *retry_timeout).await else {
    1282            0 :         neon_start_status_check(env, retry_timeout)
    1283            0 :             .await
    1284            0 :             .context("status check after successful startup of all services")?;
    1285            0 :         return Ok(());
    1286              :     };
    1287              : 
    1288            0 :     eprintln!("startup failed because one or more services could not be started");
    1289              : 
    1290            0 :     for e in errors {
    1291            0 :         eprintln!("{e}");
    1292            0 :         let debug_repr = format!("{e:?}");
    1293            0 :         for line in debug_repr.lines() {
    1294            0 :             eprintln!("  {line}");
    1295            0 :         }
    1296              :     }
    1297              : 
    1298            0 :     try_stop_all(env, true).await;
    1299              : 
    1300            0 :     exit(2);
    1301            0 : }
    1302              : 
    1303              : /// Returns Ok() if and only if all services could be started successfully.
    1304              : /// Otherwise, returns the list of errors that occurred during startup.
    1305            0 : async fn handle_start_all_impl(
    1306            0 :     env: &'static local_env::LocalEnv,
    1307            0 :     retry_timeout: Duration,
    1308            0 : ) -> Result<(), Vec<anyhow::Error>> {
    1309            0 :     // Endpoints are not started automatically
    1310            0 : 
    1311            0 :     let mut js = JoinSet::new();
    1312              : 
    1313              :     // force infalliblity through closure
    1314              :     #[allow(clippy::redundant_closure_call)]
    1315            0 :     (|| {
    1316            0 :         js.spawn(async move {
    1317            0 :             let retry_timeout = retry_timeout;
    1318            0 :             broker::start_broker_process(env, &retry_timeout).await
    1319            0 :         });
    1320            0 : 
    1321            0 :         // Only start the storage controller if the pageserver is configured to need it
    1322            0 :         if env.control_plane_api.is_some() {
    1323            0 :             js.spawn(async move {
    1324            0 :                 let storage_controller = StorageController::from_env(env);
    1325            0 :                 storage_controller
    1326            0 :                     .start(NeonStorageControllerStartArgs::with_default_instance_id(
    1327            0 :                         retry_timeout.into(),
    1328            0 :                     ))
    1329            0 :                     .await
    1330            0 :                     .map_err(|e| e.context("start storage_controller"))
    1331            0 :             });
    1332            0 :         }
    1333              : 
    1334            0 :         for ps_conf in &env.pageservers {
    1335            0 :             js.spawn(async move {
    1336            0 :                 let pageserver = PageServerNode::from_env(env, ps_conf);
    1337            0 :                 pageserver
    1338            0 :                     .start(&retry_timeout)
    1339            0 :                     .await
    1340            0 :                     .map_err(|e| e.context(format!("start pageserver {}", ps_conf.id)))
    1341            0 :             });
    1342            0 :         }
    1343              : 
    1344            0 :         for node in env.safekeepers.iter() {
    1345            0 :             js.spawn(async move {
    1346            0 :                 let safekeeper = SafekeeperNode::from_env(env, node);
    1347            0 :                 safekeeper
    1348            0 :                     .start(vec![], &retry_timeout)
    1349            0 :                     .await
    1350            0 :                     .map_err(|e| e.context(format!("start safekeeper {}", safekeeper.id)))
    1351            0 :             });
    1352            0 :         }
    1353            0 :     })();
    1354            0 : 
    1355            0 :     let mut errors = Vec::new();
    1356            0 :     while let Some(result) = js.join_next().await {
    1357            0 :         let result = result.expect("we don't panic or cancel the tasks");
    1358            0 :         if let Err(e) = result {
    1359            0 :             errors.push(e);
    1360            0 :         }
    1361              :     }
    1362              : 
    1363            0 :     if !errors.is_empty() {
    1364            0 :         return Err(errors);
    1365            0 :     }
    1366            0 : 
    1367            0 :     Ok(())
    1368            0 : }
    1369              : 
    1370            0 : async fn neon_start_status_check(
    1371            0 :     env: &local_env::LocalEnv,
    1372            0 :     retry_timeout: &Duration,
    1373            0 : ) -> anyhow::Result<()> {
    1374              :     const RETRY_INTERVAL: Duration = Duration::from_millis(100);
    1375              :     const NOTICE_AFTER_RETRIES: Duration = Duration::from_secs(5);
    1376              : 
    1377            0 :     if env.control_plane_api.is_none() {
    1378            0 :         return Ok(());
    1379            0 :     }
    1380            0 : 
    1381            0 :     let storcon = StorageController::from_env(env);
    1382            0 : 
    1383            0 :     let retries = retry_timeout.as_millis() / RETRY_INTERVAL.as_millis();
    1384            0 :     let notice_after_retries = retry_timeout.as_millis() / NOTICE_AFTER_RETRIES.as_millis();
    1385            0 : 
    1386            0 :     println!("\nRunning neon status check");
    1387              : 
    1388            0 :     for retry in 0..retries {
    1389            0 :         if retry == notice_after_retries {
    1390            0 :             println!("\nNeon status check has not passed yet, continuing to wait")
    1391            0 :         }
    1392              : 
    1393            0 :         let mut passed = true;
    1394            0 :         let mut nodes = storcon.node_list().await?;
    1395            0 :         let mut pageservers = env.pageservers.clone();
    1396            0 : 
    1397            0 :         if nodes.len() != pageservers.len() {
    1398            0 :             continue;
    1399            0 :         }
    1400            0 : 
    1401            0 :         nodes.sort_by_key(|ps| ps.id);
    1402            0 :         pageservers.sort_by_key(|ps| ps.id);
    1403              : 
    1404            0 :         for (idx, pageserver) in pageservers.iter().enumerate() {
    1405            0 :             let node = &nodes[idx];
    1406            0 :             if node.id != pageserver.id {
    1407            0 :                 passed = false;
    1408            0 :                 break;
    1409            0 :             }
    1410              : 
    1411            0 :             if !matches!(node.availability, NodeAvailabilityWrapper::Active) {
    1412            0 :                 passed = false;
    1413            0 :                 break;
    1414            0 :             }
    1415              :         }
    1416              : 
    1417            0 :         if passed {
    1418            0 :             println!("\nNeon started and passed status check");
    1419            0 :             return Ok(());
    1420            0 :         }
    1421            0 : 
    1422            0 :         tokio::time::sleep(RETRY_INTERVAL).await;
    1423              :     }
    1424              : 
    1425            0 :     anyhow::bail!("\nNeon passed status check")
    1426            0 : }
    1427              : 
    1428            0 : async fn handle_stop_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
    1429            0 :     let immediate =
    1430            0 :         sub_match.get_one::<String>("stop-mode").map(|s| s.as_str()) == Some("immediate");
    1431            0 : 
    1432            0 :     try_stop_all(env, immediate).await;
    1433              : 
    1434            0 :     Ok(())
    1435            0 : }
    1436              : 
    1437            0 : async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
    1438            0 :     // Stop all endpoints
    1439            0 :     match ComputeControlPlane::load(env.clone()) {
    1440            0 :         Ok(cplane) => {
    1441            0 :             for (_k, node) in cplane.endpoints {
    1442            0 :                 if let Err(e) = node.stop(if immediate { "immediate" } else { "fast" }, false) {
    1443            0 :                     eprintln!("postgres stop failed: {e:#}");
    1444            0 :                 }
    1445              :             }
    1446              :         }
    1447            0 :         Err(e) => {
    1448            0 :             eprintln!("postgres stop failed, could not restore control plane data from env: {e:#}")
    1449              :         }
    1450              :     }
    1451              : 
    1452            0 :     for ps_conf in &env.pageservers {
    1453            0 :         let pageserver = PageServerNode::from_env(env, ps_conf);
    1454            0 :         if let Err(e) = pageserver.stop(immediate) {
    1455            0 :             eprintln!("pageserver {} stop failed: {:#}", ps_conf.id, e);
    1456            0 :         }
    1457              :     }
    1458              : 
    1459            0 :     for node in env.safekeepers.iter() {
    1460            0 :         let safekeeper = SafekeeperNode::from_env(env, node);
    1461            0 :         if let Err(e) = safekeeper.stop(immediate) {
    1462            0 :             eprintln!("safekeeper {} stop failed: {:#}", safekeeper.id, e);
    1463            0 :         }
    1464              :     }
    1465              : 
    1466            0 :     if let Err(e) = broker::stop_broker_process(env) {
    1467            0 :         eprintln!("neon broker stop failed: {e:#}");
    1468            0 :     }
    1469              : 
    1470              :     // Stop all storage controller instances. In the most common case there's only one,
    1471              :     // but iterate though the base data directory in order to discover the instances.
    1472            0 :     let storcon_instances = env
    1473            0 :         .storage_controller_instances()
    1474            0 :         .await
    1475            0 :         .expect("Must inspect data dir");
    1476            0 :     for (instance_id, _instance_dir_path) in storcon_instances {
    1477            0 :         let storage_controller = StorageController::from_env(env);
    1478            0 :         let stop_args = NeonStorageControllerStopArgs {
    1479            0 :             instance_id,
    1480            0 :             immediate,
    1481            0 :         };
    1482              : 
    1483            0 :         if let Err(e) = storage_controller.stop(stop_args).await {
    1484            0 :             eprintln!("Storage controller instance {instance_id} stop failed: {e:#}");
    1485            0 :         }
    1486              :     }
    1487            0 : }
    1488              : 
    1489            1 : fn cli() -> Command {
    1490            1 :     let timeout_arg = Arg::new("start-timeout")
    1491            1 :         .long("start-timeout")
    1492            1 :         .short('t')
    1493            1 :         .global(true)
    1494            1 :         .help("timeout until we fail the command, e.g. 30s")
    1495            1 :         .value_parser(value_parser!(humantime::Duration))
    1496            1 :         .default_value("10s")
    1497            1 :         .required(false);
    1498            1 : 
    1499            1 :     let branch_name_arg = Arg::new("branch-name")
    1500            1 :         .long("branch-name")
    1501            1 :         .help("Name of the branch to be created or used as an alias for other services")
    1502            1 :         .required(false);
    1503            1 : 
    1504            1 :     let endpoint_id_arg = Arg::new("endpoint_id")
    1505            1 :         .help("Postgres endpoint id")
    1506            1 :         .required(false);
    1507            1 : 
    1508            1 :     let safekeeper_id_arg = Arg::new("id").help("safekeeper id").required(false);
    1509            1 : 
    1510            1 :     // --id, when using a pageserver command
    1511            1 :     let pageserver_id_arg = Arg::new("pageserver-id")
    1512            1 :         .long("id")
    1513            1 :         .global(true)
    1514            1 :         .help("pageserver id")
    1515            1 :         .required(false);
    1516            1 :     // --pageserver-id when using a non-pageserver command
    1517            1 :     let endpoint_pageserver_id_arg = Arg::new("endpoint-pageserver-id")
    1518            1 :         .long("pageserver-id")
    1519            1 :         .required(false);
    1520            1 : 
    1521            1 :     let safekeeper_extra_opt_arg = Arg::new("safekeeper-extra-opt")
    1522            1 :         .short('e')
    1523            1 :         .long("safekeeper-extra-opt")
    1524            1 :         .num_args(1)
    1525            1 :         .action(ArgAction::Append)
    1526            1 :         .help("Additional safekeeper invocation options, e.g. -e=--http-auth-public-key-path=foo")
    1527            1 :         .required(false);
    1528            1 : 
    1529            1 :     let tenant_id_arg = Arg::new("tenant-id")
    1530            1 :         .long("tenant-id")
    1531            1 :         .help("Tenant id. Represented as a hexadecimal string 32 symbols length")
    1532            1 :         .required(false);
    1533            1 : 
    1534            1 :     let timeline_id_arg = Arg::new("timeline-id")
    1535            1 :         .long("timeline-id")
    1536            1 :         .help("Timeline id. Represented as a hexadecimal string 32 symbols length")
    1537            1 :         .required(false);
    1538            1 : 
    1539            1 :     let pg_version_arg = Arg::new("pg-version")
    1540            1 :         .long("pg-version")
    1541            1 :         .help("Postgres version to use for the initial tenant")
    1542            1 :         .required(false)
    1543            1 :         .value_parser(value_parser!(u32))
    1544            1 :         .default_value(DEFAULT_PG_VERSION);
    1545            1 : 
    1546            1 :     let pg_port_arg = Arg::new("pg-port")
    1547            1 :         .long("pg-port")
    1548            1 :         .required(false)
    1549            1 :         .value_parser(value_parser!(u16))
    1550            1 :         .value_name("pg-port");
    1551            1 : 
    1552            1 :     let http_port_arg = Arg::new("http-port")
    1553            1 :         .long("http-port")
    1554            1 :         .required(false)
    1555            1 :         .value_parser(value_parser!(u16))
    1556            1 :         .value_name("http-port");
    1557            1 : 
    1558            1 :     let safekeepers_arg = Arg::new("safekeepers")
    1559            1 :         .long("safekeepers")
    1560            1 :         .required(false)
    1561            1 :         .value_name("safekeepers");
    1562            1 : 
    1563            1 :     let stop_mode_arg = Arg::new("stop-mode")
    1564            1 :         .short('m')
    1565            1 :         .value_parser(["fast", "immediate"])
    1566            1 :         .default_value("fast")
    1567            1 :         .help("If 'immediate', don't flush repository data at shutdown")
    1568            1 :         .required(false)
    1569            1 :         .value_name("stop-mode");
    1570            1 : 
    1571            1 :     let remote_ext_config_args = Arg::new("remote-ext-config")
    1572            1 :         .long("remote-ext-config")
    1573            1 :         .num_args(1)
    1574            1 :         .help("Configure the remote extensions storage proxy gateway to request for extensions.")
    1575            1 :         .required(false);
    1576            1 : 
    1577            1 :     let lsn_arg = Arg::new("lsn")
    1578            1 :         .long("lsn")
    1579            1 :         .help("Specify Lsn on the timeline to start from. By default, end of the timeline would be used.")
    1580            1 :         .required(false);
    1581            1 : 
    1582            1 :     let hot_standby_arg = Arg::new("hot-standby")
    1583            1 :         .value_parser(value_parser!(bool))
    1584            1 :         .long("hot-standby")
    1585            1 :         .help("If set, the node will be a hot replica on the specified timeline")
    1586            1 :         .required(false);
    1587            1 : 
    1588            1 :     let force_arg = Arg::new("force")
    1589            1 :         .value_parser(value_parser!(InitForceMode))
    1590            1 :         .long("force")
    1591            1 :         .default_value(
    1592            1 :             InitForceMode::MustNotExist
    1593            1 :                 .to_possible_value()
    1594            1 :                 .unwrap()
    1595            1 :                 .get_name()
    1596            1 :                 .to_owned(),
    1597            1 :         )
    1598            1 :         .help("Force initialization even if the repository is not empty")
    1599            1 :         .required(false);
    1600            1 : 
    1601            1 :     let num_pageservers_arg = Arg::new("num-pageservers")
    1602            1 :         .value_parser(value_parser!(u16))
    1603            1 :         .long("num-pageservers")
    1604            1 :         .help("How many pageservers to create (default 1)");
    1605            1 : 
    1606            1 :     let update_catalog = Arg::new("update-catalog")
    1607            1 :         .value_parser(value_parser!(bool))
    1608            1 :         .long("update-catalog")
    1609            1 :         .help("If set, will set up the catalog for neon_superuser")
    1610            1 :         .required(false);
    1611            1 : 
    1612            1 :     let create_test_user = Arg::new("create-test-user")
    1613            1 :         .value_parser(value_parser!(bool))
    1614            1 :         .long("create-test-user")
    1615            1 :         .help("If set, will create test user `user` and `neondb` database. Requires `update-catalog = true`")
    1616            1 :         .required(false);
    1617            1 : 
    1618            1 :     let allow_multiple = Arg::new("allow-multiple")
    1619            1 :         .help("Allow multiple primary endpoints running on the same branch. Shouldn't be used normally, but useful for tests.")
    1620            1 :         .long("allow-multiple")
    1621            1 :         .action(ArgAction::SetTrue)
    1622            1 :         .required(false);
    1623            1 : 
    1624            1 :     let instance_id = Arg::new("instance-id")
    1625            1 :         .long("instance-id")
    1626            1 :         .help("Identifier used to distinguish storage controller instances (default 1)")
    1627            1 :         .value_parser(value_parser!(u8))
    1628            1 :         .required(false);
    1629            1 : 
    1630            1 :     let base_port = Arg::new("base-port")
    1631            1 :         .long("base-port")
    1632            1 :         .help("Base port for the storage controller instance idenfified by instance-id (defaults to pagserver cplane api)")
    1633            1 :         .value_parser(value_parser!(u16))
    1634            1 :         .required(false);
    1635            1 : 
    1636            1 :     Command::new("Neon CLI")
    1637            1 :         .arg_required_else_help(true)
    1638            1 :         .version(GIT_VERSION)
    1639            1 :         .subcommand(
    1640            1 :             Command::new("init")
    1641            1 :                 .about("Initialize a new Neon repository, preparing configs for services to start with")
    1642            1 :                 .arg(num_pageservers_arg.clone())
    1643            1 :                 .arg(
    1644            1 :                     Arg::new("config")
    1645            1 :                         .long("config")
    1646            1 :                         .required(false)
    1647            1 :                         .value_parser(value_parser!(PathBuf))
    1648            1 :                         .value_name("config")
    1649            1 :                 )
    1650            1 :                 .arg(force_arg)
    1651            1 :         )
    1652            1 :         .subcommand(
    1653            1 :             Command::new("timeline")
    1654            1 :             .about("Manage timelines")
    1655            1 :             .arg_required_else_help(true)
    1656            1 :             .subcommand(Command::new("list")
    1657            1 :                 .about("List all timelines, available to this pageserver")
    1658            1 :                 .arg(tenant_id_arg.clone()))
    1659            1 :             .subcommand(Command::new("branch")
    1660            1 :                 .about("Create a new timeline, using another timeline as a base, copying its data")
    1661            1 :                 .arg(tenant_id_arg.clone())
    1662            1 :                 .arg(timeline_id_arg.clone())
    1663            1 :                 .arg(branch_name_arg.clone())
    1664            1 :                 .arg(Arg::new("ancestor-branch-name").long("ancestor-branch-name")
    1665            1 :                     .help("Use last Lsn of another timeline (and its data) as base when creating the new timeline. The timeline gets resolved by its branch name.").required(false))
    1666            1 :                 .arg(Arg::new("ancestor-start-lsn").long("ancestor-start-lsn")
    1667            1 :                     .help("When using another timeline as base, use a specific Lsn in it instead of the latest one").required(false)))
    1668            1 :             .subcommand(Command::new("create")
    1669            1 :                 .about("Create a new blank timeline")
    1670            1 :                 .arg(tenant_id_arg.clone())
    1671            1 :                 .arg(timeline_id_arg.clone())
    1672            1 :                 .arg(branch_name_arg.clone())
    1673            1 :                 .arg(pg_version_arg.clone())
    1674            1 :             )
    1675            1 :             .subcommand(Command::new("import")
    1676            1 :                 .about("Import timeline from basebackup directory")
    1677            1 :                 .arg(tenant_id_arg.clone())
    1678            1 :                 .arg(timeline_id_arg.clone())
    1679            1 :                 .arg(branch_name_arg.clone())
    1680            1 :                 .arg(Arg::new("base-tarfile")
    1681            1 :                     .long("base-tarfile")
    1682            1 :                     .value_parser(value_parser!(PathBuf))
    1683            1 :                     .help("Basebackup tarfile to import")
    1684            1 :                 )
    1685            1 :                 .arg(Arg::new("base-lsn").long("base-lsn")
    1686            1 :                     .help("Lsn the basebackup starts at"))
    1687            1 :                 .arg(Arg::new("wal-tarfile")
    1688            1 :                     .long("wal-tarfile")
    1689            1 :                     .value_parser(value_parser!(PathBuf))
    1690            1 :                     .help("Wal to add after base")
    1691            1 :                 )
    1692            1 :                 .arg(Arg::new("end-lsn").long("end-lsn")
    1693            1 :                     .help("Lsn the basebackup ends at"))
    1694            1 :                 .arg(pg_version_arg.clone())
    1695            1 :             )
    1696            1 :         ).subcommand(
    1697            1 :             Command::new("tenant")
    1698            1 :             .arg_required_else_help(true)
    1699            1 :             .about("Manage tenants")
    1700            1 :             .subcommand(Command::new("list"))
    1701            1 :             .subcommand(Command::new("create")
    1702            1 :                 .arg(tenant_id_arg.clone())
    1703            1 :                 .arg(timeline_id_arg.clone().help("Use a specific timeline id when creating a tenant and its initial timeline"))
    1704            1 :                 .arg(Arg::new("config").short('c').num_args(1).action(ArgAction::Append).required(false))
    1705            1 :                 .arg(pg_version_arg.clone())
    1706            1 :                 .arg(Arg::new("set-default").long("set-default").action(ArgAction::SetTrue).required(false)
    1707            1 :                     .help("Use this tenant in future CLI commands where tenant_id is needed, but not specified"))
    1708            1 :                 .arg(Arg::new("shard-count").value_parser(value_parser!(u8)).long("shard-count").action(ArgAction::Set).help("Number of shards in the new tenant (default 1)"))
    1709            1 :                 .arg(Arg::new("shard-stripe-size").value_parser(value_parser!(u32)).long("shard-stripe-size").action(ArgAction::Set).help("Sharding stripe size in pages"))
    1710            1 :                 .arg(Arg::new("placement-policy").value_parser(value_parser!(String)).long("placement-policy").action(ArgAction::Set).help("Placement policy shards in this tenant"))
    1711            1 :                 )
    1712            1 :             .subcommand(Command::new("set-default").arg(tenant_id_arg.clone().required(true))
    1713            1 :                 .about("Set a particular tenant as default in future CLI commands where tenant_id is needed, but not specified"))
    1714            1 :             .subcommand(Command::new("config")
    1715            1 :                 .arg(tenant_id_arg.clone())
    1716            1 :                 .arg(Arg::new("config").short('c').num_args(1).action(ArgAction::Append).required(false)))
    1717            1 :             .subcommand(Command::new("import").arg(tenant_id_arg.clone().required(true))
    1718            1 :                 .about("Import a tenant that is present in remote storage, and create branches for its timelines"))
    1719            1 :         )
    1720            1 :         .subcommand(
    1721            1 :             Command::new("pageserver")
    1722            1 :                 .arg_required_else_help(true)
    1723            1 :                 .about("Manage pageserver")
    1724            1 :                 .arg(pageserver_id_arg)
    1725            1 :                 .subcommand(Command::new("status"))
    1726            1 :                 .subcommand(Command::new("start")
    1727            1 :                     .about("Start local pageserver")
    1728            1 :                     .arg(timeout_arg.clone())
    1729            1 :                 )
    1730            1 :                 .subcommand(Command::new("stop")
    1731            1 :                     .about("Stop local pageserver")
    1732            1 :                     .arg(stop_mode_arg.clone())
    1733            1 :                 )
    1734            1 :                 .subcommand(Command::new("restart")
    1735            1 :                     .about("Restart local pageserver")
    1736            1 :                     .arg(timeout_arg.clone())
    1737            1 :                 )
    1738            1 :         )
    1739            1 :         .subcommand(
    1740            1 :             Command::new("storage_controller")
    1741            1 :                 .arg_required_else_help(true)
    1742            1 :                 .about("Manage storage_controller")
    1743            1 :                 .subcommand(Command::new("start").about("Start storage controller")
    1744            1 :                             .arg(timeout_arg.clone())
    1745            1 :                             .arg(instance_id.clone())
    1746            1 :                             .arg(base_port))
    1747            1 :                 .subcommand(Command::new("stop").about("Stop storage controller")
    1748            1 :                             .arg(stop_mode_arg.clone())
    1749            1 :                             .arg(instance_id))
    1750            1 :         )
    1751            1 :         .subcommand(
    1752            1 :             Command::new("storage_broker")
    1753            1 :                 .arg_required_else_help(true)
    1754            1 :                 .about("Manage broker")
    1755            1 :                 .subcommand(Command::new("start")
    1756            1 :                             .about("Start broker")
    1757            1 :                             .arg(timeout_arg.clone())
    1758            1 :                 )
    1759            1 :                 .subcommand(Command::new("stop")
    1760            1 :                             .about("Stop broker")
    1761            1 :                             .arg(stop_mode_arg.clone())
    1762            1 :                 )
    1763            1 :         )
    1764            1 :         .subcommand(
    1765            1 :             Command::new("safekeeper")
    1766            1 :                 .arg_required_else_help(true)
    1767            1 :                 .about("Manage safekeepers")
    1768            1 :                 .subcommand(Command::new("start")
    1769            1 :                             .about("Start local safekeeper")
    1770            1 :                             .arg(safekeeper_id_arg.clone())
    1771            1 :                             .arg(safekeeper_extra_opt_arg.clone())
    1772            1 :                             .arg(timeout_arg.clone())
    1773            1 :                 )
    1774            1 :                 .subcommand(Command::new("stop")
    1775            1 :                             .about("Stop local safekeeper")
    1776            1 :                             .arg(safekeeper_id_arg.clone())
    1777            1 :                             .arg(stop_mode_arg.clone())
    1778            1 :                 )
    1779            1 :                 .subcommand(Command::new("restart")
    1780            1 :                             .about("Restart local safekeeper")
    1781            1 :                             .arg(safekeeper_id_arg)
    1782            1 :                             .arg(stop_mode_arg.clone())
    1783            1 :                             .arg(safekeeper_extra_opt_arg)
    1784            1 :                             .arg(timeout_arg.clone())
    1785            1 :                 )
    1786            1 :         )
    1787            1 :         .subcommand(
    1788            1 :             Command::new("endpoint")
    1789            1 :                 .arg_required_else_help(true)
    1790            1 :                 .about("Manage postgres instances")
    1791            1 :                 .subcommand(Command::new("list").arg(tenant_id_arg.clone()))
    1792            1 :                 .subcommand(Command::new("create")
    1793            1 :                     .about("Create a compute endpoint")
    1794            1 :                     .arg(endpoint_id_arg.clone())
    1795            1 :                     .arg(branch_name_arg.clone())
    1796            1 :                     .arg(tenant_id_arg.clone())
    1797            1 :                     .arg(lsn_arg.clone())
    1798            1 :                     .arg(pg_port_arg.clone())
    1799            1 :                     .arg(http_port_arg.clone())
    1800            1 :                     .arg(endpoint_pageserver_id_arg.clone())
    1801            1 :                     .arg(
    1802            1 :                         Arg::new("config-only")
    1803            1 :                             .help("Don't do basebackup, create endpoint directory with only config files")
    1804            1 :                             .long("config-only")
    1805            1 :                             .required(false))
    1806            1 :                     .arg(pg_version_arg.clone())
    1807            1 :                     .arg(hot_standby_arg.clone())
    1808            1 :                     .arg(update_catalog)
    1809            1 :                     .arg(allow_multiple.clone())
    1810            1 :                 )
    1811            1 :                 .subcommand(Command::new("start")
    1812            1 :                     .about("Start postgres.\n If the endpoint doesn't exist yet, it is created.")
    1813            1 :                     .arg(endpoint_id_arg.clone())
    1814            1 :                     .arg(endpoint_pageserver_id_arg.clone())
    1815            1 :                     .arg(safekeepers_arg.clone())
    1816            1 :                     .arg(remote_ext_config_args)
    1817            1 :                     .arg(create_test_user)
    1818            1 :                     .arg(allow_multiple.clone())
    1819            1 :                     .arg(timeout_arg.clone())
    1820            1 :                 )
    1821            1 :                 .subcommand(Command::new("reconfigure")
    1822            1 :                             .about("Reconfigure the endpoint")
    1823            1 :                             .arg(endpoint_pageserver_id_arg)
    1824            1 :                             .arg(safekeepers_arg)
    1825            1 :                             .arg(endpoint_id_arg.clone())
    1826            1 :                             .arg(tenant_id_arg.clone())
    1827            1 :                 )
    1828            1 :                 .subcommand(
    1829            1 :                     Command::new("stop")
    1830            1 :                     .arg(endpoint_id_arg)
    1831            1 :                     .arg(
    1832            1 :                         Arg::new("destroy")
    1833            1 :                             .help("Also delete data directory (now optional, should be default in future)")
    1834            1 :                             .long("destroy")
    1835            1 :                             .action(ArgAction::SetTrue)
    1836            1 :                             .required(false)
    1837            1 :                     )
    1838            1 :                     .arg(
    1839            1 :                         Arg::new("mode")
    1840            1 :                             .help("Postgres shutdown mode, passed to \"pg_ctl -m <mode>\"")
    1841            1 :                             .long("mode")
    1842            1 :                             .action(ArgAction::Set)
    1843            1 :                             .required(false)
    1844            1 :                             .value_parser(["smart", "fast", "immediate"])
    1845            1 :                             .default_value("fast")
    1846            1 :                     )
    1847            1 :                 )
    1848            1 : 
    1849            1 :         )
    1850            1 :         .subcommand(
    1851            1 :             Command::new("mappings")
    1852            1 :                 .arg_required_else_help(true)
    1853            1 :                 .about("Manage neon_local branch name mappings")
    1854            1 :                 .subcommand(
    1855            1 :                     Command::new("map")
    1856            1 :                         .about("Create new mapping which cannot exist already")
    1857            1 :                         .arg(branch_name_arg.clone())
    1858            1 :                         .arg(tenant_id_arg.clone())
    1859            1 :                         .arg(timeline_id_arg.clone())
    1860            1 :                 )
    1861            1 :         )
    1862            1 :         // Obsolete old name for 'endpoint'. We now just print an error if it's used.
    1863            1 :         .subcommand(
    1864            1 :             Command::new("pg")
    1865            1 :                 .hide(true)
    1866            1 :                 .arg(Arg::new("ignore-rest").allow_hyphen_values(true).num_args(0..).required(false))
    1867            1 :                 .trailing_var_arg(true)
    1868            1 :         )
    1869            1 :         .subcommand(
    1870            1 :             Command::new("start")
    1871            1 :                 .about("Start page server and safekeepers")
    1872            1 :                 .arg(timeout_arg.clone())
    1873            1 :         )
    1874            1 :         .subcommand(
    1875            1 :             Command::new("stop")
    1876            1 :                 .about("Stop page server and safekeepers")
    1877            1 :                 .arg(stop_mode_arg)
    1878            1 :         )
    1879            1 : }
    1880              : 
    1881              : #[test]
    1882            1 : fn verify_cli() {
    1883            1 :     cli().debug_assert();
    1884            1 : }
        

Generated by: LCOV version 2.1-beta