Line data Source code
1 : //! Functions for handling page server configuration options
2 : //!
3 : //! Configuration options can be set in the pageserver.toml configuration
4 : //! file, or on the command line.
//! See also `settings.md` for a more detailed description of every parameter.
6 :
7 : use anyhow::{anyhow, bail, ensure, Context, Result};
8 : use remote_storage::{RemotePath, RemoteStorageConfig};
9 : use serde::de::IntoDeserializer;
10 : use std::env;
11 : use storage_broker::Uri;
12 : use utils::crashsafe::path_with_suffix_extension;
13 : use utils::id::ConnectionId;
14 :
15 : use once_cell::sync::OnceCell;
16 : use reqwest::Url;
17 : use std::num::NonZeroUsize;
18 : use std::path::{Path, PathBuf};
19 : use std::str::FromStr;
20 : use std::sync::Arc;
21 : use std::time::Duration;
22 : use toml_edit;
23 : use toml_edit::{Document, Item};
24 :
25 : use postgres_backend::AuthType;
26 : use utils::{
27 : id::{NodeId, TenantId, TimelineId},
28 : logging::LogFormat,
29 : };
30 :
31 : use crate::disk_usage_eviction_task::DiskUsageEvictionTaskConfig;
32 : use crate::tenant::config::TenantConf;
33 : use crate::tenant::config::TenantConfOpt;
34 : use crate::tenant::{
35 : TENANTS_SEGMENT_NAME, TENANT_ATTACHING_MARKER_FILENAME, TENANT_DELETED_MARKER_FILE_NAME,
36 : TIMELINES_SEGMENT_NAME,
37 : };
38 : use crate::{
39 : IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TENANT_CONFIG_NAME, TIMELINE_DELETE_MARK_SUFFIX,
40 : TIMELINE_UNINIT_MARK_SUFFIX,
41 : };
42 :
/// Built-in default values for the page server options, plus the template of the
/// initial configuration file.
pub mod defaults {
    use crate::tenant::config::defaults::*;
    use const_format::formatcp;

    pub use pageserver_api::{
        DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
        DEFAULT_PG_LISTEN_PORT,
    };
    pub use storage_broker::DEFAULT_ENDPOINT as BROKER_DEFAULT_ENDPOINT;

    // Durations are kept as strings; the builder's `Default` impl parses them
    // with `humantime::parse_duration`.
    /// Default for the `wait_lsn_timeout` option.
    pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "60 s";
    /// Default for the `wal_redo_timeout` option.
    pub const DEFAULT_WAL_REDO_TIMEOUT: &str = "60 s";

    /// Default for the `initial_superuser_name` option.
    pub const DEFAULT_SUPERUSER: &str = "cloud_admin";

    /// Default for the `page_cache_size` option.
    pub const DEFAULT_PAGE_CACHE_SIZE: usize = 8192;
    /// Default for the `max_file_descriptors` option.
    pub const DEFAULT_MAX_FILE_DESCRIPTORS: usize = 100;

    /// Default for the `log_format` option.
    pub const DEFAULT_LOG_FORMAT: &str = "plain";

    /// Default for the `concurrent_tenant_size_logical_size_queries` option,
    /// taken from the initial permit count of `ConfigurableSemaphore`.
    pub const DEFAULT_CONCURRENT_TENANT_SIZE_LOGICAL_SIZE_QUERIES: usize =
        super::ConfigurableSemaphore::DEFAULT_INITIAL.get();

    /// Default for the `metric_collection_interval` option.
    pub const DEFAULT_METRIC_COLLECTION_INTERVAL: &str = "10 min";
    /// Default for the `cached_metric_collection_interval` option.
    pub const DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL: &str = "1 hour";
    /// Default for the `metric_collection_endpoint` option: no endpoint configured.
    pub const DEFAULT_METRIC_COLLECTION_ENDPOINT: Option<reqwest::Url> = None;
    /// Default for the `synthetic_size_calculation_interval` option.
    pub const DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL: &str = "10 min";
    /// Default for the `background_task_maximum_delay` option.
    pub const DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY: &str = "10s";

    ///
    /// Default built-in configuration file.
    ///
    /// Every option is listed commented out together with its default value, followed by
    /// the (otherwise empty) `[tenant_config]` and `[remote_storage]` sections.
    /// The template is assembled at compile time with `formatcp!`, so literal braces
    /// inside it have to be doubled.
    ///
    pub const DEFAULT_CONFIG_FILE: &str = formatcp!(
        r###"
# Initial configuration file created by 'pageserver --init'
#listen_pg_addr = '{DEFAULT_PG_LISTEN_ADDR}'
#listen_http_addr = '{DEFAULT_HTTP_LISTEN_ADDR}'

#wait_lsn_timeout = '{DEFAULT_WAIT_LSN_TIMEOUT}'
#wal_redo_timeout = '{DEFAULT_WAL_REDO_TIMEOUT}'

#max_file_descriptors = {DEFAULT_MAX_FILE_DESCRIPTORS}

# initial superuser role name to use when creating a new tenant
#initial_superuser_name = '{DEFAULT_SUPERUSER}'

#broker_endpoint = '{BROKER_DEFAULT_ENDPOINT}'

#log_format = '{DEFAULT_LOG_FORMAT}'

#concurrent_tenant_size_logical_size_queries = '{DEFAULT_CONCURRENT_TENANT_SIZE_LOGICAL_SIZE_QUERIES}'

#metric_collection_interval = '{DEFAULT_METRIC_COLLECTION_INTERVAL}'
#cached_metric_collection_interval = '{DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL}'
#synthetic_size_calculation_interval = '{DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL}'

#disk_usage_based_eviction = {{ max_usage_pct = .., min_avail_bytes = .., period = "10s"}}

#background_task_maximum_delay = '{DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY}'

[tenant_config]
#checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes
#checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT}
#compaction_target_size = {DEFAULT_COMPACTION_TARGET_SIZE} # in bytes
#compaction_period = '{DEFAULT_COMPACTION_PERIOD}'
#compaction_threshold = {DEFAULT_COMPACTION_THRESHOLD}

#gc_period = '{DEFAULT_GC_PERIOD}'
#gc_horizon = {DEFAULT_GC_HORIZON}
#image_creation_threshold = {DEFAULT_IMAGE_CREATION_THRESHOLD}
#pitr_interval = '{DEFAULT_PITR_INTERVAL}'

#min_resident_size_override = .. # in bytes
#evictions_low_residence_duration_metric_threshold = '{DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD}'
#gc_feedback = false

[remote_storage]

"###
    );
}
124 :
/// Validated configuration of a page server.
///
/// One way to obtain it is `PageServerConf::parse_and_validate`, which applies the options
/// found in the configuration file on top of the built-in defaults.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PageServerConf {
    /// Identifier of this particular pageserver, so that e.g. safekeepers
    /// can safely distinguish different pageservers.
    pub id: NodeId,

    /// Example (default): 127.0.0.1:64000
    pub listen_pg_addr: String,
    /// Example (default): 127.0.0.1:9898
    pub listen_http_addr: String,

    /// Current availability zone. Used for traffic metrics.
    pub availability_zone: Option<String>,

    /// Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call.
    pub wait_lsn_timeout: Duration,
    /// How long to wait for WAL redo to complete.
    pub wal_redo_timeout: Duration,

    /// Initial superuser role name to use when creating a new tenant
    /// (the `initial_superuser_name` option).
    pub superuser: String,

    pub page_cache_size: usize,
    pub max_file_descriptors: usize,

    /// Repository directory, relative to current working directory.
    /// Normally, the page server changes the current working directory
    /// to the repository, and 'workdir' is always '.'. But we don't do
    /// that during unit testing, because the current directory is global
    /// to the process but different unit tests work on different
    /// repositories.
    pub workdir: PathBuf,

    /// Base directory of the Postgres installations; the per-version directories
    /// (`v14`, `v15`) are resolved relative to it by `PageServerConf::pg_distrib_dir`.
    pub pg_distrib_dir: PathBuf,

    // Authentication
    /// authentication method for the HTTP mgmt API
    pub http_auth_type: AuthType,
    /// authentication method for libpq connections from compute
    pub pg_auth_type: AuthType,
    /// Path to a file containing public key for verifying JWT tokens.
    /// Used for both mgmt and compute auth, if enabled.
    pub auth_validation_public_key_path: Option<PathBuf>,

    /// Remote storage settings parsed from the `[remote_storage]` section, if any.
    pub remote_storage_config: Option<RemoteStorageConfig>,

    /// Tenant settings from the `[tenant_config]` section merged with `TenantConf::default()`.
    pub default_tenant_conf: TenantConf,

    /// Storage broker endpoints to connect to.
    pub broker_endpoint: Uri,
    pub broker_keepalive_interval: Duration,

    pub log_format: LogFormat,

    /// Number of concurrent [`Tenant::gather_size_inputs`](crate::tenant::Tenant::gather_size_inputs) allowed.
    pub concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore,
    /// Limit of concurrent [`Tenant::gather_size_inputs`] issued by module `eviction_task`.
    /// The number of permits is the same as `concurrent_tenant_size_logical_size_queries`.
    /// See the comment in `eviction_task` for details.
    ///
    /// [`Tenant::gather_size_inputs`]: crate::tenant::Tenant::gather_size_inputs
    pub eviction_task_immitated_concurrent_logical_size_queries: ConfigurableSemaphore,

    /// How often to collect metrics and send them to the metrics endpoint.
    pub metric_collection_interval: Duration,
    /// How often to send unchanged cached metrics to the metrics endpoint.
    pub cached_metric_collection_interval: Duration,
    /// Metrics endpoint; `None` unless `metric_collection_endpoint` is configured.
    pub metric_collection_endpoint: Option<Url>,
    pub synthetic_size_calculation_interval: Duration,

    /// Settings from the `disk_usage_based_eviction` option; `None` when it is not configured.
    pub disk_usage_based_eviction: Option<DiskUsageEvictionTaskConfig>,

    // NOTE(review): the builder setter names this value `fail_first`, so presumably it is the
    // number of initial remote storage operations that are made to fail in tests - confirm.
    pub test_remote_failures: u64,

    pub ondemand_download_behavior_treat_error_as_warn: bool,

    /// How long will background tasks be delayed at most after initial load of tenants.
    ///
    /// Our largest initialization completions are in the range of 100-200s, so perhaps 10s works
    /// as we now isolate initial loading, initial logical size calculation and background tasks.
    /// Smaller nodes will have background tasks "not running" for this long unless every timeline
    /// has its initial logical size calculated. Not running background tasks for some seconds is
    /// not terrible.
    pub background_task_maximum_delay: Duration,
}
209 :
/// Token used to authenticate against safekeepers, set at most once.
///
/// We do not want to store this in a `PageServerConf` because the latter may be logged
/// and/or serialized at a whim, while the token is secret. Currently this token is the
/// same for accessing all tenants/timelines, but it may become per-tenant/per-timeline in
/// the future; more tokens and auth may also arrive for the storage broker, completely
/// changing the logic. Hence, we resort to a global variable for now instead of passing
/// the token from the startup code to the connection code through a dozen layers.
pub static SAFEKEEPER_AUTH_TOKEN: OnceCell<Arc<String>> = OnceCell::new();
217 :
/// State of a single builder field.
///
/// A dedicated enum is used instead of `Option` to state the intention clearly and to
/// avoid confusion with fields whose value is itself an `Option`.
pub enum BuilderValue<T> {
    /// The field holds a value (a default or an explicitly configured one).
    Set(T),
    /// No value has been provided for the field.
    NotSet,
}

