TLA Line data Source code
1 : //! Utils for dumping full state of the safekeeper.
2 :
use std::fs;
use std::fs::DirEntry;
use std::io::BufRead;
use std::io::BufReader;
use std::io::Read;
use std::path::PathBuf;

use anyhow::Result;
use camino::Utf8Path;
use chrono::{DateTime, Utc};
use postgres_ffi::XLogSegNo;
use serde::Deserialize;
use serde::Serialize;

use serde_with::{serde_as, DisplayFromStr};
use utils::id::NodeId;
use utils::id::TenantTimelineId;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;

use crate::safekeeper::SafeKeeperState;
use crate::safekeeper::SafekeeperMemState;
use crate::safekeeper::TermHistory;
use crate::SafeKeeperConf;

use crate::send_wal::WalSenderState;
use crate::GlobalTimelines;
29 :
/// Various filters that influence the resulting JSON output.
///
/// No `#[serde(default)]` is declared on this type, so the defaults described on
/// the fields below (including the `dump_all` fallback) are not applied by `Args`
/// itself, and `build` never reads `dump_all`.
/// NOTE(review): presumably the caller that constructs `Args` resolves these
/// defaults — confirm.
#[derive(Debug, Serialize, Deserialize)]
pub struct Args {
    /// Dump all available safekeeper state. False by default.
    pub dump_all: bool,

    /// Dump control_file content. Uses value of `dump_all` by default.
    pub dump_control_file: bool,

    /// Dump in-memory state. Uses value of `dump_all` by default.
    pub dump_memory: bool,

    /// Dump all disk files in a timeline directory. Uses value of `dump_all` by default.
    pub dump_disk_content: bool,

    /// Dump full term history. True by default.
    ///
    /// When false, the term history inside the dumped control file is replaced
    /// with an empty one; it only matters when `dump_control_file` is set.
    pub dump_term_history: bool,

    /// Filter timelines by tenant_id. `None` matches every tenant.
    pub tenant_id: Option<TenantId>,

    /// Filter timelines by timeline_id. `None` matches every timeline.
    pub timeline_id: Option<TimelineId>,
}
54 :
/// Response for debug dump request.
#[derive(Debug, Serialize, Deserialize)]
pub struct Response {
    /// Time at which collection of the dump started.
    pub start_time: DateTime<Utc>,
    /// Time at which collection of the dump finished.
    pub finish_time: DateTime<Utc>,
    /// Timelines that passed the [`Args`] filters.
    pub timelines: Vec<Timeline>,
    /// Value of `GlobalTimelines::timelines_count()` sampled when the dump
    /// started; it is not affected by the [`Args`] filters.
    pub timelines_count: usize,
    /// Subset of the safekeeper configuration that is exposed in the dump.
    pub config: Config,
}
64 :
/// Safekeeper configuration.
///
/// Only the subset of `SafeKeeperConf` that is meant to be exposed in a debug
/// dump; each field is copied from the configuration field of the same name
/// (`id` comes from `my_id`).
#[derive(Debug, Serialize, Deserialize)]
pub struct Config {
    pub id: NodeId,
    pub workdir: PathBuf,
    pub listen_pg_addr: String,
    pub listen_http_addr: String,
    pub no_sync: bool,
    pub max_offloader_lag_bytes: u64,
    pub wal_backup_enabled: bool,
}
76 :
/// Dump of a single timeline.
///
/// Each optional section is `None` when it was not requested via [`Args`];
/// `disk_content` is also `None` when listing the timeline directory failed.
#[serde_as]
#[derive(Debug, Serialize, Deserialize)]
pub struct Timeline {
    // Ids are (de)serialized through their `Display` / `FromStr` impls.
    #[serde_as(as = "DisplayFromStr")]
    pub tenant_id: TenantId,
    #[serde_as(as = "DisplayFromStr")]
    pub timeline_id: TimelineId,
    /// Persisted safekeeper state; its term history may be emptied when
    /// `Args::dump_term_history` is false.
    pub control_file: Option<SafeKeeperState>,
    /// In-memory state of the timeline.
    pub memory: Option<Memory>,
    /// Files found in the timeline directory.
    pub disk_content: Option<DiskContent>,
}
88 :
/// In-memory state of a timeline, as produced by the timeline's `memory_dump()`.
///
/// NOTE(review): the exact meaning of each field is defined by the code that
/// fills this struct, which is not part of this file.
#[derive(Debug, Serialize, Deserialize)]
pub struct Memory {
    pub is_cancelled: bool,
    pub peers_info_len: usize,
    pub walsenders: Vec<WalSenderState>,
    pub wal_backup_active: bool,
    pub active: bool,
    pub num_computes: u32,
    pub last_removed_segno: XLogSegNo,
    pub epoch_start_lsn: Lsn,
    pub mem_state: SafekeeperMemState,

