Line data Source code
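//! This program dumps a remote Postgres database into a local Postgres database
//! and, when an S3 prefix is given, uploads the resulting PGDATA into object storage
//! for import into a Timeline. Without an S3 prefix, the source connection string is
//! taken from the command line and PGDATA is left in the local working directory.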
3 : //!
4 : //! # Context, Architecture, Design
5 : //!
6 : //! See cloud.git Fast Imports RFC (<https://github.com/neondatabase/cloud/pull/19799>)
7 : //! for the full picture.
8 : //! The RFC describing the storage pieces of importing the PGDATA dump into a Timeline
9 : //! is publicly accessible at <https://github.com/neondatabase/neon/pull/9538>.
10 : //!
11 : //! # This is a Prototype!
12 : //!
13 : //! This program is part of a prototype feature and not yet used in production.
14 : //!
15 : //! The cloud.git RFC contains lots of suggestions for improving e2e throughput
16 : //! of this step of the timeline import process.
17 : //!
18 : //! # Local Testing
19 : //!
//! - Comment out most of the PGXN extensions in `compute/compute-node.Dockerfile` to speed up the build.
21 : //! - Build the image with the following command:
22 : //!
23 : //! ```bash
24 : //! docker buildx build --platform linux/amd64 --build-arg DEBIAN_VERSION=bullseye --build-arg GIT_VERSION=local --build-arg PG_VERSION=v14 --build-arg BUILD_TAG="$(date --iso-8601=s -u)" -t localhost:3030/localregistry/compute-node-v14:latest -f compute/compute-node.Dockerfile .
25 : //! docker push localhost:3030/localregistry/compute-node-v14:latest
26 : //! ```
27 :
28 : use anyhow::Context;
29 : use aws_config::BehaviorVersion;
30 : use camino::{Utf8Path, Utf8PathBuf};
31 : use clap::Parser;
32 : use compute_tools::extension_server::{get_pg_version, PostgresMajorVersion};
33 : use nix::unistd::Pid;
34 : use tracing::{error, info, info_span, warn, Instrument};
35 : use utils::fs_ext::is_directory_empty;
36 :
37 : #[path = "fast_import/aws_s3_sync.rs"]
38 : mod aws_s3_sync;
39 : #[path = "fast_import/child_stdio_to_log.rs"]
40 : mod child_stdio_to_log;
41 : #[path = "fast_import/s3_uri.rs"]
42 : mod s3_uri;
43 :
/// Upper bound on how long `main` keeps polling the freshly started local postgres
/// (connect + `CREATE DATABASE neondb`) before giving up: ten minutes.
const PG_WAIT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10 * 60);
/// Pause between two consecutive readiness probes of the local postgres.
const PG_WAIT_RETRY_INTERVAL: std::time::Duration = std::time::Duration::from_millis(3 * 100);
46 :
47 : #[derive(clap::Parser)]
48 : struct Args {
49 : #[clap(long)]
50 0 : working_directory: Utf8PathBuf,
51 : #[clap(long, env = "NEON_IMPORTER_S3_PREFIX")]
52 : s3_prefix: Option<s3_uri::S3Uri>,
53 : #[clap(long)]
54 : source_connection_string: Option<String>,
55 : #[clap(short, long)]
56 0 : interactive: bool,
57 : #[clap(long)]
58 0 : pg_bin_dir: Utf8PathBuf,
59 : #[clap(long)]
60 0 : pg_lib_dir: Utf8PathBuf,
61 : #[clap(long)]
62 : pg_port: Option<u16>, // port to run postgres on, 5432 is default
63 :
64 : /// Number of CPUs in the system. This is used to configure # of
65 : /// parallel worker processes, for index creation.
66 : #[clap(long, env = "NEON_IMPORTER_NUM_CPUS")]
67 : num_cpus: Option<usize>,
68 :
69 : /// Amount of RAM in the system. This is used to configure shared_buffers
70 : /// and maintenance_work_mem.
71 : #[clap(long, env = "NEON_IMPORTER_MEMORY_MB")]
72 : memory_mb: Option<usize>,
73 : }
74 :
/// Contents of `<s3_prefix>/spec.json`, downloaded and parsed as JSON by `main`
/// in S3 mode.
///
/// The field names below are the JSON keys, so they must not be renamed.
// `serde_as` must stay above the `derive` so it can rewrite the field's
// `#[serde_as(...)]` attribute before serde's derive runs.
#[serde_with::serde_as]
#[derive(serde::Deserialize)]
struct Spec {
    /// How `source_connstring_ciphertext_base64` was encrypted.
    encryption_secret: EncryptionSecret,
    /// Encrypted source connection string: a base64 string in the JSON,
    /// decoded into raw ciphertext bytes during deserialization.
    #[serde_as(as = "serde_with::base64::Base64")]
    source_connstring_ciphertext_base64: Vec<u8>,
}
82 :
/// Secret used to encrypt the source connection string carried in [`Spec`].
///
/// There is no `#[serde(tag = ...)]` attribute, so serde's default externally
/// tagged representation applies, e.g. `{"KMS": {"key_id": "..."}}`.
#[derive(serde::Deserialize)]
enum EncryptionSecret {
    /// Decrypt with AWS KMS using the key identified by `key_id`.
    // The all-caps variant name doubles as the JSON tag, hence the clippy allow.
    #[allow(clippy::upper_case_acronyms)]
    KMS { key_id: String },
}
88 :
// Mirrors pageserver_api::config::defaults::DEFAULT_LOCALE; duplicated here so this
// binary does not need that dependency for a single constant.
// macOS gets plain "C"; every other target uses the UTF-8 flavour.
#[cfg(target_os = "macos")]
const DEFAULT_LOCALE: &str = "C";
#[cfg(not(target_os = "macos"))]
const DEFAULT_LOCALE: &str = "C.UTF-8";
95 :
96 : #[tokio::main]
97 0 : pub(crate) async fn main() -> anyhow::Result<()> {
98 0 : utils::logging::init(
99 0 : utils::logging::LogFormat::Plain,
100 0 : utils::logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
101 0 : utils::logging::Output::Stdout,
102 0 : )?;
103 0 :
104 0 : info!("starting");
105 0 :
106 0 : let args = Args::parse();
107 0 :
108 0 : // Validate arguments
109 0 : if args.s3_prefix.is_none() && args.source_connection_string.is_none() {
110 0 : anyhow::bail!("either s3_prefix or source_connection_string must be specified");
111 0 : }
112 0 : if args.s3_prefix.is_some() && args.source_connection_string.is_some() {
113 0 : anyhow::bail!("only one of s3_prefix or source_connection_string can be specified");
114 0 : }
115 0 :
116 0 : let working_directory = args.working_directory;
117 0 : let pg_bin_dir = args.pg_bin_dir;
118 0 : let pg_lib_dir = args.pg_lib_dir;
119 0 : let pg_port = args.pg_port.unwrap_or_else(|| {
120 0 : info!("pg_port not specified, using default 5432");
121 0 : 5432
122 0 : });
123 0 :
124 0 : // Initialize AWS clients only if s3_prefix is specified
125 0 : let (aws_config, kms_client) = if args.s3_prefix.is_some() {
126 0 : let config = aws_config::load_defaults(BehaviorVersion::v2024_03_28()).await;
127 0 : let kms = aws_sdk_kms::Client::new(&config);
128 0 : (Some(config), Some(kms))
129 0 : } else {
130 0 : (None, None)
131 0 : };
132 0 :
133 0 : // Get source connection string either from S3 spec or direct argument
134 0 : let source_connection_string = if let Some(s3_prefix) = &args.s3_prefix {
135 0 : let spec: Spec = {
136 0 : let spec_key = s3_prefix.append("/spec.json");
137 0 : let s3_client = aws_sdk_s3::Client::new(aws_config.as_ref().unwrap());
138 0 : let object = s3_client
139 0 : .get_object()
140 0 : .bucket(&spec_key.bucket)
141 0 : .key(spec_key.key)
142 0 : .send()
143 0 : .await
144 0 : .context("get spec from s3")?
145 0 : .body
146 0 : .collect()
147 0 : .await
148 0 : .context("download spec body")?;
149 0 : serde_json::from_slice(&object.into_bytes()).context("parse spec as json")?
150 0 : };
151 0 :
152 0 : match spec.encryption_secret {
153 0 : EncryptionSecret::KMS { key_id } => {
154 0 : let mut output = kms_client
155 0 : .unwrap()
156 0 : .decrypt()
157 0 : .key_id(key_id)
158 0 : .ciphertext_blob(aws_sdk_s3::primitives::Blob::new(
159 0 : spec.source_connstring_ciphertext_base64,
160 0 : ))
161 0 : .send()
162 0 : .await
163 0 : .context("decrypt source connection string")?;
164 0 : let plaintext = output
165 0 : .plaintext
166 0 : .take()
167 0 : .context("get plaintext source connection string")?;
168 0 : String::from_utf8(plaintext.into_inner())
169 0 : .context("parse source connection string as utf8")?
170 0 : }
171 0 : }
172 0 : } else {
173 0 : args.source_connection_string.unwrap()
174 0 : };
175 0 :
176 0 : match tokio::fs::create_dir(&working_directory).await {
177 0 : Ok(()) => {}
178 0 : Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
179 0 : if !is_directory_empty(&working_directory)
180 0 : .await
181 0 : .context("check if working directory is empty")?
182 0 : {
183 0 : anyhow::bail!("working directory is not empty");
184 0 : } else {
185 0 : // ok
186 0 : }
187 0 : }
188 0 : Err(e) => return Err(anyhow::Error::new(e).context("create working directory")),
189 0 : }
190 0 :
191 0 : let pgdata_dir = working_directory.join("pgdata");
192 0 : tokio::fs::create_dir(&pgdata_dir)
193 0 : .await
194 0 : .context("create pgdata directory")?;
195 0 :
196 0 : let pgbin = pg_bin_dir.join("postgres");
197 0 : let pg_version = match get_pg_version(pgbin.as_ref()) {
198 0 : PostgresMajorVersion::V14 => 14,
199 0 : PostgresMajorVersion::V15 => 15,
200 0 : PostgresMajorVersion::V16 => 16,
201 0 : PostgresMajorVersion::V17 => 17,
202 0 : };
203 0 : let superuser = "cloud_admin"; // XXX: this shouldn't be hard-coded
204 0 : postgres_initdb::do_run_initdb(postgres_initdb::RunInitdbArgs {
205 0 : superuser,
206 0 : locale: DEFAULT_LOCALE, // XXX: this shouldn't be hard-coded,
207 0 : pg_version,
208 0 : initdb_bin: pg_bin_dir.join("initdb").as_ref(),
209 0 : library_search_path: &pg_lib_dir, // TODO: is this right? Prob works in compute image, not sure about neon_local.
210 0 : pgdata: &pgdata_dir,
211 0 : })
212 0 : .await
213 0 : .context("initdb")?;
214 0 :
215 0 : // If the caller didn't specify CPU / RAM to use for sizing, default to
216 0 : // number of CPUs in the system, and pretty arbitrarily, 256 MB of RAM.
217 0 : let nproc = args.num_cpus.unwrap_or_else(num_cpus::get);
218 0 : let memory_mb = args.memory_mb.unwrap_or(256);
219 0 :
220 0 : // Somewhat arbitrarily, use 10 % of memory for shared buffer cache, 70% for
221 0 : // maintenance_work_mem (i.e. for sorting during index creation), and leave the rest
222 0 : // available for misc other stuff that PostgreSQL uses memory for.
223 0 : let shared_buffers_mb = ((memory_mb as f32) * 0.10) as usize;
224 0 : let maintenance_work_mem_mb = ((memory_mb as f32) * 0.70) as usize;
225 0 :
226 0 : //
227 0 : // Launch postgres process
228 0 : //
229 0 : let mut postgres_proc = tokio::process::Command::new(pgbin)
230 0 : .arg("-D")
231 0 : .arg(&pgdata_dir)
232 0 : .args(["-p", &format!("{pg_port}")])
233 0 : .args(["-c", "wal_level=minimal"])
234 0 : .args(["-c", &format!("shared_buffers={shared_buffers_mb}MB")])
235 0 : .args(["-c", "max_wal_senders=0"])
236 0 : .args(["-c", "fsync=off"])
237 0 : .args(["-c", "full_page_writes=off"])
238 0 : .args(["-c", "synchronous_commit=off"])
239 0 : .args([
240 0 : "-c",
241 0 : &format!("maintenance_work_mem={maintenance_work_mem_mb}MB"),
242 0 : ])
243 0 : .args(["-c", &format!("max_parallel_maintenance_workers={nproc}")])
244 0 : .args(["-c", &format!("max_parallel_workers={nproc}")])
245 0 : .args(["-c", &format!("max_parallel_workers_per_gather={nproc}")])
246 0 : .args(["-c", &format!("max_worker_processes={nproc}")])
247 0 : .args([
248 0 : "-c",
249 0 : &format!(
250 0 : "effective_io_concurrency={}",
251 0 : if cfg!(target_os = "macos") { 0 } else { 100 }
252 0 : ),
253 0 : ])
254 0 : .env_clear()
255 0 : .env("LD_LIBRARY_PATH", &pg_lib_dir)
256 0 : .env(
257 0 : "ASAN_OPTIONS",
258 0 : std::env::var("ASAN_OPTIONS").unwrap_or_default(),
259 0 : )
260 0 : .env(
261 0 : "UBSAN_OPTIONS",
262 0 : std::env::var("UBSAN_OPTIONS").unwrap_or_default(),
263 0 : )
264 0 : .stdout(std::process::Stdio::piped())
265 0 : .stderr(std::process::Stdio::piped())
266 0 : .spawn()
267 0 : .context("spawn postgres")?;
268 0 :
269 0 : info!("spawned postgres, waiting for it to become ready");
270 0 : tokio::spawn(
271 0 : child_stdio_to_log::relay_process_output(
272 0 : postgres_proc.stdout.take(),
273 0 : postgres_proc.stderr.take(),
274 0 : )
275 0 : .instrument(info_span!("postgres")),
276 0 : );
277 0 :
278 0 : // Create neondb database in the running postgres
279 0 : let restore_pg_connstring =
280 0 : format!("host=localhost port={pg_port} user={superuser} dbname=postgres");
281 0 :
282 0 : let start_time = std::time::Instant::now();
283 0 :
284 0 : loop {
285 0 : if start_time.elapsed() > PG_WAIT_TIMEOUT {
286 0 : error!(
287 0 : "timeout exceeded: failed to poll postgres and create database within 10 minutes"
288 0 : );
289 0 : std::process::exit(1);
290 0 : }
291 0 :
292 0 : match tokio_postgres::connect(&restore_pg_connstring, tokio_postgres::NoTls).await {
293 0 : Ok((client, connection)) => {
294 0 : // Spawn the connection handling task to maintain the connection
295 0 : tokio::spawn(async move {
296 0 : if let Err(e) = connection.await {
297 0 : warn!("connection error: {}", e);
298 0 : }
299 0 : });
300 0 :
301 0 : match client.simple_query("CREATE DATABASE neondb;").await {
302 0 : Ok(_) => {
303 0 : info!("created neondb database");
304 0 : break;
305 0 : }
306 0 : Err(e) => {
307 0 : warn!(
308 0 : "failed to create database: {}, retying in {}s",
309 0 : e,
310 0 : PG_WAIT_RETRY_INTERVAL.as_secs_f32()
311 0 : );
312 0 : tokio::time::sleep(PG_WAIT_RETRY_INTERVAL).await;
313 0 : continue;
314 0 : }
315 0 : }
316 0 : }
317 0 : Err(_) => {
318 0 : info!(
319 0 : "postgres not ready yet, retrying in {}s",
320 0 : PG_WAIT_RETRY_INTERVAL.as_secs_f32()
321 0 : );
322 0 : tokio::time::sleep(PG_WAIT_RETRY_INTERVAL).await;
323 0 : continue;
324 0 : }
325 0 : }
326 0 : }
327 0 :
328 0 : let restore_pg_connstring = restore_pg_connstring.replace("dbname=postgres", "dbname=neondb");
329 0 :
330 0 : let dumpdir = working_directory.join("dumpdir");
331 0 :
332 0 : let common_args = [
333 0 : // schema mapping (prob suffices to specify them on one side)
334 0 : "--no-owner".to_string(),
335 0 : "--no-privileges".to_string(),
336 0 : "--no-publications".to_string(),
337 0 : "--no-security-labels".to_string(),
338 0 : "--no-subscriptions".to_string(),
339 0 : "--no-tablespaces".to_string(),
340 0 : // format
341 0 : "--format".to_string(),
342 0 : "directory".to_string(),
343 0 : // concurrency
344 0 : "--jobs".to_string(),
345 0 : num_cpus::get().to_string(),
346 0 : // progress updates
347 0 : "--verbose".to_string(),
348 0 : ];
349 0 :
350 0 : info!("dump into the working directory");
351 0 : {
352 0 : let mut pg_dump = tokio::process::Command::new(pg_bin_dir.join("pg_dump"))
353 0 : .args(&common_args)
354 0 : .arg("-f")
355 0 : .arg(&dumpdir)
356 0 : .arg("--no-sync")
357 0 : // POSITIONAL args
358 0 : // source db (db name included in connection string)
359 0 : .arg(&source_connection_string)
360 0 : // how we run it
361 0 : .env_clear()
362 0 : .env("LD_LIBRARY_PATH", &pg_lib_dir)
363 0 : .kill_on_drop(true)
364 0 : .stdout(std::process::Stdio::piped())
365 0 : .stderr(std::process::Stdio::piped())
366 0 : .spawn()
367 0 : .context("spawn pg_dump")?;
368 0 :
369 0 : info!(pid=%pg_dump.id().unwrap(), "spawned pg_dump");
370 0 :
371 0 : tokio::spawn(
372 0 : child_stdio_to_log::relay_process_output(pg_dump.stdout.take(), pg_dump.stderr.take())
373 0 : .instrument(info_span!("pg_dump")),
374 0 : );
375 0 :
376 0 : let st = pg_dump.wait().await.context("wait for pg_dump")?;
377 0 : info!(status=?st, "pg_dump exited");
378 0 : if !st.success() {
379 0 : warn!(status=%st, "pg_dump failed, restore will likely fail as well");
380 0 : }
381 0 : }
382 0 :
383 0 : // TODO: do it in a streaming way, plenty of internal research done on this already
384 0 : // TODO: do the unlogged table trick
385 0 :
386 0 : info!("restore from working directory into vanilla postgres");
387 0 : {
388 0 : let mut pg_restore = tokio::process::Command::new(pg_bin_dir.join("pg_restore"))
389 0 : .args(&common_args)
390 0 : .arg("-d")
391 0 : .arg(&restore_pg_connstring)
392 0 : // POSITIONAL args
393 0 : .arg(&dumpdir)
394 0 : // how we run it
395 0 : .env_clear()
396 0 : .env("LD_LIBRARY_PATH", &pg_lib_dir)
397 0 : .kill_on_drop(true)
398 0 : .stdout(std::process::Stdio::piped())
399 0 : .stderr(std::process::Stdio::piped())
400 0 : .spawn()
401 0 : .context("spawn pg_restore")?;
402 0 :
403 0 : info!(pid=%pg_restore.id().unwrap(), "spawned pg_restore");
404 0 : tokio::spawn(
405 0 : child_stdio_to_log::relay_process_output(
406 0 : pg_restore.stdout.take(),
407 0 : pg_restore.stderr.take(),
408 0 : )
409 0 : .instrument(info_span!("pg_restore")),
410 0 : );
411 0 : let st = pg_restore.wait().await.context("wait for pg_restore")?;
412 0 : info!(status=?st, "pg_restore exited");
413 0 : if !st.success() {
414 0 : warn!(status=%st, "pg_restore failed, restore will likely fail as well");
415 0 : }
416 0 : }
417 0 :
418 0 : // If interactive mode, wait for Ctrl+C
419 0 : if args.interactive {
420 0 : info!("Running in interactive mode. Press Ctrl+C to shut down.");
421 0 : tokio::signal::ctrl_c().await.context("wait for ctrl-c")?;
422 0 : }
423 0 :
424 0 : info!("shutdown postgres");
425 0 : {
426 0 : nix::sys::signal::kill(
427 0 : Pid::from_raw(
428 0 : i32::try_from(postgres_proc.id().unwrap()).expect("convert child pid to i32"),
429 0 : ),
430 0 : nix::sys::signal::SIGTERM,
431 0 : )
432 0 : .context("signal postgres to shut down")?;
433 0 : postgres_proc
434 0 : .wait()
435 0 : .await
436 0 : .context("wait for postgres to shut down")?;
437 0 : }
438 0 :
439 0 : // Only sync if s3_prefix was specified
440 0 : if let Some(s3_prefix) = args.s3_prefix {
441 0 : info!("upload pgdata");
442 0 : aws_s3_sync::sync(Utf8Path::new(&pgdata_dir), &s3_prefix.append("/pgdata/"))
443 0 : .await
444 0 : .context("sync dump directory to destination")?;
445 0 :
446 0 : info!("write status");
447 0 : {
448 0 : let status_dir = working_directory.join("status");
449 0 : std::fs::create_dir(&status_dir).context("create status directory")?;
450 0 : let status_file = status_dir.join("pgdata");
451 0 : std::fs::write(&status_file, serde_json::json!({"done": true}).to_string())
452 0 : .context("write status file")?;
453 0 : aws_s3_sync::sync(&status_dir, &s3_prefix.append("/status/"))
454 0 : .await
455 0 : .context("sync status directory to destination")?;
456 0 : }
457 0 : }
458 0 :
459 0 : Ok(())
460 0 : }
|