Line data Source code
1 : use std::future::Future;
2 : use std::ops::Range;
3 : use std::sync::Arc;
4 :
5 : use bytes::Bytes;
6 : use pageserver_api::key::{KEY_SIZE, Key};
7 : use tokio_util::sync::CancellationToken;
8 : use utils::id::TimelineId;
9 : use utils::lsn::Lsn;
10 : use utils::shard::TenantShardId;
11 : use wal_decoder::models::value::Value;
12 :
13 : use super::errors::PutError;
14 : use super::layer::S3_UPLOAD_LIMIT;
15 : use super::{
16 : DeltaLayerWriter, ImageLayerWriter, PersistentLayerDesc, PersistentLayerKey, ResidentLayer,
17 : };
18 : use crate::config::PageServerConf;
19 : use crate::context::RequestContext;
20 : use crate::tenant::Timeline;
21 : use crate::tenant::storage_layer::Layer;
22 :
23 : pub(crate) enum BatchWriterResult {
24 : Produced(ResidentLayer),
25 : Discarded(PersistentLayerKey),
26 : }
27 :
28 : #[cfg(test)]
29 : impl BatchWriterResult {
30 12 : fn into_resident_layer(self) -> ResidentLayer {
31 12 : match self {
32 12 : BatchWriterResult::Produced(layer) => layer,
33 0 : BatchWriterResult::Discarded(_) => panic!("unexpected discarded layer"),
34 : }
35 12 : }
36 :
37 8 : fn into_discarded_layer(self) -> PersistentLayerKey {
38 8 : match self {
39 0 : BatchWriterResult::Produced(_) => panic!("unexpected produced layer"),
40 8 : BatchWriterResult::Discarded(layer) => layer,
41 : }
42 8 : }
43 : }
44 :
45 : enum LayerWriterWrapper {
46 : Image(ImageLayerWriter),
47 : Delta(DeltaLayerWriter),
48 : }
49 :
50 : /// An layer writer that takes unfinished layers and finish them atomically.
51 : #[must_use]
52 : pub struct BatchLayerWriter {
53 : generated_layer_writers: Vec<(LayerWriterWrapper, PersistentLayerKey)>,
54 : conf: &'static PageServerConf,
55 : }
56 :
57 : impl BatchLayerWriter {
58 249 : pub fn new(conf: &'static PageServerConf) -> Self {
59 249 : Self {
60 249 : generated_layer_writers: Vec::new(),
61 249 : conf,
62 249 : }
63 249 : }
64 :
65 160 : pub fn add_unfinished_image_writer(
66 160 : &mut self,
67 160 : writer: ImageLayerWriter,
68 160 : key_range: Range<Key>,
69 160 : lsn: Lsn,
70 160 : ) {
71 160 : self.generated_layer_writers.push((
72 160 : LayerWriterWrapper::Image(writer),
73 160 : PersistentLayerKey {
74 160 : key_range,
75 160 : lsn_range: PersistentLayerDesc::image_layer_lsn_range(lsn),
76 160 : is_delta: false,
77 160 : },
78 160 : ));
79 160 : }
80 :
81 30 : pub fn add_unfinished_delta_writer(
82 30 : &mut self,
83 30 : writer: DeltaLayerWriter,
84 30 : key_range: Range<Key>,
85 30 : lsn_range: Range<Lsn>,
86 30 : ) {
87 30 : self.generated_layer_writers.push((
88 30 : LayerWriterWrapper::Delta(writer),
89 30 : PersistentLayerKey {
90 30 : key_range,
91 30 : lsn_range,
92 30 : is_delta: true,
93 30 : },
94 30 : ));
95 30 : }
96 :
97 191 : pub(crate) async fn finish(
98 191 : self,
99 191 : tline: &Arc<Timeline>,
100 191 : ctx: &RequestContext,
101 191 : ) -> anyhow::Result<Vec<ResidentLayer>> {
102 191 : let res = self
103 260 : .finish_with_discard_fn(tline, ctx, |_| async { false })
104 191 : .await?;
105 191 : let mut output = Vec::new();
106 321 : for r in res {
107 130 : if let BatchWriterResult::Produced(layer) = r {
108 130 : output.push(layer);
109 130 : }
110 : }
111 191 : Ok(output)
112 191 : }
113 :
114 243 : pub(crate) async fn finish_with_discard_fn<D, F>(
115 243 : self,
116 243 : tline: &Arc<Timeline>,
117 243 : ctx: &RequestContext,
118 243 : discard_fn: D,
119 243 : ) -> anyhow::Result<Vec<BatchWriterResult>>
120 243 : where
121 243 : D: Fn(&PersistentLayerKey) -> F,
122 243 : F: Future<Output = bool>,
123 243 : {
124 : let Self {
125 243 : generated_layer_writers,
126 : ..
127 243 : } = self;
128 243 : let clean_up_layers = |generated_layers: Vec<BatchWriterResult>| {
129 0 : for produced_layer in generated_layers {
130 0 : if let BatchWriterResult::Produced(resident_layer) = produced_layer {
131 0 : let layer: Layer = resident_layer.into();
132 0 : layer.delete_on_drop();
133 0 : }
134 : }
135 0 : };
136 : // BEGIN: catch every error and do the recovery in the below section
137 243 : let mut generated_layers: Vec<BatchWriterResult> = Vec::new();
138 433 : for (inner, layer_key) in generated_layer_writers {
139 190 : if discard_fn(&layer_key).await {
140 19 : generated_layers.push(BatchWriterResult::Discarded(layer_key));
141 19 : } else {
142 171 : let res = match inner {
143 19 : LayerWriterWrapper::Delta(writer) => {
144 19 : writer.finish(layer_key.key_range.end, ctx).await
145 : }
146 152 : LayerWriterWrapper::Image(writer) => {
147 152 : writer
148 152 : .finish_with_end_key(layer_key.key_range.end, ctx)
149 152 : .await
150 : }
151 : };
152 171 : let layer = match res {
153 171 : Ok((desc, path)) => {
154 171 : match Layer::finish_creating(self.conf, tline, desc, &path) {
155 171 : Ok(layer) => layer,
156 0 : Err(e) => {
157 0 : tokio::fs::remove_file(&path).await.ok();
158 0 : clean_up_layers(generated_layers);
159 0 : return Err(e);
160 : }
161 : }
162 : }
163 0 : Err(e) => {
164 : // Image/DeltaLayerWriter::finish will clean up the temporary layer if anything goes wrong,
165 : // so we don't need to remove the layer we just failed to create by ourselves.
166 0 : clean_up_layers(generated_layers);
167 0 : return Err(e);
168 : }
169 : };
170 171 : generated_layers.push(BatchWriterResult::Produced(layer));
171 : }
172 : }
173 : // END: catch every error and do the recovery in the above section
174 243 : Ok(generated_layers)
175 243 : }
176 :
177 0 : pub fn pending_layer_num(&self) -> usize {
178 0 : self.generated_layer_writers.len()
179 0 : }
180 : }
181 :
182 : /// An image writer that takes images and produces multiple image layers.
183 : #[must_use]
184 : pub struct SplitImageLayerWriter<'a> {
185 : inner: Option<ImageLayerWriter>,
186 : target_layer_size: u64,
187 : lsn: Lsn,
188 : conf: &'static PageServerConf,
189 : timeline_id: TimelineId,
190 : tenant_shard_id: TenantShardId,
191 : batches: BatchLayerWriter,
192 : start_key: Key,
193 : gate: &'a utils::sync::gate::Gate,
194 : cancel: CancellationToken,
195 : }
196 :
197 : impl<'a> SplitImageLayerWriter<'a> {
198 : #[allow(clippy::too_many_arguments)]
199 26 : pub fn new(
200 26 : conf: &'static PageServerConf,
201 26 : timeline_id: TimelineId,
202 26 : tenant_shard_id: TenantShardId,
203 26 : start_key: Key,
204 26 : lsn: Lsn,
205 26 : target_layer_size: u64,
206 26 : gate: &'a utils::sync::gate::Gate,
207 26 : cancel: CancellationToken,
208 26 : ) -> Self {
209 26 : Self {
210 26 : target_layer_size,
211 26 : inner: None,
212 26 : conf,
213 26 : timeline_id,
214 26 : tenant_shard_id,
215 26 : batches: BatchLayerWriter::new(conf),
216 26 : lsn,
217 26 : start_key,
218 26 : gate,
219 26 : cancel,
220 26 : }
221 26 : }
222 :
223 4303 : pub async fn put_image(
224 4303 : &mut self,
225 4303 : key: Key,
226 4303 : img: Bytes,
227 4303 : ctx: &RequestContext,
228 4303 : ) -> Result<(), PutError> {
229 4303 : if self.inner.is_none() {
230 26 : self.inner = Some(
231 26 : ImageLayerWriter::new(
232 26 : self.conf,
233 26 : self.timeline_id,
234 26 : self.tenant_shard_id,
235 26 : &(self.start_key..Key::MAX),
236 26 : self.lsn,
237 26 : self.gate,
238 26 : self.cancel.clone(),
239 26 : ctx,
240 26 : )
241 26 : .await
242 26 : .map_err(PutError::Other)?,
243 : );
244 4277 : }
245 :
246 4303 : let inner = self.inner.as_mut().unwrap();
247 :
248 : // The current estimation is an upper bound of the space that the key/image could take
249 : // because we did not consider compression in this estimation. The resulting image layer
250 : // could be smaller than the target size.
251 4303 : let addition_size_estimation = KEY_SIZE as u64 + img.len() as u64;
252 4303 : if inner.num_keys() >= 1
253 4277 : && inner.estimated_size() + addition_size_estimation >= self.target_layer_size
254 : {
255 7 : let next_image_writer = ImageLayerWriter::new(
256 7 : self.conf,
257 7 : self.timeline_id,
258 7 : self.tenant_shard_id,
259 7 : &(key..Key::MAX),
260 7 : self.lsn,
261 7 : self.gate,
262 7 : self.cancel.clone(),
263 7 : ctx,
264 7 : )
265 7 : .await
266 7 : .map_err(PutError::Other)?;
267 7 : let prev_image_writer = std::mem::replace(inner, next_image_writer);
268 7 : self.batches.add_unfinished_image_writer(
269 7 : prev_image_writer,
270 7 : self.start_key..key,
271 7 : self.lsn,
272 : );
273 7 : self.start_key = key;
274 4296 : }
275 4303 : inner.put_image(key, img, ctx).await
276 4303 : }
277 :
278 23 : pub(crate) async fn finish_with_discard_fn<D, F>(
279 23 : self,
280 23 : tline: &Arc<Timeline>,
281 23 : ctx: &RequestContext,
282 23 : end_key: Key,
283 23 : discard_fn: D,
284 23 : ) -> anyhow::Result<Vec<BatchWriterResult>>
285 23 : where
286 23 : D: Fn(&PersistentLayerKey) -> F,
287 23 : F: Future<Output = bool>,
288 23 : {
289 23 : let Self {
290 23 : mut batches, inner, ..
291 23 : } = self;
292 23 : if let Some(inner) = inner {
293 23 : if inner.num_keys() != 0 {
294 23 : batches.add_unfinished_image_writer(inner, self.start_key..end_key, self.lsn);
295 23 : }
296 0 : }
297 23 : batches.finish_with_discard_fn(tline, ctx, discard_fn).await
298 23 : }
299 :
300 : #[cfg(test)]
301 2 : pub(crate) async fn finish(
302 2 : self,
303 2 : tline: &Arc<Timeline>,
304 2 : ctx: &RequestContext,
305 2 : end_key: Key,
306 2 : ) -> anyhow::Result<Vec<BatchWriterResult>> {
307 6 : self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
308 2 : .await
309 2 : }
310 : }
311 :
312 : /// A delta writer that takes key-lsn-values and produces multiple delta layers.
313 : ///
314 : /// Note that if updates of a single key exceed the target size limit, all of the updates will be batched
315 : /// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm
316 : /// will split them into multiple files based on size.
317 : #[must_use]
318 : pub struct SplitDeltaLayerWriter<'a> {
319 : inner: Option<(Key, DeltaLayerWriter)>,
320 : target_layer_size: u64,
321 : conf: &'static PageServerConf,
322 : timeline_id: TimelineId,
323 : tenant_shard_id: TenantShardId,
324 : lsn_range: Range<Lsn>,
325 : last_key_written: Key,
326 : batches: BatchLayerWriter,
327 : gate: &'a utils::sync::gate::Gate,
328 : cancel: CancellationToken,
329 : }
330 :
331 : impl<'a> SplitDeltaLayerWriter<'a> {
332 32 : pub fn new(
333 32 : conf: &'static PageServerConf,
334 32 : timeline_id: TimelineId,
335 32 : tenant_shard_id: TenantShardId,
336 32 : lsn_range: Range<Lsn>,
337 32 : target_layer_size: u64,
338 32 : gate: &'a utils::sync::gate::Gate,
339 32 : cancel: CancellationToken,
340 32 : ) -> Self {
341 32 : Self {
342 32 : target_layer_size,
343 32 : inner: None,
344 32 : conf,
345 32 : timeline_id,
346 32 : tenant_shard_id,
347 32 : lsn_range,
348 32 : last_key_written: Key::MIN,
349 32 : batches: BatchLayerWriter::new(conf),
350 32 : gate,
351 32 : cancel,
352 32 : }
353 32 : }
354 :
355 6104 : pub async fn put_value(
356 6104 : &mut self,
357 6104 : key: Key,
358 6104 : lsn: Lsn,
359 6104 : val: Value,
360 6104 : ctx: &RequestContext,
361 6104 : ) -> Result<(), PutError> {
362 : // The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
363 : // number, and therefore the final layer size could be a little bit larger or smaller than the target.
364 : //
365 : // Also, keep all updates of a single key in a single file. TODO: split them using the legacy compaction
366 : // strategy. https://github.com/neondatabase/neon/issues/8837
367 :
368 6104 : if self.inner.is_none() {
369 25 : self.inner = Some((
370 25 : key,
371 25 : DeltaLayerWriter::new(
372 25 : self.conf,
373 25 : self.timeline_id,
374 25 : self.tenant_shard_id,
375 25 : key,
376 25 : self.lsn_range.clone(),
377 25 : self.gate,
378 25 : self.cancel.clone(),
379 25 : ctx,
380 25 : )
381 25 : .await
382 25 : .map_err(PutError::Other)?,
383 : ));
384 6079 : }
385 6104 : let (_, inner) = self.inner.as_mut().unwrap();
386 :
387 6104 : let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */;
388 6104 : if inner.num_keys() >= 1
389 6079 : && inner.estimated_size() + addition_size_estimation >= self.target_layer_size
390 : {
391 1498 : if key != self.last_key_written {
392 7 : let next_delta_writer = DeltaLayerWriter::new(
393 7 : self.conf,
394 7 : self.timeline_id,
395 7 : self.tenant_shard_id,
396 7 : key,
397 7 : self.lsn_range.clone(),
398 7 : self.gate,
399 7 : self.cancel.clone(),
400 7 : ctx,
401 7 : )
402 7 : .await
403 7 : .map_err(PutError::Other)?;
404 7 : let (start_key, prev_delta_writer) =
405 7 : self.inner.replace((key, next_delta_writer)).unwrap();
406 7 : self.batches.add_unfinished_delta_writer(
407 7 : prev_delta_writer,
408 7 : start_key..key,
409 7 : self.lsn_range.clone(),
410 : );
411 1491 : } else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
412 : // We have to produce a very large file b/c a key is updated too often.
413 0 : return Err(PutError::Other(anyhow::anyhow!(
414 0 : "a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
415 0 : key,
416 0 : inner.estimated_size()
417 0 : )));
418 1491 : }
419 4606 : }
420 6104 : self.last_key_written = key;
421 6104 : let (_, inner) = self.inner.as_mut().unwrap();
422 6104 : inner.put_value(key, lsn, val, ctx).await
423 6104 : }
424 :
425 29 : pub(crate) async fn finish_with_discard_fn<D, F>(
426 29 : self,
427 29 : tline: &Arc<Timeline>,
428 29 : ctx: &RequestContext,
429 29 : discard_fn: D,
430 29 : ) -> anyhow::Result<Vec<BatchWriterResult>>
431 29 : where
432 29 : D: Fn(&PersistentLayerKey) -> F,
433 29 : F: Future<Output = bool>,
434 29 : {
435 29 : let Self {
436 29 : mut batches, inner, ..
437 29 : } = self;
438 29 : if let Some((start_key, writer)) = inner {
439 23 : if writer.num_keys() != 0 {
440 23 : let end_key = self.last_key_written.next();
441 23 : batches.add_unfinished_delta_writer(
442 23 : writer,
443 23 : start_key..end_key,
444 23 : self.lsn_range.clone(),
445 23 : );
446 23 : }
447 6 : }
448 29 : batches.finish_with_discard_fn(tline, ctx, discard_fn).await
449 29 : }
450 :
451 : #[cfg(test)]
452 3 : pub(crate) async fn finish(
453 3 : self,
454 3 : tline: &Arc<Timeline>,
455 3 : ctx: &RequestContext,
456 3 : ) -> anyhow::Result<Vec<BatchWriterResult>> {
457 8 : self.finish_with_discard_fn(tline, ctx, |_| async { false })
458 3 : .await
459 3 : }
460 : }
461 :
462 : #[cfg(test)]
463 : mod tests {
464 : use itertools::Itertools;
465 : use rand::{RngCore, SeedableRng};
466 :
467 : use super::*;
468 : use crate::DEFAULT_PG_VERSION;
469 : use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
470 : use crate::tenant::storage_layer::AsLayerDesc;
471 :
472 10026 : fn get_key(id: u32) -> Key {
473 10026 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
474 10026 : key.field6 = id;
475 10026 : key
476 10026 : }
477 :
478 4 : fn get_img(id: u32) -> Bytes {
479 4 : format!("{id:064}").into()
480 4 : }
481 :
482 10002 : fn get_large_img() -> Bytes {
483 10002 : let mut rng = rand::rngs::SmallRng::seed_from_u64(42);
484 10002 : let mut data = vec![0; 8192];
485 10002 : rng.fill_bytes(&mut data);
486 10002 : data.into()
487 10002 : }
488 :
489 : #[tokio::test]
490 1 : async fn write_one_image() {
491 1 : let harness = TenantHarness::create("split_writer_write_one_image")
492 1 : .await
493 1 : .unwrap();
494 1 : let (tenant, ctx) = harness.load().await;
495 :
496 1 : let tline = tenant
497 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
498 1 : .await
499 1 : .unwrap();
500 :
501 1 : let mut image_writer = SplitImageLayerWriter::new(
502 1 : tenant.conf,
503 1 : tline.timeline_id,
504 1 : tenant.tenant_shard_id,
505 1 : get_key(0),
506 1 : Lsn(0x18),
507 1 : 4 * 1024 * 1024,
508 1 : &tline.gate,
509 1 : tline.cancel.clone(),
510 : );
511 :
512 1 : let mut delta_writer = SplitDeltaLayerWriter::new(
513 1 : tenant.conf,
514 1 : tline.timeline_id,
515 1 : tenant.tenant_shard_id,
516 1 : Lsn(0x18)..Lsn(0x20),
517 1 : 4 * 1024 * 1024,
518 1 : &tline.gate,
519 1 : tline.cancel.clone(),
520 : );
521 :
522 1 : image_writer
523 1 : .put_image(get_key(0), get_img(0), &ctx)
524 1 : .await
525 1 : .unwrap();
526 1 : let layers = image_writer
527 1 : .finish(&tline, &ctx, get_key(10))
528 1 : .await
529 1 : .unwrap();
530 1 : assert_eq!(layers.len(), 1);
531 :
532 1 : delta_writer
533 1 : .put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx)
534 1 : .await
535 1 : .unwrap();
536 1 : let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
537 1 : assert_eq!(layers.len(), 1);
538 1 : assert_eq!(
539 1 : layers
540 1 : .into_iter()
541 1 : .next()
542 1 : .unwrap()
543 1 : .into_resident_layer()
544 1 : .layer_desc()
545 1 : .key(),
546 1 : PersistentLayerKey {
547 1 : key_range: get_key(0)..get_key(1),
548 1 : lsn_range: Lsn(0x18)..Lsn(0x20),
549 1 : is_delta: true
550 1 : }
551 1 : );
552 1 : }
553 :
554 : #[tokio::test]
555 1 : async fn write_split() {
556 : // Test the split writer with retaining all the layers we have produced (discard=false)
557 1 : write_split_helper("split_writer_write_split", false).await;
558 1 : }
559 :
560 : #[tokio::test]
561 1 : async fn write_split_discard() {
562 : // Test the split writer with discarding all the layers we have produced (discard=true)
563 1 : write_split_helper("split_writer_write_split_discard", true).await;
564 1 : }
565 :
566 : /// Test the image+delta writer by writing a large number of images and deltas. If discard is
567 : /// set to true, all layers will be discarded.
568 2 : async fn write_split_helper(harness_name: &'static str, discard: bool) {
569 2 : let harness = TenantHarness::create(harness_name).await.unwrap();
570 2 : let (tenant, ctx) = harness.load().await;
571 :
572 2 : let tline = tenant
573 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
574 2 : .await
575 2 : .unwrap();
576 :
577 2 : let mut image_writer = SplitImageLayerWriter::new(
578 2 : tenant.conf,
579 2 : tline.timeline_id,
580 2 : tenant.tenant_shard_id,
581 2 : get_key(0),
582 2 : Lsn(0x18),
583 2 : 4 * 1024 * 1024,
584 2 : &tline.gate,
585 2 : tline.cancel.clone(),
586 : );
587 2 : let mut delta_writer = SplitDeltaLayerWriter::new(
588 2 : tenant.conf,
589 2 : tline.timeline_id,
590 2 : tenant.tenant_shard_id,
591 2 : Lsn(0x18)..Lsn(0x20),
592 2 : 4 * 1024 * 1024,
593 2 : &tline.gate,
594 2 : tline.cancel.clone(),
595 : );
596 : const N: usize = 2000;
597 4002 : for i in 0..N {
598 4000 : let i = i as u32;
599 4000 : image_writer
600 4000 : .put_image(get_key(i), get_large_img(), &ctx)
601 4000 : .await
602 4000 : .unwrap();
603 4000 : delta_writer
604 4000 : .put_value(get_key(i), Lsn(0x20), Value::Image(get_large_img()), &ctx)
605 4000 : .await
606 4000 : .unwrap();
607 : }
608 2 : let image_layers = image_writer
609 16 : .finish_with_discard_fn(&tline, &ctx, get_key(N as u32), |_| async { discard })
610 2 : .await
611 2 : .unwrap();
612 2 : let delta_layers = delta_writer
613 16 : .finish_with_discard_fn(&tline, &ctx, |_| async { discard })
614 2 : .await
615 2 : .unwrap();
616 2 : let image_layers = image_layers
617 2 : .into_iter()
618 8 : .map(|x| {
619 8 : if discard {
620 4 : x.into_discarded_layer()
621 : } else {
622 4 : x.into_resident_layer().layer_desc().key()
623 : }
624 8 : })
625 2 : .collect_vec();
626 2 : let delta_layers = delta_layers
627 2 : .into_iter()
628 8 : .map(|x| {
629 8 : if discard {
630 4 : x.into_discarded_layer()
631 : } else {
632 4 : x.into_resident_layer().layer_desc().key()
633 : }
634 8 : })
635 2 : .collect_vec();
636 2 : assert_eq!(image_layers.len(), N / 512 + 1);
637 2 : assert_eq!(delta_layers.len(), N / 512 + 1);
638 2 : assert_eq!(delta_layers.first().unwrap().key_range.start, get_key(0));
639 2 : assert_eq!(
640 2 : delta_layers.last().unwrap().key_range.end,
641 2 : get_key(N as u32)
642 : );
643 8 : for idx in 0..image_layers.len() {
644 8 : assert_ne!(image_layers[idx].key_range.start, Key::MIN);
645 8 : assert_ne!(image_layers[idx].key_range.end, Key::MAX);
646 8 : assert_ne!(delta_layers[idx].key_range.start, Key::MIN);
647 8 : assert_ne!(delta_layers[idx].key_range.end, Key::MAX);
648 8 : if idx > 0 {
649 6 : assert_eq!(
650 6 : image_layers[idx - 1].key_range.end,
651 6 : image_layers[idx].key_range.start
652 : );
653 6 : assert_eq!(
654 6 : delta_layers[idx - 1].key_range.end,
655 6 : delta_layers[idx].key_range.start
656 : );
657 2 : }
658 : }
659 2 : }
660 :
661 : #[tokio::test]
662 1 : async fn write_large_img() {
663 1 : let harness = TenantHarness::create("split_writer_write_large_img")
664 1 : .await
665 1 : .unwrap();
666 1 : let (tenant, ctx) = harness.load().await;
667 :
668 1 : let tline = tenant
669 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
670 1 : .await
671 1 : .unwrap();
672 :
673 1 : let mut image_writer = SplitImageLayerWriter::new(
674 1 : tenant.conf,
675 1 : tline.timeline_id,
676 1 : tenant.tenant_shard_id,
677 1 : get_key(0),
678 1 : Lsn(0x18),
679 1 : 4 * 1024,
680 1 : &tline.gate,
681 1 : tline.cancel.clone(),
682 : );
683 :
684 1 : let mut delta_writer = SplitDeltaLayerWriter::new(
685 1 : tenant.conf,
686 1 : tline.timeline_id,
687 1 : tenant.tenant_shard_id,
688 1 : Lsn(0x18)..Lsn(0x20),
689 1 : 4 * 1024,
690 1 : &tline.gate,
691 1 : tline.cancel.clone(),
692 : );
693 :
694 1 : image_writer
695 1 : .put_image(get_key(0), get_img(0), &ctx)
696 1 : .await
697 1 : .unwrap();
698 1 : image_writer
699 1 : .put_image(get_key(1), get_large_img(), &ctx)
700 1 : .await
701 1 : .unwrap();
702 1 : let layers = image_writer
703 1 : .finish(&tline, &ctx, get_key(10))
704 1 : .await
705 1 : .unwrap();
706 1 : assert_eq!(layers.len(), 2);
707 :
708 1 : delta_writer
709 1 : .put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx)
710 1 : .await
711 1 : .unwrap();
712 1 : delta_writer
713 1 : .put_value(get_key(1), Lsn(0x1A), Value::Image(get_large_img()), &ctx)
714 1 : .await
715 1 : .unwrap();
716 1 : let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
717 1 : assert_eq!(layers.len(), 2);
718 1 : let mut layers_iter = layers.into_iter();
719 1 : assert_eq!(
720 1 : layers_iter
721 1 : .next()
722 1 : .unwrap()
723 1 : .into_resident_layer()
724 1 : .layer_desc()
725 1 : .key(),
726 1 : PersistentLayerKey {
727 1 : key_range: get_key(0)..get_key(1),
728 1 : lsn_range: Lsn(0x18)..Lsn(0x20),
729 1 : is_delta: true
730 1 : }
731 : );
732 1 : assert_eq!(
733 1 : layers_iter
734 1 : .next()
735 1 : .unwrap()
736 1 : .into_resident_layer()
737 1 : .layer_desc()
738 1 : .key(),
739 1 : PersistentLayerKey {
740 1 : key_range: get_key(1)..get_key(2),
741 1 : lsn_range: Lsn(0x18)..Lsn(0x20),
742 1 : is_delta: true
743 1 : }
744 1 : );
745 1 : }
746 :
747 : #[tokio::test]
748 1 : async fn write_split_single_key() {
749 1 : let harness = TenantHarness::create("split_writer_write_split_single_key")
750 1 : .await
751 1 : .unwrap();
752 1 : let (tenant, ctx) = harness.load().await;
753 :
754 1 : let tline = tenant
755 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
756 1 : .await
757 1 : .unwrap();
758 :
759 : const N: usize = 2000;
760 1 : let mut delta_writer = SplitDeltaLayerWriter::new(
761 1 : tenant.conf,
762 1 : tline.timeline_id,
763 1 : tenant.tenant_shard_id,
764 1 : Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
765 1 : 4 * 1024 * 1024,
766 1 : &tline.gate,
767 1 : tline.cancel.clone(),
768 : );
769 :
770 2001 : for i in 0..N {
771 2000 : let i = i as u32;
772 2000 : delta_writer
773 2000 : .put_value(
774 2000 : get_key(0),
775 2000 : Lsn(i as u64 * 16 + 0x10),
776 2000 : Value::Image(get_large_img()),
777 2000 : &ctx,
778 2000 : )
779 2000 : .await
780 2000 : .unwrap();
781 : }
782 1 : let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap();
783 1 : assert_eq!(delta_layers.len(), 1);
784 1 : let delta_layer = delta_layers
785 1 : .into_iter()
786 1 : .next()
787 1 : .unwrap()
788 1 : .into_resident_layer();
789 1 : assert_eq!(
790 1 : delta_layer.layer_desc().key(),
791 1 : PersistentLayerKey {
792 1 : key_range: get_key(0)..get_key(1),
793 1 : lsn_range: Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
794 1 : is_delta: true
795 1 : }
796 1 : );
797 1 : }
798 : }
|