Line data Source code
1 : use std::{ops::Range, str::FromStr, sync::Arc};
2 :
3 : use crate::walredo::RedoAttemptType;
4 : use base64::{Engine as _, engine::general_purpose::STANDARD};
5 : use bytes::{Bytes, BytesMut};
6 : use camino::Utf8PathBuf;
7 : use clap::Parser;
8 : use itertools::Itertools;
9 : use pageserver_api::{
10 : key::Key,
11 : keyspace::KeySpace,
12 : shard::{ShardIdentity, ShardStripeSize},
13 : };
14 : use postgres_ffi::PgMajorVersion;
15 : use postgres_ffi::{BLCKSZ, page_is_new, page_set_lsn};
16 : use tracing::Instrument;
17 : use utils::{
18 : generation::Generation,
19 : id::{TenantId, TimelineId},
20 : lsn::Lsn,
21 : shard::{ShardCount, ShardIndex, ShardNumber},
22 : };
23 : use wal_decoder::models::record::NeonWalRecord;
24 :
25 : use crate::{
26 : context::{DownloadBehavior, RequestContext},
27 : task_mgr::TaskKind,
28 : tenant::storage_layer::ValueReconstructState,
29 : walredo::harness::RedoHarness,
30 : };
31 :
32 : use super::{
33 : WalRedoManager, WalredoManagerId,
34 : harness::TenantHarness,
35 : remote_timeline_client::LayerFileMetadata,
36 : storage_layer::{AsLayerDesc, IoConcurrency, Layer, LayerName, ValuesReconstructState},
37 : };
38 :
39 0 : fn process_page_image(next_record_lsn: Lsn, is_fpw: bool, img_bytes: Bytes) -> Bytes {
40 : // To match the logic in libs/wal_decoder/src/serialized_batch.rs
41 0 : let mut new_image: BytesMut = img_bytes.into();
42 0 : if is_fpw && !page_is_new(&new_image) {
43 0 : page_set_lsn(&mut new_image, next_record_lsn);
44 0 : }
45 0 : assert_eq!(new_image.len(), BLCKSZ as usize);
46 0 : new_image.freeze()
47 0 : }
48 :
49 0 : async fn redo_wals(input: &str, key: Key) -> anyhow::Result<()> {
50 0 : let tenant_id = TenantId::generate();
51 0 : let timeline_id = TimelineId::generate();
52 0 : let redo_harness = RedoHarness::new()?;
53 0 : let span = redo_harness.span();
54 0 : let tenant_conf = pageserver_api::models::TenantConfig {
55 0 : ..Default::default()
56 0 : };
57 :
58 0 : let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
59 0 : let tenant = TenantHarness::create_custom(
60 0 : "search_key",
61 0 : tenant_conf,
62 0 : tenant_id,
63 0 : ShardIdentity::unsharded(),
64 0 : Generation::new(1),
65 0 : )
66 0 : .await?
67 0 : .do_try_load_with_redo(
68 0 : Arc::new(WalRedoManager::Prod(
69 0 : WalredoManagerId::next(),
70 0 : redo_harness.manager,
71 0 : )),
72 0 : &ctx,
73 : )
74 0 : .await
75 0 : .unwrap();
76 0 : let timeline = tenant
77 0 : .create_test_timeline(timeline_id, Lsn(0x10), PgMajorVersion::PG16, &ctx)
78 0 : .await?;
79 0 : let contents = tokio::fs::read_to_string(input)
80 0 : .await
81 0 : .map_err(|e| anyhow::Error::msg(format!("Failed to read input file {input}: {e}")))
82 0 : .unwrap();
83 0 : let lines = contents.lines();
84 0 : let mut last_wal_lsn: Option<Lsn> = None;
85 0 : let state = {
86 0 : let mut state = ValueReconstructState::default();
87 0 : let mut is_fpw = false;
88 0 : let mut is_first_line = true;
89 0 : for line in lines {
90 0 : if is_first_line {
91 0 : is_first_line = false;
92 0 : if line.trim() == "FPW" {
93 0 : is_fpw = true;
94 0 : }
95 0 : continue; // Skip the first line.
96 0 : }
97 : // Each input line is in the "<next_record_lsn>,<base64>" format.
98 0 : let (lsn_str, payload_b64) = line
99 0 : .split_once(',')
100 0 : .expect("Invalid input format: expected '<lsn>,<base64>'");
101 :
102 : // Parse the LSN and decode the payload.
103 0 : let lsn = Lsn::from_str(lsn_str.trim()).expect("Invalid LSN format");
104 0 : let bytes = Bytes::from(
105 0 : STANDARD
106 0 : .decode(payload_b64.trim())
107 0 : .expect("Invalid base64 payload"),
108 : );
109 :
110 : // The first line is considered the base image, the rest are WAL records.
111 0 : if state.img.is_none() {
112 0 : state.img = Some((lsn, process_page_image(lsn, is_fpw, bytes)));
113 0 : } else {
114 0 : let wal_record = NeonWalRecord::Postgres {
115 0 : will_init: false,
116 0 : rec: bytes,
117 0 : };
118 0 : state.records.push((lsn, wal_record));
119 0 : last_wal_lsn.replace(lsn);
120 0 : }
121 : }
122 0 : state
123 : };
124 :
125 0 : assert!(state.img.is_some(), "No base image found");
126 0 : assert!(!state.records.is_empty(), "No WAL records found");
127 0 : let result = timeline
128 0 : .reconstruct_value(key, last_wal_lsn.unwrap(), state, RedoAttemptType::ReadPage)
129 0 : .instrument(span.clone())
130 0 : .await?;
131 :
132 0 : eprintln!("final image: {:?}", STANDARD.encode(result));
133 :
134 0 : Ok(())
135 0 : }
136 :
137 0 : async fn search_key(
138 0 : tenant_id: TenantId,
139 0 : timeline_id: TimelineId,
140 0 : dir: String,
141 0 : key: Key,
142 0 : lsn: Lsn,
143 0 : ) -> anyhow::Result<()> {
144 0 : let shard_index = ShardIndex {
145 0 : shard_number: ShardNumber(0),
146 0 : shard_count: ShardCount(4),
147 0 : };
148 :
149 0 : let redo_harness = RedoHarness::new()?;
150 0 : let span = redo_harness.span();
151 0 : let tenant_conf = pageserver_api::models::TenantConfig {
152 0 : ..Default::default()
153 0 : };
154 0 : let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
155 0 : let tenant = TenantHarness::create_custom(
156 0 : "search_key",
157 0 : tenant_conf,
158 0 : tenant_id,
159 0 : ShardIdentity::new(
160 0 : shard_index.shard_number,
161 0 : shard_index.shard_count,
162 0 : ShardStripeSize(32768),
163 0 : )
164 0 : .unwrap(),
165 0 : Generation::new(1),
166 0 : )
167 0 : .await?
168 0 : .do_try_load_with_redo(
169 0 : Arc::new(WalRedoManager::Prod(
170 0 : WalredoManagerId::next(),
171 0 : redo_harness.manager,
172 0 : )),
173 0 : &ctx,
174 : )
175 0 : .await
176 0 : .unwrap();
177 :
178 0 : let timeline = tenant
179 0 : .create_test_timeline(timeline_id, Lsn(0x10), PgMajorVersion::PG16, &ctx)
180 0 : .await?;
181 :
182 0 : let mut delta_layers: Vec<Layer> = Vec::new();
183 0 : let mut img_layer: Option<Layer> = Option::None;
184 0 : let mut dir = tokio::fs::read_dir(dir).await?;
185 : loop {
186 0 : let entry = dir.next_entry().await?;
187 0 : if entry.is_none() || !entry.as_ref().unwrap().file_type().await?.is_file() {
188 0 : break;
189 0 : }
190 0 : let path = Utf8PathBuf::from_path_buf(entry.unwrap().path()).unwrap();
191 0 : let layer_name = match LayerName::from_str(path.file_name().unwrap()) {
192 0 : Ok(name) => name,
193 : Err(_) => {
194 0 : eprintln!("Skipped invalid layer: {path}");
195 0 : continue;
196 : }
197 : };
198 0 : let layer = Layer::for_resident(
199 0 : tenant.conf,
200 0 : &timeline,
201 0 : path.clone(),
202 0 : layer_name,
203 0 : LayerFileMetadata::new(
204 0 : tokio::fs::metadata(path.clone()).await?.len(),
205 0 : Generation::new(1),
206 0 : shard_index,
207 : ),
208 : );
209 0 : if layer.layer_desc().is_delta() {
210 0 : delta_layers.push(layer.into());
211 0 : } else if img_layer.is_none() {
212 0 : img_layer = Some(layer.into());
213 0 : } else {
214 0 : anyhow::bail!("Found multiple image layers");
215 : }
216 : }
217 : // sort delta layers based on the descending order of LSN
218 0 : delta_layers.sort_by(|a, b| {
219 0 : b.layer_desc()
220 0 : .get_lsn_range()
221 0 : .start
222 0 : .cmp(&a.layer_desc().get_lsn_range().start)
223 0 : });
224 :
225 0 : let mut state = ValuesReconstructState::new(IoConcurrency::Sequential);
226 :
227 0 : let key_space = KeySpace::single(Range {
228 0 : start: key,
229 0 : end: key.next(),
230 0 : });
231 0 : let lsn_range = Range {
232 0 : start: img_layer
233 0 : .as_ref()
234 0 : .map_or(Lsn(0x00), |img| img.layer_desc().image_layer_lsn()),
235 0 : end: lsn,
236 : };
237 0 : for delta_layer in delta_layers.iter() {
238 0 : delta_layer
239 0 : .get_values_reconstruct_data(key_space.clone(), lsn_range.clone(), &mut state, &ctx)
240 0 : .await?;
241 : }
242 :
243 0 : img_layer
244 0 : .as_ref()
245 0 : .unwrap()
246 0 : .get_values_reconstruct_data(key_space.clone(), lsn_range.clone(), &mut state, &ctx)
247 0 : .await?;
248 :
249 0 : for (_key, result) in std::mem::take(&mut state.keys) {
250 0 : let state = result.collect_pending_ios().await?;
251 0 : if state.img.is_some() {
252 0 : eprintln!(
253 0 : "image: {}: {:x?}",
254 0 : state.img.as_ref().unwrap().0,
255 0 : STANDARD.encode(state.img.as_ref().unwrap().1.clone())
256 0 : );
257 0 : }
258 0 : for delta in state.records.iter() {
259 0 : match &delta.1 {
260 0 : NeonWalRecord::Postgres { will_init, rec } => {
261 0 : eprintln!(
262 0 : "delta: {}: will_init: {}, {:x?}",
263 0 : delta.0,
264 0 : will_init,
265 0 : STANDARD.encode(rec)
266 0 : );
267 0 : }
268 0 : _ => {
269 0 : eprintln!("delta: {}: {:x?}", delta.0, delta.1);
270 0 : }
271 : }
272 : }
273 :
274 0 : let result = timeline
275 0 : .reconstruct_value(key, lsn_range.end, state, RedoAttemptType::ReadPage)
276 0 : .instrument(span.clone())
277 0 : .await?;
278 0 : eprintln!("final image: {lsn} : {result:?}");
279 : }
280 :
281 0 : Ok(())
282 0 : }
283 :
284 : /// Redo all WALs against the base image in the input file. Return the base64 encoded final image.
285 : /// Each line in the input file must be in the form "<lsn>,<base64>" where:
286 : /// * `<lsn>` is a PostgreSQL LSN in hexadecimal notation, e.g. `0/16ABCDE`.
287 : /// * `<base64>` is the base64‐encoded page image (first line) or WAL record (subsequent lines).
288 : ///
289 : /// The first line provides the base image of a page. The LSN is the LSN of "next record" following
290 : /// the record containing the FPI. For example, if the FPI was extracted from a WAL record occuping
291 : /// [0/1, 0/200) in the WAL stream, the LSN appearing along side the page image here should be 0/200.
292 : ///
293 : /// The subsequent lines are WAL records, ordered from the oldest to the newest. The LSN is the
294 : /// record LSN of the WAL record, not the "next record" LSN. For example, if the WAL record here
295 : /// occupies [0/1, 0/200) in the WAL stream, the LSN appearing along side the WAL record here should
296 : /// be 0/1.
297 : #[derive(Parser)]
298 : struct RedoWalsCmd {
299 : #[clap(long)]
300 : input: String,
301 : #[clap(long)]
302 : key: String,
303 : }
304 :
305 : #[tokio::test]
306 1 : async fn test_redo_wals() -> anyhow::Result<()> {
307 1 : let args = std::env::args().collect_vec();
308 1 : let pos = args
309 1 : .iter()
310 4 : .position(|arg| arg == "--")
311 1 : .unwrap_or(args.len());
312 1 : let slice = &args[pos..args.len()];
313 1 : let cmd = match RedoWalsCmd::try_parse_from(slice) {
314 0 : Ok(cmd) => cmd,
315 1 : Err(err) => {
316 1 : eprintln!("{err}");
317 1 : return Ok(());
318 : }
319 : };
320 :
321 0 : let key = Key::from_hex(&cmd.key).unwrap();
322 0 : redo_wals(&cmd.input, key).await?;
323 :
324 1 : Ok(())
325 1 : }
326 :
327 : /// Search for a page at the given LSN in all layers of the data_dir.
328 : /// Return the base64-encoded image and all WAL records, as well as the final reconstructed image.
329 : #[derive(Parser)]
330 : struct SearchKeyCmd {
331 : #[clap(long)]
332 : tenant_id: String,
333 : #[clap(long)]
334 : timeline_id: String,
335 : #[clap(long)]
336 : data_dir: String,
337 : #[clap(long)]
338 : key: String,
339 : #[clap(long)]
340 : lsn: String,
341 : }
342 :
343 : #[tokio::test]
344 1 : async fn test_search_key() -> anyhow::Result<()> {
345 1 : let args = std::env::args().collect_vec();
346 1 : let pos = args
347 1 : .iter()
348 4 : .position(|arg| arg == "--")
349 1 : .unwrap_or(args.len());
350 1 : let slice = &args[pos..args.len()];
351 1 : let cmd = match SearchKeyCmd::try_parse_from(slice) {
352 0 : Ok(cmd) => cmd,
353 1 : Err(err) => {
354 1 : eprintln!("{err}");
355 1 : return Ok(());
356 : }
357 : };
358 :
359 0 : let tenant_id = TenantId::from_str(&cmd.tenant_id).unwrap();
360 0 : let timeline_id = TimelineId::from_str(&cmd.timeline_id).unwrap();
361 0 : let key = Key::from_hex(&cmd.key).unwrap();
362 0 : let lsn = Lsn::from_str(&cmd.lsn).unwrap();
363 0 : search_key(tenant_id, timeline_id, cmd.data_dir, key, lsn).await?;
364 :
365 1 : Ok(())
366 1 : }
|