Line data Source code
1 : use std::future::Future;
2 : use std::ops::Range;
3 : use std::sync::Arc;
4 :
5 : use bytes::Bytes;
6 : use pageserver_api::key::{KEY_SIZE, Key};
7 : use pageserver_api::value::Value;
8 : use tokio_util::sync::CancellationToken;
9 : use utils::id::TimelineId;
10 : use utils::lsn::Lsn;
11 : use utils::shard::TenantShardId;
12 :
13 : use super::errors::PutError;
14 : use super::layer::S3_UPLOAD_LIMIT;
15 : use super::{
16 : DeltaLayerWriter, ImageLayerWriter, PersistentLayerDesc, PersistentLayerKey, ResidentLayer,
17 : };
18 : use crate::config::PageServerConf;
19 : use crate::context::RequestContext;
20 : use crate::tenant::Timeline;
21 : use crate::tenant::storage_layer::Layer;
22 :
23 : pub(crate) enum BatchWriterResult {
24 : Produced(ResidentLayer),
25 : Discarded(PersistentLayerKey),
26 : }
27 :
28 : #[cfg(test)]
29 : impl BatchWriterResult {
30 12 : fn into_resident_layer(self) -> ResidentLayer {
31 12 : match self {
32 12 : BatchWriterResult::Produced(layer) => layer,
33 0 : BatchWriterResult::Discarded(_) => panic!("unexpected discarded layer"),
34 : }
35 12 : }
36 :
37 8 : fn into_discarded_layer(self) -> PersistentLayerKey {
38 8 : match self {
39 0 : BatchWriterResult::Produced(_) => panic!("unexpected produced layer"),
40 8 : BatchWriterResult::Discarded(layer) => layer,
41 8 : }
42 8 : }
43 : }
44 :
45 : enum LayerWriterWrapper {
46 : Image(ImageLayerWriter),
47 : Delta(DeltaLayerWriter),
48 : }
49 :
50 : /// An layer writer that takes unfinished layers and finish them atomically.
51 : #[must_use]
52 : pub struct BatchLayerWriter {
53 : generated_layer_writers: Vec<(LayerWriterWrapper, PersistentLayerKey)>,
54 : conf: &'static PageServerConf,
55 : }
56 :
57 : impl BatchLayerWriter {
58 349 : pub async fn new(conf: &'static PageServerConf) -> anyhow::Result<Self> {
59 349 : Ok(Self {
60 349 : generated_layer_writers: Vec::new(),
61 349 : conf,
62 349 : })
63 349 : }
64 :
65 158 : pub fn add_unfinished_image_writer(
66 158 : &mut self,
67 158 : writer: ImageLayerWriter,
68 158 : key_range: Range<Key>,
69 158 : lsn: Lsn,
70 158 : ) {
71 158 : self.generated_layer_writers.push((
72 158 : LayerWriterWrapper::Image(writer),
73 158 : PersistentLayerKey {
74 158 : key_range,
75 158 : lsn_range: PersistentLayerDesc::image_layer_lsn_range(lsn),
76 158 : is_delta: false,
77 158 : },
78 158 : ));
79 158 : }
80 :
81 30 : pub fn add_unfinished_delta_writer(
82 30 : &mut self,
83 30 : writer: DeltaLayerWriter,
84 30 : key_range: Range<Key>,
85 30 : lsn_range: Range<Lsn>,
86 30 : ) {
87 30 : self.generated_layer_writers.push((
88 30 : LayerWriterWrapper::Delta(writer),
89 30 : PersistentLayerKey {
90 30 : key_range,
91 30 : lsn_range,
92 30 : is_delta: true,
93 30 : },
94 30 : ));
95 30 : }
96 :
97 291 : pub(crate) async fn finish(
98 291 : self,
99 291 : tline: &Arc<Timeline>,
100 291 : ctx: &RequestContext,
101 291 : ) -> anyhow::Result<Vec<ResidentLayer>> {
102 291 : let res = self
103 291 : .finish_with_discard_fn(tline, ctx, |_| async { false })
104 291 : .await?;
105 291 : let mut output = Vec::new();
106 419 : for r in res {
107 128 : if let BatchWriterResult::Produced(layer) = r {
108 128 : output.push(layer);
109 128 : }
110 : }
111 291 : Ok(output)
112 291 : }
113 :
114 343 : pub(crate) async fn finish_with_discard_fn<D, F>(
115 343 : self,
116 343 : tline: &Arc<Timeline>,
117 343 : ctx: &RequestContext,
118 343 : discard_fn: D,
119 343 : ) -> anyhow::Result<Vec<BatchWriterResult>>
120 343 : where
121 343 : D: Fn(&PersistentLayerKey) -> F,
122 343 : F: Future<Output = bool>,
123 343 : {
124 343 : let Self {
125 343 : generated_layer_writers,
126 343 : ..
127 343 : } = self;
128 343 : let clean_up_layers = |generated_layers: Vec<BatchWriterResult>| {
129 0 : for produced_layer in generated_layers {
130 0 : if let BatchWriterResult::Produced(resident_layer) = produced_layer {
131 0 : let layer: Layer = resident_layer.into();
132 0 : layer.delete_on_drop();
133 0 : }
134 : }
135 0 : };
136 : // BEGIN: catch every error and do the recovery in the below section
137 343 : let mut generated_layers: Vec<BatchWriterResult> = Vec::new();
138 531 : for (inner, layer_key) in generated_layer_writers {
139 188 : if discard_fn(&layer_key).await {
140 19 : generated_layers.push(BatchWriterResult::Discarded(layer_key));
141 19 : } else {
142 169 : let res = match inner {
143 19 : LayerWriterWrapper::Delta(writer) => {
144 19 : writer.finish(layer_key.key_range.end, ctx).await
145 : }
146 150 : LayerWriterWrapper::Image(writer) => {
147 150 : writer
148 150 : .finish_with_end_key(layer_key.key_range.end, ctx)
149 150 : .await
150 : }
151 : };
152 169 : let layer = match res {
153 169 : Ok((desc, path)) => {
154 169 : match Layer::finish_creating(self.conf, tline, desc, &path) {
155 169 : Ok(layer) => layer,
156 0 : Err(e) => {
157 0 : tokio::fs::remove_file(&path).await.ok();
158 0 : clean_up_layers(generated_layers);
159 0 : return Err(e);
160 : }
161 : }
162 : }
163 0 : Err(e) => {
164 0 : // Image/DeltaLayerWriter::finish will clean up the temporary layer if anything goes wrong,
165 0 : // so we don't need to remove the layer we just failed to create by ourselves.
166 0 : clean_up_layers(generated_layers);
167 0 : return Err(e);
168 : }
169 : };
170 169 : generated_layers.push(BatchWriterResult::Produced(layer));
171 : }
172 : }
173 : // END: catch every error and do the recovery in the above section
174 343 : Ok(generated_layers)
175 343 : }
176 :
177 0 : pub fn pending_layer_num(&self) -> usize {
178 0 : self.generated_layer_writers.len()
179 0 : }
180 : }
181 :
182 : /// An image writer that takes images and produces multiple image layers.
183 : #[must_use]
184 : pub struct SplitImageLayerWriter<'a> {
185 : inner: ImageLayerWriter,
186 : target_layer_size: u64,
187 : lsn: Lsn,
188 : conf: &'static PageServerConf,
189 : timeline_id: TimelineId,
190 : tenant_shard_id: TenantShardId,
191 : batches: BatchLayerWriter,
192 : start_key: Key,
193 : gate: &'a utils::sync::gate::Gate,
194 : cancel: CancellationToken,
195 : }
196 :
197 : impl<'a> SplitImageLayerWriter<'a> {
198 : #[allow(clippy::too_many_arguments)]
199 26 : pub async fn new(
200 26 : conf: &'static PageServerConf,
201 26 : timeline_id: TimelineId,
202 26 : tenant_shard_id: TenantShardId,
203 26 : start_key: Key,
204 26 : lsn: Lsn,
205 26 : target_layer_size: u64,
206 26 : gate: &'a utils::sync::gate::Gate,
207 26 : cancel: CancellationToken,
208 26 : ctx: &RequestContext,
209 26 : ) -> anyhow::Result<Self> {
210 26 : Ok(Self {
211 26 : target_layer_size,
212 26 : inner: ImageLayerWriter::new(
213 26 : conf,
214 26 : timeline_id,
215 26 : tenant_shard_id,
216 26 : &(start_key..Key::MAX),
217 26 : lsn,
218 26 : gate,
219 26 : cancel.clone(),
220 26 : ctx,
221 26 : )
222 26 : .await?,
223 26 : conf,
224 26 : timeline_id,
225 26 : tenant_shard_id,
226 26 : batches: BatchLayerWriter::new(conf).await?,
227 26 : lsn,
228 26 : start_key,
229 26 : gate,
230 26 : cancel,
231 : })
232 26 : }
233 :
234 4303 : pub async fn put_image(
235 4303 : &mut self,
236 4303 : key: Key,
237 4303 : img: Bytes,
238 4303 : ctx: &RequestContext,
239 4303 : ) -> Result<(), PutError> {
240 4303 : // The current estimation is an upper bound of the space that the key/image could take
241 4303 : // because we did not consider compression in this estimation. The resulting image layer
242 4303 : // could be smaller than the target size.
243 4303 : let addition_size_estimation = KEY_SIZE as u64 + img.len() as u64;
244 4303 : if self.inner.num_keys() >= 1
245 4277 : && self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
246 : {
247 7 : let next_image_writer = ImageLayerWriter::new(
248 7 : self.conf,
249 7 : self.timeline_id,
250 7 : self.tenant_shard_id,
251 7 : &(key..Key::MAX),
252 7 : self.lsn,
253 7 : self.gate,
254 7 : self.cancel.clone(),
255 7 : ctx,
256 7 : )
257 7 : .await
258 7 : .map_err(PutError::Other)?;
259 7 : let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
260 7 : self.batches.add_unfinished_image_writer(
261 7 : prev_image_writer,
262 7 : self.start_key..key,
263 7 : self.lsn,
264 7 : );
265 7 : self.start_key = key;
266 4296 : }
267 4303 : self.inner.put_image(key, img, ctx).await
268 4303 : }
269 :
270 23 : pub(crate) async fn finish_with_discard_fn<D, F>(
271 23 : self,
272 23 : tline: &Arc<Timeline>,
273 23 : ctx: &RequestContext,
274 23 : end_key: Key,
275 23 : discard_fn: D,
276 23 : ) -> anyhow::Result<Vec<BatchWriterResult>>
277 23 : where
278 23 : D: Fn(&PersistentLayerKey) -> F,
279 23 : F: Future<Output = bool>,
280 23 : {
281 23 : let Self {
282 23 : mut batches, inner, ..
283 23 : } = self;
284 23 : if inner.num_keys() != 0 {
285 23 : batches.add_unfinished_image_writer(inner, self.start_key..end_key, self.lsn);
286 23 : }
287 23 : batches.finish_with_discard_fn(tline, ctx, discard_fn).await
288 23 : }
289 :
290 : #[cfg(test)]
291 2 : pub(crate) async fn finish(
292 2 : self,
293 2 : tline: &Arc<Timeline>,
294 2 : ctx: &RequestContext,
295 2 : end_key: Key,
296 2 : ) -> anyhow::Result<Vec<BatchWriterResult>> {
297 3 : self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
298 2 : .await
299 2 : }
300 : }
301 :
302 : /// A delta writer that takes key-lsn-values and produces multiple delta layers.
303 : ///
304 : /// Note that if updates of a single key exceed the target size limit, all of the updates will be batched
305 : /// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm
306 : /// will split them into multiple files based on size.
307 : #[must_use]
308 : pub struct SplitDeltaLayerWriter<'a> {
309 : inner: Option<(Key, DeltaLayerWriter)>,
310 : target_layer_size: u64,
311 : conf: &'static PageServerConf,
312 : timeline_id: TimelineId,
313 : tenant_shard_id: TenantShardId,
314 : lsn_range: Range<Lsn>,
315 : last_key_written: Key,
316 : batches: BatchLayerWriter,
317 : gate: &'a utils::sync::gate::Gate,
318 : cancel: CancellationToken,
319 : }
320 :
321 : impl<'a> SplitDeltaLayerWriter<'a> {
322 32 : pub async fn new(
323 32 : conf: &'static PageServerConf,
324 32 : timeline_id: TimelineId,
325 32 : tenant_shard_id: TenantShardId,
326 32 : lsn_range: Range<Lsn>,
327 32 : target_layer_size: u64,
328 32 : gate: &'a utils::sync::gate::Gate,
329 32 : cancel: CancellationToken,
330 32 : ) -> anyhow::Result<Self> {
331 32 : Ok(Self {
332 32 : target_layer_size,
333 32 : inner: None,
334 32 : conf,
335 32 : timeline_id,
336 32 : tenant_shard_id,
337 32 : lsn_range,
338 32 : last_key_written: Key::MIN,
339 32 : batches: BatchLayerWriter::new(conf).await?,
340 32 : gate,
341 32 : cancel,
342 : })
343 32 : }
344 :
345 6104 : pub async fn put_value(
346 6104 : &mut self,
347 6104 : key: Key,
348 6104 : lsn: Lsn,
349 6104 : val: Value,
350 6104 : ctx: &RequestContext,
351 6104 : ) -> Result<(), PutError> {
352 6104 : // The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
353 6104 : // number, and therefore the final layer size could be a little bit larger or smaller than the target.
354 6104 : //
355 6104 : // Also, keep all updates of a single key in a single file. TODO: split them using the legacy compaction
356 6104 : // strategy. https://github.com/neondatabase/neon/issues/8837
357 6104 :
358 6104 : if self.inner.is_none() {
359 25 : self.inner = Some((
360 25 : key,
361 25 : DeltaLayerWriter::new(
362 25 : self.conf,
363 25 : self.timeline_id,
364 25 : self.tenant_shard_id,
365 25 : key,
366 25 : self.lsn_range.clone(),
367 25 : self.gate,
368 25 : self.cancel.clone(),
369 25 : ctx,
370 25 : )
371 25 : .await
372 25 : .map_err(PutError::Other)?,
373 : ));
374 6079 : }
375 6104 : let (_, inner) = self.inner.as_mut().unwrap();
376 6104 :
377 6104 : let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */;
378 6104 : if inner.num_keys() >= 1
379 6079 : && inner.estimated_size() + addition_size_estimation >= self.target_layer_size
380 : {
381 1498 : if key != self.last_key_written {
382 7 : let next_delta_writer = DeltaLayerWriter::new(
383 7 : self.conf,
384 7 : self.timeline_id,
385 7 : self.tenant_shard_id,
386 7 : key,
387 7 : self.lsn_range.clone(),
388 7 : self.gate,
389 7 : self.cancel.clone(),
390 7 : ctx,
391 7 : )
392 7 : .await
393 7 : .map_err(PutError::Other)?;
394 7 : let (start_key, prev_delta_writer) =
395 7 : self.inner.replace((key, next_delta_writer)).unwrap();
396 7 : self.batches.add_unfinished_delta_writer(
397 7 : prev_delta_writer,
398 7 : start_key..key,
399 7 : self.lsn_range.clone(),
400 7 : );
401 1491 : } else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
402 : // We have to produce a very large file b/c a key is updated too often.
403 0 : return Err(PutError::Other(anyhow::anyhow!(
404 0 : "a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
405 0 : key,
406 0 : inner.estimated_size()
407 0 : )));
408 1491 : }
409 4606 : }
410 6104 : self.last_key_written = key;
411 6104 : let (_, inner) = self.inner.as_mut().unwrap();
412 6104 : inner.put_value(key, lsn, val, ctx).await
413 6104 : }
414 :
415 29 : pub(crate) async fn finish_with_discard_fn<D, F>(
416 29 : self,
417 29 : tline: &Arc<Timeline>,
418 29 : ctx: &RequestContext,
419 29 : discard_fn: D,
420 29 : ) -> anyhow::Result<Vec<BatchWriterResult>>
421 29 : where
422 29 : D: Fn(&PersistentLayerKey) -> F,
423 29 : F: Future<Output = bool>,
424 29 : {
425 29 : let Self {
426 29 : mut batches, inner, ..
427 29 : } = self;
428 29 : if let Some((start_key, writer)) = inner {
429 23 : if writer.num_keys() != 0 {
430 23 : let end_key = self.last_key_written.next();
431 23 : batches.add_unfinished_delta_writer(
432 23 : writer,
433 23 : start_key..end_key,
434 23 : self.lsn_range.clone(),
435 23 : );
436 23 : }
437 6 : }
438 29 : batches.finish_with_discard_fn(tline, ctx, discard_fn).await
439 29 : }
440 :
441 : #[cfg(test)]
442 3 : pub(crate) async fn finish(
443 3 : self,
444 3 : tline: &Arc<Timeline>,
445 3 : ctx: &RequestContext,
446 3 : ) -> anyhow::Result<Vec<BatchWriterResult>> {
447 4 : self.finish_with_discard_fn(tline, ctx, |_| async { false })
448 3 : .await
449 3 : }
450 : }
451 :
452 : #[cfg(test)]
453 : mod tests {
454 : use itertools::Itertools;
455 : use rand::{RngCore, SeedableRng};
456 :
457 : use super::*;
458 : use crate::DEFAULT_PG_VERSION;
459 : use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
460 : use crate::tenant::storage_layer::AsLayerDesc;
461 :
462 10026 : fn get_key(id: u32) -> Key {
463 10026 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
464 10026 : key.field6 = id;
465 10026 : key
466 10026 : }
467 :
468 4 : fn get_img(id: u32) -> Bytes {
469 4 : format!("{id:064}").into()
470 4 : }
471 :
472 10002 : fn get_large_img() -> Bytes {
473 10002 : let mut rng = rand::rngs::SmallRng::seed_from_u64(42);
474 10002 : let mut data = vec![0; 8192];
475 10002 : rng.fill_bytes(&mut data);
476 10002 : data.into()
477 10002 : }
478 :
479 : #[tokio::test]
480 1 : async fn write_one_image() {
481 1 : let harness = TenantHarness::create("split_writer_write_one_image")
482 1 : .await
483 1 : .unwrap();
484 1 : let (tenant, ctx) = harness.load().await;
485 1 :
486 1 : let tline = tenant
487 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
488 1 : .await
489 1 : .unwrap();
490 1 :
491 1 : let mut image_writer = SplitImageLayerWriter::new(
492 1 : tenant.conf,
493 1 : tline.timeline_id,
494 1 : tenant.tenant_shard_id,
495 1 : get_key(0),
496 1 : Lsn(0x18),
497 1 : 4 * 1024 * 1024,
498 1 : &tline.gate,
499 1 : tline.cancel.clone(),
500 1 : &ctx,
501 1 : )
502 1 : .await
503 1 : .unwrap();
504 1 :
505 1 : let mut delta_writer = SplitDeltaLayerWriter::new(
506 1 : tenant.conf,
507 1 : tline.timeline_id,
508 1 : tenant.tenant_shard_id,
509 1 : Lsn(0x18)..Lsn(0x20),
510 1 : 4 * 1024 * 1024,
511 1 : &tline.gate,
512 1 : tline.cancel.clone(),
513 1 : )
514 1 : .await
515 1 : .unwrap();
516 1 :
517 1 : image_writer
518 1 : .put_image(get_key(0), get_img(0), &ctx)
519 1 : .await
520 1 : .unwrap();
521 1 : let layers = image_writer
522 1 : .finish(&tline, &ctx, get_key(10))
523 1 : .await
524 1 : .unwrap();
525 1 : assert_eq!(layers.len(), 1);
526 1 :
527 1 : delta_writer
528 1 : .put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx)
529 1 : .await
530 1 : .unwrap();
531 1 : let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
532 1 : assert_eq!(layers.len(), 1);
533 1 : assert_eq!(
534 1 : layers
535 1 : .into_iter()
536 1 : .next()
537 1 : .unwrap()
538 1 : .into_resident_layer()
539 1 : .layer_desc()
540 1 : .key(),
541 1 : PersistentLayerKey {
542 1 : key_range: get_key(0)..get_key(1),
543 1 : lsn_range: Lsn(0x18)..Lsn(0x20),
544 1 : is_delta: true
545 1 : }
546 1 : );
547 1 : }
548 :
549 : #[tokio::test]
550 1 : async fn write_split() {
551 1 : // Test the split writer with retaining all the layers we have produced (discard=false)
552 1 : write_split_helper("split_writer_write_split", false).await;
553 1 : }
554 :
555 : #[tokio::test]
556 1 : async fn write_split_discard() {
557 1 : // Test the split writer with discarding all the layers we have produced (discard=true)
558 1 : write_split_helper("split_writer_write_split_discard", true).await;
559 1 : }
560 :
561 : /// Test the image+delta writer by writing a large number of images and deltas. If discard is
562 : /// set to true, all layers will be discarded.
563 2 : async fn write_split_helper(harness_name: &'static str, discard: bool) {
564 2 : let harness = TenantHarness::create(harness_name).await.unwrap();
565 2 : let (tenant, ctx) = harness.load().await;
566 :
567 2 : let tline = tenant
568 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
569 2 : .await
570 2 : .unwrap();
571 :
572 2 : let mut image_writer = SplitImageLayerWriter::new(
573 2 : tenant.conf,
574 2 : tline.timeline_id,
575 2 : tenant.tenant_shard_id,
576 2 : get_key(0),
577 2 : Lsn(0x18),
578 2 : 4 * 1024 * 1024,
579 2 : &tline.gate,
580 2 : tline.cancel.clone(),
581 2 : &ctx,
582 2 : )
583 2 : .await
584 2 : .unwrap();
585 2 : let mut delta_writer = SplitDeltaLayerWriter::new(
586 2 : tenant.conf,
587 2 : tline.timeline_id,
588 2 : tenant.tenant_shard_id,
589 2 : Lsn(0x18)..Lsn(0x20),
590 2 : 4 * 1024 * 1024,
591 2 : &tline.gate,
592 2 : tline.cancel.clone(),
593 2 : )
594 2 : .await
595 2 : .unwrap();
596 : const N: usize = 2000;
597 4002 : for i in 0..N {
598 4000 : let i = i as u32;
599 4000 : image_writer
600 4000 : .put_image(get_key(i), get_large_img(), &ctx)
601 4000 : .await
602 4000 : .unwrap();
603 4000 : delta_writer
604 4000 : .put_value(get_key(i), Lsn(0x20), Value::Image(get_large_img()), &ctx)
605 4000 : .await
606 4000 : .unwrap();
607 : }
608 2 : let image_layers = image_writer
609 8 : .finish_with_discard_fn(&tline, &ctx, get_key(N as u32), |_| async { discard })
610 2 : .await
611 2 : .unwrap();
612 2 : let delta_layers = delta_writer
613 8 : .finish_with_discard_fn(&tline, &ctx, |_| async { discard })
614 2 : .await
615 2 : .unwrap();
616 2 : let image_layers = image_layers
617 2 : .into_iter()
618 8 : .map(|x| {
619 8 : if discard {
620 4 : x.into_discarded_layer()
621 : } else {
622 4 : x.into_resident_layer().layer_desc().key()
623 : }
624 8 : })
625 2 : .collect_vec();
626 2 : let delta_layers = delta_layers
627 2 : .into_iter()
628 8 : .map(|x| {
629 8 : if discard {
630 4 : x.into_discarded_layer()
631 : } else {
632 4 : x.into_resident_layer().layer_desc().key()
633 : }
634 8 : })
635 2 : .collect_vec();
636 2 : assert_eq!(image_layers.len(), N / 512 + 1);
637 2 : assert_eq!(delta_layers.len(), N / 512 + 1);
638 2 : assert_eq!(delta_layers.first().unwrap().key_range.start, get_key(0));
639 2 : assert_eq!(
640 2 : delta_layers.last().unwrap().key_range.end,
641 2 : get_key(N as u32)
642 2 : );
643 8 : for idx in 0..image_layers.len() {
644 8 : assert_ne!(image_layers[idx].key_range.start, Key::MIN);
645 8 : assert_ne!(image_layers[idx].key_range.end, Key::MAX);
646 8 : assert_ne!(delta_layers[idx].key_range.start, Key::MIN);
647 8 : assert_ne!(delta_layers[idx].key_range.end, Key::MAX);
648 8 : if idx > 0 {
649 6 : assert_eq!(
650 6 : image_layers[idx - 1].key_range.end,
651 6 : image_layers[idx].key_range.start
652 6 : );
653 6 : assert_eq!(
654 6 : delta_layers[idx - 1].key_range.end,
655 6 : delta_layers[idx].key_range.start
656 6 : );
657 2 : }
658 : }
659 2 : }
660 :
661 : #[tokio::test]
662 1 : async fn write_large_img() {
663 1 : let harness = TenantHarness::create("split_writer_write_large_img")
664 1 : .await
665 1 : .unwrap();
666 1 : let (tenant, ctx) = harness.load().await;
667 1 :
668 1 : let tline = tenant
669 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
670 1 : .await
671 1 : .unwrap();
672 1 :
673 1 : let mut image_writer = SplitImageLayerWriter::new(
674 1 : tenant.conf,
675 1 : tline.timeline_id,
676 1 : tenant.tenant_shard_id,
677 1 : get_key(0),
678 1 : Lsn(0x18),
679 1 : 4 * 1024,
680 1 : &tline.gate,
681 1 : tline.cancel.clone(),
682 1 : &ctx,
683 1 : )
684 1 : .await
685 1 : .unwrap();
686 1 :
687 1 : let mut delta_writer = SplitDeltaLayerWriter::new(
688 1 : tenant.conf,
689 1 : tline.timeline_id,
690 1 : tenant.tenant_shard_id,
691 1 : Lsn(0x18)..Lsn(0x20),
692 1 : 4 * 1024,
693 1 : &tline.gate,
694 1 : tline.cancel.clone(),
695 1 : )
696 1 : .await
697 1 : .unwrap();
698 1 :
699 1 : image_writer
700 1 : .put_image(get_key(0), get_img(0), &ctx)
701 1 : .await
702 1 : .unwrap();
703 1 : image_writer
704 1 : .put_image(get_key(1), get_large_img(), &ctx)
705 1 : .await
706 1 : .unwrap();
707 1 : let layers = image_writer
708 1 : .finish(&tline, &ctx, get_key(10))
709 1 : .await
710 1 : .unwrap();
711 1 : assert_eq!(layers.len(), 2);
712 1 :
713 1 : delta_writer
714 1 : .put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx)
715 1 : .await
716 1 : .unwrap();
717 1 : delta_writer
718 1 : .put_value(get_key(1), Lsn(0x1A), Value::Image(get_large_img()), &ctx)
719 1 : .await
720 1 : .unwrap();
721 1 : let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
722 1 : assert_eq!(layers.len(), 2);
723 1 : let mut layers_iter = layers.into_iter();
724 1 : assert_eq!(
725 1 : layers_iter
726 1 : .next()
727 1 : .unwrap()
728 1 : .into_resident_layer()
729 1 : .layer_desc()
730 1 : .key(),
731 1 : PersistentLayerKey {
732 1 : key_range: get_key(0)..get_key(1),
733 1 : lsn_range: Lsn(0x18)..Lsn(0x20),
734 1 : is_delta: true
735 1 : }
736 1 : );
737 1 : assert_eq!(
738 1 : layers_iter
739 1 : .next()
740 1 : .unwrap()
741 1 : .into_resident_layer()
742 1 : .layer_desc()
743 1 : .key(),
744 1 : PersistentLayerKey {
745 1 : key_range: get_key(1)..get_key(2),
746 1 : lsn_range: Lsn(0x18)..Lsn(0x20),
747 1 : is_delta: true
748 1 : }
749 1 : );
750 1 : }
751 :
752 : #[tokio::test]
753 1 : async fn write_split_single_key() {
754 1 : let harness = TenantHarness::create("split_writer_write_split_single_key")
755 1 : .await
756 1 : .unwrap();
757 1 : let (tenant, ctx) = harness.load().await;
758 1 :
759 1 : let tline = tenant
760 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
761 1 : .await
762 1 : .unwrap();
763 1 :
764 1 : const N: usize = 2000;
765 1 : let mut delta_writer = SplitDeltaLayerWriter::new(
766 1 : tenant.conf,
767 1 : tline.timeline_id,
768 1 : tenant.tenant_shard_id,
769 1 : Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
770 1 : 4 * 1024 * 1024,
771 1 : &tline.gate,
772 1 : tline.cancel.clone(),
773 1 : )
774 1 : .await
775 1 : .unwrap();
776 1 :
777 2001 : for i in 0..N {
778 2000 : let i = i as u32;
779 2000 : delta_writer
780 2000 : .put_value(
781 2000 : get_key(0),
782 2000 : Lsn(i as u64 * 16 + 0x10),
783 2000 : Value::Image(get_large_img()),
784 2000 : &ctx,
785 2000 : )
786 2000 : .await
787 2000 : .unwrap();
788 1 : }
789 1 : let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap();
790 1 : assert_eq!(delta_layers.len(), 1);
791 1 : let delta_layer = delta_layers
792 1 : .into_iter()
793 1 : .next()
794 1 : .unwrap()
795 1 : .into_resident_layer();
796 1 : assert_eq!(
797 1 : delta_layer.layer_desc().key(),
798 1 : PersistentLayerKey {
799 1 : key_range: get_key(0)..get_key(1),
800 1 : lsn_range: Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
801 1 : is_delta: true
802 1 : }
803 1 : );
804 1 : }
805 : }
|