Line data Source code
1 : //! Structs representing the canonical page service API.
2 : //!
3 : //! These mirror the autogenerated Protobuf types. The differences are:
4 : //!
5 : //! - Types that are in fact required by the API are not Options. The protobuf "required"
6 : //! attribute is deprecated and 'prost' marks a lot of members as optional because of that.
7 : //! (See <https://github.com/tokio-rs/prost/issues/800> for a gripe on this)
8 : //!
9 : //! - Use more precise datatypes, e.g. Lsn and uints shorter than 32 bits.
10 : //!
11 : //! - Validate protocol invariants, via try_from() and try_into().
12 :
13 : use bytes::Bytes;
14 : use postgres_ffi::Oid;
15 : use smallvec::SmallVec;
16 : // TODO: split out Lsn, RelTag, SlruKind, Oid and other basic types to a separate crate, to avoid
17 : // pulling in all of their other crate dependencies when building the client.
18 : use utils::lsn::Lsn;
19 :
20 : use crate::proto;
21 :
22 : /// A protocol error. Typically returned via try_from() or try_into().
23 : #[derive(thiserror::Error, Debug)]
24 : pub enum ProtocolError {
25 : #[error("field '{0}' has invalid value '{1}'")]
26 : Invalid(&'static str, String),
27 : #[error("required field '{0}' is missing")]
28 : Missing(&'static str),
29 : }
30 :
31 : impl ProtocolError {
32 : /// Helper to generate a new ProtocolError::Invalid for the given field and value.
33 0 : pub fn invalid(field: &'static str, value: impl std::fmt::Debug) -> Self {
34 0 : Self::Invalid(field, format!("{value:?}"))
35 0 : }
36 : }
37 :
38 : /// The LSN a request should read at.
39 : #[derive(Clone, Copy, Debug)]
40 : pub struct ReadLsn {
41 : /// The request's read LSN.
42 : pub request_lsn: Lsn,
43 : /// If given, the caller guarantees that the page has not been modified since this LSN. Must be
44 : /// smaller than or equal to request_lsn. This allows the Pageserver to serve an old page
45 : /// without waiting for the request LSN to arrive. Valid for all request types.
46 : ///
47 : /// It is undefined behaviour to make a request such that the page was, in fact, modified
48 : /// between request_lsn and not_modified_since_lsn. The Pageserver might detect it and return an
49 : /// error, or it might return the old page version or the new page version. Setting
50 : /// not_modified_since_lsn equal to request_lsn is always safe, but can lead to unnecessary
51 : /// waiting.
52 : pub not_modified_since_lsn: Option<Lsn>,
53 : }
54 :
55 : impl ReadLsn {
56 : /// Validates the ReadLsn.
57 0 : pub fn validate(&self) -> Result<(), ProtocolError> {
58 0 : if self.request_lsn == Lsn::INVALID {
59 0 : return Err(ProtocolError::invalid("request_lsn", self.request_lsn));
60 0 : }
61 0 : if self.not_modified_since_lsn > Some(self.request_lsn) {
62 0 : return Err(ProtocolError::invalid(
63 0 : "not_modified_since_lsn",
64 0 : self.not_modified_since_lsn,
65 0 : ));
66 0 : }
67 0 : Ok(())
68 0 : }
69 : }
70 :
71 : impl TryFrom<proto::ReadLsn> for ReadLsn {
72 : type Error = ProtocolError;
73 :
74 0 : fn try_from(pb: proto::ReadLsn) -> Result<Self, Self::Error> {
75 0 : let read_lsn = Self {
76 0 : request_lsn: Lsn(pb.request_lsn),
77 0 : not_modified_since_lsn: match pb.not_modified_since_lsn {
78 0 : 0 => None,
79 0 : lsn => Some(Lsn(lsn)),
80 : },
81 : };
82 0 : read_lsn.validate()?;
83 0 : Ok(read_lsn)
84 0 : }
85 : }
86 :
87 : impl TryFrom<ReadLsn> for proto::ReadLsn {
88 : type Error = ProtocolError;
89 :
90 0 : fn try_from(read_lsn: ReadLsn) -> Result<Self, Self::Error> {
91 0 : read_lsn.validate()?;
92 0 : Ok(Self {
93 0 : request_lsn: read_lsn.request_lsn.0,
94 0 : not_modified_since_lsn: read_lsn.not_modified_since_lsn.unwrap_or_default().0,
95 0 : })
96 0 : }
97 : }
98 :
99 : // RelTag is defined in pageserver_api::reltag.
100 : pub type RelTag = pageserver_api::reltag::RelTag;
101 :
102 : impl TryFrom<proto::RelTag> for RelTag {
103 : type Error = ProtocolError;
104 :
105 0 : fn try_from(pb: proto::RelTag) -> Result<Self, Self::Error> {
106 0 : Ok(Self {
107 0 : spcnode: pb.spc_oid,
108 0 : dbnode: pb.db_oid,
109 0 : relnode: pb.rel_number,
110 0 : forknum: pb
111 0 : .fork_number
112 0 : .try_into()
113 0 : .map_err(|_| ProtocolError::invalid("fork_number", pb.fork_number))?,
114 : })
115 0 : }
116 : }
117 :
118 : impl From<RelTag> for proto::RelTag {
119 0 : fn from(rel_tag: RelTag) -> Self {
120 0 : Self {
121 0 : spc_oid: rel_tag.spcnode,
122 0 : db_oid: rel_tag.dbnode,
123 0 : rel_number: rel_tag.relnode,
124 0 : fork_number: rel_tag.forknum as u32,
125 0 : }
126 0 : }
127 : }
128 :
129 : /// Checks whether a relation exists, at the given LSN. Only valid on shard 0, other shards error.
130 : #[derive(Clone, Copy, Debug)]
131 : pub struct CheckRelExistsRequest {
132 : pub read_lsn: ReadLsn,
133 : pub rel: RelTag,
134 : }
135 :
136 : impl TryFrom<proto::CheckRelExistsRequest> for CheckRelExistsRequest {
137 : type Error = ProtocolError;
138 :
139 0 : fn try_from(pb: proto::CheckRelExistsRequest) -> Result<Self, Self::Error> {
140 0 : Ok(Self {
141 0 : read_lsn: pb
142 0 : .read_lsn
143 0 : .ok_or(ProtocolError::Missing("read_lsn"))?
144 0 : .try_into()?,
145 0 : rel: pb.rel.ok_or(ProtocolError::Missing("rel"))?.try_into()?,
146 : })
147 0 : }
148 : }
149 :
150 : pub type CheckRelExistsResponse = bool;
151 :
152 : impl From<proto::CheckRelExistsResponse> for CheckRelExistsResponse {
153 0 : fn from(pb: proto::CheckRelExistsResponse) -> Self {
154 0 : pb.exists
155 0 : }
156 : }
157 :
158 : impl From<CheckRelExistsResponse> for proto::CheckRelExistsResponse {
159 0 : fn from(exists: CheckRelExistsResponse) -> Self {
160 0 : Self { exists }
161 0 : }
162 : }
163 :
164 : /// Requests a base backup at a given LSN.
165 : #[derive(Clone, Copy, Debug)]
166 : pub struct GetBaseBackupRequest {
167 : /// The LSN to fetch a base backup at.
168 : pub read_lsn: ReadLsn,
169 : /// If true, logical replication slots will not be created.
170 : pub replica: bool,
171 : }
172 :
173 : impl TryFrom<proto::GetBaseBackupRequest> for GetBaseBackupRequest {
174 : type Error = ProtocolError;
175 :
176 0 : fn try_from(pb: proto::GetBaseBackupRequest) -> Result<Self, Self::Error> {
177 0 : Ok(Self {
178 0 : read_lsn: pb
179 0 : .read_lsn
180 0 : .ok_or(ProtocolError::Missing("read_lsn"))?
181 0 : .try_into()?,
182 0 : replica: pb.replica,
183 : })
184 0 : }
185 : }
186 :
187 : impl TryFrom<GetBaseBackupRequest> for proto::GetBaseBackupRequest {
188 : type Error = ProtocolError;
189 :
190 0 : fn try_from(request: GetBaseBackupRequest) -> Result<Self, Self::Error> {
191 0 : Ok(Self {
192 0 : read_lsn: Some(request.read_lsn.try_into()?),
193 0 : replica: request.replica,
194 : })
195 0 : }
196 : }
197 :
198 : pub type GetBaseBackupResponseChunk = Bytes;
199 :
200 : impl TryFrom<proto::GetBaseBackupResponseChunk> for GetBaseBackupResponseChunk {
201 : type Error = ProtocolError;
202 :
203 0 : fn try_from(pb: proto::GetBaseBackupResponseChunk) -> Result<Self, Self::Error> {
204 0 : if pb.chunk.is_empty() {
205 0 : return Err(ProtocolError::Missing("chunk"));
206 0 : }
207 0 : Ok(pb.chunk)
208 0 : }
209 : }
210 :
211 : impl TryFrom<GetBaseBackupResponseChunk> for proto::GetBaseBackupResponseChunk {
212 : type Error = ProtocolError;
213 :
214 0 : fn try_from(chunk: GetBaseBackupResponseChunk) -> Result<Self, Self::Error> {
215 0 : if chunk.is_empty() {
216 0 : return Err(ProtocolError::Missing("chunk"));
217 0 : }
218 0 : Ok(Self { chunk })
219 0 : }
220 : }
221 :
222 : /// Requests the size of a database, as # of bytes. Only valid on shard 0, other shards will error.
223 : #[derive(Clone, Copy, Debug)]
224 : pub struct GetDbSizeRequest {
225 : pub read_lsn: ReadLsn,
226 : pub db_oid: Oid,
227 : }
228 :
229 : impl TryFrom<proto::GetDbSizeRequest> for GetDbSizeRequest {
230 : type Error = ProtocolError;
231 :
232 0 : fn try_from(pb: proto::GetDbSizeRequest) -> Result<Self, Self::Error> {
233 0 : Ok(Self {
234 0 : read_lsn: pb
235 0 : .read_lsn
236 0 : .ok_or(ProtocolError::Missing("read_lsn"))?
237 0 : .try_into()?,
238 0 : db_oid: pb.db_oid,
239 : })
240 0 : }
241 : }
242 :
243 : impl TryFrom<GetDbSizeRequest> for proto::GetDbSizeRequest {
244 : type Error = ProtocolError;
245 :
246 0 : fn try_from(request: GetDbSizeRequest) -> Result<Self, Self::Error> {
247 0 : Ok(Self {
248 0 : read_lsn: Some(request.read_lsn.try_into()?),
249 0 : db_oid: request.db_oid,
250 : })
251 0 : }
252 : }
253 :
254 : pub type GetDbSizeResponse = u64;
255 :
256 : impl From<proto::GetDbSizeResponse> for GetDbSizeResponse {
257 0 : fn from(pb: proto::GetDbSizeResponse) -> Self {
258 0 : pb.num_bytes
259 0 : }
260 : }
261 :
262 : impl From<GetDbSizeResponse> for proto::GetDbSizeResponse {
263 0 : fn from(num_bytes: GetDbSizeResponse) -> Self {
264 0 : Self { num_bytes }
265 0 : }
266 : }
267 :
268 : /// Requests one or more pages.
269 : #[derive(Clone, Debug)]
270 : pub struct GetPageRequest {
271 : /// A request ID. Will be included in the response. Should be unique for in-flight requests on
272 : /// the stream.
273 : pub request_id: RequestID,
274 : /// The request class.
275 : pub request_class: GetPageClass,
276 : /// The LSN to read at.
277 : pub read_lsn: ReadLsn,
278 : /// The relation to read from.
279 : pub rel: RelTag,
280 : /// Page numbers to read. Must belong to the remote shard.
281 : ///
282 : /// Multiple pages will be executed as a single batch by the Pageserver, amortizing layer access
283 : /// costs and parallelizing them. This may increase the latency of any individual request, but
284 : /// improves the overall latency and throughput of the batch as a whole.
285 : pub block_numbers: SmallVec<[u32; 1]>,
286 : }
287 :
288 : impl TryFrom<proto::GetPageRequest> for GetPageRequest {
289 : type Error = ProtocolError;
290 :
291 0 : fn try_from(pb: proto::GetPageRequest) -> Result<Self, Self::Error> {
292 0 : if pb.block_number.is_empty() {
293 0 : return Err(ProtocolError::Missing("block_number"));
294 0 : }
295 0 : Ok(Self {
296 0 : request_id: pb.request_id,
297 0 : request_class: pb.request_class.into(),
298 0 : read_lsn: pb
299 0 : .read_lsn
300 0 : .ok_or(ProtocolError::Missing("read_lsn"))?
301 0 : .try_into()?,
302 0 : rel: pb.rel.ok_or(ProtocolError::Missing("rel"))?.try_into()?,
303 0 : block_numbers: pb.block_number.into(),
304 : })
305 0 : }
306 : }
307 :
308 : impl TryFrom<GetPageRequest> for proto::GetPageRequest {
309 : type Error = ProtocolError;
310 :
311 0 : fn try_from(request: GetPageRequest) -> Result<Self, Self::Error> {
312 0 : if request.block_numbers.is_empty() {
313 0 : return Err(ProtocolError::Missing("block_number"));
314 0 : }
315 0 : Ok(Self {
316 0 : request_id: request.request_id,
317 0 : request_class: request.request_class.into(),
318 0 : read_lsn: Some(request.read_lsn.try_into()?),
319 0 : rel: Some(request.rel.into()),
320 0 : block_number: request.block_numbers.into_vec(),
321 : })
322 0 : }
323 : }
324 :
/// The ID of a GetPage request.
pub type RequestID = u64;
327 :
/// The class of a GetPage request.
#[derive(Clone, Copy, Debug)]
pub enum GetPageClass {
    /// Unknown class. For backwards compatibility: used when an older client version sends a
    /// class that a newer server version has removed.
    Unknown,
    /// A normal request. This is the default.
    Normal,
    /// A prefetch request. NB: can only be classified on pg < 18.
    Prefetch,
    /// A background request (e.g. vacuum).
    Background,
}
341 :
342 : impl From<proto::GetPageClass> for GetPageClass {
343 0 : fn from(pb: proto::GetPageClass) -> Self {
344 0 : match pb {
345 0 : proto::GetPageClass::Unknown => Self::Unknown,
346 0 : proto::GetPageClass::Normal => Self::Normal,
347 0 : proto::GetPageClass::Prefetch => Self::Prefetch,
348 0 : proto::GetPageClass::Background => Self::Background,
349 : }
350 0 : }
351 : }
352 :
353 : impl From<i32> for GetPageClass {
354 0 : fn from(class: i32) -> Self {
355 0 : proto::GetPageClass::try_from(class)
356 0 : .unwrap_or(proto::GetPageClass::Unknown)
357 0 : .into()
358 0 : }
359 : }
360 :
361 : impl From<GetPageClass> for proto::GetPageClass {
362 0 : fn from(class: GetPageClass) -> Self {
363 0 : match class {
364 0 : GetPageClass::Unknown => Self::Unknown,
365 0 : GetPageClass::Normal => Self::Normal,
366 0 : GetPageClass::Prefetch => Self::Prefetch,
367 0 : GetPageClass::Background => Self::Background,
368 : }
369 0 : }
370 : }
371 :
372 : impl From<GetPageClass> for i32 {
373 0 : fn from(class: GetPageClass) -> Self {
374 0 : proto::GetPageClass::from(class).into()
375 0 : }
376 : }
377 :
378 : /// A GetPage response.
379 : ///
380 : /// A batch response will contain all of the requested pages. We could eagerly emit individual pages
381 : /// as soon as they are ready, but on a readv() Postgres holds buffer pool locks on all pages in the
382 : /// batch and we'll only return once the entire batch is ready, so no one can make use of the
383 : /// individual pages.
384 : #[derive(Clone, Debug)]
385 : pub struct GetPageResponse {
386 : /// The original request's ID.
387 : pub request_id: RequestID,
388 : /// The response status code.
389 : pub status: GetPageStatus,
390 : /// A string describing the status, if any.
391 : pub reason: Option<String>,
392 : /// The 8KB page images, in the same order as the request. Empty if status != OK.
393 : pub page_images: SmallVec<[Bytes; 1]>,
394 : }
395 :
396 : impl From<proto::GetPageResponse> for GetPageResponse {
397 0 : fn from(pb: proto::GetPageResponse) -> Self {
398 0 : Self {
399 0 : request_id: pb.request_id,
400 0 : status: pb.status.into(),
401 0 : reason: Some(pb.reason).filter(|r| !r.is_empty()),
402 0 : page_images: pb.page_image.into(),
403 0 : }
404 0 : }
405 : }
406 :
407 : impl From<GetPageResponse> for proto::GetPageResponse {
408 0 : fn from(response: GetPageResponse) -> Self {
409 0 : Self {
410 0 : request_id: response.request_id,
411 0 : status: response.status.into(),
412 0 : reason: response.reason.unwrap_or_default(),
413 0 : page_image: response.page_images.into_vec(),
414 0 : }
415 0 : }
416 : }
417 :
/// The status of a GetPage response.
#[derive(Clone, Copy, Debug)]
pub enum GetPageStatus {
    /// Unknown status. For forwards compatibility: used when an older client version receives
    /// a new status code from a newer server version.
    Unknown,
    /// The request succeeded.
    Ok,
    /// The page did not exist. The tenant/timeline/shard has already been validated during
    /// stream setup.
    NotFound,
    /// The request was invalid.
    Invalid,
    /// The tenant is rate limited. Slow down and retry later.
    SlowDown,
}
434 :
435 : impl From<proto::GetPageStatus> for GetPageStatus {
436 0 : fn from(pb: proto::GetPageStatus) -> Self {
437 0 : match pb {
438 0 : proto::GetPageStatus::Unknown => Self::Unknown,
439 0 : proto::GetPageStatus::Ok => Self::Ok,
440 0 : proto::GetPageStatus::NotFound => Self::NotFound,
441 0 : proto::GetPageStatus::Invalid => Self::Invalid,
442 0 : proto::GetPageStatus::SlowDown => Self::SlowDown,
443 : }
444 0 : }
445 : }
446 :
447 : impl From<i32> for GetPageStatus {
448 0 : fn from(status: i32) -> Self {
449 0 : proto::GetPageStatus::try_from(status)
450 0 : .unwrap_or(proto::GetPageStatus::Unknown)
451 0 : .into()
452 0 : }
453 : }
454 :
455 : impl From<GetPageStatus> for proto::GetPageStatus {
456 0 : fn from(status: GetPageStatus) -> Self {
457 0 : match status {
458 0 : GetPageStatus::Unknown => Self::Unknown,
459 0 : GetPageStatus::Ok => Self::Ok,
460 0 : GetPageStatus::NotFound => Self::NotFound,
461 0 : GetPageStatus::Invalid => Self::Invalid,
462 0 : GetPageStatus::SlowDown => Self::SlowDown,
463 : }
464 0 : }
465 : }
466 :
467 : impl From<GetPageStatus> for i32 {
468 0 : fn from(status: GetPageStatus) -> Self {
469 0 : proto::GetPageStatus::from(status).into()
470 0 : }
471 : }
472 :
473 : // Fetches the size of a relation at a given LSN, as # of blocks. Only valid on shard 0, other
474 : // shards will error.
475 : pub struct GetRelSizeRequest {
476 : pub read_lsn: ReadLsn,
477 : pub rel: RelTag,
478 : }
479 :
480 : impl TryFrom<proto::GetRelSizeRequest> for GetRelSizeRequest {
481 : type Error = ProtocolError;
482 :
483 0 : fn try_from(proto: proto::GetRelSizeRequest) -> Result<Self, Self::Error> {
484 0 : Ok(Self {
485 0 : read_lsn: proto
486 0 : .read_lsn
487 0 : .ok_or(ProtocolError::Missing("read_lsn"))?
488 0 : .try_into()?,
489 0 : rel: proto.rel.ok_or(ProtocolError::Missing("rel"))?.try_into()?,
490 : })
491 0 : }
492 : }
493 :
494 : impl TryFrom<GetRelSizeRequest> for proto::GetRelSizeRequest {
495 : type Error = ProtocolError;
496 :
497 0 : fn try_from(request: GetRelSizeRequest) -> Result<Self, Self::Error> {
498 0 : Ok(Self {
499 0 : read_lsn: Some(request.read_lsn.try_into()?),
500 0 : rel: Some(request.rel.into()),
501 : })
502 0 : }
503 : }
504 :
505 : pub type GetRelSizeResponse = u32;
506 :
507 : impl From<proto::GetRelSizeResponse> for GetRelSizeResponse {
508 0 : fn from(proto: proto::GetRelSizeResponse) -> Self {
509 0 : proto.num_blocks
510 0 : }
511 : }
512 :
513 : impl From<GetRelSizeResponse> for proto::GetRelSizeResponse {
514 0 : fn from(num_blocks: GetRelSizeResponse) -> Self {
515 0 : Self { num_blocks }
516 0 : }
517 : }
518 :
519 : /// Requests an SLRU segment. Only valid on shard 0, other shards will error.
520 : pub struct GetSlruSegmentRequest {
521 : pub read_lsn: ReadLsn,
522 : pub kind: SlruKind,
523 : pub segno: u32,
524 : }
525 :
526 : impl TryFrom<proto::GetSlruSegmentRequest> for GetSlruSegmentRequest {
527 : type Error = ProtocolError;
528 :
529 0 : fn try_from(pb: proto::GetSlruSegmentRequest) -> Result<Self, Self::Error> {
530 0 : Ok(Self {
531 0 : read_lsn: pb
532 0 : .read_lsn
533 0 : .ok_or(ProtocolError::Missing("read_lsn"))?
534 0 : .try_into()?,
535 0 : kind: u8::try_from(pb.kind)
536 0 : .ok()
537 0 : .and_then(SlruKind::from_repr)
538 0 : .ok_or_else(|| ProtocolError::invalid("slru_kind", pb.kind))?,
539 0 : segno: pb.segno,
540 : })
541 0 : }
542 : }
543 :
544 : impl TryFrom<GetSlruSegmentRequest> for proto::GetSlruSegmentRequest {
545 : type Error = ProtocolError;
546 :
547 0 : fn try_from(request: GetSlruSegmentRequest) -> Result<Self, Self::Error> {
548 0 : Ok(Self {
549 0 : read_lsn: Some(request.read_lsn.try_into()?),
550 0 : kind: request.kind as u32,
551 0 : segno: request.segno,
552 : })
553 0 : }
554 : }
555 :
556 : pub type GetSlruSegmentResponse = Bytes;
557 :
558 : impl TryFrom<proto::GetSlruSegmentResponse> for GetSlruSegmentResponse {
559 : type Error = ProtocolError;
560 :
561 0 : fn try_from(pb: proto::GetSlruSegmentResponse) -> Result<Self, Self::Error> {
562 0 : if pb.segment.is_empty() {
563 0 : return Err(ProtocolError::Missing("segment"));
564 0 : }
565 0 : Ok(pb.segment)
566 0 : }
567 : }
568 :
569 : impl TryFrom<GetSlruSegmentResponse> for proto::GetSlruSegmentResponse {
570 : type Error = ProtocolError;
571 :
572 0 : fn try_from(segment: GetSlruSegmentResponse) -> Result<Self, Self::Error> {
573 0 : if segment.is_empty() {
574 0 : return Err(ProtocolError::Missing("segment"));
575 0 : }
576 0 : Ok(Self { segment })
577 0 : }
578 : }
579 :
580 : // SlruKind is defined in pageserver_api::reltag.
581 : pub type SlruKind = pageserver_api::reltag::SlruKind;
|