Line data Source code
1 : //!
2 : //! `neon_local` is an executable that can be used to create a local
3 : //! Neon environment, for testing purposes. The local environment is
//! quite different from the cloud environment with Kubernetes, but it is
//! easier to work with locally. The python tests in `test_runner`
6 : //! rely on `neon_local` to set up the environment for each test.
7 : //!
8 : use anyhow::{anyhow, bail, Context, Result};
9 : use clap::Parser;
10 : use compute_api::spec::ComputeMode;
11 : use control_plane::endpoint::ComputeControlPlane;
12 : use control_plane::local_env::{
13 : InitForceMode, LocalEnv, NeonBroker, NeonLocalInitConf, NeonLocalInitPageserverConf,
14 : SafekeeperConf,
15 : };
16 : use control_plane::pageserver::PageServerNode;
17 : use control_plane::safekeeper::SafekeeperNode;
18 : use control_plane::storage_controller::{
19 : NeonStorageControllerStartArgs, NeonStorageControllerStopArgs, StorageController,
20 : };
21 : use control_plane::{broker, local_env};
22 : use pageserver_api::config::{
23 : DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
24 : DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT,
25 : };
26 : use pageserver_api::controller_api::{
27 : NodeAvailabilityWrapper, PlacementPolicy, TenantCreateRequest,
28 : };
29 : use pageserver_api::models::{ShardParameters, TimelineCreateRequest, TimelineInfo};
30 : use pageserver_api::shard::{ShardCount, ShardStripeSize, TenantShardId};
31 : use postgres_backend::AuthType;
32 : use postgres_connection::parse_host_port;
33 : use safekeeper_api::{
34 : DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT,
35 : DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT,
36 : };
37 : use std::borrow::Cow;
38 : use std::collections::{BTreeSet, HashMap};
39 : use std::path::PathBuf;
40 : use std::process::exit;
41 : use std::str::FromStr;
42 : use std::time::Duration;
43 : use storage_broker::DEFAULT_LISTEN_ADDR as DEFAULT_BROKER_ADDR;
44 : use tokio::task::JoinSet;
45 : use url::Host;
46 : use utils::{
47 : auth::{Claims, Scope},
48 : id::{NodeId, TenantId, TenantTimelineId, TimelineId},
49 : lsn::Lsn,
50 : project_git_version,
51 : };
52 :
// Default id of a safekeeper node, if not specified on the command line.
const DEFAULT_SAFEKEEPER_ID: NodeId = NodeId(1);
// Id of the first pageserver in the default configuration generated by `init`;
// additional pageservers get consecutive ids counted up from this one.
const DEFAULT_PAGESERVER_ID: NodeId = NodeId(1);
// Default branch name.
const DEFAULT_BRANCH_NAME: &str = "main";
// Defines `GIT_VERSION`, which is reported as the CLI's `--version` string.
project_git_version!(GIT_VERSION);

// Postgres major version used whenever `--pg-version` is not given.
const DEFAULT_PG_VERSION: u32 = 16;

// Control plane API URL written into the default configuration generated by `init`.
const DEFAULT_PAGESERVER_CONTROL_PLANE_API: &str = "http://127.0.0.1:1234/upcall/v1/";
62 :
// Top-level command line of `neon_local`: exactly one subcommand is expected.
// (Plain `//` comments are used here on purpose: clap turns `///` into help text.)
#[derive(clap::Parser)]
#[command(version = GIT_VERSION, about, name = "Neon CLI")]
struct Cli {
    // The subcommand selected by the user; dispatched in `main`.
    #[command(subcommand)]
    command: NeonLocalCmd,
}
69 :
// All top-level subcommands understood by `neon_local`.
// Variants marked `#[command(subcommand)]` are command groups with their own
// nested subcommands; `Init`, `Start` and `Stop` take their arguments directly.
#[derive(clap::Subcommand)]
enum NeonLocalCmd {
    Init(InitCmdArgs),

    #[command(subcommand)]
    Tenant(TenantCmd),
    #[command(subcommand)]
    Timeline(TimelineCmd),
    #[command(subcommand)]
    Pageserver(PageserverCmd),
    // Also reachable under the underscore spelling `storage_controller`.
    #[command(subcommand)]
    #[clap(alias = "storage_controller")]
    StorageController(StorageControllerCmd),
    // Also reachable under the underscore spelling `storage_broker`.
    #[command(subcommand)]
    #[clap(alias = "storage_broker")]
    StorageBroker(StorageBrokerCmd),
    #[command(subcommand)]
    Safekeeper(SafekeeperCmd),
    #[command(subcommand)]
    Endpoint(EndpointCmd),
    #[command(subcommand)]
    Mappings(MappingsCmd),

    Start(StartCmdArgs),
    Stop(StopCmdArgs),
}
96 :
// Arguments of the `init` subcommand; consumed by `handle_init`.
#[derive(clap::Args)]
#[clap(about = "Initialize a new Neon repository, preparing configs for services to start with")]
struct InitCmdArgs {
    // Only meaningful without `--config`; `handle_init` rejects passing both.
    #[clap(long, help("How many pageservers to create (default 1)"))]
    num_pageservers: Option<u16>,

    // Path to a TOML file describing the environment to create. When absent,
    // `handle_init` builds a default configuration.
    #[clap(long)]
    config: Option<PathBuf>,

    // Passed through to `LocalEnv::init`; defaults to `must-not-exist`.
    #[clap(long, help("Force initialization even if the repository is not empty"))]
    #[arg(value_parser)]
    #[clap(default_value = "must-not-exist")]
    force: InitForceMode,
}
111 :
// Arguments of the top-level `start` subcommand.
#[derive(clap::Args)]
#[clap(about = "Start pageserver and safekeepers")]
struct StartCmdArgs {
    // Exposed as `--start-timeout`; parsed in humantime syntax, default "10s".
    #[clap(long = "start-timeout", default_value = "10s")]
    timeout: humantime::Duration,
}
118 :
// Arguments of the top-level `stop` subcommand.
#[derive(clap::Args)]
#[clap(about = "Stop pageserver and safekeepers")]
struct StopCmdArgs {
    // Exposed as `--mode`; defaults to `StopMode::Fast`.
    #[arg(value_enum)]
    #[clap(long, default_value_t = StopMode::Fast)]
    mode: StopMode,
}
126 :
// Shutdown mode accepted by the various `stop` subcommands. Per the help text
// of those commands, `Immediate` skips flushing repository data at shutdown.
#[derive(Clone, Copy, clap::ValueEnum)]
enum StopMode {
    Fast,
    Immediate,
}
132 :
// Subcommands of the `tenant` command group; dispatched by `handle_tenant`.
#[derive(clap::Subcommand)]
#[clap(about = "Manage tenants")]
enum TenantCmd {
    List,
    Create(TenantCreateCmdArgs),
    SetDefault(TenantSetDefaultCmdArgs),
    Config(TenantConfigCmdArgs),
    Import(TenantImportCmdArgs),
}
142 :
// Arguments of `tenant create`.
#[derive(clap::Args)]
struct TenantCreateCmdArgs {
    #[clap(
        long = "tenant-id",
        help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
    )]
    tenant_id: Option<TenantId>,

    #[clap(
        long,
        help = "Use a specific timeline id when creating a tenant and its initial timeline"
    )]
    timeline_id: Option<TimelineId>,

    // Repeatable `-c` option; the raw strings are collected as given.
    #[clap(short = 'c')]
    config: Vec<String>,

    #[arg(default_value_t = DEFAULT_PG_VERSION)]
    #[clap(long, help = "Postgres version to use for the initial timeline")]
    pg_version: u32,

    #[clap(
        long,
        help = "Use this tenant in future CLI commands where tenant_id is needed, but not specified"
    )]
    set_default: bool,

    // Defaults to 0.
    #[clap(long, help = "Number of shards in the new tenant")]
    #[arg(default_value_t = 0)]
    shard_count: u8,
    #[clap(long, help = "Sharding stripe size in pages")]
    shard_stripe_size: Option<u32>,

    // Parsed from a JSON string by `parse_placement_policy`.
    #[clap(long, help = "Placement policy shards in this tenant")]
    #[arg(value_parser = parse_placement_policy)]
    placement_policy: Option<PlacementPolicy>,
}
180 :
181 0 : fn parse_placement_policy(s: &str) -> anyhow::Result<PlacementPolicy> {
182 0 : Ok(serde_json::from_str::<PlacementPolicy>(s)?)
183 0 : }
184 :
// Arguments of `tenant set-default`; the tenant id is mandatory here.
#[derive(clap::Args)]
#[clap(
    about = "Set a particular tenant as default in future CLI commands where tenant_id is needed, but not specified"
)]
struct TenantSetDefaultCmdArgs {
    #[clap(
        long = "tenant-id",
        help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
    )]
    tenant_id: TenantId,
}
196 :
// Arguments of `tenant config`.
#[derive(clap::Args)]
struct TenantConfigCmdArgs {
    #[clap(
        long = "tenant-id",
        help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
    )]
    tenant_id: Option<TenantId>,

    // Repeatable `-c` option; the raw strings are collected as given.
    #[clap(short = 'c')]
    config: Vec<String>,
}
208 :
// Arguments of `tenant import`; the tenant id is mandatory here.
#[derive(clap::Args)]
#[clap(
    about = "Import a tenant that is present in remote storage, and create branches for its timelines"
)]
struct TenantImportCmdArgs {
    #[clap(
        long = "tenant-id",
        help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
    )]
    tenant_id: TenantId,
}
220 :
// Subcommands of the `timeline` command group; dispatched by `handle_timeline`.
#[derive(clap::Subcommand)]
#[clap(about = "Manage timelines")]
enum TimelineCmd {
    List(TimelineListCmdArgs),
    Branch(TimelineBranchCmdArgs),
    Create(TimelineCreateCmdArgs),
    Import(TimelineImportCmdArgs),
}
229 :
// Arguments of `timeline list`.
#[derive(clap::Args)]
#[clap(about = "List all timelines available to this pageserver")]
struct TimelineListCmdArgs {
    // Although the flag is spelled `--tenant-id`, the value is parsed as a
    // `TenantShardId`.
    #[clap(
        long = "tenant-id",
        help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
    )]
    tenant_shard_id: Option<TenantShardId>,
}
239 :
// Arguments of `timeline branch`. Only `--branch-name` is mandatory.
#[derive(clap::Args)]
#[clap(about = "Create a new timeline, branching off from another timeline")]
struct TimelineBranchCmdArgs {
    #[clap(
        long = "tenant-id",
        help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
    )]
    tenant_id: Option<TenantId>,

    #[clap(long, help = "New timeline's ID")]
    timeline_id: Option<TimelineId>,

    #[clap(long, help = "Human-readable alias for the new timeline")]
    branch_name: String,

    #[clap(
        long,
        help = "Use last Lsn of another timeline (and its data) as base when creating the new timeline. The timeline gets resolved by its branch name."
    )]
    ancestor_branch_name: Option<String>,

    #[clap(
        long,
        help = "When using another timeline as base, use a specific Lsn in it instead of the latest one"
    )]
    ancestor_start_lsn: Option<Lsn>,
}
267 :
// Arguments of `timeline create`. Only `--branch-name` is mandatory.
#[derive(clap::Args)]
#[clap(about = "Create a new blank timeline")]
struct TimelineCreateCmdArgs {
    #[clap(
        long = "tenant-id",
        help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
    )]
    tenant_id: Option<TenantId>,

    #[clap(long, help = "New timeline's ID")]
    timeline_id: Option<TimelineId>,

    #[clap(long, help = "Human-readable alias for the new timeline")]
    branch_name: String,

    // Defaults to `DEFAULT_PG_VERSION`.
    #[arg(default_value_t = DEFAULT_PG_VERSION)]
    #[clap(long, help = "Postgres version")]
    pg_version: u32,
}
287 :
// Arguments of `timeline import`. The timeline id, branch name, base tarfile
// and base Lsn are mandatory; the WAL tarfile and end Lsn are optional.
#[derive(clap::Args)]
#[clap(about = "Import timeline from a basebackup directory")]
struct TimelineImportCmdArgs {
    #[clap(
        long = "tenant-id",
        help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
    )]
    tenant_id: Option<TenantId>,

    #[clap(long, help = "New timeline's ID")]
    timeline_id: TimelineId,

    #[clap(long, help = "Human-readable alias for the new timeline")]
    branch_name: String,

    #[clap(long, help = "Basebackup tarfile to import")]
    base_tarfile: PathBuf,

    #[clap(long, help = "Lsn the basebackup starts at")]
    base_lsn: Lsn,

    #[clap(long, help = "Wal to add after base")]
    wal_tarfile: Option<PathBuf>,

    #[clap(long, help = "Lsn the basebackup ends at")]
    end_lsn: Option<Lsn>,

    // Defaults to `DEFAULT_PG_VERSION`.
    #[arg(default_value_t = DEFAULT_PG_VERSION)]
    #[clap(long, help = "Postgres version of the backup being imported")]
    pg_version: u32,
}
319 :
// Subcommands of the `pageserver` command group; dispatched by `handle_pageserver`.
#[derive(clap::Subcommand)]
#[clap(about = "Manage pageservers")]
enum PageserverCmd {
    Status(PageserverStatusCmdArgs),
    Start(PageserverStartCmdArgs),
    Stop(PageserverStopCmdArgs),
    Restart(PageserverRestartCmdArgs),
}
328 :
// Arguments of `pageserver status`.
#[derive(clap::Args)]
#[clap(about = "Show status of a local pageserver")]
struct PageserverStatusCmdArgs {
    // Exposed as `--id`; optional.
    #[clap(long = "id", help = "pageserver id")]
    pageserver_id: Option<NodeId>,
}
335 :
// Arguments of `pageserver start`.
#[derive(clap::Args)]
#[clap(about = "Start local pageserver")]
struct PageserverStartCmdArgs {
    // Exposed as `--id`; optional.
    #[clap(long = "id", help = "pageserver id")]
    pageserver_id: Option<NodeId>,

