LCOV - differential code coverage report
Current view: top level - safekeeper/src - debug_dump.rs (source / functions) Coverage Total Hit UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 90.9 % 110 100 10 100
Current Date: 2023-10-19 02:04:12 Functions: 43.0 % 93 40 53 40
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : //! Utils for dumping full state of the safekeeper.
       2                 : 
       3                 : use std::fs;
       4                 : use std::fs::DirEntry;
       5                 : use std::io::BufReader;
       6                 : use std::io::Read;
       7                 : use std::path::PathBuf;
       8                 : 
       9                 : use anyhow::Result;
      10                 : use camino::Utf8Path;
      11                 : use chrono::{DateTime, Utc};
      12                 : use postgres_ffi::XLogSegNo;
      13                 : use serde::Deserialize;
      14                 : use serde::Serialize;
      15                 : 
      16                 : use serde_with::{serde_as, DisplayFromStr};
      17                 : use utils::id::NodeId;
      18                 : use utils::id::TenantTimelineId;
      19                 : use utils::id::{TenantId, TimelineId};
      20                 : use utils::lsn::Lsn;
      21                 : 
      22                 : use crate::safekeeper::SafeKeeperState;
      23                 : use crate::safekeeper::SafekeeperMemState;
      24                 : use crate::safekeeper::TermHistory;
      25                 : use crate::SafeKeeperConf;
      26                 : 
      27                 : use crate::send_wal::WalSenderState;
      28                 : use crate::GlobalTimelines;
      29                 : 
      30                 : /// Various filters that influence the resulting JSON output.
      31 UBC           0 : #[derive(Debug, Serialize, Deserialize)]
      32                 : pub struct Args {
      33                 :     /// Dump all available safekeeper state. False by default.
      34                 :     pub dump_all: bool,
      35                 : 
      36                 :     /// Dump control_file content. Uses value of `dump_all` by default.
      37                 :     pub dump_control_file: bool,
      38                 : 
      39                 :     /// Dump in-memory state. Uses value of `dump_all` by default.
      40                 :     pub dump_memory: bool,
      41                 : 
      42                 :     /// Dump all disk files in a timeline directory. Uses value of `dump_all` by default.
      43                 :     pub dump_disk_content: bool,
      44                 : 
      45                 :     /// Dump full term history. True by default.
      46                 :     pub dump_term_history: bool,
      47                 : 
      48                 :     /// Filter timelines by tenant_id.
      49                 :     pub tenant_id: Option<TenantId>,
      50                 : 
      51                 :     /// Filter timelines by timeline_id.
      52                 :     pub timeline_id: Option<TimelineId>,
      53                 : }
      54                 : 
      55                 : /// Response for debug dump request.
      56 CBC          11 : #[derive(Debug, Serialize, Deserialize)]
      57                 : pub struct Response {
      58                 :     pub start_time: DateTime<Utc>,
      59                 :     pub finish_time: DateTime<Utc>,
      60                 :     pub timelines: Vec<Timeline>,
      61                 :     pub timelines_count: usize,
      62                 :     pub config: Config,
      63                 : }
      64                 : 
      65                 : /// Safekeeper configuration.
      66              15 : #[derive(Debug, Serialize, Deserialize)]
      67                 : pub struct Config {
      68                 :     pub id: NodeId,
      69                 :     pub workdir: PathBuf,
      70                 :     pub listen_pg_addr: String,
      71                 :     pub listen_http_addr: String,
      72                 :     pub no_sync: bool,
      73                 :     pub max_offloader_lag_bytes: u64,
      74                 :     pub wal_backup_enabled: bool,
      75                 : }
      76                 : 
      77                 : #[serde_as]
      78              13 : #[derive(Debug, Serialize, Deserialize)]
      79                 : pub struct Timeline {
      80                 :     #[serde_as(as = "DisplayFromStr")]
      81                 :     pub tenant_id: TenantId,
      82                 :     #[serde_as(as = "DisplayFromStr")]
      83                 :     pub timeline_id: TimelineId,
      84                 :     pub control_file: Option<SafeKeeperState>,
      85                 :     pub memory: Option<Memory>,
      86                 :     pub disk_content: Option<DiskContent>,
      87                 : }
      88                 : 
      89              27 : #[derive(Debug, Serialize, Deserialize)]
      90                 : pub struct Memory {
      91                 :     pub is_cancelled: bool,
      92                 :     pub peers_info_len: usize,
      93                 :     pub walsenders: Vec<WalSenderState>,
      94                 :     pub wal_backup_active: bool,
      95                 :     pub active: bool,
      96                 :     pub num_computes: u32,
      97                 :     pub last_removed_segno: XLogSegNo,
      98                 :     pub epoch_start_lsn: Lsn,
      99                 :     pub mem_state: SafekeeperMemState,
     100                 : 
     101                 :     // PhysicalStorage state.
     102                 :     pub write_lsn: Lsn,
     103                 :     pub write_record_lsn: Lsn,
     104                 :     pub flush_lsn: Lsn,
     105                 :     pub file_open: bool,
     106                 : }
     107                 : 
     108               5 : #[derive(Debug, Serialize, Deserialize)]
     109                 : pub struct DiskContent {
     110                 :     pub files: Vec<FileInfo>,
     111                 : }
     112                 : 
     113              39 : #[derive(Debug, Serialize, Deserialize)]
     114                 : pub struct FileInfo {
     115                 :     pub name: String,
     116                 :     pub size: u64,
     117                 :     pub created: DateTime<Utc>,
     118                 :     pub modified: DateTime<Utc>,
     119                 :     pub start_zeroes: u64,
     120                 :     pub end_zeroes: u64,
     121                 :     // TODO: add sha256 checksum
     122                 : }
     123                 : 
     124                 : /// Build debug dump response, using the provided [`Args`] filters.
     125               5 : pub async fn build(args: Args) -> Result<Response> {
     126               5 :     let start_time = Utc::now();
     127               5 :     let timelines_count = GlobalTimelines::timelines_count();
     128                 : 
     129               5 :     let ptrs_snapshot = if args.tenant_id.is_some() && args.timeline_id.is_some() {
     130                 :         // If both tenant_id and timeline_id are specified, we can just get the
     131                 :         // timeline directly, without taking a snapshot of the whole list.
     132               1 :         let ttid = TenantTimelineId::new(args.tenant_id.unwrap(), args.timeline_id.unwrap());
     133               1 :         if let Ok(tli) = GlobalTimelines::get(ttid) {
     134               1 :             vec![tli]
     135                 :         } else {
     136 UBC           0 :             vec![]
     137                 :         }
     138                 :     } else {
     139                 :         // Otherwise, take a snapshot of the whole list.
     140 CBC           4 :         GlobalTimelines::get_all()
     141                 :     };
     142                 : 
     143                 :     // TODO: return Stream instead of Vec
     144               5 :     let mut timelines = Vec::new();
     145              10 :     for tli in ptrs_snapshot {
     146               5 :         let ttid = tli.ttid;
     147               5 :         if let Some(tenant_id) = args.tenant_id {
     148               1 :             if tenant_id != ttid.tenant_id {
     149 UBC           0 :                 continue;
     150 CBC           1 :             }
     151               4 :         }
     152               5 :         if let Some(timeline_id) = args.timeline_id {
     153               1 :             if timeline_id != ttid.timeline_id {
     154 UBC           0 :                 continue;
     155 CBC           1 :             }
     156               4 :         }
     157                 : 
     158               5 :         let control_file = if args.dump_control_file {
     159               5 :             let mut state = tli.get_state().await.1;
     160               5 :             if !args.dump_term_history {
     161 UBC           0 :                 state.acceptor_state.term_history = TermHistory(vec![]);
     162 CBC           5 :             }
     163               5 :             Some(state)
     164                 :         } else {
     165 UBC           0 :             None
     166                 :         };
     167                 : 
     168 CBC           5 :         let memory = if args.dump_memory {
     169               5 :             Some(tli.memory_dump().await)
     170                 :         } else {
     171 UBC           0 :             None
     172                 :         };
     173                 : 
     174 CBC           5 :         let disk_content = if args.dump_disk_content {
     175                 :             // build_disk_content can fail, but we don't want to fail the whole
     176                 :             // request because of that.
     177               5 :             build_disk_content(&tli.timeline_dir).ok()
     178                 :         } else {
     179 UBC           0 :             None
     180                 :         };
     181                 : 
     182 CBC           5 :         let timeline = Timeline {
     183               5 :             tenant_id: ttid.tenant_id,
     184               5 :             timeline_id: ttid.timeline_id,
     185               5 :             control_file,
     186               5 :             memory,
     187               5 :             disk_content,
     188               5 :         };
     189               5 :         timelines.push(timeline);
     190                 :     }
     191                 : 
     192               5 :     let config = GlobalTimelines::get_global_config();
     193               5 : 
     194               5 :     Ok(Response {
     195               5 :         start_time,
     196               5 :         finish_time: Utc::now(),
     197               5 :         timelines,
     198               5 :         timelines_count,
     199               5 :         config: build_config(config),
     200               5 :     })
     201               5 : }
     202                 : 
     203                 : /// Builds DiskContent from a directory path. It can fail if the directory
     204                 : /// is deleted between the time we get the path and the time we try to open it.
     205               5 : fn build_disk_content(path: &Utf8Path) -> Result<DiskContent> {
     206               5 :     let mut files = Vec::new();
     207              11 :     for entry in fs::read_dir(path)? {
     208              11 :         if entry.is_err() {
     209 UBC           0 :             continue;
     210 CBC          11 :         }
     211              11 :         let file = build_file_info(entry?);
     212              11 :         if file.is_err() {
     213 UBC           0 :             continue;
     214 CBC          11 :         }
     215              11 :         files.push(file?);
     216                 :     }
     217                 : 
     218               5 :     Ok(DiskContent { files })
     219               5 : }
     220                 : 
     221                 : /// Builds FileInfo from DirEntry. Sometimes it can return an error
     222                 : /// if the file is deleted between the time we get the DirEntry
     223                 : /// and the time we try to open it.
     224              11 : fn build_file_info(entry: DirEntry) -> Result<FileInfo> {
     225              11 :     let metadata = entry.metadata()?;
     226              11 :     let path = entry.path();
     227              11 :     let name = path
     228              11 :         .file_name()
     229              11 :         .and_then(|x| x.to_str())
     230              11 :         .unwrap_or("")
     231              11 :         .to_owned();
     232              11 :     let mut file = fs::File::open(path)?;
     233       100664508 :     let mut reader = BufReader::new(&mut file).bytes().filter_map(|x| x.ok());
     234              11 : 
     235        34530451 :     let start_zeroes = reader.by_ref().take_while(|&x| x == 0).count() as u64;
     236              11 :     let mut end_zeroes = 0;
     237        66134068 :     for b in reader {
     238        66134057 :         if b == 0 {
     239        58076784 :             end_zeroes += 1;
     240        58076784 :         } else {
     241         8057273 :             end_zeroes = 0;
     242         8057273 :         }
     243                 :     }
     244                 : 
     245                 :     Ok(FileInfo {
     246              11 :         name,
     247              11 :         size: metadata.len(),
     248              11 :         created: DateTime::from(metadata.created()?),
     249              11 :         modified: DateTime::from(metadata.modified()?),
     250              11 :         start_zeroes,
     251              11 :         end_zeroes,
     252                 :     })
     253              11 : }
     254                 : 
     255                 : /// Converts SafeKeeperConf to Config, filtering out the fields that are not
     256                 : /// supposed to be exposed.
     257               5 : fn build_config(config: SafeKeeperConf) -> Config {
     258               5 :     Config {
     259               5 :         id: config.my_id,
     260               5 :         workdir: config.workdir.into(),
     261               5 :         listen_pg_addr: config.listen_pg_addr,
     262               5 :         listen_http_addr: config.listen_http_addr,
     263               5 :         no_sync: config.no_sync,
     264               5 :         max_offloader_lag_bytes: config.max_offloader_lag_bytes,
     265               5 :         wal_backup_enabled: config.wal_backup_enabled,
     266               5 :     }
     267               5 : }
        

Generated by: LCOV version 2.1-beta