Line data Source code
1 : use crate::compute::ComputeNode;
2 : use anyhow::{Context, Result, bail};
3 : use async_compression::tokio::bufread::{ZstdDecoder, ZstdEncoder};
4 : use compute_api::responses::LfcOffloadState;
5 : use compute_api::responses::LfcPrewarmState;
6 : use http::StatusCode;
7 : use reqwest::Client;
8 : use std::sync::Arc;
9 : use tokio::{io::AsyncReadExt, spawn};
10 : use tracing::{error, info};
11 :
12 : #[derive(serde::Serialize, Default)]
13 : pub struct LfcPrewarmStateWithProgress {
14 : #[serde(flatten)]
15 : base: LfcPrewarmState,
16 : total: i32,
17 : prewarmed: i32,
18 : skipped: i32,
19 : }
20 :
/// URL and bearer token used to reach the LFC state object in endpoint storage
/// for LFC prewarm/offload requests.
struct EndpointStoragePair {
    /// Full URL of the LFC state object.
    url: String,
    /// Token passed to the request via bearer authentication.
    token: String,
}
26 :
27 : const KEY: &str = "lfc_state";
28 : impl EndpointStoragePair {
29 : /// endpoint_id is set to None while prewarming from other endpoint, see replica promotion
30 : /// If not None, takes precedence over pspec.spec.endpoint_id
31 0 : fn from_spec_and_endpoint(
32 0 : pspec: &crate::compute::ParsedSpec,
33 0 : endpoint_id: Option<String>,
34 0 : ) -> Result<Self> {
35 0 : let endpoint_id = endpoint_id.as_ref().or(pspec.spec.endpoint_id.as_ref());
36 0 : let Some(ref endpoint_id) = endpoint_id else {
37 0 : bail!("pspec.endpoint_id missing, other endpoint_id not provided")
38 : };
39 0 : let Some(ref base_uri) = pspec.endpoint_storage_addr else {
40 0 : bail!("pspec.endpoint_storage_addr missing")
41 : };
42 0 : let tenant_id = pspec.tenant_id;
43 0 : let timeline_id = pspec.timeline_id;
44 0 :
45 0 : let url = format!("http://{base_uri}/{tenant_id}/{timeline_id}/{endpoint_id}/{KEY}");
46 0 : let Some(ref token) = pspec.endpoint_storage_token else {
47 0 : bail!("pspec.endpoint_storage_token missing")
48 : };
49 0 : let token = token.clone();
50 0 : Ok(EndpointStoragePair { url, token })
51 0 : }
52 : }
53 :
54 : impl ComputeNode {
55 : // If prewarm failed, we want to get overall number of segments as well as done ones.
56 : // However, this function should be reliable even if querying postgres failed.
57 0 : pub async fn lfc_prewarm_state(&self) -> LfcPrewarmStateWithProgress {
58 0 : info!("requesting LFC prewarm state from postgres");
59 0 : let mut state = LfcPrewarmStateWithProgress::default();
60 0 : {
61 0 : state.base = self.state.lock().unwrap().lfc_prewarm_state.clone();
62 0 : }
63 :
64 0 : let client = match ComputeNode::get_maintenance_client(&self.tokio_conn_conf).await {
65 0 : Ok(client) => client,
66 0 : Err(err) => {
67 0 : error!(%err, "connecting to postgres");
68 0 : return state;
69 : }
70 : };
71 0 : let row = match client
72 0 : .query_one("select * from get_prewarm_info()", &[])
73 0 : .await
74 : {
75 0 : Ok(row) => row,
76 0 : Err(err) => {
77 0 : error!(%err, "querying LFC prewarm status");
78 0 : return state;
79 : }
80 : };
81 0 : state.total = row.try_get(0).unwrap_or_default();
82 0 : state.prewarmed = row.try_get(1).unwrap_or_default();
83 0 : state.skipped = row.try_get(2).unwrap_or_default();
84 0 : state
85 0 : }
86 :
87 0 : pub fn lfc_offload_state(&self) -> LfcOffloadState {
88 0 : self.state.lock().unwrap().lfc_offload_state.clone()
89 0 : }
90 :
91 : /// Returns false if there is a prewarm request ongoing, true otherwise
92 0 : pub fn prewarm_lfc(self: &Arc<Self>, from_endpoint: Option<String>) -> bool {
93 0 : crate::metrics::LFC_PREWARM_REQUESTS.inc();
94 0 : {
95 0 : let state = &mut self.state.lock().unwrap().lfc_prewarm_state;
96 0 : if let LfcPrewarmState::Prewarming =
97 0 : std::mem::replace(state, LfcPrewarmState::Prewarming)
98 : {
99 0 : return false;
100 0 : }
101 0 : }
102 0 :
103 0 : let cloned = self.clone();
104 0 : spawn(async move {
105 0 : let Err(err) = cloned.prewarm_impl(from_endpoint).await else {
106 0 : cloned.state.lock().unwrap().lfc_prewarm_state = LfcPrewarmState::Completed;
107 0 : return;
108 : };
109 0 : error!(%err);
110 0 : cloned.state.lock().unwrap().lfc_prewarm_state = LfcPrewarmState::Failed {
111 0 : error: err.to_string(),
112 0 : };
113 0 : });
114 0 : true
115 0 : }
116 :
117 : /// from_endpoint: None for endpoint managed by this compute_ctl
118 0 : fn endpoint_storage_pair(&self, from_endpoint: Option<String>) -> Result<EndpointStoragePair> {
119 0 : let state = self.state.lock().unwrap();
120 0 : EndpointStoragePair::from_spec_and_endpoint(state.pspec.as_ref().unwrap(), from_endpoint)
121 0 : }
122 :
123 0 : async fn prewarm_impl(&self, from_endpoint: Option<String>) -> Result<()> {
124 0 : let EndpointStoragePair { url, token } = self.endpoint_storage_pair(from_endpoint)?;
125 0 : info!(%url, "requesting LFC state from endpoint storage");
126 :
127 0 : let request = Client::new().get(&url).bearer_auth(token);
128 0 : let res = request.send().await.context("querying endpoint storage")?;
129 0 : let status = res.status();
130 0 : if status != StatusCode::OK {
131 0 : bail!("{status} querying endpoint storage")
132 0 : }
133 0 :
134 0 : let mut uncompressed = Vec::new();
135 0 : let lfc_state = res
136 0 : .bytes()
137 0 : .await
138 0 : .context("getting request body from endpoint storage")?;
139 0 : ZstdDecoder::new(lfc_state.iter().as_slice())
140 0 : .read_to_end(&mut uncompressed)
141 0 : .await
142 0 : .context("decoding LFC state")?;
143 0 : let uncompressed_len = uncompressed.len();
144 0 : info!(%url, "downloaded LFC state, uncompressed size {uncompressed_len}, loading into postgres");
145 :
146 0 : ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
147 0 : .await
148 0 : .context("connecting to postgres")?
149 0 : .query_one("select prewarm_local_cache($1)", &[&uncompressed])
150 0 : .await
151 0 : .context("loading LFC state into postgres")
152 0 : .map(|_| ())
153 0 : }
154 :
155 : /// Returns false if there is an offload request ongoing, true otherwise
156 0 : pub fn offload_lfc(self: &Arc<Self>) -> bool {
157 0 : crate::metrics::LFC_OFFLOAD_REQUESTS.inc();
158 0 : {
159 0 : let state = &mut self.state.lock().unwrap().lfc_offload_state;
160 0 : if let LfcOffloadState::Offloading =
161 0 : std::mem::replace(state, LfcOffloadState::Offloading)
162 : {
163 0 : return false;
164 0 : }
165 0 : }
166 0 :
167 0 : let cloned = self.clone();
168 0 : spawn(async move {
169 0 : let Err(err) = cloned.offload_lfc_impl().await else {
170 0 : cloned.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Completed;
171 0 : return;
172 : };
173 0 : error!(%err);
174 0 : cloned.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Failed {
175 0 : error: err.to_string(),
176 0 : };
177 0 : });
178 0 : true
179 0 : }
180 :
181 0 : async fn offload_lfc_impl(&self) -> Result<()> {
182 0 : let EndpointStoragePair { url, token } = self.endpoint_storage_pair(None)?;
183 0 : info!(%url, "requesting LFC state from postgres");
184 :
185 0 : let mut compressed = Vec::new();
186 0 : ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
187 0 : .await
188 0 : .context("connecting to postgres")?
189 0 : .query_one("select get_local_cache_state()", &[])
190 0 : .await
191 0 : .context("querying LFC state")?
192 0 : .try_get::<usize, &[u8]>(0)
193 0 : .context("deserializing LFC state")
194 0 : .map(ZstdEncoder::new)?
195 0 : .read_to_end(&mut compressed)
196 0 : .await
197 0 : .context("compressing LFC state")?;
198 0 : let compressed_len = compressed.len();
199 0 : info!(%url, "downloaded LFC state, compressed size {compressed_len}, writing to endpoint storage");
200 :
201 0 : let request = Client::new().put(url).bearer_auth(token).body(compressed);
202 0 : match request.send().await {
203 0 : Ok(res) if res.status() == StatusCode::OK => Ok(()),
204 0 : Ok(res) => bail!("Error writing to endpoint storage: {}", res.status()),
205 0 : Err(err) => Err(err).context("writing to endpoint storage"),
206 : }
207 0 : }
208 : }
|