Line data Source code
1 : //! Code to manage pageservers
2 : //!
3 : //! In the local test environment, the pageserver stores its data directly in
4 : //!
5 : //! .neon/
6 : //!
7 : use std::borrow::Cow;
8 : use std::collections::HashMap;
9 : use std::fs::File;
10 : use std::io::{BufReader, Write};
11 : use std::num::NonZeroU64;
12 : use std::path::PathBuf;
13 : use std::process::{Child, Command};
14 : use std::{io, result};
15 :
16 : use anyhow::{bail, Context};
17 : use pageserver_api::models::{self, TenantInfo, TimelineInfo};
18 : use postgres_backend::AuthType;
19 : use postgres_connection::{parse_host_port, PgConnectionConfig};
20 : use reqwest::blocking::{Client, RequestBuilder, Response};
21 : use reqwest::{IntoUrl, Method};
22 : use thiserror::Error;
23 : use utils::auth::{Claims, Scope};
24 : use utils::{
25 : http::error::HttpErrorBody,
26 : id::{TenantId, TimelineId},
27 : lsn::Lsn,
28 : };
29 :
30 : use crate::{background_process, local_env::LocalEnv};
31 :
32 14 : #[derive(Error, Debug)]
33 : pub enum PageserverHttpError {
34 : #[error("Reqwest error: {0}")]
35 : Transport(#[from] reqwest::Error),
36 :
37 : #[error("Error: {0}")]
38 : Response(String),
39 : }
40 :
41 : impl From<anyhow::Error> for PageserverHttpError {
42 0 : fn from(e: anyhow::Error) -> Self {
43 0 : Self::Response(e.to_string())
44 0 : }
45 : }
46 :
47 : type Result<T> = result::Result<T, PageserverHttpError>;
48 :
49 : pub trait ResponseErrorMessageExt: Sized {
50 : fn error_from_body(self) -> Result<Self>;
51 : }
52 :
53 : impl ResponseErrorMessageExt for Response {
54 1972 : fn error_from_body(self) -> Result<Self> {
55 1972 : let status = self.status();
56 1972 : if !(status.is_client_error() || status.is_server_error()) {
57 1965 : return Ok(self);
58 7 : }
59 7 :
60 7 : // reqwest does not export its error construction utility functions, so let's craft the message ourselves
61 7 : let url = self.url().to_owned();
62 7 : Err(PageserverHttpError::Response(
63 7 : match self.json::<HttpErrorBody>() {
64 7 : Ok(err_body) => format!("Error: {}", err_body.msg),
65 0 : Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
66 : },
67 : ))
68 1972 : }
69 : }
70 :
71 : //
72 : // Control routines for pageserver.
73 : //
74 : // Used in CLI and tests.
75 : //
76 0 : #[derive(Debug)]
77 : pub struct PageServerNode {
78 : pub pg_connection_config: PgConnectionConfig,
79 : pub env: LocalEnv,
80 : pub http_client: Client,
81 : pub http_base_url: String,
82 : }
83 :
84 : impl PageServerNode {
85 4370 : pub fn from_env(env: &LocalEnv) -> PageServerNode {
86 4370 : let (host, port) = parse_host_port(&env.pageserver.listen_pg_addr)
87 4370 : .expect("Unable to parse listen_pg_addr");
88 4370 : let port = port.unwrap_or(5432);
89 4370 : Self {
90 4370 : pg_connection_config: PgConnectionConfig::new_host_port(host, port),
91 4370 : env: env.clone(),
92 4370 : http_client: Client::new(),
93 4370 : http_base_url: format!("http://{}/v1", env.pageserver.listen_http_addr),
94 4370 : }
95 4370 : }
96 :
97 : // pageserver conf overrides defined by neon_local configuration.
98 1519 : fn neon_local_overrides(&self) -> Vec<String> {
99 1519 : let id = format!("id={}", self.env.pageserver.id);
100 1519 : // FIXME: the paths should be shell-escaped to handle paths with spaces, quotas etc.
101 1519 : let pg_distrib_dir_param = format!(
102 1519 : "pg_distrib_dir='{}'",
103 1519 : self.env.pg_distrib_dir_raw().display()
104 1519 : );
105 1519 :
106 1519 : let http_auth_type_param =
107 1519 : format!("http_auth_type='{}'", self.env.pageserver.http_auth_type);
108 1519 : let listen_http_addr_param = format!(
109 1519 : "listen_http_addr='{}'",
110 1519 : self.env.pageserver.listen_http_addr
111 1519 : );
112 1519 :
113 1519 : let pg_auth_type_param = format!("pg_auth_type='{}'", self.env.pageserver.pg_auth_type);
114 1519 : let listen_pg_addr_param =
115 1519 : format!("listen_pg_addr='{}'", self.env.pageserver.listen_pg_addr);
116 1519 :
117 1519 : let broker_endpoint_param = format!("broker_endpoint='{}'", self.env.broker.client_url());
118 1519 :
119 1519 : let mut overrides = vec![
120 1519 : id,
121 1519 : pg_distrib_dir_param,
122 1519 : http_auth_type_param,
123 1519 : pg_auth_type_param,
124 1519 : listen_http_addr_param,
125 1519 : listen_pg_addr_param,
126 1519 : broker_endpoint_param,
127 1519 : ];
128 1519 :
129 1519 : if self.env.pageserver.http_auth_type != AuthType::Trust
130 1492 : || self.env.pageserver.pg_auth_type != AuthType::Trust
131 27 : {
132 27 : overrides.push("auth_validation_public_key_path='auth_public_key.pem'".to_owned());
133 1492 : }
134 1519 : overrides
135 1519 : }
136 :
137 : /// Initializes a pageserver node by creating its config with the overrides provided.
138 369 : pub fn initialize(&self, config_overrides: &[&str]) -> anyhow::Result<()> {
139 369 : // First, run `pageserver --init` and wait for it to write a config into FS and exit.
140 369 : self.pageserver_init(config_overrides).with_context(|| {
141 0 : format!(
142 0 : "Failed to run init for pageserver node {}",
143 0 : self.env.pageserver.id,
144 0 : )
145 369 : })
146 369 : }
147 :
148 2096 : pub fn repo_path(&self) -> PathBuf {
149 2096 : self.env.pageserver_data_dir()
150 2096 : }
151 :
152 : /// The pid file is created by the pageserver process, with its pid stored inside.
153 : /// Other pageservers cannot lock the same file and overwrite it for as long as the current
154 : /// pageserver runs. (Unless someone removes the file manually; never do that!)
155 1152 : fn pid_file(&self) -> PathBuf {
156 1152 : self.repo_path().join("pageserver.pid")
157 1152 : }
158 :
159 575 : pub fn start(&self, config_overrides: &[&str]) -> anyhow::Result<Child> {
160 575 : self.start_node(config_overrides, false)
161 575 : }
162 :
163 369 : fn pageserver_init(&self, config_overrides: &[&str]) -> anyhow::Result<()> {
164 369 : let datadir = self.repo_path();
165 369 : let node_id = self.env.pageserver.id;
166 369 : println!(
167 369 : "Initializing pageserver node {} at '{}' in {:?}",
168 369 : node_id,
169 369 : self.pg_connection_config.raw_address(),
170 369 : datadir
171 369 : );
172 369 : io::stdout().flush()?;
173 :
174 369 : let datadir_path_str = datadir.to_str().with_context(|| {
175 0 : format!("Cannot start pageserver node {node_id} in path that has no string representation: {datadir:?}")
176 369 : })?;
177 369 : let mut args = self.pageserver_basic_args(config_overrides, datadir_path_str);
178 369 : args.push(Cow::Borrowed("--init"));
179 :
180 369 : let init_output = Command::new(self.env.pageserver_bin())
181 369 : .args(args.iter().map(Cow::as_ref))
182 369 : .envs(self.pageserver_env_variables()?)
183 369 : .output()
184 369 : .with_context(|| format!("Failed to run pageserver init for node {node_id}"))?;
185 :
186 369 : anyhow::ensure!(
187 369 : init_output.status.success(),
188 0 : "Pageserver init for node {} did not finish successfully, stdout: {}, stderr: {}",
189 0 : node_id,
190 0 : String::from_utf8_lossy(&init_output.stdout),
191 0 : String::from_utf8_lossy(&init_output.stderr),
192 : );
193 :
194 369 : Ok(())
195 369 : }
196 :
197 575 : fn start_node(&self, config_overrides: &[&str], update_config: bool) -> anyhow::Result<Child> {
198 575 : let mut overrides = self.neon_local_overrides();
199 575 : overrides.extend(config_overrides.iter().map(|&c| c.to_owned()));
200 575 :
201 575 : let datadir = self.repo_path();
202 575 : print!(
203 575 : "Starting pageserver node {} at '{}' in {:?}",
204 575 : self.env.pageserver.id,
205 575 : self.pg_connection_config.raw_address(),
206 575 : datadir
207 575 : );
208 575 : io::stdout().flush()?;
209 :
210 575 : let datadir_path_str = datadir.to_str().with_context(|| {
211 0 : format!(
212 0 : "Cannot start pageserver node {} in path that has no string representation: {:?}",
213 0 : self.env.pageserver.id, datadir,
214 0 : )
215 575 : })?;
216 575 : let mut args = self.pageserver_basic_args(config_overrides, datadir_path_str);
217 575 : if update_config {
218 0 : args.push(Cow::Borrowed("--update-config"));
219 575 : }
220 :
221 : background_process::start_process(
222 575 : "pageserver",
223 575 : &datadir,
224 575 : &self.env.pageserver_bin(),
225 575 : args.iter().map(Cow::as_ref),
226 575 : self.pageserver_env_variables()?,
227 575 : background_process::InitialPidFile::Expect(&self.pid_file()),
228 1157 : || match self.check_status() {
229 575 : Ok(()) => Ok(true),
230 582 : Err(PageserverHttpError::Transport(_)) => Ok(false),
231 0 : Err(e) => Err(anyhow::anyhow!("Failed to check node status: {e}")),
232 1157 : },
233 : )
234 575 : }
235 :
236 944 : fn pageserver_basic_args<'a>(
237 944 : &self,
238 944 : config_overrides: &'a [&'a str],
239 944 : datadir_path_str: &'a str,
240 944 : ) -> Vec<Cow<'a, str>> {
241 944 : let mut args = vec![Cow::Borrowed("-D"), Cow::Borrowed(datadir_path_str)];
242 944 :
243 944 : let mut overrides = self.neon_local_overrides();
244 944 : overrides.extend(config_overrides.iter().map(|&c| c.to_owned()));
245 8220 : for config_override in overrides {
246 7276 : args.push(Cow::Borrowed("-c"));
247 7276 : args.push(Cow::Owned(config_override));
248 7276 : }
249 :
250 944 : args
251 944 : }
252 :
253 944 : fn pageserver_env_variables(&self) -> anyhow::Result<Vec<(String, String)>> {
254 944 : // FIXME: why is this tied to pageserver's auth type? Whether or not the safekeeper
255 944 : // needs a token, and how to generate that token, seems independent to whether
256 944 : // the pageserver requires a token in incoming requests.
257 944 : Ok(if self.env.pageserver.http_auth_type != AuthType::Trust {
258 : // Generate a token to connect from the pageserver to a safekeeper
259 18 : let token = self
260 18 : .env
261 18 : .generate_auth_token(&Claims::new(None, Scope::SafekeeperData))?;
262 18 : vec![("NEON_AUTH_TOKEN".to_owned(), token)]
263 : } else {
264 926 : Vec::new()
265 : })
266 944 : }
267 :
268 : ///
269 : /// Stop the server.
270 : ///
271 : /// If 'immediate' is true, we use SIGQUIT, killing the process immediately.
272 : /// Otherwise we use SIGTERM, triggering a clean shutdown
273 : ///
274 : /// If the server is not running, returns success
275 : ///
276 577 : pub fn stop(&self, immediate: bool) -> anyhow::Result<()> {
277 577 : background_process::stop_process(immediate, "pageserver", &self.pid_file())
278 577 : }
279 :
280 5 : pub fn page_server_psql_client(&self) -> anyhow::Result<postgres::Client> {
281 5 : let mut config = self.pg_connection_config.clone();
282 5 : if self.env.pageserver.pg_auth_type == AuthType::NeonJWT {
283 0 : let token = self
284 0 : .env
285 0 : .generate_auth_token(&Claims::new(None, Scope::PageServerApi))?;
286 0 : config = config.set_password(Some(token));
287 5 : }
288 5 : Ok(config.connect_no_tls()?)
289 5 : }
290 :
291 2554 : fn http_request<U: IntoUrl>(&self, method: Method, url: U) -> anyhow::Result<RequestBuilder> {
292 2554 : let mut builder = self.http_client.request(method, url);
293 2554 : if self.env.pageserver.http_auth_type == AuthType::NeonJWT {
294 48 : let token = self
295 48 : .env
296 48 : .generate_auth_token(&Claims::new(None, Scope::PageServerApi))?;
297 48 : builder = builder.bearer_auth(token)
298 2506 : }
299 2554 : Ok(builder)
300 2554 : }
301 :
302 1157 : pub fn check_status(&self) -> Result<()> {
303 1157 : self.http_request(Method::GET, format!("{}/status", self.http_base_url))?
304 1157 : .send()?
305 575 : .error_from_body()?;
306 575 : Ok(())
307 1157 : }
308 :
309 6 : pub fn tenant_list(&self) -> Result<Vec<TenantInfo>> {
310 6 : Ok(self
311 6 : .http_request(Method::GET, format!("{}/tenant", self.http_base_url))?
312 6 : .send()?
313 6 : .error_from_body()?
314 6 : .json()?)
315 6 : }
316 :
317 472 : pub fn tenant_create(
318 472 : &self,
319 472 : new_tenant_id: Option<TenantId>,
320 472 : settings: HashMap<&str, &str>,
321 472 : ) -> anyhow::Result<TenantId> {
322 472 : let mut settings = settings.clone();
323 :
324 472 : let config = models::TenantConfig {
325 472 : checkpoint_distance: settings
326 472 : .remove("checkpoint_distance")
327 472 : .map(|x| x.parse::<u64>())
328 472 : .transpose()?,
329 472 : checkpoint_timeout: settings.remove("checkpoint_timeout").map(|x| x.to_string()),
330 472 : compaction_target_size: settings
331 472 : .remove("compaction_target_size")
332 472 : .map(|x| x.parse::<u64>())
333 472 : .transpose()?,
334 472 : compaction_period: settings.remove("compaction_period").map(|x| x.to_string()),
335 472 : compaction_threshold: settings
336 472 : .remove("compaction_threshold")
337 472 : .map(|x| x.parse::<usize>())
338 472 : .transpose()?,
339 472 : gc_horizon: settings
340 472 : .remove("gc_horizon")
341 472 : .map(|x| x.parse::<u64>())
342 472 : .transpose()?,
343 472 : gc_period: settings.remove("gc_period").map(|x| x.to_string()),
344 472 : image_creation_threshold: settings
345 472 : .remove("image_creation_threshold")
346 472 : .map(|x| x.parse::<usize>())
347 472 : .transpose()?,
348 472 : pitr_interval: settings.remove("pitr_interval").map(|x| x.to_string()),
349 472 : walreceiver_connect_timeout: settings
350 472 : .remove("walreceiver_connect_timeout")
351 472 : .map(|x| x.to_string()),
352 472 : lagging_wal_timeout: settings
353 472 : .remove("lagging_wal_timeout")
354 472 : .map(|x| x.to_string()),
355 472 : max_lsn_wal_lag: settings
356 472 : .remove("max_lsn_wal_lag")
357 472 : .map(|x| x.parse::<NonZeroU64>())
358 472 : .transpose()
359 472 : .context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?,
360 472 : trace_read_requests: settings
361 472 : .remove("trace_read_requests")
362 472 : .map(|x| x.parse::<bool>())
363 472 : .transpose()
364 472 : .context("Failed to parse 'trace_read_requests' as bool")?,
365 472 : eviction_policy: settings
366 472 : .remove("eviction_policy")
367 472 : .map(serde_json::from_str)
368 472 : .transpose()
369 472 : .context("Failed to parse 'eviction_policy' json")?,
370 472 : min_resident_size_override: settings
371 472 : .remove("min_resident_size_override")
372 472 : .map(|x| x.parse::<u64>())
373 472 : .transpose()
374 472 : .context("Failed to parse 'min_resident_size_override' as integer")?,
375 472 : evictions_low_residence_duration_metric_threshold: settings
376 472 : .remove("evictions_low_residence_duration_metric_threshold")
377 472 : .map(|x| x.to_string()),
378 472 : gc_feedback: settings
379 472 : .remove("gc_feedback")
380 472 : .map(|x| x.parse::<bool>())
381 472 : .transpose()
382 472 : .context("Failed to parse 'gc_feedback' as bool")?,
383 : };
384 :
385 : // If tenant ID was not specified, generate one
386 472 : let new_tenant_id = new_tenant_id.unwrap_or(TenantId::generate());
387 472 :
388 472 : let request = models::TenantCreateRequest {
389 472 : new_tenant_id,
390 472 : config,
391 472 : };
392 472 : if !settings.is_empty() {
393 1 : bail!("Unrecognized tenant settings: {settings:?}")
394 471 : }
395 471 : self.http_request(Method::POST, format!("{}/tenant", self.http_base_url))?
396 471 : .json(&request)
397 471 : .send()?
398 471 : .error_from_body()?
399 470 : .json::<Option<String>>()
400 470 : .with_context(|| {
401 0 : format!("Failed to parse tenant creation response for tenant id: {new_tenant_id:?}")
402 470 : })?
403 470 : .context("No tenant id was found in the tenant creation response")
404 470 : .and_then(|tenant_id_string| {
405 470 : tenant_id_string.parse().with_context(|| {
406 0 : format!("Failed to parse response string as tenant id: '{tenant_id_string}'")
407 470 : })
408 470 : })
409 472 : }
410 :
411 14 : pub fn tenant_config(
412 14 : &self,
413 14 : tenant_id: TenantId,
414 14 : mut settings: HashMap<&str, &str>,
415 14 : ) -> anyhow::Result<()> {
416 14 : let config = {
417 : // Braces to make the diff easier to read
418 : models::TenantConfig {
419 14 : checkpoint_distance: settings
420 14 : .remove("checkpoint_distance")
421 14 : .map(|x| x.parse::<u64>())
422 14 : .transpose()
423 14 : .context("Failed to parse 'checkpoint_distance' as an integer")?,
424 14 : checkpoint_timeout: settings.remove("checkpoint_timeout").map(|x| x.to_string()),
425 14 : compaction_target_size: settings
426 14 : .remove("compaction_target_size")
427 14 : .map(|x| x.parse::<u64>())
428 14 : .transpose()
429 14 : .context("Failed to parse 'compaction_target_size' as an integer")?,
430 14 : compaction_period: settings.remove("compaction_period").map(|x| x.to_string()),
431 14 : compaction_threshold: settings
432 14 : .remove("compaction_threshold")
433 14 : .map(|x| x.parse::<usize>())
434 14 : .transpose()
435 14 : .context("Failed to parse 'compaction_threshold' as an integer")?,
436 14 : gc_horizon: settings
437 14 : .remove("gc_horizon")
438 14 : .map(|x| x.parse::<u64>())
439 14 : .transpose()
440 14 : .context("Failed to parse 'gc_horizon' as an integer")?,
441 14 : gc_period: settings.remove("gc_period").map(|x| x.to_string()),
442 14 : image_creation_threshold: settings
443 14 : .remove("image_creation_threshold")
444 14 : .map(|x| x.parse::<usize>())
445 14 : .transpose()
446 14 : .context("Failed to parse 'image_creation_threshold' as non zero integer")?,
447 14 : pitr_interval: settings.remove("pitr_interval").map(|x| x.to_string()),
448 14 : walreceiver_connect_timeout: settings
449 14 : .remove("walreceiver_connect_timeout")
450 14 : .map(|x| x.to_string()),
451 14 : lagging_wal_timeout: settings
452 14 : .remove("lagging_wal_timeout")
453 14 : .map(|x| x.to_string()),
454 14 : max_lsn_wal_lag: settings
455 14 : .remove("max_lsn_wal_lag")
456 14 : .map(|x| x.parse::<NonZeroU64>())
457 14 : .transpose()
458 14 : .context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?,
459 14 : trace_read_requests: settings
460 14 : .remove("trace_read_requests")
461 14 : .map(|x| x.parse::<bool>())
462 14 : .transpose()
463 14 : .context("Failed to parse 'trace_read_requests' as bool")?,
464 14 : eviction_policy: settings
465 14 : .remove("eviction_policy")
466 14 : .map(serde_json::from_str)
467 14 : .transpose()
468 14 : .context("Failed to parse 'eviction_policy' json")?,
469 14 : min_resident_size_override: settings
470 14 : .remove("min_resident_size_override")
471 14 : .map(|x| x.parse::<u64>())
472 14 : .transpose()
473 14 : .context("Failed to parse 'min_resident_size_override' as an integer")?,
474 14 : evictions_low_residence_duration_metric_threshold: settings
475 14 : .remove("evictions_low_residence_duration_metric_threshold")
476 14 : .map(|x| x.to_string()),
477 14 : gc_feedback: settings
478 14 : .remove("gc_feedback")
479 14 : .map(|x| x.parse::<bool>())
480 14 : .transpose()
481 14 : .context("Failed to parse 'gc_feedback' as bool")?,
482 : }
483 : };
484 :
485 14 : if !settings.is_empty() {
486 0 : bail!("Unrecognized tenant settings: {settings:?}")
487 14 : }
488 14 :
489 14 : self.http_request(Method::PUT, format!("{}/tenant/config", self.http_base_url))?
490 14 : .json(&models::TenantConfigRequest { tenant_id, config })
491 14 : .send()?
492 14 : .error_from_body()?;
493 :
494 14 : Ok(())
495 14 : }
496 :
497 15 : pub fn timeline_list(&self, tenant_id: &TenantId) -> anyhow::Result<Vec<TimelineInfo>> {
498 15 : let timeline_infos: Vec<TimelineInfo> = self
499 15 : .http_request(
500 15 : Method::GET,
501 15 : format!("{}/tenant/{}/timeline", self.http_base_url, tenant_id),
502 15 : )?
503 15 : .send()?
504 15 : .error_from_body()?
505 15 : .json()?;
506 :
507 15 : Ok(timeline_infos)
508 15 : }
509 :
510 891 : pub fn timeline_create(
511 891 : &self,
512 891 : tenant_id: TenantId,
513 891 : new_timeline_id: Option<TimelineId>,
514 891 : ancestor_start_lsn: Option<Lsn>,
515 891 : ancestor_timeline_id: Option<TimelineId>,
516 891 : pg_version: Option<u32>,
517 891 : ) -> anyhow::Result<TimelineInfo> {
518 891 : // If timeline ID was not specified, generate one
519 891 : let new_timeline_id = new_timeline_id.unwrap_or(TimelineId::generate());
520 891 :
521 891 : self.http_request(
522 891 : Method::POST,
523 891 : format!("{}/tenant/{}/timeline", self.http_base_url, tenant_id),
524 891 : )?
525 891 : .json(&models::TimelineCreateRequest {
526 891 : new_timeline_id,
527 891 : ancestor_start_lsn,
528 891 : ancestor_timeline_id,
529 891 : pg_version,
530 891 : })
531 891 : .send()?
532 891 : .error_from_body()?
533 885 : .json::<Option<TimelineInfo>>()
534 885 : .with_context(|| {
535 0 : format!("Failed to parse timeline creation response for tenant id: {tenant_id}")
536 885 : })?
537 885 : .with_context(|| {
538 0 : format!(
539 0 : "No timeline id was found in the timeline creation response for tenant {tenant_id}"
540 0 : )
541 885 : })
542 891 : }
543 :
544 : /// Import a basebackup prepared using either:
545 : /// a) `pg_basebackup -F tar`, or
546 : /// b) The `fullbackup` pageserver endpoint
547 : ///
548 : /// # Arguments
549 : /// * `tenant_id` - tenant to import into. Created if not exists
550 : /// * `timeline_id` - id to assign to imported timeline
551 : /// * `base` - (start lsn of basebackup, path to `base.tar` file)
552 : /// * `pg_wal` - if there's any wal to import: (end lsn, path to `pg_wal.tar`)
553 5 : pub fn timeline_import(
554 5 : &self,
555 5 : tenant_id: TenantId,
556 5 : timeline_id: TimelineId,
557 5 : base: (Lsn, PathBuf),
558 5 : pg_wal: Option<(Lsn, PathBuf)>,
559 5 : pg_version: u32,
560 5 : ) -> anyhow::Result<()> {
561 5 : let mut client = self.page_server_psql_client()?;
562 :
563 : // Init base reader
564 5 : let (start_lsn, base_tarfile_path) = base;
565 5 : let base_tarfile = File::open(base_tarfile_path)?;
566 5 : let mut base_reader = BufReader::new(base_tarfile);
567 :
568 : // Init wal reader if necessary
569 5 : let (end_lsn, wal_reader) = if let Some((end_lsn, wal_tarfile_path)) = pg_wal {
570 4 : let wal_tarfile = File::open(wal_tarfile_path)?;
571 4 : let wal_reader = BufReader::new(wal_tarfile);
572 4 : (end_lsn, Some(wal_reader))
573 : } else {
574 1 : (start_lsn, None)
575 : };
576 :
577 : // Import base
578 5 : let import_cmd = format!(
579 5 : "import basebackup {tenant_id} {timeline_id} {start_lsn} {end_lsn} {pg_version}"
580 5 : );
581 5 : let mut writer = client.copy_in(&import_cmd)?;
582 5 : io::copy(&mut base_reader, &mut writer)?;
583 5 : writer.finish()?;
584 :
585 : // Import wal if necessary
586 3 : if let Some(mut wal_reader) = wal_reader {
587 2 : let import_cmd = format!("import wal {tenant_id} {timeline_id} {start_lsn} {end_lsn}");
588 2 : let mut writer = client.copy_in(&import_cmd)?;
589 2 : io::copy(&mut wal_reader, &mut writer)?;
590 2 : writer.finish()?;
591 1 : }
592 :
593 3 : Ok(())
594 5 : }
595 : }
|