TLA Line data Source code
1 : use std::pin::Pin;
2 :
3 : use futures::SinkExt;
4 : use pageserver_api::{
5 : models::{
6 : PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest,
7 : PagestreamGetPageResponse,
8 : },
9 : reltag::RelTag,
10 : };
11 : use tokio::task::JoinHandle;
12 : use tokio_postgres::CopyOutStream;
13 : use tokio_stream::StreamExt;
14 : use tokio_util::sync::CancellationToken;
15 : use utils::{
16 : id::{TenantId, TimelineId},
17 : lsn::Lsn,
18 : };
19 :
20 : pub struct Client {
21 : client: tokio_postgres::Client,
22 : cancel_on_client_drop: Option<tokio_util::sync::DropGuard>,
23 : conn_task: JoinHandle<()>,
24 : }
25 :
26 : pub struct BasebackupRequest {
27 : pub tenant_id: TenantId,
28 : pub timeline_id: TimelineId,
29 : pub lsn: Option<Lsn>,
30 : pub gzip: bool,
31 : }
32 :
33 : impl Client {
34 UBC 0 : pub async fn new(connstring: String) -> anyhow::Result<Self> {
35 0 : let (client, connection) = tokio_postgres::connect(&connstring, postgres::NoTls).await?;
36 :
37 0 : let conn_task_cancel = CancellationToken::new();
38 0 : let conn_task = tokio::spawn({
39 0 : let conn_task_cancel = conn_task_cancel.clone();
40 0 : async move {
41 0 : tokio::select! {
42 : _ = conn_task_cancel.cancelled() => { }
43 0 : res = connection => {
44 : res.unwrap();
45 : }
46 : }
47 0 : }
48 0 : });
49 0 : Ok(Self {
50 0 : cancel_on_client_drop: Some(conn_task_cancel.drop_guard()),
51 0 : conn_task,
52 0 : client,
53 0 : })
54 0 : }
55 :
56 0 : pub async fn pagestream(
57 0 : self,
58 0 : tenant_id: TenantId,
59 0 : timeline_id: TimelineId,
60 0 : ) -> anyhow::Result<PagestreamClient> {
61 0 : let copy_both: tokio_postgres::CopyBothDuplex<bytes::Bytes> = self
62 0 : .client
63 0 : .copy_both_simple(&format!("pagestream {tenant_id} {timeline_id}"))
64 0 : .await?;
65 : let Client {
66 0 : cancel_on_client_drop,
67 0 : conn_task,
68 0 : client: _,
69 0 : } = self;
70 0 : Ok(PagestreamClient {
71 0 : copy_both: Box::pin(copy_both),
72 0 : conn_task,
73 0 : cancel_on_client_drop,
74 0 : })
75 0 : }
76 :
77 0 : pub async fn basebackup(&self, req: &BasebackupRequest) -> anyhow::Result<CopyOutStream> {
78 0 : let BasebackupRequest {
79 0 : tenant_id,
80 0 : timeline_id,
81 0 : lsn,
82 0 : gzip,
83 0 : } = req;
84 0 : let mut args = Vec::with_capacity(5);
85 0 : args.push("basebackup".to_string());
86 0 : args.push(format!("{tenant_id}"));
87 0 : args.push(format!("{timeline_id}"));
88 0 : if let Some(lsn) = lsn {
89 0 : args.push(format!("{lsn}"));
90 0 : }
91 0 : if *gzip {
92 0 : args.push("--gzip".to_string())
93 0 : }
94 0 : Ok(self.client.copy_out(&args.join(" ")).await?)
95 0 : }
96 : }
97 :
98 : /// Create using [`Client::pagestream`].
99 : pub struct PagestreamClient {
100 : copy_both: Pin<Box<tokio_postgres::CopyBothDuplex<bytes::Bytes>>>,
101 : cancel_on_client_drop: Option<tokio_util::sync::DropGuard>,
102 : conn_task: JoinHandle<()>,
103 : }
104 :
105 : pub struct RelTagBlockNo {
106 : pub rel_tag: RelTag,
107 : pub block_no: u32,
108 : }
109 :
110 : impl PagestreamClient {
111 0 : pub async fn shutdown(mut self) {
112 0 : let _ = self.cancel_on_client_drop.take();
113 0 : self.conn_task.await.unwrap();
114 0 : }
115 :
116 0 : pub async fn getpage(
117 0 : &mut self,
118 0 : req: PagestreamGetPageRequest,
119 0 : ) -> anyhow::Result<PagestreamGetPageResponse> {
120 0 : let req = PagestreamFeMessage::GetPage(req);
121 0 : let req: bytes::Bytes = req.serialize();
122 0 : // let mut req = tokio_util::io::ReaderStream::new(&req);
123 0 : let mut req = tokio_stream::once(Ok(req));
124 0 :
125 0 : self.copy_both.send_all(&mut req).await?;
126 :
127 0 : let next: Option<Result<bytes::Bytes, _>> = self.copy_both.next().await;
128 0 : let next: bytes::Bytes = next.unwrap()?;
129 :
130 0 : let msg = PagestreamBeMessage::deserialize(next)?;
131 0 : match msg {
132 0 : PagestreamBeMessage::GetPage(p) => Ok(p),
133 0 : PagestreamBeMessage::Error(e) => anyhow::bail!("Error: {:?}", e),
134 : PagestreamBeMessage::Exists(_)
135 : | PagestreamBeMessage::Nblocks(_)
136 : | PagestreamBeMessage::DbSize(_) => {
137 0 : anyhow::bail!(
138 0 : "unexpected be message kind in response to getpage request: {}",
139 0 : msg.kind()
140 0 : )
141 : }
142 : }
143 0 : }
144 : }
|