LCOV - code coverage report
Current view: top level - compute_tools/src - compute_prewarm.rs (source / functions) Coverage Total Hit
Test: 472031e0b71f3195f7f21b1f2b20de09fd07bb56.info Lines: 0.0 % 141 0
Test Date: 2025-05-26 10:37:33 Functions: 0.0 % 14 0

            Line data    Source code
       1              : use crate::compute::ComputeNode;
       2              : use anyhow::{Context, Result, bail};
       3              : use async_compression::tokio::bufread::{ZstdDecoder, ZstdEncoder};
       4              : use compute_api::responses::LfcOffloadState;
       5              : use compute_api::responses::LfcPrewarmState;
       6              : use http::StatusCode;
       7              : use reqwest::Client;
       8              : use std::sync::Arc;
       9              : use tokio::{io::AsyncReadExt, spawn};
      10              : use tracing::{error, info};
      11              : 
/// LFC prewarm state combined with progress counters.
///
/// `base` is flattened during serialization, so its fields are emitted at the same
/// level as `total`, `prewarmed` and `skipped`. All counters start at 0 via `Default`.
#[derive(serde::Serialize, Default)]
pub struct LfcPrewarmStateWithProgress {
    // Prewarm state as recorded in the compute node's shared state.
    #[serde(flatten)]
    base: LfcPrewarmState,
    // Column 0 of `get_prewarm_info()`; presumably the overall number of segments.
    total: i32,
    // Column 1 of `get_prewarm_info()`; presumably the number of segments already prewarmed.
    prewarmed: i32,
    // Column 2 of `get_prewarm_info()`; presumably the number of segments skipped.
    skipped: i32,
}
      20              : 
      21              : /// A pair of url and a token to query endpoint storage for LFC prewarm-related tasks
      22              : struct EndpointStoragePair {
      23              :     url: String,
      24              :     token: String,
      25              : }
      26              : 
      27              : const KEY: &str = "lfc_state";
      28              : impl TryFrom<&crate::compute::ParsedSpec> for EndpointStoragePair {
      29              :     type Error = anyhow::Error;
      30            0 :     fn try_from(pspec: &crate::compute::ParsedSpec) -> Result<Self, Self::Error> {
      31            0 :         let Some(ref endpoint_id) = pspec.spec.endpoint_id else {
      32            0 :             bail!("pspec.endpoint_id missing")
      33              :         };
      34            0 :         let Some(ref base_uri) = pspec.endpoint_storage_addr else {
      35            0 :             bail!("pspec.endpoint_storage_addr missing")
      36              :         };
      37            0 :         let tenant_id = pspec.tenant_id;
      38            0 :         let timeline_id = pspec.timeline_id;
      39            0 : 
      40            0 :         let url = format!("http://{base_uri}/{tenant_id}/{timeline_id}/{endpoint_id}/{KEY}");
      41            0 :         let Some(ref token) = pspec.endpoint_storage_token else {
      42            0 :             bail!("pspec.endpoint_storage_token missing")
      43              :         };
      44            0 :         let token = token.clone();
      45            0 :         Ok(EndpointStoragePair { url, token })
      46            0 :     }
      47              : }
      48              : 
      49              : impl ComputeNode {
      50              :     // If prewarm failed, we want to get overall number of segments as well as done ones.
      51              :     // However, this function should be reliable even if querying postgres failed.
      52            0 :     pub async fn lfc_prewarm_state(&self) -> LfcPrewarmStateWithProgress {
      53            0 :         info!("requesting LFC prewarm state from postgres");
      54            0 :         let mut state = LfcPrewarmStateWithProgress::default();
      55            0 :         {
      56            0 :             state.base = self.state.lock().unwrap().lfc_prewarm_state.clone();
      57            0 :         }
      58              : 
      59            0 :         let client = match ComputeNode::get_maintenance_client(&self.tokio_conn_conf).await {
      60            0 :             Ok(client) => client,
      61            0 :             Err(err) => {
      62            0 :                 error!(%err, "connecting to postgres");
      63            0 :                 return state;
      64              :             }
      65              :         };
      66            0 :         let row = match client
      67            0 :             .query_one("select * from get_prewarm_info()", &[])
      68            0 :             .await
      69              :         {
      70            0 :             Ok(row) => row,
      71            0 :             Err(err) => {
      72            0 :                 error!(%err, "querying LFC prewarm status");
      73            0 :                 return state;
      74              :             }
      75              :         };
      76            0 :         state.total = row.try_get(0).unwrap_or_default();
      77            0 :         state.prewarmed = row.try_get(1).unwrap_or_default();
      78            0 :         state.skipped = row.try_get(2).unwrap_or_default();
      79            0 :         state
      80            0 :     }
      81              : 
      82            0 :     pub fn lfc_offload_state(&self) -> LfcOffloadState {
      83            0 :         self.state.lock().unwrap().lfc_offload_state.clone()
      84            0 :     }
      85              : 
      86              :     /// Returns false if there is a prewarm request ongoing, true otherwise
      87            0 :     pub fn prewarm_lfc(self: &Arc<Self>) -> bool {
      88            0 :         crate::metrics::LFC_PREWARM_REQUESTS.inc();
      89            0 :         {
      90            0 :             let state = &mut self.state.lock().unwrap().lfc_prewarm_state;
      91            0 :             if let LfcPrewarmState::Prewarming =
      92            0 :                 std::mem::replace(state, LfcPrewarmState::Prewarming)
      93              :             {
      94            0 :                 return false;
      95            0 :             }
      96            0 :         }
      97            0 : 
      98            0 :         let cloned = self.clone();
      99            0 :         spawn(async move {
     100            0 :             let Err(err) = cloned.prewarm_impl().await else {
     101            0 :                 cloned.state.lock().unwrap().lfc_prewarm_state = LfcPrewarmState::Completed;
     102            0 :                 return;
     103              :             };
     104            0 :             error!(%err);
     105            0 :             cloned.state.lock().unwrap().lfc_prewarm_state = LfcPrewarmState::Failed {
     106            0 :                 error: err.to_string(),
     107            0 :             };
     108            0 :         });
     109            0 :         true
     110            0 :     }
     111              : 
     112            0 :     fn endpoint_storage_pair(&self) -> Result<EndpointStoragePair> {
     113            0 :         let state = self.state.lock().unwrap();
     114            0 :         state.pspec.as_ref().unwrap().try_into()
     115            0 :     }
     116              : 
     117            0 :     async fn prewarm_impl(&self) -> Result<()> {
     118            0 :         let EndpointStoragePair { url, token } = self.endpoint_storage_pair()?;
     119            0 :         info!(%url, "requesting LFC state from endpoint storage");
     120              : 
     121            0 :         let request = Client::new().get(&url).bearer_auth(token);
     122            0 :         let res = request.send().await.context("querying endpoint storage")?;
     123            0 :         let status = res.status();
     124            0 :         if status != StatusCode::OK {
     125            0 :             bail!("{status} querying endpoint storage")
     126            0 :         }
     127            0 : 
     128            0 :         let mut uncompressed = Vec::new();
     129            0 :         let lfc_state = res
     130            0 :             .bytes()
     131            0 :             .await
     132            0 :             .context("getting request body from endpoint storage")?;
     133            0 :         ZstdDecoder::new(lfc_state.iter().as_slice())
     134            0 :             .read_to_end(&mut uncompressed)
     135            0 :             .await
     136            0 :             .context("decoding LFC state")?;
     137            0 :         let uncompressed_len = uncompressed.len();
     138            0 :         info!(%url, "downloaded LFC state, uncompressed size {uncompressed_len}, loading into postgres");
     139              : 
     140            0 :         ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
     141            0 :             .await
     142            0 :             .context("connecting to postgres")?
     143            0 :             .query_one("select prewarm_local_cache($1)", &[&uncompressed])
     144            0 :             .await
     145            0 :             .context("loading LFC state into postgres")
     146            0 :             .map(|_| ())
     147            0 :     }
     148              : 
     149              :     /// Returns false if there is an offload request ongoing, true otherwise
     150            0 :     pub fn offload_lfc(self: &Arc<Self>) -> bool {
     151            0 :         crate::metrics::LFC_OFFLOAD_REQUESTS.inc();
     152            0 :         {
     153            0 :             let state = &mut self.state.lock().unwrap().lfc_offload_state;
     154            0 :             if let LfcOffloadState::Offloading =
     155            0 :                 std::mem::replace(state, LfcOffloadState::Offloading)
     156              :             {
     157            0 :                 return false;
     158            0 :             }
     159            0 :         }
     160            0 : 
     161            0 :         let cloned = self.clone();
     162            0 :         spawn(async move {
     163            0 :             let Err(err) = cloned.offload_lfc_impl().await else {
     164            0 :                 cloned.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Completed;
     165            0 :                 return;
     166              :             };
     167            0 :             error!(%err);
     168            0 :             cloned.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Failed {
     169            0 :                 error: err.to_string(),
     170            0 :             };
     171            0 :         });
     172            0 :         true
     173            0 :     }
     174              : 
     175            0 :     async fn offload_lfc_impl(&self) -> Result<()> {
     176            0 :         let EndpointStoragePair { url, token } = self.endpoint_storage_pair()?;
     177            0 :         info!(%url, "requesting LFC state from postgres");
     178              : 
     179            0 :         let mut compressed = Vec::new();
     180            0 :         ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
     181            0 :             .await
     182            0 :             .context("connecting to postgres")?
     183            0 :             .query_one("select get_local_cache_state()", &[])
     184            0 :             .await
     185            0 :             .context("querying LFC state")?
     186            0 :             .try_get::<usize, &[u8]>(0)
     187            0 :             .context("deserializing LFC state")
     188            0 :             .map(ZstdEncoder::new)?
     189            0 :             .read_to_end(&mut compressed)
     190            0 :             .await
     191            0 :             .context("compressing LFC state")?;
     192            0 :         let compressed_len = compressed.len();
     193            0 :         info!(%url, "downloaded LFC state, compressed size {compressed_len}, writing to endpoint storage");
     194              : 
     195            0 :         let request = Client::new().put(url).bearer_auth(token).body(compressed);
     196            0 :         match request.send().await {
     197            0 :             Ok(res) if res.status() == StatusCode::OK => Ok(()),
     198            0 :             Ok(res) => bail!("Error writing to endpoint storage: {}", res.status()),
     199            0 :             Err(err) => Err(err).context("writing to endpoint storage"),
     200              :         }
     201            0 :     }
     202              : }
        

Generated by: LCOV version 2.1-beta