Line data Source code
//! This program dumps a remote Postgres database into a local Postgres database
//! and uploads the resulting PGDATA into object storage for import into a Timeline.
//!
//! # Context, Architecture, Design
//!
//! See the cloud.git Fast Imports RFC (<https://github.com/neondatabase/cloud/pull/19799>)
//! for the full picture.
//! The RFC describing the storage pieces of importing the PGDATA dump into a Timeline
//! is publicly accessible at <https://github.com/neondatabase/neon/pull/9538>.
//!
//! # This is a Prototype!
//!
//! This program is part of a prototype feature and not yet used in production.
//!
//! The cloud.git RFC contains lots of suggestions for improving e2e throughput
//! of this step of the timeline import process.
//!
//! # Local Testing
//!
//! - Comment out most of the pgxns in `compute/Dockerfile.compute-node` to speed up the build.
//! - Build the image with the following command:
//!
//! ```bash
//! docker buildx build --build-arg DEBIAN_FLAVOR=bullseye-slim --build-arg GIT_VERSION=local --build-arg PG_VERSION=v14 --build-arg BUILD_TAG="$(date --iso-8601=s -u)" -t localhost:3030/localregistry/compute-node-v14:latest -f compute/Dockerfile.compute-node .
//! docker push localhost:3030/localregistry/compute-node-v14:latest
//! ```
27 :
28 : use anyhow::Context;
29 : use aws_config::BehaviorVersion;
30 : use camino::{Utf8Path, Utf8PathBuf};
31 : use clap::Parser;
32 : use nix::unistd::Pid;
33 : use tracing::{info, info_span, warn, Instrument};
34 : use utils::fs_ext::is_directory_empty;
35 :
36 : #[path = "fast_import/child_stdio_to_log.rs"]
37 : mod child_stdio_to_log;
38 : #[path = "fast_import/s3_uri.rs"]
39 : mod s3_uri;
40 : #[path = "fast_import/s5cmd.rs"]
41 : mod s5cmd;
42 :
43 0 : #[derive(clap::Parser)]
44 : struct Args {
45 : #[clap(long)]
46 0 : working_directory: Utf8PathBuf,
47 : #[clap(long, env = "NEON_IMPORTER_S3_PREFIX")]
48 0 : s3_prefix: s3_uri::S3Uri,
49 : #[clap(long)]
50 0 : pg_bin_dir: Utf8PathBuf,
51 : #[clap(long)]
52 0 : pg_lib_dir: Utf8PathBuf,
53 : }
54 :
55 : #[serde_with::serde_as]
56 0 : #[derive(serde::Deserialize)]
57 : struct Spec {
58 : encryption_secret: EncryptionSecret,
59 : #[serde_as(as = "serde_with::base64::Base64")]
60 : source_connstring_ciphertext_base64: Vec<u8>,
61 : }
62 :
63 0 : #[derive(serde::Deserialize)]
64 : enum EncryptionSecret {
65 : #[allow(clippy::upper_case_acronyms)]
66 : KMS { key_id: String },
67 : }
68 :
69 : #[tokio::main]
70 0 : pub(crate) async fn main() -> anyhow::Result<()> {
71 0 : utils::logging::init(
72 0 : utils::logging::LogFormat::Plain,
73 0 : utils::logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
74 0 : utils::logging::Output::Stdout,
75 0 : )?;
76 0 :
77 0 : info!("starting");
78 0 :
79 0 : let Args {
80 0 : working_directory,
81 0 : s3_prefix,
82 0 : pg_bin_dir,
83 0 : pg_lib_dir,
84 0 : } = Args::parse();
85 0 :
86 0 : let aws_config = aws_config::load_defaults(BehaviorVersion::v2024_03_28()).await;
87 0 :
88 0 : let spec: Spec = {
89 0 : let spec_key = s3_prefix.append("/spec.json");
90 0 : let s3_client = aws_sdk_s3::Client::new(&aws_config);
91 0 : let object = s3_client
92 0 : .get_object()
93 0 : .bucket(&spec_key.bucket)
94 0 : .key(spec_key.key)
95 0 : .send()
96 0 : .await
97 0 : .context("get spec from s3")?
98 0 : .body
99 0 : .collect()
100 0 : .await
101 0 : .context("download spec body")?;
102 0 : serde_json::from_slice(&object.into_bytes()).context("parse spec as json")?
103 0 : };
104 0 :
105 0 : match tokio::fs::create_dir(&working_directory).await {
106 0 : Ok(()) => {}
107 0 : Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
108 0 : if !is_directory_empty(&working_directory)
109 0 : .await
110 0 : .context("check if working directory is empty")?
111 0 : {
112 0 : anyhow::bail!("working directory is not empty");
113 0 : } else {
114 0 : // ok
115 0 : }
116 0 : }
117 0 : Err(e) => return Err(anyhow::Error::new(e).context("create working directory")),
118 0 : }
119 0 :
120 0 : let pgdata_dir = working_directory.join("pgdata");
121 0 : tokio::fs::create_dir(&pgdata_dir)
122 0 : .await
123 0 : .context("create pgdata directory")?;
124 0 :
125 0 : //
126 0 : // Setup clients
127 0 : //
128 0 : let aws_config = aws_config::load_defaults(BehaviorVersion::v2024_03_28()).await;
129 0 : let kms_client = aws_sdk_kms::Client::new(&aws_config);
130 0 :
131 0 : //
132 0 : // Initialize pgdata
133 0 : //
134 0 : let superuser = "cloud_admin"; // XXX: this shouldn't be hard-coded
135 0 : postgres_initdb::do_run_initdb(postgres_initdb::RunInitdbArgs {
136 0 : superuser,
137 0 : locale: "en_US.UTF-8", // XXX: this shouldn't be hard-coded,
138 0 : pg_version: 140000, // XXX: this shouldn't be hard-coded but derived from which compute image we're running in
139 0 : initdb_bin: pg_bin_dir.join("initdb").as_ref(),
140 0 : library_search_path: &pg_lib_dir, // TODO: is this right? Prob works in compute image, not sure about neon_local.
141 0 : pgdata: &pgdata_dir,
142 0 : })
143 0 : .await
144 0 : .context("initdb")?;
145 0 :
146 0 : let nproc = num_cpus::get();
147 0 :
148 0 : //
149 0 : // Launch postgres process
150 0 : //
151 0 : let mut postgres_proc = tokio::process::Command::new(pg_bin_dir.join("postgres"))
152 0 : .arg("-D")
153 0 : .arg(&pgdata_dir)
154 0 : .args(["-c", "wal_level=minimal"])
155 0 : .args(["-c", "shared_buffers=10GB"])
156 0 : .args(["-c", "max_wal_senders=0"])
157 0 : .args(["-c", "fsync=off"])
158 0 : .args(["-c", "full_page_writes=off"])
159 0 : .args(["-c", "synchronous_commit=off"])
160 0 : .args(["-c", "maintenance_work_mem=8388608"])
161 0 : .args(["-c", &format!("max_parallel_maintenance_workers={nproc}")])
162 0 : .args(["-c", &format!("max_parallel_workers={nproc}")])
163 0 : .args(["-c", &format!("max_parallel_workers_per_gather={nproc}")])
164 0 : .args(["-c", &format!("max_worker_processes={nproc}")])
165 0 : .args(["-c", "effective_io_concurrency=100"])
166 0 : .env_clear()
167 0 : .stdout(std::process::Stdio::piped())
168 0 : .stderr(std::process::Stdio::piped())
169 0 : .spawn()
170 0 : .context("spawn postgres")?;
171 0 :
172 0 : info!("spawned postgres, waiting for it to become ready");
173 0 : tokio::spawn(
174 0 : child_stdio_to_log::relay_process_output(
175 0 : postgres_proc.stdout.take(),
176 0 : postgres_proc.stderr.take(),
177 0 : )
178 0 : .instrument(info_span!("postgres")),
179 0 : );
180 0 : let restore_pg_connstring =
181 0 : format!("host=localhost port=5432 user={superuser} dbname=postgres");
182 0 : loop {
183 0 : let res = tokio_postgres::connect(&restore_pg_connstring, tokio_postgres::NoTls).await;
184 0 : if res.is_ok() {
185 0 : info!("postgres is ready, could connect to it");
186 0 : break;
187 0 : }
188 0 : }
189 0 :
190 0 : //
191 0 : // Decrypt connection string
192 0 : //
193 0 : let source_connection_string = {
194 0 : match spec.encryption_secret {
195 0 : EncryptionSecret::KMS { key_id } => {
196 0 : let mut output = kms_client
197 0 : .decrypt()
198 0 : .key_id(key_id)
199 0 : .ciphertext_blob(aws_sdk_s3::primitives::Blob::new(
200 0 : spec.source_connstring_ciphertext_base64,
201 0 : ))
202 0 : .send()
203 0 : .await
204 0 : .context("decrypt source connection string")?;
205 0 : let plaintext = output
206 0 : .plaintext
207 0 : .take()
208 0 : .context("get plaintext source connection string")?;
209 0 : String::from_utf8(plaintext.into_inner())
210 0 : .context("parse source connection string as utf8")?
211 0 : }
212 0 : }
213 0 : };
214 0 :
215 0 : //
216 0 : // Start the work
217 0 : //
218 0 :
219 0 : let dumpdir = working_directory.join("dumpdir");
220 0 :
221 0 : let common_args = [
222 0 : // schema mapping (prob suffices to specify them on one side)
223 0 : "--no-owner".to_string(),
224 0 : "--no-privileges".to_string(),
225 0 : "--no-publications".to_string(),
226 0 : "--no-security-labels".to_string(),
227 0 : "--no-subscriptions".to_string(),
228 0 : "--no-tablespaces".to_string(),
229 0 : // format
230 0 : "--format".to_string(),
231 0 : "directory".to_string(),
232 0 : // concurrency
233 0 : "--jobs".to_string(),
234 0 : num_cpus::get().to_string(),
235 0 : // progress updates
236 0 : "--verbose".to_string(),
237 0 : ];
238 0 :
239 0 : info!("dump into the working directory");
240 0 : {
241 0 : let mut pg_dump = tokio::process::Command::new(pg_bin_dir.join("pg_dump"))
242 0 : .args(&common_args)
243 0 : .arg("-f")
244 0 : .arg(&dumpdir)
245 0 : .arg("--no-sync")
246 0 : // POSITIONAL args
247 0 : // source db (db name included in connection string)
248 0 : .arg(&source_connection_string)
249 0 : // how we run it
250 0 : .env_clear()
251 0 : .kill_on_drop(true)
252 0 : .stdout(std::process::Stdio::piped())
253 0 : .stderr(std::process::Stdio::piped())
254 0 : .spawn()
255 0 : .context("spawn pg_dump")?;
256 0 :
257 0 : info!(pid=%pg_dump.id().unwrap(), "spawned pg_dump");
258 0 :
259 0 : tokio::spawn(
260 0 : child_stdio_to_log::relay_process_output(pg_dump.stdout.take(), pg_dump.stderr.take())
261 0 : .instrument(info_span!("pg_dump")),
262 0 : );
263 0 :
264 0 : let st = pg_dump.wait().await.context("wait for pg_dump")?;
265 0 : info!(status=?st, "pg_dump exited");
266 0 : if !st.success() {
267 0 : warn!(status=%st, "pg_dump failed, restore will likely fail as well");
268 0 : }
269 0 : }
270 0 :
271 0 : // TODO: do it in a streaming way, plenty of internal research done on this already
272 0 : // TODO: do the unlogged table trick
273 0 :
274 0 : info!("restore from working directory into vanilla postgres");
275 0 : {
276 0 : let mut pg_restore = tokio::process::Command::new(pg_bin_dir.join("pg_restore"))
277 0 : .args(&common_args)
278 0 : .arg("-d")
279 0 : .arg(&restore_pg_connstring)
280 0 : // POSITIONAL args
281 0 : .arg(&dumpdir)
282 0 : // how we run it
283 0 : .env_clear()
284 0 : .kill_on_drop(true)
285 0 : .stdout(std::process::Stdio::piped())
286 0 : .stderr(std::process::Stdio::piped())
287 0 : .spawn()
288 0 : .context("spawn pg_restore")?;
289 0 :
290 0 : info!(pid=%pg_restore.id().unwrap(), "spawned pg_restore");
291 0 : tokio::spawn(
292 0 : child_stdio_to_log::relay_process_output(
293 0 : pg_restore.stdout.take(),
294 0 : pg_restore.stderr.take(),
295 0 : )
296 0 : .instrument(info_span!("pg_restore")),
297 0 : );
298 0 : let st = pg_restore.wait().await.context("wait for pg_restore")?;
299 0 : info!(status=?st, "pg_restore exited");
300 0 : if !st.success() {
301 0 : warn!(status=%st, "pg_restore failed, restore will likely fail as well");
302 0 : }
303 0 : }
304 0 :
305 0 : info!("shutdown postgres");
306 0 : {
307 0 : nix::sys::signal::kill(
308 0 : Pid::from_raw(
309 0 : i32::try_from(postgres_proc.id().unwrap()).expect("convert child pid to i32"),
310 0 : ),
311 0 : nix::sys::signal::SIGTERM,
312 0 : )
313 0 : .context("signal postgres to shut down")?;
314 0 : postgres_proc
315 0 : .wait()
316 0 : .await
317 0 : .context("wait for postgres to shut down")?;
318 0 : }
319 0 :
320 0 : info!("upload pgdata");
321 0 : s5cmd::sync(Utf8Path::new(&pgdata_dir), &s3_prefix.append("/"))
322 0 : .await
323 0 : .context("sync dump directory to destination")?;
324 0 :
325 0 : info!("write status");
326 0 : {
327 0 : let status_dir = working_directory.join("status");
328 0 : std::fs::create_dir(&status_dir).context("create status directory")?;
329 0 : let status_file = status_dir.join("status");
330 0 : std::fs::write(&status_file, serde_json::json!({"done": true}).to_string())
331 0 : .context("write status file")?;
332 0 : s5cmd::sync(&status_file, &s3_prefix.append("/status/pgdata"))
333 0 : .await
334 0 : .context("sync status directory to destination")?;
335 0 : }
336 0 :
337 0 : Ok(())
338 0 : }
|