LCOV - code coverage report
Current view: top level - compute_tools/src/bin - fast_import.rs (source / functions) Coverage Total Hit
Test: 1c072cf775425a8f441e533586eea4edd880d500.info Lines: 0.0 % 370 0
Test Date: 2025-02-14 11:35:56 Functions: 0.0 % 24 0

            Line data    Source code
       1              : //! This program dumps a remote Postgres database into a local Postgres database
       2              : //! and uploads the resulting PGDATA into object storage for import into a Timeline.
       3              : //!
       4              : //! # Context, Architecture, Design
       5              : //!
       6              : //! See cloud.git Fast Imports RFC (<https://github.com/neondatabase/cloud/pull/19799>)
       7              : //! for the full picture.
       8              : //! The RFC describing the storage pieces of importing the PGDATA dump into a Timeline
       9              : //! is publicly accessible at <https://github.com/neondatabase/neon/pull/9538>.
      10              : //!
      11              : //! # This is a Prototype!
      12              : //!
      13              : //! This program is part of a prototype feature and not yet used in production.
      14              : //!
      15              : //! The cloud.git RFC contains lots of suggestions for improving e2e throughput
      16              : //! of this step of the timeline import process.
      17              : //!
      18              : //! # Local Testing
      19              : //!
      20              : //! - Comment out most of the pgxns in compute-node.Dockerfile to speed up the build.
      21              : //! - Build the image with the following command:
      22              : //!
      23              : //! ```bash
      24              : //! docker buildx build --platform linux/amd64 --build-arg DEBIAN_VERSION=bullseye --build-arg GIT_VERSION=local --build-arg PG_VERSION=v14 --build-arg BUILD_TAG="$(date --iso-8601=s -u)" -t localhost:3030/localregistry/compute-node-v14:latest -f compute/compute-node.Dockerfile .
      25              : //! docker push localhost:3030/localregistry/compute-node-v14:latest
      26              : //! ```
      27              : 
      28              : use anyhow::Context;
      29              : use aws_config::BehaviorVersion;
      30              : use camino::{Utf8Path, Utf8PathBuf};
      31              : use clap::Parser;
      32              : use compute_tools::extension_server::{get_pg_version, PostgresMajorVersion};
      33              : use nix::unistd::Pid;
      34              : use tracing::{error, info, info_span, warn, Instrument};
      35              : use utils::fs_ext::is_directory_empty;
      36              : 
      37              : #[path = "fast_import/aws_s3_sync.rs"]
      38              : mod aws_s3_sync;
      39              : #[path = "fast_import/child_stdio_to_log.rs"]
      40              : mod child_stdio_to_log;
      41              : #[path = "fast_import/s3_uri.rs"]
      42              : mod s3_uri;
      43              : 
      44              : const PG_WAIT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(600); // main exits with status 1 if postgres is not connectable and neondb not created within this time
      45              : const PG_WAIT_RETRY_INTERVAL: std::time::Duration = std::time::Duration::from_millis(300); // sleep between connect / CREATE DATABASE attempts in main's wait loop
      46              : 
      47              : #[derive(clap::Parser)]
      48              : struct Args { // CLI arguments; main rejects the run unless exactly one of s3_prefix / source_connection_string is set
      49              :     #[clap(long)]
      50            0 :     working_directory: Utf8PathBuf, // created by main if absent; an existing directory must be empty. pgdata/, dumpdir/ and status/ live inside it
      51              :     #[clap(long, env = "NEON_IMPORTER_S3_PREFIX")]
      52              :     s3_prefix: Option<s3_uri::S3Uri>, // when set: spec.json is fetched from under this prefix, and pgdata/ + status/ are uploaded under it
      53              :     #[clap(long)]
      54              :     source_connection_string: Option<String>, // plaintext alternative to the S3 spec; handed to pg_dump as the source database
      55              :     #[clap(short, long)]
      56            0 :     interactive: bool, // after pg_restore, keep postgres running until Ctrl+C is received
      57              :     #[clap(long)]
      58            0 :     pg_bin_dir: Utf8PathBuf, // directory main joins "postgres", "initdb", "pg_dump" and "pg_restore" onto
      59              :     #[clap(long)]
      60            0 :     pg_lib_dir: Utf8PathBuf, // exported as LD_LIBRARY_PATH to the spawned binaries and given to initdb as library search path
      61              :     #[clap(long)]
      62              :     pg_port: Option<u16>, // port the local postgres listens on; main falls back to 5432 when omitted
      63              : 
      64              :     /// Number of CPUs in the system. This is used to configure # of
      65              :     /// parallel worker processes, for index creation.
      66              :     #[clap(long, env = "NEON_IMPORTER_NUM_CPUS")]
      67              :     num_cpus: Option<usize>, // main defaults this to num_cpus::get() and feeds it to the max_parallel_* / max_worker_processes settings
      68              : 
      69              :     /// Amount of RAM in the system. This is used to configure shared_buffers
      70              :     /// and maintenance_work_mem.
      71              :     #[clap(long, env = "NEON_IMPORTER_MEMORY_MB")]
      72              :     memory_mb: Option<usize>, // main defaults this to 256 and uses 10% for shared_buffers, 70% for maintenance_work_mem
      73              : }
      74              : 
      75              : #[serde_with::serde_as]
      76            0 : #[derive(serde::Deserialize)]
      77              : struct Spec { // deserialized by main from the JSON object at <s3_prefix>/spec.json
      78              :     encryption_secret: EncryptionSecret, // selects how the ciphertext below gets decrypted
      79              :     #[serde_as(as = "serde_with::base64::Base64")]
      80              :     source_connstring_ciphertext_base64: Vec<u8>, // base64 text in the JSON, held here as bytes; main passes them to KMS Decrypt as the ciphertext blob
      81              : }
      82              : 
      83            0 : #[derive(serde::Deserialize)]
      84              : enum EncryptionSecret { // only one scheme exists so far; main matches on it exhaustively
      85              :     #[allow(clippy::upper_case_acronyms)]
      86              :     KMS { key_id: String }, // key_id is forwarded to the AWS KMS decrypt request in main
      87              : }
      88              : 
      89              : // Locale handed to initdb by main. Copied from pageserver_api::config::defaults::DEFAULT_LOCALE to avoid a dependency just for a constant.
      90              : const DEFAULT_LOCALE: &str = if cfg!(target_os = "macos") {
      91              :     "C"
      92              : } else {
      93              :     "C.UTF-8"
      94              : };
      95              : /// Runs the whole import: resolve the source connstring, initdb and start a local postgres, pg_dump + pg_restore into `neondb`, stop postgres, then (only with an S3 prefix) upload pgdata and a status file.
      96              : #[tokio::main]
      97            0 : pub(crate) async fn main() -> anyhow::Result<()> {
      98            0 :     utils::logging::init(
      99            0 :         utils::logging::LogFormat::Plain,
     100            0 :         utils::logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
     101            0 :         utils::logging::Output::Stdout,
     102            0 :     )?;
     103            0 : 
     104            0 :     info!("starting");
     105            0 : 
     106            0 :     let args = Args::parse();
     107            0 : 
     108            0 :     // Validate arguments: exactly one source for the connection string must be given
     109            0 :     if args.s3_prefix.is_none() && args.source_connection_string.is_none() {
     110            0 :         anyhow::bail!("either s3_prefix or source_connection_string must be specified");
     111            0 :     }
     112            0 :     if args.s3_prefix.is_some() && args.source_connection_string.is_some() {
     113            0 :         anyhow::bail!("only one of s3_prefix or source_connection_string can be specified");
     114            0 :     }
     115            0 : 
     116            0 :     let working_directory = args.working_directory;
     117            0 :     let pg_bin_dir = args.pg_bin_dir;
     118            0 :     let pg_lib_dir = args.pg_lib_dir;
     119            0 :     let pg_port = args.pg_port.unwrap_or_else(|| {
     120            0 :         info!("pg_port not specified, using default 5432");
     121            0 :         5432
     122            0 :     });
     123            0 : 
     124            0 :     // Initialize AWS clients only if s3_prefix is specified
     125            0 :     let (aws_config, kms_client) = if args.s3_prefix.is_some() {
     126            0 :         let config = aws_config::load_defaults(BehaviorVersion::v2024_03_28()).await;
     127            0 :         let kms = aws_sdk_kms::Client::new(&config);
     128            0 :         (Some(config), Some(kms))
     129            0 :     } else {
     130            0 :         (None, None)
     131            0 :     };
     132            0 : 
     133            0 :     // Get source connection string either from S3 spec or direct argument
     134            0 :     let source_connection_string = if let Some(s3_prefix) = &args.s3_prefix {
     135            0 :         let spec: Spec = {
     136            0 :             let spec_key = s3_prefix.append("/spec.json");
     137            0 :             let s3_client = aws_sdk_s3::Client::new(aws_config.as_ref().unwrap()); // Some whenever s3_prefix is Some, see above
     138            0 :             let object = s3_client
     139            0 :                 .get_object()
     140            0 :                 .bucket(&spec_key.bucket)
     141            0 :                 .key(spec_key.key)
     142            0 :                 .send()
     143            0 :                 .await
     144            0 :                 .context("get spec from s3")?
     145            0 :                 .body
     146            0 :                 .collect()
     147            0 :                 .await
     148            0 :                 .context("download spec body")?;
     149            0 :             serde_json::from_slice(&object.into_bytes()).context("parse spec as json")?
     150            0 :         };
     151            0 : 
     152            0 :         match spec.encryption_secret {
     153            0 :             EncryptionSecret::KMS { key_id } => {
     154            0 :                 let mut output = kms_client
     155            0 :                     .unwrap() // Some whenever s3_prefix is Some, see above
     156            0 :                     .decrypt()
     157            0 :                     .key_id(key_id)
     158            0 :                     .ciphertext_blob(aws_sdk_s3::primitives::Blob::new(
     159            0 :                         spec.source_connstring_ciphertext_base64,
     160            0 :                     ))
     161            0 :                     .send()
     162            0 :                     .await
     163            0 :                     .context("decrypt source connection string")?;
     164            0 :                 let plaintext = output
     165            0 :                     .plaintext
     166            0 :                     .take()
     167            0 :                     .context("get plaintext source connection string")?;
     168            0 :                 String::from_utf8(plaintext.into_inner())
     169            0 :                     .context("parse source connection string as utf8")?
     170            0 :             }
     171            0 :         }
     172            0 :     } else {
     173            0 :         args.source_connection_string.unwrap() // validated above: set whenever s3_prefix is not
     174            0 :     };
     175            0 :     // Create the working directory, or accept an existing one only if it is empty
     176            0 :     match tokio::fs::create_dir(&working_directory).await {
     177            0 :         Ok(()) => {}
     178            0 :         Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
     179            0 :             if !is_directory_empty(&working_directory)
     180            0 :                 .await
     181            0 :                 .context("check if working directory is empty")?
     182            0 :             {
     183            0 :                 anyhow::bail!("working directory is not empty");
     184            0 :             } else {
     185            0 :                 // ok
     186            0 :             }
     187            0 :         }
     188            0 :         Err(e) => return Err(anyhow::Error::new(e).context("create working directory")),
     189            0 :     }
     190            0 : 
     191            0 :     let pgdata_dir = working_directory.join("pgdata");
     192            0 :     tokio::fs::create_dir(&pgdata_dir)
     193            0 :         .await
     194            0 :         .context("create pgdata directory")?;
     195            0 :     // Turn the version reported by get_pg_version for <pg_bin_dir>/postgres into the number passed to initdb
     196            0 :     let pgbin = pg_bin_dir.join("postgres");
     197            0 :     let pg_version = match get_pg_version(pgbin.as_ref()) {
     198            0 :         PostgresMajorVersion::V14 => 14,
     199            0 :         PostgresMajorVersion::V15 => 15,
     200            0 :         PostgresMajorVersion::V16 => 16,
     201            0 :         PostgresMajorVersion::V17 => 17,
     202            0 :     };
     203            0 :     let superuser = "cloud_admin"; // XXX: this shouldn't be hard-coded
     204            0 :     postgres_initdb::do_run_initdb(postgres_initdb::RunInitdbArgs {
     205            0 :         superuser,
     206            0 :         locale: DEFAULT_LOCALE, // XXX: this shouldn't be hard-coded,
     207            0 :         pg_version,
     208            0 :         initdb_bin: pg_bin_dir.join("initdb").as_ref(),
     209            0 :         library_search_path: &pg_lib_dir, // TODO: is this right? Prob works in compute image, not sure about neon_local.
     210            0 :         pgdata: &pgdata_dir,
     211            0 :     })
     212            0 :     .await
     213            0 :     .context("initdb")?;
     214            0 : 
     215            0 :     // If the caller didn't specify CPU / RAM to use for sizing, default to
     216            0 :     // number of CPUs in the system, and pretty arbitrarily, 256 MB of RAM.
     217            0 :     let nproc = args.num_cpus.unwrap_or_else(num_cpus::get);
     218            0 :     let memory_mb = args.memory_mb.unwrap_or(256);
     219            0 : 
     220            0 :     // Somewhat arbitrarily, use 10 % of memory for shared buffer cache, 70% for
     221            0 :     // maintenance_work_mem (i.e. for sorting during index creation), and leave the rest
     222            0 :     // available for misc other stuff that PostgreSQL uses memory for.
     223            0 :     let shared_buffers_mb = ((memory_mb as f32) * 0.10) as usize;
     224            0 :     let maintenance_work_mem_mb = ((memory_mb as f32) * 0.70) as usize;
     225            0 : 
     226            0 :     //
     227            0 :     // Launch postgres process
     228            0 :     //
     229            0 :     let mut postgres_proc = tokio::process::Command::new(pgbin)
     230            0 :         .arg("-D")
     231            0 :         .arg(&pgdata_dir)
     232            0 :         .args(["-p", &format!("{pg_port}")])
     233            0 :         .args(["-c", "wal_level=minimal"])
     234            0 :         .args(["-c", &format!("shared_buffers={shared_buffers_mb}MB")])
     235            0 :         .args(["-c", "max_wal_senders=0"])
     236            0 :         .args(["-c", "fsync=off"])
     237            0 :         .args(["-c", "full_page_writes=off"])
     238            0 :         .args(["-c", "synchronous_commit=off"])
     239            0 :         .args([
     240            0 :             "-c",
     241            0 :             &format!("maintenance_work_mem={maintenance_work_mem_mb}MB"),
     242            0 :         ])
     243            0 :         .args(["-c", &format!("max_parallel_maintenance_workers={nproc}")])
     244            0 :         .args(["-c", &format!("max_parallel_workers={nproc}")])
     245            0 :         .args(["-c", &format!("max_parallel_workers_per_gather={nproc}")])
     246            0 :         .args(["-c", &format!("max_worker_processes={nproc}")])
     247            0 :         .args([
     248            0 :             "-c",
     249            0 :             &format!(
     250            0 :                 "effective_io_concurrency={}",
     251            0 :                 if cfg!(target_os = "macos") { 0 } else { 100 }
     252            0 :             ),
     253            0 :         ])
     254            0 :         .env_clear() // start from an empty environment; only the variables below are passed on
     255            0 :         .env("LD_LIBRARY_PATH", &pg_lib_dir)
     256            0 :         .env(
     257            0 :             "ASAN_OPTIONS",
     258            0 :             std::env::var("ASAN_OPTIONS").unwrap_or_default(),
     259            0 :         )
     260            0 :         .env(
     261            0 :             "UBSAN_OPTIONS",
     262            0 :             std::env::var("UBSAN_OPTIONS").unwrap_or_default(),
     263            0 :         )
     264            0 :         .stdout(std::process::Stdio::piped())
     265            0 :         .stderr(std::process::Stdio::piped())
     266            0 :         .spawn()
     267            0 :         .context("spawn postgres")?;
     268            0 : 
     269            0 :     info!("spawned postgres, waiting for it to become ready");
     270            0 :     tokio::spawn(
     271            0 :         child_stdio_to_log::relay_process_output(
     272            0 :             postgres_proc.stdout.take(),
     273            0 :             postgres_proc.stderr.take(),
     274            0 :         )
     275            0 :         .instrument(info_span!("postgres")),
     276            0 :     );
     277            0 : 
     278            0 :     // Connection string for the local postgres: used to create neondb here, re-pointed at neondb for pg_restore below
     279            0 :     let restore_pg_connstring =
     280            0 :         format!("host=localhost port={pg_port} user={superuser} dbname=postgres");
     281            0 : 
     282            0 :     let start_time = std::time::Instant::now();
     283            0 :     // Poll until postgres accepts a connection and CREATE DATABASE succeeds, or PG_WAIT_TIMEOUT elapses
     284            0 :     loop {
     285            0 :         if start_time.elapsed() > PG_WAIT_TIMEOUT {
     286            0 :             error!(
     287            0 :                 "timeout exceeded: failed to poll postgres and create database within 10 minutes"
     288            0 :             );
     289            0 :             std::process::exit(1);
     290            0 :         }
     291            0 : 
     292            0 :         match tokio_postgres::connect(&restore_pg_connstring, tokio_postgres::NoTls).await {
     293            0 :             Ok((client, connection)) => {
     294            0 :                 // Spawn the connection handling task to maintain the connection
     295            0 :                 tokio::spawn(async move {
     296            0 :                     if let Err(e) = connection.await {
     297            0 :                         warn!("connection error: {}", e);
     298            0 :                     }
     299            0 :                 });
     300            0 : 
     301            0 :                 match client.simple_query("CREATE DATABASE neondb;").await {
     302            0 :                     Ok(_) => {
     303            0 :                         info!("created neondb database");
     304            0 :                         break;
     305            0 :                     }
     306            0 :                     Err(e) => {
     307            0 :                         warn!(
     308            0 :                             "failed to create database: {}, retrying in {}s",
     309            0 :                             e,
     310            0 :                             PG_WAIT_RETRY_INTERVAL.as_secs_f32()
     311            0 :                         );
     312            0 :                         tokio::time::sleep(PG_WAIT_RETRY_INTERVAL).await;
     313            0 :                         continue;
     314            0 :                     }
     315            0 :                 }
     316            0 :             }
     317            0 :             Err(_) => {
     318            0 :                 info!(
     319            0 :                     "postgres not ready yet, retrying in {}s",
     320            0 :                     PG_WAIT_RETRY_INTERVAL.as_secs_f32()
     321            0 :                 );
     322            0 :                 tokio::time::sleep(PG_WAIT_RETRY_INTERVAL).await;
     323            0 :                 continue;
     324            0 :             }
     325            0 :         }
     326            0 :     }
     327            0 : 
     328            0 :     let restore_pg_connstring = restore_pg_connstring.replace("dbname=postgres", "dbname=neondb");
     329            0 :     // pg_dump writes its directory-format dump here; pg_restore reads it back
     330            0 :     let dumpdir = working_directory.join("dumpdir");
     331            0 : 
     332            0 :     let common_args = [
     333            0 :         // schema mapping (prob suffices to specify them on one side)
     334            0 :         "--no-owner".to_string(),
     335            0 :         "--no-privileges".to_string(),
     336            0 :         "--no-publications".to_string(),
     337            0 :         "--no-security-labels".to_string(),
     338            0 :         "--no-subscriptions".to_string(),
     339            0 :         "--no-tablespaces".to_string(),
     340            0 :         // format
     341            0 :         "--format".to_string(),
     342            0 :         "directory".to_string(),
     343            0 :         // concurrency
     344            0 :         "--jobs".to_string(),
     345            0 :         num_cpus::get().to_string(), // NOTE(review): uses the detected CPU count, not `nproc` (--num-cpus) — confirm this is intended
     346            0 :         // progress updates
     347            0 :         "--verbose".to_string(),
     348            0 :     ];
     349            0 : 
     350            0 :     info!("dump into the working directory");
     351            0 :     {
     352            0 :         let mut pg_dump = tokio::process::Command::new(pg_bin_dir.join("pg_dump"))
     353            0 :             .args(&common_args)
     354            0 :             .arg("-f")
     355            0 :             .arg(&dumpdir)
     356            0 :             .arg("--no-sync")
     357            0 :             // POSITIONAL args
     358            0 :             // source db (db name included in connection string)
     359            0 :             .arg(&source_connection_string)
     360            0 :             // how we run it
     361            0 :             .env_clear()
     362            0 :             .env("LD_LIBRARY_PATH", &pg_lib_dir)
     363            0 :             .kill_on_drop(true)
     364            0 :             .stdout(std::process::Stdio::piped())
     365            0 :             .stderr(std::process::Stdio::piped())
     366            0 :             .spawn()
     367            0 :             .context("spawn pg_dump")?;
     368            0 : 
     369            0 :         info!(pid=%pg_dump.id().unwrap(), "spawned pg_dump");
     370            0 : 
     371            0 :         tokio::spawn(
     372            0 :             child_stdio_to_log::relay_process_output(pg_dump.stdout.take(), pg_dump.stderr.take())
     373            0 :                 .instrument(info_span!("pg_dump")),
     374            0 :         );
     375            0 : 
     376            0 :         let st = pg_dump.wait().await.context("wait for pg_dump")?;
     377            0 :         info!(status=?st, "pg_dump exited");
     378            0 :         if !st.success() { // only logged: execution continues with pg_restore
     379            0 :             warn!(status=%st, "pg_dump failed, restore will likely fail as well");
     380            0 :         }
     381            0 :     }
     382            0 : 
     383            0 :     // TODO: do it in a streaming way, plenty of internal research done on this already
     384            0 :     // TODO: do the unlogged table trick
     385            0 : 
     386            0 :     info!("restore from working directory into vanilla postgres");
     387            0 :     {
     388            0 :         let mut pg_restore = tokio::process::Command::new(pg_bin_dir.join("pg_restore"))
     389            0 :             .args(&common_args)
     390            0 :             .arg("-d")
     391            0 :             .arg(&restore_pg_connstring)
     392            0 :             // POSITIONAL args
     393            0 :             .arg(&dumpdir)
     394            0 :             // how we run it
     395            0 :             .env_clear()
     396            0 :             .env("LD_LIBRARY_PATH", &pg_lib_dir)
     397            0 :             .kill_on_drop(true)
     398            0 :             .stdout(std::process::Stdio::piped())
     399            0 :             .stderr(std::process::Stdio::piped())
     400            0 :             .spawn()
     401            0 :             .context("spawn pg_restore")?;
     402            0 : 
     403            0 :         info!(pid=%pg_restore.id().unwrap(), "spawned pg_restore");
     404            0 :         tokio::spawn(
     405            0 :             child_stdio_to_log::relay_process_output(
     406            0 :                 pg_restore.stdout.take(),
     407            0 :                 pg_restore.stderr.take(),
     408            0 :             )
     409            0 :             .instrument(info_span!("pg_restore")),
     410            0 :         );
     411            0 :         let st = pg_restore.wait().await.context("wait for pg_restore")?;
     412            0 :         info!(status=?st, "pg_restore exited");
     413            0 :         if !st.success() { // NOTE(review): only logged — pgdata upload and the done=true status file still follow; confirm intended
     414            0 :             warn!(status=%st, "pg_restore failed, the restored database may be incomplete");
     415            0 :         }
     416            0 :     }
     417            0 : 
     418            0 :     // If interactive mode, wait for Ctrl+C
     419            0 :     if args.interactive {
     420            0 :         info!("Running in interactive mode. Press Ctrl+C to shut down.");
     421            0 :         tokio::signal::ctrl_c().await.context("wait for ctrl-c")?;
     422            0 :     }
     423            0 :     // Send SIGTERM to postgres and wait for it to exit before pgdata is uploaded
     424            0 :     info!("shutdown postgres");
     425            0 :     {
     426            0 :         nix::sys::signal::kill(
     427            0 :             Pid::from_raw(
     428            0 :                 i32::try_from(postgres_proc.id().unwrap()).expect("convert child pid to i32"),
     429            0 :             ),
     430            0 :             nix::sys::signal::SIGTERM,
     431            0 :         )
     432            0 :         .context("signal postgres to shut down")?;
     433            0 :         postgres_proc
     434            0 :             .wait()
     435            0 :             .await
     436            0 :             .context("wait for postgres to shut down")?;
     437            0 :     }
     438            0 : 
     439            0 :     // Only sync if s3_prefix was specified
     440            0 :     if let Some(s3_prefix) = args.s3_prefix {
     441            0 :         info!("upload pgdata");
     442            0 :         aws_s3_sync::sync(Utf8Path::new(&pgdata_dir), &s3_prefix.append("/pgdata/"))
     443            0 :             .await
     444            0 :             .context("sync pgdata directory to destination")?;
     445            0 : 
     446            0 :         info!("write status");
     447            0 :         {
     448            0 :             let status_dir = working_directory.join("status");
     449            0 :             std::fs::create_dir(&status_dir).context("create status directory")?;
     450            0 :             let status_file = status_dir.join("pgdata");
     451            0 :             std::fs::write(&status_file, serde_json::json!({"done": true}).to_string())
     452            0 :                 .context("write status file")?;
     453            0 :             aws_s3_sync::sync(&status_dir, &s3_prefix.append("/status/"))
     454            0 :                 .await
     455            0 :                 .context("sync status directory to destination")?;
     456            0 :         }
     457            0 :     }
     458            0 : 
     459            0 :     Ok(())
     460            0 : }
        

Generated by: LCOV version 2.1-beta