Line data Source code
1 : use std::str::FromStr;
2 : use std::sync::Arc;
3 : use std::thread;
4 : use std::time::{Duration, SystemTime};
5 :
6 : use anyhow::{Result, bail};
7 : use compute_api::spec::{ComputeMode, PageserverProtocol};
8 : use itertools::Itertools as _;
9 : use pageserver_page_api as page_api;
10 : use postgres::{NoTls, SimpleQueryMessage};
11 : use tracing::{info, warn};
12 : use utils::id::{TenantId, TimelineId};
13 : use utils::lsn::Lsn;
14 : use utils::shard::{ShardCount, ShardNumber, TenantShardId};
15 :
16 : use crate::compute::ComputeNode;
17 :
18 : /// Spawns a background thread to periodically renew LSN leases for static compute.
19 : /// Do nothing if the compute is not in static mode.
20 0 : pub fn launch_lsn_lease_bg_task_for_static(compute: &Arc<ComputeNode>) {
21 0 : let (tenant_id, timeline_id, lsn) = {
22 0 : let state = compute.state.lock().unwrap();
23 0 : let spec = state.pspec.as_ref().expect("Spec must be set");
24 0 : match spec.spec.mode {
25 0 : ComputeMode::Static(lsn) => (spec.tenant_id, spec.timeline_id, lsn),
26 0 : _ => return,
27 : }
28 : };
29 0 : let compute = compute.clone();
30 :
31 0 : let span = tracing::info_span!("lsn_lease_bg_task", %tenant_id, %timeline_id, %lsn);
32 0 : thread::spawn(move || {
33 0 : let _entered = span.entered();
34 0 : if let Err(e) = lsn_lease_bg_task(compute, tenant_id, timeline_id, lsn) {
35 : // TODO: might need stronger error feedback than logging an warning.
36 0 : warn!("Exited with error: {e}");
37 0 : }
38 0 : });
39 0 : }
40 :
41 : /// Renews lsn lease periodically so static compute are not affected by GC.
42 0 : fn lsn_lease_bg_task(
43 0 : compute: Arc<ComputeNode>,
44 0 : tenant_id: TenantId,
45 0 : timeline_id: TimelineId,
46 0 : lsn: Lsn,
47 0 : ) -> Result<()> {
48 : loop {
49 0 : let valid_until = acquire_lsn_lease_with_retry(&compute, tenant_id, timeline_id, lsn)?;
50 0 : let valid_duration = valid_until
51 0 : .duration_since(SystemTime::now())
52 0 : .unwrap_or(Duration::ZERO);
53 :
54 : // Sleep for 60 seconds less than the valid duration but no more than half of the valid duration.
55 0 : let sleep_duration = valid_duration
56 0 : .saturating_sub(Duration::from_secs(60))
57 0 : .max(valid_duration / 2);
58 :
59 0 : info!(
60 0 : "Request succeeded, sleeping for {} seconds",
61 0 : sleep_duration.as_secs()
62 : );
63 0 : compute.wait_timeout_while_pageserver_connstr_unchanged(sleep_duration);
64 : }
65 0 : }
66 :
67 : /// Acquires lsn lease in a retry loop. Returns the expiration time if a lease is granted.
68 : /// Returns an error if a lease is explicitly not granted. Otherwise, we keep sending requests.
69 0 : fn acquire_lsn_lease_with_retry(
70 0 : compute: &Arc<ComputeNode>,
71 0 : tenant_id: TenantId,
72 0 : timeline_id: TimelineId,
73 0 : lsn: Lsn,
74 0 : ) -> Result<SystemTime> {
75 0 : let mut attempts = 0usize;
76 0 : let mut retry_period_ms: f64 = 500.0;
77 : const MAX_RETRY_PERIOD_MS: f64 = 60.0 * 1000.0;
78 :
79 : loop {
80 : // Note: List of pageservers is dynamic, need to re-read configs before each attempt.
81 0 : let (connstrings, auth) = {
82 0 : let state = compute.state.lock().unwrap();
83 0 : let spec = state.pspec.as_ref().expect("spec must be set");
84 0 : (
85 0 : spec.pageserver_connstr.clone(),
86 0 : spec.storage_auth_token.clone(),
87 0 : )
88 0 : };
89 :
90 0 : let result =
91 0 : try_acquire_lsn_lease(&connstrings, auth.as_deref(), tenant_id, timeline_id, lsn);
92 0 : match result {
93 0 : Ok(Some(res)) => {
94 0 : return Ok(res);
95 : }
96 : Ok(None) => {
97 0 : bail!("Permanent error: lease could not be obtained, LSN is behind the GC cutoff");
98 : }
99 0 : Err(e) => {
100 0 : warn!("Failed to acquire lsn lease: {e} (attempt {attempts})");
101 :
102 0 : compute.wait_timeout_while_pageserver_connstr_unchanged(Duration::from_millis(
103 0 : retry_period_ms as u64,
104 0 : ));
105 0 : retry_period_ms *= 1.5;
106 0 : retry_period_ms = retry_period_ms.min(MAX_RETRY_PERIOD_MS);
107 : }
108 : }
109 0 : attempts += 1;
110 : }
111 0 : }
112 :
113 : /// Tries to acquire LSN leases on all Pageserver shards.
114 0 : fn try_acquire_lsn_lease(
115 0 : connstrings: &str,
116 0 : auth: Option<&str>,
117 0 : tenant_id: TenantId,
118 0 : timeline_id: TimelineId,
119 0 : lsn: Lsn,
120 0 : ) -> Result<Option<SystemTime>> {
121 0 : let connstrings = connstrings.split(',').collect_vec();
122 0 : let shard_count = connstrings.len();
123 0 : let mut leases = Vec::new();
124 :
125 0 : for (shard_number, &connstring) in connstrings.iter().enumerate() {
126 0 : let tenant_shard_id = match shard_count {
127 0 : 0 | 1 => TenantShardId::unsharded(tenant_id),
128 0 : shard_count => TenantShardId {
129 0 : tenant_id,
130 0 : shard_number: ShardNumber(shard_number as u8),
131 0 : shard_count: ShardCount::new(shard_count as u8),
132 0 : },
133 : };
134 :
135 0 : let lease = match PageserverProtocol::from_connstring(connstring)? {
136 : PageserverProtocol::Libpq => {
137 0 : acquire_lsn_lease_libpq(connstring, auth, tenant_shard_id, timeline_id, lsn)?
138 : }
139 : PageserverProtocol::Grpc => {
140 0 : acquire_lsn_lease_grpc(connstring, auth, tenant_shard_id, timeline_id, lsn)?
141 : }
142 : };
143 0 : leases.push(lease);
144 : }
145 :
146 0 : Ok(leases.into_iter().min().flatten())
147 0 : }
148 :
149 : /// Acquires an LSN lease on a single shard, using the libpq API. The connstring must use a
150 : /// postgresql:// scheme.
151 0 : fn acquire_lsn_lease_libpq(
152 0 : connstring: &str,
153 0 : auth: Option<&str>,
154 0 : tenant_shard_id: TenantShardId,
155 0 : timeline_id: TimelineId,
156 0 : lsn: Lsn,
157 0 : ) -> Result<Option<SystemTime>> {
158 0 : let mut config = postgres::Config::from_str(connstring)?;
159 0 : if let Some(auth) = auth {
160 0 : config.password(auth);
161 0 : }
162 0 : let mut client = config.connect(NoTls)?;
163 0 : let cmd = format!("lease lsn {tenant_shard_id} {timeline_id} {lsn} ");
164 0 : let res = client.simple_query(&cmd)?;
165 0 : let msg = match res.first() {
166 0 : Some(msg) => msg,
167 0 : None => bail!("empty response"),
168 : };
169 0 : let row = match msg {
170 0 : SimpleQueryMessage::Row(row) => row,
171 0 : _ => bail!("error parsing lsn lease response"),
172 : };
173 :
174 : // Note: this will be None if a lease is explicitly not granted.
175 0 : let valid_until_str = row.get("valid_until");
176 :
177 0 : let valid_until = valid_until_str.map(|s| {
178 0 : SystemTime::UNIX_EPOCH
179 0 : .checked_add(Duration::from_millis(u128::from_str(s).unwrap() as u64))
180 0 : .expect("Time larger than max SystemTime could handle")
181 0 : });
182 0 : Ok(valid_until)
183 0 : }
184 :
185 : /// Acquires an LSN lease on a single shard, using the gRPC API. The connstring must use a
186 : /// grpc:// scheme.
187 0 : fn acquire_lsn_lease_grpc(
188 0 : connstring: &str,
189 0 : auth: Option<&str>,
190 0 : tenant_shard_id: TenantShardId,
191 0 : timeline_id: TimelineId,
192 0 : lsn: Lsn,
193 0 : ) -> Result<Option<SystemTime>> {
194 0 : tokio::runtime::Handle::current().block_on(async move {
195 0 : let mut client = page_api::Client::connect(
196 0 : connstring.to_string(),
197 0 : tenant_shard_id.tenant_id,
198 0 : timeline_id,
199 0 : tenant_shard_id.to_index(),
200 0 : auth.map(String::from),
201 0 : None,
202 0 : )
203 0 : .await?;
204 :
205 0 : let req = page_api::LeaseLsnRequest { lsn };
206 0 : match client.lease_lsn(req).await {
207 0 : Ok(expires) => Ok(Some(expires)),
208 : // Lease couldn't be acquired because the LSN has been garbage collected.
209 0 : Err(err) if err.code() == tonic::Code::FailedPrecondition => Ok(None),
210 0 : Err(err) => Err(err.into()),
211 : }
212 0 : })
213 0 : }
|