Line data Source code
1 : use anyhow::Context;
2 : use camino::Utf8PathBuf;
3 : use futures::future::join_all;
4 : use pageserver_api::key::{is_rel_block_key, key_to_rel_block, Key};
5 : use pageserver_api::keyspace::KeySpaceAccum;
6 : use pageserver_api::models::PagestreamGetPageRequest;
7 :
8 : use tokio_util::sync::CancellationToken;
9 : use utils::id::TenantTimelineId;
10 : use utils::lsn::Lsn;
11 :
12 : use rand::prelude::*;
13 : use tokio::sync::Barrier;
14 : use tokio::task::JoinSet;
15 : use tracing::{info, instrument};
16 :
17 : use std::collections::{HashMap, HashSet};
18 : use std::future::Future;
19 : use std::num::NonZeroUsize;
20 : use std::pin::Pin;
21 : use std::sync::atomic::{AtomicU64, Ordering};
22 : use std::sync::{Arc, Mutex};
23 : use std::time::{Duration, Instant};
24 :
25 : use crate::util::tokio_thread_local_stats::AllThreadLocalStats;
26 : use crate::util::{request_stats, tokio_thread_local_stats};
27 :
28 : /// GetPage@LatestLSN, uniformly distributed across the compute-accessible keyspace.
29 0 : #[derive(clap::Parser)]
30 : pub(crate) struct Args {
31 : #[clap(long, default_value = "http://localhost:9898")]
32 0 : mgmt_api_endpoint: String,
33 : #[clap(long, default_value = "postgres://postgres@localhost:64000")]
34 0 : page_service_connstring: String,
35 : #[clap(long)]
36 : pageserver_jwt: Option<String>,
37 : #[clap(long, default_value = "1")]
38 0 : num_clients: NonZeroUsize,
39 : #[clap(long)]
40 : runtime: Option<humantime::Duration>,
41 : #[clap(long)]
42 : per_target_rate_limit: Option<usize>,
43 : /// Probability for sending `latest=true` in the request (uniform distribution).
44 : #[clap(long, default_value = "1")]
45 0 : req_latest_probability: f64,
46 : #[clap(long)]
47 : limit_to_first_n_targets: Option<usize>,
48 : /// For large pageserver installations, enumerating the keyspace takes a lot of time.
49 : /// If specified, the specified path is used to maintain a cache of the keyspace enumeration result.
50 : /// The cache is tagged and auto-invalided by the tenant/timeline ids only.
51 : /// It doesn't get invalidated if the keyspace changes under the hood, e.g., due to new ingested data or compaction.
52 : #[clap(long)]
53 : keyspace_cache: Option<Utf8PathBuf>,
54 : /// Before starting the benchmark, live-reconfigure the pageserver to use the given
55 : /// [`pageserver_api::models::virtual_file::IoEngineKind`].
56 : #[clap(long)]
57 : set_io_engine: Option<pageserver_api::models::virtual_file::IoEngineKind>,
58 0 : targets: Option<Vec<TenantTimelineId>>,
59 : }
60 :
/// Counter shared between the client tasks and the periodic RPS reporter.
#[derive(Default, Debug)]
struct LiveStats {
    /// Requests completed since the reporter last reset the counter.
    completed_requests: AtomicU64,
}

