Line data Source code
1 : //! Functions for handling page server configuration options
2 : //!
3 : //! Configuration options can be set in the pageserver.toml configuration
4 : //! file, or on the command line.
//! See also `settings.md` for a more detailed description of every parameter.
6 :
7 : use anyhow::{anyhow, bail, ensure, Context, Result};
8 : use pageserver_api::shard::TenantShardId;
9 : use remote_storage::{RemotePath, RemoteStorageConfig};
10 : use serde::de::IntoDeserializer;
11 : use std::env;
12 : use storage_broker::Uri;
13 : use utils::crashsafe::path_with_suffix_extension;
14 : use utils::id::ConnectionId;
15 : use utils::logging::SecretString;
16 :
17 : use once_cell::sync::OnceCell;
18 : use reqwest::Url;
19 : use std::num::NonZeroUsize;
20 : use std::str::FromStr;
21 : use std::sync::Arc;
22 : use std::time::Duration;
23 : use toml_edit;
24 : use toml_edit::{Document, Item};
25 :
26 : use camino::{Utf8Path, Utf8PathBuf};
27 : use postgres_backend::AuthType;
28 : use utils::{
29 : id::{NodeId, TimelineId},
30 : logging::LogFormat,
31 : };
32 :
33 : use crate::disk_usage_eviction_task::DiskUsageEvictionTaskConfig;
34 : use crate::tenant::config::TenantConf;
35 : use crate::tenant::config::TenantConfOpt;
36 : use crate::tenant::timeline::GetVectoredImpl;
37 : use crate::tenant::vectored_blob_io::MaxVectoredReadBytes;
38 : use crate::tenant::{
39 : TENANTS_SEGMENT_NAME, TENANT_DELETED_MARKER_FILE_NAME, TIMELINES_SEGMENT_NAME,
40 : };
41 : use crate::virtual_file;
42 : use crate::{
43 : IGNORED_TENANT_FILE_NAME, TENANT_CONFIG_NAME, TENANT_HEATMAP_BASENAME,
44 : TENANT_LOCATION_CONFIG_NAME, TIMELINE_DELETE_MARK_SUFFIX, TIMELINE_UNINIT_MARK_SUFFIX,
45 : };
46 :
47 : use self::defaults::DEFAULT_CONCURRENT_TENANT_WARMUP;
48 :
49 : use self::defaults::DEFAULT_VIRTUAL_FILE_IO_ENGINE;
50 :
/// Built-in default values for the page server configuration.
///
/// The constants are used by the `Default` impl of the config builder in this file
/// and are interpolated into the commented-out template in [`DEFAULT_CONFIG_FILE`].
pub mod defaults {
    use crate::tenant::config::defaults::*;
    use const_format::formatcp;

    pub use pageserver_api::{
        DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
        DEFAULT_PG_LISTEN_PORT,
    };
    pub use storage_broker::DEFAULT_ENDPOINT as BROKER_DEFAULT_ENDPOINT;

    // Durations are kept as human-readable strings; the builder's `Default` impl
    // parses them with `humantime::parse_duration`.
    pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "60 s";
    pub const DEFAULT_WAL_REDO_TIMEOUT: &str = "60 s";

    /// Initial superuser role name to use when creating a new tenant.
    pub const DEFAULT_SUPERUSER: &str = "cloud_admin";

    pub const DEFAULT_PAGE_CACHE_SIZE: usize = 8192;
    pub const DEFAULT_MAX_FILE_DESCRIPTORS: usize = 100;

    pub const DEFAULT_LOG_FORMAT: &str = "plain";

    pub const DEFAULT_CONCURRENT_TENANT_WARMUP: usize = 8;

    /// Mirrors `ConfigurableSemaphore::DEFAULT_INITIAL` as a plain `usize`.
    pub const DEFAULT_CONCURRENT_TENANT_SIZE_LOGICAL_SIZE_QUERIES: usize =
        super::ConfigurableSemaphore::DEFAULT_INITIAL.get();

    pub const DEFAULT_METRIC_COLLECTION_INTERVAL: &str = "10 min";
    pub const DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL: &str = "0s";
    /// No metrics endpoint is configured by default.
    pub const DEFAULT_METRIC_COLLECTION_ENDPOINT: Option<reqwest::Url> = None;
    pub const DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL: &str = "10 min";
    pub const DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY: &str = "10s";

    pub const DEFAULT_HEATMAP_UPLOAD_CONCURRENCY: usize = 8;
    pub const DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY: usize = 1;

    pub const DEFAULT_INGEST_BATCH_SIZE: u64 = 100;

    // The two engine/impl selectors below are strings that the builder's `Default`
    // impl converts with `str::parse`.
    pub const DEFAULT_VIRTUAL_FILE_IO_ENGINE: &str = "std-fs";

    pub const DEFAULT_GET_VECTORED_IMPL: &str = "sequential";

    pub const DEFAULT_MAX_VECTORED_READ_BYTES: usize = 128 * 1024; // 128 KiB

    pub const DEFAULT_VALIDATE_VECTORED_GET: bool = true;

    /// Default built-in configuration file.
    ///
    /// Every setting is emitted as a commented-out line showing its default value;
    /// only the `[tenant_config]` and `[remote_storage]` section headers are active.
    /// The text is assembled at compile time with `formatcp!`, so literal braces
    /// inside it are written doubled (`{{ .. }}`).
    pub const DEFAULT_CONFIG_FILE: &str = formatcp!(
        r#"
# Initial configuration file created by 'pageserver --init'
#listen_pg_addr = '{DEFAULT_PG_LISTEN_ADDR}'
#listen_http_addr = '{DEFAULT_HTTP_LISTEN_ADDR}'

#wait_lsn_timeout = '{DEFAULT_WAIT_LSN_TIMEOUT}'
#wal_redo_timeout = '{DEFAULT_WAL_REDO_TIMEOUT}'

#page_cache_size = {DEFAULT_PAGE_CACHE_SIZE}
#max_file_descriptors = {DEFAULT_MAX_FILE_DESCRIPTORS}

# initial superuser role name to use when creating a new tenant
#initial_superuser_name = '{DEFAULT_SUPERUSER}'

#broker_endpoint = '{BROKER_DEFAULT_ENDPOINT}'

#log_format = '{DEFAULT_LOG_FORMAT}'

#concurrent_tenant_size_logical_size_queries = '{DEFAULT_CONCURRENT_TENANT_SIZE_LOGICAL_SIZE_QUERIES}'
#concurrent_tenant_warmup = '{DEFAULT_CONCURRENT_TENANT_WARMUP}'

#metric_collection_interval = '{DEFAULT_METRIC_COLLECTION_INTERVAL}'
#cached_metric_collection_interval = '{DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL}'
#synthetic_size_calculation_interval = '{DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL}'

#disk_usage_based_eviction = {{ max_usage_pct = .., min_avail_bytes = .., period = "10s"}}

#background_task_maximum_delay = '{DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY}'

#ingest_batch_size = {DEFAULT_INGEST_BATCH_SIZE}

#virtual_file_io_engine = '{DEFAULT_VIRTUAL_FILE_IO_ENGINE}'

#get_vectored_impl = '{DEFAULT_GET_VECTORED_IMPL}'

#max_vectored_read_bytes = '{DEFAULT_MAX_VECTORED_READ_BYTES}'

#validate_vectored_get = '{DEFAULT_VALIDATE_VECTORED_GET}'

[tenant_config]
#checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes
#checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT}
#compaction_target_size = {DEFAULT_COMPACTION_TARGET_SIZE} # in bytes
#compaction_period = '{DEFAULT_COMPACTION_PERIOD}'
#compaction_threshold = {DEFAULT_COMPACTION_THRESHOLD}

#gc_period = '{DEFAULT_GC_PERIOD}'
#gc_horizon = {DEFAULT_GC_HORIZON}
#image_creation_threshold = {DEFAULT_IMAGE_CREATION_THRESHOLD}
#pitr_interval = '{DEFAULT_PITR_INTERVAL}'

#min_resident_size_override = .. # in bytes
#evictions_low_residence_duration_metric_threshold = '{DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD}'

#heatmap_upload_concurrency = {DEFAULT_HEATMAP_UPLOAD_CONCURRENCY}
#secondary_download_concurrency = {DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY}

[remote_storage]

"#
    );
}
161 :
/// Fully resolved page server configuration.
///
/// Instances are produced by `PageServerConfigBuilder::build`; the path helper
/// methods on this type derive on-disk locations from `workdir`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PageServerConf {
    /// Identifier of this particular pageserver, so that e.g. safekeepers
    /// can safely distinguish different pageservers.
    pub id: NodeId,

    /// Example (default): 127.0.0.1:64000
    pub listen_pg_addr: String,
    /// Example (default): 127.0.0.1:9898
    pub listen_http_addr: String,

    /// Current availability zone. Used for traffic metrics.
    pub availability_zone: Option<String>,

    /// Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call.
    pub wait_lsn_timeout: Duration,
    /// How long to wait for WAL redo to complete.
    pub wal_redo_timeout: Duration,

    /// Initial superuser role name to use when creating a new tenant.
    pub superuser: String,

    pub page_cache_size: usize,
    pub max_file_descriptors: usize,

    /// Repository directory, relative to current working directory.
    /// Normally, the page server changes the current working directory
    /// to the repository, and 'workdir' is always '.'. But we don't do
    /// that during unit testing, because the current directory is global
    /// to the process but different unit tests work on different
    /// repositories.
    pub workdir: Utf8PathBuf,

    pub pg_distrib_dir: Utf8PathBuf,

    // Authentication
    /// Authentication method for the HTTP mgmt API.
    pub http_auth_type: AuthType,
    /// Authentication method for libpq connections from compute.
    pub pg_auth_type: AuthType,
    /// Path to a file or directory containing public key(s) for verifying JWT tokens.
    /// Used for both mgmt and compute auth, if enabled.
    pub auth_validation_public_key_path: Option<Utf8PathBuf>,

    pub remote_storage_config: Option<RemoteStorageConfig>,

    pub default_tenant_conf: TenantConf,

    /// Storage broker endpoints to connect to.
    pub broker_endpoint: Uri,
    pub broker_keepalive_interval: Duration,

    pub log_format: LogFormat,

    /// Number of tenants which will be concurrently loaded from remote storage proactively on startup,
    /// does not limit tenants loaded in response to client I/O. A lower value implicitly deprioritizes
    /// loading such tenants, vs. other work in the system.
    pub concurrent_tenant_warmup: ConfigurableSemaphore,

    /// Number of concurrent [`Tenant::gather_size_inputs`](crate::tenant::Tenant::gather_size_inputs) allowed.
    pub concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore,
    /// Limit of concurrent [`Tenant::gather_size_inputs`] issued by module `eviction_task`.
    /// The number of permits is the same as `concurrent_tenant_size_logical_size_queries`.
    /// See the comment in `eviction_task` for details.
    ///
    /// [`Tenant::gather_size_inputs`]: crate::tenant::Tenant::gather_size_inputs
    pub eviction_task_immitated_concurrent_logical_size_queries: ConfigurableSemaphore,

    /// How often to collect metrics and send them to the metrics endpoint.
    pub metric_collection_interval: Duration,
    /// How often to send unchanged cached metrics to the metrics endpoint.
    pub cached_metric_collection_interval: Duration,
    pub metric_collection_endpoint: Option<Url>,
    pub synthetic_size_calculation_interval: Duration,

    pub disk_usage_based_eviction: Option<DiskUsageEvictionTaskConfig>,

    pub test_remote_failures: u64,

    pub ondemand_download_behavior_treat_error_as_warn: bool,

    /// How long will background tasks be delayed at most after initial load of tenants.
    ///
    /// Our largest initialization completions are in the range of 100-200s, so perhaps 10s works
    /// as we now isolate initial loading, initial logical size calculation and background tasks.
    /// Smaller nodes will have background tasks "not running" for this long unless every timeline
    /// has its initial logical size calculated. Not running background tasks for some seconds is
    /// not terrible.
    pub background_task_maximum_delay: Duration,

    pub control_plane_api: Option<Url>,

    /// JWT token for use with the control plane API.
    pub control_plane_api_token: Option<SecretString>,

    /// If true, pageserver will make best-effort to operate without a control plane: only
    /// for use in major incidents.
    pub control_plane_emergency_mode: bool,

    /// How many heatmap uploads may be done concurrently: lower values implicitly deprioritize
    /// heatmap uploads vs. other remote storage operations.
    pub heatmap_upload_concurrency: usize,

    /// How many remote storage downloads may be done for secondary tenants concurrently. Implicitly
    /// deprioritises secondary downloads vs. remote storage operations for attached tenants.
    pub secondary_download_concurrency: usize,

    /// Maximum number of WAL records to be ingested and committed at the same time.
    pub ingest_batch_size: u64,

    pub virtual_file_io_engine: virtual_file::IoEngineKind,

    pub get_vectored_impl: GetVectoredImpl,

    pub max_vectored_read_bytes: MaxVectoredReadBytes,

    pub validate_vectored_get: bool,
}
279 :
/// Token used to authenticate against safekeepers, set at most once.
///
/// We do not want to store this in a `PageServerConf` because the latter may be logged
/// and/or serialized at a whim, while the token is secret. Currently this token is the
/// same for accessing all tenants/timelines, but it may become per-tenant/per-timeline in
/// the future, and more tokens and auth may arrive for the storage broker, completely
/// changing the logic. Hence, we resort to a global variable for now instead of passing
/// the token from the startup code to the connection code through a dozen layers.
pub static SAFEKEEPER_AUTH_TOKEN: OnceCell<Arc<String>> = OnceCell::new();
287 :
// A dedicated enum (rather than `Option`) makes the builder's intent explicit
// and avoids confusion when the wrapped type is itself an `Option`.
pub enum BuilderValue<T> {
    Set(T),
    NotSet,
}

