Line data Source code
1 : //! Utils for dumping full state of the safekeeper.
2 :
3 : use std::fs;
4 : use std::fs::DirEntry;
5 : use std::io::BufReader;
6 : use std::io::Read;
7 : use std::path::PathBuf;
8 : use std::sync::Arc;
9 :
10 : use anyhow::bail;
11 : use anyhow::Result;
12 : use camino::Utf8Path;
13 : use camino::Utf8PathBuf;
14 : use chrono::{DateTime, Utc};
15 : use postgres_ffi::XLogSegNo;
16 : use postgres_ffi::MAX_SEND_SIZE;
17 : use serde::Deserialize;
18 : use serde::Serialize;
19 :
20 : use postgres_ffi::v14::xlog_utils::{IsPartialXLogFileName, IsXLogFileName};
21 : use sha2::{Digest, Sha256};
22 : use utils::id::NodeId;
23 : use utils::id::TenantTimelineId;
24 : use utils::id::{TenantId, TimelineId};
25 : use utils::lsn::Lsn;
26 :
27 : use crate::safekeeper::TermHistory;
28 : use crate::send_wal::WalSenderState;
29 : use crate::state::TimelineMemState;
30 : use crate::state::TimelinePersistentState;
31 : use crate::timeline::get_timeline_dir;
32 : use crate::timeline::WalResidentTimeline;
33 : use crate::timeline_manager;
34 : use crate::GlobalTimelines;
35 : use crate::SafeKeeperConf;
36 :
/// Various filters that influence the resulting JSON output.
///
/// NOTE(review): the "by default" values mentioned on the fields below are not
/// applied in this file (there is no `Default` impl here); presumably the code
/// that parses the request into `Args` applies them — confirm there.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Args {
    /// Dump all available safekeeper state. False by default.
    pub dump_all: bool,

    /// Dump control_file content. Uses value of `dump_all` by default.
    pub dump_control_file: bool,

    /// Dump in-memory state. Uses value of `dump_all` by default.
    pub dump_memory: bool,

    /// Dump all disk files in a timeline directory. Uses value of `dump_all` by default.
    pub dump_disk_content: bool,

    /// Dump full term history. True by default.
    /// When false, the term history in the dumped control file is replaced by
    /// an empty one (only relevant when `dump_control_file` is set).
    pub dump_term_history: bool,

    /// Dump last modified time of WAL segments. Uses value of `dump_all` by default.
    pub dump_wal_last_modified: bool,

    /// Filter timelines by tenant_id. `None` disables this filter.
    pub tenant_id: Option<TenantId>,

    /// Filter timelines by timeline_id. `None` disables this filter.
    pub timeline_id: Option<TimelineId>,
}
64 :
/// Response for debug dump request.
#[derive(Debug, Serialize)]
pub struct Response {
    /// Time at which `build` started.
    pub start_time: DateTime<Utc>,
    /// Time at which `build` finished selecting timelines. Per-timeline state
    /// is collected later, while `timelines` is being serialized, so that work
    /// is not covered by this timestamp.
    pub finish_time: DateTime<Utc>,
    /// Timelines that passed the filters; each collects its state lazily on serialization.
    pub timelines: Vec<TimelineDumpSer>,
    /// Total number of timelines from `GlobalTimelines::timelines_count()`,
    /// independent of the tenant/timeline filters.
    pub timelines_count: usize,
    /// Subset of the safekeeper configuration that is allowed to be exposed.
    pub config: Config,
}
74 :
/// A timeline selected for dumping, whose state is collected lazily.
///
/// Serializing this value drives `build_from_tli_dump` to completion on
/// `runtime` (via `block_on`) and then serializes the resulting [`Timeline`].
/// NOTE(review): tokio's `Runtime::block_on` panics when called from inside an
/// async execution context, so this is expected to be serialized on a
/// blocking thread — confirm at the call site.
pub struct TimelineDumpSer {
    /// Timeline to collect state from.
    pub tli: Arc<crate::timeline::Timeline>,
    /// Filters controlling which parts of the state are dumped.
    pub args: Args,
    /// Timeline directory scanned for disk content and WAL modification times.
    pub timeline_dir: Utf8PathBuf,
    /// Runtime used to run the async state collection during serialization.
    pub runtime: Arc<tokio::runtime::Runtime>,
}
81 :
82 : impl std::fmt::Debug for TimelineDumpSer {
83 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84 0 : f.debug_struct("TimelineDumpSer")
85 0 : .field("tli", &self.tli.ttid)
86 0 : .field("args", &self.args)
87 0 : .finish()
88 0 : }
89 : }
90 :
91 : impl Serialize for TimelineDumpSer {
92 0 : fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
93 0 : where
94 0 : S: serde::Serializer,
95 0 : {
96 0 : let dump = self.runtime.block_on(build_from_tli_dump(
97 0 : &self.tli,
98 0 : &self.args,
99 0 : &self.timeline_dir,
100 0 : ));
101 0 : dump.serialize(serializer)
102 0 : }
103 : }
104 :
105 0 : async fn build_from_tli_dump(
106 0 : timeline: &Arc<crate::timeline::Timeline>,
107 0 : args: &Args,
108 0 : timeline_dir: &Utf8Path,
109 0 : ) -> Timeline {
110 0 : let control_file = if args.dump_control_file {
111 0 : let mut state = timeline.get_state().await.1;
112 0 : if !args.dump_term_history {
113 0 : state.acceptor_state.term_history = TermHistory(vec![]);
114 0 : }
115 0 : Some(state)
116 : } else {
117 0 : None
118 : };
119 :
120 0 : let memory = if args.dump_memory {
121 0 : Some(timeline.memory_dump().await)
122 : } else {
123 0 : None
124 : };
125 :
126 0 : let disk_content = if args.dump_disk_content {
127 : // build_disk_content can fail, but we don't want to fail the whole
128 : // request because of that.
129 : // Note: timeline can be in offloaded state, this is not a problem.
130 0 : build_disk_content(timeline_dir).ok()
131 : } else {
132 0 : None
133 : };
134 :
135 0 : let wal_last_modified = if args.dump_wal_last_modified {
136 0 : get_wal_last_modified(timeline_dir).ok().flatten()
137 : } else {
138 0 : None
139 : };
140 :
141 0 : Timeline {
142 0 : tenant_id: timeline.ttid.tenant_id,
143 0 : timeline_id: timeline.ttid.timeline_id,
144 0 : control_file,
145 0 : memory,
146 0 : disk_content,
147 0 : wal_last_modified,
148 0 : }
149 0 : }
150 :
/// Safekeeper configuration.
///
/// Holds only the fields that `build_config` copies out of `SafeKeeperConf`;
/// the rest of the configuration is deliberately not exposed in the dump.
#[derive(Debug, Serialize, Deserialize)]
pub struct Config {
    pub id: NodeId,
    pub workdir: PathBuf,
    pub listen_pg_addr: String,
    pub listen_http_addr: String,
    pub no_sync: bool,
    pub max_offloader_lag_bytes: u64,
    pub wal_backup_enabled: bool,
}
162 :
/// Dumped state of a single timeline.
///
/// Optional sections are `None` when not requested via [`Args`]; the disk-based
/// sections are also `None` when collecting them failed.
#[derive(Debug, Serialize, Deserialize)]
pub struct Timeline {
    pub tenant_id: TenantId,
    pub timeline_id: TimelineId,
    /// Persisted timeline state; its term history is emptied unless
    /// `Args::dump_term_history` is set.
    pub control_file: Option<TimelinePersistentState>,
    /// In-memory state, as returned by the timeline's `memory_dump()`.
    pub memory: Option<Memory>,
    /// Listing of files in the timeline directory.
    pub disk_content: Option<DiskContent>,
    /// Latest modification time among WAL segment files (full or partial);
    /// also `None` when the directory contains no such files.
    pub wal_last_modified: Option<DateTime<Utc>>,
}
172 :
/// In-memory state of a timeline, produced by the timeline's `memory_dump()`
/// (defined outside this file, which also defines the exact field semantics).
#[derive(Debug, Serialize, Deserialize)]
pub struct Memory {
    pub is_cancelled: bool,
    pub peers_info_len: usize,
    pub walsenders: Vec<WalSenderState>,
    pub wal_backup_active: bool,
    pub active: bool,
    pub num_computes: u32,
    pub last_removed_segno: XLogSegNo,
    pub epoch_start_lsn: Lsn,
    pub mem_state: TimelineMemState,
    pub mgr_status: timeline_manager::Status,

