LCOV - code coverage report
Current view: top level - safekeeper/src - lib.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 89.7 % 78 70
Test Date: 2023-09-06 10:18:01 Functions: 81.8 % 11 9

            Line data    Source code
       1              : use once_cell::sync::Lazy;
       2              : use remote_storage::RemoteStorageConfig;
       3              : use tokio::runtime::Runtime;
       4              : 
       5              : use std::path::PathBuf;
       6              : use std::time::Duration;
       7              : use storage_broker::Uri;
       8              : 
       9              : use utils::id::{NodeId, TenantId, TenantTimelineId};
      10              : 
      11              : mod auth;
      12              : pub mod broker;
      13              : pub mod control_file;
      14              : pub mod control_file_upgrade;
      15              : pub mod debug_dump;
      16              : pub mod handler;
      17              : pub mod http;
      18              : pub mod json_ctrl;
      19              : pub mod metrics;
      20              : pub mod pull_timeline;
      21              : pub mod receive_wal;
      22              : pub mod recovery;
      23              : pub mod remove_wal;
      24              : pub mod safekeeper;
      25              : pub mod send_wal;
      26              : pub mod timeline;
      27              : pub mod wal_backup;
      28              : pub mod wal_service;
      29              : pub mod wal_storage;
      30              : 
      31              : mod timelines_global_map;
      32              : use std::sync::Arc;
      33              : pub use timelines_global_map::GlobalTimelines;
      34              : use utils::auth::JwtAuth;
      35              : 
      36              : pub mod defaults {
      37              :     pub use safekeeper_api::{
      38              :         DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
      39              :         DEFAULT_PG_LISTEN_PORT,
      40              :     };
      41              : 
      42              :     pub const DEFAULT_HEARTBEAT_TIMEOUT: &str = "5000ms";
      43              :     pub const DEFAULT_MAX_OFFLOADER_LAG_BYTES: u64 = 128 * (1 << 20);
      44              : }
      45              : 
      46        11326 : #[derive(Debug, Clone)]
      47              : pub struct SafeKeeperConf {
      48              :     // Repository directory, relative to current working directory.
      49              :     // Normally, the safekeeper changes the current working directory
      50              :     // to the repository, and 'workdir' is always '.'. But we don't do
      51              :     // that during unit testing, because the current directory is global
      52              :     // to the process but different unit tests work on different
      53              :     // data directories to avoid clashing with each other.
      54              :     pub workdir: PathBuf,
      55              :     pub my_id: NodeId,
      56              :     pub listen_pg_addr: String,
      57              :     pub listen_pg_addr_tenant_only: Option<String>,
      58              :     pub listen_http_addr: String,
      59              :     pub advertise_pg_addr: Option<String>,
      60              :     pub availability_zone: Option<String>,
      61              :     pub no_sync: bool,
      62              :     pub broker_endpoint: Uri,
      63              :     pub broker_keepalive_interval: Duration,
      64              :     pub heartbeat_timeout: Duration,
      65              :     pub remote_storage: Option<RemoteStorageConfig>,
      66              :     pub max_offloader_lag_bytes: u64,
      67              :     pub backup_parallel_jobs: usize,
      68              :     pub wal_backup_enabled: bool,
      69              :     pub pg_auth: Option<Arc<JwtAuth>>,
      70              :     pub pg_tenant_only_auth: Option<Arc<JwtAuth>>,
      71              :     pub http_auth: Option<Arc<JwtAuth>>,
      72              :     pub current_thread_runtime: bool,
      73              : }
      74              : 
      75              : impl SafeKeeperConf {
      76         2851 :     pub fn tenant_dir(&self, tenant_id: &TenantId) -> PathBuf {
      77         2851 :         self.workdir.join(tenant_id.to_string())
      78         2851 :     }
      79              : 
      80         2763 :     pub fn timeline_dir(&self, ttid: &TenantTimelineId) -> PathBuf {
      81         2763 :         self.tenant_dir(&ttid.tenant_id)
      82         2763 :             .join(ttid.timeline_id.to_string())
      83         2763 :     }
      84              : }
      85              : 
      86              : impl SafeKeeperConf {
      87              :     #[cfg(test)]
      88            2 :     fn dummy() -> Self {
      89            2 :         SafeKeeperConf {
      90            2 :             workdir: PathBuf::from("./"),
      91            2 :             no_sync: false,
      92            2 :             listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
      93            2 :             listen_pg_addr_tenant_only: None,
      94            2 :             listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
      95            2 :             advertise_pg_addr: None,
      96            2 :             availability_zone: None,
      97            2 :             remote_storage: None,
      98            2 :             my_id: NodeId(0),
      99            2 :             broker_endpoint: storage_broker::DEFAULT_ENDPOINT
     100            2 :                 .parse()
     101            2 :                 .expect("failed to parse default broker endpoint"),
     102            2 :             broker_keepalive_interval: Duration::from_secs(5),
     103            2 :             wal_backup_enabled: true,
     104            2 :             backup_parallel_jobs: 1,
     105            2 :             pg_auth: None,
     106            2 :             pg_tenant_only_auth: None,
     107            2 :             http_auth: None,
     108            2 :             heartbeat_timeout: Duration::new(5, 0),
     109            2 :             max_offloader_lag_bytes: defaults::DEFAULT_MAX_OFFLOADER_LAG_BYTES,
     110            2 :             current_thread_runtime: false,
     111            2 :         }
     112            2 :     }
     113              : }
     114              : 
     115              : // Tokio runtimes.
     116          517 : pub static WAL_SERVICE_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
     117          517 :     tokio::runtime::Builder::new_multi_thread()
     118          517 :         .thread_name("WAL service worker")
     119          517 :         .enable_all()
     120          517 :         .build()
     121          517 :         .expect("Failed to create WAL service runtime")
     122          517 : });
     123              : 
     124          517 : pub static HTTP_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
     125          517 :     tokio::runtime::Builder::new_multi_thread()
     126          517 :         .thread_name("HTTP worker")
     127          517 :         .enable_all()
     128          517 :         .build()
     129          517 :         .expect("Failed to create WAL service runtime")
     130          517 : });
     131              : 
     132          517 : pub static BROKER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
     133          517 :     tokio::runtime::Builder::new_multi_thread()
     134          517 :         .thread_name("broker worker")
     135          517 :         .worker_threads(2) // there are only 2 tasks, having more threads doesn't make sense
     136          517 :         .enable_all()
     137          517 :         .build()
     138          517 :         .expect("Failed to create broker runtime")
     139          517 : });
     140              : 
     141          517 : pub static WAL_REMOVER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
     142          517 :     tokio::runtime::Builder::new_multi_thread()
     143          517 :         .thread_name("WAL remover")
     144          517 :         .worker_threads(1)
     145          517 :         .enable_all()
     146          517 :         .build()
     147          517 :         .expect("Failed to create broker runtime")
     148          517 : });
     149              : 
     150          517 : pub static WAL_BACKUP_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
     151          517 :     tokio::runtime::Builder::new_multi_thread()
     152          517 :         .thread_name("WAL backup worker")
     153          517 :         .enable_all()
     154          517 :         .build()
     155          517 :         .expect("Failed to create WAL backup runtime")
     156          517 : });
     157              : 
     158            0 : pub static METRICS_SHIFTER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
     159            0 :     tokio::runtime::Builder::new_multi_thread()
     160            0 :         .thread_name("metric shifter")
     161            0 :         .worker_threads(1)
     162            0 :         .enable_all()
     163            0 :         .build()
     164            0 :         .expect("Failed to create broker runtime")
     165            0 : });
        

Generated by: LCOV version 2.1-beta