Line data Source code
1 : //! Structs representing the canonical page service API.
2 : //!
3 : //! These mirror the autogenerated Protobuf types. The differences are:
4 : //!
5 : //! - Types that are in fact required by the API are not Options. The protobuf "required"
6 : //! attribute is deprecated and 'prost' marks a lot of members as optional because of that.
7 : //! (See <https://github.com/tokio-rs/prost/issues/800> for a gripe on this)
8 : //!
9 : //! - Use more precise datatypes, e.g. Lsn and uints shorter than 32 bits.
10 : //!
11 : //! - Validate protocol invariants, via try_from() and try_into().
12 : //!
13 : //! Validation only happens on the receiver side, i.e. when converting from Protobuf to domain
14 : //! types. This is where it matters -- the Protobuf types are less strict than the domain types, and
15 : //! receivers should expect all sorts of junk from senders. This also allows the sender to use e.g.
16 : //! stream combinators without dealing with errors, and avoids validating the same message twice.
17 :
18 : use std::fmt::Display;
19 : use std::time::{Duration, SystemTime, UNIX_EPOCH};
20 :
21 : use bytes::Bytes;
22 : use postgres_ffi_types::Oid;
23 : // TODO: split out Lsn, RelTag, SlruKind and other basic types to a separate crate, to avoid
24 : // pulling in all of their other crate dependencies when building the client.
25 : use utils::lsn::Lsn;
26 :
27 : use crate::proto;
28 :
/// A protocol error. Typically returned via try_from() or try_into().
#[derive(thiserror::Error, Clone, Debug)]
pub enum ProtocolError {
    /// A field was present but its value was rejected. Carries the field name and a rendering of
    /// the offending value.
    #[error("field '{0}' has invalid value '{1}'")]
    Invalid(&'static str, String),
    /// A required field was absent or empty. Carries the field name.
    #[error("required field '{0}' is missing")]
    Missing(&'static str),
}
37 :
38 : impl ProtocolError {
39 : /// Helper to generate a new ProtocolError::Invalid for the given field and value.
40 0 : pub fn invalid(field: &'static str, value: impl std::fmt::Debug) -> Self {
41 0 : Self::Invalid(field, format!("{value:?}"))
42 0 : }
43 : }
44 :
45 : impl From<ProtocolError> for tonic::Status {
46 0 : fn from(err: ProtocolError) -> Self {
47 0 : tonic::Status::invalid_argument(format!("{err}"))
48 0 : }
49 : }
50 :
/// The LSN a request should read at.
///
/// On the wire, an unset not_modified_since_lsn is encoded as 0, and a request_lsn of 0 is
/// rejected when converting from the Protobuf type.
#[derive(Clone, Copy, Debug)]
pub struct ReadLsn {
    /// The request's read LSN.
    pub request_lsn: Lsn,
    /// If given, the caller guarantees that the page has not been modified since this LSN. Must be
    /// smaller than or equal to request_lsn. This allows the Pageserver to serve an old page
    /// without waiting for the request LSN to arrive. If not given, the request will read at the
    /// request_lsn and wait for it to arrive if necessary. Valid for all request types.
    ///
    /// It is undefined behaviour to make a request such that the page was, in fact, modified
    /// between request_lsn and not_modified_since_lsn. The Pageserver might detect it and return an
    /// error, or it might return the old page version or the new page version. Setting
    /// not_modified_since_lsn equal to request_lsn is always safe, but can lead to unnecessary
    /// waiting.
    pub not_modified_since_lsn: Option<Lsn>,
}
68 :
69 : impl Display for ReadLsn {
70 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 0 : let req_lsn = self.request_lsn;
72 0 : if let Some(mod_lsn) = self.not_modified_since_lsn {
73 0 : write!(f, "{req_lsn}>={mod_lsn}")
74 : } else {
75 0 : req_lsn.fmt(f)
76 : }
77 0 : }
78 : }
79 :
80 : impl TryFrom<proto::ReadLsn> for ReadLsn {
81 : type Error = ProtocolError;
82 :
83 0 : fn try_from(pb: proto::ReadLsn) -> Result<Self, Self::Error> {
84 0 : if pb.request_lsn == 0 {
85 0 : return Err(ProtocolError::invalid("request_lsn", pb.request_lsn));
86 0 : }
87 0 : if pb.not_modified_since_lsn > pb.request_lsn {
88 0 : return Err(ProtocolError::invalid(
89 0 : "not_modified_since_lsn",
90 0 : pb.not_modified_since_lsn,
91 0 : ));
92 0 : }
93 : Ok(Self {
94 0 : request_lsn: Lsn(pb.request_lsn),
95 0 : not_modified_since_lsn: match pb.not_modified_since_lsn {
96 0 : 0 => None,
97 0 : lsn => Some(Lsn(lsn)),
98 : },
99 : })
100 0 : }
101 : }
102 :
103 : impl From<ReadLsn> for proto::ReadLsn {
104 0 : fn from(read_lsn: ReadLsn) -> Self {
105 0 : Self {
106 0 : request_lsn: read_lsn.request_lsn.0,
107 0 : not_modified_since_lsn: read_lsn.not_modified_since_lsn.unwrap_or_default().0,
108 0 : }
109 0 : }
110 : }
111 :
// RelTag is defined in pageserver_api::reltag. It is aliased here so that the Protobuf
// conversions below can be implemented next to the other domain types.
pub type RelTag = pageserver_api::reltag::RelTag;
114 :
115 : impl TryFrom<proto::RelTag> for RelTag {
116 : type Error = ProtocolError;
117 :
118 0 : fn try_from(pb: proto::RelTag) -> Result<Self, Self::Error> {
119 : Ok(Self {
120 0 : spcnode: pb.spc_oid,
121 0 : dbnode: pb.db_oid,
122 0 : relnode: pb.rel_number,
123 0 : forknum: pb
124 0 : .fork_number
125 0 : .try_into()
126 0 : .map_err(|_| ProtocolError::invalid("fork_number", pb.fork_number))?,
127 : })
128 0 : }
129 : }
130 :
131 : impl From<RelTag> for proto::RelTag {
132 0 : fn from(rel_tag: RelTag) -> Self {
133 0 : Self {
134 0 : spc_oid: rel_tag.spcnode,
135 0 : db_oid: rel_tag.dbnode,
136 0 : rel_number: rel_tag.relnode,
137 0 : fork_number: rel_tag.forknum as u32,
138 0 : }
139 0 : }
140 : }
141 :
/// Checks whether a relation exists, at the given LSN. Only valid on shard 0, other shards error.
#[derive(Clone, Copy, Debug)]
pub struct CheckRelExistsRequest {
    /// The LSN to read at.
    pub read_lsn: ReadLsn,
    /// The relation to check.
    pub rel: RelTag,
}
148 :
149 : impl TryFrom<proto::CheckRelExistsRequest> for CheckRelExistsRequest {
150 : type Error = ProtocolError;
151 :
152 0 : fn try_from(pb: proto::CheckRelExistsRequest) -> Result<Self, Self::Error> {
153 : Ok(Self {
154 0 : read_lsn: pb
155 0 : .read_lsn
156 0 : .ok_or(ProtocolError::Missing("read_lsn"))?
157 0 : .try_into()?,
158 0 : rel: pb.rel.ok_or(ProtocolError::Missing("rel"))?.try_into()?,
159 : })
160 0 : }
161 : }
162 :
163 : impl From<CheckRelExistsRequest> for proto::CheckRelExistsRequest {
164 0 : fn from(request: CheckRelExistsRequest) -> Self {
165 0 : Self {
166 0 : read_lsn: Some(request.read_lsn.into()),
167 0 : rel: Some(request.rel.into()),
168 0 : }
169 0 : }
170 : }
171 :
/// Whether the relation exists.
pub type CheckRelExistsResponse = bool;
173 :
174 : impl From<proto::CheckRelExistsResponse> for CheckRelExistsResponse {
175 0 : fn from(pb: proto::CheckRelExistsResponse) -> Self {
176 0 : pb.exists
177 0 : }
178 : }
179 :
180 : impl From<CheckRelExistsResponse> for proto::CheckRelExistsResponse {
181 0 : fn from(exists: CheckRelExistsResponse) -> Self {
182 0 : Self { exists }
183 0 : }
184 : }
185 :
/// Requests a base backup.
#[derive(Clone, Copy, Debug)]
pub struct GetBaseBackupRequest {
    /// The LSN to fetch a base backup at. If None, uses the latest LSN known to the Pageserver.
    /// None is encoded as 0 on the wire.
    pub lsn: Option<Lsn>,
    /// If true, logical replication slots will not be created.
    pub replica: bool,
    /// If true, include relation files in the base backup. Mainly for debugging and tests.
    pub full: bool,
    /// Compression algorithm to use. Base backups send a compressed payload instead of using gRPC
    /// compression, so that we can cache compressed backups on the server.
    pub compression: BaseBackupCompression,
}
199 :
200 : impl TryFrom<proto::GetBaseBackupRequest> for GetBaseBackupRequest {
201 : type Error = ProtocolError;
202 :
203 0 : fn try_from(pb: proto::GetBaseBackupRequest) -> Result<Self, Self::Error> {
204 : Ok(Self {
205 0 : lsn: (pb.lsn != 0).then_some(Lsn(pb.lsn)),
206 0 : replica: pb.replica,
207 0 : full: pb.full,
208 0 : compression: pb.compression.try_into()?,
209 : })
210 0 : }
211 : }
212 :
213 : impl From<GetBaseBackupRequest> for proto::GetBaseBackupRequest {
214 0 : fn from(request: GetBaseBackupRequest) -> Self {
215 0 : Self {
216 0 : lsn: request.lsn.unwrap_or_default().0,
217 0 : replica: request.replica,
218 0 : full: request.full,
219 0 : compression: request.compression.into(),
220 0 : }
221 0 : }
222 : }
223 :
/// Base backup compression algorithm.
///
/// Unlike the Protobuf enum there is no Unknown variant: it is rejected during conversion.
#[derive(Clone, Copy, Debug)]
pub enum BaseBackupCompression {
    /// No compression.
    None,
    /// Gzip compression.
    Gzip,
}
230 :
231 : impl TryFrom<proto::BaseBackupCompression> for BaseBackupCompression {
232 : type Error = ProtocolError;
233 :
234 0 : fn try_from(pb: proto::BaseBackupCompression) -> Result<Self, Self::Error> {
235 0 : match pb {
236 0 : proto::BaseBackupCompression::Unknown => Err(ProtocolError::invalid("compression", pb)),
237 0 : proto::BaseBackupCompression::None => Ok(Self::None),
238 0 : proto::BaseBackupCompression::Gzip => Ok(Self::Gzip),
239 : }
240 0 : }
241 : }
242 :
243 : impl TryFrom<i32> for BaseBackupCompression {
244 : type Error = ProtocolError;
245 :
246 0 : fn try_from(compression: i32) -> Result<Self, Self::Error> {
247 0 : proto::BaseBackupCompression::try_from(compression)
248 0 : .map_err(|_| ProtocolError::invalid("compression", compression))
249 0 : .and_then(Self::try_from)
250 0 : }
251 : }
252 :
253 : impl From<BaseBackupCompression> for proto::BaseBackupCompression {
254 0 : fn from(compression: BaseBackupCompression) -> Self {
255 0 : match compression {
256 0 : BaseBackupCompression::None => Self::None,
257 0 : BaseBackupCompression::Gzip => Self::Gzip,
258 : }
259 0 : }
260 : }
261 :
262 : impl From<BaseBackupCompression> for i32 {
263 0 : fn from(compression: BaseBackupCompression) -> Self {
264 0 : proto::BaseBackupCompression::from(compression).into()
265 0 : }
266 : }
267 :
/// The bytes of one base backup response chunk. Empty chunks are rejected on receipt.
pub type GetBaseBackupResponseChunk = Bytes;
269 :
270 : impl TryFrom<proto::GetBaseBackupResponseChunk> for GetBaseBackupResponseChunk {
271 : type Error = ProtocolError;
272 :
273 0 : fn try_from(pb: proto::GetBaseBackupResponseChunk) -> Result<Self, Self::Error> {
274 0 : if pb.chunk.is_empty() {
275 0 : return Err(ProtocolError::Missing("chunk"));
276 0 : }
277 0 : Ok(pb.chunk)
278 0 : }
279 : }
280 :
281 : impl From<GetBaseBackupResponseChunk> for proto::GetBaseBackupResponseChunk {
282 0 : fn from(chunk: GetBaseBackupResponseChunk) -> Self {
283 0 : Self { chunk }
284 0 : }
285 : }
286 :
/// Requests the size of a database, as # of bytes. Only valid on shard 0, other shards will error.
#[derive(Clone, Copy, Debug)]
pub struct GetDbSizeRequest {
    /// The LSN to read at.
    pub read_lsn: ReadLsn,
    /// The OID of the database to measure.
    pub db_oid: Oid,
}
293 :
294 : impl TryFrom<proto::GetDbSizeRequest> for GetDbSizeRequest {
295 : type Error = ProtocolError;
296 :
297 0 : fn try_from(pb: proto::GetDbSizeRequest) -> Result<Self, Self::Error> {
298 : Ok(Self {
299 0 : read_lsn: pb
300 0 : .read_lsn
301 0 : .ok_or(ProtocolError::Missing("read_lsn"))?
302 0 : .try_into()?,
303 0 : db_oid: pb.db_oid,
304 : })
305 0 : }
306 : }
307 :
308 : impl From<GetDbSizeRequest> for proto::GetDbSizeRequest {
309 0 : fn from(request: GetDbSizeRequest) -> Self {
310 0 : Self {
311 0 : read_lsn: Some(request.read_lsn.into()),
312 0 : db_oid: request.db_oid,
313 0 : }
314 0 : }
315 : }
316 :
/// The database size, as # of bytes.
pub type GetDbSizeResponse = u64;
318 :
319 : impl From<proto::GetDbSizeResponse> for GetDbSizeResponse {
320 0 : fn from(pb: proto::GetDbSizeResponse) -> Self {
321 0 : pb.num_bytes
322 0 : }
323 : }
324 :
325 : impl From<GetDbSizeResponse> for proto::GetDbSizeResponse {
326 0 : fn from(num_bytes: GetDbSizeResponse) -> Self {
327 0 : Self { num_bytes }
328 0 : }
329 : }
330 :
/// Requests one or more pages.
#[derive(Clone, Debug)]
pub struct GetPageRequest {
    /// A request ID. Will be included in the response. Should be unique for in-flight requests on
    /// the stream.
    pub request_id: RequestID,
    /// The request class.
    pub request_class: GetPageClass,
    /// The LSN to read at.
    pub read_lsn: ReadLsn,
    /// The relation to read from.
    pub rel: RelTag,
    /// Page numbers to read. Must belong to the remote shard. Must not be empty: a request without
    /// any block numbers is rejected when converting from the Protobuf type.
    ///
    /// Multiple pages will be executed as a single batch by the Pageserver, amortizing layer access
    /// costs and parallelizing them. This may increase the latency of any individual request, but
    /// improves the overall latency and throughput of the batch as a whole.
    pub block_numbers: Vec<u32>,
}
350 :
351 : impl TryFrom<proto::GetPageRequest> for GetPageRequest {
352 : type Error = ProtocolError;
353 :
354 0 : fn try_from(pb: proto::GetPageRequest) -> Result<Self, Self::Error> {
355 0 : if pb.block_number.is_empty() {
356 0 : return Err(ProtocolError::Missing("block_number"));
357 0 : }
358 : Ok(Self {
359 0 : request_id: pb
360 0 : .request_id
361 0 : .ok_or(ProtocolError::Missing("request_id"))?
362 0 : .into(),
363 0 : request_class: pb.request_class.into(),
364 0 : read_lsn: pb
365 0 : .read_lsn
366 0 : .ok_or(ProtocolError::Missing("read_lsn"))?
367 0 : .try_into()?,
368 0 : rel: pb.rel.ok_or(ProtocolError::Missing("rel"))?.try_into()?,
369 0 : block_numbers: pb.block_number,
370 : })
371 0 : }
372 : }
373 :
374 : impl From<GetPageRequest> for proto::GetPageRequest {
375 0 : fn from(request: GetPageRequest) -> Self {
376 0 : Self {
377 0 : request_id: Some(request.request_id.into()),
378 0 : request_class: request.request_class.into(),
379 0 : read_lsn: Some(request.read_lsn.into()),
380 0 : rel: Some(request.rel.into()),
381 0 : block_number: request.block_numbers,
382 0 : }
383 0 : }
384 : }
385 :
/// A GetPage request ID and retry attempt. Should be unique for in-flight requests on a stream.
///
/// Displayed as `<id>.<attempt>`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct RequestID {
    /// The base request ID.
    pub id: u64,
    /// The request attempt. Starts at 0, incremented on each retry.
    pub attempt: u32,
}
394 :
395 : impl RequestID {
396 : /// Creates a new RequestID with the given ID and an initial attempt of 0.
397 0 : pub fn new(id: u64) -> Self {
398 0 : Self { id, attempt: 0 }
399 0 : }
400 : }
401 :
402 : impl Display for RequestID {
403 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
404 0 : write!(f, "{}.{}", self.id, self.attempt)
405 0 : }
406 : }
407 :
408 : impl From<proto::RequestId> for RequestID {
409 0 : fn from(pb: proto::RequestId) -> Self {
410 0 : Self {
411 0 : id: pb.id,
412 0 : attempt: pb.attempt,
413 0 : }
414 0 : }
415 : }
416 :
417 : impl From<u64> for RequestID {
418 0 : fn from(id: u64) -> Self {
419 0 : Self::new(id)
420 0 : }
421 : }
422 :
423 : impl From<RequestID> for proto::RequestId {
424 0 : fn from(request_id: RequestID) -> Self {
425 0 : Self {
426 0 : id: request_id.id,
427 0 : attempt: request_id.attempt,
428 0 : }
429 0 : }
430 : }
431 :
/// A GetPage request class.
///
/// Raw Protobuf values that don't name a known class are converted to Unknown rather than
/// rejected.
#[derive(Clone, Copy, Debug, strum_macros::Display)]
pub enum GetPageClass {
    /// Unknown class. For backwards compatibility: used when an older client version sends a class
    /// that a newer server version has removed.
    Unknown,
    /// A normal request. This is the default.
    Normal,
    /// A prefetch request. NB: can only be classified on pg < 18.
    Prefetch,
    /// A background request (e.g. vacuum).
    Background,
}
445 :
446 : impl GetPageClass {
447 : /// Returns true if this is considered a bulk request (i.e. more throughput-oriented rather than
448 : /// latency-sensitive).
449 0 : pub fn is_bulk(&self) -> bool {
450 0 : match self {
451 0 : Self::Unknown => false,
452 0 : Self::Normal => false,
453 0 : Self::Prefetch => true,
454 0 : Self::Background => true,
455 : }
456 0 : }
457 : }
458 :
459 : impl From<proto::GetPageClass> for GetPageClass {
460 0 : fn from(pb: proto::GetPageClass) -> Self {
461 0 : match pb {
462 0 : proto::GetPageClass::Unknown => Self::Unknown,
463 0 : proto::GetPageClass::Normal => Self::Normal,
464 0 : proto::GetPageClass::Prefetch => Self::Prefetch,
465 0 : proto::GetPageClass::Background => Self::Background,
466 : }
467 0 : }
468 : }
469 :
470 : impl From<i32> for GetPageClass {
471 0 : fn from(class: i32) -> Self {
472 0 : proto::GetPageClass::try_from(class)
473 0 : .unwrap_or(proto::GetPageClass::Unknown)
474 0 : .into()
475 0 : }
476 : }
477 :
478 : impl From<GetPageClass> for proto::GetPageClass {
479 0 : fn from(class: GetPageClass) -> Self {
480 0 : match class {
481 0 : GetPageClass::Unknown => Self::Unknown,
482 0 : GetPageClass::Normal => Self::Normal,
483 0 : GetPageClass::Prefetch => Self::Prefetch,
484 0 : GetPageClass::Background => Self::Background,
485 : }
486 0 : }
487 : }
488 :
489 : impl From<GetPageClass> for i32 {
490 0 : fn from(class: GetPageClass) -> Self {
491 0 : proto::GetPageClass::from(class).into()
492 0 : }
493 : }
494 :
/// A GetPage response.
///
/// A batch response will contain all of the requested pages. We could eagerly emit individual pages
/// as soon as they are ready, but on a readv() Postgres holds buffer pool locks on all pages in the
/// batch and we'll only return once the entire batch is ready, so no one can make use of the
/// individual pages.
#[derive(Clone, Debug)]
pub struct GetPageResponse {
    /// The original request's ID.
    pub request_id: RequestID,
    /// The response status code. If not OK, the `rel` and `pages` fields will be empty.
    pub status_code: GetPageStatusCode,
    /// A string describing the status, if any. Sent as an empty string on the wire when None.
    pub reason: Option<String>,
    /// The relation that the pages belong to.
    pub rel: RelTag,
    /// The page(s), in the same order as the request.
    pub pages: Vec<Page>,
}
514 :
515 : impl TryFrom<proto::GetPageResponse> for GetPageResponse {
516 : type Error = ProtocolError;
517 :
518 0 : fn try_from(pb: proto::GetPageResponse) -> Result<Self, ProtocolError> {
519 : Ok(Self {
520 0 : request_id: pb
521 0 : .request_id
522 0 : .ok_or(ProtocolError::Missing("request_id"))?
523 0 : .into(),
524 0 : status_code: pb.status_code.into(),
525 0 : reason: Some(pb.reason).filter(|r| !r.is_empty()),
526 0 : rel: pb.rel.ok_or(ProtocolError::Missing("rel"))?.try_into()?,
527 0 : pages: pb.page.into_iter().map(Page::from).collect(),
528 : })
529 0 : }
530 : }
531 :
532 : impl From<GetPageResponse> for proto::GetPageResponse {
533 0 : fn from(response: GetPageResponse) -> Self {
534 0 : Self {
535 0 : request_id: Some(response.request_id.into()),
536 0 : status_code: response.status_code.into(),
537 0 : reason: response.reason.unwrap_or_default(),
538 0 : rel: Some(response.rel.into()),
539 0 : page: response.pages.into_iter().map(proto::Page::from).collect(),
540 0 : }
541 0 : }
542 : }
543 :
544 : impl GetPageResponse {
545 : /// Attempts to represent a tonic::Status as a GetPageResponse if appropriate. Returning a
546 : /// tonic::Status will terminate the GetPage stream, so per-request errors are emitted as a
547 : /// GetPageResponse with a non-OK status code instead.
548 : #[allow(clippy::result_large_err)]
549 0 : pub fn try_from_status(
550 0 : status: tonic::Status,
551 0 : request_id: RequestID,
552 0 : ) -> Result<Self, tonic::Status> {
553 : // We shouldn't see an OK status here, because we're emitting an error.
554 0 : debug_assert_ne!(status.code(), tonic::Code::Ok);
555 0 : if status.code() == tonic::Code::Ok {
556 0 : return Err(tonic::Status::internal(format!(
557 0 : "unexpected OK status: {status:?}",
558 0 : )));
559 0 : }
560 :
561 : // If we can't convert the tonic::Code to a GetPageStatusCode, this is not a per-request
562 : // error and we should return a tonic::Status to terminate the stream.
563 0 : let Ok(status_code) = status.code().try_into() else {
564 0 : return Err(status);
565 : };
566 :
567 : // Return a GetPageResponse for the status.
568 0 : Ok(Self {
569 0 : request_id,
570 0 : status_code,
571 0 : reason: Some(status.message().to_string()),
572 0 : rel: RelTag::default(),
573 0 : pages: Vec::new(),
574 0 : })
575 0 : }
576 : }
577 :
/// A page.
#[derive(Clone, Debug)]
pub struct Page {
    /// The page number.
    pub block_number: u32,
    /// The materialized page image, as an 8KB byte vector.
    pub image: Bytes,
}
586 :
587 : impl From<proto::Page> for Page {
588 0 : fn from(pb: proto::Page) -> Self {
589 0 : Self {
590 0 : block_number: pb.block_number,
591 0 : image: pb.image,
592 0 : }
593 0 : }
594 : }
595 :
596 : impl From<Page> for proto::Page {
597 0 : fn from(page: Page) -> Self {
598 0 : Self {
599 0 : block_number: page.block_number,
600 0 : image: page.image,
601 0 : }
602 0 : }
603 : }
604 :
/// A GetPage response status code.
///
/// These are effectively equivalent to gRPC statuses. However, we use a bidirectional stream
/// (potentially shared by many backends), and a gRPC status response would terminate the stream so
/// we send GetPageResponse messages with these codes instead.
///
/// Raw Protobuf values that don't name a known status are converted to Unknown rather than
/// rejected.
#[derive(Clone, Copy, Debug, PartialEq, strum_macros::Display)]
pub enum GetPageStatusCode {
    /// Unknown status. For forwards compatibility: used when an older client version receives a new
    /// status code from a newer server version.
    Unknown,
    /// The request was successful.
    Ok,
    /// The page did not exist. The tenant/timeline/shard has already been validated during stream
    /// setup.
    NotFound,
    /// The request was invalid.
    InvalidRequest,
    /// The request failed due to an internal server error.
    InternalError,
    /// The tenant is rate limited. Slow down and retry later.
    SlowDown,
}
627 :
628 : impl From<proto::GetPageStatusCode> for GetPageStatusCode {
629 0 : fn from(pb: proto::GetPageStatusCode) -> Self {
630 0 : match pb {
631 0 : proto::GetPageStatusCode::Unknown => Self::Unknown,
632 0 : proto::GetPageStatusCode::Ok => Self::Ok,
633 0 : proto::GetPageStatusCode::NotFound => Self::NotFound,
634 0 : proto::GetPageStatusCode::InvalidRequest => Self::InvalidRequest,
635 0 : proto::GetPageStatusCode::InternalError => Self::InternalError,
636 0 : proto::GetPageStatusCode::SlowDown => Self::SlowDown,
637 : }
638 0 : }
639 : }
640 :
641 : impl From<i32> for GetPageStatusCode {
642 0 : fn from(status_code: i32) -> Self {
643 0 : proto::GetPageStatusCode::try_from(status_code)
644 0 : .unwrap_or(proto::GetPageStatusCode::Unknown)
645 0 : .into()
646 0 : }
647 : }
648 :
649 : impl From<GetPageStatusCode> for proto::GetPageStatusCode {
650 0 : fn from(status_code: GetPageStatusCode) -> Self {
651 0 : match status_code {
652 0 : GetPageStatusCode::Unknown => Self::Unknown,
653 0 : GetPageStatusCode::Ok => Self::Ok,
654 0 : GetPageStatusCode::NotFound => Self::NotFound,
655 0 : GetPageStatusCode::InvalidRequest => Self::InvalidRequest,
656 0 : GetPageStatusCode::InternalError => Self::InternalError,
657 0 : GetPageStatusCode::SlowDown => Self::SlowDown,
658 : }
659 0 : }
660 : }
661 :
662 : impl From<GetPageStatusCode> for i32 {
663 0 : fn from(status_code: GetPageStatusCode) -> Self {
664 0 : proto::GetPageStatusCode::from(status_code).into()
665 0 : }
666 : }
667 :
668 : impl TryFrom<tonic::Code> for GetPageStatusCode {
669 : type Error = tonic::Code;
670 :
671 0 : fn try_from(code: tonic::Code) -> Result<Self, Self::Error> {
672 : use tonic::Code;
673 :
674 0 : let status_code = match code {
675 0 : Code::Ok => Self::Ok,
676 :
677 : // These are per-request errors, which should be returned as GetPageResponses.
678 0 : Code::AlreadyExists => Self::InvalidRequest,
679 0 : Code::DataLoss => Self::InternalError,
680 0 : Code::FailedPrecondition => Self::InvalidRequest,
681 0 : Code::InvalidArgument => Self::InvalidRequest,
682 0 : Code::Internal => Self::InternalError,
683 0 : Code::NotFound => Self::NotFound,
684 0 : Code::OutOfRange => Self::InvalidRequest,
685 0 : Code::ResourceExhausted => Self::SlowDown,
686 :
687 : // These should terminate the stream by returning a tonic::Status.
688 : Code::Aborted
689 : | Code::Cancelled
690 : | Code::DeadlineExceeded
691 : | Code::PermissionDenied
692 : | Code::Unauthenticated
693 : | Code::Unavailable
694 : | Code::Unimplemented
695 0 : | Code::Unknown => return Err(code),
696 : };
697 0 : Ok(status_code)
698 0 : }
699 : }
700 :
701 : impl From<GetPageStatusCode> for tonic::Code {
702 0 : fn from(status_code: GetPageStatusCode) -> Self {
703 : use tonic::Code;
704 :
705 0 : match status_code {
706 0 : GetPageStatusCode::Unknown => Code::Unknown,
707 0 : GetPageStatusCode::Ok => Code::Ok,
708 0 : GetPageStatusCode::NotFound => Code::NotFound,
709 0 : GetPageStatusCode::InvalidRequest => Code::InvalidArgument,
710 0 : GetPageStatusCode::InternalError => Code::Internal,
711 0 : GetPageStatusCode::SlowDown => Code::ResourceExhausted,
712 : }
713 0 : }
714 : }
715 :
/// Fetches the size of a relation at a given LSN, as # of blocks. Only valid on shard 0, other
/// shards will error.
#[derive(Clone, Copy, Debug)]
pub struct GetRelSizeRequest {
    /// The LSN to read at.
    pub read_lsn: ReadLsn,
    /// The relation to measure.
    pub rel: RelTag,
}
723 :
724 : impl TryFrom<proto::GetRelSizeRequest> for GetRelSizeRequest {
725 : type Error = ProtocolError;
726 :
727 0 : fn try_from(proto: proto::GetRelSizeRequest) -> Result<Self, Self::Error> {
728 : Ok(Self {
729 0 : read_lsn: proto
730 0 : .read_lsn
731 0 : .ok_or(ProtocolError::Missing("read_lsn"))?
732 0 : .try_into()?,
733 0 : rel: proto.rel.ok_or(ProtocolError::Missing("rel"))?.try_into()?,
734 : })
735 0 : }
736 : }
737 :
738 : impl From<GetRelSizeRequest> for proto::GetRelSizeRequest {
739 0 : fn from(request: GetRelSizeRequest) -> Self {
740 0 : Self {
741 0 : read_lsn: Some(request.read_lsn.into()),
742 0 : rel: Some(request.rel.into()),
743 0 : }
744 0 : }
745 : }
746 :
/// The relation size, as # of blocks.
pub type GetRelSizeResponse = u32;
748 :
749 : impl From<proto::GetRelSizeResponse> for GetRelSizeResponse {
750 0 : fn from(proto: proto::GetRelSizeResponse) -> Self {
751 0 : proto.num_blocks
752 0 : }
753 : }
754 :
755 : impl From<GetRelSizeResponse> for proto::GetRelSizeResponse {
756 0 : fn from(num_blocks: GetRelSizeResponse) -> Self {
757 0 : Self { num_blocks }
758 0 : }
759 : }
760 :
/// Requests an SLRU segment. Only valid on shard 0, other shards will error.
#[derive(Clone, Copy, Debug)]
pub struct GetSlruSegmentRequest {
    /// The LSN to read at.
    pub read_lsn: ReadLsn,
    /// The kind of SLRU to read from.
    pub kind: SlruKind,
    /// The segment number to fetch.
    pub segno: u32,
}
768 :
769 : impl TryFrom<proto::GetSlruSegmentRequest> for GetSlruSegmentRequest {
770 : type Error = ProtocolError;
771 :
772 0 : fn try_from(pb: proto::GetSlruSegmentRequest) -> Result<Self, Self::Error> {
773 : Ok(Self {
774 0 : read_lsn: pb
775 0 : .read_lsn
776 0 : .ok_or(ProtocolError::Missing("read_lsn"))?
777 0 : .try_into()?,
778 0 : kind: u8::try_from(pb.kind)
779 0 : .ok()
780 0 : .and_then(SlruKind::from_repr)
781 0 : .ok_or_else(|| ProtocolError::invalid("slru_kind", pb.kind))?,
782 0 : segno: pb.segno,
783 : })
784 0 : }
785 : }
786 :
787 : impl From<GetSlruSegmentRequest> for proto::GetSlruSegmentRequest {
788 0 : fn from(request: GetSlruSegmentRequest) -> Self {
789 0 : Self {
790 0 : read_lsn: Some(request.read_lsn.into()),
791 0 : kind: request.kind as u32,
792 0 : segno: request.segno,
793 0 : }
794 0 : }
795 : }
796 :
/// The bytes of the requested SLRU segment. Empty segments are rejected on receipt.
pub type GetSlruSegmentResponse = Bytes;
798 :
799 : impl TryFrom<proto::GetSlruSegmentResponse> for GetSlruSegmentResponse {
800 : type Error = ProtocolError;
801 :
802 0 : fn try_from(pb: proto::GetSlruSegmentResponse) -> Result<Self, Self::Error> {
803 0 : if pb.segment.is_empty() {
804 0 : return Err(ProtocolError::Missing("segment"));
805 0 : }
806 0 : Ok(pb.segment)
807 0 : }
808 : }
809 :
810 : impl From<GetSlruSegmentResponse> for proto::GetSlruSegmentResponse {
811 0 : fn from(segment: GetSlruSegmentResponse) -> Self {
812 0 : Self { segment }
813 0 : }
814 : }
815 :
// SlruKind is defined in pageserver_api::reltag. On the wire it is sent as its integer
// representation.
pub type SlruKind = pageserver_api::reltag::SlruKind;
818 :
819 : /// Acquires or extends a lease on the given LSN. This guarantees that the Pageserver won't garbage
820 : /// collect the LSN until the lease expires.
821 : pub struct LeaseLsnRequest {
822 : /// The LSN to lease.
823 : pub lsn: Lsn,
824 : }
825 :
826 : impl TryFrom<proto::LeaseLsnRequest> for LeaseLsnRequest {
827 : type Error = ProtocolError;
828 :
829 0 : fn try_from(pb: proto::LeaseLsnRequest) -> Result<Self, Self::Error> {
830 0 : if pb.lsn == 0 {
831 0 : return Err(ProtocolError::Missing("lsn"));
832 0 : }
833 0 : Ok(Self { lsn: Lsn(pb.lsn) })
834 0 : }
835 : }
836 :
837 : impl From<LeaseLsnRequest> for proto::LeaseLsnRequest {
838 0 : fn from(request: LeaseLsnRequest) -> Self {
839 0 : Self { lsn: request.lsn.0 }
840 0 : }
841 : }
842 :
/// Lease expiration time. If the lease could not be granted because the LSN has already been
/// garbage collected, a FailedPrecondition status will be returned instead.
///
/// Sent on the wire as a Protobuf timestamp, i.e. seconds and nanoseconds since the Unix epoch.
pub type LeaseLsnResponse = SystemTime;
846 :
847 : impl TryFrom<proto::LeaseLsnResponse> for LeaseLsnResponse {
848 : type Error = ProtocolError;
849 :
850 0 : fn try_from(pb: proto::LeaseLsnResponse) -> Result<Self, Self::Error> {
851 0 : let expires = pb.expires.ok_or(ProtocolError::Missing("expires"))?;
852 0 : UNIX_EPOCH
853 0 : .checked_add(Duration::new(expires.seconds as u64, expires.nanos as u32))
854 0 : .ok_or_else(|| ProtocolError::invalid("expires", expires))
855 0 : }
856 : }
857 :
858 : impl From<LeaseLsnResponse> for proto::LeaseLsnResponse {
859 0 : fn from(response: LeaseLsnResponse) -> Self {
860 0 : let expires = response.duration_since(UNIX_EPOCH).unwrap_or_default();
861 0 : Self {
862 0 : expires: Some(prost_types::Timestamp {
863 0 : seconds: expires.as_secs() as i64,
864 0 : nanos: expires.subsec_nanos() as i32,
865 0 : }),
866 0 : }
867 0 : }
868 : }
|