Line data Source code
1 : use std::collections::HashMap;
2 : use std::num::NonZeroUsize;
3 : use std::ops::Range;
4 : use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
5 : use std::sync::{Arc, Mutex};
6 : use std::time::Instant;
7 :
8 : use anyhow::Context;
9 : use pageserver_api::shard::TenantShardId;
10 : use pageserver_client::mgmt_api::ForceAwaitLogicalSize;
11 : use pageserver_client::page_service::BasebackupRequest;
12 : use rand::prelude::*;
13 : use tokio::sync::Barrier;
14 : use tokio::task::JoinSet;
15 : use tracing::{info, instrument};
16 : use utils::id::TenantTimelineId;
17 : use utils::lsn::Lsn;
18 :
19 : use crate::util::tokio_thread_local_stats::AllThreadLocalStats;
20 : use crate::util::{request_stats, tokio_thread_local_stats};
21 :
/// basebackup@LatestLSN
// NOTE(review): field notes below use `//` on purpose — `///` doc comments would be
// picked up by clap and change the `--help` output.
#[derive(clap::Parser)]
pub(crate) struct Args {
    // Base URL of the pageserver management API; used for target discovery and
    // for fetching per-timeline info (`last_record_lsn`).
    #[clap(long, default_value = "http://localhost:9898")]
    mgmt_api_endpoint: String,
    // Connection string used by each client task to open its page_service connection.
    #[clap(long, default_value = "postgres://postgres@localhost:64000")]
    page_service_connstring: String,
    // Optional JWT handed to the management API client.
    #[clap(long)]
    pageserver_jwt: Option<String>,
    // NOTE(review): not read anywhere in this file; one client task is spawned per
    // timeline regardless of this value.
    #[clap(long, default_value = "1")]
    num_clients: NonZeroUsize,
    // Probability that a request asks for gzip; passed to `rng.gen_bool`.
    #[clap(long, default_value = "1.0")]
    gzip_probability: f64,
    // How long to generate load; if omitted, the work sender runs indefinitely.
    #[clap(long)]
    runtime: Option<humantime::Duration>,
    // Forwarded to target discovery to cap the number of timelines benchmarked.
    #[clap(long)]
    limit_to_first_n_targets: Option<usize>,
    // Explicit timelines to benchmark; forwarded to target discovery.
    targets: Option<Vec<TenantTimelineId>>,
}
41 :
/// Live progress counter shared by all client tasks.
///
/// Clients bump it after each finished basebackup; the RPS reporter periodically
/// reads and resets it.
#[derive(Debug, Default)]
struct LiveStats {
    /// Requests completed since the counter was last reset.
    completed_requests: AtomicU64,
}

