Line data Source code
1 : //!
2 : //! `neon_local` is an executable that can be used to create a local
3 : //! Neon environment, for testing purposes. The local environment is
4 : //! quite different from the cloud environment with Kubernetes, but it
5 : //! is easier to work with locally. The Python tests in `test_runner`
6 : //! rely on `neon_local` to set up the environment for each test.
7 : //!
8 : use std::borrow::Cow;
9 : use std::collections::{BTreeSet, HashMap};
10 : use std::fs::File;
11 : use std::path::PathBuf;
12 : use std::process::exit;
13 : use std::str::FromStr;
14 : use std::time::Duration;
15 :
16 : use anyhow::{Context, Result, anyhow, bail};
17 : use clap::Parser;
18 : use compute_api::requests::ComputeClaimsScope;
19 : use compute_api::spec::{ComputeMode, PageserverProtocol};
20 : use control_plane::broker::StorageBroker;
21 : use control_plane::endpoint::{ComputeControlPlane, EndpointTerminateMode};
22 : use control_plane::endpoint::{
23 : local_pageserver_conf_to_conn_info, tenant_locate_response_to_conn_info,
24 : };
25 : use control_plane::endpoint_storage::{ENDPOINT_STORAGE_DEFAULT_ADDR, EndpointStorage};
26 : use control_plane::local_env;
27 : use control_plane::local_env::{
28 : EndpointStorageConf, InitForceMode, LocalEnv, NeonBroker, NeonLocalInitConf,
29 : NeonLocalInitPageserverConf, SafekeeperConf,
30 : };
31 : use control_plane::pageserver::PageServerNode;
32 : use control_plane::safekeeper::SafekeeperNode;
33 : use control_plane::storage_controller::{
34 : NeonStorageControllerStartArgs, NeonStorageControllerStopArgs, StorageController,
35 : };
36 : use nix::fcntl::{Flock, FlockArg};
37 : use pageserver_api::config::{
38 : DEFAULT_GRPC_LISTEN_PORT as DEFAULT_PAGESERVER_GRPC_PORT,
39 : DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
40 : DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT,
41 : };
42 : use pageserver_api::controller_api::{
43 : NodeAvailabilityWrapper, PlacementPolicy, TenantCreateRequest,
44 : };
45 : use pageserver_api::models::{
46 : ShardParameters, TenantConfigRequest, TimelineCreateRequest, TimelineInfo,
47 : };
48 : use pageserver_api::shard::{DEFAULT_STRIPE_SIZE, ShardCount, ShardStripeSize, TenantShardId};
49 : use postgres_backend::AuthType;
50 : use safekeeper_api::membership::{SafekeeperGeneration, SafekeeperId};
51 : use safekeeper_api::{
52 : DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT,
53 : DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT, PgMajorVersion, PgVersionId,
54 : };
55 : use storage_broker::DEFAULT_LISTEN_ADDR as DEFAULT_BROKER_ADDR;
56 : use tokio::task::JoinSet;
57 : use utils::auth::{Claims, Scope};
58 : use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
59 : use utils::lsn::Lsn;
60 : use utils::project_git_version;
61 :
62 : // Default id of a safekeeper node, if not specified on the command line.
63 : const DEFAULT_SAFEKEEPER_ID: NodeId = NodeId(1);
64 : const DEFAULT_PAGESERVER_ID: NodeId = NodeId(1);
65 : const DEFAULT_BRANCH_NAME: &str = "main";
66 : project_git_version!(GIT_VERSION);
67 :
68 : #[allow(dead_code)]
69 : const DEFAULT_PG_VERSION: PgMajorVersion = PgMajorVersion::PG17;
70 : const DEFAULT_PG_VERSION_NUM: &str = "17";
71 :
72 : const DEFAULT_PAGESERVER_CONTROL_PLANE_API: &str = "http://127.0.0.1:1234/upcall/v1/";
73 :
74 : /// Neon CLI.
75 : #[derive(clap::Parser)]
76 : #[command(version = GIT_VERSION, name = "Neon CLI")]
77 : struct Cli {
78 : #[command(subcommand)]
79 : command: NeonLocalCmd,
80 : }
81 :
82 : #[derive(clap::Subcommand)]
83 : enum NeonLocalCmd {
84 : Init(InitCmdArgs),
85 :
86 : #[command(subcommand)]
87 : Tenant(TenantCmd),
88 : #[command(subcommand)]
89 : Timeline(TimelineCmd),
90 : #[command(subcommand)]
91 : Pageserver(PageserverCmd),
92 : #[command(subcommand)]
93 : #[clap(alias = "storage_controller")]
94 : StorageController(StorageControllerCmd),
95 : #[command(subcommand)]
96 : #[clap(alias = "storage_broker")]
97 : StorageBroker(StorageBrokerCmd),
98 : #[command(subcommand)]
99 : Safekeeper(SafekeeperCmd),
100 : #[command(subcommand)]
101 : EndpointStorage(EndpointStorageCmd),
102 : #[command(subcommand)]
103 : Endpoint(EndpointCmd),
104 : #[command(subcommand)]
105 : Mappings(MappingsCmd),
106 :
107 : Start(StartCmdArgs),
108 : Stop(StopCmdArgs),
109 : }
110 :
111 : /// Initialize a new Neon repository, preparing configs for services to start with.
112 : #[derive(clap::Args)]
113 : struct InitCmdArgs {
114 : /// How many pageservers to create (default 1).
115 : #[clap(long)]
116 : num_pageservers: Option<u16>,
117 :
118 : #[clap(long)]
119 : config: Option<PathBuf>,
120 :
121 : /// Force initialization even if the repository is not empty.
122 : #[clap(long, default_value = "must-not-exist")]
123 : #[arg(value_parser)]
124 : force: InitForceMode,
125 : }
126 :
127 : /// Start pageserver and safekeepers.
128 : #[derive(clap::Args)]
129 : struct StartCmdArgs {
130 : #[clap(long = "start-timeout", default_value = "10s")]
131 : timeout: humantime::Duration,
132 : }
133 :
134 : /// Stop pageserver and safekeepers.
135 : #[derive(clap::Args)]
136 : struct StopCmdArgs {
137 : #[arg(value_enum)]
138 : #[clap(long, default_value_t = StopMode::Fast)]
139 : mode: StopMode,
140 : }
141 :
142 : #[derive(Clone, Copy, clap::ValueEnum)]
143 : enum StopMode {
144 : Fast,
145 : Immediate,
146 : }
147 :
148 : /// Manage tenants.
149 : #[derive(clap::Subcommand)]
150 : enum TenantCmd {
151 : List,
152 : Create(TenantCreateCmdArgs),
153 : SetDefault(TenantSetDefaultCmdArgs),
154 : Config(TenantConfigCmdArgs),
155 : Import(TenantImportCmdArgs),
156 : }
157 :
158 : #[derive(clap::Args)]
159 : struct TenantCreateCmdArgs {
160 : /// Tenant ID, as a 32-byte hexadecimal string.
161 : #[clap(long = "tenant-id")]
162 : tenant_id: Option<TenantId>,
163 :
164 : /// Use a specific timeline id when creating a tenant and its initial timeline.
165 : #[clap(long)]
166 : timeline_id: Option<TimelineId>,
167 :
168 : #[clap(short = 'c')]
169 : config: Vec<String>,
170 :
171 : /// Postgres version to use for the initial timeline.
172 : #[arg(default_value = DEFAULT_PG_VERSION_NUM)]
173 : #[clap(long)]
174 : pg_version: PgMajorVersion,
175 :
176 : /// Use this tenant in future CLI commands where tenant_id is needed, but not specified.
177 : #[clap(long)]
178 : set_default: bool,
179 :
180 : /// Number of shards in the new tenant.
181 : #[clap(long)]
182 : #[arg(default_value_t = 0)]
183 : shard_count: u8,
184 : /// Sharding stripe size in pages.
185 : #[clap(long)]
186 : shard_stripe_size: Option<u32>,
187 :
188 : /// Placement policy shards in this tenant.
189 : #[clap(long)]
190 : #[arg(value_parser = parse_placement_policy)]
191 : placement_policy: Option<PlacementPolicy>,
192 : }
193 :
194 0 : fn parse_placement_policy(s: &str) -> anyhow::Result<PlacementPolicy> {
195 0 : Ok(serde_json::from_str::<PlacementPolicy>(s)?)
196 0 : }
197 :
198 : /// Set a particular tenant as default in future CLI commands where tenant_id is needed, but not
199 : /// specified.
200 : #[derive(clap::Args)]
201 : struct TenantSetDefaultCmdArgs {
202 : /// Tenant ID, as a 32-byte hexadecimal string.
203 : #[clap(long = "tenant-id")]
204 : tenant_id: TenantId,
205 : }
206 :
207 : #[derive(clap::Args)]
208 : struct TenantConfigCmdArgs {
209 : /// Tenant ID, as a 32-byte hexadecimal string.
210 : #[clap(long = "tenant-id")]
211 : tenant_id: Option<TenantId>,
212 :
213 : #[clap(short = 'c')]
214 : config: Vec<String>,
215 : }
216 :
217 : /// Import a tenant that is present in remote storage, and create branches for its timelines.
218 : #[derive(clap::Args)]
219 : struct TenantImportCmdArgs {
220 : /// Tenant ID, as a 32-byte hexadecimal string.
221 : #[clap(long = "tenant-id")]
222 : tenant_id: TenantId,
223 : }
224 :
225 : /// Manage timelines.
226 : #[derive(clap::Subcommand)]
227 : enum TimelineCmd {
228 : List(TimelineListCmdArgs),
229 : Branch(TimelineBranchCmdArgs),
230 : Create(TimelineCreateCmdArgs),
231 : Import(TimelineImportCmdArgs),
232 : }
233 :
234 : /// List all timelines available to this pageserver.
235 : #[derive(clap::Args)]
236 : struct TimelineListCmdArgs {
237 : /// Tenant ID, as a 32-byte hexadecimal string.
238 : #[clap(long = "tenant-id")]
239 : tenant_shard_id: Option<TenantShardId>,
240 : }
241 :
242 : /// Create a new timeline, branching off from another timeline.
243 : #[derive(clap::Args)]
244 : struct TimelineBranchCmdArgs {
245 : /// Tenant ID, as a 32-byte hexadecimal string.
246 : #[clap(long = "tenant-id")]
247 : tenant_id: Option<TenantId>,
248 : /// New timeline's ID, as a 32-byte hexadecimal string.
249 : #[clap(long)]
250 : timeline_id: Option<TimelineId>,
251 : /// Human-readable alias for the new timeline.
252 : #[clap(long)]
253 : branch_name: String,
254 : /// Use last Lsn of another timeline (and its data) as base when creating the new timeline. The
255 : /// timeline gets resolved by its branch name.
256 : #[clap(long)]
257 : ancestor_branch_name: Option<String>,
258 : /// When using another timeline as base, use a specific Lsn in it instead of the latest one.
259 : #[clap(long)]
260 : ancestor_start_lsn: Option<Lsn>,
261 : }
262 :
263 : /// Create a new blank timeline.
264 : #[derive(clap::Args)]
265 : struct TimelineCreateCmdArgs {
266 : /// Tenant ID, as a 32-byte hexadecimal string.
267 : #[clap(long = "tenant-id")]
268 : tenant_id: Option<TenantId>,
269 : /// New timeline's ID, as a 32-byte hexadecimal string.
270 : #[clap(long)]
271 : timeline_id: Option<TimelineId>,
272 : /// Human-readable alias for the new timeline.
273 : #[clap(long)]
274 : branch_name: String,
275 :
276 : /// Postgres version.
277 : #[arg(default_value = DEFAULT_PG_VERSION_NUM)]
278 : #[clap(long)]
279 : pg_version: PgMajorVersion,
280 : }
281 :
282 : /// Import a timeline from a basebackup directory.
283 : #[derive(clap::Args)]
284 : struct TimelineImportCmdArgs {
285 : /// Tenant ID, as a 32-byte hexadecimal string.
286 : #[clap(long = "tenant-id")]
287 : tenant_id: Option<TenantId>,
288 : /// New timeline's ID, as a 32-byte hexadecimal string.
289 : #[clap(long)]
290 : timeline_id: TimelineId,
291 : /// Human-readable alias for the new timeline.
292 : #[clap(long)]
293 : branch_name: String,
294 : /// Basebackup tarfile to import.
295 : #[clap(long)]
296 : base_tarfile: PathBuf,
297 : /// LSN the basebackup starts at.
298 : #[clap(long)]
299 : base_lsn: Lsn,
300 : /// WAL to add after base.
301 : #[clap(long)]
302 : wal_tarfile: Option<PathBuf>,
303 : /// LSN the basebackup ends at.
304 : #[clap(long)]
305 : end_lsn: Option<Lsn>,
306 :
307 : /// Postgres version of the basebackup being imported.
308 : #[arg(default_value = DEFAULT_PG_VERSION_NUM)]
309 : #[clap(long)]
310 : pg_version: PgMajorVersion,
311 : }
312 :
313 : /// Manage pageservers.
314 : #[derive(clap::Subcommand)]
315 : enum PageserverCmd {
316 : Status(PageserverStatusCmdArgs),
317 : Start(PageserverStartCmdArgs),
318 : Stop(PageserverStopCmdArgs),
319 : Restart(PageserverRestartCmdArgs),
320 : }
321 :
322 : /// Show status of a local pageserver.
323 : #[derive(clap::Args)]
324 : struct PageserverStatusCmdArgs {
325 : /// Pageserver ID.
326 : #[clap(long = "id")]
327 : pageserver_id: Option<NodeId>,
328 : }
329 :
330 : /// Start local pageserver.
331 : #[derive(clap::Args)]
332 : struct PageserverStartCmdArgs {
333 : /// Pageserver ID.
334 : #[clap(long = "id")]
335 : pageserver_id: Option<NodeId>,
336 : /// Timeout until we fail the command.
337 : #[clap(short = 't', long)]
338 : #[arg(default_value = "10s")]
339 : start_timeout: humantime::Duration,
340 : }
341 :
342 : /// Stop local pageserver.
343 : #[derive(clap::Args)]
344 : struct PageserverStopCmdArgs {
345 : /// Pageserver ID.
346 : #[clap(long = "id")]
347 : pageserver_id: Option<NodeId>,
348 : /// If 'immediate', don't flush repository data at shutdown
349 : #[clap(short = 'm')]
350 : #[arg(value_enum, default_value = "fast")]
351 : stop_mode: StopMode,
352 : }
353 :
354 : /// Restart local pageserver.
355 : #[derive(clap::Args)]
356 : struct PageserverRestartCmdArgs {
357 : /// Pageserver ID.
358 : #[clap(long = "id")]
359 : pageserver_id: Option<NodeId>,
360 : /// Timeout until we fail the command.
361 : #[clap(short = 't', long)]
362 : #[arg(default_value = "10s")]
363 : start_timeout: humantime::Duration,
364 : }
365 :
366 : /// Manage storage controller.
367 : #[derive(clap::Subcommand)]
368 : enum StorageControllerCmd {
369 : Start(StorageControllerStartCmdArgs),
370 : Stop(StorageControllerStopCmdArgs),
371 : }
372 :
373 : /// Start storage controller.
374 : #[derive(clap::Args)]
375 : struct StorageControllerStartCmdArgs {
376 : /// Timeout until we fail the command.
377 : #[clap(short = 't', long)]
378 : #[arg(default_value = "10s")]
379 : start_timeout: humantime::Duration,
380 : /// Identifier used to distinguish storage controller instances.
381 : #[clap(long)]
382 : #[arg(default_value_t = 1)]
383 : instance_id: u8,
384 : /// Base port for the storage controller instance identified by instance-id (defaults to
385 : /// pageserver cplane api).
386 : #[clap(long)]
387 : base_port: Option<u16>,
388 :
389 : /// Whether the storage controller should handle pageserver-reported local disk loss events.
390 : #[clap(long)]
391 : handle_ps_local_disk_loss: Option<bool>,
392 : }
393 :
394 : /// Stop storage controller.
395 : #[derive(clap::Args)]
396 : struct StorageControllerStopCmdArgs {
397 : /// If 'immediate', don't flush repository data at shutdown
398 : #[clap(short = 'm')]
399 : #[arg(value_enum, default_value = "fast")]
400 : stop_mode: StopMode,
401 : /// Identifier used to distinguish storage controller instances.
402 : #[clap(long)]
403 : #[arg(default_value_t = 1)]
404 : instance_id: u8,
405 : }
406 :
407 : /// Manage storage broker.
408 : #[derive(clap::Subcommand)]
409 : enum StorageBrokerCmd {
410 : Start(StorageBrokerStartCmdArgs),
411 : Stop(StorageBrokerStopCmdArgs),
412 : }
413 :
414 : /// Start broker.
415 : #[derive(clap::Args)]
416 : struct StorageBrokerStartCmdArgs {
417 : /// Timeout until we fail the command.
418 : #[clap(short = 't', long, default_value = "10s")]
419 : start_timeout: humantime::Duration,
420 : }
421 :
422 : /// Stop broker.
423 : #[derive(clap::Args)]
424 : struct StorageBrokerStopCmdArgs {
425 : /// If 'immediate', don't flush repository data on shutdown.
426 : #[clap(short = 'm')]
427 : #[arg(value_enum, default_value = "fast")]
428 : stop_mode: StopMode,
429 : }
430 :
431 : /// Manage safekeepers.
432 : #[derive(clap::Subcommand)]
433 : enum SafekeeperCmd {
434 : Start(SafekeeperStartCmdArgs),
435 : Stop(SafekeeperStopCmdArgs),
436 : Restart(SafekeeperRestartCmdArgs),
437 : }
438 :
439 : /// Manage object storage.
440 : #[derive(clap::Subcommand)]
441 : enum EndpointStorageCmd {
442 : Start(EndpointStorageStartCmd),
443 : Stop(EndpointStorageStopCmd),
444 : }
445 :
446 : /// Start object storage.
447 : #[derive(clap::Args)]
448 : struct EndpointStorageStartCmd {
449 : /// Timeout until we fail the command.
450 : #[clap(short = 't', long)]
451 : #[arg(default_value = "10s")]
452 : start_timeout: humantime::Duration,
453 : }
454 :
455 : /// Stop object storage.
456 : #[derive(clap::Args)]
457 : struct EndpointStorageStopCmd {
458 : /// If 'immediate', don't flush repository data on shutdown.
459 : #[clap(short = 'm')]
460 : #[arg(value_enum, default_value = "fast")]
461 : stop_mode: StopMode,
462 : }
463 :
464 : /// Start local safekeeper.
465 : #[derive(clap::Args)]
466 : struct SafekeeperStartCmdArgs {
467 : /// Safekeeper ID.
468 : #[arg(default_value_t = NodeId(1))]
469 : id: NodeId,
470 :
471 : /// Additional safekeeper invocation options, e.g. -e=--http-auth-public-key-path=foo.
472 : #[clap(short = 'e', long = "safekeeper-extra-opt")]
473 : extra_opt: Vec<String>,
474 :
475 : /// Timeout until we fail the command.
476 : #[clap(short = 't', long)]
477 : #[arg(default_value = "10s")]
478 : start_timeout: humantime::Duration,
479 : }
480 :
481 : /// Stop local safekeeper.
482 : #[derive(clap::Args)]
483 : struct SafekeeperStopCmdArgs {
484 : /// Safekeeper ID.
485 : #[arg(default_value_t = NodeId(1))]
486 : id: NodeId,
487 :
488 : /// If 'immediate', don't flush repository data on shutdown.
489 : #[arg(value_enum, default_value = "fast")]
490 : #[clap(short = 'm')]
491 : stop_mode: StopMode,
492 : }
493 :
494 : /// Restart local safekeeper.
495 : #[derive(clap::Args)]
496 : struct SafekeeperRestartCmdArgs {
497 : /// Safekeeper ID.
498 : #[arg(default_value_t = NodeId(1))]
499 : id: NodeId,
500 :
501 : /// If 'immediate', don't flush repository data on shutdown.
502 : #[arg(value_enum, default_value = "fast")]
503 : #[clap(short = 'm')]
504 : stop_mode: StopMode,
505 :
506 : /// Additional safekeeper invocation options, e.g. -e=--http-auth-public-key-path=foo.
507 : #[clap(short = 'e', long = "safekeeper-extra-opt")]
508 : extra_opt: Vec<String>,
509 :
510 : /// Timeout until we fail the command.
511 : #[clap(short = 't', long)]
512 : #[arg(default_value = "10s")]
513 : start_timeout: humantime::Duration,
514 : }
515 :
516 : /// Manage Postgres instances.
517 : #[derive(clap::Subcommand)]
518 : enum EndpointCmd {
519 : List(EndpointListCmdArgs),
520 : Create(EndpointCreateCmdArgs),
521 : Start(EndpointStartCmdArgs),
522 : Reconfigure(EndpointReconfigureCmdArgs),
523 : RefreshConfiguration(EndpointRefreshConfigurationArgs),
524 : Stop(EndpointStopCmdArgs),
525 : UpdatePageservers(EndpointUpdatePageserversCmdArgs),
526 : GenerateJwt(EndpointGenerateJwtCmdArgs),
527 : }
528 :
529 : /// List endpoints.
530 : #[derive(clap::Args)]
531 : struct EndpointListCmdArgs {
532 : /// Tenant ID, as a 32-byte hexadecimal string.
533 : #[clap(long = "tenant-id")]
534 : tenant_shard_id: Option<TenantShardId>,
535 : }
536 :
537 : /// Create a compute endpoint.
538 : #[derive(clap::Args)]
539 : struct EndpointCreateCmdArgs {
540 : /// Tenant ID, as a 32-byte hexadecimal string.
541 : #[clap(long = "tenant-id")]
542 : tenant_id: Option<TenantId>,
543 : /// Postgres endpoint ID.
544 : endpoint_id: Option<String>,
545 : /// Name of the branch the endpoint will run on.
546 : #[clap(long)]
547 : branch_name: Option<String>,
548 : /// Specify LSN on the timeline to start from. By default, end of the timeline would be used.
549 : #[clap(long)]
550 : lsn: Option<Lsn>,
551 : #[clap(long)]
552 : pg_port: Option<u16>,
553 : #[clap(long, alias = "http-port")]
554 : external_http_port: Option<u16>,
555 : #[clap(long)]
556 : internal_http_port: Option<u16>,
557 : #[clap(long = "pageserver-id")]
558 : endpoint_pageserver_id: Option<NodeId>,
559 :
560 : /// Don't do basebackup, create endpoint directory with only config files.
561 : #[clap(long, action = clap::ArgAction::Set, default_value_t = false)]
562 : config_only: bool,
563 :
564 : /// Postgres version.
565 : #[arg(default_value = DEFAULT_PG_VERSION_NUM)]
566 : #[clap(long)]
567 : pg_version: PgMajorVersion,
568 :
569 : /// Use gRPC to communicate with Pageservers, by generating grpc:// connstrings.
570 : ///
571 : /// Specified on creation such that it's retained across reconfiguration and restarts.
572 : ///
573 : /// NB: not yet supported by computes.
574 : #[clap(long)]
575 : grpc: bool,
576 :
577 : /// If set, the node will be a hot replica on the specified timeline.
578 : #[clap(long, action = clap::ArgAction::Set, default_value_t = false)]
579 : hot_standby: bool,
580 : /// If set, will set up the catalog for neon_superuser.
581 : #[clap(long)]
582 : update_catalog: bool,
583 : /// Allow multiple primary endpoints running on the same branch. Shouldn't be used normally, but
584 : /// useful for tests.
585 : #[clap(long)]
586 : allow_multiple: bool,
587 :
588 : /// Name of the privileged role for the endpoint.
589 : // Only allow changing it on creation.
590 : #[clap(long)]
591 : privileged_role_name: Option<String>,
592 : }
593 :
594 : /// Start Postgres. If the endpoint doesn't exist yet, it is created.
595 : #[derive(clap::Args)]
596 : struct EndpointStartCmdArgs {
597 : /// Postgres endpoint ID.
598 : endpoint_id: String,
599 : /// Pageserver ID.
600 : #[clap(long = "pageserver-id")]
601 : endpoint_pageserver_id: Option<NodeId>,
602 : /// Safekeepers membership generation to prefix neon.safekeepers with.
603 : #[clap(long)]
604 : safekeepers_generation: Option<u32>,
605 : /// List of safekeepers endpoint will talk to.
606 : #[clap(long)]
607 : safekeepers: Option<String>,
608 : /// Configure the remote extensions storage proxy gateway URL to request for extensions.
609 : #[clap(long, alias = "remote-ext-config")]
610 : remote_ext_base_url: Option<String>,
611 : /// If set, will create test user `user` and `neondb` database. Requires `update-catalog = true`
612 : #[clap(long)]
613 : create_test_user: bool,
614 : /// Allow multiple primary endpoints running on the same branch. Shouldn't be used normally, but
615 : /// useful for tests.
616 : #[clap(long)]
617 : allow_multiple: bool,
618 : /// Timeout until we fail the command.
619 : #[clap(short = 't', long, value_parser= humantime::parse_duration)]
620 : #[arg(default_value = "90s")]
621 : start_timeout: Duration,
622 :
623 : /// Download LFC cache from endpoint storage on endpoint startup
624 : #[clap(long, default_value = "false")]
625 : autoprewarm: bool,
626 :
627 : /// Upload LFC cache to endpoint storage periodically
628 : #[clap(long)]
629 : offload_lfc_interval_seconds: Option<std::num::NonZeroU64>,
630 :
631 : /// Run in development mode, skipping VM-specific operations like process termination
632 : #[clap(long, action = clap::ArgAction::SetTrue)]
633 : dev: bool,
634 : }
635 :
636 : /// Reconfigure an endpoint.
637 : #[derive(clap::Args)]
638 : struct EndpointReconfigureCmdArgs {
639 : /// Tenant id. Represented as a hexadecimal string 32 symbols length
640 : #[clap(long = "tenant-id")]
641 : tenant_id: Option<TenantId>,
642 : /// Postgres endpoint ID.
643 : endpoint_id: String,
644 : /// Pageserver ID.
645 : #[clap(long = "pageserver-id")]
646 : endpoint_pageserver_id: Option<NodeId>,
647 : #[clap(long)]
648 : safekeepers: Option<String>,
649 : }
650 :
651 : /// Refresh the endpoint's configuration by forcing it reload it's spec
652 : #[derive(clap::Args)]
653 : struct EndpointRefreshConfigurationArgs {
654 : /// Postgres endpoint id
655 : endpoint_id: String,
656 : }
657 :
658 : /// Stop an endpoint.
659 : #[derive(clap::Args)]
660 : struct EndpointStopCmdArgs {
661 : /// Postgres endpoint ID.
662 : endpoint_id: String,
663 : /// Also delete data directory (now optional, should be default in future).
664 : #[clap(long)]
665 : destroy: bool,
666 :
667 : /// Postgres shutdown mode, passed to `pg_ctl -m <mode>`.
668 : #[clap(long)]
669 : #[clap(default_value = "fast")]
670 : mode: EndpointTerminateMode,
671 : }
672 :
673 : /// Update the pageservers in the spec file of the compute endpoint
674 : #[derive(clap::Args)]
675 : struct EndpointUpdatePageserversCmdArgs {
676 : /// Postgres endpoint id
677 : endpoint_id: String,
678 :
679 : /// Specified pageserver id
680 : #[clap(short = 'p', long)]
681 : pageserver_id: Option<NodeId>,
682 : }
683 :
684 : /// Generate a JWT for an endpoint.
685 : #[derive(clap::Args)]
686 : struct EndpointGenerateJwtCmdArgs {
687 : /// Postgres endpoint ID.
688 : endpoint_id: String,
689 : /// Scope to generate the JWT with.
690 : #[clap(short = 's', long, value_parser = ComputeClaimsScope::from_str)]
691 : scope: Option<ComputeClaimsScope>,
692 : }
693 :
694 : /// Manage neon_local branch name mappings.
695 : #[derive(clap::Subcommand)]
696 : enum MappingsCmd {
697 : Map(MappingsMapCmdArgs),
698 : }
699 :
700 : /// Create new mapping which cannot exist already.
701 : #[derive(clap::Args)]
702 : struct MappingsMapCmdArgs {
703 : /// Tenant ID, as a 32-byte hexadecimal string.
704 : #[clap(long)]
705 : tenant_id: TenantId,
706 : /// Timeline ID, as a 32-byte hexadecimal string.
707 : #[clap(long)]
708 : timeline_id: TimelineId,
709 : /// Branch name to give to the timeline.
710 : #[clap(long)]
711 : branch_name: String,
712 : }
713 :
714 : ///
715 : /// Timelines tree element used as a value in the HashMap.
716 : ///
717 : struct TimelineTreeEl {
718 : /// `TimelineInfo` received from the `pageserver` via the `timeline_list` http API call.
719 : pub info: TimelineInfo,
720 : /// Name, recovered from neon config mappings
721 : pub name: Option<String>,
722 : /// Holds all direct children of this timeline referenced using `timeline_id`.
723 : pub children: BTreeSet<TimelineId>,
724 : }
725 :
726 : /// A flock-based guard over the neon_local repository directory
727 : struct RepoLock {
728 : _file: Flock<File>,
729 : }
730 :
731 : impl RepoLock {
732 0 : fn new() -> Result<Self> {
733 0 : let repo_dir = File::open(local_env::base_path())?;
734 0 : match Flock::lock(repo_dir, FlockArg::LockExclusive) {
735 0 : Ok(f) => Ok(Self { _file: f }),
736 0 : Err((_, e)) => Err(e).context("flock error"),
737 : }
738 0 : }
739 : }
740 :
741 : // Main entry point for the 'neon_local' CLI utility
742 : //
743 : // This utility helps to manage neon installation. That includes following:
744 : // * Management of local postgres installations running on top of the
745 : // pageserver.
746 : // * Providing CLI api to the pageserver
747 : // * TODO: export/import to/from usual postgres
748 0 : fn main() -> Result<()> {
749 0 : let cli = Cli::parse();
750 :
751 : // Check for 'neon init' command first.
752 0 : let (subcommand_result, _lock) = if let NeonLocalCmd::Init(args) = cli.command {
753 0 : (handle_init(&args).map(|env| Some(Cow::Owned(env))), None)
754 : } else {
755 : // This tool uses a collection of simple files to store its state, and consequently
756 : // it is not generally safe to run multiple commands concurrently. Rather than expect
757 : // all callers to know this, use a lock file to protect against concurrent execution.
758 0 : let _repo_lock = RepoLock::new().unwrap();
759 :
760 : // all other commands need an existing config
761 0 : let env = LocalEnv::load_config(&local_env::base_path()).context("Error loading config")?;
762 0 : let original_env = env.clone();
763 0 : let env = Box::leak(Box::new(env));
764 0 : let rt = tokio::runtime::Builder::new_current_thread()
765 0 : .enable_all()
766 0 : .build()
767 0 : .unwrap();
768 :
769 0 : let subcommand_result = match cli.command {
770 0 : NeonLocalCmd::Init(_) => unreachable!("init was handled earlier already"),
771 0 : NeonLocalCmd::Start(args) => rt.block_on(handle_start_all(&args, env)),
772 0 : NeonLocalCmd::Stop(args) => rt.block_on(handle_stop_all(&args, env)),
773 0 : NeonLocalCmd::Tenant(subcmd) => rt.block_on(handle_tenant(&subcmd, env)),
774 0 : NeonLocalCmd::Timeline(subcmd) => rt.block_on(handle_timeline(&subcmd, env)),
775 0 : NeonLocalCmd::Pageserver(subcmd) => rt.block_on(handle_pageserver(&subcmd, env)),
776 0 : NeonLocalCmd::StorageController(subcmd) => {
777 0 : rt.block_on(handle_storage_controller(&subcmd, env))
778 : }
779 0 : NeonLocalCmd::StorageBroker(subcmd) => rt.block_on(handle_storage_broker(&subcmd, env)),
780 0 : NeonLocalCmd::Safekeeper(subcmd) => rt.block_on(handle_safekeeper(&subcmd, env)),
781 0 : NeonLocalCmd::EndpointStorage(subcmd) => {
782 0 : rt.block_on(handle_endpoint_storage(&subcmd, env))
783 : }
784 0 : NeonLocalCmd::Endpoint(subcmd) => rt.block_on(handle_endpoint(&subcmd, env)),
785 0 : NeonLocalCmd::Mappings(subcmd) => handle_mappings(&subcmd, env),
786 : };
787 :
788 0 : let subcommand_result = if &original_env != env {
789 0 : subcommand_result.map(|()| Some(Cow::Borrowed(env)))
790 : } else {
791 0 : subcommand_result.map(|()| None)
792 : };
793 0 : (subcommand_result, Some(_repo_lock))
794 : };
795 :
796 0 : match subcommand_result {
797 0 : Ok(Some(updated_env)) => updated_env.persist_config()?,
798 0 : Ok(None) => (),
799 0 : Err(e) => {
800 0 : eprintln!("command failed: {e:?}");
801 0 : exit(1);
802 : }
803 : }
804 0 : Ok(())
805 0 : }
806 :
807 : ///
808 : /// Prints timelines list as a tree-like structure.
809 : ///
810 0 : fn print_timelines_tree(
811 0 : timelines: Vec<TimelineInfo>,
812 0 : mut timeline_name_mappings: HashMap<TenantTimelineId, String>,
813 0 : ) -> Result<()> {
814 0 : let mut timelines_hash = timelines
815 0 : .iter()
816 0 : .map(|t| {
817 0 : (
818 0 : t.timeline_id,
819 0 : TimelineTreeEl {
820 0 : info: t.clone(),
821 0 : children: BTreeSet::new(),
822 0 : name: timeline_name_mappings
823 0 : .remove(&TenantTimelineId::new(t.tenant_id.tenant_id, t.timeline_id)),
824 0 : },
825 0 : )
826 0 : })
827 0 : .collect::<HashMap<_, _>>();
828 :
829 : // Memorize all direct children of each timeline.
830 0 : for timeline in timelines.iter() {
831 0 : if let Some(ancestor_timeline_id) = timeline.ancestor_timeline_id {
832 0 : timelines_hash
833 0 : .get_mut(&ancestor_timeline_id)
834 0 : .context("missing timeline info in the HashMap")?
835 : .children
836 0 : .insert(timeline.timeline_id);
837 0 : }
838 : }
839 :
840 0 : for timeline in timelines_hash.values() {
841 : // Start with root local timelines (no ancestors) first.
842 0 : if timeline.info.ancestor_timeline_id.is_none() {
843 0 : print_timeline(0, &Vec::from([true]), timeline, &timelines_hash)?;
844 0 : }
845 : }
846 :
847 0 : Ok(())
848 0 : }
849 :
850 : ///
851 : /// Recursively prints timeline info with all its children.
852 : ///
853 0 : fn print_timeline(
854 0 : nesting_level: usize,
855 0 : is_last: &[bool],
856 0 : timeline: &TimelineTreeEl,
857 0 : timelines: &HashMap<TimelineId, TimelineTreeEl>,
858 0 : ) -> Result<()> {
859 0 : if nesting_level > 0 {
860 0 : let ancestor_lsn = match timeline.info.ancestor_lsn {
861 0 : Some(lsn) => lsn.to_string(),
862 0 : None => "Unknown Lsn".to_string(),
863 : };
864 :
865 0 : let mut br_sym = "┣━";
866 :
867 : // Draw each nesting padding with proper style
868 : // depending on whether its timeline ended or not.
869 0 : if nesting_level > 1 {
870 0 : for l in &is_last[1..is_last.len() - 1] {
871 0 : if *l {
872 0 : print!(" ");
873 0 : } else {
874 0 : print!("┃ ");
875 0 : }
876 : }
877 0 : }
878 :
879 : // We are the last in this sub-timeline
880 0 : if *is_last.last().unwrap() {
881 0 : br_sym = "┗━";
882 0 : }
883 :
884 0 : print!("{br_sym} @{ancestor_lsn}: ");
885 0 : }
886 :
887 : // Finally print a timeline id and name with new line
888 0 : println!(
889 0 : "{} [{}]",
890 0 : timeline.name.as_deref().unwrap_or("_no_name_"),
891 : timeline.info.timeline_id
892 : );
893 :
894 0 : let len = timeline.children.len();
895 0 : let mut i: usize = 0;
896 0 : let mut is_last_new = Vec::from(is_last);
897 0 : is_last_new.push(false);
898 :
899 0 : for child in &timeline.children {
900 0 : i += 1;
901 :
902 : // Mark that the last padding is the end of the timeline
903 0 : if i == len {
904 0 : if let Some(last) = is_last_new.last_mut() {
905 0 : *last = true;
906 0 : }
907 0 : }
908 :
909 0 : print_timeline(
910 0 : nesting_level + 1,
911 0 : &is_last_new,
912 0 : timelines
913 0 : .get(child)
914 0 : .context("missing timeline info in the HashMap")?,
915 0 : timelines,
916 0 : )?;
917 : }
918 :
919 0 : Ok(())
920 0 : }
921 :
922 : /// Helper function to get tenant id from an optional --tenant_id option or from the config file
923 0 : fn get_tenant_id(
924 0 : tenant_id_arg: Option<TenantId>,
925 0 : env: &local_env::LocalEnv,
926 0 : ) -> anyhow::Result<TenantId> {
927 0 : if let Some(tenant_id_from_arguments) = tenant_id_arg {
928 0 : Ok(tenant_id_from_arguments)
929 0 : } else if let Some(default_id) = env.default_tenant_id {
930 0 : Ok(default_id)
931 : } else {
932 0 : anyhow::bail!("No tenant id. Use --tenant-id, or set a default tenant");
933 : }
934 0 : }
935 :
936 : /// Helper function to get tenant-shard ID from an optional --tenant_id option or from the config file,
937 : /// for commands that accept a shard suffix
938 0 : fn get_tenant_shard_id(
939 0 : tenant_shard_id_arg: Option<TenantShardId>,
940 0 : env: &local_env::LocalEnv,
941 0 : ) -> anyhow::Result<TenantShardId> {
942 0 : if let Some(tenant_id_from_arguments) = tenant_shard_id_arg {
943 0 : Ok(tenant_id_from_arguments)
944 0 : } else if let Some(default_id) = env.default_tenant_id {
945 0 : Ok(TenantShardId::unsharded(default_id))
946 : } else {
947 0 : anyhow::bail!("No tenant shard id. Use --tenant-id, or set a default tenant");
948 : }
949 0 : }
950 :
951 0 : fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
952 : // Create the in-memory `LocalEnv` that we'd normally load from disk in `load_config`.
953 0 : let init_conf: NeonLocalInitConf = if let Some(config_path) = &args.config {
954 : // User (likely the Python test suite) provided a description of the environment.
955 0 : if args.num_pageservers.is_some() {
956 0 : bail!(
957 0 : "Cannot specify both --num-pageservers and --config, use key `pageservers` in the --config file instead"
958 : );
959 0 : }
960 : // load and parse the file
961 0 : let contents = std::fs::read_to_string(config_path).with_context(|| {
962 0 : format!(
963 0 : "Could not read configuration file '{}'",
964 0 : config_path.display()
965 : )
966 0 : })?;
967 0 : toml_edit::de::from_str(&contents)?
968 : } else {
969 : // User (likely interactive) did not provide a description of the environment, give them the default
970 : NeonLocalInitConf {
971 0 : control_plane_api: Some(DEFAULT_PAGESERVER_CONTROL_PLANE_API.parse().unwrap()),
972 0 : broker: NeonBroker {
973 0 : listen_addr: Some(DEFAULT_BROKER_ADDR.parse().unwrap()),
974 0 : listen_https_addr: None,
975 0 : },
976 0 : safekeepers: vec![SafekeeperConf {
977 0 : id: DEFAULT_SAFEKEEPER_ID,
978 0 : pg_port: DEFAULT_SAFEKEEPER_PG_PORT,
979 0 : http_port: DEFAULT_SAFEKEEPER_HTTP_PORT,
980 0 : ..Default::default()
981 0 : }],
982 0 : pageservers: (0..args.num_pageservers.unwrap_or(1))
983 0 : .map(|i| {
984 0 : let pageserver_id = NodeId(DEFAULT_PAGESERVER_ID.0 + i as u64);
985 0 : let pg_port = DEFAULT_PAGESERVER_PG_PORT + i;
986 0 : let http_port = DEFAULT_PAGESERVER_HTTP_PORT + i;
987 0 : let grpc_port = DEFAULT_PAGESERVER_GRPC_PORT + i;
988 0 : NeonLocalInitPageserverConf {
989 0 : id: pageserver_id,
990 0 : listen_pg_addr: format!("127.0.0.1:{pg_port}"),
991 0 : listen_http_addr: format!("127.0.0.1:{http_port}"),
992 0 : listen_https_addr: None,
993 0 : listen_grpc_addr: Some(format!("127.0.0.1:{grpc_port}")),
994 0 : pg_auth_type: AuthType::Trust,
995 0 : http_auth_type: AuthType::Trust,
996 0 : grpc_auth_type: AuthType::Trust,
997 0 : other: Default::default(),
998 0 : // Typical developer machines use disks with slow fsync, and we don't care
999 0 : // about data integrity: disable disk syncs.
1000 0 : no_sync: true,
1001 0 : }
1002 0 : })
1003 0 : .collect(),
1004 0 : endpoint_storage: EndpointStorageConf {
1005 0 : listen_addr: ENDPOINT_STORAGE_DEFAULT_ADDR,
1006 0 : },
1007 0 : pg_distrib_dir: None,
1008 0 : neon_distrib_dir: None,
1009 0 : default_tenant_id: TenantId::from_array(std::array::from_fn(|_| 0)),
1010 0 : storage_controller: None,
1011 0 : control_plane_hooks_api: None,
1012 : generate_local_ssl_certs: false,
1013 : }
1014 : };
1015 :
1016 0 : LocalEnv::init(init_conf, &args.force)
1017 0 : .context("materialize initial neon_local environment on disk")?;
1018 0 : Ok(LocalEnv::load_config(&local_env::base_path())
1019 0 : .expect("freshly written config should be loadable"))
1020 0 : }
1021 :
1022 : /// The default pageserver is the one where CLI tenant/timeline operations are sent by default.
1023 : /// For typical interactive use, one would just run with a single pageserver. Scenarios with
1024 : /// tenant/timeline placement across multiple pageservers are managed by python test code rather
1025 : /// than this CLI.
1026 0 : fn get_default_pageserver(env: &local_env::LocalEnv) -> PageServerNode {
1027 0 : let ps_conf = env
1028 0 : .pageservers
1029 0 : .first()
1030 0 : .expect("Config is validated to contain at least one pageserver");
1031 0 : PageServerNode::from_env(env, ps_conf)
1032 0 : }
1033 :
1034 0 : async fn handle_tenant(subcmd: &TenantCmd, env: &mut local_env::LocalEnv) -> anyhow::Result<()> {
1035 0 : let pageserver = get_default_pageserver(env);
1036 0 : match subcmd {
1037 : TenantCmd::List => {
1038 0 : for t in pageserver.tenant_list().await? {
1039 0 : println!("{} {:?}", t.id, t.state);
1040 0 : }
1041 : }
1042 0 : TenantCmd::Import(args) => {
1043 0 : let tenant_id = args.tenant_id;
1044 :
1045 0 : let storage_controller = StorageController::from_env(env);
1046 0 : let create_response = storage_controller.tenant_import(tenant_id).await?;
1047 :
1048 0 : let shard_zero = create_response
1049 0 : .shards
1050 0 : .first()
1051 0 : .expect("Import response omitted shards");
1052 :
1053 0 : let attached_pageserver_id = shard_zero.node_id;
1054 0 : let pageserver =
1055 0 : PageServerNode::from_env(env, env.get_pageserver_conf(attached_pageserver_id)?);
1056 :
1057 0 : println!(
1058 0 : "Imported tenant {tenant_id}, attached to pageserver {attached_pageserver_id}"
1059 : );
1060 :
1061 0 : let timelines = pageserver
1062 0 : .http_client
1063 0 : .list_timelines(shard_zero.shard_id)
1064 0 : .await?;
1065 :
1066 : // Pick a 'main' timeline that has no ancestors, the rest will get arbitrary names
1067 0 : let main_timeline = timelines
1068 0 : .iter()
1069 0 : .find(|t| t.ancestor_timeline_id.is_none())
1070 0 : .expect("No timelines found")
1071 : .timeline_id;
1072 :
1073 0 : let mut branch_i = 0;
1074 0 : for timeline in timelines.iter() {
1075 0 : let branch_name = if timeline.timeline_id == main_timeline {
1076 0 : "main".to_string()
1077 : } else {
1078 0 : branch_i += 1;
1079 0 : format!("branch_{branch_i}")
1080 : };
1081 :
1082 0 : println!(
1083 0 : "Importing timeline {tenant_id}/{} as branch {branch_name}",
1084 : timeline.timeline_id
1085 : );
1086 :
1087 0 : env.register_branch_mapping(branch_name, tenant_id, timeline.timeline_id)?;
1088 : }
1089 : }
1090 0 : TenantCmd::Create(args) => {
1091 0 : let tenant_conf: HashMap<_, _> =
1092 0 : args.config.iter().flat_map(|c| c.split_once(':')).collect();
1093 :
1094 0 : let tenant_conf = PageServerNode::parse_config(tenant_conf)?;
1095 :
1096 : // If tenant ID was not specified, generate one
1097 0 : let tenant_id = args.tenant_id.unwrap_or_else(TenantId::generate);
1098 :
1099 : // We must register the tenant with the storage controller, so
1100 : // that when the pageserver restarts, it will be re-attached.
1101 0 : let storage_controller = StorageController::from_env(env);
1102 0 : storage_controller
1103 0 : .tenant_create(TenantCreateRequest {
1104 0 : // Note that ::unsharded here isn't actually because the tenant is unsharded, its because the
1105 0 : // storage controller expects a shard-naive tenant_id in this attribute, and the TenantCreateRequest
1106 0 : // type is used both in the storage controller (for creating tenants) and in the pageserver (for
1107 0 : // creating shards)
1108 0 : new_tenant_id: TenantShardId::unsharded(tenant_id),
1109 0 : generation: None,
1110 0 : shard_parameters: ShardParameters {
1111 0 : count: ShardCount::new(args.shard_count),
1112 0 : stripe_size: args
1113 0 : .shard_stripe_size
1114 0 : .map(ShardStripeSize)
1115 0 : .unwrap_or(DEFAULT_STRIPE_SIZE),
1116 0 : },
1117 0 : placement_policy: args.placement_policy.clone(),
1118 0 : config: tenant_conf,
1119 0 : })
1120 0 : .await?;
1121 0 : println!("tenant {tenant_id} successfully created on the pageserver");
1122 :
1123 : // Create an initial timeline for the new tenant
1124 0 : let new_timeline_id = args.timeline_id.unwrap_or(TimelineId::generate());
1125 :
1126 : // FIXME: passing None for ancestor_start_lsn is not kosher in a sharded world: we can't have
1127 : // different shards picking different start lsns. Maybe we have to teach storage controller
1128 : // to let shard 0 branch first and then propagate the chosen LSN to other shards.
1129 0 : storage_controller
1130 0 : .tenant_timeline_create(
1131 0 : tenant_id,
1132 0 : TimelineCreateRequest {
1133 0 : new_timeline_id,
1134 0 : mode: pageserver_api::models::TimelineCreateRequestMode::Bootstrap {
1135 0 : existing_initdb_timeline_id: None,
1136 0 : pg_version: Some(args.pg_version),
1137 0 : },
1138 0 : },
1139 0 : )
1140 0 : .await?;
1141 :
1142 0 : env.register_branch_mapping(
1143 0 : DEFAULT_BRANCH_NAME.to_string(),
1144 0 : tenant_id,
1145 0 : new_timeline_id,
1146 0 : )?;
1147 :
1148 0 : println!("Created an initial timeline '{new_timeline_id}' for tenant: {tenant_id}",);
1149 :
1150 0 : if args.set_default {
1151 0 : println!("Setting tenant {tenant_id} as a default one");
1152 0 : env.default_tenant_id = Some(tenant_id);
1153 0 : }
1154 : }
1155 0 : TenantCmd::SetDefault(args) => {
1156 0 : println!("Setting tenant {} as a default one", args.tenant_id);
1157 0 : env.default_tenant_id = Some(args.tenant_id);
1158 0 : }
1159 0 : TenantCmd::Config(args) => {
1160 0 : let tenant_id = get_tenant_id(args.tenant_id, env)?;
1161 0 : let tenant_conf: HashMap<_, _> =
1162 0 : args.config.iter().flat_map(|c| c.split_once(':')).collect();
1163 0 : let config = PageServerNode::parse_config(tenant_conf)?;
1164 :
1165 0 : let req = TenantConfigRequest { tenant_id, config };
1166 :
1167 0 : let storage_controller = StorageController::from_env(env);
1168 0 : storage_controller
1169 0 : .set_tenant_config(&req)
1170 0 : .await
1171 0 : .with_context(|| format!("Tenant config failed for tenant with id {tenant_id}"))?;
1172 0 : println!("tenant {tenant_id} successfully configured via storcon");
1173 : }
1174 : }
1175 0 : Ok(())
1176 0 : }
1177 :
1178 0 : async fn handle_timeline(cmd: &TimelineCmd, env: &mut local_env::LocalEnv) -> Result<()> {
1179 0 : let pageserver = get_default_pageserver(env);
1180 :
1181 0 : match cmd {
1182 0 : TimelineCmd::List(args) => {
1183 : // TODO(sharding): this command shouldn't have to specify a shard ID: we should ask the storage controller
1184 : // where shard 0 is attached, and query there.
1185 0 : let tenant_shard_id = get_tenant_shard_id(args.tenant_shard_id, env)?;
1186 0 : let timelines = pageserver.timeline_list(&tenant_shard_id).await?;
1187 0 : print_timelines_tree(timelines, env.timeline_name_mappings())?;
1188 : }
1189 0 : TimelineCmd::Create(args) => {
1190 0 : let tenant_id = get_tenant_id(args.tenant_id, env)?;
1191 0 : let new_branch_name = &args.branch_name;
1192 0 : let new_timeline_id_opt = args.timeline_id;
1193 0 : let new_timeline_id = new_timeline_id_opt.unwrap_or(TimelineId::generate());
1194 :
1195 0 : let storage_controller = StorageController::from_env(env);
1196 0 : let create_req = TimelineCreateRequest {
1197 0 : new_timeline_id,
1198 0 : mode: pageserver_api::models::TimelineCreateRequestMode::Bootstrap {
1199 0 : existing_initdb_timeline_id: None,
1200 0 : pg_version: Some(args.pg_version),
1201 0 : },
1202 0 : };
1203 0 : let timeline_info = storage_controller
1204 0 : .tenant_timeline_create(tenant_id, create_req)
1205 0 : .await?;
1206 :
1207 0 : let last_record_lsn = timeline_info.last_record_lsn;
1208 0 : env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?;
1209 :
1210 0 : println!(
1211 0 : "Created timeline '{}' at Lsn {last_record_lsn} for tenant: {tenant_id}",
1212 : timeline_info.timeline_id
1213 : );
1214 : }
1215 : // TODO: rename to import-basebackup-plus-wal
1216 0 : TimelineCmd::Import(args) => {
1217 0 : let tenant_id = get_tenant_id(args.tenant_id, env)?;
1218 0 : let timeline_id = args.timeline_id;
1219 0 : let branch_name = &args.branch_name;
1220 :
1221 : // Parse base inputs
1222 0 : let base = (args.base_lsn, args.base_tarfile.clone());
1223 :
1224 : // Parse pg_wal inputs
1225 0 : let wal_tarfile = args.wal_tarfile.clone();
1226 0 : let end_lsn = args.end_lsn;
1227 : // TODO validate both or none are provided
1228 0 : let pg_wal = end_lsn.zip(wal_tarfile);
1229 :
1230 0 : println!("Importing timeline into pageserver ...");
1231 0 : pageserver
1232 0 : .timeline_import(tenant_id, timeline_id, base, pg_wal, args.pg_version)
1233 0 : .await?;
1234 0 : if env.storage_controller.timelines_onto_safekeepers {
1235 0 : println!("Creating timeline on safekeeper ...");
1236 0 : let timeline_info = pageserver
1237 0 : .timeline_info(
1238 0 : TenantShardId::unsharded(tenant_id),
1239 0 : timeline_id,
1240 0 : pageserver_client::mgmt_api::ForceAwaitLogicalSize::No,
1241 0 : )
1242 0 : .await?;
1243 0 : let default_sk = SafekeeperNode::from_env(env, env.safekeepers.first().unwrap());
1244 0 : let default_host = default_sk
1245 0 : .conf
1246 0 : .listen_addr
1247 0 : .clone()
1248 0 : .unwrap_or_else(|| "localhost".to_string());
1249 0 : let mconf = safekeeper_api::membership::Configuration {
1250 0 : generation: SafekeeperGeneration::new(1),
1251 0 : members: safekeeper_api::membership::MemberSet {
1252 0 : m: vec![SafekeeperId {
1253 0 : host: default_host,
1254 0 : id: default_sk.conf.id,
1255 0 : pg_port: default_sk.conf.pg_port,
1256 0 : }],
1257 0 : },
1258 0 : new_members: None,
1259 0 : };
1260 0 : let pg_version = PgVersionId::from(args.pg_version);
1261 0 : let req = safekeeper_api::models::TimelineCreateRequest {
1262 0 : tenant_id,
1263 0 : timeline_id,
1264 0 : mconf,
1265 0 : pg_version,
1266 0 : system_id: None,
1267 0 : wal_seg_size: None,
1268 0 : start_lsn: timeline_info.last_record_lsn,
1269 0 : commit_lsn: None,
1270 0 : };
1271 0 : default_sk.create_timeline(&req).await?;
1272 0 : }
1273 0 : env.register_branch_mapping(branch_name.to_string(), tenant_id, timeline_id)?;
1274 0 : println!("Done");
1275 : }
1276 0 : TimelineCmd::Branch(args) => {
1277 0 : let tenant_id = get_tenant_id(args.tenant_id, env)?;
1278 0 : let new_timeline_id = args.timeline_id.unwrap_or(TimelineId::generate());
1279 0 : let new_branch_name = &args.branch_name;
1280 0 : let ancestor_branch_name = args
1281 0 : .ancestor_branch_name
1282 0 : .clone()
1283 0 : .unwrap_or(DEFAULT_BRANCH_NAME.to_owned());
1284 0 : let ancestor_timeline_id = env
1285 0 : .get_branch_timeline_id(&ancestor_branch_name, tenant_id)
1286 0 : .ok_or_else(|| {
1287 0 : anyhow!("Found no timeline id for branch name '{ancestor_branch_name}'")
1288 0 : })?;
1289 :
1290 0 : let start_lsn = args.ancestor_start_lsn;
1291 0 : let storage_controller = StorageController::from_env(env);
1292 0 : let create_req = TimelineCreateRequest {
1293 0 : new_timeline_id,
1294 0 : mode: pageserver_api::models::TimelineCreateRequestMode::Branch {
1295 0 : ancestor_timeline_id,
1296 0 : ancestor_start_lsn: start_lsn,
1297 0 : read_only: false,
1298 0 : pg_version: None,
1299 0 : },
1300 0 : };
1301 0 : let timeline_info = storage_controller
1302 0 : .tenant_timeline_create(tenant_id, create_req)
1303 0 : .await?;
1304 :
1305 0 : let last_record_lsn = timeline_info.last_record_lsn;
1306 :
1307 0 : env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?;
1308 :
1309 0 : println!(
1310 0 : "Created timeline '{}' at Lsn {last_record_lsn} for tenant: {tenant_id}. Ancestor timeline: '{ancestor_branch_name}'",
1311 : timeline_info.timeline_id
1312 : );
1313 : }
1314 : }
1315 :
1316 0 : Ok(())
1317 0 : }
1318 :
1319 0 : async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Result<()> {
1320 0 : let mut cplane = ComputeControlPlane::load(env.clone())?;
1321 :
1322 0 : match subcmd {
1323 0 : EndpointCmd::List(args) => {
1324 : // TODO(sharding): this command shouldn't have to specify a shard ID: we should ask the storage controller
1325 : // where shard 0 is attached, and query there.
1326 0 : let tenant_shard_id = get_tenant_shard_id(args.tenant_shard_id, env)?;
1327 :
1328 0 : let timeline_name_mappings = env.timeline_name_mappings();
1329 :
1330 0 : let mut table = comfy_table::Table::new();
1331 :
1332 0 : table.load_preset(comfy_table::presets::NOTHING);
1333 :
1334 0 : table.set_header([
1335 0 : "ENDPOINT",
1336 0 : "ADDRESS",
1337 0 : "TIMELINE",
1338 0 : "BRANCH NAME",
1339 0 : "LSN",
1340 0 : "STATUS",
1341 0 : ]);
1342 :
1343 0 : for (endpoint_id, endpoint) in cplane
1344 0 : .endpoints
1345 0 : .iter()
1346 0 : .filter(|(_, endpoint)| endpoint.tenant_id == tenant_shard_id.tenant_id)
1347 : {
1348 0 : let lsn_str = match endpoint.mode {
1349 0 : ComputeMode::Static(lsn) => {
1350 : // -> read-only endpoint
1351 : // Use the node's LSN.
1352 0 : lsn.to_string()
1353 : }
1354 : _ => {
1355 : // As the LSN here refers to the one that the compute is started with,
1356 : // we display nothing as it is a primary/hot standby compute.
1357 0 : "---".to_string()
1358 : }
1359 : };
1360 :
1361 0 : let branch_name = timeline_name_mappings
1362 0 : .get(&TenantTimelineId::new(
1363 0 : tenant_shard_id.tenant_id,
1364 0 : endpoint.timeline_id,
1365 0 : ))
1366 0 : .map(|name| name.as_str())
1367 0 : .unwrap_or("?");
1368 :
1369 0 : table.add_row([
1370 0 : endpoint_id.as_str(),
1371 0 : &endpoint.pg_address.to_string(),
1372 0 : &endpoint.timeline_id.to_string(),
1373 0 : branch_name,
1374 0 : lsn_str.as_str(),
1375 0 : &format!("{}", endpoint.status()),
1376 0 : ]);
1377 : }
1378 :
1379 0 : println!("{table}");
1380 : }
1381 0 : EndpointCmd::Create(args) => {
1382 0 : let tenant_id = get_tenant_id(args.tenant_id, env)?;
1383 0 : let branch_name = args
1384 0 : .branch_name
1385 0 : .clone()
1386 0 : .unwrap_or(DEFAULT_BRANCH_NAME.to_owned());
1387 0 : let endpoint_id = args
1388 0 : .endpoint_id
1389 0 : .clone()
1390 0 : .unwrap_or_else(|| format!("ep-{branch_name}"));
1391 :
1392 0 : let timeline_id = env
1393 0 : .get_branch_timeline_id(&branch_name, tenant_id)
1394 0 : .ok_or_else(|| anyhow!("Found no timeline id for branch name '{branch_name}'"))?;
1395 :
1396 0 : let mode = match (args.lsn, args.hot_standby) {
1397 0 : (Some(lsn), false) => ComputeMode::Static(lsn),
1398 0 : (None, true) => ComputeMode::Replica,
1399 0 : (None, false) => ComputeMode::Primary,
1400 0 : (Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"),
1401 : };
1402 :
1403 0 : match (mode, args.hot_standby) {
1404 : (ComputeMode::Static(_), true) => {
1405 0 : bail!(
1406 0 : "Cannot start a node in hot standby mode when it is already configured as a static replica"
1407 : )
1408 : }
1409 : (ComputeMode::Primary, true) => {
1410 0 : bail!(
1411 0 : "Cannot start a node as a hot standby replica, it is already configured as primary node"
1412 : )
1413 : }
1414 0 : _ => {}
1415 : }
1416 :
1417 0 : if !args.allow_multiple {
1418 0 : cplane.check_conflicting_endpoints(mode, tenant_id, timeline_id)?;
1419 0 : }
1420 :
1421 0 : cplane.new_endpoint(
1422 0 : &endpoint_id,
1423 0 : tenant_id,
1424 0 : timeline_id,
1425 0 : args.pg_port,
1426 0 : args.external_http_port,
1427 0 : args.internal_http_port,
1428 0 : args.pg_version,
1429 0 : mode,
1430 0 : args.grpc,
1431 0 : !args.update_catalog,
1432 : false,
1433 0 : args.privileged_role_name.clone(),
1434 0 : )?;
1435 : }
1436 0 : EndpointCmd::Start(args) => {
1437 0 : let endpoint_id = &args.endpoint_id;
1438 0 : let pageserver_id = args.endpoint_pageserver_id;
1439 0 : let remote_ext_base_url = &args.remote_ext_base_url;
1440 :
1441 0 : let default_generation = env
1442 0 : .storage_controller
1443 0 : .timelines_onto_safekeepers
1444 0 : .then_some(1);
1445 0 : let safekeepers_generation = args
1446 0 : .safekeepers_generation
1447 0 : .or(default_generation)
1448 0 : .map(SafekeeperGeneration::new);
1449 : // If --safekeepers argument is given, use only the listed
1450 : // safekeeper nodes; otherwise all from the env.
1451 0 : let safekeepers = if let Some(safekeepers) = parse_safekeepers(&args.safekeepers)? {
1452 0 : safekeepers
1453 : } else {
1454 0 : env.safekeepers.iter().map(|sk| sk.id).collect()
1455 : };
1456 :
1457 0 : let endpoint = cplane
1458 0 : .endpoints
1459 0 : .get(endpoint_id.as_str())
1460 0 : .ok_or_else(|| anyhow!("endpoint {endpoint_id} not found"))?;
1461 :
1462 0 : if !args.allow_multiple {
1463 0 : cplane.check_conflicting_endpoints(
1464 0 : endpoint.mode,
1465 0 : endpoint.tenant_id,
1466 0 : endpoint.timeline_id,
1467 0 : )?;
1468 0 : }
1469 :
1470 0 : let prefer_protocol = if endpoint.grpc {
1471 0 : PageserverProtocol::Grpc
1472 : } else {
1473 0 : PageserverProtocol::Libpq
1474 : };
1475 :
1476 0 : let mut pageserver_conninfo = if let Some(ps_id) = pageserver_id {
1477 0 : let conf = env.get_pageserver_conf(ps_id).unwrap();
1478 0 : local_pageserver_conf_to_conn_info(conf)?
1479 : } else {
1480 : // Look up the currently attached location of the tenant, and its striping metadata,
1481 : // to pass these on to postgres.
1482 0 : let storage_controller = StorageController::from_env(env);
1483 0 : let locate_result = storage_controller.tenant_locate(endpoint.tenant_id).await?;
1484 0 : assert!(!locate_result.shards.is_empty());
1485 :
1486 : // Initialize LSN leases for static computes.
1487 0 : if let ComputeMode::Static(lsn) = endpoint.mode {
1488 0 : futures::future::try_join_all(locate_result.shards.iter().map(
1489 0 : |shard| async move {
1490 0 : let conf = env.get_pageserver_conf(shard.node_id).unwrap();
1491 0 : let pageserver = PageServerNode::from_env(env, conf);
1492 :
1493 0 : pageserver
1494 0 : .http_client
1495 0 : .timeline_init_lsn_lease(shard.shard_id, endpoint.timeline_id, lsn)
1496 0 : .await
1497 0 : },
1498 : ))
1499 0 : .await?;
1500 0 : }
1501 :
1502 0 : tenant_locate_response_to_conn_info(&locate_result)?
1503 : };
1504 0 : pageserver_conninfo.prefer_protocol = prefer_protocol;
1505 :
1506 0 : let ps_conf = env.get_pageserver_conf(DEFAULT_PAGESERVER_ID)?;
1507 0 : let auth_token = if matches!(ps_conf.pg_auth_type, AuthType::NeonJWT) {
1508 0 : let claims = Claims::new(Some(endpoint.tenant_id), Scope::Tenant);
1509 :
1510 0 : Some(env.generate_auth_token(&claims)?)
1511 : } else {
1512 0 : None
1513 : };
1514 :
1515 0 : let exp = (std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH)?
1516 0 : + Duration::from_secs(86400))
1517 0 : .as_secs();
1518 0 : let claims = endpoint_storage::claims::EndpointStorageClaims {
1519 0 : tenant_id: endpoint.tenant_id,
1520 0 : timeline_id: endpoint.timeline_id,
1521 0 : endpoint_id: endpoint_id.to_string(),
1522 0 : exp,
1523 0 : };
1524 :
1525 0 : let endpoint_storage_token = env.generate_auth_token(&claims)?;
1526 0 : let endpoint_storage_addr = env.endpoint_storage.listen_addr.to_string();
1527 :
1528 0 : let args = control_plane::endpoint::EndpointStartArgs {
1529 0 : auth_token,
1530 0 : endpoint_storage_token,
1531 0 : endpoint_storage_addr,
1532 0 : safekeepers_generation,
1533 0 : safekeepers,
1534 0 : pageserver_conninfo,
1535 0 : remote_ext_base_url: remote_ext_base_url.clone(),
1536 0 : create_test_user: args.create_test_user,
1537 0 : start_timeout: args.start_timeout,
1538 0 : autoprewarm: args.autoprewarm,
1539 0 : offload_lfc_interval_seconds: args.offload_lfc_interval_seconds,
1540 0 : dev: args.dev,
1541 0 : };
1542 :
1543 0 : println!("Starting existing endpoint {endpoint_id}...");
1544 0 : endpoint.start(args).await?;
1545 : }
1546 0 : EndpointCmd::UpdatePageservers(args) => {
1547 0 : let endpoint_id = &args.endpoint_id;
1548 0 : let endpoint = cplane
1549 0 : .endpoints
1550 0 : .get(endpoint_id.as_str())
1551 0 : .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
1552 0 : let prefer_protocol = if endpoint.grpc {
1553 0 : PageserverProtocol::Grpc
1554 : } else {
1555 0 : PageserverProtocol::Libpq
1556 : };
1557 0 : let mut pageserver_conninfo = match args.pageserver_id {
1558 0 : Some(pageserver_id) => {
1559 0 : let conf = env.get_pageserver_conf(pageserver_id)?;
1560 0 : local_pageserver_conf_to_conn_info(conf)?
1561 : }
1562 : None => {
1563 0 : let storage_controller = StorageController::from_env(env);
1564 0 : let locate_result =
1565 0 : storage_controller.tenant_locate(endpoint.tenant_id).await?;
1566 :
1567 0 : tenant_locate_response_to_conn_info(&locate_result)?
1568 : }
1569 : };
1570 0 : pageserver_conninfo.prefer_protocol = prefer_protocol;
1571 :
1572 0 : endpoint
1573 0 : .update_pageservers_in_config(&pageserver_conninfo)
1574 0 : .await?;
1575 : }
1576 0 : EndpointCmd::Reconfigure(args) => {
1577 0 : let endpoint_id = &args.endpoint_id;
1578 0 : let endpoint = cplane
1579 0 : .endpoints
1580 0 : .get(endpoint_id.as_str())
1581 0 : .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
1582 :
1583 0 : let prefer_protocol = if endpoint.grpc {
1584 0 : PageserverProtocol::Grpc
1585 : } else {
1586 0 : PageserverProtocol::Libpq
1587 : };
1588 0 : let mut pageserver_conninfo = if let Some(ps_id) = args.endpoint_pageserver_id {
1589 0 : let conf = env.get_pageserver_conf(ps_id)?;
1590 0 : local_pageserver_conf_to_conn_info(conf)?
1591 : } else {
1592 : // Look up the currently attached location of the tenant, and its striping metadata,
1593 : // to pass these on to postgres.
1594 0 : let storage_controller = StorageController::from_env(env);
1595 0 : let locate_result = storage_controller.tenant_locate(endpoint.tenant_id).await?;
1596 :
1597 0 : tenant_locate_response_to_conn_info(&locate_result)?
1598 : };
1599 0 : pageserver_conninfo.prefer_protocol = prefer_protocol;
1600 :
1601 : // If --safekeepers argument is given, use only the listed
1602 : // safekeeper nodes; otherwise all from the env.
1603 0 : let safekeepers = parse_safekeepers(&args.safekeepers)?;
1604 0 : endpoint
1605 0 : .reconfigure(Some(&pageserver_conninfo), safekeepers, None)
1606 0 : .await?;
1607 : }
1608 0 : EndpointCmd::RefreshConfiguration(args) => {
1609 0 : let endpoint_id = &args.endpoint_id;
1610 0 : let endpoint = cplane
1611 0 : .endpoints
1612 0 : .get(endpoint_id.as_str())
1613 0 : .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
1614 0 : endpoint.refresh_configuration().await?;
1615 : }
1616 0 : EndpointCmd::Stop(args) => {
1617 0 : let endpoint_id = &args.endpoint_id;
1618 0 : let endpoint = cplane
1619 0 : .endpoints
1620 0 : .get(endpoint_id)
1621 0 : .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
1622 0 : match endpoint.stop(args.mode, args.destroy).await?.lsn {
1623 0 : Some(lsn) => println!("{lsn}"),
1624 0 : None => println!("null"),
1625 : }
1626 : }
1627 0 : EndpointCmd::GenerateJwt(args) => {
1628 0 : let endpoint = {
1629 0 : let endpoint_id = &args.endpoint_id;
1630 :
1631 0 : cplane
1632 0 : .endpoints
1633 0 : .get(endpoint_id)
1634 0 : .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?
1635 : };
1636 :
1637 0 : let jwt = endpoint.generate_jwt(args.scope)?;
1638 :
1639 0 : print!("{jwt}");
1640 : }
1641 : }
1642 :
1643 0 : Ok(())
1644 0 : }
1645 :
1646 : /// Parse --safekeepers as list of safekeeper ids.
1647 0 : fn parse_safekeepers(safekeepers_str: &Option<String>) -> Result<Option<Vec<NodeId>>> {
1648 0 : if let Some(safekeepers_str) = safekeepers_str {
1649 0 : let mut safekeepers: Vec<NodeId> = Vec::new();
1650 0 : for sk_id in safekeepers_str.split(',').map(str::trim) {
1651 0 : let sk_id = NodeId(
1652 0 : u64::from_str(sk_id)
1653 0 : .map_err(|_| anyhow!("invalid node ID \"{sk_id}\" in --safekeepers list"))?,
1654 : );
1655 0 : safekeepers.push(sk_id);
1656 : }
1657 0 : Ok(Some(safekeepers))
1658 : } else {
1659 0 : Ok(None)
1660 : }
1661 0 : }
1662 :
1663 0 : fn handle_mappings(subcmd: &MappingsCmd, env: &mut local_env::LocalEnv) -> Result<()> {
1664 0 : match subcmd {
1665 0 : MappingsCmd::Map(args) => {
1666 0 : env.register_branch_mapping(
1667 0 : args.branch_name.to_owned(),
1668 0 : args.tenant_id,
1669 0 : args.timeline_id,
1670 0 : )?;
1671 :
1672 0 : Ok(())
1673 : }
1674 : }
1675 0 : }
1676 :
1677 0 : fn get_pageserver(
1678 0 : env: &local_env::LocalEnv,
1679 0 : pageserver_id_arg: Option<NodeId>,
1680 0 : ) -> Result<PageServerNode> {
1681 0 : let node_id = pageserver_id_arg.unwrap_or(DEFAULT_PAGESERVER_ID);
1682 :
1683 0 : Ok(PageServerNode::from_env(
1684 0 : env,
1685 0 : env.get_pageserver_conf(node_id)?,
1686 : ))
1687 0 : }
1688 :
1689 0 : async fn handle_pageserver(subcmd: &PageserverCmd, env: &local_env::LocalEnv) -> Result<()> {
1690 0 : match subcmd {
1691 0 : PageserverCmd::Start(args) => {
1692 0 : if let Err(e) = get_pageserver(env, args.pageserver_id)?
1693 0 : .start(&args.start_timeout)
1694 0 : .await
1695 : {
1696 0 : eprintln!("pageserver start failed: {e}");
1697 0 : exit(1);
1698 0 : }
1699 : }
1700 :
1701 0 : PageserverCmd::Stop(args) => {
1702 0 : let immediate = match args.stop_mode {
1703 0 : StopMode::Fast => false,
1704 0 : StopMode::Immediate => true,
1705 : };
1706 0 : if let Err(e) = get_pageserver(env, args.pageserver_id)?.stop(immediate) {
1707 0 : eprintln!("pageserver stop failed: {e}");
1708 0 : exit(1);
1709 0 : }
1710 : }
1711 :
1712 0 : PageserverCmd::Restart(args) => {
1713 0 : let pageserver = get_pageserver(env, args.pageserver_id)?;
1714 : //TODO what shutdown strategy should we use here?
1715 0 : if let Err(e) = pageserver.stop(false) {
1716 0 : eprintln!("pageserver stop failed: {e}");
1717 0 : exit(1);
1718 0 : }
1719 :
1720 0 : if let Err(e) = pageserver.start(&args.start_timeout).await {
1721 0 : eprintln!("pageserver start failed: {e}");
1722 0 : exit(1);
1723 0 : }
1724 : }
1725 :
1726 0 : PageserverCmd::Status(args) => {
1727 0 : match get_pageserver(env, args.pageserver_id)?
1728 0 : .check_status()
1729 0 : .await
1730 : {
1731 0 : Ok(_) => println!("Page server is up and running"),
1732 0 : Err(err) => {
1733 0 : eprintln!("Page server is not available: {err}");
1734 0 : exit(1);
1735 : }
1736 : }
1737 : }
1738 : }
1739 0 : Ok(())
1740 0 : }
1741 :
1742 0 : async fn handle_storage_controller(
1743 0 : subcmd: &StorageControllerCmd,
1744 0 : env: &local_env::LocalEnv,
1745 0 : ) -> Result<()> {
1746 0 : let svc = StorageController::from_env(env);
1747 0 : match subcmd {
1748 0 : StorageControllerCmd::Start(args) => {
1749 0 : let start_args = NeonStorageControllerStartArgs {
1750 0 : instance_id: args.instance_id,
1751 0 : base_port: args.base_port,
1752 0 : start_timeout: args.start_timeout,
1753 0 : handle_ps_local_disk_loss: args.handle_ps_local_disk_loss,
1754 0 : };
1755 :
1756 0 : if let Err(e) = svc.start(start_args).await {
1757 0 : eprintln!("start failed: {e}");
1758 0 : exit(1);
1759 0 : }
1760 : }
1761 :
1762 0 : StorageControllerCmd::Stop(args) => {
1763 0 : let stop_args = NeonStorageControllerStopArgs {
1764 0 : instance_id: args.instance_id,
1765 0 : immediate: match args.stop_mode {
1766 0 : StopMode::Fast => false,
1767 0 : StopMode::Immediate => true,
1768 : },
1769 : };
1770 0 : if let Err(e) = svc.stop(stop_args).await {
1771 0 : eprintln!("stop failed: {e}");
1772 0 : exit(1);
1773 0 : }
1774 : }
1775 : }
1776 0 : Ok(())
1777 0 : }
1778 :
1779 0 : fn get_safekeeper(env: &local_env::LocalEnv, id: NodeId) -> Result<SafekeeperNode> {
1780 0 : if let Some(node) = env.safekeepers.iter().find(|node| node.id == id) {
1781 0 : Ok(SafekeeperNode::from_env(env, node))
1782 : } else {
1783 0 : bail!("could not find safekeeper {id}")
1784 : }
1785 0 : }
1786 :
1787 0 : async fn handle_safekeeper(subcmd: &SafekeeperCmd, env: &local_env::LocalEnv) -> Result<()> {
1788 0 : match subcmd {
1789 0 : SafekeeperCmd::Start(args) => {
1790 0 : let safekeeper = get_safekeeper(env, args.id)?;
1791 :
1792 0 : if let Err(e) = safekeeper.start(&args.extra_opt, &args.start_timeout).await {
1793 0 : eprintln!("safekeeper start failed: {e}");
1794 0 : exit(1);
1795 0 : }
1796 : }
1797 :
1798 0 : SafekeeperCmd::Stop(args) => {
1799 0 : let safekeeper = get_safekeeper(env, args.id)?;
1800 0 : let immediate = match args.stop_mode {
1801 0 : StopMode::Fast => false,
1802 0 : StopMode::Immediate => true,
1803 : };
1804 0 : if let Err(e) = safekeeper.stop(immediate) {
1805 0 : eprintln!("safekeeper stop failed: {e}");
1806 0 : exit(1);
1807 0 : }
1808 : }
1809 :
1810 0 : SafekeeperCmd::Restart(args) => {
1811 0 : let safekeeper = get_safekeeper(env, args.id)?;
1812 0 : let immediate = match args.stop_mode {
1813 0 : StopMode::Fast => false,
1814 0 : StopMode::Immediate => true,
1815 : };
1816 :
1817 0 : if let Err(e) = safekeeper.stop(immediate) {
1818 0 : eprintln!("safekeeper stop failed: {e}");
1819 0 : exit(1);
1820 0 : }
1821 :
1822 0 : if let Err(e) = safekeeper.start(&args.extra_opt, &args.start_timeout).await {
1823 0 : eprintln!("safekeeper start failed: {e}");
1824 0 : exit(1);
1825 0 : }
1826 : }
1827 : }
1828 0 : Ok(())
1829 0 : }
1830 :
1831 0 : async fn handle_endpoint_storage(
1832 0 : subcmd: &EndpointStorageCmd,
1833 0 : env: &local_env::LocalEnv,
1834 0 : ) -> Result<()> {
1835 : use EndpointStorageCmd::*;
1836 0 : let storage = EndpointStorage::from_env(env);
1837 :
1838 : // In tests like test_forward_compatibility or test_graceful_cluster_restart
1839 : // old neon binaries (without endpoint_storage) are present
1840 0 : if !storage.bin.exists() {
1841 0 : eprintln!(
1842 0 : "{} binary not found. Ignore if this is a compatibility test",
1843 : storage.bin
1844 : );
1845 0 : return Ok(());
1846 0 : }
1847 :
1848 0 : match subcmd {
1849 0 : Start(EndpointStorageStartCmd { start_timeout }) => {
1850 0 : if let Err(e) = storage.start(start_timeout).await {
1851 0 : eprintln!("endpoint_storage start failed: {e}");
1852 0 : exit(1);
1853 0 : }
1854 : }
1855 0 : Stop(EndpointStorageStopCmd { stop_mode }) => {
1856 0 : let immediate = match stop_mode {
1857 0 : StopMode::Fast => false,
1858 0 : StopMode::Immediate => true,
1859 : };
1860 0 : if let Err(e) = storage.stop(immediate) {
1861 0 : eprintln!("proxy stop failed: {e}");
1862 0 : exit(1);
1863 0 : }
1864 : }
1865 : };
1866 0 : Ok(())
1867 0 : }
1868 :
1869 0 : async fn handle_storage_broker(subcmd: &StorageBrokerCmd, env: &local_env::LocalEnv) -> Result<()> {
1870 0 : match subcmd {
1871 0 : StorageBrokerCmd::Start(args) => {
1872 0 : let storage_broker = StorageBroker::from_env(env);
1873 0 : if let Err(e) = storage_broker.start(&args.start_timeout).await {
1874 0 : eprintln!("broker start failed: {e}");
1875 0 : exit(1);
1876 0 : }
1877 : }
1878 :
1879 0 : StorageBrokerCmd::Stop(_args) => {
1880 : // FIXME: stop_mode unused
1881 0 : let storage_broker = StorageBroker::from_env(env);
1882 0 : if let Err(e) = storage_broker.stop() {
1883 0 : eprintln!("broker stop failed: {e}");
1884 0 : exit(1);
1885 0 : }
1886 : }
1887 : }
1888 0 : Ok(())
1889 0 : }
1890 :
1891 0 : async fn handle_start_all(
1892 0 : args: &StartCmdArgs,
1893 0 : env: &'static local_env::LocalEnv,
1894 0 : ) -> anyhow::Result<()> {
1895 : // FIXME: this was called "retry_timeout", is it right?
1896 0 : let Err(errors) = handle_start_all_impl(env, args.timeout).await else {
1897 0 : neon_start_status_check(env, args.timeout.as_ref())
1898 0 : .await
1899 0 : .context("status check after successful startup of all services")?;
1900 0 : return Ok(());
1901 : };
1902 :
1903 0 : eprintln!("startup failed because one or more services could not be started");
1904 :
1905 0 : for e in errors {
1906 0 : eprintln!("{e}");
1907 0 : let debug_repr = format!("{e:?}");
1908 0 : for line in debug_repr.lines() {
1909 0 : eprintln!(" {line}");
1910 0 : }
1911 : }
1912 :
1913 0 : try_stop_all(env, true).await;
1914 :
1915 0 : exit(2);
1916 0 : }
1917 :
1918 : /// Returns Ok() if and only if all services could be started successfully.
1919 : /// Otherwise, returns the list of errors that occurred during startup.
1920 0 : async fn handle_start_all_impl(
1921 0 : env: &'static local_env::LocalEnv,
1922 0 : retry_timeout: humantime::Duration,
1923 0 : ) -> Result<(), Vec<anyhow::Error>> {
1924 : // Endpoints are not started automatically
1925 :
1926 0 : let mut js = JoinSet::new();
1927 :
1928 : // force infalliblity through closure
1929 : #[allow(clippy::redundant_closure_call)]
1930 0 : (|| {
1931 0 : js.spawn(async move {
1932 0 : let storage_broker = StorageBroker::from_env(env);
1933 0 : storage_broker
1934 0 : .start(&retry_timeout)
1935 0 : .await
1936 0 : .map_err(|e| e.context("start storage_broker"))
1937 0 : });
1938 :
1939 0 : js.spawn(async move {
1940 0 : let storage_controller = StorageController::from_env(env);
1941 0 : storage_controller
1942 0 : .start(NeonStorageControllerStartArgs::with_default_instance_id(
1943 0 : retry_timeout,
1944 0 : ))
1945 0 : .await
1946 0 : .map_err(|e| e.context("start storage_controller"))
1947 0 : });
1948 :
1949 0 : for ps_conf in &env.pageservers {
1950 0 : js.spawn(async move {
1951 0 : let pageserver = PageServerNode::from_env(env, ps_conf);
1952 0 : pageserver
1953 0 : .start(&retry_timeout)
1954 0 : .await
1955 0 : .map_err(|e| e.context(format!("start pageserver {}", ps_conf.id)))
1956 0 : });
1957 : }
1958 :
1959 0 : for node in env.safekeepers.iter() {
1960 0 : js.spawn(async move {
1961 0 : let safekeeper = SafekeeperNode::from_env(env, node);
1962 0 : safekeeper
1963 0 : .start(&[], &retry_timeout)
1964 0 : .await
1965 0 : .map_err(|e| e.context(format!("start safekeeper {}", safekeeper.id)))
1966 0 : });
1967 : }
1968 :
1969 0 : js.spawn(async move {
1970 0 : EndpointStorage::from_env(env)
1971 0 : .start(&retry_timeout)
1972 0 : .await
1973 0 : .map_err(|e| e.context("start endpoint_storage"))
1974 0 : });
1975 : })();
1976 :
1977 0 : let mut errors = Vec::new();
1978 0 : while let Some(result) = js.join_next().await {
1979 0 : let result = result.expect("we don't panic or cancel the tasks");
1980 0 : if let Err(e) = result {
1981 0 : errors.push(e);
1982 0 : }
1983 : }
1984 :
1985 0 : if !errors.is_empty() {
1986 0 : return Err(errors);
1987 0 : }
1988 :
1989 0 : Ok(())
1990 0 : }
1991 :
1992 0 : async fn neon_start_status_check(
1993 0 : env: &local_env::LocalEnv,
1994 0 : retry_timeout: &Duration,
1995 0 : ) -> anyhow::Result<()> {
1996 : const RETRY_INTERVAL: Duration = Duration::from_millis(100);
1997 : const NOTICE_AFTER_RETRIES: Duration = Duration::from_secs(5);
1998 :
1999 0 : let storcon = StorageController::from_env(env);
2000 :
2001 0 : let retries = retry_timeout.as_millis() / RETRY_INTERVAL.as_millis();
2002 0 : let notice_after_retries = retry_timeout.as_millis() / NOTICE_AFTER_RETRIES.as_millis();
2003 :
2004 0 : println!("\nRunning neon status check");
2005 :
2006 0 : for retry in 0..retries {
2007 0 : if retry == notice_after_retries {
2008 0 : println!("\nNeon status check has not passed yet, continuing to wait")
2009 0 : }
2010 :
2011 0 : let mut passed = true;
2012 0 : let mut nodes = storcon.node_list().await?;
2013 0 : let mut pageservers = env.pageservers.clone();
2014 :
2015 0 : if nodes.len() != pageservers.len() {
2016 0 : continue;
2017 0 : }
2018 :
2019 0 : nodes.sort_by_key(|ps| ps.id);
2020 0 : pageservers.sort_by_key(|ps| ps.id);
2021 :
2022 0 : for (idx, pageserver) in pageservers.iter().enumerate() {
2023 0 : let node = &nodes[idx];
2024 0 : if node.id != pageserver.id {
2025 0 : passed = false;
2026 0 : break;
2027 0 : }
2028 :
2029 0 : if !matches!(node.availability, NodeAvailabilityWrapper::Active) {
2030 0 : passed = false;
2031 0 : break;
2032 0 : }
2033 : }
2034 :
2035 0 : if passed {
2036 0 : println!("\nNeon started and passed status check");
2037 0 : return Ok(());
2038 0 : }
2039 :
2040 0 : tokio::time::sleep(RETRY_INTERVAL).await;
2041 : }
2042 :
2043 0 : anyhow::bail!("\nNeon passed status check")
2044 0 : }
2045 :
2046 0 : async fn handle_stop_all(args: &StopCmdArgs, env: &local_env::LocalEnv) -> Result<()> {
2047 0 : let immediate = match args.mode {
2048 0 : StopMode::Fast => false,
2049 0 : StopMode::Immediate => true,
2050 : };
2051 :
2052 0 : try_stop_all(env, immediate).await;
2053 :
2054 0 : Ok(())
2055 0 : }
2056 :
2057 0 : async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
2058 0 : let mode = if immediate {
2059 0 : EndpointTerminateMode::Immediate
2060 : } else {
2061 0 : EndpointTerminateMode::Fast
2062 : };
2063 : // Stop all endpoints
2064 0 : match ComputeControlPlane::load(env.clone()) {
2065 0 : Ok(cplane) => {
2066 0 : for (_k, node) in cplane.endpoints {
2067 0 : if let Err(e) = node.stop(mode, false).await {
2068 0 : eprintln!("postgres stop failed: {e:#}");
2069 0 : }
2070 : }
2071 : }
2072 0 : Err(e) => {
2073 0 : eprintln!("postgres stop failed, could not restore control plane data from env: {e:#}")
2074 : }
2075 : }
2076 :
2077 0 : let storage = EndpointStorage::from_env(env);
2078 0 : if let Err(e) = storage.stop(immediate) {
2079 0 : eprintln!("endpoint_storage stop failed: {e:#}");
2080 0 : }
2081 :
2082 0 : for ps_conf in &env.pageservers {
2083 0 : let pageserver = PageServerNode::from_env(env, ps_conf);
2084 0 : if let Err(e) = pageserver.stop(immediate) {
2085 0 : eprintln!("pageserver {} stop failed: {:#}", ps_conf.id, e);
2086 0 : }
2087 : }
2088 :
2089 0 : for node in env.safekeepers.iter() {
2090 0 : let safekeeper = SafekeeperNode::from_env(env, node);
2091 0 : if let Err(e) = safekeeper.stop(immediate) {
2092 0 : eprintln!("safekeeper {} stop failed: {:#}", safekeeper.id, e);
2093 0 : }
2094 : }
2095 :
2096 0 : let storage_broker = StorageBroker::from_env(env);
2097 0 : if let Err(e) = storage_broker.stop() {
2098 0 : eprintln!("neon broker stop failed: {e:#}");
2099 0 : }
2100 :
2101 : // Stop all storage controller instances. In the most common case there's only one,
2102 : // but iterate though the base data directory in order to discover the instances.
2103 0 : let storcon_instances = env
2104 0 : .storage_controller_instances()
2105 0 : .await
2106 0 : .expect("Must inspect data dir");
2107 0 : for (instance_id, _instance_dir_path) in storcon_instances {
2108 0 : let storage_controller = StorageController::from_env(env);
2109 0 : let stop_args = NeonStorageControllerStopArgs {
2110 0 : instance_id,
2111 0 : immediate,
2112 0 : };
2113 :
2114 0 : if let Err(e) = storage_controller.stop(stop_args).await {
2115 0 : eprintln!("Storage controller instance {instance_id} stop failed: {e:#}");
2116 0 : }
2117 : }
2118 0 : }
|