Line data Source code
1 : use std::collections::HashMap;
2 : use std::num::NonZeroUsize;
3 : use std::ops::Range;
4 : use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
5 : use std::sync::{Arc, Mutex};
6 : use std::time::Instant;
7 :
8 : use anyhow::Context;
9 : use pageserver_api::shard::TenantShardId;
10 : use pageserver_client::mgmt_api::ForceAwaitLogicalSize;
11 : use pageserver_client::page_service::BasebackupRequest;
12 : use rand::prelude::*;
13 : use tokio::sync::Barrier;
14 : use tokio::task::JoinSet;
15 : use tracing::{info, instrument};
16 : use utils::id::TenantTimelineId;
17 : use utils::lsn::Lsn;
18 :
19 : use crate::util::tokio_thread_local_stats::AllThreadLocalStats;
20 : use crate::util::{request_stats, tokio_thread_local_stats};
21 :
22 : /// basebackup@LatestLSN
23 : #[derive(clap::Parser)]
24 : pub(crate) struct Args {
25 : #[clap(long, default_value = "http://localhost:9898")]
26 0 : mgmt_api_endpoint: String,
27 : #[clap(long, default_value = "postgres://postgres@localhost:64000")]
28 0 : page_service_connstring: String,
29 : #[clap(long)]
30 : pageserver_jwt: Option<String>,
31 : #[clap(long, default_value = "1")]
32 0 : num_clients: NonZeroUsize,
33 : #[clap(long, default_value = "1.0")]
34 0 : gzip_probability: f64,
35 : #[clap(long)]
36 : runtime: Option<humantime::Duration>,
37 : #[clap(long)]
38 : limit_to_first_n_targets: Option<usize>,
39 0 : targets: Option<Vec<TenantTimelineId>>,
40 : }
41 :
/// Counter shared between the client tasks and the periodic RPS reporter.
#[derive(Debug, Default)]
struct LiveStats {
    /// Requests completed since the reporter last reset the counter.
    completed_requests: AtomicU64,
}

