TLA Line data Source code
1 : use anyhow::Context;
2 : use futures::future::join_all;
3 : use pageserver::pgdatadir_mapping::key_to_rel_block;
4 : use pageserver::repository;
5 : use pageserver_api::key::is_rel_block_key;
6 : use pageserver_api::models::PagestreamGetPageRequest;
7 :
8 : use utils::id::TenantTimelineId;
9 : use utils::lsn::Lsn;
10 :
11 : use rand::prelude::*;
12 : use tokio::sync::Barrier;
13 : use tokio::task::JoinSet;
14 : use tracing::{info, instrument};
15 :
16 : use std::collections::HashMap;
17 : use std::future::Future;
18 : use std::num::NonZeroUsize;
19 : use std::pin::Pin;
20 : use std::sync::atomic::{AtomicU64, Ordering};
21 : use std::sync::{Arc, Mutex};
22 : use std::time::{Duration, Instant};
23 :
24 : use crate::util::tokio_thread_local_stats::AllThreadLocalStats;
25 : use crate::util::{request_stats, tokio_thread_local_stats};
26 :
/// GetPage@LatestLSN, uniformly distributed across the compute-accessible keyspace.
// The field comments below deliberately use plain `//`: `///` doc comments would become
// part of clap's generated `--help` text.
#[derive(clap::Parser)]
pub(crate) struct Args {
    // Base URL of the pageserver management API; used for target discovery and for fetching
    // each target timeline's keyspace.
    #[clap(long, default_value = "http://localhost:9898")]
    mgmt_api_endpoint: String,
    // Connection string of the page_service endpoint that the benchmark clients connect to.
    #[clap(long, default_value = "postgres://postgres@localhost:64000")]
    page_service_connstring: String,
    // Optional JWT handed to the management API client.
    // NOTE(review): it is not passed to the page_service client in this file — confirm
    // whether that connection needs authentication too.
    #[clap(long)]
    pageserver_jwt: Option<String>,
    // Number of page_service client connections (at least one).
    // NOTE(review): confirm in `main_impl` how this is applied per target timeline.
    #[clap(long, default_value = "1")]
    num_clients: NonZeroUsize,
    // How long to keep sending requests; when unset, requests are sent until the process
    // is stopped.
    #[clap(long)]
    runtime: Option<humantime::Duration>,
    // When set, requests to each target timeline are paced at this many per second; when
    // unset, requests are sent as fast as the clients accept them.
    #[clap(long)]
    per_target_rate_limit: Option<usize>,
    /// Probability for sending `latest=true` in the request (uniform distribution).
    #[clap(long, default_value = "1")]
    req_latest_probability: f64,
    // Forwarded to target discovery; presumably caps how many discovered targets are used
    // — TODO confirm in `targets::discover`.
    #[clap(long)]
    limit_to_first_n_targets: Option<usize>,
    // Positional list of explicit target timelines, forwarded to target discovery.
    // Presumably all timelines are discovered via the management API when none are given
    // — TODO confirm in `targets::discover`.
    targets: Option<Vec<TenantTimelineId>>,
}
49 :
/// Number of requests completed since the live-stats reporter last reset the counter.
#[derive(Default, Debug)]
struct LiveStats {
    completed_requests: AtomicU64,
}

