Line data Source code
1 : use std::sync::{Arc, Mutex};
2 :
3 : use futures::stream::{SplitSink, SplitStream};
4 : use futures::{SinkExt, StreamExt};
5 : use pageserver_api::models::{
6 : PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
7 : };
8 : use pageserver_api::reltag::RelTag;
9 : use tokio::task::JoinHandle;
10 : use tokio_postgres::CopyOutStream;
11 : use tokio_util::sync::CancellationToken;
12 : use utils::id::{TenantId, TimelineId};
13 : use utils::lsn::Lsn;
14 :
/// A `tokio_postgres` client bundled with the background task that drives its connection.
///
/// Built by [`Client::new`], which spawns the connection task and wires up its cancellation.
pub struct Client {
    /// Handle used to issue commands (`copy_both_simple`, `copy_out`) over the connection.
    client: tokio_postgres::Client,
    /// Drop guard of the cancellation token that the connection task selects on.
    /// Dropping the guard cancels the token, which makes the task exit.
    /// [`Client::new`] always stores `Some`.
    cancel_on_client_drop: Option<tokio_util::sync::DropGuard>,
    /// Join handle of the spawned task that polls the `tokio_postgres` connection future.
    conn_task: JoinHandle<()>,
}
20 :
/// Arguments for [`Client::basebackup`].
///
/// The fields are rendered, in order, into a space-separated `basebackup ...` command.
pub struct BasebackupRequest {
    /// Tenant whose timeline is requested.
    pub tenant_id: TenantId,
    /// Timeline to take the basebackup of.
    pub timeline_id: TimelineId,
    /// Optional LSN; appended to the command only when `Some`.
    pub lsn: Option<Lsn>,
    /// When `true`, the `--gzip` flag is appended to the command.
    pub gzip: bool,
}
27 :
28 : impl Client {
29 0 : pub async fn new(connstring: String) -> anyhow::Result<Self> {
30 0 : let (client, connection) =
31 0 : tokio_postgres::connect(&connstring, tokio_postgres::NoTls).await?;
32 :
33 0 : let conn_task_cancel = CancellationToken::new();
34 0 : let conn_task = tokio::spawn({
35 0 : let conn_task_cancel = conn_task_cancel.clone();
36 0 : async move {
37 0 : tokio::select! {
38 0 : _ = conn_task_cancel.cancelled() => { }
39 0 : res = connection => {
40 0 : res.unwrap();
41 0 : }
42 : }
43 0 : }
44 0 : });
45 0 : Ok(Self {
46 0 : cancel_on_client_drop: Some(conn_task_cancel.drop_guard()),
47 0 : conn_task,
48 0 : client,
49 0 : })
50 0 : }
51 :
52 0 : pub async fn pagestream(
53 0 : self,
54 0 : tenant_id: TenantId,
55 0 : timeline_id: TimelineId,
56 0 : ) -> anyhow::Result<PagestreamClient> {
57 0 : let copy_both: tokio_postgres::CopyBothDuplex<bytes::Bytes> = self
58 0 : .client
59 0 : .copy_both_simple(&format!("pagestream_v3 {tenant_id} {timeline_id}"))
60 0 : .await?;
61 0 : let (sink, stream) = copy_both.split(); // TODO: actually support splitting of the CopyBothDuplex so the lock inside this split adaptor goes away.
62 0 : let Client {
63 0 : cancel_on_client_drop,
64 0 : conn_task,
65 0 : client: _,
66 0 : } = self;
67 0 : let shared = Arc::new(Mutex::new(PagestreamShared::ConnTaskRunning(
68 0 : ConnTaskRunning {
69 0 : cancel_on_client_drop,
70 0 : conn_task,
71 0 : },
72 0 : )));
73 0 : Ok(PagestreamClient {
74 0 : sink: PagestreamSender {
75 0 : shared: shared.clone(),
76 0 : sink,
77 0 : },
78 0 : stream: PagestreamReceiver {
79 0 : shared: shared.clone(),
80 0 : stream,
81 0 : },
82 0 : shared,
83 0 : })
84 0 : }
85 :
86 0 : pub async fn basebackup(&self, req: &BasebackupRequest) -> anyhow::Result<CopyOutStream> {
87 0 : let BasebackupRequest {
88 0 : tenant_id,
89 0 : timeline_id,
90 0 : lsn,
91 0 : gzip,
92 0 : } = req;
93 0 : let mut args = Vec::with_capacity(5);
94 0 : args.push("basebackup".to_string());
95 0 : args.push(format!("{tenant_id}"));
96 0 : args.push(format!("{timeline_id}"));
97 0 : if let Some(lsn) = lsn {
98 0 : args.push(format!("{lsn}"));
99 0 : }
100 0 : if *gzip {
101 0 : args.push("--gzip".to_string())
102 0 : }
103 0 : Ok(self.client.copy_out(&args.join(" ")).await?)
104 0 : }
105 : }
106 :
/// A connection in pagestream mode, offering combined request/response helpers.
///
/// Create using [`Client::pagestream`]. Use [`PagestreamClient::split`] to obtain
/// independent sender and receiver halves, or [`PagestreamClient::shutdown`] to tear the
/// connection down.
pub struct PagestreamClient {
    /// State of the connection task; `shutdown` takes the task out of it.
    shared: Arc<Mutex<PagestreamShared>>,
    /// Sending half of the split copy-both stream.
    sink: PagestreamSender,
    /// Receiving half of the split copy-both stream.
    stream: PagestreamReceiver,
}
113 :
/// Sending half of a pagestream connection; see [`PagestreamClient::split`].
pub struct PagestreamSender {
    // Never read in the visible code, hence the `allow`. Holding the clone keeps the shared
    // state (and the connection task's drop guard inside it) alive while this half exists.
    #[allow(dead_code)]
    shared: Arc<Mutex<PagestreamShared>>,
    /// Sink half of the split copy-both duplex; accepts serialized frontend messages.
    sink: SplitSink<tokio_postgres::CopyBothDuplex<bytes::Bytes>, bytes::Bytes>,
}
119 :
/// Receiving half of a pagestream connection; see [`PagestreamClient::split`].
pub struct PagestreamReceiver {
    // Never read in the visible code, hence the `allow`. Holding the clone keeps the shared
    // state (and the connection task's drop guard inside it) alive while this half exists.
    #[allow(dead_code)]
    shared: Arc<Mutex<PagestreamShared>>,
    /// Stream half of the split copy-both duplex; yields raw backend message bytes.
    stream: SplitStream<tokio_postgres::CopyBothDuplex<bytes::Bytes>>,
}
125 :
/// Ownership state of the connection task, shared between a [`PagestreamClient`] and its
/// sender/receiver halves.
enum PagestreamShared {
    /// The join handle and cancellation guard are still held here.
    /// This is the initial state set up by [`Client::pagestream`].
    ConnTaskRunning(ConnTaskRunning),
    /// The join handle and guard have been taken out; [`PagestreamClient::shutdown`] leaves
    /// this state behind when it takes them.
    ConnTaskCancelledJoinHandleReturnedOrDropped,
}
/// The pieces needed to stop and join the connection task, moved over from [`Client`].
struct ConnTaskRunning {
    /// Guard whose drop cancels the token the connection task selects on.
    cancel_on_client_drop: Option<tokio_util::sync::DropGuard>,
    /// Join handle of the connection task.
    conn_task: JoinHandle<()>,
}
134 :
/// A relation tag paired with a block number.
///
/// NOTE(review): not used in the visible part of this file; presumably identifies a single
/// page of a relation for callers elsewhere — confirm against usages.
pub struct RelTagBlockNo {
    /// The relation tag.
    pub rel_tag: RelTag,
    /// The block number (presumably within `rel_tag`'s relation — TODO confirm).
    pub block_no: u32,
}
139 :
140 : impl PagestreamClient {
141 0 : pub async fn shutdown(self) {
142 0 : let Self {
143 0 : shared,
144 0 : sink,
145 0 : stream,
146 0 : } = { self };
147 : // The `copy_both` split into `sink` and `stream` contains internal channel sender, the receiver of which is polled by `conn_task`.
148 : // When `conn_task` observes the sender has been dropped, it sends a `FeMessage::CopyFail` into the connection.
149 : // (see https://github.com/neondatabase/rust-postgres/blob/2005bf79573b8add5cf205b52a2b208e356cc8b0/tokio-postgres/src/copy_both.rs#L56).
150 : //
151 : // If we drop(copy_both) first, but then immediately drop the `cancel_on_client_drop`,
152 : // the CopyFail mesage only makes it to the socket sometimes (i.e., it's a race).
153 : //
154 : // Further, the pageserver makes a lot of noise when it receives CopyFail.
155 : // Computes don't send it in practice, they just hard-close the connection.
156 : //
157 : // So, let's behave like the computes and suppress the CopyFail as follows:
158 : // kill the socket first, then drop copy_both.
159 : //
160 : // See also: https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-COPY
161 : //
162 : // NB: page_service doesn't have a use case to exit the `pagestream` mode currently.
163 : // => https://github.com/neondatabase/neon/issues/6390
164 : let ConnTaskRunning {
165 0 : cancel_on_client_drop,
166 0 : conn_task,
167 : } = {
168 0 : let mut guard = shared.lock().unwrap();
169 0 : match std::mem::replace(
170 0 : &mut *guard,
171 0 : PagestreamShared::ConnTaskCancelledJoinHandleReturnedOrDropped,
172 0 : ) {
173 0 : PagestreamShared::ConnTaskRunning(conn_task_running) => conn_task_running,
174 0 : PagestreamShared::ConnTaskCancelledJoinHandleReturnedOrDropped => unreachable!(),
175 : }
176 : };
177 0 : let _ = cancel_on_client_drop.unwrap();
178 0 : conn_task.await.unwrap();
179 0 :
180 0 : // Now drop the split copy_both.
181 0 : drop(sink);
182 0 : drop(stream);
183 0 : }
184 :
185 0 : pub fn split(self) -> (PagestreamSender, PagestreamReceiver) {
186 0 : let Self {
187 0 : shared: _,
188 0 : sink,
189 0 : stream,
190 0 : } = self;
191 0 : (sink, stream)
192 0 : }
193 :
194 0 : pub async fn getpage(
195 0 : &mut self,
196 0 : req: PagestreamGetPageRequest,
197 0 : ) -> anyhow::Result<PagestreamGetPageResponse> {
198 0 : self.getpage_send(req).await?;
199 0 : self.getpage_recv().await
200 0 : }
201 :
202 0 : pub async fn getpage_send(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()> {
203 0 : self.sink.getpage_send(req).await
204 0 : }
205 :
206 0 : pub async fn getpage_recv(&mut self) -> anyhow::Result<PagestreamGetPageResponse> {
207 0 : self.stream.getpage_recv().await
208 0 : }
209 : }
210 :
211 : impl PagestreamSender {
212 : // TODO: maybe make this impl Sink instead for better composability?
213 0 : pub async fn send(&mut self, msg: PagestreamFeMessage) -> anyhow::Result<()> {
214 0 : let msg = msg.serialize();
215 0 : self.sink.send_all(&mut tokio_stream::once(Ok(msg))).await?;
216 0 : Ok(())
217 0 : }
218 :
219 0 : pub async fn getpage_send(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()> {
220 0 : self.send(PagestreamFeMessage::GetPage(req)).await
221 0 : }
222 : }
223 :
224 : impl PagestreamReceiver {
225 : // TODO: maybe make this impl Stream instead for better composability?
226 0 : pub async fn recv(&mut self) -> anyhow::Result<PagestreamBeMessage> {
227 0 : let next: Option<Result<bytes::Bytes, _>> = self.stream.next().await;
228 0 : let next: bytes::Bytes = next.unwrap()?;
229 0 : PagestreamBeMessage::deserialize(next)
230 0 : }
231 :
232 0 : pub async fn getpage_recv(&mut self) -> anyhow::Result<PagestreamGetPageResponse> {
233 0 : let next: PagestreamBeMessage = self.recv().await?;
234 0 : match next {
235 0 : PagestreamBeMessage::GetPage(p) => Ok(p),
236 0 : PagestreamBeMessage::Error(e) => anyhow::bail!("Error: {:?}", e),
237 : PagestreamBeMessage::Exists(_)
238 : | PagestreamBeMessage::Nblocks(_)
239 : | PagestreamBeMessage::DbSize(_)
240 : | PagestreamBeMessage::GetSlruSegment(_) => {
241 0 : anyhow::bail!(
242 0 : "unexpected be message kind in response to getpage request: {}",
243 0 : next.kind()
244 0 : )
245 : }
246 : #[cfg(feature = "testing")]
247 : PagestreamBeMessage::Test(_) => {
248 0 : anyhow::bail!(
249 0 : "unexpected be message kind in response to getpage request: {}",
250 0 : next.kind()
251 0 : )
252 : }
253 : }
254 0 : }
255 : }
|