    // `-t` / `--start-timeout`, humantime syntax, default "10s".
    #[clap(short = 't', long, help = "timeout until we fail the command")]
    #[arg(default_value = "10s")]
    start_timeout: humantime::Duration,
}
346 :
// Arguments of `pageserver stop`.
#[derive(clap::Args)]
#[clap(about = "Stop local pageserver")]
struct PageserverStopCmdArgs {
    // Exposed as `--id`; optional.
    #[clap(long = "id", help = "pageserver id")]
    pageserver_id: Option<NodeId>,

    // Only the short form `-m` is declared; defaults to "fast".
    #[clap(
        short = 'm',
        help = "If 'immediate', don't flush repository data at shutdown"
    )]
    #[arg(value_enum, default_value = "fast")]
    stop_mode: StopMode,
}
360 :
// Arguments of `pageserver restart`.
#[derive(clap::Args)]
#[clap(about = "Restart local pageserver")]
struct PageserverRestartCmdArgs {
    // Exposed as `--id`; optional.
    #[clap(long = "id", help = "pageserver id")]
    pageserver_id: Option<NodeId>,

    // `-t` / `--start-timeout`, humantime syntax, default "10s".
    #[clap(short = 't', long, help = "timeout until we fail the command")]
    #[arg(default_value = "10s")]
    start_timeout: humantime::Duration,
}
371 :
// Subcommands of the `storage-controller` command group; dispatched by
// `handle_storage_controller`.
#[derive(clap::Subcommand)]
#[clap(about = "Manage storage controller")]
enum StorageControllerCmd {
    Start(StorageControllerStartCmdArgs),
    Stop(StorageControllerStopCmdArgs),
}
378 :
379 0 : #[derive(clap::Args)]
380 : #[clap(about = "Start storage controller")]
381 : struct StorageControllerStartCmdArgs {
382 : #[clap(short = 't', long, help = "timeout until we fail the command")]
383 : #[arg(default_value = "10s")]
384 0 : start_timeout: humantime::Duration,
385 :
386 : #[clap(
387 : long,
388 : help = "Identifier used to distinguish storage controller instances"
389 : )]
390 0 : #[arg(default_value_t = 1)]
391 0 : instance_id: u8,
392 :
393 : #[clap(
394 : long,
395 : help = "Base port for the storage controller instance idenfified by instance-id (defaults to pageserver cplane api)"
396 : )]
397 : base_port: Option<u16>,
398 : }
399 :
// Arguments of `storage-controller stop`.
#[derive(clap::Args)]
#[clap(about = "Stop storage controller")]
struct StorageControllerStopCmdArgs {
    // Only the short form `-m` is declared; defaults to "fast".
    #[clap(
        short = 'm',
        help = "If 'immediate', don't flush repository data at shutdown"
    )]
    #[arg(value_enum, default_value = "fast")]
    stop_mode: StopMode,

    // Defaults to 1.
    #[clap(
        long,
        help = "Identifier used to distinguish storage controller instances"
    )]
    #[arg(default_value_t = 1)]
    instance_id: u8,
}
417 :
// Subcommands of the `storage-broker` command group; dispatched by
// `handle_storage_broker`.
#[derive(clap::Subcommand)]
#[clap(about = "Manage storage broker")]
enum StorageBrokerCmd {
    Start(StorageBrokerStartCmdArgs),
    Stop(StorageBrokerStopCmdArgs),
}
424 :
// Arguments of `storage-broker start`.
#[derive(clap::Args)]
#[clap(about = "Start broker")]
struct StorageBrokerStartCmdArgs {
    // `-t` / `--start-timeout`, humantime syntax, default "10s".
    #[clap(short = 't', long, help = "timeout until we fail the command")]
    #[arg(default_value = "10s")]
    start_timeout: humantime::Duration,
}
432 :
// Arguments of `storage-broker stop`.
#[derive(clap::Args)]
#[clap(about = "stop broker")]
struct StorageBrokerStopCmdArgs {
    // Only the short form `-m` is declared; defaults to "fast".
    #[clap(
        short = 'm',
        help = "If 'immediate', don't flush repository data at shutdown"
    )]
    #[arg(value_enum, default_value = "fast")]
    stop_mode: StopMode,
}
443 :
// Subcommands of the `safekeeper` command group; dispatched by `handle_safekeeper`.
#[derive(clap::Subcommand)]
#[clap(about = "Manage safekeepers")]
enum SafekeeperCmd {
    Start(SafekeeperStartCmdArgs),
    Stop(SafekeeperStopCmdArgs),
    Restart(SafekeeperRestartCmdArgs),
}
451 :
// Arguments of `safekeeper start`.
#[derive(clap::Args)]
#[clap(about = "Start local safekeeper")]
struct SafekeeperStartCmdArgs {
    // Positional argument (no `long`/`short`); defaults to `NodeId(1)`.
    #[clap(help = "safekeeper id")]
    #[arg(default_value_t = NodeId(1))]
    id: NodeId,

    // Repeatable; each occurrence adds one entry.
    #[clap(
        short = 'e',
        long = "safekeeper-extra-opt",
        help = "Additional safekeeper invocation options, e.g. -e=--http-auth-public-key-path=foo"
    )]
    extra_opt: Vec<String>,

    // `-t` / `--start-timeout`, humantime syntax, default "10s".
    #[clap(short = 't', long, help = "timeout until we fail the command")]
    #[arg(default_value = "10s")]
    start_timeout: humantime::Duration,
}
470 :
// Arguments of `safekeeper stop`.
#[derive(clap::Args)]
#[clap(about = "Stop local safekeeper")]
struct SafekeeperStopCmdArgs {
    // Positional argument (no `long`/`short`); defaults to `NodeId(1)`.
    #[clap(help = "safekeeper id")]
    #[arg(default_value_t = NodeId(1))]
    id: NodeId,

    // Only the short form `-m` is declared; defaults to "fast".
    #[arg(value_enum, default_value = "fast")]
    #[clap(
        short = 'm',
        help = "If 'immediate', don't flush repository data at shutdown"
    )]
    stop_mode: StopMode,
}
485 :
// Arguments of `safekeeper restart`: the union of the stop and start options.
#[derive(clap::Args)]
#[clap(about = "Restart local safekeeper")]
struct SafekeeperRestartCmdArgs {
    // Positional argument (no `long`/`short`); defaults to `NodeId(1)`.
    #[clap(help = "safekeeper id")]
    #[arg(default_value_t = NodeId(1))]
    id: NodeId,

