Line data Source code
1 : use std::sync::Arc;
2 :
3 : use anyhow::anyhow;
4 : use futures::StreamExt;
5 : use tonic::transport::Endpoint;
6 : use tracing::info;
7 :
8 : use pageserver_page_api::{GetPageClass, GetPageRequest, GetPageStatusCode, ReadLsn, RelTag};
9 : use utils::id::TenantTimelineId;
10 : use utils::lsn::Lsn;
11 : use utils::shard::ShardIndex;
12 :
13 : /// Starts a large number of idle gRPC GetPage streams.
14 : #[derive(clap::Parser)]
15 : pub(crate) struct Args {
16 : /// The Pageserver to connect to. Must use grpc://.
17 : #[clap(long, default_value = "grpc://localhost:51051")]
18 : server: String,
19 : /// The Pageserver HTTP API.
20 : #[clap(long, default_value = "http://localhost:9898")]
21 : http_server: String,
22 : /// The number of streams to open.
23 : #[clap(long, default_value = "100000")]
24 : count: usize,
25 : /// Number of streams per connection.
26 : #[clap(long, default_value = "100")]
27 : per_connection: usize,
28 : /// Send a single GetPage request on each stream.
29 : #[clap(long, default_value_t = false)]
30 : send_request: bool,
31 : }
32 :
33 0 : pub(crate) fn main(args: Args) -> anyhow::Result<()> {
34 0 : let rt = tokio::runtime::Builder::new_multi_thread()
35 0 : .enable_all()
36 0 : .build()?;
37 :
38 0 : rt.block_on(main_impl(args))
39 0 : }
40 :
41 0 : async fn main_impl(args: Args) -> anyhow::Result<()> {
42 : // Discover a tenant and timeline to use.
43 0 : let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
44 0 : reqwest::Client::new(),
45 0 : args.http_server.clone(),
46 0 : None,
47 : ));
48 0 : let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
49 0 : &mgmt_api_client,
50 0 : crate::util::cli::targets::Spec {
51 0 : limit_to_first_n_targets: Some(1),
52 0 : targets: None,
53 0 : },
54 0 : )
55 0 : .await?;
56 0 : let ttid = timelines
57 0 : .first()
58 0 : .ok_or_else(|| anyhow!("no timelines found"))?;
59 :
60 : // Set up the initial client.
61 0 : let endpoint = Endpoint::from_shared(args.server.clone())?;
62 :
63 0 : let connect = async || {
64 0 : pageserver_page_api::Client::new(
65 0 : endpoint.connect().await?,
66 0 : ttid.tenant_id,
67 0 : ttid.timeline_id,
68 0 : ShardIndex::unsharded(),
69 0 : None,
70 0 : None,
71 : )
72 0 : };
73 :
74 0 : let mut client = connect().await?;
75 0 : let mut streams = Vec::with_capacity(args.count);
76 :
77 : // Create streams.
78 0 : for i in 0..args.count {
79 0 : if i % 100 == 0 {
80 0 : info!("opened {}/{} streams", i, args.count);
81 0 : }
82 0 : if i % args.per_connection == 0 && i > 0 {
83 0 : client = connect().await?;
84 0 : }
85 :
86 0 : let (req_tx, req_rx) = tokio::sync::mpsc::unbounded_channel();
87 0 : let req_stream = tokio_stream::wrappers::UnboundedReceiverStream::new(req_rx);
88 0 : let mut resp_stream = client.get_pages(req_stream).await?;
89 :
90 : // Send request if specified.
91 0 : if args.send_request {
92 0 : req_tx.send(GetPageRequest {
93 0 : request_id: 1.into(),
94 0 : request_class: GetPageClass::Normal,
95 0 : read_lsn: ReadLsn {
96 0 : request_lsn: Lsn::MAX,
97 0 : not_modified_since_lsn: Some(Lsn(1)),
98 0 : },
99 0 : rel: RelTag {
100 0 : spcnode: 1664, // pg_global
101 0 : dbnode: 0, // shared database
102 0 : relnode: 1262, // pg_authid
103 0 : forknum: 0, // init
104 0 : },
105 0 : block_numbers: vec![0],
106 0 : })?;
107 0 : let resp = resp_stream
108 0 : .next()
109 0 : .await
110 0 : .transpose()?
111 0 : .ok_or_else(|| anyhow!("no response"))?;
112 0 : if resp.status_code != GetPageStatusCode::Ok {
113 0 : return Err(anyhow!("{} response", resp.status_code));
114 0 : }
115 0 : }
116 :
117 : // Hold onto streams to avoid closing them.
118 0 : streams.push((req_tx, resp_stream));
119 : }
120 :
121 0 : info!("opened {} streams, sleeping", args.count);
122 :
123 : // Block forever, to hold the idle streams open for inspection.
124 0 : futures::future::pending::<()>().await;
125 :
126 0 : Ok(())
127 0 : }
|