Line data Source code
1 : use crate::compute::ComputeNode;
2 : use anyhow::{Context, Result, bail};
3 : use async_compression::tokio::bufread::{ZstdDecoder, ZstdEncoder};
4 : use compute_api::responses::LfcOffloadState;
5 : use compute_api::responses::LfcPrewarmState;
6 : use http::StatusCode;
7 : use reqwest::Client;
8 : use std::mem::replace;
9 : use std::sync::Arc;
10 : use std::time::Instant;
11 : use tokio::{io::AsyncReadExt, select, spawn};
12 : use tokio_util::sync::CancellationToken;
13 : use tracing::{error, info};
14 :
/// Where an endpoint's LFC state lives in endpoint storage, together with the
/// credentials needed to reach it. Used by the LFC prewarm and offload tasks.
struct EndpointStoragePair {
    /// Full URL of the LFC state object in endpoint storage.
    url: String,
    /// Bearer token sent along with requests to `url`.
    token: String,
}
20 :
21 : const KEY: &str = "lfc_state";
22 : impl EndpointStoragePair {
23 : /// endpoint_id is set to None while prewarming from other endpoint, see compute_promote.rs
24 : /// If not None, takes precedence over pspec.spec.endpoint_id
25 0 : fn from_spec_and_endpoint(
26 0 : pspec: &crate::compute::ParsedSpec,
27 0 : endpoint_id: Option<String>,
28 0 : ) -> Result<Self> {
29 0 : let endpoint_id = endpoint_id.as_ref().or(pspec.spec.endpoint_id.as_ref());
30 0 : let Some(ref endpoint_id) = endpoint_id else {
31 0 : bail!("pspec.endpoint_id missing, other endpoint_id not provided")
32 : };
33 0 : let Some(ref base_uri) = pspec.endpoint_storage_addr else {
34 0 : bail!("pspec.endpoint_storage_addr missing")
35 : };
36 0 : let tenant_id = pspec.tenant_id;
37 0 : let timeline_id = pspec.timeline_id;
38 :
39 0 : let url = format!("http://{base_uri}/{tenant_id}/{timeline_id}/{endpoint_id}/{KEY}");
40 0 : let Some(ref token) = pspec.endpoint_storage_token else {
41 0 : bail!("pspec.endpoint_storage_token missing")
42 : };
43 0 : let token = token.clone();
44 0 : Ok(EndpointStoragePair { url, token })
45 0 : }
46 : }
47 :
48 : impl ComputeNode {
49 0 : pub async fn lfc_prewarm_state(&self) -> LfcPrewarmState {
50 0 : self.state.lock().unwrap().lfc_prewarm_state.clone()
51 0 : }
52 :
53 0 : pub fn lfc_offload_state(&self) -> LfcOffloadState {
54 0 : self.state.lock().unwrap().lfc_offload_state.clone()
55 0 : }
56 :
57 : /// If there is a prewarm request ongoing, return `false`, `true` otherwise.
58 : /// Has a failpoint "compute-prewarm"
59 0 : pub fn prewarm_lfc(self: &Arc<Self>, from_endpoint: Option<String>) -> bool {
60 : let token: CancellationToken;
61 : {
62 0 : let state = &mut self.state.lock().unwrap();
63 0 : token = state.lfc_prewarm_token.clone();
64 : if let LfcPrewarmState::Prewarming =
65 0 : replace(&mut state.lfc_prewarm_state, LfcPrewarmState::Prewarming)
66 : {
67 0 : return false;
68 0 : }
69 : }
70 0 : crate::metrics::LFC_PREWARMS.inc();
71 :
72 0 : let this = self.clone();
73 0 : spawn(async move {
74 0 : let prewarm_state = match this.prewarm_impl(from_endpoint, token).await {
75 0 : Ok(state) => state,
76 0 : Err(err) => {
77 0 : crate::metrics::LFC_PREWARM_ERRORS.inc();
78 0 : error!(%err, "could not prewarm LFC");
79 0 : let error = format!("{err:#}");
80 0 : LfcPrewarmState::Failed { error }
81 : }
82 : };
83 :
84 0 : let state = &mut this.state.lock().unwrap();
85 0 : if let LfcPrewarmState::Cancelled = prewarm_state {
86 0 : state.lfc_prewarm_token = CancellationToken::new();
87 0 : }
88 0 : state.lfc_prewarm_state = prewarm_state;
89 0 : });
90 0 : true
91 0 : }
92 :
93 : /// from_endpoint: None for endpoint managed by this compute_ctl
94 0 : fn endpoint_storage_pair(&self, from_endpoint: Option<String>) -> Result<EndpointStoragePair> {
95 0 : let state = self.state.lock().unwrap();
96 0 : EndpointStoragePair::from_spec_and_endpoint(state.pspec.as_ref().unwrap(), from_endpoint)
97 0 : }
98 :
99 : /// Request LFC state from endpoint storage and load corresponding pages into Postgres.
100 0 : async fn prewarm_impl(
101 0 : &self,
102 0 : from_endpoint: Option<String>,
103 0 : token: CancellationToken,
104 0 : ) -> Result<LfcPrewarmState> {
105 : let EndpointStoragePair {
106 0 : url,
107 0 : token: storage_token,
108 0 : } = self.endpoint_storage_pair(from_endpoint)?;
109 :
110 : #[cfg(feature = "testing")]
111 0 : fail::fail_point!("compute-prewarm", |_| bail!("compute-prewarm failpoint"));
112 :
113 0 : info!(%url, "requesting LFC state from endpoint storage");
114 0 : let mut now = Instant::now();
115 0 : let request = Client::new().get(&url).bearer_auth(storage_token);
116 0 : let response = select! {
117 0 : _ = token.cancelled() => return Ok(LfcPrewarmState::Cancelled),
118 0 : response = request.send() => response
119 : }
120 0 : .context("querying endpoint storage")?;
121 :
122 0 : match response.status() {
123 0 : StatusCode::OK => (),
124 0 : StatusCode::NOT_FOUND => return Ok(LfcPrewarmState::Skipped),
125 0 : status => bail!("{status} querying endpoint storage"),
126 : }
127 0 : let state_download_time_ms = now.elapsed().as_millis() as u32;
128 0 : now = Instant::now();
129 :
130 0 : let mut uncompressed = Vec::new();
131 0 : let lfc_state = select! {
132 0 : _ = token.cancelled() => return Ok(LfcPrewarmState::Cancelled),
133 0 : lfc_state = response.bytes() => lfc_state
134 : }
135 0 : .context("getting request body from endpoint storage")?;
136 :
137 0 : let mut decoder = ZstdDecoder::new(lfc_state.iter().as_slice());
138 0 : select! {
139 0 : _ = token.cancelled() => return Ok(LfcPrewarmState::Cancelled),
140 0 : read = decoder.read_to_end(&mut uncompressed) => read
141 : }
142 0 : .context("decoding LFC state")?;
143 0 : let uncompress_time_ms = now.elapsed().as_millis() as u32;
144 0 : now = Instant::now();
145 :
146 0 : let uncompressed_len = uncompressed.len();
147 0 : info!(%url, "downloaded LFC state, uncompressed size {uncompressed_len}");
148 :
149 : // Client connection and prewarm info querying are fast and therefore don't need
150 : // cancellation
151 0 : let client = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
152 0 : .await
153 0 : .context("connecting to postgres")?;
154 0 : let pg_token = client.cancel_token();
155 :
156 0 : let params: Vec<&(dyn postgres_types::ToSql + Sync)> = vec![&uncompressed];
157 0 : select! {
158 0 : res = client.query_one("select neon.prewarm_local_cache($1)", ¶ms) => res,
159 0 : _ = token.cancelled() => {
160 0 : pg_token.cancel_query(postgres::NoTls).await
161 0 : .context("cancelling neon.prewarm_local_cache()")?;
162 0 : return Ok(LfcPrewarmState::Cancelled)
163 : }
164 : }
165 0 : .context("loading LFC state into postgres")
166 0 : .map(|_| ())?;
167 0 : let prewarm_time_ms = now.elapsed().as_millis() as u32;
168 :
169 0 : let row = client
170 0 : .query_one("select * from neon.get_prewarm_info()", &[])
171 0 : .await
172 0 : .context("querying prewarm info")?;
173 0 : let total = row.try_get(0).unwrap_or_default();
174 0 : let prewarmed = row.try_get(1).unwrap_or_default();
175 0 : let skipped = row.try_get(2).unwrap_or_default();
176 :
177 0 : Ok(LfcPrewarmState::Completed {
178 0 : total,
179 0 : prewarmed,
180 0 : skipped,
181 0 : state_download_time_ms,
182 0 : uncompress_time_ms,
183 0 : prewarm_time_ms,
184 0 : })
185 0 : }
186 :
187 : /// If offload request is ongoing, return false, true otherwise
188 0 : pub fn offload_lfc(self: &Arc<Self>) -> bool {
189 : {
190 0 : let state = &mut self.state.lock().unwrap().lfc_offload_state;
191 0 : if matches!(
192 0 : replace(state, LfcOffloadState::Offloading),
193 : LfcOffloadState::Offloading
194 : ) {
195 0 : return false;
196 0 : }
197 : }
198 0 : let cloned = self.clone();
199 0 : spawn(async move { cloned.offload_lfc_with_state_update().await });
200 0 : true
201 0 : }
202 :
203 0 : pub async fn offload_lfc_async(self: &Arc<Self>) {
204 : {
205 0 : let state = &mut self.state.lock().unwrap().lfc_offload_state;
206 0 : if matches!(
207 0 : replace(state, LfcOffloadState::Offloading),
208 : LfcOffloadState::Offloading
209 : ) {
210 0 : return;
211 0 : }
212 : }
213 0 : self.offload_lfc_with_state_update().await
214 0 : }
215 :
216 0 : async fn offload_lfc_with_state_update(&self) {
217 0 : crate::metrics::LFC_OFFLOADS.inc();
218 0 : let state = match self.offload_lfc_impl().await {
219 0 : Ok(state) => state,
220 0 : Err(err) => {
221 0 : crate::metrics::LFC_OFFLOAD_ERRORS.inc();
222 0 : error!(%err, "could not offload LFC");
223 0 : let error = format!("{err:#}");
224 0 : LfcOffloadState::Failed { error }
225 : }
226 : };
227 0 : self.state.lock().unwrap().lfc_offload_state = state;
228 0 : }
229 :
230 0 : async fn offload_lfc_impl(&self) -> Result<LfcOffloadState> {
231 0 : let EndpointStoragePair { url, token } = self.endpoint_storage_pair(None)?;
232 0 : info!(%url, "requesting LFC state from Postgres");
233 :
234 0 : let mut now = Instant::now();
235 0 : let row = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
236 0 : .await
237 0 : .context("connecting to postgres")?
238 0 : .query_one("select neon.get_local_cache_state()", &[])
239 0 : .await
240 0 : .context("querying LFC state")?;
241 0 : let state = row
242 0 : .try_get::<usize, Option<&[u8]>>(0)
243 0 : .context("deserializing LFC state")?;
244 0 : let Some(state) = state else {
245 0 : info!(%url, "empty LFC state, not exporting");
246 0 : return Ok(LfcOffloadState::Skipped);
247 : };
248 0 : let state_query_time_ms = now.elapsed().as_millis() as u32;
249 0 : now = Instant::now();
250 :
251 0 : let mut compressed = Vec::new();
252 0 : ZstdEncoder::new(state)
253 0 : .read_to_end(&mut compressed)
254 0 : .await
255 0 : .context("compressing LFC state")?;
256 0 : let compress_time_ms = now.elapsed().as_millis() as u32;
257 0 : now = Instant::now();
258 :
259 0 : let compressed_len = compressed.len();
260 0 : info!(%url, "downloaded LFC state, compressed size {compressed_len}");
261 :
262 0 : let request = Client::new().put(url).bearer_auth(token).body(compressed);
263 0 : let response = request
264 0 : .send()
265 0 : .await
266 0 : .context("writing to endpoint storage")?;
267 0 : let state_upload_time_ms = now.elapsed().as_millis() as u32;
268 0 : let status = response.status();
269 0 : if status != StatusCode::OK {
270 0 : bail!("request to endpoint storage failed: {status}");
271 0 : }
272 :
273 0 : Ok(LfcOffloadState::Completed {
274 0 : compress_time_ms,
275 0 : state_query_time_ms,
276 0 : state_upload_time_ms,
277 0 : })
278 0 : }
279 :
280 0 : pub fn cancel_prewarm(self: &Arc<Self>) {
281 0 : self.state.lock().unwrap().lfc_prewarm_token.cancel();
282 0 : }
283 : }
|