LCOV - code coverage report
Current view: top level - safekeeper/src - lib.rs (source / functions) Coverage Total Hit
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info Lines: 90.2 % 82 74
Test Date: 2024-02-07 07:37:29 Functions: 83.3 % 12 10

            Line data    Source code
       1              : #![deny(clippy::undocumented_unsafe_blocks)]
       2              : use camino::Utf8PathBuf;
       3              : use once_cell::sync::Lazy;
       4              : use remote_storage::RemoteStorageConfig;
       5              : use tokio::runtime::Runtime;
       6              : 
       7              : use std::time::Duration;
       8              : use storage_broker::Uri;
       9              : 
      10              : use utils::{
      11              :     auth::SwappableJwtAuth,
      12              :     id::{NodeId, TenantId, TenantTimelineId},
      13              : };
      14              : 
      15              : mod auth;
      16              : pub mod broker;
      17              : pub mod control_file;
      18              : pub mod control_file_upgrade;
      19              : pub mod copy_timeline;
      20              : pub mod debug_dump;
      21              : pub mod handler;
      22              : pub mod http;
      23              : pub mod json_ctrl;
      24              : pub mod metrics;
      25              : pub mod patch_control_file;
      26              : pub mod pull_timeline;
      27              : pub mod receive_wal;
      28              : pub mod recovery;
      29              : pub mod remove_wal;
      30              : pub mod safekeeper;
      31              : pub mod send_wal;
      32              : pub mod state;
      33              : pub mod timeline;
      34              : pub mod wal_backup;
      35              : pub mod wal_service;
      36              : pub mod wal_storage;
      37              : 
      38              : mod timelines_global_map;
      39              : use std::sync::Arc;
      40              : pub use timelines_global_map::GlobalTimelines;
      41              : use utils::auth::JwtAuth;
      42              : 
      43              : pub mod defaults {
      44              :     pub use safekeeper_api::{
      45              :         DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
      46              :         DEFAULT_PG_LISTEN_PORT,
      47              :     };
      48              : 
      49              :     pub const DEFAULT_HEARTBEAT_TIMEOUT: &str = "5000ms";
      50              :     pub const DEFAULT_MAX_OFFLOADER_LAG_BYTES: u64 = 128 * (1 << 20);
      51              : }
      52              : 
      53        10489 : #[derive(Debug, Clone)]
      54              : pub struct SafeKeeperConf {
      55              :     // Repository directory, relative to current working directory.
      56              :     // Normally, the safekeeper changes the current working directory
      57              :     // to the repository, and 'workdir' is always '.'. But we don't do
      58              :     // that during unit testing, because the current directory is global
      59              :     // to the process but different unit tests work on different
      60              :     // data directories to avoid clashing with each other.
      61              :     pub workdir: Utf8PathBuf,
      62              :     pub my_id: NodeId,
      63              :     pub listen_pg_addr: String,
      64              :     pub listen_pg_addr_tenant_only: Option<String>,
      65              :     pub listen_http_addr: String,
      66              :     pub advertise_pg_addr: Option<String>,
      67              :     pub availability_zone: Option<String>,
      68              :     pub no_sync: bool,
      69              :     pub broker_endpoint: Uri,
      70              :     pub broker_keepalive_interval: Duration,
      71              :     pub heartbeat_timeout: Duration,
      72              :     pub peer_recovery_enabled: bool,
      73              :     pub remote_storage: Option<RemoteStorageConfig>,
      74              :     pub max_offloader_lag_bytes: u64,
      75              :     pub backup_parallel_jobs: usize,
      76              :     pub wal_backup_enabled: bool,
      77              :     pub pg_auth: Option<Arc<JwtAuth>>,
      78              :     pub pg_tenant_only_auth: Option<Arc<JwtAuth>>,
      79              :     pub http_auth: Option<Arc<SwappableJwtAuth>>,
      80              :     pub current_thread_runtime: bool,
      81              : }
      82              : 
      83              : impl SafeKeeperConf {
      84         3023 :     pub fn tenant_dir(&self, tenant_id: &TenantId) -> Utf8PathBuf {
      85         3023 :         self.workdir.join(tenant_id.to_string())
      86         3023 :     }
      87              : 
      88         2885 :     pub fn timeline_dir(&self, ttid: &TenantTimelineId) -> Utf8PathBuf {
      89         2885 :         self.tenant_dir(&ttid.tenant_id)
      90         2885 :             .join(ttid.timeline_id.to_string())
      91         2885 :     }
      92              : 
      93        13593 :     pub fn is_wal_backup_enabled(&self) -> bool {
      94        13593 :         self.remote_storage.is_some() && self.wal_backup_enabled
      95        13593 :     }
      96              : }
      97              : 
      98              : impl SafeKeeperConf {
      99              :     #[cfg(test)]
     100            4 :     fn dummy() -> Self {
     101            4 :         SafeKeeperConf {
     102            4 :             workdir: Utf8PathBuf::from("./"),
     103            4 :             no_sync: false,
     104            4 :             listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
     105            4 :             listen_pg_addr_tenant_only: None,
     106            4 :             listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
     107            4 :             advertise_pg_addr: None,
     108            4 :             availability_zone: None,
     109            4 :             remote_storage: None,
     110            4 :             my_id: NodeId(0),
     111            4 :             broker_endpoint: storage_broker::DEFAULT_ENDPOINT
     112            4 :                 .parse()
     113            4 :                 .expect("failed to parse default broker endpoint"),
     114            4 :             broker_keepalive_interval: Duration::from_secs(5),
     115            4 :             peer_recovery_enabled: true,
     116            4 :             wal_backup_enabled: true,
     117            4 :             backup_parallel_jobs: 1,
     118            4 :             pg_auth: None,
     119            4 :             pg_tenant_only_auth: None,
     120            4 :             http_auth: None,
     121            4 :             heartbeat_timeout: Duration::new(5, 0),
     122            4 :             max_offloader_lag_bytes: defaults::DEFAULT_MAX_OFFLOADER_LAG_BYTES,
     123            4 :             current_thread_runtime: false,
     124            4 :         }
     125            4 :     }
     126              : }
     127              : 
     128              : // Tokio runtimes.
     129          510 : pub static WAL_SERVICE_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
     130          510 :     tokio::runtime::Builder::new_multi_thread()
     131          510 :         .thread_name("WAL service worker")
     132          510 :         .enable_all()
     133          510 :         .build()
     134          510 :         .expect("Failed to create WAL service runtime")
     135          510 : });
     136              : 
     137          510 : pub static HTTP_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
     138          510 :     tokio::runtime::Builder::new_multi_thread()
     139          510 :         .thread_name("HTTP worker")
     140          510 :         .enable_all()
     141          510 :         .build()
     142          510 :         .expect("Failed to create WAL service runtime")
     143          510 : });
     144              : 
     145          510 : pub static BROKER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
     146          510 :     tokio::runtime::Builder::new_multi_thread()
     147          510 :         .thread_name("broker worker")
     148          510 :         .worker_threads(2) // there are only 2 tasks, having more threads doesn't make sense
     149          510 :         .enable_all()
     150          510 :         .build()
     151          510 :         .expect("Failed to create broker runtime")
     152          510 : });
     153              : 
     154          510 : pub static WAL_REMOVER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
     155          510 :     tokio::runtime::Builder::new_multi_thread()
     156          510 :         .thread_name("WAL remover")
     157          510 :         .worker_threads(1)
     158          510 :         .enable_all()
     159          510 :         .build()
     160          510 :         .expect("Failed to create broker runtime")
     161          510 : });
     162              : 
     163          510 : pub static WAL_BACKUP_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
     164          510 :     tokio::runtime::Builder::new_multi_thread()
     165          510 :         .thread_name("WAL backup worker")
     166          510 :         .enable_all()
     167          510 :         .build()
     168          510 :         .expect("Failed to create WAL backup runtime")
     169          510 : });
     170              : 
     171            0 : pub static METRICS_SHIFTER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
     172            0 :     tokio::runtime::Builder::new_multi_thread()
     173            0 :         .thread_name("metric shifter")
     174            0 :         .worker_threads(1)
     175            0 :         .enable_all()
     176            0 :         .build()
     177            0 :         .expect("Failed to create broker runtime")
     178            0 : });
        

Generated by: LCOV version 2.1-beta