Line data Source code
1 : //! Import a PGDATA directory into an empty root timeline.
2 : //!
3 : //! This module is adapted from hackathon code by Heikki and Stas.
4 : //! Other code in the parent module was written by Christian as part of a customer PoC.
5 : //!
6 : //! The hackathon code produced image layer files as a free-standing program.
7 : //!
8 : //! It has been modified to
9 : //! - run inside a running Pageserver, within the proper lifecycles of Timeline -> Tenant(Shard)
10 : //! - => sharding-awareness: produce image layers with only the data relevant for this shard
11 : //! - => use S3 as the source for the PGDATA instead of the local filesystem
12 : //!
13 : //! TODOs before productionization:
14 : //! - ChunkProcessingJob size / ImportJob::total_size does not account for sharding.
15 : //! => produced image layers likely too small.
16 : //! - ChunkProcessingJob should cut up an ImportJob to hit exactly the target image layer size.
17 : //! - asserts / unwraps need to be replaced with errors
18 : //! - don't trust remote objects will be small (=prevent OOMs in those cases)
19 : //! - limit all in-memory buffers in size, or download to disk and read from there
20 : //! - limit task concurrency
21 : //! - generally play nice with other tenants in the system
22 : //! - importbucket is a different bucket than the main pageserver storage, so it should be fine wrt S3 rate limits
23 : //! - but concerns like network bandwidth, local disk write bandwidth, local disk capacity, etc. still apply
24 : //! - integrate with layer eviction system
25 : //! - audit for Tenant::cancel and Timeline::cancel responsiveness
26 : //! - audit for Tenant/Timeline gate holding (we spawn tokio tasks during this flow!)
27 : //!
28 : //! An incomplete set of TODOs from the Hackathon:
29 : //! - version-specific CheckPointData (=> pgv abstraction, already exists for regular walingest)
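//!
//! The entry point is [`run`]. A rough, illustrative sketch of how a caller drives an
//! import (the surrounding variable names are hypothetical, not defined in this module):
//!
//! ```ignore
//! // On a fresh import there is no persisted progress; on resume, the storage
//! // controller hands back the last checkpointed `ShardImportProgress`.
//! run(timeline, control_file, storage, maybe_progress, &ctx).await?;
//! ```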
30 :
31 : use std::collections::HashSet;
32 : use std::hash::{Hash, Hasher};
33 : use std::ops::Range;
34 : use std::sync::Arc;
35 :
36 : use anyhow::ensure;
37 : use bytes::Bytes;
38 : use futures::stream::FuturesOrdered;
39 : use itertools::Itertools;
40 : use pageserver_api::config::TimelineImportConfig;
41 : use pageserver_api::key::{
42 : CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, Key, TWOPHASEDIR_KEY, rel_block_to_key,
43 : rel_dir_to_key, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key,
44 : slru_segment_size_to_key,
45 : };
46 : use pageserver_api::keyspace::{contiguous_range_len, is_contiguous_range, singleton_range};
47 : use pageserver_api::models::{ShardImportProgress, ShardImportProgressV1, ShardImportStatus};
48 : use pageserver_api::reltag::{RelTag, SlruKind};
49 : use pageserver_api::shard::ShardIdentity;
50 : use postgres_ffi::relfile_utils::parse_relfilename;
51 : use postgres_ffi::{BLCKSZ, pg_constants};
52 : use remote_storage::RemotePath;
53 : use tokio::sync::Semaphore;
54 : use tokio_stream::StreamExt;
55 : use tracing::{debug, instrument};
56 : use utils::bin_ser::BeSer;
57 : use utils::lsn::Lsn;
58 : use utils::pausable_failpoint;
59 :
60 : use super::Timeline;
61 : use super::importbucket_client::{ControlFile, RemoteStorageWrapper};
62 : use crate::assert_u64_eq_usize::UsizeIsU64;
63 : use crate::context::{DownloadBehavior, RequestContext};
64 : use crate::controller_upcall_client::{StorageControllerUpcallApi, StorageControllerUpcallClient};
65 : use crate::pgdatadir_mapping::{
66 : DbDirectory, RelDirectory, SlruSegmentDirectory, TwoPhaseDirectory,
67 : };
68 : use crate::task_mgr::TaskKind;
69 : use crate::tenant::storage_layer::{AsLayerDesc, ImageLayerWriter, Layer};
70 :
71 0 : pub async fn run(
72 0 : timeline: Arc<Timeline>,
73 0 : control_file: ControlFile,
74 0 : storage: RemoteStorageWrapper,
75 0 : import_progress: Option<ShardImportProgress>,
76 0 : ctx: &RequestContext,
77 0 : ) -> anyhow::Result<()> {
78 0 : // Match how we run the import based on the progress version.
79 0 : // If there's no import progress, it means that this is a new import
80 0 : // and we can use whichever version we want.
81 0 : match import_progress {
82 0 : Some(ShardImportProgress::V1(progress)) => {
83 0 : run_v1(timeline, control_file, storage, Some(progress), ctx).await
84 : }
85 0 : None => run_v1(timeline, control_file, storage, None, ctx).await,
86 : }
87 0 : }
88 :
89 0 : async fn run_v1(
90 0 : timeline: Arc<Timeline>,
91 0 : control_file: ControlFile,
92 0 : storage: RemoteStorageWrapper,
93 0 : import_progress: Option<ShardImportProgressV1>,
94 0 : ctx: &RequestContext,
95 0 : ) -> anyhow::Result<()> {
96 0 : let planner = Planner {
97 0 : control_file,
98 0 : storage: storage.clone(),
99 0 : shard: timeline.shard_identity,
100 0 : tasks: Vec::default(),
101 0 : };
102 0 :
103 0 : let import_config = &timeline.conf.timeline_import_config;
104 0 : let plan = planner.plan(import_config).await?;
105 :
106 : // Hash the plan and compare with the hash of the plan we got back from the storage controller.
107 : // If the two match, it means that the planning stage had the same output.
108 : //
109 : // This is not intended to be a cryptographically secure hash.
110 : const SEED: u64 = 42;
111 0 : let mut hasher = twox_hash::XxHash64::with_seed(SEED);
112 0 : plan.hash(&mut hasher);
113 0 : let plan_hash = hasher.finish();
114 :
115 0 : if let Some(progress) = &import_progress {
116 : // Guard against hash collisions between plans with a different number of jobs.
117 0 : if progress.jobs != plan.jobs.len() {
118 0 : anyhow::bail!("Import plan job length does not match storcon metadata")
119 0 : }
120 0 :
121 0 : if plan_hash != progress.import_plan_hash {
122 0 : anyhow::bail!("Import plan does not match storcon metadata");
123 0 : }
124 0 : }
125 :
126 0 : pausable_failpoint!("import-timeline-pre-execute-pausable");
127 :
128 0 : let start_from_job_idx = import_progress.map(|progress| progress.completed);
129 0 : plan.execute(timeline, start_from_job_idx, plan_hash, import_config, ctx)
130 0 : .await
131 0 : }
132 :
133 : struct Planner {
134 : control_file: ControlFile,
135 : storage: RemoteStorageWrapper,
136 : shard: ShardIdentity,
137 : tasks: Vec<AnyImportTask>,
138 : }
139 :
140 : #[derive(Hash)]
141 : struct Plan {
142 : jobs: Vec<ChunkProcessingJob>,
143 : // Included here such that it ends up in the hash for the plan
144 : shard: ShardIdentity,
145 : }
146 :
147 : impl Planner {
148 : /// Creates an import plan
149 : ///
150 : /// This function is and must remain pure: given the same input, it will generate the same import plan.
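/// (The plan is hashed with a fixed-seed xxHash64; the hash is persisted by the storage
/// controller and re-checked by `run_v1` when an import resumes.) An illustrative sketch
/// of the invariant, with hypothetical `planner_a` / `planner_b` built from identical
/// inputs:
///
/// ```ignore
/// use std::hash::{Hash, Hasher};
///
/// let plan_a = planner_a.plan(&import_config).await?;
/// let plan_b = planner_b.plan(&import_config).await?;
///
/// let hash = |plan: &Plan| {
///     let mut hasher = twox_hash::XxHash64::with_seed(42);
///     plan.hash(&mut hasher);
///     hasher.finish()
/// };
/// assert_eq!(hash(&plan_a), hash(&plan_b));
/// ```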
151 0 : async fn plan(mut self, import_config: &TimelineImportConfig) -> anyhow::Result<Plan> {
152 0 : let pgdata_lsn = Lsn(self.control_file.control_file_data().checkPoint).align();
153 :
154 0 : let datadir = PgDataDir::new(&self.storage).await?;
155 :
156 : // Import dbdir (00:00:00 keyspace)
157 : // It is only constructed here; like every other task, it is written to an image layer when the corresponding chunk job executes.
158 0 : let dbdir_buf = Bytes::from(DbDirectory::ser(&DbDirectory {
159 0 : dbdirs: datadir
160 0 : .dbs
161 0 : .iter()
162 0 : .map(|db| ((db.spcnode, db.dboid), true))
163 0 : .collect(),
164 0 : })?);
165 0 : self.tasks
166 0 : .push(ImportSingleKeyTask::new(DBDIR_KEY, dbdir_buf).into());
167 :
168 : // Import databases (00:spcnode:dbnode keyspace for each db)
169 0 : for db in datadir.dbs {
170 0 : self.import_db(&db).await?;
171 : }
172 :
173 : // Import SLRUs
174 0 : if self.shard.is_shard_zero() {
175 : // pg_xact (01:00 keyspace)
176 0 : self.import_slru(SlruKind::Clog, &self.storage.pgdata().join("pg_xact"))
177 0 : .await?;
178 : // pg_multixact/members (01:01 keyspace)
179 0 : self.import_slru(
180 0 : SlruKind::MultiXactMembers,
181 0 : &self.storage.pgdata().join("pg_multixact/members"),
182 0 : )
183 0 : .await?;
184 : // pg_multixact/offsets (01:02 keyspace)
185 0 : self.import_slru(
186 0 : SlruKind::MultiXactOffsets,
187 0 : &self.storage.pgdata().join("pg_multixact/offsets"),
188 0 : )
189 0 : .await?;
190 0 : }
191 :
192 : // Import pg_twophase.
193 : // TODO: currently imported as empty (prepared transactions are not carried over).
194 0 : let twophasedir_buf = TwoPhaseDirectory::ser(&TwoPhaseDirectory {
195 0 : xids: HashSet::new(),
196 0 : })?;
197 0 : self.tasks
198 0 : .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
199 0 : TWOPHASEDIR_KEY,
200 0 : Bytes::from(twophasedir_buf),
201 0 : )));
202 0 :
203 0 : // Controlfile, checkpoint
204 0 : self.tasks
205 0 : .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
206 0 : CONTROLFILE_KEY,
207 0 : self.control_file.control_file_buf().clone(),
208 0 : )));
209 :
210 0 : let checkpoint_buf = self
211 0 : .control_file
212 0 : .control_file_data()
213 0 : .checkPointCopy
214 0 : .encode()?;
215 0 : self.tasks
216 0 : .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
217 0 : CHECKPOINT_KEY,
218 0 : checkpoint_buf,
219 0 : )));
220 0 :
221 0 : // Sort the tasks by the key ranges they handle.
222 0 : // The plan being generated here needs to be stable across invocations
223 0 : // of this method.
224 0 : self.tasks.sort_by_key(|task| match task {
225 0 : AnyImportTask::SingleKey(key) => (key.key, key.key.next()),
226 0 : AnyImportTask::RelBlocks(rel_blocks) => {
227 0 : (rel_blocks.key_range.start, rel_blocks.key_range.end)
228 : }
229 0 : AnyImportTask::SlruBlocks(slru_blocks) => {
230 0 : (slru_blocks.key_range.start, slru_blocks.key_range.end)
231 : }
232 0 : });
233 0 :
234 0 : // Assigns parts of key space to later parallel jobs
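// Greedy packing: tasks are appended to the current chunk until adding the next task
// would exceed `import_job_soft_size_limit`; the accumulated chunk is then sealed into
// a job covering the keyspace from the previous cut point up to the start of that task.
// A final job covers the remainder of the keyspace up to `Key::MAX`.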
235 0 : let mut last_end_key = Key::MIN;
236 0 : let mut current_chunk = Vec::new();
237 0 : let mut current_chunk_size: usize = 0;
238 0 : let mut jobs = Vec::new();
239 0 : for task in std::mem::take(&mut self.tasks).into_iter() {
240 0 : if current_chunk_size + task.total_size()
241 0 : > import_config.import_job_soft_size_limit.into()
242 0 : {
243 0 : let key_range = last_end_key..task.key_range().start;
244 0 : jobs.push(ChunkProcessingJob::new(
245 0 : key_range.clone(),
246 0 : std::mem::take(&mut current_chunk),
247 0 : pgdata_lsn,
248 0 : ));
249 0 : last_end_key = key_range.end;
250 0 : current_chunk_size = 0;
251 0 : }
252 0 : current_chunk_size += task.total_size();
253 0 : current_chunk.push(task);
254 : }
255 0 : jobs.push(ChunkProcessingJob::new(
256 0 : last_end_key..Key::MAX,
257 0 : current_chunk,
258 0 : pgdata_lsn,
259 0 : ));
260 0 :
261 0 : Ok(Plan {
262 0 : jobs,
263 0 : shard: self.shard,
264 0 : })
265 0 : }
266 :
267 0 : #[instrument(level = tracing::Level::DEBUG, skip_all, fields(dboid=%db.dboid, tablespace=%db.spcnode, path=%db.path))]
268 : async fn import_db(&mut self, db: &PgDataDirDb) -> anyhow::Result<()> {
269 : debug!("start");
270 : scopeguard::defer! {
271 : debug!("return");
272 : }
273 :
274 : // Import relmap (00:spcnode:dbnode:00:*:00)
275 : let relmap_key = relmap_file_key(db.spcnode, db.dboid);
276 : debug!("Constructing relmap entry, key {relmap_key}");
277 : let relmap_path = db.path.join("pg_filenode.map");
278 : let relmap_buf = self.storage.get(&relmap_path).await?;
279 : self.tasks
280 : .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
281 : relmap_key, relmap_buf,
282 : )));
283 :
284 : // Import reldir (00:spcnode:dbnode:00:*:01)
285 : let reldir_key = rel_dir_to_key(db.spcnode, db.dboid);
286 : debug!("Constructing reldirs entry, key {reldir_key}");
287 : let reldir_buf = RelDirectory::ser(&RelDirectory {
288 : rels: db
289 : .files
290 : .iter()
291 0 : .map(|f| (f.rel_tag.relnode, f.rel_tag.forknum))
292 : .collect(),
293 : })?;
294 : self.tasks
295 : .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
296 : reldir_key,
297 : Bytes::from(reldir_buf),
298 : )));
299 :
300 : // Import data (00:spcnode:dbnode:reloid:fork:blk) and set the relation size
301 : // for the last segment of each relation fork (00:spcnode:dbnode:reloid:fork:ff)
302 : for file in &db.files {
303 : debug!(%file.path, %file.filesize, "importing file");
304 : let len = file.filesize;
305 : ensure!(len % 8192 == 0);
306 : let start_blk: u32 = file.segno * (1024 * 1024 * 1024 / 8192);
307 : let start_key = rel_block_to_key(file.rel_tag, start_blk);
308 : let end_key = rel_block_to_key(file.rel_tag, start_blk + (len / 8192) as u32);
309 : self.tasks
310 : .push(AnyImportTask::RelBlocks(ImportRelBlocksTask::new(
311 : self.shard,
312 : start_key..end_key,
313 : &file.path,
314 : self.storage.clone(),
315 : )));
316 :
317 : // Set relsize for the last segment (00:spcnode:dbnode:reloid:fork:ff)
318 : if let Some(nblocks) = file.nblocks {
319 : let size_key = rel_size_to_key(file.rel_tag);
320 : //debug!("Setting relation size (path={path}, rel_tag={rel_tag}, segno={segno}) to {nblocks}, key {size_key}");
321 : let buf = nblocks.to_le_bytes();
322 : self.tasks
323 : .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
324 : size_key,
325 : Bytes::from(buf.to_vec()),
326 : )));
327 : }
328 : }
329 :
330 : Ok(())
331 : }
332 :
333 0 : async fn import_slru(&mut self, kind: SlruKind, path: &RemotePath) -> anyhow::Result<()> {
334 0 : assert!(self.shard.is_shard_zero());
335 :
336 0 : let segments = self.storage.listfilesindir(path).await?;
337 0 : let segments: Vec<(String, u32, usize)> = segments
338 0 : .into_iter()
339 0 : .filter_map(|(path, size)| {
340 0 : let filename = path.object_name()?;
341 0 : let segno = u32::from_str_radix(filename, 16).ok()?;
342 0 : Some((filename.to_string(), segno, size))
343 0 : })
344 0 : .collect();
345 0 :
346 0 : // Write SlruDir
347 0 : let slrudir_key = slru_dir_to_key(kind);
348 0 : let segnos: HashSet<u32> = segments
349 0 : .iter()
350 0 : .map(|(_path, segno, _size)| *segno)
351 0 : .collect();
352 0 : let slrudir = SlruSegmentDirectory { segments: segnos };
353 0 : let slrudir_buf = SlruSegmentDirectory::ser(&slrudir)?;
354 0 : self.tasks
355 0 : .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
356 0 : slrudir_key,
357 0 : Bytes::from(slrudir_buf),
358 0 : )));
359 :
360 0 : for (segpath, segno, size) in segments {
361 : // SlruSegBlocks for each segment
362 0 : let p = path.join(&segpath);
363 0 : let file_size = size;
364 0 : ensure!(file_size % 8192 == 0);
365 0 : let nblocks = u32::try_from(file_size / 8192)?;
366 0 : let start_key = slru_block_to_key(kind, segno, 0);
367 0 : let end_key = slru_block_to_key(kind, segno, nblocks);
368 0 : debug!(%p, segno=%segno, %size, %start_key, %end_key, "scheduling SLRU segment");
369 0 : self.tasks
370 0 : .push(AnyImportTask::SlruBlocks(ImportSlruBlocksTask::new(
371 0 : start_key..end_key,
372 0 : &p,
373 0 : self.storage.clone(),
374 0 : )));
375 0 :
376 0 : // Followed by SlruSegSize
377 0 : let segsize_key = slru_segment_size_to_key(kind, segno);
378 0 : let segsize_buf = nblocks.to_le_bytes();
379 0 : self.tasks
380 0 : .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
381 0 : segsize_key,
382 0 : Bytes::copy_from_slice(&segsize_buf),
383 0 : )));
384 : }
385 0 : Ok(())
386 0 : }
387 : }
388 :
389 : impl Plan {
390 0 : async fn execute(
391 0 : self,
392 0 : timeline: Arc<Timeline>,
393 0 : start_after_job_idx: Option<usize>,
394 0 : import_plan_hash: u64,
395 0 : import_config: &TimelineImportConfig,
396 0 : ctx: &RequestContext,
397 0 : ) -> anyhow::Result<()> {
398 0 : let storcon_client = StorageControllerUpcallClient::new(timeline.conf, &timeline.cancel);
399 0 :
400 0 : let mut work = FuturesOrdered::new();
401 0 : let semaphore = Arc::new(Semaphore::new(import_config.import_job_concurrency.into()));
402 0 :
403 0 : let jobs_in_plan = self.jobs.len();
404 0 :
405 0 : let mut jobs = self
406 0 : .jobs
407 0 : .into_iter()
408 0 : .enumerate()
409 0 : .map(|(idx, job)| (idx + 1, job))
410 0 : .filter(|(idx, _job)| {
411 : // Filter out any jobs that have been done already
412 0 : if let Some(start_after) = start_after_job_idx {
413 0 : *idx > start_after
414 : } else {
415 0 : true
416 : }
417 0 : })
418 0 : .peekable();
419 0 :
420 0 : let mut last_completed_job_idx = start_after_job_idx.unwrap_or(0);
421 0 : let checkpoint_every: usize = import_config.import_job_checkpoint_threshold.into();
422 :
423 : // Run import jobs concurrently up to the limit specified by the pageserver configuration.
424 : // Note that we process completed futures in the order of insertion. This will be the
425 : // building block for resuming imports across pageserver restarts or tenant migrations.
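// Every `import_job_checkpoint_threshold` completed jobs, progress is checkpointed:
// pending index changes are uploaded and awaited, and the job count, completed count
// and plan hash are reported to the storage controller so that a restarted pageserver
// can resume from the last checkpoint instead of starting over.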
426 0 : while last_completed_job_idx < jobs_in_plan {
427 0 : tokio::select! {
428 0 : permit = semaphore.clone().acquire_owned(), if jobs.peek().is_some() => {
429 0 : let permit = permit.expect("never closed");
430 0 : let (job_idx, job) = jobs.next().expect("we peeked");
431 0 :
432 0 : let job_timeline = timeline.clone();
433 0 : let ctx = ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Error);
434 0 :
435 0 : work.push_back(tokio::task::spawn(async move {
436 0 : let _permit = permit;
437 0 : let res = job.run(job_timeline, &ctx).await;
438 0 : (job_idx, res)
439 0 : }));
440 0 : },
441 0 : maybe_complete_job_idx = work.next() => {
442 0 : pausable_failpoint!("import-task-complete-pausable");
443 :
444 0 : match maybe_complete_job_idx {
445 0 : Some(Ok((job_idx, res))) => {
446 0 : assert!(last_completed_job_idx.checked_add(1).unwrap() == job_idx);
447 :
448 0 : res?;
449 0 : last_completed_job_idx = job_idx;
450 0 :
451 0 : if last_completed_job_idx % checkpoint_every == 0 {
452 0 : let progress = ShardImportProgressV1 {
453 0 : jobs: jobs_in_plan,
454 0 : completed: last_completed_job_idx,
455 0 : import_plan_hash,
456 0 : };
457 0 :
458 0 : timeline.remote_client.schedule_index_upload_for_file_changes()?;
459 0 : timeline.remote_client.wait_completion().await?;
460 :
461 0 : storcon_client.put_timeline_import_status(
462 0 : timeline.tenant_shard_id,
463 0 : timeline.timeline_id,
464 0 : timeline.generation,
465 0 : ShardImportStatus::InProgress(Some(ShardImportProgress::V1(progress)))
466 0 : )
467 0 : .await
468 0 : .map_err(|_err| {
469 0 : anyhow::anyhow!("Shut down while putting timeline import status")
470 0 : })?;
471 0 : }
472 : },
473 : Some(Err(_)) => {
474 0 : anyhow::bail!(
475 0 : "import job panicked or cancelled"
476 0 : );
477 : }
478 0 : None => {}
479 : }
480 : }
481 : }
482 : }
483 :
484 0 : Ok(())
485 0 : }
486 : }
487 :
488 : //
489 : // dbdir iteration tools
490 : //
491 :
492 : struct PgDataDir {
493 : pub dbs: Vec<PgDataDirDb>, // spcnode, dboid, path
494 : }
495 :
496 : struct PgDataDirDb {
497 : pub spcnode: u32,
498 : pub dboid: u32,
499 : pub path: RemotePath,
500 : pub files: Vec<PgDataDirDbFile>,
501 : }
502 :
503 : struct PgDataDirDbFile {
504 : pub path: RemotePath,
505 : pub rel_tag: RelTag,
506 : pub segno: u32,
507 : pub filesize: usize,
508 : // Cummulative size of the given fork, set only for the last segment of that fork
509 : pub nblocks: Option<usize>,
510 : }
511 :
512 : impl PgDataDir {
513 0 : async fn new(storage: &RemoteStorageWrapper) -> anyhow::Result<Self> {
514 0 : let datadir_path = storage.pgdata();
515 0 : // Import ordinary databases first: DEFAULTTABLESPACE_OID is smaller than GLOBALTABLESPACE_OID.
516 0 : // Traverse databases in increasing oid order.
517 0 :
518 0 : let basedir = &datadir_path.join("base");
519 0 : let db_oids: Vec<_> = storage
520 0 : .listdir(basedir)
521 0 : .await?
522 0 : .into_iter()
523 0 : .filter_map(|path| path.object_name().and_then(|name| name.parse::<u32>().ok()))
524 0 : .sorted()
525 0 : .collect();
526 0 : debug!(?db_oids, "found databases");
527 0 : let mut databases = Vec::new();
528 0 : for dboid in db_oids {
529 0 : databases.push(
530 0 : PgDataDirDb::new(
531 0 : storage,
532 0 : &basedir.join(dboid.to_string()),
533 0 : pg_constants::DEFAULTTABLESPACE_OID,
534 0 : dboid,
535 0 : &datadir_path,
536 0 : )
537 0 : .await?,
538 : );
539 : }
540 :
541 : // special case for global catalogs
542 0 : databases.push(
543 0 : PgDataDirDb::new(
544 0 : storage,
545 0 : &datadir_path.join("global"),
546 0 : postgres_ffi::pg_constants::GLOBALTABLESPACE_OID,
547 0 : 0,
548 0 : &datadir_path,
549 0 : )
550 0 : .await?,
551 : );
552 :
553 0 : databases.sort_by_key(|db| (db.spcnode, db.dboid));
554 0 :
555 0 : Ok(Self { dbs: databases })
556 0 : }
557 : }
558 :
559 : impl PgDataDirDb {
560 0 : #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%dboid, %db_path))]
561 : async fn new(
562 : storage: &RemoteStorageWrapper,
563 : db_path: &RemotePath,
564 : spcnode: u32,
565 : dboid: u32,
566 : datadir_path: &RemotePath,
567 : ) -> anyhow::Result<Self> {
568 : let mut files: Vec<PgDataDirDbFile> = storage
569 : .listfilesindir(db_path)
570 : .await?
571 : .into_iter()
572 0 : .filter_map(|(path, size)| {
573 0 : debug!(%path, %size, "found file in dbdir");
574 0 : path.object_name().and_then(|name| {
575 0 : // returns (relnode, forknum, segno)
576 0 : parse_relfilename(name).ok().map(|x| (size, x))
577 0 : })
578 0 : })
579 0 : .sorted_by_key(|(_, relfilename)| *relfilename)
580 0 : .map(|(filesize, (relnode, forknum, segno))| {
581 0 : let rel_tag = RelTag {
582 0 : spcnode,
583 0 : dbnode: dboid,
584 0 : relnode,
585 0 : forknum,
586 0 : };
587 0 :
588 0 : let path = datadir_path.join(rel_tag.to_segfile_name(segno));
589 0 : assert!(filesize % BLCKSZ as usize == 0); // TODO: this should result in an error
590 0 : let nblocks = filesize / BLCKSZ as usize;
591 0 :
592 0 : PgDataDirDbFile {
593 0 : path,
594 0 : filesize,
595 0 : rel_tag,
596 0 : segno,
597 0 : nblocks: Some(nblocks), // non-cumulative size at first, fixed up below
598 0 : }
599 0 : })
600 : .collect();
601 :
602 : // Set cumulative sizes. Do all of that math here so that later we can more easily
603 : // parallelize over segments and know for which segments we need to write a relsize
604 : // entry.
605 : let mut cumulative_nblocks: usize = 0;
606 : let mut prev_rel_tag: Option<RelTag> = None;
607 : for i in 0..files.len() {
608 : if prev_rel_tag == Some(files[i].rel_tag) {
609 : cumulative_nblocks += files[i].nblocks.unwrap();
610 : } else {
611 : cumulative_nblocks = files[i].nblocks.unwrap();
612 : }
613 :
614 : files[i].nblocks = if i == files.len() - 1 || files[i + 1].rel_tag != files[i].rel_tag {
615 : Some(cumulative_nblocks)
616 : } else {
617 : None
618 : };
619 :
620 : prev_rel_tag = Some(files[i].rel_tag);
621 : }
622 :
623 : Ok(PgDataDirDb {
624 : files,
625 : path: db_path.clone(),
626 : spcnode,
627 : dboid,
628 : })
629 : }
630 : }
631 :
632 : trait ImportTask {
633 : fn key_range(&self) -> Range<Key>;
634 :
635 0 : fn total_size(&self) -> usize {
636 0 : // TODO: revisit this
637 0 : if is_contiguous_range(&self.key_range()) {
638 0 : contiguous_range_len(&self.key_range()) as usize * 8192
639 : } else {
640 0 : u32::MAX as usize
641 : }
642 0 : }
643 :
644 : async fn doit(
645 : self,
646 : layer_writer: &mut ImageLayerWriter,
647 : ctx: &RequestContext,
648 : ) -> anyhow::Result<usize>;
649 : }
650 :
651 : struct ImportSingleKeyTask {
652 : key: Key,
653 : buf: Bytes,
654 : }
655 :
656 : impl Hash for ImportSingleKeyTask {
657 0 : fn hash<H: Hasher>(&self, state: &mut H) {
658 0 : let ImportSingleKeyTask { key, buf } = self;
659 0 :
660 0 : key.hash(state);
661 0 : // The value might not have a stable binary representation.
662 0 : // For instance, the db directory is serialized from a hash map with unstable iteration order.
663 0 : // To work around this we are a bit lax here and only hash the
664 0 : // size of the buffer, which must be consistent.
665 0 : buf.len().hash(state);
666 0 : }
667 : }
668 :
669 : impl ImportSingleKeyTask {
670 0 : fn new(key: Key, buf: Bytes) -> Self {
671 0 : ImportSingleKeyTask { key, buf }
672 0 : }
673 : }
674 :
675 : impl ImportTask for ImportSingleKeyTask {
676 0 : fn key_range(&self) -> Range<Key> {
677 0 : singleton_range(self.key)
678 0 : }
679 :
680 0 : async fn doit(
681 0 : self,
682 0 : layer_writer: &mut ImageLayerWriter,
683 0 : ctx: &RequestContext,
684 0 : ) -> anyhow::Result<usize> {
685 0 : layer_writer.put_image(self.key, self.buf, ctx).await?;
686 0 : Ok(1)
687 0 : }
688 : }
689 :
690 : struct ImportRelBlocksTask {
691 : shard_identity: ShardIdentity,
692 : key_range: Range<Key>,
693 : path: RemotePath,
694 : storage: RemoteStorageWrapper,
695 : }
696 :
697 : impl Hash for ImportRelBlocksTask {
698 0 : fn hash<H: Hasher>(&self, state: &mut H) {
699 0 : let ImportRelBlocksTask {
700 0 : shard_identity: _,
701 0 : key_range,
702 0 : path,
703 0 : storage: _,
704 0 : } = self;
705 0 :
706 0 : key_range.hash(state);
707 0 : path.hash(state);
708 0 : }
709 : }
710 :
711 : impl ImportRelBlocksTask {
712 0 : fn new(
713 0 : shard_identity: ShardIdentity,
714 0 : key_range: Range<Key>,
715 0 : path: &RemotePath,
716 0 : storage: RemoteStorageWrapper,
717 0 : ) -> Self {
718 0 : ImportRelBlocksTask {
719 0 : shard_identity,
720 0 : key_range,
721 0 : path: path.clone(),
722 0 : storage,
723 0 : }
724 0 : }
725 : }
726 :
727 : impl ImportTask for ImportRelBlocksTask {
728 0 : fn key_range(&self) -> Range<Key> {
729 0 : self.key_range.clone()
730 0 : }
731 :
732 0 : #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%self.path))]
733 : async fn doit(
734 : self,
735 : layer_writer: &mut ImageLayerWriter,
736 : ctx: &RequestContext,
737 : ) -> anyhow::Result<usize> {
738 : debug!("Importing relation file");
739 :
740 : let (rel_tag, start_blk) = self.key_range.start.to_rel_block()?;
741 : let (rel_tag_end, end_blk) = self.key_range.end.to_rel_block()?;
742 : assert_eq!(rel_tag, rel_tag_end);
743 :
744 : let ranges = (start_blk..end_blk)
745 : .enumerate()
746 0 : .filter_map(|(i, blknum)| {
747 0 : let key = rel_block_to_key(rel_tag, blknum);
748 0 : if self.shard_identity.is_key_disposable(&key) {
749 0 : return None;
750 0 : }
751 0 : let file_offset = i.checked_mul(8192).unwrap();
752 0 : Some((
753 0 : vec![key],
754 0 : file_offset,
755 0 : file_offset.checked_add(8192).unwrap(),
756 0 : ))
757 0 : })
758 0 : .coalesce(|(mut acc, acc_start, acc_end), (mut key, start, end)| {
759 0 : assert_eq!(key.len(), 1);
760 0 : assert!(!acc.is_empty());
761 0 : assert!(acc_end > acc_start);
762 0 : if acc_end == start /* TODO additional max range check here, to limit memory consumption per task to X */ {
763 0 : acc.push(key.pop().unwrap());
764 0 : Ok((acc, acc_start, end))
765 : } else {
766 0 : Err(((acc, acc_start, acc_end), (key, start, end)))
767 : }
768 0 : });
769 :
770 : let mut nimages = 0;
771 : for (keys, range_start, range_end) in ranges {
772 : let range_buf = self
773 : .storage
774 : .get_range(&self.path, range_start.into_u64(), range_end.into_u64())
775 : .await?;
776 : let mut buf = Bytes::from(range_buf);
777 : // TODO: batched writes
778 : for key in keys {
779 : let image = buf.split_to(8192);
780 : layer_writer.put_image(key, image, ctx).await?;
781 : nimages += 1;
782 : }
783 : }
784 :
785 : Ok(nimages)
786 : }
787 : }
788 :
789 : struct ImportSlruBlocksTask {
790 : key_range: Range<Key>,
791 : path: RemotePath,
792 : storage: RemoteStorageWrapper,
793 : }
794 :
795 : impl Hash for ImportSlruBlocksTask {
796 0 : fn hash<H: Hasher>(&self, state: &mut H) {
797 0 : let ImportSlruBlocksTask {
798 0 : key_range,
799 0 : path,
800 0 : storage: _,
801 0 : } = self;
802 0 :
803 0 : key_range.hash(state);
804 0 : path.hash(state);
805 0 : }
806 : }
807 :
808 : impl ImportSlruBlocksTask {
809 0 : fn new(key_range: Range<Key>, path: &RemotePath, storage: RemoteStorageWrapper) -> Self {
810 0 : ImportSlruBlocksTask {
811 0 : key_range,
812 0 : path: path.clone(),
813 0 : storage,
814 0 : }
815 0 : }
816 : }
817 :
818 : impl ImportTask for ImportSlruBlocksTask {
819 0 : fn key_range(&self) -> Range<Key> {
820 0 : self.key_range.clone()
821 0 : }
822 :
823 0 : async fn doit(
824 0 : self,
825 0 : layer_writer: &mut ImageLayerWriter,
826 0 : ctx: &RequestContext,
827 0 : ) -> anyhow::Result<usize> {
828 0 : debug!("Importing SLRU segment file {}", self.path);
829 0 : let buf = self.storage.get(&self.path).await?;
830 :
831 0 : let (kind, segno, start_blk) = self.key_range.start.to_slru_block()?;
832 0 : let (_kind, _segno, end_blk) = self.key_range.end.to_slru_block()?;
833 0 : let mut blknum = start_blk;
834 0 : let mut nimages = 0;
835 0 : let mut file_offset = 0;
836 0 : while blknum < end_blk {
837 0 : let key = slru_block_to_key(kind, segno, blknum);
838 0 : let buf = &buf[file_offset..(file_offset + 8192)];
839 0 : file_offset += 8192;
840 0 : layer_writer
841 0 : .put_image(key, Bytes::copy_from_slice(buf), ctx)
842 0 : .await?;
843 0 : nimages += 1;
844 0 : blknum += 1;
845 : }
846 0 : Ok(nimages)
847 0 : }
848 : }
849 :
850 : #[derive(Hash)]
851 : enum AnyImportTask {
852 : SingleKey(ImportSingleKeyTask),
853 : RelBlocks(ImportRelBlocksTask),
854 : SlruBlocks(ImportSlruBlocksTask),
855 : }
856 :
857 : impl ImportTask for AnyImportTask {
858 0 : fn key_range(&self) -> Range<Key> {
859 0 : match self {
860 0 : Self::SingleKey(t) => t.key_range(),
861 0 : Self::RelBlocks(t) => t.key_range(),
862 0 : Self::SlruBlocks(t) => t.key_range(),
863 : }
864 0 : }
865 : /// Returns the number of images put into the `layer_writer`.
866 0 : async fn doit(
867 0 : self,
868 0 : layer_writer: &mut ImageLayerWriter,
869 0 : ctx: &RequestContext,
870 0 : ) -> anyhow::Result<usize> {
871 0 : match self {
872 0 : Self::SingleKey(t) => t.doit(layer_writer, ctx).await,
873 0 : Self::RelBlocks(t) => t.doit(layer_writer, ctx).await,
874 0 : Self::SlruBlocks(t) => t.doit(layer_writer, ctx).await,
875 : }
876 0 : }
877 : }
878 :
879 : impl From<ImportSingleKeyTask> for AnyImportTask {
880 0 : fn from(t: ImportSingleKeyTask) -> Self {
881 0 : Self::SingleKey(t)
882 0 : }
883 : }
884 :
885 : impl From<ImportRelBlocksTask> for AnyImportTask {
886 0 : fn from(t: ImportRelBlocksTask) -> Self {
887 0 : Self::RelBlocks(t)
888 0 : }
889 : }
890 :
891 : impl From<ImportSlruBlocksTask> for AnyImportTask {
892 0 : fn from(t: ImportSlruBlocksTask) -> Self {
893 0 : Self::SlruBlocks(t)
894 0 : }
895 : }
896 :
897 : #[derive(Hash)]
898 : struct ChunkProcessingJob {
899 : range: Range<Key>,
900 : tasks: Vec<AnyImportTask>,
901 :
902 : pgdata_lsn: Lsn,
903 : }
904 :
905 : impl ChunkProcessingJob {
906 0 : fn new(range: Range<Key>, tasks: Vec<AnyImportTask>, pgdata_lsn: Lsn) -> Self {
907 0 : assert!(pgdata_lsn.is_valid());
908 0 : Self {
909 0 : range,
910 0 : tasks,
911 0 : pgdata_lsn,
912 0 : }
913 0 : }
914 :
915 0 : async fn run(self, timeline: Arc<Timeline>, ctx: &RequestContext) -> anyhow::Result<()> {
916 0 : let mut writer = ImageLayerWriter::new(
917 0 : timeline.conf,
918 0 : timeline.timeline_id,
919 0 : timeline.tenant_shard_id,
920 0 : &self.range,
921 0 : self.pgdata_lsn,
922 0 : &timeline.gate,
923 0 : timeline.cancel.clone(),
924 0 : ctx,
925 0 : )
926 0 : .await?;
927 :
928 0 : let mut nimages = 0;
929 0 : for task in self.tasks {
930 0 : nimages += task.doit(&mut writer, ctx).await?;
931 : }
932 :
933 0 : let resident_layer = if nimages > 0 {
934 0 : let (desc, path) = writer.finish(ctx).await?;
935 :
936 : {
937 0 : let guard = timeline.layers.read().await;
938 0 : let existing_layer = guard.try_get_from_key(&desc.key());
939 0 : if let Some(layer) = existing_layer {
940 0 : if layer.metadata().generation == timeline.generation {
941 0 : return Err(anyhow::anyhow!(
942 0 : "Import attempted to rewrite layer file in the same generation: {}",
943 0 : layer.local_path()
944 0 : ));
945 0 : }
946 0 : }
947 : }
948 :
949 0 : Layer::finish_creating(timeline.conf, &timeline, desc, &path)?
950 : } else {
951 : // dropping the writer cleans up
952 0 : return Ok(());
953 : };
954 :
955 : // The same import job might run multiple times since not every job is checkpointed.
956 : // Hence, we must support the case where the layer already exists. We cannot be
957 : // certain that the existing layer is identical to the new one, so in that case
958 : // we replace the old layer with the one we just generated.
959 :
960 0 : let mut guard = timeline.layers.write().await;
961 :
962 0 : let existing_layer = guard
963 0 : .try_get_from_key(&resident_layer.layer_desc().key())
964 0 : .cloned();
965 0 : match existing_layer {
966 0 : Some(existing) => {
967 0 : guard.open_mut()?.rewrite_layers(
968 0 : &[(existing.clone(), resident_layer.clone())],
969 0 : &[],
970 0 : &timeline.metrics,
971 0 : );
972 : }
973 : None => {
974 0 : guard
975 0 : .open_mut()?
976 0 : .track_new_image_layers(&[resident_layer.clone()], &timeline.metrics);
977 : }
978 : }
979 :
980 0 : crate::tenant::timeline::drop_wlock(guard);
981 0 :
982 0 : timeline
983 0 : .remote_client
984 0 : .schedule_layer_file_upload(resident_layer)?;
985 :
986 0 : Ok(())
987 0 : }
988 : }