LCOV - code coverage report
Current view: top level - compute_tools/src - compute_prewarm.rs (source / functions) Coverage Total Hit
Test: 1d5975439f3c9882b18414799141ebf9a3922c58.info Lines: 0.0 % 192 0
Test Date: 2025-07-31 15:59:03 Functions: 0.0 % 19 0

            Line data    Source code
       1              : use crate::compute::ComputeNode;
       2              : use anyhow::{Context, Result, bail};
       3              : use async_compression::tokio::bufread::{ZstdDecoder, ZstdEncoder};
       4              : use compute_api::responses::LfcOffloadState;
       5              : use compute_api::responses::LfcPrewarmState;
       6              : use http::StatusCode;
       7              : use reqwest::Client;
       8              : use std::mem::replace;
       9              : use std::sync::Arc;
      10              : use std::time::Instant;
      11              : use tokio::{io::AsyncReadExt, select, spawn};
      12              : use tokio_util::sync::CancellationToken;
      13              : use tracing::{error, info};
      14              : 
      15              : /// A pair of url and a token to query endpoint storage for LFC prewarm-related tasks
      16              : struct EndpointStoragePair {
      17              :     url: String,
      18              :     token: String,
      19              : }
      20              : 
      21              : const KEY: &str = "lfc_state";
      22              : impl EndpointStoragePair {
      23              :     /// endpoint_id is set to None while prewarming from other endpoint, see compute_promote.rs
      24              :     /// If not None, takes precedence over pspec.spec.endpoint_id
      25            0 :     fn from_spec_and_endpoint(
      26            0 :         pspec: &crate::compute::ParsedSpec,
      27            0 :         endpoint_id: Option<String>,
      28            0 :     ) -> Result<Self> {
      29            0 :         let endpoint_id = endpoint_id.as_ref().or(pspec.spec.endpoint_id.as_ref());
      30            0 :         let Some(ref endpoint_id) = endpoint_id else {
      31            0 :             bail!("pspec.endpoint_id missing, other endpoint_id not provided")
      32              :         };
      33            0 :         let Some(ref base_uri) = pspec.endpoint_storage_addr else {
      34            0 :             bail!("pspec.endpoint_storage_addr missing")
      35              :         };
      36            0 :         let tenant_id = pspec.tenant_id;
      37            0 :         let timeline_id = pspec.timeline_id;
      38              : 
      39            0 :         let url = format!("http://{base_uri}/{tenant_id}/{timeline_id}/{endpoint_id}/{KEY}");
      40            0 :         let Some(ref token) = pspec.endpoint_storage_token else {
      41            0 :             bail!("pspec.endpoint_storage_token missing")
      42              :         };
      43            0 :         let token = token.clone();
      44            0 :         Ok(EndpointStoragePair { url, token })
      45            0 :     }
      46              : }
      47              : 
      48              : impl ComputeNode {
      49            0 :     pub async fn lfc_prewarm_state(&self) -> LfcPrewarmState {
      50            0 :         self.state.lock().unwrap().lfc_prewarm_state.clone()
      51            0 :     }
      52              : 
      53            0 :     pub fn lfc_offload_state(&self) -> LfcOffloadState {
      54            0 :         self.state.lock().unwrap().lfc_offload_state.clone()
      55            0 :     }
      56              : 
      57              :     /// If there is a prewarm request ongoing, return `false`, `true` otherwise.
      58              :     /// Has a failpoint "compute-prewarm"
      59            0 :     pub fn prewarm_lfc(self: &Arc<Self>, from_endpoint: Option<String>) -> bool {
      60              :         let token: CancellationToken;
      61              :         {
      62            0 :             let state = &mut self.state.lock().unwrap();
      63            0 :             token = state.lfc_prewarm_token.clone();
      64              :             if let LfcPrewarmState::Prewarming =
      65            0 :                 replace(&mut state.lfc_prewarm_state, LfcPrewarmState::Prewarming)
      66              :             {
      67            0 :                 return false;
      68            0 :             }
      69              :         }
      70            0 :         crate::metrics::LFC_PREWARMS.inc();
      71              : 
      72            0 :         let this = self.clone();
      73            0 :         spawn(async move {
      74            0 :             let prewarm_state = match this.prewarm_impl(from_endpoint, token).await {
      75            0 :                 Ok(state) => state,
      76            0 :                 Err(err) => {
      77            0 :                     crate::metrics::LFC_PREWARM_ERRORS.inc();
      78            0 :                     error!(%err, "could not prewarm LFC");
      79            0 :                     let error = format!("{err:#}");
      80            0 :                     LfcPrewarmState::Failed { error }
      81              :                 }
      82              :             };
      83              : 
      84            0 :             let state = &mut this.state.lock().unwrap();
      85            0 :             if let LfcPrewarmState::Cancelled = prewarm_state {
      86            0 :                 state.lfc_prewarm_token = CancellationToken::new();
      87            0 :             }
      88            0 :             state.lfc_prewarm_state = prewarm_state;
      89            0 :         });
      90            0 :         true
      91            0 :     }
      92              : 
      93              :     /// from_endpoint: None for endpoint managed by this compute_ctl
      94            0 :     fn endpoint_storage_pair(&self, from_endpoint: Option<String>) -> Result<EndpointStoragePair> {
      95            0 :         let state = self.state.lock().unwrap();
      96            0 :         EndpointStoragePair::from_spec_and_endpoint(state.pspec.as_ref().unwrap(), from_endpoint)
      97            0 :     }
      98              : 
      99              :     /// Request LFC state from endpoint storage and load corresponding pages into Postgres.
     100            0 :     async fn prewarm_impl(
     101            0 :         &self,
     102            0 :         from_endpoint: Option<String>,
     103            0 :         token: CancellationToken,
     104            0 :     ) -> Result<LfcPrewarmState> {
     105              :         let EndpointStoragePair {
     106            0 :             url,
     107            0 :             token: storage_token,
     108            0 :         } = self.endpoint_storage_pair(from_endpoint)?;
     109              : 
     110              :         #[cfg(feature = "testing")]
     111            0 :         fail::fail_point!("compute-prewarm", |_| bail!("compute-prewarm failpoint"));
     112              : 
     113            0 :         info!(%url, "requesting LFC state from endpoint storage");
     114            0 :         let mut now = Instant::now();
     115            0 :         let request = Client::new().get(&url).bearer_auth(storage_token);
     116            0 :         let response = select! {
     117            0 :             _ = token.cancelled() => return Ok(LfcPrewarmState::Cancelled),
     118            0 :             response = request.send() => response
     119              :         }
     120            0 :         .context("querying endpoint storage")?;
     121              : 
     122            0 :         match response.status() {
     123            0 :             StatusCode::OK => (),
     124            0 :             StatusCode::NOT_FOUND => return Ok(LfcPrewarmState::Skipped),
     125            0 :             status => bail!("{status} querying endpoint storage"),
     126              :         }
     127            0 :         let state_download_time_ms = now.elapsed().as_millis() as u32;
     128            0 :         now = Instant::now();
     129              : 
     130            0 :         let mut uncompressed = Vec::new();
     131            0 :         let lfc_state = select! {
     132            0 :             _ = token.cancelled() => return Ok(LfcPrewarmState::Cancelled),
     133            0 :             lfc_state = response.bytes() => lfc_state
     134              :         }
     135            0 :         .context("getting request body from endpoint storage")?;
     136              : 
     137            0 :         let mut decoder = ZstdDecoder::new(lfc_state.iter().as_slice());
     138            0 :         select! {
     139            0 :             _ = token.cancelled() => return Ok(LfcPrewarmState::Cancelled),
     140            0 :             read = decoder.read_to_end(&mut uncompressed) => read
     141              :         }
     142            0 :         .context("decoding LFC state")?;
     143            0 :         let uncompress_time_ms = now.elapsed().as_millis() as u32;
     144            0 :         now = Instant::now();
     145              : 
     146            0 :         let uncompressed_len = uncompressed.len();
     147            0 :         info!(%url, "downloaded LFC state, uncompressed size {uncompressed_len}");
     148              : 
     149              :         // Client connection and prewarm info querying are fast and therefore don't need
     150              :         // cancellation
     151            0 :         let client = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
     152            0 :             .await
     153            0 :             .context("connecting to postgres")?;
     154            0 :         let pg_token = client.cancel_token();
     155              : 
     156            0 :         let params: Vec<&(dyn postgres_types::ToSql + Sync)> = vec![&uncompressed];
     157            0 :         select! {
     158            0 :             res = client.query_one("select neon.prewarm_local_cache($1)", &params) => res,
     159            0 :             _ = token.cancelled() => {
     160            0 :                 pg_token.cancel_query(postgres::NoTls).await
     161            0 :                     .context("cancelling neon.prewarm_local_cache()")?;
     162            0 :                 return Ok(LfcPrewarmState::Cancelled)
     163              :             }
     164              :         }
     165            0 :         .context("loading LFC state into postgres")
     166            0 :         .map(|_| ())?;
     167            0 :         let prewarm_time_ms = now.elapsed().as_millis() as u32;
     168              : 
     169            0 :         let row = client
     170            0 :             .query_one("select * from neon.get_prewarm_info()", &[])
     171            0 :             .await
     172            0 :             .context("querying prewarm info")?;
     173            0 :         let total = row.try_get(0).unwrap_or_default();
     174            0 :         let prewarmed = row.try_get(1).unwrap_or_default();
     175            0 :         let skipped = row.try_get(2).unwrap_or_default();
     176              : 
     177            0 :         Ok(LfcPrewarmState::Completed {
     178            0 :             total,
     179            0 :             prewarmed,
     180            0 :             skipped,
     181            0 :             state_download_time_ms,
     182            0 :             uncompress_time_ms,
     183            0 :             prewarm_time_ms,
     184            0 :         })
     185            0 :     }
     186              : 
     187              :     /// If offload request is ongoing, return false, true otherwise
     188            0 :     pub fn offload_lfc(self: &Arc<Self>) -> bool {
     189              :         {
     190            0 :             let state = &mut self.state.lock().unwrap().lfc_offload_state;
     191            0 :             if matches!(
     192            0 :                 replace(state, LfcOffloadState::Offloading),
     193              :                 LfcOffloadState::Offloading
     194              :             ) {
     195            0 :                 return false;
     196            0 :             }
     197              :         }
     198            0 :         let cloned = self.clone();
     199            0 :         spawn(async move { cloned.offload_lfc_with_state_update().await });
     200            0 :         true
     201            0 :     }
     202              : 
     203            0 :     pub async fn offload_lfc_async(self: &Arc<Self>) {
     204              :         {
     205            0 :             let state = &mut self.state.lock().unwrap().lfc_offload_state;
     206            0 :             if matches!(
     207            0 :                 replace(state, LfcOffloadState::Offloading),
     208              :                 LfcOffloadState::Offloading
     209              :             ) {
     210            0 :                 return;
     211            0 :             }
     212              :         }
     213            0 :         self.offload_lfc_with_state_update().await
     214            0 :     }
     215              : 
     216            0 :     async fn offload_lfc_with_state_update(&self) {
     217            0 :         crate::metrics::LFC_OFFLOADS.inc();
     218            0 :         let state = match self.offload_lfc_impl().await {
     219            0 :             Ok(state) => state,
     220            0 :             Err(err) => {
     221            0 :                 crate::metrics::LFC_OFFLOAD_ERRORS.inc();
     222            0 :                 error!(%err, "could not offload LFC");
     223            0 :                 let error = format!("{err:#}");
     224            0 :                 LfcOffloadState::Failed { error }
     225              :             }
     226              :         };
     227            0 :         self.state.lock().unwrap().lfc_offload_state = state;
     228            0 :     }
     229              : 
     230            0 :     async fn offload_lfc_impl(&self) -> Result<LfcOffloadState> {
     231            0 :         let EndpointStoragePair { url, token } = self.endpoint_storage_pair(None)?;
     232            0 :         info!(%url, "requesting LFC state from Postgres");
     233              : 
     234            0 :         let mut now = Instant::now();
     235            0 :         let row = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
     236            0 :             .await
     237            0 :             .context("connecting to postgres")?
     238            0 :             .query_one("select neon.get_local_cache_state()", &[])
     239            0 :             .await
     240            0 :             .context("querying LFC state")?;
     241            0 :         let state = row
     242            0 :             .try_get::<usize, Option<&[u8]>>(0)
     243            0 :             .context("deserializing LFC state")?;
     244            0 :         let Some(state) = state else {
     245            0 :             info!(%url, "empty LFC state, not exporting");
     246            0 :             return Ok(LfcOffloadState::Skipped);
     247              :         };
     248            0 :         let state_query_time_ms = now.elapsed().as_millis() as u32;
     249            0 :         now = Instant::now();
     250              : 
     251            0 :         let mut compressed = Vec::new();
     252            0 :         ZstdEncoder::new(state)
     253            0 :             .read_to_end(&mut compressed)
     254            0 :             .await
     255            0 :             .context("compressing LFC state")?;
     256            0 :         let compress_time_ms = now.elapsed().as_millis() as u32;
     257            0 :         now = Instant::now();
     258              : 
     259            0 :         let compressed_len = compressed.len();
     260            0 :         info!(%url, "downloaded LFC state, compressed size {compressed_len}");
     261              : 
     262            0 :         let request = Client::new().put(url).bearer_auth(token).body(compressed);
     263            0 :         let response = request
     264            0 :             .send()
     265            0 :             .await
     266            0 :             .context("writing to endpoint storage")?;
     267            0 :         let state_upload_time_ms = now.elapsed().as_millis() as u32;
     268            0 :         let status = response.status();
     269            0 :         if status != StatusCode::OK {
     270            0 :             bail!("request to endpoint storage failed: {status}");
     271            0 :         }
     272              : 
     273            0 :         Ok(LfcOffloadState::Completed {
     274            0 :             compress_time_ms,
     275            0 :             state_query_time_ms,
     276            0 :             state_upload_time_ms,
     277            0 :         })
     278            0 :     }
     279              : 
     280            0 :     pub fn cancel_prewarm(self: &Arc<Self>) {
     281            0 :         self.state.lock().unwrap().lfc_prewarm_token.cancel();
     282            0 :     }
     283              : }
        

Generated by: LCOV version 2.1-beta