TLA Line data Source code
1 : //! Code to manage pageservers
2 : //!
3 : //! In the local test environment, the pageserver stores its data directly in
4 : //!
5 : //! .neon/
6 : //!
7 : use std::borrow::Cow;
8 : use std::collections::HashMap;
9 : use std::fs::File;
10 : use std::io::{BufReader, Write};
11 : use std::num::NonZeroU64;
12 : use std::path::PathBuf;
13 : use std::process::{Child, Command};
14 : use std::{io, result};
15 :
16 : use anyhow::{bail, Context};
17 : use camino::Utf8PathBuf;
18 : use pageserver_api::models::{self, TenantInfo, TimelineInfo};
19 : use postgres_backend::AuthType;
20 : use postgres_connection::{parse_host_port, PgConnectionConfig};
21 : use reqwest::blocking::{Client, RequestBuilder, Response};
22 : use reqwest::{IntoUrl, Method};
23 : use thiserror::Error;
24 : use utils::auth::{Claims, Scope};
25 : use utils::{
26 : http::error::HttpErrorBody,
27 : id::{TenantId, TimelineId},
28 : lsn::Lsn,
29 : };
30 :
31 : use crate::local_env::PageServerConf;
32 : use crate::{background_process, local_env::LocalEnv};
33 :
34 CBC 14 : #[derive(Error, Debug)]
35 : pub enum PageserverHttpError {
36 : #[error("Reqwest error: {0}")]
37 : Transport(#[from] reqwest::Error),
38 :
39 : #[error("Error: {0}")]
40 : Response(String),
41 : }
42 :
43 : impl From<anyhow::Error> for PageserverHttpError {
44 UBC 0 : fn from(e: anyhow::Error) -> Self {
45 0 : Self::Response(e.to_string())
46 0 : }
47 : }
48 :
49 : type Result<T> = result::Result<T, PageserverHttpError>;
50 :
51 : pub trait ResponseErrorMessageExt: Sized {
52 : fn error_from_body(self) -> Result<Self>;
53 : }
54 :
55 : impl ResponseErrorMessageExt for Response {
56 CBC 1836 : fn error_from_body(self) -> Result<Self> {
57 1836 : let status = self.status();
58 1836 : if !(status.is_client_error() || status.is_server_error()) {
59 1829 : return Ok(self);
60 7 : }
61 7 :
62 7 : // reqwest does not export its error construction utility functions, so let's craft the message ourselves
63 7 : let url = self.url().to_owned();
64 7 : Err(PageserverHttpError::Response(
65 7 : match self.json::<HttpErrorBody>() {
66 7 : Ok(err_body) => format!("Error: {}", err_body.msg),
67 UBC 0 : Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
68 : },
69 : ))
70 CBC 1836 : }
71 : }
72 :
73 : //
74 : // Control routines for pageserver.
75 : //
76 : // Used in CLI and tests.
77 : //
78 UBC 0 : #[derive(Debug)]
79 : pub struct PageServerNode {
80 : pub pg_connection_config: PgConnectionConfig,
81 : pub conf: PageServerConf,
82 : pub env: LocalEnv,
83 : pub http_client: Client,
84 : pub http_base_url: String,
85 : }
86 :
87 : impl PageServerNode {
88 CBC 8648 : pub fn from_env(env: &LocalEnv, conf: &PageServerConf) -> PageServerNode {
89 8648 : let (host, port) =
90 8648 : parse_host_port(&conf.listen_pg_addr).expect("Unable to parse listen_pg_addr");
91 8648 : let port = port.unwrap_or(5432);
92 8648 : Self {
93 8648 : pg_connection_config: PgConnectionConfig::new_host_port(host, port),
94 8648 : conf: conf.clone(),
95 8648 : env: env.clone(),
96 8648 : http_client: Client::new(),
97 8648 : http_base_url: format!("http://{}/v1", conf.listen_http_addr),
98 8648 : }
99 8648 : }
100 :
101 : // pageserver conf overrides defined by neon_local configuration.
102 1469 : fn neon_local_overrides(&self) -> Vec<String> {
103 1469 : let id = format!("id={}", self.conf.id);
104 1469 : // FIXME: the paths should be shell-escaped to handle paths with spaces, quotas etc.
105 1469 : let pg_distrib_dir_param = format!(
106 1469 : "pg_distrib_dir='{}'",
107 1469 : self.env.pg_distrib_dir_raw().display()
108 1469 : );
109 1469 :
110 1469 : let http_auth_type_param = format!("http_auth_type='{}'", self.conf.http_auth_type);
111 1469 : let listen_http_addr_param = format!("listen_http_addr='{}'", self.conf.listen_http_addr);
112 1469 :
113 1469 : let pg_auth_type_param = format!("pg_auth_type='{}'", self.conf.pg_auth_type);
114 1469 : let listen_pg_addr_param = format!("listen_pg_addr='{}'", self.conf.listen_pg_addr);
115 1469 :
116 1469 : let broker_endpoint_param = format!("broker_endpoint='{}'", self.env.broker.client_url());
117 1469 :
118 1469 : let mut overrides = vec![
119 1469 : id,
120 1469 : pg_distrib_dir_param,
121 1469 : http_auth_type_param,
122 1469 : pg_auth_type_param,
123 1469 : listen_http_addr_param,
124 1469 : listen_pg_addr_param,
125 1469 : broker_endpoint_param,
126 1469 : ];
127 :
128 1469 : if let Some(control_plane_api) = &self.env.control_plane_api {
129 76 : overrides.push(format!(
130 76 : "control_plane_api='{}'",
131 76 : control_plane_api.as_str()
132 76 : ));
133 1393 : }
134 :
135 1469 : if self.conf.http_auth_type != AuthType::Trust || self.conf.pg_auth_type != AuthType::Trust
136 27 : {
137 27 : // Keys are generated in the toplevel repo dir, pageservers' workdirs
138 27 : // are one level below that, so refer to keys with ../
139 27 : overrides.push("auth_validation_public_key_path='../auth_public_key.pem'".to_owned());
140 1442 : }
141 1469 : overrides
142 1469 : }
143 :
144 : /// Initializes a pageserver node by creating its config with the overrides provided.
145 349 : pub fn initialize(&self, config_overrides: &[&str]) -> anyhow::Result<()> {
146 349 : // First, run `pageserver --init` and wait for it to write a config into FS and exit.
147 349 : self.pageserver_init(config_overrides)
148 349 : .with_context(|| format!("Failed to run init for pageserver node {}", self.conf.id))
149 349 : }
150 :
151 2032 : pub fn repo_path(&self) -> PathBuf {
152 2032 : self.env.pageserver_data_dir(self.conf.id)
153 2032 : }
154 :
155 : /// The pid file is created by the pageserver process, with its pid stored inside.
156 : /// Other pageservers cannot lock the same file and overwrite it for as long as the current
157 : /// pageserver runs. (Unless someone removes the file manually; never do that!)
158 1123 : fn pid_file(&self) -> Utf8PathBuf {
159 1123 : Utf8PathBuf::from_path_buf(self.repo_path().join("pageserver.pid"))
160 1123 : .expect("non-Unicode path")
161 1123 : }
162 :
163 560 : pub fn start(&self, config_overrides: &[&str]) -> anyhow::Result<Child> {
164 560 : self.start_node(config_overrides, false)
165 560 : }
166 :
167 349 : fn pageserver_init(&self, config_overrides: &[&str]) -> anyhow::Result<()> {
168 349 : let datadir = self.repo_path();
169 349 : let node_id = self.conf.id;
170 349 : println!(
171 349 : "Initializing pageserver node {} at '{}' in {:?}",
172 349 : node_id,
173 349 : self.pg_connection_config.raw_address(),
174 349 : datadir
175 349 : );
176 349 : io::stdout().flush()?;
177 :
178 349 : if !datadir.exists() {
179 349 : std::fs::create_dir(&datadir)?;
180 UBC 0 : }
181 :
182 CBC 349 : let datadir_path_str = datadir.to_str().with_context(|| {
183 UBC 0 : format!("Cannot start pageserver node {node_id} in path that has no string representation: {datadir:?}")
184 CBC 349 : })?;
185 349 : let mut args = self.pageserver_basic_args(config_overrides, datadir_path_str);
186 349 : args.push(Cow::Borrowed("--init"));
187 :
188 349 : let init_output = Command::new(self.env.pageserver_bin())
189 349 : .args(args.iter().map(Cow::as_ref))
190 349 : .envs(self.pageserver_env_variables()?)
191 349 : .output()
192 349 : .with_context(|| format!("Failed to run pageserver init for node {node_id}"))?;
193 :
194 349 : anyhow::ensure!(
195 349 : init_output.status.success(),
196 UBC 0 : "Pageserver init for node {} did not finish successfully, stdout: {}, stderr: {}",
197 0 : node_id,
198 0 : String::from_utf8_lossy(&init_output.stdout),
199 0 : String::from_utf8_lossy(&init_output.stderr),
200 : );
201 :
202 CBC 349 : Ok(())
203 349 : }
204 :
205 560 : fn start_node(&self, config_overrides: &[&str], update_config: bool) -> anyhow::Result<Child> {
206 560 : let mut overrides = self.neon_local_overrides();
207 660 : overrides.extend(config_overrides.iter().map(|&c| c.to_owned()));
208 560 :
209 560 : let datadir = self.repo_path();
210 560 : print!(
211 560 : "Starting pageserver node {} at '{}' in {:?}",
212 560 : self.conf.id,
213 560 : self.pg_connection_config.raw_address(),
214 560 : datadir
215 560 : );
216 560 : io::stdout().flush()?;
217 :
218 560 : let datadir_path_str = datadir.to_str().with_context(|| {
219 UBC 0 : format!(
220 0 : "Cannot start pageserver node {} in path that has no string representation: {:?}",
221 0 : self.conf.id, datadir,
222 0 : )
223 CBC 560 : })?;
224 560 : let mut args = self.pageserver_basic_args(config_overrides, datadir_path_str);
225 560 : if update_config {
226 UBC 0 : args.push(Cow::Borrowed("--update-config"));
227 CBC 560 : }
228 :
229 : background_process::start_process(
230 560 : "pageserver",
231 560 : &datadir,
232 560 : &self.env.pageserver_bin(),
233 560 : args.iter().map(Cow::as_ref),
234 560 : self.pageserver_env_variables()?,
235 560 : background_process::InitialPidFile::Expect(&self.pid_file()),
236 1134 : || match self.check_status() {
237 560 : Ok(()) => Ok(true),
238 574 : Err(PageserverHttpError::Transport(_)) => Ok(false),
239 UBC 0 : Err(e) => Err(anyhow::anyhow!("Failed to check node status: {e}")),
240 CBC 1134 : },
241 : )
242 560 : }
243 :
244 909 : fn pageserver_basic_args<'a>(
245 909 : &self,
246 909 : config_overrides: &'a [&'a str],
247 909 : datadir_path_str: &'a str,
248 909 : ) -> Vec<Cow<'a, str>> {
249 909 : let mut args = vec![Cow::Borrowed("-D"), Cow::Borrowed(datadir_path_str)];
250 909 :
251 909 : let mut overrides = self.neon_local_overrides();
252 1068 : overrides.extend(config_overrides.iter().map(|&c| c.to_owned()));
253 8402 : for config_override in overrides {
254 7493 : args.push(Cow::Borrowed("-c"));
255 7493 : args.push(Cow::Owned(config_override));
256 7493 : }
257 :
258 909 : args
259 909 : }
260 :
261 909 : fn pageserver_env_variables(&self) -> anyhow::Result<Vec<(String, String)>> {
262 909 : // FIXME: why is this tied to pageserver's auth type? Whether or not the safekeeper
263 909 : // needs a token, and how to generate that token, seems independent to whether
264 909 : // the pageserver requires a token in incoming requests.
265 909 : Ok(if self.conf.http_auth_type != AuthType::Trust {
266 : // Generate a token to connect from the pageserver to a safekeeper
267 18 : let token = self
268 18 : .env
269 18 : .generate_auth_token(&Claims::new(None, Scope::SafekeeperData))?;
270 18 : vec![("NEON_AUTH_TOKEN".to_owned(), token)]
271 : } else {
272 891 : Vec::new()
273 : })
274 909 : }
275 :
276 : ///
277 : /// Stop the server.
278 : ///
279 : /// If 'immediate' is true, we use SIGQUIT, killing the process immediately.
280 : /// Otherwise we use SIGTERM, triggering a clean shutdown
281 : ///
282 : /// If the server is not running, returns success
283 : ///
284 563 : pub fn stop(&self, immediate: bool) -> anyhow::Result<()> {
285 563 : background_process::stop_process(immediate, "pageserver", &self.pid_file())
286 563 : }
287 :
288 5 : pub fn page_server_psql_client(&self) -> anyhow::Result<postgres::Client> {
289 5 : let mut config = self.pg_connection_config.clone();
290 5 : if self.conf.pg_auth_type == AuthType::NeonJWT {
291 UBC 0 : let token = self
292 0 : .env
293 0 : .generate_auth_token(&Claims::new(None, Scope::PageServerApi))?;
294 0 : config = config.set_password(Some(token));
295 CBC 5 : }
296 5 : Ok(config.connect_no_tls()?)
297 5 : }
298 :
299 2410 : fn http_request<U: IntoUrl>(&self, method: Method, url: U) -> anyhow::Result<RequestBuilder> {
300 2410 : let mut builder = self.http_client.request(method, url);
301 2410 : if self.conf.http_auth_type == AuthType::NeonJWT {
302 48 : let token = self
303 48 : .env
304 48 : .generate_auth_token(&Claims::new(None, Scope::PageServerApi))?;
305 48 : builder = builder.bearer_auth(token)
306 2362 : }
307 2410 : Ok(builder)
308 2410 : }
309 :
310 1134 : pub fn check_status(&self) -> Result<()> {
311 1134 : self.http_request(Method::GET, format!("{}/status", self.http_base_url))?
312 1134 : .send()?
313 560 : .error_from_body()?;
314 560 : Ok(())
315 1134 : }
316 :
317 6 : pub fn tenant_list(&self) -> Result<Vec<TenantInfo>> {
318 6 : Ok(self
319 6 : .http_request(Method::GET, format!("{}/tenant", self.http_base_url))?
320 6 : .send()?
321 6 : .error_from_body()?
322 6 : .json()?)
323 6 : }
324 :
325 437 : pub fn tenant_create(
326 437 : &self,
327 437 : new_tenant_id: TenantId,
328 437 : generation: Option<u32>,
329 437 : settings: HashMap<&str, &str>,
330 437 : ) -> anyhow::Result<TenantId> {
331 437 : let mut settings = settings.clone();
332 :
333 437 : let config = models::TenantConfig {
334 437 : checkpoint_distance: settings
335 437 : .remove("checkpoint_distance")
336 437 : .map(|x| x.parse::<u64>())
337 437 : .transpose()?,
338 437 : checkpoint_timeout: settings.remove("checkpoint_timeout").map(|x| x.to_string()),
339 437 : compaction_target_size: settings
340 437 : .remove("compaction_target_size")
341 437 : .map(|x| x.parse::<u64>())
342 437 : .transpose()?,
343 437 : compaction_period: settings.remove("compaction_period").map(|x| x.to_string()),
344 437 : compaction_threshold: settings
345 437 : .remove("compaction_threshold")
346 437 : .map(|x| x.parse::<usize>())
347 437 : .transpose()?,
348 437 : gc_horizon: settings
349 437 : .remove("gc_horizon")
350 437 : .map(|x| x.parse::<u64>())
351 437 : .transpose()?,
352 437 : gc_period: settings.remove("gc_period").map(|x| x.to_string()),
353 437 : image_creation_threshold: settings
354 437 : .remove("image_creation_threshold")
355 437 : .map(|x| x.parse::<usize>())
356 437 : .transpose()?,
357 437 : pitr_interval: settings.remove("pitr_interval").map(|x| x.to_string()),
358 437 : walreceiver_connect_timeout: settings
359 437 : .remove("walreceiver_connect_timeout")
360 437 : .map(|x| x.to_string()),
361 437 : lagging_wal_timeout: settings
362 437 : .remove("lagging_wal_timeout")
363 437 : .map(|x| x.to_string()),
364 437 : max_lsn_wal_lag: settings
365 437 : .remove("max_lsn_wal_lag")
366 437 : .map(|x| x.parse::<NonZeroU64>())
367 437 : .transpose()
368 437 : .context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?,
369 437 : trace_read_requests: settings
370 437 : .remove("trace_read_requests")
371 437 : .map(|x| x.parse::<bool>())
372 437 : .transpose()
373 437 : .context("Failed to parse 'trace_read_requests' as bool")?,
374 437 : eviction_policy: settings
375 437 : .remove("eviction_policy")
376 437 : .map(serde_json::from_str)
377 437 : .transpose()
378 437 : .context("Failed to parse 'eviction_policy' json")?,
379 437 : min_resident_size_override: settings
380 437 : .remove("min_resident_size_override")
381 437 : .map(|x| x.parse::<u64>())
382 437 : .transpose()
383 437 : .context("Failed to parse 'min_resident_size_override' as integer")?,
384 437 : evictions_low_residence_duration_metric_threshold: settings
385 437 : .remove("evictions_low_residence_duration_metric_threshold")
386 437 : .map(|x| x.to_string()),
387 437 : gc_feedback: settings
388 437 : .remove("gc_feedback")
389 437 : .map(|x| x.parse::<bool>())
390 437 : .transpose()
391 437 : .context("Failed to parse 'gc_feedback' as bool")?,
392 : };
393 :
394 437 : let request = models::TenantCreateRequest {
395 437 : new_tenant_id,
396 437 : generation,
397 437 : config,
398 437 : };
399 437 : if !settings.is_empty() {
400 1 : bail!("Unrecognized tenant settings: {settings:?}")
401 436 : }
402 436 : self.http_request(Method::POST, format!("{}/tenant", self.http_base_url))?
403 436 : .json(&request)
404 436 : .send()?
405 436 : .error_from_body()?
406 435 : .json::<Option<String>>()
407 435 : .with_context(|| {
408 UBC 0 : format!("Failed to parse tenant creation response for tenant id: {new_tenant_id:?}")
409 CBC 435 : })?
410 435 : .context("No tenant id was found in the tenant creation response")
411 435 : .and_then(|tenant_id_string| {
412 435 : tenant_id_string.parse().with_context(|| {
413 UBC 0 : format!("Failed to parse response string as tenant id: '{tenant_id_string}'")
414 CBC 435 : })
415 435 : })
416 437 : }
417 :
418 14 : pub fn tenant_config(
419 14 : &self,
420 14 : tenant_id: TenantId,
421 14 : mut settings: HashMap<&str, &str>,
422 14 : ) -> anyhow::Result<()> {
423 14 : let config = {
424 : // Braces to make the diff easier to read
425 : models::TenantConfig {
426 14 : checkpoint_distance: settings
427 14 : .remove("checkpoint_distance")
428 14 : .map(|x| x.parse::<u64>())
429 14 : .transpose()
430 14 : .context("Failed to parse 'checkpoint_distance' as an integer")?,
431 14 : checkpoint_timeout: settings.remove("checkpoint_timeout").map(|x| x.to_string()),
432 14 : compaction_target_size: settings
433 14 : .remove("compaction_target_size")
434 14 : .map(|x| x.parse::<u64>())
435 14 : .transpose()
436 14 : .context("Failed to parse 'compaction_target_size' as an integer")?,
437 14 : compaction_period: settings.remove("compaction_period").map(|x| x.to_string()),
438 14 : compaction_threshold: settings
439 14 : .remove("compaction_threshold")
440 14 : .map(|x| x.parse::<usize>())
441 14 : .transpose()
442 14 : .context("Failed to parse 'compaction_threshold' as an integer")?,
443 14 : gc_horizon: settings
444 14 : .remove("gc_horizon")
445 14 : .map(|x| x.parse::<u64>())
446 14 : .transpose()
447 14 : .context("Failed to parse 'gc_horizon' as an integer")?,
448 14 : gc_period: settings.remove("gc_period").map(|x| x.to_string()),
449 14 : image_creation_threshold: settings
450 14 : .remove("image_creation_threshold")
451 14 : .map(|x| x.parse::<usize>())
452 14 : .transpose()
453 14 : .context("Failed to parse 'image_creation_threshold' as non zero integer")?,
454 14 : pitr_interval: settings.remove("pitr_interval").map(|x| x.to_string()),
455 14 : walreceiver_connect_timeout: settings
456 14 : .remove("walreceiver_connect_timeout")
457 14 : .map(|x| x.to_string()),
458 14 : lagging_wal_timeout: settings
459 14 : .remove("lagging_wal_timeout")
460 14 : .map(|x| x.to_string()),
461 14 : max_lsn_wal_lag: settings
462 14 : .remove("max_lsn_wal_lag")
463 14 : .map(|x| x.parse::<NonZeroU64>())
464 14 : .transpose()
465 14 : .context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?,
466 14 : trace_read_requests: settings
467 14 : .remove("trace_read_requests")
468 14 : .map(|x| x.parse::<bool>())
469 14 : .transpose()
470 14 : .context("Failed to parse 'trace_read_requests' as bool")?,
471 14 : eviction_policy: settings
472 14 : .remove("eviction_policy")
473 14 : .map(serde_json::from_str)
474 14 : .transpose()
475 14 : .context("Failed to parse 'eviction_policy' json")?,
476 14 : min_resident_size_override: settings
477 14 : .remove("min_resident_size_override")
478 14 : .map(|x| x.parse::<u64>())
479 14 : .transpose()
480 14 : .context("Failed to parse 'min_resident_size_override' as an integer")?,
481 14 : evictions_low_residence_duration_metric_threshold: settings
482 14 : .remove("evictions_low_residence_duration_metric_threshold")
483 14 : .map(|x| x.to_string()),
484 14 : gc_feedback: settings
485 14 : .remove("gc_feedback")
486 14 : .map(|x| x.parse::<bool>())
487 14 : .transpose()
488 14 : .context("Failed to parse 'gc_feedback' as bool")?,
489 : }
490 : };
491 :
492 14 : if !settings.is_empty() {
493 UBC 0 : bail!("Unrecognized tenant settings: {settings:?}")
494 CBC 14 : }
495 14 :
496 14 : self.http_request(Method::PUT, format!("{}/tenant/config", self.http_base_url))?
497 14 : .json(&models::TenantConfigRequest { tenant_id, config })
498 14 : .send()?
499 14 : .error_from_body()?;
500 :
501 14 : Ok(())
502 14 : }
503 :
504 15 : pub fn timeline_list(&self, tenant_id: &TenantId) -> anyhow::Result<Vec<TimelineInfo>> {
505 15 : let timeline_infos: Vec<TimelineInfo> = self
506 15 : .http_request(
507 15 : Method::GET,
508 15 : format!("{}/tenant/{}/timeline", self.http_base_url, tenant_id),
509 15 : )?
510 15 : .send()?
511 15 : .error_from_body()?
512 15 : .json()?;
513 :
514 15 : Ok(timeline_infos)
515 15 : }
516 :
517 805 : pub fn timeline_create(
518 805 : &self,
519 805 : tenant_id: TenantId,
520 805 : new_timeline_id: Option<TimelineId>,
521 805 : ancestor_start_lsn: Option<Lsn>,
522 805 : ancestor_timeline_id: Option<TimelineId>,
523 805 : pg_version: Option<u32>,
524 805 : ) -> anyhow::Result<TimelineInfo> {
525 805 : // If timeline ID was not specified, generate one
526 805 : let new_timeline_id = new_timeline_id.unwrap_or(TimelineId::generate());
527 805 :
528 805 : self.http_request(
529 805 : Method::POST,
530 805 : format!("{}/tenant/{}/timeline", self.http_base_url, tenant_id),
531 805 : )?
532 805 : .json(&models::TimelineCreateRequest {
533 805 : new_timeline_id,
534 805 : ancestor_start_lsn,
535 805 : ancestor_timeline_id,
536 805 : pg_version,
537 805 : })
538 805 : .send()?
539 805 : .error_from_body()?
540 799 : .json::<Option<TimelineInfo>>()
541 799 : .with_context(|| {
542 UBC 0 : format!("Failed to parse timeline creation response for tenant id: {tenant_id}")
543 CBC 799 : })?
544 799 : .with_context(|| {
545 UBC 0 : format!(
546 0 : "No timeline id was found in the timeline creation response for tenant {tenant_id}"
547 0 : )
548 CBC 799 : })
549 805 : }
550 :
551 : /// Import a basebackup prepared using either:
552 : /// a) `pg_basebackup -F tar`, or
553 : /// b) The `fullbackup` pageserver endpoint
554 : ///
555 : /// # Arguments
556 : /// * `tenant_id` - tenant to import into. Created if not exists
557 : /// * `timeline_id` - id to assign to imported timeline
558 : /// * `base` - (start lsn of basebackup, path to `base.tar` file)
559 : /// * `pg_wal` - if there's any wal to import: (end lsn, path to `pg_wal.tar`)
560 5 : pub fn timeline_import(
561 5 : &self,
562 5 : tenant_id: TenantId,
563 5 : timeline_id: TimelineId,
564 5 : base: (Lsn, PathBuf),
565 5 : pg_wal: Option<(Lsn, PathBuf)>,
566 5 : pg_version: u32,
567 5 : ) -> anyhow::Result<()> {
568 5 : let mut client = self.page_server_psql_client()?;
569 :
570 : // Init base reader
571 5 : let (start_lsn, base_tarfile_path) = base;
572 5 : let base_tarfile = File::open(base_tarfile_path)?;
573 5 : let mut base_reader = BufReader::new(base_tarfile);
574 :
575 : // Init wal reader if necessary
576 5 : let (end_lsn, wal_reader) = if let Some((end_lsn, wal_tarfile_path)) = pg_wal {
577 4 : let wal_tarfile = File::open(wal_tarfile_path)?;
578 4 : let wal_reader = BufReader::new(wal_tarfile);
579 4 : (end_lsn, Some(wal_reader))
580 : } else {
581 1 : (start_lsn, None)
582 : };
583 :
584 : // Import base
585 5 : let import_cmd = format!(
586 5 : "import basebackup {tenant_id} {timeline_id} {start_lsn} {end_lsn} {pg_version}"
587 5 : );
588 5 : let mut writer = client.copy_in(&import_cmd)?;
589 5 : io::copy(&mut base_reader, &mut writer)?;
590 5 : writer.finish()?;
591 :
592 : // Import wal if necessary
593 3 : if let Some(mut wal_reader) = wal_reader {
594 2 : let import_cmd = format!("import wal {tenant_id} {timeline_id} {start_lsn} {end_lsn}");
595 2 : let mut writer = client.copy_in(&import_cmd)?;
596 2 : io::copy(&mut wal_reader, &mut writer)?;
597 2 : writer.finish()?;
598 1 : }
599 :
600 3 : Ok(())
601 5 : }
602 : }
|