LCOV - code coverage report
Current view: top level - safekeeper/src - debug_dump.rs (source / functions) Coverage Total Hit
Test: 52d9d4a58355424a48c56cb9ba9670a073f618b9.info Lines: 0.0 % 195 0
Test Date: 2024-11-21 08:31:22 Functions: 0.0 % 80 0

            Line data    Source code
       1              : //! Utils for dumping full state of the safekeeper.
       2              : 
       3              : use std::fs;
       4              : use std::fs::DirEntry;
       5              : use std::io::BufReader;
       6              : use std::io::Read;
       7              : use std::path::PathBuf;
       8              : use std::sync::Arc;
       9              : 
      10              : use anyhow::bail;
      11              : use anyhow::Result;
      12              : use camino::Utf8Path;
      13              : use camino::Utf8PathBuf;
      14              : use chrono::{DateTime, Utc};
      15              : use postgres_ffi::XLogSegNo;
      16              : use postgres_ffi::MAX_SEND_SIZE;
      17              : use serde::Deserialize;
      18              : use serde::Serialize;
      19              : 
      20              : use postgres_ffi::v14::xlog_utils::{IsPartialXLogFileName, IsXLogFileName};
      21              : use sha2::{Digest, Sha256};
      22              : use utils::id::NodeId;
      23              : use utils::id::TenantTimelineId;
      24              : use utils::id::{TenantId, TimelineId};
      25              : use utils::lsn::Lsn;
      26              : 
      27              : use crate::safekeeper::TermHistory;
      28              : use crate::send_wal::WalSenderState;
      29              : use crate::state::TimelineMemState;
      30              : use crate::state::TimelinePersistentState;
      31              : use crate::timeline::get_timeline_dir;
      32              : use crate::timeline::WalResidentTimeline;
      33              : use crate::timeline_manager;
      34              : use crate::GlobalTimelines;
      35              : use crate::SafeKeeperConf;
      36              : 
      37              : /// Various filters that influence the resulting JSON output.
      38            0 : #[derive(Debug, Serialize, Deserialize, Clone)]
      39              : pub struct Args {
      40              :     /// Dump all available safekeeper state. False by default.
      41              :     pub dump_all: bool,
      42              : 
      43              :     /// Dump control_file content. Uses value of `dump_all` by default.
      44              :     pub dump_control_file: bool,
      45              : 
      46              :     /// Dump in-memory state. Uses value of `dump_all` by default.
      47              :     pub dump_memory: bool,
      48              : 
      49              :     /// Dump all disk files in a timeline directory. Uses value of `dump_all` by default.
      50              :     pub dump_disk_content: bool,
      51              : 
      52              :     /// Dump full term history. True by default.
      53              :     pub dump_term_history: bool,
      54              : 
      55              :     /// Dump last modified time of WAL segments. Uses value of `dump_all` by default.
      56              :     pub dump_wal_last_modified: bool,
      57              : 
      58              :     /// Filter timelines by tenant_id.
      59              :     pub tenant_id: Option<TenantId>,
      60              : 
      61              :     /// Filter timelines by timeline_id.
      62              :     pub timeline_id: Option<TimelineId>,
      63              : }
      64              : 
      65              : /// Response for debug dump request.
      66              : #[derive(Debug, Serialize)]
      67              : pub struct Response {
      68              :     pub start_time: DateTime<Utc>,
      69              :     pub finish_time: DateTime<Utc>,
      70              :     pub timelines: Vec<TimelineDumpSer>,
      71              :     pub timelines_count: usize,
      72              :     pub config: Config,
      73              : }
      74              : 
      75              : pub struct TimelineDumpSer {
      76              :     pub tli: Arc<crate::timeline::Timeline>,
      77              :     pub args: Args,
      78              :     pub timeline_dir: Utf8PathBuf,
      79              :     pub runtime: Arc<tokio::runtime::Runtime>,
      80              : }
      81              : 
      82              : impl std::fmt::Debug for TimelineDumpSer {
      83            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      84            0 :         f.debug_struct("TimelineDumpSer")
      85            0 :             .field("tli", &self.tli.ttid)
      86            0 :             .field("args", &self.args)
      87            0 :             .finish()
      88            0 :     }
      89              : }
      90              : 
      91              : impl Serialize for TimelineDumpSer {
      92            0 :     fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
      93            0 :     where
      94            0 :         S: serde::Serializer,
      95            0 :     {
      96            0 :         let dump = self.runtime.block_on(build_from_tli_dump(
      97            0 :             &self.tli,
      98            0 :             &self.args,
      99            0 :             &self.timeline_dir,
     100            0 :         ));
     101            0 :         dump.serialize(serializer)
     102            0 :     }
     103              : }
     104              : 
     105            0 : async fn build_from_tli_dump(
     106            0 :     timeline: &Arc<crate::timeline::Timeline>,
     107            0 :     args: &Args,
     108            0 :     timeline_dir: &Utf8Path,
     109            0 : ) -> Timeline {
     110            0 :     let control_file = if args.dump_control_file {
     111            0 :         let mut state = timeline.get_state().await.1;
     112            0 :         if !args.dump_term_history {
     113            0 :             state.acceptor_state.term_history = TermHistory(vec![]);
     114            0 :         }
     115            0 :         Some(state)
     116              :     } else {
     117            0 :         None
     118              :     };
     119              : 
     120            0 :     let memory = if args.dump_memory {
     121            0 :         Some(timeline.memory_dump().await)
     122              :     } else {
     123            0 :         None
     124              :     };
     125              : 
     126            0 :     let disk_content = if args.dump_disk_content {
     127              :         // build_disk_content can fail, but we don't want to fail the whole
     128              :         // request because of that.
     129              :         // Note: timeline can be in offloaded state, this is not a problem.
     130            0 :         build_disk_content(timeline_dir).ok()
     131              :     } else {
     132            0 :         None
     133              :     };
     134              : 
     135            0 :     let wal_last_modified = if args.dump_wal_last_modified {
     136            0 :         get_wal_last_modified(timeline_dir).ok().flatten()
     137              :     } else {
     138            0 :         None
     139              :     };
     140              : 
     141            0 :     Timeline {
     142            0 :         tenant_id: timeline.ttid.tenant_id,
     143            0 :         timeline_id: timeline.ttid.timeline_id,
     144            0 :         control_file,
     145            0 :         memory,
     146            0 :         disk_content,
     147            0 :         wal_last_modified,
     148            0 :     }
     149            0 : }
     150              : 
     151              : /// Safekeeper configuration.
     152            0 : #[derive(Debug, Serialize, Deserialize)]
     153              : pub struct Config {
     154              :     pub id: NodeId,
     155              :     pub workdir: PathBuf,
     156              :     pub listen_pg_addr: String,
     157              :     pub listen_http_addr: String,
     158              :     pub no_sync: bool,
     159              :     pub max_offloader_lag_bytes: u64,
     160              :     pub wal_backup_enabled: bool,
     161              : }
     162              : 
     163            0 : #[derive(Debug, Serialize, Deserialize)]
     164              : pub struct Timeline {
     165              :     pub tenant_id: TenantId,
     166              :     pub timeline_id: TimelineId,
     167              :     pub control_file: Option<TimelinePersistentState>,
     168              :     pub memory: Option<Memory>,
     169              :     pub disk_content: Option<DiskContent>,
     170              :     pub wal_last_modified: Option<DateTime<Utc>>,
     171              : }
     172              : 
     173            0 : #[derive(Debug, Serialize, Deserialize)]
     174              : pub struct Memory {
     175              :     pub is_cancelled: bool,
     176              :     pub peers_info_len: usize,
     177              :     pub walsenders: Vec<WalSenderState>,
     178              :     pub wal_backup_active: bool,
     179              :     pub active: bool,
     180              :     pub num_computes: u32,
     181              :     pub last_removed_segno: XLogSegNo,
     182              :     pub epoch_start_lsn: Lsn,
     183              :     pub mem_state: TimelineMemState,
     184              :     pub mgr_status: timeline_manager::Status,
     185              : 
     186              :     // PhysicalStorage state.
     187              :     pub write_lsn: Lsn,
     188              :     pub write_record_lsn: Lsn,
     189              :     pub flush_lsn: Lsn,
     190              :     pub file_open: bool,
     191              : }
     192              : 
     193            0 : #[derive(Debug, Serialize, Deserialize)]
     194              : pub struct DiskContent {
     195              :     pub files: Vec<FileInfo>,
     196              : }
     197              : 
     198            0 : #[derive(Debug, Serialize, Deserialize)]
     199              : pub struct FileInfo {
     200              :     pub name: String,
     201              :     pub size: u64,
     202              :     pub created: DateTime<Utc>,
     203              :     pub modified: DateTime<Utc>,
     204              :     pub start_zeroes: u64,
     205              :     pub end_zeroes: u64,
     206              :     // TODO: add sha256 checksum
     207              : }
     208              : 
     209              : /// Build debug dump response, using the provided [`Args`] filters.
     210            0 : pub async fn build(args: Args) -> Result<Response> {
     211            0 :     let start_time = Utc::now();
     212            0 :     let timelines_count = GlobalTimelines::timelines_count();
     213            0 :     let config = GlobalTimelines::get_global_config();
     214              : 
     215            0 :     let ptrs_snapshot = if args.tenant_id.is_some() && args.timeline_id.is_some() {
     216              :         // If both tenant_id and timeline_id are specified, we can just get the
     217              :         // timeline directly, without taking a snapshot of the whole list.
     218            0 :         let ttid = TenantTimelineId::new(args.tenant_id.unwrap(), args.timeline_id.unwrap());
     219            0 :         if let Ok(tli) = GlobalTimelines::get(ttid) {
     220            0 :             vec![tli]
     221              :         } else {
     222            0 :             vec![]
     223              :         }
     224              :     } else {
     225              :         // Otherwise, take a snapshot of the whole list.
     226            0 :         GlobalTimelines::get_all()
     227              :     };
     228              : 
     229            0 :     let mut timelines = Vec::new();
     230            0 :     let runtime = Arc::new(
     231            0 :         tokio::runtime::Builder::new_current_thread()
     232            0 :             .build()
     233            0 :             .unwrap(),
     234            0 :     );
     235            0 :     for tli in ptrs_snapshot {
     236            0 :         let ttid = tli.ttid;
     237            0 :         if let Some(tenant_id) = args.tenant_id {
     238            0 :             if tenant_id != ttid.tenant_id {
     239            0 :                 continue;
     240            0 :             }
     241            0 :         }
     242            0 :         if let Some(timeline_id) = args.timeline_id {
     243            0 :             if timeline_id != ttid.timeline_id {
     244            0 :                 continue;
     245            0 :             }
     246            0 :         }
     247              : 
     248            0 :         timelines.push(TimelineDumpSer {
     249            0 :             tli,
     250            0 :             args: args.clone(),
     251            0 :             timeline_dir: get_timeline_dir(&config, &ttid),
     252            0 :             runtime: runtime.clone(),
     253            0 :         });
     254              :     }
     255              : 
     256              :     // Tokio forbids to drop runtime in async context, so this is a stupid way
     257              :     // to drop it in non async context.
     258            0 :     tokio::task::spawn_blocking(move || {
     259            0 :         let _r = runtime;
     260            0 :     })
     261            0 :     .await?;
     262              : 
     263            0 :     Ok(Response {
     264            0 :         start_time,
     265            0 :         finish_time: Utc::now(),
     266            0 :         timelines,
     267            0 :         timelines_count,
     268            0 :         config: build_config(config),
     269            0 :     })
     270            0 : }
     271              : 
     272              : /// Builds DiskContent from a directory path. It can fail if the directory
     273              : /// is deleted between the time we get the path and the time we try to open it.
     274            0 : fn build_disk_content(path: &Utf8Path) -> Result<DiskContent> {
     275            0 :     let mut files = Vec::new();
     276            0 :     for entry in fs::read_dir(path)? {
     277            0 :         if entry.is_err() {
     278            0 :             continue;
     279            0 :         }
     280            0 :         let file = build_file_info(entry?);
     281            0 :         if file.is_err() {
     282            0 :             continue;
     283            0 :         }
     284            0 :         files.push(file?);
     285              :     }
     286              : 
     287            0 :     Ok(DiskContent { files })
     288            0 : }
     289              : 
     290              : /// Builds FileInfo from DirEntry. Sometimes it can return an error
     291              : /// if the file is deleted between the time we get the DirEntry
     292              : /// and the time we try to open it.
     293            0 : fn build_file_info(entry: DirEntry) -> Result<FileInfo> {
     294            0 :     let metadata = entry.metadata()?;
     295            0 :     let path = entry.path();
     296            0 :     let name = path
     297            0 :         .file_name()
     298            0 :         .and_then(|x| x.to_str())
     299            0 :         .unwrap_or("")
     300            0 :         .to_owned();
     301            0 :     let mut file = fs::File::open(path)?;
     302            0 :     let mut reader = BufReader::new(&mut file).bytes().filter_map(|x| x.ok());
     303            0 : 
     304            0 :     let start_zeroes = reader.by_ref().take_while(|&x| x == 0).count() as u64;
     305            0 :     let mut end_zeroes = 0;
     306            0 :     for b in reader {
     307            0 :         if b == 0 {
     308            0 :             end_zeroes += 1;
     309            0 :         } else {
     310            0 :             end_zeroes = 0;
     311            0 :         }
     312              :     }
     313              : 
     314              :     Ok(FileInfo {
     315            0 :         name,
     316            0 :         size: metadata.len(),
     317            0 :         created: DateTime::from(metadata.created()?),
     318            0 :         modified: DateTime::from(metadata.modified()?),
     319            0 :         start_zeroes,
     320            0 :         end_zeroes,
     321              :     })
     322            0 : }
     323              : 
     324              : /// Get highest modified time of WAL segments in the directory.
     325            0 : fn get_wal_last_modified(path: &Utf8Path) -> Result<Option<DateTime<Utc>>> {
     326            0 :     let mut res = None;
     327            0 :     for entry in fs::read_dir(path)? {
     328            0 :         if entry.is_err() {
     329            0 :             continue;
     330            0 :         }
     331            0 :         let entry = entry?;
     332              :         /* Ignore files that are not XLOG segments */
     333            0 :         let fname = entry.file_name();
     334            0 :         if !IsXLogFileName(&fname) && !IsPartialXLogFileName(&fname) {
     335            0 :             continue;
     336            0 :         }
     337              : 
     338            0 :         let metadata = entry.metadata()?;
     339            0 :         let modified: DateTime<Utc> = DateTime::from(metadata.modified()?);
     340            0 :         res = std::cmp::max(res, Some(modified));
     341              :     }
     342            0 :     Ok(res)
     343            0 : }
     344              : 
     345              : /// Converts SafeKeeperConf to Config, filtering out the fields that are not
     346              : /// supposed to be exposed.
     347            0 : fn build_config(config: SafeKeeperConf) -> Config {
     348            0 :     Config {
     349            0 :         id: config.my_id,
     350            0 :         workdir: config.workdir.into(),
     351            0 :         listen_pg_addr: config.listen_pg_addr,
     352            0 :         listen_http_addr: config.listen_http_addr,
     353            0 :         no_sync: config.no_sync,
     354            0 :         max_offloader_lag_bytes: config.max_offloader_lag_bytes,
     355            0 :         wal_backup_enabled: config.wal_backup_enabled,
     356            0 :     }
     357            0 : }
     358              : 
     359            0 : #[derive(Debug, Clone, Deserialize, Serialize)]
     360              : pub struct TimelineDigestRequest {
     361              :     pub from_lsn: Lsn,
     362              :     pub until_lsn: Lsn,
     363              : }
     364              : 
     365            0 : #[derive(Debug, Serialize, Deserialize)]
     366              : pub struct TimelineDigest {
     367              :     pub sha256: String,
     368              : }
     369              : 
     370            0 : pub async fn calculate_digest(
     371            0 :     tli: &WalResidentTimeline,
     372            0 :     request: TimelineDigestRequest,
     373            0 : ) -> Result<TimelineDigest> {
     374            0 :     if request.from_lsn > request.until_lsn {
     375            0 :         bail!("from_lsn is greater than until_lsn");
     376            0 :     }
     377              : 
     378            0 :     let (_, persisted_state) = tli.get_state().await;
     379            0 :     if persisted_state.timeline_start_lsn > request.from_lsn {
     380            0 :         bail!("requested LSN is before the start of the timeline");
     381            0 :     }
     382              : 
     383            0 :     let mut wal_reader = tli.get_walreader(request.from_lsn).await?;
     384              : 
     385            0 :     let mut hasher = Sha256::new();
     386            0 :     let mut buf = vec![0u8; MAX_SEND_SIZE];
     387            0 : 
     388            0 :     let mut bytes_left = (request.until_lsn.0 - request.from_lsn.0) as usize;
     389            0 :     while bytes_left > 0 {
     390            0 :         let bytes_to_read = std::cmp::min(buf.len(), bytes_left);
     391            0 :         let bytes_read = wal_reader.read(&mut buf[..bytes_to_read]).await?;
     392            0 :         if bytes_read == 0 {
     393            0 :             bail!("wal_reader.read returned 0 bytes");
     394            0 :         }
     395            0 :         hasher.update(&buf[..bytes_read]);
     396            0 :         bytes_left -= bytes_read;
     397              :     }
     398              : 
     399            0 :     let digest = hasher.finalize();
     400            0 :     let digest = hex::encode(digest);
     401            0 :     Ok(TimelineDigest { sha256: digest })
     402            0 : }
        

Generated by: LCOV version 2.1-beta