Line data Source code
1 : //! Functions for handling page server configuration options
2 : //!
3 : //! Configuration options can be set in the pageserver.toml configuration
4 : //! file, or on the command line.
5 : //! See also `settings.md` for a more detailed description of each parameter.
6 :
7 : use anyhow::{anyhow, bail, ensure, Context, Result};
8 : use pageserver_api::shard::TenantShardId;
9 : use remote_storage::{RemotePath, RemoteStorageConfig};
10 : use serde;
11 : use serde::de::IntoDeserializer;
12 : use std::{collections::HashMap, env};
13 : use storage_broker::Uri;
14 : use utils::crashsafe::path_with_suffix_extension;
15 : use utils::id::ConnectionId;
16 : use utils::logging::SecretString;
17 :
18 : use once_cell::sync::OnceCell;
19 : use reqwest::Url;
20 : use std::num::NonZeroUsize;
21 : use std::str::FromStr;
22 : use std::sync::Arc;
23 : use std::time::Duration;
24 : use toml_edit::{Document, Item};
25 :
26 : use camino::{Utf8Path, Utf8PathBuf};
27 : use postgres_backend::AuthType;
28 : use utils::{
29 : id::{NodeId, TimelineId},
30 : logging::LogFormat,
31 : };
32 :
33 : use crate::tenant::config::TenantConfOpt;
34 : use crate::tenant::timeline::GetVectoredImpl;
35 : use crate::tenant::vectored_blob_io::MaxVectoredReadBytes;
36 : use crate::tenant::{
37 : TENANTS_SEGMENT_NAME, TENANT_DELETED_MARKER_FILE_NAME, TIMELINES_SEGMENT_NAME,
38 : };
39 : use crate::{disk_usage_eviction_task::DiskUsageEvictionTaskConfig, virtual_file::io_engine};
40 : use crate::{tenant::config::TenantConf, virtual_file};
41 : use crate::{
42 : IGNORED_TENANT_FILE_NAME, TENANT_CONFIG_NAME, TENANT_HEATMAP_BASENAME,
43 : TENANT_LOCATION_CONFIG_NAME, TIMELINE_DELETE_MARK_SUFFIX,
44 : };
45 :
46 : use self::defaults::DEFAULT_CONCURRENT_TENANT_WARMUP;
47 :
48 : use self::defaults::DEFAULT_VIRTUAL_FILE_IO_ENGINE;
49 :
50 : pub mod defaults {
51 : use crate::tenant::config::defaults::*;
52 : use const_format::formatcp;
53 :
54 : pub use pageserver_api::{
55 : DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
56 : DEFAULT_PG_LISTEN_PORT,
57 : };
58 : pub use storage_broker::DEFAULT_ENDPOINT as BROKER_DEFAULT_ENDPOINT;
59 :
60 : pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "60 s";
61 : pub const DEFAULT_WAL_REDO_TIMEOUT: &str = "60 s";
62 :
63 : pub const DEFAULT_SUPERUSER: &str = "cloud_admin";
64 :
65 : pub const DEFAULT_PAGE_CACHE_SIZE: usize = 8192;
66 : pub const DEFAULT_MAX_FILE_DESCRIPTORS: usize = 100;
67 :
68 : pub const DEFAULT_LOG_FORMAT: &str = "plain";
69 :
70 : pub const DEFAULT_CONCURRENT_TENANT_WARMUP: usize = 8;
71 :
72 : pub const DEFAULT_CONCURRENT_TENANT_SIZE_LOGICAL_SIZE_QUERIES: usize =
73 : super::ConfigurableSemaphore::DEFAULT_INITIAL.get();
74 :
75 : pub const DEFAULT_METRIC_COLLECTION_INTERVAL: &str = "10 min";
76 : pub const DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL: &str = "0s";
77 : pub const DEFAULT_METRIC_COLLECTION_ENDPOINT: Option<reqwest::Url> = None;
78 : pub const DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL: &str = "10 min";
79 : pub const DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY: &str = "10s";
80 :
81 : pub const DEFAULT_HEATMAP_UPLOAD_CONCURRENCY: usize = 8;
82 : pub const DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY: usize = 1;
83 :
84 : pub const DEFAULT_INGEST_BATCH_SIZE: u64 = 100;
85 :
86 : #[cfg(target_os = "linux")]
87 : pub const DEFAULT_VIRTUAL_FILE_IO_ENGINE: &str = "tokio-epoll-uring";
88 :
89 : #[cfg(not(target_os = "linux"))]
90 : pub const DEFAULT_VIRTUAL_FILE_IO_ENGINE: &str = "std-fs";
91 :
92 : pub const DEFAULT_GET_VECTORED_IMPL: &str = "sequential";
93 :
94 : pub const DEFAULT_MAX_VECTORED_READ_BYTES: usize = 128 * 1024; // 128 KiB
95 :
96 : pub const DEFAULT_VALIDATE_VECTORED_GET: bool = true;
97 :
98 : pub const DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB: usize = 0;
99 :
100 : pub const DEFAULT_WALREDO_PROCESS_KIND: &str = "sync";
101 :
102 : ///
103 : /// Default built-in configuration file.
104 : ///
105 : pub const DEFAULT_CONFIG_FILE: &str = formatcp!(
106 : r#"
107 : # Initial configuration file created by 'pageserver --init'
108 : #listen_pg_addr = '{DEFAULT_PG_LISTEN_ADDR}'
109 : #listen_http_addr = '{DEFAULT_HTTP_LISTEN_ADDR}'
110 :
111 : #wait_lsn_timeout = '{DEFAULT_WAIT_LSN_TIMEOUT}'
112 : #wal_redo_timeout = '{DEFAULT_WAL_REDO_TIMEOUT}'
113 :
114 : #page_cache_size = {DEFAULT_PAGE_CACHE_SIZE}
115 : #max_file_descriptors = {DEFAULT_MAX_FILE_DESCRIPTORS}
116 :
117 : # initial superuser role name to use when creating a new tenant
118 : #initial_superuser_name = '{DEFAULT_SUPERUSER}'
119 :
120 : #broker_endpoint = '{BROKER_DEFAULT_ENDPOINT}'
121 :
122 : #log_format = '{DEFAULT_LOG_FORMAT}'
123 :
124 : #concurrent_tenant_size_logical_size_queries = '{DEFAULT_CONCURRENT_TENANT_SIZE_LOGICAL_SIZE_QUERIES}'
125 : #concurrent_tenant_warmup = '{DEFAULT_CONCURRENT_TENANT_WARMUP}'
126 :
127 : #metric_collection_interval = '{DEFAULT_METRIC_COLLECTION_INTERVAL}'
128 : #cached_metric_collection_interval = '{DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL}'
129 : #synthetic_size_calculation_interval = '{DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL}'
130 :
131 : #disk_usage_based_eviction = {{ max_usage_pct = .., min_avail_bytes = .., period = "10s"}}
132 :
133 : #background_task_maximum_delay = '{DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY}'
134 :
135 : #ingest_batch_size = {DEFAULT_INGEST_BATCH_SIZE}
136 :
137 : #virtual_file_io_engine = '{DEFAULT_VIRTUAL_FILE_IO_ENGINE}'
138 :
139 : #get_vectored_impl = '{DEFAULT_GET_VECTORED_IMPL}'
140 :
141 : #max_vectored_read_bytes = '{DEFAULT_MAX_VECTORED_READ_BYTES}'
142 :
143 : #validate_vectored_get = '{DEFAULT_VALIDATE_VECTORED_GET}'
144 :
145 : #walredo_process_kind = '{DEFAULT_WALREDO_PROCESS_KIND}'
146 :
147 : [tenant_config]
148 : #checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes
149 : #checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT}
150 : #compaction_target_size = {DEFAULT_COMPACTION_TARGET_SIZE} # in bytes
151 : #compaction_period = '{DEFAULT_COMPACTION_PERIOD}'
152 : #compaction_threshold = {DEFAULT_COMPACTION_THRESHOLD}
153 :
154 : #gc_period = '{DEFAULT_GC_PERIOD}'
155 : #gc_horizon = {DEFAULT_GC_HORIZON}
156 : #image_creation_threshold = {DEFAULT_IMAGE_CREATION_THRESHOLD}
157 : #pitr_interval = '{DEFAULT_PITR_INTERVAL}'
158 :
159 : #min_resident_size_override = .. # in bytes
160 : #evictions_low_residence_duration_metric_threshold = '{DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD}'
161 :
162 : #heatmap_upload_concurrency = {DEFAULT_HEATMAP_UPLOAD_CONCURRENCY}
163 : #secondary_download_concurrency = {DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY}
164 :
165 : #ephemeral_bytes_per_memory_kb = {DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB}
166 :
167 : [remote_storage]
168 :
169 : "#
170 : );
171 : }
172 :
/// Fully resolved pageserver configuration, as produced by `PageServerConfigBuilder::build`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PageServerConf {
    /// Identifier of this particular pageserver, so that e.g. safekeepers
    /// can safely distinguish different pageservers.
    pub id: NodeId,

    /// Example (default): 127.0.0.1:64000
    pub listen_pg_addr: String,
    /// Example (default): 127.0.0.1:9898
    pub listen_http_addr: String,

    /// Current availability zone. Used for traffic metrics.
    pub availability_zone: Option<String>,

    /// Timeout when waiting for the WAL receiver to catch up to an LSN given in a GetPage@LSN call.
    pub wait_lsn_timeout: Duration,
    /// How long to wait for WAL redo to complete.
    pub wal_redo_timeout: Duration,

    /// Initial superuser role name to use when creating a new tenant.
    pub superuser: String,

    pub page_cache_size: usize,
    pub max_file_descriptors: usize,

    /// Repository directory, relative to the current working directory.
    ///
    /// Normally, the page server changes the current working directory
    /// to the repository, and 'workdir' is always '.'. But we don't do
    /// that during unit testing, because the current directory is global
    /// to the process but different unit tests work on different
    /// repositories.
    pub workdir: Utf8PathBuf,

    /// Postgres distribution directory; the builder defaults it to `pg_install` under the
    /// current working directory.
    pub pg_distrib_dir: Utf8PathBuf,

    // Authentication
    /// Authentication method for the HTTP mgmt API.
    pub http_auth_type: AuthType,
    /// Authentication method for libpq connections from compute.
    pub pg_auth_type: AuthType,
    /// Path to a file or directory containing public key(s) for verifying JWT tokens.
    /// Used for both mgmt and compute auth, if enabled.
    pub auth_validation_public_key_path: Option<Utf8PathBuf>,

    pub remote_storage_config: Option<RemoteStorageConfig>,

    pub default_tenant_conf: TenantConf,

    /// Storage broker endpoints to connect to.
    pub broker_endpoint: Uri,
    pub broker_keepalive_interval: Duration,

    pub log_format: LogFormat,

    /// Number of tenants which will be concurrently loaded from remote storage proactively on startup or attach.
    ///
    /// A lower value implicitly deprioritizes loading such tenants, vs. other work in the system.
    pub concurrent_tenant_warmup: ConfigurableSemaphore,

    /// Number of concurrent [`Tenant::gather_size_inputs`](crate::tenant::Tenant::gather_size_inputs) allowed.
    pub concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore,
    /// Limit of concurrent [`Tenant::gather_size_inputs`] issued by module `eviction_task`.
    /// The number of permits is the same as `concurrent_tenant_size_logical_size_queries`.
    /// See the comment in `eviction_task` for details.
    ///
    /// [`Tenant::gather_size_inputs`]: crate::tenant::Tenant::gather_size_inputs
    pub eviction_task_immitated_concurrent_logical_size_queries: ConfigurableSemaphore,

    /// How often to collect metrics and send them to the metrics endpoint.
    pub metric_collection_interval: Duration,
    /// How often to send unchanged cached metrics to the metrics endpoint.
    pub cached_metric_collection_interval: Duration,
    pub metric_collection_endpoint: Option<Url>,
    pub metric_collection_bucket: Option<RemoteStorageConfig>,
    pub synthetic_size_calculation_interval: Duration,

    pub disk_usage_based_eviction: Option<DiskUsageEvictionTaskConfig>,

    /// Test-only failure injection for remote storage; the builder setter names its argument
    /// `fail_first` (presumably "fail the first N operations" — TODO confirm).
    pub test_remote_failures: u64,

    pub ondemand_download_behavior_treat_error_as_warn: bool,

    /// How long will background tasks be delayed at most after initial load of tenants.
    ///
    /// Our largest initialization completions are in the range of 100-200s, so perhaps 10s works
    /// as we now isolate initial loading, initial logical size calculation and background tasks.
    /// Smaller nodes will have background tasks "not running" for this long unless every timeline
    /// has its initial logical size calculated. Not running background tasks for some seconds is
    /// not terrible.
    pub background_task_maximum_delay: Duration,

    pub control_plane_api: Option<Url>,

    /// JWT token for use with the control plane API.
    pub control_plane_api_token: Option<SecretString>,

    /// If true, pageserver will make best-effort to operate without a control plane: only
    /// for use in major incidents.
    pub control_plane_emergency_mode: bool,

    /// How many heatmap uploads may be done concurrently: lower values implicitly deprioritize
    /// heatmap uploads vs. other remote storage operations.
    pub heatmap_upload_concurrency: usize,

    /// How many remote storage downloads may be done for secondary tenants concurrently. Implicitly
    /// deprioritises secondary downloads vs. remote storage operations for attached tenants.
    pub secondary_download_concurrency: usize,

    /// Maximum number of WAL records to be ingested and committed at the same time.
    pub ingest_batch_size: u64,

    /// IO engine for virtual files; auto-detected by the builder when not configured.
    pub virtual_file_io_engine: virtual_file::IoEngineKind,

    pub get_vectored_impl: GetVectoredImpl,

    pub max_vectored_read_bytes: MaxVectoredReadBytes,

    pub validate_vectored_get: bool,

    /// How many bytes of ephemeral layer content will we allow per kilobyte of RAM. When this
    /// is exceeded, we start proactively closing ephemeral layers to limit the total amount
    /// of ephemeral data.
    ///
    /// Setting this to zero disables limits on total ephemeral layer size.
    pub ephemeral_bytes_per_memory_kb: usize,

    pub walredo_process_kind: crate::walredo::ProcessKind,
}
300 :
/// Token used to authenticate against safekeepers, set at most once.
///
/// We do not want to store this in a `PageServerConf` because the latter may be logged
/// and/or serialized at a whim, while the token is secret. Currently this token is the
/// same for accessing all tenants/timelines, but may become per-tenant/per-timeline in
/// the future; more tokens and auth may also arrive for the storage broker, completely
/// changing the logic. Hence, we resort to a global variable for now instead of passing
/// the token from the startup code to the connection code through a dozen layers.
pub static SAFEKEEPER_AUTH_TOKEN: OnceCell<Arc<String>> = OnceCell::new();
308 :
/// A config value as seen by the builder: either explicitly `Set`, or `NotSet`.
///
/// A dedicated enum (rather than `Option`) better indicates the intention and avoids
/// confusion with config values that are themselves `Option`s.
#[derive(Clone, Default)]
pub enum BuilderValue<T> {
    Set(T),
    /// The derived `Default` is `NotSet`.
    #[default]
    NotSet,
}
317 :
318 : impl<T: Clone> BuilderValue<T> {
319 738 : pub fn ok_or(&self, field_name: &'static str, default: BuilderValue<T>) -> anyhow::Result<T> {
320 738 : match self {
321 242 : Self::Set(v) => Ok(v.clone()),
322 496 : Self::NotSet => match default {
323 496 : BuilderValue::Set(v) => Ok(v.clone()),
324 : BuilderValue::NotSet => {
325 0 : anyhow::bail!("missing config value {field_name:?}")
326 : }
327 : },
328 : }
329 738 : }
330 : }
331 :
/// Node metadata (e.g. externally-addressable name, AZ) that is delivered as a separate
/// structure, the metadata file, rather than through the main config.
///
/// This information is not needed by the pageserver itself: it is only used for
/// registering the pageserver with the control plane and/or storage controller.
#[derive(serde::Deserialize)]
pub(crate) struct NodeMetadata {
    /// Read from the `host` key.
    #[serde(rename = "host")]
    pub(crate) postgres_host: String,
    /// Read from the `port` key.
    #[serde(rename = "port")]
    pub(crate) postgres_port: u16,
    pub(crate) http_host: String,
    pub(crate) http_port: u16,

