LCOV - code coverage report
Current view: top level - safekeeper/src - lib.rs (source / functions) Coverage Total Hit
Test: 4be46b1c0003aa3bbac9ade362c676b419df4c20.info Lines: 58.9 % 95 56
Test Date: 2025-07-22 17:50:06 Functions: 16.7 % 6 1

            Line data    Source code
       1              : #![deny(clippy::undocumented_unsafe_blocks)]
       2              : 
       3              : extern crate hyper0 as hyper;
       4              : 
       5              : use std::time::Duration;
       6              : 
       7              : use camino::Utf8PathBuf;
       8              : use once_cell::sync::Lazy;
       9              : use pem::Pem;
      10              : use remote_storage::RemoteStorageConfig;
      11              : use storage_broker::Uri;
      12              : use tokio::runtime::Runtime;
      13              : use url::Url;
      14              : use utils::auth::SwappableJwtAuth;
      15              : use utils::id::NodeId;
      16              : use utils::logging::SecretString;
      17              : 
      18              : mod auth;
      19              : pub mod broker;
      20              : pub mod control_file;
      21              : pub mod control_file_upgrade;
      22              : pub mod copy_timeline;
      23              : pub mod debug_dump;
      24              : pub mod hadron;
      25              : pub mod handler;
      26              : pub mod http;
      27              : pub mod metrics;
      28              : pub mod patch_control_file;
      29              : pub mod pull_timeline;
      30              : pub mod rate_limit;
      31              : pub mod receive_wal;
      32              : pub mod recovery;
      33              : pub mod remove_wal;
      34              : pub mod safekeeper;
      35              : pub mod send_interpreted_wal;
      36              : pub mod send_wal;
      37              : pub mod state;
      38              : pub mod timeline;
      39              : pub mod timeline_eviction;
      40              : pub mod timeline_guard;
      41              : pub mod timeline_manager;
      42              : pub mod timelines_set;
      43              : pub mod wal_backup;
      44              : pub mod wal_backup_partial;
      45              : pub mod wal_reader_stream;
      46              : pub mod wal_service;
      47              : pub mod wal_storage;
      48              : 
      49              : #[cfg(any(test, feature = "benchmarking"))]
      50              : pub mod test_utils;
      51              : 
      52              : mod timelines_global_map;
      53              : 
      54              : use std::sync::Arc;
      55              : 
      56              : pub use timelines_global_map::GlobalTimelines;
      57              : use utils::auth::JwtAuth;
      58              : 
      59              : pub mod defaults {
      60              :     pub use safekeeper_api::{
      61              :         DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
      62              :         DEFAULT_PG_LISTEN_PORT,
      63              :     };
      64              : 
      65              :     pub const DEFAULT_HEARTBEAT_TIMEOUT: &str = "5000ms";
      66              :     pub const DEFAULT_MAX_OFFLOADER_LAG_BYTES: u64 = 128 * (1 << 20);
      67              :     /* BEGIN_HADRON */
      68              :     // Default leader re-election lag threshold is 0 (disabled). SK will re-elect the leader if the current leader is lagging by this many bytes.
      69              :     pub const DEFAULT_MAX_REELECT_OFFLOADER_LAG_BYTES: u64 = 0;
      70              :     // Default disk usage limit is 0 (disabled). The limit is how many bytes of WAL disk space each
      71              :     // timeline can use on this SK before the SK begins to reject WAL.
      72              :     pub const DEFAULT_MAX_TIMELINE_DISK_USAGE_BYTES: u64 = 0;
      73              :     /* END_HADRON */
      74              :     pub const DEFAULT_PARTIAL_BACKUP_TIMEOUT: &str = "15m";
      75              :     pub const DEFAULT_CONTROL_FILE_SAVE_INTERVAL: &str = "300s";
      76              :     pub const DEFAULT_PARTIAL_BACKUP_CONCURRENCY: &str = "5";
      77              :     pub const DEFAULT_EVICTION_CONCURRENCY: usize = 2;
      78              : 
      79              :     // By default, our required residency before eviction is the same as the period that passes
      80              :     // before uploading a partial segment, so that in normal operation the eviction can happen
      81              :     // as soon as we have done the partial segment upload.
      82              :     pub const DEFAULT_EVICTION_MIN_RESIDENT: &str = DEFAULT_PARTIAL_BACKUP_TIMEOUT;
      83              : 
      84              :     pub const DEFAULT_SSL_KEY_FILE: &str = "server.key";
      85              :     pub const DEFAULT_SSL_CERT_FILE: &str = "server.crt";
      86              :     pub const DEFAULT_SSL_CERT_RELOAD_PERIOD: &str = "60s";
      87              : 
      88              :     // Global disk watcher defaults
      89              :     pub const DEFAULT_GLOBAL_DISK_CHECK_INTERVAL: &str = "60s";
      90              :     pub const DEFAULT_MAX_GLOBAL_DISK_USAGE_RATIO: f64 = 0.0;
      91              : }
      92              : 
      93              : #[derive(Debug, Clone)]
      94              : pub struct SafeKeeperConf {
      95              :     // Repository directory, relative to current working directory.
      96              :     // Normally, the safekeeper changes the current working directory
      97              :     // to the repository, and 'workdir' is always '.'. But we don't do
      98              :     // that during unit testing, because the current directory is global
      99              :     // to the process but different unit tests work on different
     100              :     // data directories to avoid clashing with each other.
     101              :     pub workdir: Utf8PathBuf,
     102              :     pub my_id: NodeId,
     103              :     pub listen_pg_addr: String,
     104              :     pub listen_pg_addr_tenant_only: Option<String>,
     105              :     pub listen_http_addr: String,
     106              :     pub listen_https_addr: Option<String>,
     107              :     pub advertise_pg_addr: Option<String>,
     108              :     pub availability_zone: Option<String>,
     109              :     pub no_sync: bool,
     110              :     /* BEGIN_HADRON */
     111              :     pub advertise_pg_addr_tenant_only: Option<String>,
     112              :     pub enable_pull_timeline_on_startup: bool,
     113              :     pub hcc_base_url: Option<Url>,
     114              :     /* END_HADRON */
     115              :     pub broker_endpoint: Uri,
     116              :     pub broker_keepalive_interval: Duration,
     117              :     pub heartbeat_timeout: Duration,
     118              :     pub peer_recovery_enabled: bool,
     119              :     pub remote_storage: Option<RemoteStorageConfig>,
     120              :     pub max_offloader_lag_bytes: u64,
     121              :     /* BEGIN_HADRON */
     122              :     pub max_reelect_offloader_lag_bytes: u64,
     123              :     pub max_timeline_disk_usage_bytes: u64,
     124              :     /// How often to check the working directory's filesystem for total disk usage.
     125              :     pub global_disk_check_interval: Duration,
     126              :     /// The portion of the filesystem capacity that can be used by all timelines.
     127              :     pub max_global_disk_usage_ratio: f64,
     128              :     /* END_HADRON */
     129              :     pub backup_parallel_jobs: usize,
     130              :     pub wal_backup_enabled: bool,
     131              :     pub pg_auth: Option<Arc<JwtAuth>>,
     132              :     pub pg_tenant_only_auth: Option<Arc<JwtAuth>>,
     133              :     pub http_auth: Option<Arc<SwappableJwtAuth>>,
     134              :     /// JWT token to connect to other safekeepers with.
     135              :     pub sk_auth_token: Option<SecretString>,
     136              :     pub current_thread_runtime: bool,
     137              :     pub walsenders_keep_horizon: bool,
     138              :     pub partial_backup_timeout: Duration,
     139              :     pub disable_periodic_broker_push: bool,
     140              :     pub enable_offload: bool,
     141              :     pub delete_offloaded_wal: bool,
     142              :     pub control_file_save_interval: Duration,
     143              :     pub partial_backup_concurrency: usize,
     144              :     pub eviction_min_resident: Duration,
     145              :     pub wal_reader_fanout: bool,
     146              :     pub max_delta_for_fanout: Option<u64>,
     147              :     pub ssl_key_file: Utf8PathBuf,
     148              :     pub ssl_cert_file: Utf8PathBuf,
     149              :     pub ssl_cert_reload_period: Duration,
     150              :     pub ssl_ca_certs: Vec<Pem>,
     151              :     pub use_https_safekeeper_api: bool,
     152              :     pub enable_tls_wal_service_api: bool,
     153              :     pub force_metric_collection_on_scrape: bool,
     154              : }
     155              : 
     156              : impl SafeKeeperConf {
     157           11 :     pub fn dummy() -> Self {
     158           11 :         SafeKeeperConf {
     159           11 :             workdir: Utf8PathBuf::from("./"),
     160           11 :             no_sync: false,
     161           11 :             listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
     162           11 :             listen_pg_addr_tenant_only: None,
     163           11 :             listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
     164           11 :             listen_https_addr: None,
     165           11 :             advertise_pg_addr: None,
     166           11 :             availability_zone: None,
     167           11 :             remote_storage: None,
     168           11 :             my_id: NodeId(0),
     169           11 :             broker_endpoint: storage_broker::DEFAULT_ENDPOINT
     170           11 :                 .parse()
     171           11 :                 .expect("failed to parse default broker endpoint"),
     172           11 :             broker_keepalive_interval: Duration::from_secs(5),
     173           11 :             peer_recovery_enabled: true,
     174           11 :             wal_backup_enabled: true,
     175           11 :             backup_parallel_jobs: 1,
     176           11 :             pg_auth: None,
     177           11 :             pg_tenant_only_auth: None,
     178           11 :             http_auth: None,
     179           11 :             sk_auth_token: None,
     180           11 :             heartbeat_timeout: Duration::new(5, 0),
     181           11 :             max_offloader_lag_bytes: defaults::DEFAULT_MAX_OFFLOADER_LAG_BYTES,
     182           11 :             /* BEGIN_HADRON */
     183           11 :             max_reelect_offloader_lag_bytes: defaults::DEFAULT_MAX_REELECT_OFFLOADER_LAG_BYTES,
     184           11 :             max_timeline_disk_usage_bytes: defaults::DEFAULT_MAX_TIMELINE_DISK_USAGE_BYTES,
     185           11 :             global_disk_check_interval: Duration::from_secs(60),
     186           11 :             max_global_disk_usage_ratio: defaults::DEFAULT_MAX_GLOBAL_DISK_USAGE_RATIO,
     187           11 :             /* END_HADRON */
     188           11 :             current_thread_runtime: false,
     189           11 :             walsenders_keep_horizon: false,
     190           11 :             partial_backup_timeout: Duration::from_secs(0),
     191           11 :             disable_periodic_broker_push: false,
     192           11 :             enable_offload: false,
     193           11 :             delete_offloaded_wal: false,
     194           11 :             control_file_save_interval: Duration::from_secs(1),
     195           11 :             partial_backup_concurrency: 1,
     196           11 :             eviction_min_resident: Duration::ZERO,
     197           11 :             wal_reader_fanout: false,
     198           11 :             max_delta_for_fanout: None,
     199           11 :             ssl_key_file: Utf8PathBuf::from(defaults::DEFAULT_SSL_KEY_FILE),
     200           11 :             ssl_cert_file: Utf8PathBuf::from(defaults::DEFAULT_SSL_CERT_FILE),
     201           11 :             ssl_cert_reload_period: Duration::from_secs(60),
     202           11 :             ssl_ca_certs: Vec::new(),
     203           11 :             use_https_safekeeper_api: false,
     204           11 :             enable_tls_wal_service_api: false,
     205           11 :             force_metric_collection_on_scrape: true,
     206           11 :             /* BEGIN_HADRON */
     207           11 :             advertise_pg_addr_tenant_only: None,
     208           11 :             enable_pull_timeline_on_startup: false,
     209           11 :             hcc_base_url: None,
     210           11 :             /* END_HADRON */
     211           11 :         }
     212           11 :     }
     213              : }
     214              : 
     215              : // Tokio runtimes.
     216            0 : pub static WAL_SERVICE_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
     217            0 :     tokio::runtime::Builder::new_multi_thread()
     218            0 :         .thread_name("WAL service worker")
     219            0 :         .enable_all()
     220            0 :         .build()
     221            0 :         .expect("Failed to create WAL service runtime")
     222            0 : });
     223              : 
     224            0 : pub static HTTP_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
     225            0 :     tokio::runtime::Builder::new_multi_thread()
     226            0 :         .thread_name("HTTP worker")
     227            0 :         .enable_all()
     228            0 :         .build()
     229            0 :         .expect("Failed to create HTTP runtime")
     230            0 : });
     231              : 
     232            0 : pub static BROKER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
     233            0 :     tokio::runtime::Builder::new_multi_thread()
     234            0 :         .thread_name("broker worker")
     235            0 :         .worker_threads(2) // there are only 2 tasks, having more threads doesn't make sense
     236            0 :         .enable_all()
     237            0 :         .build()
     238            0 :         .expect("Failed to create broker runtime")
     239            0 : });
     240              : 
     241            0 : pub static WAL_BACKUP_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
     242            0 :     tokio::runtime::Builder::new_multi_thread()
     243            0 :         .thread_name("WAL backup worker")
     244            0 :         .enable_all()
     245            0 :         .build()
     246            0 :         .expect("Failed to create WAL backup runtime")
     247            0 : });
     248              : 
     249              : /// Hadron: Dedicated runtime for infrequent background tasks.
     250            0 : pub static BACKGROUND_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
     251            0 :     tokio::runtime::Builder::new_multi_thread()
     252            0 :         .thread_name("Hadron background worker")
     253            0 :         // One worker thread is enough, as most of the actual tasks run on blocking threads
     254            0 :         // which have their own thread pool.
     255            0 :         .worker_threads(1)
     256            0 :         .enable_all()
     257            0 :         .build()
     258            0 :         .expect("Failed to create background runtime")
     259            0 : });
        

Generated by: LCOV version 2.1-beta