LCOV - code coverage report
Current view: top level - libs/utils/src - pageserver_feedback.rs (source / functions) Coverage Total Hit
Test: 2aa98e37cd3250b9a68c97ef6050b16fe702ab33.info Lines: 80.6 % 144 116
Test Date: 2024-08-29 11:33:10 Functions: 14.3 % 35 5

            Line data    Source code
       1              : use std::time::{Duration, SystemTime};
       2              : 
       3              : use bytes::{Buf, BufMut, Bytes, BytesMut};
       4              : use pq_proto::{read_cstr, PG_EPOCH};
       5              : use serde::{Deserialize, Serialize};
       6              : use tracing::{trace, warn};
       7              : 
       8              : use crate::lsn::Lsn;
       9              : 
/// Feedback pageserver sends to safekeeper and safekeeper resends to compute.
/// Serialized in custom flexible key/value format. In replication protocol, it
/// is marked with NEON_STATUS_UPDATE_TAG_BYTE to differentiate from postgres
/// Standby status update / Hot standby feedback messages.
///
/// serde Serialize is used only for human readable dump to json (e.g. in
/// safekeepers debug_dump).
///
/// The key names used by the custom wire format differ from the field names;
/// each field below notes the key it is written under.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct PageserverFeedback {
    /// Last known size of the timeline. Used to enforce timeline size limit.
    /// Wire key: `current_timeline_size`.
    pub current_timeline_size: u64,
    /// LSN last received and ingested by the pageserver. Controls backpressure.
    /// Wire key: `ps_writelsn`.
    pub last_received_lsn: Lsn,
    /// LSN up to which data is persisted by the pageserver to its local disk.
    /// Controls backpressure.
    /// Wire key: `ps_flushlsn`.
    pub disk_consistent_lsn: Lsn,
    /// LSN up to which data is persisted by the pageserver on s3; safekeepers
    /// consider WAL before it can be removed.
    /// Wire key: `ps_applylsn`.
    pub remote_consistent_lsn: Lsn,
    /// Time attached to this feedback. On the wire it is a signed 64-bit count
    /// of microseconds relative to `PG_EPOCH`, under the key `ps_replytime`.
    // Serialize with RFC3339 format.
    #[serde(with = "serde_systemtime")]
    pub replytime: SystemTime,
    /// Used to track feedbacks from different shards. Always zero for unsharded tenants.
    /// Wire key: `shard_number`; only written when non-zero.
    pub shard_number: u32,
}
      35              : 
      36              : impl PageserverFeedback {
      37           36 :     pub fn empty() -> PageserverFeedback {
      38           36 :         PageserverFeedback {
      39           36 :             current_timeline_size: 0,
      40           36 :             last_received_lsn: Lsn::INVALID,
      41           36 :             remote_consistent_lsn: Lsn::INVALID,
      42           36 :             disk_consistent_lsn: Lsn::INVALID,
      43           36 :             replytime: *PG_EPOCH,
      44           36 :             shard_number: 0,
      45           36 :         }
      46           36 :     }
      47              : 
      48              :     // Serialize PageserverFeedback using custom format
      49              :     // to support protocol extensibility.
      50              :     //
      51              :     // Following layout is used:
      52              :     // char - number of key-value pairs that follow.
      53              :     //
      54              :     // key-value pairs:
      55              :     // null-terminated string - key,
      56              :     // uint32 - value length in bytes
      57              :     // value itself
      58              :     //
      59              :     // TODO: change serialized fields names once all computes migrate to rename.
      60           12 :     pub fn serialize(&self, buf: &mut BytesMut) {
      61           12 :         let buf_ptr = buf.len();
      62           12 :         buf.put_u8(0); // # of keys, will be filled later
      63           12 :         let mut nkeys = 0;
      64           12 : 
      65           12 :         nkeys += 1;
      66           12 :         buf.put_slice(b"current_timeline_size\0");
      67           12 :         buf.put_i32(8);
      68           12 :         buf.put_u64(self.current_timeline_size);
      69           12 : 
      70           12 :         nkeys += 1;
      71           12 :         buf.put_slice(b"ps_writelsn\0");
      72           12 :         buf.put_i32(8);
      73           12 :         buf.put_u64(self.last_received_lsn.0);
      74           12 : 
      75           12 :         nkeys += 1;
      76           12 :         buf.put_slice(b"ps_flushlsn\0");
      77           12 :         buf.put_i32(8);
      78           12 :         buf.put_u64(self.disk_consistent_lsn.0);
      79           12 : 
      80           12 :         nkeys += 1;
      81           12 :         buf.put_slice(b"ps_applylsn\0");
      82           12 :         buf.put_i32(8);
      83           12 :         buf.put_u64(self.remote_consistent_lsn.0);
      84           12 : 
      85           12 :         let timestamp = self
      86           12 :             .replytime
      87           12 :             .duration_since(*PG_EPOCH)
      88           12 :             .expect("failed to serialize pg_replytime earlier than PG_EPOCH")
      89           12 :             .as_micros() as i64;
      90           12 : 
      91           12 :         nkeys += 1;
      92           12 :         buf.put_slice(b"ps_replytime\0");
      93           12 :         buf.put_i32(8);
      94           12 :         buf.put_i64(timestamp);
      95           12 : 
      96           12 :         if self.shard_number > 0 {
      97            0 :             nkeys += 1;
      98            0 :             buf.put_slice(b"shard_number\0");
      99            0 :             buf.put_i32(4);
     100            0 :             buf.put_u32(self.shard_number);
     101           12 :         }
     102              : 
     103           12 :         buf[buf_ptr] = nkeys;
     104           12 :     }
     105              : 
     106              :     // Deserialize PageserverFeedback message
     107              :     // TODO: change serialized fields names once all computes migrate to rename.
     108           12 :     pub fn parse(mut buf: Bytes) -> PageserverFeedback {
     109           12 :         let mut rf = PageserverFeedback::empty();
     110           12 :         let nfields = buf.get_u8();
     111           12 :         for _ in 0..nfields {
     112           66 :             let key = read_cstr(&mut buf).unwrap();
     113           66 :             match key.as_ref() {
     114           66 :                 b"current_timeline_size" => {
     115           12 :                     let len = buf.get_i32();
     116           12 :                     assert_eq!(len, 8);
     117           12 :                     rf.current_timeline_size = buf.get_u64();
     118              :                 }
     119           54 :                 b"ps_writelsn" => {
     120           12 :                     let len = buf.get_i32();
     121           12 :                     assert_eq!(len, 8);
     122           12 :                     rf.last_received_lsn = Lsn(buf.get_u64());
     123              :                 }
     124              :                 b"ps_flushlsn" => {
     125           12 :                     let len = buf.get_i32();
     126           12 :                     assert_eq!(len, 8);
     127           12 :                     rf.disk_consistent_lsn = Lsn(buf.get_u64());
     128              :                 }
     129              :                 b"ps_applylsn" => {
     130           12 :                     let len = buf.get_i32();
     131           12 :                     assert_eq!(len, 8);
     132           12 :                     rf.remote_consistent_lsn = Lsn(buf.get_u64());
     133              :                 }
     134           18 :                 b"ps_replytime" => {
     135           12 :                     let len = buf.get_i32();
     136           12 :                     assert_eq!(len, 8);
     137           12 :                     let raw_time = buf.get_i64();
     138           12 :                     if raw_time > 0 {
     139           12 :                         rf.replytime = *PG_EPOCH + Duration::from_micros(raw_time as u64);
     140           12 :                     } else {
     141            0 :                         rf.replytime = *PG_EPOCH - Duration::from_micros(-raw_time as u64);
     142            0 :                     }
     143              :                 }
     144              :                 b"shard_number" => {
     145            0 :                     let len = buf.get_i32();
     146            0 :                     assert_eq!(len, 4);
     147            0 :                     rf.shard_number = buf.get_u32();
     148              :                 }
     149              :                 _ => {
     150            6 :                     let len = buf.get_i32();
     151            6 :                     warn!(
     152            0 :                         "PageserverFeedback parse. unknown key {} of len {len}. Skip it.",
     153            0 :                         String::from_utf8_lossy(key.as_ref())
     154              :                     );
     155            6 :                     buf.advance(len as usize);
     156              :                 }
     157              :             }
     158              :         }
     159           12 :         trace!("PageserverFeedback parsed is {:?}", rf);
     160           12 :         rf
     161           12 :     }
     162              : }
     163              : 
     164              : mod serde_systemtime {
     165              :     use std::time::SystemTime;
     166              : 
     167              :     use chrono::{DateTime, Utc};
     168              :     use serde::{Deserialize, Deserializer, Serializer};
     169              : 
     170            0 :     pub fn serialize<S>(ts: &SystemTime, serializer: S) -> Result<S::Ok, S::Error>
     171            0 :     where
     172            0 :         S: Serializer,
     173            0 :     {
     174            0 :         let chrono_dt: DateTime<Utc> = (*ts).into();
     175            0 :         serializer.serialize_str(&chrono_dt.to_rfc3339())
     176            0 :     }
     177              : 
     178            0 :     pub fn deserialize<'de, D>(deserializer: D) -> Result<SystemTime, D::Error>
     179            0 :     where
     180            0 :         D: Deserializer<'de>,
     181            0 :     {
     182            0 :         let time: String = Deserialize::deserialize(deserializer)?;
     183            0 :         Ok(DateTime::parse_from_rfc3339(&time)
     184            0 :             .map_err(serde::de::Error::custom)?
     185            0 :             .into())
     186            0 :     }
     187              : }
     188              : 
     189              : #[cfg(test)]
     190              : mod tests {
     191              :     use super::*;
     192              : 
     193              :     #[test]
     194            6 :     fn test_replication_feedback_serialization() {
     195            6 :         let mut rf = PageserverFeedback::empty();
     196            6 :         // Fill rf with some values
     197            6 :         rf.current_timeline_size = 12345678;
     198            6 :         // Set rounded time to be able to compare it with deserialized value,
     199            6 :         // because it is rounded up to microseconds during serialization.
     200            6 :         rf.replytime = *PG_EPOCH + Duration::from_secs(100_000_000);
     201            6 :         let mut data = BytesMut::new();
     202            6 :         rf.serialize(&mut data);
     203            6 : 
     204            6 :         let rf_parsed = PageserverFeedback::parse(data.freeze());
     205            6 :         assert_eq!(rf, rf_parsed);
     206            6 :     }
     207              : 
     208              :     #[test]
     209            6 :     fn test_replication_feedback_unknown_key() {
     210            6 :         let mut rf = PageserverFeedback::empty();
     211            6 :         // Fill rf with some values
     212            6 :         rf.current_timeline_size = 12345678;
     213            6 :         // Set rounded time to be able to compare it with deserialized value,
     214            6 :         // because it is rounded up to microseconds during serialization.
     215            6 :         rf.replytime = *PG_EPOCH + Duration::from_secs(100_000_000);
     216            6 :         let mut data = BytesMut::new();
     217            6 :         rf.serialize(&mut data);
     218            6 : 
     219            6 :         // Add an extra field to the buffer and adjust number of keys
     220            6 :         data[0] += 1;
     221            6 :         data.put_slice(b"new_field_one\0");
     222            6 :         data.put_i32(8);
     223            6 :         data.put_u64(42);
     224            6 : 
     225            6 :         // Parse serialized data and check that new field is not parsed
     226            6 :         let rf_parsed = PageserverFeedback::parse(data.freeze());
     227            6 :         assert_eq!(rf, rf_parsed);
     228            6 :     }
     229              : }
        

Generated by: LCOV version 2.1-beta