LCOV - code coverage report
Current view: top level - compute_tools/src - compute_prewarm.rs (source / functions) Coverage Total Hit
Test: c8f8d331b83562868d9054d9e0e68f866772aeaa.info Lines: 0.0 % 159 0
Test Date: 2025-07-26 17:20:05 Functions: 0.0 % 18 0

            Line data    Source code
       1              : use crate::compute::ComputeNode;
       2              : use anyhow::{Context, Result, bail};
       3              : use async_compression::tokio::bufread::{ZstdDecoder, ZstdEncoder};
       4              : use compute_api::responses::LfcOffloadState;
       5              : use compute_api::responses::LfcPrewarmState;
       6              : use http::StatusCode;
       7              : use reqwest::Client;
       8              : use std::mem::replace;
       9              : use std::sync::Arc;
      10              : use tokio::{io::AsyncReadExt, spawn};
      11              : use tracing::{error, info};
      12              : 
/// LFC prewarm state recorded by compute_ctl, extended with progress counters.
///
/// `ComputeNode::lfc_prewarm_state` fills the counters from columns 0, 1 and 2 of
/// `select * from neon.get_prewarm_info()`. They keep their `Default` value when
/// Postgres could not be reached or queried.
#[derive(serde::Serialize, Default)]
pub struct LfcPrewarmStateWithProgress {
    /// Prewarm state as tracked by compute_ctl; flattened, so its fields are
    /// serialized at the same level as the counters below.
    #[serde(flatten)]
    base: LfcPrewarmState,
    /// Column 0 of `neon.get_prewarm_info()`.
    /// NOTE(review): presumably the overall number of segments to prewarm — confirm.
    total: i32,
    /// Column 1 of `neon.get_prewarm_info()`.
    /// NOTE(review): presumably the number of segments already prewarmed — confirm.
    prewarmed: i32,
    /// Column 2 of `neon.get_prewarm_info()`.
    /// NOTE(review): presumably the number of segments that were skipped — confirm.
    skipped: i32,
}
      21              : 
