Line data Source code
1 : //
2 : // Main entry point for the safekeeper executable
3 : //
4 : use anyhow::{bail, Context, Result};
5 : use camino::{Utf8Path, Utf8PathBuf};
6 : use clap::{ArgAction, Parser};
7 : use futures::future::BoxFuture;
8 : use futures::stream::FuturesUnordered;
9 : use futures::{FutureExt, StreamExt};
10 : use remote_storage::RemoteStorageConfig;
11 : use sd_notify::NotifyState;
12 : use tokio::runtime::Handle;
13 : use tokio::signal::unix::{signal, SignalKind};
14 : use tokio::task::JoinError;
15 : use toml_edit::Document;
16 :
17 : use std::fs::{self, File};
18 : use std::io::{ErrorKind, Write};
19 : use std::str::FromStr;
20 : use std::sync::Arc;
21 : use std::time::Duration;
22 : use storage_broker::Uri;
23 : use tokio::sync::mpsc;
24 :
25 : use tracing::*;
26 : use utils::pid_file;
27 :
28 : use metrics::set_build_info_metric;
29 : use safekeeper::defaults::{
30 : DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES,
31 : DEFAULT_PG_LISTEN_ADDR,
32 : };
33 : use safekeeper::wal_service;
34 : use safekeeper::GlobalTimelines;
35 : use safekeeper::SafeKeeperConf;
36 : use safekeeper::{broker, WAL_SERVICE_RUNTIME};
37 : use safekeeper::{control_file, BROKER_RUNTIME};
38 : use safekeeper::{http, WAL_REMOVER_RUNTIME};
39 : use safekeeper::{remove_wal, WAL_BACKUP_RUNTIME};
40 : use safekeeper::{wal_backup, HTTP_RUNTIME};
41 : use storage_broker::DEFAULT_ENDPOINT;
42 : use utils::auth::{JwtAuth, Scope, SwappableJwtAuth};
43 : use utils::{
44 : id::NodeId,
45 : logging::{self, LogFormat},
46 : project_build_tag, project_git_version,
47 : sentry_init::init_sentry,
48 : tcp_listener,
49 : };
50 :
51 : const PID_FILE_NAME: &str = "safekeeper.pid";
52 : const ID_FILE_NAME: &str = "safekeeper.id";
53 :
54 : project_git_version!(GIT_VERSION);
55 : project_build_tag!(BUILD_TAG);
56 :
57 : const FEATURES: &[&str] = &[
58 : #[cfg(feature = "testing")]
59 : "testing",
60 : ];
61 :
62 1305 : fn version() -> String {
63 1305 : format!(
64 1305 : "{GIT_VERSION} failpoints: {}, features: {:?}",
65 1305 : fail::has_failpoints(),
66 1305 : FEATURES,
67 1305 : )
68 1305 : }
69 :
70 : const ABOUT: &str = r#"
71 : A fleet of safekeepers is responsible for reliably storing WAL received from
72 : compute, passing it through consensus (mitigating potential computes brain
73 : split), and serving the hardened part further downstream to pageserver(s).
74 : "#;
75 :
76 1307 : #[derive(Parser)]
77 : #[command(name = "Neon safekeeper", version = GIT_VERSION, about = ABOUT, long_about = None)]
78 : struct Args {
79 : /// Path to the safekeeper data directory.
80 : #[arg(short = 'D', long, default_value = "./")]
81 0 : datadir: Utf8PathBuf,
82 : /// Safekeeper node id.
83 : #[arg(long)]
84 : id: Option<u64>,
85 : /// Initialize safekeeper with given id and exit.
86 : #[arg(long)]
87 0 : init: bool,
88 : /// Listen endpoint for receiving/sending WAL in the form host:port.
89 : #[arg(short, long, default_value = DEFAULT_PG_LISTEN_ADDR)]
90 0 : listen_pg: String,
91 : /// Listen endpoint for receiving/sending WAL in the form host:port allowing
92 : /// only tenant scoped auth tokens. Pointless if auth is disabled.
93 : #[arg(long, default_value = None, verbatim_doc_comment)]
94 : listen_pg_tenant_only: Option<String>,
95 : /// Listen http endpoint for management and metrics in the form host:port.
96 : #[arg(long, default_value = DEFAULT_HTTP_LISTEN_ADDR)]
97 0 : listen_http: String,
98 : /// Advertised endpoint for receiving/sending WAL in the form host:port. If not
99 : /// specified, listen_pg is used to advertise instead.
100 : #[arg(long, default_value = None)]
101 : advertise_pg: Option<String>,
102 : /// Availability zone of the safekeeper.
103 : #[arg(long)]
104 : availability_zone: Option<String>,
105 : /// Do not wait for changes to be written safely to disk. Unsafe.
106 : #[arg(short, long)]
107 0 : no_sync: bool,
108 : /// Dump control file at path specified by this argument and exit.
109 : #[arg(long)]
110 : dump_control_file: Option<Utf8PathBuf>,
111 : /// Broker endpoint for storage nodes coordination in the form
112 : /// http[s]://host:port. In case of https schema TLS is connection is
113 : /// established; plaintext otherwise.
114 : #[arg(long, default_value = DEFAULT_ENDPOINT, verbatim_doc_comment)]
115 0 : broker_endpoint: Uri,
116 : /// Broker keepalive interval.
117 : #[arg(long, value_parser= humantime::parse_duration, default_value = storage_broker::DEFAULT_KEEPALIVE_INTERVAL)]
118 0 : broker_keepalive_interval: Duration,
119 : /// Peer safekeeper is considered dead after not receiving heartbeats from
120 : /// it during this period passed as a human readable duration.
121 : #[arg(long, value_parser= humantime::parse_duration, default_value = DEFAULT_HEARTBEAT_TIMEOUT, verbatim_doc_comment)]
122 0 : heartbeat_timeout: Duration,
123 : /// Enable/disable peer recovery.
124 : #[arg(long, default_value = "false", action=ArgAction::Set)]
125 0 : peer_recovery: bool,
126 : /// Remote storage configuration for WAL backup (offloading to s3) as TOML
127 : /// inline table, e.g.
128 : /// {"max_concurrent_syncs" = 17, "max_sync_errors": 13, "bucket_name": "<BUCKETNAME>", "bucket_region":"<REGION>", "concurrency_limit": 119}
129 : /// Safekeeper offloads WAL to
130 : /// [prefix_in_bucket/]<tenant_id>/<timeline_id>/<segment_file>, mirroring
131 : /// structure on the file system.
132 : #[arg(long, value_parser = parse_remote_storage, verbatim_doc_comment)]
133 : remote_storage: Option<RemoteStorageConfig>,
134 : /// Safekeeper won't be elected for WAL offloading if it is lagging for more than this value in bytes
135 1307 : #[arg(long, default_value_t = DEFAULT_MAX_OFFLOADER_LAG_BYTES)]
136 0 : max_offloader_lag: u64,
137 : /// Number of max parallel WAL segments to be offloaded to remote storage.
138 : #[arg(long, default_value = "5")]
139 0 : wal_backup_parallel_jobs: usize,
140 : /// Disable WAL backup to s3. When disabled, safekeeper removes WAL ignoring
141 : /// WAL backup horizon.
142 : #[arg(long)]
143 0 : disable_wal_backup: bool,
144 : /// If given, enables auth on incoming connections to WAL service endpoint
145 : /// (--listen-pg). Value specifies path to a .pem public key used for
146 : /// validations of JWT tokens. Empty string is allowed and means disabling
147 : /// auth.
148 : #[arg(long, verbatim_doc_comment, value_parser = opt_pathbuf_parser)]
149 : pg_auth_public_key_path: Option<Utf8PathBuf>,
150 : /// If given, enables auth on incoming connections to tenant only WAL
151 : /// service endpoint (--listen-pg-tenant-only). Value specifies path to a
152 : /// .pem public key used for validations of JWT tokens. Empty string is
153 : /// allowed and means disabling auth.
154 : #[arg(long, verbatim_doc_comment, value_parser = opt_pathbuf_parser)]
155 : pg_tenant_only_auth_public_key_path: Option<Utf8PathBuf>,
156 : /// If given, enables auth on incoming connections to http management
157 : /// service endpoint (--listen-http). Value specifies path to a .pem public
158 : /// key used for validations of JWT tokens. Empty string is allowed and
159 : /// means disabling auth.
160 : #[arg(long, verbatim_doc_comment, value_parser = opt_pathbuf_parser)]
161 : http_auth_public_key_path: Option<Utf8PathBuf>,
162 : /// Format for logging, either 'plain' or 'json'.
163 : #[arg(long, default_value = "plain")]
164 0 : log_format: String,
165 : /// Run everything in single threaded current thread runtime, might be
166 : /// useful for debugging.
167 : #[arg(long)]
168 0 : current_thread_runtime: bool,
169 : }
170 :
171 : // Like PathBufValueParser, but allows empty string.
172 69 : fn opt_pathbuf_parser(s: &str) -> Result<Utf8PathBuf, String> {
173 69 : Ok(Utf8PathBuf::from_str(s).unwrap())
174 69 : }
175 :
176 : #[tokio::main(flavor = "current_thread")]
177 1305 : async fn main() -> anyhow::Result<()> {
178 1305 : // We want to allow multiple occurences of the same arg (taking the last) so
179 1305 : // that neon_local could generate command with defaults + overrides without
180 1305 : // getting 'argument cannot be used multiple times' error. This seems to be
181 1305 : // impossible with pure Derive API, so convert struct to Command, modify it,
182 1305 : // parse arguments, and then fill the struct back.
183 1305 : let cmd = <Args as clap::CommandFactory>::command()
184 1305 : .args_override_self(true)
185 1305 : .version(version());
186 1305 : let mut matches = cmd.get_matches();
187 1305 : let mut args = <Args as clap::FromArgMatches>::from_arg_matches_mut(&mut matches)?;
188 :
189 : // I failed to modify opt_pathbuf_parser to return Option<PathBuf> in
190 : // reasonable time, so turn empty string into option post factum.
191 1305 : if let Some(pb) = &args.pg_auth_public_key_path {
192 817 : if pb.as_os_str().is_empty() {
193 1 : args.pg_auth_public_key_path = None;
194 816 : }
195 488 : }
196 1305 : if let Some(pb) = &args.pg_tenant_only_auth_public_key_path {
197 817 : if pb.as_os_str().is_empty() {
198 0 : args.pg_tenant_only_auth_public_key_path = None;
199 817 : }
200 488 : }
201 1305 : if let Some(pb) = &args.http_auth_public_key_path {
202 817 : if pb.as_os_str().is_empty() {
203 2 : args.http_auth_public_key_path = None;
204 815 : }
205 488 : }
206 :
207 1305 : if let Some(addr) = args.dump_control_file {
208 795 : let state = control_file::FileStorage::load_control_file(addr)?;
209 795 : let json = serde_json::to_string(&state)?;
210 795 : print!("{json}");
211 795 : return Ok(());
212 510 : }
213 510 :
214 510 : // important to keep the order of:
215 510 : // 1. init logging
216 510 : // 2. tracing panic hook
217 510 : // 3. sentry
218 510 : logging::init(
219 510 : LogFormat::from_config(&args.log_format)?,
220 510 : logging::TracingErrorLayerEnablement::Disabled,
221 510 : logging::Output::Stdout,
222 0 : )?;
223 510 : logging::replace_panic_hook_with_tracing_panic_hook().forget();
224 510 : info!("version: {GIT_VERSION}");
225 510 : info!("buld_tag: {BUILD_TAG}");
226 :
227 510 : let args_workdir = &args.datadir;
228 510 : let workdir = args_workdir.canonicalize_utf8().with_context(|| {
229 0 : format!("Failed to get the absolute path for input workdir {args_workdir:?}")
230 510 : })?;
231 :
232 : // Change into the data directory.
233 510 : std::env::set_current_dir(&workdir)?;
234 :
235 : // Set or read our ID.
236 510 : let id = set_id(&workdir, args.id.map(NodeId))?;
237 510 : if args.init {
238 0 : return Ok(());
239 510 : }
240 :
241 510 : let pg_auth = match args.pg_auth_public_key_path.as_ref() {
242 : None => {
243 489 : info!("pg auth is disabled");
244 489 : None
245 : }
246 21 : Some(path) => {
247 21 : info!("loading pg auth JWT key from {path}");
248 : Some(Arc::new(
249 21 : JwtAuth::from_key_path(path).context("failed to load the auth key")?,
250 : ))
251 : }
252 : };
253 510 : let pg_tenant_only_auth = match args.pg_tenant_only_auth_public_key_path.as_ref() {
254 : None => {
255 488 : info!("pg tenant only auth is disabled");
256 488 : None
257 : }
258 22 : Some(path) => {
259 22 : info!("loading pg tenant only auth JWT key from {path}");
260 : Some(Arc::new(
261 22 : JwtAuth::from_key_path(path).context("failed to load the auth key")?,
262 : ))
263 : }
264 : };
265 510 : let http_auth = match args.http_auth_public_key_path.as_ref() {
266 : None => {
267 490 : info!("http auth is disabled");
268 490 : None
269 : }
270 20 : Some(path) => {
271 20 : info!("loading http auth JWT key(s) from {path}");
272 20 : let jwt_auth = JwtAuth::from_key_path(path).context("failed to load the auth key")?;
273 20 : Some(Arc::new(SwappableJwtAuth::new(jwt_auth)))
274 : }
275 : };
276 :
277 510 : let conf = SafeKeeperConf {
278 510 : workdir,
279 510 : my_id: id,
280 510 : listen_pg_addr: args.listen_pg,
281 510 : listen_pg_addr_tenant_only: args.listen_pg_tenant_only,
282 510 : listen_http_addr: args.listen_http,
283 510 : advertise_pg_addr: args.advertise_pg,
284 510 : availability_zone: args.availability_zone,
285 510 : no_sync: args.no_sync,
286 510 : broker_endpoint: args.broker_endpoint,
287 510 : broker_keepalive_interval: args.broker_keepalive_interval,
288 510 : heartbeat_timeout: args.heartbeat_timeout,
289 510 : peer_recovery_enabled: args.peer_recovery,
290 510 : remote_storage: args.remote_storage,
291 510 : max_offloader_lag_bytes: args.max_offloader_lag,
292 510 : wal_backup_enabled: !args.disable_wal_backup,
293 510 : backup_parallel_jobs: args.wal_backup_parallel_jobs,
294 510 : pg_auth,
295 510 : pg_tenant_only_auth,
296 510 : http_auth,
297 510 : current_thread_runtime: args.current_thread_runtime,
298 510 : };
299 510 :
300 510 : // initialize sentry if SENTRY_DSN is provided
301 510 : let _sentry_guard = init_sentry(
302 510 : Some(GIT_VERSION.into()),
303 510 : &[("node_id", &conf.my_id.to_string())],
304 510 : );
305 1020 : start_safekeeper(conf).await
306 : }
307 :
308 : /// Result of joining any of main tasks: upper error means task failed to
309 : /// complete, e.g. panicked, inner is error produced by task itself.
310 : type JoinTaskRes = Result<anyhow::Result<()>, JoinError>;
311 :
312 510 : async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
313 510 : // Prevent running multiple safekeepers on the same directory
314 510 : let lock_file_path = conf.workdir.join(PID_FILE_NAME);
315 510 : let lock_file =
316 510 : pid_file::claim_for_current_process(&lock_file_path).context("claim pid file")?;
317 510 : info!("claimed pid file at {lock_file_path:?}");
318 :
319 : // ensure that the lock file is held even if the main thread of the process is panics
320 : // we need to release the lock file only when the current process is gone
321 510 : std::mem::forget(lock_file);
322 :
323 510 : info!("starting safekeeper WAL service on {}", conf.listen_pg_addr);
324 510 : let pg_listener = tcp_listener::bind(conf.listen_pg_addr.clone()).map_err(|e| {
325 0 : error!("failed to bind to address {}: {}", conf.listen_pg_addr, e);
326 0 : e
327 510 : })?;
328 :
329 510 : let pg_listener_tenant_only =
330 510 : if let Some(listen_pg_addr_tenant_only) = &conf.listen_pg_addr_tenant_only {
331 510 : info!(
332 510 : "starting safekeeper tenant scoped WAL service on {}",
333 510 : listen_pg_addr_tenant_only
334 510 : );
335 510 : let listener = tcp_listener::bind(listen_pg_addr_tenant_only.clone()).map_err(|e| {
336 0 : error!(
337 0 : "failed to bind to address {}: {}",
338 0 : listen_pg_addr_tenant_only, e
339 0 : );
340 0 : e
341 510 : })?;
342 510 : Some(listener)
343 : } else {
344 0 : None
345 : };
346 :
347 510 : info!(
348 510 : "starting safekeeper HTTP service on {}",
349 510 : conf.listen_http_addr
350 510 : );
351 510 : let http_listener = tcp_listener::bind(conf.listen_http_addr.clone()).map_err(|e| {
352 0 : error!("failed to bind to address {}: {}", conf.listen_http_addr, e);
353 0 : e
354 510 : })?;
355 :
356 : // Register metrics collector for active timelines. It's important to do this
357 : // after daemonizing, otherwise process collector will be upset.
358 510 : let timeline_collector = safekeeper::metrics::TimelineCollector::new();
359 510 : metrics::register_internal(Box::new(timeline_collector))?;
360 :
361 510 : let (wal_backup_launcher_tx, wal_backup_launcher_rx) = mpsc::channel(100);
362 510 :
363 510 : // Keep handles to main tasks to die if any of them disappears.
364 510 : let mut tasks_handles: FuturesUnordered<BoxFuture<(String, JoinTaskRes)>> =
365 510 : FuturesUnordered::new();
366 510 :
367 510 : // Start wal backup launcher before loading timelines as we'll notify it
368 510 : // through the channel about timelines which need offloading, not draining
369 510 : // the channel would cause deadlock.
370 510 : let current_thread_rt = conf
371 510 : .current_thread_runtime
372 510 : .then(|| Handle::try_current().expect("no runtime in main"));
373 510 : let conf_ = conf.clone();
374 510 : let wal_backup_handle = current_thread_rt
375 510 : .as_ref()
376 510 : .unwrap_or_else(|| WAL_BACKUP_RUNTIME.handle())
377 510 : .spawn(wal_backup::wal_backup_launcher_task_main(
378 510 : conf_,
379 510 : wal_backup_launcher_rx,
380 510 : ))
381 510 : .map(|res| ("WAL backup launcher".to_owned(), res));
382 510 : tasks_handles.push(Box::pin(wal_backup_handle));
383 510 :
384 510 : // Load all timelines from disk to memory.
385 510 : GlobalTimelines::init(conf.clone(), wal_backup_launcher_tx).await?;
386 :
387 510 : let conf_ = conf.clone();
388 510 : // Run everything in current thread rt, if asked.
389 510 : if conf.current_thread_runtime {
390 0 : info!("running in current thread runtime");
391 510 : }
392 :
393 510 : let wal_service_handle = current_thread_rt
394 510 : .as_ref()
395 510 : .unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle())
396 510 : .spawn(wal_service::task_main(
397 510 : conf_,
398 510 : pg_listener,
399 510 : Scope::SafekeeperData,
400 510 : ))
401 510 : // wrap with task name for error reporting
402 510 : .map(|res| ("WAL service main".to_owned(), res));
403 510 : tasks_handles.push(Box::pin(wal_service_handle));
404 :
405 510 : if let Some(pg_listener_tenant_only) = pg_listener_tenant_only {
406 510 : let conf_ = conf.clone();
407 510 : let wal_service_handle = current_thread_rt
408 510 : .as_ref()
409 510 : .unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle())
410 510 : .spawn(wal_service::task_main(
411 510 : conf_,
412 510 : pg_listener_tenant_only,
413 510 : Scope::Tenant,
414 510 : ))
415 510 : // wrap with task name for error reporting
416 510 : .map(|res| ("WAL service tenant only main".to_owned(), res));
417 510 : tasks_handles.push(Box::pin(wal_service_handle));
418 510 : }
419 :
420 510 : let conf_ = conf.clone();
421 510 : let http_handle = current_thread_rt
422 510 : .as_ref()
423 510 : .unwrap_or_else(|| HTTP_RUNTIME.handle())
424 510 : .spawn(http::task_main(conf_, http_listener))
425 510 : .map(|res| ("HTTP service main".to_owned(), res));
426 510 : tasks_handles.push(Box::pin(http_handle));
427 510 :
428 510 : let conf_ = conf.clone();
429 510 : let broker_task_handle = current_thread_rt
430 510 : .as_ref()
431 510 : .unwrap_or_else(|| BROKER_RUNTIME.handle())
432 510 : .spawn(broker::task_main(conf_).instrument(info_span!("broker")))
433 510 : .map(|res| ("broker main".to_owned(), res));
434 510 : tasks_handles.push(Box::pin(broker_task_handle));
435 510 :
436 510 : let conf_ = conf.clone();
437 510 : let wal_remover_handle = current_thread_rt
438 510 : .as_ref()
439 510 : .unwrap_or_else(|| WAL_REMOVER_RUNTIME.handle())
440 510 : .spawn(remove_wal::task_main(conf_))
441 510 : .map(|res| ("WAL remover".to_owned(), res));
442 510 : tasks_handles.push(Box::pin(wal_remover_handle));
443 510 :
444 510 : set_build_info_metric(GIT_VERSION, BUILD_TAG);
445 :
446 : // TODO: update tokio-stream, convert to real async Stream with
447 : // SignalStream, map it to obtain missing signal name, combine streams into
448 : // single stream we can easily sit on.
449 510 : let mut sigquit_stream = signal(SignalKind::quit())?;
450 510 : let mut sigint_stream = signal(SignalKind::interrupt())?;
451 510 : let mut sigterm_stream = signal(SignalKind::terminate())?;
452 :
453 : // Notify systemd that we are ready. This is important as currently loading
454 : // timelines takes significant time (~30s in busy regions).
455 510 : if let Err(e) = sd_notify::notify(true, &[NotifyState::Ready]) {
456 0 : warn!("systemd notify failed: {:?}", e);
457 510 : }
458 :
459 510 : tokio::select! {
460 0 : Some((task_name, res)) = tasks_handles.next()=> {
461 0 : error!("{} task failed: {:?}, exiting", task_name, res);
462 : std::process::exit(1);
463 : }
464 : // On any shutdown signal, log receival and exit. Additionally, handling
465 : // SIGQUIT prevents coredump.
466 393 : _ = sigquit_stream.recv() => info!("received SIGQUIT, terminating"),
467 0 : _ = sigint_stream.recv() => info!("received SIGINT, terminating"),
468 117 : _ = sigterm_stream.recv() => info!("received SIGTERM, terminating")
469 :
470 : };
471 510 : std::process::exit(0);
472 0 : }
473 :
474 : /// Determine safekeeper id.
475 510 : fn set_id(workdir: &Utf8Path, given_id: Option<NodeId>) -> Result<NodeId> {
476 510 : let id_file_path = workdir.join(ID_FILE_NAME);
477 510 :
478 510 : let my_id: NodeId;
479 510 : // If file with ID exists, read it in; otherwise set one passed.
480 510 : match fs::read(&id_file_path) {
481 93 : Ok(id_serialized) => {
482 93 : my_id = NodeId(
483 93 : std::str::from_utf8(&id_serialized)
484 93 : .context("failed to parse safekeeper id")?
485 93 : .parse()
486 93 : .context("failed to parse safekeeper id")?,
487 : );
488 93 : if let Some(given_id) = given_id {
489 93 : if given_id != my_id {
490 0 : bail!(
491 0 : "safekeeper already initialized with id {}, can't set {}",
492 0 : my_id,
493 0 : given_id
494 0 : );
495 93 : }
496 0 : }
497 93 : info!("safekeeper ID {}", my_id);
498 : }
499 417 : Err(error) => match error.kind() {
500 : ErrorKind::NotFound => {
501 417 : my_id = if let Some(given_id) = given_id {
502 417 : given_id
503 : } else {
504 0 : bail!("safekeeper id is not specified");
505 : };
506 417 : let mut f = File::create(&id_file_path)
507 417 : .with_context(|| format!("Failed to create id file at {id_file_path:?}"))?;
508 417 : f.write_all(my_id.to_string().as_bytes())?;
509 417 : f.sync_all()?;
510 417 : info!("initialized safekeeper id {}", my_id);
511 : }
512 : _ => {
513 0 : return Err(error.into());
514 : }
515 : },
516 : }
517 510 : Ok(my_id)
518 510 : }
519 :
520 : // Parse RemoteStorage from TOML table.
521 25 : fn parse_remote_storage(storage_conf: &str) -> anyhow::Result<RemoteStorageConfig> {
522 25 : // funny toml doesn't consider plain inline table as valid document, so wrap in a key to parse
523 25 : let storage_conf_toml = format!("remote_storage = {storage_conf}");
524 25 : let parsed_toml = storage_conf_toml.parse::<Document>()?; // parse
525 25 : let (_, storage_conf_parsed_toml) = parsed_toml.iter().next().unwrap(); // and strip key off again
526 25 : RemoteStorageConfig::from_toml(storage_conf_parsed_toml).and_then(|parsed_config| {
527 25 : // XXX: Don't print the original toml here, there might be some sensitive data
528 25 : parsed_config.context("Incorrectly parsed remote storage toml as no remote storage config")
529 25 : })
530 25 : }
531 :
532 2 : #[test]
533 2 : fn verify_cli() {
534 2 : use clap::CommandFactory;
535 2 : Args::command().debug_assert()
536 2 : }
|