    // PhysicalStorage state.
    pub write_lsn: Lsn,
    pub write_record_lsn: Lsn,
    pub flush_lsn: Lsn,
    pub file_open: bool,
}
107 :
/// Listing of the files in a timeline directory.
#[derive(Debug, Serialize, Deserialize)]
pub struct DiskContent {
    /// One entry per directory entry that could be inspected; entries that
    /// failed to be read are omitted rather than failing the whole listing.
    pub files: Vec<FileInfo>,
}
112 :
/// Information about a single file in a timeline directory.
#[derive(Debug, Serialize, Deserialize)]
pub struct FileInfo {
    /// File name, without its directory.
    pub name: String,
    /// File size in bytes, as reported by the file's metadata.
    pub size: u64,
    /// Creation time, as reported by the file's metadata.
    pub created: DateTime<Utc>,
    /// Last modification time, as reported by the file's metadata.
    pub modified: DateTime<Utc>,
    /// Number of zero bytes at the start of the file. When the file contains no
    /// non-zero byte, this is the number of bytes read.
    pub start_zeroes: u64,
    /// Number of zero bytes after the last non-zero byte. It is 0 when the file
    /// contains no non-zero byte (those bytes are counted in `start_zeroes`).
    pub end_zeroes: u64,
    // TODO: add sha256 checksum
}
123 :
124 : /// Build debug dump response, using the provided [`Args`] filters.
125 5 : pub async fn build(args: Args) -> Result<Response> {
126 5 : let start_time = Utc::now();
127 5 : let timelines_count = GlobalTimelines::timelines_count();
128 :
129 5 : let ptrs_snapshot = if args.tenant_id.is_some() && args.timeline_id.is_some() {
130 : // If both tenant_id and timeline_id are specified, we can just get the
131 : // timeline directly, without taking a snapshot of the whole list.
132 1 : let ttid = TenantTimelineId::new(args.tenant_id.unwrap(), args.timeline_id.unwrap());
133 1 : if let Ok(tli) = GlobalTimelines::get(ttid) {
134 1 : vec![tli]
135 : } else {
136 UBC 0 : vec![]
137 : }
138 : } else {
139 : // Otherwise, take a snapshot of the whole list.
140 CBC 4 : GlobalTimelines::get_all()
141 : };
142 :
143 : // TODO: return Stream instead of Vec
144 5 : let mut timelines = Vec::new();
145 10 : for tli in ptrs_snapshot {
146 5 : let ttid = tli.ttid;
147 5 : if let Some(tenant_id) = args.tenant_id {
148 1 : if tenant_id != ttid.tenant_id {
149 UBC 0 : continue;
150 CBC 1 : }
151 4 : }
152 5 : if let Some(timeline_id) = args.timeline_id {
153 1 : if timeline_id != ttid.timeline_id {
154 UBC 0 : continue;
155 CBC 1 : }
156 4 : }
157 :
158 5 : let control_file = if args.dump_control_file {
159 5 : let mut state = tli.get_state().await.1;
160 5 : if !args.dump_term_history {
161 UBC 0 : state.acceptor_state.term_history = TermHistory(vec![]);
162 CBC 5 : }
163 5 : Some(state)
164 : } else {
165 UBC 0 : None
166 : };
167 :
168 CBC 5 : let memory = if args.dump_memory {
169 5 : Some(tli.memory_dump().await)
170 : } else {
171 UBC 0 : None
172 : };
173 :
174 CBC 5 : let disk_content = if args.dump_disk_content {
175 : // build_disk_content can fail, but we don't want to fail the whole
176 : // request because of that.
177 5 : build_disk_content(&tli.timeline_dir).ok()
178 : } else {
179 UBC 0 : None
180 : };
181 :
182 CBC 5 : let timeline = Timeline {
183 5 : tenant_id: ttid.tenant_id,
184 5 : timeline_id: ttid.timeline_id,
185 5 : control_file,
186 5 : memory,
187 5 : disk_content,
188 5 : };
189 5 : timelines.push(timeline);
190 : }
191 :
192 5 : let config = GlobalTimelines::get_global_config();
193 5 :
194 5 : Ok(Response {
195 5 : start_time,
196 5 : finish_time: Utc::now(),
197 5 : timelines,
198 5 : timelines_count,
199 5 : config: build_config(config),
200 5 : })
201 5 : }
202 :
203 : /// Builds DiskContent from a directory path. It can fail if the directory
204 : /// is deleted between the time we get the path and the time we try to open it.
205 5 : fn build_disk_content(path: &Utf8Path) -> Result<DiskContent> {
206 5 : let mut files = Vec::new();
207 11 : for entry in fs::read_dir(path)? {
208 11 : if entry.is_err() {
209 UBC 0 : continue;
210 CBC 11 : }
211 11 : let file = build_file_info(entry?);
212 11 : if file.is_err() {
213 UBC 0 : continue;
214 CBC 11 : }
215 11 : files.push(file?);
216 : }
217 :
218 5 : Ok(DiskContent { files })
219 5 : }
220 :
221 : /// Builds FileInfo from DirEntry. Sometimes it can return an error
222 : /// if the file is deleted between the time we get the DirEntry
223 : /// and the time we try to open it.
224 11 : fn build_file_info(entry: DirEntry) -> Result<FileInfo> {
225 11 : let metadata = entry.metadata()?;
226 11 : let path = entry.path();
227 11 : let name = path
228 11 : .file_name()
229 11 : .and_then(|x| x.to_str())
230 11 : .unwrap_or("")
231 11 : .to_owned();
232 11 : let mut file = fs::File::open(path)?;
233 100664508 : let mut reader = BufReader::new(&mut file).bytes().filter_map(|x| x.ok());
234 11 :
235 34530451 : let start_zeroes = reader.by_ref().take_while(|&x| x == 0).count() as u64;
236 11 : let mut end_zeroes = 0;
237 66134068 : for b in reader {
238 66134057 : if b == 0 {
239 58076784 : end_zeroes += 1;
240 58076784 : } else {
241 8057273 : end_zeroes = 0;
242 8057273 : }
243 : }
244 :
245 : Ok(FileInfo {
246 11 : name,
247 11 : size: metadata.len(),
248 11 : created: DateTime::from(metadata.created()?),
249 11 : modified: DateTime::from(metadata.modified()?),
250 11 : start_zeroes,
251 11 : end_zeroes,
252 : })
253 11 : }
254 :
255 : /// Converts SafeKeeperConf to Config, filtering out the fields that are not
256 : /// supposed to be exposed.
257 5 : fn build_config(config: SafeKeeperConf) -> Config {
258 5 : Config {
259 5 : id: config.my_id,
260 5 : workdir: config.workdir.into(),
261 5 : listen_pg_addr: config.listen_pg_addr,
262 5 : listen_http_addr: config.listen_http_addr,
263 5 : no_sync: config.no_sync,
264 5 : max_offloader_lag_bytes: config.max_offloader_lag_bytes,
265 5 : wal_backup_enabled: config.wal_backup_enabled,
266 5 : }
267 5 : }
|