Line data Source code
1 : use crate::compute::ComputeNode;
2 : use anyhow::{Context, Result, bail};
3 : use async_compression::tokio::bufread::{ZstdDecoder, ZstdEncoder};
4 : use compute_api::responses::LfcOffloadState;
5 : use compute_api::responses::LfcPrewarmState;
6 : use http::StatusCode;
7 : use reqwest::Client;
8 : use std::mem::replace;
9 : use std::sync::Arc;
10 : use tokio::{io::AsyncReadExt, select, spawn};
11 : use tokio_util::sync::CancellationToken;
12 : use tracing::{error, info};
13 :
14 : #[derive(serde::Serialize, Default)]
15 : pub struct LfcPrewarmStateWithProgress {
16 : #[serde(flatten)]
17 : base: LfcPrewarmState,
18 : total: i32,
19 : prewarmed: i32,
20 : skipped: i32,
21 : }
22 :
/// Everything needed to address the LFC state of one endpoint in endpoint storage:
/// the full object URL and the bearer token sent along with the request.
struct EndpointStoragePair {
    /// Full URL of the LFC state object.
    url: String,
    /// Bearer token used to authenticate against endpoint storage.
    token: String,
}
28 :
29 : const KEY: &str = "lfc_state";
30 : impl EndpointStoragePair {
31 : /// endpoint_id is set to None while prewarming from other endpoint, see replica promotion
32 : /// If not None, takes precedence over pspec.spec.endpoint_id
33 0 : fn from_spec_and_endpoint(
34 0 : pspec: &crate::compute::ParsedSpec,
35 0 : endpoint_id: Option<String>,
36 0 : ) -> Result<Self> {
37 0 : let endpoint_id = endpoint_id.as_ref().or(pspec.spec.endpoint_id.as_ref());
38 0 : let Some(ref endpoint_id) = endpoint_id else {
39 0 : bail!("pspec.endpoint_id missing, other endpoint_id not provided")
40 : };
41 0 : let Some(ref base_uri) = pspec.endpoint_storage_addr else {
42 0 : bail!("pspec.endpoint_storage_addr missing")
43 : };
44 0 : let tenant_id = pspec.tenant_id;
45 0 : let timeline_id = pspec.timeline_id;
46 :
47 0 : let url = format!("http://{base_uri}/{tenant_id}/{timeline_id}/{endpoint_id}/{KEY}");
48 0 : let Some(ref token) = pspec.endpoint_storage_token else {
49 0 : bail!("pspec.endpoint_storage_token missing")
50 : };
51 0 : let token = token.clone();
52 0 : Ok(EndpointStoragePair { url, token })
53 0 : }
54 : }
55 :
56 : impl ComputeNode {
57 : // If prewarm failed, we want to get overall number of segments as well as done ones.
58 : // However, this function should be reliable even if querying postgres failed.
59 0 : pub async fn lfc_prewarm_state(&self) -> LfcPrewarmStateWithProgress {
60 0 : info!("requesting LFC prewarm state from postgres");
61 0 : let mut state = LfcPrewarmStateWithProgress::default();
62 0 : {
63 0 : state.base = self.state.lock().unwrap().lfc_prewarm_state.clone();
64 0 : }
65 :
66 0 : let client = match ComputeNode::get_maintenance_client(&self.tokio_conn_conf).await {
67 0 : Ok(client) => client,
68 0 : Err(err) => {
69 0 : error!(%err, "connecting to postgres");
70 0 : return state;
71 : }
72 : };
73 0 : let row = match client
74 0 : .query_one("select * from neon.get_prewarm_info()", &[])
75 0 : .await
76 : {
77 0 : Ok(row) => row,
78 0 : Err(err) => {
79 0 : error!(%err, "querying LFC prewarm status");
80 0 : return state;
81 : }
82 : };
83 0 : state.total = row.try_get(0).unwrap_or_default();
84 0 : state.prewarmed = row.try_get(1).unwrap_or_default();
85 0 : state.skipped = row.try_get(2).unwrap_or_default();
86 0 : state
87 0 : }
88 :
89 0 : pub fn lfc_offload_state(&self) -> LfcOffloadState {
90 0 : self.state.lock().unwrap().lfc_offload_state.clone()
91 0 : }
92 :
93 : /// If there is a prewarm request ongoing, return `false`, `true` otherwise.
94 : /// Has a failpoint "compute-prewarm"
95 0 : pub fn prewarm_lfc(self: &Arc<Self>, from_endpoint: Option<String>) -> bool {
96 : let token: CancellationToken;
97 : {
98 0 : let state = &mut self.state.lock().unwrap();
99 0 : token = state.lfc_prewarm_token.clone();
100 : if let LfcPrewarmState::Prewarming =
101 0 : replace(&mut state.lfc_prewarm_state, LfcPrewarmState::Prewarming)
102 : {
103 0 : return false;
104 0 : }
105 : }
106 0 : crate::metrics::LFC_PREWARMS.inc();
107 :
108 0 : let this = self.clone();
109 0 : spawn(async move {
110 0 : let prewarm_state = match this.prewarm_impl(from_endpoint, token).await {
111 0 : Ok(state) => state,
112 0 : Err(err) => {
113 0 : crate::metrics::LFC_PREWARM_ERRORS.inc();
114 0 : error!(%err, "could not prewarm LFC");
115 0 : let error = format!("{err:#}");
116 0 : LfcPrewarmState::Failed { error }
117 : }
118 : };
119 :
120 0 : let state = &mut this.state.lock().unwrap();
121 0 : if let LfcPrewarmState::Cancelled = prewarm_state {
122 0 : state.lfc_prewarm_token = CancellationToken::new();
123 0 : }
124 0 : state.lfc_prewarm_state = prewarm_state;
125 0 : });
126 0 : true
127 0 : }
128 :
129 : /// from_endpoint: None for endpoint managed by this compute_ctl
130 0 : fn endpoint_storage_pair(&self, from_endpoint: Option<String>) -> Result<EndpointStoragePair> {
131 0 : let state = self.state.lock().unwrap();
132 0 : EndpointStoragePair::from_spec_and_endpoint(state.pspec.as_ref().unwrap(), from_endpoint)
133 0 : }
134 :
135 : /// Request LFC state from endpoint storage and load corresponding pages into Postgres.
136 : /// Returns a result with `false` if the LFC state is not found in endpoint storage.
137 0 : async fn prewarm_impl(
138 0 : &self,
139 0 : from_endpoint: Option<String>,
140 0 : token: CancellationToken,
141 0 : ) -> Result<LfcPrewarmState> {
142 : let EndpointStoragePair {
143 0 : url,
144 0 : token: storage_token,
145 0 : } = self.endpoint_storage_pair(from_endpoint)?;
146 :
147 : #[cfg(feature = "testing")]
148 0 : fail::fail_point!("compute-prewarm", |_| bail!("compute-prewarm failpoint"));
149 :
150 0 : info!(%url, "requesting LFC state from endpoint storage");
151 0 : let request = Client::new().get(&url).bearer_auth(storage_token);
152 0 : let response = select! {
153 0 : _ = token.cancelled() => return Ok(LfcPrewarmState::Cancelled),
154 0 : response = request.send() => response
155 : }
156 0 : .context("querying endpoint storage")?;
157 :
158 0 : match response.status() {
159 0 : StatusCode::OK => (),
160 0 : StatusCode::NOT_FOUND => return Ok(LfcPrewarmState::Skipped),
161 0 : status => bail!("{status} querying endpoint storage"),
162 : }
163 :
164 0 : let mut uncompressed = Vec::new();
165 0 : let lfc_state = select! {
166 0 : _ = token.cancelled() => return Ok(LfcPrewarmState::Cancelled),
167 0 : lfc_state = response.bytes() => lfc_state
168 : }
169 0 : .context("getting request body from endpoint storage")?;
170 :
171 0 : let mut decoder = ZstdDecoder::new(lfc_state.iter().as_slice());
172 0 : select! {
173 0 : _ = token.cancelled() => return Ok(LfcPrewarmState::Cancelled),
174 0 : read = decoder.read_to_end(&mut uncompressed) => read
175 : }
176 0 : .context("decoding LFC state")?;
177 :
178 0 : let uncompressed_len = uncompressed.len();
179 0 : info!(%url, "downloaded LFC state, uncompressed size {uncompressed_len}");
180 :
181 : // Client connection and prewarm info querying are fast and therefore don't need
182 : // cancellation
183 0 : let client = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
184 0 : .await
185 0 : .context("connecting to postgres")?;
186 0 : let pg_token = client.cancel_token();
187 :
188 0 : let params: Vec<&(dyn postgres_types::ToSql + Sync)> = vec![&uncompressed];
189 0 : select! {
190 0 : res = client.query_one("select neon.prewarm_local_cache($1)", ¶ms) => res,
191 0 : _ = token.cancelled() => {
192 0 : pg_token.cancel_query(postgres::NoTls).await
193 0 : .context("cancelling neon.prewarm_local_cache()")?;
194 0 : return Ok(LfcPrewarmState::Cancelled)
195 : }
196 : }
197 0 : .context("loading LFC state into postgres")
198 0 : .map(|_| ())?;
199 :
200 0 : Ok(LfcPrewarmState::Completed)
201 0 : }
202 :
203 : /// If offload request is ongoing, return false, true otherwise
204 0 : pub fn offload_lfc(self: &Arc<Self>) -> bool {
205 : {
206 0 : let state = &mut self.state.lock().unwrap().lfc_offload_state;
207 0 : if replace(state, LfcOffloadState::Offloading) == LfcOffloadState::Offloading {
208 0 : return false;
209 0 : }
210 : }
211 0 : let cloned = self.clone();
212 0 : spawn(async move { cloned.offload_lfc_with_state_update().await });
213 0 : true
214 0 : }
215 :
216 0 : pub async fn offload_lfc_async(self: &Arc<Self>) {
217 : {
218 0 : let state = &mut self.state.lock().unwrap().lfc_offload_state;
219 0 : if replace(state, LfcOffloadState::Offloading) == LfcOffloadState::Offloading {
220 0 : return;
221 0 : }
222 : }
223 0 : self.offload_lfc_with_state_update().await
224 0 : }
225 :
226 0 : async fn offload_lfc_with_state_update(&self) {
227 0 : crate::metrics::LFC_OFFLOADS.inc();
228 0 : let state = match self.offload_lfc_impl().await {
229 0 : Ok(state) => state,
230 0 : Err(err) => {
231 0 : crate::metrics::LFC_OFFLOAD_ERRORS.inc();
232 0 : error!(%err, "could not offload LFC");
233 0 : let error = format!("{err:#}");
234 0 : LfcOffloadState::Failed { error }
235 : }
236 : };
237 :
238 0 : self.state.lock().unwrap().lfc_offload_state = state;
239 0 : }
240 :
241 0 : async fn offload_lfc_impl(&self) -> Result<LfcOffloadState> {
242 0 : let EndpointStoragePair { url, token } = self.endpoint_storage_pair(None)?;
243 0 : info!(%url, "requesting LFC state from Postgres");
244 :
245 0 : let row = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
246 0 : .await
247 0 : .context("connecting to postgres")?
248 0 : .query_one("select neon.get_local_cache_state()", &[])
249 0 : .await
250 0 : .context("querying LFC state")?;
251 0 : let state = row
252 0 : .try_get::<usize, Option<&[u8]>>(0)
253 0 : .context("deserializing LFC state")?;
254 0 : let Some(state) = state else {
255 0 : info!(%url, "empty LFC state, not exporting");
256 0 : return Ok(LfcOffloadState::Skipped);
257 : };
258 :
259 0 : let mut compressed = Vec::new();
260 0 : ZstdEncoder::new(state)
261 0 : .read_to_end(&mut compressed)
262 0 : .await
263 0 : .context("compressing LFC state")?;
264 :
265 0 : let compressed_len = compressed.len();
266 0 : info!(%url, "downloaded LFC state, compressed size {compressed_len}, writing to endpoint storage");
267 :
268 0 : let request = Client::new().put(url).bearer_auth(token).body(compressed);
269 0 : match request.send().await {
270 0 : Ok(res) if res.status() == StatusCode::OK => Ok(LfcOffloadState::Completed),
271 0 : Ok(res) => bail!(
272 0 : "Request to endpoint storage failed with status: {}",
273 0 : res.status()
274 : ),
275 0 : Err(err) => Err(err).context("writing to endpoint storage"),
276 : }
277 0 : }
278 :
279 0 : pub fn cancel_prewarm(self: &Arc<Self>) {
280 0 : self.state.lock().unwrap().lfc_prewarm_token.cancel();
281 0 : }
282 : }
|