LCOV - differential code coverage report
Current view: top level - libs/utils/src - pageserver_feedback.rs (source / functions) Coverage Total Hit UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 96.0 % 125 120 5 120
Current Date: 2023-10-19 02:04:12 Functions: 34.7 % 49 17 32 17
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : use std::time::{Duration, SystemTime};
       2                 : 
       3                 : use bytes::{Buf, BufMut, Bytes, BytesMut};
       4                 : use pq_proto::{read_cstr, PG_EPOCH};
       5                 : use serde::{Deserialize, Serialize};
       6                 : use serde_with::{serde_as, DisplayFromStr};
       7                 : use tracing::{trace, warn};
       8                 : 
       9                 : use crate::lsn::Lsn;
      10                 : 
      11                 : /// Feedback the pageserver sends to the safekeeper, which the safekeeper resends to compute.
      12                 : /// Serialized in a custom, flexible key/value format. In the replication protocol, it
      13                 : /// is marked with NEON_STATUS_UPDATE_TAG_BYTE to differentiate it from the postgres
      14                 : /// Standby status update / Hot standby feedback messages.
      15                 : ///
      16                 : /// serde Serialize is used only for a human-readable dump to JSON (e.g. in
      17                 : /// safekeepers debug_dump).
      18                 : #[serde_as]
      19 CBC          97 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
      20                 : pub struct PageserverFeedback {
      21                 :     /// Last known size of the timeline. Used to enforce timeline size limit.
      22                 :     pub current_timeline_size: u64,
      23                 :     /// LSN last received and ingested by the pageserver. Controls backpressure.
      24                 :     #[serde_as(as = "DisplayFromStr")]
      25                 :     pub last_received_lsn: Lsn,
      26                 :     /// LSN up to which data is persisted by the pageserver to its local disk.
      27                 :     /// Controls backpressure.
      28                 :     #[serde_as(as = "DisplayFromStr")]
      29                 :     pub disk_consistent_lsn: Lsn,
      30                 :     /// LSN up to which data is persisted by the pageserver on s3; safekeepers
      31                 :     /// consider WAL before it can be removed.
      32                 :     #[serde_as(as = "DisplayFromStr")]
      33                 :     pub remote_consistent_lsn: Lsn,
      34                 :     // In the serde (human-readable) form this is an RFC 3339 string, via `serde_systemtime`.
      35                 :     #[serde(with = "serde_systemtime")]
      36                 :     pub replytime: SystemTime,
      37                 : }
      38                 : 
      39                 : // Number of key/value pairs emitted by PageserverFeedback::serialize (written as its first byte).
      40                 : // NOTE: increment it when adding new fields; never remove existing fields, as that could break backwards compatibility.
      41                 : pub const PAGESERVER_FEEDBACK_FIELDS_NUMBER: u8 = 5;
      42                 : 
      43                 : impl PageserverFeedback {
      44         4020002 :     pub fn empty() -> PageserverFeedback {
      45         4020002 :         PageserverFeedback {
      46         4020002 :             current_timeline_size: 0,
      47         4020002 :             last_received_lsn: Lsn::INVALID,
      48         4020002 :             remote_consistent_lsn: Lsn::INVALID,
      49         4020002 :             disk_consistent_lsn: Lsn::INVALID,
      50         4020002 :             replytime: *PG_EPOCH,
      51         4020002 :         }
      52         4020002 :     }
      53                 : 
      54                 :     // Serialize PageserverFeedback using custom format
      55                 :     // to support protocol extensibility.
      56                 :     //
      57                 :     // Following layout is used:
      58                 :     // char - number of key-value pairs that follow.
      59                 :     //
      60                 :     // key-value pairs:
      61                 :     // null-terminated string - key,
      62                 :     // uint32 - value length in bytes
      63                 :     // value itself
      64                 :     //
      65                 :     // TODO: change serialized fields names once all computes migrate to rename.
      66         3236663 :     pub fn serialize(&self, buf: &mut BytesMut) {
      67         3236663 :         buf.put_u8(PAGESERVER_FEEDBACK_FIELDS_NUMBER); // # of keys
      68         3236663 :         buf.put_slice(b"current_timeline_size\0");
      69         3236663 :         buf.put_i32(8);
      70         3236663 :         buf.put_u64(self.current_timeline_size);
      71         3236663 : 
      72         3236663 :         buf.put_slice(b"ps_writelsn\0");
      73         3236663 :         buf.put_i32(8);
      74         3236663 :         buf.put_u64(self.last_received_lsn.0);
      75         3236663 :         buf.put_slice(b"ps_flushlsn\0");
      76         3236663 :         buf.put_i32(8);
      77         3236663 :         buf.put_u64(self.disk_consistent_lsn.0);
      78         3236663 :         buf.put_slice(b"ps_applylsn\0");
      79         3236663 :         buf.put_i32(8);
      80         3236663 :         buf.put_u64(self.remote_consistent_lsn.0);
      81         3236663 : 
      82         3236663 :         let timestamp = self
      83         3236663 :             .replytime
      84         3236663 :             .duration_since(*PG_EPOCH)
      85         3236663 :             .expect("failed to serialize pg_replytime earlier than PG_EPOCH")
      86         3236663 :             .as_micros() as i64;
      87         3236663 : 
      88         3236663 :         buf.put_slice(b"ps_replytime\0");
      89         3236663 :         buf.put_i32(8);
      90         3236663 :         buf.put_i64(timestamp);
      91         3236663 :     }
      92                 : 
      93                 :     // Deserialize PageserverFeedback message
      94                 :     // TODO: change serialized fields names once all computes migrate to rename.
      95          779443 :     pub fn parse(mut buf: Bytes) -> PageserverFeedback {
      96          779443 :         let mut rf = PageserverFeedback::empty();
      97          779443 :         let nfields = buf.get_u8();
      98          779443 :         for _ in 0..nfields {
      99         3897216 :             let key = read_cstr(&mut buf).unwrap();
     100         3897216 :             match key.as_ref() {
     101         3897216 :                 b"current_timeline_size" => {
     102          779443 :                     let len = buf.get_i32();
     103          779443 :                     assert_eq!(len, 8);
     104          779443 :                     rf.current_timeline_size = buf.get_u64();
     105                 :                 }
     106         3117773 :                 b"ps_writelsn" => {
     107          779443 :                     let len = buf.get_i32();
     108          779443 :                     assert_eq!(len, 8);
     109          779443 :                     rf.last_received_lsn = Lsn(buf.get_u64());
     110                 :                 }
     111                 :                 b"ps_flushlsn" => {
     112          779443 :                     let len = buf.get_i32();
     113          779443 :                     assert_eq!(len, 8);
     114          779443 :                     rf.disk_consistent_lsn = Lsn(buf.get_u64());
     115                 :                 }
     116                 :                 b"ps_applylsn" => {
     117          779443 :                     let len = buf.get_i32();
     118          779443 :                     assert_eq!(len, 8);
     119          779443 :                     rf.remote_consistent_lsn = Lsn(buf.get_u64());
     120                 :                 }
     121          779444 :                 b"ps_replytime" => {
     122          779443 :                     let len = buf.get_i32();
     123          779443 :                     assert_eq!(len, 8);
     124          779443 :                     let raw_time = buf.get_i64();
     125          779443 :                     if raw_time > 0 {
     126          779443 :                         rf.replytime = *PG_EPOCH + Duration::from_micros(raw_time as u64);
     127          779443 :                     } else {
     128 UBC           0 :                         rf.replytime = *PG_EPOCH - Duration::from_micros(-raw_time as u64);
     129               0 :                     }
     130                 :                 }
     131                 :                 _ => {
     132 CBC           1 :                     let len = buf.get_i32();
     133               1 :                     warn!(
     134 UBC           0 :                         "PageserverFeedback parse. unknown key {} of len {len}. Skip it.",
     135               0 :                         String::from_utf8_lossy(key.as_ref())
     136               0 :                     );
     137 CBC           1 :                     buf.advance(len as usize);
     138                 :                 }
     139                 :             }
     140                 :         }
     141          779443 :         trace!("PageserverFeedback parsed is {:?}", rf);
     142          779443 :         rf
     143          779443 :     }
     144                 : }
     145                 : 
     146                 : mod serde_systemtime {
     147                 :     use std::time::SystemTime;
     148                 : 
     149                 :     use chrono::{DateTime, Utc};
     150                 :     use serde::{Deserialize, Deserializer, Serializer};
     151                 : 
     152              97 :     pub fn serialize<S>(ts: &SystemTime, serializer: S) -> Result<S::Ok, S::Error>
     153              97 :     where
     154              97 :         S: Serializer,
     155              97 :     {
     156              97 :         let chrono_dt: DateTime<Utc> = (*ts).into();
     157              97 :         serializer.serialize_str(&chrono_dt.to_rfc3339())
     158              97 :     }
     159                 : 
     160               1 :     pub fn deserialize<'de, D>(deserializer: D) -> Result<SystemTime, D::Error>
     161               1 :     where
     162               1 :         D: Deserializer<'de>,
     163               1 :     {
     164               1 :         let time: String = Deserialize::deserialize(deserializer)?;
     165               1 :         Ok(DateTime::parse_from_rfc3339(&time)
     166               1 :             .map_err(serde::de::Error::custom)?
     167               1 :             .into())
     168               1 :     }
     169                 : }
     170                 : 
     171                 : #[cfg(test)]
     172                 : mod tests {
     173                 :     use super::*;
     174                 : 
     175               1 :     #[test]
     176               1 :     fn test_replication_feedback_serialization() {
     177               1 :         let mut rf = PageserverFeedback::empty();
     178               1 :         // Fill rf with some values
     179               1 :         rf.current_timeline_size = 12345678;
     180               1 :         // Set rounded time to be able to compare it with deserialized value,
     181               1 :         // because it is rounded up to microseconds during serialization.
     182               1 :         rf.replytime = *PG_EPOCH + Duration::from_secs(100_000_000);
     183               1 :         let mut data = BytesMut::new();
     184               1 :         rf.serialize(&mut data);
     185               1 : 
     186               1 :         let rf_parsed = PageserverFeedback::parse(data.freeze());
     187               1 :         assert_eq!(rf, rf_parsed);
     188               1 :     }
     189                 : 
     190               1 :     #[test]
     191               1 :     fn test_replication_feedback_unknown_key() {
     192               1 :         let mut rf = PageserverFeedback::empty();
     193               1 :         // Fill rf with some values
     194               1 :         rf.current_timeline_size = 12345678;
     195               1 :         // Set rounded time to be able to compare it with deserialized value,
     196               1 :         // because it is rounded up to microseconds during serialization.
     197               1 :         rf.replytime = *PG_EPOCH + Duration::from_secs(100_000_000);
     198               1 :         let mut data = BytesMut::new();
     199               1 :         rf.serialize(&mut data);
     200                 : 
     201                 :         // Add an extra field to the buffer and adjust number of keys
     202               1 :         if let Some(first) = data.first_mut() {
     203               1 :             *first = PAGESERVER_FEEDBACK_FIELDS_NUMBER + 1;
     204               1 :         }
     205                 : 
     206               1 :         data.put_slice(b"new_field_one\0");
     207               1 :         data.put_i32(8);
     208               1 :         data.put_u64(42);
     209               1 : 
     210               1 :         // Parse serialized data and check that new field is not parsed
     211               1 :         let rf_parsed = PageserverFeedback::parse(data.freeze());
     212               1 :         assert_eq!(rf, rf_parsed);
     213               1 :     }
     214                 : }
        

Generated by: LCOV version 2.1-beta