Line data Source code
1 : //! Structs representing the canonical page service API.
2 : //!
3 : //! These mirror the autogenerated Protobuf types. The differences are:
4 : //!
5 : //! - Types that are in fact required by the API are not Options. The protobuf "required"
6 : //! attribute is deprecated and 'prost' marks a lot of members as optional because of that.
7 : //! (See <https://github.com/tokio-rs/prost/issues/800> for a gripe on this)
8 : //!
9 : //! - Use more precise datatypes, e.g. Lsn and uints shorter than 32 bits.
10 : //!
11 : //! - Validate protocol invariants, via try_from() and try_into().
12 : //!
13 : //! Validation only happens on the receiver side, i.e. when converting from Protobuf to domain
14 : //! types. This is where it matters -- the Protobuf types are less strict than the domain types, and
15 : //! receivers should expect all sorts of junk from senders. This also allows the sender to use e.g.
16 : //! stream combinators without dealing with errors, and avoids validating the same message twice.
17 :
18 : use std::fmt::Display;
19 :
20 : use bytes::Bytes;
21 : use postgres_ffi::Oid;
22 : // TODO: split out Lsn, RelTag, SlruKind, Oid and other basic types to a separate crate, to avoid
23 : // pulling in all of their other crate dependencies when building the client.
24 : use utils::lsn::Lsn;
25 :
26 : use crate::proto;
27 :
28 : /// A protocol error. Typically returned via try_from() or try_into().
29 : #[derive(thiserror::Error, Debug)]
30 : pub enum ProtocolError {
31 : #[error("field '{0}' has invalid value '{1}'")]
32 : Invalid(&'static str, String),
33 : #[error("required field '{0}' is missing")]
34 : Missing(&'static str),
35 : }
36 :
37 : impl ProtocolError {
38 : /// Helper to generate a new ProtocolError::Invalid for the given field and value.
39 0 : pub fn invalid(field: &'static str, value: impl std::fmt::Debug) -> Self {
40 0 : Self::Invalid(field, format!("{value:?}"))
41 0 : }
42 : }
43 :
44 : impl From<ProtocolError> for tonic::Status {
45 0 : fn from(err: ProtocolError) -> Self {
46 0 : tonic::Status::invalid_argument(format!("{err}"))
47 0 : }
48 : }
49 :
50 : /// The LSN a request should read at.
51 : #[derive(Clone, Copy, Debug)]
52 : pub struct ReadLsn {
53 : /// The request's read LSN.
54 : pub request_lsn: Lsn,
55 : /// If given, the caller guarantees that the page has not been modified since this LSN. Must be
56 : /// smaller than or equal to request_lsn. This allows the Pageserver to serve an old page
57 : /// without waiting for the request LSN to arrive. If not given, the request will read at the
58 : /// request_lsn and wait for it to arrive if necessary. Valid for all request types.
59 : ///
60 : /// It is undefined behaviour to make a request such that the page was, in fact, modified
61 : /// between request_lsn and not_modified_since_lsn. The Pageserver might detect it and return an
62 : /// error, or it might return the old page version or the new page version. Setting
63 : /// not_modified_since_lsn equal to request_lsn is always safe, but can lead to unnecessary
64 : /// waiting.
65 : pub not_modified_since_lsn: Option<Lsn>,
66 : }
67 :
68 : impl Display for ReadLsn {
69 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70 0 : let req_lsn = self.request_lsn;
71 0 : if let Some(mod_lsn) = self.not_modified_since_lsn {
72 0 : write!(f, "{req_lsn}>={mod_lsn}")
73 : } else {
74 0 : req_lsn.fmt(f)
75 : }
76 0 : }
77 : }
78 :
79 : impl TryFrom<proto::ReadLsn> for ReadLsn {
80 : type Error = ProtocolError;
81 :
82 0 : fn try_from(pb: proto::ReadLsn) -> Result<Self, Self::Error> {
83 0 : if pb.request_lsn == 0 {
84 0 : return Err(ProtocolError::invalid("request_lsn", pb.request_lsn));
85 0 : }
86 0 : if pb.not_modified_since_lsn > pb.request_lsn {
87 0 : return Err(ProtocolError::invalid(
88 0 : "not_modified_since_lsn",
89 0 : pb.not_modified_since_lsn,
90 0 : ));
91 0 : }
92 0 : Ok(Self {
93 0 : request_lsn: Lsn(pb.request_lsn),
94 0 : not_modified_since_lsn: match pb.not_modified_since_lsn {
95 0 : 0 => None,
96 0 : lsn => Some(Lsn(lsn)),
97 : },
98 : })
99 0 : }
100 : }
101 :
102 : impl From<ReadLsn> for proto::ReadLsn {
103 0 : fn from(read_lsn: ReadLsn) -> Self {
104 0 : Self {
105 0 : request_lsn: read_lsn.request_lsn.0,
106 0 : not_modified_since_lsn: read_lsn.not_modified_since_lsn.unwrap_or_default().0,
107 0 : }
108 0 : }
109 : }
110 :
111 : // RelTag is defined in pageserver_api::reltag.
112 : pub type RelTag = pageserver_api::reltag::RelTag;
113 :
114 : impl TryFrom<proto::RelTag> for RelTag {
115 : type Error = ProtocolError;
116 :
117 0 : fn try_from(pb: proto::RelTag) -> Result<Self, Self::Error> {
118 0 : Ok(Self {
119 0 : spcnode: pb.spc_oid,
120 0 : dbnode: pb.db_oid,
121 0 : relnode: pb.rel_number,
122 0 : forknum: pb
123 0 : .fork_number
124 0 : .try_into()
125 0 : .map_err(|_| ProtocolError::invalid("fork_number", pb.fork_number))?,
126 : })
127 0 : }
128 : }
129 :
130 : impl From<RelTag> for proto::RelTag {
131 0 : fn from(rel_tag: RelTag) -> Self {
132 0 : Self {
133 0 : spc_oid: rel_tag.spcnode,
134 0 : db_oid: rel_tag.dbnode,
135 0 : rel_number: rel_tag.relnode,
136 0 : fork_number: rel_tag.forknum as u32,
137 0 : }
138 0 : }
139 : }
140 :
141 : /// Checks whether a relation exists, at the given LSN. Only valid on shard 0, other shards error.
142 : #[derive(Clone, Copy, Debug)]
143 : pub struct CheckRelExistsRequest {
144 : pub read_lsn: ReadLsn,
145 : pub rel: RelTag,
146 : }
147 :
148 : impl TryFrom<proto::CheckRelExistsRequest> for CheckRelExistsRequest {
149 : type Error = ProtocolError;
150 :
151 0 : fn try_from(pb: proto::CheckRelExistsRequest) -> Result<Self, Self::Error> {
152 0 : Ok(Self {
153 0 : read_lsn: pb
154 0 : .read_lsn
155 0 : .ok_or(ProtocolError::Missing("read_lsn"))?
156 0 : .try_into()?,
157 0 : rel: pb.rel.ok_or(ProtocolError::Missing("rel"))?.try_into()?,
158 : })
159 0 : }
160 : }
161 :
162 : impl From<CheckRelExistsRequest> for proto::CheckRelExistsRequest {
163 0 : fn from(request: CheckRelExistsRequest) -> Self {
164 0 : Self {
165 0 : read_lsn: Some(request.read_lsn.into()),
166 0 : rel: Some(request.rel.into()),
167 0 : }
168 0 : }
169 : }
170 :
171 : pub type CheckRelExistsResponse = bool;
172 :
173 : impl From<proto::CheckRelExistsResponse> for CheckRelExistsResponse {
174 0 : fn from(pb: proto::CheckRelExistsResponse) -> Self {
175 0 : pb.exists
176 0 : }
177 : }
178 :
179 : impl From<CheckRelExistsResponse> for proto::CheckRelExistsResponse {
180 0 : fn from(exists: CheckRelExistsResponse) -> Self {
181 0 : Self { exists }
182 0 : }
183 : }
184 :
185 : /// Requests a base backup at a given LSN.
186 : #[derive(Clone, Copy, Debug)]
187 : pub struct GetBaseBackupRequest {
188 : /// The LSN to fetch a base backup at.
189 : pub read_lsn: ReadLsn,
190 : /// If true, logical replication slots will not be created.
191 : pub replica: bool,
192 : }
193 :
194 : impl TryFrom<proto::GetBaseBackupRequest> for GetBaseBackupRequest {
195 : type Error = ProtocolError;
196 :
197 0 : fn try_from(pb: proto::GetBaseBackupRequest) -> Result<Self, Self::Error> {
198 0 : Ok(Self {
199 0 : read_lsn: pb
200 0 : .read_lsn
201 0 : .ok_or(ProtocolError::Missing("read_lsn"))?
202 0 : .try_into()?,
203 0 : replica: pb.replica,
204 : })
205 0 : }
206 : }
207 :
208 : impl From<GetBaseBackupRequest> for proto::GetBaseBackupRequest {
209 0 : fn from(request: GetBaseBackupRequest) -> Self {
210 0 : Self {
211 0 : read_lsn: Some(request.read_lsn.into()),
212 0 : replica: request.replica,
213 0 : }
214 0 : }
215 : }
216 :
217 : pub type GetBaseBackupResponseChunk = Bytes;
218 :
219 : impl TryFrom<proto::GetBaseBackupResponseChunk> for GetBaseBackupResponseChunk {
220 : type Error = ProtocolError;
221 :
222 0 : fn try_from(pb: proto::GetBaseBackupResponseChunk) -> Result<Self, Self::Error> {
223 0 : if pb.chunk.is_empty() {
224 0 : return Err(ProtocolError::Missing("chunk"));
225 0 : }
226 0 : Ok(pb.chunk)
227 0 : }
228 : }
229 :
230 : impl From<GetBaseBackupResponseChunk> for proto::GetBaseBackupResponseChunk {
231 0 : fn from(chunk: GetBaseBackupResponseChunk) -> Self {
232 0 : Self { chunk }
233 0 : }
234 : }
235 :
236 : /// Requests the size of a database, as # of bytes. Only valid on shard 0, other shards will error.
237 : #[derive(Clone, Copy, Debug)]
238 : pub struct GetDbSizeRequest {
239 : pub read_lsn: ReadLsn,
240 : pub db_oid: Oid,
241 : }
242 :
243 : impl TryFrom<proto::GetDbSizeRequest> for GetDbSizeRequest {
244 : type Error = ProtocolError;
245 :
246 0 : fn try_from(pb: proto::GetDbSizeRequest) -> Result<Self, Self::Error> {
247 0 : Ok(Self {
248 0 : read_lsn: pb
249 0 : .read_lsn
250 0 : .ok_or(ProtocolError::Missing("read_lsn"))?
251 0 : .try_into()?,
252 0 : db_oid: pb.db_oid,
253 : })
254 0 : }
255 : }
256 :
257 : impl From<GetDbSizeRequest> for proto::GetDbSizeRequest {
258 0 : fn from(request: GetDbSizeRequest) -> Self {
259 0 : Self {
260 0 : read_lsn: Some(request.read_lsn.into()),
261 0 : db_oid: request.db_oid,
262 0 : }
263 0 : }
264 : }
265 :
266 : pub type GetDbSizeResponse = u64;
267 :
268 : impl From<proto::GetDbSizeResponse> for GetDbSizeResponse {
269 0 : fn from(pb: proto::GetDbSizeResponse) -> Self {
270 0 : pb.num_bytes
271 0 : }
272 : }
273 :
274 : impl From<GetDbSizeResponse> for proto::GetDbSizeResponse {
275 0 : fn from(num_bytes: GetDbSizeResponse) -> Self {
276 0 : Self { num_bytes }
277 0 : }
278 : }
279 :
280 : /// Requests one or more pages.
281 : #[derive(Clone, Debug)]
282 : pub struct GetPageRequest {
283 : /// A request ID. Will be included in the response. Should be unique for in-flight requests on
284 : /// the stream.
285 : pub request_id: RequestID,
286 : /// The request class.
287 : pub request_class: GetPageClass,
288 : /// The LSN to read at.
289 : pub read_lsn: ReadLsn,
290 : /// The relation to read from.
291 : pub rel: RelTag,
292 : /// Page numbers to read. Must belong to the remote shard.
293 : ///
294 : /// Multiple pages will be executed as a single batch by the Pageserver, amortizing layer access
295 : /// costs and parallelizing them. This may increase the latency of any individual request, but
296 : /// improves the overall latency and throughput of the batch as a whole.
297 : pub block_numbers: Vec<u32>,
298 : }
299 :
300 : impl TryFrom<proto::GetPageRequest> for GetPageRequest {
301 : type Error = ProtocolError;
302 :
303 0 : fn try_from(pb: proto::GetPageRequest) -> Result<Self, Self::Error> {
304 0 : if pb.block_number.is_empty() {
305 0 : return Err(ProtocolError::Missing("block_number"));
306 0 : }
307 0 : Ok(Self {
308 0 : request_id: pb.request_id,
309 0 : request_class: pb.request_class.into(),
310 0 : read_lsn: pb
311 0 : .read_lsn
312 0 : .ok_or(ProtocolError::Missing("read_lsn"))?
313 0 : .try_into()?,
314 0 : rel: pb.rel.ok_or(ProtocolError::Missing("rel"))?.try_into()?,
315 0 : block_numbers: pb.block_number,
316 : })
317 0 : }
318 : }
319 :
320 : impl From<GetPageRequest> for proto::GetPageRequest {
321 0 : fn from(request: GetPageRequest) -> Self {
322 0 : Self {
323 0 : request_id: request.request_id,
324 0 : request_class: request.request_class.into(),
325 0 : read_lsn: Some(request.read_lsn.into()),
326 0 : rel: Some(request.rel.into()),
327 0 : block_number: request.block_numbers,
328 0 : }
329 0 : }
330 : }
331 :
/// Identifier of a GetPage request; the response carries the same ID back.
pub type RequestID = u64;
334 :
/// Classification of a GetPage request.
#[derive(Debug, Clone, Copy)]
pub enum GetPageClass {
    /// Class not recognized. Exists for backwards compatibility, for when an older client sends
    /// a class that a newer server has removed.
    Unknown,
    /// An ordinary request. This is the default class.
    Normal,
    /// A prefetch request. NB: can only be classified on Postgres versions before 18.
    Prefetch,
    /// A background request, such as vacuum.
    Background,
}
348 :
349 : impl From<proto::GetPageClass> for GetPageClass {
350 0 : fn from(pb: proto::GetPageClass) -> Self {
351 0 : match pb {
352 0 : proto::GetPageClass::Unknown => Self::Unknown,
353 0 : proto::GetPageClass::Normal => Self::Normal,
354 0 : proto::GetPageClass::Prefetch => Self::Prefetch,
355 0 : proto::GetPageClass::Background => Self::Background,
356 : }
357 0 : }
358 : }
359 :
360 : impl From<i32> for GetPageClass {
361 0 : fn from(class: i32) -> Self {
362 0 : proto::GetPageClass::try_from(class)
363 0 : .unwrap_or(proto::GetPageClass::Unknown)
364 0 : .into()
365 0 : }
366 : }
367 :
368 : impl From<GetPageClass> for proto::GetPageClass {
369 0 : fn from(class: GetPageClass) -> Self {
370 0 : match class {
371 0 : GetPageClass::Unknown => Self::Unknown,
372 0 : GetPageClass::Normal => Self::Normal,
373 0 : GetPageClass::Prefetch => Self::Prefetch,
374 0 : GetPageClass::Background => Self::Background,
375 : }
376 0 : }
377 : }
378 :
379 : impl From<GetPageClass> for i32 {
380 0 : fn from(class: GetPageClass) -> Self {
381 0 : proto::GetPageClass::from(class).into()
382 0 : }
383 : }
384 :
385 : /// A GetPage response.
386 : ///
387 : /// A batch response will contain all of the requested pages. We could eagerly emit individual pages
388 : /// as soon as they are ready, but on a readv() Postgres holds buffer pool locks on all pages in the
389 : /// batch and we'll only return once the entire batch is ready, so no one can make use of the
390 : /// individual pages.
391 : #[derive(Clone, Debug)]
392 : pub struct GetPageResponse {
393 : /// The original request's ID.
394 : pub request_id: RequestID,
395 : /// The response status code.
396 : pub status_code: GetPageStatusCode,
397 : /// A string describing the status, if any.
398 : pub reason: Option<String>,
399 : /// The 8KB page images, in the same order as the request. Empty if status != OK.
400 : pub page_images: Vec<Bytes>,
401 : }
402 :
403 : impl From<proto::GetPageResponse> for GetPageResponse {
404 0 : fn from(pb: proto::GetPageResponse) -> Self {
405 0 : Self {
406 0 : request_id: pb.request_id,
407 0 : status_code: pb.status_code.into(),
408 0 : reason: Some(pb.reason).filter(|r| !r.is_empty()),
409 0 : page_images: pb.page_image,
410 0 : }
411 0 : }
412 : }
413 :
414 : impl From<GetPageResponse> for proto::GetPageResponse {
415 0 : fn from(response: GetPageResponse) -> Self {
416 0 : Self {
417 0 : request_id: response.request_id,
418 0 : status_code: response.status_code.into(),
419 0 : reason: response.reason.unwrap_or_default(),
420 0 : page_image: response.page_images,
421 0 : }
422 0 : }
423 : }
424 :
/// Status code carried by a GetPage response.
///
/// These play the role of gRPC statuses. GetPage uses a bidirectional stream, which may be
/// shared by many backends, and returning a gRPC status would terminate that stream. Failures
/// are therefore reported per request as GetPageResponse messages carrying these codes.
#[derive(Debug, Clone, Copy)]
pub enum GetPageStatusCode {
    /// Status not recognized. Exists for forwards compatibility, for when an older client
    /// receives a code introduced by a newer server.
    Unknown,
    /// The request succeeded.
    Ok,
    /// The page does not exist. The tenant/timeline/shard were already validated when the stream
    /// was set up.
    NotFound,
    /// The request was malformed.
    InvalidRequest,
    /// The server hit an internal error while handling the request.
    InternalError,
    /// The tenant is being rate limited; back off and retry later.
    SlowDown,
}
447 :
448 : impl From<proto::GetPageStatusCode> for GetPageStatusCode {
449 0 : fn from(pb: proto::GetPageStatusCode) -> Self {
450 0 : match pb {
451 0 : proto::GetPageStatusCode::Unknown => Self::Unknown,
452 0 : proto::GetPageStatusCode::Ok => Self::Ok,
453 0 : proto::GetPageStatusCode::NotFound => Self::NotFound,
454 0 : proto::GetPageStatusCode::InvalidRequest => Self::InvalidRequest,
455 0 : proto::GetPageStatusCode::InternalError => Self::InternalError,
456 0 : proto::GetPageStatusCode::SlowDown => Self::SlowDown,
457 : }
458 0 : }
459 : }
460 :
461 : impl From<i32> for GetPageStatusCode {
462 0 : fn from(status_code: i32) -> Self {
463 0 : proto::GetPageStatusCode::try_from(status_code)
464 0 : .unwrap_or(proto::GetPageStatusCode::Unknown)
465 0 : .into()
466 0 : }
467 : }
468 :
469 : impl From<GetPageStatusCode> for proto::GetPageStatusCode {
470 0 : fn from(status_code: GetPageStatusCode) -> Self {
471 0 : match status_code {
472 0 : GetPageStatusCode::Unknown => Self::Unknown,
473 0 : GetPageStatusCode::Ok => Self::Ok,
474 0 : GetPageStatusCode::NotFound => Self::NotFound,
475 0 : GetPageStatusCode::InvalidRequest => Self::InvalidRequest,
476 0 : GetPageStatusCode::InternalError => Self::InternalError,
477 0 : GetPageStatusCode::SlowDown => Self::SlowDown,
478 : }
479 0 : }
480 : }
481 :
482 : impl From<GetPageStatusCode> for i32 {
483 0 : fn from(status_code: GetPageStatusCode) -> Self {
484 0 : proto::GetPageStatusCode::from(status_code).into()
485 0 : }
486 : }
487 :
488 : // Fetches the size of a relation at a given LSN, as # of blocks. Only valid on shard 0, other
489 : // shards will error.
490 : pub struct GetRelSizeRequest {
491 : pub read_lsn: ReadLsn,
492 : pub rel: RelTag,
493 : }
494 :
495 : impl TryFrom<proto::GetRelSizeRequest> for GetRelSizeRequest {
496 : type Error = ProtocolError;
497 :
498 0 : fn try_from(proto: proto::GetRelSizeRequest) -> Result<Self, Self::Error> {
499 0 : Ok(Self {
500 0 : read_lsn: proto
501 0 : .read_lsn
502 0 : .ok_or(ProtocolError::Missing("read_lsn"))?
503 0 : .try_into()?,
504 0 : rel: proto.rel.ok_or(ProtocolError::Missing("rel"))?.try_into()?,
505 : })
506 0 : }
507 : }
508 :
509 : impl From<GetRelSizeRequest> for proto::GetRelSizeRequest {
510 0 : fn from(request: GetRelSizeRequest) -> Self {
511 0 : Self {
512 0 : read_lsn: Some(request.read_lsn.into()),
513 0 : rel: Some(request.rel.into()),
514 0 : }
515 0 : }
516 : }
517 :
518 : pub type GetRelSizeResponse = u32;
519 :
520 : impl From<proto::GetRelSizeResponse> for GetRelSizeResponse {
521 0 : fn from(proto: proto::GetRelSizeResponse) -> Self {
522 0 : proto.num_blocks
523 0 : }
524 : }
525 :
526 : impl From<GetRelSizeResponse> for proto::GetRelSizeResponse {
527 0 : fn from(num_blocks: GetRelSizeResponse) -> Self {
528 0 : Self { num_blocks }
529 0 : }
530 : }
531 :
532 : /// Requests an SLRU segment. Only valid on shard 0, other shards will error.
533 : pub struct GetSlruSegmentRequest {
534 : pub read_lsn: ReadLsn,
535 : pub kind: SlruKind,
536 : pub segno: u32,
537 : }
538 :
539 : impl TryFrom<proto::GetSlruSegmentRequest> for GetSlruSegmentRequest {
540 : type Error = ProtocolError;
541 :
542 0 : fn try_from(pb: proto::GetSlruSegmentRequest) -> Result<Self, Self::Error> {
543 0 : Ok(Self {
544 0 : read_lsn: pb
545 0 : .read_lsn
546 0 : .ok_or(ProtocolError::Missing("read_lsn"))?
547 0 : .try_into()?,
548 0 : kind: u8::try_from(pb.kind)
549 0 : .ok()
550 0 : .and_then(SlruKind::from_repr)
551 0 : .ok_or_else(|| ProtocolError::invalid("slru_kind", pb.kind))?,
552 0 : segno: pb.segno,
553 : })
554 0 : }
555 : }
556 :
557 : impl From<GetSlruSegmentRequest> for proto::GetSlruSegmentRequest {
558 0 : fn from(request: GetSlruSegmentRequest) -> Self {
559 0 : Self {
560 0 : read_lsn: Some(request.read_lsn.into()),
561 0 : kind: request.kind as u32,
562 0 : segno: request.segno,
563 0 : }
564 0 : }
565 : }
566 :
567 : pub type GetSlruSegmentResponse = Bytes;
568 :
569 : impl TryFrom<proto::GetSlruSegmentResponse> for GetSlruSegmentResponse {
570 : type Error = ProtocolError;
571 :
572 0 : fn try_from(pb: proto::GetSlruSegmentResponse) -> Result<Self, Self::Error> {
573 0 : if pb.segment.is_empty() {
574 0 : return Err(ProtocolError::Missing("segment"));
575 0 : }
576 0 : Ok(pb.segment)
577 0 : }
578 : }
579 :
580 : impl From<GetSlruSegmentResponse> for proto::GetSlruSegmentResponse {
581 0 : fn from(segment: GetSlruSegmentResponse) -> Self {
582 0 : Self { segment }
583 0 : }
584 : }
585 :
586 : // SlruKind is defined in pageserver_api::reltag.
587 : pub type SlruKind = pageserver_api::reltag::SlruKind;
|