Line data Source code
1 : use std::{future::Future, ops::Range, sync::Arc};
2 :
3 : use bytes::Bytes;
4 : use pageserver_api::key::{Key, KEY_SIZE};
5 : use utils::{id::TimelineId, lsn::Lsn, shard::TenantShardId};
6 :
7 : use crate::tenant::storage_layer::Layer;
8 : use crate::{config::PageServerConf, context::RequestContext, tenant::Timeline};
9 : use pageserver_api::value::Value;
10 :
11 : use super::layer::S3_UPLOAD_LIMIT;
12 : use super::{
13 : DeltaLayerWriter, ImageLayerWriter, PersistentLayerDesc, PersistentLayerKey, ResidentLayer,
14 : };
15 :
16 : pub(crate) enum BatchWriterResult {
17 : Produced(ResidentLayer),
18 : Discarded(PersistentLayerKey),
19 : }
20 :
21 : #[cfg(test)]
22 : impl BatchWriterResult {
23 48 : fn into_resident_layer(self) -> ResidentLayer {
24 48 : match self {
25 48 : BatchWriterResult::Produced(layer) => layer,
26 0 : BatchWriterResult::Discarded(_) => panic!("unexpected discarded layer"),
27 : }
28 48 : }
29 :
30 32 : fn into_discarded_layer(self) -> PersistentLayerKey {
31 32 : match self {
32 0 : BatchWriterResult::Produced(_) => panic!("unexpected produced layer"),
33 32 : BatchWriterResult::Discarded(layer) => layer,
34 32 : }
35 32 : }
36 : }
37 :
38 : enum LayerWriterWrapper {
39 : Image(ImageLayerWriter),
40 : Delta(DeltaLayerWriter),
41 : }
42 :
43 : /// An layer writer that takes unfinished layers and finish them atomically.
44 : #[must_use]
45 : pub struct BatchLayerWriter {
46 : generated_layer_writers: Vec<(LayerWriterWrapper, PersistentLayerKey)>,
47 : conf: &'static PageServerConf,
48 : }
49 :
50 : impl BatchLayerWriter {
51 224 : pub async fn new(conf: &'static PageServerConf) -> anyhow::Result<Self> {
52 224 : Ok(Self {
53 224 : generated_layer_writers: Vec::new(),
54 224 : conf,
55 224 : })
56 224 : }
57 :
58 120 : pub fn add_unfinished_image_writer(
59 120 : &mut self,
60 120 : writer: ImageLayerWriter,
61 120 : key_range: Range<Key>,
62 120 : lsn: Lsn,
63 120 : ) {
64 120 : self.generated_layer_writers.push((
65 120 : LayerWriterWrapper::Image(writer),
66 120 : PersistentLayerKey {
67 120 : key_range,
68 120 : lsn_range: PersistentLayerDesc::image_layer_lsn_range(lsn),
69 120 : is_delta: false,
70 120 : },
71 120 : ));
72 120 : }
73 :
74 120 : pub fn add_unfinished_delta_writer(
75 120 : &mut self,
76 120 : writer: DeltaLayerWriter,
77 120 : key_range: Range<Key>,
78 120 : lsn_range: Range<Lsn>,
79 120 : ) {
80 120 : self.generated_layer_writers.push((
81 120 : LayerWriterWrapper::Delta(writer),
82 120 : PersistentLayerKey {
83 120 : key_range,
84 120 : lsn_range,
85 120 : is_delta: true,
86 120 : },
87 120 : ));
88 120 : }
89 :
90 208 : pub(crate) async fn finish_with_discard_fn<D, F>(
91 208 : self,
92 208 : tline: &Arc<Timeline>,
93 208 : ctx: &RequestContext,
94 208 : discard_fn: D,
95 208 : ) -> anyhow::Result<Vec<BatchWriterResult>>
96 208 : where
97 208 : D: Fn(&PersistentLayerKey) -> F,
98 208 : F: Future<Output = bool>,
99 208 : {
100 208 : let Self {
101 208 : generated_layer_writers,
102 208 : ..
103 208 : } = self;
104 208 : let clean_up_layers = |generated_layers: Vec<BatchWriterResult>| {
105 0 : for produced_layer in generated_layers {
106 0 : if let BatchWriterResult::Produced(resident_layer) = produced_layer {
107 0 : let layer: Layer = resident_layer.into();
108 0 : layer.delete_on_drop();
109 0 : }
110 : }
111 0 : };
112 : // BEGIN: catch every error and do the recovery in the below section
113 208 : let mut generated_layers: Vec<BatchWriterResult> = Vec::new();
114 448 : for (inner, layer_key) in generated_layer_writers {
115 240 : if discard_fn(&layer_key).await {
116 76 : generated_layers.push(BatchWriterResult::Discarded(layer_key));
117 76 : } else {
118 164 : let res = match inner {
119 76 : LayerWriterWrapper::Delta(writer) => {
120 76 : writer.finish(layer_key.key_range.end, ctx).await
121 : }
122 88 : LayerWriterWrapper::Image(writer) => {
123 88 : writer
124 88 : .finish_with_end_key(layer_key.key_range.end, ctx)
125 88 : .await
126 : }
127 : };
128 164 : let layer = match res {
129 164 : Ok((desc, path)) => {
130 164 : match Layer::finish_creating(self.conf, tline, desc, &path) {
131 164 : Ok(layer) => layer,
132 0 : Err(e) => {
133 0 : tokio::fs::remove_file(&path).await.ok();
134 0 : clean_up_layers(generated_layers);
135 0 : return Err(e);
136 : }
137 : }
138 : }
139 0 : Err(e) => {
140 0 : // Image/DeltaLayerWriter::finish will clean up the temporary layer if anything goes wrong,
141 0 : // so we don't need to remove the layer we just failed to create by ourselves.
142 0 : clean_up_layers(generated_layers);
143 0 : return Err(e);
144 : }
145 : };
146 164 : generated_layers.push(BatchWriterResult::Produced(layer));
147 : }
148 : }
149 : // END: catch every error and do the recovery in the above section
150 208 : Ok(generated_layers)
151 208 : }
152 : }
153 :
154 : /// An image writer that takes images and produces multiple image layers.
155 : #[must_use]
156 : pub struct SplitImageLayerWriter {
157 : inner: ImageLayerWriter,
158 : target_layer_size: u64,
159 : lsn: Lsn,
160 : conf: &'static PageServerConf,
161 : timeline_id: TimelineId,
162 : tenant_shard_id: TenantShardId,
163 : batches: BatchLayerWriter,
164 : start_key: Key,
165 : }
166 :
167 : impl SplitImageLayerWriter {
168 100 : pub async fn new(
169 100 : conf: &'static PageServerConf,
170 100 : timeline_id: TimelineId,
171 100 : tenant_shard_id: TenantShardId,
172 100 : start_key: Key,
173 100 : lsn: Lsn,
174 100 : target_layer_size: u64,
175 100 : ctx: &RequestContext,
176 100 : ) -> anyhow::Result<Self> {
177 100 : Ok(Self {
178 100 : target_layer_size,
179 100 : inner: ImageLayerWriter::new(
180 100 : conf,
181 100 : timeline_id,
182 100 : tenant_shard_id,
183 100 : &(start_key..Key::MAX),
184 100 : lsn,
185 100 : ctx,
186 100 : )
187 100 : .await?,
188 100 : conf,
189 100 : timeline_id,
190 100 : tenant_shard_id,
191 100 : batches: BatchLayerWriter::new(conf).await?,
192 100 : lsn,
193 100 : start_key,
194 : })
195 100 : }
196 :
197 17180 : pub async fn put_image(
198 17180 : &mut self,
199 17180 : key: Key,
200 17180 : img: Bytes,
201 17180 : ctx: &RequestContext,
202 17180 : ) -> anyhow::Result<()> {
203 17180 : // The current estimation is an upper bound of the space that the key/image could take
204 17180 : // because we did not consider compression in this estimation. The resulting image layer
205 17180 : // could be smaller than the target size.
206 17180 : let addition_size_estimation = KEY_SIZE as u64 + img.len() as u64;
207 17180 : if self.inner.num_keys() >= 1
208 17080 : && self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
209 : {
210 28 : let next_image_writer = ImageLayerWriter::new(
211 28 : self.conf,
212 28 : self.timeline_id,
213 28 : self.tenant_shard_id,
214 28 : &(key..Key::MAX),
215 28 : self.lsn,
216 28 : ctx,
217 28 : )
218 28 : .await?;
219 28 : let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
220 28 : self.batches.add_unfinished_image_writer(
221 28 : prev_image_writer,
222 28 : self.start_key..key,
223 28 : self.lsn,
224 28 : );
225 28 : self.start_key = key;
226 17152 : }
227 17180 : self.inner.put_image(key, img, ctx).await
228 17180 : }
229 :
230 92 : pub(crate) async fn finish_with_discard_fn<D, F>(
231 92 : self,
232 92 : tline: &Arc<Timeline>,
233 92 : ctx: &RequestContext,
234 92 : end_key: Key,
235 92 : discard_fn: D,
236 92 : ) -> anyhow::Result<Vec<BatchWriterResult>>
237 92 : where
238 92 : D: Fn(&PersistentLayerKey) -> F,
239 92 : F: Future<Output = bool>,
240 92 : {
241 92 : let Self {
242 92 : mut batches, inner, ..
243 92 : } = self;
244 92 : if inner.num_keys() != 0 {
245 92 : batches.add_unfinished_image_writer(inner, self.start_key..end_key, self.lsn);
246 92 : }
247 92 : batches.finish_with_discard_fn(tline, ctx, discard_fn).await
248 92 : }
249 :
250 : #[cfg(test)]
251 8 : pub(crate) async fn finish(
252 8 : self,
253 8 : tline: &Arc<Timeline>,
254 8 : ctx: &RequestContext,
255 8 : end_key: Key,
256 8 : ) -> anyhow::Result<Vec<BatchWriterResult>> {
257 12 : self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
258 8 : .await
259 8 : }
260 : }
261 :
262 : /// A delta writer that takes key-lsn-values and produces multiple delta layers.
263 : ///
264 : /// Note that if updates of a single key exceed the target size limit, all of the updates will be batched
265 : /// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm
266 : /// will split them into multiple files based on size.
267 : #[must_use]
268 : pub struct SplitDeltaLayerWriter {
269 : inner: Option<(Key, DeltaLayerWriter)>,
270 : target_layer_size: u64,
271 : conf: &'static PageServerConf,
272 : timeline_id: TimelineId,
273 : tenant_shard_id: TenantShardId,
274 : lsn_range: Range<Lsn>,
275 : last_key_written: Key,
276 : batches: BatchLayerWriter,
277 : }
278 :
279 : impl SplitDeltaLayerWriter {
280 124 : pub async fn new(
281 124 : conf: &'static PageServerConf,
282 124 : timeline_id: TimelineId,
283 124 : tenant_shard_id: TenantShardId,
284 124 : lsn_range: Range<Lsn>,
285 124 : target_layer_size: u64,
286 124 : ) -> anyhow::Result<Self> {
287 124 : Ok(Self {
288 124 : target_layer_size,
289 124 : inner: None,
290 124 : conf,
291 124 : timeline_id,
292 124 : tenant_shard_id,
293 124 : lsn_range,
294 124 : last_key_written: Key::MIN,
295 124 : batches: BatchLayerWriter::new(conf).await?,
296 : })
297 124 : }
298 :
299 24416 : pub async fn put_value(
300 24416 : &mut self,
301 24416 : key: Key,
302 24416 : lsn: Lsn,
303 24416 : val: Value,
304 24416 : ctx: &RequestContext,
305 24416 : ) -> anyhow::Result<()> {
306 24416 : // The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
307 24416 : // number, and therefore the final layer size could be a little bit larger or smaller than the target.
308 24416 : //
309 24416 : // Also, keep all updates of a single key in a single file. TODO: split them using the legacy compaction
310 24416 : // strategy. https://github.com/neondatabase/neon/issues/8837
311 24416 :
312 24416 : if self.inner.is_none() {
313 100 : self.inner = Some((
314 100 : key,
315 100 : DeltaLayerWriter::new(
316 100 : self.conf,
317 100 : self.timeline_id,
318 100 : self.tenant_shard_id,
319 100 : key,
320 100 : self.lsn_range.clone(),
321 100 : ctx,
322 100 : )
323 100 : .await?,
324 : ));
325 24316 : }
326 24416 : let (_, inner) = self.inner.as_mut().unwrap();
327 24416 :
328 24416 : let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */;
329 24416 : if inner.num_keys() >= 1
330 24316 : && inner.estimated_size() + addition_size_estimation >= self.target_layer_size
331 : {
332 5992 : if key != self.last_key_written {
333 28 : let next_delta_writer = DeltaLayerWriter::new(
334 28 : self.conf,
335 28 : self.timeline_id,
336 28 : self.tenant_shard_id,
337 28 : key,
338 28 : self.lsn_range.clone(),
339 28 : ctx,
340 28 : )
341 28 : .await?;
342 28 : let (start_key, prev_delta_writer) =
343 28 : std::mem::replace(&mut self.inner, Some((key, next_delta_writer))).unwrap();
344 28 : self.batches.add_unfinished_delta_writer(
345 28 : prev_delta_writer,
346 28 : start_key..key,
347 28 : self.lsn_range.clone(),
348 28 : );
349 5964 : } else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
350 : // We have to produce a very large file b/c a key is updated too often.
351 0 : anyhow::bail!(
352 0 : "a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
353 0 : key,
354 0 : inner.estimated_size()
355 0 : );
356 5964 : }
357 18424 : }
358 24416 : self.last_key_written = key;
359 24416 : let (_, inner) = self.inner.as_mut().unwrap();
360 24416 : inner.put_value(key, lsn, val, ctx).await
361 24416 : }
362 :
363 116 : pub(crate) async fn finish_with_discard_fn<D, F>(
364 116 : self,
365 116 : tline: &Arc<Timeline>,
366 116 : ctx: &RequestContext,
367 116 : discard_fn: D,
368 116 : ) -> anyhow::Result<Vec<BatchWriterResult>>
369 116 : where
370 116 : D: Fn(&PersistentLayerKey) -> F,
371 116 : F: Future<Output = bool>,
372 116 : {
373 116 : let Self {
374 116 : mut batches, inner, ..
375 116 : } = self;
376 116 : if let Some((start_key, writer)) = inner {
377 92 : if writer.num_keys() != 0 {
378 92 : let end_key = self.last_key_written.next();
379 92 : batches.add_unfinished_delta_writer(
380 92 : writer,
381 92 : start_key..end_key,
382 92 : self.lsn_range.clone(),
383 92 : );
384 92 : }
385 24 : }
386 116 : batches.finish_with_discard_fn(tline, ctx, discard_fn).await
387 116 : }
388 :
389 : #[cfg(test)]
390 12 : pub(crate) async fn finish(
391 12 : self,
392 12 : tline: &Arc<Timeline>,
393 12 : ctx: &RequestContext,
394 12 : ) -> anyhow::Result<Vec<BatchWriterResult>> {
395 16 : self.finish_with_discard_fn(tline, ctx, |_| async { false })
396 12 : .await
397 12 : }
398 : }
399 :
400 : #[cfg(test)]
401 : mod tests {
402 : use itertools::Itertools;
403 : use rand::{RngCore, SeedableRng};
404 :
405 : use crate::{
406 : tenant::{
407 : harness::{TenantHarness, TIMELINE_ID},
408 : storage_layer::AsLayerDesc,
409 : },
410 : DEFAULT_PG_VERSION,
411 : };
412 :
413 : use super::*;
414 :
415 40104 : fn get_key(id: u32) -> Key {
416 40104 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
417 40104 : key.field6 = id;
418 40104 : key
419 40104 : }
420 :
421 16 : fn get_img(id: u32) -> Bytes {
422 16 : format!("{id:064}").into()
423 16 : }
424 :
425 40008 : fn get_large_img() -> Bytes {
426 40008 : let mut rng = rand::rngs::SmallRng::seed_from_u64(42);
427 40008 : let mut data = vec![0; 8192];
428 40008 : rng.fill_bytes(&mut data);
429 40008 : data.into()
430 40008 : }
431 :
432 : #[tokio::test]
433 4 : async fn write_one_image() {
434 4 : let harness = TenantHarness::create("split_writer_write_one_image")
435 4 : .await
436 4 : .unwrap();
437 4 : let (tenant, ctx) = harness.load().await;
438 4 :
439 4 : let tline = tenant
440 4 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
441 4 : .await
442 4 : .unwrap();
443 4 :
444 4 : let mut image_writer = SplitImageLayerWriter::new(
445 4 : tenant.conf,
446 4 : tline.timeline_id,
447 4 : tenant.tenant_shard_id,
448 4 : get_key(0),
449 4 : Lsn(0x18),
450 4 : 4 * 1024 * 1024,
451 4 : &ctx,
452 4 : )
453 4 : .await
454 4 : .unwrap();
455 4 :
456 4 : let mut delta_writer = SplitDeltaLayerWriter::new(
457 4 : tenant.conf,
458 4 : tline.timeline_id,
459 4 : tenant.tenant_shard_id,
460 4 : Lsn(0x18)..Lsn(0x20),
461 4 : 4 * 1024 * 1024,
462 4 : )
463 4 : .await
464 4 : .unwrap();
465 4 :
466 4 : image_writer
467 4 : .put_image(get_key(0), get_img(0), &ctx)
468 4 : .await
469 4 : .unwrap();
470 4 : let layers = image_writer
471 4 : .finish(&tline, &ctx, get_key(10))
472 4 : .await
473 4 : .unwrap();
474 4 : assert_eq!(layers.len(), 1);
475 4 :
476 4 : delta_writer
477 4 : .put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx)
478 4 : .await
479 4 : .unwrap();
480 4 : let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
481 4 : assert_eq!(layers.len(), 1);
482 4 : assert_eq!(
483 4 : layers
484 4 : .into_iter()
485 4 : .next()
486 4 : .unwrap()
487 4 : .into_resident_layer()
488 4 : .layer_desc()
489 4 : .key(),
490 4 : PersistentLayerKey {
491 4 : key_range: get_key(0)..get_key(1),
492 4 : lsn_range: Lsn(0x18)..Lsn(0x20),
493 4 : is_delta: true
494 4 : }
495 4 : );
496 4 : }
497 :
498 : #[tokio::test]
499 4 : async fn write_split() {
500 4 : // Test the split writer with retaining all the layers we have produced (discard=false)
501 4 : write_split_helper("split_writer_write_split", false).await;
502 4 : }
503 :
504 : #[tokio::test]
505 4 : async fn write_split_discard() {
506 4 : // Test the split writer with discarding all the layers we have produced (discard=true)
507 4 : write_split_helper("split_writer_write_split_discard", true).await;
508 4 : }
509 :
510 : /// Test the image+delta writer by writing a large number of images and deltas. If discard is
511 : /// set to true, all layers will be discarded.
512 8 : async fn write_split_helper(harness_name: &'static str, discard: bool) {
513 8 : let harness = TenantHarness::create(harness_name).await.unwrap();
514 8 : let (tenant, ctx) = harness.load().await;
515 :
516 8 : let tline = tenant
517 8 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
518 8 : .await
519 8 : .unwrap();
520 :
521 8 : let mut image_writer = SplitImageLayerWriter::new(
522 8 : tenant.conf,
523 8 : tline.timeline_id,
524 8 : tenant.tenant_shard_id,
525 8 : get_key(0),
526 8 : Lsn(0x18),
527 8 : 4 * 1024 * 1024,
528 8 : &ctx,
529 8 : )
530 8 : .await
531 8 : .unwrap();
532 8 : let mut delta_writer = SplitDeltaLayerWriter::new(
533 8 : tenant.conf,
534 8 : tline.timeline_id,
535 8 : tenant.tenant_shard_id,
536 8 : Lsn(0x18)..Lsn(0x20),
537 8 : 4 * 1024 * 1024,
538 8 : )
539 8 : .await
540 8 : .unwrap();
541 : const N: usize = 2000;
542 16008 : for i in 0..N {
543 16000 : let i = i as u32;
544 16000 : image_writer
545 16000 : .put_image(get_key(i), get_large_img(), &ctx)
546 16000 : .await
547 16000 : .unwrap();
548 16000 : delta_writer
549 16000 : .put_value(get_key(i), Lsn(0x20), Value::Image(get_large_img()), &ctx)
550 16000 : .await
551 16000 : .unwrap();
552 : }
553 8 : let image_layers = image_writer
554 32 : .finish_with_discard_fn(&tline, &ctx, get_key(N as u32), |_| async { discard })
555 8 : .await
556 8 : .unwrap();
557 8 : let delta_layers = delta_writer
558 32 : .finish_with_discard_fn(&tline, &ctx, |_| async { discard })
559 8 : .await
560 8 : .unwrap();
561 8 : let image_layers = image_layers
562 8 : .into_iter()
563 32 : .map(|x| {
564 32 : if discard {
565 16 : x.into_discarded_layer()
566 : } else {
567 16 : x.into_resident_layer().layer_desc().key()
568 : }
569 32 : })
570 8 : .collect_vec();
571 8 : let delta_layers = delta_layers
572 8 : .into_iter()
573 32 : .map(|x| {
574 32 : if discard {
575 16 : x.into_discarded_layer()
576 : } else {
577 16 : x.into_resident_layer().layer_desc().key()
578 : }
579 32 : })
580 8 : .collect_vec();
581 8 : assert_eq!(image_layers.len(), N / 512 + 1);
582 8 : assert_eq!(delta_layers.len(), N / 512 + 1);
583 8 : assert_eq!(delta_layers.first().unwrap().key_range.start, get_key(0));
584 8 : assert_eq!(
585 8 : delta_layers.last().unwrap().key_range.end,
586 8 : get_key(N as u32)
587 8 : );
588 32 : for idx in 0..image_layers.len() {
589 32 : assert_ne!(image_layers[idx].key_range.start, Key::MIN);
590 32 : assert_ne!(image_layers[idx].key_range.end, Key::MAX);
591 32 : assert_ne!(delta_layers[idx].key_range.start, Key::MIN);
592 32 : assert_ne!(delta_layers[idx].key_range.end, Key::MAX);
593 32 : if idx > 0 {
594 24 : assert_eq!(
595 24 : image_layers[idx - 1].key_range.end,
596 24 : image_layers[idx].key_range.start
597 24 : );
598 24 : assert_eq!(
599 24 : delta_layers[idx - 1].key_range.end,
600 24 : delta_layers[idx].key_range.start
601 24 : );
602 8 : }
603 : }
604 8 : }
605 :
606 : #[tokio::test]
607 4 : async fn write_large_img() {
608 4 : let harness = TenantHarness::create("split_writer_write_large_img")
609 4 : .await
610 4 : .unwrap();
611 4 : let (tenant, ctx) = harness.load().await;
612 4 :
613 4 : let tline = tenant
614 4 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
615 4 : .await
616 4 : .unwrap();
617 4 :
618 4 : let mut image_writer = SplitImageLayerWriter::new(
619 4 : tenant.conf,
620 4 : tline.timeline_id,
621 4 : tenant.tenant_shard_id,
622 4 : get_key(0),
623 4 : Lsn(0x18),
624 4 : 4 * 1024,
625 4 : &ctx,
626 4 : )
627 4 : .await
628 4 : .unwrap();
629 4 :
630 4 : let mut delta_writer = SplitDeltaLayerWriter::new(
631 4 : tenant.conf,
632 4 : tline.timeline_id,
633 4 : tenant.tenant_shard_id,
634 4 : Lsn(0x18)..Lsn(0x20),
635 4 : 4 * 1024,
636 4 : )
637 4 : .await
638 4 : .unwrap();
639 4 :
640 4 : image_writer
641 4 : .put_image(get_key(0), get_img(0), &ctx)
642 4 : .await
643 4 : .unwrap();
644 4 : image_writer
645 4 : .put_image(get_key(1), get_large_img(), &ctx)
646 4 : .await
647 4 : .unwrap();
648 4 : let layers = image_writer
649 4 : .finish(&tline, &ctx, get_key(10))
650 4 : .await
651 4 : .unwrap();
652 4 : assert_eq!(layers.len(), 2);
653 4 :
654 4 : delta_writer
655 4 : .put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx)
656 4 : .await
657 4 : .unwrap();
658 4 : delta_writer
659 4 : .put_value(get_key(1), Lsn(0x1A), Value::Image(get_large_img()), &ctx)
660 4 : .await
661 4 : .unwrap();
662 4 : let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
663 4 : assert_eq!(layers.len(), 2);
664 4 : let mut layers_iter = layers.into_iter();
665 4 : assert_eq!(
666 4 : layers_iter
667 4 : .next()
668 4 : .unwrap()
669 4 : .into_resident_layer()
670 4 : .layer_desc()
671 4 : .key(),
672 4 : PersistentLayerKey {
673 4 : key_range: get_key(0)..get_key(1),
674 4 : lsn_range: Lsn(0x18)..Lsn(0x20),
675 4 : is_delta: true
676 4 : }
677 4 : );
678 4 : assert_eq!(
679 4 : layers_iter
680 4 : .next()
681 4 : .unwrap()
682 4 : .into_resident_layer()
683 4 : .layer_desc()
684 4 : .key(),
685 4 : PersistentLayerKey {
686 4 : key_range: get_key(1)..get_key(2),
687 4 : lsn_range: Lsn(0x18)..Lsn(0x20),
688 4 : is_delta: true
689 4 : }
690 4 : );
691 4 : }
692 :
693 : #[tokio::test]
694 4 : async fn write_split_single_key() {
695 4 : let harness = TenantHarness::create("split_writer_write_split_single_key")
696 4 : .await
697 4 : .unwrap();
698 4 : let (tenant, ctx) = harness.load().await;
699 4 :
700 4 : let tline = tenant
701 4 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
702 4 : .await
703 4 : .unwrap();
704 4 :
705 4 : const N: usize = 2000;
706 4 : let mut delta_writer = SplitDeltaLayerWriter::new(
707 4 : tenant.conf,
708 4 : tline.timeline_id,
709 4 : tenant.tenant_shard_id,
710 4 : Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
711 4 : 4 * 1024 * 1024,
712 4 : )
713 4 : .await
714 4 : .unwrap();
715 4 :
716 8004 : for i in 0..N {
717 8000 : let i = i as u32;
718 8000 : delta_writer
719 8000 : .put_value(
720 8000 : get_key(0),
721 8000 : Lsn(i as u64 * 16 + 0x10),
722 8000 : Value::Image(get_large_img()),
723 8000 : &ctx,
724 8000 : )
725 8000 : .await
726 8000 : .unwrap();
727 4 : }
728 4 : let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap();
729 4 : assert_eq!(delta_layers.len(), 1);
730 4 : let delta_layer = delta_layers
731 4 : .into_iter()
732 4 : .next()
733 4 : .unwrap()
734 4 : .into_resident_layer();
735 4 : assert_eq!(
736 4 : delta_layer.layer_desc().key(),
737 4 : PersistentLayerKey {
738 4 : key_range: get_key(0)..get_key(1),
739 4 : lsn_range: Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
740 4 : is_delta: true
741 4 : }
742 4 : );
743 4 : }
744 : }
|