Line data Source code
1 : use std::pin::Pin;
2 :
3 : use futures::SinkExt;
4 : use pageserver_api::{
5 : models::{
6 : PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest,
7 : PagestreamGetPageResponse,
8 : },
9 : reltag::RelTag,
10 : };
11 : use tokio::task::JoinHandle;
12 : use tokio_postgres::CopyOutStream;
13 : use tokio_stream::StreamExt;
14 : use tokio_util::sync::CancellationToken;
15 : use utils::{
16 : id::{TenantId, TimelineId},
17 : lsn::Lsn,
18 : };
19 :
20 : pub struct Client {
21 : client: tokio_postgres::Client,
22 : cancel_on_client_drop: Option<tokio_util::sync::DropGuard>,
23 : conn_task: JoinHandle<()>,
24 : }
25 :
26 : pub struct BasebackupRequest {
27 : pub tenant_id: TenantId,
28 : pub timeline_id: TimelineId,
29 : pub lsn: Option<Lsn>,
30 : pub gzip: bool,
31 : }
32 :
33 : impl Client {
34 0 : pub async fn new(connstring: String) -> anyhow::Result<Self> {
35 0 : let (client, connection) = tokio_postgres::connect(&connstring, postgres::NoTls).await?;
36 :
37 0 : let conn_task_cancel = CancellationToken::new();
38 0 : let conn_task = tokio::spawn({
39 0 : let conn_task_cancel = conn_task_cancel.clone();
40 0 : async move {
41 0 : tokio::select! {
42 0 : _ = conn_task_cancel.cancelled() => { }
43 0 : res = connection => {
44 0 : res.unwrap();
45 0 : }
46 : }
47 0 : }
48 0 : });
49 0 : Ok(Self {
50 0 : cancel_on_client_drop: Some(conn_task_cancel.drop_guard()),
51 0 : conn_task,
52 0 : client,
53 0 : })
54 0 : }
55 :
56 0 : pub async fn pagestream(
57 0 : self,
58 0 : tenant_id: TenantId,
59 0 : timeline_id: TimelineId,
60 0 : ) -> anyhow::Result<PagestreamClient> {
61 0 : let copy_both: tokio_postgres::CopyBothDuplex<bytes::Bytes> = self
62 0 : .client
63 0 : .copy_both_simple(&format!("pagestream_v2 {tenant_id} {timeline_id}"))
64 0 : .await?;
65 : let Client {
66 0 : cancel_on_client_drop,
67 0 : conn_task,
68 0 : client: _,
69 0 : } = self;
70 0 : Ok(PagestreamClient {
71 0 : copy_both: Box::pin(copy_both),
72 0 : conn_task,
73 0 : cancel_on_client_drop,
74 0 : })
75 0 : }
76 :
77 0 : pub async fn basebackup(&self, req: &BasebackupRequest) -> anyhow::Result<CopyOutStream> {
78 0 : let BasebackupRequest {
79 0 : tenant_id,
80 0 : timeline_id,
81 0 : lsn,
82 0 : gzip,
83 0 : } = req;
84 0 : let mut args = Vec::with_capacity(5);
85 0 : args.push("basebackup".to_string());
86 0 : args.push(format!("{tenant_id}"));
87 0 : args.push(format!("{timeline_id}"));
88 0 : if let Some(lsn) = lsn {
89 0 : args.push(format!("{lsn}"));
90 0 : }
91 0 : if *gzip {
92 0 : args.push("--gzip".to_string())
93 0 : }
94 0 : Ok(self.client.copy_out(&args.join(" ")).await?)
95 0 : }
96 : }
97 :
98 : /// Create using [`Client::pagestream`].
99 : pub struct PagestreamClient {
100 : copy_both: Pin<Box<tokio_postgres::CopyBothDuplex<bytes::Bytes>>>,
101 : cancel_on_client_drop: Option<tokio_util::sync::DropGuard>,
102 : conn_task: JoinHandle<()>,
103 : }
104 :
105 : pub struct RelTagBlockNo {
106 : pub rel_tag: RelTag,
107 : pub block_no: u32,
108 : }
109 :
110 : impl PagestreamClient {
111 0 : pub async fn shutdown(self) {
112 0 : let Self {
113 0 : copy_both,
114 0 : cancel_on_client_drop: cancel_conn_task,
115 0 : conn_task,
116 0 : } = self;
117 0 : // The `copy_both` contains internal channel sender, the receiver of which is polled by `conn_task`.
118 0 : // When `conn_task` observes the sender has been dropped, it sends a `FeMessage::CopyFail` into the connection.
119 0 : // (see https://github.com/neondatabase/rust-postgres/blob/2005bf79573b8add5cf205b52a2b208e356cc8b0/tokio-postgres/src/copy_both.rs#L56).
120 0 : //
121 0 : // If we drop(copy_both) first, but then immediately drop the `cancel_on_client_drop`,
122 0 : // the CopyFail mesage only makes it to the socket sometimes (i.e., it's a race).
123 0 : //
124 0 : // Further, the pageserver makes a lot of noise when it receives CopyFail.
125 0 : // Computes don't send it in practice, they just hard-close the connection.
126 0 : //
127 0 : // So, let's behave like the computes and suppress the CopyFail as follows:
128 0 : // kill the socket first, then drop copy_both.
129 0 : //
130 0 : // See also: https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-COPY
131 0 : //
132 0 : // NB: page_service doesn't have a use case to exit the `pagestream` mode currently.
133 0 : // => https://github.com/neondatabase/neon/issues/6390
134 0 : let _ = cancel_conn_task.unwrap();
135 0 : conn_task.await.unwrap();
136 0 : drop(copy_both);
137 0 : }
138 :
139 0 : pub async fn getpage(
140 0 : &mut self,
141 0 : req: PagestreamGetPageRequest,
142 0 : ) -> anyhow::Result<PagestreamGetPageResponse> {
143 0 : let req = PagestreamFeMessage::GetPage(req);
144 0 : let req: bytes::Bytes = req.serialize();
145 0 : // let mut req = tokio_util::io::ReaderStream::new(&req);
146 0 : let mut req = tokio_stream::once(Ok(req));
147 0 :
148 0 : self.copy_both.send_all(&mut req).await?;
149 :
150 0 : let next: Option<Result<bytes::Bytes, _>> = self.copy_both.next().await;
151 0 : let next: bytes::Bytes = next.unwrap()?;
152 :
153 0 : let msg = PagestreamBeMessage::deserialize(next)?;
154 0 : match msg {
155 0 : PagestreamBeMessage::GetPage(p) => Ok(p),
156 0 : PagestreamBeMessage::Error(e) => anyhow::bail!("Error: {:?}", e),
157 : PagestreamBeMessage::Exists(_)
158 : | PagestreamBeMessage::Nblocks(_)
159 : | PagestreamBeMessage::DbSize(_)
160 : | PagestreamBeMessage::GetSlruSegment(_) => {
161 0 : anyhow::bail!(
162 0 : "unexpected be message kind in response to getpage request: {}",
163 0 : msg.kind()
164 0 : )
165 : }
166 : }
167 0 : }
168 : }
|