Line data Source code
1 : //! Rust definitions of the libpq-based pagestream API
2 : //!
3 : //! See also the C implementation of the same API in pgxn/neon/pagestore_client.h
4 :
5 : use std::io::{BufRead, Read};
6 :
7 : use crate::reltag::RelTag;
8 :
9 : use byteorder::{BigEndian, ReadBytesExt};
10 : use bytes::{Buf, BufMut, Bytes, BytesMut};
11 : use utils::lsn::Lsn;
12 :
/// Block size.
///
/// XXX: We assume 8k block size in the SLRU fetch API. It's not great to hardcode
/// that in the protocol, because Postgres supports different block sizes as a compile
/// time option.
///
/// SLRU segment responses carry their length on the wire as a block count, i.e. the
/// payload length divided by this constant.
const BLCKSZ: usize = 8192;
19 :
/// A pagestream request message (compute -> pageserver).
///
/// Each variant wraps the request struct carrying that request's fields.
// Wrapped in libpq CopyData
#[derive(PartialEq, Eq, Debug)]
pub enum PagestreamFeMessage {
    /// Request answered by [`PagestreamExistsResponse`].
    Exists(PagestreamExistsRequest),
    /// Request answered by [`PagestreamNblocksResponse`].
    Nblocks(PagestreamNblocksRequest),
    /// Request answered by [`PagestreamGetPageResponse`].
    GetPage(PagestreamGetPageRequest),
    /// Request answered by [`PagestreamDbSizeResponse`].
    DbSize(PagestreamDbSizeRequest),
    /// Request answered by [`PagestreamGetSlruSegmentResponse`].
    GetSlruSegment(PagestreamGetSlruSegmentRequest),
    /// Test-only request; compiled in only with the `testing` feature.
    #[cfg(feature = "testing")]
    Test(PagestreamTestRequest),
}
31 :
/// A pagestream response message.
///
/// There is one variant per request kind, plus [`PagestreamBeMessage::Error`], which
/// echoes only the common request header together with an error message.
// Wrapped in libpq CopyData
#[derive(Debug, strum_macros::EnumProperty)]
pub enum PagestreamBeMessage {
    /// Response to an `Exists` request.
    Exists(PagestreamExistsResponse),
    /// Response to an `Nblocks` request.
    Nblocks(PagestreamNblocksResponse),
    /// Response to a `GetPage` request.
    GetPage(PagestreamGetPageResponse),
    /// Error response; carries the request header and a message.
    Error(PagestreamErrorResponse),
    /// Response to a `DbSize` request.
    DbSize(PagestreamDbSizeResponse),
    /// Response to a `GetSlruSegment` request.
    GetSlruSegment(PagestreamGetSlruSegmentResponse),
    /// Test-only response; compiled in only with the `testing` feature.
    #[cfg(feature = "testing")]
    Test(PagestreamTestResponse),
}
44 :
/// One-byte wire tags identifying the request messages.
///
/// The discriminants are the values sent on the wire.
// Keep in sync with `pagestore_client.h`
#[repr(u8)]
enum PagestreamFeMessageTag {
    Exists = 0,
    Nblocks = 1,
    GetPage = 2,
    DbSize = 3,
    GetSlruSegment = 4,
    /* future tags above this line */
    /// For testing purposes, not available in production.
    #[cfg(feature = "testing")]
    Test = 99,
}

/// One-byte wire tags identifying the response messages.
///
/// The discriminants are the values sent on the wire.
// Keep in sync with `pagestore_client.h`
#[repr(u8)]
enum PagestreamBeMessageTag {
    Exists = 100,
    Nblocks = 101,
    GetPage = 102,
    Error = 103,
    DbSize = 104,
    GetSlruSegment = 105,
    /* future tags above this line */
    /// For testing purposes, not available in production.
    #[cfg(feature = "testing")]
    Test = 199,
}

/// Decodes a request tag byte.
///
/// Returns the offending byte unchanged as the error when it is not a known tag.
impl TryFrom<u8> for PagestreamFeMessageTag {
    type Error = u8;

    fn try_from(value: u8) -> Result<Self, u8> {
        // The patterns are derived from the enum discriminants so that the numeric
        // tag values live in exactly one place and the two cannot drift apart.
        const EXISTS: u8 = PagestreamFeMessageTag::Exists as u8;
        const NBLOCKS: u8 = PagestreamFeMessageTag::Nblocks as u8;
        const GET_PAGE: u8 = PagestreamFeMessageTag::GetPage as u8;
        const DB_SIZE: u8 = PagestreamFeMessageTag::DbSize as u8;
        const GET_SLRU_SEGMENT: u8 = PagestreamFeMessageTag::GetSlruSegment as u8;
        #[cfg(feature = "testing")]
        const TEST: u8 = PagestreamFeMessageTag::Test as u8;

        match value {
            EXISTS => Ok(Self::Exists),
            NBLOCKS => Ok(Self::Nblocks),
            GET_PAGE => Ok(Self::GetPage),
            DB_SIZE => Ok(Self::DbSize),
            GET_SLRU_SEGMENT => Ok(Self::GetSlruSegment),
            #[cfg(feature = "testing")]
            TEST => Ok(Self::Test),
            unknown => Err(unknown),
        }
    }
}