impl<T> BuilderValue<T> {
    /// Converts the builder slot into a `Result`.
    ///
    /// Returns `Ok(value)` for `Set(value)` and `Err(err)` for `NotSet`.
    pub fn ok_or<E>(self, err: E) -> Result<T, E> {
        if let Self::Set(value) = self {
            return Ok(value);
        }
        Err(err)
    }
}
303 :
// Needed to simplify config construction.
//
// Staging area for `PageServerConf`: every field is a `BuilderValue`, so `build`
// can report which value was never provided. The `Default` impl pre-populates
// every field except `id`, which starts out as `NotSet`.
struct PageServerConfigBuilder {
    listen_pg_addr: BuilderValue<String>,

    listen_http_addr: BuilderValue<String>,

    availability_zone: BuilderValue<Option<String>>,

    wait_lsn_timeout: BuilderValue<Duration>,
    wal_redo_timeout: BuilderValue<Duration>,

    superuser: BuilderValue<String>,

    page_cache_size: BuilderValue<usize>,
    max_file_descriptors: BuilderValue<usize>,

    workdir: BuilderValue<Utf8PathBuf>,

    pg_distrib_dir: BuilderValue<Utf8PathBuf>,

    http_auth_type: BuilderValue<AuthType>,
    pg_auth_type: BuilderValue<AuthType>,

    // Values that are themselves optional: `Set(None)` means "explicitly absent".
    auth_validation_public_key_path: BuilderValue<Option<Utf8PathBuf>>,
    remote_storage_config: BuilderValue<Option<RemoteStorageConfig>>,

    id: BuilderValue<NodeId>,

    broker_endpoint: BuilderValue<Uri>,
    broker_keepalive_interval: BuilderValue<Duration>,

    log_format: BuilderValue<LogFormat>,

    // Permit counts; `build` wraps them in `ConfigurableSemaphore`.
    concurrent_tenant_warmup: BuilderValue<NonZeroUsize>,
    concurrent_tenant_size_logical_size_queries: BuilderValue<NonZeroUsize>,

    metric_collection_interval: BuilderValue<Duration>,
    cached_metric_collection_interval: BuilderValue<Duration>,
    metric_collection_endpoint: BuilderValue<Option<Url>>,
    synthetic_size_calculation_interval: BuilderValue<Duration>,

    disk_usage_based_eviction: BuilderValue<Option<DiskUsageEvictionTaskConfig>>,

    test_remote_failures: BuilderValue<u64>,

    ondemand_download_behavior_treat_error_as_warn: BuilderValue<bool>,

    background_task_maximum_delay: BuilderValue<Duration>,

    control_plane_api: BuilderValue<Option<Url>>,
    control_plane_api_token: BuilderValue<Option<SecretString>>,
    control_plane_emergency_mode: BuilderValue<bool>,

    heatmap_upload_concurrency: BuilderValue<usize>,
    secondary_download_concurrency: BuilderValue<usize>,

    ingest_batch_size: BuilderValue<u64>,

    virtual_file_io_engine: BuilderValue<virtual_file::IoEngineKind>,

    get_vectored_impl: BuilderValue<GetVectoredImpl>,

    max_vectored_read_bytes: BuilderValue<MaxVectoredReadBytes>,