impl LiveStats {
    /// Records one completed request.
    fn inc(&self) {
        // The previous value is of no interest here; only the increment matters.
        let _previous = self.completed_requests.fetch_add(1, Ordering::Relaxed);
    }
}
52 :
53 : struct Target {
54 : timeline: TenantTimelineId,
55 : lsn_range: Option<Range<Lsn>>,
56 : }
57 :
58 : #[derive(serde::Serialize)]
59 : struct Output {
60 : total: request_stats::Output,
61 : }
62 :
63 : tokio_thread_local_stats::declare!(STATS: request_stats::Stats);
64 :
65 0 : pub(crate) fn main(args: Args) -> anyhow::Result<()> {
66 0 : tokio_thread_local_stats::main!(STATS, move |thread_local_stats| {
67 0 : main_impl(args, thread_local_stats)
68 0 : })
69 0 : }
70 :
71 0 : async fn main_impl(
72 0 : args: Args,
73 0 : all_thread_local_stats: AllThreadLocalStats<request_stats::Stats>,
74 0 : ) -> anyhow::Result<()> {
75 0 : let args: &'static Args = Box::leak(Box::new(args));
76 0 :
77 0 : let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
78 0 : args.mgmt_api_endpoint.clone(),
79 0 : args.pageserver_jwt.as_deref(),
80 0 : ));
81 :
82 : // discover targets
83 0 : let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
84 0 : &mgmt_api_client,
85 0 : crate::util::cli::targets::Spec {
86 0 : limit_to_first_n_targets: args.limit_to_first_n_targets,
87 0 : targets: args.targets.clone(),
88 0 : },
89 0 : )
90 0 : .await?;
91 0 : let mut js = JoinSet::new();
92 0 : for timeline in &timelines {
93 0 : js.spawn({
94 0 : let timeline = *timeline;
95 0 : let info = mgmt_api_client
96 0 : .timeline_info(
97 0 : TenantShardId::unsharded(timeline.tenant_id),
98 0 : timeline.timeline_id,
99 0 : ForceAwaitLogicalSize::No,
100 0 : )
101 0 : .await
102 0 : .unwrap();
103 0 : async move {
104 0 : anyhow::Ok(Target {
105 0 : timeline,
106 0 : // TODO: support lsn_range != latest LSN
107 0 : lsn_range: Some(info.last_record_lsn..(info.last_record_lsn + 1)),
108 0 : })
109 0 : }
110 0 : });
111 0 : }
112 0 : let mut all_targets: Vec<Target> = Vec::new();
113 0 : while let Some(res) = js.join_next().await {
114 0 : all_targets.push(res.unwrap().unwrap());
115 0 : }
116 :
117 0 : let live_stats = Arc::new(LiveStats::default());
118 0 :
119 0 : let num_client_tasks = timelines.len();
120 0 : let num_live_stats_dump = 1;
121 0 : let num_work_sender_tasks = 1;
122 0 :
123 0 : let start_work_barrier = Arc::new(tokio::sync::Barrier::new(
124 0 : num_client_tasks + num_live_stats_dump + num_work_sender_tasks,
125 0 : ));
126 0 : let all_work_done_barrier = Arc::new(tokio::sync::Barrier::new(num_client_tasks));
127 0 :
128 0 : tokio::spawn({
129 0 : let stats = Arc::clone(&live_stats);
130 0 : let start_work_barrier = Arc::clone(&start_work_barrier);
131 0 : async move {
132 0 : start_work_barrier.wait().await;
133 : loop {
134 0 : let start = std::time::Instant::now();
135 0 : tokio::time::sleep(std::time::Duration::from_secs(1)).await;
136 0 : let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
137 0 : let elapsed = start.elapsed();
138 0 : info!(
139 0 : "RPS: {:.0}",
140 0 : completed_requests as f64 / elapsed.as_secs_f64()
141 : );
142 : }
143 0 : }
144 0 : });
145 0 :
146 0 : let mut work_senders = HashMap::new();
147 0 : let mut tasks = Vec::new();
148 0 : for tl in &timelines {
149 0 : let (sender, receiver) = tokio::sync::mpsc::channel(1); // TODO: not sure what the implications of this are
150 0 : work_senders.insert(tl, sender);
151 0 : tasks.push(tokio::spawn(client(
152 0 : args,
153 0 : *tl,
154 0 : Arc::clone(&start_work_barrier),
155 0 : receiver,
156 0 : Arc::clone(&all_work_done_barrier),
157 0 : Arc::clone(&live_stats),
158 0 : )));
159 0 : }
160 :
161 0 : let work_sender = async move {
162 0 : start_work_barrier.wait().await;
163 : loop {
164 0 : let (timeline, work) = {
165 0 : let mut rng = rand::thread_rng();
166 0 : let target = all_targets.choose(&mut rng).unwrap();
167 0 : let lsn = target.lsn_range.clone().map(|r| rng.gen_range(r));
168 0 : (
169 0 : target.timeline,
170 0 : Work {
171 0 : lsn,
172 0 : gzip: rng.gen_bool(args.gzip_probability),
173 0 : },
174 0 : )
175 0 : };
176 0 : let sender = work_senders.get(&timeline).unwrap();
177 0 : // TODO: what if this blocks?
178 0 : sender.send(work).await.ok().unwrap();
179 : }
180 : };
181 :
182 0 : if let Some(runtime) = args.runtime {
183 0 : match tokio::time::timeout(runtime.into(), work_sender).await {
184 0 : Ok(()) => unreachable!("work sender never terminates"),
185 0 : Err(_timeout) => {
186 0 : // this implicitly drops the work_senders, making all the clients exit
187 0 : }
188 : }
189 : } else {
190 0 : work_sender.await;
191 0 : unreachable!("work sender never terminates");
192 : }
193 :
194 0 : for t in tasks {
195 0 : t.await.unwrap();
196 : }
197 :
198 0 : let output = Output {
199 : total: {
200 0 : let mut agg_stats = request_stats::Stats::new();
201 0 : for stats in all_thread_local_stats.lock().unwrap().iter() {
202 0 : let stats = stats.lock().unwrap();
203 0 : agg_stats.add(&stats);
204 0 : }
205 0 : agg_stats.output()
206 0 : },
207 0 : };
208 0 :
209 0 : let output = serde_json::to_string_pretty(&output).unwrap();
210 0 : println!("{output}");
211 0 :
212 0 : anyhow::Ok(())
213 0 : }
214 :
215 : #[derive(Copy, Clone)]
216 : struct Work {
217 : lsn: Option<Lsn>,
218 : gzip: bool,
219 : }
220 :
221 : #[instrument(skip_all)]
222 : async fn client(
223 : args: &'static Args,
224 : timeline: TenantTimelineId,
225 : start_work_barrier: Arc<Barrier>,
226 : mut work: tokio::sync::mpsc::Receiver<Work>,
227 : all_work_done_barrier: Arc<Barrier>,
228 : live_stats: Arc<LiveStats>,
229 : ) {
230 : start_work_barrier.wait().await;
231 :
232 : let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
233 : .await
234 : .unwrap();
235 :
236 : while let Some(Work { lsn, gzip }) = work.recv().await {
237 : let start = Instant::now();
238 : let copy_out_stream = client
239 : .basebackup(&BasebackupRequest {
240 : tenant_id: timeline.tenant_id,
241 : timeline_id: timeline.timeline_id,
242 : lsn,
243 : gzip,
244 : })
245 : .await
246 0 : .with_context(|| format!("start basebackup for {timeline}"))
247 : .unwrap();
248 :
249 : use futures::StreamExt;
250 : let size = Arc::new(AtomicUsize::new(0));
251 : copy_out_stream
252 : .for_each({
253 0 : |r| {
254 0 : let size = Arc::clone(&size);
255 0 : async move {
256 0 : let size = Arc::clone(&size);
257 0 : size.fetch_add(r.unwrap().len(), Ordering::Relaxed);
258 0 : }
259 0 : }
260 : })
261 : .await;
262 : info!("basebackup size is {} bytes", size.load(Ordering::Relaxed));
263 : let elapsed = start.elapsed();
264 : live_stats.inc();
265 0 : STATS.with(|stats| {
266 0 : stats.borrow().lock().unwrap().observe(elapsed).unwrap();
267 0 : });
268 : }
269 :
270 : all_work_done_barrier.wait().await;
271 : }
|