/// Decodes a response tag byte.
///
/// Returns the offending byte unchanged as the error when it is not a known tag.
impl TryFrom<u8> for PagestreamBeMessageTag {
    type Error = u8;

    fn try_from(value: u8) -> Result<Self, u8> {
        // See the request-tag impl: patterns come from the discriminants on purpose.
        const EXISTS: u8 = PagestreamBeMessageTag::Exists as u8;
        const NBLOCKS: u8 = PagestreamBeMessageTag::Nblocks as u8;
        const GET_PAGE: u8 = PagestreamBeMessageTag::GetPage as u8;
        const ERROR: u8 = PagestreamBeMessageTag::Error as u8;
        const DB_SIZE: u8 = PagestreamBeMessageTag::DbSize as u8;
        const GET_SLRU_SEGMENT: u8 = PagestreamBeMessageTag::GetSlruSegment as u8;
        #[cfg(feature = "testing")]
        const TEST: u8 = PagestreamBeMessageTag::Test as u8;

        match value {
            EXISTS => Ok(Self::Exists),
            NBLOCKS => Ok(Self::Nblocks),
            GET_PAGE => Ok(Self::GetPage),
            ERROR => Ok(Self::Error),
            DB_SIZE => Ok(Self::DbSize),
            GET_SLRU_SEGMENT => Ok(Self::GetSlruSegment),
            #[cfg(feature = "testing")]
            TEST => Ok(Self::Test),
            unknown => Err(unknown),
        }
    }
}
106 :
// A GetPage request contains two LSN values:
//
// request_lsn: Get the page version at this point in time. Lsn::Max is a special value that means
// "get the latest version present". It's used by the primary server, which knows that no one else
// is writing WAL. 'not_modified_since' must be set to a proper value even if request_lsn is
// Lsn::Max. Standby servers use the current replay LSN as the request LSN.
//
// not_modified_since: Hint to the pageserver that the client knows that the page has not been
// modified between 'not_modified_since' and the request LSN. It's always correct to set
// 'not_modified_since' equal to 'request_lsn' (unless Lsn::Max is used as the 'request_lsn'), but
// passing an earlier LSN can speed up the request, by allowing the pageserver to process the
// request without waiting for 'request_lsn' to arrive.
//
// The now-defunct V1 interface contained only one LSN, and a boolean 'latest' flag. The V1 interface was
// sufficient for the primary; the 'lsn' was equivalent to the 'not_modified_since' value, and
// 'latest' was set to true. The V2 interface was added because there was no correct way for a
// standby to request a page at a particular non-latest LSN, and also include the
// 'not_modified_since' hint. That led to an awkward choice of either using an old LSN in the
// request, if the standby knows that the page hasn't been modified since, and risking an error
// if that LSN has fallen behind the GC horizon, or requesting the current replay LSN, which could
// require the pageserver to wait unnecessarily for the WAL to arrive up to that point. The new V2
// interface allows sending both LSNs, and lets the pageserver do the right thing. There was no
// difference in the responses between V1 and V2.
//
// The V3 version of the protocol adds a request ID to all requests. This request ID is also included
// in the response, together with the other fields of the request, which makes it possible to verify
// that a response belongs to our request. We copy the request fields into the response to make the
// check more reliable: the request ID is formed from the process ID and a local counter, so in
// principle there can be duplicate request IDs if a PID is reused.
//
/// Version of the pagestream wire protocol; see the comment above for the history.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum PagestreamProtocolVersion {
    /// Requests carry `request_lsn` and `not_modified_since`, but no request ID.
    V2,
    /// Like V2, plus a request ID; responses echo the request fields.
    V3,
}
141 :
/// Identifier attached to every V3 request and echoed back in its response.
pub type RequestId = u64;
143 :
/// Header fields shared by every pagestream request.
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
pub struct PagestreamRequest {
    /// Request ID. Not present on the wire in protocol V2, where it is parsed as 0.
    pub reqid: RequestId,
    /// Point in time at which the data is requested.
    pub request_lsn: Lsn,
    /// Hint: the client knows the data was not modified between this LSN and
    /// `request_lsn`.
    pub not_modified_since: Lsn,
}
150 :
/// Body of an `Exists` request; answered with a boolean `exists` flag.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct PagestreamExistsRequest {
    /// Common request header.
    pub hdr: PagestreamRequest,
    /// Relation the request refers to.
    pub rel: RelTag,
}
156 :
/// Body of an `Nblocks` request; answered with a `u32` block count.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct PagestreamNblocksRequest {
    /// Common request header.
    pub hdr: PagestreamRequest,
    /// Relation the request refers to.
    pub rel: RelTag,
}
162 :
/// Body of a `GetPage` request; answered with the page contents.
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
pub struct PagestreamGetPageRequest {
    /// Common request header.
    pub hdr: PagestreamRequest,
    /// Relation the request refers to.
    pub rel: RelTag,
    /// Block number; presumably the block within `rel` to fetch (confirm against the
    /// pageserver handler).
    pub blkno: u32,
}
169 :
/// Body of a `DbSize` request; answered with an `i64` size.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct PagestreamDbSizeRequest {
    /// Common request header.
    pub hdr: PagestreamRequest,
    /// Identifies the database; presumably its OID (confirm against the caller).
    pub dbnode: u32,
}
175 :
/// Body of a `GetSlruSegment` request; answered with the segment contents.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct PagestreamGetSlruSegmentRequest {
    /// Common request header.
    pub hdr: PagestreamRequest,
    /// SLRU kind as a raw byte; the mapping to concrete SLRUs is defined elsewhere.
    pub kind: u8,
    /// Segment number within the SLRU.
    pub segno: u32,
}
182 :
/// Response to an `Exists` request.
#[derive(Debug)]
pub struct PagestreamExistsResponse {
    /// The request being answered; echoed on the wire in protocol V3.
    pub req: PagestreamExistsRequest,
    /// Result flag; encoded as a single byte.
    pub exists: bool,
}
188 :
/// Response to an `Nblocks` request.
#[derive(Debug)]
pub struct PagestreamNblocksResponse {
    /// The request being answered; echoed on the wire in protocol V3.
    pub req: PagestreamNblocksRequest,
    /// Number of blocks.
    pub n_blocks: u32,
}
194 :
/// Response to a `GetPage` request.
#[derive(Debug)]
pub struct PagestreamGetPageResponse {
    /// The request being answered; echoed on the wire in protocol V3.
    pub req: PagestreamGetPageRequest,
    /// Raw page contents. Written to the wire without a length prefix; the
    /// deserializer reads exactly 8192 bytes.
    pub page: Bytes,
}
200 :
/// Response to a `GetSlruSegment` request.
#[derive(Debug)]
pub struct PagestreamGetSlruSegmentResponse {
    /// The request being answered; echoed on the wire in protocol V3.
    pub req: PagestreamGetSlruSegmentRequest,
    /// Raw segment contents. On the wire it is preceded by a `u32` block count
    /// computed as `segment.len() / BLCKSZ`.
    pub segment: Bytes,
}
206 :
/// Error response.
#[derive(Debug)]
pub struct PagestreamErrorResponse {
    /// Header of the failed request (only the common header, not the full request);
    /// echoed on the wire in protocol V3.
    pub req: PagestreamRequest,
    /// Error text. Sent as a NUL-terminated string.
    pub message: String,
}
212 :
/// Response to a `DbSize` request.
#[derive(Debug)]
pub struct PagestreamDbSizeResponse {
    /// The request being answered; echoed on the wire in protocol V3.
    pub req: PagestreamDbSizeRequest,
    /// Reported size; presumably in bytes (confirm against the pageserver handler).
    pub db_size: i64,
}
218 :
/// Test-only request; compiled in only with the `testing` feature.
#[cfg(feature = "testing")]
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct PagestreamTestRequest {
    /// Common request header.
    pub hdr: PagestreamRequest,
    /// Opaque key; its interpretation is up to the request handler (not visible here).
    pub batch_key: u64,
    /// Free-form payload. Sent as a `u64` byte length followed by the UTF-8 bytes.
    pub message: String,
}
226 :
/// Test-only response; compiled in only with the `testing` feature.
#[cfg(feature = "testing")]
#[derive(Debug)]
pub struct PagestreamTestResponse {
    /// The request being answered. Its `batch_key` and `message` are always sent back;
    /// the header is echoed only in protocol V3.
    pub req: PagestreamTestRequest,
}
232 :
233 : impl PagestreamFeMessage {
234 : /// Serialize a compute -> pageserver message. This is currently only used in testing
235 : /// tools. Always uses protocol version 3.
236 4 : pub fn serialize(&self) -> Bytes {
237 4 : let mut bytes = BytesMut::new();
238 :
239 4 : match self {
240 1 : Self::Exists(req) => {
241 1 : bytes.put_u8(PagestreamFeMessageTag::Exists as u8);
242 1 : bytes.put_u64(req.hdr.reqid);
243 1 : bytes.put_u64(req.hdr.request_lsn.0);
244 1 : bytes.put_u64(req.hdr.not_modified_since.0);
245 1 : bytes.put_u32(req.rel.spcnode);
246 1 : bytes.put_u32(req.rel.dbnode);
247 1 : bytes.put_u32(req.rel.relnode);
248 1 : bytes.put_u8(req.rel.forknum);
249 1 : }
250 :
251 1 : Self::Nblocks(req) => {
252 1 : bytes.put_u8(PagestreamFeMessageTag::Nblocks as u8);
253 1 : bytes.put_u64(req.hdr.reqid);
254 1 : bytes.put_u64(req.hdr.request_lsn.0);
255 1 : bytes.put_u64(req.hdr.not_modified_since.0);
256 1 : bytes.put_u32(req.rel.spcnode);
257 1 : bytes.put_u32(req.rel.dbnode);
258 1 : bytes.put_u32(req.rel.relnode);
259 1 : bytes.put_u8(req.rel.forknum);
260 1 : }
261 :
262 1 : Self::GetPage(req) => {
263 1 : bytes.put_u8(PagestreamFeMessageTag::GetPage as u8);
264 1 : bytes.put_u64(req.hdr.reqid);
265 1 : bytes.put_u64(req.hdr.request_lsn.0);
266 1 : bytes.put_u64(req.hdr.not_modified_since.0);
267 1 : bytes.put_u32(req.rel.spcnode);
268 1 : bytes.put_u32(req.rel.dbnode);
269 1 : bytes.put_u32(req.rel.relnode);
270 1 : bytes.put_u8(req.rel.forknum);
271 1 : bytes.put_u32(req.blkno);
272 1 : }
273 :
274 1 : Self::DbSize(req) => {
275 1 : bytes.put_u8(PagestreamFeMessageTag::DbSize as u8);
276 1 : bytes.put_u64(req.hdr.reqid);
277 1 : bytes.put_u64(req.hdr.request_lsn.0);
278 1 : bytes.put_u64(req.hdr.not_modified_since.0);
279 1 : bytes.put_u32(req.dbnode);
280 1 : }
281 :
282 0 : Self::GetSlruSegment(req) => {
283 0 : bytes.put_u8(PagestreamFeMessageTag::GetSlruSegment as u8);
284 0 : bytes.put_u64(req.hdr.reqid);
285 0 : bytes.put_u64(req.hdr.request_lsn.0);
286 0 : bytes.put_u64(req.hdr.not_modified_since.0);
287 0 : bytes.put_u8(req.kind);
288 0 : bytes.put_u32(req.segno);
289 0 : }
290 : #[cfg(feature = "testing")]
291 0 : Self::Test(req) => {
292 0 : bytes.put_u8(PagestreamFeMessageTag::Test as u8);
293 0 : bytes.put_u64(req.hdr.reqid);
294 0 : bytes.put_u64(req.hdr.request_lsn.0);
295 0 : bytes.put_u64(req.hdr.not_modified_since.0);
296 0 : bytes.put_u64(req.batch_key);
297 0 : let message = req.message.as_bytes();
298 0 : bytes.put_u64(message.len() as u64);
299 0 : bytes.put_slice(message);
300 0 : }
301 : }
302 :
303 4 : bytes.into()
304 4 : }
305 :
306 4 : pub fn parse<R: std::io::Read>(
307 4 : body: &mut R,
308 4 : protocol_version: PagestreamProtocolVersion,
309 4 : ) -> anyhow::Result<PagestreamFeMessage> {
310 : // these correspond to the NeonMessageTag enum in pagestore_client.h
311 : //
312 : // TODO: consider using protobuf or serde bincode for less error prone
313 : // serialization.
314 4 : let msg_tag = body.read_u8()?;
315 4 : let (reqid, request_lsn, not_modified_since) = match protocol_version {
316 : PagestreamProtocolVersion::V2 => (
317 : 0,
318 0 : Lsn::from(body.read_u64::<BigEndian>()?),
319 0 : Lsn::from(body.read_u64::<BigEndian>()?),
320 : ),
321 : PagestreamProtocolVersion::V3 => (
322 4 : body.read_u64::<BigEndian>()?,
323 4 : Lsn::from(body.read_u64::<BigEndian>()?),
324 4 : Lsn::from(body.read_u64::<BigEndian>()?),
325 : ),
326 : };
327 :
328 4 : match PagestreamFeMessageTag::try_from(msg_tag)
329 4 : .map_err(|tag: u8| anyhow::anyhow!("invalid tag {tag}"))?
330 : {
331 : PagestreamFeMessageTag::Exists => {
332 : Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
333 1 : hdr: PagestreamRequest {
334 1 : reqid,
335 1 : request_lsn,
336 1 : not_modified_since,
337 1 : },
338 : rel: RelTag {
339 1 : spcnode: body.read_u32::<BigEndian>()?,
340 1 : dbnode: body.read_u32::<BigEndian>()?,
341 1 : relnode: body.read_u32::<BigEndian>()?,
342 1 : forknum: body.read_u8()?,
343 : },
344 : }))
345 : }
346 : PagestreamFeMessageTag::Nblocks => {
347 : Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
348 1 : hdr: PagestreamRequest {
349 1 : reqid,
350 1 : request_lsn,
351 1 : not_modified_since,
352 1 : },
353 : rel: RelTag {
354 1 : spcnode: body.read_u32::<BigEndian>()?,
355 1 : dbnode: body.read_u32::<BigEndian>()?,
356 1 : relnode: body.read_u32::<BigEndian>()?,
357 1 : forknum: body.read_u8()?,
358 : },
359 : }))
360 : }
361 : PagestreamFeMessageTag::GetPage => {
362 : Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
363 1 : hdr: PagestreamRequest {
364 1 : reqid,
365 1 : request_lsn,
366 1 : not_modified_since,
367 1 : },
368 : rel: RelTag {
369 1 : spcnode: body.read_u32::<BigEndian>()?,
370 1 : dbnode: body.read_u32::<BigEndian>()?,
371 1 : relnode: body.read_u32::<BigEndian>()?,
372 1 : forknum: body.read_u8()?,
373 : },
374 1 : blkno: body.read_u32::<BigEndian>()?,
375 : }))
376 : }
377 : PagestreamFeMessageTag::DbSize => {
378 : Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
379 1 : hdr: PagestreamRequest {
380 1 : reqid,
381 1 : request_lsn,
382 1 : not_modified_since,
383 1 : },
384 1 : dbnode: body.read_u32::<BigEndian>()?,
385 : }))
386 : }
387 : PagestreamFeMessageTag::GetSlruSegment => Ok(PagestreamFeMessage::GetSlruSegment(
388 : PagestreamGetSlruSegmentRequest {
389 0 : hdr: PagestreamRequest {
390 0 : reqid,
391 0 : request_lsn,
392 0 : not_modified_since,
393 0 : },
394 0 : kind: body.read_u8()?,
395 0 : segno: body.read_u32::<BigEndian>()?,
396 : },
397 : )),
398 : #[cfg(feature = "testing")]
399 : PagestreamFeMessageTag::Test => Ok(PagestreamFeMessage::Test(PagestreamTestRequest {
400 0 : hdr: PagestreamRequest {
401 0 : reqid,
402 0 : request_lsn,
403 0 : not_modified_since,
404 0 : },
405 0 : batch_key: body.read_u64::<BigEndian>()?,
406 : message: {
407 0 : let len = body.read_u64::<BigEndian>()?;
408 0 : let mut buf = vec![0; len as usize];
409 0 : body.read_exact(&mut buf)?;
410 0 : String::from_utf8(buf)?
411 : },
412 : })),
413 : }
414 4 : }
415 : }
416 :
417 : impl PagestreamBeMessage {
418 0 : pub fn serialize(&self, protocol_version: PagestreamProtocolVersion) -> Bytes {
419 0 : let mut bytes = BytesMut::new();
420 :
421 : use PagestreamBeMessageTag as Tag;
422 0 : match protocol_version {
423 : PagestreamProtocolVersion::V2 => {
424 0 : match self {
425 0 : Self::Exists(resp) => {
426 0 : bytes.put_u8(Tag::Exists as u8);
427 0 : bytes.put_u8(resp.exists as u8);
428 0 : }
429 :
430 0 : Self::Nblocks(resp) => {
431 0 : bytes.put_u8(Tag::Nblocks as u8);
432 0 : bytes.put_u32(resp.n_blocks);
433 0 : }
434 :
435 0 : Self::GetPage(resp) => {
436 0 : bytes.put_u8(Tag::GetPage as u8);
437 0 : bytes.put(&resp.page[..])
438 : }
439 :
440 0 : Self::Error(resp) => {
441 0 : bytes.put_u8(Tag::Error as u8);
442 0 : bytes.put(resp.message.as_bytes());
443 0 : bytes.put_u8(0); // null terminator
444 0 : }
445 0 : Self::DbSize(resp) => {
446 0 : bytes.put_u8(Tag::DbSize as u8);
447 0 : bytes.put_i64(resp.db_size);
448 0 : }
449 :
450 0 : Self::GetSlruSegment(resp) => {
451 0 : bytes.put_u8(Tag::GetSlruSegment as u8);
452 0 : bytes.put_u32((resp.segment.len() / BLCKSZ) as u32);
453 0 : bytes.put(&resp.segment[..]);
454 0 : }
455 :
456 : #[cfg(feature = "testing")]
457 0 : Self::Test(resp) => {
458 0 : bytes.put_u8(Tag::Test as u8);
459 0 : bytes.put_u64(resp.req.batch_key);
460 0 : let message = resp.req.message.as_bytes();
461 0 : bytes.put_u64(message.len() as u64);
462 0 : bytes.put_slice(message);
463 0 : }
464 : }
465 : }
466 : PagestreamProtocolVersion::V3 => {
467 0 : match self {
468 0 : Self::Exists(resp) => {
469 0 : bytes.put_u8(Tag::Exists as u8);
470 0 : bytes.put_u64(resp.req.hdr.reqid);
471 0 : bytes.put_u64(resp.req.hdr.request_lsn.0);
472 0 : bytes.put_u64(resp.req.hdr.not_modified_since.0);
473 0 : bytes.put_u32(resp.req.rel.spcnode);
474 0 : bytes.put_u32(resp.req.rel.dbnode);
475 0 : bytes.put_u32(resp.req.rel.relnode);
476 0 : bytes.put_u8(resp.req.rel.forknum);
477 0 : bytes.put_u8(resp.exists as u8);
478 0 : }
479 :
480 0 : Self::Nblocks(resp) => {
481 0 : bytes.put_u8(Tag::Nblocks as u8);
482 0 : bytes.put_u64(resp.req.hdr.reqid);
483 0 : bytes.put_u64(resp.req.hdr.request_lsn.0);
484 0 : bytes.put_u64(resp.req.hdr.not_modified_since.0);
485 0 : bytes.put_u32(resp.req.rel.spcnode);
486 0 : bytes.put_u32(resp.req.rel.dbnode);
487 0 : bytes.put_u32(resp.req.rel.relnode);
488 0 : bytes.put_u8(resp.req.rel.forknum);
489 0 : bytes.put_u32(resp.n_blocks);
490 0 : }
491 :
492 0 : Self::GetPage(resp) => {
493 0 : bytes.put_u8(Tag::GetPage as u8);
494 0 : bytes.put_u64(resp.req.hdr.reqid);
495 0 : bytes.put_u64(resp.req.hdr.request_lsn.0);
496 0 : bytes.put_u64(resp.req.hdr.not_modified_since.0);
497 0 : bytes.put_u32(resp.req.rel.spcnode);
498 0 : bytes.put_u32(resp.req.rel.dbnode);
499 0 : bytes.put_u32(resp.req.rel.relnode);
500 0 : bytes.put_u8(resp.req.rel.forknum);
501 0 : bytes.put_u32(resp.req.blkno);
502 0 : bytes.put(&resp.page[..])
503 : }
504 :
505 0 : Self::Error(resp) => {
506 0 : bytes.put_u8(Tag::Error as u8);
507 0 : bytes.put_u64(resp.req.reqid);
508 0 : bytes.put_u64(resp.req.request_lsn.0);
509 0 : bytes.put_u64(resp.req.not_modified_since.0);
510 0 : bytes.put(resp.message.as_bytes());
511 0 : bytes.put_u8(0); // null terminator
512 0 : }
513 0 : Self::DbSize(resp) => {
514 0 : bytes.put_u8(Tag::DbSize as u8);
515 0 : bytes.put_u64(resp.req.hdr.reqid);
516 0 : bytes.put_u64(resp.req.hdr.request_lsn.0);
517 0 : bytes.put_u64(resp.req.hdr.not_modified_since.0);
518 0 : bytes.put_u32(resp.req.dbnode);
519 0 : bytes.put_i64(resp.db_size);
520 0 : }
521 :
522 0 : Self::GetSlruSegment(resp) => {
523 0 : bytes.put_u8(Tag::GetSlruSegment as u8);
524 0 : bytes.put_u64(resp.req.hdr.reqid);
525 0 : bytes.put_u64(resp.req.hdr.request_lsn.0);
526 0 : bytes.put_u64(resp.req.hdr.not_modified_since.0);
527 0 : bytes.put_u8(resp.req.kind);
528 0 : bytes.put_u32(resp.req.segno);
529 0 : bytes.put_u32((resp.segment.len() / BLCKSZ) as u32);
530 0 : bytes.put(&resp.segment[..]);
531 0 : }
532 :
533 : #[cfg(feature = "testing")]
534 0 : Self::Test(resp) => {
535 0 : bytes.put_u8(Tag::Test as u8);
536 0 : bytes.put_u64(resp.req.hdr.reqid);
537 0 : bytes.put_u64(resp.req.hdr.request_lsn.0);
538 0 : bytes.put_u64(resp.req.hdr.not_modified_since.0);
539 0 : bytes.put_u64(resp.req.batch_key);
540 0 : let message = resp.req.message.as_bytes();
541 0 : bytes.put_u64(message.len() as u64);
542 0 : bytes.put_slice(message);
543 0 : }
544 : }
545 : }
546 : }
547 0 : bytes.into()
548 0 : }
549 :
550 0 : pub fn deserialize(buf: Bytes) -> anyhow::Result<Self> {
551 0 : let mut buf = buf.reader();
552 0 : let msg_tag = buf.read_u8()?;
553 :
554 : use PagestreamBeMessageTag as Tag;
555 0 : let ok =
556 0 : match Tag::try_from(msg_tag).map_err(|tag: u8| anyhow::anyhow!("invalid tag {tag}"))? {
557 : Tag::Exists => {
558 0 : let reqid = buf.read_u64::<BigEndian>()?;
559 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
560 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
561 0 : let rel = RelTag {
562 0 : spcnode: buf.read_u32::<BigEndian>()?,
563 0 : dbnode: buf.read_u32::<BigEndian>()?,
564 0 : relnode: buf.read_u32::<BigEndian>()?,
565 0 : forknum: buf.read_u8()?,
566 : };
567 0 : let exists = buf.read_u8()? != 0;
568 0 : Self::Exists(PagestreamExistsResponse {
569 0 : req: PagestreamExistsRequest {
570 0 : hdr: PagestreamRequest {
571 0 : reqid,
572 0 : request_lsn,
573 0 : not_modified_since,
574 0 : },
575 0 : rel,
576 0 : },
577 0 : exists,
578 0 : })
579 : }
580 : Tag::Nblocks => {
581 0 : let reqid = buf.read_u64::<BigEndian>()?;
582 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
583 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
584 0 : let rel = RelTag {
585 0 : spcnode: buf.read_u32::<BigEndian>()?,
586 0 : dbnode: buf.read_u32::<BigEndian>()?,
587 0 : relnode: buf.read_u32::<BigEndian>()?,
588 0 : forknum: buf.read_u8()?,
589 : };
590 0 : let n_blocks = buf.read_u32::<BigEndian>()?;
591 0 : Self::Nblocks(PagestreamNblocksResponse {
592 0 : req: PagestreamNblocksRequest {
593 0 : hdr: PagestreamRequest {
594 0 : reqid,
595 0 : request_lsn,
596 0 : not_modified_since,
597 0 : },
598 0 : rel,
599 0 : },
600 0 : n_blocks,
601 0 : })
602 : }
603 : Tag::GetPage => {
604 0 : let reqid = buf.read_u64::<BigEndian>()?;
605 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
606 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
607 0 : let rel = RelTag {
608 0 : spcnode: buf.read_u32::<BigEndian>()?,
609 0 : dbnode: buf.read_u32::<BigEndian>()?,
610 0 : relnode: buf.read_u32::<BigEndian>()?,
611 0 : forknum: buf.read_u8()?,
612 : };
613 0 : let blkno = buf.read_u32::<BigEndian>()?;
614 0 : let mut page = vec![0; 8192]; // TODO: use MaybeUninit
615 0 : buf.read_exact(&mut page)?;
616 0 : Self::GetPage(PagestreamGetPageResponse {
617 0 : req: PagestreamGetPageRequest {
618 0 : hdr: PagestreamRequest {
619 0 : reqid,
620 0 : request_lsn,
621 0 : not_modified_since,
622 0 : },
623 0 : rel,
624 0 : blkno,
625 0 : },
626 0 : page: page.into(),
627 0 : })
628 : }
629 : Tag::Error => {
630 0 : let reqid = buf.read_u64::<BigEndian>()?;
631 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
632 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
633 0 : let mut msg = Vec::new();
634 0 : buf.read_until(0, &mut msg)?;
635 0 : let cstring = std::ffi::CString::from_vec_with_nul(msg)?;
636 0 : let rust_str = cstring.to_str()?;
637 0 : Self::Error(PagestreamErrorResponse {
638 0 : req: PagestreamRequest {
639 0 : reqid,
640 0 : request_lsn,
641 0 : not_modified_since,
642 0 : },
643 0 : message: rust_str.to_owned(),
644 0 : })
645 : }
646 : Tag::DbSize => {
647 0 : let reqid = buf.read_u64::<BigEndian>()?;
648 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
649 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
650 0 : let dbnode = buf.read_u32::<BigEndian>()?;
651 0 : let db_size = buf.read_i64::<BigEndian>()?;
652 0 : Self::DbSize(PagestreamDbSizeResponse {
653 0 : req: PagestreamDbSizeRequest {
654 0 : hdr: PagestreamRequest {
655 0 : reqid,
656 0 : request_lsn,
657 0 : not_modified_since,
658 0 : },
659 0 : dbnode,
660 0 : },
661 0 : db_size,
662 0 : })
663 : }
664 : Tag::GetSlruSegment => {
665 0 : let reqid = buf.read_u64::<BigEndian>()?;
666 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
667 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
668 0 : let kind = buf.read_u8()?;
669 0 : let segno = buf.read_u32::<BigEndian>()?;
670 0 : let n_blocks = buf.read_u32::<BigEndian>()?;
671 0 : let mut segment = vec![0; n_blocks as usize * BLCKSZ];
672 0 : buf.read_exact(&mut segment)?;
673 0 : Self::GetSlruSegment(PagestreamGetSlruSegmentResponse {
674 0 : req: PagestreamGetSlruSegmentRequest {
675 0 : hdr: PagestreamRequest {
676 0 : reqid,
677 0 : request_lsn,
678 0 : not_modified_since,
679 0 : },
680 0 : kind,
681 0 : segno,
682 0 : },
683 0 : segment: segment.into(),
684 0 : })
685 : }
686 : #[cfg(feature = "testing")]
687 : Tag::Test => {
688 0 : let reqid = buf.read_u64::<BigEndian>()?;
689 0 : let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
690 0 : let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
691 0 : let batch_key = buf.read_u64::<BigEndian>()?;
692 0 : let len = buf.read_u64::<BigEndian>()?;
693 0 : let mut msg = vec![0; len as usize];
694 0 : buf.read_exact(&mut msg)?;
695 0 : let message = String::from_utf8(msg)?;
696 0 : Self::Test(PagestreamTestResponse {
697 0 : req: PagestreamTestRequest {
698 0 : hdr: PagestreamRequest {
699 0 : reqid,
700 0 : request_lsn,
701 0 : not_modified_since,
702 0 : },
703 0 : batch_key,
704 0 : message,
705 0 : },
706 0 : })
707 : }
708 : };
709 0 : let remaining = buf.into_inner();
710 0 : if !remaining.is_empty() {
711 0 : anyhow::bail!(
712 0 : "remaining bytes in msg with tag={msg_tag}: {}",
713 0 : remaining.len()
714 : );
715 0 : }
716 0 : Ok(ok)
717 0 : }
718 :
719 0 : pub fn kind(&self) -> &'static str {
720 0 : match self {
721 0 : Self::Exists(_) => "Exists",
722 0 : Self::Nblocks(_) => "Nblocks",
723 0 : Self::GetPage(_) => "GetPage",
724 0 : Self::Error(_) => "Error",
725 0 : Self::DbSize(_) => "DbSize",
726 0 : Self::GetSlruSegment(_) => "GetSlruSegment",
727 : #[cfg(feature = "testing")]
728 0 : Self::Test(_) => "Test",
729 : }
730 0 : }
731 : }
732 :
#[cfg(test)]
mod tests {
    use super::*;