impl<T> BuilderValue<T> {
    /// Converts the field into a `Result`: the stored value if there is one,
    /// otherwise the supplied `err`.
    pub fn ok_or<E>(self, err: E) -> Result<T, E> {
        if let Self::Set(value) = self {
            return Ok(value);
        }
        Err(err)
    }
}
233 :
/// Collects configuration values before they are assembled into a `PageServerConf`.
///
/// Needed to simplify config construction: every field mirrors a `PageServerConf` option
/// wrapped in `BuilderValue`, so `build` can tell a missing value apart from a value
/// that is legitimately `None`.
struct PageServerConfigBuilder {
    listen_pg_addr: BuilderValue<String>,

    listen_http_addr: BuilderValue<String>,

    availability_zone: BuilderValue<Option<String>>,

    wait_lsn_timeout: BuilderValue<Duration>,
    wal_redo_timeout: BuilderValue<Duration>,

    superuser: BuilderValue<String>,

    page_cache_size: BuilderValue<usize>,
    max_file_descriptors: BuilderValue<usize>,

    workdir: BuilderValue<PathBuf>,

    pg_distrib_dir: BuilderValue<PathBuf>,

    http_auth_type: BuilderValue<AuthType>,
    pg_auth_type: BuilderValue<AuthType>,

    // Optional settings: `Set(None)` means "known to be absent", `NotSet` means "never provided".
    auth_validation_public_key_path: BuilderValue<Option<PathBuf>>,
    remote_storage_config: BuilderValue<Option<RemoteStorageConfig>>,

    id: BuilderValue<NodeId>,

    broker_endpoint: BuilderValue<Uri>,
    broker_keepalive_interval: BuilderValue<Duration>,

    log_format: BuilderValue<LogFormat>,

    // Kept as a plain permit count; `build` creates the semaphores from it.
    concurrent_tenant_size_logical_size_queries: BuilderValue<NonZeroUsize>,

    metric_collection_interval: BuilderValue<Duration>,
    cached_metric_collection_interval: BuilderValue<Duration>,
    metric_collection_endpoint: BuilderValue<Option<Url>>,
    synthetic_size_calculation_interval: BuilderValue<Duration>,

    disk_usage_based_eviction: BuilderValue<Option<DiskUsageEvictionTaskConfig>>,

    test_remote_failures: BuilderValue<u64>,

    ondemand_download_behavior_treat_error_as_warn: BuilderValue<bool>,

