Line data Source code
1 : use crate::compute::ComputeNode;
2 : use anyhow::{Context, Result, bail};
3 : use async_compression::tokio::bufread::{ZstdDecoder, ZstdEncoder};
4 : use compute_api::responses::LfcOffloadState;
5 : use compute_api::responses::LfcPrewarmState;
6 : use http::StatusCode;
7 : use reqwest::Client;
8 : use std::mem::replace;
9 : use std::sync::Arc;
10 : use tokio::{io::AsyncReadExt, spawn};
11 : use tracing::{error, info};
12 :
13 : #[derive(serde::Serialize, Default)]
14 : pub struct LfcPrewarmStateWithProgress {
15 : #[serde(flatten)]
16 : base: LfcPrewarmState,
17 : total: i32,
18 : prewarmed: i32,
19 : skipped: i32,
20 : }
21 :
/// URL plus bearer token used to query endpoint storage for LFC prewarm-related tasks.
struct EndpointStoragePair {
    /// Full URL of the LFC state object in endpoint storage.
    url: String,
    /// Token passed as bearer auth on requests to `url`.
    token: String,
}
27 :
28 : const KEY: &str = "lfc_state";
29 : impl EndpointStoragePair {
30 : /// endpoint_id is set to None while prewarming from other endpoint, see replica promotion
31 : /// If not None, takes precedence over pspec.spec.endpoint_id
32 0 : fn from_spec_and_endpoint(
33 0 : pspec: &crate::compute::ParsedSpec,
34 0 : endpoint_id: Option<String>,
35 0 : ) -> Result<Self> {
36 0 : let endpoint_id = endpoint_id.as_ref().or(pspec.spec.endpoint_id.as_ref());
37 0 : let Some(ref endpoint_id) = endpoint_id else {
38 0 : bail!("pspec.endpoint_id missing, other endpoint_id not provided")
39 : };
40 0 : let Some(ref base_uri) = pspec.endpoint_storage_addr else {
41 0 : bail!("pspec.endpoint_storage_addr missing")
42 : };
43 0 : let tenant_id = pspec.tenant_id;
44 0 : let timeline_id = pspec.timeline_id;
45 :
46 0 : let url = format!("http://{base_uri}/{tenant_id}/{timeline_id}/{endpoint_id}/{KEY}");
47 0 : let Some(ref token) = pspec.endpoint_storage_token else {
48 0 : bail!("pspec.endpoint_storage_token missing")
49 : };
50 0 : let token = token.clone();
51 0 : Ok(EndpointStoragePair { url, token })
52 0 : }
53 : }
54 :
55 : impl ComputeNode {
56 : // If prewarm failed, we want to get overall number of segments as well as done ones.
57 : // However, this function should be reliable even if querying postgres failed.
58 0 : pub async fn lfc_prewarm_state(&self) -> LfcPrewarmStateWithProgress {
59 0 : info!("requesting LFC prewarm state from postgres");
60 0 : let mut state = LfcPrewarmStateWithProgress::default();
61 0 : {
62 0 : state.base = self.state.lock().unwrap().lfc_prewarm_state.clone();
63 0 : }
64 :
65 0 : let client = match ComputeNode::get_maintenance_client(&self.tokio_conn_conf).await {
66 0 : Ok(client) => client,
67 0 : Err(err) => {
68 0 : error!(%err, "connecting to postgres");
69 0 : return state;
70 : }
71 : };
72 0 : let row = match client
73 0 : .query_one("select * from neon.get_prewarm_info()", &[])
74 0 : .await
75 : {
76 0 : Ok(row) => row,
77 0 : Err(err) => {
78 0 : error!(%err, "querying LFC prewarm status");
79 0 : return state;
80 : }
81 : };
82 0 : state.total = row.try_get(0).unwrap_or_default();
83 0 : state.prewarmed = row.try_get(1).unwrap_or_default();
84 0 : state.skipped = row.try_get(2).unwrap_or_default();
85 0 : state
86 0 : }
87 :
88 0 : pub fn lfc_offload_state(&self) -> LfcOffloadState {
89 0 : self.state.lock().unwrap().lfc_offload_state.clone()
90 0 : }
91 :
92 : /// If there is a prewarm request ongoing, return false, true otherwise
93 0 : pub fn prewarm_lfc(self: &Arc<Self>, from_endpoint: Option<String>) -> bool {
94 : {
95 0 : let state = &mut self.state.lock().unwrap().lfc_prewarm_state;
96 0 : if let LfcPrewarmState::Prewarming = replace(state, LfcPrewarmState::Prewarming) {
97 0 : return false;
98 0 : }
99 : }
100 0 : crate::metrics::LFC_PREWARMS.inc();
101 :
102 0 : let cloned = self.clone();
103 0 : spawn(async move {
104 0 : let Err(err) = cloned.prewarm_impl(from_endpoint).await else {
105 0 : cloned.state.lock().unwrap().lfc_prewarm_state = LfcPrewarmState::Completed;
106 0 : return;
107 : };
108 0 : crate::metrics::LFC_PREWARM_ERRORS.inc();
109 0 : error!(%err, "prewarming lfc");
110 0 : cloned.state.lock().unwrap().lfc_prewarm_state = LfcPrewarmState::Failed {
111 0 : error: err.to_string(),
112 0 : };
113 0 : });
114 0 : true
115 0 : }
116 :
117 : /// from_endpoint: None for endpoint managed by this compute_ctl
118 0 : fn endpoint_storage_pair(&self, from_endpoint: Option<String>) -> Result<EndpointStoragePair> {
119 0 : let state = self.state.lock().unwrap();
120 0 : EndpointStoragePair::from_spec_and_endpoint(state.pspec.as_ref().unwrap(), from_endpoint)
121 0 : }
122 :
123 0 : async fn prewarm_impl(&self, from_endpoint: Option<String>) -> Result<()> {
124 0 : let EndpointStoragePair { url, token } = self.endpoint_storage_pair(from_endpoint)?;
125 0 : info!(%url, "requesting LFC state from endpoint storage");
126 :
127 0 : let request = Client::new().get(&url).bearer_auth(token);
128 0 : let res = request.send().await.context("querying endpoint storage")?;
129 0 : let status = res.status();
130 0 : if status != StatusCode::OK {
131 0 : bail!("{status} querying endpoint storage")
132 0 : }
133 :
134 0 : let mut uncompressed = Vec::new();
135 0 : let lfc_state = res
136 0 : .bytes()
137 0 : .await
138 0 : .context("getting request body from endpoint storage")?;
139 0 : ZstdDecoder::new(lfc_state.iter().as_slice())
140 0 : .read_to_end(&mut uncompressed)
141 0 : .await
142 0 : .context("decoding LFC state")?;
143 0 : let uncompressed_len = uncompressed.len();
144 0 : info!(%url, "downloaded LFC state, uncompressed size {uncompressed_len}, loading into postgres");
145 :
146 0 : ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
147 0 : .await
148 0 : .context("connecting to postgres")?
149 0 : .query_one("select neon.prewarm_local_cache($1)", &[&uncompressed])
150 0 : .await
151 0 : .context("loading LFC state into postgres")
152 0 : .map(|_| ())
153 0 : }
154 :
155 : /// If offload request is ongoing, return false, true otherwise
156 0 : pub fn offload_lfc(self: &Arc<Self>) -> bool {
157 : {
158 0 : let state = &mut self.state.lock().unwrap().lfc_offload_state;
159 0 : if replace(state, LfcOffloadState::Offloading) == LfcOffloadState::Offloading {
160 0 : return false;
161 0 : }
162 : }
163 0 : let cloned = self.clone();
164 0 : spawn(async move { cloned.offload_lfc_with_state_update().await });
165 0 : true
166 0 : }
167 :
168 0 : pub async fn offload_lfc_async(self: &Arc<Self>) {
169 : {
170 0 : let state = &mut self.state.lock().unwrap().lfc_offload_state;
171 0 : if replace(state, LfcOffloadState::Offloading) == LfcOffloadState::Offloading {
172 0 : return;
173 0 : }
174 : }
175 0 : self.offload_lfc_with_state_update().await
176 0 : }
177 :
178 0 : async fn offload_lfc_with_state_update(&self) {
179 0 : crate::metrics::LFC_OFFLOADS.inc();
180 0 : let Err(err) = self.offload_lfc_impl().await else {
181 0 : self.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Completed;
182 0 : return;
183 : };
184 0 : crate::metrics::LFC_OFFLOAD_ERRORS.inc();
185 0 : error!(%err, "offloading lfc");
186 0 : self.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Failed {
187 0 : error: err.to_string(),
188 0 : };
189 0 : }
190 :
191 0 : async fn offload_lfc_impl(&self) -> Result<()> {
192 0 : let EndpointStoragePair { url, token } = self.endpoint_storage_pair(None)?;
193 0 : info!(%url, "requesting LFC state from postgres");
194 :
195 0 : let mut compressed = Vec::new();
196 0 : ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
197 0 : .await
198 0 : .context("connecting to postgres")?
199 0 : .query_one("select neon.get_local_cache_state()", &[])
200 0 : .await
201 0 : .context("querying LFC state")?
202 0 : .try_get::<usize, &[u8]>(0)
203 0 : .context("deserializing LFC state")
204 0 : .map(ZstdEncoder::new)?
205 0 : .read_to_end(&mut compressed)
206 0 : .await
207 0 : .context("compressing LFC state")?;
208 0 : let compressed_len = compressed.len();
209 0 : info!(%url, "downloaded LFC state, compressed size {compressed_len}, writing to endpoint storage");
210 :
211 0 : let request = Client::new().put(url).bearer_auth(token).body(compressed);
212 0 : match request.send().await {
213 0 : Ok(res) if res.status() == StatusCode::OK => Ok(()),
214 0 : Ok(res) => bail!("Error writing to endpoint storage: {}", res.status()),
215 0 : Err(err) => Err(err).context("writing to endpoint storage"),
216 : }
217 0 : }
218 : }
|