Line data Source code
1 : //! Functions for handling page server configuration options
2 : //!
3 : //! Configuration options can be set in the pageserver.toml configuration
4 : //! file, or on the command line.
//! See also `settings.md` for a more detailed description of every parameter.
6 :
7 : use anyhow::{anyhow, bail, ensure, Context, Result};
8 : use pageserver_api::shard::TenantShardId;
9 : use remote_storage::{RemotePath, RemoteStorageConfig};
10 : use serde::de::IntoDeserializer;
11 : use std::env;
12 : use storage_broker::Uri;
13 : use utils::crashsafe::path_with_suffix_extension;
14 : use utils::id::ConnectionId;
15 : use utils::logging::SecretString;
16 :
17 : use once_cell::sync::OnceCell;
18 : use reqwest::Url;
19 : use std::num::NonZeroUsize;
20 : use std::str::FromStr;
21 : use std::sync::Arc;
22 : use std::time::Duration;
23 : use toml_edit;
24 : use toml_edit::{Document, Item};
25 :
26 : use camino::{Utf8Path, Utf8PathBuf};
27 : use postgres_backend::AuthType;
28 : use utils::{
29 : id::{NodeId, TimelineId},
30 : logging::LogFormat,
31 : };
32 :
33 : use crate::disk_usage_eviction_task::DiskUsageEvictionTaskConfig;
34 : use crate::tenant::config::TenantConf;
35 : use crate::tenant::config::TenantConfOpt;
36 : use crate::tenant::{
37 : TENANTS_SEGMENT_NAME, TENANT_DELETED_MARKER_FILE_NAME, TIMELINES_SEGMENT_NAME,
38 : };
39 : use crate::virtual_file;
40 : use crate::{
41 : IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TENANT_CONFIG_NAME, TENANT_HEATMAP_BASENAME,
42 : TENANT_LOCATION_CONFIG_NAME, TIMELINE_DELETE_MARK_SUFFIX, TIMELINE_UNINIT_MARK_SUFFIX,
43 : };
44 :
45 : use self::defaults::DEFAULT_CONCURRENT_TENANT_WARMUP;
46 :
47 : use self::defaults::DEFAULT_VIRTUAL_FILE_IO_ENGINE;
48 :
49 : pub mod defaults {
50 : use crate::tenant::config::defaults::*;
51 : use const_format::formatcp;
52 :
53 : pub use pageserver_api::{
54 : DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
55 : DEFAULT_PG_LISTEN_PORT,
56 : };
57 : pub use storage_broker::DEFAULT_ENDPOINT as BROKER_DEFAULT_ENDPOINT;
58 :
59 : pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "60 s";
60 : pub const DEFAULT_WAL_REDO_TIMEOUT: &str = "60 s";
61 :
62 : pub const DEFAULT_SUPERUSER: &str = "cloud_admin";
63 :
64 : pub const DEFAULT_PAGE_CACHE_SIZE: usize = 8192;
65 : pub const DEFAULT_MAX_FILE_DESCRIPTORS: usize = 100;
66 :
67 : pub const DEFAULT_LOG_FORMAT: &str = "plain";
68 :
69 : pub const DEFAULT_CONCURRENT_TENANT_WARMUP: usize = 8;
70 :
71 : pub const DEFAULT_CONCURRENT_TENANT_SIZE_LOGICAL_SIZE_QUERIES: usize =
72 : super::ConfigurableSemaphore::DEFAULT_INITIAL.get();
73 :
74 : pub const DEFAULT_METRIC_COLLECTION_INTERVAL: &str = "10 min";
75 : pub const DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL: &str = "0s";
76 : pub const DEFAULT_METRIC_COLLECTION_ENDPOINT: Option<reqwest::Url> = None;
77 : pub const DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL: &str = "10 min";
78 : pub const DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY: &str = "10s";
79 :
80 : pub const DEFAULT_HEATMAP_UPLOAD_CONCURRENCY: usize = 8;
81 : pub const DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY: usize = 1;
82 :
83 : pub const DEFAULT_INGEST_BATCH_SIZE: u64 = 100;
84 :
85 : pub const DEFAULT_VIRTUAL_FILE_IO_ENGINE: &str = "std-fs";
86 :
87 : ///
88 : /// Default built-in configuration file.
89 : ///
90 : pub const DEFAULT_CONFIG_FILE: &str = formatcp!(
91 : r#"
92 : # Initial configuration file created by 'pageserver --init'
93 : #listen_pg_addr = '{DEFAULT_PG_LISTEN_ADDR}'
94 : #listen_http_addr = '{DEFAULT_HTTP_LISTEN_ADDR}'
95 :
96 : #wait_lsn_timeout = '{DEFAULT_WAIT_LSN_TIMEOUT}'
97 : #wal_redo_timeout = '{DEFAULT_WAL_REDO_TIMEOUT}'
98 :
99 : #page_cache_size = {DEFAULT_PAGE_CACHE_SIZE}
100 : #max_file_descriptors = {DEFAULT_MAX_FILE_DESCRIPTORS}
101 :
102 : # initial superuser role name to use when creating a new tenant
103 : #initial_superuser_name = '{DEFAULT_SUPERUSER}'
104 :
105 : #broker_endpoint = '{BROKER_DEFAULT_ENDPOINT}'
106 :
107 : #log_format = '{DEFAULT_LOG_FORMAT}'
108 :
109 : #concurrent_tenant_size_logical_size_queries = '{DEFAULT_CONCURRENT_TENANT_SIZE_LOGICAL_SIZE_QUERIES}'
110 : #concurrent_tenant_warmup = '{DEFAULT_CONCURRENT_TENANT_WARMUP}'
111 :
112 : #metric_collection_interval = '{DEFAULT_METRIC_COLLECTION_INTERVAL}'
113 : #cached_metric_collection_interval = '{DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL}'
114 : #synthetic_size_calculation_interval = '{DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL}'
115 :
116 : #disk_usage_based_eviction = {{ max_usage_pct = .., min_avail_bytes = .., period = "10s"}}
117 :
118 : #background_task_maximum_delay = '{DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY}'
119 :
120 : #ingest_batch_size = {DEFAULT_INGEST_BATCH_SIZE}
121 :
122 : #virtual_file_io_engine = '{DEFAULT_VIRTUAL_FILE_IO_ENGINE}'
123 :
124 : [tenant_config]
125 : #checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes
126 : #checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT}
127 : #compaction_target_size = {DEFAULT_COMPACTION_TARGET_SIZE} # in bytes
128 : #compaction_period = '{DEFAULT_COMPACTION_PERIOD}'
129 : #compaction_threshold = {DEFAULT_COMPACTION_THRESHOLD}
130 :
131 : #gc_period = '{DEFAULT_GC_PERIOD}'
132 : #gc_horizon = {DEFAULT_GC_HORIZON}
133 : #image_creation_threshold = {DEFAULT_IMAGE_CREATION_THRESHOLD}
134 : #pitr_interval = '{DEFAULT_PITR_INTERVAL}'
135 :
136 : #min_resident_size_override = .. # in bytes
137 : #evictions_low_residence_duration_metric_threshold = '{DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD}'
138 : #gc_feedback = false
139 :
140 : #heatmap_upload_concurrency = {DEFAULT_HEATMAP_UPLOAD_CONCURRENCY}
141 : #secondary_download_concurrency = {DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY}
142 :
143 : [remote_storage]
144 :
145 : "#
146 : );
147 : }
148 :
149 4 : #[derive(Debug, Clone, PartialEq, Eq)]
150 : pub struct PageServerConf {
151 : // Identifier of that particular pageserver so e g safekeepers
152 : // can safely distinguish different pageservers
153 : pub id: NodeId,
154 :
155 : /// Example (default): 127.0.0.1:64000
156 : pub listen_pg_addr: String,
157 : /// Example (default): 127.0.0.1:9898
158 : pub listen_http_addr: String,
159 :
160 : /// Current availability zone. Used for traffic metrics.
161 : pub availability_zone: Option<String>,
162 :
163 : // Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call.
164 : pub wait_lsn_timeout: Duration,
165 : // How long to wait for WAL redo to complete.
166 : pub wal_redo_timeout: Duration,
167 :
168 : pub superuser: String,
169 :
170 : pub page_cache_size: usize,
171 : pub max_file_descriptors: usize,
172 :
173 : // Repository directory, relative to current working directory.
174 : // Normally, the page server changes the current working directory
175 : // to the repository, and 'workdir' is always '.'. But we don't do
176 : // that during unit testing, because the current directory is global
177 : // to the process but different unit tests work on different
178 : // repositories.
179 : pub workdir: Utf8PathBuf,
180 :
181 : pub pg_distrib_dir: Utf8PathBuf,
182 :
183 : // Authentication
184 : /// authentication method for the HTTP mgmt API
185 : pub http_auth_type: AuthType,
186 : /// authentication method for libpq connections from compute
187 : pub pg_auth_type: AuthType,
188 : /// Path to a file or directory containing public key(s) for verifying JWT tokens.
189 : /// Used for both mgmt and compute auth, if enabled.
190 : pub auth_validation_public_key_path: Option<Utf8PathBuf>,
191 :
192 : pub remote_storage_config: Option<RemoteStorageConfig>,
193 :
194 : pub default_tenant_conf: TenantConf,
195 :
196 : /// Storage broker endpoints to connect to.
197 : pub broker_endpoint: Uri,
198 : pub broker_keepalive_interval: Duration,
199 :
200 : pub log_format: LogFormat,
201 :
202 : /// Number of tenants which will be concurrently loaded from remote storage proactively on startup,
203 : /// does not limit tenants loaded in response to client I/O. A lower value implicitly deprioritizes
204 : /// loading such tenants, vs. other work in the system.
205 : pub concurrent_tenant_warmup: ConfigurableSemaphore,
206 :
207 : /// Number of concurrent [`Tenant::gather_size_inputs`](crate::tenant::Tenant::gather_size_inputs) allowed.
208 : pub concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore,
209 : /// Limit of concurrent [`Tenant::gather_size_inputs`] issued by module `eviction_task`.
210 : /// The number of permits is the same as `concurrent_tenant_size_logical_size_queries`.
211 : /// See the comment in `eviction_task` for details.
212 : ///
213 : /// [`Tenant::gather_size_inputs`]: crate::tenant::Tenant::gather_size_inputs
214 : pub eviction_task_immitated_concurrent_logical_size_queries: ConfigurableSemaphore,
215 :
216 : // How often to collect metrics and send them to the metrics endpoint.
217 : pub metric_collection_interval: Duration,
218 : // How often to send unchanged cached metrics to the metrics endpoint.
219 : pub cached_metric_collection_interval: Duration,
220 : pub metric_collection_endpoint: Option<Url>,
221 : pub synthetic_size_calculation_interval: Duration,
222 :
223 : pub disk_usage_based_eviction: Option<DiskUsageEvictionTaskConfig>,
224 :
225 : pub test_remote_failures: u64,
226 :
227 : pub ondemand_download_behavior_treat_error_as_warn: bool,
228 :
229 : /// How long will background tasks be delayed at most after initial load of tenants.
230 : ///
231 : /// Our largest initialization completions are in the range of 100-200s, so perhaps 10s works
232 : /// as we now isolate initial loading, initial logical size calculation and background tasks.
233 : /// Smaller nodes will have background tasks "not running" for this long unless every timeline
234 : /// has it's initial logical size calculated. Not running background tasks for some seconds is
235 : /// not terrible.
236 : pub background_task_maximum_delay: Duration,
237 :
238 : pub control_plane_api: Option<Url>,
239 :
240 : /// JWT token for use with the control plane API.
241 : pub control_plane_api_token: Option<SecretString>,
242 :
243 : /// If true, pageserver will make best-effort to operate without a control plane: only
244 : /// for use in major incidents.
245 : pub control_plane_emergency_mode: bool,
246 :
247 : /// How many heatmap uploads may be done concurrency: lower values implicitly deprioritize
248 : /// heatmap uploads vs. other remote storage operations.
249 : pub heatmap_upload_concurrency: usize,
250 :
251 : /// How many remote storage downloads may be done for secondary tenants concurrently. Implicitly
252 : /// deprioritises secondary downloads vs. remote storage operations for attached tenants.
253 : pub secondary_download_concurrency: usize,
254 :
255 : /// Maximum number of WAL records to be ingested and committed at the same time
256 : pub ingest_batch_size: u64,
257 :
258 : pub virtual_file_io_engine: virtual_file::IoEngineKind,
259 : }
260 :
/// Process-wide safekeeper auth token.
///
/// We do not want to store this in a PageServerConf because the latter may be logged
/// and/or serialized at a whim, while the token is secret. Currently this token is the
/// same for accessing all tenants/timelines, but may become per-tenant/per-timeline in
/// the future; more tokens and auth may arrive for storage broker, completely changing the logic.
/// Hence, we resort to a global variable for now instead of passing the token from the
/// startup code to the connection code through a dozen layers.
pub static SAFEKEEPER_AUTH_TOKEN: OnceCell<Arc<String>> = OnceCell::new();
268 :
/// State of a single slot in the configuration builder.
///
/// A dedicated enum is used instead of `Option` to better indicate the
/// intention and to avoid confusion with slots whose value is itself an
/// `Option`.
pub enum BuilderValue<T> {
    /// The slot holds a value.
    Set(T),
    /// The slot has not been given a value.
    NotSet,
}

