Line data Source code
1 : use pageserver_api::{models::HistoricLayerInfo, shard::TenantShardId};
2 :
3 : use pageserver_client::mgmt_api;
4 : use rand::seq::SliceRandom;
5 : use tracing::{debug, info};
6 : use utils::id::{TenantTimelineId, TimelineId};
7 :
8 : use tokio::{
9 : sync::{mpsc, OwnedSemaphorePermit},
10 : task::JoinSet,
11 : };
12 :
13 : use std::{
14 : num::NonZeroUsize,
15 : sync::{
16 : atomic::{AtomicU64, Ordering},
17 : Arc,
18 : },
19 : time::{Duration, Instant},
20 : };
21 :
22 : /// Evict & on-demand download random layers.
23 0 : #[derive(clap::Parser)]
24 : pub(crate) struct Args {
25 : #[clap(long, default_value = "http://localhost:9898")]
26 0 : mgmt_api_endpoint: String,
27 : #[clap(long)]
28 : pageserver_jwt: Option<String>,
29 : #[clap(long)]
30 : runtime: Option<humantime::Duration>,
31 : #[clap(long, default_value = "1")]
32 0 : tasks_per_target: NonZeroUsize,
33 : #[clap(long, default_value = "1")]
34 0 : concurrency_per_target: NonZeroUsize,
35 : /// Probability for sending `latest=true` in the request (uniform distribution).
36 : #[clap(long)]
37 : limit_to_first_n_targets: Option<usize>,
38 : /// Before starting the benchmark, live-reconfigure the pageserver to use the given
39 : /// [`pageserver_api::models::virtual_file::IoEngineKind`].
40 : #[clap(long)]
41 : set_io_engine: Option<pageserver_api::models::virtual_file::IoEngineKind>,
42 0 : targets: Option<Vec<TenantTimelineId>>,
43 : }
44 :
45 0 : pub(crate) fn main(args: Args) -> anyhow::Result<()> {
46 0 : let rt = tokio::runtime::Builder::new_multi_thread()
47 0 : .enable_all()
48 0 : .build()?;
49 0 : let task = rt.spawn(main_impl(args));
50 0 : rt.block_on(task).unwrap().unwrap();
51 0 : Ok(())
52 0 : }
53 :
/// Event counters shared between the worker tasks and whoever reports them.
///
/// All counters are plain atomics so they can be updated through a shared
/// reference from any task.
#[derive(Debug, Default)]
struct LiveStats {
    /// Number of completed eviction requests.
    evictions: AtomicU64,
    /// Number of completed on-demand download requests.
    downloads: AtomicU64,
    /// Number of times a timeline's layer map was (re-)loaded.
    timeline_restarts: AtomicU64,
}

impl LiveStats {
    /// Adds one to `counter` with relaxed ordering.
    fn bump(counter: &AtomicU64) {
        counter.fetch_add(1, Ordering::Relaxed);
    }

    /// Records one finished eviction request.
    fn eviction_done(&self) {
        Self::bump(&self.evictions);
    }

    /// Records one finished on-demand download request.
    fn download_done(&self) {
        Self::bump(&self.downloads);
    }

