Line data Source code
1 : //! Functions for handling page server configuration options
2 : //!
3 : //! Configuration options can be set in the pageserver.toml configuration
4 : //! file, or on the command line.
5 : //! See also `settings.md` for better description on every parameter.
6 :
7 : use anyhow::{anyhow, bail, ensure, Context, Result};
8 : use pageserver_api::shard::TenantShardId;
9 : use remote_storage::{RemotePath, RemoteStorageConfig};
10 : use serde;
11 : use serde::de::IntoDeserializer;
12 : use std::env;
13 : use storage_broker::Uri;
14 : use utils::crashsafe::path_with_suffix_extension;
15 : use utils::id::ConnectionId;
16 : use utils::logging::SecretString;
17 :
18 : use once_cell::sync::OnceCell;
19 : use reqwest::Url;
20 : use std::num::NonZeroUsize;
21 : use std::str::FromStr;
22 : use std::sync::Arc;
23 : use std::time::Duration;
24 : use toml_edit::{Document, Item};
25 :
26 : use camino::{Utf8Path, Utf8PathBuf};
27 : use postgres_backend::AuthType;
28 : use utils::{
29 : id::{NodeId, TimelineId},
30 : logging::LogFormat,
31 : };
32 :
33 : use crate::tenant::timeline::GetVectoredImpl;
34 : use crate::tenant::vectored_blob_io::MaxVectoredReadBytes;
35 : use crate::tenant::{config::TenantConfOpt, timeline::GetImpl};
36 : use crate::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
37 : use crate::{disk_usage_eviction_task::DiskUsageEvictionTaskConfig, virtual_file::io_engine};
38 : use crate::{tenant::config::TenantConf, virtual_file};
39 : use crate::{TENANT_HEATMAP_BASENAME, TENANT_LOCATION_CONFIG_NAME, TIMELINE_DELETE_MARK_SUFFIX};
40 :
41 : use self::defaults::DEFAULT_CONCURRENT_TENANT_WARMUP;
42 :
43 : use self::defaults::DEFAULT_VIRTUAL_FILE_IO_ENGINE;
44 :
/// Built-in default values for the page server configuration, together with
/// the configuration file template ([`DEFAULT_CONFIG_FILE`]) that interpolates them.
45 : pub mod defaults {
// NOTE(review): the tenant-level `DEFAULT_*` names interpolated into the
// `[tenant_config]` section of the template below are not defined in this
// module, so they presumably come from this glob import — confirm.
46 : use crate::tenant::config::defaults::*;
47 : use const_format::formatcp;
48 :
49 : pub use pageserver_api::config::{
50 : DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
51 : DEFAULT_PG_LISTEN_PORT,
52 : };
53 : pub use storage_broker::DEFAULT_ENDPOINT as BROKER_DEFAULT_ENDPOINT;
54 :
// Duration defaults are stored as human-readable strings;
// `PageServerConfigBuilder::default_values` parses them with `humantime::parse_duration`.
55 : pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "60 s";
56 : pub const DEFAULT_WAL_REDO_TIMEOUT: &str = "60 s";
57 :
/// Default value of `PageServerConf::superuser`.
58 : pub const DEFAULT_SUPERUSER: &str = "cloud_admin";
59 :
60 : pub const DEFAULT_PAGE_CACHE_SIZE: usize = 8192;
61 : pub const DEFAULT_MAX_FILE_DESCRIPTORS: usize = 100;
62 :
/// Parsed with `LogFormat::from_str` when the default configuration is built.
63 : pub const DEFAULT_LOG_FORMAT: &str = "plain";
64 :
/// Must be non-zero: `default_values` wraps it in a `NonZeroUsize`.
65 : pub const DEFAULT_CONCURRENT_TENANT_WARMUP: usize = 8;
66 :
/// `ConfigurableSemaphore::DEFAULT_INITIAL` exposed as a plain `usize`, so that it
/// can be interpolated into the configuration template.
67 : pub const DEFAULT_CONCURRENT_TENANT_SIZE_LOGICAL_SIZE_QUERIES: usize =
68 : super::ConfigurableSemaphore::DEFAULT_INITIAL.get();
69 :
70 : pub const DEFAULT_METRIC_COLLECTION_INTERVAL: &str = "10 min";
71 : pub const DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL: &str = "0s";
72 : pub const DEFAULT_METRIC_COLLECTION_ENDPOINT: Option<reqwest::Url> = None;
73 : pub const DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL: &str = "10 min";
74 : pub const DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY: &str = "10s";
75 :
76 : pub const DEFAULT_HEATMAP_UPLOAD_CONCURRENCY: usize = 8;
77 : pub const DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY: usize = 1;
78 :
79 : pub const DEFAULT_INGEST_BATCH_SIZE: u64 = 100;
80 :
// The textual IO engine default differs per platform: this one is used on Linux...
81 : #[cfg(target_os = "linux")]
82 : pub const DEFAULT_VIRTUAL_FILE_IO_ENGINE: &str = "tokio-epoll-uring";
83 :
// ...and this one everywhere else.
84 : #[cfg(not(target_os = "linux"))]
85 : pub const DEFAULT_VIRTUAL_FILE_IO_ENGINE: &str = "std-fs";
86 :
87 : pub const DEFAULT_GET_VECTORED_IMPL: &str = "sequential";
88 :
89 : pub const DEFAULT_GET_IMPL: &str = "legacy";
90 :
/// Must be non-zero: `default_values` wraps it in a `NonZeroUsize`.
91 : pub const DEFAULT_MAX_VECTORED_READ_BYTES: usize = 128 * 1024; // 128 KiB
92 :
93 : pub const DEFAULT_VALIDATE_VECTORED_GET: bool = true;
94 :
/// Zero disables the limit (see `PageServerConf::ephemeral_bytes_per_memory_kb`).
95 : pub const DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB: usize = 0;
96 :
97 : ///
98 : /// Default built-in configuration file.
99 : ///
/// Every option line in the template is commented out (prefixed with `#`); only the
/// `[tenant_config]` and `[remote_storage]` section headers are live.
/// `{NAME}` placeholders are filled in by `formatcp!`; `{{` / `}}` are escaped
/// literal braces.
100 : pub const DEFAULT_CONFIG_FILE: &str = formatcp!(
101 : r#"
102 : # Initial configuration file created by 'pageserver --init'
103 : #listen_pg_addr = '{DEFAULT_PG_LISTEN_ADDR}'
104 : #listen_http_addr = '{DEFAULT_HTTP_LISTEN_ADDR}'
105 :
106 : #wait_lsn_timeout = '{DEFAULT_WAIT_LSN_TIMEOUT}'
107 : #wal_redo_timeout = '{DEFAULT_WAL_REDO_TIMEOUT}'
108 :
109 : #page_cache_size = {DEFAULT_PAGE_CACHE_SIZE}
110 : #max_file_descriptors = {DEFAULT_MAX_FILE_DESCRIPTORS}
111 :
112 : # initial superuser role name to use when creating a new tenant
113 : #initial_superuser_name = '{DEFAULT_SUPERUSER}'
114 :
115 : #broker_endpoint = '{BROKER_DEFAULT_ENDPOINT}'
116 :
117 : #log_format = '{DEFAULT_LOG_FORMAT}'
118 :
119 : #concurrent_tenant_size_logical_size_queries = '{DEFAULT_CONCURRENT_TENANT_SIZE_LOGICAL_SIZE_QUERIES}'
120 : #concurrent_tenant_warmup = '{DEFAULT_CONCURRENT_TENANT_WARMUP}'
121 :
122 : #metric_collection_interval = '{DEFAULT_METRIC_COLLECTION_INTERVAL}'
123 : #cached_metric_collection_interval = '{DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL}'
124 : #synthetic_size_calculation_interval = '{DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL}'
125 :
126 : #disk_usage_based_eviction = {{ max_usage_pct = .., min_avail_bytes = .., period = "10s"}}
127 :
128 : #background_task_maximum_delay = '{DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY}'
129 :
130 : #ingest_batch_size = {DEFAULT_INGEST_BATCH_SIZE}
131 :
132 : #virtual_file_io_engine = '{DEFAULT_VIRTUAL_FILE_IO_ENGINE}'
133 :
134 : #get_vectored_impl = '{DEFAULT_GET_VECTORED_IMPL}'
135 :
136 : #get_impl = '{DEFAULT_GET_IMPL}'
137 :
138 : #max_vectored_read_bytes = '{DEFAULT_MAX_VECTORED_READ_BYTES}'
139 :
140 : #validate_vectored_get = '{DEFAULT_VALIDATE_VECTORED_GET}'
141 :
142 : [tenant_config]
143 : #checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes
144 : #checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT}
145 : #compaction_target_size = {DEFAULT_COMPACTION_TARGET_SIZE} # in bytes
146 : #compaction_period = '{DEFAULT_COMPACTION_PERIOD}'
147 : #compaction_threshold = {DEFAULT_COMPACTION_THRESHOLD}
148 :
149 : #gc_period = '{DEFAULT_GC_PERIOD}'
150 : #gc_horizon = {DEFAULT_GC_HORIZON}
151 : #image_creation_threshold = {DEFAULT_IMAGE_CREATION_THRESHOLD}
152 : #pitr_interval = '{DEFAULT_PITR_INTERVAL}'
153 :
154 : #min_resident_size_override = .. # in bytes
155 : #evictions_low_residence_duration_metric_threshold = '{DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD}'
156 :
157 : #heatmap_upload_concurrency = {DEFAULT_HEATMAP_UPLOAD_CONCURRENCY}
158 : #secondary_download_concurrency = {DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY}
159 :
160 : #ephemeral_bytes_per_memory_kb = {DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB}
161 :
162 : [remote_storage]
163 :
164 : "#
165 : );
166 : }
167 :
168 : #[derive(Debug, Clone, PartialEq, Eq)]
169 : pub struct PageServerConf {
170 : // Identifier of that particular pageserver so e g safekeepers
171 : // can safely distinguish different pageservers
172 : pub id: NodeId,
173 :
174 : /// Example (default): 127.0.0.1:64000
175 : pub listen_pg_addr: String,
176 : /// Example (default): 127.0.0.1:9898
177 : pub listen_http_addr: String,
178 :
179 : /// Current availability zone. Used for traffic metrics.
180 : pub availability_zone: Option<String>,
181 :
182 : // Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call.
183 : pub wait_lsn_timeout: Duration,
184 : // How long to wait for WAL redo to complete.
185 : pub wal_redo_timeout: Duration,
186 :
187 : pub superuser: String,
188 :
189 : pub page_cache_size: usize,
190 : pub max_file_descriptors: usize,
191 :
192 : // Repository directory, relative to current working directory.
193 : // Normally, the page server changes the current working directory
194 : // to the repository, and 'workdir' is always '.'. But we don't do
195 : // that during unit testing, because the current directory is global
196 : // to the process but different unit tests work on different
197 : // repositories.
198 : pub workdir: Utf8PathBuf,
199 :
200 : pub pg_distrib_dir: Utf8PathBuf,
201 :
202 : // Authentication
203 : /// authentication method for the HTTP mgmt API
204 : pub http_auth_type: AuthType,
205 : /// authentication method for libpq connections from compute
206 : pub pg_auth_type: AuthType,
207 : /// Path to a file or directory containing public key(s) for verifying JWT tokens.
208 : /// Used for both mgmt and compute auth, if enabled.
209 : pub auth_validation_public_key_path: Option<Utf8PathBuf>,
210 :
211 : pub remote_storage_config: Option<RemoteStorageConfig>,
212 :
213 : pub default_tenant_conf: TenantConf,
214 :
215 : /// Storage broker endpoints to connect to.
216 : pub broker_endpoint: Uri,
217 : pub broker_keepalive_interval: Duration,
218 :
219 : pub log_format: LogFormat,
220 :
221 : /// Number of tenants which will be concurrently loaded from remote storage proactively on startup or attach.
222 : ///
223 : /// A lower value implicitly deprioritizes loading such tenants, vs. other work in the system.
224 : pub concurrent_tenant_warmup: ConfigurableSemaphore,
225 :
226 : /// Number of concurrent [`Tenant::gather_size_inputs`](crate::tenant::Tenant::gather_size_inputs) allowed.
227 : pub concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore,
228 : /// Limit of concurrent [`Tenant::gather_size_inputs`] issued by module `eviction_task`.
229 : /// The number of permits is the same as `concurrent_tenant_size_logical_size_queries`.
230 : /// See the comment in `eviction_task` for details.
231 : ///
232 : /// [`Tenant::gather_size_inputs`]: crate::tenant::Tenant::gather_size_inputs
233 : pub eviction_task_immitated_concurrent_logical_size_queries: ConfigurableSemaphore,
234 :
235 : // How often to collect metrics and send them to the metrics endpoint.
236 : pub metric_collection_interval: Duration,
237 : // How often to send unchanged cached metrics to the metrics endpoint.
238 : pub cached_metric_collection_interval: Duration,
239 : pub metric_collection_endpoint: Option<Url>,
240 : pub metric_collection_bucket: Option<RemoteStorageConfig>,
241 : pub synthetic_size_calculation_interval: Duration,
242 :
243 : pub disk_usage_based_eviction: Option<DiskUsageEvictionTaskConfig>,
244 :
245 : pub test_remote_failures: u64,
246 :
247 : pub ondemand_download_behavior_treat_error_as_warn: bool,
248 :
249 : /// How long will background tasks be delayed at most after initial load of tenants.
250 : ///
251 : /// Our largest initialization completions are in the range of 100-200s, so perhaps 10s works
252 : /// as we now isolate initial loading, initial logical size calculation and background tasks.
253 : /// Smaller nodes will have background tasks "not running" for this long unless every timeline
254 : /// has it's initial logical size calculated. Not running background tasks for some seconds is
255 : /// not terrible.
256 : pub background_task_maximum_delay: Duration,
257 :
258 : pub control_plane_api: Option<Url>,
259 :
260 : /// JWT token for use with the control plane API.
261 : pub control_plane_api_token: Option<SecretString>,
262 :
263 : /// If true, pageserver will make best-effort to operate without a control plane: only
264 : /// for use in major incidents.
265 : pub control_plane_emergency_mode: bool,
266 :
267 : /// How many heatmap uploads may be done concurrency: lower values implicitly deprioritize
268 : /// heatmap uploads vs. other remote storage operations.
269 : pub heatmap_upload_concurrency: usize,
270 :
271 : /// How many remote storage downloads may be done for secondary tenants concurrently. Implicitly
272 : /// deprioritises secondary downloads vs. remote storage operations for attached tenants.
273 : pub secondary_download_concurrency: usize,
274 :
275 : /// Maximum number of WAL records to be ingested and committed at the same time
276 : pub ingest_batch_size: u64,
277 :
278 : pub virtual_file_io_engine: virtual_file::IoEngineKind,
279 :
280 : pub get_vectored_impl: GetVectoredImpl,
281 :
282 : pub get_impl: GetImpl,
283 :
284 : pub max_vectored_read_bytes: MaxVectoredReadBytes,
285 :
286 : pub validate_vectored_get: bool,
287 :
288 : /// How many bytes of ephemeral layer content will we allow per kilobyte of RAM. When this
289 : /// is exceeded, we start proactively closing ephemeral layers to limit the total amount
290 : /// of ephemeral data.
291 : ///
292 : /// Setting this to zero disables limits on total ephemeral layer size.
293 : pub ephemeral_bytes_per_memory_kb: usize,
294 : }
295 :
/// Secret auth token, kept in a global write-once cell rather than in
/// `PageServerConf`.
///
296 : /// We do not want to store this in a PageServerConf because the latter may be logged
297 : /// and/or serialized at a whim, while the token is secret. Currently this token is the
298 : /// same for accessing all tenants/timelines, but may become per-tenant/per-timeline in
299 : /// the future; more tokens and auth may arrive for storage broker, completely changing the logic.
300 : /// Hence, we resort to a global variable for now instead of passing the token from the
301 : /// startup code to the connection code through a dozen layers.
302 : pub static SAFEKEEPER_AUTH_TOKEN: OnceCell<Arc<String>> = OnceCell::new();
303 :
304 : // use dedicated enum for builder to better indicate the intention
305 : // and avoid possible confusion with nested options
306 : #[derive(Clone, Default)]
307 : pub enum BuilderValue<T> {
308 : Set(T),
309 : #[default]
310 : NotSet,
311 : }
312 :
313 : impl<T: Clone> BuilderValue<T> {
314 738 : pub fn ok_or(&self, field_name: &'static str, default: BuilderValue<T>) -> anyhow::Result<T> {
315 738 : match self {
316 242 : Self::Set(v) => Ok(v.clone()),
317 496 : Self::NotSet => match default {
318 496 : BuilderValue::Set(v) => Ok(v.clone()),
319 : BuilderValue::NotSet => {
320 0 : anyhow::bail!("missing config value {field_name:?}")
321 : }
322 : },
323 : }
324 738 : }
325 : }
326 :
327 : // needed to simplify config construction
328 : #[derive(Default)]
329 : struct PageServerConfigBuilder {
330 : listen_pg_addr: BuilderValue<String>,
331 :
332 : listen_http_addr: BuilderValue<String>,
333 :
334 : availability_zone: BuilderValue<Option<String>>,
335 :
336 : wait_lsn_timeout: BuilderValue<Duration>,
337 : wal_redo_timeout: BuilderValue<Duration>,
338 :
339 : superuser: BuilderValue<String>,
340 :
341 : page_cache_size: BuilderValue<usize>,
342 : max_file_descriptors: BuilderValue<usize>,
343 :
344 : workdir: BuilderValue<Utf8PathBuf>,
345 :
346 : pg_distrib_dir: BuilderValue<Utf8PathBuf>,
347 :
348 : http_auth_type: BuilderValue<AuthType>,
349 : pg_auth_type: BuilderValue<AuthType>,
350 :
351 : //
352 : auth_validation_public_key_path: BuilderValue<Option<Utf8PathBuf>>,
353 : remote_storage_config: BuilderValue<Option<RemoteStorageConfig>>,
354 :
355 : id: BuilderValue<NodeId>,
356 :
357 : broker_endpoint: BuilderValue<Uri>,
358 : broker_keepalive_interval: BuilderValue<Duration>,
359 :
360 : log_format: BuilderValue<LogFormat>,
361 :
362 : concurrent_tenant_warmup: BuilderValue<NonZeroUsize>,
363 : concurrent_tenant_size_logical_size_queries: BuilderValue<NonZeroUsize>,
364 :
365 : metric_collection_interval: BuilderValue<Duration>,
366 : cached_metric_collection_interval: BuilderValue<Duration>,
367 : metric_collection_endpoint: BuilderValue<Option<Url>>,
368 : synthetic_size_calculation_interval: BuilderValue<Duration>,
369 : metric_collection_bucket: BuilderValue<Option<RemoteStorageConfig>>,
370 :
371 : disk_usage_based_eviction: BuilderValue<Option<DiskUsageEvictionTaskConfig>>,
372 :
373 : test_remote_failures: BuilderValue<u64>,
374 :
375 : ondemand_download_behavior_treat_error_as_warn: BuilderValue<bool>,
376 :
377 : background_task_maximum_delay: BuilderValue<Duration>,
378 :
379 : control_plane_api: BuilderValue<Option<Url>>,
380 : control_plane_api_token: BuilderValue<Option<SecretString>>,
381 : control_plane_emergency_mode: BuilderValue<bool>,
382 :
383 : heatmap_upload_concurrency: BuilderValue<usize>,
384 : secondary_download_concurrency: BuilderValue<usize>,
385 :
386 : ingest_batch_size: BuilderValue<u64>,
387 :
388 : virtual_file_io_engine: BuilderValue<virtual_file::IoEngineKind>,
389 :
390 : get_vectored_impl: BuilderValue<GetVectoredImpl>,
391 :
392 : get_impl: BuilderValue<GetImpl>,
393 :
394 : max_vectored_read_bytes: BuilderValue<MaxVectoredReadBytes>,
395 :
396 : validate_vectored_get: BuilderValue<bool>,
397 :
398 : ephemeral_bytes_per_memory_kb: BuilderValue<usize>,
399 : }
400 :
401 : impl PageServerConfigBuilder {
402 : #[inline(always)]
403 18 : fn default_values() -> Self {
404 18 : use self::BuilderValue::*;
405 18 : use defaults::*;
406 18 : Self {
407 18 : listen_pg_addr: Set(DEFAULT_PG_LISTEN_ADDR.to_string()),
408 18 : listen_http_addr: Set(DEFAULT_HTTP_LISTEN_ADDR.to_string()),
409 18 : availability_zone: Set(None),
410 18 : wait_lsn_timeout: Set(humantime::parse_duration(DEFAULT_WAIT_LSN_TIMEOUT)
411 18 : .expect("cannot parse default wait lsn timeout")),
412 18 : wal_redo_timeout: Set(humantime::parse_duration(DEFAULT_WAL_REDO_TIMEOUT)
413 18 : .expect("cannot parse default wal redo timeout")),
414 18 : superuser: Set(DEFAULT_SUPERUSER.to_string()),
415 18 : page_cache_size: Set(DEFAULT_PAGE_CACHE_SIZE),
416 18 : max_file_descriptors: Set(DEFAULT_MAX_FILE_DESCRIPTORS),
417 18 : workdir: Set(Utf8PathBuf::new()),
418 18 : pg_distrib_dir: Set(Utf8PathBuf::from_path_buf(
419 18 : env::current_dir().expect("cannot access current directory"),
420 18 : )
421 18 : .expect("non-Unicode path")
422 18 : .join("pg_install")),
423 18 : http_auth_type: Set(AuthType::Trust),
424 18 : pg_auth_type: Set(AuthType::Trust),
425 18 : auth_validation_public_key_path: Set(None),
426 18 : remote_storage_config: Set(None),
427 18 : id: NotSet,
428 18 : broker_endpoint: Set(storage_broker::DEFAULT_ENDPOINT
429 18 : .parse()
430 18 : .expect("failed to parse default broker endpoint")),
431 18 : broker_keepalive_interval: Set(humantime::parse_duration(
432 18 : storage_broker::DEFAULT_KEEPALIVE_INTERVAL,
433 18 : )
434 18 : .expect("cannot parse default keepalive interval")),
435 18 : log_format: Set(LogFormat::from_str(DEFAULT_LOG_FORMAT).unwrap()),
436 18 :
437 18 : concurrent_tenant_warmup: Set(NonZeroUsize::new(DEFAULT_CONCURRENT_TENANT_WARMUP)
438 18 : .expect("Invalid default constant")),
439 18 : concurrent_tenant_size_logical_size_queries: Set(
440 18 : ConfigurableSemaphore::DEFAULT_INITIAL,
441 18 : ),
442 18 : metric_collection_interval: Set(humantime::parse_duration(
443 18 : DEFAULT_METRIC_COLLECTION_INTERVAL,
444 18 : )
445 18 : .expect("cannot parse default metric collection interval")),
446 18 : cached_metric_collection_interval: Set(humantime::parse_duration(
447 18 : DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL,
448 18 : )
449 18 : .expect("cannot parse default cached_metric_collection_interval")),
450 18 : synthetic_size_calculation_interval: Set(humantime::parse_duration(
451 18 : DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL,
452 18 : )
453 18 : .expect("cannot parse default synthetic size calculation interval")),
454 18 : metric_collection_endpoint: Set(DEFAULT_METRIC_COLLECTION_ENDPOINT),
455 18 :
456 18 : metric_collection_bucket: Set(None),
457 18 :
458 18 : disk_usage_based_eviction: Set(None),
459 18 :
460 18 : test_remote_failures: Set(0),
461 18 :
462 18 : ondemand_download_behavior_treat_error_as_warn: Set(false),
463 18 :
464 18 : background_task_maximum_delay: Set(humantime::parse_duration(
465 18 : DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY,
466 18 : )
467 18 : .unwrap()),
468 18 :
469 18 : control_plane_api: Set(None),
470 18 : control_plane_api_token: Set(None),
471 18 : control_plane_emergency_mode: Set(false),
472 18 :
473 18 : heatmap_upload_concurrency: Set(DEFAULT_HEATMAP_UPLOAD_CONCURRENCY),
474 18 : secondary_download_concurrency: Set(DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY),
475 18 :
476 18 : ingest_batch_size: Set(DEFAULT_INGEST_BATCH_SIZE),
477 18 :
478 18 : virtual_file_io_engine: Set(DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap()),
479 18 :
480 18 : get_vectored_impl: Set(DEFAULT_GET_VECTORED_IMPL.parse().unwrap()),
481 18 : get_impl: Set(DEFAULT_GET_IMPL.parse().unwrap()),
482 18 : max_vectored_read_bytes: Set(MaxVectoredReadBytes(
483 18 : NonZeroUsize::new(DEFAULT_MAX_VECTORED_READ_BYTES).unwrap(),
484 18 : )),
485 18 : validate_vectored_get: Set(DEFAULT_VALIDATE_VECTORED_GET),
486 18 : ephemeral_bytes_per_memory_kb: Set(DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
487 18 : }
488 18 : }
489 : }
490 :
491 : impl PageServerConfigBuilder {
492 12 : pub fn listen_pg_addr(&mut self, listen_pg_addr: String) {
493 12 : self.listen_pg_addr = BuilderValue::Set(listen_pg_addr)
494 12 : }
495 :
496 12 : pub fn listen_http_addr(&mut self, listen_http_addr: String) {
497 12 : self.listen_http_addr = BuilderValue::Set(listen_http_addr)
498 12 : }
499 :
500 0 : pub fn availability_zone(&mut self, availability_zone: Option<String>) {
501 0 : self.availability_zone = BuilderValue::Set(availability_zone)
502 0 : }
503 :
504 12 : pub fn wait_lsn_timeout(&mut self, wait_lsn_timeout: Duration) {
505 12 : self.wait_lsn_timeout = BuilderValue::Set(wait_lsn_timeout)
506 12 : }
507 :
508 12 : pub fn wal_redo_timeout(&mut self, wal_redo_timeout: Duration) {
509 12 : self.wal_redo_timeout = BuilderValue::Set(wal_redo_timeout)
510 12 : }
511 :
512 12 : pub fn superuser(&mut self, superuser: String) {
513 12 : self.superuser = BuilderValue::Set(superuser)
514 12 : }
515 :
516 12 : pub fn page_cache_size(&mut self, page_cache_size: usize) {
517 12 : self.page_cache_size = BuilderValue::Set(page_cache_size)
518 12 : }
519 :
520 12 : pub fn max_file_descriptors(&mut self, max_file_descriptors: usize) {
521 12 : self.max_file_descriptors = BuilderValue::Set(max_file_descriptors)
522 12 : }
523 :
524 18 : pub fn workdir(&mut self, workdir: Utf8PathBuf) {
525 18 : self.workdir = BuilderValue::Set(workdir)
526 18 : }
527 :
528 18 : pub fn pg_distrib_dir(&mut self, pg_distrib_dir: Utf8PathBuf) {
529 18 : self.pg_distrib_dir = BuilderValue::Set(pg_distrib_dir)
530 18 : }
531 :
532 0 : pub fn http_auth_type(&mut self, auth_type: AuthType) {
533 0 : self.http_auth_type = BuilderValue::Set(auth_type)
534 0 : }
535 :
536 0 : pub fn pg_auth_type(&mut self, auth_type: AuthType) {
537 0 : self.pg_auth_type = BuilderValue::Set(auth_type)
538 0 : }
539 :
540 0 : pub fn auth_validation_public_key_path(
541 0 : &mut self,
542 0 : auth_validation_public_key_path: Option<Utf8PathBuf>,
543 0 : ) {
544 0 : self.auth_validation_public_key_path = BuilderValue::Set(auth_validation_public_key_path)
545 0 : }
546 :
547 8 : pub fn remote_storage_config(&mut self, remote_storage_config: Option<RemoteStorageConfig>) {
548 8 : self.remote_storage_config = BuilderValue::Set(remote_storage_config)
549 8 : }
550 :
551 14 : pub fn broker_endpoint(&mut self, broker_endpoint: Uri) {
552 14 : self.broker_endpoint = BuilderValue::Set(broker_endpoint)
553 14 : }
554 :
555 0 : pub fn broker_keepalive_interval(&mut self, broker_keepalive_interval: Duration) {
556 0 : self.broker_keepalive_interval = BuilderValue::Set(broker_keepalive_interval)
557 0 : }
558 :
559 18 : pub fn id(&mut self, node_id: NodeId) {
560 18 : self.id = BuilderValue::Set(node_id)
561 18 : }
562 :
563 12 : pub fn log_format(&mut self, log_format: LogFormat) {
564 12 : self.log_format = BuilderValue::Set(log_format)
565 12 : }
566 :
567 0 : pub fn concurrent_tenant_warmup(&mut self, u: NonZeroUsize) {
568 0 : self.concurrent_tenant_warmup = BuilderValue::Set(u);
569 0 : }
570 :
571 0 : pub fn concurrent_tenant_size_logical_size_queries(&mut self, u: NonZeroUsize) {
572 0 : self.concurrent_tenant_size_logical_size_queries = BuilderValue::Set(u);
573 0 : }
574 :
575 16 : pub fn metric_collection_interval(&mut self, metric_collection_interval: Duration) {
576 16 : self.metric_collection_interval = BuilderValue::Set(metric_collection_interval)
577 16 : }
578 :
579 12 : pub fn cached_metric_collection_interval(
580 12 : &mut self,
581 12 : cached_metric_collection_interval: Duration,
582 12 : ) {
583 12 : self.cached_metric_collection_interval =
584 12 : BuilderValue::Set(cached_metric_collection_interval)
585 12 : }
586 :
587 16 : pub fn metric_collection_endpoint(&mut self, metric_collection_endpoint: Option<Url>) {
588 16 : self.metric_collection_endpoint = BuilderValue::Set(metric_collection_endpoint)
589 16 : }
590 :
591 0 : pub fn metric_collection_bucket(
592 0 : &mut self,
593 0 : metric_collection_bucket: Option<RemoteStorageConfig>,
594 0 : ) {
595 0 : self.metric_collection_bucket = BuilderValue::Set(metric_collection_bucket)
596 0 : }
597 :
598 12 : pub fn synthetic_size_calculation_interval(
599 12 : &mut self,
600 12 : synthetic_size_calculation_interval: Duration,
601 12 : ) {
602 12 : self.synthetic_size_calculation_interval =
603 12 : BuilderValue::Set(synthetic_size_calculation_interval)
604 12 : }
605 :
606 0 : pub fn test_remote_failures(&mut self, fail_first: u64) {
607 0 : self.test_remote_failures = BuilderValue::Set(fail_first);
608 0 : }
609 :
610 2 : pub fn disk_usage_based_eviction(&mut self, value: Option<DiskUsageEvictionTaskConfig>) {
611 2 : self.disk_usage_based_eviction = BuilderValue::Set(value);
612 2 : }
613 :
614 0 : pub fn ondemand_download_behavior_treat_error_as_warn(
615 0 : &mut self,
616 0 : ondemand_download_behavior_treat_error_as_warn: bool,
617 0 : ) {
618 0 : self.ondemand_download_behavior_treat_error_as_warn =
619 0 : BuilderValue::Set(ondemand_download_behavior_treat_error_as_warn);
620 0 : }
621 :
622 12 : pub fn background_task_maximum_delay(&mut self, delay: Duration) {
623 12 : self.background_task_maximum_delay = BuilderValue::Set(delay);
624 12 : }
625 :
626 0 : pub fn control_plane_api(&mut self, api: Option<Url>) {
627 0 : self.control_plane_api = BuilderValue::Set(api)
628 0 : }
629 :
630 0 : pub fn control_plane_api_token(&mut self, token: Option<SecretString>) {
631 0 : self.control_plane_api_token = BuilderValue::Set(token)
632 0 : }
633 :
634 0 : pub fn control_plane_emergency_mode(&mut self, enabled: bool) {
635 0 : self.control_plane_emergency_mode = BuilderValue::Set(enabled)
636 0 : }
637 :
638 0 : pub fn heatmap_upload_concurrency(&mut self, value: usize) {
639 0 : self.heatmap_upload_concurrency = BuilderValue::Set(value)
640 0 : }
641 :
642 0 : pub fn secondary_download_concurrency(&mut self, value: usize) {
643 0 : self.secondary_download_concurrency = BuilderValue::Set(value)
644 0 : }
645 :
646 0 : pub fn ingest_batch_size(&mut self, ingest_batch_size: u64) {
647 0 : self.ingest_batch_size = BuilderValue::Set(ingest_batch_size)
648 0 : }
649 :
650 0 : pub fn virtual_file_io_engine(&mut self, value: virtual_file::IoEngineKind) {
651 0 : self.virtual_file_io_engine = BuilderValue::Set(value);
652 0 : }
653 :
654 0 : pub fn get_vectored_impl(&mut self, value: GetVectoredImpl) {
655 0 : self.get_vectored_impl = BuilderValue::Set(value);
656 0 : }
657 :
658 0 : pub fn get_impl(&mut self, value: GetImpl) {
659 0 : self.get_impl = BuilderValue::Set(value);
660 0 : }
661 :
662 0 : pub fn get_max_vectored_read_bytes(&mut self, value: MaxVectoredReadBytes) {
663 0 : self.max_vectored_read_bytes = BuilderValue::Set(value);
664 0 : }
665 :
666 0 : pub fn get_validate_vectored_get(&mut self, value: bool) {
667 0 : self.validate_vectored_get = BuilderValue::Set(value);
668 0 : }
669 :
670 0 : pub fn get_ephemeral_bytes_per_memory_kb(&mut self, value: usize) {
671 0 : self.ephemeral_bytes_per_memory_kb = BuilderValue::Set(value);
672 0 : }
673 :
674 18 : pub fn build(self) -> anyhow::Result<PageServerConf> {
675 18 : let default = Self::default_values();
676 18 :
677 18 : macro_rules! conf {
678 18 : (USING DEFAULT { $($field:ident,)* } CUSTOM LOGIC { $($custom_field:ident : $custom_value:expr,)* } ) => {
679 18 : PageServerConf {
680 18 : $(
681 18 : $field: self.$field.ok_or(stringify!($field), default.$field)?,
682 18 : )*
683 18 : $(
684 18 : $custom_field: $custom_value,
685 18 : )*
686 18 : }
687 18 : };
688 18 : }
689 18 :
690 18 : Ok(conf!(
691 : USING DEFAULT
692 : {
693 : listen_pg_addr,
694 : listen_http_addr,
695 : availability_zone,
696 : wait_lsn_timeout,
697 : wal_redo_timeout,
698 : superuser,
699 : page_cache_size,
700 : max_file_descriptors,
701 : workdir,
702 : pg_distrib_dir,
703 : http_auth_type,
704 : pg_auth_type,
705 : auth_validation_public_key_path,
706 : remote_storage_config,
707 : id,
708 : broker_endpoint,
709 : broker_keepalive_interval,
710 : log_format,
711 : metric_collection_interval,
712 : cached_metric_collection_interval,
713 : metric_collection_endpoint,
714 : metric_collection_bucket,
715 : synthetic_size_calculation_interval,
716 : disk_usage_based_eviction,
717 : test_remote_failures,
718 : ondemand_download_behavior_treat_error_as_warn,
719 : background_task_maximum_delay,
720 : control_plane_api,
721 : control_plane_api_token,
722 : control_plane_emergency_mode,
723 : heatmap_upload_concurrency,
724 : secondary_download_concurrency,
725 : ingest_batch_size,
726 : get_vectored_impl,
727 : get_impl,
728 : max_vectored_read_bytes,
729 : validate_vectored_get,
730 : ephemeral_bytes_per_memory_kb,
731 : }
732 : CUSTOM LOGIC
733 : {
734 : // TenantConf is handled separately
735 18 : default_tenant_conf: TenantConf::default(),
736 18 : concurrent_tenant_warmup: ConfigurableSemaphore::new({
737 18 : self
738 18 : .concurrent_tenant_warmup
739 18 : .ok_or("concurrent_tenant_warmpup",
740 18 : default.concurrent_tenant_warmup)?
741 : }),
742 : concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::new(
743 18 : self
744 18 : .concurrent_tenant_size_logical_size_queries
745 18 : .ok_or("concurrent_tenant_size_logical_size_queries",
746 18 : default.concurrent_tenant_size_logical_size_queries.clone())?
747 : ),
748 : eviction_task_immitated_concurrent_logical_size_queries: ConfigurableSemaphore::new(
749 : // re-use `concurrent_tenant_size_logical_size_queries`
750 18 : self
751 18 : .concurrent_tenant_size_logical_size_queries
752 18 : .ok_or("eviction_task_immitated_concurrent_logical_size_queries",
753 18 : default.concurrent_tenant_size_logical_size_queries.clone())?,
754 : ),
755 18 : virtual_file_io_engine: match self.virtual_file_io_engine {
756 0 : BuilderValue::Set(v) => v,
757 18 : BuilderValue::NotSet => match crate::virtual_file::io_engine_feature_test().context("auto-detect virtual_file_io_engine")? {
758 18 : io_engine::FeatureTestResult::PlatformPreferred(v) => v, // make no noise
759 0 : io_engine::FeatureTestResult::Worse { engine, remark } => {
760 0 : // TODO: bubble this up to the caller so we can tracing::warn! it.
761 0 : eprintln!("auto-detected IO engine is not platform-preferred: engine={engine:?} remark={remark:?}");
762 0 : engine
763 : }
764 : },
765 : },
766 : }
767 : ))
768 18 : }
769 : }
770 :
771 : impl PageServerConf {
772 : //
773 : // Repository paths, relative to workdir.
774 : //
775 :
776 6434 : pub fn tenants_path(&self) -> Utf8PathBuf {
777 6434 : self.workdir.join(TENANTS_SEGMENT_NAME)
778 6434 : }
779 :
780 72 : pub fn deletion_prefix(&self) -> Utf8PathBuf {
781 72 : self.workdir.join("deletion")
782 72 : }
783 :
784 0 : pub fn metadata_path(&self) -> Utf8PathBuf {
785 0 : self.workdir.join("metadata.json")
786 0 : }
787 :
788 28 : pub fn deletion_list_path(&self, sequence: u64) -> Utf8PathBuf {
789 28 : // Encode a version in the filename, so that if we ever switch away from JSON we can
790 28 : // increment this.
791 28 : const VERSION: u8 = 1;
792 28 :
793 28 : self.deletion_prefix()
794 28 : .join(format!("{sequence:016x}-{VERSION:02x}.list"))
795 28 : }
796 :
797 24 : pub fn deletion_header_path(&self) -> Utf8PathBuf {
798 24 : // Encode a version in the filename, so that if we ever switch away from JSON we can
799 24 : // increment this.
800 24 : const VERSION: u8 = 1;
801 24 :
802 24 : self.deletion_prefix().join(format!("header-{VERSION:02x}"))
803 24 : }
804 :
805 6434 : pub fn tenant_path(&self, tenant_shard_id: &TenantShardId) -> Utf8PathBuf {
806 6434 : self.tenants_path().join(tenant_shard_id.to_string())
807 6434 : }
808 :
809 : /// Points to a place in pageserver's local directory,
810 : /// where certain tenant's LocationConf be stored.
811 0 : pub(crate) fn tenant_location_config_path(
812 0 : &self,
813 0 : tenant_shard_id: &TenantShardId,
814 0 : ) -> Utf8PathBuf {
815 0 : self.tenant_path(tenant_shard_id)
816 0 : .join(TENANT_LOCATION_CONFIG_NAME)
817 0 : }
818 :
819 0 : pub(crate) fn tenant_heatmap_path(&self, tenant_shard_id: &TenantShardId) -> Utf8PathBuf {
820 0 : self.tenant_path(tenant_shard_id)
821 0 : .join(TENANT_HEATMAP_BASENAME)
822 0 : }
823 :
824 6269 : pub fn timelines_path(&self, tenant_shard_id: &TenantShardId) -> Utf8PathBuf {
825 6269 : self.tenant_path(tenant_shard_id)
826 6269 : .join(TIMELINES_SEGMENT_NAME)
827 6269 : }
828 :
829 5943 : pub fn timeline_path(
830 5943 : &self,
831 5943 : tenant_shard_id: &TenantShardId,
832 5943 : timeline_id: &TimelineId,
833 5943 : ) -> Utf8PathBuf {
834 5943 : self.timelines_path(tenant_shard_id)
835 5943 : .join(timeline_id.to_string())
836 5943 : }
837 :
838 0 : pub(crate) fn timeline_delete_mark_file_path(
839 0 : &self,
840 0 : tenant_shard_id: TenantShardId,
841 0 : timeline_id: TimelineId,
842 0 : ) -> Utf8PathBuf {
843 0 : path_with_suffix_extension(
844 0 : self.timeline_path(&tenant_shard_id, &timeline_id),
845 0 : TIMELINE_DELETE_MARK_SUFFIX,
846 0 : )
847 0 : }
848 :
849 0 : pub fn traces_path(&self) -> Utf8PathBuf {
850 0 : self.workdir.join("traces")
851 0 : }
852 :
853 0 : pub fn trace_path(
854 0 : &self,
855 0 : tenant_shard_id: &TenantShardId,
856 0 : timeline_id: &TimelineId,
857 0 : connection_id: &ConnectionId,
858 0 : ) -> Utf8PathBuf {
859 0 : self.traces_path()
860 0 : .join(tenant_shard_id.to_string())
861 0 : .join(timeline_id.to_string())
862 0 : .join(connection_id.to_string())
863 0 : }
864 :
865 : /// Turns storage remote path of a file into its local path.
866 0 : pub fn local_path(&self, remote_path: &RemotePath) -> Utf8PathBuf {
867 0 : remote_path.with_base(&self.workdir)
868 0 : }
869 :
870 : //
871 : // Postgres distribution paths
872 : //
873 16 : pub fn pg_distrib_dir(&self, pg_version: u32) -> anyhow::Result<Utf8PathBuf> {
874 16 : let path = self.pg_distrib_dir.clone();
875 16 :
876 16 : #[allow(clippy::manual_range_patterns)]
877 16 : match pg_version {
878 16 : 14 | 15 | 16 => Ok(path.join(format!("v{pg_version}"))),
879 0 : _ => bail!("Unsupported postgres version: {}", pg_version),
880 : }
881 16 : }
882 :
883 8 : pub fn pg_bin_dir(&self, pg_version: u32) -> anyhow::Result<Utf8PathBuf> {
884 8 : Ok(self.pg_distrib_dir(pg_version)?.join("bin"))
885 8 : }
886 8 : pub fn pg_lib_dir(&self, pg_version: u32) -> anyhow::Result<Utf8PathBuf> {
887 8 : Ok(self.pg_distrib_dir(pg_version)?.join("lib"))
888 8 : }
889 :
890 : /// Parse a configuration file (pageserver.toml) into a PageServerConf struct,
891 : /// validating the input and failing on errors.
892 : ///
893 : /// This leaves any options not present in the file in the built-in defaults.
894 18 : pub fn parse_and_validate(toml: &Document, workdir: &Utf8Path) -> anyhow::Result<Self> {
895 18 : let mut builder = PageServerConfigBuilder::default();
896 18 : builder.workdir(workdir.to_owned());
897 18 :
898 18 : let mut t_conf = TenantConfOpt::default();
899 :
900 230 : for (key, item) in toml.iter() {
901 230 : match key {
902 230 : "listen_pg_addr" => builder.listen_pg_addr(parse_toml_string(key, item)?),
903 218 : "listen_http_addr" => builder.listen_http_addr(parse_toml_string(key, item)?),
904 206 : "availability_zone" => builder.availability_zone(Some(parse_toml_string(key, item)?)),
905 206 : "wait_lsn_timeout" => builder.wait_lsn_timeout(parse_toml_duration(key, item)?),
906 194 : "wal_redo_timeout" => builder.wal_redo_timeout(parse_toml_duration(key, item)?),
907 182 : "initial_superuser_name" => builder.superuser(parse_toml_string(key, item)?),
908 170 : "page_cache_size" => builder.page_cache_size(parse_toml_u64(key, item)? as usize),
909 158 : "max_file_descriptors" => {
910 12 : builder.max_file_descriptors(parse_toml_u64(key, item)? as usize)
911 : }
912 146 : "pg_distrib_dir" => {
913 18 : builder.pg_distrib_dir(Utf8PathBuf::from(parse_toml_string(key, item)?))
914 : }
915 128 : "auth_validation_public_key_path" => builder.auth_validation_public_key_path(Some(
916 0 : Utf8PathBuf::from(parse_toml_string(key, item)?),
917 : )),
918 128 : "http_auth_type" => builder.http_auth_type(parse_toml_from_str(key, item)?),
919 128 : "pg_auth_type" => builder.pg_auth_type(parse_toml_from_str(key, item)?),
920 128 : "remote_storage" => {
921 8 : builder.remote_storage_config(RemoteStorageConfig::from_toml(item)?)
922 : }
923 120 : "tenant_config" => {
924 6 : t_conf = TenantConfOpt::try_from(item.to_owned()).context(format!("failed to parse: '{key}'"))?;
925 : }
926 114 : "id" => builder.id(NodeId(parse_toml_u64(key, item)?)),
927 96 : "broker_endpoint" => builder.broker_endpoint(parse_toml_string(key, item)?.parse().context("failed to parse broker endpoint")?),
928 82 : "broker_keepalive_interval" => builder.broker_keepalive_interval(parse_toml_duration(key, item)?),
929 82 : "log_format" => builder.log_format(
930 12 : LogFormat::from_config(&parse_toml_string(key, item)?)?
931 : ),
932 70 : "concurrent_tenant_warmup" => builder.concurrent_tenant_warmup({
933 0 : let input = parse_toml_string(key, item)?;
934 0 : let permits = input.parse::<usize>().context("expected a number of initial permits, not {s:?}")?;
935 0 : NonZeroUsize::new(permits).context("initial semaphore permits out of range: 0, use other configuration to disable a feature")?
936 : }),
937 70 : "concurrent_tenant_size_logical_size_queries" => builder.concurrent_tenant_size_logical_size_queries({
938 0 : let input = parse_toml_string(key, item)?;
939 0 : let permits = input.parse::<usize>().context("expected a number of initial permits, not {s:?}")?;
940 0 : NonZeroUsize::new(permits).context("initial semaphore permits out of range: 0, use other configuration to disable a feature")?
941 : }),
942 70 : "metric_collection_interval" => builder.metric_collection_interval(parse_toml_duration(key, item)?),
943 54 : "cached_metric_collection_interval" => builder.cached_metric_collection_interval(parse_toml_duration(key, item)?),
944 42 : "metric_collection_endpoint" => {
945 16 : let endpoint = parse_toml_string(key, item)?.parse().context("failed to parse metric_collection_endpoint")?;
946 16 : builder.metric_collection_endpoint(Some(endpoint));
947 : },
948 26 : "metric_collection_bucket" => {
949 0 : builder.metric_collection_bucket(RemoteStorageConfig::from_toml(item)?)
950 : }
951 26 : "synthetic_size_calculation_interval" =>
952 12 : builder.synthetic_size_calculation_interval(parse_toml_duration(key, item)?),
953 14 : "test_remote_failures" => builder.test_remote_failures(parse_toml_u64(key, item)?),
954 14 : "disk_usage_based_eviction" => {
955 2 : tracing::info!("disk_usage_based_eviction: {:#?}", &item);
956 2 : builder.disk_usage_based_eviction(
957 2 : deserialize_from_item("disk_usage_based_eviction", item)
958 2 : .context("parse disk_usage_based_eviction")?
959 : )
960 : },
961 12 : "ondemand_download_behavior_treat_error_as_warn" => builder.ondemand_download_behavior_treat_error_as_warn(parse_toml_bool(key, item)?),
962 12 : "background_task_maximum_delay" => builder.background_task_maximum_delay(parse_toml_duration(key, item)?),
963 0 : "control_plane_api" => {
964 0 : let parsed = parse_toml_string(key, item)?;
965 0 : if parsed.is_empty() {
966 0 : builder.control_plane_api(None)
967 : } else {
968 0 : builder.control_plane_api(Some(parsed.parse().context("failed to parse control plane URL")?))
969 : }
970 : },
971 0 : "control_plane_api_token" => {
972 0 : let parsed = parse_toml_string(key, item)?;
973 0 : if parsed.is_empty() {
974 0 : builder.control_plane_api_token(None)
975 : } else {
976 0 : builder.control_plane_api_token(Some(parsed.into()))
977 : }
978 : },
979 0 : "control_plane_emergency_mode" => {
980 0 : builder.control_plane_emergency_mode(parse_toml_bool(key, item)?)
981 : },
982 0 : "heatmap_upload_concurrency" => {
983 0 : builder.heatmap_upload_concurrency(parse_toml_u64(key, item)? as usize)
984 : },
985 0 : "secondary_download_concurrency" => {
986 0 : builder.secondary_download_concurrency(parse_toml_u64(key, item)? as usize)
987 : },
988 0 : "ingest_batch_size" => builder.ingest_batch_size(parse_toml_u64(key, item)?),
989 0 : "virtual_file_io_engine" => {
990 0 : builder.virtual_file_io_engine(parse_toml_from_str("virtual_file_io_engine", item)?)
991 : }
992 0 : "get_vectored_impl" => {
993 0 : builder.get_vectored_impl(parse_toml_from_str("get_vectored_impl", item)?)
994 : }
995 0 : "get_impl" => {
996 0 : builder.get_impl(parse_toml_from_str("get_impl", item)?)
997 : }
998 0 : "max_vectored_read_bytes" => {
999 0 : let bytes = parse_toml_u64("max_vectored_read_bytes", item)? as usize;
1000 0 : builder.get_max_vectored_read_bytes(
1001 0 : MaxVectoredReadBytes(
1002 0 : NonZeroUsize::new(bytes).expect("Max byte size of vectored read must be greater than 0")))
1003 : }
1004 0 : "validate_vectored_get" => {
1005 0 : builder.get_validate_vectored_get(parse_toml_bool("validate_vectored_get", item)?)
1006 : }
1007 0 : "ephemeral_bytes_per_memory_kb" => {
1008 0 : builder.get_ephemeral_bytes_per_memory_kb(parse_toml_u64("ephemeral_bytes_per_memory_kb", item)? as usize)
1009 : }
1010 0 : _ => bail!("unrecognized pageserver option '{key}'"),
1011 : }
1012 : }
1013 :
1014 18 : let mut conf = builder.build().context("invalid config")?;
1015 :
1016 18 : if conf.http_auth_type == AuthType::NeonJWT || conf.pg_auth_type == AuthType::NeonJWT {
1017 0 : let auth_validation_public_key_path = conf
1018 0 : .auth_validation_public_key_path
1019 0 : .get_or_insert_with(|| workdir.join("auth_public_key.pem"));
1020 0 : ensure!(
1021 0 : auth_validation_public_key_path.exists(),
1022 0 : format!(
1023 0 : "Can't find auth_validation_public_key at '{auth_validation_public_key_path}'",
1024 0 : )
1025 : );
1026 18 : }
1027 :
1028 18 : conf.default_tenant_conf = t_conf.merge(TenantConf::default());
1029 18 :
1030 18 : Ok(conf)
1031 18 : }
1032 :
/// Returns `<TEST_OUTPUT>/test_<test_name>`, where `TEST_OUTPUT` is read from the
/// environment and falls back to `../tmp_check` when it cannot be read.
#[cfg(test)]
pub fn test_repo_dir(test_name: &str) -> Utf8PathBuf {
    let test_output_dir = match std::env::var("TEST_OUTPUT") {
        Ok(dir) => dir,
        Err(_) => String::from("../tmp_check"),
    };
    let repo_dir = format!("{test_output_dir}/test_{test_name}");
    Utf8PathBuf::from(repo_dir)
}
1038 :
1039 169 : pub fn dummy_conf(repo_dir: Utf8PathBuf) -> Self {
1040 169 : let pg_distrib_dir = Utf8PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../pg_install");
1041 169 :
1042 169 : PageServerConf {
1043 169 : id: NodeId(0),
1044 169 : wait_lsn_timeout: Duration::from_secs(60),
1045 169 : wal_redo_timeout: Duration::from_secs(60),
1046 169 : page_cache_size: defaults::DEFAULT_PAGE_CACHE_SIZE,
1047 169 : max_file_descriptors: defaults::DEFAULT_MAX_FILE_DESCRIPTORS,
1048 169 : listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
1049 169 : listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
1050 169 : availability_zone: None,
1051 169 : superuser: "cloud_admin".to_string(),
1052 169 : workdir: repo_dir,
1053 169 : pg_distrib_dir,
1054 169 : http_auth_type: AuthType::Trust,
1055 169 : pg_auth_type: AuthType::Trust,
1056 169 : auth_validation_public_key_path: None,
1057 169 : remote_storage_config: None,
1058 169 : default_tenant_conf: TenantConf::default(),
1059 169 : broker_endpoint: storage_broker::DEFAULT_ENDPOINT.parse().unwrap(),
1060 169 : broker_keepalive_interval: Duration::from_secs(5000),
1061 169 : log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(),
1062 169 : concurrent_tenant_warmup: ConfigurableSemaphore::new(
1063 169 : NonZeroUsize::new(DEFAULT_CONCURRENT_TENANT_WARMUP)
1064 169 : .expect("Invalid default constant"),
1065 169 : ),
1066 169 : concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
1067 169 : eviction_task_immitated_concurrent_logical_size_queries: ConfigurableSemaphore::default(
1068 169 : ),
1069 169 : metric_collection_interval: Duration::from_secs(60),
1070 169 : cached_metric_collection_interval: Duration::from_secs(60 * 60),
1071 169 : metric_collection_endpoint: defaults::DEFAULT_METRIC_COLLECTION_ENDPOINT,
1072 169 : metric_collection_bucket: None,
1073 169 : synthetic_size_calculation_interval: Duration::from_secs(60),
1074 169 : disk_usage_based_eviction: None,
1075 169 : test_remote_failures: 0,
1076 169 : ondemand_download_behavior_treat_error_as_warn: false,
1077 169 : background_task_maximum_delay: Duration::ZERO,
1078 169 : control_plane_api: None,
1079 169 : control_plane_api_token: None,
1080 169 : control_plane_emergency_mode: false,
1081 169 : heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY,
1082 169 : secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY,
1083 169 : ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE,
1084 169 : virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
1085 169 : get_vectored_impl: defaults::DEFAULT_GET_VECTORED_IMPL.parse().unwrap(),
1086 169 : get_impl: defaults::DEFAULT_GET_IMPL.parse().unwrap(),
1087 169 : max_vectored_read_bytes: MaxVectoredReadBytes(
1088 169 : NonZeroUsize::new(defaults::DEFAULT_MAX_VECTORED_READ_BYTES)
1089 169 : .expect("Invalid default constant"),
1090 169 : ),
1091 169 : validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
1092 169 : ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
1093 169 : }
1094 169 : }
1095 : }
1096 :
1097 : // Helper functions to parse a toml Item
1098 :
1099 96 : fn parse_toml_string(name: &str, item: &Item) -> Result<String> {
1100 96 : let s = item
1101 96 : .as_str()
1102 96 : .with_context(|| format!("configure option {name} is not a string"))?;
1103 96 : Ok(s.to_string())
1104 96 : }
1105 :
1106 42 : fn parse_toml_u64(name: &str, item: &Item) -> Result<u64> {
1107 : // A toml integer is signed, so it cannot represent the full range of an u64. That's OK
1108 : // for our use, though.
1109 42 : let i: i64 = item
1110 42 : .as_integer()
1111 42 : .with_context(|| format!("configure option {name} is not an integer"))?;
1112 42 : if i < 0 {
1113 0 : bail!("configure option {name} cannot be negative");
1114 42 : }
1115 42 : Ok(i as u64)
1116 42 : }
1117 :
1118 0 : fn parse_toml_bool(name: &str, item: &Item) -> Result<bool> {
1119 0 : item.as_bool()
1120 0 : .with_context(|| format!("configure option {name} is not a bool"))
1121 0 : }
1122 :
1123 76 : fn parse_toml_duration(name: &str, item: &Item) -> Result<Duration> {
1124 76 : let s = item
1125 76 : .as_str()
1126 76 : .with_context(|| format!("configure option {name} is not a string"))?;
1127 :
1128 76 : Ok(humantime::parse_duration(s)?)
1129 76 : }
1130 :
1131 0 : fn parse_toml_from_str<T>(name: &str, item: &Item) -> anyhow::Result<T>
1132 0 : where
1133 0 : T: FromStr,
1134 0 : <T as FromStr>::Err: std::fmt::Display,
1135 0 : {
1136 0 : let v = item
1137 0 : .as_str()
1138 0 : .with_context(|| format!("configure option {name} is not a string"))?;
1139 0 : T::from_str(v).map_err(|e| {
1140 0 : anyhow!(
1141 0 : "Failed to parse string as {parse_type} for configure option {name}: {e}",
1142 0 : parse_type = stringify!(T)
1143 0 : )
1144 0 : })
1145 0 : }
1146 :
1147 2 : fn deserialize_from_item<T>(name: &str, item: &Item) -> anyhow::Result<T>
1148 2 : where
1149 2 : T: serde::de::DeserializeOwned,
1150 2 : {
1151 : // ValueDeserializer::new is not public, so use the ValueDeserializer's documented way
1152 2 : let deserializer = match item.clone().into_value() {
1153 2 : Ok(value) => value.into_deserializer(),
1154 0 : Err(item) => anyhow::bail!("toml_edit::Item '{item}' is not a toml_edit::Value"),
1155 : };
1156 2 : T::deserialize(deserializer).with_context(|| format!("deserializing item for node {name}"))
1157 2 : }
1158 :
1159 : /// Configurable semaphore permits setting.
1160 : ///
1161 : /// Does not allow semaphore permits to be zero, because at runtime initially zero permits and empty
1162 : /// semaphore cannot be distinguished, leading any feature using these to await forever (or until
1163 : /// new permits are added).
1164 : #[derive(Debug, Clone)]
1165 : pub struct ConfigurableSemaphore {
1166 : initial_permits: NonZeroUsize,
1167 : inner: std::sync::Arc<tokio::sync::Semaphore>,
1168 : }
1169 :
1170 : impl ConfigurableSemaphore {
1171 : pub const DEFAULT_INITIAL: NonZeroUsize = match NonZeroUsize::new(1) {
1172 : Some(x) => x,
1173 : None => panic!("const unwrap is not yet stable"),
1174 : };
1175 :
1176 : /// Initializse using a non-zero amount of permits.
1177 : ///
1178 : /// Require a non-zero initial permits, because using permits == 0 is a crude way to disable a
1179 : /// feature such as [`Tenant::gather_size_inputs`]. Otherwise any semaphore using future will
1180 : /// behave like [`futures::future::pending`], just waiting until new permits are added.
1181 : ///
1182 : /// [`Tenant::gather_size_inputs`]: crate::tenant::Tenant::gather_size_inputs
1183 573 : pub fn new(initial_permits: NonZeroUsize) -> Self {
1184 573 : ConfigurableSemaphore {
1185 573 : initial_permits,
1186 573 : inner: std::sync::Arc::new(tokio::sync::Semaphore::new(initial_permits.get())),
1187 573 : }
1188 573 : }
1189 :
1190 : /// Returns the configured amount of permits.
1191 0 : pub fn initial_permits(&self) -> NonZeroUsize {
1192 0 : self.initial_permits
1193 0 : }
1194 : }
1195 :
1196 : impl Default for ConfigurableSemaphore {
1197 346 : fn default() -> Self {
1198 346 : Self::new(Self::DEFAULT_INITIAL)
1199 346 : }
1200 : }
1201 :
1202 : impl PartialEq for ConfigurableSemaphore {
1203 12 : fn eq(&self, other: &Self) -> bool {
1204 12 : // the number of permits can be increased at runtime, so we cannot really fulfill the
1205 12 : // PartialEq value equality otherwise
1206 12 : self.initial_permits == other.initial_permits
1207 12 : }
1208 : }
1209 :
// Marker impl: equality is decided by the `PartialEq` impl for this type, which
// compares `initial_permits` only.
impl Eq for ConfigurableSemaphore {}
1211 :
1212 : impl ConfigurableSemaphore {
1213 0 : pub fn inner(&self) -> &std::sync::Arc<tokio::sync::Semaphore> {
1214 0 : &self.inner
1215 0 : }
1216 : }
1217 :
1218 : #[cfg(test)]
1219 : mod tests {
1220 : use std::{fs, num::NonZeroU32};
1221 :
1222 : use camino_tempfile::{tempdir, Utf8TempDir};
1223 : use pageserver_api::models::EvictionPolicy;
1224 : use remote_storage::{RemoteStorageKind, S3Config};
1225 : use utils::serde_percent::Percent;
1226 :
1227 : use super::*;
1228 : use crate::DEFAULT_PG_VERSION;
1229 :
/// TOML fragment shared by the config parsing tests. It sets listen addresses,
/// timeouts, cache/descriptor limits, the superuser name, the node id, the metric
/// collection settings, the log format and the background task delay to
/// non-default-looking values; tests append further keys such as `pg_distrib_dir`.
const ALL_BASE_VALUES_TOML: &str = r#"
# Initial configuration file created by 'pageserver --init'

listen_pg_addr = '127.0.0.1:64000'
listen_http_addr = '127.0.0.1:9898'

wait_lsn_timeout = '111 s'
wal_redo_timeout = '111 s'

page_cache_size = 444
max_file_descriptors = 333

# initial superuser role name to use when creating a new tenant
initial_superuser_name = 'zzzz'
id = 10

metric_collection_interval = '222 s'
cached_metric_collection_interval = '22200 s'
metric_collection_endpoint = 'http://localhost:80/metrics'
synthetic_size_calculation_interval = '333 s'

log_format = 'json'
background_task_maximum_delay = '334 s'

"#;
1255 :
/// A config that sets only the mandatory keys must yield the built-in default for
/// every other field.
#[test]
fn parse_defaults() -> anyhow::Result<()> {
    let tempdir = tempdir()?;
    let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;
    let broker_endpoint = storage_broker::DEFAULT_ENDPOINT;
    // we have to create dummy values to overcome the validation errors
    let config_string = format!(
        "pg_distrib_dir='{pg_distrib_dir}'\nid=10\nbroker_endpoint = '{broker_endpoint}'",
    );
    let toml = config_string.parse()?;

    let parsed_config = match PageServerConf::parse_and_validate(&toml, &workdir) {
        Ok(conf) => conf,
        Err(e) => panic!("Failed to parse config '{config_string}', reason: {e:?}"),
    };

    let expected = PageServerConf {
        id: NodeId(10),
        listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
        listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
        availability_zone: None,
        wait_lsn_timeout: humantime::parse_duration(defaults::DEFAULT_WAIT_LSN_TIMEOUT)?,
        wal_redo_timeout: humantime::parse_duration(defaults::DEFAULT_WAL_REDO_TIMEOUT)?,
        superuser: defaults::DEFAULT_SUPERUSER.to_string(),
        page_cache_size: defaults::DEFAULT_PAGE_CACHE_SIZE,
        max_file_descriptors: defaults::DEFAULT_MAX_FILE_DESCRIPTORS,
        workdir,
        pg_distrib_dir,
        http_auth_type: AuthType::Trust,
        pg_auth_type: AuthType::Trust,
        auth_validation_public_key_path: None,
        remote_storage_config: None,
        default_tenant_conf: TenantConf::default(),
        broker_endpoint: storage_broker::DEFAULT_ENDPOINT.parse().unwrap(),
        broker_keepalive_interval: humantime::parse_duration(
            storage_broker::DEFAULT_KEEPALIVE_INTERVAL,
        )?,
        log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(),
        concurrent_tenant_warmup: ConfigurableSemaphore::new(
            NonZeroUsize::new(DEFAULT_CONCURRENT_TENANT_WARMUP).unwrap(),
        ),
        concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
        eviction_task_immitated_concurrent_logical_size_queries:
            ConfigurableSemaphore::default(),
        metric_collection_interval: humantime::parse_duration(
            defaults::DEFAULT_METRIC_COLLECTION_INTERVAL,
        )?,
        cached_metric_collection_interval: humantime::parse_duration(
            defaults::DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL,
        )?,
        metric_collection_endpoint: defaults::DEFAULT_METRIC_COLLECTION_ENDPOINT,
        metric_collection_bucket: None,
        synthetic_size_calculation_interval: humantime::parse_duration(
            defaults::DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL,
        )?,
        disk_usage_based_eviction: None,
        test_remote_failures: 0,
        ondemand_download_behavior_treat_error_as_warn: false,
        background_task_maximum_delay: humantime::parse_duration(
            defaults::DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY,
        )?,
        control_plane_api: None,
        control_plane_api_token: None,
        control_plane_emergency_mode: false,
        heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY,
        secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY,
        ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE,
        virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
        get_vectored_impl: defaults::DEFAULT_GET_VECTORED_IMPL.parse().unwrap(),
        get_impl: defaults::DEFAULT_GET_IMPL.parse().unwrap(),
        max_vectored_read_bytes: MaxVectoredReadBytes(
            NonZeroUsize::new(defaults::DEFAULT_MAX_VECTORED_READ_BYTES)
                .expect("Invalid default constant"),
        ),
        validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
        ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
    };

    assert_eq!(
        parsed_config, expected,
        "Correct defaults should be used when no config values are provided"
    );

    Ok(())
}
1338 :
/// Every value set in `ALL_BASE_VALUES_TOML` must show up in the parsed config, with
/// the remaining fields at the values listed below.
#[test]
fn parse_basic_config() -> anyhow::Result<()> {
    let tempdir = tempdir()?;
    let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;
    let broker_endpoint = storage_broker::DEFAULT_ENDPOINT;

    let config_string = format!(
        "{ALL_BASE_VALUES_TOML}pg_distrib_dir='{pg_distrib_dir}'\nbroker_endpoint = '{broker_endpoint}'",
    );
    let toml = config_string.parse()?;

    let parsed_config = match PageServerConf::parse_and_validate(&toml, &workdir) {
        Ok(conf) => conf,
        Err(e) => panic!("Failed to parse config '{config_string}', reason: {e:?}"),
    };

    let expected = PageServerConf {
        id: NodeId(10),
        listen_pg_addr: "127.0.0.1:64000".to_string(),
        listen_http_addr: "127.0.0.1:9898".to_string(),
        availability_zone: None,
        wait_lsn_timeout: Duration::from_secs(111),
        wal_redo_timeout: Duration::from_secs(111),
        superuser: "zzzz".to_string(),
        page_cache_size: 444,
        max_file_descriptors: 333,
        workdir,
        pg_distrib_dir,
        http_auth_type: AuthType::Trust,
        pg_auth_type: AuthType::Trust,
        auth_validation_public_key_path: None,
        remote_storage_config: None,
        default_tenant_conf: TenantConf::default(),
        broker_endpoint: storage_broker::DEFAULT_ENDPOINT.parse().unwrap(),
        broker_keepalive_interval: Duration::from_secs(5),
        log_format: LogFormat::Json,
        concurrent_tenant_warmup: ConfigurableSemaphore::new(
            NonZeroUsize::new(DEFAULT_CONCURRENT_TENANT_WARMUP).unwrap(),
        ),
        concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
        eviction_task_immitated_concurrent_logical_size_queries:
            ConfigurableSemaphore::default(),
        metric_collection_interval: Duration::from_secs(222),
        cached_metric_collection_interval: Duration::from_secs(22200),
        metric_collection_endpoint: Some(Url::parse("http://localhost:80/metrics")?),
        metric_collection_bucket: None,
        synthetic_size_calculation_interval: Duration::from_secs(333),
        disk_usage_based_eviction: None,
        test_remote_failures: 0,
        ondemand_download_behavior_treat_error_as_warn: false,
        background_task_maximum_delay: Duration::from_secs(334),
        control_plane_api: None,
        control_plane_api_token: None,
        control_plane_emergency_mode: false,
        heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY,
        secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY,
        ingest_batch_size: 100,
        virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
        get_vectored_impl: defaults::DEFAULT_GET_VECTORED_IMPL.parse().unwrap(),
        get_impl: defaults::DEFAULT_GET_IMPL.parse().unwrap(),
        max_vectored_read_bytes: MaxVectoredReadBytes(
            NonZeroUsize::new(defaults::DEFAULT_MAX_VECTORED_READ_BYTES)
                .expect("Invalid default constant"),
        ),
        validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
        ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
    };

    assert_eq!(
        parsed_config, expected,
        "Should be able to parse all basic config values correctly"
    );

    Ok(())
}
1411 :
/// The table form and the inline-table form of a local-FS `remote_storage` section
/// must both parse to the same `RemoteStorageConfig`.
#[test]
fn parse_remote_fs_storage_config() -> anyhow::Result<()> {
    let tempdir = tempdir()?;
    let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;
    let broker_endpoint = "http://127.0.0.1:7777";

    let local_storage_path = tempdir.path().join("local_remote_storage");

    let table_form = format!(
        r#"[remote_storage]
local_path = '{local_storage_path}'"#,
    );
    let inline_form = format!("remote_storage={{local_path='{local_storage_path}'}}");
    let identical_toml_declarations = &[table_form, inline_form];

    for remote_storage_config_str in identical_toml_declarations.iter() {
        let config_string = format!(
            r#"{ALL_BASE_VALUES_TOML}
pg_distrib_dir='{pg_distrib_dir}'
broker_endpoint = '{broker_endpoint}'

{remote_storage_config_str}"#,
        );

        let toml = config_string.parse()?;

        let parsed_config = match PageServerConf::parse_and_validate(&toml, &workdir) {
            Ok(conf) => conf,
            Err(e) => panic!("Failed to parse config '{config_string}', reason: {e:?}"),
        };
        let parsed_remote_storage_config = parsed_config
            .remote_storage_config
            .expect("Should have remote storage config for the local FS");

        let expected = RemoteStorageConfig {
            storage: RemoteStorageKind::LocalFs {
                local_path: local_storage_path.clone(),
            },
            timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
        };
        assert_eq!(
            parsed_remote_storage_config, expected,
            "Remote storage config should correctly parse the local FS config and fill other storage defaults"
        );
    }
    Ok(())
}
1457 :
/// The table form and the inline-table form of an S3 `remote_storage` section must
/// both parse to the same `RemoteStorageConfig`.
#[test]
fn parse_remote_s3_storage_config() -> anyhow::Result<()> {
    let tempdir = tempdir()?;
    let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;

    let bucket_name = "some-sample-bucket".to_string();
    let bucket_region = "eu-north-1".to_string();
    let prefix_in_bucket = "test_prefix".to_string();
    let endpoint = "http://localhost:5000".to_string();
    let max_concurrent_syncs = NonZeroUsize::new(111).unwrap();
    let max_sync_errors = NonZeroU32::new(222).unwrap();
    let s3_concurrency_limit = NonZeroUsize::new(333).unwrap();
    let broker_endpoint = "http://127.0.0.1:7777";

    let table_form = format!(
        r#"[remote_storage]
max_concurrent_syncs = {max_concurrent_syncs}
max_sync_errors = {max_sync_errors}
bucket_name = '{bucket_name}'
bucket_region = '{bucket_region}'
prefix_in_bucket = '{prefix_in_bucket}'
endpoint = '{endpoint}'
concurrency_limit = {s3_concurrency_limit}"#
    );
    let inline_form = format!(
        "remote_storage={{max_concurrent_syncs={max_concurrent_syncs}, max_sync_errors={max_sync_errors}, bucket_name='{bucket_name}',\
        bucket_region='{bucket_region}', prefix_in_bucket='{prefix_in_bucket}', endpoint='{endpoint}', concurrency_limit={s3_concurrency_limit}}}",
    );
    let identical_toml_declarations = &[table_form, inline_form];

    for remote_storage_config_str in identical_toml_declarations.iter() {
        let config_string = format!(
            r#"{ALL_BASE_VALUES_TOML}
pg_distrib_dir='{pg_distrib_dir}'
broker_endpoint = '{broker_endpoint}'

{remote_storage_config_str}"#,
        );

        let toml = config_string.parse()?;

        let parsed_config = match PageServerConf::parse_and_validate(&toml, &workdir) {
            Ok(conf) => conf,
            Err(e) => panic!("Failed to parse config '{config_string}', reason: {e:?}"),
        };
        let parsed_remote_storage_config = parsed_config
            .remote_storage_config
            .expect("Should have remote storage config for S3");

        let expected = RemoteStorageConfig {
            storage: RemoteStorageKind::AwsS3(S3Config {
                bucket_name: bucket_name.clone(),
                bucket_region: bucket_region.clone(),
                prefix_in_bucket: Some(prefix_in_bucket.clone()),
                endpoint: Some(endpoint.clone()),
                concurrency_limit: s3_concurrency_limit,
                max_keys_per_list_response: None,
                upload_storage_class: None,
            }),
            timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
        };
        assert_eq!(
            parsed_remote_storage_config, expected,
            "Remote storage config should correctly parse the S3 config"
        );
    }
    Ok(())
}
1526 :
/// A `[tenant_config]` section in the pageserver config must end up in
/// `default_tenant_conf`.
#[test]
fn parse_tenant_config() -> anyhow::Result<()> {
    let tempdir = tempdir()?;
    let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;

    let broker_endpoint = "http://127.0.0.1:7777";
    let trace_read_requests = true;

    let config_string = format!(
        r#"{ALL_BASE_VALUES_TOML}
pg_distrib_dir='{pg_distrib_dir}'
broker_endpoint = '{broker_endpoint}'

[tenant_config]
trace_read_requests = {trace_read_requests}"#,
    );

    let toml = config_string.parse()?;

    let conf = PageServerConf::parse_and_validate(&toml, &workdir)?;
    let parsed_flag = conf.default_tenant_conf.trace_read_requests;
    assert_eq!(
        parsed_flag, trace_read_requests,
        "Tenant config from pageserver config file should be parsed and udpated values used as defaults for all tenants",
    );

    Ok(())
}
1554 :
/// A negative `checkpoint_distance` must make the conversion of the `tenant_config`
/// TOML item into a `TenantConfOpt` fail, with the exact error text asserted below.
#[test]
fn parse_incorrect_tenant_config() -> anyhow::Result<()> {
    let raw_toml = r#"
[tenant_config]
checkpoint_distance = -1 # supposed to be an u64
"#;

    let document: Document = raw_toml.parse()?;
    let tenant_section = document.get("tenant_config").unwrap().to_owned();

    // The conversion is expected to fail; only the rendered message is compared.
    let message = TenantConfOpt::try_from(tenant_section)
        .unwrap_err()
        .to_string();
    assert_eq!(
        message,
        "checkpoint_distance: invalid value: integer `-1`, expected u64"
    );

    Ok(())
}
1572 :
/// An inline-table `tenant_config` entry converts into a `TenantConfOpt` whose
/// `min_resident_size_override` carries the configured value.
#[test]
fn parse_override_tenant_config() -> anyhow::Result<()> {
    let document: Document = r#"tenant_config={ min_resident_size_override = 400 }"#.parse()?;
    let tenant_section = document.get("tenant_config").unwrap().to_owned();

    let overrides = TenantConfOpt::try_from(tenant_section).unwrap();
    assert_eq!(overrides.min_resident_size_override, Some(400));

    Ok(())
}
1585 :
/// Parses a pageserver config containing a `[disk_usage_based_eviction]` section and a
/// `LayerAccessThreshold` tenant eviction policy, then compares each parsed field
/// against the value written in the TOML.
#[test]
fn eviction_pageserver_config_parse() -> anyhow::Result<()> {
    let tempdir = tempdir()?;
    let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;

    let pageserver_conf_toml = format!(
        r#"pg_distrib_dir = "{pg_distrib_dir}"
metric_collection_endpoint = "http://sample.url"
metric_collection_interval = "10min"
id = 222

[disk_usage_based_eviction]
max_usage_pct = 80
min_avail_bytes = 0
period = "10s"

[tenant_config]
evictions_low_residence_duration_metric_threshold = "20m"

[tenant_config.eviction_policy]
kind = "LayerAccessThreshold"
period = "20m"
threshold = "20m"
"#,
    );
    let document: Document = pageserver_conf_toml.parse()?;
    let parsed = PageServerConf::parse_and_validate(&document, &workdir)?;

    // "20m" appears three times in the config above; name the expected value once.
    let twenty_minutes = Duration::from_secs(20 * 60);

    assert_eq!(parsed.pg_distrib_dir, pg_distrib_dir);
    assert_eq!(
        parsed.metric_collection_endpoint,
        Some("http://sample.url".parse().unwrap())
    );
    assert_eq!(
        parsed.metric_collection_interval,
        Duration::from_secs(10 * 60)
    );
    assert_eq!(
        parsed
            .default_tenant_conf
            .evictions_low_residence_duration_metric_threshold,
        twenty_minutes
    );
    assert_eq!(parsed.id, NodeId(222));

    let expected_eviction = DiskUsageEvictionTaskConfig {
        max_usage_pct: Percent::new(80).unwrap(),
        min_avail_bytes: 0,
        period: Duration::from_secs(10),
        #[cfg(feature = "testing")]
        mock_statvfs: None,
        eviction_order: crate::disk_usage_eviction_task::EvictionOrder::AbsoluteAccessed,
    };
    assert_eq!(parsed.disk_usage_based_eviction, Some(expected_eviction));

    // Any variant other than the configured one is a test failure.
    let policy = &parsed.default_tenant_conf.eviction_policy;
    if let EvictionPolicy::LayerAccessThreshold(layer_access) = policy {
        assert_eq!(layer_access.period, twenty_minutes);
        assert_eq!(layer_access.threshold, twenty_minutes);
    } else {
        unreachable!(
            "Unexpected eviction policy tenant settings: {other:?}",
            other = policy
        );
    }

    Ok(())
}
1651 :
/// Parses a pageserver config whose tenant eviction policy has kind `OnlyImitiate`
/// and checks that both the period and the threshold come back as 20 minutes.
#[test]
fn parse_imitation_only_pageserver_config() {
    let tempdir = tempdir().unwrap();
    let (workdir, pg_distrib_dir) = prepare_fs(&tempdir).unwrap();

    let pageserver_conf_toml = format!(
        r#"pg_distrib_dir = "{pg_distrib_dir}"
metric_collection_endpoint = "http://sample.url"
metric_collection_interval = "10min"
id = 222

[tenant_config]
evictions_low_residence_duration_metric_threshold = "20m"

[tenant_config.eviction_policy]
kind = "OnlyImitiate"
period = "20m"
threshold = "20m"
"#,
    );
    let document: Document = pageserver_conf_toml.parse().unwrap();
    let parsed = PageServerConf::parse_and_validate(&document, &workdir).unwrap();

    let twenty_minutes = Duration::from_secs(20 * 60);

    // Any variant other than the configured one is a test failure.
    let policy = &parsed.default_tenant_conf.eviction_policy;
    if let EvictionPolicy::OnlyImitiate(imitation) = policy {
        assert_eq!(imitation.period, twenty_minutes);
        assert_eq!(imitation.threshold, twenty_minutes);
    } else {
        unreachable!(
            "Unexpected eviction policy tenant settings: {other:?}",
            other = policy
        );
    }
}
1683 :
1684 14 : fn prepare_fs(tempdir: &Utf8TempDir) -> anyhow::Result<(Utf8PathBuf, Utf8PathBuf)> {
1685 14 : let tempdir_path = tempdir.path();
1686 14 :
1687 14 : let workdir = tempdir_path.join("workdir");
1688 14 : fs::create_dir_all(&workdir)?;
1689 :
1690 14 : let pg_distrib_dir = tempdir_path.join("pg_distrib");
1691 14 : let pg_distrib_dir_versioned = pg_distrib_dir.join(format!("v{DEFAULT_PG_VERSION}"));
1692 14 : fs::create_dir_all(&pg_distrib_dir_versioned)?;
1693 14 : let postgres_bin_dir = pg_distrib_dir_versioned.join("bin");
1694 14 : fs::create_dir_all(&postgres_bin_dir)?;
1695 14 : fs::write(postgres_bin_dir.join("postgres"), "I'm postgres, trust me")?;
1696 :
1697 14 : Ok((workdir, pg_distrib_dir))
1698 14 : }
1699 : }
|