LCOV - differential code coverage report
Current view: top level - libs/utils/src - pageserver_feedback.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 96.0 % 125 120 5 120
Current Date: 2024-01-09 02:06:09 Functions: 41.7 % 48 20 28 20
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : use std::time::{Duration, SystemTime};
       2                 : 
       3                 : use bytes::{Buf, BufMut, Bytes, BytesMut};
       4                 : use pq_proto::{read_cstr, PG_EPOCH};
       5                 : use serde::{Deserialize, Serialize};
       6                 : use tracing::{trace, warn};
       7                 : 
       8                 : use crate::lsn::Lsn;
       9                 : 
      10                 : /// Feedback pageserver sends to safekeeper and safekeeper resends to compute.
      11                 : /// Serialized in custom flexible key/value format. In replication protocol, it
      12                 : /// is marked with NEON_STATUS_UPDATE_TAG_BYTE to differentiate from postgres
      13                 : /// Standby status update / Hot standby feedback messages.
      14                 : ///
      15                 : /// serde Serialize is used only for human readable dump to json (e.g. in
      16                 : /// safekeepers debug_dump).
      17 CBC          95 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
      18                 : pub struct PageserverFeedback {
      19                 :     /// Last known size of the timeline. Used to enforce timeline size limit.
      20                 :     pub current_timeline_size: u64,
      21                 :     /// LSN last received and ingested by the pageserver. Controls backpressure.
      22                 :     pub last_received_lsn: Lsn,
      23                 :     /// LSN up to which data is persisted by the pageserver to its local disc.
      24                 :     /// Controls backpressure.
      25                 :     pub disk_consistent_lsn: Lsn,
      26                 :     /// LSN up to which data is persisted by the pageserver on s3; safekeepers
      27                 :     /// consider WAL before it can be removed.
      28                 :     pub remote_consistent_lsn: Lsn,
      29                 :     // Serialize with RFC3339 format.
      30                 :     #[serde(with = "serde_systemtime")]
      31                 :     pub replytime: SystemTime,
      32                 : }
      33                 : 
      34                 : // NOTE: Do not forget to increment this number when adding new fields to PageserverFeedback.
      35                 : // Do not remove previously available fields because this might be backwards incompatible.
      36                 : pub const PAGESERVER_FEEDBACK_FIELDS_NUMBER: u8 = 5;
      37                 : 
      38                 : impl PageserverFeedback {
      39         2427181 :     pub fn empty() -> PageserverFeedback {
      40         2427181 :         PageserverFeedback {
      41         2427181 :             current_timeline_size: 0,
      42         2427181 :             last_received_lsn: Lsn::INVALID,
      43         2427181 :             remote_consistent_lsn: Lsn::INVALID,
      44         2427181 :             disk_consistent_lsn: Lsn::INVALID,
      45         2427181 :             replytime: *PG_EPOCH,
      46         2427181 :         }
      47         2427181 :     }
      48                 : 
      49                 :     // Serialize PageserverFeedback using custom format
      50                 :     // to support protocol extensibility.
      51                 :     //
      52                 :     // Following layout is used:
      53                 :     // char - number of key-value pairs that follow.
      54                 :     //
      55                 :     // key-value pairs:
      56                 :     // null-terminated string - key,
      57                 :     // uint32 - value length in bytes
      58                 :     // value itself
      59                 :     //
      60                 :     // TODO: change serialized fields names once all computes migrate to rename.
      61         1860100 :     pub fn serialize(&self, buf: &mut BytesMut) {
      62         1860100 :         buf.put_u8(PAGESERVER_FEEDBACK_FIELDS_NUMBER); // # of keys
      63         1860100 :         buf.put_slice(b"current_timeline_size\0");
      64         1860100 :         buf.put_i32(8);
      65         1860100 :         buf.put_u64(self.current_timeline_size);
      66         1860100 : 
      67         1860100 :         buf.put_slice(b"ps_writelsn\0");
      68         1860100 :         buf.put_i32(8);
      69         1860100 :         buf.put_u64(self.last_received_lsn.0);
      70         1860100 :         buf.put_slice(b"ps_flushlsn\0");
      71         1860100 :         buf.put_i32(8);
      72         1860100 :         buf.put_u64(self.disk_consistent_lsn.0);
      73         1860100 :         buf.put_slice(b"ps_applylsn\0");
      74         1860100 :         buf.put_i32(8);
      75         1860100 :         buf.put_u64(self.remote_consistent_lsn.0);
      76         1860100 : 
      77         1860100 :         let timestamp = self
      78         1860100 :             .replytime
      79         1860100 :             .duration_since(*PG_EPOCH)
      80         1860100 :             .expect("failed to serialize pg_replytime earlier than PG_EPOCH")
      81         1860100 :             .as_micros() as i64;
      82         1860100 : 
      83         1860100 :         buf.put_slice(b"ps_replytime\0");
      84         1860100 :         buf.put_i32(8);
      85         1860100 :         buf.put_i64(timestamp);
      86         1860100 :     }
      87                 : 
      88                 :     // Deserialize PageserverFeedback message
      89                 :     // TODO: change serialized fields names once all computes migrate to rename.
      90          565813 :     pub fn parse(mut buf: Bytes) -> PageserverFeedback {
      91          565813 :         let mut rf = PageserverFeedback::empty();
      92          565813 :         let nfields = buf.get_u8();
      93          565813 :         for _ in 0..nfields {
      94         2829066 :             let key = read_cstr(&mut buf).unwrap();
      95         2829066 :             match key.as_ref() {
      96         2829066 :                 b"current_timeline_size" => {
      97          565813 :                     let len = buf.get_i32();
      98          565813 :                     assert_eq!(len, 8);
      99          565813 :                     rf.current_timeline_size = buf.get_u64();
     100                 :                 }
     101         2263253 :                 b"ps_writelsn" => {
     102          565813 :                     let len = buf.get_i32();
     103          565813 :                     assert_eq!(len, 8);
     104          565813 :                     rf.last_received_lsn = Lsn(buf.get_u64());
     105                 :                 }
     106                 :                 b"ps_flushlsn" => {
     107          565813 :                     let len = buf.get_i32();
     108          565813 :                     assert_eq!(len, 8);
     109          565813 :                     rf.disk_consistent_lsn = Lsn(buf.get_u64());
     110                 :                 }
     111                 :                 b"ps_applylsn" => {
     112          565813 :                     let len = buf.get_i32();
     113          565813 :                     assert_eq!(len, 8);
     114          565813 :                     rf.remote_consistent_lsn = Lsn(buf.get_u64());
     115                 :                 }
     116          565814 :                 b"ps_replytime" => {
     117          565813 :                     let len = buf.get_i32();
     118          565813 :                     assert_eq!(len, 8);
     119          565813 :                     let raw_time = buf.get_i64();
     120          565813 :                     if raw_time > 0 {
     121          565813 :                         rf.replytime = *PG_EPOCH + Duration::from_micros(raw_time as u64);
     122          565813 :                     } else {
     123 UBC           0 :                         rf.replytime = *PG_EPOCH - Duration::from_micros(-raw_time as u64);
     124               0 :                     }
     125                 :                 }
     126                 :                 _ => {
     127 CBC           1 :                     let len = buf.get_i32();
     128               1 :                     warn!(
     129 UBC           0 :                         "PageserverFeedback parse. unknown key {} of len {len}. Skip it.",
     130               0 :                         String::from_utf8_lossy(key.as_ref())
     131               0 :                     );
     132 CBC           1 :                     buf.advance(len as usize);
     133                 :                 }
     134                 :             }
     135                 :         }
     136          565813 :         trace!("PageserverFeedback parsed is {:?}", rf);
     137          565813 :         rf
     138          565813 :     }
     139                 : }
     140                 : 
     141                 : mod serde_systemtime {
     142                 :     use std::time::SystemTime;
     143                 : 
     144                 :     use chrono::{DateTime, Utc};
     145                 :     use serde::{Deserialize, Deserializer, Serializer};
     146                 : 
     147              95 :     pub fn serialize<S>(ts: &SystemTime, serializer: S) -> Result<S::Ok, S::Error>
     148              95 :     where
     149              95 :         S: Serializer,
     150              95 :     {
     151              95 :         let chrono_dt: DateTime<Utc> = (*ts).into();
     152              95 :         serializer.serialize_str(&chrono_dt.to_rfc3339())
     153              95 :     }
     154                 : 
     155               3 :     pub fn deserialize<'de, D>(deserializer: D) -> Result<SystemTime, D::Error>
     156               3 :     where
     157               3 :         D: Deserializer<'de>,
     158               3 :     {
     159               3 :         let time: String = Deserialize::deserialize(deserializer)?;
     160               3 :         Ok(DateTime::parse_from_rfc3339(&time)
     161               3 :             .map_err(serde::de::Error::custom)?
     162               3 :             .into())
     163               3 :     }
     164                 : }
     165                 : 
     166                 : #[cfg(test)]
     167                 : mod tests {
     168                 :     use super::*;
     169                 : 
     170               1 :     #[test]
     171               1 :     fn test_replication_feedback_serialization() {
     172               1 :         let mut rf = PageserverFeedback::empty();
     173               1 :         // Fill rf with some values
     174               1 :         rf.current_timeline_size = 12345678;
     175               1 :         // Set rounded time to be able to compare it with deserialized value,
     176               1 :         // because it is rounded up to microseconds during serialization.
     177               1 :         rf.replytime = *PG_EPOCH + Duration::from_secs(100_000_000);
     178               1 :         let mut data = BytesMut::new();
     179               1 :         rf.serialize(&mut data);
     180               1 : 
     181               1 :         let rf_parsed = PageserverFeedback::parse(data.freeze());
     182               1 :         assert_eq!(rf, rf_parsed);
     183               1 :     }
     184                 : 
     185               1 :     #[test]
     186               1 :     fn test_replication_feedback_unknown_key() {
     187               1 :         let mut rf = PageserverFeedback::empty();
     188               1 :         // Fill rf with some values
     189               1 :         rf.current_timeline_size = 12345678;
     190               1 :         // Set rounded time to be able to compare it with deserialized value,
     191               1 :         // because it is rounded up to microseconds during serialization.
     192               1 :         rf.replytime = *PG_EPOCH + Duration::from_secs(100_000_000);
     193               1 :         let mut data = BytesMut::new();
     194               1 :         rf.serialize(&mut data);
     195                 : 
     196                 :         // Add an extra field to the buffer and adjust number of keys
     197               1 :         if let Some(first) = data.first_mut() {
     198               1 :             *first = PAGESERVER_FEEDBACK_FIELDS_NUMBER + 1;
     199               1 :         }
     200                 : 
     201               1 :         data.put_slice(b"new_field_one\0");
     202               1 :         data.put_i32(8);
     203               1 :         data.put_u64(42);
     204               1 : 
     205               1 :         // Parse serialized data and check that new field is not parsed
     206               1 :         let rf_parsed = PageserverFeedback::parse(data.freeze());
     207               1 :         assert_eq!(rf, rf_parsed);
     208               1 :     }
     209                 : }
        

Generated by: LCOV version 2.1-beta