/// A pair of url and a token to query endpoint storage for LFC prewarm-related tasks
struct EndpointStoragePair {
    /// Full URL of the LFC state object, built as
    /// `http://{endpoint_storage_addr}/{tenant_id}/{timeline_id}/{endpoint_id}/lfc_state`.
    url: String,
    /// Token passed as bearer authentication on requests to `url`.
    token: String,
}
      27              : 
      28              : const KEY: &str = "lfc_state";
      29              : impl EndpointStoragePair {
      30              :     /// endpoint_id is set to None while prewarming from other endpoint, see replica promotion
      31              :     /// If not None, takes precedence over pspec.spec.endpoint_id
      32            0 :     fn from_spec_and_endpoint(
      33            0 :         pspec: &crate::compute::ParsedSpec,
      34            0 :         endpoint_id: Option<String>,
      35            0 :     ) -> Result<Self> {
      36            0 :         let endpoint_id = endpoint_id.as_ref().or(pspec.spec.endpoint_id.as_ref());
      37            0 :         let Some(ref endpoint_id) = endpoint_id else {
      38            0 :             bail!("pspec.endpoint_id missing, other endpoint_id not provided")
      39              :         };
      40            0 :         let Some(ref base_uri) = pspec.endpoint_storage_addr else {
      41            0 :             bail!("pspec.endpoint_storage_addr missing")
      42              :         };
      43            0 :         let tenant_id = pspec.tenant_id;
      44            0 :         let timeline_id = pspec.timeline_id;
      45              : 
      46            0 :         let url = format!("http://{base_uri}/{tenant_id}/{timeline_id}/{endpoint_id}/{KEY}");
      47            0 :         let Some(ref token) = pspec.endpoint_storage_token else {
      48            0 :             bail!("pspec.endpoint_storage_token missing")
      49              :         };
      50            0 :         let token = token.clone();
      51            0 :         Ok(EndpointStoragePair { url, token })
      52            0 :     }
      53              : }
      54              : 
      55              : impl ComputeNode {
      56              :     // If prewarm failed, we want to get overall number of segments as well as done ones.
      57              :     // However, this function should be reliable even if querying postgres failed.
      58            0 :     pub async fn lfc_prewarm_state(&self) -> LfcPrewarmStateWithProgress {
      59            0 :         info!("requesting LFC prewarm state from postgres");
      60            0 :         let mut state = LfcPrewarmStateWithProgress::default();
      61            0 :         {
      62            0 :             state.base = self.state.lock().unwrap().lfc_prewarm_state.clone();
      63            0 :         }
      64              : 
      65            0 :         let client = match ComputeNode::get_maintenance_client(&self.tokio_conn_conf).await {
      66            0 :             Ok(client) => client,
      67            0 :             Err(err) => {
      68            0 :                 error!(%err, "connecting to postgres");
      69            0 :                 return state;
      70              :             }
      71              :         };
      72            0 :         let row = match client
      73            0 :             .query_one("select * from neon.get_prewarm_info()", &[])
      74            0 :             .await
      75              :         {
      76            0 :             Ok(row) => row,
      77            0 :             Err(err) => {
      78            0 :                 error!(%err, "querying LFC prewarm status");
      79            0 :                 return state;
      80              :             }
      81              :         };
      82            0 :         state.total = row.try_get(0).unwrap_or_default();
      83            0 :         state.prewarmed = row.try_get(1).unwrap_or_default();
      84            0 :         state.skipped = row.try_get(2).unwrap_or_default();
      85            0 :         state
      86            0 :     }
      87              : 
      88            0 :     pub fn lfc_offload_state(&self) -> LfcOffloadState {
      89            0 :         self.state.lock().unwrap().lfc_offload_state.clone()
      90            0 :     }
      91              : 
      92              :     /// If there is a prewarm request ongoing, return `false`, `true` otherwise.
      93              :     /// Has a failpoint "compute-prewarm"
      94            0 :     pub fn prewarm_lfc(self: &Arc<Self>, from_endpoint: Option<String>) -> bool {
      95              :         {
      96            0 :             let state = &mut self.state.lock().unwrap().lfc_prewarm_state;
      97            0 :             if let LfcPrewarmState::Prewarming = replace(state, LfcPrewarmState::Prewarming) {
      98            0 :                 return false;
      99            0 :             }
     100              :         }
     101            0 :         crate::metrics::LFC_PREWARMS.inc();
     102              : 
     103            0 :         let cloned = self.clone();
     104            0 :         spawn(async move {
     105            0 :             let state = match cloned.prewarm_impl(from_endpoint).await {
     106            0 :                 Ok(true) => LfcPrewarmState::Completed,
     107              :                 Ok(false) => {
     108            0 :                     info!(
     109            0 :                         "skipping LFC prewarm because LFC state is not found in endpoint storage"
     110              :                     );
     111            0 :                     LfcPrewarmState::Skipped
     112              :                 }
     113            0 :                 Err(err) => {
     114            0 :                     crate::metrics::LFC_PREWARM_ERRORS.inc();
     115            0 :                     error!(%err, "could not prewarm LFC");
     116            0 :                     LfcPrewarmState::Failed {
     117            0 :                         error: format!("{err:#}"),
     118            0 :                     }
     119              :                 }
     120              :             };
     121              : 
     122            0 :             cloned.state.lock().unwrap().lfc_prewarm_state = state;
     123            0 :         });
     124            0 :         true
     125            0 :     }
     126              : 
     127              :     /// from_endpoint: None for endpoint managed by this compute_ctl
     128            0 :     fn endpoint_storage_pair(&self, from_endpoint: Option<String>) -> Result<EndpointStoragePair> {
     129            0 :         let state = self.state.lock().unwrap();
     130            0 :         EndpointStoragePair::from_spec_and_endpoint(state.pspec.as_ref().unwrap(), from_endpoint)
     131            0 :     }
     132              : 
     133              :     /// Request LFC state from endpoint storage and load corresponding pages into Postgres.
     134              :     /// Returns a result with `false` if the LFC state is not found in endpoint storage.
     135            0 :     async fn prewarm_impl(&self, from_endpoint: Option<String>) -> Result<bool> {
     136            0 :         let EndpointStoragePair { url, token } = self.endpoint_storage_pair(from_endpoint)?;
     137              : 
     138              :         #[cfg(feature = "testing")]
     139            0 :         fail::fail_point!("compute-prewarm", |_| {
     140            0 :             bail!("prewarm configured to fail because of a failpoint")
     141            0 :         });
     142              : 
     143            0 :         info!(%url, "requesting LFC state from endpoint storage");
     144            0 :         let request = Client::new().get(&url).bearer_auth(token);
     145            0 :         let res = request.send().await.context("querying endpoint storage")?;
     146            0 :         match res.status() {
     147            0 :             StatusCode::OK => (),
     148              :             StatusCode::NOT_FOUND => {
     149            0 :                 return Ok(false);
     150              :             }
     151            0 :             status => bail!("{status} querying endpoint storage"),
     152              :         }
     153              : 
     154            0 :         let mut uncompressed = Vec::new();
     155            0 :         let lfc_state = res
     156            0 :             .bytes()
     157            0 :             .await
     158            0 :             .context("getting request body from endpoint storage")?;
     159            0 :         ZstdDecoder::new(lfc_state.iter().as_slice())
     160            0 :             .read_to_end(&mut uncompressed)
     161            0 :             .await
     162            0 :             .context("decoding LFC state")?;
     163            0 :         let uncompressed_len = uncompressed.len();
     164              : 
     165            0 :         info!(%url, "downloaded LFC state, uncompressed size {uncompressed_len}, loading into Postgres");
     166              : 
     167            0 :         ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
     168            0 :             .await
     169            0 :             .context("connecting to postgres")?
     170            0 :             .query_one("select neon.prewarm_local_cache($1)", &[&uncompressed])
     171            0 :             .await
     172            0 :             .context("loading LFC state into postgres")
     173            0 :             .map(|_| ())?;
     174              : 
     175            0 :         Ok(true)
     176            0 :     }
     177              : 
     178              :     /// If offload request is ongoing, return false, true otherwise
     179            0 :     pub fn offload_lfc(self: &Arc<Self>) -> bool {
     180              :         {
     181            0 :             let state = &mut self.state.lock().unwrap().lfc_offload_state;
     182            0 :             if replace(state, LfcOffloadState::Offloading) == LfcOffloadState::Offloading {
     183            0 :                 return false;
     184            0 :             }
     185              :         }
     186            0 :         let cloned = self.clone();
     187            0 :         spawn(async move { cloned.offload_lfc_with_state_update().await });
     188            0 :         true
     189            0 :     }
     190              : 
     191            0 :     pub async fn offload_lfc_async(self: &Arc<Self>) {
     192              :         {
     193            0 :             let state = &mut self.state.lock().unwrap().lfc_offload_state;
     194            0 :             if replace(state, LfcOffloadState::Offloading) == LfcOffloadState::Offloading {
     195            0 :                 return;
     196            0 :             }
     197              :         }
     198            0 :         self.offload_lfc_with_state_update().await
     199            0 :     }
     200              : 
     201            0 :     async fn offload_lfc_with_state_update(&self) {
     202            0 :         crate::metrics::LFC_OFFLOADS.inc();
     203              : 
     204            0 :         let Err(err) = self.offload_lfc_impl().await else {
     205            0 :             self.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Completed;
     206            0 :             return;
     207              :         };
     208              : 
     209            0 :         crate::metrics::LFC_OFFLOAD_ERRORS.inc();
     210            0 :         error!(%err, "could not offload LFC state to endpoint storage");
     211            0 :         self.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Failed {
     212            0 :             error: format!("{err:#}"),
     213            0 :         };
     214            0 :     }
     215              : 
     216            0 :     async fn offload_lfc_impl(&self) -> Result<()> {
     217            0 :         let EndpointStoragePair { url, token } = self.endpoint_storage_pair(None)?;
     218            0 :         info!(%url, "requesting LFC state from Postgres");
     219              : 
     220            0 :         let row = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
     221            0 :             .await
     222            0 :             .context("connecting to postgres")?
     223            0 :             .query_one("select neon.get_local_cache_state()", &[])
     224            0 :             .await
     225            0 :             .context("querying LFC state")?;
     226            0 :         let state = row
     227            0 :             .try_get::<usize, Option<&[u8]>>(0)
     228            0 :             .context("deserializing LFC state")?;
     229            0 :         let Some(state) = state else {
     230            0 :             info!(%url, "empty LFC state, not exporting");
     231            0 :             return Ok(());
     232              :         };
     233              : 
     234            0 :         let mut compressed = Vec::new();
     235            0 :         ZstdEncoder::new(state)
     236            0 :             .read_to_end(&mut compressed)
     237            0 :             .await
     238            0 :             .context("compressing LFC state")?;
     239              : 
     240            0 :         let compressed_len = compressed.len();
     241            0 :         info!(%url, "downloaded LFC state, compressed size {compressed_len}, writing to endpoint storage");
     242              : 
     243            0 :         let request = Client::new().put(url).bearer_auth(token).body(compressed);
     244            0 :         match request.send().await {
     245            0 :             Ok(res) if res.status() == StatusCode::OK => Ok(()),
     246            0 :             Ok(res) => bail!(
     247            0 :                 "Request to endpoint storage failed with status: {}",
     248            0 :                 res.status()
     249              :             ),
     250            0 :             Err(err) => Err(err).context("writing to endpoint storage"),
     251              :         }
     252            0 :     }
     253              : }
        

Generated by: LCOV version 2.1-beta