impl LiveStats {
    /// Records one completed request.
    fn inc(&self) {
        // The previous value is irrelevant here; only the reporter reads the counter.
        let _previous = self.completed_requests.fetch_add(1, Ordering::Relaxed);
    }
}
71 :
72 0 : #[derive(Clone, serde::Serialize, serde::Deserialize)]
73 : struct KeyRange {
74 : timeline: TenantTimelineId,
75 : timeline_lsn: Lsn,
76 : start: i128,
77 : end: i128,
78 : }
79 :
80 : impl KeyRange {
81 0 : fn len(&self) -> i128 {
82 0 : self.end - self.start
83 0 : }
84 : }
85 :
86 0 : #[derive(PartialEq, Eq, Hash, Copy, Clone)]
87 : struct WorkerId {
88 : timeline: TenantTimelineId,
89 : num_client: usize, // from 0..args.num_clients
90 : }
91 :
92 0 : #[derive(serde::Serialize)]
93 : struct Output {
94 : total: request_stats::Output,
95 : }
96 :
97 0 : tokio_thread_local_stats::declare!(STATS: request_stats::Stats);
98 :
99 0 : pub(crate) fn main(args: Args) -> anyhow::Result<()> {
100 0 : tokio_thread_local_stats::main!(STATS, move |thread_local_stats| {
101 0 : main_impl(args, thread_local_stats)
102 0 : })
103 0 : }
104 :
105 0 : async fn main_impl(
106 0 : args: Args,
107 0 : all_thread_local_stats: AllThreadLocalStats<request_stats::Stats>,
108 0 : ) -> anyhow::Result<()> {
109 0 : let args: &'static Args = Box::leak(Box::new(args));
110 0 :
111 0 : let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
112 0 : args.mgmt_api_endpoint.clone(),
113 0 : args.pageserver_jwt.as_deref(),
114 0 : ));
115 :
116 0 : if let Some(engine_str) = &args.set_io_engine {
117 0 : mgmt_api_client.put_io_engine(engine_str).await?;
118 0 : }
119 :
120 : // discover targets
121 0 : let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
122 0 : &mgmt_api_client,
123 0 : crate::util::cli::targets::Spec {
124 0 : limit_to_first_n_targets: args.limit_to_first_n_targets,
125 0 : targets: args.targets.clone(),
126 0 : },
127 0 : )
128 0 : .await?;
129 :
130 0 : #[derive(serde::Deserialize)]
131 : struct KeyspaceCacheDe {
132 : tag: Vec<TenantTimelineId>,
133 : data: Vec<KeyRange>,
134 : }
135 0 : #[derive(serde::Serialize)]
136 : struct KeyspaceCacheSer<'a> {
137 : tag: &'a [TenantTimelineId],
138 : data: &'a [KeyRange],
139 : }
140 0 : let cache = args
141 0 : .keyspace_cache
142 0 : .as_ref()
143 0 : .map(|keyspace_cache_file| {
144 0 : let contents = match std::fs::read(keyspace_cache_file) {
145 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
146 0 : return anyhow::Ok(None);
147 : }
148 0 : x => x.context("read keyspace cache file")?,
149 : };
150 0 : let cache: KeyspaceCacheDe =
151 0 : serde_json::from_slice(&contents).context("deserialize cache file")?;
152 0 : let tag_ok = HashSet::<TenantTimelineId>::from_iter(cache.tag.into_iter())
153 0 : == HashSet::from_iter(timelines.iter().cloned());
154 0 : info!("keyspace cache file matches tag: {tag_ok}");
155 0 : anyhow::Ok(if tag_ok { Some(cache.data) } else { None })
156 0 : })
157 0 : .transpose()?
158 0 : .flatten();
159 0 : let all_ranges: Vec<KeyRange> = if let Some(cached) = cache {
160 0 : info!("using keyspace cache file");
161 0 : cached
162 : } else {
163 0 : let mut js = JoinSet::new();
164 0 : for timeline in &timelines {
165 0 : js.spawn({
166 0 : let mgmt_api_client = Arc::clone(&mgmt_api_client);
167 0 : let timeline = *timeline;
168 0 : async move {
169 0 : let partitioning = mgmt_api_client
170 0 : .keyspace(timeline.tenant_id, timeline.timeline_id)
171 0 : .await?;
172 0 : let lsn = partitioning.at_lsn;
173 0 : let start = Instant::now();
174 0 : let mut filtered = KeySpaceAccum::new();
175 : // let's hope this is inlined and vectorized...
176 : // TODO: turn this loop into a is_rel_block_range() function.
177 0 : for r in partitioning.keys.ranges.iter() {
178 0 : let mut i = r.start;
179 0 : while i != r.end {
180 0 : if is_rel_block_key(&i) {
181 0 : filtered.add_key(i);
182 0 : }
183 0 : i = i.next();
184 : }
185 : }
186 0 : let filtered = filtered.to_keyspace();
187 0 : let filter_duration = start.elapsed();
188 0 :
189 0 : anyhow::Ok((
190 0 : filter_duration,
191 0 : filtered.ranges.into_iter().map(move |r| KeyRange {
192 0 : timeline,
193 0 : timeline_lsn: lsn,
194 0 : start: r.start.to_i128(),
195 0 : end: r.end.to_i128(),
196 0 : }),
197 0 : ))
198 0 : }
199 0 : });
200 0 : }
201 0 : let mut total_filter_duration = Duration::from_secs(0);
202 0 : let mut all_ranges: Vec<KeyRange> = Vec::new();
203 0 : while let Some(res) = js.join_next().await {
204 0 : let (filter_duration, range) = res.unwrap().unwrap();
205 0 : all_ranges.extend(range);
206 0 : total_filter_duration += filter_duration;
207 0 : }
208 0 : info!("filter duration: {}", total_filter_duration.as_secs_f64());
209 0 : if let Some(cachefile) = args.keyspace_cache.as_ref() {
210 0 : let cache = KeyspaceCacheSer {
211 0 : tag: &timelines,
212 0 : data: &all_ranges,
213 0 : };
214 0 : let bytes = serde_json::to_vec(&cache).context("serialize keyspace for cache file")?;
215 0 : std::fs::write(cachefile, bytes).context("write keyspace cache file to disk")?;
216 0 : info!("successfully wrote keyspace cache file");
217 0 : }
218 0 : all_ranges
219 : };
220 :
221 0 : let live_stats = Arc::new(LiveStats::default());
222 0 :
223 0 : let num_client_tasks = args.num_clients.get() * timelines.len();
224 0 : let num_live_stats_dump = 1;
225 0 : let num_work_sender_tasks = 1;
226 0 : let num_main_impl = 1;
227 0 :
228 0 : let start_work_barrier = Arc::new(tokio::sync::Barrier::new(
229 0 : num_client_tasks + num_live_stats_dump + num_work_sender_tasks + num_main_impl,
230 0 : ));
231 0 :
232 0 : tokio::spawn({
233 0 : let stats = Arc::clone(&live_stats);
234 0 : let start_work_barrier = Arc::clone(&start_work_barrier);
235 0 : async move {
236 0 : start_work_barrier.wait().await;
237 : loop {
238 0 : let start = std::time::Instant::now();
239 0 : tokio::time::sleep(std::time::Duration::from_secs(1)).await;
240 0 : let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
241 0 : let elapsed = start.elapsed();
242 0 : info!(
243 0 : "RPS: {:.0}",
244 0 : completed_requests as f64 / elapsed.as_secs_f64()
245 0 : );
246 : }
247 0 : }
248 0 : });
249 0 :
250 0 : let cancel = CancellationToken::new();
251 0 :
252 0 : let mut work_senders: HashMap<WorkerId, _> = HashMap::new();
253 0 : let mut tasks = Vec::new();
254 0 : for timeline in timelines.iter().cloned() {
255 0 : for num_client in 0..args.num_clients.get() {
256 0 : let (sender, receiver) = tokio::sync::mpsc::channel(10); // TODO: not sure what the implications of this are
257 0 : let worker_id = WorkerId {
258 0 : timeline,
259 0 : num_client,
260 0 : };
261 0 : work_senders.insert(worker_id, sender);
262 0 : tasks.push(tokio::spawn(client(
263 0 : args,
264 0 : worker_id,
265 0 : Arc::clone(&start_work_barrier),
266 0 : receiver,
267 0 : Arc::clone(&live_stats),
268 0 : cancel.clone(),
269 0 : )));
270 0 : }
271 : }
272 :
273 0 : let work_sender: Pin<Box<dyn Send + Future<Output = ()>>> = {
274 0 : let start_work_barrier = start_work_barrier.clone();
275 0 : let cancel = cancel.clone();
276 0 : match args.per_target_rate_limit {
277 0 : None => Box::pin(async move {
278 0 : let weights = rand::distributions::weighted::WeightedIndex::new(
279 0 : all_ranges.iter().map(|v| v.len()),
280 0 : )
281 0 : .unwrap();
282 0 :
283 0 : start_work_barrier.wait().await;
284 :
285 0 : while !cancel.is_cancelled() {
286 0 : let (timeline, req) = {
287 0 : let mut rng = rand::thread_rng();
288 0 : let r = &all_ranges[weights.sample(&mut rng)];
289 0 : let key: i128 = rng.gen_range(r.start..r.end);
290 0 : let key = Key::from_i128(key);
291 0 : let (rel_tag, block_no) =
292 0 : key_to_rel_block(key).expect("we filter non-rel-block keys out above");
293 0 : (
294 0 : WorkerId {
295 0 : timeline: r.timeline,
296 0 : num_client: rng.gen_range(0..args.num_clients.get()),
297 0 : },
298 0 : PagestreamGetPageRequest {
299 0 : latest: rng.gen_bool(args.req_latest_probability),
300 0 : lsn: r.timeline_lsn,
301 0 : rel: rel_tag,
302 0 : blkno: block_no,
303 0 : },
304 0 : )
305 0 : };
306 0 : let sender = work_senders.get(&timeline).unwrap();
307 0 : // TODO: what if this blocks?
308 0 : if sender.send(req).await.is_err() {
309 0 : assert!(cancel.is_cancelled(), "client has gone away unexpectedly");
310 0 : }
311 : }
312 0 : }),
313 0 : Some(rps_limit) => Box::pin(async move {
314 0 : let period = Duration::from_secs_f64(1.0 / (rps_limit as f64));
315 0 : let make_task: &dyn Fn(WorkerId) -> Pin<Box<dyn Send + Future<Output = ()>>> =
316 0 : &|worker_id| {
317 0 : let sender = work_senders.get(&worker_id).unwrap();
318 0 : let ranges: Vec<KeyRange> = all_ranges
319 0 : .iter()
320 0 : .filter(|r| r.timeline == worker_id.timeline)
321 0 : .cloned()
322 0 : .collect();
323 0 : let weights = rand::distributions::weighted::WeightedIndex::new(
324 0 : ranges.iter().map(|v| v.len()),
325 0 : )
326 0 : .unwrap();
327 0 :
328 0 : let cancel = cancel.clone();
329 0 : Box::pin(async move {
330 0 : let mut ticker = tokio::time::interval(period);
331 0 : ticker.set_missed_tick_behavior(
332 0 : /* TODO review this choice */
333 0 : tokio::time::MissedTickBehavior::Burst,
334 0 : );
335 0 : while !cancel.is_cancelled() {
336 0 : ticker.tick().await;
337 0 : let req = {
338 0 : let mut rng = rand::thread_rng();
339 0 : let r = &ranges[weights.sample(&mut rng)];
340 0 : let key: i128 = rng.gen_range(r.start..r.end);
341 0 : let key = Key::from_i128(key);
342 0 : assert!(is_rel_block_key(&key));
343 0 : let (rel_tag, block_no) = key_to_rel_block(key)
344 0 : .expect("we filter non-rel-block keys out above");
345 0 : PagestreamGetPageRequest {
346 0 : latest: rng.gen_bool(args.req_latest_probability),
347 0 : lsn: r.timeline_lsn,
348 0 : rel: rel_tag,
349 0 : blkno: block_no,
350 0 : }
351 0 : };
352 0 : if sender.send(req).await.is_err() {
353 0 : assert!(
354 0 : cancel.is_cancelled(),
355 0 : "client has gone away unexpectedly"
356 : );
357 0 : }
358 : }
359 0 : })
360 0 : };
361 :
362 0 : let tasks: Vec<_> = work_senders.keys().map(|tl| make_task(*tl)).collect();
363 0 :
364 0 : start_work_barrier.wait().await;
365 :
366 0 : join_all(tasks).await;
367 0 : }),
368 : }
369 : };
370 :
371 0 : let work_sender_task = tokio::spawn(work_sender);
372 :
373 0 : info!("waiting for everything to become ready");
374 0 : start_work_barrier.wait().await;
375 0 : info!("work started");
376 0 : if let Some(runtime) = args.runtime {
377 0 : tokio::time::sleep(runtime.into()).await;
378 0 : info!("runtime over, signalling cancellation");
379 0 : cancel.cancel();
380 0 : work_sender_task.await.unwrap();
381 0 : info!("work sender exited");
382 : } else {
383 0 : work_sender_task.await.unwrap();
384 0 : unreachable!("work sender never terminates");
385 : }
386 :
387 0 : info!("joining clients");
388 0 : for t in tasks {
389 0 : t.await.unwrap();
390 : }
391 :
392 0 : info!("all clients stopped");
393 :
394 0 : let output = Output {
395 : total: {
396 0 : let mut agg_stats = request_stats::Stats::new();
397 0 : for stats in all_thread_local_stats.lock().unwrap().iter() {
398 0 : let stats = stats.lock().unwrap();
399 0 : agg_stats.add(&stats);
400 0 : }
401 0 : agg_stats.output()
402 0 : },
403 0 : };
404 0 :
405 0 : let output = serde_json::to_string_pretty(&output).unwrap();
406 0 : println!("{output}");
407 0 :
408 0 : anyhow::Ok(())
409 0 : }
410 :
411 0 : #[instrument(skip_all)]
412 : async fn client(
413 : args: &'static Args,
414 : id: WorkerId,
415 : start_work_barrier: Arc<Barrier>,
416 : mut work: tokio::sync::mpsc::Receiver<PagestreamGetPageRequest>,
417 : live_stats: Arc<LiveStats>,
418 : cancel: CancellationToken,
419 : ) {
420 : let WorkerId {
421 : timeline,
422 : num_client: _,
423 : } = id;
424 : let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
425 : .await
426 : .unwrap();
427 : let mut client = client
428 : .pagestream(timeline.tenant_id, timeline.timeline_id)
429 : .await
430 : .unwrap();
431 :
432 0 : let do_requests = async {
433 0 : start_work_barrier.wait().await;
434 0 : while let Some(req) = work.recv().await {
435 0 : let start = Instant::now();
436 0 : client
437 0 : .getpage(req)
438 0 : .await
439 0 : .with_context(|| format!("getpage for {timeline}"))
440 0 : .unwrap();
441 0 : let elapsed = start.elapsed();
442 0 : live_stats.inc();
443 0 : STATS.with(|stats| {
444 0 : stats.borrow().lock().unwrap().observe(elapsed).unwrap();
445 0 : });
446 : }
447 0 : };
448 0 : tokio::select! {
449 0 : res = do_requests => { res },
450 : _ = cancel.cancelled() => {
451 : // fallthrough to shutdown
452 : }
453 : }
454 : client.shutdown().await;
455 : }
|