LCOV - code coverage report
Current view: top level - libs/utils/src - pageserver_feedback.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 96.0 % 125 120
Test Date: 2023-09-06 10:18:01 Functions: 34.7 % 49 17

            Line data    Source code
       1              : use std::time::{Duration, SystemTime};
       2              : 
       3              : use bytes::{Buf, BufMut, Bytes, BytesMut};
       4              : use pq_proto::{read_cstr, PG_EPOCH};
       5              : use serde::{Deserialize, Serialize};
       6              : use serde_with::{serde_as, DisplayFromStr};
       7              : use tracing::{trace, warn};
       8              : 
       9              : use crate::lsn::Lsn;
      10              : 
      11              : /// Feedback pageserver sends to safekeeper and safekeeper resends to compute.
      12              : /// Serialized in custom flexible key/value format. In replication protocol, it
      13              : /// is marked with NEON_STATUS_UPDATE_TAG_BYTE to differentiate from postgres
      14              : /// Standby status update / Hot standby feedback messages.
      15              : ///
      16              : /// serde Serialize is used only for human readable dump to json (e.g. in
      17              : /// safekeepers debug_dump).
      18              : #[serde_as]
      19           89 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
      20              : pub struct PageserverFeedback {
      21              :     /// Last known size of the timeline. Used to enforce timeline size limit.
      22              :     pub current_timeline_size: u64,
      23              :     /// LSN last received and ingested by the pageserver. Controls backpressure.
      24              :     #[serde_as(as = "DisplayFromStr")]
      25              :     pub last_received_lsn: Lsn,
      26              :     /// LSN up to which data is persisted by the pageserver to its local disc.
      27              :     /// Controls backpressure.
      28              :     #[serde_as(as = "DisplayFromStr")]
      29              :     pub disk_consistent_lsn: Lsn,
      30              :     /// LSN up to which data is persisted by the pageserver on s3; safekeepers
      31              :     /// consider WAL before it can be removed.
      32              :     #[serde_as(as = "DisplayFromStr")]
      33              :     pub remote_consistent_lsn: Lsn,
      34              :     // Serialize with RFC3339 format.
      35              :     #[serde(with = "serde_systemtime")]
      36              :     pub replytime: SystemTime,
      37              : }
      38              : 
      39              : // NOTE: Do not forget to increment this number when adding new fields to PageserverFeedback.
      40              : // Do not remove previously available fields because this might be backwards incompatible.
      41              : pub const PAGESERVER_FEEDBACK_FIELDS_NUMBER: u8 = 5;
      42              : 
      43              : impl PageserverFeedback {
      44      3674625 :     pub fn empty() -> PageserverFeedback {
      45      3674625 :         PageserverFeedback {
      46      3674625 :             current_timeline_size: 0,
      47      3674625 :             last_received_lsn: Lsn::INVALID,
      48      3674625 :             remote_consistent_lsn: Lsn::INVALID,
      49      3674625 :             disk_consistent_lsn: Lsn::INVALID,
      50      3674625 :             replytime: *PG_EPOCH,
      51      3674625 :         }
      52      3674625 :     }
      53              : 
      54              :     // Serialize PageserverFeedback using custom format
      55              :     // to support protocol extensibility.
      56              :     //
      57              :     // Following layout is used:
      58              :     // char - number of key-value pairs that follow.
      59              :     //
      60              :     // key-value pairs:
      61              :     // null-terminated string - key,
      62              :     // uint32 - value length in bytes
      63              :     // value itself
      64              :     //
      65              :     // TODO: change serialized fields names once all computes migrate to rename.
      66      2934446 :     pub fn serialize(&self, buf: &mut BytesMut) {
      67      2934446 :         buf.put_u8(PAGESERVER_FEEDBACK_FIELDS_NUMBER); // # of keys
      68      2934446 :         buf.put_slice(b"current_timeline_size\0");
      69      2934446 :         buf.put_i32(8);
      70      2934446 :         buf.put_u64(self.current_timeline_size);
      71      2934446 : 
      72      2934446 :         buf.put_slice(b"ps_writelsn\0");
      73      2934446 :         buf.put_i32(8);
      74      2934446 :         buf.put_u64(self.last_received_lsn.0);
      75      2934446 :         buf.put_slice(b"ps_flushlsn\0");
      76      2934446 :         buf.put_i32(8);
      77      2934446 :         buf.put_u64(self.disk_consistent_lsn.0);
      78      2934446 :         buf.put_slice(b"ps_applylsn\0");
      79      2934446 :         buf.put_i32(8);
      80      2934446 :         buf.put_u64(self.remote_consistent_lsn.0);
      81      2934446 : 
      82      2934446 :         let timestamp = self
      83      2934446 :             .replytime
      84      2934446 :             .duration_since(*PG_EPOCH)
      85      2934446 :             .expect("failed to serialize pg_replytime earlier than PG_EPOCH")
      86      2934446 :             .as_micros() as i64;
      87      2934446 : 
      88      2934446 :         buf.put_slice(b"ps_replytime\0");
      89      2934446 :         buf.put_i32(8);
      90      2934446 :         buf.put_i64(timestamp);
      91      2934446 :     }
      92              : 
      93              :     // Deserialize PageserverFeedback message
      94              :     // TODO: change serialized fields names once all computes migrate to rename.
      95       736451 :     pub fn parse(mut buf: Bytes) -> PageserverFeedback {
      96       736451 :         let mut rf = PageserverFeedback::empty();
      97       736451 :         let nfields = buf.get_u8();
      98       736451 :         for _ in 0..nfields {
      99      3682256 :             let key = read_cstr(&mut buf).unwrap();
     100      3682256 :             match key.as_ref() {
     101      3682256 :                 b"current_timeline_size" => {
     102       736451 :                     let len = buf.get_i32();
     103       736451 :                     assert_eq!(len, 8);
     104       736451 :                     rf.current_timeline_size = buf.get_u64();
     105              :                 }
     106      2945805 :                 b"ps_writelsn" => {
     107       736451 :                     let len = buf.get_i32();
     108       736451 :                     assert_eq!(len, 8);
     109       736451 :                     rf.last_received_lsn = Lsn(buf.get_u64());
     110              :                 }
     111              :                 b"ps_flushlsn" => {
     112       736451 :                     let len = buf.get_i32();
     113       736451 :                     assert_eq!(len, 8);
     114       736451 :                     rf.disk_consistent_lsn = Lsn(buf.get_u64());
     115              :                 }
     116              :                 b"ps_applylsn" => {
     117       736451 :                     let len = buf.get_i32();
     118       736451 :                     assert_eq!(len, 8);
     119       736451 :                     rf.remote_consistent_lsn = Lsn(buf.get_u64());
     120              :                 }
     121       736452 :                 b"ps_replytime" => {
     122       736451 :                     let len = buf.get_i32();
     123       736451 :                     assert_eq!(len, 8);
     124       736451 :                     let raw_time = buf.get_i64();
     125       736451 :                     if raw_time > 0 {
     126       736451 :                         rf.replytime = *PG_EPOCH + Duration::from_micros(raw_time as u64);
     127       736451 :                     } else {
     128            0 :                         rf.replytime = *PG_EPOCH - Duration::from_micros(-raw_time as u64);
     129            0 :                     }
     130              :                 }
     131              :                 _ => {
     132            1 :                     let len = buf.get_i32();
     133            1 :                     warn!(
     134            0 :                         "PageserverFeedback parse. unknown key {} of len {len}. Skip it.",
     135            0 :                         String::from_utf8_lossy(key.as_ref())
     136            0 :                     );
     137            1 :                     buf.advance(len as usize);
     138              :                 }
     139              :             }
     140              :         }
     141       736451 :         trace!("PageserverFeedback parsed is {:?}", rf);
     142       736451 :         rf
     143       736451 :     }
     144              : }
     145              : 
     146              : mod serde_systemtime {
     147              :     use std::time::SystemTime;
     148              : 
     149              :     use chrono::{DateTime, Utc};
     150              :     use serde::{Deserialize, Deserializer, Serializer};
     151              : 
     152           89 :     pub fn serialize<S>(ts: &SystemTime, serializer: S) -> Result<S::Ok, S::Error>
     153           89 :     where
     154           89 :         S: Serializer,
     155           89 :     {
     156           89 :         let chrono_dt: DateTime<Utc> = (*ts).into();
     157           89 :         serializer.serialize_str(&chrono_dt.to_rfc3339())
     158           89 :     }
     159              : 
     160            1 :     pub fn deserialize<'de, D>(deserializer: D) -> Result<SystemTime, D::Error>
     161            1 :     where
     162            1 :         D: Deserializer<'de>,
     163            1 :     {
     164            1 :         let time: String = Deserialize::deserialize(deserializer)?;
     165            1 :         Ok(DateTime::parse_from_rfc3339(&time)
     166            1 :             .map_err(serde::de::Error::custom)?
     167            1 :             .into())
     168            1 :     }
     169              : }
     170              : 
     171              : #[cfg(test)]
     172              : mod tests {
     173              :     use super::*;
     174              : 
     175            1 :     #[test]
     176            1 :     fn test_replication_feedback_serialization() {
     177            1 :         let mut rf = PageserverFeedback::empty();
     178            1 :         // Fill rf with some values
     179            1 :         rf.current_timeline_size = 12345678;
     180            1 :         // Set rounded time to be able to compare it with deserialized value,
     181            1 :         // because it is rounded up to microseconds during serialization.
     182            1 :         rf.replytime = *PG_EPOCH + Duration::from_secs(100_000_000);
     183            1 :         let mut data = BytesMut::new();
     184            1 :         rf.serialize(&mut data);
     185            1 : 
     186            1 :         let rf_parsed = PageserverFeedback::parse(data.freeze());
     187            1 :         assert_eq!(rf, rf_parsed);
     188            1 :     }
     189              : 
     190            1 :     #[test]
     191            1 :     fn test_replication_feedback_unknown_key() {
     192            1 :         let mut rf = PageserverFeedback::empty();
     193            1 :         // Fill rf with some values
     194            1 :         rf.current_timeline_size = 12345678;
     195            1 :         // Set rounded time to be able to compare it with deserialized value,
     196            1 :         // because it is rounded up to microseconds during serialization.
     197            1 :         rf.replytime = *PG_EPOCH + Duration::from_secs(100_000_000);
     198            1 :         let mut data = BytesMut::new();
     199            1 :         rf.serialize(&mut data);
     200              : 
     201              :         // Add an extra field to the buffer and adjust number of keys
     202            1 :         if let Some(first) = data.first_mut() {
     203            1 :             *first = PAGESERVER_FEEDBACK_FIELDS_NUMBER + 1;
     204            1 :         }
     205              : 
     206            1 :         data.put_slice(b"new_field_one\0");
     207            1 :         data.put_i32(8);
     208            1 :         data.put_u64(42);
     209            1 : 
     210            1 :         // Parse serialized data and check that new field is not parsed
     211            1 :         let rf_parsed = PageserverFeedback::parse(data.freeze());
     212            1 :         assert_eq!(rf, rf_parsed);
     213            1 :     }
     214              : }
        

Generated by: LCOV version 2.1-beta