LCOV - code coverage report
Current view: top level - pageserver/pagebench/src/cmd - idle_streams.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 0.0 % 74 0
Test Date: 2025-07-16 12:29:03 Functions: 0.0 % 7 0

            Line data    Source code
       1              : use std::sync::Arc;
       2              : 
       3              : use anyhow::anyhow;
       4              : use futures::StreamExt;
       5              : use tonic::transport::Endpoint;
       6              : use tracing::info;
       7              : 
       8              : use pageserver_page_api::{GetPageClass, GetPageRequest, GetPageStatusCode, ReadLsn, RelTag};
       9              : use utils::id::TenantTimelineId;
      10              : use utils::lsn::Lsn;
      11              : use utils::shard::ShardIndex;
      12              : 
/// Starts a large number of idle gRPC GetPage streams.
//
// Command-line arguments for this command; consumed by `main` and `main_impl` below.
// NOTE(review): the `///` comments in this struct sit under `#[derive(clap::Parser)]`, which
// uses them as CLI help text, so maintainer notes are kept in plain `//` comments.
#[derive(clap::Parser)]
pub(crate) struct Args {
    /// The Pageserver to connect to. Must use grpc://.
    // Handed to `tonic::transport::Endpoint::from_shared` in `main_impl` to build the endpoint
    // that every gRPC connection is created from.
    #[clap(long, default_value = "grpc://localhost:51051")]
    server: String,
    /// The Pageserver HTTP API.
    // Used only to build the management API client that discovers a tenant/timeline to target.
    #[clap(long, default_value = "http://localhost:9898")]
    http_server: String,
    /// The number of streams to open.
    // Loop bound in `main_impl`; also pre-sizes the vector that keeps the streams alive.
    #[clap(long, default_value = "100000")]
    count: usize,
    /// Number of streams per connection.
    // `main_impl` switches to a fresh connection whenever the stream index is a non-zero
    // multiple of this value. It is used as a modulo divisor, so it must be at least 1.
    #[clap(long, default_value = "100")]
    per_connection: usize,
    /// Send a single GetPage request on each stream.
    // When set, `main_impl` sends one request right after opening each stream, waits for the
    // response, and fails if its status is not `Ok`.
    #[clap(long, default_value_t = false)]
    send_request: bool,
}
      32              : 
      33            0 : pub(crate) fn main(args: Args) -> anyhow::Result<()> {
      34            0 :     let rt = tokio::runtime::Builder::new_multi_thread()
      35            0 :         .enable_all()
      36            0 :         .build()?;
      37              : 
      38            0 :     rt.block_on(main_impl(args))
      39            0 : }
      40              : 
      41            0 : async fn main_impl(args: Args) -> anyhow::Result<()> {
      42              :     // Discover a tenant and timeline to use.
      43            0 :     let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
      44            0 :         reqwest::Client::new(),
      45            0 :         args.http_server.clone(),
      46            0 :         None,
      47              :     ));
      48            0 :     let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
      49            0 :         &mgmt_api_client,
      50            0 :         crate::util::cli::targets::Spec {
      51            0 :             limit_to_first_n_targets: Some(1),
      52            0 :             targets: None,
      53            0 :         },
      54            0 :     )
      55            0 :     .await?;
      56            0 :     let ttid = timelines
      57            0 :         .first()
      58            0 :         .ok_or_else(|| anyhow!("no timelines found"))?;
      59              : 
      60              :     // Set up the initial client.
      61            0 :     let endpoint = Endpoint::from_shared(args.server.clone())?;
      62              : 
      63            0 :     let connect = async || {
      64            0 :         pageserver_page_api::Client::new(
      65            0 :             endpoint.connect().await?,
      66            0 :             ttid.tenant_id,
      67            0 :             ttid.timeline_id,
      68            0 :             ShardIndex::unsharded(),
      69            0 :             None,
      70            0 :             None,
      71              :         )
      72            0 :     };
      73              : 
      74            0 :     let mut client = connect().await?;
      75            0 :     let mut streams = Vec::with_capacity(args.count);
      76              : 
      77              :     // Create streams.
      78            0 :     for i in 0..args.count {
      79            0 :         if i % 100 == 0 {
      80            0 :             info!("opened {}/{} streams", i, args.count);
      81            0 :         }
      82            0 :         if i % args.per_connection == 0 && i > 0 {
      83            0 :             client = connect().await?;
      84            0 :         }
      85              : 
      86            0 :         let (req_tx, req_rx) = tokio::sync::mpsc::unbounded_channel();
      87            0 :         let req_stream = tokio_stream::wrappers::UnboundedReceiverStream::new(req_rx);
      88            0 :         let mut resp_stream = client.get_pages(req_stream).await?;
      89              : 
      90              :         // Send request if specified.
      91            0 :         if args.send_request {
      92            0 :             req_tx.send(GetPageRequest {
      93            0 :                 request_id: 1.into(),
      94            0 :                 request_class: GetPageClass::Normal,
      95            0 :                 read_lsn: ReadLsn {
      96            0 :                     request_lsn: Lsn::MAX,
      97            0 :                     not_modified_since_lsn: Some(Lsn(1)),
      98            0 :                 },
      99            0 :                 rel: RelTag {
     100            0 :                     spcnode: 1664, // pg_global
     101            0 :                     dbnode: 0,     // shared database
     102            0 :                     relnode: 1262, // pg_authid
     103            0 :                     forknum: 0,    // init
     104            0 :                 },
     105            0 :                 block_numbers: vec![0],
     106            0 :             })?;
     107            0 :             let resp = resp_stream
     108            0 :                 .next()
     109            0 :                 .await
     110            0 :                 .transpose()?
     111            0 :                 .ok_or_else(|| anyhow!("no response"))?;
     112            0 :             if resp.status_code != GetPageStatusCode::Ok {
     113            0 :                 return Err(anyhow!("{} response", resp.status_code));
     114            0 :             }
     115            0 :         }
     116              : 
     117              :         // Hold onto streams to avoid closing them.
     118            0 :         streams.push((req_tx, resp_stream));
     119              :     }
     120              : 
     121            0 :     info!("opened {} streams, sleeping", args.count);
     122              : 
     123              :     // Block forever, to hold the idle streams open for inspection.
     124            0 :     futures::future::pending::<()>().await;
     125              : 
     126            0 :     Ok(())
     127            0 : }
        

Generated by: LCOV version 2.1-beta