Line data Source code
1 : mod draw;
2 :
3 : use draw::{LayerTraceEvent, LayerTraceFile, LayerTraceOp};
4 :
5 : use futures::StreamExt;
6 : use rand::Rng;
7 : use tracing::info;
8 :
9 : use utils::lsn::Lsn;
10 :
11 : use std::fmt::Write;
12 : use std::ops::Range;
13 : use std::sync::Arc;
14 : use std::sync::Mutex;
15 :
16 : use crate::helpers::{merge_delta_keys, overlaps_with};
17 :
18 : use crate::interface;
19 : use crate::interface::CompactionLayer;
20 :
21 : //
22 : // Implementation for the CompactionExecutor interface
23 : //
24 : pub struct MockTimeline {
25 : // Parameters for the compaction algorithm
26 : pub target_file_size: u64,
27 : tiers_per_level: u64,
28 :
29 : num_l0_flushes: u64,
30 : last_compact_at_flush: u64,
31 : last_flush_lsn: Lsn,
32 :
33 : // In-memory layer
34 : records: Vec<MockRecord>,
35 : total_len: u64,
36 : start_lsn: Lsn,
37 : end_lsn: Lsn,
38 :
39 : // Current keyspace at `end_lsn`. This is updated on every ingested record.
40 : keyspace: KeySpace,
41 :
42 : // historic keyspaces
43 : old_keyspaces: Vec<(Lsn, KeySpace)>,
44 :
45 : // "on-disk" layers
46 : pub live_layers: Vec<MockLayer>,
47 :
48 : num_deleted_layers: u64,
49 :
50 : // Statistics
51 : wal_ingested: u64,
52 : bytes_written: u64,
53 : bytes_deleted: u64,
54 : layers_created: u64,
55 : layers_deleted: u64,
56 :
57 : // All the events - creation and deletion of files - are collected
58 : // in 'history'. It is used to draw the SVG animation at the end.
59 : time: u64,
60 : history: Vec<draw::LayerTraceEvent>,
61 : }
62 :
63 : type KeySpace = interface::CompactionKeySpace<Key>;
64 :
65 : pub struct MockRequestContext {}
66 : impl interface::CompactionRequestContext for MockRequestContext {}
67 :
68 : pub type Key = u64;
69 :
70 : impl interface::CompactionKey for Key {
71 : const MIN: Self = u64::MIN;
72 : const MAX: Self = u64::MAX;
73 :
74 0 : fn key_range_size(key_range: &Range<Self>) -> u32 {
75 0 : std::cmp::min(key_range.end - key_range.start, u32::MAX as u64) as u32
76 0 : }
77 :
78 0 : fn next(&self) -> Self {
79 0 : self + 1
80 0 : }
81 0 : fn skip_some(&self) -> Self {
82 0 : // round up to next xx
83 0 : self + 100
84 0 : }
85 : }
86 :
87 : #[derive(Clone)]
88 : pub struct MockRecord {
89 : lsn: Lsn,
90 : key: Key,
91 : len: u64,
92 : }
93 :
94 : impl interface::CompactionDeltaEntry<'_, Key> for MockRecord {
95 0 : fn key(&self) -> Key {
96 0 : self.key
97 0 : }
98 0 : fn lsn(&self) -> Lsn {
99 0 : self.lsn
100 0 : }
101 0 : fn size(&self) -> u64 {
102 0 : self.len
103 0 : }
104 : }
105 :
106 : pub struct MockDeltaLayer {
107 : pub key_range: Range<Key>,
108 : pub lsn_range: Range<Lsn>,
109 :
110 : pub file_size: u64,
111 :
112 : pub deleted: Mutex<bool>,
113 :
114 : pub records: Vec<MockRecord>,
115 : }
116 :
117 : impl interface::CompactionLayer<Key> for Arc<MockDeltaLayer> {
118 76 : fn key_range(&self) -> &Range<Key> {
119 76 : &self.key_range
120 76 : }
121 514 : fn lsn_range(&self) -> &Range<Lsn> {
122 514 : &self.lsn_range
123 514 : }
124 :
125 0 : fn file_size(&self) -> u64 {
126 0 : self.file_size
127 0 : }
128 :
129 2 : fn short_id(&self) -> String {
130 2 : format!(
131 2 : "{:016X}-{:016X}__{:08X}-{:08X}",
132 2 : self.key_range.start, self.key_range.end, self.lsn_range.start.0, self.lsn_range.end.0
133 2 : )
134 2 : }
135 :
136 0 : fn is_delta(&self) -> bool {
137 0 : true
138 0 : }
139 : }
140 :
141 : impl interface::CompactionDeltaLayer<MockTimeline> for Arc<MockDeltaLayer> {
142 : type DeltaEntry<'a> = MockRecord;
143 :
144 0 : async fn load_keys<'a>(&self, _ctx: &MockRequestContext) -> anyhow::Result<Vec<MockRecord>> {
145 0 : Ok(self.records.clone())
146 0 : }
147 : }
148 :
149 : pub struct MockImageLayer {
150 : pub key_range: Range<Key>,
151 : pub lsn_range: Range<Lsn>,
152 :
153 : pub file_size: u64,
154 :
155 : pub deleted: Mutex<bool>,
156 : }
157 :
158 : impl interface::CompactionImageLayer<MockTimeline> for Arc<MockImageLayer> {}
159 :
160 : impl interface::CompactionLayer<Key> for Arc<MockImageLayer> {
161 4 : fn key_range(&self) -> &Range<Key> {
162 4 : &self.key_range
163 4 : }
164 14 : fn lsn_range(&self) -> &Range<Lsn> {
165 14 : &self.lsn_range
166 14 : }
167 :
168 0 : fn file_size(&self) -> u64 {
169 0 : self.file_size
170 0 : }
171 :
172 0 : fn short_id(&self) -> String {
173 0 : format!(
174 0 : "{:016X}-{:016X}__{:08X}",
175 0 : self.key_range.start, self.key_range.end, self.lsn_range.start.0,
176 0 : )
177 0 : }
178 :
179 0 : fn is_delta(&self) -> bool {
180 0 : false
181 0 : }
182 : }
183 :
184 : impl MockTimeline {
185 0 : pub fn new() -> Self {
186 0 : MockTimeline {
187 0 : target_file_size: 256 * 1024 * 1024,
188 0 : tiers_per_level: 4,
189 0 :
190 0 : num_l0_flushes: 0,
191 0 : last_compact_at_flush: 0,
192 0 : last_flush_lsn: Lsn(0),
193 0 :
194 0 : records: Vec::new(),
195 0 : total_len: 0,
196 0 : start_lsn: Lsn(1000),
197 0 : end_lsn: Lsn(1000),
198 0 : keyspace: KeySpace::new(),
199 0 :
200 0 : old_keyspaces: vec![],
201 0 :
202 0 : live_layers: vec![],
203 0 :
204 0 : num_deleted_layers: 0,
205 0 :
206 0 : wal_ingested: 0,
207 0 : bytes_written: 0,
208 0 : bytes_deleted: 0,
209 0 : layers_created: 0,
210 0 : layers_deleted: 0,
211 0 :
212 0 : time: 0,
213 0 : history: Vec::new(),
214 0 : }
215 0 : }
216 :
217 0 : pub async fn compact(&mut self) -> anyhow::Result<()> {
218 0 : let ctx = MockRequestContext {};
219 0 :
220 0 : crate::compact_tiered::compact_tiered(
221 0 : self,
222 0 : self.last_flush_lsn,
223 0 : self.target_file_size,
224 0 : self.tiers_per_level,
225 0 : &ctx,
226 0 : )
227 0 : .await?;
228 :
229 0 : Ok(())
230 0 : }
231 :
232 : // Ingest one record to the timeline
233 0 : pub fn ingest_record(&mut self, key: Key, len: u64) {
234 0 : self.records.push(MockRecord {
235 0 : lsn: self.end_lsn,
236 0 : key,
237 0 : len,
238 0 : });
239 0 : self.total_len += len;
240 0 : self.end_lsn += len;
241 0 :
242 0 : if self.total_len > self.target_file_size {
243 0 : self.flush_l0();
244 0 : }
245 0 : }
246 :
247 0 : pub async fn compact_if_needed(&mut self) -> anyhow::Result<()> {
248 0 : if self.num_l0_flushes - self.last_compact_at_flush >= self.tiers_per_level {
249 0 : self.compact().await?;
250 0 : self.last_compact_at_flush = self.num_l0_flushes;
251 0 : }
252 0 : Ok(())
253 0 : }
254 :
255 0 : pub fn flush_l0(&mut self) {
256 0 : if self.records.is_empty() {
257 0 : return;
258 0 : }
259 0 :
260 0 : let mut records = std::mem::take(&mut self.records);
261 0 : records.sort_by_key(|rec| rec.key);
262 0 :
263 0 : let lsn_range = self.start_lsn..self.end_lsn;
264 0 : let new_layer = Arc::new(MockDeltaLayer {
265 0 : key_range: Key::MIN..Key::MAX,
266 0 : lsn_range: lsn_range.clone(),
267 0 : file_size: self.total_len,
268 0 : records,
269 0 : deleted: Mutex::new(false),
270 0 : });
271 0 : info!("flushed L0 layer {}", new_layer.short_id());
272 0 : self.live_layers.push(MockLayer::from(&new_layer));
273 0 :
274 0 : // reset L0
275 0 : self.start_lsn = self.end_lsn;
276 0 : self.total_len = 0;
277 0 : self.records = Vec::new();
278 0 :
279 0 : self.layers_created += 1;
280 0 : self.bytes_written += new_layer.file_size;
281 0 :
282 0 : self.time += 1;
283 0 : self.history.push(LayerTraceEvent {
284 0 : time_rel: self.time,
285 0 : op: LayerTraceOp::Flush,
286 0 : file: LayerTraceFile {
287 0 : filename: new_layer.short_id(),
288 0 : key_range: new_layer.key_range.clone(),
289 0 : lsn_range: new_layer.lsn_range.clone(),
290 0 : },
291 0 : });
292 0 :
293 0 : self.num_l0_flushes += 1;
294 0 : self.last_flush_lsn = self.end_lsn;
295 0 : }
296 :
297 : // Ingest `num_records' records to the timeline, with random keys
298 : // uniformly distributed in `key_range`
299 0 : pub fn ingest_uniform(
300 0 : &mut self,
301 0 : num_records: u64,
302 0 : len: u64,
303 0 : key_range: &Range<Key>,
304 0 : ) -> anyhow::Result<()> {
305 0 : crate::helpers::union_to_keyspace(&mut self.keyspace, vec![key_range.clone()]);
306 0 : let mut rng = rand::thread_rng();
307 0 : for _ in 0..num_records {
308 0 : self.ingest_record(rng.gen_range(key_range.clone()), len);
309 0 : self.wal_ingested += len;
310 0 : }
311 0 : Ok(())
312 0 : }
313 :
314 0 : pub fn stats(&self) -> anyhow::Result<String> {
315 0 : let mut s = String::new();
316 0 :
317 0 : writeln!(s, "STATISTICS:")?;
318 0 : writeln!(
319 0 : s,
320 0 : "WAL ingested: {:>10} MB",
321 0 : self.wal_ingested / (1024 * 1024)
322 0 : )?;
323 0 : writeln!(
324 0 : s,
325 0 : "size created: {:>10} MB",
326 0 : self.bytes_written / (1024 * 1024)
327 0 : )?;
328 0 : writeln!(
329 0 : s,
330 0 : "size deleted: {:>10} MB",
331 0 : self.bytes_deleted / (1024 * 1024)
332 0 : )?;
333 0 : writeln!(s, "files created: {:>10}", self.layers_created)?;
334 0 : writeln!(s, "files deleted: {:>10}", self.layers_deleted)?;
335 0 : writeln!(
336 0 : s,
337 0 : "write amp: {:>10.2}",
338 0 : self.bytes_written as f64 / self.wal_ingested as f64
339 0 : )?;
340 0 : writeln!(
341 0 : s,
342 0 : "storage amp: {:>10.2}",
343 0 : (self.bytes_written - self.bytes_deleted) as f64 / self.wal_ingested as f64
344 0 : )?;
345 :
346 0 : Ok(s)
347 0 : }
348 :
349 0 : pub fn draw_history<W: std::io::Write>(&self, output: W) -> anyhow::Result<()> {
350 0 : draw::draw_history(&self.history, output)
351 0 : }
352 : }
353 :
354 : impl Default for MockTimeline {
355 0 : fn default() -> Self {
356 0 : Self::new()
357 0 : }
358 : }
359 :
360 : #[derive(Clone)]
361 : pub enum MockLayer {
362 : Delta(Arc<MockDeltaLayer>),
363 : Image(Arc<MockImageLayer>),
364 : }
365 :
366 : impl interface::CompactionLayer<Key> for MockLayer {
367 80 : fn key_range(&self) -> &Range<Key> {
368 80 : match self {
369 76 : MockLayer::Delta(this) => this.key_range(),
370 4 : MockLayer::Image(this) => this.key_range(),
371 : }
372 80 : }
373 528 : fn lsn_range(&self) -> &Range<Lsn> {
374 528 : match self {
375 514 : MockLayer::Delta(this) => this.lsn_range(),
376 14 : MockLayer::Image(this) => this.lsn_range(),
377 : }
378 528 : }
379 0 : fn file_size(&self) -> u64 {
380 0 : match self {
381 0 : MockLayer::Delta(this) => this.file_size(),
382 0 : MockLayer::Image(this) => this.file_size(),
383 : }
384 0 : }
385 2 : fn short_id(&self) -> String {
386 2 : match self {
387 2 : MockLayer::Delta(this) => this.short_id(),
388 0 : MockLayer::Image(this) => this.short_id(),
389 : }
390 2 : }
391 :
392 198 : fn is_delta(&self) -> bool {
393 198 : match self {
394 190 : MockLayer::Delta(_) => true,
395 8 : MockLayer::Image(_) => false,
396 : }
397 198 : }
398 : }
399 :
400 : impl MockLayer {
401 0 : fn is_deleted(&self) -> bool {
402 0 : let guard = match self {
403 0 : MockLayer::Delta(this) => this.deleted.lock().unwrap(),
404 0 : MockLayer::Image(this) => this.deleted.lock().unwrap(),
405 : };
406 0 : *guard
407 0 : }
408 0 : fn mark_deleted(&self) {
409 0 : let mut deleted_guard = match self {
410 0 : MockLayer::Delta(this) => this.deleted.lock().unwrap(),
411 0 : MockLayer::Image(this) => this.deleted.lock().unwrap(),
412 : };
413 0 : assert!(!*deleted_guard, "layer already deleted");
414 0 : *deleted_guard = true;
415 0 : }
416 : }
417 :
418 : impl From<&Arc<MockDeltaLayer>> for MockLayer {
419 0 : fn from(l: &Arc<MockDeltaLayer>) -> Self {
420 0 : MockLayer::Delta(l.clone())
421 0 : }
422 : }
423 :
424 : impl From<&Arc<MockImageLayer>> for MockLayer {
425 0 : fn from(l: &Arc<MockImageLayer>) -> Self {
426 0 : MockLayer::Image(l.clone())
427 0 : }
428 : }
429 :
430 : impl interface::CompactionJobExecutor for MockTimeline {
431 : type Key = Key;
432 : type Layer = MockLayer;
433 : type DeltaLayer = Arc<MockDeltaLayer>;
434 : type ImageLayer = Arc<MockImageLayer>;
435 : type RequestContext = MockRequestContext;
436 :
437 0 : async fn get_layers(
438 0 : &mut self,
439 0 : key_range: &Range<Self::Key>,
440 0 : lsn_range: &Range<Lsn>,
441 0 : _ctx: &Self::RequestContext,
442 0 : ) -> anyhow::Result<Vec<Self::Layer>> {
443 0 : // Clear any deleted layers from our vec
444 0 : self.live_layers.retain(|l| !l.is_deleted());
445 0 :
446 0 : let layers: Vec<MockLayer> = self
447 0 : .live_layers
448 0 : .iter()
449 0 : .filter(|l| {
450 0 : overlaps_with(l.lsn_range(), lsn_range) && overlaps_with(l.key_range(), key_range)
451 0 : })
452 0 : .cloned()
453 0 : .collect();
454 0 :
455 0 : Ok(layers)
456 0 : }
457 :
458 0 : async fn get_keyspace(
459 0 : &mut self,
460 0 : key_range: &Range<Self::Key>,
461 0 : _lsn: Lsn,
462 0 : _ctx: &Self::RequestContext,
463 0 : ) -> anyhow::Result<interface::CompactionKeySpace<Key>> {
464 0 : // find it in the levels
465 0 : if self.old_keyspaces.is_empty() {
466 0 : Ok(crate::helpers::intersect_keyspace(
467 0 : &self.keyspace,
468 0 : key_range,
469 0 : ))
470 : } else {
471 : // not implemented
472 :
473 : // The mock implementation only allows requesting the
474 : // keyspace at the level's end LSN. That's all that the
475 : // current implementation needs.
476 0 : panic!("keyspace not available for requested lsn");
477 : }
478 0 : }
479 :
480 0 : async fn downcast_delta_layer(
481 0 : &self,
482 0 : layer: &MockLayer,
483 0 : ) -> anyhow::Result<Option<Arc<MockDeltaLayer>>> {
484 0 : Ok(match layer {
485 0 : MockLayer::Delta(l) => Some(l.clone()),
486 0 : MockLayer::Image(_) => None,
487 : })
488 0 : }
489 :
490 0 : async fn create_image(
491 0 : &mut self,
492 0 : lsn: Lsn,
493 0 : key_range: &Range<Key>,
494 0 : ctx: &MockRequestContext,
495 0 : ) -> anyhow::Result<()> {
496 0 : let keyspace = self.get_keyspace(key_range, lsn, ctx).await?;
497 :
498 0 : let mut accum_size: u64 = 0;
499 0 : for r in keyspace {
500 0 : accum_size += r.end - r.start;
501 0 : }
502 :
503 0 : let new_layer = Arc::new(MockImageLayer {
504 0 : key_range: key_range.clone(),
505 0 : lsn_range: lsn..lsn,
506 0 : file_size: accum_size * 8192,
507 0 : deleted: Mutex::new(false),
508 0 : });
509 0 : info!(
510 0 : "created image layer, size {}: {}",
511 0 : new_layer.file_size,
512 0 : new_layer.short_id()
513 0 : );
514 0 : self.live_layers.push(MockLayer::Image(new_layer.clone()));
515 0 :
516 0 : // update stats
517 0 : self.bytes_written += new_layer.file_size;
518 0 : self.layers_created += 1;
519 0 :
520 0 : self.time += 1;
521 0 : self.history.push(LayerTraceEvent {
522 0 : time_rel: self.time,
523 0 : op: LayerTraceOp::CreateImage,
524 0 : file: LayerTraceFile {
525 0 : filename: new_layer.short_id(),
526 0 : key_range: new_layer.key_range.clone(),
527 0 : lsn_range: new_layer.lsn_range.clone(),
528 0 : },
529 0 : });
530 0 :
531 0 : Ok(())
532 0 : }
533 :
534 0 : async fn create_delta(
535 0 : &mut self,
536 0 : lsn_range: &Range<Lsn>,
537 0 : key_range: &Range<Key>,
538 0 : input_layers: &[Arc<MockDeltaLayer>],
539 0 : ctx: &MockRequestContext,
540 0 : ) -> anyhow::Result<()> {
541 0 : let mut key_value_stream =
542 0 : std::pin::pin!(merge_delta_keys::<MockTimeline>(input_layers, ctx));
543 0 : let mut records: Vec<MockRecord> = Vec::new();
544 0 : let mut total_len = 2;
545 0 : while let Some(delta_entry) = key_value_stream.next().await {
546 0 : let delta_entry: MockRecord = delta_entry?;
547 0 : if key_range.contains(&delta_entry.key) && lsn_range.contains(&delta_entry.lsn) {
548 0 : total_len += delta_entry.len;
549 0 : records.push(delta_entry);
550 0 : }
551 : }
552 0 : let total_records = records.len();
553 0 : let new_layer = Arc::new(MockDeltaLayer {
554 0 : key_range: key_range.clone(),
555 0 : lsn_range: lsn_range.clone(),
556 0 : file_size: total_len,
557 0 : records,
558 0 : deleted: Mutex::new(false),
559 0 : });
560 0 : info!(
561 0 : "created delta layer, recs {}, size {}: {}",
562 0 : total_records,
563 0 : total_len,
564 0 : new_layer.short_id()
565 0 : );
566 0 : self.live_layers.push(MockLayer::Delta(new_layer.clone()));
567 0 :
568 0 : // update stats
569 0 : self.bytes_written += total_len;
570 0 : self.layers_created += 1;
571 0 :
572 0 : self.time += 1;
573 0 : self.history.push(LayerTraceEvent {
574 0 : time_rel: self.time,
575 0 : op: LayerTraceOp::CreateDelta,
576 0 : file: LayerTraceFile {
577 0 : filename: new_layer.short_id(),
578 0 : key_range: new_layer.key_range.clone(),
579 0 : lsn_range: new_layer.lsn_range.clone(),
580 0 : },
581 0 : });
582 0 :
583 0 : Ok(())
584 0 : }
585 :
586 0 : async fn delete_layer(
587 0 : &mut self,
588 0 : layer: &Self::Layer,
589 0 : _ctx: &MockRequestContext,
590 0 : ) -> anyhow::Result<()> {
591 0 : let layer = std::pin::pin!(layer);
592 0 : info!("deleting layer: {}", layer.short_id());
593 0 : self.num_deleted_layers += 1;
594 0 : self.bytes_deleted += layer.file_size();
595 0 : layer.mark_deleted();
596 0 :
597 0 : self.time += 1;
598 0 : self.history.push(LayerTraceEvent {
599 0 : time_rel: self.time,
600 0 : op: LayerTraceOp::Delete,
601 0 : file: LayerTraceFile {
602 0 : filename: layer.short_id(),
603 0 : key_range: layer.key_range().clone(),
604 0 : lsn_range: layer.lsn_range().clone(),
605 0 : },
606 0 : });
607 0 :
608 0 : Ok(())
609 0 : }
610 : }
|