Line data Source code
1 : //! Utils for dumping full state of the safekeeper.
2 :
3 : use std::fs;
4 : use std::fs::DirEntry;
5 : use std::io::BufReader;
6 : use std::io::Read;
7 : use std::path::PathBuf;
8 : use std::sync::Arc;
9 :
10 : use anyhow::bail;
11 : use anyhow::Result;
12 : use camino::Utf8Path;
13 : use camino::Utf8PathBuf;
14 : use chrono::{DateTime, Utc};
15 : use postgres_ffi::XLogSegNo;
16 : use postgres_ffi::MAX_SEND_SIZE;
17 : use serde::Deserialize;
18 : use serde::Serialize;
19 :
20 : use sha2::{Digest, Sha256};
21 : use utils::id::NodeId;
22 : use utils::id::TenantTimelineId;
23 : use utils::id::{TenantId, TimelineId};
24 : use utils::lsn::Lsn;
25 :
26 : use crate::safekeeper::TermHistory;
27 : use crate::send_wal::WalSenderState;
28 : use crate::state::TimelineMemState;
29 : use crate::state::TimelinePersistentState;
30 : use crate::timeline::get_timeline_dir;
31 : use crate::timeline::WalResidentTimeline;
32 : use crate::timeline_manager;
33 : use crate::GlobalTimelines;
34 : use crate::SafeKeeperConf;
35 :
/// Various filters that influence the resulting JSON output.
///
/// This struct has no `Default` impl, so the per-field defaults described
/// below are applied by whoever constructs it. NOTE(review): presumably the
/// HTTP handler that parses the request query — confirm.
///
/// The `tenant_id` / `timeline_id` filters are applied by `build`; when both
/// are set, the timeline is looked up directly instead of scanning all
/// timelines.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Args {
    /// Dump all available safekeeper state. False by default.
    ///
    /// Not read in this module: it only takes effect through the defaults of
    /// the other `dump_*` flags.
    pub dump_all: bool,

    /// Dump control_file content. Uses value of `dump_all` by default.
    pub dump_control_file: bool,

    /// Dump in-memory state. Uses value of `dump_all` by default.
    pub dump_memory: bool,

    /// Dump all disk files in a timeline directory. Uses value of `dump_all` by default.
    pub dump_disk_content: bool,

    /// Dump full term history. True by default.
    ///
    /// When false, the term history in the dumped control file is replaced
    /// with an empty one. Only relevant when `dump_control_file` is set.
    pub dump_term_history: bool,

    /// Filter timelines by tenant_id.
    pub tenant_id: Option<TenantId>,

    /// Filter timelines by timeline_id.
    pub timeline_id: Option<TimelineId>,
}
60 :
/// Response for debug dump request.
#[derive(Debug, Serialize)]
pub struct Response {
    /// Time at which `build` started collecting the response.
    pub start_time: DateTime<Utc>,
    /// Time at which `build` finished collecting the response.
    ///
    /// Per-timeline state is gathered lazily while `timelines` is serialized,
    /// so that work is NOT included in `start_time..finish_time`.
    pub finish_time: DateTime<Utc>,
    /// Timelines that matched the `Args` filters.
    pub timelines: Vec<TimelineDumpSer>,
    /// Total number of timelines known to `GlobalTimelines`, regardless of
    /// the filters (so it may differ from `timelines.len()`).
    pub timelines_count: usize,
    /// The subset of the safekeeper configuration exposed in the dump.
    pub config: Config,
}
70 :
/// A timeline selected for dumping, whose state is collected only when the
/// value is serialized.
///
/// The `Serialize` impl runs `build_from_tli_dump` to completion on `runtime`
/// and serializes the resulting `Timeline`.
pub struct TimelineDumpSer {
    /// Handle to the timeline being dumped.
    pub tli: Arc<crate::timeline::Timeline>,
    /// Filters selecting which parts of the timeline state are dumped.
    pub args: Args,
    /// Directory whose files are listed when `args.dump_disk_content` is set.
    pub timeline_dir: Utf8PathBuf,
    /// Runtime used to drive the async dump from the synchronous `serialize`
    /// call; `build` shares a single current-thread runtime across all entries.
    pub runtime: Arc<tokio::runtime::Runtime>,
}
77 :
78 : impl std::fmt::Debug for TimelineDumpSer {
79 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80 0 : f.debug_struct("TimelineDumpSer")
81 0 : .field("tli", &self.tli.ttid)
82 0 : .field("args", &self.args)
83 0 : .finish()
84 0 : }
85 : }
86 :
87 : impl Serialize for TimelineDumpSer {
88 0 : fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
89 0 : where
90 0 : S: serde::Serializer,
91 0 : {
92 0 : let dump = self.runtime.block_on(build_from_tli_dump(
93 0 : &self.tli,
94 0 : &self.args,
95 0 : &self.timeline_dir,
96 0 : ));
97 0 : dump.serialize(serializer)
98 0 : }
99 : }
100 :
101 0 : async fn build_from_tli_dump(
102 0 : timeline: &Arc<crate::timeline::Timeline>,
103 0 : args: &Args,
104 0 : timeline_dir: &Utf8Path,
105 0 : ) -> Timeline {
106 0 : let control_file = if args.dump_control_file {
107 0 : let mut state = timeline.get_state().await.1;
108 0 : if !args.dump_term_history {
109 0 : state.acceptor_state.term_history = TermHistory(vec![]);
110 0 : }
111 0 : Some(state)
112 : } else {
113 0 : None
114 : };
115 :
116 0 : let memory = if args.dump_memory {
117 0 : Some(timeline.memory_dump().await)
118 : } else {
119 0 : None
120 : };
121 :
122 0 : let disk_content = if args.dump_disk_content {
123 : // build_disk_content can fail, but we don't want to fail the whole
124 : // request because of that.
125 : // Note: timeline can be in offloaded state, this is not a problem.
126 0 : build_disk_content(timeline_dir).ok()
127 : } else {
128 0 : None
129 : };
130 :
131 0 : Timeline {
132 0 : tenant_id: timeline.ttid.tenant_id,
133 0 : timeline_id: timeline.ttid.timeline_id,
134 0 : control_file,
135 0 : memory,
136 0 : disk_content,
137 0 : }
138 0 : }
139 :
/// Safekeeper configuration.
///
/// This is the subset of `SafeKeeperConf` included in the debug dump. It is
/// built by `build_config`, which leaves every other configuration field out.
#[derive(Debug, Serialize, Deserialize)]
pub struct Config {
    /// Node id of this safekeeper (`SafeKeeperConf::my_id`).
    pub id: NodeId,
    pub workdir: PathBuf,
    pub listen_pg_addr: String,
    pub listen_http_addr: String,
    pub no_sync: bool,
    pub max_offloader_lag_bytes: u64,
    pub wal_backup_enabled: bool,
}
151 :
/// Dump of a single timeline, as produced by `build_from_tli_dump`.
///
/// Each optional section is `None` when the corresponding `Args::dump_*` flag
/// is not set.
#[derive(Debug, Serialize, Deserialize)]
pub struct Timeline {
    pub tenant_id: TenantId,
    pub timeline_id: TimelineId,
    /// Persistent timeline state. Its term history is emptied when
    /// `Args::dump_term_history` is false.
    pub control_file: Option<TimelinePersistentState>,
    /// In-memory state, as returned by `crate::timeline::Timeline::memory_dump`.
    pub memory: Option<Memory>,
    /// Files of the timeline directory. Also `None` when the directory could
    /// not be listed.
    pub disk_content: Option<DiskContent>,
}
160 :
/// In-memory state of a timeline, returned by
/// `crate::timeline::Timeline::memory_dump`.
///
/// NOTE(review): this struct is filled outside this module, so the meaning of
/// the individual fields should be confirmed against `memory_dump`.
#[derive(Debug, Serialize, Deserialize)]
pub struct Memory {
    pub is_cancelled: bool,
    pub peers_info_len: usize,
    pub walsenders: Vec<WalSenderState>,
    pub wal_backup_active: bool,
    pub active: bool,
    pub num_computes: u32,
    pub last_removed_segno: XLogSegNo,
    pub epoch_start_lsn: Lsn,
    pub mem_state: TimelineMemState,
    pub mgr_status: timeline_manager::Status,

