LCOV - differential code coverage report
Current view: top level - control_plane/src - pageserver.rs (source / functions) Coverage Total Hit UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 93.8 % 454 426 28 426
Current Date: 2023-10-19 02:04:12 Functions: 71.8 % 71 51 20 51
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : //! Code to manage pageservers
       2                 : //!
       3                 : //! In the local test environment, the pageserver stores its data directly in
       4                 : //!
       5                 : //!   .neon/
       6                 : //!
       7                 : use std::borrow::Cow;
       8                 : use std::collections::HashMap;
       9                 : use std::fs::File;
      10                 : use std::io::{BufReader, Write};
      11                 : use std::num::NonZeroU64;
      12                 : use std::path::PathBuf;
      13                 : use std::process::{Child, Command};
      14                 : use std::{io, result};
      15                 : 
      16                 : use anyhow::{bail, Context};
      17                 : use camino::Utf8PathBuf;
      18                 : use pageserver_api::models::{self, TenantInfo, TimelineInfo};
      19                 : use postgres_backend::AuthType;
      20                 : use postgres_connection::{parse_host_port, PgConnectionConfig};
      21                 : use reqwest::blocking::{Client, RequestBuilder, Response};
      22                 : use reqwest::{IntoUrl, Method};
      23                 : use thiserror::Error;
      24                 : use utils::auth::{Claims, Scope};
      25                 : use utils::{
      26                 :     http::error::HttpErrorBody,
      27                 :     id::{TenantId, TimelineId},
      28                 :     lsn::Lsn,
      29                 : };
      30                 : 
      31                 : use crate::local_env::PageServerConf;
      32                 : use crate::{background_process, local_env::LocalEnv};
      33                 : 
      34 CBC          14 : #[derive(Error, Debug)]
      35                 : pub enum PageserverHttpError {
      36                 :     #[error("Reqwest error: {0}")]
      37                 :     Transport(#[from] reqwest::Error),
      38                 : 
      39                 :     #[error("Error: {0}")]
      40                 :     Response(String),
      41                 : }
      42                 : 
      43                 : impl From<anyhow::Error> for PageserverHttpError {
      44 UBC           0 :     fn from(e: anyhow::Error) -> Self {
      45               0 :         Self::Response(e.to_string())
      46               0 :     }
      47                 : }
      48                 : 
      49                 : type Result<T> = result::Result<T, PageserverHttpError>;
      50                 : 
      51                 : pub trait ResponseErrorMessageExt: Sized {
      52                 :     fn error_from_body(self) -> Result<Self>;
      53                 : }
      54                 : 
      55                 : impl ResponseErrorMessageExt for Response {
      56 CBC        1836 :     fn error_from_body(self) -> Result<Self> {
      57            1836 :         let status = self.status();
      58            1836 :         if !(status.is_client_error() || status.is_server_error()) {
      59            1829 :             return Ok(self);
      60               7 :         }
      61               7 : 
      62               7 :         // reqwest does not export its error construction utility functions, so let's craft the message ourselves
      63               7 :         let url = self.url().to_owned();
      64               7 :         Err(PageserverHttpError::Response(
      65               7 :             match self.json::<HttpErrorBody>() {
      66               7 :                 Ok(err_body) => format!("Error: {}", err_body.msg),
      67 UBC           0 :                 Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
      68                 :             },
      69                 :         ))
      70 CBC        1836 :     }
      71                 : }
      72                 : 
      73                 : //
      74                 : // Control routines for pageserver.
      75                 : //
      76                 : // Used in CLI and tests.
      77                 : //
      78 UBC           0 : #[derive(Debug)]
      79                 : pub struct PageServerNode {
      80                 :     pub pg_connection_config: PgConnectionConfig,
      81                 :     pub conf: PageServerConf,
      82                 :     pub env: LocalEnv,
      83                 :     pub http_client: Client,
      84                 :     pub http_base_url: String,
      85                 : }
      86                 : 
      87                 : impl PageServerNode {
      88 CBC        8648 :     pub fn from_env(env: &LocalEnv, conf: &PageServerConf) -> PageServerNode {
      89            8648 :         let (host, port) =
      90            8648 :             parse_host_port(&conf.listen_pg_addr).expect("Unable to parse listen_pg_addr");
      91            8648 :         let port = port.unwrap_or(5432);
      92            8648 :         Self {
      93            8648 :             pg_connection_config: PgConnectionConfig::new_host_port(host, port),
      94            8648 :             conf: conf.clone(),
      95            8648 :             env: env.clone(),
      96            8648 :             http_client: Client::new(),
      97            8648 :             http_base_url: format!("http://{}/v1", conf.listen_http_addr),
      98            8648 :         }
      99            8648 :     }
     100                 : 
     101                 :     // pageserver conf overrides defined by neon_local configuration.
     102            1469 :     fn neon_local_overrides(&self) -> Vec<String> {
     103            1469 :         let id = format!("id={}", self.conf.id);
     104            1469 :         // FIXME: the paths should be shell-escaped to handle paths with spaces, quotes etc.
     105            1469 :         let pg_distrib_dir_param = format!(
     106            1469 :             "pg_distrib_dir='{}'",
     107            1469 :             self.env.pg_distrib_dir_raw().display()
     108            1469 :         );
     109            1469 : 
     110            1469 :         let http_auth_type_param = format!("http_auth_type='{}'", self.conf.http_auth_type);
     111            1469 :         let listen_http_addr_param = format!("listen_http_addr='{}'", self.conf.listen_http_addr);
     112            1469 : 
     113            1469 :         let pg_auth_type_param = format!("pg_auth_type='{}'", self.conf.pg_auth_type);
     114            1469 :         let listen_pg_addr_param = format!("listen_pg_addr='{}'", self.conf.listen_pg_addr);
     115            1469 : 
     116            1469 :         let broker_endpoint_param = format!("broker_endpoint='{}'", self.env.broker.client_url());
     117            1469 : 
     118            1469 :         let mut overrides = vec![
     119            1469 :             id,
     120            1469 :             pg_distrib_dir_param,
     121            1469 :             http_auth_type_param,
     122            1469 :             pg_auth_type_param,
     123            1469 :             listen_http_addr_param,
     124            1469 :             listen_pg_addr_param,
     125            1469 :             broker_endpoint_param,
     126            1469 :         ];
     127                 : 
     128            1469 :         if let Some(control_plane_api) = &self.env.control_plane_api {
     129              76 :             overrides.push(format!(
     130              76 :                 "control_plane_api='{}'",
     131              76 :                 control_plane_api.as_str()
     132              76 :             ));
     133            1393 :         }
     134                 : 
     135            1469 :         if self.conf.http_auth_type != AuthType::Trust || self.conf.pg_auth_type != AuthType::Trust
     136              27 :         {
     137              27 :             // Keys are generated in the toplevel repo dir, pageservers' workdirs
     138              27 :             // are one level below that, so refer to keys with ../
     139              27 :             overrides.push("auth_validation_public_key_path='../auth_public_key.pem'".to_owned());
     140            1442 :         }
     141            1469 :         overrides
     142            1469 :     }
     143                 : 
     144                 :     /// Initializes a pageserver node by creating its config with the overrides provided.
     145             349 :     pub fn initialize(&self, config_overrides: &[&str]) -> anyhow::Result<()> {
     146             349 :         // First, run `pageserver --init` and wait for it to write a config into FS and exit.
     147             349 :         self.pageserver_init(config_overrides)
     148             349 :             .with_context(|| format!("Failed to run init for pageserver node {}", self.conf.id))
     149             349 :     }
     150                 : 
     151            2032 :     pub fn repo_path(&self) -> PathBuf {
     152            2032 :         self.env.pageserver_data_dir(self.conf.id)
     153            2032 :     }
     154                 : 
     155                 :     /// The pid file is created by the pageserver process, with its pid stored inside.
     156                 :     /// Other pageservers cannot lock the same file and overwrite it for as long as the current
     157                 :     /// pageserver runs. (Unless someone removes the file manually; never do that!)
     158            1123 :     fn pid_file(&self) -> Utf8PathBuf {
     159            1123 :         Utf8PathBuf::from_path_buf(self.repo_path().join("pageserver.pid"))
     160            1123 :             .expect("non-Unicode path")
     161            1123 :     }
     162                 : 
     163             560 :     pub fn start(&self, config_overrides: &[&str]) -> anyhow::Result<Child> {
     164             560 :         self.start_node(config_overrides, false)
     165             560 :     }
     166                 : 
     167             349 :     fn pageserver_init(&self, config_overrides: &[&str]) -> anyhow::Result<()> {
     168             349 :         let datadir = self.repo_path();
     169             349 :         let node_id = self.conf.id;
     170             349 :         println!(
     171             349 :             "Initializing pageserver node {} at '{}' in {:?}",
     172             349 :             node_id,
     173             349 :             self.pg_connection_config.raw_address(),
     174             349 :             datadir
     175             349 :         );
     176             349 :         io::stdout().flush()?;
     177                 : 
     178             349 :         if !datadir.exists() {
     179             349 :             std::fs::create_dir(&datadir)?;
     180 UBC           0 :         }
     181                 : 
     182 CBC         349 :         let datadir_path_str = datadir.to_str().with_context(|| {
     183 UBC           0 :             format!("Cannot start pageserver node {node_id} in path that has no string representation: {datadir:?}")
     184 CBC         349 :         })?;
     185             349 :         let mut args = self.pageserver_basic_args(config_overrides, datadir_path_str);
     186             349 :         args.push(Cow::Borrowed("--init"));
     187                 : 
     188             349 :         let init_output = Command::new(self.env.pageserver_bin())
     189             349 :             .args(args.iter().map(Cow::as_ref))
     190             349 :             .envs(self.pageserver_env_variables()?)
     191             349 :             .output()
     192             349 :             .with_context(|| format!("Failed to run pageserver init for node {node_id}"))?;
     193                 : 
     194             349 :         anyhow::ensure!(
     195             349 :             init_output.status.success(),
     196 UBC           0 :             "Pageserver init for node {} did not finish successfully, stdout: {}, stderr: {}",
     197               0 :             node_id,
     198               0 :             String::from_utf8_lossy(&init_output.stdout),
     199               0 :             String::from_utf8_lossy(&init_output.stderr),
     200                 :         );
     201                 : 
     202 CBC         349 :         Ok(())
     203             349 :     }
     204                 : 
     205             560 :     fn start_node(&self, config_overrides: &[&str], update_config: bool) -> anyhow::Result<Child> {
     206             560 :         let mut overrides = self.neon_local_overrides();
     207             660 :         overrides.extend(config_overrides.iter().map(|&c| c.to_owned()));
     208             560 : 
     209             560 :         let datadir = self.repo_path();
     210             560 :         print!(
     211             560 :             "Starting pageserver node {} at '{}' in {:?}",
     212             560 :             self.conf.id,
     213             560 :             self.pg_connection_config.raw_address(),
     214             560 :             datadir
     215             560 :         );
     216             560 :         io::stdout().flush()?;
     217                 : 
     218             560 :         let datadir_path_str = datadir.to_str().with_context(|| {
     219 UBC           0 :             format!(
     220               0 :                 "Cannot start pageserver node {} in path that has no string representation: {:?}",
     221               0 :                 self.conf.id, datadir,
     222               0 :             )
     223 CBC         560 :         })?;
     224             560 :         let mut args = self.pageserver_basic_args(config_overrides, datadir_path_str);
     225             560 :         if update_config {
     226 UBC           0 :             args.push(Cow::Borrowed("--update-config"));
     227 CBC         560 :         }
     228                 : 
     229                 :         background_process::start_process(
     230             560 :             "pageserver",
     231             560 :             &datadir,
     232             560 :             &self.env.pageserver_bin(),
     233             560 :             args.iter().map(Cow::as_ref),
     234             560 :             self.pageserver_env_variables()?,
     235             560 :             background_process::InitialPidFile::Expect(&self.pid_file()),
     236            1134 :             || match self.check_status() {
     237             560 :                 Ok(()) => Ok(true),
     238             574 :                 Err(PageserverHttpError::Transport(_)) => Ok(false),
     239 UBC           0 :                 Err(e) => Err(anyhow::anyhow!("Failed to check node status: {e}")),
     240 CBC        1134 :             },
     241                 :         )
     242             560 :     }
     243                 : 
     244             909 :     fn pageserver_basic_args<'a>(
     245             909 :         &self,
     246             909 :         config_overrides: &'a [&'a str],
     247             909 :         datadir_path_str: &'a str,
     248             909 :     ) -> Vec<Cow<'a, str>> {
     249             909 :         let mut args = vec![Cow::Borrowed("-D"), Cow::Borrowed(datadir_path_str)];
     250             909 : 
     251             909 :         let mut overrides = self.neon_local_overrides();
     252            1068 :         overrides.extend(config_overrides.iter().map(|&c| c.to_owned()));
     253            8402 :         for config_override in overrides {
     254            7493 :             args.push(Cow::Borrowed("-c"));
     255            7493 :             args.push(Cow::Owned(config_override));
     256            7493 :         }
     257                 : 
     258             909 :         args
     259             909 :     }
     260                 : 
     261             909 :     fn pageserver_env_variables(&self) -> anyhow::Result<Vec<(String, String)>> {
     262             909 :         // FIXME: why is this tied to pageserver's auth type? Whether or not the safekeeper
     263             909 :         // needs a token, and how to generate that token, seems independent of whether
     264             909 :         // the pageserver requires a token in incoming requests.
     265             909 :         Ok(if self.conf.http_auth_type != AuthType::Trust {
     266                 :             // Generate a token to connect from the pageserver to a safekeeper
     267              18 :             let token = self
     268              18 :                 .env
     269              18 :                 .generate_auth_token(&Claims::new(None, Scope::SafekeeperData))?;
     270              18 :             vec![("NEON_AUTH_TOKEN".to_owned(), token)]
     271                 :         } else {
     272             891 :             Vec::new()
     273                 :         })
     274             909 :     }
     275                 : 
     276                 :     ///
     277                 :     /// Stop the server.
     278                 :     ///
     279                 :     /// If 'immediate' is true, we use SIGQUIT, killing the process immediately.
     280                 :     /// Otherwise we use SIGTERM, triggering a clean shutdown
     281                 :     ///
     282                 :     /// If the server is not running, returns success
     283                 :     ///
     284             563 :     pub fn stop(&self, immediate: bool) -> anyhow::Result<()> {
     285             563 :         background_process::stop_process(immediate, "pageserver", &self.pid_file())
     286             563 :     }
     287                 : 
     288               5 :     pub fn page_server_psql_client(&self) -> anyhow::Result<postgres::Client> {
     289               5 :         let mut config = self.pg_connection_config.clone();
     290               5 :         if self.conf.pg_auth_type == AuthType::NeonJWT {
     291 UBC           0 :             let token = self
     292               0 :                 .env
     293               0 :                 .generate_auth_token(&Claims::new(None, Scope::PageServerApi))?;
     294               0 :             config = config.set_password(Some(token));
     295 CBC           5 :         }
     296               5 :         Ok(config.connect_no_tls()?)
     297               5 :     }
     298                 : 
     299            2410 :     fn http_request<U: IntoUrl>(&self, method: Method, url: U) -> anyhow::Result<RequestBuilder> {
     300            2410 :         let mut builder = self.http_client.request(method, url);
     301            2410 :         if self.conf.http_auth_type == AuthType::NeonJWT {
     302              48 :             let token = self
     303              48 :                 .env
     304              48 :                 .generate_auth_token(&Claims::new(None, Scope::PageServerApi))?;
     305              48 :             builder = builder.bearer_auth(token)
     306            2362 :         }
     307            2410 :         Ok(builder)
     308            2410 :     }
     309                 : 
     310            1134 :     pub fn check_status(&self) -> Result<()> {
     311            1134 :         self.http_request(Method::GET, format!("{}/status", self.http_base_url))?
     312            1134 :             .send()?
     313             560 :             .error_from_body()?;
     314             560 :         Ok(())
     315            1134 :     }
     316                 : 
     317               6 :     pub fn tenant_list(&self) -> Result<Vec<TenantInfo>> {
     318               6 :         Ok(self
     319               6 :             .http_request(Method::GET, format!("{}/tenant", self.http_base_url))?
     320               6 :             .send()?
     321               6 :             .error_from_body()?
     322               6 :             .json()?)
     323               6 :     }
     324                 : 
     325             437 :     pub fn tenant_create(
     326             437 :         &self,
     327             437 :         new_tenant_id: TenantId,
     328             437 :         generation: Option<u32>,
     329             437 :         settings: HashMap<&str, &str>,
     330             437 :     ) -> anyhow::Result<TenantId> {
     331             437 :         let mut settings = settings.clone();
     332                 : 
     333             437 :         let config = models::TenantConfig {
     334             437 :             checkpoint_distance: settings
     335             437 :                 .remove("checkpoint_distance")
     336             437 :                 .map(|x| x.parse::<u64>())
     337             437 :                 .transpose()?,
     338             437 :             checkpoint_timeout: settings.remove("checkpoint_timeout").map(|x| x.to_string()),
     339             437 :             compaction_target_size: settings
     340             437 :                 .remove("compaction_target_size")
     341             437 :                 .map(|x| x.parse::<u64>())
     342             437 :                 .transpose()?,
     343             437 :             compaction_period: settings.remove("compaction_period").map(|x| x.to_string()),
     344             437 :             compaction_threshold: settings
     345             437 :                 .remove("compaction_threshold")
     346             437 :                 .map(|x| x.parse::<usize>())
     347             437 :                 .transpose()?,
     348             437 :             gc_horizon: settings
     349             437 :                 .remove("gc_horizon")
     350             437 :                 .map(|x| x.parse::<u64>())
     351             437 :                 .transpose()?,
     352             437 :             gc_period: settings.remove("gc_period").map(|x| x.to_string()),
     353             437 :             image_creation_threshold: settings
     354             437 :                 .remove("image_creation_threshold")
     355             437 :                 .map(|x| x.parse::<usize>())
     356             437 :                 .transpose()?,
     357             437 :             pitr_interval: settings.remove("pitr_interval").map(|x| x.to_string()),
     358             437 :             walreceiver_connect_timeout: settings
     359             437 :                 .remove("walreceiver_connect_timeout")
     360             437 :                 .map(|x| x.to_string()),
     361             437 :             lagging_wal_timeout: settings
     362             437 :                 .remove("lagging_wal_timeout")
     363             437 :                 .map(|x| x.to_string()),
     364             437 :             max_lsn_wal_lag: settings
     365             437 :                 .remove("max_lsn_wal_lag")
     366             437 :                 .map(|x| x.parse::<NonZeroU64>())
     367             437 :                 .transpose()
     368             437 :                 .context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?,
     369             437 :             trace_read_requests: settings
     370             437 :                 .remove("trace_read_requests")
     371             437 :                 .map(|x| x.parse::<bool>())
     372             437 :                 .transpose()
     373             437 :                 .context("Failed to parse 'trace_read_requests' as bool")?,
     374             437 :             eviction_policy: settings
     375             437 :                 .remove("eviction_policy")
     376             437 :                 .map(serde_json::from_str)
     377             437 :                 .transpose()
     378             437 :                 .context("Failed to parse 'eviction_policy' json")?,
     379             437 :             min_resident_size_override: settings
     380             437 :                 .remove("min_resident_size_override")
     381             437 :                 .map(|x| x.parse::<u64>())
     382             437 :                 .transpose()
     383             437 :                 .context("Failed to parse 'min_resident_size_override' as integer")?,
     384             437 :             evictions_low_residence_duration_metric_threshold: settings
     385             437 :                 .remove("evictions_low_residence_duration_metric_threshold")
     386             437 :                 .map(|x| x.to_string()),
     387             437 :             gc_feedback: settings
     388             437 :                 .remove("gc_feedback")
     389             437 :                 .map(|x| x.parse::<bool>())
     390             437 :                 .transpose()
     391             437 :                 .context("Failed to parse 'gc_feedback' as bool")?,
     392                 :         };
     393                 : 
     394             437 :         let request = models::TenantCreateRequest {
     395             437 :             new_tenant_id,
     396             437 :             generation,
     397             437 :             config,
     398             437 :         };
     399             437 :         if !settings.is_empty() {
     400               1 :             bail!("Unrecognized tenant settings: {settings:?}")
     401             436 :         }
     402             436 :         self.http_request(Method::POST, format!("{}/tenant", self.http_base_url))?
     403             436 :             .json(&request)
     404             436 :             .send()?
     405             436 :             .error_from_body()?
     406             435 :             .json::<Option<String>>()
     407             435 :             .with_context(|| {
     408 UBC           0 :                 format!("Failed to parse tenant creation response for tenant id: {new_tenant_id:?}")
     409 CBC         435 :             })?
     410             435 :             .context("No tenant id was found in the tenant creation response")
     411             435 :             .and_then(|tenant_id_string| {
     412             435 :                 tenant_id_string.parse().with_context(|| {
     413 UBC           0 :                     format!("Failed to parse response string as tenant id: '{tenant_id_string}'")
     414 CBC         435 :                 })
     415             435 :             })
     416             437 :     }
     417                 : 
     418              14 :     pub fn tenant_config(
     419              14 :         &self,
     420              14 :         tenant_id: TenantId,
     421              14 :         mut settings: HashMap<&str, &str>,
     422              14 :     ) -> anyhow::Result<()> {
     423              14 :         let config = {
     424                 :             // Braces to make the diff easier to read
     425                 :             models::TenantConfig {
     426              14 :                 checkpoint_distance: settings
     427              14 :                     .remove("checkpoint_distance")
     428              14 :                     .map(|x| x.parse::<u64>())
     429              14 :                     .transpose()
     430              14 :                     .context("Failed to parse 'checkpoint_distance' as an integer")?,
     431              14 :                 checkpoint_timeout: settings.remove("checkpoint_timeout").map(|x| x.to_string()),
     432              14 :                 compaction_target_size: settings
     433              14 :                     .remove("compaction_target_size")
     434              14 :                     .map(|x| x.parse::<u64>())
     435              14 :                     .transpose()
     436              14 :                     .context("Failed to parse 'compaction_target_size' as an integer")?,
     437              14 :                 compaction_period: settings.remove("compaction_period").map(|x| x.to_string()),
     438              14 :                 compaction_threshold: settings
     439              14 :                     .remove("compaction_threshold")
     440              14 :                     .map(|x| x.parse::<usize>())
     441              14 :                     .transpose()
     442              14 :                     .context("Failed to parse 'compaction_threshold' as an integer")?,
     443              14 :                 gc_horizon: settings
     444              14 :                     .remove("gc_horizon")
     445              14 :                     .map(|x| x.parse::<u64>())
     446              14 :                     .transpose()
     447              14 :                     .context("Failed to parse 'gc_horizon' as an integer")?,
     448              14 :                 gc_period: settings.remove("gc_period").map(|x| x.to_string()),
     449              14 :                 image_creation_threshold: settings
     450              14 :                     .remove("image_creation_threshold")
     451              14 :                     .map(|x| x.parse::<usize>())
     452              14 :                     .transpose()
     453              14 :                     .context("Failed to parse 'image_creation_threshold' as non zero integer")?,
     454              14 :                 pitr_interval: settings.remove("pitr_interval").map(|x| x.to_string()),
     455              14 :                 walreceiver_connect_timeout: settings
     456              14 :                     .remove("walreceiver_connect_timeout")
     457              14 :                     .map(|x| x.to_string()),
     458              14 :                 lagging_wal_timeout: settings
     459              14 :                     .remove("lagging_wal_timeout")
     460              14 :                     .map(|x| x.to_string()),
     461              14 :                 max_lsn_wal_lag: settings
     462              14 :                     .remove("max_lsn_wal_lag")
     463              14 :                     .map(|x| x.parse::<NonZeroU64>())
     464              14 :                     .transpose()
     465              14 :                     .context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?,
     466              14 :                 trace_read_requests: settings
     467              14 :                     .remove("trace_read_requests")
     468              14 :                     .map(|x| x.parse::<bool>())
     469              14 :                     .transpose()
     470              14 :                     .context("Failed to parse 'trace_read_requests' as bool")?,
     471              14 :                 eviction_policy: settings
     472              14 :                     .remove("eviction_policy")
     473              14 :                     .map(serde_json::from_str)
     474              14 :                     .transpose()
     475              14 :                     .context("Failed to parse 'eviction_policy' json")?,
     476              14 :                 min_resident_size_override: settings
     477              14 :                     .remove("min_resident_size_override")
     478              14 :                     .map(|x| x.parse::<u64>())
     479              14 :                     .transpose()
     480              14 :                     .context("Failed to parse 'min_resident_size_override' as an integer")?,
     481              14 :                 evictions_low_residence_duration_metric_threshold: settings
     482              14 :                     .remove("evictions_low_residence_duration_metric_threshold")
     483              14 :                     .map(|x| x.to_string()),
     484              14 :                 gc_feedback: settings
     485              14 :                     .remove("gc_feedback")
     486              14 :                     .map(|x| x.parse::<bool>())
     487              14 :                     .transpose()
     488              14 :                     .context("Failed to parse 'gc_feedback' as bool")?,
     489                 :             }
     490                 :         };
     491                 : 
     492              14 :         if !settings.is_empty() {
     493 UBC           0 :             bail!("Unrecognized tenant settings: {settings:?}")
     494 CBC          14 :         }
     495              14 : 
     496              14 :         self.http_request(Method::PUT, format!("{}/tenant/config", self.http_base_url))?
     497              14 :             .json(&models::TenantConfigRequest { tenant_id, config })
     498              14 :             .send()?
     499              14 :             .error_from_body()?;
     500                 : 
     501              14 :         Ok(())
     502              14 :     }
     503                 : 
     504              15 :     pub fn timeline_list(&self, tenant_id: &TenantId) -> anyhow::Result<Vec<TimelineInfo>> {
     505              15 :         let timeline_infos: Vec<TimelineInfo> = self
     506              15 :             .http_request(
     507              15 :                 Method::GET,
     508              15 :                 format!("{}/tenant/{}/timeline", self.http_base_url, tenant_id),
     509              15 :             )?
     510              15 :             .send()?
     511              15 :             .error_from_body()?
     512              15 :             .json()?;
     513                 : 
     514              15 :         Ok(timeline_infos)
     515              15 :     }
     516                 : 
     517             805 :     pub fn timeline_create(
     518             805 :         &self,
     519             805 :         tenant_id: TenantId,
     520             805 :         new_timeline_id: Option<TimelineId>,
     521             805 :         ancestor_start_lsn: Option<Lsn>,
     522             805 :         ancestor_timeline_id: Option<TimelineId>,
     523             805 :         pg_version: Option<u32>,
     524             805 :     ) -> anyhow::Result<TimelineInfo> {
     525             805 :         // If timeline ID was not specified, generate one
     526             805 :         let new_timeline_id = new_timeline_id.unwrap_or(TimelineId::generate());
     527             805 : 
     528             805 :         self.http_request(
     529             805 :             Method::POST,
     530             805 :             format!("{}/tenant/{}/timeline", self.http_base_url, tenant_id),
     531             805 :         )?
     532             805 :         .json(&models::TimelineCreateRequest {
     533             805 :             new_timeline_id,
     534             805 :             ancestor_start_lsn,
     535             805 :             ancestor_timeline_id,
     536             805 :             pg_version,
     537             805 :         })
     538             805 :         .send()?
     539             805 :         .error_from_body()?
     540             799 :         .json::<Option<TimelineInfo>>()
     541             799 :         .with_context(|| {
     542 UBC           0 :             format!("Failed to parse timeline creation response for tenant id: {tenant_id}")
     543 CBC         799 :         })?
     544             799 :         .with_context(|| {
     545 UBC           0 :             format!(
     546               0 :                 "No timeline id was found in the timeline creation response for tenant {tenant_id}"
     547               0 :             )
     548 CBC         799 :         })
     549             805 :     }
     550                 : 
     551                 :     /// Import a basebackup prepared using either:
     552                 :     /// a) `pg_basebackup -F tar`, or
     553                 :     /// b) The `fullbackup` pageserver endpoint
     554                 :     ///
     555                 :     /// # Arguments
     556                 :     /// * `tenant_id` - tenant to import into. Created if not exists
     557                 :     /// * `timeline_id` - id to assign to imported timeline
     558                 :     /// * `base` - (start lsn of basebackup, path to `base.tar` file)
     559                 :     /// * `pg_wal` - if there's any wal to import: (end lsn, path to `pg_wal.tar`)
     560               5 :     pub fn timeline_import(
     561               5 :         &self,
     562               5 :         tenant_id: TenantId,
     563               5 :         timeline_id: TimelineId,
     564               5 :         base: (Lsn, PathBuf),
     565               5 :         pg_wal: Option<(Lsn, PathBuf)>,
     566               5 :         pg_version: u32,
     567               5 :     ) -> anyhow::Result<()> {
     568               5 :         let mut client = self.page_server_psql_client()?;
     569                 : 
     570                 :         // Init base reader
     571               5 :         let (start_lsn, base_tarfile_path) = base;
     572               5 :         let base_tarfile = File::open(base_tarfile_path)?;
     573               5 :         let mut base_reader = BufReader::new(base_tarfile);
     574                 : 
     575                 :         // Init wal reader if necessary
     576               5 :         let (end_lsn, wal_reader) = if let Some((end_lsn, wal_tarfile_path)) = pg_wal {
     577               4 :             let wal_tarfile = File::open(wal_tarfile_path)?;
     578               4 :             let wal_reader = BufReader::new(wal_tarfile);
     579               4 :             (end_lsn, Some(wal_reader))
     580                 :         } else {
     581               1 :             (start_lsn, None)
     582                 :         };
     583                 : 
     584                 :         // Import base
     585               5 :         let import_cmd = format!(
     586               5 :             "import basebackup {tenant_id} {timeline_id} {start_lsn} {end_lsn} {pg_version}"
     587               5 :         );
     588               5 :         let mut writer = client.copy_in(&import_cmd)?;
     589               5 :         io::copy(&mut base_reader, &mut writer)?;
     590               5 :         writer.finish()?;
     591                 : 
     592                 :         // Import wal if necessary
     593               3 :         if let Some(mut wal_reader) = wal_reader {
     594               2 :             let import_cmd = format!("import wal {tenant_id} {timeline_id} {start_lsn} {end_lsn}");
     595               2 :             let mut writer = client.copy_in(&import_cmd)?;
     596               2 :             io::copy(&mut wal_reader, &mut writer)?;
     597               2 :             writer.finish()?;
     598               1 :         }
     599                 : 
     600               3 :         Ok(())
     601               5 :     }
     602                 : }
        

Generated by: LCOV version 2.1-beta