    /// Every request variant must survive a V3 `serialize` -> `parse` round trip
    /// unchanged.
    #[test]
    fn test_pagestream() {
        // A non-zero request ID, so that a dropped or defaulted `reqid` is detected.
        let hdr = PagestreamRequest {
            reqid: 0x1122_3344_5566_7788,
            request_lsn: Lsn(4),
            not_modified_since: Lsn(3),
        };
        let rel = RelTag {
            forknum: 1,
            spcnode: 2,
            dbnode: 3,
            relnode: 4,
        };

        // Test serialization/deserialization of PagestreamFeMessage
        let messages = vec![
            PagestreamFeMessage::Exists(PagestreamExistsRequest { hdr, rel }),
            PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
                hdr: PagestreamRequest {
                    not_modified_since: Lsn(4),
                    ..hdr
                },
                rel,
            }),
            PagestreamFeMessage::GetPage(PagestreamGetPageRequest { hdr, rel, blkno: 7 }),
            PagestreamFeMessage::DbSize(PagestreamDbSizeRequest { hdr, dbnode: 7 }),
            PagestreamFeMessage::GetSlruSegment(PagestreamGetSlruSegmentRequest {
                hdr,
                kind: 1,
                segno: 5,
            }),
        ];
        for msg in messages {
            let bytes = msg.serialize();
            let reconstructed =
                PagestreamFeMessage::parse(&mut bytes.reader(), PagestreamProtocolVersion::V3)
                    .unwrap();
            // assert_eq! prints both values when the round trip does not match.
            assert_eq!(msg, reconstructed);
        }
    }
}
|