    // Only the short form `-m` is declared; defaults to "fast".
    #[arg(value_enum, default_value = "fast")]
    #[clap(
        short = 'm',
        help = "If 'immediate', don't flush repository data at shutdown"
    )]
    stop_mode: StopMode,

    // Repeatable; each occurrence adds one entry.
    #[clap(
        short = 'e',
        long = "safekeeper-extra-opt",
        help = "Additional safekeeper invocation options, e.g. -e=--http-auth-public-key-path=foo"
    )]
    extra_opt: Vec<String>,

    // `-t` / `--start-timeout`, humantime syntax, default "10s".
    #[clap(short = 't', long, help = "timeout until we fail the command")]
    #[arg(default_value = "10s")]
    start_timeout: humantime::Duration,
}
511 :
// Subcommands of the `endpoint` command group; dispatched by `handle_endpoint`.
#[derive(clap::Subcommand)]
#[clap(about = "Manage Postgres instances")]
enum EndpointCmd {
    List(EndpointListCmdArgs),
    Create(EndpointCreateCmdArgs),
    Start(EndpointStartCmdArgs),
    Reconfigure(EndpointReconfigureCmdArgs),
    Stop(EndpointStopCmdArgs),
}
521 :
// Arguments of `endpoint list`.
#[derive(clap::Args)]
#[clap(about = "List endpoints")]
struct EndpointListCmdArgs {
    // Although the flag is spelled `--tenant-id`, the value is parsed as a
    // `TenantShardId`.
    #[clap(
        long = "tenant-id",
        help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
    )]
    tenant_shard_id: Option<TenantShardId>,
}
531 :
// Arguments of `endpoint create`.
#[derive(clap::Args)]
#[clap(about = "Create a compute endpoint")]
struct EndpointCreateCmdArgs {
    #[clap(
        long = "tenant-id",
        help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
    )]
    tenant_id: Option<TenantId>,

    // Positional argument (no `long`/`short`); optional.
    #[clap(help = "Postgres endpoint id")]
    endpoint_id: Option<String>,
    #[clap(long, help = "Name of the branch the endpoint will run on")]
    branch_name: Option<String>,
    #[clap(
        long,
        help = "Specify Lsn on the timeline to start from. By default, end of the timeline would be used"
    )]
    lsn: Option<Lsn>,
    #[clap(long)]
    pg_port: Option<u16>,
    #[clap(long)]
    http_port: Option<u16>,
    // Exposed as `--pageserver-id`.
    #[clap(long = "pageserver-id")]
    endpoint_pageserver_id: Option<NodeId>,

    // `ArgAction::Set`: the option takes an explicit boolean value instead of
    // acting as a bare switch; defaults to false.
    #[clap(
        long,
        help = "Don't do basebackup, create endpoint directory with only config files",
        action = clap::ArgAction::Set,
        default_value_t = false
    )]
    config_only: bool,

    // Defaults to `DEFAULT_PG_VERSION`.
    #[arg(default_value_t = DEFAULT_PG_VERSION)]
    #[clap(long, help = "Postgres version")]
    pg_version: u32,

    // `ArgAction::Set`: the option takes an explicit boolean value instead of
    // acting as a bare switch; defaults to false.
    #[clap(
        long,
        help = "If set, the node will be a hot replica on the specified timeline",
        action = clap::ArgAction::Set,
        default_value_t = false
    )]
    hot_standby: bool,

    #[clap(long, help = "If set, will set up the catalog for neon_superuser")]
    update_catalog: bool,

    #[clap(
        long,
        help = "Allow multiple primary endpoints running on the same branch. Shouldn't be used normally, but useful for tests."
    )]
    allow_multiple: bool,
}
586 :
// Arguments of `endpoint start`.
#[derive(clap::Args)]
#[clap(about = "Start postgres. If the endpoint doesn't exist yet, it is created.")]
struct EndpointStartCmdArgs {
    // Positional argument (no `long`/`short`); mandatory.
    #[clap(help = "Postgres endpoint id")]
    endpoint_id: String,
    // Exposed as `--pageserver-id`.
    #[clap(long = "pageserver-id")]
    endpoint_pageserver_id: Option<NodeId>,

    // Taken as a raw string here; its format is interpreted by the handler.
    #[clap(long)]
    safekeepers: Option<String>,

    #[clap(
        long,
        help = "Configure the remote extensions storage proxy gateway to request for extensions."
    )]
    remote_ext_config: Option<String>,

    #[clap(
        long,
        help = "If set, will create test user `user` and `neondb` database. Requires `update-catalog = true`"
    )]
    create_test_user: bool,

    #[clap(
        long,
        help = "Allow multiple primary endpoints running on the same branch. Shouldn't be used normally, but useful for tests."
    )]
    allow_multiple: bool,

    // `-t` / `--start-timeout`, humantime syntax, default "10s".
    #[clap(short = 't', long, help = "timeout until we fail the command")]
    #[arg(default_value = "10s")]
    start_timeout: humantime::Duration,
}
620 :
// Arguments of `endpoint reconfigure`.
#[derive(clap::Args)]
#[clap(about = "Reconfigure an endpoint")]
struct EndpointReconfigureCmdArgs {
    #[clap(
        long = "tenant-id",
        help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
    )]
    tenant_id: Option<TenantId>,

    // Positional argument (no `long`/`short`); mandatory.
    #[clap(help = "Postgres endpoint id")]
    endpoint_id: String,
    // Exposed as `--pageserver-id`.
    #[clap(long = "pageserver-id")]
    endpoint_pageserver_id: Option<NodeId>,

    // Taken as a raw string here; its format is interpreted by the handler.
    #[clap(long)]
    safekeepers: Option<String>,
}
638 :
// Arguments of `endpoint stop`.
#[derive(clap::Args)]
#[clap(about = "Stop an endpoint")]
struct EndpointStopCmdArgs {
    // Positional argument (no `long`/`short`); mandatory.
    #[clap(help = "Postgres endpoint id")]
    endpoint_id: String,

