Line data Source code
1 : mod draw;
2 :
3 : use std::fmt::Write;
4 : use std::ops::Range;
5 : use std::sync::{Arc, Mutex};
6 :
7 : use draw::{LayerTraceEvent, LayerTraceFile, LayerTraceOp};
8 : use futures::StreamExt;
9 : use pageserver_api::shard::ShardIdentity;
10 : use rand::Rng;
11 : use tracing::info;
12 : use utils::lsn::Lsn;
13 :
14 : use crate::helpers::{PAGE_SZ, merge_delta_keys, overlaps_with};
15 : use crate::interface;
16 : use crate::interface::CompactionLayer;
17 :
18 : //
19 : // Implementation for the CompactionExecutor interface
20 : //
21 : pub struct MockTimeline {
22 : // Parameters for the compaction algorithm
23 : pub target_file_size: u64,
24 : tiers_per_level: u64,
25 :
26 : num_l0_flushes: u64,
27 : last_compact_at_flush: u64,
28 : last_flush_lsn: Lsn,
29 :
30 : // In-memory layer
31 : records: Vec<MockRecord>,
32 : total_len: u64,
33 : start_lsn: Lsn,
34 : end_lsn: Lsn,
35 :
36 : // Current keyspace at `end_lsn`. This is updated on every ingested record.
37 : keyspace: KeySpace,
38 :
39 : // historic keyspaces
40 : old_keyspaces: Vec<(Lsn, KeySpace)>,
41 :
42 : // "on-disk" layers
43 : pub live_layers: Vec<MockLayer>,
44 :
45 : num_deleted_layers: u64,
46 :
47 : // Statistics
48 : wal_ingested: u64,
49 : bytes_written: u64,
50 : bytes_deleted: u64,
51 : layers_created: u64,
52 : layers_deleted: u64,
53 :
54 : // All the events - creation and deletion of files - are collected
55 : // in 'history'. It is used to draw the SVG animation at the end.
56 : time: u64,
57 : history: Vec<draw::LayerTraceEvent>,
58 : }
59 :
60 : type KeySpace = interface::CompactionKeySpace<Key>;
61 :
62 : pub struct MockRequestContext {}
63 : impl interface::CompactionRequestContext for MockRequestContext {}
64 :
65 : pub type Key = u64;
66 :
67 : impl interface::CompactionKey for Key {
68 : const MIN: Self = u64::MIN;
69 : const MAX: Self = u64::MAX;
70 :
71 3 : fn key_range_size(key_range: &Range<Self>, _shard_identity: &ShardIdentity) -> u32 {
72 3 : std::cmp::min(key_range.end - key_range.start, u32::MAX as u64) as u32
73 3 : }
74 :
75 1494 : fn next(&self) -> Self {
76 1494 : self + 1
77 1494 : }
78 0 : fn skip_some(&self) -> Self {
79 0 : // round up to next xx
80 0 : self + 100
81 0 : }
82 : }
83 :
84 : #[derive(Clone)]
85 : pub struct MockRecord {
86 : lsn: Lsn,
87 : key: Key,
88 : len: u64,
89 : }
90 :
91 : impl interface::CompactionDeltaEntry<'_, Key> for MockRecord {
92 39822446 : fn key(&self) -> Key {
93 39822446 : self.key
94 39822446 : }
95 38889881 : fn lsn(&self) -> Lsn {
96 38889881 : self.lsn
97 38889881 : }
98 839047 : fn size(&self) -> u64 {
99 839047 : self.len
100 839047 : }
101 : }
102 :
103 : pub struct MockDeltaLayer {
104 : pub key_range: Range<Key>,
105 : pub lsn_range: Range<Lsn>,
106 :
107 : pub file_size: u64,
108 :
109 : pub deleted: Mutex<bool>,
110 :
111 : pub records: Vec<MockRecord>,
112 : }
113 :
114 : impl interface::CompactionLayer<Key> for Arc<MockDeltaLayer> {
115 7316387 : fn key_range(&self) -> &Range<Key> {
116 7316387 : &self.key_range
117 7316387 : }
118 7343806 : fn lsn_range(&self) -> &Range<Lsn> {
119 7343806 : &self.lsn_range
120 7343806 : }
121 :
122 0 : fn file_size(&self) -> u64 {
123 0 : self.file_size
124 0 : }
125 :
126 381 : fn short_id(&self) -> String {
127 381 : format!(
128 381 : "{:016X}-{:016X}__{:08X}-{:08X}",
129 381 : self.key_range.start, self.key_range.end, self.lsn_range.start.0, self.lsn_range.end.0
130 381 : )
131 381 : }
132 :
133 0 : fn is_delta(&self) -> bool {
134 0 : true
135 0 : }
136 : }
137 :
138 : impl interface::CompactionDeltaLayer<MockTimeline> for Arc<MockDeltaLayer> {
139 : type DeltaEntry<'a> = MockRecord;
140 :
141 1608 : async fn load_keys(&self, _ctx: &MockRequestContext) -> anyhow::Result<Vec<MockRecord>> {
142 1608 : Ok(self.records.clone())
143 1608 : }
144 : }
145 :
146 : pub struct MockImageLayer {
147 : pub key_range: Range<Key>,
148 : pub lsn_range: Range<Lsn>,
149 :
150 : pub file_size: u64,
151 :
152 : pub deleted: Mutex<bool>,
153 : }
154 :
155 : impl interface::CompactionImageLayer<MockTimeline> for Arc<MockImageLayer> {}
156 :
157 : impl interface::CompactionLayer<Key> for Arc<MockImageLayer> {
158 3 : fn key_range(&self) -> &Range<Key> {
159 3 : &self.key_range
160 3 : }
161 7 : fn lsn_range(&self) -> &Range<Lsn> {
162 7 : &self.lsn_range
163 7 : }
164 :
165 0 : fn file_size(&self) -> u64 {
166 0 : self.file_size
167 0 : }
168 :
169 0 : fn short_id(&self) -> String {
170 0 : format!(
171 0 : "{:016X}-{:016X}__{:08X}",
172 0 : self.key_range.start, self.key_range.end, self.lsn_range.start.0,
173 0 : )
174 0 : }
175 :
176 0 : fn is_delta(&self) -> bool {
177 0 : false
178 0 : }
179 : }
180 :
181 : impl MockTimeline {
182 2 : pub fn new() -> Self {
183 2 : MockTimeline {
184 2 : target_file_size: 256 * 1024 * 1024,
185 2 : tiers_per_level: 4,
186 2 :
187 2 : num_l0_flushes: 0,
188 2 : last_compact_at_flush: 0,
189 2 : last_flush_lsn: Lsn(0),
190 2 :
191 2 : records: Vec::new(),
192 2 : total_len: 0,
193 2 : start_lsn: Lsn(1000),
194 2 : end_lsn: Lsn(1000),
195 2 : keyspace: KeySpace::new(),
196 2 :
197 2 : old_keyspaces: vec![],
198 2 :
199 2 : live_layers: vec![],
200 2 :
201 2 : num_deleted_layers: 0,
202 2 :
203 2 : wal_ingested: 0,
204 2 : bytes_written: 0,
205 2 : bytes_deleted: 0,
206 2 : layers_created: 0,
207 2 : layers_deleted: 0,
208 2 :
209 2 : time: 0,
210 2 : history: Vec::new(),
211 2 : }
212 2 : }
213 :
214 1000 : pub async fn compact(&mut self) -> anyhow::Result<()> {
215 1000 : let ctx = MockRequestContext {};
216 1000 :
217 1000 : crate::compact_tiered::compact_tiered(
218 1000 : self,
219 1000 : self.last_flush_lsn,
220 1000 : self.target_file_size,
221 1000 : self.tiers_per_level,
222 1000 : &ctx,
223 1000 : )
224 1000 : .await?;
225 :
226 1000 : Ok(())
227 0 : }
228 :
229 : // Ingest one record to the timeline
230 1138800 : pub fn ingest_record(&mut self, key: Key, len: u64) {
231 1138800 : self.records.push(MockRecord {
232 1138800 : lsn: self.end_lsn,
233 1138800 : key,
234 1138800 : len,
235 1138800 : });
236 1138800 : self.total_len += len;
237 1138800 : self.end_lsn += len;
238 1138800 :
239 1138800 : if self.total_len > self.target_file_size {
240 49 : self.flush_l0();
241 1138751 : }
242 1138800 : }
243 :
244 0 : pub async fn compact_if_needed(&mut self) -> anyhow::Result<()> {
245 0 : if self.num_l0_flushes - self.last_compact_at_flush >= self.tiers_per_level {
246 0 : self.compact().await?;
247 0 : self.last_compact_at_flush = self.num_l0_flushes;
248 0 : }
249 0 : Ok(())
250 0 : }
251 :
252 49 : pub fn flush_l0(&mut self) {
253 49 : if self.records.is_empty() {
254 0 : return;
255 49 : }
256 49 :
257 49 : let mut records = std::mem::take(&mut self.records);
258 15921596 : records.sort_by_key(|rec| rec.key);
259 49 :
260 49 : let lsn_range = self.start_lsn..self.end_lsn;
261 49 : let new_layer = Arc::new(MockDeltaLayer {
262 49 : key_range: Key::MIN..Key::MAX,
263 49 : lsn_range: lsn_range.clone(),
264 49 : file_size: self.total_len,
265 49 : records,
266 49 : deleted: Mutex::new(false),
267 49 : });
268 49 : info!("flushed L0 layer {}", new_layer.short_id());
269 49 : self.live_layers.push(MockLayer::from(&new_layer));
270 49 :
271 49 : // reset L0
272 49 : self.start_lsn = self.end_lsn;
273 49 : self.total_len = 0;
274 49 : self.records = Vec::new();
275 49 :
276 49 : self.layers_created += 1;
277 49 : self.bytes_written += new_layer.file_size;
278 49 :
279 49 : self.time += 1;
280 49 : self.history.push(LayerTraceEvent {
281 49 : time_rel: self.time,
282 49 : op: LayerTraceOp::Flush,
283 49 : file: LayerTraceFile {
284 49 : filename: new_layer.short_id(),
285 49 : key_range: new_layer.key_range.clone(),
286 49 : lsn_range: new_layer.lsn_range.clone(),
287 49 : },
288 49 : });
289 49 :
290 49 : self.num_l0_flushes += 1;
291 49 : self.last_flush_lsn = self.end_lsn;
292 49 : }
293 :
294 : // Ingest `num_records' records to the timeline, with random keys
295 : // uniformly distributed in `key_range`
296 2397 : pub fn ingest_uniform(
297 2397 : &mut self,
298 2397 : num_records: u64,
299 2397 : len: u64,
300 2397 : key_range: &Range<Key>,
301 2397 : ) -> anyhow::Result<()> {
302 2397 : crate::helpers::union_to_keyspace(&mut self.keyspace, vec![key_range.clone()]);
303 2397 : let mut rng = rand::thread_rng();
304 1138800 : for _ in 0..num_records {
305 1138800 : self.ingest_record(rng.gen_range(key_range.clone()), len);
306 1138800 : self.wal_ingested += len;
307 1138800 : }
308 2397 : Ok(())
309 2397 : }
310 :
311 0 : pub fn stats(&self) -> anyhow::Result<String> {
312 0 : let mut s = String::new();
313 0 :
314 0 : writeln!(s, "STATISTICS:")?;
315 0 : writeln!(
316 0 : s,
317 0 : "WAL ingested: {:>10} MB",
318 0 : self.wal_ingested / (1024 * 1024)
319 0 : )?;
320 0 : writeln!(
321 0 : s,
322 0 : "size created: {:>10} MB",
323 0 : self.bytes_written / (1024 * 1024)
324 0 : )?;
325 0 : writeln!(
326 0 : s,
327 0 : "size deleted: {:>10} MB",
328 0 : self.bytes_deleted / (1024 * 1024)
329 0 : )?;
330 0 : writeln!(s, "files created: {:>10}", self.layers_created)?;
331 0 : writeln!(s, "files deleted: {:>10}", self.layers_deleted)?;
332 0 : writeln!(
333 0 : s,
334 0 : "write amp: {:>10.2}",
335 0 : self.bytes_written as f64 / self.wal_ingested as f64
336 0 : )?;
337 0 : writeln!(
338 0 : s,
339 0 : "storage amp: {:>10.2}",
340 0 : (self.bytes_written - self.bytes_deleted) as f64 / self.wal_ingested as f64
341 0 : )?;
342 :
343 0 : Ok(s)
344 0 : }
345 :
346 0 : pub fn draw_history<W: std::io::Write>(&self, output: W) -> anyhow::Result<()> {
347 0 : draw::draw_history(&self.history, output)
348 0 : }
349 : }
350 :
351 : impl Default for MockTimeline {
352 0 : fn default() -> Self {
353 0 : Self::new()
354 0 : }
355 : }
356 :
357 : #[derive(Clone)]
358 : pub enum MockLayer {
359 : Delta(Arc<MockDeltaLayer>),
360 : Image(Arc<MockImageLayer>),
361 : }
362 :
363 : impl interface::CompactionLayer<Key> for MockLayer {
364 11913 : fn key_range(&self) -> &Range<Key> {
365 11913 : match self {
366 11910 : MockLayer::Delta(this) => this.key_range(),
367 3 : MockLayer::Image(this) => this.key_range(),
368 : }
369 11913 : }
370 39336 : fn lsn_range(&self) -> &Range<Lsn> {
371 39336 : match self {
372 39329 : MockLayer::Delta(this) => this.lsn_range(),
373 7 : MockLayer::Image(this) => this.lsn_range(),
374 : }
375 39336 : }
376 208 : fn file_size(&self) -> u64 {
377 208 : match self {
378 208 : MockLayer::Delta(this) => this.file_size,
379 0 : MockLayer::Image(this) => this.file_size,
380 : }
381 208 : }
382 185 : fn short_id(&self) -> String {
383 185 : match self {
384 185 : MockLayer::Delta(this) => this.short_id(),
385 0 : MockLayer::Image(this) => this.short_id(),
386 : }
387 185 : }
388 :
389 11051 : fn is_delta(&self) -> bool {
390 11051 : match self {
391 11047 : MockLayer::Delta(_) => true,
392 4 : MockLayer::Image(_) => false,
393 : }
394 11051 : }
395 : }
396 :
397 : impl MockLayer {
398 6041 : fn is_deleted(&self) -> bool {
399 6041 : let guard = match self {
400 6041 : MockLayer::Delta(this) => this.deleted.lock().unwrap(),
401 0 : MockLayer::Image(this) => this.deleted.lock().unwrap(),
402 : };
403 6041 : *guard
404 6041 : }
405 47 : fn mark_deleted(&self) {
406 47 : let mut deleted_guard = match self {
407 47 : MockLayer::Delta(this) => this.deleted.lock().unwrap(),
408 0 : MockLayer::Image(this) => this.deleted.lock().unwrap(),
409 : };
410 47 : assert!(!*deleted_guard, "layer already deleted");
411 47 : *deleted_guard = true;
412 47 : }
413 : }
414 :
415 : impl From<&Arc<MockDeltaLayer>> for MockLayer {
416 49 : fn from(l: &Arc<MockDeltaLayer>) -> Self {
417 49 : MockLayer::Delta(l.clone())
418 49 : }
419 : }
420 :
421 : impl From<&Arc<MockImageLayer>> for MockLayer {
422 0 : fn from(l: &Arc<MockImageLayer>) -> Self {
423 0 : MockLayer::Image(l.clone())
424 0 : }
425 : }
426 :
427 : impl interface::CompactionJobExecutor for MockTimeline {
428 : type Key = Key;
429 : type Layer = MockLayer;
430 : type DeltaLayer = Arc<MockDeltaLayer>;
431 : type ImageLayer = Arc<MockImageLayer>;
432 : type RequestContext = MockRequestContext;
433 :
434 3 : fn get_shard_identity(&self) -> &ShardIdentity {
435 : static IDENTITY: ShardIdentity = ShardIdentity::unsharded();
436 3 : &IDENTITY
437 3 : }
438 :
439 1003 : async fn get_layers(
440 1003 : &mut self,
441 1003 : key_range: &Range<Self::Key>,
442 1003 : lsn_range: &Range<Lsn>,
443 1003 : _ctx: &Self::RequestContext,
444 1003 : ) -> anyhow::Result<Vec<Self::Layer>> {
445 1003 : // Clear any deleted layers from our vec
446 6041 : self.live_layers.retain(|l| !l.is_deleted());
447 1003 :
448 1003 : let layers: Vec<MockLayer> = self
449 1003 : .live_layers
450 1003 : .iter()
451 5994 : .filter(|l| {
452 5994 : overlaps_with(l.lsn_range(), lsn_range) && overlaps_with(l.key_range(), key_range)
453 1003 : })
454 1003 : .cloned()
455 1003 : .collect();
456 1003 :
457 1003 : Ok(layers)
458 1003 : }
459 :
460 3 : async fn get_keyspace(
461 3 : &mut self,
462 3 : key_range: &Range<Self::Key>,
463 3 : _lsn: Lsn,
464 3 : _ctx: &Self::RequestContext,
465 3 : ) -> anyhow::Result<interface::CompactionKeySpace<Key>> {
466 3 : // find it in the levels
467 3 : if self.old_keyspaces.is_empty() {
468 3 : Ok(crate::helpers::intersect_keyspace(
469 3 : &self.keyspace,
470 3 : key_range,
471 3 : ))
472 : } else {
473 : // not implemented
474 :
475 : // The mock implementation only allows requesting the
476 : // keyspace at the level's end LSN. That's all that the
477 : // current implementation needs.
478 0 : panic!("keyspace not available for requested lsn");
479 : }
480 0 : }
481 :
482 1608 : async fn downcast_delta_layer(
483 1608 : &self,
484 1608 : layer: &MockLayer,
485 1608 : _ctx: &MockRequestContext,
486 1608 : ) -> anyhow::Result<Option<Arc<MockDeltaLayer>>> {
487 1608 : Ok(match layer {
488 1608 : MockLayer::Delta(l) => Some(l.clone()),
489 0 : MockLayer::Image(_) => None,
490 : })
491 0 : }
492 :
493 0 : async fn create_image(
494 0 : &mut self,
495 0 : lsn: Lsn,
496 0 : key_range: &Range<Key>,
497 0 : ctx: &MockRequestContext,
498 0 : ) -> anyhow::Result<()> {
499 0 : let keyspace = self.get_keyspace(key_range, lsn, ctx).await?;
500 :
501 0 : let mut accum_size: u64 = 0;
502 0 : for r in keyspace {
503 0 : accum_size += r.end - r.start;
504 0 : }
505 :
506 0 : let new_layer = Arc::new(MockImageLayer {
507 0 : key_range: key_range.clone(),
508 0 : lsn_range: lsn..lsn,
509 0 : file_size: accum_size * PAGE_SZ,
510 0 : deleted: Mutex::new(false),
511 0 : });
512 0 : info!(
513 0 : "created image layer, size {}: {}",
514 0 : new_layer.file_size,
515 0 : new_layer.short_id()
516 : );
517 0 : self.live_layers.push(MockLayer::Image(new_layer.clone()));
518 0 :
519 0 : // update stats
520 0 : self.bytes_written += new_layer.file_size;
521 0 : self.layers_created += 1;
522 0 :
523 0 : self.time += 1;
524 0 : self.history.push(LayerTraceEvent {
525 0 : time_rel: self.time,
526 0 : op: LayerTraceOp::CreateImage,
527 0 : file: LayerTraceFile {
528 0 : filename: new_layer.short_id(),
529 0 : key_range: new_layer.key_range.clone(),
530 0 : lsn_range: new_layer.lsn_range.clone(),
531 0 : },
532 0 : });
533 0 :
534 0 : Ok(())
535 0 : }
536 :
537 49 : async fn create_delta(
538 49 : &mut self,
539 49 : lsn_range: &Range<Lsn>,
540 49 : key_range: &Range<Key>,
541 49 : input_layers: &[Arc<MockDeltaLayer>],
542 49 : ctx: &MockRequestContext,
543 49 : ) -> anyhow::Result<()> {
544 49 : let mut key_value_stream =
545 49 : std::pin::pin!(merge_delta_keys::<MockTimeline>(input_layers, ctx));
546 49 : let mut records: Vec<MockRecord> = Vec::new();
547 49 : let mut total_len = 2;
548 5522610 : while let Some(delta_entry) = key_value_stream.next().await {
549 5522561 : let delta_entry: MockRecord = delta_entry?;
550 5522561 : if key_range.contains(&delta_entry.key) && lsn_range.contains(&delta_entry.lsn) {
551 839047 : total_len += delta_entry.len;
552 839047 : records.push(delta_entry);
553 839047 : }
554 : }
555 49 : let total_records = records.len();
556 49 : let new_layer = Arc::new(MockDeltaLayer {
557 49 : key_range: key_range.clone(),
558 49 : lsn_range: lsn_range.clone(),
559 49 : file_size: total_len,
560 49 : records,
561 49 : deleted: Mutex::new(false),
562 49 : });
563 49 : info!(
564 0 : "created delta layer, recs {}, size {}: {}",
565 0 : total_records,
566 0 : total_len,
567 0 : new_layer.short_id()
568 : );
569 49 : self.live_layers.push(MockLayer::Delta(new_layer.clone()));
570 49 :
571 49 : // update stats
572 49 : self.bytes_written += total_len;
573 49 : self.layers_created += 1;
574 49 :
575 49 : self.time += 1;
576 49 : self.history.push(LayerTraceEvent {
577 49 : time_rel: self.time,
578 49 : op: LayerTraceOp::CreateDelta,
579 49 : file: LayerTraceFile {
580 49 : filename: new_layer.short_id(),
581 49 : key_range: new_layer.key_range.clone(),
582 49 : lsn_range: new_layer.lsn_range.clone(),
583 49 : },
584 49 : });
585 49 :
586 49 : Ok(())
587 0 : }
588 :
589 47 : async fn delete_layer(
590 47 : &mut self,
591 47 : layer: &Self::Layer,
592 47 : _ctx: &MockRequestContext,
593 47 : ) -> anyhow::Result<()> {
594 47 : let layer = std::pin::pin!(layer);
595 47 : info!("deleting layer: {}", layer.short_id());
596 47 : self.num_deleted_layers += 1;
597 47 : self.bytes_deleted += layer.file_size();
598 47 : layer.mark_deleted();
599 47 :
600 47 : self.time += 1;
601 47 : self.history.push(LayerTraceEvent {
602 47 : time_rel: self.time,
603 47 : op: LayerTraceOp::Delete,
604 47 : file: LayerTraceFile {
605 47 : filename: layer.short_id(),
606 47 : key_range: layer.key_range().clone(),
607 47 : lsn_range: layer.lsn_range().clone(),
608 47 : },
609 47 : });
610 47 :
611 47 : Ok(())
612 47 : }
613 : }
|