LCOV - differential code coverage report
Current view: top level - safekeeper/src - debug_dump.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 89.1 % 174 155 19 155
Current Date: 2024-01-09 02:06:09 Functions: 38.0 % 108 41 67 41
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : //! Utils for dumping full state of the safekeeper.
       2                 : 
       3                 : use std::fs;
       4                 : use std::fs::DirEntry;
       5                 : use std::io::BufReader;
       6                 : use std::io::Read;
       7                 : use std::path::PathBuf;
       8                 : use std::sync::Arc;
       9                 : 
      10                 : use anyhow::bail;
      11                 : use anyhow::Result;
      12                 : use camino::Utf8Path;
      13                 : use chrono::{DateTime, Utc};
      14                 : use postgres_ffi::XLogSegNo;
      15                 : use postgres_ffi::MAX_SEND_SIZE;
      16                 : use serde::Deserialize;
      17                 : use serde::Serialize;
      18                 : 
      19                 : use sha2::{Digest, Sha256};
      20                 : use utils::id::NodeId;
      21                 : use utils::id::TenantTimelineId;
      22                 : use utils::id::{TenantId, TimelineId};
      23                 : use utils::lsn::Lsn;
      24                 : 
      25                 : use crate::safekeeper::SafeKeeperState;
      26                 : use crate::safekeeper::SafekeeperMemState;
      27                 : use crate::safekeeper::TermHistory;
      28                 : use crate::SafeKeeperConf;
      29                 : 
      30                 : use crate::send_wal::WalSenderState;
      31                 : use crate::wal_storage::WalReader;
      32                 : use crate::GlobalTimelines;
      33                 : 
      34                 : /// Various filters that influence the resulting JSON output.
      35 CBC          10 : #[derive(Debug, Serialize, Deserialize, Clone)]
      36                 : pub struct Args {
      37                 :     /// Dump all available safekeeper state. False by default.
      38                 :     pub dump_all: bool,
      39                 : 
      40                 :     /// Dump control_file content. Uses value of `dump_all` by default.
      41                 :     pub dump_control_file: bool,
      42                 : 
      43                 :     /// Dump in-memory state. Uses value of `dump_all` by default.
      44                 :     pub dump_memory: bool,
      45                 : 
      46                 :     /// Dump all disk files in a timeline directory. Uses value of `dump_all` by default.
      47                 :     pub dump_disk_content: bool,
      48                 : 
      49                 :     /// Dump full term history. True by default.
      50                 :     pub dump_term_history: bool,
      51                 : 
      52                 :     /// Filter timelines by tenant_id.
      53                 :     pub tenant_id: Option<TenantId>,
      54                 : 
      55                 :     /// Filter timelines by timeline_id.
      56                 :     pub timeline_id: Option<TimelineId>,
      57                 : }
      58                 : 
      59                 : /// Response for debug dump request.
      60               5 : #[derive(Debug, Serialize)]
      61                 : pub struct Response {
      62                 :     pub start_time: DateTime<Utc>,
      63                 :     pub finish_time: DateTime<Utc>,
      64                 :     pub timelines: Vec<TimelineDumpSer>,
      65                 :     pub timelines_count: usize,
      66                 :     pub config: Config,
      67                 : }
      68                 : 
      69                 : pub struct TimelineDumpSer {
      70                 :     pub tli: Arc<crate::timeline::Timeline>,
      71                 :     pub args: Args,
      72                 :     pub runtime: Arc<tokio::runtime::Runtime>,
      73                 : }
      74                 : 
      75                 : impl std::fmt::Debug for TimelineDumpSer {
      76 UBC           0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      77               0 :         f.debug_struct("TimelineDumpSer")
      78               0 :             .field("tli", &self.tli.ttid)
      79               0 :             .field("args", &self.args)
      80               0 :             .finish()
      81               0 :     }
      82                 : }
      83                 : 
      84                 : impl Serialize for TimelineDumpSer {
      85 CBC           5 :     fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
      86               5 :     where
      87               5 :         S: serde::Serializer,
      88               5 :     {
      89               5 :         let dump = self
      90               5 :             .runtime
      91               5 :             .block_on(build_from_tli_dump(self.tli.clone(), self.args.clone()));
      92               5 :         dump.serialize(serializer)
      93               5 :     }
      94                 : }
      95                 : 
      96               5 : async fn build_from_tli_dump(timeline: Arc<crate::timeline::Timeline>, args: Args) -> Timeline {
      97               5 :     let control_file = if args.dump_control_file {
      98               5 :         let mut state = timeline.get_state().await.1;
      99               5 :         if !args.dump_term_history {
     100 UBC           0 :             state.acceptor_state.term_history = TermHistory(vec![]);
     101 CBC           5 :         }
     102               5 :         Some(state)
     103                 :     } else {
     104 UBC           0 :         None
     105                 :     };
     106                 : 
     107 CBC           5 :     let memory = if args.dump_memory {
     108               5 :         Some(timeline.memory_dump().await)
     109                 :     } else {
     110 UBC           0 :         None
     111                 :     };
     112                 : 
     113 CBC           5 :     let disk_content = if args.dump_disk_content {
     114                 :         // build_disk_content can fail, but we don't want to fail the whole
     115                 :         // request because of that.
     116               5 :         build_disk_content(&timeline.timeline_dir).ok()
     117                 :     } else {
     118 UBC           0 :         None
     119                 :     };
     120                 : 
     121 CBC           5 :     Timeline {
     122               5 :         tenant_id: timeline.ttid.tenant_id,
     123               5 :         timeline_id: timeline.ttid.timeline_id,
     124               5 :         control_file,
     125               5 :         memory,
     126               5 :         disk_content,
     127               5 :     }
     128               5 : }
     129                 : 
     130                 : /// Safekeeper configuration.
     131              15 : #[derive(Debug, Serialize, Deserialize)]
     132                 : pub struct Config {
     133                 :     pub id: NodeId,
     134                 :     pub workdir: PathBuf,
     135                 :     pub listen_pg_addr: String,
     136                 :     pub listen_http_addr: String,
     137                 :     pub no_sync: bool,
     138                 :     pub max_offloader_lag_bytes: u64,
     139                 :     pub wal_backup_enabled: bool,
     140                 : }
     141                 : 
     142              11 : #[derive(Debug, Serialize, Deserialize)]
     143                 : pub struct Timeline {
     144                 :     pub tenant_id: TenantId,
     145                 :     pub timeline_id: TimelineId,
     146                 :     pub control_file: Option<SafeKeeperState>,
     147                 :     pub memory: Option<Memory>,
     148                 :     pub disk_content: Option<DiskContent>,
     149                 : }
     150                 : 
     151              27 : #[derive(Debug, Serialize, Deserialize)]
     152                 : pub struct Memory {
     153                 :     pub is_cancelled: bool,
     154                 :     pub peers_info_len: usize,
     155                 :     pub walsenders: Vec<WalSenderState>,
     156                 :     pub wal_backup_active: bool,
     157                 :     pub active: bool,
     158                 :     pub num_computes: u32,
     159                 :     pub last_removed_segno: XLogSegNo,
     160                 :     pub epoch_start_lsn: Lsn,
     161                 :     pub mem_state: SafekeeperMemState,
     162                 : 
     163                 :     // PhysicalStorage state.
     164                 :     pub write_lsn: Lsn,
     165                 :     pub write_record_lsn: Lsn,
     166                 :     pub flush_lsn: Lsn,
     167                 :     pub file_open: bool,
     168                 : }
     169                 : 
     170               5 : #[derive(Debug, Serialize, Deserialize)]
     171                 : pub struct DiskContent {
     172                 :     pub files: Vec<FileInfo>,
     173                 : }
     174                 : 
     175              39 : #[derive(Debug, Serialize, Deserialize)]
     176                 : pub struct FileInfo {
     177                 :     pub name: String,
     178                 :     pub size: u64,
     179                 :     pub created: DateTime<Utc>,
     180                 :     pub modified: DateTime<Utc>,
     181                 :     pub start_zeroes: u64,
     182                 :     pub end_zeroes: u64,
     183                 :     // TODO: add sha256 checksum
     184                 : }
     185                 : 
     186                 : /// Build debug dump response, using the provided [`Args`] filters.
     187               5 : pub async fn build(args: Args) -> Result<Response> {
     188               5 :     let start_time = Utc::now();
     189               5 :     let timelines_count = GlobalTimelines::timelines_count();
     190                 : 
     191               5 :     let ptrs_snapshot = if args.tenant_id.is_some() && args.timeline_id.is_some() {
     192                 :         // If both tenant_id and timeline_id are specified, we can just get the
     193                 :         // timeline directly, without taking a snapshot of the whole list.
     194               1 :         let ttid = TenantTimelineId::new(args.tenant_id.unwrap(), args.timeline_id.unwrap());
     195               1 :         if let Ok(tli) = GlobalTimelines::get(ttid) {
     196               1 :             vec![tli]
     197                 :         } else {
     198 UBC           0 :             vec![]
     199                 :         }
     200                 :     } else {
     201                 :         // Otherwise, take a snapshot of the whole list.
     202 CBC           4 :         GlobalTimelines::get_all()
     203                 :     };
     204                 : 
     205               5 :     let mut timelines = Vec::new();
     206               5 :     let runtime = Arc::new(
     207               5 :         tokio::runtime::Builder::new_current_thread()
     208               5 :             .build()
     209               5 :             .unwrap(),
     210               5 :     );
     211              10 :     for tli in ptrs_snapshot {
     212               5 :         let ttid = tli.ttid;
     213               5 :         if let Some(tenant_id) = args.tenant_id {
     214               1 :             if tenant_id != ttid.tenant_id {
     215 UBC           0 :                 continue;
     216 CBC           1 :             }
     217               4 :         }
     218               5 :         if let Some(timeline_id) = args.timeline_id {
     219               1 :             if timeline_id != ttid.timeline_id {
     220 UBC           0 :                 continue;
     221 CBC           1 :             }
     222               4 :         }
     223                 : 
     224               5 :         timelines.push(TimelineDumpSer {
     225               5 :             tli,
     226               5 :             args: args.clone(),
     227               5 :             runtime: runtime.clone(),
     228               5 :         });
     229                 :     }
     230                 : 
     231               5 :     let config = GlobalTimelines::get_global_config();
     232               5 : 
     233               5 :     Ok(Response {
     234               5 :         start_time,
     235               5 :         finish_time: Utc::now(),
     236               5 :         timelines,
     237               5 :         timelines_count,
     238               5 :         config: build_config(config),
     239               5 :     })
     240               5 : }
     241                 : 
     242                 : /// Builds DiskContent from a directory path. It can fail if the directory
     243                 : /// is deleted between the time we get the path and the time we try to open it.
     244               5 : fn build_disk_content(path: &Utf8Path) -> Result<DiskContent> {
     245               5 :     let mut files = Vec::new();
     246              11 :     for entry in fs::read_dir(path)? {
     247              11 :         if entry.is_err() {
     248 UBC           0 :             continue;
     249 CBC          11 :         }
     250              11 :         let file = build_file_info(entry?);
     251              11 :         if file.is_err() {
     252 UBC           0 :             continue;
     253 CBC          11 :         }
     254              11 :         files.push(file?);
     255                 :     }
     256                 : 
     257               5 :     Ok(DiskContent { files })
     258               5 : }
     259                 : 
     260                 : /// Builds FileInfo from DirEntry. Sometimes it can return an error
     261                 : /// if the file is deleted between the time we get the DirEntry
     262                 : /// and the time we try to open it.
     263              11 : fn build_file_info(entry: DirEntry) -> Result<FileInfo> {
     264              11 :     let metadata = entry.metadata()?;
     265              11 :     let path = entry.path();
     266              11 :     let name = path
     267              11 :         .file_name()
     268              11 :         .and_then(|x| x.to_str())
     269              11 :         .unwrap_or("")
     270              11 :         .to_owned();
     271              11 :     let mut file = fs::File::open(path)?;
     272       100664508 :     let mut reader = BufReader::new(&mut file).bytes().filter_map(|x| x.ok());
     273              11 : 
     274        34533291 :     let start_zeroes = reader.by_ref().take_while(|&x| x == 0).count() as u64;
     275              11 :     let mut end_zeroes = 0;
     276        66131228 :     for b in reader {
     277        66131217 :         if b == 0 {
     278        58077587 :             end_zeroes += 1;
     279        58077587 :         } else {
     280         8053630 :             end_zeroes = 0;
     281         8053630 :         }
     282                 :     }
     283                 : 
     284                 :     Ok(FileInfo {
     285              11 :         name,
     286              11 :         size: metadata.len(),
     287              11 :         created: DateTime::from(metadata.created()?),
     288              11 :         modified: DateTime::from(metadata.modified()?),
     289              11 :         start_zeroes,
     290              11 :         end_zeroes,
     291                 :     })
     292              11 : }
     293                 : 
     294                 : /// Converts SafeKeeperConf to Config, filtering out the fields that are not
     295                 : /// supposed to be exposed.
     296               5 : fn build_config(config: SafeKeeperConf) -> Config {
     297               5 :     Config {
     298               5 :         id: config.my_id,
     299               5 :         workdir: config.workdir.into(),
     300               5 :         listen_pg_addr: config.listen_pg_addr,
     301               5 :         listen_http_addr: config.listen_http_addr,
     302               5 :         no_sync: config.no_sync,
     303               5 :         max_offloader_lag_bytes: config.max_offloader_lag_bytes,
     304               5 :         wal_backup_enabled: config.wal_backup_enabled,
     305               5 :     }
     306               5 : }
     307                 : 
     308 UBC           0 : #[derive(Debug, Clone, Deserialize, Serialize)]
     309                 : pub struct TimelineDigestRequest {
     310                 :     pub from_lsn: Lsn,
     311                 :     pub until_lsn: Lsn,
     312                 : }
     313                 : 
     314 CBC          64 : #[derive(Debug, Serialize, Deserialize)]
     315                 : pub struct TimelineDigest {
     316                 :     pub sha256: String,
     317                 : }
     318                 : 
     319              64 : pub async fn calculate_digest(
     320              64 :     tli: &Arc<crate::timeline::Timeline>,
     321              64 :     request: TimelineDigestRequest,
     322              64 : ) -> Result<TimelineDigest> {
     323              64 :     if request.from_lsn > request.until_lsn {
     324 UBC           0 :         bail!("from_lsn is greater than until_lsn");
     325 CBC          64 :     }
     326              64 : 
     327              64 :     let conf = GlobalTimelines::get_global_config();
     328              64 :     let (_, persisted_state) = tli.get_state().await;
     329                 : 
     330              64 :     if persisted_state.timeline_start_lsn > request.from_lsn {
     331 UBC           0 :         bail!("requested LSN is before the start of the timeline");
     332 CBC          64 :     }
     333                 : 
     334              64 :     let mut wal_reader = WalReader::new(
     335              64 :         conf.workdir.clone(),
     336              64 :         tli.timeline_dir.clone(),
     337              64 :         &persisted_state,
     338              64 :         request.from_lsn,
     339              64 :         true,
     340              64 :     )?;
     341                 : 
     342              64 :     let mut hasher = Sha256::new();
     343              64 :     let mut buf = [0u8; MAX_SEND_SIZE];
     344              64 : 
     345              64 :     let mut bytes_left = (request.until_lsn.0 - request.from_lsn.0) as usize;
     346            6828 :     while bytes_left > 0 {
     347            6764 :         let bytes_to_read = std::cmp::min(buf.len(), bytes_left);
     348            6764 :         let bytes_read = wal_reader.read(&mut buf[..bytes_to_read]).await?;
     349            6764 :         if bytes_read == 0 {
     350 UBC           0 :             bail!("wal_reader.read returned 0 bytes");
     351 CBC        6764 :         }
     352            6764 :         hasher.update(&buf[..bytes_read]);
     353            6764 :         bytes_left -= bytes_read;
     354                 :     }
     355                 : 
     356              64 :     let digest = hasher.finalize();
     357              64 :     let digest = hex::encode(digest);
     358              64 :     Ok(TimelineDigest { sha256: digest })
     359              64 : }
        

Generated by: LCOV version 2.1-beta