Line data Source code
1 : use std::str::FromStr;
2 : use std::sync::Arc;
3 : use std::time::{Duration, SystemTime};
4 :
5 : use anyhow::{Result, bail};
6 : use compute_api::spec::{ComputeMode, PageserverConnectionInfo, PageserverProtocol};
7 : use futures::StreamExt;
8 : use pageserver_page_api as page_api;
9 : use postgres::{NoTls, SimpleQueryMessage};
10 : use tracing::{Instrument, info, warn};
11 : use utils::id::{TenantId, TimelineId};
12 : use utils::lsn::Lsn;
13 : use utils::shard::TenantShardId;
14 :
15 : use crate::compute::ComputeNode;
16 :
17 : /// Spawns a background thread to periodically renew LSN leases for static compute.
18 : /// Do nothing if the compute is not in static mode. MUST run this within a tokio runtime.
19 0 : pub fn launch_lsn_lease_bg_task_for_static(compute: &Arc<ComputeNode>) {
20 0 : let (tenant_id, timeline_id, lsn) = {
21 0 : let state = compute.state.lock().unwrap();
22 0 : let spec = state.pspec.as_ref().expect("Spec must be set");
23 0 : match spec.spec.mode {
24 0 : ComputeMode::Static(lsn) => (spec.tenant_id, spec.timeline_id, lsn),
25 0 : _ => return,
26 : }
27 : };
28 0 : let compute = compute.clone();
29 :
30 0 : let span = tracing::info_span!("lsn_lease_bg_task", %tenant_id, %timeline_id, %lsn);
31 0 : tokio::spawn(
32 0 : async move {
33 0 : if let Err(e) = lsn_lease_bg_task(compute, tenant_id, timeline_id, lsn).await {
34 : // TODO: might need stronger error feedback than logging an warning.
35 0 : warn!("Exited with error: {e}");
36 0 : }
37 0 : }
38 0 : .instrument(span),
39 : );
40 0 : }
41 :
42 : /// Renews lsn lease periodically so static compute are not affected by GC.
43 0 : async fn lsn_lease_bg_task(
44 0 : compute: Arc<ComputeNode>,
45 0 : tenant_id: TenantId,
46 0 : timeline_id: TimelineId,
47 0 : lsn: Lsn,
48 0 : ) -> Result<()> {
49 : loop {
50 0 : let valid_until =
51 0 : acquire_lsn_lease_with_retry(&compute, tenant_id, timeline_id, lsn).await?;
52 0 : let valid_duration = valid_until
53 0 : .duration_since(SystemTime::now())
54 0 : .unwrap_or(Duration::ZERO);
55 :
56 : // Sleep for 60 seconds less than the valid duration but no more than half of the valid duration.
57 0 : let sleep_duration = valid_duration
58 0 : .saturating_sub(Duration::from_secs(60))
59 0 : .max(valid_duration / 2);
60 :
61 0 : info!(
62 0 : "Request succeeded, sleeping for {} seconds",
63 0 : sleep_duration.as_secs()
64 : );
65 0 : let compute = compute.clone();
66 0 : tokio::task::spawn_blocking(move || {
67 0 : compute.wait_timeout_while_pageserver_connstr_unchanged(sleep_duration);
68 0 : })
69 0 : .await?;
70 : }
71 0 : }
72 :
73 : /// Acquires lsn lease in a retry loop. Returns the expiration time if a lease is granted.
74 : /// Returns an error if a lease is explicitly not granted. Otherwise, we keep sending requests.
75 0 : async fn acquire_lsn_lease_with_retry(
76 0 : compute: &Arc<ComputeNode>,
77 0 : tenant_id: TenantId,
78 0 : timeline_id: TimelineId,
79 0 : lsn: Lsn,
80 0 : ) -> Result<SystemTime> {
81 0 : let mut attempts = 0usize;
82 0 : let mut retry_period_ms: f64 = 500.0;
83 : const MAX_RETRY_PERIOD_MS: f64 = 60.0 * 1000.0;
84 :
85 : loop {
86 : // Note: List of pageservers is dynamic, need to re-read configs before each attempt.
87 0 : let (conninfo, auth) = {
88 0 : let state = compute.state.lock().unwrap();
89 0 : let spec = state.pspec.as_ref().expect("spec must be set");
90 0 : (
91 0 : spec.pageserver_conninfo.clone(),
92 0 : spec.storage_auth_token.clone(),
93 0 : )
94 0 : };
95 :
96 0 : let result =
97 0 : try_acquire_lsn_lease(conninfo, auth.as_deref(), tenant_id, timeline_id, lsn).await;
98 0 : match result {
99 0 : Ok(Some(res)) => {
100 0 : return Ok(res);
101 : }
102 : Ok(None) => {
103 0 : bail!("Permanent error: lease could not be obtained, LSN is behind the GC cutoff");
104 : }
105 0 : Err(e) => {
106 0 : warn!("Failed to acquire lsn lease: {e} (attempt {attempts})");
107 :
108 0 : compute.wait_timeout_while_pageserver_connstr_unchanged(Duration::from_millis(
109 0 : retry_period_ms as u64,
110 0 : ));
111 0 : retry_period_ms *= 1.5;
112 0 : retry_period_ms = retry_period_ms.min(MAX_RETRY_PERIOD_MS);
113 : }
114 : }
115 0 : attempts += 1;
116 : }
117 0 : }
118 :
119 : /// Tries to acquire LSN leases on all Pageserver shards.
120 0 : async fn try_acquire_lsn_lease(
121 0 : conninfo: PageserverConnectionInfo,
122 0 : auth: Option<&str>,
123 0 : tenant_id: TenantId,
124 0 : timeline_id: TimelineId,
125 0 : lsn: Lsn,
126 0 : ) -> Result<Option<SystemTime>> {
127 : const MAX_CONCURRENT_LEASE_CONNECTIONS: usize = 8;
128 :
129 0 : let mut jobs = Vec::new();
130 0 : for (shard_index, shard) in conninfo.shards.into_iter() {
131 0 : let tenant_shard_id = TenantShardId {
132 0 : tenant_id,
133 0 : shard_number: shard_index.shard_number,
134 0 : shard_count: shard_index.shard_count,
135 0 : };
136 :
137 : // XXX: If there are more than pageserver for the one shard, do we need to get a
138 : // leas on all of them? Currently, that's what we assume, but this is hypothetical
139 : // as of this writing, as we never pass the info for more than one pageserver per
140 : // shard.
141 :
142 0 : for shard in shard.pageservers {
143 0 : let shard = shard.clone();
144 0 : jobs.push(async move {
145 0 : match conninfo.prefer_protocol {
146 : PageserverProtocol::Grpc => {
147 0 : acquire_lsn_lease_grpc(
148 0 : &shard.grpc_url.unwrap(),
149 0 : auth,
150 0 : tenant_shard_id,
151 0 : timeline_id,
152 0 : lsn,
153 0 : )
154 0 : .await
155 : }
156 : PageserverProtocol::Libpq => {
157 0 : acquire_lsn_lease_libpq(
158 0 : &shard.libpq_url.unwrap(),
159 0 : auth,
160 0 : tenant_shard_id,
161 0 : timeline_id,
162 0 : lsn,
163 0 : )
164 0 : .await
165 : }
166 : }
167 0 : });
168 : }
169 : }
170 :
171 0 : let mut stream = futures::stream::iter(jobs).buffer_unordered(MAX_CONCURRENT_LEASE_CONNECTIONS);
172 0 : let mut leases = Vec::new();
173 0 : while let Some(res) = stream.next().await {
174 0 : let lease = res?;
175 0 : leases.push(lease);
176 : }
177 0 : Ok(leases.into_iter().flatten().min())
178 0 : }
179 :
180 : /// Acquires an LSN lease on a single shard, using the libpq API. The connstring must use a
181 : /// postgresql:// scheme.
182 0 : async fn acquire_lsn_lease_libpq(
183 0 : connstring: &str,
184 0 : auth: Option<&str>,
185 0 : tenant_shard_id: TenantShardId,
186 0 : timeline_id: TimelineId,
187 0 : lsn: Lsn,
188 0 : ) -> Result<Option<SystemTime>> {
189 0 : let mut config = tokio_postgres::Config::from_str(connstring)?;
190 0 : if let Some(auth) = auth {
191 0 : config.password(auth);
192 0 : }
193 0 : let (client, connection) = config.connect(NoTls).await?;
194 :
195 0 : tokio::spawn(async move {
196 0 : if let Err(e) = connection.await {
197 0 : tracing::warn!("lease lsn connection error: {}", e);
198 0 : }
199 0 : });
200 :
201 0 : let cmd = format!("lease lsn {tenant_shard_id} {timeline_id} {lsn} ");
202 0 : let res = client.simple_query(&cmd).await?;
203 0 : let msg = match res.first() {
204 0 : Some(msg) => msg,
205 0 : None => bail!("empty response"),
206 : };
207 0 : let row = match msg {
208 0 : SimpleQueryMessage::Row(row) => row,
209 0 : _ => bail!("error parsing lsn lease response"),
210 : };
211 :
212 : // Note: this will be None if a lease is explicitly not granted.
213 0 : let valid_until_str = row.get("valid_until");
214 :
215 0 : let valid_until = valid_until_str.map(|s| {
216 0 : SystemTime::UNIX_EPOCH
217 0 : .checked_add(Duration::from_millis(u128::from_str(s).unwrap() as u64))
218 0 : .expect("Time larger than max SystemTime could handle")
219 0 : });
220 :
221 0 : Ok(valid_until)
222 0 : }
223 :
224 : /// Acquires an LSN lease on a single shard, using the gRPC API. The connstring must use a
225 : /// grpc:// scheme.
226 0 : async fn acquire_lsn_lease_grpc(
227 0 : connstring: &str,
228 0 : auth: Option<&str>,
229 0 : tenant_shard_id: TenantShardId,
230 0 : timeline_id: TimelineId,
231 0 : lsn: Lsn,
232 0 : ) -> Result<Option<SystemTime>> {
233 0 : let mut client = page_api::Client::connect(
234 0 : connstring.to_string(),
235 0 : tenant_shard_id.tenant_id,
236 0 : timeline_id,
237 0 : tenant_shard_id.to_index(),
238 0 : auth.map(String::from),
239 0 : None,
240 0 : )
241 0 : .await?;
242 :
243 0 : let req = page_api::LeaseLsnRequest { lsn };
244 0 : match client.lease_lsn(req).await {
245 0 : Ok(expires) => Ok(Some(expires)),
246 : // Lease couldn't be acquired because the LSN has been garbage collected.
247 0 : Err(err) if err.code() == tonic::Code::FailedPrecondition => Ok(None),
248 0 : Err(err) => Err(err.into()),
249 : }
250 0 : }
|