Line data Source code
1 : use std::str::FromStr;
2 : use std::sync::Arc;
3 : use std::thread;
4 : use std::time::{Duration, SystemTime};
5 :
6 : use anyhow::{Result, bail};
7 : use compute_api::spec::{ComputeMode, PageserverConnectionInfo, PageserverProtocol};
8 : use pageserver_page_api as page_api;
9 : use postgres::{NoTls, SimpleQueryMessage};
10 : use tracing::{info, warn};
11 : use utils::id::{TenantId, TimelineId};
12 : use utils::lsn::Lsn;
13 : use utils::shard::TenantShardId;
14 :
15 : use crate::compute::ComputeNode;
16 :
17 : /// Spawns a background thread to periodically renew LSN leases for static compute.
18 : /// Do nothing if the compute is not in static mode.
19 0 : pub fn launch_lsn_lease_bg_task_for_static(compute: &Arc<ComputeNode>) {
20 0 : let (tenant_id, timeline_id, lsn) = {
21 0 : let state = compute.state.lock().unwrap();
22 0 : let spec = state.pspec.as_ref().expect("Spec must be set");
23 0 : match spec.spec.mode {
24 0 : ComputeMode::Static(lsn) => (spec.tenant_id, spec.timeline_id, lsn),
25 0 : _ => return,
26 : }
27 : };
28 0 : let compute = compute.clone();
29 :
30 0 : let span = tracing::info_span!("lsn_lease_bg_task", %tenant_id, %timeline_id, %lsn);
31 0 : thread::spawn(move || {
32 0 : let _entered = span.entered();
33 0 : if let Err(e) = lsn_lease_bg_task(compute, tenant_id, timeline_id, lsn) {
34 : // TODO: might need stronger error feedback than logging an warning.
35 0 : warn!("Exited with error: {e}");
36 0 : }
37 0 : });
38 0 : }
39 :
40 : /// Renews lsn lease periodically so static compute are not affected by GC.
41 0 : fn lsn_lease_bg_task(
42 0 : compute: Arc<ComputeNode>,
43 0 : tenant_id: TenantId,
44 0 : timeline_id: TimelineId,
45 0 : lsn: Lsn,
46 0 : ) -> Result<()> {
47 : loop {
48 0 : let valid_until = acquire_lsn_lease_with_retry(&compute, tenant_id, timeline_id, lsn)?;
49 0 : let valid_duration = valid_until
50 0 : .duration_since(SystemTime::now())
51 0 : .unwrap_or(Duration::ZERO);
52 :
53 : // Sleep for 60 seconds less than the valid duration but no more than half of the valid duration.
54 0 : let sleep_duration = valid_duration
55 0 : .saturating_sub(Duration::from_secs(60))
56 0 : .max(valid_duration / 2);
57 :
58 0 : info!(
59 0 : "Request succeeded, sleeping for {} seconds",
60 0 : sleep_duration.as_secs()
61 : );
62 0 : compute.wait_timeout_while_pageserver_connstr_unchanged(sleep_duration);
63 : }
64 0 : }
65 :
66 : /// Acquires lsn lease in a retry loop. Returns the expiration time if a lease is granted.
67 : /// Returns an error if a lease is explicitly not granted. Otherwise, we keep sending requests.
68 0 : fn acquire_lsn_lease_with_retry(
69 0 : compute: &Arc<ComputeNode>,
70 0 : tenant_id: TenantId,
71 0 : timeline_id: TimelineId,
72 0 : lsn: Lsn,
73 0 : ) -> Result<SystemTime> {
74 0 : let mut attempts = 0usize;
75 0 : let mut retry_period_ms: f64 = 500.0;
76 : const MAX_RETRY_PERIOD_MS: f64 = 60.0 * 1000.0;
77 :
78 : loop {
79 : // Note: List of pageservers is dynamic, need to re-read configs before each attempt.
80 0 : let (conninfo, auth) = {
81 0 : let state = compute.state.lock().unwrap();
82 0 : let spec = state.pspec.as_ref().expect("spec must be set");
83 0 : (
84 0 : spec.pageserver_conninfo.clone(),
85 0 : spec.storage_auth_token.clone(),
86 0 : )
87 0 : };
88 :
89 0 : let result = try_acquire_lsn_lease(conninfo, auth.as_deref(), tenant_id, timeline_id, lsn);
90 0 : match result {
91 0 : Ok(Some(res)) => {
92 0 : return Ok(res);
93 : }
94 : Ok(None) => {
95 0 : bail!("Permanent error: lease could not be obtained, LSN is behind the GC cutoff");
96 : }
97 0 : Err(e) => {
98 0 : warn!("Failed to acquire lsn lease: {e} (attempt {attempts})");
99 :
100 0 : compute.wait_timeout_while_pageserver_connstr_unchanged(Duration::from_millis(
101 0 : retry_period_ms as u64,
102 0 : ));
103 0 : retry_period_ms *= 1.5;
104 0 : retry_period_ms = retry_period_ms.min(MAX_RETRY_PERIOD_MS);
105 : }
106 : }
107 0 : attempts += 1;
108 : }
109 0 : }
110 :
111 : /// Tries to acquire LSN leases on all Pageserver shards.
112 0 : fn try_acquire_lsn_lease(
113 0 : conninfo: PageserverConnectionInfo,
114 0 : auth: Option<&str>,
115 0 : tenant_id: TenantId,
116 0 : timeline_id: TimelineId,
117 0 : lsn: Lsn,
118 0 : ) -> Result<Option<SystemTime>> {
119 0 : let mut leases = Vec::new();
120 :
121 0 : for (shard_index, shard) in conninfo.shards.into_iter() {
122 0 : let tenant_shard_id = TenantShardId {
123 0 : tenant_id,
124 0 : shard_number: shard_index.shard_number,
125 0 : shard_count: shard_index.shard_count,
126 0 : };
127 :
128 : // XXX: If there are more than pageserver for the one shard, do we need to get a
129 : // leas on all of them? Currently, that's what we assume, but this is hypothetical
130 : // as of this writing, as we never pass the info for more than one pageserver per
131 : // shard.
132 0 : for pageserver in shard.pageservers {
133 0 : let lease = match conninfo.prefer_protocol {
134 0 : PageserverProtocol::Grpc => acquire_lsn_lease_grpc(
135 0 : &pageserver.grpc_url.unwrap(),
136 0 : auth,
137 0 : tenant_shard_id,
138 0 : timeline_id,
139 0 : lsn,
140 0 : )?,
141 0 : PageserverProtocol::Libpq => acquire_lsn_lease_libpq(
142 0 : &pageserver.libpq_url.unwrap(),
143 0 : auth,
144 0 : tenant_shard_id,
145 0 : timeline_id,
146 0 : lsn,
147 0 : )?,
148 : };
149 0 : leases.push(lease);
150 : }
151 : }
152 :
153 0 : Ok(leases.into_iter().min().flatten())
154 0 : }
155 :
156 : /// Acquires an LSN lease on a single shard, using the libpq API. The connstring must use a
157 : /// postgresql:// scheme.
158 0 : fn acquire_lsn_lease_libpq(
159 0 : connstring: &str,
160 0 : auth: Option<&str>,
161 0 : tenant_shard_id: TenantShardId,
162 0 : timeline_id: TimelineId,
163 0 : lsn: Lsn,
164 0 : ) -> Result<Option<SystemTime>> {
165 0 : let mut config = postgres::Config::from_str(connstring)?;
166 0 : if let Some(auth) = auth {
167 0 : config.password(auth);
168 0 : }
169 0 : let mut client = config.connect(NoTls)?;
170 0 : let cmd = format!("lease lsn {tenant_shard_id} {timeline_id} {lsn} ");
171 0 : let res = client.simple_query(&cmd)?;
172 0 : let msg = match res.first() {
173 0 : Some(msg) => msg,
174 0 : None => bail!("empty response"),
175 : };
176 0 : let row = match msg {
177 0 : SimpleQueryMessage::Row(row) => row,
178 0 : _ => bail!("error parsing lsn lease response"),
179 : };
180 :
181 : // Note: this will be None if a lease is explicitly not granted.
182 0 : let valid_until_str = row.get("valid_until");
183 :
184 0 : let valid_until = valid_until_str.map(|s| {
185 0 : SystemTime::UNIX_EPOCH
186 0 : .checked_add(Duration::from_millis(u128::from_str(s).unwrap() as u64))
187 0 : .expect("Time larger than max SystemTime could handle")
188 0 : });
189 0 : Ok(valid_until)
190 0 : }
191 :
192 : /// Acquires an LSN lease on a single shard, using the gRPC API. The connstring must use a
193 : /// grpc:// scheme.
194 0 : fn acquire_lsn_lease_grpc(
195 0 : connstring: &str,
196 0 : auth: Option<&str>,
197 0 : tenant_shard_id: TenantShardId,
198 0 : timeline_id: TimelineId,
199 0 : lsn: Lsn,
200 0 : ) -> Result<Option<SystemTime>> {
201 0 : tokio::runtime::Handle::current().block_on(async move {
202 0 : let mut client = page_api::Client::connect(
203 0 : connstring.to_string(),
204 0 : tenant_shard_id.tenant_id,
205 0 : timeline_id,
206 0 : tenant_shard_id.to_index(),
207 0 : auth.map(String::from),
208 0 : None,
209 0 : )
210 0 : .await?;
211 :
212 0 : let req = page_api::LeaseLsnRequest { lsn };
213 0 : match client.lease_lsn(req).await {
214 0 : Ok(expires) => Ok(Some(expires)),
215 : // Lease couldn't be acquired because the LSN has been garbage collected.
216 0 : Err(err) if err.code() == tonic::Code::FailedPrecondition => Ok(None),
217 0 : Err(err) => Err(err.into()),
218 : }
219 0 : })
220 0 : }
|