LCOV - code coverage report
Current view: top level - control_plane/src/bin - neon_local.rs (source / functions) Coverage Total Hit
Test: 1d5975439f3c9882b18414799141ebf9a3922c58.info Lines: 0.0 % 994 0
Test Date: 2025-07-31 15:59:03 Functions: 0.0 % 75 0

            Line data    Source code
       1              : //!
       2              : //! `neon_local` is an executable that can be used to create a local
       3              : //! Neon environment, for testing purposes. The local environment is
       4              : //! quite different from the cloud environment with Kubernetes, but it
       5              : //! is easier to work with locally. The Python tests in `test_runner`
       6              : //! rely on `neon_local` to set up the environment for each test.
       7              : //!
       8              : use std::borrow::Cow;
       9              : use std::collections::{BTreeSet, HashMap};
      10              : use std::fs::File;
      11              : use std::path::PathBuf;
      12              : use std::process::exit;
      13              : use std::str::FromStr;
      14              : use std::time::Duration;
      15              : 
      16              : use anyhow::{Context, Result, anyhow, bail};
      17              : use clap::Parser;
      18              : use compute_api::requests::ComputeClaimsScope;
      19              : use compute_api::spec::{ComputeMode, PageserverProtocol};
      20              : use control_plane::broker::StorageBroker;
      21              : use control_plane::endpoint::{ComputeControlPlane, EndpointTerminateMode};
      22              : use control_plane::endpoint::{
      23              :     local_pageserver_conf_to_conn_info, tenant_locate_response_to_conn_info,
      24              : };
      25              : use control_plane::endpoint_storage::{ENDPOINT_STORAGE_DEFAULT_ADDR, EndpointStorage};
      26              : use control_plane::local_env;
      27              : use control_plane::local_env::{
      28              :     EndpointStorageConf, InitForceMode, LocalEnv, NeonBroker, NeonLocalInitConf,
      29              :     NeonLocalInitPageserverConf, SafekeeperConf,
      30              : };
      31              : use control_plane::pageserver::PageServerNode;
      32              : use control_plane::safekeeper::SafekeeperNode;
      33              : use control_plane::storage_controller::{
      34              :     NeonStorageControllerStartArgs, NeonStorageControllerStopArgs, StorageController,
      35              : };
      36              : use nix::fcntl::{Flock, FlockArg};
      37              : use pageserver_api::config::{
      38              :     DEFAULT_GRPC_LISTEN_PORT as DEFAULT_PAGESERVER_GRPC_PORT,
      39              :     DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
      40              :     DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT,
      41              : };
      42              : use pageserver_api::controller_api::{
      43              :     NodeAvailabilityWrapper, PlacementPolicy, TenantCreateRequest,
      44              : };
      45              : use pageserver_api::models::{
      46              :     ShardParameters, TenantConfigRequest, TimelineCreateRequest, TimelineInfo,
      47              : };
      48              : use pageserver_api::shard::{DEFAULT_STRIPE_SIZE, ShardCount, ShardStripeSize, TenantShardId};
      49              : use postgres_backend::AuthType;
      50              : use safekeeper_api::membership::{SafekeeperGeneration, SafekeeperId};
      51              : use safekeeper_api::{
      52              :     DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT,
      53              :     DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT, PgMajorVersion, PgVersionId,
      54              : };
      55              : use storage_broker::DEFAULT_LISTEN_ADDR as DEFAULT_BROKER_ADDR;
      56              : use tokio::task::JoinSet;
      57              : use utils::auth::{Claims, Scope};
      58              : use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
      59              : use utils::lsn::Lsn;
      60              : use utils::project_git_version;
      61              : 
      62              : // Default id of a safekeeper node, if not specified on the command line.
      63              : const DEFAULT_SAFEKEEPER_ID: NodeId = NodeId(1);
      64              : const DEFAULT_PAGESERVER_ID: NodeId = NodeId(1);
      65              : const DEFAULT_BRANCH_NAME: &str = "main";
      66              : project_git_version!(GIT_VERSION);
      67              : 
      68              : #[allow(dead_code)]
      69              : const DEFAULT_PG_VERSION: PgMajorVersion = PgMajorVersion::PG17;
      70              : const DEFAULT_PG_VERSION_NUM: &str = "17";
      71              : 
      72              : const DEFAULT_PAGESERVER_CONTROL_PLANE_API: &str = "http://127.0.0.1:1234/upcall/v1/";
      73              : 
      74              : /// Neon CLI.
      75              : #[derive(clap::Parser)]
      76              : #[command(version = GIT_VERSION, name = "Neon CLI")]
      77              : struct Cli {
      78              :     #[command(subcommand)]
      79              :     command: NeonLocalCmd,
      80              : }
      81              : 
      82              : #[derive(clap::Subcommand)]
      83              : enum NeonLocalCmd {
      84              :     Init(InitCmdArgs),
      85              : 
      86              :     #[command(subcommand)]
      87              :     Tenant(TenantCmd),
      88              :     #[command(subcommand)]
      89              :     Timeline(TimelineCmd),
      90              :     #[command(subcommand)]
      91              :     Pageserver(PageserverCmd),
      92              :     #[command(subcommand)]
      93              :     #[clap(alias = "storage_controller")]
      94              :     StorageController(StorageControllerCmd),
      95              :     #[command(subcommand)]
      96              :     #[clap(alias = "storage_broker")]
      97              :     StorageBroker(StorageBrokerCmd),
      98              :     #[command(subcommand)]
      99              :     Safekeeper(SafekeeperCmd),
     100              :     #[command(subcommand)]
     101              :     EndpointStorage(EndpointStorageCmd),
     102              :     #[command(subcommand)]
     103              :     Endpoint(EndpointCmd),
     104              :     #[command(subcommand)]
     105              :     Mappings(MappingsCmd),
     106              : 
     107              :     Start(StartCmdArgs),
     108              :     Stop(StopCmdArgs),
     109              : }
     110              : 
     111              : /// Initialize a new Neon repository, preparing configs for services to start with.
     112              : #[derive(clap::Args)]
     113              : struct InitCmdArgs {
     114              :     /// How many pageservers to create (default 1).
     115              :     #[clap(long)]
     116              :     num_pageservers: Option<u16>,
     117              : 
     118              :     #[clap(long)]
     119              :     config: Option<PathBuf>,
     120              : 
     121              :     /// Force initialization even if the repository is not empty.
     122              :     #[clap(long, default_value = "must-not-exist")]
     123              :     #[arg(value_parser)]
     124              :     force: InitForceMode,
     125              : }
     126              : 
     127              : /// Start pageserver and safekeepers.
     128              : #[derive(clap::Args)]
     129              : struct StartCmdArgs {
     130              :     #[clap(long = "start-timeout", default_value = "10s")]
     131              :     timeout: humantime::Duration,
     132              : }
     133              : 
     134              : /// Stop pageserver and safekeepers.
     135              : #[derive(clap::Args)]
     136              : struct StopCmdArgs {
     137              :     #[arg(value_enum)]
     138              :     #[clap(long, default_value_t = StopMode::Fast)]
     139              :     mode: StopMode,
     140              : }
     141              : 
     142              : #[derive(Clone, Copy, clap::ValueEnum)]
     143              : enum StopMode {
     144              :     Fast,
     145              :     Immediate,
     146              : }
     147              : 
     148              : /// Manage tenants.
     149              : #[derive(clap::Subcommand)]
     150              : enum TenantCmd {
     151              :     List,
     152              :     Create(TenantCreateCmdArgs),
     153              :     SetDefault(TenantSetDefaultCmdArgs),
     154              :     Config(TenantConfigCmdArgs),
     155              :     Import(TenantImportCmdArgs),
     156              : }
     157              : 
     158              : #[derive(clap::Args)]
     159              : struct TenantCreateCmdArgs {
     160              :     /// Tenant ID, as a 32-byte hexadecimal string.
     161              :     #[clap(long = "tenant-id")]
     162              :     tenant_id: Option<TenantId>,
     163              : 
     164              :     /// Use a specific timeline id when creating a tenant and its initial timeline.
     165              :     #[clap(long)]
     166              :     timeline_id: Option<TimelineId>,
     167              : 
     168              :     #[clap(short = 'c')]
     169              :     config: Vec<String>,
     170              : 
     171              :     /// Postgres version to use for the initial timeline.
     172              :     #[arg(default_value = DEFAULT_PG_VERSION_NUM)]
     173              :     #[clap(long)]
     174              :     pg_version: PgMajorVersion,
     175              : 
     176              :     /// Use this tenant in future CLI commands where tenant_id is needed, but not specified.
     177              :     #[clap(long)]
     178              :     set_default: bool,
     179              : 
     180              :     /// Number of shards in the new tenant.
     181              :     #[clap(long)]
     182              :     #[arg(default_value_t = 0)]
     183              :     shard_count: u8,
     184              :     /// Sharding stripe size in pages.
     185              :     #[clap(long)]
     186              :     shard_stripe_size: Option<u32>,
     187              : 
     188              :     /// Placement policy for shards in this tenant.
     189              :     #[clap(long)]
     190              :     #[arg(value_parser = parse_placement_policy)]
     191              :     placement_policy: Option<PlacementPolicy>,
     192              : }
     193              : 
     194            0 : fn parse_placement_policy(s: &str) -> anyhow::Result<PlacementPolicy> {
     195            0 :     Ok(serde_json::from_str::<PlacementPolicy>(s)?)
     196            0 : }
     197              : 
     198              : /// Set a particular tenant as default in future CLI commands where tenant_id is needed, but not
     199              : /// specified.
     200              : #[derive(clap::Args)]
     201              : struct TenantSetDefaultCmdArgs {
     202              :     /// Tenant ID, as a 32-byte hexadecimal string.
     203              :     #[clap(long = "tenant-id")]
     204              :     tenant_id: TenantId,
     205              : }
     206              : 
     207              : #[derive(clap::Args)]
     208              : struct TenantConfigCmdArgs {
     209              :     /// Tenant ID, as a 32-byte hexadecimal string.
     210              :     #[clap(long = "tenant-id")]
     211              :     tenant_id: Option<TenantId>,
     212              : 
     213              :     #[clap(short = 'c')]
     214              :     config: Vec<String>,
     215              : }
     216              : 
     217              : /// Import a tenant that is present in remote storage, and create branches for its timelines.
     218              : #[derive(clap::Args)]
     219              : struct TenantImportCmdArgs {
     220              :     /// Tenant ID, as a 32-byte hexadecimal string.
     221              :     #[clap(long = "tenant-id")]
     222              :     tenant_id: TenantId,
     223              : }
     224              : 
     225              : /// Manage timelines.
     226              : #[derive(clap::Subcommand)]
     227              : enum TimelineCmd {
     228              :     List(TimelineListCmdArgs),
     229              :     Branch(TimelineBranchCmdArgs),
     230              :     Create(TimelineCreateCmdArgs),
     231              :     Import(TimelineImportCmdArgs),
     232              : }
     233              : 
     234              : /// List all timelines available to this pageserver.
     235              : #[derive(clap::Args)]
     236              : struct TimelineListCmdArgs {
     237              :     /// Tenant ID, as a 32-byte hexadecimal string.
     238              :     #[clap(long = "tenant-id")]
     239              :     tenant_shard_id: Option<TenantShardId>,
     240              : }
     241              : 
     242              : /// Create a new timeline, branching off from another timeline.
     243              : #[derive(clap::Args)]
     244              : struct TimelineBranchCmdArgs {
     245              :     /// Tenant ID, as a 32-byte hexadecimal string.
     246              :     #[clap(long = "tenant-id")]
     247              :     tenant_id: Option<TenantId>,
     248              :     /// New timeline's ID, as a 32-byte hexadecimal string.
     249              :     #[clap(long)]
     250              :     timeline_id: Option<TimelineId>,
     251              :     /// Human-readable alias for the new timeline.
     252              :     #[clap(long)]
     253              :     branch_name: String,
     254              :     /// Use last Lsn of another timeline (and its data) as base when creating the new timeline. The
     255              :     /// timeline gets resolved by its branch name.
     256              :     #[clap(long)]
     257              :     ancestor_branch_name: Option<String>,
     258              :     /// When using another timeline as base, use a specific Lsn in it instead of the latest one.
     259              :     #[clap(long)]
     260              :     ancestor_start_lsn: Option<Lsn>,
     261              : }
     262              : 
     263              : /// Create a new blank timeline.
     264              : #[derive(clap::Args)]
     265              : struct TimelineCreateCmdArgs {
     266              :     /// Tenant ID, as a 32-byte hexadecimal string.
     267              :     #[clap(long = "tenant-id")]
     268              :     tenant_id: Option<TenantId>,
     269              :     /// New timeline's ID, as a 32-byte hexadecimal string.
     270              :     #[clap(long)]
     271              :     timeline_id: Option<TimelineId>,
     272              :     /// Human-readable alias for the new timeline.
     273              :     #[clap(long)]
     274              :     branch_name: String,
     275              : 
     276              :     /// Postgres version.
     277              :     #[arg(default_value = DEFAULT_PG_VERSION_NUM)]
     278              :     #[clap(long)]
     279              :     pg_version: PgMajorVersion,
     280              : }
     281              : 
     282              : /// Import a timeline from a basebackup directory.
     283              : #[derive(clap::Args)]
     284              : struct TimelineImportCmdArgs {
     285              :     /// Tenant ID, as a 32-byte hexadecimal string.
     286              :     #[clap(long = "tenant-id")]
     287              :     tenant_id: Option<TenantId>,
     288              :     /// New timeline's ID, as a 32-byte hexadecimal string.
     289              :     #[clap(long)]
     290              :     timeline_id: TimelineId,
     291              :     /// Human-readable alias for the new timeline.
     292              :     #[clap(long)]
     293              :     branch_name: String,
     294              :     /// Basebackup tarfile to import.
     295              :     #[clap(long)]
     296              :     base_tarfile: PathBuf,
     297              :     /// LSN the basebackup starts at.
     298              :     #[clap(long)]
     299              :     base_lsn: Lsn,
     300              :     /// WAL to add after base.
     301              :     #[clap(long)]
     302              :     wal_tarfile: Option<PathBuf>,
     303              :     /// LSN the basebackup ends at.
     304              :     #[clap(long)]
     305              :     end_lsn: Option<Lsn>,
     306              : 
     307              :     /// Postgres version of the basebackup being imported.
     308              :     #[arg(default_value = DEFAULT_PG_VERSION_NUM)]
     309              :     #[clap(long)]
     310              :     pg_version: PgMajorVersion,
     311              : }
     312              : 
     313              : /// Manage pageservers.
     314              : #[derive(clap::Subcommand)]
     315              : enum PageserverCmd {
     316              :     Status(PageserverStatusCmdArgs),
     317              :     Start(PageserverStartCmdArgs),
     318              :     Stop(PageserverStopCmdArgs),
     319              :     Restart(PageserverRestartCmdArgs),
     320              : }
     321              : 
     322              : /// Show status of a local pageserver.
     323              : #[derive(clap::Args)]
     324              : struct PageserverStatusCmdArgs {
     325              :     /// Pageserver ID.
     326              :     #[clap(long = "id")]
     327              :     pageserver_id: Option<NodeId>,
     328              : }
     329              : 
     330              : /// Start local pageserver.
     331              : #[derive(clap::Args)]
     332              : struct PageserverStartCmdArgs {
     333              :     /// Pageserver ID.
     334              :     #[clap(long = "id")]
     335              :     pageserver_id: Option<NodeId>,
     336              :     /// Timeout until we fail the command.
     337              :     #[clap(short = 't', long)]
     338              :     #[arg(default_value = "10s")]
     339              :     start_timeout: humantime::Duration,
     340              : }
     341              : 
     342              : /// Stop local pageserver.
     343              : #[derive(clap::Args)]
     344              : struct PageserverStopCmdArgs {
     345              :     /// Pageserver ID.
     346              :     #[clap(long = "id")]
     347              :     pageserver_id: Option<NodeId>,
     348              :     /// If 'immediate', don't flush repository data on shutdown.
     349              :     #[clap(short = 'm')]
     350              :     #[arg(value_enum, default_value = "fast")]
     351              :     stop_mode: StopMode,
     352              : }
     353              : 
     354              : /// Restart local pageserver.
     355              : #[derive(clap::Args)]
     356              : struct PageserverRestartCmdArgs {
     357              :     /// Pageserver ID.
     358              :     #[clap(long = "id")]
     359              :     pageserver_id: Option<NodeId>,
     360              :     /// Timeout until we fail the command.
     361              :     #[clap(short = 't', long)]
     362              :     #[arg(default_value = "10s")]
     363              :     start_timeout: humantime::Duration,
     364              : }
     365              : 
     366              : /// Manage storage controller.
     367              : #[derive(clap::Subcommand)]
     368              : enum StorageControllerCmd {
     369              :     Start(StorageControllerStartCmdArgs),
     370              :     Stop(StorageControllerStopCmdArgs),
     371              : }
     372              : 
     373              : /// Start storage controller.
     374              : #[derive(clap::Args)]
     375              : struct StorageControllerStartCmdArgs {
     376              :     /// Timeout until we fail the command.
     377              :     #[clap(short = 't', long)]
     378              :     #[arg(default_value = "10s")]
     379              :     start_timeout: humantime::Duration,
     380              :     /// Identifier used to distinguish storage controller instances.
     381              :     #[clap(long)]
     382              :     #[arg(default_value_t = 1)]
     383              :     instance_id: u8,
     384              :     /// Base port for the storage controller instance identified by instance-id (defaults to
     385              :     /// pageserver cplane api).
     386              :     #[clap(long)]
     387              :     base_port: Option<u16>,
     388              : 
     389              :     /// Whether the storage controller should handle pageserver-reported local disk loss events.
     390              :     #[clap(long)]
     391              :     handle_ps_local_disk_loss: Option<bool>,
     392              : }
     393              : 
     394              : /// Stop storage controller.
     395              : #[derive(clap::Args)]
     396              : struct StorageControllerStopCmdArgs {
     397              :     /// If 'immediate', don't flush repository data on shutdown.
     398              :     #[clap(short = 'm')]
     399              :     #[arg(value_enum, default_value = "fast")]
     400              :     stop_mode: StopMode,
     401              :     /// Identifier used to distinguish storage controller instances.
     402              :     #[clap(long)]
     403              :     #[arg(default_value_t = 1)]
     404              :     instance_id: u8,
     405              : }
     406              : 
     407              : /// Manage storage broker.
     408              : #[derive(clap::Subcommand)]
     409              : enum StorageBrokerCmd {
     410              :     Start(StorageBrokerStartCmdArgs),
     411              :     Stop(StorageBrokerStopCmdArgs),
     412              : }
     413              : 
     414              : /// Start broker.
     415              : #[derive(clap::Args)]
     416              : struct StorageBrokerStartCmdArgs {
     417              :     /// Timeout until we fail the command.
     418              :     #[clap(short = 't', long, default_value = "10s")]
     419              :     start_timeout: humantime::Duration,
     420              : }
     421              : 
     422              : /// Stop broker.
     423              : #[derive(clap::Args)]
     424              : struct StorageBrokerStopCmdArgs {
     425              :     /// If 'immediate', don't flush repository data on shutdown.
     426              :     #[clap(short = 'm')]
     427              :     #[arg(value_enum, default_value = "fast")]
     428              :     stop_mode: StopMode,
     429              : }
     430              : 
     431              : /// Manage safekeepers.
     432              : #[derive(clap::Subcommand)]
     433              : enum SafekeeperCmd {
     434              :     Start(SafekeeperStartCmdArgs),
     435              :     Stop(SafekeeperStopCmdArgs),
     436              :     Restart(SafekeeperRestartCmdArgs),
     437              : }
     438              : 
     439              : /// Manage object storage.
     440              : #[derive(clap::Subcommand)]
     441              : enum EndpointStorageCmd {
     442              :     Start(EndpointStorageStartCmd),
     443              :     Stop(EndpointStorageStopCmd),
     444              : }
     445              : 
     446              : /// Start object storage.
     447              : #[derive(clap::Args)]
     448              : struct EndpointStorageStartCmd {
     449              :     /// Timeout until we fail the command.
     450              :     #[clap(short = 't', long)]
     451              :     #[arg(default_value = "10s")]
     452              :     start_timeout: humantime::Duration,
     453              : }
     454              : 
     455              : /// Stop object storage.
     456              : #[derive(clap::Args)]
     457              : struct EndpointStorageStopCmd {
     458              :     /// If 'immediate', don't flush repository data on shutdown.
     459              :     #[clap(short = 'm')]
     460              :     #[arg(value_enum, default_value = "fast")]
     461              :     stop_mode: StopMode,
     462              : }
     463              : 
     464              : /// Start local safekeeper.
     465              : #[derive(clap::Args)]
     466              : struct SafekeeperStartCmdArgs {
     467              :     /// Safekeeper ID.
     468              :     #[arg(default_value_t = NodeId(1))]
     469              :     id: NodeId,
     470              : 
     471              :     /// Additional safekeeper invocation options, e.g. -e=--http-auth-public-key-path=foo.
     472              :     #[clap(short = 'e', long = "safekeeper-extra-opt")]
     473              :     extra_opt: Vec<String>,
     474              : 
     475              :     /// Timeout until we fail the command.
     476              :     #[clap(short = 't', long)]
     477              :     #[arg(default_value = "10s")]
     478              :     start_timeout: humantime::Duration,
     479              : }
     480              : 
     481              : /// Stop local safekeeper.
     482              : #[derive(clap::Args)]
     483              : struct SafekeeperStopCmdArgs {
     484              :     /// Safekeeper ID.
     485              :     #[arg(default_value_t = NodeId(1))]
     486              :     id: NodeId,
     487              : 
     488              :     /// If 'immediate', don't flush repository data on shutdown.
     489              :     #[arg(value_enum, default_value = "fast")]
     490              :     #[clap(short = 'm')]
     491              :     stop_mode: StopMode,
     492              : }
     493              : 
     494              : /// Restart local safekeeper.
     495              : #[derive(clap::Args)]
     496              : struct SafekeeperRestartCmdArgs {
     497              :     /// Safekeeper ID.
     498              :     #[arg(default_value_t = NodeId(1))]
     499              :     id: NodeId,
     500              : 
     501              :     /// If 'immediate', don't flush repository data on shutdown.
     502              :     #[arg(value_enum, default_value = "fast")]
     503              :     #[clap(short = 'm')]
     504              :     stop_mode: StopMode,
     505              : 
     506              :     /// Additional safekeeper invocation options, e.g. -e=--http-auth-public-key-path=foo.
     507              :     #[clap(short = 'e', long = "safekeeper-extra-opt")]
     508              :     extra_opt: Vec<String>,
     509              : 
     510              :     /// Timeout until we fail the command.
     511              :     #[clap(short = 't', long)]
     512              :     #[arg(default_value = "10s")]
     513              :     start_timeout: humantime::Duration,
     514              : }
     515              : 
     516              : /// Manage Postgres instances.
     517              : #[derive(clap::Subcommand)]
     518              : enum EndpointCmd {
     519              :     List(EndpointListCmdArgs),
     520              :     Create(EndpointCreateCmdArgs),
     521              :     Start(EndpointStartCmdArgs),
     522              :     Reconfigure(EndpointReconfigureCmdArgs),
     523              :     RefreshConfiguration(EndpointRefreshConfigurationArgs),
     524              :     Stop(EndpointStopCmdArgs),
     525              :     UpdatePageservers(EndpointUpdatePageserversCmdArgs),
     526              :     GenerateJwt(EndpointGenerateJwtCmdArgs),
     527              : }
     528              : 
     529              : /// List endpoints.
     530              : #[derive(clap::Args)]
     531              : struct EndpointListCmdArgs {
     532              :     /// Tenant ID, as a 32-byte hexadecimal string.
     533              :     #[clap(long = "tenant-id")]
     534              :     tenant_shard_id: Option<TenantShardId>,
     535              : }
     536              : 
     537              : /// Create a compute endpoint.
     538              : #[derive(clap::Args)]
     539              : struct EndpointCreateCmdArgs {
     540              :     /// Tenant ID, as a 32-byte hexadecimal string.
     541              :     #[clap(long = "tenant-id")]
     542              :     tenant_id: Option<TenantId>,
     543              :     /// Postgres endpoint ID.
     544              :     endpoint_id: Option<String>,
     545              :     /// Name of the branch the endpoint will run on.
     546              :     #[clap(long)]
     547              :     branch_name: Option<String>,
     548              :     /// Specify the LSN on the timeline to start from. By default, the end of the timeline is used.
     549              :     #[clap(long)]
     550              :     lsn: Option<Lsn>,
     551              :     #[clap(long)]
     552              :     pg_port: Option<u16>,
     553              :     #[clap(long, alias = "http-port")]
     554              :     external_http_port: Option<u16>,
     555              :     #[clap(long)]
     556              :     internal_http_port: Option<u16>,
     557              :     #[clap(long = "pageserver-id")]
     558              :     endpoint_pageserver_id: Option<NodeId>,
     559              : 
     560              :     /// Don't do basebackup, create endpoint directory with only config files.
     561              :     #[clap(long, action = clap::ArgAction::Set, default_value_t = false)]
     562              :     config_only: bool,
     563              : 
     564              :     /// Postgres version.
     565              :     #[arg(default_value = DEFAULT_PG_VERSION_NUM)]
     566              :     #[clap(long)]
     567              :     pg_version: PgMajorVersion,
     568              : 
     569              :     /// Use gRPC to communicate with Pageservers, by generating grpc:// connstrings.
     570              :     ///
     571              :     /// Specified on creation such that it's retained across reconfiguration and restarts.
     572              :     ///
     573              :     /// NB: not yet supported by computes.
     574              :     #[clap(long)]
     575              :     grpc: bool,
     576              : 
     577              :     /// If set, the node will be a hot replica on the specified timeline.
     578              :     #[clap(long, action = clap::ArgAction::Set, default_value_t = false)]
     579              :     hot_standby: bool,
     580              :     /// If set, will set up the catalog for neon_superuser.
     581              :     #[clap(long)]
     582              :     update_catalog: bool,
     583              :     /// Allow multiple primary endpoints running on the same branch. Shouldn't be used normally, but
     584              :     /// useful for tests.
     585              :     #[clap(long)]
     586              :     allow_multiple: bool,
     587              : 
     588              :     /// Name of the privileged role for the endpoint.
     589              :     // Only allow changing it on creation.
     590              :     #[clap(long)]
     591              :     privileged_role_name: Option<String>,
     592              : }
     593              : 
     594              : /// Start Postgres. If the endpoint doesn't exist yet, it is created.
     595              : #[derive(clap::Args)]
     596              : struct EndpointStartCmdArgs {
     597              :     /// Postgres endpoint ID.
     598              :     endpoint_id: String,
     599              :     /// Pageserver ID.
     600              :     #[clap(long = "pageserver-id")]
     601              :     endpoint_pageserver_id: Option<NodeId>,
     602              :     /// Safekeepers membership generation to prefix neon.safekeepers with.
     603              :     #[clap(long)]
     604              :     safekeepers_generation: Option<u32>,
     605              :     /// List of safekeepers endpoint will talk to.
     606              :     #[clap(long)]
     607              :     safekeepers: Option<String>,
     608              :     /// Configure the remote extensions storage proxy gateway URL to request for extensions.
     609              :     #[clap(long, alias = "remote-ext-config")]
     610              :     remote_ext_base_url: Option<String>,
     611              :     /// If set, will create test user `user` and `neondb` database. Requires `update-catalog = true`
     612              :     #[clap(long)]
     613              :     create_test_user: bool,
     614              :     /// Allow multiple primary endpoints running on the same branch. Shouldn't be used normally, but
     615              :     /// useful for tests.
     616              :     #[clap(long)]
     617              :     allow_multiple: bool,
     618              :     /// Timeout until we fail the command.
     619              :     #[clap(short = 't', long, value_parser= humantime::parse_duration)]
     620              :     #[arg(default_value = "90s")]
     621              :     start_timeout: Duration,
     622              : 
     623              :     /// Download LFC cache from endpoint storage on endpoint startup
     624              :     #[clap(long, default_value = "false")]
     625              :     autoprewarm: bool,
     626              : 
     627              :     /// Upload LFC cache to endpoint storage periodically
     628              :     #[clap(long)]
     629              :     offload_lfc_interval_seconds: Option<std::num::NonZeroU64>,
     630              : 
     631              :     /// Run in development mode, skipping VM-specific operations like process termination
     632              :     #[clap(long, action = clap::ArgAction::SetTrue)]
     633              :     dev: bool,
     634              : }
     635              : 
     636              : /// Reconfigure an endpoint.
     637              : #[derive(clap::Args)]
     638              : struct EndpointReconfigureCmdArgs {
     639              :     /// Tenant id. Represented as a hexadecimal string 32 symbols length
     640              :     #[clap(long = "tenant-id")]
     641              :     tenant_id: Option<TenantId>,
     642              :     /// Postgres endpoint ID.
     643              :     endpoint_id: String,
     644              :     /// Pageserver ID.
     645              :     #[clap(long = "pageserver-id")]
     646              :     endpoint_pageserver_id: Option<NodeId>,
     647              :     #[clap(long)]
     648              :     safekeepers: Option<String>,
     649              : }
     650              : 
     651              : /// Refresh the endpoint's configuration by forcing it reload it's spec
     652              : #[derive(clap::Args)]
     653              : struct EndpointRefreshConfigurationArgs {
     654              :     /// Postgres endpoint id
     655              :     endpoint_id: String,
     656              : }
     657              : 
     658              : /// Stop an endpoint.
     659              : #[derive(clap::Args)]
     660              : struct EndpointStopCmdArgs {
     661              :     /// Postgres endpoint ID.
     662              :     endpoint_id: String,
     663              :     /// Also delete data directory (now optional, should be default in future).
     664              :     #[clap(long)]
     665              :     destroy: bool,
     666              : 
     667              :     /// Postgres shutdown mode, passed to `pg_ctl -m <mode>`.
     668              :     #[clap(long)]
     669              :     #[clap(default_value = "fast")]
     670              :     mode: EndpointTerminateMode,
     671              : }
     672              : 
     673              : /// Update the pageservers in the spec file of the compute endpoint
     674              : #[derive(clap::Args)]
     675              : struct EndpointUpdatePageserversCmdArgs {
     676              :     /// Postgres endpoint id
     677              :     endpoint_id: String,
     678              : 
     679              :     /// Specified pageserver id
     680              :     #[clap(short = 'p', long)]
     681              :     pageserver_id: Option<NodeId>,
     682              : }
     683              : 
     684              : /// Generate a JWT for an endpoint.
     685              : #[derive(clap::Args)]
     686              : struct EndpointGenerateJwtCmdArgs {
     687              :     /// Postgres endpoint ID.
     688              :     endpoint_id: String,
     689              :     /// Scope to generate the JWT with.
     690              :     #[clap(short = 's', long, value_parser = ComputeClaimsScope::from_str)]
     691              :     scope: Option<ComputeClaimsScope>,
     692              : }
     693              : 
     694              : /// Manage neon_local branch name mappings.
     695              : #[derive(clap::Subcommand)]
     696              : enum MappingsCmd {
     697              :     Map(MappingsMapCmdArgs),
     698              : }
     699              : 
     700              : /// Create new mapping which cannot exist already.
     701              : #[derive(clap::Args)]
     702              : struct MappingsMapCmdArgs {
     703              :     /// Tenant ID, as a 32-byte hexadecimal string.
     704              :     #[clap(long)]
     705              :     tenant_id: TenantId,
     706              :     /// Timeline ID, as a 32-byte hexadecimal string.
     707              :     #[clap(long)]
     708              :     timeline_id: TimelineId,
     709              :     /// Branch name to give to the timeline.
     710              :     #[clap(long)]
     711              :     branch_name: String,
     712              : }
     713              : 
     714              : ///
     715              : /// Timelines tree element used as a value in the HashMap.
     716              : ///
     717              : struct TimelineTreeEl {
     718              :     /// `TimelineInfo` received from the `pageserver` via the `timeline_list` http API call.
     719              :     pub info: TimelineInfo,
     720              :     /// Name, recovered from neon config mappings
     721              :     pub name: Option<String>,
     722              :     /// Holds all direct children of this timeline referenced using `timeline_id`.
     723              :     pub children: BTreeSet<TimelineId>,
     724              : }
     725              : 
     726              : /// A flock-based guard over the neon_local repository directory
     727              : struct RepoLock {
     728              :     _file: Flock<File>,
     729              : }
     730              : 
     731              : impl RepoLock {
     732            0 :     fn new() -> Result<Self> {
     733            0 :         let repo_dir = File::open(local_env::base_path())?;
     734            0 :         match Flock::lock(repo_dir, FlockArg::LockExclusive) {
     735            0 :             Ok(f) => Ok(Self { _file: f }),
     736            0 :             Err((_, e)) => Err(e).context("flock error"),
     737              :         }
     738            0 :     }
     739              : }
     740              : 
     741              : // Main entry point for the 'neon_local' CLI utility
     742              : //
     743              : // This utility helps to manage neon installation. That includes following:
     744              : //   * Management of local postgres installations running on top of the
     745              : //     pageserver.
     746              : //   * Providing CLI api to the pageserver
     747              : //   * TODO: export/import to/from usual postgres
     748            0 : fn main() -> Result<()> {
     749            0 :     let cli = Cli::parse();
     750              : 
     751              :     // Check for 'neon init' command first.
     752            0 :     let (subcommand_result, _lock) = if let NeonLocalCmd::Init(args) = cli.command {
     753            0 :         (handle_init(&args).map(|env| Some(Cow::Owned(env))), None)
     754              :     } else {
     755              :         // This tool uses a collection of simple files to store its state, and consequently
     756              :         // it is not generally safe to run multiple commands concurrently.  Rather than expect
     757              :         // all callers to know this, use a lock file to protect against concurrent execution.
     758            0 :         let _repo_lock = RepoLock::new().unwrap();
     759              : 
     760              :         // all other commands need an existing config
     761            0 :         let env = LocalEnv::load_config(&local_env::base_path()).context("Error loading config")?;
     762            0 :         let original_env = env.clone();
     763            0 :         let env = Box::leak(Box::new(env));
     764            0 :         let rt = tokio::runtime::Builder::new_current_thread()
     765            0 :             .enable_all()
     766            0 :             .build()
     767            0 :             .unwrap();
     768              : 
     769            0 :         let subcommand_result = match cli.command {
     770            0 :             NeonLocalCmd::Init(_) => unreachable!("init was handled earlier already"),
     771            0 :             NeonLocalCmd::Start(args) => rt.block_on(handle_start_all(&args, env)),
     772            0 :             NeonLocalCmd::Stop(args) => rt.block_on(handle_stop_all(&args, env)),
     773            0 :             NeonLocalCmd::Tenant(subcmd) => rt.block_on(handle_tenant(&subcmd, env)),
     774            0 :             NeonLocalCmd::Timeline(subcmd) => rt.block_on(handle_timeline(&subcmd, env)),
     775            0 :             NeonLocalCmd::Pageserver(subcmd) => rt.block_on(handle_pageserver(&subcmd, env)),
     776            0 :             NeonLocalCmd::StorageController(subcmd) => {
     777            0 :                 rt.block_on(handle_storage_controller(&subcmd, env))
     778              :             }
     779            0 :             NeonLocalCmd::StorageBroker(subcmd) => rt.block_on(handle_storage_broker(&subcmd, env)),
     780            0 :             NeonLocalCmd::Safekeeper(subcmd) => rt.block_on(handle_safekeeper(&subcmd, env)),
     781            0 :             NeonLocalCmd::EndpointStorage(subcmd) => {
     782            0 :                 rt.block_on(handle_endpoint_storage(&subcmd, env))
     783              :             }
     784            0 :             NeonLocalCmd::Endpoint(subcmd) => rt.block_on(handle_endpoint(&subcmd, env)),
     785            0 :             NeonLocalCmd::Mappings(subcmd) => handle_mappings(&subcmd, env),
     786              :         };
     787              : 
     788            0 :         let subcommand_result = if &original_env != env {
     789            0 :             subcommand_result.map(|()| Some(Cow::Borrowed(env)))
     790              :         } else {
     791            0 :             subcommand_result.map(|()| None)
     792              :         };
     793            0 :         (subcommand_result, Some(_repo_lock))
     794              :     };
     795              : 
     796            0 :     match subcommand_result {
     797            0 :         Ok(Some(updated_env)) => updated_env.persist_config()?,
     798            0 :         Ok(None) => (),
     799            0 :         Err(e) => {
     800            0 :             eprintln!("command failed: {e:?}");
     801            0 :             exit(1);
     802              :         }
     803              :     }
     804            0 :     Ok(())
     805            0 : }
     806              : 
     807              : ///
     808              : /// Prints timelines list as a tree-like structure.
     809              : ///
     810            0 : fn print_timelines_tree(
     811            0 :     timelines: Vec<TimelineInfo>,
     812            0 :     mut timeline_name_mappings: HashMap<TenantTimelineId, String>,
     813            0 : ) -> Result<()> {
     814            0 :     let mut timelines_hash = timelines
     815            0 :         .iter()
     816            0 :         .map(|t| {
     817            0 :             (
     818            0 :                 t.timeline_id,
     819            0 :                 TimelineTreeEl {
     820            0 :                     info: t.clone(),
     821            0 :                     children: BTreeSet::new(),
     822            0 :                     name: timeline_name_mappings
     823            0 :                         .remove(&TenantTimelineId::new(t.tenant_id.tenant_id, t.timeline_id)),
     824            0 :                 },
     825            0 :             )
     826            0 :         })
     827            0 :         .collect::<HashMap<_, _>>();
     828              : 
     829              :     // Memorize all direct children of each timeline.
     830            0 :     for timeline in timelines.iter() {
     831            0 :         if let Some(ancestor_timeline_id) = timeline.ancestor_timeline_id {
     832            0 :             timelines_hash
     833            0 :                 .get_mut(&ancestor_timeline_id)
     834            0 :                 .context("missing timeline info in the HashMap")?
     835              :                 .children
     836            0 :                 .insert(timeline.timeline_id);
     837            0 :         }
     838              :     }
     839              : 
     840            0 :     for timeline in timelines_hash.values() {
     841              :         // Start with root local timelines (no ancestors) first.
     842            0 :         if timeline.info.ancestor_timeline_id.is_none() {
     843            0 :             print_timeline(0, &Vec::from([true]), timeline, &timelines_hash)?;
     844            0 :         }
     845              :     }
     846              : 
     847            0 :     Ok(())
     848            0 : }
     849              : 
     850              : ///
     851              : /// Recursively prints timeline info with all its children.
     852              : ///
     853            0 : fn print_timeline(
     854            0 :     nesting_level: usize,
     855            0 :     is_last: &[bool],
     856            0 :     timeline: &TimelineTreeEl,
     857            0 :     timelines: &HashMap<TimelineId, TimelineTreeEl>,
     858            0 : ) -> Result<()> {
     859            0 :     if nesting_level > 0 {
     860            0 :         let ancestor_lsn = match timeline.info.ancestor_lsn {
     861            0 :             Some(lsn) => lsn.to_string(),
     862            0 :             None => "Unknown Lsn".to_string(),
     863              :         };
     864              : 
     865            0 :         let mut br_sym = "┣━";
     866              : 
     867              :         // Draw each nesting padding with proper style
     868              :         // depending on whether its timeline ended or not.
     869            0 :         if nesting_level > 1 {
     870            0 :             for l in &is_last[1..is_last.len() - 1] {
     871            0 :                 if *l {
     872            0 :                     print!("   ");
     873            0 :                 } else {
     874            0 :                     print!("┃  ");
     875            0 :                 }
     876              :             }
     877            0 :         }
     878              : 
     879              :         // We are the last in this sub-timeline
     880            0 :         if *is_last.last().unwrap() {
     881            0 :             br_sym = "┗━";
     882            0 :         }
     883              : 
     884            0 :         print!("{br_sym} @{ancestor_lsn}: ");
     885            0 :     }
     886              : 
     887              :     // Finally print a timeline id and name with new line
     888            0 :     println!(
     889            0 :         "{} [{}]",
     890            0 :         timeline.name.as_deref().unwrap_or("_no_name_"),
     891              :         timeline.info.timeline_id
     892              :     );
     893              : 
     894            0 :     let len = timeline.children.len();
     895            0 :     let mut i: usize = 0;
     896            0 :     let mut is_last_new = Vec::from(is_last);
     897            0 :     is_last_new.push(false);
     898              : 
     899            0 :     for child in &timeline.children {
     900            0 :         i += 1;
     901              : 
     902              :         // Mark that the last padding is the end of the timeline
     903            0 :         if i == len {
     904            0 :             if let Some(last) = is_last_new.last_mut() {
     905            0 :                 *last = true;
     906            0 :             }
     907            0 :         }
     908              : 
     909            0 :         print_timeline(
     910            0 :             nesting_level + 1,
     911            0 :             &is_last_new,
     912            0 :             timelines
     913            0 :                 .get(child)
     914            0 :                 .context("missing timeline info in the HashMap")?,
     915            0 :             timelines,
     916            0 :         )?;
     917              :     }
     918              : 
     919            0 :     Ok(())
     920            0 : }
     921              : 
     922              : /// Helper function to get tenant id from an optional --tenant_id option or from the config file
     923            0 : fn get_tenant_id(
     924            0 :     tenant_id_arg: Option<TenantId>,
     925            0 :     env: &local_env::LocalEnv,
     926            0 : ) -> anyhow::Result<TenantId> {
     927            0 :     if let Some(tenant_id_from_arguments) = tenant_id_arg {
     928            0 :         Ok(tenant_id_from_arguments)
     929            0 :     } else if let Some(default_id) = env.default_tenant_id {
     930            0 :         Ok(default_id)
     931              :     } else {
     932            0 :         anyhow::bail!("No tenant id. Use --tenant-id, or set a default tenant");
     933              :     }
     934            0 : }
     935              : 
     936              : /// Helper function to get tenant-shard ID from an optional --tenant_id option or from the config file,
     937              : /// for commands that accept a shard suffix
     938            0 : fn get_tenant_shard_id(
     939            0 :     tenant_shard_id_arg: Option<TenantShardId>,
     940            0 :     env: &local_env::LocalEnv,
     941            0 : ) -> anyhow::Result<TenantShardId> {
     942            0 :     if let Some(tenant_id_from_arguments) = tenant_shard_id_arg {
     943            0 :         Ok(tenant_id_from_arguments)
     944            0 :     } else if let Some(default_id) = env.default_tenant_id {
     945            0 :         Ok(TenantShardId::unsharded(default_id))
     946              :     } else {
     947            0 :         anyhow::bail!("No tenant shard id. Use --tenant-id, or set a default tenant");
     948              :     }
     949            0 : }
     950              : 
     951            0 : fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
     952              :     // Create the in-memory `LocalEnv` that we'd normally load from disk in `load_config`.
     953            0 :     let init_conf: NeonLocalInitConf = if let Some(config_path) = &args.config {
     954              :         // User (likely the Python test suite) provided a description of the environment.
     955            0 :         if args.num_pageservers.is_some() {
     956            0 :             bail!(
     957            0 :                 "Cannot specify both --num-pageservers and --config, use key `pageservers` in the --config file instead"
     958              :             );
     959            0 :         }
     960              :         // load and parse the file
     961            0 :         let contents = std::fs::read_to_string(config_path).with_context(|| {
     962            0 :             format!(
     963            0 :                 "Could not read configuration file '{}'",
     964            0 :                 config_path.display()
     965              :             )
     966            0 :         })?;
     967            0 :         toml_edit::de::from_str(&contents)?
     968              :     } else {
     969              :         // User (likely interactive) did not provide a description of the environment, give them the default
     970              :         NeonLocalInitConf {
     971            0 :             control_plane_api: Some(DEFAULT_PAGESERVER_CONTROL_PLANE_API.parse().unwrap()),
     972            0 :             broker: NeonBroker {
     973            0 :                 listen_addr: Some(DEFAULT_BROKER_ADDR.parse().unwrap()),
     974            0 :                 listen_https_addr: None,
     975            0 :             },
     976            0 :             safekeepers: vec![SafekeeperConf {
     977            0 :                 id: DEFAULT_SAFEKEEPER_ID,
     978            0 :                 pg_port: DEFAULT_SAFEKEEPER_PG_PORT,
     979            0 :                 http_port: DEFAULT_SAFEKEEPER_HTTP_PORT,
     980            0 :                 ..Default::default()
     981            0 :             }],
     982            0 :             pageservers: (0..args.num_pageservers.unwrap_or(1))
     983            0 :                 .map(|i| {
     984            0 :                     let pageserver_id = NodeId(DEFAULT_PAGESERVER_ID.0 + i as u64);
     985            0 :                     let pg_port = DEFAULT_PAGESERVER_PG_PORT + i;
     986            0 :                     let http_port = DEFAULT_PAGESERVER_HTTP_PORT + i;
     987            0 :                     let grpc_port = DEFAULT_PAGESERVER_GRPC_PORT + i;
     988            0 :                     NeonLocalInitPageserverConf {
     989            0 :                         id: pageserver_id,
     990            0 :                         listen_pg_addr: format!("127.0.0.1:{pg_port}"),
     991            0 :                         listen_http_addr: format!("127.0.0.1:{http_port}"),
     992            0 :                         listen_https_addr: None,
     993            0 :                         listen_grpc_addr: Some(format!("127.0.0.1:{grpc_port}")),
     994            0 :                         pg_auth_type: AuthType::Trust,
     995            0 :                         http_auth_type: AuthType::Trust,
     996            0 :                         grpc_auth_type: AuthType::Trust,
     997            0 :                         other: Default::default(),
     998            0 :                         // Typical developer machines use disks with slow fsync, and we don't care
     999            0 :                         // about data integrity: disable disk syncs.
    1000            0 :                         no_sync: true,
    1001            0 :                     }
    1002            0 :                 })
    1003            0 :                 .collect(),
    1004            0 :             endpoint_storage: EndpointStorageConf {
    1005            0 :                 listen_addr: ENDPOINT_STORAGE_DEFAULT_ADDR,
    1006            0 :             },
    1007            0 :             pg_distrib_dir: None,
    1008            0 :             neon_distrib_dir: None,
    1009            0 :             default_tenant_id: TenantId::from_array(std::array::from_fn(|_| 0)),
    1010            0 :             storage_controller: None,
    1011            0 :             control_plane_hooks_api: None,
    1012              :             generate_local_ssl_certs: false,
    1013              :         }
    1014              :     };
    1015              : 
    1016            0 :     LocalEnv::init(init_conf, &args.force)
    1017            0 :         .context("materialize initial neon_local environment on disk")?;
    1018            0 :     Ok(LocalEnv::load_config(&local_env::base_path())
    1019            0 :         .expect("freshly written config should be loadable"))
    1020            0 : }
    1021              : 
    1022              : /// The default pageserver is the one where CLI tenant/timeline operations are sent by default.
    1023              : /// For typical interactive use, one would just run with a single pageserver.  Scenarios with
    1024              : /// tenant/timeline placement across multiple pageservers are managed by python test code rather
    1025              : /// than this CLI.
    1026            0 : fn get_default_pageserver(env: &local_env::LocalEnv) -> PageServerNode {
    1027            0 :     let ps_conf = env
    1028            0 :         .pageservers
    1029            0 :         .first()
    1030            0 :         .expect("Config is validated to contain at least one pageserver");
    1031            0 :     PageServerNode::from_env(env, ps_conf)
    1032            0 : }
    1033              : 
    1034            0 : async fn handle_tenant(subcmd: &TenantCmd, env: &mut local_env::LocalEnv) -> anyhow::Result<()> {
    1035            0 :     let pageserver = get_default_pageserver(env);
    1036            0 :     match subcmd {
    1037              :         TenantCmd::List => {
    1038            0 :             for t in pageserver.tenant_list().await? {
    1039            0 :                 println!("{} {:?}", t.id, t.state);
    1040            0 :             }
    1041              :         }
    1042            0 :         TenantCmd::Import(args) => {
    1043            0 :             let tenant_id = args.tenant_id;
    1044              : 
    1045            0 :             let storage_controller = StorageController::from_env(env);
    1046            0 :             let create_response = storage_controller.tenant_import(tenant_id).await?;
    1047              : 
    1048            0 :             let shard_zero = create_response
    1049            0 :                 .shards
    1050            0 :                 .first()
    1051            0 :                 .expect("Import response omitted shards");
    1052              : 
    1053            0 :             let attached_pageserver_id = shard_zero.node_id;
    1054            0 :             let pageserver =
    1055            0 :                 PageServerNode::from_env(env, env.get_pageserver_conf(attached_pageserver_id)?);
    1056              : 
    1057            0 :             println!(
    1058            0 :                 "Imported tenant {tenant_id}, attached to pageserver {attached_pageserver_id}"
    1059              :             );
    1060              : 
    1061            0 :             let timelines = pageserver
    1062            0 :                 .http_client
    1063            0 :                 .list_timelines(shard_zero.shard_id)
    1064            0 :                 .await?;
    1065              : 
    1066              :             // Pick a 'main' timeline that has no ancestors, the rest will get arbitrary names
    1067            0 :             let main_timeline = timelines
    1068            0 :                 .iter()
    1069            0 :                 .find(|t| t.ancestor_timeline_id.is_none())
    1070            0 :                 .expect("No timelines found")
    1071              :                 .timeline_id;
    1072              : 
    1073            0 :             let mut branch_i = 0;
    1074            0 :             for timeline in timelines.iter() {
    1075            0 :                 let branch_name = if timeline.timeline_id == main_timeline {
    1076            0 :                     "main".to_string()
    1077              :                 } else {
    1078            0 :                     branch_i += 1;
    1079            0 :                     format!("branch_{branch_i}")
    1080              :                 };
    1081              : 
    1082            0 :                 println!(
    1083            0 :                     "Importing timeline {tenant_id}/{} as branch {branch_name}",
    1084              :                     timeline.timeline_id
    1085              :                 );
    1086              : 
    1087            0 :                 env.register_branch_mapping(branch_name, tenant_id, timeline.timeline_id)?;
    1088              :             }
    1089              :         }
    1090            0 :         TenantCmd::Create(args) => {
    1091            0 :             let tenant_conf: HashMap<_, _> =
    1092            0 :                 args.config.iter().flat_map(|c| c.split_once(':')).collect();
    1093              : 
    1094            0 :             let tenant_conf = PageServerNode::parse_config(tenant_conf)?;
    1095              : 
    1096              :             // If tenant ID was not specified, generate one
    1097            0 :             let tenant_id = args.tenant_id.unwrap_or_else(TenantId::generate);
    1098              : 
    1099              :             // We must register the tenant with the storage controller, so
    1100              :             // that when the pageserver restarts, it will be re-attached.
    1101            0 :             let storage_controller = StorageController::from_env(env);
    1102            0 :             storage_controller
    1103            0 :                 .tenant_create(TenantCreateRequest {
    1104            0 :                     // Note that ::unsharded here isn't actually because the tenant is unsharded, its because the
    1105            0 :                     // storage controller expects a shard-naive tenant_id in this attribute, and the TenantCreateRequest
    1106            0 :                     // type is used both in the storage controller (for creating tenants) and in the pageserver (for
    1107            0 :                     // creating shards)
    1108            0 :                     new_tenant_id: TenantShardId::unsharded(tenant_id),
    1109            0 :                     generation: None,
    1110            0 :                     shard_parameters: ShardParameters {
    1111            0 :                         count: ShardCount::new(args.shard_count),
    1112            0 :                         stripe_size: args
    1113            0 :                             .shard_stripe_size
    1114            0 :                             .map(ShardStripeSize)
    1115            0 :                             .unwrap_or(DEFAULT_STRIPE_SIZE),
    1116            0 :                     },
    1117            0 :                     placement_policy: args.placement_policy.clone(),
    1118            0 :                     config: tenant_conf,
    1119            0 :                 })
    1120            0 :                 .await?;
    1121            0 :             println!("tenant {tenant_id} successfully created on the pageserver");
    1122              : 
    1123              :             // Create an initial timeline for the new tenant
    1124            0 :             let new_timeline_id = args.timeline_id.unwrap_or(TimelineId::generate());
    1125              : 
    1126              :             // FIXME: passing None for ancestor_start_lsn is not kosher in a sharded world: we can't have
    1127              :             // different shards picking different start lsns.  Maybe we have to teach storage controller
    1128              :             // to let shard 0 branch first and then propagate the chosen LSN to other shards.
    1129            0 :             storage_controller
    1130            0 :                 .tenant_timeline_create(
    1131            0 :                     tenant_id,
    1132            0 :                     TimelineCreateRequest {
    1133            0 :                         new_timeline_id,
    1134            0 :                         mode: pageserver_api::models::TimelineCreateRequestMode::Bootstrap {
    1135            0 :                             existing_initdb_timeline_id: None,
    1136            0 :                             pg_version: Some(args.pg_version),
    1137            0 :                         },
    1138            0 :                     },
    1139            0 :                 )
    1140            0 :                 .await?;
    1141              : 
    1142            0 :             env.register_branch_mapping(
    1143            0 :                 DEFAULT_BRANCH_NAME.to_string(),
    1144            0 :                 tenant_id,
    1145            0 :                 new_timeline_id,
    1146            0 :             )?;
    1147              : 
    1148            0 :             println!("Created an initial timeline '{new_timeline_id}' for tenant: {tenant_id}",);
    1149              : 
    1150            0 :             if args.set_default {
    1151            0 :                 println!("Setting tenant {tenant_id} as a default one");
    1152            0 :                 env.default_tenant_id = Some(tenant_id);
    1153            0 :             }
    1154              :         }
    1155            0 :         TenantCmd::SetDefault(args) => {
    1156            0 :             println!("Setting tenant {} as a default one", args.tenant_id);
    1157            0 :             env.default_tenant_id = Some(args.tenant_id);
    1158            0 :         }
    1159            0 :         TenantCmd::Config(args) => {
    1160            0 :             let tenant_id = get_tenant_id(args.tenant_id, env)?;
    1161            0 :             let tenant_conf: HashMap<_, _> =
    1162            0 :                 args.config.iter().flat_map(|c| c.split_once(':')).collect();
    1163            0 :             let config = PageServerNode::parse_config(tenant_conf)?;
    1164              : 
    1165            0 :             let req = TenantConfigRequest { tenant_id, config };
    1166              : 
    1167            0 :             let storage_controller = StorageController::from_env(env);
    1168            0 :             storage_controller
    1169            0 :                 .set_tenant_config(&req)
    1170            0 :                 .await
    1171            0 :                 .with_context(|| format!("Tenant config failed for tenant with id {tenant_id}"))?;
    1172            0 :             println!("tenant {tenant_id} successfully configured via storcon");
    1173              :         }
    1174              :     }
    1175            0 :     Ok(())
    1176            0 : }
    1177              : 
    1178            0 : async fn handle_timeline(cmd: &TimelineCmd, env: &mut local_env::LocalEnv) -> Result<()> {
    1179            0 :     let pageserver = get_default_pageserver(env);
    1180              : 
    1181            0 :     match cmd {
    1182            0 :         TimelineCmd::List(args) => {
    1183              :             // TODO(sharding): this command shouldn't have to specify a shard ID: we should ask the storage controller
    1184              :             // where shard 0 is attached, and query there.
    1185            0 :             let tenant_shard_id = get_tenant_shard_id(args.tenant_shard_id, env)?;
    1186            0 :             let timelines = pageserver.timeline_list(&tenant_shard_id).await?;
    1187            0 :             print_timelines_tree(timelines, env.timeline_name_mappings())?;
    1188              :         }
    1189            0 :         TimelineCmd::Create(args) => {
    1190            0 :             let tenant_id = get_tenant_id(args.tenant_id, env)?;
    1191            0 :             let new_branch_name = &args.branch_name;
    1192            0 :             let new_timeline_id_opt = args.timeline_id;
    1193            0 :             let new_timeline_id = new_timeline_id_opt.unwrap_or(TimelineId::generate());
    1194              : 
    1195            0 :             let storage_controller = StorageController::from_env(env);
    1196            0 :             let create_req = TimelineCreateRequest {
    1197            0 :                 new_timeline_id,
    1198            0 :                 mode: pageserver_api::models::TimelineCreateRequestMode::Bootstrap {
    1199            0 :                     existing_initdb_timeline_id: None,
    1200            0 :                     pg_version: Some(args.pg_version),
    1201            0 :                 },
    1202            0 :             };
    1203            0 :             let timeline_info = storage_controller
    1204            0 :                 .tenant_timeline_create(tenant_id, create_req)
    1205            0 :                 .await?;
    1206              : 
    1207            0 :             let last_record_lsn = timeline_info.last_record_lsn;
    1208            0 :             env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?;
    1209              : 
    1210            0 :             println!(
    1211            0 :                 "Created timeline '{}' at Lsn {last_record_lsn} for tenant: {tenant_id}",
    1212              :                 timeline_info.timeline_id
    1213              :             );
    1214              :         }
    1215              :         // TODO: rename to import-basebackup-plus-wal
    1216            0 :         TimelineCmd::Import(args) => {
    1217            0 :             let tenant_id = get_tenant_id(args.tenant_id, env)?;
    1218            0 :             let timeline_id = args.timeline_id;
    1219            0 :             let branch_name = &args.branch_name;
    1220              : 
    1221              :             // Parse base inputs
    1222            0 :             let base = (args.base_lsn, args.base_tarfile.clone());
    1223              : 
    1224              :             // Parse pg_wal inputs
    1225            0 :             let wal_tarfile = args.wal_tarfile.clone();
    1226            0 :             let end_lsn = args.end_lsn;
    1227              :             // TODO validate both or none are provided
    1228            0 :             let pg_wal = end_lsn.zip(wal_tarfile);
    1229              : 
    1230            0 :             println!("Importing timeline into pageserver ...");
    1231            0 :             pageserver
    1232            0 :                 .timeline_import(tenant_id, timeline_id, base, pg_wal, args.pg_version)
    1233            0 :                 .await?;
    1234            0 :             if env.storage_controller.timelines_onto_safekeepers {
    1235            0 :                 println!("Creating timeline on safekeeper ...");
    1236            0 :                 let timeline_info = pageserver
    1237            0 :                     .timeline_info(
    1238            0 :                         TenantShardId::unsharded(tenant_id),
    1239            0 :                         timeline_id,
    1240            0 :                         pageserver_client::mgmt_api::ForceAwaitLogicalSize::No,
    1241            0 :                     )
    1242            0 :                     .await?;
    1243            0 :                 let default_sk = SafekeeperNode::from_env(env, env.safekeepers.first().unwrap());
    1244            0 :                 let default_host = default_sk
    1245            0 :                     .conf
    1246            0 :                     .listen_addr
    1247            0 :                     .clone()
    1248            0 :                     .unwrap_or_else(|| "localhost".to_string());
    1249            0 :                 let mconf = safekeeper_api::membership::Configuration {
    1250            0 :                     generation: SafekeeperGeneration::new(1),
    1251            0 :                     members: safekeeper_api::membership::MemberSet {
    1252            0 :                         m: vec![SafekeeperId {
    1253            0 :                             host: default_host,
    1254            0 :                             id: default_sk.conf.id,
    1255            0 :                             pg_port: default_sk.conf.pg_port,
    1256            0 :                         }],
    1257            0 :                     },
    1258            0 :                     new_members: None,
    1259            0 :                 };
    1260            0 :                 let pg_version = PgVersionId::from(args.pg_version);
    1261            0 :                 let req = safekeeper_api::models::TimelineCreateRequest {
    1262            0 :                     tenant_id,
    1263            0 :                     timeline_id,
    1264            0 :                     mconf,
    1265            0 :                     pg_version,
    1266            0 :                     system_id: None,
    1267            0 :                     wal_seg_size: None,
    1268            0 :                     start_lsn: timeline_info.last_record_lsn,
    1269            0 :                     commit_lsn: None,
    1270            0 :                 };
    1271            0 :                 default_sk.create_timeline(&req).await?;
    1272            0 :             }
    1273            0 :             env.register_branch_mapping(branch_name.to_string(), tenant_id, timeline_id)?;
    1274            0 :             println!("Done");
    1275              :         }
    1276            0 :         TimelineCmd::Branch(args) => {
    1277            0 :             let tenant_id = get_tenant_id(args.tenant_id, env)?;
    1278            0 :             let new_timeline_id = args.timeline_id.unwrap_or(TimelineId::generate());
    1279            0 :             let new_branch_name = &args.branch_name;
    1280            0 :             let ancestor_branch_name = args
    1281            0 :                 .ancestor_branch_name
    1282            0 :                 .clone()
    1283            0 :                 .unwrap_or(DEFAULT_BRANCH_NAME.to_owned());
    1284            0 :             let ancestor_timeline_id = env
    1285            0 :                 .get_branch_timeline_id(&ancestor_branch_name, tenant_id)
    1286            0 :                 .ok_or_else(|| {
    1287            0 :                     anyhow!("Found no timeline id for branch name '{ancestor_branch_name}'")
    1288            0 :                 })?;
    1289              : 
    1290            0 :             let start_lsn = args.ancestor_start_lsn;
    1291            0 :             let storage_controller = StorageController::from_env(env);
    1292            0 :             let create_req = TimelineCreateRequest {
    1293            0 :                 new_timeline_id,
    1294            0 :                 mode: pageserver_api::models::TimelineCreateRequestMode::Branch {
    1295            0 :                     ancestor_timeline_id,
    1296            0 :                     ancestor_start_lsn: start_lsn,
    1297            0 :                     read_only: false,
    1298            0 :                     pg_version: None,
    1299            0 :                 },
    1300            0 :             };
    1301            0 :             let timeline_info = storage_controller
    1302            0 :                 .tenant_timeline_create(tenant_id, create_req)
    1303            0 :                 .await?;
    1304              : 
    1305            0 :             let last_record_lsn = timeline_info.last_record_lsn;
    1306              : 
    1307            0 :             env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?;
    1308              : 
    1309            0 :             println!(
    1310            0 :                 "Created timeline '{}' at Lsn {last_record_lsn} for tenant: {tenant_id}. Ancestor timeline: '{ancestor_branch_name}'",
    1311              :                 timeline_info.timeline_id
    1312              :             );
    1313              :         }
    1314              :     }
    1315              : 
    1316            0 :     Ok(())
    1317            0 : }
    1318              : 
    1319            0 : async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Result<()> {
    1320            0 :     let mut cplane = ComputeControlPlane::load(env.clone())?;
    1321              : 
    1322            0 :     match subcmd {
    1323            0 :         EndpointCmd::List(args) => {
    1324              :             // TODO(sharding): this command shouldn't have to specify a shard ID: we should ask the storage controller
    1325              :             // where shard 0 is attached, and query there.
    1326            0 :             let tenant_shard_id = get_tenant_shard_id(args.tenant_shard_id, env)?;
    1327              : 
    1328            0 :             let timeline_name_mappings = env.timeline_name_mappings();
    1329              : 
    1330            0 :             let mut table = comfy_table::Table::new();
    1331              : 
    1332            0 :             table.load_preset(comfy_table::presets::NOTHING);
    1333              : 
    1334            0 :             table.set_header([
    1335            0 :                 "ENDPOINT",
    1336            0 :                 "ADDRESS",
    1337            0 :                 "TIMELINE",
    1338            0 :                 "BRANCH NAME",
    1339            0 :                 "LSN",
    1340            0 :                 "STATUS",
    1341            0 :             ]);
    1342              : 
    1343            0 :             for (endpoint_id, endpoint) in cplane
    1344            0 :                 .endpoints
    1345            0 :                 .iter()
    1346            0 :                 .filter(|(_, endpoint)| endpoint.tenant_id == tenant_shard_id.tenant_id)
    1347              :             {
    1348            0 :                 let lsn_str = match endpoint.mode {
    1349            0 :                     ComputeMode::Static(lsn) => {
    1350              :                         // -> read-only endpoint
    1351              :                         // Use the node's LSN.
    1352            0 :                         lsn.to_string()
    1353              :                     }
    1354              :                     _ => {
    1355              :                         // As the LSN here refers to the one that the compute is started with,
    1356              :                         // we display nothing as it is a primary/hot standby compute.
    1357            0 :                         "---".to_string()
    1358              :                     }
    1359              :                 };
    1360              : 
    1361            0 :                 let branch_name = timeline_name_mappings
    1362            0 :                     .get(&TenantTimelineId::new(
    1363            0 :                         tenant_shard_id.tenant_id,
    1364            0 :                         endpoint.timeline_id,
    1365            0 :                     ))
    1366            0 :                     .map(|name| name.as_str())
    1367            0 :                     .unwrap_or("?");
    1368              : 
    1369            0 :                 table.add_row([
    1370            0 :                     endpoint_id.as_str(),
    1371            0 :                     &endpoint.pg_address.to_string(),
    1372            0 :                     &endpoint.timeline_id.to_string(),
    1373            0 :                     branch_name,
    1374            0 :                     lsn_str.as_str(),
    1375            0 :                     &format!("{}", endpoint.status()),
    1376            0 :                 ]);
    1377              :             }
    1378              : 
    1379            0 :             println!("{table}");
    1380              :         }
    1381            0 :         EndpointCmd::Create(args) => {
    1382            0 :             let tenant_id = get_tenant_id(args.tenant_id, env)?;
    1383            0 :             let branch_name = args
    1384            0 :                 .branch_name
    1385            0 :                 .clone()
    1386            0 :                 .unwrap_or(DEFAULT_BRANCH_NAME.to_owned());
    1387            0 :             let endpoint_id = args
    1388            0 :                 .endpoint_id
    1389            0 :                 .clone()
    1390            0 :                 .unwrap_or_else(|| format!("ep-{branch_name}"));
    1391              : 
    1392            0 :             let timeline_id = env
    1393            0 :                 .get_branch_timeline_id(&branch_name, tenant_id)
    1394            0 :                 .ok_or_else(|| anyhow!("Found no timeline id for branch name '{branch_name}'"))?;
    1395              : 
    1396            0 :             let mode = match (args.lsn, args.hot_standby) {
    1397            0 :                 (Some(lsn), false) => ComputeMode::Static(lsn),
    1398            0 :                 (None, true) => ComputeMode::Replica,
    1399            0 :                 (None, false) => ComputeMode::Primary,
    1400            0 :                 (Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"),
    1401              :             };
    1402              : 
    1403            0 :             match (mode, args.hot_standby) {
    1404              :                 (ComputeMode::Static(_), true) => {
    1405            0 :                     bail!(
    1406            0 :                         "Cannot start a node in hot standby mode when it is already configured as a static replica"
    1407              :                     )
    1408              :                 }
    1409              :                 (ComputeMode::Primary, true) => {
    1410            0 :                     bail!(
    1411            0 :                         "Cannot start a node as a hot standby replica, it is already configured as primary node"
    1412              :                     )
    1413              :                 }
    1414            0 :                 _ => {}
    1415              :             }
    1416              : 
    1417            0 :             if !args.allow_multiple {
    1418            0 :                 cplane.check_conflicting_endpoints(mode, tenant_id, timeline_id)?;
    1419            0 :             }
    1420              : 
    1421            0 :             cplane.new_endpoint(
    1422            0 :                 &endpoint_id,
    1423            0 :                 tenant_id,
    1424            0 :                 timeline_id,
    1425            0 :                 args.pg_port,
    1426            0 :                 args.external_http_port,
    1427            0 :                 args.internal_http_port,
    1428            0 :                 args.pg_version,
    1429            0 :                 mode,
    1430            0 :                 args.grpc,
    1431            0 :                 !args.update_catalog,
    1432              :                 false,
    1433            0 :                 args.privileged_role_name.clone(),
    1434            0 :             )?;
    1435              :         }
    1436            0 :         EndpointCmd::Start(args) => {
    1437            0 :             let endpoint_id = &args.endpoint_id;
    1438            0 :             let pageserver_id = args.endpoint_pageserver_id;
    1439            0 :             let remote_ext_base_url = &args.remote_ext_base_url;
    1440              : 
    1441            0 :             let default_generation = env
    1442            0 :                 .storage_controller
    1443            0 :                 .timelines_onto_safekeepers
    1444            0 :                 .then_some(1);
    1445            0 :             let safekeepers_generation = args
    1446            0 :                 .safekeepers_generation
    1447            0 :                 .or(default_generation)
    1448            0 :                 .map(SafekeeperGeneration::new);
    1449              :             // If --safekeepers argument is given, use only the listed
    1450              :             // safekeeper nodes; otherwise all from the env.
    1451            0 :             let safekeepers = if let Some(safekeepers) = parse_safekeepers(&args.safekeepers)? {
    1452            0 :                 safekeepers
    1453              :             } else {
    1454            0 :                 env.safekeepers.iter().map(|sk| sk.id).collect()
    1455              :             };
    1456              : 
    1457            0 :             let endpoint = cplane
    1458            0 :                 .endpoints
    1459            0 :                 .get(endpoint_id.as_str())
    1460            0 :                 .ok_or_else(|| anyhow!("endpoint {endpoint_id} not found"))?;
    1461              : 
    1462            0 :             if !args.allow_multiple {
    1463            0 :                 cplane.check_conflicting_endpoints(
    1464            0 :                     endpoint.mode,
    1465            0 :                     endpoint.tenant_id,
    1466            0 :                     endpoint.timeline_id,
    1467            0 :                 )?;
    1468            0 :             }
    1469              : 
    1470            0 :             let prefer_protocol = if endpoint.grpc {
    1471            0 :                 PageserverProtocol::Grpc
    1472              :             } else {
    1473            0 :                 PageserverProtocol::Libpq
    1474              :             };
    1475              : 
    1476            0 :             let mut pageserver_conninfo = if let Some(ps_id) = pageserver_id {
    1477            0 :                 let conf = env.get_pageserver_conf(ps_id).unwrap();
    1478            0 :                 local_pageserver_conf_to_conn_info(conf)?
    1479              :             } else {
    1480              :                 // Look up the currently attached location of the tenant, and its striping metadata,
    1481              :                 // to pass these on to postgres.
    1482            0 :                 let storage_controller = StorageController::from_env(env);
    1483            0 :                 let locate_result = storage_controller.tenant_locate(endpoint.tenant_id).await?;
    1484            0 :                 assert!(!locate_result.shards.is_empty());
    1485              : 
    1486              :                 // Initialize LSN leases for static computes.
    1487            0 :                 if let ComputeMode::Static(lsn) = endpoint.mode {
    1488            0 :                     futures::future::try_join_all(locate_result.shards.iter().map(
    1489            0 :                         |shard| async move {
    1490            0 :                             let conf = env.get_pageserver_conf(shard.node_id).unwrap();
    1491            0 :                             let pageserver = PageServerNode::from_env(env, conf);
    1492              : 
    1493            0 :                             pageserver
    1494            0 :                                 .http_client
    1495            0 :                                 .timeline_init_lsn_lease(shard.shard_id, endpoint.timeline_id, lsn)
    1496            0 :                                 .await
    1497            0 :                         },
    1498              :                     ))
    1499            0 :                     .await?;
    1500            0 :                 }
    1501              : 
    1502            0 :                 tenant_locate_response_to_conn_info(&locate_result)?
    1503              :             };
    1504            0 :             pageserver_conninfo.prefer_protocol = prefer_protocol;
    1505              : 
    1506            0 :             let ps_conf = env.get_pageserver_conf(DEFAULT_PAGESERVER_ID)?;
    1507            0 :             let auth_token = if matches!(ps_conf.pg_auth_type, AuthType::NeonJWT) {
    1508            0 :                 let claims = Claims::new(Some(endpoint.tenant_id), Scope::Tenant);
    1509              : 
    1510            0 :                 Some(env.generate_auth_token(&claims)?)
    1511              :             } else {
    1512            0 :                 None
    1513              :             };
    1514              : 
    1515            0 :             let exp = (std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH)?
    1516            0 :                 + Duration::from_secs(86400))
    1517            0 :             .as_secs();
    1518            0 :             let claims = endpoint_storage::claims::EndpointStorageClaims {
    1519            0 :                 tenant_id: endpoint.tenant_id,
    1520            0 :                 timeline_id: endpoint.timeline_id,
    1521            0 :                 endpoint_id: endpoint_id.to_string(),
    1522            0 :                 exp,
    1523            0 :             };
    1524              : 
    1525            0 :             let endpoint_storage_token = env.generate_auth_token(&claims)?;
    1526            0 :             let endpoint_storage_addr = env.endpoint_storage.listen_addr.to_string();
    1527              : 
    1528            0 :             let args = control_plane::endpoint::EndpointStartArgs {
    1529            0 :                 auth_token,
    1530            0 :                 endpoint_storage_token,
    1531            0 :                 endpoint_storage_addr,
    1532            0 :                 safekeepers_generation,
    1533            0 :                 safekeepers,
    1534            0 :                 pageserver_conninfo,
    1535            0 :                 remote_ext_base_url: remote_ext_base_url.clone(),
    1536            0 :                 create_test_user: args.create_test_user,
    1537            0 :                 start_timeout: args.start_timeout,
    1538            0 :                 autoprewarm: args.autoprewarm,
    1539            0 :                 offload_lfc_interval_seconds: args.offload_lfc_interval_seconds,
    1540            0 :                 dev: args.dev,
    1541            0 :             };
    1542              : 
    1543            0 :             println!("Starting existing endpoint {endpoint_id}...");
    1544            0 :             endpoint.start(args).await?;
    1545              :         }
    1546            0 :         EndpointCmd::UpdatePageservers(args) => {
    1547            0 :             let endpoint_id = &args.endpoint_id;
    1548            0 :             let endpoint = cplane
    1549            0 :                 .endpoints
    1550            0 :                 .get(endpoint_id.as_str())
    1551            0 :                 .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
    1552            0 :             let prefer_protocol = if endpoint.grpc {
    1553            0 :                 PageserverProtocol::Grpc
    1554              :             } else {
    1555            0 :                 PageserverProtocol::Libpq
    1556              :             };
    1557            0 :             let mut pageserver_conninfo = match args.pageserver_id {
    1558            0 :                 Some(pageserver_id) => {
    1559            0 :                     let conf = env.get_pageserver_conf(pageserver_id)?;
    1560            0 :                     local_pageserver_conf_to_conn_info(conf)?
    1561              :                 }
    1562              :                 None => {
    1563            0 :                     let storage_controller = StorageController::from_env(env);
    1564            0 :                     let locate_result =
    1565            0 :                         storage_controller.tenant_locate(endpoint.tenant_id).await?;
    1566              : 
    1567            0 :                     tenant_locate_response_to_conn_info(&locate_result)?
    1568              :                 }
    1569              :             };
    1570            0 :             pageserver_conninfo.prefer_protocol = prefer_protocol;
    1571              : 
    1572            0 :             endpoint
    1573            0 :                 .update_pageservers_in_config(&pageserver_conninfo)
    1574            0 :                 .await?;
    1575              :         }
    1576            0 :         EndpointCmd::Reconfigure(args) => {
    1577            0 :             let endpoint_id = &args.endpoint_id;
    1578            0 :             let endpoint = cplane
    1579            0 :                 .endpoints
    1580            0 :                 .get(endpoint_id.as_str())
    1581            0 :                 .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
    1582              : 
    1583            0 :             let prefer_protocol = if endpoint.grpc {
    1584            0 :                 PageserverProtocol::Grpc
    1585              :             } else {
    1586            0 :                 PageserverProtocol::Libpq
    1587              :             };
    1588            0 :             let mut pageserver_conninfo = if let Some(ps_id) = args.endpoint_pageserver_id {
    1589            0 :                 let conf = env.get_pageserver_conf(ps_id)?;
    1590            0 :                 local_pageserver_conf_to_conn_info(conf)?
    1591              :             } else {
    1592              :                 // Look up the currently attached location of the tenant, and its striping metadata,
    1593              :                 // to pass these on to postgres.
    1594            0 :                 let storage_controller = StorageController::from_env(env);
    1595            0 :                 let locate_result = storage_controller.tenant_locate(endpoint.tenant_id).await?;
    1596              : 
    1597            0 :                 tenant_locate_response_to_conn_info(&locate_result)?
    1598              :             };
    1599            0 :             pageserver_conninfo.prefer_protocol = prefer_protocol;
    1600              : 
    1601              :             // If --safekeepers argument is given, use only the listed
    1602              :             // safekeeper nodes; otherwise all from the env.
    1603            0 :             let safekeepers = parse_safekeepers(&args.safekeepers)?;
    1604            0 :             endpoint
    1605            0 :                 .reconfigure(Some(&pageserver_conninfo), safekeepers, None)
    1606            0 :                 .await?;
    1607              :         }
    1608            0 :         EndpointCmd::RefreshConfiguration(args) => {
    1609            0 :             let endpoint_id = &args.endpoint_id;
    1610            0 :             let endpoint = cplane
    1611            0 :                 .endpoints
    1612            0 :                 .get(endpoint_id.as_str())
    1613            0 :                 .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
    1614            0 :             endpoint.refresh_configuration().await?;
    1615              :         }
    1616            0 :         EndpointCmd::Stop(args) => {
    1617            0 :             let endpoint_id = &args.endpoint_id;
    1618            0 :             let endpoint = cplane
    1619            0 :                 .endpoints
    1620            0 :                 .get(endpoint_id)
    1621            0 :                 .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
    1622            0 :             match endpoint.stop(args.mode, args.destroy).await?.lsn {
    1623            0 :                 Some(lsn) => println!("{lsn}"),
    1624            0 :                 None => println!("null"),
    1625              :             }
    1626              :         }
    1627            0 :         EndpointCmd::GenerateJwt(args) => {
    1628            0 :             let endpoint = {
    1629            0 :                 let endpoint_id = &args.endpoint_id;
    1630              : 
    1631            0 :                 cplane
    1632            0 :                     .endpoints
    1633            0 :                     .get(endpoint_id)
    1634            0 :                     .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?
    1635              :             };
    1636              : 
    1637            0 :             let jwt = endpoint.generate_jwt(args.scope)?;
    1638              : 
    1639            0 :             print!("{jwt}");
    1640              :         }
    1641              :     }
    1642              : 
    1643            0 :     Ok(())
    1644            0 : }
    1645              : 
    1646              : /// Parse --safekeepers as list of safekeeper ids.
    1647            0 : fn parse_safekeepers(safekeepers_str: &Option<String>) -> Result<Option<Vec<NodeId>>> {
    1648            0 :     if let Some(safekeepers_str) = safekeepers_str {
    1649            0 :         let mut safekeepers: Vec<NodeId> = Vec::new();
    1650            0 :         for sk_id in safekeepers_str.split(',').map(str::trim) {
    1651            0 :             let sk_id = NodeId(
    1652            0 :                 u64::from_str(sk_id)
    1653            0 :                     .map_err(|_| anyhow!("invalid node ID \"{sk_id}\" in --safekeepers list"))?,
    1654              :             );
    1655            0 :             safekeepers.push(sk_id);
    1656              :         }
    1657            0 :         Ok(Some(safekeepers))
    1658              :     } else {
    1659            0 :         Ok(None)
    1660              :     }
    1661            0 : }
    1662              : 
    1663            0 : fn handle_mappings(subcmd: &MappingsCmd, env: &mut local_env::LocalEnv) -> Result<()> {
    1664            0 :     match subcmd {
    1665            0 :         MappingsCmd::Map(args) => {
    1666            0 :             env.register_branch_mapping(
    1667            0 :                 args.branch_name.to_owned(),
    1668            0 :                 args.tenant_id,
    1669            0 :                 args.timeline_id,
    1670            0 :             )?;
    1671              : 
    1672            0 :             Ok(())
    1673              :         }
    1674              :     }
    1675            0 : }
    1676              : 
    1677            0 : fn get_pageserver(
    1678            0 :     env: &local_env::LocalEnv,
    1679            0 :     pageserver_id_arg: Option<NodeId>,
    1680            0 : ) -> Result<PageServerNode> {
    1681            0 :     let node_id = pageserver_id_arg.unwrap_or(DEFAULT_PAGESERVER_ID);
    1682              : 
    1683            0 :     Ok(PageServerNode::from_env(
    1684            0 :         env,
    1685            0 :         env.get_pageserver_conf(node_id)?,
    1686              :     ))
    1687            0 : }
    1688              : 
    1689            0 : async fn handle_pageserver(subcmd: &PageserverCmd, env: &local_env::LocalEnv) -> Result<()> {
    1690            0 :     match subcmd {
    1691            0 :         PageserverCmd::Start(args) => {
    1692            0 :             if let Err(e) = get_pageserver(env, args.pageserver_id)?
    1693            0 :                 .start(&args.start_timeout)
    1694            0 :                 .await
    1695              :             {
    1696            0 :                 eprintln!("pageserver start failed: {e}");
    1697            0 :                 exit(1);
    1698            0 :             }
    1699              :         }
    1700              : 
    1701            0 :         PageserverCmd::Stop(args) => {
    1702            0 :             let immediate = match args.stop_mode {
    1703            0 :                 StopMode::Fast => false,
    1704            0 :                 StopMode::Immediate => true,
    1705              :             };
    1706            0 :             if let Err(e) = get_pageserver(env, args.pageserver_id)?.stop(immediate) {
    1707            0 :                 eprintln!("pageserver stop failed: {e}");
    1708            0 :                 exit(1);
    1709            0 :             }
    1710              :         }
    1711              : 
    1712            0 :         PageserverCmd::Restart(args) => {
    1713            0 :             let pageserver = get_pageserver(env, args.pageserver_id)?;
    1714              :             //TODO what shutdown strategy should we use here?
    1715            0 :             if let Err(e) = pageserver.stop(false) {
    1716            0 :                 eprintln!("pageserver stop failed: {e}");
    1717            0 :                 exit(1);
    1718            0 :             }
    1719              : 
    1720            0 :             if let Err(e) = pageserver.start(&args.start_timeout).await {
    1721            0 :                 eprintln!("pageserver start failed: {e}");
    1722            0 :                 exit(1);
    1723            0 :             }
    1724              :         }
    1725              : 
    1726            0 :         PageserverCmd::Status(args) => {
    1727            0 :             match get_pageserver(env, args.pageserver_id)?
    1728            0 :                 .check_status()
    1729            0 :                 .await
    1730              :             {
    1731            0 :                 Ok(_) => println!("Page server is up and running"),
    1732            0 :                 Err(err) => {
    1733            0 :                     eprintln!("Page server is not available: {err}");
    1734            0 :                     exit(1);
    1735              :                 }
    1736              :             }
    1737              :         }
    1738              :     }
    1739            0 :     Ok(())
    1740            0 : }
    1741              : 
    1742            0 : async fn handle_storage_controller(
    1743            0 :     subcmd: &StorageControllerCmd,
    1744            0 :     env: &local_env::LocalEnv,
    1745            0 : ) -> Result<()> {
    1746            0 :     let svc = StorageController::from_env(env);
    1747            0 :     match subcmd {
    1748            0 :         StorageControllerCmd::Start(args) => {
    1749            0 :             let start_args = NeonStorageControllerStartArgs {
    1750            0 :                 instance_id: args.instance_id,
    1751            0 :                 base_port: args.base_port,
    1752            0 :                 start_timeout: args.start_timeout,
    1753            0 :                 handle_ps_local_disk_loss: args.handle_ps_local_disk_loss,
    1754            0 :             };
    1755              : 
    1756            0 :             if let Err(e) = svc.start(start_args).await {
    1757            0 :                 eprintln!("start failed: {e}");
    1758            0 :                 exit(1);
    1759            0 :             }
    1760              :         }
    1761              : 
    1762            0 :         StorageControllerCmd::Stop(args) => {
    1763            0 :             let stop_args = NeonStorageControllerStopArgs {
    1764            0 :                 instance_id: args.instance_id,
    1765            0 :                 immediate: match args.stop_mode {
    1766            0 :                     StopMode::Fast => false,
    1767            0 :                     StopMode::Immediate => true,
    1768              :                 },
    1769              :             };
    1770            0 :             if let Err(e) = svc.stop(stop_args).await {
    1771            0 :                 eprintln!("stop failed: {e}");
    1772            0 :                 exit(1);
    1773            0 :             }
    1774              :         }
    1775              :     }
    1776            0 :     Ok(())
    1777            0 : }
    1778              : 
    1779            0 : fn get_safekeeper(env: &local_env::LocalEnv, id: NodeId) -> Result<SafekeeperNode> {
    1780            0 :     if let Some(node) = env.safekeepers.iter().find(|node| node.id == id) {
    1781            0 :         Ok(SafekeeperNode::from_env(env, node))
    1782              :     } else {
    1783            0 :         bail!("could not find safekeeper {id}")
    1784              :     }
    1785            0 : }
    1786              : 
    1787            0 : async fn handle_safekeeper(subcmd: &SafekeeperCmd, env: &local_env::LocalEnv) -> Result<()> {
    1788            0 :     match subcmd {
    1789            0 :         SafekeeperCmd::Start(args) => {
    1790            0 :             let safekeeper = get_safekeeper(env, args.id)?;
    1791              : 
    1792            0 :             if let Err(e) = safekeeper.start(&args.extra_opt, &args.start_timeout).await {
    1793            0 :                 eprintln!("safekeeper start failed: {e}");
    1794            0 :                 exit(1);
    1795            0 :             }
    1796              :         }
    1797              : 
    1798            0 :         SafekeeperCmd::Stop(args) => {
    1799            0 :             let safekeeper = get_safekeeper(env, args.id)?;
    1800            0 :             let immediate = match args.stop_mode {
    1801            0 :                 StopMode::Fast => false,
    1802            0 :                 StopMode::Immediate => true,
    1803              :             };
    1804            0 :             if let Err(e) = safekeeper.stop(immediate) {
    1805            0 :                 eprintln!("safekeeper stop failed: {e}");
    1806            0 :                 exit(1);
    1807            0 :             }
    1808              :         }
    1809              : 
    1810            0 :         SafekeeperCmd::Restart(args) => {
    1811            0 :             let safekeeper = get_safekeeper(env, args.id)?;
    1812            0 :             let immediate = match args.stop_mode {
    1813            0 :                 StopMode::Fast => false,
    1814            0 :                 StopMode::Immediate => true,
    1815              :             };
    1816              : 
    1817            0 :             if let Err(e) = safekeeper.stop(immediate) {
    1818            0 :                 eprintln!("safekeeper stop failed: {e}");
    1819            0 :                 exit(1);
    1820            0 :             }
    1821              : 
    1822            0 :             if let Err(e) = safekeeper.start(&args.extra_opt, &args.start_timeout).await {
    1823            0 :                 eprintln!("safekeeper start failed: {e}");
    1824            0 :                 exit(1);
    1825            0 :             }
    1826              :         }
    1827              :     }
    1828            0 :     Ok(())
    1829            0 : }
    1830              : 
    1831            0 : async fn handle_endpoint_storage(
    1832            0 :     subcmd: &EndpointStorageCmd,
    1833            0 :     env: &local_env::LocalEnv,
    1834            0 : ) -> Result<()> {
    1835              :     use EndpointStorageCmd::*;
    1836            0 :     let storage = EndpointStorage::from_env(env);
    1837              : 
    1838              :     // In tests like test_forward_compatibility or test_graceful_cluster_restart
    1839              :     // old neon binaries (without endpoint_storage) are present
    1840            0 :     if !storage.bin.exists() {
    1841            0 :         eprintln!(
    1842            0 :             "{} binary not found. Ignore if this is a compatibility test",
    1843              :             storage.bin
    1844              :         );
    1845            0 :         return Ok(());
    1846            0 :     }
    1847              : 
    1848            0 :     match subcmd {
    1849            0 :         Start(EndpointStorageStartCmd { start_timeout }) => {
    1850            0 :             if let Err(e) = storage.start(start_timeout).await {
    1851            0 :                 eprintln!("endpoint_storage start failed: {e}");
    1852            0 :                 exit(1);
    1853            0 :             }
    1854              :         }
    1855            0 :         Stop(EndpointStorageStopCmd { stop_mode }) => {
    1856            0 :             let immediate = match stop_mode {
    1857            0 :                 StopMode::Fast => false,
    1858            0 :                 StopMode::Immediate => true,
    1859              :             };
    1860            0 :             if let Err(e) = storage.stop(immediate) {
    1861            0 :                 eprintln!("proxy stop failed: {e}");
    1862            0 :                 exit(1);
    1863            0 :             }
    1864              :         }
    1865              :     };
    1866            0 :     Ok(())
    1867            0 : }
    1868              : 
    1869            0 : async fn handle_storage_broker(subcmd: &StorageBrokerCmd, env: &local_env::LocalEnv) -> Result<()> {
    1870            0 :     match subcmd {
    1871            0 :         StorageBrokerCmd::Start(args) => {
    1872            0 :             let storage_broker = StorageBroker::from_env(env);
    1873            0 :             if let Err(e) = storage_broker.start(&args.start_timeout).await {
    1874            0 :                 eprintln!("broker start failed: {e}");
    1875            0 :                 exit(1);
    1876            0 :             }
    1877              :         }
    1878              : 
    1879            0 :         StorageBrokerCmd::Stop(_args) => {
    1880              :             // FIXME: stop_mode unused
    1881            0 :             let storage_broker = StorageBroker::from_env(env);
    1882            0 :             if let Err(e) = storage_broker.stop() {
    1883            0 :                 eprintln!("broker stop failed: {e}");
    1884            0 :                 exit(1);
    1885            0 :             }
    1886              :         }
    1887              :     }
    1888            0 :     Ok(())
    1889            0 : }
    1890              : 
    1891            0 : async fn handle_start_all(
    1892            0 :     args: &StartCmdArgs,
    1893            0 :     env: &'static local_env::LocalEnv,
    1894            0 : ) -> anyhow::Result<()> {
    1895              :     // FIXME: this was called "retry_timeout", is it right?
    1896            0 :     let Err(errors) = handle_start_all_impl(env, args.timeout).await else {
    1897            0 :         neon_start_status_check(env, args.timeout.as_ref())
    1898            0 :             .await
    1899            0 :             .context("status check after successful startup of all services")?;
    1900            0 :         return Ok(());
    1901              :     };
    1902              : 
    1903            0 :     eprintln!("startup failed because one or more services could not be started");
    1904              : 
    1905            0 :     for e in errors {
    1906            0 :         eprintln!("{e}");
    1907            0 :         let debug_repr = format!("{e:?}");
    1908            0 :         for line in debug_repr.lines() {
    1909            0 :             eprintln!("  {line}");
    1910            0 :         }
    1911              :     }
    1912              : 
    1913            0 :     try_stop_all(env, true).await;
    1914              : 
    1915            0 :     exit(2);
    1916            0 : }
    1917              : 
    1918              : /// Returns Ok() if and only if all services could be started successfully.
    1919              : /// Otherwise, returns the list of errors that occurred during startup.
    1920            0 : async fn handle_start_all_impl(
    1921            0 :     env: &'static local_env::LocalEnv,
    1922            0 :     retry_timeout: humantime::Duration,
    1923            0 : ) -> Result<(), Vec<anyhow::Error>> {
    1924              :     // Endpoints are not started automatically
    1925              : 
    1926            0 :     let mut js = JoinSet::new();
    1927              : 
    1928              :     // force infalliblity through closure
    1929              :     #[allow(clippy::redundant_closure_call)]
    1930            0 :     (|| {
    1931            0 :         js.spawn(async move {
    1932            0 :             let storage_broker = StorageBroker::from_env(env);
    1933            0 :             storage_broker
    1934            0 :                 .start(&retry_timeout)
    1935            0 :                 .await
    1936            0 :                 .map_err(|e| e.context("start storage_broker"))
    1937            0 :         });
    1938              : 
    1939            0 :         js.spawn(async move {
    1940            0 :             let storage_controller = StorageController::from_env(env);
    1941            0 :             storage_controller
    1942            0 :                 .start(NeonStorageControllerStartArgs::with_default_instance_id(
    1943            0 :                     retry_timeout,
    1944            0 :                 ))
    1945            0 :                 .await
    1946            0 :                 .map_err(|e| e.context("start storage_controller"))
    1947            0 :         });
    1948              : 
    1949            0 :         for ps_conf in &env.pageservers {
    1950            0 :             js.spawn(async move {
    1951            0 :                 let pageserver = PageServerNode::from_env(env, ps_conf);
    1952            0 :                 pageserver
    1953            0 :                     .start(&retry_timeout)
    1954            0 :                     .await
    1955            0 :                     .map_err(|e| e.context(format!("start pageserver {}", ps_conf.id)))
    1956            0 :             });
    1957              :         }
    1958              : 
    1959            0 :         for node in env.safekeepers.iter() {
    1960            0 :             js.spawn(async move {
    1961            0 :                 let safekeeper = SafekeeperNode::from_env(env, node);
    1962            0 :                 safekeeper
    1963            0 :                     .start(&[], &retry_timeout)
    1964            0 :                     .await
    1965            0 :                     .map_err(|e| e.context(format!("start safekeeper {}", safekeeper.id)))
    1966            0 :             });
    1967              :         }
    1968              : 
    1969            0 :         js.spawn(async move {
    1970            0 :             EndpointStorage::from_env(env)
    1971            0 :                 .start(&retry_timeout)
    1972            0 :                 .await
    1973            0 :                 .map_err(|e| e.context("start endpoint_storage"))
    1974            0 :         });
    1975              :     })();
    1976              : 
    1977            0 :     let mut errors = Vec::new();
    1978            0 :     while let Some(result) = js.join_next().await {
    1979            0 :         let result = result.expect("we don't panic or cancel the tasks");
    1980            0 :         if let Err(e) = result {
    1981            0 :             errors.push(e);
    1982            0 :         }
    1983              :     }
    1984              : 
    1985            0 :     if !errors.is_empty() {
    1986            0 :         return Err(errors);
    1987            0 :     }
    1988              : 
    1989            0 :     Ok(())
    1990            0 : }
    1991              : 
    1992            0 : async fn neon_start_status_check(
    1993            0 :     env: &local_env::LocalEnv,
    1994            0 :     retry_timeout: &Duration,
    1995            0 : ) -> anyhow::Result<()> {
    1996              :     const RETRY_INTERVAL: Duration = Duration::from_millis(100);
    1997              :     const NOTICE_AFTER_RETRIES: Duration = Duration::from_secs(5);
    1998              : 
    1999            0 :     let storcon = StorageController::from_env(env);
    2000              : 
    2001            0 :     let retries = retry_timeout.as_millis() / RETRY_INTERVAL.as_millis();
    2002            0 :     let notice_after_retries = retry_timeout.as_millis() / NOTICE_AFTER_RETRIES.as_millis();
    2003              : 
    2004            0 :     println!("\nRunning neon status check");
    2005              : 
    2006            0 :     for retry in 0..retries {
    2007            0 :         if retry == notice_after_retries {
    2008            0 :             println!("\nNeon status check has not passed yet, continuing to wait")
    2009            0 :         }
    2010              : 
    2011            0 :         let mut passed = true;
    2012            0 :         let mut nodes = storcon.node_list().await?;
    2013            0 :         let mut pageservers = env.pageservers.clone();
    2014              : 
    2015            0 :         if nodes.len() != pageservers.len() {
    2016            0 :             continue;
    2017            0 :         }
    2018              : 
    2019            0 :         nodes.sort_by_key(|ps| ps.id);
    2020            0 :         pageservers.sort_by_key(|ps| ps.id);
    2021              : 
    2022            0 :         for (idx, pageserver) in pageservers.iter().enumerate() {
    2023            0 :             let node = &nodes[idx];
    2024            0 :             if node.id != pageserver.id {
    2025            0 :                 passed = false;
    2026            0 :                 break;
    2027            0 :             }
    2028              : 
    2029            0 :             if !matches!(node.availability, NodeAvailabilityWrapper::Active) {
    2030            0 :                 passed = false;
    2031            0 :                 break;
    2032            0 :             }
    2033              :         }
    2034              : 
    2035            0 :         if passed {
    2036            0 :             println!("\nNeon started and passed status check");
    2037            0 :             return Ok(());
    2038            0 :         }
    2039              : 
    2040            0 :         tokio::time::sleep(RETRY_INTERVAL).await;
    2041              :     }
    2042              : 
    2043            0 :     anyhow::bail!("\nNeon passed status check")
    2044            0 : }
    2045              : 
    2046            0 : async fn handle_stop_all(args: &StopCmdArgs, env: &local_env::LocalEnv) -> Result<()> {
    2047            0 :     let immediate = match args.mode {
    2048            0 :         StopMode::Fast => false,
    2049            0 :         StopMode::Immediate => true,
    2050              :     };
    2051              : 
    2052            0 :     try_stop_all(env, immediate).await;
    2053              : 
    2054            0 :     Ok(())
    2055            0 : }
    2056              : 
    2057            0 : async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
    2058            0 :     let mode = if immediate {
    2059            0 :         EndpointTerminateMode::Immediate
    2060              :     } else {
    2061            0 :         EndpointTerminateMode::Fast
    2062              :     };
    2063              :     // Stop all endpoints
    2064            0 :     match ComputeControlPlane::load(env.clone()) {
    2065            0 :         Ok(cplane) => {
    2066            0 :             for (_k, node) in cplane.endpoints {
    2067            0 :                 if let Err(e) = node.stop(mode, false).await {
    2068            0 :                     eprintln!("postgres stop failed: {e:#}");
    2069            0 :                 }
    2070              :             }
    2071              :         }
    2072            0 :         Err(e) => {
    2073            0 :             eprintln!("postgres stop failed, could not restore control plane data from env: {e:#}")
    2074              :         }
    2075              :     }
    2076              : 
    2077            0 :     let storage = EndpointStorage::from_env(env);
    2078            0 :     if let Err(e) = storage.stop(immediate) {
    2079            0 :         eprintln!("endpoint_storage stop failed: {e:#}");
    2080            0 :     }
    2081              : 
    2082            0 :     for ps_conf in &env.pageservers {
    2083            0 :         let pageserver = PageServerNode::from_env(env, ps_conf);
    2084            0 :         if let Err(e) = pageserver.stop(immediate) {
    2085            0 :             eprintln!("pageserver {} stop failed: {:#}", ps_conf.id, e);
    2086            0 :         }
    2087              :     }
    2088              : 
    2089            0 :     for node in env.safekeepers.iter() {
    2090            0 :         let safekeeper = SafekeeperNode::from_env(env, node);
    2091            0 :         if let Err(e) = safekeeper.stop(immediate) {
    2092            0 :             eprintln!("safekeeper {} stop failed: {:#}", safekeeper.id, e);
    2093            0 :         }
    2094              :     }
    2095              : 
    2096            0 :     let storage_broker = StorageBroker::from_env(env);
    2097            0 :     if let Err(e) = storage_broker.stop() {
    2098            0 :         eprintln!("neon broker stop failed: {e:#}");
    2099            0 :     }
    2100              : 
    2101              :     // Stop all storage controller instances. In the most common case there's only one,
    2102              :     // but iterate though the base data directory in order to discover the instances.
    2103            0 :     let storcon_instances = env
    2104            0 :         .storage_controller_instances()
    2105            0 :         .await
    2106            0 :         .expect("Must inspect data dir");
    2107            0 :     for (instance_id, _instance_dir_path) in storcon_instances {
    2108            0 :         let storage_controller = StorageController::from_env(env);
    2109            0 :         let stop_args = NeonStorageControllerStopArgs {
    2110            0 :             instance_id,
    2111            0 :             immediate,
    2112            0 :         };
    2113              : 
    2114            0 :         if let Err(e) = storage_controller.stop(stop_args).await {
    2115            0 :             eprintln!("Storage controller instance {instance_id} stop failed: {e:#}");
    2116            0 :         }
    2117              :     }
    2118            0 : }
        

Generated by: LCOV version 2.1-beta