Line data Source code
1 : use std::str::FromStr;
2 : use std::sync::Arc;
3 : use std::thread;
4 : use std::time::{Duration, SystemTime};
5 :
6 : use anyhow::{Result, bail};
7 : use compute_api::spec::ComputeMode;
8 : use postgres::{NoTls, SimpleQueryMessage};
9 : use tracing::{info, warn};
10 : use utils::id::{TenantId, TimelineId};
11 : use utils::lsn::Lsn;
12 : use utils::shard::{ShardCount, ShardNumber, TenantShardId};
13 :
14 : use crate::compute::ComputeNode;
15 :
16 : /// Spawns a background thread to periodically renew LSN leases for static compute.
17 : /// Do nothing if the compute is not in static mode.
18 0 : pub fn launch_lsn_lease_bg_task_for_static(compute: &Arc<ComputeNode>) {
19 0 : let (tenant_id, timeline_id, lsn) = {
20 0 : let state = compute.state.lock().unwrap();
21 0 : let spec = state.pspec.as_ref().expect("Spec must be set");
22 0 : match spec.spec.mode {
23 0 : ComputeMode::Static(lsn) => (spec.tenant_id, spec.timeline_id, lsn),
24 0 : _ => return,
25 : }
26 : };
27 0 : let compute = compute.clone();
28 :
29 0 : let span = tracing::info_span!("lsn_lease_bg_task", %tenant_id, %timeline_id, %lsn);
30 0 : thread::spawn(move || {
31 0 : let _entered = span.entered();
32 0 : if let Err(e) = lsn_lease_bg_task(compute, tenant_id, timeline_id, lsn) {
33 : // TODO: might need stronger error feedback than logging an warning.
34 0 : warn!("Exited with error: {e}");
35 0 : }
36 0 : });
37 0 : }
38 :
39 : /// Renews lsn lease periodically so static compute are not affected by GC.
40 0 : fn lsn_lease_bg_task(
41 0 : compute: Arc<ComputeNode>,
42 0 : tenant_id: TenantId,
43 0 : timeline_id: TimelineId,
44 0 : lsn: Lsn,
45 0 : ) -> Result<()> {
46 : loop {
47 0 : let valid_until = acquire_lsn_lease_with_retry(&compute, tenant_id, timeline_id, lsn)?;
48 0 : let valid_duration = valid_until
49 0 : .duration_since(SystemTime::now())
50 0 : .unwrap_or(Duration::ZERO);
51 0 :
52 0 : // Sleep for 60 seconds less than the valid duration but no more than half of the valid duration.
53 0 : let sleep_duration = valid_duration
54 0 : .saturating_sub(Duration::from_secs(60))
55 0 : .max(valid_duration / 2);
56 0 :
57 0 : info!(
58 0 : "Request succeeded, sleeping for {} seconds",
59 0 : sleep_duration.as_secs()
60 : );
61 0 : compute.wait_timeout_while_pageserver_connstr_unchanged(sleep_duration);
62 : }
63 0 : }
64 :
65 : /// Acquires lsn lease in a retry loop. Returns the expiration time if a lease is granted.
66 : /// Returns an error if a lease is explicitly not granted. Otherwise, we keep sending requests.
67 0 : fn acquire_lsn_lease_with_retry(
68 0 : compute: &Arc<ComputeNode>,
69 0 : tenant_id: TenantId,
70 0 : timeline_id: TimelineId,
71 0 : lsn: Lsn,
72 0 : ) -> Result<SystemTime> {
73 0 : let mut attempts = 0usize;
74 0 : let mut retry_period_ms: f64 = 500.0;
75 : const MAX_RETRY_PERIOD_MS: f64 = 60.0 * 1000.0;
76 :
77 : loop {
78 : // Note: List of pageservers is dynamic, need to re-read configs before each attempt.
79 0 : let configs = {
80 0 : let state = compute.state.lock().unwrap();
81 0 :
82 0 : let spec = state.pspec.as_ref().expect("spec must be set");
83 0 :
84 0 : let conn_strings = spec.pageserver_connstr.split(',');
85 0 :
86 0 : conn_strings
87 0 : .map(|connstr| {
88 0 : let mut config = postgres::Config::from_str(connstr).expect("Invalid connstr");
89 0 : if let Some(storage_auth_token) = &spec.storage_auth_token {
90 0 : config.password(storage_auth_token.clone());
91 0 : }
92 0 : config
93 0 : })
94 0 : .collect::<Vec<_>>()
95 0 : };
96 0 :
97 0 : let result = try_acquire_lsn_lease(tenant_id, timeline_id, lsn, &configs);
98 0 : match result {
99 0 : Ok(Some(res)) => {
100 0 : return Ok(res);
101 : }
102 : Ok(None) => {
103 0 : bail!("Permanent error: lease could not be obtained, LSN is behind the GC cutoff");
104 : }
105 0 : Err(e) => {
106 0 : warn!("Failed to acquire lsn lease: {e} (attempt {attempts})");
107 :
108 0 : compute.wait_timeout_while_pageserver_connstr_unchanged(Duration::from_millis(
109 0 : retry_period_ms as u64,
110 0 : ));
111 0 : retry_period_ms *= 1.5;
112 0 : retry_period_ms = retry_period_ms.min(MAX_RETRY_PERIOD_MS);
113 0 : }
114 0 : }
115 0 : attempts += 1;
116 : }
117 0 : }
118 :
119 : /// Tries to acquire an LSN lease through PS page_service API.
120 0 : fn try_acquire_lsn_lease(
121 0 : tenant_id: TenantId,
122 0 : timeline_id: TimelineId,
123 0 : lsn: Lsn,
124 0 : configs: &[postgres::Config],
125 0 : ) -> Result<Option<SystemTime>> {
126 0 : fn get_valid_until(
127 0 : config: &postgres::Config,
128 0 : tenant_shard_id: TenantShardId,
129 0 : timeline_id: TimelineId,
130 0 : lsn: Lsn,
131 0 : ) -> Result<Option<SystemTime>> {
132 0 : let mut client = config.connect(NoTls)?;
133 0 : let cmd = format!("lease lsn {} {} {} ", tenant_shard_id, timeline_id, lsn);
134 0 : let res = client.simple_query(&cmd)?;
135 0 : let msg = match res.first() {
136 0 : Some(msg) => msg,
137 0 : None => bail!("empty response"),
138 : };
139 0 : let row = match msg {
140 0 : SimpleQueryMessage::Row(row) => row,
141 0 : _ => bail!("error parsing lsn lease response"),
142 : };
143 :
144 : // Note: this will be None if a lease is explicitly not granted.
145 0 : let valid_until_str = row.get("valid_until");
146 0 :
147 0 : let valid_until = valid_until_str.map(|s| {
148 0 : SystemTime::UNIX_EPOCH
149 0 : .checked_add(Duration::from_millis(u128::from_str(s).unwrap() as u64))
150 0 : .expect("Time larger than max SystemTime could handle")
151 0 : });
152 0 : Ok(valid_until)
153 0 : }
154 :
155 0 : let shard_count = configs.len();
156 :
157 0 : let valid_until = if shard_count > 1 {
158 0 : configs
159 0 : .iter()
160 0 : .enumerate()
161 0 : .map(|(shard_number, config)| {
162 0 : let tenant_shard_id = TenantShardId {
163 0 : tenant_id,
164 0 : shard_count: ShardCount::new(shard_count as u8),
165 0 : shard_number: ShardNumber(shard_number as u8),
166 0 : };
167 0 : get_valid_until(config, tenant_shard_id, timeline_id, lsn)
168 0 : })
169 0 : .collect::<Result<Vec<Option<SystemTime>>>>()?
170 0 : .into_iter()
171 0 : .min()
172 0 : .unwrap()
173 : } else {
174 0 : get_valid_until(
175 0 : &configs[0],
176 0 : TenantShardId::unsharded(tenant_id),
177 0 : timeline_id,
178 0 : lsn,
179 0 : )?
180 : };
181 :
182 0 : Ok(valid_until)
183 0 : }
|