Line data Source code
1 : use std::io::{Read, Write, stdin, stdout};
2 : use std::time::Duration;
3 :
4 : use clap::Parser;
5 : use pageserver_api::pagestream_api::{
6 : PagestreamFeMessage, PagestreamRequest, PagestreamTestRequest,
7 : };
8 : use utils::id::{TenantId, TimelineId};
9 : use utils::lsn::Lsn;
10 :
11 : #[derive(clap::Parser)]
12 : struct Args {
13 : connstr: String,
14 : tenant_id: TenantId,
15 : timeline_id: TimelineId,
16 : }
17 :
18 : #[tokio::main]
19 0 : async fn main() -> anyhow::Result<()> {
20 : let Args {
21 0 : connstr,
22 0 : tenant_id,
23 0 : timeline_id,
24 0 : } = Args::parse();
25 0 : let client = pageserver_client::page_service::Client::new(connstr).await?;
26 0 : let client = client.pagestream(tenant_id, timeline_id).await?;
27 0 : let (mut sender, _receiver) = client.split();
28 :
29 0 : eprintln!("filling the pipe");
30 0 : let mut msg = 0;
31 : loop {
32 0 : msg += 1;
33 0 : let fut = sender.send(PagestreamFeMessage::Test(PagestreamTestRequest {
34 0 : hdr: PagestreamRequest {
35 0 : reqid: 0,
36 0 : request_lsn: Lsn(23),
37 0 : not_modified_since: Lsn(23),
38 0 : },
39 0 : batch_key: 42,
40 0 : message: format!("message {msg}"),
41 0 : }));
42 0 : let Ok(res) = tokio::time::timeout(Duration::from_secs(10), fut).await else {
43 0 : eprintln!("pipe seems full");
44 0 : break;
45 : };
46 0 : let _: () = res?;
47 : }
48 :
49 0 : let n = stdout().write(b"R")?;
50 0 : assert_eq!(n, 1);
51 0 : stdout().flush()?;
52 :
53 0 : eprintln!("waiting for signal to tell us to exit");
54 :
55 0 : let mut buf = [0u8; 1];
56 0 : stdin().read_exact(&mut buf)?;
57 :
58 0 : eprintln!("termination signal received, exiting");
59 :
60 0 : anyhow::Ok(())
61 0 : }
|