impl LiveStats {
    /// Records one more completed request.
    fn inc(&self) {
        let counter = &self.completed_requests;
        // Relaxed is enough: this is a statistics counter, not a synchronization point.
        let _previous = counter.fetch_add(1, Ordering::Relaxed);
    }
}
52 :
53 : struct Target {
54 : timeline: TenantTimelineId,
55 : lsn_range: Option<Range<Lsn>>,
56 : }
57 :
/// Final benchmark report, serialized to pretty JSON and printed on stdout.
#[derive(serde::Serialize)]
struct Output {
    /// Request statistics merged across every thread-local `STATS` instance.
    total: request_stats::Output,
}
62 :
// Thread-local request latency statistics. Client tasks record into `STATS`; the
// instances are handed to `main_impl` via `tokio_thread_local_stats::main!` and merged
// into the final `Output`.
tokio_thread_local_stats::declare!(STATS: request_stats::Stats);
64 :
/// Entry point of the basebackup benchmark.
///
/// Delegates to `main_impl` through the `tokio_thread_local_stats::main!` macro, which
/// supplies the handle to all thread-local `STATS` instances (as an
/// `AllThreadLocalStats<request_stats::Stats>`). Presumably the macro also builds the
/// async runtime; see its definition to confirm.
pub(crate) fn main(args: Args) -> anyhow::Result<()> {
    tokio_thread_local_stats::main!(STATS, move |thread_local_stats| {
        main_impl(args, thread_local_stats)
    })
}
70 :
71 0 : async fn main_impl(
72 0 : args: Args,
73 0 : all_thread_local_stats: AllThreadLocalStats<request_stats::Stats>,
74 0 : ) -> anyhow::Result<()> {
75 0 : let args: &'static Args = Box::leak(Box::new(args));
76 :
77 0 : let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
78 0 : args.mgmt_api_endpoint.clone(),
79 0 : args.pageserver_jwt.as_deref(),
80 0 : None, // TODO: support ssl_ca_file for https APIs in pagebench.
81 0 : )?);
82 :
83 : // discover targets
84 0 : let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
85 0 : &mgmt_api_client,
86 0 : crate::util::cli::targets::Spec {
87 0 : limit_to_first_n_targets: args.limit_to_first_n_targets,
88 0 : targets: args.targets.clone(),
89 0 : },
90 0 : )
91 0 : .await?;
92 0 : let mut js = JoinSet::new();
93 0 : for timeline in &timelines {
94 0 : js.spawn({
95 0 : let timeline = *timeline;
96 0 : let info = mgmt_api_client
97 0 : .timeline_info(
98 0 : TenantShardId::unsharded(timeline.tenant_id),
99 0 : timeline.timeline_id,
100 0 : ForceAwaitLogicalSize::No,
101 0 : )
102 0 : .await
103 0 : .unwrap();
104 0 : async move {
105 0 : anyhow::Ok(Target {
106 0 : timeline,
107 0 : // TODO: support lsn_range != latest LSN
108 0 : lsn_range: Some(info.last_record_lsn..(info.last_record_lsn + 1)),
109 0 : })
110 0 : }
111 0 : });
112 0 : }
113 0 : let mut all_targets: Vec<Target> = Vec::new();
114 0 : while let Some(res) = js.join_next().await {
115 0 : all_targets.push(res.unwrap().unwrap());
116 0 : }
117 :
118 0 : let live_stats = Arc::new(LiveStats::default());
119 0 :
120 0 : let num_client_tasks = timelines.len();
121 0 : let num_live_stats_dump = 1;
122 0 : let num_work_sender_tasks = 1;
123 0 :
124 0 : let start_work_barrier = Arc::new(tokio::sync::Barrier::new(
125 0 : num_client_tasks + num_live_stats_dump + num_work_sender_tasks,
126 0 : ));
127 0 : let all_work_done_barrier = Arc::new(tokio::sync::Barrier::new(num_client_tasks));
128 0 :
129 0 : tokio::spawn({
130 0 : let stats = Arc::clone(&live_stats);
131 0 : let start_work_barrier = Arc::clone(&start_work_barrier);
132 0 : async move {
133 0 : start_work_barrier.wait().await;
134 : loop {
135 0 : let start = std::time::Instant::now();
136 0 : tokio::time::sleep(std::time::Duration::from_secs(1)).await;
137 0 : let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
138 0 : let elapsed = start.elapsed();
139 0 : info!(
140 0 : "RPS: {:.0}",
141 0 : completed_requests as f64 / elapsed.as_secs_f64()
142 : );
143 : }
144 0 : }
145 0 : });
146 0 :
147 0 : let mut work_senders = HashMap::new();
148 0 : let mut tasks = Vec::new();
149 0 : for tl in &timelines {
150 0 : let (sender, receiver) = tokio::sync::mpsc::channel(1); // TODO: not sure what the implications of this are
151 0 : work_senders.insert(tl, sender);
152 0 : tasks.push(tokio::spawn(client(
153 0 : args,
154 0 : *tl,
155 0 : Arc::clone(&start_work_barrier),
156 0 : receiver,
157 0 : Arc::clone(&all_work_done_barrier),
158 0 : Arc::clone(&live_stats),
159 0 : )));
160 0 : }
161 :
162 0 : let work_sender = async move {
163 0 : start_work_barrier.wait().await;
164 : loop {
165 0 : let (timeline, work) = {
166 0 : let mut rng = rand::thread_rng();
167 0 : let target = all_targets.choose(&mut rng).unwrap();
168 0 : let lsn = target.lsn_range.clone().map(|r| rng.gen_range(r));
169 0 : (
170 0 : target.timeline,
171 0 : Work {
172 0 : lsn,
173 0 : gzip: rng.gen_bool(args.gzip_probability),
174 0 : },
175 0 : )
176 0 : };
177 0 : let sender = work_senders.get(&timeline).unwrap();
178 0 : // TODO: what if this blocks?
179 0 : sender.send(work).await.ok().unwrap();
180 : }
181 : };
182 :
183 0 : if let Some(runtime) = args.runtime {
184 0 : match tokio::time::timeout(runtime.into(), work_sender).await {
185 0 : Ok(()) => unreachable!("work sender never terminates"),
186 0 : Err(_timeout) => {
187 0 : // this implicitly drops the work_senders, making all the clients exit
188 0 : }
189 : }
190 : } else {
191 0 : work_sender.await;
192 0 : unreachable!("work sender never terminates");
193 : }
194 :
195 0 : for t in tasks {
196 0 : t.await.unwrap();
197 : }
198 :
199 0 : let output = Output {
200 : total: {
201 0 : let mut agg_stats = request_stats::Stats::new();
202 0 : for stats in all_thread_local_stats.lock().unwrap().iter() {
203 0 : let stats = stats.lock().unwrap();
204 0 : agg_stats.add(&stats);
205 0 : }
206 0 : agg_stats.output()
207 0 : },
208 0 : };
209 0 :
210 0 : let output = serde_json::to_string_pretty(&output).unwrap();
211 0 : println!("{output}");
212 0 :
213 0 : anyhow::Ok(())
214 0 : }
215 :
216 : #[derive(Copy, Clone)]
217 : struct Work {
218 : lsn: Option<Lsn>,
219 : gzip: bool,
220 : }
221 :
222 : #[instrument(skip_all)]
223 : async fn client(
224 : args: &'static Args,
225 : timeline: TenantTimelineId,
226 : start_work_barrier: Arc<Barrier>,
227 : mut work: tokio::sync::mpsc::Receiver<Work>,
228 : all_work_done_barrier: Arc<Barrier>,
229 : live_stats: Arc<LiveStats>,
230 : ) {
231 : start_work_barrier.wait().await;
232 :
233 : let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
234 : .await
235 : .unwrap();
236 :
237 : while let Some(Work { lsn, gzip }) = work.recv().await {
238 : let start = Instant::now();
239 : let copy_out_stream = client
240 : .basebackup(&BasebackupRequest {
241 : tenant_id: timeline.tenant_id,
242 : timeline_id: timeline.timeline_id,
243 : lsn,
244 : gzip,
245 : })
246 : .await
247 0 : .with_context(|| format!("start basebackup for {timeline}"))
248 : .unwrap();
249 :
250 : use futures::StreamExt;
251 : let size = Arc::new(AtomicUsize::new(0));
252 : copy_out_stream
253 : .for_each({
254 0 : |r| {
255 0 : let size = Arc::clone(&size);
256 0 : async move {
257 0 : let size = Arc::clone(&size);
258 0 : size.fetch_add(r.unwrap().len(), Ordering::Relaxed);
259 0 : }
260 0 : }
261 : })
262 : .await;
263 : info!("basebackup size is {} bytes", size.load(Ordering::Relaxed));
264 : let elapsed = start.elapsed();
265 : live_stats.inc();
266 0 : STATS.with(|stats| {
267 0 : stats.borrow().lock().unwrap().observe(elapsed).unwrap();
268 0 : });
269 : }
270 :
271 : all_work_done_barrier.wait().await;
272 : }
|