LCOV - code coverage report
Current view: top level - control_plane/src - pageserver.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 93.1 % 452 421
Test Date: 2023-09-06 10:18:01 Functions: 71.8 % 71 51

            Line data    Source code
       1              : //! Code to manage pageservers
       2              : //!
       3              : //! In the local test environment, the pageserver stores its data directly in
       4              : //!
       5              : //!   .neon/
       6              : //!
       7              : use std::borrow::Cow;
       8              : use std::collections::HashMap;
       9              : use std::fs::File;
      10              : use std::io::{BufReader, Write};
      11              : use std::num::NonZeroU64;
      12              : use std::path::PathBuf;
      13              : use std::process::{Child, Command};
      14              : use std::{io, result};
      15              : 
      16              : use anyhow::{bail, Context};
      17              : use pageserver_api::models::{self, TenantInfo, TimelineInfo};
      18              : use postgres_backend::AuthType;
      19              : use postgres_connection::{parse_host_port, PgConnectionConfig};
      20              : use reqwest::blocking::{Client, RequestBuilder, Response};
      21              : use reqwest::{IntoUrl, Method};
      22              : use thiserror::Error;
      23              : use utils::auth::{Claims, Scope};
      24              : use utils::{
      25              :     http::error::HttpErrorBody,
      26              :     id::{TenantId, TimelineId},
      27              :     lsn::Lsn,
      28              : };
      29              : 
      30              : use crate::{background_process, local_env::LocalEnv};
      31              : 
      32           14 : #[derive(Error, Debug)]
      33              : pub enum PageserverHttpError {
      34              :     #[error("Reqwest error: {0}")]
      35              :     Transport(#[from] reqwest::Error),
      36              : 
      37              :     #[error("Error: {0}")]
      38              :     Response(String),
      39              : }
      40              : 
      41              : impl From<anyhow::Error> for PageserverHttpError {
      42            0 :     fn from(e: anyhow::Error) -> Self {
      43            0 :         Self::Response(e.to_string())
      44            0 :     }
      45              : }
      46              : 
      47              : type Result<T> = result::Result<T, PageserverHttpError>;
      48              : 
      49              : pub trait ResponseErrorMessageExt: Sized {
      50              :     fn error_from_body(self) -> Result<Self>;
      51              : }
      52              : 
      53              : impl ResponseErrorMessageExt for Response {
      54         1972 :     fn error_from_body(self) -> Result<Self> {
      55         1972 :         let status = self.status();
      56         1972 :         if !(status.is_client_error() || status.is_server_error()) {
      57         1965 :             return Ok(self);
      58            7 :         }
      59            7 : 
      60            7 :         // reqwest does not export its error construction utility functions, so let's craft the message ourselves
      61            7 :         let url = self.url().to_owned();
      62            7 :         Err(PageserverHttpError::Response(
      63            7 :             match self.json::<HttpErrorBody>() {
      64            7 :                 Ok(err_body) => format!("Error: {}", err_body.msg),
      65            0 :                 Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
      66              :             },
      67              :         ))
      68         1972 :     }
      69              : }
      70              : 
      71              : //
      72              : // Control routines for pageserver.
      73              : //
      74              : // Used in CLI and tests.
      75              : //
      76            0 : #[derive(Debug)]
      77              : pub struct PageServerNode {
      78              :     pub pg_connection_config: PgConnectionConfig,
      79              :     pub env: LocalEnv,
      80              :     pub http_client: Client,
      81              :     pub http_base_url: String,
      82              : }
      83              : 
      84              : impl PageServerNode {
      85         4370 :     pub fn from_env(env: &LocalEnv) -> PageServerNode {
      86         4370 :         let (host, port) = parse_host_port(&env.pageserver.listen_pg_addr)
      87         4370 :             .expect("Unable to parse listen_pg_addr");
      88         4370 :         let port = port.unwrap_or(5432);
      89         4370 :         Self {
      90         4370 :             pg_connection_config: PgConnectionConfig::new_host_port(host, port),
      91         4370 :             env: env.clone(),
      92         4370 :             http_client: Client::new(),
      93         4370 :             http_base_url: format!("http://{}/v1", env.pageserver.listen_http_addr),
      94         4370 :         }
      95         4370 :     }
      96              : 
      97              :     // pageserver conf overrides defined by neon_local configuration.
      98         1519 :     fn neon_local_overrides(&self) -> Vec<String> {
      99         1519 :         let id = format!("id={}", self.env.pageserver.id);
     100         1519 :         // FIXME: the paths should be shell-escaped to handle paths with spaces, quotas etc.
     101         1519 :         let pg_distrib_dir_param = format!(
     102         1519 :             "pg_distrib_dir='{}'",
     103         1519 :             self.env.pg_distrib_dir_raw().display()
     104         1519 :         );
     105         1519 : 
     106         1519 :         let http_auth_type_param =
     107         1519 :             format!("http_auth_type='{}'", self.env.pageserver.http_auth_type);
     108         1519 :         let listen_http_addr_param = format!(
     109         1519 :             "listen_http_addr='{}'",
     110         1519 :             self.env.pageserver.listen_http_addr
     111         1519 :         );
     112         1519 : 
     113         1519 :         let pg_auth_type_param = format!("pg_auth_type='{}'", self.env.pageserver.pg_auth_type);
     114         1519 :         let listen_pg_addr_param =
     115         1519 :             format!("listen_pg_addr='{}'", self.env.pageserver.listen_pg_addr);
     116         1519 : 
     117         1519 :         let broker_endpoint_param = format!("broker_endpoint='{}'", self.env.broker.client_url());
     118         1519 : 
     119         1519 :         let mut overrides = vec![
     120         1519 :             id,
     121         1519 :             pg_distrib_dir_param,
     122         1519 :             http_auth_type_param,
     123         1519 :             pg_auth_type_param,
     124         1519 :             listen_http_addr_param,
     125         1519 :             listen_pg_addr_param,
     126         1519 :             broker_endpoint_param,
     127         1519 :         ];
     128         1519 : 
     129         1519 :         if self.env.pageserver.http_auth_type != AuthType::Trust
     130         1492 :             || self.env.pageserver.pg_auth_type != AuthType::Trust
     131           27 :         {
     132           27 :             overrides.push("auth_validation_public_key_path='auth_public_key.pem'".to_owned());
     133         1492 :         }
     134         1519 :         overrides
     135         1519 :     }
     136              : 
     137              :     /// Initializes a pageserver node by creating its config with the overrides provided.
     138          369 :     pub fn initialize(&self, config_overrides: &[&str]) -> anyhow::Result<()> {
     139          369 :         // First, run `pageserver --init` and wait for it to write a config into FS and exit.
     140          369 :         self.pageserver_init(config_overrides).with_context(|| {
     141            0 :             format!(
     142            0 :                 "Failed to run init for pageserver node {}",
     143            0 :                 self.env.pageserver.id,
     144            0 :             )
     145          369 :         })
     146          369 :     }
     147              : 
     148         2096 :     pub fn repo_path(&self) -> PathBuf {
     149         2096 :         self.env.pageserver_data_dir()
     150         2096 :     }
     151              : 
     152              :     /// The pid file is created by the pageserver process, with its pid stored inside.
     153              :     /// Other pageservers cannot lock the same file and overwrite it for as long as the current
     154              :     /// pageserver runs. (Unless someone removes the file manually; never do that!)
     155         1152 :     fn pid_file(&self) -> PathBuf {
     156         1152 :         self.repo_path().join("pageserver.pid")
     157         1152 :     }
     158              : 
     159          575 :     pub fn start(&self, config_overrides: &[&str]) -> anyhow::Result<Child> {
     160          575 :         self.start_node(config_overrides, false)
     161          575 :     }
     162              : 
     163          369 :     fn pageserver_init(&self, config_overrides: &[&str]) -> anyhow::Result<()> {
     164          369 :         let datadir = self.repo_path();
     165          369 :         let node_id = self.env.pageserver.id;
     166          369 :         println!(
     167          369 :             "Initializing pageserver node {} at '{}' in {:?}",
     168          369 :             node_id,
     169          369 :             self.pg_connection_config.raw_address(),
     170          369 :             datadir
     171          369 :         );
     172          369 :         io::stdout().flush()?;
     173              : 
     174          369 :         let datadir_path_str = datadir.to_str().with_context(|| {
     175            0 :             format!("Cannot start pageserver node {node_id} in path that has no string representation: {datadir:?}")
     176          369 :         })?;
     177          369 :         let mut args = self.pageserver_basic_args(config_overrides, datadir_path_str);
     178          369 :         args.push(Cow::Borrowed("--init"));
     179              : 
     180          369 :         let init_output = Command::new(self.env.pageserver_bin())
     181          369 :             .args(args.iter().map(Cow::as_ref))
     182          369 :             .envs(self.pageserver_env_variables()?)
     183          369 :             .output()
     184          369 :             .with_context(|| format!("Failed to run pageserver init for node {node_id}"))?;
     185              : 
     186          369 :         anyhow::ensure!(
     187          369 :             init_output.status.success(),
     188            0 :             "Pageserver init for node {} did not finish successfully, stdout: {}, stderr: {}",
     189            0 :             node_id,
     190            0 :             String::from_utf8_lossy(&init_output.stdout),
     191            0 :             String::from_utf8_lossy(&init_output.stderr),
     192              :         );
     193              : 
     194          369 :         Ok(())
     195          369 :     }
     196              : 
     197          575 :     fn start_node(&self, config_overrides: &[&str], update_config: bool) -> anyhow::Result<Child> {
     198          575 :         let mut overrides = self.neon_local_overrides();
     199          575 :         overrides.extend(config_overrides.iter().map(|&c| c.to_owned()));
     200          575 : 
     201          575 :         let datadir = self.repo_path();
     202          575 :         print!(
     203          575 :             "Starting pageserver node {} at '{}' in {:?}",
     204          575 :             self.env.pageserver.id,
     205          575 :             self.pg_connection_config.raw_address(),
     206          575 :             datadir
     207          575 :         );
     208          575 :         io::stdout().flush()?;
     209              : 
     210          575 :         let datadir_path_str = datadir.to_str().with_context(|| {
     211            0 :             format!(
     212            0 :                 "Cannot start pageserver node {} in path that has no string representation: {:?}",
     213            0 :                 self.env.pageserver.id, datadir,
     214            0 :             )
     215          575 :         })?;
     216          575 :         let mut args = self.pageserver_basic_args(config_overrides, datadir_path_str);
     217          575 :         if update_config {
     218            0 :             args.push(Cow::Borrowed("--update-config"));
     219          575 :         }
     220              : 
     221              :         background_process::start_process(
     222          575 :             "pageserver",
     223          575 :             &datadir,
     224          575 :             &self.env.pageserver_bin(),
     225          575 :             args.iter().map(Cow::as_ref),
     226          575 :             self.pageserver_env_variables()?,
     227          575 :             background_process::InitialPidFile::Expect(&self.pid_file()),
     228         1157 :             || match self.check_status() {
     229          575 :                 Ok(()) => Ok(true),
     230          582 :                 Err(PageserverHttpError::Transport(_)) => Ok(false),
     231            0 :                 Err(e) => Err(anyhow::anyhow!("Failed to check node status: {e}")),
     232         1157 :             },
     233              :         )
     234          575 :     }
     235              : 
     236          944 :     fn pageserver_basic_args<'a>(
     237          944 :         &self,
     238          944 :         config_overrides: &'a [&'a str],
     239          944 :         datadir_path_str: &'a str,
     240          944 :     ) -> Vec<Cow<'a, str>> {
     241          944 :         let mut args = vec![Cow::Borrowed("-D"), Cow::Borrowed(datadir_path_str)];
     242          944 : 
     243          944 :         let mut overrides = self.neon_local_overrides();
     244          944 :         overrides.extend(config_overrides.iter().map(|&c| c.to_owned()));
     245         8220 :         for config_override in overrides {
     246         7276 :             args.push(Cow::Borrowed("-c"));
     247         7276 :             args.push(Cow::Owned(config_override));
     248         7276 :         }
     249              : 
     250          944 :         args
     251          944 :     }
     252              : 
     253          944 :     fn pageserver_env_variables(&self) -> anyhow::Result<Vec<(String, String)>> {
     254          944 :         // FIXME: why is this tied to pageserver's auth type? Whether or not the safekeeper
     255          944 :         // needs a token, and how to generate that token, seems independent to whether
     256          944 :         // the pageserver requires a token in incoming requests.
     257          944 :         Ok(if self.env.pageserver.http_auth_type != AuthType::Trust {
     258              :             // Generate a token to connect from the pageserver to a safekeeper
     259           18 :             let token = self
     260           18 :                 .env
     261           18 :                 .generate_auth_token(&Claims::new(None, Scope::SafekeeperData))?;
     262           18 :             vec![("NEON_AUTH_TOKEN".to_owned(), token)]
     263              :         } else {
     264          926 :             Vec::new()
     265              :         })
     266          944 :     }
     267              : 
     268              :     ///
     269              :     /// Stop the server.
     270              :     ///
     271              :     /// If 'immediate' is true, we use SIGQUIT, killing the process immediately.
     272              :     /// Otherwise we use SIGTERM, triggering a clean shutdown
     273              :     ///
     274              :     /// If the server is not running, returns success
     275              :     ///
     276          577 :     pub fn stop(&self, immediate: bool) -> anyhow::Result<()> {
     277          577 :         background_process::stop_process(immediate, "pageserver", &self.pid_file())
     278          577 :     }
     279              : 
     280            5 :     pub fn page_server_psql_client(&self) -> anyhow::Result<postgres::Client> {
     281            5 :         let mut config = self.pg_connection_config.clone();
     282            5 :         if self.env.pageserver.pg_auth_type == AuthType::NeonJWT {
     283            0 :             let token = self
     284            0 :                 .env
     285            0 :                 .generate_auth_token(&Claims::new(None, Scope::PageServerApi))?;
     286            0 :             config = config.set_password(Some(token));
     287            5 :         }
     288            5 :         Ok(config.connect_no_tls()?)
     289            5 :     }
     290              : 
     291         2554 :     fn http_request<U: IntoUrl>(&self, method: Method, url: U) -> anyhow::Result<RequestBuilder> {
     292         2554 :         let mut builder = self.http_client.request(method, url);
     293         2554 :         if self.env.pageserver.http_auth_type == AuthType::NeonJWT {
     294           48 :             let token = self
     295           48 :                 .env
     296           48 :                 .generate_auth_token(&Claims::new(None, Scope::PageServerApi))?;
     297           48 :             builder = builder.bearer_auth(token)
     298         2506 :         }
     299         2554 :         Ok(builder)
     300         2554 :     }
     301              : 
     302         1157 :     pub fn check_status(&self) -> Result<()> {
     303         1157 :         self.http_request(Method::GET, format!("{}/status", self.http_base_url))?
     304         1157 :             .send()?
     305          575 :             .error_from_body()?;
     306          575 :         Ok(())
     307         1157 :     }
     308              : 
     309            6 :     pub fn tenant_list(&self) -> Result<Vec<TenantInfo>> {
     310            6 :         Ok(self
     311            6 :             .http_request(Method::GET, format!("{}/tenant", self.http_base_url))?
     312            6 :             .send()?
     313            6 :             .error_from_body()?
     314            6 :             .json()?)
     315            6 :     }
     316              : 
     317          472 :     pub fn tenant_create(
     318          472 :         &self,
     319          472 :         new_tenant_id: Option<TenantId>,
     320          472 :         settings: HashMap<&str, &str>,
     321          472 :     ) -> anyhow::Result<TenantId> {
     322          472 :         let mut settings = settings.clone();
     323              : 
     324          472 :         let config = models::TenantConfig {
     325          472 :             checkpoint_distance: settings
     326          472 :                 .remove("checkpoint_distance")
     327          472 :                 .map(|x| x.parse::<u64>())
     328          472 :                 .transpose()?,
     329          472 :             checkpoint_timeout: settings.remove("checkpoint_timeout").map(|x| x.to_string()),
     330          472 :             compaction_target_size: settings
     331          472 :                 .remove("compaction_target_size")
     332          472 :                 .map(|x| x.parse::<u64>())
     333          472 :                 .transpose()?,
     334          472 :             compaction_period: settings.remove("compaction_period").map(|x| x.to_string()),
     335          472 :             compaction_threshold: settings
     336          472 :                 .remove("compaction_threshold")
     337          472 :                 .map(|x| x.parse::<usize>())
     338          472 :                 .transpose()?,
     339          472 :             gc_horizon: settings
     340          472 :                 .remove("gc_horizon")
     341          472 :                 .map(|x| x.parse::<u64>())
     342          472 :                 .transpose()?,
     343          472 :             gc_period: settings.remove("gc_period").map(|x| x.to_string()),
     344          472 :             image_creation_threshold: settings
     345          472 :                 .remove("image_creation_threshold")
     346          472 :                 .map(|x| x.parse::<usize>())
     347          472 :                 .transpose()?,
     348          472 :             pitr_interval: settings.remove("pitr_interval").map(|x| x.to_string()),
     349          472 :             walreceiver_connect_timeout: settings
     350          472 :                 .remove("walreceiver_connect_timeout")
     351          472 :                 .map(|x| x.to_string()),
     352          472 :             lagging_wal_timeout: settings
     353          472 :                 .remove("lagging_wal_timeout")
     354          472 :                 .map(|x| x.to_string()),
     355          472 :             max_lsn_wal_lag: settings
     356          472 :                 .remove("max_lsn_wal_lag")
     357          472 :                 .map(|x| x.parse::<NonZeroU64>())
     358          472 :                 .transpose()
     359          472 :                 .context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?,
     360          472 :             trace_read_requests: settings
     361          472 :                 .remove("trace_read_requests")
     362          472 :                 .map(|x| x.parse::<bool>())
     363          472 :                 .transpose()
     364          472 :                 .context("Failed to parse 'trace_read_requests' as bool")?,
     365          472 :             eviction_policy: settings
     366          472 :                 .remove("eviction_policy")
     367          472 :                 .map(serde_json::from_str)
     368          472 :                 .transpose()
     369          472 :                 .context("Failed to parse 'eviction_policy' json")?,
     370          472 :             min_resident_size_override: settings
     371          472 :                 .remove("min_resident_size_override")
     372          472 :                 .map(|x| x.parse::<u64>())
     373          472 :                 .transpose()
     374          472 :                 .context("Failed to parse 'min_resident_size_override' as integer")?,
     375          472 :             evictions_low_residence_duration_metric_threshold: settings
     376          472 :                 .remove("evictions_low_residence_duration_metric_threshold")
     377          472 :                 .map(|x| x.to_string()),
     378          472 :             gc_feedback: settings
     379          472 :                 .remove("gc_feedback")
     380          472 :                 .map(|x| x.parse::<bool>())
     381          472 :                 .transpose()
     382          472 :                 .context("Failed to parse 'gc_feedback' as bool")?,
     383              :         };
     384              : 
     385              :         // If tenant ID was not specified, generate one
     386          472 :         let new_tenant_id = new_tenant_id.unwrap_or(TenantId::generate());
     387          472 : 
     388          472 :         let request = models::TenantCreateRequest {
     389          472 :             new_tenant_id,
     390          472 :             config,
     391          472 :         };
     392          472 :         if !settings.is_empty() {
     393            1 :             bail!("Unrecognized tenant settings: {settings:?}")
     394          471 :         }
     395          471 :         self.http_request(Method::POST, format!("{}/tenant", self.http_base_url))?
     396          471 :             .json(&request)
     397          471 :             .send()?
     398          471 :             .error_from_body()?
     399          470 :             .json::<Option<String>>()
     400          470 :             .with_context(|| {
     401            0 :                 format!("Failed to parse tenant creation response for tenant id: {new_tenant_id:?}")
     402          470 :             })?
     403          470 :             .context("No tenant id was found in the tenant creation response")
     404          470 :             .and_then(|tenant_id_string| {
     405          470 :                 tenant_id_string.parse().with_context(|| {
     406            0 :                     format!("Failed to parse response string as tenant id: '{tenant_id_string}'")
     407          470 :                 })
     408          470 :             })
     409          472 :     }
     410              : 
     411           14 :     pub fn tenant_config(
     412           14 :         &self,
     413           14 :         tenant_id: TenantId,
     414           14 :         mut settings: HashMap<&str, &str>,
     415           14 :     ) -> anyhow::Result<()> {
     416           14 :         let config = {
     417              :             // Braces to make the diff easier to read
     418              :             models::TenantConfig {
     419           14 :                 checkpoint_distance: settings
     420           14 :                     .remove("checkpoint_distance")
     421           14 :                     .map(|x| x.parse::<u64>())
     422           14 :                     .transpose()
     423           14 :                     .context("Failed to parse 'checkpoint_distance' as an integer")?,
     424           14 :                 checkpoint_timeout: settings.remove("checkpoint_timeout").map(|x| x.to_string()),
     425           14 :                 compaction_target_size: settings
     426           14 :                     .remove("compaction_target_size")
     427           14 :                     .map(|x| x.parse::<u64>())
     428           14 :                     .transpose()
     429           14 :                     .context("Failed to parse 'compaction_target_size' as an integer")?,
     430           14 :                 compaction_period: settings.remove("compaction_period").map(|x| x.to_string()),
     431           14 :                 compaction_threshold: settings
     432           14 :                     .remove("compaction_threshold")
     433           14 :                     .map(|x| x.parse::<usize>())
     434           14 :                     .transpose()
     435           14 :                     .context("Failed to parse 'compaction_threshold' as an integer")?,
     436           14 :                 gc_horizon: settings
     437           14 :                     .remove("gc_horizon")
     438           14 :                     .map(|x| x.parse::<u64>())
     439           14 :                     .transpose()
     440           14 :                     .context("Failed to parse 'gc_horizon' as an integer")?,
     441           14 :                 gc_period: settings.remove("gc_period").map(|x| x.to_string()),
     442           14 :                 image_creation_threshold: settings
     443           14 :                     .remove("image_creation_threshold")
     444           14 :                     .map(|x| x.parse::<usize>())
     445           14 :                     .transpose()
     446           14 :                     .context("Failed to parse 'image_creation_threshold' as non zero integer")?,
     447           14 :                 pitr_interval: settings.remove("pitr_interval").map(|x| x.to_string()),
     448           14 :                 walreceiver_connect_timeout: settings
     449           14 :                     .remove("walreceiver_connect_timeout")
     450           14 :                     .map(|x| x.to_string()),
     451           14 :                 lagging_wal_timeout: settings
     452           14 :                     .remove("lagging_wal_timeout")
     453           14 :                     .map(|x| x.to_string()),
     454           14 :                 max_lsn_wal_lag: settings
     455           14 :                     .remove("max_lsn_wal_lag")
     456           14 :                     .map(|x| x.parse::<NonZeroU64>())
     457           14 :                     .transpose()
     458           14 :                     .context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?,
     459           14 :                 trace_read_requests: settings
     460           14 :                     .remove("trace_read_requests")
     461           14 :                     .map(|x| x.parse::<bool>())
     462           14 :                     .transpose()
     463           14 :                     .context("Failed to parse 'trace_read_requests' as bool")?,
     464           14 :                 eviction_policy: settings
     465           14 :                     .remove("eviction_policy")
     466           14 :                     .map(serde_json::from_str)
     467           14 :                     .transpose()
     468           14 :                     .context("Failed to parse 'eviction_policy' json")?,
     469           14 :                 min_resident_size_override: settings
     470           14 :                     .remove("min_resident_size_override")
     471           14 :                     .map(|x| x.parse::<u64>())
     472           14 :                     .transpose()
     473           14 :                     .context("Failed to parse 'min_resident_size_override' as an integer")?,
     474           14 :                 evictions_low_residence_duration_metric_threshold: settings
     475           14 :                     .remove("evictions_low_residence_duration_metric_threshold")
     476           14 :                     .map(|x| x.to_string()),
     477           14 :                 gc_feedback: settings
     478           14 :                     .remove("gc_feedback")
     479           14 :                     .map(|x| x.parse::<bool>())
     480           14 :                     .transpose()
     481           14 :                     .context("Failed to parse 'gc_feedback' as bool")?,
     482              :             }
     483              :         };
     484              : 
     485           14 :         if !settings.is_empty() {
     486            0 :             bail!("Unrecognized tenant settings: {settings:?}")
     487           14 :         }
     488           14 : 
     489           14 :         self.http_request(Method::PUT, format!("{}/tenant/config", self.http_base_url))?
     490           14 :             .json(&models::TenantConfigRequest { tenant_id, config })
     491           14 :             .send()?
     492           14 :             .error_from_body()?;
     493              : 
     494           14 :         Ok(())
     495           14 :     }
     496              : 
     497           15 :     pub fn timeline_list(&self, tenant_id: &TenantId) -> anyhow::Result<Vec<TimelineInfo>> {
     498           15 :         let timeline_infos: Vec<TimelineInfo> = self
     499           15 :             .http_request(
     500           15 :                 Method::GET,
     501           15 :                 format!("{}/tenant/{}/timeline", self.http_base_url, tenant_id),
     502           15 :             )?
     503           15 :             .send()?
     504           15 :             .error_from_body()?
     505           15 :             .json()?;
     506              : 
     507           15 :         Ok(timeline_infos)
     508           15 :     }
     509              : 
     510          891 :     pub fn timeline_create(
     511          891 :         &self,
     512          891 :         tenant_id: TenantId,
     513          891 :         new_timeline_id: Option<TimelineId>,
     514          891 :         ancestor_start_lsn: Option<Lsn>,
     515          891 :         ancestor_timeline_id: Option<TimelineId>,
     516          891 :         pg_version: Option<u32>,
     517          891 :     ) -> anyhow::Result<TimelineInfo> {
     518          891 :         // If timeline ID was not specified, generate one
     519          891 :         let new_timeline_id = new_timeline_id.unwrap_or(TimelineId::generate());
     520          891 : 
     521          891 :         self.http_request(
     522          891 :             Method::POST,
     523          891 :             format!("{}/tenant/{}/timeline", self.http_base_url, tenant_id),
     524          891 :         )?
     525          891 :         .json(&models::TimelineCreateRequest {
     526          891 :             new_timeline_id,
     527          891 :             ancestor_start_lsn,
     528          891 :             ancestor_timeline_id,
     529          891 :             pg_version,
     530          891 :         })
     531          891 :         .send()?
     532          891 :         .error_from_body()?
     533          885 :         .json::<Option<TimelineInfo>>()
     534          885 :         .with_context(|| {
     535            0 :             format!("Failed to parse timeline creation response for tenant id: {tenant_id}")
     536          885 :         })?
     537          885 :         .with_context(|| {
     538            0 :             format!(
     539            0 :                 "No timeline id was found in the timeline creation response for tenant {tenant_id}"
     540            0 :             )
     541          885 :         })
     542          891 :     }
     543              : 
     544              :     /// Import a basebackup prepared using either:
     545              :     /// a) `pg_basebackup -F tar`, or
     546              :     /// b) The `fullbackup` pageserver endpoint
     547              :     ///
     548              :     /// # Arguments
     549              :     /// * `tenant_id` - tenant to import into. Created if not exists
     550              :     /// * `timeline_id` - id to assign to imported timeline
     551              :     /// * `base` - (start lsn of basebackup, path to `base.tar` file)
     552              :     /// * `pg_wal` - if there's any wal to import: (end lsn, path to `pg_wal.tar`)
     553            5 :     pub fn timeline_import(
     554            5 :         &self,
     555            5 :         tenant_id: TenantId,
     556            5 :         timeline_id: TimelineId,
     557            5 :         base: (Lsn, PathBuf),
     558            5 :         pg_wal: Option<(Lsn, PathBuf)>,
     559            5 :         pg_version: u32,
     560            5 :     ) -> anyhow::Result<()> {
     561            5 :         let mut client = self.page_server_psql_client()?;
     562              : 
     563              :         // Init base reader
     564            5 :         let (start_lsn, base_tarfile_path) = base;
     565            5 :         let base_tarfile = File::open(base_tarfile_path)?;
     566            5 :         let mut base_reader = BufReader::new(base_tarfile);
     567              : 
     568              :         // Init wal reader if necessary
     569            5 :         let (end_lsn, wal_reader) = if let Some((end_lsn, wal_tarfile_path)) = pg_wal {
     570            4 :             let wal_tarfile = File::open(wal_tarfile_path)?;
     571            4 :             let wal_reader = BufReader::new(wal_tarfile);
     572            4 :             (end_lsn, Some(wal_reader))
     573              :         } else {
     574            1 :             (start_lsn, None)
     575              :         };
     576              : 
     577              :         // Import base
     578            5 :         let import_cmd = format!(
     579            5 :             "import basebackup {tenant_id} {timeline_id} {start_lsn} {end_lsn} {pg_version}"
     580            5 :         );
     581            5 :         let mut writer = client.copy_in(&import_cmd)?;
     582            5 :         io::copy(&mut base_reader, &mut writer)?;
     583            5 :         writer.finish()?;
     584              : 
     585              :         // Import wal if necessary
     586            3 :         if let Some(mut wal_reader) = wal_reader {
     587            2 :             let import_cmd = format!("import wal {tenant_id} {timeline_id} {start_lsn} {end_lsn}");
     588            2 :             let mut writer = client.copy_in(&import_cmd)?;
     589            2 :             io::copy(&mut wal_reader, &mut writer)?;
     590            2 :             writer.finish()?;
     591            1 :         }
     592              : 
     593            3 :         Ok(())
     594            5 :     }
     595              : }
        

Generated by: LCOV version 2.1-beta