    // Deployment tools may write fields to the metadata file beyond what we
    // use in this type: this type intentionally only names the fields that we require.
    // Everything else is collected here.
    #[serde(flatten)]
    pub(crate) other: HashMap<String, serde_json::Value>,
}
351 :
/// Staging area used to simplify construction of a [`PageServerConf`].
///
/// The derived `Default` leaves every field [`BuilderValue::NotSet`]. Setters mark individual
/// options as `Set`, and `build` resolves each remaining `NotSet` field against
/// `PageServerConfigBuilder::default_values()`.
#[derive(Default)]
struct PageServerConfigBuilder {
    listen_pg_addr: BuilderValue<String>,

    listen_http_addr: BuilderValue<String>,

    availability_zone: BuilderValue<Option<String>>,

    wait_lsn_timeout: BuilderValue<Duration>,
    wal_redo_timeout: BuilderValue<Duration>,

    superuser: BuilderValue<String>,

    page_cache_size: BuilderValue<usize>,
    max_file_descriptors: BuilderValue<usize>,

    workdir: BuilderValue<Utf8PathBuf>,

    pg_distrib_dir: BuilderValue<Utf8PathBuf>,

    http_auth_type: BuilderValue<AuthType>,
    pg_auth_type: BuilderValue<AuthType>,

    auth_validation_public_key_path: BuilderValue<Option<Utf8PathBuf>>,
    remote_storage_config: BuilderValue<Option<RemoteStorageConfig>>,

    // No built-in default exists for the node id, so `build` fails if this is never set.
    id: BuilderValue<NodeId>,

    broker_endpoint: BuilderValue<Uri>,
    broker_keepalive_interval: BuilderValue<Duration>,

    log_format: BuilderValue<LogFormat>,

    // Stored as plain permit counts; `build` turns them into `ConfigurableSemaphore`s.
    concurrent_tenant_warmup: BuilderValue<NonZeroUsize>,
    concurrent_tenant_size_logical_size_queries: BuilderValue<NonZeroUsize>,

    metric_collection_interval: BuilderValue<Duration>,
    cached_metric_collection_interval: BuilderValue<Duration>,
    metric_collection_endpoint: BuilderValue<Option<Url>>,
    synthetic_size_calculation_interval: BuilderValue<Duration>,
    metric_collection_bucket: BuilderValue<Option<RemoteStorageConfig>>,

    disk_usage_based_eviction: BuilderValue<Option<DiskUsageEvictionTaskConfig>>,

    test_remote_failures: BuilderValue<u64>,

    ondemand_download_behavior_treat_error_as_warn: BuilderValue<bool>,

    background_task_maximum_delay: BuilderValue<Duration>,

    control_plane_api: BuilderValue<Option<Url>>,
    control_plane_api_token: BuilderValue<Option<SecretString>>,
    control_plane_emergency_mode: BuilderValue<bool>,

    heatmap_upload_concurrency: BuilderValue<usize>,
    secondary_download_concurrency: BuilderValue<usize>,

    ingest_batch_size: BuilderValue<u64>,

    // When left `NotSet`, `build` auto-detects the engine instead of using the default value.
    virtual_file_io_engine: BuilderValue<virtual_file::IoEngineKind>,

    get_vectored_impl: BuilderValue<GetVectoredImpl>,

    max_vectored_read_bytes: BuilderValue<MaxVectoredReadBytes>,

    validate_vectored_get: BuilderValue<bool>,

    ephemeral_bytes_per_memory_kb: BuilderValue<usize>,

