Line data Source code
1 : use std::sync::{Arc, Mutex};
2 :
3 : use futures::{
4 : stream::{SplitSink, SplitStream},
5 : SinkExt, StreamExt,
6 : };
7 : use pageserver_api::{
8 : models::{
9 : PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest,
10 : PagestreamGetPageResponse,
11 : },
12 : reltag::RelTag,
13 : };
14 : use tokio::task::JoinHandle;
15 : use tokio_postgres::CopyOutStream;
16 : use tokio_util::sync::CancellationToken;
17 : use utils::{
18 : id::{TenantId, TimelineId},
19 : lsn::Lsn,
20 : };
21 :
/// A connection to the page service, created with [`Client::new`].
///
/// The underlying `tokio_postgres` connection is driven by a background task spawned in
/// [`Client::new`]; that task stops when the cancellation guard below is dropped.
///
/// NOTE: Rust drops struct fields in declaration order (`client`, then
/// `cancel_on_client_drop`, then `conn_task`); keep that in mind before reordering fields.
pub struct Client {
    client: tokio_postgres::Client,
    // Guard over the cancellation token the connection task selects on; dropping it
    // cancels the token and thus ends the connection task.
    cancel_on_client_drop: Option<tokio_util::sync::DropGuard>,
    // Handle to the background task that polls the `tokio_postgres` connection.
    conn_task: JoinHandle<()>,
}
27 :
/// Parameters for [`Client::basebackup`].
///
/// They are rendered into the command `basebackup <tenant_id> <timeline_id> [<lsn>] [--gzip]`.
pub struct BasebackupRequest {
    pub tenant_id: TenantId,
    pub timeline_id: TimelineId,
    /// When `Some`, the LSN is appended as an argument of the `basebackup` command;
    /// when `None`, no LSN argument is sent.
    pub lsn: Option<Lsn>,
    /// When `true`, `--gzip` is appended to the `basebackup` command.
    pub gzip: bool,
}
34 :
35 : impl Client {
36 0 : pub async fn new(connstring: String) -> anyhow::Result<Self> {
37 0 : let (client, connection) = tokio_postgres::connect(&connstring, postgres::NoTls).await?;
38 :
39 0 : let conn_task_cancel = CancellationToken::new();
40 0 : let conn_task = tokio::spawn({
41 0 : let conn_task_cancel = conn_task_cancel.clone();
42 0 : async move {
43 0 : tokio::select! {
44 0 : _ = conn_task_cancel.cancelled() => { }
45 0 : res = connection => {
46 0 : res.unwrap();
47 0 : }
48 : }
49 0 : }
50 0 : });
51 0 : Ok(Self {
52 0 : cancel_on_client_drop: Some(conn_task_cancel.drop_guard()),
53 0 : conn_task,
54 0 : client,
55 0 : })
56 0 : }
57 :
58 0 : pub async fn pagestream(
59 0 : self,
60 0 : tenant_id: TenantId,
61 0 : timeline_id: TimelineId,
62 0 : ) -> anyhow::Result<PagestreamClient> {
63 0 : let copy_both: tokio_postgres::CopyBothDuplex<bytes::Bytes> = self
64 0 : .client
65 0 : .copy_both_simple(&format!("pagestream_v3 {tenant_id} {timeline_id}"))
66 0 : .await?;
67 0 : let (sink, stream) = copy_both.split(); // TODO: actually support splitting of the CopyBothDuplex so the lock inside this split adaptor goes away.
68 0 : let Client {
69 0 : cancel_on_client_drop,
70 0 : conn_task,
71 0 : client: _,
72 0 : } = self;
73 0 : let shared = Arc::new(Mutex::new(PagestreamShared::ConnTaskRunning(
74 0 : ConnTaskRunning {
75 0 : cancel_on_client_drop,
76 0 : conn_task,
77 0 : },
78 0 : )));
79 0 : Ok(PagestreamClient {
80 0 : sink: PagestreamSender {
81 0 : shared: shared.clone(),
82 0 : sink,
83 0 : },
84 0 : stream: PagestreamReceiver {
85 0 : shared: shared.clone(),
86 0 : stream,
87 0 : },
88 0 : shared,
89 0 : })
90 0 : }
91 :
92 0 : pub async fn basebackup(&self, req: &BasebackupRequest) -> anyhow::Result<CopyOutStream> {
93 0 : let BasebackupRequest {
94 0 : tenant_id,
95 0 : timeline_id,
96 0 : lsn,
97 0 : gzip,
98 0 : } = req;
99 0 : let mut args = Vec::with_capacity(5);
100 0 : args.push("basebackup".to_string());
101 0 : args.push(format!("{tenant_id}"));
102 0 : args.push(format!("{timeline_id}"));
103 0 : if let Some(lsn) = lsn {
104 0 : args.push(format!("{lsn}"));
105 0 : }
106 0 : if *gzip {
107 0 : args.push("--gzip".to_string())
108 0 : }
109 0 : Ok(self.client.copy_out(&args.join(" ")).await?)
110 0 : }
111 : }
112 :
/// Create using [`Client::pagestream`].
///
/// Holds both halves of the split COPY-both stream plus the shared state that owns the
/// background connection task and its cancellation guard. Use [`PagestreamClient::shutdown`]
/// to close it, or [`PagestreamClient::split`] to use the halves independently.
pub struct PagestreamClient {
    // Shared with `sink` and `stream`; contains the connection task handle and drop guard.
    shared: Arc<Mutex<PagestreamShared>>,
    sink: PagestreamSender,
    stream: PagestreamReceiver,
}
118 : }
119 :
/// Sending half of a pagestream connection (see [`PagestreamClient::split`]).
pub struct PagestreamSender {
    // Never read: this `Arc` exists only to keep the shared state (and with it the
    // connection task's cancellation guard) alive while the sender is in use.
    #[allow(dead_code)]
    shared: Arc<Mutex<PagestreamShared>>,
    sink: SplitSink<tokio_postgres::CopyBothDuplex<bytes::Bytes>, bytes::Bytes>,
}
125 :
/// Receiving half of a pagestream connection (see [`PagestreamClient::split`]).
pub struct PagestreamReceiver {
    // Never read: this `Arc` exists only to keep the shared state (and with it the
    // connection task's cancellation guard) alive while the receiver is in use.
    #[allow(dead_code)]
    shared: Arc<Mutex<PagestreamShared>>,
    stream: SplitStream<tokio_postgres::CopyBothDuplex<bytes::Bytes>>,
}
131 :
/// State shared between a [`PagestreamClient`] and its sender/receiver halves.
enum PagestreamShared {
    // The connection task is still owned here; dropping this variant drops its guard
    // (cancelling the task) and its join handle.
    ConnTaskRunning(ConnTaskRunning),
    // Set by `PagestreamClient::shutdown` after it has taken the task state out.
    ConnTaskCancelledJoinHandleReturnedOrDropped,
}
/// The connection task and its cancellation guard, moved out of [`Client`] by
/// [`Client::pagestream`].
struct ConnTaskRunning {
    // Dropping this guard cancels the token that the connection task selects on.
    cancel_on_client_drop: Option<tokio_util::sync::DropGuard>,
    // Join handle of the background connection task spawned in `Client::new`.
    conn_task: JoinHandle<()>,
}
140 :
/// Pairs a [`RelTag`] with a block number.
///
/// NOTE(review): not used in the code visible here; presumably consumed by callers that
/// build getpage requests — confirm against the call sites.
pub struct RelTagBlockNo {
    pub rel_tag: RelTag,
    pub block_no: u32,
}
145 :
146 : impl PagestreamClient {
147 0 : pub async fn shutdown(self) {
148 0 : let Self {
149 0 : shared,
150 0 : sink,
151 0 : stream,
152 0 : } = { self };
153 : // The `copy_both` split into `sink` and `stream` contains internal channel sender, the receiver of which is polled by `conn_task`.
154 : // When `conn_task` observes the sender has been dropped, it sends a `FeMessage::CopyFail` into the connection.
155 : // (see https://github.com/neondatabase/rust-postgres/blob/2005bf79573b8add5cf205b52a2b208e356cc8b0/tokio-postgres/src/copy_both.rs#L56).
156 : //
157 : // If we drop(copy_both) first, but then immediately drop the `cancel_on_client_drop`,
158 : // the CopyFail mesage only makes it to the socket sometimes (i.e., it's a race).
159 : //
160 : // Further, the pageserver makes a lot of noise when it receives CopyFail.
161 : // Computes don't send it in practice, they just hard-close the connection.
162 : //
163 : // So, let's behave like the computes and suppress the CopyFail as follows:
164 : // kill the socket first, then drop copy_both.
165 : //
166 : // See also: https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-COPY
167 : //
168 : // NB: page_service doesn't have a use case to exit the `pagestream` mode currently.
169 : // => https://github.com/neondatabase/neon/issues/6390
170 : let ConnTaskRunning {
171 0 : cancel_on_client_drop,
172 0 : conn_task,
173 : } = {
174 0 : let mut guard = shared.lock().unwrap();
175 0 : match std::mem::replace(
176 0 : &mut *guard,
177 0 : PagestreamShared::ConnTaskCancelledJoinHandleReturnedOrDropped,
178 0 : ) {
179 0 : PagestreamShared::ConnTaskRunning(conn_task_running) => conn_task_running,
180 0 : PagestreamShared::ConnTaskCancelledJoinHandleReturnedOrDropped => unreachable!(),
181 : }
182 : };
183 0 : let _ = cancel_on_client_drop.unwrap();
184 0 : conn_task.await.unwrap();
185 0 :
186 0 : // Now drop the split copy_both.
187 0 : drop(sink);
188 0 : drop(stream);
189 0 : }
190 :
191 0 : pub fn split(self) -> (PagestreamSender, PagestreamReceiver) {
192 0 : let Self {
193 0 : shared: _,
194 0 : sink,
195 0 : stream,
196 0 : } = self;
197 0 : (sink, stream)
198 0 : }
199 :
200 0 : pub async fn getpage(
201 0 : &mut self,
202 0 : req: PagestreamGetPageRequest,
203 0 : ) -> anyhow::Result<PagestreamGetPageResponse> {
204 0 : self.getpage_send(req).await?;
205 0 : self.getpage_recv().await
206 0 : }
207 :
208 0 : pub async fn getpage_send(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()> {
209 0 : self.sink.getpage_send(req).await
210 0 : }
211 :
212 0 : pub async fn getpage_recv(&mut self) -> anyhow::Result<PagestreamGetPageResponse> {
213 0 : self.stream.getpage_recv().await
214 0 : }
215 : }
216 :
217 : impl PagestreamSender {
218 : // TODO: maybe make this impl Sink instead for better composability?
219 0 : pub async fn send(&mut self, msg: PagestreamFeMessage) -> anyhow::Result<()> {
220 0 : let msg = msg.serialize();
221 0 : self.sink.send_all(&mut tokio_stream::once(Ok(msg))).await?;
222 0 : Ok(())
223 0 : }
224 :
225 0 : pub async fn getpage_send(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()> {
226 0 : self.send(PagestreamFeMessage::GetPage(req)).await
227 0 : }
228 : }
229 :
230 : impl PagestreamReceiver {
231 : // TODO: maybe make this impl Stream instead for better composability?
232 0 : pub async fn recv(&mut self) -> anyhow::Result<PagestreamBeMessage> {
233 0 : let next: Option<Result<bytes::Bytes, _>> = self.stream.next().await;
234 0 : let next: bytes::Bytes = next.unwrap()?;
235 0 : PagestreamBeMessage::deserialize(next)
236 0 : }
237 :
238 0 : pub async fn getpage_recv(&mut self) -> anyhow::Result<PagestreamGetPageResponse> {
239 0 : let next: PagestreamBeMessage = self.recv().await?;
240 0 : match next {
241 0 : PagestreamBeMessage::GetPage(p) => Ok(p),
242 0 : PagestreamBeMessage::Error(e) => anyhow::bail!("Error: {:?}", e),
243 : PagestreamBeMessage::Exists(_)
244 : | PagestreamBeMessage::Nblocks(_)
245 : | PagestreamBeMessage::DbSize(_)
246 : | PagestreamBeMessage::GetSlruSegment(_) => {
247 0 : anyhow::bail!(
248 0 : "unexpected be message kind in response to getpage request: {}",
249 0 : next.kind()
250 0 : )
251 : }
252 : #[cfg(feature = "testing")]
253 : PagestreamBeMessage::Test(_) => {
254 0 : anyhow::bail!(
255 0 : "unexpected be message kind in response to getpage request: {}",
256 0 : next.kind()
257 0 : )
258 : }
259 : }
260 0 : }
261 : }
|