//! Utils for dumping full state of the safekeeper.

use std::fs;
use std::fs::DirEntry;
use std::io::BufReader;
use std::io::Read;
use std::path::PathBuf;
use std::sync::Arc;

use anyhow::bail;
use anyhow::Result;
use camino::Utf8Path;
use chrono::{DateTime, Utc};
use postgres_ffi::XLogSegNo;
use postgres_ffi::MAX_SEND_SIZE;
use serde::Deserialize;
use serde::Serialize;

use sha2::{Digest, Sha256};
use utils::id::NodeId;
use utils::id::TenantTimelineId;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;

use crate::safekeeper::SafeKeeperState;
use crate::safekeeper::SafekeeperMemState;
use crate::safekeeper::TermHistory;
use crate::SafeKeeperConf;

use crate::send_wal::WalSenderState;
use crate::wal_storage::WalReader;
use crate::GlobalTimelines;

/// Various filters that influence the resulting JSON output.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Args {
    /// Dump all available safekeeper state. False by default.
    pub dump_all: bool,

    /// Dump control_file content. Uses value of `dump_all` by default.
    pub dump_control_file: bool,

    /// Dump in-memory state. Uses value of `dump_all` by default.
    pub dump_memory: bool,

    /// Dump all disk files in a timeline directory. Uses value of `dump_all` by default.
    pub dump_disk_content: bool,

    /// Dump full term history. True by default.
    pub dump_term_history: bool,

    /// Filter timelines by tenant_id.
    pub tenant_id: Option<TenantId>,

    /// Filter timelines by timeline_id.
    pub timeline_id: Option<TimelineId>,
}

/// Response for debug dump request.
#[derive(Debug, Serialize)]
pub struct Response {
    pub start_time: DateTime<Utc>,
    pub finish_time: DateTime<Utc>,
    pub timelines: Vec<TimelineDumpSer>,
    pub timelines_count: usize,
    pub config: Config,
}

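/// Lazily-serialized timeline dump: holds the timeline handle and the dump
/// [`Args`], and builds the actual [`Timeline`] dump only when serialized.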
pub struct TimelineDumpSer {
    pub tli: Arc<crate::timeline::Timeline>,
    pub args: Args,
    pub runtime: Arc<tokio::runtime::Runtime>,
}

impl std::fmt::Debug for TimelineDumpSer {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TimelineDumpSer")
            .field("tli", &self.tli.ttid)
            .field("args", &self.args)
            .finish()
    }
}

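// Serialization drives the dump: serde serializers are synchronous, so the
// async `build_from_tli_dump` is run to completion on the dedicated runtime.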
impl Serialize for TimelineDumpSer {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let dump = self
            .runtime
            .block_on(build_from_tli_dump(self.tli.clone(), self.args.clone()));
        dump.serialize(serializer)
    }
}

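/// Builds a [`Timeline`] dump for a single timeline, honoring the [`Args`] filters.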
async fn build_from_tli_dump(timeline: Arc<crate::timeline::Timeline>, args: Args) -> Timeline {
    let control_file = if args.dump_control_file {
        let mut state = timeline.get_state().await.1;
        if !args.dump_term_history {
            state.acceptor_state.term_history = TermHistory(vec![]);
        }
        Some(state)
    } else {
        None
    };

    let memory = if args.dump_memory {
        Some(timeline.memory_dump().await)
    } else {
        None
    };

    let disk_content = if args.dump_disk_content {
        // build_disk_content can fail, but we don't want to fail the whole
        // request because of that.
        build_disk_content(&timeline.timeline_dir).ok()
    } else {
        None
    };

    Timeline {
        tenant_id: timeline.ttid.tenant_id,
        timeline_id: timeline.ttid.timeline_id,
        control_file,
        memory,
        disk_content,
    }
}

/// Safekeeper configuration.
#[derive(Debug, Serialize, Deserialize)]
pub struct Config {
    pub id: NodeId,
    pub workdir: PathBuf,
    pub listen_pg_addr: String,
    pub listen_http_addr: String,
    pub no_sync: bool,
    pub max_offloader_lag_bytes: u64,
    pub wal_backup_enabled: bool,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct Timeline {
    pub tenant_id: TenantId,
    pub timeline_id: TimelineId,
    pub control_file: Option<SafeKeeperState>,
    pub memory: Option<Memory>,
    pub disk_content: Option<DiskContent>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct Memory {
    pub is_cancelled: bool,
    pub peers_info_len: usize,
    pub walsenders: Vec<WalSenderState>,
    pub wal_backup_active: bool,
    pub active: bool,
    pub num_computes: u32,
    pub last_removed_segno: XLogSegNo,
    pub epoch_start_lsn: Lsn,
    pub mem_state: SafekeeperMemState,

    // PhysicalStorage state.
    pub write_lsn: Lsn,
    pub write_record_lsn: Lsn,
    pub flush_lsn: Lsn,
    pub file_open: bool,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct DiskContent {
    pub files: Vec<FileInfo>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct FileInfo {
    pub name: String,
    pub size: u64,
    pub created: DateTime<Utc>,
    pub modified: DateTime<Utc>,
    pub start_zeroes: u64,
    pub end_zeroes: u64,
    // TODO: add sha256 checksum
}

/// Build debug dump response, using the provided [`Args`] filters.
pub async fn build(args: Args) -> Result<Response> {
    let start_time = Utc::now();
    let timelines_count = GlobalTimelines::timelines_count();

    let ptrs_snapshot = if args.tenant_id.is_some() && args.timeline_id.is_some() {
        // If both tenant_id and timeline_id are specified, we can just get the
        // timeline directly, without taking a snapshot of the whole list.
        let ttid = TenantTimelineId::new(args.tenant_id.unwrap(), args.timeline_id.unwrap());
        if let Ok(tli) = GlobalTimelines::get(ttid) {
            vec![tli]
        } else {
            vec![]
        }
    } else {
        // Otherwise, take a snapshot of the whole list.
        GlobalTimelines::get_all()
    };

    let mut timelines = Vec::new();
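    // Dedicated single-threaded runtime: TimelineDumpSer::serialize is a sync
    // serde entry point and uses block_on to run the async dump code.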
    let runtime = Arc::new(
        tokio::runtime::Builder::new_current_thread()
            .build()
            .unwrap(),
    );
    for tli in ptrs_snapshot {
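        // Apply the optional tenant/timeline filters; this matters when only
        // one of the two IDs was specified and the full snapshot was taken.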
        let ttid = tli.ttid;
        if let Some(tenant_id) = args.tenant_id {
            if tenant_id != ttid.tenant_id {
                continue;
            }
        }
        if let Some(timeline_id) = args.timeline_id {
            if timeline_id != ttid.timeline_id {
                continue;
            }
        }

        timelines.push(TimelineDumpSer {
            tli,
            args: args.clone(),
            runtime: runtime.clone(),
        });
    }

    let config = GlobalTimelines::get_global_config();

    Ok(Response {
        start_time,
        finish_time: Utc::now(),
        timelines,
        timelines_count,
        config: build_config(config),
    })
}

/// Builds DiskContent from a directory path. It can fail if the directory
/// is deleted between the time we get the path and the time we try to open it.
fn build_disk_content(path: &Utf8Path) -> Result<DiskContent> {
    let mut files = Vec::new();
    for entry in fs::read_dir(path)? {
        if entry.is_err() {
            continue;
        }
        let file = build_file_info(entry?);
        if file.is_err() {
            continue;
        }
        files.push(file?);
    }

    Ok(DiskContent { files })
}

/// Builds FileInfo from DirEntry. Sometimes it can return an error
/// if the file is deleted between the time we get the DirEntry
/// and the time we try to open it.
fn build_file_info(entry: DirEntry) -> Result<FileInfo> {
    let metadata = entry.metadata()?;
    let path = entry.path();
    let name = path
        .file_name()
        .and_then(|x| x.to_str())
        .unwrap_or("")
        .to_owned();
    let mut file = fs::File::open(path)?;
    let mut reader = BufReader::new(&mut file).bytes().filter_map(|x| x.ok());

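    // Scan the file once: count the leading zero bytes, then keep a running
    // count of trailing zeroes, resetting it whenever a non-zero byte is seen.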
    let start_zeroes = reader.by_ref().take_while(|&x| x == 0).count() as u64;
    let mut end_zeroes = 0;
    for b in reader {
        if b == 0 {
            end_zeroes += 1;
        } else {
            end_zeroes = 0;
        }
    }

    Ok(FileInfo {
        name,
        size: metadata.len(),
        created: DateTime::from(metadata.created()?),
        modified: DateTime::from(metadata.modified()?),
        start_zeroes,
        end_zeroes,
    })
}

/// Converts SafeKeeperConf to Config, filtering out the fields that are not
/// supposed to be exposed.
fn build_config(config: SafeKeeperConf) -> Config {
    Config {
        id: config.my_id,
        workdir: config.workdir.into(),
        listen_pg_addr: config.listen_pg_addr,
        listen_http_addr: config.listen_http_addr,
        no_sync: config.no_sync,
        max_offloader_lag_bytes: config.max_offloader_lag_bytes,
        wal_backup_enabled: config.wal_backup_enabled,
    }
}

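/// Request to digest the WAL in the LSN range `[from_lsn, until_lsn)`.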
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TimelineDigestRequest {
    pub from_lsn: Lsn,
    pub until_lsn: Lsn,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct TimelineDigest {
    pub sha256: String,
}

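/// Calculates a SHA-256 digest of the WAL bytes in `[from_lsn, until_lsn)`,
/// reading them from disk in `MAX_SEND_SIZE` chunks.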
pub async fn calculate_digest(
    tli: &Arc<crate::timeline::Timeline>,
    request: TimelineDigestRequest,
) -> Result<TimelineDigest> {
    if request.from_lsn > request.until_lsn {
        bail!("from_lsn is greater than until_lsn");
    }

    let conf = GlobalTimelines::get_global_config();
    let (_, persisted_state) = tli.get_state().await;

    if persisted_state.timeline_start_lsn > request.from_lsn {
        bail!("requested LSN is before the start of the timeline");
    }

    let mut wal_reader = WalReader::new(
        conf.workdir.clone(),
        tli.timeline_dir.clone(),
        &persisted_state,
        request.from_lsn,
        true,
    )?;

    let mut hasher = Sha256::new();
    let mut buf = [0u8; MAX_SEND_SIZE];

    let mut bytes_left = (request.until_lsn.0 - request.from_lsn.0) as usize;
    while bytes_left > 0 {
        let bytes_to_read = std::cmp::min(buf.len(), bytes_left);
        let bytes_read = wal_reader.read(&mut buf[..bytes_to_read]).await?;
        if bytes_read == 0 {
            bail!("wal_reader.read returned 0 bytes");
        }
        hasher.update(&buf[..bytes_read]);
        bytes_left -= bytes_read;
    }

    let digest = hasher.finalize();
    let digest = hex::encode(digest);
    Ok(TimelineDigest { sha256: digest })
}