    // PhysicalStorage state.
    pub write_lsn: Lsn,
    pub write_record_lsn: Lsn,
    pub flush_lsn: Lsn,
    pub file_open: bool,
}
180 :
/// Listing of the entries of a timeline directory, built by `build_disk_content`.
#[derive(Debug, Serialize, Deserialize)]
pub struct DiskContent {
    /// One entry per directory entry whose info could be collected; entries
    /// that failed are skipped rather than reported.
    pub files: Vec<FileInfo>,
}
185 :
/// Information about a single file of a timeline directory, built by
/// `build_file_info`.
#[derive(Debug, Serialize, Deserialize)]
pub struct FileInfo {
    /// File name; empty if the name is not valid UTF-8.
    pub name: String,
    /// Size in bytes, taken from the file metadata.
    pub size: u64,
    /// Creation time, taken from the file metadata.
    pub created: DateTime<Utc>,
    /// Last modification time, taken from the file metadata.
    pub modified: DateTime<Utc>,
    /// Number of leading zero bytes.
    pub start_zeroes: u64,
    /// Number of trailing zero bytes that follow the first non-zero byte.
    /// A file made only of zero bytes therefore reports 0 here.
    pub end_zeroes: u64,
    // TODO: add sha256 checksum
}
196 :
197 : /// Build debug dump response, using the provided [`Args`] filters.
198 0 : pub async fn build(args: Args) -> Result<Response> {
199 0 : let start_time = Utc::now();
200 0 : let timelines_count = GlobalTimelines::timelines_count();
201 0 : let config = GlobalTimelines::get_global_config();
202 :
203 0 : let ptrs_snapshot = if args.tenant_id.is_some() && args.timeline_id.is_some() {
204 : // If both tenant_id and timeline_id are specified, we can just get the
205 : // timeline directly, without taking a snapshot of the whole list.
206 0 : let ttid = TenantTimelineId::new(args.tenant_id.unwrap(), args.timeline_id.unwrap());
207 0 : if let Ok(tli) = GlobalTimelines::get(ttid) {
208 0 : vec![tli]
209 : } else {
210 0 : vec![]
211 : }
212 : } else {
213 : // Otherwise, take a snapshot of the whole list.
214 0 : GlobalTimelines::get_all()
215 : };
216 :
217 0 : let mut timelines = Vec::new();
218 0 : let runtime = Arc::new(
219 0 : tokio::runtime::Builder::new_current_thread()
220 0 : .build()
221 0 : .unwrap(),
222 0 : );
223 0 : for tli in ptrs_snapshot {
224 0 : let ttid = tli.ttid;
225 0 : if let Some(tenant_id) = args.tenant_id {
226 0 : if tenant_id != ttid.tenant_id {
227 0 : continue;
228 0 : }
229 0 : }
230 0 : if let Some(timeline_id) = args.timeline_id {
231 0 : if timeline_id != ttid.timeline_id {
232 0 : continue;
233 0 : }
234 0 : }
235 :
236 0 : timelines.push(TimelineDumpSer {
237 0 : tli,
238 0 : args: args.clone(),
239 0 : timeline_dir: get_timeline_dir(&config, &ttid),
240 0 : runtime: runtime.clone(),
241 0 : });
242 : }
243 :
244 0 : Ok(Response {
245 0 : start_time,
246 0 : finish_time: Utc::now(),
247 0 : timelines,
248 0 : timelines_count,
249 0 : config: build_config(config),
250 0 : })
251 0 : }
252 :
253 : /// Builds DiskContent from a directory path. It can fail if the directory
254 : /// is deleted between the time we get the path and the time we try to open it.
255 0 : fn build_disk_content(path: &Utf8Path) -> Result<DiskContent> {
256 0 : let mut files = Vec::new();
257 0 : for entry in fs::read_dir(path)? {
258 0 : if entry.is_err() {
259 0 : continue;
260 0 : }
261 0 : let file = build_file_info(entry?);
262 0 : if file.is_err() {
263 0 : continue;
264 0 : }
265 0 : files.push(file?);
266 : }
267 :
268 0 : Ok(DiskContent { files })
269 0 : }
270 :
271 : /// Builds FileInfo from DirEntry. Sometimes it can return an error
272 : /// if the file is deleted between the time we get the DirEntry
273 : /// and the time we try to open it.
274 0 : fn build_file_info(entry: DirEntry) -> Result<FileInfo> {
275 0 : let metadata = entry.metadata()?;
276 0 : let path = entry.path();
277 0 : let name = path
278 0 : .file_name()
279 0 : .and_then(|x| x.to_str())
280 0 : .unwrap_or("")
281 0 : .to_owned();
282 0 : let mut file = fs::File::open(path)?;
283 0 : let mut reader = BufReader::new(&mut file).bytes().filter_map(|x| x.ok());
284 0 :
285 0 : let start_zeroes = reader.by_ref().take_while(|&x| x == 0).count() as u64;
286 0 : let mut end_zeroes = 0;
287 0 : for b in reader {
288 0 : if b == 0 {
289 0 : end_zeroes += 1;
290 0 : } else {
291 0 : end_zeroes = 0;
292 0 : }
293 : }
294 :
295 : Ok(FileInfo {
296 0 : name,
297 0 : size: metadata.len(),
298 0 : created: DateTime::from(metadata.created()?),
299 0 : modified: DateTime::from(metadata.modified()?),
300 0 : start_zeroes,
301 0 : end_zeroes,
302 : })
303 0 : }
304 :
305 : /// Converts SafeKeeperConf to Config, filtering out the fields that are not
306 : /// supposed to be exposed.
307 0 : fn build_config(config: SafeKeeperConf) -> Config {
308 0 : Config {
309 0 : id: config.my_id,
310 0 : workdir: config.workdir.into(),
311 0 : listen_pg_addr: config.listen_pg_addr,
312 0 : listen_http_addr: config.listen_http_addr,
313 0 : no_sync: config.no_sync,
314 0 : max_offloader_lag_bytes: config.max_offloader_lag_bytes,
315 0 : wal_backup_enabled: config.wal_backup_enabled,
316 0 : }
317 0 : }
318 :
/// Request for `calculate_digest`: the half-open WAL range
/// `[from_lsn, until_lsn)` to hash.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TimelineDigestRequest {
    /// Start of the range (inclusive). Must not precede the timeline start LSN.
    pub from_lsn: Lsn,
    /// End of the range (exclusive). Must be `>= from_lsn`.
    pub until_lsn: Lsn,
}
324 :
/// Result of `calculate_digest`.
#[derive(Debug, Serialize, Deserialize)]
pub struct TimelineDigest {
    /// Hex-encoded SHA-256 digest of the requested WAL range.
    pub sha256: String,
}
329 :
330 0 : pub async fn calculate_digest(
331 0 : tli: &WalResidentTimeline,
332 0 : request: TimelineDigestRequest,
333 0 : ) -> Result<TimelineDigest> {
334 0 : if request.from_lsn > request.until_lsn {
335 0 : bail!("from_lsn is greater than until_lsn");
336 0 : }
337 :
338 0 : let (_, persisted_state) = tli.get_state().await;
339 0 : if persisted_state.timeline_start_lsn > request.from_lsn {
340 0 : bail!("requested LSN is before the start of the timeline");
341 0 : }
342 :
343 0 : let mut wal_reader = tli.get_walreader(request.from_lsn).await?;
344 :
345 0 : let mut hasher = Sha256::new();
346 0 : let mut buf = [0u8; MAX_SEND_SIZE];
347 0 :
348 0 : let mut bytes_left = (request.until_lsn.0 - request.from_lsn.0) as usize;
349 0 : while bytes_left > 0 {
350 0 : let bytes_to_read = std::cmp::min(buf.len(), bytes_left);
351 0 : let bytes_read = wal_reader.read(&mut buf[..bytes_to_read]).await?;
352 0 : if bytes_read == 0 {
353 0 : bail!("wal_reader.read returned 0 bytes");
354 0 : }
355 0 : hasher.update(&buf[..bytes_read]);
356 0 : bytes_left -= bytes_read;
357 : }
358 :
359 0 : let digest = hasher.finalize();
360 0 : let digest = hex::encode(digest);
361 0 : Ok(TimelineDigest { sha256: digest })
362 0 : }
|