    background_task_maximum_delay: BuilderValue<Duration>,
}
283 :
284 : impl Default for PageServerConfigBuilder {
285 954 : fn default() -> Self {
286 954 : use self::BuilderValue::*;
287 954 : use defaults::*;
288 954 : Self {
289 954 : listen_pg_addr: Set(DEFAULT_PG_LISTEN_ADDR.to_string()),
290 954 : listen_http_addr: Set(DEFAULT_HTTP_LISTEN_ADDR.to_string()),
291 954 : availability_zone: Set(None),
292 954 : wait_lsn_timeout: Set(humantime::parse_duration(DEFAULT_WAIT_LSN_TIMEOUT)
293 954 : .expect("cannot parse default wait lsn timeout")),
294 954 : wal_redo_timeout: Set(humantime::parse_duration(DEFAULT_WAL_REDO_TIMEOUT)
295 954 : .expect("cannot parse default wal redo timeout")),
296 954 : superuser: Set(DEFAULT_SUPERUSER.to_string()),
297 954 : page_cache_size: Set(DEFAULT_PAGE_CACHE_SIZE),
298 954 : max_file_descriptors: Set(DEFAULT_MAX_FILE_DESCRIPTORS),
299 954 : workdir: Set(PathBuf::new()),
300 954 : pg_distrib_dir: Set(env::current_dir()
301 954 : .expect("cannot access current directory")
302 954 : .join("pg_install")),
303 954 : http_auth_type: Set(AuthType::Trust),
304 954 : pg_auth_type: Set(AuthType::Trust),
305 954 : auth_validation_public_key_path: Set(None),
306 954 : remote_storage_config: Set(None),
307 954 : id: NotSet,
308 954 : broker_endpoint: Set(storage_broker::DEFAULT_ENDPOINT
309 954 : .parse()
310 954 : .expect("failed to parse default broker endpoint")),
311 954 : broker_keepalive_interval: Set(humantime::parse_duration(
312 954 : storage_broker::DEFAULT_KEEPALIVE_INTERVAL,
313 954 : )
314 954 : .expect("cannot parse default keepalive interval")),
315 954 : log_format: Set(LogFormat::from_str(DEFAULT_LOG_FORMAT).unwrap()),
316 954 :
317 954 : concurrent_tenant_size_logical_size_queries: Set(
318 954 : ConfigurableSemaphore::DEFAULT_INITIAL,
319 954 : ),
320 954 : metric_collection_interval: Set(humantime::parse_duration(
321 954 : DEFAULT_METRIC_COLLECTION_INTERVAL,
322 954 : )
323 954 : .expect("cannot parse default metric collection interval")),
324 954 : cached_metric_collection_interval: Set(humantime::parse_duration(
325 954 : DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL,
326 954 : )
327 954 : .expect("cannot parse default cached_metric_collection_interval")),
328 954 : synthetic_size_calculation_interval: Set(humantime::parse_duration(
329 954 : DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL,
330 954 : )
331 954 : .expect("cannot parse default synthetic size calculation interval")),
332 954 : metric_collection_endpoint: Set(DEFAULT_METRIC_COLLECTION_ENDPOINT),
333 954 :
334 954 : disk_usage_based_eviction: Set(None),
335 954 :
336 954 : test_remote_failures: Set(0),
337 954 :
338 954 : ondemand_download_behavior_treat_error_as_warn: Set(false),
339 954 :
340 954 : background_task_maximum_delay: Set(humantime::parse_duration(
341 954 : DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY,
342 954 : )
343 954 : .unwrap()),
344 954 : }
345 954 : }
346 : }
347 :
348 : impl PageServerConfigBuilder {
349 950 : pub fn listen_pg_addr(&mut self, listen_pg_addr: String) {
350 950 : self.listen_pg_addr = BuilderValue::Set(listen_pg_addr)
351 950 : }
352 :
353 950 : pub fn listen_http_addr(&mut self, listen_http_addr: String) {
354 950 : self.listen_http_addr = BuilderValue::Set(listen_http_addr)
355 950 : }
356 :
357 2 : pub fn availability_zone(&mut self, availability_zone: Option<String>) {
358 2 : self.availability_zone = BuilderValue::Set(availability_zone)
359 2 : }
360 :
361 14 : pub fn wait_lsn_timeout(&mut self, wait_lsn_timeout: Duration) {
362 14 : self.wait_lsn_timeout = BuilderValue::Set(wait_lsn_timeout)
363 14 : }
364 :
365 6 : pub fn wal_redo_timeout(&mut self, wal_redo_timeout: Duration) {
366 6 : self.wal_redo_timeout = BuilderValue::Set(wal_redo_timeout)
367 6 : }
368 :
369 6 : pub fn superuser(&mut self, superuser: String) {
370 6 : self.superuser = BuilderValue::Set(superuser)
371 6 : }
372 :
373 10 : pub fn page_cache_size(&mut self, page_cache_size: usize) {
374 10 : self.page_cache_size = BuilderValue::Set(page_cache_size)
375 10 : }
376 :
377 6 : pub fn max_file_descriptors(&mut self, max_file_descriptors: usize) {
378 6 : self.max_file_descriptors = BuilderValue::Set(max_file_descriptors)
379 6 : }
380 :
381 954 : pub fn workdir(&mut self, workdir: PathBuf) {
382 954 : self.workdir = BuilderValue::Set(workdir)
383 954 : }
384 :
385 954 : pub fn pg_distrib_dir(&mut self, pg_distrib_dir: PathBuf) {
386 954 : self.pg_distrib_dir = BuilderValue::Set(pg_distrib_dir)
387 954 : }
388 :
389 944 : pub fn http_auth_type(&mut self, auth_type: AuthType) {
390 944 : self.http_auth_type = BuilderValue::Set(auth_type)
391 944 : }
392 :
393 944 : pub fn pg_auth_type(&mut self, auth_type: AuthType) {
394 944 : self.pg_auth_type = BuilderValue::Set(auth_type)
395 944 : }
396 :
397 18 : pub fn auth_validation_public_key_path(
398 18 : &mut self,
399 18 : auth_validation_public_key_path: Option<PathBuf>,
400 18 : ) {
401 18 : self.auth_validation_public_key_path = BuilderValue::Set(auth_validation_public_key_path)
402 18 : }
403 :
404 950 : pub fn remote_storage_config(&mut self, remote_storage_config: Option<RemoteStorageConfig>) {
405 950 : self.remote_storage_config = BuilderValue::Set(remote_storage_config)
406 950 : }
407 :
408 951 : pub fn broker_endpoint(&mut self, broker_endpoint: Uri) {
409 951 : self.broker_endpoint = BuilderValue::Set(broker_endpoint)
410 951 : }
411 :
412 0 : pub fn broker_keepalive_interval(&mut self, broker_keepalive_interval: Duration) {
413 0 : self.broker_keepalive_interval = BuilderValue::Set(broker_keepalive_interval)
414 0 : }
415 :
416 953 : pub fn id(&mut self, node_id: NodeId) {
417 953 : self.id = BuilderValue::Set(node_id)
418 953 : }
419 :
420 6 : pub fn log_format(&mut self, log_format: LogFormat) {
421 6 : self.log_format = BuilderValue::Set(log_format)
422 6 : }
423 :
424 0 : pub fn concurrent_tenant_size_logical_size_queries(&mut self, u: NonZeroUsize) {
425 0 : self.concurrent_tenant_size_logical_size_queries = BuilderValue::Set(u);
426 0 : }
427 :
428 14 : pub fn metric_collection_interval(&mut self, metric_collection_interval: Duration) {
429 14 : self.metric_collection_interval = BuilderValue::Set(metric_collection_interval)
430 14 : }
431 :
432 6 : pub fn cached_metric_collection_interval(
433 6 : &mut self,
434 6 : cached_metric_collection_interval: Duration,
435 6 : ) {
436 6 : self.cached_metric_collection_interval =
437 6 : BuilderValue::Set(cached_metric_collection_interval)
438 6 : }
439 :
440 14 : pub fn metric_collection_endpoint(&mut self, metric_collection_endpoint: Option<Url>) {
441 14 : self.metric_collection_endpoint = BuilderValue::Set(metric_collection_endpoint)
442 14 : }
443 :
444 9 : pub fn synthetic_size_calculation_interval(
445 9 : &mut self,
446 9 : synthetic_size_calculation_interval: Duration,
447 9 : ) {
448 9 : self.synthetic_size_calculation_interval =
449 9 : BuilderValue::Set(synthetic_size_calculation_interval)
450 9 : }
451 :
452 94 : pub fn test_remote_failures(&mut self, fail_first: u64) {
453 94 : self.test_remote_failures = BuilderValue::Set(fail_first);
454 94 : }
455 :
456 4 : pub fn disk_usage_based_eviction(&mut self, value: Option<DiskUsageEvictionTaskConfig>) {
457 4 : self.disk_usage_based_eviction = BuilderValue::Set(value);
458 4 : }
459 :
460 0 : pub fn ondemand_download_behavior_treat_error_as_warn(
461 0 : &mut self,
462 0 : ondemand_download_behavior_treat_error_as_warn: bool,
463 0 : ) {
464 0 : self.ondemand_download_behavior_treat_error_as_warn =
465 0 : BuilderValue::Set(ondemand_download_behavior_treat_error_as_warn);
466 0 : }
467 :
468 9 : pub fn background_task_maximum_delay(&mut self, delay: Duration) {
469 9 : self.background_task_maximum_delay = BuilderValue::Set(delay);
470 9 : }
471 :
472 954 : pub fn build(self) -> anyhow::Result<PageServerConf> {
473 954 : let concurrent_tenant_size_logical_size_queries = self
474 954 : .concurrent_tenant_size_logical_size_queries
475 954 : .ok_or(anyhow!(
476 954 : "missing concurrent_tenant_size_logical_size_queries"
477 954 : ))?;
478 : Ok(PageServerConf {
479 954 : listen_pg_addr: self
480 954 : .listen_pg_addr
481 954 : .ok_or(anyhow!("missing listen_pg_addr"))?,
482 954 : listen_http_addr: self
483 954 : .listen_http_addr
484 954 : .ok_or(anyhow!("missing listen_http_addr"))?,
485 954 : availability_zone: self
486 954 : .availability_zone
487 954 : .ok_or(anyhow!("missing availability_zone"))?,
488 954 : wait_lsn_timeout: self
489 954 : .wait_lsn_timeout
490 954 : .ok_or(anyhow!("missing wait_lsn_timeout"))?,
491 954 : wal_redo_timeout: self
492 954 : .wal_redo_timeout
493 954 : .ok_or(anyhow!("missing wal_redo_timeout"))?,
494 954 : superuser: self.superuser.ok_or(anyhow!("missing superuser"))?,
495 954 : page_cache_size: self
496 954 : .page_cache_size
497 954 : .ok_or(anyhow!("missing page_cache_size"))?,
498 954 : max_file_descriptors: self
499 954 : .max_file_descriptors
500 954 : .ok_or(anyhow!("missing max_file_descriptors"))?,
501 954 : workdir: self.workdir.ok_or(anyhow!("missing workdir"))?,
502 954 : pg_distrib_dir: self
503 954 : .pg_distrib_dir
504 954 : .ok_or(anyhow!("missing pg_distrib_dir"))?,
505 954 : http_auth_type: self
506 954 : .http_auth_type
507 954 : .ok_or(anyhow!("missing http_auth_type"))?,
508 954 : pg_auth_type: self.pg_auth_type.ok_or(anyhow!("missing pg_auth_type"))?,
509 954 : auth_validation_public_key_path: self
510 954 : .auth_validation_public_key_path
511 954 : .ok_or(anyhow!("missing auth_validation_public_key_path"))?,
512 954 : remote_storage_config: self
513 954 : .remote_storage_config
514 954 : .ok_or(anyhow!("missing remote_storage_config"))?,
515 954 : id: self.id.ok_or(anyhow!("missing id"))?,
516 : // TenantConf is handled separately
517 953 : default_tenant_conf: TenantConf::default(),
518 953 : broker_endpoint: self
519 953 : .broker_endpoint
520 953 : .ok_or(anyhow!("No broker endpoints provided"))?,
521 953 : broker_keepalive_interval: self
522 953 : .broker_keepalive_interval
523 953 : .ok_or(anyhow!("No broker keepalive interval provided"))?,
524 953 : log_format: self.log_format.ok_or(anyhow!("missing log_format"))?,
525 953 : concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::new(
526 953 : concurrent_tenant_size_logical_size_queries,
527 953 : ),
528 953 : eviction_task_immitated_concurrent_logical_size_queries: ConfigurableSemaphore::new(
529 953 : concurrent_tenant_size_logical_size_queries,
530 953 : ),
531 953 : metric_collection_interval: self
532 953 : .metric_collection_interval
533 953 : .ok_or(anyhow!("missing metric_collection_interval"))?,
534 953 : cached_metric_collection_interval: self
535 953 : .cached_metric_collection_interval
536 953 : .ok_or(anyhow!("missing cached_metric_collection_interval"))?,
537 953 : metric_collection_endpoint: self
538 953 : .metric_collection_endpoint
539 953 : .ok_or(anyhow!("missing metric_collection_endpoint"))?,
540 953 : synthetic_size_calculation_interval: self
541 953 : .synthetic_size_calculation_interval
542 953 : .ok_or(anyhow!("missing synthetic_size_calculation_interval"))?,
543 953 : disk_usage_based_eviction: self
544 953 : .disk_usage_based_eviction
545 953 : .ok_or(anyhow!("missing disk_usage_based_eviction"))?,
546 953 : test_remote_failures: self
547 953 : .test_remote_failures
548 953 : .ok_or(anyhow!("missing test_remote_failuers"))?,
549 953 : ondemand_download_behavior_treat_error_as_warn: self
550 953 : .ondemand_download_behavior_treat_error_as_warn
551 953 : .ok_or(anyhow!(
552 953 : "missing ondemand_download_behavior_treat_error_as_warn"
553 953 : ))?,
554 953 : background_task_maximum_delay: self
555 953 : .background_task_maximum_delay
556 953 : .ok_or(anyhow!("missing background_task_maximum_delay"))?,
557 : })
558 954 : }
559 : }
560 :
561 : impl PageServerConf {
562 : //
563 : // Repository paths, relative to workdir.
564 : //
565 :
566 143061 : pub fn tenants_path(&self) -> PathBuf {
567 143061 : self.workdir.join(TENANTS_SEGMENT_NAME)
568 143061 : }
569 :
570 141539 : pub fn tenant_path(&self, tenant_id: &TenantId) -> PathBuf {
571 141539 : self.tenants_path().join(tenant_id.to_string())
572 141539 : }
573 :
574 820 : pub fn tenant_attaching_mark_file_path(&self, tenant_id: &TenantId) -> PathBuf {
575 820 : self.tenant_path(tenant_id)
576 820 : .join(TENANT_ATTACHING_MARKER_FILENAME)
577 820 : }
578 :
579 1493 : pub fn tenant_ignore_mark_file_path(&self, tenant_id: &TenantId) -> PathBuf {
580 1493 : self.tenant_path(tenant_id).join(IGNORED_TENANT_FILE_NAME)
581 1493 : }
582 :
583 : /// Points to a place in pageserver's local directory,
584 : /// where certain tenant's tenantconf file should be located.
585 1383 : pub fn tenant_config_path(&self, tenant_id: &TenantId) -> PathBuf {
586 1383 : self.tenant_path(tenant_id).join(TENANT_CONFIG_NAME)
587 1383 : }
588 :
589 135117 : pub fn timelines_path(&self, tenant_id: &TenantId) -> PathBuf {
590 135117 : self.tenant_path(tenant_id).join(TIMELINES_SEGMENT_NAME)
591 135117 : }
592 :
593 132701 : pub fn timeline_path(&self, tenant_id: &TenantId, timeline_id: &TimelineId) -> PathBuf {
594 132701 : self.timelines_path(tenant_id).join(timeline_id.to_string())
595 132701 : }
596 :
597 1356 : pub fn timeline_uninit_mark_file_path(
598 1356 : &self,
599 1356 : tenant_id: TenantId,
600 1356 : timeline_id: TimelineId,
601 1356 : ) -> PathBuf {
602 1356 : path_with_suffix_extension(
603 1356 : self.timeline_path(&tenant_id, &timeline_id),
604 1356 : TIMELINE_UNINIT_MARK_SUFFIX,
605 1356 : )
606 1356 : }
607 :
608 880 : pub fn timeline_delete_mark_file_path(
609 880 : &self,
610 880 : tenant_id: TenantId,
611 880 : timeline_id: TimelineId,
612 880 : ) -> PathBuf {
613 880 : path_with_suffix_extension(
614 880 : self.timeline_path(&tenant_id, &timeline_id),
615 880 : TIMELINE_DELETE_MARK_SUFFIX,
616 880 : )
617 880 : }
618 :
619 935 : pub fn tenant_deleted_mark_file_path(&self, tenant_id: &TenantId) -> PathBuf {
620 935 : self.tenant_path(tenant_id)
621 935 : .join(TENANT_DELETED_MARKER_FILE_NAME)
622 935 : }
623 :
624 4 : pub fn traces_path(&self) -> PathBuf {
625 4 : self.workdir.join("traces")
626 4 : }
627 :
628 4 : pub fn trace_path(
629 4 : &self,
630 4 : tenant_id: &TenantId,
631 4 : timeline_id: &TimelineId,
632 4 : connection_id: &ConnectionId,
633 4 : ) -> PathBuf {
634 4 : self.traces_path()
635 4 : .join(tenant_id.to_string())
636 4 : .join(timeline_id.to_string())
637 4 : .join(connection_id.to_string())
638 4 : }
639 :
640 : /// Points to a place in pageserver's local directory,
641 : /// where certain timeline's metadata file should be located.
642 8486 : pub fn metadata_path(&self, tenant_id: &TenantId, timeline_id: &TimelineId) -> PathBuf {
643 8486 : self.timeline_path(tenant_id, timeline_id)
644 8486 : .join(METADATA_FILE_NAME)
645 8486 : }
646 :
647 : /// Turns storage remote path of a file into its local path.
648 0 : pub fn local_path(&self, remote_path: &RemotePath) -> PathBuf {
649 0 : remote_path.with_base(&self.workdir)
650 0 : }
651 :
652 : //
653 : // Postgres distribution paths
654 : //
655 2092 : pub fn pg_distrib_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
656 2092 : let path = self.pg_distrib_dir.clone();
657 2092 :
658 2092 : match pg_version {
659 2092 : 14 => Ok(path.join(format!("v{pg_version}"))),
660 0 : 15 => Ok(path.join(format!("v{pg_version}"))),
661 0 : _ => bail!("Unsupported postgres version: {}", pg_version),
662 : }
663 2092 : }
664 :
665 1046 : pub fn pg_bin_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
666 1046 : match pg_version {
667 1046 : 14 => Ok(self.pg_distrib_dir(pg_version)?.join("bin")),
668 0 : 15 => Ok(self.pg_distrib_dir(pg_version)?.join("bin")),
669 0 : _ => bail!("Unsupported postgres version: {}", pg_version),
670 : }
671 1046 : }
672 1046 : pub fn pg_lib_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
673 1046 : match pg_version {
674 1046 : 14 => Ok(self.pg_distrib_dir(pg_version)?.join("lib")),
675 0 : 15 => Ok(self.pg_distrib_dir(pg_version)?.join("lib")),
676 0 : _ => bail!("Unsupported postgres version: {}", pg_version),
677 : }
678 1046 : }
679 :
680 : /// Parse a configuration file (pageserver.toml) into a PageServerConf struct,
681 : /// validating the input and failing on errors.
682 : ///
683 : /// This leaves any options not present in the file in the built-in defaults.
684 954 : pub fn parse_and_validate(toml: &Document, workdir: &Path) -> anyhow::Result<Self> {
685 954 : let mut builder = PageServerConfigBuilder::default();
686 954 : builder.workdir(workdir.to_owned());
687 954 :
688 954 : let mut t_conf = TenantConfOpt::default();
689 :
690 8762 : for (key, item) in toml.iter() {
691 8762 : match key {
692 8762 : "listen_pg_addr" => builder.listen_pg_addr(parse_toml_string(key, item)?),
693 7812 : "listen_http_addr" => builder.listen_http_addr(parse_toml_string(key, item)?),
694 6862 : "availability_zone" => builder.availability_zone(Some(parse_toml_string(key, item)?)),
695 6860 : "wait_lsn_timeout" => builder.wait_lsn_timeout(parse_toml_duration(key, item)?),
696 6846 : "wal_redo_timeout" => builder.wal_redo_timeout(parse_toml_duration(key, item)?),
697 6840 : "initial_superuser_name" => builder.superuser(parse_toml_string(key, item)?),
698 6834 : "page_cache_size" => builder.page_cache_size(parse_toml_u64(key, item)? as usize),
699 6824 : "max_file_descriptors" => {
700 6 : builder.max_file_descriptors(parse_toml_u64(key, item)? as usize)
701 : }
702 6818 : "pg_distrib_dir" => {
703 954 : builder.pg_distrib_dir(PathBuf::from(parse_toml_string(key, item)?))
704 : }
705 5864 : "auth_validation_public_key_path" => builder.auth_validation_public_key_path(Some(
706 18 : PathBuf::from(parse_toml_string(key, item)?),
707 : )),
708 5846 : "http_auth_type" => builder.http_auth_type(parse_toml_from_str(key, item)?),
709 4902 : "pg_auth_type" => builder.pg_auth_type(parse_toml_from_str(key, item)?),
710 3958 : "remote_storage" => {
711 950 : builder.remote_storage_config(RemoteStorageConfig::from_toml(item)?)
712 : }
713 3008 : "tenant_config" => {
714 948 : t_conf = Self::parse_toml_tenant_conf(item)?;
715 : }
716 2060 : "id" => builder.id(NodeId(parse_toml_u64(key, item)?)),
717 1107 : "broker_endpoint" => builder.broker_endpoint(parse_toml_string(key, item)?.parse().context("failed to parse broker endpoint")?),
718 156 : "broker_keepalive_interval" => builder.broker_keepalive_interval(parse_toml_duration(key, item)?),
719 156 : "log_format" => builder.log_format(
720 6 : LogFormat::from_config(&parse_toml_string(key, item)?)?
721 : ),
722 150 : "concurrent_tenant_size_logical_size_queries" => builder.concurrent_tenant_size_logical_size_queries({
723 0 : let input = parse_toml_string(key, item)?;
724 0 : let permits = input.parse::<usize>().context("expected a number of initial permits, not {s:?}")?;
725 0 : NonZeroUsize::new(permits).context("initial semaphore permits out of range: 0, use other configuration to disable a feature")?
726 : }),
727 150 : "metric_collection_interval" => builder.metric_collection_interval(parse_toml_duration(key, item)?),
728 136 : "cached_metric_collection_interval" => builder.cached_metric_collection_interval(parse_toml_duration(key, item)?),
729 130 : "metric_collection_endpoint" => {
730 14 : let endpoint = parse_toml_string(key, item)?.parse().context("failed to parse metric_collection_endpoint")?;
731 14 : builder.metric_collection_endpoint(Some(endpoint));
732 : },
733 116 : "synthetic_size_calculation_interval" =>
734 9 : builder.synthetic_size_calculation_interval(parse_toml_duration(key, item)?),
735 107 : "test_remote_failures" => builder.test_remote_failures(parse_toml_u64(key, item)?),
736 13 : "disk_usage_based_eviction" => {
737 4 : tracing::info!("disk_usage_based_eviction: {:#?}", &item);
738 : builder.disk_usage_based_eviction(
739 4 : deserialize_from_item("disk_usage_based_eviction", item)
740 4 : .context("parse disk_usage_based_eviction")?
741 : )
742 : },
743 9 : "ondemand_download_behavior_treat_error_as_warn" => builder.ondemand_download_behavior_treat_error_as_warn(parse_toml_bool(key, item)?),
744 9 : "background_task_maximum_delay" => builder.background_task_maximum_delay(parse_toml_duration(key, item)?),
745 0 : _ => bail!("unrecognized pageserver option '{key}'"),
746 : }
747 : }
748 :
749 954 : let mut conf = builder.build().context("invalid config")?;
750 :
751 953 : if conf.http_auth_type == AuthType::NeonJWT || conf.pg_auth_type == AuthType::NeonJWT {
752 18 : let auth_validation_public_key_path = conf
753 18 : .auth_validation_public_key_path
754 18 : .get_or_insert_with(|| workdir.join("auth_public_key.pem"));
755 18 : ensure!(
756 18 : auth_validation_public_key_path.exists(),
757 0 : format!(
758 0 : "Can't find auth_validation_public_key at '{}'",
759 0 : auth_validation_public_key_path.display()
760 0 : )
761 : );
762 935 : }
763 :
764 953 : conf.default_tenant_conf = t_conf.merge(TenantConf::default());
765 953 :
766 953 : Ok(conf)
767 954 : }
768 :
769 : // subroutine of parse_and_validate to parse `[tenant_conf]` section
770 :
771 1681 : pub fn parse_toml_tenant_conf(item: &toml_edit::Item) -> Result<TenantConfOpt> {
772 1681 : let mut t_conf: TenantConfOpt = Default::default();
773 1681 : if let Some(checkpoint_distance) = item.get("checkpoint_distance") {
774 : t_conf.checkpoint_distance =
775 291 : Some(parse_toml_u64("checkpoint_distance", checkpoint_distance)?);
776 1390 : }
777 :
778 1681 : if let Some(checkpoint_timeout) = item.get("checkpoint_timeout") {
779 2 : t_conf.checkpoint_timeout = Some(parse_toml_duration(
780 2 : "checkpoint_timeout",
781 2 : checkpoint_timeout,
782 2 : )?);
783 1679 : }
784 :
785 1681 : if let Some(compaction_target_size) = item.get("compaction_target_size") {
786 53 : t_conf.compaction_target_size = Some(parse_toml_u64(
787 53 : "compaction_target_size",
788 53 : compaction_target_size,
789 53 : )?);
790 1628 : }
791 :
792 1681 : if let Some(compaction_period) = item.get("compaction_period") {
793 : t_conf.compaction_period =
794 262 : Some(parse_toml_duration("compaction_period", compaction_period)?);
795 1419 : }
796 :
797 1681 : if let Some(compaction_threshold) = item.get("compaction_threshold") {
798 : t_conf.compaction_threshold =
799 32 : Some(parse_toml_u64("compaction_threshold", compaction_threshold)?.try_into()?);
800 1649 : }
801 :
802 1681 : if let Some(image_creation_threshold) = item.get("image_creation_threshold") {
803 : t_conf.image_creation_threshold = Some(
804 236 : parse_toml_u64("image_creation_threshold", image_creation_threshold)?.try_into()?,
805 : );
806 1445 : }
807 :
808 1681 : if let Some(gc_horizon) = item.get("gc_horizon") {
809 24 : t_conf.gc_horizon = Some(parse_toml_u64("gc_horizon", gc_horizon)?);
810 1657 : }
811 :
812 1681 : if let Some(gc_period) = item.get("gc_period") {
813 262 : t_conf.gc_period = Some(parse_toml_duration("gc_period", gc_period)?);
814 1419 : }
815 :
816 1681 : if let Some(pitr_interval) = item.get("pitr_interval") {
817 41 : t_conf.pitr_interval = Some(parse_toml_duration("pitr_interval", pitr_interval)?);
818 1640 : }
819 1681 : if let Some(walreceiver_connect_timeout) = item.get("walreceiver_connect_timeout") {
820 3 : t_conf.walreceiver_connect_timeout = Some(parse_toml_duration(
821 3 : "walreceiver_connect_timeout",
822 3 : walreceiver_connect_timeout,
823 3 : )?);
824 1678 : }
825 1681 : if let Some(lagging_wal_timeout) = item.get("lagging_wal_timeout") {
826 3 : t_conf.lagging_wal_timeout = Some(parse_toml_duration(
827 3 : "lagging_wal_timeout",
828 3 : lagging_wal_timeout,
829 3 : )?);
830 1678 : }
831 1681 : if let Some(max_lsn_wal_lag) = item.get("max_lsn_wal_lag") {
832 : t_conf.max_lsn_wal_lag =
833 2 : Some(deserialize_from_item("max_lsn_wal_lag", max_lsn_wal_lag)?);
834 1679 : }
835 1681 : if let Some(trace_read_requests) = item.get("trace_read_requests") {
836 : t_conf.trace_read_requests =
837 3 : Some(trace_read_requests.as_bool().with_context(|| {
838 0 : "configure option trace_read_requests is not a bool".to_string()
839 3 : })?);
840 1678 : }
841 :
842 1681 : if let Some(eviction_policy) = item.get("eviction_policy") {
843 : t_conf.eviction_policy = Some(
844 9 : deserialize_from_item("eviction_policy", eviction_policy)
845 9 : .context("parse eviction_policy")?,
846 : );
847 1672 : }
848 :
849 1681 : if let Some(item) = item.get("min_resident_size_override") {
850 : t_conf.min_resident_size_override = Some(
851 4 : deserialize_from_item("min_resident_size_override", item)
852 4 : .context("parse min_resident_size_override")?,
853 : );
854 1677 : }
855 :
856 1681 : if let Some(item) = item.get("evictions_low_residence_duration_metric_threshold") {
857 8 : t_conf.evictions_low_residence_duration_metric_threshold = Some(parse_toml_duration(
858 8 : "evictions_low_residence_duration_metric_threshold",
859 8 : item,
860 8 : )?);
861 1673 : }
862 :
863 1681 : if let Some(gc_feedback) = item.get("gc_feedback") {
864 : t_conf.gc_feedback = Some(
865 1 : gc_feedback
866 1 : .as_bool()
867 1 : .with_context(|| "configure option gc_feedback is not a bool".to_string())?,
868 : );
869 1680 : }
870 :
871 1681 : Ok(t_conf)
872 1681 : }
873 :
/// Returns the relative scratch directory used by the unit test called `test_name`:
/// `../tmp_check/test_<test_name>`.
#[cfg(test)]
pub fn test_repo_dir(test_name: &str) -> PathBuf {
    let mut relative = String::from("../tmp_check/test_");
    relative.push_str(test_name);
    PathBuf::from(relative)
}
878 :
879 39 : pub fn dummy_conf(repo_dir: PathBuf) -> Self {
880 39 : let pg_distrib_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../pg_install");
881 39 :
882 39 : PageServerConf {
883 39 : id: NodeId(0),
884 39 : wait_lsn_timeout: Duration::from_secs(60),
885 39 : wal_redo_timeout: Duration::from_secs(60),
886 39 : page_cache_size: defaults::DEFAULT_PAGE_CACHE_SIZE,
887 39 : max_file_descriptors: defaults::DEFAULT_MAX_FILE_DESCRIPTORS,
888 39 : listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
889 39 : listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
890 39 : availability_zone: None,
891 39 : superuser: "cloud_admin".to_string(),
892 39 : workdir: repo_dir,
893 39 : pg_distrib_dir,
894 39 : http_auth_type: AuthType::Trust,
895 39 : pg_auth_type: AuthType::Trust,
896 39 : auth_validation_public_key_path: None,
897 39 : remote_storage_config: None,
898 39 : default_tenant_conf: TenantConf::default(),
899 39 : broker_endpoint: storage_broker::DEFAULT_ENDPOINT.parse().unwrap(),
900 39 : broker_keepalive_interval: Duration::from_secs(5000),
901 39 : log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(),
902 39 : concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
903 39 : eviction_task_immitated_concurrent_logical_size_queries: ConfigurableSemaphore::default(
904 39 : ),
905 39 : metric_collection_interval: Duration::from_secs(60),
906 39 : cached_metric_collection_interval: Duration::from_secs(60 * 60),
907 39 : metric_collection_endpoint: defaults::DEFAULT_METRIC_COLLECTION_ENDPOINT,
908 39 : synthetic_size_calculation_interval: Duration::from_secs(60),
909 39 : disk_usage_based_eviction: None,
910 39 : test_remote_failures: 0,
911 39 : ondemand_download_behavior_treat_error_as_warn: false,
912 39 : background_task_maximum_delay: Duration::ZERO,
913 39 : }
914 39 : }
915 : }
916 :
917 : // Helper functions to parse a toml Item
918 :
919 3851 : fn parse_toml_string(name: &str, item: &Item) -> Result<String> {
920 3851 : let s = item
921 3851 : .as_str()
922 3851 : .with_context(|| format!("configure option {name} is not a string"))?;
923 3851 : Ok(s.to_string())
924 3851 : }
925 :
926 1699 : fn parse_toml_u64(name: &str, item: &Item) -> Result<u64> {
927 : // A toml integer is signed, so it cannot represent the full range of an u64. That's OK
928 : // for our use, though.
929 1699 : let i: i64 = item
930 1699 : .as_integer()
931 1699 : .with_context(|| format!("configure option {name} is not an integer"))?;
932 1699 : if i < 0 {
933 0 : bail!("configure option {name} cannot be negative");
934 1699 : }
935 1699 : Ok(i as u64)
936 1699 : }
937 :
938 0 : fn parse_toml_bool(name: &str, item: &Item) -> Result<bool> {
939 0 : item.as_bool()
940 0 : .with_context(|| format!("configure option {name} is not a bool"))
941 0 : }
942 :
943 639 : fn parse_toml_duration(name: &str, item: &Item) -> Result<Duration> {
944 639 : let s = item
945 639 : .as_str()
946 639 : .with_context(|| format!("configure option {name} is not a string"))?;
947 :
948 639 : Ok(humantime::parse_duration(s)?)
949 639 : }
950 :
951 1888 : fn parse_toml_from_str<T>(name: &str, item: &Item) -> anyhow::Result<T>
952 1888 : where
953 1888 : T: FromStr,
954 1888 : <T as FromStr>::Err: std::fmt::Display,
955 1888 : {
956 1888 : let v = item
957 1888 : .as_str()
958 1888 : .with_context(|| format!("configure option {name} is not a string"))?;
959 1888 : T::from_str(v).map_err(|e| {
960 0 : anyhow!(
961 0 : "Failed to parse string as {parse_type} for configure option {name}: {e}",
962 0 : parse_type = stringify!(T)
963 0 : )
964 1888 : })
965 1888 : }
966 :
967 19 : fn deserialize_from_item<T>(name: &str, item: &Item) -> anyhow::Result<T>
968 19 : where
969 19 : T: serde::de::DeserializeOwned,
970 19 : {
971 : // ValueDeserializer::new is not public, so use the ValueDeserializer's documented way
972 19 : let deserializer = match item.clone().into_value() {
973 19 : Ok(value) => value.into_deserializer(),
974 0 : Err(item) => anyhow::bail!("toml_edit::Item '{item}' is not a toml_edit::Value"),
975 : };
976 19 : T::deserialize(deserializer).with_context(|| format!("deserializing item for node {name}"))
977 19 : }
978 :
979 : /// Configurable semaphore permits setting.
980 : ///
981 : /// Does not allow semaphore permits to be zero, because at runtime initially zero permits and empty
982 : /// semaphore cannot be distinguished, leading any feature using these to await forever (or until
983 : /// new permits are added).
984 0 : #[derive(Debug, Clone)]
985 : pub struct ConfigurableSemaphore {
986 : initial_permits: NonZeroUsize,
987 : inner: std::sync::Arc<tokio::sync::Semaphore>,
988 : }
989 :
990 : impl ConfigurableSemaphore {
991 : pub const DEFAULT_INITIAL: NonZeroUsize = match NonZeroUsize::new(1) {
992 : Some(x) => x,
993 : None => panic!("const unwrap is not yet stable"),
994 : };
995 :
996 : /// Initializse using a non-zero amount of permits.
997 : ///
998 : /// Require a non-zero initial permits, because using permits == 0 is a crude way to disable a
999 : /// feature such as [`Tenant::gather_size_inputs`]. Otherwise any semaphore using future will
1000 : /// behave like [`futures::future::pending`], just waiting until new permits are added.
1001 : ///
1002 : /// [`Tenant::gather_size_inputs`]: crate::tenant::Tenant::gather_size_inputs
1003 1988 : pub fn new(initial_permits: NonZeroUsize) -> Self {
1004 1988 : ConfigurableSemaphore {
1005 1988 : initial_permits,
1006 1988 : inner: std::sync::Arc::new(tokio::sync::Semaphore::new(initial_permits.get())),
1007 1988 : }
1008 1988 : }
1009 :
1010 : /// Returns the configured amount of permits.
1011 0 : pub fn initial_permits(&self) -> NonZeroUsize {
1012 0 : self.initial_permits
1013 0 : }
1014 : }
1015 :
1016 : impl Default for ConfigurableSemaphore {
1017 82 : fn default() -> Self {
1018 82 : Self::new(Self::DEFAULT_INITIAL)
1019 82 : }
1020 : }
1021 :
1022 : impl PartialEq for ConfigurableSemaphore {
1023 4 : fn eq(&self, other: &Self) -> bool {
1024 4 : // the number of permits can be increased at runtime, so we cannot really fulfill the
1025 4 : // PartialEq value equality otherwise
1026 4 : self.initial_permits == other.initial_permits
1027 4 : }
1028 : }
1029 :
1030 : impl Eq for ConfigurableSemaphore {}
1031 :
1032 : impl ConfigurableSemaphore {
1033 73 : pub fn inner(&self) -> &std::sync::Arc<tokio::sync::Semaphore> {
1034 73 : &self.inner
1035 73 : }
1036 : }
1037 :
1038 : #[cfg(test)]
1039 : mod tests {
1040 : use std::{
1041 : fs,
1042 : num::{NonZeroU32, NonZeroUsize},
1043 : };
1044 :
1045 : use remote_storage::{RemoteStorageKind, S3Config};
1046 : use tempfile::{tempdir, TempDir};
1047 : use utils::serde_percent::Percent;
1048 :
1049 : use super::*;
1050 : use crate::{tenant::config::EvictionPolicy, DEFAULT_PG_VERSION};
1051 :
/// Config fragment shared by the parsing tests below. It sets the basic top-level options
/// to non-default values; each test appends `pg_distrib_dir`, `broker_endpoint` and any
/// extra sections it needs.
const ALL_BASE_VALUES_TOML: &str = r#"
# Initial configuration file created by 'pageserver --init'

listen_pg_addr = '127.0.0.1:64000'
listen_http_addr = '127.0.0.1:9898'

wait_lsn_timeout = '111 s'
wal_redo_timeout = '111 s'

page_cache_size = 444
max_file_descriptors = 333

# initial superuser role name to use when creating a new tenant
initial_superuser_name = 'zzzz'
id = 10

metric_collection_interval = '222 s'
cached_metric_collection_interval = '22200 s'
metric_collection_endpoint = 'http://localhost:80/metrics'
synthetic_size_calculation_interval = '333 s'

log_format = 'json'
background_task_maximum_delay = '334 s'

"#;
1077 :
/// A config that sets only `pg_distrib_dir`, `id` and `broker_endpoint` must yield the
/// documented default for every other field.
#[test]
fn parse_defaults() -> anyhow::Result<()> {
    let tempdir = tempdir()?;
    let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;
    let broker_endpoint = storage_broker::DEFAULT_ENDPOINT;

    // Dummy values are supplied only to get past the validation errors.
    let config_string = format!(
        "pg_distrib_dir='{}'\nid=10\nbroker_endpoint = '{broker_endpoint}'",
        pg_distrib_dir.display()
    );
    let toml = config_string.parse()?;

    let parsed_config = match PageServerConf::parse_and_validate(&toml, &workdir) {
        Ok(conf) => conf,
        Err(e) => panic!("Failed to parse config '{config_string}', reason: {e:?}"),
    };

    let expected = PageServerConf {
        id: NodeId(10),
        listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
        listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
        availability_zone: None,
        wait_lsn_timeout: humantime::parse_duration(defaults::DEFAULT_WAIT_LSN_TIMEOUT)?,
        wal_redo_timeout: humantime::parse_duration(defaults::DEFAULT_WAL_REDO_TIMEOUT)?,
        superuser: defaults::DEFAULT_SUPERUSER.to_string(),
        page_cache_size: defaults::DEFAULT_PAGE_CACHE_SIZE,
        max_file_descriptors: defaults::DEFAULT_MAX_FILE_DESCRIPTORS,
        workdir,
        pg_distrib_dir,
        http_auth_type: AuthType::Trust,
        pg_auth_type: AuthType::Trust,
        auth_validation_public_key_path: None,
        remote_storage_config: None,
        default_tenant_conf: TenantConf::default(),
        broker_endpoint: storage_broker::DEFAULT_ENDPOINT.parse().unwrap(),
        broker_keepalive_interval: humantime::parse_duration(
            storage_broker::DEFAULT_KEEPALIVE_INTERVAL,
        )?,
        log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(),
        concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
        eviction_task_immitated_concurrent_logical_size_queries: ConfigurableSemaphore::default(),
        metric_collection_interval: humantime::parse_duration(
            defaults::DEFAULT_METRIC_COLLECTION_INTERVAL,
        )?,
        cached_metric_collection_interval: humantime::parse_duration(
            defaults::DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL,
        )?,
        metric_collection_endpoint: defaults::DEFAULT_METRIC_COLLECTION_ENDPOINT,
        synthetic_size_calculation_interval: humantime::parse_duration(
            defaults::DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL,
        )?,
        disk_usage_based_eviction: None,
        test_remote_failures: 0,
        ondemand_download_behavior_treat_error_as_warn: false,
        background_task_maximum_delay: humantime::parse_duration(
            defaults::DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY,
        )?,
    };

    assert_eq!(
        parsed_config, expected,
        "Correct defaults should be used when no config values are provided"
    );

    Ok(())
}
1142 :
/// Every value set in `ALL_BASE_VALUES_TOML` must show up in the parsed configuration.
#[test]
fn parse_basic_config() -> anyhow::Result<()> {
    let tempdir = tempdir()?;
    let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;
    let broker_endpoint = storage_broker::DEFAULT_ENDPOINT;

    let config_string = format!(
        "{ALL_BASE_VALUES_TOML}pg_distrib_dir='{}'\nbroker_endpoint = '{broker_endpoint}'",
        pg_distrib_dir.display()
    );
    let toml = config_string.parse()?;

    let parsed_config = match PageServerConf::parse_and_validate(&toml, &workdir) {
        Ok(conf) => conf,
        Err(e) => panic!("Failed to parse config '{config_string}', reason: {e:?}"),
    };

    let expected = PageServerConf {
        id: NodeId(10),
        listen_pg_addr: "127.0.0.1:64000".to_string(),
        listen_http_addr: "127.0.0.1:9898".to_string(),
        availability_zone: None,
        wait_lsn_timeout: Duration::from_secs(111),
        wal_redo_timeout: Duration::from_secs(111),
        superuser: "zzzz".to_string(),
        page_cache_size: 444,
        max_file_descriptors: 333,
        workdir,
        pg_distrib_dir,
        http_auth_type: AuthType::Trust,
        pg_auth_type: AuthType::Trust,
        auth_validation_public_key_path: None,
        remote_storage_config: None,
        default_tenant_conf: TenantConf::default(),
        broker_endpoint: storage_broker::DEFAULT_ENDPOINT.parse().unwrap(),
        // Not set in the fragment, so the default is expected here.
        broker_keepalive_interval: Duration::from_secs(5),
        log_format: LogFormat::Json,
        concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
        eviction_task_immitated_concurrent_logical_size_queries: ConfigurableSemaphore::default(),
        metric_collection_interval: Duration::from_secs(222),
        cached_metric_collection_interval: Duration::from_secs(22200),
        metric_collection_endpoint: Some(Url::parse("http://localhost:80/metrics")?),
        synthetic_size_calculation_interval: Duration::from_secs(333),
        disk_usage_based_eviction: None,
        test_remote_failures: 0,
        ondemand_download_behavior_treat_error_as_warn: false,
        background_task_maximum_delay: Duration::from_secs(334),
    };

    assert_eq!(
        parsed_config, expected,
        "Should be able to parse all basic config values correctly"
    );

    Ok(())
}
1197 :
/// A local-filesystem `remote_storage` section must parse identically whether it is
/// written as a `[remote_storage]` table or as an inline table, with the remaining
/// storage settings filled from defaults.
#[test]
fn parse_remote_fs_storage_config() -> anyhow::Result<()> {
    let tempdir = tempdir()?;
    let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;
    let broker_endpoint = "http://127.0.0.1:7777";

    let local_storage_path = tempdir.path().join("local_remote_storage");

    // Two spellings of the same declaration.
    let identical_toml_declarations = &[
        format!(
            r#"[remote_storage]
local_path = '{}'"#,
            local_storage_path.display()
        ),
        format!(
            "remote_storage={{local_path='{}'}}",
            local_storage_path.display()
        ),
    ];

    for remote_storage_config_str in identical_toml_declarations.iter() {
        let config_string = format!(
            r#"{ALL_BASE_VALUES_TOML}
pg_distrib_dir='{}'
broker_endpoint = '{broker_endpoint}'

{remote_storage_config_str}"#,
            pg_distrib_dir.display(),
        );

        let toml = config_string.parse()?;

        let conf = match PageServerConf::parse_and_validate(&toml, &workdir) {
            Ok(conf) => conf,
            Err(e) => panic!("Failed to parse config '{config_string}', reason: {e:?}"),
        };
        let parsed_remote_storage_config = conf
            .remote_storage_config
            .expect("Should have remote storage config for the local FS");

        let expected = RemoteStorageConfig {
            max_concurrent_syncs: NonZeroUsize::new(
                remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS,
            )
            .unwrap(),
            max_sync_errors: NonZeroU32::new(remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS)
                .unwrap(),
            storage: RemoteStorageKind::LocalFs(local_storage_path.clone()),
        };

        assert_eq!(
            parsed_remote_storage_config, expected,
            "Remote storage config should correctly parse the local FS config and fill other storage defaults"
        );
    }

    Ok(())
}
1253 :
/// An S3 `remote_storage` section must parse identically whether it is written as a
/// `[remote_storage]` table or as an inline table, and every explicitly set value must
/// be carried through.
#[test]
fn parse_remote_s3_storage_config() -> anyhow::Result<()> {
    let tempdir = tempdir()?;
    let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;

    let bucket_name = String::from("some-sample-bucket");
    let bucket_region = String::from("eu-north-1");
    let prefix_in_bucket = String::from("test_prefix");
    let endpoint = String::from("http://localhost:5000");
    let max_concurrent_syncs = NonZeroUsize::new(111).unwrap();
    let max_sync_errors = NonZeroU32::new(222).unwrap();
    let s3_concurrency_limit = NonZeroUsize::new(333).unwrap();
    let broker_endpoint = "http://127.0.0.1:7777";

    // Two spellings of the same declaration.
    let identical_toml_declarations = &[
        format!(
            r#"[remote_storage]
max_concurrent_syncs = {max_concurrent_syncs}
max_sync_errors = {max_sync_errors}
bucket_name = '{bucket_name}'
bucket_region = '{bucket_region}'
prefix_in_bucket = '{prefix_in_bucket}'
endpoint = '{endpoint}'
concurrency_limit = {s3_concurrency_limit}"#
        ),
        format!(
            "remote_storage={{max_concurrent_syncs={max_concurrent_syncs}, max_sync_errors={max_sync_errors}, bucket_name='{bucket_name}',\
            bucket_region='{bucket_region}', prefix_in_bucket='{prefix_in_bucket}', endpoint='{endpoint}', concurrency_limit={s3_concurrency_limit}}}",
        ),
    ];

    for remote_storage_config_str in identical_toml_declarations.iter() {
        let config_string = format!(
            r#"{ALL_BASE_VALUES_TOML}
pg_distrib_dir='{}'
broker_endpoint = '{broker_endpoint}'

{remote_storage_config_str}"#,
            pg_distrib_dir.display(),
        );

        let toml = config_string.parse()?;

        let conf = match PageServerConf::parse_and_validate(&toml, &workdir) {
            Ok(conf) => conf,
            Err(e) => panic!("Failed to parse config '{config_string}', reason: {e:?}"),
        };
        let parsed_remote_storage_config = conf
            .remote_storage_config
            .expect("Should have remote storage config for S3");

        let expected = RemoteStorageConfig {
            max_concurrent_syncs,
            max_sync_errors,
            storage: RemoteStorageKind::AwsS3(S3Config {
                bucket_name: bucket_name.clone(),
                bucket_region: bucket_region.clone(),
                prefix_in_bucket: Some(prefix_in_bucket.clone()),
                endpoint: Some(endpoint.clone()),
                concurrency_limit: s3_concurrency_limit,
                max_keys_per_list_response: None,
            }),
        };

        assert_eq!(
            parsed_remote_storage_config, expected,
            "Remote storage config should correctly parse the S3 config"
        );
    }

    Ok(())
}
1323 :
/// A value set under `[tenant_config]` in the pageserver config must end up in
/// `default_tenant_conf`.
#[test]
fn parse_tenant_config() -> anyhow::Result<()> {
    let tempdir = tempdir()?;
    let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;

    let broker_endpoint = "http://127.0.0.1:7777";
    let trace_read_requests = true;

    let config_string = format!(
        r#"{ALL_BASE_VALUES_TOML}
pg_distrib_dir='{}'
broker_endpoint = '{broker_endpoint}'

[tenant_config]
trace_read_requests = {trace_read_requests}"#,
        pg_distrib_dir.display(),
    );

    let toml = config_string.parse()?;
    let conf = PageServerConf::parse_and_validate(&toml, &workdir)?;

    let actual = conf.default_tenant_conf.trace_read_requests;
    assert_eq!(
        actual, trace_read_requests,
        "Tenant config from pageserver config file should be parsed and udpated values used as defaults for all tenants",
    );

    Ok(())
}
1352 :
/// Parses a config that combines `[disk_usage_based_eviction]` with a tenant-level
/// `eviction_policy` and checks that both are picked up, together with the metric
/// settings.
#[test]
fn eviction_pageserver_config_parse() -> anyhow::Result<()> {
    let tempdir = tempdir()?;
    let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;

    let pageserver_conf_toml = format!(
        r#"pg_distrib_dir = "{}"
metric_collection_endpoint = "http://sample.url"
metric_collection_interval = "10min"
id = 222

[disk_usage_based_eviction]
max_usage_pct = 80
min_avail_bytes = 0
period = "10s"

[tenant_config]
evictions_low_residence_duration_metric_threshold = "20m"

[tenant_config.eviction_policy]
kind = "LayerAccessThreshold"
period = "20m"
threshold = "20m"
"#,
        pg_distrib_dir.display(),
    );
    let toml: Document = pageserver_conf_toml.parse()?;
    let conf = PageServerConf::parse_and_validate(&toml, &workdir)?;

    let twenty_minutes = Duration::from_secs(20 * 60);

    // Top-level settings.
    assert_eq!(conf.pg_distrib_dir, pg_distrib_dir);
    assert_eq!(
        conf.metric_collection_endpoint,
        Some("http://sample.url".parse().unwrap())
    );
    assert_eq!(
        conf.metric_collection_interval,
        Duration::from_secs(10 * 60)
    );
    assert_eq!(
        conf.default_tenant_conf
            .evictions_low_residence_duration_metric_threshold,
        twenty_minutes
    );
    assert_eq!(conf.id, NodeId(222));

    // Disk-usage-based eviction section.
    let expected_eviction = DiskUsageEvictionTaskConfig {
        max_usage_pct: Percent::new(80).unwrap(),
        min_avail_bytes: 0,
        period: Duration::from_secs(10),
        #[cfg(feature = "testing")]
        mock_statvfs: None,
    };
    assert_eq!(conf.disk_usage_based_eviction, Some(expected_eviction));

    // Tenant-level eviction policy.
    match &conf.default_tenant_conf.eviction_policy {
        EvictionPolicy::LayerAccessThreshold(access_threshold) => {
            assert_eq!(access_threshold.period, twenty_minutes);
            assert_eq!(access_threshold.threshold, twenty_minutes);
        }
        EvictionPolicy::NoEviction => panic!("Unexpected eviction opolicy tenant settings"),
    }

    Ok(())
}
1417 :
1418 6 : fn prepare_fs(tempdir: &TempDir) -> anyhow::Result<(PathBuf, PathBuf)> {
1419 6 : let tempdir_path = tempdir.path();
1420 6 :
1421 6 : let workdir = tempdir_path.join("workdir");
1422 6 : fs::create_dir_all(&workdir)?;
1423 :
1424 6 : let pg_distrib_dir = tempdir_path.join("pg_distrib");
1425 6 : let pg_distrib_dir_versioned = pg_distrib_dir.join(format!("v{DEFAULT_PG_VERSION}"));
1426 6 : fs::create_dir_all(&pg_distrib_dir_versioned)?;
1427 6 : let postgres_bin_dir = pg_distrib_dir_versioned.join("bin");
1428 6 : fs::create_dir_all(&postgres_bin_dir)?;
1429 6 : fs::write(postgres_bin_dir.join("postgres"), "I'm postgres, trust me")?;
1430 :
1431 6 : Ok((workdir, pg_distrib_dir))
1432 6 : }
1433 : }
|