LCOV - code coverage report
Current view: top level - control_plane/src/bin - neon_local.rs (source / functions) Coverage Total Hit
Test: 49aa928ec5b4b510172d8b5c6d154da28e70a46c.info Lines: 0.0 % 1035 0
Test Date: 2024-11-13 18:23:39 Functions: 0.0 % 471 0

            Line data    Source code
       1              : //!
       2              : //! `neon_local` is an executable that can be used to create a local
       3              : //! Neon environment, for testing purposes. The local environment is
       4              : //! quite different from the cloud environment with Kubernetes, but it
       5              : //! easier to work with locally. The python tests in `test_runner`
       6              : //! rely on `neon_local` to set up the environment for each test.
       7              : //!
       8              : use anyhow::{anyhow, bail, Context, Result};
       9              : use clap::Parser;
      10              : use compute_api::spec::ComputeMode;
      11              : use control_plane::endpoint::ComputeControlPlane;
      12              : use control_plane::local_env::{
      13              :     InitForceMode, LocalEnv, NeonBroker, NeonLocalInitConf, NeonLocalInitPageserverConf,
      14              :     SafekeeperConf,
      15              : };
      16              : use control_plane::pageserver::PageServerNode;
      17              : use control_plane::safekeeper::SafekeeperNode;
      18              : use control_plane::storage_controller::{
      19              :     NeonStorageControllerStartArgs, NeonStorageControllerStopArgs, StorageController,
      20              : };
      21              : use control_plane::{broker, local_env};
      22              : use pageserver_api::config::{
      23              :     DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
      24              :     DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT,
      25              : };
      26              : use pageserver_api::controller_api::{
      27              :     NodeAvailabilityWrapper, PlacementPolicy, TenantCreateRequest,
      28              : };
      29              : use pageserver_api::models::{ShardParameters, TimelineCreateRequest, TimelineInfo};
      30              : use pageserver_api::shard::{ShardCount, ShardStripeSize, TenantShardId};
      31              : use postgres_backend::AuthType;
      32              : use postgres_connection::parse_host_port;
      33              : use safekeeper_api::{
      34              :     DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT,
      35              :     DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT,
      36              : };
      37              : use std::borrow::Cow;
      38              : use std::collections::{BTreeSet, HashMap};
      39              : use std::path::PathBuf;
      40              : use std::process::exit;
      41              : use std::str::FromStr;
      42              : use std::time::Duration;
      43              : use storage_broker::DEFAULT_LISTEN_ADDR as DEFAULT_BROKER_ADDR;
      44              : use tokio::task::JoinSet;
      45              : use url::Host;
      46              : use utils::{
      47              :     auth::{Claims, Scope},
      48              :     id::{NodeId, TenantId, TenantTimelineId, TimelineId},
      49              :     lsn::Lsn,
      50              :     project_git_version,
      51              : };
      52              : 
      53              : // Default id of a safekeeper node, if not specified on the command line.
      54              : const DEFAULT_SAFEKEEPER_ID: NodeId = NodeId(1);
      55              : const DEFAULT_PAGESERVER_ID: NodeId = NodeId(1);
      56              : const DEFAULT_BRANCH_NAME: &str = "main";
      57              : project_git_version!(GIT_VERSION);
      58              : 
      59              : const DEFAULT_PG_VERSION: u32 = 16;
      60              : 
      61              : const DEFAULT_PAGESERVER_CONTROL_PLANE_API: &str = "http://127.0.0.1:1234/upcall/v1/";
      62              : 
      63            0 : #[derive(clap::Parser)]
      64              : #[command(version = GIT_VERSION, about, name = "Neon CLI")]
      65              : struct Cli {
      66              :     #[command(subcommand)]
      67              :     command: NeonLocalCmd,
      68              : }
      69              : 
      70            0 : #[derive(clap::Subcommand)]
      71              : enum NeonLocalCmd {
      72              :     Init(InitCmdArgs),
      73              : 
      74              :     #[command(subcommand)]
      75              :     Tenant(TenantCmd),
      76              :     #[command(subcommand)]
      77              :     Timeline(TimelineCmd),
      78              :     #[command(subcommand)]
      79              :     Pageserver(PageserverCmd),
      80              :     #[command(subcommand)]
      81              :     #[clap(alias = "storage_controller")]
      82              :     StorageController(StorageControllerCmd),
      83              :     #[command(subcommand)]
      84              :     #[clap(alias = "storage_broker")]
      85              :     StorageBroker(StorageBrokerCmd),
      86              :     #[command(subcommand)]
      87              :     Safekeeper(SafekeeperCmd),
      88              :     #[command(subcommand)]
      89              :     Endpoint(EndpointCmd),
      90              :     #[command(subcommand)]
      91              :     Mappings(MappingsCmd),
      92              : 
      93              :     Start(StartCmdArgs),
      94              :     Stop(StopCmdArgs),
      95              : }
      96              : 
      97            0 : #[derive(clap::Args)]
      98              : #[clap(about = "Initialize a new Neon repository, preparing configs for services to start with")]
      99              : struct InitCmdArgs {
     100              :     #[clap(long, help("How many pageservers to create (default 1)"))]
     101              :     num_pageservers: Option<u16>,
     102              : 
     103              :     #[clap(long)]
     104              :     config: Option<PathBuf>,
     105              : 
     106              :     #[clap(long, help("Force initialization even if the repository is not empty"))]
     107              :     #[arg(value_parser)]
     108              :     #[clap(default_value = "must-not-exist")]
     109            0 :     force: InitForceMode,
     110              : }
     111              : 
     112            0 : #[derive(clap::Args)]
     113              : #[clap(about = "Start pageserver and safekeepers")]
     114              : struct StartCmdArgs {
     115              :     #[clap(long = "start-timeout", default_value = "10s")]
     116            0 :     timeout: humantime::Duration,
     117              : }
     118              : 
     119            0 : #[derive(clap::Args)]
     120              : #[clap(about = "Stop pageserver and safekeepers")]
     121              : struct StopCmdArgs {
     122              :     #[arg(value_enum)]
     123            0 :     #[clap(long, default_value_t = StopMode::Fast)]
     124            0 :     mode: StopMode,
     125              : }
     126              : 
     127            0 : #[derive(Clone, Copy, clap::ValueEnum)]
     128              : enum StopMode {
     129              :     Fast,
     130              :     Immediate,
     131              : }
     132              : 
     133            0 : #[derive(clap::Subcommand)]
     134              : #[clap(about = "Manage tenants")]
     135              : enum TenantCmd {
     136              :     List,
     137              :     Create(TenantCreateCmdArgs),
     138              :     SetDefault(TenantSetDefaultCmdArgs),
     139              :     Config(TenantConfigCmdArgs),
     140              :     Import(TenantImportCmdArgs),
     141              : }
     142              : 
     143            0 : #[derive(clap::Args)]
     144              : struct TenantCreateCmdArgs {
     145              :     #[clap(
     146              :         long = "tenant-id",
     147              :         help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
     148              :     )]
     149              :     tenant_id: Option<TenantId>,
     150              : 
     151              :     #[clap(
     152              :         long,
     153              :         help = "Use a specific timeline id when creating a tenant and its initial timeline"
     154              :     )]
     155              :     timeline_id: Option<TimelineId>,
     156              : 
     157              :     #[clap(short = 'c')]
     158            0 :     config: Vec<String>,
     159              : 
     160            0 :     #[arg(default_value_t = DEFAULT_PG_VERSION)]
     161              :     #[clap(long, help = "Postgres version to use for the initial timeline")]
     162            0 :     pg_version: u32,
     163              : 
     164              :     #[clap(
     165              :         long,
     166              :         help = "Use this tenant in future CLI commands where tenant_id is needed, but not specified"
     167              :     )]
     168            0 :     set_default: bool,
     169              : 
     170              :     #[clap(long, help = "Number of shards in the new tenant")]
     171            0 :     #[arg(default_value_t = 0)]
     172            0 :     shard_count: u8,
     173              :     #[clap(long, help = "Sharding stripe size in pages")]
     174              :     shard_stripe_size: Option<u32>,
     175              : 
     176              :     #[clap(long, help = "Placement policy shards in this tenant")]
     177              :     #[arg(value_parser = parse_placement_policy)]
     178              :     placement_policy: Option<PlacementPolicy>,
     179              : }
     180              : 
     181            0 : fn parse_placement_policy(s: &str) -> anyhow::Result<PlacementPolicy> {
     182            0 :     Ok(serde_json::from_str::<PlacementPolicy>(s)?)
     183            0 : }
     184              : 
     185            0 : #[derive(clap::Args)]
     186              : #[clap(
     187              :     about = "Set a particular tenant as default in future CLI commands where tenant_id is needed, but not specified"
     188              : )]
     189              : struct TenantSetDefaultCmdArgs {
     190              :     #[clap(
     191              :         long = "tenant-id",
     192              :         help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
     193              :     )]
     194            0 :     tenant_id: TenantId,
     195              : }
     196              : 
     197            0 : #[derive(clap::Args)]
     198              : struct TenantConfigCmdArgs {
     199              :     #[clap(
     200              :         long = "tenant-id",
     201              :         help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
     202              :     )]
     203              :     tenant_id: Option<TenantId>,
     204              : 
     205              :     #[clap(short = 'c')]
     206            0 :     config: Vec<String>,
     207              : }
     208              : 
     209            0 : #[derive(clap::Args)]
     210              : #[clap(
     211              :     about = "Import a tenant that is present in remote storage, and create branches for its timelines"
     212              : )]
     213              : struct TenantImportCmdArgs {
     214              :     #[clap(
     215              :         long = "tenant-id",
     216              :         help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
     217              :     )]
     218            0 :     tenant_id: TenantId,
     219              : }
     220              : 
     221            0 : #[derive(clap::Subcommand)]
     222              : #[clap(about = "Manage timelines")]
     223              : enum TimelineCmd {
     224              :     List(TimelineListCmdArgs),
     225              :     Branch(TimelineBranchCmdArgs),
     226              :     Create(TimelineCreateCmdArgs),
     227              :     Import(TimelineImportCmdArgs),
     228              : }
     229              : 
     230            0 : #[derive(clap::Args)]
     231              : #[clap(about = "List all timelines available to this pageserver")]
     232              : struct TimelineListCmdArgs {
     233              :     #[clap(
     234              :         long = "tenant-id",
     235              :         help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
     236              :     )]
     237              :     tenant_shard_id: Option<TenantShardId>,
     238              : }
     239              : 
     240            0 : #[derive(clap::Args)]
     241              : #[clap(about = "Create a new timeline, branching off from another timeline")]
     242              : struct TimelineBranchCmdArgs {
     243              :     #[clap(
     244              :         long = "tenant-id",
     245              :         help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
     246              :     )]
     247              :     tenant_id: Option<TenantId>,
     248              : 
     249              :     #[clap(long, help = "New timeline's ID")]
     250              :     timeline_id: Option<TimelineId>,
     251              : 
     252              :     #[clap(long, help = "Human-readable alias for the new timeline")]
     253            0 :     branch_name: String,
     254              : 
     255              :     #[clap(
     256              :         long,
     257              :         help = "Use last Lsn of another timeline (and its data) as base when creating the new timeline. The timeline gets resolved by its branch name."
     258              :     )]
     259              :     ancestor_branch_name: Option<String>,
     260              : 
     261              :     #[clap(
     262              :         long,
     263              :         help = "When using another timeline as base, use a specific Lsn in it instead of the latest one"
     264              :     )]
     265              :     ancestor_start_lsn: Option<Lsn>,
     266              : }
     267              : 
     268            0 : #[derive(clap::Args)]
     269              : #[clap(about = "Create a new blank timeline")]
     270              : struct TimelineCreateCmdArgs {
     271              :     #[clap(
     272              :         long = "tenant-id",
     273              :         help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
     274              :     )]
     275              :     tenant_id: Option<TenantId>,
     276              : 
     277              :     #[clap(long, help = "New timeline's ID")]
     278              :     timeline_id: Option<TimelineId>,
     279              : 
     280              :     #[clap(long, help = "Human-readable alias for the new timeline")]
     281            0 :     branch_name: String,
     282              : 
     283            0 :     #[arg(default_value_t = DEFAULT_PG_VERSION)]
     284              :     #[clap(long, help = "Postgres version")]
     285            0 :     pg_version: u32,
     286              : }
     287              : 
     288            0 : #[derive(clap::Args)]
     289              : #[clap(about = "Import timeline from a basebackup directory")]
     290              : struct TimelineImportCmdArgs {
     291              :     #[clap(
     292              :         long = "tenant-id",
     293              :         help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
     294              :     )]
     295              :     tenant_id: Option<TenantId>,
     296              : 
     297              :     #[clap(long, help = "New timeline's ID")]
     298            0 :     timeline_id: TimelineId,
     299              : 
     300              :     #[clap(long, help = "Human-readable alias for the new timeline")]
     301            0 :     branch_name: String,
     302              : 
     303              :     #[clap(long, help = "Basebackup tarfile to import")]
     304            0 :     base_tarfile: PathBuf,
     305              : 
     306              :     #[clap(long, help = "Lsn the basebackup starts at")]
     307            0 :     base_lsn: Lsn,
     308              : 
     309              :     #[clap(long, help = "Wal to add after base")]
     310              :     wal_tarfile: Option<PathBuf>,
     311              : 
     312              :     #[clap(long, help = "Lsn the basebackup ends at")]
     313              :     end_lsn: Option<Lsn>,
     314              : 
     315            0 :     #[arg(default_value_t = DEFAULT_PG_VERSION)]
     316              :     #[clap(long, help = "Postgres version of the backup being imported")]
     317            0 :     pg_version: u32,
     318              : }
     319              : 
     320            0 : #[derive(clap::Subcommand)]
     321              : #[clap(about = "Manage pageservers")]
     322              : enum PageserverCmd {
     323              :     Status(PageserverStatusCmdArgs),
     324              :     Start(PageserverStartCmdArgs),
     325              :     Stop(PageserverStopCmdArgs),
     326              :     Restart(PageserverRestartCmdArgs),
     327              : }
     328              : 
     329            0 : #[derive(clap::Args)]
     330              : #[clap(about = "Show status of a local pageserver")]
     331              : struct PageserverStatusCmdArgs {
     332              :     #[clap(long = "id", help = "pageserver id")]
     333              :     pageserver_id: Option<NodeId>,
     334              : }
     335              : 
     336            0 : #[derive(clap::Args)]
     337              : #[clap(about = "Start local pageserver")]
     338              : struct PageserverStartCmdArgs {
     339              :     #[clap(long = "id", help = "pageserver id")]
     340              :     pageserver_id: Option<NodeId>,
     341              : 
     342              :     #[clap(short = 't', long, help = "timeout until we fail the command")]
     343              :     #[arg(default_value = "10s")]
     344            0 :     start_timeout: humantime::Duration,
     345              : }
     346              : 
     347            0 : #[derive(clap::Args)]
     348              : #[clap(about = "Stop local pageserver")]
     349              : struct PageserverStopCmdArgs {
     350              :     #[clap(long = "id", help = "pageserver id")]
     351              :     pageserver_id: Option<NodeId>,
     352              : 
     353              :     #[clap(
     354              :         short = 'm',
     355              :         help = "If 'immediate', don't flush repository data at shutdown"
     356              :     )]
     357              :     #[arg(value_enum, default_value = "fast")]
     358            0 :     stop_mode: StopMode,
     359              : }
     360              : 
     361            0 : #[derive(clap::Args)]
     362              : #[clap(about = "Restart local pageserver")]
     363              : struct PageserverRestartCmdArgs {
     364              :     #[clap(long = "id", help = "pageserver id")]
     365              :     pageserver_id: Option<NodeId>,
     366              : 
     367              :     #[clap(short = 't', long, help = "timeout until we fail the command")]
     368              :     #[arg(default_value = "10s")]
     369            0 :     start_timeout: humantime::Duration,
     370              : }
     371              : 
     372            0 : #[derive(clap::Subcommand)]
     373              : #[clap(about = "Manage storage controller")]
     374              : enum StorageControllerCmd {
     375              :     Start(StorageControllerStartCmdArgs),
     376              :     Stop(StorageControllerStopCmdArgs),
     377              : }
     378              : 
     379            0 : #[derive(clap::Args)]
     380              : #[clap(about = "Start storage controller")]
     381              : struct StorageControllerStartCmdArgs {
     382              :     #[clap(short = 't', long, help = "timeout until we fail the command")]
     383              :     #[arg(default_value = "10s")]
     384            0 :     start_timeout: humantime::Duration,
     385              : 
     386              :     #[clap(
     387              :         long,
     388              :         help = "Identifier used to distinguish storage controller instances"
     389              :     )]
     390            0 :     #[arg(default_value_t = 1)]
     391            0 :     instance_id: u8,
     392              : 
     393              :     #[clap(
     394              :         long,
     395              :         help = "Base port for the storage controller instance idenfified by instance-id (defaults to pageserver cplane api)"
     396              :     )]
     397              :     base_port: Option<u16>,
     398              : }
     399              : 
     400            0 : #[derive(clap::Args)]
     401              : #[clap(about = "Stop storage controller")]
     402              : struct StorageControllerStopCmdArgs {
     403              :     #[clap(
     404              :         short = 'm',
     405              :         help = "If 'immediate', don't flush repository data at shutdown"
     406              :     )]
     407              :     #[arg(value_enum, default_value = "fast")]
     408            0 :     stop_mode: StopMode,
     409              : 
     410              :     #[clap(
     411              :         long,
     412              :         help = "Identifier used to distinguish storage controller instances"
     413              :     )]
     414            0 :     #[arg(default_value_t = 1)]
     415            0 :     instance_id: u8,
     416              : }
     417              : 
     418            0 : #[derive(clap::Subcommand)]
     419              : #[clap(about = "Manage storage broker")]
     420              : enum StorageBrokerCmd {
     421              :     Start(StorageBrokerStartCmdArgs),
     422              :     Stop(StorageBrokerStopCmdArgs),
     423              : }
     424              : 
     425            0 : #[derive(clap::Args)]
     426              : #[clap(about = "Start broker")]
     427              : struct StorageBrokerStartCmdArgs {
     428              :     #[clap(short = 't', long, help = "timeout until we fail the command")]
     429              :     #[arg(default_value = "10s")]
     430            0 :     start_timeout: humantime::Duration,
     431              : }
     432              : 
     433            0 : #[derive(clap::Args)]
     434              : #[clap(about = "stop broker")]
     435              : struct StorageBrokerStopCmdArgs {
     436              :     #[clap(
     437              :         short = 'm',
     438              :         help = "If 'immediate', don't flush repository data at shutdown"
     439              :     )]
     440              :     #[arg(value_enum, default_value = "fast")]
     441            0 :     stop_mode: StopMode,
     442              : }
     443              : 
     444            0 : #[derive(clap::Subcommand)]
     445              : #[clap(about = "Manage safekeepers")]
     446              : enum SafekeeperCmd {
     447              :     Start(SafekeeperStartCmdArgs),
     448              :     Stop(SafekeeperStopCmdArgs),
     449              :     Restart(SafekeeperRestartCmdArgs),
     450              : }
     451              : 
     452            0 : #[derive(clap::Args)]
     453              : #[clap(about = "Start local safekeeper")]
     454              : struct SafekeeperStartCmdArgs {
     455              :     #[clap(help = "safekeeper id")]
     456            0 :     #[arg(default_value_t = NodeId(1))]
     457            0 :     id: NodeId,
     458              : 
     459              :     #[clap(
     460              :         short = 'e',
     461              :         long = "safekeeper-extra-opt",
     462              :         help = "Additional safekeeper invocation options, e.g. -e=--http-auth-public-key-path=foo"
     463              :     )]
     464            0 :     extra_opt: Vec<String>,
     465              : 
     466              :     #[clap(short = 't', long, help = "timeout until we fail the command")]
     467              :     #[arg(default_value = "10s")]
     468            0 :     start_timeout: humantime::Duration,
     469              : }
     470              : 
     471            0 : #[derive(clap::Args)]
     472              : #[clap(about = "Stop local safekeeper")]
     473              : struct SafekeeperStopCmdArgs {
     474              :     #[clap(help = "safekeeper id")]
     475            0 :     #[arg(default_value_t = NodeId(1))]
     476            0 :     id: NodeId,
     477              : 
     478              :     #[arg(value_enum, default_value = "fast")]
     479              :     #[clap(
     480              :         short = 'm',
     481              :         help = "If 'immediate', don't flush repository data at shutdown"
     482              :     )]
     483            0 :     stop_mode: StopMode,
     484              : }
     485              : 
     486            0 : #[derive(clap::Args)]
     487              : #[clap(about = "Restart local safekeeper")]
     488              : struct SafekeeperRestartCmdArgs {
     489              :     #[clap(help = "safekeeper id")]
     490            0 :     #[arg(default_value_t = NodeId(1))]
     491            0 :     id: NodeId,
     492              : 
     493              :     #[arg(value_enum, default_value = "fast")]
     494              :     #[clap(
     495              :         short = 'm',
     496              :         help = "If 'immediate', don't flush repository data at shutdown"
     497              :     )]
     498            0 :     stop_mode: StopMode,
     499              : 
     500              :     #[clap(
     501              :         short = 'e',
     502              :         long = "safekeeper-extra-opt",
     503              :         help = "Additional safekeeper invocation options, e.g. -e=--http-auth-public-key-path=foo"
     504              :     )]
     505            0 :     extra_opt: Vec<String>,
     506              : 
     507              :     #[clap(short = 't', long, help = "timeout until we fail the command")]
     508              :     #[arg(default_value = "10s")]
     509            0 :     start_timeout: humantime::Duration,
     510              : }
     511              : 
     512            0 : #[derive(clap::Subcommand)]
     513              : #[clap(about = "Manage Postgres instances")]
     514              : enum EndpointCmd {
     515              :     List(EndpointListCmdArgs),
     516              :     Create(EndpointCreateCmdArgs),
     517              :     Start(EndpointStartCmdArgs),
     518              :     Reconfigure(EndpointReconfigureCmdArgs),
     519              :     Stop(EndpointStopCmdArgs),
     520              : }
     521              : 
     522            0 : #[derive(clap::Args)]
     523              : #[clap(about = "List endpoints")]
     524              : struct EndpointListCmdArgs {
     525              :     #[clap(
     526              :         long = "tenant-id",
     527              :         help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
     528              :     )]
     529              :     tenant_shard_id: Option<TenantShardId>,
     530              : }
     531              : 
     532            0 : #[derive(clap::Args)]
     533              : #[clap(about = "Create a compute endpoint")]
     534              : struct EndpointCreateCmdArgs {
     535              :     #[clap(
     536              :         long = "tenant-id",
     537              :         help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
     538              :     )]
     539              :     tenant_id: Option<TenantId>,
     540              : 
     541              :     #[clap(help = "Postgres endpoint id")]
     542              :     endpoint_id: Option<String>,
     543              :     #[clap(long, help = "Name of the branch the endpoint will run on")]
     544              :     branch_name: Option<String>,
     545              :     #[clap(
     546              :         long,
     547              :         help = "Specify Lsn on the timeline to start from. By default, end of the timeline would be used"
     548              :     )]
     549              :     lsn: Option<Lsn>,
     550              :     #[clap(long)]
     551              :     pg_port: Option<u16>,
     552              :     #[clap(long)]
     553              :     http_port: Option<u16>,
     554              :     #[clap(long = "pageserver-id")]
     555              :     endpoint_pageserver_id: Option<NodeId>,
     556              : 
     557              :     #[clap(
     558              :         long,
     559              :         help = "Don't do basebackup, create endpoint directory with only config files",
     560              :         action = clap::ArgAction::Set,
     561            0 :         default_value_t = false
     562              :     )]
     563            0 :     config_only: bool,
     564              : 
     565            0 :     #[arg(default_value_t = DEFAULT_PG_VERSION)]
     566              :     #[clap(long, help = "Postgres version")]
     567            0 :     pg_version: u32,
     568              : 
     569              :     #[clap(
     570              :         long,
     571              :         help = "If set, the node will be a hot replica on the specified timeline",
     572              :         action = clap::ArgAction::Set,
     573            0 :         default_value_t = false
     574              :     )]
     575            0 :     hot_standby: bool,
     576              : 
     577              :     #[clap(long, help = "If set, will set up the catalog for neon_superuser")]
     578            0 :     update_catalog: bool,
     579              : 
     580              :     #[clap(
     581              :         long,
     582              :         help = "Allow multiple primary endpoints running on the same branch. Shouldn't be used normally, but useful for tests."
     583              :     )]
     584            0 :     allow_multiple: bool,
     585              : }
     586              : 
     587            0 : #[derive(clap::Args)]
     588              : #[clap(about = "Start postgres. If the endpoint doesn't exist yet, it is created.")]
     589              : struct EndpointStartCmdArgs {
     590              :     #[clap(help = "Postgres endpoint id")]
     591            0 :     endpoint_id: String,
     592              :     #[clap(long = "pageserver-id")]
     593              :     endpoint_pageserver_id: Option<NodeId>,
     594              : 
     595              :     #[clap(long)]
     596              :     safekeepers: Option<String>,
     597              : 
     598              :     #[clap(
     599              :         long,
     600              :         help = "Configure the remote extensions storage proxy gateway to request for extensions."
     601              :     )]
     602              :     remote_ext_config: Option<String>,
     603              : 
     604              :     #[clap(
     605              :         long,
     606              :         help = "If set, will create test user `user` and `neondb` database. Requires `update-catalog = true`"
     607              :     )]
     608            0 :     create_test_user: bool,
     609              : 
     610              :     #[clap(
     611              :         long,
     612              :         help = "Allow multiple primary endpoints running on the same branch. Shouldn't be used normally, but useful for tests."
     613              :     )]
     614            0 :     allow_multiple: bool,
     615              : 
     616              :     #[clap(short = 't', long, help = "timeout until we fail the command")]
     617              :     #[arg(default_value = "10s")]
     618            0 :     start_timeout: humantime::Duration,
     619              : }
     620              : 
     621            0 : #[derive(clap::Args)]
     622              : #[clap(about = "Reconfigure an endpoint")]
     623              : struct EndpointReconfigureCmdArgs {
     624              :     #[clap(
     625              :         long = "tenant-id",
     626              :         help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
     627              :     )]
     628              :     tenant_id: Option<TenantId>,
     629              : 
     630              :     #[clap(help = "Postgres endpoint id")]
     631            0 :     endpoint_id: String,
     632              :     #[clap(long = "pageserver-id")]
     633              :     endpoint_pageserver_id: Option<NodeId>,
     634              : 
     635              :     #[clap(long)]
     636              :     safekeepers: Option<String>,
     637              : }
     638              : 
     639            0 : #[derive(clap::Args)]
     640              : #[clap(about = "Stop an endpoint")]
     641              : struct EndpointStopCmdArgs {
     642              :     #[clap(help = "Postgres endpoint id")]
     643            0 :     endpoint_id: String,
     644              : 
     645              :     #[clap(
     646              :         long,
     647              :         help = "Also delete data directory (now optional, should be default in future)"
     648              :     )]
     649            0 :     destroy: bool,
     650              : 
     651              :     #[clap(long, help = "Postgres shutdown mode, passed to \"pg_ctl -m <mode>\"")]
     652              :     #[arg(value_parser(["smart", "fast", "immediate"]))]
     653              :     #[arg(default_value = "fast")]
     654            0 :     mode: String,
     655              : }
     656              : 
     657            0 : #[derive(clap::Subcommand)]
     658              : #[clap(about = "Manage neon_local branch name mappings")]
     659              : enum MappingsCmd {
     660              :     Map(MappingsMapCmdArgs),
     661              : }
     662              : 
     663            0 : #[derive(clap::Args)]
     664              : #[clap(about = "Create new mapping which cannot exist already")]
     665              : struct MappingsMapCmdArgs {
     666              :     #[clap(
     667              :         long,
     668              :         help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
     669              :     )]
     670            0 :     tenant_id: TenantId,
     671              :     #[clap(
     672              :         long,
     673              :         help = "Timeline id. Represented as a hexadecimal string 32 symbols length"
     674              :     )]
     675            0 :     timeline_id: TimelineId,
     676              :     #[clap(long, help = "Branch name to give to the timeline")]
     677            0 :     branch_name: String,
     678              : }
     679              : 
     680              : ///
     681              : /// Timelines tree element used as a value in the HashMap.
     682              : ///
     683              : struct TimelineTreeEl {
     684              :     /// `TimelineInfo` received from the `pageserver` via the `timeline_list` http API call.
     685              :     pub info: TimelineInfo,
     686              :     /// Name, recovered from neon config mappings
     687              :     pub name: Option<String>,
     688              :     /// Holds all direct children of this timeline referenced using `timeline_id`.
     689              :     pub children: BTreeSet<TimelineId>,
     690              : }
     691              : 
     692              : // Main entry point for the 'neon_local' CLI utility
     693              : //
     694              : // This utility helps to manage neon installation. That includes following:
     695              : //   * Management of local postgres installations running on top of the
     696              : //     pageserver.
     697              : //   * Providing CLI api to the pageserver
     698              : //   * TODO: export/import to/from usual postgres
     699            0 : fn main() -> Result<()> {
     700            0 :     let cli = Cli::parse();
     701              : 
     702              :     // Check for 'neon init' command first.
     703            0 :     let subcommand_result = if let NeonLocalCmd::Init(args) = cli.command {
     704            0 :         handle_init(&args).map(|env| Some(Cow::Owned(env)))
     705              :     } else {
     706              :         // all other commands need an existing config
     707            0 :         let env = LocalEnv::load_config(&local_env::base_path()).context("Error loading config")?;
     708            0 :         let original_env = env.clone();
     709            0 :         let env = Box::leak(Box::new(env));
     710            0 :         let rt = tokio::runtime::Builder::new_current_thread()
     711            0 :             .enable_all()
     712            0 :             .build()
     713            0 :             .unwrap();
     714              : 
     715            0 :         let subcommand_result = match cli.command {
     716            0 :             NeonLocalCmd::Init(_) => unreachable!("init was handled earlier already"),
     717            0 :             NeonLocalCmd::Start(args) => rt.block_on(handle_start_all(&args, env)),
     718            0 :             NeonLocalCmd::Stop(args) => rt.block_on(handle_stop_all(&args, env)),
     719            0 :             NeonLocalCmd::Tenant(subcmd) => rt.block_on(handle_tenant(&subcmd, env)),
     720            0 :             NeonLocalCmd::Timeline(subcmd) => rt.block_on(handle_timeline(&subcmd, env)),
     721            0 :             NeonLocalCmd::Pageserver(subcmd) => rt.block_on(handle_pageserver(&subcmd, env)),
     722            0 :             NeonLocalCmd::StorageController(subcmd) => {
     723            0 :                 rt.block_on(handle_storage_controller(&subcmd, env))
     724              :             }
     725            0 :             NeonLocalCmd::StorageBroker(subcmd) => rt.block_on(handle_storage_broker(&subcmd, env)),
     726            0 :             NeonLocalCmd::Safekeeper(subcmd) => rt.block_on(handle_safekeeper(&subcmd, env)),
     727            0 :             NeonLocalCmd::Endpoint(subcmd) => rt.block_on(handle_endpoint(&subcmd, env)),
     728            0 :             NeonLocalCmd::Mappings(subcmd) => handle_mappings(&subcmd, env),
     729              :         };
     730              : 
     731            0 :         if &original_env != env {
     732            0 :             subcommand_result.map(|()| Some(Cow::Borrowed(env)))
     733              :         } else {
     734            0 :             subcommand_result.map(|()| None)
     735              :         }
     736              :     };
     737              : 
     738            0 :     match subcommand_result {
     739            0 :         Ok(Some(updated_env)) => updated_env.persist_config()?,
     740            0 :         Ok(None) => (),
     741            0 :         Err(e) => {
     742            0 :             eprintln!("command failed: {e:?}");
     743            0 :             exit(1);
     744              :         }
     745              :     }
     746            0 :     Ok(())
     747            0 : }
     748              : 
     749              : ///
     750              : /// Prints timelines list as a tree-like structure.
     751              : ///
     752            0 : fn print_timelines_tree(
     753            0 :     timelines: Vec<TimelineInfo>,
     754            0 :     mut timeline_name_mappings: HashMap<TenantTimelineId, String>,
     755            0 : ) -> Result<()> {
     756            0 :     let mut timelines_hash = timelines
     757            0 :         .iter()
     758            0 :         .map(|t| {
     759            0 :             (
     760            0 :                 t.timeline_id,
     761            0 :                 TimelineTreeEl {
     762            0 :                     info: t.clone(),
     763            0 :                     children: BTreeSet::new(),
     764            0 :                     name: timeline_name_mappings
     765            0 :                         .remove(&TenantTimelineId::new(t.tenant_id.tenant_id, t.timeline_id)),
     766            0 :                 },
     767            0 :             )
     768            0 :         })
     769            0 :         .collect::<HashMap<_, _>>();
     770              : 
     771              :     // Memorize all direct children of each timeline.
     772            0 :     for timeline in timelines.iter() {
     773            0 :         if let Some(ancestor_timeline_id) = timeline.ancestor_timeline_id {
     774            0 :             timelines_hash
     775            0 :                 .get_mut(&ancestor_timeline_id)
     776            0 :                 .context("missing timeline info in the HashMap")?
     777              :                 .children
     778            0 :                 .insert(timeline.timeline_id);
     779            0 :         }
     780              :     }
     781              : 
     782            0 :     for timeline in timelines_hash.values() {
     783              :         // Start with root local timelines (no ancestors) first.
     784            0 :         if timeline.info.ancestor_timeline_id.is_none() {
     785            0 :             print_timeline(0, &Vec::from([true]), timeline, &timelines_hash)?;
     786            0 :         }
     787              :     }
     788              : 
     789            0 :     Ok(())
     790            0 : }
     791              : 
     792              : ///
     793              : /// Recursively prints timeline info with all its children.
     794              : ///
     795            0 : fn print_timeline(
     796            0 :     nesting_level: usize,
     797            0 :     is_last: &[bool],
     798            0 :     timeline: &TimelineTreeEl,
     799            0 :     timelines: &HashMap<TimelineId, TimelineTreeEl>,
     800            0 : ) -> Result<()> {
     801            0 :     if nesting_level > 0 {
     802            0 :         let ancestor_lsn = match timeline.info.ancestor_lsn {
     803            0 :             Some(lsn) => lsn.to_string(),
     804            0 :             None => "Unknown Lsn".to_string(),
     805              :         };
     806              : 
     807            0 :         let mut br_sym = "┣━";
     808            0 : 
     809            0 :         // Draw each nesting padding with proper style
     810            0 :         // depending on whether its timeline ended or not.
     811            0 :         if nesting_level > 1 {
     812            0 :             for l in &is_last[1..is_last.len() - 1] {
     813            0 :                 if *l {
     814            0 :                     print!("   ");
     815            0 :                 } else {
     816            0 :                     print!("┃  ");
     817            0 :                 }
     818              :             }
     819            0 :         }
     820              : 
     821              :         // We are the last in this sub-timeline
     822            0 :         if *is_last.last().unwrap() {
     823            0 :             br_sym = "┗━";
     824            0 :         }
     825              : 
     826            0 :         print!("{} @{}: ", br_sym, ancestor_lsn);
     827            0 :     }
     828              : 
     829              :     // Finally print a timeline id and name with new line
     830            0 :     println!(
     831            0 :         "{} [{}]",
     832            0 :         timeline.name.as_deref().unwrap_or("_no_name_"),
     833            0 :         timeline.info.timeline_id
     834            0 :     );
     835            0 : 
     836            0 :     let len = timeline.children.len();
     837            0 :     let mut i: usize = 0;
     838            0 :     let mut is_last_new = Vec::from(is_last);
     839            0 :     is_last_new.push(false);
     840              : 
     841            0 :     for child in &timeline.children {
     842            0 :         i += 1;
     843            0 : 
     844            0 :         // Mark that the last padding is the end of the timeline
     845            0 :         if i == len {
     846            0 :             if let Some(last) = is_last_new.last_mut() {
     847            0 :                 *last = true;
     848            0 :             }
     849            0 :         }
     850              : 
     851              :         print_timeline(
     852            0 :             nesting_level + 1,
     853            0 :             &is_last_new,
     854            0 :             timelines
     855            0 :                 .get(child)
     856            0 :                 .context("missing timeline info in the HashMap")?,
     857            0 :             timelines,
     858            0 :         )?;
     859              :     }
     860              : 
     861            0 :     Ok(())
     862            0 : }
     863              : 
     864              : /// Returns a map of timeline IDs to timeline_id@lsn strings.
     865              : /// Connects to the pageserver to query this information.
     866            0 : async fn get_timeline_infos(
     867            0 :     env: &local_env::LocalEnv,
     868            0 :     tenant_shard_id: &TenantShardId,
     869            0 : ) -> Result<HashMap<TimelineId, TimelineInfo>> {
     870            0 :     Ok(get_default_pageserver(env)
     871            0 :         .timeline_list(tenant_shard_id)
     872            0 :         .await?
     873            0 :         .into_iter()
     874            0 :         .map(|timeline_info| (timeline_info.timeline_id, timeline_info))
     875            0 :         .collect())
     876            0 : }
     877              : 
     878              : /// Helper function to get tenant id from an optional --tenant_id option or from the config file
     879            0 : fn get_tenant_id(
     880            0 :     tenant_id_arg: Option<TenantId>,
     881            0 :     env: &local_env::LocalEnv,
     882            0 : ) -> anyhow::Result<TenantId> {
     883            0 :     if let Some(tenant_id_from_arguments) = tenant_id_arg {
     884            0 :         Ok(tenant_id_from_arguments)
     885            0 :     } else if let Some(default_id) = env.default_tenant_id {
     886            0 :         Ok(default_id)
     887              :     } else {
     888            0 :         anyhow::bail!("No tenant id. Use --tenant-id, or set a default tenant");
     889              :     }
     890            0 : }
     891              : 
     892              : /// Helper function to get tenant-shard ID from an optional --tenant_id option or from the config file,
     893              : /// for commands that accept a shard suffix
     894            0 : fn get_tenant_shard_id(
     895            0 :     tenant_shard_id_arg: Option<TenantShardId>,
     896            0 :     env: &local_env::LocalEnv,
     897            0 : ) -> anyhow::Result<TenantShardId> {
     898            0 :     if let Some(tenant_id_from_arguments) = tenant_shard_id_arg {
     899            0 :         Ok(tenant_id_from_arguments)
     900            0 :     } else if let Some(default_id) = env.default_tenant_id {
     901            0 :         Ok(TenantShardId::unsharded(default_id))
     902              :     } else {
     903            0 :         anyhow::bail!("No tenant shard id. Use --tenant-id, or set a default tenant");
     904              :     }
     905            0 : }
     906              : 
     907            0 : fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
     908              :     // Create the in-memory `LocalEnv` that we'd normally load from disk in `load_config`.
     909            0 :     let init_conf: NeonLocalInitConf = if let Some(config_path) = &args.config {
     910              :         // User (likely the Python test suite) provided a description of the environment.
     911            0 :         if args.num_pageservers.is_some() {
     912            0 :             bail!("Cannot specify both --num-pageservers and --config, use key `pageservers` in the --config file instead");
     913            0 :         }
     914              :         // load and parse the file
     915            0 :         let contents = std::fs::read_to_string(config_path).with_context(|| {
     916            0 :             format!(
     917            0 :                 "Could not read configuration file '{}'",
     918            0 :                 config_path.display()
     919            0 :             )
     920            0 :         })?;
     921            0 :         toml_edit::de::from_str(&contents)?
     922              :     } else {
     923              :         // User (likely interactive) did not provide a description of the environment, give them the default
     924            0 :         NeonLocalInitConf {
     925            0 :             control_plane_api: Some(Some(DEFAULT_PAGESERVER_CONTROL_PLANE_API.parse().unwrap())),
     926            0 :             broker: NeonBroker {
     927            0 :                 listen_addr: DEFAULT_BROKER_ADDR.parse().unwrap(),
     928            0 :             },
     929            0 :             safekeepers: vec![SafekeeperConf {
     930            0 :                 id: DEFAULT_SAFEKEEPER_ID,
     931            0 :                 pg_port: DEFAULT_SAFEKEEPER_PG_PORT,
     932            0 :                 http_port: DEFAULT_SAFEKEEPER_HTTP_PORT,
     933            0 :                 ..Default::default()
     934            0 :             }],
     935            0 :             pageservers: (0..args.num_pageservers.unwrap_or(1))
     936            0 :                 .map(|i| {
     937            0 :                     let pageserver_id = NodeId(DEFAULT_PAGESERVER_ID.0 + i as u64);
     938            0 :                     let pg_port = DEFAULT_PAGESERVER_PG_PORT + i;
     939            0 :                     let http_port = DEFAULT_PAGESERVER_HTTP_PORT + i;
     940            0 :                     NeonLocalInitPageserverConf {
     941            0 :                         id: pageserver_id,
     942            0 :                         listen_pg_addr: format!("127.0.0.1:{pg_port}"),
     943            0 :                         listen_http_addr: format!("127.0.0.1:{http_port}"),
     944            0 :                         pg_auth_type: AuthType::Trust,
     945            0 :                         http_auth_type: AuthType::Trust,
     946            0 :                         other: Default::default(),
     947            0 :                         // Typical developer machines use disks with slow fsync, and we don't care
     948            0 :                         // about data integrity: disable disk syncs.
     949            0 :                         no_sync: true,
     950            0 :                     }
     951            0 :                 })
     952            0 :                 .collect(),
     953            0 :             pg_distrib_dir: None,
     954            0 :             neon_distrib_dir: None,
     955            0 :             default_tenant_id: TenantId::from_array(std::array::from_fn(|_| 0)),
     956            0 :             storage_controller: None,
     957            0 :             control_plane_compute_hook_api: None,
     958            0 :         }
     959              :     };
     960              : 
     961            0 :     LocalEnv::init(init_conf, &args.force)
     962            0 :         .context("materialize initial neon_local environment on disk")?;
     963            0 :     Ok(LocalEnv::load_config(&local_env::base_path())
     964            0 :         .expect("freshly written config should be loadable"))
     965            0 : }
     966              : 
     967              : /// The default pageserver is the one where CLI tenant/timeline operations are sent by default.
     968              : /// For typical interactive use, one would just run with a single pageserver.  Scenarios with
     969              : /// tenant/timeline placement across multiple pageservers are managed by python test code rather
     970              : /// than this CLI.
     971            0 : fn get_default_pageserver(env: &local_env::LocalEnv) -> PageServerNode {
     972            0 :     let ps_conf = env
     973            0 :         .pageservers
     974            0 :         .first()
     975            0 :         .expect("Config is validated to contain at least one pageserver");
     976            0 :     PageServerNode::from_env(env, ps_conf)
     977            0 : }
     978              : 
     979            0 : async fn handle_tenant(subcmd: &TenantCmd, env: &mut local_env::LocalEnv) -> anyhow::Result<()> {
     980            0 :     let pageserver = get_default_pageserver(env);
     981            0 :     match subcmd {
     982              :         TenantCmd::List => {
     983            0 :             for t in pageserver.tenant_list().await? {
     984            0 :                 println!("{} {:?}", t.id, t.state);
     985            0 :             }
     986              :         }
     987            0 :         TenantCmd::Import(args) => {
     988            0 :             let tenant_id = args.tenant_id;
     989            0 : 
     990            0 :             let storage_controller = StorageController::from_env(env);
     991            0 :             let create_response = storage_controller.tenant_import(tenant_id).await?;
     992              : 
     993            0 :             let shard_zero = create_response
     994            0 :                 .shards
     995            0 :                 .first()
     996            0 :                 .expect("Import response omitted shards");
     997            0 : 
     998            0 :             let attached_pageserver_id = shard_zero.node_id;
     999            0 :             let pageserver =
    1000            0 :                 PageServerNode::from_env(env, env.get_pageserver_conf(attached_pageserver_id)?);
    1001              : 
    1002            0 :             println!(
    1003            0 :                 "Imported tenant {tenant_id}, attached to pageserver {attached_pageserver_id}"
    1004            0 :             );
    1005              : 
    1006            0 :             let timelines = pageserver
    1007            0 :                 .http_client
    1008            0 :                 .list_timelines(shard_zero.shard_id)
    1009            0 :                 .await?;
    1010              : 
    1011              :             // Pick a 'main' timeline that has no ancestors, the rest will get arbitrary names
    1012            0 :             let main_timeline = timelines
    1013            0 :                 .iter()
    1014            0 :                 .find(|t| t.ancestor_timeline_id.is_none())
    1015            0 :                 .expect("No timelines found")
    1016            0 :                 .timeline_id;
    1017            0 : 
    1018            0 :             let mut branch_i = 0;
    1019            0 :             for timeline in timelines.iter() {
    1020            0 :                 let branch_name = if timeline.timeline_id == main_timeline {
    1021            0 :                     "main".to_string()
    1022              :                 } else {
    1023            0 :                     branch_i += 1;
    1024            0 :                     format!("branch_{branch_i}")
    1025              :                 };
    1026              : 
    1027            0 :                 println!(
    1028            0 :                     "Importing timeline {tenant_id}/{} as branch {branch_name}",
    1029            0 :                     timeline.timeline_id
    1030            0 :                 );
    1031            0 : 
    1032            0 :                 env.register_branch_mapping(branch_name, tenant_id, timeline.timeline_id)?;
    1033              :             }
    1034              :         }
    1035            0 :         TenantCmd::Create(args) => {
    1036            0 :             let tenant_conf: HashMap<_, _> =
    1037            0 :                 args.config.iter().flat_map(|c| c.split_once(':')).collect();
    1038              : 
    1039            0 :             let tenant_conf = PageServerNode::parse_config(tenant_conf)?;
    1040              : 
    1041              :             // If tenant ID was not specified, generate one
    1042            0 :             let tenant_id = args.tenant_id.unwrap_or_else(TenantId::generate);
    1043            0 : 
    1044            0 :             // We must register the tenant with the storage controller, so
    1045            0 :             // that when the pageserver restarts, it will be re-attached.
    1046            0 :             let storage_controller = StorageController::from_env(env);
    1047            0 :             storage_controller
    1048            0 :                 .tenant_create(TenantCreateRequest {
    1049            0 :                     // Note that ::unsharded here isn't actually because the tenant is unsharded, its because the
    1050            0 :                     // storage controller expects a shard-naive tenant_id in this attribute, and the TenantCreateRequest
    1051            0 :                     // type is used both in the storage controller (for creating tenants) and in the pageserver (for
    1052            0 :                     // creating shards)
    1053            0 :                     new_tenant_id: TenantShardId::unsharded(tenant_id),
    1054            0 :                     generation: None,
    1055            0 :                     shard_parameters: ShardParameters {
    1056            0 :                         count: ShardCount::new(args.shard_count),
    1057            0 :                         stripe_size: args
    1058            0 :                             .shard_stripe_size
    1059            0 :                             .map(ShardStripeSize)
    1060            0 :                             .unwrap_or(ShardParameters::DEFAULT_STRIPE_SIZE),
    1061            0 :                     },
    1062            0 :                     placement_policy: args.placement_policy.clone(),
    1063            0 :                     config: tenant_conf,
    1064            0 :                 })
    1065            0 :                 .await?;
    1066            0 :             println!("tenant {tenant_id} successfully created on the pageserver");
    1067            0 : 
    1068            0 :             // Create an initial timeline for the new tenant
    1069            0 :             let new_timeline_id = args.timeline_id.unwrap_or(TimelineId::generate());
    1070            0 : 
    1071            0 :             // FIXME: passing None for ancestor_start_lsn is not kosher in a sharded world: we can't have
    1072            0 :             // different shards picking different start lsns.  Maybe we have to teach storage controller
    1073            0 :             // to let shard 0 branch first and then propagate the chosen LSN to other shards.
    1074            0 :             storage_controller
    1075            0 :                 .tenant_timeline_create(
    1076            0 :                     tenant_id,
    1077            0 :                     TimelineCreateRequest {
    1078            0 :                         new_timeline_id,
    1079            0 :                         mode: pageserver_api::models::TimelineCreateRequestMode::Bootstrap {
    1080            0 :                             existing_initdb_timeline_id: None,
    1081            0 :                             pg_version: Some(args.pg_version),
    1082            0 :                         },
    1083            0 :                     },
    1084            0 :                 )
    1085            0 :                 .await?;
    1086              : 
    1087            0 :             env.register_branch_mapping(
    1088            0 :                 DEFAULT_BRANCH_NAME.to_string(),
    1089            0 :                 tenant_id,
    1090            0 :                 new_timeline_id,
    1091            0 :             )?;
    1092              : 
    1093            0 :             println!("Created an initial timeline '{new_timeline_id}' for tenant: {tenant_id}",);
    1094            0 : 
    1095            0 :             if args.set_default {
    1096            0 :                 println!("Setting tenant {tenant_id} as a default one");
    1097            0 :                 env.default_tenant_id = Some(tenant_id);
    1098            0 :             }
    1099              :         }
    1100            0 :         TenantCmd::SetDefault(args) => {
    1101            0 :             println!("Setting tenant {} as a default one", args.tenant_id);
    1102            0 :             env.default_tenant_id = Some(args.tenant_id);
    1103            0 :         }
    1104            0 :         TenantCmd::Config(args) => {
    1105            0 :             let tenant_id = get_tenant_id(args.tenant_id, env)?;
    1106            0 :             let tenant_conf: HashMap<_, _> =
    1107            0 :                 args.config.iter().flat_map(|c| c.split_once(':')).collect();
    1108            0 : 
    1109            0 :             pageserver
    1110            0 :                 .tenant_config(tenant_id, tenant_conf)
    1111            0 :                 .await
    1112            0 :                 .with_context(|| format!("Tenant config failed for tenant with id {tenant_id}"))?;
    1113            0 :             println!("tenant {tenant_id} successfully configured on the pageserver");
    1114              :         }
    1115              :     }
    1116            0 :     Ok(())
    1117            0 : }
    1118              : 
    1119            0 : async fn handle_timeline(cmd: &TimelineCmd, env: &mut local_env::LocalEnv) -> Result<()> {
    1120            0 :     let pageserver = get_default_pageserver(env);
    1121            0 : 
    1122            0 :     match cmd {
    1123            0 :         TimelineCmd::List(args) => {
    1124              :             // TODO(sharding): this command shouldn't have to specify a shard ID: we should ask the storage controller
    1125              :             // where shard 0 is attached, and query there.
    1126            0 :             let tenant_shard_id = get_tenant_shard_id(args.tenant_shard_id, env)?;
    1127            0 :             let timelines = pageserver.timeline_list(&tenant_shard_id).await?;
    1128            0 :             print_timelines_tree(timelines, env.timeline_name_mappings())?;
    1129              :         }
    1130            0 :         TimelineCmd::Create(args) => {
    1131            0 :             let tenant_id = get_tenant_id(args.tenant_id, env)?;
    1132            0 :             let new_branch_name = &args.branch_name;
    1133            0 :             let new_timeline_id_opt = args.timeline_id;
    1134            0 :             let new_timeline_id = new_timeline_id_opt.unwrap_or(TimelineId::generate());
    1135            0 : 
    1136            0 :             let storage_controller = StorageController::from_env(env);
    1137            0 :             let create_req = TimelineCreateRequest {
    1138            0 :                 new_timeline_id,
    1139            0 :                 mode: pageserver_api::models::TimelineCreateRequestMode::Bootstrap {
    1140            0 :                     existing_initdb_timeline_id: None,
    1141            0 :                     pg_version: Some(args.pg_version),
    1142            0 :                 },
    1143            0 :             };
    1144            0 :             let timeline_info = storage_controller
    1145            0 :                 .tenant_timeline_create(tenant_id, create_req)
    1146            0 :                 .await?;
    1147              : 
    1148            0 :             let last_record_lsn = timeline_info.last_record_lsn;
    1149            0 :             env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?;
    1150              : 
    1151            0 :             println!(
    1152            0 :                 "Created timeline '{}' at Lsn {last_record_lsn} for tenant: {tenant_id}",
    1153            0 :                 timeline_info.timeline_id
    1154            0 :             );
    1155              :         }
    1156            0 :         TimelineCmd::Import(args) => {
    1157            0 :             let tenant_id = get_tenant_id(args.tenant_id, env)?;
    1158            0 :             let timeline_id = args.timeline_id;
    1159            0 :             let branch_name = &args.branch_name;
    1160            0 : 
    1161            0 :             // Parse base inputs
    1162            0 :             let base = (args.base_lsn, args.base_tarfile.clone());
    1163            0 : 
    1164            0 :             // Parse pg_wal inputs
    1165            0 :             let wal_tarfile = args.wal_tarfile.clone();
    1166            0 :             let end_lsn = args.end_lsn;
    1167            0 :             // TODO validate both or none are provided
    1168            0 :             let pg_wal = end_lsn.zip(wal_tarfile);
    1169            0 : 
    1170            0 :             println!("Importing timeline into pageserver ...");
    1171            0 :             pageserver
    1172            0 :                 .timeline_import(tenant_id, timeline_id, base, pg_wal, args.pg_version)
    1173            0 :                 .await?;
    1174            0 :             env.register_branch_mapping(branch_name.to_string(), tenant_id, timeline_id)?;
    1175            0 :             println!("Done");
    1176              :         }
    1177            0 :         TimelineCmd::Branch(args) => {
    1178            0 :             let tenant_id = get_tenant_id(args.tenant_id, env)?;
    1179            0 :             let new_timeline_id = args.timeline_id.unwrap_or(TimelineId::generate());
    1180            0 :             let new_branch_name = &args.branch_name;
    1181            0 :             let ancestor_branch_name = args
    1182            0 :                 .ancestor_branch_name
    1183            0 :                 .clone()
    1184            0 :                 .unwrap_or(DEFAULT_BRANCH_NAME.to_owned());
    1185            0 :             let ancestor_timeline_id = env
    1186            0 :                 .get_branch_timeline_id(&ancestor_branch_name, tenant_id)
    1187            0 :                 .ok_or_else(|| {
    1188            0 :                     anyhow!("Found no timeline id for branch name '{ancestor_branch_name}'")
    1189            0 :                 })?;
    1190              : 
    1191            0 :             let start_lsn = args.ancestor_start_lsn;
    1192            0 :             let storage_controller = StorageController::from_env(env);
    1193            0 :             let create_req = TimelineCreateRequest {
    1194            0 :                 new_timeline_id,
    1195            0 :                 mode: pageserver_api::models::TimelineCreateRequestMode::Branch {
    1196            0 :                     ancestor_timeline_id,
    1197            0 :                     ancestor_start_lsn: start_lsn,
    1198            0 :                     pg_version: None,
    1199            0 :                 },
    1200            0 :             };
    1201            0 :             let timeline_info = storage_controller
    1202            0 :                 .tenant_timeline_create(tenant_id, create_req)
    1203            0 :                 .await?;
    1204              : 
    1205            0 :             let last_record_lsn = timeline_info.last_record_lsn;
    1206            0 : 
    1207            0 :             env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?;
    1208              : 
    1209            0 :             println!(
    1210            0 :                 "Created timeline '{}' at Lsn {last_record_lsn} for tenant: {tenant_id}. Ancestor timeline: '{ancestor_branch_name}'",
    1211            0 :                 timeline_info.timeline_id
    1212            0 :             );
    1213              :         }
    1214              :     }
    1215              : 
    1216            0 :     Ok(())
    1217            0 : }
    1218              : 
    1219            0 : async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Result<()> {
    1220            0 :     let mut cplane = ComputeControlPlane::load(env.clone())?;
    1221              : 
    1222            0 :     match subcmd {
    1223            0 :         EndpointCmd::List(args) => {
    1224              :             // TODO(sharding): this command shouldn't have to specify a shard ID: we should ask the storage controller
    1225              :             // where shard 0 is attached, and query there.
    1226            0 :             let tenant_shard_id = get_tenant_shard_id(args.tenant_shard_id, env)?;
    1227            0 :             let timeline_infos = get_timeline_infos(env, &tenant_shard_id)
    1228            0 :                 .await
    1229            0 :                 .unwrap_or_else(|e| {
    1230            0 :                     eprintln!("Failed to load timeline info: {}", e);
    1231            0 :                     HashMap::new()
    1232            0 :                 });
    1233            0 : 
    1234            0 :             let timeline_name_mappings = env.timeline_name_mappings();
    1235            0 : 
    1236            0 :             let mut table = comfy_table::Table::new();
    1237            0 : 
    1238            0 :             table.load_preset(comfy_table::presets::NOTHING);
    1239            0 : 
    1240            0 :             table.set_header([
    1241            0 :                 "ENDPOINT",
    1242            0 :                 "ADDRESS",
    1243            0 :                 "TIMELINE",
    1244            0 :                 "BRANCH NAME",
    1245            0 :                 "LSN",
    1246            0 :                 "STATUS",
    1247            0 :             ]);
    1248              : 
    1249            0 :             for (endpoint_id, endpoint) in cplane
    1250            0 :                 .endpoints
    1251            0 :                 .iter()
    1252            0 :                 .filter(|(_, endpoint)| endpoint.tenant_id == tenant_shard_id.tenant_id)
    1253              :             {
    1254            0 :                 let lsn_str = match endpoint.mode {
    1255            0 :                     ComputeMode::Static(lsn) => {
    1256            0 :                         // -> read-only endpoint
    1257            0 :                         // Use the node's LSN.
    1258            0 :                         lsn.to_string()
    1259              :                     }
    1260              :                     _ => {
    1261              :                         // -> primary endpoint or hot replica
    1262              :                         // Use the LSN at the end of the timeline.
    1263            0 :                         timeline_infos
    1264            0 :                             .get(&endpoint.timeline_id)
    1265            0 :                             .map(|bi| bi.last_record_lsn.to_string())
    1266            0 :                             .unwrap_or_else(|| "?".to_string())
    1267              :                     }
    1268              :                 };
    1269              : 
    1270            0 :                 let branch_name = timeline_name_mappings
    1271            0 :                     .get(&TenantTimelineId::new(
    1272            0 :                         tenant_shard_id.tenant_id,
    1273            0 :                         endpoint.timeline_id,
    1274            0 :                     ))
    1275            0 :                     .map(|name| name.as_str())
    1276            0 :                     .unwrap_or("?");
    1277            0 : 
    1278            0 :                 table.add_row([
    1279            0 :                     endpoint_id.as_str(),
    1280            0 :                     &endpoint.pg_address.to_string(),
    1281            0 :                     &endpoint.timeline_id.to_string(),
    1282            0 :                     branch_name,
    1283            0 :                     lsn_str.as_str(),
    1284            0 :                     &format!("{}", endpoint.status()),
    1285            0 :                 ]);
    1286            0 :             }
    1287              : 
    1288            0 :             println!("{table}");
    1289              :         }
    1290            0 :         EndpointCmd::Create(args) => {
    1291            0 :             let tenant_id = get_tenant_id(args.tenant_id, env)?;
    1292            0 :             let branch_name = args
    1293            0 :                 .branch_name
    1294            0 :                 .clone()
    1295            0 :                 .unwrap_or(DEFAULT_BRANCH_NAME.to_owned());
    1296            0 :             let endpoint_id = args
    1297            0 :                 .endpoint_id
    1298            0 :                 .clone()
    1299            0 :                 .unwrap_or_else(|| format!("ep-{branch_name}"));
    1300              : 
    1301            0 :             let timeline_id = env
    1302            0 :                 .get_branch_timeline_id(&branch_name, tenant_id)
    1303            0 :                 .ok_or_else(|| anyhow!("Found no timeline id for branch name '{branch_name}'"))?;
    1304              : 
    1305            0 :             let mode = match (args.lsn, args.hot_standby) {
    1306            0 :                 (Some(lsn), false) => ComputeMode::Static(lsn),
    1307            0 :                 (None, true) => ComputeMode::Replica,
    1308            0 :                 (None, false) => ComputeMode::Primary,
    1309            0 :                 (Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"),
    1310              :             };
    1311              : 
    1312            0 :             match (mode, args.hot_standby) {
    1313              :                 (ComputeMode::Static(_), true) => {
    1314            0 :                     bail!("Cannot start a node in hot standby mode when it is already configured as a static replica")
    1315              :                 }
    1316              :                 (ComputeMode::Primary, true) => {
    1317            0 :                     bail!("Cannot start a node as a hot standby replica, it is already configured as primary node")
    1318              :                 }
    1319            0 :                 _ => {}
    1320            0 :             }
    1321            0 : 
    1322            0 :             if !args.allow_multiple {
    1323            0 :                 cplane.check_conflicting_endpoints(mode, tenant_id, timeline_id)?;
    1324            0 :             }
    1325              : 
    1326            0 :             cplane.new_endpoint(
    1327            0 :                 &endpoint_id,
    1328            0 :                 tenant_id,
    1329            0 :                 timeline_id,
    1330            0 :                 args.pg_port,
    1331            0 :                 args.http_port,
    1332            0 :                 args.pg_version,
    1333            0 :                 mode,
    1334            0 :                 !args.update_catalog,
    1335            0 :             )?;
    1336              :         }
    1337            0 :         EndpointCmd::Start(args) => {
    1338            0 :             let endpoint_id = &args.endpoint_id;
    1339            0 :             let pageserver_id = args.endpoint_pageserver_id;
    1340            0 :             let remote_ext_config = &args.remote_ext_config;
    1341              : 
    1342              :             // If --safekeepers argument is given, use only the listed
    1343              :             // safekeeper nodes; otherwise all from the env.
    1344            0 :             let safekeepers = if let Some(safekeepers) = parse_safekeepers(&args.safekeepers)? {
    1345            0 :                 safekeepers
    1346              :             } else {
    1347            0 :                 env.safekeepers.iter().map(|sk| sk.id).collect()
    1348              :             };
    1349              : 
    1350            0 :             let endpoint = cplane
    1351            0 :                 .endpoints
    1352            0 :                 .get(endpoint_id.as_str())
    1353            0 :                 .ok_or_else(|| anyhow::anyhow!("endpoint {endpoint_id} not found"))?;
    1354              : 
    1355            0 :             if !args.allow_multiple {
    1356            0 :                 cplane.check_conflicting_endpoints(
    1357            0 :                     endpoint.mode,
    1358            0 :                     endpoint.tenant_id,
    1359            0 :                     endpoint.timeline_id,
    1360            0 :                 )?;
    1361            0 :             }
    1362              : 
    1363            0 :             let (pageservers, stripe_size) = if let Some(pageserver_id) = pageserver_id {
    1364            0 :                 let conf = env.get_pageserver_conf(pageserver_id).unwrap();
    1365            0 :                 let parsed = parse_host_port(&conf.listen_pg_addr).expect("Bad config");
    1366            0 :                 (
    1367            0 :                     vec![(parsed.0, parsed.1.unwrap_or(5432))],
    1368            0 :                     // If caller is telling us what pageserver to use, this is not a tenant which is
    1369            0 :                     // full managed by storage controller, therefore not sharded.
    1370            0 :                     ShardParameters::DEFAULT_STRIPE_SIZE,
    1371            0 :                 )
    1372              :             } else {
    1373              :                 // Look up the currently attached location of the tenant, and its striping metadata,
    1374              :                 // to pass these on to postgres.
    1375            0 :                 let storage_controller = StorageController::from_env(env);
    1376            0 :                 let locate_result = storage_controller.tenant_locate(endpoint.tenant_id).await?;
    1377            0 :                 let pageservers = futures::future::try_join_all(
    1378            0 :                     locate_result.shards.into_iter().map(|shard| async move {
    1379            0 :                         if let ComputeMode::Static(lsn) = endpoint.mode {
    1380              :                             // Initialize LSN leases for static computes.
    1381            0 :                             let conf = env.get_pageserver_conf(shard.node_id).unwrap();
    1382            0 :                             let pageserver = PageServerNode::from_env(env, conf);
    1383            0 : 
    1384            0 :                             pageserver
    1385            0 :                                 .http_client
    1386            0 :                                 .timeline_init_lsn_lease(shard.shard_id, endpoint.timeline_id, lsn)
    1387            0 :                                 .await?;
    1388            0 :                         }
    1389              : 
    1390            0 :                         anyhow::Ok((
    1391            0 :                             Host::parse(&shard.listen_pg_addr)
    1392            0 :                                 .expect("Storage controller reported bad hostname"),
    1393            0 :                             shard.listen_pg_port,
    1394            0 :                         ))
    1395            0 :                     }),
    1396            0 :                 )
    1397            0 :                 .await?;
    1398            0 :                 let stripe_size = locate_result.shard_params.stripe_size;
    1399            0 : 
    1400            0 :                 (pageservers, stripe_size)
    1401              :             };
    1402            0 :             assert!(!pageservers.is_empty());
    1403              : 
    1404            0 :             let ps_conf = env.get_pageserver_conf(DEFAULT_PAGESERVER_ID)?;
    1405            0 :             let auth_token = if matches!(ps_conf.pg_auth_type, AuthType::NeonJWT) {
    1406            0 :                 let claims = Claims::new(Some(endpoint.tenant_id), Scope::Tenant);
    1407            0 : 
    1408            0 :                 Some(env.generate_auth_token(&claims)?)
    1409              :             } else {
    1410            0 :                 None
    1411              :             };
    1412              : 
    1413            0 :             println!("Starting existing endpoint {endpoint_id}...");
    1414            0 :             endpoint
    1415            0 :                 .start(
    1416            0 :                     &auth_token,
    1417            0 :                     safekeepers,
    1418            0 :                     pageservers,
    1419            0 :                     remote_ext_config.as_ref(),
    1420            0 :                     stripe_size.0 as usize,
    1421            0 :                     args.create_test_user,
    1422            0 :                 )
    1423            0 :                 .await?;
    1424              :         }
    1425            0 :         EndpointCmd::Reconfigure(args) => {
    1426            0 :             let endpoint_id = &args.endpoint_id;
    1427            0 :             let endpoint = cplane
    1428            0 :                 .endpoints
    1429            0 :                 .get(endpoint_id.as_str())
    1430            0 :                 .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
    1431            0 :             let pageservers = if let Some(ps_id) = args.endpoint_pageserver_id {
    1432            0 :                 let pageserver = PageServerNode::from_env(env, env.get_pageserver_conf(ps_id)?);
    1433            0 :                 vec![(
    1434            0 :                     pageserver.pg_connection_config.host().clone(),
    1435            0 :                     pageserver.pg_connection_config.port(),
    1436            0 :                 )]
    1437              :             } else {
    1438            0 :                 let storage_controller = StorageController::from_env(env);
    1439            0 :                 storage_controller
    1440            0 :                     .tenant_locate(endpoint.tenant_id)
    1441            0 :                     .await?
    1442              :                     .shards
    1443            0 :                     .into_iter()
    1444            0 :                     .map(|shard| {
    1445            0 :                         (
    1446            0 :                             Host::parse(&shard.listen_pg_addr)
    1447            0 :                                 .expect("Storage controller reported malformed host"),
    1448            0 :                             shard.listen_pg_port,
    1449            0 :                         )
    1450            0 :                     })
    1451            0 :                     .collect::<Vec<_>>()
    1452              :             };
    1453              :             // If --safekeepers argument is given, use only the listed
    1454              :             // safekeeper nodes; otherwise all from the env.
    1455            0 :             let safekeepers = parse_safekeepers(&args.safekeepers)?;
    1456            0 :             endpoint.reconfigure(pageservers, None, safekeepers).await?;
    1457              :         }
    1458            0 :         EndpointCmd::Stop(args) => {
    1459            0 :             let endpoint_id = &args.endpoint_id;
    1460            0 :             let endpoint = cplane
    1461            0 :                 .endpoints
    1462            0 :                 .get(endpoint_id)
    1463            0 :                 .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
    1464            0 :             endpoint.stop(&args.mode, args.destroy)?;
    1465              :         }
    1466              :     }
    1467              : 
    1468            0 :     Ok(())
    1469            0 : }
    1470              : 
    1471              : /// Parse --safekeepers as list of safekeeper ids.
    1472            0 : fn parse_safekeepers(safekeepers_str: &Option<String>) -> Result<Option<Vec<NodeId>>> {
    1473            0 :     if let Some(safekeepers_str) = safekeepers_str {
    1474            0 :         let mut safekeepers: Vec<NodeId> = Vec::new();
    1475            0 :         for sk_id in safekeepers_str.split(',').map(str::trim) {
    1476            0 :             let sk_id = NodeId(
    1477            0 :                 u64::from_str(sk_id)
    1478            0 :                     .map_err(|_| anyhow!("invalid node ID \"{sk_id}\" in --safekeepers list"))?,
    1479              :             );
    1480            0 :             safekeepers.push(sk_id);
    1481              :         }
    1482            0 :         Ok(Some(safekeepers))
    1483              :     } else {
    1484            0 :         Ok(None)
    1485              :     }
    1486            0 : }
    1487              : 
    1488            0 : fn handle_mappings(subcmd: &MappingsCmd, env: &mut local_env::LocalEnv) -> Result<()> {
    1489            0 :     match subcmd {
    1490            0 :         MappingsCmd::Map(args) => {
    1491            0 :             env.register_branch_mapping(
    1492            0 :                 args.branch_name.to_owned(),
    1493            0 :                 args.tenant_id,
    1494            0 :                 args.timeline_id,
    1495            0 :             )?;
    1496              : 
    1497            0 :             Ok(())
    1498              :         }
    1499              :     }
    1500            0 : }
    1501              : 
    1502            0 : fn get_pageserver(
    1503            0 :     env: &local_env::LocalEnv,
    1504            0 :     pageserver_id_arg: Option<NodeId>,
    1505            0 : ) -> Result<PageServerNode> {
    1506            0 :     let node_id = pageserver_id_arg.unwrap_or(DEFAULT_PAGESERVER_ID);
    1507            0 : 
    1508            0 :     Ok(PageServerNode::from_env(
    1509            0 :         env,
    1510            0 :         env.get_pageserver_conf(node_id)?,
    1511              :     ))
    1512            0 : }
    1513              : 
    1514            0 : async fn handle_pageserver(subcmd: &PageserverCmd, env: &local_env::LocalEnv) -> Result<()> {
    1515            0 :     match subcmd {
    1516            0 :         PageserverCmd::Start(args) => {
    1517            0 :             if let Err(e) = get_pageserver(env, args.pageserver_id)?
    1518            0 :                 .start(&args.start_timeout)
    1519            0 :                 .await
    1520              :             {
    1521            0 :                 eprintln!("pageserver start failed: {e}");
    1522            0 :                 exit(1);
    1523            0 :             }
    1524              :         }
    1525              : 
    1526            0 :         PageserverCmd::Stop(args) => {
    1527            0 :             let immediate = match args.stop_mode {
    1528            0 :                 StopMode::Fast => false,
    1529            0 :                 StopMode::Immediate => true,
    1530              :             };
    1531            0 :             if let Err(e) = get_pageserver(env, args.pageserver_id)?.stop(immediate) {
    1532            0 :                 eprintln!("pageserver stop failed: {}", e);
    1533            0 :                 exit(1);
    1534            0 :             }
    1535              :         }
    1536              : 
    1537            0 :         PageserverCmd::Restart(args) => {
    1538            0 :             let pageserver = get_pageserver(env, args.pageserver_id)?;
    1539              :             //TODO what shutdown strategy should we use here?
    1540            0 :             if let Err(e) = pageserver.stop(false) {
    1541            0 :                 eprintln!("pageserver stop failed: {}", e);
    1542            0 :                 exit(1);
    1543            0 :             }
    1544              : 
    1545            0 :             if let Err(e) = pageserver.start(&args.start_timeout).await {
    1546            0 :                 eprintln!("pageserver start failed: {e}");
    1547            0 :                 exit(1);
    1548            0 :             }
    1549              :         }
    1550              : 
    1551            0 :         PageserverCmd::Status(args) => {
    1552            0 :             match get_pageserver(env, args.pageserver_id)?
    1553            0 :                 .check_status()
    1554            0 :                 .await
    1555              :             {
    1556            0 :                 Ok(_) => println!("Page server is up and running"),
    1557            0 :                 Err(err) => {
    1558            0 :                     eprintln!("Page server is not available: {}", err);
    1559            0 :                     exit(1);
    1560              :                 }
    1561              :             }
    1562              :         }
    1563              :     }
    1564            0 :     Ok(())
    1565            0 : }
    1566              : 
    1567            0 : async fn handle_storage_controller(
    1568            0 :     subcmd: &StorageControllerCmd,
    1569            0 :     env: &local_env::LocalEnv,
    1570            0 : ) -> Result<()> {
    1571            0 :     let svc = StorageController::from_env(env);
    1572            0 :     match subcmd {
    1573            0 :         StorageControllerCmd::Start(args) => {
    1574            0 :             let start_args = NeonStorageControllerStartArgs {
    1575            0 :                 instance_id: args.instance_id,
    1576            0 :                 base_port: args.base_port,
    1577            0 :                 start_timeout: args.start_timeout,
    1578            0 :             };
    1579              : 
    1580            0 :             if let Err(e) = svc.start(start_args).await {
    1581            0 :                 eprintln!("start failed: {e}");
    1582            0 :                 exit(1);
    1583            0 :             }
    1584              :         }
    1585              : 
    1586            0 :         StorageControllerCmd::Stop(args) => {
    1587            0 :             let stop_args = NeonStorageControllerStopArgs {
    1588            0 :                 instance_id: args.instance_id,
    1589            0 :                 immediate: match args.stop_mode {
    1590            0 :                     StopMode::Fast => false,
    1591            0 :                     StopMode::Immediate => true,
    1592              :                 },
    1593              :             };
    1594            0 :             if let Err(e) = svc.stop(stop_args).await {
    1595            0 :                 eprintln!("stop failed: {}", e);
    1596            0 :                 exit(1);
    1597            0 :             }
    1598              :         }
    1599              :     }
    1600            0 :     Ok(())
    1601            0 : }
    1602              : 
    1603            0 : fn get_safekeeper(env: &local_env::LocalEnv, id: NodeId) -> Result<SafekeeperNode> {
    1604            0 :     if let Some(node) = env.safekeepers.iter().find(|node| node.id == id) {
    1605            0 :         Ok(SafekeeperNode::from_env(env, node))
    1606              :     } else {
    1607            0 :         bail!("could not find safekeeper {id}")
    1608              :     }
    1609            0 : }
    1610              : 
    1611            0 : async fn handle_safekeeper(subcmd: &SafekeeperCmd, env: &local_env::LocalEnv) -> Result<()> {
    1612            0 :     match subcmd {
    1613            0 :         SafekeeperCmd::Start(args) => {
    1614            0 :             let safekeeper = get_safekeeper(env, args.id)?;
    1615              : 
    1616            0 :             if let Err(e) = safekeeper.start(&args.extra_opt, &args.start_timeout).await {
    1617            0 :                 eprintln!("safekeeper start failed: {}", e);
    1618            0 :                 exit(1);
    1619            0 :             }
    1620              :         }
    1621              : 
    1622            0 :         SafekeeperCmd::Stop(args) => {
    1623            0 :             let safekeeper = get_safekeeper(env, args.id)?;
    1624            0 :             let immediate = match args.stop_mode {
    1625            0 :                 StopMode::Fast => false,
    1626            0 :                 StopMode::Immediate => true,
    1627              :             };
    1628            0 :             if let Err(e) = safekeeper.stop(immediate) {
    1629            0 :                 eprintln!("safekeeper stop failed: {}", e);
    1630            0 :                 exit(1);
    1631            0 :             }
    1632              :         }
    1633              : 
    1634            0 :         SafekeeperCmd::Restart(args) => {
    1635            0 :             let safekeeper = get_safekeeper(env, args.id)?;
    1636            0 :             let immediate = match args.stop_mode {
    1637            0 :                 StopMode::Fast => false,
    1638            0 :                 StopMode::Immediate => true,
    1639              :             };
    1640              : 
    1641            0 :             if let Err(e) = safekeeper.stop(immediate) {
    1642            0 :                 eprintln!("safekeeper stop failed: {}", e);
    1643            0 :                 exit(1);
    1644            0 :             }
    1645              : 
    1646            0 :             if let Err(e) = safekeeper.start(&args.extra_opt, &args.start_timeout).await {
    1647            0 :                 eprintln!("safekeeper start failed: {}", e);
    1648            0 :                 exit(1);
    1649            0 :             }
    1650              :         }
    1651              :     }
    1652            0 :     Ok(())
    1653            0 : }
    1654              : 
    1655            0 : async fn handle_storage_broker(subcmd: &StorageBrokerCmd, env: &local_env::LocalEnv) -> Result<()> {
    1656            0 :     match subcmd {
    1657            0 :         StorageBrokerCmd::Start(args) => {
    1658            0 :             if let Err(e) = broker::start_broker_process(env, &args.start_timeout).await {
    1659            0 :                 eprintln!("broker start failed: {e}");
    1660            0 :                 exit(1);
    1661            0 :             }
    1662              :         }
    1663              : 
    1664            0 :         StorageBrokerCmd::Stop(_args) => {
    1665              :             // FIXME: stop_mode unused
    1666            0 :             if let Err(e) = broker::stop_broker_process(env) {
    1667            0 :                 eprintln!("broker stop failed: {e}");
    1668            0 :                 exit(1);
    1669            0 :             }
    1670              :         }
    1671              :     }
    1672            0 :     Ok(())
    1673            0 : }
    1674              : 
    1675            0 : async fn handle_start_all(
    1676            0 :     args: &StartCmdArgs,
    1677            0 :     env: &'static local_env::LocalEnv,
    1678            0 : ) -> anyhow::Result<()> {
    1679              :     // FIXME: this was called "retry_timeout", is it right?
    1680            0 :     let Err(errors) = handle_start_all_impl(env, args.timeout).await else {
    1681            0 :         neon_start_status_check(env, args.timeout.as_ref())
    1682            0 :             .await
    1683            0 :             .context("status check after successful startup of all services")?;
    1684            0 :         return Ok(());
    1685              :     };
    1686              : 
    1687            0 :     eprintln!("startup failed because one or more services could not be started");
    1688              : 
    1689            0 :     for e in errors {
    1690            0 :         eprintln!("{e}");
    1691            0 :         let debug_repr = format!("{e:?}");
    1692            0 :         for line in debug_repr.lines() {
    1693            0 :             eprintln!("  {line}");
    1694            0 :         }
    1695              :     }
    1696              : 
    1697            0 :     try_stop_all(env, true).await;
    1698              : 
    1699            0 :     exit(2);
    1700            0 : }
    1701              : 
    1702              : /// Returns Ok() if and only if all services could be started successfully.
    1703              : /// Otherwise, returns the list of errors that occurred during startup.
    1704            0 : async fn handle_start_all_impl(
    1705            0 :     env: &'static local_env::LocalEnv,
    1706            0 :     retry_timeout: humantime::Duration,
    1707            0 : ) -> Result<(), Vec<anyhow::Error>> {
    1708            0 :     // Endpoints are not started automatically
    1709            0 : 
    1710            0 :     let mut js = JoinSet::new();
    1711              : 
    1712              :     // force infalliblity through closure
    1713              :     #[allow(clippy::redundant_closure_call)]
    1714            0 :     (|| {
    1715            0 :         js.spawn(async move {
    1716            0 :             let retry_timeout = retry_timeout;
    1717            0 :             broker::start_broker_process(env, &retry_timeout).await
    1718            0 :         });
    1719            0 : 
    1720            0 :         // Only start the storage controller if the pageserver is configured to need it
    1721            0 :         if env.control_plane_api.is_some() {
    1722            0 :             js.spawn(async move {
    1723            0 :                 let storage_controller = StorageController::from_env(env);
    1724            0 :                 storage_controller
    1725            0 :                     .start(NeonStorageControllerStartArgs::with_default_instance_id(
    1726            0 :                         retry_timeout,
    1727            0 :                     ))
    1728            0 :                     .await
    1729            0 :                     .map_err(|e| e.context("start storage_controller"))
    1730            0 :             });
    1731            0 :         }
    1732              : 
    1733            0 :         for ps_conf in &env.pageservers {
    1734            0 :             js.spawn(async move {
    1735            0 :                 let pageserver = PageServerNode::from_env(env, ps_conf);
    1736            0 :                 pageserver
    1737            0 :                     .start(&retry_timeout)
    1738            0 :                     .await
    1739            0 :                     .map_err(|e| e.context(format!("start pageserver {}", ps_conf.id)))
    1740            0 :             });
    1741            0 :         }
    1742              : 
    1743            0 :         for node in env.safekeepers.iter() {
    1744            0 :             js.spawn(async move {
    1745            0 :                 let safekeeper = SafekeeperNode::from_env(env, node);
    1746            0 :                 safekeeper
    1747            0 :                     .start(&[], &retry_timeout)
    1748            0 :                     .await
    1749            0 :                     .map_err(|e| e.context(format!("start safekeeper {}", safekeeper.id)))
    1750            0 :             });
    1751            0 :         }
    1752            0 :     })();
    1753            0 : 
    1754            0 :     let mut errors = Vec::new();
    1755            0 :     while let Some(result) = js.join_next().await {
    1756            0 :         let result = result.expect("we don't panic or cancel the tasks");
    1757            0 :         if let Err(e) = result {
    1758            0 :             errors.push(e);
    1759            0 :         }
    1760              :     }
    1761              : 
    1762            0 :     if !errors.is_empty() {
    1763            0 :         return Err(errors);
    1764            0 :     }
    1765            0 : 
    1766            0 :     Ok(())
    1767            0 : }
    1768              : 
    1769            0 : async fn neon_start_status_check(
    1770            0 :     env: &local_env::LocalEnv,
    1771            0 :     retry_timeout: &Duration,
    1772            0 : ) -> anyhow::Result<()> {
    1773              :     const RETRY_INTERVAL: Duration = Duration::from_millis(100);
    1774              :     const NOTICE_AFTER_RETRIES: Duration = Duration::from_secs(5);
    1775              : 
    1776            0 :     if env.control_plane_api.is_none() {
    1777            0 :         return Ok(());
    1778            0 :     }
    1779            0 : 
    1780            0 :     let storcon = StorageController::from_env(env);
    1781            0 : 
    1782            0 :     let retries = retry_timeout.as_millis() / RETRY_INTERVAL.as_millis();
    1783            0 :     let notice_after_retries = retry_timeout.as_millis() / NOTICE_AFTER_RETRIES.as_millis();
    1784            0 : 
    1785            0 :     println!("\nRunning neon status check");
    1786              : 
    1787            0 :     for retry in 0..retries {
    1788            0 :         if retry == notice_after_retries {
    1789            0 :             println!("\nNeon status check has not passed yet, continuing to wait")
    1790            0 :         }
    1791              : 
    1792            0 :         let mut passed = true;
    1793            0 :         let mut nodes = storcon.node_list().await?;
    1794            0 :         let mut pageservers = env.pageservers.clone();
    1795            0 : 
    1796            0 :         if nodes.len() != pageservers.len() {
    1797            0 :             continue;
    1798            0 :         }
    1799            0 : 
    1800            0 :         nodes.sort_by_key(|ps| ps.id);
    1801            0 :         pageservers.sort_by_key(|ps| ps.id);
    1802              : 
    1803            0 :         for (idx, pageserver) in pageservers.iter().enumerate() {
    1804            0 :             let node = &nodes[idx];
    1805            0 :             if node.id != pageserver.id {
    1806            0 :                 passed = false;
    1807            0 :                 break;
    1808            0 :             }
    1809              : 
    1810            0 :             if !matches!(node.availability, NodeAvailabilityWrapper::Active) {
    1811            0 :                 passed = false;
    1812            0 :                 break;
    1813            0 :             }
    1814              :         }
    1815              : 
    1816            0 :         if passed {
    1817            0 :             println!("\nNeon started and passed status check");
    1818            0 :             return Ok(());
    1819            0 :         }
    1820            0 : 
    1821            0 :         tokio::time::sleep(RETRY_INTERVAL).await;
    1822              :     }
    1823              : 
    1824            0 :     anyhow::bail!("\nNeon passed status check")
    1825            0 : }
    1826              : 
    1827            0 : async fn handle_stop_all(args: &StopCmdArgs, env: &local_env::LocalEnv) -> Result<()> {
    1828            0 :     let immediate = match args.mode {
    1829            0 :         StopMode::Fast => false,
    1830            0 :         StopMode::Immediate => true,
    1831              :     };
    1832              : 
    1833            0 :     try_stop_all(env, immediate).await;
    1834              : 
    1835            0 :     Ok(())
    1836            0 : }
    1837              : 
    1838            0 : async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
    1839            0 :     // Stop all endpoints
    1840            0 :     match ComputeControlPlane::load(env.clone()) {
    1841            0 :         Ok(cplane) => {
    1842            0 :             for (_k, node) in cplane.endpoints {
    1843            0 :                 if let Err(e) = node.stop(if immediate { "immediate" } else { "fast" }, false) {
    1844            0 :                     eprintln!("postgres stop failed: {e:#}");
    1845            0 :                 }
    1846              :             }
    1847              :         }
    1848            0 :         Err(e) => {
    1849            0 :             eprintln!("postgres stop failed, could not restore control plane data from env: {e:#}")
    1850              :         }
    1851              :     }
    1852              : 
    1853            0 :     for ps_conf in &env.pageservers {
    1854            0 :         let pageserver = PageServerNode::from_env(env, ps_conf);
    1855            0 :         if let Err(e) = pageserver.stop(immediate) {
    1856            0 :             eprintln!("pageserver {} stop failed: {:#}", ps_conf.id, e);
    1857            0 :         }
    1858              :     }
    1859              : 
    1860            0 :     for node in env.safekeepers.iter() {
    1861            0 :         let safekeeper = SafekeeperNode::from_env(env, node);
    1862            0 :         if let Err(e) = safekeeper.stop(immediate) {
    1863            0 :             eprintln!("safekeeper {} stop failed: {:#}", safekeeper.id, e);
    1864            0 :         }
    1865              :     }
    1866              : 
    1867            0 :     if let Err(e) = broker::stop_broker_process(env) {
    1868            0 :         eprintln!("neon broker stop failed: {e:#}");
    1869            0 :     }
    1870              : 
    1871              :     // Stop all storage controller instances. In the most common case there's only one,
    1872              :     // but iterate though the base data directory in order to discover the instances.
    1873            0 :     let storcon_instances = env
    1874            0 :         .storage_controller_instances()
    1875            0 :         .await
    1876            0 :         .expect("Must inspect data dir");
    1877            0 :     for (instance_id, _instance_dir_path) in storcon_instances {
    1878            0 :         let storage_controller = StorageController::from_env(env);
    1879            0 :         let stop_args = NeonStorageControllerStopArgs {
    1880            0 :             instance_id,
    1881            0 :             immediate,
    1882            0 :         };
    1883              : 
    1884            0 :         if let Err(e) = storage_controller.stop(stop_args).await {
    1885            0 :             eprintln!("Storage controller instance {instance_id} stop failed: {e:#}");
    1886            0 :         }
    1887              :     }
    1888            0 : }
        

Generated by: LCOV version 2.1-beta