impl LiveStats {
    /// Records one completed request.
    fn inc(&self) {
        // Relaxed is enough: the value is only read back for the periodic RPS report and
        // does not order any other memory accesses.
        let _previous = self.completed_requests.fetch_add(1, Ordering::Relaxed);
    }
}
60 :
61 0 : #[derive(Clone)]
62 : struct KeyRange {
63 : timeline: TenantTimelineId,
64 : timeline_lsn: Lsn,
65 : start: i128,
66 : end: i128,
67 : }
68 :
69 : impl KeyRange {
70 0 : fn len(&self) -> i128 {
71 0 : self.end - self.start
72 0 : }
73 : }
74 :
/// Final benchmark report; `main_impl` serializes it as pretty JSON to stdout.
#[derive(serde::Serialize)]
struct Output {
    /// Request statistics aggregated over every thread-local `STATS` instance.
    total: request_stats::Output,
}
79 :
// Thread-local request statistics (declared via the `tokio_thread_local_stats` helper):
// `client` records every getpage latency here, and `main_impl` aggregates the instances of
// all threads into the final `Output`.
tokio_thread_local_stats::declare!(STATS: request_stats::Stats);
81 :
/// Entry point of this benchmark: runs [`main_impl`] through
/// `tokio_thread_local_stats::main!`, which supplies the handle to every thread's `STATS`
/// instance. NOTE(review): presumably the macro also builds and drives the tokio runtime —
/// confirm in `crate::util::tokio_thread_local_stats`.
pub(crate) fn main(args: Args) -> anyhow::Result<()> {
    tokio_thread_local_stats::main!(STATS, move |thread_local_stats| {
        main_impl(args, thread_local_stats)
    })
}
87 :
88 0 : async fn main_impl(
89 0 : args: Args,
90 0 : all_thread_local_stats: AllThreadLocalStats<request_stats::Stats>,
91 0 : ) -> anyhow::Result<()> {
92 0 : let args: &'static Args = Box::leak(Box::new(args));
93 0 :
94 0 : let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
95 0 : args.mgmt_api_endpoint.clone(),
96 0 : args.pageserver_jwt.as_deref(),
97 0 : ));
98 :
99 : // discover targets
100 0 : let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
101 0 : &mgmt_api_client,
102 0 : crate::util::cli::targets::Spec {
103 0 : limit_to_first_n_targets: args.limit_to_first_n_targets,
104 0 : targets: args.targets.clone(),
105 0 : },
106 0 : )
107 0 : .await?;
108 :
109 0 : let mut js = JoinSet::new();
110 0 : for timeline in &timelines {
111 0 : js.spawn({
112 0 : let mgmt_api_client = Arc::clone(&mgmt_api_client);
113 0 : let timeline = *timeline;
114 0 : async move {
115 0 : let partitioning = mgmt_api_client
116 0 : .keyspace(timeline.tenant_id, timeline.timeline_id)
117 0 : .await?;
118 0 : let lsn = partitioning.at_lsn;
119 0 :
120 0 : let ranges = partitioning
121 0 : .keys
122 0 : .ranges
123 0 : .iter()
124 0 : .filter_map(|r| {
125 0 : let start = r.start;
126 0 : let end = r.end;
127 0 : // filter out non-relblock keys
128 0 : match (is_rel_block_key(&start), is_rel_block_key(&end)) {
129 0 : (true, true) => Some(KeyRange {
130 0 : timeline,
131 0 : timeline_lsn: lsn,
132 0 : start: start.to_i128(),
133 0 : end: end.to_i128(),
134 0 : }),
135 : (true, false) | (false, true) => {
136 0 : unimplemented!("split up range")
137 : }
138 0 : (false, false) => None,
139 : }
140 0 : })
141 0 : .collect::<Vec<_>>();
142 0 :
143 0 : anyhow::Ok(ranges)
144 0 : }
145 0 : });
146 0 : }
147 0 : let mut all_ranges: Vec<KeyRange> = Vec::new();
148 0 : while let Some(res) = js.join_next().await {
149 0 : all_ranges.extend(res.unwrap().unwrap());
150 0 : }
151 :
152 0 : let live_stats = Arc::new(LiveStats::default());
153 0 :
154 0 : let num_client_tasks = timelines.len();
155 0 : let num_live_stats_dump = 1;
156 0 : let num_work_sender_tasks = 1;
157 0 :
158 0 : let start_work_barrier = Arc::new(tokio::sync::Barrier::new(
159 0 : num_client_tasks + num_live_stats_dump + num_work_sender_tasks,
160 0 : ));
161 0 : let all_work_done_barrier = Arc::new(tokio::sync::Barrier::new(num_client_tasks));
162 0 :
163 0 : tokio::spawn({
164 0 : let stats = Arc::clone(&live_stats);
165 0 : let start_work_barrier = Arc::clone(&start_work_barrier);
166 0 : async move {
167 0 : start_work_barrier.wait().await;
168 : loop {
169 0 : let start = std::time::Instant::now();
170 0 : tokio::time::sleep(std::time::Duration::from_secs(1)).await;
171 0 : let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
172 0 : let elapsed = start.elapsed();
173 0 : info!(
174 0 : "RPS: {:.0}",
175 0 : completed_requests as f64 / elapsed.as_secs_f64()
176 0 : );
177 : }
178 0 : }
179 0 : });
180 0 :
181 0 : let mut work_senders = HashMap::new();
182 0 : let mut tasks = Vec::new();
183 0 : for tl in &timelines {
184 0 : let (sender, receiver) = tokio::sync::mpsc::channel(10); // TODO: not sure what the implications of this are
185 0 : work_senders.insert(tl, sender);
186 0 : tasks.push(tokio::spawn(client(
187 0 : args,
188 0 : *tl,
189 0 : Arc::clone(&start_work_barrier),
190 0 : receiver,
191 0 : Arc::clone(&all_work_done_barrier),
192 0 : Arc::clone(&live_stats),
193 0 : )));
194 0 : }
195 :
196 0 : let work_sender: Pin<Box<dyn Send + Future<Output = ()>>> = match args.per_target_rate_limit {
197 0 : None => Box::pin(async move {
198 0 : let weights = rand::distributions::weighted::WeightedIndex::new(
199 0 : all_ranges.iter().map(|v| v.len()),
200 0 : )
201 0 : .unwrap();
202 0 :
203 0 : start_work_barrier.wait().await;
204 :
205 : loop {
206 0 : let (timeline, req) = {
207 0 : let mut rng = rand::thread_rng();
208 0 : let r = &all_ranges[weights.sample(&mut rng)];
209 0 : let key: i128 = rng.gen_range(r.start..r.end);
210 0 : let key = repository::Key::from_i128(key);
211 0 : let (rel_tag, block_no) =
212 0 : key_to_rel_block(key).expect("we filter non-rel-block keys out above");
213 0 : (
214 0 : r.timeline,
215 0 : PagestreamGetPageRequest {
216 0 : latest: rng.gen_bool(args.req_latest_probability),
217 0 : lsn: r.timeline_lsn,
218 0 : rel: rel_tag,
219 0 : blkno: block_no,
220 0 : },
221 0 : )
222 0 : };
223 0 : let sender = work_senders.get(&timeline).unwrap();
224 0 : // TODO: what if this blocks?
225 0 : sender.send(req).await.ok().unwrap();
226 : }
227 0 : }),
228 0 : Some(rps_limit) => Box::pin(async move {
229 0 : let period = Duration::from_secs_f64(1.0 / (rps_limit as f64));
230 0 :
231 0 : let make_timeline_task: &dyn Fn(
232 0 : TenantTimelineId,
233 0 : )
234 0 : -> Pin<Box<dyn Send + Future<Output = ()>>> = &|timeline| {
235 0 : let sender = work_senders.get(&timeline).unwrap();
236 0 : let ranges: Vec<KeyRange> = all_ranges
237 0 : .iter()
238 0 : .filter(|r| r.timeline == timeline)
239 0 : .cloned()
240 0 : .collect();
241 0 : let weights = rand::distributions::weighted::WeightedIndex::new(
242 0 : ranges.iter().map(|v| v.len()),
243 0 : )
244 0 : .unwrap();
245 0 :
246 0 : Box::pin(async move {
247 0 : let mut ticker = tokio::time::interval(period);
248 0 : ticker.set_missed_tick_behavior(
249 0 : /* TODO review this choice */
250 0 : tokio::time::MissedTickBehavior::Burst,
251 0 : );
252 : loop {
253 0 : ticker.tick().await;
254 0 : let req = {
255 0 : let mut rng = rand::thread_rng();
256 0 : let r = &ranges[weights.sample(&mut rng)];
257 0 : let key: i128 = rng.gen_range(r.start..r.end);
258 0 : let key = repository::Key::from_i128(key);
259 0 : let (rel_tag, block_no) = key_to_rel_block(key)
260 0 : .expect("we filter non-rel-block keys out above");
261 0 : PagestreamGetPageRequest {
262 0 : latest: rng.gen_bool(args.req_latest_probability),
263 0 : lsn: r.timeline_lsn,
264 0 : rel: rel_tag,
265 0 : blkno: block_no,
266 0 : }
267 0 : };
268 0 : sender.send(req).await.ok().unwrap();
269 : }
270 0 : })
271 0 : };
272 :
273 0 : let tasks: Vec<_> = work_senders
274 0 : .keys()
275 0 : .map(|tl| make_timeline_task(**tl))
276 0 : .collect();
277 0 :
278 0 : start_work_barrier.wait().await;
279 :
280 0 : join_all(tasks).await;
281 0 : }),
282 : };
283 :
284 0 : if let Some(runtime) = args.runtime {
285 0 : match tokio::time::timeout(runtime.into(), work_sender).await {
286 0 : Ok(()) => unreachable!("work sender never terminates"),
287 0 : Err(_timeout) => {
288 0 : // this implicitly drops the work_senders, making all the clients exit
289 0 : }
290 : }
291 : } else {
292 0 : work_sender.await;
293 0 : unreachable!("work sender never terminates");
294 : }
295 :
296 0 : for t in tasks {
297 0 : t.await.unwrap();
298 : }
299 :
300 0 : let output = Output {
301 : total: {
302 0 : let mut agg_stats = request_stats::Stats::new();
303 0 : for stats in all_thread_local_stats.lock().unwrap().iter() {
304 0 : let stats = stats.lock().unwrap();
305 0 : agg_stats.add(&stats);
306 0 : }
307 0 : agg_stats.output()
308 0 : },
309 0 : };
310 0 :
311 0 : let output = serde_json::to_string_pretty(&output).unwrap();
312 0 : println!("{output}");
313 0 :
314 0 : anyhow::Ok(())
315 0 : }
316 :
317 0 : #[instrument(skip_all)]
318 : async fn client(
319 : args: &'static Args,
320 : timeline: TenantTimelineId,
321 : start_work_barrier: Arc<Barrier>,
322 : mut work: tokio::sync::mpsc::Receiver<PagestreamGetPageRequest>,
323 : all_work_done_barrier: Arc<Barrier>,
324 : live_stats: Arc<LiveStats>,
325 : ) {
326 : start_work_barrier.wait().await;
327 :
328 : let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
329 : .await
330 : .unwrap();
331 : let mut client = client
332 : .pagestream(timeline.tenant_id, timeline.timeline_id)
333 : .await
334 : .unwrap();
335 :
336 : while let Some(req) = work.recv().await {
337 : let start = Instant::now();
338 : client
339 : .getpage(req)
340 : .await
341 0 : .with_context(|| format!("getpage for {timeline}"))
342 : .unwrap();
343 : let elapsed = start.elapsed();
344 : live_stats.inc();
345 0 : STATS.with(|stats| {
346 0 : stats.borrow().lock().unwrap().observe(elapsed).unwrap();
347 0 : });
348 : }
349 :
350 : all_work_done_barrier.wait().await;
351 : }
|