    // PhysicalStorage state.
    pub write_lsn: Lsn,
    pub write_record_lsn: Lsn,
    pub flush_lsn: Lsn,
    pub file_open: bool,
}
192 :
/// Listing of a timeline directory, built by `build_disk_content`.
/// Entries that could not be inspected are omitted rather than failing the listing.
#[derive(Debug, Serialize, Deserialize)]
pub struct DiskContent {
    pub files: Vec<FileInfo>,
}
197 :
/// Information about a single file in a timeline directory.
#[derive(Debug, Serialize, Deserialize)]
pub struct FileInfo {
    /// File name; empty if the name is not valid UTF-8.
    pub name: String,
    /// Size in bytes, from the file metadata.
    pub size: u64,
    /// Creation time, from the file metadata.
    pub created: DateTime<Utc>,
    /// Last modification time, from the file metadata.
    pub modified: DateTime<Utc>,
    /// Number of zero bytes at the beginning of the file.
    pub start_zeroes: u64,
    /// Number of zero bytes at the end of the file, counted only after the
    /// first non-zero byte (so it is 0 for an all-zero file).
    pub end_zeroes: u64,
    // TODO: add sha256 checksum
}
208 :
209 : /// Build debug dump response, using the provided [`Args`] filters.
210 0 : pub async fn build(args: Args) -> Result<Response> {
211 0 : let start_time = Utc::now();
212 0 : let timelines_count = GlobalTimelines::timelines_count();
213 0 : let config = GlobalTimelines::get_global_config();
214 :
215 0 : let ptrs_snapshot = if args.tenant_id.is_some() && args.timeline_id.is_some() {
216 : // If both tenant_id and timeline_id are specified, we can just get the
217 : // timeline directly, without taking a snapshot of the whole list.
218 0 : let ttid = TenantTimelineId::new(args.tenant_id.unwrap(), args.timeline_id.unwrap());
219 0 : if let Ok(tli) = GlobalTimelines::get(ttid) {
220 0 : vec![tli]
221 : } else {
222 0 : vec![]
223 : }
224 : } else {
225 : // Otherwise, take a snapshot of the whole list.
226 0 : GlobalTimelines::get_all()
227 : };
228 :
229 0 : let mut timelines = Vec::new();
230 0 : let runtime = Arc::new(
231 0 : tokio::runtime::Builder::new_current_thread()
232 0 : .build()
233 0 : .unwrap(),
234 0 : );
235 0 : for tli in ptrs_snapshot {
236 0 : let ttid = tli.ttid;
237 0 : if let Some(tenant_id) = args.tenant_id {
238 0 : if tenant_id != ttid.tenant_id {
239 0 : continue;
240 0 : }
241 0 : }
242 0 : if let Some(timeline_id) = args.timeline_id {
243 0 : if timeline_id != ttid.timeline_id {
244 0 : continue;
245 0 : }
246 0 : }
247 :
248 0 : timelines.push(TimelineDumpSer {
249 0 : tli,
250 0 : args: args.clone(),
251 0 : timeline_dir: get_timeline_dir(&config, &ttid),
252 0 : runtime: runtime.clone(),
253 0 : });
254 : }
255 :
256 0 : Ok(Response {
257 0 : start_time,
258 0 : finish_time: Utc::now(),
259 0 : timelines,
260 0 : timelines_count,
261 0 : config: build_config(config),
262 0 : })
263 0 : }
264 :
265 : /// Builds DiskContent from a directory path. It can fail if the directory
266 : /// is deleted between the time we get the path and the time we try to open it.
267 0 : fn build_disk_content(path: &Utf8Path) -> Result<DiskContent> {
268 0 : let mut files = Vec::new();
269 0 : for entry in fs::read_dir(path)? {
270 0 : if entry.is_err() {
271 0 : continue;
272 0 : }
273 0 : let file = build_file_info(entry?);
274 0 : if file.is_err() {
275 0 : continue;
276 0 : }
277 0 : files.push(file?);
278 : }
279 :
280 0 : Ok(DiskContent { files })
281 0 : }
282 :
283 : /// Builds FileInfo from DirEntry. Sometimes it can return an error
284 : /// if the file is deleted between the time we get the DirEntry
285 : /// and the time we try to open it.
286 0 : fn build_file_info(entry: DirEntry) -> Result<FileInfo> {
287 0 : let metadata = entry.metadata()?;
288 0 : let path = entry.path();
289 0 : let name = path
290 0 : .file_name()
291 0 : .and_then(|x| x.to_str())
292 0 : .unwrap_or("")
293 0 : .to_owned();
294 0 : let mut file = fs::File::open(path)?;
295 0 : let mut reader = BufReader::new(&mut file).bytes().filter_map(|x| x.ok());
296 0 :
297 0 : let start_zeroes = reader.by_ref().take_while(|&x| x == 0).count() as u64;
298 0 : let mut end_zeroes = 0;
299 0 : for b in reader {
300 0 : if b == 0 {
301 0 : end_zeroes += 1;
302 0 : } else {
303 0 : end_zeroes = 0;
304 0 : }
305 : }
306 :
307 : Ok(FileInfo {
308 0 : name,
309 0 : size: metadata.len(),
310 0 : created: DateTime::from(metadata.created()?),
311 0 : modified: DateTime::from(metadata.modified()?),
312 0 : start_zeroes,
313 0 : end_zeroes,
314 : })
315 0 : }
316 :
317 : /// Get highest modified time of WAL segments in the directory.
318 0 : fn get_wal_last_modified(path: &Utf8Path) -> Result<Option<DateTime<Utc>>> {
319 0 : let mut res = None;
320 0 : for entry in fs::read_dir(path)? {
321 0 : if entry.is_err() {
322 0 : continue;
323 0 : }
324 0 : let entry = entry?;
325 : /* Ignore files that are not XLOG segments */
326 0 : let fname = entry.file_name();
327 0 : if !IsXLogFileName(&fname) && !IsPartialXLogFileName(&fname) {
328 0 : continue;
329 0 : }
330 :
331 0 : let metadata = entry.metadata()?;
332 0 : let modified: DateTime<Utc> = DateTime::from(metadata.modified()?);
333 0 : res = std::cmp::max(res, Some(modified));
334 : }
335 0 : Ok(res)
336 0 : }
337 :
338 : /// Converts SafeKeeperConf to Config, filtering out the fields that are not
339 : /// supposed to be exposed.
340 0 : fn build_config(config: SafeKeeperConf) -> Config {
341 0 : Config {
342 0 : id: config.my_id,
343 0 : workdir: config.workdir.into(),
344 0 : listen_pg_addr: config.listen_pg_addr,
345 0 : listen_http_addr: config.listen_http_addr,
346 0 : no_sync: config.no_sync,
347 0 : max_offloader_lag_bytes: config.max_offloader_lag_bytes,
348 0 : wal_backup_enabled: config.wal_backup_enabled,
349 0 : }
350 0 : }
351 :
/// Request for [`calculate_digest`]: the WAL byte range `[from_lsn, until_lsn)` to hash.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TimelineDigestRequest {
    pub from_lsn: Lsn,
    pub until_lsn: Lsn,
}
357 :
/// Result of [`calculate_digest`].
#[derive(Debug, Serialize, Deserialize)]
pub struct TimelineDigest {
    /// Hex-encoded SHA-256 of the requested WAL range.
    pub sha256: String,
}
362 :
363 0 : pub async fn calculate_digest(
364 0 : tli: &WalResidentTimeline,
365 0 : request: TimelineDigestRequest,
366 0 : ) -> Result<TimelineDigest> {
367 0 : if request.from_lsn > request.until_lsn {
368 0 : bail!("from_lsn is greater than until_lsn");
369 0 : }
370 :
371 0 : let (_, persisted_state) = tli.get_state().await;
372 0 : if persisted_state.timeline_start_lsn > request.from_lsn {
373 0 : bail!("requested LSN is before the start of the timeline");
374 0 : }
375 :
376 0 : let mut wal_reader = tli.get_walreader(request.from_lsn).await?;
377 :
378 0 : let mut hasher = Sha256::new();
379 0 : let mut buf = [0u8; MAX_SEND_SIZE];
380 0 :
381 0 : let mut bytes_left = (request.until_lsn.0 - request.from_lsn.0) as usize;
382 0 : while bytes_left > 0 {
383 0 : let bytes_to_read = std::cmp::min(buf.len(), bytes_left);
384 0 : let bytes_read = wal_reader.read(&mut buf[..bytes_to_read]).await?;
385 0 : if bytes_read == 0 {
386 0 : bail!("wal_reader.read returned 0 bytes");
387 0 : }
388 0 : hasher.update(&buf[..bytes_read]);
389 0 : bytes_left -= bytes_read;
390 : }
391 :
392 0 : let digest = hasher.finalize();
393 0 : let digest = hex::encode(digest);
394 0 : Ok(TimelineDigest { sha256: digest })
395 0 : }
|