Line data Source code
1 : use crate::compute::ComputeNode;
2 : use anyhow::{Context, Result, bail};
3 : use async_compression::tokio::bufread::{ZstdDecoder, ZstdEncoder};
4 : use compute_api::responses::LfcOffloadState;
5 : use compute_api::responses::LfcPrewarmState;
6 : use http::StatusCode;
7 : use reqwest::Client;
8 : use std::sync::Arc;
9 : use tokio::{io::AsyncReadExt, spawn};
10 : use tracing::{error, info};
11 :
12 : #[derive(serde::Serialize, Default)]
13 : pub struct LfcPrewarmStateWithProgress {
14 : #[serde(flatten)]
15 : base: LfcPrewarmState,
16 : total: i32,
17 : prewarmed: i32,
18 : skipped: i32,
19 : }
20 :
/// URL plus token used to query endpoint storage for LFC prewarm-related tasks
struct EndpointStoragePair {
    /// Full URL of the LFC state object in endpoint storage
    url: String,
    /// Token sent as bearer authentication with every request to `url`
    token: String,
}
26 :
27 : const KEY: &str = "lfc_state";
28 : impl TryFrom<&crate::compute::ParsedSpec> for EndpointStoragePair {
29 : type Error = anyhow::Error;
30 0 : fn try_from(pspec: &crate::compute::ParsedSpec) -> Result<Self, Self::Error> {
31 0 : let Some(ref endpoint_id) = pspec.spec.endpoint_id else {
32 0 : bail!("pspec.endpoint_id missing")
33 : };
34 0 : let Some(ref base_uri) = pspec.endpoint_storage_addr else {
35 0 : bail!("pspec.endpoint_storage_addr missing")
36 : };
37 0 : let tenant_id = pspec.tenant_id;
38 0 : let timeline_id = pspec.timeline_id;
39 0 :
40 0 : let url = format!("http://{base_uri}/{tenant_id}/{timeline_id}/{endpoint_id}/{KEY}");
41 0 : let Some(ref token) = pspec.endpoint_storage_token else {
42 0 : bail!("pspec.endpoint_storage_token missing")
43 : };
44 0 : let token = token.clone();
45 0 : Ok(EndpointStoragePair { url, token })
46 0 : }
47 : }
48 :
49 : impl ComputeNode {
50 : // If prewarm failed, we want to get overall number of segments as well as done ones.
51 : // However, this function should be reliable even if querying postgres failed.
52 0 : pub async fn lfc_prewarm_state(&self) -> LfcPrewarmStateWithProgress {
53 0 : info!("requesting LFC prewarm state from postgres");
54 0 : let mut state = LfcPrewarmStateWithProgress::default();
55 0 : {
56 0 : state.base = self.state.lock().unwrap().lfc_prewarm_state.clone();
57 0 : }
58 :
59 0 : let client = match ComputeNode::get_maintenance_client(&self.tokio_conn_conf).await {
60 0 : Ok(client) => client,
61 0 : Err(err) => {
62 0 : error!(%err, "connecting to postgres");
63 0 : return state;
64 : }
65 : };
66 0 : let row = match client
67 0 : .query_one("select * from get_prewarm_info()", &[])
68 0 : .await
69 : {
70 0 : Ok(row) => row,
71 0 : Err(err) => {
72 0 : error!(%err, "querying LFC prewarm status");
73 0 : return state;
74 : }
75 : };
76 0 : state.total = row.try_get(0).unwrap_or_default();
77 0 : state.prewarmed = row.try_get(1).unwrap_or_default();
78 0 : state.skipped = row.try_get(2).unwrap_or_default();
79 0 : state
80 0 : }
81 :
82 0 : pub fn lfc_offload_state(&self) -> LfcOffloadState {
83 0 : self.state.lock().unwrap().lfc_offload_state.clone()
84 0 : }
85 :
86 : /// Returns false if there is a prewarm request ongoing, true otherwise
87 0 : pub fn prewarm_lfc(self: &Arc<Self>) -> bool {
88 0 : crate::metrics::LFC_PREWARM_REQUESTS.inc();
89 0 : {
90 0 : let state = &mut self.state.lock().unwrap().lfc_prewarm_state;
91 0 : if let LfcPrewarmState::Prewarming =
92 0 : std::mem::replace(state, LfcPrewarmState::Prewarming)
93 : {
94 0 : return false;
95 0 : }
96 0 : }
97 0 :
98 0 : let cloned = self.clone();
99 0 : spawn(async move {
100 0 : let Err(err) = cloned.prewarm_impl().await else {
101 0 : cloned.state.lock().unwrap().lfc_prewarm_state = LfcPrewarmState::Completed;
102 0 : return;
103 : };
104 0 : error!(%err);
105 0 : cloned.state.lock().unwrap().lfc_prewarm_state = LfcPrewarmState::Failed {
106 0 : error: err.to_string(),
107 0 : };
108 0 : });
109 0 : true
110 0 : }
111 :
112 0 : fn endpoint_storage_pair(&self) -> Result<EndpointStoragePair> {
113 0 : let state = self.state.lock().unwrap();
114 0 : state.pspec.as_ref().unwrap().try_into()
115 0 : }
116 :
117 0 : async fn prewarm_impl(&self) -> Result<()> {
118 0 : let EndpointStoragePair { url, token } = self.endpoint_storage_pair()?;
119 0 : info!(%url, "requesting LFC state from endpoint storage");
120 :
121 0 : let request = Client::new().get(&url).bearer_auth(token);
122 0 : let res = request.send().await.context("querying endpoint storage")?;
123 0 : let status = res.status();
124 0 : if status != StatusCode::OK {
125 0 : bail!("{status} querying endpoint storage")
126 0 : }
127 0 :
128 0 : let mut uncompressed = Vec::new();
129 0 : let lfc_state = res
130 0 : .bytes()
131 0 : .await
132 0 : .context("getting request body from endpoint storage")?;
133 0 : ZstdDecoder::new(lfc_state.iter().as_slice())
134 0 : .read_to_end(&mut uncompressed)
135 0 : .await
136 0 : .context("decoding LFC state")?;
137 0 : let uncompressed_len = uncompressed.len();
138 0 : info!(%url, "downloaded LFC state, uncompressed size {uncompressed_len}, loading into postgres");
139 :
140 0 : ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
141 0 : .await
142 0 : .context("connecting to postgres")?
143 0 : .query_one("select prewarm_local_cache($1)", &[&uncompressed])
144 0 : .await
145 0 : .context("loading LFC state into postgres")
146 0 : .map(|_| ())
147 0 : }
148 :
149 : /// Returns false if there is an offload request ongoing, true otherwise
150 0 : pub fn offload_lfc(self: &Arc<Self>) -> bool {
151 0 : crate::metrics::LFC_OFFLOAD_REQUESTS.inc();
152 0 : {
153 0 : let state = &mut self.state.lock().unwrap().lfc_offload_state;
154 0 : if let LfcOffloadState::Offloading =
155 0 : std::mem::replace(state, LfcOffloadState::Offloading)
156 : {
157 0 : return false;
158 0 : }
159 0 : }
160 0 :
161 0 : let cloned = self.clone();
162 0 : spawn(async move {
163 0 : let Err(err) = cloned.offload_lfc_impl().await else {
164 0 : cloned.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Completed;
165 0 : return;
166 : };
167 0 : error!(%err);
168 0 : cloned.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Failed {
169 0 : error: err.to_string(),
170 0 : };
171 0 : });
172 0 : true
173 0 : }
174 :
175 0 : async fn offload_lfc_impl(&self) -> Result<()> {
176 0 : let EndpointStoragePair { url, token } = self.endpoint_storage_pair()?;
177 0 : info!(%url, "requesting LFC state from postgres");
178 :
179 0 : let mut compressed = Vec::new();
180 0 : ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
181 0 : .await
182 0 : .context("connecting to postgres")?
183 0 : .query_one("select get_local_cache_state()", &[])
184 0 : .await
185 0 : .context("querying LFC state")?
186 0 : .try_get::<usize, &[u8]>(0)
187 0 : .context("deserializing LFC state")
188 0 : .map(ZstdEncoder::new)?
189 0 : .read_to_end(&mut compressed)
190 0 : .await
191 0 : .context("compressing LFC state")?;
192 0 : let compressed_len = compressed.len();
193 0 : info!(%url, "downloaded LFC state, compressed size {compressed_len}, writing to endpoint storage");
194 :
195 0 : let request = Client::new().put(url).bearer_auth(token).body(compressed);
196 0 : match request.send().await {
197 0 : Ok(res) if res.status() == StatusCode::OK => Ok(()),
198 0 : Ok(res) => bail!("Error writing to endpoint storage: {}", res.status()),
199 0 : Err(err) => Err(err).context("writing to endpoint storage"),
200 : }
201 0 : }
202 : }
|