Line data Source code
1 : use crate::compute::ComputeNode;
2 : use anyhow::{Context, Result, bail};
3 : use async_compression::tokio::bufread::{ZstdDecoder, ZstdEncoder};
4 : use compute_api::responses::LfcOffloadState;
5 : use compute_api::responses::LfcPrewarmState;
6 : use http::StatusCode;
7 : use reqwest::Client;
8 : use std::mem::replace;
9 : use std::sync::Arc;
10 : use tokio::{io::AsyncReadExt, spawn};
11 : use tracing::{error, info};
12 :
13 : #[derive(serde::Serialize, Default)]
14 : pub struct LfcPrewarmStateWithProgress {
15 : #[serde(flatten)]
16 : base: LfcPrewarmState,
17 : total: i32,
18 : prewarmed: i32,
19 : skipped: i32,
20 : }
21 :
/// Where the LFC state object lives in endpoint storage, plus the credential
/// needed to read or write it (used by both prewarm and offload).
struct EndpointStoragePair {
    /// Full `http://` URL of the LFC state object.
    url: String,
    /// Bearer token attached to every request sent to `url`.
    token: String,
}
27 :
28 : const KEY: &str = "lfc_state";
29 : impl EndpointStoragePair {
30 : /// endpoint_id is set to None while prewarming from other endpoint, see replica promotion
31 : /// If not None, takes precedence over pspec.spec.endpoint_id
32 0 : fn from_spec_and_endpoint(
33 0 : pspec: &crate::compute::ParsedSpec,
34 0 : endpoint_id: Option<String>,
35 0 : ) -> Result<Self> {
36 0 : let endpoint_id = endpoint_id.as_ref().or(pspec.spec.endpoint_id.as_ref());
37 0 : let Some(ref endpoint_id) = endpoint_id else {
38 0 : bail!("pspec.endpoint_id missing, other endpoint_id not provided")
39 : };
40 0 : let Some(ref base_uri) = pspec.endpoint_storage_addr else {
41 0 : bail!("pspec.endpoint_storage_addr missing")
42 : };
43 0 : let tenant_id = pspec.tenant_id;
44 0 : let timeline_id = pspec.timeline_id;
45 :
46 0 : let url = format!("http://{base_uri}/{tenant_id}/{timeline_id}/{endpoint_id}/{KEY}");
47 0 : let Some(ref token) = pspec.endpoint_storage_token else {
48 0 : bail!("pspec.endpoint_storage_token missing")
49 : };
50 0 : let token = token.clone();
51 0 : Ok(EndpointStoragePair { url, token })
52 0 : }
53 : }
54 :
55 : impl ComputeNode {
56 : // If prewarm failed, we want to get overall number of segments as well as done ones.
57 : // However, this function should be reliable even if querying postgres failed.
58 0 : pub async fn lfc_prewarm_state(&self) -> LfcPrewarmStateWithProgress {
59 0 : info!("requesting LFC prewarm state from postgres");
60 0 : let mut state = LfcPrewarmStateWithProgress::default();
61 0 : {
62 0 : state.base = self.state.lock().unwrap().lfc_prewarm_state.clone();
63 0 : }
64 :
65 0 : let client = match ComputeNode::get_maintenance_client(&self.tokio_conn_conf).await {
66 0 : Ok(client) => client,
67 0 : Err(err) => {
68 0 : error!(%err, "connecting to postgres");
69 0 : return state;
70 : }
71 : };
72 0 : let row = match client
73 0 : .query_one("select * from neon.get_prewarm_info()", &[])
74 0 : .await
75 : {
76 0 : Ok(row) => row,
77 0 : Err(err) => {
78 0 : error!(%err, "querying LFC prewarm status");
79 0 : return state;
80 : }
81 : };
82 0 : state.total = row.try_get(0).unwrap_or_default();
83 0 : state.prewarmed = row.try_get(1).unwrap_or_default();
84 0 : state.skipped = row.try_get(2).unwrap_or_default();
85 0 : state
86 0 : }
87 :
88 0 : pub fn lfc_offload_state(&self) -> LfcOffloadState {
89 0 : self.state.lock().unwrap().lfc_offload_state.clone()
90 0 : }
91 :
92 : /// If there is a prewarm request ongoing, return `false`, `true` otherwise.
93 : /// Has a failpoint "compute-prewarm"
94 0 : pub fn prewarm_lfc(self: &Arc<Self>, from_endpoint: Option<String>) -> bool {
95 : {
96 0 : let state = &mut self.state.lock().unwrap().lfc_prewarm_state;
97 0 : if let LfcPrewarmState::Prewarming = replace(state, LfcPrewarmState::Prewarming) {
98 0 : return false;
99 0 : }
100 : }
101 0 : crate::metrics::LFC_PREWARMS.inc();
102 :
103 0 : let cloned = self.clone();
104 0 : spawn(async move {
105 0 : let state = match cloned.prewarm_impl(from_endpoint).await {
106 0 : Ok(true) => LfcPrewarmState::Completed,
107 : Ok(false) => {
108 0 : info!(
109 0 : "skipping LFC prewarm because LFC state is not found in endpoint storage"
110 : );
111 0 : LfcPrewarmState::Skipped
112 : }
113 0 : Err(err) => {
114 0 : crate::metrics::LFC_PREWARM_ERRORS.inc();
115 0 : error!(%err, "could not prewarm LFC");
116 0 : LfcPrewarmState::Failed {
117 0 : error: format!("{err:#}"),
118 0 : }
119 : }
120 : };
121 :
122 0 : cloned.state.lock().unwrap().lfc_prewarm_state = state;
123 0 : });
124 0 : true
125 0 : }
126 :
127 : /// from_endpoint: None for endpoint managed by this compute_ctl
128 0 : fn endpoint_storage_pair(&self, from_endpoint: Option<String>) -> Result<EndpointStoragePair> {
129 0 : let state = self.state.lock().unwrap();
130 0 : EndpointStoragePair::from_spec_and_endpoint(state.pspec.as_ref().unwrap(), from_endpoint)
131 0 : }
132 :
133 : /// Request LFC state from endpoint storage and load corresponding pages into Postgres.
134 : /// Returns a result with `false` if the LFC state is not found in endpoint storage.
135 0 : async fn prewarm_impl(&self, from_endpoint: Option<String>) -> Result<bool> {
136 0 : let EndpointStoragePair { url, token } = self.endpoint_storage_pair(from_endpoint)?;
137 :
138 : #[cfg(feature = "testing")]
139 0 : fail::fail_point!("compute-prewarm", |_| {
140 0 : bail!("prewarm configured to fail because of a failpoint")
141 0 : });
142 :
143 0 : info!(%url, "requesting LFC state from endpoint storage");
144 0 : let request = Client::new().get(&url).bearer_auth(token);
145 0 : let res = request.send().await.context("querying endpoint storage")?;
146 0 : match res.status() {
147 0 : StatusCode::OK => (),
148 : StatusCode::NOT_FOUND => {
149 0 : return Ok(false);
150 : }
151 0 : status => bail!("{status} querying endpoint storage"),
152 : }
153 :
154 0 : let mut uncompressed = Vec::new();
155 0 : let lfc_state = res
156 0 : .bytes()
157 0 : .await
158 0 : .context("getting request body from endpoint storage")?;
159 0 : ZstdDecoder::new(lfc_state.iter().as_slice())
160 0 : .read_to_end(&mut uncompressed)
161 0 : .await
162 0 : .context("decoding LFC state")?;
163 0 : let uncompressed_len = uncompressed.len();
164 :
165 0 : info!(%url, "downloaded LFC state, uncompressed size {uncompressed_len}, loading into Postgres");
166 :
167 0 : ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
168 0 : .await
169 0 : .context("connecting to postgres")?
170 0 : .query_one("select neon.prewarm_local_cache($1)", &[&uncompressed])
171 0 : .await
172 0 : .context("loading LFC state into postgres")
173 0 : .map(|_| ())?;
174 :
175 0 : Ok(true)
176 0 : }
177 :
178 : /// If offload request is ongoing, return false, true otherwise
179 0 : pub fn offload_lfc(self: &Arc<Self>) -> bool {
180 : {
181 0 : let state = &mut self.state.lock().unwrap().lfc_offload_state;
182 0 : if replace(state, LfcOffloadState::Offloading) == LfcOffloadState::Offloading {
183 0 : return false;
184 0 : }
185 : }
186 0 : let cloned = self.clone();
187 0 : spawn(async move { cloned.offload_lfc_with_state_update().await });
188 0 : true
189 0 : }
190 :
191 0 : pub async fn offload_lfc_async(self: &Arc<Self>) {
192 : {
193 0 : let state = &mut self.state.lock().unwrap().lfc_offload_state;
194 0 : if replace(state, LfcOffloadState::Offloading) == LfcOffloadState::Offloading {
195 0 : return;
196 0 : }
197 : }
198 0 : self.offload_lfc_with_state_update().await
199 0 : }
200 :
201 0 : async fn offload_lfc_with_state_update(&self) {
202 0 : crate::metrics::LFC_OFFLOADS.inc();
203 :
204 0 : let Err(err) = self.offload_lfc_impl().await else {
205 0 : self.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Completed;
206 0 : return;
207 : };
208 :
209 0 : crate::metrics::LFC_OFFLOAD_ERRORS.inc();
210 0 : error!(%err, "could not offload LFC state to endpoint storage");
211 0 : self.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Failed {
212 0 : error: format!("{err:#}"),
213 0 : };
214 0 : }
215 :
216 0 : async fn offload_lfc_impl(&self) -> Result<()> {
217 0 : let EndpointStoragePair { url, token } = self.endpoint_storage_pair(None)?;
218 0 : info!(%url, "requesting LFC state from Postgres");
219 :
220 0 : let row = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
221 0 : .await
222 0 : .context("connecting to postgres")?
223 0 : .query_one("select neon.get_local_cache_state()", &[])
224 0 : .await
225 0 : .context("querying LFC state")?;
226 0 : let state = row
227 0 : .try_get::<usize, Option<&[u8]>>(0)
228 0 : .context("deserializing LFC state")?;
229 0 : let Some(state) = state else {
230 0 : info!(%url, "empty LFC state, not exporting");
231 0 : return Ok(());
232 : };
233 :
234 0 : let mut compressed = Vec::new();
235 0 : ZstdEncoder::new(state)
236 0 : .read_to_end(&mut compressed)
237 0 : .await
238 0 : .context("compressing LFC state")?;
239 :
240 0 : let compressed_len = compressed.len();
241 0 : info!(%url, "downloaded LFC state, compressed size {compressed_len}, writing to endpoint storage");
242 :
243 0 : let request = Client::new().put(url).bearer_auth(token).body(compressed);
244 0 : match request.send().await {
245 0 : Ok(res) if res.status() == StatusCode::OK => Ok(()),
246 0 : Ok(res) => bail!(
247 0 : "Request to endpoint storage failed with status: {}",
248 0 : res.status()
249 : ),
250 0 : Err(err) => Err(err).context("writing to endpoint storage"),
251 : }
252 0 : }
253 : }
|