Line data Source code
1 : //!
2 : //! `neon_local` is an executable that can be used to create a local
3 : //! Neon environment, for testing purposes. The local environment is
4 : //! quite different from the cloud environment with Kubernetes, but it is
5 : //! easier to work with locally. The python tests in `test_runner`
6 : //! rely on `neon_local` to set up the environment for each test.
7 : //!
8 : use std::borrow::Cow;
9 : use std::collections::{BTreeSet, HashMap};
10 : use std::fs::File;
11 : use std::os::fd::AsRawFd;
12 : use std::path::PathBuf;
13 : use std::process::exit;
14 : use std::str::FromStr;
15 : use std::time::Duration;
16 :
17 : use anyhow::{Context, Result, anyhow, bail};
18 : use clap::Parser;
19 : use compute_api::spec::ComputeMode;
20 : use control_plane::endpoint::ComputeControlPlane;
21 : use control_plane::endpoint_storage::{ENDPOINT_STORAGE_DEFAULT_PORT, EndpointStorage};
22 : use control_plane::local_env::{
23 : EndpointStorageConf, InitForceMode, LocalEnv, NeonBroker, NeonLocalInitConf,
24 : NeonLocalInitPageserverConf, SafekeeperConf,
25 : };
26 : use control_plane::pageserver::PageServerNode;
27 : use control_plane::safekeeper::SafekeeperNode;
28 : use control_plane::storage_controller::{
29 : NeonStorageControllerStartArgs, NeonStorageControllerStopArgs, StorageController,
30 : };
31 : use control_plane::{broker, local_env};
32 : use nix::fcntl::{FlockArg, flock};
33 : use pageserver_api::config::{
34 : DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
35 : DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT,
36 : };
37 : use pageserver_api::controller_api::{
38 : NodeAvailabilityWrapper, PlacementPolicy, TenantCreateRequest,
39 : };
40 : use pageserver_api::models::{
41 : ShardParameters, TenantConfigRequest, TimelineCreateRequest, TimelineInfo,
42 : };
43 : use pageserver_api::shard::{DEFAULT_STRIPE_SIZE, ShardCount, ShardStripeSize, TenantShardId};
44 : use postgres_backend::AuthType;
45 : use postgres_connection::parse_host_port;
46 : use safekeeper_api::membership::SafekeeperGeneration;
47 : use safekeeper_api::{
48 : DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT,
49 : DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT,
50 : };
51 : use storage_broker::DEFAULT_LISTEN_ADDR as DEFAULT_BROKER_ADDR;
52 : use tokio::task::JoinSet;
53 : use url::Host;
54 : use utils::auth::{Claims, Scope};
55 : use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
56 : use utils::lsn::Lsn;
57 : use utils::project_git_version;
58 :
59 : // Default id of a safekeeper node, if not specified on the command line.
60 : const DEFAULT_SAFEKEEPER_ID: NodeId = NodeId(1);
61 : const DEFAULT_PAGESERVER_ID: NodeId = NodeId(1);
62 : const DEFAULT_BRANCH_NAME: &str = "main";
63 : project_git_version!(GIT_VERSION);
64 :
65 : const DEFAULT_PG_VERSION: u32 = 17;
66 :
67 : const DEFAULT_PAGESERVER_CONTROL_PLANE_API: &str = "http://127.0.0.1:1234/upcall/v1/";
68 :
69 : #[derive(clap::Parser)]
70 : #[command(version = GIT_VERSION, about, name = "Neon CLI")]
71 : struct Cli {
72 : #[command(subcommand)]
73 : command: NeonLocalCmd,
74 : }
75 :
76 : #[derive(clap::Subcommand)]
77 : enum NeonLocalCmd {
78 : Init(InitCmdArgs),
79 :
80 : #[command(subcommand)]
81 : Tenant(TenantCmd),
82 : #[command(subcommand)]
83 : Timeline(TimelineCmd),
84 : #[command(subcommand)]
85 : Pageserver(PageserverCmd),
86 : #[command(subcommand)]
87 : #[clap(alias = "storage_controller")]
88 : StorageController(StorageControllerCmd),
89 : #[command(subcommand)]
90 : #[clap(alias = "storage_broker")]
91 : StorageBroker(StorageBrokerCmd),
92 : #[command(subcommand)]
93 : Safekeeper(SafekeeperCmd),
94 : #[command(subcommand)]
95 : EndpointStorage(EndpointStorageCmd),
96 : #[command(subcommand)]
97 : Endpoint(EndpointCmd),
98 : #[command(subcommand)]
99 : Mappings(MappingsCmd),
100 :
101 : Start(StartCmdArgs),
102 : Stop(StopCmdArgs),
103 : }
104 :
105 : #[derive(clap::Args)]
106 : #[clap(about = "Initialize a new Neon repository, preparing configs for services to start with")]
107 : struct InitCmdArgs {
108 : #[clap(long, help("How many pageservers to create (default 1)"))]
109 : num_pageservers: Option<u16>,
110 :
111 : #[clap(long)]
112 : config: Option<PathBuf>,
113 :
114 : #[clap(long, help("Force initialization even if the repository is not empty"))]
115 : #[arg(value_parser)]
116 : #[clap(default_value = "must-not-exist")]
117 0 : force: InitForceMode,
118 : }
119 :
120 : #[derive(clap::Args)]
121 : #[clap(about = "Start pageserver and safekeepers")]
122 : struct StartCmdArgs {
123 : #[clap(long = "start-timeout", default_value = "10s")]
124 0 : timeout: humantime::Duration,
125 : }
126 :
127 : #[derive(clap::Args)]
128 : #[clap(about = "Stop pageserver and safekeepers")]
129 : struct StopCmdArgs {
130 : #[arg(value_enum)]
131 0 : #[clap(long, default_value_t = StopMode::Fast)]
132 0 : mode: StopMode,
133 : }
134 :
135 : #[derive(Clone, Copy, clap::ValueEnum)]
136 : enum StopMode {
137 : Fast,
138 : Immediate,
139 : }
140 :
141 : #[derive(clap::Subcommand)]
142 : #[clap(about = "Manage tenants")]
143 : enum TenantCmd {
144 : List,
145 : Create(TenantCreateCmdArgs),
146 : SetDefault(TenantSetDefaultCmdArgs),
147 : Config(TenantConfigCmdArgs),
148 : Import(TenantImportCmdArgs),
149 : }
150 :
151 : #[derive(clap::Args)]
152 : struct TenantCreateCmdArgs {
153 : #[clap(
154 : long = "tenant-id",
155 : help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
156 : )]
157 : tenant_id: Option<TenantId>,
158 :
159 : #[clap(
160 : long,
161 : help = "Use a specific timeline id when creating a tenant and its initial timeline"
162 : )]
163 : timeline_id: Option<TimelineId>,
164 :
165 : #[clap(short = 'c')]
166 0 : config: Vec<String>,
167 :
168 0 : #[arg(default_value_t = DEFAULT_PG_VERSION)]
169 : #[clap(long, help = "Postgres version to use for the initial timeline")]
170 0 : pg_version: u32,
171 :
172 : #[clap(
173 : long,
174 : help = "Use this tenant in future CLI commands where tenant_id is needed, but not specified"
175 : )]
176 0 : set_default: bool,
177 :
178 : #[clap(long, help = "Number of shards in the new tenant")]
179 0 : #[arg(default_value_t = 0)]
180 0 : shard_count: u8,
181 : #[clap(long, help = "Sharding stripe size in pages")]
182 : shard_stripe_size: Option<u32>,
183 :
184 : #[clap(long, help = "Placement policy shards in this tenant")]
185 : #[arg(value_parser = parse_placement_policy)]
186 : placement_policy: Option<PlacementPolicy>,
187 : }
188 :
189 0 : fn parse_placement_policy(s: &str) -> anyhow::Result<PlacementPolicy> {
190 0 : Ok(serde_json::from_str::<PlacementPolicy>(s)?)
191 0 : }
192 :
193 : #[derive(clap::Args)]
194 : #[clap(
195 : about = "Set a particular tenant as default in future CLI commands where tenant_id is needed, but not specified"
196 : )]
197 : struct TenantSetDefaultCmdArgs {
198 : #[clap(
199 : long = "tenant-id",
200 : help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
201 : )]
202 0 : tenant_id: TenantId,
203 : }
204 :
205 : #[derive(clap::Args)]
206 : struct TenantConfigCmdArgs {
207 : #[clap(
208 : long = "tenant-id",
209 : help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
210 : )]
211 : tenant_id: Option<TenantId>,
212 :
213 : #[clap(short = 'c')]
214 0 : config: Vec<String>,
215 : }
216 :
217 : #[derive(clap::Args)]
218 : #[clap(
219 : about = "Import a tenant that is present in remote storage, and create branches for its timelines"
220 : )]
221 : struct TenantImportCmdArgs {
222 : #[clap(
223 : long = "tenant-id",
224 : help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
225 : )]
226 0 : tenant_id: TenantId,
227 : }
228 :
229 : #[derive(clap::Subcommand)]
230 : #[clap(about = "Manage timelines")]
231 : enum TimelineCmd {
232 : List(TimelineListCmdArgs),
233 : Branch(TimelineBranchCmdArgs),
234 : Create(TimelineCreateCmdArgs),
235 : Import(TimelineImportCmdArgs),
236 : }
237 :
238 : #[derive(clap::Args)]
239 : #[clap(about = "List all timelines available to this pageserver")]
240 : struct TimelineListCmdArgs {
241 : #[clap(
242 : long = "tenant-id",
243 : help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
244 : )]
245 : tenant_shard_id: Option<TenantShardId>,
246 : }
247 :
248 : #[derive(clap::Args)]
249 : #[clap(about = "Create a new timeline, branching off from another timeline")]
250 : struct TimelineBranchCmdArgs {
251 : #[clap(
252 : long = "tenant-id",
253 : help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
254 : )]
255 : tenant_id: Option<TenantId>,
256 :
257 : #[clap(long, help = "New timeline's ID")]
258 : timeline_id: Option<TimelineId>,
259 :
260 : #[clap(long, help = "Human-readable alias for the new timeline")]
261 0 : branch_name: String,
262 :
263 : #[clap(
264 : long,
265 : help = "Use last Lsn of another timeline (and its data) as base when creating the new timeline. The timeline gets resolved by its branch name."
266 : )]
267 : ancestor_branch_name: Option<String>,
268 :
269 : #[clap(
270 : long,
271 : help = "When using another timeline as base, use a specific Lsn in it instead of the latest one"
272 : )]
273 : ancestor_start_lsn: Option<Lsn>,
274 : }
275 :
276 : #[derive(clap::Args)]
277 : #[clap(about = "Create a new blank timeline")]
278 : struct TimelineCreateCmdArgs {
279 : #[clap(
280 : long = "tenant-id",
281 : help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
282 : )]
283 : tenant_id: Option<TenantId>,
284 :
285 : #[clap(long, help = "New timeline's ID")]
286 : timeline_id: Option<TimelineId>,
287 :
288 : #[clap(long, help = "Human-readable alias for the new timeline")]
289 0 : branch_name: String,
290 :
291 0 : #[arg(default_value_t = DEFAULT_PG_VERSION)]
292 : #[clap(long, help = "Postgres version")]
293 0 : pg_version: u32,
294 : }
295 :
296 : #[derive(clap::Args)]
297 : #[clap(about = "Import timeline from a basebackup directory")]
298 : struct TimelineImportCmdArgs {
299 : #[clap(
300 : long = "tenant-id",
301 : help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
302 : )]
303 : tenant_id: Option<TenantId>,
304 :
305 : #[clap(long, help = "New timeline's ID")]
306 0 : timeline_id: TimelineId,
307 :
308 : #[clap(long, help = "Human-readable alias for the new timeline")]
309 0 : branch_name: String,
310 :
311 : #[clap(long, help = "Basebackup tarfile to import")]
312 0 : base_tarfile: PathBuf,
313 :
314 : #[clap(long, help = "Lsn the basebackup starts at")]
315 0 : base_lsn: Lsn,
316 :
317 : #[clap(long, help = "Wal to add after base")]
318 : wal_tarfile: Option<PathBuf>,
319 :
320 : #[clap(long, help = "Lsn the basebackup ends at")]
321 : end_lsn: Option<Lsn>,
322 :
323 0 : #[arg(default_value_t = DEFAULT_PG_VERSION)]
324 : #[clap(long, help = "Postgres version of the backup being imported")]
325 0 : pg_version: u32,
326 : }
327 :
328 : #[derive(clap::Subcommand)]
329 : #[clap(about = "Manage pageservers")]
330 : enum PageserverCmd {
331 : Status(PageserverStatusCmdArgs),
332 : Start(PageserverStartCmdArgs),
333 : Stop(PageserverStopCmdArgs),
334 : Restart(PageserverRestartCmdArgs),
335 : }
336 :
337 : #[derive(clap::Args)]
338 : #[clap(about = "Show status of a local pageserver")]
339 : struct PageserverStatusCmdArgs {
340 : #[clap(long = "id", help = "pageserver id")]
341 : pageserver_id: Option<NodeId>,
342 : }
343 :
344 : #[derive(clap::Args)]
345 : #[clap(about = "Start local pageserver")]
346 : struct PageserverStartCmdArgs {
347 : #[clap(long = "id", help = "pageserver id")]
348 : pageserver_id: Option<NodeId>,
349 :
350 : #[clap(short = 't', long, help = "timeout until we fail the command")]
351 : #[arg(default_value = "10s")]
352 0 : start_timeout: humantime::Duration,
353 : }
354 :
355 : #[derive(clap::Args)]
356 : #[clap(about = "Stop local pageserver")]
357 : struct PageserverStopCmdArgs {
358 : #[clap(long = "id", help = "pageserver id")]
359 : pageserver_id: Option<NodeId>,
360 :
361 : #[clap(
362 : short = 'm',
363 : help = "If 'immediate', don't flush repository data at shutdown"
364 : )]
365 : #[arg(value_enum, default_value = "fast")]
366 0 : stop_mode: StopMode,
367 : }
368 :
369 : #[derive(clap::Args)]
370 : #[clap(about = "Restart local pageserver")]
371 : struct PageserverRestartCmdArgs {
372 : #[clap(long = "id", help = "pageserver id")]
373 : pageserver_id: Option<NodeId>,
374 :
375 : #[clap(short = 't', long, help = "timeout until we fail the command")]
376 : #[arg(default_value = "10s")]
377 0 : start_timeout: humantime::Duration,
378 : }
379 :
380 : #[derive(clap::Subcommand)]
381 : #[clap(about = "Manage storage controller")]
382 : enum StorageControllerCmd {
383 : Start(StorageControllerStartCmdArgs),
384 : Stop(StorageControllerStopCmdArgs),
385 : }
386 :
387 : #[derive(clap::Args)]
388 : #[clap(about = "Start storage controller")]
389 : struct StorageControllerStartCmdArgs {
390 : #[clap(short = 't', long, help = "timeout until we fail the command")]
391 : #[arg(default_value = "10s")]
392 0 : start_timeout: humantime::Duration,
393 :
394 : #[clap(
395 : long,
396 : help = "Identifier used to distinguish storage controller instances"
397 : )]
398 0 : #[arg(default_value_t = 1)]
399 0 : instance_id: u8,
400 :
401 : #[clap(
402 : long,
403 : help = "Base port for the storage controller instance idenfified by instance-id (defaults to pageserver cplane api)"
404 : )]
405 : base_port: Option<u16>,
406 : }
407 :
408 : #[derive(clap::Args)]
409 : #[clap(about = "Stop storage controller")]
410 : struct StorageControllerStopCmdArgs {
411 : #[clap(
412 : short = 'm',
413 : help = "If 'immediate', don't flush repository data at shutdown"
414 : )]
415 : #[arg(value_enum, default_value = "fast")]
416 0 : stop_mode: StopMode,
417 :
418 : #[clap(
419 : long,
420 : help = "Identifier used to distinguish storage controller instances"
421 : )]
422 0 : #[arg(default_value_t = 1)]
423 0 : instance_id: u8,
424 : }
425 :
426 : #[derive(clap::Subcommand)]
427 : #[clap(about = "Manage storage broker")]
428 : enum StorageBrokerCmd {
429 : Start(StorageBrokerStartCmdArgs),
430 : Stop(StorageBrokerStopCmdArgs),
431 : }
432 :
433 : #[derive(clap::Args)]
434 : #[clap(about = "Start broker")]
435 : struct StorageBrokerStartCmdArgs {
436 : #[clap(short = 't', long, help = "timeout until we fail the command")]
437 : #[arg(default_value = "10s")]
438 0 : start_timeout: humantime::Duration,
439 : }
440 :
441 : #[derive(clap::Args)]
442 : #[clap(about = "stop broker")]
443 : struct StorageBrokerStopCmdArgs {
444 : #[clap(
445 : short = 'm',
446 : help = "If 'immediate', don't flush repository data at shutdown"
447 : )]
448 : #[arg(value_enum, default_value = "fast")]
449 0 : stop_mode: StopMode,
450 : }
451 :
452 : #[derive(clap::Subcommand)]
453 : #[clap(about = "Manage safekeepers")]
454 : enum SafekeeperCmd {
455 : Start(SafekeeperStartCmdArgs),
456 : Stop(SafekeeperStopCmdArgs),
457 : Restart(SafekeeperRestartCmdArgs),
458 : }
459 :
460 : #[derive(clap::Subcommand)]
461 : #[clap(about = "Manage object storage")]
462 : enum EndpointStorageCmd {
463 : Start(EndpointStorageStartCmd),
464 : Stop(EndpointStorageStopCmd),
465 : }
466 :
467 : #[derive(clap::Args)]
468 : #[clap(about = "Start object storage")]
469 : struct EndpointStorageStartCmd {
470 : #[clap(short = 't', long, help = "timeout until we fail the command")]
471 : #[arg(default_value = "10s")]
472 0 : start_timeout: humantime::Duration,
473 : }
474 :
475 : #[derive(clap::Args)]
476 : #[clap(about = "Stop object storage")]
477 : struct EndpointStorageStopCmd {
478 : #[arg(value_enum, default_value = "fast")]
479 : #[clap(
480 : short = 'm',
481 : help = "If 'immediate', don't flush repository data at shutdown"
482 : )]
483 0 : stop_mode: StopMode,
484 : }
485 :
486 : #[derive(clap::Args)]
487 : #[clap(about = "Start local safekeeper")]
488 : struct SafekeeperStartCmdArgs {
489 : #[clap(help = "safekeeper id")]
490 0 : #[arg(default_value_t = NodeId(1))]
491 0 : id: NodeId,
492 :
493 : #[clap(
494 : short = 'e',
495 : long = "safekeeper-extra-opt",
496 : help = "Additional safekeeper invocation options, e.g. -e=--http-auth-public-key-path=foo"
497 : )]
498 0 : extra_opt: Vec<String>,
499 :
500 : #[clap(short = 't', long, help = "timeout until we fail the command")]
501 : #[arg(default_value = "10s")]
502 0 : start_timeout: humantime::Duration,
503 : }
504 :
505 : #[derive(clap::Args)]
506 : #[clap(about = "Stop local safekeeper")]
507 : struct SafekeeperStopCmdArgs {
508 : #[clap(help = "safekeeper id")]
509 0 : #[arg(default_value_t = NodeId(1))]
510 0 : id: NodeId,
511 :
512 : #[arg(value_enum, default_value = "fast")]
513 : #[clap(
514 : short = 'm',
515 : help = "If 'immediate', don't flush repository data at shutdown"
516 : )]
517 0 : stop_mode: StopMode,
518 : }
519 :
520 : #[derive(clap::Args)]
521 : #[clap(about = "Restart local safekeeper")]
522 : struct SafekeeperRestartCmdArgs {
523 : #[clap(help = "safekeeper id")]
524 0 : #[arg(default_value_t = NodeId(1))]
525 0 : id: NodeId,
526 :
527 : #[arg(value_enum, default_value = "fast")]
528 : #[clap(
529 : short = 'm',
530 : help = "If 'immediate', don't flush repository data at shutdown"
531 : )]
532 0 : stop_mode: StopMode,
533 :
534 : #[clap(
535 : short = 'e',
536 : long = "safekeeper-extra-opt",
537 : help = "Additional safekeeper invocation options, e.g. -e=--http-auth-public-key-path=foo"
538 : )]
539 0 : extra_opt: Vec<String>,
540 :
541 : #[clap(short = 't', long, help = "timeout until we fail the command")]
542 : #[arg(default_value = "10s")]
543 0 : start_timeout: humantime::Duration,
544 : }
545 :
546 : #[derive(clap::Subcommand)]
547 : #[clap(about = "Manage Postgres instances")]
548 : enum EndpointCmd {
549 : List(EndpointListCmdArgs),
550 : Create(EndpointCreateCmdArgs),
551 : Start(EndpointStartCmdArgs),
552 : Reconfigure(EndpointReconfigureCmdArgs),
553 : Stop(EndpointStopCmdArgs),
554 : GenerateJwt(EndpointGenerateJwtCmdArgs),
555 : }
556 :
557 : #[derive(clap::Args)]
558 : #[clap(about = "List endpoints")]
559 : struct EndpointListCmdArgs {
560 : #[clap(
561 : long = "tenant-id",
562 : help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
563 : )]
564 : tenant_shard_id: Option<TenantShardId>,
565 : }
566 :
567 : #[derive(clap::Args)]
568 : #[clap(about = "Create a compute endpoint")]
569 : struct EndpointCreateCmdArgs {
570 : #[clap(
571 : long = "tenant-id",
572 : help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
573 : )]
574 : tenant_id: Option<TenantId>,
575 :
576 : #[clap(help = "Postgres endpoint id")]
577 : endpoint_id: Option<String>,
578 : #[clap(long, help = "Name of the branch the endpoint will run on")]
579 : branch_name: Option<String>,
580 : #[clap(
581 : long,
582 : help = "Specify Lsn on the timeline to start from. By default, end of the timeline would be used"
583 : )]
584 : lsn: Option<Lsn>,
585 : #[clap(long)]
586 : pg_port: Option<u16>,
587 : #[clap(long, alias = "http-port")]
588 : external_http_port: Option<u16>,
589 : #[clap(long)]
590 : internal_http_port: Option<u16>,
591 : #[clap(long = "pageserver-id")]
592 : endpoint_pageserver_id: Option<NodeId>,
593 :
594 : #[clap(
595 : long,
596 : help = "Don't do basebackup, create endpoint directory with only config files",
597 : action = clap::ArgAction::Set,
598 0 : default_value_t = false
599 : )]
600 0 : config_only: bool,
601 :
602 0 : #[arg(default_value_t = DEFAULT_PG_VERSION)]
603 : #[clap(long, help = "Postgres version")]
604 0 : pg_version: u32,
605 :
606 : #[clap(
607 : long,
608 : help = "If set, the node will be a hot replica on the specified timeline",
609 : action = clap::ArgAction::Set,
610 0 : default_value_t = false
611 : )]
612 0 : hot_standby: bool,
613 :
614 : #[clap(long, help = "If set, will set up the catalog for neon_superuser")]
615 0 : update_catalog: bool,
616 :
617 : #[clap(
618 : long,
619 : help = "Allow multiple primary endpoints running on the same branch. Shouldn't be used normally, but useful for tests."
620 : )]
621 0 : allow_multiple: bool,
622 : }
623 :
624 : #[derive(clap::Args)]
625 : #[clap(about = "Start postgres. If the endpoint doesn't exist yet, it is created.")]
626 : struct EndpointStartCmdArgs {
627 : #[clap(help = "Postgres endpoint id")]
628 0 : endpoint_id: String,
629 : #[clap(long = "pageserver-id")]
630 : endpoint_pageserver_id: Option<NodeId>,
631 :
632 : #[clap(
633 : long,
634 : help = "Safekeepers membership generation to prefix neon.safekeepers with. Normally neon_local sets it on its own, but this option allows to override. Non zero value forces endpoint to use membership configurations."
635 : )]
636 : safekeepers_generation: Option<u32>,
637 : #[clap(
638 : long,
639 : help = "List of safekeepers endpoint will talk to. Normally neon_local chooses them on its own, but this option allows to override."
640 : )]
641 : safekeepers: Option<String>,
642 :
643 : #[clap(
644 : long,
645 : help = "Configure the remote extensions storage proxy gateway to request for extensions."
646 : )]
647 : remote_ext_config: Option<String>,
648 :
649 : #[clap(
650 : long,
651 : help = "If set, will create test user `user` and `neondb` database. Requires `update-catalog = true`"
652 : )]
653 0 : create_test_user: bool,
654 :
655 : #[clap(
656 : long,
657 : help = "Allow multiple primary endpoints running on the same branch. Shouldn't be used normally, but useful for tests."
658 : )]
659 0 : allow_multiple: bool,
660 :
661 : #[clap(short = 't', long, value_parser= humantime::parse_duration, help = "timeout until we fail the command")]
662 : #[arg(default_value = "90s")]
663 0 : start_timeout: Duration,
664 : }
665 :
666 : #[derive(clap::Args)]
667 : #[clap(about = "Reconfigure an endpoint")]
668 : struct EndpointReconfigureCmdArgs {
669 : #[clap(
670 : long = "tenant-id",
671 : help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
672 : )]
673 : tenant_id: Option<TenantId>,
674 :
675 : #[clap(help = "Postgres endpoint id")]
676 0 : endpoint_id: String,
677 : #[clap(long = "pageserver-id")]
678 : endpoint_pageserver_id: Option<NodeId>,
679 :
680 : #[clap(long)]
681 : safekeepers: Option<String>,
682 : }
683 :
684 : #[derive(clap::Args)]
685 : #[clap(about = "Stop an endpoint")]
686 : struct EndpointStopCmdArgs {
687 : #[clap(help = "Postgres endpoint id")]
688 0 : endpoint_id: String,
689 :
690 : #[clap(
691 : long,
692 : help = "Also delete data directory (now optional, should be default in future)"
693 : )]
694 0 : destroy: bool,
695 :
696 : #[clap(long, help = "Postgres shutdown mode, passed to \"pg_ctl -m <mode>\"")]
697 : #[arg(value_parser(["smart", "fast", "immediate"]))]
698 : #[arg(default_value = "fast")]
699 0 : mode: String,
700 : }
701 :
702 : #[derive(clap::Args)]
703 : #[clap(about = "Generate a JWT for an endpoint")]
704 : struct EndpointGenerateJwtCmdArgs {
705 : #[clap(help = "Postgres endpoint id")]
706 0 : endpoint_id: String,
707 : }
708 :
709 : #[derive(clap::Subcommand)]
710 : #[clap(about = "Manage neon_local branch name mappings")]
711 : enum MappingsCmd {
712 : Map(MappingsMapCmdArgs),
713 : }
714 :
715 : #[derive(clap::Args)]
716 : #[clap(about = "Create new mapping which cannot exist already")]
717 : struct MappingsMapCmdArgs {
718 : #[clap(
719 : long,
720 : help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
721 : )]
722 0 : tenant_id: TenantId,
723 : #[clap(
724 : long,
725 : help = "Timeline id. Represented as a hexadecimal string 32 symbols length"
726 : )]
727 0 : timeline_id: TimelineId,
728 : #[clap(long, help = "Branch name to give to the timeline")]
729 0 : branch_name: String,
730 : }
731 :
///
/// Timelines tree element used as a value in the HashMap.
///
/// `print_timelines_tree` builds one element per timeline, keyed by timeline id, and
/// `print_timeline` walks the `children` links to render the tree.
///
struct TimelineTreeEl {
    /// `TimelineInfo` received from the `pageserver` via the `timeline_list` http API call.
    pub info: TimelineInfo,
    /// Name, recovered from neon config mappings; `None` when no mapping exists
    /// (printed as `_no_name_`).
    pub name: Option<String>,
    /// Holds all direct children of this timeline referenced using `timeline_id`.
    /// A `BTreeSet`, so children are visited in ascending id order.
    pub children: BTreeSet<TimelineId>,
}
743 :
744 : /// A flock-based guard over the neon_local repository directory
745 : struct RepoLock {
746 : _file: File,
747 : }
748 :
749 : impl RepoLock {
750 0 : fn new() -> Result<Self> {
751 0 : let repo_dir = File::open(local_env::base_path())?;
752 0 : let repo_dir_fd = repo_dir.as_raw_fd();
753 0 : flock(repo_dir_fd, FlockArg::LockExclusive)?;
754 :
755 0 : Ok(Self { _file: repo_dir })
756 0 : }
757 : }
758 :
759 : // Main entry point for the 'neon_local' CLI utility
760 : //
761 : // This utility helps to manage neon installation. That includes following:
762 : // * Management of local postgres installations running on top of the
763 : // pageserver.
764 : // * Providing CLI api to the pageserver
765 : // * TODO: export/import to/from usual postgres
766 0 : fn main() -> Result<()> {
767 0 : let cli = Cli::parse();
768 :
769 : // Check for 'neon init' command first.
770 0 : let (subcommand_result, _lock) = if let NeonLocalCmd::Init(args) = cli.command {
771 0 : (handle_init(&args).map(|env| Some(Cow::Owned(env))), None)
772 : } else {
773 : // This tool uses a collection of simple files to store its state, and consequently
774 : // it is not generally safe to run multiple commands concurrently. Rather than expect
775 : // all callers to know this, use a lock file to protect against concurrent execution.
776 0 : let _repo_lock = RepoLock::new().unwrap();
777 :
778 : // all other commands need an existing config
779 0 : let env = LocalEnv::load_config(&local_env::base_path()).context("Error loading config")?;
780 0 : let original_env = env.clone();
781 0 : let env = Box::leak(Box::new(env));
782 0 : let rt = tokio::runtime::Builder::new_current_thread()
783 0 : .enable_all()
784 0 : .build()
785 0 : .unwrap();
786 :
787 0 : let subcommand_result = match cli.command {
788 0 : NeonLocalCmd::Init(_) => unreachable!("init was handled earlier already"),
789 0 : NeonLocalCmd::Start(args) => rt.block_on(handle_start_all(&args, env)),
790 0 : NeonLocalCmd::Stop(args) => rt.block_on(handle_stop_all(&args, env)),
791 0 : NeonLocalCmd::Tenant(subcmd) => rt.block_on(handle_tenant(&subcmd, env)),
792 0 : NeonLocalCmd::Timeline(subcmd) => rt.block_on(handle_timeline(&subcmd, env)),
793 0 : NeonLocalCmd::Pageserver(subcmd) => rt.block_on(handle_pageserver(&subcmd, env)),
794 0 : NeonLocalCmd::StorageController(subcmd) => {
795 0 : rt.block_on(handle_storage_controller(&subcmd, env))
796 : }
797 0 : NeonLocalCmd::StorageBroker(subcmd) => rt.block_on(handle_storage_broker(&subcmd, env)),
798 0 : NeonLocalCmd::Safekeeper(subcmd) => rt.block_on(handle_safekeeper(&subcmd, env)),
799 0 : NeonLocalCmd::EndpointStorage(subcmd) => {
800 0 : rt.block_on(handle_endpoint_storage(&subcmd, env))
801 : }
802 0 : NeonLocalCmd::Endpoint(subcmd) => rt.block_on(handle_endpoint(&subcmd, env)),
803 0 : NeonLocalCmd::Mappings(subcmd) => handle_mappings(&subcmd, env),
804 : };
805 :
806 0 : let subcommand_result = if &original_env != env {
807 0 : subcommand_result.map(|()| Some(Cow::Borrowed(env)))
808 : } else {
809 0 : subcommand_result.map(|()| None)
810 : };
811 0 : (subcommand_result, Some(_repo_lock))
812 : };
813 :
814 0 : match subcommand_result {
815 0 : Ok(Some(updated_env)) => updated_env.persist_config()?,
816 0 : Ok(None) => (),
817 0 : Err(e) => {
818 0 : eprintln!("command failed: {e:?}");
819 0 : exit(1);
820 : }
821 : }
822 0 : Ok(())
823 0 : }
824 :
825 : ///
826 : /// Prints timelines list as a tree-like structure.
827 : ///
828 0 : fn print_timelines_tree(
829 0 : timelines: Vec<TimelineInfo>,
830 0 : mut timeline_name_mappings: HashMap<TenantTimelineId, String>,
831 0 : ) -> Result<()> {
832 0 : let mut timelines_hash = timelines
833 0 : .iter()
834 0 : .map(|t| {
835 0 : (
836 0 : t.timeline_id,
837 0 : TimelineTreeEl {
838 0 : info: t.clone(),
839 0 : children: BTreeSet::new(),
840 0 : name: timeline_name_mappings
841 0 : .remove(&TenantTimelineId::new(t.tenant_id.tenant_id, t.timeline_id)),
842 0 : },
843 0 : )
844 0 : })
845 0 : .collect::<HashMap<_, _>>();
846 :
847 : // Memorize all direct children of each timeline.
848 0 : for timeline in timelines.iter() {
849 0 : if let Some(ancestor_timeline_id) = timeline.ancestor_timeline_id {
850 0 : timelines_hash
851 0 : .get_mut(&ancestor_timeline_id)
852 0 : .context("missing timeline info in the HashMap")?
853 : .children
854 0 : .insert(timeline.timeline_id);
855 0 : }
856 : }
857 :
858 0 : for timeline in timelines_hash.values() {
859 : // Start with root local timelines (no ancestors) first.
860 0 : if timeline.info.ancestor_timeline_id.is_none() {
861 0 : print_timeline(0, &Vec::from([true]), timeline, &timelines_hash)?;
862 0 : }
863 : }
864 :
865 0 : Ok(())
866 0 : }
867 :
868 : ///
869 : /// Recursively prints timeline info with all its children.
870 : ///
871 0 : fn print_timeline(
872 0 : nesting_level: usize,
873 0 : is_last: &[bool],
874 0 : timeline: &TimelineTreeEl,
875 0 : timelines: &HashMap<TimelineId, TimelineTreeEl>,
876 0 : ) -> Result<()> {
877 0 : if nesting_level > 0 {
878 0 : let ancestor_lsn = match timeline.info.ancestor_lsn {
879 0 : Some(lsn) => lsn.to_string(),
880 0 : None => "Unknown Lsn".to_string(),
881 : };
882 :
883 0 : let mut br_sym = "┣━";
884 0 :
885 0 : // Draw each nesting padding with proper style
886 0 : // depending on whether its timeline ended or not.
887 0 : if nesting_level > 1 {
888 0 : for l in &is_last[1..is_last.len() - 1] {
889 0 : if *l {
890 0 : print!(" ");
891 0 : } else {
892 0 : print!("┃ ");
893 0 : }
894 : }
895 0 : }
896 :
897 : // We are the last in this sub-timeline
898 0 : if *is_last.last().unwrap() {
899 0 : br_sym = "┗━";
900 0 : }
901 :
902 0 : print!("{} @{}: ", br_sym, ancestor_lsn);
903 0 : }
904 :
905 : // Finally print a timeline id and name with new line
906 0 : println!(
907 0 : "{} [{}]",
908 0 : timeline.name.as_deref().unwrap_or("_no_name_"),
909 0 : timeline.info.timeline_id
910 0 : );
911 0 :
912 0 : let len = timeline.children.len();
913 0 : let mut i: usize = 0;
914 0 : let mut is_last_new = Vec::from(is_last);
915 0 : is_last_new.push(false);
916 :
917 0 : for child in &timeline.children {
918 0 : i += 1;
919 0 :
920 0 : // Mark that the last padding is the end of the timeline
921 0 : if i == len {
922 0 : if let Some(last) = is_last_new.last_mut() {
923 0 : *last = true;
924 0 : }
925 0 : }
926 :
927 : print_timeline(
928 0 : nesting_level + 1,
929 0 : &is_last_new,
930 0 : timelines
931 0 : .get(child)
932 0 : .context("missing timeline info in the HashMap")?,
933 0 : timelines,
934 0 : )?;
935 : }
936 :
937 0 : Ok(())
938 0 : }
939 :
940 : /// Helper function to get tenant id from an optional --tenant_id option or from the config file
941 0 : fn get_tenant_id(
942 0 : tenant_id_arg: Option<TenantId>,
943 0 : env: &local_env::LocalEnv,
944 0 : ) -> anyhow::Result<TenantId> {
945 0 : if let Some(tenant_id_from_arguments) = tenant_id_arg {
946 0 : Ok(tenant_id_from_arguments)
947 0 : } else if let Some(default_id) = env.default_tenant_id {
948 0 : Ok(default_id)
949 : } else {
950 0 : anyhow::bail!("No tenant id. Use --tenant-id, or set a default tenant");
951 : }
952 0 : }
953 :
954 : /// Helper function to get tenant-shard ID from an optional --tenant_id option or from the config file,
955 : /// for commands that accept a shard suffix
956 0 : fn get_tenant_shard_id(
957 0 : tenant_shard_id_arg: Option<TenantShardId>,
958 0 : env: &local_env::LocalEnv,
959 0 : ) -> anyhow::Result<TenantShardId> {
960 0 : if let Some(tenant_id_from_arguments) = tenant_shard_id_arg {
961 0 : Ok(tenant_id_from_arguments)
962 0 : } else if let Some(default_id) = env.default_tenant_id {
963 0 : Ok(TenantShardId::unsharded(default_id))
964 : } else {
965 0 : anyhow::bail!("No tenant shard id. Use --tenant-id, or set a default tenant");
966 : }
967 0 : }
968 :
969 0 : fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
970 : // Create the in-memory `LocalEnv` that we'd normally load from disk in `load_config`.
971 0 : let init_conf: NeonLocalInitConf = if let Some(config_path) = &args.config {
972 : // User (likely the Python test suite) provided a description of the environment.
973 0 : if args.num_pageservers.is_some() {
974 0 : bail!(
975 0 : "Cannot specify both --num-pageservers and --config, use key `pageservers` in the --config file instead"
976 0 : );
977 0 : }
978 : // load and parse the file
979 0 : let contents = std::fs::read_to_string(config_path).with_context(|| {
980 0 : format!(
981 0 : "Could not read configuration file '{}'",
982 0 : config_path.display()
983 0 : )
984 0 : })?;
985 0 : toml_edit::de::from_str(&contents)?
986 : } else {
987 : // User (likely interactive) did not provide a description of the environment, give them the default
988 0 : NeonLocalInitConf {
989 0 : control_plane_api: Some(DEFAULT_PAGESERVER_CONTROL_PLANE_API.parse().unwrap()),
990 0 : broker: NeonBroker {
991 0 : listen_addr: DEFAULT_BROKER_ADDR.parse().unwrap(),
992 0 : },
993 0 : safekeepers: vec![SafekeeperConf {
994 0 : id: DEFAULT_SAFEKEEPER_ID,
995 0 : pg_port: DEFAULT_SAFEKEEPER_PG_PORT,
996 0 : http_port: DEFAULT_SAFEKEEPER_HTTP_PORT,
997 0 : ..Default::default()
998 0 : }],
999 0 : pageservers: (0..args.num_pageservers.unwrap_or(1))
1000 0 : .map(|i| {
1001 0 : let pageserver_id = NodeId(DEFAULT_PAGESERVER_ID.0 + i as u64);
1002 0 : let pg_port = DEFAULT_PAGESERVER_PG_PORT + i;
1003 0 : let http_port = DEFAULT_PAGESERVER_HTTP_PORT + i;
1004 0 : NeonLocalInitPageserverConf {
1005 0 : id: pageserver_id,
1006 0 : listen_pg_addr: format!("127.0.0.1:{pg_port}"),
1007 0 : listen_http_addr: format!("127.0.0.1:{http_port}"),
1008 0 : listen_https_addr: None,
1009 0 : pg_auth_type: AuthType::Trust,
1010 0 : http_auth_type: AuthType::Trust,
1011 0 : other: Default::default(),
1012 0 : // Typical developer machines use disks with slow fsync, and we don't care
1013 0 : // about data integrity: disable disk syncs.
1014 0 : no_sync: true,
1015 0 : }
1016 0 : })
1017 0 : .collect(),
1018 0 : endpoint_storage: EndpointStorageConf {
1019 0 : port: ENDPOINT_STORAGE_DEFAULT_PORT,
1020 0 : },
1021 0 : pg_distrib_dir: None,
1022 0 : neon_distrib_dir: None,
1023 0 : default_tenant_id: TenantId::from_array(std::array::from_fn(|_| 0)),
1024 0 : storage_controller: None,
1025 0 : control_plane_hooks_api: None,
1026 0 : generate_local_ssl_certs: false,
1027 0 : }
1028 : };
1029 :
1030 0 : LocalEnv::init(init_conf, &args.force)
1031 0 : .context("materialize initial neon_local environment on disk")?;
1032 0 : Ok(LocalEnv::load_config(&local_env::base_path())
1033 0 : .expect("freshly written config should be loadable"))
1034 0 : }
1035 :
1036 : /// The default pageserver is the one where CLI tenant/timeline operations are sent by default.
1037 : /// For typical interactive use, one would just run with a single pageserver. Scenarios with
1038 : /// tenant/timeline placement across multiple pageservers are managed by python test code rather
1039 : /// than this CLI.
1040 0 : fn get_default_pageserver(env: &local_env::LocalEnv) -> PageServerNode {
1041 0 : let ps_conf = env
1042 0 : .pageservers
1043 0 : .first()
1044 0 : .expect("Config is validated to contain at least one pageserver");
1045 0 : PageServerNode::from_env(env, ps_conf)
1046 0 : }
1047 :
1048 0 : async fn handle_tenant(subcmd: &TenantCmd, env: &mut local_env::LocalEnv) -> anyhow::Result<()> {
1049 0 : let pageserver = get_default_pageserver(env);
1050 0 : match subcmd {
1051 : TenantCmd::List => {
1052 0 : for t in pageserver.tenant_list().await? {
1053 0 : println!("{} {:?}", t.id, t.state);
1054 0 : }
1055 : }
1056 0 : TenantCmd::Import(args) => {
1057 0 : let tenant_id = args.tenant_id;
1058 0 :
1059 0 : let storage_controller = StorageController::from_env(env);
1060 0 : let create_response = storage_controller.tenant_import(tenant_id).await?;
1061 :
1062 0 : let shard_zero = create_response
1063 0 : .shards
1064 0 : .first()
1065 0 : .expect("Import response omitted shards");
1066 0 :
1067 0 : let attached_pageserver_id = shard_zero.node_id;
1068 0 : let pageserver =
1069 0 : PageServerNode::from_env(env, env.get_pageserver_conf(attached_pageserver_id)?);
1070 :
1071 0 : println!(
1072 0 : "Imported tenant {tenant_id}, attached to pageserver {attached_pageserver_id}"
1073 0 : );
1074 :
1075 0 : let timelines = pageserver
1076 0 : .http_client
1077 0 : .list_timelines(shard_zero.shard_id)
1078 0 : .await?;
1079 :
1080 : // Pick a 'main' timeline that has no ancestors, the rest will get arbitrary names
1081 0 : let main_timeline = timelines
1082 0 : .iter()
1083 0 : .find(|t| t.ancestor_timeline_id.is_none())
1084 0 : .expect("No timelines found")
1085 0 : .timeline_id;
1086 0 :
1087 0 : let mut branch_i = 0;
1088 0 : for timeline in timelines.iter() {
1089 0 : let branch_name = if timeline.timeline_id == main_timeline {
1090 0 : "main".to_string()
1091 : } else {
1092 0 : branch_i += 1;
1093 0 : format!("branch_{branch_i}")
1094 : };
1095 :
1096 0 : println!(
1097 0 : "Importing timeline {tenant_id}/{} as branch {branch_name}",
1098 0 : timeline.timeline_id
1099 0 : );
1100 0 :
1101 0 : env.register_branch_mapping(branch_name, tenant_id, timeline.timeline_id)?;
1102 : }
1103 : }
1104 0 : TenantCmd::Create(args) => {
1105 0 : let tenant_conf: HashMap<_, _> =
1106 0 : args.config.iter().flat_map(|c| c.split_once(':')).collect();
1107 :
1108 0 : let tenant_conf = PageServerNode::parse_config(tenant_conf)?;
1109 :
1110 : // If tenant ID was not specified, generate one
1111 0 : let tenant_id = args.tenant_id.unwrap_or_else(TenantId::generate);
1112 0 :
1113 0 : // We must register the tenant with the storage controller, so
1114 0 : // that when the pageserver restarts, it will be re-attached.
1115 0 : let storage_controller = StorageController::from_env(env);
1116 0 : storage_controller
1117 0 : .tenant_create(TenantCreateRequest {
1118 0 : // Note that ::unsharded here isn't actually because the tenant is unsharded, its because the
1119 0 : // storage controller expects a shard-naive tenant_id in this attribute, and the TenantCreateRequest
1120 0 : // type is used both in the storage controller (for creating tenants) and in the pageserver (for
1121 0 : // creating shards)
1122 0 : new_tenant_id: TenantShardId::unsharded(tenant_id),
1123 0 : generation: None,
1124 0 : shard_parameters: ShardParameters {
1125 0 : count: ShardCount::new(args.shard_count),
1126 0 : stripe_size: args
1127 0 : .shard_stripe_size
1128 0 : .map(ShardStripeSize)
1129 0 : .unwrap_or(DEFAULT_STRIPE_SIZE),
1130 0 : },
1131 0 : placement_policy: args.placement_policy.clone(),
1132 0 : config: tenant_conf,
1133 0 : })
1134 0 : .await?;
1135 0 : println!("tenant {tenant_id} successfully created on the pageserver");
1136 0 :
1137 0 : // Create an initial timeline for the new tenant
1138 0 : let new_timeline_id = args.timeline_id.unwrap_or(TimelineId::generate());
1139 0 :
1140 0 : // FIXME: passing None for ancestor_start_lsn is not kosher in a sharded world: we can't have
1141 0 : // different shards picking different start lsns. Maybe we have to teach storage controller
1142 0 : // to let shard 0 branch first and then propagate the chosen LSN to other shards.
1143 0 : storage_controller
1144 0 : .tenant_timeline_create(
1145 0 : tenant_id,
1146 0 : TimelineCreateRequest {
1147 0 : new_timeline_id,
1148 0 : mode: pageserver_api::models::TimelineCreateRequestMode::Bootstrap {
1149 0 : existing_initdb_timeline_id: None,
1150 0 : pg_version: Some(args.pg_version),
1151 0 : },
1152 0 : },
1153 0 : )
1154 0 : .await?;
1155 :
1156 0 : env.register_branch_mapping(
1157 0 : DEFAULT_BRANCH_NAME.to_string(),
1158 0 : tenant_id,
1159 0 : new_timeline_id,
1160 0 : )?;
1161 :
1162 0 : println!("Created an initial timeline '{new_timeline_id}' for tenant: {tenant_id}",);
1163 0 :
1164 0 : if args.set_default {
1165 0 : println!("Setting tenant {tenant_id} as a default one");
1166 0 : env.default_tenant_id = Some(tenant_id);
1167 0 : }
1168 : }
1169 0 : TenantCmd::SetDefault(args) => {
1170 0 : println!("Setting tenant {} as a default one", args.tenant_id);
1171 0 : env.default_tenant_id = Some(args.tenant_id);
1172 0 : }
1173 0 : TenantCmd::Config(args) => {
1174 0 : let tenant_id = get_tenant_id(args.tenant_id, env)?;
1175 0 : let tenant_conf: HashMap<_, _> =
1176 0 : args.config.iter().flat_map(|c| c.split_once(':')).collect();
1177 0 : let config = PageServerNode::parse_config(tenant_conf)?;
1178 :
1179 0 : let req = TenantConfigRequest { tenant_id, config };
1180 0 :
1181 0 : let storage_controller = StorageController::from_env(env);
1182 0 : storage_controller
1183 0 : .set_tenant_config(&req)
1184 0 : .await
1185 0 : .with_context(|| format!("Tenant config failed for tenant with id {tenant_id}"))?;
1186 0 : println!("tenant {tenant_id} successfully configured via storcon");
1187 : }
1188 : }
1189 0 : Ok(())
1190 0 : }
1191 :
1192 0 : async fn handle_timeline(cmd: &TimelineCmd, env: &mut local_env::LocalEnv) -> Result<()> {
1193 0 : let pageserver = get_default_pageserver(env);
1194 0 :
1195 0 : match cmd {
1196 0 : TimelineCmd::List(args) => {
1197 : // TODO(sharding): this command shouldn't have to specify a shard ID: we should ask the storage controller
1198 : // where shard 0 is attached, and query there.
1199 0 : let tenant_shard_id = get_tenant_shard_id(args.tenant_shard_id, env)?;
1200 0 : let timelines = pageserver.timeline_list(&tenant_shard_id).await?;
1201 0 : print_timelines_tree(timelines, env.timeline_name_mappings())?;
1202 : }
1203 0 : TimelineCmd::Create(args) => {
1204 0 : let tenant_id = get_tenant_id(args.tenant_id, env)?;
1205 0 : let new_branch_name = &args.branch_name;
1206 0 : let new_timeline_id_opt = args.timeline_id;
1207 0 : let new_timeline_id = new_timeline_id_opt.unwrap_or(TimelineId::generate());
1208 0 :
1209 0 : let storage_controller = StorageController::from_env(env);
1210 0 : let create_req = TimelineCreateRequest {
1211 0 : new_timeline_id,
1212 0 : mode: pageserver_api::models::TimelineCreateRequestMode::Bootstrap {
1213 0 : existing_initdb_timeline_id: None,
1214 0 : pg_version: Some(args.pg_version),
1215 0 : },
1216 0 : };
1217 0 : let timeline_info = storage_controller
1218 0 : .tenant_timeline_create(tenant_id, create_req)
1219 0 : .await?;
1220 :
1221 0 : let last_record_lsn = timeline_info.last_record_lsn;
1222 0 : env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?;
1223 :
1224 0 : println!(
1225 0 : "Created timeline '{}' at Lsn {last_record_lsn} for tenant: {tenant_id}",
1226 0 : timeline_info.timeline_id
1227 0 : );
1228 : }
1229 : // TODO: rename to import-basebackup-plus-wal
1230 0 : TimelineCmd::Import(args) => {
1231 0 : let tenant_id = get_tenant_id(args.tenant_id, env)?;
1232 0 : let timeline_id = args.timeline_id;
1233 0 : let branch_name = &args.branch_name;
1234 0 :
1235 0 : // Parse base inputs
1236 0 : let base = (args.base_lsn, args.base_tarfile.clone());
1237 0 :
1238 0 : // Parse pg_wal inputs
1239 0 : let wal_tarfile = args.wal_tarfile.clone();
1240 0 : let end_lsn = args.end_lsn;
1241 0 : // TODO validate both or none are provided
1242 0 : let pg_wal = end_lsn.zip(wal_tarfile);
1243 0 :
1244 0 : println!("Importing timeline into pageserver ...");
1245 0 : pageserver
1246 0 : .timeline_import(tenant_id, timeline_id, base, pg_wal, args.pg_version)
1247 0 : .await?;
1248 0 : env.register_branch_mapping(branch_name.to_string(), tenant_id, timeline_id)?;
1249 0 : println!("Done");
1250 : }
1251 0 : TimelineCmd::Branch(args) => {
1252 0 : let tenant_id = get_tenant_id(args.tenant_id, env)?;
1253 0 : let new_timeline_id = args.timeline_id.unwrap_or(TimelineId::generate());
1254 0 : let new_branch_name = &args.branch_name;
1255 0 : let ancestor_branch_name = args
1256 0 : .ancestor_branch_name
1257 0 : .clone()
1258 0 : .unwrap_or(DEFAULT_BRANCH_NAME.to_owned());
1259 0 : let ancestor_timeline_id = env
1260 0 : .get_branch_timeline_id(&ancestor_branch_name, tenant_id)
1261 0 : .ok_or_else(|| {
1262 0 : anyhow!("Found no timeline id for branch name '{ancestor_branch_name}'")
1263 0 : })?;
1264 :
1265 0 : let start_lsn = args.ancestor_start_lsn;
1266 0 : let storage_controller = StorageController::from_env(env);
1267 0 : let create_req = TimelineCreateRequest {
1268 0 : new_timeline_id,
1269 0 : mode: pageserver_api::models::TimelineCreateRequestMode::Branch {
1270 0 : ancestor_timeline_id,
1271 0 : ancestor_start_lsn: start_lsn,
1272 0 : pg_version: None,
1273 0 : },
1274 0 : };
1275 0 : let timeline_info = storage_controller
1276 0 : .tenant_timeline_create(tenant_id, create_req)
1277 0 : .await?;
1278 :
1279 0 : let last_record_lsn = timeline_info.last_record_lsn;
1280 0 :
1281 0 : env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?;
1282 :
1283 0 : println!(
1284 0 : "Created timeline '{}' at Lsn {last_record_lsn} for tenant: {tenant_id}. Ancestor timeline: '{ancestor_branch_name}'",
1285 0 : timeline_info.timeline_id
1286 0 : );
1287 : }
1288 : }
1289 :
1290 0 : Ok(())
1291 0 : }
1292 :
1293 0 : async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Result<()> {
1294 0 : let mut cplane = ComputeControlPlane::load(env.clone())?;
1295 :
1296 0 : match subcmd {
1297 0 : EndpointCmd::List(args) => {
1298 : // TODO(sharding): this command shouldn't have to specify a shard ID: we should ask the storage controller
1299 : // where shard 0 is attached, and query there.
1300 0 : let tenant_shard_id = get_tenant_shard_id(args.tenant_shard_id, env)?;
1301 :
1302 0 : let timeline_name_mappings = env.timeline_name_mappings();
1303 0 :
1304 0 : let mut table = comfy_table::Table::new();
1305 0 :
1306 0 : table.load_preset(comfy_table::presets::NOTHING);
1307 0 :
1308 0 : table.set_header([
1309 0 : "ENDPOINT",
1310 0 : "ADDRESS",
1311 0 : "TIMELINE",
1312 0 : "BRANCH NAME",
1313 0 : "LSN",
1314 0 : "STATUS",
1315 0 : ]);
1316 :
1317 0 : for (endpoint_id, endpoint) in cplane
1318 0 : .endpoints
1319 0 : .iter()
1320 0 : .filter(|(_, endpoint)| endpoint.tenant_id == tenant_shard_id.tenant_id)
1321 : {
1322 0 : let lsn_str = match endpoint.mode {
1323 0 : ComputeMode::Static(lsn) => {
1324 0 : // -> read-only endpoint
1325 0 : // Use the node's LSN.
1326 0 : lsn.to_string()
1327 : }
1328 : _ => {
1329 : // As the LSN here refers to the one that the compute is started with,
1330 : // we display nothing as it is a primary/hot standby compute.
1331 0 : "---".to_string()
1332 : }
1333 : };
1334 :
1335 0 : let branch_name = timeline_name_mappings
1336 0 : .get(&TenantTimelineId::new(
1337 0 : tenant_shard_id.tenant_id,
1338 0 : endpoint.timeline_id,
1339 0 : ))
1340 0 : .map(|name| name.as_str())
1341 0 : .unwrap_or("?");
1342 0 :
1343 0 : table.add_row([
1344 0 : endpoint_id.as_str(),
1345 0 : &endpoint.pg_address.to_string(),
1346 0 : &endpoint.timeline_id.to_string(),
1347 0 : branch_name,
1348 0 : lsn_str.as_str(),
1349 0 : &format!("{}", endpoint.status()),
1350 0 : ]);
1351 0 : }
1352 :
1353 0 : println!("{table}");
1354 : }
1355 0 : EndpointCmd::Create(args) => {
1356 0 : let tenant_id = get_tenant_id(args.tenant_id, env)?;
1357 0 : let branch_name = args
1358 0 : .branch_name
1359 0 : .clone()
1360 0 : .unwrap_or(DEFAULT_BRANCH_NAME.to_owned());
1361 0 : let endpoint_id = args
1362 0 : .endpoint_id
1363 0 : .clone()
1364 0 : .unwrap_or_else(|| format!("ep-{branch_name}"));
1365 :
1366 0 : let timeline_id = env
1367 0 : .get_branch_timeline_id(&branch_name, tenant_id)
1368 0 : .ok_or_else(|| anyhow!("Found no timeline id for branch name '{branch_name}'"))?;
1369 :
1370 0 : let mode = match (args.lsn, args.hot_standby) {
1371 0 : (Some(lsn), false) => ComputeMode::Static(lsn),
1372 0 : (None, true) => ComputeMode::Replica,
1373 0 : (None, false) => ComputeMode::Primary,
1374 0 : (Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"),
1375 : };
1376 :
1377 0 : match (mode, args.hot_standby) {
1378 : (ComputeMode::Static(_), true) => {
1379 0 : bail!(
1380 0 : "Cannot start a node in hot standby mode when it is already configured as a static replica"
1381 0 : )
1382 : }
1383 : (ComputeMode::Primary, true) => {
1384 0 : bail!(
1385 0 : "Cannot start a node as a hot standby replica, it is already configured as primary node"
1386 0 : )
1387 : }
1388 0 : _ => {}
1389 0 : }
1390 0 :
1391 0 : if !args.allow_multiple {
1392 0 : cplane.check_conflicting_endpoints(mode, tenant_id, timeline_id)?;
1393 0 : }
1394 :
1395 0 : cplane.new_endpoint(
1396 0 : &endpoint_id,
1397 0 : tenant_id,
1398 0 : timeline_id,
1399 0 : args.pg_port,
1400 0 : args.external_http_port,
1401 0 : args.internal_http_port,
1402 0 : args.pg_version,
1403 0 : mode,
1404 0 : !args.update_catalog,
1405 0 : false,
1406 0 : )?;
1407 : }
1408 0 : EndpointCmd::Start(args) => {
1409 0 : let endpoint_id = &args.endpoint_id;
1410 0 : let pageserver_id = args.endpoint_pageserver_id;
1411 0 : let remote_ext_config = &args.remote_ext_config;
1412 0 :
1413 0 : let safekeepers_generation = args.safekeepers_generation.map(SafekeeperGeneration::new);
1414 : // If --safekeepers argument is given, use only the listed
1415 : // safekeeper nodes; otherwise all from the env.
1416 0 : let safekeepers = if let Some(safekeepers) = parse_safekeepers(&args.safekeepers)? {
1417 0 : safekeepers
1418 : } else {
1419 0 : env.safekeepers.iter().map(|sk| sk.id).collect()
1420 : };
1421 :
1422 0 : let endpoint = cplane
1423 0 : .endpoints
1424 0 : .get(endpoint_id.as_str())
1425 0 : .ok_or_else(|| anyhow::anyhow!("endpoint {endpoint_id} not found"))?;
1426 :
1427 0 : if !args.allow_multiple {
1428 0 : cplane.check_conflicting_endpoints(
1429 0 : endpoint.mode,
1430 0 : endpoint.tenant_id,
1431 0 : endpoint.timeline_id,
1432 0 : )?;
1433 0 : }
1434 :
1435 0 : let (pageservers, stripe_size) = if let Some(pageserver_id) = pageserver_id {
1436 0 : let conf = env.get_pageserver_conf(pageserver_id).unwrap();
1437 0 : let parsed = parse_host_port(&conf.listen_pg_addr).expect("Bad config");
1438 0 : (
1439 0 : vec![(parsed.0, parsed.1.unwrap_or(5432))],
1440 0 : // If caller is telling us what pageserver to use, this is not a tenant which is
1441 0 : // full managed by storage controller, therefore not sharded.
1442 0 : DEFAULT_STRIPE_SIZE,
1443 0 : )
1444 : } else {
1445 : // Look up the currently attached location of the tenant, and its striping metadata,
1446 : // to pass these on to postgres.
1447 0 : let storage_controller = StorageController::from_env(env);
1448 0 : let locate_result = storage_controller.tenant_locate(endpoint.tenant_id).await?;
1449 0 : let pageservers = futures::future::try_join_all(
1450 0 : locate_result.shards.into_iter().map(|shard| async move {
1451 0 : if let ComputeMode::Static(lsn) = endpoint.mode {
1452 : // Initialize LSN leases for static computes.
1453 0 : let conf = env.get_pageserver_conf(shard.node_id).unwrap();
1454 0 : let pageserver = PageServerNode::from_env(env, conf);
1455 0 :
1456 0 : pageserver
1457 0 : .http_client
1458 0 : .timeline_init_lsn_lease(shard.shard_id, endpoint.timeline_id, lsn)
1459 0 : .await?;
1460 0 : }
1461 :
1462 0 : anyhow::Ok((
1463 0 : Host::parse(&shard.listen_pg_addr)
1464 0 : .expect("Storage controller reported bad hostname"),
1465 0 : shard.listen_pg_port,
1466 0 : ))
1467 0 : }),
1468 0 : )
1469 0 : .await?;
1470 0 : let stripe_size = locate_result.shard_params.stripe_size;
1471 0 :
1472 0 : (pageservers, stripe_size)
1473 : };
1474 0 : assert!(!pageservers.is_empty());
1475 :
1476 0 : let ps_conf = env.get_pageserver_conf(DEFAULT_PAGESERVER_ID)?;
1477 0 : let auth_token = if matches!(ps_conf.pg_auth_type, AuthType::NeonJWT) {
1478 0 : let claims = Claims::new(Some(endpoint.tenant_id), Scope::Tenant);
1479 0 :
1480 0 : Some(env.generate_auth_token(&claims)?)
1481 : } else {
1482 0 : None
1483 : };
1484 :
1485 0 : println!("Starting existing endpoint {endpoint_id}...");
1486 0 : endpoint
1487 0 : .start(
1488 0 : &auth_token,
1489 0 : safekeepers_generation,
1490 0 : safekeepers,
1491 0 : pageservers,
1492 0 : remote_ext_config.as_ref(),
1493 0 : stripe_size.0 as usize,
1494 0 : args.create_test_user,
1495 0 : args.start_timeout,
1496 0 : )
1497 0 : .await?;
1498 : }
1499 0 : EndpointCmd::Reconfigure(args) => {
1500 0 : let endpoint_id = &args.endpoint_id;
1501 0 : let endpoint = cplane
1502 0 : .endpoints
1503 0 : .get(endpoint_id.as_str())
1504 0 : .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
1505 0 : let pageservers = if let Some(ps_id) = args.endpoint_pageserver_id {
1506 0 : let pageserver = PageServerNode::from_env(env, env.get_pageserver_conf(ps_id)?);
1507 0 : vec![(
1508 0 : pageserver.pg_connection_config.host().clone(),
1509 0 : pageserver.pg_connection_config.port(),
1510 0 : )]
1511 : } else {
1512 0 : let storage_controller = StorageController::from_env(env);
1513 0 : storage_controller
1514 0 : .tenant_locate(endpoint.tenant_id)
1515 0 : .await?
1516 : .shards
1517 0 : .into_iter()
1518 0 : .map(|shard| {
1519 0 : (
1520 0 : Host::parse(&shard.listen_pg_addr)
1521 0 : .expect("Storage controller reported malformed host"),
1522 0 : shard.listen_pg_port,
1523 0 : )
1524 0 : })
1525 0 : .collect::<Vec<_>>()
1526 : };
1527 : // If --safekeepers argument is given, use only the listed
1528 : // safekeeper nodes; otherwise all from the env.
1529 0 : let safekeepers = parse_safekeepers(&args.safekeepers)?;
1530 0 : endpoint.reconfigure(pageservers, None, safekeepers).await?;
1531 : }
1532 0 : EndpointCmd::Stop(args) => {
1533 0 : let endpoint_id = &args.endpoint_id;
1534 0 : let endpoint = cplane
1535 0 : .endpoints
1536 0 : .get(endpoint_id)
1537 0 : .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
1538 0 : endpoint.stop(&args.mode, args.destroy)?;
1539 : }
1540 0 : EndpointCmd::GenerateJwt(args) => {
1541 0 : let endpoint_id = &args.endpoint_id;
1542 0 : let endpoint = cplane
1543 0 : .endpoints
1544 0 : .get(endpoint_id)
1545 0 : .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
1546 0 : let jwt = endpoint.generate_jwt()?;
1547 :
1548 0 : print!("{jwt}");
1549 : }
1550 : }
1551 :
1552 0 : Ok(())
1553 0 : }
1554 :
1555 : /// Parse --safekeepers as list of safekeeper ids.
1556 0 : fn parse_safekeepers(safekeepers_str: &Option<String>) -> Result<Option<Vec<NodeId>>> {
1557 0 : if let Some(safekeepers_str) = safekeepers_str {
1558 0 : let mut safekeepers: Vec<NodeId> = Vec::new();
1559 0 : for sk_id in safekeepers_str.split(',').map(str::trim) {
1560 0 : let sk_id = NodeId(
1561 0 : u64::from_str(sk_id)
1562 0 : .map_err(|_| anyhow!("invalid node ID \"{sk_id}\" in --safekeepers list"))?,
1563 : );
1564 0 : safekeepers.push(sk_id);
1565 : }
1566 0 : Ok(Some(safekeepers))
1567 : } else {
1568 0 : Ok(None)
1569 : }
1570 0 : }
1571 :
1572 0 : fn handle_mappings(subcmd: &MappingsCmd, env: &mut local_env::LocalEnv) -> Result<()> {
1573 0 : match subcmd {
1574 0 : MappingsCmd::Map(args) => {
1575 0 : env.register_branch_mapping(
1576 0 : args.branch_name.to_owned(),
1577 0 : args.tenant_id,
1578 0 : args.timeline_id,
1579 0 : )?;
1580 :
1581 0 : Ok(())
1582 : }
1583 : }
1584 0 : }
1585 :
1586 0 : fn get_pageserver(
1587 0 : env: &local_env::LocalEnv,
1588 0 : pageserver_id_arg: Option<NodeId>,
1589 0 : ) -> Result<PageServerNode> {
1590 0 : let node_id = pageserver_id_arg.unwrap_or(DEFAULT_PAGESERVER_ID);
1591 0 :
1592 0 : Ok(PageServerNode::from_env(
1593 0 : env,
1594 0 : env.get_pageserver_conf(node_id)?,
1595 : ))
1596 0 : }
1597 :
1598 0 : async fn handle_pageserver(subcmd: &PageserverCmd, env: &local_env::LocalEnv) -> Result<()> {
1599 0 : match subcmd {
1600 0 : PageserverCmd::Start(args) => {
1601 0 : if let Err(e) = get_pageserver(env, args.pageserver_id)?
1602 0 : .start(&args.start_timeout)
1603 0 : .await
1604 : {
1605 0 : eprintln!("pageserver start failed: {e}");
1606 0 : exit(1);
1607 0 : }
1608 : }
1609 :
1610 0 : PageserverCmd::Stop(args) => {
1611 0 : let immediate = match args.stop_mode {
1612 0 : StopMode::Fast => false,
1613 0 : StopMode::Immediate => true,
1614 : };
1615 0 : if let Err(e) = get_pageserver(env, args.pageserver_id)?.stop(immediate) {
1616 0 : eprintln!("pageserver stop failed: {}", e);
1617 0 : exit(1);
1618 0 : }
1619 : }
1620 :
1621 0 : PageserverCmd::Restart(args) => {
1622 0 : let pageserver = get_pageserver(env, args.pageserver_id)?;
1623 : //TODO what shutdown strategy should we use here?
1624 0 : if let Err(e) = pageserver.stop(false) {
1625 0 : eprintln!("pageserver stop failed: {}", e);
1626 0 : exit(1);
1627 0 : }
1628 :
1629 0 : if let Err(e) = pageserver.start(&args.start_timeout).await {
1630 0 : eprintln!("pageserver start failed: {e}");
1631 0 : exit(1);
1632 0 : }
1633 : }
1634 :
1635 0 : PageserverCmd::Status(args) => {
1636 0 : match get_pageserver(env, args.pageserver_id)?
1637 0 : .check_status()
1638 0 : .await
1639 : {
1640 0 : Ok(_) => println!("Page server is up and running"),
1641 0 : Err(err) => {
1642 0 : eprintln!("Page server is not available: {}", err);
1643 0 : exit(1);
1644 : }
1645 : }
1646 : }
1647 : }
1648 0 : Ok(())
1649 0 : }
1650 :
1651 0 : async fn handle_storage_controller(
1652 0 : subcmd: &StorageControllerCmd,
1653 0 : env: &local_env::LocalEnv,
1654 0 : ) -> Result<()> {
1655 0 : let svc = StorageController::from_env(env);
1656 0 : match subcmd {
1657 0 : StorageControllerCmd::Start(args) => {
1658 0 : let start_args = NeonStorageControllerStartArgs {
1659 0 : instance_id: args.instance_id,
1660 0 : base_port: args.base_port,
1661 0 : start_timeout: args.start_timeout,
1662 0 : };
1663 :
1664 0 : if let Err(e) = svc.start(start_args).await {
1665 0 : eprintln!("start failed: {e}");
1666 0 : exit(1);
1667 0 : }
1668 : }
1669 :
1670 0 : StorageControllerCmd::Stop(args) => {
1671 0 : let stop_args = NeonStorageControllerStopArgs {
1672 0 : instance_id: args.instance_id,
1673 0 : immediate: match args.stop_mode {
1674 0 : StopMode::Fast => false,
1675 0 : StopMode::Immediate => true,
1676 : },
1677 : };
1678 0 : if let Err(e) = svc.stop(stop_args).await {
1679 0 : eprintln!("stop failed: {}", e);
1680 0 : exit(1);
1681 0 : }
1682 : }
1683 : }
1684 0 : Ok(())
1685 0 : }
1686 :
1687 0 : fn get_safekeeper(env: &local_env::LocalEnv, id: NodeId) -> Result<SafekeeperNode> {
1688 0 : if let Some(node) = env.safekeepers.iter().find(|node| node.id == id) {
1689 0 : Ok(SafekeeperNode::from_env(env, node))
1690 : } else {
1691 0 : bail!("could not find safekeeper {id}")
1692 : }
1693 0 : }
1694 :
1695 0 : async fn handle_safekeeper(subcmd: &SafekeeperCmd, env: &local_env::LocalEnv) -> Result<()> {
1696 0 : match subcmd {
1697 0 : SafekeeperCmd::Start(args) => {
1698 0 : let safekeeper = get_safekeeper(env, args.id)?;
1699 :
1700 0 : if let Err(e) = safekeeper.start(&args.extra_opt, &args.start_timeout).await {
1701 0 : eprintln!("safekeeper start failed: {}", e);
1702 0 : exit(1);
1703 0 : }
1704 : }
1705 :
1706 0 : SafekeeperCmd::Stop(args) => {
1707 0 : let safekeeper = get_safekeeper(env, args.id)?;
1708 0 : let immediate = match args.stop_mode {
1709 0 : StopMode::Fast => false,
1710 0 : StopMode::Immediate => true,
1711 : };
1712 0 : if let Err(e) = safekeeper.stop(immediate) {
1713 0 : eprintln!("safekeeper stop failed: {}", e);
1714 0 : exit(1);
1715 0 : }
1716 : }
1717 :
1718 0 : SafekeeperCmd::Restart(args) => {
1719 0 : let safekeeper = get_safekeeper(env, args.id)?;
1720 0 : let immediate = match args.stop_mode {
1721 0 : StopMode::Fast => false,
1722 0 : StopMode::Immediate => true,
1723 : };
1724 :
1725 0 : if let Err(e) = safekeeper.stop(immediate) {
1726 0 : eprintln!("safekeeper stop failed: {}", e);
1727 0 : exit(1);
1728 0 : }
1729 :
1730 0 : if let Err(e) = safekeeper.start(&args.extra_opt, &args.start_timeout).await {
1731 0 : eprintln!("safekeeper start failed: {}", e);
1732 0 : exit(1);
1733 0 : }
1734 : }
1735 : }
1736 0 : Ok(())
1737 0 : }
1738 :
1739 0 : async fn handle_endpoint_storage(
1740 0 : subcmd: &EndpointStorageCmd,
1741 0 : env: &local_env::LocalEnv,
1742 0 : ) -> Result<()> {
1743 : use EndpointStorageCmd::*;
1744 0 : let storage = EndpointStorage::from_env(env);
1745 0 :
1746 0 : // In tests like test_forward_compatibility or test_graceful_cluster_restart
1747 0 : // old neon binaries (without endpoint_storage) are present
1748 0 : if !storage.bin.exists() {
1749 0 : eprintln!(
1750 0 : "{} binary not found. Ignore if this is a compatibility test",
1751 0 : storage.bin
1752 0 : );
1753 0 : return Ok(());
1754 0 : }
1755 0 :
1756 0 : match subcmd {
1757 0 : Start(EndpointStorageStartCmd { start_timeout }) => {
1758 0 : if let Err(e) = storage.start(start_timeout).await {
1759 0 : eprintln!("endpoint_storage start failed: {e}");
1760 0 : exit(1);
1761 0 : }
1762 : }
1763 0 : Stop(EndpointStorageStopCmd { stop_mode }) => {
1764 0 : let immediate = match stop_mode {
1765 0 : StopMode::Fast => false,
1766 0 : StopMode::Immediate => true,
1767 : };
1768 0 : if let Err(e) = storage.stop(immediate) {
1769 0 : eprintln!("proxy stop failed: {e}");
1770 0 : exit(1);
1771 0 : }
1772 : }
1773 : };
1774 0 : Ok(())
1775 0 : }
1776 :
1777 0 : async fn handle_storage_broker(subcmd: &StorageBrokerCmd, env: &local_env::LocalEnv) -> Result<()> {
1778 0 : match subcmd {
1779 0 : StorageBrokerCmd::Start(args) => {
1780 0 : if let Err(e) = broker::start_broker_process(env, &args.start_timeout).await {
1781 0 : eprintln!("broker start failed: {e}");
1782 0 : exit(1);
1783 0 : }
1784 : }
1785 :
1786 0 : StorageBrokerCmd::Stop(_args) => {
1787 : // FIXME: stop_mode unused
1788 0 : if let Err(e) = broker::stop_broker_process(env) {
1789 0 : eprintln!("broker stop failed: {e}");
1790 0 : exit(1);
1791 0 : }
1792 : }
1793 : }
1794 0 : Ok(())
1795 0 : }
1796 :
1797 0 : async fn handle_start_all(
1798 0 : args: &StartCmdArgs,
1799 0 : env: &'static local_env::LocalEnv,
1800 0 : ) -> anyhow::Result<()> {
1801 : // FIXME: this was called "retry_timeout", is it right?
1802 0 : let Err(errors) = handle_start_all_impl(env, args.timeout).await else {
1803 0 : neon_start_status_check(env, args.timeout.as_ref())
1804 0 : .await
1805 0 : .context("status check after successful startup of all services")?;
1806 0 : return Ok(());
1807 : };
1808 :
1809 0 : eprintln!("startup failed because one or more services could not be started");
1810 :
1811 0 : for e in errors {
1812 0 : eprintln!("{e}");
1813 0 : let debug_repr = format!("{e:?}");
1814 0 : for line in debug_repr.lines() {
1815 0 : eprintln!(" {line}");
1816 0 : }
1817 : }
1818 :
1819 0 : try_stop_all(env, true).await;
1820 :
1821 0 : exit(2);
1822 0 : }
1823 :
1824 : /// Returns Ok() if and only if all services could be started successfully.
1825 : /// Otherwise, returns the list of errors that occurred during startup.
1826 0 : async fn handle_start_all_impl(
1827 0 : env: &'static local_env::LocalEnv,
1828 0 : retry_timeout: humantime::Duration,
1829 0 : ) -> Result<(), Vec<anyhow::Error>> {
1830 0 : // Endpoints are not started automatically
1831 0 :
1832 0 : let mut js = JoinSet::new();
1833 :
1834 : // force infalliblity through closure
1835 : #[allow(clippy::redundant_closure_call)]
1836 0 : (|| {
1837 0 : js.spawn(async move {
1838 0 : let retry_timeout = retry_timeout;
1839 0 : broker::start_broker_process(env, &retry_timeout).await
1840 0 : });
1841 0 :
1842 0 : js.spawn(async move {
1843 0 : let storage_controller = StorageController::from_env(env);
1844 0 : storage_controller
1845 0 : .start(NeonStorageControllerStartArgs::with_default_instance_id(
1846 0 : retry_timeout,
1847 0 : ))
1848 0 : .await
1849 0 : .map_err(|e| e.context("start storage_controller"))
1850 0 : });
1851 :
1852 0 : for ps_conf in &env.pageservers {
1853 0 : js.spawn(async move {
1854 0 : let pageserver = PageServerNode::from_env(env, ps_conf);
1855 0 : pageserver
1856 0 : .start(&retry_timeout)
1857 0 : .await
1858 0 : .map_err(|e| e.context(format!("start pageserver {}", ps_conf.id)))
1859 0 : });
1860 0 : }
1861 :
1862 0 : for node in env.safekeepers.iter() {
1863 0 : js.spawn(async move {
1864 0 : let safekeeper = SafekeeperNode::from_env(env, node);
1865 0 : safekeeper
1866 0 : .start(&[], &retry_timeout)
1867 0 : .await
1868 0 : .map_err(|e| e.context(format!("start safekeeper {}", safekeeper.id)))
1869 0 : });
1870 0 : }
1871 :
1872 0 : js.spawn(async move {
1873 0 : EndpointStorage::from_env(env)
1874 0 : .start(&retry_timeout)
1875 0 : .await
1876 0 : .map_err(|e| e.context("start endpoint_storage"))
1877 0 : });
1878 0 : })();
1879 0 :
1880 0 : let mut errors = Vec::new();
1881 0 : while let Some(result) = js.join_next().await {
1882 0 : let result = result.expect("we don't panic or cancel the tasks");
1883 0 : if let Err(e) = result {
1884 0 : errors.push(e);
1885 0 : }
1886 : }
1887 :
1888 0 : if !errors.is_empty() {
1889 0 : return Err(errors);
1890 0 : }
1891 0 :
1892 0 : Ok(())
1893 0 : }
1894 :
1895 0 : async fn neon_start_status_check(
1896 0 : env: &local_env::LocalEnv,
1897 0 : retry_timeout: &Duration,
1898 0 : ) -> anyhow::Result<()> {
1899 : const RETRY_INTERVAL: Duration = Duration::from_millis(100);
1900 : const NOTICE_AFTER_RETRIES: Duration = Duration::from_secs(5);
1901 :
1902 0 : let storcon = StorageController::from_env(env);
1903 0 :
1904 0 : let retries = retry_timeout.as_millis() / RETRY_INTERVAL.as_millis();
1905 0 : let notice_after_retries = retry_timeout.as_millis() / NOTICE_AFTER_RETRIES.as_millis();
1906 0 :
1907 0 : println!("\nRunning neon status check");
1908 :
1909 0 : for retry in 0..retries {
1910 0 : if retry == notice_after_retries {
1911 0 : println!("\nNeon status check has not passed yet, continuing to wait")
1912 0 : }
1913 :
1914 0 : let mut passed = true;
1915 0 : let mut nodes = storcon.node_list().await?;
1916 0 : let mut pageservers = env.pageservers.clone();
1917 0 :
1918 0 : if nodes.len() != pageservers.len() {
1919 0 : continue;
1920 0 : }
1921 0 :
1922 0 : nodes.sort_by_key(|ps| ps.id);
1923 0 : pageservers.sort_by_key(|ps| ps.id);
1924 :
1925 0 : for (idx, pageserver) in pageservers.iter().enumerate() {
1926 0 : let node = &nodes[idx];
1927 0 : if node.id != pageserver.id {
1928 0 : passed = false;
1929 0 : break;
1930 0 : }
1931 :
1932 0 : if !matches!(node.availability, NodeAvailabilityWrapper::Active) {
1933 0 : passed = false;
1934 0 : break;
1935 0 : }
1936 : }
1937 :
1938 0 : if passed {
1939 0 : println!("\nNeon started and passed status check");
1940 0 : return Ok(());
1941 0 : }
1942 0 :
1943 0 : tokio::time::sleep(RETRY_INTERVAL).await;
1944 : }
1945 :
1946 0 : anyhow::bail!("\nNeon passed status check")
1947 0 : }
1948 :
1949 0 : async fn handle_stop_all(args: &StopCmdArgs, env: &local_env::LocalEnv) -> Result<()> {
1950 0 : let immediate = match args.mode {
1951 0 : StopMode::Fast => false,
1952 0 : StopMode::Immediate => true,
1953 : };
1954 :
1955 0 : try_stop_all(env, immediate).await;
1956 :
1957 0 : Ok(())
1958 0 : }
1959 :
1960 0 : async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
1961 0 : // Stop all endpoints
1962 0 : match ComputeControlPlane::load(env.clone()) {
1963 0 : Ok(cplane) => {
1964 0 : for (_k, node) in cplane.endpoints {
1965 0 : if let Err(e) = node.stop(if immediate { "immediate" } else { "fast" }, false) {
1966 0 : eprintln!("postgres stop failed: {e:#}");
1967 0 : }
1968 : }
1969 : }
1970 0 : Err(e) => {
1971 0 : eprintln!("postgres stop failed, could not restore control plane data from env: {e:#}")
1972 : }
1973 : }
1974 :
1975 0 : let storage = EndpointStorage::from_env(env);
1976 0 : if let Err(e) = storage.stop(immediate) {
1977 0 : eprintln!("endpoint_storage stop failed: {:#}", e);
1978 0 : }
1979 :
1980 0 : for ps_conf in &env.pageservers {
1981 0 : let pageserver = PageServerNode::from_env(env, ps_conf);
1982 0 : if let Err(e) = pageserver.stop(immediate) {
1983 0 : eprintln!("pageserver {} stop failed: {:#}", ps_conf.id, e);
1984 0 : }
1985 : }
1986 :
1987 0 : for node in env.safekeepers.iter() {
1988 0 : let safekeeper = SafekeeperNode::from_env(env, node);
1989 0 : if let Err(e) = safekeeper.stop(immediate) {
1990 0 : eprintln!("safekeeper {} stop failed: {:#}", safekeeper.id, e);
1991 0 : }
1992 : }
1993 :
1994 0 : if let Err(e) = broker::stop_broker_process(env) {
1995 0 : eprintln!("neon broker stop failed: {e:#}");
1996 0 : }
1997 :
1998 : // Stop all storage controller instances. In the most common case there's only one,
1999 : // but iterate though the base data directory in order to discover the instances.
2000 0 : let storcon_instances = env
2001 0 : .storage_controller_instances()
2002 0 : .await
2003 0 : .expect("Must inspect data dir");
2004 0 : for (instance_id, _instance_dir_path) in storcon_instances {
2005 0 : let storage_controller = StorageController::from_env(env);
2006 0 : let stop_args = NeonStorageControllerStopArgs {
2007 0 : instance_id,
2008 0 : immediate,
2009 0 : };
2010 :
2011 0 : if let Err(e) = storage_controller.stop(stop_args).await {
2012 0 : eprintln!("Storage controller instance {instance_id} stop failed: {e:#}");
2013 0 : }
2014 : }
2015 0 : }
|