impl<T> BuilderValue<T> {
    /// Converts the slot into a `Result`: `Set(v)` becomes `Ok(v)` and
    /// `NotSet` becomes `Err(err)`.
    pub fn ok_or<E>(self, err: E) -> Result<T, E> {
        let value: Option<T> = match self {
            Self::NotSet => None,
            Self::Set(inner) => Some(inner),
        };
        value.ok_or(err)
    }
}
284 :
285 : // needed to simplify config construction
286 : struct PageServerConfigBuilder {
287 : listen_pg_addr: BuilderValue<String>,
288 :
289 : listen_http_addr: BuilderValue<String>,
290 :
291 : availability_zone: BuilderValue<Option<String>>,
292 :
293 : wait_lsn_timeout: BuilderValue<Duration>,
294 : wal_redo_timeout: BuilderValue<Duration>,
295 :
296 : superuser: BuilderValue<String>,
297 :
298 : page_cache_size: BuilderValue<usize>,
299 : max_file_descriptors: BuilderValue<usize>,
300 :
301 : workdir: BuilderValue<Utf8PathBuf>,
302 :
303 : pg_distrib_dir: BuilderValue<Utf8PathBuf>,
304 :
305 : http_auth_type: BuilderValue<AuthType>,
306 : pg_auth_type: BuilderValue<AuthType>,
307 :
308 : //
309 : auth_validation_public_key_path: BuilderValue<Option<Utf8PathBuf>>,
310 : remote_storage_config: BuilderValue<Option<RemoteStorageConfig>>,
311 :
312 : id: BuilderValue<NodeId>,
313 :
314 : broker_endpoint: BuilderValue<Uri>,
315 : broker_keepalive_interval: BuilderValue<Duration>,
316 :
317 : log_format: BuilderValue<LogFormat>,
318 :
319 : concurrent_tenant_warmup: BuilderValue<NonZeroUsize>,
320 : concurrent_tenant_size_logical_size_queries: BuilderValue<NonZeroUsize>,
321 :
322 : metric_collection_interval: BuilderValue<Duration>,
323 : cached_metric_collection_interval: BuilderValue<Duration>,
324 : metric_collection_endpoint: BuilderValue<Option<Url>>,
325 : synthetic_size_calculation_interval: BuilderValue<Duration>,
326 :
327 : disk_usage_based_eviction: BuilderValue<Option<DiskUsageEvictionTaskConfig>>,
328 :
329 : test_remote_failures: BuilderValue<u64>,
330 :
331 : ondemand_download_behavior_treat_error_as_warn: BuilderValue<bool>,
332 :
333 : background_task_maximum_delay: BuilderValue<Duration>,
334 :
335 : control_plane_api: BuilderValue<Option<Url>>,
336 : control_plane_api_token: BuilderValue<Option<SecretString>>,
337 : control_plane_emergency_mode: BuilderValue<bool>,
338 :
339 : heatmap_upload_concurrency: BuilderValue<usize>,
340 : secondary_download_concurrency: BuilderValue<usize>,
341 :
342 : ingest_batch_size: BuilderValue<u64>,
343 :
344 : virtual_file_io_engine: BuilderValue<virtual_file::IoEngineKind>,
345 : }
346 :
347 : impl Default for PageServerConfigBuilder {
348 1041 : fn default() -> Self {
349 1041 : use self::BuilderValue::*;
350 1041 : use defaults::*;
351 1041 : Self {
352 1041 : listen_pg_addr: Set(DEFAULT_PG_LISTEN_ADDR.to_string()),
353 1041 : listen_http_addr: Set(DEFAULT_HTTP_LISTEN_ADDR.to_string()),
354 1041 : availability_zone: Set(None),
355 1041 : wait_lsn_timeout: Set(humantime::parse_duration(DEFAULT_WAIT_LSN_TIMEOUT)
356 1041 : .expect("cannot parse default wait lsn timeout")),
357 1041 : wal_redo_timeout: Set(humantime::parse_duration(DEFAULT_WAL_REDO_TIMEOUT)
358 1041 : .expect("cannot parse default wal redo timeout")),
359 1041 : superuser: Set(DEFAULT_SUPERUSER.to_string()),
360 1041 : page_cache_size: Set(DEFAULT_PAGE_CACHE_SIZE),
361 1041 : max_file_descriptors: Set(DEFAULT_MAX_FILE_DESCRIPTORS),
362 1041 : workdir: Set(Utf8PathBuf::new()),
363 1041 : pg_distrib_dir: Set(Utf8PathBuf::from_path_buf(
364 1041 : env::current_dir().expect("cannot access current directory"),
365 1041 : )
366 1041 : .expect("non-Unicode path")
367 1041 : .join("pg_install")),
368 1041 : http_auth_type: Set(AuthType::Trust),
369 1041 : pg_auth_type: Set(AuthType::Trust),
370 1041 : auth_validation_public_key_path: Set(None),
371 1041 : remote_storage_config: Set(None),
372 1041 : id: NotSet,
373 1041 : broker_endpoint: Set(storage_broker::DEFAULT_ENDPOINT
374 1041 : .parse()
375 1041 : .expect("failed to parse default broker endpoint")),
376 1041 : broker_keepalive_interval: Set(humantime::parse_duration(
377 1041 : storage_broker::DEFAULT_KEEPALIVE_INTERVAL,
378 1041 : )
379 1041 : .expect("cannot parse default keepalive interval")),
380 1041 : log_format: Set(LogFormat::from_str(DEFAULT_LOG_FORMAT).unwrap()),
381 1041 :
382 1041 : concurrent_tenant_warmup: Set(NonZeroUsize::new(DEFAULT_CONCURRENT_TENANT_WARMUP)
383 1041 : .expect("Invalid default constant")),
384 1041 : concurrent_tenant_size_logical_size_queries: Set(
385 1041 : ConfigurableSemaphore::DEFAULT_INITIAL,
386 1041 : ),
387 1041 : metric_collection_interval: Set(humantime::parse_duration(
388 1041 : DEFAULT_METRIC_COLLECTION_INTERVAL,
389 1041 : )
390 1041 : .expect("cannot parse default metric collection interval")),
391 1041 : cached_metric_collection_interval: Set(humantime::parse_duration(
392 1041 : DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL,
393 1041 : )
394 1041 : .expect("cannot parse default cached_metric_collection_interval")),
395 1041 : synthetic_size_calculation_interval: Set(humantime::parse_duration(
396 1041 : DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL,
397 1041 : )
398 1041 : .expect("cannot parse default synthetic size calculation interval")),
399 1041 : metric_collection_endpoint: Set(DEFAULT_METRIC_COLLECTION_ENDPOINT),
400 1041 :
401 1041 : disk_usage_based_eviction: Set(None),
402 1041 :
403 1041 : test_remote_failures: Set(0),
404 1041 :
405 1041 : ondemand_download_behavior_treat_error_as_warn: Set(false),
406 1041 :
407 1041 : background_task_maximum_delay: Set(humantime::parse_duration(
408 1041 : DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY,
409 1041 : )
410 1041 : .unwrap()),
411 1041 :
412 1041 : control_plane_api: Set(None),
413 1041 : control_plane_api_token: Set(None),
414 1041 : control_plane_emergency_mode: Set(false),
415 1041 :
416 1041 : heatmap_upload_concurrency: Set(DEFAULT_HEATMAP_UPLOAD_CONCURRENCY),
417 1041 : secondary_download_concurrency: Set(DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY),
418 1041 :
419 1041 : ingest_batch_size: Set(DEFAULT_INGEST_BATCH_SIZE),
420 1041 :
421 1041 : virtual_file_io_engine: Set(DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap()),
422 1041 : }
423 1041 : }
424 : }
425 :
426 : impl PageServerConfigBuilder {
427 1035 : pub fn listen_pg_addr(&mut self, listen_pg_addr: String) {
428 1035 : self.listen_pg_addr = BuilderValue::Set(listen_pg_addr)
429 1035 : }
430 :
431 1035 : pub fn listen_http_addr(&mut self, listen_http_addr: String) {
432 1035 : self.listen_http_addr = BuilderValue::Set(listen_http_addr)
433 1035 : }
434 :
435 2 : pub fn availability_zone(&mut self, availability_zone: Option<String>) {
436 2 : self.availability_zone = BuilderValue::Set(availability_zone)
437 2 : }
438 :
439 20 : pub fn wait_lsn_timeout(&mut self, wait_lsn_timeout: Duration) {
440 20 : self.wait_lsn_timeout = BuilderValue::Set(wait_lsn_timeout)
441 20 : }
442 :
443 12 : pub fn wal_redo_timeout(&mut self, wal_redo_timeout: Duration) {
444 12 : self.wal_redo_timeout = BuilderValue::Set(wal_redo_timeout)
445 12 : }
446 :
447 12 : pub fn superuser(&mut self, superuser: String) {
448 12 : self.superuser = BuilderValue::Set(superuser)
449 12 : }
450 :
451 16 : pub fn page_cache_size(&mut self, page_cache_size: usize) {
452 16 : self.page_cache_size = BuilderValue::Set(page_cache_size)
453 16 : }
454 :
455 12 : pub fn max_file_descriptors(&mut self, max_file_descriptors: usize) {
456 12 : self.max_file_descriptors = BuilderValue::Set(max_file_descriptors)
457 12 : }
458 :
459 1041 : pub fn workdir(&mut self, workdir: Utf8PathBuf) {
460 1041 : self.workdir = BuilderValue::Set(workdir)
461 1041 : }
462 :
463 1041 : pub fn pg_distrib_dir(&mut self, pg_distrib_dir: Utf8PathBuf) {
464 1041 : self.pg_distrib_dir = BuilderValue::Set(pg_distrib_dir)
465 1041 : }
466 :
467 1023 : pub fn http_auth_type(&mut self, auth_type: AuthType) {
468 1023 : self.http_auth_type = BuilderValue::Set(auth_type)
469 1023 : }
470 :
471 1023 : pub fn pg_auth_type(&mut self, auth_type: AuthType) {
472 1023 : self.pg_auth_type = BuilderValue::Set(auth_type)
473 1023 : }
474 :
475 22 : pub fn auth_validation_public_key_path(
476 22 : &mut self,
477 22 : auth_validation_public_key_path: Option<Utf8PathBuf>,
478 22 : ) {
479 22 : self.auth_validation_public_key_path = BuilderValue::Set(auth_validation_public_key_path)
480 22 : }
481 :
482 1033 : pub fn remote_storage_config(&mut self, remote_storage_config: Option<RemoteStorageConfig>) {
483 1033 : self.remote_storage_config = BuilderValue::Set(remote_storage_config)
484 1033 : }
485 :
486 1037 : pub fn broker_endpoint(&mut self, broker_endpoint: Uri) {
487 1037 : self.broker_endpoint = BuilderValue::Set(broker_endpoint)
488 1037 : }
489 :
490 0 : pub fn broker_keepalive_interval(&mut self, broker_keepalive_interval: Duration) {
491 0 : self.broker_keepalive_interval = BuilderValue::Set(broker_keepalive_interval)
492 0 : }
493 :
494 1040 : pub fn id(&mut self, node_id: NodeId) {
495 1040 : self.id = BuilderValue::Set(node_id)
496 1040 : }
497 :
498 12 : pub fn log_format(&mut self, log_format: LogFormat) {
499 12 : self.log_format = BuilderValue::Set(log_format)
500 12 : }
501 :
502 5 : pub fn concurrent_tenant_warmup(&mut self, u: NonZeroUsize) {
503 5 : self.concurrent_tenant_warmup = BuilderValue::Set(u);
504 5 : }
505 :
506 0 : pub fn concurrent_tenant_size_logical_size_queries(&mut self, u: NonZeroUsize) {
507 0 : self.concurrent_tenant_size_logical_size_queries = BuilderValue::Set(u);
508 0 : }
509 :
510 23 : pub fn metric_collection_interval(&mut self, metric_collection_interval: Duration) {
511 23 : self.metric_collection_interval = BuilderValue::Set(metric_collection_interval)
512 23 : }
513 :
514 18 : pub fn cached_metric_collection_interval(
515 18 : &mut self,
516 18 : cached_metric_collection_interval: Duration,
517 18 : ) {
518 18 : self.cached_metric_collection_interval =
519 18 : BuilderValue::Set(cached_metric_collection_interval)
520 18 : }
521 :
522 23 : pub fn metric_collection_endpoint(&mut self, metric_collection_endpoint: Option<Url>) {
523 23 : self.metric_collection_endpoint = BuilderValue::Set(metric_collection_endpoint)
524 23 : }
525 :
526 21 : pub fn synthetic_size_calculation_interval(
527 21 : &mut self,
528 21 : synthetic_size_calculation_interval: Duration,
529 21 : ) {
530 21 : self.synthetic_size_calculation_interval =
531 21 : BuilderValue::Set(synthetic_size_calculation_interval)
532 21 : }
533 :
534 100 : pub fn test_remote_failures(&mut self, fail_first: u64) {
535 100 : self.test_remote_failures = BuilderValue::Set(fail_first);
536 100 : }
537 :
538 6 : pub fn disk_usage_based_eviction(&mut self, value: Option<DiskUsageEvictionTaskConfig>) {
539 6 : self.disk_usage_based_eviction = BuilderValue::Set(value);
540 6 : }
541 :
542 0 : pub fn ondemand_download_behavior_treat_error_as_warn(
543 0 : &mut self,
544 0 : ondemand_download_behavior_treat_error_as_warn: bool,
545 0 : ) {
546 0 : self.ondemand_download_behavior_treat_error_as_warn =
547 0 : BuilderValue::Set(ondemand_download_behavior_treat_error_as_warn);
548 0 : }
549 :
550 16 : pub fn background_task_maximum_delay(&mut self, delay: Duration) {
551 16 : self.background_task_maximum_delay = BuilderValue::Set(delay);
552 16 : }
553 :
554 1023 : pub fn control_plane_api(&mut self, api: Option<Url>) {
555 1023 : self.control_plane_api = BuilderValue::Set(api)
556 1023 : }
557 :
558 22 : pub fn control_plane_api_token(&mut self, token: Option<SecretString>) {
559 22 : self.control_plane_api_token = BuilderValue::Set(token)
560 22 : }
561 :
562 1 : pub fn control_plane_emergency_mode(&mut self, enabled: bool) {
563 1 : self.control_plane_emergency_mode = BuilderValue::Set(enabled)
564 1 : }
565 :
566 0 : pub fn heatmap_upload_concurrency(&mut self, value: usize) {
567 0 : self.heatmap_upload_concurrency = BuilderValue::Set(value)
568 0 : }
569 :
570 0 : pub fn secondary_download_concurrency(&mut self, value: usize) {
571 0 : self.secondary_download_concurrency = BuilderValue::Set(value)
572 0 : }
573 :
574 0 : pub fn ingest_batch_size(&mut self, ingest_batch_size: u64) {
575 0 : self.ingest_batch_size = BuilderValue::Set(ingest_batch_size)
576 0 : }
577 :
578 0 : pub fn virtual_file_io_engine(&mut self, value: virtual_file::IoEngineKind) {
579 0 : self.virtual_file_io_engine = BuilderValue::Set(value);
580 0 : }
581 :
582 1041 : pub fn build(self) -> anyhow::Result<PageServerConf> {
583 1041 : let concurrent_tenant_warmup = self
584 1041 : .concurrent_tenant_warmup
585 1041 : .ok_or(anyhow!("missing concurrent_tenant_warmup"))?;
586 1041 : let concurrent_tenant_size_logical_size_queries = self
587 1041 : .concurrent_tenant_size_logical_size_queries
588 1041 : .ok_or(anyhow!(
589 1041 : "missing concurrent_tenant_size_logical_size_queries"
590 1041 : ))?;
591 : Ok(PageServerConf {
592 1041 : listen_pg_addr: self
593 1041 : .listen_pg_addr
594 1041 : .ok_or(anyhow!("missing listen_pg_addr"))?,
595 1041 : listen_http_addr: self
596 1041 : .listen_http_addr
597 1041 : .ok_or(anyhow!("missing listen_http_addr"))?,
598 1041 : availability_zone: self
599 1041 : .availability_zone
600 1041 : .ok_or(anyhow!("missing availability_zone"))?,
601 1041 : wait_lsn_timeout: self
602 1041 : .wait_lsn_timeout
603 1041 : .ok_or(anyhow!("missing wait_lsn_timeout"))?,
604 1041 : wal_redo_timeout: self
605 1041 : .wal_redo_timeout
606 1041 : .ok_or(anyhow!("missing wal_redo_timeout"))?,
607 1041 : superuser: self.superuser.ok_or(anyhow!("missing superuser"))?,
608 1041 : page_cache_size: self
609 1041 : .page_cache_size
610 1041 : .ok_or(anyhow!("missing page_cache_size"))?,
611 1041 : max_file_descriptors: self
612 1041 : .max_file_descriptors
613 1041 : .ok_or(anyhow!("missing max_file_descriptors"))?,
614 1041 : workdir: self.workdir.ok_or(anyhow!("missing workdir"))?,
615 1041 : pg_distrib_dir: self
616 1041 : .pg_distrib_dir
617 1041 : .ok_or(anyhow!("missing pg_distrib_dir"))?,
618 1041 : http_auth_type: self
619 1041 : .http_auth_type
620 1041 : .ok_or(anyhow!("missing http_auth_type"))?,
621 1041 : pg_auth_type: self.pg_auth_type.ok_or(anyhow!("missing pg_auth_type"))?,
622 1041 : auth_validation_public_key_path: self
623 1041 : .auth_validation_public_key_path
624 1041 : .ok_or(anyhow!("missing auth_validation_public_key_path"))?,
625 1041 : remote_storage_config: self
626 1041 : .remote_storage_config
627 1041 : .ok_or(anyhow!("missing remote_storage_config"))?,
628 1041 : id: self.id.ok_or(anyhow!("missing id"))?,
629 : // TenantConf is handled separately
630 1040 : default_tenant_conf: TenantConf::default(),
631 1040 : broker_endpoint: self
632 1040 : .broker_endpoint
633 1040 : .ok_or(anyhow!("No broker endpoints provided"))?,
634 1040 : broker_keepalive_interval: self
635 1040 : .broker_keepalive_interval
636 1040 : .ok_or(anyhow!("No broker keepalive interval provided"))?,
637 1040 : log_format: self.log_format.ok_or(anyhow!("missing log_format"))?,
638 1040 : concurrent_tenant_warmup: ConfigurableSemaphore::new(concurrent_tenant_warmup),
639 1040 : concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::new(
640 1040 : concurrent_tenant_size_logical_size_queries,
641 1040 : ),
642 1040 : eviction_task_immitated_concurrent_logical_size_queries: ConfigurableSemaphore::new(
643 1040 : concurrent_tenant_size_logical_size_queries,
644 1040 : ),
645 1040 : metric_collection_interval: self
646 1040 : .metric_collection_interval
647 1040 : .ok_or(anyhow!("missing metric_collection_interval"))?,
648 1040 : cached_metric_collection_interval: self
649 1040 : .cached_metric_collection_interval
650 1040 : .ok_or(anyhow!("missing cached_metric_collection_interval"))?,
651 1040 : metric_collection_endpoint: self
652 1040 : .metric_collection_endpoint
653 1040 : .ok_or(anyhow!("missing metric_collection_endpoint"))?,
654 1040 : synthetic_size_calculation_interval: self
655 1040 : .synthetic_size_calculation_interval
656 1040 : .ok_or(anyhow!("missing synthetic_size_calculation_interval"))?,
657 1040 : disk_usage_based_eviction: self
658 1040 : .disk_usage_based_eviction
659 1040 : .ok_or(anyhow!("missing disk_usage_based_eviction"))?,
660 1040 : test_remote_failures: self
661 1040 : .test_remote_failures
662 1040 : .ok_or(anyhow!("missing test_remote_failuers"))?,
663 1040 : ondemand_download_behavior_treat_error_as_warn: self
664 1040 : .ondemand_download_behavior_treat_error_as_warn
665 1040 : .ok_or(anyhow!(
666 1040 : "missing ondemand_download_behavior_treat_error_as_warn"
667 1040 : ))?,
668 1040 : background_task_maximum_delay: self
669 1040 : .background_task_maximum_delay
670 1040 : .ok_or(anyhow!("missing background_task_maximum_delay"))?,
671 1040 : control_plane_api: self
672 1040 : .control_plane_api
673 1040 : .ok_or(anyhow!("missing control_plane_api"))?,
674 1040 : control_plane_api_token: self
675 1040 : .control_plane_api_token
676 1040 : .ok_or(anyhow!("missing control_plane_api_token"))?,
677 1040 : control_plane_emergency_mode: self
678 1040 : .control_plane_emergency_mode
679 1040 : .ok_or(anyhow!("missing control_plane_emergency_mode"))?,
680 1040 : heatmap_upload_concurrency: self
681 1040 : .heatmap_upload_concurrency
682 1040 : .ok_or(anyhow!("missing heatmap_upload_concurrency"))?,
683 1040 : secondary_download_concurrency: self
684 1040 : .secondary_download_concurrency
685 1040 : .ok_or(anyhow!("missing secondary_download_concurrency"))?,
686 1040 : ingest_batch_size: self
687 1040 : .ingest_batch_size
688 1040 : .ok_or(anyhow!("missing ingest_batch_size"))?,
689 1040 : virtual_file_io_engine: self
690 1040 : .virtual_file_io_engine
691 1040 : .ok_or(anyhow!("missing virtual_file_io_engine"))?,
692 : })
693 1041 : }
694 : }
695 :
696 : impl PageServerConf {
697 : //
698 : // Repository paths, relative to workdir.
699 : //
700 :
701 142536 : pub fn tenants_path(&self) -> Utf8PathBuf {
702 142536 : self.workdir.join(TENANTS_SEGMENT_NAME)
703 142536 : }
704 :
705 2716 : pub fn deletion_prefix(&self) -> Utf8PathBuf {
706 2716 : self.workdir.join("deletion")
707 2716 : }
708 :
709 149 : pub fn deletion_list_path(&self, sequence: u64) -> Utf8PathBuf {
710 149 : // Encode a version in the filename, so that if we ever switch away from JSON we can
711 149 : // increment this.
712 149 : const VERSION: u8 = 1;
713 149 :
714 149 : self.deletion_prefix()
715 149 : .join(format!("{sequence:016x}-{VERSION:02x}.list"))
716 149 : }
717 :
718 1300 : pub fn deletion_header_path(&self) -> Utf8PathBuf {
719 1300 : // Encode a version in the filename, so that if we ever switch away from JSON we can
720 1300 : // increment this.
721 1300 : const VERSION: u8 = 1;
722 1300 :
723 1300 : self.deletion_prefix().join(format!("header-{VERSION:02x}"))
724 1300 : }
725 :
726 140883 : pub fn tenant_path(&self, tenant_shard_id: &TenantShardId) -> Utf8PathBuf {
727 140883 : self.tenants_path().join(tenant_shard_id.to_string())
728 140883 : }
729 :
730 1787 : pub fn tenant_ignore_mark_file_path(&self, tenant_shard_id: &TenantShardId) -> Utf8PathBuf {
731 1787 : self.tenant_path(tenant_shard_id)
732 1787 : .join(IGNORED_TENANT_FILE_NAME)
733 1787 : }
734 :
735 : /// Points to a place in pageserver's local directory,
736 : /// where certain tenant's tenantconf file should be located.
737 : ///
738 : /// Legacy: superseded by tenant_location_config_path. Eventually
739 : /// remove this function.
740 1310 : pub fn tenant_config_path(&self, tenant_shard_id: &TenantShardId) -> Utf8PathBuf {
741 1310 : self.tenant_path(tenant_shard_id).join(TENANT_CONFIG_NAME)
742 1310 : }
743 :
744 1310 : pub fn tenant_location_config_path(&self, tenant_shard_id: &TenantShardId) -> Utf8PathBuf {
745 1310 : self.tenant_path(tenant_shard_id)
746 1310 : .join(TENANT_LOCATION_CONFIG_NAME)
747 1310 : }
748 :
749 8 : pub(crate) fn tenant_heatmap_path(&self, tenant_shard_id: &TenantShardId) -> Utf8PathBuf {
750 8 : self.tenant_path(tenant_shard_id)
751 8 : .join(TENANT_HEATMAP_BASENAME)
752 8 : }
753 :
754 133859 : pub fn timelines_path(&self, tenant_shard_id: &TenantShardId) -> Utf8PathBuf {
755 133859 : self.tenant_path(tenant_shard_id)
756 133859 : .join(TIMELINES_SEGMENT_NAME)
757 133859 : }
758 :
759 131185 : pub fn timeline_path(
760 131185 : &self,
761 131185 : tenant_shard_id: &TenantShardId,
762 131185 : timeline_id: &TimelineId,
763 131185 : ) -> Utf8PathBuf {
764 131185 : self.timelines_path(tenant_shard_id)
765 131185 : .join(timeline_id.to_string())
766 131185 : }
767 :
768 1202 : pub fn timeline_uninit_mark_file_path(
769 1202 : &self,
770 1202 : tenant_shard_id: TenantShardId,
771 1202 : timeline_id: TimelineId,
772 1202 : ) -> Utf8PathBuf {
773 1202 : path_with_suffix_extension(
774 1202 : self.timeline_path(&tenant_shard_id, &timeline_id),
775 1202 : TIMELINE_UNINIT_MARK_SUFFIX,
776 1202 : )
777 1202 : }
778 :
779 172 : pub fn timeline_delete_mark_file_path(
780 172 : &self,
781 172 : tenant_shard_id: TenantShardId,
782 172 : timeline_id: TimelineId,
783 172 : ) -> Utf8PathBuf {
784 172 : path_with_suffix_extension(
785 172 : self.timeline_path(&tenant_shard_id, &timeline_id),
786 172 : TIMELINE_DELETE_MARK_SUFFIX,
787 172 : )
788 172 : }
789 :
790 1026 : pub fn tenant_deleted_mark_file_path(&self, tenant_shard_id: &TenantShardId) -> Utf8PathBuf {
791 1026 : self.tenant_path(tenant_shard_id)
792 1026 : .join(TENANT_DELETED_MARKER_FILE_NAME)
793 1026 : }
794 :
795 4 : pub fn traces_path(&self) -> Utf8PathBuf {
796 4 : self.workdir.join("traces")
797 4 : }
798 :
799 4 : pub fn trace_path(
800 4 : &self,
801 4 : tenant_shard_id: &TenantShardId,
802 4 : timeline_id: &TimelineId,
803 4 : connection_id: &ConnectionId,
804 4 : ) -> Utf8PathBuf {
805 4 : self.traces_path()
806 4 : .join(tenant_shard_id.to_string())
807 4 : .join(timeline_id.to_string())
808 4 : .join(connection_id.to_string())
809 4 : }
810 :
811 : /// Points to a place in pageserver's local directory,
812 : /// where certain timeline's metadata file should be located.
813 7197 : pub fn metadata_path(
814 7197 : &self,
815 7197 : tenant_shard_id: &TenantShardId,
816 7197 : timeline_id: &TimelineId,
817 7197 : ) -> Utf8PathBuf {
818 7197 : self.timeline_path(tenant_shard_id, timeline_id)
819 7197 : .join(METADATA_FILE_NAME)
820 7197 : }
821 :
822 : /// Turns storage remote path of a file into its local path.
823 0 : pub fn local_path(&self, remote_path: &RemotePath) -> Utf8PathBuf {
824 0 : remote_path.with_base(&self.workdir)
825 0 : }
826 :
827 : //
828 : // Postgres distribution paths
829 : //
830 2404 : pub fn pg_distrib_dir(&self, pg_version: u32) -> anyhow::Result<Utf8PathBuf> {
831 2404 : let path = self.pg_distrib_dir.clone();
832 2404 :
833 2404 : #[allow(clippy::manual_range_patterns)]
834 2404 : match pg_version {
835 2404 : 14 | 15 | 16 => Ok(path.join(format!("v{pg_version}"))),
836 0 : _ => bail!("Unsupported postgres version: {}", pg_version),
837 : }
838 2404 : }
839 :
840 1202 : pub fn pg_bin_dir(&self, pg_version: u32) -> anyhow::Result<Utf8PathBuf> {
841 1202 : Ok(self.pg_distrib_dir(pg_version)?.join("bin"))
842 1202 : }
843 1202 : pub fn pg_lib_dir(&self, pg_version: u32) -> anyhow::Result<Utf8PathBuf> {
844 1202 : Ok(self.pg_distrib_dir(pg_version)?.join("lib"))
845 1202 : }
846 :
847 : /// Parse a configuration file (pageserver.toml) into a PageServerConf struct,
848 : /// validating the input and failing on errors.
849 : ///
850 : /// This leaves any options not present in the file in the built-in defaults.
851 1041 : pub fn parse_and_validate(toml: &Document, workdir: &Utf8Path) -> anyhow::Result<Self> {
852 1041 : let mut builder = PageServerConfigBuilder::default();
853 1041 : builder.workdir(workdir.to_owned());
854 1041 :
855 1041 : let mut t_conf = TenantConfOpt::default();
856 :
857 10662 : for (key, item) in toml.iter() {
858 10662 : match key {
859 10662 : "listen_pg_addr" => builder.listen_pg_addr(parse_toml_string(key, item)?),
860 9627 : "listen_http_addr" => builder.listen_http_addr(parse_toml_string(key, item)?),
861 8592 : "availability_zone" => builder.availability_zone(Some(parse_toml_string(key, item)?)),
862 8590 : "wait_lsn_timeout" => builder.wait_lsn_timeout(parse_toml_duration(key, item)?),
863 8570 : "wal_redo_timeout" => builder.wal_redo_timeout(parse_toml_duration(key, item)?),
864 8558 : "initial_superuser_name" => builder.superuser(parse_toml_string(key, item)?),
865 8546 : "page_cache_size" => builder.page_cache_size(parse_toml_u64(key, item)? as usize),
866 8530 : "max_file_descriptors" => {
867 12 : builder.max_file_descriptors(parse_toml_u64(key, item)? as usize)
868 : }
869 8518 : "pg_distrib_dir" => {
870 1041 : builder.pg_distrib_dir(Utf8PathBuf::from(parse_toml_string(key, item)?))
871 : }
872 7477 : "auth_validation_public_key_path" => builder.auth_validation_public_key_path(Some(
873 22 : Utf8PathBuf::from(parse_toml_string(key, item)?),
874 : )),
875 7455 : "http_auth_type" => builder.http_auth_type(parse_toml_from_str(key, item)?),
876 6432 : "pg_auth_type" => builder.pg_auth_type(parse_toml_from_str(key, item)?),
877 5409 : "remote_storage" => {
878 1033 : builder.remote_storage_config(RemoteStorageConfig::from_toml(item)?)
879 : }
880 4376 : "tenant_config" => {
881 1029 : t_conf = TenantConfOpt::try_from(item.to_owned()).context(format!("failed to parse: '{key}'"))?;
882 : }
883 3347 : "id" => builder.id(NodeId(parse_toml_u64(key, item)?)),
884 2307 : "broker_endpoint" => builder.broker_endpoint(parse_toml_string(key, item)?.parse().context("failed to parse broker endpoint")?),
885 1270 : "broker_keepalive_interval" => builder.broker_keepalive_interval(parse_toml_duration(key, item)?),
886 1270 : "log_format" => builder.log_format(
887 12 : LogFormat::from_config(&parse_toml_string(key, item)?)?
888 : ),
889 1258 : "concurrent_tenant_warmup" => builder.concurrent_tenant_warmup({
890 5 : let input = parse_toml_string(key, item)?;
891 5 : let permits = input.parse::<usize>().context("expected a number of initial permits, not {s:?}")?;
892 5 : NonZeroUsize::new(permits).context("initial semaphore permits out of range: 0, use other configuration to disable a feature")?
893 : }),
894 1253 : "concurrent_tenant_size_logical_size_queries" => builder.concurrent_tenant_size_logical_size_queries({
895 0 : let input = parse_toml_string(key, item)?;
896 0 : let permits = input.parse::<usize>().context("expected a number of initial permits, not {s:?}")?;
897 0 : NonZeroUsize::new(permits).context("initial semaphore permits out of range: 0, use other configuration to disable a feature")?
898 : }),
899 1253 : "metric_collection_interval" => builder.metric_collection_interval(parse_toml_duration(key, item)?),
900 1230 : "cached_metric_collection_interval" => builder.cached_metric_collection_interval(parse_toml_duration(key, item)?),
901 1212 : "metric_collection_endpoint" => {
902 23 : let endpoint = parse_toml_string(key, item)?.parse().context("failed to parse metric_collection_endpoint")?;
903 23 : builder.metric_collection_endpoint(Some(endpoint));
904 : },
905 1189 : "synthetic_size_calculation_interval" =>
906 21 : builder.synthetic_size_calculation_interval(parse_toml_duration(key, item)?),
907 1168 : "test_remote_failures" => builder.test_remote_failures(parse_toml_u64(key, item)?),
908 1068 : "disk_usage_based_eviction" => {
909 6 : tracing::info!("disk_usage_based_eviction: {:#?}", &item);
910 6 : builder.disk_usage_based_eviction(
911 6 : deserialize_from_item("disk_usage_based_eviction", item)
912 6 : .context("parse disk_usage_based_eviction")?
913 : )
914 : },
915 1062 : "ondemand_download_behavior_treat_error_as_warn" => builder.ondemand_download_behavior_treat_error_as_warn(parse_toml_bool(key, item)?),
916 1062 : "background_task_maximum_delay" => builder.background_task_maximum_delay(parse_toml_duration(key, item)?),
917 1046 : "control_plane_api" => {
918 1023 : let parsed = parse_toml_string(key, item)?;
919 1023 : if parsed.is_empty() {
920 1 : builder.control_plane_api(None)
921 : } else {
922 1022 : builder.control_plane_api(Some(parsed.parse().context("failed to parse control plane URL")?))
923 : }
924 : },
925 23 : "control_plane_api_token" => {
926 22 : let parsed = parse_toml_string(key, item)?;
927 22 : if parsed.is_empty() {
928 0 : builder.control_plane_api_token(None)
929 : } else {
930 22 : builder.control_plane_api_token(Some(parsed.into()))
931 : }
932 : },
933 1 : "control_plane_emergency_mode" => {
934 1 : builder.control_plane_emergency_mode(parse_toml_bool(key, item)?)
935 : },
936 0 : "heatmap_upload_concurrency" => {
937 0 : builder.heatmap_upload_concurrency(parse_toml_u64(key, item)? as usize)
938 : },
939 0 : "secondary_download_concurrency" => {
940 0 : builder.secondary_download_concurrency(parse_toml_u64(key, item)? as usize)
941 : },
942 0 : "ingest_batch_size" => builder.ingest_batch_size(parse_toml_u64(key, item)?),
943 0 : "virtual_file_io_engine" => {
944 0 : builder.virtual_file_io_engine(parse_toml_from_str("virtual_file_io_engine", item)?)
945 : }
946 0 : _ => bail!("unrecognized pageserver option '{key}'"),
947 : }
948 : }
949 :
950 1041 : let mut conf = builder.build().context("invalid config")?;
951 :
952 1040 : if conf.http_auth_type == AuthType::NeonJWT || conf.pg_auth_type == AuthType::NeonJWT {
953 22 : let auth_validation_public_key_path = conf
954 22 : .auth_validation_public_key_path
955 22 : .get_or_insert_with(|| workdir.join("auth_public_key.pem"));
956 22 : ensure!(
957 22 : auth_validation_public_key_path.exists(),
958 0 : format!(
959 0 : "Can't find auth_validation_public_key at '{auth_validation_public_key_path}'",
960 0 : )
961 : );
962 1018 : }
963 :
964 1040 : conf.default_tenant_conf = t_conf.merge(TenantConf::default());
965 1040 :
966 1040 : Ok(conf)
967 1041 : }
968 :
/// Returns the per-test repository directory `<TEST_OUTPUT>/test_<test_name>`, using
/// `../tmp_check` when the `TEST_OUTPUT` environment variable cannot be read.
#[cfg(test)]
pub fn test_repo_dir(test_name: &str) -> Utf8PathBuf {
    let test_output_dir = match std::env::var("TEST_OUTPUT") {
        Ok(dir) => dir,
        Err(_) => String::from("../tmp_check"),
    };
    let repo_dir = format!("{test_output_dir}/test_{test_name}");
    Utf8PathBuf::from(repo_dir)
}
974 :
975 90 : pub fn dummy_conf(repo_dir: Utf8PathBuf) -> Self {
976 90 : let pg_distrib_dir = Utf8PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../pg_install");
977 90 :
978 90 : PageServerConf {
979 90 : id: NodeId(0),
980 90 : wait_lsn_timeout: Duration::from_secs(60),
981 90 : wal_redo_timeout: Duration::from_secs(60),
982 90 : page_cache_size: defaults::DEFAULT_PAGE_CACHE_SIZE,
983 90 : max_file_descriptors: defaults::DEFAULT_MAX_FILE_DESCRIPTORS,
984 90 : listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
985 90 : listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
986 90 : availability_zone: None,
987 90 : superuser: "cloud_admin".to_string(),
988 90 : workdir: repo_dir,
989 90 : pg_distrib_dir,
990 90 : http_auth_type: AuthType::Trust,
991 90 : pg_auth_type: AuthType::Trust,
992 90 : auth_validation_public_key_path: None,
993 90 : remote_storage_config: None,
994 90 : default_tenant_conf: TenantConf::default(),
995 90 : broker_endpoint: storage_broker::DEFAULT_ENDPOINT.parse().unwrap(),
996 90 : broker_keepalive_interval: Duration::from_secs(5000),
997 90 : log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(),
998 90 : concurrent_tenant_warmup: ConfigurableSemaphore::new(
999 90 : NonZeroUsize::new(DEFAULT_CONCURRENT_TENANT_WARMUP)
1000 90 : .expect("Invalid default constant"),
1001 90 : ),
1002 90 : concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
1003 90 : eviction_task_immitated_concurrent_logical_size_queries: ConfigurableSemaphore::default(
1004 90 : ),
1005 90 : metric_collection_interval: Duration::from_secs(60),
1006 90 : cached_metric_collection_interval: Duration::from_secs(60 * 60),
1007 90 : metric_collection_endpoint: defaults::DEFAULT_METRIC_COLLECTION_ENDPOINT,
1008 90 : synthetic_size_calculation_interval: Duration::from_secs(60),
1009 90 : disk_usage_based_eviction: None,
1010 90 : test_remote_failures: 0,
1011 90 : ondemand_download_behavior_treat_error_as_warn: false,
1012 90 : background_task_maximum_delay: Duration::ZERO,
1013 90 : control_plane_api: None,
1014 90 : control_plane_api_token: None,
1015 90 : control_plane_emergency_mode: false,
1016 90 : heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY,
1017 90 : secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY,
1018 90 : ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE,
1019 90 : virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
1020 90 : }
1021 90 : }
1022 : }
1023 :
1024 : // Helper functions to parse a toml Item
1025 :
1026 5269 : fn parse_toml_string(name: &str, item: &Item) -> Result<String> {
1027 5269 : let s = item
1028 5269 : .as_str()
1029 5269 : .with_context(|| format!("configure option {name} is not a string"))?;
1030 5269 : Ok(s.to_string())
1031 5269 : }
1032 :
1033 1168 : fn parse_toml_u64(name: &str, item: &Item) -> Result<u64> {
1034 : // A toml integer is signed, so it cannot represent the full range of an u64. That's OK
1035 : // for our use, though.
1036 1168 : let i: i64 = item
1037 1168 : .as_integer()
1038 1168 : .with_context(|| format!("configure option {name} is not an integer"))?;
1039 1168 : if i < 0 {
1040 0 : bail!("configure option {name} cannot be negative");
1041 1168 : }
1042 1168 : Ok(i as u64)
1043 1168 : }
1044 :
1045 1 : fn parse_toml_bool(name: &str, item: &Item) -> Result<bool> {
1046 1 : item.as_bool()
1047 1 : .with_context(|| format!("configure option {name} is not a bool"))
1048 1 : }
1049 :
1050 110 : fn parse_toml_duration(name: &str, item: &Item) -> Result<Duration> {
1051 110 : let s = item
1052 110 : .as_str()
1053 110 : .with_context(|| format!("configure option {name} is not a string"))?;
1054 :
1055 110 : Ok(humantime::parse_duration(s)?)
1056 110 : }
1057 :
1058 2046 : fn parse_toml_from_str<T>(name: &str, item: &Item) -> anyhow::Result<T>
1059 2046 : where
1060 2046 : T: FromStr,
1061 2046 : <T as FromStr>::Err: std::fmt::Display,
1062 2046 : {
1063 2046 : let v = item
1064 2046 : .as_str()
1065 2046 : .with_context(|| format!("configure option {name} is not a string"))?;
1066 2046 : T::from_str(v).map_err(|e| {
1067 0 : anyhow!(
1068 0 : "Failed to parse string as {parse_type} for configure option {name}: {e}",
1069 0 : parse_type = stringify!(T)
1070 0 : )
1071 2046 : })
1072 2046 : }
1073 :
1074 6 : fn deserialize_from_item<T>(name: &str, item: &Item) -> anyhow::Result<T>
1075 6 : where
1076 6 : T: serde::de::DeserializeOwned,
1077 6 : {
1078 : // ValueDeserializer::new is not public, so use the ValueDeserializer's documented way
1079 6 : let deserializer = match item.clone().into_value() {
1080 6 : Ok(value) => value.into_deserializer(),
1081 0 : Err(item) => anyhow::bail!("toml_edit::Item '{item}' is not a toml_edit::Value"),
1082 : };
1083 6 : T::deserialize(deserializer).with_context(|| format!("deserializing item for node {name}"))
1084 6 : }
1085 :
1086 : /// Configurable semaphore permits setting.
1087 : ///
1088 : /// Does not allow semaphore permits to be zero, because at runtime initially zero permits and empty
1089 : /// semaphore cannot be distinguished, leading any feature using these to await forever (or until
1090 : /// new permits are added).
1091 0 : #[derive(Debug, Clone)]
1092 : pub struct ConfigurableSemaphore {
1093 : initial_permits: NonZeroUsize,
1094 : inner: std::sync::Arc<tokio::sync::Semaphore>,
1095 : }
1096 :
1097 : impl ConfigurableSemaphore {
1098 : pub const DEFAULT_INITIAL: NonZeroUsize = match NonZeroUsize::new(1) {
1099 : Some(x) => x,
1100 : None => panic!("const unwrap is not yet stable"),
1101 : };
1102 :
1103 : /// Initializse using a non-zero amount of permits.
1104 : ///
1105 : /// Require a non-zero initial permits, because using permits == 0 is a crude way to disable a
1106 : /// feature such as [`Tenant::gather_size_inputs`]. Otherwise any semaphore using future will
1107 : /// behave like [`futures::future::pending`], just waiting until new permits are added.
1108 : ///
1109 : /// [`Tenant::gather_size_inputs`]: crate::tenant::Tenant::gather_size_inputs
1110 3402 : pub fn new(initial_permits: NonZeroUsize) -> Self {
1111 3402 : ConfigurableSemaphore {
1112 3402 : initial_permits,
1113 3402 : inner: std::sync::Arc::new(tokio::sync::Semaphore::new(initial_permits.get())),
1114 3402 : }
1115 3402 : }
1116 :
1117 : /// Returns the configured amount of permits.
1118 624 : pub fn initial_permits(&self) -> NonZeroUsize {
1119 624 : self.initial_permits
1120 624 : }
1121 : }
1122 :
1123 : impl Default for ConfigurableSemaphore {
1124 188 : fn default() -> Self {
1125 188 : Self::new(Self::DEFAULT_INITIAL)
1126 188 : }
1127 : }
1128 :
1129 : impl PartialEq for ConfigurableSemaphore {
1130 12 : fn eq(&self, other: &Self) -> bool {
1131 12 : // the number of permits can be increased at runtime, so we cannot really fulfill the
1132 12 : // PartialEq value equality otherwise
1133 12 : self.initial_permits == other.initial_permits
1134 12 : }
1135 : }
1136 :
// Marker impl: equality is defined by the `PartialEq` impl, which compares only the
// configured `initial_permits`.
impl Eq for ConfigurableSemaphore {}
1138 :
1139 : impl ConfigurableSemaphore {
1140 300 : pub fn inner(&self) -> &std::sync::Arc<tokio::sync::Semaphore> {
1141 300 : &self.inner
1142 300 : }
1143 : }
1144 :
1145 : #[cfg(test)]
1146 : mod tests {
1147 : use std::{
1148 : fs,
1149 : num::{NonZeroU32, NonZeroUsize},
1150 : };
1151 :
1152 : use camino_tempfile::{tempdir, Utf8TempDir};
1153 : use pageserver_api::models::EvictionPolicy;
1154 : use remote_storage::{RemoteStorageKind, S3Config};
1155 : use utils::serde_percent::Percent;
1156 :
1157 : use super::*;
1158 : use crate::DEFAULT_PG_VERSION;
1159 :
/// Base pageserver.toml snippet shared by the parsing tests.
///
/// It sets the listen addresses, LSN/WAL-redo timeouts, page cache size, file descriptor
/// limit, superuser name, node id, metric collection settings, log format and background
/// task delay. `pg_distrib_dir` and `broker_endpoint` are left out on purpose: each test
/// appends them itself.
const ALL_BASE_VALUES_TOML: &str = r#"
# Initial configuration file created by 'pageserver --init'

listen_pg_addr = '127.0.0.1:64000'
listen_http_addr = '127.0.0.1:9898'

wait_lsn_timeout = '111 s'
wal_redo_timeout = '111 s'

page_cache_size = 444
max_file_descriptors = 333

# initial superuser role name to use when creating a new tenant
initial_superuser_name = 'zzzz'
id = 10

metric_collection_interval = '222 s'
cached_metric_collection_interval = '22200 s'
metric_collection_endpoint = 'http://localhost:80/metrics'
synthetic_size_calculation_interval = '333 s'

log_format = 'json'
background_task_maximum_delay = '334 s'

"#;
1185 :
/// A config that only sets the mandatory options must yield the built-in default for
/// every other field.
#[test]
fn parse_defaults() -> anyhow::Result<()> {
    let tmp = tempdir()?;
    let (workdir, pg_distrib_dir) = prepare_fs(&tmp)?;
    let broker_endpoint = storage_broker::DEFAULT_ENDPOINT;
    // Dummy values for the options that validation insists on.
    let config_string = format!(
        "pg_distrib_dir='{pg_distrib_dir}'\nid=10\nbroker_endpoint = '{broker_endpoint}'",
    );
    let toml = config_string.parse()?;

    let parsed_config = match PageServerConf::parse_and_validate(&toml, &workdir) {
        Ok(conf) => conf,
        Err(e) => panic!("Failed to parse config '{config_string}', reason: {e:?}"),
    };

    let expected = PageServerConf {
        id: NodeId(10),
        listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
        listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
        availability_zone: None,
        wait_lsn_timeout: humantime::parse_duration(defaults::DEFAULT_WAIT_LSN_TIMEOUT)?,
        wal_redo_timeout: humantime::parse_duration(defaults::DEFAULT_WAL_REDO_TIMEOUT)?,
        superuser: defaults::DEFAULT_SUPERUSER.to_string(),
        page_cache_size: defaults::DEFAULT_PAGE_CACHE_SIZE,
        max_file_descriptors: defaults::DEFAULT_MAX_FILE_DESCRIPTORS,
        workdir,
        pg_distrib_dir,
        http_auth_type: AuthType::Trust,
        pg_auth_type: AuthType::Trust,
        auth_validation_public_key_path: None,
        remote_storage_config: None,
        default_tenant_conf: TenantConf::default(),
        broker_endpoint: storage_broker::DEFAULT_ENDPOINT.parse().unwrap(),
        broker_keepalive_interval: humantime::parse_duration(
            storage_broker::DEFAULT_KEEPALIVE_INTERVAL,
        )?,
        log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(),
        concurrent_tenant_warmup: ConfigurableSemaphore::new(
            NonZeroUsize::new(DEFAULT_CONCURRENT_TENANT_WARMUP).unwrap(),
        ),
        concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
        eviction_task_immitated_concurrent_logical_size_queries:
            ConfigurableSemaphore::default(),
        metric_collection_interval: humantime::parse_duration(
            defaults::DEFAULT_METRIC_COLLECTION_INTERVAL,
        )?,
        cached_metric_collection_interval: humantime::parse_duration(
            defaults::DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL,
        )?,
        metric_collection_endpoint: defaults::DEFAULT_METRIC_COLLECTION_ENDPOINT,
        synthetic_size_calculation_interval: humantime::parse_duration(
            defaults::DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL,
        )?,
        disk_usage_based_eviction: None,
        test_remote_failures: 0,
        ondemand_download_behavior_treat_error_as_warn: false,
        background_task_maximum_delay: humantime::parse_duration(
            defaults::DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY,
        )?,
        control_plane_api: None,
        control_plane_api_token: None,
        control_plane_emergency_mode: false,
        heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY,
        secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY,
        ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE,
        virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
    };

    assert_eq!(
        parsed_config, expected,
        "Correct defaults should be used when no config values are provided"
    );

    Ok(())
}
1259 :
/// Every option set in `ALL_BASE_VALUES_TOML` must be parsed into the matching field,
/// while options the snippet does not mention keep their defaults.
#[test]
fn parse_basic_config() -> anyhow::Result<()> {
    let tempdir = tempdir()?;
    let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;
    let broker_endpoint = storage_broker::DEFAULT_ENDPOINT;

    let config_string = format!(
        "{ALL_BASE_VALUES_TOML}pg_distrib_dir='{pg_distrib_dir}'\nbroker_endpoint = '{broker_endpoint}'",
    );
    let toml = config_string.parse()?;

    let parsed_config = PageServerConf::parse_and_validate(&toml, &workdir)
        .unwrap_or_else(|e| panic!("Failed to parse config '{config_string}', reason: {e:?}"));

    assert_eq!(
        parsed_config,
        PageServerConf {
            id: NodeId(10),
            listen_pg_addr: "127.0.0.1:64000".to_string(),
            listen_http_addr: "127.0.0.1:9898".to_string(),
            availability_zone: None,
            wait_lsn_timeout: Duration::from_secs(111),
            wal_redo_timeout: Duration::from_secs(111),
            superuser: "zzzz".to_string(),
            page_cache_size: 444,
            max_file_descriptors: 333,
            workdir,
            pg_distrib_dir,
            http_auth_type: AuthType::Trust,
            pg_auth_type: AuthType::Trust,
            auth_validation_public_key_path: None,
            remote_storage_config: None,
            default_tenant_conf: TenantConf::default(),
            broker_endpoint: storage_broker::DEFAULT_ENDPOINT.parse().unwrap(),
            // Not set in the TOML above: take the expected value from the default constant
            // instead of duplicating it here.
            broker_keepalive_interval: humantime::parse_duration(
                storage_broker::DEFAULT_KEEPALIVE_INTERVAL
            )?,
            log_format: LogFormat::Json,
            concurrent_tenant_warmup: ConfigurableSemaphore::new(
                NonZeroUsize::new(DEFAULT_CONCURRENT_TENANT_WARMUP).unwrap()
            ),
            concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
            eviction_task_immitated_concurrent_logical_size_queries:
                ConfigurableSemaphore::default(),
            metric_collection_interval: Duration::from_secs(222),
            cached_metric_collection_interval: Duration::from_secs(22200),
            metric_collection_endpoint: Some(Url::parse("http://localhost:80/metrics")?),
            synthetic_size_calculation_interval: Duration::from_secs(333),
            disk_usage_based_eviction: None,
            test_remote_failures: 0,
            ondemand_download_behavior_treat_error_as_warn: false,
            background_task_maximum_delay: Duration::from_secs(334),
            control_plane_api: None,
            control_plane_api_token: None,
            control_plane_emergency_mode: false,
            heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY,
            secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY,
            // Not set in the TOML above either, so the default applies.
            ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE,
            virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
        },
        "Should be able to parse all basic config values correctly"
    );

    Ok(())
}
1323 :
/// The table form and the inline-table form of a local-filesystem `remote_storage`
/// section must both parse to the same `RemoteStorageConfig`.
#[test]
fn parse_remote_fs_storage_config() -> anyhow::Result<()> {
    let tmp = tempdir()?;
    let (workdir, pg_distrib_dir) = prepare_fs(&tmp)?;
    let broker_endpoint = "http://127.0.0.1:7777";

    let local_storage_path = tmp.path().join("local_remote_storage");

    let table_form = format!(
        r#"[remote_storage]
local_path = '{local_storage_path}'"#,
    );
    let inline_form = format!("remote_storage={{local_path='{local_storage_path}'}}");

    for remote_storage_config_str in [table_form, inline_form].iter() {
        let config_string = format!(
            r#"{ALL_BASE_VALUES_TOML}
pg_distrib_dir='{pg_distrib_dir}'
broker_endpoint = '{broker_endpoint}'

{remote_storage_config_str}"#,
        );

        let toml = config_string.parse()?;

        let parsed_conf = match PageServerConf::parse_and_validate(&toml, &workdir) {
            Ok(conf) => conf,
            Err(e) => panic!("Failed to parse config '{config_string}', reason: {e:?}"),
        };
        let parsed_remote_storage_config = parsed_conf
            .remote_storage_config
            .expect("Should have remote storage config for the local FS");

        let expected = RemoteStorageConfig {
            storage: RemoteStorageKind::LocalFs(local_storage_path.clone()),
        };
        assert_eq!(
            parsed_remote_storage_config, expected,
            "Remote storage config should correctly parse the local FS config and fill other storage defaults"
        );
    }
    Ok(())
}
1368 :
/// The table form and the inline-table form of an S3 `remote_storage` section must both
/// parse to the same `RemoteStorageConfig`.
#[test]
fn parse_remote_s3_storage_config() -> anyhow::Result<()> {
    let tmp = tempdir()?;
    let (workdir, pg_distrib_dir) = prepare_fs(&tmp)?;

    let bucket_name = String::from("some-sample-bucket");
    let bucket_region = String::from("eu-north-1");
    let prefix_in_bucket = String::from("test_prefix");
    let endpoint = String::from("http://localhost:5000");
    let max_concurrent_syncs = NonZeroUsize::new(111).unwrap();
    let max_sync_errors = NonZeroU32::new(222).unwrap();
    let s3_concurrency_limit = NonZeroUsize::new(333).unwrap();
    let broker_endpoint = "http://127.0.0.1:7777";

    let table_form = format!(
        r#"[remote_storage]
max_concurrent_syncs = {max_concurrent_syncs}
max_sync_errors = {max_sync_errors}
bucket_name = '{bucket_name}'
bucket_region = '{bucket_region}'
prefix_in_bucket = '{prefix_in_bucket}'
endpoint = '{endpoint}'
concurrency_limit = {s3_concurrency_limit}"#
    );
    let inline_form = format!(
        "remote_storage={{max_concurrent_syncs={max_concurrent_syncs}, max_sync_errors={max_sync_errors}, bucket_name='{bucket_name}',\
bucket_region='{bucket_region}', prefix_in_bucket='{prefix_in_bucket}', endpoint='{endpoint}', concurrency_limit={s3_concurrency_limit}}}",
    );

    for remote_storage_config_str in [table_form, inline_form].iter() {
        let config_string = format!(
            r#"{ALL_BASE_VALUES_TOML}
pg_distrib_dir='{pg_distrib_dir}'
broker_endpoint = '{broker_endpoint}'

{remote_storage_config_str}"#,
        );

        let toml = config_string.parse()?;

        let parsed_conf = match PageServerConf::parse_and_validate(&toml, &workdir) {
            Ok(conf) => conf,
            Err(e) => panic!("Failed to parse config '{config_string}', reason: {e:?}"),
        };
        let parsed_remote_storage_config = parsed_conf
            .remote_storage_config
            .expect("Should have remote storage config for S3");

        let expected = RemoteStorageConfig {
            storage: RemoteStorageKind::AwsS3(S3Config {
                bucket_name: bucket_name.clone(),
                bucket_region: bucket_region.clone(),
                prefix_in_bucket: Some(prefix_in_bucket.clone()),
                endpoint: Some(endpoint.clone()),
                concurrency_limit: s3_concurrency_limit,
                max_keys_per_list_response: None,
            }),
        };
        assert_eq!(
            parsed_remote_storage_config, expected,
            "Remote storage config should correctly parse the S3 config"
        );
    }
    Ok(())
}
1435 :
/// Values from the `[tenant_config]` table of the pageserver config must end up in
/// `default_tenant_conf`.
#[test]
fn parse_tenant_config() -> anyhow::Result<()> {
    let tmp = tempdir()?;
    let (workdir, pg_distrib_dir) = prepare_fs(&tmp)?;

    let broker_endpoint = "http://127.0.0.1:7777";
    let trace_read_requests = true;

    let config_string = format!(
        r#"{ALL_BASE_VALUES_TOML}
pg_distrib_dir='{pg_distrib_dir}'
broker_endpoint = '{broker_endpoint}'

[tenant_config]
trace_read_requests = {trace_read_requests}"#,
    );

    let toml = config_string.parse()?;
    let conf = PageServerConf::parse_and_validate(&toml, &workdir)?;

    let parsed_flag = conf.default_tenant_conf.trace_read_requests;
    assert_eq!(
        parsed_flag, trace_read_requests,
        "Tenant config from pageserver config file should be parsed and udpated values used as defaults for all tenants",
    );

    Ok(())
}
1463 :
/// A negative `checkpoint_distance` must be rejected with an error that names the field
/// and the expected type.
#[test]
fn parse_incorrect_tenant_config() -> anyhow::Result<()> {
    let config_string = String::from(
        r#"
[tenant_config]
checkpoint_distance = -1 # supposed to be an u64
"#,
    );

    let toml: Document = config_string.parse()?;
    let tenant_item = toml.get("tenant_config").unwrap();
    let error = TenantConfOpt::try_from(tenant_item.to_owned()).unwrap_err();

    let actual_error_str = error.to_string();
    let expected_error_str = "checkpoint_distance: invalid value: integer `-1`, expected u64";
    assert_eq!(actual_error_str, expected_error_str);

    Ok(())
}
1481 :
/// An inline `tenant_config` table must be able to set `min_resident_size_override`.
#[test]
fn parse_override_tenant_config() -> anyhow::Result<()> {
    let config_string = String::from(r#"tenant_config={ min_resident_size_override = 400 }"#);

    let toml: Document = config_string.parse()?;
    let tenant_item = toml.get("tenant_config").unwrap();
    let conf = TenantConfOpt::try_from(tenant_item.to_owned()).unwrap();

    let parsed_override = conf.min_resident_size_override;
    assert_eq!(parsed_override, Some(400));

    Ok(())
}
1494 :
/// The disk-usage-based eviction section and the tenant-level eviction policy must both be
/// parsed from a pageserver config.
#[test]
fn eviction_pageserver_config_parse() -> anyhow::Result<()> {
    let tmp = tempdir()?;
    let (workdir, pg_distrib_dir) = prepare_fs(&tmp)?;

    let pageserver_conf_toml = format!(
        r#"pg_distrib_dir = "{pg_distrib_dir}"
metric_collection_endpoint = "http://sample.url"
metric_collection_interval = "10min"
id = 222

[disk_usage_based_eviction]
max_usage_pct = 80
min_avail_bytes = 0
period = "10s"

[tenant_config]
evictions_low_residence_duration_metric_threshold = "20m"

[tenant_config.eviction_policy]
kind = "LayerAccessThreshold"
period = "20m"
threshold = "20m"
"#,
    );
    let toml: Document = pageserver_conf_toml.parse()?;
    let conf = PageServerConf::parse_and_validate(&toml, &workdir)?;

    let ten_minutes = Duration::from_secs(10 * 60);
    let twenty_minutes = Duration::from_secs(20 * 60);

    // Top-level options.
    assert_eq!(conf.pg_distrib_dir, pg_distrib_dir);
    assert_eq!(
        conf.metric_collection_endpoint,
        Some("http://sample.url".parse().unwrap())
    );
    assert_eq!(conf.metric_collection_interval, ten_minutes);
    assert_eq!(
        conf.default_tenant_conf
            .evictions_low_residence_duration_metric_threshold,
        twenty_minutes
    );
    assert_eq!(conf.id, NodeId(222));

    // `[disk_usage_based_eviction]` table.
    let expected_eviction = DiskUsageEvictionTaskConfig {
        max_usage_pct: Percent::new(80).unwrap(),
        min_avail_bytes: 0,
        period: Duration::from_secs(10),
        #[cfg(feature = "testing")]
        mock_statvfs: None,
        eviction_order: crate::disk_usage_eviction_task::EvictionOrder::AbsoluteAccessed,
    };
    assert_eq!(conf.disk_usage_based_eviction, Some(expected_eviction));

    // `[tenant_config.eviction_policy]` table.
    match &conf.default_tenant_conf.eviction_policy {
        EvictionPolicy::LayerAccessThreshold(eviction_threshold) => {
            assert_eq!(eviction_threshold.period, twenty_minutes);
            assert_eq!(eviction_threshold.threshold, twenty_minutes);
        }
        EvictionPolicy::NoEviction => panic!("Unexpected eviction opolicy tenant settings"),
    }

    Ok(())
}
1559 :
1560 12 : fn prepare_fs(tempdir: &Utf8TempDir) -> anyhow::Result<(Utf8PathBuf, Utf8PathBuf)> {
1561 12 : let tempdir_path = tempdir.path();
1562 12 :
1563 12 : let workdir = tempdir_path.join("workdir");
1564 12 : fs::create_dir_all(&workdir)?;
1565 :
1566 12 : let pg_distrib_dir = tempdir_path.join("pg_distrib");
1567 12 : let pg_distrib_dir_versioned = pg_distrib_dir.join(format!("v{DEFAULT_PG_VERSION}"));
1568 12 : fs::create_dir_all(&pg_distrib_dir_versioned)?;
1569 12 : let postgres_bin_dir = pg_distrib_dir_versioned.join("bin");
1570 12 : fs::create_dir_all(&postgres_bin_dir)?;
1571 12 : fs::write(postgres_bin_dir.join("postgres"), "I'm postgres, trust me")?;
1572 :
1573 12 : Ok((workdir, pg_distrib_dir))
1574 12 : }
1575 : }
|