Line data Source code
1 : use std::future::Future;
2 : use std::ops::Range;
3 : use std::sync::Arc;
4 :
5 : use bytes::Bytes;
6 : use pageserver_api::key::{KEY_SIZE, Key};
7 : use pageserver_api::value::Value;
8 : use tokio_util::sync::CancellationToken;
9 : use utils::id::TimelineId;
10 : use utils::lsn::Lsn;
11 : use utils::shard::TenantShardId;
12 :
13 : use super::layer::S3_UPLOAD_LIMIT;
14 : use super::{
15 : DeltaLayerWriter, ImageLayerWriter, PersistentLayerDesc, PersistentLayerKey, ResidentLayer,
16 : };
17 : use crate::config::PageServerConf;
18 : use crate::context::RequestContext;
19 : use crate::tenant::Timeline;
20 : use crate::tenant::storage_layer::Layer;
21 :
22 : pub(crate) enum BatchWriterResult {
23 : Produced(ResidentLayer),
24 : Discarded(PersistentLayerKey),
25 : }
26 :
27 : #[cfg(test)]
28 : impl BatchWriterResult {
29 48 : fn into_resident_layer(self) -> ResidentLayer {
30 48 : match self {
31 48 : BatchWriterResult::Produced(layer) => layer,
32 0 : BatchWriterResult::Discarded(_) => panic!("unexpected discarded layer"),
33 : }
34 48 : }
35 :
36 32 : fn into_discarded_layer(self) -> PersistentLayerKey {
37 32 : match self {
38 0 : BatchWriterResult::Produced(_) => panic!("unexpected produced layer"),
39 32 : BatchWriterResult::Discarded(layer) => layer,
40 32 : }
41 32 : }
42 : }
43 :
44 : enum LayerWriterWrapper {
45 : Image(ImageLayerWriter),
46 : Delta(DeltaLayerWriter),
47 : }
48 :
49 : /// An layer writer that takes unfinished layers and finish them atomically.
50 : #[must_use]
51 : pub struct BatchLayerWriter {
52 : generated_layer_writers: Vec<(LayerWriterWrapper, PersistentLayerKey)>,
53 : conf: &'static PageServerConf,
54 : }
55 :
56 : impl BatchLayerWriter {
57 1392 : pub async fn new(conf: &'static PageServerConf) -> anyhow::Result<Self> {
58 1392 : Ok(Self {
59 1392 : generated_layer_writers: Vec::new(),
60 1392 : conf,
61 1392 : })
62 1392 : }
63 :
64 628 : pub fn add_unfinished_image_writer(
65 628 : &mut self,
66 628 : writer: ImageLayerWriter,
67 628 : key_range: Range<Key>,
68 628 : lsn: Lsn,
69 628 : ) {
70 628 : self.generated_layer_writers.push((
71 628 : LayerWriterWrapper::Image(writer),
72 628 : PersistentLayerKey {
73 628 : key_range,
74 628 : lsn_range: PersistentLayerDesc::image_layer_lsn_range(lsn),
75 628 : is_delta: false,
76 628 : },
77 628 : ));
78 628 : }
79 :
80 120 : pub fn add_unfinished_delta_writer(
81 120 : &mut self,
82 120 : writer: DeltaLayerWriter,
83 120 : key_range: Range<Key>,
84 120 : lsn_range: Range<Lsn>,
85 120 : ) {
86 120 : self.generated_layer_writers.push((
87 120 : LayerWriterWrapper::Delta(writer),
88 120 : PersistentLayerKey {
89 120 : key_range,
90 120 : lsn_range,
91 120 : is_delta: true,
92 120 : },
93 120 : ));
94 120 : }
95 :
96 1160 : pub(crate) async fn finish(
97 1160 : self,
98 1160 : tline: &Arc<Timeline>,
99 1160 : ctx: &RequestContext,
100 1160 : ) -> anyhow::Result<Vec<ResidentLayer>> {
101 1160 : let res = self
102 1160 : .finish_with_discard_fn(tline, ctx, |_| async { false })
103 1160 : .await?;
104 1160 : let mut output = Vec::new();
105 1668 : for r in res {
106 508 : if let BatchWriterResult::Produced(layer) = r {
107 508 : output.push(layer);
108 508 : }
109 : }
110 1160 : Ok(output)
111 1160 : }
112 :
113 1368 : pub(crate) async fn finish_with_discard_fn<D, F>(
114 1368 : self,
115 1368 : tline: &Arc<Timeline>,
116 1368 : ctx: &RequestContext,
117 1368 : discard_fn: D,
118 1368 : ) -> anyhow::Result<Vec<BatchWriterResult>>
119 1368 : where
120 1368 : D: Fn(&PersistentLayerKey) -> F,
121 1368 : F: Future<Output = bool>,
122 1368 : {
123 1368 : let Self {
124 1368 : generated_layer_writers,
125 1368 : ..
126 1368 : } = self;
127 1368 : let clean_up_layers = |generated_layers: Vec<BatchWriterResult>| {
128 0 : for produced_layer in generated_layers {
129 0 : if let BatchWriterResult::Produced(resident_layer) = produced_layer {
130 0 : let layer: Layer = resident_layer.into();
131 0 : layer.delete_on_drop();
132 0 : }
133 : }
134 0 : };
135 : // BEGIN: catch every error and do the recovery in the below section
136 1368 : let mut generated_layers: Vec<BatchWriterResult> = Vec::new();
137 2116 : for (inner, layer_key) in generated_layer_writers {
138 748 : if discard_fn(&layer_key).await {
139 76 : generated_layers.push(BatchWriterResult::Discarded(layer_key));
140 76 : } else {
141 672 : let res = match inner {
142 76 : LayerWriterWrapper::Delta(writer) => {
143 76 : writer.finish(layer_key.key_range.end, ctx).await
144 : }
145 596 : LayerWriterWrapper::Image(writer) => {
146 596 : writer
147 596 : .finish_with_end_key(layer_key.key_range.end, ctx)
148 596 : .await
149 : }
150 : };
151 672 : let layer = match res {
152 672 : Ok((desc, path)) => {
153 672 : match Layer::finish_creating(self.conf, tline, desc, &path) {
154 672 : Ok(layer) => layer,
155 0 : Err(e) => {
156 0 : tokio::fs::remove_file(&path).await.ok();
157 0 : clean_up_layers(generated_layers);
158 0 : return Err(e);
159 : }
160 : }
161 : }
162 0 : Err(e) => {
163 0 : // Image/DeltaLayerWriter::finish will clean up the temporary layer if anything goes wrong,
164 0 : // so we don't need to remove the layer we just failed to create by ourselves.
165 0 : clean_up_layers(generated_layers);
166 0 : return Err(e);
167 : }
168 : };
169 672 : generated_layers.push(BatchWriterResult::Produced(layer));
170 : }
171 : }
172 : // END: catch every error and do the recovery in the above section
173 1368 : Ok(generated_layers)
174 1368 : }
175 :
176 0 : pub fn pending_layer_num(&self) -> usize {
177 0 : self.generated_layer_writers.len()
178 0 : }
179 : }
180 :
181 : /// An image writer that takes images and produces multiple image layers.
182 : #[must_use]
183 : pub struct SplitImageLayerWriter<'a> {
184 : inner: ImageLayerWriter,
185 : target_layer_size: u64,
186 : lsn: Lsn,
187 : conf: &'static PageServerConf,
188 : timeline_id: TimelineId,
189 : tenant_shard_id: TenantShardId,
190 : batches: BatchLayerWriter,
191 : start_key: Key,
192 : gate: &'a utils::sync::gate::Gate,
193 : cancel: CancellationToken,
194 : }
195 :
196 : impl<'a> SplitImageLayerWriter<'a> {
197 : #[allow(clippy::too_many_arguments)]
198 104 : pub async fn new(
199 104 : conf: &'static PageServerConf,
200 104 : timeline_id: TimelineId,
201 104 : tenant_shard_id: TenantShardId,
202 104 : start_key: Key,
203 104 : lsn: Lsn,
204 104 : target_layer_size: u64,
205 104 : gate: &'a utils::sync::gate::Gate,
206 104 : cancel: CancellationToken,
207 104 : ctx: &RequestContext,
208 104 : ) -> anyhow::Result<Self> {
209 104 : Ok(Self {
210 104 : target_layer_size,
211 104 : inner: ImageLayerWriter::new(
212 104 : conf,
213 104 : timeline_id,
214 104 : tenant_shard_id,
215 104 : &(start_key..Key::MAX),
216 104 : lsn,
217 104 : gate,
218 104 : cancel.clone(),
219 104 : ctx,
220 104 : )
221 104 : .await?,
222 104 : conf,
223 104 : timeline_id,
224 104 : tenant_shard_id,
225 104 : batches: BatchLayerWriter::new(conf).await?,
226 104 : lsn,
227 104 : start_key,
228 104 : gate,
229 104 : cancel,
230 : })
231 104 : }
232 :
233 17212 : pub async fn put_image(
234 17212 : &mut self,
235 17212 : key: Key,
236 17212 : img: Bytes,
237 17212 : ctx: &RequestContext,
238 17212 : ) -> anyhow::Result<()> {
239 17212 : // The current estimation is an upper bound of the space that the key/image could take
240 17212 : // because we did not consider compression in this estimation. The resulting image layer
241 17212 : // could be smaller than the target size.
242 17212 : let addition_size_estimation = KEY_SIZE as u64 + img.len() as u64;
243 17212 : if self.inner.num_keys() >= 1
244 17108 : && self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
245 : {
246 28 : let next_image_writer = ImageLayerWriter::new(
247 28 : self.conf,
248 28 : self.timeline_id,
249 28 : self.tenant_shard_id,
250 28 : &(key..Key::MAX),
251 28 : self.lsn,
252 28 : self.gate,
253 28 : self.cancel.clone(),
254 28 : ctx,
255 28 : )
256 28 : .await?;
257 28 : let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
258 28 : self.batches.add_unfinished_image_writer(
259 28 : prev_image_writer,
260 28 : self.start_key..key,
261 28 : self.lsn,
262 28 : );
263 28 : self.start_key = key;
264 17184 : }
265 17212 : self.inner.put_image(key, img, ctx).await
266 17212 : }
267 :
268 92 : pub(crate) async fn finish_with_discard_fn<D, F>(
269 92 : self,
270 92 : tline: &Arc<Timeline>,
271 92 : ctx: &RequestContext,
272 92 : end_key: Key,
273 92 : discard_fn: D,
274 92 : ) -> anyhow::Result<Vec<BatchWriterResult>>
275 92 : where
276 92 : D: Fn(&PersistentLayerKey) -> F,
277 92 : F: Future<Output = bool>,
278 92 : {
279 92 : let Self {
280 92 : mut batches, inner, ..
281 92 : } = self;
282 92 : if inner.num_keys() != 0 {
283 92 : batches.add_unfinished_image_writer(inner, self.start_key..end_key, self.lsn);
284 92 : }
285 92 : batches.finish_with_discard_fn(tline, ctx, discard_fn).await
286 92 : }
287 :
288 : #[cfg(test)]
289 8 : pub(crate) async fn finish(
290 8 : self,
291 8 : tline: &Arc<Timeline>,
292 8 : ctx: &RequestContext,
293 8 : end_key: Key,
294 8 : ) -> anyhow::Result<Vec<BatchWriterResult>> {
295 12 : self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
296 8 : .await
297 8 : }
298 : }
299 :
300 : /// A delta writer that takes key-lsn-values and produces multiple delta layers.
301 : ///
302 : /// Note that if updates of a single key exceed the target size limit, all of the updates will be batched
303 : /// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm
304 : /// will split them into multiple files based on size.
305 : #[must_use]
306 : pub struct SplitDeltaLayerWriter<'a> {
307 : inner: Option<(Key, DeltaLayerWriter)>,
308 : target_layer_size: u64,
309 : conf: &'static PageServerConf,
310 : timeline_id: TimelineId,
311 : tenant_shard_id: TenantShardId,
312 : lsn_range: Range<Lsn>,
313 : last_key_written: Key,
314 : batches: BatchLayerWriter,
315 : gate: &'a utils::sync::gate::Gate,
316 : cancel: CancellationToken,
317 : }
318 :
319 : impl<'a> SplitDeltaLayerWriter<'a> {
320 128 : pub async fn new(
321 128 : conf: &'static PageServerConf,
322 128 : timeline_id: TimelineId,
323 128 : tenant_shard_id: TenantShardId,
324 128 : lsn_range: Range<Lsn>,
325 128 : target_layer_size: u64,
326 128 : gate: &'a utils::sync::gate::Gate,
327 128 : cancel: CancellationToken,
328 128 : ) -> anyhow::Result<Self> {
329 128 : Ok(Self {
330 128 : target_layer_size,
331 128 : inner: None,
332 128 : conf,
333 128 : timeline_id,
334 128 : tenant_shard_id,
335 128 : lsn_range,
336 128 : last_key_written: Key::MIN,
337 128 : batches: BatchLayerWriter::new(conf).await?,
338 128 : gate,
339 128 : cancel,
340 : })
341 128 : }
342 :
343 24416 : pub async fn put_value(
344 24416 : &mut self,
345 24416 : key: Key,
346 24416 : lsn: Lsn,
347 24416 : val: Value,
348 24416 : ctx: &RequestContext,
349 24416 : ) -> anyhow::Result<()> {
350 24416 : // The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
351 24416 : // number, and therefore the final layer size could be a little bit larger or smaller than the target.
352 24416 : //
353 24416 : // Also, keep all updates of a single key in a single file. TODO: split them using the legacy compaction
354 24416 : // strategy. https://github.com/neondatabase/neon/issues/8837
355 24416 :
356 24416 : if self.inner.is_none() {
357 100 : self.inner = Some((
358 100 : key,
359 100 : DeltaLayerWriter::new(
360 100 : self.conf,
361 100 : self.timeline_id,
362 100 : self.tenant_shard_id,
363 100 : key,
364 100 : self.lsn_range.clone(),
365 100 : self.gate,
366 100 : self.cancel.clone(),
367 100 : ctx,
368 100 : )
369 100 : .await?,
370 : ));
371 24316 : }
372 24416 : let (_, inner) = self.inner.as_mut().unwrap();
373 24416 :
374 24416 : let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */;
375 24416 : if inner.num_keys() >= 1
376 24316 : && inner.estimated_size() + addition_size_estimation >= self.target_layer_size
377 : {
378 5992 : if key != self.last_key_written {
379 28 : let next_delta_writer = DeltaLayerWriter::new(
380 28 : self.conf,
381 28 : self.timeline_id,
382 28 : self.tenant_shard_id,
383 28 : key,
384 28 : self.lsn_range.clone(),
385 28 : self.gate,
386 28 : self.cancel.clone(),
387 28 : ctx,
388 28 : )
389 28 : .await?;
390 28 : let (start_key, prev_delta_writer) =
391 28 : self.inner.replace((key, next_delta_writer)).unwrap();
392 28 : self.batches.add_unfinished_delta_writer(
393 28 : prev_delta_writer,
394 28 : start_key..key,
395 28 : self.lsn_range.clone(),
396 28 : );
397 5964 : } else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
398 : // We have to produce a very large file b/c a key is updated too often.
399 0 : anyhow::bail!(
400 0 : "a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
401 0 : key,
402 0 : inner.estimated_size()
403 0 : );
404 5964 : }
405 18424 : }
406 24416 : self.last_key_written = key;
407 24416 : let (_, inner) = self.inner.as_mut().unwrap();
408 24416 : inner.put_value(key, lsn, val, ctx).await
409 24416 : }
410 :
411 116 : pub(crate) async fn finish_with_discard_fn<D, F>(
412 116 : self,
413 116 : tline: &Arc<Timeline>,
414 116 : ctx: &RequestContext,
415 116 : discard_fn: D,
416 116 : ) -> anyhow::Result<Vec<BatchWriterResult>>
417 116 : where
418 116 : D: Fn(&PersistentLayerKey) -> F,
419 116 : F: Future<Output = bool>,
420 116 : {
421 116 : let Self {
422 116 : mut batches, inner, ..
423 116 : } = self;
424 116 : if let Some((start_key, writer)) = inner {
425 92 : if writer.num_keys() != 0 {
426 92 : let end_key = self.last_key_written.next();
427 92 : batches.add_unfinished_delta_writer(
428 92 : writer,
429 92 : start_key..end_key,
430 92 : self.lsn_range.clone(),
431 92 : );
432 92 : }
433 24 : }
434 116 : batches.finish_with_discard_fn(tline, ctx, discard_fn).await
435 116 : }
436 :
437 : #[cfg(test)]
438 12 : pub(crate) async fn finish(
439 12 : self,
440 12 : tline: &Arc<Timeline>,
441 12 : ctx: &RequestContext,
442 12 : ) -> anyhow::Result<Vec<BatchWriterResult>> {
443 16 : self.finish_with_discard_fn(tline, ctx, |_| async { false })
444 12 : .await
445 12 : }
446 : }
447 :
448 : #[cfg(test)]
449 : mod tests {
450 : use itertools::Itertools;
451 : use rand::{RngCore, SeedableRng};
452 :
453 : use super::*;
454 : use crate::DEFAULT_PG_VERSION;
455 : use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
456 : use crate::tenant::storage_layer::AsLayerDesc;
457 :
458 40104 : fn get_key(id: u32) -> Key {
459 40104 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
460 40104 : key.field6 = id;
461 40104 : key
462 40104 : }
463 :
464 16 : fn get_img(id: u32) -> Bytes {
465 16 : format!("{id:064}").into()
466 16 : }
467 :
468 40008 : fn get_large_img() -> Bytes {
469 40008 : let mut rng = rand::rngs::SmallRng::seed_from_u64(42);
470 40008 : let mut data = vec![0; 8192];
471 40008 : rng.fill_bytes(&mut data);
472 40008 : data.into()
473 40008 : }
474 :
475 : #[tokio::test]
476 4 : async fn write_one_image() {
477 4 : let harness = TenantHarness::create("split_writer_write_one_image")
478 4 : .await
479 4 : .unwrap();
480 4 : let (tenant, ctx) = harness.load().await;
481 4 :
482 4 : let tline = tenant
483 4 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
484 4 : .await
485 4 : .unwrap();
486 4 :
487 4 : let mut image_writer = SplitImageLayerWriter::new(
488 4 : tenant.conf,
489 4 : tline.timeline_id,
490 4 : tenant.tenant_shard_id,
491 4 : get_key(0),
492 4 : Lsn(0x18),
493 4 : 4 * 1024 * 1024,
494 4 : &tline.gate,
495 4 : tline.cancel.clone(),
496 4 : &ctx,
497 4 : )
498 4 : .await
499 4 : .unwrap();
500 4 :
501 4 : let mut delta_writer = SplitDeltaLayerWriter::new(
502 4 : tenant.conf,
503 4 : tline.timeline_id,
504 4 : tenant.tenant_shard_id,
505 4 : Lsn(0x18)..Lsn(0x20),
506 4 : 4 * 1024 * 1024,
507 4 : &tline.gate,
508 4 : tline.cancel.clone(),
509 4 : )
510 4 : .await
511 4 : .unwrap();
512 4 :
513 4 : image_writer
514 4 : .put_image(get_key(0), get_img(0), &ctx)
515 4 : .await
516 4 : .unwrap();
517 4 : let layers = image_writer
518 4 : .finish(&tline, &ctx, get_key(10))
519 4 : .await
520 4 : .unwrap();
521 4 : assert_eq!(layers.len(), 1);
522 4 :
523 4 : delta_writer
524 4 : .put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx)
525 4 : .await
526 4 : .unwrap();
527 4 : let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
528 4 : assert_eq!(layers.len(), 1);
529 4 : assert_eq!(
530 4 : layers
531 4 : .into_iter()
532 4 : .next()
533 4 : .unwrap()
534 4 : .into_resident_layer()
535 4 : .layer_desc()
536 4 : .key(),
537 4 : PersistentLayerKey {
538 4 : key_range: get_key(0)..get_key(1),
539 4 : lsn_range: Lsn(0x18)..Lsn(0x20),
540 4 : is_delta: true
541 4 : }
542 4 : );
543 4 : }
544 :
545 : #[tokio::test]
546 4 : async fn write_split() {
547 4 : // Test the split writer with retaining all the layers we have produced (discard=false)
548 4 : write_split_helper("split_writer_write_split", false).await;
549 4 : }
550 :
551 : #[tokio::test]
552 4 : async fn write_split_discard() {
553 4 : // Test the split writer with discarding all the layers we have produced (discard=true)
554 4 : write_split_helper("split_writer_write_split_discard", true).await;
555 4 : }
556 :
557 : /// Test the image+delta writer by writing a large number of images and deltas. If discard is
558 : /// set to true, all layers will be discarded.
559 8 : async fn write_split_helper(harness_name: &'static str, discard: bool) {
560 8 : let harness = TenantHarness::create(harness_name).await.unwrap();
561 8 : let (tenant, ctx) = harness.load().await;
562 :
563 8 : let tline = tenant
564 8 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
565 8 : .await
566 8 : .unwrap();
567 :
568 8 : let mut image_writer = SplitImageLayerWriter::new(
569 8 : tenant.conf,
570 8 : tline.timeline_id,
571 8 : tenant.tenant_shard_id,
572 8 : get_key(0),
573 8 : Lsn(0x18),
574 8 : 4 * 1024 * 1024,
575 8 : &tline.gate,
576 8 : tline.cancel.clone(),
577 8 : &ctx,
578 8 : )
579 8 : .await
580 8 : .unwrap();
581 8 : let mut delta_writer = SplitDeltaLayerWriter::new(
582 8 : tenant.conf,
583 8 : tline.timeline_id,
584 8 : tenant.tenant_shard_id,
585 8 : Lsn(0x18)..Lsn(0x20),
586 8 : 4 * 1024 * 1024,
587 8 : &tline.gate,
588 8 : tline.cancel.clone(),
589 8 : )
590 8 : .await
591 8 : .unwrap();
592 : const N: usize = 2000;
593 16008 : for i in 0..N {
594 16000 : let i = i as u32;
595 16000 : image_writer
596 16000 : .put_image(get_key(i), get_large_img(), &ctx)
597 16000 : .await
598 16000 : .unwrap();
599 16000 : delta_writer
600 16000 : .put_value(get_key(i), Lsn(0x20), Value::Image(get_large_img()), &ctx)
601 16000 : .await
602 16000 : .unwrap();
603 : }
604 8 : let image_layers = image_writer
605 32 : .finish_with_discard_fn(&tline, &ctx, get_key(N as u32), |_| async { discard })
606 8 : .await
607 8 : .unwrap();
608 8 : let delta_layers = delta_writer
609 32 : .finish_with_discard_fn(&tline, &ctx, |_| async { discard })
610 8 : .await
611 8 : .unwrap();
612 8 : let image_layers = image_layers
613 8 : .into_iter()
614 32 : .map(|x| {
615 32 : if discard {
616 16 : x.into_discarded_layer()
617 : } else {
618 16 : x.into_resident_layer().layer_desc().key()
619 : }
620 32 : })
621 8 : .collect_vec();
622 8 : let delta_layers = delta_layers
623 8 : .into_iter()
624 32 : .map(|x| {
625 32 : if discard {
626 16 : x.into_discarded_layer()
627 : } else {
628 16 : x.into_resident_layer().layer_desc().key()
629 : }
630 32 : })
631 8 : .collect_vec();
632 8 : assert_eq!(image_layers.len(), N / 512 + 1);
633 8 : assert_eq!(delta_layers.len(), N / 512 + 1);
634 8 : assert_eq!(delta_layers.first().unwrap().key_range.start, get_key(0));
635 8 : assert_eq!(
636 8 : delta_layers.last().unwrap().key_range.end,
637 8 : get_key(N as u32)
638 8 : );
639 32 : for idx in 0..image_layers.len() {
640 32 : assert_ne!(image_layers[idx].key_range.start, Key::MIN);
641 32 : assert_ne!(image_layers[idx].key_range.end, Key::MAX);
642 32 : assert_ne!(delta_layers[idx].key_range.start, Key::MIN);
643 32 : assert_ne!(delta_layers[idx].key_range.end, Key::MAX);
644 32 : if idx > 0 {
645 24 : assert_eq!(
646 24 : image_layers[idx - 1].key_range.end,
647 24 : image_layers[idx].key_range.start
648 24 : );
649 24 : assert_eq!(
650 24 : delta_layers[idx - 1].key_range.end,
651 24 : delta_layers[idx].key_range.start
652 24 : );
653 8 : }
654 : }
655 8 : }
656 :
657 : #[tokio::test]
658 4 : async fn write_large_img() {
659 4 : let harness = TenantHarness::create("split_writer_write_large_img")
660 4 : .await
661 4 : .unwrap();
662 4 : let (tenant, ctx) = harness.load().await;
663 4 :
664 4 : let tline = tenant
665 4 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
666 4 : .await
667 4 : .unwrap();
668 4 :
669 4 : let mut image_writer = SplitImageLayerWriter::new(
670 4 : tenant.conf,
671 4 : tline.timeline_id,
672 4 : tenant.tenant_shard_id,
673 4 : get_key(0),
674 4 : Lsn(0x18),
675 4 : 4 * 1024,
676 4 : &tline.gate,
677 4 : tline.cancel.clone(),
678 4 : &ctx,
679 4 : )
680 4 : .await
681 4 : .unwrap();
682 4 :
683 4 : let mut delta_writer = SplitDeltaLayerWriter::new(
684 4 : tenant.conf,
685 4 : tline.timeline_id,
686 4 : tenant.tenant_shard_id,
687 4 : Lsn(0x18)..Lsn(0x20),
688 4 : 4 * 1024,
689 4 : &tline.gate,
690 4 : tline.cancel.clone(),
691 4 : )
692 4 : .await
693 4 : .unwrap();
694 4 :
695 4 : image_writer
696 4 : .put_image(get_key(0), get_img(0), &ctx)
697 4 : .await
698 4 : .unwrap();
699 4 : image_writer
700 4 : .put_image(get_key(1), get_large_img(), &ctx)
701 4 : .await
702 4 : .unwrap();
703 4 : let layers = image_writer
704 4 : .finish(&tline, &ctx, get_key(10))
705 4 : .await
706 4 : .unwrap();
707 4 : assert_eq!(layers.len(), 2);
708 4 :
709 4 : delta_writer
710 4 : .put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx)
711 4 : .await
712 4 : .unwrap();
713 4 : delta_writer
714 4 : .put_value(get_key(1), Lsn(0x1A), Value::Image(get_large_img()), &ctx)
715 4 : .await
716 4 : .unwrap();
717 4 : let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
718 4 : assert_eq!(layers.len(), 2);
719 4 : let mut layers_iter = layers.into_iter();
720 4 : assert_eq!(
721 4 : layers_iter
722 4 : .next()
723 4 : .unwrap()
724 4 : .into_resident_layer()
725 4 : .layer_desc()
726 4 : .key(),
727 4 : PersistentLayerKey {
728 4 : key_range: get_key(0)..get_key(1),
729 4 : lsn_range: Lsn(0x18)..Lsn(0x20),
730 4 : is_delta: true
731 4 : }
732 4 : );
733 4 : assert_eq!(
734 4 : layers_iter
735 4 : .next()
736 4 : .unwrap()
737 4 : .into_resident_layer()
738 4 : .layer_desc()
739 4 : .key(),
740 4 : PersistentLayerKey {
741 4 : key_range: get_key(1)..get_key(2),
742 4 : lsn_range: Lsn(0x18)..Lsn(0x20),
743 4 : is_delta: true
744 4 : }
745 4 : );
746 4 : }
747 :
748 : #[tokio::test]
749 4 : async fn write_split_single_key() {
750 4 : let harness = TenantHarness::create("split_writer_write_split_single_key")
751 4 : .await
752 4 : .unwrap();
753 4 : let (tenant, ctx) = harness.load().await;
754 4 :
755 4 : let tline = tenant
756 4 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
757 4 : .await
758 4 : .unwrap();
759 4 :
760 4 : const N: usize = 2000;
761 4 : let mut delta_writer = SplitDeltaLayerWriter::new(
762 4 : tenant.conf,
763 4 : tline.timeline_id,
764 4 : tenant.tenant_shard_id,
765 4 : Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
766 4 : 4 * 1024 * 1024,
767 4 : &tline.gate,
768 4 : tline.cancel.clone(),
769 4 : )
770 4 : .await
771 4 : .unwrap();
772 4 :
773 8004 : for i in 0..N {
774 8000 : let i = i as u32;
775 8000 : delta_writer
776 8000 : .put_value(
777 8000 : get_key(0),
778 8000 : Lsn(i as u64 * 16 + 0x10),
779 8000 : Value::Image(get_large_img()),
780 8000 : &ctx,
781 8000 : )
782 8000 : .await
783 8000 : .unwrap();
784 4 : }
785 4 : let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap();
786 4 : assert_eq!(delta_layers.len(), 1);
787 4 : let delta_layer = delta_layers
788 4 : .into_iter()
789 4 : .next()
790 4 : .unwrap()
791 4 : .into_resident_layer();
792 4 : assert_eq!(
793 4 : delta_layer.layer_desc().key(),
794 4 : PersistentLayerKey {
795 4 : key_range: get_key(0)..get_key(1),
796 4 : lsn_range: Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
797 4 : is_delta: true
798 4 : }
799 4 : );
800 4 : }
801 : }
|