    /// Records one timeline restart.
    fn timeline_restart_done(&self) {
        Self::bump(&self.timeline_restarts);
    }
}
72 :
73 0 : async fn main_impl(args: Args) -> anyhow::Result<()> {
74 0 : let args: &'static Args = Box::leak(Box::new(args));
75 0 :
76 0 : let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
77 0 : args.mgmt_api_endpoint.clone(),
78 0 : args.pageserver_jwt.as_deref(),
79 0 : ));
80 :
81 0 : if let Some(engine_str) = &args.set_io_engine {
82 0 : mgmt_api_client.put_io_engine(engine_str).await?;
83 0 : }
84 :
85 : // discover targets
86 0 : let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
87 0 : &mgmt_api_client,
88 0 : crate::util::cli::targets::Spec {
89 0 : limit_to_first_n_targets: args.limit_to_first_n_targets,
90 0 : targets: args.targets.clone(),
91 0 : },
92 0 : )
93 0 : .await?;
94 :
95 0 : let mut tasks = JoinSet::new();
96 0 :
97 0 : let live_stats = Arc::new(LiveStats::default());
98 0 : tasks.spawn({
99 0 : let live_stats = Arc::clone(&live_stats);
100 0 : async move {
101 0 : let mut last_at = Instant::now();
102 : loop {
103 0 : tokio::time::sleep_until((last_at + Duration::from_secs(1)).into()).await;
104 0 : let now = Instant::now();
105 0 : let delta: Duration = now - last_at;
106 0 : last_at = now;
107 0 :
108 0 : let LiveStats {
109 0 : evictions,
110 0 : downloads,
111 0 : timeline_restarts,
112 0 : } = &*live_stats;
113 0 : let evictions = evictions.swap(0, Ordering::Relaxed) as f64 / delta.as_secs_f64();
114 0 : let downloads = downloads.swap(0, Ordering::Relaxed) as f64 / delta.as_secs_f64();
115 0 : let timeline_restarts = timeline_restarts.swap(0, Ordering::Relaxed);
116 0 : info!("evictions={evictions:.2}/s downloads={downloads:.2}/s timeline_restarts={timeline_restarts}");
117 : }
118 0 : }
119 0 : });
120 :
121 0 : for tl in timelines {
122 0 : for _ in 0..args.tasks_per_target.get() {
123 0 : tasks.spawn(timeline_actor(
124 0 : args,
125 0 : Arc::clone(&mgmt_api_client),
126 0 : tl,
127 0 : Arc::clone(&live_stats),
128 0 : ));
129 0 : }
130 : }
131 :
132 0 : while let Some(res) = tasks.join_next().await {
133 0 : res.unwrap();
134 0 : }
135 0 : Ok(())
136 0 : }
137 :
138 0 : async fn timeline_actor(
139 0 : args: &'static Args,
140 0 : mgmt_api_client: Arc<pageserver_client::mgmt_api::Client>,
141 0 : timeline: TenantTimelineId,
142 0 : live_stats: Arc<LiveStats>,
143 0 : ) {
144 0 : // TODO: support sharding
145 0 : let tenant_shard_id = TenantShardId::unsharded(timeline.tenant_id);
146 :
147 : struct Timeline {
148 : joinset: JoinSet<()>,
149 : layers: Vec<mpsc::Sender<OwnedSemaphorePermit>>,
150 : concurrency: Arc<tokio::sync::Semaphore>,
151 : }
152 0 : loop {
153 0 : debug!("restarting timeline");
154 0 : let layer_map_info = mgmt_api_client
155 0 : .layer_map_info(tenant_shard_id, timeline.timeline_id)
156 0 : .await
157 0 : .unwrap();
158 0 : let concurrency = Arc::new(tokio::sync::Semaphore::new(
159 0 : args.concurrency_per_target.get(),
160 0 : ));
161 0 :
162 0 : let mut joinset = JoinSet::new();
163 0 : let layers = layer_map_info
164 0 : .historic_layers
165 0 : .into_iter()
166 0 : .map(|historic_layer| {
167 0 : let (tx, rx) = mpsc::channel(1);
168 0 : joinset.spawn(layer_actor(
169 0 : tenant_shard_id,
170 0 : timeline.timeline_id,
171 0 : historic_layer,
172 0 : rx,
173 0 : Arc::clone(&mgmt_api_client),
174 0 : Arc::clone(&live_stats),
175 0 : ));
176 0 : tx
177 0 : })
178 0 : .collect::<Vec<_>>();
179 0 :
180 0 : let mut timeline = Timeline {
181 0 : joinset,
182 0 : layers,
183 0 : concurrency,
184 0 : };
185 0 :
186 0 : live_stats.timeline_restart_done();
187 :
188 0 : loop {
189 0 : assert!(!timeline.joinset.is_empty());
190 0 : if let Some(res) = timeline.joinset.try_join_next() {
191 0 : debug!(?res, "a layer actor exited, should not happen");
192 0 : timeline.joinset.shutdown().await;
193 0 : break;
194 0 : }
195 :
196 0 : let mut permit = Some(
197 0 : Arc::clone(&timeline.concurrency)
198 0 : .acquire_owned()
199 0 : .await
200 0 : .unwrap(),
201 : );
202 :
203 0 : loop {
204 0 : let layer_tx = {
205 0 : let mut rng = rand::thread_rng();
206 0 : timeline.layers.choose_mut(&mut rng).expect("no layers")
207 0 : };
208 0 : match layer_tx.try_send(permit.take().unwrap()) {
209 0 : Ok(_) => break,
210 0 : Err(e) => match e {
211 0 : mpsc::error::TrySendError::Full(back) => {
212 0 : // TODO: retrying introduces bias away from slow downloaders
213 0 : permit.replace(back);
214 0 : }
215 0 : mpsc::error::TrySendError::Closed(_) => panic!(),
216 : },
217 : }
218 : }
219 : }
220 : }
221 : }
222 :
223 0 : async fn layer_actor(
224 0 : tenant_shard_id: TenantShardId,
225 0 : timeline_id: TimelineId,
226 0 : mut layer: HistoricLayerInfo,
227 0 : mut rx: mpsc::Receiver<tokio::sync::OwnedSemaphorePermit>,
228 0 : mgmt_api_client: Arc<mgmt_api::Client>,
229 0 : live_stats: Arc<LiveStats>,
230 0 : ) {
231 : #[derive(Clone, Copy)]
232 : enum Action {
233 : Evict,
234 : OnDemandDownload,
235 : }
236 :
237 0 : while let Some(_permit) = rx.recv().await {
238 0 : let action = if layer.is_remote() {
239 0 : Action::OnDemandDownload
240 : } else {
241 0 : Action::Evict
242 : };
243 :
244 0 : let did_it = match action {
245 : Action::Evict => {
246 0 : let did_it = mgmt_api_client
247 0 : .layer_evict(tenant_shard_id, timeline_id, layer.layer_file_name())
248 0 : .await
249 0 : .unwrap();
250 0 : live_stats.eviction_done();
251 0 : did_it
252 : }
253 : Action::OnDemandDownload => {
254 0 : let did_it = mgmt_api_client
255 0 : .layer_ondemand_download(tenant_shard_id, timeline_id, layer.layer_file_name())
256 0 : .await
257 0 : .unwrap();
258 0 : live_stats.download_done();
259 0 : did_it
260 : }
261 : };
262 0 : if !did_it {
263 0 : debug!("local copy of layer map appears out of sync, re-downloading");
264 0 : return;
265 0 : }
266 0 : debug!("did it");
267 0 : layer.set_remote(match action {
268 0 : Action::Evict => true,
269 0 : Action::OnDemandDownload => false,
270 : });
271 : }
272 0 : }
|