//! Import a PGDATA directory into an empty root timeline.
//!
//! This module is adapted from hackathon code written by Heikki and Stas.
//! Other code in the parent module was written by Christian as part of a customer PoC.
//!
//! The hackathon code was a free-standing program that produced image layer files.
//!
//! It has been modified to
//! - run inside a running Pageserver, within the proper lifecycles of Timeline -> Tenant(Shard)
//! - => sharding-awareness: produce image layers with only the data relevant for this shard
//! - => use S3 as the source for the PGDATA instead of the local filesystem
//!
//! TODOs before productionization:
//! - ChunkProcessingJob size / ImportJob::total_size does not account for sharding.
//!   => the produced image layers are likely too small.
//! - ChunkProcessingJob should cut up an ImportJob to hit the target image layer size exactly.
//! - asserts / unwraps need to be replaced with errors
//! - don't trust that remote objects will be small (= prevent OOMs in those cases)
//!   - limit all in-memory buffers in size, or download to disk and read from there
//! - limit task concurrency
//! - generally play nice with other tenants in the system
//!   - importbucket is a different bucket than the main pageserver storage, so we should be fine wrt S3 rate limits
//!   - but concerns like network bandwidth, local disk write bandwidth, local disk capacity, etc, remain
//! - integrate with the layer eviction system
//! - audit for responsiveness to Tenant::cancel and Timeline::cancel
//! - audit for Tenant/Timeline gate holding (we spawn tokio tasks during this flow!)
//!
//! An incomplete set of TODOs from the Hackathon:
//! - version-specific CheckPointData (=> pgv abstraction, already exists for regular walingest)
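//!
//! Entry-point sketch (a hypothetical caller; the variable names here are
//! assumptions, the real call site lives in the parent module):
//!
//! ```ignore
//! // Given an empty `timeline: Arc<Timeline>`, a `control_file: ControlFile`
//! // fetched from the import bucket, and a `storage: RemoteStorageWrapper`:
//! let pgdata_lsn = Lsn(control_file.control_file_data().checkPoint).align();
//! run(timeline, pgdata_lsn, control_file, storage, &ctx).await?;
//! ```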

use std::collections::HashSet;
use std::ops::Range;
use std::sync::Arc;

use anyhow::{bail, ensure};
use bytes::Bytes;
use itertools::Itertools;
use pageserver_api::key::{
    CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, Key, TWOPHASEDIR_KEY, rel_block_to_key,
    rel_dir_to_key, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key,
    slru_segment_size_to_key,
};
use pageserver_api::keyspace::{contiguous_range_len, is_contiguous_range, singleton_range};
use pageserver_api::reltag::{RelTag, SlruKind};
use pageserver_api::shard::ShardIdentity;
use postgres_ffi::relfile_utils::parse_relfilename;
use postgres_ffi::{BLCKSZ, pg_constants};
use remote_storage::RemotePath;
use tokio::task::JoinSet;
use tracing::{Instrument, debug, info_span, instrument};
use utils::bin_ser::BeSer;
use utils::lsn::Lsn;

use super::Timeline;
use super::importbucket_client::{ControlFile, RemoteStorageWrapper};
use crate::assert_u64_eq_usize::UsizeIsU64;
use crate::context::{DownloadBehavior, RequestContext};
use crate::pgdatadir_mapping::{
    DbDirectory, RelDirectory, SlruSegmentDirectory, TwoPhaseDirectory,
};
use crate::task_mgr::TaskKind;
use crate::tenant::storage_layer::{ImageLayerWriter, Layer};

pub async fn run(
    timeline: Arc<Timeline>,
    pgdata_lsn: Lsn,
    control_file: ControlFile,
    storage: RemoteStorageWrapper,
    ctx: &RequestContext,
) -> anyhow::Result<()> {
    Flow {
        timeline,
        pgdata_lsn,
        control_file,
        tasks: Vec::new(),
        storage,
    }
    .run(ctx)
    .await
}

struct Flow {
    timeline: Arc<Timeline>,
    pgdata_lsn: Lsn,
    control_file: ControlFile,
    tasks: Vec<AnyImportTask>,
    storage: RemoteStorageWrapper,
}

impl Flow {
    /// Perform the ingestion into [`Self::timeline`].
    /// Assumes the timeline is empty (= no layers).
    pub async fn run(mut self, ctx: &RequestContext) -> anyhow::Result<()> {
        let pgdata_lsn = Lsn(self.control_file.control_file_data().checkPoint).align();

        self.pgdata_lsn = pgdata_lsn;

        let datadir = PgDataDir::new(&self.storage).await?;

        // Import dbdir (00:00:00 keyspace)
        // The task is only constructed here; the actual write into an image
        // layer happens later, in one of the parallel ChunkProcessingJobs.
        let dbdir_buf = Bytes::from(DbDirectory::ser(&DbDirectory {
            dbdirs: datadir
                .dbs
                .iter()
                .map(|db| ((db.spcnode, db.dboid), true))
                .collect(),
        })?);
        self.tasks
            .push(ImportSingleKeyTask::new(DBDIR_KEY, dbdir_buf).into());

        // Import databases (00:spcnode:dbnode keyspace for each db)
        for db in datadir.dbs {
            self.import_db(&db).await?;
        }

        // Import SLRUs
        if self.timeline.tenant_shard_id.is_shard_zero() {
            // pg_xact (01:00 keyspace)
            self.import_slru(SlruKind::Clog, &self.storage.pgdata().join("pg_xact"))
                .await?;
            // pg_multixact/members (01:01 keyspace)
            self.import_slru(
                SlruKind::MultiXactMembers,
                &self.storage.pgdata().join("pg_multixact/members"),
            )
            .await?;
            // pg_multixact/offsets (01:02 keyspace)
            self.import_slru(
                SlruKind::MultiXactOffsets,
                &self.storage.pgdata().join("pg_multixact/offsets"),
            )
            .await?;
        }

        // Import pg_twophase.
        // TODO: currently imported as empty, i.e., prepared transactions are not carried over.
        let twophasedir_buf = TwoPhaseDirectory::ser(&TwoPhaseDirectory {
            xids: HashSet::new(),
        })?;
        self.tasks
            .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
                TWOPHASEDIR_KEY,
                Bytes::from(twophasedir_buf),
            )));

        // Control file and checkpoint
        self.tasks
            .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
                CONTROLFILE_KEY,
                self.control_file.control_file_buf().clone(),
            )));

        let checkpoint_buf = self
            .control_file
            .control_file_data()
            .checkPointCopy
            .encode()?;
        self.tasks
            .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
                CHECKPOINT_KEY,
                checkpoint_buf,
            )));

        // Assign parts of the key space to the parallel jobs created below.
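        // Greedy chunking: tasks arrive in key order, and a new chunk is cut
        // whenever adding the next task would push the running size past
        // 1 GiB. E.g. task sizes (in GiB) [0.4, 0.5, 0.3, 0.9] yield the
        // chunks {0.4, 0.5}, {0.3}, {0.9}. Each job covers the contiguous key
        // range from the previous cut up to the first key of the chunk that
        // follows (the final job extends to Key::MAX), so together the jobs
        // partition the entire keyspace.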
        let mut last_end_key = Key::MIN;
        let mut current_chunk = Vec::new();
        let mut current_chunk_size: usize = 0;
        let mut parallel_jobs = Vec::new();
        for task in std::mem::take(&mut self.tasks).into_iter() {
            if current_chunk_size + task.total_size() > 1024 * 1024 * 1024 {
                let key_range = last_end_key..task.key_range().start;
                parallel_jobs.push(ChunkProcessingJob::new(
                    key_range.clone(),
                    std::mem::take(&mut current_chunk),
                    &self,
                ));
                last_end_key = key_range.end;
                current_chunk_size = 0;
            }
            current_chunk_size += task.total_size();
            current_chunk.push(task);
        }
        parallel_jobs.push(ChunkProcessingJob::new(
            last_end_key..Key::MAX,
            current_chunk,
            &self,
        ));

        // Start all jobs simultaneously
        let mut work = JoinSet::new();
        // TODO: semaphore?
        for job in parallel_jobs {
            let ctx: RequestContext =
                ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Error);
            work.spawn(async move { job.run(&ctx).await }.instrument(info_span!("parallel_job")));
        }
        let mut results = Vec::new();
        while let Some(result) = work.join_next().await {
            match result {
                Ok(res) => {
                    results.push(res);
                }
                Err(_joinset_err) => {
                    results.push(Err(anyhow::anyhow!(
                        "parallel job panicked or cancelled, check pageserver logs"
                    )));
                }
            }
        }

        if results.iter().all(|r| r.is_ok()) {
            Ok(())
        } else {
            let mut msg = String::new();
            for result in results {
                if let Err(err) = result {
                    msg.push_str(&format!("{err:?}\n\n"));
                }
            }
            bail!("Some parallel jobs failed:\n\n{msg}");
        }
    }

    #[instrument(level = tracing::Level::DEBUG, skip_all, fields(dboid=%db.dboid, tablespace=%db.spcnode, path=%db.path))]
    async fn import_db(&mut self, db: &PgDataDirDb) -> anyhow::Result<()> {
        debug!("start");
        scopeguard::defer! {
            debug!("return");
        }

        // Import relmap (00:spcnode:dbnode:00:*:00)
        let relmap_key = relmap_file_key(db.spcnode, db.dboid);
        debug!("Constructing relmap entry, key {relmap_key}");
        let relmap_path = db.path.join("pg_filenode.map");
        let relmap_buf = self.storage.get(&relmap_path).await?;
        self.tasks
            .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
                relmap_key, relmap_buf,
            )));

        // Import reldir (00:spcnode:dbnode:00:*:01)
        let reldir_key = rel_dir_to_key(db.spcnode, db.dboid);
        debug!("Constructing reldirs entry, key {reldir_key}");
        let reldir_buf = RelDirectory::ser(&RelDirectory {
            rels: db
                .files
                .iter()
                .map(|f| (f.rel_tag.relnode, f.rel_tag.forknum))
                .collect(),
        })?;
        self.tasks
            .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
                reldir_key,
                Bytes::from(reldir_buf),
            )));

        // Import data (00:spcnode:dbnode:reloid:fork:blk) and set sizes for each last
        // segment in a given relation (00:spcnode:dbnode:reloid:fork:ff)
        for file in &db.files {
            debug!(%file.path, %file.filesize, "importing file");
            let len = file.filesize;
            ensure!(len % 8192 == 0);
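            // A relation segment file holds at most 1 GiB = 131072 blocks of
            // 8 KiB each, so segment `segno` starts at block `segno * 131072`.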
            let start_blk: u32 = file.segno * (1024 * 1024 * 1024 / 8192);
            let start_key = rel_block_to_key(file.rel_tag, start_blk);
            let end_key = rel_block_to_key(file.rel_tag, start_blk + (len / 8192) as u32);
            self.tasks
                .push(AnyImportTask::RelBlocks(ImportRelBlocksTask::new(
                    *self.timeline.get_shard_identity(),
                    start_key..end_key,
                    &file.path,
                    self.storage.clone(),
                )));

            // Set relsize for the last segment (00:spcnode:dbnode:reloid:fork:ff)
            if let Some(nblocks) = file.nblocks {
                let size_key = rel_size_to_key(file.rel_tag);
                //debug!("Setting relation size (path={path}, rel_tag={rel_tag}, segno={segno}) to {nblocks}, key {size_key}");
                let buf = nblocks.to_le_bytes();
                self.tasks
                    .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
                        size_key,
                        Bytes::from(buf.to_vec()),
                    )));
            }
        }

        Ok(())
    }

    async fn import_slru(&mut self, kind: SlruKind, path: &RemotePath) -> anyhow::Result<()> {
        assert!(self.timeline.tenant_shard_id.is_shard_zero());

        let segments = self.storage.listfilesindir(path).await?;
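        // SLRU segment files are named by their segment number in hex
        // (e.g. "0000"); anything else in the directory is skipped.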
        let segments: Vec<(String, u32, usize)> = segments
            .into_iter()
            .filter_map(|(path, size)| {
                let filename = path.object_name()?;
                let segno = u32::from_str_radix(filename, 16).ok()?;
                Some((filename.to_string(), segno, size))
            })
            .collect();

        // Write SlruDir
        let slrudir_key = slru_dir_to_key(kind);
        let segnos: HashSet<u32> = segments
            .iter()
            .map(|(_path, segno, _size)| *segno)
            .collect();
        let slrudir = SlruSegmentDirectory { segments: segnos };
        let slrudir_buf = SlruSegmentDirectory::ser(&slrudir)?;
        self.tasks
            .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
                slrudir_key,
                Bytes::from(slrudir_buf),
            )));

        for (segpath, segno, size) in segments {
            // SlruSegBlocks for each segment
            let p = path.join(&segpath);
            let file_size = size;
            ensure!(file_size % 8192 == 0);
            let nblocks = u32::try_from(file_size / 8192)?;
            let start_key = slru_block_to_key(kind, segno, 0);
            let end_key = slru_block_to_key(kind, segno, nblocks);
            debug!(%p, segno=%segno, %size, %start_key, %end_key, "scheduling SLRU segment");
            self.tasks
                .push(AnyImportTask::SlruBlocks(ImportSlruBlocksTask::new(
                    start_key..end_key,
                    &p,
                    self.storage.clone(),
                )));

            // Followed by SlruSegSize
            let segsize_key = slru_segment_size_to_key(kind, segno);
            let segsize_buf = nblocks.to_le_bytes();
            self.tasks
                .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
                    segsize_key,
                    Bytes::copy_from_slice(&segsize_buf),
                )));
        }
        Ok(())
    }
}

//
// dbdir iteration tools
//

struct PgDataDir {
    pub dbs: Vec<PgDataDirDb>, // spcnode, dboid, path
}

struct PgDataDirDb {
    pub spcnode: u32,
    pub dboid: u32,
    pub path: RemotePath,
    pub files: Vec<PgDataDirDbFile>,
}

struct PgDataDirDbFile {
    pub path: RemotePath,
    pub rel_tag: RelTag,
    pub segno: u32,
    pub filesize: usize,
    // Cumulative size of the given fork, set only for the last segment of that fork
    pub nblocks: Option<usize>,
}

impl PgDataDir {
    async fn new(storage: &RemoteStorageWrapper) -> anyhow::Result<Self> {
        let datadir_path = storage.pgdata();
        // Import ordinary databases first: DEFAULTTABLESPACE_OID is smaller than
        // GLOBALTABLESPACE_OID. Within the tablespace, traverse databases in
        // increasing OID order.

        let basedir = &datadir_path.join("base");
        let db_oids: Vec<_> = storage
            .listdir(basedir)
            .await?
            .into_iter()
            .filter_map(|path| path.object_name().and_then(|name| name.parse::<u32>().ok()))
            .sorted()
            .collect();
        debug!(?db_oids, "found databases");
        let mut databases = Vec::new();
        for dboid in db_oids {
            databases.push(
                PgDataDirDb::new(
                    storage,
                    &basedir.join(dboid.to_string()),
                    pg_constants::DEFAULTTABLESPACE_OID,
                    dboid,
                    &datadir_path,
                )
                .await?,
            );
        }

        // special case for global catalogs
        databases.push(
            PgDataDirDb::new(
                storage,
                &datadir_path.join("global"),
                postgres_ffi::pg_constants::GLOBALTABLESPACE_OID,
                0,
                &datadir_path,
            )
            .await?,
        );

        databases.sort_by_key(|db| (db.spcnode, db.dboid));

        Ok(Self { dbs: databases })
    }
}

impl PgDataDirDb {
    #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%dboid, %db_path))]
    async fn new(
        storage: &RemoteStorageWrapper,
        db_path: &RemotePath,
        spcnode: u32,
        dboid: u32,
        datadir_path: &RemotePath,
    ) -> anyhow::Result<Self> {
        let mut files: Vec<PgDataDirDbFile> = storage
            .listfilesindir(db_path)
            .await?
            .into_iter()
            .filter_map(|(path, size)| {
                debug!(%path, %size, "found file in dbdir");
                path.object_name().and_then(|name| {
                    // returns (relnode, forknum, segno)
                    parse_relfilename(name).ok().map(|x| (size, x))
                })
            })
            .sorted_by_key(|(_, relfilename)| *relfilename)
            .map(|(filesize, (relnode, forknum, segno))| {
                let rel_tag = RelTag {
                    spcnode,
                    dbnode: dboid,
                    relnode,
                    forknum,
                };

                let path = datadir_path.join(rel_tag.to_segfile_name(segno));
                assert!(filesize % BLCKSZ as usize == 0); // TODO: this should result in an error
                let nblocks = filesize / BLCKSZ as usize;

                PgDataDirDbFile {
                    path,
                    filesize,
                    rel_tag,
                    segno,
                    nblocks: Some(nblocks), // per-segment (non-cumulative) size for now
                }
            })
            .collect();

        // Set cumulative sizes. Do all of that math here so that later we can
        // more easily parallelize over segments and know for which segments we
        // need to write a relsize entry.
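        // Example: a 2.5 GiB relation is stored as segments 0..=2 with
        // per-segment nblocks [131072, 131072, 65536]; after this pass they
        // carry [None, None, Some(327680)], i.e. only the last segment of each
        // fork records the fork's total size in blocks.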
        let mut cumulative_nblocks: usize = 0;
        let mut prev_rel_tag: Option<RelTag> = None;
        for i in 0..files.len() {
            if prev_rel_tag == Some(files[i].rel_tag) {
                cumulative_nblocks += files[i].nblocks.unwrap();
            } else {
                cumulative_nblocks = files[i].nblocks.unwrap();
            }

            files[i].nblocks = if i == files.len() - 1 || files[i + 1].rel_tag != files[i].rel_tag {
                Some(cumulative_nblocks)
            } else {
                None
            };

            prev_rel_tag = Some(files[i].rel_tag);
        }

        Ok(PgDataDirDb {
            files,
            path: db_path.clone(),
            spcnode,
            dboid,
        })
    }
}

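/// A unit of work that writes the images for its `key_range()` into an
/// [`ImageLayerWriter`].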
trait ImportTask {
    fn key_range(&self) -> Range<Key>;

    fn total_size(&self) -> usize {
        // TODO: revisit this
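        // Estimate: one 8 KiB image per key in the range. A non-contiguous
        // range gets a pessimistic u32::MAX-byte estimate; that exceeds the
        // 1 GiB chunking threshold, so such a task effectively lands in a
        // chunk of its own.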
        if is_contiguous_range(&self.key_range()) {
            contiguous_range_len(&self.key_range()) as usize * 8192
        } else {
            u32::MAX as usize
        }
    }

    async fn doit(
        self,
        layer_writer: &mut ImageLayerWriter,
        ctx: &RequestContext,
    ) -> anyhow::Result<usize>;
}

struct ImportSingleKeyTask {
    key: Key,
    buf: Bytes,
}

impl ImportSingleKeyTask {
    fn new(key: Key, buf: Bytes) -> Self {
        ImportSingleKeyTask { key, buf }
    }
}

impl ImportTask for ImportSingleKeyTask {
    fn key_range(&self) -> Range<Key> {
        singleton_range(self.key)
    }

    async fn doit(
        self,
        layer_writer: &mut ImageLayerWriter,
        ctx: &RequestContext,
    ) -> anyhow::Result<usize> {
        layer_writer.put_image(self.key, self.buf, ctx).await?;
        Ok(1)
    }
}

struct ImportRelBlocksTask {
    shard_identity: ShardIdentity,
    key_range: Range<Key>,
    path: RemotePath,
    storage: RemoteStorageWrapper,
}

impl ImportRelBlocksTask {
    fn new(
        shard_identity: ShardIdentity,
        key_range: Range<Key>,
        path: &RemotePath,
        storage: RemoteStorageWrapper,
    ) -> Self {
        ImportRelBlocksTask {
            shard_identity,
            key_range,
            path: path.clone(),
            storage,
        }
    }
}

impl ImportTask for ImportRelBlocksTask {
    fn key_range(&self) -> Range<Key> {
        self.key_range.clone()
    }

    #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%self.path))]
    async fn doit(
        self,
        layer_writer: &mut ImageLayerWriter,
        ctx: &RequestContext,
    ) -> anyhow::Result<usize> {
        debug!("Importing relation file");

        let (rel_tag, start_blk) = self.key_range.start.to_rel_block()?;
        let (rel_tag_end, end_blk) = self.key_range.end.to_rel_block()?;
        assert_eq!(rel_tag, rel_tag_end);

        let ranges = (start_blk..end_blk)
            .enumerate()
            .filter_map(|(i, blknum)| {
                let key = rel_block_to_key(rel_tag, blknum);
                if self.shard_identity.is_key_disposable(&key) {
                    return None;
                }
                let file_offset = i.checked_mul(8192).unwrap();
                Some((
                    vec![key],
                    file_offset,
                    file_offset.checked_add(8192).unwrap(),
                ))
            })
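            // Coalesce byte-adjacent block reads into a single range per
            // remote GET: e.g. ([key0], 0, 8192) and ([key1], 8192, 16384)
            // merge into ([key0, key1], 0, 16384); a hole left by a key that
            // lives on another shard starts a new range.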
            .coalesce(|(mut acc, acc_start, acc_end), (mut key, start, end)| {
                assert_eq!(key.len(), 1);
                assert!(!acc.is_empty());
                assert!(acc_end > acc_start);
                if acc_end == start /* TODO additional max range check here, to limit memory consumption per task to X */ {
                    acc.push(key.pop().unwrap());
                    Ok((acc, acc_start, end))
                } else {
                    Err(((acc, acc_start, acc_end), (key, start, end)))
                }
            });

        let mut nimages = 0;
        for (keys, range_start, range_end) in ranges {
            let range_buf = self
                .storage
                .get_range(&self.path, range_start.into_u64(), range_end.into_u64())
                .await?;
            let mut buf = Bytes::from(range_buf);
            // TODO: batched writes
            for key in keys {
                let image = buf.split_to(8192);
                layer_writer.put_image(key, image, ctx).await?;
                nimages += 1;
            }
        }

        Ok(nimages)
    }
}

struct ImportSlruBlocksTask {
    key_range: Range<Key>,
    path: RemotePath,
    storage: RemoteStorageWrapper,
}

impl ImportSlruBlocksTask {
    fn new(key_range: Range<Key>, path: &RemotePath, storage: RemoteStorageWrapper) -> Self {
        ImportSlruBlocksTask {
            key_range,
            path: path.clone(),
            storage,
        }
    }
}

impl ImportTask for ImportSlruBlocksTask {
    fn key_range(&self) -> Range<Key> {
        self.key_range.clone()
    }

    async fn doit(
        self,
        layer_writer: &mut ImageLayerWriter,
        ctx: &RequestContext,
    ) -> anyhow::Result<usize> {
        debug!("Importing SLRU segment file {}", self.path);
        let buf = self.storage.get(&self.path).await?;

        let (kind, segno, start_blk) = self.key_range.start.to_slru_block()?;
        let (_kind, _segno, end_blk) = self.key_range.end.to_slru_block()?;
        let mut blknum = start_blk;
        let mut nimages = 0;
        let mut file_offset = 0;
        while blknum < end_blk {
            let key = slru_block_to_key(kind, segno, blknum);
            let buf = &buf[file_offset..(file_offset + 8192)];
            file_offset += 8192;
            layer_writer
                .put_image(key, Bytes::copy_from_slice(buf), ctx)
                .await?;
            nimages += 1;
            blknum += 1;
        }
        Ok(nimages)
    }
}

enum AnyImportTask {
    SingleKey(ImportSingleKeyTask),
    RelBlocks(ImportRelBlocksTask),
    SlruBlocks(ImportSlruBlocksTask),
}

impl ImportTask for AnyImportTask {
    fn key_range(&self) -> Range<Key> {
        match self {
            Self::SingleKey(t) => t.key_range(),
            Self::RelBlocks(t) => t.key_range(),
            Self::SlruBlocks(t) => t.key_range(),
        }
    }
    /// Returns the number of images put into the `layer_writer`.
    async fn doit(
        self,
        layer_writer: &mut ImageLayerWriter,
        ctx: &RequestContext,
    ) -> anyhow::Result<usize> {
        match self {
            Self::SingleKey(t) => t.doit(layer_writer, ctx).await,
            Self::RelBlocks(t) => t.doit(layer_writer, ctx).await,
            Self::SlruBlocks(t) => t.doit(layer_writer, ctx).await,
        }
    }
}

impl From<ImportSingleKeyTask> for AnyImportTask {
    fn from(t: ImportSingleKeyTask) -> Self {
        Self::SingleKey(t)
    }
}

impl From<ImportRelBlocksTask> for AnyImportTask {
    fn from(t: ImportRelBlocksTask) -> Self {
        Self::RelBlocks(t)
    }
}

impl From<ImportSlruBlocksTask> for AnyImportTask {
    fn from(t: ImportSlruBlocksTask) -> Self {
        Self::SlruBlocks(t)
    }
}

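/// Writes one image layer covering `range` at `pgdata_lsn`, built by running
/// the chunk's `tasks` against a single [`ImageLayerWriter`], then registers
/// the layer with the timeline and schedules it for upload.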
struct ChunkProcessingJob {
    timeline: Arc<Timeline>,
    range: Range<Key>,
    tasks: Vec<AnyImportTask>,

    pgdata_lsn: Lsn,
}

impl ChunkProcessingJob {
    fn new(range: Range<Key>, tasks: Vec<AnyImportTask>, env: &Flow) -> Self {
        assert!(env.pgdata_lsn.is_valid());
        Self {
            timeline: env.timeline.clone(),
            range,
            tasks,
            pgdata_lsn: env.pgdata_lsn,
        }
    }

    async fn run(self, ctx: &RequestContext) -> anyhow::Result<()> {
        let mut writer = ImageLayerWriter::new(
            self.timeline.conf,
            self.timeline.timeline_id,
            self.timeline.tenant_shard_id,
            &self.range,
            self.pgdata_lsn,
            ctx,
        )
        .await?;

        let mut nimages = 0;
        for task in self.tasks {
            nimages += task.doit(&mut writer, ctx).await?;
        }

        let resident_layer = if nimages > 0 {
            let (desc, path) = writer.finish(ctx).await?;
            Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?
        } else {
            // dropping the writer cleans up
            return Ok(());
        };

        // This shares the same code path as create_image_layers.
        let mut guard = self.timeline.layers.write().await;
        guard
            .open_mut()?
            .track_new_image_layers(&[resident_layer.clone()], &self.timeline.metrics);
        crate::tenant::timeline::drop_wlock(guard);

        // Schedule the layer for upload but don't add barriers such as
        // waiting for completion or index upload, so we don't inhibit upload parallelism.
        // TODO: limit upload parallelism somehow (e.g. by limiting concurrency of jobs?)
        // TODO: or regulate parallelism by upload queue depth? Probably should happen at a higher level.
        self.timeline
            .remote_client
            .schedule_layer_file_upload(resident_layer)?;

        Ok(())
    }
}