Line data Source code
1 : //! Utils for dumping full state of the safekeeper.
2 :
3 : use std::fs;
4 : use std::fs::DirEntry;
5 : use std::io::BufReader;
6 : use std::io::Read;
7 : use std::path::PathBuf;
8 : use std::sync::Arc;
9 :
10 : use anyhow::bail;
11 : use anyhow::Result;
12 : use camino::Utf8Path;
13 : use camino::Utf8PathBuf;
14 : use chrono::{DateTime, Utc};
15 : use postgres_ffi::XLogSegNo;
16 : use postgres_ffi::MAX_SEND_SIZE;
17 : use serde::Deserialize;
18 : use serde::Serialize;
19 :
20 : use sha2::{Digest, Sha256};
21 : use utils::id::NodeId;
22 : use utils::id::TenantTimelineId;
23 : use utils::id::{TenantId, TimelineId};
24 : use utils::lsn::Lsn;
25 :
26 : use crate::safekeeper::TermHistory;
27 : use crate::send_wal::WalSenderState;
28 : use crate::state::TimelineMemState;
29 : use crate::state::TimelinePersistentState;
30 : use crate::timeline::get_timeline_dir;
31 : use crate::timeline::FullAccessTimeline;
32 : use crate::GlobalTimelines;
33 : use crate::SafeKeeperConf;
34 :
35 : /// Various filters that influence the resulting JSON output.
36 0 : #[derive(Debug, Serialize, Deserialize, Clone)]
37 : pub struct Args {
38 : /// Dump all available safekeeper state. False by default.
39 : pub dump_all: bool,
40 :
41 : /// Dump control_file content. Uses value of `dump_all` by default.
42 : pub dump_control_file: bool,
43 :
44 : /// Dump in-memory state. Uses value of `dump_all` by default.
45 : pub dump_memory: bool,
46 :
47 : /// Dump all disk files in a timeline directory. Uses value of `dump_all` by default.
48 : pub dump_disk_content: bool,
49 :
50 : /// Dump full term history. True by default.
51 : pub dump_term_history: bool,
52 :
53 : /// Filter timelines by tenant_id.
54 : pub tenant_id: Option<TenantId>,
55 :
56 : /// Filter timelines by timeline_id.
57 : pub timeline_id: Option<TimelineId>,
58 : }
59 :
60 : /// Response for debug dump request.
61 : #[derive(Debug, Serialize)]
62 : pub struct Response {
63 : pub start_time: DateTime<Utc>,
64 : pub finish_time: DateTime<Utc>,
65 : pub timelines: Vec<TimelineDumpSer>,
66 : pub timelines_count: usize,
67 : pub config: Config,
68 : }
69 :
70 : pub struct TimelineDumpSer {
71 : pub tli: Arc<crate::timeline::Timeline>,
72 : pub args: Args,
73 : pub timeline_dir: Utf8PathBuf,
74 : pub runtime: Arc<tokio::runtime::Runtime>,
75 : }
76 :
77 : impl std::fmt::Debug for TimelineDumpSer {
78 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79 0 : f.debug_struct("TimelineDumpSer")
80 0 : .field("tli", &self.tli.ttid)
81 0 : .field("args", &self.args)
82 0 : .finish()
83 0 : }
84 : }
85 :
86 : impl Serialize for TimelineDumpSer {
87 0 : fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
88 0 : where
89 0 : S: serde::Serializer,
90 0 : {
91 0 : let dump = self.runtime.block_on(build_from_tli_dump(
92 0 : &self.tli,
93 0 : &self.args,
94 0 : &self.timeline_dir,
95 0 : ));
96 0 : dump.serialize(serializer)
97 0 : }
98 : }
99 :
100 0 : async fn build_from_tli_dump(
101 0 : timeline: &Arc<crate::timeline::Timeline>,
102 0 : args: &Args,
103 0 : timeline_dir: &Utf8Path,
104 0 : ) -> Timeline {
105 0 : let control_file = if args.dump_control_file {
106 0 : let mut state = timeline.get_state().await.1;
107 0 : if !args.dump_term_history {
108 0 : state.acceptor_state.term_history = TermHistory(vec![]);
109 0 : }
110 0 : Some(state)
111 : } else {
112 0 : None
113 : };
114 :
115 0 : let memory = if args.dump_memory {
116 0 : Some(timeline.memory_dump().await)
117 : } else {
118 0 : None
119 : };
120 :
121 0 : let disk_content = if args.dump_disk_content {
122 : // build_disk_content can fail, but we don't want to fail the whole
123 : // request because of that.
124 : // Note: timeline can be in offloaded state, this is not a problem.
125 0 : build_disk_content(timeline_dir).ok()
126 : } else {
127 0 : None
128 : };
129 :
130 0 : Timeline {
131 0 : tenant_id: timeline.ttid.tenant_id,
132 0 : timeline_id: timeline.ttid.timeline_id,
133 0 : control_file,
134 0 : memory,
135 0 : disk_content,
136 0 : }
137 0 : }
138 :
139 : /// Safekeeper configuration.
140 0 : #[derive(Debug, Serialize, Deserialize)]
141 : pub struct Config {
142 : pub id: NodeId,
143 : pub workdir: PathBuf,
144 : pub listen_pg_addr: String,
145 : pub listen_http_addr: String,
146 : pub no_sync: bool,
147 : pub max_offloader_lag_bytes: u64,
148 : pub wal_backup_enabled: bool,
149 : }
150 :
151 0 : #[derive(Debug, Serialize, Deserialize)]
152 : pub struct Timeline {
153 : pub tenant_id: TenantId,
154 : pub timeline_id: TimelineId,
155 : pub control_file: Option<TimelinePersistentState>,
156 : pub memory: Option<Memory>,
157 : pub disk_content: Option<DiskContent>,
158 : }
159 :
160 0 : #[derive(Debug, Serialize, Deserialize)]
161 : pub struct Memory {
162 : pub is_cancelled: bool,
163 : pub peers_info_len: usize,
164 : pub walsenders: Vec<WalSenderState>,
165 : pub wal_backup_active: bool,
166 : pub active: bool,
167 : pub num_computes: u32,
168 : pub last_removed_segno: XLogSegNo,
169 : pub epoch_start_lsn: Lsn,
170 : pub mem_state: TimelineMemState,
171 :
172 : // PhysicalStorage state.
173 : pub write_lsn: Lsn,
174 : pub write_record_lsn: Lsn,
175 : pub flush_lsn: Lsn,
176 : pub file_open: bool,
177 : }
178 :
179 0 : #[derive(Debug, Serialize, Deserialize)]
180 : pub struct DiskContent {
181 : pub files: Vec<FileInfo>,
182 : }
183 :
184 0 : #[derive(Debug, Serialize, Deserialize)]
185 : pub struct FileInfo {
186 : pub name: String,
187 : pub size: u64,
188 : pub created: DateTime<Utc>,
189 : pub modified: DateTime<Utc>,
190 : pub start_zeroes: u64,
191 : pub end_zeroes: u64,
192 : // TODO: add sha256 checksum
193 : }
194 :
195 : /// Build debug dump response, using the provided [`Args`] filters.
196 0 : pub async fn build(args: Args) -> Result<Response> {
197 0 : let start_time = Utc::now();
198 0 : let timelines_count = GlobalTimelines::timelines_count();
199 0 : let config = GlobalTimelines::get_global_config();
200 :
201 0 : let ptrs_snapshot = if args.tenant_id.is_some() && args.timeline_id.is_some() {
202 : // If both tenant_id and timeline_id are specified, we can just get the
203 : // timeline directly, without taking a snapshot of the whole list.
204 0 : let ttid = TenantTimelineId::new(args.tenant_id.unwrap(), args.timeline_id.unwrap());
205 0 : if let Ok(tli) = GlobalTimelines::get(ttid) {
206 0 : vec![tli]
207 : } else {
208 0 : vec![]
209 : }
210 : } else {
211 : // Otherwise, take a snapshot of the whole list.
212 0 : GlobalTimelines::get_all()
213 : };
214 :
215 0 : let mut timelines = Vec::new();
216 0 : let runtime = Arc::new(
217 0 : tokio::runtime::Builder::new_current_thread()
218 0 : .build()
219 0 : .unwrap(),
220 0 : );
221 0 : for tli in ptrs_snapshot {
222 0 : let ttid = tli.ttid;
223 0 : if let Some(tenant_id) = args.tenant_id {
224 0 : if tenant_id != ttid.tenant_id {
225 0 : continue;
226 0 : }
227 0 : }
228 0 : if let Some(timeline_id) = args.timeline_id {
229 0 : if timeline_id != ttid.timeline_id {
230 0 : continue;
231 0 : }
232 0 : }
233 :
234 0 : timelines.push(TimelineDumpSer {
235 0 : tli,
236 0 : args: args.clone(),
237 0 : timeline_dir: get_timeline_dir(&config, &ttid),
238 0 : runtime: runtime.clone(),
239 0 : });
240 : }
241 :
242 0 : Ok(Response {
243 0 : start_time,
244 0 : finish_time: Utc::now(),
245 0 : timelines,
246 0 : timelines_count,
247 0 : config: build_config(config),
248 0 : })
249 0 : }
250 :
251 : /// Builds DiskContent from a directory path. It can fail if the directory
252 : /// is deleted between the time we get the path and the time we try to open it.
253 0 : fn build_disk_content(path: &Utf8Path) -> Result<DiskContent> {
254 0 : let mut files = Vec::new();
255 0 : for entry in fs::read_dir(path)? {
256 0 : if entry.is_err() {
257 0 : continue;
258 0 : }
259 0 : let file = build_file_info(entry?);
260 0 : if file.is_err() {
261 0 : continue;
262 0 : }
263 0 : files.push(file?);
264 : }
265 :
266 0 : Ok(DiskContent { files })
267 0 : }
268 :
269 : /// Builds FileInfo from DirEntry. Sometimes it can return an error
270 : /// if the file is deleted between the time we get the DirEntry
271 : /// and the time we try to open it.
272 0 : fn build_file_info(entry: DirEntry) -> Result<FileInfo> {
273 0 : let metadata = entry.metadata()?;
274 0 : let path = entry.path();
275 0 : let name = path
276 0 : .file_name()
277 0 : .and_then(|x| x.to_str())
278 0 : .unwrap_or("")
279 0 : .to_owned();
280 0 : let mut file = fs::File::open(path)?;
281 0 : let mut reader = BufReader::new(&mut file).bytes().filter_map(|x| x.ok());
282 0 :
283 0 : let start_zeroes = reader.by_ref().take_while(|&x| x == 0).count() as u64;
284 0 : let mut end_zeroes = 0;
285 0 : for b in reader {
286 0 : if b == 0 {
287 0 : end_zeroes += 1;
288 0 : } else {
289 0 : end_zeroes = 0;
290 0 : }
291 : }
292 :
293 : Ok(FileInfo {
294 0 : name,
295 0 : size: metadata.len(),
296 0 : created: DateTime::from(metadata.created()?),
297 0 : modified: DateTime::from(metadata.modified()?),
298 0 : start_zeroes,
299 0 : end_zeroes,
300 : })
301 0 : }
302 :
303 : /// Converts SafeKeeperConf to Config, filtering out the fields that are not
304 : /// supposed to be exposed.
305 0 : fn build_config(config: SafeKeeperConf) -> Config {
306 0 : Config {
307 0 : id: config.my_id,
308 0 : workdir: config.workdir.into(),
309 0 : listen_pg_addr: config.listen_pg_addr,
310 0 : listen_http_addr: config.listen_http_addr,
311 0 : no_sync: config.no_sync,
312 0 : max_offloader_lag_bytes: config.max_offloader_lag_bytes,
313 0 : wal_backup_enabled: config.wal_backup_enabled,
314 0 : }
315 0 : }
316 :
317 0 : #[derive(Debug, Clone, Deserialize, Serialize)]
318 : pub struct TimelineDigestRequest {
319 : pub from_lsn: Lsn,
320 : pub until_lsn: Lsn,
321 : }
322 :
323 0 : #[derive(Debug, Serialize, Deserialize)]
324 : pub struct TimelineDigest {
325 : pub sha256: String,
326 : }
327 :
328 0 : pub async fn calculate_digest(
329 0 : tli: &FullAccessTimeline,
330 0 : request: TimelineDigestRequest,
331 0 : ) -> Result<TimelineDigest> {
332 0 : if request.from_lsn > request.until_lsn {
333 0 : bail!("from_lsn is greater than until_lsn");
334 0 : }
335 :
336 0 : let (_, persisted_state) = tli.get_state().await;
337 0 : if persisted_state.timeline_start_lsn > request.from_lsn {
338 0 : bail!("requested LSN is before the start of the timeline");
339 0 : }
340 :
341 0 : let mut wal_reader = tli.get_walreader(request.from_lsn).await?;
342 :
343 0 : let mut hasher = Sha256::new();
344 0 : let mut buf = [0u8; MAX_SEND_SIZE];
345 0 :
346 0 : let mut bytes_left = (request.until_lsn.0 - request.from_lsn.0) as usize;
347 0 : while bytes_left > 0 {
348 0 : let bytes_to_read = std::cmp::min(buf.len(), bytes_left);
349 0 : let bytes_read = wal_reader.read(&mut buf[..bytes_to_read]).await?;
350 0 : if bytes_read == 0 {
351 0 : bail!("wal_reader.read returned 0 bytes");
352 0 : }
353 0 : hasher.update(&buf[..bytes_read]);
354 0 : bytes_left -= bytes_read;
355 : }
356 :
357 0 : let digest = hasher.finalize();
358 0 : let digest = hex::encode(digest);
359 0 : Ok(TimelineDigest { sha256: digest })
360 0 : }
|