TLA Line data Source code
1 : //
2 : // Main entry point for the safekeeper executable
3 : //
4 : use anyhow::{bail, Context, Result};
5 : use camino::{Utf8Path, Utf8PathBuf};
6 : use clap::Parser;
7 : use futures::future::BoxFuture;
8 : use futures::stream::FuturesUnordered;
9 : use futures::{FutureExt, StreamExt};
10 : use remote_storage::RemoteStorageConfig;
11 : use tokio::runtime::Handle;
12 : use tokio::signal::unix::{signal, SignalKind};
13 : use tokio::task::JoinError;
14 : use toml_edit::Document;
15 :
16 : use std::fs::{self, File};
17 : use std::io::{ErrorKind, Write};
18 : use std::str::FromStr;
19 : use std::sync::Arc;
20 : use std::time::Duration;
21 : use storage_broker::Uri;
22 : use tokio::sync::mpsc;
23 :
24 : use tracing::*;
25 : use utils::pid_file;
26 :
27 : use metrics::set_build_info_metric;
28 : use safekeeper::defaults::{
29 : DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES,
30 : DEFAULT_PG_LISTEN_ADDR,
31 : };
32 : use safekeeper::wal_service;
33 : use safekeeper::GlobalTimelines;
34 : use safekeeper::SafeKeeperConf;
35 : use safekeeper::{broker, WAL_SERVICE_RUNTIME};
36 : use safekeeper::{control_file, BROKER_RUNTIME};
37 : use safekeeper::{http, WAL_REMOVER_RUNTIME};
38 : use safekeeper::{remove_wal, WAL_BACKUP_RUNTIME};
39 : use safekeeper::{wal_backup, HTTP_RUNTIME};
40 : use storage_broker::DEFAULT_ENDPOINT;
41 : use utils::auth::{JwtAuth, Scope};
42 : use utils::{
43 : id::NodeId,
44 : logging::{self, LogFormat},
45 : project_git_version,
46 : sentry_init::init_sentry,
47 : tcp_listener,
48 : };
49 :
50 : const PID_FILE_NAME: &str = "safekeeper.pid";
51 : const ID_FILE_NAME: &str = "safekeeper.id";
52 :
53 : project_git_version!(GIT_VERSION);
54 :
55 : const ABOUT: &str = r#"
56 : A fleet of safekeepers is responsible for reliably storing WAL received from
57 : compute, passing it through consensus (mitigating potential computes brain
58 : split), and serving the hardened part further downstream to pageserver(s).
59 : "#;
60 :
61 CBC 1001 : #[derive(Parser)]
62 : #[command(name = "Neon safekeeper", version = GIT_VERSION, about = ABOUT, long_about = None)]
63 : struct Args {
64 : /// Path to the safekeeper data directory.
65 : #[arg(short = 'D', long, default_value = "./")]
66 UBC 0 : datadir: Utf8PathBuf,
67 : /// Safekeeper node id.
68 : #[arg(long)]
69 : id: Option<u64>,
70 : /// Initialize safekeeper with given id and exit.
71 : #[arg(long)]
72 0 : init: bool,
73 0 : /// Listen endpoint for receiving/sending WAL in the form host:port.
74 : #[arg(short, long, default_value = DEFAULT_PG_LISTEN_ADDR)]
75 0 : listen_pg: String,
76 : /// Listen endpoint for receiving/sending WAL in the form host:port allowing
77 : /// only tenant scoped auth tokens. Pointless if auth is disabled.
78 : #[arg(long, default_value = None, verbatim_doc_comment)]
79 : listen_pg_tenant_only: Option<String>,
80 : /// Listen http endpoint for management and metrics in the form host:port.
81 : #[arg(long, default_value = DEFAULT_HTTP_LISTEN_ADDR)]
82 0 : listen_http: String,
83 : /// Advertised endpoint for receiving/sending WAL in the form host:port. If not
84 : /// specified, listen_pg is used to advertise instead.
85 : #[arg(long, default_value = None)]
86 : advertise_pg: Option<String>,
87 : /// Availability zone of the safekeeper.
88 : #[arg(long)]
89 : availability_zone: Option<String>,
90 : /// Do not wait for changes to be written safely to disk. Unsafe.
91 : #[arg(short, long)]
92 0 : no_sync: bool,
93 0 : /// Dump control file at path specified by this argument and exit.
94 : #[arg(long)]
95 : dump_control_file: Option<Utf8PathBuf>,
96 : /// Broker endpoint for storage nodes coordination in the form
97 : /// http[s]://host:port. In case of https schema TLS is connection is
98 : /// established; plaintext otherwise.
99 : #[arg(long, default_value = DEFAULT_ENDPOINT, verbatim_doc_comment)]
100 0 : broker_endpoint: Uri,
101 0 : /// Broker keepalive interval.
102 : #[arg(long, value_parser= humantime::parse_duration, default_value = storage_broker::DEFAULT_KEEPALIVE_INTERVAL)]
103 0 : broker_keepalive_interval: Duration,
104 : /// Peer safekeeper is considered dead after not receiving heartbeats from
105 : /// it during this period passed as a human readable duration.
106 : #[arg(long, value_parser= humantime::parse_duration, default_value = DEFAULT_HEARTBEAT_TIMEOUT, verbatim_doc_comment)]
107 0 : heartbeat_timeout: Duration,
108 : /// Remote storage configuration for WAL backup (offloading to s3) as TOML
109 : /// inline table, e.g.
110 : /// {"max_concurrent_syncs" = 17, "max_sync_errors": 13, "bucket_name": "<BUCKETNAME>", "bucket_region":"<REGION>", "concurrency_limit": 119}
111 : /// Safekeeper offloads WAL to
112 : /// [prefix_in_bucket/]<tenant_id>/<timeline_id>/<segment_file>, mirroring
113 : /// structure on the file system.
114 : #[arg(long, value_parser = parse_remote_storage, verbatim_doc_comment)]
115 : remote_storage: Option<RemoteStorageConfig>,
116 : /// Safekeeper won't be elected for WAL offloading if it is lagging for more than this value in bytes
117 CBC 501 : #[arg(long, default_value_t = DEFAULT_MAX_OFFLOADER_LAG_BYTES)]
118 UBC 0 : max_offloader_lag: u64,
119 0 : /// Number of max parallel WAL segments to be offloaded to remote storage.
120 : #[arg(long, default_value = "5")]
121 0 : wal_backup_parallel_jobs: usize,
122 0 : /// Disable WAL backup to s3. When disabled, safekeeper removes WAL ignoring
123 : /// WAL backup horizon.
124 : #[arg(long)]
125 0 : disable_wal_backup: bool,
126 0 : /// If given, enables auth on incoming connections to WAL service endpoint
127 : /// (--listen-pg). Value specifies path to a .pem public key used for
128 : /// validations of JWT tokens. Empty string is allowed and means disabling
129 : /// auth.
130 : #[arg(long, verbatim_doc_comment, value_parser = opt_pathbuf_parser)]
131 : pg_auth_public_key_path: Option<Utf8PathBuf>,
132 : /// If given, enables auth on incoming connections to tenant only WAL
133 : /// service endpoint (--listen-pg-tenant-only). Value specifies path to a
134 : /// .pem public key used for validations of JWT tokens. Empty string is
135 : /// allowed and means disabling auth.
136 : #[arg(long, verbatim_doc_comment, value_parser = opt_pathbuf_parser)]
137 : pg_tenant_only_auth_public_key_path: Option<Utf8PathBuf>,
138 : /// If given, enables auth on incoming connections to http management
139 : /// service endpoint (--listen-http). Value specifies path to a .pem public
140 : /// key used for validations of JWT tokens. Empty string is allowed and
141 : /// means disabling auth.
142 : #[arg(long, verbatim_doc_comment, value_parser = opt_pathbuf_parser)]
143 : http_auth_public_key_path: Option<Utf8PathBuf>,
144 : /// Format for logging, either 'plain' or 'json'.
145 : #[arg(long, default_value = "plain")]
146 0 : log_format: String,
147 : /// Run everything in single threaded current thread runtime, might be
148 : /// useful for debugging.
149 : #[arg(long)]
150 0 : current_thread_runtime: bool,
151 0 : }
152 :
153 : // Like PathBufValueParser, but allows empty string.
154 CBC 63 : fn opt_pathbuf_parser(s: &str) -> Result<Utf8PathBuf, String> {
155 63 : Ok(Utf8PathBuf::from_str(s).unwrap())
156 63 : }
157 :
158 : #[tokio::main(flavor = "current_thread")]
159 500 : async fn main() -> anyhow::Result<()> {
160 500 : // We want to allow multiple occurences of the same arg (taking the last) so
161 500 : // that neon_local could generate command with defaults + overrides without
162 500 : // getting 'argument cannot be used multiple times' error. This seems to be
163 500 : // impossible with pure Derive API, so convert struct to Command, modify it,
164 500 : // parse arguments, and then fill the struct back.
165 500 : let cmd = <Args as clap::CommandFactory>::command().args_override_self(true);
166 500 : let mut matches = cmd.get_matches();
167 500 : let mut args = <Args as clap::FromArgMatches>::from_arg_matches_mut(&mut matches)?;
168 :
169 : // I failed to modify opt_pathbuf_parser to return Option<PathBuf> in
170 : // reasonable time, so turn empty string into option post factum.
171 500 : if let Some(pb) = &args.pg_auth_public_key_path {
172 20 : if pb.as_os_str().is_empty() {
173 1 : args.pg_auth_public_key_path = None;
174 19 : }
175 480 : }
176 500 : if let Some(pb) = &args.pg_tenant_only_auth_public_key_path {
177 20 : if pb.as_os_str().is_empty() {
178 UBC 0 : args.pg_tenant_only_auth_public_key_path = None;
179 CBC 20 : }
180 480 : }
181 500 : if let Some(pb) = &args.http_auth_public_key_path {
182 20 : if pb.as_os_str().is_empty() {
183 2 : args.http_auth_public_key_path = None;
184 18 : }
185 480 : }
186 :
187 500 : if let Some(addr) = args.dump_control_file {
188 UBC 0 : let state = control_file::FileStorage::load_control_file(addr)?;
189 0 : let json = serde_json::to_string(&state)?;
190 0 : print!("{json}");
191 0 : return Ok(());
192 CBC 500 : }
193 500 :
194 500 : // important to keep the order of:
195 500 : // 1. init logging
196 500 : // 2. tracing panic hook
197 500 : // 3. sentry
198 500 : logging::init(
199 500 : LogFormat::from_config(&args.log_format)?,
200 500 : logging::TracingErrorLayerEnablement::Disabled,
201 UBC 0 : )?;
202 CBC 500 : logging::replace_panic_hook_with_tracing_panic_hook().forget();
203 500 : info!("version: {GIT_VERSION}");
204 :
205 500 : let args_workdir = &args.datadir;
206 500 : let workdir = args_workdir.canonicalize_utf8().with_context(|| {
207 UBC 0 : format!("Failed to get the absolute path for input workdir {args_workdir:?}")
208 CBC 500 : })?;
209 :
210 : // Change into the data directory.
211 500 : std::env::set_current_dir(&workdir)?;
212 :
213 : // Set or read our ID.
214 500 : let id = set_id(&workdir, args.id.map(NodeId))?;
215 500 : if args.init {
216 UBC 0 : return Ok(());
217 CBC 500 : }
218 :
219 500 : let pg_auth = match args.pg_auth_public_key_path.as_ref() {
220 : None => {
221 481 : info!("pg auth is disabled");
222 481 : None
223 : }
224 19 : Some(path) => {
225 19 : info!("loading pg auth JWT key from {path}");
226 : Some(Arc::new(
227 19 : JwtAuth::from_key_path(path).context("failed to load the auth key")?,
228 : ))
229 : }
230 : };
231 500 : let pg_tenant_only_auth = match args.pg_tenant_only_auth_public_key_path.as_ref() {
232 : None => {
233 480 : info!("pg tenant only auth is disabled");
234 480 : None
235 : }
236 20 : Some(path) => {
237 20 : info!("loading pg tenant only auth JWT key from {path}");
238 : Some(Arc::new(
239 20 : JwtAuth::from_key_path(path).context("failed to load the auth key")?,
240 : ))
241 : }
242 : };
243 500 : let http_auth = match args.http_auth_public_key_path.as_ref() {
244 : None => {
245 482 : info!("http auth is disabled");
246 482 : None
247 : }
248 18 : Some(path) => {
249 18 : info!("loading http auth JWT key from {path}");
250 : Some(Arc::new(
251 18 : JwtAuth::from_key_path(path).context("failed to load the auth key")?,
252 : ))
253 : }
254 : };
255 :
256 500 : let conf = SafeKeeperConf {
257 500 : workdir,
258 500 : my_id: id,
259 500 : listen_pg_addr: args.listen_pg,
260 500 : listen_pg_addr_tenant_only: args.listen_pg_tenant_only,
261 500 : listen_http_addr: args.listen_http,
262 500 : advertise_pg_addr: args.advertise_pg,
263 500 : availability_zone: args.availability_zone,
264 500 : no_sync: args.no_sync,
265 500 : broker_endpoint: args.broker_endpoint,
266 500 : broker_keepalive_interval: args.broker_keepalive_interval,
267 500 : heartbeat_timeout: args.heartbeat_timeout,
268 500 : remote_storage: args.remote_storage,
269 500 : max_offloader_lag_bytes: args.max_offloader_lag,
270 500 : wal_backup_enabled: !args.disable_wal_backup,
271 500 : backup_parallel_jobs: args.wal_backup_parallel_jobs,
272 500 : pg_auth,
273 500 : pg_tenant_only_auth,
274 500 : http_auth,
275 500 : current_thread_runtime: args.current_thread_runtime,
276 500 : };
277 500 :
278 500 : // initialize sentry if SENTRY_DSN is provided
279 500 : let _sentry_guard = init_sentry(
280 500 : Some(GIT_VERSION.into()),
281 500 : &[("node_id", &conf.my_id.to_string())],
282 500 : );
283 1000 : start_safekeeper(conf).await
284 : }
285 :
286 : /// Result of joining any of main tasks: upper error means task failed to
287 : /// complete, e.g. panicked, inner is error produced by task itself.
288 : type JoinTaskRes = Result<anyhow::Result<()>, JoinError>;
289 :
290 500 : async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
291 500 : // Prevent running multiple safekeepers on the same directory
292 500 : let lock_file_path = conf.workdir.join(PID_FILE_NAME);
293 500 : let lock_file =
294 500 : pid_file::claim_for_current_process(&lock_file_path).context("claim pid file")?;
295 500 : info!("claimed pid file at {lock_file_path:?}");
296 :
297 : // ensure that the lock file is held even if the main thread of the process is panics
298 : // we need to release the lock file only when the current process is gone
299 500 : std::mem::forget(lock_file);
300 :
301 500 : info!("starting safekeeper WAL service on {}", conf.listen_pg_addr);
302 500 : let pg_listener = tcp_listener::bind(conf.listen_pg_addr.clone()).map_err(|e| {
303 UBC 0 : error!("failed to bind to address {}: {}", conf.listen_pg_addr, e);
304 0 : e
305 CBC 500 : })?;
306 :
307 500 : let pg_listener_tenant_only =
308 500 : if let Some(listen_pg_addr_tenant_only) = &conf.listen_pg_addr_tenant_only {
309 500 : info!(
310 500 : "starting safekeeper tenant scoped WAL service on {}",
311 500 : listen_pg_addr_tenant_only
312 500 : );
313 500 : let listener = tcp_listener::bind(listen_pg_addr_tenant_only.clone()).map_err(|e| {
314 UBC 0 : error!(
315 0 : "failed to bind to address {}: {}",
316 0 : listen_pg_addr_tenant_only, e
317 0 : );
318 0 : e
319 CBC 500 : })?;
320 500 : Some(listener)
321 : } else {
322 UBC 0 : None
323 : };
324 :
325 CBC 500 : info!(
326 500 : "starting safekeeper HTTP service on {}",
327 500 : conf.listen_http_addr
328 500 : );
329 500 : let http_listener = tcp_listener::bind(conf.listen_http_addr.clone()).map_err(|e| {
330 UBC 0 : error!("failed to bind to address {}: {}", conf.listen_http_addr, e);
331 0 : e
332 CBC 500 : })?;
333 :
334 : // Register metrics collector for active timelines. It's important to do this
335 : // after daemonizing, otherwise process collector will be upset.
336 500 : let timeline_collector = safekeeper::metrics::TimelineCollector::new();
337 500 : metrics::register_internal(Box::new(timeline_collector))?;
338 :
339 500 : let (wal_backup_launcher_tx, wal_backup_launcher_rx) = mpsc::channel(100);
340 500 :
341 500 : // Keep handles to main tasks to die if any of them disappears.
342 500 : let mut tasks_handles: FuturesUnordered<BoxFuture<(String, JoinTaskRes)>> =
343 500 : FuturesUnordered::new();
344 500 :
345 500 : // Start wal backup launcher before loading timelines as we'll notify it
346 500 : // through the channel about timelines which need offloading, not draining
347 500 : // the channel would cause deadlock.
348 500 : let current_thread_rt = conf
349 500 : .current_thread_runtime
350 500 : .then(|| Handle::try_current().expect("no runtime in main"));
351 500 : let conf_ = conf.clone();
352 500 : let wal_backup_handle = current_thread_rt
353 500 : .as_ref()
354 500 : .unwrap_or_else(|| WAL_BACKUP_RUNTIME.handle())
355 500 : .spawn(wal_backup::wal_backup_launcher_task_main(
356 500 : conf_,
357 500 : wal_backup_launcher_rx,
358 500 : ))
359 500 : .map(|res| ("WAL backup launcher".to_owned(), res));
360 500 : tasks_handles.push(Box::pin(wal_backup_handle));
361 500 :
362 500 : // Load all timelines from disk to memory.
363 500 : GlobalTimelines::init(conf.clone(), wal_backup_launcher_tx).await?;
364 :
365 500 : let conf_ = conf.clone();
366 500 : // Run everything in current thread rt, if asked.
367 500 : if conf.current_thread_runtime {
368 UBC 0 : info!("running in current thread runtime");
369 CBC 500 : }
370 :
371 500 : let wal_service_handle = current_thread_rt
372 500 : .as_ref()
373 500 : .unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle())
374 500 : .spawn(wal_service::task_main(
375 500 : conf_,
376 500 : pg_listener,
377 500 : Scope::SafekeeperData,
378 500 : ))
379 500 : // wrap with task name for error reporting
380 500 : .map(|res| ("WAL service main".to_owned(), res));
381 500 : tasks_handles.push(Box::pin(wal_service_handle));
382 :
383 500 : if let Some(pg_listener_tenant_only) = pg_listener_tenant_only {
384 500 : let conf_ = conf.clone();
385 500 : let wal_service_handle = current_thread_rt
386 500 : .as_ref()
387 500 : .unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle())
388 500 : .spawn(wal_service::task_main(
389 500 : conf_,
390 500 : pg_listener_tenant_only,
391 500 : Scope::Tenant,
392 500 : ))
393 500 : // wrap with task name for error reporting
394 500 : .map(|res| ("WAL service tenant only main".to_owned(), res));
395 500 : tasks_handles.push(Box::pin(wal_service_handle));
396 500 : }
397 :
398 500 : let conf_ = conf.clone();
399 500 : let http_handle = current_thread_rt
400 500 : .as_ref()
401 500 : .unwrap_or_else(|| HTTP_RUNTIME.handle())
402 500 : .spawn(http::task_main(conf_, http_listener))
403 500 : .map(|res| ("HTTP service main".to_owned(), res));
404 500 : tasks_handles.push(Box::pin(http_handle));
405 500 :
406 500 : let conf_ = conf.clone();
407 500 : let broker_task_handle = current_thread_rt
408 500 : .as_ref()
409 500 : .unwrap_or_else(|| BROKER_RUNTIME.handle())
410 500 : .spawn(broker::task_main(conf_).instrument(info_span!("broker")))
411 500 : .map(|res| ("broker main".to_owned(), res));
412 500 : tasks_handles.push(Box::pin(broker_task_handle));
413 500 :
414 500 : let conf_ = conf.clone();
415 500 : let wal_remover_handle = current_thread_rt
416 500 : .as_ref()
417 500 : .unwrap_or_else(|| WAL_REMOVER_RUNTIME.handle())
418 500 : .spawn(remove_wal::task_main(conf_))
419 500 : .map(|res| ("WAL remover".to_owned(), res));
420 500 : tasks_handles.push(Box::pin(wal_remover_handle));
421 500 :
422 500 : set_build_info_metric(GIT_VERSION);
423 :
424 : // TODO: update tokio-stream, convert to real async Stream with
425 : // SignalStream, map it to obtain missing signal name, combine streams into
426 : // single stream we can easily sit on.
427 500 : let mut sigquit_stream = signal(SignalKind::quit())?;
428 500 : let mut sigint_stream = signal(SignalKind::interrupt())?;
429 500 : let mut sigterm_stream = signal(SignalKind::terminate())?;
430 :
431 500 : tokio::select! {
432 UBC 0 : Some((task_name, res)) = tasks_handles.next()=> {
433 0 : error!("{} task failed: {:?}, exiting", task_name, res);
434 : std::process::exit(1);
435 : }
436 : // On any shutdown signal, log receival and exit. Additionally, handling
437 : // SIGQUIT prevents coredump.
438 CBC 387 : _ = sigquit_stream.recv() => info!("received SIGQUIT, terminating"),
439 UBC 0 : _ = sigint_stream.recv() => info!("received SIGINT, terminating"),
440 CBC 113 : _ = sigterm_stream.recv() => info!("received SIGTERM, terminating")
441 :
442 : };
443 500 : std::process::exit(0);
444 UBC 0 : }
445 :
446 : /// Determine safekeeper id.
447 CBC 500 : fn set_id(workdir: &Utf8Path, given_id: Option<NodeId>) -> Result<NodeId> {
448 500 : let id_file_path = workdir.join(ID_FILE_NAME);
449 500 :
450 500 : let my_id: NodeId;
451 500 : // If file with ID exists, read it in; otherwise set one passed.
452 500 : match fs::read(&id_file_path) {
453 92 : Ok(id_serialized) => {
454 92 : my_id = NodeId(
455 92 : std::str::from_utf8(&id_serialized)
456 92 : .context("failed to parse safekeeper id")?
457 92 : .parse()
458 92 : .context("failed to parse safekeeper id")?,
459 : );
460 92 : if let Some(given_id) = given_id {
461 92 : if given_id != my_id {
462 UBC 0 : bail!(
463 0 : "safekeeper already initialized with id {}, can't set {}",
464 0 : my_id,
465 0 : given_id
466 0 : );
467 CBC 92 : }
468 UBC 0 : }
469 CBC 92 : info!("safekeeper ID {}", my_id);
470 : }
471 408 : Err(error) => match error.kind() {
472 : ErrorKind::NotFound => {
473 408 : my_id = if let Some(given_id) = given_id {
474 408 : given_id
475 : } else {
476 UBC 0 : bail!("safekeeper id is not specified");
477 : };
478 CBC 408 : let mut f = File::create(&id_file_path)
479 408 : .with_context(|| format!("Failed to create id file at {id_file_path:?}"))?;
480 408 : f.write_all(my_id.to_string().as_bytes())?;
481 408 : f.sync_all()?;
482 408 : info!("initialized safekeeper id {}", my_id);
483 : }
484 : _ => {
485 UBC 0 : return Err(error.into());
486 : }
487 : },
488 : }
489 CBC 500 : Ok(my_id)
490 500 : }
491 :
492 : // Parse RemoteStorage from TOML table.
493 36 : fn parse_remote_storage(storage_conf: &str) -> anyhow::Result<RemoteStorageConfig> {
494 36 : // funny toml doesn't consider plain inline table as valid document, so wrap in a key to parse
495 36 : let storage_conf_toml = format!("remote_storage = {storage_conf}");
496 36 : let parsed_toml = storage_conf_toml.parse::<Document>()?; // parse
497 36 : let (_, storage_conf_parsed_toml) = parsed_toml.iter().next().unwrap(); // and strip key off again
498 36 : RemoteStorageConfig::from_toml(storage_conf_parsed_toml).and_then(|parsed_config| {
499 36 : // XXX: Don't print the original toml here, there might be some sensitive data
500 36 : parsed_config.context("Incorrectly parsed remote storage toml as no remote storage config")
501 36 : })
502 36 : }
503 :
/// Runs clap's own consistency checks over the `Args` definition, so that a
/// misconfigured argument is caught by the test suite.
#[test]
fn verify_cli() {
    <Args as clap::CommandFactory>::command().debug_assert();
}
|