Line data Source code
1 : //!
2 : //! `neon_local` is an executable that can be used to create a local
3 : //! Neon environment, for testing purposes. The local environment is
4 : //! quite different from the cloud environment with Kubernetes, but it
5 : //! easier to work with locally. The python tests in `test_runner`
6 : //! rely on `neon_local` to set up the environment for each test.
7 : //!
8 : use std::borrow::Cow;
9 : use std::collections::{BTreeSet, HashMap};
10 : use std::fs::File;
11 : use std::path::PathBuf;
12 : use std::process::exit;
13 : use std::str::FromStr;
14 : use std::time::Duration;
15 :
16 : use anyhow::{Context, Result, anyhow, bail};
17 : use clap::Parser;
18 : use compute_api::requests::ComputeClaimsScope;
19 : use compute_api::spec::{ComputeMode, PageserverProtocol};
20 : use control_plane::broker::StorageBroker;
21 : use control_plane::endpoint::{ComputeControlPlane, EndpointTerminateMode};
22 : use control_plane::endpoint::{
23 : local_pageserver_conf_to_conn_info, tenant_locate_response_to_conn_info,
24 : };
25 : use control_plane::endpoint_storage::{ENDPOINT_STORAGE_DEFAULT_ADDR, EndpointStorage};
26 : use control_plane::local_env;
27 : use control_plane::local_env::{
28 : EndpointStorageConf, InitForceMode, LocalEnv, NeonBroker, NeonLocalInitConf,
29 : NeonLocalInitPageserverConf, SafekeeperConf,
30 : };
31 : use control_plane::pageserver::PageServerNode;
32 : use control_plane::safekeeper::SafekeeperNode;
33 : use control_plane::storage_controller::{
34 : NeonStorageControllerStartArgs, NeonStorageControllerStopArgs, StorageController,
35 : };
36 : use nix::fcntl::{Flock, FlockArg};
37 : use pageserver_api::config::{
38 : DEFAULT_GRPC_LISTEN_PORT as DEFAULT_PAGESERVER_GRPC_PORT,
39 : DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
40 : DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT,
41 : };
42 : use pageserver_api::controller_api::{
43 : NodeAvailabilityWrapper, PlacementPolicy, TenantCreateRequest,
44 : };
45 : use pageserver_api::models::{
46 : ShardParameters, TenantConfigRequest, TimelineCreateRequest, TimelineInfo,
47 : };
48 : use pageserver_api::shard::{DEFAULT_STRIPE_SIZE, ShardCount, ShardStripeSize, TenantShardId};
49 : use postgres_backend::AuthType;
50 : use safekeeper_api::membership::{SafekeeperGeneration, SafekeeperId};
51 : use safekeeper_api::{
52 : DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT,
53 : DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT, PgMajorVersion, PgVersionId,
54 : };
55 : use storage_broker::DEFAULT_LISTEN_ADDR as DEFAULT_BROKER_ADDR;
56 : use tokio::task::JoinSet;
57 : use utils::auth::{Claims, Scope};
58 : use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
59 : use utils::lsn::Lsn;
60 : use utils::project_git_version;
61 :
62 : // Default id of a safekeeper node, if not specified on the command line.
63 : const DEFAULT_SAFEKEEPER_ID: NodeId = NodeId(1);
64 : const DEFAULT_PAGESERVER_ID: NodeId = NodeId(1);
65 : const DEFAULT_BRANCH_NAME: &str = "main";
66 : project_git_version!(GIT_VERSION);
67 :
68 : #[allow(dead_code)]
69 : const DEFAULT_PG_VERSION: PgMajorVersion = PgMajorVersion::PG17;
70 : const DEFAULT_PG_VERSION_NUM: &str = "17";
71 :
72 : const DEFAULT_PAGESERVER_CONTROL_PLANE_API: &str = "http://127.0.0.1:1234/upcall/v1/";
73 :
74 : #[derive(clap::Parser)]
75 : #[command(version = GIT_VERSION, about, name = "Neon CLI")]
76 : struct Cli {
77 : #[command(subcommand)]
78 : command: NeonLocalCmd,
79 : }
80 :
81 : #[derive(clap::Subcommand)]
82 : enum NeonLocalCmd {
83 : Init(InitCmdArgs),
84 :
85 : #[command(subcommand)]
86 : Tenant(TenantCmd),
87 : #[command(subcommand)]
88 : Timeline(TimelineCmd),
89 : #[command(subcommand)]
90 : Pageserver(PageserverCmd),
91 : #[command(subcommand)]
92 : #[clap(alias = "storage_controller")]
93 : StorageController(StorageControllerCmd),
94 : #[command(subcommand)]
95 : #[clap(alias = "storage_broker")]
96 : StorageBroker(StorageBrokerCmd),
97 : #[command(subcommand)]
98 : Safekeeper(SafekeeperCmd),
99 : #[command(subcommand)]
100 : EndpointStorage(EndpointStorageCmd),
101 : #[command(subcommand)]
102 : Endpoint(EndpointCmd),
103 : #[command(subcommand)]
104 : Mappings(MappingsCmd),
105 :
106 : Start(StartCmdArgs),
107 : Stop(StopCmdArgs),
108 : }
109 :
110 : #[derive(clap::Args)]
111 : #[clap(about = "Initialize a new Neon repository, preparing configs for services to start with")]
112 : struct InitCmdArgs {
113 : #[clap(long, help("How many pageservers to create (default 1)"))]
114 : num_pageservers: Option<u16>,
115 :
116 : #[clap(long)]
117 : config: Option<PathBuf>,
118 :
119 : #[clap(long, help("Force initialization even if the repository is not empty"))]
120 : #[arg(value_parser)]
121 : #[clap(default_value = "must-not-exist")]
122 : force: InitForceMode,
123 : }
124 :
125 : #[derive(clap::Args)]
126 : #[clap(about = "Start pageserver and safekeepers")]
127 : struct StartCmdArgs {
128 : #[clap(long = "start-timeout", default_value = "10s")]
129 : timeout: humantime::Duration,
130 : }
131 :
132 : #[derive(clap::Args)]
133 : #[clap(about = "Stop pageserver and safekeepers")]
134 : struct StopCmdArgs {
135 : #[arg(value_enum)]
136 : #[clap(long, default_value_t = StopMode::Fast)]
137 : mode: StopMode,
138 : }
139 :
140 : #[derive(Clone, Copy, clap::ValueEnum)]
141 : enum StopMode {
142 : Fast,
143 : Immediate,
144 : }
145 :
146 : #[derive(clap::Subcommand)]
147 : #[clap(about = "Manage tenants")]
148 : enum TenantCmd {
149 : List,
150 : Create(TenantCreateCmdArgs),
151 : SetDefault(TenantSetDefaultCmdArgs),
152 : Config(TenantConfigCmdArgs),
153 : Import(TenantImportCmdArgs),
154 : }
155 :
156 : #[derive(clap::Args)]
157 : struct TenantCreateCmdArgs {
158 : #[clap(
159 : long = "tenant-id",
160 : help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
161 : )]
162 : tenant_id: Option<TenantId>,
163 :
164 : #[clap(
165 : long,
166 : help = "Use a specific timeline id when creating a tenant and its initial timeline"
167 : )]
168 : timeline_id: Option<TimelineId>,
169 :
170 : #[clap(short = 'c')]
171 : config: Vec<String>,
172 :
173 : #[arg(default_value = DEFAULT_PG_VERSION_NUM)]
174 : #[clap(long, help = "Postgres version to use for the initial timeline")]
175 : pg_version: PgMajorVersion,
176 :
177 : #[clap(
178 : long,
179 : help = "Use this tenant in future CLI commands where tenant_id is needed, but not specified"
180 : )]
181 : set_default: bool,
182 :
183 : #[clap(long, help = "Number of shards in the new tenant")]
184 : #[arg(default_value_t = 0)]
185 : shard_count: u8,
186 : #[clap(long, help = "Sharding stripe size in pages")]
187 : shard_stripe_size: Option<u32>,
188 :
189 : #[clap(long, help = "Placement policy shards in this tenant")]
190 : #[arg(value_parser = parse_placement_policy)]
191 : placement_policy: Option<PlacementPolicy>,
192 : }
193 :
194 0 : fn parse_placement_policy(s: &str) -> anyhow::Result<PlacementPolicy> {
195 0 : Ok(serde_json::from_str::<PlacementPolicy>(s)?)
196 0 : }
197 :
198 : #[derive(clap::Args)]
199 : #[clap(
200 : about = "Set a particular tenant as default in future CLI commands where tenant_id is needed, but not specified"
201 : )]
202 : struct TenantSetDefaultCmdArgs {
203 : #[clap(
204 : long = "tenant-id",
205 : help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
206 : )]
207 : tenant_id: TenantId,
208 : }
209 :
210 : #[derive(clap::Args)]
211 : struct TenantConfigCmdArgs {
212 : #[clap(
213 : long = "tenant-id",
214 : help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
215 : )]
216 : tenant_id: Option<TenantId>,
217 :
218 : #[clap(short = 'c')]
219 : config: Vec<String>,
220 : }
221 :
222 : #[derive(clap::Args)]
223 : #[clap(
224 : about = "Import a tenant that is present in remote storage, and create branches for its timelines"
225 : )]
226 : struct TenantImportCmdArgs {
227 : #[clap(
228 : long = "tenant-id",
229 : help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
230 : )]
231 : tenant_id: TenantId,
232 : }
233 :
234 : #[derive(clap::Subcommand)]
235 : #[clap(about = "Manage timelines")]
236 : enum TimelineCmd {
237 : List(TimelineListCmdArgs),
238 : Branch(TimelineBranchCmdArgs),
239 : Create(TimelineCreateCmdArgs),
240 : Import(TimelineImportCmdArgs),
241 : }
242 :
243 : #[derive(clap::Args)]
244 : #[clap(about = "List all timelines available to this pageserver")]
245 : struct TimelineListCmdArgs {
246 : #[clap(
247 : long = "tenant-id",
248 : help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
249 : )]
250 : tenant_shard_id: Option<TenantShardId>,
251 : }
252 :
253 : #[derive(clap::Args)]
254 : #[clap(about = "Create a new timeline, branching off from another timeline")]
255 : struct TimelineBranchCmdArgs {
256 : #[clap(
257 : long = "tenant-id",
258 : help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
259 : )]
260 : tenant_id: Option<TenantId>,
261 :
262 : #[clap(long, help = "New timeline's ID")]
263 : timeline_id: Option<TimelineId>,
264 :
265 : #[clap(long, help = "Human-readable alias for the new timeline")]
266 : branch_name: String,
267 :
268 : #[clap(
269 : long,
270 : help = "Use last Lsn of another timeline (and its data) as base when creating the new timeline. The timeline gets resolved by its branch name."
271 : )]
272 : ancestor_branch_name: Option<String>,
273 :
274 : #[clap(
275 : long,
276 : help = "When using another timeline as base, use a specific Lsn in it instead of the latest one"
277 : )]
278 : ancestor_start_lsn: Option<Lsn>,
279 : }
280 :
281 : #[derive(clap::Args)]
282 : #[clap(about = "Create a new blank timeline")]
283 : struct TimelineCreateCmdArgs {
284 : #[clap(
285 : long = "tenant-id",
286 : help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
287 : )]
288 : tenant_id: Option<TenantId>,
289 :
290 : #[clap(long, help = "New timeline's ID")]
291 : timeline_id: Option<TimelineId>,
292 :
293 : #[clap(long, help = "Human-readable alias for the new timeline")]
294 : branch_name: String,
295 :
296 : #[arg(default_value = DEFAULT_PG_VERSION_NUM)]
297 : #[clap(long, help = "Postgres version")]
298 : pg_version: PgMajorVersion,
299 : }
300 :
301 : #[derive(clap::Args)]
302 : #[clap(about = "Import timeline from a basebackup directory")]
303 : struct TimelineImportCmdArgs {
304 : #[clap(
305 : long = "tenant-id",
306 : help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
307 : )]
308 : tenant_id: Option<TenantId>,
309 :
310 : #[clap(long, help = "New timeline's ID")]
311 : timeline_id: TimelineId,
312 :
313 : #[clap(long, help = "Human-readable alias for the new timeline")]
314 : branch_name: String,
315 :
316 : #[clap(long, help = "Basebackup tarfile to import")]
317 : base_tarfile: PathBuf,
318 :
319 : #[clap(long, help = "Lsn the basebackup starts at")]
320 : base_lsn: Lsn,
321 :
322 : #[clap(long, help = "Wal to add after base")]
323 : wal_tarfile: Option<PathBuf>,
324 :
325 : #[clap(long, help = "Lsn the basebackup ends at")]
326 : end_lsn: Option<Lsn>,
327 :
328 : #[arg(default_value = DEFAULT_PG_VERSION_NUM)]
329 : #[clap(long, help = "Postgres version of the backup being imported")]
330 : pg_version: PgMajorVersion,
331 : }
332 :
333 : #[derive(clap::Subcommand)]
334 : #[clap(about = "Manage pageservers")]
335 : enum PageserverCmd {
336 : Status(PageserverStatusCmdArgs),
337 : Start(PageserverStartCmdArgs),
338 : Stop(PageserverStopCmdArgs),
339 : Restart(PageserverRestartCmdArgs),
340 : }
341 :
342 : #[derive(clap::Args)]
343 : #[clap(about = "Show status of a local pageserver")]
344 : struct PageserverStatusCmdArgs {
345 : #[clap(long = "id", help = "pageserver id")]
346 : pageserver_id: Option<NodeId>,
347 : }
348 :
349 : #[derive(clap::Args)]
350 : #[clap(about = "Start local pageserver")]
351 : struct PageserverStartCmdArgs {
352 : #[clap(long = "id", help = "pageserver id")]
353 : pageserver_id: Option<NodeId>,
354 :
355 : #[clap(short = 't', long, help = "timeout until we fail the command")]
356 : #[arg(default_value = "10s")]
357 : start_timeout: humantime::Duration,
358 : }
359 :
360 : #[derive(clap::Args)]
361 : #[clap(about = "Stop local pageserver")]
362 : struct PageserverStopCmdArgs {
363 : #[clap(long = "id", help = "pageserver id")]
364 : pageserver_id: Option<NodeId>,
365 :
366 : #[clap(
367 : short = 'm',
368 : help = "If 'immediate', don't flush repository data at shutdown"
369 : )]
370 : #[arg(value_enum, default_value = "fast")]
371 : stop_mode: StopMode,
372 : }
373 :
374 : #[derive(clap::Args)]
375 : #[clap(about = "Restart local pageserver")]
376 : struct PageserverRestartCmdArgs {
377 : #[clap(long = "id", help = "pageserver id")]
378 : pageserver_id: Option<NodeId>,
379 :
380 : #[clap(short = 't', long, help = "timeout until we fail the command")]
381 : #[arg(default_value = "10s")]
382 : start_timeout: humantime::Duration,
383 : }
384 :
385 : #[derive(clap::Subcommand)]
386 : #[clap(about = "Manage storage controller")]
387 : enum StorageControllerCmd {
388 : Start(StorageControllerStartCmdArgs),
389 : Stop(StorageControllerStopCmdArgs),
390 : }
391 :
392 : #[derive(clap::Args)]
393 : #[clap(about = "Start storage controller")]
394 : struct StorageControllerStartCmdArgs {
395 : #[clap(short = 't', long, help = "timeout until we fail the command")]
396 : #[arg(default_value = "10s")]
397 : start_timeout: humantime::Duration,
398 :
399 : #[clap(
400 : long,
401 : help = "Identifier used to distinguish storage controller instances"
402 : )]
403 : #[arg(default_value_t = 1)]
404 : instance_id: u8,
405 :
406 : #[clap(
407 : long,
408 : help = "Base port for the storage controller instance idenfified by instance-id (defaults to pageserver cplane api)"
409 : )]
410 : base_port: Option<u16>,
411 :
412 : #[clap(
413 : long,
414 : help = "Whether the storage controller should handle pageserver-reported local disk loss events."
415 : )]
416 : handle_ps_local_disk_loss: Option<bool>,
417 : }
418 :
419 : #[derive(clap::Args)]
420 : #[clap(about = "Stop storage controller")]
421 : struct StorageControllerStopCmdArgs {
422 : #[clap(
423 : short = 'm',
424 : help = "If 'immediate', don't flush repository data at shutdown"
425 : )]
426 : #[arg(value_enum, default_value = "fast")]
427 : stop_mode: StopMode,
428 :
429 : #[clap(
430 : long,
431 : help = "Identifier used to distinguish storage controller instances"
432 : )]
433 : #[arg(default_value_t = 1)]
434 : instance_id: u8,
435 : }
436 :
437 : #[derive(clap::Subcommand)]
438 : #[clap(about = "Manage storage broker")]
439 : enum StorageBrokerCmd {
440 : Start(StorageBrokerStartCmdArgs),
441 : Stop(StorageBrokerStopCmdArgs),
442 : }
443 :
444 : #[derive(clap::Args)]
445 : #[clap(about = "Start broker")]
446 : struct StorageBrokerStartCmdArgs {
447 : #[clap(short = 't', long, help = "timeout until we fail the command")]
448 : #[arg(default_value = "10s")]
449 : start_timeout: humantime::Duration,
450 : }
451 :
452 : #[derive(clap::Args)]
453 : #[clap(about = "stop broker")]
454 : struct StorageBrokerStopCmdArgs {
455 : #[clap(
456 : short = 'm',
457 : help = "If 'immediate', don't flush repository data at shutdown"
458 : )]
459 : #[arg(value_enum, default_value = "fast")]
460 : stop_mode: StopMode,
461 : }
462 :
463 : #[derive(clap::Subcommand)]
464 : #[clap(about = "Manage safekeepers")]
465 : enum SafekeeperCmd {
466 : Start(SafekeeperStartCmdArgs),
467 : Stop(SafekeeperStopCmdArgs),
468 : Restart(SafekeeperRestartCmdArgs),
469 : }
470 :
471 : #[derive(clap::Subcommand)]
472 : #[clap(about = "Manage object storage")]
473 : enum EndpointStorageCmd {
474 : Start(EndpointStorageStartCmd),
475 : Stop(EndpointStorageStopCmd),
476 : }
477 :
478 : #[derive(clap::Args)]
479 : #[clap(about = "Start object storage")]
480 : struct EndpointStorageStartCmd {
481 : #[clap(short = 't', long, help = "timeout until we fail the command")]
482 : #[arg(default_value = "10s")]
483 : start_timeout: humantime::Duration,
484 : }
485 :
486 : #[derive(clap::Args)]
487 : #[clap(about = "Stop object storage")]
488 : struct EndpointStorageStopCmd {
489 : #[arg(value_enum, default_value = "fast")]
490 : #[clap(
491 : short = 'm',
492 : help = "If 'immediate', don't flush repository data at shutdown"
493 : )]
494 : stop_mode: StopMode,
495 : }
496 :
497 : #[derive(clap::Args)]
498 : #[clap(about = "Start local safekeeper")]
499 : struct SafekeeperStartCmdArgs {
500 : #[clap(help = "safekeeper id")]
501 : #[arg(default_value_t = NodeId(1))]
502 : id: NodeId,
503 :
504 : #[clap(
505 : short = 'e',
506 : long = "safekeeper-extra-opt",
507 : help = "Additional safekeeper invocation options, e.g. -e=--http-auth-public-key-path=foo"
508 : )]
509 : extra_opt: Vec<String>,
510 :
511 : #[clap(short = 't', long, help = "timeout until we fail the command")]
512 : #[arg(default_value = "10s")]
513 : start_timeout: humantime::Duration,
514 : }
515 :
516 : #[derive(clap::Args)]
517 : #[clap(about = "Stop local safekeeper")]
518 : struct SafekeeperStopCmdArgs {
519 : #[clap(help = "safekeeper id")]
520 : #[arg(default_value_t = NodeId(1))]
521 : id: NodeId,
522 :
523 : #[arg(value_enum, default_value = "fast")]
524 : #[clap(
525 : short = 'm',
526 : help = "If 'immediate', don't flush repository data at shutdown"
527 : )]
528 : stop_mode: StopMode,
529 : }
530 :
531 : #[derive(clap::Args)]
532 : #[clap(about = "Restart local safekeeper")]
533 : struct SafekeeperRestartCmdArgs {
534 : #[clap(help = "safekeeper id")]
535 : #[arg(default_value_t = NodeId(1))]
536 : id: NodeId,
537 :
538 : #[arg(value_enum, default_value = "fast")]
539 : #[clap(
540 : short = 'm',
541 : help = "If 'immediate', don't flush repository data at shutdown"
542 : )]
543 : stop_mode: StopMode,
544 :
545 : #[clap(
546 : short = 'e',
547 : long = "safekeeper-extra-opt",
548 : help = "Additional safekeeper invocation options, e.g. -e=--http-auth-public-key-path=foo"
549 : )]
550 : extra_opt: Vec<String>,
551 :
552 : #[clap(short = 't', long, help = "timeout until we fail the command")]
553 : #[arg(default_value = "10s")]
554 : start_timeout: humantime::Duration,
555 : }
556 :
557 : #[derive(clap::Subcommand)]
558 : #[clap(about = "Manage Postgres instances")]
559 : enum EndpointCmd {
560 : List(EndpointListCmdArgs),
561 : Create(EndpointCreateCmdArgs),
562 : Start(EndpointStartCmdArgs),
563 : Reconfigure(EndpointReconfigureCmdArgs),
564 : RefreshConfiguration(EndpointRefreshConfigurationArgs),
565 : Stop(EndpointStopCmdArgs),
566 : UpdatePageservers(EndpointUpdatePageserversCmdArgs),
567 : GenerateJwt(EndpointGenerateJwtCmdArgs),
568 : }
569 :
570 : #[derive(clap::Args)]
571 : #[clap(about = "List endpoints")]
572 : struct EndpointListCmdArgs {
573 : #[clap(
574 : long = "tenant-id",
575 : help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
576 : )]
577 : tenant_shard_id: Option<TenantShardId>,
578 : }
579 :
580 : #[derive(clap::Args)]
581 : #[clap(about = "Create a compute endpoint")]
582 : struct EndpointCreateCmdArgs {
583 : #[clap(
584 : long = "tenant-id",
585 : help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
586 : )]
587 : tenant_id: Option<TenantId>,
588 :
589 : #[clap(help = "Postgres endpoint id")]
590 : endpoint_id: Option<String>,
591 : #[clap(long, help = "Name of the branch the endpoint will run on")]
592 : branch_name: Option<String>,
593 : #[clap(
594 : long,
595 : help = "Specify Lsn on the timeline to start from. By default, end of the timeline would be used"
596 : )]
597 : lsn: Option<Lsn>,
598 : #[clap(long)]
599 : pg_port: Option<u16>,
600 : #[clap(long, alias = "http-port")]
601 : external_http_port: Option<u16>,
602 : #[clap(long)]
603 : internal_http_port: Option<u16>,
604 : #[clap(long = "pageserver-id")]
605 : endpoint_pageserver_id: Option<NodeId>,
606 :
607 : #[clap(
608 : long,
609 : help = "Don't do basebackup, create endpoint directory with only config files",
610 : action = clap::ArgAction::Set,
611 : default_value_t = false
612 : )]
613 : config_only: bool,
614 :
615 : #[arg(default_value = DEFAULT_PG_VERSION_NUM)]
616 : #[clap(long, help = "Postgres version")]
617 : pg_version: PgMajorVersion,
618 :
619 : /// Use gRPC to communicate with Pageservers, by generating grpc:// connstrings.
620 : ///
621 : /// Specified on creation such that it's retained across reconfiguration and restarts.
622 : ///
623 : /// NB: not yet supported by computes.
624 : #[clap(long)]
625 : grpc: bool,
626 :
627 : #[clap(
628 : long,
629 : help = "If set, the node will be a hot replica on the specified timeline",
630 : action = clap::ArgAction::Set,
631 : default_value_t = false
632 : )]
633 : hot_standby: bool,
634 :
635 : #[clap(long, help = "If set, will set up the catalog for neon_superuser")]
636 : update_catalog: bool,
637 :
638 : #[clap(
639 : long,
640 : help = "Allow multiple primary endpoints running on the same branch. Shouldn't be used normally, but useful for tests."
641 : )]
642 : allow_multiple: bool,
643 :
644 : /// Only allow changing it on creation
645 : #[clap(long, help = "Name of the privileged role for the endpoint")]
646 : privileged_role_name: Option<String>,
647 : }
648 :
649 : #[derive(clap::Args)]
650 : #[clap(about = "Start postgres. If the endpoint doesn't exist yet, it is created.")]
651 : struct EndpointStartCmdArgs {
652 : #[clap(help = "Postgres endpoint id")]
653 : endpoint_id: String,
654 : #[clap(long = "pageserver-id")]
655 : endpoint_pageserver_id: Option<NodeId>,
656 :
657 : #[clap(
658 : long,
659 : help = "Safekeepers membership generation to prefix neon.safekeepers with. Normally neon_local sets it on its own, but this option allows to override. Non zero value forces endpoint to use membership configurations."
660 : )]
661 : safekeepers_generation: Option<u32>,
662 : #[clap(
663 : long,
664 : help = "List of safekeepers endpoint will talk to. Normally neon_local chooses them on its own, but this option allows to override."
665 : )]
666 : safekeepers: Option<String>,
667 :
668 : #[clap(
669 : long,
670 : help = "Configure the remote extensions storage proxy gateway URL to request for extensions.",
671 : alias = "remote-ext-config"
672 : )]
673 : remote_ext_base_url: Option<String>,
674 :
675 : #[clap(
676 : long,
677 : help = "If set, will create test user `user` and `neondb` database. Requires `update-catalog = true`"
678 : )]
679 : create_test_user: bool,
680 :
681 : #[clap(
682 : long,
683 : help = "Allow multiple primary endpoints running on the same branch. Shouldn't be used normally, but useful for tests."
684 : )]
685 : allow_multiple: bool,
686 :
687 : #[clap(short = 't', long, value_parser= humantime::parse_duration, help = "timeout until we fail the command")]
688 : #[arg(default_value = "90s")]
689 : start_timeout: Duration,
690 :
691 : #[clap(
692 : long,
693 : help = "Download LFC cache from endpoint storage on endpoint startup",
694 : default_value = "false"
695 : )]
696 : autoprewarm: bool,
697 :
698 : #[clap(long, help = "Upload LFC cache to endpoint storage periodically")]
699 : offload_lfc_interval_seconds: Option<std::num::NonZeroU64>,
700 :
701 : #[clap(
702 : long,
703 : help = "Run in development mode, skipping VM-specific operations like process termination",
704 : action = clap::ArgAction::SetTrue
705 : )]
706 : dev: bool,
707 : }
708 :
709 : #[derive(clap::Args)]
710 : #[clap(about = "Reconfigure an endpoint")]
711 : struct EndpointReconfigureCmdArgs {
712 : #[clap(
713 : long = "tenant-id",
714 : help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
715 : )]
716 : tenant_id: Option<TenantId>,
717 :
718 : #[clap(help = "Postgres endpoint id")]
719 : endpoint_id: String,
720 : #[clap(long = "pageserver-id")]
721 : endpoint_pageserver_id: Option<NodeId>,
722 :
723 : #[clap(long)]
724 : safekeepers: Option<String>,
725 : }
726 :
727 : #[derive(clap::Args)]
728 : #[clap(about = "Refresh the endpoint's configuration by forcing it reload it's spec")]
729 : struct EndpointRefreshConfigurationArgs {
730 : #[clap(help = "Postgres endpoint id")]
731 : endpoint_id: String,
732 : }
733 :
734 : #[derive(clap::Args)]
735 : #[clap(about = "Stop an endpoint")]
736 : struct EndpointStopCmdArgs {
737 : #[clap(help = "Postgres endpoint id")]
738 : endpoint_id: String,
739 :
740 : #[clap(
741 : long,
742 : help = "Also delete data directory (now optional, should be default in future)"
743 : )]
744 : destroy: bool,
745 :
746 : #[clap(long, help = "Postgres shutdown mode")]
747 : #[clap(default_value = "fast")]
748 : mode: EndpointTerminateMode,
749 : }
750 :
751 : #[derive(clap::Args)]
752 : #[clap(about = "Update the pageservers in the spec file of the compute endpoint")]
753 : struct EndpointUpdatePageserversCmdArgs {
754 : #[clap(help = "Postgres endpoint id")]
755 : endpoint_id: String,
756 :
757 : #[clap(short = 'p', long, help = "Specified pageserver id")]
758 : pageserver_id: Option<NodeId>,
759 : }
760 :
761 : #[derive(clap::Args)]
762 : #[clap(about = "Generate a JWT for an endpoint")]
763 : struct EndpointGenerateJwtCmdArgs {
764 : #[clap(help = "Postgres endpoint id")]
765 : endpoint_id: String,
766 :
767 : #[clap(short = 's', long, help = "Scope to generate the JWT with", value_parser = ComputeClaimsScope::from_str)]
768 : scope: Option<ComputeClaimsScope>,
769 : }
770 :
771 : #[derive(clap::Subcommand)]
772 : #[clap(about = "Manage neon_local branch name mappings")]
773 : enum MappingsCmd {
774 : Map(MappingsMapCmdArgs),
775 : }
776 :
777 : #[derive(clap::Args)]
778 : #[clap(about = "Create new mapping which cannot exist already")]
779 : struct MappingsMapCmdArgs {
780 : #[clap(
781 : long,
782 : help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
783 : )]
784 : tenant_id: TenantId,
785 : #[clap(
786 : long,
787 : help = "Timeline id. Represented as a hexadecimal string 32 symbols length"
788 : )]
789 : timeline_id: TimelineId,
790 : #[clap(long, help = "Branch name to give to the timeline")]
791 : branch_name: String,
792 : }
793 :
794 : ///
795 : /// Timelines tree element used as a value in the HashMap.
796 : ///
797 : struct TimelineTreeEl {
798 : /// `TimelineInfo` received from the `pageserver` via the `timeline_list` http API call.
799 : pub info: TimelineInfo,
800 : /// Name, recovered from neon config mappings
801 : pub name: Option<String>,
802 : /// Holds all direct children of this timeline referenced using `timeline_id`.
803 : pub children: BTreeSet<TimelineId>,
804 : }
805 :
806 : /// A flock-based guard over the neon_local repository directory
807 : struct RepoLock {
808 : _file: Flock<File>,
809 : }
810 :
811 : impl RepoLock {
812 0 : fn new() -> Result<Self> {
813 0 : let repo_dir = File::open(local_env::base_path())?;
814 0 : match Flock::lock(repo_dir, FlockArg::LockExclusive) {
815 0 : Ok(f) => Ok(Self { _file: f }),
816 0 : Err((_, e)) => Err(e).context("flock error"),
817 : }
818 0 : }
819 : }
820 :
821 : // Main entry point for the 'neon_local' CLI utility
822 : //
823 : // This utility helps to manage neon installation. That includes following:
824 : // * Management of local postgres installations running on top of the
825 : // pageserver.
826 : // * Providing CLI api to the pageserver
827 : // * TODO: export/import to/from usual postgres
828 0 : fn main() -> Result<()> {
829 0 : let cli = Cli::parse();
830 :
831 : // Check for 'neon init' command first.
832 0 : let (subcommand_result, _lock) = if let NeonLocalCmd::Init(args) = cli.command {
833 0 : (handle_init(&args).map(|env| Some(Cow::Owned(env))), None)
834 : } else {
835 : // This tool uses a collection of simple files to store its state, and consequently
836 : // it is not generally safe to run multiple commands concurrently. Rather than expect
837 : // all callers to know this, use a lock file to protect against concurrent execution.
838 0 : let _repo_lock = RepoLock::new().unwrap();
839 :
840 : // all other commands need an existing config
841 0 : let env = LocalEnv::load_config(&local_env::base_path()).context("Error loading config")?;
842 0 : let original_env = env.clone();
843 0 : let env = Box::leak(Box::new(env));
844 0 : let rt = tokio::runtime::Builder::new_current_thread()
845 0 : .enable_all()
846 0 : .build()
847 0 : .unwrap();
848 :
849 0 : let subcommand_result = match cli.command {
850 0 : NeonLocalCmd::Init(_) => unreachable!("init was handled earlier already"),
851 0 : NeonLocalCmd::Start(args) => rt.block_on(handle_start_all(&args, env)),
852 0 : NeonLocalCmd::Stop(args) => rt.block_on(handle_stop_all(&args, env)),
853 0 : NeonLocalCmd::Tenant(subcmd) => rt.block_on(handle_tenant(&subcmd, env)),
854 0 : NeonLocalCmd::Timeline(subcmd) => rt.block_on(handle_timeline(&subcmd, env)),
855 0 : NeonLocalCmd::Pageserver(subcmd) => rt.block_on(handle_pageserver(&subcmd, env)),
856 0 : NeonLocalCmd::StorageController(subcmd) => {
857 0 : rt.block_on(handle_storage_controller(&subcmd, env))
858 : }
859 0 : NeonLocalCmd::StorageBroker(subcmd) => rt.block_on(handle_storage_broker(&subcmd, env)),
860 0 : NeonLocalCmd::Safekeeper(subcmd) => rt.block_on(handle_safekeeper(&subcmd, env)),
861 0 : NeonLocalCmd::EndpointStorage(subcmd) => {
862 0 : rt.block_on(handle_endpoint_storage(&subcmd, env))
863 : }
864 0 : NeonLocalCmd::Endpoint(subcmd) => rt.block_on(handle_endpoint(&subcmd, env)),
865 0 : NeonLocalCmd::Mappings(subcmd) => handle_mappings(&subcmd, env),
866 : };
867 :
868 0 : let subcommand_result = if &original_env != env {
869 0 : subcommand_result.map(|()| Some(Cow::Borrowed(env)))
870 : } else {
871 0 : subcommand_result.map(|()| None)
872 : };
873 0 : (subcommand_result, Some(_repo_lock))
874 : };
875 :
876 0 : match subcommand_result {
877 0 : Ok(Some(updated_env)) => updated_env.persist_config()?,
878 0 : Ok(None) => (),
879 0 : Err(e) => {
880 0 : eprintln!("command failed: {e:?}");
881 0 : exit(1);
882 : }
883 : }
884 0 : Ok(())
885 0 : }
886 :
887 : ///
888 : /// Prints timelines list as a tree-like structure.
889 : ///
890 0 : fn print_timelines_tree(
891 0 : timelines: Vec<TimelineInfo>,
892 0 : mut timeline_name_mappings: HashMap<TenantTimelineId, String>,
893 0 : ) -> Result<()> {
894 0 : let mut timelines_hash = timelines
895 0 : .iter()
896 0 : .map(|t| {
897 0 : (
898 0 : t.timeline_id,
899 0 : TimelineTreeEl {
900 0 : info: t.clone(),
901 0 : children: BTreeSet::new(),
902 0 : name: timeline_name_mappings
903 0 : .remove(&TenantTimelineId::new(t.tenant_id.tenant_id, t.timeline_id)),
904 0 : },
905 0 : )
906 0 : })
907 0 : .collect::<HashMap<_, _>>();
908 :
909 : // Memorize all direct children of each timeline.
910 0 : for timeline in timelines.iter() {
911 0 : if let Some(ancestor_timeline_id) = timeline.ancestor_timeline_id {
912 0 : timelines_hash
913 0 : .get_mut(&ancestor_timeline_id)
914 0 : .context("missing timeline info in the HashMap")?
915 : .children
916 0 : .insert(timeline.timeline_id);
917 0 : }
918 : }
919 :
920 0 : for timeline in timelines_hash.values() {
921 : // Start with root local timelines (no ancestors) first.
922 0 : if timeline.info.ancestor_timeline_id.is_none() {
923 0 : print_timeline(0, &Vec::from([true]), timeline, &timelines_hash)?;
924 0 : }
925 : }
926 :
927 0 : Ok(())
928 0 : }
929 :
930 : ///
931 : /// Recursively prints timeline info with all its children.
932 : ///
933 0 : fn print_timeline(
934 0 : nesting_level: usize,
935 0 : is_last: &[bool],
936 0 : timeline: &TimelineTreeEl,
937 0 : timelines: &HashMap<TimelineId, TimelineTreeEl>,
938 0 : ) -> Result<()> {
939 0 : if nesting_level > 0 {
940 0 : let ancestor_lsn = match timeline.info.ancestor_lsn {
941 0 : Some(lsn) => lsn.to_string(),
942 0 : None => "Unknown Lsn".to_string(),
943 : };
944 :
945 0 : let mut br_sym = "┣━";
946 :
947 : // Draw each nesting padding with proper style
948 : // depending on whether its timeline ended or not.
949 0 : if nesting_level > 1 {
950 0 : for l in &is_last[1..is_last.len() - 1] {
951 0 : if *l {
952 0 : print!(" ");
953 0 : } else {
954 0 : print!("┃ ");
955 0 : }
956 : }
957 0 : }
958 :
959 : // We are the last in this sub-timeline
960 0 : if *is_last.last().unwrap() {
961 0 : br_sym = "┗━";
962 0 : }
963 :
964 0 : print!("{br_sym} @{ancestor_lsn}: ");
965 0 : }
966 :
967 : // Finally print a timeline id and name with new line
968 0 : println!(
969 0 : "{} [{}]",
970 0 : timeline.name.as_deref().unwrap_or("_no_name_"),
971 : timeline.info.timeline_id
972 : );
973 :
974 0 : let len = timeline.children.len();
975 0 : let mut i: usize = 0;
976 0 : let mut is_last_new = Vec::from(is_last);
977 0 : is_last_new.push(false);
978 :
979 0 : for child in &timeline.children {
980 0 : i += 1;
981 :
982 : // Mark that the last padding is the end of the timeline
983 0 : if i == len {
984 0 : if let Some(last) = is_last_new.last_mut() {
985 0 : *last = true;
986 0 : }
987 0 : }
988 :
989 0 : print_timeline(
990 0 : nesting_level + 1,
991 0 : &is_last_new,
992 0 : timelines
993 0 : .get(child)
994 0 : .context("missing timeline info in the HashMap")?,
995 0 : timelines,
996 0 : )?;
997 : }
998 :
999 0 : Ok(())
1000 0 : }
1001 :
1002 : /// Helper function to get tenant id from an optional --tenant_id option or from the config file
1003 0 : fn get_tenant_id(
1004 0 : tenant_id_arg: Option<TenantId>,
1005 0 : env: &local_env::LocalEnv,
1006 0 : ) -> anyhow::Result<TenantId> {
1007 0 : if let Some(tenant_id_from_arguments) = tenant_id_arg {
1008 0 : Ok(tenant_id_from_arguments)
1009 0 : } else if let Some(default_id) = env.default_tenant_id {
1010 0 : Ok(default_id)
1011 : } else {
1012 0 : anyhow::bail!("No tenant id. Use --tenant-id, or set a default tenant");
1013 : }
1014 0 : }
1015 :
1016 : /// Helper function to get tenant-shard ID from an optional --tenant_id option or from the config file,
1017 : /// for commands that accept a shard suffix
1018 0 : fn get_tenant_shard_id(
1019 0 : tenant_shard_id_arg: Option<TenantShardId>,
1020 0 : env: &local_env::LocalEnv,
1021 0 : ) -> anyhow::Result<TenantShardId> {
1022 0 : if let Some(tenant_id_from_arguments) = tenant_shard_id_arg {
1023 0 : Ok(tenant_id_from_arguments)
1024 0 : } else if let Some(default_id) = env.default_tenant_id {
1025 0 : Ok(TenantShardId::unsharded(default_id))
1026 : } else {
1027 0 : anyhow::bail!("No tenant shard id. Use --tenant-id, or set a default tenant");
1028 : }
1029 0 : }
1030 :
1031 0 : fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
1032 : // Create the in-memory `LocalEnv` that we'd normally load from disk in `load_config`.
1033 0 : let init_conf: NeonLocalInitConf = if let Some(config_path) = &args.config {
1034 : // User (likely the Python test suite) provided a description of the environment.
1035 0 : if args.num_pageservers.is_some() {
1036 0 : bail!(
1037 0 : "Cannot specify both --num-pageservers and --config, use key `pageservers` in the --config file instead"
1038 : );
1039 0 : }
1040 : // load and parse the file
1041 0 : let contents = std::fs::read_to_string(config_path).with_context(|| {
1042 0 : format!(
1043 0 : "Could not read configuration file '{}'",
1044 0 : config_path.display()
1045 : )
1046 0 : })?;
1047 0 : toml_edit::de::from_str(&contents)?
1048 : } else {
1049 : // User (likely interactive) did not provide a description of the environment, give them the default
1050 : NeonLocalInitConf {
1051 0 : control_plane_api: Some(DEFAULT_PAGESERVER_CONTROL_PLANE_API.parse().unwrap()),
1052 0 : auth_token_type: AuthType::NeonJWT,
1053 0 : broker: NeonBroker {
1054 0 : listen_addr: Some(DEFAULT_BROKER_ADDR.parse().unwrap()),
1055 0 : listen_https_addr: None,
1056 0 : },
1057 0 : safekeepers: vec![SafekeeperConf {
1058 0 : id: DEFAULT_SAFEKEEPER_ID,
1059 0 : pg_port: DEFAULT_SAFEKEEPER_PG_PORT,
1060 0 : http_port: DEFAULT_SAFEKEEPER_HTTP_PORT,
1061 0 : ..Default::default()
1062 0 : }],
1063 0 : pageservers: (0..args.num_pageservers.unwrap_or(1))
1064 0 : .map(|i| {
1065 0 : let pageserver_id = NodeId(DEFAULT_PAGESERVER_ID.0 + i as u64);
1066 0 : let pg_port = DEFAULT_PAGESERVER_PG_PORT + i;
1067 0 : let http_port = DEFAULT_PAGESERVER_HTTP_PORT + i;
1068 0 : let grpc_port = DEFAULT_PAGESERVER_GRPC_PORT + i;
1069 0 : NeonLocalInitPageserverConf {
1070 0 : id: pageserver_id,
1071 0 : listen_pg_addr: format!("127.0.0.1:{pg_port}"),
1072 0 : listen_http_addr: format!("127.0.0.1:{http_port}"),
1073 0 : listen_https_addr: None,
1074 0 : listen_grpc_addr: Some(format!("127.0.0.1:{grpc_port}")),
1075 0 : pg_auth_type: AuthType::Trust,
1076 0 : http_auth_type: AuthType::Trust,
1077 0 : grpc_auth_type: AuthType::Trust,
1078 0 : other: Default::default(),
1079 0 : // Typical developer machines use disks with slow fsync, and we don't care
1080 0 : // about data integrity: disable disk syncs.
1081 0 : no_sync: true,
1082 0 : }
1083 0 : })
1084 0 : .collect(),
1085 0 : endpoint_storage: EndpointStorageConf {
1086 0 : listen_addr: ENDPOINT_STORAGE_DEFAULT_ADDR,
1087 0 : },
1088 0 : pg_distrib_dir: None,
1089 0 : neon_distrib_dir: None,
1090 0 : default_tenant_id: TenantId::from_array(std::array::from_fn(|_| 0)),
1091 0 : storage_controller: None,
1092 0 : control_plane_hooks_api: None,
1093 : generate_local_ssl_certs: false,
1094 : }
1095 : };
1096 :
1097 0 : LocalEnv::init(init_conf, &args.force)
1098 0 : .context("materialize initial neon_local environment on disk")?;
1099 0 : Ok(LocalEnv::load_config(&local_env::base_path())
1100 0 : .expect("freshly written config should be loadable"))
1101 0 : }
1102 :
1103 : /// The default pageserver is the one where CLI tenant/timeline operations are sent by default.
1104 : /// For typical interactive use, one would just run with a single pageserver. Scenarios with
1105 : /// tenant/timeline placement across multiple pageservers are managed by python test code rather
1106 : /// than this CLI.
1107 0 : fn get_default_pageserver(env: &local_env::LocalEnv) -> PageServerNode {
1108 0 : let ps_conf = env
1109 0 : .pageservers
1110 0 : .first()
1111 0 : .expect("Config is validated to contain at least one pageserver");
1112 0 : PageServerNode::from_env(env, ps_conf)
1113 0 : }
1114 :
1115 0 : async fn handle_tenant(subcmd: &TenantCmd, env: &mut local_env::LocalEnv) -> anyhow::Result<()> {
1116 0 : let pageserver = get_default_pageserver(env);
1117 0 : match subcmd {
1118 : TenantCmd::List => {
1119 0 : for t in pageserver.tenant_list().await? {
1120 0 : println!("{} {:?}", t.id, t.state);
1121 0 : }
1122 : }
1123 0 : TenantCmd::Import(args) => {
1124 0 : let tenant_id = args.tenant_id;
1125 :
1126 0 : let storage_controller = StorageController::from_env(env);
1127 0 : let create_response = storage_controller.tenant_import(tenant_id).await?;
1128 :
1129 0 : let shard_zero = create_response
1130 0 : .shards
1131 0 : .first()
1132 0 : .expect("Import response omitted shards");
1133 :
1134 0 : let attached_pageserver_id = shard_zero.node_id;
1135 0 : let pageserver =
1136 0 : PageServerNode::from_env(env, env.get_pageserver_conf(attached_pageserver_id)?);
1137 :
1138 0 : println!(
1139 0 : "Imported tenant {tenant_id}, attached to pageserver {attached_pageserver_id}"
1140 : );
1141 :
1142 0 : let timelines = pageserver
1143 0 : .http_client
1144 0 : .list_timelines(shard_zero.shard_id)
1145 0 : .await?;
1146 :
1147 : // Pick a 'main' timeline that has no ancestors, the rest will get arbitrary names
1148 0 : let main_timeline = timelines
1149 0 : .iter()
1150 0 : .find(|t| t.ancestor_timeline_id.is_none())
1151 0 : .expect("No timelines found")
1152 : .timeline_id;
1153 :
1154 0 : let mut branch_i = 0;
1155 0 : for timeline in timelines.iter() {
1156 0 : let branch_name = if timeline.timeline_id == main_timeline {
1157 0 : "main".to_string()
1158 : } else {
1159 0 : branch_i += 1;
1160 0 : format!("branch_{branch_i}")
1161 : };
1162 :
1163 0 : println!(
1164 0 : "Importing timeline {tenant_id}/{} as branch {branch_name}",
1165 : timeline.timeline_id
1166 : );
1167 :
1168 0 : env.register_branch_mapping(branch_name, tenant_id, timeline.timeline_id)?;
1169 : }
1170 : }
1171 0 : TenantCmd::Create(args) => {
1172 0 : let tenant_conf: HashMap<_, _> =
1173 0 : args.config.iter().flat_map(|c| c.split_once(':')).collect();
1174 :
1175 0 : let tenant_conf = PageServerNode::parse_config(tenant_conf)?;
1176 :
1177 : // If tenant ID was not specified, generate one
1178 0 : let tenant_id = args.tenant_id.unwrap_or_else(TenantId::generate);
1179 :
1180 : // We must register the tenant with the storage controller, so
1181 : // that when the pageserver restarts, it will be re-attached.
1182 0 : let storage_controller = StorageController::from_env(env);
1183 0 : storage_controller
1184 0 : .tenant_create(TenantCreateRequest {
1185 0 : // Note that ::unsharded here isn't actually because the tenant is unsharded, its because the
1186 0 : // storage controller expects a shard-naive tenant_id in this attribute, and the TenantCreateRequest
1187 0 : // type is used both in the storage controller (for creating tenants) and in the pageserver (for
1188 0 : // creating shards)
1189 0 : new_tenant_id: TenantShardId::unsharded(tenant_id),
1190 0 : generation: None,
1191 0 : shard_parameters: ShardParameters {
1192 0 : count: ShardCount::new(args.shard_count),
1193 0 : stripe_size: args
1194 0 : .shard_stripe_size
1195 0 : .map(ShardStripeSize)
1196 0 : .unwrap_or(DEFAULT_STRIPE_SIZE),
1197 0 : },
1198 0 : placement_policy: args.placement_policy.clone(),
1199 0 : config: tenant_conf,
1200 0 : })
1201 0 : .await?;
1202 0 : println!("tenant {tenant_id} successfully created on the pageserver");
1203 :
1204 : // Create an initial timeline for the new tenant
1205 0 : let new_timeline_id = args.timeline_id.unwrap_or(TimelineId::generate());
1206 :
1207 : // FIXME: passing None for ancestor_start_lsn is not kosher in a sharded world: we can't have
1208 : // different shards picking different start lsns. Maybe we have to teach storage controller
1209 : // to let shard 0 branch first and then propagate the chosen LSN to other shards.
1210 0 : storage_controller
1211 0 : .tenant_timeline_create(
1212 0 : tenant_id,
1213 0 : TimelineCreateRequest {
1214 0 : new_timeline_id,
1215 0 : mode: pageserver_api::models::TimelineCreateRequestMode::Bootstrap {
1216 0 : existing_initdb_timeline_id: None,
1217 0 : pg_version: Some(args.pg_version),
1218 0 : },
1219 0 : },
1220 0 : )
1221 0 : .await?;
1222 :
1223 0 : env.register_branch_mapping(
1224 0 : DEFAULT_BRANCH_NAME.to_string(),
1225 0 : tenant_id,
1226 0 : new_timeline_id,
1227 0 : )?;
1228 :
1229 0 : println!("Created an initial timeline '{new_timeline_id}' for tenant: {tenant_id}",);
1230 :
1231 0 : if args.set_default {
1232 0 : println!("Setting tenant {tenant_id} as a default one");
1233 0 : env.default_tenant_id = Some(tenant_id);
1234 0 : }
1235 : }
1236 0 : TenantCmd::SetDefault(args) => {
1237 0 : println!("Setting tenant {} as a default one", args.tenant_id);
1238 0 : env.default_tenant_id = Some(args.tenant_id);
1239 0 : }
1240 0 : TenantCmd::Config(args) => {
1241 0 : let tenant_id = get_tenant_id(args.tenant_id, env)?;
1242 0 : let tenant_conf: HashMap<_, _> =
1243 0 : args.config.iter().flat_map(|c| c.split_once(':')).collect();
1244 0 : let config = PageServerNode::parse_config(tenant_conf)?;
1245 :
1246 0 : let req = TenantConfigRequest { tenant_id, config };
1247 :
1248 0 : let storage_controller = StorageController::from_env(env);
1249 0 : storage_controller
1250 0 : .set_tenant_config(&req)
1251 0 : .await
1252 0 : .with_context(|| format!("Tenant config failed for tenant with id {tenant_id}"))?;
1253 0 : println!("tenant {tenant_id} successfully configured via storcon");
1254 : }
1255 : }
1256 0 : Ok(())
1257 0 : }
1258 :
1259 0 : async fn handle_timeline(cmd: &TimelineCmd, env: &mut local_env::LocalEnv) -> Result<()> {
1260 0 : let pageserver = get_default_pageserver(env);
1261 :
1262 0 : match cmd {
1263 0 : TimelineCmd::List(args) => {
1264 : // TODO(sharding): this command shouldn't have to specify a shard ID: we should ask the storage controller
1265 : // where shard 0 is attached, and query there.
1266 0 : let tenant_shard_id = get_tenant_shard_id(args.tenant_shard_id, env)?;
1267 0 : let timelines = pageserver.timeline_list(&tenant_shard_id).await?;
1268 0 : print_timelines_tree(timelines, env.timeline_name_mappings())?;
1269 : }
1270 0 : TimelineCmd::Create(args) => {
1271 0 : let tenant_id = get_tenant_id(args.tenant_id, env)?;
1272 0 : let new_branch_name = &args.branch_name;
1273 0 : let new_timeline_id_opt = args.timeline_id;
1274 0 : let new_timeline_id = new_timeline_id_opt.unwrap_or(TimelineId::generate());
1275 :
1276 0 : let storage_controller = StorageController::from_env(env);
1277 0 : let create_req = TimelineCreateRequest {
1278 0 : new_timeline_id,
1279 0 : mode: pageserver_api::models::TimelineCreateRequestMode::Bootstrap {
1280 0 : existing_initdb_timeline_id: None,
1281 0 : pg_version: Some(args.pg_version),
1282 0 : },
1283 0 : };
1284 0 : let timeline_info = storage_controller
1285 0 : .tenant_timeline_create(tenant_id, create_req)
1286 0 : .await?;
1287 :
1288 0 : let last_record_lsn = timeline_info.last_record_lsn;
1289 0 : env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?;
1290 :
1291 0 : println!(
1292 0 : "Created timeline '{}' at Lsn {last_record_lsn} for tenant: {tenant_id}",
1293 : timeline_info.timeline_id
1294 : );
1295 : }
1296 : // TODO: rename to import-basebackup-plus-wal
1297 0 : TimelineCmd::Import(args) => {
1298 0 : let tenant_id = get_tenant_id(args.tenant_id, env)?;
1299 0 : let timeline_id = args.timeline_id;
1300 0 : let branch_name = &args.branch_name;
1301 :
1302 : // Parse base inputs
1303 0 : let base = (args.base_lsn, args.base_tarfile.clone());
1304 :
1305 : // Parse pg_wal inputs
1306 0 : let wal_tarfile = args.wal_tarfile.clone();
1307 0 : let end_lsn = args.end_lsn;
1308 : // TODO validate both or none are provided
1309 0 : let pg_wal = end_lsn.zip(wal_tarfile);
1310 :
1311 0 : println!("Importing timeline into pageserver ...");
1312 0 : pageserver
1313 0 : .timeline_import(tenant_id, timeline_id, base, pg_wal, args.pg_version)
1314 0 : .await?;
1315 0 : if env.storage_controller.timelines_onto_safekeepers {
1316 0 : println!("Creating timeline on safekeeper ...");
1317 0 : let timeline_info = pageserver
1318 0 : .timeline_info(
1319 0 : TenantShardId::unsharded(tenant_id),
1320 0 : timeline_id,
1321 0 : pageserver_client::mgmt_api::ForceAwaitLogicalSize::No,
1322 0 : )
1323 0 : .await?;
1324 0 : let default_sk = SafekeeperNode::from_env(env, env.safekeepers.first().unwrap());
1325 0 : let default_host = default_sk
1326 0 : .conf
1327 0 : .listen_addr
1328 0 : .clone()
1329 0 : .unwrap_or_else(|| "localhost".to_string());
1330 0 : let mconf = safekeeper_api::membership::Configuration {
1331 0 : generation: SafekeeperGeneration::new(1),
1332 0 : members: safekeeper_api::membership::MemberSet {
1333 0 : m: vec![SafekeeperId {
1334 0 : host: default_host,
1335 0 : id: default_sk.conf.id,
1336 0 : pg_port: default_sk.conf.pg_port,
1337 0 : }],
1338 0 : },
1339 0 : new_members: None,
1340 0 : };
1341 0 : let pg_version = PgVersionId::from(args.pg_version);
1342 0 : let req = safekeeper_api::models::TimelineCreateRequest {
1343 0 : tenant_id,
1344 0 : timeline_id,
1345 0 : mconf,
1346 0 : pg_version,
1347 0 : system_id: None,
1348 0 : wal_seg_size: None,
1349 0 : start_lsn: timeline_info.last_record_lsn,
1350 0 : commit_lsn: None,
1351 0 : };
1352 0 : default_sk.create_timeline(&req).await?;
1353 0 : }
1354 0 : env.register_branch_mapping(branch_name.to_string(), tenant_id, timeline_id)?;
1355 0 : println!("Done");
1356 : }
1357 0 : TimelineCmd::Branch(args) => {
1358 0 : let tenant_id = get_tenant_id(args.tenant_id, env)?;
1359 0 : let new_timeline_id = args.timeline_id.unwrap_or(TimelineId::generate());
1360 0 : let new_branch_name = &args.branch_name;
1361 0 : let ancestor_branch_name = args
1362 0 : .ancestor_branch_name
1363 0 : .clone()
1364 0 : .unwrap_or(DEFAULT_BRANCH_NAME.to_owned());
1365 0 : let ancestor_timeline_id = env
1366 0 : .get_branch_timeline_id(&ancestor_branch_name, tenant_id)
1367 0 : .ok_or_else(|| {
1368 0 : anyhow!("Found no timeline id for branch name '{ancestor_branch_name}'")
1369 0 : })?;
1370 :
1371 0 : let start_lsn = args.ancestor_start_lsn;
1372 0 : let storage_controller = StorageController::from_env(env);
1373 0 : let create_req = TimelineCreateRequest {
1374 0 : new_timeline_id,
1375 0 : mode: pageserver_api::models::TimelineCreateRequestMode::Branch {
1376 0 : ancestor_timeline_id,
1377 0 : ancestor_start_lsn: start_lsn,
1378 0 : read_only: false,
1379 0 : pg_version: None,
1380 0 : },
1381 0 : };
1382 0 : let timeline_info = storage_controller
1383 0 : .tenant_timeline_create(tenant_id, create_req)
1384 0 : .await?;
1385 :
1386 0 : let last_record_lsn = timeline_info.last_record_lsn;
1387 :
1388 0 : env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?;
1389 :
1390 0 : println!(
1391 0 : "Created timeline '{}' at Lsn {last_record_lsn} for tenant: {tenant_id}. Ancestor timeline: '{ancestor_branch_name}'",
1392 : timeline_info.timeline_id
1393 : );
1394 : }
1395 : }
1396 :
1397 0 : Ok(())
1398 0 : }
1399 :
1400 0 : async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Result<()> {
1401 0 : let mut cplane = ComputeControlPlane::load(env.clone())?;
1402 :
1403 0 : match subcmd {
1404 0 : EndpointCmd::List(args) => {
1405 : // TODO(sharding): this command shouldn't have to specify a shard ID: we should ask the storage controller
1406 : // where shard 0 is attached, and query there.
1407 0 : let tenant_shard_id = get_tenant_shard_id(args.tenant_shard_id, env)?;
1408 :
1409 0 : let timeline_name_mappings = env.timeline_name_mappings();
1410 :
1411 0 : let mut table = comfy_table::Table::new();
1412 :
1413 0 : table.load_preset(comfy_table::presets::NOTHING);
1414 :
1415 0 : table.set_header([
1416 0 : "ENDPOINT",
1417 0 : "ADDRESS",
1418 0 : "TIMELINE",
1419 0 : "BRANCH NAME",
1420 0 : "LSN",
1421 0 : "STATUS",
1422 0 : ]);
1423 :
1424 0 : for (endpoint_id, endpoint) in cplane
1425 0 : .endpoints
1426 0 : .iter()
1427 0 : .filter(|(_, endpoint)| endpoint.tenant_id == tenant_shard_id.tenant_id)
1428 : {
1429 0 : let lsn_str = match endpoint.mode {
1430 0 : ComputeMode::Static(lsn) => {
1431 : // -> read-only endpoint
1432 : // Use the node's LSN.
1433 0 : lsn.to_string()
1434 : }
1435 : _ => {
1436 : // As the LSN here refers to the one that the compute is started with,
1437 : // we display nothing as it is a primary/hot standby compute.
1438 0 : "---".to_string()
1439 : }
1440 : };
1441 :
1442 0 : let branch_name = timeline_name_mappings
1443 0 : .get(&TenantTimelineId::new(
1444 0 : tenant_shard_id.tenant_id,
1445 0 : endpoint.timeline_id,
1446 0 : ))
1447 0 : .map(|name| name.as_str())
1448 0 : .unwrap_or("?");
1449 :
1450 0 : table.add_row([
1451 0 : endpoint_id.as_str(),
1452 0 : &endpoint.pg_address.to_string(),
1453 0 : &endpoint.timeline_id.to_string(),
1454 0 : branch_name,
1455 0 : lsn_str.as_str(),
1456 0 : &format!("{}", endpoint.status()),
1457 0 : ]);
1458 : }
1459 :
1460 0 : println!("{table}");
1461 : }
1462 0 : EndpointCmd::Create(args) => {
1463 0 : let tenant_id = get_tenant_id(args.tenant_id, env)?;
1464 0 : let branch_name = args
1465 0 : .branch_name
1466 0 : .clone()
1467 0 : .unwrap_or(DEFAULT_BRANCH_NAME.to_owned());
1468 0 : let endpoint_id = args
1469 0 : .endpoint_id
1470 0 : .clone()
1471 0 : .unwrap_or_else(|| format!("ep-{branch_name}"));
1472 :
1473 0 : let timeline_id = env
1474 0 : .get_branch_timeline_id(&branch_name, tenant_id)
1475 0 : .ok_or_else(|| anyhow!("Found no timeline id for branch name '{branch_name}'"))?;
1476 :
1477 0 : let mode = match (args.lsn, args.hot_standby) {
1478 0 : (Some(lsn), false) => ComputeMode::Static(lsn),
1479 0 : (None, true) => ComputeMode::Replica,
1480 0 : (None, false) => ComputeMode::Primary,
1481 0 : (Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"),
1482 : };
1483 :
1484 0 : match (mode, args.hot_standby) {
1485 : (ComputeMode::Static(_), true) => {
1486 0 : bail!(
1487 0 : "Cannot start a node in hot standby mode when it is already configured as a static replica"
1488 : )
1489 : }
1490 : (ComputeMode::Primary, true) => {
1491 0 : bail!(
1492 0 : "Cannot start a node as a hot standby replica, it is already configured as primary node"
1493 : )
1494 : }
1495 0 : _ => {}
1496 : }
1497 :
1498 0 : if !args.allow_multiple {
1499 0 : cplane.check_conflicting_endpoints(mode, tenant_id, timeline_id)?;
1500 0 : }
1501 :
1502 0 : cplane.new_endpoint(
1503 0 : &endpoint_id,
1504 0 : tenant_id,
1505 0 : timeline_id,
1506 0 : args.pg_port,
1507 0 : args.external_http_port,
1508 0 : args.internal_http_port,
1509 0 : args.pg_version,
1510 0 : mode,
1511 0 : args.grpc,
1512 0 : !args.update_catalog,
1513 : false,
1514 0 : args.privileged_role_name.clone(),
1515 0 : )?;
1516 : }
1517 0 : EndpointCmd::Start(args) => {
1518 0 : let endpoint_id = &args.endpoint_id;
1519 0 : let pageserver_id = args.endpoint_pageserver_id;
1520 0 : let remote_ext_base_url = &args.remote_ext_base_url;
1521 :
1522 0 : let default_generation = env
1523 0 : .storage_controller
1524 0 : .timelines_onto_safekeepers
1525 0 : .then_some(1);
1526 0 : let safekeepers_generation = args
1527 0 : .safekeepers_generation
1528 0 : .or(default_generation)
1529 0 : .map(SafekeeperGeneration::new);
1530 : // If --safekeepers argument is given, use only the listed
1531 : // safekeeper nodes; otherwise all from the env.
1532 0 : let safekeepers = if let Some(safekeepers) = parse_safekeepers(&args.safekeepers)? {
1533 0 : safekeepers
1534 : } else {
1535 0 : env.safekeepers.iter().map(|sk| sk.id).collect()
1536 : };
1537 :
1538 0 : let endpoint = cplane
1539 0 : .endpoints
1540 0 : .get(endpoint_id.as_str())
1541 0 : .ok_or_else(|| anyhow!("endpoint {endpoint_id} not found"))?;
1542 :
1543 0 : if !args.allow_multiple {
1544 0 : cplane.check_conflicting_endpoints(
1545 0 : endpoint.mode,
1546 0 : endpoint.tenant_id,
1547 0 : endpoint.timeline_id,
1548 0 : )?;
1549 0 : }
1550 :
1551 0 : let prefer_protocol = if endpoint.grpc {
1552 0 : PageserverProtocol::Grpc
1553 : } else {
1554 0 : PageserverProtocol::Libpq
1555 : };
1556 :
1557 0 : let mut pageserver_conninfo = if let Some(ps_id) = pageserver_id {
1558 0 : let conf = env.get_pageserver_conf(ps_id).unwrap();
1559 0 : local_pageserver_conf_to_conn_info(conf)?
1560 : } else {
1561 : // Look up the currently attached location of the tenant, and its striping metadata,
1562 : // to pass these on to postgres.
1563 0 : let storage_controller = StorageController::from_env(env);
1564 0 : let locate_result = storage_controller.tenant_locate(endpoint.tenant_id).await?;
1565 0 : assert!(!locate_result.shards.is_empty());
1566 :
1567 : // Initialize LSN leases for static computes.
1568 0 : if let ComputeMode::Static(lsn) = endpoint.mode {
1569 0 : futures::future::try_join_all(locate_result.shards.iter().map(
1570 0 : |shard| async move {
1571 0 : let conf = env.get_pageserver_conf(shard.node_id).unwrap();
1572 0 : let pageserver = PageServerNode::from_env(env, conf);
1573 :
1574 0 : pageserver
1575 0 : .http_client
1576 0 : .timeline_init_lsn_lease(shard.shard_id, endpoint.timeline_id, lsn)
1577 0 : .await
1578 0 : },
1579 : ))
1580 0 : .await?;
1581 0 : }
1582 :
1583 0 : tenant_locate_response_to_conn_info(&locate_result)?
1584 : };
1585 0 : pageserver_conninfo.prefer_protocol = prefer_protocol;
1586 :
1587 0 : let ps_conf = env.get_pageserver_conf(DEFAULT_PAGESERVER_ID)?;
1588 0 : let auth_token = if matches!(
1589 0 : ps_conf.pg_auth_type,
1590 : AuthType::NeonJWT | AuthType::HadronJWT
1591 : ) {
1592 0 : let claims = Claims::new(Some(endpoint.tenant_id), Scope::Tenant);
1593 :
1594 0 : Some(env.generate_auth_token(&claims)?)
1595 : } else {
1596 0 : None
1597 : };
1598 :
1599 0 : let exp = (std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH)?
1600 0 : + Duration::from_secs(86400))
1601 0 : .as_secs();
1602 0 : let claims = endpoint_storage::claims::EndpointStorageClaims {
1603 0 : tenant_id: endpoint.tenant_id,
1604 0 : timeline_id: endpoint.timeline_id,
1605 0 : endpoint_id: endpoint_id.to_string(),
1606 0 : exp,
1607 0 : };
1608 :
1609 0 : let endpoint_storage_token = env.generate_auth_token(&claims)?;
1610 0 : let endpoint_storage_addr = env.endpoint_storage.listen_addr.to_string();
1611 :
1612 0 : let args = control_plane::endpoint::EndpointStartArgs {
1613 0 : auth_token,
1614 0 : endpoint_storage_token,
1615 0 : endpoint_storage_addr,
1616 0 : safekeepers_generation,
1617 0 : safekeepers,
1618 0 : pageserver_conninfo,
1619 0 : remote_ext_base_url: remote_ext_base_url.clone(),
1620 0 : create_test_user: args.create_test_user,
1621 0 : start_timeout: args.start_timeout,
1622 0 : autoprewarm: args.autoprewarm,
1623 0 : offload_lfc_interval_seconds: args.offload_lfc_interval_seconds,
1624 0 : dev: args.dev,
1625 0 : };
1626 :
1627 0 : println!("Starting existing endpoint {endpoint_id}...");
1628 0 : endpoint.start(args).await?;
1629 : }
1630 0 : EndpointCmd::UpdatePageservers(args) => {
1631 0 : let endpoint_id = &args.endpoint_id;
1632 0 : let endpoint = cplane
1633 0 : .endpoints
1634 0 : .get(endpoint_id.as_str())
1635 0 : .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
1636 0 : let prefer_protocol = if endpoint.grpc {
1637 0 : PageserverProtocol::Grpc
1638 : } else {
1639 0 : PageserverProtocol::Libpq
1640 : };
1641 0 : let mut pageserver_conninfo = match args.pageserver_id {
1642 0 : Some(pageserver_id) => {
1643 0 : let conf = env.get_pageserver_conf(pageserver_id)?;
1644 0 : local_pageserver_conf_to_conn_info(conf)?
1645 : }
1646 : None => {
1647 0 : let storage_controller = StorageController::from_env(env);
1648 0 : let locate_result =
1649 0 : storage_controller.tenant_locate(endpoint.tenant_id).await?;
1650 :
1651 0 : tenant_locate_response_to_conn_info(&locate_result)?
1652 : }
1653 : };
1654 0 : pageserver_conninfo.prefer_protocol = prefer_protocol;
1655 :
1656 0 : endpoint
1657 0 : .update_pageservers_in_config(&pageserver_conninfo)
1658 0 : .await?;
1659 : }
1660 0 : EndpointCmd::Reconfigure(args) => {
1661 0 : let endpoint_id = &args.endpoint_id;
1662 0 : let endpoint = cplane
1663 0 : .endpoints
1664 0 : .get(endpoint_id.as_str())
1665 0 : .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
1666 :
1667 0 : let prefer_protocol = if endpoint.grpc {
1668 0 : PageserverProtocol::Grpc
1669 : } else {
1670 0 : PageserverProtocol::Libpq
1671 : };
1672 0 : let mut pageserver_conninfo = if let Some(ps_id) = args.endpoint_pageserver_id {
1673 0 : let conf = env.get_pageserver_conf(ps_id)?;
1674 0 : local_pageserver_conf_to_conn_info(conf)?
1675 : } else {
1676 : // Look up the currently attached location of the tenant, and its striping metadata,
1677 : // to pass these on to postgres.
1678 0 : let storage_controller = StorageController::from_env(env);
1679 0 : let locate_result = storage_controller.tenant_locate(endpoint.tenant_id).await?;
1680 :
1681 0 : tenant_locate_response_to_conn_info(&locate_result)?
1682 : };
1683 0 : pageserver_conninfo.prefer_protocol = prefer_protocol;
1684 :
1685 : // If --safekeepers argument is given, use only the listed
1686 : // safekeeper nodes; otherwise all from the env.
1687 0 : let safekeepers = parse_safekeepers(&args.safekeepers)?;
1688 0 : endpoint
1689 0 : .reconfigure(Some(&pageserver_conninfo), safekeepers, None)
1690 0 : .await?;
1691 : }
1692 0 : EndpointCmd::RefreshConfiguration(args) => {
1693 0 : let endpoint_id = &args.endpoint_id;
1694 0 : let endpoint = cplane
1695 0 : .endpoints
1696 0 : .get(endpoint_id.as_str())
1697 0 : .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
1698 0 : endpoint.refresh_configuration().await?;
1699 : }
1700 0 : EndpointCmd::Stop(args) => {
1701 0 : let endpoint_id = &args.endpoint_id;
1702 0 : let endpoint = cplane
1703 0 : .endpoints
1704 0 : .get(endpoint_id)
1705 0 : .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
1706 0 : match endpoint.stop(args.mode, args.destroy).await?.lsn {
1707 0 : Some(lsn) => println!("{lsn}"),
1708 0 : None => println!("null"),
1709 : }
1710 : }
1711 0 : EndpointCmd::GenerateJwt(args) => {
1712 0 : let endpoint = {
1713 0 : let endpoint_id = &args.endpoint_id;
1714 :
1715 0 : cplane
1716 0 : .endpoints
1717 0 : .get(endpoint_id)
1718 0 : .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?
1719 : };
1720 :
1721 0 : let jwt = endpoint.generate_jwt(args.scope)?;
1722 :
1723 0 : print!("{jwt}");
1724 : }
1725 : }
1726 :
1727 0 : Ok(())
1728 0 : }
1729 :
1730 : /// Parse --safekeepers as list of safekeeper ids.
1731 0 : fn parse_safekeepers(safekeepers_str: &Option<String>) -> Result<Option<Vec<NodeId>>> {
1732 0 : if let Some(safekeepers_str) = safekeepers_str {
1733 0 : let mut safekeepers: Vec<NodeId> = Vec::new();
1734 0 : for sk_id in safekeepers_str.split(',').map(str::trim) {
1735 0 : let sk_id = NodeId(
1736 0 : u64::from_str(sk_id)
1737 0 : .map_err(|_| anyhow!("invalid node ID \"{sk_id}\" in --safekeepers list"))?,
1738 : );
1739 0 : safekeepers.push(sk_id);
1740 : }
1741 0 : Ok(Some(safekeepers))
1742 : } else {
1743 0 : Ok(None)
1744 : }
1745 0 : }
1746 :
1747 0 : fn handle_mappings(subcmd: &MappingsCmd, env: &mut local_env::LocalEnv) -> Result<()> {
1748 0 : match subcmd {
1749 0 : MappingsCmd::Map(args) => {
1750 0 : env.register_branch_mapping(
1751 0 : args.branch_name.to_owned(),
1752 0 : args.tenant_id,
1753 0 : args.timeline_id,
1754 0 : )?;
1755 :
1756 0 : Ok(())
1757 : }
1758 : }
1759 0 : }
1760 :
1761 0 : fn get_pageserver(
1762 0 : env: &local_env::LocalEnv,
1763 0 : pageserver_id_arg: Option<NodeId>,
1764 0 : ) -> Result<PageServerNode> {
1765 0 : let node_id = pageserver_id_arg.unwrap_or(DEFAULT_PAGESERVER_ID);
1766 :
1767 0 : Ok(PageServerNode::from_env(
1768 0 : env,
1769 0 : env.get_pageserver_conf(node_id)?,
1770 : ))
1771 0 : }
1772 :
1773 0 : async fn handle_pageserver(subcmd: &PageserverCmd, env: &local_env::LocalEnv) -> Result<()> {
1774 0 : match subcmd {
1775 0 : PageserverCmd::Start(args) => {
1776 0 : if let Err(e) = get_pageserver(env, args.pageserver_id)?
1777 0 : .start(&args.start_timeout)
1778 0 : .await
1779 : {
1780 0 : eprintln!("pageserver start failed: {e}");
1781 0 : exit(1);
1782 0 : }
1783 : }
1784 :
1785 0 : PageserverCmd::Stop(args) => {
1786 0 : let immediate = match args.stop_mode {
1787 0 : StopMode::Fast => false,
1788 0 : StopMode::Immediate => true,
1789 : };
1790 0 : if let Err(e) = get_pageserver(env, args.pageserver_id)?.stop(immediate) {
1791 0 : eprintln!("pageserver stop failed: {e}");
1792 0 : exit(1);
1793 0 : }
1794 : }
1795 :
1796 0 : PageserverCmd::Restart(args) => {
1797 0 : let pageserver = get_pageserver(env, args.pageserver_id)?;
1798 : //TODO what shutdown strategy should we use here?
1799 0 : if let Err(e) = pageserver.stop(false) {
1800 0 : eprintln!("pageserver stop failed: {e}");
1801 0 : exit(1);
1802 0 : }
1803 :
1804 0 : if let Err(e) = pageserver.start(&args.start_timeout).await {
1805 0 : eprintln!("pageserver start failed: {e}");
1806 0 : exit(1);
1807 0 : }
1808 : }
1809 :
1810 0 : PageserverCmd::Status(args) => {
1811 0 : match get_pageserver(env, args.pageserver_id)?
1812 0 : .check_status()
1813 0 : .await
1814 : {
1815 0 : Ok(_) => println!("Page server is up and running"),
1816 0 : Err(err) => {
1817 0 : eprintln!("Page server is not available: {err}");
1818 0 : exit(1);
1819 : }
1820 : }
1821 : }
1822 : }
1823 0 : Ok(())
1824 0 : }
1825 :
1826 0 : async fn handle_storage_controller(
1827 0 : subcmd: &StorageControllerCmd,
1828 0 : env: &local_env::LocalEnv,
1829 0 : ) -> Result<()> {
1830 0 : let svc = StorageController::from_env(env);
1831 0 : match subcmd {
1832 0 : StorageControllerCmd::Start(args) => {
1833 0 : let start_args = NeonStorageControllerStartArgs {
1834 0 : instance_id: args.instance_id,
1835 0 : base_port: args.base_port,
1836 0 : start_timeout: args.start_timeout,
1837 0 : handle_ps_local_disk_loss: args.handle_ps_local_disk_loss,
1838 0 : };
1839 :
1840 0 : if let Err(e) = svc.start(start_args).await {
1841 0 : eprintln!("start failed: {e}");
1842 0 : exit(1);
1843 0 : }
1844 : }
1845 :
1846 0 : StorageControllerCmd::Stop(args) => {
1847 0 : let stop_args = NeonStorageControllerStopArgs {
1848 0 : instance_id: args.instance_id,
1849 0 : immediate: match args.stop_mode {
1850 0 : StopMode::Fast => false,
1851 0 : StopMode::Immediate => true,
1852 : },
1853 : };
1854 0 : if let Err(e) = svc.stop(stop_args).await {
1855 0 : eprintln!("stop failed: {e}");
1856 0 : exit(1);
1857 0 : }
1858 : }
1859 : }
1860 0 : Ok(())
1861 0 : }
1862 :
1863 0 : fn get_safekeeper(env: &local_env::LocalEnv, id: NodeId) -> Result<SafekeeperNode> {
1864 0 : if let Some(node) = env.safekeepers.iter().find(|node| node.id == id) {
1865 0 : Ok(SafekeeperNode::from_env(env, node))
1866 : } else {
1867 0 : bail!("could not find safekeeper {id}")
1868 : }
1869 0 : }
1870 :
1871 0 : async fn handle_safekeeper(subcmd: &SafekeeperCmd, env: &local_env::LocalEnv) -> Result<()> {
1872 0 : match subcmd {
1873 0 : SafekeeperCmd::Start(args) => {
1874 0 : let safekeeper = get_safekeeper(env, args.id)?;
1875 :
1876 0 : if let Err(e) = safekeeper.start(&args.extra_opt, &args.start_timeout).await {
1877 0 : eprintln!("safekeeper start failed: {e}");
1878 0 : exit(1);
1879 0 : }
1880 : }
1881 :
1882 0 : SafekeeperCmd::Stop(args) => {
1883 0 : let safekeeper = get_safekeeper(env, args.id)?;
1884 0 : let immediate = match args.stop_mode {
1885 0 : StopMode::Fast => false,
1886 0 : StopMode::Immediate => true,
1887 : };
1888 0 : if let Err(e) = safekeeper.stop(immediate) {
1889 0 : eprintln!("safekeeper stop failed: {e}");
1890 0 : exit(1);
1891 0 : }
1892 : }
1893 :
1894 0 : SafekeeperCmd::Restart(args) => {
1895 0 : let safekeeper = get_safekeeper(env, args.id)?;
1896 0 : let immediate = match args.stop_mode {
1897 0 : StopMode::Fast => false,
1898 0 : StopMode::Immediate => true,
1899 : };
1900 :
1901 0 : if let Err(e) = safekeeper.stop(immediate) {
1902 0 : eprintln!("safekeeper stop failed: {e}");
1903 0 : exit(1);
1904 0 : }
1905 :
1906 0 : if let Err(e) = safekeeper.start(&args.extra_opt, &args.start_timeout).await {
1907 0 : eprintln!("safekeeper start failed: {e}");
1908 0 : exit(1);
1909 0 : }
1910 : }
1911 : }
1912 0 : Ok(())
1913 0 : }
1914 :
1915 0 : async fn handle_endpoint_storage(
1916 0 : subcmd: &EndpointStorageCmd,
1917 0 : env: &local_env::LocalEnv,
1918 0 : ) -> Result<()> {
1919 : use EndpointStorageCmd::*;
1920 0 : let storage = EndpointStorage::from_env(env);
1921 :
1922 : // In tests like test_forward_compatibility or test_graceful_cluster_restart
1923 : // old neon binaries (without endpoint_storage) are present
1924 0 : if !storage.bin.exists() {
1925 0 : eprintln!(
1926 0 : "{} binary not found. Ignore if this is a compatibility test",
1927 : storage.bin
1928 : );
1929 0 : return Ok(());
1930 0 : }
1931 :
1932 0 : match subcmd {
1933 0 : Start(EndpointStorageStartCmd { start_timeout }) => {
1934 0 : if let Err(e) = storage.start(start_timeout).await {
1935 0 : eprintln!("endpoint_storage start failed: {e}");
1936 0 : exit(1);
1937 0 : }
1938 : }
1939 0 : Stop(EndpointStorageStopCmd { stop_mode }) => {
1940 0 : let immediate = match stop_mode {
1941 0 : StopMode::Fast => false,
1942 0 : StopMode::Immediate => true,
1943 : };
1944 0 : if let Err(e) = storage.stop(immediate) {
1945 0 : eprintln!("proxy stop failed: {e}");
1946 0 : exit(1);
1947 0 : }
1948 : }
1949 : };
1950 0 : Ok(())
1951 0 : }
1952 :
1953 0 : async fn handle_storage_broker(subcmd: &StorageBrokerCmd, env: &local_env::LocalEnv) -> Result<()> {
1954 0 : match subcmd {
1955 0 : StorageBrokerCmd::Start(args) => {
1956 0 : let storage_broker = StorageBroker::from_env(env);
1957 0 : if let Err(e) = storage_broker.start(&args.start_timeout).await {
1958 0 : eprintln!("broker start failed: {e}");
1959 0 : exit(1);
1960 0 : }
1961 : }
1962 :
1963 0 : StorageBrokerCmd::Stop(_args) => {
1964 : // FIXME: stop_mode unused
1965 0 : let storage_broker = StorageBroker::from_env(env);
1966 0 : if let Err(e) = storage_broker.stop() {
1967 0 : eprintln!("broker stop failed: {e}");
1968 0 : exit(1);
1969 0 : }
1970 : }
1971 : }
1972 0 : Ok(())
1973 0 : }
1974 :
1975 0 : async fn handle_start_all(
1976 0 : args: &StartCmdArgs,
1977 0 : env: &'static local_env::LocalEnv,
1978 0 : ) -> anyhow::Result<()> {
1979 : // FIXME: this was called "retry_timeout", is it right?
1980 0 : let Err(errors) = handle_start_all_impl(env, args.timeout).await else {
1981 0 : neon_start_status_check(env, args.timeout.as_ref())
1982 0 : .await
1983 0 : .context("status check after successful startup of all services")?;
1984 0 : return Ok(());
1985 : };
1986 :
1987 0 : eprintln!("startup failed because one or more services could not be started");
1988 :
1989 0 : for e in errors {
1990 0 : eprintln!("{e}");
1991 0 : let debug_repr = format!("{e:?}");
1992 0 : for line in debug_repr.lines() {
1993 0 : eprintln!(" {line}");
1994 0 : }
1995 : }
1996 :
1997 0 : try_stop_all(env, true).await;
1998 :
1999 0 : exit(2);
2000 0 : }
2001 :
2002 : /// Returns Ok() if and only if all services could be started successfully.
2003 : /// Otherwise, returns the list of errors that occurred during startup.
2004 0 : async fn handle_start_all_impl(
2005 0 : env: &'static local_env::LocalEnv,
2006 0 : retry_timeout: humantime::Duration,
2007 0 : ) -> Result<(), Vec<anyhow::Error>> {
2008 : // Endpoints are not started automatically
2009 :
2010 0 : let mut js = JoinSet::new();
2011 :
2012 : // force infalliblity through closure
2013 : #[allow(clippy::redundant_closure_call)]
2014 0 : (|| {
2015 0 : js.spawn(async move {
2016 0 : let storage_broker = StorageBroker::from_env(env);
2017 0 : storage_broker
2018 0 : .start(&retry_timeout)
2019 0 : .await
2020 0 : .map_err(|e| e.context("start storage_broker"))
2021 0 : });
2022 :
2023 0 : js.spawn(async move {
2024 0 : let storage_controller = StorageController::from_env(env);
2025 0 : storage_controller
2026 0 : .start(NeonStorageControllerStartArgs::with_default_instance_id(
2027 0 : retry_timeout,
2028 0 : ))
2029 0 : .await
2030 0 : .map_err(|e| e.context("start storage_controller"))
2031 0 : });
2032 :
2033 0 : for ps_conf in &env.pageservers {
2034 0 : js.spawn(async move {
2035 0 : let pageserver = PageServerNode::from_env(env, ps_conf);
2036 0 : pageserver
2037 0 : .start(&retry_timeout)
2038 0 : .await
2039 0 : .map_err(|e| e.context(format!("start pageserver {}", ps_conf.id)))
2040 0 : });
2041 : }
2042 :
2043 0 : for node in env.safekeepers.iter() {
2044 0 : js.spawn(async move {
2045 0 : let safekeeper = SafekeeperNode::from_env(env, node);
2046 0 : safekeeper
2047 0 : .start(&[], &retry_timeout)
2048 0 : .await
2049 0 : .map_err(|e| e.context(format!("start safekeeper {}", safekeeper.id)))
2050 0 : });
2051 : }
2052 :
2053 0 : js.spawn(async move {
2054 0 : EndpointStorage::from_env(env)
2055 0 : .start(&retry_timeout)
2056 0 : .await
2057 0 : .map_err(|e| e.context("start endpoint_storage"))
2058 0 : });
2059 : })();
2060 :
2061 0 : let mut errors = Vec::new();
2062 0 : while let Some(result) = js.join_next().await {
2063 0 : let result = result.expect("we don't panic or cancel the tasks");
2064 0 : if let Err(e) = result {
2065 0 : errors.push(e);
2066 0 : }
2067 : }
2068 :
2069 0 : if !errors.is_empty() {
2070 0 : return Err(errors);
2071 0 : }
2072 :
2073 0 : Ok(())
2074 0 : }
2075 :
2076 0 : async fn neon_start_status_check(
2077 0 : env: &local_env::LocalEnv,
2078 0 : retry_timeout: &Duration,
2079 0 : ) -> anyhow::Result<()> {
2080 : const RETRY_INTERVAL: Duration = Duration::from_millis(100);
2081 : const NOTICE_AFTER_RETRIES: Duration = Duration::from_secs(5);
2082 :
2083 0 : let storcon = StorageController::from_env(env);
2084 :
2085 0 : let retries = retry_timeout.as_millis() / RETRY_INTERVAL.as_millis();
2086 0 : let notice_after_retries = retry_timeout.as_millis() / NOTICE_AFTER_RETRIES.as_millis();
2087 :
2088 0 : println!("\nRunning neon status check");
2089 :
2090 0 : for retry in 0..retries {
2091 0 : if retry == notice_after_retries {
2092 0 : println!("\nNeon status check has not passed yet, continuing to wait")
2093 0 : }
2094 :
2095 0 : let mut passed = true;
2096 0 : let mut nodes = storcon.node_list().await?;
2097 0 : let mut pageservers = env.pageservers.clone();
2098 :
2099 0 : if nodes.len() != pageservers.len() {
2100 0 : continue;
2101 0 : }
2102 :
2103 0 : nodes.sort_by_key(|ps| ps.id);
2104 0 : pageservers.sort_by_key(|ps| ps.id);
2105 :
2106 0 : for (idx, pageserver) in pageservers.iter().enumerate() {
2107 0 : let node = &nodes[idx];
2108 0 : if node.id != pageserver.id {
2109 0 : passed = false;
2110 0 : break;
2111 0 : }
2112 :
2113 0 : if !matches!(node.availability, NodeAvailabilityWrapper::Active) {
2114 0 : passed = false;
2115 0 : break;
2116 0 : }
2117 : }
2118 :
2119 0 : if passed {
2120 0 : println!("\nNeon started and passed status check");
2121 0 : return Ok(());
2122 0 : }
2123 :
2124 0 : tokio::time::sleep(RETRY_INTERVAL).await;
2125 : }
2126 :
2127 0 : anyhow::bail!("\nNeon passed status check")
2128 0 : }
2129 :
2130 0 : async fn handle_stop_all(args: &StopCmdArgs, env: &local_env::LocalEnv) -> Result<()> {
2131 0 : let immediate = match args.mode {
2132 0 : StopMode::Fast => false,
2133 0 : StopMode::Immediate => true,
2134 : };
2135 :
2136 0 : try_stop_all(env, immediate).await;
2137 :
2138 0 : Ok(())
2139 0 : }
2140 :
2141 0 : async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
2142 0 : let mode = if immediate {
2143 0 : EndpointTerminateMode::Immediate
2144 : } else {
2145 0 : EndpointTerminateMode::Fast
2146 : };
2147 : // Stop all endpoints
2148 0 : match ComputeControlPlane::load(env.clone()) {
2149 0 : Ok(cplane) => {
2150 0 : for (_k, node) in cplane.endpoints {
2151 0 : if let Err(e) = node.stop(mode, false).await {
2152 0 : eprintln!("postgres stop failed: {e:#}");
2153 0 : }
2154 : }
2155 : }
2156 0 : Err(e) => {
2157 0 : eprintln!("postgres stop failed, could not restore control plane data from env: {e:#}")
2158 : }
2159 : }
2160 :
2161 0 : let storage = EndpointStorage::from_env(env);
2162 0 : if let Err(e) = storage.stop(immediate) {
2163 0 : eprintln!("endpoint_storage stop failed: {e:#}");
2164 0 : }
2165 :
2166 0 : for ps_conf in &env.pageservers {
2167 0 : let pageserver = PageServerNode::from_env(env, ps_conf);
2168 0 : if let Err(e) = pageserver.stop(immediate) {
2169 0 : eprintln!("pageserver {} stop failed: {:#}", ps_conf.id, e);
2170 0 : }
2171 : }
2172 :
2173 0 : for node in env.safekeepers.iter() {
2174 0 : let safekeeper = SafekeeperNode::from_env(env, node);
2175 0 : if let Err(e) = safekeeper.stop(immediate) {
2176 0 : eprintln!("safekeeper {} stop failed: {:#}", safekeeper.id, e);
2177 0 : }
2178 : }
2179 :
2180 0 : let storage_broker = StorageBroker::from_env(env);
2181 0 : if let Err(e) = storage_broker.stop() {
2182 0 : eprintln!("neon broker stop failed: {e:#}");
2183 0 : }
2184 :
2185 : // Stop all storage controller instances. In the most common case there's only one,
2186 : // but iterate though the base data directory in order to discover the instances.
2187 0 : let storcon_instances = env
2188 0 : .storage_controller_instances()
2189 0 : .await
2190 0 : .expect("Must inspect data dir");
2191 0 : for (instance_id, _instance_dir_path) in storcon_instances {
2192 0 : let storage_controller = StorageController::from_env(env);
2193 0 : let stop_args = NeonStorageControllerStopArgs {
2194 0 : instance_id,
2195 0 : immediate,
2196 0 : };
2197 :
2198 0 : if let Err(e) = storage_controller.stop(stop_args).await {
2199 0 : eprintln!("Storage controller instance {instance_id} stop failed: {e:#}");
2200 0 : }
2201 : }
2202 0 : }
|