|             Line data    Source code 
       1              : use std::collections::HashMap;
       2              : use std::sync::Arc;
       3              : use std::time::Instant;
       4              : 
       5              : use pageserver_api::models::{TenantConfig, TenantConfigRequest};
       6              : use pageserver_api::shard::TenantShardId;
       7              : use utils::id::TenantTimelineId;
       8              : use utils::lsn::Lsn;
       9              : 
      10              : /// Ingest aux files into the pageserver.
      11              : #[derive(clap::Parser)]
      12              : pub(crate) struct Args {
      13              :     #[clap(long, default_value = "http://localhost:9898")]
      14              :     mgmt_api_endpoint: String,
      15              :     #[clap(long, default_value = "postgres://postgres@localhost:64000")]
      16              :     page_service_connstring: String,
      17              :     #[clap(long)]
      18              :     pageserver_jwt: Option<String>,
      19              : 
      20              :     targets: Option<Vec<TenantTimelineId>>,
      21              : }
      22              : 
      23            0 : pub(crate) fn main(args: Args) -> anyhow::Result<()> {
      24            0 :     let rt = tokio::runtime::Builder::new_multi_thread()
      25            0 :         .enable_all()
      26            0 :         .build()
      27            0 :         .unwrap();
      28              : 
      29            0 :     let main_task = rt.spawn(main_impl(args));
      30            0 :     rt.block_on(main_task).unwrap()
      31            0 : }
      32              : 
      33            0 : async fn main_impl(args: Args) -> anyhow::Result<()> {
      34            0 :     let args: &'static Args = Box::leak(Box::new(args));
      35              : 
      36            0 :     let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
      37            0 :         reqwest::Client::new(), // TODO: support ssl_ca_file for https APIs in pagebench.
      38            0 :         args.mgmt_api_endpoint.clone(),
      39            0 :         args.pageserver_jwt.as_deref(),
      40              :     ));
      41              : 
      42              :     // discover targets
      43            0 :     let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
      44            0 :         &mgmt_api_client,
      45              :         crate::util::cli::targets::Spec {
      46            0 :             limit_to_first_n_targets: None,
      47              :             targets: {
      48            0 :                 if let Some(targets) = &args.targets {
      49            0 :                     if targets.len() != 1 {
      50            0 :                         anyhow::bail!("must specify exactly one target");
      51            0 :                     }
      52            0 :                     Some(targets.clone())
      53              :                 } else {
      54            0 :                     None
      55              :                 }
      56              :             },
      57              :         },
      58              :     )
      59            0 :     .await?;
      60              : 
      61            0 :     let timeline = timelines[0];
      62            0 :     let tenant_shard_id = TenantShardId::unsharded(timeline.tenant_id);
      63            0 :     let timeline_id = timeline.timeline_id;
      64              : 
      65            0 :     println!("operating on timeline {timeline}");
      66              : 
      67            0 :     mgmt_api_client
      68            0 :         .set_tenant_config(&TenantConfigRequest {
      69            0 :             tenant_id: timeline.tenant_id,
      70            0 :             config: TenantConfig::default(),
      71            0 :         })
      72            0 :         .await?;
      73              : 
      74            0 :     for batch in 0..100 {
      75            0 :         let items = (0..100)
      76            0 :             .map(|id| {
      77            0 :                 (
      78            0 :                     format!("pg_logical/mappings/{batch:03}.{id:03}"),
      79            0 :                     format!("{id:08}"),
      80            0 :                 )
      81            0 :             })
      82            0 :             .collect::<HashMap<_, _>>();
      83            0 :         let file_cnt = items.len();
      84            0 :         mgmt_api_client
      85            0 :             .ingest_aux_files(tenant_shard_id, timeline_id, items)
      86            0 :             .await?;
      87            0 :         println!("ingested {file_cnt} files");
      88              :     }
      89              : 
      90            0 :     for _ in 0..100 {
      91            0 :         let start = Instant::now();
      92            0 :         let files = mgmt_api_client
      93            0 :             .list_aux_files(tenant_shard_id, timeline_id, Lsn(Lsn::MAX.0 - 1))
      94            0 :             .await?;
      95            0 :         println!(
      96            0 :             "{} files found in {}s",
      97            0 :             files.len(),
      98            0 :             start.elapsed().as_secs_f64()
      99              :         );
     100              :     }
     101              : 
     102            0 :     anyhow::Ok(())
     103            0 : }
         |