Line data Source code
1 : use std::{future::Future, ops::Range, sync::Arc};
2 :
3 : use bytes::Bytes;
4 : use pageserver_api::key::{Key, KEY_SIZE};
5 : use utils::{id::TimelineId, lsn::Lsn, shard::TenantShardId};
6 :
7 : use crate::tenant::storage_layer::Layer;
8 : use crate::{config::PageServerConf, context::RequestContext, tenant::Timeline};
9 : use pageserver_api::value::Value;
10 :
11 : use super::layer::S3_UPLOAD_LIMIT;
12 : use super::{
13 : DeltaLayerWriter, ImageLayerWriter, PersistentLayerDesc, PersistentLayerKey, ResidentLayer,
14 : };
15 :
16 : pub(crate) enum BatchWriterResult {
17 : Produced(ResidentLayer),
18 : Discarded(PersistentLayerKey),
19 : }
20 :
21 : #[cfg(test)]
22 : impl BatchWriterResult {
23 48 : fn into_resident_layer(self) -> ResidentLayer {
24 48 : match self {
25 48 : BatchWriterResult::Produced(layer) => layer,
26 0 : BatchWriterResult::Discarded(_) => panic!("unexpected discarded layer"),
27 : }
28 48 : }
29 :
30 32 : fn into_discarded_layer(self) -> PersistentLayerKey {
31 32 : match self {
32 0 : BatchWriterResult::Produced(_) => panic!("unexpected produced layer"),
33 32 : BatchWriterResult::Discarded(layer) => layer,
34 32 : }
35 32 : }
36 : }
37 :
38 : enum LayerWriterWrapper {
39 : Image(ImageLayerWriter),
40 : Delta(DeltaLayerWriter),
41 : }
42 :
43 : /// An layer writer that takes unfinished layers and finish them atomically.
44 : #[must_use]
45 : pub struct BatchLayerWriter {
46 : generated_layer_writers: Vec<(LayerWriterWrapper, PersistentLayerKey)>,
47 : conf: &'static PageServerConf,
48 : }
49 :
50 : impl BatchLayerWriter {
51 1364 : pub async fn new(conf: &'static PageServerConf) -> anyhow::Result<Self> {
52 1364 : Ok(Self {
53 1364 : generated_layer_writers: Vec::new(),
54 1364 : conf,
55 1364 : })
56 1364 : }
57 :
58 608 : pub fn add_unfinished_image_writer(
59 608 : &mut self,
60 608 : writer: ImageLayerWriter,
61 608 : key_range: Range<Key>,
62 608 : lsn: Lsn,
63 608 : ) {
64 608 : self.generated_layer_writers.push((
65 608 : LayerWriterWrapper::Image(writer),
66 608 : PersistentLayerKey {
67 608 : key_range,
68 608 : lsn_range: PersistentLayerDesc::image_layer_lsn_range(lsn),
69 608 : is_delta: false,
70 608 : },
71 608 : ));
72 608 : }
73 :
74 120 : pub fn add_unfinished_delta_writer(
75 120 : &mut self,
76 120 : writer: DeltaLayerWriter,
77 120 : key_range: Range<Key>,
78 120 : lsn_range: Range<Lsn>,
79 120 : ) {
80 120 : self.generated_layer_writers.push((
81 120 : LayerWriterWrapper::Delta(writer),
82 120 : PersistentLayerKey {
83 120 : key_range,
84 120 : lsn_range,
85 120 : is_delta: true,
86 120 : },
87 120 : ));
88 120 : }
89 :
90 1140 : pub(crate) async fn finish(
91 1140 : self,
92 1140 : tline: &Arc<Timeline>,
93 1140 : ctx: &RequestContext,
94 1140 : ) -> anyhow::Result<Vec<ResidentLayer>> {
95 1140 : let res = self
96 1140 : .finish_with_discard_fn(tline, ctx, |_| async { false })
97 1140 : .await?;
98 1140 : let mut output = Vec::new();
99 1628 : for r in res {
100 488 : if let BatchWriterResult::Produced(layer) = r {
101 488 : output.push(layer);
102 488 : }
103 : }
104 1140 : Ok(output)
105 1140 : }
106 :
107 1348 : pub(crate) async fn finish_with_discard_fn<D, F>(
108 1348 : self,
109 1348 : tline: &Arc<Timeline>,
110 1348 : ctx: &RequestContext,
111 1348 : discard_fn: D,
112 1348 : ) -> anyhow::Result<Vec<BatchWriterResult>>
113 1348 : where
114 1348 : D: Fn(&PersistentLayerKey) -> F,
115 1348 : F: Future<Output = bool>,
116 1348 : {
117 1348 : let Self {
118 1348 : generated_layer_writers,
119 1348 : ..
120 1348 : } = self;
121 1348 : let clean_up_layers = |generated_layers: Vec<BatchWriterResult>| {
122 0 : for produced_layer in generated_layers {
123 0 : if let BatchWriterResult::Produced(resident_layer) = produced_layer {
124 0 : let layer: Layer = resident_layer.into();
125 0 : layer.delete_on_drop();
126 0 : }
127 : }
128 0 : };
129 : // BEGIN: catch every error and do the recovery in the below section
130 1348 : let mut generated_layers: Vec<BatchWriterResult> = Vec::new();
131 2076 : for (inner, layer_key) in generated_layer_writers {
132 728 : if discard_fn(&layer_key).await {
133 76 : generated_layers.push(BatchWriterResult::Discarded(layer_key));
134 76 : } else {
135 652 : let res = match inner {
136 76 : LayerWriterWrapper::Delta(writer) => {
137 76 : writer.finish(layer_key.key_range.end, ctx).await
138 : }
139 576 : LayerWriterWrapper::Image(writer) => {
140 576 : writer
141 576 : .finish_with_end_key(layer_key.key_range.end, ctx)
142 576 : .await
143 : }
144 : };
145 652 : let layer = match res {
146 652 : Ok((desc, path)) => {
147 652 : match Layer::finish_creating(self.conf, tline, desc, &path) {
148 652 : Ok(layer) => layer,
149 0 : Err(e) => {
150 0 : tokio::fs::remove_file(&path).await.ok();
151 0 : clean_up_layers(generated_layers);
152 0 : return Err(e);
153 : }
154 : }
155 : }
156 0 : Err(e) => {
157 0 : // Image/DeltaLayerWriter::finish will clean up the temporary layer if anything goes wrong,
158 0 : // so we don't need to remove the layer we just failed to create by ourselves.
159 0 : clean_up_layers(generated_layers);
160 0 : return Err(e);
161 : }
162 : };
163 652 : generated_layers.push(BatchWriterResult::Produced(layer));
164 : }
165 : }
166 : // END: catch every error and do the recovery in the above section
167 1348 : Ok(generated_layers)
168 1348 : }
169 :
170 4 : pub fn pending_layer_num(&self) -> usize {
171 4 : self.generated_layer_writers.len()
172 4 : }
173 : }
174 :
175 : /// An image writer that takes images and produces multiple image layers.
176 : #[must_use]
177 : pub struct SplitImageLayerWriter {
178 : inner: ImageLayerWriter,
179 : target_layer_size: u64,
180 : lsn: Lsn,
181 : conf: &'static PageServerConf,
182 : timeline_id: TimelineId,
183 : tenant_shard_id: TenantShardId,
184 : batches: BatchLayerWriter,
185 : start_key: Key,
186 : }
187 :
188 : impl SplitImageLayerWriter {
189 100 : pub async fn new(
190 100 : conf: &'static PageServerConf,
191 100 : timeline_id: TimelineId,
192 100 : tenant_shard_id: TenantShardId,
193 100 : start_key: Key,
194 100 : lsn: Lsn,
195 100 : target_layer_size: u64,
196 100 : ctx: &RequestContext,
197 100 : ) -> anyhow::Result<Self> {
198 100 : Ok(Self {
199 100 : target_layer_size,
200 100 : inner: ImageLayerWriter::new(
201 100 : conf,
202 100 : timeline_id,
203 100 : tenant_shard_id,
204 100 : &(start_key..Key::MAX),
205 100 : lsn,
206 100 : ctx,
207 100 : )
208 100 : .await?,
209 100 : conf,
210 100 : timeline_id,
211 100 : tenant_shard_id,
212 100 : batches: BatchLayerWriter::new(conf).await?,
213 100 : lsn,
214 100 : start_key,
215 : })
216 100 : }
217 :
218 17180 : pub async fn put_image(
219 17180 : &mut self,
220 17180 : key: Key,
221 17180 : img: Bytes,
222 17180 : ctx: &RequestContext,
223 17180 : ) -> anyhow::Result<()> {
224 17180 : // The current estimation is an upper bound of the space that the key/image could take
225 17180 : // because we did not consider compression in this estimation. The resulting image layer
226 17180 : // could be smaller than the target size.
227 17180 : let addition_size_estimation = KEY_SIZE as u64 + img.len() as u64;
228 17180 : if self.inner.num_keys() >= 1
229 17080 : && self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
230 : {
231 28 : let next_image_writer = ImageLayerWriter::new(
232 28 : self.conf,
233 28 : self.timeline_id,
234 28 : self.tenant_shard_id,
235 28 : &(key..Key::MAX),
236 28 : self.lsn,
237 28 : ctx,
238 28 : )
239 28 : .await?;
240 28 : let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
241 28 : self.batches.add_unfinished_image_writer(
242 28 : prev_image_writer,
243 28 : self.start_key..key,
244 28 : self.lsn,
245 28 : );
246 28 : self.start_key = key;
247 17152 : }
248 17180 : self.inner.put_image(key, img, ctx).await
249 17180 : }
250 :
251 92 : pub(crate) async fn finish_with_discard_fn<D, F>(
252 92 : self,
253 92 : tline: &Arc<Timeline>,
254 92 : ctx: &RequestContext,
255 92 : end_key: Key,
256 92 : discard_fn: D,
257 92 : ) -> anyhow::Result<Vec<BatchWriterResult>>
258 92 : where
259 92 : D: Fn(&PersistentLayerKey) -> F,
260 92 : F: Future<Output = bool>,
261 92 : {
262 92 : let Self {
263 92 : mut batches, inner, ..
264 92 : } = self;
265 92 : if inner.num_keys() != 0 {
266 92 : batches.add_unfinished_image_writer(inner, self.start_key..end_key, self.lsn);
267 92 : }
268 92 : batches.finish_with_discard_fn(tline, ctx, discard_fn).await
269 92 : }
270 :
271 : #[cfg(test)]
272 8 : pub(crate) async fn finish(
273 8 : self,
274 8 : tline: &Arc<Timeline>,
275 8 : ctx: &RequestContext,
276 8 : end_key: Key,
277 8 : ) -> anyhow::Result<Vec<BatchWriterResult>> {
278 12 : self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
279 8 : .await
280 8 : }
281 : }
282 :
283 : /// A delta writer that takes key-lsn-values and produces multiple delta layers.
284 : ///
285 : /// Note that if updates of a single key exceed the target size limit, all of the updates will be batched
286 : /// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm
287 : /// will split them into multiple files based on size.
288 : #[must_use]
289 : pub struct SplitDeltaLayerWriter {
290 : inner: Option<(Key, DeltaLayerWriter)>,
291 : target_layer_size: u64,
292 : conf: &'static PageServerConf,
293 : timeline_id: TimelineId,
294 : tenant_shard_id: TenantShardId,
295 : lsn_range: Range<Lsn>,
296 : last_key_written: Key,
297 : batches: BatchLayerWriter,
298 : }
299 :
300 : impl SplitDeltaLayerWriter {
301 124 : pub async fn new(
302 124 : conf: &'static PageServerConf,
303 124 : timeline_id: TimelineId,
304 124 : tenant_shard_id: TenantShardId,
305 124 : lsn_range: Range<Lsn>,
306 124 : target_layer_size: u64,
307 124 : ) -> anyhow::Result<Self> {
308 124 : Ok(Self {
309 124 : target_layer_size,
310 124 : inner: None,
311 124 : conf,
312 124 : timeline_id,
313 124 : tenant_shard_id,
314 124 : lsn_range,
315 124 : last_key_written: Key::MIN,
316 124 : batches: BatchLayerWriter::new(conf).await?,
317 : })
318 124 : }
319 :
320 24416 : pub async fn put_value(
321 24416 : &mut self,
322 24416 : key: Key,
323 24416 : lsn: Lsn,
324 24416 : val: Value,
325 24416 : ctx: &RequestContext,
326 24416 : ) -> anyhow::Result<()> {
327 24416 : // The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
328 24416 : // number, and therefore the final layer size could be a little bit larger or smaller than the target.
329 24416 : //
330 24416 : // Also, keep all updates of a single key in a single file. TODO: split them using the legacy compaction
331 24416 : // strategy. https://github.com/neondatabase/neon/issues/8837
332 24416 :
333 24416 : if self.inner.is_none() {
334 100 : self.inner = Some((
335 100 : key,
336 100 : DeltaLayerWriter::new(
337 100 : self.conf,
338 100 : self.timeline_id,
339 100 : self.tenant_shard_id,
340 100 : key,
341 100 : self.lsn_range.clone(),
342 100 : ctx,
343 100 : )
344 100 : .await?,
345 : ));
346 24316 : }
347 24416 : let (_, inner) = self.inner.as_mut().unwrap();
348 24416 :
349 24416 : let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */;
350 24416 : if inner.num_keys() >= 1
351 24316 : && inner.estimated_size() + addition_size_estimation >= self.target_layer_size
352 : {
353 5992 : if key != self.last_key_written {
354 28 : let next_delta_writer = DeltaLayerWriter::new(
355 28 : self.conf,
356 28 : self.timeline_id,
357 28 : self.tenant_shard_id,
358 28 : key,
359 28 : self.lsn_range.clone(),
360 28 : ctx,
361 28 : )
362 28 : .await?;
363 28 : let (start_key, prev_delta_writer) =
364 28 : std::mem::replace(&mut self.inner, Some((key, next_delta_writer))).unwrap();
365 28 : self.batches.add_unfinished_delta_writer(
366 28 : prev_delta_writer,
367 28 : start_key..key,
368 28 : self.lsn_range.clone(),
369 28 : );
370 5964 : } else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
371 : // We have to produce a very large file b/c a key is updated too often.
372 0 : anyhow::bail!(
373 0 : "a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
374 0 : key,
375 0 : inner.estimated_size()
376 0 : );
377 5964 : }
378 18424 : }
379 24416 : self.last_key_written = key;
380 24416 : let (_, inner) = self.inner.as_mut().unwrap();
381 24416 : inner.put_value(key, lsn, val, ctx).await
382 24416 : }
383 :
384 116 : pub(crate) async fn finish_with_discard_fn<D, F>(
385 116 : self,
386 116 : tline: &Arc<Timeline>,
387 116 : ctx: &RequestContext,
388 116 : discard_fn: D,
389 116 : ) -> anyhow::Result<Vec<BatchWriterResult>>
390 116 : where
391 116 : D: Fn(&PersistentLayerKey) -> F,
392 116 : F: Future<Output = bool>,
393 116 : {
394 116 : let Self {
395 116 : mut batches, inner, ..
396 116 : } = self;
397 116 : if let Some((start_key, writer)) = inner {
398 92 : if writer.num_keys() != 0 {
399 92 : let end_key = self.last_key_written.next();
400 92 : batches.add_unfinished_delta_writer(
401 92 : writer,
402 92 : start_key..end_key,
403 92 : self.lsn_range.clone(),
404 92 : );
405 92 : }
406 24 : }
407 116 : batches.finish_with_discard_fn(tline, ctx, discard_fn).await
408 116 : }
409 :
410 : #[cfg(test)]
411 12 : pub(crate) async fn finish(
412 12 : self,
413 12 : tline: &Arc<Timeline>,
414 12 : ctx: &RequestContext,
415 12 : ) -> anyhow::Result<Vec<BatchWriterResult>> {
416 16 : self.finish_with_discard_fn(tline, ctx, |_| async { false })
417 12 : .await
418 12 : }
419 : }
420 :
421 : #[cfg(test)]
422 : mod tests {
423 : use itertools::Itertools;
424 : use rand::{RngCore, SeedableRng};
425 :
426 : use crate::{
427 : tenant::{
428 : harness::{TenantHarness, TIMELINE_ID},
429 : storage_layer::AsLayerDesc,
430 : },
431 : DEFAULT_PG_VERSION,
432 : };
433 :
434 : use super::*;
435 :
436 40104 : fn get_key(id: u32) -> Key {
437 40104 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
438 40104 : key.field6 = id;
439 40104 : key
440 40104 : }
441 :
442 16 : fn get_img(id: u32) -> Bytes {
443 16 : format!("{id:064}").into()
444 16 : }
445 :
446 40008 : fn get_large_img() -> Bytes {
447 40008 : let mut rng = rand::rngs::SmallRng::seed_from_u64(42);
448 40008 : let mut data = vec![0; 8192];
449 40008 : rng.fill_bytes(&mut data);
450 40008 : data.into()
451 40008 : }
452 :
453 : #[tokio::test]
454 4 : async fn write_one_image() {
455 4 : let harness = TenantHarness::create("split_writer_write_one_image")
456 4 : .await
457 4 : .unwrap();
458 4 : let (tenant, ctx) = harness.load().await;
459 4 :
460 4 : let tline = tenant
461 4 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
462 4 : .await
463 4 : .unwrap();
464 4 :
465 4 : let mut image_writer = SplitImageLayerWriter::new(
466 4 : tenant.conf,
467 4 : tline.timeline_id,
468 4 : tenant.tenant_shard_id,
469 4 : get_key(0),
470 4 : Lsn(0x18),
471 4 : 4 * 1024 * 1024,
472 4 : &ctx,
473 4 : )
474 4 : .await
475 4 : .unwrap();
476 4 :
477 4 : let mut delta_writer = SplitDeltaLayerWriter::new(
478 4 : tenant.conf,
479 4 : tline.timeline_id,
480 4 : tenant.tenant_shard_id,
481 4 : Lsn(0x18)..Lsn(0x20),
482 4 : 4 * 1024 * 1024,
483 4 : )
484 4 : .await
485 4 : .unwrap();
486 4 :
487 4 : image_writer
488 4 : .put_image(get_key(0), get_img(0), &ctx)
489 4 : .await
490 4 : .unwrap();
491 4 : let layers = image_writer
492 4 : .finish(&tline, &ctx, get_key(10))
493 4 : .await
494 4 : .unwrap();
495 4 : assert_eq!(layers.len(), 1);
496 4 :
497 4 : delta_writer
498 4 : .put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx)
499 4 : .await
500 4 : .unwrap();
501 4 : let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
502 4 : assert_eq!(layers.len(), 1);
503 4 : assert_eq!(
504 4 : layers
505 4 : .into_iter()
506 4 : .next()
507 4 : .unwrap()
508 4 : .into_resident_layer()
509 4 : .layer_desc()
510 4 : .key(),
511 4 : PersistentLayerKey {
512 4 : key_range: get_key(0)..get_key(1),
513 4 : lsn_range: Lsn(0x18)..Lsn(0x20),
514 4 : is_delta: true
515 4 : }
516 4 : );
517 4 : }
518 :
519 : #[tokio::test]
520 4 : async fn write_split() {
521 4 : // Test the split writer with retaining all the layers we have produced (discard=false)
522 4 : write_split_helper("split_writer_write_split", false).await;
523 4 : }
524 :
525 : #[tokio::test]
526 4 : async fn write_split_discard() {
527 4 : // Test the split writer with discarding all the layers we have produced (discard=true)
528 4 : write_split_helper("split_writer_write_split_discard", true).await;
529 4 : }
530 :
531 : /// Test the image+delta writer by writing a large number of images and deltas. If discard is
532 : /// set to true, all layers will be discarded.
533 8 : async fn write_split_helper(harness_name: &'static str, discard: bool) {
534 8 : let harness = TenantHarness::create(harness_name).await.unwrap();
535 8 : let (tenant, ctx) = harness.load().await;
536 :
537 8 : let tline = tenant
538 8 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
539 8 : .await
540 8 : .unwrap();
541 :
542 8 : let mut image_writer = SplitImageLayerWriter::new(
543 8 : tenant.conf,
544 8 : tline.timeline_id,
545 8 : tenant.tenant_shard_id,
546 8 : get_key(0),
547 8 : Lsn(0x18),
548 8 : 4 * 1024 * 1024,
549 8 : &ctx,
550 8 : )
551 8 : .await
552 8 : .unwrap();
553 8 : let mut delta_writer = SplitDeltaLayerWriter::new(
554 8 : tenant.conf,
555 8 : tline.timeline_id,
556 8 : tenant.tenant_shard_id,
557 8 : Lsn(0x18)..Lsn(0x20),
558 8 : 4 * 1024 * 1024,
559 8 : )
560 8 : .await
561 8 : .unwrap();
562 : const N: usize = 2000;
563 16008 : for i in 0..N {
564 16000 : let i = i as u32;
565 16000 : image_writer
566 16000 : .put_image(get_key(i), get_large_img(), &ctx)
567 16000 : .await
568 16000 : .unwrap();
569 16000 : delta_writer
570 16000 : .put_value(get_key(i), Lsn(0x20), Value::Image(get_large_img()), &ctx)
571 16000 : .await
572 16000 : .unwrap();
573 : }
574 8 : let image_layers = image_writer
575 32 : .finish_with_discard_fn(&tline, &ctx, get_key(N as u32), |_| async { discard })
576 8 : .await
577 8 : .unwrap();
578 8 : let delta_layers = delta_writer
579 32 : .finish_with_discard_fn(&tline, &ctx, |_| async { discard })
580 8 : .await
581 8 : .unwrap();
582 8 : let image_layers = image_layers
583 8 : .into_iter()
584 32 : .map(|x| {
585 32 : if discard {
586 16 : x.into_discarded_layer()
587 : } else {
588 16 : x.into_resident_layer().layer_desc().key()
589 : }
590 32 : })
591 8 : .collect_vec();
592 8 : let delta_layers = delta_layers
593 8 : .into_iter()
594 32 : .map(|x| {
595 32 : if discard {
596 16 : x.into_discarded_layer()
597 : } else {
598 16 : x.into_resident_layer().layer_desc().key()
599 : }
600 32 : })
601 8 : .collect_vec();
602 8 : assert_eq!(image_layers.len(), N / 512 + 1);
603 8 : assert_eq!(delta_layers.len(), N / 512 + 1);
604 8 : assert_eq!(delta_layers.first().unwrap().key_range.start, get_key(0));
605 8 : assert_eq!(
606 8 : delta_layers.last().unwrap().key_range.end,
607 8 : get_key(N as u32)
608 8 : );
609 32 : for idx in 0..image_layers.len() {
610 32 : assert_ne!(image_layers[idx].key_range.start, Key::MIN);
611 32 : assert_ne!(image_layers[idx].key_range.end, Key::MAX);
612 32 : assert_ne!(delta_layers[idx].key_range.start, Key::MIN);
613 32 : assert_ne!(delta_layers[idx].key_range.end, Key::MAX);
614 32 : if idx > 0 {
615 24 : assert_eq!(
616 24 : image_layers[idx - 1].key_range.end,
617 24 : image_layers[idx].key_range.start
618 24 : );
619 24 : assert_eq!(
620 24 : delta_layers[idx - 1].key_range.end,
621 24 : delta_layers[idx].key_range.start
622 24 : );
623 8 : }
624 : }
625 8 : }
626 :
627 : #[tokio::test]
628 4 : async fn write_large_img() {
629 4 : let harness = TenantHarness::create("split_writer_write_large_img")
630 4 : .await
631 4 : .unwrap();
632 4 : let (tenant, ctx) = harness.load().await;
633 4 :
634 4 : let tline = tenant
635 4 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
636 4 : .await
637 4 : .unwrap();
638 4 :
639 4 : let mut image_writer = SplitImageLayerWriter::new(
640 4 : tenant.conf,
641 4 : tline.timeline_id,
642 4 : tenant.tenant_shard_id,
643 4 : get_key(0),
644 4 : Lsn(0x18),
645 4 : 4 * 1024,
646 4 : &ctx,
647 4 : )
648 4 : .await
649 4 : .unwrap();
650 4 :
651 4 : let mut delta_writer = SplitDeltaLayerWriter::new(
652 4 : tenant.conf,
653 4 : tline.timeline_id,
654 4 : tenant.tenant_shard_id,
655 4 : Lsn(0x18)..Lsn(0x20),
656 4 : 4 * 1024,
657 4 : )
658 4 : .await
659 4 : .unwrap();
660 4 :
661 4 : image_writer
662 4 : .put_image(get_key(0), get_img(0), &ctx)
663 4 : .await
664 4 : .unwrap();
665 4 : image_writer
666 4 : .put_image(get_key(1), get_large_img(), &ctx)
667 4 : .await
668 4 : .unwrap();
669 4 : let layers = image_writer
670 4 : .finish(&tline, &ctx, get_key(10))
671 4 : .await
672 4 : .unwrap();
673 4 : assert_eq!(layers.len(), 2);
674 4 :
675 4 : delta_writer
676 4 : .put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx)
677 4 : .await
678 4 : .unwrap();
679 4 : delta_writer
680 4 : .put_value(get_key(1), Lsn(0x1A), Value::Image(get_large_img()), &ctx)
681 4 : .await
682 4 : .unwrap();
683 4 : let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
684 4 : assert_eq!(layers.len(), 2);
685 4 : let mut layers_iter = layers.into_iter();
686 4 : assert_eq!(
687 4 : layers_iter
688 4 : .next()
689 4 : .unwrap()
690 4 : .into_resident_layer()
691 4 : .layer_desc()
692 4 : .key(),
693 4 : PersistentLayerKey {
694 4 : key_range: get_key(0)..get_key(1),
695 4 : lsn_range: Lsn(0x18)..Lsn(0x20),
696 4 : is_delta: true
697 4 : }
698 4 : );
699 4 : assert_eq!(
700 4 : layers_iter
701 4 : .next()
702 4 : .unwrap()
703 4 : .into_resident_layer()
704 4 : .layer_desc()
705 4 : .key(),
706 4 : PersistentLayerKey {
707 4 : key_range: get_key(1)..get_key(2),
708 4 : lsn_range: Lsn(0x18)..Lsn(0x20),
709 4 : is_delta: true
710 4 : }
711 4 : );
712 4 : }
713 :
714 : #[tokio::test]
715 4 : async fn write_split_single_key() {
716 4 : let harness = TenantHarness::create("split_writer_write_split_single_key")
717 4 : .await
718 4 : .unwrap();
719 4 : let (tenant, ctx) = harness.load().await;
720 4 :
721 4 : let tline = tenant
722 4 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
723 4 : .await
724 4 : .unwrap();
725 4 :
726 4 : const N: usize = 2000;
727 4 : let mut delta_writer = SplitDeltaLayerWriter::new(
728 4 : tenant.conf,
729 4 : tline.timeline_id,
730 4 : tenant.tenant_shard_id,
731 4 : Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
732 4 : 4 * 1024 * 1024,
733 4 : )
734 4 : .await
735 4 : .unwrap();
736 4 :
737 8004 : for i in 0..N {
738 8000 : let i = i as u32;
739 8000 : delta_writer
740 8000 : .put_value(
741 8000 : get_key(0),
742 8000 : Lsn(i as u64 * 16 + 0x10),
743 8000 : Value::Image(get_large_img()),
744 8000 : &ctx,
745 8000 : )
746 8000 : .await
747 8000 : .unwrap();
748 4 : }
749 4 : let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap();
750 4 : assert_eq!(delta_layers.len(), 1);
751 4 : let delta_layer = delta_layers
752 4 : .into_iter()
753 4 : .next()
754 4 : .unwrap()
755 4 : .into_resident_layer();
756 4 : assert_eq!(
757 4 : delta_layer.layer_desc().key(),
758 4 : PersistentLayerKey {
759 4 : key_range: get_key(0)..get_key(1),
760 4 : lsn_range: Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
761 4 : is_delta: true
762 4 : }
763 4 : );
764 4 : }
765 : }
|