LCOV - differential code coverage report
Current view: top level - control_plane/src/bin - neon_local.rs (source / functions) Coverage Total Hit LBC UBC GBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 82.8 % 1110 919 1 190 2 917
Current Date: 2023-10-19 02:04:12 Functions: 65.3 % 75 49 26 49
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : //!
       2                 : //! `neon_local` is an executable that can be used to create a local
       3                 : //! Neon environment, for testing purposes. The local environment is
       4                 : //! quite different from the cloud environment with Kubernetes, but it
       5                 : //! easier to work with locally. The python tests in `test_runner`
       6                 : //! rely on `neon_local` to set up the environment for each test.
       7                 : //!
       8                 : use anyhow::{anyhow, bail, Context, Result};
       9                 : use clap::{value_parser, Arg, ArgAction, ArgMatches, Command};
      10                 : use compute_api::spec::ComputeMode;
      11                 : use control_plane::attachment_service::AttachmentService;
      12                 : use control_plane::endpoint::ComputeControlPlane;
      13                 : use control_plane::local_env::LocalEnv;
      14                 : use control_plane::pageserver::PageServerNode;
      15                 : use control_plane::safekeeper::SafekeeperNode;
      16                 : use control_plane::{broker, local_env};
      17                 : use pageserver_api::models::TimelineInfo;
      18                 : use pageserver_api::{
      19                 :     DEFAULT_HTTP_LISTEN_ADDR as DEFAULT_PAGESERVER_HTTP_ADDR,
      20                 :     DEFAULT_PG_LISTEN_ADDR as DEFAULT_PAGESERVER_PG_ADDR,
      21                 : };
      22                 : use postgres_backend::AuthType;
      23                 : use safekeeper_api::{
      24                 :     DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT,
      25                 :     DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT,
      26                 : };
      27                 : use std::collections::{BTreeSet, HashMap};
      28                 : use std::path::PathBuf;
      29                 : use std::process::exit;
      30                 : use std::str::FromStr;
      31                 : use storage_broker::DEFAULT_LISTEN_ADDR as DEFAULT_BROKER_ADDR;
      32                 : use utils::{
      33                 :     auth::{Claims, Scope},
      34                 :     id::{NodeId, TenantId, TenantTimelineId, TimelineId},
      35                 :     lsn::Lsn,
      36                 :     project_git_version,
      37                 : };
      38                 : 
      39                 : // Default id of a safekeeper node, if not specified on the command line.
      40                 : const DEFAULT_SAFEKEEPER_ID: NodeId = NodeId(1);
      41                 : const DEFAULT_PAGESERVER_ID: NodeId = NodeId(1);
      42                 : const DEFAULT_BRANCH_NAME: &str = "main";
      43                 : project_git_version!(GIT_VERSION);
      44                 : 
      45                 : const DEFAULT_PG_VERSION: &str = "15";
      46                 : 
      47                 : const DEFAULT_PAGESERVER_CONTROL_PLANE_API: &str = "http://127.0.0.1:1234/";
      48                 : 
      49 UBC           0 : fn default_conf() -> String {
      50               0 :     format!(
      51               0 :         r#"
      52               0 : # Default built-in configuration, defined in main.rs
      53               0 : control_plane_api = '{DEFAULT_PAGESERVER_CONTROL_PLANE_API}'
      54               0 : 
      55               0 : [broker]
      56               0 : listen_addr = '{DEFAULT_BROKER_ADDR}'
      57               0 : 
      58               0 : [[pageservers]]
      59               0 : id = {DEFAULT_PAGESERVER_ID}
      60               0 : listen_pg_addr = '{DEFAULT_PAGESERVER_PG_ADDR}'
      61               0 : listen_http_addr = '{DEFAULT_PAGESERVER_HTTP_ADDR}'
      62               0 : pg_auth_type = '{trust_auth}'
      63               0 : http_auth_type = '{trust_auth}'
      64               0 : 
      65               0 : [[safekeepers]]
      66               0 : id = {DEFAULT_SAFEKEEPER_ID}
      67               0 : pg_port = {DEFAULT_SAFEKEEPER_PG_PORT}
      68               0 : http_port = {DEFAULT_SAFEKEEPER_HTTP_PORT}
      69               0 : 
      70               0 : "#,
      71               0 :         trust_auth = AuthType::Trust,
      72               0 :     )
      73               0 : }
      74                 : 
      75                 : ///
      76                 : /// Timelines tree element used as a value in the HashMap.
      77                 : ///
      78                 : struct TimelineTreeEl {
      79                 :     /// `TimelineInfo` received from the `pageserver` via the `timeline_list` http API call.
      80                 :     pub info: TimelineInfo,
      81                 :     /// Name, recovered from neon config mappings
      82                 :     pub name: Option<String>,
      83                 :     /// Holds all direct children of this timeline referenced using `timeline_id`.
      84                 :     pub children: BTreeSet<TimelineId>,
      85                 : }
      86                 : 
      87                 : // Main entry point for the 'neon_local' CLI utility
      88                 : //
      89                 : // This utility helps to manage neon installation. That includes following:
      90                 : //   * Management of local postgres installations running on top of the
      91                 : //     pageserver.
      92                 : //   * Providing CLI api to the pageserver
      93                 : //   * TODO: export/import to/from usual postgres
      94 CBC        5187 : fn main() -> Result<()> {
      95            5187 :     let matches = cli().get_matches();
      96                 : 
      97            5187 :     let (sub_name, sub_args) = match matches.subcommand() {
      98            5187 :         Some(subcommand_data) => subcommand_data,
      99 UBC           0 :         None => bail!("no subcommand provided"),
     100                 :     };
     101                 : 
     102                 :     // Check for 'neon init' command first.
     103 CBC        5187 :     let subcommand_result = if sub_name == "init" {
     104             348 :         handle_init(sub_args).map(Some)
     105                 :     } else {
     106                 :         // all other commands need an existing config
     107            4839 :         let mut env = LocalEnv::load_config().context("Error loading config")?;
     108            4839 :         let original_env = env.clone();
     109                 : 
     110            4839 :         let subcommand_result = match sub_name {
     111            4839 :             "tenant" => handle_tenant(sub_args, &mut env),
     112            4382 :             "timeline" => handle_timeline(sub_args, &mut env),
     113            3992 :             "start" => handle_start_all(sub_args, &env),
     114            3987 :             "stop" => handle_stop_all(sub_args, &env),
     115            3982 :             "pageserver" => handle_pageserver(sub_args, &env),
     116            2870 :             "attachment_service" => handle_attachment_service(sub_args, &env),
     117            2844 :             "safekeeper" => handle_safekeeper(sub_args, &env),
     118            1833 :             "endpoint" => handle_endpoint(sub_args, &env),
     119               1 :             "mappings" => handle_mappings(sub_args, &mut env),
     120 UBC           0 :             "pg" => bail!("'pg' subcommand has been renamed to 'endpoint'"),
     121               0 :             _ => bail!("unexpected subcommand {sub_name}"),
     122                 :         };
     123                 : 
     124 CBC        4839 :         if original_env != env {
     125             802 :             subcommand_result.map(|()| Some(env))
     126                 :         } else {
     127            4037 :             subcommand_result.map(|()| None)
     128                 :         }
     129                 :     };
     130                 : 
     131            5148 :     match subcommand_result {
     132            1150 :         Ok(Some(updated_env)) => updated_env.persist_config(&updated_env.base_data_dir)?,
     133            3998 :         Ok(None) => (),
     134              39 :         Err(e) => {
     135              39 :             eprintln!("command failed: {e:?}");
     136              39 :             exit(1);
     137                 :         }
     138                 :     }
     139            5148 :     Ok(())
     140            5148 : }
     141                 : 
     142                 : ///
     143                 : /// Prints timelines list as a tree-like structure.
     144                 : ///
     145              15 : fn print_timelines_tree(
     146              15 :     timelines: Vec<TimelineInfo>,
     147              15 :     mut timeline_name_mappings: HashMap<TenantTimelineId, String>,
     148              15 : ) -> Result<()> {
     149              15 :     let mut timelines_hash = timelines
     150              15 :         .iter()
     151              30 :         .map(|t| {
     152              30 :             (
     153              30 :                 t.timeline_id,
     154              30 :                 TimelineTreeEl {
     155              30 :                     info: t.clone(),
     156              30 :                     children: BTreeSet::new(),
     157              30 :                     name: timeline_name_mappings
     158              30 :                         .remove(&TenantTimelineId::new(t.tenant_id, t.timeline_id)),
     159              30 :                 },
     160              30 :             )
     161              30 :         })
     162              15 :         .collect::<HashMap<_, _>>();
     163                 : 
     164                 :     // Memorize all direct children of each timeline.
     165              30 :     for timeline in timelines.iter() {
     166              30 :         if let Some(ancestor_timeline_id) = timeline.ancestor_timeline_id {
     167              15 :             timelines_hash
     168              15 :                 .get_mut(&ancestor_timeline_id)
     169              15 :                 .context("missing timeline info in the HashMap")?
     170                 :                 .children
     171              15 :                 .insert(timeline.timeline_id);
     172              15 :         }
     173                 :     }
     174                 : 
     175              30 :     for timeline in timelines_hash.values() {
     176                 :         // Start with root local timelines (no ancestors) first.
     177              30 :         if timeline.info.ancestor_timeline_id.is_none() {
     178              15 :             print_timeline(0, &Vec::from([true]), timeline, &timelines_hash)?;
     179              15 :         }
     180                 :     }
     181                 : 
     182              15 :     Ok(())
     183              15 : }
     184                 : 
     185                 : ///
     186                 : /// Recursively prints timeline info with all its children.
     187                 : ///
     188              30 : fn print_timeline(
     189              30 :     nesting_level: usize,
     190              30 :     is_last: &[bool],
     191              30 :     timeline: &TimelineTreeEl,
     192              30 :     timelines: &HashMap<TimelineId, TimelineTreeEl>,
     193              30 : ) -> Result<()> {
     194              30 :     if nesting_level > 0 {
     195              15 :         let ancestor_lsn = match timeline.info.ancestor_lsn {
     196              15 :             Some(lsn) => lsn.to_string(),
     197 UBC           0 :             None => "Unknown Lsn".to_string(),
     198                 :         };
     199                 : 
     200 CBC          15 :         let mut br_sym = "┣━";
     201              15 : 
     202              15 :         // Draw each nesting padding with proper style
     203              15 :         // depending on whether its timeline ended or not.
     204              15 :         if nesting_level > 1 {
     205               3 :             for l in &is_last[1..is_last.len() - 1] {
     206               3 :                 if *l {
     207 LBC         (3) :                     print!("   ");
     208 CBC           3 :                 } else {
     209 GBC           3 :                     print!("┃  ");
     210               3 :                 }
     211                 :             }
     212 CBC          12 :         }
     213                 : 
     214                 :         // We are the last in this sub-timeline
     215              15 :         if *is_last.last().unwrap() {
     216              10 :             br_sym = "┗━";
     217              10 :         }
     218                 : 
     219              15 :         print!("{} @{}: ", br_sym, ancestor_lsn);
     220              15 :     }
     221                 : 
     222                 :     // Finally print a timeline id and name with new line
     223              30 :     println!(
     224              30 :         "{} [{}]",
     225              30 :         timeline.name.as_deref().unwrap_or("_no_name_"),
     226              30 :         timeline.info.timeline_id
     227              30 :     );
     228              30 : 
     229              30 :     let len = timeline.children.len();
     230              30 :     let mut i: usize = 0;
     231              30 :     let mut is_last_new = Vec::from(is_last);
     232              30 :     is_last_new.push(false);
     233                 : 
     234              45 :     for child in &timeline.children {
     235              15 :         i += 1;
     236              15 : 
     237              15 :         // Mark that the last padding is the end of the timeline
     238              15 :         if i == len {
     239              10 :             if let Some(last) = is_last_new.last_mut() {
     240              10 :                 *last = true;
     241              10 :             }
     242               5 :         }
     243                 : 
     244                 :         print_timeline(
     245              15 :             nesting_level + 1,
     246              15 :             &is_last_new,
     247              15 :             timelines
     248              15 :                 .get(child)
     249              15 :                 .context("missing timeline info in the HashMap")?,
     250              15 :             timelines,
     251 UBC           0 :         )?;
     252                 :     }
     253                 : 
     254 CBC          30 :     Ok(())
     255              30 : }
     256                 : 
     257                 : /// Returns a map of timeline IDs to timeline_id@lsn strings.
     258                 : /// Connects to the pageserver to query this information.
     259 UBC           0 : fn get_timeline_infos(
     260               0 :     env: &local_env::LocalEnv,
     261               0 :     tenant_id: &TenantId,
     262               0 : ) -> Result<HashMap<TimelineId, TimelineInfo>> {
     263               0 :     Ok(get_default_pageserver(env)
     264               0 :         .timeline_list(tenant_id)?
     265               0 :         .into_iter()
     266               0 :         .map(|timeline_info| (timeline_info.timeline_id, timeline_info))
     267               0 :         .collect())
     268               0 : }
     269                 : 
     270                 : // Helper function to parse --tenant_id option, or get the default from config file
     271                 : fn get_tenant_id(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow::Result<TenantId> {
     272 CBC        2236 :     if let Some(tenant_id_from_arguments) = parse_tenant_id(sub_match).transpose() {
     273            2236 :         tenant_id_from_arguments
     274 UBC           0 :     } else if let Some(default_id) = env.default_tenant_id {
     275               0 :         Ok(default_id)
     276                 :     } else {
     277               0 :         anyhow::bail!("No tenant id. Use --tenant-id, or set a default tenant");
     278                 :     }
     279 CBC        2236 : }
     280                 : 
     281            2673 : fn parse_tenant_id(sub_match: &ArgMatches) -> anyhow::Result<Option<TenantId>> {
     282            2673 :     sub_match
     283            2673 :         .get_one::<String>("tenant-id")
     284            2673 :         .map(|tenant_id| TenantId::from_str(tenant_id))
     285            2673 :         .transpose()
     286            2673 :         .context("Failed to parse tenant id from the argument string")
     287            2673 : }
     288                 : 
     289             440 : fn parse_timeline_id(sub_match: &ArgMatches) -> anyhow::Result<Option<TimelineId>> {
     290             440 :     sub_match
     291             440 :         .get_one::<String>("timeline-id")
     292             440 :         .map(|timeline_id| TimelineId::from_str(timeline_id))
     293             440 :         .transpose()
     294             440 :         .context("Failed to parse timeline id from the argument string")
     295             440 : }
     296                 : 
     297             348 : fn handle_init(init_match: &ArgMatches) -> anyhow::Result<LocalEnv> {
     298                 :     // Create config file
     299             348 :     let toml_file: String = if let Some(config_path) = init_match.get_one::<PathBuf>("config") {
     300                 :         // load and parse the file
     301             348 :         std::fs::read_to_string(config_path).with_context(|| {
     302 UBC           0 :             format!(
     303               0 :                 "Could not read configuration file '{}'",
     304               0 :                 config_path.display()
     305               0 :             )
     306 CBC         348 :         })?
     307                 :     } else {
     308                 :         // Built-in default config
     309 UBC           0 :         default_conf()
     310                 :     };
     311                 : 
     312 CBC         348 :     let pg_version = init_match
     313             348 :         .get_one::<u32>("pg-version")
     314             348 :         .copied()
     315             348 :         .context("Failed to parse postgres version from the argument string")?;
     316                 : 
     317             348 :     let mut env =
     318             348 :         LocalEnv::parse_config(&toml_file).context("Failed to create neon configuration")?;
     319             348 :     let force = init_match.get_flag("force");
     320             348 :     env.init(pg_version, force)
     321             348 :         .context("Failed to initialize neon repository")?;
     322                 : 
     323                 :     // Initialize pageserver, create initial tenant and timeline.
     324             697 :     for ps_conf in &env.pageservers {
     325             349 :         PageServerNode::from_env(&env, ps_conf)
     326             349 :             .initialize(&pageserver_config_overrides(init_match))
     327             349 :             .unwrap_or_else(|e| {
     328 UBC           0 :                 eprintln!("pageserver init failed: {e:?}");
     329               0 :                 exit(1);
     330 CBC         349 :             });
     331             349 :     }
     332                 : 
     333             348 :     Ok(env)
     334             348 : }
     335                 : 
     336                 : /// The default pageserver is the one where CLI tenant/timeline operations are sent by default.
     337                 : /// For typical interactive use, one would just run with a single pageserver.  Scenarios with
     338                 : /// tenant/timeline placement across multiple pageservers are managed by python test code rather
     339                 : /// than this CLI.
     340             847 : fn get_default_pageserver(env: &local_env::LocalEnv) -> PageServerNode {
     341             847 :     let ps_conf = env
     342             847 :         .pageservers
     343             847 :         .first()
     344             847 :         .expect("Config is validated to contain at least one pageserver");
     345             847 :     PageServerNode::from_env(env, ps_conf)
     346             847 : }
     347                 : 
     348             909 : fn pageserver_config_overrides(init_match: &ArgMatches) -> Vec<&str> {
     349             909 :     init_match
     350             909 :         .get_many::<String>("pageserver-config-override")
     351             909 :         .into_iter()
     352             909 :         .flatten()
     353             909 :         .map(String::as_str)
     354             909 :         .collect()
     355             909 : }
     356                 : 
     357             457 : fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> anyhow::Result<()> {
     358             457 :     let pageserver = get_default_pageserver(env);
     359             457 :     match tenant_match.subcommand() {
     360             457 :         Some(("list", _)) => {
     361              11 :             for t in pageserver.tenant_list()? {
     362              11 :                 println!("{} {:?}", t.id, t.state);
     363              11 :             }
     364                 :         }
     365             451 :         Some(("create", create_match)) => {
     366             437 :             let tenant_conf: HashMap<_, _> = create_match
     367             437 :                 .get_many::<String>("config")
     368             685 :                 .map(|vals| vals.flat_map(|c| c.split_once(':')).collect())
     369             437 :                 .unwrap_or_default();
     370                 : 
     371                 :             // If tenant ID was not specified, generate one
     372             437 :             let tenant_id = parse_tenant_id(create_match)?.unwrap_or_else(TenantId::generate);
     373                 : 
     374             437 :             let generation = if env.control_plane_api.is_some() {
     375                 :                 // We must register the tenant with the attachment service, so
     376                 :                 // that when the pageserver restarts, it will be re-attached.
     377              12 :                 let attachment_service = AttachmentService::from_env(env);
     378              12 :                 attachment_service.attach_hook(tenant_id, pageserver.conf.id)?
     379                 :             } else {
     380             425 :                 None
     381                 :             };
     382                 : 
     383             437 :             pageserver.tenant_create(tenant_id, generation, tenant_conf)?;
     384             435 :             println!("tenant {tenant_id} successfully created on the pageserver");
     385                 : 
     386                 :             // Create an initial timeline for the new tenant
     387             435 :             let new_timeline_id = parse_timeline_id(create_match)?;
     388             435 :             let pg_version = create_match
     389             435 :                 .get_one::<u32>("pg-version")
     390             435 :                 .copied()
     391             435 :                 .context("Failed to parse postgres version from the argument string")?;
     392                 : 
     393             435 :             let timeline_info = pageserver.timeline_create(
     394             435 :                 tenant_id,
     395             435 :                 new_timeline_id,
     396             435 :                 None,
     397             435 :                 None,
     398             435 :                 Some(pg_version),
     399             435 :             )?;
     400             435 :             let new_timeline_id = timeline_info.timeline_id;
     401             435 :             let last_record_lsn = timeline_info.last_record_lsn;
     402             435 : 
     403             435 :             env.register_branch_mapping(
     404             435 :                 DEFAULT_BRANCH_NAME.to_string(),
     405             435 :                 tenant_id,
     406             435 :                 new_timeline_id,
     407             435 :             )?;
     408                 : 
     409             435 :             println!(
     410             435 :                 "Created an initial timeline '{new_timeline_id}' at Lsn {last_record_lsn} for tenant: {tenant_id}",
     411             435 :             );
     412             435 : 
     413             435 :             if create_match.get_flag("set-default") {
     414               1 :                 println!("Setting tenant {tenant_id} as a default one");
     415               1 :                 env.default_tenant_id = Some(tenant_id);
     416             434 :             }
     417                 :         }
     418              14 :         Some(("set-default", set_default_match)) => {
     419 UBC           0 :             let tenant_id =
     420               0 :                 parse_tenant_id(set_default_match)?.context("No tenant id specified")?;
     421               0 :             println!("Setting tenant {tenant_id} as a default one");
     422               0 :             env.default_tenant_id = Some(tenant_id);
     423                 :         }
     424 CBC          14 :         Some(("config", create_match)) => {
     425              14 :             let tenant_id = get_tenant_id(create_match, env)?;
     426              14 :             let tenant_conf: HashMap<_, _> = create_match
     427              14 :                 .get_many::<String>("config")
     428              47 :                 .map(|vals| vals.flat_map(|c| c.split_once(':')).collect())
     429              14 :                 .unwrap_or_default();
     430              14 : 
     431              14 :             pageserver
     432              14 :                 .tenant_config(tenant_id, tenant_conf)
     433              14 :                 .with_context(|| format!("Tenant config failed for tenant with id {tenant_id}"))?;
     434              14 :             println!("tenant {tenant_id} successfully configured on the pageserver");
     435                 :         }
     436 UBC           0 :         Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{}'", sub_name),
     437               0 :         None => bail!("no tenant subcommand provided"),
     438                 :     }
     439 CBC         455 :     Ok(())
     440             457 : }
     441                 : 
     442             390 : fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Result<()> {
     443             390 :     let pageserver = get_default_pageserver(env);
     444             390 : 
     445             390 :     match timeline_match.subcommand() {
     446             390 :         Some(("list", list_match)) => {
     447              15 :             let tenant_id = get_tenant_id(list_match, env)?;
     448              15 :             let timelines = pageserver.timeline_list(&tenant_id)?;
     449              15 :             print_timelines_tree(timelines, env.timeline_name_mappings())?;
     450                 :         }
     451             375 :         Some(("create", create_match)) => {
     452             127 :             let tenant_id = get_tenant_id(create_match, env)?;
     453             127 :             let new_branch_name = create_match
     454             127 :                 .get_one::<String>("branch-name")
     455             127 :                 .ok_or_else(|| anyhow!("No branch name provided"))?;
     456                 : 
     457             127 :             let pg_version = create_match
     458             127 :                 .get_one::<u32>("pg-version")
     459             127 :                 .copied()
     460             127 :                 .context("Failed to parse postgres version from the argument string")?;
     461                 : 
     462             125 :             let timeline_info =
     463             127 :                 pageserver.timeline_create(tenant_id, None, None, None, Some(pg_version))?;
     464             125 :             let new_timeline_id = timeline_info.timeline_id;
     465             125 : 
     466             125 :             let last_record_lsn = timeline_info.last_record_lsn;
     467             125 :             env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?;
     468                 : 
     469             125 :             println!(
     470             125 :                 "Created timeline '{}' at Lsn {last_record_lsn} for tenant: {tenant_id}",
     471             125 :                 timeline_info.timeline_id
     472             125 :             );
     473                 :         }
     474             248 :         Some(("import", import_match)) => {
     475               5 :             let tenant_id = get_tenant_id(import_match, env)?;
     476               5 :             let timeline_id = parse_timeline_id(import_match)?.expect("No timeline id provided");
     477               5 :             let name = import_match
     478               5 :                 .get_one::<String>("node-name")
     479               5 :                 .ok_or_else(|| anyhow!("No node name provided"))?;
     480                 : 
     481                 :             // Parse base inputs
     482               5 :             let base_tarfile = import_match
     483               5 :                 .get_one::<PathBuf>("base-tarfile")
     484               5 :                 .ok_or_else(|| anyhow!("No base-tarfile provided"))?
     485               5 :                 .to_owned();
     486               5 :             let base_lsn = Lsn::from_str(
     487               5 :                 import_match
     488               5 :                     .get_one::<String>("base-lsn")
     489               5 :                     .ok_or_else(|| anyhow!("No base-lsn provided"))?,
     490 UBC           0 :             )?;
     491 CBC           5 :             let base = (base_lsn, base_tarfile);
     492               5 : 
     493               5 :             // Parse pg_wal inputs
     494               5 :             let wal_tarfile = import_match.get_one::<PathBuf>("wal-tarfile").cloned();
     495               5 :             let end_lsn = import_match
     496               5 :                 .get_one::<String>("end-lsn")
     497               5 :                 .map(|s| Lsn::from_str(s).unwrap());
     498               5 :             // TODO validate both or none are provided
     499               5 :             let pg_wal = end_lsn.zip(wal_tarfile);
     500                 : 
     501               5 :             let pg_version = import_match
     502               5 :                 .get_one::<u32>("pg-version")
     503               5 :                 .copied()
     504               5 :                 .context("Failed to parse postgres version from the argument string")?;
     505                 : 
     506               5 :             let mut cplane = ComputeControlPlane::load(env.clone())?;
     507               5 :             println!("Importing timeline into pageserver ...");
     508               5 :             pageserver.timeline_import(tenant_id, timeline_id, base, pg_wal, pg_version)?;
     509               3 :             env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?;
     510                 : 
     511               3 :             println!("Creating endpoint for imported timeline ...");
     512               3 :             cplane.new_endpoint(
     513               3 :                 name,
     514               3 :                 tenant_id,
     515               3 :                 timeline_id,
     516               3 :                 None,
     517               3 :                 None,
     518               3 :                 pg_version,
     519               3 :                 ComputeMode::Primary,
     520               3 :                 DEFAULT_PAGESERVER_ID,
     521               3 :             )?;
     522               3 :             println!("Done");
     523                 :         }
     524             243 :         Some(("branch", branch_match)) => {
     525             243 :             let tenant_id = get_tenant_id(branch_match, env)?;
     526             243 :             let new_branch_name = branch_match
     527             243 :                 .get_one::<String>("branch-name")
     528             243 :                 .ok_or_else(|| anyhow!("No branch name provided"))?;
     529             243 :             let ancestor_branch_name = branch_match
     530             243 :                 .get_one::<String>("ancestor-branch-name")
     531             243 :                 .map(|s| s.as_str())
     532             243 :                 .unwrap_or(DEFAULT_BRANCH_NAME);
     533             243 :             let ancestor_timeline_id = env
     534             243 :                 .get_branch_timeline_id(ancestor_branch_name, tenant_id)
     535             243 :                 .ok_or_else(|| {
     536 UBC           0 :                     anyhow!("Found no timeline id for branch name '{ancestor_branch_name}'")
     537 CBC         243 :                 })?;
     538                 : 
     539             243 :             let start_lsn = branch_match
     540             243 :                 .get_one::<String>("ancestor-start-lsn")
     541             243 :                 .map(|lsn_str| Lsn::from_str(lsn_str))
     542             243 :                 .transpose()
     543             243 :                 .context("Failed to parse ancestor start Lsn from the request")?;
     544             243 :             let timeline_info = pageserver.timeline_create(
     545             243 :                 tenant_id,
     546             243 :                 None,
     547             243 :                 start_lsn,
     548             243 :                 Some(ancestor_timeline_id),
     549             243 :                 None,
     550             243 :             )?;
     551             239 :             let new_timeline_id = timeline_info.timeline_id;
     552             239 : 
     553             239 :             let last_record_lsn = timeline_info.last_record_lsn;
     554             239 : 
     555             239 :             env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?;
     556                 : 
     557             239 :             println!(
     558             239 :                 "Created timeline '{}' at Lsn {last_record_lsn} for tenant: {tenant_id}. Ancestor timeline: '{ancestor_branch_name}'",
     559             239 :                 timeline_info.timeline_id
     560             239 :             );
     561                 :         }
     562 UBC           0 :         Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{sub_name}'"),
     563               0 :         None => bail!("no tenant subcommand provided"),
     564                 :     }
     565                 : 
     566 CBC         382 :     Ok(())
     567             390 : }
     568                 : 
     569            1832 : fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
     570            1832 :     let (sub_name, sub_args) = match ep_match.subcommand() {
     571            1832 :         Some(ep_subcommand_data) => ep_subcommand_data,
     572 UBC           0 :         None => bail!("no endpoint subcommand provided"),
     573                 :     };
     574 CBC        1832 :     let mut cplane = ComputeControlPlane::load(env.clone())?;
     575                 : 
     576                 :     // All subcommands take an optional --tenant-id option
     577            1832 :     let tenant_id = get_tenant_id(sub_args, env)?;
     578                 : 
     579            1832 :     match sub_name {
     580            1832 :         "list" => {
     581 UBC           0 :             let timeline_infos = get_timeline_infos(env, &tenant_id).unwrap_or_else(|e| {
     582               0 :                 eprintln!("Failed to load timeline info: {}", e);
     583               0 :                 HashMap::new()
     584               0 :             });
     585               0 : 
     586               0 :             let timeline_name_mappings = env.timeline_name_mappings();
     587               0 : 
     588               0 :             let mut table = comfy_table::Table::new();
     589               0 : 
     590               0 :             table.load_preset(comfy_table::presets::NOTHING);
     591               0 : 
     592               0 :             table.set_header([
     593               0 :                 "ENDPOINT",
     594               0 :                 "ADDRESS",
     595               0 :                 "TIMELINE",
     596               0 :                 "BRANCH NAME",
     597               0 :                 "LSN",
     598               0 :                 "STATUS",
     599               0 :             ]);
     600                 : 
     601               0 :             for (endpoint_id, endpoint) in cplane
     602               0 :                 .endpoints
     603               0 :                 .iter()
     604               0 :                 .filter(|(_, endpoint)| endpoint.tenant_id == tenant_id)
     605               0 :             {
     606               0 :                 let lsn_str = match endpoint.mode {
     607               0 :                     ComputeMode::Static(lsn) => {
     608               0 :                         // -> read-only endpoint
     609               0 :                         // Use the node's LSN.
     610               0 :                         lsn.to_string()
     611                 :                     }
     612                 :                     _ => {
     613                 :                         // -> primary endpoint or hot replica
     614                 :                         // Use the LSN at the end of the timeline.
     615               0 :                         timeline_infos
     616               0 :                             .get(&endpoint.timeline_id)
     617               0 :                             .map(|bi| bi.last_record_lsn.to_string())
     618               0 :                             .unwrap_or_else(|| "?".to_string())
     619                 :                     }
     620                 :                 };
     621                 : 
     622               0 :                 let branch_name = timeline_name_mappings
     623               0 :                     .get(&TenantTimelineId::new(tenant_id, endpoint.timeline_id))
     624               0 :                     .map(|name| name.as_str())
     625               0 :                     .unwrap_or("?");
     626               0 : 
     627               0 :                 table.add_row([
     628               0 :                     endpoint_id.as_str(),
     629               0 :                     &endpoint.pg_address.to_string(),
     630               0 :                     &endpoint.timeline_id.to_string(),
     631               0 :                     branch_name,
     632               0 :                     lsn_str.as_str(),
     633               0 :                     endpoint.status(),
     634               0 :                 ]);
     635                 :             }
     636                 : 
     637               0 :             println!("{table}");
     638                 :         }
     639 CBC        1832 :         "create" => {
     640             561 :             let branch_name = sub_args
     641             561 :                 .get_one::<String>("branch-name")
     642             561 :                 .map(|s| s.as_str())
     643             561 :                 .unwrap_or(DEFAULT_BRANCH_NAME);
     644             561 :             let endpoint_id = sub_args
     645             561 :                 .get_one::<String>("endpoint_id")
     646             561 :                 .map(String::to_string)
     647             561 :                 .unwrap_or_else(|| format!("ep-{branch_name}"));
     648                 : 
     649             561 :             let lsn = sub_args
     650             561 :                 .get_one::<String>("lsn")
     651             561 :                 .map(|lsn_str| Lsn::from_str(lsn_str))
     652             561 :                 .transpose()
     653             561 :                 .context("Failed to parse Lsn from the request")?;
     654             561 :             let timeline_id = env
     655             561 :                 .get_branch_timeline_id(branch_name, tenant_id)
     656             561 :                 .ok_or_else(|| anyhow!("Found no timeline id for branch name '{branch_name}'"))?;
     657                 : 
     658             561 :             let pg_port: Option<u16> = sub_args.get_one::<u16>("pg-port").copied();
     659             561 :             let http_port: Option<u16> = sub_args.get_one::<u16>("http-port").copied();
     660             561 :             let pg_version = sub_args
     661             561 :                 .get_one::<u32>("pg-version")
     662             561 :                 .copied()
     663             561 :                 .context("Failed to parse postgres version from the argument string")?;
     664                 : 
     665             561 :             let hot_standby = sub_args
     666             561 :                 .get_one::<bool>("hot-standby")
     667             561 :                 .copied()
     668             561 :                 .unwrap_or(false);
     669                 : 
     670             561 :             let pageserver_id =
     671             561 :                 if let Some(id_str) = sub_args.get_one::<String>("endpoint-pageserver-id") {
     672 UBC           0 :                     NodeId(id_str.parse().context("while parsing pageserver id")?)
     673                 :                 } else {
     674 CBC         561 :                     DEFAULT_PAGESERVER_ID
     675                 :                 };
     676                 : 
     677             561 :             let mode = match (lsn, hot_standby) {
     678              88 :                 (Some(lsn), false) => ComputeMode::Static(lsn),
     679               1 :                 (None, true) => ComputeMode::Replica,
     680             472 :                 (None, false) => ComputeMode::Primary,
     681 UBC           0 :                 (Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"),
     682                 :             };
     683                 : 
     684 CBC         561 :             cplane.new_endpoint(
     685             561 :                 &endpoint_id,
     686             561 :                 tenant_id,
     687             561 :                 timeline_id,
     688             561 :                 pg_port,
     689             561 :                 http_port,
     690             561 :                 pg_version,
     691             561 :                 mode,
     692             561 :                 pageserver_id,
     693             561 :             )?;
     694                 :         }
     695            1271 :         "start" => {
     696             641 :             let pg_port: Option<u16> = sub_args.get_one::<u16>("pg-port").copied();
     697             641 :             let http_port: Option<u16> = sub_args.get_one::<u16>("http-port").copied();
     698             641 :             let endpoint_id = sub_args
     699             641 :                 .get_one::<String>("endpoint_id")
     700             641 :                 .ok_or_else(|| anyhow!("No endpoint ID was provided to start"))?;
     701                 : 
     702             641 :             let pageserver_id =
     703             641 :                 if let Some(id_str) = sub_args.get_one::<String>("endpoint-pageserver-id") {
     704 UBC           0 :                     NodeId(id_str.parse().context("while parsing pageserver id")?)
     705                 :                 } else {
     706 CBC         641 :                     DEFAULT_PAGESERVER_ID
     707                 :                 };
     708                 : 
     709             641 :             let remote_ext_config = sub_args.get_one::<String>("remote-ext-config");
     710                 : 
     711                 :             // If --safekeepers argument is given, use only the listed safekeeper nodes.
     712             641 :             let safekeepers =
     713             641 :                 if let Some(safekeepers_str) = sub_args.get_one::<String>("safekeepers") {
     714             637 :                     let mut safekeepers: Vec<NodeId> = Vec::new();
     715             834 :                     for sk_id in safekeepers_str.split(',').map(str::trim) {
     716             834 :                         let sk_id = NodeId(u64::from_str(sk_id).map_err(|_| {
     717 UBC           0 :                             anyhow!("invalid node ID \"{sk_id}\" in --safekeepers list")
     718 CBC         834 :                         })?);
     719             834 :                         safekeepers.push(sk_id);
     720                 :                     }
     721             637 :                     safekeepers
     722                 :                 } else {
     723               8 :                     env.safekeepers.iter().map(|sk| sk.id).collect()
     724                 :                 };
     725                 : 
     726             641 :             let endpoint = cplane.endpoints.get(endpoint_id.as_str());
     727                 : 
     728             641 :             let ps_conf = env.get_pageserver_conf(pageserver_id)?;
     729             641 :             let auth_token = if matches!(ps_conf.pg_auth_type, AuthType::NeonJWT) {
     730              15 :                 let claims = Claims::new(Some(tenant_id), Scope::Tenant);
     731              15 : 
     732              15 :                 Some(env.generate_auth_token(&claims)?)
     733                 :             } else {
     734             626 :                 None
     735                 :             };
     736                 : 
     737             641 :             let hot_standby = sub_args
     738             641 :                 .get_one::<bool>("hot-standby")
     739             641 :                 .copied()
     740             641 :                 .unwrap_or(false);
     741                 : 
     742             641 :             if let Some(endpoint) = endpoint {
     743             637 :                 match (&endpoint.mode, hot_standby) {
     744                 :                     (ComputeMode::Static(_), true) => {
     745 UBC           0 :                         bail!("Cannot start a node in hot standby mode when it is already configured as a static replica")
     746                 :                     }
     747                 :                     (ComputeMode::Primary, true) => {
     748               0 :                         bail!("Cannot start a node as a hot standby replica, it is already configured as primary node")
     749                 :                     }
     750 CBC         637 :                     _ => {}
     751             637 :                 }
     752             637 :                 println!("Starting existing endpoint {endpoint_id}...");
     753             637 :                 endpoint.start(&auth_token, safekeepers, remote_ext_config)?;
     754                 :             } else {
     755               4 :                 let branch_name = sub_args
     756               4 :                     .get_one::<String>("branch-name")
     757               4 :                     .map(|s| s.as_str())
     758               4 :                     .unwrap_or(DEFAULT_BRANCH_NAME);
     759               4 :                 let timeline_id = env
     760               4 :                     .get_branch_timeline_id(branch_name, tenant_id)
     761               4 :                     .ok_or_else(|| {
     762 UBC           0 :                         anyhow!("Found no timeline id for branch name '{branch_name}'")
     763 CBC           4 :                     })?;
     764               4 :                 let lsn = sub_args
     765               4 :                     .get_one::<String>("lsn")
     766               4 :                     .map(|lsn_str| Lsn::from_str(lsn_str))
     767               4 :                     .transpose()
     768               4 :                     .context("Failed to parse Lsn from the request")?;
     769               4 :                 let pg_version = sub_args
     770               4 :                     .get_one::<u32>("pg-version")
     771               4 :                     .copied()
     772               4 :                     .context("Failed to `pg-version` from the argument string")?;
     773                 : 
     774               4 :                 let mode = match (lsn, hot_standby) {
     775 UBC           0 :                     (Some(lsn), false) => ComputeMode::Static(lsn),
     776               0 :                     (None, true) => ComputeMode::Replica,
     777 CBC           4 :                     (None, false) => ComputeMode::Primary,
     778 UBC           0 :                     (Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"),
     779                 :                 };
     780                 : 
     781                 :                 // when used with custom port this results in non obvious behaviour
     782                 :                 // port is remembered from first start command, i e
     783                 :                 // start --port X
     784                 :                 // stop
     785                 :                 // start <-- will also use port X even without explicit port argument
     786 CBC           4 :                 println!("Starting new endpoint {endpoint_id} (PostgreSQL v{pg_version}) on timeline {timeline_id} ...");
     787                 : 
     788               4 :                 let ep = cplane.new_endpoint(
     789               4 :                     endpoint_id,
     790               4 :                     tenant_id,
     791               4 :                     timeline_id,
     792               4 :                     pg_port,
     793               4 :                     http_port,
     794               4 :                     pg_version,
     795               4 :                     mode,
     796               4 :                     pageserver_id,
     797               4 :                 )?;
     798               4 :                 ep.start(&auth_token, safekeepers, remote_ext_config)?;
     799                 :             }
     800                 :         }
     801             630 :         "stop" => {
     802             630 :             let endpoint_id = sub_args
     803             630 :                 .get_one::<String>("endpoint_id")
     804             630 :                 .ok_or_else(|| anyhow!("No endpoint ID was provided to stop"))?;
     805             630 :             let destroy = sub_args.get_flag("destroy");
     806                 : 
     807             630 :             let endpoint = cplane
     808             630 :                 .endpoints
     809             630 :                 .get(endpoint_id.as_str())
     810             630 :                 .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
     811             630 :             endpoint.stop(destroy)?;
     812                 :         }
     813                 : 
     814 UBC           0 :         _ => bail!("Unexpected endpoint subcommand '{sub_name}'"),
     815                 :     }
     816                 : 
     817 CBC        1804 :     Ok(())
     818            1832 : }
     819                 : 
     820               1 : fn handle_mappings(sub_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Result<()> {
     821               1 :     let (sub_name, sub_args) = match sub_match.subcommand() {
     822               1 :         Some(ep_subcommand_data) => ep_subcommand_data,
     823 UBC           0 :         None => bail!("no mappings subcommand provided"),
     824                 :     };
     825                 : 
     826 CBC           1 :     match sub_name {
     827               1 :         "map" => {
     828               1 :             let branch_name = sub_args
     829               1 :                 .get_one::<String>("branch-name")
     830               1 :                 .expect("branch-name argument missing");
     831               1 : 
     832               1 :             let tenant_id = sub_args
     833               1 :                 .get_one::<String>("tenant-id")
     834               1 :                 .map(|x| TenantId::from_str(x))
     835               1 :                 .expect("tenant-id argument missing")
     836               1 :                 .expect("malformed tenant-id arg");
     837               1 : 
     838               1 :             let timeline_id = sub_args
     839               1 :                 .get_one::<String>("timeline-id")
     840               1 :                 .map(|x| TimelineId::from_str(x))
     841               1 :                 .expect("timeline-id argument missing")
     842               1 :                 .expect("malformed timeline-id arg");
     843               1 : 
     844               1 :             env.register_branch_mapping(branch_name.to_owned(), tenant_id, timeline_id)?;
     845                 : 
     846               1 :             Ok(())
     847                 :         }
     848 UBC           0 :         other => unimplemented!("mappings subcommand {other}"),
     849                 :     }
     850 CBC           1 : }
     851                 : 
     852            1112 : fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
     853            1112 :     fn get_pageserver(env: &local_env::LocalEnv, args: &ArgMatches) -> Result<PageServerNode> {
     854            1112 :         let node_id = if let Some(id_str) = args.get_one::<String>("pageserver-id") {
     855            1112 :             NodeId(id_str.parse().context("while parsing pageserver id")?)
     856            1112 :         } else {
     857            1112 :             DEFAULT_PAGESERVER_ID
     858            1112 :         };
     859            1112 : 
     860            1112 :         Ok(PageServerNode::from_env(
     861            1112 :             env,
     862            1112 :             env.get_pageserver_conf(node_id)?,
     863            1112 :         ))
     864            1112 :     }
     865            1112 : 
     866            1112 :     match sub_match.subcommand() {
     867            1112 :         Some(("start", subcommand_args)) => {
     868             554 :             if let Err(e) = get_pageserver(env, subcommand_args)?
     869             554 :                 .start(&pageserver_config_overrides(subcommand_args))
     870                 :             {
     871 UBC           0 :                 eprintln!("pageserver start failed: {e}");
     872               0 :                 exit(1);
     873 CBC         554 :             }
     874                 :         }
     875                 : 
     876             558 :         Some(("stop", subcommand_args)) => {
     877             558 :             let immediate = subcommand_args
     878             558 :                 .get_one::<String>("stop-mode")
     879             558 :                 .map(|s| s.as_str())
     880             558 :                 == Some("immediate");
     881                 : 
     882             558 :             if let Err(e) = get_pageserver(env, subcommand_args)?.stop(immediate) {
     883 UBC           0 :                 eprintln!("pageserver stop failed: {}", e);
     884               0 :                 exit(1);
     885 CBC         557 :             }
     886                 :         }
     887                 : 
     888 UBC           0 :         Some(("restart", subcommand_args)) => {
     889               0 :             let pageserver = get_pageserver(env, subcommand_args)?;
     890                 :             //TODO what shutdown strategy should we use here?
     891               0 :             if let Err(e) = pageserver.stop(false) {
     892               0 :                 eprintln!("pageserver stop failed: {}", e);
     893               0 :                 exit(1);
     894               0 :             }
     895                 : 
     896               0 :             if let Err(e) = pageserver.start(&pageserver_config_overrides(subcommand_args)) {
     897               0 :                 eprintln!("pageserver start failed: {e}");
     898               0 :                 exit(1);
     899               0 :             }
     900                 :         }
     901                 : 
     902               0 :         Some(("status", subcommand_args)) => {
     903               0 :             match get_pageserver(env, subcommand_args)?.check_status() {
     904               0 :                 Ok(_) => println!("Page server is up and running"),
     905               0 :                 Err(err) => {
     906               0 :                     eprintln!("Page server is not available: {}", err);
     907               0 :                     exit(1);
     908                 :                 }
     909                 :             }
     910                 :         }
     911                 : 
     912               0 :         Some((sub_name, _)) => bail!("Unexpected pageserver subcommand '{}'", sub_name),
     913               0 :         None => bail!("no pageserver subcommand provided"),
     914                 :     }
     915 CBC        1111 :     Ok(())
     916            1112 : }
     917                 : 
     918              26 : fn handle_attachment_service(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
     919              26 :     let svc = AttachmentService::from_env(env);
     920              26 :     match sub_match.subcommand() {
     921              26 :         Some(("start", _start_match)) => {
     922              13 :             if let Err(e) = svc.start() {
     923 UBC           0 :                 eprintln!("start failed: {e}");
     924               0 :                 exit(1);
     925 CBC          13 :             }
     926                 :         }
     927                 : 
     928              13 :         Some(("stop", stop_match)) => {
     929              13 :             let immediate = stop_match
     930              13 :                 .get_one::<String>("stop-mode")
     931              13 :                 .map(|s| s.as_str())
     932              13 :                 == Some("immediate");
     933                 : 
     934              13 :             if let Err(e) = svc.stop(immediate) {
     935 UBC           0 :                 eprintln!("stop failed: {}", e);
     936               0 :                 exit(1);
     937 CBC          13 :             }
     938                 :         }
     939 UBC           0 :         Some((sub_name, _)) => bail!("Unexpected attachment_service subcommand '{}'", sub_name),
     940               0 :         None => bail!("no attachment_service subcommand provided"),
     941                 :     }
     942 CBC          26 :     Ok(())
     943              26 : }
     944                 : 
     945                 : fn get_safekeeper(env: &local_env::LocalEnv, id: NodeId) -> Result<SafekeeperNode> {
     946            1343 :     if let Some(node) = env.safekeepers.iter().find(|node| node.id == id) {
     947            1011 :         Ok(SafekeeperNode::from_env(env, node))
     948                 :     } else {
     949 UBC           0 :         bail!("could not find safekeeper {id}")
     950                 :     }
     951 CBC        1011 : }
     952                 : 
     953                 : // Get list of options to append to safekeeper command invocation.
     954             490 : fn safekeeper_extra_opts(init_match: &ArgMatches) -> Vec<String> {
     955             490 :     init_match
     956             490 :         .get_many::<String>("safekeeper-extra-opt")
     957             490 :         .into_iter()
     958             490 :         .flatten()
     959             490 :         .map(|s| s.to_owned())
     960             490 :         .collect()
     961             490 : }
     962                 : 
     963            1011 : fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
     964            1011 :     let (sub_name, sub_args) = match sub_match.subcommand() {
     965            1011 :         Some(safekeeper_command_data) => safekeeper_command_data,
     966 UBC           0 :         None => bail!("no safekeeper subcommand provided"),
     967                 :     };
     968                 : 
     969                 :     // All the commands take an optional safekeeper name argument
     970 CBC        1011 :     let sk_id = if let Some(id_str) = sub_args.get_one::<String>("id") {
     971            1002 :         NodeId(id_str.parse().context("while parsing safekeeper id")?)
     972                 :     } else {
     973               9 :         DEFAULT_SAFEKEEPER_ID
     974                 :     };
     975            1011 :     let safekeeper = get_safekeeper(env, sk_id)?;
     976                 : 
     977            1011 :     match sub_name {
     978            1011 :         "start" => {
     979             490 :             let extra_opts = safekeeper_extra_opts(sub_args);
     980                 : 
     981             490 :             if let Err(e) = safekeeper.start(extra_opts) {
     982 UBC           0 :                 eprintln!("safekeeper start failed: {}", e);
     983               0 :                 exit(1);
     984 CBC         490 :             }
     985                 :         }
     986                 : 
     987             521 :         "stop" => {
     988             521 :             let immediate =
     989             521 :                 sub_args.get_one::<String>("stop-mode").map(|s| s.as_str()) == Some("immediate");
     990                 : 
     991             521 :             if let Err(e) = safekeeper.stop(immediate) {
     992 UBC           0 :                 eprintln!("safekeeper stop failed: {}", e);
     993               0 :                 exit(1);
     994 CBC         521 :             }
     995                 :         }
     996                 : 
     997 UBC           0 :         "restart" => {
     998               0 :             let immediate =
     999               0 :                 sub_args.get_one::<String>("stop-mode").map(|s| s.as_str()) == Some("immediate");
    1000                 : 
    1001               0 :             if let Err(e) = safekeeper.stop(immediate) {
    1002               0 :                 eprintln!("safekeeper stop failed: {}", e);
    1003               0 :                 exit(1);
    1004               0 :             }
    1005               0 : 
    1006               0 :             let extra_opts = safekeeper_extra_opts(sub_args);
    1007               0 :             if let Err(e) = safekeeper.start(extra_opts) {
    1008               0 :                 eprintln!("safekeeper start failed: {}", e);
    1009               0 :                 exit(1);
    1010               0 :             }
    1011                 :         }
    1012                 : 
    1013                 :         _ => {
    1014               0 :             bail!("Unexpected safekeeper subcommand '{}'", sub_name)
    1015                 :         }
    1016                 :     }
    1017 CBC        1011 :     Ok(())
    1018            1011 : }
    1019                 : 
    1020               5 : fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow::Result<()> {
    1021               5 :     // Endpoints are not started automatically
    1022               5 : 
    1023               5 :     broker::start_broker_process(env)?;
    1024                 : 
    1025                 :     // Only start the attachment service if the pageserver is configured to need it
    1026               5 :     if env.control_plane_api.is_some() {
    1027 UBC           0 :         let attachment_service = AttachmentService::from_env(env);
    1028               0 :         if let Err(e) = attachment_service.start() {
    1029               0 :             eprintln!("attachment_service start failed: {:#}", e);
    1030               0 :             try_stop_all(env, true);
    1031               0 :             exit(1);
    1032               0 :         }
    1033 CBC           5 :     }
    1034                 : 
    1035              11 :     for ps_conf in &env.pageservers {
    1036               6 :         let pageserver = PageServerNode::from_env(env, ps_conf);
    1037               6 :         if let Err(e) = pageserver.start(&pageserver_config_overrides(sub_match)) {
    1038 UBC           0 :             eprintln!("pageserver {} start failed: {:#}", ps_conf.id, e);
    1039               0 :             try_stop_all(env, true);
    1040               0 :             exit(1);
    1041 CBC           6 :         }
    1042                 :     }
    1043                 : 
    1044              10 :     for node in env.safekeepers.iter() {
    1045              10 :         let safekeeper = SafekeeperNode::from_env(env, node);
    1046              10 :         if let Err(e) = safekeeper.start(vec![]) {
    1047 UBC           0 :             eprintln!("safekeeper {} start failed: {:#}", safekeeper.id, e);
    1048               0 :             try_stop_all(env, false);
    1049               0 :             exit(1);
    1050 CBC          10 :         }
    1051                 :     }
    1052               5 :     Ok(())
    1053               5 : }
    1054                 : 
    1055               5 : fn handle_stop_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
    1056               5 :     let immediate =
    1057               5 :         sub_match.get_one::<String>("stop-mode").map(|s| s.as_str()) == Some("immediate");
    1058               5 : 
    1059               5 :     try_stop_all(env, immediate);
    1060               5 : 
    1061               5 :     Ok(())
    1062               5 : }
    1063                 : 
    1064               5 : fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
    1065               5 :     // Stop all endpoints
    1066               5 :     match ComputeControlPlane::load(env.clone()) {
    1067               5 :         Ok(cplane) => {
    1068               9 :             for (_k, node) in cplane.endpoints {
    1069               4 :                 if let Err(e) = node.stop(false) {
    1070               2 :                     eprintln!("postgres stop failed: {e:#}");
    1071               2 :                 }
    1072                 :             }
    1073                 :         }
    1074 UBC           0 :         Err(e) => {
    1075               0 :             eprintln!("postgres stop failed, could not restore control plane data from env: {e:#}")
    1076                 :         }
    1077                 :     }
    1078                 : 
    1079 CBC          11 :     for ps_conf in &env.pageservers {
    1080               6 :         let pageserver = PageServerNode::from_env(env, ps_conf);
    1081               6 :         if let Err(e) = pageserver.stop(immediate) {
    1082 UBC           0 :             eprintln!("pageserver {} stop failed: {:#}", ps_conf.id, e);
    1083 CBC           6 :         }
    1084                 :     }
    1085                 : 
    1086              10 :     for node in env.safekeepers.iter() {
    1087              10 :         let safekeeper = SafekeeperNode::from_env(env, node);
    1088              10 :         if let Err(e) = safekeeper.stop(immediate) {
    1089 UBC           0 :             eprintln!("safekeeper {} stop failed: {:#}", safekeeper.id, e);
    1090 CBC          10 :         }
    1091                 :     }
    1092                 : 
    1093               5 :     if let Err(e) = broker::stop_broker_process(env) {
    1094 UBC           0 :         eprintln!("neon broker stop failed: {e:#}");
    1095 CBC           5 :     }
    1096                 : 
    1097               5 :     if env.control_plane_api.is_some() {
    1098 UBC           0 :         let attachment_service = AttachmentService::from_env(env);
    1099               0 :         if let Err(e) = attachment_service.stop(immediate) {
    1100               0 :             eprintln!("attachment service stop failed: {e:#}");
    1101               0 :         }
    1102 CBC           5 :     }
    1103               5 : }
    1104                 : 
    1105            5188 : fn cli() -> Command {
    1106            5188 :     let branch_name_arg = Arg::new("branch-name")
    1107            5188 :         .long("branch-name")
    1108            5188 :         .help("Name of the branch to be created or used as an alias for other services")
    1109            5188 :         .required(false);
    1110            5188 : 
    1111            5188 :     let endpoint_id_arg = Arg::new("endpoint_id")
    1112            5188 :         .help("Postgres endpoint id")
    1113            5188 :         .required(false);
    1114            5188 : 
    1115            5188 :     let safekeeper_id_arg = Arg::new("id").help("safekeeper id").required(false);
    1116            5188 : 
    1117            5188 :     // --id, when using a pageserver command
    1118            5188 :     let pageserver_id_arg = Arg::new("pageserver-id")
    1119            5188 :         .long("id")
    1120            5188 :         .global(true)
    1121            5188 :         .help("pageserver id")
    1122            5188 :         .required(false);
    1123            5188 :     // --pageserver-id when using a non-pageserver command
    1124            5188 :     let endpoint_pageserver_id_arg = Arg::new("endpoint-pageserver-id")
    1125            5188 :         .long("pageserver-id")
    1126            5188 :         .required(false);
    1127            5188 : 
    1128            5188 :     let safekeeper_extra_opt_arg = Arg::new("safekeeper-extra-opt")
    1129            5188 :         .short('e')
    1130            5188 :         .long("safekeeper-extra-opt")
    1131            5188 :         .num_args(1)
    1132            5188 :         .action(ArgAction::Append)
    1133            5188 :         .help("Additional safekeeper invocation options, e.g. -e=--http-auth-public-key-path=foo")
    1134            5188 :         .required(false);
    1135            5188 : 
    1136            5188 :     let tenant_id_arg = Arg::new("tenant-id")
    1137            5188 :         .long("tenant-id")
    1138            5188 :         .help("Tenant id. Represented as a hexadecimal string 32 symbols length")
    1139            5188 :         .required(false);
    1140            5188 : 
    1141            5188 :     let timeline_id_arg = Arg::new("timeline-id")
    1142            5188 :         .long("timeline-id")
    1143            5188 :         .help("Timeline id. Represented as a hexadecimal string 32 symbols length")
    1144            5188 :         .required(false);
    1145            5188 : 
    1146            5188 :     let pg_version_arg = Arg::new("pg-version")
    1147            5188 :         .long("pg-version")
    1148            5188 :         .help("Postgres version to use for the initial tenant")
    1149            5188 :         .required(false)
    1150            5188 :         .value_parser(value_parser!(u32))
    1151            5188 :         .default_value(DEFAULT_PG_VERSION);
    1152            5188 : 
    1153            5188 :     let pg_port_arg = Arg::new("pg-port")
    1154            5188 :         .long("pg-port")
    1155            5188 :         .required(false)
    1156            5188 :         .value_parser(value_parser!(u16))
    1157            5188 :         .value_name("pg-port");
    1158            5188 : 
    1159            5188 :     let http_port_arg = Arg::new("http-port")
    1160            5188 :         .long("http-port")
    1161            5188 :         .required(false)
    1162            5188 :         .value_parser(value_parser!(u16))
    1163            5188 :         .value_name("http-port");
    1164            5188 : 
    1165            5188 :     let safekeepers_arg = Arg::new("safekeepers")
    1166            5188 :         .long("safekeepers")
    1167            5188 :         .required(false)
    1168            5188 :         .value_name("safekeepers");
    1169            5188 : 
    1170            5188 :     let stop_mode_arg = Arg::new("stop-mode")
    1171            5188 :         .short('m')
    1172            5188 :         .value_parser(["fast", "immediate"])
    1173            5188 :         .default_value("fast")
    1174            5188 :         .help("If 'immediate', don't flush repository data at shutdown")
    1175            5188 :         .required(false)
    1176            5188 :         .value_name("stop-mode");
    1177            5188 : 
    1178            5188 :     let pageserver_config_args = Arg::new("pageserver-config-override")
    1179            5188 :         .long("pageserver-config-override")
    1180            5188 :         .num_args(1)
    1181            5188 :         .action(ArgAction::Append)
    1182            5188 :         .help("Additional pageserver's configuration options or overrides, refer to pageserver's 'config-override' CLI parameter docs for more")
    1183            5188 :         .required(false);
    1184            5188 : 
    1185            5188 :     let remote_ext_config_args = Arg::new("remote-ext-config")
    1186            5188 :         .long("remote-ext-config")
    1187            5188 :         .num_args(1)
    1188            5188 :         .help("Configure the S3 bucket that we search for extensions in.")
    1189            5188 :         .required(false);
    1190            5188 : 
    1191            5188 :     let lsn_arg = Arg::new("lsn")
    1192            5188 :         .long("lsn")
    1193            5188 :         .help("Specify Lsn on the timeline to start from. By default, end of the timeline would be used.")
    1194            5188 :         .required(false);
    1195            5188 : 
    1196            5188 :     let hot_standby_arg = Arg::new("hot-standby")
    1197            5188 :         .value_parser(value_parser!(bool))
    1198            5188 :         .long("hot-standby")
    1199            5188 :         .help("If set, the node will be a hot replica on the specified timeline")
    1200            5188 :         .required(false);
    1201            5188 : 
    1202            5188 :     let force_arg = Arg::new("force")
    1203            5188 :         .value_parser(value_parser!(bool))
    1204            5188 :         .long("force")
    1205            5188 :         .action(ArgAction::SetTrue)
    1206            5188 :         .help("Force initialization even if the repository is not empty")
    1207            5188 :         .required(false);
    1208            5188 : 
    1209            5188 :     Command::new("Neon CLI")
    1210            5188 :         .arg_required_else_help(true)
    1211            5188 :         .version(GIT_VERSION)
    1212            5188 :         .subcommand(
    1213            5188 :             Command::new("init")
    1214            5188 :                 .about("Initialize a new Neon repository, preparing configs for services to start with")
    1215            5188 :                 .arg(pageserver_config_args.clone())
    1216            5188 :                 .arg(
    1217            5188 :                     Arg::new("config")
    1218            5188 :                         .long("config")
    1219            5188 :                         .required(false)
    1220            5188 :                         .value_parser(value_parser!(PathBuf))
    1221            5188 :                         .value_name("config"),
    1222            5188 :                 )
    1223            5188 :                 .arg(pg_version_arg.clone())
    1224            5188 :                 .arg(force_arg)
    1225            5188 :         )
    1226            5188 :         .subcommand(
    1227            5188 :             Command::new("timeline")
    1228            5188 :             .about("Manage timelines")
    1229            5188 :             .subcommand(Command::new("list")
    1230            5188 :                 .about("List all timelines, available to this pageserver")
    1231            5188 :                 .arg(tenant_id_arg.clone()))
    1232            5188 :             .subcommand(Command::new("branch")
    1233            5188 :                 .about("Create a new timeline, using another timeline as a base, copying its data")
    1234            5188 :                 .arg(tenant_id_arg.clone())
    1235            5188 :                 .arg(branch_name_arg.clone())
    1236            5188 :                 .arg(Arg::new("ancestor-branch-name").long("ancestor-branch-name")
    1237            5188 :                     .help("Use last Lsn of another timeline (and its data) as base when creating the new timeline. The timeline gets resolved by its branch name.").required(false))
    1238            5188 :                 .arg(Arg::new("ancestor-start-lsn").long("ancestor-start-lsn")
    1239            5188 :                     .help("When using another timeline as base, use a specific Lsn in it instead of the latest one").required(false)))
    1240            5188 :             .subcommand(Command::new("create")
    1241            5188 :                 .about("Create a new blank timeline")
    1242            5188 :                 .arg(tenant_id_arg.clone())
    1243            5188 :                 .arg(branch_name_arg.clone())
    1244            5188 :                 .arg(pg_version_arg.clone())
    1245            5188 :             )
    1246            5188 :             .subcommand(Command::new("import")
    1247            5188 :                 .about("Import timeline from basebackup directory")
    1248            5188 :                 .arg(tenant_id_arg.clone())
    1249            5188 :                 .arg(timeline_id_arg.clone())
    1250            5188 :                 .arg(Arg::new("node-name").long("node-name")
    1251            5188 :                     .help("Name to assign to the imported timeline"))
    1252            5188 :                 .arg(Arg::new("base-tarfile")
    1253            5188 :                     .long("base-tarfile")
    1254            5188 :                     .value_parser(value_parser!(PathBuf))
    1255            5188 :                     .help("Basebackup tarfile to import")
    1256            5188 :                 )
    1257            5188 :                 .arg(Arg::new("base-lsn").long("base-lsn")
    1258            5188 :                     .help("Lsn the basebackup starts at"))
    1259            5188 :                 .arg(Arg::new("wal-tarfile")
    1260            5188 :                     .long("wal-tarfile")
    1261            5188 :                     .value_parser(value_parser!(PathBuf))
    1262            5188 :                     .help("Wal to add after base")
    1263            5188 :                 )
    1264            5188 :                 .arg(Arg::new("end-lsn").long("end-lsn")
    1265            5188 :                     .help("Lsn the basebackup ends at"))
    1266            5188 :                 .arg(pg_version_arg.clone())
    1267            5188 :             )
    1268            5188 :         ).subcommand(
    1269            5188 :             Command::new("tenant")
    1270            5188 :             .arg_required_else_help(true)
    1271            5188 :             .about("Manage tenants")
    1272            5188 :             .subcommand(Command::new("list"))
    1273            5188 :             .subcommand(Command::new("create")
    1274            5188 :                 .arg(tenant_id_arg.clone())
    1275            5188 :                 .arg(timeline_id_arg.clone().help("Use a specific timeline id when creating a tenant and its initial timeline"))
    1276            5188 :                 .arg(Arg::new("config").short('c').num_args(1).action(ArgAction::Append).required(false))
    1277            5188 :                 .arg(pg_version_arg.clone())
    1278            5188 :                 .arg(Arg::new("set-default").long("set-default").action(ArgAction::SetTrue).required(false)
    1279            5188 :                     .help("Use this tenant in future CLI commands where tenant_id is needed, but not specified"))
    1280            5188 :                 )
    1281            5188 :             .subcommand(Command::new("set-default").arg(tenant_id_arg.clone().required(true))
    1282            5188 :                 .about("Set a particular tenant as default in future CLI commands where tenant_id is needed, but not specified"))
    1283            5188 :             .subcommand(Command::new("config")
    1284            5188 :                 .arg(tenant_id_arg.clone())
    1285            5188 :                 .arg(Arg::new("config").short('c').num_args(1).action(ArgAction::Append).required(false)))
    1286            5188 :         )
    1287            5188 :         .subcommand(
    1288            5188 :             Command::new("pageserver")
    1289            5188 :                 .arg_required_else_help(true)
    1290            5188 :                 .about("Manage pageserver")
    1291            5188 :                 .arg(pageserver_id_arg)
    1292            5188 :                 .subcommand(Command::new("status"))
    1293            5188 :                 .subcommand(Command::new("start")
    1294            5188 :                     .about("Start local pageserver")
    1295            5188 :                     .arg(pageserver_config_args.clone())
    1296            5188 :                 )
    1297            5188 :                 .subcommand(Command::new("stop")
    1298            5188 :                     .about("Stop local pageserver")
    1299            5188 :                     .arg(stop_mode_arg.clone())
    1300            5188 :                 )
    1301            5188 :                 .subcommand(Command::new("restart")
    1302            5188 :                     .about("Restart local pageserver")
    1303            5188 :                     .arg(pageserver_config_args.clone())
    1304            5188 :                 )
    1305            5188 :         )
    1306            5188 :         .subcommand(
    1307            5188 :             Command::new("attachment_service")
    1308            5188 :                 .arg_required_else_help(true)
    1309            5188 :                 .about("Manage attachment_service")
    1310            5188 :                 .subcommand(Command::new("start").about("Start local pageserver").arg(pageserver_config_args.clone()))
    1311            5188 :                 .subcommand(Command::new("stop").about("Stop local pageserver")
    1312            5188 :                             .arg(stop_mode_arg.clone()))
    1313            5188 :         )
    1314            5188 :         .subcommand(
    1315            5188 :             Command::new("safekeeper")
    1316            5188 :                 .arg_required_else_help(true)
    1317            5188 :                 .about("Manage safekeepers")
    1318            5188 :                 .subcommand(Command::new("start")
    1319            5188 :                             .about("Start local safekeeper")
    1320            5188 :                             .arg(safekeeper_id_arg.clone())
    1321            5188 :                             .arg(safekeeper_extra_opt_arg.clone())
    1322            5188 :                 )
    1323            5188 :                 .subcommand(Command::new("stop")
    1324            5188 :                             .about("Stop local safekeeper")
    1325            5188 :                             .arg(safekeeper_id_arg.clone())
    1326            5188 :                             .arg(stop_mode_arg.clone())
    1327            5188 :                 )
    1328            5188 :                 .subcommand(Command::new("restart")
    1329            5188 :                             .about("Restart local safekeeper")
    1330            5188 :                             .arg(safekeeper_id_arg)
    1331            5188 :                             .arg(stop_mode_arg.clone())
    1332            5188 :                             .arg(safekeeper_extra_opt_arg)
    1333            5188 :                 )
    1334            5188 :         )
    1335            5188 :         .subcommand(
    1336            5188 :             Command::new("endpoint")
    1337            5188 :                 .arg_required_else_help(true)
    1338            5188 :                 .about("Manage postgres instances")
    1339            5188 :                 .subcommand(Command::new("list").arg(tenant_id_arg.clone()))
    1340            5188 :                 .subcommand(Command::new("create")
    1341            5188 :                     .about("Create a compute endpoint")
    1342            5188 :                     .arg(endpoint_id_arg.clone())
    1343            5188 :                     .arg(branch_name_arg.clone())
    1344            5188 :                     .arg(tenant_id_arg.clone())
    1345            5188 :                     .arg(lsn_arg.clone())
    1346            5188 :                     .arg(pg_port_arg.clone())
    1347            5188 :                     .arg(http_port_arg.clone())
    1348            5188 :                     .arg(endpoint_pageserver_id_arg.clone())
    1349            5188 :                     .arg(
    1350            5188 :                         Arg::new("config-only")
    1351            5188 :                             .help("Don't do basebackup, create endpoint directory with only config files")
    1352            5188 :                             .long("config-only")
    1353            5188 :                             .required(false))
    1354            5188 :                     .arg(pg_version_arg.clone())
    1355            5188 :                     .arg(hot_standby_arg.clone())
    1356            5188 :                 )
    1357            5188 :                 .subcommand(Command::new("start")
    1358            5188 :                     .about("Start postgres.\n If the endpoint doesn't exist yet, it is created.")
    1359            5188 :                     .arg(endpoint_id_arg.clone())
    1360            5188 :                     .arg(tenant_id_arg.clone())
    1361            5188 :                     .arg(branch_name_arg.clone())
    1362            5188 :                     .arg(timeline_id_arg.clone())
    1363            5188 :                     .arg(lsn_arg)
    1364            5188 :                     .arg(pg_port_arg)
    1365            5188 :                     .arg(http_port_arg)
    1366            5188 :                     .arg(endpoint_pageserver_id_arg.clone())
    1367            5188 :                     .arg(pg_version_arg)
    1368            5188 :                     .arg(hot_standby_arg)
    1369            5188 :                     .arg(safekeepers_arg)
    1370            5188 :                     .arg(remote_ext_config_args)
    1371            5188 :                 )
    1372            5188 :                 .subcommand(
    1373            5188 :                     Command::new("stop")
    1374            5188 :                     .arg(endpoint_id_arg)
    1375            5188 :                     .arg(tenant_id_arg.clone())
    1376            5188 :                     .arg(
    1377            5188 :                         Arg::new("destroy")
    1378            5188 :                             .help("Also delete data directory (now optional, should be default in future)")
    1379            5188 :                             .long("destroy")
    1380            5188 :                             .action(ArgAction::SetTrue)
    1381            5188 :                             .required(false)
    1382            5188 :                         )
    1383            5188 :                 )
    1384            5188 : 
    1385            5188 :         )
    1386            5188 :         .subcommand(
    1387            5188 :             Command::new("mappings")
    1388            5188 :                 .arg_required_else_help(true)
    1389            5188 :                 .about("Manage neon_local branch name mappings")
    1390            5188 :                 .subcommand(
    1391            5188 :                     Command::new("map")
    1392            5188 :                         .about("Create new mapping which cannot exist already")
    1393            5188 :                         .arg(branch_name_arg.clone())
    1394            5188 :                         .arg(tenant_id_arg.clone())
    1395            5188 :                         .arg(timeline_id_arg.clone())
    1396            5188 :                 )
    1397            5188 :         )
    1398            5188 :         // Obsolete old name for 'endpoint'. We now just print an error if it's used.
    1399            5188 :         .subcommand(
    1400            5188 :             Command::new("pg")
    1401            5188 :                 .hide(true)
    1402            5188 :                 .arg(Arg::new("ignore-rest").allow_hyphen_values(true).num_args(0..).required(false))
    1403            5188 :                 .trailing_var_arg(true)
    1404            5188 :         )
    1405            5188 :         .subcommand(
    1406            5188 :             Command::new("start")
    1407            5188 :                 .about("Start page server and safekeepers")
    1408            5188 :                 .arg(pageserver_config_args)
    1409            5188 :         )
    1410            5188 :         .subcommand(
    1411            5188 :             Command::new("stop")
    1412            5188 :                 .about("Stop page server and safekeepers")
    1413            5188 :                 .arg(stop_mode_arg)
    1414            5188 :         )
    1415            5188 : }
    1416                 : 
    1417               1 : #[test]
    1418               1 : fn verify_cli() {
    1419               1 :     cli().debug_assert();
    1420               1 : }
        

Generated by: LCOV version 2.1-beta