Line data Source code
1 : //! A tool for working with read traces generated by the pageserver.
2 : use std::collections::HashMap;
3 : use std::path::PathBuf;
4 : use std::str::FromStr;
5 : use std::{
6 : fs::{read_dir, File},
7 : io::BufReader,
8 : };
9 :
10 : use pageserver_api::models::{
11 : PagestreamFeMessage, PagestreamGetLatestPageRequest, PagestreamGetPageRequest,
12 : };
13 : use utils::id::{ConnectionId, TenantId, TimelineId};
14 :
15 : use clap::{Parser, Subcommand};
16 :
// Command-line arguments, parsed by clap's derive API.
//
// NOTE: inside clap-derived types the `///` comments on fields are not just
// documentation; clap turns them into `--help` text. Editing them changes program
// output. The struct-level `///` comment may likewise surface as help text,
// depending on how `about` resolves. TODO confirm against the clap derive reference.
/// Utils for working with pageserver read traces. For generating
/// traces, see the `trace_read_requests` tenant config option.
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
    // Root of the trace directory tree that `get_trace_files` walks.
    // `#[arg(short, long)]` derives both a short and a long flag from the field
    // name. Because the type is not an `Option`, clap treats it as required.
    /// Path of trace directory
    #[arg(short, long)]
    path: PathBuf,

    // Which action to perform; see `Command` for the available subcommands.
    #[command(subcommand)]
    command: Command,
}
29 :
// Subcommands of the tool, dispatched in `main`.
//
// NOTE: clap derives each subcommand's name from the variant name, and uses each
// variant's `///` comment as that subcommand's help text. Renaming a variant or
// editing its `///` comment therefore changes the CLI.
/// What to do with the read trace
#[derive(Subcommand, Debug)]
enum Command {
    // Prints the `Debug` form of every trace file found in the directory.
    /// List traces in the directory
    List,

    // Prints every parsed message of every trace file.
    /// Print the traces in text format
    Dump,

    // Runs `analyze_trace` over each trace file.
    /// Print stats and anomalies about the traces
    Analyze,

    // Not implemented yet (see `main`).
    /// Draw the traces in svg format
    Draw,

    // Not implemented yet (see `main`).
    /// Send the read requests to a pageserver
    Replay,
}
48 :
49 : // HACK This function will change and improve as we see what kind of analysis is useful.
50 : // Currently it collects the difference in blkno of consecutive GetPage requests,
51 : // and counts the frequency of each value. This information is useful in order to:
52 : // - see how sequential a workload is by seeing how often the delta is 1
53 : // - detect any prefetching anomalies by looking for negative deltas during seqscan
54 0 : fn analyze_trace<R: std::io::Read>(mut reader: R) {
55 0 : let mut total = 0; // Total requests traced
56 0 : let mut old = 0; // Old requests traced
57 0 : let mut cross_rel = 0; // Requests that ask for different rel than previous request
58 0 : let mut deltas = HashMap::<i32, u32>::new(); // Consecutive blkno differences
59 0 : let mut prev: Option<PagestreamGetPageRequest> = None;
60 0 : let mut old_prev: Option<PagestreamGetLatestPageRequest> = None;
61 :
62 : // Compute stats
63 0 : while let Ok(msg) = PagestreamFeMessage::parse(&mut reader) {
64 0 : match msg {
65 0 : PagestreamFeMessage::Exists(_) => {}
66 0 : PagestreamFeMessage::Nblocks(_) => {}
67 0 : PagestreamFeMessage::GetSlruSegment(_) => {}
68 0 : PagestreamFeMessage::GetLatestPage(req) => {
69 0 : total += 1;
70 0 : old += 1;
71 :
72 0 : if let Some(prev) = old_prev {
73 0 : if prev.rel == req.rel {
74 0 : let delta = (req.blkno as i32) - (prev.blkno as i32);
75 0 : deltas.entry(delta).and_modify(|c| *c += 1).or_insert(1);
76 0 : } else {
77 0 : cross_rel += 1;
78 0 : }
79 0 : }
80 0 : old_prev = Some(req);
81 : }
82 0 : PagestreamFeMessage::GetPage(req) => {
83 0 : total += 1;
84 :
85 0 : if let Some(prev) = prev {
86 0 : if prev.rel == req.rel {
87 0 : let delta = (req.blkno as i32) - (prev.blkno as i32);
88 0 : deltas.entry(delta).and_modify(|c| *c += 1).or_insert(1);
89 0 : } else {
90 0 : cross_rel += 1;
91 0 : }
92 0 : }
93 0 : prev = Some(req);
94 : }
95 0 : PagestreamFeMessage::DbSize(_) => {}
96 : };
97 : }
98 :
99 : // Print stats.
100 0 : let mut other = deltas.len();
101 0 : deltas.retain(|_, count| *count > 300);
102 0 : other -= deltas.len();
103 0 : dbg!(total);
104 0 : dbg!(old);
105 0 : dbg!(cross_rel);
106 0 : dbg!(other);
107 0 : dbg!(deltas);
108 0 : }
109 :
110 0 : fn dump_trace<R: std::io::Read>(mut reader: R) {
111 0 : while let Ok(msg) = PagestreamFeMessage::parse(&mut reader) {
112 0 : println!("{msg:?}");
113 0 : }
114 0 : }
115 :
/// One trace file found by `get_trace_files`, together with the ids parsed from
/// its location in the `{tenant_id}/{timeline_id}/{connection_id}` layout.
///
/// The derived `Debug` output is what the `list` and `analyze` commands print,
/// so field names and order are user-visible.
#[derive(Debug)]
struct TraceFile {
    // The id fields are only read through the derived `Debug` impl. rustc's
    // `dead_code` lint does not count derived impls as uses, hence the `allow`s.
    /// Tenant id, parsed from the top-level directory name.
    #[allow(dead_code)]
    pub tenant_id: TenantId,