    validate_vectored_get: BuilderValue<bool>,
}
371 :
372 : impl Default for PageServerConfigBuilder {
373 18 : fn default() -> Self {
374 18 : use self::BuilderValue::*;
375 18 : use defaults::*;
376 18 : Self {
377 18 : listen_pg_addr: Set(DEFAULT_PG_LISTEN_ADDR.to_string()),
378 18 : listen_http_addr: Set(DEFAULT_HTTP_LISTEN_ADDR.to_string()),
379 18 : availability_zone: Set(None),
380 18 : wait_lsn_timeout: Set(humantime::parse_duration(DEFAULT_WAIT_LSN_TIMEOUT)
381 18 : .expect("cannot parse default wait lsn timeout")),
382 18 : wal_redo_timeout: Set(humantime::parse_duration(DEFAULT_WAL_REDO_TIMEOUT)
383 18 : .expect("cannot parse default wal redo timeout")),
384 18 : superuser: Set(DEFAULT_SUPERUSER.to_string()),
385 18 : page_cache_size: Set(DEFAULT_PAGE_CACHE_SIZE),
386 18 : max_file_descriptors: Set(DEFAULT_MAX_FILE_DESCRIPTORS),
387 18 : workdir: Set(Utf8PathBuf::new()),
388 18 : pg_distrib_dir: Set(Utf8PathBuf::from_path_buf(
389 18 : env::current_dir().expect("cannot access current directory"),
390 18 : )
391 18 : .expect("non-Unicode path")
392 18 : .join("pg_install")),
393 18 : http_auth_type: Set(AuthType::Trust),
394 18 : pg_auth_type: Set(AuthType::Trust),
395 18 : auth_validation_public_key_path: Set(None),
396 18 : remote_storage_config: Set(None),
397 18 : id: NotSet,
398 18 : broker_endpoint: Set(storage_broker::DEFAULT_ENDPOINT
399 18 : .parse()
400 18 : .expect("failed to parse default broker endpoint")),
401 18 : broker_keepalive_interval: Set(humantime::parse_duration(
402 18 : storage_broker::DEFAULT_KEEPALIVE_INTERVAL,
403 18 : )
404 18 : .expect("cannot parse default keepalive interval")),
405 18 : log_format: Set(LogFormat::from_str(DEFAULT_LOG_FORMAT).unwrap()),
406 18 :
407 18 : concurrent_tenant_warmup: Set(NonZeroUsize::new(DEFAULT_CONCURRENT_TENANT_WARMUP)
408 18 : .expect("Invalid default constant")),
409 18 : concurrent_tenant_size_logical_size_queries: Set(
410 18 : ConfigurableSemaphore::DEFAULT_INITIAL,
411 18 : ),
412 18 : metric_collection_interval: Set(humantime::parse_duration(
413 18 : DEFAULT_METRIC_COLLECTION_INTERVAL,
414 18 : )
415 18 : .expect("cannot parse default metric collection interval")),
416 18 : cached_metric_collection_interval: Set(humantime::parse_duration(
417 18 : DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL,
418 18 : )
419 18 : .expect("cannot parse default cached_metric_collection_interval")),
420 18 : synthetic_size_calculation_interval: Set(humantime::parse_duration(
421 18 : DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL,
422 18 : )
423 18 : .expect("cannot parse default synthetic size calculation interval")),
424 18 : metric_collection_endpoint: Set(DEFAULT_METRIC_COLLECTION_ENDPOINT),
425 18 :
426 18 : disk_usage_based_eviction: Set(None),
427 18 :
428 18 : test_remote_failures: Set(0),
429 18 :
430 18 : ondemand_download_behavior_treat_error_as_warn: Set(false),
431 18 :
432 18 : background_task_maximum_delay: Set(humantime::parse_duration(
433 18 : DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY,
434 18 : )
435 18 : .unwrap()),
436 18 :
437 18 : control_plane_api: Set(None),
438 18 : control_plane_api_token: Set(None),
439 18 : control_plane_emergency_mode: Set(false),
440 18 :
441 18 : heatmap_upload_concurrency: Set(DEFAULT_HEATMAP_UPLOAD_CONCURRENCY),
442 18 : secondary_download_concurrency: Set(DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY),
443 18 :
444 18 : ingest_batch_size: Set(DEFAULT_INGEST_BATCH_SIZE),
445 18 :
446 18 : virtual_file_io_engine: Set(DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap()),
447 18 :
448 18 : get_vectored_impl: Set(DEFAULT_GET_VECTORED_IMPL.parse().unwrap()),
449 18 : max_vectored_read_bytes: Set(MaxVectoredReadBytes(
450 18 : NonZeroUsize::new(DEFAULT_MAX_VECTORED_READ_BYTES).unwrap(),
451 18 : )),
452 18 : validate_vectored_get: Set(DEFAULT_VALIDATE_VECTORED_GET),
453 18 : }
454 18 : }
455 : }
456 :
457 : impl PageServerConfigBuilder {
458 12 : pub fn listen_pg_addr(&mut self, listen_pg_addr: String) {
459 12 : self.listen_pg_addr = BuilderValue::Set(listen_pg_addr)
460 12 : }
461 :
462 12 : pub fn listen_http_addr(&mut self, listen_http_addr: String) {
463 12 : self.listen_http_addr = BuilderValue::Set(listen_http_addr)
464 12 : }
465 :
466 0 : pub fn availability_zone(&mut self, availability_zone: Option<String>) {
467 0 : self.availability_zone = BuilderValue::Set(availability_zone)
468 0 : }
469 :
470 12 : pub fn wait_lsn_timeout(&mut self, wait_lsn_timeout: Duration) {
471 12 : self.wait_lsn_timeout = BuilderValue::Set(wait_lsn_timeout)
472 12 : }
473 :
474 12 : pub fn wal_redo_timeout(&mut self, wal_redo_timeout: Duration) {
475 12 : self.wal_redo_timeout = BuilderValue::Set(wal_redo_timeout)
476 12 : }
477 :
478 12 : pub fn superuser(&mut self, superuser: String) {
479 12 : self.superuser = BuilderValue::Set(superuser)
480 12 : }
481 :
482 12 : pub fn page_cache_size(&mut self, page_cache_size: usize) {
483 12 : self.page_cache_size = BuilderValue::Set(page_cache_size)
484 12 : }
485 :
486 12 : pub fn max_file_descriptors(&mut self, max_file_descriptors: usize) {
487 12 : self.max_file_descriptors = BuilderValue::Set(max_file_descriptors)
488 12 : }
489 :
490 18 : pub fn workdir(&mut self, workdir: Utf8PathBuf) {
491 18 : self.workdir = BuilderValue::Set(workdir)
492 18 : }
493 :
494 18 : pub fn pg_distrib_dir(&mut self, pg_distrib_dir: Utf8PathBuf) {
495 18 : self.pg_distrib_dir = BuilderValue::Set(pg_distrib_dir)
496 18 : }
497 :
498 0 : pub fn http_auth_type(&mut self, auth_type: AuthType) {
499 0 : self.http_auth_type = BuilderValue::Set(auth_type)
500 0 : }
501 :
502 0 : pub fn pg_auth_type(&mut self, auth_type: AuthType) {
503 0 : self.pg_auth_type = BuilderValue::Set(auth_type)
504 0 : }
505 :
506 0 : pub fn auth_validation_public_key_path(
507 0 : &mut self,
508 0 : auth_validation_public_key_path: Option<Utf8PathBuf>,
509 0 : ) {
510 0 : self.auth_validation_public_key_path = BuilderValue::Set(auth_validation_public_key_path)
511 0 : }
512 :
513 8 : pub fn remote_storage_config(&mut self, remote_storage_config: Option<RemoteStorageConfig>) {
514 8 : self.remote_storage_config = BuilderValue::Set(remote_storage_config)
515 8 : }
516 :
517 14 : pub fn broker_endpoint(&mut self, broker_endpoint: Uri) {
518 14 : self.broker_endpoint = BuilderValue::Set(broker_endpoint)
519 14 : }
520 :
521 0 : pub fn broker_keepalive_interval(&mut self, broker_keepalive_interval: Duration) {
522 0 : self.broker_keepalive_interval = BuilderValue::Set(broker_keepalive_interval)
523 0 : }
524 :
525 18 : pub fn id(&mut self, node_id: NodeId) {
526 18 : self.id = BuilderValue::Set(node_id)
527 18 : }
528 :
529 12 : pub fn log_format(&mut self, log_format: LogFormat) {
530 12 : self.log_format = BuilderValue::Set(log_format)
531 12 : }
532 :
533 0 : pub fn concurrent_tenant_warmup(&mut self, u: NonZeroUsize) {
534 0 : self.concurrent_tenant_warmup = BuilderValue::Set(u);
535 0 : }
536 :
537 0 : pub fn concurrent_tenant_size_logical_size_queries(&mut self, u: NonZeroUsize) {
538 0 : self.concurrent_tenant_size_logical_size_queries = BuilderValue::Set(u);
539 0 : }
540 :
541 16 : pub fn metric_collection_interval(&mut self, metric_collection_interval: Duration) {
542 16 : self.metric_collection_interval = BuilderValue::Set(metric_collection_interval)
543 16 : }
544 :
545 12 : pub fn cached_metric_collection_interval(
546 12 : &mut self,
547 12 : cached_metric_collection_interval: Duration,
548 12 : ) {
549 12 : self.cached_metric_collection_interval =
550 12 : BuilderValue::Set(cached_metric_collection_interval)
551 12 : }
552 :
553 16 : pub fn metric_collection_endpoint(&mut self, metric_collection_endpoint: Option<Url>) {
554 16 : self.metric_collection_endpoint = BuilderValue::Set(metric_collection_endpoint)
555 16 : }
556 :
557 12 : pub fn synthetic_size_calculation_interval(
558 12 : &mut self,
559 12 : synthetic_size_calculation_interval: Duration,
560 12 : ) {
561 12 : self.synthetic_size_calculation_interval =
562 12 : BuilderValue::Set(synthetic_size_calculation_interval)
563 12 : }
564 :
565 0 : pub fn test_remote_failures(&mut self, fail_first: u64) {
566 0 : self.test_remote_failures = BuilderValue::Set(fail_first);
567 0 : }
568 :
569 2 : pub fn disk_usage_based_eviction(&mut self, value: Option<DiskUsageEvictionTaskConfig>) {
570 2 : self.disk_usage_based_eviction = BuilderValue::Set(value);
571 2 : }
572 :
573 0 : pub fn ondemand_download_behavior_treat_error_as_warn(
574 0 : &mut self,
575 0 : ondemand_download_behavior_treat_error_as_warn: bool,
576 0 : ) {
577 0 : self.ondemand_download_behavior_treat_error_as_warn =
578 0 : BuilderValue::Set(ondemand_download_behavior_treat_error_as_warn);
579 0 : }
580 :
581 12 : pub fn background_task_maximum_delay(&mut self, delay: Duration) {
582 12 : self.background_task_maximum_delay = BuilderValue::Set(delay);
583 12 : }
584 :
585 0 : pub fn control_plane_api(&mut self, api: Option<Url>) {
586 0 : self.control_plane_api = BuilderValue::Set(api)
587 0 : }
588 :
589 0 : pub fn control_plane_api_token(&mut self, token: Option<SecretString>) {
590 0 : self.control_plane_api_token = BuilderValue::Set(token)
591 0 : }
592 :
593 0 : pub fn control_plane_emergency_mode(&mut self, enabled: bool) {
594 0 : self.control_plane_emergency_mode = BuilderValue::Set(enabled)
595 0 : }
596 :
597 0 : pub fn heatmap_upload_concurrency(&mut self, value: usize) {
598 0 : self.heatmap_upload_concurrency = BuilderValue::Set(value)
599 0 : }
600 :
601 0 : pub fn secondary_download_concurrency(&mut self, value: usize) {
602 0 : self.secondary_download_concurrency = BuilderValue::Set(value)
603 0 : }
604 :
605 0 : pub fn ingest_batch_size(&mut self, ingest_batch_size: u64) {
606 0 : self.ingest_batch_size = BuilderValue::Set(ingest_batch_size)
607 0 : }
608 :
609 0 : pub fn virtual_file_io_engine(&mut self, value: virtual_file::IoEngineKind) {
610 0 : self.virtual_file_io_engine = BuilderValue::Set(value);
611 0 : }
612 :
613 0 : pub fn get_vectored_impl(&mut self, value: GetVectoredImpl) {
614 0 : self.get_vectored_impl = BuilderValue::Set(value);
615 0 : }
616 :
617 0 : pub fn get_max_vectored_read_bytes(&mut self, value: MaxVectoredReadBytes) {
618 0 : self.max_vectored_read_bytes = BuilderValue::Set(value);
619 0 : }
620 :
621 0 : pub fn get_validate_vectored_get(&mut self, value: bool) {
622 0 : self.validate_vectored_get = BuilderValue::Set(value);
623 0 : }
624 :
625 18 : pub fn build(self) -> anyhow::Result<PageServerConf> {
626 18 : let concurrent_tenant_warmup = self
627 18 : .concurrent_tenant_warmup
628 18 : .ok_or(anyhow!("missing concurrent_tenant_warmup"))?;
629 18 : let concurrent_tenant_size_logical_size_queries = self
630 18 : .concurrent_tenant_size_logical_size_queries
631 18 : .ok_or(anyhow!(
632 18 : "missing concurrent_tenant_size_logical_size_queries"
633 18 : ))?;
634 : Ok(PageServerConf {
635 18 : listen_pg_addr: self
636 18 : .listen_pg_addr
637 18 : .ok_or(anyhow!("missing listen_pg_addr"))?,
638 18 : listen_http_addr: self
639 18 : .listen_http_addr
640 18 : .ok_or(anyhow!("missing listen_http_addr"))?,
641 18 : availability_zone: self
642 18 : .availability_zone
643 18 : .ok_or(anyhow!("missing availability_zone"))?,
644 18 : wait_lsn_timeout: self
645 18 : .wait_lsn_timeout
646 18 : .ok_or(anyhow!("missing wait_lsn_timeout"))?,
647 18 : wal_redo_timeout: self
648 18 : .wal_redo_timeout
649 18 : .ok_or(anyhow!("missing wal_redo_timeout"))?,
650 18 : superuser: self.superuser.ok_or(anyhow!("missing superuser"))?,
651 18 : page_cache_size: self
652 18 : .page_cache_size
653 18 : .ok_or(anyhow!("missing page_cache_size"))?,
654 18 : max_file_descriptors: self
655 18 : .max_file_descriptors
656 18 : .ok_or(anyhow!("missing max_file_descriptors"))?,
657 18 : workdir: self.workdir.ok_or(anyhow!("missing workdir"))?,
658 18 : pg_distrib_dir: self
659 18 : .pg_distrib_dir
660 18 : .ok_or(anyhow!("missing pg_distrib_dir"))?,
661 18 : http_auth_type: self
662 18 : .http_auth_type
663 18 : .ok_or(anyhow!("missing http_auth_type"))?,
664 18 : pg_auth_type: self.pg_auth_type.ok_or(anyhow!("missing pg_auth_type"))?,
665 18 : auth_validation_public_key_path: self
666 18 : .auth_validation_public_key_path
667 18 : .ok_or(anyhow!("missing auth_validation_public_key_path"))?,
668 18 : remote_storage_config: self
669 18 : .remote_storage_config
670 18 : .ok_or(anyhow!("missing remote_storage_config"))?,
671 18 : id: self.id.ok_or(anyhow!("missing id"))?,
672 : // TenantConf is handled separately
673 18 : default_tenant_conf: TenantConf::default(),
674 18 : broker_endpoint: self
675 18 : .broker_endpoint
676 18 : .ok_or(anyhow!("No broker endpoints provided"))?,
677 18 : broker_keepalive_interval: self
678 18 : .broker_keepalive_interval
679 18 : .ok_or(anyhow!("No broker keepalive interval provided"))?,
680 18 : log_format: self.log_format.ok_or(anyhow!("missing log_format"))?,
681 18 : concurrent_tenant_warmup: ConfigurableSemaphore::new(concurrent_tenant_warmup),
682 18 : concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::new(
683 18 : concurrent_tenant_size_logical_size_queries,
684 18 : ),
685 18 : eviction_task_immitated_concurrent_logical_size_queries: ConfigurableSemaphore::new(
686 18 : concurrent_tenant_size_logical_size_queries,
687 18 : ),
688 18 : metric_collection_interval: self
689 18 : .metric_collection_interval
690 18 : .ok_or(anyhow!("missing metric_collection_interval"))?,
691 18 : cached_metric_collection_interval: self
692 18 : .cached_metric_collection_interval
693 18 : .ok_or(anyhow!("missing cached_metric_collection_interval"))?,
694 18 : metric_collection_endpoint: self
695 18 : .metric_collection_endpoint
696 18 : .ok_or(anyhow!("missing metric_collection_endpoint"))?,
697 18 : synthetic_size_calculation_interval: self
698 18 : .synthetic_size_calculation_interval
699 18 : .ok_or(anyhow!("missing synthetic_size_calculation_interval"))?,
700 18 : disk_usage_based_eviction: self
701 18 : .disk_usage_based_eviction
702 18 : .ok_or(anyhow!("missing disk_usage_based_eviction"))?,
703 18 : test_remote_failures: self
704 18 : .test_remote_failures
705 18 : .ok_or(anyhow!("missing test_remote_failuers"))?,
706 18 : ondemand_download_behavior_treat_error_as_warn: self
707 18 : .ondemand_download_behavior_treat_error_as_warn
708 18 : .ok_or(anyhow!(
709 18 : "missing ondemand_download_behavior_treat_error_as_warn"
710 18 : ))?,
711 18 : background_task_maximum_delay: self
712 18 : .background_task_maximum_delay
713 18 : .ok_or(anyhow!("missing background_task_maximum_delay"))?,
714 18 : control_plane_api: self
715 18 : .control_plane_api
716 18 : .ok_or(anyhow!("missing control_plane_api"))?,
717 18 : control_plane_api_token: self
718 18 : .control_plane_api_token
719 18 : .ok_or(anyhow!("missing control_plane_api_token"))?,
720 18 : control_plane_emergency_mode: self
721 18 : .control_plane_emergency_mode
722 18 : .ok_or(anyhow!("missing control_plane_emergency_mode"))?,
723 18 : heatmap_upload_concurrency: self
724 18 : .heatmap_upload_concurrency
725 18 : .ok_or(anyhow!("missing heatmap_upload_concurrency"))?,
726 18 : secondary_download_concurrency: self
727 18 : .secondary_download_concurrency
728 18 : .ok_or(anyhow!("missing secondary_download_concurrency"))?,
729 18 : ingest_batch_size: self
730 18 : .ingest_batch_size
731 18 : .ok_or(anyhow!("missing ingest_batch_size"))?,
732 18 : virtual_file_io_engine: self
733 18 : .virtual_file_io_engine
734 18 : .ok_or(anyhow!("missing virtual_file_io_engine"))?,
735 18 : get_vectored_impl: self
736 18 : .get_vectored_impl
737 18 : .ok_or(anyhow!("missing get_vectored_impl"))?,
738 18 : max_vectored_read_bytes: self
739 18 : .max_vectored_read_bytes
740 18 : .ok_or(anyhow!("missing max_vectored_read_bytes"))?,
741 18 : validate_vectored_get: self
742 18 : .validate_vectored_get
743 18 : .ok_or(anyhow!("missing validate_vectored_get"))?,
744 : })
745 18 : }
746 : }
747 :
748 : impl PageServerConf {
749 : //
750 : // Repository paths, relative to workdir.
751 : //
752 :
753 3220 : pub fn tenants_path(&self) -> Utf8PathBuf {
754 3220 : self.workdir.join(TENANTS_SEGMENT_NAME)
755 3220 : }
756 :
757 78 : pub fn deletion_prefix(&self) -> Utf8PathBuf {
758 78 : self.workdir.join("deletion")
759 78 : }
760 :
761 30 : pub fn deletion_list_path(&self, sequence: u64) -> Utf8PathBuf {
762 30 : // Encode a version in the filename, so that if we ever switch away from JSON we can
763 30 : // increment this.
764 30 : const VERSION: u8 = 1;
765 30 :
766 30 : self.deletion_prefix()
767 30 : .join(format!("{sequence:016x}-{VERSION:02x}.list"))
768 30 : }
769 :
770 26 : pub fn deletion_header_path(&self) -> Utf8PathBuf {
771 26 : // Encode a version in the filename, so that if we ever switch away from JSON we can
772 26 : // increment this.
773 26 : const VERSION: u8 = 1;
774 26 :
775 26 : self.deletion_prefix().join(format!("header-{VERSION:02x}"))
776 26 : }
777 :
778 3220 : pub fn tenant_path(&self, tenant_shard_id: &TenantShardId) -> Utf8PathBuf {
779 3220 : self.tenants_path().join(tenant_shard_id.to_string())
780 3220 : }
781 :
782 0 : pub fn tenant_ignore_mark_file_path(&self, tenant_shard_id: &TenantShardId) -> Utf8PathBuf {
783 0 : self.tenant_path(tenant_shard_id)
784 0 : .join(IGNORED_TENANT_FILE_NAME)
785 0 : }
786 :
787 : /// Points to a place in pageserver's local directory,
788 : /// where certain tenant's tenantconf file should be located.
789 : ///
790 : /// Legacy: superseded by tenant_location_config_path. Eventually
791 : /// remove this function.
792 0 : pub fn tenant_config_path(&self, tenant_shard_id: &TenantShardId) -> Utf8PathBuf {
793 0 : self.tenant_path(tenant_shard_id).join(TENANT_CONFIG_NAME)
794 0 : }
795 :
796 0 : pub fn tenant_location_config_path(&self, tenant_shard_id: &TenantShardId) -> Utf8PathBuf {
797 0 : self.tenant_path(tenant_shard_id)
798 0 : .join(TENANT_LOCATION_CONFIG_NAME)
799 0 : }
800 :
801 0 : pub(crate) fn tenant_heatmap_path(&self, tenant_shard_id: &TenantShardId) -> Utf8PathBuf {
802 0 : self.tenant_path(tenant_shard_id)
803 0 : .join(TENANT_HEATMAP_BASENAME)
804 0 : }
805 :
806 3127 : pub fn timelines_path(&self, tenant_shard_id: &TenantShardId) -> Utf8PathBuf {
807 3127 : self.tenant_path(tenant_shard_id)
808 3127 : .join(TIMELINES_SEGMENT_NAME)
809 3127 : }
810 :
811 2946 : pub fn timeline_path(
812 2946 : &self,
813 2946 : tenant_shard_id: &TenantShardId,
814 2946 : timeline_id: &TimelineId,
815 2946 : ) -> Utf8PathBuf {
816 2946 : self.timelines_path(tenant_shard_id)
817 2946 : .join(timeline_id.to_string())
818 2946 : }
819 :
820 298 : pub fn timeline_uninit_mark_file_path(
821 298 : &self,
822 298 : tenant_shard_id: TenantShardId,
823 298 : timeline_id: TimelineId,
824 298 : ) -> Utf8PathBuf {
825 298 : path_with_suffix_extension(
826 298 : self.timeline_path(&tenant_shard_id, &timeline_id),
827 298 : TIMELINE_UNINIT_MARK_SUFFIX,
828 298 : )
829 298 : }
830 :
831 0 : pub fn timeline_delete_mark_file_path(
832 0 : &self,
833 0 : tenant_shard_id: TenantShardId,
834 0 : timeline_id: TimelineId,
835 0 : ) -> Utf8PathBuf {
836 0 : path_with_suffix_extension(
837 0 : self.timeline_path(&tenant_shard_id, &timeline_id),
838 0 : TIMELINE_DELETE_MARK_SUFFIX,
839 0 : )
840 0 : }
841 :
842 0 : pub fn tenant_deleted_mark_file_path(&self, tenant_shard_id: &TenantShardId) -> Utf8PathBuf {
843 0 : self.tenant_path(tenant_shard_id)
844 0 : .join(TENANT_DELETED_MARKER_FILE_NAME)
845 0 : }
846 :
847 0 : pub fn traces_path(&self) -> Utf8PathBuf {
848 0 : self.workdir.join("traces")
849 0 : }
850 :
851 0 : pub fn trace_path(
852 0 : &self,
853 0 : tenant_shard_id: &TenantShardId,
854 0 : timeline_id: &TimelineId,
855 0 : connection_id: &ConnectionId,
856 0 : ) -> Utf8PathBuf {
857 0 : self.traces_path()
858 0 : .join(tenant_shard_id.to_string())
859 0 : .join(timeline_id.to_string())
860 0 : .join(connection_id.to_string())
861 0 : }
862 :
863 : /// Turns storage remote path of a file into its local path.
864 0 : pub fn local_path(&self, remote_path: &RemotePath) -> Utf8PathBuf {
865 0 : remote_path.with_base(&self.workdir)
866 0 : }
867 :
868 : //
869 : // Postgres distribution paths
870 : //
871 16 : pub fn pg_distrib_dir(&self, pg_version: u32) -> anyhow::Result<Utf8PathBuf> {
872 16 : let path = self.pg_distrib_dir.clone();
873 16 :
874 16 : #[allow(clippy::manual_range_patterns)]
875 16 : match pg_version {
876 16 : 14 | 15 | 16 => Ok(path.join(format!("v{pg_version}"))),
877 0 : _ => bail!("Unsupported postgres version: {}", pg_version),
878 : }
879 16 : }
880 :
881 8 : pub fn pg_bin_dir(&self, pg_version: u32) -> anyhow::Result<Utf8PathBuf> {
882 8 : Ok(self.pg_distrib_dir(pg_version)?.join("bin"))
883 8 : }
884 8 : pub fn pg_lib_dir(&self, pg_version: u32) -> anyhow::Result<Utf8PathBuf> {
885 8 : Ok(self.pg_distrib_dir(pg_version)?.join("lib"))
886 8 : }
887 :
888 : /// Parse a configuration file (pageserver.toml) into a PageServerConf struct,
889 : /// validating the input and failing on errors.
890 : ///
891 : /// This leaves any options not present in the file in the built-in defaults.
892 18 : pub fn parse_and_validate(toml: &Document, workdir: &Utf8Path) -> anyhow::Result<Self> {
893 18 : let mut builder = PageServerConfigBuilder::default();
894 18 : builder.workdir(workdir.to_owned());
895 18 :
896 18 : let mut t_conf = TenantConfOpt::default();
897 :
898 230 : for (key, item) in toml.iter() {
899 230 : match key {
900 230 : "listen_pg_addr" => builder.listen_pg_addr(parse_toml_string(key, item)?),
901 218 : "listen_http_addr" => builder.listen_http_addr(parse_toml_string(key, item)?),
902 206 : "availability_zone" => builder.availability_zone(Some(parse_toml_string(key, item)?)),
903 206 : "wait_lsn_timeout" => builder.wait_lsn_timeout(parse_toml_duration(key, item)?),
904 194 : "wal_redo_timeout" => builder.wal_redo_timeout(parse_toml_duration(key, item)?),
905 182 : "initial_superuser_name" => builder.superuser(parse_toml_string(key, item)?),
906 170 : "page_cache_size" => builder.page_cache_size(parse_toml_u64(key, item)? as usize),
907 158 : "max_file_descriptors" => {
908 12 : builder.max_file_descriptors(parse_toml_u64(key, item)? as usize)
909 : }
910 146 : "pg_distrib_dir" => {
911 18 : builder.pg_distrib_dir(Utf8PathBuf::from(parse_toml_string(key, item)?))
912 : }
913 128 : "auth_validation_public_key_path" => builder.auth_validation_public_key_path(Some(
914 0 : Utf8PathBuf::from(parse_toml_string(key, item)?),
915 : )),
916 128 : "http_auth_type" => builder.http_auth_type(parse_toml_from_str(key, item)?),
917 128 : "pg_auth_type" => builder.pg_auth_type(parse_toml_from_str(key, item)?),
918 128 : "remote_storage" => {
919 8 : builder.remote_storage_config(RemoteStorageConfig::from_toml(item)?)
920 : }
921 120 : "tenant_config" => {
922 6 : t_conf = TenantConfOpt::try_from(item.to_owned()).context(format!("failed to parse: '{key}'"))?;
923 : }
924 114 : "id" => builder.id(NodeId(parse_toml_u64(key, item)?)),
925 96 : "broker_endpoint" => builder.broker_endpoint(parse_toml_string(key, item)?.parse().context("failed to parse broker endpoint")?),
926 82 : "broker_keepalive_interval" => builder.broker_keepalive_interval(parse_toml_duration(key, item)?),
927 82 : "log_format" => builder.log_format(
928 12 : LogFormat::from_config(&parse_toml_string(key, item)?)?
929 : ),
930 70 : "concurrent_tenant_warmup" => builder.concurrent_tenant_warmup({
931 0 : let input = parse_toml_string(key, item)?;
932 0 : let permits = input.parse::<usize>().context("expected a number of initial permits, not {s:?}")?;
933 0 : NonZeroUsize::new(permits).context("initial semaphore permits out of range: 0, use other configuration to disable a feature")?
934 : }),
935 70 : "concurrent_tenant_size_logical_size_queries" => builder.concurrent_tenant_size_logical_size_queries({
936 0 : let input = parse_toml_string(key, item)?;
937 0 : let permits = input.parse::<usize>().context("expected a number of initial permits, not {s:?}")?;
938 0 : NonZeroUsize::new(permits).context("initial semaphore permits out of range: 0, use other configuration to disable a feature")?
939 : }),
940 70 : "metric_collection_interval" => builder.metric_collection_interval(parse_toml_duration(key, item)?),
941 54 : "cached_metric_collection_interval" => builder.cached_metric_collection_interval(parse_toml_duration(key, item)?),
942 42 : "metric_collection_endpoint" => {
943 16 : let endpoint = parse_toml_string(key, item)?.parse().context("failed to parse metric_collection_endpoint")?;
944 16 : builder.metric_collection_endpoint(Some(endpoint));
945 : },
946 26 : "synthetic_size_calculation_interval" =>
947 12 : builder.synthetic_size_calculation_interval(parse_toml_duration(key, item)?),
948 14 : "test_remote_failures" => builder.test_remote_failures(parse_toml_u64(key, item)?),
949 14 : "disk_usage_based_eviction" => {
950 2 : tracing::info!("disk_usage_based_eviction: {:#?}", &item);
951 2 : builder.disk_usage_based_eviction(
952 2 : deserialize_from_item("disk_usage_based_eviction", item)
953 2 : .context("parse disk_usage_based_eviction")?
954 : )
955 : },
956 12 : "ondemand_download_behavior_treat_error_as_warn" => builder.ondemand_download_behavior_treat_error_as_warn(parse_toml_bool(key, item)?),
957 12 : "background_task_maximum_delay" => builder.background_task_maximum_delay(parse_toml_duration(key, item)?),
958 0 : "control_plane_api" => {
959 0 : let parsed = parse_toml_string(key, item)?;
960 0 : if parsed.is_empty() {
961 0 : builder.control_plane_api(None)
962 : } else {
963 0 : builder.control_plane_api(Some(parsed.parse().context("failed to parse control plane URL")?))
964 : }
965 : },
966 0 : "control_plane_api_token" => {
967 0 : let parsed = parse_toml_string(key, item)?;
968 0 : if parsed.is_empty() {
969 0 : builder.control_plane_api_token(None)
970 : } else {
971 0 : builder.control_plane_api_token(Some(parsed.into()))
972 : }
973 : },
974 0 : "control_plane_emergency_mode" => {
975 0 : builder.control_plane_emergency_mode(parse_toml_bool(key, item)?)
976 : },
977 0 : "heatmap_upload_concurrency" => {
978 0 : builder.heatmap_upload_concurrency(parse_toml_u64(key, item)? as usize)
979 : },
980 0 : "secondary_download_concurrency" => {
981 0 : builder.secondary_download_concurrency(parse_toml_u64(key, item)? as usize)
982 : },
983 0 : "ingest_batch_size" => builder.ingest_batch_size(parse_toml_u64(key, item)?),
984 0 : "virtual_file_io_engine" => {
985 0 : builder.virtual_file_io_engine(parse_toml_from_str("virtual_file_io_engine", item)?)
986 : }
987 0 : "get_vectored_impl" => {
988 0 : builder.get_vectored_impl(parse_toml_from_str("get_vectored_impl", item)?)
989 : }
990 0 : "max_vectored_read_bytes" => {
991 0 : let bytes = parse_toml_u64("max_vectored_read_bytes", item)? as usize;
992 0 : builder.get_max_vectored_read_bytes(
993 0 : MaxVectoredReadBytes(
994 0 : NonZeroUsize::new(bytes).expect("Max byte size of vectored read must be greater than 0")))
995 : }
996 0 : "validate_vectored_get" => {
997 0 : builder.get_validate_vectored_get(parse_toml_bool("validate_vectored_get", item)?)
998 : }
999 0 : _ => bail!("unrecognized pageserver option '{key}'"),
1000 : }
1001 : }
1002 :
1003 18 : let mut conf = builder.build().context("invalid config")?;
1004 :
1005 18 : if conf.http_auth_type == AuthType::NeonJWT || conf.pg_auth_type == AuthType::NeonJWT {
1006 0 : let auth_validation_public_key_path = conf
1007 0 : .auth_validation_public_key_path
1008 0 : .get_or_insert_with(|| workdir.join("auth_public_key.pem"));
1009 0 : ensure!(
1010 0 : auth_validation_public_key_path.exists(),
1011 0 : format!(
1012 0 : "Can't find auth_validation_public_key at '{auth_validation_public_key_path}'",
1013 0 : )
1014 : );
1015 18 : }
1016 :
1017 18 : conf.default_tenant_conf = t_conf.merge(TenantConf::default());
1018 18 :
1019 18 : Ok(conf)
1020 18 : }
1021 :
1022 : #[cfg(test)]
1023 101 : pub fn test_repo_dir(test_name: &str) -> Utf8PathBuf {
1024 101 : let test_output_dir = std::env::var("TEST_OUTPUT").unwrap_or("../tmp_check".into());
1025 101 : Utf8PathBuf::from(format!("{test_output_dir}/test_{test_name}"))
1026 101 : }
1027 :
1028 97 : pub fn dummy_conf(repo_dir: Utf8PathBuf) -> Self {
1029 97 : let pg_distrib_dir = Utf8PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../pg_install");
1030 97 :
1031 97 : PageServerConf {
1032 97 : id: NodeId(0),
1033 97 : wait_lsn_timeout: Duration::from_secs(60),
1034 97 : wal_redo_timeout: Duration::from_secs(60),
1035 97 : page_cache_size: defaults::DEFAULT_PAGE_CACHE_SIZE,
1036 97 : max_file_descriptors: defaults::DEFAULT_MAX_FILE_DESCRIPTORS,
1037 97 : listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
1038 97 : listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
1039 97 : availability_zone: None,
1040 97 : superuser: "cloud_admin".to_string(),
1041 97 : workdir: repo_dir,
1042 97 : pg_distrib_dir,
1043 97 : http_auth_type: AuthType::Trust,
1044 97 : pg_auth_type: AuthType::Trust,
1045 97 : auth_validation_public_key_path: None,
1046 97 : remote_storage_config: None,
1047 97 : default_tenant_conf: TenantConf::default(),
1048 97 : broker_endpoint: storage_broker::DEFAULT_ENDPOINT.parse().unwrap(),
1049 97 : broker_keepalive_interval: Duration::from_secs(5000),
1050 97 : log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(),
1051 97 : concurrent_tenant_warmup: ConfigurableSemaphore::new(
1052 97 : NonZeroUsize::new(DEFAULT_CONCURRENT_TENANT_WARMUP)
1053 97 : .expect("Invalid default constant"),
1054 97 : ),
1055 97 : concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
1056 97 : eviction_task_immitated_concurrent_logical_size_queries: ConfigurableSemaphore::default(
1057 97 : ),
1058 97 : metric_collection_interval: Duration::from_secs(60),
1059 97 : cached_metric_collection_interval: Duration::from_secs(60 * 60),
1060 97 : metric_collection_endpoint: defaults::DEFAULT_METRIC_COLLECTION_ENDPOINT,
1061 97 : synthetic_size_calculation_interval: Duration::from_secs(60),
1062 97 : disk_usage_based_eviction: None,
1063 97 : test_remote_failures: 0,
1064 97 : ondemand_download_behavior_treat_error_as_warn: false,
1065 97 : background_task_maximum_delay: Duration::ZERO,
1066 97 : control_plane_api: None,
1067 97 : control_plane_api_token: None,
1068 97 : control_plane_emergency_mode: false,
1069 97 : heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY,
1070 97 : secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY,
1071 97 : ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE,
1072 97 : virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
1073 97 : get_vectored_impl: defaults::DEFAULT_GET_VECTORED_IMPL.parse().unwrap(),
1074 97 : max_vectored_read_bytes: MaxVectoredReadBytes(
1075 97 : NonZeroUsize::new(defaults::DEFAULT_MAX_VECTORED_READ_BYTES)
1076 97 : .expect("Invalid default constant"),
1077 97 : ),
1078 97 : validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
1079 97 : }
1080 97 : }
1081 : }
1082 :
1083 : // Helper functions to parse a toml Item
1084 :
1085 96 : fn parse_toml_string(name: &str, item: &Item) -> Result<String> {
1086 96 : let s = item
1087 96 : .as_str()
1088 96 : .with_context(|| format!("configure option {name} is not a string"))?;
1089 96 : Ok(s.to_string())
1090 96 : }
1091 :
1092 42 : fn parse_toml_u64(name: &str, item: &Item) -> Result<u64> {
1093 : // A toml integer is signed, so it cannot represent the full range of an u64. That's OK
1094 : // for our use, though.
1095 42 : let i: i64 = item
1096 42 : .as_integer()
1097 42 : .with_context(|| format!("configure option {name} is not an integer"))?;
1098 42 : if i < 0 {
1099 0 : bail!("configure option {name} cannot be negative");
1100 42 : }
1101 42 : Ok(i as u64)
1102 42 : }
1103 :
1104 0 : fn parse_toml_bool(name: &str, item: &Item) -> Result<bool> {
1105 0 : item.as_bool()
1106 0 : .with_context(|| format!("configure option {name} is not a bool"))
1107 0 : }
1108 :
1109 76 : fn parse_toml_duration(name: &str, item: &Item) -> Result<Duration> {
1110 76 : let s = item
1111 76 : .as_str()
1112 76 : .with_context(|| format!("configure option {name} is not a string"))?;
1113 :
1114 76 : Ok(humantime::parse_duration(s)?)
1115 76 : }
1116 :
1117 0 : fn parse_toml_from_str<T>(name: &str, item: &Item) -> anyhow::Result<T>
1118 0 : where
1119 0 : T: FromStr,
1120 0 : <T as FromStr>::Err: std::fmt::Display,
1121 0 : {
1122 0 : let v = item
1123 0 : .as_str()
1124 0 : .with_context(|| format!("configure option {name} is not a string"))?;
1125 0 : T::from_str(v).map_err(|e| {
1126 0 : anyhow!(
1127 0 : "Failed to parse string as {parse_type} for configure option {name}: {e}",
1128 0 : parse_type = stringify!(T)
1129 0 : )
1130 0 : })
1131 0 : }
1132 :
1133 2 : fn deserialize_from_item<T>(name: &str, item: &Item) -> anyhow::Result<T>
1134 2 : where
1135 2 : T: serde::de::DeserializeOwned,
1136 2 : {
1137 : // ValueDeserializer::new is not public, so use the ValueDeserializer's documented way
1138 2 : let deserializer = match item.clone().into_value() {
1139 2 : Ok(value) => value.into_deserializer(),
1140 0 : Err(item) => anyhow::bail!("toml_edit::Item '{item}' is not a toml_edit::Value"),
1141 : };
1142 2 : T::deserialize(deserializer).with_context(|| format!("deserializing item for node {name}"))
1143 2 : }
1144 :
1145 : /// Configurable semaphore permits setting.
1146 : ///
1147 : /// Does not allow semaphore permits to be zero, because at runtime initially zero permits and empty
1148 : /// semaphore cannot be distinguished, leading any feature using these to await forever (or until
1149 : /// new permits are added).
1150 0 : #[derive(Debug, Clone)]
1151 : pub struct ConfigurableSemaphore {
1152 : initial_permits: NonZeroUsize,
1153 : inner: std::sync::Arc<tokio::sync::Semaphore>,
1154 : }
1155 :
1156 : impl ConfigurableSemaphore {
1157 : pub const DEFAULT_INITIAL: NonZeroUsize = match NonZeroUsize::new(1) {
1158 : Some(x) => x,
1159 : None => panic!("const unwrap is not yet stable"),
1160 : };
1161 :
1162 : /// Initializse using a non-zero amount of permits.
1163 : ///
1164 : /// Require a non-zero initial permits, because using permits == 0 is a crude way to disable a
1165 : /// feature such as [`Tenant::gather_size_inputs`]. Otherwise any semaphore using future will
1166 : /// behave like [`futures::future::pending`], just waiting until new permits are added.
1167 : ///
1168 : /// [`Tenant::gather_size_inputs`]: crate::tenant::Tenant::gather_size_inputs
1169 357 : pub fn new(initial_permits: NonZeroUsize) -> Self {
1170 357 : ConfigurableSemaphore {
1171 357 : initial_permits,
1172 357 : inner: std::sync::Arc::new(tokio::sync::Semaphore::new(initial_permits.get())),
1173 357 : }
1174 357 : }
1175 :
1176 : /// Returns the configured amount of permits.
1177 0 : pub fn initial_permits(&self) -> NonZeroUsize {
1178 0 : self.initial_permits
1179 0 : }
1180 : }
1181 :
1182 : impl Default for ConfigurableSemaphore {
1183 202 : fn default() -> Self {
1184 202 : Self::new(Self::DEFAULT_INITIAL)
1185 202 : }
1186 : }
1187 :
1188 : impl PartialEq for ConfigurableSemaphore {
1189 12 : fn eq(&self, other: &Self) -> bool {
1190 12 : // the number of permits can be increased at runtime, so we cannot really fulfill the
1191 12 : // PartialEq value equality otherwise
1192 12 : self.initial_permits == other.initial_permits
1193 12 : }
1194 : }
1195 :
1196 : impl Eq for ConfigurableSemaphore {}
1197 :
1198 : impl ConfigurableSemaphore {
1199 0 : pub fn inner(&self) -> &std::sync::Arc<tokio::sync::Semaphore> {
1200 0 : &self.inner
1201 0 : }
1202 : }
1203 :
1204 : #[cfg(test)]
1205 : mod tests {
1206 : use std::{
1207 : fs,
1208 : num::{NonZeroU32, NonZeroUsize},
1209 : };
1210 :
1211 : use camino_tempfile::{tempdir, Utf8TempDir};
1212 : use pageserver_api::models::EvictionPolicy;
1213 : use remote_storage::{RemoteStorageKind, S3Config};
1214 : use utils::serde_percent::Percent;
1215 :
1216 : use super::*;
1217 : use crate::DEFAULT_PG_VERSION;
1218 :
1219 : const ALL_BASE_VALUES_TOML: &str = r#"
1220 : # Initial configuration file created by 'pageserver --init'
1221 :
1222 : listen_pg_addr = '127.0.0.1:64000'
1223 : listen_http_addr = '127.0.0.1:9898'
1224 :
1225 : wait_lsn_timeout = '111 s'
1226 : wal_redo_timeout = '111 s'
1227 :
1228 : page_cache_size = 444
1229 : max_file_descriptors = 333
1230 :
1231 : # initial superuser role name to use when creating a new tenant
1232 : initial_superuser_name = 'zzzz'
1233 : id = 10
1234 :
1235 : metric_collection_interval = '222 s'
1236 : cached_metric_collection_interval = '22200 s'
1237 : metric_collection_endpoint = 'http://localhost:80/metrics'
1238 : synthetic_size_calculation_interval = '333 s'
1239 :
1240 : log_format = 'json'
1241 : background_task_maximum_delay = '334 s'
1242 :
1243 : "#;
1244 :
1245 2 : #[test]
1246 2 : fn parse_defaults() -> anyhow::Result<()> {
1247 2 : let tempdir = tempdir()?;
1248 2 : let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;
1249 2 : let broker_endpoint = storage_broker::DEFAULT_ENDPOINT;
1250 2 : // we have to create dummy values to overcome the validation errors
1251 2 : let config_string = format!(
1252 2 : "pg_distrib_dir='{pg_distrib_dir}'\nid=10\nbroker_endpoint = '{broker_endpoint}'",
1253 2 : );
1254 2 : let toml = config_string.parse()?;
1255 :
1256 2 : let parsed_config = PageServerConf::parse_and_validate(&toml, &workdir)
1257 2 : .unwrap_or_else(|e| panic!("Failed to parse config '{config_string}', reason: {e:?}"));
1258 2 :
1259 2 : assert_eq!(
1260 2 : parsed_config,
1261 2 : PageServerConf {
1262 2 : id: NodeId(10),
1263 2 : listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
1264 2 : listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
1265 2 : availability_zone: None,
1266 2 : wait_lsn_timeout: humantime::parse_duration(defaults::DEFAULT_WAIT_LSN_TIMEOUT)?,
1267 2 : wal_redo_timeout: humantime::parse_duration(defaults::DEFAULT_WAL_REDO_TIMEOUT)?,
1268 2 : superuser: defaults::DEFAULT_SUPERUSER.to_string(),
1269 2 : page_cache_size: defaults::DEFAULT_PAGE_CACHE_SIZE,
1270 2 : max_file_descriptors: defaults::DEFAULT_MAX_FILE_DESCRIPTORS,
1271 2 : workdir,
1272 2 : pg_distrib_dir,
1273 2 : http_auth_type: AuthType::Trust,
1274 2 : pg_auth_type: AuthType::Trust,
1275 2 : auth_validation_public_key_path: None,
1276 2 : remote_storage_config: None,
1277 2 : default_tenant_conf: TenantConf::default(),
1278 2 : broker_endpoint: storage_broker::DEFAULT_ENDPOINT.parse().unwrap(),
1279 2 : broker_keepalive_interval: humantime::parse_duration(
1280 2 : storage_broker::DEFAULT_KEEPALIVE_INTERVAL
1281 2 : )?,
1282 2 : log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(),
1283 2 : concurrent_tenant_warmup: ConfigurableSemaphore::new(
1284 2 : NonZeroUsize::new(DEFAULT_CONCURRENT_TENANT_WARMUP).unwrap()
1285 2 : ),
1286 2 : concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
1287 2 : eviction_task_immitated_concurrent_logical_size_queries:
1288 2 : ConfigurableSemaphore::default(),
1289 2 : metric_collection_interval: humantime::parse_duration(
1290 2 : defaults::DEFAULT_METRIC_COLLECTION_INTERVAL
1291 2 : )?,
1292 2 : cached_metric_collection_interval: humantime::parse_duration(
1293 2 : defaults::DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL
1294 2 : )?,
1295 2 : metric_collection_endpoint: defaults::DEFAULT_METRIC_COLLECTION_ENDPOINT,
1296 2 : synthetic_size_calculation_interval: humantime::parse_duration(
1297 2 : defaults::DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL
1298 2 : )?,
1299 2 : disk_usage_based_eviction: None,
1300 2 : test_remote_failures: 0,
1301 2 : ondemand_download_behavior_treat_error_as_warn: false,
1302 2 : background_task_maximum_delay: humantime::parse_duration(
1303 2 : defaults::DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY
1304 2 : )?,
1305 2 : control_plane_api: None,
1306 2 : control_plane_api_token: None,
1307 2 : control_plane_emergency_mode: false,
1308 2 : heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY,
1309 2 : secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY,
1310 2 : ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE,
1311 2 : virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
1312 2 : get_vectored_impl: defaults::DEFAULT_GET_VECTORED_IMPL.parse().unwrap(),
1313 2 : max_vectored_read_bytes: MaxVectoredReadBytes(
1314 2 : NonZeroUsize::new(defaults::DEFAULT_MAX_VECTORED_READ_BYTES)
1315 2 : .expect("Invalid default constant")
1316 2 : ),
1317 : validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
1318 : },
1319 0 : "Correct defaults should be used when no config values are provided"
1320 : );
1321 :
1322 2 : Ok(())
1323 2 : }
1324 :
1325 2 : #[test]
1326 2 : fn parse_basic_config() -> anyhow::Result<()> {
1327 2 : let tempdir = tempdir()?;
1328 2 : let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;
1329 2 : let broker_endpoint = storage_broker::DEFAULT_ENDPOINT;
1330 2 :
1331 2 : let config_string = format!(
1332 2 : "{ALL_BASE_VALUES_TOML}pg_distrib_dir='{pg_distrib_dir}'\nbroker_endpoint = '{broker_endpoint}'",
1333 2 : );
1334 2 : let toml = config_string.parse()?;
1335 :
1336 2 : let parsed_config = PageServerConf::parse_and_validate(&toml, &workdir)
1337 2 : .unwrap_or_else(|e| panic!("Failed to parse config '{config_string}', reason: {e:?}"));
1338 2 :
1339 2 : assert_eq!(
1340 2 : parsed_config,
1341 2 : PageServerConf {
1342 2 : id: NodeId(10),
1343 2 : listen_pg_addr: "127.0.0.1:64000".to_string(),
1344 2 : listen_http_addr: "127.0.0.1:9898".to_string(),
1345 2 : availability_zone: None,
1346 2 : wait_lsn_timeout: Duration::from_secs(111),
1347 2 : wal_redo_timeout: Duration::from_secs(111),
1348 2 : superuser: "zzzz".to_string(),
1349 2 : page_cache_size: 444,
1350 2 : max_file_descriptors: 333,
1351 2 : workdir,
1352 2 : pg_distrib_dir,
1353 2 : http_auth_type: AuthType::Trust,
1354 2 : pg_auth_type: AuthType::Trust,
1355 2 : auth_validation_public_key_path: None,
1356 2 : remote_storage_config: None,
1357 2 : default_tenant_conf: TenantConf::default(),
1358 2 : broker_endpoint: storage_broker::DEFAULT_ENDPOINT.parse().unwrap(),
1359 2 : broker_keepalive_interval: Duration::from_secs(5),
1360 2 : log_format: LogFormat::Json,
1361 2 : concurrent_tenant_warmup: ConfigurableSemaphore::new(
1362 2 : NonZeroUsize::new(DEFAULT_CONCURRENT_TENANT_WARMUP).unwrap()
1363 2 : ),
1364 2 : concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
1365 2 : eviction_task_immitated_concurrent_logical_size_queries:
1366 2 : ConfigurableSemaphore::default(),
1367 2 : metric_collection_interval: Duration::from_secs(222),
1368 2 : cached_metric_collection_interval: Duration::from_secs(22200),
1369 2 : metric_collection_endpoint: Some(Url::parse("http://localhost:80/metrics")?),
1370 2 : synthetic_size_calculation_interval: Duration::from_secs(333),
1371 2 : disk_usage_based_eviction: None,
1372 2 : test_remote_failures: 0,
1373 2 : ondemand_download_behavior_treat_error_as_warn: false,
1374 2 : background_task_maximum_delay: Duration::from_secs(334),
1375 2 : control_plane_api: None,
1376 2 : control_plane_api_token: None,
1377 2 : control_plane_emergency_mode: false,
1378 2 : heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY,
1379 2 : secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY,
1380 2 : ingest_batch_size: 100,
1381 2 : virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
1382 2 : get_vectored_impl: defaults::DEFAULT_GET_VECTORED_IMPL.parse().unwrap(),
1383 2 : max_vectored_read_bytes: MaxVectoredReadBytes(
1384 2 : NonZeroUsize::new(defaults::DEFAULT_MAX_VECTORED_READ_BYTES)
1385 2 : .expect("Invalid default constant")
1386 2 : ),
1387 : validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
1388 : },
1389 0 : "Should be able to parse all basic config values correctly"
1390 : );
1391 :
1392 2 : Ok(())
1393 2 : }
1394 :
1395 2 : #[test]
1396 2 : fn parse_remote_fs_storage_config() -> anyhow::Result<()> {
1397 2 : let tempdir = tempdir()?;
1398 2 : let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;
1399 2 : let broker_endpoint = "http://127.0.0.1:7777";
1400 2 :
1401 2 : let local_storage_path = tempdir.path().join("local_remote_storage");
1402 2 :
1403 2 : let identical_toml_declarations = &[
1404 2 : format!(
1405 2 : r#"[remote_storage]
1406 2 : local_path = '{local_storage_path}'"#,
1407 2 : ),
1408 2 : format!("remote_storage={{local_path='{local_storage_path}'}}"),
1409 2 : ];
1410 :
1411 6 : for remote_storage_config_str in identical_toml_declarations {
1412 4 : let config_string = format!(
1413 4 : r#"{ALL_BASE_VALUES_TOML}
1414 4 : pg_distrib_dir='{pg_distrib_dir}'
1415 4 : broker_endpoint = '{broker_endpoint}'
1416 4 :
1417 4 : {remote_storage_config_str}"#,
1418 4 : );
1419 :
1420 4 : let toml = config_string.parse()?;
1421 :
1422 4 : let parsed_remote_storage_config = PageServerConf::parse_and_validate(&toml, &workdir)
1423 4 : .unwrap_or_else(|e| {
1424 0 : panic!("Failed to parse config '{config_string}', reason: {e:?}")
1425 4 : })
1426 4 : .remote_storage_config
1427 4 : .expect("Should have remote storage config for the local FS");
1428 4 :
1429 4 : assert_eq!(
1430 4 : parsed_remote_storage_config,
1431 4 : RemoteStorageConfig {
1432 4 : storage: RemoteStorageKind::LocalFs(local_storage_path.clone()),
1433 4 : timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
1434 4 : },
1435 0 : "Remote storage config should correctly parse the local FS config and fill other storage defaults"
1436 : );
1437 : }
1438 2 : Ok(())
1439 2 : }
1440 :
1441 2 : #[test]
1442 2 : fn parse_remote_s3_storage_config() -> anyhow::Result<()> {
1443 2 : let tempdir = tempdir()?;
1444 2 : let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;
1445 :
1446 2 : let bucket_name = "some-sample-bucket".to_string();
1447 2 : let bucket_region = "eu-north-1".to_string();
1448 2 : let prefix_in_bucket = "test_prefix".to_string();
1449 2 : let endpoint = "http://localhost:5000".to_string();
1450 2 : let max_concurrent_syncs = NonZeroUsize::new(111).unwrap();
1451 2 : let max_sync_errors = NonZeroU32::new(222).unwrap();
1452 2 : let s3_concurrency_limit = NonZeroUsize::new(333).unwrap();
1453 2 : let broker_endpoint = "http://127.0.0.1:7777";
1454 2 :
1455 2 : let identical_toml_declarations = &[
1456 2 : format!(
1457 2 : r#"[remote_storage]
1458 2 : max_concurrent_syncs = {max_concurrent_syncs}
1459 2 : max_sync_errors = {max_sync_errors}
1460 2 : bucket_name = '{bucket_name}'
1461 2 : bucket_region = '{bucket_region}'
1462 2 : prefix_in_bucket = '{prefix_in_bucket}'
1463 2 : endpoint = '{endpoint}'
1464 2 : concurrency_limit = {s3_concurrency_limit}"#
1465 2 : ),
1466 2 : format!(
1467 2 : "remote_storage={{max_concurrent_syncs={max_concurrent_syncs}, max_sync_errors={max_sync_errors}, bucket_name='{bucket_name}',\
1468 2 : bucket_region='{bucket_region}', prefix_in_bucket='{prefix_in_bucket}', endpoint='{endpoint}', concurrency_limit={s3_concurrency_limit}}}",
1469 2 : ),
1470 2 : ];
1471 :
1472 6 : for remote_storage_config_str in identical_toml_declarations {
1473 4 : let config_string = format!(
1474 4 : r#"{ALL_BASE_VALUES_TOML}
1475 4 : pg_distrib_dir='{pg_distrib_dir}'
1476 4 : broker_endpoint = '{broker_endpoint}'
1477 4 :
1478 4 : {remote_storage_config_str}"#,
1479 4 : );
1480 :
1481 4 : let toml = config_string.parse()?;
1482 :
1483 4 : let parsed_remote_storage_config = PageServerConf::parse_and_validate(&toml, &workdir)
1484 4 : .unwrap_or_else(|e| {
1485 0 : panic!("Failed to parse config '{config_string}', reason: {e:?}")
1486 4 : })
1487 4 : .remote_storage_config
1488 4 : .expect("Should have remote storage config for S3");
1489 4 :
1490 4 : assert_eq!(
1491 4 : parsed_remote_storage_config,
1492 4 : RemoteStorageConfig {
1493 4 : storage: RemoteStorageKind::AwsS3(S3Config {
1494 4 : bucket_name: bucket_name.clone(),
1495 4 : bucket_region: bucket_region.clone(),
1496 4 : prefix_in_bucket: Some(prefix_in_bucket.clone()),
1497 4 : endpoint: Some(endpoint.clone()),
1498 4 : concurrency_limit: s3_concurrency_limit,
1499 4 : max_keys_per_list_response: None,
1500 4 : }),
1501 4 : timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
1502 4 : },
1503 0 : "Remote storage config should correctly parse the S3 config"
1504 : );
1505 : }
1506 2 : Ok(())
1507 2 : }
1508 :
/// Checks that a `[tenant_config]` section in the pageserver config is parsed
/// and that its values land in `PageServerConf::default_tenant_conf`.
///
/// The config is assembled from `ALL_BASE_VALUES_TOML` plus a `pg_distrib_dir`
/// and `broker_endpoint` entry, followed by a `[tenant_config]` table that sets
/// `trace_read_requests`.
#[test]
fn parse_tenant_config() -> anyhow::Result<()> {
    let tempdir = tempdir()?;
    let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;

    let broker_endpoint = "http://127.0.0.1:7777";
    let trace_read_requests = true;

    let config_string = format!(
        r#"{ALL_BASE_VALUES_TOML}
pg_distrib_dir='{pg_distrib_dir}'
broker_endpoint = '{broker_endpoint}'

[tenant_config]
trace_read_requests = {trace_read_requests}"#,
    );

    // The target type of `parse` is inferred from `parse_and_validate`'s parameter.
    let toml = config_string.parse()?;

    let conf = PageServerConf::parse_and_validate(&toml, &workdir)?;
    assert_eq!(
        conf.default_tenant_conf.trace_read_requests, trace_read_requests,
        "Tenant config from pageserver config file should be parsed and updated values used as defaults for all tenants",
    );

    Ok(())
}
1536 :
/// Feeds a `[tenant_config]` table with a negative `checkpoint_distance` to
/// `TenantConfOpt::try_from` and checks the exact error text that comes back.
#[test]
fn parse_incorrect_tenant_config() -> anyhow::Result<()> {
    let raw_toml = r#"
[tenant_config]
checkpoint_distance = -1 # supposed to be an u64
"#;

    let document = raw_toml.parse::<Document>()?;
    let tenant_section = document.get("tenant_config").unwrap().clone();

    // The conversion is expected to fail; `unwrap_err` panics if it succeeds.
    let conversion_error = TenantConfOpt::try_from(tenant_section).unwrap_err();

    assert_eq!(
        conversion_error.to_string(),
        "checkpoint_distance: invalid value: integer `-1`, expected u64"
    );

    Ok(())
}
1554 :
/// Parses an inline-table `tenant_config` that sets
/// `min_resident_size_override` and checks the value is carried over into
/// `TenantConfOpt`.
#[test]
fn parse_override_tenant_config() -> anyhow::Result<()> {
    let raw_toml = r#"tenant_config={ min_resident_size_override = 400 }"#;

    let document = raw_toml.parse::<Document>()?;
    let tenant_section = document.get("tenant_config").unwrap().clone();
    let tenant_conf = TenantConfOpt::try_from(tenant_section).unwrap();

    assert_eq!(tenant_conf.min_resident_size_override, Some(400));

    Ok(())
}
1567 :
/// Parses a pageserver config containing metric collection settings, a
/// `[disk_usage_based_eviction]` section and a tenant-level
/// `LayerAccessThreshold` eviction policy, then checks each parsed field.
#[test]
fn eviction_pageserver_config_parse() -> anyhow::Result<()> {
    let scratch = tempdir()?;
    let (workdir, pg_distrib_dir) = prepare_fs(&scratch)?;

    let pageserver_conf_toml = format!(
        r#"pg_distrib_dir = "{pg_distrib_dir}"
metric_collection_endpoint = "http://sample.url"
metric_collection_interval = "10min"
id = 222

[disk_usage_based_eviction]
max_usage_pct = 80
min_avail_bytes = 0
period = "10s"

[tenant_config]
evictions_low_residence_duration_metric_threshold = "20m"

[tenant_config.eviction_policy]
kind = "LayerAccessThreshold"
period = "20m"
threshold = "20m"
"#,
    );
    let document: Document = pageserver_conf_toml.parse()?;
    let conf = PageServerConf::parse_and_validate(&document, &workdir)?;

    // Durations the TOML above spells as "10min" and "20m".
    let ten_minutes = Duration::from_secs(10 * 60);
    let twenty_minutes = Duration::from_secs(20 * 60);

    // Top-level settings.
    assert_eq!(conf.pg_distrib_dir, pg_distrib_dir);
    assert_eq!(
        conf.metric_collection_endpoint,
        Some("http://sample.url".parse().unwrap())
    );
    assert_eq!(conf.metric_collection_interval, ten_minutes);
    assert_eq!(
        conf.default_tenant_conf
            .evictions_low_residence_duration_metric_threshold,
        twenty_minutes
    );
    assert_eq!(conf.id, NodeId(222));

    // Disk-usage-based eviction section.
    let expected_disk_eviction = DiskUsageEvictionTaskConfig {
        max_usage_pct: Percent::new(80).unwrap(),
        min_avail_bytes: 0,
        period: Duration::from_secs(10),
        #[cfg(feature = "testing")]
        mock_statvfs: None,
        eviction_order: crate::disk_usage_eviction_task::EvictionOrder::AbsoluteAccessed,
    };
    assert_eq!(conf.disk_usage_based_eviction, Some(expected_disk_eviction));

    // Tenant-level eviction policy: any other variant is a test failure.
    let policy = &conf.default_tenant_conf.eviction_policy;
    let EvictionPolicy::LayerAccessThreshold(eviction_threshold) = policy else {
        unreachable!("Unexpected eviction policy tenant settings: {policy:?}")
    };
    assert_eq!(eviction_threshold.period, twenty_minutes);
    assert_eq!(eviction_threshold.threshold, twenty_minutes);

    Ok(())
}
1633 :
/// Parses a pageserver config whose tenant eviction policy has kind
/// `OnlyImitiate` and checks the parsed `period` and `threshold`.
#[test]
fn parse_imitation_only_pageserver_config() {
    let scratch = tempdir().unwrap();
    let (workdir, pg_distrib_dir) = prepare_fs(&scratch).unwrap();

    let pageserver_conf_toml = format!(
        r#"pg_distrib_dir = "{pg_distrib_dir}"
metric_collection_endpoint = "http://sample.url"
metric_collection_interval = "10min"
id = 222

[tenant_config]
evictions_low_residence_duration_metric_threshold = "20m"

[tenant_config.eviction_policy]
kind = "OnlyImitiate"
period = "20m"
threshold = "20m"
"#,
    );
    let document = pageserver_conf_toml.parse::<Document>().unwrap();
    let conf = PageServerConf::parse_and_validate(&document, &workdir).unwrap();

    // Both values are written as "20m" in the TOML above.
    let twenty_minutes = Duration::from_secs(20 * 60);

    // Any other variant is a test failure.
    let policy = &conf.default_tenant_conf.eviction_policy;
    let EvictionPolicy::OnlyImitiate(t) = policy else {
        unreachable!("Unexpected eviction policy tenant settings: {policy:?}")
    };
    assert_eq!(t.period, twenty_minutes);
    assert_eq!(t.threshold, twenty_minutes);
}
1665 :
1666 14 : fn prepare_fs(tempdir: &Utf8TempDir) -> anyhow::Result<(Utf8PathBuf, Utf8PathBuf)> {
1667 14 : let tempdir_path = tempdir.path();
1668 14 :
1669 14 : let workdir = tempdir_path.join("workdir");
1670 14 : fs::create_dir_all(&workdir)?;
1671 :
1672 14 : let pg_distrib_dir = tempdir_path.join("pg_distrib");
1673 14 : let pg_distrib_dir_versioned = pg_distrib_dir.join(format!("v{DEFAULT_PG_VERSION}"));
1674 14 : fs::create_dir_all(&pg_distrib_dir_versioned)?;
1675 14 : let postgres_bin_dir = pg_distrib_dir_versioned.join("bin");
1676 14 : fs::create_dir_all(&postgres_bin_dir)?;
1677 14 : fs::write(postgres_bin_dir.join("postgres"), "I'm postgres, trust me")?;
1678 :
1679 14 : Ok((workdir, pg_distrib_dir))
1680 14 : }
1681 : }
|