Line data Source code
1 : use anyhow::bail;
2 : use anyhow::Result;
3 : use postgres::{NoTls, SimpleQueryMessage};
4 : use std::time::SystemTime;
5 : use std::{str::FromStr, sync::Arc, thread, time::Duration};
6 : use utils::id::TenantId;
7 : use utils::id::TimelineId;
8 :
9 : use compute_api::spec::ComputeMode;
10 : use tracing::{info, warn};
11 : use utils::{
12 : lsn::Lsn,
13 : shard::{ShardCount, ShardNumber, TenantShardId},
14 : };
15 :
16 : use crate::compute::ComputeNode;
17 :
18 : /// Spawns a background thread to periodically renew LSN leases for static compute.
19 : /// Do nothing if the compute is not in static mode.
20 0 : pub fn launch_lsn_lease_bg_task_for_static(compute: &Arc<ComputeNode>) {
21 0 : let (tenant_id, timeline_id, lsn) = {
22 0 : let state = compute.state.lock().unwrap();
23 0 : let spec = state.pspec.as_ref().expect("Spec must be set");
24 0 : match spec.spec.mode {
25 0 : ComputeMode::Static(lsn) => (spec.tenant_id, spec.timeline_id, lsn),
26 0 : _ => return,
27 : }
28 : };
29 0 : let compute = compute.clone();
30 :
31 0 : let span = tracing::info_span!("lsn_lease_bg_task", %tenant_id, %timeline_id, %lsn);
32 0 : thread::spawn(move || {
33 0 : let _entered = span.entered();
34 0 : if let Err(e) = lsn_lease_bg_task(compute, tenant_id, timeline_id, lsn) {
35 : // TODO: might need stronger error feedback than logging an warning.
36 0 : warn!("Exited with error: {e}");
37 0 : }
38 0 : });
39 0 : }
40 :
41 : /// Renews lsn lease periodically so static compute are not affected by GC.
42 0 : fn lsn_lease_bg_task(
43 0 : compute: Arc<ComputeNode>,
44 0 : tenant_id: TenantId,
45 0 : timeline_id: TimelineId,
46 0 : lsn: Lsn,
47 0 : ) -> Result<()> {
48 : loop {
49 0 : let valid_until = acquire_lsn_lease_with_retry(&compute, tenant_id, timeline_id, lsn)?;
50 0 : let valid_duration = valid_until
51 0 : .duration_since(SystemTime::now())
52 0 : .unwrap_or(Duration::ZERO);
53 0 :
54 0 : // Sleep for 60 seconds less than the valid duration but no more than half of the valid duration.
55 0 : let sleep_duration = valid_duration
56 0 : .saturating_sub(Duration::from_secs(60))
57 0 : .max(valid_duration / 2);
58 0 :
59 0 : info!(
60 0 : "Succeeded, sleeping for {} seconds",
61 0 : sleep_duration.as_secs()
62 : );
63 0 : thread::sleep(sleep_duration);
64 : }
65 0 : }
66 :
67 : /// Acquires lsn lease in a retry loop. Returns the expiration time if a lease is granted.
68 : /// Returns an error if a lease is explicitly not granted. Otherwise, we keep sending requests.
69 0 : fn acquire_lsn_lease_with_retry(
70 0 : compute: &Arc<ComputeNode>,
71 0 : tenant_id: TenantId,
72 0 : timeline_id: TimelineId,
73 0 : lsn: Lsn,
74 0 : ) -> Result<SystemTime> {
75 0 : let mut attempts = 0usize;
76 0 : let mut retry_period_ms: f64 = 500.0;
77 : const MAX_RETRY_PERIOD_MS: f64 = 60.0 * 1000.0;
78 :
79 : loop {
80 : // Note: List of pageservers is dynamic, need to re-read configs before each attempt.
81 0 : let configs = {
82 0 : let state = compute.state.lock().unwrap();
83 0 :
84 0 : let spec = state.pspec.as_ref().expect("spec must be set");
85 0 :
86 0 : let conn_strings = spec.pageserver_connstr.split(',');
87 0 :
88 0 : conn_strings
89 0 : .map(|connstr| {
90 0 : let mut config = postgres::Config::from_str(connstr).expect("Invalid connstr");
91 0 : if let Some(storage_auth_token) = &spec.storage_auth_token {
92 0 : info!("Got storage auth token from spec file");
93 0 : config.password(storage_auth_token.clone());
94 : } else {
95 0 : info!("Storage auth token not set");
96 : }
97 0 : config
98 0 : })
99 0 : .collect::<Vec<_>>()
100 0 : };
101 0 :
102 0 : let result = try_acquire_lsn_lease(tenant_id, timeline_id, lsn, &configs);
103 0 : match result {
104 0 : Ok(Some(res)) => {
105 0 : return Ok(res);
106 : }
107 : Ok(None) => {
108 0 : bail!("Permanent error: lease could not be obtained, LSN is behind the GC cutoff");
109 : }
110 0 : Err(e) => {
111 0 : warn!("Failed to acquire lsn lease: {e} (attempt {attempts}");
112 :
113 0 : thread::sleep(Duration::from_millis(retry_period_ms as u64));
114 0 : retry_period_ms *= 1.5;
115 0 : retry_period_ms = retry_period_ms.min(MAX_RETRY_PERIOD_MS);
116 0 : }
117 0 : }
118 0 : attempts += 1;
119 : }
120 0 : }
121 :
122 : /// Tries to acquire an LSN lease through PS page_service API.
123 0 : fn try_acquire_lsn_lease(
124 0 : tenant_id: TenantId,
125 0 : timeline_id: TimelineId,
126 0 : lsn: Lsn,
127 0 : configs: &[postgres::Config],
128 0 : ) -> Result<Option<SystemTime>> {
129 0 : fn get_valid_until(
130 0 : config: &postgres::Config,
131 0 : tenant_shard_id: TenantShardId,
132 0 : timeline_id: TimelineId,
133 0 : lsn: Lsn,
134 0 : ) -> Result<Option<SystemTime>> {
135 0 : let mut client = config.connect(NoTls)?;
136 0 : let cmd = format!("lease lsn {} {} {} ", tenant_shard_id, timeline_id, lsn);
137 0 : let res = client.simple_query(&cmd)?;
138 0 : let msg = match res.first() {
139 0 : Some(msg) => msg,
140 0 : None => bail!("empty response"),
141 : };
142 0 : let row = match msg {
143 0 : SimpleQueryMessage::Row(row) => row,
144 0 : _ => bail!("error parsing lsn lease response"),
145 : };
146 :
147 : // Note: this will be None if a lease is explicitly not granted.
148 0 : let valid_until_str = row.get("valid_until");
149 0 :
150 0 : let valid_until = valid_until_str.map(|s| {
151 0 : SystemTime::UNIX_EPOCH
152 0 : .checked_add(Duration::from_millis(u128::from_str(s).unwrap() as u64))
153 0 : .expect("Time larger than max SystemTime could handle")
154 0 : });
155 0 : Ok(valid_until)
156 0 : }
157 :
158 0 : let shard_count = configs.len();
159 :
160 0 : let valid_until = if shard_count > 1 {
161 0 : configs
162 0 : .iter()
163 0 : .enumerate()
164 0 : .map(|(shard_number, config)| {
165 0 : let tenant_shard_id = TenantShardId {
166 0 : tenant_id,
167 0 : shard_count: ShardCount::new(shard_count as u8),
168 0 : shard_number: ShardNumber(shard_number as u8),
169 0 : };
170 0 : get_valid_until(config, tenant_shard_id, timeline_id, lsn)
171 0 : })
172 0 : .collect::<Result<Vec<Option<SystemTime>>>>()?
173 0 : .into_iter()
174 0 : .min()
175 0 : .unwrap()
176 : } else {
177 0 : get_valid_until(
178 0 : &configs[0],
179 0 : TenantShardId::unsharded(tenant_id),
180 0 : timeline_id,
181 0 : lsn,
182 0 : )?
183 : };
184 :
185 0 : Ok(valid_until)
186 0 : }
|