Line data Source code
1 : //! Utils for dumping full state of the safekeeper.
2 :
3 : use std::fs;
4 : use std::fs::DirEntry;
5 : use std::io::BufReader;
6 : use std::io::Read;
7 : use std::path::PathBuf;
8 : use std::sync::Arc;
9 :
10 : use anyhow::bail;
11 : use anyhow::Result;
12 : use camino::Utf8Path;
13 : use chrono::{DateTime, Utc};
14 : use postgres_ffi::XLogSegNo;
15 : use postgres_ffi::MAX_SEND_SIZE;
16 : use serde::Deserialize;
17 : use serde::Serialize;
18 :
19 : use sha2::{Digest, Sha256};
20 : use utils::id::NodeId;
21 : use utils::id::TenantTimelineId;
22 : use utils::id::{TenantId, TimelineId};
23 : use utils::lsn::Lsn;
24 :
25 : use crate::safekeeper::TermHistory;
26 : use crate::send_wal::WalSenderState;
27 : use crate::state::TimelineMemState;
28 : use crate::state::TimelinePersistentState;
29 : use crate::wal_storage::WalReader;
30 : use crate::GlobalTimelines;
31 : use crate::SafeKeeperConf;
32 :
33 : /// Various filters that influence the resulting JSON output.
34 12 : #[derive(Debug, Serialize, Deserialize, Clone)]
35 : pub struct Args {
36 : /// Dump all available safekeeper state. False by default.
37 : pub dump_all: bool,
38 :
39 : /// Dump control_file content. Uses value of `dump_all` by default.
40 : pub dump_control_file: bool,
41 :
42 : /// Dump in-memory state. Uses value of `dump_all` by default.
43 : pub dump_memory: bool,
44 :
45 : /// Dump all disk files in a timeline directory. Uses value of `dump_all` by default.
46 : pub dump_disk_content: bool,
47 :
48 : /// Dump full term history. True by default.
49 : pub dump_term_history: bool,
50 :
51 : /// Filter timelines by tenant_id.
52 : pub tenant_id: Option<TenantId>,
53 :
54 : /// Filter timelines by timeline_id.
55 : pub timeline_id: Option<TimelineId>,
56 : }
57 :
58 : /// Response for debug dump request.
59 6 : #[derive(Debug, Serialize)]
60 : pub struct Response {
61 : pub start_time: DateTime<Utc>,
62 : pub finish_time: DateTime<Utc>,
63 : pub timelines: Vec<TimelineDumpSer>,
64 : pub timelines_count: usize,
65 : pub config: Config,
66 : }
67 :
68 : pub struct TimelineDumpSer {
69 : pub tli: Arc<crate::timeline::Timeline>,
70 : pub args: Args,
71 : pub runtime: Arc<tokio::runtime::Runtime>,
72 : }
73 :
74 : impl std::fmt::Debug for TimelineDumpSer {
75 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76 0 : f.debug_struct("TimelineDumpSer")
77 0 : .field("tli", &self.tli.ttid)
78 0 : .field("args", &self.args)
79 0 : .finish()
80 0 : }
81 : }
82 :
83 : impl Serialize for TimelineDumpSer {
84 6 : fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
85 6 : where
86 6 : S: serde::Serializer,
87 6 : {
88 6 : let dump = self
89 6 : .runtime
90 6 : .block_on(build_from_tli_dump(self.tli.clone(), self.args.clone()));
91 6 : dump.serialize(serializer)
92 6 : }
93 : }
94 :
95 6 : async fn build_from_tli_dump(timeline: Arc<crate::timeline::Timeline>, args: Args) -> Timeline {
96 6 : let control_file = if args.dump_control_file {
97 6 : let mut state = timeline.get_state().await.1;
98 6 : if !args.dump_term_history {
99 0 : state.acceptor_state.term_history = TermHistory(vec![]);
100 6 : }
101 6 : Some(state)
102 : } else {
103 0 : None
104 : };
105 :
106 6 : let memory = if args.dump_memory {
107 5 : Some(timeline.memory_dump().await)
108 : } else {
109 1 : None
110 : };
111 :
112 6 : let disk_content = if args.dump_disk_content {
113 : // build_disk_content can fail, but we don't want to fail the whole
114 : // request because of that.
115 5 : build_disk_content(&timeline.timeline_dir).ok()
116 : } else {
117 1 : None
118 : };
119 :
120 6 : Timeline {
121 6 : tenant_id: timeline.ttid.tenant_id,
122 6 : timeline_id: timeline.ttid.timeline_id,
123 6 : control_file,
124 6 : memory,
125 6 : disk_content,
126 6 : }
127 6 : }
128 :
129 : /// Safekeeper configuration.
130 15 : #[derive(Debug, Serialize, Deserialize)]
131 : pub struct Config {
132 : pub id: NodeId,
133 : pub workdir: PathBuf,
134 : pub listen_pg_addr: String,
135 : pub listen_http_addr: String,
136 : pub no_sync: bool,
137 : pub max_offloader_lag_bytes: u64,
138 : pub wal_backup_enabled: bool,
139 : }
140 :
141 11 : #[derive(Debug, Serialize, Deserialize)]
142 : pub struct Timeline {
143 : pub tenant_id: TenantId,
144 : pub timeline_id: TimelineId,
145 : pub control_file: Option<TimelinePersistentState>,
146 : pub memory: Option<Memory>,
147 : pub disk_content: Option<DiskContent>,
148 : }
149 :
150 27 : #[derive(Debug, Serialize, Deserialize)]
151 : pub struct Memory {
152 : pub is_cancelled: bool,
153 : pub peers_info_len: usize,
154 : pub walsenders: Vec<WalSenderState>,
155 : pub wal_backup_active: bool,
156 : pub active: bool,
157 : pub num_computes: u32,
158 : pub last_removed_segno: XLogSegNo,
159 : pub epoch_start_lsn: Lsn,
160 : pub mem_state: TimelineMemState,
161 :
162 : // PhysicalStorage state.
163 : pub write_lsn: Lsn,
164 : pub write_record_lsn: Lsn,
165 : pub flush_lsn: Lsn,
166 : pub file_open: bool,
167 : }
168 :
169 5 : #[derive(Debug, Serialize, Deserialize)]
170 : pub struct DiskContent {
171 : pub files: Vec<FileInfo>,
172 : }
173 :
174 39 : #[derive(Debug, Serialize, Deserialize)]
175 : pub struct FileInfo {
176 : pub name: String,
177 : pub size: u64,
178 : pub created: DateTime<Utc>,
179 : pub modified: DateTime<Utc>,
180 : pub start_zeroes: u64,
181 : pub end_zeroes: u64,
182 : // TODO: add sha256 checksum
183 : }
184 :
185 : /// Build debug dump response, using the provided [`Args`] filters.
186 6 : pub async fn build(args: Args) -> Result<Response> {
187 6 : let start_time = Utc::now();
188 6 : let timelines_count = GlobalTimelines::timelines_count();
189 :
190 6 : let ptrs_snapshot = if args.tenant_id.is_some() && args.timeline_id.is_some() {
191 : // If both tenant_id and timeline_id are specified, we can just get the
192 : // timeline directly, without taking a snapshot of the whole list.
193 1 : let ttid = TenantTimelineId::new(args.tenant_id.unwrap(), args.timeline_id.unwrap());
194 1 : if let Ok(tli) = GlobalTimelines::get(ttid) {
195 1 : vec![tli]
196 : } else {
197 0 : vec![]
198 : }
199 : } else {
200 : // Otherwise, take a snapshot of the whole list.
201 5 : GlobalTimelines::get_all()
202 : };
203 :
204 6 : let mut timelines = Vec::new();
205 6 : let runtime = Arc::new(
206 6 : tokio::runtime::Builder::new_current_thread()
207 6 : .build()
208 6 : .unwrap(),
209 6 : );
210 12 : for tli in ptrs_snapshot {
211 6 : let ttid = tli.ttid;
212 6 : if let Some(tenant_id) = args.tenant_id {
213 1 : if tenant_id != ttid.tenant_id {
214 0 : continue;
215 1 : }
216 5 : }
217 6 : if let Some(timeline_id) = args.timeline_id {
218 2 : if timeline_id != ttid.timeline_id {
219 0 : continue;
220 2 : }
221 4 : }
222 :
223 6 : timelines.push(TimelineDumpSer {
224 6 : tli,
225 6 : args: args.clone(),
226 6 : runtime: runtime.clone(),
227 6 : });
228 : }
229 :
230 6 : let config = GlobalTimelines::get_global_config();
231 6 :
232 6 : Ok(Response {
233 6 : start_time,
234 6 : finish_time: Utc::now(),
235 6 : timelines,
236 6 : timelines_count,
237 6 : config: build_config(config),
238 6 : })
239 6 : }
240 :
241 : /// Builds DiskContent from a directory path. It can fail if the directory
242 : /// is deleted between the time we get the path and the time we try to open it.
243 5 : fn build_disk_content(path: &Utf8Path) -> Result<DiskContent> {
244 5 : let mut files = Vec::new();
245 11 : for entry in fs::read_dir(path)? {
246 11 : if entry.is_err() {
247 0 : continue;
248 11 : }
249 11 : let file = build_file_info(entry?);
250 11 : if file.is_err() {
251 0 : continue;
252 11 : }
253 11 : files.push(file?);
254 : }
255 :
256 5 : Ok(DiskContent { files })
257 5 : }
258 :
259 : /// Builds FileInfo from DirEntry. Sometimes it can return an error
260 : /// if the file is deleted between the time we get the DirEntry
261 : /// and the time we try to open it.
262 11 : fn build_file_info(entry: DirEntry) -> Result<FileInfo> {
263 11 : let metadata = entry.metadata()?;
264 11 : let path = entry.path();
265 11 : let name = path
266 11 : .file_name()
267 11 : .and_then(|x| x.to_str())
268 11 : .unwrap_or("")
269 11 : .to_owned();
270 11 : let mut file = fs::File::open(path)?;
271 100664508 : let mut reader = BufReader::new(&mut file).bytes().filter_map(|x| x.ok());
272 11 :
273 34533291 : let start_zeroes = reader.by_ref().take_while(|&x| x == 0).count() as u64;
274 11 : let mut end_zeroes = 0;
275 66131228 : for b in reader {
276 66131217 : if b == 0 {
277 58078187 : end_zeroes += 1;
278 58078187 : } else {
279 8053030 : end_zeroes = 0;
280 8053030 : }
281 : }
282 :
283 : Ok(FileInfo {
284 11 : name,
285 11 : size: metadata.len(),
286 11 : created: DateTime::from(metadata.created()?),
287 11 : modified: DateTime::from(metadata.modified()?),
288 11 : start_zeroes,
289 11 : end_zeroes,
290 : })
291 11 : }
292 :
293 : /// Converts SafeKeeperConf to Config, filtering out the fields that are not
294 : /// supposed to be exposed.
295 6 : fn build_config(config: SafeKeeperConf) -> Config {
296 6 : Config {
297 6 : id: config.my_id,
298 6 : workdir: config.workdir.into(),
299 6 : listen_pg_addr: config.listen_pg_addr,
300 6 : listen_http_addr: config.listen_http_addr,
301 6 : no_sync: config.no_sync,
302 6 : max_offloader_lag_bytes: config.max_offloader_lag_bytes,
303 6 : wal_backup_enabled: config.wal_backup_enabled,
304 6 : }
305 6 : }
306 :
307 0 : #[derive(Debug, Clone, Deserialize, Serialize)]
308 : pub struct TimelineDigestRequest {
309 : pub from_lsn: Lsn,
310 : pub until_lsn: Lsn,
311 : }
312 :
313 64 : #[derive(Debug, Serialize, Deserialize)]
314 : pub struct TimelineDigest {
315 : pub sha256: String,
316 : }
317 :
318 64 : pub async fn calculate_digest(
319 64 : tli: &Arc<crate::timeline::Timeline>,
320 64 : request: TimelineDigestRequest,
321 64 : ) -> Result<TimelineDigest> {
322 64 : if request.from_lsn > request.until_lsn {
323 0 : bail!("from_lsn is greater than until_lsn");
324 64 : }
325 64 :
326 64 : let conf = GlobalTimelines::get_global_config();
327 64 : let (_, persisted_state) = tli.get_state().await;
328 :
329 64 : if persisted_state.timeline_start_lsn > request.from_lsn {
330 0 : bail!("requested LSN is before the start of the timeline");
331 64 : }
332 :
333 64 : let mut wal_reader = WalReader::new(
334 64 : conf.workdir.clone(),
335 64 : tli.timeline_dir.clone(),
336 64 : &persisted_state,
337 64 : request.from_lsn,
338 64 : true,
339 64 : )?;
340 :
341 64 : let mut hasher = Sha256::new();
342 64 : let mut buf = [0u8; MAX_SEND_SIZE];
343 64 :
344 64 : let mut bytes_left = (request.until_lsn.0 - request.from_lsn.0) as usize;
345 6820 : while bytes_left > 0 {
346 6756 : let bytes_to_read = std::cmp::min(buf.len(), bytes_left);
347 6756 : let bytes_read = wal_reader.read(&mut buf[..bytes_to_read]).await?;
348 6756 : if bytes_read == 0 {
349 0 : bail!("wal_reader.read returned 0 bytes");
350 6756 : }
351 6756 : hasher.update(&buf[..bytes_read]);
352 6756 : bytes_left -= bytes_read;
353 : }
354 :
355 64 : let digest = hasher.finalize();
356 64 : let digest = hex::encode(digest);
357 64 : Ok(TimelineDigest { sha256: digest })
358 64 : }
|