    #[clap(
        long,
        help = "Also delete data directory (now optional, should be default in future)"
    )]
    destroy: bool,

    // Kept as a string and restricted to the three listed values, unlike the
    // other `stop` commands, which use `StopMode`. Defaults to "fast".
    #[clap(long, help = "Postgres shutdown mode, passed to \"pg_ctl -m <mode>\"")]
    #[arg(value_parser(["smart", "fast", "immediate"]))]
    #[arg(default_value = "fast")]
    mode: String,
}
656 :
// Subcommands of the `mappings` command group; dispatched by `handle_mappings`.
#[derive(clap::Subcommand)]
#[clap(about = "Manage neon_local branch name mappings")]
enum MappingsCmd {
    Map(MappingsMapCmdArgs),
}
662 :
// Arguments of `mappings map`; all three options are mandatory.
#[derive(clap::Args)]
#[clap(about = "Create new mapping which cannot exist already")]
struct MappingsMapCmdArgs {
    #[clap(
        long,
        help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
    )]
    tenant_id: TenantId,
    #[clap(
        long,
        help = "Timeline id. Represented as a hexadecimal string 32 symbols length"
    )]
    timeline_id: TimelineId,
    #[clap(long, help = "Branch name to give to the timeline")]
    branch_name: String,
}
679 :
///
/// Timelines tree element used as a value in the HashMap.
///
/// One element is built per timeline by `print_timelines_tree`, keyed by the
/// timeline's id, and then walked recursively by `print_timeline`.
///
struct TimelineTreeEl {
    /// `TimelineInfo` received from the `pageserver` via the `timeline_list` http API call.
    pub info: TimelineInfo,
    /// Name, recovered from neon config mappings. `None` when no mapping exists for
    /// this timeline, in which case it is printed as `_no_name_`.
    pub name: Option<String>,
    /// Holds all direct children of this timeline referenced using `timeline_id`.
    /// A `BTreeSet` keeps the children in sorted `TimelineId` order.
    pub children: BTreeSet<TimelineId>,
}
691 :
692 : // Main entry point for the 'neon_local' CLI utility
693 : //
694 : // This utility helps to manage neon installation. That includes following:
695 : // * Management of local postgres installations running on top of the
696 : // pageserver.
697 : // * Providing CLI api to the pageserver
698 : // * TODO: export/import to/from usual postgres
699 0 : fn main() -> Result<()> {
700 0 : let cli = Cli::parse();
701 :
702 : // Check for 'neon init' command first.
703 0 : let subcommand_result = if let NeonLocalCmd::Init(args) = cli.command {
704 0 : handle_init(&args).map(|env| Some(Cow::Owned(env)))
705 : } else {
706 : // all other commands need an existing config
707 0 : let env = LocalEnv::load_config(&local_env::base_path()).context("Error loading config")?;
708 0 : let original_env = env.clone();
709 0 : let env = Box::leak(Box::new(env));
710 0 : let rt = tokio::runtime::Builder::new_current_thread()
711 0 : .enable_all()
712 0 : .build()
713 0 : .unwrap();
714 :
715 0 : let subcommand_result = match cli.command {
716 0 : NeonLocalCmd::Init(_) => unreachable!("init was handled earlier already"),
717 0 : NeonLocalCmd::Start(args) => rt.block_on(handle_start_all(&args, env)),
718 0 : NeonLocalCmd::Stop(args) => rt.block_on(handle_stop_all(&args, env)),
719 0 : NeonLocalCmd::Tenant(subcmd) => rt.block_on(handle_tenant(&subcmd, env)),
720 0 : NeonLocalCmd::Timeline(subcmd) => rt.block_on(handle_timeline(&subcmd, env)),
721 0 : NeonLocalCmd::Pageserver(subcmd) => rt.block_on(handle_pageserver(&subcmd, env)),
722 0 : NeonLocalCmd::StorageController(subcmd) => {
723 0 : rt.block_on(handle_storage_controller(&subcmd, env))
724 : }
725 0 : NeonLocalCmd::StorageBroker(subcmd) => rt.block_on(handle_storage_broker(&subcmd, env)),
726 0 : NeonLocalCmd::Safekeeper(subcmd) => rt.block_on(handle_safekeeper(&subcmd, env)),
727 0 : NeonLocalCmd::Endpoint(subcmd) => rt.block_on(handle_endpoint(&subcmd, env)),
728 0 : NeonLocalCmd::Mappings(subcmd) => handle_mappings(&subcmd, env),
729 : };
730 :
731 0 : if &original_env != env {
732 0 : subcommand_result.map(|()| Some(Cow::Borrowed(env)))
733 : } else {
734 0 : subcommand_result.map(|()| None)
735 : }
736 : };
737 :
738 0 : match subcommand_result {
739 0 : Ok(Some(updated_env)) => updated_env.persist_config()?,
740 0 : Ok(None) => (),
741 0 : Err(e) => {
742 0 : eprintln!("command failed: {e:?}");
743 0 : exit(1);
744 : }
745 : }
746 0 : Ok(())
747 0 : }
748 :
749 : ///
750 : /// Prints timelines list as a tree-like structure.
751 : ///
752 0 : fn print_timelines_tree(
753 0 : timelines: Vec<TimelineInfo>,
754 0 : mut timeline_name_mappings: HashMap<TenantTimelineId, String>,
755 0 : ) -> Result<()> {
756 0 : let mut timelines_hash = timelines
757 0 : .iter()
758 0 : .map(|t| {
759 0 : (
760 0 : t.timeline_id,
761 0 : TimelineTreeEl {
762 0 : info: t.clone(),
763 0 : children: BTreeSet::new(),
764 0 : name: timeline_name_mappings
765 0 : .remove(&TenantTimelineId::new(t.tenant_id.tenant_id, t.timeline_id)),
766 0 : },
767 0 : )
768 0 : })
769 0 : .collect::<HashMap<_, _>>();
770 :
771 : // Memorize all direct children of each timeline.
772 0 : for timeline in timelines.iter() {
773 0 : if let Some(ancestor_timeline_id) = timeline.ancestor_timeline_id {
774 0 : timelines_hash
775 0 : .get_mut(&ancestor_timeline_id)
776 0 : .context("missing timeline info in the HashMap")?
777 : .children
778 0 : .insert(timeline.timeline_id);
779 0 : }
780 : }
781 :
782 0 : for timeline in timelines_hash.values() {
783 : // Start with root local timelines (no ancestors) first.
784 0 : if timeline.info.ancestor_timeline_id.is_none() {
785 0 : print_timeline(0, &Vec::from([true]), timeline, &timelines_hash)?;
786 0 : }
787 : }
788 :
789 0 : Ok(())
790 0 : }
791 :
792 : ///
793 : /// Recursively prints timeline info with all its children.
794 : ///
795 0 : fn print_timeline(
796 0 : nesting_level: usize,
797 0 : is_last: &[bool],
798 0 : timeline: &TimelineTreeEl,
799 0 : timelines: &HashMap<TimelineId, TimelineTreeEl>,
800 0 : ) -> Result<()> {
801 0 : if nesting_level > 0 {
802 0 : let ancestor_lsn = match timeline.info.ancestor_lsn {
803 0 : Some(lsn) => lsn.to_string(),
804 0 : None => "Unknown Lsn".to_string(),
805 : };
806 :
807 0 : let mut br_sym = "┣━";
808 0 :
809 0 : // Draw each nesting padding with proper style
810 0 : // depending on whether its timeline ended or not.
811 0 : if nesting_level > 1 {
812 0 : for l in &is_last[1..is_last.len() - 1] {
813 0 : if *l {
814 0 : print!(" ");
815 0 : } else {
816 0 : print!("┃ ");
817 0 : }
818 : }
819 0 : }
820 :
821 : // We are the last in this sub-timeline
822 0 : if *is_last.last().unwrap() {
823 0 : br_sym = "┗━";
824 0 : }
825 :
826 0 : print!("{} @{}: ", br_sym, ancestor_lsn);
827 0 : }
828 :
829 : // Finally print a timeline id and name with new line
830 0 : println!(
831 0 : "{} [{}]",
832 0 : timeline.name.as_deref().unwrap_or("_no_name_"),
833 0 : timeline.info.timeline_id
834 0 : );
835 0 :
836 0 : let len = timeline.children.len();
837 0 : let mut i: usize = 0;
838 0 : let mut is_last_new = Vec::from(is_last);
839 0 : is_last_new.push(false);
840 :
841 0 : for child in &timeline.children {
842 0 : i += 1;
843 0 :
844 0 : // Mark that the last padding is the end of the timeline
845 0 : if i == len {
846 0 : if let Some(last) = is_last_new.last_mut() {
847 0 : *last = true;
848 0 : }
849 0 : }
850 :
851 : print_timeline(
852 0 : nesting_level + 1,
853 0 : &is_last_new,
854 0 : timelines
855 0 : .get(child)
856 0 : .context("missing timeline info in the HashMap")?,
857 0 : timelines,
858 0 : )?;
859 : }
860 :
861 0 : Ok(())
862 0 : }
863 :
864 : /// Returns a map of timeline IDs to timeline_id@lsn strings.
865 : /// Connects to the pageserver to query this information.
866 0 : async fn get_timeline_infos(
867 0 : env: &local_env::LocalEnv,
868 0 : tenant_shard_id: &TenantShardId,
869 0 : ) -> Result<HashMap<TimelineId, TimelineInfo>> {
870 0 : Ok(get_default_pageserver(env)
871 0 : .timeline_list(tenant_shard_id)
872 0 : .await?
873 0 : .into_iter()
874 0 : .map(|timeline_info| (timeline_info.timeline_id, timeline_info))
875 0 : .collect())
876 0 : }
877 :
878 : /// Helper function to get tenant id from an optional --tenant_id option or from the config file
879 0 : fn get_tenant_id(
880 0 : tenant_id_arg: Option<TenantId>,
881 0 : env: &local_env::LocalEnv,
882 0 : ) -> anyhow::Result<TenantId> {
883 0 : if let Some(tenant_id_from_arguments) = tenant_id_arg {
884 0 : Ok(tenant_id_from_arguments)
885 0 : } else if let Some(default_id) = env.default_tenant_id {
886 0 : Ok(default_id)
887 : } else {
888 0 : anyhow::bail!("No tenant id. Use --tenant-id, or set a default tenant");
889 : }
890 0 : }
891 :
892 : /// Helper function to get tenant-shard ID from an optional --tenant_id option or from the config file,
893 : /// for commands that accept a shard suffix
894 0 : fn get_tenant_shard_id(
895 0 : tenant_shard_id_arg: Option<TenantShardId>,
896 0 : env: &local_env::LocalEnv,
897 0 : ) -> anyhow::Result<TenantShardId> {
898 0 : if let Some(tenant_id_from_arguments) = tenant_shard_id_arg {
899 0 : Ok(tenant_id_from_arguments)
900 0 : } else if let Some(default_id) = env.default_tenant_id {
901 0 : Ok(TenantShardId::unsharded(default_id))
902 : } else {
903 0 : anyhow::bail!("No tenant shard id. Use --tenant-id, or set a default tenant");
904 : }
905 0 : }
906 :
907 0 : fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
908 : // Create the in-memory `LocalEnv` that we'd normally load from disk in `load_config`.
909 0 : let init_conf: NeonLocalInitConf = if let Some(config_path) = &args.config {
910 : // User (likely the Python test suite) provided a description of the environment.
911 0 : if args.num_pageservers.is_some() {
912 0 : bail!("Cannot specify both --num-pageservers and --config, use key `pageservers` in the --config file instead");
913 0 : }
914 : // load and parse the file
915 0 : let contents = std::fs::read_to_string(config_path).with_context(|| {
916 0 : format!(
917 0 : "Could not read configuration file '{}'",
918 0 : config_path.display()
919 0 : )
920 0 : })?;
921 0 : toml_edit::de::from_str(&contents)?
922 : } else {
923 : // User (likely interactive) did not provide a description of the environment, give them the default
924 0 : NeonLocalInitConf {
925 0 : control_plane_api: Some(Some(DEFAULT_PAGESERVER_CONTROL_PLANE_API.parse().unwrap())),
926 0 : broker: NeonBroker {
927 0 : listen_addr: DEFAULT_BROKER_ADDR.parse().unwrap(),
928 0 : },
929 0 : safekeepers: vec![SafekeeperConf {
930 0 : id: DEFAULT_SAFEKEEPER_ID,
931 0 : pg_port: DEFAULT_SAFEKEEPER_PG_PORT,
932 0 : http_port: DEFAULT_SAFEKEEPER_HTTP_PORT,
933 0 : ..Default::default()
934 0 : }],
935 0 : pageservers: (0..args.num_pageservers.unwrap_or(1))
936 0 : .map(|i| {
937 0 : let pageserver_id = NodeId(DEFAULT_PAGESERVER_ID.0 + i as u64);
938 0 : let pg_port = DEFAULT_PAGESERVER_PG_PORT + i;
939 0 : let http_port = DEFAULT_PAGESERVER_HTTP_PORT + i;
940 0 : NeonLocalInitPageserverConf {
941 0 : id: pageserver_id,
942 0 : listen_pg_addr: format!("127.0.0.1:{pg_port}"),
943 0 : listen_http_addr: format!("127.0.0.1:{http_port}"),
944 0 : pg_auth_type: AuthType::Trust,
945 0 : http_auth_type: AuthType::Trust,
946 0 : other: Default::default(),
947 0 : // Typical developer machines use disks with slow fsync, and we don't care
948 0 : // about data integrity: disable disk syncs.
949 0 : no_sync: true,
950 0 : }
951 0 : })
952 0 : .collect(),
953 0 : pg_distrib_dir: None,
954 0 : neon_distrib_dir: None,
955 0 : default_tenant_id: TenantId::from_array(std::array::from_fn(|_| 0)),
956 0 : storage_controller: None,
957 0 : control_plane_compute_hook_api: None,
958 0 : }
959 : };
960 :
961 0 : LocalEnv::init(init_conf, &args.force)
962 0 : .context("materialize initial neon_local environment on disk")?;
963 0 : Ok(LocalEnv::load_config(&local_env::base_path())
964 0 : .expect("freshly written config should be loadable"))
965 0 : }
966 :
967 : /// The default pageserver is the one where CLI tenant/timeline operations are sent by default.
968 : /// For typical interactive use, one would just run with a single pageserver. Scenarios with
969 : /// tenant/timeline placement across multiple pageservers are managed by python test code rather
970 : /// than this CLI.
971 0 : fn get_default_pageserver(env: &local_env::LocalEnv) -> PageServerNode {
972 0 : let ps_conf = env
973 0 : .pageservers
974 0 : .first()
975 0 : .expect("Config is validated to contain at least one pageserver");
976 0 : PageServerNode::from_env(env, ps_conf)
977 0 : }
978 :
979 0 : async fn handle_tenant(subcmd: &TenantCmd, env: &mut local_env::LocalEnv) -> anyhow::Result<()> {
980 0 : let pageserver = get_default_pageserver(env);
981 0 : match subcmd {
982 : TenantCmd::List => {
983 0 : for t in pageserver.tenant_list().await? {
984 0 : println!("{} {:?}", t.id, t.state);
985 0 : }
986 : }
987 0 : TenantCmd::Import(args) => {
988 0 : let tenant_id = args.tenant_id;
989 0 :
990 0 : let storage_controller = StorageController::from_env(env);
991 0 : let create_response = storage_controller.tenant_import(tenant_id).await?;
992 :
993 0 : let shard_zero = create_response
994 0 : .shards
995 0 : .first()
996 0 : .expect("Import response omitted shards");
997 0 :
998 0 : let attached_pageserver_id = shard_zero.node_id;
999 0 : let pageserver =
1000 0 : PageServerNode::from_env(env, env.get_pageserver_conf(attached_pageserver_id)?);
1001 :
1002 0 : println!(
1003 0 : "Imported tenant {tenant_id}, attached to pageserver {attached_pageserver_id}"
1004 0 : );
1005 :
1006 0 : let timelines = pageserver
1007 0 : .http_client
1008 0 : .list_timelines(shard_zero.shard_id)
1009 0 : .await?;
1010 :
1011 : // Pick a 'main' timeline that has no ancestors, the rest will get arbitrary names
1012 0 : let main_timeline = timelines
1013 0 : .iter()
1014 0 : .find(|t| t.ancestor_timeline_id.is_none())
1015 0 : .expect("No timelines found")
1016 0 : .timeline_id;
1017 0 :
1018 0 : let mut branch_i = 0;
1019 0 : for timeline in timelines.iter() {
1020 0 : let branch_name = if timeline.timeline_id == main_timeline {
1021 0 : "main".to_string()
1022 : } else {
1023 0 : branch_i += 1;
1024 0 : format!("branch_{branch_i}")
1025 : };
1026 :
1027 0 : println!(
1028 0 : "Importing timeline {tenant_id}/{} as branch {branch_name}",
1029 0 : timeline.timeline_id
1030 0 : );
1031 0 :
1032 0 : env.register_branch_mapping(branch_name, tenant_id, timeline.timeline_id)?;
1033 : }
1034 : }
1035 0 : TenantCmd::Create(args) => {
1036 0 : let tenant_conf: HashMap<_, _> =
1037 0 : args.config.iter().flat_map(|c| c.split_once(':')).collect();
1038 :
1039 0 : let tenant_conf = PageServerNode::parse_config(tenant_conf)?;
1040 :
1041 : // If tenant ID was not specified, generate one
1042 0 : let tenant_id = args.tenant_id.unwrap_or_else(TenantId::generate);
1043 0 :
1044 0 : // We must register the tenant with the storage controller, so
1045 0 : // that when the pageserver restarts, it will be re-attached.
1046 0 : let storage_controller = StorageController::from_env(env);
1047 0 : storage_controller
1048 0 : .tenant_create(TenantCreateRequest {
1049 0 : // Note that ::unsharded here isn't actually because the tenant is unsharded, its because the
1050 0 : // storage controller expects a shard-naive tenant_id in this attribute, and the TenantCreateRequest
1051 0 : // type is used both in the storage controller (for creating tenants) and in the pageserver (for
1052 0 : // creating shards)
1053 0 : new_tenant_id: TenantShardId::unsharded(tenant_id),
1054 0 : generation: None,
1055 0 : shard_parameters: ShardParameters {
1056 0 : count: ShardCount::new(args.shard_count),
1057 0 : stripe_size: args
1058 0 : .shard_stripe_size
1059 0 : .map(ShardStripeSize)
1060 0 : .unwrap_or(ShardParameters::DEFAULT_STRIPE_SIZE),
1061 0 : },
1062 0 : placement_policy: args.placement_policy.clone(),
1063 0 : config: tenant_conf,
1064 0 : })
1065 0 : .await?;
1066 0 : println!("tenant {tenant_id} successfully created on the pageserver");
1067 0 :
1068 0 : // Create an initial timeline for the new tenant
1069 0 : let new_timeline_id = args.timeline_id.unwrap_or(TimelineId::generate());
1070 0 :
1071 0 : // FIXME: passing None for ancestor_start_lsn is not kosher in a sharded world: we can't have
1072 0 : // different shards picking different start lsns. Maybe we have to teach storage controller
1073 0 : // to let shard 0 branch first and then propagate the chosen LSN to other shards.
1074 0 : storage_controller
1075 0 : .tenant_timeline_create(
1076 0 : tenant_id,
1077 0 : TimelineCreateRequest {
1078 0 : new_timeline_id,
1079 0 : mode: pageserver_api::models::TimelineCreateRequestMode::Bootstrap {
1080 0 : existing_initdb_timeline_id: None,
1081 0 : pg_version: Some(args.pg_version),
1082 0 : },
1083 0 : },
1084 0 : )
1085 0 : .await?;
1086 :
1087 0 : env.register_branch_mapping(
1088 0 : DEFAULT_BRANCH_NAME.to_string(),
1089 0 : tenant_id,
1090 0 : new_timeline_id,
1091 0 : )?;
1092 :
1093 0 : println!("Created an initial timeline '{new_timeline_id}' for tenant: {tenant_id}",);
1094 0 :
1095 0 : if args.set_default {
1096 0 : println!("Setting tenant {tenant_id} as a default one");
1097 0 : env.default_tenant_id = Some(tenant_id);
1098 0 : }
1099 : }
1100 0 : TenantCmd::SetDefault(args) => {
1101 0 : println!("Setting tenant {} as a default one", args.tenant_id);
1102 0 : env.default_tenant_id = Some(args.tenant_id);
1103 0 : }
1104 0 : TenantCmd::Config(args) => {
1105 0 : let tenant_id = get_tenant_id(args.tenant_id, env)?;
1106 0 : let tenant_conf: HashMap<_, _> =
1107 0 : args.config.iter().flat_map(|c| c.split_once(':')).collect();
1108 0 :
1109 0 : pageserver
1110 0 : .tenant_config(tenant_id, tenant_conf)
1111 0 : .await
1112 0 : .with_context(|| format!("Tenant config failed for tenant with id {tenant_id}"))?;
1113 0 : println!("tenant {tenant_id} successfully configured on the pageserver");
1114 : }
1115 : }
1116 0 : Ok(())
1117 0 : }
1118 :
1119 0 : async fn handle_timeline(cmd: &TimelineCmd, env: &mut local_env::LocalEnv) -> Result<()> {
1120 0 : let pageserver = get_default_pageserver(env);
1121 0 :
1122 0 : match cmd {
1123 0 : TimelineCmd::List(args) => {
1124 : // TODO(sharding): this command shouldn't have to specify a shard ID: we should ask the storage controller
1125 : // where shard 0 is attached, and query there.
1126 0 : let tenant_shard_id = get_tenant_shard_id(args.tenant_shard_id, env)?;
1127 0 : let timelines = pageserver.timeline_list(&tenant_shard_id).await?;
1128 0 : print_timelines_tree(timelines, env.timeline_name_mappings())?;
1129 : }
1130 0 : TimelineCmd::Create(args) => {
1131 0 : let tenant_id = get_tenant_id(args.tenant_id, env)?;
1132 0 : let new_branch_name = &args.branch_name;
1133 0 : let new_timeline_id_opt = args.timeline_id;
1134 0 : let new_timeline_id = new_timeline_id_opt.unwrap_or(TimelineId::generate());
1135 0 :
1136 0 : let storage_controller = StorageController::from_env(env);
1137 0 : let create_req = TimelineCreateRequest {
1138 0 : new_timeline_id,
1139 0 : mode: pageserver_api::models::TimelineCreateRequestMode::Bootstrap {
1140 0 : existing_initdb_timeline_id: None,
1141 0 : pg_version: Some(args.pg_version),
1142 0 : },
1143 0 : };
1144 0 : let timeline_info = storage_controller
1145 0 : .tenant_timeline_create(tenant_id, create_req)
1146 0 : .await?;
1147 :
1148 0 : let last_record_lsn = timeline_info.last_record_lsn;
1149 0 : env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?;
1150 :
1151 0 : println!(
1152 0 : "Created timeline '{}' at Lsn {last_record_lsn} for tenant: {tenant_id}",
1153 0 : timeline_info.timeline_id
1154 0 : );
1155 : }
1156 0 : TimelineCmd::Import(args) => {
1157 0 : let tenant_id = get_tenant_id(args.tenant_id, env)?;
1158 0 : let timeline_id = args.timeline_id;
1159 0 : let branch_name = &args.branch_name;
1160 0 :
1161 0 : // Parse base inputs
1162 0 : let base = (args.base_lsn, args.base_tarfile.clone());
1163 0 :
1164 0 : // Parse pg_wal inputs
1165 0 : let wal_tarfile = args.wal_tarfile.clone();
1166 0 : let end_lsn = args.end_lsn;
1167 0 : // TODO validate both or none are provided
1168 0 : let pg_wal = end_lsn.zip(wal_tarfile);
1169 0 :
1170 0 : println!("Importing timeline into pageserver ...");
1171 0 : pageserver
1172 0 : .timeline_import(tenant_id, timeline_id, base, pg_wal, args.pg_version)
1173 0 : .await?;
1174 0 : env.register_branch_mapping(branch_name.to_string(), tenant_id, timeline_id)?;
1175 0 : println!("Done");
1176 : }
1177 0 : TimelineCmd::Branch(args) => {
1178 0 : let tenant_id = get_tenant_id(args.tenant_id, env)?;
1179 0 : let new_timeline_id = args.timeline_id.unwrap_or(TimelineId::generate());
1180 0 : let new_branch_name = &args.branch_name;
1181 0 : let ancestor_branch_name = args
1182 0 : .ancestor_branch_name
1183 0 : .clone()
1184 0 : .unwrap_or(DEFAULT_BRANCH_NAME.to_owned());
1185 0 : let ancestor_timeline_id = env
1186 0 : .get_branch_timeline_id(&ancestor_branch_name, tenant_id)
1187 0 : .ok_or_else(|| {
1188 0 : anyhow!("Found no timeline id for branch name '{ancestor_branch_name}'")
1189 0 : })?;
1190 :
1191 0 : let start_lsn = args.ancestor_start_lsn;
1192 0 : let storage_controller = StorageController::from_env(env);
1193 0 : let create_req = TimelineCreateRequest {
1194 0 : new_timeline_id,
1195 0 : mode: pageserver_api::models::TimelineCreateRequestMode::Branch {
1196 0 : ancestor_timeline_id,
1197 0 : ancestor_start_lsn: start_lsn,
1198 0 : pg_version: None,
1199 0 : },
1200 0 : };
1201 0 : let timeline_info = storage_controller
1202 0 : .tenant_timeline_create(tenant_id, create_req)
1203 0 : .await?;
1204 :
1205 0 : let last_record_lsn = timeline_info.last_record_lsn;
1206 0 :
1207 0 : env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?;
1208 :
1209 0 : println!(
1210 0 : "Created timeline '{}' at Lsn {last_record_lsn} for tenant: {tenant_id}. Ancestor timeline: '{ancestor_branch_name}'",
1211 0 : timeline_info.timeline_id
1212 0 : );
1213 : }
1214 : }
1215 :
1216 0 : Ok(())
1217 0 : }
1218 :
1219 0 : async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Result<()> {
1220 0 : let mut cplane = ComputeControlPlane::load(env.clone())?;
1221 :
1222 0 : match subcmd {
1223 0 : EndpointCmd::List(args) => {
1224 : // TODO(sharding): this command shouldn't have to specify a shard ID: we should ask the storage controller
1225 : // where shard 0 is attached, and query there.
1226 0 : let tenant_shard_id = get_tenant_shard_id(args.tenant_shard_id, env)?;
1227 0 : let timeline_infos = get_timeline_infos(env, &tenant_shard_id)
1228 0 : .await
1229 0 : .unwrap_or_else(|e| {
1230 0 : eprintln!("Failed to load timeline info: {}", e);
1231 0 : HashMap::new()
1232 0 : });
1233 0 :
1234 0 : let timeline_name_mappings = env.timeline_name_mappings();
1235 0 :
1236 0 : let mut table = comfy_table::Table::new();
1237 0 :
1238 0 : table.load_preset(comfy_table::presets::NOTHING);
1239 0 :
1240 0 : table.set_header([
1241 0 : "ENDPOINT",
1242 0 : "ADDRESS",
1243 0 : "TIMELINE",
1244 0 : "BRANCH NAME",
1245 0 : "LSN",
1246 0 : "STATUS",
1247 0 : ]);
1248 :
1249 0 : for (endpoint_id, endpoint) in cplane
1250 0 : .endpoints
1251 0 : .iter()
1252 0 : .filter(|(_, endpoint)| endpoint.tenant_id == tenant_shard_id.tenant_id)
1253 : {
1254 0 : let lsn_str = match endpoint.mode {
1255 0 : ComputeMode::Static(lsn) => {
1256 0 : // -> read-only endpoint
1257 0 : // Use the node's LSN.
1258 0 : lsn.to_string()
1259 : }
1260 : _ => {
1261 : // -> primary endpoint or hot replica
1262 : // Use the LSN at the end of the timeline.
1263 0 : timeline_infos
1264 0 : .get(&endpoint.timeline_id)
1265 0 : .map(|bi| bi.last_record_lsn.to_string())
1266 0 : .unwrap_or_else(|| "?".to_string())
1267 : }
1268 : };
1269 :
1270 0 : let branch_name = timeline_name_mappings
1271 0 : .get(&TenantTimelineId::new(
1272 0 : tenant_shard_id.tenant_id,
1273 0 : endpoint.timeline_id,
1274 0 : ))
1275 0 : .map(|name| name.as_str())
1276 0 : .unwrap_or("?");
1277 0 :
1278 0 : table.add_row([
1279 0 : endpoint_id.as_str(),
1280 0 : &endpoint.pg_address.to_string(),
1281 0 : &endpoint.timeline_id.to_string(),
1282 0 : branch_name,
1283 0 : lsn_str.as_str(),
1284 0 : &format!("{}", endpoint.status()),
1285 0 : ]);
1286 0 : }
1287 :
1288 0 : println!("{table}");
1289 : }
1290 0 : EndpointCmd::Create(args) => {
1291 0 : let tenant_id = get_tenant_id(args.tenant_id, env)?;
1292 0 : let branch_name = args
1293 0 : .branch_name
1294 0 : .clone()
1295 0 : .unwrap_or(DEFAULT_BRANCH_NAME.to_owned());
1296 0 : let endpoint_id = args
1297 0 : .endpoint_id
1298 0 : .clone()
1299 0 : .unwrap_or_else(|| format!("ep-{branch_name}"));
1300 :
1301 0 : let timeline_id = env
1302 0 : .get_branch_timeline_id(&branch_name, tenant_id)
1303 0 : .ok_or_else(|| anyhow!("Found no timeline id for branch name '{branch_name}'"))?;
1304 :
1305 0 : let mode = match (args.lsn, args.hot_standby) {
1306 0 : (Some(lsn), false) => ComputeMode::Static(lsn),
1307 0 : (None, true) => ComputeMode::Replica,
1308 0 : (None, false) => ComputeMode::Primary,
1309 0 : (Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"),
1310 : };
1311 :
1312 0 : match (mode, args.hot_standby) {
1313 : (ComputeMode::Static(_), true) => {
1314 0 : bail!("Cannot start a node in hot standby mode when it is already configured as a static replica")
1315 : }
1316 : (ComputeMode::Primary, true) => {
1317 0 : bail!("Cannot start a node as a hot standby replica, it is already configured as primary node")
1318 : }
1319 0 : _ => {}
1320 0 : }
1321 0 :
1322 0 : if !args.allow_multiple {
1323 0 : cplane.check_conflicting_endpoints(mode, tenant_id, timeline_id)?;
1324 0 : }
1325 :
1326 0 : cplane.new_endpoint(
1327 0 : &endpoint_id,
1328 0 : tenant_id,
1329 0 : timeline_id,
1330 0 : args.pg_port,
1331 0 : args.http_port,
1332 0 : args.pg_version,
1333 0 : mode,
1334 0 : !args.update_catalog,
1335 0 : )?;
1336 : }
1337 0 : EndpointCmd::Start(args) => {
1338 0 : let endpoint_id = &args.endpoint_id;
1339 0 : let pageserver_id = args.endpoint_pageserver_id;
1340 0 : let remote_ext_config = &args.remote_ext_config;
1341 :
1342 : // If --safekeepers argument is given, use only the listed
1343 : // safekeeper nodes; otherwise all from the env.
1344 0 : let safekeepers = if let Some(safekeepers) = parse_safekeepers(&args.safekeepers)? {
1345 0 : safekeepers
1346 : } else {
1347 0 : env.safekeepers.iter().map(|sk| sk.id).collect()
1348 : };
1349 :
1350 0 : let endpoint = cplane
1351 0 : .endpoints
1352 0 : .get(endpoint_id.as_str())
1353 0 : .ok_or_else(|| anyhow::anyhow!("endpoint {endpoint_id} not found"))?;
1354 :
1355 0 : if !args.allow_multiple {
1356 0 : cplane.check_conflicting_endpoints(
1357 0 : endpoint.mode,
1358 0 : endpoint.tenant_id,
1359 0 : endpoint.timeline_id,
1360 0 : )?;
1361 0 : }
1362 :
1363 0 : let (pageservers, stripe_size) = if let Some(pageserver_id) = pageserver_id {
1364 0 : let conf = env.get_pageserver_conf(pageserver_id).unwrap();
1365 0 : let parsed = parse_host_port(&conf.listen_pg_addr).expect("Bad config");
1366 0 : (
1367 0 : vec![(parsed.0, parsed.1.unwrap_or(5432))],
1368 0 : // If caller is telling us what pageserver to use, this is not a tenant which is
1369 0 : // full managed by storage controller, therefore not sharded.
1370 0 : ShardParameters::DEFAULT_STRIPE_SIZE,
1371 0 : )
1372 : } else {
1373 : // Look up the currently attached location of the tenant, and its striping metadata,
1374 : // to pass these on to postgres.
1375 0 : let storage_controller = StorageController::from_env(env);
1376 0 : let locate_result = storage_controller.tenant_locate(endpoint.tenant_id).await?;
1377 0 : let pageservers = futures::future::try_join_all(
1378 0 : locate_result.shards.into_iter().map(|shard| async move {
1379 0 : if let ComputeMode::Static(lsn) = endpoint.mode {
1380 : // Initialize LSN leases for static computes.
1381 0 : let conf = env.get_pageserver_conf(shard.node_id).unwrap();
1382 0 : let pageserver = PageServerNode::from_env(env, conf);
1383 0 :
1384 0 : pageserver
1385 0 : .http_client
1386 0 : .timeline_init_lsn_lease(shard.shard_id, endpoint.timeline_id, lsn)
1387 0 : .await?;
1388 0 : }
1389 :
1390 0 : anyhow::Ok((
1391 0 : Host::parse(&shard.listen_pg_addr)
1392 0 : .expect("Storage controller reported bad hostname"),
1393 0 : shard.listen_pg_port,
1394 0 : ))
1395 0 : }),
1396 0 : )
1397 0 : .await?;
1398 0 : let stripe_size = locate_result.shard_params.stripe_size;
1399 0 :
1400 0 : (pageservers, stripe_size)
1401 : };
1402 0 : assert!(!pageservers.is_empty());
1403 :
1404 0 : let ps_conf = env.get_pageserver_conf(DEFAULT_PAGESERVER_ID)?;
1405 0 : let auth_token = if matches!(ps_conf.pg_auth_type, AuthType::NeonJWT) {
1406 0 : let claims = Claims::new(Some(endpoint.tenant_id), Scope::Tenant);
1407 0 :
1408 0 : Some(env.generate_auth_token(&claims)?)
1409 : } else {
1410 0 : None
1411 : };
1412 :
1413 0 : println!("Starting existing endpoint {endpoint_id}...");
1414 0 : endpoint
1415 0 : .start(
1416 0 : &auth_token,
1417 0 : safekeepers,
1418 0 : pageservers,
1419 0 : remote_ext_config.as_ref(),
1420 0 : stripe_size.0 as usize,
1421 0 : args.create_test_user,
1422 0 : )
1423 0 : .await?;
1424 : }
1425 0 : EndpointCmd::Reconfigure(args) => {
1426 0 : let endpoint_id = &args.endpoint_id;
1427 0 : let endpoint = cplane
1428 0 : .endpoints
1429 0 : .get(endpoint_id.as_str())
1430 0 : .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
1431 0 : let pageservers = if let Some(ps_id) = args.endpoint_pageserver_id {
1432 0 : let pageserver = PageServerNode::from_env(env, env.get_pageserver_conf(ps_id)?);
1433 0 : vec![(
1434 0 : pageserver.pg_connection_config.host().clone(),
1435 0 : pageserver.pg_connection_config.port(),
1436 0 : )]
1437 : } else {
1438 0 : let storage_controller = StorageController::from_env(env);
1439 0 : storage_controller
1440 0 : .tenant_locate(endpoint.tenant_id)
1441 0 : .await?
1442 : .shards
1443 0 : .into_iter()
1444 0 : .map(|shard| {
1445 0 : (
1446 0 : Host::parse(&shard.listen_pg_addr)
1447 0 : .expect("Storage controller reported malformed host"),
1448 0 : shard.listen_pg_port,
1449 0 : )
1450 0 : })
1451 0 : .collect::<Vec<_>>()
1452 : };
1453 : // If --safekeepers argument is given, use only the listed
1454 : // safekeeper nodes; otherwise all from the env.
1455 0 : let safekeepers = parse_safekeepers(&args.safekeepers)?;
1456 0 : endpoint.reconfigure(pageservers, None, safekeepers).await?;
1457 : }
1458 0 : EndpointCmd::Stop(args) => {
1459 0 : let endpoint_id = &args.endpoint_id;
1460 0 : let endpoint = cplane
1461 0 : .endpoints
1462 0 : .get(endpoint_id)
1463 0 : .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
1464 0 : endpoint.stop(&args.mode, args.destroy)?;
1465 : }
1466 : }
1467 :
1468 0 : Ok(())
1469 0 : }
1470 :
1471 : /// Parse --safekeepers as list of safekeeper ids.
1472 0 : fn parse_safekeepers(safekeepers_str: &Option<String>) -> Result<Option<Vec<NodeId>>> {
1473 0 : if let Some(safekeepers_str) = safekeepers_str {
1474 0 : let mut safekeepers: Vec<NodeId> = Vec::new();
1475 0 : for sk_id in safekeepers_str.split(',').map(str::trim) {
1476 0 : let sk_id = NodeId(
1477 0 : u64::from_str(sk_id)
1478 0 : .map_err(|_| anyhow!("invalid node ID \"{sk_id}\" in --safekeepers list"))?,
1479 : );
1480 0 : safekeepers.push(sk_id);
1481 : }
1482 0 : Ok(Some(safekeepers))
1483 : } else {
1484 0 : Ok(None)
1485 : }
1486 0 : }
1487 :
1488 0 : fn handle_mappings(subcmd: &MappingsCmd, env: &mut local_env::LocalEnv) -> Result<()> {
1489 0 : match subcmd {
1490 0 : MappingsCmd::Map(args) => {
1491 0 : env.register_branch_mapping(
1492 0 : args.branch_name.to_owned(),
1493 0 : args.tenant_id,
1494 0 : args.timeline_id,
1495 0 : )?;
1496 :
1497 0 : Ok(())
1498 : }
1499 : }
1500 0 : }
1501 :
1502 0 : fn get_pageserver(
1503 0 : env: &local_env::LocalEnv,
1504 0 : pageserver_id_arg: Option<NodeId>,
1505 0 : ) -> Result<PageServerNode> {
1506 0 : let node_id = pageserver_id_arg.unwrap_or(DEFAULT_PAGESERVER_ID);
1507 0 :
1508 0 : Ok(PageServerNode::from_env(
1509 0 : env,
1510 0 : env.get_pageserver_conf(node_id)?,
1511 : ))
1512 0 : }
1513 :
1514 0 : async fn handle_pageserver(subcmd: &PageserverCmd, env: &local_env::LocalEnv) -> Result<()> {
1515 0 : match subcmd {
1516 0 : PageserverCmd::Start(args) => {
1517 0 : if let Err(e) = get_pageserver(env, args.pageserver_id)?
1518 0 : .start(&args.start_timeout)
1519 0 : .await
1520 : {
1521 0 : eprintln!("pageserver start failed: {e}");
1522 0 : exit(1);
1523 0 : }
1524 : }
1525 :
1526 0 : PageserverCmd::Stop(args) => {
1527 0 : let immediate = match args.stop_mode {
1528 0 : StopMode::Fast => false,
1529 0 : StopMode::Immediate => true,
1530 : };
1531 0 : if let Err(e) = get_pageserver(env, args.pageserver_id)?.stop(immediate) {
1532 0 : eprintln!("pageserver stop failed: {}", e);
1533 0 : exit(1);
1534 0 : }
1535 : }
1536 :
1537 0 : PageserverCmd::Restart(args) => {
1538 0 : let pageserver = get_pageserver(env, args.pageserver_id)?;
1539 : //TODO what shutdown strategy should we use here?
1540 0 : if let Err(e) = pageserver.stop(false) {
1541 0 : eprintln!("pageserver stop failed: {}", e);
1542 0 : exit(1);
1543 0 : }
1544 :
1545 0 : if let Err(e) = pageserver.start(&args.start_timeout).await {
1546 0 : eprintln!("pageserver start failed: {e}");
1547 0 : exit(1);
1548 0 : }
1549 : }
1550 :
1551 0 : PageserverCmd::Status(args) => {
1552 0 : match get_pageserver(env, args.pageserver_id)?
1553 0 : .check_status()
1554 0 : .await
1555 : {
1556 0 : Ok(_) => println!("Page server is up and running"),
1557 0 : Err(err) => {
1558 0 : eprintln!("Page server is not available: {}", err);
1559 0 : exit(1);
1560 : }
1561 : }
1562 : }
1563 : }
1564 0 : Ok(())
1565 0 : }
1566 :
1567 0 : async fn handle_storage_controller(
1568 0 : subcmd: &StorageControllerCmd,
1569 0 : env: &local_env::LocalEnv,
1570 0 : ) -> Result<()> {
1571 0 : let svc = StorageController::from_env(env);
1572 0 : match subcmd {
1573 0 : StorageControllerCmd::Start(args) => {
1574 0 : let start_args = NeonStorageControllerStartArgs {
1575 0 : instance_id: args.instance_id,
1576 0 : base_port: args.base_port,
1577 0 : start_timeout: args.start_timeout,
1578 0 : };
1579 :
1580 0 : if let Err(e) = svc.start(start_args).await {
1581 0 : eprintln!("start failed: {e}");
1582 0 : exit(1);
1583 0 : }
1584 : }
1585 :
1586 0 : StorageControllerCmd::Stop(args) => {
1587 0 : let stop_args = NeonStorageControllerStopArgs {
1588 0 : instance_id: args.instance_id,
1589 0 : immediate: match args.stop_mode {
1590 0 : StopMode::Fast => false,
1591 0 : StopMode::Immediate => true,
1592 : },
1593 : };
1594 0 : if let Err(e) = svc.stop(stop_args).await {
1595 0 : eprintln!("stop failed: {}", e);
1596 0 : exit(1);
1597 0 : }
1598 : }
1599 : }
1600 0 : Ok(())
1601 0 : }
1602 :
1603 0 : fn get_safekeeper(env: &local_env::LocalEnv, id: NodeId) -> Result<SafekeeperNode> {
1604 0 : if let Some(node) = env.safekeepers.iter().find(|node| node.id == id) {
1605 0 : Ok(SafekeeperNode::from_env(env, node))
1606 : } else {
1607 0 : bail!("could not find safekeeper {id}")
1608 : }
1609 0 : }
1610 :
1611 0 : async fn handle_safekeeper(subcmd: &SafekeeperCmd, env: &local_env::LocalEnv) -> Result<()> {
1612 0 : match subcmd {
1613 0 : SafekeeperCmd::Start(args) => {
1614 0 : let safekeeper = get_safekeeper(env, args.id)?;
1615 :
1616 0 : if let Err(e) = safekeeper.start(&args.extra_opt, &args.start_timeout).await {
1617 0 : eprintln!("safekeeper start failed: {}", e);
1618 0 : exit(1);
1619 0 : }
1620 : }
1621 :
1622 0 : SafekeeperCmd::Stop(args) => {
1623 0 : let safekeeper = get_safekeeper(env, args.id)?;
1624 0 : let immediate = match args.stop_mode {
1625 0 : StopMode::Fast => false,
1626 0 : StopMode::Immediate => true,
1627 : };
1628 0 : if let Err(e) = safekeeper.stop(immediate) {
1629 0 : eprintln!("safekeeper stop failed: {}", e);
1630 0 : exit(1);
1631 0 : }
1632 : }
1633 :
1634 0 : SafekeeperCmd::Restart(args) => {
1635 0 : let safekeeper = get_safekeeper(env, args.id)?;
1636 0 : let immediate = match args.stop_mode {
1637 0 : StopMode::Fast => false,
1638 0 : StopMode::Immediate => true,
1639 : };
1640 :
1641 0 : if let Err(e) = safekeeper.stop(immediate) {
1642 0 : eprintln!("safekeeper stop failed: {}", e);
1643 0 : exit(1);
1644 0 : }
1645 :
1646 0 : if let Err(e) = safekeeper.start(&args.extra_opt, &args.start_timeout).await {
1647 0 : eprintln!("safekeeper start failed: {}", e);
1648 0 : exit(1);
1649 0 : }
1650 : }
1651 : }
1652 0 : Ok(())
1653 0 : }
1654 :
1655 0 : async fn handle_storage_broker(subcmd: &StorageBrokerCmd, env: &local_env::LocalEnv) -> Result<()> {
1656 0 : match subcmd {
1657 0 : StorageBrokerCmd::Start(args) => {
1658 0 : if let Err(e) = broker::start_broker_process(env, &args.start_timeout).await {
1659 0 : eprintln!("broker start failed: {e}");
1660 0 : exit(1);
1661 0 : }
1662 : }
1663 :
1664 0 : StorageBrokerCmd::Stop(_args) => {
1665 : // FIXME: stop_mode unused
1666 0 : if let Err(e) = broker::stop_broker_process(env) {
1667 0 : eprintln!("broker stop failed: {e}");
1668 0 : exit(1);
1669 0 : }
1670 : }
1671 : }
1672 0 : Ok(())
1673 0 : }
1674 :
1675 0 : async fn handle_start_all(
1676 0 : args: &StartCmdArgs,
1677 0 : env: &'static local_env::LocalEnv,
1678 0 : ) -> anyhow::Result<()> {
1679 : // FIXME: this was called "retry_timeout", is it right?
1680 0 : let Err(errors) = handle_start_all_impl(env, args.timeout).await else {
1681 0 : neon_start_status_check(env, args.timeout.as_ref())
1682 0 : .await
1683 0 : .context("status check after successful startup of all services")?;
1684 0 : return Ok(());
1685 : };
1686 :
1687 0 : eprintln!("startup failed because one or more services could not be started");
1688 :
1689 0 : for e in errors {
1690 0 : eprintln!("{e}");
1691 0 : let debug_repr = format!("{e:?}");
1692 0 : for line in debug_repr.lines() {
1693 0 : eprintln!(" {line}");
1694 0 : }
1695 : }
1696 :
1697 0 : try_stop_all(env, true).await;
1698 :
1699 0 : exit(2);
1700 0 : }
1701 :
1702 : /// Returns Ok() if and only if all services could be started successfully.
1703 : /// Otherwise, returns the list of errors that occurred during startup.
1704 0 : async fn handle_start_all_impl(
1705 0 : env: &'static local_env::LocalEnv,
1706 0 : retry_timeout: humantime::Duration,
1707 0 : ) -> Result<(), Vec<anyhow::Error>> {
1708 0 : // Endpoints are not started automatically
1709 0 :
1710 0 : let mut js = JoinSet::new();
1711 :
1712 : // force infalliblity through closure
1713 : #[allow(clippy::redundant_closure_call)]
1714 0 : (|| {
1715 0 : js.spawn(async move {
1716 0 : let retry_timeout = retry_timeout;
1717 0 : broker::start_broker_process(env, &retry_timeout).await
1718 0 : });
1719 0 :
1720 0 : // Only start the storage controller if the pageserver is configured to need it
1721 0 : if env.control_plane_api.is_some() {
1722 0 : js.spawn(async move {
1723 0 : let storage_controller = StorageController::from_env(env);
1724 0 : storage_controller
1725 0 : .start(NeonStorageControllerStartArgs::with_default_instance_id(
1726 0 : retry_timeout,
1727 0 : ))
1728 0 : .await
1729 0 : .map_err(|e| e.context("start storage_controller"))
1730 0 : });
1731 0 : }
1732 :
1733 0 : for ps_conf in &env.pageservers {
1734 0 : js.spawn(async move {
1735 0 : let pageserver = PageServerNode::from_env(env, ps_conf);
1736 0 : pageserver
1737 0 : .start(&retry_timeout)
1738 0 : .await
1739 0 : .map_err(|e| e.context(format!("start pageserver {}", ps_conf.id)))
1740 0 : });
1741 0 : }
1742 :
1743 0 : for node in env.safekeepers.iter() {
1744 0 : js.spawn(async move {
1745 0 : let safekeeper = SafekeeperNode::from_env(env, node);
1746 0 : safekeeper
1747 0 : .start(&[], &retry_timeout)
1748 0 : .await
1749 0 : .map_err(|e| e.context(format!("start safekeeper {}", safekeeper.id)))
1750 0 : });
1751 0 : }
1752 0 : })();
1753 0 :
1754 0 : let mut errors = Vec::new();
1755 0 : while let Some(result) = js.join_next().await {
1756 0 : let result = result.expect("we don't panic or cancel the tasks");
1757 0 : if let Err(e) = result {
1758 0 : errors.push(e);
1759 0 : }
1760 : }
1761 :
1762 0 : if !errors.is_empty() {
1763 0 : return Err(errors);
1764 0 : }
1765 0 :
1766 0 : Ok(())
1767 0 : }
1768 :
1769 0 : async fn neon_start_status_check(
1770 0 : env: &local_env::LocalEnv,
1771 0 : retry_timeout: &Duration,
1772 0 : ) -> anyhow::Result<()> {
1773 : const RETRY_INTERVAL: Duration = Duration::from_millis(100);
1774 : const NOTICE_AFTER_RETRIES: Duration = Duration::from_secs(5);
1775 :
1776 0 : if env.control_plane_api.is_none() {
1777 0 : return Ok(());
1778 0 : }
1779 0 :
1780 0 : let storcon = StorageController::from_env(env);
1781 0 :
1782 0 : let retries = retry_timeout.as_millis() / RETRY_INTERVAL.as_millis();
1783 0 : let notice_after_retries = retry_timeout.as_millis() / NOTICE_AFTER_RETRIES.as_millis();
1784 0 :
1785 0 : println!("\nRunning neon status check");
1786 :
1787 0 : for retry in 0..retries {
1788 0 : if retry == notice_after_retries {
1789 0 : println!("\nNeon status check has not passed yet, continuing to wait")
1790 0 : }
1791 :
1792 0 : let mut passed = true;
1793 0 : let mut nodes = storcon.node_list().await?;
1794 0 : let mut pageservers = env.pageservers.clone();
1795 0 :
1796 0 : if nodes.len() != pageservers.len() {
1797 0 : continue;
1798 0 : }
1799 0 :
1800 0 : nodes.sort_by_key(|ps| ps.id);
1801 0 : pageservers.sort_by_key(|ps| ps.id);
1802 :
1803 0 : for (idx, pageserver) in pageservers.iter().enumerate() {
1804 0 : let node = &nodes[idx];
1805 0 : if node.id != pageserver.id {
1806 0 : passed = false;
1807 0 : break;
1808 0 : }
1809 :
1810 0 : if !matches!(node.availability, NodeAvailabilityWrapper::Active) {
1811 0 : passed = false;
1812 0 : break;
1813 0 : }
1814 : }
1815 :
1816 0 : if passed {
1817 0 : println!("\nNeon started and passed status check");
1818 0 : return Ok(());
1819 0 : }
1820 0 :
1821 0 : tokio::time::sleep(RETRY_INTERVAL).await;
1822 : }
1823 :
1824 0 : anyhow::bail!("\nNeon passed status check")
1825 0 : }
1826 :
1827 0 : async fn handle_stop_all(args: &StopCmdArgs, env: &local_env::LocalEnv) -> Result<()> {
1828 0 : let immediate = match args.mode {
1829 0 : StopMode::Fast => false,
1830 0 : StopMode::Immediate => true,
1831 : };
1832 :
1833 0 : try_stop_all(env, immediate).await;
1834 :
1835 0 : Ok(())
1836 0 : }
1837 :
1838 0 : async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
1839 0 : // Stop all endpoints
1840 0 : match ComputeControlPlane::load(env.clone()) {
1841 0 : Ok(cplane) => {
1842 0 : for (_k, node) in cplane.endpoints {
1843 0 : if let Err(e) = node.stop(if immediate { "immediate" } else { "fast" }, false) {
1844 0 : eprintln!("postgres stop failed: {e:#}");
1845 0 : }
1846 : }
1847 : }
1848 0 : Err(e) => {
1849 0 : eprintln!("postgres stop failed, could not restore control plane data from env: {e:#}")
1850 : }
1851 : }
1852 :
1853 0 : for ps_conf in &env.pageservers {
1854 0 : let pageserver = PageServerNode::from_env(env, ps_conf);
1855 0 : if let Err(e) = pageserver.stop(immediate) {
1856 0 : eprintln!("pageserver {} stop failed: {:#}", ps_conf.id, e);
1857 0 : }
1858 : }
1859 :
1860 0 : for node in env.safekeepers.iter() {
1861 0 : let safekeeper = SafekeeperNode::from_env(env, node);
1862 0 : if let Err(e) = safekeeper.stop(immediate) {
1863 0 : eprintln!("safekeeper {} stop failed: {:#}", safekeeper.id, e);
1864 0 : }
1865 : }
1866 :
1867 0 : if let Err(e) = broker::stop_broker_process(env) {
1868 0 : eprintln!("neon broker stop failed: {e:#}");
1869 0 : }
1870 :
1871 : // Stop all storage controller instances. In the most common case there's only one,
1872 : // but iterate though the base data directory in order to discover the instances.
1873 0 : let storcon_instances = env
1874 0 : .storage_controller_instances()
1875 0 : .await
1876 0 : .expect("Must inspect data dir");
1877 0 : for (instance_id, _instance_dir_path) in storcon_instances {
1878 0 : let storage_controller = StorageController::from_env(env);
1879 0 : let stop_args = NeonStorageControllerStopArgs {
1880 0 : instance_id,
1881 0 : immediate,
1882 0 : };
1883 :
1884 0 : if let Err(e) = storage_controller.stop(stop_args).await {
1885 0 : eprintln!("Storage controller instance {instance_id} stop failed: {e:#}");
1886 0 : }
1887 : }
1888 0 : }
|