    walredo_process_kind: BuilderValue<crate::walredo::ProcessKind>,
}
425 :
426 : impl PageServerConfigBuilder {
427 : #[inline(always)]
428 18 : fn default_values() -> Self {
429 18 : use self::BuilderValue::*;
430 18 : use defaults::*;
431 18 : Self {
432 18 : listen_pg_addr: Set(DEFAULT_PG_LISTEN_ADDR.to_string()),
433 18 : listen_http_addr: Set(DEFAULT_HTTP_LISTEN_ADDR.to_string()),
434 18 : availability_zone: Set(None),
435 18 : wait_lsn_timeout: Set(humantime::parse_duration(DEFAULT_WAIT_LSN_TIMEOUT)
436 18 : .expect("cannot parse default wait lsn timeout")),
437 18 : wal_redo_timeout: Set(humantime::parse_duration(DEFAULT_WAL_REDO_TIMEOUT)
438 18 : .expect("cannot parse default wal redo timeout")),
439 18 : superuser: Set(DEFAULT_SUPERUSER.to_string()),
440 18 : page_cache_size: Set(DEFAULT_PAGE_CACHE_SIZE),
441 18 : max_file_descriptors: Set(DEFAULT_MAX_FILE_DESCRIPTORS),
442 18 : workdir: Set(Utf8PathBuf::new()),
443 18 : pg_distrib_dir: Set(Utf8PathBuf::from_path_buf(
444 18 : env::current_dir().expect("cannot access current directory"),
445 18 : )
446 18 : .expect("non-Unicode path")
447 18 : .join("pg_install")),
448 18 : http_auth_type: Set(AuthType::Trust),
449 18 : pg_auth_type: Set(AuthType::Trust),
450 18 : auth_validation_public_key_path: Set(None),
451 18 : remote_storage_config: Set(None),
452 18 : id: NotSet,
453 18 : broker_endpoint: Set(storage_broker::DEFAULT_ENDPOINT
454 18 : .parse()
455 18 : .expect("failed to parse default broker endpoint")),
456 18 : broker_keepalive_interval: Set(humantime::parse_duration(
457 18 : storage_broker::DEFAULT_KEEPALIVE_INTERVAL,
458 18 : )
459 18 : .expect("cannot parse default keepalive interval")),
460 18 : log_format: Set(LogFormat::from_str(DEFAULT_LOG_FORMAT).unwrap()),
461 18 :
462 18 : concurrent_tenant_warmup: Set(NonZeroUsize::new(DEFAULT_CONCURRENT_TENANT_WARMUP)
463 18 : .expect("Invalid default constant")),
464 18 : concurrent_tenant_size_logical_size_queries: Set(
465 18 : ConfigurableSemaphore::DEFAULT_INITIAL,
466 18 : ),
467 18 : metric_collection_interval: Set(humantime::parse_duration(
468 18 : DEFAULT_METRIC_COLLECTION_INTERVAL,
469 18 : )
470 18 : .expect("cannot parse default metric collection interval")),
471 18 : cached_metric_collection_interval: Set(humantime::parse_duration(
472 18 : DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL,
473 18 : )
474 18 : .expect("cannot parse default cached_metric_collection_interval")),
475 18 : synthetic_size_calculation_interval: Set(humantime::parse_duration(
476 18 : DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL,
477 18 : )
478 18 : .expect("cannot parse default synthetic size calculation interval")),
479 18 : metric_collection_endpoint: Set(DEFAULT_METRIC_COLLECTION_ENDPOINT),
480 18 :
481 18 : metric_collection_bucket: Set(None),
482 18 :
483 18 : disk_usage_based_eviction: Set(None),
484 18 :
485 18 : test_remote_failures: Set(0),
486 18 :
487 18 : ondemand_download_behavior_treat_error_as_warn: Set(false),
488 18 :
489 18 : background_task_maximum_delay: Set(humantime::parse_duration(
490 18 : DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY,
491 18 : )
492 18 : .unwrap()),
493 18 :
494 18 : control_plane_api: Set(None),
495 18 : control_plane_api_token: Set(None),
496 18 : control_plane_emergency_mode: Set(false),
497 18 :
498 18 : heatmap_upload_concurrency: Set(DEFAULT_HEATMAP_UPLOAD_CONCURRENCY),
499 18 : secondary_download_concurrency: Set(DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY),
500 18 :
501 18 : ingest_batch_size: Set(DEFAULT_INGEST_BATCH_SIZE),
502 18 :
503 18 : virtual_file_io_engine: Set(DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap()),
504 18 :
505 18 : get_vectored_impl: Set(DEFAULT_GET_VECTORED_IMPL.parse().unwrap()),
506 18 : max_vectored_read_bytes: Set(MaxVectoredReadBytes(
507 18 : NonZeroUsize::new(DEFAULT_MAX_VECTORED_READ_BYTES).unwrap(),
508 18 : )),
509 18 : validate_vectored_get: Set(DEFAULT_VALIDATE_VECTORED_GET),
510 18 : ephemeral_bytes_per_memory_kb: Set(DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
511 18 :
512 18 : walredo_process_kind: Set(DEFAULT_WALREDO_PROCESS_KIND.parse().unwrap()),
513 18 : }
514 18 : }
515 : }
516 :
517 : impl PageServerConfigBuilder {
518 12 : pub fn listen_pg_addr(&mut self, listen_pg_addr: String) {
519 12 : self.listen_pg_addr = BuilderValue::Set(listen_pg_addr)
520 12 : }
521 :
522 12 : pub fn listen_http_addr(&mut self, listen_http_addr: String) {
523 12 : self.listen_http_addr = BuilderValue::Set(listen_http_addr)
524 12 : }
525 :
526 0 : pub fn availability_zone(&mut self, availability_zone: Option<String>) {
527 0 : self.availability_zone = BuilderValue::Set(availability_zone)
528 0 : }
529 :
530 12 : pub fn wait_lsn_timeout(&mut self, wait_lsn_timeout: Duration) {
531 12 : self.wait_lsn_timeout = BuilderValue::Set(wait_lsn_timeout)
532 12 : }
533 :
534 12 : pub fn wal_redo_timeout(&mut self, wal_redo_timeout: Duration) {
535 12 : self.wal_redo_timeout = BuilderValue::Set(wal_redo_timeout)
536 12 : }
537 :
538 12 : pub fn superuser(&mut self, superuser: String) {
539 12 : self.superuser = BuilderValue::Set(superuser)
540 12 : }
541 :
542 12 : pub fn page_cache_size(&mut self, page_cache_size: usize) {
543 12 : self.page_cache_size = BuilderValue::Set(page_cache_size)
544 12 : }
545 :
546 12 : pub fn max_file_descriptors(&mut self, max_file_descriptors: usize) {
547 12 : self.max_file_descriptors = BuilderValue::Set(max_file_descriptors)
548 12 : }
549 :
550 18 : pub fn workdir(&mut self, workdir: Utf8PathBuf) {
551 18 : self.workdir = BuilderValue::Set(workdir)
552 18 : }
553 :
554 18 : pub fn pg_distrib_dir(&mut self, pg_distrib_dir: Utf8PathBuf) {
555 18 : self.pg_distrib_dir = BuilderValue::Set(pg_distrib_dir)
556 18 : }
557 :
558 0 : pub fn http_auth_type(&mut self, auth_type: AuthType) {
559 0 : self.http_auth_type = BuilderValue::Set(auth_type)
560 0 : }
561 :
562 0 : pub fn pg_auth_type(&mut self, auth_type: AuthType) {
563 0 : self.pg_auth_type = BuilderValue::Set(auth_type)
564 0 : }
565 :
566 0 : pub fn auth_validation_public_key_path(
567 0 : &mut self,
568 0 : auth_validation_public_key_path: Option<Utf8PathBuf>,
569 0 : ) {
570 0 : self.auth_validation_public_key_path = BuilderValue::Set(auth_validation_public_key_path)
571 0 : }
572 :
573 8 : pub fn remote_storage_config(&mut self, remote_storage_config: Option<RemoteStorageConfig>) {
574 8 : self.remote_storage_config = BuilderValue::Set(remote_storage_config)
575 8 : }
576 :
577 14 : pub fn broker_endpoint(&mut self, broker_endpoint: Uri) {
578 14 : self.broker_endpoint = BuilderValue::Set(broker_endpoint)
579 14 : }
580 :
581 0 : pub fn broker_keepalive_interval(&mut self, broker_keepalive_interval: Duration) {
582 0 : self.broker_keepalive_interval = BuilderValue::Set(broker_keepalive_interval)
583 0 : }
584 :
585 18 : pub fn id(&mut self, node_id: NodeId) {
586 18 : self.id = BuilderValue::Set(node_id)
587 18 : }
588 :
589 12 : pub fn log_format(&mut self, log_format: LogFormat) {
590 12 : self.log_format = BuilderValue::Set(log_format)
591 12 : }
592 :
593 0 : pub fn concurrent_tenant_warmup(&mut self, u: NonZeroUsize) {
594 0 : self.concurrent_tenant_warmup = BuilderValue::Set(u);
595 0 : }
596 :
597 0 : pub fn concurrent_tenant_size_logical_size_queries(&mut self, u: NonZeroUsize) {
598 0 : self.concurrent_tenant_size_logical_size_queries = BuilderValue::Set(u);
599 0 : }
600 :
601 16 : pub fn metric_collection_interval(&mut self, metric_collection_interval: Duration) {
602 16 : self.metric_collection_interval = BuilderValue::Set(metric_collection_interval)
603 16 : }
604 :
605 12 : pub fn cached_metric_collection_interval(
606 12 : &mut self,
607 12 : cached_metric_collection_interval: Duration,
608 12 : ) {
609 12 : self.cached_metric_collection_interval =
610 12 : BuilderValue::Set(cached_metric_collection_interval)
611 12 : }
612 :
613 16 : pub fn metric_collection_endpoint(&mut self, metric_collection_endpoint: Option<Url>) {
614 16 : self.metric_collection_endpoint = BuilderValue::Set(metric_collection_endpoint)
615 16 : }
616 :
617 0 : pub fn metric_collection_bucket(
618 0 : &mut self,
619 0 : metric_collection_bucket: Option<RemoteStorageConfig>,
620 0 : ) {
621 0 : self.metric_collection_bucket = BuilderValue::Set(metric_collection_bucket)
622 0 : }
623 :
624 12 : pub fn synthetic_size_calculation_interval(
625 12 : &mut self,
626 12 : synthetic_size_calculation_interval: Duration,
627 12 : ) {
628 12 : self.synthetic_size_calculation_interval =
629 12 : BuilderValue::Set(synthetic_size_calculation_interval)
630 12 : }
631 :
632 0 : pub fn test_remote_failures(&mut self, fail_first: u64) {
633 0 : self.test_remote_failures = BuilderValue::Set(fail_first);
634 0 : }
635 :
636 2 : pub fn disk_usage_based_eviction(&mut self, value: Option<DiskUsageEvictionTaskConfig>) {
637 2 : self.disk_usage_based_eviction = BuilderValue::Set(value);
638 2 : }
639 :
640 0 : pub fn ondemand_download_behavior_treat_error_as_warn(
641 0 : &mut self,
642 0 : ondemand_download_behavior_treat_error_as_warn: bool,
643 0 : ) {
644 0 : self.ondemand_download_behavior_treat_error_as_warn =
645 0 : BuilderValue::Set(ondemand_download_behavior_treat_error_as_warn);
646 0 : }
647 :
648 12 : pub fn background_task_maximum_delay(&mut self, delay: Duration) {
649 12 : self.background_task_maximum_delay = BuilderValue::Set(delay);
650 12 : }
651 :
652 0 : pub fn control_plane_api(&mut self, api: Option<Url>) {
653 0 : self.control_plane_api = BuilderValue::Set(api)
654 0 : }
655 :
656 0 : pub fn control_plane_api_token(&mut self, token: Option<SecretString>) {
657 0 : self.control_plane_api_token = BuilderValue::Set(token)
658 0 : }
659 :
660 0 : pub fn control_plane_emergency_mode(&mut self, enabled: bool) {
661 0 : self.control_plane_emergency_mode = BuilderValue::Set(enabled)
662 0 : }
663 :
664 0 : pub fn heatmap_upload_concurrency(&mut self, value: usize) {
665 0 : self.heatmap_upload_concurrency = BuilderValue::Set(value)
666 0 : }
667 :
668 0 : pub fn secondary_download_concurrency(&mut self, value: usize) {
669 0 : self.secondary_download_concurrency = BuilderValue::Set(value)
670 0 : }
671 :
672 0 : pub fn ingest_batch_size(&mut self, ingest_batch_size: u64) {
673 0 : self.ingest_batch_size = BuilderValue::Set(ingest_batch_size)
674 0 : }
675 :
676 0 : pub fn virtual_file_io_engine(&mut self, value: virtual_file::IoEngineKind) {
677 0 : self.virtual_file_io_engine = BuilderValue::Set(value);
678 0 : }
679 :
680 0 : pub fn get_vectored_impl(&mut self, value: GetVectoredImpl) {
681 0 : self.get_vectored_impl = BuilderValue::Set(value);
682 0 : }
683 :
684 0 : pub fn get_max_vectored_read_bytes(&mut self, value: MaxVectoredReadBytes) {
685 0 : self.max_vectored_read_bytes = BuilderValue::Set(value);
686 0 : }
687 :
688 0 : pub fn get_validate_vectored_get(&mut self, value: bool) {
689 0 : self.validate_vectored_get = BuilderValue::Set(value);
690 0 : }
691 :
692 0 : pub fn get_ephemeral_bytes_per_memory_kb(&mut self, value: usize) {
693 0 : self.ephemeral_bytes_per_memory_kb = BuilderValue::Set(value);
694 0 : }
695 :
696 0 : pub fn get_walredo_process_kind(&mut self, value: crate::walredo::ProcessKind) {
697 0 : self.walredo_process_kind = BuilderValue::Set(value);
698 0 : }
699 :
700 18 : pub fn build(self) -> anyhow::Result<PageServerConf> {
701 18 : let default = Self::default_values();
702 18 :
703 18 : macro_rules! conf {
704 18 : (USING DEFAULT { $($field:ident,)* } CUSTOM LOGIC { $($custom_field:ident : $custom_value:expr,)* } ) => {
705 18 : PageServerConf {
706 18 : $(
707 18 : $field: self.$field.ok_or(stringify!($field), default.$field)?,
708 18 : )*
709 18 : $(
710 18 : $custom_field: $custom_value,
711 18 : )*
712 18 : }
713 18 : };
714 18 : }
715 18 :
716 18 : Ok(conf!(
717 : USING DEFAULT
718 : {
719 : listen_pg_addr,
720 : listen_http_addr,
721 : availability_zone,
722 : wait_lsn_timeout,
723 : wal_redo_timeout,
724 : superuser,
725 : page_cache_size,
726 : max_file_descriptors,
727 : workdir,
728 : pg_distrib_dir,
729 : http_auth_type,
730 : pg_auth_type,
731 : auth_validation_public_key_path,
732 : remote_storage_config,
733 : id,
734 : broker_endpoint,
735 : broker_keepalive_interval,
736 : log_format,
737 : metric_collection_interval,
738 : cached_metric_collection_interval,
739 : metric_collection_endpoint,
740 : metric_collection_bucket,
741 : synthetic_size_calculation_interval,
742 : disk_usage_based_eviction,
743 : test_remote_failures,
744 : ondemand_download_behavior_treat_error_as_warn,
745 : background_task_maximum_delay,
746 : control_plane_api,
747 : control_plane_api_token,
748 : control_plane_emergency_mode,
749 : heatmap_upload_concurrency,
750 : secondary_download_concurrency,
751 : ingest_batch_size,
752 : get_vectored_impl,
753 : max_vectored_read_bytes,
754 : validate_vectored_get,
755 : ephemeral_bytes_per_memory_kb,
756 : walredo_process_kind,
757 : }
758 : CUSTOM LOGIC
759 : {
760 : // TenantConf is handled separately
761 18 : default_tenant_conf: TenantConf::default(),
762 18 : concurrent_tenant_warmup: ConfigurableSemaphore::new({
763 18 : self
764 18 : .concurrent_tenant_warmup
765 18 : .ok_or("concurrent_tenant_warmpup",
766 18 : default.concurrent_tenant_warmup)?
767 : }),
768 : concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::new(
769 18 : self
770 18 : .concurrent_tenant_size_logical_size_queries
771 18 : .ok_or("concurrent_tenant_size_logical_size_queries",
772 18 : default.concurrent_tenant_size_logical_size_queries.clone())?
773 : ),
774 : eviction_task_immitated_concurrent_logical_size_queries: ConfigurableSemaphore::new(
775 : // re-use `concurrent_tenant_size_logical_size_queries`
776 18 : self
777 18 : .concurrent_tenant_size_logical_size_queries
778 18 : .ok_or("eviction_task_immitated_concurrent_logical_size_queries",
779 18 : default.concurrent_tenant_size_logical_size_queries.clone())?,
780 : ),
781 18 : virtual_file_io_engine: match self.virtual_file_io_engine {
782 0 : BuilderValue::Set(v) => v,
783 18 : BuilderValue::NotSet => match crate::virtual_file::io_engine_feature_test().context("auto-detect virtual_file_io_engine")? {
784 18 : io_engine::FeatureTestResult::PlatformPreferred(v) => v, // make no noise
785 0 : io_engine::FeatureTestResult::Worse { engine, remark } => {
786 0 : // TODO: bubble this up to the caller so we can tracing::warn! it.
787 0 : eprintln!("auto-detected IO engine is not platform-preferred: engine={engine:?} remark={remark:?}");
788 0 : engine
789 : }
790 : },
791 : },
792 : }
793 : ))
794 18 : }
795 : }
796 :
797 : impl PageServerConf {
798 : //
799 : // Repository paths, relative to workdir.
800 : //
801 :
802 4170 : pub fn tenants_path(&self) -> Utf8PathBuf {
803 4170 : self.workdir.join(TENANTS_SEGMENT_NAME)
804 4170 : }
805 :
806 72 : pub fn deletion_prefix(&self) -> Utf8PathBuf {
807 72 : self.workdir.join("deletion")
808 72 : }
809 :
810 0 : pub fn metadata_path(&self) -> Utf8PathBuf {
811 0 : self.workdir.join("metadata.json")
812 0 : }
813 :
814 28 : pub fn deletion_list_path(&self, sequence: u64) -> Utf8PathBuf {
815 28 : // Encode a version in the filename, so that if we ever switch away from JSON we can
816 28 : // increment this.
817 28 : const VERSION: u8 = 1;
818 28 :
819 28 : self.deletion_prefix()
820 28 : .join(format!("{sequence:016x}-{VERSION:02x}.list"))
821 28 : }
822 :
823 24 : pub fn deletion_header_path(&self) -> Utf8PathBuf {
824 24 : // Encode a version in the filename, so that if we ever switch away from JSON we can
825 24 : // increment this.
826 24 : const VERSION: u8 = 1;
827 24 :
828 24 : self.deletion_prefix().join(format!("header-{VERSION:02x}"))
829 24 : }
830 :
831 4170 : pub fn tenant_path(&self, tenant_shard_id: &TenantShardId) -> Utf8PathBuf {
832 4170 : self.tenants_path().join(tenant_shard_id.to_string())
833 4170 : }
834 :
835 0 : pub fn tenant_ignore_mark_file_path(&self, tenant_shard_id: &TenantShardId) -> Utf8PathBuf {
836 0 : self.tenant_path(tenant_shard_id)
837 0 : .join(IGNORED_TENANT_FILE_NAME)
838 0 : }
839 :
840 : /// Points to a place in pageserver's local directory,
841 : /// where certain tenant's tenantconf file should be located.
842 : ///
843 : /// Legacy: superseded by tenant_location_config_path. Eventually
844 : /// remove this function.
845 0 : pub fn tenant_config_path(&self, tenant_shard_id: &TenantShardId) -> Utf8PathBuf {
846 0 : self.tenant_path(tenant_shard_id).join(TENANT_CONFIG_NAME)
847 0 : }
848 :
849 0 : pub fn tenant_location_config_path(&self, tenant_shard_id: &TenantShardId) -> Utf8PathBuf {
850 0 : self.tenant_path(tenant_shard_id)
851 0 : .join(TENANT_LOCATION_CONFIG_NAME)
852 0 : }
853 :
854 0 : pub(crate) fn tenant_heatmap_path(&self, tenant_shard_id: &TenantShardId) -> Utf8PathBuf {
855 0 : self.tenant_path(tenant_shard_id)
856 0 : .join(TENANT_HEATMAP_BASENAME)
857 0 : }
858 :
859 4058 : pub fn timelines_path(&self, tenant_shard_id: &TenantShardId) -> Utf8PathBuf {
860 4058 : self.tenant_path(tenant_shard_id)
861 4058 : .join(TIMELINES_SEGMENT_NAME)
862 4058 : }
863 :
864 3838 : pub fn timeline_path(
865 3838 : &self,
866 3838 : tenant_shard_id: &TenantShardId,
867 3838 : timeline_id: &TimelineId,
868 3838 : ) -> Utf8PathBuf {
869 3838 : self.timelines_path(tenant_shard_id)
870 3838 : .join(timeline_id.to_string())
871 3838 : }
872 :
873 0 : pub(crate) fn timeline_delete_mark_file_path(
874 0 : &self,
875 0 : tenant_shard_id: TenantShardId,
876 0 : timeline_id: TimelineId,
877 0 : ) -> Utf8PathBuf {
878 0 : path_with_suffix_extension(
879 0 : self.timeline_path(&tenant_shard_id, &timeline_id),
880 0 : TIMELINE_DELETE_MARK_SUFFIX,
881 0 : )
882 0 : }
883 :
884 0 : pub(crate) fn tenant_deleted_mark_file_path(
885 0 : &self,
886 0 : tenant_shard_id: &TenantShardId,
887 0 : ) -> Utf8PathBuf {
888 0 : self.tenant_path(tenant_shard_id)
889 0 : .join(TENANT_DELETED_MARKER_FILE_NAME)
890 0 : }
891 :
892 0 : pub fn traces_path(&self) -> Utf8PathBuf {
893 0 : self.workdir.join("traces")
894 0 : }
895 :
896 0 : pub fn trace_path(
897 0 : &self,
898 0 : tenant_shard_id: &TenantShardId,
899 0 : timeline_id: &TimelineId,
900 0 : connection_id: &ConnectionId,
901 0 : ) -> Utf8PathBuf {
902 0 : self.traces_path()
903 0 : .join(tenant_shard_id.to_string())
904 0 : .join(timeline_id.to_string())
905 0 : .join(connection_id.to_string())
906 0 : }
907 :
908 : /// Turns storage remote path of a file into its local path.
909 0 : pub fn local_path(&self, remote_path: &RemotePath) -> Utf8PathBuf {
910 0 : remote_path.with_base(&self.workdir)
911 0 : }
912 :
913 : //
914 : // Postgres distribution paths
915 : //
916 16 : pub fn pg_distrib_dir(&self, pg_version: u32) -> anyhow::Result<Utf8PathBuf> {
917 16 : let path = self.pg_distrib_dir.clone();
918 16 :
919 16 : #[allow(clippy::manual_range_patterns)]
920 16 : match pg_version {
921 16 : 14 | 15 | 16 => Ok(path.join(format!("v{pg_version}"))),
922 0 : _ => bail!("Unsupported postgres version: {}", pg_version),
923 : }
924 16 : }
925 :
926 8 : pub fn pg_bin_dir(&self, pg_version: u32) -> anyhow::Result<Utf8PathBuf> {
927 8 : Ok(self.pg_distrib_dir(pg_version)?.join("bin"))
928 8 : }
929 8 : pub fn pg_lib_dir(&self, pg_version: u32) -> anyhow::Result<Utf8PathBuf> {
930 8 : Ok(self.pg_distrib_dir(pg_version)?.join("lib"))
931 8 : }
932 :
933 : /// Parse a configuration file (pageserver.toml) into a PageServerConf struct,
934 : /// validating the input and failing on errors.
935 : ///
936 : /// This leaves any options not present in the file in the built-in defaults.
937 18 : pub fn parse_and_validate(toml: &Document, workdir: &Utf8Path) -> anyhow::Result<Self> {
938 18 : let mut builder = PageServerConfigBuilder::default();
939 18 : builder.workdir(workdir.to_owned());
940 18 :
941 18 : let mut t_conf = TenantConfOpt::default();
942 :
943 230 : for (key, item) in toml.iter() {
944 230 : match key {
945 230 : "listen_pg_addr" => builder.listen_pg_addr(parse_toml_string(key, item)?),
946 218 : "listen_http_addr" => builder.listen_http_addr(parse_toml_string(key, item)?),
947 206 : "availability_zone" => builder.availability_zone(Some(parse_toml_string(key, item)?)),
948 206 : "wait_lsn_timeout" => builder.wait_lsn_timeout(parse_toml_duration(key, item)?),
949 194 : "wal_redo_timeout" => builder.wal_redo_timeout(parse_toml_duration(key, item)?),
950 182 : "initial_superuser_name" => builder.superuser(parse_toml_string(key, item)?),
951 170 : "page_cache_size" => builder.page_cache_size(parse_toml_u64(key, item)? as usize),
952 158 : "max_file_descriptors" => {
953 12 : builder.max_file_descriptors(parse_toml_u64(key, item)? as usize)
954 : }
955 146 : "pg_distrib_dir" => {
956 18 : builder.pg_distrib_dir(Utf8PathBuf::from(parse_toml_string(key, item)?))
957 : }
958 128 : "auth_validation_public_key_path" => builder.auth_validation_public_key_path(Some(
959 0 : Utf8PathBuf::from(parse_toml_string(key, item)?),
960 : )),
961 128 : "http_auth_type" => builder.http_auth_type(parse_toml_from_str(key, item)?),
962 128 : "pg_auth_type" => builder.pg_auth_type(parse_toml_from_str(key, item)?),
963 128 : "remote_storage" => {
964 8 : builder.remote_storage_config(RemoteStorageConfig::from_toml(item)?)
965 : }
966 120 : "tenant_config" => {
967 6 : t_conf = TenantConfOpt::try_from(item.to_owned()).context(format!("failed to parse: '{key}'"))?;
968 : }
969 114 : "id" => builder.id(NodeId(parse_toml_u64(key, item)?)),
970 96 : "broker_endpoint" => builder.broker_endpoint(parse_toml_string(key, item)?.parse().context("failed to parse broker endpoint")?),
971 82 : "broker_keepalive_interval" => builder.broker_keepalive_interval(parse_toml_duration(key, item)?),
972 82 : "log_format" => builder.log_format(
973 12 : LogFormat::from_config(&parse_toml_string(key, item)?)?
974 : ),
975 70 : "concurrent_tenant_warmup" => builder.concurrent_tenant_warmup({
976 0 : let input = parse_toml_string(key, item)?;
977 0 : let permits = input.parse::<usize>().context("expected a number of initial permits, not {s:?}")?;
978 0 : NonZeroUsize::new(permits).context("initial semaphore permits out of range: 0, use other configuration to disable a feature")?
979 : }),
980 70 : "concurrent_tenant_size_logical_size_queries" => builder.concurrent_tenant_size_logical_size_queries({
981 0 : let input = parse_toml_string(key, item)?;
982 0 : let permits = input.parse::<usize>().context("expected a number of initial permits, not {s:?}")?;
983 0 : NonZeroUsize::new(permits).context("initial semaphore permits out of range: 0, use other configuration to disable a feature")?
984 : }),
985 70 : "metric_collection_interval" => builder.metric_collection_interval(parse_toml_duration(key, item)?),
986 54 : "cached_metric_collection_interval" => builder.cached_metric_collection_interval(parse_toml_duration(key, item)?),
987 42 : "metric_collection_endpoint" => {
988 16 : let endpoint = parse_toml_string(key, item)?.parse().context("failed to parse metric_collection_endpoint")?;
989 16 : builder.metric_collection_endpoint(Some(endpoint));
990 : },
991 26 : "metric_collection_bucket" => {
992 0 : builder.metric_collection_bucket(RemoteStorageConfig::from_toml(item)?)
993 : }
994 26 : "synthetic_size_calculation_interval" =>
995 12 : builder.synthetic_size_calculation_interval(parse_toml_duration(key, item)?),
996 14 : "test_remote_failures" => builder.test_remote_failures(parse_toml_u64(key, item)?),
997 14 : "disk_usage_based_eviction" => {
998 2 : tracing::info!("disk_usage_based_eviction: {:#?}", &item);
999 2 : builder.disk_usage_based_eviction(
1000 2 : deserialize_from_item("disk_usage_based_eviction", item)
1001 2 : .context("parse disk_usage_based_eviction")?
1002 : )
1003 : },
1004 12 : "ondemand_download_behavior_treat_error_as_warn" => builder.ondemand_download_behavior_treat_error_as_warn(parse_toml_bool(key, item)?),
1005 12 : "background_task_maximum_delay" => builder.background_task_maximum_delay(parse_toml_duration(key, item)?),
1006 0 : "control_plane_api" => {
1007 0 : let parsed = parse_toml_string(key, item)?;
1008 0 : if parsed.is_empty() {
1009 0 : builder.control_plane_api(None)
1010 : } else {
1011 0 : builder.control_plane_api(Some(parsed.parse().context("failed to parse control plane URL")?))
1012 : }
1013 : },
1014 0 : "control_plane_api_token" => {
1015 0 : let parsed = parse_toml_string(key, item)?;
1016 0 : if parsed.is_empty() {
1017 0 : builder.control_plane_api_token(None)
1018 : } else {
1019 0 : builder.control_plane_api_token(Some(parsed.into()))
1020 : }
1021 : },
1022 0 : "control_plane_emergency_mode" => {
1023 0 : builder.control_plane_emergency_mode(parse_toml_bool(key, item)?)
1024 : },
1025 0 : "heatmap_upload_concurrency" => {
1026 0 : builder.heatmap_upload_concurrency(parse_toml_u64(key, item)? as usize)
1027 : },
1028 0 : "secondary_download_concurrency" => {
1029 0 : builder.secondary_download_concurrency(parse_toml_u64(key, item)? as usize)
1030 : },
1031 0 : "ingest_batch_size" => builder.ingest_batch_size(parse_toml_u64(key, item)?),
1032 0 : "virtual_file_io_engine" => {
1033 0 : builder.virtual_file_io_engine(parse_toml_from_str("virtual_file_io_engine", item)?)
1034 : }
1035 0 : "get_vectored_impl" => {
1036 0 : builder.get_vectored_impl(parse_toml_from_str("get_vectored_impl", item)?)
1037 : }
1038 0 : "max_vectored_read_bytes" => {
1039 0 : let bytes = parse_toml_u64("max_vectored_read_bytes", item)? as usize;
1040 0 : builder.get_max_vectored_read_bytes(
1041 0 : MaxVectoredReadBytes(
1042 0 : NonZeroUsize::new(bytes).expect("Max byte size of vectored read must be greater than 0")))
1043 : }
1044 0 : "validate_vectored_get" => {
1045 0 : builder.get_validate_vectored_get(parse_toml_bool("validate_vectored_get", item)?)
1046 : }
1047 0 : "ephemeral_bytes_per_memory_kb" => {
1048 0 : builder.get_ephemeral_bytes_per_memory_kb(parse_toml_u64("ephemeral_bytes_per_memory_kb", item)? as usize)
1049 : }
1050 0 : "walredo_process_kind" => {
1051 0 : builder.get_walredo_process_kind(parse_toml_from_str("walredo_process_kind", item)?)
1052 : }
1053 0 : _ => bail!("unrecognized pageserver option '{key}'"),
1054 : }
1055 : }
1056 :
1057 18 : let mut conf = builder.build().context("invalid config")?;
1058 :
1059 18 : if conf.http_auth_type == AuthType::NeonJWT || conf.pg_auth_type == AuthType::NeonJWT {
1060 0 : let auth_validation_public_key_path = conf
1061 0 : .auth_validation_public_key_path
1062 0 : .get_or_insert_with(|| workdir.join("auth_public_key.pem"));
1063 0 : ensure!(
1064 0 : auth_validation_public_key_path.exists(),
1065 0 : format!(
1066 0 : "Can't find auth_validation_public_key at '{auth_validation_public_key_path}'",
1067 0 : )
1068 : );
1069 18 : }
1070 :
1071 18 : conf.default_tenant_conf = t_conf.merge(TenantConf::default());
1072 18 :
1073 18 : Ok(conf)
1074 18 : }
1075 :
1076 : #[cfg(test)]
1077 120 : pub fn test_repo_dir(test_name: &str) -> Utf8PathBuf {
1078 120 : let test_output_dir = std::env::var("TEST_OUTPUT").unwrap_or("../tmp_check".into());
1079 120 : Utf8PathBuf::from(format!("{test_output_dir}/test_{test_name}"))
1080 120 : }
1081 :
1082 116 : pub fn dummy_conf(repo_dir: Utf8PathBuf) -> Self {
1083 116 : let pg_distrib_dir = Utf8PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../pg_install");
1084 116 :
1085 116 : PageServerConf {
1086 116 : id: NodeId(0),
1087 116 : wait_lsn_timeout: Duration::from_secs(60),
1088 116 : wal_redo_timeout: Duration::from_secs(60),
1089 116 : page_cache_size: defaults::DEFAULT_PAGE_CACHE_SIZE,
1090 116 : max_file_descriptors: defaults::DEFAULT_MAX_FILE_DESCRIPTORS,
1091 116 : listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
1092 116 : listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
1093 116 : availability_zone: None,
1094 116 : superuser: "cloud_admin".to_string(),
1095 116 : workdir: repo_dir,
1096 116 : pg_distrib_dir,
1097 116 : http_auth_type: AuthType::Trust,
1098 116 : pg_auth_type: AuthType::Trust,
1099 116 : auth_validation_public_key_path: None,
1100 116 : remote_storage_config: None,
1101 116 : default_tenant_conf: TenantConf::default(),
1102 116 : broker_endpoint: storage_broker::DEFAULT_ENDPOINT.parse().unwrap(),
1103 116 : broker_keepalive_interval: Duration::from_secs(5000),
1104 116 : log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(),
1105 116 : concurrent_tenant_warmup: ConfigurableSemaphore::new(
1106 116 : NonZeroUsize::new(DEFAULT_CONCURRENT_TENANT_WARMUP)
1107 116 : .expect("Invalid default constant"),
1108 116 : ),
1109 116 : concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
1110 116 : eviction_task_immitated_concurrent_logical_size_queries: ConfigurableSemaphore::default(
1111 116 : ),
1112 116 : metric_collection_interval: Duration::from_secs(60),
1113 116 : cached_metric_collection_interval: Duration::from_secs(60 * 60),
1114 116 : metric_collection_endpoint: defaults::DEFAULT_METRIC_COLLECTION_ENDPOINT,
1115 116 : metric_collection_bucket: None,
1116 116 : synthetic_size_calculation_interval: Duration::from_secs(60),
1117 116 : disk_usage_based_eviction: None,
1118 116 : test_remote_failures: 0,
1119 116 : ondemand_download_behavior_treat_error_as_warn: false,
1120 116 : background_task_maximum_delay: Duration::ZERO,
1121 116 : control_plane_api: None,
1122 116 : control_plane_api_token: None,
1123 116 : control_plane_emergency_mode: false,
1124 116 : heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY,
1125 116 : secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY,
1126 116 : ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE,
1127 116 : virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
1128 116 : get_vectored_impl: defaults::DEFAULT_GET_VECTORED_IMPL.parse().unwrap(),
1129 116 : max_vectored_read_bytes: MaxVectoredReadBytes(
1130 116 : NonZeroUsize::new(defaults::DEFAULT_MAX_VECTORED_READ_BYTES)
1131 116 : .expect("Invalid default constant"),
1132 116 : ),
1133 116 : validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
1134 116 : ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
1135 116 : walredo_process_kind: defaults::DEFAULT_WALREDO_PROCESS_KIND.parse().unwrap(),
1136 116 : }
1137 116 : }
1138 : }
1139 :
1140 : // Helper functions to parse a toml Item
1141 :
1142 96 : fn parse_toml_string(name: &str, item: &Item) -> Result<String> {
1143 96 : let s = item
1144 96 : .as_str()
1145 96 : .with_context(|| format!("configure option {name} is not a string"))?;
1146 96 : Ok(s.to_string())
1147 96 : }
1148 :
1149 42 : fn parse_toml_u64(name: &str, item: &Item) -> Result<u64> {
1150 : // A toml integer is signed, so it cannot represent the full range of an u64. That's OK
1151 : // for our use, though.
1152 42 : let i: i64 = item
1153 42 : .as_integer()
1154 42 : .with_context(|| format!("configure option {name} is not an integer"))?;
1155 42 : if i < 0 {
1156 0 : bail!("configure option {name} cannot be negative");
1157 42 : }
1158 42 : Ok(i as u64)
1159 42 : }
1160 :
1161 0 : fn parse_toml_bool(name: &str, item: &Item) -> Result<bool> {
1162 0 : item.as_bool()
1163 0 : .with_context(|| format!("configure option {name} is not a bool"))
1164 0 : }
1165 :
1166 76 : fn parse_toml_duration(name: &str, item: &Item) -> Result<Duration> {
1167 76 : let s = item
1168 76 : .as_str()
1169 76 : .with_context(|| format!("configure option {name} is not a string"))?;
1170 :
1171 76 : Ok(humantime::parse_duration(s)?)
1172 76 : }
1173 :
1174 0 : fn parse_toml_from_str<T>(name: &str, item: &Item) -> anyhow::Result<T>
1175 0 : where
1176 0 : T: FromStr,
1177 0 : <T as FromStr>::Err: std::fmt::Display,
1178 0 : {
1179 0 : let v = item
1180 0 : .as_str()
1181 0 : .with_context(|| format!("configure option {name} is not a string"))?;
1182 0 : T::from_str(v).map_err(|e| {
1183 0 : anyhow!(
1184 0 : "Failed to parse string as {parse_type} for configure option {name}: {e}",
1185 0 : parse_type = stringify!(T)
1186 0 : )
1187 0 : })
1188 0 : }
1189 :
1190 2 : fn deserialize_from_item<T>(name: &str, item: &Item) -> anyhow::Result<T>
1191 2 : where
1192 2 : T: serde::de::DeserializeOwned,
1193 2 : {
1194 : // ValueDeserializer::new is not public, so use the ValueDeserializer's documented way
1195 2 : let deserializer = match item.clone().into_value() {
1196 2 : Ok(value) => value.into_deserializer(),
1197 0 : Err(item) => anyhow::bail!("toml_edit::Item '{item}' is not a toml_edit::Value"),
1198 : };
1199 2 : T::deserialize(deserializer).with_context(|| format!("deserializing item for node {name}"))
1200 2 : }
1201 :
1202 : /// Configurable semaphore permits setting.
1203 : ///
1204 : /// Does not allow semaphore permits to be zero, because at runtime initially zero permits and empty
1205 : /// semaphore cannot be distinguished, leading any feature using these to await forever (or until
1206 : /// new permits are added).
1207 : #[derive(Debug, Clone)]
1208 : pub struct ConfigurableSemaphore {
1209 : initial_permits: NonZeroUsize,
1210 : inner: std::sync::Arc<tokio::sync::Semaphore>,
1211 : }
1212 :
1213 : impl ConfigurableSemaphore {
1214 : pub const DEFAULT_INITIAL: NonZeroUsize = match NonZeroUsize::new(1) {
1215 : Some(x) => x,
1216 : None => panic!("const unwrap is not yet stable"),
1217 : };
1218 :
1219 : /// Initializse using a non-zero amount of permits.
1220 : ///
1221 : /// Require a non-zero initial permits, because using permits == 0 is a crude way to disable a
1222 : /// feature such as [`Tenant::gather_size_inputs`]. Otherwise any semaphore using future will
1223 : /// behave like [`futures::future::pending`], just waiting until new permits are added.
1224 : ///
1225 : /// [`Tenant::gather_size_inputs`]: crate::tenant::Tenant::gather_size_inputs
1226 414 : pub fn new(initial_permits: NonZeroUsize) -> Self {
1227 414 : ConfigurableSemaphore {
1228 414 : initial_permits,
1229 414 : inner: std::sync::Arc::new(tokio::sync::Semaphore::new(initial_permits.get())),
1230 414 : }
1231 414 : }
1232 :
1233 : /// Returns the configured amount of permits.
1234 0 : pub fn initial_permits(&self) -> NonZeroUsize {
1235 0 : self.initial_permits
1236 0 : }
1237 : }
1238 :
1239 : impl Default for ConfigurableSemaphore {
1240 240 : fn default() -> Self {
1241 240 : Self::new(Self::DEFAULT_INITIAL)
1242 240 : }
1243 : }
1244 :
1245 : impl PartialEq for ConfigurableSemaphore {
1246 12 : fn eq(&self, other: &Self) -> bool {
1247 12 : // the number of permits can be increased at runtime, so we cannot really fulfill the
1248 12 : // PartialEq value equality otherwise
1249 12 : self.initial_permits == other.initial_permits
1250 12 : }
1251 : }
1252 :
1253 : impl Eq for ConfigurableSemaphore {}
1254 :
1255 : impl ConfigurableSemaphore {
1256 0 : pub fn inner(&self) -> &std::sync::Arc<tokio::sync::Semaphore> {
1257 0 : &self.inner
1258 0 : }
1259 : }
1260 :
1261 : #[cfg(test)]
1262 : mod tests {
1263 : use std::{fs, num::NonZeroU32};
1264 :
1265 : use camino_tempfile::{tempdir, Utf8TempDir};
1266 : use pageserver_api::models::EvictionPolicy;
1267 : use remote_storage::{RemoteStorageKind, S3Config};
1268 : use utils::serde_percent::Percent;
1269 :
1270 : use super::*;
1271 : use crate::DEFAULT_PG_VERSION;
1272 :
1273 : const ALL_BASE_VALUES_TOML: &str = r#"
1274 : # Initial configuration file created by 'pageserver --init'
1275 :
1276 : listen_pg_addr = '127.0.0.1:64000'
1277 : listen_http_addr = '127.0.0.1:9898'
1278 :
1279 : wait_lsn_timeout = '111 s'
1280 : wal_redo_timeout = '111 s'
1281 :
1282 : page_cache_size = 444
1283 : max_file_descriptors = 333
1284 :
1285 : # initial superuser role name to use when creating a new tenant
1286 : initial_superuser_name = 'zzzz'
1287 : id = 10
1288 :
1289 : metric_collection_interval = '222 s'
1290 : cached_metric_collection_interval = '22200 s'
1291 : metric_collection_endpoint = 'http://localhost:80/metrics'
1292 : synthetic_size_calculation_interval = '333 s'
1293 :
1294 : log_format = 'json'
1295 : background_task_maximum_delay = '334 s'
1296 :
1297 : "#;
1298 :
1299 : #[test]
1300 2 : fn parse_defaults() -> anyhow::Result<()> {
1301 2 : let tempdir = tempdir()?;
1302 2 : let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;
1303 2 : let broker_endpoint = storage_broker::DEFAULT_ENDPOINT;
1304 2 : // we have to create dummy values to overcome the validation errors
1305 2 : let config_string = format!(
1306 2 : "pg_distrib_dir='{pg_distrib_dir}'\nid=10\nbroker_endpoint = '{broker_endpoint}'",
1307 2 : );
1308 2 : let toml = config_string.parse()?;
1309 :
1310 2 : let parsed_config = PageServerConf::parse_and_validate(&toml, &workdir)
1311 2 : .unwrap_or_else(|e| panic!("Failed to parse config '{config_string}', reason: {e:?}"));
1312 2 :
1313 2 : assert_eq!(
1314 2 : parsed_config,
1315 2 : PageServerConf {
1316 2 : id: NodeId(10),
1317 2 : listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
1318 2 : listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
1319 2 : availability_zone: None,
1320 2 : wait_lsn_timeout: humantime::parse_duration(defaults::DEFAULT_WAIT_LSN_TIMEOUT)?,
1321 2 : wal_redo_timeout: humantime::parse_duration(defaults::DEFAULT_WAL_REDO_TIMEOUT)?,
1322 2 : superuser: defaults::DEFAULT_SUPERUSER.to_string(),
1323 2 : page_cache_size: defaults::DEFAULT_PAGE_CACHE_SIZE,
1324 2 : max_file_descriptors: defaults::DEFAULT_MAX_FILE_DESCRIPTORS,
1325 2 : workdir,
1326 2 : pg_distrib_dir,
1327 2 : http_auth_type: AuthType::Trust,
1328 2 : pg_auth_type: AuthType::Trust,
1329 2 : auth_validation_public_key_path: None,
1330 2 : remote_storage_config: None,
1331 2 : default_tenant_conf: TenantConf::default(),
1332 2 : broker_endpoint: storage_broker::DEFAULT_ENDPOINT.parse().unwrap(),
1333 2 : broker_keepalive_interval: humantime::parse_duration(
1334 2 : storage_broker::DEFAULT_KEEPALIVE_INTERVAL
1335 2 : )?,
1336 2 : log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(),
1337 2 : concurrent_tenant_warmup: ConfigurableSemaphore::new(
1338 2 : NonZeroUsize::new(DEFAULT_CONCURRENT_TENANT_WARMUP).unwrap()
1339 2 : ),
1340 2 : concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
1341 2 : eviction_task_immitated_concurrent_logical_size_queries:
1342 2 : ConfigurableSemaphore::default(),
1343 2 : metric_collection_interval: humantime::parse_duration(
1344 2 : defaults::DEFAULT_METRIC_COLLECTION_INTERVAL
1345 2 : )?,
1346 2 : cached_metric_collection_interval: humantime::parse_duration(
1347 2 : defaults::DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL
1348 2 : )?,
1349 2 : metric_collection_endpoint: defaults::DEFAULT_METRIC_COLLECTION_ENDPOINT,
1350 2 : metric_collection_bucket: None,
1351 2 : synthetic_size_calculation_interval: humantime::parse_duration(
1352 2 : defaults::DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL
1353 2 : )?,
1354 2 : disk_usage_based_eviction: None,
1355 2 : test_remote_failures: 0,
1356 2 : ondemand_download_behavior_treat_error_as_warn: false,
1357 2 : background_task_maximum_delay: humantime::parse_duration(
1358 2 : defaults::DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY
1359 2 : )?,
1360 2 : control_plane_api: None,
1361 2 : control_plane_api_token: None,
1362 2 : control_plane_emergency_mode: false,
1363 2 : heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY,
1364 2 : secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY,
1365 2 : ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE,
1366 2 : virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
1367 2 : get_vectored_impl: defaults::DEFAULT_GET_VECTORED_IMPL.parse().unwrap(),
1368 2 : max_vectored_read_bytes: MaxVectoredReadBytes(
1369 2 : NonZeroUsize::new(defaults::DEFAULT_MAX_VECTORED_READ_BYTES)
1370 2 : .expect("Invalid default constant")
1371 2 : ),
1372 2 : validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
1373 2 : ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
1374 2 : walredo_process_kind: defaults::DEFAULT_WALREDO_PROCESS_KIND.parse().unwrap(),
1375 : },
1376 0 : "Correct defaults should be used when no config values are provided"
1377 : );
1378 :
1379 2 : Ok(())
1380 2 : }
1381 :
1382 : #[test]
1383 2 : fn parse_basic_config() -> anyhow::Result<()> {
1384 2 : let tempdir = tempdir()?;
1385 2 : let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;
1386 2 : let broker_endpoint = storage_broker::DEFAULT_ENDPOINT;
1387 2 :
1388 2 : let config_string = format!(
1389 2 : "{ALL_BASE_VALUES_TOML}pg_distrib_dir='{pg_distrib_dir}'\nbroker_endpoint = '{broker_endpoint}'",
1390 2 : );
1391 2 : let toml = config_string.parse()?;
1392 :
1393 2 : let parsed_config = PageServerConf::parse_and_validate(&toml, &workdir)
1394 2 : .unwrap_or_else(|e| panic!("Failed to parse config '{config_string}', reason: {e:?}"));
1395 2 :
1396 2 : assert_eq!(
1397 2 : parsed_config,
1398 2 : PageServerConf {
1399 2 : id: NodeId(10),
1400 2 : listen_pg_addr: "127.0.0.1:64000".to_string(),
1401 2 : listen_http_addr: "127.0.0.1:9898".to_string(),
1402 2 : availability_zone: None,
1403 2 : wait_lsn_timeout: Duration::from_secs(111),
1404 2 : wal_redo_timeout: Duration::from_secs(111),
1405 2 : superuser: "zzzz".to_string(),
1406 2 : page_cache_size: 444,
1407 2 : max_file_descriptors: 333,
1408 2 : workdir,
1409 2 : pg_distrib_dir,
1410 2 : http_auth_type: AuthType::Trust,
1411 2 : pg_auth_type: AuthType::Trust,
1412 2 : auth_validation_public_key_path: None,
1413 2 : remote_storage_config: None,
1414 2 : default_tenant_conf: TenantConf::default(),
1415 2 : broker_endpoint: storage_broker::DEFAULT_ENDPOINT.parse().unwrap(),
1416 2 : broker_keepalive_interval: Duration::from_secs(5),
1417 2 : log_format: LogFormat::Json,
1418 2 : concurrent_tenant_warmup: ConfigurableSemaphore::new(
1419 2 : NonZeroUsize::new(DEFAULT_CONCURRENT_TENANT_WARMUP).unwrap()
1420 2 : ),
1421 2 : concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
1422 2 : eviction_task_immitated_concurrent_logical_size_queries:
1423 2 : ConfigurableSemaphore::default(),
1424 2 : metric_collection_interval: Duration::from_secs(222),
1425 2 : cached_metric_collection_interval: Duration::from_secs(22200),
1426 2 : metric_collection_endpoint: Some(Url::parse("http://localhost:80/metrics")?),
1427 2 : metric_collection_bucket: None,
1428 2 : synthetic_size_calculation_interval: Duration::from_secs(333),
1429 2 : disk_usage_based_eviction: None,
1430 2 : test_remote_failures: 0,
1431 2 : ondemand_download_behavior_treat_error_as_warn: false,
1432 2 : background_task_maximum_delay: Duration::from_secs(334),
1433 2 : control_plane_api: None,
1434 2 : control_plane_api_token: None,
1435 2 : control_plane_emergency_mode: false,
1436 2 : heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY,
1437 2 : secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY,
1438 2 : ingest_batch_size: 100,
1439 2 : virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
1440 2 : get_vectored_impl: defaults::DEFAULT_GET_VECTORED_IMPL.parse().unwrap(),
1441 2 : max_vectored_read_bytes: MaxVectoredReadBytes(
1442 2 : NonZeroUsize::new(defaults::DEFAULT_MAX_VECTORED_READ_BYTES)
1443 2 : .expect("Invalid default constant")
1444 2 : ),
1445 2 : validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
1446 2 : ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
1447 2 : walredo_process_kind: defaults::DEFAULT_WALREDO_PROCESS_KIND.parse().unwrap(),
1448 : },
1449 0 : "Should be able to parse all basic config values correctly"
1450 : );
1451 :
1452 2 : Ok(())
1453 2 : }
1454 :
1455 : #[test]
1456 2 : fn parse_remote_fs_storage_config() -> anyhow::Result<()> {
1457 2 : let tempdir = tempdir()?;
1458 2 : let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;
1459 2 : let broker_endpoint = "http://127.0.0.1:7777";
1460 2 :
1461 2 : let local_storage_path = tempdir.path().join("local_remote_storage");
1462 2 :
1463 2 : let identical_toml_declarations = &[
1464 2 : format!(
1465 2 : r#"[remote_storage]
1466 2 : local_path = '{local_storage_path}'"#,
1467 2 : ),
1468 2 : format!("remote_storage={{local_path='{local_storage_path}'}}"),
1469 2 : ];
1470 :
1471 6 : for remote_storage_config_str in identical_toml_declarations {
1472 4 : let config_string = format!(
1473 4 : r#"{ALL_BASE_VALUES_TOML}
1474 4 : pg_distrib_dir='{pg_distrib_dir}'
1475 4 : broker_endpoint = '{broker_endpoint}'
1476 4 :
1477 4 : {remote_storage_config_str}"#,
1478 4 : );
1479 :
1480 4 : let toml = config_string.parse()?;
1481 :
1482 4 : let parsed_remote_storage_config = PageServerConf::parse_and_validate(&toml, &workdir)
1483 4 : .unwrap_or_else(|e| {
1484 0 : panic!("Failed to parse config '{config_string}', reason: {e:?}")
1485 4 : })
1486 4 : .remote_storage_config
1487 4 : .expect("Should have remote storage config for the local FS");
1488 4 :
1489 4 : assert_eq!(
1490 4 : parsed_remote_storage_config,
1491 4 : RemoteStorageConfig {
1492 4 : storage: RemoteStorageKind::LocalFs(local_storage_path.clone()),
1493 4 : timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
1494 4 : },
1495 0 : "Remote storage config should correctly parse the local FS config and fill other storage defaults"
1496 : );
1497 : }
1498 2 : Ok(())
1499 2 : }
1500 :
1501 : #[test]
1502 2 : fn parse_remote_s3_storage_config() -> anyhow::Result<()> {
1503 2 : let tempdir = tempdir()?;
1504 2 : let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;
1505 :
1506 2 : let bucket_name = "some-sample-bucket".to_string();
1507 2 : let bucket_region = "eu-north-1".to_string();
1508 2 : let prefix_in_bucket = "test_prefix".to_string();
1509 2 : let endpoint = "http://localhost:5000".to_string();
1510 2 : let max_concurrent_syncs = NonZeroUsize::new(111).unwrap();
1511 2 : let max_sync_errors = NonZeroU32::new(222).unwrap();
1512 2 : let s3_concurrency_limit = NonZeroUsize::new(333).unwrap();
1513 2 : let broker_endpoint = "http://127.0.0.1:7777";
1514 2 :
1515 2 : let identical_toml_declarations = &[
1516 2 : format!(
1517 2 : r#"[remote_storage]
1518 2 : max_concurrent_syncs = {max_concurrent_syncs}
1519 2 : max_sync_errors = {max_sync_errors}
1520 2 : bucket_name = '{bucket_name}'
1521 2 : bucket_region = '{bucket_region}'
1522 2 : prefix_in_bucket = '{prefix_in_bucket}'
1523 2 : endpoint = '{endpoint}'
1524 2 : concurrency_limit = {s3_concurrency_limit}"#
1525 2 : ),
1526 2 : format!(
1527 2 : "remote_storage={{max_concurrent_syncs={max_concurrent_syncs}, max_sync_errors={max_sync_errors}, bucket_name='{bucket_name}',\
1528 2 : bucket_region='{bucket_region}', prefix_in_bucket='{prefix_in_bucket}', endpoint='{endpoint}', concurrency_limit={s3_concurrency_limit}}}",
1529 2 : ),
1530 2 : ];
1531 :
1532 6 : for remote_storage_config_str in identical_toml_declarations {
1533 4 : let config_string = format!(
1534 4 : r#"{ALL_BASE_VALUES_TOML}
1535 4 : pg_distrib_dir='{pg_distrib_dir}'
1536 4 : broker_endpoint = '{broker_endpoint}'
1537 4 :
1538 4 : {remote_storage_config_str}"#,
1539 4 : );
1540 :
1541 4 : let toml = config_string.parse()?;
1542 :
1543 4 : let parsed_remote_storage_config = PageServerConf::parse_and_validate(&toml, &workdir)
1544 4 : .unwrap_or_else(|e| {
1545 0 : panic!("Failed to parse config '{config_string}', reason: {e:?}")
1546 4 : })
1547 4 : .remote_storage_config
1548 4 : .expect("Should have remote storage config for S3");
1549 4 :
1550 4 : assert_eq!(
1551 4 : parsed_remote_storage_config,
1552 4 : RemoteStorageConfig {
1553 4 : storage: RemoteStorageKind::AwsS3(S3Config {
1554 4 : bucket_name: bucket_name.clone(),
1555 4 : bucket_region: bucket_region.clone(),
1556 4 : prefix_in_bucket: Some(prefix_in_bucket.clone()),
1557 4 : endpoint: Some(endpoint.clone()),
1558 4 : concurrency_limit: s3_concurrency_limit,
1559 4 : max_keys_per_list_response: None,
1560 4 : }),
1561 4 : timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
1562 4 : },
1563 0 : "Remote storage config should correctly parse the S3 config"
1564 : );
1565 : }
1566 2 : Ok(())
1567 2 : }
1568 :
1569 : #[test]
1570 2 : fn parse_tenant_config() -> anyhow::Result<()> {
1571 2 : let tempdir = tempdir()?;
1572 2 : let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;
1573 :
1574 2 : let broker_endpoint = "http://127.0.0.1:7777";
1575 2 : let trace_read_requests = true;
1576 2 :
1577 2 : let config_string = format!(
1578 2 : r#"{ALL_BASE_VALUES_TOML}
1579 2 : pg_distrib_dir='{pg_distrib_dir}'
1580 2 : broker_endpoint = '{broker_endpoint}'
1581 2 :
1582 2 : [tenant_config]
1583 2 : trace_read_requests = {trace_read_requests}"#,
1584 2 : );
1585 :
1586 2 : let toml = config_string.parse()?;
1587 :
1588 2 : let conf = PageServerConf::parse_and_validate(&toml, &workdir)?;
1589 2 : assert_eq!(
1590 : conf.default_tenant_conf.trace_read_requests, trace_read_requests,
1591 0 : "Tenant config from pageserver config file should be parsed and udpated values used as defaults for all tenants",
1592 : );
1593 :
1594 2 : Ok(())
1595 2 : }
1596 :
1597 : #[test]
1598 2 : fn parse_incorrect_tenant_config() -> anyhow::Result<()> {
1599 2 : let config_string = r#"
1600 2 : [tenant_config]
1601 2 : checkpoint_distance = -1 # supposed to be an u64
1602 2 : "#
1603 2 : .to_string();
1604 :
1605 2 : let toml: Document = config_string.parse()?;
1606 2 : let item = toml.get("tenant_config").unwrap();
1607 2 : let error = TenantConfOpt::try_from(item.to_owned()).unwrap_err();
1608 2 :
1609 2 : let expected_error_str = "checkpoint_distance: invalid value: integer `-1`, expected u64";
1610 2 : assert_eq!(error.to_string(), expected_error_str);
1611 :
1612 2 : Ok(())
1613 2 : }
1614 :
1615 : #[test]
1616 2 : fn parse_override_tenant_config() -> anyhow::Result<()> {
1617 2 : let config_string = r#"tenant_config={ min_resident_size_override = 400 }"#.to_string();
1618 :
1619 2 : let toml: Document = config_string.parse()?;
1620 2 : let item = toml.get("tenant_config").unwrap();
1621 2 : let conf = TenantConfOpt::try_from(item.to_owned()).unwrap();
1622 2 :
1623 2 : assert_eq!(conf.min_resident_size_override, Some(400));
1624 :
1625 2 : Ok(())
1626 2 : }
1627 :
1628 : #[test]
1629 2 : fn eviction_pageserver_config_parse() -> anyhow::Result<()> {
1630 2 : let tempdir = tempdir()?;
1631 2 : let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;
1632 :
1633 2 : let pageserver_conf_toml = format!(
1634 2 : r#"pg_distrib_dir = "{pg_distrib_dir}"
1635 2 : metric_collection_endpoint = "http://sample.url"
1636 2 : metric_collection_interval = "10min"
1637 2 : id = 222
1638 2 :
1639 2 : [disk_usage_based_eviction]
1640 2 : max_usage_pct = 80
1641 2 : min_avail_bytes = 0
1642 2 : period = "10s"
1643 2 :
1644 2 : [tenant_config]
1645 2 : evictions_low_residence_duration_metric_threshold = "20m"
1646 2 :
1647 2 : [tenant_config.eviction_policy]
1648 2 : kind = "LayerAccessThreshold"
1649 2 : period = "20m"
1650 2 : threshold = "20m"
1651 2 : "#,
1652 2 : );
1653 2 : let toml: Document = pageserver_conf_toml.parse()?;
1654 2 : let conf = PageServerConf::parse_and_validate(&toml, &workdir)?;
1655 :
1656 2 : assert_eq!(conf.pg_distrib_dir, pg_distrib_dir);
1657 2 : assert_eq!(
1658 2 : conf.metric_collection_endpoint,
1659 2 : Some("http://sample.url".parse().unwrap())
1660 2 : );
1661 2 : assert_eq!(
1662 2 : conf.metric_collection_interval,
1663 2 : Duration::from_secs(10 * 60)
1664 2 : );
1665 2 : assert_eq!(
1666 2 : conf.default_tenant_conf
1667 2 : .evictions_low_residence_duration_metric_threshold,
1668 2 : Duration::from_secs(20 * 60)
1669 2 : );
1670 2 : assert_eq!(conf.id, NodeId(222));
1671 2 : assert_eq!(
1672 2 : conf.disk_usage_based_eviction,
1673 2 : Some(DiskUsageEvictionTaskConfig {
1674 2 : max_usage_pct: Percent::new(80).unwrap(),
1675 2 : min_avail_bytes: 0,
1676 2 : period: Duration::from_secs(10),
1677 2 : #[cfg(feature = "testing")]
1678 2 : mock_statvfs: None,
1679 2 : eviction_order: crate::disk_usage_eviction_task::EvictionOrder::AbsoluteAccessed,
1680 2 : })
1681 2 : );
1682 :
1683 2 : match &conf.default_tenant_conf.eviction_policy {
1684 2 : EvictionPolicy::LayerAccessThreshold(eviction_threshold) => {
1685 2 : assert_eq!(eviction_threshold.period, Duration::from_secs(20 * 60));
1686 2 : assert_eq!(eviction_threshold.threshold, Duration::from_secs(20 * 60));
1687 : }
1688 0 : other => unreachable!("Unexpected eviction policy tenant settings: {other:?}"),
1689 : }
1690 :
1691 2 : Ok(())
1692 2 : }
1693 :
1694 : #[test]
1695 2 : fn parse_imitation_only_pageserver_config() {
1696 2 : let tempdir = tempdir().unwrap();
1697 2 : let (workdir, pg_distrib_dir) = prepare_fs(&tempdir).unwrap();
1698 2 :
1699 2 : let pageserver_conf_toml = format!(
1700 2 : r#"pg_distrib_dir = "{pg_distrib_dir}"
1701 2 : metric_collection_endpoint = "http://sample.url"
1702 2 : metric_collection_interval = "10min"
1703 2 : id = 222
1704 2 :
1705 2 : [tenant_config]
1706 2 : evictions_low_residence_duration_metric_threshold = "20m"
1707 2 :
1708 2 : [tenant_config.eviction_policy]
1709 2 : kind = "OnlyImitiate"
1710 2 : period = "20m"
1711 2 : threshold = "20m"
1712 2 : "#,
1713 2 : );
1714 2 : let toml: Document = pageserver_conf_toml.parse().unwrap();
1715 2 : let conf = PageServerConf::parse_and_validate(&toml, &workdir).unwrap();
1716 2 :
1717 2 : match &conf.default_tenant_conf.eviction_policy {
1718 2 : EvictionPolicy::OnlyImitiate(t) => {
1719 2 : assert_eq!(t.period, Duration::from_secs(20 * 60));
1720 2 : assert_eq!(t.threshold, Duration::from_secs(20 * 60));
1721 : }
1722 0 : other => unreachable!("Unexpected eviction policy tenant settings: {other:?}"),
1723 : }
1724 2 : }
1725 :
1726 14 : fn prepare_fs(tempdir: &Utf8TempDir) -> anyhow::Result<(Utf8PathBuf, Utf8PathBuf)> {
1727 14 : let tempdir_path = tempdir.path();
1728 14 :
1729 14 : let workdir = tempdir_path.join("workdir");
1730 14 : fs::create_dir_all(&workdir)?;
1731 :
1732 14 : let pg_distrib_dir = tempdir_path.join("pg_distrib");
1733 14 : let pg_distrib_dir_versioned = pg_distrib_dir.join(format!("v{DEFAULT_PG_VERSION}"));
1734 14 : fs::create_dir_all(&pg_distrib_dir_versioned)?;
1735 14 : let postgres_bin_dir = pg_distrib_dir_versioned.join("bin");
1736 14 : fs::create_dir_all(&postgres_bin_dir)?;
1737 14 : fs::write(postgres_bin_dir.join("postgres"), "I'm postgres, trust me")?;
1738 :
1739 14 : Ok((workdir, pg_distrib_dir))
1740 14 : }
1741 : }
|