Line data Source code
1 : //! Utils for dumping full state of the safekeeper.
2 :
3 : use std::fs;
4 : use std::fs::DirEntry;
5 : use std::io::{BufReader, Read};
6 : use std::path::PathBuf;
7 : use std::sync::Arc;
8 :
9 : use anyhow::{Result, bail};
10 : use camino::{Utf8Path, Utf8PathBuf};
11 : use chrono::{DateTime, Utc};
12 : use postgres_ffi::v14::xlog_utils::{IsPartialXLogFileName, IsXLogFileName};
13 : use postgres_ffi::{MAX_SEND_SIZE, XLogSegNo};
14 : use safekeeper_api::models::WalSenderState;
15 : use serde::{Deserialize, Serialize};
16 : use sha2::{Digest, Sha256};
17 : use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
18 : use utils::lsn::Lsn;
19 :
20 : use crate::safekeeper::TermHistory;
21 : use crate::state::{TimelineMemState, TimelinePersistentState};
22 : use crate::timeline::{WalResidentTimeline, get_timeline_dir};
23 : use crate::{GlobalTimelines, SafeKeeperConf, timeline_manager};
24 :
25 : /// Various filters that influence the resulting JSON output.
26 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
27 : pub struct Args {
28 : /// Dump all available safekeeper state. False by default.
29 : pub dump_all: bool,
30 :
31 : /// Dump control_file content. Uses value of `dump_all` by default.
32 : pub dump_control_file: bool,
33 :
34 : /// Dump in-memory state. Uses value of `dump_all` by default.
35 : pub dump_memory: bool,
36 :
37 : /// Dump all disk files in a timeline directory. Uses value of `dump_all` by default.
38 : pub dump_disk_content: bool,
39 :
40 : /// Dump full term history. True by default.
41 : pub dump_term_history: bool,
42 :
43 : /// Dump last modified time of WAL segments. Uses value of `dump_all` by default.
44 : pub dump_wal_last_modified: bool,
45 :
46 : /// Filter timelines by tenant_id.
47 : pub tenant_id: Option<TenantId>,
48 :
49 : /// Filter timelines by timeline_id.
50 : pub timeline_id: Option<TimelineId>,
51 : }
52 :
53 : /// Response for debug dump request.
54 : #[derive(Debug, Serialize)]
55 : pub struct Response {
56 : pub start_time: DateTime<Utc>,
57 : pub finish_time: DateTime<Utc>,
58 : pub timelines: Vec<TimelineDumpSer>,
59 : pub timelines_count: usize,
60 : pub config: Config,
61 : }
62 :
63 : pub struct TimelineDumpSer {
64 : pub tli: Arc<crate::timeline::Timeline>,
65 : pub args: Args,
66 : pub timeline_dir: Utf8PathBuf,
67 : pub runtime: Arc<tokio::runtime::Runtime>,
68 : }
69 :
70 : impl std::fmt::Debug for TimelineDumpSer {
71 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72 0 : f.debug_struct("TimelineDumpSer")
73 0 : .field("tli", &self.tli.ttid)
74 0 : .field("args", &self.args)
75 0 : .finish()
76 0 : }
77 : }
78 :
79 : impl Serialize for TimelineDumpSer {
80 0 : fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
81 0 : where
82 0 : S: serde::Serializer,
83 0 : {
84 0 : let dump = self.runtime.block_on(build_from_tli_dump(
85 0 : &self.tli,
86 0 : &self.args,
87 0 : &self.timeline_dir,
88 0 : ));
89 0 : dump.serialize(serializer)
90 0 : }
91 : }
92 :
93 0 : async fn build_from_tli_dump(
94 0 : timeline: &Arc<crate::timeline::Timeline>,
95 0 : args: &Args,
96 0 : timeline_dir: &Utf8Path,
97 0 : ) -> Timeline {
98 0 : let control_file = if args.dump_control_file {
99 0 : let mut state = timeline.get_state().await.1;
100 0 : if !args.dump_term_history {
101 0 : state.acceptor_state.term_history = TermHistory(vec![]);
102 0 : }
103 0 : Some(state)
104 : } else {
105 0 : None
106 : };
107 :
108 0 : let memory = if args.dump_memory {
109 0 : Some(timeline.memory_dump().await)
110 : } else {
111 0 : None
112 : };
113 :
114 0 : let disk_content = if args.dump_disk_content {
115 : // build_disk_content can fail, but we don't want to fail the whole
116 : // request because of that.
117 : // Note: timeline can be in offloaded state, this is not a problem.
118 0 : build_disk_content(timeline_dir).ok()
119 : } else {
120 0 : None
121 : };
122 :
123 0 : let wal_last_modified = if args.dump_wal_last_modified {
124 0 : get_wal_last_modified(timeline_dir).ok().flatten()
125 : } else {
126 0 : None
127 : };
128 :
129 0 : Timeline {
130 0 : tenant_id: timeline.ttid.tenant_id,
131 0 : timeline_id: timeline.ttid.timeline_id,
132 0 : control_file,
133 0 : memory,
134 0 : disk_content,
135 0 : wal_last_modified,
136 0 : }
137 0 : }
138 :
139 : /// Safekeeper configuration.
140 0 : #[derive(Debug, Serialize, Deserialize)]
141 : pub struct Config {
142 : pub id: NodeId,
143 : pub workdir: PathBuf,
144 : pub listen_pg_addr: String,
145 : pub listen_http_addr: String,
146 : pub no_sync: bool,
147 : pub max_offloader_lag_bytes: u64,
148 : pub wal_backup_enabled: bool,
149 : }
150 :
151 0 : #[derive(Debug, Serialize, Deserialize)]
152 : pub struct Timeline {
153 : pub tenant_id: TenantId,
154 : pub timeline_id: TimelineId,
155 : pub control_file: Option<TimelinePersistentState>,
156 : pub memory: Option<Memory>,
157 : pub disk_content: Option<DiskContent>,
158 : pub wal_last_modified: Option<DateTime<Utc>>,
159 : }
160 :
161 0 : #[derive(Debug, Serialize, Deserialize)]
162 : pub struct Memory {
163 : pub is_cancelled: bool,
164 : pub peers_info_len: usize,
165 : pub walsenders: Vec<WalSenderState>,
166 : pub wal_backup_active: bool,
167 : pub active: bool,
168 : pub num_computes: u32,
169 : pub last_removed_segno: XLogSegNo,
170 : pub epoch_start_lsn: Lsn,
171 : pub mem_state: TimelineMemState,
172 : pub mgr_status: timeline_manager::Status,
173 :
174 : // PhysicalStorage state.
175 : pub write_lsn: Lsn,
176 : pub write_record_lsn: Lsn,
177 : pub flush_lsn: Lsn,
178 : pub file_open: bool,
179 : }
180 :
181 0 : #[derive(Debug, Serialize, Deserialize)]
182 : pub struct DiskContent {
183 : pub files: Vec<FileInfo>,
184 : }
185 :
186 0 : #[derive(Debug, Serialize, Deserialize)]
187 : pub struct FileInfo {
188 : pub name: String,
189 : pub size: u64,
190 : pub created: DateTime<Utc>,
191 : pub modified: DateTime<Utc>,
192 : pub start_zeroes: u64,
193 : pub end_zeroes: u64,
194 : // TODO: add sha256 checksum
195 : }
196 :
197 : /// Build debug dump response, using the provided [`Args`] filters.
198 0 : pub async fn build(args: Args, global_timelines: Arc<GlobalTimelines>) -> Result<Response> {
199 0 : let start_time = Utc::now();
200 0 : let timelines_count = global_timelines.timelines_count();
201 0 : let config = global_timelines.get_global_config();
202 :
203 0 : let ptrs_snapshot = if args.tenant_id.is_some() && args.timeline_id.is_some() {
204 : // If both tenant_id and timeline_id are specified, we can just get the
205 : // timeline directly, without taking a snapshot of the whole list.
206 0 : let ttid = TenantTimelineId::new(args.tenant_id.unwrap(), args.timeline_id.unwrap());
207 0 : if let Ok(tli) = global_timelines.get(ttid) {
208 0 : vec![tli]
209 : } else {
210 0 : vec![]
211 : }
212 : } else {
213 : // Otherwise, take a snapshot of the whole list.
214 0 : global_timelines.get_all()
215 : };
216 :
217 0 : let mut timelines = Vec::new();
218 0 : let runtime = Arc::new(
219 0 : tokio::runtime::Builder::new_current_thread()
220 0 : .build()
221 0 : .unwrap(),
222 0 : );
223 0 : for tli in ptrs_snapshot {
224 0 : let ttid = tli.ttid;
225 0 : if let Some(tenant_id) = args.tenant_id {
226 0 : if tenant_id != ttid.tenant_id {
227 0 : continue;
228 0 : }
229 0 : }
230 0 : if let Some(timeline_id) = args.timeline_id {
231 0 : if timeline_id != ttid.timeline_id {
232 0 : continue;
233 0 : }
234 0 : }
235 :
236 0 : timelines.push(TimelineDumpSer {
237 0 : tli,
238 0 : args: args.clone(),
239 0 : timeline_dir: get_timeline_dir(&config, &ttid),
240 0 : runtime: runtime.clone(),
241 0 : });
242 : }
243 :
244 : // Tokio forbids to drop runtime in async context, so this is a stupid way
245 : // to drop it in non async context.
246 0 : tokio::task::spawn_blocking(move || {
247 0 : let _r = runtime;
248 0 : })
249 0 : .await?;
250 :
251 0 : Ok(Response {
252 0 : start_time,
253 0 : finish_time: Utc::now(),
254 0 : timelines,
255 0 : timelines_count,
256 0 : config: build_config(config),
257 0 : })
258 0 : }
259 :
260 : /// Builds DiskContent from a directory path. It can fail if the directory
261 : /// is deleted between the time we get the path and the time we try to open it.
262 0 : fn build_disk_content(path: &Utf8Path) -> Result<DiskContent> {
263 0 : let mut files = Vec::new();
264 0 : for entry in fs::read_dir(path)? {
265 0 : if entry.is_err() {
266 0 : continue;
267 0 : }
268 0 : let file = build_file_info(entry?);
269 0 : if file.is_err() {
270 0 : continue;
271 0 : }
272 0 : files.push(file?);
273 : }
274 :
275 0 : Ok(DiskContent { files })
276 0 : }
277 :
278 : /// Builds FileInfo from DirEntry. Sometimes it can return an error
279 : /// if the file is deleted between the time we get the DirEntry
280 : /// and the time we try to open it.
281 0 : fn build_file_info(entry: DirEntry) -> Result<FileInfo> {
282 0 : let metadata = entry.metadata()?;
283 0 : let path = entry.path();
284 0 : let name = path
285 0 : .file_name()
286 0 : .and_then(|x| x.to_str())
287 0 : .unwrap_or("")
288 0 : .to_owned();
289 0 : let mut file = fs::File::open(path)?;
290 0 : let mut reader = BufReader::new(&mut file).bytes().filter_map(|x| x.ok());
291 0 :
292 0 : let start_zeroes = reader.by_ref().take_while(|&x| x == 0).count() as u64;
293 0 : let mut end_zeroes = 0;
294 0 : for b in reader {
295 0 : if b == 0 {
296 0 : end_zeroes += 1;
297 0 : } else {
298 0 : end_zeroes = 0;
299 0 : }
300 : }
301 :
302 : Ok(FileInfo {
303 0 : name,
304 0 : size: metadata.len(),
305 0 : created: DateTime::from(metadata.created()?),
306 0 : modified: DateTime::from(metadata.modified()?),
307 0 : start_zeroes,
308 0 : end_zeroes,
309 : })
310 0 : }
311 :
312 : /// Get highest modified time of WAL segments in the directory.
313 0 : fn get_wal_last_modified(path: &Utf8Path) -> Result<Option<DateTime<Utc>>> {
314 0 : let mut res = None;
315 0 : for entry in fs::read_dir(path)? {
316 0 : if entry.is_err() {
317 0 : continue;
318 0 : }
319 0 : let entry = entry?;
320 : /* Ignore files that are not XLOG segments */
321 0 : let fname = entry.file_name();
322 0 : if !IsXLogFileName(&fname) && !IsPartialXLogFileName(&fname) {
323 0 : continue;
324 0 : }
325 :
326 0 : let metadata = entry.metadata()?;
327 0 : let modified: DateTime<Utc> = DateTime::from(metadata.modified()?);
328 0 : res = std::cmp::max(res, Some(modified));
329 : }
330 0 : Ok(res)
331 0 : }
332 :
333 : /// Converts SafeKeeperConf to Config, filtering out the fields that are not
334 : /// supposed to be exposed.
335 0 : fn build_config(config: Arc<SafeKeeperConf>) -> Config {
336 0 : Config {
337 0 : id: config.my_id,
338 0 : workdir: config.workdir.clone().into(),
339 0 : listen_pg_addr: config.listen_pg_addr.clone(),
340 0 : listen_http_addr: config.listen_http_addr.clone(),
341 0 : no_sync: config.no_sync,
342 0 : max_offloader_lag_bytes: config.max_offloader_lag_bytes,
343 0 : wal_backup_enabled: config.wal_backup_enabled,
344 0 : }
345 0 : }
346 :
347 0 : #[derive(Debug, Clone, Deserialize, Serialize)]
348 : pub struct TimelineDigestRequest {
349 : pub from_lsn: Lsn,
350 : pub until_lsn: Lsn,
351 : }
352 :
353 0 : #[derive(Debug, Serialize, Deserialize)]
354 : pub struct TimelineDigest {
355 : pub sha256: String,
356 : }
357 :
358 0 : pub async fn calculate_digest(
359 0 : tli: &WalResidentTimeline,
360 0 : request: TimelineDigestRequest,
361 0 : ) -> Result<TimelineDigest> {
362 0 : if request.from_lsn > request.until_lsn {
363 0 : bail!("from_lsn is greater than until_lsn");
364 0 : }
365 :
366 0 : let (_, persisted_state) = tli.get_state().await;
367 0 : if persisted_state.timeline_start_lsn > request.from_lsn {
368 0 : bail!("requested LSN is before the start of the timeline");
369 0 : }
370 :
371 0 : let mut wal_reader = tli.get_walreader(request.from_lsn).await?;
372 :
373 0 : let mut hasher = Sha256::new();
374 0 : let mut buf = vec![0u8; MAX_SEND_SIZE];
375 0 :
376 0 : let mut bytes_left = (request.until_lsn.0 - request.from_lsn.0) as usize;
377 0 : while bytes_left > 0 {
378 0 : let bytes_to_read = std::cmp::min(buf.len(), bytes_left);
379 0 : let bytes_read = wal_reader.read(&mut buf[..bytes_to_read]).await?;
380 0 : if bytes_read == 0 {
381 0 : bail!("wal_reader.read returned 0 bytes");
382 0 : }
383 0 : hasher.update(&buf[..bytes_read]);
384 0 : bytes_left -= bytes_read;
385 : }
386 :
387 0 : let digest = hasher.finalize();
388 0 : let digest = hex::encode(digest);
389 0 : Ok(TimelineDigest { sha256: digest })
390 0 : }
|