LCOV - code coverage report
Current view: top level - pageserver/src/tenant - debug.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 10.9 % 239 26
Test Date: 2025-07-16 12:29:03 Functions: 42.9 % 14 6

            Line data    Source code
       1              : use std::{ops::Range, str::FromStr, sync::Arc};
       2              : 
       3              : use crate::walredo::RedoAttemptType;
       4              : use base64::{Engine as _, engine::general_purpose::STANDARD};
       5              : use bytes::{Bytes, BytesMut};
       6              : use camino::Utf8PathBuf;
       7              : use clap::Parser;
       8              : use itertools::Itertools;
       9              : use pageserver_api::{
      10              :     key::Key,
      11              :     keyspace::KeySpace,
      12              :     shard::{ShardIdentity, ShardStripeSize},
      13              : };
      14              : use postgres_ffi::PgMajorVersion;
      15              : use postgres_ffi::{BLCKSZ, page_is_new, page_set_lsn};
      16              : use tracing::Instrument;
      17              : use utils::{
      18              :     generation::Generation,
      19              :     id::{TenantId, TimelineId},
      20              :     lsn::Lsn,
      21              :     shard::{ShardCount, ShardIndex, ShardNumber},
      22              : };
      23              : use wal_decoder::models::record::NeonWalRecord;
      24              : 
      25              : use crate::{
      26              :     context::{DownloadBehavior, RequestContext},
      27              :     task_mgr::TaskKind,
      28              :     tenant::storage_layer::ValueReconstructState,
      29              :     walredo::harness::RedoHarness,
      30              : };
      31              : 
      32              : use super::{
      33              :     WalRedoManager, WalredoManagerId,
      34              :     harness::TenantHarness,
      35              :     remote_timeline_client::LayerFileMetadata,
      36              :     storage_layer::{AsLayerDesc, IoConcurrency, Layer, LayerName, ValuesReconstructState},
      37              : };
      38              : 
      39            0 : fn process_page_image(next_record_lsn: Lsn, is_fpw: bool, img_bytes: Bytes) -> Bytes {
      40              :     // To match the logic in libs/wal_decoder/src/serialized_batch.rs
      41            0 :     let mut new_image: BytesMut = img_bytes.into();
      42            0 :     if is_fpw && !page_is_new(&new_image) {
      43            0 :         page_set_lsn(&mut new_image, next_record_lsn);
      44            0 :     }
      45            0 :     assert_eq!(new_image.len(), BLCKSZ as usize);
      46            0 :     new_image.freeze()
      47            0 : }
      48              : 
      49            0 : async fn redo_wals(input: &str, key: Key) -> anyhow::Result<()> {
      50            0 :     let tenant_id = TenantId::generate();
      51            0 :     let timeline_id = TimelineId::generate();
      52            0 :     let redo_harness = RedoHarness::new()?;
      53            0 :     let span = redo_harness.span();
      54            0 :     let tenant_conf = pageserver_api::models::TenantConfig {
      55            0 :         ..Default::default()
      56            0 :     };
      57              : 
      58            0 :     let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
      59            0 :     let tenant = TenantHarness::create_custom(
      60            0 :         "search_key",
      61            0 :         tenant_conf,
      62            0 :         tenant_id,
      63            0 :         ShardIdentity::unsharded(),
      64            0 :         Generation::new(1),
      65            0 :     )
      66            0 :     .await?
      67            0 :     .do_try_load_with_redo(
      68            0 :         Arc::new(WalRedoManager::Prod(
      69            0 :             WalredoManagerId::next(),
      70            0 :             redo_harness.manager,
      71            0 :         )),
      72            0 :         &ctx,
      73              :     )
      74            0 :     .await
      75            0 :     .unwrap();
      76            0 :     let timeline = tenant
      77            0 :         .create_test_timeline(timeline_id, Lsn(0x10), PgMajorVersion::PG16, &ctx)
      78            0 :         .await?;
      79            0 :     let contents = tokio::fs::read_to_string(input)
      80            0 :         .await
      81            0 :         .map_err(|e| anyhow::Error::msg(format!("Failed to read input file {input}: {e}")))
      82            0 :         .unwrap();
      83            0 :     let lines = contents.lines();
      84            0 :     let mut last_wal_lsn: Option<Lsn> = None;
      85            0 :     let state = {
      86            0 :         let mut state = ValueReconstructState::default();
      87            0 :         let mut is_fpw = false;
      88            0 :         let mut is_first_line = true;
      89            0 :         for line in lines {
      90            0 :             if is_first_line {
      91            0 :                 is_first_line = false;
      92            0 :                 if line.trim() == "FPW" {
      93            0 :                     is_fpw = true;
      94            0 :                 }
      95            0 :                 continue; // Skip the first line.
      96            0 :             }
      97              :             // Each input line is in the "<next_record_lsn>,<base64>" format.
      98            0 :             let (lsn_str, payload_b64) = line
      99            0 :                 .split_once(',')
     100            0 :                 .expect("Invalid input format: expected '<lsn>,<base64>'");
     101              : 
     102              :             // Parse the LSN and decode the payload.
     103            0 :             let lsn = Lsn::from_str(lsn_str.trim()).expect("Invalid LSN format");
     104            0 :             let bytes = Bytes::from(
     105            0 :                 STANDARD
     106            0 :                     .decode(payload_b64.trim())
     107            0 :                     .expect("Invalid base64 payload"),
     108              :             );
     109              : 
     110              :             // The first line is considered the base image, the rest are WAL records.
     111            0 :             if state.img.is_none() {
     112            0 :                 state.img = Some((lsn, process_page_image(lsn, is_fpw, bytes)));
     113            0 :             } else {
     114            0 :                 let wal_record = NeonWalRecord::Postgres {
     115            0 :                     will_init: false,
     116            0 :                     rec: bytes,
     117            0 :                 };
     118            0 :                 state.records.push((lsn, wal_record));
     119            0 :                 last_wal_lsn.replace(lsn);
     120            0 :             }
     121              :         }
     122            0 :         state
     123              :     };
     124              : 
     125            0 :     assert!(state.img.is_some(), "No base image found");
     126            0 :     assert!(!state.records.is_empty(), "No WAL records found");
     127            0 :     let result = timeline
     128            0 :         .reconstruct_value(key, last_wal_lsn.unwrap(), state, RedoAttemptType::ReadPage)
     129            0 :         .instrument(span.clone())
     130            0 :         .await?;
     131              : 
     132            0 :     eprintln!("final image: {:?}", STANDARD.encode(result));
     133              : 
     134            0 :     Ok(())
     135            0 : }
     136              : 
     137            0 : async fn search_key(
     138            0 :     tenant_id: TenantId,
     139            0 :     timeline_id: TimelineId,
     140            0 :     dir: String,
     141            0 :     key: Key,
     142            0 :     lsn: Lsn,
     143            0 : ) -> anyhow::Result<()> {
     144            0 :     let shard_index = ShardIndex {
     145            0 :         shard_number: ShardNumber(0),
     146            0 :         shard_count: ShardCount(4),
     147            0 :     };
     148              : 
     149            0 :     let redo_harness = RedoHarness::new()?;
     150            0 :     let span = redo_harness.span();
     151            0 :     let tenant_conf = pageserver_api::models::TenantConfig {
     152            0 :         ..Default::default()
     153            0 :     };
     154            0 :     let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
     155            0 :     let tenant = TenantHarness::create_custom(
     156            0 :         "search_key",
     157            0 :         tenant_conf,
     158            0 :         tenant_id,
     159            0 :         ShardIdentity::new(
     160            0 :             shard_index.shard_number,
     161            0 :             shard_index.shard_count,
     162            0 :             ShardStripeSize(32768),
     163            0 :         )
     164            0 :         .unwrap(),
     165            0 :         Generation::new(1),
     166            0 :     )
     167            0 :     .await?
     168            0 :     .do_try_load_with_redo(
     169            0 :         Arc::new(WalRedoManager::Prod(
     170            0 :             WalredoManagerId::next(),
     171            0 :             redo_harness.manager,
     172            0 :         )),
     173            0 :         &ctx,
     174              :     )
     175            0 :     .await
     176            0 :     .unwrap();
     177              : 
     178            0 :     let timeline = tenant
     179            0 :         .create_test_timeline(timeline_id, Lsn(0x10), PgMajorVersion::PG16, &ctx)
     180            0 :         .await?;
     181              : 
     182            0 :     let mut delta_layers: Vec<Layer> = Vec::new();
     183            0 :     let mut img_layer: Option<Layer> = Option::None;
     184            0 :     let mut dir = tokio::fs::read_dir(dir).await?;
     185              :     loop {
     186            0 :         let entry = dir.next_entry().await?;
     187            0 :         if entry.is_none() || !entry.as_ref().unwrap().file_type().await?.is_file() {
     188            0 :             break;
     189            0 :         }
     190            0 :         let path = Utf8PathBuf::from_path_buf(entry.unwrap().path()).unwrap();
     191            0 :         let layer_name = match LayerName::from_str(path.file_name().unwrap()) {
     192            0 :             Ok(name) => name,
     193              :             Err(_) => {
     194            0 :                 eprintln!("Skipped invalid layer: {path}");
     195            0 :                 continue;
     196              :             }
     197              :         };
     198            0 :         let layer = Layer::for_resident(
     199            0 :             tenant.conf,
     200            0 :             &timeline,
     201            0 :             path.clone(),
     202            0 :             layer_name,
     203            0 :             LayerFileMetadata::new(
     204            0 :                 tokio::fs::metadata(path.clone()).await?.len(),
     205            0 :                 Generation::new(1),
     206            0 :                 shard_index,
     207              :             ),
     208              :         );
     209            0 :         if layer.layer_desc().is_delta() {
     210            0 :             delta_layers.push(layer.into());
     211            0 :         } else if img_layer.is_none() {
     212            0 :             img_layer = Some(layer.into());
     213            0 :         } else {
     214            0 :             anyhow::bail!("Found multiple image layers");
     215              :         }
     216              :     }
     217              :     // sort delta layers based on the descending order of LSN
     218            0 :     delta_layers.sort_by(|a, b| {
     219            0 :         b.layer_desc()
     220            0 :             .get_lsn_range()
     221            0 :             .start
     222            0 :             .cmp(&a.layer_desc().get_lsn_range().start)
     223            0 :     });
     224              : 
     225            0 :     let mut state = ValuesReconstructState::new(IoConcurrency::Sequential);
     226              : 
     227            0 :     let key_space = KeySpace::single(Range {
     228            0 :         start: key,
     229            0 :         end: key.next(),
     230            0 :     });
     231            0 :     let lsn_range = Range {
     232            0 :         start: img_layer
     233            0 :             .as_ref()
     234            0 :             .map_or(Lsn(0x00), |img| img.layer_desc().image_layer_lsn()),
     235            0 :         end: lsn,
     236              :     };
     237            0 :     for delta_layer in delta_layers.iter() {
     238            0 :         delta_layer
     239            0 :             .get_values_reconstruct_data(key_space.clone(), lsn_range.clone(), &mut state, &ctx)
     240            0 :             .await?;
     241              :     }
     242              : 
     243            0 :     img_layer
     244            0 :         .as_ref()
     245            0 :         .unwrap()
     246            0 :         .get_values_reconstruct_data(key_space.clone(), lsn_range.clone(), &mut state, &ctx)
     247            0 :         .await?;
     248              : 
     249            0 :     for (_key, result) in std::mem::take(&mut state.keys) {
     250            0 :         let state = result.collect_pending_ios().await?;
     251            0 :         if state.img.is_some() {
     252            0 :             eprintln!(
     253            0 :                 "image: {}: {:x?}",
     254            0 :                 state.img.as_ref().unwrap().0,
     255            0 :                 STANDARD.encode(state.img.as_ref().unwrap().1.clone())
     256            0 :             );
     257            0 :         }
     258            0 :         for delta in state.records.iter() {
     259            0 :             match &delta.1 {
     260            0 :                 NeonWalRecord::Postgres { will_init, rec } => {
     261            0 :                     eprintln!(
     262            0 :                         "delta: {}: will_init: {}, {:x?}",
     263            0 :                         delta.0,
     264            0 :                         will_init,
     265            0 :                         STANDARD.encode(rec)
     266            0 :                     );
     267            0 :                 }
     268            0 :                 _ => {
     269            0 :                     eprintln!("delta: {}: {:x?}", delta.0, delta.1);
     270            0 :                 }
     271              :             }
     272              :         }
     273              : 
     274            0 :         let result = timeline
     275            0 :             .reconstruct_value(key, lsn_range.end, state, RedoAttemptType::ReadPage)
     276            0 :             .instrument(span.clone())
     277            0 :             .await?;
     278            0 :         eprintln!("final image: {lsn} : {result:?}");
     279              :     }
     280              : 
     281            0 :     Ok(())
     282            0 : }
     283              : 
     284              : /// Redo all WALs against the base image in the input file. Return the base64 encoded final image.
     285              : /// Each line in the input file must be in the form "<lsn>,<base64>" where:
     286              : ///   * `<lsn>` is a PostgreSQL LSN in hexadecimal notation, e.g. `0/16ABCDE`.
     287              : ///   * `<base64>` is the base64‐encoded page image (first line) or WAL record (subsequent lines).
     288              : ///
     289              : /// The first line provides the base image of a page. The LSN is the LSN of "next record" following
     290              : /// the record containing the FPI. For example, if the FPI was extracted from a WAL record occuping
     291              : /// [0/1, 0/200) in the WAL stream, the LSN appearing along side the page image here should be 0/200.
     292              : ///
     293              : /// The subsequent lines are WAL records, ordered from the oldest to the newest. The LSN is the
     294              : /// record LSN of the WAL record, not the "next record" LSN. For example, if the WAL record here
     295              : /// occupies [0/1, 0/200) in the WAL stream, the LSN appearing along side the WAL record here should
     296              : /// be 0/1.
     297              : #[derive(Parser)]
     298              : struct RedoWalsCmd {
     299              :     #[clap(long)]
     300              :     input: String,
     301              :     #[clap(long)]
     302              :     key: String,
     303              : }
     304              : 
     305              : #[tokio::test]
     306            1 : async fn test_redo_wals() -> anyhow::Result<()> {
     307            1 :     let args = std::env::args().collect_vec();
     308            1 :     let pos = args
     309            1 :         .iter()
     310            4 :         .position(|arg| arg == "--")
     311            1 :         .unwrap_or(args.len());
     312            1 :     let slice = &args[pos..args.len()];
     313            1 :     let cmd = match RedoWalsCmd::try_parse_from(slice) {
     314            0 :         Ok(cmd) => cmd,
     315            1 :         Err(err) => {
     316            1 :             eprintln!("{err}");
     317            1 :             return Ok(());
     318              :         }
     319              :     };
     320              : 
     321            0 :     let key = Key::from_hex(&cmd.key).unwrap();
     322            0 :     redo_wals(&cmd.input, key).await?;
     323              : 
     324            1 :     Ok(())
     325            1 : }
     326              : 
     327              : /// Search for a page at the given LSN in all layers of the data_dir.
     328              : /// Return the base64-encoded image and all WAL records, as well as the final reconstructed image.
     329              : #[derive(Parser)]
     330              : struct SearchKeyCmd {
     331              :     #[clap(long)]
     332              :     tenant_id: String,
     333              :     #[clap(long)]
     334              :     timeline_id: String,
     335              :     #[clap(long)]
     336              :     data_dir: String,
     337              :     #[clap(long)]
     338              :     key: String,
     339              :     #[clap(long)]
     340              :     lsn: String,
     341              : }
     342              : 
     343              : #[tokio::test]
     344            1 : async fn test_search_key() -> anyhow::Result<()> {
     345            1 :     let args = std::env::args().collect_vec();
     346            1 :     let pos = args
     347            1 :         .iter()
     348            4 :         .position(|arg| arg == "--")
     349            1 :         .unwrap_or(args.len());
     350            1 :     let slice = &args[pos..args.len()];
     351            1 :     let cmd = match SearchKeyCmd::try_parse_from(slice) {
     352            0 :         Ok(cmd) => cmd,
     353            1 :         Err(err) => {
     354            1 :             eprintln!("{err}");
     355            1 :             return Ok(());
     356              :         }
     357              :     };
     358              : 
     359            0 :     let tenant_id = TenantId::from_str(&cmd.tenant_id).unwrap();
     360            0 :     let timeline_id = TimelineId::from_str(&cmd.timeline_id).unwrap();
     361            0 :     let key = Key::from_hex(&cmd.key).unwrap();
     362            0 :     let lsn = Lsn::from_str(&cmd.lsn).unwrap();
     363            0 :     search_key(tenant_id, timeline_id, cmd.data_dir, key, lsn).await?;
     364              : 
     365            1 :     Ok(())
     366            1 : }
        

Generated by: LCOV version 2.1-beta