Line data Source code
1 : //! Import a PGDATA directory into an empty root timeline.
2 : //!
3 : //! This module is adapted from hackathon code by Heikki and Stas.
4 : //! Other code in the parent module was written by Christian as part of a customer PoC.
5 : //!
6 : //! The hackathon code produced image layer files as a free-standing program.
7 : //!
8 : //! It has been modified to
9 : //! - run inside a running Pageserver, within the proper lifecycles of Timeline -> Tenant(Shard)
10 : //! - be sharding-aware: produce image layers containing only the data relevant for this shard
11 : //! - use S3 as the source for the PGDATA instead of the local filesystem
12 : //!
13 : //! TODOs before productionization:
14 : //! - ChunkProcessingJob size / ImportJob::total_size does not account for sharding.
15 : //! => produced image layers likely too small.
16 : //! - ChunkProcessingJob should cut up an ImportJob to hit exactly target image layer size.
17 : //! - asserts / unwraps need to be replaced with errors
18 : //! - don't trust that remote objects will be small (= prevent OOMs in those cases)
19 : //! - limit all in-memory buffers in size, or download to disk and read from there
20 : //! - limit task concurrency
21 : //! - generally play nice with other tenants in the system
22 : //! - importbucket is a different bucket than the main pageserver storage, so it should be fine wrt S3 rate limits
23 : //! - but concerns like network bandwidth, local disk write bandwidth, local disk capacity, etc. remain
24 : //! - integrate with layer eviction system
25 : //! - audit for responsiveness to Tenant::cancel and Timeline::cancel
26 : //! - audit for Tenant/Timeline gate holding (we spawn tokio tasks during this flow!)
27 : //!
28 : //! An incomplete set of TODOs from the Hackathon:
29 : //! - version-specific CheckPointData (=> pgv abstraction, already exists for regular walingest)
30 :
31 : use std::sync::Arc;
32 :
33 : use anyhow::{bail, ensure};
34 : use bytes::Bytes;
35 :
36 : use itertools::Itertools;
37 : use pageserver_api::{
38 : key::{rel_block_to_key, rel_dir_to_key, rel_size_to_key, relmap_file_key, DBDIR_KEY},
39 : reltag::RelTag,
40 : shard::ShardIdentity,
41 : };
42 : use postgres_ffi::{pg_constants, relfile_utils::parse_relfilename, BLCKSZ};
43 : use tokio::task::JoinSet;
44 : use tracing::{debug, info_span, instrument, Instrument};
45 :
46 : use crate::{
47 : assert_u64_eq_usize::UsizeIsU64,
48 : pgdatadir_mapping::{SlruSegmentDirectory, TwoPhaseDirectory},
49 : };
50 : use crate::{
51 : context::{DownloadBehavior, RequestContext},
52 : pgdatadir_mapping::{DbDirectory, RelDirectory},
53 : task_mgr::TaskKind,
54 : tenant::storage_layer::{ImageLayerWriter, Layer},
55 : };
56 :
57 : use pageserver_api::key::Key;
58 : use pageserver_api::key::{
59 : slru_block_to_key, slru_dir_to_key, slru_segment_size_to_key, CHECKPOINT_KEY, CONTROLFILE_KEY,
60 : TWOPHASEDIR_KEY,
61 : };
62 : use pageserver_api::keyspace::singleton_range;
63 : use pageserver_api::keyspace::{contiguous_range_len, is_contiguous_range};
64 : use pageserver_api::reltag::SlruKind;
65 : use utils::bin_ser::BeSer;
66 : use utils::lsn::Lsn;
67 :
68 : use std::collections::HashSet;
69 : use std::ops::Range;
70 :
71 : use super::{
72 : importbucket_client::{ControlFile, RemoteStorageWrapper},
73 : Timeline,
74 : };
75 :
76 : use remote_storage::RemotePath;
77 :
78 0 : pub async fn run(
79 0 : timeline: Arc<Timeline>,
80 0 : pgdata_lsn: Lsn,
81 0 : control_file: ControlFile,
82 0 : storage: RemoteStorageWrapper,
83 0 : ctx: &RequestContext,
84 0 : ) -> anyhow::Result<()> {
85 0 : Flow {
86 0 : timeline,
87 0 : pgdata_lsn,
88 0 : control_file,
89 0 : tasks: Vec::new(),
90 0 : storage,
91 0 : }
92 0 : .run(ctx)
93 0 : .await
94 0 : }
95 :
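: /// State for one import run: the target timeline, the LSN at which all keys are
: /// written, the parsed control file, the accumulated import tasks, and the remote
: /// storage handle for the import bucket.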
96 : struct Flow {
97 : timeline: Arc<Timeline>,
98 : pgdata_lsn: Lsn,
99 : control_file: ControlFile,
100 : tasks: Vec<AnyImportTask>,
101 : storage: RemoteStorageWrapper,
102 : }
103 :
104 : impl Flow {
105 : /// Perform the ingestion into [`Self::timeline`].
106 : /// Assumes the timeline is empty (= no layers).
107 0 : pub async fn run(mut self, ctx: &RequestContext) -> anyhow::Result<()> {
108 0 : let pgdata_lsn = Lsn(self.control_file.control_file_data().checkPoint).align();
109 0 :
110 0 : self.pgdata_lsn = pgdata_lsn;
111 :
112 0 : let datadir = PgDataDir::new(&self.storage).await?;
113 :
114 : // Import dbdir (00:00:00 keyspace)
115 : // The entry is only constructed here; it is written out later by one of the chunk processing jobs.
116 0 : let dbdir_buf = Bytes::from(DbDirectory::ser(&DbDirectory {
117 0 : dbdirs: datadir
118 0 : .dbs
119 0 : .iter()
120 0 : .map(|db| ((db.spcnode, db.dboid), true))
121 0 : .collect(),
122 0 : })?);
123 0 : self.tasks
124 0 : .push(ImportSingleKeyTask::new(DBDIR_KEY, dbdir_buf).into());
125 :
126 : // Import databases (00:spcnode:dbnode keyspace for each db)
127 0 : for db in datadir.dbs {
128 0 : self.import_db(&db).await?;
129 : }
130 :
131 : // Import SLRUs
132 :
133 : // pg_xact (01:00 keyspace)
134 0 : self.import_slru(SlruKind::Clog, &self.storage.pgdata().join("pg_xact"))
135 0 : .await?;
136 : // pg_multixact/members (01:01 keyspace)
137 0 : self.import_slru(
138 0 : SlruKind::MultiXactMembers,
139 0 : &self.storage.pgdata().join("pg_multixact/members"),
140 0 : )
141 0 : .await?;
142 : // pg_multixact/offsets (01:02 keyspace)
143 0 : self.import_slru(
144 0 : SlruKind::MultiXactOffsets,
145 0 : &self.storage.pgdata().join("pg_multixact/offsets"),
146 0 : )
147 0 : .await?;
148 :
149 : // Import pg_twophase.
150 : // TODO: currently written as an empty directory; prepared transactions are not imported.
151 0 : let twophasedir_buf = TwoPhaseDirectory::ser(&TwoPhaseDirectory {
152 0 : xids: HashSet::new(),
153 0 : })?;
154 0 : self.tasks
155 0 : .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
156 0 : TWOPHASEDIR_KEY,
157 0 : Bytes::from(twophasedir_buf),
158 0 : )));
159 0 :
160 0 : // Controlfile, checkpoint
161 0 : self.tasks
162 0 : .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
163 0 : CONTROLFILE_KEY,
164 0 : self.control_file.control_file_buf().clone(),
165 0 : )));
166 :
167 0 : let checkpoint_buf = self
168 0 : .control_file
169 0 : .control_file_data()
170 0 : .checkPointCopy
171 0 : .encode()?;
172 0 : self.tasks
173 0 : .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
174 0 : CHECKPOINT_KEY,
175 0 : checkpoint_buf,
176 0 : )));
177 0 :
178 0 : // Assign parts of the key space to parallel jobs.
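: // Greedily pack tasks into chunks of roughly 1 GiB of estimated key data; each
: // chunk becomes one ChunkProcessingJob and writes at most one image layer.
: // total_size() is only an estimate (see the TODOs in the module docs).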
179 0 : let mut last_end_key = Key::MIN;
180 0 : let mut current_chunk = Vec::new();
181 0 : let mut current_chunk_size: usize = 0;
182 0 : let mut parallel_jobs = Vec::new();
183 0 : for task in std::mem::take(&mut self.tasks).into_iter() {
184 0 : if current_chunk_size + task.total_size() > 1024 * 1024 * 1024 {
185 0 : let key_range = last_end_key..task.key_range().start;
186 0 : parallel_jobs.push(ChunkProcessingJob::new(
187 0 : key_range.clone(),
188 0 : std::mem::take(&mut current_chunk),
189 0 : &self,
190 0 : ));
191 0 : last_end_key = key_range.end;
192 0 : current_chunk_size = 0;
193 0 : }
194 0 : current_chunk_size += task.total_size();
195 0 : current_chunk.push(task);
196 : }
197 0 : parallel_jobs.push(ChunkProcessingJob::new(
198 0 : last_end_key..Key::MAX,
199 0 : current_chunk,
200 0 : &self,
201 0 : ));
202 0 :
203 0 : // Start all jobs simultaneously
204 0 : let mut work = JoinSet::new();
205 : // TODO: semaphore?
206 0 : for job in parallel_jobs {
207 0 : let ctx: RequestContext =
208 0 : ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Error);
209 0 : work.spawn(async move { job.run(&ctx).await }.instrument(info_span!("parallel_job")));
210 : }
211 0 : let mut results = Vec::new();
212 0 : while let Some(result) = work.join_next().await {
213 0 : match result {
214 0 : Ok(res) => {
215 0 : results.push(res);
216 0 : }
217 0 : Err(_joinset_err) => {
218 0 : results.push(Err(anyhow::anyhow!(
219 0 : "parallel job panicked or cancelled, check pageserver logs"
220 0 : )));
221 0 : }
222 : }
223 : }
224 :
225 0 : if results.iter().all(|r| r.is_ok()) {
226 0 : Ok(())
227 : } else {
228 0 : let mut msg = String::new();
229 0 : for result in results {
230 0 : if let Err(err) = result {
231 0 : msg.push_str(&format!("{err:?}\n\n"));
232 0 : }
233 : }
234 0 : bail!("Some parallel jobs failed:\n\n{msg}");
235 : }
236 0 : }
237 :
238 0 : #[instrument(level = tracing::Level::DEBUG, skip_all, fields(dboid=%db.dboid, tablespace=%db.spcnode, path=%db.path))]
239 : async fn import_db(&mut self, db: &PgDataDirDb) -> anyhow::Result<()> {
240 : debug!("start");
241 : scopeguard::defer! {
242 : debug!("return");
243 : }
244 :
245 : // Import relmap (00:spcnode:dbnode:00:*:00)
246 : let relmap_key = relmap_file_key(db.spcnode, db.dboid);
247 : debug!("Constructing relmap entry, key {relmap_key}");
248 : let relmap_path = db.path.join("pg_filenode.map");
249 : let relmap_buf = self.storage.get(&relmap_path).await?;
250 : self.tasks
251 : .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
252 : relmap_key, relmap_buf,
253 : )));
254 :
255 : // Import reldir (00:spcnode:dbnode:00:*:01)
256 : let reldir_key = rel_dir_to_key(db.spcnode, db.dboid);
257 : debug!("Constructing reldirs entry, key {reldir_key}");
258 : let reldir_buf = RelDirectory::ser(&RelDirectory {
259 : rels: db
260 : .files
261 : .iter()
262 0 : .map(|f| (f.rel_tag.relnode, f.rel_tag.forknum))
263 : .collect(),
264 : })?;
265 : self.tasks
266 : .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
267 : reldir_key,
268 : Bytes::from(reldir_buf),
269 : )));
270 :
271 : // Import data (00:spcnode:dbnode:reloid:fork:blk) and set sizes for each last
272 : // segment in a given relation (00:spcnode:dbnode:reloid:fork:ff)
273 : for file in &db.files {
274 : debug!(%file.path, %file.filesize, "importing file");
275 : let len = file.filesize;
276 : ensure!(len % 8192 == 0);
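: // A relation segment file is at most 1 GiB, i.e. 131072 blocks of 8192 bytes,
: // so segment `segno` starts at block segno * 131072 within the relation.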
277 : let start_blk: u32 = file.segno * (1024 * 1024 * 1024 / 8192);
278 : let start_key = rel_block_to_key(file.rel_tag, start_blk);
279 : let end_key = rel_block_to_key(file.rel_tag, start_blk + (len / 8192) as u32);
280 : self.tasks
281 : .push(AnyImportTask::RelBlocks(ImportRelBlocksTask::new(
282 : *self.timeline.get_shard_identity(),
283 : start_key..end_key,
284 : &file.path,
285 : self.storage.clone(),
286 : )));
287 :
288 : // Set relsize for the last segment (00:spcnode:dbnode:reloid:fork:ff)
289 : if let Some(nblocks) = file.nblocks {
290 : let size_key = rel_size_to_key(file.rel_tag);
291 : //debug!("Setting relation size (path={path}, rel_tag={rel_tag}, segno={segno}) to {nblocks}, key {size_key}");
292 : let buf = nblocks.to_le_bytes();
293 : self.tasks
294 : .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
295 : size_key,
296 : Bytes::from(buf.to_vec()),
297 : )));
298 : }
299 : }
300 :
301 : Ok(())
302 : }
303 :
304 0 : async fn import_slru(&mut self, kind: SlruKind, path: &RemotePath) -> anyhow::Result<()> {
305 0 : let segments = self.storage.listfilesindir(path).await?;
306 0 : let segments: Vec<(String, u32, usize)> = segments
307 0 : .into_iter()
308 0 : .filter_map(|(path, size)| {
309 0 : let filename = path.object_name()?;
310 0 : let segno = u32::from_str_radix(filename, 16).ok()?;
311 0 : Some((filename.to_string(), segno, size))
312 0 : })
313 0 : .collect();
314 0 :
315 0 : // Write SlruDir
316 0 : let slrudir_key = slru_dir_to_key(kind);
317 0 : let segnos: HashSet<u32> = segments
318 0 : .iter()
319 0 : .map(|(_path, segno, _size)| *segno)
320 0 : .collect();
321 0 : let slrudir = SlruSegmentDirectory { segments: segnos };
322 0 : let slrudir_buf = SlruSegmentDirectory::ser(&slrudir)?;
323 0 : self.tasks
324 0 : .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
325 0 : slrudir_key,
326 0 : Bytes::from(slrudir_buf),
327 0 : )));
328 :
329 0 : for (segpath, segno, size) in segments {
330 : // SlruSegBlocks for each segment
331 0 : let p = path.join(&segpath);
332 0 : let file_size = size;
333 0 : ensure!(file_size % 8192 == 0);
334 0 : let nblocks = u32::try_from(file_size / 8192)?;
335 0 : let start_key = slru_block_to_key(kind, segno, 0);
336 0 : let end_key = slru_block_to_key(kind, segno, nblocks);
337 0 : debug!(%p, segno=%segno, %size, %start_key, %end_key, "scheduling SLRU segment");
338 0 : self.tasks
339 0 : .push(AnyImportTask::SlruBlocks(ImportSlruBlocksTask::new(
340 0 : *self.timeline.get_shard_identity(),
341 0 : start_key..end_key,
342 0 : &p,
343 0 : self.storage.clone(),
344 0 : )));
345 0 :
346 0 : // Followed by SlruSegSize
347 0 : let segsize_key = slru_segment_size_to_key(kind, segno);
348 0 : let segsize_buf = nblocks.to_le_bytes();
349 0 : self.tasks
350 0 : .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
351 0 : segsize_key,
352 0 : Bytes::copy_from_slice(&segsize_buf),
353 0 : )));
354 : }
355 0 : Ok(())
356 0 : }
357 : }
358 :
359 : //
360 : // dbdir iteration tools
361 : //
362 :
363 : struct PgDataDir {
364 : pub dbs: Vec<PgDataDirDb>, // spcnode, dboid, path
365 : }
366 :
367 : struct PgDataDirDb {
368 : pub spcnode: u32,
369 : pub dboid: u32,
370 : pub path: RemotePath,
371 : pub files: Vec<PgDataDirDbFile>,
372 : }
373 :
374 : struct PgDataDirDbFile {
375 : pub path: RemotePath,
376 : pub rel_tag: RelTag,
377 : pub segno: u32,
378 : pub filesize: usize,
379 : // Cummulative size of the given fork, set only for the last segment of that fork
380 : pub nblocks: Option<usize>,
381 : }
382 :
383 : impl PgDataDir {
384 0 : async fn new(storage: &RemoteStorageWrapper) -> anyhow::Result<Self> {
385 0 : let datadir_path = storage.pgdata();
386 0 : // Import ordinary databases first: DEFAULTTABLESPACE_OID is smaller than GLOBALTABLESPACE_OID.
387 0 : // Traverse databases in increasing OID order.
388 0 :
389 0 : let basedir = &datadir_path.join("base");
390 0 : let db_oids: Vec<_> = storage
391 0 : .listdir(basedir)
392 0 : .await?
393 0 : .into_iter()
394 0 : .filter_map(|path| path.object_name().and_then(|name| name.parse::<u32>().ok()))
395 0 : .sorted()
396 0 : .collect();
397 0 : debug!(?db_oids, "found databases");
398 0 : let mut databases = Vec::new();
399 0 : for dboid in db_oids {
400 0 : databases.push(
401 0 : PgDataDirDb::new(
402 0 : storage,
403 0 : &basedir.join(dboid.to_string()),
404 0 : pg_constants::DEFAULTTABLESPACE_OID,
405 0 : dboid,
406 0 : &datadir_path,
407 0 : )
408 0 : .await?,
409 : );
410 : }
411 :
412 : // special case for global catalogs
413 0 : databases.push(
414 0 : PgDataDirDb::new(
415 0 : storage,
416 0 : &datadir_path.join("global"),
417 0 : postgres_ffi::pg_constants::GLOBALTABLESPACE_OID,
418 0 : 0,
419 0 : &datadir_path,
420 0 : )
421 0 : .await?,
422 : );
423 :
424 0 : databases.sort_by_key(|db| (db.spcnode, db.dboid));
425 0 :
426 0 : Ok(Self { dbs: databases })
427 0 : }
428 : }
429 :
430 : impl PgDataDirDb {
431 0 : #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%dboid, %db_path))]
432 : async fn new(
433 : storage: &RemoteStorageWrapper,
434 : db_path: &RemotePath,
435 : spcnode: u32,
436 : dboid: u32,
437 : datadir_path: &RemotePath,
438 : ) -> anyhow::Result<Self> {
439 : let mut files: Vec<PgDataDirDbFile> = storage
440 : .listfilesindir(db_path)
441 : .await?
442 : .into_iter()
443 0 : .filter_map(|(path, size)| {
444 0 : debug!(%path, %size, "found file in dbdir");
445 0 : path.object_name().and_then(|name| {
446 0 : // returns (relnode, forknum, segno)
447 0 : parse_relfilename(name).ok().map(|x| (size, x))
448 0 : })
449 0 : })
450 0 : .sorted_by_key(|(_, relfilename)| *relfilename)
451 0 : .map(|(filesize, (relnode, forknum, segno))| {
452 0 : let rel_tag = RelTag {
453 0 : spcnode,
454 0 : dbnode: dboid,
455 0 : relnode,
456 0 : forknum,
457 0 : };
458 0 :
459 0 : let path = datadir_path.join(rel_tag.to_segfile_name(segno));
460 0 : assert!(filesize % BLCKSZ as usize == 0); // TODO: this should result in an error
461 0 : let nblocks = filesize / BLCKSZ as usize;
462 0 :
463 0 : PgDataDirDbFile {
464 0 : path,
465 0 : filesize,
466 0 : rel_tag,
467 0 : segno,
468 0 : nblocks: Some(nblocks), // initially the non-cumulative size; fixed up below
469 0 : }
470 0 : })
471 : .collect();
472 :
473 : // Set cumulative sizes. Do all of that math here, so that later we can more easily
474 : // parallelize over segments and know with which segments we need to write the relsize
475 : // entry.
476 : let mut cumulative_nblocks: usize = 0;
477 : let mut prev_rel_tag: Option<RelTag> = None;
478 : for i in 0..files.len() {
479 : if prev_rel_tag == Some(files[i].rel_tag) {
480 : cumulative_nblocks += files[i].nblocks.unwrap();
481 : } else {
482 : cumulative_nblocks = files[i].nblocks.unwrap();
483 : }
484 :
485 : files[i].nblocks = if i == files.len() - 1 || files[i + 1].rel_tag != files[i].rel_tag {
486 : Some(cumulative_nblocks)
487 : } else {
488 : None
489 : };
490 :
491 : prev_rel_tag = Some(files[i].rel_tag);
492 : }
493 :
494 : Ok(PgDataDirDb {
495 : files,
496 : path: db_path.clone(),
497 : spcnode,
498 : dboid,
499 : })
500 : }
501 : }
502 :
503 : trait ImportTask {
504 : fn key_range(&self) -> Range<Key>;
505 :
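: /// Estimated number of bytes this task will write, assuming one 8 KiB image per
: /// key in the key range. Non-contiguous ranges fall back to a large constant,
: /// which forces a chunk boundary in the caller.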
506 0 : fn total_size(&self) -> usize {
507 0 : // TODO: revisit this
508 0 : if is_contiguous_range(&self.key_range()) {
509 0 : contiguous_range_len(&self.key_range()) as usize * 8192
510 : } else {
511 0 : u32::MAX as usize
512 : }
513 0 : }
514 :
515 : async fn doit(
516 : self,
517 : layer_writer: &mut ImageLayerWriter,
518 : ctx: &RequestContext,
519 : ) -> anyhow::Result<usize>;
520 : }
521 :
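: /// Writes a single pre-serialized value (e.g. a directory entry or the control
: /// file) under one key.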
522 : struct ImportSingleKeyTask {
523 : key: Key,
524 : buf: Bytes,
525 : }
526 :
527 : impl ImportSingleKeyTask {
528 0 : fn new(key: Key, buf: Bytes) -> Self {
529 0 : ImportSingleKeyTask { key, buf }
530 0 : }
531 : }
532 :
533 : impl ImportTask for ImportSingleKeyTask {
534 0 : fn key_range(&self) -> Range<Key> {
535 0 : singleton_range(self.key)
536 0 : }
537 :
538 0 : async fn doit(
539 0 : self,
540 0 : layer_writer: &mut ImageLayerWriter,
541 0 : ctx: &RequestContext,
542 0 : ) -> anyhow::Result<usize> {
543 0 : layer_writer.put_image(self.key, self.buf, ctx).await?;
544 0 : Ok(1)
545 0 : }
546 : }
547 :
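: /// Imports the blocks of one relation segment file, skipping blocks that do not
: /// belong to this shard.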
548 : struct ImportRelBlocksTask {
549 : shard_identity: ShardIdentity,
550 : key_range: Range<Key>,
551 : path: RemotePath,
552 : storage: RemoteStorageWrapper,
553 : }
554 :
555 : impl ImportRelBlocksTask {
556 0 : fn new(
557 0 : shard_identity: ShardIdentity,
558 0 : key_range: Range<Key>,
559 0 : path: &RemotePath,
560 0 : storage: RemoteStorageWrapper,
561 0 : ) -> Self {
562 0 : ImportRelBlocksTask {
563 0 : shard_identity,
564 0 : key_range,
565 0 : path: path.clone(),
566 0 : storage,
567 0 : }
568 0 : }
569 : }
570 :
571 : impl ImportTask for ImportRelBlocksTask {
572 0 : fn key_range(&self) -> Range<Key> {
573 0 : self.key_range.clone()
574 0 : }
575 :
576 0 : #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%self.path))]
577 : async fn doit(
578 : self,
579 : layer_writer: &mut ImageLayerWriter,
580 : ctx: &RequestContext,
581 : ) -> anyhow::Result<usize> {
582 : debug!("Importing relation file");
583 :
584 : let (rel_tag, start_blk) = self.key_range.start.to_rel_block()?;
585 : let (rel_tag_end, end_blk) = self.key_range.end.to_rel_block()?;
586 : assert_eq!(rel_tag, rel_tag_end);
587 :
588 : let ranges = (start_blk..end_blk)
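: // Walk the blocks of this segment, skip blocks that belong to other shards, and
: // coalesce the remaining blocks into contiguous byte ranges so that each range
: // can be fetched from remote storage with a single get_range request.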
589 : .enumerate()
590 0 : .filter_map(|(i, blknum)| {
591 0 : let key = rel_block_to_key(rel_tag, blknum);
592 0 : if self.shard_identity.is_key_disposable(&key) {
593 0 : return None;
594 0 : }
595 0 : let file_offset = i.checked_mul(8192).unwrap();
596 0 : Some((
597 0 : vec![key],
598 0 : file_offset,
599 0 : file_offset.checked_add(8192).unwrap(),
600 0 : ))
601 0 : })
602 0 : .coalesce(|(mut acc, acc_start, acc_end), (mut key, start, end)| {
603 0 : assert_eq!(key.len(), 1);
604 0 : assert!(!acc.is_empty());
605 0 : assert!(acc_end > acc_start);
606 0 : if acc_end == start /* TODO additional max range check here, to limit memory consumption per task to X */ {
607 0 : acc.push(key.pop().unwrap());
608 0 : Ok((acc, acc_start, end))
609 : } else {
610 0 : Err(((acc, acc_start, acc_end), (key, start, end)))
611 : }
612 0 : });
613 :
614 : let mut nimages = 0;
615 : for (keys, range_start, range_end) in ranges {
616 : let range_buf = self
617 : .storage
618 : .get_range(&self.path, range_start.into_u64(), range_end.into_u64())
619 : .await?;
620 : let mut buf = Bytes::from(range_buf);
621 : // TODO: batched writes
622 : for key in keys {
623 : let image = buf.split_to(8192);
624 : layer_writer.put_image(key, image, ctx).await?;
625 : nimages += 1;
626 : }
627 : }
628 :
629 : Ok(nimages)
630 : }
631 : }
632 :
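: /// Imports one SLRU segment file. SLRU contents are replicated to every shard,
: /// so no blocks are skipped here.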
633 : struct ImportSlruBlocksTask {
634 : shard_identity: ShardIdentity,
635 : key_range: Range<Key>,
636 : path: RemotePath,
637 : storage: RemoteStorageWrapper,
638 : }
639 :
640 : impl ImportSlruBlocksTask {
641 0 : fn new(
642 0 : shard_identity: ShardIdentity,
643 0 : key_range: Range<Key>,
644 0 : path: &RemotePath,
645 0 : storage: RemoteStorageWrapper,
646 0 : ) -> Self {
647 0 : ImportSlruBlocksTask {
648 0 : shard_identity,
649 0 : key_range,
650 0 : path: path.clone(),
651 0 : storage,
652 0 : }
653 0 : }
654 : }
655 :
656 : impl ImportTask for ImportSlruBlocksTask {
657 0 : fn key_range(&self) -> Range<Key> {
658 0 : self.key_range.clone()
659 0 : }
660 :
661 0 : async fn doit(
662 0 : self,
663 0 : layer_writer: &mut ImageLayerWriter,
664 0 : ctx: &RequestContext,
665 0 : ) -> anyhow::Result<usize> {
666 0 : debug!("Importing SLRU segment file {}", self.path);
667 0 : let buf = self.storage.get(&self.path).await?;
668 :
669 0 : let (kind, segno, start_blk) = self.key_range.start.to_slru_block()?;
670 0 : let (_kind, _segno, end_blk) = self.key_range.end.to_slru_block()?;
671 0 : let mut blknum = start_blk;
672 0 : let mut nimages = 0;
673 0 : let mut file_offset = 0;
674 0 : while blknum < end_blk {
675 0 : let key = slru_block_to_key(kind, segno, blknum);
676 0 : assert!(
677 0 : !self.shard_identity.is_key_disposable(&key),
678 0 : "SLRU keys need to go into every shard"
679 : );
680 0 : let buf = &buf[file_offset..(file_offset + 8192)];
681 0 : file_offset += 8192;
682 0 : layer_writer
683 0 : .put_image(key, Bytes::copy_from_slice(buf), ctx)
684 0 : .await?;
685 0 : blknum += 1;
686 0 : nimages += 1;
687 : }
688 0 : Ok(nimages)
689 0 : }
690 : }
691 :
692 : enum AnyImportTask {
693 : SingleKey(ImportSingleKeyTask),
694 : RelBlocks(ImportRelBlocksTask),
695 : SlruBlocks(ImportSlruBlocksTask),
696 : }
697 :
698 : impl ImportTask for AnyImportTask {
699 0 : fn key_range(&self) -> Range<Key> {
700 0 : match self {
701 0 : Self::SingleKey(t) => t.key_range(),
702 0 : Self::RelBlocks(t) => t.key_range(),
703 0 : Self::SlruBlocks(t) => t.key_range(),
704 : }
705 0 : }
706 : /// returns the number of images put into the `layer_writer`
707 0 : async fn doit(
708 0 : self,
709 0 : layer_writer: &mut ImageLayerWriter,
710 0 : ctx: &RequestContext,
711 0 : ) -> anyhow::Result<usize> {
712 0 : match self {
713 0 : Self::SingleKey(t) => t.doit(layer_writer, ctx).await,
714 0 : Self::RelBlocks(t) => t.doit(layer_writer, ctx).await,
715 0 : Self::SlruBlocks(t) => t.doit(layer_writer, ctx).await,
716 : }
717 0 : }
718 : }
719 :
720 : impl From<ImportSingleKeyTask> for AnyImportTask {
721 0 : fn from(t: ImportSingleKeyTask) -> Self {
722 0 : Self::SingleKey(t)
723 0 : }
724 : }
725 :
726 : impl From<ImportRelBlocksTask> for AnyImportTask {
727 0 : fn from(t: ImportRelBlocksTask) -> Self {
728 0 : Self::RelBlocks(t)
729 0 : }
730 : }
731 :
732 : impl From<ImportSlruBlocksTask> for AnyImportTask {
733 0 : fn from(t: ImportSlruBlocksTask) -> Self {
734 0 : Self::SlruBlocks(t)
735 0 : }
736 : }
737 :
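: /// A unit of parallel work: all import tasks whose keys fall into `range`,
: /// written into (at most) one image layer at `pgdata_lsn` on `timeline`.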
738 : struct ChunkProcessingJob {
739 : timeline: Arc<Timeline>,
740 : range: Range<Key>,
741 : tasks: Vec<AnyImportTask>,
742 :
743 : pgdata_lsn: Lsn,
744 : }
745 :
746 : impl ChunkProcessingJob {
747 0 : fn new(range: Range<Key>, tasks: Vec<AnyImportTask>, env: &Flow) -> Self {
748 0 : assert!(env.pgdata_lsn.is_valid());
749 0 : Self {
750 0 : timeline: env.timeline.clone(),
751 0 : range,
752 0 : tasks,
753 0 : pgdata_lsn: env.pgdata_lsn,
754 0 : }
755 0 : }
756 :
757 0 : async fn run(self, ctx: &RequestContext) -> anyhow::Result<()> {
758 0 : let mut writer = ImageLayerWriter::new(
759 0 : self.timeline.conf,
760 0 : self.timeline.timeline_id,
761 0 : self.timeline.tenant_shard_id,
762 0 : &self.range,
763 0 : self.pgdata_lsn,
764 0 : ctx,
765 0 : )
766 0 : .await?;
767 :
768 0 : let mut nimages = 0;
769 0 : for task in self.tasks {
770 0 : nimages += task.doit(&mut writer, ctx).await?;
771 : }
772 :
773 0 : let resident_layer = if nimages > 0 {
774 0 : let (desc, path) = writer.finish(ctx).await?;
775 0 : Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?
776 : } else {
777 : // dropping the writer cleans up
778 0 : return Ok(());
779 : };
780 :
781 : // this is sharing the same code as create_image_layers
782 0 : let mut guard = self.timeline.layers.write().await;
783 0 : guard
784 0 : .open_mut()?
785 0 : .track_new_image_layers(&[resident_layer.clone()], &self.timeline.metrics);
786 0 : crate::tenant::timeline::drop_wlock(guard);
787 0 :
788 0 : // Schedule the layer for upload but don't add barriers such as
789 0 : // wait for completion or index upload, so we don't inhibit upload parallelism.
790 0 : // TODO: limit upload parallelism somehow (e.g. by limiting concurrency of jobs?)
791 0 : // TODO: or regulate parallelism by upload queue depth? Prob should happen at a higher level.
792 0 : self.timeline
793 0 : .remote_client
794 0 : .schedule_layer_file_upload(resident_layer)?;
795 :
796 0 : Ok(())
797 0 : }
798 : }