Line data Source code
1 : mod draw;
2 :
3 : use draw::{LayerTraceEvent, LayerTraceFile, LayerTraceOp};
4 :
5 : use futures::StreamExt;
6 : use pageserver_api::shard::ShardIdentity;
7 : use rand::Rng;
8 : use tracing::info;
9 :
10 : use utils::lsn::Lsn;
11 :
12 : use std::fmt::Write;
13 : use std::ops::Range;
14 : use std::sync::Arc;
15 : use std::sync::Mutex;
16 :
17 : use crate::helpers::{merge_delta_keys, overlaps_with};
18 :
19 : use crate::interface;
20 : use crate::interface::CompactionLayer;
21 :
22 : //
23 : // Implementation for the CompactionExecutor interface
24 : //
25 : pub struct MockTimeline {
26 : // Parameters for the compaction algorithm
27 : pub target_file_size: u64,
28 : tiers_per_level: u64,
29 :
30 : num_l0_flushes: u64,
31 : last_compact_at_flush: u64,
32 : last_flush_lsn: Lsn,
33 :
34 : // In-memory layer
35 : records: Vec<MockRecord>,
36 : total_len: u64,
37 : start_lsn: Lsn,
38 : end_lsn: Lsn,
39 :
40 : // Current keyspace at `end_lsn`. This is updated on every ingested record.
41 : keyspace: KeySpace,
42 :
43 : // historic keyspaces
44 : old_keyspaces: Vec<(Lsn, KeySpace)>,
45 :
46 : // "on-disk" layers
47 : pub live_layers: Vec<MockLayer>,
48 :
49 : num_deleted_layers: u64,
50 :
51 : // Statistics
52 : wal_ingested: u64,
53 : bytes_written: u64,
54 : bytes_deleted: u64,
55 : layers_created: u64,
56 : layers_deleted: u64,
57 :
58 : // All the events - creation and deletion of files - are collected
59 : // in 'history'. It is used to draw the SVG animation at the end.
60 : time: u64,
61 : history: Vec<draw::LayerTraceEvent>,
62 : }
63 :
64 : type KeySpace = interface::CompactionKeySpace<Key>;
65 :
66 : pub struct MockRequestContext {}
67 : impl interface::CompactionRequestContext for MockRequestContext {}
68 :
69 : pub type Key = u64;
70 :
71 : impl interface::CompactionKey for Key {
72 : const MIN: Self = u64::MIN;
73 : const MAX: Self = u64::MAX;
74 :
75 2 : fn key_range_size(key_range: &Range<Self>, _shard_identity: &ShardIdentity) -> u32 {
76 2 : std::cmp::min(key_range.end - key_range.start, u32::MAX as u64) as u32
77 2 : }
78 :
79 78 : fn next(&self) -> Self {
80 78 : self + 1
81 78 : }
82 0 : fn skip_some(&self) -> Self {
83 0 : // round up to next xx
84 0 : self + 100
85 0 : }
86 : }
87 :
88 : #[derive(Clone)]
89 : pub struct MockRecord {
90 : lsn: Lsn,
91 : key: Key,
92 : len: u64,
93 : }
94 :
95 : impl interface::CompactionDeltaEntry<'_, Key> for MockRecord {
96 50151571 : fn key(&self) -> Key {
97 50151571 : self.key
98 50151571 : }
99 50008949 : fn lsn(&self) -> Lsn {
100 50008949 : self.lsn
101 50008949 : }
102 78078 : fn size(&self) -> u64 {
103 78078 : self.len
104 78078 : }
105 : }
106 :
107 : pub struct MockDeltaLayer {
108 : pub key_range: Range<Key>,
109 : pub lsn_range: Range<Lsn>,
110 :
111 : pub file_size: u64,
112 :
113 : pub deleted: Mutex<bool>,
114 :
115 : pub records: Vec<MockRecord>,
116 : }
117 :
118 : impl interface::CompactionLayer<Key> for Arc<MockDeltaLayer> {
119 67819 : fn key_range(&self) -> &Range<Key> {
120 67819 : &self.key_range
121 67819 : }
122 66067 : fn lsn_range(&self) -> &Range<Lsn> {
123 66067 : &self.lsn_range
124 66067 : }
125 :
126 312 : fn file_size(&self) -> u64 {
127 312 : self.file_size
128 312 : }
129 :
130 626 : fn short_id(&self) -> String {
131 626 : format!(
132 626 : "{:016X}-{:016X}__{:08X}-{:08X}",
133 626 : self.key_range.start, self.key_range.end, self.lsn_range.start.0, self.lsn_range.end.0
134 626 : )
135 626 : }
136 :
137 0 : fn is_delta(&self) -> bool {
138 0 : true
139 0 : }
140 : }
141 :
142 : impl interface::CompactionDeltaLayer<MockTimeline> for Arc<MockDeltaLayer> {
143 : type DeltaEntry<'a> = MockRecord;
144 :
145 3120 : async fn load_keys<'a>(&self, _ctx: &MockRequestContext) -> anyhow::Result<Vec<MockRecord>> {
146 3120 : Ok(self.records.clone())
147 3120 : }
148 : }
149 :
150 : pub struct MockImageLayer {
151 : pub key_range: Range<Key>,
152 : pub lsn_range: Range<Lsn>,
153 :
154 : pub file_size: u64,
155 :
156 : pub deleted: Mutex<bool>,
157 : }
158 :
159 : impl interface::CompactionImageLayer<MockTimeline> for Arc<MockImageLayer> {}
160 :
161 : impl interface::CompactionLayer<Key> for Arc<MockImageLayer> {
162 4 : fn key_range(&self) -> &Range<Key> {
163 4 : &self.key_range
164 4 : }
165 14 : fn lsn_range(&self) -> &Range<Lsn> {
166 14 : &self.lsn_range
167 14 : }
168 :
169 0 : fn file_size(&self) -> u64 {
170 0 : self.file_size
171 0 : }
172 :
173 0 : fn short_id(&self) -> String {
174 0 : format!(
175 0 : "{:016X}-{:016X}__{:08X}",
176 0 : self.key_range.start, self.key_range.end, self.lsn_range.start.0,
177 0 : )
178 0 : }
179 :
180 0 : fn is_delta(&self) -> bool {
181 0 : false
182 0 : }
183 : }
184 :
185 : impl MockTimeline {
186 2 : pub fn new() -> Self {
187 2 : MockTimeline {
188 2 : target_file_size: 256 * 1024 * 1024,
189 2 : tiers_per_level: 4,
190 2 :
191 2 : num_l0_flushes: 0,
192 2 : last_compact_at_flush: 0,
193 2 : last_flush_lsn: Lsn(0),
194 2 :
195 2 : records: Vec::new(),
196 2 : total_len: 0,
197 2 : start_lsn: Lsn(1000),
198 2 : end_lsn: Lsn(1000),
199 2 : keyspace: KeySpace::new(),
200 2 :
201 2 : old_keyspaces: vec![],
202 2 :
203 2 : live_layers: vec![],
204 2 :
205 2 : num_deleted_layers: 0,
206 2 :
207 2 : wal_ingested: 0,
208 2 : bytes_written: 0,
209 2 : bytes_deleted: 0,
210 2 : layers_created: 0,
211 2 : layers_deleted: 0,
212 2 :
213 2 : time: 0,
214 2 : history: Vec::new(),
215 2 : }
216 2 : }
217 :
218 2 : pub async fn compact(&mut self) -> anyhow::Result<()> {
219 2 : let ctx = MockRequestContext {};
220 2 :
221 2 : crate::compact_tiered::compact_tiered(
222 2 : self,
223 2 : self.last_flush_lsn,
224 2 : self.target_file_size,
225 2 : self.tiers_per_level,
226 2 : &ctx,
227 2 : )
228 0 : .await?;
229 :
230 2 : Ok(())
231 2 : }
232 :
233 : // Ingest one record to the timeline
234 79800 : pub fn ingest_record(&mut self, key: Key, len: u64) {
235 79800 : self.records.push(MockRecord {
236 79800 : lsn: self.end_lsn,
237 79800 : key,
238 79800 : len,
239 79800 : });
240 79800 : self.total_len += len;
241 79800 : self.end_lsn += len;
242 79800 :
243 79800 : if self.total_len > self.target_file_size {
244 78 : self.flush_l0();
245 79722 : }
246 79800 : }
247 :
248 0 : pub async fn compact_if_needed(&mut self) -> anyhow::Result<()> {
249 0 : if self.num_l0_flushes - self.last_compact_at_flush >= self.tiers_per_level {
250 0 : self.compact().await?;
251 0 : self.last_compact_at_flush = self.num_l0_flushes;
252 0 : }
253 0 : Ok(())
254 0 : }
255 :
256 78 : pub fn flush_l0(&mut self) {
257 78 : if self.records.is_empty() {
258 0 : return;
259 78 : }
260 78 :
261 78 : let mut records = std::mem::take(&mut self.records);
262 1520026 : records.sort_by_key(|rec| rec.key);
263 78 :
264 78 : let lsn_range = self.start_lsn..self.end_lsn;
265 78 : let new_layer = Arc::new(MockDeltaLayer {
266 78 : key_range: Key::MIN..Key::MAX,
267 78 : lsn_range: lsn_range.clone(),
268 78 : file_size: self.total_len,
269 78 : records,
270 78 : deleted: Mutex::new(false),
271 78 : });
272 78 : info!("flushed L0 layer {}", new_layer.short_id());
273 78 : self.live_layers.push(MockLayer::from(&new_layer));
274 78 :
275 78 : // reset L0
276 78 : self.start_lsn = self.end_lsn;
277 78 : self.total_len = 0;
278 78 : self.records = Vec::new();
279 78 :
280 78 : self.layers_created += 1;
281 78 : self.bytes_written += new_layer.file_size;
282 78 :
283 78 : self.time += 1;
284 78 : self.history.push(LayerTraceEvent {
285 78 : time_rel: self.time,
286 78 : op: LayerTraceOp::Flush,
287 78 : file: LayerTraceFile {
288 78 : filename: new_layer.short_id(),
289 78 : key_range: new_layer.key_range.clone(),
290 78 : lsn_range: new_layer.lsn_range.clone(),
291 78 : },
292 78 : });
293 78 :
294 78 : self.num_l0_flushes += 1;
295 78 : self.last_flush_lsn = self.end_lsn;
296 78 : }
297 :
298 : // Ingest `num_records' records to the timeline, with random keys
299 : // uniformly distributed in `key_range`
300 798 : pub fn ingest_uniform(
301 798 : &mut self,
302 798 : num_records: u64,
303 798 : len: u64,
304 798 : key_range: &Range<Key>,
305 798 : ) -> anyhow::Result<()> {
306 798 : crate::helpers::union_to_keyspace(&mut self.keyspace, vec![key_range.clone()]);
307 798 : let mut rng = rand::thread_rng();
308 79800 : for _ in 0..num_records {
309 79800 : self.ingest_record(rng.gen_range(key_range.clone()), len);
310 79800 : self.wal_ingested += len;
311 79800 : }
312 798 : Ok(())
313 798 : }
314 :
315 0 : pub fn stats(&self) -> anyhow::Result<String> {
316 0 : let mut s = String::new();
317 0 :
318 0 : writeln!(s, "STATISTICS:")?;
319 0 : writeln!(
320 0 : s,
321 0 : "WAL ingested: {:>10} MB",
322 0 : self.wal_ingested / (1024 * 1024)
323 0 : )?;
324 0 : writeln!(
325 0 : s,
326 0 : "size created: {:>10} MB",
327 0 : self.bytes_written / (1024 * 1024)
328 0 : )?;
329 0 : writeln!(
330 0 : s,
331 0 : "size deleted: {:>10} MB",
332 0 : self.bytes_deleted / (1024 * 1024)
333 0 : )?;
334 0 : writeln!(s, "files created: {:>10}", self.layers_created)?;
335 0 : writeln!(s, "files deleted: {:>10}", self.layers_deleted)?;
336 0 : writeln!(
337 0 : s,
338 0 : "write amp: {:>10.2}",
339 0 : self.bytes_written as f64 / self.wal_ingested as f64
340 0 : )?;
341 0 : writeln!(
342 0 : s,
343 0 : "storage amp: {:>10.2}",
344 0 : (self.bytes_written - self.bytes_deleted) as f64 / self.wal_ingested as f64
345 0 : )?;
346 :
347 0 : Ok(s)
348 0 : }
349 :
350 0 : pub fn draw_history<W: std::io::Write>(&self, output: W) -> anyhow::Result<()> {
351 0 : draw::draw_history(&self.history, output)
352 0 : }
353 : }
354 :
355 : impl Default for MockTimeline {
356 0 : fn default() -> Self {
357 0 : Self::new()
358 0 : }
359 : }
360 :
361 : #[derive(Clone)]
362 : pub enum MockLayer {
363 : Delta(Arc<MockDeltaLayer>),
364 : Image(Arc<MockImageLayer>),
365 : }
366 :
367 : impl interface::CompactionLayer<Key> for MockLayer {
368 3512 : fn key_range(&self) -> &Range<Key> {
369 3512 : match self {
370 3508 : MockLayer::Delta(this) => this.key_range(),
371 4 : MockLayer::Image(this) => this.key_range(),
372 : }
373 3512 : }
374 1770 : fn lsn_range(&self) -> &Range<Lsn> {
375 1770 : match self {
376 1756 : MockLayer::Delta(this) => this.lsn_range(),
377 14 : MockLayer::Image(this) => this.lsn_range(),
378 : }
379 1770 : }
380 312 : fn file_size(&self) -> u64 {
381 312 : match self {
382 312 : MockLayer::Delta(this) => this.file_size(),
383 0 : MockLayer::Image(this) => this.file_size(),
384 : }
385 312 : }
386 314 : fn short_id(&self) -> String {
387 314 : match self {
388 314 : MockLayer::Delta(this) => this.short_id(),
389 0 : MockLayer::Image(this) => this.short_id(),
390 : }
391 314 : }
392 :
393 668 : fn is_delta(&self) -> bool {
394 668 : match self {
395 660 : MockLayer::Delta(_) => true,
396 8 : MockLayer::Image(_) => false,
397 : }
398 668 : }
399 : }
400 :
401 : impl MockLayer {
402 234 : fn is_deleted(&self) -> bool {
403 234 : let guard = match self {
404 234 : MockLayer::Delta(this) => this.deleted.lock().unwrap(),
405 0 : MockLayer::Image(this) => this.deleted.lock().unwrap(),
406 : };
407 234 : *guard
408 234 : }
409 78 : fn mark_deleted(&self) {
410 78 : let mut deleted_guard = match self {
411 78 : MockLayer::Delta(this) => this.deleted.lock().unwrap(),
412 0 : MockLayer::Image(this) => this.deleted.lock().unwrap(),
413 : };
414 78 : assert!(!*deleted_guard, "layer already deleted");
415 78 : *deleted_guard = true;
416 78 : }
417 : }
418 :
419 : impl From<&Arc<MockDeltaLayer>> for MockLayer {
420 78 : fn from(l: &Arc<MockDeltaLayer>) -> Self {
421 78 : MockLayer::Delta(l.clone())
422 78 : }
423 : }
424 :
425 : impl From<&Arc<MockImageLayer>> for MockLayer {
426 0 : fn from(l: &Arc<MockImageLayer>) -> Self {
427 0 : MockLayer::Image(l.clone())
428 0 : }
429 : }
430 :
431 : impl interface::CompactionJobExecutor for MockTimeline {
432 : type Key = Key;
433 : type Layer = MockLayer;
434 : type DeltaLayer = Arc<MockDeltaLayer>;
435 : type ImageLayer = Arc<MockImageLayer>;
436 : type RequestContext = MockRequestContext;
437 :
438 2 : fn get_shard_identity(&self) -> &ShardIdentity {
439 2 : static IDENTITY: ShardIdentity = ShardIdentity::unsharded();
440 2 : &IDENTITY
441 2 : }
442 :
443 4 : async fn get_layers(
444 4 : &mut self,
445 4 : key_range: &Range<Self::Key>,
446 4 : lsn_range: &Range<Lsn>,
447 4 : _ctx: &Self::RequestContext,
448 4 : ) -> anyhow::Result<Vec<Self::Layer>> {
449 4 : // Clear any deleted layers from our vec
450 234 : self.live_layers.retain(|l| !l.is_deleted());
451 4 :
452 4 : let layers: Vec<MockLayer> = self
453 4 : .live_layers
454 4 : .iter()
455 156 : .filter(|l| {
456 156 : overlaps_with(l.lsn_range(), lsn_range) && overlaps_with(l.key_range(), key_range)
457 156 : })
458 4 : .cloned()
459 4 : .collect();
460 4 :
461 4 : Ok(layers)
462 4 : }
463 :
464 2 : async fn get_keyspace(
465 2 : &mut self,
466 2 : key_range: &Range<Self::Key>,
467 2 : _lsn: Lsn,
468 2 : _ctx: &Self::RequestContext,
469 2 : ) -> anyhow::Result<interface::CompactionKeySpace<Key>> {
470 2 : // find it in the levels
471 2 : if self.old_keyspaces.is_empty() {
472 2 : Ok(crate::helpers::intersect_keyspace(
473 2 : &self.keyspace,
474 2 : key_range,
475 2 : ))
476 : } else {
477 : // not implemented
478 :
479 : // The mock implementation only allows requesting the
480 : // keyspace at the level's end LSN. That's all that the
481 : // current implementation needs.
482 0 : panic!("keyspace not available for requested lsn");
483 : }
484 2 : }
485 :
486 3120 : async fn downcast_delta_layer(
487 3120 : &self,
488 3120 : layer: &MockLayer,
489 3120 : ) -> anyhow::Result<Option<Arc<MockDeltaLayer>>> {
490 3120 : Ok(match layer {
491 3120 : MockLayer::Delta(l) => Some(l.clone()),
492 0 : MockLayer::Image(_) => None,
493 : })
494 3120 : }
495 :
496 0 : async fn create_image(
497 0 : &mut self,
498 0 : lsn: Lsn,
499 0 : key_range: &Range<Key>,
500 0 : ctx: &MockRequestContext,
501 0 : ) -> anyhow::Result<()> {
502 0 : let keyspace = self.get_keyspace(key_range, lsn, ctx).await?;
503 :
504 0 : let mut accum_size: u64 = 0;
505 0 : for r in keyspace {
506 0 : accum_size += r.end - r.start;
507 0 : }
508 :
509 0 : let new_layer = Arc::new(MockImageLayer {
510 0 : key_range: key_range.clone(),
511 0 : lsn_range: lsn..lsn,
512 0 : file_size: accum_size * 8192,
513 0 : deleted: Mutex::new(false),
514 0 : });
515 0 : info!(
516 0 : "created image layer, size {}: {}",
517 0 : new_layer.file_size,
518 0 : new_layer.short_id()
519 : );
520 0 : self.live_layers.push(MockLayer::Image(new_layer.clone()));
521 0 :
522 0 : // update stats
523 0 : self.bytes_written += new_layer.file_size;
524 0 : self.layers_created += 1;
525 0 :
526 0 : self.time += 1;
527 0 : self.history.push(LayerTraceEvent {
528 0 : time_rel: self.time,
529 0 : op: LayerTraceOp::CreateImage,
530 0 : file: LayerTraceFile {
531 0 : filename: new_layer.short_id(),
532 0 : key_range: new_layer.key_range.clone(),
533 0 : lsn_range: new_layer.lsn_range.clone(),
534 0 : },
535 0 : });
536 0 :
537 0 : Ok(())
538 0 : }
539 :
540 78 : async fn create_delta(
541 78 : &mut self,
542 78 : lsn_range: &Range<Lsn>,
543 78 : key_range: &Range<Key>,
544 78 : input_layers: &[Arc<MockDeltaLayer>],
545 78 : ctx: &MockRequestContext,
546 78 : ) -> anyhow::Result<()> {
547 78 : let mut key_value_stream =
548 78 : std::pin::pin!(merge_delta_keys::<MockTimeline>(input_layers, ctx));
549 78 : let mut records: Vec<MockRecord> = Vec::new();
550 78 : let mut total_len = 2;
551 3045120 : while let Some(delta_entry) = key_value_stream.next().await {
552 3045042 : let delta_entry: MockRecord = delta_entry?;
553 3045042 : if key_range.contains(&delta_entry.key) && lsn_range.contains(&delta_entry.lsn) {
554 78078 : total_len += delta_entry.len;
555 78078 : records.push(delta_entry);
556 2966964 : }
557 : }
558 78 : let total_records = records.len();
559 78 : let new_layer = Arc::new(MockDeltaLayer {
560 78 : key_range: key_range.clone(),
561 78 : lsn_range: lsn_range.clone(),
562 78 : file_size: total_len,
563 78 : records,
564 78 : deleted: Mutex::new(false),
565 78 : });
566 78 : info!(
567 0 : "created delta layer, recs {}, size {}: {}",
568 0 : total_records,
569 0 : total_len,
570 0 : new_layer.short_id()
571 : );
572 78 : self.live_layers.push(MockLayer::Delta(new_layer.clone()));
573 78 :
574 78 : // update stats
575 78 : self.bytes_written += total_len;
576 78 : self.layers_created += 1;
577 78 :
578 78 : self.time += 1;
579 78 : self.history.push(LayerTraceEvent {
580 78 : time_rel: self.time,
581 78 : op: LayerTraceOp::CreateDelta,
582 78 : file: LayerTraceFile {
583 78 : filename: new_layer.short_id(),
584 78 : key_range: new_layer.key_range.clone(),
585 78 : lsn_range: new_layer.lsn_range.clone(),
586 78 : },
587 78 : });
588 78 :
589 78 : Ok(())
590 78 : }
591 :
592 78 : async fn delete_layer(
593 78 : &mut self,
594 78 : layer: &Self::Layer,
595 78 : _ctx: &MockRequestContext,
596 78 : ) -> anyhow::Result<()> {
597 78 : let layer = std::pin::pin!(layer);
598 78 : info!("deleting layer: {}", layer.short_id());
599 78 : self.num_deleted_layers += 1;
600 78 : self.bytes_deleted += layer.file_size();
601 78 : layer.mark_deleted();
602 78 :
603 78 : self.time += 1;
604 78 : self.history.push(LayerTraceEvent {
605 78 : time_rel: self.time,
606 78 : op: LayerTraceOp::Delete,
607 78 : file: LayerTraceFile {
608 78 : filename: layer.short_id(),
609 78 : key_range: layer.key_range().clone(),
610 78 : lsn_range: layer.lsn_range().clone(),
611 78 : },
612 78 : });
613 78 :
614 78 : Ok(())
615 78 : }
616 : }
|