    /// Timeline id, parsed from the second-level directory name.
    #[allow(dead_code)]
    pub timeline_id: TimelineId,

    /// Connection id, parsed from the trace file's own name.
    #[allow(dead_code)]
    pub connection_id: ConnectionId,

    /// Full path of the trace file; opened by the `dump` and `analyze` commands.
    pub path: PathBuf,
}
129 :
130 0 : fn get_trace_files(traces_dir: &PathBuf) -> anyhow::Result<Vec<TraceFile>> {
131 0 : let mut trace_files = Vec::<TraceFile>::new();
132 :
133 : // Trace files are organized as {tenant_id}/{timeline_id}/{connection_id}
134 0 : for tenant_dir in read_dir(traces_dir)? {
135 0 : let entry = tenant_dir?;
136 0 : let path = entry.path();
137 0 : let tenant_id = TenantId::from_str(path.file_name().unwrap().to_str().unwrap())?;
138 :
139 0 : for timeline_dir in read_dir(path)? {
140 0 : let entry = timeline_dir?;
141 0 : let path = entry.path();
142 0 : let timeline_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
143 :
144 0 : for trace_dir in read_dir(path)? {
145 0 : let entry = trace_dir?;
146 0 : let path = entry.path();
147 0 : let connection_id =
148 0 : ConnectionId::from_str(path.file_name().unwrap().to_str().unwrap())?;
149 :
150 0 : trace_files.push(TraceFile {
151 0 : tenant_id,
152 0 : timeline_id,
153 0 : connection_id,
154 0 : path,
155 0 : });
156 : }
157 : }
158 : }
159 :
160 0 : Ok(trace_files)
161 0 : }
162 :
163 0 : fn main() -> anyhow::Result<()> {
164 0 : let args = Args::parse();
165 0 :
166 0 : match args.command {
167 : Command::List => {
168 0 : for trace_file in get_trace_files(&args.path)? {
169 0 : println!("{trace_file:?}");
170 0 : }
171 : }
172 : Command::Dump => {
173 0 : for trace_file in get_trace_files(&args.path)? {
174 0 : let file = File::open(trace_file.path.clone())?;
175 0 : let reader = BufReader::new(file);
176 0 : dump_trace(reader);
177 : }
178 : }
179 : Command::Analyze => {
180 0 : for trace_file in get_trace_files(&args.path)? {
181 0 : println!("analyzing {trace_file:?}");
182 0 : let file = File::open(trace_file.path.clone())?;
183 0 : let reader = BufReader::new(file);
184 0 : analyze_trace(reader);
185 : }
186 : }
187 0 : Command::Draw => todo!(),
188 0 : Command::Replay => todo!(),
189 : }
190 :
191 0 : Ok(())
192 0 : }
|